Skip to content

Commit

Permalink
Support to gather all remote blobs concurrently when get an object (#…
Browse files Browse the repository at this point in the history
…1899)

Fixes #1898

Signed-off-by: Ye Cao <[email protected]>
  • Loading branch information
dashanji authored Jun 3, 2024
1 parent 65570be commit 23a8f17
Show file tree
Hide file tree
Showing 8 changed files with 131 additions and 34 deletions.
4 changes: 4 additions & 0 deletions python/client.cc
Original file line number Diff line number Diff line change
Expand Up @@ -852,6 +852,10 @@ void bind_client(py::module& mod) {
"get_remote_blobs",
[](RPCClient* self, std::vector<ObjectIDWrapper> object_ids,
const bool unsafe) {
// Release GIL to avoid blocking the other threads
// See also
// https://pybind11.readthedocs.io/en/stable/advanced/misc.html#global-interpreter-lock-gil
py::gil_scoped_release release;
std::vector<ObjectID> unwrapped_object_ids(object_ids.size());
for (size_t idx = 0; idx < object_ids.size(); ++idx) {
unwrapped_object_ids[idx] = object_ids[idx];
Expand Down
7 changes: 7 additions & 0 deletions python/core.cc
Original file line number Diff line number Diff line change
Expand Up @@ -306,6 +306,12 @@ void bind_core(py::module& mod) {
[](ObjectMeta* self, std::string const& key, py::dict const& value) {
self->AddKeyValue(key, detail::to_json(value));
})
.def(
"add_remote_blob",
[](ObjectMeta* self, RemoteBlob const& blob) {
self->AddRemoteBlob(blob);
},
doc::ObjectMeta_add_remote_blob)
.def(
"add_member",
[](ObjectMeta* self, std::string const& key, Object const* member) {
Expand All @@ -331,6 +337,7 @@ void bind_core(py::module& mod) {
return detail::from_json(usages);
},
py::arg("pretty") = true)
.def("force_local", &ObjectMeta::ForceLocal)
.def_property_readonly("timestamp", &ObjectMeta::Timestamp)
.def_property_readonly("labels",
[](const ObjectMeta* self) -> py::object {
Expand Down
11 changes: 11 additions & 0 deletions python/pybind11_docs.cc
Original file line number Diff line number Diff line change
Expand Up @@ -247,6 +247,17 @@ Add a member object.
The reference to the member object or the object id of the member object.
)doc";

const char* ObjectMeta_add_remote_blob = R"doc(
.. method:: add_remote_blob(self, RemoteBlob blob) -> None
:noindex:
Add the remote blob's buffer to the metadata.
Parameters:
blob: :class:`RemoteBlob`
The reference to the remote blob.
)doc";

const char* ObjectMeta_add_member = R"doc(
.. method:: add_member(self, key: str, ObjectID, Object or ObjectMeta) -> None
:noindex:
Expand Down
1 change: 1 addition & 0 deletions python/pybind11_docs.h
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,7 @@ extern const char* ObjectMeta__getitem__;
extern const char* ObjectMeta_get;
extern const char* ObjectMeta_get_member;
extern const char* ObjectMeta__setitem__;
extern const char* ObjectMeta_add_remote_blob;
extern const char* ObjectMeta_add_member;

extern const char* Object;
Expand Down
126 changes: 93 additions & 33 deletions python/vineyard/core/client.py
Original file line number Diff line number Diff line change
Expand Up @@ -106,6 +106,52 @@ def _parse_configuration(config) -> Tuple[Optional[str], Optional[str]]:
return ipc_socket, rpc_endpoint


def _is_blob(object_id: ObjectID):
"""_is_blob_
Args:
object_id (ObjectID): ObjectID to check if it is a blob
Returns:
bool: True if the object_id is a blob, False otherwise
"""
return int(object_id) & 0x8000000000000000


def _traverse_blobs(meta: ObjectMeta, blobs=None):
"""_traverse_blobs_
Recursively traverses ObjectMeta to find and accumulate blob IDs by instance_id.
Args:
meta (ObjectMeta): ObjectMeta to traverse for blobs.
blobs (dict, optional): Accumulator for blobs organized by instance_id.
Returns:
dict: A dictionary of blobs organized by instance_id.
"""

if blobs is None:
blobs = {}

def add_blob(instance_id, blob_id):
if instance_id not in blobs:
blobs[instance_id] = []
blobs[instance_id].append(blob_id)

if _is_blob(meta.id):
add_blob(meta.instance_id, meta.id)
else:
for _, v in meta.items():
if isinstance(v, ObjectMeta):
if _is_blob(v.id):
add_blob(v.instance_id, v.id)
else:
_traverse_blobs(v, blobs)

return blobs


class Client:
"""Client is responsible for managing IPC and RPC clients for Vineyard
and provides a high-level interface to fetch objects from the Vineyard cluster.
Expand Down Expand Up @@ -490,12 +536,12 @@ def get_remote_blobs(
return self.rpc_client.get_remote_blobs(object_ids, unsafe)

@_apply_docstring(IPCClient.get_object)
def get_object(self, object_id: ObjectID) -> Object:
def get_object(self, object_id: ObjectID, fetch: bool = False) -> Object:
"""
Fetches the object associated with the given object_id from Vineyard.
The IPC client is preferred if it's available, otherwise the RPC client
"""
return self._fetch_object(object_id)
return self._fetch_object(object_id, enable_migrate=fetch)

@_apply_docstring(IPCClient.get_objects)
def get_objects(self, object_ids: List[ObjectID]) -> List[Object]:
Expand Down Expand Up @@ -573,58 +619,72 @@ def fork(self) -> 'Client':
self._rpc_client = self._rpc_client.fork()
return self

def _fetch_object(self, object_id: ObjectID) -> Object:
def _fetch_object(self, object_id: ObjectID, enable_migrate: bool) -> Object:
meta = self.get_meta(object_id, sync_remote=True)

if self.has_ipc_client():
if meta.instance_id == self._ipc_client.instance_id or meta.isglobal:
return Object.from_(meta)
else:
warnings.warn(
f"Migrating object {object_id} from another vineyard instance "
f"{meta.instance_id}"
if self.has_ipc_client() and enable_migrate:
return self._ipc_client.get_object(object_id, fetch=True)

blobs = _traverse_blobs(meta)

cluster_info = self.default_client().meta
meta.force_local()
meta._client = None

with ThreadPoolExecutor() as executor:
futures = {
executor.submit(
self._fetch_blobs_from_instance,
cluster_info,
instance_id,
blobs[instance_id],
self.compression,
)
return self._ipc_client.get_object(object_id, fetch=True)
if self.has_rpc_client():
if self._rpc_client.is_fetchable(meta):
return self._rpc_client.get_object(object_id)
else:
return self._locate_and_fetch(meta)
for instance_id in blobs
if instance_id != self.instance_id
}

def _locate_and_fetch(self, meta) -> Object:
"""
Fetches an object from another instance in the Vineyard cluster based on
the meta information.
for future in as_completed(futures):
fetched_blobs = future.result()
for blob in fetched_blobs:
meta.add_remote_blob(blob)

return Object.from_(meta)

It's triggered when the RPC client is not able to fetch the object from the
current instance.
def _fetch_blobs_from_instance(
self, cluster_info, instance_id, blob_ids, compression
) -> Object:
"""Fetches all blobs from a given instance id in the Vineyard cluster.
Args:
cluster_info (Dict): The cluster information of the Vineyard cluster.
instance_id (int): The instance id to fetch blobs from.
blob_ids (List): The list of blob ids to fetch.
compression (bool): Whether to enable compression for RPC Client.
Returns:
RemoteBlob(List): The list of fetched remote blobs.
"""
cluster_info = self._rpc_client.meta
instance_status = cluster_info.get(meta.instance_id)
instance_status = cluster_info.get(instance_id)

if instance_status is None or instance_status['rpc_endpoint'] is None:
raise RuntimeError(
"The rpc endpoint of the vineyard instance "
f"{meta.instance_id} is not available."
f"{instance_id} is not available."
)

previous_compression_state = self.compression
host, port = instance_status['rpc_endpoint'].split(':')
try:
with envvars('VINEYARD_RPC_SKIP_RETRY', '1'):
remote_client = _connect(host, port)
remote_client.compression = previous_compression_state
remote_client.compression = compression
except Exception as exec:
raise RuntimeError(
f"Failed to connect to the vineyard instance {meta.instance_id} "
f"Failed to connect to the vineyard instance {instance_id} "
f"at {host}:{port}."
) from exec

warnings.warn(
f"Fetching remote object {meta.id} from the remote vineyard instance "
f"{meta.instance_id} at {host}:{port}."
)
return remote_client.get_object(meta.id)
return remote_client.get_remote_blobs(blob_ids)

def _connect_and_get_memory(self, instance_id, rpc_endpoint):
host, port = rpc_endpoint.split(':')
Expand Down
2 changes: 1 addition & 1 deletion python/vineyard/core/resolver.py
Original file line number Diff line number Diff line change
Expand Up @@ -232,7 +232,7 @@ def get(
elif name is not None:
object_id = client.get_name(name)

obj = client.get_object(object_id)
obj = client.get_object(object_id, fetch=fetch)

if resolver is None:
resolver = get_current_resolvers()
Expand Down
6 changes: 6 additions & 0 deletions src/client/ds/object_meta.cc
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ limitations under the License.
#include "client/client.h"
#include "client/client_base.h"
#include "client/ds/blob.h"
#include "client/ds/remote_blob.h"
#include "common/util/env.h"

namespace vineyard {
Expand Down Expand Up @@ -152,6 +153,11 @@ void ObjectMeta::GetKeyValue(const std::string& key, json& value) const {
}
}

void ObjectMeta::AddRemoteBlob(const RemoteBlob& blob) {
VINEYARD_CHECK_OK(buffer_set_->EmplaceBuffer(blob.id()));
VINEYARD_CHECK_OK(buffer_set_->EmplaceBuffer(blob.id(), blob.Buffer()));
}

void ObjectMeta::AddMember(const std::string& name, const ObjectMeta& member) {
VINEYARD_ASSERT(!meta_.contains(name));
meta_[name] = member.meta_;
Expand Down
8 changes: 8 additions & 0 deletions src/client/ds/object_meta.h
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@ limitations under the License.
namespace vineyard {

class Blob;
class RemoteBlob;
class Buffer;
class BufferSet;
class ClientBase;
Expand Down Expand Up @@ -590,6 +591,13 @@ class ObjectMeta {
*/
void GetKeyValue(const std::string& key, json& value) const;

/**
* @brief Add remote blob's buffer to ObjectMeta.
*
* @param blob The remote blob to be added.
*/
void AddRemoteBlob(const RemoteBlob& blob);

/**
* @brief Add member to ObjectMeta.
*
Expand Down

0 comments on commit 23a8f17

Please sign in to comment.