Skip to content

Commit

Permalink
Prevents objects from being deleted during transfer. (#1959)
Browse files Browse the repository at this point in the history
Fixes #1927

Signed-off-by: vegetableysm <[email protected]>
  • Loading branch information
vegetableysm authored Jul 29, 2024
1 parent d778848 commit 86808bb
Show file tree
Hide file tree
Showing 8 changed files with 245 additions and 26 deletions.
25 changes: 25 additions & 0 deletions src/client/rpc_client.cc
Original file line number Diff line number Diff line change
Expand Up @@ -747,6 +747,7 @@ Status RPCClient::GetRemoteBlob(const ObjectID& id, const bool unsafe,
std::vector<int> fd_sent;

std::string message_out;
RDMABlobScopeGuard rdmaBlobScopeGuard;
if (rdma_connected_) {
WriteGetRemoteBuffersRequest(std::set<ObjectID>{id}, unsafe, false, true,
message_out);
Expand All @@ -755,6 +756,12 @@ Status RPCClient::GetRemoteBlob(const ObjectID& id, const bool unsafe,
false, message_out);
}
RETURN_ON_ERROR(doWrite(message_out));
if (rdma_connected_) {
std::unordered_set<ObjectID> ids{payloads[0].object_id};
std::function<void(std::unordered_set<ObjectID>)> func = std::bind(
&RPCClient::doReleaseBlobsWithRDMARequest, this, std::placeholders::_1);
rdmaBlobScopeGuard.set(func, ids);
}
json message_in;
RETURN_ON_ERROR(doRead(message_in));
RETURN_ON_ERROR(ReadGetBuffersReply(message_in, payloads, fd_sent));
Expand Down Expand Up @@ -856,6 +863,7 @@ Status RPCClient::GetRemoteBlobs(
std::unordered_set<ObjectID> id_set(ids.begin(), ids.end());
std::vector<Payload> payloads;
std::vector<int> fd_sent;
RDMABlobScopeGuard rdmaBlobScopeGuard;

std::string message_out;
if (rdma_connected_) {
Expand All @@ -865,6 +873,11 @@ Status RPCClient::GetRemoteBlobs(
message_out);
}
RETURN_ON_ERROR(doWrite(message_out));
if (rdma_connected_) {
std::function<void(std::unordered_set<ObjectID>)> func = std::bind(
&RPCClient::doReleaseBlobsWithRDMARequest, this, std::placeholders::_1);
rdmaBlobScopeGuard.set(func, id_set);
}
json message_in;
RETURN_ON_ERROR(doRead(message_in));
RETURN_ON_ERROR(ReadGetBuffersReply(message_in, payloads, fd_sent));
Expand All @@ -886,6 +899,7 @@ Status RPCClient::GetRemoteBlobs(
TransferRemoteBlobWithRDMA(remote_blob->buffer_, payload, opt_func));
id_payload_map[payload.object_id] = remote_blob;
}

} else {
for (auto const& payload : payloads) {
auto remote_blob = std::shared_ptr<RemoteBlob>(new RemoteBlob(
Expand Down Expand Up @@ -994,4 +1008,15 @@ Status RPCClient::GetRemoteBlobs(
return Status::OK();
}

Status RPCClient::doReleaseBlobsWithRDMARequest(
std::unordered_set<ObjectID> id_set) {
std::string message_out;
WriteReleaseBlobsWithRDMARequest(id_set, message_out);
RETURN_ON_ERROR(doWrite(message_out));
json message_in;
RETURN_ON_ERROR(doRead(message_in));
RETURN_ON_ERROR(ReadReleaseBlobsWithRDMAReply(message_in));
return Status::OK();
}

} // namespace vineyard
27 changes: 27 additions & 0 deletions src/client/rpc_client.h
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ limitations under the License.
#include <memory>
#include <set>
#include <string>
#include <unordered_set>
#include <vector>

#include "client/client_base.h"
Expand Down Expand Up @@ -449,6 +450,32 @@ class RPCClient final : public ClientBase {
Status TransferRemoteBlobWithRDMA(std::shared_ptr<Buffer> buffer,
const Payload& payload,
RDMAClient::rdma_opt_t rdma_opt);
Status doReleaseBlobsWithRDMARequest(std::unordered_set<ObjectID> id_set);

class RDMABlobScopeGuard {
public:
RDMABlobScopeGuard(std::function<void(std::unordered_set<ObjectID>)> onExit,
std::unordered_set<ObjectID> id_set)
: onExit_(onExit), id_set_(id_set) {}

RDMABlobScopeGuard() = default;

void set(std::function<void(std::unordered_set<ObjectID>)> onExit,
std::unordered_set<ObjectID> id_set) {
onExit_ = onExit;
id_set_ = id_set;
}

~RDMABlobScopeGuard() {
if (onExit_) {
onExit_(id_set_);
}
}

private:
std::function<void(std::unordered_set<ObjectID>)> onExit_;
std::unordered_set<ObjectID> id_set_;
};

InstanceID remote_instance_id_;

Expand Down
39 changes: 39 additions & 0 deletions src/common/util/protocols.cc
Original file line number Diff line number Diff line change
Expand Up @@ -131,6 +131,10 @@ const std::string command_t::CLEAR_REQUEST = "clear_request";
const std::string command_t::CLEAR_REPLY = "clear_reply";
const std::string command_t::MEMORY_TRIM_REQUEST = "memory_trim_request";
const std::string command_t::MEMORY_TRIM_REPLY = "memory_trim_reply";
const std::string command_t::RELEASE_BLOBS_WITH_RDMA_REQUEST =
"release_blobs_with_rdma_request";
const std::string command_t::RELEASE_BLOBS_WITH_RDMA_REPLY =
"release_blobs_with_rdma_reply";

// Stream APIs
const std::string command_t::CREATE_STREAM_REQUEST = "create_stream_request";
Expand Down Expand Up @@ -1475,6 +1479,41 @@ Status ReadMemoryTrimReply(const json& root, bool& trimmed) {
return Status::OK();
}

void WriteReleaseBlobsWithRDMARequest(const std::unordered_set<ObjectID>& ids,
std::string& msg) {
json root;
root["type"] = command_t::RELEASE_BLOBS_WITH_RDMA_REQUEST;
int idx = 0;
for (auto const& id : ids) {
root[std::to_string(idx++)] = id;
}

root["num"] = ids.size();
encode_msg(root, msg);
}

Status ReadReleaseBlobsWithRDMARequest(const json& root,
std::vector<ObjectID>& ids) {
CHECK_IPC_ERROR(root, command_t::RELEASE_BLOBS_WITH_RDMA_REQUEST);
size_t num = root["num"].get<size_t>();
for (size_t i = 0; i < num; ++i) {
ids.push_back(root[std::to_string(i)].get<ObjectID>());
}
return Status::OK();
}

void WriteReleaseBlobsWithRDMAReply(std::string& msg) {
json root;
root["type"] = command_t::RELEASE_BLOBS_WITH_RDMA_REPLY;

encode_msg(root, msg);
}

Status ReadReleaseBlobsWithRDMAReply(const json& root) {
CHECK_IPC_ERROR(root, command_t::RELEASE_BLOBS_WITH_RDMA_REPLY);
return Status::OK();
}

void WriteCreateStreamRequest(const ObjectID& object_id, std::string& msg) {
json root;
root["type"] = command_t::CREATE_STREAM_REQUEST;
Expand Down
12 changes: 12 additions & 0 deletions src/common/util/protocols.h
Original file line number Diff line number Diff line change
Expand Up @@ -106,6 +106,8 @@ struct command_t {
static const std::string CLEAR_REPLY;
static const std::string MEMORY_TRIM_REQUEST;
static const std::string MEMORY_TRIM_REPLY;
static const std::string RELEASE_BLOBS_WITH_RDMA_REQUEST;
static const std::string RELEASE_BLOBS_WITH_RDMA_REPLY;

// Stream APIs
static const std::string CREATE_STREAM_REQUEST;
Expand Down Expand Up @@ -581,6 +583,16 @@ void WriteMemoryTrimReply(const bool trimmed, std::string& msg);

Status ReadMemoryTrimReply(const json& root, bool& trimmed);

void WriteReleaseBlobsWithRDMARequest(const std::unordered_set<ObjectID>& ids,
std::string& msg);

Status ReadReleaseBlobsWithRDMARequest(const json& root,
std::vector<ObjectID>& ids);

void WriteReleaseBlobsWithRDMAReply(std::string& msg);

Status ReadReleaseBlobsWithRDMAReply(const json& root);

void WriteCreateStreamRequest(const ObjectID& object_id, std::string& msg);

Status ReadCreateStreamRequest(const json& root, ObjectID& object_id);
Expand Down
40 changes: 31 additions & 9 deletions src/server/async/socket_server.cc
Original file line number Diff line number Diff line change
Expand Up @@ -363,6 +363,8 @@ bool SocketConnection::processMessage(const std::string& message_in) {
return doAcquireLock(root);
} else if (cmd == command_t::RELEASE_LOCK_REQUEST) {
return doReleaseLock(root);
} else if (cmd == command_t::RELEASE_BLOBS_WITH_RDMA_REQUEST) {
return doReleaseBlobsWithRDMA(root);
} else {
RESPONSE_ON_ERROR(Status::Invalid("Got unexpected command: " + cmd));
return false;
Expand Down Expand Up @@ -784,21 +786,25 @@ bool SocketConnection::doGetRemoteBuffers(const json& root) {

TRY_READ_REQUEST(ReadGetRemoteBuffersRequest, root, ids, unsafe, compress,
use_rdma);
server_ptr_->LockTransmissionObjects(ids);
RESPONSE_ON_ERROR(bulk_store_->GetUnsafe(ids, unsafe, objects));
RESPONSE_ON_ERROR(bulk_store_->AddDependency(
std::unordered_set<ObjectID>(ids.begin(), ids.end()), this->getConnId()));
WriteGetBuffersReply(objects, {}, compress, message_out);

if (!use_rdma) {
this->doWrite(message_out, [self, objects, compress](const Status& status) {
SendRemoteBuffers(
self->socket_, objects, 0, compress, [self](const Status& status) {
if (!status.ok()) {
VLOG(100) << "Failed to send buffers to remote client: "
<< status.ToString();
}
return Status::OK();
});
this->doWrite(message_out, [self, objects, compress,
ids](const Status& status) {
SendRemoteBuffers(self->socket_, objects, 0, compress,
[self, ids = std::move(ids)](const Status& status) {
if (!status.ok()) {
VLOG(100)
<< "Failed to send buffers to remote client: "
<< status.ToString();
}
self->server_ptr_->UnlockTransmissionObjects(ids);
return Status::OK();
});
return Status::OK();
});
} else {
Expand Down Expand Up @@ -835,6 +841,7 @@ bool SocketConnection::doDelDataWithFeedbacks(json const& root) {
std::vector<ObjectID> ids;
bool force, deep, memory_trim, fastpath;
double startTime = GetCurrentTime();

TRY_READ_REQUEST(ReadDelDataWithFeedbacksRequest, root, ids, force, deep,
memory_trim, fastpath);
RESPONSE_ON_ERROR(server_ptr_->DelData(
Expand Down Expand Up @@ -1834,6 +1841,21 @@ bool SocketConnection::doReleaseLock(const json& root) {
return false;
}

bool SocketConnection::doReleaseBlobsWithRDMA(const json& root) {
auto self(shared_from_this());
std::vector<ObjectID> ids;
TRY_READ_REQUEST(ReadReleaseBlobsWithRDMARequest, root, ids);

boost::asio::post(server_ptr_->GetIOContext(), [self, ids]() {
self->server_ptr_->UnlockTransmissionObjects(ids);
std::string message_out;
WriteReleaseBlobsWithRDMAReply(message_out);
self->doWrite(message_out);
});

return false;
}

void SocketConnection::doWrite(const std::string& buf) {
std::string to_send;
size_t length = buf.size();
Expand Down
2 changes: 2 additions & 0 deletions src/server/async/socket_server.h
Original file line number Diff line number Diff line change
Expand Up @@ -145,6 +145,8 @@ class SocketConnection : public std::enable_shared_from_this<SocketConnection> {
bool doAcquireLock(json const& root);
bool doReleaseLock(json const& root);

bool doReleaseBlobsWithRDMA(json const& root);

protected:
template <typename FROM, typename TO>
Status MoveBuffers(std::map<FROM, TO> mapping,
Expand Down
34 changes: 17 additions & 17 deletions src/server/server/vineyard_server.cc
Original file line number Diff line number Diff line change
Expand Up @@ -774,7 +774,13 @@ Status VineyardServer::DelData(
"Fastpath deletion can only be applied to blobs");
}
context_.post([this, memory_trim, ids, callback] {
for (auto const id : ids) {
std::vector<ObjectID> transmissions, non_transmissions;
std::unique_lock<std::mutex> lock =
FindTransmissionObjects(ids, transmissions, non_transmissions);

RemoveFromMigrationList(non_transmissions);
AddPendingObjects(transmissions);
for (auto const id : non_transmissions) {
VINEYARD_DISCARD(bulk_store_->OnDelete(id, memory_trim));
}
VINEYARD_DISCARD(callback(Status::OK(), ids));
Expand All @@ -788,27 +794,21 @@ Status VineyardServer::DelData(
std::vector<meta_tree::op_t>& ops, bool& sync_remote) {
if (status.ok()) {
Status s;
std::vector<ObjectID> transmissions, non_transmissions;
std::unique_lock<std::mutex> lock = self->FindTransmissionObjects(
ids_to_delete, transmissions, non_transmissions);
VLOG(100) << "transmission object num:" << transmissions.size()
<< ", non-transmission object num:"
<< non_transmissions.size();
self->RemoveFromMigrationList(non_transmissions);
self->AddPendingObjects(transmissions);

{
std::lock_guard<std::mutex> lock_origin(
self->migrations_target_to_origin_mutex_);
std::lock_guard<std::mutex> lock_target(
self->migrations_origin_to_target_mutex_);
for (auto const id : ids) {
if (self->migrations_target_to_origin_.find(id) !=
self->migrations_target_to_origin_.end()) {
ObjectID remoteID = self->migrations_target_to_origin_[id];
self->migrations_origin_to_target_.erase(remoteID);
self->migrations_target_to_origin_.erase(id);
}
}
}
VCATCH_JSON_ERROR(
meta, s,
meta_tree::DelDataOps(meta, ids_to_delete, ops, sync_remote));
meta_tree::DelDataOps(meta, non_transmissions, ops, sync_remote));
if (status.ok() && !ops.empty() &&
self->spec_["sync_crds"].get<bool>()) {
for (auto const& id : ids_to_delete) {
for (auto const& id : non_transmissions) {
if (IsBlob(id)) {
continue;
}
Expand Down
Loading

0 comments on commit 86808bb

Please sign in to comment.