Skip to content

Commit

Permalink
Merge pull request #649 from raakella1/issue_116
Browse files Browse the repository at this point in the history
Return grpc error if a non originator receives fetch data request
  • Loading branch information
raakella1 authored Feb 14, 2025
2 parents fb28db4 + 8d8eaa4 commit 70de620
Showing 1 changed file with 30 additions and 24 deletions.
54 changes: 30 additions & 24 deletions src/lib/replication/repl_dev/raft_repl_dev.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -776,32 +776,38 @@ void RaftReplDev::on_fetch_data_received(intrusive< sisl::GenericRpcData >& rpc_
auto const& originator = req->blkid_originator();
auto const& remote_blkid = req->remote_blkid();

// release this assert if in the future we want to fetch from non-originator;
RD_REL_ASSERT_EQ(originator, server_id(),
"Not expect to receive fetch data from remote when I am not the originator of this request");
// Edit this check if in the future we want to fetch from non-originator;
if (originator != server_id()) {
auto const error_msg = fmt::format("Did not expect to receive fetch data from "
"remote when I am not the originator of this request, originator={}, my_server_id={}"
, originator, server_id());
RD_LOGW("{}", error_msg);
auto status = ::grpc::Status(::grpc::INVALID_ARGUMENT, error_msg);
rpc_data->set_status(status);
rpc_data->send_response();
return;
}

// fetch data based on the remote_blkid
if (originator == server_id()) {
// We are the originator of the blkid, read data locally;
MultiBlkId local_blkid;

// convert remote_blkid serialized data to local blkid
local_blkid.deserialize(sisl::blob{remote_blkid->Data(), remote_blkid->size()}, true /* copy */);

RD_LOGD("Data Channel: FetchData received: dsn={} lsn={} my_blkid={}", req->dsn(), lsn,
local_blkid.to_string());

// prepare the sgs data buffer to read into;
auto const total_size = local_blkid.blk_count() * get_blk_size();
sisl::sg_list sgs;
sgs.size = total_size;
sgs.iovs.emplace_back(
iovec{.iov_base = iomanager.iobuf_alloc(get_blk_size(), total_size), .iov_len = total_size});

// accumulate the sgs for later use (send back to the requester));
sgs_vec.push_back(sgs);
futs.emplace_back(async_read(local_blkid, sgs, total_size));
}
// We are the originator of the blkid, read data locally;
MultiBlkId local_blkid;

// convert remote_blkid serialized data to local blkid
local_blkid.deserialize(sisl::blob{remote_blkid->Data(), remote_blkid->size()}, true /* copy */);

RD_LOGD("Data Channel: FetchData received: dsn={} lsn={} my_blkid={}", req->dsn(), lsn,
local_blkid.to_string());

// prepare the sgs data buffer to read into;
auto const total_size = local_blkid.blk_count() * get_blk_size();
sisl::sg_list sgs;
sgs.size = total_size;
sgs.iovs.emplace_back(
iovec{.iov_base = iomanager.iobuf_alloc(get_blk_size(), total_size), .iov_len = total_size});

// accumulate the sgs for later use (send back to the requester));
sgs_vec.push_back(sgs);
futs.emplace_back(async_read(local_blkid, sgs, total_size));
}

folly::collectAllUnsafe(futs).thenValue(
Expand Down

0 comments on commit 70de620

Please sign in to comment.