From b62b759acdc4e8e749eb29a8b4c1caf31c1e96fc Mon Sep 17 00:00:00 2001 From: Manuel Rubio Date: Wed, 16 Aug 2023 19:20:12 +0200 Subject: [PATCH] primary key check for schemas --- src/ream/storage/schema.gleam | 105 ++++++----- src/ream/storage/schema/table.gleam | 2 + test/ream/storage/schema_test.gleam | 270 +++++++++++++++++++++++++++- 3 files changed, 331 insertions(+), 46 deletions(-) diff --git a/src/ream/storage/schema.gleam b/src/ream/storage/schema.gleam index 8473f74..5f3826b 100644 --- a/src/ream/storage/schema.gleam +++ b/src/ream/storage/schema.gleam @@ -41,13 +41,34 @@ pub type Schema { ) } +fn check_table( + fields: List(#(FieldId, table.Field)), + primary_key_ids: List(FieldId), +) -> Result(Nil, table.DataError) { + case primary_key_ids { + [] -> Ok(Nil) + [pk_id, ..pk_ids] -> { + case list.key_find(fields, pk_id) { + Ok(field) -> + case field.nilable { + True -> Error(table.PrimaryKeyCannotBeNull(field)) + False -> check_table(fields, pk_ids) + } + Error(Nil) -> Error(table.PrimaryKeyRefInvalid(pk_id)) + } + } + } +} + pub fn create( table: Table, max_memtable_size: Int, max_memtables_loaded: Int, max_value_size: Int, path: String, -) -> Schema { +) -> Result(Schema, table.DataError) { + let check_data = list.map(table.fields, fn(field) { #(field.id, field) }) + use _ <- try(check_table(check_data, table.primary_key)) let path = fs.join([path, "schema", table.name]) let assert Ok(True) = fs.recursive_make_directory(path) @@ -64,7 +85,7 @@ pub fn create( let value_dir = fs.join([path, "value"]) let value_index = index.load(value_dir, max_value_size) - Schema( + Ok(Schema( path, table, ranges, @@ -73,7 +94,7 @@ pub fn create( max_memtables_loaded, max_memtable_size, max_value_size, - ) + )) } pub fn open( @@ -126,42 +147,43 @@ pub fn flush(schema: Schema) -> Result(Nil, Nil) { Ok(Nil) } +fn primary_key_match( + primary_key_ids: List(FieldId), + cells: List(table.DataSet), + acc: List(DataType), +) -> Result(List(DataType), table.DataError) { + case cells { + [] -> Ok(list.reverse(acc)) + [cell, ..rest_cells] -> { + case list.contains(primary_key_ids, cell.field.id) { + True -> + primary_key_match(primary_key_ids, rest_cells, [cell.data, ..acc]) + False -> primary_key_match(primary_key_ids, rest_cells, acc) + } + } + } +} + +fn to_bitstring(entries: List(DataType)) -> BitString { + list.fold( + entries, + <<>>, + fn(acc, entry) { + let entry_bitstring = data_type.to_bitstring(entry) + <> + }, + ) +} + pub fn insert( schema: Schema, data: table.Row, ) -> Result(Schema, table.DataError) { - use row <- try(table.match_fields(schema.table, data)) - // TODO: it should fail if the value for a PK isn't defined - let primary_key_ids = schema.table.primary_key - let primary_key = - list.filter_map( - row, - fn(cell) { - case list.contains(primary_key_ids, cell.field.id) { - True -> Ok(cell.data) - False -> Error(Nil) - } - }, - ) - let primary_key_bitstring = - list.fold( - primary_key, - <<>>, - fn(acc, pk) { - let pk = data_type.to_bitstring(pk) - <> - }, - ) - let row_data = list.map(row, fn(cell) { cell.data }) - let row_data_bitstring = - list.fold( - row_data, - <<>>, - fn(acc, data) { - let data = data_type.to_bitstring(data) - <> - }, - ) + use cells <- try(table.match_fields(schema.table, data)) + use primary_key <- try(primary_key_match(schema.table.primary_key, cells, [])) + let primary_key_bitstring = to_bitstring(primary_key) + let row_data = list.map(cells, fn(cell) { cell.data }) + let row_data_bitstring = to_bitstring(row_data) let key_hash = memtable.hash(primary_key_bitstring) let #(range_id, schema) = find_range(schema, key_hash) @@ -182,15 +204,15 @@ pub fn insert( row_data_bitstring, ) { - Ok(schema) -> schema + Ok(schema) -> { + let assert Ok(schema) = delete_value(schema, old_value) + Ok(schema) + } Error(CapacityExceeded) -> { let schema = split(schema, key_hash, range_id, range, memtable) - let assert Ok(schema) = insert(schema, data) - schema + insert(schema, data) } } - let assert Ok(schema) = delete_value(schema, old_value) - Ok(schema) } Error(Nil) -> { // key isn't in the index yet, insert it as a new key @@ -207,8 +229,7 @@ pub fn insert( Ok(schema) -> Ok(schema) Error(CapacityExceeded) -> { let schema = split(schema, key_hash, range_id, range, memtable) - let assert Ok(schema) = insert(schema, data) - Ok(schema) + insert(schema, data) } } } diff --git a/src/ream/storage/schema/table.gleam b/src/ream/storage/schema/table.gleam index 959d43e..d3c178a 100644 --- a/src/ream/storage/schema/table.gleam +++ b/src/ream/storage/schema/table.gleam @@ -27,6 +27,8 @@ pub type DataError { FieldNotFound(String) UnmatchFieldType(FieldType, DataType) FieldCannotBeNull(Field) + PrimaryKeyCannotBeNull(Field) + PrimaryKeyRefInvalid(FieldId) } pub type FieldId = diff --git a/test/ream/storage/schema_test.gleam b/test/ream/storage/schema_test.gleam index 47056d9..52b8cbd 100644 --- a/test/ream/storage/schema_test.gleam +++ b/test/ream/storage/schema_test.gleam @@ -6,10 +6,124 @@ import ream/storage/file as fs const base_path = "build/schema_test/" +const max_memtable_size = 4096 + +const max_memtables_loaded = 5 + +const max_value_size = 4096 + pub fn create_test() { let path = fs.join([base_path, "create_test"]) let _ = file.recursive_delete(path) + let accounts_table = + Table( + name: "accounts", + fields: [ + Field(1, "id", table.Integer, False), + Field(2, "name", table.String(table.Size(150)), False), + Field(3, "credit", table.Decimal, True), + Field(4, "debit", table.Decimal, True), + Field(5, "balance", table.Decimal, False), + Field(6, "inserted_at", table.Timestamp, False), + ], + primary_key: [1], + indexes: [], + ) + + let users_table = + Table( + name: "users", + fields: [ + Field(1, "id", table.Integer, False), + Field(2, "name", table.String(table.Size(50)), False), + Field(3, "password", table.String(table.Size(50)), False), + Field(4, "role", table.String(table.Size(50)), False), + Field(5, "inserted_at", table.Timestamp, False), + ], + primary_key: [1], + indexes: [table.Unique([2])], + ) + + let assert Ok(accounts) = + schema.create( + accounts_table, + max_memtable_size, + max_memtables_loaded, + max_value_size, + path, + ) + + let assert Ok(users) = + schema.create( + users_table, + max_memtable_size, + max_memtables_loaded, + max_value_size, + path, + ) + + let assert Ok(Nil) = schema.close(accounts) + let assert Ok(Nil) = schema.close(users) + + let assert True = + Ok(accounts_table) == table.load(fs.join([path, "schema/accounts/meta"])) + + let assert True = + Ok(users_table) == table.load(fs.join([path, "schema/users/meta"])) +} + +pub fn wrong_pk_create_test() { + let path = fs.join([base_path, "wrong_pk_create_test"]) + + let table = + Table( + name: "wrong", + fields: [Field(1, "id", table.Integer, True)], + primary_key: [1], + indexes: [], + ) + + let assert Error(table.PrimaryKeyCannotBeNull(Field( + 1, + "id", + table.Integer, + True, + ))) = + schema.create( + table, + max_memtable_size, + max_memtables_loaded, + max_value_size, + path, + ) +} + +pub fn wrong_ref_create_test() { + let path = fs.join([base_path, "wrong_pk_create_test"]) + + let table = + Table( + name: "ref", + fields: [Field(1, "id", table.Integer, False)], + primary_key: [2], + indexes: [], + ) + + let assert Error(table.PrimaryKeyRefInvalid(2)) = + schema.create( + table, + max_memtable_size, + max_memtables_loaded, + max_value_size, + path, + ) +} + +pub fn insert_test() { + let path = fs.join([base_path, "insert_test"]) + let _ = file.recursive_delete(path) + let table = Table( name: "accounts", @@ -24,11 +138,159 @@ pub fn create_test() { primary_key: [1], indexes: [], ) - let max_memtable_size = 4096 - let max_memtables_loaded = 5 - let max_value_size = 4096 - let accounts = + let assert Ok(accounts) = + schema.create( + table, + max_memtable_size, + max_memtables_loaded, + max_value_size, + path, + ) + + let assert Ok(accounts) = + schema.insert( + accounts, + [ + #("id", data_type.Integer(1)), + #("name", data_type.String("Bank")), + #("debit", data_type.Decimal(10_000, 2)), + #("balance", data_type.Decimal(10_000, 2)), + #("inserted_at", data_type.Timestamp(1_690_785_424_366_972)), + ], + ) + + let assert Ok(accounts) = + schema.insert( + accounts, + [ + #("id", data_type.Integer(1)), + #("name", data_type.String("Active")), + #("balance", data_type.Decimal(100, 0)), + #("inserted_at", data_type.Timestamp(1_690_785_424_366_000)), + ], + ) + + let assert #( + Ok([ + [ + data_type.Integer(1), + data_type.String("Active"), + data_type.Null, + data_type.Null, + data_type.Decimal(100, 0), + data_type.Timestamp(1_690_785_424_366_000), + ], + ]), + accounts, + ) = + schema.find( + accounts, + schema.Equal(schema.Field(1), schema.Literal(data_type.Integer(1))), + ) + + let assert Ok(Nil) = schema.close(accounts) +} + +pub fn invalid_primary_key_test() { + let path = fs.join([base_path, "invalid_pk_test"]) + let _ = file.recursive_delete(path) + + let table = + Table( + name: "accounts", + fields: [ + Field(1, "type", table.String(table.Size(30)), False), + Field(2, "class", table.String(table.Size(30)), False), + Field(3, "name", table.String(table.Size(150)), False), + Field(4, "value", table.Decimal, False), + Field(5, "inserted_at", table.Timestamp, False), + ], + primary_key: [1, 2], + indexes: [], + ) + + let assert Ok(accounts) = + schema.create( + table, + max_memtable_size, + max_memtables_loaded, + max_value_size, + path, + ) + + let assert Error(table.FieldCannotBeNull(table.Field( + id: 1, + name: "type", + field_type: table.String(table.Size(30)), + nilable: False, + ))) = + schema.insert( + accounts, + [ + #("name", data_type.String("1984")), + #("value", data_type.Decimal(1000, 2)), + #("inserted_at", data_type.Timestamp(1_690_785_424_366_972)), + ], + ) + + let assert Error(table.FieldCannotBeNull(table.Field( + id: 2, + name: "class", + field_type: table.String(table.Size(30)), + nilable: False, + ))) = + schema.insert( + accounts, + [ + #("type", data_type.String("Books")), + #("class", data_type.Null), + #("name", data_type.String("1984")), + #("value", data_type.Decimal(1000, 2)), + #("inserted_at", data_type.Timestamp(1_690_785_424_366_972)), + ], + ) + + let assert Error(table.FieldCannotBeNull(table.Field( + id: 1, + name: "type", + field_type: table.String(table.Size(30)), + nilable: False, + ))) = + schema.insert( + accounts, + [ + #("type", data_type.Null), + #("class", data_type.String("Fantasy")), + #("name", data_type.String("1984")), + #("value", data_type.Decimal(1000, 2)), + #("inserted_at", data_type.Timestamp(1_690_785_424_366_972)), + ], + ) + + let assert Ok(Nil) = schema.close(accounts) +} + +pub fn create_insert_close_open_find_and_close_test() { + let path = fs.join([base_path, "full_test"]) + let _ = file.recursive_delete(path) + + let table = + Table( + name: "accounts", + fields: [ + Field(1, "id", table.Integer, False), + Field(2, "name", table.String(table.Size(150)), False), + Field(3, "credit", table.Decimal, True), + Field(4, "debit", table.Decimal, True), + Field(5, "balance", table.Decimal, False), + Field(6, "inserted_at", table.Timestamp, False), + ], + primary_key: [1], + indexes: [], + ) + + let assert Ok(accounts) = schema.create( table, max_memtable_size,