Merge branch 'main' into RELEASE_6_3
JohnSully committed May 20, 2022
2 parents aa2143f + 38310ec commit 91d0975
Showing 13 changed files with 144 additions and 47 deletions.
13 changes: 13 additions & 0 deletions COPYING
@@ -0,0 +1,13 @@
Copyright (c) 2006-2020, Salvatore Sanfilippo
Copyright (C) 2019-2021, John Sully
Copyright (C) 2020-2021, EQ Alpha Technology Ltd.
Copyright (C) 2022 Snap Inc.
All rights reserved.

Redistribution and use in source and binary forms, with or without modification, are permitted provided that the following conditions are met:

* Redistributions of source code must retain the above copyright notice, this list of conditions and the following disclaimer.
* Redistributions in binary form must reproduce the above copyright notice, this list of conditions and the following disclaimer in the documentation and/or other materials provided with the distribution.
* Neither the name of Redis nor the names of its contributors may be used to endorse or promote products derived from this software without specific prior written permission.

THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
2 changes: 1 addition & 1 deletion README.md
@@ -99,7 +99,7 @@ Avoid forwarding RREPLAY messages to other masters? WARNING: This setting is dan


```
scratch-file-path /path
db-s3-object /path/to/bucket
```
If you would like KeyDB to dump and load directly to AWS S3, this option specifies the bucket. Using this option together with the traditional RDB options will result in KeyDB backing up twice, to both locations. If both are specified, KeyDB will first attempt to load from the local dump file and, if that fails, load from S3. This requires the AWS CLI tools to be installed and configured; they are used under the hood to transfer the data.
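For illustration, a minimal keydb.conf sketch showing the S3 option alongside a traditional local RDB dump; the paths are placeholders, and `dir`/`dbfilename` are assumed to be the usual local-dump settings:

```
# Hypothetical example: keep a local RDB dump and also back up to S3.
# On startup KeyDB tries the local dump first, then falls back to S3.
dir /var/lib/keydb
dbfilename dump.rdb
db-s3-object /path/to/bucket
```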

14 changes: 14 additions & 0 deletions pkg/deb/master_changelog
@@ -1,3 +1,17 @@
keydb (6:6.3.0-1distribution_placeholder) codename_placeholder; urgency=medium

* This release open-sources KeyDB Enterprise features into the open source project, along with PSYNC for active replication
* Partial synchronization for active replication is introduced
* MVCC introduced into the codebase from KeyDB Enterprise
* Async commands added: GET, MGET. These will see performance improvements (opt-in via enable-async-commands; see below)
* KEYS and SCAN commands are no longer blocking calls
* Async rehash implemented for additional stability and performance
* IStorage interface added
* In-process (forkless) background saving to comply with the maxmemory setting
* See the v6.3.0 tagged release notes on GitHub for a detailed explanation of these changes

-- Ben Schermel <[email protected]> Wed, 11 May 2022 20:00:37 +0000
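A minimal keydb.conf sketch, assuming the usual yes/no boolean syntax: the new async GET/MGET path is gated behind enable-async-commands, whose default is flipped to off in the src/config.cpp hunk below, so it must be enabled explicitly.

```
# Hypothetical snippet: opt in to the async command path
enable-async-commands yes
```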

keydb (6:6.2.2-1distribution_placeholder) codename_placeholder; urgency=medium

* Acquire lock in module.cpp to fix module test break
2 changes: 1 addition & 1 deletion src/config.cpp
@@ -2782,7 +2782,7 @@ standardConfig configs[] = {
createBoolConfig("disable-thp", NULL, MODIFIABLE_CONFIG, g_pserver->disable_thp, 1, NULL, NULL),
createBoolConfig("cluster-allow-replica-migration", NULL, MODIFIABLE_CONFIG, g_pserver->cluster_allow_replica_migration, 1, NULL, NULL),
createBoolConfig("replica-announced", NULL, MODIFIABLE_CONFIG, g_pserver->replica_announced, 1, NULL, NULL),
createBoolConfig("enable-async-commands", NULL, MODIFIABLE_CONFIG, g_pserver->enable_async_commands, 1, NULL, NULL),
createBoolConfig("enable-async-commands", NULL, MODIFIABLE_CONFIG, g_pserver->enable_async_commands, 0, NULL, NULL),
createBoolConfig("multithread-load-enabled", NULL, MODIFIABLE_CONFIG, g_pserver->multithread_load_enabled, 0, NULL, NULL),
createBoolConfig("active-client-balancing", NULL, MODIFIABLE_CONFIG, g_pserver->active_client_balancing, 1, NULL, NULL),

10 changes: 8 additions & 2 deletions src/db.cpp
@@ -2669,6 +2669,11 @@ void redisDbPersistentData::prepOverwriteForSnapshot(char *key)
auto itr = m_pdbSnapshot->find_cached_threadsafe(key);
if (itr.key() != nullptr)
{
if (itr.val()->FExpires()) {
// Note: I'm sure we could handle this, but it's too risky at the moment.
// There are known bugs doing this with expires
return;
}
sds keyNew = sdsdupshared(itr.key());
if (dictAdd(m_pdictTombstone, keyNew, (void*)dictHashKey(m_pdict, key)) != DICT_OK)
sdsfree(keyNew);
@@ -3263,10 +3268,11 @@ bool redisDbPersistentData::prefetchKeysAsync(client *c, parsed_command &command
dictEntry **table;
__atomic_load(&c->db->m_pdict->ht[iht].table, &table, __ATOMIC_RELAXED);
if (table != nullptr) {
dictEntry *de = table[hT];
dictEntry *de;
__atomic_load(&table[hT], &de, __ATOMIC_ACQUIRE);
while (de != nullptr) {
_mm_prefetch(dictGetKey(de), _MM_HINT_T2);
de = de->next;
__atomic_load(&de->next, &de, __ATOMIC_ACQUIRE);
}
}
if (!dictIsRehashing(c->db->m_pdict))
32 changes: 29 additions & 3 deletions src/dict.cpp
@@ -128,6 +128,7 @@ int _dictInit(dict *d, dictType *type,
d->pauserehash = 0;
d->asyncdata = nullptr;
d->refcount = 1;
d->noshrink = false;
return DICT_OK;
}

@@ -204,15 +205,15 @@ int dictMerge(dict *dst, dict *src)

if (dictSize(dst) == 0)
{
std::swap(*dst, *src);
dict::swap(*dst, *src);
std::swap(dst->pauserehash, src->pauserehash);
return DICT_OK;
}

size_t expectedSize = dictSize(src) + dictSize(dst);
if (dictSize(src) > dictSize(dst) && src->asyncdata == nullptr && dst->asyncdata == nullptr)
{
std::swap(*dst, *src);
dict::swap(*dst, *src);
std::swap(dst->pauserehash, src->pauserehash);
}

@@ -402,7 +403,7 @@ int dictRehash(dict *d, int n) {

dictAsyncRehashCtl::dictAsyncRehashCtl(struct dict *d, dictAsyncRehashCtl *next) : dict(d), next(next) {
queue.reserve(c_targetQueueSize);
__atomic_fetch_add(&d->refcount, 1, __ATOMIC_RELEASE);
__atomic_fetch_add(&d->refcount, 1, __ATOMIC_ACQ_REL);
this->rehashIdxBase = d->rehashidx;
}

@@ -446,6 +447,9 @@ dictAsyncRehashCtl *dictRehashAsyncStart(dict *d, int buckets) {
}

void dictRehashAsync(dictAsyncRehashCtl *ctl) {
if (ctl->abondon.load(std::memory_order_acquire)) {
ctl->hashIdx = ctl->queue.size();
}
for (size_t idx = ctl->hashIdx; idx < ctl->queue.size(); ++idx) {
auto &wi = ctl->queue[idx];
wi.hash = dictHashKey(ctl->dict, dictGetKey(wi.de));
@@ -455,6 +459,9 @@ void dictRehashAsync(dictAsyncRehashCtl *ctl) {
}

bool dictRehashSomeAsync(dictAsyncRehashCtl *ctl, size_t hashes) {
if (ctl->abondon.load(std::memory_order_acquire)) {
ctl->hashIdx = ctl->queue.size();
}
size_t max = std::min(ctl->hashIdx + hashes, ctl->queue.size());
for (; ctl->hashIdx < max; ++ctl->hashIdx) {
auto &wi = ctl->queue[ctl->hashIdx];
@@ -465,6 +472,23 @@ bool dictRehashSomeAsync(dictAsyncRehashCtl *ctl, size_t hashes) {
return ctl->hashIdx < ctl->queue.size();
}


void discontinueAsyncRehash(dict *d) {
// We inform our async rehashers and the completion function that the results are to be
// abandoned. We keep the asyncdata linked in so that dictEntry objects are still added
// to the GC list. This is because we can't guarantee when the other threads will
// stop looking at them.
if (d->asyncdata != nullptr) {
auto adata = d->asyncdata;
while (adata != nullptr && !adata->abondon.load(std::memory_order_relaxed)) {
adata->abondon = true;
adata = adata->next;
}
if (dictIsRehashing(d))
d->rehashidx = 0;
}
}

void dictCompleteRehashAsync(dictAsyncRehashCtl *ctl, bool fFree) {
dict *d = ctl->dict;
assert(ctl->done);
@@ -786,6 +810,8 @@ int _dictClear(dict *d, dictht *ht, void(callback)(void *)) {
if (callback && (i & 65535) == 0) callback(d->privdata);

if ((he = ht->table[i]) == NULL) continue;
dictEntry *deNull = nullptr;
__atomic_store(&ht->table[i], &deNull, __ATOMIC_RELEASE);
while(he) {
nextHe = he->next;
if (d->asyncdata && (ssize_t)i < d->rehashidx) {
22 changes: 22 additions & 0 deletions src/dict.h
@@ -110,12 +110,16 @@ struct dictAsyncRehashCtl {
std::atomic<bool> abondon { false };

dictAsyncRehashCtl(struct dict *d, dictAsyncRehashCtl *next);
dictAsyncRehashCtl(const dictAsyncRehashCtl&) = delete;
dictAsyncRehashCtl(dictAsyncRehashCtl&&) = delete;
~dictAsyncRehashCtl();
};
#else
struct dictAsyncRehashCtl;
#endif

void discontinueAsyncRehash(dict *d);

typedef struct dict {
dictType *type;
void *privdata;
@@ -125,6 +129,24 @@ typedef struct dict {
dictAsyncRehashCtl *asyncdata;
int16_t pauserehash; /* If >0 rehashing is paused (<0 indicates coding error) */
uint8_t noshrink = false;

#ifdef __cplusplus
dict() = default;
dict(dict &) = delete; // No Copy Ctor

static void swap(dict& a, dict& b) {
discontinueAsyncRehash(&a);
discontinueAsyncRehash(&b);
std::swap(a.type, b.type);
std::swap(a.privdata, b.privdata);
std::swap(a.ht[0], b.ht[0]);
std::swap(a.ht[1], b.ht[1]);
std::swap(a.rehashidx, b.rehashidx);
// Never swap refcount - they are attached to the specific dict obj
std::swap(a.pauserehash, b.pauserehash);
std::swap(a.noshrink, b.noshrink);
}
#endif
} dict;

/* If safe is set to 1 this is a safe iterator, that means, you can call
31 changes: 17 additions & 14 deletions src/gc.h
@@ -2,6 +2,7 @@
#include <vector>
#include <assert.h>
#include <unordered_set>
#include <list>

struct ICollectable
{
@@ -45,14 +46,14 @@ class GarbageCollector
void shutdown()
{
std::unique_lock<fastlock> lock(m_lock);
m_vecepochs.clear();
m_listepochs.clear();
m_setepochOutstanding.clear();
}

bool empty() const
{
std::unique_lock<fastlock> lock(m_lock);
return m_vecepochs.empty();
return m_listepochs.empty();
}

void endEpoch(uint64_t epoch, bool fNoFree = false)
@@ -63,12 +64,12 @@
m_setepochOutstanding.erase(epoch);
if (fNoFree)
return;
std::vector<EpochHolder> vecclean;
std::list<EpochHolder> listclean;

// No outstanding epochs?
if (m_setepochOutstanding.empty())
{
vecclean = std::move(m_vecepochs); // Everything goes!
listclean = std::move(m_listepochs); // Everything goes!
}
else
{
@@ -77,18 +78,20 @@
return; // No available epochs to free

// Clean any epochs available (after the lock)
for (size_t iepoch = 0; iepoch < m_vecepochs.size(); ++iepoch)
for (auto itr = m_listepochs.begin(); itr != m_listepochs.end(); /* itr incremented in loop*/)
{
auto &e = m_vecepochs[iepoch];
auto &e = *itr;
auto itrNext = itr;
++itrNext;
if (e < minepoch)
{
vecclean.emplace_back(std::move(e));
m_vecepochs.erase(m_vecepochs.begin() + iepoch);
--iepoch;
listclean.emplace_back(std::move(e));
m_listepochs.erase(itr);
}
itr = itrNext;
}

assert(vecclean.empty() || fMinElement);
assert(listclean.empty() || fMinElement);
}

lock.unlock(); // don't hold it for the potentially long delete of vecclean
Expand All @@ -100,13 +103,13 @@ class GarbageCollector
serverAssert(m_setepochOutstanding.find(epoch) != m_setepochOutstanding.end());
serverAssert(sp->FWillFreeChildDebug() == false);

auto itr = std::find(m_vecepochs.begin(), m_vecepochs.end(), m_epochNext+1);
if (itr == m_vecepochs.end())
auto itr = std::find(m_listepochs.begin(), m_listepochs.end(), m_epochNext+1);
if (itr == m_listepochs.end())
{
EpochHolder e;
e.tstamp = m_epochNext+1;
e.m_vecObjs.push_back(std::move(sp));
m_vecepochs.emplace_back(std::move(e));
m_listepochs.emplace_back(std::move(e));
}
else
{
@@ -117,7 +120,7 @@
private:
mutable fastlock m_lock { "Garbage Collector"};

std::vector<EpochHolder> m_vecepochs;
std::list<EpochHolder> m_listepochs;
std::unordered_set<uint64_t> m_setepochOutstanding;
uint64_t m_epochNext = 0;
};
10 changes: 6 additions & 4 deletions src/networking.cpp
@@ -1810,6 +1810,8 @@ int writeToClient(client *c, int handler_installed) {
is a replica, so only attempt to do so if that's the case. */
if (c->flags & CLIENT_SLAVE && !(c->flags & CLIENT_MONITOR) && c->replstate == SLAVE_STATE_ONLINE) {
std::unique_lock<fastlock> repl_backlog_lock (g_pserver->repl_backlog_lock);
// Ensure all writes to the repl backlog are visible
std::atomic_thread_fence(std::memory_order_acquire);

while (clientHasPendingReplies(c)) {
long long repl_end_idx = getReplIndexFromOffset(c->repl_end_off);
@@ -2077,8 +2079,6 @@ int handleClientsWithPendingWrites(int iel, int aof_state) {
* that may trigger write error or recreate handler. */
if ((flags & CLIENT_PROTECTED) && !(flags & CLIENT_SLAVE)) continue;

//std::unique_lock<decltype(c->lock)> lock(c->lock);

/* Don't write to clients that are going to be closed anyway. */
if (c->flags & CLIENT_CLOSE_ASAP) continue;

@@ -2096,6 +2096,7 @@

/* If after the synchronous writes above we still have data to
* output to the client, we need to install the writable handler. */
std::unique_lock<decltype(c->lock)> lock(c->lock);
if (clientHasPendingReplies(c)) {
if (connSetWriteHandlerWithBarrier(c->conn, sendReplyToClient, ae_flags, true) == C_ERR) {
freeClientAsync(c);
@@ -2742,9 +2743,10 @@ void readQueryFromClient(connection *conn) {
parseClientCommandBuffer(c);
if (g_pserver->enable_async_commands && !serverTL->disable_async_commands && listLength(g_pserver->monitors) == 0 && (aeLockContention() || serverTL->rgdbSnapshot[c->db->id] || g_fTestMode)) {
// Frequent writers aren't good candidates for this optimization, they cause us to renew the snapshot too often
// so we exclude them unless the snapshot we need already exists
// so we exclude them unless the snapshot we need already exists.
// Note: In test mode we want to create snapshots as often as possible to exercise them - we don't care about perf
bool fSnapshotExists = c->db->mvccLastSnapshot >= c->mvccCheckpoint;
bool fWriteTooRecent = (((getMvccTstamp() - c->mvccCheckpoint) >> MVCC_MS_SHIFT) < static_cast<uint64_t>(g_pserver->snapshot_slip)/2);
bool fWriteTooRecent = !g_fTestMode && (((getMvccTstamp() - c->mvccCheckpoint) >> MVCC_MS_SHIFT) < static_cast<uint64_t>(g_pserver->snapshot_slip)/2);

// The check below avoids running async commands if this is a frequent writer unless a snapshot is already there to service it
if (!fWriteTooRecent || fSnapshotExists) {
18 changes: 15 additions & 3 deletions src/rdb.cpp
@@ -1657,13 +1657,18 @@ int launchRdbSaveThread(pthread_t &child, rdbSaveInfo *rsi)

g_pserver->rdbThreadVars.tmpfileNum++;
g_pserver->rdbThreadVars.fRdbThreadCancel = false;
if (pthread_create(&child, NULL, rdbSaveThread, args)) {
pthread_attr_t tattr;
pthread_attr_init(&tattr);
pthread_attr_setstacksize(&tattr, 1 << 23); // 8 MB
if (pthread_create(&child, &tattr, rdbSaveThread, args)) {
pthread_attr_destroy(&tattr);
for (int idb = 0; idb < cserver.dbnum; ++idb)
g_pserver->db[idb]->endSnapshot(args->rgpdb[idb]);
args->~rdbSaveThreadArgs();
zfree(args);
return C_ERR;
}
pthread_attr_destroy(&tattr);
g_pserver->child_type = CHILD_TYPE_RDB;
}
return C_OK;
@@ -3049,7 +3054,9 @@ void rdbLoadProgressCallback(rio *r, const void *buf, size_t len) {
(r->keys_since_last_callback >= g_pserver->loading_process_events_interval_keys)))
{
rdbAsyncWorkThread *pwthread = reinterpret_cast<rdbAsyncWorkThread*>(r->chksum_arg);
bool fUpdateReplication = (g_pserver->mstime - r->last_update) > 1000;
mstime_t mstime;
__atomic_load(&g_pserver->mstime, &mstime, __ATOMIC_RELAXED);
bool fUpdateReplication = (mstime - r->last_update) > 1000;

if (fUpdateReplication) {
listIter li;
@@ -3832,7 +3839,11 @@ int rdbSaveToSlavesSockets(rdbSaveInfo *rsi) {

g_pserver->rdbThreadVars.tmpfileNum++;
g_pserver->rdbThreadVars.fRdbThreadCancel = false;
if (pthread_create(&child, nullptr, rdbSaveToSlavesSocketsThread, args)) {
pthread_attr_t tattr;
pthread_attr_init(&tattr);
pthread_attr_setstacksize(&tattr, 1 << 23); // 8 MB
if (pthread_create(&child, &tattr, rdbSaveToSlavesSocketsThread, args)) {
pthread_attr_destroy(&tattr);
serverLog(LL_WARNING,"Can't save in background: fork: %s",
strerror(errno));

@@ -3858,6 +3869,7 @@
closeChildInfoPipe();
return C_ERR;
}
pthread_attr_destroy(&tattr);
g_pserver->child_type = CHILD_TYPE_RDB;

serverLog(LL_NOTICE,"Background RDB transfer started");