Skip to content

Commit 0128ba8

Browse files
authored
DATA-3443: Add export_tabular_data to data client (#800)
1 parent fe27252 commit 0128ba8

File tree

4 files changed

+190
-12
lines changed

4 files changed

+190
-12
lines changed

CONTRIBUTING.md

+1-1
Original file line numberDiff line numberDiff line change
@@ -61,7 +61,7 @@ We use [`uv`](https://docs.astral.sh/uv/) to manage our environments and depende
6161

6262
4. When you're done making changes, check that your changes conform to any code formatting requirements and pass any tests.
6363

64-
- When testing, make sure you use the correct virtual environment by running either `uv make test` or `source .venv/bin/activate; make test`
64+
- When testing, make sure you use the correct virtual environment by running either `uv run make test` or `source .venv/bin/activate; make test`
6565

6666
5. Commit your changes and open a pull request.
6767

src/viam/app/data_client.py

+131-9
Original file line numberDiff line numberDiff line change
@@ -23,6 +23,7 @@
2323
BinaryID,
2424
BoundingBoxLabelsByFilterRequest,
2525
BoundingBoxLabelsByFilterResponse,
26+
CaptureInterval,
2627
CaptureMetadata,
2728
ConfigureDatabaseUserRequest,
2829
DataRequest,
@@ -33,6 +34,8 @@
3334
DeleteBinaryDataByIDsResponse,
3435
DeleteTabularDataRequest,
3536
DeleteTabularDataResponse,
37+
ExportTabularDataRequest,
38+
ExportTabularDataResponse,
3639
Filter,
3740
GetDatabaseConnectionRequest,
3841
GetDatabaseConnectionResponse,
@@ -145,6 +148,69 @@ def __eq__(self, other: object) -> bool:
145148
return str(self) == str(other)
146149
return False
147150

151+
@dataclass
152+
class TabularDataPoint:
153+
"""Represents a tabular data point and its associated metadata."""
154+
155+
part_id: str
156+
"""The robot part ID"""
157+
158+
resource_name: str
159+
"""The resource name"""
160+
161+
resource_subtype: str
162+
"""The resource subtype. Ex: `rdk:component:sensor`"""
163+
164+
method_name: str
165+
"""The method used for data capture. Ex" `Readings`"""
166+
167+
time_captured: datetime
168+
"""The time at which the data point was captured"""
169+
170+
organization_id: str
171+
"""The organization ID"""
172+
173+
location_id: str
174+
"""The location ID"""
175+
176+
robot_name: str
177+
"""The robot name"""
178+
179+
robot_id: str
180+
"""The robot ID"""
181+
182+
part_name: str
183+
"""The robot part name"""
184+
185+
method_parameters: Mapping[str, ValueTypes]
186+
"""Additional parameters associated with the data capture method"""
187+
188+
tags: List[str]
189+
"""A list of tags associated with the data point"""
190+
191+
payload: Mapping[str, ValueTypes]
192+
"""The captured data"""
193+
194+
def __str__(self) -> str:
195+
return (
196+
f"TabularDataPoint("
197+
f"robot='{self.robot_name}' (id={self.robot_id}), "
198+
f"part='{self.part_name}' (id={self.part_id}), "
199+
f"resource='{self.resource_name}' ({self.resource_subtype}), "
200+
f"method='{self.method_name}', "
201+
f"org={self.organization_id}, "
202+
f"location={self.location_id}, "
203+
f"time='{self.time_captured.isoformat()}', "
204+
f"params={self.method_parameters}, "
205+
f"tags={self.tags}, "
206+
f"payload={self.payload})"
207+
)
208+
209+
def __eq__(self, other: object) -> bool:
210+
if isinstance(other, DataClient.TabularDataPoint):
211+
return str(self) == str(other)
212+
return False
213+
148214
def __init__(self, channel: Channel, metadata: Mapping[str, str]):
149215
"""Create a `DataClient` that maintains a connection to app.
150216
@@ -254,7 +320,6 @@ async def tabular_data_by_sql(self, organization_id: str, sql_query: str) -> Lis
254320
sql_query="SELECT * FROM readings LIMIT 5"
255321
)
256322
257-
258323
Args:
259324
organization_id (str): The ID of the organization that owns the data.
260325
You can obtain your organization ID from the Viam app's organization settings page.
@@ -284,7 +349,6 @@ async def tabular_data_by_mql(self, organization_id: str, mql_binary: List[bytes
284349
285350
print(f"Tabular Data: {tabular_data}")
286351
287-
288352
Args:
289353
organization_id (str): The ID of the organization that owns the data.
290354
You can obtain your organization ID from the Viam app's organization settings page.
@@ -307,13 +371,12 @@ async def get_latest_tabular_data(
307371
308372
::
309373
310-
time_captured, time_synced, payload = await data_client.get_latest_tabular_data(
311-
part_id="<PART-ID>",
312-
resource_name="<RESOURCE-NAME>",
313-
resource_subtype="<RESOURCE-SUBTYPE>",
314-
method_name="<METHOD-NAME>"
315-
)
316-
374+
time_captured, time_synced, payload = await data_client.get_latest_tabular_data(
375+
part_id="<PART-ID>",
376+
resource_name="<RESOURCE-NAME>",
377+
resource_subtype="<RESOURCE-SUBTYPE>",
378+
method_name="<METHOD-NAME>"
379+
)
317380
318381
Args:
319382
part_id (str): The ID of the part that owns the data.
@@ -327,6 +390,7 @@ async def get_latest_tabular_data(
327390
datetime: The time captured,
328391
datetime: The time synced,
329392
Dict[str, ValueTypes]: The latest tabular data captured from the specified data source.
393+
330394
For more information, see `Data Client API <https://docs.viam.com/appendix/apis/data-client/>`_.
331395
"""
332396

@@ -338,6 +402,64 @@ async def get_latest_tabular_data(
338402
return None
339403
return response.time_captured.ToDatetime(), response.time_synced.ToDatetime(), struct_to_dict(response.payload)
340404

405+
async def export_tabular_data(
406+
self, part_id: str, resource_name: str, resource_subtype: str, method_name: str, start_time: Optional[datetime] = None, end_time: Optional[datetime] = None
407+
) -> List[TabularDataPoint]:
408+
"""Obtain unified tabular data and metadata from the specified data source.
409+
410+
::
411+
412+
tabular_data = await data_client.export_tabular_data(
413+
part_id="<PART-ID>",
414+
resource_name="<RESOURCE-NAME>",
415+
resource_subtype="<RESOURCE-SUBTYPE>",
416+
method_name="<METHOD-NAME>",
417+
start_time="<START_TIME>"
418+
end_time="<END_TIME>"
419+
)
420+
421+
print(f"My data: {tabular_data}")
422+
423+
Args:
424+
part_id (str): The ID of the part that owns the data.
425+
resource_name (str): The name of the requested resource that captured the data.
426+
resource_subtype (str): The subtype of the requested resource that captured the data.
427+
method_name (str): The data capture method name.
428+
start_time (datetime): Optional start time for requesting a specific range of data.
429+
end_time (datetime): Optional end time for requesting a specific range of data.
430+
431+
Returns:
432+
List[TabularDataPoint]: The unified tabular data and metadata.
433+
434+
For more information, see `Data Client API <https://docs.viam.com/appendix/apis/data-client/>`_.
435+
"""
436+
437+
interval=CaptureInterval(start=datetime_to_timestamp(start_time), end=datetime_to_timestamp(end_time))
438+
request = ExportTabularDataRequest(
439+
part_id=part_id, resource_name=resource_name, resource_subtype=resource_subtype, method_name=method_name, interval=interval
440+
)
441+
response: List[ExportTabularDataResponse] = await self._data_client.ExportTabularData(request, metadata=self._metadata)
442+
443+
return [
444+
DataClient.TabularDataPoint(
445+
part_id=resp.part_id,
446+
resource_name=resp.resource_name,
447+
resource_subtype=resp.resource_subtype,
448+
method_name=resp.method_name,
449+
time_captured=resp.time_captured.ToDatetime(),
450+
organization_id=resp.organization_id,
451+
location_id=resp.location_id,
452+
robot_name=resp.robot_name,
453+
robot_id=resp.robot_id,
454+
part_name=resp.part_name,
455+
method_parameters=struct_to_dict(resp.method_parameters),
456+
tags=list(resp.tags),
457+
payload=struct_to_dict(resp.payload)
458+
)
459+
for resp in response
460+
]
461+
462+
341463
async def binary_data_by_filter(
342464
self,
343465
filter: Optional[Filter] = None,

tests/mocks/services.py

+14
Original file line numberDiff line numberDiff line change
@@ -201,6 +201,8 @@
201201
DeleteBinaryDataByIDsResponse,
202202
DeleteTabularDataRequest,
203203
DeleteTabularDataResponse,
204+
ExportTabularDataRequest,
205+
ExportTabularDataResponse,
204206
GetDatabaseConnectionRequest,
205207
GetDatabaseConnectionResponse,
206208
GetLatestTabularDataRequest,
@@ -767,6 +769,7 @@ class MockData(UnimplementedDataServiceBase):
767769
def __init__(
768770
self,
769771
tabular_response: List[DataClient.TabularData],
772+
tabular_export_response: List[ExportTabularDataResponse],
770773
tabular_query_response: List[Dict[str, Union[ValueTypes, datetime]]],
771774
binary_response: List[BinaryData],
772775
delete_remove_response: int,
@@ -775,6 +778,7 @@ def __init__(
775778
hostname_response: str,
776779
):
777780
self.tabular_response = tabular_response
781+
self.tabular_export_response = tabular_export_response
778782
self.tabular_query_response = tabular_query_response
779783
self.binary_response = binary_response
780784
self.delete_remove_response = delete_remove_response
@@ -975,6 +979,16 @@ async def GetLatestTabularData(self, stream: Stream[GetLatestTabularDataRequest,
975979
data = dict_to_struct(self.tabular_response[0].data)
976980
await stream.send_message(GetLatestTabularDataResponse(time_captured=timestamp, time_synced=timestamp, payload=data))
977981

982+
async def ExportTabularData(self, stream: Stream[ExportTabularDataRequest, ExportTabularDataResponse]) -> None:
983+
request = await stream.recv_message()
984+
assert request is not None
985+
self.part_id = request.part_id
986+
self.resource_name = request.resource_name
987+
self.resource_subtype = request.resource_subtype
988+
self.method_name = request.method_name
989+
self.interval = request.interval
990+
for tabular_data in self.tabular_export_response:
991+
await stream.send_message(tabular_data)
978992

979993
class MockDataset(DatasetServiceBase):
980994
def __init__(self, create_response: str, datasets_response: Sequence[Dataset]):

tests/test_data_client.py

+44-2
Original file line numberDiff line numberDiff line change
@@ -2,12 +2,13 @@
22
from typing import List
33

44
import pytest
5+
from google.protobuf.struct_pb2 import Struct
56
from google.protobuf.timestamp_pb2 import Timestamp
67
from grpclib.testing import ChannelFor
78

89
from viam.app.data_client import DataClient
9-
from viam.proto.app.data import Annotations, BinaryData, BinaryID, BinaryMetadata, BoundingBox, CaptureMetadata, Filter, Order
10-
from viam.utils import create_filter
10+
from viam.proto.app.data import Annotations, BinaryData, BinaryID, BinaryMetadata, BoundingBox, CaptureInterval, CaptureMetadata, ExportTabularDataResponse, Filter, Order
11+
from viam.utils import create_filter, dict_to_struct
1112

1213
from .mocks.services import MockData
1314

@@ -56,6 +57,7 @@
5657
bbox_labels=BBOX_LABELS,
5758
dataset_id=DATASET_ID,
5859
)
60+
INTERVAL=CaptureInterval(start=START_TS, end=END_TS)
5961

6062
FILE_ID = "file_id"
6163
BINARY_ID = BinaryID(file_id=FILE_ID, organization_id=ORG_ID, location_id=LOCATION_ID)
@@ -101,6 +103,20 @@
101103
)
102104

103105
TABULAR_RESPONSE = [DataClient.TabularData(TABULAR_DATA, TABULAR_METADATA, START_DATETIME, END_DATETIME)]
106+
TABULAR_EXPORT_RESPONSE = [ExportTabularDataResponse(
107+
part_id=TABULAR_METADATA.part_id,
108+
resource_name = TABULAR_METADATA.component_name,
109+
resource_subtype = TABULAR_METADATA.component_type,
110+
time_captured = END_TS,
111+
organization_id = TABULAR_METADATA.organization_id,
112+
location_id =TABULAR_METADATA.location_id,
113+
robot_name = TABULAR_METADATA.robot_name,
114+
robot_id = TABULAR_METADATA.robot_id,
115+
part_name = TABULAR_METADATA.part_name,
116+
method_parameters = Struct(),
117+
tags = TABULAR_METADATA.tags,
118+
payload = dict_to_struct(TABULAR_DATA),
119+
)]
104120
TABULAR_QUERY_RESPONSE = [
105121
{"key1": START_DATETIME, "key2": "2", "key3": [1, 2, 3], "key4": {"key4sub1": END_DATETIME}},
106122
]
@@ -117,6 +133,7 @@
117133
def service() -> MockData:
118134
return MockData(
119135
tabular_response=TABULAR_RESPONSE,
136+
tabular_export_response=TABULAR_EXPORT_RESPONSE,
120137
tabular_query_response=TABULAR_QUERY_RESPONSE,
121138
binary_response=BINARY_RESPONSE,
122139
delete_remove_response=DELETE_REMOVE_RESPONSE,
@@ -179,6 +196,31 @@ async def test_get_latest_tabular_data(self, service: MockData):
179196
assert time_captured == time
180197
assert time_synced == time
181198

199+
async def test_export_tabular_data(self, service: MockData):
200+
async with ChannelFor([service]) as channel:
201+
client = DataClient(channel, DATA_SERVICE_METADATA)
202+
tabular_data = await client.export_tabular_data(PART_ID, COMPONENT_NAME, COMPONENT_TYPE, METHOD, START_DATETIME, END_DATETIME)
203+
assert tabular_data is not None
204+
for tabular_datum in tabular_data:
205+
assert tabular_datum is not None
206+
assert tabular_datum.part_id == TABULAR_METADATA.part_id
207+
assert tabular_datum.resource_name == TABULAR_METADATA.component_name
208+
assert tabular_datum.resource_subtype == TABULAR_METADATA.component_type
209+
assert tabular_datum.time_captured == END_DATETIME
210+
assert tabular_datum.organization_id == TABULAR_METADATA.organization_id
211+
assert tabular_datum.location_id == TABULAR_METADATA.location_id
212+
assert tabular_datum.robot_name == TABULAR_METADATA.robot_name
213+
assert tabular_datum.robot_id == TABULAR_METADATA.robot_id
214+
assert tabular_datum.part_name == TABULAR_METADATA.part_name
215+
assert tabular_datum.method_parameters == TABULAR_METADATA.method_parameters
216+
assert tabular_datum.tags == TABULAR_METADATA.tags
217+
assert tabular_datum.payload == TABULAR_DATA
218+
assert service.part_id == PART_ID
219+
assert service.resource_name == COMPONENT_NAME
220+
assert service.resource_subtype == COMPONENT_TYPE
221+
assert service.method_name == METHOD
222+
assert service.interval == INTERVAL
223+
182224
async def test_binary_data_by_filter(self, service: MockData):
183225
async with ChannelFor([service]) as channel:
184226
client = DataClient(channel, DATA_SERVICE_METADATA)

0 commit comments

Comments
 (0)