Skip to content

Commit

Permalink
RSDK-7218 - Data sync wrappers (#202)
Browse files Browse the repository at this point in the history
  • Loading branch information
stuqdog authored May 10, 2024
1 parent 6eb0b9e commit 250ab6a
Show file tree
Hide file tree
Showing 2 changed files with 174 additions and 0 deletions.
128 changes: 128 additions & 0 deletions lib/src/app/data.dart
Original file line number Diff line number Diff line change
Expand Up @@ -351,6 +351,134 @@ class DataClient {
await reader.cancel();
}
}

/// Upload binary sensor data to Viam's Data Manager
///
/// Returns the data's file ID.
Future<String> binaryDataCaptureUpload(List<int> binaryData, String partId, String fileExtension,
{String? componentType,
String? componentName,
String? methodName,
Map<String, Any>? methodParameters,
(DateTime, DateTime)? dataRequestTimes,
Iterable<String> tags = const []}) async {
final sensorMetadata = SensorMetadata();
if (dataRequestTimes != null) {
sensorMetadata.timeRequested = Timestamp.fromDateTime(dataRequestTimes.$1);
sensorMetadata.timeReceived = Timestamp.fromDateTime(dataRequestTimes.$2);
}

final metadata = UploadMetadata()
..partId = partId
..componentType = componentType ?? ''
..componentName = componentName ?? ''
..methodName = methodName ?? ''
..type = DataType.DATA_TYPE_BINARY_SENSOR
..tags.addAll(tags);
if (methodParameters != null) metadata.methodParameters.addAll(methodParameters);
if (fileExtension.isEmpty) {
metadata.fileExtension = '';
} else if (fileExtension[0] == '.') {
metadata.fileExtension = fileExtension;
} else {
metadata.fileExtension = '.$fileExtension';
}

final sensorContents = SensorData()
..binary = binaryData
..metadata = sensorMetadata;
final request = DataCaptureUploadRequest()
..metadata = metadata
..sensorContents.add(sensorContents);
final response = await _dataSyncClient.dataCaptureUpload(request);
return response.fileId;
}

/// Upload tabular sensor data to Viam's Data Manager
///
/// Returns the data's file ID.
Future<String> tabularDataCaptureUpload(List<Map<String, dynamic>> tabularData, String partId,
{String? componentType,
String? componentName,
String? methodName,
Map<String, Any>? methodParameters,
List<(DateTime, DateTime)>? dataRequestTimes,
Iterable<String> tags = const []}) async {
if (dataRequestTimes != null && dataRequestTimes.length != tabularData.length) {
throw Exception('dataRequestTimes and tabularData lengths must be equal');
}
final sensorContents = <SensorData>[];
for (final (idx, data) in tabularData.indexed) {
final s = data.toStruct();
final sensorMetadata = SensorMetadata();
if (dataRequestTimes != null) {
sensorMetadata.timeRequested = Timestamp.fromDateTime(dataRequestTimes[idx].$1);
sensorMetadata.timeReceived = Timestamp.fromDateTime(dataRequestTimes[idx].$2);
}
final sensorData = SensorData()
..struct = s
..metadata = sensorMetadata;
sensorContents.add(sensorData);
}

final metadata = UploadMetadata()
..partId = partId
..componentType = componentType ?? ''
..componentName = componentName ?? ''
..methodName = methodName ?? ''
..type = DataType.DATA_TYPE_TABULAR_SENSOR
..tags.addAll(tags);
if (methodParameters != null) metadata.methodParameters.addAll(methodParameters);

final request = DataCaptureUploadRequest()
..metadata = metadata
..sensorContents.addAll(sensorContents);
final response = await _dataSyncClient.dataCaptureUpload(request);
return response.fileId;
}

/// Uploads the metadata and contents of streaming binary data
///
/// Returns the data's file ID.
Future<String> streamingDataCaptureUpload(List<int> bytes, String partId, String fileExtension,
{String? componentType,
String? componentName,
String? methodName,
Map<String, Any>? methodParameters,
(DateTime, DateTime)? dataRequestTimes,
Iterable<String> tags = const []}) async {
final uploadMetadata = UploadMetadata()
..partId = partId
..componentType = componentType ?? ''
..componentName = componentName ?? ''
..methodName = methodName ?? ''
..type = DataType.DATA_TYPE_BINARY_SENSOR
..tags.addAll(tags);
if (methodParameters != null) uploadMetadata.methodParameters.addAll(methodParameters);
if (fileExtension.isEmpty) {
uploadMetadata.fileExtension = '';
} else if (fileExtension[0] == '.') {
uploadMetadata.fileExtension = fileExtension;
} else {
uploadMetadata.fileExtension = '.$fileExtension';
}

final sensorMetadata = SensorMetadata();
if (dataRequestTimes != null) {
sensorMetadata.timeRequested = Timestamp.fromDateTime(dataRequestTimes.$1);
sensorMetadata.timeReceived = Timestamp.fromDateTime(dataRequestTimes.$2);
}

final metadata = DataCaptureUploadMetadata()
..sensorMetadata = sensorMetadata
..uploadMetadata = uploadMetadata;

final metadataStream = Stream.value(StreamingDataCaptureUploadRequest()..metadata = metadata);
final bytesStream = Stream.value(StreamingDataCaptureUploadRequest()..data = bytes);
final requestStream = StreamGroup.merge([metadataStream, bytesStream]);
final response = await _dataSyncClient.streamingDataCaptureUpload(requestStream);
return response.fileId;
}
}

extension FilterUtils on Filter {
Expand Down
46 changes: 46 additions & 0 deletions test/unit_test/app/data_client_test.dart
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ import '../mocks/service_clients_mocks.mocks.dart';

class FakeDataSyncServiceClient extends Fake implements DataSyncServiceClient {
UploadMetadata? metadata;
UploadMetadata? dataCaptureMetadata;

@override
ResponseFuture<FileUploadResponse> fileUpload(Stream<FileUploadRequest> request, {CallOptions? options}) {
Expand All @@ -28,6 +29,23 @@ class FakeDataSyncServiceClient extends Fake implements DataSyncServiceClient {
return FileUploadResponse()..fileId = metadata?.fileName ?? 'some file id';
}));
}

@override
ResponseFuture<StreamingDataCaptureUploadResponse> streamingDataCaptureUpload(Stream<StreamingDataCaptureUploadRequest> request,
{CallOptions? options}) {
final metadataRequest = request.first.then((value) => dataCaptureMetadata = value.metadata.uploadMetadata);
return MockResponseFuture.future(Future.microtask(() async {
await metadataRequest;
return StreamingDataCaptureUploadResponse()..fileId = metadata?.componentName ?? 'fileId';
}));
}

@override
ResponseFuture<DataCaptureUploadResponse> dataCaptureUpload(DataCaptureUploadRequest request, {CallOptions? options}) {
return MockResponseFuture.future(Future.microtask(() async {
return DataCaptureUploadResponse()..fileId = 'fileId';
}));
}
}

void main() {
Expand Down Expand Up @@ -299,6 +317,34 @@ void main() {
await dataClient.uploadFile('/dev/null', 'partId', fileName: 'otherName');
expect(syncServiceClient.metadata?.fileName, equals('otherName'));
});

test('binaryDataCaptureUpload', () async {
final response = await dataClient
.binaryDataCaptureUpload([1], 'partId', 'fileExt', componentType: 'type', componentName: 'name', methodName: 'name');
expect(response, equals('fileId'));
});

test('tabularDataCaptureUpload', () async {
final map = {'foo': 'bar', 'baz': false};
final response =
await dataClient.tabularDataCaptureUpload([map], 'partId', componentType: 'type', componentName: 'name', methodName: 'name');
expect(response, equals('fileId'));
});

test('streamingDataCaptureUpload', () async {
final expected = UploadMetadata()
..partId = 'partId'
..type = DataType.DATA_TYPE_BINARY_SENSOR
..fileExtension = '.txt'
..methodName = ''
..componentType = ''
..componentName = '';
await dataClient.streamingDataCaptureUpload([1, 2, 3], 'partId', '.txt');
expect(syncServiceClient.dataCaptureMetadata, expected);

await dataClient.streamingDataCaptureUpload([1, 2, 3], 'partId', '.txt', componentName: 'myCoolArm');
expect(syncServiceClient.dataCaptureMetadata?.componentName, equals('myCoolArm'));
});
});
});
group('Filter Utils Tests', () {
Expand Down

0 comments on commit 250ab6a

Please sign in to comment.