From 06f6bda2da5ba5334a27ceddfdf1304efcafe09e Mon Sep 17 00:00:00 2001 From: Jackson Newhouse Date: Thu, 19 Dec 2024 10:58:56 -0800 Subject: [PATCH] fix(catalog): consistent ordering of catalog operations --- influxdb3_catalog/src/catalog.rs | 236 +++++++++++------- influxdb3_wal/src/create.rs | 8 +- influxdb3_wal/src/lib.rs | 53 +++- influxdb3_wal/src/object_store.rs | 39 ++- influxdb3_write/src/persister.rs | 31 +-- influxdb3_write/src/write_buffer/mod.rs | 125 ++++++---- .../src/write_buffer/queryable_buffer.rs | 12 +- influxdb3_write/src/write_buffer/validator.rs | 72 ++++-- 8 files changed, 361 insertions(+), 215 deletions(-) diff --git a/influxdb3_catalog/src/catalog.rs b/influxdb3_catalog/src/catalog.rs index 611c0892eb4..273918b1d25 100644 --- a/influxdb3_catalog/src/catalog.rs +++ b/influxdb3_catalog/src/catalog.rs @@ -1,16 +1,17 @@ //! Implementation of the Catalog that sits entirely in memory. use crate::catalog::Error::{ - ProcessingEngineCallExists, ProcessingEngineTriggerExists, TableNotFound, + CatalogUpdatedElsewhere, ProcessingEngineCallExists, ProcessingEngineTriggerExists, + TableNotFound, }; -use bimap::BiHashMap; +use bimap::{BiHashMap, Overwritten}; use hashbrown::HashMap; use indexmap::IndexMap; use influxdb3_id::{ColumnId, DbId, SerdeVecMap, TableId}; use influxdb3_wal::{ CatalogBatch, CatalogOp, DeleteDatabaseDefinition, DeleteTableDefinition, FieldAdditions, FieldDefinition, LastCacheDefinition, LastCacheDelete, MetaCacheDefinition, MetaCacheDelete, - PluginDefinition, TriggerDefinition, + OrderedCatalogBatch, PluginDefinition, TriggerDefinition, }; use influxdb_line_protocol::FieldValue; use iox_time::Time; @@ -184,10 +185,26 @@ impl Catalog { } } - pub fn apply_catalog_batch(&self, catalog_batch: &CatalogBatch) -> Result<()> { + pub fn apply_catalog_batch( + &self, + catalog_batch: CatalogBatch, + ) -> Result> { self.inner.write().apply_catalog_batch(catalog_batch) } + // Checks the sequence number to see if it needs to be applied. + pub fn apply_ordered_catalog_batch( + &self, + batch: OrderedCatalogBatch, + ) -> Result> { + if batch.sequence_number() >= self.sequence_number().as_u32() { + if let Some(catalog_batch) = self.apply_catalog_batch(batch.batch())? 
{ + return Ok(Some(catalog_batch.batch())); + } + } + Ok(None) + } + pub fn db_or_create(&self, db_name: &str) -> Result> { let db = match self.db_schema(db_name) { Some(db) => db, @@ -259,81 +276,6 @@ impl Catalog { self.inner.read().clone() } - pub fn add_meta_cache(&self, db_id: DbId, table_id: TableId, meta_cache: MetaCacheDefinition) { - let mut inner = self.inner.write(); - let mut db = inner - .databases - .get(&db_id) - .expect("db should exist") - .as_ref() - .clone(); - let mut table = db - .table_definition_by_id(&table_id) - .expect("table should exist") - .as_ref() - .clone(); - table.add_meta_cache(meta_cache); - db.insert_table(table_id, Arc::new(table)); - inner.upsert_db(db); - } - - pub fn remove_meta_cache(&self, db_id: &DbId, table_id: &TableId, name: &str) { - let mut inner = self.inner.write(); - let mut db = inner - .databases - .get(db_id) - .expect("db should exist") - .as_ref() - .clone(); - let mut table = db - .tables - .get(table_id) - .expect("table should exist") - .as_ref() - .clone(); - table.remove_meta_cache(name); - db.insert_table(*table_id, Arc::new(table)); - inner.upsert_db(db); - } - - pub fn add_last_cache(&self, db_id: DbId, table_id: TableId, last_cache: LastCacheDefinition) { - let mut inner = self.inner.write(); - let mut db = inner - .databases - .get(&db_id) - .expect("db should exist") - .as_ref() - .clone(); - let mut table = db - .tables - .get(&table_id) - .expect("table should exist") - .as_ref() - .clone(); - table.add_last_cache(last_cache); - db.insert_table(table_id, Arc::new(table)); - inner.upsert_db(db); - } - - pub fn delete_last_cache(&self, db_id: DbId, table_id: TableId, name: &str) { - let mut inner = self.inner.write(); - let mut db = inner - .databases - .get(&db_id) - .expect("db should exist") - .as_ref() - .clone(); - let mut table = db - .tables - .get(&table_id) - .expect("table should exist") - .as_ref() - .clone(); - table.remove_last_cache(name); - db.insert_table(table_id, Arc::new(table)); - inner.upsert_db(db); - } - pub fn instance_id(&self) -> Arc { Arc::clone(&self.inner.read().instance_id) } @@ -478,24 +420,31 @@ impl InnerCatalog { /// Applies the `CatalogBatch` while validating that all updates are compatible. If updates /// have already been applied, the sequence number and updated tracker are not updated. - pub fn apply_catalog_batch(&mut self, catalog_batch: &CatalogBatch) -> Result<()> { + pub fn apply_catalog_batch( + &mut self, + catalog_batch: CatalogBatch, + ) -> Result> { let table_count = self.table_count(); if let Some(db) = self.databases.get(&catalog_batch.database_id) { - if let Some(new_db) = DatabaseSchema::new_if_updated_from_batch(db, catalog_batch)? { + if let Some(new_db) = DatabaseSchema::new_if_updated_from_batch(db, &catalog_batch)? 
+            {
                 check_overall_table_count(Some(db), &new_db, table_count)?;
                 self.upsert_db(new_db);
+            } else {
+                return Ok(None);
             }
         } else {
             if self.databases.len() >= Catalog::NUM_DBS_LIMIT {
                 return Err(Error::TooManyDbs);
             }
-            let new_db = DatabaseSchema::new_from_batch(catalog_batch)?;
+            let new_db = DatabaseSchema::new_from_batch(&catalog_batch)?;
             check_overall_table_count(None, &new_db, table_count)?;
             self.upsert_db(new_db);
         }
-
-        Ok(())
+        Ok(Some(OrderedCatalogBatch::new(
+            catalog_batch,
+            self.sequence.0,
+        )))
     }
 
     pub fn db_exists(&self, db_id: DbId) -> bool {
@@ -596,10 +545,21 @@ impl DatabaseSchema {
         &mut self,
         table_id: TableId,
         table_def: Arc<TableDefinition>,
-    ) -> Option<Arc<TableDefinition>> {
-        self.table_map
-            .insert(table_id, Arc::clone(&table_def.table_name));
-        self.tables.insert(table_id, table_def)
+    ) -> Result<Option<Arc<TableDefinition>>> {
+        match self
+            .table_map
+            .insert(table_id, Arc::clone(&table_def.table_name))
+        {
+            Overwritten::Left(_, _) | Overwritten::Right(_, _) | Overwritten::Both(_, _) => {
+                // This will happen if another table was inserted with the same name between checking
+                // for existence and insertion.
+                // We'd like this to be automatically handled by the system,
+                // but for now it is better to error than get into an inconsistent state.
+                return Err(CatalogUpdatedElsewhere);
+            }
+            Overwritten::Neither | Overwritten::Pair(_, _) => {}
+        }
+        Ok(self.tables.insert(table_id, table_def))
     }
 
     pub fn table_schema(&self, table_name: impl Into<Arc<str>>) -> Option<Schema> {
@@ -722,14 +682,14 @@ impl UpdateDatabaseSchema for influxdb3_wal::TableDefinition {
                 if let Cow::Owned(updated_table) = existing_table.check_and_add_new_fields(self)? {
                     database_schema
                         .to_mut()
-                        .insert_table(self.table_id, Arc::new(updated_table));
+                        .insert_table(self.table_id, Arc::new(updated_table))?;
                 }
             }
             None => {
                 let new_table = TableDefinition::new_from_op(self);
                 database_schema
                     .to_mut()
-                    .insert_table(new_table.table_id, Arc::new(new_table));
+                    .insert_table(new_table.table_id, Arc::new(new_table))?;
             }
         }
         Ok(database_schema)
@@ -1146,7 +1106,7 @@ impl UpdateDatabaseSchema for T {
             if let Cow::Owned(new_table) = self.update_table(Cow::Borrowed(table.as_ref()))?
{ schema .to_mut() - .insert_table(new_table.table_id, Arc::new(new_table)); + .insert_table(new_table.table_id, Arc::new(new_table))?; } Ok(schema) } @@ -1278,12 +1238,15 @@ pub fn influx_column_type_from_field_value(fv: &FieldValue<'_>) -> InfluxColumnT #[cfg(test)] mod tests { - use influxdb3_wal::{create, FieldDataType}; + use super::*; + use influxdb3_wal::object_store::WalObjectStore; + use influxdb3_wal::CatalogOp::CreateTable; + use influxdb3_wal::{create, FieldDataType, Gen1Duration, WalConfig, WalFileNotifier, WalOp}; + use iox_time::{MockProvider, TimeProvider}; use pretty_assertions::assert_eq; + use std::time::Duration; use test_helpers::assert_contains; - use super::*; - #[test] fn catalog_serialization() { let host_id = Arc::from("sample-host-id"); @@ -1697,7 +1660,7 @@ mod tests { let catalog = Catalog::new(Arc::from("host"), Arc::from("instance")); catalog.insert_database(DatabaseSchema::new(DbId::new(), Arc::from("foo"))); let db_id = catalog.db_name_to_id("foo").unwrap(); - let catalog_batch = create::catalog_batch_op( + let catalog_batch = create::catalog_batch( db_id, "foo", 0, @@ -1714,7 +1677,7 @@ mod tests { )], ); let err = catalog - .apply_catalog_batch(catalog_batch.as_catalog().unwrap()) + .apply_catalog_batch(catalog_batch) .expect_err("should fail to apply AddFields operation for non-existent table"); assert_contains!(err.to_string(), "Table banana not in DB schema for foo"); } @@ -1765,7 +1728,9 @@ mod tests { .unwrap(), ); - database.insert_table(deleted_table_id, table_defn); + database + .insert_table(deleted_table_id, table_defn) + .expect("should be able to insert"); let new_db = DatabaseSchema::new_if_updated_from_batch( &database, &CatalogBatch { @@ -1788,4 +1753,81 @@ mod tests { assert!(deleted_table.deleted); assert!(!deleted_table.series_key.is_empty()); } + + // tests that sorting catalog ops by the sequence number returned from apply_catalog_batch + // fixes potential ordering issues. + #[test] + fn test_out_of_order_ops() -> Result<()> { + let catalog = Catalog::new(Arc::from("host"), Arc::from("instance")); + let db_id = DbId::new(); + let db_name = Arc::from("foo"); + let table_id = TableId::new(); + let table_name = Arc::from("bar"); + let table_definition = influxdb3_wal::TableDefinition { + database_id: db_id, + database_name: Arc::clone(&db_name), + table_name: Arc::clone(&table_name), + table_id, + field_definitions: vec![ + FieldDefinition::new(ColumnId::from(0), "tag_1", FieldDataType::Tag), + FieldDefinition::new(ColumnId::from(1), "time", FieldDataType::Timestamp), + FieldDefinition::new(ColumnId::from(2), "field", FieldDataType::String), + ], + key: vec![ColumnId::from(0)], + }; + let create_op = CatalogBatch { + database_id: db_id, + database_name: Arc::clone(&db_name), + time_ns: 0, + ops: vec![CreateTable(table_definition.clone())], + }; + let add_column_op = CatalogBatch { + database_id: db_id, + database_name: Arc::clone(&db_name), + time_ns: 0, + ops: vec![CatalogOp::AddFields(FieldAdditions { + database_name: Arc::clone(&db_name), + database_id: db_id, + table_name, + table_id, + field_definitions: vec![FieldDefinition::new( + ColumnId::from(3), + "tag_2", + FieldDataType::Tag, + )], + })], + }; + let create_ordered_op = catalog + .apply_catalog_batch(create_op)? + .expect("should be able to create"); + let add_column_op = catalog + .apply_catalog_batch(add_column_op)? 
+            .expect("should produce operation");
+        let mut ops = vec![
+            WalOp::Catalog(add_column_op),
+            WalOp::Catalog(create_ordered_op),
+        ];
+        ops.sort();
+
+        let replayed_catalog = Catalog::new(Arc::from("host"), Arc::from("instance"));
+        for op in ops {
+            let WalOp::Catalog(catalog_batch) = op else {
+                panic!("should produce operation");
+            };
+            replayed_catalog.apply_catalog_batch(catalog_batch.batch())?;
+        }
+        let original_table = catalog
+            .db_schema_by_id(&db_id)
+            .unwrap()
+            .table_definition_by_id(&table_id)
+            .unwrap();
+        let replayed_table = replayed_catalog
+            .db_schema_by_id(&db_id)
+            .unwrap()
+            .table_definition_by_id(&table_id)
+            .unwrap();
+
+        assert_eq!(original_table, replayed_table);
+        Ok(())
+    }
 }
diff --git a/influxdb3_wal/src/create.rs b/influxdb3_wal/src/create.rs
index d1fb4e33122..3521f53d152 100644
--- a/influxdb3_wal/src/create.rs
+++ b/influxdb3_wal/src/create.rs
@@ -43,18 +43,18 @@ pub fn write_batch_op(write_batch: WriteBatch) -> WalOp {
     WalOp::Write(write_batch)
 }
 
-pub fn catalog_batch_op(
+pub fn catalog_batch(
     db_id: DbId,
     db_name: impl Into<Arc<str>>,
     time_ns: i64,
     ops: impl IntoIterator<Item = CatalogOp>,
-) -> WalOp {
-    WalOp::Catalog(CatalogBatch {
+) -> CatalogBatch {
+    CatalogBatch {
         database_id: db_id,
         database_name: db_name.into(),
         time_ns,
         ops: ops.into_iter().collect(),
-    })
+    }
 }
 
 pub fn add_fields_op(
diff --git a/influxdb3_wal/src/lib.rs b/influxdb3_wal/src/lib.rs
index 06f3ca5a28e..532748e8b56 100644
--- a/influxdb3_wal/src/lib.rs
+++ b/influxdb3_wal/src/lib.rs
@@ -21,6 +21,7 @@ use observability_deps::tracing::error;
 use schema::{InfluxColumnType, InfluxFieldType};
 use serde::{Deserialize, Serialize};
 use serde_with::serde_as;
+use std::cmp::Ordering;
 use std::fmt::Debug;
 use std::str::FromStr;
 use std::sync::Arc;
@@ -213,7 +214,31 @@ impl Default for Gen1Duration {
 #[derive(Debug, Clone, Eq, PartialEq, Serialize, Deserialize)]
 pub enum WalOp {
     Write(WriteBatch),
-    Catalog(CatalogBatch),
+    Catalog(OrderedCatalogBatch),
+}
+
+impl PartialOrd for WalOp {
+    fn partial_cmp(&self, other: &Self) -> Option<Ordering> {
+        Some(self.cmp(other))
+    }
+}
+
+impl Ord for WalOp {
+    fn cmp(&self, other: &Self) -> Ordering {
+        match (self, other) {
+            // Catalog ops come before Write ops
+            (WalOp::Catalog(_), WalOp::Write(_)) => Ordering::Less,
+            (WalOp::Write(_), WalOp::Catalog(_)) => Ordering::Greater,
+
+            // For two Catalog ops, compare by database_sequence_number
+            (WalOp::Catalog(a), WalOp::Catalog(b)) => {
+                a.database_sequence_number.cmp(&b.database_sequence_number)
+            }
+
+            // For two Write ops, consider them equal
+            (WalOp::Write(_), WalOp::Write(_)) => Ordering::Equal,
+        }
+    }
 }
 
 impl WalOp {
@@ -227,7 +252,7 @@
     pub fn as_catalog(&self) -> Option<&CatalogBatch> {
         match self {
             WalOp::Write(_) => None,
-            WalOp::Catalog(c) => Some(c),
+            WalOp::Catalog(c) => Some(&c.catalog),
         }
     }
 }
@@ -240,6 +265,30 @@ pub struct CatalogBatch {
     pub ops: Vec<CatalogOp>,
 }
 
+/// A catalog batch that has been processed by the catalog and given a sequence number.
+#[derive(Debug, Clone, Eq, PartialEq, Serialize, Deserialize)] +pub struct OrderedCatalogBatch { + catalog: CatalogBatch, + database_sequence_number: u32, +} + +impl OrderedCatalogBatch { + pub fn new(catalog: CatalogBatch, database_sequence_number: u32) -> Self { + Self { + catalog, + database_sequence_number, + } + } + + pub fn sequence_number(&self) -> u32 { + self.database_sequence_number + } + + pub fn batch(self) -> CatalogBatch { + self.catalog + } +} + #[derive(Debug, Clone, Eq, PartialEq, Serialize, Deserialize)] pub enum CatalogOp { CreateDatabase(DatabaseDefinition), diff --git a/influxdb3_wal/src/object_store.rs b/influxdb3_wal/src/object_store.rs index f5e1dd95107..741911a4abd 100644 --- a/influxdb3_wal/src/object_store.rs +++ b/influxdb3_wal/src/object_store.rs @@ -1,8 +1,8 @@ use crate::serialize::verify_file_type_and_deserialize; use crate::snapshot_tracker::{SnapshotInfo, SnapshotTracker, WalPeriod}; use crate::{ - background_wal_flush, CatalogBatch, SnapshotDetails, SnapshotSequenceNumber, Wal, WalConfig, - WalContents, WalFileNotifier, WalFileSequenceNumber, WalOp, WriteBatch, + background_wal_flush, OrderedCatalogBatch, SnapshotDetails, SnapshotSequenceNumber, Wal, + WalConfig, WalContents, WalFileNotifier, WalFileSequenceNumber, WalOp, WriteBatch, }; use bytes::Bytes; use data_types::Timestamp; @@ -516,7 +516,7 @@ struct WalBuffer { op_limit: usize, op_count: usize, database_to_write_batch: HashMap, WriteBatch>, - catalog_batches: Vec, + catalog_batches: Vec, write_op_responses: Vec>, } @@ -594,21 +594,18 @@ impl WalBuffer { } for catalog_batch in &self.catalog_batches { - min_timestamp_ns = min_timestamp_ns.min(catalog_batch.time_ns); - max_timestamp_ns = max_timestamp_ns.max(catalog_batch.time_ns); + min_timestamp_ns = min_timestamp_ns.min(catalog_batch.catalog.time_ns); + max_timestamp_ns = max_timestamp_ns.max(catalog_batch.catalog.time_ns); } // have the catalog ops come before any writes in ordering let mut ops = Vec::with_capacity(self.database_to_write_batch.len() + self.catalog_batches.len()); - for catalog_batch in self.catalog_batches { - ops.push(WalOp::Catalog(catalog_batch)); - } + ops.extend(self.catalog_batches.into_iter().map(WalOp::Catalog)); + ops.extend(self.database_to_write_batch.into_values().map(WalOp::Write)); - for write_batch in self.database_to_write_batch.into_values() { - ops.push(WalOp::Write(write_batch)); - } + ops.sort(); ( WalContents { @@ -667,7 +664,7 @@ mod tests { let time_provider: Arc = Arc::new(MockProvider::new(Time::from_timestamp_nanos(0))); let object_store: Arc = Arc::new(InMemory::new()); - let notifier: Arc = Arc::new(TestNotfiier::default()); + let notifier: Arc = Arc::new(TestNotifier::default()); let wal_config = WalConfig { max_write_buffer_size: 100, flush_interval: Duration::from_secs(1), @@ -877,7 +874,7 @@ mod tests { ); // before we trigger a snapshot, test replay with a new wal and notifier - let replay_notifier: Arc = Arc::new(TestNotfiier::default()); + let replay_notifier: Arc = Arc::new(TestNotifier::default()); let replay_wal = WalObjectStore::new_without_replay( Arc::clone(&time_provider), Arc::clone(&object_store), @@ -897,7 +894,7 @@ mod tests { replay_wal.replay().await.unwrap(); let replay_notifier = replay_notifier .as_any() - .downcast_ref::() + .downcast_ref::() .unwrap(); { @@ -1008,7 +1005,7 @@ mod tests { }, ); - let notifier = notifier.as_any().downcast_ref::().unwrap(); + let notifier = notifier.as_any().downcast_ref::().unwrap(); { let notified_writes = notifier.notified_writes.lock(); @@ 
-1022,7 +1019,7 @@ mod tests {
             .await;
 
         // test that replay now only has file 3
-        let replay_notifier: Arc<dyn WalFileNotifier> = Arc::new(TestNotfiier::default());
+        let replay_notifier: Arc<dyn WalFileNotifier> = Arc::new(TestNotifier::default());
         let replay_wal = WalObjectStore::new_without_replay(
             Arc::clone(&time_provider),
             object_store,
@@ -1039,7 +1036,7 @@
         replay_wal.replay().await.unwrap();
         let replay_notifier = replay_notifier
             .as_any()
-            .downcast_ref::<TestNotfiier>()
+            .downcast_ref::<TestNotifier>()
             .unwrap();
         let notified_writes = replay_notifier.notified_writes.lock();
         assert_eq!(*notified_writes, vec![file_3_contents.clone()]);
@@ -1052,7 +1049,7 @@
         let time_provider: Arc<dyn TimeProvider> =
            Arc::new(MockProvider::new(Time::from_timestamp_nanos(0)));
         let object_store: Arc<dyn ObjectStore> = Arc::new(InMemory::new());
-        let notifier: Arc<dyn WalFileNotifier> = Arc::new(TestNotfiier::default());
+        let notifier: Arc<dyn WalFileNotifier> = Arc::new(TestNotifier::default());
         let wal_config = WalConfig {
             max_write_buffer_size: 100,
             flush_interval: Duration::from_secs(1),
@@ -1070,7 +1067,7 @@
         );
 
         assert!(wal.flush_buffer().await.is_none());
-        let notifier = notifier.as_any().downcast_ref::<TestNotfiier>().unwrap();
+        let notifier = notifier.as_any().downcast_ref::<TestNotifier>().unwrap();
         assert!(notifier.notified_writes.lock().is_empty());
 
         // make sure no wal file was written
@@ -1096,13 +1093,13 @@
     }
 
     #[derive(Debug, Default)]
-    struct TestNotfiier {
+    struct TestNotifier {
         notified_writes: parking_lot::Mutex<Vec<WalContents>>,
         snapshot_details: parking_lot::Mutex<Vec<SnapshotDetails>>,
     }
 
     #[async_trait]
-    impl WalFileNotifier for TestNotfiier {
+    impl WalFileNotifier for TestNotifier {
         fn notify(&self, write: WalContents) {
             self.notified_writes.lock().push(write);
         }
diff --git a/influxdb3_write/src/persister.rs b/influxdb3_write/src/persister.rs
index 23277bceebc..041e573008e 100644
--- a/influxdb3_write/src/persister.rs
+++ b/influxdb3_write/src/persister.rs
@@ -16,8 +16,8 @@ use datafusion::execution::memory_pool::UnboundedMemoryPool;
 use datafusion::execution::object_store::ObjectStoreUrl;
 use datafusion::physical_plan::SendableRecordBatchStream;
 use futures_util::pin_mut;
-use futures_util::stream::StreamExt;
 use futures_util::stream::TryStreamExt;
+use futures_util::stream::{FuturesOrdered, StreamExt};
 use influxdb3_cache::last_cache;
 use influxdb3_catalog::catalog::Catalog;
 use influxdb3_catalog::catalog::InnerCatalog;
@@ -32,7 +32,6 @@ use std::any::Any;
 use std::io::Write;
 use std::sync::Arc;
 use thiserror::Error;
-use tokio::task::JoinSet;
 use uuid::Uuid;
 
 #[derive(Debug, Error)]
@@ -188,8 +187,9 @@
     ///
     /// This is intended to be used on server start.
     pub async fn load_snapshots(&self, mut most_recent_n: usize) -> Result<Vec<PersistedSnapshot>> {
-        let mut join_set = JoinSet::new();
+        let mut futures = FuturesOrdered::new();
         let mut offset: Option<Path> = None;
+
         while most_recent_n > 0 {
             let count = if most_recent_n > 1000 {
                 most_recent_n -= 1000;
@@ -238,7 +238,7 @@
             }
 
             for item in &list[0..end] {
-                join_set.spawn(get_snapshot(
+                futures.push_back(get_snapshot(
                     item.location.clone(),
                     Arc::clone(&self.object_store),
                 ));
@@ -254,8 +254,11 @@
             offset = Some(list[end - 1].location.clone());
         }
 
-        // Returns an error if there is one and reuses the Vec's memory as well
-        join_set.join_all().await.into_iter().collect()
+        let mut results = Vec::new();
+        while let Some(result) = futures.next().await {
+            results.push(result?);
+        }
+        Ok(results)
     }
 
     /// Loads a Parquet file from ObjectStore
@@ -486,7 +489,7 @@ mod tests {
         persister.persist_catalog(&catalog).await.unwrap();
 
         let batch = |name: &str, num: u32| {
-            let _ = catalog.apply_catalog_batch(&CatalogBatch {
+            let _ = catalog.apply_catalog_batch(CatalogBatch {
                 database_id: db_schema.id,
                 database_name: Arc::clone(&db_schema.name),
                 time_ns: 5000,
@@ -730,11 +733,11 @@
 
     #[tokio::test]
     /// This test makes sure that the logic for offset lists works
-    async fn persist_and_load_over_9000_snapshot_info_files() {
+    async fn persist_and_load_over_1000_snapshot_info_files() {
         let local_disk =
             LocalFileSystem::new_with_prefix(test_helpers::tmp_dir().unwrap()).unwrap();
         let persister = Persister::new(Arc::new(local_disk), "test_host");
-        for id in 0..9001 {
+        for id in 0..1001 {
             let info_file = PersistedSnapshot {
                 host_id: "test_host".to_string(),
                 next_file_id: ParquetFileId::from(id),
@@ -754,11 +757,11 @@
             persister.persist_snapshot(&info_file).await.unwrap();
         }
         let snapshots = persister.load_snapshots(9500).await.unwrap();
-        // We asked for the most recent 9500 so there should be 9001 of them
-        assert_eq!(snapshots.len(), 9001);
-        assert_eq!(snapshots[0].next_file_id.as_u64(), 9000);
-        assert_eq!(snapshots[0].wal_file_sequence_number.as_u64(), 9000);
-        assert_eq!(snapshots[0].snapshot_sequence_number.as_u64(), 9000);
-        assert_eq!(snapshots[0].catalog_sequence_number.as_u32(), 9000);
+        // We asked for the most recent 9500 so there should be 1001 of them
+        assert_eq!(snapshots.len(), 1001);
+        assert_eq!(snapshots[0].next_file_id.as_u64(), 1000);
+        assert_eq!(snapshots[0].wal_file_sequence_number.as_u64(), 1000);
+        assert_eq!(snapshots[0].snapshot_sequence_number.as_u64(), 1000);
+        assert_eq!(snapshots[0].catalog_sequence_number.as_u32(), 1000);
     }
 
     #[tokio::test]
diff --git a/influxdb3_write/src/write_buffer/mod.rs b/influxdb3_write/src/write_buffer/mod.rs
index 243a7a8aeca..9b78ea646f3 100644
--- a/influxdb3_write/src/write_buffer/mod.rs
+++ b/influxdb3_write/src/write_buffer/mod.rs
@@ -498,21 +498,23 @@ impl MetaCacheManager for WriteBufferImpl {
         cache_name: Option<String>,
         args: CreateMetaCacheArgs,
     ) -> Result<Option<MetaCacheDefinition>, Error> {
-        let table_id = args.table_def.table_id;
         if let Some(new_cache_definition) = self
             .meta_cache
             .create_cache(db_schema.id, cache_name.map(Into::into), args)
             .map_err(Error::MetaCacheError)?
{ - self.catalog - .add_meta_cache(db_schema.id, table_id, new_cache_definition.clone()); - let add_meta_cache_batch = influxdb3_wal::create::catalog_batch_op( - db_schema.id, - Arc::clone(&db_schema.name), - self.time_provider.now().timestamp_nanos(), - [CatalogOp::CreateMetaCache(new_cache_definition.clone())], - ); - self.wal.write_ops(vec![add_meta_cache_batch]).await?; + let catalog_op = CatalogOp::CreateMetaCache(new_cache_definition.clone()); + let catalog_batch = CatalogBatch { + database_id: db_schema.id, + database_name: db_schema.name.clone(), + time_ns: self.time_provider.now().timestamp_nanos(), + ops: vec![catalog_op], + }; + if let Some(catalog_batch) = self.catalog.apply_catalog_batch(catalog_batch)? { + self.wal + .write_ops(vec![WalOp::Catalog(catalog_batch)]) + .await?; + } Ok(Some(new_cache_definition)) } else { Ok(None) @@ -528,20 +530,21 @@ impl MetaCacheManager for WriteBufferImpl { let catalog = self.catalog(); let db_schema = catalog.db_schema_by_id(db_id).expect("db should exist"); self.meta_cache.delete_cache(db_id, tbl_id, cache_name)?; - catalog.remove_meta_cache(db_id, tbl_id, cache_name); - - self.wal - .write_ops(vec![influxdb3_wal::create::catalog_batch_op( - *db_id, - Arc::clone(&db_schema.name), - self.time_provider.now().timestamp_nanos(), - [CatalogOp::DeleteMetaCache(MetaCacheDelete { - table_name: db_schema.table_id_to_name(tbl_id).expect("table exists"), - table_id: *tbl_id, - cache_name: cache_name.into(), - })], - )]) - .await?; + let catalog_batch = CatalogBatch { + database_id: *db_id, + database_name: Arc::clone(&db_schema.name), + time_ns: self.time_provider.now().timestamp_nanos(), + ops: vec![CatalogOp::DeleteMetaCache(MetaCacheDelete { + table_name: db_schema.table_id_to_name(tbl_id).expect("table exists"), + table_id: *tbl_id, + cache_name: cache_name.into(), + })], + }; + if let Some(catalog_batch) = catalog.apply_catalog_batch(catalog_batch)? { + self.wal + .write_ops(vec![WalOp::Catalog(catalog_batch)]) + .await?; + } Ok(()) } @@ -597,14 +600,17 @@ impl LastCacheManager for WriteBufferImpl { value_columns, }, )? { - self.catalog.add_last_cache(db_id, table_id, info.clone()); - let add_cache_catalog_batch = WalOp::Catalog(CatalogBatch { + let catalog_batch = CatalogBatch { time_ns: self.time_provider.now().timestamp_nanos(), database_id: db_schema.id, database_name: Arc::clone(&db_schema.name), ops: vec![CreateLastCache(info.clone())], - }); - self.wal.write_ops(vec![add_cache_catalog_batch]).await?; + }; + if let Some(catalog_batch) = self.catalog.apply_catalog_batch(catalog_batch)? { + self.wal + .write_ops(vec![WalOp::Catalog(catalog_batch)]) + .await?; + } Ok(Some(info)) } else { @@ -621,22 +627,25 @@ impl LastCacheManager for WriteBufferImpl { let catalog = self.catalog(); let db_schema = catalog.db_schema_by_id(&db_id).expect("db should exist"); self.last_cache.delete_cache(db_id, tbl_id, cache_name)?; - catalog.delete_last_cache(db_id, tbl_id, cache_name); + + let catalog_batch = CatalogBatch { + time_ns: self.time_provider.now().timestamp_nanos(), + database_id: db_id, + database_name: Arc::clone(&db_schema.name), + ops: vec![CatalogOp::DeleteLastCache(LastCacheDelete { + table_id: tbl_id, + table_name: db_schema.table_id_to_name(&tbl_id).expect("table exists"), + name: cache_name.into(), + })], + }; // NOTE: if this fails then the cache will be gone from the running server, but will be // resurrected on server restart. 
- self.wal - .write_ops(vec![WalOp::Catalog(CatalogBatch { - time_ns: self.time_provider.now().timestamp_nanos(), - database_id: db_id, - database_name: Arc::clone(&db_schema.name), - ops: vec![CatalogOp::DeleteLastCache(LastCacheDelete { - table_id: tbl_id, - table_name: db_schema.table_id_to_name(&tbl_id).expect("table exists"), - name: cache_name.into(), - })], - })]) - .await?; + if let Some(catalog_batch) = catalog.apply_catalog_batch(catalog_batch)? { + self.wal + .write_ops(vec![WalOp::Catalog(catalog_batch)]) + .await?; + } Ok(()) } @@ -663,10 +672,11 @@ impl DatabaseManager for WriteBufferImpl { deletion_time: deletion_time.timestamp_nanos(), })], }; - self.catalog.apply_catalog_batch(&catalog_batch)?; - let wal_op = WalOp::Catalog(catalog_batch); - self.wal.write_ops(vec![wal_op]).await?; - debug!(db_id = ?db_id, name = ?&db_schema.name, "successfully deleted database"); + if let Some(catalog_batch) = self.catalog.apply_catalog_batch(catalog_batch)? { + let wal_op = WalOp::Catalog(catalog_batch); + self.wal.write_ops(vec![wal_op]).await?; + debug!(db_id = ?db_id, name = ?&db_schema.name, "successfully deleted database"); + } Ok(()) } @@ -700,9 +710,11 @@ impl DatabaseManager for WriteBufferImpl { table_name: Arc::clone(&table_defn.table_name), })], }; - self.catalog.apply_catalog_batch(&catalog_batch)?; - let wal_op = WalOp::Catalog(catalog_batch); - self.wal.write_ops(vec![wal_op]).await?; + if let Some(catalog_batch) = self.catalog.apply_catalog_batch(catalog_batch)? { + self.wal + .write_ops(vec![WalOp::Catalog(catalog_batch)]) + .await?; + } debug!( db_id = ?db_id, db_name = ?&db_schema.name, @@ -745,9 +757,10 @@ impl ProcessingEngineManager for WriteBufferImpl { database_name: Arc::clone(&db_schema.name), ops: vec![catalog_op], }; - self.catalog.apply_catalog_batch(&catalog_batch)?; - let wal_op = WalOp::Catalog(catalog_batch); - self.wal.write_ops(vec![wal_op]).await?; + if let Some(catalog_batch) = self.catalog.apply_catalog_batch(catalog_batch)? { + let wal_op = WalOp::Catalog(catalog_batch); + self.wal.write_ops(vec![wal_op]).await?; + } Ok(()) } @@ -783,9 +796,10 @@ impl ProcessingEngineManager for WriteBufferImpl { database_name: Arc::clone(&db_schema.name), ops: vec![catalog_op], }; - self.catalog.apply_catalog_batch(&catalog_batch)?; - let wal_op = WalOp::Catalog(catalog_batch); - self.wal.write_ops(vec![wal_op]).await?; + if let Some(catalog_batch) = self.catalog.apply_catalog_batch(catalog_batch)? 
{ + let wal_op = WalOp::Catalog(catalog_batch); + self.wal.write_ops(vec![wal_op]).await?; + } Ok(()) } @@ -1610,6 +1624,9 @@ mod tests { assert_eq!(ParquetFileId::next_id().as_u64(), 6); } + #[tokio::test] + async fn catalog_ops_correctly_ordered() {} + /// This is the reproducer for [#25277][see] /// /// [see]: https://github.com/influxdata/influxdb/issues/25277 diff --git a/influxdb3_write/src/write_buffer/queryable_buffer.rs b/influxdb3_write/src/write_buffer/queryable_buffer.rs index 12d973fb29a..1e6cc4fabf1 100644 --- a/influxdb3_write/src/write_buffer/queryable_buffer.rs +++ b/influxdb3_write/src/write_buffer/queryable_buffer.rs @@ -456,11 +456,13 @@ impl BufferState { match op { WalOp::Write(write_batch) => self.add_write_batch(write_batch), WalOp::Catalog(catalog_batch) => { - self.catalog - // just catalog level changes - .apply_catalog_batch(&catalog_batch) - .expect("catalog batch should apply"); - + let Some(catalog_batch) = self + .catalog + .apply_ordered_catalog_batch(catalog_batch) + .expect("should be able to reapply") + else { + continue; + }; let db_schema = self .catalog .db_schema_by_id(&catalog_batch.database_id) diff --git a/influxdb3_write/src/write_buffer/validator.rs b/influxdb3_write/src/write_buffer/validator.rs index 68c86559b91..8354c944e41 100644 --- a/influxdb3_write/src/write_buffer/validator.rs +++ b/influxdb3_write/src/write_buffer/validator.rs @@ -9,8 +9,8 @@ use influxdb3_catalog::catalog::{ use influxdb3_id::{ColumnId, TableId}; use influxdb3_wal::{ - CatalogBatch, CatalogOp, Field, FieldAdditions, FieldData, FieldDefinition, Gen1Duration, Row, - TableChunks, WriteBatch, + CatalogBatch, CatalogOp, Field, FieldAdditions, FieldData, FieldDefinition, Gen1Duration, + OrderedCatalogBatch, Row, TableChunks, WriteBatch, }; use influxdb_line_protocol::{parse_lines, v3, ParsedLine}; use iox_time::Time; @@ -31,7 +31,7 @@ pub struct WithCatalog { pub struct LinesParsed { catalog: WithCatalog, lines: Vec, - catalog_batch: Option, + catalog_batch: Option, errors: Vec, } @@ -136,8 +136,7 @@ impl WriteValidator { time_ns: self.state.time_now_ns, ops: catalog_updates, }; - self.state.catalog.apply_catalog_batch(&catalog_batch)?; - Some(catalog_batch) + self.state.catalog.apply_catalog_batch(catalog_batch)? }; Ok(WriteValidator { @@ -222,8 +221,7 @@ impl WriteValidator { database_name: Arc::clone(&self.state.db_schema.name), ops: catalog_updates, }; - self.state.catalog.apply_catalog_batch(&catalog_batch)?; - Some(catalog_batch) + self.state.catalog.apply_catalog_batch(catalog_batch)? }; Ok(WriteValidator { @@ -403,7 +401,13 @@ fn validate_and_qualify_v3_line( line_number: line_number + 1, error_message: e.to_string(), })?; - db_schema.insert_table(table_id, Arc::new(new_table_def)); + db_schema + .insert_table(table_id, Arc::new(new_table_def)) + .map_err(|e| WriteLineError { + original_line: raw_line.to_string(), + line_number: line_number + 1, + error_message: e.to_string(), + })?; } QualifiedLine { table_id, @@ -475,10 +479,23 @@ fn validate_and_qualify_v3_line( catalog_op = Some(table_definition_op); let db_schema = db_schema.to_mut(); - assert!( - db_schema.insert_table(table_id, Arc::new(table)).is_none(), - "attempted to overwrite existing table" - ); + db_schema + .insert_table(table_id, Arc::new(table)) + .map_err(|e| WriteLineError { + original_line: raw_line.to_string(), + line_number: line_number + 1, + error_message: e.to_string(), + })? 
+ .map_or_else( + || Ok(()), + |_| { + Err(WriteLineError { + original_line: raw_line.to_string(), + line_number: line_number + 1, + error_message: "unexpected overwrite of existing table".to_string(), + }) + }, + )?; QualifiedLine { table_id, row: Row { @@ -611,7 +628,13 @@ fn validate_and_qualify_v1_line( line_number: line_number + 1, error_message: e.to_string(), })?; - db_schema.insert_table(table_id, Arc::new(new_table_def)); + db_schema + .insert_table(table_id, Arc::new(new_table_def)) + .map_err(|e| WriteLineError { + original_line: line.to_string(), + line_number: line_number + 1, + error_message: e.to_string(), + })?; catalog_op = Some(CatalogOp::AddFields(FieldAdditions { database_name, @@ -686,10 +709,23 @@ fn validate_and_qualify_v1_line( let table = TableDefinition::new(table_id, Arc::clone(&table_name), columns, key).unwrap(); let db_schema = db_schema.to_mut(); - assert!( - db_schema.insert_table(table_id, Arc::new(table)).is_none(), - "attempted to overwrite existing table" - ); + db_schema + .insert_table(table_id, Arc::new(table)) + .map_err(|e| WriteLineError { + original_line: line.to_string(), + line_number: line_number + 1, + error_message: e.to_string(), + })? + .map_or_else( + || Ok(()), + |_| { + Err(WriteLineError { + original_line: line.to_string(), + line_number: line_number + 1, + error_message: "unexpected overwrite of existing table".to_string(), + }) + }, + )?; QualifiedLine { table_id, row: Row { @@ -719,7 +755,7 @@ pub struct ValidatedLines { /// Only valid lines will be converted into a WriteBatch pub(crate) valid_data: WriteBatch, /// If any catalog updates were made, they will be included here - pub(crate) catalog_updates: Option, + pub(crate) catalog_updates: Option, } impl From for WriteBatch {