Skip to content

Commit

Permalink
Improve the performance of RPC client (#1731)
Browse files Browse the repository at this point in the history
- Add batch APIs for creating blobs and creating remote blobs.
- Refactor the implementation of RPCClient's `GetObject`.
- Improve the builder's implementation for torch state_dict.
- Use boost's `socket.async_receive` for better performance.

Fixes #1727

Signed-off-by: Tao He <[email protected]>
  • Loading branch information
sighingnow authored Jan 25, 2024
1 parent 5ecd806 commit 95a1610
Show file tree
Hide file tree
Showing 37 changed files with 1,261 additions and 273 deletions.
4 changes: 2 additions & 2 deletions docs/notes/key-concepts/data-accessing.rst
Original file line number Diff line number Diff line change
Expand Up @@ -263,13 +263,13 @@ payloads over the network.
>>> remote_buffer_builder.copy(0, payload)
>>>
>>> # create the remote blob using the RPCClient, with the `remote_buffer_builder` as argument
>>> remote_blob_id = vineyard_rpc_client.create_remote_blob(remote_buffer_builder)
>>> remote_blob_meta = vineyard_rpc_client.create_remote_blob(remote_buffer_builder)
.. code:: python
:caption: Accessing remote blobs
>>> # get the remote blob from vineyard using object id
>>> remote_blob = vineyard_rpc_client.get_remote_blob(remote_blob_id)
>>> remote_blob = vineyard_rpc_client.get_remote_blob(remote_blob_meta.id)
>>> remote_blob, type(remote_blob)
(<vineyard._C.RemoteBlob at 0x142204870>, vineyard._C.RemoteBlob)
>>>
Expand Down
52 changes: 48 additions & 4 deletions python/client.cc
Original file line number Diff line number Diff line change
Expand Up @@ -225,6 +225,15 @@ void bind_client(py::module& mod) {
return metadata;
},
"metadata"_a, doc::ClientBase_create_metadata)
.def(
"create_metadata",
[](ClientBase* self,
std::vector<ObjectMeta>& metadatas) -> std::vector<ObjectMeta>& {
std::vector<ObjectID> object_ids;
throw_on_error(self->CreateMetaData(metadatas, object_ids));
return metadatas;
},
"metadata"_a, doc::ClientBase_create_metadata)
.def(
"create_metadata",
[](ClientBase* self, ObjectMeta& metadata,
Expand All @@ -235,6 +244,16 @@ void bind_client(py::module& mod) {
return metadata;
},
"metadata"_a, "instance_id"_a)
.def(
"create_metadata",
[](ClientBase* self, std::vector<ObjectMeta>& metadatas,
InstanceID const& instance_id) -> std::vector<ObjectMeta>& {
std::vector<ObjectID> object_ids;
throw_on_error(
self->CreateMetaData(metadatas, instance_id, object_ids));
return metadatas;
},
"metadata"_a, "instance_id"_a)
.def(
"delete",
[](ClientBase* self, const ObjectIDWrapper object_id,
Expand Down Expand Up @@ -570,6 +589,8 @@ void bind_client(py::module& mod) {
throw_on_error(self->Debug(detail::to_json(debug), result));
return detail::from_json(result);
})
.def_property("compression", &ClientBase::compression_enabled,
&ClientBase::set_compression_enabled)
.def_property_readonly("ipc_socket", &ClientBase::IPCSocket,
doc::ClientBase_ipc_socket)
.def_property_readonly("rpc_endpoint", &ClientBase::RPCEndpoint,
Expand All @@ -592,6 +613,18 @@ void bind_client(py::module& mod) {
return std::shared_ptr<BlobWriter>(blob.release());
},
py::return_value_policy::move, "size"_a, doc::IPCClient_create_blob)
.def(
"create_blob",
[](Client* self, std::vector<size_t> const& sizes) {
std::vector<std::unique_ptr<BlobWriter>> blobs;
throw_on_error(self->CreateBlobs(sizes, blobs));
std::vector<std::shared_ptr<BlobWriter>> lived_blobs;
for (auto& blob : blobs) {
lived_blobs.emplace_back(blob.release());
}
return lived_blobs;
},
py::return_value_policy::move, "size"_a, doc::IPCClient_create_blob)
.def(
"create_empty_blob",
[](Client* self) -> std::shared_ptr<Blob> {
Expand Down Expand Up @@ -781,11 +814,22 @@ void bind_client(py::module& mod) {
"create_remote_blob",
[](RPCClient* self,
const std::shared_ptr<RemoteBlobWriter>& remote_blob_builder)
-> ObjectIDWrapper {
ObjectID blob_id = InvalidObjectID();
-> ObjectMeta {
ObjectMeta blob_meta;
throw_on_error(
self->CreateRemoteBlob(remote_blob_builder, blob_meta));
return blob_meta;
},
"remote_blob_builder"_a, doc::RPCClient_create_remote_blob)
.def(
"create_remote_blob",
[](RPCClient* self,
const std::vector<std::shared_ptr<RemoteBlobWriter>>&
remote_blob_builders) -> std::vector<ObjectMeta> {
std::vector<ObjectMeta> blob_metas;
throw_on_error(
self->CreateRemoteBlob(remote_blob_builder, blob_id));
return blob_id;
self->CreateRemoteBlobs(remote_blob_builders, blob_metas));
return blob_metas;
},
"remote_blob_builder"_a, doc::RPCClient_create_remote_blob)
.def(
Expand Down
12 changes: 6 additions & 6 deletions python/core.cc
Original file line number Diff line number Diff line change
Expand Up @@ -572,8 +572,8 @@ void bind_blobs(py::module& mod) {
if (self->size() == 0) {
return;
}
throw_on_error(copy_memoryview(buffer.ptr(), self->data(),
self->size(), offset, concurrency));
throw_on_error(copy_memoryview(self->data(), self->size(),
buffer.ptr(), offset, concurrency));
},
"offset"_a, "buffer"_a,
py::arg("concurrency") = memory::default_memcpy_concurrency,
Expand Down Expand Up @@ -750,8 +750,8 @@ void bind_blobs(py::module& mod) {
if (self->size() == 0) {
return;
}
throw_on_error(copy_memoryview(buffer.ptr(), self->data(),
self->size(), offset, concurrency));
throw_on_error(copy_memoryview(self->data(), self->size(),
buffer.ptr(), offset, concurrency));
},
"offset"_a, "buffer"_a,
py::arg("concurrency") = memory::default_memcpy_concurrency,
Expand All @@ -774,8 +774,8 @@ void bind_blobs(py::module& mod) {
std::to_string(self->size() - offset) +
"', but the buffer size is '" + std::to_string(size) + "'"));
}
memory::concurrent_memcpy(self->data() + offset, buffer, size,
concurrency);
throw_on_error(copy_memoryview(self->data(), self->size(), buffer,
size, offset, concurrency));
},
"offset"_a, "bytes"_a,
py::arg("concurrency") = memory::default_memcpy_concurrency,
Expand Down
94 changes: 64 additions & 30 deletions python/pybind11_docs.cc
Original file line number Diff line number Diff line change
Expand Up @@ -537,7 +537,8 @@ Base class for vineyard object builders.
)doc";

const char* ClientBase_create_metadata = R"doc(
.. method:: create_metadata(metadata: ObjectMeta) -> ObjectMeta
.. method:: create_metadata(metadata: Union[ObjectMeta, List[ObjectMeta]])
-> Union[ObjectMeta, List[ObjectMeta]]
:noindex:
Create metadata in vineyardd.
Expand All @@ -547,9 +548,12 @@ Create metadata in vineyardd.
The metadata that will be created on vineyardd.
Returns:
The result created metadata.
Union[ObjectMeta, List[ObjectMeta]]:
The result created metadata.
.. method:: create_metadata(metadata: ObjectMeta, instance_id: InstanceID) -> ObjectMeta
.. method:: create_metadata(metadata: Union[ObjectMeta, List[ObjectMeta]],
instance_id: InstanceID)
-> Union[ObjectMeta, List[ObjectMeta]]
:noindex:
Create metadata in vineyardd with a specified instance id.
Expand All @@ -561,7 +565,8 @@ Create metadata in vineyardd with a specified instance id.
The instance to place the object metadata.
Returns:
The result created metadata.
Union[ObjectMeta, List[ObjectMeta]]:
The result created metadata.
)doc";

const char* ClientBase_delete = R"doc(
Expand Down Expand Up @@ -833,17 +838,29 @@ IPC client that connects to vineyard instance's UNIX domain socket.
)doc";

const char* IPCClient_create_blob = R"doc(
.. method:: create_blob(size: int) -> Blob
.. method:: create_blob(size: int) -> BlobBuilder
:noindex:
Allocate a blob in vineyard server.
Allocate a blob in vineyard server.
Parameters:
size: int
The size of blob that will be allocated on vineyardd.
Parameters:
size: int
The size of blob that will be allocated on vineyardd.
Returns:
BlobBuilder
Returns:
BlobBuilder
.. method:: create_blob(sizes: List[int]) -> List[BlobBuilder]
:noindex:
Allocate blobs in vineyard server.
Parameters:
size: List[int]
The size of blobs that will be allocated on vineyardd.
Returns:
List[BlobBuilder]
)doc";

const char* IPCClient_create_empty_blob = R"doc(
Expand Down Expand Up @@ -1083,34 +1100,51 @@ Get multiple objects from vineyard.
)doc";

const char* RPCClient_create_remote_blob = R"doc(
.. method:: create_remote_blob(blob_builder: RemoteBlobBuilder) -> ObjectID
.. method:: create_remote_blob(blob_builder: RemoteBlobBuilder) -> ObjectMeta
:noindex:
Put the remote blob to connected remote vineyard instance. The :code:`blob_builder`
is assumed to be ready and modification on the :code:`blob_builder` after creation
won't take effect.
Put the remote blob to connected remote vineyard instance. The :code:`blob_builder`
is assumed to be ready and modification on the :code:`blob_builder` after creation
won't take effect.
Note that creating remote blobs requires network transfer and may yields significate
overhead.
Note that creating remote blobs requires network transfer and may yields significate
overhead.
.. code:: python
.. code:: python
vineyard_rpc_client = vineyard.connect(*vineyard_endpoint.split(':'))
vineyard_rpc_client = vineyard.connect(*vineyard_endpoint.split(':'))
buffer_writer = RemoteBlobBuilder(len(payload))
buffer_writer.copy(0, payload)
blob_id = vineyard_rpc_client.create_remote_blob(buffer_writer)
buffer_writer = RemoteBlobBuilder(len(payload))
buffer_writer.copy(0, payload)
blob_meta = vineyard_rpc_client.create_remote_blob(buffer_writer)
Parameters:
blob_builder: RemoteBlobBuilder
The remote blob to create.
Parameters:
blob_builder: RemoteBlobBuilder
The remote blob to create.
Returns:
ObjectID
Returns:
ObjectMeta
See Also:
RPCClient.get_remote_blob
RPCClient.get_remote_blobs
See Also:
RPCClient.get_remote_blob
RPCClient.get_remote_blobs
.. method:: create_remote_blob(blob_builders: List[RemoteBlobBuilder]) -> List[ObjectMeta]
:noindex:
Put the remote blobs to connected remote vineyard instance. The :code:`blob_builders`
is assumed to be ready and modification on the :code:`blob_builder` after creation
won't take effect.
Note that creating remote blobs requires network transfer and may yields significate
overhead.
Returns:
List[ObjectMeta]
See Also:
RPCClient.get_remote_blob
RPCClient.get_remote_blobs
)doc";

const char* RPCClient_get_remote_blob = R"doc(
Expand Down
Loading

0 comments on commit 95a1610

Please sign in to comment.