Skip to content

Commit

Permalink
Support to use RPC to get object and improve the resolver and builder…
Browse files Browse the repository at this point in the history
… of python (#1622)

Signed-off-by: Ye Cao <[email protected]>
  • Loading branch information
dashanji authored Nov 27, 2023
1 parent 7a9b3fc commit 77c668d
Show file tree
Hide file tree
Showing 34 changed files with 508 additions and 39 deletions.
6 changes: 5 additions & 1 deletion python/client.cc
Original file line number Diff line number Diff line change
Expand Up @@ -566,7 +566,11 @@ void bind_client(py::module& mod) {
.def_property_readonly("rpc_endpoint", &ClientBase::RPCEndpoint,
doc::ClientBase_rpc_endpoint)
.def_property_readonly("version", &ClientBase::Version,
doc::ClientBase_version);
doc::ClientBase_version)
.def_property_readonly("is_ipc", &ClientBase::IsIPC,
doc::ClientBase_is_ipc)
.def_property_readonly("is_rpc", &ClientBase::IsRPC,
doc::ClientBase_is_rpc);

// Client
py::class_<Client, std::shared_ptr<Client>, ClientBase>(mod, "IPCClient",
Expand Down
2 changes: 1 addition & 1 deletion python/core.cc
Original file line number Diff line number Diff line change
Expand Up @@ -605,7 +605,7 @@ void bind_blobs(py::module& mod) {
});

// RemoteBlob
py::class_<RemoteBlob, std::shared_ptr<RemoteBlob>>(
py::class_<RemoteBlob, std::shared_ptr<RemoteBlob>, Object>(
mod, "RemoteBlob", py::buffer_protocol(), doc::RemoteBlob)
.def_property_readonly(
"id", [](RemoteBlob* self) -> ObjectIDWrapper { return self->id(); },
Expand Down
8 changes: 8 additions & 0 deletions python/pybind11_docs.cc
Original file line number Diff line number Diff line change
Expand Up @@ -830,6 +830,14 @@ The version number string of connected vineyard server, in the format of semver:
:code:`MAJOR.MINOR.PATCH`.
)doc";

// Docstring text for the read-only Python property `is_ipc`, which the
// bindings expose from ClientBase::IsIPC via doc::ClientBase_is_ipc.
const char* ClientBase_is_ipc = R"doc(
Whether the client is connected to vineyard server via UNIX domain socket.
)doc";

// Docstring text for the read-only Python property `is_rpc`, which the
// bindings expose from ClientBase::IsRPC via doc::ClientBase_is_rpc.
const char* ClientBase_is_rpc = R"doc(
Whether the client is connected to vineyard server via RPC endpoint.
)doc";

// Docstring text describing the IPC client class.
// NOTE(review): presumably passed as the class docstring to the
// py::class_ binding registered under the name "IPCClient" — confirm at the
// binding site, which is not visible here.
const char* IPCClient = R"doc(
IPC client that connects to vineyard instance's UNIX domain socket.
)doc";
Expand Down
2 changes: 2 additions & 0 deletions python/pybind11_docs.h
Original file line number Diff line number Diff line change
Expand Up @@ -113,6 +113,8 @@ extern const char* ClientBase_status;
extern const char* ClientBase_ipc_socket;
extern const char* ClientBase_rpc_endpoint;
extern const char* ClientBase_version;
extern const char* ClientBase_is_ipc;
extern const char* ClientBase_is_rpc;

extern const char* IPCClient;
extern const char* IPCClient_create_blob;
Expand Down
4 changes: 4 additions & 0 deletions python/vineyard/contrib/ml/tests/test_dali.py
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,10 @@

import lazy_import
import pytest
import pytest_cases

from vineyard.conftest import vineyard_client
from vineyard.conftest import vineyard_rpc_client
from vineyard.contrib.ml.dali import dali_context

dali = lazy_import.lazy_module("nvidia.dali")
Expand All @@ -32,6 +35,7 @@ def vineyard_for_dali():
yield


@pytest_cases.parametrize("vineyard_client", [vineyard_client, vineyard_rpc_client])
def test_dali_tensor(vineyard_client):
@dali.pipeline_def()
def pipe():
Expand Down
7 changes: 7 additions & 0 deletions python/vineyard/contrib/ml/tests/test_mxnet.py
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,10 @@

import lazy_import
import pytest
import pytest_cases

from vineyard.conftest import vineyard_client
from vineyard.conftest import vineyard_rpc_client
from vineyard.contrib.ml.mxnet import mxnet_context

mx = lazy_import.lazy_module("mxnet")
Expand All @@ -34,6 +37,7 @@ def vineyard_for_mxnet():
yield


@pytest_cases.parametrize("vineyard_client", [vineyard_client, vineyard_rpc_client])
def test_mxnet_tensor(vineyard_client):
data = [np.random.rand(2, 3) for i in range(10)]
label = [np.random.rand(2, 3) for i in range(10)]
Expand All @@ -45,6 +49,7 @@ def test_mxnet_tensor(vineyard_client):
assert dataset[1][0].shape == dtrain[1][0].shape


@pytest_cases.parametrize("vineyard_client", [vineyard_client, vineyard_rpc_client])
def test_mxnet_dataframe(vineyard_client):
df = pd.DataFrame({'a': [1, 2, 3, 4], 'b': [5, 6, 7, 8], 'c': [1.0, 2.0, 3.0, 4.0]})
label = df['c'].values.astype(np.float32)
Expand All @@ -59,6 +64,7 @@ def test_mxnet_dataframe(vineyard_client):
assert dataset[1].shape == dtrain[1].shape


@pytest_cases.parametrize("vineyard_client", [vineyard_client, vineyard_rpc_client])
def test_mxnet_record_batch(vineyard_client):
arrays = [
pa.array([1, 2, 3, 4]),
Expand All @@ -72,6 +78,7 @@ def test_mxnet_record_batch(vineyard_client):
assert len(dtrain[0][0]) == 2


@pytest_cases.parametrize("vineyard_client", [vineyard_client, vineyard_rpc_client])
def test_mxnet_table(vineyard_client):
arrays = [pa.array([1, 2]), pa.array([0, 1]), pa.array([0.1, 0.2])]
batch = pa.RecordBatch.from_arrays(arrays, ['f0', 'f1', 'target'])
Expand Down
7 changes: 7 additions & 0 deletions python/vineyard/contrib/ml/tests/test_tensorflow.py
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,10 @@

import lazy_import
import pytest
import pytest_cases

from vineyard.conftest import vineyard_client
from vineyard.conftest import vineyard_rpc_client
from vineyard.contrib.ml.tensorflow import tensorflow_context
from vineyard.core.builder import builder_context
from vineyard.core.resolver import resolver_context
Expand All @@ -36,6 +39,7 @@ def vineyard_for_tensorflow():
yield


@pytest_cases.parametrize("vineyard_client", [vineyard_client, vineyard_rpc_client])
def test_tensorflow_tensor(vineyard_client):
data = [np.random.rand(2, 3) for i in range(10)]
label = [np.random.rand(2, 3) for i in range(10)]
Expand All @@ -53,6 +57,7 @@ def test_tensorflow_tensor(vineyard_client):
assert len(dataset) == len(dtrain)


@pytest_cases.parametrize("vineyard_client", [vineyard_client, vineyard_rpc_client])
def test_tensorflow_dataframe(vineyard_client):
df = pd.DataFrame(
{'a': [1, 2, 3, 4], 'b': [5, 6, 7, 8], 'target': [1.0, 2.0, 3.0, 4.0]}
Expand All @@ -69,6 +74,7 @@ def test_tensorflow_dataframe(vineyard_client):
assert data_ncols == dtrain_ncols


@pytest_cases.parametrize("vineyard_client", [vineyard_client, vineyard_rpc_client])
def test_tensorflow_record_batch(vineyard_client):
arrays = [
pa.array([1, 2, 3, 4]),
Expand All @@ -84,6 +90,7 @@ def test_tensorflow_record_batch(vineyard_client):
assert len(dtrain) == 4


@pytest_cases.parametrize("vineyard_client", [vineyard_client, vineyard_rpc_client])
def test_tensorflow_table(vineyard_client):
arrays = [pa.array([1, 2]), pa.array([0, 1]), pa.array([0.1, 0.2])]
batch = pa.RecordBatch.from_arrays(arrays, ['f0', 'f1', 'label'])
Expand Down
9 changes: 9 additions & 0 deletions python/vineyard/contrib/ml/tests/test_torch.py
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,10 @@

import lazy_import
import pytest
import pytest_cases

from vineyard.conftest import vineyard_client
from vineyard.conftest import vineyard_rpc_client
from vineyard.contrib.ml.torch import torch_context
from vineyard.data.dataframe import NDArrayArray

Expand All @@ -36,6 +39,7 @@ def vineyard_for_torch():
yield


@pytest_cases.parametrize("vineyard_client", [vineyard_client, vineyard_rpc_client])
def test_torch_tensor(vineyard_client):
tensor = torch.ones(5, 2)
object_id = vineyard_client.put(tensor)
Expand All @@ -47,6 +51,7 @@ def test_torch_tensor(vineyard_client):
assert torch.equal(value, tensor)


@pytest_cases.parametrize("vineyard_client", [vineyard_client, vineyard_rpc_client])
def test_torch_dataset(vineyard_client):
dataset = torch.utils.data.TensorDataset(
*[torch.tensor(np.random.rand(2, 3)), torch.tensor(np.random.rand(2, 3))],
Expand All @@ -62,6 +67,7 @@ def test_torch_dataset(vineyard_client):
assert torch.isclose(t1, t2).all()


@pytest_cases.parametrize("vineyard_client", [vineyard_client, vineyard_rpc_client])
def test_torch_dataset_dataframe(vineyard_client):
df = pd.DataFrame({'a': [1, 2, 3, 4], 'b': [5, 6, 7, 8], 'c': [1.0, 2.0, 3.0, 4.0]})
object_id = vineyard_client.put(df)
Expand All @@ -77,6 +83,7 @@ def test_torch_dataset_dataframe(vineyard_client):
).all()


@pytest_cases.parametrize("vineyard_client", [vineyard_client, vineyard_rpc_client])
def test_torch_dataset_dataframe_multidimensional(vineyard_client):
df = pd.DataFrame(
{
Expand All @@ -91,6 +98,7 @@ def test_torch_dataset_dataframe_multidimensional(vineyard_client):
assert len(df.columns) == len(value.tensors)


@pytest_cases.parametrize("vineyard_client", [vineyard_client, vineyard_rpc_client])
def test_torch_dataset_recordbatch(vineyard_client):
df = pd.DataFrame({'a': [1, 2, 3, 4], 'b': [5, 6, 7, 8], 'c': [1.0, 2.0, 3.0, 4.0]})
batch = pa.RecordBatch.from_pandas(df)
Expand All @@ -107,6 +115,7 @@ def test_torch_dataset_recordbatch(vineyard_client):
).all()


@pytest_cases.parametrize("vineyard_client", [vineyard_client, vineyard_rpc_client])
def test_torch_dataset_table(vineyard_client):
df = pd.DataFrame({'a': [1, 2, 3, 4], 'b': [5, 6, 7, 8], 'c': [1.0, 2.0, 3.0, 4.0]})
table = pa.Table.from_pandas(df)
Expand Down
5 changes: 5 additions & 0 deletions python/vineyard/contrib/ml/tests/test_torcharrow.py
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,10 @@

import lazy_import
import pytest
import pytest_cases

from vineyard.conftest import vineyard_client
from vineyard.conftest import vineyard_rpc_client
from vineyard.contrib.ml.torcharrow import torcharrow_context

ta = lazy_import.lazy_module("torcharrow")
Expand All @@ -30,6 +33,7 @@ def vineyard_for_torcharrow():
yield


@pytest_cases.parametrize("vineyard_client", [vineyard_client, vineyard_rpc_client])
def test_torch_arrow_column(vineyard_client):
s = ta.column([1, 2, None, 4])
assert s.sum() == 7
Expand All @@ -39,6 +43,7 @@ def test_torch_arrow_column(vineyard_client):
assert s.sum() == 7


@pytest_cases.parametrize("vineyard_client", [vineyard_client, vineyard_rpc_client])
def test_torch_arrow_dataframe(vineyard_client):
s = ta.dataframe({"a": [1, 2, None, 4], "b": [5, 6, None, 8]})
assert s.sum()['a'][0] == 7
Expand Down
8 changes: 8 additions & 0 deletions python/vineyard/contrib/ml/tests/test_xgboost.py
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,10 @@
import pyarrow as pa

import pytest
import pytest_cases

from vineyard.conftest import vineyard_client
from vineyard.conftest import vineyard_rpc_client
from vineyard.contrib.ml.xgboost import xgboost_context


Expand All @@ -31,6 +34,7 @@ def vineyard_for_xgboost():
yield


@pytest_cases.parametrize("vineyard_client", [vineyard_client, vineyard_rpc_client])
def test_numpy_ndarray(vineyard_client):
arr = np.random.rand(4, 5)
object_id = vineyard_client.put(arr)
Expand All @@ -39,6 +43,7 @@ def test_numpy_ndarray(vineyard_client):
assert dtrain.num_row() == 4


@pytest_cases.parametrize("vineyard_client", [vineyard_client, vineyard_rpc_client])
def test_pandas_dataframe_specify_label(vineyard_client):
df = pd.DataFrame({'a': [1, 2, 3, 4], 'b': [5, 6, 7, 8], 'c': [1.0, 2.0, 3.0, 4.0]})
object_id = vineyard_client.put(df)
Expand All @@ -50,6 +55,7 @@ def test_pandas_dataframe_specify_label(vineyard_client):
assert dtrain.feature_names == ['b', 'c']


@pytest_cases.parametrize("vineyard_client", [vineyard_client, vineyard_rpc_client])
def test_pandas_dataframe_specify_data(vineyard_client):
df = pd.DataFrame(
{'a': [1, 2, 3, 4], 'b': [[5, 1.0], [6, 2.0], [7, 3.0], [8, 9.0]]}
Expand All @@ -62,6 +68,7 @@ def test_pandas_dataframe_specify_data(vineyard_client):
assert np.allclose(arr, dtrain.get_label())


@pytest_cases.parametrize("vineyard_client", [vineyard_client, vineyard_rpc_client])
def test_record_batch_xgb_resolver(vineyard_client):
arrays = [
pa.array([1, 2, 3, 4]),
Expand All @@ -78,6 +85,7 @@ def test_record_batch_xgb_resolver(vineyard_client):
assert dtrain.feature_names == ['f0', 'f1']


@pytest_cases.parametrize("vineyard_client", [vineyard_client, vineyard_rpc_client])
def test_table_xgb_resolver(vineyard_client):
arrays = [pa.array([1, 2]), pa.array([0, 1]), pa.array([0.1, 0.2])]
batch = pa.RecordBatch.from_arrays(arrays, ['f0', 'label', 'f2'])
Expand Down
13 changes: 7 additions & 6 deletions python/vineyard/core/resolver.py
Original file line number Diff line number Diff line change
Expand Up @@ -230,12 +230,13 @@ def get(
object_id = client.get_name(name)

# run resolver
obj = client.get_object(object_id, fetch=fetch)
meta = obj.meta
if not meta.islocal and not meta.isglobal:
raise ValueError(
"Not a local object: for remote object, you can only get its metadata"
)
if client.is_rpc:
obj = client.get_object(object_id)
elif client.is_ipc:
obj = client.get_object(object_id, fetch=fetch)
else:
raise RuntimeError('Unknown vineyard client type: %s' % type(client))

if resolver is None:
resolver = get_current_resolvers()
return resolver(obj, __vineyard_client=client, **kw)
Expand Down
9 changes: 0 additions & 9 deletions python/vineyard/core/tests/test_rpc_client.py
Original file line number Diff line number Diff line change
Expand Up @@ -115,15 +115,6 @@ def test_remote_blob_create_and_get_large_object(vineyard_endpoint):
assert memoryview(remote_blob) == memoryview(large_payload)


def test_remote_blob_error(vineyard_endpoint):
vineyard_rpc_client = vineyard.connect(*vineyard_endpoint.split(':'))

with pytest.raises(
ValueError, match="Vineyard RPC client cannot be used to create local blobs"
):
vineyard_rpc_client.put(np.ones((2, 3, 4)))


def test_multiple_remote_blobs(vineyard_endpoint):
vineyard_rpc_client = vineyard.connect(*vineyard_endpoint.split(':'))

Expand Down
12 changes: 8 additions & 4 deletions python/vineyard/data/arrow.py
Original file line number Diff line number Diff line change
Expand Up @@ -38,9 +38,7 @@
from vineyard.data.utils import normalize_dtype


def buffer_builder(
client: IPCClient, buffer: Union[bytes, memoryview], builder: BuilderContext
):
def buffer_builder(client, buffer: Union[bytes, memoryview], builder: BuilderContext):
if buffer is None:
address = None
size = 0
Expand All @@ -51,7 +49,13 @@ def buffer_builder(


def as_arrow_buffer(blob: Blob):
buffer = blob.buffer
if isinstance(blob, Blob):
buffer = blob.buffer
else:
if not blob.is_empty:
buffer = memoryview(blob)
else:
buffer = memoryview(b'')
if buffer is None:
return pa.py_buffer(bytearray())
return pa.py_buffer(buffer)
Expand Down
1 change: 1 addition & 0 deletions python/vineyard/data/base.py
Original file line number Diff line number Diff line change
Expand Up @@ -130,6 +130,7 @@ def register_base_types(

if resolver_ctx is not None:
resolver_ctx.register('vineyard::Blob', bytes_resolver)
resolver_ctx.register('vineyard::RemoteBlob', bytes_resolver)
resolver_ctx.register('vineyard::Scalar', scalar_resolver)
resolver_ctx.register('vineyard::Array', array_resolver)
resolver_ctx.register('vineyard::Sequence', sequence_resolver)
Loading

0 comments on commit 77c668d

Please sign in to comment.