From 099e4ab74e787cfd338ac3e60a5a595aa0dbe45c Mon Sep 17 00:00:00 2001 From: yuanchao Date: Fri, 13 Oct 2023 19:55:58 +0800 Subject: [PATCH] [Agent] Fix mongo log request resource is null #22509 --- .../test/flow_generator/mongo/mongo.pcap | Bin 0 -> 15158 bytes .../test/flow_generator/mongo/mongo.result | 30 ++++ .../flow_generator/protocol_logs/sql/mongo.rs | 144 +++++++++++++++--- 3 files changed, 154 insertions(+), 20 deletions(-) create mode 100644 agent/resources/test/flow_generator/mongo/mongo.pcap create mode 100644 agent/resources/test/flow_generator/mongo/mongo.result diff --git a/agent/resources/test/flow_generator/mongo/mongo.pcap b/agent/resources/test/flow_generator/mongo/mongo.pcap new file mode 100644 index 0000000000000000000000000000000000000000..d5be4b29127d9e970350c0a89c580760a3e516ff GIT binary patch literal 15158 zcmeHO3ve98neKfU8?YT2Oe|wUO@wnYwpQ9*JuC~$_G-0j$(D9)?MgBRBW8DIS7Xi2 ztY>ERFckvHT@tu5PUQgeGByw62bkE#F&N@19?2y~D(A|bO+g|^6;*`GaiJ)ZkSpRN z_x-)Ak$0sNT#Cz4g+|q@o$j8mzyANfyZ^s?_RLGi9&$1_8^7Glft!yORLHN^yVyE> zr}W_$R~_rE!RLR@ymb}}F}AX@X$@O*!MQJ7IKG} z?it73&RG|A|Mnl^l<#o4W)kvO8j6sCna9r~Ge*c)ts_)IS8hA-z8XIHYoP0UFWDw6 zcxW7X-h4So=wryK)0wG2%HAZs9sJGyg@DGKyBcIGK_7J=Hjik21yV6irJu zxI$cJR!-(r(NYHGK+fuyHA_i~mdv)JdYLJjYRnQ1i`R8#`eju`P02tv4R(!PsT|+NDz$46iYBT;Fpup-Rg*ADo^{oA)&6=w z=qB4ik6R;InX~RtwA^r^&99&&2wu^cz_SHIR%OwYh2_;Ns{LN?3Smi5H{`3CQ#6wO ziX|tloFVfDh8r9D8tRzc=v9DP4?eOg)K>ev)n1R!*Wj&g^j7<-Wp#OVp+0JnHT?+I zbe??{TYlZ>qhw9t!zGsg_WqqG24J7>EVwssO|Z>YSbaAkzSQhuH^V+D<;UmvSaN1Z z_Yq=ic2dN0_RhkyHe!enR{~`vxY9+2IE#N9|De2b)R_h2Dj28B(zCj%r$>4UY9Pa! znKksJY?|#7lPfNLHvCfkhf9Pbqn9N09BFSHuq-1Ex0t#XO>BjbW6E}!${dv4iRZ<% zOnEIMmTWSnjykEJ*MLS^L@U|P>+k)hgI%cVNl}d}uqXFP&z!%f@dHkx&|rP138C!?BhpoqC;NS)7TNIF*a-r zWj3`+iH44dzgyp|rQ(Uf!E}|`Y^_^Auy$Z;?S^eZ#q3oFgH}zOe^W!SN9%0V!ad!? z8!~!Jpt-q9Y4+87yMEQ0{5!vRwmdPQhJSDs?E78$ z^}IF05_@6eA+qmN9d5P=-KJDJ|2pPWt|G)gHBE|G&fZPDY$HBEh_8Vv$&jwsF<U&x{LSlC=*4U^w zW|M8Hts9s1Yz((c#&5AWW+g+%3yxYHDgT(y7a)2+9Zv)`3po);CwbUPIAz^y=OMj{c}qUy)ZWRzx=*km>8vBBnabe$U7 zWcZf%YTeex#;w`fny&Z?wNYUI69$Xud)ZnK-+g{LS@(CfEG_jA4Wmgx0S=yf0I zb-{nwdcA`L5d+F9B%*CoZo>zJ>afCdh1L^Z=KcR3fuKLS}DrnY~dYv%MjptVSYAHk~TmI7jpxy98gb^4HUn z@2hQWs;l+am&}cS?(+POXkB@Wn|&SCDNT@)RoTL&bh<=JZ>r%_D@jU^Kr96*9V98O z0m_)~ZquqnrX!J4R7ufNI&SECZ*{4ln!Mf%iP4*2v;{LErBXr7jE){8Myq!e3+mL$ z{X3tjAVFPu+!oX>7}vo?Kv{$Kh?ULAiA>vH!uK&J&6!rLv6c2}WY@~R#$;WxzQJ49 zC`l>5UuX^~{~orJRaYd|M}TM%B;l)E|;E_HPcktjks|dIZacvG!t+WML{gqA^56c6|yI#WKF^66T)&r z@Oyo=%$d|PSw%&YpQJW(afODoLB-Iu40+Ux(08Y#m=AH$ALD6akYikntmrf>|1bhff)&_%D3yszF)qqPTQ>vIYSb!i)kRR7-mj5FGV2Wm;l#0A0{2w9pL?LJjIaNg)$bwAWcvP9}pg8th`?|`@wER zm7CLAE?1FSJCXv3Os+ ztt%90?du73#oD70p}sg9?Z*0A1MyJ2y)#rOid`FvhdTQr(Z1HuhOST$rLBdc&Op44 z3Ob^ZwSBG8?v{>FAI}SSM}l$S#|jlgkpPtiVzE$H95~?L*A|Gtx$P>{=!mW*q*xnh zQ%PSW&>0$goC--K5%j+Ya=12zobw*+ZI8D_yY0@weeMzhEwK&~LJ-Y%w6BeXTKfXQ zU?>LnrYxaYCSQhTNpd2WM$Y6>rrX5WLx`DQ9!%B&#UXB^B`Tq$(~3+X8->7_D+ge?zr^H=WN8o za42?VfWp^Q{I{2H+PZVX^u&F4A7rK8$L2X#yh8L7JKby;UKOQM@58lq^l_peEZM#IVK2Ooo#cI7e%kgv z?uYkr7~6_0fcB{iHvKOlv&_;l^<86%lyiNYbCZDf*h1b8#+X&~W;;U5^CMxw-R0%>w65oPRjOb_w zzgMJ+Ccoe3_o5J`)-cLxb`>>A#hNW^6;%}(GmYFplJ-dizN+e~pqPTDTS9)?k%SbM zGHmtTadE>{*p#~mV%>ds>tI1?);9_VdE&7_(*PlzvSq#&fF{z+W zm7+Q%j+m9zLa!{CxpW#m6tM7O;YK+tPtzTRQ1d!#{ZewzTFjVz|O0cLcVyI|&q8 zkm<6uKu1#f7?%feyb#9p7nsi6m#=UT>D%s~G7LUUq*vcoY&t)A0j85G349=WFE2Hv1QR*8}TT7jt3Q>P}onG zy?k%LjRRqDFe6bQ8O>skA%FcZ91rp(3$ITckFM?^>No9iv)ABNQkvkj{&e(vL_KtI zvFCAm6rRUv@;nxL^VC6m4?K^fyMWS)Mu@&m{bppkv28V&Klt(TAiXAM8hqb3zaLDo zA0lPuNsaAt)W}w{uvuJ9F(tOU5s+asm3^2gztMHmYr>G&mDxM zZ&)KeBPBM7RzLd!)yCFrjjAN_?3rt;o}(8=c-gE%5J{&;F2tOcoS}`4P7Z-+LecUs zs8Do~!UjE?g-wD$7TftaeRpQ5i7V_T^z_(n`k0_-M*$i+|Ag!rEAlug+I7IqZbc!b z2{vk0wsRZ(rxL3_t%iSY!Y?*2xZ1YSr%2g!90gaIU$&zE2Dg*=#eDaLI+PEW{&~|i z-zGx;0zzBhqEafgJRJQTAwsLaS!{VfH}~&+c5aE!`!Fo;De`$kXoE=Egrr+waOpZD|W^WJl?`j@oWe(lhdKJPkOY|qUsUj3h)tA<~_13vF}TH$LJeBR?& z{U0X8pFQDb-^F4}X@WoTx0UO;KXGDG#B#@Xr00tf$-TV@i7tEjUXGhjvS(Z3VEm(b zq>dO~e|XCM*ki;nG*Ikdym|*5j2lY!V;_Noah&#J!yrT~ZHAxm{n*6+<9_VyUn>%6 z;r3&0c0XiBX@Z^XsO;c&@T9pz0`5{n2qZdmdPnN9eti%V)2T)HiAdfX+1E60{?HOayy5cM zQ+XpB_{4N{Qp9q{DQ53(?7M~#$s3u2M3+6X|3YtMv3C2t#BkA~*;Abkf1enJ`itkD z*9S58SjFDR&{E7j*N`{T4tB)SX80NRMt<}1U-hRk~%YYuxDTA(z+zqzxri(B5FNfFB(XWsUW+452R z=5J`}jX=S6Fu!cQk{98|4q)N8L?)w+HkbEfUV57F*3+Y}Jl9N&58gM2sX(AK!Ogj0 z^aL>um%OL&#vxeZ8M4F$FXb7}oBs{`3hZe2QlP9uBGxuzdQGiJ{%?Q#If2@K|Nc2s zjr _HEADER_SIZE => { + _OP_MSG if payload.len() > _MSG_DOC_SECTION_OFFSET => { // OP_MSG let mut msg_body = MongoOpMsg::default(); - msg_body.decode(&payload[_HEADER_SIZE..])?; + // TODO: Message Flags + msg_body.decode(&payload[_MSG_DOC_SECTION_OFFSET..])?; match info.msg_type { LogMessageType::Response => { - info.response = msg_body.sections.doc.to_string(); - info.exception = msg_body.sections.c_string.unwrap_or(_UNKNOWN.to_string()); - info.response_code = - msg_body.sections.doc.get_f64("code").unwrap_or(0.0) as i32; + // The data structure of doc is Bson, which is a normal response when there is no errmsg in it + if msg_body.sections.doc.get_str("errmsg").is_err() { + info.response = msg_body.sections.doc.to_string(); + } else { + info.exception = + msg_body.sections.doc.get_str("errmsg").unwrap().to_string(); + } + if info.exception.len() == 0 { + info.exception = + msg_body.sections.c_string.unwrap_or(_UNKNOWN.to_string()); + } + info.response_code = msg_body.sections.doc.get_i32("code").unwrap_or(0); if info.response_code > 0 { self.perf_stats.as_mut().map(|p| p.inc_resp_err()); } @@ -315,13 +327,16 @@ impl MongoDBLog { } _OP_QUERY if payload.len() > 28 => { // "OP_QUERY" - info.exception = CStr::from_bytes_until_nul(&payload[..20]) - .map_err(|_| Error::L7ProtocolUnknown)? - .to_string_lossy() - .into_owned(); + let collection_name = + CStr::from_bytes_until_nul(&payload[_COLLECTION_NAME_OFFSET..]) + .map_err(|_| Error::L7ProtocolUnknown)? + .to_string_lossy() + .into_owned(); - let query = Document::from_reader(&payload[28 + info.exception.len() + 1..]) - .unwrap_or(Document::default()); + let query = Document::from_reader( + &payload[_QUERY_DOC_OFFSET + collection_name.len() + 1..], + ) + .unwrap_or(Document::default()); info.request = query.to_string(); } _OP_GET_MORE | _OP_DELETE if payload.len() > 20 => { @@ -415,18 +430,24 @@ pub struct MongoOpMsg { } impl MongoOpMsg { + const _KIND_OFFSET: usize = 0; + const _KIND_LEN: usize = 1; + const _DOC_LENGTH_OFFSET: usize = Self::_KIND_OFFSET + Self::_KIND_LEN; + const _DOC_LENGTH_LEN: usize = 4; + fn decode(&mut self, payload: &[u8]) -> Result { - if payload.len() < 9 { + if payload.len() < Self::_DOC_LENGTH_OFFSET + Self::_DOC_LENGTH_LEN { return Ok(false); } - // todo: decode flag let mut sections = Sections::default(); //sections.kind = payload[4]; - let section_len = bytes::read_u32_le(&payload[5..9]); - if payload.len() < 4 + section_len as usize { + let section_len = bytes::read_u32_le( + &payload[Self::_DOC_LENGTH_OFFSET..Self::_DOC_LENGTH_OFFSET + Self::_DOC_LENGTH_LEN], + ); + if payload.len() < Self::_DOC_LENGTH_LEN + section_len as usize { return Ok(false); } - let _ = sections.decode(&payload[4..]); + let _ = sections.decode(&payload); self.sections = sections; // todo: decode checksum Ok(true) @@ -455,8 +476,9 @@ impl Sections { 0 => { // Body self.kind_name = "BODY".to_string(); - let lenght = bytes::read_u32_le(&payload[1..5]); - if lenght != payload.len() as u32 - 1 { + let length = bytes::read_u32_le(&payload[1..5]); + // TODO: When ChecksumPresent is 1, there will be checksum in the payload + if length != payload.len() as u32 - 1 && length != payload.len() as u32 - 5 { return Ok(false); } self.doc = Document::from_reader(&payload[1..]).unwrap_or(Document::default()); @@ -567,3 +589,85 @@ pub struct MongoOpUpdate { zero: u32, } */ + +#[cfg(test)] +mod tests { + use std::path::Path; + use std::rc::Rc; + use std::{cell::RefCell, fs}; + + use super::*; + + use crate::{ + common::{flow::PacketDirection, l7_protocol_log::L7PerfCache, MetaPacket}, + flow_generator::L7_RRT_CACHE_CAPACITY, + utils::test::Capture, + }; + + const FILE_DIR: &str = "resources/test/flow_generator/mongo"; + + fn run(name: &str) -> String { + let capture = Capture::load_pcap(Path::new(FILE_DIR).join(name), None); + let log_cache = Rc::new(RefCell::new(L7PerfCache::new(L7_RRT_CACHE_CAPACITY))); + let mut packets = capture.as_meta_packets(); + if packets.is_empty() { + return "".to_string(); + } + + let mut output: String = String::new(); + let first_dst_port = packets[0].lookup_key.dst_port; + for packet in packets.iter_mut() { + packet.lookup_key.direction = if packet.lookup_key.dst_port == first_dst_port { + PacketDirection::ClientToServer + } else { + PacketDirection::ServerToClient + }; + let payload = match packet.get_l4_payload() { + Some(p) => p, + None => continue, + }; + + let mut mongo = MongoDBLog::default(); + let param = &ParseParam::new(packet as &MetaPacket, log_cache.clone(), true, true); + + let is_mongo = mongo.check_payload(payload, param); + let info = mongo.parse_payload(payload, param); + if let Ok(info) = info { + match info.unwrap_single() { + L7ProtocolInfo::MongoDBInfo(i) => { + output.push_str(&format!("{:?} is_mongo: {}\r\n", i, is_mongo)); + } + _ => unreachable!(), + } + } else { + output.push_str(&format!( + "{:?} is_mongo: {}\r\n", + MongoDBInfo::default(), + is_mongo + )); + } + } + output + } + + #[test] + fn check() { + let files = vec![("mongo.pcap", "mongo.result")]; + + for item in files.iter() { + let expected = fs::read_to_string(&Path::new(FILE_DIR).join(item.1)).unwrap(); + let output = run(item.0); + + if output != expected { + let output_path = Path::new("actual.txt"); + fs::write(&output_path, &output).unwrap(); + assert!( + output == expected, + "output different from expected {}, written to {:?}", + item.1, + output_path + ); + } + } + } +}