diff --git a/src/dnmd/deltas.c b/src/dnmd/deltas.c index b0ab0642..dd8efee9 100644 --- a/src/dnmd/deltas.c +++ b/src/dnmd/deltas.c @@ -213,6 +213,9 @@ static bool process_log(mdcxt_t* cxt, mdcxt_t* delta) if (!md_token_to_cursor(cxt, tk, &parent)) return false; + // We don't use md_add_new_row_to_sorted_list here because we don't know the value of the Sequence column + // until we process the next record in the EncLog. + // We'll re-sort the list after we process the next entry in the EncLog. if (!add_list_target_row(parent, mdtMethodDef_ParamList)) return false; break; diff --git a/src/dnmd/write.c b/src/dnmd/write.c index 9d969cb9..23fb5b16 100644 --- a/src/dnmd/write.c +++ b/src/dnmd/write.c @@ -691,40 +691,26 @@ bool md_append_row(mdhandle_t handle, mdtable_id_t table_id, mdcursor_t* new_row return append_row(table, new_row); } -bool md_add_new_row_to_list(mdcursor_t list_owner, col_index_t list_col, mdcursor_t* new_row) +static bool add_new_row_to_list(mdcursor_t list_owner, col_index_t list_col, mdcursor_t row_to_insert_before, mdcursor_t* new_row) { - if (!col_points_to_list(&list_owner, list_col)) - return false; - + assert(col_points_to_list(&list_owner, list_col)); // Get the range of rows already in the parent's child list. // If we have an indirection table already, we will get back a range in the indirection table here. - mdcursor_t existing_range; + mdcursor_t range; uint32_t count; - if (!md_get_column_value_as_range(list_owner, list_col, &existing_range, &count)) - return false; - - if (CursorTable(&existing_range)->cxt == NULL) - { - // If we don't have a table to add the row to, create one. - if (!allocate_new_table(CursorTable(&list_owner)->cxt, CursorTable(&existing_range)->table_id)) - return false; - - // Now that we have a table, we recreate the "existing range" cursor as the one-past-the-end cursor - // This allows us to use the remaining logic unchanged. - existing_range = create_cursor(CursorTable(&existing_range), 1); - } - - mdcursor_t row_after_range = existing_range; - // Move the cursor just past the end of the range. We'll insert a row at the end of the range. - if (!md_cursor_move(&row_after_range, count)) + if (!md_get_column_value_as_range(list_owner, list_col, &range, &count)) return false; + + // Assert that the insertion location is in our range. + assert(CursorTable(&range) == CursorTable(&row_to_insert_before)); + assert(CursorRow(&range) <= CursorRow(&row_to_insert_before) && CursorRow(&row_to_insert_before) <= CursorRow(&range) + count); mdcursor_t target_row; // If the range is in an indirection table, we'll normalize our insert to the actual target table. - if (!md_resolve_indirect_cursor(row_after_range, &target_row)) + if (!md_resolve_indirect_cursor(row_to_insert_before, &target_row)) return false; - if (CursorTable(&row_after_range) != CursorTable(&target_row)) + if (CursorTable(&row_to_insert_before) != CursorTable(&target_row)) { // In this case, we resolved the indirect cursor, so we must have an indirection table. // We need to append to the target table and then insert a new row in the requested place into the indirection table. @@ -732,10 +718,10 @@ bool md_add_new_row_to_list(mdcursor_t list_owner, col_index_t list_col, mdcurso return false; mdcursor_t new_indirection_row; - if (!md_insert_row_before(row_after_range, &new_indirection_row)) + if (!md_insert_row_before(row_to_insert_before, &new_indirection_row)) return false; - if (!md_set_column_value_as_cursor(new_indirection_row, index_to_col(0, CursorTable(&row_after_range)->table_id), 1, new_row)) + if (!md_set_column_value_as_cursor(new_indirection_row, index_to_col(0, CursorTable(&row_to_insert_before)->table_id), 1, new_row)) return false; if (count == 0) @@ -750,11 +736,11 @@ bool md_add_new_row_to_list(mdcursor_t list_owner, col_index_t list_col, mdcurso md_commit_row_add(new_indirection_row); return true; } - else if (CursorEnd(&row_after_range)) + else if (CursorEnd(&row_to_insert_before)) { // In this case, we don't have an indirection table // and we don't need to create one as we're inserting a row at the end of the table. - if (!append_row(CursorTable(&row_after_range), new_row)) + if (!append_row(CursorTable(&row_to_insert_before), new_row)) return false; if (count == 0) @@ -807,8 +793,118 @@ bool md_add_new_row_to_list(mdcursor_t list_owner, col_index_t list_col, mdcurso *list_col_details = (*list_col_details & ~mdtc_timask) | InsertTable(indirect_table); // Now that we have created an indirection table, we can insert the row into it. - // We need to re-do all of the work that we've already done, so just call back into ourselves now that we have the indirection table. - return md_add_new_row_to_list(list_owner, list_col, new_row); + // We need to change our "row to insert before" cursor to point at the indirection table. + // Because we just created the indirection table, then we know that each row in the target table corresponds to the same row index + // in the indirection table. + row_to_insert_before = create_cursor(&target_table->cxt->tables[indirect_table], CursorRow(&range)); + + // Now, we can call back into ourselves to do the actual insert. + return add_new_row_to_list(list_owner, list_col, row_to_insert_before, new_row); +} + +bool md_add_new_row_to_list(mdcursor_t list_owner, col_index_t list_col, mdcursor_t* new_row) +{ + if (!col_points_to_list(&list_owner, list_col)) + return false; + + // Get the range of rows already in the parent's child list. + // If we have an indirection table already, we will get back a range in the indirection table here. + mdcursor_t existing_range; + uint32_t count; + if (!md_get_column_value_as_range(list_owner, list_col, &existing_range, &count)) + return false; + + if (CursorTable(&existing_range)->cxt == NULL) + { + // If we don't have a table to add the row to, create one. + if (!allocate_new_table(CursorTable(&list_owner)->cxt, CursorTable(&existing_range)->table_id)) + return false; + + // Now that we have a table, we recreate the "existing range" cursor as the one-past-the-end cursor + // This allows us to use the remaining logic unchanged. + existing_range = create_cursor(CursorTable(&existing_range), 1); + } + + mdcursor_t row_after_range = existing_range; + // Move the cursor just past the end of the range. We'll insert a row at the end of the range. + if (!md_cursor_move(&row_after_range, count)) + return false; + + return add_new_row_to_list(list_owner, list_col, row_after_range, new_row); +} + +bool md_add_new_row_to_sorted_list(mdcursor_t list_owner, col_index_t list_col, col_index_t sort_order_col, uint32_t sort_col_value, mdcursor_t* new_row) +{ + if (!col_points_to_list(&list_owner, list_col)) + return false; + + // Get the range of rows already in the parent's child list. + // If we have an indirection table already, we will get back a range in the indirection table here. + mdcursor_t existing_range; + uint32_t count; + if (!md_get_column_value_as_range(list_owner, list_col, &existing_range, &count)) + return false; + + if (CursorTable(&existing_range)->cxt == NULL) + { + // If we don't have a table to add the row to, create one. + if (!allocate_new_table(CursorTable(&list_owner)->cxt, CursorTable(&existing_range)->table_id)) + return false; + + // Now that we have a table, we recreate the "existing range" cursor as the one-past-the-end cursor + // This allows us to use the remaining logic unchanged. + existing_range = create_cursor(CursorTable(&existing_range), 1); + } + + mdcursor_t row_to_insert_before = existing_range; + // Move the cursor to just past the end of the range. If we don't find a place in the middle that we need to insert the row, + // we'll insert it here. + if (!md_cursor_move(&row_to_insert_before, count)) + return false; + + // The existing list isn't empty, so we need to find the correct place to insert the new row. + if (count > 0) + { + // In most cases we will be inserting at the end of the list, + // so start searching there to make this a little faster. + mdcursor_t row_to_check = row_to_insert_before; + + // Move our cursor to the last row in the list. + // This can't return false as we got to row_to_insert_before by moving forward at least one row. + (void)md_cursor_move(&row_to_check, -1); + for (; CursorRow(&row_to_check) >= CursorRow(&existing_range); (void)md_cursor_move(&row_to_check, -1)) + { + // If the range is in an indirection table, we need to normalize to the target table to + // get the sort column value. + mdcursor_t target_row; + if (!md_resolve_indirect_cursor(row_to_check, &target_row)) + return false; + + uint32_t current_sort_col_value; + if (1 != md_get_column_value_as_constant(target_row, sort_order_col, 1, ¤t_sort_col_value)) + return false; + + if (current_sort_col_value <= sort_col_value) + { + // row_to_check is the first row with a sort order less than or equal to the new row. + // So we want to insert the new row after this row. + // Set row_to_insert_before to the next row to ensure we insert the new row after this row. + row_to_insert_before = row_to_check; + (void)md_cursor_next(&row_to_insert_before); // We got to row_to_insert_before by moving back from an existing row, so there must be a next row. + break; + } + } + } + + if (!add_new_row_to_list(list_owner, list_col, row_to_insert_before, new_row)) + return false; + + // Now that we've added the new column to the list, set the sort order column to the provided value to + // ensure the sort is accurate. + if (1 != md_set_column_value_as_constant(*new_row, sort_order_col, 1, &sort_col_value)) + return false; + + return true; } bool copy_cursor(mdcursor_t dest, mdcursor_t src) diff --git a/src/inc/dnmd.h b/src/inc/dnmd.h index 8768b4cd..88813d13 100644 --- a/src/inc/dnmd.h +++ b/src/inc/dnmd.h @@ -521,6 +521,14 @@ bool md_append_row(mdhandle_t handle, mdtable_id_t table_id, mdcursor_t* new_row // The table that new_child_row points to is treated as unsorted until md_commit_row_add is called after all columns have been set on the new row. bool md_add_new_row_to_list(mdcursor_t list_owner, col_index_t list_col, mdcursor_t* new_row); +// Creates a new row in the list for the given cursor specified by the given column such that the values of the sort_order_col are maintained in ascending order. +// This method assumes that the list is currently sorted by the sort_order_col column. +// This method accounts for any indirection tables that may need to be created or maintained to ensure that +// the structure of the list is maintained without moving tokens. +// The table that new_row points to is treated as unsorted until md_commit_row_add is called after all columns have been set on the new row. +// The new_row row will also have the sort_order_col column initialized to sort_col_value. +bool md_add_new_row_to_sorted_list(mdcursor_t list_owner, col_index_t list_col, col_index_t sort_order_col, uint32_t sort_col_value, mdcursor_t* new_row); + // Finish the process of adding a row to the cursor's table. void md_commit_row_add(mdcursor_t row);