Skip to content

Commit

Permalink
Datasync (#140)
Browse files Browse the repository at this point in the history
  • Loading branch information
njooma authored Oct 31, 2023
1 parent 5a70f1c commit 6f22cd6
Show file tree
Hide file tree
Showing 6 changed files with 364 additions and 87 deletions.
97 changes: 89 additions & 8 deletions lib/src/app/data.dart
Original file line number Diff line number Diff line change
@@ -1,17 +1,24 @@
import 'dart:io';
import 'dart:math';

import 'package:async/async.dart';
import 'package:collection/collection.dart';
import 'package:fixnum/fixnum.dart';
import 'package:viam_sdk/src/gen/google/protobuf/timestamp.pb.dart';
import 'package:viam_sdk/src/gen/google/protobuf/any.pb.dart';

import '../gen/app/data/v1/data.pbgrpc.dart';
import '../gen/app/datasync/v1/data_sync.pbgrpc.dart' hide CaptureInterval;
import '../gen/google/protobuf/timestamp.pb.dart';
import '../media/image.dart';

/// gRPC client for the [DataClient]. Used for retrieving stored data from app.viam.com.
///
/// All calls must be authenticated.
class DataClient {
final DataServiceClient _client;
final DataServiceClient _dataClient;
final DataSyncServiceClient _dataSyncClient;

DataClient(this._client);
DataClient(this._dataClient, this._dataSyncClient);

DataRequest _makeDataRequest(Filter? filter, int? limit, String? last, Order? sortOrder) {
final dataRequest = DataRequest();
Expand All @@ -37,7 +44,7 @@ class DataClient {
final request = TabularDataByFilterRequest()
..dataRequest = dataRequest
..countOnly = true;
return await _client.tabularDataByFilter(request);
return await _dataClient.tabularDataByFilter(request);
}

final finalResponse = TabularDataByFilterResponse();
Expand All @@ -49,7 +56,7 @@ class DataClient {
..dataRequest = dataRequest
..countOnly = false;

final response = await _client.tabularDataByFilter(request);
final response = await _dataClient.tabularDataByFilter(request);

if (response.count == 0) {
break;
Expand All @@ -73,7 +80,7 @@ class DataClient {
final request = BinaryDataByFilterRequest()
..dataRequest = dataRequest
..countOnly = true;
return await _client.binaryDataByFilter(request);
return await _dataClient.binaryDataByFilter(request);
}

final finalResponse = BinaryDataByFilterResponse();
Expand All @@ -85,7 +92,7 @@ class DataClient {
..dataRequest = dataRequest
..countOnly = false;

final response = await _client.binaryDataByFilter(request);
final response = await _dataClient.binaryDataByFilter(request);

if (response.count == 0) {
break;
Expand All @@ -102,9 +109,83 @@ class DataClient {
/// Retrieve binary data by IDs
Future<List<BinaryData>> binaryDataByIds(List<BinaryID> binaryIds) async {
final request = BinaryDataByIDsRequest()..binaryIds.addAll(binaryIds);
final response = await _client.binaryDataByIDs(request);
final response = await _dataClient.binaryDataByIDs(request);
return response.data;
}

/// Upload an image to Viam's Data Manager
///
/// If no name is provided, the current timestamp will be used as the filename.
Future<String> uploadImage(ViamImage image, String partId,
{String? fileName,
String? componentType,
String? componentName,
String? methodName,
Map<String, Any>? methodParameters,
Iterable<String> tags = const []}) async {
final metadata = UploadMetadata()
..partId = partId
..type = DataType.DATA_TYPE_FILE
..fileName = fileName ?? DateTime.now().toIso8601String()
..fileExtension = '.${image.mimeType.type}'
..tags.addAll(tags);
if (componentType != null) metadata.componentType = componentType;
if (componentName != null) metadata.componentName = componentName;
if (methodName != null) metadata.methodName = methodName;
if (methodParameters != null) metadata.methodParameters.addAll(methodParameters);
final metadataRequest = FileUploadRequest()..metadata = metadata;

// Make requests that are at most 2MB large (max gRPC request size is 4MB)
final dataRequests = image.raw.slices(2 * 1024 * 1024).map((e) => FileUploadRequest()..fileContents = (FileData()..data = e));

final requestStream = Stream.fromIterable([metadataRequest, ...dataRequests]);
final response = await _dataSyncClient.fileUpload(requestStream);
return response.fileId;
}

/// Upload a file from its path to Viam's Data Manager
///
/// The file name can be overridden by providing the [fileName] parameter.
Future<String> uploadFile(String path, String partId,
{String? fileName,
String? componentType,
String? componentName,
String? methodName,
Map<String, Any>? methodParameters,
Iterable<String> tags = const []}) async {
final fileNameAndExt = path.split(Platform.pathSeparator).last;
String fName, ext;
if (fileNameAndExt.contains('.')) {
fName = (fileNameAndExt.split('.')..removeLast()).join('.');
ext = '.${fileNameAndExt.split('.').last}';
} else {
fName = fileNameAndExt;
ext = '';
}
final metadata = UploadMetadata()
..partId = partId
..type = DataType.DATA_TYPE_FILE
..fileName = fileName ?? fName
..fileExtension = ext
..tags.addAll(tags);
if (componentType != null) metadata.componentType = componentType;
if (componentName != null) metadata.componentName = componentName;
if (methodName != null) metadata.methodName = methodName;
if (methodParameters != null) metadata.methodParameters.addAll(methodParameters);
final metadataStream = Stream.value(FileUploadRequest()..metadata = metadata);

final file = File(path);
final reader = ChunkedStreamReader(file.openRead());
try {
final fileDataStream =
reader.readStream(file.lengthSync()).map((event) => FileUploadRequest()..fileContents = (FileData()..data = event));
final requestStream = StreamGroup.merge([metadataStream, fileDataStream]);
final response = await _dataSyncClient.fileUpload(requestStream);
return response.fileId;
} finally {
await reader.cancel();
}
}
}

extension FilterUtils on Filter {
Expand Down
4 changes: 4 additions & 0 deletions lib/src/media/image.dart
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,10 @@ class MimeType {
/// If the MimeType is not supported, then this [name] is the string of the unsupported MimeType.
String get name => _name;

/// The underlying type, e.g. 'jpeg', 'png', 'pcd'.
/// If the MimeType is not supported, then this [type] is the string 'unsupported'.
String get type => _type;

const MimeType._(this._type, this._name);

/// Viam's custom RGBA encoding (image/vnd.viam.rgba)
Expand Down
5 changes: 3 additions & 2 deletions lib/src/viam_sdk_impl.dart
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
import 'package:grpc/grpc_connection_interface.dart';
import 'package:viam_sdk/protos/app/data_sync.dart';

import './app/app.dart';
import './app/data.dart';
Expand All @@ -15,12 +16,12 @@ class ViamImpl implements Viam {

ViamImpl._withChannel(this._clientChannelBase) {
_appClient = AppClient(AppServiceClient(_clientChannelBase));
_dataClient = DataClient(DataServiceClient(_clientChannelBase));
_dataClient = DataClient(DataServiceClient(_clientChannelBase), DataSyncServiceClient(_clientChannelBase));
}

ViamImpl.withAccessToken(String accessToken) : _clientChannelBase = AuthenticatedChannel('app.viam.com', 443, accessToken, false) {
_appClient = AppClient(AppServiceClient(_clientChannelBase));
_dataClient = DataClient(DataServiceClient(_clientChannelBase));
_dataClient = DataClient(DataServiceClient(_clientChannelBase), DataSyncServiceClient(_clientChannelBase));
}

static Future<ViamImpl> withApiKey(String apiKeyId, String apiKey) async {
Expand Down
1 change: 1 addition & 0 deletions pubspec.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ dependencies:
collection: ^1.17.1
intl: ^0.18.1
bonsoir: ^2.2.0+1 # do not upgrade beyond 2.2.0+1 until this issue is resolved: https://github.com/Skyost/Bonsoir/issues/57
async: ^2.11.0

dev_dependencies:
flutter_test:
Expand Down
Loading

0 comments on commit 6f22cd6

Please sign in to comment.