feat: log errors on panic in sort_dedupe_persist (#25678)
This adds some error handling and logging around the method that sorts,
deduplicates, and persists parquet data during the snapshot process.

The errors will need to be handled in follow-on work; for now, this helps
with debugging fatal errors during the process.
hiltontj authored Dec 18, 2024
1 parent d265e11 commit 93222f7
Showing 1 changed file with 61 additions and 15 deletions.
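In short, the change replaces bare `.unwrap()` calls with `anyhow::Context` error wrapping, logs failures through `tracing`'s `error!` macro, and then still panics via `.expect` until the linked follow-on issues are resolved. A minimal standalone sketch of that log-then-panic pattern (the file path and function name are hypothetical, and `eprintln!` stands in for `tracing`):

```rust
use anyhow::{Context, Result};

// Hypothetical stand-in for the fallible sort/dedupe/persist step.
fn load_chunk(id: u64) -> Result<Vec<u8>> {
    std::fs::read(format!("/tmp/chunk-{id}.parquet"))
        .with_context(|| format!("failed to read chunk {id} from disk"))
}

fn main() {
    // Same shape as the commit: log the error first, then panic anyway,
    // so a fatal failure leaves a useful trace behind.
    let bytes = load_chunk(42)
        .inspect_err(|error| eprintln!("error: {error}, debug: {error:?}"))
        .expect("load chunk");
    println!("read {} bytes", bytes.len());
}
```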
76 changes: 61 additions & 15 deletions influxdb3_write/src/write_buffer/queryable_buffer.rs
@@ -5,6 +5,7 @@ use crate::write_buffer::persisted_files::PersistedFiles;
 use crate::write_buffer::table_buffer::TableBuffer;
 use crate::write_buffer::PluginEvent;
 use crate::{ParquetFile, ParquetFileId, PersistedSnapshot};
+use anyhow::Context;
 use arrow::record_batch::RecordBatch;
 use async_trait::async_trait;
 use data_types::{
@@ -291,22 +292,38 @@ impl QueryableBuffer {
             let min_time = persist_job.timestamp_min_max.min;
             let max_time = persist_job.timestamp_min_max.max;

-            let (size_bytes, meta, cache_notifier) = sort_dedupe_persist(
+            let SortDedupePersistSummary {
+                file_size_bytes,
+                file_meta_data,
+                parquet_cache_rx,
+            } = sort_dedupe_persist(
                 persist_job,
                 Arc::clone(&persister),
                 Arc::clone(&executor),
                 parquet_cache.clone(),
             )
-            .await;
-            cache_notifiers.push(cache_notifier);
+            .await
+            .inspect_err(|error| {
+                error!(
+                    %error,
+                    debug = ?error,
+                    "error during sort, deduplicate, and persist of buffer data as parquet"
+                );
+            })
+            // for now, we are still panicking in this case, see:
+            // https://github.com/influxdata/influxdb/issues/25676
+            // https://github.com/influxdata/influxdb/issues/25677
+            .expect("sort, deduplicate, and persist buffer data as parquet");
+
+            cache_notifiers.push(parquet_cache_rx);
             persisted_snapshot.add_parquet_file(
                 database_id,
                 table_id,
                 ParquetFile {
                     id: ParquetFileId::new(),
                     path,
-                    size_bytes,
-                    row_count: meta.num_rows as u64,
+                    size_bytes: file_size_bytes,
+                    row_count: file_meta_data.num_rows as u64,
                     chunk_time,
                     min_time,
                     max_time,
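The `error!(%error, debug = ?error, ...)` call above records the error in two forms deliberately: the `%` sigil captures `Display`, which for an `anyhow::Error` is only the outermost context message, while `?` captures `Debug`, which prints the whole cause chain. A small sketch of the difference (the error strings are made up):

```rust
use anyhow::{Context, Result};

fn persist() -> Result<()> {
    // A made-up root cause wrapped in one layer of context.
    Err(std::io::Error::other("connection refused"))
        .context("failed to persist parquet file to object store")
}

fn main() {
    let error = persist().unwrap_err();
    println!("{error}");   // Display: the outermost context only
    println!("{error:?}"); // Debug: full chain, including the io::Error
}
```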
@@ -566,12 +583,31 @@ struct PersistJob {
     sort_key: SortKey,
 }

+pub(crate) struct SortDedupePersistSummary {
+    pub file_size_bytes: u64,
+    pub file_meta_data: FileMetaData,
+    pub parquet_cache_rx: Option<oneshot::Receiver<()>>,
+}
+
+impl SortDedupePersistSummary {
+    fn new(
+        file_size_bytes: u64,
+        file_meta_data: FileMetaData,
+        parquet_cache_rx: Option<oneshot::Receiver<()>>,
+    ) -> Self {
+        Self {
+            file_size_bytes,
+            file_meta_data,
+            parquet_cache_rx,
+        }
+    }
+}
 async fn sort_dedupe_persist(
     persist_job: PersistJob,
     persister: Arc<Persister>,
     executor: Arc<Executor>,
     parquet_cache: Option<Arc<dyn ParquetCacheOracle>>,
-) -> (u64, FileMetaData, Option<oneshot::Receiver<()>>) {
+) -> Result<SortDedupePersistSummary, anyhow::Error> {
     // Dedupe and sort using the COMPACT query built into
     // iox_query
     let row_count = persist_job.batch.num_rows();
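`parquet_cache_rx` in the new summary struct is a `tokio::sync::oneshot::Receiver` that fires once the cache has been populated with the freshly persisted file, and it is `None` when no parquet cache is configured. A hedged sketch of how a caller might await it (the function name is hypothetical):

```rust
use tokio::sync::oneshot;

// Hypothetical consumer of the notifier carried in the summary.
async fn wait_for_cache(parquet_cache_rx: Option<oneshot::Receiver<()>>) {
    if let Some(rx) = parquet_cache_rx {
        // An Err means the sender was dropped without signalling;
        // we treat that the same as "nothing to wait for".
        let _ = rx.await;
    }
}

#[tokio::main]
async fn main() {
    let (tx, rx) = oneshot::channel();
    tx.send(()).unwrap();
    wait_for_cache(Some(rx)).await; // resolves immediately
    wait_for_cache(None).await;     // caching disabled: no-op
}
```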
@@ -617,13 +653,20 @@ async fn sort_dedupe_persist(
         chunks,
         persist_job.sort_key,
     )
-    .unwrap();
+    .context(
+        "failed to produce a logical plan to deduplicate and sort chunked data from the buffer",
+    )?;

     // Build physical plan
-    let physical_plan = ctx.create_physical_plan(&logical_plan).await.unwrap();
+    let physical_plan = ctx.create_physical_plan(&logical_plan).await.context(
+        "failed to produce a physical plan to deduplicate and sort chunked data from the buffer",
+    )?;

     // Execute the plan and return compacted record batches
-    let data = ctx.collect(physical_plan).await.unwrap();
+    let data = ctx
+        .collect(physical_plan)
+        .await
+        .context("failed to execute the sort and deduplication of chunked data from the buffer")?;

     // keep attempting to persist forever. If we can't reach the object store, we'll stop accepting
     // writes elsewhere in the system, so we need to keep trying to persist.
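These three steps follow the usual DataFusion lifecycle: build a logical plan, lower it to a physical plan, then execute it into `RecordBatch`es. The real code goes through the COMPACT plan built into `iox_query`; the sketch below shows the same lifecycle with plain DataFusion, with `DISTINCT ... ORDER BY` standing in for dedupe-and-sort (APIs as of recent DataFusion releases):

```rust
use datafusion::prelude::*;

#[tokio::main]
async fn main() -> datafusion::error::Result<()> {
    let ctx = SessionContext::new();
    // DISTINCT + ORDER BY stands in for the dedupe-and-sort that the
    // COMPACT plan performs over buffered chunks.
    let df = ctx
        .sql("SELECT DISTINCT column1 FROM (VALUES (2), (1), (2)) AS t ORDER BY column1")
        .await?;

    // Logical plan -> physical plan -> record batches, mirroring the
    // three calls in the diff above.
    let logical_plan = df.logical_plan().clone();
    let physical_plan = ctx.state().create_physical_plan(&logical_plan).await?;
    let batches = datafusion::physical_plan::collect(physical_plan, ctx.task_ctx()).await?;
    println!("{batches:?}");
    Ok(())
}
```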
@@ -636,14 +679,17 @@
     {
         Ok((size_bytes, meta)) => {
             info!("Persisted parquet file: {}", persist_job.path.to_string());
-            if let Some(pq) = parquet_cache {
+            let parquet_cache_rx = parquet_cache.map(|parquet_cache_oracle| {
                 let (cache_request, cache_notify_rx) =
                     CacheRequest::create(Path::from(persist_job.path.to_string()));
-                pq.register(cache_request);
-                return (size_bytes, meta, Some(cache_notify_rx));
-            } else {
-                return (size_bytes, meta, None);
-            }
+                parquet_cache_oracle.register(cache_request);
+                cache_notify_rx
+            });
+            return Ok(SortDedupePersistSummary::new(
+                size_bytes,
+                meta,
+                parquet_cache_rx,
+            ));
         }
         Err(e) => {
             error!(
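The comment about persisting forever refers to a retry loop around the persister call; the `Ok`/`Err` match above is one iteration of it. A rough sketch of that shape, with a made-up operation and a fixed one-second backoff (the real code's backoff policy may differ):

```rust
use std::time::Duration;

// Retry an async operation until it succeeds, logging each failure.
async fn retry_forever<T, E, Fut>(mut op: impl FnMut() -> Fut) -> T
where
    Fut: std::future::Future<Output = Result<T, E>>,
    E: std::fmt::Debug,
{
    loop {
        match op().await {
            Ok(value) => return value,
            Err(error) => {
                eprintln!("persist failed, retrying: {error:?}");
                tokio::time::sleep(Duration::from_secs(1)).await;
            }
        }
    }
}

#[tokio::main]
async fn main() {
    let mut attempts = 0;
    let size = retry_forever(|| {
        attempts += 1;
        let fail = attempts < 3;
        async move {
            if fail { Err("object store unreachable") } else { Ok(42u64) }
        }
    })
    .await;
    println!("persisted {size} bytes after {attempts} attempts");
}
```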
