From ffac55273a322d53c32372536cd3043d0b413989 Mon Sep 17 00:00:00 2001 From: John Sully Date: Mon, 21 Aug 2023 16:36:45 -0400 Subject: [PATCH] Remove Expireset (#217) Major refactor to place expiry information directly in the object struct. --- src/aof.cpp | 3 +- src/cluster.cpp | 1 - src/config.cpp | 2 +- src/db.cpp | 216 ++++++++++++++------------------------- src/debug.cpp | 13 +-- src/defrag.cpp | 25 +---- src/evict.cpp | 57 +++-------- src/expire.cpp | 244 ++++++++++++++++++++++++++++++-------------- src/expire.h | 216 +++++++++++++++++++++------------------ src/lazyfree.cpp | 8 +- src/module.cpp | 7 +- src/object.cpp | 15 +-- src/rdb.cpp | 12 +-- src/server.cpp | 21 ++-- src/server.h | 43 ++++---- src/snapshot.cpp | 16 +-- tests/unit/cron.tcl | 2 +- 17 files changed, 423 insertions(+), 478 deletions(-) diff --git a/src/aof.cpp b/src/aof.cpp index e529b4b0e..2f367ee4b 100644 --- a/src/aof.cpp +++ b/src/aof.cpp @@ -1592,8 +1592,7 @@ int rewriteAppendOnlyFileRio(rio *aof) { } /* Save the expire time */ if (o->FExpires()) { - std::unique_lock ul(g_expireLock); - expireEntry *pexpire = db->getExpire(&key); + expireEntry *pexpire = &o->expire; for (auto &subExpire : *pexpire) { if (subExpire.subkey() == nullptr) { diff --git a/src/cluster.cpp b/src/cluster.cpp index 82ad3d271..05c8e4f73 100644 --- a/src/cluster.cpp +++ b/src/cluster.cpp @@ -5610,7 +5610,6 @@ void migrateCommand(client *c) { /* Create RESTORE payload and generate the protocol to call the command. */ for (j = 0; j < num_keys; j++) { long long ttl = 0; - std::unique_lock ul(g_expireLock); expireEntry *pexpire = c->db->getExpire(kv[j]); long long expireat = INVALID_EXPIRE; if (pexpire != nullptr) diff --git a/src/config.cpp b/src/config.cpp index dbe7377bc..f898e9e74 100644 --- a/src/config.cpp +++ b/src/config.cpp @@ -2908,7 +2908,7 @@ standardConfig configs[] = { createIntConfig("list-compress-depth", NULL, MODIFIABLE_CONFIG, 0, INT_MAX, g_pserver->list_compress_depth, 0, INTEGER_CONFIG, NULL, NULL), createIntConfig("rdb-key-save-delay", NULL, MODIFIABLE_CONFIG, INT_MIN, INT_MAX, g_pserver->rdb_key_save_delay, 0, INTEGER_CONFIG, NULL, NULL), createIntConfig("key-load-delay", NULL, MODIFIABLE_CONFIG, INT_MIN, INT_MAX, g_pserver->key_load_delay, 0, INTEGER_CONFIG, NULL, NULL), - createIntConfig("active-expire-effort", NULL, MODIFIABLE_CONFIG, 1, 10, cserver.active_expire_effort, 1, INTEGER_CONFIG, NULL, NULL), /* From 1 to 10. */ + createIntConfig("active-expire-effort", NULL, MODIFIABLE_CONFIG, 1, 10, g_pserver->active_expire_effort, 1, INTEGER_CONFIG, NULL, NULL), /* From 1 to 10. */ createIntConfig("hz", NULL, MODIFIABLE_CONFIG, 0, INT_MAX, g_pserver->config_hz, CONFIG_DEFAULT_HZ, INTEGER_CONFIG, NULL, updateHZ), createIntConfig("min-replicas-to-write", "min-slaves-to-write", MODIFIABLE_CONFIG, 0, INT_MAX, g_pserver->repl_min_slaves_to_write, 0, INTEGER_CONFIG, NULL, updateGoodSlaves), createIntConfig("min-replicas-max-lag", "min-slaves-max-lag", MODIFIABLE_CONFIG, 0, INT_MAX, g_pserver->repl_min_slaves_max_lag, 10, INTEGER_CONFIG, NULL, updateGoodSlaves), diff --git a/src/db.cpp b/src/db.cpp index 297910b9e..9bc67a396 100644 --- a/src/db.cpp +++ b/src/db.cpp @@ -55,8 +55,8 @@ struct dbBackup { int expireIfNeeded(redisDb *db, robj *key, robj *o); void slotToKeyUpdateKeyCore(const char *key, size_t keylen, int add); -std::unique_ptr deserializeExpire(sds key, const char *str, size_t cch, size_t *poffset); -sds serializeStoredObjectAndExpire(redisDbPersistentData *db, const char *key, robj_roptr o); +std::unique_ptr deserializeExpire(const char *str, size_t cch, size_t *poffset); +sds serializeStoredObjectAndExpire(robj_roptr o); dictType dictChangeDescType { dictSdsHash, /* hash function */ @@ -83,6 +83,7 @@ void updateExpire(redisDb *db, sds key, robj *valOld, robj *valNew) serverAssert(db->FKeyExpires((const char*)key)); + valNew->expire = std::move(valOld->expire); valNew->SetFExpires(true); valOld->SetFExpires(false); return; @@ -281,8 +282,8 @@ robj *lookupKeyWriteOrReply(client *c, robj *key, robj *reply) { return o; } -bool dbAddCore(redisDb *db, sds key, robj *val, bool fUpdateMvcc, bool fAssumeNew = false, dict_iter *piterExisting = nullptr) { - serverAssert(!val->FExpires()); +bool dbAddCore(redisDb *db, sds key, robj *val, bool fUpdateMvcc, bool fAssumeNew = false, dict_iter *piterExisting = nullptr, bool fValExpires = false) { + serverAssert(fValExpires || !val->FExpires()); sds copy = sdsdupshared(key); uint64_t mvcc = getMvccTstamp(); @@ -1494,15 +1495,6 @@ void renameGenericCommand(client *c, int nx) { incrRefCount(o); - std::unique_ptr spexpire; - - { // scope pexpireOld since it will be invalid soon - std::unique_lock ul(g_expireLock); - expireEntry *pexpireOld = c->db->getExpire(c->argv[1]); - if (pexpireOld != nullptr) - spexpire = std::make_unique(std::move(*pexpireOld)); - } - if (lookupKeyWrite(c->db,c->argv[2]) != NULL) { if (nx) { decrRefCount(o); @@ -1513,10 +1505,12 @@ void renameGenericCommand(client *c, int nx) { * with the same name. */ dbDelete(c->db,c->argv[2]); } + bool fExpires = o->FExpires(); + long long whenT = o->expire.when(); dbDelete(c->db,c->argv[1]); - dbAdd(c->db,c->argv[2],o); - if (spexpire != nullptr) - setExpire(c,c->db,c->argv[2],std::move(*spexpire)); + o->SetFExpires(fExpires); + dbAddCore(c->db,szFromObj(c->argv[2]),o,true /*fUpdateMvcc*/,true/*fAssumeNew*/,nullptr,true/*fValExpires*/); + serverAssert(whenT == o->expire.when()); // dbDelete and dbAdd must not modify the expire, just the FExpire bit signalModifiedKey(c,c->db,c->argv[1]); signalModifiedKey(c,c->db,c->argv[2]); notifyKeyspaceEvent(NOTIFY_GENERIC,"rename_from", @@ -1579,22 +1573,15 @@ void moveCommand(client *c) { return; } - std::unique_ptr spexpire; - { // scope pexpireOld - std::unique_lock ul(g_expireLock); - expireEntry *pexpireOld = c->db->getExpire(c->argv[1]); - if (pexpireOld != nullptr) - spexpire = std::make_unique(std::move(*pexpireOld)); - } - if (o->FExpires()) - removeExpire(c->db,c->argv[1]); - serverAssert(!o->FExpires()); incrRefCount(o); + bool fExpire = o->FExpires(); + long long whenT = o->expire.when(); dbDelete(src,c->argv[1]); g_pserver->dirty++; - dbAdd(dst,c->argv[1],o); - if (spexpire != nullptr) setExpire(c,dst,c->argv[1],std::move(*spexpire)); + o->SetFExpires(fExpire); + dbAddCore(dst, szFromObj(c->argv[1]), o, true /*fUpdateMvcc*/, true /*fAssumeNew*/, nullptr, true /*fValExpires*/); + serverAssert(whenT == o->expire.when()); // add/delete must not modify the expire time signalModifiedKey(c,src,c->argv[1]); signalModifiedKey(c,dst,c->argv[1]); @@ -1662,7 +1649,7 @@ void copyCommand(client *c) { addReply(c,shared.czero); return; } - expire = c->db->getExpire(key); + expire = o->FExpires() ? &o->expire : nullptr; /* Return zero if the key already exists in the target DB. * If REPLACE option is selected, delete newkey from targetDB. */ @@ -1829,63 +1816,48 @@ int redisDbPersistentData::removeExpire(robj *key, dict_iter itr) { /* An expire may only be removed if there is a corresponding entry in the * main dict. Otherwise, the key will never be freed. */ serverAssertWithInfo(NULL,key,itr != nullptr); - std::unique_lock ul(g_expireLock); robj *val = itr.val(); if (!val->FExpires()) return 0; trackkey(key, true /* fUpdate */); - auto itrExpire = m_setexpire->find(itr.key()); - serverAssert(itrExpire != m_setexpire->end()); - m_setexpire->erase(itrExpire); val->SetFExpires(false); + serverAssert(m_numexpires > 0); + m_numexpires--; return 1; } int redisDbPersistentData::removeSubkeyExpire(robj *key, robj *subkey) { auto de = find(szFromObj(key)); serverAssertWithInfo(NULL,key,de != nullptr); - std::unique_lock ul(g_expireLock); robj *val = de.val(); if (!val->FExpires()) return 0; - - auto itr = m_setexpire->find(de.key()); - serverAssert(itr != m_setexpire->end()); - serverAssert(itr->key() == de.key()); - if (!itr->FFat()) + + if (!val->expire.FFat()) return 0; int found = 0; - for (auto subitr : *itr) + for (auto subitr : val->expire) { if (subitr.subkey() == nullptr) continue; if (sdscmp((sds)subitr.subkey(), szFromObj(subkey)) == 0) { - itr->erase(subitr); + val->expire.erase(subitr); found = 1; break; } } - if (itr->pfatentry()->size() == 0) + if (val->expire.pfatentry()->size() == 0) this->removeExpire(key, de); return found; } -void redisDbPersistentData::resortExpire(expireEntry &e) -{ - std::unique_lock ul(g_expireLock); - auto itr = m_setexpire->find(e.key()); - expireEntry eT = std::move(e); - m_setexpire->erase(itr); - m_setexpire->insert(eT); -} - /* Set an expire to the specified key. If the expire is set in the context * of an user calling a command 'c' is the client, otherwise 'c' is set * to NULL. The 'when' parameter is the absolute unix time in milliseconds @@ -1940,10 +1912,7 @@ void setExpire(client *c, redisDb *db, robj *key, expireEntry &&e) if (kde.val()->FExpires()) removeExpire(db, key); - e.setKeyUnsafe(kde.key()); - db->setExpire(std::move(e)); - kde.val()->SetFExpires(true); - + db->setExpire(kde.key(), std::move(e)); int writable_slave = listLength(g_pserver->masters) && g_pserver->repl_slave_ro == 0 && !g_pserver->fActiveReplica; if (c && writable_slave && !(c->flags & CLIENT_MASTER)) @@ -1954,14 +1923,15 @@ void setExpire(client *c, redisDb *db, robj *key, expireEntry &&e) * is associated with this key (i.e. the key is non volatile) */ expireEntry *redisDbPersistentDataSnapshot::getExpire(const char *key) { /* No expire? return ASAP */ - std::unique_lock ul(g_expireLock); if (expireSize() == 0) return nullptr; - auto itrExpire = m_setexpire->find(key); - if (itrExpire == m_setexpire->end()) + auto itr = find_cached_threadsafe(key); + if (itr == end()) + return nullptr; + if (!itr.val()->FExpires()) return nullptr; - return itrExpire.operator->(); + return &itr.val()->expire; } const expireEntry *redisDbPersistentDataSnapshot::getExpire(const char *key) const @@ -2062,15 +2032,13 @@ int keyIsExpired(const redisDbPersistentDataSnapshot *db, robj *key) { /* Don't expire anything while loading. It will be done later. */ if (g_pserver->loading) return 0; - std::unique_lock ul(g_expireLock); const expireEntry *pexpire = db->getExpire(key); mstime_t now; + long long when; if (pexpire == nullptr) return 0; /* No expire for this key */ - long long when = pexpire->FGetPrimaryExpire(); - - if (when == INVALID_EXPIRE) + if (!pexpire->FGetPrimaryExpire(&when)) return 0; /* If we are in the context of a Lua script, we pretend that time is @@ -2632,7 +2600,6 @@ void redisDbPersistentData::initialize() m_pdbSnapshot = nullptr; m_pdict = dictCreate(&dbDictType,this); m_pdictTombstone = dictCreate(&dbTombstoneDictType,this); - m_setexpire = new(MALLOC_LOCAL) expireset(); m_fAllChanged = 0; m_fTrackingChanges = 0; } @@ -2668,7 +2635,6 @@ void moduleClusterLoadCallback(const char * rgchKey, size_t cchKey, void *data) void redisDb::initialize(int id) { redisDbPersistentData::initialize(); - this->expireitr = setexpire()->end(); this->blocking_keys = dictCreate(&keylistDictType,NULL); this->ready_keys = dictCreate(&objectKeyPointerValueDictType,NULL); this->watched_keys = dictCreate(&keylistDictType,NULL); @@ -2714,6 +2680,8 @@ bool redisDbPersistentData::insert(char *key, robj *o, bool fAssumeNew, dict_ite serverAssert(dictFind(m_pdictTombstone, key) != nullptr); } #endif + if (o->FExpires()) + ++m_numexpires; trackkey(key, false /* fUpdate */); } else @@ -2761,7 +2729,7 @@ size_t redisDb::clear(bool fAsync, void(callback)(void*)) } else { redisDbPersistentData::clear(callback); } - expireitr = setexpire()->end(); + expires_cursor = 0; return removed; } @@ -2774,59 +2742,57 @@ void redisDbPersistentData::clear(void(callback)(void*)) m_cnewKeysPending = 0; m_fAllChanged++; } - { - std::unique_lock ul(g_expireLock); - delete m_setexpire; - m_setexpire = new (MALLOC_LOCAL) expireset(); - } if (m_spstorage != nullptr) m_spstorage->clear(callback); dictEmpty(m_pdictTombstone,callback); m_pdbSnapshot = nullptr; + m_numexpires = 0; } void redisDbPersistentData::setExpire(robj *key, robj *subkey, long long when) { /* Reuse the sds from the main dict in the expire dict */ - std::unique_lock ul(g_expireLock); dictEntry *kde = dictFind(m_pdict,ptrFromObj(key)); serverAssertWithInfo(NULL,key,kde != NULL); trackkey(key, true /* fUpdate */); - if (((robj*)dictGetVal(kde))->getrefcount(std::memory_order_relaxed) == OBJ_SHARED_REFCOUNT) + robj *o = (robj*)dictGetVal(kde); + if (o->getrefcount(std::memory_order_relaxed) == OBJ_SHARED_REFCOUNT) { // shared objects cannot have the expire bit set, create a real object - dictSetVal(m_pdict, kde, dupStringObject((robj*)dictGetVal(kde))); + dictSetVal(m_pdict, kde, dupStringObject(o)); + o = (robj*)dictGetVal(kde); } const char *szSubKey = (subkey != nullptr) ? szFromObj(subkey) : nullptr; - if (((robj*)dictGetVal(kde))->FExpires()) { - auto itr = m_setexpire->find((sds)dictGetKey(kde)); - serverAssert(itr != m_setexpire->end()); - expireEntry eNew(std::move(*itr)); - eNew.update(szSubKey, when); - m_setexpire->erase(itr); - m_setexpire->insert(eNew); + if (o->FExpires()) { + o->expire.update(szSubKey, when); } else { - expireEntry e((sds)dictGetKey(kde), szSubKey, when); - ((robj*)dictGetVal(kde))->SetFExpires(true); - m_setexpire->insert(e); + expireEntry e(szSubKey, when); + o->expire = std::move(e); + o->SetFExpires(true); + ++m_numexpires; } } -void redisDbPersistentData::setExpire(expireEntry &&e) +void redisDbPersistentData::setExpire(const char *key, expireEntry &&e) { - std::unique_lock ul(g_expireLock); - trackkey(e.key(), true /* fUpdate */); - m_setexpire->insert(e); + trackkey(key, true /* fUpdate */); + auto itr = find(key); + if (!itr->FExpires()) + m_numexpires++; + itr->expire = std::move(e); + itr->SetFExpires(true); } bool redisDb::FKeyExpires(const char *key) { - std::unique_lock ul(g_expireLock); - return setexpireUnsafe()->find(key) != setexpire()->end(); + auto itr = find(key); + if (itr == end()) + return false; + return itr->FExpires(); } void redisDbPersistentData::updateValue(dict_iter itr, robj *val) @@ -2850,7 +2816,6 @@ void redisDbPersistentData::ensure(const char *sdsKey, dictEntry **pde) serverAssert(m_refCount == 0); if (m_pdbSnapshot == nullptr && g_pserver->m_pstorageFactory == nullptr) return; - std::unique_lock ul(g_expireLock); // First see if the key can be obtained from a snapshot if (*pde == nullptr && m_pdbSnapshot != nullptr) @@ -2872,7 +2837,11 @@ void redisDbPersistentData::ensure(const char *sdsKey, dictEntry **pde) else { sds strT = serializeStoredObject(itr.val()); - robj *objNew = deserializeStoredObject(this, sdsKey, strT, sdslen(strT)); + robj *objNew = deserializeStoredObject(strT, sdslen(strT)); + if (itr->FExpires()) { + objNew->expire = itr->expire; + objNew->SetFExpires(true); + } sdsfree(strT); dictAdd(m_pdict, keyNew, objNew); serverAssert(objNew->getrefcount(std::memory_order_relaxed) == 1); @@ -2902,26 +2871,19 @@ void redisDbPersistentData::ensure(const char *sdsKey, dictEntry **pde) std::unique_ptr spexpire; m_spstorage->retrieve((sds)sdsKey, [&](const char *, size_t, const void *data, size_t cb){ size_t offset = 0; - spexpire = deserializeExpire(sdsNewKey, (const char*)data, cb, &offset); - o = deserializeStoredObject(this, sdsNewKey, reinterpret_cast(data) + offset, cb - offset); + spexpire = deserializeExpire((const char*)data, cb, &offset); + o = deserializeStoredObject(reinterpret_cast(data) + offset, cb - offset); serverAssert(o != nullptr); }); if (o != nullptr) { dictAdd(m_pdict, sdsNewKey, o); - o->SetFExpires(spexpire != nullptr); - std::unique_lock ul(g_expireLock); - if (spexpire != nullptr) - { - auto itr = m_setexpire->find(sdsKey); - if (itr != m_setexpire->end()) - m_setexpire->erase(itr); - m_setexpire->insert(std::move(*spexpire)); - serverAssert(m_setexpire->find(sdsKey) != m_setexpire->end()); + if (spexpire != nullptr) { + o->expire = std::move(*spexpire); } - serverAssert(o->FExpires() == (m_setexpire->find(sdsKey) != m_setexpire->end())); + o->SetFExpires(spexpire != nullptr); g_pserver->stat_storage_provider_read_hits++; } else { sdsfree(sdsNewKey); @@ -2931,18 +2893,11 @@ void redisDbPersistentData::ensure(const char *sdsKey, dictEntry **pde) *pde = dictFind(m_pdict, sdsKey); } } - - if (*pde != nullptr && dictGetVal(*pde) != nullptr) - { - robj *o = (robj*)dictGetVal(*pde); - std::unique_lock ul(g_expireLock); - serverAssert(o->FExpires() == (m_setexpire->find(sdsKey) != m_setexpire->end())); - } } void redisDbPersistentData::storeKey(sds key, robj *o, bool fOverwrite) { - sds temp = serializeStoredObjectAndExpire(this, key, o); + sds temp = serializeStoredObjectAndExpire(o); m_spstorage->insert(key, temp, sdslen(temp), fOverwrite); sdsfree(temp); } @@ -2966,7 +2921,7 @@ void redisDbPersistentData::storeDatabase() if (itr == nullptr) return; robj *o = itr.val(); - sds temp = serializeStoredObjectAndExpire(db, (const char*) itr.key(), o); + sds temp = serializeStoredObjectAndExpire(o); storage->insert((sds)key, temp, sdslen(temp), fUpdate); sdsfree(temp); } @@ -3042,7 +2997,7 @@ void redisDbPersistentData::processChangesAsync(std::atomic &pendingJobs) while ((de = dictNext(di)) != nullptr) { robj *o = (robj*)dictGetVal(de); - sds temp = serializeStoredObjectAndExpire(this, (const char*) dictGetKey(de), o); + sds temp = serializeStoredObjectAndExpire(o); veckeys.push_back((sds)dictGetKey(de)); veccbkeys.push_back(sdslen((sds)dictGetKey(de))); vecvals.push_back(temp); @@ -3106,9 +3061,7 @@ redisDbPersistentData::~redisDbPersistentData() if (m_dictChanged) dictRelease(m_dictChanged); if (m_dictChangedStorageFlush) - dictRelease(m_dictChangedStorageFlush); - - delete m_setexpire; + dictRelease(m_dictChangedStorageFlush); } dict_iter redisDbPersistentData::random() @@ -3262,7 +3215,7 @@ sds serializeExpire(const expireEntry *pexpire) return str; } -std::unique_ptr deserializeExpire(sds key, const char *str, size_t cch, size_t *poffset) +std::unique_ptr deserializeExpire(const char *str, size_t cch, size_t *poffset) { unsigned celem; if (cch < sizeof(unsigned)) @@ -3294,7 +3247,7 @@ std::unique_ptr deserializeExpire(sds key, const char *str, size_t offset += sizeof(long long); if (spexpire == nullptr) - spexpire = std::make_unique(key, subkey, when); + spexpire = std::make_unique(subkey, when); else spexpire->update(subkey, when); @@ -3306,13 +3259,9 @@ std::unique_ptr deserializeExpire(sds key, const char *str, size_t return spexpire; } -sds serializeStoredObjectAndExpire(redisDbPersistentData *db, const char *key, robj_roptr o) +sds serializeStoredObjectAndExpire(robj_roptr o) { - std::unique_lock ul(g_expireLock); - auto itrExpire = db->setexpire()->find(key); - const expireEntry *pexpire = nullptr; - if (itrExpire != db->setexpire()->end()) - pexpire = &(*itrExpire); + const expireEntry *pexpire = o->FExpires() ? &o->expire : nullptr; sds str = serializeExpire(pexpire); str = serializeStoredObject(o, str); @@ -3395,8 +3344,8 @@ void redisDbPersistentData::prefetchKeysAsync(client *c, parsed_command &command robj *o = nullptr; m_spstorage->retrieve((sds)szFromObj(objKey), [&](const char *, size_t, const void *data, size_t cb){ size_t offset = 0; - spexpire = deserializeExpire(sharedKey, (const char*)data, cb, &offset); - o = deserializeStoredObject(this, sharedKey, reinterpret_cast(data) + offset, cb - offset); + spexpire = deserializeExpire((const char*)data, cb, &offset); + o = deserializeStoredObject(reinterpret_cast(data) + offset, cb - offset); serverAssert(o != nullptr); }); @@ -3431,18 +3380,9 @@ void redisDbPersistentData::prefetchKeysAsync(client *c, parsed_command &command } } dictAdd(m_pdict, sharedKey, o); - o->SetFExpires(spexpire != nullptr); - - std::unique_lock ul(g_expireLock); if (spexpire != nullptr) - { - auto itr = m_setexpire->find(sharedKey); - if (itr != m_setexpire->end()) - m_setexpire->erase(itr); - m_setexpire->insert(std::move(*spexpire)); - serverAssert(m_setexpire->find(sharedKey) != m_setexpire->end()); - } - serverAssert(o->FExpires() == (m_setexpire->find(sharedKey) != m_setexpire->end())); + o->expire = std::move(*spexpire); + o->SetFExpires(spexpire != nullptr); } } else diff --git a/src/debug.cpp b/src/debug.cpp index 688ca3a0e..00b9482dd 100644 --- a/src/debug.cpp +++ b/src/debug.cpp @@ -146,11 +146,10 @@ void mixStringObjectDigest(unsigned char *digest, robj_roptr o) { * Note that this function does not reset the initial 'digest' passed, it * will continue mixing this object digest to anything that was already * present. */ -void xorObjectDigest(redisDb *db, robj_roptr keyobj, unsigned char *digest, robj_roptr o) { +void xorObjectDigest(unsigned char *digest, robj_roptr o) { uint32_t aux = htonl(o->type); mixDigest(digest,&aux,sizeof(aux)); - std::unique_lock ul(g_expireLock); - expireEntry *pexpire = db->getExpire(keyobj); + const expireEntry *pexpire = o->FExpires() ? &o->expire : nullptr; long long expiretime = INVALID_EXPIRE; char buf[128]; @@ -318,7 +317,7 @@ void computeDatasetDigest(unsigned char *final) { mixDigest(digest,key,sdslen(key)); - xorObjectDigest(db,keyobj,digest,o); + xorObjectDigest(digest,o); /* We can finally xor the key-val digest to the final digest */ xorDigest(final,digest,20); @@ -716,7 +715,7 @@ NULL * work on logically expired keys */ auto itr = c->db->find(c->argv[j]); robj* o = (robj*)(itr == NULL ? NULL : itr.val()); - if (o) xorObjectDigest(c->db,c->argv[j],digest,o); + if (o) xorObjectDigest(digest,o); sds d = sdsempty(); for (int i = 0; i < 20; i++) d = sdscatprintf(d, "%02x",digest[i]); @@ -843,10 +842,6 @@ NULL g_pserver->db[dbid]->getStats(buf,sizeof(buf)); stats = sdscat(stats,buf); - stats = sdscatprintf(stats,"[Expires set]\n"); - g_pserver->db[dbid]->getExpireStats(buf, sizeof(buf)); - stats = sdscat(stats, buf); - addReplyVerbatim(c,stats,sdslen(stats),"txt"); sdsfree(stats); } else if (!strcasecmp(szFromObj(c->argv[1]),"htstats-key") && c->argc == 3) { diff --git a/src/defrag.cpp b/src/defrag.cpp index d48f4d804..b92888b89 100644 --- a/src/defrag.cpp +++ b/src/defrag.cpp @@ -47,7 +47,6 @@ extern "C" int je_get_defrag_hint(void* ptr); /* forward declarations*/ void defragDictBucketCallback(void *privdata, dictEntry **bucketref); dictEntry* replaceSatelliteDictKeyPtrAndOrDefragDictEntry(dict *d, sds oldkey, sds newkey, uint64_t hash, long *defragged); -bool replaceSatelliteOSetKeyPtr(expireset &set, sds oldkey, sds newkey); /* Defrag helper for generic allocations. * @@ -425,20 +424,6 @@ dictEntry* replaceSatelliteDictKeyPtrAndOrDefragDictEntry(dict *d, sds oldkey, s return NULL; } -bool replaceSatelliteOSetKeyPtr(expireset &set, sds oldkey, sds newkey) { - auto itr = set.find(oldkey); - if (itr != set.end()) - { - expireEntry eNew(std::move(*itr)); - eNew.setKeyUnsafe(newkey); - set.erase(itr); - set.insert(eNew); - serverAssert(set.find(newkey) != set.end()); - return true; - } - return false; -} - long activeDefragQuickListNode(quicklist *ql, quicklistNode **node_ref) { quicklistNode *newnode, *node = *node_ref; long defragged = 0; @@ -851,7 +836,6 @@ long defragModule(redisDb *db, dictEntry *kde) { * all the various pointers it has. Returns a stat of how many pointers were * moved. */ long defragKey(redisDb *db, dictEntry *de) { - std::unique_lock ul(g_expireLock); sds keysds = (sds)dictGetKey(de); robj *newob, *ob; unsigned char *newzl; @@ -862,15 +846,8 @@ long defragKey(redisDb *db, dictEntry *de) { /* Try to defrag the key name. */ newsds = activeDefragSds(keysds); - if (newsds) - { + if (newsds) { defragged++, de->key = newsds; - if (!db->setexpire()->empty()) { - bool fReplaced = replaceSatelliteOSetKeyPtr(*const_cast(db->setexpire()), keysds, newsds); - serverAssert(fReplaced == ob->FExpires()); - } else { - serverAssert(!ob->FExpires()); - } } if ((newob = activeDefragStringOb(ob, &defragged))) { diff --git a/src/evict.cpp b/src/evict.cpp index 1523b2814..2503cca15 100644 --- a/src/evict.cpp +++ b/src/evict.cpp @@ -222,52 +222,23 @@ void processEvictionCandidate(int dbid, sds key, robj *o, const expireEntry *e, * idle time are on the left, and keys with the higher idle time on the * right. */ -struct visitFunctor +int evictionPoolPopulate(int dbid, redisDb *db, bool fVolatile, struct evictionPoolEntry *pool) { - int dbid; - dict *dbdict; - struct evictionPoolEntry *pool; - int count = 0; - int tries = 0; - - bool operator()(const expireEntry &e) - { - dictEntry *de = dictFind(dbdict, e.key()); - if (de != nullptr) + int returnCount = 0; + dictEntry **samples = (dictEntry**)alloca(g_pserver->maxmemory_samples * sizeof(dictEntry*)); + int count = dictGetSomeKeys(db->dictUnsafeKeyOnly(),samples,g_pserver->maxmemory_samples); + for (int j = 0; j < count; j++) { + robj *o = (robj*)dictGetVal(samples[j]); + // If the object is in second tier storage we don't need to evict it (since it already is) + if (o != nullptr) { - processEvictionCandidate(dbid, (sds)dictGetKey(de), (robj*)dictGetVal(de), &e, pool); - ++count; - } - ++tries; - return tries < g_pserver->maxmemory_samples; - } -}; -int evictionPoolPopulate(int dbid, redisDb *db, expireset *setexpire, struct evictionPoolEntry *pool) -{ - if (setexpire != nullptr) - { - std::unique_lock ul(g_expireLock); - visitFunctor visitor { dbid, db->dictUnsafeKeyOnly(), pool, 0 }; - setexpire->random_visit(visitor); - return visitor.count; - } - else - { - int returnCount = 0; - dictEntry **samples = (dictEntry**)alloca(g_pserver->maxmemory_samples * sizeof(dictEntry*)); - int count = dictGetSomeKeys(db->dictUnsafeKeyOnly(),samples,g_pserver->maxmemory_samples); - for (int j = 0; j < count; j++) { - robj *o = (robj*)dictGetVal(samples[j]); - // If the object is in second tier storage we don't need to evict it (since it alrady is) - if (o != nullptr) - { - processEvictionCandidate(dbid, (sds)dictGetKey(samples[j]), o, nullptr, pool); + if (!fVolatile || o->FExpires()) { + processEvictionCandidate(dbid, (sds)dictGetKey(samples[j]), o, &o->expire, pool); ++returnCount; } } - return returnCount; } - return 0; + return returnCount; } /* ---------------------------------------------------------------------------- @@ -718,14 +689,14 @@ int performEvictions(bool fPreSnapshot) { if (g_pserver->maxmemory_policy & MAXMEMORY_FLAG_ALLKEYS) { if ((keys = db->size()) != 0) { - total_keys += evictionPoolPopulate(i, db, nullptr, pool); + total_keys += evictionPoolPopulate(i, db, false, pool); } } else { keys = db->expireSize(); if (keys != 0) - total_keys += evictionPoolPopulate(i, db, db->setexpireUnsafe(), pool); + total_keys += evictionPoolPopulate(i, db, true, pool); } } if (!total_keys) break; /* No keys to evict. */ @@ -786,7 +757,7 @@ int performEvictions(bool fPreSnapshot) { { if (db->expireSize()) { - bestkey = (sds)db->random_expire().key(); + db->random_expire(&bestkey); bestdbid = j; break; } diff --git a/src/expire.cpp b/src/expire.cpp index b727183ad..4bf81878a 100644 --- a/src/expire.cpp +++ b/src/expire.cpp @@ -33,8 +33,6 @@ #include "server.h" #include "cron.h" -fastlock g_expireLock {"Expire"}; - /* Helper function for the activeExpireCycle() function. * This function will try to expire the key that is stored in the hash table * entry 'de' of the 'expires' hash table of a Redis database. @@ -74,21 +72,20 @@ void activeExpireCycleExpireFullKey(redisDb *db, const char *key) { *----------------------------------------------------------------------------*/ -int activeExpireCycleExpire(redisDb *db, expireEntry &e, long long now, size_t &tried) { +int activeExpireCycleExpire(redisDb *db, const char *key, expireEntry &e, long long now, size_t &tried) { if (!e.FFat()) { - activeExpireCycleExpireFullKey(db, e.key()); + activeExpireCycleExpireFullKey(db, key); ++tried; return 1; } expireEntryFat *pfat = e.pfatentry(); - robj *val = db->find(e.key()); + robj *val = db->find(key); int deleted = 0; redisObjectStack objKey; - initStaticStringObject(objKey, (char*)e.key()); - bool fTtlChanged = false; + initStaticStringObject(objKey, (char*)key); while (!pfat->FEmpty()) { @@ -99,7 +96,7 @@ int activeExpireCycleExpire(redisDb *db, expireEntry &e, long long now, size_t & // Is it the full key expiration? if (pfat->nextExpireEntry().spsubkey == nullptr) { - activeExpireCycleExpireFullKey(db, e.key()); + activeExpireCycleExpireFullKey(db, key); return ++deleted; } @@ -109,7 +106,7 @@ int activeExpireCycleExpire(redisDb *db, expireEntry &e, long long now, size_t & if (setTypeRemove(val,pfat->nextExpireEntry().spsubkey.get())) { deleted++; if (setTypeSize(val) == 0) { - activeExpireCycleExpireFullKey(db, e.key()); + activeExpireCycleExpireFullKey(db, key); return deleted; } } @@ -119,7 +116,7 @@ int activeExpireCycleExpire(redisDb *db, expireEntry &e, long long now, size_t & if (hashTypeDelete(val,(sds)pfat->nextExpireEntry().spsubkey.get())) { deleted++; if (hashTypeLength(val) == 0) { - activeExpireCycleExpireFullKey(db, e.key()); + activeExpireCycleExpireFullKey(db, key); return deleted; } } @@ -129,7 +126,7 @@ int activeExpireCycleExpire(redisDb *db, expireEntry &e, long long now, size_t & if (zsetDel(val,(sds)pfat->nextExpireEntry().spsubkey.get())) { deleted++; if (zsetLength(val) == 0) { - activeExpireCycleExpireFullKey(db, e.key()); + activeExpireCycleExpireFullKey(db, key); return deleted; } } @@ -137,15 +134,15 @@ int activeExpireCycleExpire(redisDb *db, expireEntry &e, long long now, size_t & case OBJ_CRON: { - sds keyCopy = sdsdup(e.key()); + sds keyCopy = sdsdup(key); incrRefCount(val); aePostFunction(g_pserver->rgthreadvar[IDX_EVENT_LOOP_MAIN].el, [keyCopy, val]{ executeCronJobExpireHook(keyCopy, val); sdsfree(keyCopy); decrRefCount(val); }, true /*fLock*/, true /*fForceQueue*/); + break; } - return deleted; case OBJ_LIST: default: @@ -157,7 +154,6 @@ int activeExpireCycleExpire(redisDb *db, expireEntry &e, long long now, size_t & propagateSubkeyExpire(db, val->type, &objKey, &objSubkey); pfat->popfrontExpireEntry(); - fTtlChanged = true; if ((tried % ACTIVE_EXPIRE_CYCLE_SUBKEY_LOOKUPS_PER_LOOP) == 0) { break; } @@ -167,11 +163,6 @@ int activeExpireCycleExpire(redisDb *db, expireEntry &e, long long now, size_t & { removeExpire(db, &objKey); } - else if (!pfat->FEmpty() && fTtlChanged) - { - // We need to resort the expire entry since it may no longer be in the correct position - db->resortExpire(e); - } if (deleted) { @@ -317,8 +308,26 @@ void pexpireMemberAtCommand(client *c) * If type is ACTIVE_EXPIRE_CYCLE_SLOW, that normal expire cycle is * executed, where the time limit is a percentage of the REDIS_HZ period * as specified by the ACTIVE_EXPIRE_CYCLE_SLOW_TIME_PERC define. */ +#define ACTIVE_EXPIRE_CYCLE_KEYS_PER_LOOP 20 /* Keys for each DB loop. */ +#define ACTIVE_EXPIRE_CYCLE_FAST_DURATION 1000 /* Microseconds. */ +#define ACTIVE_EXPIRE_CYCLE_SLOW_TIME_PERC 25 /* Max % of CPU to use. */ +#define ACTIVE_EXPIRE_CYCLE_ACCEPTABLE_STALE 10 /* % of stale keys after which + we do extra efforts. */ +/*static*/ void redisDbPersistentData::activeExpireCycleCore(int type) { + /* Adjust the running parameters according to the configured expire + * effort. The default effort is 1, and the maximum configurable effort + * is 10. */ + unsigned long + effort = g_pserver->active_expire_effort-1, /* Rescale from 0 to 9. */ + config_keys_per_loop = ACTIVE_EXPIRE_CYCLE_KEYS_PER_LOOP + + ACTIVE_EXPIRE_CYCLE_KEYS_PER_LOOP/4*effort, + config_cycle_fast_duration = ACTIVE_EXPIRE_CYCLE_FAST_DURATION + + ACTIVE_EXPIRE_CYCLE_FAST_DURATION/4*effort, + config_cycle_slow_time_perc = ACTIVE_EXPIRE_CYCLE_SLOW_TIME_PERC + + 2*effort, + config_cycle_acceptable_stale = ACTIVE_EXPIRE_CYCLE_ACCEPTABLE_STALE- + effort; -void activeExpireCycleCore(int type) { /* This function has some global state in order to continue the work * incrementally across calls. */ static unsigned int current_db = 0; /* Next DB to test. */ @@ -336,10 +345,16 @@ void activeExpireCycleCore(int type) { if (type == ACTIVE_EXPIRE_CYCLE_FAST) { /* Don't start a fast cycle if the previous cycle did not exit - * for time limit. Also don't repeat a fast cycle for the same period + * for time limit, unless the percentage of estimated stale keys is + * too high. Also never repeat a fast cycle for the same period * as the fast cycle total duration itself. */ - if (!timelimit_exit) return; - if (start < last_fast_cycle + ACTIVE_EXPIRE_CYCLE_FAST_DURATION*2) return; + if (!timelimit_exit && + g_pserver->stat_expired_stale_perc < config_cycle_acceptable_stale) + return; + + if (start < last_fast_cycle + (long long)config_cycle_fast_duration*2) + return; + last_fast_cycle = start; } @@ -353,16 +368,16 @@ void activeExpireCycleCore(int type) { if (dbs_per_call > cserver.dbnum || timelimit_exit) dbs_per_call = cserver.dbnum; - /* We can use at max ACTIVE_EXPIRE_CYCLE_SLOW_TIME_PERC percentage of CPU time - * per iteration. Since this function gets called with a frequency of - * g_pserver->hz times per second, the following is the max amount of + /* We can use at max 'config_cycle_slow_time_perc' percentage of CPU + * time per iteration. Since this function gets called with a frequency of + * server.hz times per second, the following is the max amount of * microseconds we can spend in this function. */ - timelimit = 1000000*ACTIVE_EXPIRE_CYCLE_SLOW_TIME_PERC/g_pserver->hz/100; + timelimit = config_cycle_slow_time_perc*1000000/g_pserver->hz/100; timelimit_exit = 0; if (timelimit <= 0) timelimit = 1; if (type == ACTIVE_EXPIRE_CYCLE_FAST) - timelimit = ACTIVE_EXPIRE_CYCLE_FAST_DURATION; /* in microseconds. */ + timelimit = config_cycle_fast_duration; /* in microseconds. */ /* Accumulate some global stats as we expire keys, to have some idea * about the number of keys that are already logically expired, but still @@ -371,6 +386,9 @@ void activeExpireCycleCore(int type) { long total_expired = 0; for (j = 0; j < dbs_per_call && timelimit_exit == 0; j++) { + /* Expired and checked in a single loop. */ + unsigned long expired, sampled; + redisDb *db = g_pserver->db[(current_db % cserver.dbnum)]; /* Increment the DB now so we are sure if we run out of time @@ -378,48 +396,130 @@ void activeExpireCycleCore(int type) { * distribute the time evenly across DBs. */ current_db++; - long long now; - iteration++; - now = mstime(); - - /* If there is nothing to expire try next DB ASAP. */ - if (db->setexpireUnsafe()->empty()) - { - db->avg_ttl = 0; - db->last_expire_set = now; - continue; - } - - std::unique_lock ul(g_expireLock); - size_t expired = 0; - size_t tried = 0; - long long check = ACTIVE_EXPIRE_CYCLE_FAST_DURATION; // assume a check is roughly 1us. It isn't but good enough - db->expireitr = db->setexpireUnsafe()->enumerate(db->expireitr, now, [&](expireEntry &e) __attribute__((always_inline)) { - if (e.when() < now) - { - expired += activeExpireCycleExpire(db, e, now, tried); + /* Continue to expire if at the end of the cycle there are still + * a big percentage of keys to expire, compared to the number of keys + * we scanned. The percentage, stored in config_cycle_acceptable_stale + * is not fixed, but depends on the Redis configured "expire effort". */ + do { + unsigned long num, slots; + long long now, ttl_sum; + int ttl_samples; + iteration++; + + /* If there is nothing to expire try next DB ASAP. */ + if (db->expireSize() == 0) { + db->avg_ttl = 0; + break; + } + num = dictSize(db->m_pdict); + slots = dictSlots(db->m_pdict); + now = mstime(); + + /* When there are less than 1% filled slots, sampling the key + * space is expensive, so stop here waiting for better times... + * The dictionary will be resized asap. */ + if (slots > DICT_HT_INITIAL_SIZE && + (num*100/slots < 1)) break; + + /* The main collection cycle. Sample random keys among keys + * with an expire set, checking for expired ones. */ + expired = 0; + sampled = 0; + ttl_sum = 0; + ttl_samples = 0; + + if (num > config_keys_per_loop) + num = config_keys_per_loop; + + /* Here we access the low level representation of the hash table + * for speed concerns: this makes this code coupled with dict.c, + * but it hardly changed in ten years. + * + * Note that certain places of the hash table may be empty, + * so we want also a stop condition about the number of + * buckets that we scanned. However scanning for free buckets + * is very fast: we are in the cache line scanning a sequential + * array of NULL pointers, so we can scan a lot more buckets + * than keys in the same time. */ + long max_buckets = num*20; + long checked_buckets = 0; + + while (sampled < num && checked_buckets < max_buckets) { + for (int table = 0; table < 2; table++) { + if (table == 1 && !dictIsRehashing(db->m_pdict)) break; + + unsigned long idx = db->expires_cursor; + idx &= db->m_pdict->ht[table].sizemask; + dictEntry *de = db->m_pdict->ht[table].table[idx]; + long long ttl; + + /* Scan the current bucket of the current table. */ + checked_buckets++; + while(de) { + /* Get the next entry now since this entry may get + * deleted. */ + dictEntry *e = de; + robj *o = (robj*)dictGetVal(de); + de = de->next; + if (!o->FExpires()) + continue; + + expireEntry *exp = &o->expire; + + serverAssert(exp->when() > 0); + ttl = exp->when()-now; + size_t tried = 0; + if (exp->when() <= now) { + if (activeExpireCycleExpire(db,(const char*)dictGetKey(e),*exp,now,tried)) expired++; + serverAssert(ttl <= 0); + } else { + serverAssert(ttl > 0); + } + if (ttl > 0) { + /* We want the average TTL of keys yet + * not expired. */ + ttl_sum += ttl; + ttl_samples++; + } + sampled++; + } + } + db->expires_cursor++; + } + total_expired += expired; + total_sampled += sampled; + + /* Update the average TTL stats for this database. */ + if (ttl_samples) { + long long avg_ttl = ttl_sum/ttl_samples; + + /* Do a simple running average with a few samples. + * We just use the current estimate with a weight of 2% + * and the previous estimate with a weight of 98%. */ + if (db->avg_ttl == 0) db->avg_ttl = avg_ttl; + db->avg_ttl = (db->avg_ttl/50)*49 + (avg_ttl/50); } - if ((tried % ACTIVE_EXPIRE_CYCLE_LOOKUPS_PER_LOOP) == 0) - { - /* We can't block forever here even if there are many keys to - * expire. So after a given amount of milliseconds return to the - * caller waiting for the other active expire cycle. */ + /* We can't block forever here even if there are many keys to + * expire. So after a given amount of milliseconds return to the + * caller waiting for the other active expire cycle. */ + if ((iteration & 0xf) == 0) { /* check once every 16 iterations. */ elapsed = ustime()-start; if (elapsed > timelimit) { timelimit_exit = 1; g_pserver->stat_expired_time_cap_reached_count++; - return false; + break; } - check = ACTIVE_EXPIRE_CYCLE_FAST_DURATION; } - return true; - }, &check); - - total_expired += expired; + /* We don't repeat the cycle for the current database if there are + * an acceptable amount of stale keys (logically expired but yet + * not reclaimed). */ + } while (sampled == 0 || + (expired*100/sampled) > config_cycle_acceptable_stale); } elapsed = ustime()-start; + g_pserver->stat_expire_cycle_time_used += elapsed; latencyAddSampleIfNeeded("expire-cycle",elapsed/1000); /* Update our estimate of keys existing but yet to be expired. @@ -435,7 +535,7 @@ void activeExpireCycleCore(int type) { void activeExpireCycle(int type) { - runAndPropogateToReplicas(activeExpireCycleCore, type); + runAndPropogateToReplicas(redisDbPersistentData::activeExpireCycleCore, type); } /*----------------------------------------------------------------------------- @@ -481,7 +581,6 @@ void expireSlaveKeys(void) { if (slaveKeysWithExpire == NULL || dictSize(slaveKeysWithExpire) == 0) return; - std::unique_lock ul(g_expireLock); int cycles = 0, noexpire = 0; mstime_t start = mstime(); while(1) { @@ -496,19 +595,14 @@ void expireSlaveKeys(void) { while(dbids && dbid < cserver.dbnum) { if ((dbids & 1) != 0) { redisDb *db = g_pserver->db[dbid]; - - // the expire is hashed based on the key pointer, so we need the point in the main db auto itrDB = db->find(keyname); - auto itrExpire = db->setexpire()->end(); - if (itrDB != nullptr) - itrExpire = db->setexpireUnsafe()->find(itrDB.key()); int expired = 0; - if (itrExpire != db->setexpire()->end()) + if (itrDB != db->end() && itrDB->FExpires()) { - if (itrExpire->when() < start) { + if (itrDB->expire.when() < start) { size_t tried = 0; - expired = activeExpireCycleExpire(g_pserver->db[dbid],*itrExpire,start,tried); + expired = activeExpireCycleExpire(g_pserver->db[dbid],itrDB.key(),itrDB->expire,start,tried); } } @@ -516,7 +610,7 @@ void expireSlaveKeys(void) { * corresponding bit in the new bitmap we set as value. * At the end of the loop if the bitmap is zero, it means we * no longer need to keep track of this key. */ - if (itrExpire != db->setexpire()->end() && !expired) { + if (itrDB != db->end() && itrDB->FExpires() && !expired) { noexpire++; new_dbids |= (uint64_t)1 << dbid; } @@ -694,7 +788,6 @@ void ttlGenericCommand(client *c, int output_ms) { /* The key exists. Return -1 if it has no expire, or the actual * TTL value otherwise. */ - std::unique_lock ul(g_expireLock); expireEntry *pexpire = c->db->getExpire(c->argv[1]); if (c->argc == 2) { @@ -784,18 +877,11 @@ expireEntryFat::~expireEntryFat() } expireEntryFat::expireEntryFat(const expireEntryFat &e) - : m_keyPrimary(e.m_keyPrimary), m_vecexpireEntries(e.m_vecexpireEntries) + : m_vecexpireEntries(e.m_vecexpireEntries) { // Note: dictExpires is not copied } -expireEntryFat::expireEntryFat(expireEntryFat &&e) - : m_keyPrimary(std::move(e.m_keyPrimary)), m_vecexpireEntries(std::move(e.m_vecexpireEntries)) -{ - m_dictIndex = e.m_dictIndex; - e.m_dictIndex = nullptr; -} - void expireEntryFat::createIndex() { serverAssert(m_dictIndex == nullptr); diff --git a/src/expire.h b/src/expire.h index d028a9671..2211fb160 100644 --- a/src/expire.h +++ b/src/expire.h @@ -22,9 +22,11 @@ class expireEntryFat {} subexpireEntry(const subexpireEntry &other) - : spsubkey((const char*)sdsdupshared(other.spsubkey.get()), sdsfree) + : spsubkey(nullptr, sdsfree) { when = other.when; + if (other.spsubkey != nullptr) + spsubkey = std::unique_ptr((const char*)sdsdupshared(other.spsubkey.get()), sdsfree); } subexpireEntry(subexpireEntry &&) = default; @@ -41,27 +43,30 @@ class expireEntryFat }; private: - sdsimmutablestring m_keyPrimary; std::vector m_vecexpireEntries; // Note a NULL for the sds portion means the expire is for the primary key dict *m_dictIndex = nullptr; + long long m_whenPrimary = LLONG_MAX; void createIndex(); public: - expireEntryFat(const sdsimmutablestring &keyPrimary) - : m_keyPrimary(keyPrimary) - {} + expireEntryFat() = default; + expireEntryFat(const expireEntryFat &); ~expireEntryFat(); - expireEntryFat(const expireEntryFat &e); - expireEntryFat(expireEntryFat &&e); - long long when() const noexcept { return m_vecexpireEntries.front().when; } - const char *key() const noexcept { return static_cast(m_keyPrimary); } bool operator<(long long when) const noexcept { return this->when() < when; } void expireSubKey(const char *szSubkey, long long when); + bool FGetPrimaryExpire(long long *pwhen) const { + if (m_whenPrimary != LLONG_MAX) { + *pwhen = m_whenPrimary; + return true; + } + return false; + } + bool FEmpty() const noexcept { return m_vecexpireEntries.empty(); } const subexpireEntry &nextExpireEntry() const noexcept { return m_vecexpireEntries.front(); } void popfrontExpireEntry(); @@ -70,19 +75,11 @@ class expireEntryFat }; class expireEntry { - struct - { - sdsimmutablestring m_key; - expireEntryFat *m_pfatentry = nullptr; - } u; - long long m_when; // bit wise and with FFatMask means this is a fat entry and we should use the pointer - - /* Mask to check if an entry is Fat, most significant bit of m_when being set means it is Fat otherwise it is not */ - long long FFatMask() const noexcept { - return (1LL) << (sizeof(long long)*CHAR_BIT - 1); - } - - expireEntry() = default; + struct { + uint64_t m_whenAndPtrUnion : 63, + fFat : 1; + } s; + static_assert(sizeof(expireEntryFat*) <= sizeof(int64_t), "The pointer must fit in the union"); public: class iter { @@ -118,93 +115,104 @@ class expireEntry { const iter &operator*() const { return *this; } }; - expireEntry(sds key, const char *subkey, long long when) + expireEntry() + { + s.fFat = 0; + s.m_whenAndPtrUnion = 0; + } + + expireEntry(const char *subkey, long long when) { if (subkey != nullptr) { - m_when = FFatMask() | INVALID_EXPIRE; - u.m_pfatentry = new (MALLOC_LOCAL) expireEntryFat(sdsimmutablestring(sdsdupshared(key))); - u.m_pfatentry->expireSubKey(subkey, when); + auto pfatentry = new (MALLOC_LOCAL) expireEntryFat(); + pfatentry->expireSubKey(subkey, when); + s.m_whenAndPtrUnion = reinterpret_cast(pfatentry); + s.fFat = true; } else { - u.m_key = sdsimmutablestring(sdsdupshared(key)); - m_when = when; + s.m_whenAndPtrUnion = when; + s.fFat = false; } } - expireEntry(const expireEntry &e) + expireEntry(expireEntryFat *pfatentry) { - *this = e; + assert(pfatentry != nullptr); + s.m_whenAndPtrUnion = reinterpret_cast(pfatentry); + s.fFat = true; + } + + expireEntry(const expireEntry &e) { + if (e.FFat()) { + s.m_whenAndPtrUnion = reinterpret_cast(new expireEntryFat(*e.pfatentry())); + s.fFat = true; + } else { + s = e.s; + } } + expireEntry(expireEntry &&e) { - u.m_key = std::move(e.u.m_key); - u.m_pfatentry = std::move(e.u.m_pfatentry); - m_when = e.m_when; - e.m_when = 0; - e.u.m_pfatentry = nullptr; + s = e.s; } - expireEntry(expireEntryFat *pfatentry) + expireEntry &operator=(expireEntry &&e) { - u.m_pfatentry = pfatentry; - m_when = FFatMask() | INVALID_EXPIRE; - for (auto itr : *this) - { - if (itr.subkey() == nullptr) - { - m_when = FFatMask() | itr.when(); - break; - } + s = e.s; + e.s.m_whenAndPtrUnion = 0; + e.s.fFat = false; + return *this; + } + + expireEntry &operator=(expireEntry &e) { + if (e.FFat()) { + s.m_whenAndPtrUnion = reinterpret_cast(new expireEntryFat(*e.pfatentry())); + s.fFat = true; + } else { + s = e.s; } + return *this; } // Duplicate the expire, note this is intended to be passed directly to setExpire expireEntry duplicate() const { expireEntry dst; - dst.m_when = m_when; if (FFat()) { - dst.u.m_pfatentry = new expireEntryFat(*u.m_pfatentry); + auto pfatentry = new expireEntryFat(*expireEntry::pfatentry()); + dst.s.m_whenAndPtrUnion = reinterpret_cast(pfatentry); + dst.s.fFat = true; } else { - dst.u.m_key = u.m_key; + dst.s.m_whenAndPtrUnion = s.m_whenAndPtrUnion; + dst.s.fFat = false; } return dst; } - ~expireEntry() - { + void reset() { if (FFat()) - delete u.m_pfatentry; + delete pfatentry(); + s.fFat = false; + s.m_whenAndPtrUnion = 0; } - expireEntry &operator=(const expireEntry &e) - { - u.m_key = e.u.m_key; - m_when = e.m_when; - if (e.FFat()) - u.m_pfatentry = new (MALLOC_LOCAL) expireEntryFat(*e.u.m_pfatentry); - return *this; - } - - void setKeyUnsafe(sds key) + ~expireEntry() { if (FFat()) - u.m_pfatentry->m_keyPrimary = sdsimmutablestring(sdsdupshared(key)); - else - u.m_key = sdsimmutablestring(sdsdupshared(key)); + delete pfatentry(); } - inline bool FFat() const noexcept { return m_when & FFatMask(); } - expireEntryFat *pfatentry() { assert(FFat()); return u.m_pfatentry; } - const expireEntryFat *pfatentry() const { assert(FFat()); return u.m_pfatentry; } - - - bool operator==(const sdsview &key) const noexcept - { - return key == this->key(); + inline bool FFat() const noexcept { return s.fFat; } + expireEntryFat *pfatentry() { + assert(FFat()); + return reinterpret_cast(s.m_whenAndPtrUnion); + } + const expireEntryFat *pfatentry() const { + return const_cast(this)->pfatentry(); } + bool operator<(const expireEntry &e) const noexcept { return when() < e.when(); @@ -214,17 +222,11 @@ class expireEntry { return this->when() < when; } - const char *key() const noexcept - { - if (FFat()) - return u.m_pfatentry->key(); - return static_cast(u.m_key); - } long long when() const noexcept { if (FFat()) - return u.m_pfatentry->when(); - return FGetPrimaryExpire(); + return pfatentry()->when(); + return s.m_whenAndPtrUnion; } void update(const char *subkey, long long when) @@ -233,30 +235,27 @@ class expireEntry { { if (subkey == nullptr) { - m_when = when; + s.m_whenAndPtrUnion = when; return; } else { // we have to upgrade to a fat entry - long long whenT = m_when; - sdsimmutablestring keyPrimary = u.m_key; - m_when |= FFatMask(); - u.m_pfatentry = new (MALLOC_LOCAL) expireEntryFat(keyPrimary); - u.m_pfatentry->expireSubKey(nullptr, whenT); + auto pfatentry = new (MALLOC_LOCAL) expireEntryFat(); + pfatentry->expireSubKey(nullptr, s.m_whenAndPtrUnion); + s.m_whenAndPtrUnion = reinterpret_cast(pfatentry); + s.fFat = true; // at this point we're fat so fall through } } - if (subkey == nullptr) - m_when = when | FFatMask(); - u.m_pfatentry->expireSubKey(subkey, when); + pfatentry()->expireSubKey(subkey, when); } iter begin() const { return iter(this, 0); } iter end() const { if (FFat()) - return iter(this, u.m_pfatentry->size()); + return iter(this, pfatentry()->size()); return iter(this, 1); } @@ -268,26 +267,39 @@ class expireEntry { pfatentry()->m_vecexpireEntries.begin() + itr.m_idx); } - size_t size() const - { + size_t size() const { if (FFat()) - return u.m_pfatentry->size(); + return pfatentry()->size(); return 1; } - long long FGetPrimaryExpire() const noexcept + bool FGetPrimaryExpire(long long *pwhen) const noexcept { - return m_when & (~FFatMask()); + if (FFat()) { + return pfatentry()->FGetPrimaryExpire(pwhen); + } else { + *pwhen = s.m_whenAndPtrUnion; + return true; + } } - bool FGetPrimaryExpire(long long *pwhen) const noexcept - { - *pwhen = FGetPrimaryExpire(); - return *pwhen != INVALID_EXPIRE; + void *release_as_void() { + uint64_t whenT = s.m_whenAndPtrUnion; + whenT |= static_cast(s.fFat) << 63; + s.m_whenAndPtrUnion = 0; + s.fFat = 0; + return reinterpret_cast(whenT); + } + + static expireEntry *from_void(void **src) { + uintptr_t llV = reinterpret_cast(src); + return reinterpret_cast(llV); + } + static const expireEntry *from_void(void *const*src) { + uintptr_t llV = reinterpret_cast(src); + return reinterpret_cast(llV); } - explicit operator sdsview() const noexcept { return key(); } explicit operator long long() const noexcept { return when(); } }; -typedef semiorderedset expireset; -extern fastlock g_expireLock; \ No newline at end of file +static_assert(sizeof(expireEntry) == sizeof(long long), "This must fit in a long long so it can be put in a dictEntry"); diff --git a/src/lazyfree.cpp b/src/lazyfree.cpp index fe76b2f4a..90c2e26cb 100644 --- a/src/lazyfree.cpp +++ b/src/lazyfree.cpp @@ -20,11 +20,9 @@ void lazyfreeFreeObject(void *args[]) { * when the database was logically deleted. */ void lazyfreeFreeDatabase(void *args[]) { dict *ht1 = (dict *) args[0]; - expireset *setexpire = (expireset *) args[1]; size_t numkeys = dictSize(ht1); dictRelease(ht1); - delete setexpire; atomicDecr(lazyfree_objects,numkeys); atomicIncr(lazyfreed_objects,numkeys); } @@ -217,17 +215,15 @@ void freeObjAsync(robj *key, robj *obj) { * create a new empty set of hash tables and scheduling the old ones for * lazy freeing. */ void redisDbPersistentData::emptyDbAsync() { - std::unique_lock ul(g_expireLock); dict *oldht1 = m_pdict; - auto *set = m_setexpire; - m_setexpire = new (MALLOC_LOCAL) expireset(); m_pdict = dictCreate(&dbDictType,this); if (m_spstorage != nullptr) m_spstorage->clearAsync(); if (m_fTrackingChanges) m_fAllChanged = true; atomicIncr(lazyfree_objects,dictSize(oldht1)); - bioCreateLazyFreeJob(lazyfreeFreeDatabase,2,oldht1,set); + m_numexpires = 0; + bioCreateLazyFreeJob(lazyfreeFreeDatabase,2,oldht1,nullptr); } /* Release the radix tree mapping Redis Cluster keys to slots asynchronously. */ diff --git a/src/module.cpp b/src/module.cpp index 7bd32aebb..82b52bafa 100644 --- a/src/module.cpp +++ b/src/module.cpp @@ -2442,11 +2442,10 @@ int RM_UnlinkKey(RedisModuleKey *key) { * If no TTL is associated with the key or if the key is empty, * REDISMODULE_NO_EXPIRE is returned. */ mstime_t RM_GetExpire(RedisModuleKey *key) { - std::unique_lock ul(g_expireLock); - expireEntry *pexpire = key->db->getExpire(key->key); + auto itr = key->db->find(key->key); mstime_t expire = INVALID_EXPIRE; - if (pexpire != nullptr) - pexpire->FGetPrimaryExpire(&expire); + if (itr->FExpires()) + itr->expire.FGetPrimaryExpire(&expire); if (expire == INVALID_EXPIRE || key->value == NULL) return REDISMODULE_NO_EXPIRE; diff --git a/src/object.cpp b/src/object.cpp index d3122dbbd..82a492b51 100644 --- a/src/object.cpp +++ b/src/object.cpp @@ -1141,10 +1141,7 @@ struct redisMemOverhead *getMemoryOverheadData(void) { mh->db[mh->num_dbs].overhead_ht_main = mem; mem_total+=mem; - std::unique_lock ul(g_expireLock); - mem = db->setexpire()->estimated_bytes_used(); - mh->db[mh->num_dbs].overhead_ht_expires = mem; - mem_total+=mem; + mh->db[mh->num_dbs].overhead_ht_expires = 0; mh->num_dbs++; } @@ -1628,7 +1625,7 @@ robj *deserializeStoredStringObject(const char *data, size_t cb) return newObject; } -robj *deserializeStoredObjectCore(const void *data, size_t cb) +robj *deserializeStoredObject(const void *data, size_t cb) { switch (((char*)data)[0]) { @@ -1665,14 +1662,6 @@ robj *deserializeStoredObjectCore(const void *data, size_t cb) serverPanic("Unknown object type loading from storage"); } -robj *deserializeStoredObject(const redisDbPersistentData *db, const char *key, const void *data, size_t cb) -{ - robj *o = deserializeStoredObjectCore(data, cb); - std::unique_lock ul(g_expireLock); - o->SetFExpires(db->setexpire()->exists(key)); - return o; -} - sds serializeStoredObject(robj_roptr o, sds sdsPrefix) { switch (o->type) diff --git a/src/rdb.cpp b/src/rdb.cpp index 3f84a6b23..5c3c77174 100644 --- a/src/rdb.cpp +++ b/src/rdb.cpp @@ -1237,18 +1237,14 @@ int rdbSaveInfoAuxFields(rio *rdb, int rdbflags, rdbSaveInfo *rsi) { return 1; } -int saveKey(rio *rdb, const redisDbPersistentDataSnapshot *db, int flags, size_t *processed, const char *keystr, robj_roptr o) +int saveKey(rio *rdb, int flags, size_t *processed, const char *keystr, robj_roptr o) { redisObjectStack key; initStaticStringObject(key,(char*)keystr); - std::unique_lock ul(g_expireLock, std::defer_lock); const expireEntry *pexpire = nullptr; - if (o->FExpires()) - { - ul.lock(); - pexpire = db->getExpire(&key); - serverAssert((o->FExpires() && pexpire != nullptr) || (!o->FExpires() && pexpire == nullptr)); + if (o->FExpires()) { + pexpire = &o->expire; } if (rdbSaveKeyValuePair(rdb,&key,o,pexpire) == -1) @@ -1355,7 +1351,7 @@ int rdbSaveRio(rio *rdb, const redisDbPersistentDataSnapshot **rgpdb, int *error if (o->FExpires()) ++ckeysExpired; - if (!saveKey(rdb, db, rdbflags, &processed, keystr, o)) + if (!saveKey(rdb, rdbflags, &processed, keystr, o)) return false; /* Update child info every 1 second (approximately). diff --git a/src/server.cpp b/src/server.cpp index 8ab67ad40..65d14da43 100644 --- a/src/server.cpp +++ b/src/server.cpp @@ -1518,6 +1518,16 @@ dictType dbDictType = { dictGCAsyncFree /* async free destructor */ }; +dictType dbExpiresDictType = { + dictSdsHash, /* hash function */ + NULL, /* key dup */ + NULL, /* val dup */ + dictSdsKeyCompare, /* key compare */ + NULL, /* key destructor */ + NULL, /* val destructor */ + dictExpandAllowed /* allow to expand */ + }; + /* db->pdict, keys are sds strings, vals are Redis objects. */ dictType dbTombstoneDictType = { dictSdsHash, /* hash function */ @@ -1550,17 +1560,6 @@ dictType shaScriptObjectDictType = { NULL /* allow to expand */ }; -/* Db->expires */ -dictType dbExpiresDictType = { - dictSdsHash, /* hash function */ - NULL, /* key dup */ - NULL, /* val dup */ - dictSdsKeyCompare, /* key compare */ - NULL, /* key destructor */ - NULL, /* val destructor */ - dictExpandAllowed /* allow to expand */ -}; - /* Command table. sds string -> command struct pointer. */ dictType commandTableDictType = { dictSdsCaseHash, /* hash function */ diff --git a/src/server.h b/src/server.h index 9ad1aab8d..cd612626e 100644 --- a/src/server.h +++ b/src/server.h @@ -978,6 +978,7 @@ typedef struct redisObject { private: mutable std::atomic refcount {0}; public: + expireEntry expire; void *m_ptr; inline bool FExpires() const { return refcount.load(std::memory_order_relaxed) >> 31; } @@ -988,7 +989,7 @@ typedef struct redisObject { void addref() const { refcount.fetch_add(1, std::memory_order_relaxed); } unsigned release() const { return refcount.fetch_sub(1, std::memory_order_seq_cst) & ~(1U << 31); } } robj; -static_assert(sizeof(redisObject) <= 16, "object size is critical, don't increase"); +static_assert(sizeof(redisObject) <= 24, "object size is critical, don't increase"); class redisObjectStack : public redisObjectExtended, public redisObject { @@ -1144,16 +1145,20 @@ class redisDbPersistentData dict_iter random(); - const expireEntry &random_expire() + const expireEntry *random_expire(sds *key) { - return m_setexpire->random_value(); + auto itr = random(); + if (itr->FExpires()) { + *key = itr.key(); + return &itr->expire; + } + return nullptr; } dict_iter end() { return dict_iter(nullptr, nullptr); } dict_const_iter end() const { return dict_const_iter(nullptr); } void getStats(char *buf, size_t bufsize) { dictGetStats(buf, bufsize, m_pdict); } - void getExpireStats(char *buf, size_t bufsize) { m_setexpire->getstats(buf, bufsize); } bool insert(char *k, robj *o, bool fAssumeNew = false, dict_iter *existing = nullptr); void tryResize(); @@ -1161,16 +1166,15 @@ class redisDbPersistentData void updateValue(dict_iter itr, robj *val); bool syncDelete(robj *key); bool asyncDelete(robj *key); - size_t expireSize() const { return m_setexpire->size(); } + size_t expireSize() const { return m_numexpires; } int removeExpire(robj *key, dict_iter itr); int removeSubkeyExpire(robj *key, robj *subkey); - void resortExpire(expireEntry &e); void clear(void(callback)(void*)); void emptyDbAsync(); // Note: If you do not need the obj then use the objless iterator version. It's faster bool iterate(std::function fn); void setExpire(robj *key, robj *subkey, long long when); - void setExpire(expireEntry &&e); + void setExpire(const char *key, expireEntry &&e); void initialize(); void prepOverwriteForSnapshot(char *key); @@ -1194,9 +1198,6 @@ class redisDbPersistentData // objects stored elsewhere dict *dictUnsafeKeyOnly() { return m_pdict; } - expireset *setexpireUnsafe() { return m_setexpire; } - const expireset *setexpire() const { return m_setexpire; } - const redisDbPersistentDataSnapshot *createSnapshot(uint64_t mvccCheckpoint, bool fOptional); void endSnapshot(const redisDbPersistentDataSnapshot *psnapshot); void endSnapshotAsync(const redisDbPersistentDataSnapshot *psnapshot); @@ -1218,6 +1219,8 @@ class redisDbPersistentData dict_iter find_cached_threadsafe(const char *key) const; + static void activeExpireCycleCore(int type); + protected: uint64_t m_mvccCheckpoint = 0; @@ -1240,7 +1243,7 @@ class redisDbPersistentData std::shared_ptr m_spstorage = nullptr; // Expire - expireset *m_setexpire = nullptr; + size_t m_numexpires = 0; // These two pointers are the same, UNLESS the database has been cleared. // in which case m_pdbSnapshot is NULL and we continue as though we weren' @@ -1310,7 +1313,7 @@ struct redisDb : public redisDbPersistentDataSnapshot friend int removeExpire(redisDb *db, robj *key); friend void setExpire(struct client *c, redisDb *db, robj *key, robj *subkey, long long when); friend void setExpire(client *c, redisDb *db, robj *key, expireEntry &&e); - friend int evictionPoolPopulate(int dbid, redisDb *db, expireset *setexpire, struct evictionPoolEntry *pool); + friend int evictionPoolPopulate(int dbid, redisDb *db, bool fVolatile, struct evictionPoolEntry *pool); friend void activeDefragCycle(void); friend void activeExpireCycle(int); friend void expireSlaveKeys(void); @@ -1319,9 +1322,7 @@ struct redisDb : public redisDbPersistentDataSnapshot typedef ::dict_const_iter const_iter; typedef ::dict_iter iter; - redisDb() - : expireitr(nullptr) - {} + redisDb() = default; void initialize(int id); void storageProviderInitialize(); @@ -1343,7 +1344,6 @@ struct redisDb : public redisDbPersistentDataSnapshot using redisDbPersistentData::random_expire; using redisDbPersistentData::end; using redisDbPersistentData::getStats; - using redisDbPersistentData::getExpireStats; using redisDbPersistentData::insert; using redisDbPersistentData::tryResize; using redisDbPersistentData::incrementallyRehash; @@ -1361,15 +1361,12 @@ struct redisDb : public redisDbPersistentDataSnapshot using redisDbPersistentData::processChanges; using redisDbPersistentData::processChangesAsync; using redisDbPersistentData::commitChanges; - using redisDbPersistentData::setexpireUnsafe; - using redisDbPersistentData::setexpire; using redisDbPersistentData::endSnapshot; using redisDbPersistentData::restoreSnapshot; using redisDbPersistentData::removeAllCachedValues; using redisDbPersistentData::disableKeyCache; using redisDbPersistentData::keycacheIsEnabled; using redisDbPersistentData::dictUnsafeKeyOnly; - using redisDbPersistentData::resortExpire; using redisDbPersistentData::prefetchKeysAsync; using redisDbPersistentData::prepOverwriteForSnapshot; using redisDbPersistentData::FRehashing; @@ -1386,7 +1383,7 @@ struct redisDb : public redisDbPersistentDataSnapshot return psnapshot; } - expireset::setiter expireitr; + unsigned long expires_cursor = 0; dict *blocking_keys; /* Keys with clients waiting for data (BLPOP)*/ dict *ready_keys; /* Blocked keys that received a PUSH */ dict *watched_keys; /* WATCHED keys for MULTI/EXEC CAS */ @@ -2247,7 +2244,6 @@ struct redisServerConst { int maxidletime; /* Client timeout in seconds */ int tcpkeepalive; /* Set SO_KEEPALIVE if non-zero. */ int active_expire_enabled; /* Can be disabled for testing purposes. */ - int active_expire_effort; /* From 1 (default) to 10, active effort. */ int active_defrag_enabled; int jemalloc_bg_thread; /* Enable jemalloc background thread */ size_t active_defrag_ignore_bytes; /* minimum amount of fragmentation waste to start active defrag */ @@ -2357,6 +2353,7 @@ struct redisServer { unsigned int loading_process_events_interval_keys; int active_expire_enabled; /* Can be disabled for testing purposes. */ + int active_expire_effort; /* From 1 (default) to 10, active effort. */ int replicaIsolationFactor = 1; @@ -3180,8 +3177,8 @@ int equalStringObjects(robj *a, robj *b); unsigned long long estimateObjectIdleTime(robj_roptr o); void trimStringObjectIfNeeded(robj *o); -robj *deserializeStoredObject(const redisDbPersistentData *db, const char *key, const void *data, size_t cb); -std::unique_ptr deserializeExpire(sds key, const char *str, size_t cch, size_t *poffset); +robj *deserializeStoredObject(const void *data, size_t cb); +std::unique_ptr deserializeExpire(const char *str, size_t cch, size_t *poffset); sds serializeStoredObject(robj_roptr o, sds sdsPrefix = nullptr); #define sdsEncodedObject(objptr) (objptr->encoding == OBJ_ENCODING_RAW || objptr->encoding == OBJ_ENCODING_EMBSTR) diff --git a/src/snapshot.cpp b/src/snapshot.cpp index dca1071d4..4f158803d 100644 --- a/src/snapshot.cpp +++ b/src/snapshot.cpp @@ -74,6 +74,7 @@ const redisDbPersistentDataSnapshot *redisDbPersistentData::createSnapshot(uint6 spdb->m_fTrackingChanges = 0; spdb->m_pdict = m_pdict; spdb->m_pdictTombstone = m_pdictTombstone; + spdb->m_numexpires = m_numexpires; // Add a fake iterator so the dicts don't rehash (they need to be read only) dictPauseRehashing(spdb->m_pdict); dictForceRehash(spdb->m_pdictTombstone); // prevent rehashing by finishing the rehash now @@ -83,12 +84,6 @@ const redisDbPersistentDataSnapshot *redisDbPersistentData::createSnapshot(uint6 spdb->m_pdbSnapshot = m_pdbSnapshot; spdb->m_refCount = 1; spdb->m_mvccCheckpoint = getMvccTstamp(); - if (m_setexpire != nullptr) - { - std::unique_lock ul(g_expireLock); - spdb->m_setexpire = new (MALLOC_LOCAL) expireset(*m_setexpire); - spdb->m_setexpire->pause_rehash(); // needs to be const - } if (dictIsRehashing(spdb->m_pdict) || dictIsRehashing(spdb->m_pdictTombstone)) { serverLog(LL_VERBOSE, "NOTICE: Suboptimal snapshot"); @@ -171,11 +166,6 @@ void redisDbPersistentData::restoreSnapshot(const redisDbPersistentDataSnapshot size_t expectedSize = psnapshot->size(); dictEmpty(m_pdict, nullptr); dictEmpty(m_pdictTombstone, nullptr); - { - std::unique_lock ul(g_expireLock); - delete m_setexpire; - m_setexpire = new (MALLOC_LOCAL) expireset(*psnapshot->m_setexpire); - } endSnapshot(psnapshot); serverAssert(size() == expectedSize); } @@ -597,8 +587,8 @@ bool redisDbPersistentDataSnapshot::iterate_threadsafe_core(std::function(data)+offset, cbData-offset); + deserializeExpire((const char*)data, cbData, &offset); + o = deserializeStoredObject(reinterpret_cast(data)+offset, cbData-offset); } fContinue = fn(sdsKey, o); if (o != nullptr) diff --git a/tests/unit/cron.tcl b/tests/unit/cron.tcl index fcbe90301..69cb61e1e 100644 --- a/tests/unit/cron.tcl +++ b/tests/unit/cron.tcl @@ -18,7 +18,7 @@ start_server {tags {"CRON"} overrides {hz 100} } { test {keydb.cron repeat works} { r flushall - r keydb.cron testjob repeat 0 600 {redis.call("incr","testkey")} + r keydb.cron testjob repeat 0 900 {redis.call("incr","testkey")} after 1000 assert_equal 2 [r get testkey] }