Skip to content

Commit

Permalink
Add a new md_add_new_row_to_sorted_list function to insert a row into…
Browse files Browse the repository at this point in the history
… a list column's target table in the correctly sorted position so we do not need to re-sort later. (#44)
  • Loading branch information
jkoritzinsky authored Nov 30, 2023
1 parent 0ad79bf commit 6fd8b66
Show file tree
Hide file tree
Showing 3 changed files with 137 additions and 30 deletions.
3 changes: 3 additions & 0 deletions src/dnmd/deltas.c
Original file line number Diff line number Diff line change
Expand Up @@ -213,6 +213,9 @@ static bool process_log(mdcxt_t* cxt, mdcxt_t* delta)
if (!md_token_to_cursor(cxt, tk, &parent))
return false;

// We don't use md_add_new_row_to_sorted_list here because we don't know the value of the Sequence column
// until we process the next record in the EncLog.
// We'll re-sort the list after we process the next entry in the EncLog.
if (!add_list_target_row(parent, mdtMethodDef_ParamList))
return false;
break;
Expand Down
156 changes: 126 additions & 30 deletions src/dnmd/write.c
Original file line number Diff line number Diff line change
Expand Up @@ -691,51 +691,37 @@ bool md_append_row(mdhandle_t handle, mdtable_id_t table_id, mdcursor_t* new_row
return append_row(table, new_row);
}

bool md_add_new_row_to_list(mdcursor_t list_owner, col_index_t list_col, mdcursor_t* new_row)
static bool add_new_row_to_list(mdcursor_t list_owner, col_index_t list_col, mdcursor_t row_to_insert_before, mdcursor_t* new_row)
{
if (!col_points_to_list(&list_owner, list_col))
return false;

assert(col_points_to_list(&list_owner, list_col));
// Get the range of rows already in the parent's child list.
// If we have an indirection table already, we will get back a range in the indirection table here.
mdcursor_t existing_range;
mdcursor_t range;
uint32_t count;
if (!md_get_column_value_as_range(list_owner, list_col, &existing_range, &count))
return false;

if (CursorTable(&existing_range)->cxt == NULL)
{
// If we don't have a table to add the row to, create one.
if (!allocate_new_table(CursorTable(&list_owner)->cxt, CursorTable(&existing_range)->table_id))
return false;

// Now that we have a table, we recreate the "existing range" cursor as the one-past-the-end cursor
// This allows us to use the remaining logic unchanged.
existing_range = create_cursor(CursorTable(&existing_range), 1);
}

mdcursor_t row_after_range = existing_range;
// Move the cursor just past the end of the range. We'll insert a row at the end of the range.
if (!md_cursor_move(&row_after_range, count))
if (!md_get_column_value_as_range(list_owner, list_col, &range, &count))
return false;

// Assert that the insertion location is in our range.
assert(CursorTable(&range) == CursorTable(&row_to_insert_before));
assert(CursorRow(&range) <= CursorRow(&row_to_insert_before) && CursorRow(&row_to_insert_before) <= CursorRow(&range) + count);

mdcursor_t target_row;
// If the range is in an indirection table, we'll normalize our insert to the actual target table.
if (!md_resolve_indirect_cursor(row_after_range, &target_row))
if (!md_resolve_indirect_cursor(row_to_insert_before, &target_row))
return false;

if (CursorTable(&row_after_range) != CursorTable(&target_row))
if (CursorTable(&row_to_insert_before) != CursorTable(&target_row))
{
// In this case, we resolved the indirect cursor, so we must have an indirection table.
// We need to append to the target table and then insert a new row in the requested place into the indirection table.
if (!append_row(CursorTable(&target_row), new_row))
return false;

mdcursor_t new_indirection_row;
if (!md_insert_row_before(row_after_range, &new_indirection_row))
if (!md_insert_row_before(row_to_insert_before, &new_indirection_row))
return false;

if (!md_set_column_value_as_cursor(new_indirection_row, index_to_col(0, CursorTable(&row_after_range)->table_id), 1, new_row))
if (!md_set_column_value_as_cursor(new_indirection_row, index_to_col(0, CursorTable(&row_to_insert_before)->table_id), 1, new_row))
return false;

if (count == 0)
Expand All @@ -750,11 +736,11 @@ bool md_add_new_row_to_list(mdcursor_t list_owner, col_index_t list_col, mdcurso
md_commit_row_add(new_indirection_row);
return true;
}
else if (CursorEnd(&row_after_range))
else if (CursorEnd(&row_to_insert_before))
{
// In this case, we don't have an indirection table
// and we don't need to create one as we're inserting a row at the end of the table.
if (!append_row(CursorTable(&row_after_range), new_row))
if (!append_row(CursorTable(&row_to_insert_before), new_row))
return false;

if (count == 0)
Expand Down Expand Up @@ -807,8 +793,118 @@ bool md_add_new_row_to_list(mdcursor_t list_owner, col_index_t list_col, mdcurso
*list_col_details = (*list_col_details & ~mdtc_timask) | InsertTable(indirect_table);

// Now that we have created an indirection table, we can insert the row into it.
// We need to re-do all of the work that we've already done, so just call back into ourselves now that we have the indirection table.
return md_add_new_row_to_list(list_owner, list_col, new_row);
// We need to change our "row to insert before" cursor to point at the indirection table.
// Because we just created the indirection table, then we know that each row in the target table corresponds to the same row index
// in the indirection table.
row_to_insert_before = create_cursor(&target_table->cxt->tables[indirect_table], CursorRow(&range));

// Now, we can call back into ourselves to do the actual insert.
return add_new_row_to_list(list_owner, list_col, row_to_insert_before, new_row);
}

bool md_add_new_row_to_list(mdcursor_t list_owner, col_index_t list_col, mdcursor_t* new_row)
{
if (!col_points_to_list(&list_owner, list_col))
return false;

// Get the range of rows already in the parent's child list.
// If we have an indirection table already, we will get back a range in the indirection table here.
mdcursor_t existing_range;
uint32_t count;
if (!md_get_column_value_as_range(list_owner, list_col, &existing_range, &count))
return false;

if (CursorTable(&existing_range)->cxt == NULL)
{
// If we don't have a table to add the row to, create one.
if (!allocate_new_table(CursorTable(&list_owner)->cxt, CursorTable(&existing_range)->table_id))
return false;

// Now that we have a table, we recreate the "existing range" cursor as the one-past-the-end cursor
// This allows us to use the remaining logic unchanged.
existing_range = create_cursor(CursorTable(&existing_range), 1);
}

mdcursor_t row_after_range = existing_range;
// Move the cursor just past the end of the range. We'll insert a row at the end of the range.
if (!md_cursor_move(&row_after_range, count))
return false;

return add_new_row_to_list(list_owner, list_col, row_after_range, new_row);
}

bool md_add_new_row_to_sorted_list(mdcursor_t list_owner, col_index_t list_col, col_index_t sort_order_col, uint32_t sort_col_value, mdcursor_t* new_row)
{
if (!col_points_to_list(&list_owner, list_col))
return false;

// Get the range of rows already in the parent's child list.
// If we have an indirection table already, we will get back a range in the indirection table here.
mdcursor_t existing_range;
uint32_t count;
if (!md_get_column_value_as_range(list_owner, list_col, &existing_range, &count))
return false;

if (CursorTable(&existing_range)->cxt == NULL)
{
// If we don't have a table to add the row to, create one.
if (!allocate_new_table(CursorTable(&list_owner)->cxt, CursorTable(&existing_range)->table_id))
return false;

// Now that we have a table, we recreate the "existing range" cursor as the one-past-the-end cursor
// This allows us to use the remaining logic unchanged.
existing_range = create_cursor(CursorTable(&existing_range), 1);
}

mdcursor_t row_to_insert_before = existing_range;
// Move the cursor to just past the end of the range. If we don't find a place in the middle that we need to insert the row,
// we'll insert it here.
if (!md_cursor_move(&row_to_insert_before, count))
return false;

// The existing list isn't empty, so we need to find the correct place to insert the new row.
if (count > 0)
{
// In most cases we will be inserting at the end of the list,
// so start searching there to make this a little faster.
mdcursor_t row_to_check = row_to_insert_before;

// Move our cursor to the last row in the list.
// This can't return false as we got to row_to_insert_before by moving forward at least one row.
(void)md_cursor_move(&row_to_check, -1);
for (; CursorRow(&row_to_check) >= CursorRow(&existing_range); (void)md_cursor_move(&row_to_check, -1))
{
// If the range is in an indirection table, we need to normalize to the target table to
// get the sort column value.
mdcursor_t target_row;
if (!md_resolve_indirect_cursor(row_to_check, &target_row))
return false;

uint32_t current_sort_col_value;
if (1 != md_get_column_value_as_constant(target_row, sort_order_col, 1, &current_sort_col_value))
return false;

if (current_sort_col_value <= sort_col_value)
{
// row_to_check is the first row with a sort order less than or equal to the new row.
// So we want to insert the new row after this row.
// Set row_to_insert_before to the next row to ensure we insert the new row after this row.
row_to_insert_before = row_to_check;
(void)md_cursor_next(&row_to_insert_before); // We got to row_to_insert_before by moving back from an existing row, so there must be a next row.
break;
}
}
}

if (!add_new_row_to_list(list_owner, list_col, row_to_insert_before, new_row))
return false;

// Now that we've added the new column to the list, set the sort order column to the provided value to
// ensure the sort is accurate.
if (1 != md_set_column_value_as_constant(*new_row, sort_order_col, 1, &sort_col_value))
return false;

return true;
}

bool copy_cursor(mdcursor_t dest, mdcursor_t src)
Expand Down
8 changes: 8 additions & 0 deletions src/inc/dnmd.h
Original file line number Diff line number Diff line change
Expand Up @@ -521,6 +521,14 @@ bool md_append_row(mdhandle_t handle, mdtable_id_t table_id, mdcursor_t* new_row
// The table that new_child_row points to is treated as unsorted until md_commit_row_add is called after all columns have been set on the new row.
bool md_add_new_row_to_list(mdcursor_t list_owner, col_index_t list_col, mdcursor_t* new_row);

// Creates a new row in the list for the given cursor specified by the given column such that the values of the sort_order_col are maintained in ascending order.
// This method assumes that the list is currently sorted by the sort_order_col column.
// This method accounts for any indirection tables that may need to be created or maintained to ensure that
// the structure of the list is maintained without moving tokens.
// The table that new_row points to is treated as unsorted until md_commit_row_add is called after all columns have been set on the new row.
// The new_row row will also have the sort_order_col column initialized to sort_col_value.
bool md_add_new_row_to_sorted_list(mdcursor_t list_owner, col_index_t list_col, col_index_t sort_order_col, uint32_t sort_col_value, mdcursor_t* new_row);

// Finish the process of adding a row to the cursor's table.
void md_commit_row_add(mdcursor_t row);

Expand Down

0 comments on commit 6fd8b66

Please sign in to comment.