diff --git a/src/replication.cpp b/src/replication.cpp index d0c3d1b91..afdb62256 100644 --- a/src/replication.cpp +++ b/src/replication.cpp @@ -1158,10 +1158,23 @@ class replicationBuffer { } void putSlavesOnline() { + for (auto replica : replicas) { + std::unique_lock ul(replica->lock); + // If we put the replica online before the output is drained then it will get immediately closed + while (checkClientOutputBufferLimits(replica) + && (replica->flags.load(std::memory_order_relaxed) & CLIENT_CLOSE_ASAP) == 0) { + ul.unlock(); + usleep(1000); // give 1ms for the I/O before we poll again + ul.lock(); + } + } + + aeAcquireLock(); for (auto replica : replicas) { replica->replstate = SLAVE_STATE_FASTSYNC_DONE; replica->repl_put_online_on_ack = 1; } + aeReleaseLock(); } void abort() { @@ -1282,9 +1295,7 @@ int rdbSaveSnapshotForReplication(rdbSaveInfo *rsi) { auto usec = ustime() - timeStart; serverLog(LL_NOTICE, "Transferred %zuMB total (%zuMB data) in %.2f seconds. (%.2fGbit/s)", spreplBuf->cbWritten()/1024/1024, cbData/1024/1024, usec/1000000.0, (spreplBuf->cbWritten()*8.0)/(usec/1000000.0)/1000000000.0); if (retval == C_OK) { - aeAcquireLock(); replBuf.putSlavesOnline(); - aeReleaseLock(); } });