diff --git a/CMakeLists.txt b/CMakeLists.txt index 6c26ad48..0d8e2224 100644 --- a/CMakeLists.txt +++ b/CMakeLists.txt @@ -22,7 +22,7 @@ cmake_minimum_required(VERSION 3.2) # There is no C per se in WDT but if you use CXX only here many checks fail # Version is Major.Minor.YYMMDDX for up to 10 releases per day # Minor currently is also the protocol version - has to match with Protocol.cpp -project("WDT" LANGUAGES C CXX VERSION 1.21.1510020) +project("WDT" LANGUAGES C CXX VERSION 1.21.1510050) # On MacOS this requires the latest (master) CMake (and/or CMake 3.1.1/3.2) set(CMAKE_CXX_STANDARD 11) @@ -302,4 +302,7 @@ if (BUILD_TESTING) add_test(NAME WdtOverwriteTest COMMAND "${CMAKE_CURRENT_SOURCE_DIR}/wdt_overwrite_test.py") + add_test(NAME WdtBadServerTest COMMAND + "${CMAKE_CURRENT_SOURCE_DIR}/wdt_bad_server_test.py") + endif(BUILD_TESTING) diff --git a/DirectorySourceQueue.cpp b/DirectorySourceQueue.cpp index a6bc660b..f9e0542d 100644 --- a/DirectorySourceQueue.cpp +++ b/DirectorySourceQueue.cpp @@ -396,15 +396,8 @@ void DirectorySourceQueue::returnToQueue( int returnedCount = 0; std::unique_lock lock(mutex_); for (auto &source : sources) { - const int64_t retries = source->getTransferStats().getFailedAttempts(); - if (retries >= options_.max_transfer_retries) { - LOG(ERROR) << source->getIdentifier() << " failed after " << retries - << " number of tries."; - failedSourceStats_.emplace_back(std::move(source->getTransferStats())); - } else { - sourceQueue_.push(std::move(source)); - returnedCount++; - } + sourceQueue_.push(std::move(source)); + returnedCount++; WDT_CHECK_GT(numBlocksDequeued_, 0); numBlocksDequeued_--; } diff --git a/ErrorCodes.h b/ErrorCodes.h index 99e9346a..1eef6654 100644 --- a/ErrorCodes.h +++ b/ErrorCodes.h @@ -49,7 +49,9 @@ namespace wdt { X(URI_PARSE_ERROR) /** Wdt uri passed couldn't be parsed */ \ X(INCONSISTENT_DIRECTORY) /** Destination directory is not consistent with \ transfer log */ \ - X(INVALID_LOG) /** Transfer log invalid */ + 
X(INVALID_LOG) /** Transfer log invalid */ \ + X(INVALID_CHECKPOINT) /** Received checkpoint is invalid */ \ + X(NO_PROGRESS) /** Transfer has not progressed */ enum ErrorCode { #define X(A) A, diff --git a/Protocol.cpp b/Protocol.cpp index eae347c7..018e60b9 100644 --- a/Protocol.cpp +++ b/Protocol.cpp @@ -48,6 +48,14 @@ int Protocol::negotiateProtocol(int requestedProtocolVersion, return std::min(curProtocolVersion, requestedProtocolVersion); } +std::ostream &operator<<(std::ostream &os, const Checkpoint &checkpoint) { + os << "num-blocks: " << checkpoint.numBlocks + << " seq-id: " << checkpoint.lastBlockSeqId + << " block-offset: " << checkpoint.lastBlockOffset + << " received-bytes: " << checkpoint.lastBlockReceivedBytes; + return os; +} + void FileChunksInfo::addChunk(const Interval &chunk) { chunks_.emplace_back(chunk); } diff --git a/Protocol.h b/Protocol.h index 3bcb5def..34602307 100644 --- a/Protocol.h +++ b/Protocol.h @@ -23,10 +23,15 @@ namespace wdt { /// blocks and number of bytes received for the last block struct Checkpoint { int32_t port{0}; + /// number of complete blocks received int64_t numBlocks{0}; - int64_t lastBlockReceivedBytes{0}; + /// Next three fields are only set if a block is received partially + /// seq-id of the partially received block int64_t lastBlockSeqId{-1}; + /// block offset of the partially received block int64_t lastBlockOffset{0}; + /// number of bytes received for the partially received block + int64_t lastBlockReceivedBytes{0}; bool hasSeqId{false}; Checkpoint() { } @@ -53,6 +58,8 @@ struct Checkpoint { } }; +std::ostream &operator<<(std::ostream &os, const Checkpoint &checkpoint); + /// structure representing a single chunk of a file struct Interval { /// start offset diff --git a/Sender.cpp b/Sender.cpp index 0af32d74..d1364a0b 100644 --- a/Sender.cpp +++ b/Sender.cpp @@ -58,7 +58,7 @@ bool ThreadTransferHistory::addSource(std::unique_ptr &source) { return true; } -int64_t 
ThreadTransferHistory::setCheckpointAndReturnToQueue( +ErrorCode ThreadTransferHistory::setCheckpointAndReturnToQueue( const Checkpoint &checkpoint, bool globalCheckpoint) { folly::SpinLockGuard guard(lock_); const int64_t historySize = history_.size(); @@ -68,25 +68,23 @@ int64_t ThreadTransferHistory::setCheckpointAndReturnToQueue( LOG(ERROR) << "checkpoint is greater than total number of sources transfered " << history_.size() << " " << numReceivedSources; - return -1; + return INVALID_CHECKPOINT; } - if (numReceivedSources < numAcknowledged_) { - LOG(ERROR) << "new checkpoint is less than older checkpoint " - << numAcknowledged_ << " " << numReceivedSources; - return -1; + ErrorCode errCode = validateCheckpoint(checkpoint, globalCheckpoint); + if (errCode == INVALID_CHECKPOINT) { + return INVALID_CHECKPOINT; } + globalCheckpoint_ |= globalCheckpoint; + lastCheckpoint_ = folly::make_unique(checkpoint); int64_t numFailedSources = historySize - numReceivedSources; if (numFailedSources == 0 && lastBlockReceivedBytes > 0) { - if (globalCheckpoint) { - lastCheckpoint_ = folly::make_unique(checkpoint); - } else { + if (!globalCheckpoint) { // no block to apply checkpoint offset. This can happen if we receive same // local checkpoint without adding anything to the history LOG(WARNING) << "Local checkpoint has received bytes for last block, but " "there are no unacked blocks in the history. 
Ignoring."; } } - globalCheckpoint_ |= globalCheckpoint; numAcknowledged_ = numReceivedSources; std::vector> sourcesToReturn; for (int64_t i = 0; i < numFailedSources; i++) { @@ -98,7 +96,10 @@ int64_t ThreadTransferHistory::setCheckpointAndReturnToQueue( sourcesToReturn.emplace_back(std::move(source)); } queue_.returnToQueue(sourcesToReturn); - return numFailedSources; + LOG(INFO) << numFailedSources + << " number of sources returned to queue, checkpoint: " + << checkpoint; + return errCode; } std::vector ThreadTransferHistory::popAckedSourceStats() { @@ -118,10 +119,54 @@ void ThreadTransferHistory::markAllAcknowledged() { numAcknowledged_ = history_.size(); } -int64_t ThreadTransferHistory::returnUnackedSourcesToQueue() { +void ThreadTransferHistory::returnUnackedSourcesToQueue() { Checkpoint checkpoint; checkpoint.numBlocks = numAcknowledged_; - return setCheckpointAndReturnToQueue(checkpoint, false); + setCheckpointAndReturnToQueue(checkpoint, false); +} + +ErrorCode ThreadTransferHistory::validateCheckpoint( + const Checkpoint &checkpoint, bool globalCheckpoint) { + if (lastCheckpoint_ == nullptr) { + return OK; + } + if (checkpoint.numBlocks < lastCheckpoint_->numBlocks) { + LOG(ERROR) << "Current checkpoint must be higher than previous checkpoint, " + "Last checkpoint: " + << *lastCheckpoint_ << ", Current checkpoint: " << checkpoint; + return INVALID_CHECKPOINT; + } + if (checkpoint.numBlocks > lastCheckpoint_->numBlocks) { + return OK; + } + bool noProgress = false; + // numBlocks same + if (checkpoint.lastBlockSeqId == lastCheckpoint_->lastBlockSeqId && + checkpoint.lastBlockOffset == lastCheckpoint_->lastBlockOffset) { + // same block + if (checkpoint.lastBlockReceivedBytes != + lastCheckpoint_->lastBlockReceivedBytes) { + LOG(ERROR) << "Current checkpoint has different received bytes, but all " + "other fields are same, Last checkpoint " + << *lastCheckpoint_ << ", Current checkpoint: " << checkpoint; + return INVALID_CHECKPOINT; + } + noProgress = 
true; + } else { + // different block + WDT_CHECK(checkpoint.lastBlockReceivedBytes >= 0); + if (checkpoint.lastBlockReceivedBytes == 0) { + noProgress = true; + } + } + if (noProgress && !globalCheckpoint) { + // we can get same global checkpoint multiple times, so no need to check for + // progress + LOG(WARNING) << "No progress since last checkpoint, Last checkpoint: " + << *lastCheckpoint_ << ", Current checkpoint: " << checkpoint; + return NO_PROGRESS; + } + return OK; } void ThreadTransferHistory::markSourceAsFailed( @@ -544,10 +589,19 @@ Sender::SenderState Sender::connect(ThreadData &data) { int port = ports_[data.threadIndex_]; TransferStats &threadStats = data.threadStats_; auto &socket = data.socket_; + auto &numReconnectWithoutProgress = data.numReconnectWithoutProgress_; + auto &options = WdtOptions::get(); if (socket) { socket->close(); } + if (numReconnectWithoutProgress >= options.max_transfer_retries) { + LOG(ERROR) << "Sender thread reconnected " << numReconnectWithoutProgress + << " times without making any progress, giving up. 
port: " + << port; + threadStats.setErrorCode(NO_PROGRESS); + return END; + } ErrorCode code; socket = connectToReceiver(port, code); @@ -577,6 +631,7 @@ Sender::SenderState Sender::readLocalCheckPoint(ThreadData &data) { int port = ports_[data.threadIndex_]; TransferStats &threadStats = data.threadStats_; ThreadTransferHistory &transferHistory = data.getTransferHistory(); + auto &numReconnectWithoutProgress = data.numReconnectWithoutProgress_; std::vector checkpoints; int64_t decodeOffset = 0; @@ -584,9 +639,10 @@ Sender::SenderState Sender::readLocalCheckPoint(ThreadData &data) { int checkpointLen = Protocol::getMaxLocalCheckpointLength(protocolVersion_); int64_t numRead = data.socket_->read(buf, checkpointLen); if (numRead != checkpointLen) { - VLOG(1) << "read mismatch " << checkpointLen << " " << numRead << " port " - << port; + LOG(ERROR) << "read mismatch during reading local checkpoint " + << checkpointLen << " " << numRead << " port " << port; threadStats.setErrorCode(SOCKET_READ_ERROR); + numReconnectWithoutProgress++; return CONNECT; } if (!Protocol::decodeCheckpoints(protocolVersion_, buf, decodeOffset, @@ -614,13 +670,17 @@ Sender::SenderState Sender::readLocalCheckPoint(ThreadData &data) { return READ_RECEIVER_CMD; } - auto numReturned = + ErrorCode errCode = transferHistory.setCheckpointAndReturnToQueue(checkpoint, false); - if (numReturned == -1) { + if (errCode == INVALID_CHECKPOINT) { threadStats.setErrorCode(PROTOCOL_ERROR); return END; } - VLOG(1) << numReturned << " number of source(s) returned to queue"; + if (errCode == NO_PROGRESS) { + numReconnectWithoutProgress++; + } else { + numReconnectWithoutProgress = 0; + } return SEND_SETTINGS; } diff --git a/Sender.h b/Sender.h index dec386ca..73cb9c57 100644 --- a/Sender.h +++ b/Sender.h @@ -57,11 +57,10 @@ class ThreadTransferHistory { * * @param checkpoint checkpoint received * @param globalCheckpoint global or local checkpoint - * @return number of sources returned to queue, -1 in 
- * case of error + * @return OK, NO_PROGRESS or INVALID_CHECKPOINT (checkpoint rejected) */ - int64_t setCheckpointAndReturnToQueue(const Checkpoint &checkpoint, - bool globalCheckpoint); + ErrorCode setCheckpointAndReturnToQueue(const Checkpoint &checkpoint, + bool globalCheckpoint); /** * @return stats for acked sources, must be called after all the @@ -72,13 +71,8 @@ class ThreadTransferHistory { /// marks all the sources as acked void markAllAcknowledged(); - /** - * returns all unacked sources to the queue - * - * - * @return number of sources returned to queue, -1 in case of error - */ - int64_t returnUnackedSourcesToQueue(); + /// returns all unacked sources to the queue + void returnUnackedSourcesToQueue(); /** * @return number of sources acked by the receiver @@ -88,6 +82,9 @@ class ThreadTransferHistory { } private: + ErrorCode validateCheckpoint(const Checkpoint &checkpoint, + bool globalCheckpoint); + void markSourceAsFailed(std::unique_ptr &source, const Checkpoint *checkpoint); @@ -274,6 +271,8 @@ class Sender : public WdtBase { char buf_[Protocol::kMinBufLength]; /// whether total file size has been sent to the receiver bool totalSizeSent_{false}; + /// number of consecutive reconnects without any progress + int numReconnectWithoutProgress_{0}; ThreadData(int threadIndex, TransferStats &threadStats, std::vector &transferHistories) : threadIndex_(threadIndex), diff --git a/WdtConfig.h b/WdtConfig.h index ea3a7d31..0b2c8ac0 100644 --- a/WdtConfig.h +++ b/WdtConfig.h @@ -8,9 +8,9 @@ #define WDT_VERSION_MAJOR 1 #define WDT_VERSION_MINOR 21 -#define WDT_VERSION_BUILD 1510020 +#define WDT_VERSION_BUILD 1510050 // Add -fbcode to version str -#define WDT_VERSION_STR "1.21.1510020-fbcode" +#define WDT_VERSION_STR "1.21.1510050-fbcode" // Tie minor and proto version #define WDT_PROTOCOL_VERSION WDT_VERSION_MINOR diff --git a/WdtFlags.cpp.inc b/WdtFlags.cpp.inc index a0c6b4ce..83b6cd28 100644 --- a/WdtFlags.cpp.inc +++ b/WdtFlags.cpp.inc @@ -31,7 +31,9 @@ WDT_OPT(skip_writes, bool, 
"Skip writes on the receiver side"); WDT_OPT(backlog, int32, "Accept backlog"); WDT_OPT(buffer_size, int32, "Buffer size (per thread/socket)"); WDT_OPT(max_retries, int32, "how many attempts to connect/listen"); -WDT_OPT(max_transfer_retries, int32, "Max number of retries for a source"); +WDT_OPT(max_transfer_retries, int32, + "Maximum number of times sender thread reconnects without making any " + "progress"); WDT_OPT(sleep_millis, int32, "how many ms to wait between attempts"); WDT_OPT(block_size_mbytes, double, "Size of the blocks that files will be divided in, specify negative " diff --git a/WdtOptions.h b/WdtOptions.h index 2ff2124f..115c8a5d 100644 --- a/WdtOptions.h +++ b/WdtOptions.h @@ -148,7 +148,8 @@ class WdtOptions { std::string prune_dir_regex{""}; /** - * Maximum number of retries for transferring a file + * Maximum number of times sender thread reconnects without making any + * progress */ int max_transfer_retries{3}; diff --git a/wdt_bad_server_test.py b/wdt_bad_server_test.py new file mode 100755 index 00000000..55c40717 --- /dev/null +++ b/wdt_bad_server_test.py @@ -0,0 +1,51 @@ +#! 
/usr/bin/env python + +import socket +from time import time +from common_utils import * + +def start_server(): + s = socket.socket(socket.AF_INET6, socket.SOCK_STREAM) + s.bind(("", 0)) + s.listen(5) + return s + +def main(): + s = start_server() + port = s.getsockname()[1] + print("Started server at port {0}".format(port)) + + read_timeout = 500 + abort_check_interval = 100 + retries = 3 + # start a wdt sender + sender_cmd = ("_bin/wdt/wdt -directory wdt/ -ipv6 -start_port={0} " + "-num_ports=1 -destination=localhost " + "-max_transfer_retries={1} -read_timeout_millis={2} " + "-abort_after_seconds=5 " + "-abort_check_interval_millis={3}").format( + port, + retries, + read_timeout, + abort_check_interval) + print(sender_cmd) + start_time = time() + os.system(sender_cmd) + end_time = time() + s.close() + duration_millis = (end_time - start_time) * 1000 + # have to count first read_receiver_cmd + num_retries = retries + 1 + expected_duration = (read_timeout + abort_check_interval) * num_retries + # adding extra 500 millis delay because delay introduced during testing + expected_duration += 500 + print("Transfer duration {0} millis, expected duration {1} millis".format( + duration_millis, + expected_duration)) + if duration_millis > expected_duration: + exit(1) + else: + exit(0) + +if __name__ == "__main__": + main() diff --git a/wdt_download_resumption_test.sh b/wdt_download_resumption_test.sh index 4ed71164..24bf7989 100755 --- a/wdt_download_resumption_test.sh +++ b/wdt_download_resumption_test.sh @@ -109,7 +109,7 @@ WDTBIN_OPTS="-ipv6 -num_ports=$threads -full_reporting \ -enable_download_resumption -keep_transfer_log=false \ -treat_fewer_port_as_error \ -resume_using_dir_tree=$RESUME_USING_DIR_TREE -enable_perf_stat_collection \ --connect_timeout_millis 100 -max_transfer_retries=6" +-connect_timeout_millis 100" WDTBIN_CLIENT="$WDT_SENDER $WDTBIN_OPTS" WDTBIN_SERVER="$WDT_RECEIVER $WDTBIN_OPTS" diff --git a/wdt_network_test.sh b/wdt_network_test.sh index 
49dcc998..d2e47a88 100755 --- a/wdt_network_test.sh +++ b/wdt_network_test.sh @@ -75,7 +75,7 @@ WDTBIN_OPTS="-enable_perf_stat_collection -ipv6 -start_port=$STARTING_PORT \ -avg_mbytes_per_sec=60 -max_mbytes_per_sec=65 -run_as_daemon=false \ -full_reporting -read_timeout_millis=495 -write_timeout_millis=495 \ -progress_report_interval_millis=-1 -abort_check_interval_millis=100 \ --max_transfer_retries=26 -treat_fewer_port_as_error \ +-treat_fewer_port_as_error \ -connect_timeout_millis 100 -transfer_id $$ -num_ports=$threads" WDTBIN_SERVER="$WDT_RECEIVER $WDTBIN_OPTS \ -protocol_version=$RECEIVER_PROTOCOL_VERSION"