From 1133fd8cf47289a099aa0985a46aa88e672c2fd1 Mon Sep 17 00:00:00 2001 From: mickeyzzc Date: Thu, 26 Oct 2023 12:58:15 +0800 Subject: [PATCH] rebase main from #4507 Signed-off-by: mickeyzzc --- agent/Cargo.lock | 4 +- .../flow_generator/protocol_logs/mq/kafka.rs | 288 ++++++++++++++---- 2 files changed, 238 insertions(+), 54 deletions(-) diff --git a/agent/Cargo.lock b/agent/Cargo.lock index c3f793d3503..295515ffdde 100644 --- a/agent/Cargo.lock +++ b/agent/Cargo.lock @@ -1034,9 +1034,9 @@ checksum = "c62df5cd2197b4dff3765c5a0de428a6bb9bcfba743e37e0ecc494c13f6c758c" [[package]] name = "encoding_rs" -version = "0.8.32" +version = "0.8.33" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "071a31f4ee85403370b58aca746f01041ede6f0da2730960ad001edc2b71b394" +checksum = "7268b386296a025e474d5140678f75d6de9493ae55a5d709eeb9dd08149945e1" dependencies = [ "cfg-if", ] diff --git a/agent/src/flow_generator/protocol_logs/mq/kafka.rs b/agent/src/flow_generator/protocol_logs/mq/kafka.rs index b82777b26fa..c7d7dc12ef6 100644 --- a/agent/src/flow_generator/protocol_logs/mq/kafka.rs +++ b/agent/src/flow_generator/protocol_logs/mq/kafka.rs @@ -34,6 +34,7 @@ use crate::{ }, utils::bytes::{read_i16_be, read_u16_be, read_u32_be}, }; +use std::str::from_utf8_unchecked; const KAFKA_PRODUCE: u16 = 0; const KAFKA_FETCH: u16 = 1; @@ -68,7 +69,7 @@ pub struct KafkaInfo { #[serde(rename = "response_status")] pub status: L7ResponseStatus, #[serde(rename = "response_code", skip_serializing_if = "Option::is_none")] - pub status_code: Option, + pub status_code: Option, rrt: u64, } @@ -204,14 +205,15 @@ impl From for L7ProtocolSendLog { req_len: f.req_msg_size, resp_len: f.resp_msg_size, req: L7Request { - req_type: String::from(command_str), + req_type: format!("{}_v{}", command_str, f.api_version), resource: f.topic_name, ..Default::default() }, version: Some(f.api_version.to_string()), resp: L7Response { status: f.status, - code: f.status_code, + code: Some(f.status_code.unwrap_or(0).into()), + result: f.status_code.unwrap_or(0).to_string(), ..Default::default() }, ext_info: Some(ExtendedInfo { @@ -278,11 +280,14 @@ impl L7ProtocolParserInterface for KafkaLog { if param.time < previous.time + param.rrt_timeout as u64 => { if let Some(req) = previous.kafka_info.as_ref() { + info.api_key = req.api_key; + info.api_version = req.api_version; self.set_status_code( req.api_key, req.api_version, - read_i16_be(&payload[12..]), + &payload[KAFKA_RESP_HEADER_LEN..], &mut info, + None, ) } } @@ -293,8 +298,9 @@ impl L7ProtocolParserInterface for KafkaLog { self.set_status_code( info.api_key, info.api_version, - resp.code, + &payload[KAFKA_REQ_HEADER_LEN..], &mut info, + Some(resp.code), ) } } @@ -302,19 +308,21 @@ impl L7ProtocolParserInterface for KafkaLog { } } } - - info.cal_rrt( - param, - Some(KafkaInfoCache { - api_key: info.api_key, - api_version: info.api_version, - code: read_i16_be(&payload[12..]), - }), - ) - .map(|rrt| { - info.rrt = rrt; - self.perf_stats.as_mut().map(|p| p.update_rrt(rrt)); - }); + // Optimize cache policy to reduce invalid memory consumption + if info.api_key <= KAFKA_FETCH { + info.cal_rrt( + param, + Some(KafkaInfoCache { + api_key: info.api_key, + api_version: info.api_version, + code: info.status_code.unwrap_or(0), + }), + ) + .map(|rrt| { + info.rrt = rrt; + self.perf_stats.as_mut().map(|p| p.update_rrt(rrt)); + }); + } if param.parse_log { Ok(L7ParseResult::Single(L7ProtocolInfo::KafkaInfo(info))) } else { @@ -335,37 +343,38 @@ impl L7ProtocolParserInterface for KafkaLog { } } -impl KafkaLog { - const MSG_LEN_SIZE: usize = 4; - const MAX_TRACE_ID: usize = 255; - - fn get_topics_name_offset(api_key: u16, api_version: u16) -> Option { - match api_key { +// Kafka has different fixed offsets for different versions of each api key, +// which returns a fixed offset here. +// Because there are too many versions and large differences, +// only the most common produce and fetch api keys are implemented here. +macro_rules! kafka_apiversion_topic_fixed_offset { + ($api_key:expr, $api_version:expr) => { + match $api_key { KAFKA_PRODUCE => { - if api_version <= 2 { + if $api_version <= 2 { // Offset for API version <= 2 - Some(24) - } else if api_version <= 9 { + Some(10) + } else if $api_version <= 9 { // Offset for API version <= 9 - Some(26) + Some(12) } else { // Invalid API version None } } KAFKA_FETCH => { - if api_version <= 2 { + if $api_version <= 2 { // Offset for API version <= 2 - Some(30) - } else if api_version == 3 { + Some(16) + } else if $api_version == 3 { // Offset for API version == 3 - Some(34) - } else if api_version <= 6 { + Some(20) + } else if $api_version <= 6 { // Offset for API version <= 6 - Some(35) - } else if api_version <= 12 { + Some(21) + } else if $api_version <= 12 { // Offset for API version <= 12 - Some(43) + Some(29) } else { // Invalid API version None @@ -373,7 +382,47 @@ impl KafkaLog { } _ => None, } - } + }; +} + +macro_rules! kafka_apiversion_errcode_fixed_offset { + ($api_key:expr, $api_version:expr) => { + match $api_key { + KAFKA_PRODUCE => { + if $api_version <= 8 { + // Offset for API version <= 8 + Some(14) + } else if $api_version <= 9 { + // Offset for API version <= 9 + // TODO: + None + } else { + // Invalid API version + None + } + } + KAFKA_FETCH => { + if $api_version == 0 { + Some(14) + } else if $api_version <= 6 { + // Offset for API version <= 6 + Some(18) + } else if $api_version <= 15 { + // Offset for API version in [7..15] + Some(4) + } else { + // Invalid API version + None + } + } + _ => None, + } + }; +} + +impl KafkaLog { + const MSG_LEN_SIZE: usize = 4; + const MAX_TRACE_ID: usize = 255; fn check_char_boundary(payload: &str, start: usize, end: usize) -> bool { let mut invalid = false; @@ -443,19 +492,11 @@ impl KafkaLog { return Err(Error::KafkaLogParseFailed); } // topic - if let Some(mut topic_offset) = Self::get_topics_name_offset(info.api_key, info.api_version) - { - topic_offset += client_id_len; - if topic_offset + 2 < payload.len() { - let topic_name_len = read_u16_be(&payload[topic_offset..]) as usize; - if topic_name_len <= payload[topic_offset + 2..].len() { - info.topic_name = String::from_utf8_lossy( - &payload[topic_offset + 2..topic_offset + 2 + topic_name_len], - ) - .into_owned(); - } - } + let topic_name = Self::get_topics_name(self, info.api_key, info.api_version, payload); + if !topic_name.is_none() { + info.topic_name = topic_name.unwrap() } + // sw8 let payload = String::from_utf8_lossy(&payload[14..14 + client_id_len]); Self::decode_sw8_trace_id(&payload, info); @@ -499,6 +540,97 @@ impl KafkaLog { Ok(()) } + fn get_topics_name( + &mut self, + api_key: u16, + api_version: u16, + payload: &[u8], + ) -> Option { + let fixed_offset = kafka_apiversion_topic_fixed_offset!(api_key, api_version); + if fixed_offset.is_none() { + return None; + } + if fixed_offset.unwrap_or(0) > payload.len() { + return None; + } + let mut fixed_offset = fixed_offset.unwrap(); + match api_key { + KAFKA_PRODUCE => { + match api_version { + // Significant improvements since version 9 + 9 => { + let tid_len = payload[0]; + if tid_len > 0 { + fixed_offset = fixed_offset + tid_len as usize + } + if fixed_offset + 1 + payload[fixed_offset] as usize > payload.len() { + return None; + } + return Some(Self::decode_compact_string( + &payload + [fixed_offset..fixed_offset + 1 + payload[fixed_offset] as usize], + )); + } + _ => { + if api_version >= 3 && api_version <= 8 { + let tid_len = read_i16_be(&payload[0..2]); + if tid_len > 0 { + fixed_offset = fixed_offset + tid_len as usize + } + } + } + } + if fixed_offset + 2 > payload.len() { + return None; + } + let len = read_u16_be(&payload[fixed_offset..fixed_offset + 2]); + if fixed_offset + 2 + len as usize > payload.len() { + return None; + } + return Some( + String::from_utf8_lossy( + &payload[fixed_offset + 2..fixed_offset + 2 + len as usize], + ) + .into_owned(), + ); + } + KAFKA_FETCH => { + // The 12th version is a transitional version, and the decoding protocols of the previous versions are more different. + if api_version == 12 { + if payload.len() < fixed_offset + 1 + payload[fixed_offset] as usize { + return None; + } + return Some(Self::decode_compact_string( + &payload[fixed_offset..fixed_offset + 1 + payload[fixed_offset] as usize], + )); + } + let topic_len = read_u16_be(&payload[fixed_offset..fixed_offset + 2]); + if fixed_offset + 2 + topic_len as usize > payload.len() { + return None; + } + return Some( + String::from_utf8_lossy( + &payload[fixed_offset + 2..fixed_offset + 2 + topic_len as usize], + ) + .into_owned(), + ); + } + _ => { + return None; + } + } + } + + fn decode_compact_string(bytes: &[u8]) -> String { + let (size, bytes) = bytes.split_at(1); + let size = size[0] as usize; + + let content = &bytes[..size]; + let text = unsafe { from_utf8_unchecked(content) }; + + text.to_string() + } + /* reference: https://kafka.apache.org/protocol.html#protocol_messages @@ -513,17 +645,69 @@ impl KafkaLog { &mut self, api_key: u16, api_version: u16, - code: i16, + payload: &[u8], info: &mut KafkaInfo, + code: Option, ) { - if api_key == KAFKA_FETCH && api_version >= 7 { - info.status_code = Some(code as i32); + if let Some(code) = code { if code == 0 { info.status = L7ResponseStatus::Ok; + info.status_code = Some(code); } else { info.status = L7ResponseStatus::ServerError; self.perf_stats.as_mut().map(|p| p.inc_resp_err()); } + return; + } + let fixed_offset = kafka_apiversion_errcode_fixed_offset!(api_key, api_version); + if fixed_offset.is_none() { + return; + } + if payload.len() < fixed_offset.unwrap_or(0) { + return; + } + let mut fixed_offset = fixed_offset.unwrap(); + let mut topic_len = 0; + match api_key { + KAFKA_PRODUCE => { + if api_version <= 8 { + topic_len = read_i16_be(&payload[4..6]); + } + //Significant improvements since version 9, does not support now + if api_version == 9 { + return; + }; + } + KAFKA_FETCH => { + if api_version == 0 { + topic_len = read_i16_be(&payload[4..6]); + } else if api_version <= 6 { + topic_len = read_i16_be(&payload[8..10]); + } else if api_version >= 12 { + // The 12th version is a transitional version, and the decoding protocols of the previous versions are more different. + return; + } + } + _ => { + return; + } + } + if topic_len > 0 { + fixed_offset = fixed_offset + topic_len as usize + } + if fixed_offset + 2 > payload.len() { + return; + } + info.status_code = Some( + read_i16_be(&payload[fixed_offset..fixed_offset + 2]) + .try_into() + .unwrap(), + ); + if info.status_code == Some(0) { + info.status = L7ResponseStatus::Ok; + } else { + info.status = L7ResponseStatus::ServerError; + self.perf_stats.as_mut().map(|p| p.inc_resp_err()); } } }