@@ -25,78 +25,31 @@ impl VectorResultsProducer {
25
25
( Self { schema, sender } , worker)
26
26
}
27
27
28
- async fn send_batch (
29
- batch : Vec < Vec < u8 > > ,
30
- client : Client ,
31
- endpoint : String ,
32
- ) -> Result < ( ) , ExtractCodeError > {
33
- if batch. is_empty ( ) {
34
- return Ok ( ( ) ) ;
35
- }
36
-
37
- let body: String = batch
38
- . iter ( )
39
- . filter_map ( |json| String :: from_utf8 ( json. clone ( ) ) . ok ( ) )
40
- . map ( |s| s + "\n " )
41
- . collect ( ) ;
42
-
43
- // Log the exact payload for debugging
44
- tracing:: debug!( %body, "payload.sending" ) ;
45
- tracing:: debug!( size = body. len( ) , "request.sending_to_vector" ) ;
46
-
47
- let response = client
48
- . post ( & endpoint)
49
- . header ( "Content-Type" , "application/json" )
50
- . body ( body)
51
- . send ( )
52
- . await ;
53
-
54
- match response {
55
- Ok ( _) => {
56
- tracing:: debug!( "request.sent_successfully" ) ;
57
- Ok ( ( ) )
58
- }
59
- Err ( e) => {
60
- tracing:: error!( error = ?e, "request.failed_to_vector" ) ;
61
- Err ( ExtractCodeError :: VectorRequestStatusError (
62
- reqwest:: StatusCode :: INTERNAL_SERVER_ERROR ,
63
- ) )
64
- }
65
- }
66
- }
67
-
68
28
fn spawn_worker (
69
29
vector_batch_size : usize ,
70
30
client : Client ,
71
31
endpoint : String ,
72
32
mut receiver : UnboundedReceiver < Vec < u8 > > ,
73
33
) -> JoinHandle < ( ) > {
74
34
tokio:: spawn ( async move {
75
- tracing:: debug!( %endpoint, "worker.started" ) ;
76
35
let mut batch = Vec :: with_capacity ( vector_batch_size) ;
77
36
78
37
while let Some ( json) = receiver. recv ( ) . await {
79
- tracing:: debug!( size = json. len( ) , "event.received" ) ;
80
38
batch. push ( json) ;
81
- tracing:: debug!( size = batch. len( ) , "batch.size.updated" ) ;
82
39
83
40
if batch. len ( ) < vector_batch_size {
84
41
continue ;
85
42
}
86
43
87
- tracing:: debug!( size = batch. len( ) , "batch.sending" ) ;
88
44
let batch_to_send =
89
45
std:: mem:: replace ( & mut batch, Vec :: with_capacity ( vector_batch_size) ) ;
90
- if let Err ( e) =
91
- Self :: send_batch ( batch_to_send, client. clone ( ) , endpoint. clone ( ) ) . await
92
- {
93
- tracing:: error!( error = ?e, "batch.send_failed" ) ;
46
+ if let Err ( e) = send_batch ( batch_to_send, client. clone ( ) , endpoint. clone ( ) ) . await {
47
+ tracing:: error!( error = ?e, "vector_batch.send_failed" ) ;
94
48
}
95
49
}
96
50
97
51
if !batch. is_empty ( ) {
98
- tracing:: debug!( size = batch. len( ) , "final_batch.sending" ) ;
99
- if let Err ( e) = Self :: send_batch ( batch, client, endpoint) . await {
52
+ if let Err ( e) = send_batch ( batch, client, endpoint) . await {
100
53
tracing:: error!( error = ?e, "final_batch.send_failed" ) ;
101
54
}
102
55
}
@@ -114,15 +67,44 @@ impl ResultsProducer for VectorResultsProducer {
114
67
// Send the serialized result to the worker task
115
68
if self . sender . send ( json) . is_err ( ) {
116
69
tracing:: error!( "event.send_failed_channel_closed" ) ;
117
- return Err ( ExtractCodeError :: VectorRequestStatusError (
118
- reqwest:: StatusCode :: INTERNAL_SERVER_ERROR ,
119
- ) ) ;
70
+ return Err ( ExtractCodeError :: VectorError ) ;
120
71
}
121
72
122
73
Ok ( ( ) )
123
74
}
124
75
}
125
76
77
+ async fn send_batch (
78
+ batch : Vec < Vec < u8 > > ,
79
+ client : Client ,
80
+ endpoint : String ,
81
+ ) -> Result < ( ) , ExtractCodeError > {
82
+ if batch. is_empty ( ) {
83
+ return Ok ( ( ) ) ;
84
+ }
85
+
86
+ let body: String = batch
87
+ . iter ( )
88
+ . filter_map ( |json| String :: from_utf8 ( json. clone ( ) ) . ok ( ) )
89
+ . map ( |s| s + "\n " )
90
+ . collect ( ) ;
91
+
92
+ let response = client
93
+ . post ( & endpoint)
94
+ . header ( "Content-Type" , "application/json" )
95
+ . body ( body)
96
+ . send ( )
97
+ . await ;
98
+
99
+ match response {
100
+ Ok ( _) => Ok ( ( ) ) ,
101
+ Err ( e) => {
102
+ tracing:: error!( error = ?e, "request.failed_to_vector" ) ;
103
+ Err ( ExtractCodeError :: VectorError )
104
+ }
105
+ }
106
+ }
107
+
126
108
#[ cfg( test) ]
127
109
mod tests {
128
110
use super :: VectorResultsProducer ;
@@ -170,7 +152,6 @@ mod tests {
170
152
#[ tokio:: test]
171
153
async fn test_single_event ( ) {
172
154
let mock_server = MockServer :: start ( ) ;
173
- tracing:: debug!( "Mock server started at {}" , mock_server. url( "/" ) ) ;
174
155
let test_result = create_test_result ( ) ;
175
156
176
157
let mock = mock_server. mock ( |when, then| {
@@ -179,16 +160,11 @@ mod tests {
179
160
. header ( "Content-Type" , "application/json" )
180
161
. matches ( |req| {
181
162
if let Some ( body) = & req. body {
182
- tracing:: debug!(
183
- "Received request body: {:?}" ,
184
- String :: from_utf8_lossy( body)
185
- ) ;
186
163
let lines: Vec < _ > = body
187
164
. split ( |& b| b == b'\n' )
188
165
. filter ( |l| !l. is_empty ( ) )
189
166
. collect ( ) ;
190
167
if lines. len ( ) != 1 {
191
- tracing:: debug!( "Expected 1 line, got {}" , lines. len( ) ) ;
192
168
return false ;
193
169
}
194
170
if let Ok ( value) = serde_json:: from_slice :: < serde_json:: Value > ( lines[ 0 ] ) {
@@ -221,16 +197,14 @@ mod tests {
221
197
#[ tokio:: test]
222
198
async fn test_batch_events ( ) {
223
199
let mock_server = MockServer :: start ( ) ;
224
- tracing:: debug!( "Mock server started at {}" , mock_server. url( "/" ) ) ;
225
200
let ( producer, worker) = VectorResultsProducer :: new (
226
201
"uptime-results" ,
227
202
mock_server. url ( "/" ) . to_string ( ) ,
228
203
TEST_BATCH_SIZE ,
229
204
) ;
230
205
231
206
// Create and send BATCH_SIZE + 2 events
232
- for i in 0 ..( 10 + 2 ) {
233
- tracing:: debug!( "Sending event {}" , i + 1 ) ;
207
+ for _ in 0 ..( TEST_BATCH_SIZE + 2 ) {
234
208
let test_result = create_test_result ( ) ;
235
209
let result = producer. produce_checker_result ( & test_result) ;
236
210
assert ! ( result. is_ok( ) ) ;
@@ -243,17 +217,12 @@ mod tests {
243
217
. header ( "Content-Type" , "application/json" )
244
218
. matches ( |req| {
245
219
if let Some ( body) = & req. body {
246
- tracing:: debug!(
247
- "Received request body: {:?}" ,
248
- String :: from_utf8_lossy( body)
249
- ) ;
250
220
let lines: Vec < _ > = body
251
221
. split ( |& b| b == b'\n' )
252
222
. filter ( |l| !l. is_empty ( ) )
253
223
. collect ( ) ;
254
224
let len = lines. len ( ) ;
255
225
if len != 10 && len != 2 {
256
- tracing:: debug!( "Expected {} or 2 lines, got {}" , 10 , len) ;
257
226
return false ;
258
227
}
259
228
// Verify each line is valid JSON with expected fields
@@ -289,7 +258,6 @@ mod tests {
289
258
#[ tokio:: test]
290
259
async fn test_server_error ( ) {
291
260
let mock_server = MockServer :: start ( ) ;
292
- tracing:: debug!( "Mock server started at {}" , mock_server. url( "/" ) ) ;
293
261
let test_result = create_test_result ( ) ;
294
262
295
263
let error_mock = mock_server. mock ( |when, then| {
@@ -298,16 +266,11 @@ mod tests {
298
266
. header ( "Content-Type" , "application/json" )
299
267
. matches ( |req| {
300
268
if let Some ( body) = & req. body {
301
- tracing:: debug!(
302
- "Received request body: {:?}" ,
303
- String :: from_utf8_lossy( body)
304
- ) ;
305
269
let lines: Vec < _ > = body
306
270
. split ( |& b| b == b'\n' )
307
271
. filter ( |l| !l. is_empty ( ) )
308
272
. collect ( ) ;
309
273
if lines. len ( ) != 1 {
310
- tracing:: debug!( "Expected 1 line, got {}" , lines. len( ) ) ;
311
274
return false ;
312
275
}
313
276
if let Ok ( value) = serde_json:: from_slice :: < serde_json:: Value > ( lines[ 0 ] ) {
@@ -343,25 +306,19 @@ mod tests {
343
306
#[ tokio:: test]
344
307
async fn test_flush_on_shutdown ( ) {
345
308
let mock_server = MockServer :: start ( ) ;
346
- tracing:: debug!( "Mock server started at {}" , mock_server. url( "/" ) ) ;
347
309
// Create a mock that expects a single request with less than BATCH_SIZE events
348
310
let mock = mock_server. mock ( |when, then| {
349
311
when. method ( Method :: POST )
350
312
. path ( "/" )
351
313
. header ( "Content-Type" , "application/json" )
352
314
. matches ( |req| {
353
315
if let Some ( body) = & req. body {
354
- tracing:: debug!(
355
- "Received request body: {:?}" ,
356
- String :: from_utf8_lossy( body)
357
- ) ;
358
316
let lines: Vec < _ > = body
359
317
. split ( |& b| b == b'\n' )
360
318
. filter ( |l| !l. is_empty ( ) )
361
319
. collect ( ) ;
362
320
// We expect only 1 event, which is less than BATCH_SIZE
363
321
if lines. len ( ) != 1 {
364
- tracing:: debug!( "Expected 1 line, got {}" , lines. len( ) ) ;
365
322
return false ;
366
323
}
367
324
if let Ok ( value) = serde_json:: from_slice :: < serde_json:: Value > ( lines[ 0 ] ) {
0 commit comments