diff --git a/agent/resources/test/flow_generator/mongo/mongo.pcap b/agent/resources/test/flow_generator/mongo/mongo.pcap new file mode 100644 index 00000000000..d5be4b29127 Binary files /dev/null and b/agent/resources/test/flow_generator/mongo/mongo.pcap differ diff --git a/agent/resources/test/flow_generator/mongo/mongo.result b/agent/resources/test/flow_generator/mongo/mongo.result new file mode 100644 index 00000000000..68f6c87fc1d --- /dev/null +++ b/agent/resources/test/flow_generator/mongo/mongo.result @@ -0,0 +1,30 @@ +MongoDBInfo { msg_type: Request, is_tls: false, req_len: 0, resp_len: 0, request_id: 0, response_id: 0, op_code: 2004, op_code_name: "OP_QUERY", request: "{ \"isMaster\": 1, \"speculativeAuthenticate\": { \"saslStart\": 1, \"mechanism\": \"SCRAM-SHA-256\", \"payload\": Binary(0x0, biwsbj1hZG1pbixyPW5mdGVQaVovV1NuMUZrNjF5QWpFV29xbThaL0Y2MGc5), \"db\": \"admin\" }, \"saslSupportedMechs\": \"admin.admin\", \"client\": { \"application\": { \"name\": \"MongoDB Shell\" }, \"driver\": { \"name\": \"MongoDB Internal Client\", \"version\": \"4.4.25\" }, \"os\": { \"type\": \"Linux\", \"name\": \"CentOS Linux release 7.9.2009 (Core)\", \"architecture\": \"x86_64\", \"version\": \"Kernel 3.10.0-1160.80.1.el7.x86_64\" } } }", response: "", response_code: 0, exception: "", rrt: 0 } is_mongo: true +MongoDBInfo { msg_type: Response, is_tls: false, req_len: 0, resp_len: 0, request_id: 0, response_id: 60, op_code: 1, op_code_name: "OP_REPLY", request: "", response: "{ \"ismaster\": true, \"topologyVersion\": { \"processId\": ObjectId(\"652213ba46c335fa2820b0dc\"), \"counter\": 0 }, \"maxBsonObjectSize\": 16777216, \"maxMessageSizeBytes\": 48000000, \"maxWriteBatchSize\": 100000, \"localTime\": DateTime(\"2023-10-08 2:46:22.212 +00:00:00\"), \"logicalSessionTimeoutMinutes\": 30, \"connectionId\": 3, \"minWireVersion\": 0, \"maxWireVersion\": 9, \"readOnly\": false, \"saslSupportedMechs\": [\"SCRAM-SHA-1\", \"SCRAM-SHA-256\"], \"speculativeAuthenticate\": { \"conversationId\": 1, \"done\": false, \"payload\": Binary(0x0, cj1uZnRlUGlaL1dTbjFGazYxeUFqRVdvcW04Wi9GNjBnOWJMZUpWOExOL3JQUUVtWERkYjZMTjJVb1puZlRidnZnLHM9dEpLa0drajNQcUNpc1dsdkN0L0gyWDZDVm5NOG5GVlV4UG1vQkE9PSxpPTE1MDAw) }, \"ok\": 1 }", response_code: 0, exception: "", rrt: 1053 } is_mongo: false +MongoDBInfo { msg_type: Request, is_tls: false, req_len: 0, resp_len: 0, request_id: 1, response_id: 0, op_code: 2013, op_code_name: "OP_MSG", request: "{ \"saslContinue\": 1, \"payload\": Binary(0x0, Yz1iaXdzLHI9bmZ0ZVBpWi9XU24xRms2MXlBakVXb3FtOFovRjYwZzliTGVKVjhMTi9yUFFFbVhEZGI2TE4yVW9abmZUYnZ2ZyxwPWhBVFRhMkhFWEw1VkRMRWFVdVM4OG84cGNIZmpRK1ZRRklkcnFwQjR1cXM9), \"conversationId\": 1, \"$db\": \"admin\" }", response: "", response_code: 0, exception: "", rrt: 0 } is_mongo: true +MongoDBInfo { msg_type: Response, is_tls: false, req_len: 0, resp_len: 0, request_id: 1, response_id: 61, op_code: 2013, op_code_name: "OP_MSG", request: "", response: "{ \"conversationId\": 1, \"done\": false, \"payload\": Binary(0x0, dj1nT0psRVhyMTdXblV0UThqcDMvUlQ5bDhvRDZRN01GWDlGS3FUelRhdHpjPQ==), \"ok\": 1 }", response_code: 0, exception: "", rrt: 325 } is_mongo: false +MongoDBInfo { msg_type: Request, is_tls: false, req_len: 0, resp_len: 0, request_id: 2, response_id: 0, op_code: 2013, op_code_name: "OP_MSG", request: "{ \"saslContinue\": 1, \"payload\": Binary(0x0, ), \"conversationId\": 1, \"$db\": \"admin\" }", response: "", response_code: 0, exception: "", rrt: 0 } is_mongo: true +MongoDBInfo { msg_type: Response, is_tls: false, req_len: 0, resp_len: 0, request_id: 2, response_id: 62, op_code: 2013, op_code_name: "OP_MSG", request: "", response: "{ \"conversationId\": 1, \"done\": true, \"payload\": Binary(0x0, ), \"ok\": 1 }", response_code: 0, exception: "", rrt: 338 } is_mongo: false +MongoDBInfo { msg_type: Request, is_tls: false, req_len: 0, resp_len: 0, request_id: 3, response_id: 0, op_code: 2013, op_code_name: "OP_MSG", request: "{ \"whatsmyuri\": 1, \"$db\": \"admin\" }", response: "", response_code: 0, exception: "", rrt: 0 } is_mongo: true +MongoDBInfo { msg_type: Response, is_tls: false, req_len: 0, resp_len: 0, request_id: 3, response_id: 63, op_code: 2013, op_code_name: "OP_MSG", request: "", response: "{ \"you\": \"10.50.1.138:43250\", \"ok\": 1 }", response_code: 0, exception: "", rrt: 128 } is_mongo: false +MongoDBInfo { msg_type: Request, is_tls: false, req_len: 0, resp_len: 0, request_id: 4, response_id: 0, op_code: 2013, op_code_name: "OP_MSG", request: "{ \"buildinfo\": 1, \"$db\": \"admin\" }", response: "", response_code: 0, exception: "", rrt: 0 } is_mongo: true +MongoDBInfo { msg_type: Response, is_tls: false, req_len: 0, resp_len: 0, request_id: 4, response_id: 64, op_code: 2013, op_code_name: "OP_MSG", request: "", response: "{}", response_code: 0, exception: "", rrt: 196 } is_mongo: false +MongoDBInfo { msg_type: Request, is_tls: false, req_len: 0, resp_len: 0, request_id: 5, response_id: 0, op_code: 2013, op_code_name: "OP_MSG", request: "{ \"getLog\": \"startupWarnings\", \"lsid\": { \"id\": Binary(0x4, uU9EjcLlRI+tnzaqrJqWqQ==) }, \"$db\": \"admin\" }", response: "", response_code: 0, exception: "", rrt: 0 } is_mongo: true +MongoDBInfo { msg_type: Response, is_tls: false, req_len: 0, resp_len: 0, request_id: 5, response_id: 65, op_code: 2013, op_code_name: "OP_MSG", request: "", response: "{ \"totalLinesWritten\": 3, \"log\": [\"{\"t\":{\"$date\":\"2023-10-08T10:28:11.902+08:00\"},\"s\":\"W\", \"c\":\"CONTROL\", \"id\":22120, \"ctx\":\"initandlisten\",\"msg\":\"Access control is not enabled for the database. Read and write access to data and configuration is unrestricted\",\"tags\":[\"startupWarnings\"]}\", \"{\"t\":{\"$date\":\"2023-10-08T10:28:11.902+08:00\"},\"s\":\"W\", \"c\":\"CONTROL\", \"id\":22178, \"ctx\":\"initandlisten\",\"msg\":\"/sys/kernel/mm/transparent_hugepage/enabled is 'always'. We suggest setting it to 'never'\",\"tags\":[\"startupWarnings\"]}\", \"{\"t\":{\"$date\":\"2023-10-08T10:28:11.902+08:00\"},\"s\":\"W\", \"c\":\"CONTROL\", \"id\":22181, \"ctx\":\"initandlisten\",\"msg\":\"/sys/kernel/mm/transparent_hugepage/defrag is 'always'. We suggest setting it to 'never'\",\"tags\":[\"startupWarnings\"]}\"], \"ok\": 1 }", response_code: 0, exception: "", rrt: 182 } is_mongo: false +MongoDBInfo { msg_type: Request, is_tls: false, req_len: 0, resp_len: 0, request_id: 6, response_id: 0, op_code: 2013, op_code_name: "OP_MSG", request: "{ \"isMaster\": 1, \"forShell\": 1, \"lsid\": { \"id\": Binary(0x4, uU9EjcLlRI+tnzaqrJqWqQ==) }, \"$db\": \"test\" }", response: "", response_code: 0, exception: "", rrt: 0 } is_mongo: true +MongoDBInfo { msg_type: Response, is_tls: false, req_len: 0, resp_len: 0, request_id: 6, response_id: 66, op_code: 2013, op_code_name: "OP_MSG", request: "", response: "{ \"ismaster\": true, \"topologyVersion\": { \"processId\": ObjectId(\"652213ba46c335fa2820b0dc\"), \"counter\": 0 }, \"maxBsonObjectSize\": 16777216, \"maxMessageSizeBytes\": 48000000, \"maxWriteBatchSize\": 100000, \"localTime\": DateTime(\"2023-10-08 2:46:22.3 +00:00:00\"), \"logicalSessionTimeoutMinutes\": 30, \"connectionId\": 3, \"minWireVersion\": 0, \"maxWireVersion\": 9, \"readOnly\": false, \"ok\": 1 }", response_code: 0, exception: "", rrt: 174 } is_mongo: false +MongoDBInfo { msg_type: Request, is_tls: false, req_len: 0, resp_len: 0, request_id: 7, response_id: 0, op_code: 2013, op_code_name: "OP_MSG", request: "{ \"buildInfo\": 1, \"lsid\": { \"id\": Binary(0x4, uU9EjcLlRI+tnzaqrJqWqQ==) }, \"$db\": \"test\" }", response: "", response_code: 0, exception: "", rrt: 0 } is_mongo: true +MongoDBInfo { msg_type: Response, is_tls: false, req_len: 0, resp_len: 0, request_id: 7, response_id: 67, op_code: 2013, op_code_name: "OP_MSG", request: "", response: "{}", response_code: 0, exception: "", rrt: 139 } is_mongo: false +MongoDBInfo { msg_type: Request, is_tls: false, req_len: 0, resp_len: 0, request_id: 8, response_id: 0, op_code: 2013, op_code_name: "OP_MSG", request: "{ \"getCmdLineOpts\": 1, \"lsid\": { \"id\": Binary(0x4, uU9EjcLlRI+tnzaqrJqWqQ==) }, \"$db\": \"admin\" }", response: "", response_code: 0, exception: "", rrt: 0 } is_mongo: true +MongoDBInfo { msg_type: Response, is_tls: false, req_len: 0, resp_len: 0, request_id: 8, response_id: 68, op_code: 2013, op_code_name: "OP_MSG", request: "", response: "{ \"argv\": [\"/usr/bin/mongod\", \"-f\", \"/etc/mongod.conf\"], \"parsed\": { \"config\": \"/etc/mongod.conf\", \"net\": { \"bindIp\": \"0.0.0.0\", \"port\": 27017 }, \"processManagement\": { \"timeZoneInfo\": \"/usr/share/zoneinfo\" }, \"storage\": { \"dbPath\": \"/var/lib/mongo\", \"journal\": { \"enabled\": true } }, \"systemLog\": { \"destination\": \"file\", \"logAppend\": true, \"path\": \"/var/log/mongodb/mongod.log\" } }, \"ok\": 1 }", response_code: 0, exception: "", rrt: 135 } is_mongo: false +MongoDBInfo { msg_type: Request, is_tls: false, req_len: 0, resp_len: 0, request_id: 9, response_id: 0, op_code: 2013, op_code_name: "OP_MSG", request: "{ \"buildInfo\": 1, \"$db\": \"test\" }", response: "", response_code: 0, exception: "", rrt: 0 } is_mongo: true +MongoDBInfo { msg_type: Response, is_tls: false, req_len: 0, resp_len: 0, request_id: 9, response_id: 69, op_code: 2013, op_code_name: "OP_MSG", request: "", response: "{}", response_code: 0, exception: "", rrt: 207 } is_mongo: false +MongoDBInfo { msg_type: Request, is_tls: false, req_len: 0, resp_len: 0, request_id: 10, response_id: 0, op_code: 2013, op_code_name: "OP_MSG", request: "{ \"isMaster\": 1, \"forShell\": 1, \"$db\": \"test\" }", response: "", response_code: 0, exception: "", rrt: 0 } is_mongo: true +MongoDBInfo { msg_type: Response, is_tls: false, req_len: 0, resp_len: 0, request_id: 10, response_id: 70, op_code: 2013, op_code_name: "OP_MSG", request: "", response: "{ \"ismaster\": true, \"topologyVersion\": { \"processId\": ObjectId(\"652213ba46c335fa2820b0dc\"), \"counter\": 0 }, \"maxBsonObjectSize\": 16777216, \"maxMessageSizeBytes\": 48000000, \"maxWriteBatchSize\": 100000, \"localTime\": DateTime(\"2023-10-08 2:46:22.306 +00:00:00\"), \"logicalSessionTimeoutMinutes\": 30, \"connectionId\": 3, \"minWireVersion\": 0, \"maxWireVersion\": 9, \"readOnly\": false, \"ok\": 1 }", response_code: 0, exception: "", rrt: 143 } is_mongo: false +MongoDBInfo { msg_type: Request, is_tls: false, req_len: 0, resp_len: 0, request_id: 11, response_id: 0, op_code: 2013, op_code_name: "OP_MSG", request: "{ \"replSetGetStatus\": 1, \"forShell\": 1, \"$db\": \"admin\" }", response: "", response_code: 0, exception: "", rrt: 0 } is_mongo: true +MongoDBInfo { msg_type: Response, is_tls: false, req_len: 0, resp_len: 0, request_id: 11, response_id: 71, op_code: 2013, op_code_name: "OP_MSG", request: "", response: "", response_code: 76, exception: "not running with --replSet", rrt: 571 } is_mongo: false +MongoDBInfo { msg_type: Request, is_tls: false, req_len: 0, resp_len: 0, request_id: 12, response_id: 0, op_code: 2013, op_code_name: "OP_MSG", request: "{ \"getLog\": \"startupWarnings\", \"lsid\": { \"id\": Binary(0x4, uU9EjcLlRI+tnzaqrJqWqQ==) }, \"$db\": \"admin\" }", response: "", response_code: 0, exception: "", rrt: 0 } is_mongo: true +MongoDBInfo { msg_type: Response, is_tls: false, req_len: 0, resp_len: 0, request_id: 12, response_id: 72, op_code: 2013, op_code_name: "OP_MSG", request: "", response: "{ \"totalLinesWritten\": 3, \"log\": [\"{\"t\":{\"$date\":\"2023-10-08T10:28:11.902+08:00\"},\"s\":\"W\", \"c\":\"CONTROL\", \"id\":22120, \"ctx\":\"initandlisten\",\"msg\":\"Access control is not enabled for the database. Read and write access to data and configuration is unrestricted\",\"tags\":[\"startupWarnings\"]}\", \"{\"t\":{\"$date\":\"2023-10-08T10:28:11.902+08:00\"},\"s\":\"W\", \"c\":\"CONTROL\", \"id\":22178, \"ctx\":\"initandlisten\",\"msg\":\"/sys/kernel/mm/transparent_hugepage/enabled is 'always'. We suggest setting it to 'never'\",\"tags\":[\"startupWarnings\"]}\", \"{\"t\":{\"$date\":\"2023-10-08T10:28:11.902+08:00\"},\"s\":\"W\", \"c\":\"CONTROL\", \"id\":22181, \"ctx\":\"initandlisten\",\"msg\":\"/sys/kernel/mm/transparent_hugepage/defrag is 'always'. We suggest setting it to 'never'\",\"tags\":[\"startupWarnings\"]}\"], \"ok\": 1 }", response_code: 0, exception: "", rrt: 334 } is_mongo: false +MongoDBInfo { msg_type: Request, is_tls: false, req_len: 0, resp_len: 0, request_id: 13, response_id: 0, op_code: 2013, op_code_name: "OP_MSG", request: "{ \"isMaster\": 1, \"forShell\": 1, \"$db\": \"test\" }", response: "", response_code: 0, exception: "", rrt: 0 } is_mongo: true +MongoDBInfo { msg_type: Response, is_tls: false, req_len: 0, resp_len: 0, request_id: 13, response_id: 73, op_code: 2013, op_code_name: "OP_MSG", request: "", response: "{ \"ismaster\": true, \"topologyVersion\": { \"processId\": ObjectId(\"652213ba46c335fa2820b0dc\"), \"counter\": 0 }, \"maxBsonObjectSize\": 16777216, \"maxMessageSizeBytes\": 48000000, \"maxWriteBatchSize\": 100000, \"localTime\": DateTime(\"2023-10-08 2:46:26.793 +00:00:00\"), \"logicalSessionTimeoutMinutes\": 30, \"connectionId\": 3, \"minWireVersion\": 0, \"maxWireVersion\": 9, \"readOnly\": false, \"ok\": 1 }", response_code: 0, exception: "", rrt: 189 } is_mongo: false +MongoDBInfo { msg_type: Request, is_tls: false, req_len: 0, resp_len: 0, request_id: 14, response_id: 0, op_code: 2013, op_code_name: "OP_MSG", request: "{ \"endSessions\": [{ \"id\": Binary(0x4, uU9EjcLlRI+tnzaqrJqWqQ==) }], \"$db\": \"admin\" }", response: "", response_code: 0, exception: "", rrt: 0 } is_mongo: true +MongoDBInfo { msg_type: Response, is_tls: false, req_len: 0, resp_len: 0, request_id: 14, response_id: 74, op_code: 2013, op_code_name: "OP_MSG", request: "", response: "{ \"ok\": 1 }", response_code: 0, exception: "", rrt: 786 } is_mongo: false diff --git a/agent/src/collector/l7_quadruple_generator.rs b/agent/src/collector/l7_quadruple_generator.rs index c1705224953..afb258db44a 100644 --- a/agent/src/collector/l7_quadruple_generator.rs +++ b/agent/src/collector/l7_quadruple_generator.rs @@ -270,7 +270,7 @@ impl SubQuadGen { flow: flow.clone(), l7_protocol: meter.l7_protocol, endpoint_hash, - endpoint: meter.endpoint.clone(), + endpoint: meter.endpoint, is_active_host0, is_active_host1, time_in_second: tagged_flow.flow.flow_stat_time, @@ -549,8 +549,8 @@ impl L7QuadrupleGenerator { ) { let mut second_inject = false; let mut minute_inject = false; - if let Some(s) = self.second_quad_gen.as_mut() { - if config.vtap_flow_1s_enabled { + if config.vtap_flow_1s_enabled { + if let Some(s) = self.second_quad_gen.as_mut() { second_inject = s.move_window(time_in_second); } } @@ -565,7 +565,11 @@ impl L7QuadrupleGenerator { let endpoint_hash = hash_endpoint(&l7_stats.endpoint) as u32; - let app_meter = Self::generate_app_meter(&l7_stats, config.l7_metrics_enabled); + let app_meter = if config.l7_metrics_enabled { + Self::generate_app_meter(&l7_stats) + } else { + AppMeter::default() + }; if second_inject { self.second_quad_gen.as_mut().unwrap().inject_app_meter( @@ -588,12 +592,7 @@ impl L7QuadrupleGenerator { } } - fn generate_app_meter(l7_stats: &L7Stats, l7_metrics_enabled: bool) -> AppMeter { - let mut app_meter = AppMeter::default(); - - if !l7_metrics_enabled { - return app_meter; - } + fn generate_app_meter(l7_stats: &L7Stats) -> AppMeter { let (close_type, direction_score) = if let Some(tagged_flow) = &l7_stats.flow { ( tagged_flow.flow.close_type, @@ -609,7 +608,7 @@ impl L7QuadrupleGenerator { SignalSource::Packet | SignalSource::EBPF | SignalSource::XFlow, ) => { // only L7Protocol is Unknown or Other and SignalSource != Otel will execute the following logic - app_meter = AppMeter { + AppMeter { traffic: AppTraffic { request: (close_type != CloseType::ForcedReport) as u32, response: (close_type != CloseType::ForcedReport) as u32, @@ -618,28 +617,24 @@ impl L7QuadrupleGenerator { ..Default::default() } } - (_, _) => { - app_meter = AppMeter { - traffic: AppTraffic { - request: stats.request_count, - response: stats.response_count, - direction_score: direction_score, - }, - latency: AppLatency { - rrt_max: stats.rrt_max, - rrt_sum: stats.rrt_sum as u64, - rrt_count: stats.rrt_count, - }, - anomaly: AppAnomaly { - client_error: stats.err_client_count, - server_error: stats.err_server_count, - timeout: stats.err_timeout, - }, - }; - } + (_, _) => AppMeter { + traffic: AppTraffic { + request: stats.request_count, + response: stats.response_count, + direction_score: direction_score, + }, + latency: AppLatency { + rrt_max: stats.rrt_max, + rrt_sum: stats.rrt_sum as u64, + rrt_count: stats.rrt_count, + }, + anomaly: AppAnomaly { + client_error: stats.err_client_count, + server_error: stats.err_server_count, + timeout: stats.err_timeout, + }, + }, } - - app_meter } fn handler_routine(&mut self) { diff --git a/agent/src/flow_generator/flow_map.rs b/agent/src/flow_generator/flow_map.rs index 444ae0e0030..6d3a1be6669 100644 --- a/agent/src/flow_generator/flow_map.rs +++ b/agent/src/flow_generator/flow_map.rs @@ -48,7 +48,6 @@ use super::{ use crate::{ common::{ - ebpf::EbpfType, endpoint::{ EndpointData, EndpointDataPov, EndpointInfo, EPC_FROM_DEEPFLOW, EPC_FROM_INTERNET, }, @@ -1009,12 +1008,6 @@ impl FlowMap { fn init_flow(&mut self, config: &Config, meta_packet: &mut MetaPacket) -> Box { let flow_config = config.flow; - match meta_packet.ebpf_type { - EbpfType::GoHttp2Uprobe | EbpfType::GoHttp2UprobeData => {} - _ => { - meta_packet.lookup_key.direction = PacketDirection::ClientToServer; - } - } let mut tagged_flow = TaggedFlow::default(); let lookup_key = &meta_packet.lookup_key; @@ -1574,6 +1567,9 @@ impl FlowMap { node.flow_state = FlowState::Established; // opening timeout node.timeout = config.flow.flow_timeout.opening; + if let Some(meta_flow_log) = node.meta_flow_log.as_mut() { + let _ = meta_flow_log.parse_l3(meta_packet); + } node } diff --git a/agent/src/flow_generator/perf/icmp.rs b/agent/src/flow_generator/perf/icmp.rs index 410060f1a31..de3e684f757 100644 --- a/agent/src/flow_generator/perf/icmp.rs +++ b/agent/src/flow_generator/perf/icmp.rs @@ -14,7 +14,7 @@ * limitations under the License. */ -use std::cmp::max; +use std::{cmp::max, collections::VecDeque}; use pnet::packet::{ icmp::{IcmpType, IcmpTypes}, @@ -45,8 +45,8 @@ pub struct IcmpPerf { srt_max: Timestamp, srt_sum: Timestamp, srt_count: u32, - last_requests: Vec, - last_replies: Vec, + last_requests: VecDeque, + last_replies: VecDeque, data_update_flag: bool, } @@ -54,6 +54,22 @@ impl IcmpPerf { pub fn new() -> Self { IcmpPerf::default() } + + fn set_srt(&mut self, srt: Timestamp) { + if srt <= ART_MAX { + self.srt_max = max(self.srt_max, srt); + self.srt_sum += srt; + self.srt_count += 1; + self.data_update_flag = true; + } + } + + fn reset(&mut self) { + self.srt_max = Timestamp::default(); + self.srt_sum = Timestamp::default(); + self.srt_count = 0; + self.data_update_flag = false; + } } impl L4FlowPerf for IcmpPerf { @@ -89,19 +105,14 @@ impl L4FlowPerf for IcmpPerf { { if pkt_timestamp <= self.last_replies[i].timestamp { let srt = Timestamp::from(self.last_replies[i].timestamp - pkt_timestamp); - if srt <= ART_MAX { - self.srt_max = max(self.srt_max, srt); - self.srt_sum += srt; - self.srt_count += 1; - self.data_update_flag = true; - } + self.set_srt(srt); } self.last_replies.remove(i); } else { if self.last_requests.len() >= MAX_CACHE_COUNT { - self.last_requests.remove(0); + let _ = self.last_requests.pop_front(); } - self.last_requests.push(LastIcmp { + self.last_requests.push_back(LastIcmp { timestamp: pkt_timestamp, id_and_seq: icmp_data.echo_id_seq, }); @@ -114,19 +125,14 @@ impl L4FlowPerf for IcmpPerf { { if pkt_timestamp >= self.last_requests[i].timestamp { let srt = Timestamp::from(pkt_timestamp - self.last_requests[i].timestamp); - if srt <= ART_MAX { - self.srt_max = max(self.srt_max, srt); - self.srt_sum += srt; - self.srt_count += 1; - self.data_update_flag = true; - } + self.set_srt(srt); } self.last_requests.remove(i); } else { if self.last_replies.len() >= MAX_CACHE_COUNT { - self.last_replies.remove(0); + let _ = self.last_replies.pop_front(); } - self.last_replies.push(LastIcmp { + self.last_replies.push_back(LastIcmp { timestamp: pkt_timestamp, id_and_seq: icmp_data.echo_id_seq, }); @@ -140,12 +146,35 @@ impl L4FlowPerf for IcmpPerf { } fn copy_and_reset_data(&mut self, _: bool) -> FlowPerfStats { + for request_index in (0..self.last_requests.len()).rev() { + let request = &self.last_requests[request_index]; + if let Some(reply_index) = self + .last_replies + .iter() + .position(|reply| request.id_and_seq == reply.id_and_seq) + { + let reply = &self.last_replies[reply_index]; + if request.timestamp <= reply.timestamp { + let srt = Timestamp::from(reply.timestamp - request.timestamp); + if srt <= ART_MAX { + self.srt_max = max(self.srt_max, srt); + self.srt_sum += srt; + self.srt_count += 1; + self.data_update_flag = true; + } + } + + self.last_requests.remove(request_index); + self.last_replies.remove(reply_index); + } + } + let mut stats = FlowPerfStats::default(); stats.l4_protocol = L4Protocol::Icmp; stats.tcp.srt_max = (self.srt_max.as_nanos() / Timestamp::from_micros(1).as_nanos()) as u32; stats.tcp.srt_sum = (self.srt_sum.as_nanos() / Timestamp::from_micros(1).as_nanos()) as u32; stats.tcp.srt_count = self.srt_count; - *self = IcmpPerf::default(); + self.reset(); stats } diff --git a/agent/src/flow_generator/protocol_logs/sql/mongo.rs b/agent/src/flow_generator/protocol_logs/sql/mongo.rs index 0c6a76da795..16f035a3c65 100644 --- a/agent/src/flow_generator/protocol_logs/sql/mongo.rs +++ b/agent/src/flow_generator/protocol_logs/sql/mongo.rs @@ -220,11 +220,14 @@ const _OP_COMMAND: u32 = 2010; const _OP_COMMANDREPLY: u32 = 2011; const _OP_COMPRESSED: u32 = 2012; const _OP_MSG: u32 = 2013; -const _UNKNOWN: &str = "Unknown"; +const _UNKNOWN: &str = ""; const _HEADER_SIZE: usize = 16; const _EXCEPTION_OFFSET: usize = 20; +const _COLLECTION_NAME_OFFSET: usize = 20; +const _QUERY_DOC_OFFSET: usize = _COLLECTION_NAME_OFFSET + 8; // 8 is sizeof(Number to skip + Number to Reture) +const _MSG_DOC_SECTION_OFFSET: usize = _HEADER_SIZE + 4; // 4 is sizeof(Message Flags) impl MongoDBLog { // TODO: tracing @@ -264,16 +267,25 @@ impl MongoDBLog { // command decode match info.op_code { - _OP_MSG if payload.len() > _HEADER_SIZE => { + _OP_MSG if payload.len() > _MSG_DOC_SECTION_OFFSET => { // OP_MSG let mut msg_body = MongoOpMsg::default(); - msg_body.decode(&payload[_HEADER_SIZE..])?; + // TODO: Message Flags + msg_body.decode(&payload[_MSG_DOC_SECTION_OFFSET..])?; match info.msg_type { LogMessageType::Response => { - info.response = msg_body.sections.doc.to_string(); - info.exception = msg_body.sections.c_string.unwrap_or(_UNKNOWN.to_string()); - info.response_code = - msg_body.sections.doc.get_f64("code").unwrap_or(0.0) as i32; + // The data structure of doc is Bson, which is a normal response when there is no errmsg in it + if msg_body.sections.doc.get_str("errmsg").is_err() { + info.response = msg_body.sections.doc.to_string(); + } else { + info.exception = + msg_body.sections.doc.get_str("errmsg").unwrap().to_string(); + } + if info.exception.len() == 0 { + info.exception = + msg_body.sections.c_string.unwrap_or(_UNKNOWN.to_string()); + } + info.response_code = msg_body.sections.doc.get_i32("code").unwrap_or(0); if info.response_code > 0 { self.perf_stats.as_mut().map(|p| p.inc_resp_err()); } @@ -315,13 +327,16 @@ impl MongoDBLog { } _OP_QUERY if payload.len() > 28 => { // "OP_QUERY" - info.exception = CStr::from_bytes_until_nul(&payload[..20]) - .map_err(|_| Error::L7ProtocolUnknown)? - .to_string_lossy() - .into_owned(); + let collection_name = + CStr::from_bytes_until_nul(&payload[_COLLECTION_NAME_OFFSET..]) + .map_err(|_| Error::L7ProtocolUnknown)? + .to_string_lossy() + .into_owned(); - let query = Document::from_reader(&payload[28 + info.exception.len() + 1..]) - .unwrap_or(Document::default()); + let query = Document::from_reader( + &payload[_QUERY_DOC_OFFSET + collection_name.len() + 1..], + ) + .unwrap_or(Document::default()); info.request = query.to_string(); } _OP_GET_MORE | _OP_DELETE if payload.len() > 20 => { @@ -415,18 +430,24 @@ pub struct MongoOpMsg { } impl MongoOpMsg { + const _KIND_OFFSET: usize = 0; + const _KIND_LEN: usize = 1; + const _DOC_LENGTH_OFFSET: usize = Self::_KIND_OFFSET + Self::_KIND_LEN; + const _DOC_LENGTH_LEN: usize = 4; + fn decode(&mut self, payload: &[u8]) -> Result { - if payload.len() < 9 { + if payload.len() < Self::_DOC_LENGTH_OFFSET + Self::_DOC_LENGTH_LEN { return Ok(false); } - // todo: decode flag let mut sections = Sections::default(); //sections.kind = payload[4]; - let section_len = bytes::read_u32_le(&payload[5..9]); - if payload.len() < 4 + section_len as usize { + let section_len = bytes::read_u32_le( + &payload[Self::_DOC_LENGTH_OFFSET..Self::_DOC_LENGTH_OFFSET + Self::_DOC_LENGTH_LEN], + ); + if payload.len() < Self::_DOC_LENGTH_LEN + section_len as usize { return Ok(false); } - let _ = sections.decode(&payload[4..]); + let _ = sections.decode(&payload); self.sections = sections; // todo: decode checksum Ok(true) @@ -455,8 +476,9 @@ impl Sections { 0 => { // Body self.kind_name = "BODY".to_string(); - let lenght = bytes::read_u32_le(&payload[1..5]); - if lenght != payload.len() as u32 - 1 { + let length = bytes::read_u32_le(&payload[1..5]); + // TODO: When ChecksumPresent is 1, there will be checksum in the payload + if length != payload.len() as u32 - 1 && length != payload.len() as u32 - 5 { return Ok(false); } self.doc = Document::from_reader(&payload[1..]).unwrap_or(Document::default()); @@ -567,3 +589,85 @@ pub struct MongoOpUpdate { zero: u32, } */ + +#[cfg(test)] +mod tests { + use std::path::Path; + use std::rc::Rc; + use std::{cell::RefCell, fs}; + + use super::*; + + use crate::{ + common::{flow::PacketDirection, l7_protocol_log::L7PerfCache, MetaPacket}, + flow_generator::L7_RRT_CACHE_CAPACITY, + utils::test::Capture, + }; + + const FILE_DIR: &str = "resources/test/flow_generator/mongo"; + + fn run(name: &str) -> String { + let capture = Capture::load_pcap(Path::new(FILE_DIR).join(name), None); + let log_cache = Rc::new(RefCell::new(L7PerfCache::new(L7_RRT_CACHE_CAPACITY))); + let mut packets = capture.as_meta_packets(); + if packets.is_empty() { + return "".to_string(); + } + + let mut output: String = String::new(); + let first_dst_port = packets[0].lookup_key.dst_port; + for packet in packets.iter_mut() { + packet.lookup_key.direction = if packet.lookup_key.dst_port == first_dst_port { + PacketDirection::ClientToServer + } else { + PacketDirection::ServerToClient + }; + let payload = match packet.get_l4_payload() { + Some(p) => p, + None => continue, + }; + + let mut mongo = MongoDBLog::default(); + let param = &ParseParam::new(packet as &MetaPacket, log_cache.clone(), true, true); + + let is_mongo = mongo.check_payload(payload, param); + let info = mongo.parse_payload(payload, param); + if let Ok(info) = info { + match info.unwrap_single() { + L7ProtocolInfo::MongoDBInfo(i) => { + output.push_str(&format!("{:?} is_mongo: {}\r\n", i, is_mongo)); + } + _ => unreachable!(), + } + } else { + output.push_str(&format!( + "{:?} is_mongo: {}\r\n", + MongoDBInfo::default(), + is_mongo + )); + } + } + output + } + + #[test] + fn check() { + let files = vec![("mongo.pcap", "mongo.result")]; + + for item in files.iter() { + let expected = fs::read_to_string(&Path::new(FILE_DIR).join(item.1)).unwrap(); + let output = run(item.0); + + if output != expected { + let output_path = Path::new("actual.txt"); + fs::write(&output_path, &output).unwrap(); + assert!( + output == expected, + "output different from expected {}, written to {:?}", + item.1, + output_path + ); + } + } + } +} diff --git a/server/controller/db/mysql/platform_rsc_model.go b/server/controller/db/mysql/platform_rsc_model.go index 076d8187532..e35b7f02b0c 100644 --- a/server/controller/db/mysql/platform_rsc_model.go +++ b/server/controller/db/mysql/platform_rsc_model.go @@ -68,8 +68,8 @@ type SoftDeleteBase struct { } type Process struct { - Base `gorm:"embedded"` - SoftDeleteBase `gorm:"embedded"` + Base `gorm:"embedded" mapstructure:",squash"` + SoftDeleteBase `gorm:"embedded" mapstructure:",squash"` Name string `gorm:"column:name;type:varchar(256);default:''" mapstructure:"NAME"` VTapID uint32 `gorm:"column:vtap_id;type:int;not null;default:0" mapstructure:"VTAP_ID"` PID uint64 `gorm:"column:pid;type:int;not null;default:0" mapstructure:"PID"` @@ -85,8 +85,8 @@ type Process struct { } type Domain struct { - Base `gorm:"embedded"` - OperatedTime `gorm:"embedded"` + Base `gorm:"embedded" mapstructure:",squash"` + OperatedTime `gorm:"embedded" mapstructure:",squash"` SyncedAt *time.Time `gorm:"column:synced_at" mapstructure:"SYNCED_AT"` Name string `gorm:"column:name;type:varchar(64)" mapstructure:"NAME"` IconID int `gorm:"column:icon_id;type:int" mapstructure:"ICON_ID"` @@ -103,8 +103,8 @@ type Domain struct { // TODO 最终可以与cloud模块命名统一,Domain -> DomainLcuuid type SubDomain struct { - Base `gorm:"embedded"` - OperatedTime `gorm:"embedded"` + Base `gorm:"embedded" mapstructure:",squash"` + OperatedTime `gorm:"embedded" mapstructure:",squash"` SyncedAt *time.Time `gorm:"column:synced_at" mapstructure:"SYNCED_AT"` Domain string `gorm:"column:domain;type:char(64);default:''" mapstructure:"DOMAIN"` Name string `gorm:"column:name;type:varchar(64);default:''" mapstructure:"NAME"` @@ -118,8 +118,8 @@ type SubDomain struct { } type Region struct { - Base `gorm:"embedded"` - SoftDeleteBase `gorm:"embedded"` + Base `gorm:"embedded" mapstructure:",squash"` + SoftDeleteBase `gorm:"embedded" mapstructure:",squash"` Name string `gorm:"column:name;type:varchar(64);default:''" mapstructure:"NAME"` CreateMethod int `gorm:"column:create_method;type:int;default:0" mapstructure:"CREATE_METHOD"` // 0.learning 1.user_defined Label string `gorm:"column:label;type:varchar(64);default:''" mapstructure:"LABEL"` @@ -128,8 +128,8 @@ type Region struct { } type AZ struct { - Base `gorm:"embedded"` - SoftDeleteBase `gorm:"embedded"` + Base `gorm:"embedded" mapstructure:",squash"` + SoftDeleteBase `gorm:"embedded" mapstructure:",squash"` Name string `gorm:"column:name;type:varchar(64);default:''" mapstructure:"NAME"` CreateMethod int `gorm:"column:create_method;type:int;default:0" mapstructure:"CREATE_METHOD"` // 0.learning 1.user_defined Label string `gorm:"column:label;type:varchar(64);default:''" mapstructure:"LABEL"` @@ -142,8 +142,8 @@ func (AZ) TableName() string { } type Host struct { - Base `gorm:"embedded"` - SoftDeleteBase `gorm:"embedded"` + Base `gorm:"embedded" mapstructure:",squash"` + SoftDeleteBase `gorm:"embedded" mapstructure:",squash"` Type int `gorm:"column:type;type:int" mapstructure:"TYPE"` // 1.Server 3.Gateway 4.DFI State int `gorm:"column:state;type:int" mapstructure:"STATE"` // 0.Temp 1.Creating 2.Complete 3.Modifying 4.Exception Name string `gorm:"column:name;type:varchar(256);default:''" mapstructure:"NAME"` @@ -168,8 +168,8 @@ func (Host) TableName() string { } type VM struct { - Base `gorm:"embedded"` - SoftDeleteBase `gorm:"embedded"` + Base `gorm:"embedded" mapstructure:",squash"` + SoftDeleteBase `gorm:"embedded" mapstructure:",squash"` State int `gorm:"index:state_server_index;column:state;type:int;not null" mapstructure:"STATE"` // 0.Temp 1.Creating 2.Created 3.To run 4.Running 5.To suspend 6.Suspended 7.To resume 8. To stop 9.Stopped 10.Modifing 11.Exception 12.Destroying Name string `gorm:"column:name;type:varchar(256);default:''" mapstructure:"NAME"` Alias string `gorm:"column:alias;type:char(64);default:''" mapstructure:"ALIAS"` @@ -190,7 +190,7 @@ func (VM) TableName() string { } type VMPodNodeConnection struct { - Base `gorm:"embedded"` + Base `gorm:"embedded" mapstructure:",squash"` VMID int `gorm:"column:vm_id;type:int;default:null" mapstructure:"VM_ID"` PodNodeID int `gorm:"column:pod_node_id;type:int;default:null" mapstructure:"POD_NODE_ID"` Domain string `gorm:"column:domain;type:char(64);not null" mapstructure:"DOMAIN"` @@ -202,7 +202,7 @@ func (VMPodNodeConnection) TableName() string { } type VMSecurityGroup struct { - Base `gorm:"embedded"` + Base `gorm:"embedded" mapstructure:",squash"` SecurityGroupID int `gorm:"column:sg_id;type:int;not null" mapstructure:"SG_ID"` VMID int `gorm:"column:vm_id;type:int;not null" mapstructure:"VM_ID"` Priority int `gorm:"column:priority;type:int;not null" mapstructure:"PRIORITY"` @@ -213,7 +213,7 @@ func (VMSecurityGroup) TableName() string { } type Contact struct { - Base `gorm:"embedded"` + Base `gorm:"embedded" mapstructure:",squash"` Name string `gorm:"column:name;type:varchar(256);default:''" mapstructure:"NAME"` CreateMethod int `gorm:"column:create_method;type:int;default:0" mapstructure:"CREATE_METHOD"` // 0.learning 1.user_defined Mobile string `gorm:"column:mobile;type:char(13);default:''" mapstructure:"MOBILE"` @@ -229,7 +229,7 @@ type Contact struct { } type VPCContact struct { - Base `gorm:"embedded"` + Base `gorm:"embedded" mapstructure:",squash"` CreateMethod int `gorm:"column:create_method;type:int;default:0" mapstructure:"CREATE_METHOD"` // 0.learning 1.user_defined VPCID int `gorm:"column:epc_id;type:int;default:0" mapstructure:"VPC_ID"` ContactID int `gorm:"column:contact_id;type:int;default:0" mapstructure:"CONTACT_ID"` @@ -240,8 +240,8 @@ func (VPCContact) TableName() string { } type VPC struct { - Base `gorm:"embedded"` - SoftDeleteBase `gorm:"embedded"` + Base `gorm:"embedded" mapstructure:",squash"` + SoftDeleteBase `gorm:"embedded" mapstructure:",squash"` Name string `gorm:"column:name;type:varchar(256);default:''" mapstructure:"NAME"` CreateMethod int `gorm:"column:create_method;type:int;default:0" mapstructure:"CREATE_METHOD"` // 0.learning 1.user_defined Label string `gorm:"column:label;type:varchar(64);default:''" mapstructure:"LABEL"` @@ -260,8 +260,8 @@ func (VPC) TableName() string { } type Network struct { - Base `gorm:"embedded"` - SoftDeleteBase `gorm:"embedded"` + Base `gorm:"embedded" mapstructure:",squash"` + SoftDeleteBase `gorm:"embedded" mapstructure:",squash"` State int `gorm:"column:state;type:int;not null" mapstructure:"STATE"` // 0.Temp 1.Creating 2.Created 3.Exception 4.Modifing 5.Destroying 6.Destroyed NetType int `gorm:"column:net_type;type:int;default:4" mapstructure:"NET_TYPE"` // 1.CTRL 2.SERVICE 3.WAN 4.LAN Name string `gorm:"column:name;type:varchar(256);not null" mapstructure:"NAME"` @@ -287,7 +287,7 @@ func (Network) TableName() string { } type Subnet struct { - Base `gorm:"embedded"` + Base `gorm:"embedded" mapstructure:",squash"` Prefix string `gorm:"column:prefix;type:char(64);default:''" mapstructure:"PREFIX"` Netmask string `gorm:"column:netmask;type:char(64);default:''" mapstructure:"NETMASK"` NetworkID int `gorm:"column:vl2id;type:int;default:null" mapstructure:"VL2ID"` @@ -302,8 +302,8 @@ func (Subnet) TableName() string { } type VRouter struct { - Base `gorm:"embedded"` - SoftDeleteBase `gorm:"embedded"` + Base `gorm:"embedded" mapstructure:",squash"` + SoftDeleteBase `gorm:"embedded" mapstructure:",squash"` State int `gorm:"index:state_server_index;column:state;type:int;not null" mapstructure:"STATE"` // 0.Temp 1.Creating 2.Created 3.Exception 4.Modifing 5.Destroying 6.To run 7.Running 8.To stop 9.Stopped Name string `gorm:"column:name;type:varchar(256);default:''" mapstructure:"NAME"` Label string `gorm:"column:label;type:char(64);default:''" mapstructure:"LABEL"` @@ -320,7 +320,7 @@ func (VRouter) TableName() string { } type RoutingTable struct { - Base `gorm:"embedded"` + Base `gorm:"embedded" mapstructure:",squash"` VRouterID int `gorm:"column:vnet_id;type:int;default:null" mapstructure:"VNET_ID"` Destination string `gorm:"column:destination;type:text;default:''" mapstructure:"DESTINATION"` NexthopType string `gorm:"column:nexthop_type;type:text;default:''" mapstructure:"NEXTHOP_TYPE"` @@ -328,8 +328,8 @@ type RoutingTable struct { } type DHCPPort struct { - Base `gorm:"embedded"` - SoftDeleteBase `gorm:"embedded"` + Base `gorm:"embedded" mapstructure:",squash"` + SoftDeleteBase `gorm:"embedded" mapstructure:",squash"` Name string `gorm:"column:name;type:varchar(256);default:''" mapstructure:"NAME"` Domain string `gorm:"column:domain;type:char(64);not null" mapstructure:"DOMAIN"` Region string `gorm:"column:region;type:char(64);default:''" mapstructure:"REGION"` @@ -342,7 +342,7 @@ func (DHCPPort) TableName() string { } type VInterface struct { - Base `gorm:"embedded"` + Base `gorm:"embedded" mapstructure:",squash"` Name string `gorm:"column:name;type:char(64);default:''" mapstructure:"NAME"` Index int `gorm:"column:ifindex;type:int;not null" mapstructure:"IFINDEX"` State int `gorm:"column:state;type:int;not null" mapstructure:"STATE"` // 1. Attached 2.Detached 3.Exception @@ -369,7 +369,7 @@ func (VInterface) TableName() string { } type LANIP struct { // TODO 添加region字段 - Base `gorm:"embedded"` + Base `gorm:"embedded" mapstructure:",squash"` IP string `gorm:"column:ip;type:char(64);default:''" mapstructure:"IP"` Netmask string `gorm:"column:netmask;type:char(64);default:''" mapstructure:"NETMASK"` Gateway string `gorm:"column:gateway;type:char(64);default:''" mapstructure:"GATEWAY"` @@ -390,7 +390,7 @@ func (LANIP) TableName() string { } type WANIP struct { - Base `gorm:"embedded"` + Base `gorm:"embedded" mapstructure:",squash"` IP string `gorm:"column:ip;type:char(64);default:''" mapstructure:"IP"` Alias string `gorm:"column:alias;type:char(64);default:''" mapstructure:"ALIAS"` Netmask int `gorm:"column:netmask;type:int;default:null" mapstructure:"NETMASK"` @@ -411,7 +411,7 @@ func (WANIP) TableName() string { } type FloatingIP struct { - Base `gorm:"embedded"` + Base `gorm:"embedded" mapstructure:",squash"` Domain string `gorm:"column:domain;type:char(64);not null" mapstructure:"DOMAIN"` Region string `gorm:"column:region;type:char(64);default:''" mapstructure:"REGION"` VPCID int `gorm:"column:epc_id;type:int;default:0" mapstructure:"VPC_ID"` @@ -425,8 +425,8 @@ func (FloatingIP) TableName() string { } type SecurityGroup struct { - Base `gorm:"embedded"` - SoftDeleteBase `gorm:"embedded"` + Base `gorm:"embedded" mapstructure:",squash"` + SoftDeleteBase `gorm:"embedded" mapstructure:",squash"` Name string `gorm:"column:name;type:varchar(256);default:''" mapstructure:"NAME"` Label string `gorm:"column:label;type:varchar(64);default:''" mapstructure:"LABEL"` Alias string `gorm:"column:alias;type:char(64);default:''" mapstructure:"ALIAS"` @@ -437,7 +437,7 @@ type SecurityGroup struct { } type SecurityGroupRule struct { - Base `gorm:"embedded"` + Base `gorm:"embedded" mapstructure:",squash"` SecurityGroupID int `gorm:"column:sg_id;type:int;not null" mapstructure:"SG_ID"` Direction int `gorm:"column:direction;type:tinyint(1);not null;default:0" mapstructure:"DIRECTION"` // 0.Unknow 1.Ingress 2.Egress Protocol string `gorm:"column:protocol;type:char(64);default:''" mapstructure:"PROTOCOL"` @@ -451,8 +451,8 @@ type SecurityGroupRule struct { } type NATGateway struct { - Base `gorm:"embedded"` - SoftDeleteBase `gorm:"embedded"` + Base `gorm:"embedded" mapstructure:",squash"` + SoftDeleteBase `gorm:"embedded" mapstructure:",squash"` Name string `gorm:"column:name;type:varchar(256);default:''" mapstructure:"NAME"` Label string `gorm:"column:label;type:char(64);default:''" mapstructure:"LABEL"` FloatingIPs string `gorm:"column:floating_ips;type:text;default:''" mapstructure:"FLOATING_IPS"` // separated by , @@ -468,7 +468,7 @@ func (NATGateway) TableName() string { } type NATRule struct { - Base `gorm:"embedded"` + Base `gorm:"embedded" mapstructure:",squash"` NATGatewayID int `gorm:"column:nat_id;type:int;default:0" mapstructure:"NAT_ID"` Type string `gorm:"column:type;type:char(16);default:''" mapstructure:"TYPE"` Protocol string `gorm:"column:protocol;type:char(64);default:''" mapstructure:"PROTOCOL"` @@ -485,7 +485,7 @@ func (NATRule) TableName() string { } type NATVMConnection struct { - Base `gorm:"embedded"` + Base `gorm:"embedded" mapstructure:",squash"` NATGatewayID int `gorm:"column:nat_id;type:int;default:null" mapstructure:"NAT_ID"` VMID int `gorm:"column:vm_id;type:int;default:null" mapstructure:"VM_ID"` Domain string `gorm:"column:domain;type:char(64);not null" mapstructure:"DOMAIN"` @@ -496,8 +496,8 @@ func (NATVMConnection) TableName() string { } type LB struct { - Base `gorm:"embedded"` - SoftDeleteBase `gorm:"embedded"` + Base `gorm:"embedded" mapstructure:",squash"` + SoftDeleteBase `gorm:"embedded" mapstructure:",squash"` Name string `gorm:"column:name;type:varchar(256);default:''" mapstructure:"NAME"` Label string `gorm:"column:label;type:char(64);default:''" mapstructure:"LABEL"` Model int `gorm:"column:model;type:int;default:0" mapstructure:"MODEL"` // 1.Internal 2.External @@ -514,8 +514,8 @@ func (LB) TableName() string { } type LBListener struct { - Base `gorm:"embedded"` - SoftDeleteBase `gorm:"embedded"` + Base `gorm:"embedded" mapstructure:",squash"` + SoftDeleteBase `gorm:"embedded" mapstructure:",squash"` LBID int `gorm:"column:lb_id;type:int;default:0" mapstructure:"LB_ID"` Name string `gorm:"column:name;type:varchar(256);default:''" mapstructure:"NAME"` IPs string `gorm:"column:ips;type:text;default:''" mapstructure:"IPS"` // separated by , @@ -531,7 +531,7 @@ func (LBListener) TableName() string { } type LBTargetServer struct { - Base `gorm:"embedded"` + Base `gorm:"embedded" mapstructure:",squash"` LBID int `gorm:"column:lb_id;type:int;default:0" mapstructure:"LB_ID"` LBListenerID int `gorm:"column:lb_listener_id;type:int;default:0" mapstructure:"LB_LISTENER_ID"` VPCID int `gorm:"column:epc_id;type:int;default:0" mapstructure:"EPC_ID"` @@ -548,7 +548,7 @@ func (LBTargetServer) TableName() string { } type LBVMConnection struct { - Base `gorm:"embedded"` + Base `gorm:"embedded" mapstructure:",squash"` LBID int `gorm:"column:lb_id;type:int;default:null" mapstructure:"LB_ID"` VMID int `gorm:"column:vm_id;type:int;default:null" mapstructure:"VM_ID"` Domain string `gorm:"column:domain;type:char(64);not null" mapstructure:"DOMAIN"` @@ -559,8 +559,8 @@ func (LBVMConnection) TableName() string { } type PeerConnection struct { - Base `gorm:"embedded"` - SoftDeleteBase `gorm:"embedded"` + Base `gorm:"embedded" mapstructure:",squash"` + SoftDeleteBase `gorm:"embedded" mapstructure:",squash"` Name string `gorm:"column:name;type:varchar(256);default:''" mapstructure:"NAME"` Label string `gorm:"column:label;type:char(64);default:''" mapstructure:"LABEL"` LocalVPCID int `gorm:"column:local_epc_id;type:int;default:0" mapstructure:"LOCAL_EPC_ID"` @@ -572,8 +572,8 @@ type PeerConnection struct { } type CEN struct { - Base `gorm:"embedded"` - SoftDeleteBase `gorm:"embedded"` + Base `gorm:"embedded" mapstructure:",squash"` + SoftDeleteBase `gorm:"embedded" mapstructure:",squash"` Name string `gorm:"column:name;type:varchar(256);default:''" mapstructure:"NAME"` Label string `gorm:"column:label;type:char(64);default:''" mapstructure:"LABEL"` Alias string `gorm:"column:alias;type:char(64);default:''" mapstructure:"ALIAS"` @@ -586,8 +586,8 @@ func (CEN) TableName() string { } type RDSInstance struct { - Base `gorm:"embedded"` - SoftDeleteBase `gorm:"embedded"` + Base `gorm:"embedded" mapstructure:",squash"` + SoftDeleteBase `gorm:"embedded" mapstructure:",squash"` Name string `gorm:"column:name;type:varchar(256);default:''" mapstructure:"NAME"` Label string `gorm:"column:label;type:char(64);default:''" mapstructure:"LABEL"` State int `gorm:"column:state;type:tinyint(1);not null;default:0" mapstructure:"STATE"` // 0. Unknown 1. Running 2. Recovering @@ -607,8 +607,8 @@ func (RDSInstance) TableName() string { } type RedisInstance struct { - Base `gorm:"embedded"` - SoftDeleteBase `gorm:"embedded"` + Base `gorm:"embedded" mapstructure:",squash"` + SoftDeleteBase `gorm:"embedded" mapstructure:",squash"` Name string `gorm:"column:name;type:varchar(256);default:''" mapstructure:"NAME"` Label string `gorm:"column:label;type:char(64);default:''" mapstructure:"LABEL"` State int `gorm:"column:state;type:tinyint(1);not null;default:0" mapstructure:"STATE"` // 0. Unknown 1. Running 2. Recovering @@ -623,7 +623,7 @@ type RedisInstance struct { } type VIP struct { - Base `gorm:"embedded"` + Base `gorm:"embedded" mapstructure:",squash"` IP string `gorm:"column:ip;type:char(64);default:''" mapstructure:"IP"` Domain string `gorm:"column:domain;type:char(64);not null" mapstructure:"DOMAIN"` VTapID uint32 `gorm:"column:vtap_id;type:int;not null;default:0" mapstructure:"VTAP_ID"` @@ -634,8 +634,8 @@ func (VIP) TableName() string { } type PodCluster struct { - Base `gorm:"embedded"` - SoftDeleteBase `gorm:"embedded"` + Base `gorm:"embedded" mapstructure:",squash"` + SoftDeleteBase `gorm:"embedded" mapstructure:",squash"` Name string `gorm:"column:name;type:varchar(256);default:''" mapstructure:"NAME"` ClusterName string `gorm:"column:cluster_name;type:varchar(256);default:''" mapstructure:"CLUSTER_NAME"` Version string `gorm:"column:version;type:varchar(256);default:''" mapstructure:"VERSION"` @@ -647,8 +647,8 @@ type PodCluster struct { } type PodNamespace struct { - Base `gorm:"embedded"` - SoftDeleteBase `gorm:"embedded"` + Base `gorm:"embedded" mapstructure:",squash"` + SoftDeleteBase `gorm:"embedded" mapstructure:",squash"` Name string `gorm:"column:name;type:varchar(256);default:''" mapstructure:"NAME"` Alias string `gorm:"column:alias;type:char(64);default:''" mapstructure:"ALIAS"` CloudTags string `gorm:"column:cloud_tags;type:text;default:''" mapstructure:"CLOUD_TAGS"` // separated by , @@ -660,8 +660,8 @@ type PodNamespace struct { } type PodNode struct { - Base `gorm:"embedded"` - SoftDeleteBase `gorm:"embedded"` + Base `gorm:"embedded" mapstructure:",squash"` + SoftDeleteBase `gorm:"embedded" mapstructure:",squash"` Name string `gorm:"column:name;type:varchar(256);default:''" mapstructure:"NAME"` Alias string `gorm:"column:alias;type:char(64);default:''" mapstructure:"ALIAS"` Type int `gorm:"column:type;type:int;default:null" mapstructure:"TYPE"` // 1: Master 2: Node @@ -679,8 +679,8 @@ type PodNode struct { } type PodIngress struct { - Base `gorm:"embedded"` - SoftDeleteBase `gorm:"embedded"` + Base `gorm:"embedded" mapstructure:",squash"` + SoftDeleteBase `gorm:"embedded" mapstructure:",squash"` Name string `gorm:"column:name;type:varchar(256);default:''" mapstructure:"NAME"` Alias string `gorm:"column:alias;type:char(64);default:''" mapstructure:"ALIAS"` PodNamespaceID int `gorm:"column:pod_namespace_id;type:int;default:null" mapstructure:"POD_NAMESPACE_ID"` @@ -692,7 +692,7 @@ type PodIngress struct { } type PodIngressRule struct { - Base `gorm:"embedded"` + Base `gorm:"embedded" mapstructure:",squash"` Name string `gorm:"column:name;type:varchar(256);default:''" mapstructure:"NAME"` Protocol string `gorm:"column:protocol;type:char(64);default:''" mapstructure:"PROTOCOL"` Host string `gorm:"column:host;type:text;default:''" mapstructure:"HOST"` @@ -701,7 +701,7 @@ type PodIngressRule struct { } type PodIngressRuleBackend struct { - Base `gorm:"embedded"` + Base `gorm:"embedded" mapstructure:",squash"` Path string `gorm:"column:path;type:text;default:''" mapstructure:"PATH"` Port int `gorm:"column:port;type:int;default:null" mapstructure:"PORT"` PodServiceID int `gorm:"column:pod_service_id;type:int;default:null" mapstructure:"POD_SERVICE_ID"` @@ -711,8 +711,8 @@ type PodIngressRuleBackend struct { } type PodService struct { - Base `gorm:"embedded"` - SoftDeleteBase `gorm:"embedded"` + Base `gorm:"embedded" mapstructure:",squash"` + SoftDeleteBase `gorm:"embedded" mapstructure:",squash"` Name string `gorm:"column:name;type:varchar(256);default:''" mapstructure:"NAME"` Label string `gorm:"column:label;type:text;default:''" mapstructure:"LABEL"` // separated by , Annotation string `gorm:"column:annotation;type:text;default:''" mapstructure:"ANNOTATION"` // separated by , @@ -731,7 +731,7 @@ type PodService struct { } type PodServicePort struct { - Base `gorm:"embedded"` + Base `gorm:"embedded" mapstructure:",squash"` Name string `gorm:"column:name;type:varchar(256);default:''" mapstructure:"NAME"` Protocol string `gorm:"column:protocol;type:char(64);default:''" mapstructure:"PROTOCOL"` Port int `gorm:"column:port;type:int;default:null" mapstructure:"PORT"` @@ -742,8 +742,8 @@ type PodServicePort struct { } type PodGroup struct { - Base `gorm:"embedded"` - SoftDeleteBase `gorm:"embedded"` + Base `gorm:"embedded" mapstructure:",squash"` + SoftDeleteBase `gorm:"embedded" mapstructure:",squash"` Name string `gorm:"column:name;type:varchar(256);default:''" mapstructure:"NAME"` Alias string `gorm:"column:alias;type:char(64);default:''" mapstructure:"ALIAS"` Type int `gorm:"column:type;type:int;default:null" mapstructure:"TYPE"` // 1: Deployment 2: StatefulSet 3: ReplicationController @@ -758,7 +758,7 @@ type PodGroup struct { } type PodGroupPort struct { - Base `gorm:"embedded"` + Base `gorm:"embedded" mapstructure:",squash"` Name string `gorm:"column:name;type:varchar(256);default:''" mapstructure:"NAME"` Protocol string `gorm:"column:protocol;type:char(64);default:''" mapstructure:"PROTOCOL"` Port int `gorm:"column:port;type:int;default:null" mapstructure:"PORT"` @@ -768,8 +768,8 @@ type PodGroupPort struct { } type PodReplicaSet struct { - Base `gorm:"embedded"` - SoftDeleteBase `gorm:"embedded"` + Base `gorm:"embedded" mapstructure:",squash"` + SoftDeleteBase `gorm:"embedded" mapstructure:",squash"` Name string `gorm:"column:name;type:varchar(256);default:''" mapstructure:"NAME"` Alias string `gorm:"column:alias;type:char(64);default:''" mapstructure:"ALIAS"` Label string `gorm:"column:label;type:text;default:''" mapstructure:"LABEL"` // separated by , @@ -788,8 +788,8 @@ func (PodReplicaSet) TableName() string { } type PrometheusTarget struct { - Base `gorm:"embedded"` - SoftDeleteBase `gorm:"embedded"` + Base `gorm:"embedded" mapstructure:",squash"` + SoftDeleteBase `gorm:"embedded" mapstructure:",squash"` Instance string `gorm:"column:instance;type:varchar(255);default:''" mapstructure:"INSTANCE"` Job string `gorm:"column:job;type:varchar(255);default:''" mapstructure:"JOB"` ScrapeURL string `gorm:"column:scrape_url;type:varchar(2083);default:''" mapstructure:"SCRAPE_URL"` @@ -805,8 +805,8 @@ func (PrometheusTarget) TableName() string { } type Pod struct { - Base `gorm:"embedded"` - SoftDeleteBase `gorm:"embedded"` + Base `gorm:"embedded" mapstructure:",squash"` + SoftDeleteBase `gorm:"embedded" mapstructure:",squash"` Name string `gorm:"column:name;type:varchar(256);default:''" mapstructure:"NAME"` Alias string `gorm:"column:alias;type:char(64);default:''" mapstructure:"ALIAS"` State int `gorm:"column:state;type:int;not null" mapstructure:"STATE"` // 0.Exception 1.Running diff --git a/server/controller/tagrecorder/ch_app_label.go b/server/controller/tagrecorder/ch_app_label.go index d3b30ce8e4d..4048294fbb8 100644 --- a/server/controller/tagrecorder/ch_app_label.go +++ b/server/controller/tagrecorder/ch_app_label.go @@ -17,6 +17,8 @@ package tagrecorder import ( + "golang.org/x/exp/slices" + "github.com/deepflowio/deepflow/server/controller/db/mysql" ) @@ -47,6 +49,7 @@ func (l *ChAPPLabel) generateNewData() (map[PrometheusAPPLabelKey]mysql.ChAPPLab if !ok { return nil, false } + appLabelSlice, ok := l.generateAPPLabelData() labelNameIDMap, valueNameIDMap, ok := l.generateNameIDData() if !ok { @@ -58,14 +61,17 @@ func (l *ChAPPLabel) generateNewData() (map[PrometheusAPPLabelKey]mysql.ChAPPLab labelID := prometheusMetricLabel.LabelID labelNameValueData := metricLabelIDNameValueMap[labelID] labelName := labelNameValueData["label_name"] - labelNameID := labelNameIDMap[labelName] - labelValue := labelNameValueData["label_value"] - labelValueID := valueNameIDMap[labelValue] - keyToItem[PrometheusAPPLabelKey{LabelNameID: labelNameID, LabelValueID: labelValueID}] = mysql.ChAPPLabel{ - LabelNameID: labelNameID, - LabelValue: labelValue, - LabelValueID: labelValueID, + if slices.Contains(appLabelSlice, labelName) { + labelNameID := labelNameIDMap[labelName] + labelValue := labelNameValueData["label_value"] + labelValueID := valueNameIDMap[labelValue] + keyToItem[PrometheusAPPLabelKey{LabelNameID: labelNameID, LabelValueID: labelValueID}] = mysql.ChAPPLabel{ + LabelNameID: labelNameID, + LabelValue: labelValue, + LabelValueID: labelValueID, + } } + } return keyToItem, true } @@ -101,6 +107,22 @@ func (l *ChAPPLabel) generateLabelIDNameValueData() (map[int]map[string]string, return metricLabelIDNameValueMap, true } +func (l *ChAPPLabel) generateAPPLabelData() ([]string, bool) { + appLabelSlice := []string{} + var prometheusAPPMetricAPPLabelLayouts []mysql.ChPrometheusMetricAPPLabelLayout + err := mysql.Db.Unscoped().Select("app_label_name").Group("app_label_name").Find(&prometheusAPPMetricAPPLabelLayouts).Error + + if err != nil { + log.Errorf(dbQueryResourceFailed(l.resourceTypeName, err)) + return appLabelSlice, false + } + + for _, prometheusAPPMetricAPPLabelLayout := range prometheusAPPMetricAPPLabelLayouts { + appLabelSlice = append(appLabelSlice, prometheusAPPMetricAPPLabelLayout.APPLabelName) + } + return appLabelSlice, true +} + func (l *ChAPPLabel) generateNameIDData() (map[string]int, map[string]int, bool) { labelNameIDMap := make(map[string]int) valueNameIDMap := make(map[string]int) diff --git a/server/controller/tagrecorder/ch_vtap_port.go b/server/controller/tagrecorder/ch_vtap_port.go index 042d81145f9..69d3f6f9fd1 100644 --- a/server/controller/tagrecorder/ch_vtap_port.go +++ b/server/controller/tagrecorder/ch_vtap_port.go @@ -31,6 +31,8 @@ import ( "github.com/deepflowio/deepflow/server/controller/model" ) +const vTapPortNameLength = 256 + type ChVTapPort struct { UpdaterBase[mysql.ChVTapPort, VtapPortKey] } @@ -147,6 +149,9 @@ func (v *ChVTapPort) generateNewData() (map[VtapPortKey]mysql.ChVTapPort, bool) log.Debugf("duplicate name: %s (id: %d)", data.TapName, data.ID) } } + if len(vTapPort.Name) > vTapPortNameLength { + vTapPort.Name = vTapPort.Name[:vTapPortNameLength] + } if vTapPort.DeviceID == 0 && vTapPort.DeviceType == 0 && data.DeviceID != 0 && data.DeviceType != 0 { log.Debugf("device id: %d, device type: %d ", vTapPort.DeviceID, vTapPort.DeviceType) vTapPort.DeviceID = data.DeviceID @@ -195,6 +200,9 @@ func (v *ChVTapPort) generateNewData() (map[VtapPortKey]mysql.ChVTapPort, bool) log.Debugf("duplicate name: %s (id: %d)", data.TapName, data.ID) } } + if len(vTapPort.Name) > vTapPortNameLength { + vTapPort.Name = vTapPort.Name[:vTapPortNameLength] + } if vTapPort.DeviceID == 0 && vTapPort.DeviceType == 0 && data.DeviceID != 0 && data.DeviceType != 0 { log.Debugf("device id: %d, device type: %d ", vTapPort.DeviceID, vTapPort.DeviceType) vTapPort.DeviceID = data.DeviceID @@ -241,6 +249,9 @@ func (v *ChVTapPort) generateNewData() (map[VtapPortKey]mysql.ChVTapPort, bool) } else if !common.Contains(nameSlice, "lo") { vTapPort.Name = strings.Join([]string{"lo", vTapPort.Name}, ", ") } + if len(vTapPort.Name) > vTapPortNameLength { + vTapPort.Name = vTapPort.Name[:vTapPortNameLength] + } if vTapPort.DeviceID == 0 && vTapPort.DeviceType == 0 && deviceInfo.DeviceID != 0 && deviceInfo.DeviceType != 0 { log.Debugf("device id: %d, device type: %d ", vTapPort.DeviceID, vTapPort.DeviceType) vTapPort.DeviceID = deviceInfo.DeviceID diff --git a/server/controller/tagrecorder/const.go b/server/controller/tagrecorder/const.go index 1949b4fac3d..b7dca15e280 100644 --- a/server/controller/tagrecorder/const.go +++ b/server/controller/tagrecorder/const.go @@ -756,12 +756,12 @@ var RESOURCE_TYPE_TO_NODE_TYPE = map[int]string{ common.VIF_DEVICE_TYPE_POD_GROUP: RESOURCE_TYPE_POD_GROUP, common.VIF_DEVICE_TYPE_SERVICE: RESOURCE_TYPE_SERVICE, common.VIF_DEVICE_TYPE_GPROCESS: RESOURCE_TYPE_GPROCESS, - common.VIF_DEVICE_TYPE_POD_GROUP_DEPLOYMENT: RESOURCE_TYPE_CH_POD_GROUP_DEPLOYMENT, - common.VIF_DEVICE_TYPE_POD_GROUP_STATEFULSET: RESOURCE_TYPE_CH_POD_GROUP_STATEFULSET, - common.VIF_DEVICE_TYPE_POD_GROUP_RC: RESOURCE_TYPE_CH_POD_GROUP_RC, - common.VIF_DEVICE_TYPE_POD_GROUP_DAEMON_SET: RESOURCE_TYPE_CH_POD_GROUP_DAEMON_SET, - common.VIF_DEVICE_TYPE_POD_GROUP_REPLICASET_CONTROLLER: RESOURCE_TYPE_CH_POD_GROUP_REPLICASET_CONTROLLER, - common.VIF_DEVICE_TYPE_POD_GROUP_CLONESET: RESOURCE_TYPE_CH_POD_GROUP_CLONESET, + common.VIF_DEVICE_TYPE_POD_GROUP_DEPLOYMENT: RESOURCE_TYPE_POD_GROUP, + common.VIF_DEVICE_TYPE_POD_GROUP_STATEFULSET: RESOURCE_TYPE_POD_GROUP, + common.VIF_DEVICE_TYPE_POD_GROUP_RC: RESOURCE_TYPE_POD_GROUP, + common.VIF_DEVICE_TYPE_POD_GROUP_DAEMON_SET: RESOURCE_TYPE_POD_GROUP, + common.VIF_DEVICE_TYPE_POD_GROUP_REPLICASET_CONTROLLER: RESOURCE_TYPE_POD_GROUP, + common.VIF_DEVICE_TYPE_POD_GROUP_CLONESET: RESOURCE_TYPE_POD_GROUP, common.VIF_DEVICE_TYPE_IP: RESOURCE_TYPE_IP, } diff --git a/server/go.mod b/server/go.mod index 09a1ba3420a..2adde37171f 100644 --- a/server/go.mod +++ b/server/go.mod @@ -35,7 +35,7 @@ require ( github.com/cornelk/hashmap v1.0.8 github.com/deckarep/golang-set v1.8.0 github.com/deckarep/golang-set/v2 v2.1.0 - github.com/deepflowio/deepflow/message v0.0.0-20230927071530-6d75b95973e7 + github.com/deepflowio/deepflow/message v0.0.0-20231009120220-2f68ce239be3 github.com/deepflowio/deepflow/server/controller/cloud/kubernetes_gather/expand v0.0.0-00010101000000-000000000000 github.com/deepflowio/deepflow/server/controller/cloud/platform v0.0.0-00010101000000-000000000000 github.com/deepflowio/deepflow/server/controller/db/mysql/migrator v0.0.0-00010101000000-000000000000 diff --git a/server/go.sum b/server/go.sum index c9b053f6241..04229db40af 100644 --- a/server/go.sum +++ b/server/go.sum @@ -153,8 +153,8 @@ github.com/deckarep/golang-set v1.8.0 h1:sk9/l/KqpunDwP7pSjUg0keiOOLEnOBHzykLrsP github.com/deckarep/golang-set v1.8.0/go.mod h1:5nI87KwE7wgsBU1F4GKAw2Qod7p5kyS383rP6+o6qqo= github.com/deckarep/golang-set/v2 v2.1.0 h1:g47V4Or+DUdzbs8FxCCmgb6VYd+ptPAngjM6dtGktsI= github.com/deckarep/golang-set/v2 v2.1.0/go.mod h1:VAky9rY/yGXJOLEDv3OMci+7wtDpOF4IN+y82NBOac4= -github.com/deepflowio/deepflow/message v0.0.0-20230927071530-6d75b95973e7 h1:rrvioA174n8n6g0Jk24vtb+8o1dpvT4fP8ASQfSPJEc= -github.com/deepflowio/deepflow/message v0.0.0-20230927071530-6d75b95973e7/go.mod h1:e+1lUMMlycCvFRKvlwt/y/0vxJnF8wVss3GyR1ARXY0= +github.com/deepflowio/deepflow/message v0.0.0-20231009120220-2f68ce239be3 h1:sR2fwjKCZvK3iP5WCfaoscD6mrPohlXf2r/JSRG3b18= +github.com/deepflowio/deepflow/message v0.0.0-20231009120220-2f68ce239be3/go.mod h1:e+1lUMMlycCvFRKvlwt/y/0vxJnF8wVss3GyR1ARXY0= github.com/deepflowio/tempopb v0.0.0-20230215110519-15853baf3a79 h1:erRwXyZiUZxxZX/Q1QHesZXgxjiunZUFy+ggCRimkD4= github.com/deepflowio/tempopb v0.0.0-20230215110519-15853baf3a79/go.mod h1:h2rkZ319TExIUGuK8a2dlcGd8qc6wdhsrcpXWaJQaQE= github.com/dennwc/varint v1.0.0 h1:kGNFFSSw8ToIy3obO/kKr8U9GZYUAxQEVuix4zfDWzE= diff --git a/server/ingester/ckissu/ckissu.go b/server/ingester/ckissu/ckissu.go index b0efbe3d9f0..defe0985c88 100644 --- a/server/ingester/ckissu/ckissu.go +++ b/server/ingester/ckissu/ckissu.go @@ -950,6 +950,15 @@ var ColumnDrops635 = []*ColumnDrops{ }, } +var ColumnAdd64 = []*ColumnAdds{ + &ColumnAdds{ + Dbs: []string{"flow_log"}, + Tables: []string{"l7_flow_log", "l7_flow_log_local"}, + ColumnNames: []string{"syscall_coroutine_0", "syscall_coroutine_1"}, + ColumnType: ckdb.UInt64, + }, +} + func getTables(connect *sql.DB, db, tableName string) ([]string, error) { sql := fmt.Sprintf("SHOW TABLES IN %s", db) rows, err := connect.Query(sql) @@ -1330,7 +1339,7 @@ func NewCKIssu(cfg *config.Config) (*Issu, error) { datasourceInfo: make(map[string]*DatasourceInfo), } - allVersionAdds := [][]*ColumnAdds{ColumnAdd610, ColumnAdd611, ColumnAdd612, ColumnAdd613, ColumnAdd615, ColumnAdd618, ColumnAdd620, ColumnAdd623, ColumnAdd625, ColumnAdd626, ColumnAdd633, ColumnAdd635} + allVersionAdds := [][]*ColumnAdds{ColumnAdd610, ColumnAdd611, ColumnAdd612, ColumnAdd613, ColumnAdd615, ColumnAdd618, ColumnAdd620, ColumnAdd623, ColumnAdd625, ColumnAdd626, ColumnAdd633, ColumnAdd635, ColumnAdd64} i.columnAdds = []*ColumnAdd{} for _, versionAdd := range allVersionAdds { for _, adds := range versionAdd { diff --git a/server/ingester/common/const.go b/server/ingester/common/const.go index 2399751b2dc..94774dea735 100644 --- a/server/ingester/common/const.go +++ b/server/ingester/common/const.go @@ -17,6 +17,6 @@ package common const ( - CK_VERSION = "v6.3.8.2" // 用于表示clickhouse的表版本号 + CK_VERSION = "v6.3.4.0" // 用于表示clickhouse的表版本号 DEFAULT_PCAP_DATA_PATH = "/var/lib/pcap" ) diff --git a/server/ingester/ext_metrics/decoder/decoder.go b/server/ingester/ext_metrics/decoder/decoder.go index 40c0f550a6d..92b3e5d282c 100644 --- a/server/ingester/ext_metrics/decoder/decoder.go +++ b/server/ingester/ext_metrics/decoder/decoder.go @@ -194,7 +194,7 @@ func (d *Decoder) handleDeepflowStats(vtapID uint16, decoder *codec.SimpleDecode d.counter.ErrorCount++ return } - if err := pbStats.Unmarshal(bytes); err != nil { + if err := pbStats.Unmarshal(bytes); err != nil || pbStats.Name == "" { if d.counter.ErrorCount == 0 { log.Warningf("deepflow stats parse failed, err msg: %s", err) } diff --git a/server/ingester/flow_log/log_data/l7_flow_log.go b/server/ingester/flow_log/log_data/l7_flow_log.go index 3ade31b9e2d..97a195b1ccc 100644 --- a/server/ingester/flow_log/log_data/l7_flow_log.go +++ b/server/ingester/flow_log/log_data/l7_flow_log.go @@ -81,6 +81,8 @@ type L7Base struct { SyscallTraceIDResponse uint64 SyscallThread0 uint32 SyscallThread1 uint32 + SyscallCoroutine0 uint64 + SyscallCoroutine1 uint64 SyscallCapSeq0 uint32 SyscallCapSeq1 uint32 } @@ -130,6 +132,8 @@ func L7BaseColumns() []*ckdb.Column { ckdb.NewColumn("syscall_trace_id_response", ckdb.UInt64).SetComment("SyscallTraceID-响应"), ckdb.NewColumn("syscall_thread_0", ckdb.UInt32).SetComment("Syscall线程-请求"), ckdb.NewColumn("syscall_thread_1", ckdb.UInt32).SetComment("Syscall线程-响应"), + ckdb.NewColumn("syscall_coroutine_0", ckdb.UInt64).SetComment("Request Syscall Coroutine"), + ckdb.NewColumn("syscall_coroutine_1", ckdb.UInt64).SetComment("Response Syscall Coroutine"), ckdb.NewColumn("syscall_cap_seq_0", ckdb.UInt32).SetComment("Syscall序列号-请求"), ckdb.NewColumn("syscall_cap_seq_1", ckdb.UInt32).SetComment("Syscall序列号-响应"), ) @@ -177,6 +181,8 @@ func (f *L7Base) WriteBlock(block *ckdb.Block) { f.SyscallTraceIDResponse, f.SyscallThread0, f.SyscallThread1, + f.SyscallCoroutine0, + f.SyscallCoroutine1, f.SyscallCapSeq0, f.SyscallCapSeq1) } @@ -538,6 +544,8 @@ func (b *L7Base) Fill(log *pb.AppProtoLogsData, platformData *grpc.PlatformInfoT b.SyscallTraceIDResponse = l.SyscallTraceIdResponse b.SyscallThread0 = l.SyscallTraceIdThread_0 b.SyscallThread1 = l.SyscallTraceIdThread_1 + b.SyscallCoroutine0 = l.SyscallCoroutine_0 + b.SyscallCoroutine1 = l.SyscallCoroutine_1 b.SyscallCapSeq0 = l.SyscallCapSeq_0 b.SyscallCapSeq1 = l.SyscallCapSeq_1 diff --git a/server/querier/app/prometheus/config/config.go b/server/querier/app/prometheus/config/config.go index ac14ea75056..829d3deadea 100644 --- a/server/querier/app/prometheus/config/config.go +++ b/server/querier/app/prometheus/config/config.go @@ -25,6 +25,7 @@ type Prometheus struct { RequestQueryWithDebug bool `default:"false" yaml:"request-query-with-debug"` ExternalTagCacheSize int `default:"1024" yaml:"external-tag-cache-size"` ExternalTagLoadInterval int `default:"300" yaml:"external-tag-load-interval"` + ThanosReplicaLabels []string `yaml:"thanos-replica-labels"` Cache PrometheusCache `yaml:"cache"` } diff --git a/server/querier/app/prometheus/model/prometheus.go b/server/querier/app/prometheus/model/prometheus.go index 9658a48e9bf..1504ab80776 100644 --- a/server/querier/app/prometheus/model/prometheus.go +++ b/server/querier/app/prometheus/model/prometheus.go @@ -23,15 +23,16 @@ import ( ) type PromQueryParams struct { - MetricsWithPrefix string - Promql string - StartTime string - EndTime string - Step string - Debug bool - Slimit string - Matchers []string - Context context.Context + MetricsWithPrefix string + Promql string + StartTime string + EndTime string + Step string + Debug bool + Slimit string + ThanosReplicaLabels []string + Matchers []string + Context context.Context } type PromQueryData struct { diff --git a/server/querier/app/prometheus/router/prometheus.go b/server/querier/app/prometheus/router/prometheus.go index 0ae302e9bdb..f3fc0bea873 100644 --- a/server/querier/app/prometheus/router/prometheus.go +++ b/server/querier/app/prometheus/router/prometheus.go @@ -46,6 +46,7 @@ func promQuery(svc *service.PrometheusService) gin.HandlerFunc { args.Slimit = c.Request.FormValue("slimit") debug := c.Request.FormValue("debug") args.Debug, _ = strconv.ParseBool(debug) + result, err := svc.PromInstantQueryService(&args, c.Request.Context()) if err != nil { c.JSON(500, &model.PromQueryResponse{Error: err.Error(), Status: _STATUS_FAIL}) @@ -73,7 +74,6 @@ func promQueryRange(svc *service.PrometheusService) gin.HandlerFunc { c.JSON(500, &model.PromQueryResponse{Error: err.Error(), Status: _STATUS_FAIL}) return } - //pp.Println(result) c.JSON(200, result) }) } @@ -92,9 +92,8 @@ func promReader(svc *service.PrometheusService) gin.HandlerFunc { c.JSON(500, err) return } - //pp.Println(req) + resp, err := svc.PromRemoteReadService(&req, c.Request.Context()) - //pp.Println(resp) if err != nil { c.JSON(500, err) return diff --git a/server/querier/app/prometheus/service/converters.go b/server/querier/app/prometheus/service/converters.go index 3fdebbb3c73..199ac67a3d3 100644 --- a/server/querier/app/prometheus/service/converters.go +++ b/server/querier/app/prometheus/service/converters.go @@ -592,10 +592,25 @@ func (p *prometheusReader) respTransToProm(ctx context.Context, metricsName stri // merge and serialize all tags as map key var deepflowNativeTagString, promTagJson string + var filterTagMap map[string]string // merge prometheus tags if tagIndex > -1 { promTagJson = values[tagIndex].(string) - deepflowNativeTagString = promTagJson + tagMap := make(map[string]string) + json.Unmarshal([]byte(promTagJson), &tagMap) + filterTagMap = make(map[string]string, len(tagMap)) + for k, v := range tagMap { + if k == "" || v == "" { + continue + } + // ignore replica labels if passed + if config.Cfg.Prometheus.ThanosReplicaLabels != nil && common.IsValueInSliceString(k, config.Cfg.Prometheus.ThanosReplicaLabels) { + continue + } + filterTagMap[k] = v + } + promFilterTagJson, _ := json.Marshal(filterTagMap) + deepflowNativeTagString = string(promFilterTagJson) } // merge deepflow autotagging tags @@ -622,14 +637,9 @@ func (p *prometheusReader) respTransToProm(ctx context.Context, metricsName stri // tag label pair var pairs []prompb.Label - if tagIndex > -1 { - tagMap := make(map[string]string) - json.Unmarshal([]byte(promTagJson), &tagMap) - pairs = make([]prompb.Label, 0, 1+len(tagMap)+len(allDeepFlowNativeTags)) - for k, v := range tagMap { - if k == "" || v == "" { - continue - } + if tagIndex > -1 && filterTagMap != nil { + pairs = make([]prompb.Label, 0, 1+len(filterTagMap)+len(allDeepFlowNativeTags)) + for k, v := range filterTagMap { if prefix == prefixTag { // prometheus tag for deepflow metrics pairs = append(pairs, prompb.Label{ diff --git a/server/querier/app/prometheus/service/promql.go b/server/querier/app/prometheus/service/promql.go index d999e7af303..a6c86aa5208 100644 --- a/server/querier/app/prometheus/service/promql.go +++ b/server/querier/app/prometheus/service/promql.go @@ -197,14 +197,9 @@ func (p *prometheusExecutor) promQueryRangeExecute(ctx context.Context, args *mo if start.Local().Unix()%int64(step.Seconds()) > int64(p.lookbackDelta.Seconds()) { start = time.Unix(start.Local().Unix()-start.Local().Unix()%int64(step.Seconds()), 0) } - if end.Local().Unix()%int64(step.Seconds()) > int64(p.lookbackDelta.Seconds()) { - end = time.Unix(end.Local().Unix()-end.Local().Unix()%int64(step.Seconds())+int64(step.Seconds()), 0) - } if int(step.Seconds())%86400 == 0 { year_start, month_start, day_start := start.Date() - year_end, month_end, day_end := end.Date() start = time.Date(year_start, month_start, day_start, 0, 0, 0, 0, start.Location()) - end = time.Date(year_end, month_end, day_end, 0, 0, 0, 0, end.Location()) } qry, err := engine.NewRangeQuery(queriable, nil, args.Promql, start, end, step) if qry == nil || err != nil { diff --git a/server/querier/db_descriptions/clickhouse/metrics/flow_metrics/vtap_flow_port.ch b/server/querier/db_descriptions/clickhouse/metrics/flow_metrics/vtap_flow_port.ch index 9e61bf1ce35..0f001b1a13e 100644 --- a/server/querier/db_descriptions/clickhouse/metrics/flow_metrics/vtap_flow_port.ch +++ b/server/querier/db_descriptions/clickhouse/metrics/flow_metrics/vtap_flow_port.ch @@ -64,13 +64,13 @@ server_half_close_flow , 断连-服务端半关 , 连接 , TCP 传 rtt , 平均 TCP 建连时延 , 微秒 , 统计周期内,所有 TCP 建连时延的平均值,一次时延的计算见文档描述 rtt_client , 平均 TCP 建连客户端时延 , 微秒 , 统计周期内,所有 TCP 建连客户端时延的平均值,一次时延的计算见文档描述 rtt_server , 平均 TCP 建连服务端时延 , 微秒 , 统计周期内,所有 TCP 建连服务端时延的平均值,一次时延的计算见文档描述 -srt , 平均 TCP 系统时延 , 微秒 , 统计周期内,所有 TCP 系统时延的平均值,一次时延的计算见文档描述 +srt , 平均 TCP/ICMP 系统时延 , 微秒 , 统计周期内,所有 TCP/ICMP 系统时延的平均值,一次时延的计算见文档描述 art , 平均数据时延 , 微秒 , 统计周期内,所有数据时延的平均值,数据时延包含 TCP/UDP,一次时延的计算见文档描述 cit , 平均客户端等待时延 , 微秒 , 统计周期内,所有客户端等待时延的平均值,数据时延仅包含 TCP,一次时延的计算见文档描述 rtt_max , 最大 TCP 建连时延 , 微秒 , 统计周期内,所有 TCP 建连时延的最大值,一次时延的计算见文档描述 rtt_client_max , 最大 TCP 建连客户端时延 , 微秒 , 统计周期内,所有 TCP 建连客户端时延的最大值,一次时延的计算见文档描述 rtt_server_max , 最大 TCP 建连服务端时延 , 微秒 , 统计周期内,所有 TCP 建连服务端时延的最大值,一次时延的计算见文档描述 -srt_max , 最大 TCP 系统时延 , 微秒 , 统计周期内,所有 TCP 系统时延的最大值,一次时延的计算见文档描述 +srt_max , 最大 TCP/ICMP 系统时延 , 微秒 , 统计周期内,所有 TCP/ICMP 系统时延的最大值,一次时延的计算见文档描述 art_max , 最大数据时延 , 微秒 , 统计周期内,所有数据时延的最大值,数据时延包含 TCP/UDP,一次时延的计算见文档描述 cit_max , 最大客户端等待时延 , 微秒 , 统计周期内,所有客户端等待时延的最大值,数据时延仅包含 TCP,一次时延的计算见文档描述 diff --git a/server/querier/db_descriptions/clickhouse/metrics/flow_metrics/vtap_flow_port.en b/server/querier/db_descriptions/clickhouse/metrics/flow_metrics/vtap_flow_port.en index bbb9fe0369f..5c90480107b 100644 --- a/server/querier/db_descriptions/clickhouse/metrics/flow_metrics/vtap_flow_port.en +++ b/server/querier/db_descriptions/clickhouse/metrics/flow_metrics/vtap_flow_port.en @@ -64,13 +64,13 @@ server_half_close_flow , Close - Server Half Close , Flow , rtt , Avg TCP Est. Delay , us , rtt_client , Avg TCP Est. Client Delay , us , rtt_server , Avg TCP Est. Server Delay , us , -srt , Avg TCP ACK Delay , us , +srt , Avg TCP/ICMP Response Delay , us , art , Avg Data Delay , us , cit , Avg Client Idle Delay , us , rtt_max , Max TCP Est. Delay , us , rtt_client_max , Max TCP Est. Client Delay , us , rtt_server_max , Max TCP Est. Server Delay , us , -srt_max , Max TCP ACK Delay , us , +srt_max , Max TCP/ICMP Response Delay , us , art_max , Max Data Delay , us , cit_max , Max Client Idle Delay , us , diff --git a/server/querier/db_descriptions/clickhouse/tag/flow_log/l7_flow_log b/server/querier/db_descriptions/clickhouse/tag/flow_log/l7_flow_log index fde27722e46..81f6f724b97 100644 --- a/server/querier/db_descriptions/clickhouse/tag/flow_log/l7_flow_log +++ b/server/querier/db_descriptions/clickhouse/tag/flow_log/l7_flow_log @@ -90,6 +90,8 @@ syscall_trace_id_request , syscall_trace_id_request , syscall_trace_id_request syscall_trace_id_response , syscall_trace_id_response , syscall_trace_id_response , int , , Tracing Info , 111 syscall_thread_0 , syscall_thread_0 , syscall_thread_0 , int , , Tracing Info , 111 syscall_thread_1 , syscall_thread_1 , syscall_thread_1 , int , , Tracing Info , 111 +syscall_coroutine_0 , syscall_coroutine_0 , syscall_coroutine_0 , int , , Tracing Info , 111 +syscall_coroutine_1 , syscall_coroutine_1 , syscall_coroutine_1 , int , , Tracing Info , 111 syscall_cap_seq_0 , syscall_cap_seq_0 , syscall_cap_seq_0 , int , , Tracing Info , 111 syscall_cap_seq_1 , syscall_cap_seq_1 , syscall_cap_seq_1 , int , , Tracing Info , 111 diff --git a/server/querier/db_descriptions/clickhouse/tag/flow_log/l7_flow_log.ch b/server/querier/db_descriptions/clickhouse/tag/flow_log/l7_flow_log.ch index 91a4fc251cf..92c59365c5f 100644 --- a/server/querier/db_descriptions/clickhouse/tag/flow_log/l7_flow_log.ch +++ b/server/querier/db_descriptions/clickhouse/tag/flow_log/l7_flow_log.ch @@ -90,6 +90,8 @@ syscall_trace_id_request , 请求 Syscall TraceID , syscall_trace_id_response , 响应 Syscall TraceID , syscall_thread_0 , 请求 Syscall 线程 , syscall_thread_1 , 响应 Syscall 线程 , +syscall_coroutine_0 , 请求 Syscall 协程 , +syscall_coroutine_1 , 响应 Syscall 协程 , syscall_cap_seq_0 , 请求 Syscall 序号 , syscall_cap_seq_1 , 响应 Syscall 序号 , diff --git a/server/querier/db_descriptions/clickhouse/tag/flow_log/l7_flow_log.en b/server/querier/db_descriptions/clickhouse/tag/flow_log/l7_flow_log.en index 60cf12ef192..539a533e722 100644 --- a/server/querier/db_descriptions/clickhouse/tag/flow_log/l7_flow_log.en +++ b/server/querier/db_descriptions/clickhouse/tag/flow_log/l7_flow_log.en @@ -90,6 +90,8 @@ syscall_trace_id_request , Req Syscall TraceID , syscall_trace_id_response , Resp Syscall TraceID , syscall_thread_0 , Req Syscall Thread , syscall_thread_1 , Resp Syscall Thread , +syscall_coroutine_0 , Req Syscall Coroutine , +syscall_coroutine_1 , Resp Syscall Coroutine , syscall_cap_seq_0 , Req Syscall CapSeq , syscall_cap_seq_1 , Resp Syscall CapSeq , diff --git a/server/querier/engine/clickhouse/tag/const.go b/server/querier/engine/clickhouse/tag/const.go index af4068d1d16..7ccf80bc19a 100644 --- a/server/querier/engine/clickhouse/tag/const.go +++ b/server/querier/engine/clickhouse/tag/const.go @@ -17,22 +17,28 @@ package tag const ( - VIF_DEVICE_TYPE_INTERNET = 0 - VIF_DEVICE_TYPE_VM = 1 - VIF_DEVICE_TYPE_VROUTER = 5 - VIF_DEVICE_TYPE_HOST = 6 - VIF_DEVICE_TYPE_DHCP_PORT = 9 - VIF_DEVICE_TYPE_POD = 10 - VIF_DEVICE_TYPE_POD_SERVICE = 11 - VIF_DEVICE_TYPE_REDIS_INSTANCE = 12 - VIF_DEVICE_TYPE_RDS_INSTANCE = 13 - VIF_DEVICE_TYPE_POD_NODE = 14 - VIF_DEVICE_TYPE_LB = 15 - VIF_DEVICE_TYPE_NAT_GATEWAY = 16 - VIF_DEVICE_TYPE_POD_GROUP = 101 - VIF_DEVICE_TYPE_SERVICE = 102 - VIF_DEVICE_TYPE_GPROCESS = 120 - VIF_DEVICE_TYPE_IP = 255 + VIF_DEVICE_TYPE_INTERNET = 0 + VIF_DEVICE_TYPE_VM = 1 + VIF_DEVICE_TYPE_VROUTER = 5 + VIF_DEVICE_TYPE_HOST = 6 + VIF_DEVICE_TYPE_DHCP_PORT = 9 + VIF_DEVICE_TYPE_POD = 10 + VIF_DEVICE_TYPE_POD_SERVICE = 11 + VIF_DEVICE_TYPE_REDIS_INSTANCE = 12 + VIF_DEVICE_TYPE_RDS_INSTANCE = 13 + VIF_DEVICE_TYPE_POD_NODE = 14 + VIF_DEVICE_TYPE_LB = 15 + VIF_DEVICE_TYPE_NAT_GATEWAY = 16 + VIF_DEVICE_TYPE_POD_GROUP = 101 + VIF_DEVICE_TYPE_SERVICE = 102 + VIF_DEVICE_TYPE_GPROCESS = 120 + VIF_DEVICE_TYPE_POD_GROUP_DEPLOYMENT = 130 + VIF_DEVICE_TYPE_POD_GROUP_STATEFULSET = 131 + VIF_DEVICE_TYPE_POD_GROUP_RC = 132 + VIF_DEVICE_TYPE_POD_GROUP_DAEMON_SET = 133 + VIF_DEVICE_TYPE_POD_GROUP_REPLICASET_CONTROLLER = 134 + VIF_DEVICE_TYPE_POD_GROUP_CLONESET = 135 + VIF_DEVICE_TYPE_IP = 255 ) const ( @@ -70,8 +76,19 @@ var AutoPodGroupMap = map[string]int{ } var AutoServiceMap = map[string]int{ - "pod_group": VIF_DEVICE_TYPE_POD_GROUP, - "service": VIF_DEVICE_TYPE_SERVICE, + "pod_group": VIF_DEVICE_TYPE_POD_GROUP, + "deployment": VIF_DEVICE_TYPE_POD_GROUP_DEPLOYMENT, + "stateful_set": VIF_DEVICE_TYPE_POD_GROUP_STATEFULSET, + "replication_controller": VIF_DEVICE_TYPE_POD_GROUP_RC, + "daemon_set": VIF_DEVICE_TYPE_POD_GROUP_DAEMON_SET, + "replica_set_controller": VIF_DEVICE_TYPE_POD_GROUP_REPLICASET_CONTROLLER, + "clone_set": VIF_DEVICE_TYPE_POD_GROUP_CLONESET, + "service": VIF_DEVICE_TYPE_SERVICE, +} + +var PodGroupTypeSlice = []string{ + "deployment", "stateful_set", "replication_controller", "daemon_set", + "replica_set_controller", "clone_set", } var NoLanguageTag = []string{ diff --git a/server/querier/engine/clickhouse/tag/description.go b/server/querier/engine/clickhouse/tag/description.go index 8e0d577a054..093c528c0ea 100644 --- a/server/querier/engine/clickhouse/tag/description.go +++ b/server/querier/engine/clickhouse/tag/description.go @@ -20,6 +20,7 @@ import ( "context" "errors" "fmt" + "golang.org/x/exp/slices" "regexp" "strconv" "strings" @@ -701,6 +702,9 @@ func GetTagResourceValues(db, table, rawSql string) (*common.Result, []string, e "auto_service": AutoServiceMap, } for resourceKey, resourceType := range autoMap[tag] { + if slices.Contains(PodGroupTypeSlice, resourceKey) { + continue + } resourceId := resourceKey + "_id" resourceName := resourceKey + "_name" if resourceKey == "service" { diff --git a/server/querier/engine/clickhouse/tag/translation.go b/server/querier/engine/clickhouse/tag/translation.go index dd73d879b22..7ed4aa6623b 100644 --- a/server/querier/engine/clickhouse/tag/translation.go +++ b/server/querier/engine/clickhouse/tag/translation.go @@ -883,7 +883,7 @@ func GenerateTagResoureMap() map[string]map[string]*Tag { tagResourceMap[podGroupTypeSuffix] = map[string]*Tag{ "default": NewTag( "dictGet(flow_tag.pod_group_map, 'pod_group_type', (toUInt64("+podGroupIDSuffix+")))", - "", + podGroupIDSuffix+"!=0", "toUInt64("+podGroupIDSuffix+") IN (SELECT id FROM flow_tag.pod_group_map WHERE pod_group_type %s %s)", "", ), diff --git a/server/server.yaml b/server/server.yaml index 273fef7881d..07bf4aeb460 100644 --- a/server/server.yaml +++ b/server/server.yaml @@ -280,6 +280,7 @@ querier: request-query-with-debug: true external-tag-cache-size: 1024 external-tag-load-interval: 300 + thanos-replica-labels: [] # remove duplicate replica labels when query data cache: enabled: true cache-item-size: 512000 # max size of cache item, unit: byte