Skip to content

Commit

Permalink
Add support for indexing unlogged tables (#253)
Browse files Browse the repository at this point in the history
* added support for unlogged tables and corresponding tests
* clarified manual test instructions
* added necessary crit section lines inside StoreExternalEmptyIndex to prevent db crashes in sanitizer tests
* added include statement for CRIT_SECTION macros for earlier versions of pg
* added test hnsw_logged_unlogged which tests changing a table from logged to unlogged
* added dot out file for hnsw_logged_unlogged test
* added manual test cases where we switch from logged/unlogged
* added replica tests for unlogged tables
* added unique-distanced vectors so that output of distance queries are forced to be unique in hnsw_logged_unlogged
* finished making the hnsw_logged_unlogged .out results completely deterministic
* fixed runner.c to reconnect to root db on every test case to allow running several tests after crashing the root db as part of a test (like replica_test_unlogged currently does)
* replace killall with kill -9 pid AND finish root connection after each test
  • Loading branch information
therealdarkknight authored Jan 13, 2024
1 parent 9ac519e commit 670c318
Show file tree
Hide file tree
Showing 21 changed files with 1,424 additions and 31 deletions.
7 changes: 5 additions & 2 deletions ci/scripts/run-tests-linux.sh
Original file line number Diff line number Diff line change
Expand Up @@ -48,8 +48,11 @@ function run_db_tests(){
cd $WORKDIR/build && \
make test && \
make test-client && \
run_pgvector_tests && \
killall postgres && \
run_pgvector_tests
pg_pid=$(fuser -a 5432/tcp 2>/dev/null | awk "{print $1}" | awk '{$1=$1};1')
if [[ ! -z "$pg_pid" ]]; then
kill -9 $pg_pid
fi
gcovr -r $WORKDIR/src/ --object-directory $WORKDIR/build/ --xml /tmp/coverage.xml
fi
}
Expand Down
51 changes: 41 additions & 10 deletions src/hnsw/build.c
Original file line number Diff line number Diff line change
Expand Up @@ -358,7 +358,7 @@ static void InitBuildState(HnswBuildState *buildstate, Relation heap, Relation i
buildstate->index_file_path = ldb_HnswGetIndexFilePath(index);

// If a dimension wasn't specified try to infer it
if(buildstate->dimensions < 1) {
if(heap != NULL && buildstate->dimensions < 1) {
buildstate->dimensions = InferDimension(heap, indexInfo);
}
/* Require column to have dimensions to be indexed */
Expand Down Expand Up @@ -416,10 +416,9 @@ static void ScanTable(HnswBuildState *buildstate)
}

/*
* Build the index
* Build the index, writing to the main fork
*/
static void BuildIndex(
Relation heap, Relation index, IndexInfo *indexInfo, HnswBuildState *buildstate, ForkNumber forkNum)
static void BuildIndex(Relation heap, Relation index, IndexInfo *indexInfo, HnswBuildState *buildstate)
{
usearch_error_t error = NULL;
usearch_init_options_t opts;
Expand Down Expand Up @@ -543,7 +542,7 @@ static void BuildIndex(

//****************************** saving to WAL BEGIN ******************************//
UpdateProgress(PROGRESS_CREATEIDX_PHASE, PROGRESS_HNSW_PHASE_LOAD);
StoreExternalIndex(index, &metadata, forkNum, result_buf, &opts, num_added_vectors);
StoreExternalIndex(index, &metadata, MAIN_FORKNUM, result_buf, &opts, num_added_vectors);
//****************************** saving to WAL END ******************************//

munmap_ret = munmap(result_buf, index_file_stat.st_size);
Expand All @@ -560,6 +559,38 @@ static void BuildIndex(
FreeBuildState(buildstate);
}

/*
 * Build an empty index, writing to the init fork
 *
 * Initializes the build state without a heap relation, creates an empty usearch index
 * from the resulting options, serializes it into a buffer and stores it with
 * StoreExternalEmptyIndex under INIT_FORKNUM. The usearch index, the serialized buffer
 * and the build state are released before returning.
 *
 * usearch failures are reported with elog(ERROR) rather than assert(), so they are still
 * detected when assertions are compiled out.
 */
static void BuildEmptyIndex(Relation index, IndexInfo *indexInfo, HnswBuildState *buildstate)
{
    usearch_error_t        error = NULL;
    usearch_init_options_t opts;
    char                  *result_buf = NULL;

    MemSet(&opts, 0, sizeof(opts));

    // there is no heap to scan here, so NULL is passed in place of the heap relation
    InitBuildState(buildstate, NULL, index, indexInfo);
    opts.dimensions = buildstate->dimensions;
    PopulateUsearchOpts(index, &opts);

    buildstate->usearch_index = usearch_init(&opts, &error);
    if(error != NULL) {
        elog(ERROR, "failed to initialize empty usearch index: %s", error);
    }

    buildstate->hnsw = NULL;

    usearch_save(buildstate->usearch_index, NULL, &result_buf, &error);
    if(error != NULL || result_buf == NULL) {
        usearch_error_t cleanup_error = NULL;

        // elog(ERROR) does not return, so release the usearch resources first
        usearch_free(buildstate->usearch_index, &cleanup_error);
        buildstate->usearch_index = NULL;
        free(result_buf);
        elog(ERROR,
             "failed to serialize empty usearch index: %s",
             error != NULL ? error : "no buffer was returned");
    }

    StoreExternalEmptyIndex(index, INIT_FORKNUM, result_buf, &opts);

    // release everything before inspecting the error, since elog(ERROR) does not return
    usearch_free(buildstate->usearch_index, &error);
    buildstate->usearch_index = NULL;
    free(result_buf);
    if(error != NULL) {
        elog(ERROR, "failed to free empty usearch index: %s", error);
    }

    FreeBuildState(buildstate);
}

/*
* Build the index for a logged table
*/
Expand All @@ -568,7 +599,7 @@ IndexBuildResult *ldb_ambuild(Relation heap, Relation index, IndexInfo *indexInf
IndexBuildResult *result;
HnswBuildState buildstate;

BuildIndex(heap, index, indexInfo, &buildstate, MAIN_FORKNUM);
BuildIndex(heap, index, indexInfo, &buildstate);

result = (IndexBuildResult *)palloc(sizeof(IndexBuildResult));
result->heap_tuples = buildstate.reltuples;
Expand All @@ -578,13 +609,13 @@ IndexBuildResult *ldb_ambuild(Relation heap, Relation index, IndexInfo *indexInf
}

/*
 * Build an empty index for an unlogged table
 *
 * Derives the IndexInfo from the index relation with BuildIndexInfo and hands off to
 * BuildEmptyIndex, which writes the empty index to the init fork.
 */
void ldb_ambuildunlogged(Relation index)
{
    HnswBuildState buildstate;
    IndexInfo     *indexInfo = BuildIndexInfo(index);

    BuildEmptyIndex(index, indexInfo, &buildstate);
}

void ldb_reindex_external_index(Oid indrelid)
Expand Down
79 changes: 73 additions & 6 deletions src/hnsw/external_index.c
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@
#include <assert.h>
#include <common/relpath.h>
#include <hnsw/fa_cache.h>
#include <miscadmin.h> // START_CRIT_SECTION, END_CRIT_SECTION
#include <pg_config.h> // BLCKSZ
#include <storage/bufmgr.h> // Buffer
#include <utils/hsearch.h>
Expand Down Expand Up @@ -119,11 +120,13 @@ static void UpdateHeaderBlockMapGroupDesc(
hdr_copy->blockmap_groups[ groupno ] = *desc;

log_rec_ptr = GenericXLogFinish(state);
assert(log_rec_ptr != InvalidXLogRecPtr);
if(flush_log) {
LDB_FAILURE_POINT_CRASH_IF_ENABLED("just_before_wal_flush");
XLogFlush(log_rec_ptr);
LDB_FAILURE_POINT_CRASH_IF_ENABLED("just_after_wal_flush");
if(RelationNeedsWAL(index)) {
assert(log_rec_ptr != InvalidXLogRecPtr);
if(flush_log) {
LDB_FAILURE_POINT_CRASH_IF_ENABLED("just_before_wal_flush");
XLogFlush(log_rec_ptr);
LDB_FAILURE_POINT_CRASH_IF_ENABLED("just_after_wal_flush");
}
}
ReleaseBuffer(hdr_buf);
}
Expand Down Expand Up @@ -417,7 +420,9 @@ void StoreExternalIndexBlockMapGroup(Relation index,
// When the blockmap page group was created, header block was updated accordingly in
// ContinueBlockMapGroupInitialization call above.
const BlockNumber blockmapno = blockmap_id + headerp->blockmap_groups[ blockmap_groupno ].first_block;
Buffer buf = ReadBufferExtended(index, MAIN_FORKNUM, blockmapno, RBM_NORMAL, NULL);
// todo:: should MAIN_FORKNUM be hardcoded here or use the forkNum parameter, from a code readability standpoint
// (other places in this file as well)
Buffer buf = ReadBufferExtended(index, MAIN_FORKNUM, blockmapno, RBM_NORMAL, NULL);
LockBuffer(buf, BUFFER_LOCK_EXCLUSIVE);

GenericXLogState *state = GenericXLogStart(index);
Expand All @@ -433,6 +438,68 @@ void StoreExternalIndexBlockMapGroup(Relation index,
}
}

/*
 * Write the header page of an empty index into the given fork.
 *
 * Intended for the empty indexes of unlogged tables (the ambuildempty path), so callers
 * are expected to pass forkNum = INIT_FORKNUM. The first USEARCH_HEADER_SIZE bytes of
 * `data` are copied into the header page; `opts` supplies the index parameters that are
 * recorded in the header.
 */
void StoreExternalEmptyIndex(Relation index, ForkNumber forkNum, char *data, usearch_init_options_t *opts)
{
    // descriptor stored in every blockmap group slot of an empty index
    const HnswBlockMapGroupDesc unused_group = {
        .first_block = InvalidBlockNumber,
        .blockmaps_initialized = 0,
    };
    Buffer               hdr_buf;
    Page                 hdr_page;
    HnswIndexHeaderPage *hdr;

    hdr_buf = ReadBufferExtended(index, forkNum, P_NEW, RBM_NORMAL, NULL);

    // the header is the first page ever created in this fork, so it must be block 0
    assert(BufferGetBlockNumber(hdr_buf) == 0);

    LockBuffer(hdr_buf, BUFFER_LOCK_EXCLUSIVE);

    START_CRIT_SECTION();

    hdr_page = BufferGetPage(hdr_buf);
    PageInit(hdr_page, BufferGetPageSize(hdr_buf), 0);
    hdr = (HnswIndexHeaderPage *)PageGetContents(hdr_page);

    // identification of the on-disk format
    hdr->magicNumber = LDB_WAL_MAGIC_NUMBER;
    hdr->version = LDB_WAL_VERSION_NUMBER;

    // index parameters taken from the usearch options
    hdr->vector_dim = opts->dimensions;
    hdr->m = opts->connectivity;
    hdr->ef_construction = opts->expansion_add;
    hdr->ef = opts->expansion_search;
    hdr->metric_kind = opts->metric_kind;

    // an empty index holds no vectors, no blockmap groups and no data blocks
    hdr->num_vectors = 0;
    hdr->blockmap_groups_nr = 0;
    for(uint32 group = 0; group < lengthof(hdr->blockmap_groups); ++group) {
        hdr->blockmap_groups[ group ] = unused_group;
    }
    hdr->last_data_block = InvalidBlockNumber;

    memcpy(hdr->usearch_header, data, USEARCH_HEADER_SIZE);

    // pd_lower is moved to just past the header struct
    ((PageHeader)hdr_page)->pd_lower = ((char *)hdr + sizeof(HnswIndexHeaderPage)) - (char *)hdr_page;

    MarkBufferDirty(hdr_buf);

    // Emit a WAL record carrying a full image of the page. Per the original author's notes:
    // although the unlogged table itself does not use WAL, this appears to get the page to
    // disk immediately instead of only at the first checkpoint. The init fork is what postgres
    // uses to reset the unlogged index after a crash, so if a crash happened before the first
    // checkpoint without this call, the init fork could not be read back and recovery would
    // leave a corrupted index. nbtree's ambuildempty implementation is cited as doing the same.
    // NOTE: this call MUST stay inside the critical section; otherwise an assertion inside
    // log_newpage_buffer fails and crashes the db.
    log_newpage_buffer(hdr_buf, false);

    END_CRIT_SECTION();

    UnlockReleaseBuffer(hdr_buf);
}

void StoreExternalIndex(Relation index,
usearch_metadata_t *external_index_metadata,
ForkNumber forkNum,
Expand Down
1 change: 1 addition & 0 deletions src/hnsw/external_index.h
Original file line number Diff line number Diff line change
Expand Up @@ -122,6 +122,7 @@ typedef struct
} HnswInsertState;

uint32 UsearchNodeBytes(usearch_metadata_t *metadata, int vector_bytes, int level);
void StoreExternalEmptyIndex(Relation index, ForkNumber forkNum, char *data, usearch_init_options_t *opts);
void StoreExternalIndex(Relation index,
usearch_metadata_t *external_index_metadata,
ForkNumber forkNum,
Expand Down
19 changes: 19 additions & 0 deletions src/hnsw/extra_dirtied.c
Original file line number Diff line number Diff line change
Expand Up @@ -84,6 +84,25 @@ void extra_dirtied_release_all(ExtraDirtiedBufs *ed)
ed->extra_dirtied_size = 0;
}

// Variant of extra_dirtied_release_all that does not check the result of
// GenericXLogFinish against InvalidXLogRecPtr.
// Used for inserts on unlogged tables, which do not write to WAL.
void extra_dirtied_release_all_no_xlog_check(ExtraDirtiedBufs *ed)
{
    int state_idx = 0;
    int buf_idx = 0;

    // finish every pending generic xlog state; the returned record pointer is ignored
    while(state_idx < ed->extra_dirtied_state_size) {
        GenericXLogFinish(ed->extra_dirtied_state[ state_idx ]);
        state_idx++;
    }

    while(buf_idx < ed->extra_dirtied_size) {
        assert(BufferIsValid(ed->extra_dirtied_buf[ buf_idx ]));
        // The header is not considered extra, so it must never show up here.
        // Sanity check that callees that manipulate extra_dirtied did not violate this.
        assert(ed->extra_dirtied_blockno[ buf_idx ] != 0);
        // MarkBufferDirty() has already been called by GenericXLogFinish()
        UnlockReleaseBuffer(ed->extra_dirtied_buf[ buf_idx ]);
        buf_idx++;
    }

    ed->extra_dirtied_size = 0;
}

void extra_dirtied_free(ExtraDirtiedBufs *ed)
{
if(ed->extra_dirtied_size != 0) {
Expand Down
1 change: 1 addition & 0 deletions src/hnsw/extra_dirtied.h
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@ void extra_dirtied_add_wal_read_buffer(
ExtraDirtiedBufs* ed, Relation index, ForkNumber forkNum, BlockNumber blockno, Buffer* buf, Page* page);
Page extra_dirtied_get(ExtraDirtiedBufs* ed, BlockNumber blockno, Buffer* out_buf);
void extra_dirtied_release_all(ExtraDirtiedBufs* ed);
void extra_dirtied_release_all_no_xlog_check(ExtraDirtiedBufs* ed);
void extra_dirtied_free(ExtraDirtiedBufs* ed);

#endif // LDB_HNSW_EXTRA_DIRTIED_H
14 changes: 11 additions & 3 deletions src/hnsw/insert.c
Original file line number Diff line number Diff line change
Expand Up @@ -72,6 +72,7 @@ bool ldb_aminsert(Relation index,
HnswIndexTuple *new_tuple;
usearch_init_options_t opts = {0};
LDB_UNUSED(heap);
LDB_UNUSED(indexInfo);
#if PG_VERSION_NUM >= 140000
LDB_UNUSED(indexUnchanged);
#endif
Expand Down Expand Up @@ -109,7 +110,7 @@ bool ldb_aminsert(Relation index,
hdr = (HnswIndexHeaderPage *)PageGetContents(hdr_page);
assert(hdr->magicNumber == LDB_WAL_MAGIC_NUMBER);

opts.dimensions = GetHnswIndexDimensions(index, indexInfo);
opts.dimensions = hdr->vector_dim;
CheckHnswIndexDimensions(index, values[ 0 ], opts.dimensions);
PopulateUsearchOpts(index, &opts);
opts.retriever_ctx = ldb_wal_retriever_area_init(index, hdr);
Expand Down Expand Up @@ -182,16 +183,23 @@ bool ldb_aminsert(Relation index,

ldb_wal_retriever_area_reset(insertstate->retriever_ctx, hdr);

int needs_wal = RelationNeedsWAL(index);
// we only release the header buffer AFTER inserting is finished to make sure nobody else changes the block
// structure. todo:: critical section here can definitely be shortened
{
// GenericXLogFinish also calls MarkBufferDirty(buf)
XLogRecPtr ptr = GenericXLogFinish(state);
assert(ptr != InvalidXLogRecPtr);
if(needs_wal) {
assert(ptr != InvalidXLogRecPtr);
}
LDB_UNUSED(ptr);
}

extra_dirtied_release_all(insertstate->retriever_ctx->extra_dirted);
if(needs_wal) {
extra_dirtied_release_all(insertstate->retriever_ctx->extra_dirted);
} else {
extra_dirtied_release_all_no_xlog_check(insertstate->retriever_ctx->extra_dirted);
}

usearch_free(insertstate->uidx, &error);
if(error != NULL) {
Expand Down
Loading

0 comments on commit 670c318

Please sign in to comment.