Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: support clearing offline agent instances #165

Merged
merged 2 commits into from
Feb 24, 2025
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion docs/content.en/docs/release-notes/_index.md
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,7 @@ Information about release notes of INFINI Console is provided here.
- Enhance LDAP authentication logging (#156)
- Optimize UI for copying metric requests (#155)
- Enhance deletion tips by adding cluster info for indices (#162)
- Retain a single instance when registering duplicate endpoints (#163)
- Support clearing offline agent instances (#165)

## 1.28.2 (2025-02-15)

Expand Down
2 changes: 1 addition & 1 deletion docs/content.zh/docs/release-notes/_index.md
Original file line number Diff line number Diff line change
Expand Up @@ -30,14 +30,14 @@ title: "版本历史"
- 增强 LDAP 身份验证的日志记录 (#156)
- 优化监控报表里拷贝指标请求的 UI (#155)
- 删除索引提示增加集群信息 (#162)
- 自动注册实例时相同 endpoint 的实例不再重复注册 (#163)

## 1.28.2 (2025-02-15)

### Features
- 告警功能支持根据桶之间文档数差值和内容差异告警 (#119)
- 当使用 Easysearch 存储指标时,增加 Rollup 索引生命周期 (#128)
- 增加集群指标采集模式变更事件 (#152)
- 支持清理离线 Agent 实例 (#165)

### Bug fix
- 修复 Insight API 处理多时间序列数据时数据丢失的问题 (#127)
Expand Down
194 changes: 169 additions & 25 deletions plugin/managed/server/instance.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,9 @@ package server
import (
"context"
"fmt"
"infini.sh/framework/core/event"
"infini.sh/framework/core/global"
"infini.sh/framework/core/task"
"net/http"
"strconv"
"strings"
Expand Down Expand Up @@ -76,6 +79,8 @@ func init() {

//try to connect to instance
api.HandleAPIMethod(api.POST, "/instance/try_connect", handler.RequireLogin(handler.tryConnect))
//clear offline instances that have not reported any event in the last 7 days
api.HandleAPIMethod(api.POST, "/instance/_clear", handler.RequirePermission(handler.clearInstance, enum.PermissionGatewayInstanceWrite))

}

Expand All @@ -96,30 +101,7 @@ func (h APIHandler) registerInstance(w http.ResponseWriter, req *http.Request, p
oldInst.ID = obj.ID
exists, err := orm.Get(oldInst)
if exists {
errMsg := fmt.Sprintf("agent [%s] already exists", obj.ID)
h.WriteError(w, errMsg, http.StatusInternalServerError)
return
}
err, result := orm.GetBy("endpoint", obj.Endpoint, oldInst)
if err != nil {
log.Error(err)
h.WriteError(w, err.Error(), http.StatusInternalServerError)
return
}
if len(result.Result) > 0 {
buf := util.MustToJSONBytes(result.Result[0])
util.MustFromJSONBytes(buf, &oldInst)
if oldInst.ID != "" {
//keep old created time
obj.Created = oldInst.Created
log.Infof("remove old instance [%s] with the same endpoint %s", oldInst.ID, oldInst.Endpoint)
err = orm.Delete(nil, oldInst)
if err != nil {
log.Error(err)
h.WriteError(w, err.Error(), http.StatusInternalServerError)
return
}
}
obj.Created = oldInst.Created
}
err = orm.Save(nil, obj)
if err != nil {
Expand Down Expand Up @@ -394,6 +376,168 @@ func (h *APIHandler) getInstanceStatus(w http.ResponseWriter, req *http.Request,
}
h.WriteJSON(w, result, http.StatusOK)
}
// clearInstance serves POST /instance/_clear.
//
// It reads the optional "app_name" query parameter (empty when absent), hands
// the cleanup for that application name to task.RunWithinGroup under the
// "clear_instance" group, and then replies with an acknowledgement. Cleanup
// failures are logged and returned to the task runner; they are not reflected
// in the HTTP response.
// NOTE(review): the acknowledgement is presumably sent before the cleanup
// finishes — confirm against task.RunWithinGroup's scheduling semantics.
func (h *APIHandler) clearInstance(w http.ResponseWriter, req *http.Request, ps httprouter.Params) {
	name := h.GetParameterOrDefault(req, "app_name", "")
	cleanup := func(ctx context.Context) error {
		if err := h.clearInstanceByAppName(name); err != nil {
			log.Error(err)
			return err
		}
		return nil
	}
	task.RunWithinGroup("clear_instance", cleanup)
	h.WriteAckOKJSON(w)
}

// clearInstanceByAppName removes registered instances that are considered
// offline, together with the settings labelled with their IDs.
//
// Instances are read in pages of 100, sorted by "created" ascending, and
// optionally restricted to those whose application.name equals appName (an
// empty appName applies no filter). An instance is removed only when both
// checks fail: getAliveInstanceIDs does not report it as alive, and a direct
// getInstanceInfo probe of its endpoint returns an error.
//
// It returns the first error hit while searching, querying alive IDs, or
// deleting instances or their settings.
func (h *APIHandler) clearInstanceByAppName(appName string) error {
	const size = 100
	// Paginated query over the registered instances.
	q := orm.Query{
		Size: size,
		From: 0,
	}
	if appName != "" {
		q.Conds = orm.And(
			orm.Eq("application.name", appName),
		)
	}
	q.AddSort("created", orm.ASC)
	client := elastic2.GetClient(global.MustLookupString(elastic2.GlobalSystemElasticsearchID))
	for {
		// All per-page state is declared inside the loop so nothing from a
		// previous page can leak into the checks for the current one.
		insts := []model.Instance{}
		err, _ := orm.SearchWithJSONMapper(&insts, &q)
		if err != nil {
			return err
		}
		if len(insts) == 0 {
			break
		}
		instanceIDs := make([]string, 0, len(insts))
		for i := range insts {
			instanceIDs = append(instanceIDs, insts[i].ID)
		}
		aliveInstanceIDs, err := getAliveInstanceIDs(client, instanceIDs)
		if err != nil {
			return err
		}
		var toRemoveIDs []string
		for i := range insts {
			// Index into the slice instead of taking the address of the range
			// variable, which is shared across iterations before Go 1.22.
			inst := &insts[i]
			if _, ok := aliveInstanceIDs[inst.ID]; ok {
				continue
			}
			// No recent events: probe the endpoint before declaring it offline.
			if _, err := h.getInstanceInfo(inst.Endpoint, inst.BasicAuth); err == nil {
				// Skip online instance.
				continue
			}
			toRemoveIDs = append(toRemoveIDs, inst.ID)
		}
		// Only issue the delete queries when something is actually offline.
		if len(toRemoveIDs) > 0 {
			query := util.MapStr{
				"query": util.MapStr{
					"terms": util.MapStr{
						"id": toRemoveIDs,
					},
				},
			}
			// remove instances
			err = orm.DeleteBy(model.Instance{}, util.MustToJSONBytes(query))
			if err != nil {
				return fmt.Errorf("failed to delete instance: %w", err)
			}
			// remove instance related data
			query = util.MapStr{
				"query": util.MapStr{
					"terms": util.MapStr{
						"metadata.labels.agent_id": toRemoveIDs,
					},
				},
			}
			err = orm.DeleteBy(model.Setting{}, util.MustToJSONBytes(query))
			if err != nil {
				return fmt.Errorf("failed to delete instance settings: %w", err)
			}
		}

		// A short page means there is nothing left to read. The comparison must
		// be strict: a full page has exactly size records.
		if len(insts) < size {
			break
		}
		// NOTE(review): records deleted above may shift later records to lower
		// offsets, so a fixed step can skip some of them in this run; they
		// would presumably be picked up by a later clear request — confirm.
		q.From += size
	}
	return nil
}

// getAliveInstanceIDs reports which of instanceIDs have recent events.
//
// It runs a terms aggregation on agent.id over the index returned by
// orm.GetWildcardIndexName(event.Event{}), restricted to documents whose
// agent.id is one of instanceIDs and whose timestamp is greater than now-7d.
// The query is bounded by a 10 second timeout.
//
// The returned set holds every ID that has at least one matching document; IDs
// without a bucket are absent. An empty instanceIDs yields an empty set without
// querying. The query error, if any, is returned unchanged.
func getAliveInstanceIDs(client elastic2.API, instanceIDs []string) (map[string]struct{}, error) {
	ret := make(map[string]struct{}, len(instanceIDs))
	if len(instanceIDs) == 0 {
		return ret, nil
	}
	query := util.MapStr{
		"size": 0,
		"query": util.MapStr{
			"bool": util.MapStr{
				"must": []util.MapStr{
					{
						"terms": util.MapStr{
							"agent.id": instanceIDs,
						},
					},
					{
						"range": util.MapStr{
							"timestamp": util.MapStr{
								"gt": "now-7d",
							},
						},
					},
				},
			},
		},
		"aggs": util.MapStr{
			"grp_agent_id": util.MapStr{
				"terms": util.MapStr{
					"field": "agent.id",
					// A terms aggregation returns only the top 10 buckets by
					// default; ask for one bucket per requested ID so that no
					// active instance is mistaken for a dead one.
					"size": len(instanceIDs),
				},
			},
		},
	}
	queryDSL := util.MustToJSONBytes(query)
	ctx, cancel := context.WithTimeout(context.Background(), time.Second*10)
	defer cancel()
	response, err := client.QueryDSL(ctx, orm.GetWildcardIndexName(event.Event{}), nil, queryDSL)
	if err != nil {
		return nil, err
	}
	for _, bk := range response.Aggregations["grp_agent_id"].Buckets {
		// Checked assertions: a bucket with an unexpected shape is skipped
		// instead of panicking.
		key, ok := bk["key"].(string)
		if !ok {
			continue
		}
		if count, ok := bk["doc_count"].(float64); ok && count > 0 {
			ret[key] = struct{}{}
		}
	}
	return ret, nil
}

func (h *APIHandler) proxy(w http.ResponseWriter, req *http.Request, ps httprouter.Params) {
var (
Expand Down Expand Up @@ -442,7 +586,7 @@ func (h *APIHandler) getInstanceInfo(endpoint string, basicAuth *model.BasicAuth
obj := &model.Instance{}
_, err := ProxyAgentRequest("runtime", endpoint, req1, obj)
if err != nil {
panic(err)
return nil, err
}
return obj, err

Expand Down
3 changes: 3 additions & 0 deletions web/src/locales/en-US/agent.js
Original file line number Diff line number Diff line change
Expand Up @@ -43,4 +43,7 @@ export default {

"agent.label.agent_credential": "Agent Credential",
"agent.credential.tip": "No credential required",
"agent.instance.clear.title": "Clear Offline Instances",
"agent.instance.clear.modal.title": "Are you sure you want to clear offline instances?",
"agent.instance.clear.modal.desc": "This operation will delete offline instances that have not reported metrics for 7 days."
};
3 changes: 3 additions & 0 deletions web/src/locales/zh-CN/agent.js
Original file line number Diff line number Diff line change
Expand Up @@ -40,4 +40,7 @@ export default {

"agent.label.agent_credential": "代理凭据",
"agent.credential.tip": "不需要凭据",
"agent.instance.clear.title": "清理离线实例",
"agent.instance.clear.modal.title": "您确定要清理离线实例?",
"agent.instance.clear.modal.desc": "该操作将会删除离线并且 7 天没有上报指标的实例"
};
36 changes: 35 additions & 1 deletion web/src/pages/Agent/Instance/index.jsx
Original file line number Diff line number Diff line change
Expand Up @@ -379,6 +379,37 @@ const AgentList = (props) => {
}
};

// Loading flag bound to the "clear offline instances" button.
const [clearLoading, setClearLoading] = useState(false)
// Posts to /instance/_clear with app_name=agent and shows a success toast when
// the response is acknowledged. The loading flag is raised for the duration of
// the request.
const onClearClick = async () => {
  setClearLoading(true);
  try {
    const statusRes = await request(`/instance/_clear`, {
      method: "POST",
      queryParams: {
        "app_name": "agent",
      },
    });
    if (statusRes && statusRes.acknowledged) {
      message.success("submit successfully");
    }
  } finally {
    // Always release the button, even if the request rejects; otherwise it
    // would stay in the loading state.
    setClearLoading(false);
  }
}
const showClearConfirm = useCallback(() => {
Modal.confirm({
title: formatMessage({ id: "agent.instance.clear.modal.title" }),
content: (
<>
<div>{formatMessage({ id: "agent.instance.clear.modal.desc" })}</div>
</>
),
okText: "Yes",
okType: "danger",
cancelText: "No",
onOk() {
onClearClick();
},
});
}, []);

return (
<PageHeaderWrapper>
<Card>
Expand All @@ -390,7 +421,7 @@ const AgentList = (props) => {
marginBottom: 15,
}}
>
<div style={{ maxWidth: 500, flex: "1 1 auto" }}>
<div style={{ maxWidth: 450, flex: "1 1 auto" }}>
<Search
allowClear
placeholder="Type keyword to search"
Expand All @@ -413,6 +444,9 @@ const AgentList = (props) => {
{
hasAuthority("agent.instance:all") && (
<>
<Button loading={clearLoading} onClick={showClearConfirm}>
{formatMessage({ id: "agent.instance.clear.title" })}
</Button>
<Button
type="primary"
onClick={() => {
Expand Down
Loading