Skip to content

Commit

Permalink
commitChanges async api
Browse files Browse the repository at this point in the history
  • Loading branch information
a00817524 committed Oct 24, 2023
1 parent b0935de commit b0f0939
Show file tree
Hide file tree
Showing 6 changed files with 48 additions and 2 deletions.
10 changes: 10 additions & 0 deletions src/IStorage.h
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,13 @@
#define METADATA_DB_IDENTIFIER "c299fde0-6d42-4ec4-b939-34f680ffe39f"

struct StorageToken {
enum class TokenType {
SingleRead,
SingleWrite,
Delete,
BatchWrite,
};
TokenType type;
std::unordered_set<struct client *> setc;
struct redisDbPersistentData *db;
virtual ~StorageToken() {}
Expand Down Expand Up @@ -46,6 +53,9 @@ class IStorage
virtual StorageToken *begin_retrieve(struct aeEventLoop *, aePostFunctionTokenProc, sds *, size_t) {return nullptr;};
virtual void complete_retrieve(StorageToken * /*tok*/, callbackSingle /*fn*/) {};

virtual StorageToken* begin_endWriteBatch(struct aeEventLoop *, aePostFunctionTokenProc*) {} // NOP
virtual void complete_endWriteBatch(StorageToken * /*tok*/) {};

virtual void bulkInsert(char **rgkeys, size_t *rgcbkeys, char **rgvals, size_t *rgcbvals, size_t celem) {
beginWriteBatch();
for (size_t ielem = 0; ielem < celem; ++ielem) {
Expand Down
3 changes: 2 additions & 1 deletion src/StorageCache.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -233,4 +233,5 @@ void StorageCache::emergencyFreeCache() {
dictRelease(d);
});
}
}
}

2 changes: 2 additions & 0 deletions src/StorageCache.h
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,8 @@ class StorageCache
void retrieve(sds key, IStorage::callbackSingle fn) const;
StorageToken *begin_retrieve(struct aeEventLoop *el, aePostFunctionTokenProc proc, sds *rgkey, size_t ckey);
void complete_retrieve(StorageToken *tok, IStorage::callbackSingle fn);
StorageToken* begin_endWriteBatch(struct aeEventLoop *el, aePostFunctionTokenProc* proc) {m_spstorage->begin_endWriteBatch(el,proc);} // NOP
void complete_endWriteBatch(StorageToken *tok) {m_spstorage->complete_endWriteBatch(tok);};
bool erase(sds key);
void emergencyFreeCache();
bool keycacheIsEnabled() const { return m_pdict != nullptr; }
Expand Down
18 changes: 17 additions & 1 deletion src/db.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -3072,6 +3072,8 @@ void redisDbPersistentData::bulkDirectStorageInsert(char **rgKeys, size_t *rgcbK

void redisDbPersistentData::commitChanges(const redisDbPersistentDataSnapshot **psnapshotFree)
{

std::unordered_set<client *> setcBlocked;
if (m_pdbSnapshotStorageFlush)
{
dictIterator *di = dictGetIterator(m_dictChangedStorageFlush);
Expand All @@ -3086,8 +3088,22 @@ void redisDbPersistentData::commitChanges(const redisDbPersistentDataSnapshot **
*psnapshotFree = m_pdbSnapshotStorageFlush;
m_pdbSnapshotStorageFlush = nullptr;
}

if (m_spstorage != nullptr)
m_spstorage->endWriteBatch();
{
auto tok = m_spstorage->begin_endWriteBatch(serverTL->el, storageLoadCallback);
if (tok != nullptr)
{
for (client *c : setcBlocked) //need to check how to push client to blocked list
{
if (!(c->flags & CLIENT_BLOCKED))
blockClient(c, BLOCKED_STORAGE);
}
// tok->setc = std::move(setcBlocked);
tok->db = this;
tok->type = StorageToken::TokenType::BatchWrite;
}
}
}

redisDbPersistentData::~redisDbPersistentData()
Expand Down
15 changes: 15 additions & 0 deletions src/storage/rocksdb.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -255,6 +255,21 @@ void RocksDBStorageProvider::endWriteBatch()
m_lock.unlock();
}

StorageToken* RocksDBStorageProvider::begin_endWriteBatch(struct aeEventLoop *el, aePostFunctionTokenProc* callback)
{
StorageToken *tok = new StorageToken();
auto pbatch = m_spbatch.get();
(*m_pfactory->m_wqueue)->AddWorkFunction([this, el,callback,tok,&pbatch]{
m_spdb->Write(WriteOptions(),pbatch);
aePostFunction(el,callback,tok);
});
}

void RocksDBStorageProvider::complete_endWriteBatch(StorageToken* tok){
// m_spbatch = nullptr;
m_lock.unlock();
}

void RocksDBStorageProvider::batch_lock()
{
m_lock.lock();
Expand Down
2 changes: 2 additions & 0 deletions src/storage/rocksdb.h
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,8 @@ class RocksDBStorageProvider : public IStorage

virtual void beginWriteBatch() override;
virtual void endWriteBatch() override;
virtual StorageToken* begin_endWriteBatch(struct aeEventLoop *el, aePostFunctionTokenProc* proc);
virtual void complete_endWriteBatch(StorageToken *tok);

virtual void bulkInsert(char **rgkeys, size_t *rgcbkeys, char **rgvals, size_t *rgcbvals, size_t celem) override;

Expand Down

0 comments on commit b0f0939

Please sign in to comment.