Skip to content

Commit

Permalink
Merge pull request #675 from marco6/fix-prepared-cleanup
Browse files Browse the repository at this point in the history
Cleanup prepared statement during node close
  • Loading branch information
cole-miller authored Jul 26, 2024
2 parents 4f3187c + e6ac700 commit 39ffc97
Show file tree
Hide file tree
Showing 2 changed files with 59 additions and 0 deletions.
9 changes: 9 additions & 0 deletions src/gateway.c
Original file line number Diff line number Diff line change
Expand Up @@ -76,6 +76,15 @@ void gateway__leader_close(struct gateway *g, int reason)
*/
sqlite3_finalize(g->req->stmt);
g->req = NULL;
} else if (g->req->type == DQLITE_REQUEST_QUERY) {
/* In case the statement is a prepared one, it
* will be finalized by the stmt__registry_close
* call below. Nevertheless, we must signal that
* the request is not in place anymore so that any
* callback which is already in the queue will not
* attempt to execute a finalized statement.
*/
g->req = NULL;
}
}
stmt__registry_close(&g->stmts);
Expand Down
50 changes: 50 additions & 0 deletions test/unit/test_gateway.c
Original file line number Diff line number Diff line change
Expand Up @@ -1631,6 +1631,56 @@ TEST_CASE(query, manyParams, NULL)
return MUNIT_OK;
}


/* Successfully query that yields a large number of rows that need to be split
* into several reponses. */
TEST_CASE(query, close_while_in_flight, NULL)
{
struct query_fixture *f = data;
unsigned i;
uint64_t stmt_id;
uint64_t n;
const char *column;
struct value value;
bool finished;
(void)params;
EXEC("BEGIN");

/* 16 = 8B header + 8B value (int) */
unsigned n_rows_buffer = max_rows_buffer(16);
/* Insert 1 less than 2 response buffers worth of rows, otherwise we
* need 3 responses, of which the last one contains no rows. */
for (i = 0; i < ((2 * n_rows_buffer) - 1); i++) {
EXEC("INSERT INTO test(n) VALUES(123)");
}
EXEC("COMMIT");

PREPARE("SELECT n FROM test");
f->request.db_id = 0;
f->request.stmt_id = stmt_id;
ENCODE(&f->request, query);
HANDLE(QUERY);
ASSERT_CALLBACK(0, ROWS);

uint64__decode(f->cursor, &n);
munit_assert_int(n, ==, 1);
text__decode(f->cursor, &column);
munit_assert_string_equal(column, "n");

/* First response contains max amount of rows */
for (i = 0; i < n_rows_buffer; i++) {
DECODE_ROW(1, &value);
munit_assert_int(value.type, ==, SQLITE_INTEGER);
munit_assert_int(value.integer, ==, 123);
}

/* Simulate a gateway close */
gateway__close(f->gateway);
gateway__resume(f->gateway, &finished);

return MUNIT_OK;
}

/******************************************************************************
*
* finalize
Expand Down

0 comments on commit 39ffc97

Please sign in to comment.