diff --git a/mysql-test/suite/rocksdb/write_sync.result b/mysql-test/suite/rocksdb/write_sync.result index a9ae0a8c1149..8de770756bf8 100644 --- a/mysql-test/suite/rocksdb/write_sync.result +++ b/mysql-test/suite/rocksdb/write_sync.result @@ -3,7 +3,7 @@ SET GLOBAL rocksdb_write_timeout_hint_us=555; SET GLOBAL rocksdb_write_ignore_missing_column_families=true; create table aaa (id int primary key, i int) engine rocksdb; select variable_value into @a from information_schema.global_status where variable_name='rocksdb_wal_synced'; -SET GLOBAL rocksdb_write_sync=off; +SET LOCAL rocksdb_write_sync=off; insert aaa(id, i) values(1,1); select variable_value into @b from information_schema.global_status where variable_name='rocksdb_wal_synced'; select @b-@a; @@ -22,7 +22,7 @@ select @b-@a; @b-@a 0 set @a=@b; -SET GLOBAL rocksdb_write_sync=on_commit; +SET LOCAL rocksdb_write_sync=1; insert aaa(id, i) values(4,1); select variable_value into @b from information_schema.global_status where variable_name='rocksdb_wal_synced'; select @b-@a; @@ -41,8 +41,37 @@ select @b-@a; @b-@a 1 set @a=@b; +SET GLOBAL rocksdb_background_sync=on; +SET LOCAL rocksdb_write_sync=off; +insert aaa(id, i) values(7,1); +select variable_value into @b from information_schema.global_status where variable_name='rocksdb_wal_synced'; +select @b-@a; +@b-@a +0 +set @a=@b; +insert aaa(id, i) values(8,1); +select variable_value into @b from information_schema.global_status where variable_name='rocksdb_wal_synced'; +select @b-@a; +@b-@a +0 +set @a=@b; +insert aaa(id, i) values(9,1); +select variable_value into @b from information_schema.global_status where variable_name='rocksdb_wal_synced'; +select @b-@a; +@b-@a +0 +set @a=@b; +select variable_value into @b from information_schema.global_status where variable_name='rocksdb_wal_synced'; +select @b-@a; +@b-@a +1 +set @a=@b; +SET LOCAL rocksdb_write_timeout_hint_us=333; +truncate table aaa; +SET LOCAL rocksdb_write_timeout_hint_us=444; drop table aaa; SET GLOBAL rocksdb_write_sync=off; SET GLOBAL rocksdb_write_disable_wal=false; SET GLOBAL rocksdb_write_timeout_hint_us=0; SET GLOBAL rocksdb_write_ignore_missing_column_families=false; +SET GLOBAL rocksdb_background_sync=off; diff --git a/mysql-test/suite/rocksdb/write_sync.test b/mysql-test/suite/rocksdb/write_sync.test index 8a53573b2bb6..ef898dfe84de 100644 --- a/mysql-test/suite/rocksdb/write_sync.test +++ b/mysql-test/suite/rocksdb/write_sync.test @@ -5,7 +5,7 @@ SET GLOBAL rocksdb_write_ignore_missing_column_families=true; create table aaa (id int primary key, i int) engine rocksdb; select variable_value into @a from information_schema.global_status where variable_name='rocksdb_wal_synced'; -SET GLOBAL rocksdb_write_sync=off; +SET LOCAL rocksdb_write_sync=off; insert aaa(id, i) values(1,1); select variable_value into @b from information_schema.global_status where variable_name='rocksdb_wal_synced'; select @b-@a; @@ -19,7 +19,7 @@ select variable_value into @b from information_schema.global_status where variab select @b-@a; set @a=@b; -SET GLOBAL rocksdb_write_sync=on_commit; +SET LOCAL rocksdb_write_sync=1; insert aaa(id, i) values(4,1); select variable_value into @b from information_schema.global_status where variable_name='rocksdb_wal_synced'; select @b-@a; @@ -33,10 +33,37 @@ select variable_value into @b from information_schema.global_status where variab select @b-@a; set @a=@b; +SET GLOBAL rocksdb_background_sync=on; +SET LOCAL rocksdb_write_sync=off; +insert aaa(id, i) values(7,1); +select variable_value into @b from information_schema.global_status where variable_name='rocksdb_wal_synced'; +select @b-@a; +set @a=@b; +insert aaa(id, i) values(8,1); +select variable_value into @b from information_schema.global_status where variable_name='rocksdb_wal_synced'; +select @b-@a; +set @a=@b; +insert aaa(id, i) values(9,1); +select variable_value into @b from information_schema.global_status where variable_name='rocksdb_wal_synced'; +select @b-@a; +set @a=@b; + +let $status_var=rocksdb_wal_synced; +let $status_var_value=5; +source include/wait_for_status_var.inc; +select variable_value into @b from information_schema.global_status where variable_name='rocksdb_wal_synced'; +select @b-@a; +set @a=@b; + +SET LOCAL rocksdb_write_timeout_hint_us=333; +truncate table aaa; + # Cleanup +SET LOCAL rocksdb_write_timeout_hint_us=444; drop table aaa; SET GLOBAL rocksdb_write_sync=off; SET GLOBAL rocksdb_write_disable_wal=false; SET GLOBAL rocksdb_write_timeout_hint_us=0; SET GLOBAL rocksdb_write_ignore_missing_column_families=false; +SET GLOBAL rocksdb_background_sync=off; diff --git a/storage/rocksdb/ha_rocksdb.cc b/storage/rocksdb/ha_rocksdb.cc index 0ae9607b0381..b49557cd4c2d 100644 --- a/storage/rocksdb/ha_rocksdb.cc +++ b/storage/rocksdb/ha_rocksdb.cc @@ -49,6 +49,7 @@ static handler *rocksdb_create_handler(handlerton *hton, MEM_ROOT *mem_root); void key_copy(uchar *to_key, uchar *from_record, KEY *key_info, uint key_length); +void* background_thread(void*); /////////////////////////////////////////////////////////// // Parameters and settings @@ -83,6 +84,10 @@ static HASH rocksdb_open_tables; /* The mutex used to init the hash; variable for example share methods */ mysql_mutex_t rocksdb_mutex; +mysql_mutex_t background_mutex; +mysql_mutex_t stop_cond_mutex; +mysql_cond_t stop_cond; +bool stop_background_thread; ////////////////////////////////////////////////////////////////////////////// @@ -118,8 +123,7 @@ static long long rocksdb_block_cache_size; static uint64_t rocksdb_info_log_level; static char * rocksdb_wal_dir; static uint64_t rocksdb_index_type; -static uint64_t rocksdb_write_sync; -static rocksdb::WriteOptions rocksdb_write_options; +static char rocksdb_background_sync; static rocksdb::DBOptions init_db_options() { rocksdb::DBOptions o; @@ -159,27 +163,6 @@ static TYPELIB index_type_typelib = { nullptr }; -enum write_sync_options { - WRITE_SYNC_OFF, - WRITE_SYNC_ON_COMMIT, - WRITE_SYNC_BACKGROUND -}; - -static const char* write_sync_names[] = { - "off", - "on_commit", - "background", - NullS -}; - -static TYPELIB write_sync_typelib = { - array_elements(write_sync_names) - 1, - "write_sync_typelib", - write_sync_names, - nullptr -}; - - //TODO: 0 means don't wait at all, and we don't support it yet? static MYSQL_THDVAR_ULONG(lock_wait_timeout, PLUGIN_VAR_RQCMDARG, "Number of seconds to wait for lock", @@ -478,31 +461,32 @@ static MYSQL_SYSVAR_STR(cf_options_file, rocksdb_cf_options_file, rocksdb_cf_options_file_validate, rocksdb_cf_options_file_update, ""); -static MYSQL_SYSVAR_ENUM(write_sync, - rocksdb_write_sync, +static MYSQL_SYSVAR_BOOL(background_sync, + rocksdb_background_sync, PLUGIN_VAR_RQCMDARG, - "WriteOptions::write_sync for RocksDB", - NULL, NULL, WRITE_SYNC_OFF, &write_sync_typelib); + "turns on background syncs for RocksDB", + NULL, NULL, FALSE); -static MYSQL_SYSVAR_BOOL(write_disable_wal, - *reinterpret_cast(&rocksdb_write_options.disableWAL), +static MYSQL_THDVAR_BOOL(write_sync, + PLUGIN_VAR_RQCMDARG, + "WriteOptions::sync for RocksDB", + NULL, NULL, rocksdb::WriteOptions().sync); + +static MYSQL_THDVAR_BOOL(write_disable_wal, PLUGIN_VAR_RQCMDARG, "WriteOptions::disableWAL for RocksDB", - NULL, NULL, rocksdb_write_options.disableWAL); + NULL, NULL, rocksdb::WriteOptions().disableWAL); -static MYSQL_SYSVAR_ULONG(write_timeout_hint_us, - rocksdb_write_options.timeout_hint_us, +static MYSQL_THDVAR_ULONG(write_timeout_hint_us, PLUGIN_VAR_RQCMDARG, "WriteOptions::timeout_hint_us for RocksDB", - NULL, NULL, rocksdb_write_options.timeout_hint_us, + NULL, NULL, rocksdb::WriteOptions().timeout_hint_us, /* min */ 0L, /* max */ LONG_MAX, 0); -static MYSQL_SYSVAR_BOOL(write_ignore_missing_column_families, - *reinterpret_cast( - &rocksdb_write_options.ignore_missing_column_families), +static MYSQL_THDVAR_BOOL(write_ignore_missing_column_families, PLUGIN_VAR_RQCMDARG, "WriteOptions::ignore_missing_column_families for RocksDB", - NULL, NULL, rocksdb_write_options.ignore_missing_column_families); + NULL, NULL, rocksdb::WriteOptions().ignore_missing_column_families); const longlong ROCKSDB_WRITE_BUFFER_SIZE_DEFAULT=4194304; @@ -559,6 +543,8 @@ static struct st_mysql_sys_var* rocksdb_system_variables[]= { MYSQL_SYSVAR(default_cf_options), MYSQL_SYSVAR(cf_options_file), + MYSQL_SYSVAR(background_sync), + MYSQL_SYSVAR(write_sync), MYSQL_SYSVAR(write_disable_wal), MYSQL_SYSVAR(write_timeout_hint_us), @@ -567,20 +553,15 @@ static struct st_mysql_sys_var* rocksdb_system_variables[]= { NULL }; -static rocksdb::WriteOptions get_write_options() { - rocksdb::WriteOptions opt(rocksdb_write_options); - switch (rocksdb_write_sync) { - case WRITE_SYNC_OFF: - opt.sync = false; - break; - case WRITE_SYNC_ON_COMMIT: - opt.sync = true; - break; - case WRITE_SYNC_BACKGROUND: - // this option is not implemented yet - DBUG_ASSERT(0); - break; - } +rocksdb::WriteOptions get_write_options(THD* thd) +{ + rocksdb::WriteOptions opt; + + opt.sync = THDVAR(thd, write_sync); + opt.disableWAL = THDVAR(thd, write_disable_wal); + opt.timeout_hint_us = THDVAR(thd, write_timeout_hint_us); + opt.ignore_missing_column_families = + THDVAR(thd, write_ignore_missing_column_families); return opt; } @@ -605,18 +586,35 @@ static uchar* rocksdb_get_key(ROCKSDB_SHARE *share, size_t *length, PSI_stage_info stage_waiting_on_row_lock= { 0, "Waiting for row lock", 0}; #ifdef HAVE_PSI_INTERFACE +static PSI_thread_key key_thread_background; + static PSI_stage_info *all_rocksdb_stages[]= { & stage_waiting_on_row_lock }; -static PSI_mutex_key ex_key_mutex_example, ex_key_mutex_ROCKSDB_SHARE_mutex; +static PSI_mutex_key ex_key_mutex_example, ex_key_mutex_ROCKSDB_SHARE_mutex, + key_mutex_background, key_mutex_stop_background; static PSI_mutex_info all_rocksdb_mutexes[]= { { &ex_key_mutex_example, "rocksdb", PSI_FLAG_GLOBAL}, - { &ex_key_mutex_ROCKSDB_SHARE_mutex, "ROCKSDB_SHARE::mutex", 0} + { &ex_key_mutex_ROCKSDB_SHARE_mutex, "ROCKSDB_SHARE::mutex", 0}, + { &key_mutex_background, "background", PSI_FLAG_GLOBAL}, + { &key_mutex_stop_background, "stop background", PSI_FLAG_GLOBAL}, +}; + +PSI_cond_key key_cond_stop; + +static PSI_cond_info all_rocksdb_conds[]= +{ + { &key_cond_stop, "cond_stop", PSI_FLAG_GLOBAL} +}; + +static PSI_thread_info all_rocksdb_threads[]= +{ + { &key_thread_background, "background", PSI_FLAG_GLOBAL}, }; static void init_rocksdb_psi_keys() @@ -630,12 +628,17 @@ static void init_rocksdb_psi_keys() count= array_elements(all_rocksdb_mutexes); PSI_server->register_mutex(category, all_rocksdb_mutexes, count); + count= array_elements(all_rocksdb_conds); + PSI_server->register_cond(category, all_rocksdb_conds, count); + count= array_elements(all_rocksdb_stages); mysql_stage_register(category, all_rocksdb_stages, count); + + count= array_elements(all_server_threads); + mysql_thread_register(category, all_server_threads, count); } #endif - static int bytewise_compare(const rocksdb::Slice& a, const rocksdb::Slice& b) { size_t a_size= a.size(); @@ -785,10 +788,10 @@ class Rdb_transaction trx_locks.clear(); } - bool commit() + bool commit(THD* thd) { bool res= false; - flush_batch(); + flush_batch(thd); /* rollback() will delete snapshot, batch and locks */ rollback(); return res; @@ -796,7 +799,7 @@ class Rdb_transaction private: - int flush_batch_intern() + int flush_batch_intern(THD* thd) { bool res= false; rocksdb::WriteBatch batch; @@ -820,15 +823,15 @@ class Rdb_transaction } } } - rocksdb::Status s= rdb->Write(get_write_options(), &batch); + rocksdb::Status s= rdb->Write(get_write_options(thd), &batch); res= !s.ok(); // we return true when something failed return res; } public: - int flush_batch() + int flush_batch(THD* thd) { - bool bres= flush_batch_intern(); + bool bres= flush_batch_intern(thd); changes.reinit(); n_writes= 0; return bres; @@ -923,7 +926,7 @@ static int rocksdb_commit(handlerton* hton, THD* thd, bool commit_trx, bool) { Rdb_transaction*& trx= get_trx_from_thd(thd); if (trx) - trx->commit(); // todo: return error code. + trx->commit(thd); // todo: return error code. } else { @@ -1020,6 +1023,10 @@ static int rocksdb_init_func(void *p) rocksdb_hton= (handlerton *)p; mysql_mutex_init(ex_key_mutex_example, &rocksdb_mutex, MY_MUTEX_INIT_FAST); + mysql_mutex_init(key_mutex_background, &background_mutex, MY_MUTEX_INIT_FAST); + mysql_mutex_init(key_mutex_stop_background, &stop_cond_mutex, + MY_MUTEX_INIT_FAST); + mysql_cond_init(key_cond_stop, &stop_cond, NULL); (void) my_hash_init(&rocksdb_open_tables,system_charset_info,32,0,0, (my_hash_get_key) rocksdb_get_key,0,0); @@ -1143,6 +1150,19 @@ static int rocksdb_init_func(void *p) if (ddl_manager.init(rdb)) DBUG_RETURN(1); + stop_background_thread = false; + pthread_t thread_handle; + auto err = mysql_thread_create( + key_thread_background, &thread_handle, + nullptr, + background_thread, NULL + ); + if (err != 0) { + sql_print_error("RocksDB: Couldn't start the background thread: (errno=%d)", + err); + DBUG_RETURN(1); + } + sql_print_information("RocksDB instance opened"); DBUG_RETURN(0); } @@ -1151,6 +1171,17 @@ static int rocksdb_done_func(void *p) { int error= 0; DBUG_ENTER("rocksdb_done_func"); + + // signal the background thread to stop + mysql_mutex_lock(&stop_cond_mutex); + stop_background_thread = true; + mysql_cond_signal(&stop_cond); + mysql_mutex_unlock(&stop_cond_mutex); + + // wait for the background thread to finish + mysql_mutex_lock(&background_mutex); + mysql_mutex_unlock(&background_mutex); + if (rocksdb_open_tables.records) error= 1; my_hash_free(&rocksdb_open_tables); @@ -1299,7 +1330,9 @@ static handler* rocksdb_create_handler(handlerton *hton, ha_rocksdb::ha_rocksdb(handlerton *hton, TABLE_SHARE *table_arg) - : handler(hton, table_arg), scan_it(NULL), pk_descr(NULL), + : handler(hton, table_arg), scan_it(NULL), + tbl_def(NULL), + pk_descr(NULL), key_descr(NULL), pk_can_be_decoded(false), pk_tuple(NULL), pk_packed_tuple(NULL), @@ -2861,7 +2894,7 @@ int ha_rocksdb::write_row(uchar *buf) if (!do_locking && trx->n_writes > THDVAR(table->in_use, bulk_load_size)) { /* Bulk insert mode: flush the write batch if it became too big */ - if (trx->flush_batch()) + if (trx->flush_batch(table->in_use)) DBUG_RETURN(HA_ERR_INTERNAL_ERROR); } @@ -3334,7 +3367,7 @@ int ha_rocksdb::external_lock(THD *thd, int lock_type) will be no-ops. */ Rdb_transaction* trx= get_or_create_trx(thd); - res= trx->commit(); + res= trx->commit(thd); delete scan_it; scan_it= NULL; } @@ -3422,6 +3455,8 @@ int ha_rocksdb::delete_table(const char *tablename) void ha_rocksdb::remove_rows(RDBSE_TABLE_DEF *tbl) { + rocksdb::WriteOptions wo = get_write_options(ha_thd()); + rocksdb::ReadOptions opts; opts.total_order_seek= true; rocksdb::Iterator* it= rdb->NewIterator(opts); @@ -3444,7 +3479,7 @@ void ha_rocksdb::remove_rows(RDBSE_TABLE_DEF *tbl) rocksdb::Slice key= it->key(); if (!tbl->key_descr[i]->covers_key(key.data(), key.size())) break; - rdb->Delete(get_write_options(), key); + rdb->Delete(wo, key); it->Next(); } } @@ -3798,3 +3833,32 @@ ulong Primary_key_comparator::get_hashnr(const char *key, size_t key_len) return((ulong) nr); } +void* background_thread(void*) +{ + mysql_mutex_lock(&background_mutex); + mysql_mutex_lock(&stop_cond_mutex); + + rocksdb::WriteBatch wb; + for (;;) { + timespec ts; + clock_gettime(CLOCK_REALTIME, &ts); + ts.tv_sec++; + // wait for 1 second + auto ret = mysql_cond_timedwait(&stop_cond, &stop_cond_mutex, &ts); + if (ret != ETIMEDOUT && stop_background_thread) { + assert(ret == 0); + break; + } + + if (rdb && rocksdb_background_sync) { + auto wo = rocksdb::WriteOptions(); + wo.sync = true; + rdb->Write(wo, &wb); + } + } + + mysql_mutex_unlock(&stop_cond_mutex); + mysql_mutex_unlock(&background_mutex); + + return nullptr; +}