Skip to content

Commit

Permalink
primary key check for schemas
Browse files Browse the repository at this point in the history
  • Loading branch information
manuel-rubio committed Aug 16, 2023
1 parent f6c7484 commit b62b759
Show file tree
Hide file tree
Showing 3 changed files with 331 additions and 46 deletions.
105 changes: 63 additions & 42 deletions src/ream/storage/schema.gleam
Original file line number Diff line number Diff line change
Expand Up @@ -41,13 +41,34 @@ pub type Schema {
)
}

fn check_table(
fields: List(#(FieldId, table.Field)),
primary_key_ids: List(FieldId),
) -> Result(Nil, table.DataError) {
case primary_key_ids {
[] -> Ok(Nil)
[pk_id, ..pk_ids] -> {
case list.key_find(fields, pk_id) {
Ok(field) ->
case field.nilable {
True -> Error(table.PrimaryKeyCannotBeNull(field))
False -> check_table(fields, pk_ids)
}
Error(Nil) -> Error(table.PrimaryKeyRefInvalid(pk_id))
}
}
}
}

pub fn create(
table: Table,
max_memtable_size: Int,
max_memtables_loaded: Int,
max_value_size: Int,
path: String,
) -> Schema {
) -> Result(Schema, table.DataError) {
let check_data = list.map(table.fields, fn(field) { #(field.id, field) })
use _ <- try(check_table(check_data, table.primary_key))
let path = fs.join([path, "schema", table.name])
let assert Ok(True) = fs.recursive_make_directory(path)

Expand All @@ -64,7 +85,7 @@ pub fn create(
let value_dir = fs.join([path, "value"])
let value_index = index.load(value_dir, max_value_size)

Schema(
Ok(Schema(
path,
table,
ranges,
Expand All @@ -73,7 +94,7 @@ pub fn create(
max_memtables_loaded,
max_memtable_size,
max_value_size,
)
))
}

pub fn open(
Expand Down Expand Up @@ -126,42 +147,43 @@ pub fn flush(schema: Schema) -> Result(Nil, Nil) {
Ok(Nil)
}

fn primary_key_match(
primary_key_ids: List(FieldId),
cells: List(table.DataSet),
acc: List(DataType),
) -> Result(List(DataType), table.DataError) {
case cells {
[] -> Ok(list.reverse(acc))
[cell, ..rest_cells] -> {
case list.contains(primary_key_ids, cell.field.id) {
True ->
primary_key_match(primary_key_ids, rest_cells, [cell.data, ..acc])
False -> primary_key_match(primary_key_ids, rest_cells, acc)
}
}
}
}

fn to_bitstring(entries: List(DataType)) -> BitString {
list.fold(
entries,
<<>>,
fn(acc, entry) {
let entry_bitstring = data_type.to_bitstring(entry)
<<acc:bit_string, entry_bitstring:bit_string>>
},
)
}

pub fn insert(
schema: Schema,
data: table.Row,
) -> Result(Schema, table.DataError) {
use row <- try(table.match_fields(schema.table, data))
// TODO: it should fail if the value for a PK isn't defined
let primary_key_ids = schema.table.primary_key
let primary_key =
list.filter_map(
row,
fn(cell) {
case list.contains(primary_key_ids, cell.field.id) {
True -> Ok(cell.data)
False -> Error(Nil)
}
},
)
let primary_key_bitstring =
list.fold(
primary_key,
<<>>,
fn(acc, pk) {
let pk = data_type.to_bitstring(pk)
<<acc:bit_string, pk:bit_string>>
},
)
let row_data = list.map(row, fn(cell) { cell.data })
let row_data_bitstring =
list.fold(
row_data,
<<>>,
fn(acc, data) {
let data = data_type.to_bitstring(data)
<<acc:bit_string, data:bit_string>>
},
)
use cells <- try(table.match_fields(schema.table, data))
use primary_key <- try(primary_key_match(schema.table.primary_key, cells, []))
let primary_key_bitstring = to_bitstring(primary_key)
let row_data = list.map(cells, fn(cell) { cell.data })
let row_data_bitstring = to_bitstring(row_data)

let key_hash = memtable.hash(primary_key_bitstring)
let #(range_id, schema) = find_range(schema, key_hash)
Expand All @@ -182,15 +204,15 @@ pub fn insert(
row_data_bitstring,
)
{
Ok(schema) -> schema
Ok(schema) -> {
let assert Ok(schema) = delete_value(schema, old_value)
Ok(schema)
}
Error(CapacityExceeded) -> {
let schema = split(schema, key_hash, range_id, range, memtable)
let assert Ok(schema) = insert(schema, data)
schema
insert(schema, data)
}
}
let assert Ok(schema) = delete_value(schema, old_value)
Ok(schema)
}
Error(Nil) -> {
// key isn't in the index yet, insert it as a new key
Expand All @@ -207,8 +229,7 @@ pub fn insert(
Ok(schema) -> Ok(schema)
Error(CapacityExceeded) -> {
let schema = split(schema, key_hash, range_id, range, memtable)
let assert Ok(schema) = insert(schema, data)
Ok(schema)
insert(schema, data)
}
}
}
Expand Down
2 changes: 2 additions & 0 deletions src/ream/storage/schema/table.gleam
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,8 @@ pub type DataError {
FieldNotFound(String)
UnmatchFieldType(FieldType, DataType)
FieldCannotBeNull(Field)
PrimaryKeyCannotBeNull(Field)
PrimaryKeyRefInvalid(FieldId)
}

pub type FieldId =
Expand Down
Loading

0 comments on commit b62b759

Please sign in to comment.