From 3efdc290c095050a5eb2002497dd4ce203a24d70 Mon Sep 17 00:00:00 2001 From: Uddipta Maity Date: Mon, 5 Oct 2015 18:16:26 -0700 Subject: [PATCH] WDT reconnect logic and block retry logic improvement Summary: WDT used to retry a block only certain number of times. Now it does not give up on an individual block. But, if certain number of reconnects are made without any progress, that thread ends. Reviewed By: @ldemailly Differential Revision: D2508644 fb-gh-sync-id: 1a0e06d6fae37e643cfea2897fa895240346630a --- CMakeLists.txt | 5 +- DirectorySourceQueue.cpp | 11 +--- ErrorCodes.h | 4 +- Protocol.cpp | 8 +++ Protocol.h | 9 +++- Sender.cpp | 96 ++++++++++++++++++++++++++------- Sender.h | 21 ++++---- WdtConfig.h | 4 +- WdtFlags.cpp.inc | 4 +- WdtOptions.h | 3 +- wdt_bad_server_test.py | 51 ++++++++++++++++++ wdt_download_resumption_test.sh | 2 +- wdt_network_test.sh | 2 +- 13 files changed, 173 insertions(+), 47 deletions(-) create mode 100755 wdt_bad_server_test.py diff --git a/CMakeLists.txt b/CMakeLists.txt index 6c26ad48..0d8e2224 100644 --- a/CMakeLists.txt +++ b/CMakeLists.txt @@ -22,7 +22,7 @@ cmake_minimum_required(VERSION 3.2) # There is no C per se in WDT but if you use CXX only here many checks fail # Version is Major.Minor.YYMMDDX for up to 10 releases per day # Minor currently is also the protocol version - has to match with Protocol.cpp -project("WDT" LANGUAGES C CXX VERSION 1.21.1510020) +project("WDT" LANGUAGES C CXX VERSION 1.21.1510050) # On MacOS this requires the latest (master) CMake (and/or CMake 3.1.1/3.2) set(CMAKE_CXX_STANDARD 11) @@ -302,4 +302,7 @@ if (BUILD_TESTING) add_test(NAME WdtOverwriteTest COMMAND "${CMAKE_CURRENT_SOURCE_DIR}/wdt_overwrite_test.py") + add_test(NAME WdtBadServerTest COMMAND + "${CMAKE_CURRENT_SOURCE_DIR}/wdt_bad_server_test.py") + endif(BUILD_TESTING) diff --git a/DirectorySourceQueue.cpp b/DirectorySourceQueue.cpp index a6bc660b..f9e0542d 100644 --- a/DirectorySourceQueue.cpp +++ b/DirectorySourceQueue.cpp @@ -396,15 +396,8 @@ void DirectorySourceQueue::returnToQueue( int returnedCount = 0; std::unique_lock lock(mutex_); for (auto &source : sources) { - const int64_t retries = source->getTransferStats().getFailedAttempts(); - if (retries >= options_.max_transfer_retries) { - LOG(ERROR) << source->getIdentifier() << " failed after " << retries - << " number of tries."; - failedSourceStats_.emplace_back(std::move(source->getTransferStats())); - } else { - sourceQueue_.push(std::move(source)); - returnedCount++; - } + sourceQueue_.push(std::move(source)); + returnedCount++; WDT_CHECK_GT(numBlocksDequeued_, 0); numBlocksDequeued_--; } diff --git a/ErrorCodes.h b/ErrorCodes.h index 99e9346a..1eef6654 100644 --- a/ErrorCodes.h +++ b/ErrorCodes.h @@ -49,7 +49,9 @@ namespace wdt { X(URI_PARSE_ERROR) /** Wdt uri passed couldn't be parsed */ \ X(INCONSISTENT_DIRECTORY) /** Destination directory is not consistent with \ transfer log */ \ - X(INVALID_LOG) /** Transfer log invalid */ + X(INVALID_LOG) /** Transfer log invalid */ \ + X(INVALID_CHECKPOINT) /** Received checkpoint is invalid */ \ + X(NO_PROGRESS) /** Transfer has not progressed */ enum ErrorCode { #define X(A) A, diff --git a/Protocol.cpp b/Protocol.cpp index eae347c7..018e60b9 100644 --- a/Protocol.cpp +++ b/Protocol.cpp @@ -48,6 +48,14 @@ int Protocol::negotiateProtocol(int requestedProtocolVersion, return std::min(curProtocolVersion, requestedProtocolVersion); } +std::ostream &operator<<(std::ostream &os, const Checkpoint &checkpoint) { + os << "num-blocks: " << checkpoint.numBlocks + << " seq-id: " << checkpoint.lastBlockSeqId + << " block-offset: " << checkpoint.lastBlockOffset + << " received-bytes: " << checkpoint.lastBlockReceivedBytes; + return os; +} + void FileChunksInfo::addChunk(const Interval &chunk) { chunks_.emplace_back(chunk); } diff --git a/Protocol.h b/Protocol.h index 3bcb5def..34602307 100644 --- a/Protocol.h +++ b/Protocol.h @@ -23,10 +23,15 @@ namespace wdt { /// blocks and number of bytes received for the last block struct Checkpoint { int32_t port{0}; + /// number of complete blocks received int64_t numBlocks{0}; - int64_t lastBlockReceivedBytes{0}; + /// Next three fields are only set if a block is received partially + /// seq-id of the partially received block int64_t lastBlockSeqId{-1}; + /// block offset of the partially received block int64_t lastBlockOffset{0}; + /// number of bytes received for the partially received block + int64_t lastBlockReceivedBytes{0}; bool hasSeqId{false}; Checkpoint() { } @@ -53,6 +58,8 @@ struct Checkpoint { } }; +std::ostream &operator<<(std::ostream &os, const Checkpoint &checkpoint); + /// structure representing a single chunk of a file struct Interval { /// start offset diff --git a/Sender.cpp b/Sender.cpp index 0af32d74..d1364a0b 100644 --- a/Sender.cpp +++ b/Sender.cpp @@ -58,7 +58,7 @@ bool ThreadTransferHistory::addSource(std::unique_ptr &source) { return true; } -int64_t ThreadTransferHistory::setCheckpointAndReturnToQueue( +ErrorCode ThreadTransferHistory::setCheckpointAndReturnToQueue( const Checkpoint &checkpoint, bool globalCheckpoint) { folly::SpinLockGuard guard(lock_); const int64_t historySize = history_.size(); @@ -68,25 +68,23 @@ int64_t ThreadTransferHistory::setCheckpointAndReturnToQueue( LOG(ERROR) << "checkpoint is greater than total number of sources transfered " << history_.size() << " " << numReceivedSources; - return -1; + return INVALID_CHECKPOINT; } - if (numReceivedSources < numAcknowledged_) { - LOG(ERROR) << "new checkpoint is less than older checkpoint " - << numAcknowledged_ << " " << numReceivedSources; - return -1; + ErrorCode errCode = validateCheckpoint(checkpoint, globalCheckpoint); + if (errCode == INVALID_CHECKPOINT) { + return INVALID_CHECKPOINT; } + globalCheckpoint_ |= globalCheckpoint; + lastCheckpoint_ = folly::make_unique(checkpoint); int64_t numFailedSources = historySize - numReceivedSources; if (numFailedSources == 0 && lastBlockReceivedBytes > 0) { - if (globalCheckpoint) { - lastCheckpoint_ = folly::make_unique(checkpoint); - } else { + if (!globalCheckpoint) { // no block to apply checkpoint offset. This can happen if we receive same // local checkpoint without adding anything to the history LOG(WARNING) << "Local checkpoint has received bytes for last block, but " "there are no unacked blocks in the history. Ignoring."; } } - globalCheckpoint_ |= globalCheckpoint; numAcknowledged_ = numReceivedSources; std::vector> sourcesToReturn; for (int64_t i = 0; i < numFailedSources; i++) { @@ -98,7 +96,10 @@ int64_t ThreadTransferHistory::setCheckpointAndReturnToQueue( sourcesToReturn.emplace_back(std::move(source)); } queue_.returnToQueue(sourcesToReturn); - return numFailedSources; + LOG(INFO) << numFailedSources + << " number of sources returned to queue, checkpoint: " + << checkpoint; + return errCode; } std::vector ThreadTransferHistory::popAckedSourceStats() { @@ -118,10 +119,54 @@ void ThreadTransferHistory::markAllAcknowledged() { numAcknowledged_ = history_.size(); } -int64_t ThreadTransferHistory::returnUnackedSourcesToQueue() { +void ThreadTransferHistory::returnUnackedSourcesToQueue() { Checkpoint checkpoint; checkpoint.numBlocks = numAcknowledged_; - return setCheckpointAndReturnToQueue(checkpoint, false); + setCheckpointAndReturnToQueue(checkpoint, false); +} + +ErrorCode ThreadTransferHistory::validateCheckpoint( + const Checkpoint &checkpoint, bool globalCheckpoint) { + if (lastCheckpoint_ == nullptr) { + return OK; + } + if (checkpoint.numBlocks < lastCheckpoint_->numBlocks) { + LOG(ERROR) << "Current checkpoint must be higher than previous checkpoint, " + "Last checkpoint: " + << *lastCheckpoint_ << ", Current checkpoint: " << checkpoint; + return INVALID_CHECKPOINT; + } + if (checkpoint.numBlocks > lastCheckpoint_->numBlocks) { + return OK; + } + bool noProgress = false; + // numBlocks same + if (checkpoint.lastBlockSeqId == lastCheckpoint_->lastBlockSeqId && + checkpoint.lastBlockOffset == lastCheckpoint_->lastBlockOffset) { + // same block + if (checkpoint.lastBlockReceivedBytes != + lastCheckpoint_->lastBlockReceivedBytes) { + LOG(ERROR) << "Current checkpoint has different received bytes, but all " + "other fields are same, Last checkpoint " + << *lastCheckpoint_ << ", Current checkpoint: " << checkpoint; + return INVALID_CHECKPOINT; + } + noProgress = true; + } else { + // different block + WDT_CHECK(checkpoint.lastBlockReceivedBytes >= 0); + if (checkpoint.lastBlockReceivedBytes == 0) { + noProgress = true; + } + } + if (noProgress && !globalCheckpoint) { + // we can get same global checkpoint multiple times, so no need to check for + // progress + LOG(WARNING) << "No progress since last checkpoint, Last checkpoint: " + << *lastCheckpoint_ << ", Current checkpoint: " << checkpoint; + return NO_PROGRESS; + } + return OK; } void ThreadTransferHistory::markSourceAsFailed( @@ -544,10 +589,19 @@ Sender::SenderState Sender::connect(ThreadData &data) { int port = ports_[data.threadIndex_]; TransferStats &threadStats = data.threadStats_; auto &socket = data.socket_; + auto &numReconnectWithoutProgress = data.numReconnectWithoutProgress_; + auto &options = WdtOptions::get(); if (socket) { socket->close(); } + if (numReconnectWithoutProgress >= options.max_transfer_retries) { + LOG(ERROR) << "Sender thread reconnected " << numReconnectWithoutProgress + << " times without making any progress, giving up. port: " + << socket->getPort(); + threadStats.setErrorCode(NO_PROGRESS); + return END; + } ErrorCode code; socket = connectToReceiver(port, code); @@ -577,6 +631,7 @@ Sender::SenderState Sender::readLocalCheckPoint(ThreadData &data) { int port = ports_[data.threadIndex_]; TransferStats &threadStats = data.threadStats_; ThreadTransferHistory &transferHistory = data.getTransferHistory(); + auto &numReconnectWithoutProgress = data.numReconnectWithoutProgress_; std::vector checkpoints; int64_t decodeOffset = 0; @@ -584,9 +639,10 @@ Sender::SenderState Sender::readLocalCheckPoint(ThreadData &data) { int checkpointLen = Protocol::getMaxLocalCheckpointLength(protocolVersion_); int64_t numRead = data.socket_->read(buf, checkpointLen); if (numRead != checkpointLen) { - VLOG(1) << "read mismatch " << checkpointLen << " " << numRead << " port " - << port; + LOG(ERROR) << "read mismatch during reading local checkpoint " + << checkpointLen << " " << numRead << " port " << port; threadStats.setErrorCode(SOCKET_READ_ERROR); + numReconnectWithoutProgress++; return CONNECT; } if (!Protocol::decodeCheckpoints(protocolVersion_, buf, decodeOffset, @@ -614,13 +670,17 @@ Sender::SenderState Sender::readLocalCheckPoint(ThreadData &data) { return READ_RECEIVER_CMD; } - auto numReturned = + ErrorCode errCode = transferHistory.setCheckpointAndReturnToQueue(checkpoint, false); - if (numReturned == -1) { + if (errCode == INVALID_CHECKPOINT) { threadStats.setErrorCode(PROTOCOL_ERROR); return END; } - VLOG(1) << numReturned << " number of source(s) returned to queue"; + if (errCode == NO_PROGRESS) { + numReconnectWithoutProgress++; + } else { + numReconnectWithoutProgress = 0; + } return SEND_SETTINGS; } diff --git a/Sender.h b/Sender.h index dec386ca..73cb9c57 100644 --- a/Sender.h +++ b/Sender.h @@ -57,11 +57,10 @@ class ThreadTransferHistory { * * @param checkpoint checkpoint received * @param globalCheckpoint global or local checkpoint - * @return number of sources returned to queue, -1 in - * case of error + * @return number of sources returned to queue */ - int64_t setCheckpointAndReturnToQueue(const Checkpoint &checkpoint, - bool globalCheckpoint); + ErrorCode setCheckpointAndReturnToQueue(const Checkpoint &checkpoint, + bool globalCheckpoint); /** * @return stats for acked sources, must be called after all the @@ -72,13 +71,8 @@ class ThreadTransferHistory { /// marks all the sources as acked void markAllAcknowledged(); - /** - * returns all unacked sources to the queue - * - * - * @return number of sources returned to queue, -1 in case of error - */ - int64_t returnUnackedSourcesToQueue(); + /// returns all unacked sources to the queue + void returnUnackedSourcesToQueue(); /** * @return number of sources acked by the receiver @@ -88,6 +82,9 @@ class ThreadTransferHistory { } private: + ErrorCode validateCheckpoint(const Checkpoint &checkpoint, + bool globalCheckpoint); + void markSourceAsFailed(std::unique_ptr &source, const Checkpoint *checkpoint); @@ -274,6 +271,8 @@ class Sender : public WdtBase { char buf_[Protocol::kMinBufLength]; /// whether total file size has been sent to the receiver bool totalSizeSent_{false}; + /// number of consecutive reconnects without any progress + int numReconnectWithoutProgress_{0}; ThreadData(int threadIndex, TransferStats &threadStats, std::vector &transferHistories) : threadIndex_(threadIndex), diff --git a/WdtConfig.h b/WdtConfig.h index ea3a7d31..0b2c8ac0 100644 --- a/WdtConfig.h +++ b/WdtConfig.h @@ -8,9 +8,9 @@ #define WDT_VERSION_MAJOR 1 #define WDT_VERSION_MINOR 21 -#define WDT_VERSION_BUILD 1510020 +#define WDT_VERSION_BUILD 1510050 // Add -fbcode to version str -#define WDT_VERSION_STR "1.21.1510020-fbcode" +#define WDT_VERSION_STR "1.21.1510050-fbcode" // Tie minor and proto version #define WDT_PROTOCOL_VERSION WDT_VERSION_MINOR diff --git a/WdtFlags.cpp.inc b/WdtFlags.cpp.inc index a0c6b4ce..83b6cd28 100644 --- a/WdtFlags.cpp.inc +++ b/WdtFlags.cpp.inc @@ -31,7 +31,9 @@ WDT_OPT(skip_writes, bool, "Skip writes on the receiver side"); WDT_OPT(backlog, int32, "Accept backlog"); WDT_OPT(buffer_size, int32, "Buffer size (per thread/socket)"); WDT_OPT(max_retries, int32, "how many attempts to connect/listen"); -WDT_OPT(max_transfer_retries, int32, "Max number of retries for a source"); +WDT_OPT(max_transfer_retries, int32, + "Maximum number of times sender thread reconnects without making any " + "progress"); WDT_OPT(sleep_millis, int32, "how many ms to wait between attempts"); WDT_OPT(block_size_mbytes, double, "Size of the blocks that files will be divided in, specify negative " diff --git a/WdtOptions.h b/WdtOptions.h index 2ff2124f..115c8a5d 100644 --- a/WdtOptions.h +++ b/WdtOptions.h @@ -148,7 +148,8 @@ class WdtOptions { std::string prune_dir_regex{""}; /** - * Maximum number of retries for transferring a file + * Maximum number of times sender thread reconnects without making any + * progress */ int max_transfer_retries{3}; diff --git a/wdt_bad_server_test.py b/wdt_bad_server_test.py new file mode 100755 index 00000000..55c40717 --- /dev/null +++ b/wdt_bad_server_test.py @@ -0,0 +1,51 @@ +#! /usr/bin/env python + +import socket +from time import time +from common_utils import * + +def start_server(): + s = socket.socket(socket.AF_INET6, socket.SOCK_STREAM) + s.bind(("", 0)) + s.listen(5) + return s + +def main(): + s = start_server() + port = s.getsockname()[1] + print("Started server at port {0}".format(port)) + + read_timeout = 500 + abort_check_interval = 100 + retries = 3 + # start a wdt sender + sender_cmd = ("_bin/wdt/wdt -directory wdt/ -ipv6 -start_port={0} " + "-num_ports=1 -destination=localhost " + "-max_transfer_retries={1} -read_timeout_millis={2} " + "-abort_after_seconds=5 " + "-abort_check_interval_millis={3}").format( + port, + retries, + read_timeout, + abort_check_interval) + print(sender_cmd) + start_time = time() + os.system(sender_cmd) + end_time = time() + s.close() + duration_millis = (end_time - start_time) * 1000 + # have to count first read_receiver_cmd + num_retries = retries + 1 + expected_duration = (read_timeout + abort_check_interval) * num_retries + # adding extra 500 millis delay because delay introduced during testing + expected_duration += 500 + print("Transfer duration {0} millis, expected duration {1} millis").format( + duration_millis, + expected_duration) + if duration_millis > expected_duration: + exit(1) + else: + exit(0) + +if __name__ == "__main__": + main() diff --git a/wdt_download_resumption_test.sh b/wdt_download_resumption_test.sh index 4ed71164..24bf7989 100755 --- a/wdt_download_resumption_test.sh +++ b/wdt_download_resumption_test.sh @@ -109,7 +109,7 @@ WDTBIN_OPTS="-ipv6 -num_ports=$threads -full_reporting \ -enable_download_resumption -keep_transfer_log=false \ -treat_fewer_port_as_error \ -resume_using_dir_tree=$RESUME_USING_DIR_TREE -enable_perf_stat_collection \ --connect_timeout_millis 100 -max_transfer_retries=6" +-connect_timeout_millis 100" WDTBIN_CLIENT="$WDT_SENDER $WDTBIN_OPTS" WDTBIN_SERVER="$WDT_RECEIVER $WDTBIN_OPTS" diff --git a/wdt_network_test.sh b/wdt_network_test.sh index 49dcc998..d2e47a88 100755 --- a/wdt_network_test.sh +++ b/wdt_network_test.sh @@ -75,7 +75,7 @@ WDTBIN_OPTS="-enable_perf_stat_collection -ipv6 -start_port=$STARTING_PORT \ -avg_mbytes_per_sec=60 -max_mbytes_per_sec=65 -run_as_daemon=false \ -full_reporting -read_timeout_millis=495 -write_timeout_millis=495 \ -progress_report_interval_millis=-1 -abort_check_interval_millis=100 \ --max_transfer_retries=26 -treat_fewer_port_as_error \ +-treat_fewer_port_as_error \ -connect_timeout_millis 100 -transfer_id $$ -num_ports=$threads" WDTBIN_SERVER="$WDT_RECEIVER $WDTBIN_OPTS \ -protocol_version=$RECEIVER_PROTOCOL_VERSION"