Skip to content

Commit

Permalink
Merge pull request #570 from MathieuBordere/revert-interrupt
Browse files Browse the repository at this point in the history
Revert "Merge pull request #560 from MathieuBordere/interrupt"
  • Loading branch information
cole-miller authored Feb 9, 2024
2 parents 4ca4dae + ef7741e commit 5e3128f
Show file tree
Hide file tree
Showing 8 changed files with 43 additions and 127 deletions.
27 changes: 5 additions & 22 deletions src/conn.c
Original file line number Diff line number Diff line change
Expand Up @@ -5,8 +5,6 @@
#include "tracing.h"
#include "transport.h"

#include <stdlib.h>

/* Initialize the given buffer for reading, ensure it has the given size. */
static int init_read(struct conn *c, uv_buf_t *buf, size_t size)
{
Expand All @@ -20,46 +18,34 @@ static int init_read(struct conn *c, uv_buf_t *buf, size_t size)
}

static int read_message(struct conn *c);
static void conn_write_cb(uv_write_t *req, int status)
static void conn_write_cb(struct transport *transport, int status)
{
struct transport *t = req->data;
assert(t != NULL);
struct conn *c = t->data;
assert(c != NULL);
struct conn *c = transport->data;
bool finished;
int rv;
if (status != 0) {
tracef("write cb status %d", status);
goto abort;
}
if (c->closed) {
tracef("connection closing");
goto abort;
}

buffer__reset(&c->write);
buffer__advance(&c->write, message__sizeof(&c->response)); /* Header */

rv = gateway__resume(&c->gateway, &finished);
tracef("request finished: %d", finished);
if (rv != 0) {
goto abort;
}

/* Start reading the next message if we're not doing that already. */
if (c->reading_message) {
free(req);
if (!finished) {
return;
}

/* Start reading the next request */
rv = read_message(c);
if (rv != 0) {
goto abort;
}

free(req);
return;
abort:
free(req);
conn__stop(c);
}

Expand Down Expand Up @@ -205,7 +191,6 @@ static void read_message_cb(struct transport *transport, int status)
struct cursor cursor;
int rv;

c->reading_message = false;
if (status != 0) {
// errorf(c->logger, "read error");
tracef("read error %d", status);
Expand Down Expand Up @@ -237,7 +222,6 @@ static int read_message(struct conn *c)
tracef("init read failed %d", rv);
return rv;
}
c->reading_message = true;
rv = transport__read(&c->transport, &buf, read_message_cb);
if (rv != 0) {
tracef("transport read failed %d", rv);
Expand Down Expand Up @@ -336,7 +320,6 @@ int conn__start(struct conn *c,
}
c->handle.data = c;
c->closed = false;
c->reading_message = false;
/* First, we expect the client to send us the protocol version. */
rv = read_protocol(c);
if (rv != 0) {
Expand Down
1 change: 0 additions & 1 deletion src/conn.h
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,6 @@ struct conn
struct message response; /* Response message meta data */
struct handle handle;
bool closed;
bool reading_message; /* Conn is waiting for a message */
queue queue;
};

Expand Down
14 changes: 10 additions & 4 deletions src/gateway.c
Original file line number Diff line number Diff line change
Expand Up @@ -1394,8 +1394,11 @@ int gateway__handle(struct gateway *g,
goto handle;
}

/* Request in progress, the only time we allow interleaving requests is
* when the second request tries to interrupt a query yielding rows. */
/* Request in progress. TODO The current implementation doesn't allow
* reading a new request while a query is yielding rows, in that case
* gateway__resume in write_cb will indicate it has not finished
* returning results and a new request (in this case, the interrupt)
* will not be read. */
if (g->req->type == DQLITE_REQUEST_QUERY &&
type == DQLITE_REQUEST_INTERRUPT) {
goto handle;
Expand All @@ -1407,8 +1410,9 @@ int gateway__handle(struct gateway *g,
}

/* Receiving a request when one is ongoing on the same connection
* is an error, unless it's an interrupt request. The connection will be
* stopped due to the non-0 return value. */
* is a hard error. The connection will be stopped due to the non-0
* return code in case asserts are off. */
assert(false);
return SQLITE_BUSY;

handle:
Expand Down Expand Up @@ -1441,9 +1445,11 @@ int gateway__resume(struct gateway *g, bool *finished)
{
if (g->req == NULL || (g->req->type != DQLITE_REQUEST_QUERY &&
g->req->type != DQLITE_REQUEST_QUERY_SQL)) {
tracef("gateway resume - finished");
*finished = true;
return 0;
}
tracef("gateway resume - not finished");
*finished = false;
query_batch(g);
return 0;
Expand Down
26 changes: 17 additions & 9 deletions src/lib/transport.c
Original file line number Diff line number Diff line change
@@ -1,7 +1,5 @@
#include <raft.h>

#include <stdlib.h>

#include "../../include/dqlite.h"

#include "assert.h"
Expand Down Expand Up @@ -124,7 +122,9 @@ int transport__init(struct transport *t, struct uv_stream_s *stream)
t->stream->data = t;
t->read.base = NULL;
t->read.len = 0;
t->write.data = t;
t->read_cb = NULL;
t->write_cb = NULL;
t->close_cb = NULL;

return 0;
Expand Down Expand Up @@ -161,15 +161,23 @@ int transport__read(struct transport *t, uv_buf_t *buf, transport_read_cb cb)
return 0;
}

int transport__write(struct transport *t, uv_buf_t *buf, uv_write_cb cb)
static void write_cb(uv_write_t *req, int status)
{
struct transport *t = req->data;
transport_write_cb cb = t->write_cb;

assert(cb != NULL);
t->write_cb = NULL;

cb(t, status);
}

int transport__write(struct transport *t, uv_buf_t *buf, transport_write_cb cb)
{
int rv;
uv_write_t *req = malloc(sizeof(*req));
if (req == NULL) {
return DQLITE_NOMEM;
}
req->data = t;
rv = uv_write(req, t->stream, buf, 1, cb);
assert(t->write_cb == NULL);
t->write_cb = cb;
rv = uv_write(&t->write, t->stream, buf, 1, write_cb);
if (rv != 0) {
return rv;
}
Expand Down
7 changes: 4 additions & 3 deletions src/lib/transport.h
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@
*/
struct transport;
typedef void (*transport_read_cb)(struct transport *t, int status);
typedef void (*transport_write_cb)(struct transport *t, int status);
typedef void (*transport_close_cb)(struct transport *t);

/**
Expand All @@ -27,6 +28,7 @@ struct transport
uv_buf_t read; /* Read buffer */
uv_write_t write; /* Write request */
transport_read_cb read_cb; /* Read callback */
transport_write_cb write_cb; /* Write callback */
transport_close_cb close_cb; /* Close callback */
};

Expand All @@ -47,10 +49,9 @@ void transport__close(struct transport *t, transport_close_cb cb);
int transport__read(struct transport *t, uv_buf_t *buf, transport_read_cb cb);

/**
* Write the given buffer to the transport. The @cb gains ownership of
* uv_write_t and must `free` it at its own convenience.
* Write the given buffer to the transport.
*/
int transport__write(struct transport *t, uv_buf_t *buf, uv_write_cb cb);
int transport__write(struct transport *t, uv_buf_t *buf, transport_write_cb cb);

/* Create an UV stream object from the given fd. */
int transport__stream(struct uv_loop_s *loop,
Expand Down
72 changes: 0 additions & 72 deletions test/integration/test_client.c
Original file line number Diff line number Diff line change
Expand Up @@ -123,46 +123,6 @@ TEST(client, query, setUp, tearDown, 0, client_params)
return MUNIT_OK;
}

TEST(client, queryReuseStmtIdAferInterrupt, setUp, tearDown, 0, client_params)
{
struct fixture *f = data;
uint32_t stmt_id;
uint64_t last_insert_id;
uint64_t rows_affected;
unsigned i;
struct rows rows;
(void)params;
PREPARE("CREATE TABLE test (n INT)", &stmt_id);
EXEC(stmt_id, &last_insert_id, &rows_affected);

PREPARE("BEGIN", &stmt_id);
EXEC(stmt_id, &last_insert_id, &rows_affected);

PREPARE("INSERT INTO test (n) VALUES(123)", &stmt_id);
for (i = 0; i < 4098; i++) {
EXEC(stmt_id, &last_insert_id, &rows_affected);
}

PREPARE("COMMIT", &stmt_id);
EXEC(stmt_id, &last_insert_id, &rows_affected);

/* More than 1 response buffer will be needed to return all the rows, so
* we are able to interrupt the query. */
PREPARE("SELECT * FROM test", &stmt_id);
bool done = true;
QUERY_DONE(stmt_id, &rows, &done);
munit_assert_false(done);

clientSendInterrupt(f->client, NULL);
clientCloseRows(&rows);

/* Ensure stmt_id is still valid after interrupt. */
QUERY(stmt_id, &rows);

clientCloseRows(&rows);
return MUNIT_OK;
}

TEST(client, querySql, setUp, tearDown, 0, client_params)
{
struct fixture *f = data;
Expand All @@ -189,35 +149,3 @@ TEST(client, querySql, setUp, tearDown, 0, client_params)

return MUNIT_OK;
}

TEST(client, querySqlInterrupt, setUp, tearDown, 0, client_params)
{
struct fixture *f = data;
uint32_t stmt_id;
uint64_t last_insert_id;
uint64_t rows_affected;
unsigned i;
struct rows rows;
bool done = true;
(void)params;
EXEC_SQL("CREATE TABLE test (n INT)", &last_insert_id, &rows_affected);

EXEC_SQL("BEGIN", &last_insert_id, &rows_affected);

PREPARE("INSERT INTO test (n) VALUES(123)", &stmt_id);
for (i = 0; i < 4098; i++) {
EXEC(stmt_id, &last_insert_id, &rows_affected);
}

EXEC_SQL("COMMIT", &last_insert_id, &rows_affected);

/* More than 1 response buffer will be needed to return all the rows, so
* we are able to interrupt the query. */
QUERY_SQL_DONE("SELECT * FROM test", &rows, &done);
munit_assert_false(done);

clientSendInterrupt(f->client, NULL);
clientCloseRows(&rows);

return MUNIT_OK;
}
16 changes: 5 additions & 11 deletions test/lib/client.h
Original file line number Diff line number Diff line change
Expand Up @@ -150,29 +150,23 @@
munit_assert_int(rv_, ==, 0); \
}

/* Perform a query, DONE is a pointer to a bool that will be true when the query
* is done. */
#define QUERY_DONE(STMT_ID, ROWS, DONE) \
/* Perform a query. */
#define QUERY(STMT_ID, ROWS) \
{ \
int rv_; \
rv_ = clientSendQuery(f->client, STMT_ID, NULL, 0, NULL); \
munit_assert_int(rv_, ==, 0); \
rv_ = clientRecvRows(f->client, ROWS, DONE, NULL); \
rv_ = clientRecvRows(f->client, ROWS, NULL, NULL); \
munit_assert_int(rv_, ==, 0); \
}

/* Perform a query. */
#define QUERY(STMT_ID, ROWS) QUERY_DONE(STMT_ID, ROWS, NULL)

#define QUERY_SQL_DONE(SQL, ROWS, DONE) \
#define QUERY_SQL(SQL, ROWS) \
{ \
int rv_; \
rv_ = clientSendQuerySQL(f->client, SQL, NULL, 0, NULL); \
munit_assert_int(rv_, ==, 0); \
rv_ = clientRecvRows(f->client, ROWS, DONE, NULL); \
rv_ = clientRecvRows(f->client, ROWS, NULL, NULL); \
munit_assert_int(rv_, ==, 0); \
}

#define QUERY_SQL(SQL, ROWS) QUERY_SQL_DONE(SQL, ROWS, NULL)

#endif /* TEST_CLIENT_H */
7 changes: 2 additions & 5 deletions test/unit/lib/test_transport.c
Original file line number Diff line number Diff line change
Expand Up @@ -39,14 +39,11 @@ static void read_cb(struct transport *transport, int status)
f->read.status = status;
}

static void write_cb(uv_write_t *req, int status)
static void write_cb(struct transport *transport, int status)
{
struct transport *t = req->data;
munit_assert_ptr_not_null(t);
struct fixture *f = t->data;
struct fixture *f = transport->data;
f->write.invoked = true;
f->write.status = status;
free(req);
}

static void *setup(const MunitParameter params[], void *user_data)
Expand Down

0 comments on commit 5e3128f

Please sign in to comment.