Skip to content

Commit

Permalink
Implemented the background thread for periodic syncing
Browse files Browse the repository at this point in the history
Summary:
Starting a new thread which is used to periodically sync writes.

Changed sysvars to directly mimic WriteOptions. Introduced a separate global sysvar to control the background sync thread.

This diff is for this issue: MariaDB/webscalesql-5.6#15

Test Plan:
Ran the test. The number of syncs doesn't go up right away in the background mode.
It goes up only after a one-second sleep.
Checked in gdb that correct write_options are set for truncate table and drop table

Reviewers: jonahcohen, sergey.petrunya.9, MarkCallaghan

Reviewed By: MarkCallaghan

Differential Revision: https://reviews.facebook.net/D31221
  • Loading branch information
maykov authored and Gunnar Kudrjavets committed Jul 21, 2017
1 parent b97d77f commit abe784b
Showing 1 changed file with 130 additions and 65 deletions.
195 changes: 130 additions & 65 deletions storage/rocksdb/ha_rocksdb.cc
Original file line number Diff line number Diff line change
Expand Up @@ -49,6 +49,7 @@ static handler *rocksdb_create_handler(handlerton *hton,
MEM_ROOT *mem_root);
void key_copy(uchar *to_key, uchar *from_record, KEY *key_info,
uint key_length);
void* background_thread(void*);

///////////////////////////////////////////////////////////
// Parameters and settings
Expand Down Expand Up @@ -83,6 +84,10 @@ static HASH rocksdb_open_tables;

/* The mutex used to init the hash; variable for example share methods */
mysql_mutex_t rocksdb_mutex;
/*
  Locked by background_thread() from its first statement to its last;
  rocksdb_done_func() locks and immediately unlocks it as a way to wait until
  that thread has finished.
*/
mysql_mutex_t background_mutex;
/*
  Mutex paired with stop_cond. rocksdb_done_func() holds it while setting
  stop_background_thread; background_thread() holds it whenever it is not
  blocked in mysql_cond_timedwait().
*/
mysql_mutex_t stop_cond_mutex;
/*
  Signalled by rocksdb_done_func() to wake background_thread() up before its
  one-second wait expires.
*/
mysql_cond_t stop_cond;
/*
  Shutdown request for background_thread(): reset to false in
  rocksdb_init_func() before the thread is created, set to true in
  rocksdb_done_func().
*/
bool stop_background_thread;


//////////////////////////////////////////////////////////////////////////////
Expand Down Expand Up @@ -118,8 +123,7 @@ static long long rocksdb_block_cache_size;
static uint64_t rocksdb_info_log_level;
static char * rocksdb_wal_dir;
static uint64_t rocksdb_index_type;
static uint64_t rocksdb_write_sync;
static rocksdb::WriteOptions rocksdb_write_options;
static char rocksdb_background_sync;

static rocksdb::DBOptions init_db_options() {
rocksdb::DBOptions o;
Expand Down Expand Up @@ -159,27 +163,6 @@ static TYPELIB index_type_typelib = {
nullptr
};

enum write_sync_options {
WRITE_SYNC_OFF,
WRITE_SYNC_ON_COMMIT,
WRITE_SYNC_BACKGROUND
};

static const char* write_sync_names[] = {
"off",
"on_commit",
"background",
NullS
};

static TYPELIB write_sync_typelib = {
array_elements(write_sync_names) - 1,
"write_sync_typelib",
write_sync_names,
nullptr
};


//TODO: 0 means don't wait at all, and we don't support it yet?
static MYSQL_THDVAR_ULONG(lock_wait_timeout, PLUGIN_VAR_RQCMDARG,
"Number of seconds to wait for lock",
Expand Down Expand Up @@ -478,31 +461,32 @@ static MYSQL_SYSVAR_STR(cf_options_file, rocksdb_cf_options_file,
rocksdb_cf_options_file_validate,
rocksdb_cf_options_file_update, "");

static MYSQL_SYSVAR_ENUM(write_sync,
rocksdb_write_sync,
static MYSQL_SYSVAR_BOOL(background_sync,
rocksdb_background_sync,
PLUGIN_VAR_RQCMDARG,
"WriteOptions::write_sync for RocksDB",
NULL, NULL, WRITE_SYNC_OFF, &write_sync_typelib);
"turns on background syncs for RocksDB",
NULL, NULL, FALSE);

static MYSQL_SYSVAR_BOOL(write_disable_wal,
*reinterpret_cast<my_bool*>(&rocksdb_write_options.disableWAL),
static MYSQL_THDVAR_BOOL(write_sync,
PLUGIN_VAR_RQCMDARG,
"WriteOptions::sync for RocksDB",
NULL, NULL, rocksdb::WriteOptions().sync);

static MYSQL_THDVAR_BOOL(write_disable_wal,
PLUGIN_VAR_RQCMDARG,
"WriteOptions::disableWAL for RocksDB",
NULL, NULL, rocksdb_write_options.disableWAL);
NULL, NULL, rocksdb::WriteOptions().disableWAL);

static MYSQL_SYSVAR_ULONG(write_timeout_hint_us,
rocksdb_write_options.timeout_hint_us,
static MYSQL_THDVAR_ULONG(write_timeout_hint_us,
PLUGIN_VAR_RQCMDARG,
"WriteOptions::timeout_hint_us for RocksDB",
NULL, NULL, rocksdb_write_options.timeout_hint_us,
NULL, NULL, rocksdb::WriteOptions().timeout_hint_us,
/* min */ 0L, /* max */ LONG_MAX, 0);

static MYSQL_SYSVAR_BOOL(write_ignore_missing_column_families,
*reinterpret_cast<my_bool*>(
&rocksdb_write_options.ignore_missing_column_families),
static MYSQL_THDVAR_BOOL(write_ignore_missing_column_families,
PLUGIN_VAR_RQCMDARG,
"WriteOptions::ignore_missing_column_families for RocksDB",
NULL, NULL, rocksdb_write_options.ignore_missing_column_families);
NULL, NULL, rocksdb::WriteOptions().ignore_missing_column_families);

const longlong ROCKSDB_WRITE_BUFFER_SIZE_DEFAULT=4194304;

Expand Down Expand Up @@ -559,6 +543,8 @@ static struct st_mysql_sys_var* rocksdb_system_variables[]= {
MYSQL_SYSVAR(default_cf_options),
MYSQL_SYSVAR(cf_options_file),

MYSQL_SYSVAR(background_sync),

MYSQL_SYSVAR(write_sync),
MYSQL_SYSVAR(write_disable_wal),
MYSQL_SYSVAR(write_timeout_hint_us),
Expand All @@ -567,20 +553,15 @@ static struct st_mysql_sys_var* rocksdb_system_variables[]= {
NULL
};

static rocksdb::WriteOptions get_write_options() {
rocksdb::WriteOptions opt(rocksdb_write_options);
switch (rocksdb_write_sync) {
case WRITE_SYNC_OFF:
opt.sync = false;
break;
case WRITE_SYNC_ON_COMMIT:
opt.sync = true;
break;
case WRITE_SYNC_BACKGROUND:
// this option is not implemented yet
DBUG_ASSERT(0);
break;
}
/*
  Build the rocksdb::WriteOptions to use for a write made on behalf of a
  session.

  Starts from a default-constructed WriteOptions and overrides sync,
  disableWAL, timeout_hint_us and ignore_missing_column_families with the
  values of the corresponding per-session write_* variables.

  @param thd  Session whose variables are read through THDVAR()

  @return The assembled options, by value
*/
rocksdb::WriteOptions get_write_options(THD* thd)
{
  rocksdb::WriteOptions write_opts;

  write_opts.ignore_missing_column_families=
    THDVAR(thd, write_ignore_missing_column_families);
  write_opts.timeout_hint_us= THDVAR(thd, write_timeout_hint_us);
  write_opts.disableWAL= THDVAR(thd, write_disable_wal);
  write_opts.sync= THDVAR(thd, write_sync);

  return write_opts;
}

Expand All @@ -605,18 +586,35 @@ static uchar* rocksdb_get_key(ROCKSDB_SHARE *share, size_t *length,
PSI_stage_info stage_waiting_on_row_lock= { 0, "Waiting for row lock", 0};

#ifdef HAVE_PSI_INTERFACE
static PSI_thread_key key_thread_background;

static PSI_stage_info *all_rocksdb_stages[]=
{
& stage_waiting_on_row_lock
};


static PSI_mutex_key ex_key_mutex_example, ex_key_mutex_ROCKSDB_SHARE_mutex;
static PSI_mutex_key ex_key_mutex_example, ex_key_mutex_ROCKSDB_SHARE_mutex,
key_mutex_background, key_mutex_stop_background;

static PSI_mutex_info all_rocksdb_mutexes[]=
{
{ &ex_key_mutex_example, "rocksdb", PSI_FLAG_GLOBAL},
{ &ex_key_mutex_ROCKSDB_SHARE_mutex, "ROCKSDB_SHARE::mutex", 0}
{ &ex_key_mutex_ROCKSDB_SHARE_mutex, "ROCKSDB_SHARE::mutex", 0},
{ &key_mutex_background, "background", PSI_FLAG_GLOBAL},
{ &key_mutex_stop_background, "stop background", PSI_FLAG_GLOBAL},
};

/* Instrumentation key passed to mysql_cond_init() for stop_cond */
PSI_cond_key key_cond_stop;

/*
  Condition variables of this plugin; init_rocksdb_psi_keys() hands this
  array to PSI_server->register_cond().
*/
static PSI_cond_info all_rocksdb_conds[]=
{
{ &key_cond_stop, "cond_stop", PSI_FLAG_GLOBAL}
};

/*
  Threads created by this plugin. The only entry describes the background
  sync thread, whose key (key_thread_background) is passed to
  mysql_thread_create() in rocksdb_init_func().

  NOTE(review): the visible code in init_rocksdb_psi_keys() calls
  mysql_thread_register() with all_server_threads rather than with this
  array, which leaves this array unreferenced there -- confirm whether
  all_rocksdb_threads was intended.
*/
static PSI_thread_info all_rocksdb_threads[]=
{
{ &key_thread_background, "background", PSI_FLAG_GLOBAL},
};

static void init_rocksdb_psi_keys()
Expand All @@ -630,12 +628,17 @@ static void init_rocksdb_psi_keys()
count= array_elements(all_rocksdb_mutexes);
PSI_server->register_mutex(category, all_rocksdb_mutexes, count);

count= array_elements(all_rocksdb_conds);
PSI_server->register_cond(category, all_rocksdb_conds, count);

count= array_elements(all_rocksdb_stages);
mysql_stage_register(category, all_rocksdb_stages, count);

count= array_elements(all_server_threads);
mysql_thread_register(category, all_server_threads, count);
}
#endif


static int bytewise_compare(const rocksdb::Slice& a, const rocksdb::Slice& b)
{
size_t a_size= a.size();
Expand Down Expand Up @@ -785,18 +788,18 @@ class Rdb_transaction
trx_locks.clear();
}

bool commit()
/*
  Commit the transaction: write out the accumulated changes, then release
  everything the transaction holds.

  @param thd  Session handle, forwarded to flush_batch()

  @return false on success, true if flushing the batch failed. Resources are
          released in both cases.
*/
bool commit(THD* thd)
{
  /*
    flush_batch() reports a failed write with a non-zero result. Keep it so
    the caller can see the failure instead of an unconditional "success".
  */
  const bool failed= (flush_batch(thd) != 0);
  /* rollback() will delete snapshot, batch and locks */
  rollback();
  return failed;
}

private:

int flush_batch_intern()
int flush_batch_intern(THD* thd)
{
bool res= false;
rocksdb::WriteBatch batch;
Expand All @@ -820,15 +823,15 @@ class Rdb_transaction
}
}
}
rocksdb::Status s= rdb->Write(get_write_options(), &batch);
rocksdb::Status s= rdb->Write(get_write_options(thd), &batch);
res= !s.ok(); // we return true when something failed
return res;
}

public:
int flush_batch()
int flush_batch(THD* thd)
{
bool bres= flush_batch_intern();
bool bres= flush_batch_intern(thd);
changes.reinit();
n_writes= 0;
return bres;
Expand Down Expand Up @@ -923,7 +926,7 @@ static int rocksdb_commit(handlerton* hton, THD* thd, bool commit_trx, bool)
{
Rdb_transaction*& trx= get_trx_from_thd(thd);
if (trx)
trx->commit(); // todo: return error code.
trx->commit(thd); // todo: return error code.
}
else
{
Expand Down Expand Up @@ -1020,6 +1023,10 @@ static int rocksdb_init_func(void *p)

rocksdb_hton= (handlerton *)p;
mysql_mutex_init(ex_key_mutex_example, &rocksdb_mutex, MY_MUTEX_INIT_FAST);
mysql_mutex_init(key_mutex_background, &background_mutex, MY_MUTEX_INIT_FAST);
mysql_mutex_init(key_mutex_stop_background, &stop_cond_mutex,
MY_MUTEX_INIT_FAST);
mysql_cond_init(key_cond_stop, &stop_cond, NULL);
(void) my_hash_init(&rocksdb_open_tables,system_charset_info,32,0,0,
(my_hash_get_key) rocksdb_get_key,0,0);

Expand Down Expand Up @@ -1143,6 +1150,19 @@ static int rocksdb_init_func(void *p)
if (ddl_manager.init(rdb))
DBUG_RETURN(1);

stop_background_thread = false;
pthread_t thread_handle;
auto err = mysql_thread_create(
key_thread_background, &thread_handle,
nullptr,
background_thread, NULL
);
if (err != 0) {
sql_print_error("RocksDB: Couldn't start the background thread: (errno=%d)",
err);
DBUG_RETURN(1);
}

sql_print_information("RocksDB instance opened");
DBUG_RETURN(0);
}
Expand All @@ -1151,6 +1171,17 @@ static int rocksdb_done_func(void *p)
{
int error= 0;
DBUG_ENTER("rocksdb_done_func");

// signal the background thread to stop
mysql_mutex_lock(&stop_cond_mutex);
stop_background_thread = true;
mysql_cond_signal(&stop_cond);
mysql_mutex_unlock(&stop_cond_mutex);

// wait for the background thread to finish
mysql_mutex_lock(&background_mutex);
mysql_mutex_unlock(&background_mutex);

if (rocksdb_open_tables.records)
error= 1;
my_hash_free(&rocksdb_open_tables);
Expand Down Expand Up @@ -1299,7 +1330,9 @@ static handler* rocksdb_create_handler(handlerton *hton,


ha_rocksdb::ha_rocksdb(handlerton *hton, TABLE_SHARE *table_arg)
: handler(hton, table_arg), scan_it(NULL), pk_descr(NULL),
: handler(hton, table_arg), scan_it(NULL),
tbl_def(NULL),
pk_descr(NULL),
key_descr(NULL),
pk_can_be_decoded(false),
pk_tuple(NULL), pk_packed_tuple(NULL),
Expand Down Expand Up @@ -2861,7 +2894,7 @@ int ha_rocksdb::write_row(uchar *buf)
if (!do_locking && trx->n_writes > THDVAR(table->in_use, bulk_load_size))
{
/* Bulk insert mode: flush the write batch if it became too big */
if (trx->flush_batch())
if (trx->flush_batch(table->in_use))
DBUG_RETURN(HA_ERR_INTERNAL_ERROR);
}

Expand Down Expand Up @@ -3334,7 +3367,7 @@ int ha_rocksdb::external_lock(THD *thd, int lock_type)
will be no-ops.
*/
Rdb_transaction* trx= get_or_create_trx(thd);
res= trx->commit();
res= trx->commit(thd);
delete scan_it;
scan_it= NULL;
}
Expand Down Expand Up @@ -3422,6 +3455,8 @@ int ha_rocksdb::delete_table(const char *tablename)

void ha_rocksdb::remove_rows(RDBSE_TABLE_DEF *tbl)
{
rocksdb::WriteOptions wo = get_write_options(ha_thd());

rocksdb::ReadOptions opts;
opts.total_order_seek= true;
rocksdb::Iterator* it= rdb->NewIterator(opts);
Expand All @@ -3444,7 +3479,7 @@ void ha_rocksdb::remove_rows(RDBSE_TABLE_DEF *tbl)
rocksdb::Slice key= it->key();
if (!tbl->key_descr[i]->covers_key(key.data(), key.size()))
break;
rdb->Delete(get_write_options(), key);
rdb->Delete(wo, key);
it->Next();
}
}
Expand Down Expand Up @@ -3797,3 +3832,33 @@ ulong Primary_key_comparator::get_hashnr(const char *key, size_t key_len)
&nr, &nr2);
return((ulong) nr);
}

/*
  Entry point of the background sync thread.

  The thread wakes up about once per second, or earlier when stop_cond is
  signalled. On every wakeup that is not a shutdown request, and only if the
  database handle is set and rocksdb_background_sync is on, it issues a write
  of an empty WriteBatch with WriteOptions::sync= true. Presumably this makes
  RocksDB sync data written earlier without sync -- TODO confirm against the
  RocksDB documentation.

  Locking:
    - background_mutex is held for the whole lifetime of the thread;
      rocksdb_done_func() locks it to wait for the thread to finish.
    - stop_cond_mutex guards stop_background_thread and is held whenever the
      thread is not blocked inside mysql_cond_timedwait().

  @return always nullptr
*/
void* background_thread(void*)
{
  mysql_mutex_lock(&background_mutex);
  mysql_mutex_lock(&stop_cond_mutex);

  /* Never filled in; it is only the payload of the periodic sync write */
  rocksdb::WriteBatch wb;

  /*
    Testing the flag before the first wait covers a stop request that was
    made before this thread got hold of stop_cond_mutex.
  */
  while (!stop_background_thread) {
    /* Absolute deadline: one second from now */
    timespec ts;
    clock_gettime(CLOCK_REALTIME, &ts);
    ts.tv_sec++;

    /*
      The return code is deliberately not used to decide whether to stop.
      The wait may report a timeout even though the flag was set while this
      thread was re-acquiring stop_cond_mutex. Making the exit depend on a
      non-timeout return would then miss the only signal and the thread would
      never terminate, blocking rocksdb_done_func() on background_mutex.
    */
    mysql_cond_timedwait(&stop_cond, &stop_cond_mutex, &ts);
    if (stop_background_thread)
      break;

    if (rdb && rocksdb_background_sync) {
      auto wo = rocksdb::WriteOptions();
      wo.sync = true;
      rdb->Write(wo, &wb);
    }
  }

  mysql_mutex_unlock(&stop_cond_mutex);
  mysql_mutex_unlock(&background_mutex);

  return nullptr;
}

0 comments on commit abe784b

Please sign in to comment.