From 692d3bb19590446a1a98910ecb27b15d900d2cc9 Mon Sep 17 00:00:00 2001 From: huanchao Date: Sat, 7 Oct 2023 18:43:26 +0800 Subject: [PATCH 1/9] [Agent] fix init_flow and icmp srt when disorder --- agent/src/flow_generator/flow_map.rs | 10 +-- agent/src/flow_generator/perf/icmp.rs | 69 +++++++++++++------ .../metrics/flow_metrics/vtap_flow_port.ch | 4 +- .../metrics/flow_metrics/vtap_flow_port.en | 4 +- 4 files changed, 56 insertions(+), 31 deletions(-) diff --git a/agent/src/flow_generator/flow_map.rs b/agent/src/flow_generator/flow_map.rs index 444ae0e0030..6d3a1be6669 100644 --- a/agent/src/flow_generator/flow_map.rs +++ b/agent/src/flow_generator/flow_map.rs @@ -48,7 +48,6 @@ use super::{ use crate::{ common::{ - ebpf::EbpfType, endpoint::{ EndpointData, EndpointDataPov, EndpointInfo, EPC_FROM_DEEPFLOW, EPC_FROM_INTERNET, }, @@ -1009,12 +1008,6 @@ impl FlowMap { fn init_flow(&mut self, config: &Config, meta_packet: &mut MetaPacket) -> Box { let flow_config = config.flow; - match meta_packet.ebpf_type { - EbpfType::GoHttp2Uprobe | EbpfType::GoHttp2UprobeData => {} - _ => { - meta_packet.lookup_key.direction = PacketDirection::ClientToServer; - } - } let mut tagged_flow = TaggedFlow::default(); let lookup_key = &meta_packet.lookup_key; @@ -1574,6 +1567,9 @@ impl FlowMap { node.flow_state = FlowState::Established; // opening timeout node.timeout = config.flow.flow_timeout.opening; + if let Some(meta_flow_log) = node.meta_flow_log.as_mut() { + let _ = meta_flow_log.parse_l3(meta_packet); + } node } diff --git a/agent/src/flow_generator/perf/icmp.rs b/agent/src/flow_generator/perf/icmp.rs index 410060f1a31..de3e684f757 100644 --- a/agent/src/flow_generator/perf/icmp.rs +++ b/agent/src/flow_generator/perf/icmp.rs @@ -14,7 +14,7 @@ * limitations under the License. */ -use std::cmp::max; +use std::{cmp::max, collections::VecDeque}; use pnet::packet::{ icmp::{IcmpType, IcmpTypes}, @@ -45,8 +45,8 @@ pub struct IcmpPerf { srt_max: Timestamp, srt_sum: Timestamp, srt_count: u32, - last_requests: Vec, - last_replies: Vec, + last_requests: VecDeque, + last_replies: VecDeque, data_update_flag: bool, } @@ -54,6 +54,22 @@ impl IcmpPerf { pub fn new() -> Self { IcmpPerf::default() } + + fn set_srt(&mut self, srt: Timestamp) { + if srt <= ART_MAX { + self.srt_max = max(self.srt_max, srt); + self.srt_sum += srt; + self.srt_count += 1; + self.data_update_flag = true; + } + } + + fn reset(&mut self) { + self.srt_max = Timestamp::default(); + self.srt_sum = Timestamp::default(); + self.srt_count = 0; + self.data_update_flag = false; + } } impl L4FlowPerf for IcmpPerf { @@ -89,19 +105,14 @@ impl L4FlowPerf for IcmpPerf { { if pkt_timestamp <= self.last_replies[i].timestamp { let srt = Timestamp::from(self.last_replies[i].timestamp - pkt_timestamp); - if srt <= ART_MAX { - self.srt_max = max(self.srt_max, srt); - self.srt_sum += srt; - self.srt_count += 1; - self.data_update_flag = true; - } + self.set_srt(srt); } self.last_replies.remove(i); } else { if self.last_requests.len() >= MAX_CACHE_COUNT { - self.last_requests.remove(0); + let _ = self.last_requests.pop_front(); } - self.last_requests.push(LastIcmp { + self.last_requests.push_back(LastIcmp { timestamp: pkt_timestamp, id_and_seq: icmp_data.echo_id_seq, }); @@ -114,19 +125,14 @@ impl L4FlowPerf for IcmpPerf { { if pkt_timestamp >= self.last_requests[i].timestamp { let srt = Timestamp::from(pkt_timestamp - self.last_requests[i].timestamp); - if srt <= ART_MAX { - self.srt_max = max(self.srt_max, srt); - self.srt_sum += srt; - self.srt_count += 1; - self.data_update_flag = true; - } + self.set_srt(srt); } self.last_requests.remove(i); } else { if self.last_replies.len() >= MAX_CACHE_COUNT { - self.last_replies.remove(0); + let _ = self.last_replies.pop_front(); } - self.last_replies.push(LastIcmp { + self.last_replies.push_back(LastIcmp { timestamp: pkt_timestamp, id_and_seq: icmp_data.echo_id_seq, }); @@ -140,12 +146,35 @@ impl L4FlowPerf for IcmpPerf { } fn copy_and_reset_data(&mut self, _: bool) -> FlowPerfStats { + for request_index in (0..self.last_requests.len()).rev() { + let request = &self.last_requests[request_index]; + if let Some(reply_index) = self + .last_replies + .iter() + .position(|reply| request.id_and_seq == reply.id_and_seq) + { + let reply = &self.last_replies[reply_index]; + if request.timestamp <= reply.timestamp { + let srt = Timestamp::from(reply.timestamp - request.timestamp); + if srt <= ART_MAX { + self.srt_max = max(self.srt_max, srt); + self.srt_sum += srt; + self.srt_count += 1; + self.data_update_flag = true; + } + } + + self.last_requests.remove(request_index); + self.last_replies.remove(reply_index); + } + } + let mut stats = FlowPerfStats::default(); stats.l4_protocol = L4Protocol::Icmp; stats.tcp.srt_max = (self.srt_max.as_nanos() / Timestamp::from_micros(1).as_nanos()) as u32; stats.tcp.srt_sum = (self.srt_sum.as_nanos() / Timestamp::from_micros(1).as_nanos()) as u32; stats.tcp.srt_count = self.srt_count; - *self = IcmpPerf::default(); + self.reset(); stats } diff --git a/server/querier/db_descriptions/clickhouse/metrics/flow_metrics/vtap_flow_port.ch b/server/querier/db_descriptions/clickhouse/metrics/flow_metrics/vtap_flow_port.ch index 9e61bf1ce35..0f001b1a13e 100644 --- a/server/querier/db_descriptions/clickhouse/metrics/flow_metrics/vtap_flow_port.ch +++ b/server/querier/db_descriptions/clickhouse/metrics/flow_metrics/vtap_flow_port.ch @@ -64,13 +64,13 @@ server_half_close_flow , 断连-服务端半关 , 连接 , TCP 传 rtt , 平均 TCP 建连时延 , 微秒 , 统计周期内,所有 TCP 建连时延的平均值,一次时延的计算见文档描述 rtt_client , 平均 TCP 建连客户端时延 , 微秒 , 统计周期内,所有 TCP 建连客户端时延的平均值,一次时延的计算见文档描述 rtt_server , 平均 TCP 建连服务端时延 , 微秒 , 统计周期内,所有 TCP 建连服务端时延的平均值,一次时延的计算见文档描述 -srt , 平均 TCP 系统时延 , 微秒 , 统计周期内,所有 TCP 系统时延的平均值,一次时延的计算见文档描述 +srt , 平均 TCP/ICMP 系统时延 , 微秒 , 统计周期内,所有 TCP/ICMP 系统时延的平均值,一次时延的计算见文档描述 art , 平均数据时延 , 微秒 , 统计周期内,所有数据时延的平均值,数据时延包含 TCP/UDP,一次时延的计算见文档描述 cit , 平均客户端等待时延 , 微秒 , 统计周期内,所有客户端等待时延的平均值,数据时延仅包含 TCP,一次时延的计算见文档描述 rtt_max , 最大 TCP 建连时延 , 微秒 , 统计周期内,所有 TCP 建连时延的最大值,一次时延的计算见文档描述 rtt_client_max , 最大 TCP 建连客户端时延 , 微秒 , 统计周期内,所有 TCP 建连客户端时延的最大值,一次时延的计算见文档描述 rtt_server_max , 最大 TCP 建连服务端时延 , 微秒 , 统计周期内,所有 TCP 建连服务端时延的最大值,一次时延的计算见文档描述 -srt_max , 最大 TCP 系统时延 , 微秒 , 统计周期内,所有 TCP 系统时延的最大值,一次时延的计算见文档描述 +srt_max , 最大 TCP/ICMP 系统时延 , 微秒 , 统计周期内,所有 TCP/ICMP 系统时延的最大值,一次时延的计算见文档描述 art_max , 最大数据时延 , 微秒 , 统计周期内,所有数据时延的最大值,数据时延包含 TCP/UDP,一次时延的计算见文档描述 cit_max , 最大客户端等待时延 , 微秒 , 统计周期内,所有客户端等待时延的最大值,数据时延仅包含 TCP,一次时延的计算见文档描述 diff --git a/server/querier/db_descriptions/clickhouse/metrics/flow_metrics/vtap_flow_port.en b/server/querier/db_descriptions/clickhouse/metrics/flow_metrics/vtap_flow_port.en index bbb9fe0369f..5c90480107b 100644 --- a/server/querier/db_descriptions/clickhouse/metrics/flow_metrics/vtap_flow_port.en +++ b/server/querier/db_descriptions/clickhouse/metrics/flow_metrics/vtap_flow_port.en @@ -64,13 +64,13 @@ server_half_close_flow , Close - Server Half Close , Flow , rtt , Avg TCP Est. Delay , us , rtt_client , Avg TCP Est. Client Delay , us , rtt_server , Avg TCP Est. Server Delay , us , -srt , Avg TCP ACK Delay , us , +srt , Avg TCP/ICMP Response Delay , us , art , Avg Data Delay , us , cit , Avg Client Idle Delay , us , rtt_max , Max TCP Est. Delay , us , rtt_client_max , Max TCP Est. Client Delay , us , rtt_server_max , Max TCP Est. Server Delay , us , -srt_max , Max TCP ACK Delay , us , +srt_max , Max TCP/ICMP Response Delay , us , art_max , Max Data Delay , us , cit_max , Max Client Idle Delay , us , From 646f4e76f5d1bc9c765a8af6aaa1916fe211aba8 Mon Sep 17 00:00:00 2001 From: taloric Date: Wed, 11 Oct 2023 16:33:49 +0800 Subject: [PATCH 2/9] [querier] remove replica labels by query params --- .../querier/app/prometheus/config/config.go | 1 + .../app/prometheus/model/prometheus.go | 19 +++++++------ .../app/prometheus/router/prometheus.go | 5 ++-- .../app/prometheus/service/converters.go | 28 +++++++++++++------ server/server.yaml | 1 + 5 files changed, 33 insertions(+), 21 deletions(-) diff --git a/server/querier/app/prometheus/config/config.go b/server/querier/app/prometheus/config/config.go index ac14ea75056..829d3deadea 100644 --- a/server/querier/app/prometheus/config/config.go +++ b/server/querier/app/prometheus/config/config.go @@ -25,6 +25,7 @@ type Prometheus struct { RequestQueryWithDebug bool `default:"false" yaml:"request-query-with-debug"` ExternalTagCacheSize int `default:"1024" yaml:"external-tag-cache-size"` ExternalTagLoadInterval int `default:"300" yaml:"external-tag-load-interval"` + ThanosReplicaLabels []string `yaml:"thanos-replica-labels"` Cache PrometheusCache `yaml:"cache"` } diff --git a/server/querier/app/prometheus/model/prometheus.go b/server/querier/app/prometheus/model/prometheus.go index 9658a48e9bf..1504ab80776 100644 --- a/server/querier/app/prometheus/model/prometheus.go +++ b/server/querier/app/prometheus/model/prometheus.go @@ -23,15 +23,16 @@ import ( ) type PromQueryParams struct { - MetricsWithPrefix string - Promql string - StartTime string - EndTime string - Step string - Debug bool - Slimit string - Matchers []string - Context context.Context + MetricsWithPrefix string + Promql string + StartTime string + EndTime string + Step string + Debug bool + Slimit string + ThanosReplicaLabels []string + Matchers []string + Context context.Context } type PromQueryData struct { diff --git a/server/querier/app/prometheus/router/prometheus.go b/server/querier/app/prometheus/router/prometheus.go index 0ae302e9bdb..f3fc0bea873 100644 --- a/server/querier/app/prometheus/router/prometheus.go +++ b/server/querier/app/prometheus/router/prometheus.go @@ -46,6 +46,7 @@ func promQuery(svc *service.PrometheusService) gin.HandlerFunc { args.Slimit = c.Request.FormValue("slimit") debug := c.Request.FormValue("debug") args.Debug, _ = strconv.ParseBool(debug) + result, err := svc.PromInstantQueryService(&args, c.Request.Context()) if err != nil { c.JSON(500, &model.PromQueryResponse{Error: err.Error(), Status: _STATUS_FAIL}) @@ -73,7 +74,6 @@ func promQueryRange(svc *service.PrometheusService) gin.HandlerFunc { c.JSON(500, &model.PromQueryResponse{Error: err.Error(), Status: _STATUS_FAIL}) return } - //pp.Println(result) c.JSON(200, result) }) } @@ -92,9 +92,8 @@ func promReader(svc *service.PrometheusService) gin.HandlerFunc { c.JSON(500, err) return } - //pp.Println(req) + resp, err := svc.PromRemoteReadService(&req, c.Request.Context()) - //pp.Println(resp) if err != nil { c.JSON(500, err) return diff --git a/server/querier/app/prometheus/service/converters.go b/server/querier/app/prometheus/service/converters.go index 3fdebbb3c73..199ac67a3d3 100644 --- a/server/querier/app/prometheus/service/converters.go +++ b/server/querier/app/prometheus/service/converters.go @@ -592,10 +592,25 @@ func (p *prometheusReader) respTransToProm(ctx context.Context, metricsName stri // merge and serialize all tags as map key var deepflowNativeTagString, promTagJson string + var filterTagMap map[string]string // merge prometheus tags if tagIndex > -1 { promTagJson = values[tagIndex].(string) - deepflowNativeTagString = promTagJson + tagMap := make(map[string]string) + json.Unmarshal([]byte(promTagJson), &tagMap) + filterTagMap = make(map[string]string, len(tagMap)) + for k, v := range tagMap { + if k == "" || v == "" { + continue + } + // ignore replica labels if passed + if config.Cfg.Prometheus.ThanosReplicaLabels != nil && common.IsValueInSliceString(k, config.Cfg.Prometheus.ThanosReplicaLabels) { + continue + } + filterTagMap[k] = v + } + promFilterTagJson, _ := json.Marshal(filterTagMap) + deepflowNativeTagString = string(promFilterTagJson) } // merge deepflow autotagging tags @@ -622,14 +637,9 @@ func (p *prometheusReader) respTransToProm(ctx context.Context, metricsName stri // tag label pair var pairs []prompb.Label - if tagIndex > -1 { - tagMap := make(map[string]string) - json.Unmarshal([]byte(promTagJson), &tagMap) - pairs = make([]prompb.Label, 0, 1+len(tagMap)+len(allDeepFlowNativeTags)) - for k, v := range tagMap { - if k == "" || v == "" { - continue - } + if tagIndex > -1 && filterTagMap != nil { + pairs = make([]prompb.Label, 0, 1+len(filterTagMap)+len(allDeepFlowNativeTags)) + for k, v := range filterTagMap { if prefix == prefixTag { // prometheus tag for deepflow metrics pairs = append(pairs, prompb.Label{ diff --git a/server/server.yaml b/server/server.yaml index 273fef7881d..07bf4aeb460 100644 --- a/server/server.yaml +++ b/server/server.yaml @@ -280,6 +280,7 @@ querier: request-query-with-debug: true external-tag-cache-size: 1024 external-tag-load-interval: 300 + thanos-replica-labels: [] # remove duplicate replica labels when query data cache: enabled: true cache-item-size: 512000 # max size of cache item, unit: byte From 132b7ebb43bc982788e9433ed21503d471a1963e Mon Sep 17 00:00:00 2001 From: taloric Date: Fri, 13 Oct 2023 18:08:42 +0800 Subject: [PATCH 3/9] [querier] remove extend end time --- server/querier/app/prometheus/service/promql.go | 5 ----- 1 file changed, 5 deletions(-) diff --git a/server/querier/app/prometheus/service/promql.go b/server/querier/app/prometheus/service/promql.go index d999e7af303..a6c86aa5208 100644 --- a/server/querier/app/prometheus/service/promql.go +++ b/server/querier/app/prometheus/service/promql.go @@ -197,14 +197,9 @@ func (p *prometheusExecutor) promQueryRangeExecute(ctx context.Context, args *mo if start.Local().Unix()%int64(step.Seconds()) > int64(p.lookbackDelta.Seconds()) { start = time.Unix(start.Local().Unix()-start.Local().Unix()%int64(step.Seconds()), 0) } - if end.Local().Unix()%int64(step.Seconds()) > int64(p.lookbackDelta.Seconds()) { - end = time.Unix(end.Local().Unix()-end.Local().Unix()%int64(step.Seconds())+int64(step.Seconds()), 0) - } if int(step.Seconds())%86400 == 0 { year_start, month_start, day_start := start.Date() - year_end, month_end, day_end := end.Date() start = time.Date(year_start, month_start, day_start, 0, 0, 0, 0, start.Location()) - end = time.Date(year_end, month_end, day_end, 0, 0, 0, 0, end.Location()) } qry, err := engine.NewRangeQuery(queriable, nil, args.Promql, start, end, step) if qry == nil || err != nil { From b03278796b4dfe9c8fabc227d1fe8b7204ed33c0 Mon Sep 17 00:00:00 2001 From: yuanchao Date: Fri, 13 Oct 2023 19:55:58 +0800 Subject: [PATCH 4/9] [Agent] Fix mongo log request resource is null #22509 --- .../test/flow_generator/mongo/mongo.pcap | Bin 0 -> 15158 bytes .../test/flow_generator/mongo/mongo.result | 30 ++++ .../flow_generator/protocol_logs/sql/mongo.rs | 144 +++++++++++++++--- 3 files changed, 154 insertions(+), 20 deletions(-) create mode 100644 agent/resources/test/flow_generator/mongo/mongo.pcap create mode 100644 agent/resources/test/flow_generator/mongo/mongo.result diff --git a/agent/resources/test/flow_generator/mongo/mongo.pcap b/agent/resources/test/flow_generator/mongo/mongo.pcap new file mode 100644 index 0000000000000000000000000000000000000000..d5be4b29127d9e970350c0a89c580760a3e516ff GIT binary patch literal 15158 zcmeHO3ve98neKfU8?YT2Oe|wUO@wnYwpQ9*JuC~$_G-0j$(D9)?MgBRBW8DIS7Xi2 ztY>ERFckvHT@tu5PUQgeGByw62bkE#F&N@19?2y~D(A|bO+g|^6;*`GaiJ)ZkSpRN z_x-)Ak$0sNT#Cz4g+|q@o$j8mzyANfyZ^s?_RLGi9&$1_8^7Glft!yORLHN^yVyE> zr}W_$R~_rE!RLR@ymb}}F}AX@X$@O*!MQJ7IKG} z?it73&RG|A|Mnl^l<#o4W)kvO8j6sCna9r~Ge*c)ts_)IS8hA-z8XIHYoP0UFWDw6 zcxW7X-h4So=wryK)0wG2%HAZs9sJGyg@DGKyBcIGK_7J=Hjik21yV6irJu zxI$cJR!-(r(NYHGK+fuyHA_i~mdv)JdYLJjYRnQ1i`R8#`eju`P02tv4R(!PsT|+NDz$46iYBT;Fpup-Rg*ADo^{oA)&6=w z=qB4ik6R;InX~RtwA^r^&99&&2wu^cz_SHIR%OwYh2_;Ns{LN?3Smi5H{`3CQ#6wO ziX|tloFVfDh8r9D8tRzc=v9DP4?eOg)K>ev)n1R!*Wj&g^j7<-Wp#OVp+0JnHT?+I zbe??{TYlZ>qhw9t!zGsg_WqqG24J7>EVwssO|Z>YSbaAkzSQhuH^V+D<;UmvSaN1Z z_Yq=ic2dN0_RhkyHe!enR{~`vxY9+2IE#N9|De2b)R_h2Dj28B(zCj%r$>4UY9Pa! znKksJY?|#7lPfNLHvCfkhf9Pbqn9N09BFSHuq-1Ex0t#XO>BjbW6E}!${dv4iRZ<% zOnEIMmTWSnjykEJ*MLS^L@U|P>+k)hgI%cVNl}d}uqXFP&z!%f@dHkx&|rP138C!?BhpoqC;NS)7TNIF*a-r zWj3`+iH44dzgyp|rQ(Uf!E}|`Y^_^Auy$Z;?S^eZ#q3oFgH}zOe^W!SN9%0V!ad!? z8!~!Jpt-q9Y4+87yMEQ0{5!vRwmdPQhJSDs?E78$ z^}IF05_@6eA+qmN9d5P=-KJDJ|2pPWt|G)gHBE|G&fZPDY$HBEh_8Vv$&jwsF<U&x{LSlC=*4U^w zW|M8Hts9s1Yz((c#&5AWW+g+%3yxYHDgT(y7a)2+9Zv)`3po);CwbUPIAz^y=OMj{c}qUy)ZWRzx=*km>8vBBnabe$U7 zWcZf%YTeex#;w`fny&Z?wNYUI69$Xud)ZnK-+g{LS@(CfEG_jA4Wmgx0S=yf0I zb-{nwdcA`L5d+F9B%*CoZo>zJ>afCdh1L^Z=KcR3fuKLS}DrnY~dYv%MjptVSYAHk~TmI7jpxy98gb^4HUn z@2hQWs;l+am&}cS?(+POXkB@Wn|&SCDNT@)RoTL&bh<=JZ>r%_D@jU^Kr96*9V98O z0m_)~ZquqnrX!J4R7ufNI&SECZ*{4ln!Mf%iP4*2v;{LErBXr7jE){8Myq!e3+mL$ z{X3tjAVFPu+!oX>7}vo?Kv{$Kh?ULAiA>vH!uK&J&6!rLv6c2}WY@~R#$;WxzQJ49 zC`l>5UuX^~{~orJRaYd|M}TM%B;l)E|;E_HPcktjks|dIZacvG!t+WML{gqA^56c6|yI#WKF^66T)&r z@Oyo=%$d|PSw%&YpQJW(afODoLB-Iu40+Ux(08Y#m=AH$ALD6akYikntmrf>|1bhff)&_%D3yszF)qqPTQ>vIYSb!i)kRR7-mj5FGV2Wm;l#0A0{2w9pL?LJjIaNg)$bwAWcvP9}pg8th`?|`@wER zm7CLAE?1FSJCXv3Os+ ztt%90?du73#oD70p}sg9?Z*0A1MyJ2y)#rOid`FvhdTQr(Z1HuhOST$rLBdc&Op44 z3Ob^ZwSBG8?v{>FAI}SSM}l$S#|jlgkpPtiVzE$H95~?L*A|Gtx$P>{=!mW*q*xnh zQ%PSW&>0$goC--K5%j+Ya=12zobw*+ZI8D_yY0@weeMzhEwK&~LJ-Y%w6BeXTKfXQ zU?>LnrYxaYCSQhTNpd2WM$Y6>rrX5WLx`DQ9!%B&#UXB^B`Tq$(~3+X8->7_D+ge?zr^H=WN8o za42?VfWp^Q{I{2H+PZVX^u&F4A7rK8$L2X#yh8L7JKby;UKOQM@58lq^l_peEZM#IVK2Ooo#cI7e%kgv z?uYkr7~6_0fcB{iHvKOlv&_;l^<86%lyiNYbCZDf*h1b8#+X&~W;;U5^CMxw-R0%>w65oPRjOb_w zzgMJ+Ccoe3_o5J`)-cLxb`>>A#hNW^6;%}(GmYFplJ-dizN+e~pqPTDTS9)?k%SbM zGHmtTadE>{*p#~mV%>ds>tI1?);9_VdE&7_(*PlzvSq#&fF{z+W zm7+Q%j+m9zLa!{CxpW#m6tM7O;YK+tPtzTRQ1d!#{ZewzTFjVz|O0cLcVyI|&q8 zkm<6uKu1#f7?%feyb#9p7nsi6m#=UT>D%s~G7LUUq*vcoY&t)A0j85G349=WFE2Hv1QR*8}TT7jt3Q>P}onG zy?k%LjRRqDFe6bQ8O>skA%FcZ91rp(3$ITckFM?^>No9iv)ABNQkvkj{&e(vL_KtI zvFCAm6rRUv@;nxL^VC6m4?K^fyMWS)Mu@&m{bppkv28V&Klt(TAiXAM8hqb3zaLDo zA0lPuNsaAt)W}w{uvuJ9F(tOU5s+asm3^2gztMHmYr>G&mDxM zZ&)KeBPBM7RzLd!)yCFrjjAN_?3rt;o}(8=c-gE%5J{&;F2tOcoS}`4P7Z-+LecUs zs8Do~!UjE?g-wD$7TftaeRpQ5i7V_T^z_(n`k0_-M*$i+|Ag!rEAlug+I7IqZbc!b z2{vk0wsRZ(rxL3_t%iSY!Y?*2xZ1YSr%2g!90gaIU$&zE2Dg*=#eDaLI+PEW{&~|i z-zGx;0zzBhqEafgJRJQTAwsLaS!{VfH}~&+c5aE!`!Fo;De`$kXoE=Egrr+waOpZD|W^WJl?`j@oWe(lhdKJPkOY|qUsUj3h)tA<~_13vF}TH$LJeBR?& z{U0X8pFQDb-^F4}X@WoTx0UO;KXGDG#B#@Xr00tf$-TV@i7tEjUXGhjvS(Z3VEm(b zq>dO~e|XCM*ki;nG*Ikdym|*5j2lY!V;_Noah&#J!yrT~ZHAxm{n*6+<9_VyUn>%6 z;r3&0c0XiBX@Z^XsO;c&@T9pz0`5{n2qZdmdPnN9eti%V)2T)HiAdfX+1E60{?HOayy5cM zQ+XpB_{4N{Qp9q{DQ53(?7M~#$s3u2M3+6X|3YtMv3C2t#BkA~*;Abkf1enJ`itkD z*9S58SjFDR&{E7j*N`{T4tB)SX80NRMt<}1U-hRk~%YYuxDTA(z+zqzxri(B5FNfFB(XWsUW+452R z=5J`}jX=S6Fu!cQk{98|4q)N8L?)w+HkbEfUV57F*3+Y}Jl9N&58gM2sX(AK!Ogj0 z^aL>um%OL&#vxeZ8M4F$FXb7}oBs{`3hZe2QlP9uBGxuzdQGiJ{%?Q#If2@K|Nc2s zjr _HEADER_SIZE => { + _OP_MSG if payload.len() > _MSG_DOC_SECTION_OFFSET => { // OP_MSG let mut msg_body = MongoOpMsg::default(); - msg_body.decode(&payload[_HEADER_SIZE..])?; + // TODO: Message Flags + msg_body.decode(&payload[_MSG_DOC_SECTION_OFFSET..])?; match info.msg_type { LogMessageType::Response => { - info.response = msg_body.sections.doc.to_string(); - info.exception = msg_body.sections.c_string.unwrap_or(_UNKNOWN.to_string()); - info.response_code = - msg_body.sections.doc.get_f64("code").unwrap_or(0.0) as i32; + // The data structure of doc is Bson, which is a normal response when there is no errmsg in it + if msg_body.sections.doc.get_str("errmsg").is_err() { + info.response = msg_body.sections.doc.to_string(); + } else { + info.exception = + msg_body.sections.doc.get_str("errmsg").unwrap().to_string(); + } + if info.exception.len() == 0 { + info.exception = + msg_body.sections.c_string.unwrap_or(_UNKNOWN.to_string()); + } + info.response_code = msg_body.sections.doc.get_i32("code").unwrap_or(0); if info.response_code > 0 { self.perf_stats.as_mut().map(|p| p.inc_resp_err()); } @@ -315,13 +327,16 @@ impl MongoDBLog { } _OP_QUERY if payload.len() > 28 => { // "OP_QUERY" - info.exception = CStr::from_bytes_until_nul(&payload[..20]) - .map_err(|_| Error::L7ProtocolUnknown)? - .to_string_lossy() - .into_owned(); + let collection_name = + CStr::from_bytes_until_nul(&payload[_COLLECTION_NAME_OFFSET..]) + .map_err(|_| Error::L7ProtocolUnknown)? + .to_string_lossy() + .into_owned(); - let query = Document::from_reader(&payload[28 + info.exception.len() + 1..]) - .unwrap_or(Document::default()); + let query = Document::from_reader( + &payload[_QUERY_DOC_OFFSET + collection_name.len() + 1..], + ) + .unwrap_or(Document::default()); info.request = query.to_string(); } _OP_GET_MORE | _OP_DELETE if payload.len() > 20 => { @@ -415,18 +430,24 @@ pub struct MongoOpMsg { } impl MongoOpMsg { + const _KIND_OFFSET: usize = 0; + const _KIND_LEN: usize = 1; + const _DOC_LENGTH_OFFSET: usize = Self::_KIND_OFFSET + Self::_KIND_LEN; + const _DOC_LENGTH_LEN: usize = 4; + fn decode(&mut self, payload: &[u8]) -> Result { - if payload.len() < 9 { + if payload.len() < Self::_DOC_LENGTH_OFFSET + Self::_DOC_LENGTH_LEN { return Ok(false); } - // todo: decode flag let mut sections = Sections::default(); //sections.kind = payload[4]; - let section_len = bytes::read_u32_le(&payload[5..9]); - if payload.len() < 4 + section_len as usize { + let section_len = bytes::read_u32_le( + &payload[Self::_DOC_LENGTH_OFFSET..Self::_DOC_LENGTH_OFFSET + Self::_DOC_LENGTH_LEN], + ); + if payload.len() < Self::_DOC_LENGTH_LEN + section_len as usize { return Ok(false); } - let _ = sections.decode(&payload[4..]); + let _ = sections.decode(&payload); self.sections = sections; // todo: decode checksum Ok(true) @@ -455,8 +476,9 @@ impl Sections { 0 => { // Body self.kind_name = "BODY".to_string(); - let lenght = bytes::read_u32_le(&payload[1..5]); - if lenght != payload.len() as u32 - 1 { + let length = bytes::read_u32_le(&payload[1..5]); + // TODO: When ChecksumPresent is 1, there will be checksum in the payload + if length != payload.len() as u32 - 1 && length != payload.len() as u32 - 5 { return Ok(false); } self.doc = Document::from_reader(&payload[1..]).unwrap_or(Document::default()); @@ -567,3 +589,85 @@ pub struct MongoOpUpdate { zero: u32, } */ + +#[cfg(test)] +mod tests { + use std::path::Path; + use std::rc::Rc; + use std::{cell::RefCell, fs}; + + use super::*; + + use crate::{ + common::{flow::PacketDirection, l7_protocol_log::L7PerfCache, MetaPacket}, + flow_generator::L7_RRT_CACHE_CAPACITY, + utils::test::Capture, + }; + + const FILE_DIR: &str = "resources/test/flow_generator/mongo"; + + fn run(name: &str) -> String { + let capture = Capture::load_pcap(Path::new(FILE_DIR).join(name), None); + let log_cache = Rc::new(RefCell::new(L7PerfCache::new(L7_RRT_CACHE_CAPACITY))); + let mut packets = capture.as_meta_packets(); + if packets.is_empty() { + return "".to_string(); + } + + let mut output: String = String::new(); + let first_dst_port = packets[0].lookup_key.dst_port; + for packet in packets.iter_mut() { + packet.lookup_key.direction = if packet.lookup_key.dst_port == first_dst_port { + PacketDirection::ClientToServer + } else { + PacketDirection::ServerToClient + }; + let payload = match packet.get_l4_payload() { + Some(p) => p, + None => continue, + }; + + let mut mongo = MongoDBLog::default(); + let param = &ParseParam::new(packet as &MetaPacket, log_cache.clone(), true, true); + + let is_mongo = mongo.check_payload(payload, param); + let info = mongo.parse_payload(payload, param); + if let Ok(info) = info { + match info.unwrap_single() { + L7ProtocolInfo::MongoDBInfo(i) => { + output.push_str(&format!("{:?} is_mongo: {}\r\n", i, is_mongo)); + } + _ => unreachable!(), + } + } else { + output.push_str(&format!( + "{:?} is_mongo: {}\r\n", + MongoDBInfo::default(), + is_mongo + )); + } + } + output + } + + #[test] + fn check() { + let files = vec![("mongo.pcap", "mongo.result")]; + + for item in files.iter() { + let expected = fs::read_to_string(&Path::new(FILE_DIR).join(item.1)).unwrap(); + let output = run(item.0); + + if output != expected { + let output_path = Path::new("actual.txt"); + fs::write(&output_path, &output).unwrap(); + assert!( + output == expected, + "output different from expected {}, written to {:?}", + item.1, + output_path + ); + } + } + } +} From 76b61afade27de85d95800292fb260a8f8f0864c Mon Sep 17 00:00:00 2001 From: xiaoziv Date: Wed, 11 Oct 2023 07:54:13 +0000 Subject: [PATCH 5/9] [Agent] refactor optimize code --- agent/src/collector/l7_quadruple_generator.rs | 59 +++++++++---------- 1 file changed, 27 insertions(+), 32 deletions(-) diff --git a/agent/src/collector/l7_quadruple_generator.rs b/agent/src/collector/l7_quadruple_generator.rs index c1705224953..afb258db44a 100644 --- a/agent/src/collector/l7_quadruple_generator.rs +++ b/agent/src/collector/l7_quadruple_generator.rs @@ -270,7 +270,7 @@ impl SubQuadGen { flow: flow.clone(), l7_protocol: meter.l7_protocol, endpoint_hash, - endpoint: meter.endpoint.clone(), + endpoint: meter.endpoint, is_active_host0, is_active_host1, time_in_second: tagged_flow.flow.flow_stat_time, @@ -549,8 +549,8 @@ impl L7QuadrupleGenerator { ) { let mut second_inject = false; let mut minute_inject = false; - if let Some(s) = self.second_quad_gen.as_mut() { - if config.vtap_flow_1s_enabled { + if config.vtap_flow_1s_enabled { + if let Some(s) = self.second_quad_gen.as_mut() { second_inject = s.move_window(time_in_second); } } @@ -565,7 +565,11 @@ impl L7QuadrupleGenerator { let endpoint_hash = hash_endpoint(&l7_stats.endpoint) as u32; - let app_meter = Self::generate_app_meter(&l7_stats, config.l7_metrics_enabled); + let app_meter = if config.l7_metrics_enabled { + Self::generate_app_meter(&l7_stats) + } else { + AppMeter::default() + }; if second_inject { self.second_quad_gen.as_mut().unwrap().inject_app_meter( @@ -588,12 +592,7 @@ impl L7QuadrupleGenerator { } } - fn generate_app_meter(l7_stats: &L7Stats, l7_metrics_enabled: bool) -> AppMeter { - let mut app_meter = AppMeter::default(); - - if !l7_metrics_enabled { - return app_meter; - } + fn generate_app_meter(l7_stats: &L7Stats) -> AppMeter { let (close_type, direction_score) = if let Some(tagged_flow) = &l7_stats.flow { ( tagged_flow.flow.close_type, @@ -609,7 +608,7 @@ impl L7QuadrupleGenerator { SignalSource::Packet | SignalSource::EBPF | SignalSource::XFlow, ) => { // only L7Protocol is Unknown or Other and SignalSource != Otel will execute the following logic - app_meter = AppMeter { + AppMeter { traffic: AppTraffic { request: (close_type != CloseType::ForcedReport) as u32, response: (close_type != CloseType::ForcedReport) as u32, @@ -618,28 +617,24 @@ impl L7QuadrupleGenerator { ..Default::default() } } - (_, _) => { - app_meter = AppMeter { - traffic: AppTraffic { - request: stats.request_count, - response: stats.response_count, - direction_score: direction_score, - }, - latency: AppLatency { - rrt_max: stats.rrt_max, - rrt_sum: stats.rrt_sum as u64, - rrt_count: stats.rrt_count, - }, - anomaly: AppAnomaly { - client_error: stats.err_client_count, - server_error: stats.err_server_count, - timeout: stats.err_timeout, - }, - }; - } + (_, _) => AppMeter { + traffic: AppTraffic { + request: stats.request_count, + response: stats.response_count, + direction_score: direction_score, + }, + latency: AppLatency { + rrt_max: stats.rrt_max, + rrt_sum: stats.rrt_sum as u64, + rrt_count: stats.rrt_count, + }, + anomaly: AppAnomaly { + client_error: stats.err_client_count, + server_error: stats.err_server_count, + timeout: stats.err_timeout, + }, + }, } - - app_meter } fn handler_routine(&mut self) { From aa524d48bb57deeb92180e9151fb07c3fb821d28 Mon Sep 17 00:00:00 2001 From: zhuofeng Date: Thu, 12 Oct 2023 15:37:18 +0800 Subject: [PATCH 6/9] [ingester] l7_flow_log store syscall_coroutine_0/syscall_coroutine_1 --- server/go.mod | 2 +- server/go.sum | 4 ++-- server/ingester/ckissu/ckissu.go | 11 ++++++++++- server/ingester/common/const.go | 2 +- server/ingester/flow_log/log_data/l7_flow_log.go | 8 ++++++++ .../clickhouse/tag/flow_log/l7_flow_log | 2 ++ .../clickhouse/tag/flow_log/l7_flow_log.ch | 2 ++ .../clickhouse/tag/flow_log/l7_flow_log.en | 2 ++ 8 files changed, 28 insertions(+), 5 deletions(-) diff --git a/server/go.mod b/server/go.mod index 09a1ba3420a..2adde37171f 100644 --- a/server/go.mod +++ b/server/go.mod @@ -35,7 +35,7 @@ require ( github.com/cornelk/hashmap v1.0.8 github.com/deckarep/golang-set v1.8.0 github.com/deckarep/golang-set/v2 v2.1.0 - github.com/deepflowio/deepflow/message v0.0.0-20230927071530-6d75b95973e7 + github.com/deepflowio/deepflow/message v0.0.0-20231009120220-2f68ce239be3 github.com/deepflowio/deepflow/server/controller/cloud/kubernetes_gather/expand v0.0.0-00010101000000-000000000000 github.com/deepflowio/deepflow/server/controller/cloud/platform v0.0.0-00010101000000-000000000000 github.com/deepflowio/deepflow/server/controller/db/mysql/migrator v0.0.0-00010101000000-000000000000 diff --git a/server/go.sum b/server/go.sum index c9b053f6241..04229db40af 100644 --- a/server/go.sum +++ b/server/go.sum @@ -153,8 +153,8 @@ github.com/deckarep/golang-set v1.8.0 h1:sk9/l/KqpunDwP7pSjUg0keiOOLEnOBHzykLrsP github.com/deckarep/golang-set v1.8.0/go.mod h1:5nI87KwE7wgsBU1F4GKAw2Qod7p5kyS383rP6+o6qqo= github.com/deckarep/golang-set/v2 v2.1.0 h1:g47V4Or+DUdzbs8FxCCmgb6VYd+ptPAngjM6dtGktsI= github.com/deckarep/golang-set/v2 v2.1.0/go.mod h1:VAky9rY/yGXJOLEDv3OMci+7wtDpOF4IN+y82NBOac4= -github.com/deepflowio/deepflow/message v0.0.0-20230927071530-6d75b95973e7 h1:rrvioA174n8n6g0Jk24vtb+8o1dpvT4fP8ASQfSPJEc= -github.com/deepflowio/deepflow/message v0.0.0-20230927071530-6d75b95973e7/go.mod h1:e+1lUMMlycCvFRKvlwt/y/0vxJnF8wVss3GyR1ARXY0= +github.com/deepflowio/deepflow/message v0.0.0-20231009120220-2f68ce239be3 h1:sR2fwjKCZvK3iP5WCfaoscD6mrPohlXf2r/JSRG3b18= +github.com/deepflowio/deepflow/message v0.0.0-20231009120220-2f68ce239be3/go.mod h1:e+1lUMMlycCvFRKvlwt/y/0vxJnF8wVss3GyR1ARXY0= github.com/deepflowio/tempopb v0.0.0-20230215110519-15853baf3a79 h1:erRwXyZiUZxxZX/Q1QHesZXgxjiunZUFy+ggCRimkD4= github.com/deepflowio/tempopb v0.0.0-20230215110519-15853baf3a79/go.mod h1:h2rkZ319TExIUGuK8a2dlcGd8qc6wdhsrcpXWaJQaQE= github.com/dennwc/varint v1.0.0 h1:kGNFFSSw8ToIy3obO/kKr8U9GZYUAxQEVuix4zfDWzE= diff --git a/server/ingester/ckissu/ckissu.go b/server/ingester/ckissu/ckissu.go index b0efbe3d9f0..defe0985c88 100644 --- a/server/ingester/ckissu/ckissu.go +++ b/server/ingester/ckissu/ckissu.go @@ -950,6 +950,15 @@ var ColumnDrops635 = []*ColumnDrops{ }, } +var ColumnAdd64 = []*ColumnAdds{ + &ColumnAdds{ + Dbs: []string{"flow_log"}, + Tables: []string{"l7_flow_log", "l7_flow_log_local"}, + ColumnNames: []string{"syscall_coroutine_0", "syscall_coroutine_1"}, + ColumnType: ckdb.UInt64, + }, +} + func getTables(connect *sql.DB, db, tableName string) ([]string, error) { sql := fmt.Sprintf("SHOW TABLES IN %s", db) rows, err := connect.Query(sql) @@ -1330,7 +1339,7 @@ func NewCKIssu(cfg *config.Config) (*Issu, error) { datasourceInfo: make(map[string]*DatasourceInfo), } - allVersionAdds := [][]*ColumnAdds{ColumnAdd610, ColumnAdd611, ColumnAdd612, ColumnAdd613, ColumnAdd615, ColumnAdd618, ColumnAdd620, ColumnAdd623, ColumnAdd625, ColumnAdd626, ColumnAdd633, ColumnAdd635} + allVersionAdds := [][]*ColumnAdds{ColumnAdd610, ColumnAdd611, ColumnAdd612, ColumnAdd613, ColumnAdd615, ColumnAdd618, ColumnAdd620, ColumnAdd623, ColumnAdd625, ColumnAdd626, ColumnAdd633, ColumnAdd635, ColumnAdd64} i.columnAdds = []*ColumnAdd{} for _, versionAdd := range allVersionAdds { for _, adds := range versionAdd { diff --git a/server/ingester/common/const.go b/server/ingester/common/const.go index 2399751b2dc..94774dea735 100644 --- a/server/ingester/common/const.go +++ b/server/ingester/common/const.go @@ -17,6 +17,6 @@ package common const ( - CK_VERSION = "v6.3.8.2" // 用于表示clickhouse的表版本号 + CK_VERSION = "v6.3.4.0" // 用于表示clickhouse的表版本号 DEFAULT_PCAP_DATA_PATH = "/var/lib/pcap" ) diff --git a/server/ingester/flow_log/log_data/l7_flow_log.go b/server/ingester/flow_log/log_data/l7_flow_log.go index 3ade31b9e2d..97a195b1ccc 100644 --- a/server/ingester/flow_log/log_data/l7_flow_log.go +++ b/server/ingester/flow_log/log_data/l7_flow_log.go @@ -81,6 +81,8 @@ type L7Base struct { SyscallTraceIDResponse uint64 SyscallThread0 uint32 SyscallThread1 uint32 + SyscallCoroutine0 uint64 + SyscallCoroutine1 uint64 SyscallCapSeq0 uint32 SyscallCapSeq1 uint32 } @@ -130,6 +132,8 @@ func L7BaseColumns() []*ckdb.Column { ckdb.NewColumn("syscall_trace_id_response", ckdb.UInt64).SetComment("SyscallTraceID-响应"), ckdb.NewColumn("syscall_thread_0", ckdb.UInt32).SetComment("Syscall线程-请求"), ckdb.NewColumn("syscall_thread_1", ckdb.UInt32).SetComment("Syscall线程-响应"), + ckdb.NewColumn("syscall_coroutine_0", ckdb.UInt64).SetComment("Request Syscall Coroutine"), + ckdb.NewColumn("syscall_coroutine_1", ckdb.UInt64).SetComment("Response Syscall Coroutine"), ckdb.NewColumn("syscall_cap_seq_0", ckdb.UInt32).SetComment("Syscall序列号-请求"), ckdb.NewColumn("syscall_cap_seq_1", ckdb.UInt32).SetComment("Syscall序列号-响应"), ) @@ -177,6 +181,8 @@ func (f *L7Base) WriteBlock(block *ckdb.Block) { f.SyscallTraceIDResponse, f.SyscallThread0, f.SyscallThread1, + f.SyscallCoroutine0, + f.SyscallCoroutine1, f.SyscallCapSeq0, f.SyscallCapSeq1) } @@ -538,6 +544,8 @@ func (b *L7Base) Fill(log *pb.AppProtoLogsData, platformData *grpc.PlatformInfoT b.SyscallTraceIDResponse = l.SyscallTraceIdResponse b.SyscallThread0 = l.SyscallTraceIdThread_0 b.SyscallThread1 = l.SyscallTraceIdThread_1 + b.SyscallCoroutine0 = l.SyscallCoroutine_0 + b.SyscallCoroutine1 = l.SyscallCoroutine_1 b.SyscallCapSeq0 = l.SyscallCapSeq_0 b.SyscallCapSeq1 = l.SyscallCapSeq_1 diff --git a/server/querier/db_descriptions/clickhouse/tag/flow_log/l7_flow_log b/server/querier/db_descriptions/clickhouse/tag/flow_log/l7_flow_log index fde27722e46..81f6f724b97 100644 --- a/server/querier/db_descriptions/clickhouse/tag/flow_log/l7_flow_log +++ b/server/querier/db_descriptions/clickhouse/tag/flow_log/l7_flow_log @@ -90,6 +90,8 @@ syscall_trace_id_request , syscall_trace_id_request , syscall_trace_id_request syscall_trace_id_response , syscall_trace_id_response , syscall_trace_id_response , int , , Tracing Info , 111 syscall_thread_0 , syscall_thread_0 , syscall_thread_0 , int , , Tracing Info , 111 syscall_thread_1 , syscall_thread_1 , syscall_thread_1 , int , , Tracing Info , 111 +syscall_coroutine_0 , syscall_coroutine_0 , syscall_coroutine_0 , int , , Tracing Info , 111 +syscall_coroutine_1 , syscall_coroutine_1 , syscall_coroutine_1 , int , , Tracing Info , 111 syscall_cap_seq_0 , syscall_cap_seq_0 , syscall_cap_seq_0 , int , , Tracing Info , 111 syscall_cap_seq_1 , syscall_cap_seq_1 , syscall_cap_seq_1 , int , , Tracing Info , 111 diff --git a/server/querier/db_descriptions/clickhouse/tag/flow_log/l7_flow_log.ch b/server/querier/db_descriptions/clickhouse/tag/flow_log/l7_flow_log.ch index 91a4fc251cf..92c59365c5f 100644 --- a/server/querier/db_descriptions/clickhouse/tag/flow_log/l7_flow_log.ch +++ b/server/querier/db_descriptions/clickhouse/tag/flow_log/l7_flow_log.ch @@ -90,6 +90,8 @@ syscall_trace_id_request , 请求 Syscall TraceID , syscall_trace_id_response , 响应 Syscall TraceID , syscall_thread_0 , 请求 Syscall 线程 , syscall_thread_1 , 响应 Syscall 线程 , +syscall_coroutine_0 , 请求 Syscall 协程 , +syscall_coroutine_1 , 响应 Syscall 协程 , syscall_cap_seq_0 , 请求 Syscall 序号 , syscall_cap_seq_1 , 响应 Syscall 序号 , diff --git a/server/querier/db_descriptions/clickhouse/tag/flow_log/l7_flow_log.en b/server/querier/db_descriptions/clickhouse/tag/flow_log/l7_flow_log.en index 60cf12ef192..539a533e722 100644 --- a/server/querier/db_descriptions/clickhouse/tag/flow_log/l7_flow_log.en +++ b/server/querier/db_descriptions/clickhouse/tag/flow_log/l7_flow_log.en @@ -90,6 +90,8 @@ syscall_trace_id_request , Req Syscall TraceID , syscall_trace_id_response , Resp Syscall TraceID , syscall_thread_0 , Req Syscall Thread , syscall_thread_1 , Resp Syscall Thread , +syscall_coroutine_0 , Req Syscall Coroutine , +syscall_coroutine_1 , Resp Syscall Coroutine , syscall_cap_seq_0 , Req Syscall CapSeq , syscall_cap_seq_1 , Resp Syscall CapSeq , From a9ec76940ac8d5b98e711b906836c81a570f6728 Mon Sep 17 00:00:00 2001 From: zhuofeng Date: Mon, 16 Oct 2023 15:41:24 +0800 Subject: [PATCH 7/9] [ingester] fix deepflow-stats name empty may cause panic --- server/ingester/ext_metrics/decoder/decoder.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/server/ingester/ext_metrics/decoder/decoder.go b/server/ingester/ext_metrics/decoder/decoder.go index 40c0f550a6d..92b3e5d282c 100644 --- a/server/ingester/ext_metrics/decoder/decoder.go +++ b/server/ingester/ext_metrics/decoder/decoder.go @@ -194,7 +194,7 @@ func (d *Decoder) handleDeepflowStats(vtapID uint16, decoder *codec.SimpleDecode d.counter.ErrorCount++ return } - if err := pbStats.Unmarshal(bytes); err != nil { + if err := pbStats.Unmarshal(bytes); err != nil || pbStats.Name == "" { if d.counter.ErrorCount == 0 { log.Warningf("deepflow stats parse failed, err msg: %s", err) } From 3d11340de20c43518e00216704f068f892d30a29 Mon Sep 17 00:00:00 2001 From: Ericsssss Date: Mon, 16 Oct 2023 16:44:00 +0800 Subject: [PATCH 8/9] [Querier] modify pod_group_type showtagvalues error #22514 - run automation test(basic & querier_sql) pass --- server/controller/tagrecorder/const.go | 12 ++--- server/querier/engine/clickhouse/tag/const.go | 53 ++++++++++++------- .../engine/clickhouse/tag/description.go | 4 ++ 3 files changed, 45 insertions(+), 24 deletions(-) diff --git a/server/controller/tagrecorder/const.go b/server/controller/tagrecorder/const.go index 1949b4fac3d..b7dca15e280 100644 --- a/server/controller/tagrecorder/const.go +++ b/server/controller/tagrecorder/const.go @@ -756,12 +756,12 @@ var RESOURCE_TYPE_TO_NODE_TYPE = map[int]string{ common.VIF_DEVICE_TYPE_POD_GROUP: RESOURCE_TYPE_POD_GROUP, common.VIF_DEVICE_TYPE_SERVICE: RESOURCE_TYPE_SERVICE, common.VIF_DEVICE_TYPE_GPROCESS: RESOURCE_TYPE_GPROCESS, - common.VIF_DEVICE_TYPE_POD_GROUP_DEPLOYMENT: RESOURCE_TYPE_CH_POD_GROUP_DEPLOYMENT, - common.VIF_DEVICE_TYPE_POD_GROUP_STATEFULSET: RESOURCE_TYPE_CH_POD_GROUP_STATEFULSET, - common.VIF_DEVICE_TYPE_POD_GROUP_RC: RESOURCE_TYPE_CH_POD_GROUP_RC, - common.VIF_DEVICE_TYPE_POD_GROUP_DAEMON_SET: RESOURCE_TYPE_CH_POD_GROUP_DAEMON_SET, - common.VIF_DEVICE_TYPE_POD_GROUP_REPLICASET_CONTROLLER: RESOURCE_TYPE_CH_POD_GROUP_REPLICASET_CONTROLLER, - common.VIF_DEVICE_TYPE_POD_GROUP_CLONESET: RESOURCE_TYPE_CH_POD_GROUP_CLONESET, + common.VIF_DEVICE_TYPE_POD_GROUP_DEPLOYMENT: RESOURCE_TYPE_POD_GROUP, + common.VIF_DEVICE_TYPE_POD_GROUP_STATEFULSET: RESOURCE_TYPE_POD_GROUP, + common.VIF_DEVICE_TYPE_POD_GROUP_RC: RESOURCE_TYPE_POD_GROUP, + common.VIF_DEVICE_TYPE_POD_GROUP_DAEMON_SET: RESOURCE_TYPE_POD_GROUP, + common.VIF_DEVICE_TYPE_POD_GROUP_REPLICASET_CONTROLLER: RESOURCE_TYPE_POD_GROUP, + common.VIF_DEVICE_TYPE_POD_GROUP_CLONESET: RESOURCE_TYPE_POD_GROUP, common.VIF_DEVICE_TYPE_IP: RESOURCE_TYPE_IP, } diff --git a/server/querier/engine/clickhouse/tag/const.go b/server/querier/engine/clickhouse/tag/const.go index af4068d1d16..7ccf80bc19a 100644 --- a/server/querier/engine/clickhouse/tag/const.go +++ b/server/querier/engine/clickhouse/tag/const.go @@ -17,22 +17,28 @@ package tag const ( - VIF_DEVICE_TYPE_INTERNET = 0 - VIF_DEVICE_TYPE_VM = 1 - VIF_DEVICE_TYPE_VROUTER = 5 - VIF_DEVICE_TYPE_HOST = 6 - VIF_DEVICE_TYPE_DHCP_PORT = 9 - VIF_DEVICE_TYPE_POD = 10 - VIF_DEVICE_TYPE_POD_SERVICE = 11 - VIF_DEVICE_TYPE_REDIS_INSTANCE = 12 - VIF_DEVICE_TYPE_RDS_INSTANCE = 13 - VIF_DEVICE_TYPE_POD_NODE = 14 - VIF_DEVICE_TYPE_LB = 15 - VIF_DEVICE_TYPE_NAT_GATEWAY = 16 - VIF_DEVICE_TYPE_POD_GROUP = 101 - VIF_DEVICE_TYPE_SERVICE = 102 - VIF_DEVICE_TYPE_GPROCESS = 120 - VIF_DEVICE_TYPE_IP = 255 + VIF_DEVICE_TYPE_INTERNET = 0 + VIF_DEVICE_TYPE_VM = 1 + VIF_DEVICE_TYPE_VROUTER = 5 + VIF_DEVICE_TYPE_HOST = 6 + VIF_DEVICE_TYPE_DHCP_PORT = 9 + VIF_DEVICE_TYPE_POD = 10 + VIF_DEVICE_TYPE_POD_SERVICE = 11 + VIF_DEVICE_TYPE_REDIS_INSTANCE = 12 + VIF_DEVICE_TYPE_RDS_INSTANCE = 13 + VIF_DEVICE_TYPE_POD_NODE = 14 + VIF_DEVICE_TYPE_LB = 15 + VIF_DEVICE_TYPE_NAT_GATEWAY = 16 + VIF_DEVICE_TYPE_POD_GROUP = 101 + VIF_DEVICE_TYPE_SERVICE = 102 + VIF_DEVICE_TYPE_GPROCESS = 120 + VIF_DEVICE_TYPE_POD_GROUP_DEPLOYMENT = 130 + VIF_DEVICE_TYPE_POD_GROUP_STATEFULSET = 131 + VIF_DEVICE_TYPE_POD_GROUP_RC = 132 + VIF_DEVICE_TYPE_POD_GROUP_DAEMON_SET = 133 + VIF_DEVICE_TYPE_POD_GROUP_REPLICASET_CONTROLLER = 134 + VIF_DEVICE_TYPE_POD_GROUP_CLONESET = 135 + VIF_DEVICE_TYPE_IP = 255 ) const ( @@ -70,8 +76,19 @@ var AutoPodGroupMap = map[string]int{ } var AutoServiceMap = map[string]int{ - "pod_group": VIF_DEVICE_TYPE_POD_GROUP, - "service": VIF_DEVICE_TYPE_SERVICE, + "pod_group": VIF_DEVICE_TYPE_POD_GROUP, + "deployment": VIF_DEVICE_TYPE_POD_GROUP_DEPLOYMENT, + "stateful_set": VIF_DEVICE_TYPE_POD_GROUP_STATEFULSET, + "replication_controller": VIF_DEVICE_TYPE_POD_GROUP_RC, + "daemon_set": VIF_DEVICE_TYPE_POD_GROUP_DAEMON_SET, + "replica_set_controller": VIF_DEVICE_TYPE_POD_GROUP_REPLICASET_CONTROLLER, + "clone_set": VIF_DEVICE_TYPE_POD_GROUP_CLONESET, + "service": VIF_DEVICE_TYPE_SERVICE, +} + +var PodGroupTypeSlice = []string{ + "deployment", "stateful_set", "replication_controller", "daemon_set", + "replica_set_controller", "clone_set", } var NoLanguageTag = []string{ diff --git a/server/querier/engine/clickhouse/tag/description.go b/server/querier/engine/clickhouse/tag/description.go index 8e0d577a054..093c528c0ea 100644 --- a/server/querier/engine/clickhouse/tag/description.go +++ b/server/querier/engine/clickhouse/tag/description.go @@ -20,6 +20,7 @@ import ( "context" "errors" "fmt" + "golang.org/x/exp/slices" "regexp" "strconv" "strings" @@ -701,6 +702,9 @@ func GetTagResourceValues(db, table, rawSql string) (*common.Result, []string, e "auto_service": AutoServiceMap, } for resourceKey, resourceType := range autoMap[tag] { + if slices.Contains(PodGroupTypeSlice, resourceKey) { + continue + } resourceId := resourceKey + "_id" resourceName := resourceKey + "_name" if resourceKey == "service" { From 96b678d0db953c62e3c0e4d1e54b82636659bd08 Mon Sep 17 00:00:00 2001 From: Jiping Yin Date: Mon, 16 Oct 2023 21:50:56 +0800 Subject: [PATCH 9/9] [eBPF] Support debian 10.6 - 4.19.0-25-amd64 (#4483) --- agent/src/ebpf/kernel/include/task_struct_utils.h | 13 +++++++------ agent/src/ebpf/user/common.c | 9 +++++++++ 2 files changed, 16 insertions(+), 6 deletions(-) diff --git a/agent/src/ebpf/kernel/include/task_struct_utils.h b/agent/src/ebpf/kernel/include/task_struct_utils.h index 82721c366c3..a411edb29c4 100644 --- a/agent/src/ebpf/kernel/include/task_struct_utils.h +++ b/agent/src/ebpf/kernel/include/task_struct_utils.h @@ -127,10 +127,9 @@ static __inline void *infer_and_get_socket_from_fd(int fd_num, // TAG: STRUCT_TASK_FILES_OFFSET // 成员 files 在 struct task_struct 中的偏移量 - // 0xd08 for kernel-devel-4.19.91-26.6.al7 +#ifdef LINUX_VER_5_2_PLUS // 0xa48 for 5.10.0-60.18.0.50.h322_1.hce2.aarch64 // 0xc60 for 5.10.0-106.18.0.68.oe2209.x86_64 -#ifdef LINUX_VER_5_2_PLUS int files_offset_array[] = { 0x790, 0xa80, 0xa88, 0xa90, 0xa98, 0xaa0, 0xaa8, 0xab0, 0xab8, 0xac0, 0xac8, 0xad0, 0xad8, 0xae0, 0xae8, 0xaf0, 0xaf8, 0xb00, 0xb08, 0xb10, @@ -140,12 +139,14 @@ static __inline void *infer_and_get_socket_from_fd(int fd_num, 0xcc8, 0xa48, 0xc60 }; #else + // 0xd08 for kernel-devel-4.19.91-26.6.al7 // 0x740 for 4.19.113-300.el7.x86_64 + // 0x6c0 for 4.19.0-25-amd64 #1 SMP Debian 4.19.289-2 (2023-08-08) int files_offset_array[] = { - 0x790, 0xa80, 0xa88, 0xa90, 0xa98, 0xaa0, 0xaa8, 0xab0, 0xab8, 0xac0, - 0xac8, 0xad0, 0xad8, 0xae0, 0xae8, 0xaf0, 0xaf8, 0xb00, 0xb08, 0xb10, - 0xb18, 0xb20, 0xb28, 0xb48, 0xb50, 0xb58, 0xb60, 0xb68, 0xb70, 0xb78, - 0xb80, 0xb88, 0xb90, 0xb98, 0xba0, 0x740, 0xbb8, 0xbc0, 0xbc8, 0xbd0, + 0x6c0, 0x790, 0xa80, 0xa88, 0xa90, 0xa98, 0xaa0, 0xaa8, 0xab0, 0xab8, + 0xac0, 0xac8, 0xad0, 0xad8, 0xae0, 0xae8, 0xaf0, 0xaf8, 0xb00, 0xb08, + 0xb10, 0xb18, 0xb20, 0xb28, 0xb48, 0xb50, 0xb58, 0xb60, 0xb68, 0xb70, + 0xb78, 0xb88, 0xb90, 0xb98, 0xba0, 0x740, 0xbb8, 0xbc0, 0xbc8, 0xbd0, 0xbd8, 0xbe0, 0xbe8, 0xbf0, 0xbf8, 0xc00, 0xc08, 0xc10, 0xcc8, 0xd08, }; #endif diff --git a/agent/src/ebpf/user/common.c b/agent/src/ebpf/user/common.c index ee7fbe266aa..678740bbf2a 100644 --- a/agent/src/ebpf/user/common.c +++ b/agent/src/ebpf/user/common.c @@ -523,6 +523,15 @@ int fetch_kernel_version(int *major, int *minor, int *patch) if (sscanf(sys_info.release, "%u.%u.%u", major, minor, patch) != 3) return ETR_INVAL; + // Get the real version of Debian + //#1 SMP Debian 4.19.289-2 (2023-08-08) + if (strstr(sys_info.version, "Debian")) { + int num; + if (sscanf(sys_info.version, "%*s %*s %*s %u.%u.%u-%u %*s", + major, minor, patch, &num) != 4) + return ETR_INVAL; + } + return ETR_OK; }