Skip to content

Commit 3f327a1

Browse files
izquierdocijothomaslalitb
authored
Fix error message caused by race condition when using PeriodicReader (#1481)
Co-authored-by: Cijo Thomas <[email protected]> Co-authored-by: Lalit Kumar Bhasin <[email protected]>
1 parent c60a178 commit 3f327a1

File tree

2 files changed

+80
-42
lines changed

2 files changed

+80
-42
lines changed

opentelemetry-sdk/CHANGELOG.md

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -46,6 +46,10 @@
4646
- `LoggerProviderInner` is no longer `pub (crate)`
4747
- `Logger.provider()` now returns `&LoggerProvider` instead of an `Option<LoggerProvider>`
4848

49+
### Fixed
50+
51+
- [#1481](https://github.com/open-telemetry/opentelemetry-rust/pull/1481) Fix error message caused by race condition when using PeriodicReader
52+
4953
## v0.21.2
5054

5155
### Fixed

opentelemetry-sdk/src/metrics/periodic_reader.rs

Lines changed: 76 additions & 42 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
use std::{
2-
env, fmt,
2+
env, fmt, mem,
33
sync::{Arc, Mutex, Weak},
44
time::Duration,
55
};
@@ -127,37 +127,39 @@ where
127127
/// Create a [PeriodicReader] with the given config.
128128
pub fn build(self) -> PeriodicReader {
129129
let (message_sender, message_receiver) = mpsc::channel(256);
130-
let ticker = self
131-
.runtime
132-
.interval(self.interval)
133-
.map(|_| Message::Export);
134130

135-
let messages = Box::pin(stream::select(message_receiver, ticker));
136-
let reader = PeriodicReader {
131+
let worker = move |reader: &PeriodicReader| {
132+
let ticker = self
133+
.runtime
134+
.interval(self.interval)
135+
.map(|_| Message::Export);
136+
137+
let messages = Box::pin(stream::select(message_receiver, ticker));
138+
139+
let runtime = self.runtime.clone();
140+
self.runtime.spawn(Box::pin(
141+
PeriodicReaderWorker {
142+
reader: reader.clone(),
143+
timeout: self.timeout,
144+
runtime,
145+
rm: ResourceMetrics {
146+
resource: Resource::empty(),
147+
scope_metrics: Vec::new(),
148+
},
149+
}
150+
.run(messages),
151+
));
152+
};
153+
154+
PeriodicReader {
137155
exporter: Arc::new(self.exporter),
138156
inner: Arc::new(Mutex::new(PeriodicReaderInner {
139157
message_sender,
140-
sdk_producer: None,
141158
is_shutdown: false,
142159
external_producers: self.producers,
160+
sdk_producer_or_worker: ProducerOrWorker::Worker(Box::new(worker)),
143161
})),
144-
};
145-
146-
let runtime = self.runtime.clone();
147-
self.runtime.spawn(Box::pin(
148-
PeriodicReaderWorker {
149-
reader: reader.clone(),
150-
timeout: self.timeout,
151-
runtime,
152-
rm: ResourceMetrics {
153-
resource: Resource::empty(),
154-
scope_metrics: Vec::new(),
155-
},
156-
}
157-
.run(messages),
158-
));
159-
160-
reader
162+
}
161163
}
162164
}
163165

@@ -223,9 +225,9 @@ impl fmt::Debug for PeriodicReader {
223225

224226
struct PeriodicReaderInner {
225227
message_sender: mpsc::Sender<Message>,
226-
sdk_producer: Option<Weak<dyn SdkProducer>>,
227228
is_shutdown: bool,
228229
external_producers: Vec<Box<dyn MetricProducer>>,
230+
sdk_producer_or_worker: ProducerOrWorker,
229231
}
230232

231233
#[derive(Debug)]
@@ -235,6 +237,11 @@ enum Message {
235237
Shutdown(oneshot::Sender<Result<()>>),
236238
}
237239

240+
enum ProducerOrWorker {
241+
Producer(Weak<dyn SdkProducer>),
242+
Worker(Box<dyn FnOnce(&PeriodicReader) + Send + Sync>),
243+
}
244+
238245
struct PeriodicReaderWorker<RT: Runtime> {
239246
reader: PeriodicReader,
240247
timeout: Duration,
@@ -311,14 +318,19 @@ impl MetricReader for PeriodicReader {
311318
Err(_) => return,
312319
};
313320

314-
// Only register once. If producer is already set, do nothing.
315-
if inner.sdk_producer.is_none() {
316-
inner.sdk_producer = Some(pipeline);
317-
} else {
318-
global::handle_error(MetricsError::Other(
319-
"duplicate meter registration, did not register manual reader".into(),
320-
))
321-
}
321+
let worker = match &mut inner.sdk_producer_or_worker {
322+
ProducerOrWorker::Producer(_) => {
323+
// Only register once. If producer is already set, do nothing.
324+
global::handle_error(MetricsError::Other(
325+
"duplicate meter registration, did not register manual reader".into(),
326+
));
327+
return;
328+
}
329+
ProducerOrWorker::Worker(w) => mem::replace(w, Box::new(|_| {})),
330+
};
331+
332+
inner.sdk_producer_or_worker = ProducerOrWorker::Producer(pipeline);
333+
worker(self);
322334
}
323335

324336
fn collect(&self, rm: &mut ResourceMetrics) -> Result<()> {
@@ -327,14 +339,14 @@ impl MetricReader for PeriodicReader {
327339
return Err(MetricsError::Other("reader is shut down".into()));
328340
}
329341

330-
match &inner.sdk_producer.as_ref().and_then(|w| w.upgrade()) {
331-
Some(producer) => producer.produce(rm)?,
332-
None => {
333-
return Err(MetricsError::Other(
334-
"reader is shut down or not registered".into(),
335-
))
336-
}
337-
};
342+
if let Some(producer) = match &inner.sdk_producer_or_worker {
343+
ProducerOrWorker::Producer(sdk_producer) => sdk_producer.upgrade(),
344+
ProducerOrWorker::Worker(_) => None,
345+
} {
346+
producer.produce(rm)?;
347+
} else {
348+
return Err(MetricsError::Other("reader is not registered".into()));
349+
}
338350

339351
let mut errs = vec![];
340352
for producer in &inner.external_producers {
@@ -392,3 +404,25 @@ impl MetricReader for PeriodicReader {
392404
shutdown_result
393405
}
394406
}
407+
408+
#[cfg(all(test, feature = "testing"))]
409+
mod tests {
410+
use super::PeriodicReader;
411+
use crate::{
412+
metrics::data::ResourceMetrics, metrics::reader::MetricReader, runtime,
413+
testing::metrics::InMemoryMetricsExporter, Resource,
414+
};
415+
416+
#[tokio::test(flavor = "multi_thread", worker_threads = 1)]
417+
async fn unregistered_collect() {
418+
let exporter = InMemoryMetricsExporter::default();
419+
let reader = PeriodicReader::builder(exporter.clone(), runtime::Tokio).build();
420+
421+
let mut rm = ResourceMetrics {
422+
resource: Resource::empty(),
423+
scope_metrics: Vec::new(),
424+
};
425+
let result = reader.collect(&mut rm);
426+
result.expect_err("error expected when reader is not registered");
427+
}
428+
}

0 commit comments

Comments
 (0)