Skip to content

Commit 5a02966

Browse files
authored
feat: add getting cluster collection stats api (#27)
* fix: empty index metrics in overview info api when collecting mode is agent * feat: add getting cluster monitor state api * chore: updating monitor state api * chore: custom request timeout error when querying metrics
1 parent 020c8aa commit 5a02966

18 files changed

+390
-117
lines changed

core/auth.go

+16
Original file line numberDiff line numberDiff line change
@@ -24,6 +24,10 @@
2424
package core
2525

2626
import (
27+
"context"
28+
"errors"
29+
"fmt"
30+
cerr "infini.sh/console/core/errors"
2731
"infini.sh/console/core/security"
2832
"infini.sh/framework/core/api"
2933
httprouter "infini.sh/framework/core/api/router"
@@ -37,6 +41,18 @@ type Handler struct {
3741
api.Handler
3842
}
3943

44+
func (handler Handler) WriteError(w http.ResponseWriter, err interface{}, status int) {
45+
if v, ok := err.(error); ok {
46+
if errors.Is(v, context.DeadlineExceeded) {
47+
handler.Handler.WriteError(w, cerr.New(cerr.ErrTypeRequestTimeout, "", err).Error(), status)
48+
return
49+
}
50+
handler.Handler.WriteError(w, v.Error(), status)
51+
return
52+
}
53+
handler.Handler.WriteError(w, fmt.Sprintf("%v", err), status)
54+
}
55+
4056
func (handler Handler) RequireLogin(h httprouter.Handle) httprouter.Handle {
4157
return func(w http.ResponseWriter, r *http.Request, ps httprouter.Params) {
4258

core/errors/errors.go

+83
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,83 @@
1+
// Copyright (C) INFINI Labs & INFINI LIMITED.
2+
//
3+
// The INFINI Console is offered under the GNU Affero General Public License v3.0
4+
// and as commercial software.
5+
//
6+
// For commercial licensing, contact us at:
7+
// - Website: infinilabs.com
8+
// - Email: [email protected]
9+
//
10+
// Open Source licensed under AGPL V3:
11+
// This program is free software: you can redistribute it and/or modify
12+
// it under the terms of the GNU Affero General Public License as published by
13+
// the Free Software Foundation, either version 3 of the License, or
14+
// (at your option) any later version.
15+
//
16+
// This program is distributed in the hope that it will be useful,
17+
// but WITHOUT ANY WARRANTY; without even the implied warranty of
18+
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
19+
// GNU Affero General Public License for more details.
20+
//
21+
// You should have received a copy of the GNU Affero General Public License
22+
// along with this program. If not, see <http://www.gnu.org/licenses/>.
23+
24+
package errors
25+
26+
import (
27+
"fmt"
28+
"infini.sh/framework/core/errors"
29+
)
30+
31+
// Error type identifiers. They are intended for the typ argument of New,
// which stores the value in Error.typ; Error.Error() then emits it as the
// leading segment of the rendered message.
const (
32+
ErrTypeRequestParams = "request_params_error"
33+
ErrTypeApplication = "application_error"
34+
ErrTypeAlreadyExists = "already_exists_error"
35+
ErrTypeNotExists = "not_exists_error"
36+
ErrTypeIncorrectPassword = "incorrect_password_error"
37+
ErrTypeDomainPrefixMismatch = "domain_prefix_mismatch_error"
38+
ErrTypeDisabled = "disabled_error"
39+
ErrTypeRequestTimeout = "request_timeout_error"
40+
)
41+
42+
// Package-level error values built with the framework errors package that
// this file imports under the name errors.
// NOTE(review): presumably meant to be compared against by callers as
// sentinels — confirm against the call sites.
var (
43+
ErrPasswordIncorrect = errors.New("incorrect password")
44+
ErrNotExistsErr = errors.New("not exists")
45+
)
46+
47+
type Error struct {
48+
typ string
49+
msg interface{}
50+
field string
51+
}
52+
53+
func (err Error) Error() string {
54+
return fmt.Sprintf("%s:%v: %v", err.typ, err.field, err.msg)
55+
}
56+
57+
//NewAppError returns an application error
58+
func NewAppError(msg any) *Error {
59+
return New(ErrTypeApplication, "", msg)
60+
}
61+
62+
//NewParamsError returns a request params error
63+
func NewParamsError(field string, msg any) *Error {
64+
return New(ErrTypeRequestParams, field, msg)
65+
}
66+
67+
//NewAlreadyExistsError returns an already exists error
68+
func NewAlreadyExistsError(field string, msg any) *Error {
69+
return New(ErrTypeAlreadyExists, field, msg)
70+
}
71+
72+
//NewNotExistsError returns a not exists error
73+
func NewNotExistsError(field string, msg any) *Error {
74+
return New(ErrTypeNotExists, field, msg)
75+
}
76+
77+
func New(typ string, field string, msg any) *Error {
78+
return &Error{
79+
typ,
80+
msg,
81+
field,
82+
}
83+
}

modules/elastic/api/cluster_overview.go

+96-3
Original file line numberDiff line numberDiff line change
@@ -25,7 +25,9 @@ package api
2525

2626
import (
2727
"context"
28+
"errors"
2829
"fmt"
30+
cerr "infini.sh/console/core/errors"
2931
"infini.sh/framework/modules/elastic/adapter"
3032
"net/http"
3133
"strings"
@@ -263,7 +265,16 @@ func (h *APIHandler) FetchClusterInfo(w http.ResponseWriter, req *http.Request,
263265
}
264266
ctx, cancel := context.WithTimeout(context.Background(), du)
265267
defer cancel()
266-
indexMetrics := h.getMetrics(ctx, query, indexMetricItems, bucketSize)
268+
indexMetrics, err := h.getMetrics(ctx, query, indexMetricItems, bucketSize)
269+
if err != nil {
270+
log.Error(err)
271+
if errors.Is(err, context.DeadlineExceeded) {
272+
h.WriteError(w, cerr.New(cerr.ErrTypeRequestTimeout, "", err).Error(), http.StatusRequestTimeout)
273+
return
274+
}
275+
h.WriteError(w, err.Error(), http.StatusInternalServerError)
276+
return
277+
}
267278
indexingMetricData := util.MapStr{}
268279
for _, line := range indexMetrics["cluster_indexing"].Lines {
269280
// remove first metric dot
@@ -738,7 +749,7 @@ func (h *APIHandler) GetRealtimeClusterNodes(w http.ResponseWriter, req *http.Re
738749

739750
func (h *APIHandler) GetClusterIndices(w http.ResponseWriter, req *http.Request, ps httprouter.Params) {
740751
id := ps.ByName("id")
741-
if GetMonitorState(id) == Console {
752+
if GetMonitorState(id) == elastic.ModeAgentless {
742753
h.APIHandler.GetClusterIndices(w, req, ps)
743754
return
744755
}
@@ -774,7 +785,7 @@ func (h *APIHandler) GetClusterIndices(w http.ResponseWriter, req *http.Request,
774785
func (h *APIHandler) GetRealtimeClusterIndices(w http.ResponseWriter, req *http.Request, ps httprouter.Params) {
775786
resBody := map[string]interface{}{}
776787
id := ps.ByName("id")
777-
if GetMonitorState(id) == Console {
788+
if GetMonitorState(id) == elastic.ModeAgentless {
778789
h.APIHandler.GetRealtimeClusterIndices(w, req, ps)
779790
return
780791
}
@@ -1327,3 +1338,85 @@ func (h *APIHandler) SearchClusterMetadata(w http.ResponseWriter, req *http.Requ
13271338
}
13281339
w.Write(util.MustToJSONBytes(response))
13291340
}
1341+
1342+
func (h *APIHandler) getClusterMonitorState(w http.ResponseWriter, req *http.Request, ps httprouter.Params) {
1343+
id := ps.ByName("id")
1344+
collectionMode := GetMonitorState(id)
1345+
ret := util.MapStr{
1346+
"cluster_id": id,
1347+
"metric_collection_mode": collectionMode,
1348+
}
1349+
queryDSL := util.MapStr{
1350+
"query": util.MapStr{
1351+
"bool": util.MapStr{
1352+
"must": []util.MapStr{
1353+
{
1354+
"term": util.MapStr{
1355+
"metadata.labels.cluster_id": id,
1356+
},
1357+
},
1358+
{
1359+
"term": util.MapStr{
1360+
"metadata.category": "elasticsearch",
1361+
},
1362+
},
1363+
},
1364+
},
1365+
},
1366+
"size": 0,
1367+
"aggs": util.MapStr{
1368+
"grp_name": util.MapStr{
1369+
"terms": util.MapStr{
1370+
"field": "metadata.name",
1371+
"size": 10,
1372+
},
1373+
"aggs": util.MapStr{
1374+
"max_timestamp": util.MapStr{
1375+
"max": util.MapStr{
1376+
"field": "timestamp",
1377+
},
1378+
},
1379+
},
1380+
},
1381+
},
1382+
}
1383+
dsl := util.MustToJSONBytes(queryDSL)
1384+
response, err := elastic.GetClient(global.MustLookupString(elastic.GlobalSystemElasticsearchID)).SearchWithRawQueryDSL(getAllMetricsIndex(), dsl)
1385+
if err != nil {
1386+
log.Error(err)
1387+
h.WriteError(w, err.Error(), http.StatusInternalServerError)
1388+
return
1389+
}
1390+
for _, bk := range response.Aggregations["grp_name"].Buckets {
1391+
key := bk["key"].(string)
1392+
if tv, ok := bk["max_timestamp"].(map[string]interface{}); ok {
1393+
if collectionMode == elastic.ModeAgentless {
1394+
if util.StringInArray([]string{ "index_stats", "cluster_health", "cluster_stats", "node_stats"}, key) {
1395+
ret[key] = getCollectionStats(tv["value"])
1396+
}
1397+
}else{
1398+
if util.StringInArray([]string{ "shard_stats", "cluster_health", "cluster_stats", "node_stats"}, key) {
1399+
ret[key] = getCollectionStats(tv["value"])
1400+
}
1401+
}
1402+
}
1403+
1404+
}
1405+
h.WriteJSON(w, ret, http.StatusOK)
1406+
}
1407+
1408+
func getCollectionStats(lastActiveAt interface{}) util.MapStr {
1409+
stats := util.MapStr{
1410+
"last_active_at": lastActiveAt,
1411+
"status": "active",
1412+
}
1413+
if timestamp, ok := lastActiveAt.(float64); ok {
1414+
t := time.Unix(int64(timestamp/1000), 0)
1415+
if time.Now().Sub(t) > 5 * time.Minute {
1416+
stats["status"] = "warning"
1417+
}else{
1418+
stats["status"] = "ok"
1419+
}
1420+
}
1421+
return stats
1422+
}

modules/elastic/api/host.go

+32-12
Original file line numberDiff line numberDiff line change
@@ -495,7 +495,12 @@ func (h *APIHandler) FetchHostInfo(w http.ResponseWriter, req *http.Request, ps
495495
Units: "/s",
496496
},
497497
}
498-
hostMetrics := h.getGroupHostMetric(context.Background(), agentIDs, min, max, bucketSize, hostMetricItems, "agent.id")
498+
hostMetrics, err := h.getGroupHostMetric(context.Background(), agentIDs, min, max, bucketSize, hostMetricItems, "agent.id")
499+
if err != nil {
500+
log.Error(err)
501+
h.WriteError(w, err.Error(), http.StatusInternalServerError)
502+
return
503+
}
499504

500505
networkMetrics := map[string]util.MapStr{}
501506
for key, item := range hostMetrics {
@@ -572,7 +577,7 @@ func (h *APIHandler) GetHostInfo(w http.ResponseWriter, req *http.Request, ps ht
572577

573578
}
574579

575-
func (h *APIHandler) getSingleHostMetric(ctx context.Context, agentID string, min, max int64, bucketSize int, metricItems []*common.MetricItem) map[string]*common.MetricItem {
580+
func (h *APIHandler) getSingleHostMetric(ctx context.Context, agentID string, min, max int64, bucketSize int, metricItems []*common.MetricItem) (map[string]*common.MetricItem, error) {
576581
var must = []util.MapStr{
577582
{
578583
"term": util.MapStr{
@@ -608,7 +613,7 @@ func (h *APIHandler) getSingleHostMetric(ctx context.Context, agentID string, mi
608613
return h.getSingleMetrics(ctx, metricItems, query, bucketSize)
609614
}
610615

611-
func (h *APIHandler) getSingleHostMetricFromNode(ctx context.Context, nodeID string, min, max int64, bucketSize int, metricKey string) map[string]*common.MetricItem {
616+
func (h *APIHandler) getSingleHostMetricFromNode(ctx context.Context, nodeID string, min, max int64, bucketSize int, metricKey string) (map[string]*common.MetricItem, error) {
612617
var must = []util.MapStr{
613618
{
614619
"term": util.MapStr{
@@ -725,7 +730,12 @@ func (h *APIHandler) GetSingleHostMetrics(w http.ResponseWriter, req *http.Reque
725730
ctx, cancel := context.WithTimeout(context.Background(), du)
726731
defer cancel()
727732
if hostInfo.AgentID == "" {
728-
resBody["metrics"] = h.getSingleHostMetricFromNode(ctx, hostInfo.NodeID, min, max, bucketSize, key)
733+
resBody["metrics"], err = h.getSingleHostMetricFromNode(ctx, hostInfo.NodeID, min, max, bucketSize, key)
734+
if err != nil {
735+
log.Error(err)
736+
h.WriteError(w, err, http.StatusInternalServerError)
737+
return
738+
}
729739
h.WriteJSON(w, resBody, http.StatusOK)
730740
return
731741
}
@@ -788,20 +798,30 @@ func (h *APIHandler) GetSingleHostMetrics(w http.ResponseWriter, req *http.Reque
788798
metricItem.AddLine("Disk Write Rate", "Disk Write Rate", "network write rate of host.", "group1", "payload.host.diskio_summary.write.bytes", "max", bucketSizeStr, "%", "bytes", "0,0.[00]", "0,0.[00]", false, true)
789799
metricItems = append(metricItems, metricItem)
790800
case DiskPartitionUsageMetricKey, NetworkInterfaceOutputRateMetricKey:
791-
groupMetrics := h.getGroupHostMetrics(ctx, hostInfo.AgentID, min, max, bucketSize, key)
792-
resBody["metrics"] = groupMetrics
801+
resBody["metrics"] , err = h.getGroupHostMetrics(ctx, hostInfo.AgentID, min, max, bucketSize, key)
802+
if err != nil {
803+
log.Error(err)
804+
h.WriteError(w, err, http.StatusInternalServerError)
805+
return
806+
}
793807
h.WriteJSON(w, resBody, http.StatusOK)
794808
return
795809
}
796-
hostMetrics := h.getSingleHostMetric(ctx, hostInfo.AgentID, min, max, bucketSize, metricItems)
810+
hostMetrics, err := h.getSingleHostMetric(ctx, hostInfo.AgentID, min, max, bucketSize, metricItems)
811+
if err != nil {
812+
log.Error(err)
813+
h.WriteError(w, err, http.StatusInternalServerError)
814+
return
815+
}
797816

798817
resBody["metrics"] = hostMetrics
799818

800819
h.WriteJSON(w, resBody, http.StatusOK)
801820
}
802821

803-
func (h *APIHandler) getGroupHostMetrics(ctx context.Context, agentID string, min, max int64, bucketSize int, metricKey string) map[string]*common.MetricItem {
822+
func (h *APIHandler) getGroupHostMetrics(ctx context.Context, agentID string, min, max int64, bucketSize int, metricKey string) (map[string]*common.MetricItem, error) {
804823
var metrics = make(map[string]*common.MetricItem)
824+
var err error
805825
switch metricKey {
806826
case DiskPartitionUsageMetricKey:
807827
diskPartitionMetric := newMetricItem(DiskPartitionUsageMetricKey, 2, SystemGroupKey)
@@ -817,7 +837,7 @@ func (h *APIHandler) getGroupHostMetrics(ctx context.Context, agentID string, mi
817837
Units: "%",
818838
},
819839
}
820-
metrics = h.getGroupHostMetric(ctx, []string{agentID}, min, max, bucketSize, hostMetricItems, "payload.host.disk_partition_usage.partition")
840+
metrics, err = h.getGroupHostMetric(ctx, []string{agentID}, min, max, bucketSize, hostMetricItems, "payload.host.disk_partition_usage.partition")
821841
case NetworkInterfaceOutputRateMetricKey:
822842
networkOutputMetric := newMetricItem(NetworkInterfaceOutputRateMetricKey, 2, SystemGroupKey)
823843
networkOutputMetric.AddAxi("Network interface output rate", "group1", common.PositionLeft, "bytes", "0.[0]", "0.[0]", 5, true)
@@ -832,13 +852,13 @@ func (h *APIHandler) getGroupHostMetrics(ctx context.Context, agentID string, mi
832852
Units: "",
833853
},
834854
}
835-
metrics = h.getGroupHostMetric(ctx, []string{agentID}, min, max, bucketSize, hostMetricItems, "payload.host.network_interface.name")
855+
metrics, err = h.getGroupHostMetric(ctx, []string{agentID}, min, max, bucketSize, hostMetricItems, "payload.host.network_interface.name")
836856
}
837857

838-
return metrics
858+
return metrics, err
839859
}
840860

841-
func (h *APIHandler) getGroupHostMetric(ctx context.Context, agentIDs []string, min, max int64, bucketSize int, hostMetricItems []GroupMetricItem, groupField string) map[string]*common.MetricItem {
861+
func (h *APIHandler) getGroupHostMetric(ctx context.Context, agentIDs []string, min, max int64, bucketSize int, hostMetricItems []GroupMetricItem, groupField string) (map[string]*common.MetricItem, error) {
842862
var must = []util.MapStr{
843863
{
844864
"term": util.MapStr{

modules/elastic/api/index_metrics.go

+1-1
Original file line numberDiff line numberDiff line change
@@ -742,7 +742,7 @@ func (h *APIHandler) getIndexMetrics(ctx context.Context, req *http.Request, clu
742742
},
743743
},
744744
}
745-
return h.getMetrics(ctx, query, indexMetricItems, bucketSize), nil
745+
return h.getMetrics(ctx, query, indexMetricItems, bucketSize)
746746

747747
}
748748

0 commit comments

Comments
 (0)