Skip to content

Commit

Permalink
Merge branch 'main' into distributed-insert
Browse files Browse the repository at this point in the history
  • Loading branch information
sundy-li authored May 23, 2024
2 parents f9224d7 + 3965f3a commit 8f1d9ba
Showing 1 changed file with 1 addition and 56 deletions.
57 changes: 1 addition & 56 deletions src/common/tracing/src/loggers.rs
Original file line number Diff line number Diff line change
Expand Up @@ -17,22 +17,15 @@ use std::fmt;
use std::io::BufWriter;
use std::path::Path;
use std::sync::Arc;
use std::sync::Mutex;
use std::time::Duration;
use std::time::SystemTime;

use databend_common_base::runtime::ThreadTracker;
use fern::FormatCallback;
use opentelemetry::global;
use opentelemetry::logs::AnyValue;
use opentelemetry::logs::LogError;
use opentelemetry::logs::LogResult;
use opentelemetry::logs::Severity;
use opentelemetry::InstrumentationLibrary;
use opentelemetry_otlp::WithExportConfig;
use opentelemetry_sdk::export::logs::LogData;
use opentelemetry_sdk::export::logs::LogExporter;
use opentelemetry_sdk::logs::LogProcessor;
use serde_json::Map;
use tracing_appender::non_blocking::NonBlocking;
use tracing_appender::non_blocking::WorkerGuard;
Expand Down Expand Up @@ -96,54 +89,6 @@ pub(crate) struct OpenTelemetryLogger {
provider: opentelemetry_sdk::logs::LoggerProvider,
}

#[derive(Debug)]
struct LogProcessorWarp<T: LogExporter>(Mutex<Box<T>>);

impl<T: LogExporter> LogProcessor for LogProcessorWarp<T> {
fn emit(&self, data: LogData) {
match self.0.lock() {
Ok(mut exporter) => {
let err = databend_common_base::runtime::block_on(
databend_common_base::base::tokio::time::timeout(
Duration::from_secs(20),
exporter.export(vec![data]),
),
);

match err {
Ok(Ok(_)) => {
// do nothing
}
Ok(Err(err)) => {
global::handle_error(err);
}
Err(_) => {
global::handle_error(LogError::Other("timeout with lock".into()));
}
}
}
Err(_) => {
global::handle_error(LogError::Other("simple logprocessor mutex poison".into()));
}
}
}

fn force_flush(&self) -> LogResult<()> {
Ok(())
}

fn shutdown(&mut self) -> LogResult<()> {
if let Ok(mut exporter) = self.0.lock() {
exporter.shutdown();
Ok(())
} else {
Err(LogError::Other(
"simple logprocessor mutex poison during shutdown".into(),
))
}
}
}

impl OpenTelemetryLogger {
pub(crate) fn new(
name: impl ToString,
Expand All @@ -168,7 +113,7 @@ impl OpenTelemetryLogger {
.build_log_exporter()
.expect("build log exporter");
let provider = opentelemetry_sdk::logs::LoggerProvider::builder()
.with_log_processor(LogProcessorWarp(Mutex::new(Box::new(exporter))))
.with_simple_exporter(exporter)
.with_config(
opentelemetry_sdk::logs::Config::default()
.with_resource(opentelemetry_sdk::Resource::new(kvs)),
Expand Down

0 comments on commit 8f1d9ba

Please sign in to comment.