Skip to content

Commit

Permalink
fix: do not count deleted dbs and tables toward limit
Browse files Browse the repository at this point in the history
  • Loading branch information
hiltontj committed Dec 21, 2024
1 parent 0db71b6 commit 12eeeb3
Showing 1 changed file with 266 additions and 9 deletions.
275 changes: 266 additions & 9 deletions influxdb3_catalog/src/catalog.rs
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ use parking_lot::RwLock;
use schema::{InfluxColumnType, InfluxFieldType, Schema, SchemaBuilder};
use serde::{Deserialize, Serialize, Serializer};
use std::borrow::Cow;
use std::cmp::Ordering;
use std::collections::BTreeMap;
use std::sync::Arc;
use thiserror::Error;
Expand Down Expand Up @@ -211,7 +212,7 @@ impl Catalog {
None => {
let mut inner = self.inner.write();

if inner.databases.len() >= Self::NUM_DBS_LIMIT {
if inner.database_count() >= Self::NUM_DBS_LIMIT {
return Err(Error::TooManyDbs);
}

Expand Down Expand Up @@ -414,8 +415,12 @@ impl InnerCatalog {
self.sequence
}

pub fn database_count(&self) -> usize {
self.databases.iter().filter(|db| !db.1.deleted).count()
}

pub fn table_count(&self) -> usize {
self.databases.values().map(|db| db.tables.len()).sum()
self.databases.values().map(|db| db.table_count()).sum()
}

/// Applies the `CatalogBatch` while validating that all updates are compatible. If updates
Expand All @@ -434,7 +439,7 @@ impl InnerCatalog {
return Ok(None);
}
} else {
if self.databases.len() >= Catalog::NUM_DBS_LIMIT {
if self.database_count() >= Catalog::NUM_DBS_LIMIT {
return Err(Error::TooManyDbs);
}
let new_db = DatabaseSchema::new_from_batch(&catalog_batch)?;
Expand Down Expand Up @@ -471,15 +476,22 @@ fn check_overall_table_count(
current_table_count: usize,
) -> Result<()> {
let existing_table_count = if let Some(existing_db) = existing_db {
existing_db.tables.len()
existing_db.table_count()
} else {
0
};
let newly_added_table_count = new_db.tables.len() - existing_table_count;
if current_table_count + newly_added_table_count > Catalog::NUM_TABLES_LIMIT {
return Err(Error::TooManyTables);
let new_table_count = new_db.table_count();
match new_table_count.cmp(&existing_table_count) {
Ordering::Less | Ordering::Equal => Ok(()),
Ordering::Greater => {
let newly_added_table_count = new_db.table_count() - existing_table_count;
if current_table_count + newly_added_table_count > Catalog::NUM_TABLES_LIMIT {
Err(Error::TooManyTables)
} else {
Ok(())
}
}
}
Ok(())
}

#[derive(Debug, Eq, PartialEq, Clone)]
Expand Down Expand Up @@ -508,6 +520,10 @@ impl DatabaseSchema {
}
}

pub fn table_count(&self) -> usize {
self.tables.iter().filter(|table| !table.1.deleted).count()
}

/// Validates the updates in the `CatalogBatch` are compatible with this schema. If
/// everything is compatible and there are no updates to the existing schema, None will be
/// returned, otherwise a new `DatabaseSchema` will be returned with the updates applied.
Expand Down Expand Up @@ -1250,7 +1266,7 @@ pub fn influx_column_type_from_field_value(fv: &FieldValue<'_>) -> InfluxColumnT
mod tests {
use super::*;
use influxdb3_wal::CatalogOp::CreateTable;
use influxdb3_wal::{create, FieldDataType, WalOp};
use influxdb3_wal::{create, DatabaseDefinition, FieldDataType, WalOp};
use pretty_assertions::assert_eq;
use test_helpers::assert_contains;

Expand Down Expand Up @@ -1837,4 +1853,245 @@ mod tests {
assert_eq!(original_table, replayed_table);
Ok(())
}

#[test]
fn deleted_dbs_dont_count() {
let catalog = Catalog::new(Arc::from("test"), Arc::from("test-instance"));

let mut dbs: Vec<(DbId, String)> = vec![];
for _ in 0..Catalog::NUM_DBS_LIMIT {
let db_id = DbId::new();
let db_name = format!("test-db-{db_id}");
catalog
.apply_catalog_batch(influxdb3_wal::create::catalog_batch(
db_id,
db_name.as_str(),
0,
[influxdb3_wal::create::create_table_op(
db_id,
db_name.as_str(),
TableId::new(),
"test-table",
[
influxdb3_wal::create::field_def(
ColumnId::new(),
"field",
FieldDataType::String,
),
influxdb3_wal::create::field_def(
ColumnId::new(),
"time",
FieldDataType::Timestamp,
),
],
[],
)],
))
.unwrap();
dbs.push((db_id, db_name));
}
// check the count of databases:
assert_eq!(5, catalog.inner.read().database_count());
// now create another database, this should NOT be allowed:
let db_id = DbId::new();
let db_name = "a-database-too-far";
catalog
.apply_catalog_batch(influxdb3_wal::create::catalog_batch(
db_id,
db_name,
0,
[influxdb3_wal::create::create_table_op(
db_id,
db_name,
TableId::new(),
"test-table",
[
influxdb3_wal::create::field_def(
ColumnId::new(),
"field",
FieldDataType::String,
),
influxdb3_wal::create::field_def(
ColumnId::new(),
"time",
FieldDataType::Timestamp,
),
],
[],
)],
))
.expect_err("should not be able to create more than 5 databases");
// now delete a database:
let (db_id, db_name) = dbs.pop().unwrap();
catalog
.apply_catalog_batch(influxdb3_wal::create::catalog_batch(
db_id,
db_name.as_str(),
1,
[CatalogOp::DeleteDatabase(DeleteDatabaseDefinition {
database_id: db_id,
database_name: db_name.as_str().into(),
deletion_time: 1,
})],
))
.unwrap();
// check again, count should have gone down:
assert_eq!(4, catalog.inner.read().database_count());
// now create another database (using same name as the deleted one), this should be allowed:
let db_id = DbId::new();
catalog
.apply_catalog_batch(influxdb3_wal::create::catalog_batch(
db_id,
db_name.as_str(),
0,
[influxdb3_wal::create::create_table_op(
db_id,
db_name.as_str(),
TableId::new(),
"test-table",
[
influxdb3_wal::create::field_def(
ColumnId::new(),
"field",
FieldDataType::String,
),
influxdb3_wal::create::field_def(
ColumnId::new(),
"time",
FieldDataType::Timestamp,
),
],
[],
)],
))
.expect("can create a database again");
// check new count:
assert_eq!(5, catalog.inner.read().database_count());
}

#[test]
fn deleted_tables_dont_count() {
let catalog = Catalog::new(Arc::from("test"), Arc::from("test-instance"));
// create a database:
let db_id = DbId::new();
let db_name = "test-db";
catalog
.apply_catalog_batch(influxdb3_wal::create::catalog_batch(
db_id,
db_name,
0,
[CatalogOp::CreateDatabase(DatabaseDefinition {
database_id: db_id,
database_name: db_name.into(),
})],
))
.unwrap();
let mut tables: Vec<(TableId, Arc<str>)> = vec![];
for i in 0..Catalog::NUM_TABLES_LIMIT {
let table_id = TableId::new();
let table_name = Arc::<str>::from(format!("test-table-{i}").as_str());
catalog
.apply_catalog_batch(influxdb3_wal::create::catalog_batch(
db_id,
db_name,
0,
[influxdb3_wal::create::create_table_op(
db_id,
db_name,
table_id,
Arc::clone(&table_name),
[
influxdb3_wal::create::field_def(
ColumnId::new(),
"field",
FieldDataType::String,
),
influxdb3_wal::create::field_def(
ColumnId::new(),
"time",
FieldDataType::Timestamp,
),
],
[],
)],
))
.unwrap();
tables.push((table_id, table_name));
}
assert_eq!(2_000, catalog.inner.read().table_count());
// should not be able to create another table:
let table_id = TableId::new();
let table_name = Arc::<str>::from("a-table-too-far");
catalog
.apply_catalog_batch(influxdb3_wal::create::catalog_batch(
db_id,
db_name,
0,
[influxdb3_wal::create::create_table_op(
db_id,
db_name,
table_id,
Arc::clone(&table_name),
[
influxdb3_wal::create::field_def(
ColumnId::new(),
"field",
FieldDataType::String,
),
influxdb3_wal::create::field_def(
ColumnId::new(),
"time",
FieldDataType::Timestamp,
),
],
[],
)],
))
.expect_err("should not be able to exceed table limit");
// delete a table
let (table_id, table_name) = tables.pop().unwrap();
catalog
.apply_catalog_batch(influxdb3_wal::create::catalog_batch(
db_id,
db_name,
0,
[CatalogOp::DeleteTable(DeleteTableDefinition {
database_id: db_id,
database_name: db_name.into(),
table_id,
table_name: Arc::clone(&table_name),
deletion_time: 0,
})],
))
.unwrap();
assert_eq!(1_999, catalog.inner.read().table_count());
// now create it again, this should be allowed:
catalog
.apply_catalog_batch(influxdb3_wal::create::catalog_batch(
db_id,
db_name,
0,
[influxdb3_wal::create::create_table_op(
db_id,
db_name,
TableId::new(),
Arc::clone(&table_name),
[
influxdb3_wal::create::field_def(
ColumnId::new(),
"field",
FieldDataType::String,
),
influxdb3_wal::create::field_def(
ColumnId::new(),
"time",
FieldDataType::Timestamp,
),
],
[],
)],
))
.unwrap();
assert_eq!(2_000, catalog.inner.read().table_count());
}
}

0 comments on commit 12eeeb3

Please sign in to comment.