Skip to content

Commit

Permalink
WDT reconnect logic and block retry logic improvement
Browse files Browse the repository at this point in the history
Summary: WDT used to retry a block only certain number of times. Now it does not give up
on an individual block. But, if certain number of reconnects are made without
any progress, that thread ends.

Reviewed By: @ldemailly

Differential Revision: D2508644

fb-gh-sync-id: 1a0e06d6fae37e643cfea2897fa895240346630a
  • Loading branch information
uddipta authored and ldemailly committed Oct 6, 2015
1 parent 68d0512 commit 3efdc29
Show file tree
Hide file tree
Showing 13 changed files with 173 additions and 47 deletions.
5 changes: 4 additions & 1 deletion CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@ cmake_minimum_required(VERSION 3.2)
# There is no C per se in WDT but if you use CXX only here many checks fail
# Version is Major.Minor.YYMMDDX for up to 10 releases per day
# Minor currently is also the protocol version - has to match with Protocol.cpp
project("WDT" LANGUAGES C CXX VERSION 1.21.1510020)
project("WDT" LANGUAGES C CXX VERSION 1.21.1510050)

# On MacOS this requires the latest (master) CMake (and/or CMake 3.1.1/3.2)
set(CMAKE_CXX_STANDARD 11)
Expand Down Expand Up @@ -302,4 +302,7 @@ if (BUILD_TESTING)
add_test(NAME WdtOverwriteTest COMMAND
"${CMAKE_CURRENT_SOURCE_DIR}/wdt_overwrite_test.py")

add_test(NAME WdtBadServerTest COMMAND
"${CMAKE_CURRENT_SOURCE_DIR}/wdt_bad_server_test.py")

endif(BUILD_TESTING)
11 changes: 2 additions & 9 deletions DirectorySourceQueue.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -396,15 +396,8 @@ void DirectorySourceQueue::returnToQueue(
int returnedCount = 0;
std::unique_lock<std::mutex> lock(mutex_);
for (auto &source : sources) {
const int64_t retries = source->getTransferStats().getFailedAttempts();
if (retries >= options_.max_transfer_retries) {
LOG(ERROR) << source->getIdentifier() << " failed after " << retries
<< " number of tries.";
failedSourceStats_.emplace_back(std::move(source->getTransferStats()));
} else {
sourceQueue_.push(std::move(source));
returnedCount++;
}
sourceQueue_.push(std::move(source));
returnedCount++;
WDT_CHECK_GT(numBlocksDequeued_, 0);
numBlocksDequeued_--;
}
Expand Down
4 changes: 3 additions & 1 deletion ErrorCodes.h
Original file line number Diff line number Diff line change
Expand Up @@ -49,7 +49,9 @@ namespace wdt {
X(URI_PARSE_ERROR) /** Wdt uri passed couldn't be parsed */ \
X(INCONSISTENT_DIRECTORY) /** Destination directory is not consistent with \
transfer log */ \
X(INVALID_LOG) /** Transfer log invalid */
X(INVALID_LOG) /** Transfer log invalid */ \
X(INVALID_CHECKPOINT) /** Received checkpoint is invalid */ \
X(NO_PROGRESS) /** Transfer has not progressed */

enum ErrorCode {
#define X(A) A,
Expand Down
8 changes: 8 additions & 0 deletions Protocol.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,14 @@ int Protocol::negotiateProtocol(int requestedProtocolVersion,
return std::min<int>(curProtocolVersion, requestedProtocolVersion);
}

std::ostream &operator<<(std::ostream &os, const Checkpoint &checkpoint) {
os << "num-blocks: " << checkpoint.numBlocks
<< " seq-id: " << checkpoint.lastBlockSeqId
<< " block-offset: " << checkpoint.lastBlockOffset
<< " received-bytes: " << checkpoint.lastBlockReceivedBytes;
return os;
}

void FileChunksInfo::addChunk(const Interval &chunk) {
chunks_.emplace_back(chunk);
}
Expand Down
9 changes: 8 additions & 1 deletion Protocol.h
Original file line number Diff line number Diff line change
Expand Up @@ -23,10 +23,15 @@ namespace wdt {
/// blocks and number of bytes received for the last block
struct Checkpoint {
int32_t port{0};
/// number of complete blocks received
int64_t numBlocks{0};
int64_t lastBlockReceivedBytes{0};
/// Next three fields are only set if a block is received partially
/// seq-id of the partially received block
int64_t lastBlockSeqId{-1};
/// block offset of the partially received block
int64_t lastBlockOffset{0};
/// number of bytes received for the partially received block
int64_t lastBlockReceivedBytes{0};
bool hasSeqId{false};
Checkpoint() {
}
Expand All @@ -53,6 +58,8 @@ struct Checkpoint {
}
};

std::ostream &operator<<(std::ostream &os, const Checkpoint &checkpoint);

/// structure representing a single chunk of a file
struct Interval {
/// start offset
Expand Down
96 changes: 78 additions & 18 deletions Sender.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -58,7 +58,7 @@ bool ThreadTransferHistory::addSource(std::unique_ptr<ByteSource> &source) {
return true;
}

int64_t ThreadTransferHistory::setCheckpointAndReturnToQueue(
ErrorCode ThreadTransferHistory::setCheckpointAndReturnToQueue(
const Checkpoint &checkpoint, bool globalCheckpoint) {
folly::SpinLockGuard guard(lock_);
const int64_t historySize = history_.size();
Expand All @@ -68,25 +68,23 @@ int64_t ThreadTransferHistory::setCheckpointAndReturnToQueue(
LOG(ERROR)
<< "checkpoint is greater than total number of sources transfered "
<< history_.size() << " " << numReceivedSources;
return -1;
return INVALID_CHECKPOINT;
}
if (numReceivedSources < numAcknowledged_) {
LOG(ERROR) << "new checkpoint is less than older checkpoint "
<< numAcknowledged_ << " " << numReceivedSources;
return -1;
ErrorCode errCode = validateCheckpoint(checkpoint, globalCheckpoint);
if (errCode == INVALID_CHECKPOINT) {
return INVALID_CHECKPOINT;
}
globalCheckpoint_ |= globalCheckpoint;
lastCheckpoint_ = folly::make_unique<Checkpoint>(checkpoint);
int64_t numFailedSources = historySize - numReceivedSources;
if (numFailedSources == 0 && lastBlockReceivedBytes > 0) {
if (globalCheckpoint) {
lastCheckpoint_ = folly::make_unique<Checkpoint>(checkpoint);
} else {
if (!globalCheckpoint) {
// no block to apply checkpoint offset. This can happen if we receive same
// local checkpoint without adding anything to the history
LOG(WARNING) << "Local checkpoint has received bytes for last block, but "
"there are no unacked blocks in the history. Ignoring.";
}
}
globalCheckpoint_ |= globalCheckpoint;
numAcknowledged_ = numReceivedSources;
std::vector<std::unique_ptr<ByteSource>> sourcesToReturn;
for (int64_t i = 0; i < numFailedSources; i++) {
Expand All @@ -98,7 +96,10 @@ int64_t ThreadTransferHistory::setCheckpointAndReturnToQueue(
sourcesToReturn.emplace_back(std::move(source));
}
queue_.returnToQueue(sourcesToReturn);
return numFailedSources;
LOG(INFO) << numFailedSources
<< " number of sources returned to queue, checkpoint: "
<< checkpoint;
return errCode;
}

std::vector<TransferStats> ThreadTransferHistory::popAckedSourceStats() {
Expand All @@ -118,10 +119,54 @@ void ThreadTransferHistory::markAllAcknowledged() {
numAcknowledged_ = history_.size();
}

int64_t ThreadTransferHistory::returnUnackedSourcesToQueue() {
void ThreadTransferHistory::returnUnackedSourcesToQueue() {
Checkpoint checkpoint;
checkpoint.numBlocks = numAcknowledged_;
return setCheckpointAndReturnToQueue(checkpoint, false);
setCheckpointAndReturnToQueue(checkpoint, false);
}

ErrorCode ThreadTransferHistory::validateCheckpoint(
const Checkpoint &checkpoint, bool globalCheckpoint) {
if (lastCheckpoint_ == nullptr) {
return OK;
}
if (checkpoint.numBlocks < lastCheckpoint_->numBlocks) {
LOG(ERROR) << "Current checkpoint must be higher than previous checkpoint, "
"Last checkpoint: "
<< *lastCheckpoint_ << ", Current checkpoint: " << checkpoint;
return INVALID_CHECKPOINT;
}
if (checkpoint.numBlocks > lastCheckpoint_->numBlocks) {
return OK;
}
bool noProgress = false;
// numBlocks same
if (checkpoint.lastBlockSeqId == lastCheckpoint_->lastBlockSeqId &&
checkpoint.lastBlockOffset == lastCheckpoint_->lastBlockOffset) {
// same block
if (checkpoint.lastBlockReceivedBytes !=
lastCheckpoint_->lastBlockReceivedBytes) {
LOG(ERROR) << "Current checkpoint has different received bytes, but all "
"other fields are same, Last checkpoint "
<< *lastCheckpoint_ << ", Current checkpoint: " << checkpoint;
return INVALID_CHECKPOINT;
}
noProgress = true;
} else {
// different block
WDT_CHECK(checkpoint.lastBlockReceivedBytes >= 0);
if (checkpoint.lastBlockReceivedBytes == 0) {
noProgress = true;
}
}
if (noProgress && !globalCheckpoint) {
// we can get same global checkpoint multiple times, so no need to check for
// progress
LOG(WARNING) << "No progress since last checkpoint, Last checkpoint: "
<< *lastCheckpoint_ << ", Current checkpoint: " << checkpoint;
return NO_PROGRESS;
}
return OK;
}

void ThreadTransferHistory::markSourceAsFailed(
Expand Down Expand Up @@ -544,10 +589,19 @@ Sender::SenderState Sender::connect(ThreadData &data) {
int port = ports_[data.threadIndex_];
TransferStats &threadStats = data.threadStats_;
auto &socket = data.socket_;
auto &numReconnectWithoutProgress = data.numReconnectWithoutProgress_;
auto &options = WdtOptions::get();

if (socket) {
socket->close();
}
if (numReconnectWithoutProgress >= options.max_transfer_retries) {
LOG(ERROR) << "Sender thread reconnected " << numReconnectWithoutProgress
<< " times without making any progress, giving up. port: "
<< socket->getPort();
threadStats.setErrorCode(NO_PROGRESS);
return END;
}

ErrorCode code;
socket = connectToReceiver(port, code);
Expand Down Expand Up @@ -577,16 +631,18 @@ Sender::SenderState Sender::readLocalCheckPoint(ThreadData &data) {
int port = ports_[data.threadIndex_];
TransferStats &threadStats = data.threadStats_;
ThreadTransferHistory &transferHistory = data.getTransferHistory();
auto &numReconnectWithoutProgress = data.numReconnectWithoutProgress_;

std::vector<Checkpoint> checkpoints;
int64_t decodeOffset = 0;
char *buf = data.buf_;
int checkpointLen = Protocol::getMaxLocalCheckpointLength(protocolVersion_);
int64_t numRead = data.socket_->read(buf, checkpointLen);
if (numRead != checkpointLen) {
VLOG(1) << "read mismatch " << checkpointLen << " " << numRead << " port "
<< port;
LOG(ERROR) << "read mismatch during reading local checkpoint "
<< checkpointLen << " " << numRead << " port " << port;
threadStats.setErrorCode(SOCKET_READ_ERROR);
numReconnectWithoutProgress++;
return CONNECT;
}
if (!Protocol::decodeCheckpoints(protocolVersion_, buf, decodeOffset,
Expand Down Expand Up @@ -614,13 +670,17 @@ Sender::SenderState Sender::readLocalCheckPoint(ThreadData &data) {
return READ_RECEIVER_CMD;
}

auto numReturned =
ErrorCode errCode =
transferHistory.setCheckpointAndReturnToQueue(checkpoint, false);
if (numReturned == -1) {
if (errCode == INVALID_CHECKPOINT) {
threadStats.setErrorCode(PROTOCOL_ERROR);
return END;
}
VLOG(1) << numReturned << " number of source(s) returned to queue";
if (errCode == NO_PROGRESS) {
numReconnectWithoutProgress++;
} else {
numReconnectWithoutProgress = 0;
}
return SEND_SETTINGS;
}

Expand Down
21 changes: 10 additions & 11 deletions Sender.h
Original file line number Diff line number Diff line change
Expand Up @@ -57,11 +57,10 @@ class ThreadTransferHistory {
*
* @param checkpoint checkpoint received
* @param globalCheckpoint global or local checkpoint
* @return number of sources returned to queue, -1 in
* case of error
* @return number of sources returned to queue
*/
int64_t setCheckpointAndReturnToQueue(const Checkpoint &checkpoint,
bool globalCheckpoint);
ErrorCode setCheckpointAndReturnToQueue(const Checkpoint &checkpoint,
bool globalCheckpoint);

/**
* @return stats for acked sources, must be called after all the
Expand All @@ -72,13 +71,8 @@ class ThreadTransferHistory {
/// marks all the sources as acked
void markAllAcknowledged();

/**
* returns all unacked sources to the queue
*
*
* @return number of sources returned to queue, -1 in case of error
*/
int64_t returnUnackedSourcesToQueue();
/// returns all unacked sources to the queue
void returnUnackedSourcesToQueue();

/**
* @return number of sources acked by the receiver
Expand All @@ -88,6 +82,9 @@ class ThreadTransferHistory {
}

private:
ErrorCode validateCheckpoint(const Checkpoint &checkpoint,
bool globalCheckpoint);

void markSourceAsFailed(std::unique_ptr<ByteSource> &source,
const Checkpoint *checkpoint);

Expand Down Expand Up @@ -274,6 +271,8 @@ class Sender : public WdtBase {
char buf_[Protocol::kMinBufLength];
/// whether total file size has been sent to the receiver
bool totalSizeSent_{false};
/// number of consecutive reconnects without any progress
int numReconnectWithoutProgress_{0};
ThreadData(int threadIndex, TransferStats &threadStats,
std::vector<ThreadTransferHistory> &transferHistories)
: threadIndex_(threadIndex),
Expand Down
4 changes: 2 additions & 2 deletions WdtConfig.h
Original file line number Diff line number Diff line change
Expand Up @@ -8,9 +8,9 @@

#define WDT_VERSION_MAJOR 1
#define WDT_VERSION_MINOR 21
#define WDT_VERSION_BUILD 1510020
#define WDT_VERSION_BUILD 1510050
// Add -fbcode to version str
#define WDT_VERSION_STR "1.21.1510020-fbcode"
#define WDT_VERSION_STR "1.21.1510050-fbcode"
// Tie minor and proto version
#define WDT_PROTOCOL_VERSION WDT_VERSION_MINOR

Expand Down
4 changes: 3 additions & 1 deletion WdtFlags.cpp.inc
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,9 @@ WDT_OPT(skip_writes, bool, "Skip writes on the receiver side");
WDT_OPT(backlog, int32, "Accept backlog");
WDT_OPT(buffer_size, int32, "Buffer size (per thread/socket)");
WDT_OPT(max_retries, int32, "how many attempts to connect/listen");
WDT_OPT(max_transfer_retries, int32, "Max number of retries for a source");
WDT_OPT(max_transfer_retries, int32,
"Maximum number of times sender thread reconnects without making any "
"progress");
WDT_OPT(sleep_millis, int32, "how many ms to wait between attempts");
WDT_OPT(block_size_mbytes, double,
"Size of the blocks that files will be divided in, specify negative "
Expand Down
3 changes: 2 additions & 1 deletion WdtOptions.h
Original file line number Diff line number Diff line change
Expand Up @@ -148,7 +148,8 @@ class WdtOptions {
std::string prune_dir_regex{""};

/**
* Maximum number of retries for transferring a file
* Maximum number of times sender thread reconnects without making any
* progress
*/
int max_transfer_retries{3};

Expand Down
51 changes: 51 additions & 0 deletions wdt_bad_server_test.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,51 @@
#! /usr/bin/env python

import socket
from time import time
from common_utils import *

def start_server():
s = socket.socket(socket.AF_INET6, socket.SOCK_STREAM)
s.bind(("", 0))
s.listen(5)
return s

def main():
s = start_server()
port = s.getsockname()[1]
print("Started server at port {0}".format(port))

read_timeout = 500
abort_check_interval = 100
retries = 3
# start a wdt sender
sender_cmd = ("_bin/wdt/wdt -directory wdt/ -ipv6 -start_port={0} "
"-num_ports=1 -destination=localhost "
"-max_transfer_retries={1} -read_timeout_millis={2} "
"-abort_after_seconds=5 "
"-abort_check_interval_millis={3}").format(
port,
retries,
read_timeout,
abort_check_interval)
print(sender_cmd)
start_time = time()
os.system(sender_cmd)
end_time = time()
s.close()
duration_millis = (end_time - start_time) * 1000
# have to count first read_receiver_cmd
num_retries = retries + 1
expected_duration = (read_timeout + abort_check_interval) * num_retries
# adding extra 500 millis delay because delay introduced during testing
expected_duration += 500
print("Transfer duration {0} millis, expected duration {1} millis").format(
duration_millis,
expected_duration)
if duration_millis > expected_duration:
exit(1)
else:
exit(0)

if __name__ == "__main__":
main()
Loading

0 comments on commit 3efdc29

Please sign in to comment.