Skip to content

Commit

Permalink
Refactor send and send_deser_ser.
Browse files Browse the repository at this point in the history
  • Loading branch information
hoolioh committed Feb 28, 2025
1 parent 5efae72 commit e40eb0d
Showing 1 changed file with 23 additions and 21 deletions.
44 changes: 23 additions & 21 deletions data-pipeline/src/trace_exporter/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -222,10 +222,17 @@ impl TraceExporter {
trace_count: usize,
) -> Result<AgentResponse, TraceExporterError> {
self.check_agent_info();

match self.input_format {
TraceExporterInputFormat::Proxy => self.send_proxy(data.as_ref(), trace_count),
TraceExporterInputFormat::V04 => self.send_deser_ser(data),
TraceExporterInputFormat::V05 => self.send_deser_ser(data),
TraceExporterInputFormat::V04 => match msgpack_decoder::v04::from_slice(data) {
Ok((traces, _)) => self.send_deser_ser(TraceCollection::TraceChunk(traces)),
Err(e) => Err(TraceExporterError::Deserialization(e)),
},
TraceExporterInputFormat::V05 => match msgpack_decoder::v05::from_slice(data) {
Ok((traces, _)) => self.send_deser_ser(TraceCollection::TraceChunk(traces)),
Err(e) => Err(TraceExporterError::Deserialization(e)),
},
}
.and_then(|res| {
if res.is_empty() {
Expand All @@ -236,6 +243,16 @@ impl TraceExporter {

Ok(AgentResponse::from(res))
})
.map_err(|err| {
if let TraceExporterError::Deserialization(ref e) = err {
error!("Error deserializing trace from request body: {e}");
self.emit_metric(
HealthMetric::Count(health_metrics::STAT_DESER_TRACES_ERRORS, 1),
None,
);
}
err
})
}

/// Safely shutdown the TraceExporter and all related tasks
Expand Down Expand Up @@ -551,25 +568,10 @@ impl TraceExporter {
}
}

fn send_deser_ser(&self, data: tinybytes::Bytes) -> Result<String, TraceExporterError> {
let result = match self.input_format {
TraceExporterInputFormat::V04 => msgpack_decoder::v04::from_slice(data),
TraceExporterInputFormat::V05 => msgpack_decoder::v05::from_slice(data),
TraceExporterInputFormat::Proxy => todo!("Proxy not implemented"),
};

let (mut collection, _) = match result {
Ok((traces, size)) => (TraceCollection::TraceChunk(traces), size),
Err(err) => {
error!("Error deserializing trace from request body: {err}");
self.emit_metric(
HealthMetric::Count(health_metrics::STAT_DESER_TRACES_ERRORS, 1),
None,
);
return Err(TraceExporterError::Deserialization(err));
}
};

fn send_deser_ser(
&self,
mut collection: TraceCollection,
) -> Result<String, TraceExporterError> {
self.emit_metric(
HealthMetric::Count(health_metrics::STAT_DESER_TRACES, collection.len() as i64),
None,
Expand Down

0 comments on commit e40eb0d

Please sign in to comment.