@@ -30,6 +30,9 @@ package server
30
30
import (
31
31
"context"
32
32
"fmt"
33
+ "infini.sh/framework/core/event"
34
+ "infini.sh/framework/core/global"
35
+ "infini.sh/framework/core/task"
33
36
"net/http"
34
37
"strconv"
35
38
"strings"
@@ -76,6 +79,8 @@ func init() {
76
79
77
80
//try to connect to instance
78
81
api .HandleAPIMethod (api .POST , "/instance/try_connect" , handler .RequireLogin (handler .tryConnect ))
82
+ //clear instance that is not alive in 7 days
83
+ api .HandleAPIMethod (api .POST , "/instance/_clear" , handler .RequirePermission (handler .clearInstance , enum .PermissionGatewayInstanceWrite ))
79
84
80
85
}
81
86
@@ -96,30 +101,7 @@ func (h APIHandler) registerInstance(w http.ResponseWriter, req *http.Request, p
96
101
oldInst .ID = obj .ID
97
102
exists , err := orm .Get (oldInst )
98
103
if exists {
99
- errMsg := fmt .Sprintf ("agent [%s] already exists" , obj .ID )
100
- h .WriteError (w , errMsg , http .StatusInternalServerError )
101
- return
102
- }
103
- err , result := orm .GetBy ("endpoint" , obj .Endpoint , oldInst )
104
- if err != nil {
105
- log .Error (err )
106
- h .WriteError (w , err .Error (), http .StatusInternalServerError )
107
- return
108
- }
109
- if len (result .Result ) > 0 {
110
- buf := util .MustToJSONBytes (result .Result [0 ])
111
- util .MustFromJSONBytes (buf , & oldInst )
112
- if oldInst .ID != "" {
113
- //keep old created time
114
- obj .Created = oldInst .Created
115
- log .Infof ("remove old instance [%s] with the same endpoint %s" , oldInst .ID , oldInst .Endpoint )
116
- err = orm .Delete (nil , oldInst )
117
- if err != nil {
118
- log .Error (err )
119
- h .WriteError (w , err .Error (), http .StatusInternalServerError )
120
- return
121
- }
122
- }
104
+ obj .Created = oldInst .Created
123
105
}
124
106
err = orm .Save (nil , obj )
125
107
if err != nil {
@@ -394,6 +376,168 @@ func (h *APIHandler) getInstanceStatus(w http.ResponseWriter, req *http.Request,
394
376
}
395
377
h .WriteJSON (w , result , http .StatusOK )
396
378
}
379
+ func (h * APIHandler ) clearInstance (w http.ResponseWriter , req * http.Request , ps httprouter.Params ) {
380
+ appName := h .GetParameterOrDefault (req , "app_name" , "" )
381
+ task .RunWithinGroup ("clear_instance" , func (ctx context.Context ) error {
382
+ err := h .clearInstanceByAppName (appName )
383
+ if err != nil {
384
+ log .Error (err )
385
+ }
386
+ return err
387
+ })
388
+ h .WriteAckOKJSON (w )
389
+ }
390
+
391
+ func (h * APIHandler ) clearInstanceByAppName (appName string ) error {
392
+ var (
393
+ size = 100
394
+ from = 0
395
+ )
396
+ // Paginated query for all running instances
397
+ q := orm.Query {
398
+ Size : size ,
399
+ From : from ,
400
+ }
401
+ if appName != "" {
402
+ q .Conds = orm .And (
403
+ orm .Eq ("application.name" , appName ),
404
+ )
405
+ }
406
+ q .AddSort ("created" , orm .ASC )
407
+ insts := []model.Instance {}
408
+ var (
409
+ instanceIDs []string
410
+ toRemoveIDs []string
411
+ instsCache = map [string ]* model.Instance {}
412
+ )
413
+ client := elastic2 .GetClient (global .MustLookupString (elastic2 .GlobalSystemElasticsearchID ))
414
+ for {
415
+ err , _ := orm .SearchWithJSONMapper (& insts , & q )
416
+ if err != nil {
417
+ return err
418
+ }
419
+ for _ , inst := range insts {
420
+ instanceIDs = append (instanceIDs , inst .ID )
421
+ instsCache [inst .ID ] = & inst
422
+ }
423
+ if len (instanceIDs ) == 0 {
424
+ break
425
+ }
426
+ aliveInstanceIDs , err := getAliveInstanceIDs (client , instanceIDs )
427
+ if err != nil {
428
+ return err
429
+ }
430
+ for _ , instanceID := range instanceIDs {
431
+ if _ , ok := aliveInstanceIDs [instanceID ]; ! ok {
432
+ toRemoveIDs = append (toRemoveIDs , instanceID )
433
+ }
434
+ }
435
+ if len (toRemoveIDs ) > 0 {
436
+ // Use the same slice to avoid extra allocation
437
+ filteredIDs := toRemoveIDs [:0 ]
438
+ // check whether the instance is still online
439
+ for _ , instanceID := range toRemoveIDs {
440
+ if inst , ok := instsCache [instanceID ]; ok {
441
+ _ , err = h .getInstanceInfo (inst .Endpoint , inst .BasicAuth )
442
+ if err == nil {
443
+ // Skip online instance, do not append to filtered list
444
+ continue
445
+ }
446
+ }
447
+ // Keep only offline instances
448
+ filteredIDs = append (filteredIDs , instanceID )
449
+ }
450
+
451
+ // Assign back after filtering
452
+ toRemoveIDs = filteredIDs
453
+ query := util.MapStr {
454
+ "query" : util.MapStr {
455
+ "terms" : util.MapStr {
456
+ "id" : toRemoveIDs ,
457
+ },
458
+ },
459
+ }
460
+ // remove instances
461
+ err = orm .DeleteBy (model.Instance {}, util .MustToJSONBytes (query ))
462
+ if err != nil {
463
+ return fmt .Errorf ("failed to delete instance: %w" , err )
464
+ }
465
+ // remove instance related data
466
+ query = util.MapStr {
467
+ "query" : util.MapStr {
468
+ "terms" : util.MapStr {
469
+ "metadata.labels.agent_id" : toRemoveIDs ,
470
+ },
471
+ },
472
+ }
473
+ err = orm .DeleteBy (model.Setting {}, util .MustToJSONBytes (query ))
474
+ }
475
+
476
+ // Exit loop when the number of returned records is less than the page size
477
+ if len (insts ) <= size {
478
+ break
479
+ }
480
+ // Reset instance state for the next iteration
481
+ insts = []model.Instance {}
482
+ toRemoveIDs = nil
483
+ instsCache = make (map [string ]* model.Instance )
484
+ q .From += size
485
+ }
486
+ return nil
487
+ }
488
+
489
+ func getAliveInstanceIDs (client elastic2.API , instanceIDs []string ) (map [string ]struct {}, error ) {
490
+ query := util.MapStr {
491
+ "size" : 0 ,
492
+ "query" : util.MapStr {
493
+ "bool" : util.MapStr {
494
+ "must" : []util.MapStr {
495
+ {
496
+ "terms" : util.MapStr {
497
+ "agent.id" : instanceIDs ,
498
+ },
499
+ },
500
+ {
501
+ "range" : util.MapStr {
502
+ "timestamp" : util.MapStr {
503
+ "gt" : "now-7d" ,
504
+ },
505
+ },
506
+ },
507
+ },
508
+ },
509
+ },
510
+ "aggs" : util.MapStr {
511
+ "grp_agent_id" : util.MapStr {
512
+ "terms" : util.MapStr {
513
+ "field" : "agent.id" ,
514
+ },
515
+ "aggs" : util.MapStr {
516
+ "count" : util.MapStr {
517
+ "value_count" : util.MapStr {
518
+ "field" : "agent.id" ,
519
+ },
520
+ },
521
+ },
522
+ },
523
+ },
524
+ }
525
+ queryDSL := util .MustToJSONBytes (query )
526
+ ctx , cancel := context .WithTimeout (context .Background (), time .Second * 10 )
527
+ defer cancel ()
528
+ response , err := client .QueryDSL (ctx , orm .GetWildcardIndexName (event.Event {}), nil , queryDSL )
529
+ if err != nil {
530
+ return nil , err
531
+ }
532
+ ret := map [string ]struct {}{}
533
+ for _ , bk := range response .Aggregations ["grp_agent_id" ].Buckets {
534
+ key := bk ["key" ].(string )
535
+ if bk ["doc_count" ].(float64 ) > 0 {
536
+ ret [key ] = struct {}{}
537
+ }
538
+ }
539
+ return ret , nil
540
+ }
397
541
398
542
func (h * APIHandler ) proxy (w http.ResponseWriter , req * http.Request , ps httprouter.Params ) {
399
543
var (
@@ -442,7 +586,7 @@ func (h *APIHandler) getInstanceInfo(endpoint string, basicAuth *model.BasicAuth
442
586
obj := & model.Instance {}
443
587
_ , err := ProxyAgentRequest ("runtime" , endpoint , req1 , obj )
444
588
if err != nil {
445
- panic ( err )
589
+ return nil , err
446
590
}
447
591
return obj , err
448
592
0 commit comments