Skip to content

Commit

Permalink
WIP: start working on a way to send EXEC_SQL to the DB thread
Browse files Browse the repository at this point in the history
This has a deadlock bug that needs to be ironed out.

Signed-off-by: Cole Miller <[email protected]>
  • Loading branch information
cole-miller committed Oct 18, 2023
1 parent 779c991 commit f638fd7
Show file tree
Hide file tree
Showing 3 changed files with 70 additions and 15 deletions.
41 changes: 36 additions & 5 deletions src/revamp.c
Original file line number Diff line number Diff line change
Expand Up @@ -3,16 +3,47 @@
int dbContextInit(struct db_context *ctx, struct config *config)
{
int rv;
rv = sem_init(&ctx->sem, 0, 0);
if (rv != 0) {
return DQLITE_ERROR;
}
rv = pthread_mutex_init(&ctx->mutex, NULL);
assert(rv == 0);
rv = pthread_cond_init(&ctx->cond, NULL);
assert(rv == 0);
registry__init(&ctx->registry, config);
ctx->shutdown = false;
return 0;
}

int postExecSqlReq(struct db_context *ctx,
struct exec_sql_req req,
void *data,
void (*cb)(struct exec_sql_req, int, void *))
{
(void)ctx;
(void)req;
(void)data;
(void)cb;
return 0;
}

void dbContextClose(struct db_context *ctx)
{
registry__close(&ctx->registry);
sem_destroy(&ctx->sem);
pthread_cond_destroy(&ctx->cond);
pthread_mutex_destroy(&ctx->mutex);
}

void *dbTask(void *arg)
{
struct db_context *ctx = arg;
int rv;

rv = pthread_mutex_lock(&ctx->mutex);
assert(rv == 0);
for (;;) {
rv = pthread_cond_wait(&ctx->cond, &ctx->mutex);
assert(rv == 0);
if (ctx->shutdown) {
break;
}
}
return NULL;
}
26 changes: 24 additions & 2 deletions src/revamp.h
Original file line number Diff line number Diff line change
@@ -1,17 +1,39 @@
#ifndef DQLITE_REVAMP_H_
#define DQLITE_REVAMP_H_

#include "lib/queue.h"
#include "registry.h"
#include "tuple.h"

#include <semaphore.h>
#include <pthread.h>
#include <stdbool.h>

struct db_context
{
sem_t sem;
pthread_mutex_t mutex;
pthread_cond_t cond;
struct registry registry;
queue exec_sql_reqs;
bool shutdown;
};

struct exec_sql_req
{
char *db_name;
char *sql;
struct value *params;
queue queue;
};

int dbContextInit(struct db_context *ctx, struct config *config);

int postExecSqlReq(struct db_context *ctx,
struct exec_sql_req req,
void *data,
void (*cb)(struct exec_sql_req, int, void *));

void dbContextClose(struct db_context *ctx);

void *dbTask(void *arg);

#endif
18 changes: 10 additions & 8 deletions src/server.c
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@
#include "lib/fs.h"
#include "logger.h"
#include "protocol.h"
#include "revamp.h"
#include "roles.h"
#include "tracing.h"
#include "translate.h"
Expand Down Expand Up @@ -529,7 +530,10 @@ static void stopCb(uv_async_t *stop)
}
raft_close(&d->raft, raftCloseCb);

sem_post(&d->db_ctx->sem);
pthread_mutex_lock(&d->db_ctx->mutex);
d->db_ctx->shutdown = true;
pthread_cond_signal(&d->db_ctx->cond);
pthread_mutex_unlock(&d->db_ctx->mutex);
pthread_join(d->db_thread, NULL);
}

Expand Down Expand Up @@ -676,13 +680,6 @@ static void roleManagementTimerCb(uv_timer_t *handle)
RolesAdjust(d);
}

static void *dbTask(void *arg)
{
struct db_context *ctx = arg;
sem_wait(&ctx->sem);
return NULL;
}

static int taskRun(struct dqlite_node *d)
{
int rv;
Expand All @@ -697,6 +694,8 @@ static int taskRun(struct dqlite_node *d)
return rv;
}

rv = pthread_mutex_lock(&d->db_ctx->mutex);
assert(rv == 0);
rv = pthread_create(&d->db_thread, NULL, dbTask, d->db_ctx);
assert(rv == 0);

Expand Down Expand Up @@ -754,6 +753,9 @@ static int taskRun(struct dqlite_node *d)
return rv;
}

rv = pthread_mutex_unlock(&d->db_ctx->mutex);
assert(rv == 0);

rv = uv_run(&d->loop, UV_RUN_DEFAULT);
assert(rv == 0);

Expand Down

0 comments on commit f638fd7

Please sign in to comment.