diff --git a/src/IStorage.h b/src/IStorage.h index 79726daf5..82f8087a0 100644 --- a/src/IStorage.h +++ b/src/IStorage.h @@ -7,6 +7,13 @@ #define METADATA_DB_IDENTIFIER "c299fde0-6d42-4ec4-b939-34f680ffe39f" struct StorageToken { + enum class TokenType { + SingleRead, + SingleWrite, + Delete, + BatchWrite, + }; + TokenType type; std::unordered_set setc; struct redisDbPersistentData *db; virtual ~StorageToken() {} @@ -46,6 +53,9 @@ class IStorage virtual StorageToken *begin_retrieve(struct aeEventLoop *, aePostFunctionTokenProc, sds *, size_t) {return nullptr;}; virtual void complete_retrieve(StorageToken * /*tok*/, callbackSingle /*fn*/) {}; + virtual StorageToken* begin_endWriteBatch(struct aeEventLoop *, aePostFunctionTokenProc*) {} // NOP + virtual void complete_endWriteBatch(StorageToken * /*tok*/) {}; + virtual void bulkInsert(char **rgkeys, size_t *rgcbkeys, char **rgvals, size_t *rgcbvals, size_t celem) { beginWriteBatch(); for (size_t ielem = 0; ielem < celem; ++ielem) { diff --git a/src/StorageCache.cpp b/src/StorageCache.cpp index c0ff305b1..51d2e8c1f 100644 --- a/src/StorageCache.cpp +++ b/src/StorageCache.cpp @@ -233,4 +233,5 @@ void StorageCache::emergencyFreeCache() { dictRelease(d); }); } -} \ No newline at end of file +} + diff --git a/src/StorageCache.h b/src/StorageCache.h index 614f8c27b..828c657df 100644 --- a/src/StorageCache.h +++ b/src/StorageCache.h @@ -45,6 +45,8 @@ class StorageCache void retrieve(sds key, IStorage::callbackSingle fn) const; StorageToken *begin_retrieve(struct aeEventLoop *el, aePostFunctionTokenProc proc, sds *rgkey, size_t ckey); void complete_retrieve(StorageToken *tok, IStorage::callbackSingle fn); + StorageToken* begin_endWriteBatch(struct aeEventLoop *el, aePostFunctionTokenProc* proc) {m_spstorage->begin_endWriteBatch(el,proc);} // NOP + void complete_endWriteBatch(StorageToken *tok) {m_spstorage->complete_endWriteBatch(tok);}; bool erase(sds key); void emergencyFreeCache(); bool keycacheIsEnabled() const { return m_pdict != nullptr; } diff --git a/src/db.cpp b/src/db.cpp index 013473163..21961a6af 100644 --- a/src/db.cpp +++ b/src/db.cpp @@ -3072,6 +3072,8 @@ void redisDbPersistentData::bulkDirectStorageInsert(char **rgKeys, size_t *rgcbK void redisDbPersistentData::commitChanges(const redisDbPersistentDataSnapshot **psnapshotFree) { + + std::unordered_set setcBlocked; if (m_pdbSnapshotStorageFlush) { dictIterator *di = dictGetIterator(m_dictChangedStorageFlush); @@ -3086,8 +3088,22 @@ void redisDbPersistentData::commitChanges(const redisDbPersistentDataSnapshot ** *psnapshotFree = m_pdbSnapshotStorageFlush; m_pdbSnapshotStorageFlush = nullptr; } + if (m_spstorage != nullptr) - m_spstorage->endWriteBatch(); + { + auto tok = m_spstorage->begin_endWriteBatch(serverTL->el, storageLoadCallback); + if (tok != nullptr) + { + for (client *c : setcBlocked) //need to check how to push client to blocked list + { + if (!(c->flags & CLIENT_BLOCKED)) + blockClient(c, BLOCKED_STORAGE); + } + // tok->setc = std::move(setcBlocked); + tok->db = this; + tok->type = StorageToken::TokenType::BatchWrite; + } + } } redisDbPersistentData::~redisDbPersistentData() diff --git a/src/storage/rocksdb.cpp b/src/storage/rocksdb.cpp index 16aed43b7..1bae69d8c 100644 --- a/src/storage/rocksdb.cpp +++ b/src/storage/rocksdb.cpp @@ -255,6 +255,21 @@ void RocksDBStorageProvider::endWriteBatch() m_lock.unlock(); } +StorageToken* RocksDBStorageProvider::begin_endWriteBatch(struct aeEventLoop *el, aePostFunctionTokenProc* callback) +{ + StorageToken *tok = new StorageToken(); + auto pbatch = m_spbatch.get(); + (*m_pfactory->m_wqueue)->AddWorkFunction([this, el,callback,tok,&pbatch]{ + m_spdb->Write(WriteOptions(),pbatch); + aePostFunction(el,callback,tok); + }); +} + +void RocksDBStorageProvider::complete_endWriteBatch(StorageToken* tok){ + // m_spbatch = nullptr; + m_lock.unlock(); +} + void RocksDBStorageProvider::batch_lock() { m_lock.lock(); diff --git a/src/storage/rocksdb.h b/src/storage/rocksdb.h index 01edb4975..d5587c9e9 100644 --- a/src/storage/rocksdb.h +++ b/src/storage/rocksdb.h @@ -42,6 +42,8 @@ class RocksDBStorageProvider : public IStorage virtual void beginWriteBatch() override; virtual void endWriteBatch() override; + virtual StorageToken* begin_endWriteBatch(struct aeEventLoop *el, aePostFunctionTokenProc* proc); + virtual void complete_endWriteBatch(StorageToken *tok); virtual void bulkInsert(char **rgkeys, size_t *rgcbkeys, char **rgvals, size_t *rgcbvals, size_t celem) override;