Skip to content

Commit

Permalink
src/utils/postgres.h: define commit interval in metadata
Browse files Browse the repository at this point in the history
Previously we had the commit interval hardcoded to 2000 in postgres
producer. It is useful to make this configurable for cases where data
resiliency is more important than performance.

This commit introduces no functionality change as long as
commit_interval is not specified in schaufel configuration. In this
case, the default is unchanged and we commit once every 2000 messages.

Ref: #111
  • Loading branch information
alip committed Apr 5, 2023
1 parent d119875 commit 80e6d90
Show file tree
Hide file tree
Showing 2 changed files with 9 additions and 2 deletions.
10 changes: 8 additions & 2 deletions src/postgres.c
Original file line number Diff line number Diff line change
Expand Up @@ -317,7 +317,7 @@ postgres_producer_produce(Producer p, Message msg)
}

m->count = m->count + 1;
if (m->count == 2000)
if (m->count >= m->commit_interval)
{
commit(&m);
}
Expand Down Expand Up @@ -383,7 +383,7 @@ postgres_validate(config_setting_t *config)

Array master = NULL,replica = NULL;

int m, r, threads;
int m, r, threads, commit_interval;
bool ret = true;

// We need the parent list, because the postgres
Expand All @@ -394,6 +394,9 @@ postgres_validate(config_setting_t *config)
ret = false;
if(!CONF_L_IS_INT(config, "threads", &threads, "require a threads integer"))
ret = false;
if(!CONF_L_IS_INT(config, "commit_interval", &commit_interval,
"require a commit interval integer"))
commit_interval = 2000;
if(!ret) goto error;

master = parse_hostinfo_master((char*) hosts);
Expand Down Expand Up @@ -450,6 +453,9 @@ postgres_validate(config_setting_t *config)
setting = config_setting_add(instance, "threads", CONFIG_TYPE_INT);
config_setting_set_int(setting, threads);

setting = config_setting_add(instance, "commit_interval", CONFIG_TYPE_INT);
config_setting_set_int(setting, commit_interval);

if(r && (array_get(replica,i) != NULL)) {
setting = config_setting_add(instance, "replica", CONFIG_TYPE_STRING);
config_setting_set_string(setting, array_get(replica, i));
Expand Down
1 change: 1 addition & 0 deletions src/utils/postgres.h
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ typedef struct Meta {
int cpyfmt;
int count;
int copy;
int commit_interval;
int commit_iter;
pthread_mutex_t commit_mutex;
pthread_t commit_worker;
Expand Down

0 comments on commit 80e6d90

Please sign in to comment.