Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

stream enhancements #268

Draft
wants to merge 5 commits into
base: trunk
Choose a base branch
from
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8 changes: 5 additions & 3 deletions at_client/lib/src/client/at_client_impl.dart
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ import 'package:at_client/src/preference/at_client_preference.dart';
import 'package:at_client/src/service/encryption_service.dart';
import 'package:at_client/src/service/file_transfer_service.dart';
import 'package:at_client/src/service/notification_service.dart';
import 'package:at_client/src/service/stream_service.dart';
import 'package:at_client/src/stream/at_stream_notification.dart';
import 'package:at_client/src/stream/at_stream_response.dart';
import 'package:at_client/src/stream/file_transfer_object.dart';
Expand Down Expand Up @@ -863,10 +864,12 @@ class AtClientImpl implements AtClient {
}

@override
@deprecated
/// use [StreamService.createStream]
Future<AtStreamResponse> stream(String sharedWith, String filePath,
{String? namespace}) async {
var streamResponse = AtStreamResponse();
var streamId = Uuid().v4();
var streamResponse = AtStreamResponse(streamId);
var file = File(filePath);
var data = file.readAsBytesSync();
var fileName = basename(filePath);
Expand Down Expand Up @@ -901,6 +904,7 @@ class AtClientImpl implements AtClient {
return streamResponse;
}

/// [deprecated] Create a receiver stream using use [StreamService.createStream] and call [StreamReceiver.ack]
Future<void> sendStreamAck(
String streamId,
String fileName,
Expand All @@ -909,8 +913,6 @@ class AtClientImpl implements AtClient {
Function streamCompletionCallBack,
Function streamReceiveCallBack) async {
var handler = StreamNotificationHandler();
handler.remoteSecondary = getRemoteSecondary();
handler.localSecondary = getLocalSecondary();
handler.preference = _preference;
handler.encryptionService = _encryptionService;
var notification = AtStreamNotification()
Expand Down
2 changes: 2 additions & 0 deletions at_client/lib/src/client/at_client_spec.dart
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ import 'package:at_client/src/manager/sync_manager.dart';
import 'package:at_client/src/preference/at_client_preference.dart';
import 'package:at_client/src/service/encryption_service.dart';
import 'package:at_client/src/service/notification_service.dart';
import 'package:at_client/src/service/stream_service.dart';
import 'package:at_client/src/stream/at_stream_response.dart';
import 'package:at_client/src/stream/file_transfer_object.dart';
import 'package:at_commons/at_commons.dart';
Expand Down Expand Up @@ -403,6 +404,7 @@ abstract class AtClient {
{String? regex});

/// Streams the file in [filePath] to [sharedWith] atSign.
@deprecated /// use [StreamService.createStream]
Future<AtStreamResponse> stream(String sharedWith, String filePath,
{String namespace});

Expand Down
52 changes: 52 additions & 0 deletions at_client/lib/src/converters/byte_splitter.dart
Original file line number Diff line number Diff line change
@@ -0,0 +1,52 @@
import 'dart:convert';
import 'dart:typed_data';

class Splitter extends Converter<List<int>, List<List<int>>> {
Splitter(this._splitOnByte);

final int _splitOnByte;

@override
SplitterSink startChunkedConversion(Sink sink) {
return SplitterSink(sink, _splitOnByte);
}

@override
List<List<int>> convert(input) {
var out = ListSink();
var sink = SplitterSink(out, _splitOnByte);
sink.add(input);
sink.close();
return out.list;
}
}

class SplitterSink extends ByteConversionSinkBase {
SplitterSink(this._sink, this._splitOnByte);

final int _splitOnByte;
final Sink _sink;

@override
void add(List<int> chunk) {
assert(chunk is Uint8List);
for (var i = 0; i < chunk.length; i += _splitOnByte) {
_sink.add(chunk.sublist(i,
i + _splitOnByte > chunk.length ? chunk.length : i + _splitOnByte));
}
}

@override
void close() {
_sink.close();
}
}

class ListSink extends Sink<List<int>> {
final List<List<int>> list = <List<int>>[];
@override
void add(List<int> chunk) => list.add(chunk);

@override
void close() {}
}
101 changes: 101 additions & 0 deletions at_client/lib/src/converters/encryption/aes_converter.dart
Original file line number Diff line number Diff line change
@@ -0,0 +1,101 @@
import 'dart:convert';
import 'dart:typed_data';

import 'package:encrypt/encrypt.dart';

class AESEncrypter extends Converter<List<int>, List<int>> {
final String _encryptionKey;
const AESEncrypter(this._encryptionKey);

@override
List<int> convert(List<int> data) {
var aesKey = AES(Key.fromBase64(_encryptionKey), padding: null);

var initializationVector = IV.fromLength(16);
var aesEncrypter = Encrypter(aesKey);
var encryptedValue =
aesEncrypter.encryptBytes(data, iv: initializationVector);
return encryptedValue.bytes;
}

@override
AESEncryptionSink startChunkedConversion(sink) {
return AESEncryptionSink(_encryptionKey, sink);
}
}

class AESDecrypter extends Converter<List<int>, List<int>> {
final String _encryptionKey;
const AESDecrypter(this._encryptionKey);

@override
List<int> convert(List<int> data) {
var aesKey = AES(Key.fromBase64(_encryptionKey), padding: null);
var decrypter = Encrypter(aesKey);
var iv2 = IV.fromLength(16);
return decrypter.decryptBytes(Encrypted(data as Uint8List), iv: iv2);
}

@override
AESDecryptionSink startChunkedConversion(sink) {
return AESDecryptionSink(_encryptionKey, sink);
}
}

class AESCodec extends Codec<List<int>, List<int>> {
final _key;
const AESCodec(this._key);

@override
List<int> encode(List<int> data) {
return AESEncrypter(_key).convert(data);
}

@override
List<int> decode(List<int> data) {
return AESDecrypter(_key).convert(data);
}

@override
AESEncrypter get encoder => AESEncrypter(_key);
@override
AESDecrypter get decoder => AESDecrypter(_key);
}

class AESEncryptionSink extends ByteConversionSink {
final _converter;
final Sink<List<int>> _outSink;
AESEncryptionSink(key, this._outSink) : _converter = AESEncrypter(key);

@override
void add(List<int> data) {
_outSink.add(_converter.convert(data));
}

@override
void close() {
_outSink.close();
}

@override
void addSlice(List<int> chunk, int start, int end, bool isLast) {
add(chunk.sublist(start, end));
if (isLast) close();
}
}

class AESDecryptionSink extends ChunkedConversionSink<List<int>> {
final _converter;
final Sink<List<int>> _outSink;
AESDecryptionSink(key, this._outSink) : _converter = AESDecrypter(key);

@override
void add(List<int> data) {
_outSink.add(_converter.convert(data));
}

@override
void close() {
_outSink.close();
}
}
4 changes: 4 additions & 0 deletions at_client/lib/src/manager/at_client_manager.dart
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,8 @@ import 'package:at_client/src/listener/at_sign_change_listener.dart';
import 'package:at_client/src/listener/switch_at_sign_event.dart';
import 'package:at_client/src/service/notification_service.dart';
import 'package:at_client/src/service/notification_service_impl.dart';
import 'package:at_client/src/service/stream_service.dart';
import 'package:at_client/src/service/stream_service_impl.dart';
import 'package:at_client/src/service/sync_service.dart';
import 'package:at_client/src/service/sync_service_impl.dart';

Expand All @@ -24,6 +26,7 @@ class AtClientManager {
AtClient get atClient => _currentAtClient;
late SyncService syncService;
late NotificationService notificationService;
late StreamService streamService;
final _changeListeners = <AtSignChangeListener>[];

static final AtClientManager _singleton = AtClientManager._internal();
Expand All @@ -48,6 +51,7 @@ class AtClientManager {
notificationService =
await NotificationServiceImpl.create(_currentAtClient);
syncService = await SyncServiceImpl.create(_currentAtClient);
streamService = StreamServiceImpl.create(_currentAtClient);
_previousAtClient = _currentAtClient;
_notifyListeners(switchAtSignEvent);
return this;
Expand Down
35 changes: 35 additions & 0 deletions at_client/lib/src/response/stream_notification_parser.dart
Original file line number Diff line number Diff line change
@@ -0,0 +1,35 @@
import 'dart:convert';
import 'package:at_client/at_client.dart';
import 'package:at_client/src/stream/at_stream_notification.dart';

class StreamNotificationParser {
final streamNotificationKey = 'stream_id';

final namespace;

StreamNotificationParser(this.namespace);

AtStreamNotification? parseStreamNotification(AtNotification atNotification) {
var notificationKey = atNotification.key;
var fromAtSign = atNotification.from;
var atKey = notificationKey.split(':')[1];
atKey = atKey.replaceFirst(fromAtSign, '');
atKey = atKey.trim();
if (atKey == '$streamNotificationKey.$namespace') {
var valueObject = atNotification.value;
if (valueObject == null) {
return null;
}
var streamId = valueObject.split(':')[0];
var fileName = valueObject.split(':')[1];
var fileLength = int.parse(valueObject.split(':')[2]);
fileName = utf8.decode(base64.decode(fileName));
return AtStreamNotification()
..streamId = streamId
..fileName = fileName
..fileLength = fileLength
..senderAtSign = fromAtSign;
}
return null;
}
}
27 changes: 17 additions & 10 deletions at_client/lib/src/service/encryption_service.dart
Original file line number Diff line number Diff line change
Expand Up @@ -188,12 +188,18 @@ class EncryptionService {
}
}

//TODO remove code duplication - encrypt and encryptStream
Future<List<int>> encryptStream(List<int> value, String sharedWith) async {
var sharedKey = await getStreamEncryptionKey(sharedWith);
var encryptedValue = EncryptionUtil.encryptBytes(value, sharedKey);
return encryptedValue;
}

//TODO remove code duplication - encrypt and getStreamEncryptionKey
Future<String> getStreamEncryptionKey(String sharedWith) async {
var currentAtSignPublicKey =
await (localSecondary!.getEncryptionPublicKey(currentAtSign!));
await (localSecondary!.getEncryptionPublicKey(currentAtSign!));
var currentAtSignPrivateKey =
await localSecondary!.getEncryptionPrivateKey();
await localSecondary!.getEncryptionPrivateKey();
var sharedWithUser = sharedWith.replaceFirst('@', '');
// //1. Get/Generate AES key for sharedWith atsign
var llookupVerbBuilder = LLookupVerbBuilder()
Expand All @@ -206,20 +212,21 @@ class EncryptionService {
sharedKey = sharedKey.replaceFirst('data:', '');
sharedKey =
EncryptionUtil.decryptKey(sharedKey, currentAtSignPrivateKey!);
return sharedKey;
}
//2. Lookup public key of sharedWith atsign
var plookupBuilder = PLookupVerbBuilder()
..atKey = 'publickey'
..sharedBy = sharedWith;
var sharedWithPublicKey =
await remoteSecondary!.executeAndParse(plookupBuilder);
await remoteSecondary!.executeAndParse(plookupBuilder);
if (sharedWithPublicKey == 'null' || sharedWithPublicKey.isEmpty) {
throw KeyNotFoundException(
'shared key not found. data sharing is forbidden.');
}
//3. Encrypt shared key with public key of sharedWith atsign and store
var encryptedSharedKey =
EncryptionUtil.encryptKey(sharedKey, sharedWithPublicKey);
EncryptionUtil.encryptKey(sharedKey, sharedWithPublicKey);

var updateSharedKeyBuilder = UpdateVerbBuilder()
..sharedWith = sharedWith
Expand All @@ -233,18 +240,15 @@ class EncryptionService {
throw KeyNotFoundException('encryption public key not found');
}
var encryptedSharedKeyForCurrentAtSign =
EncryptionUtil.encryptKey(sharedKey, currentAtSignPublicKey);
EncryptionUtil.encryptKey(sharedKey, currentAtSignPublicKey);

var updateSharedKeyForCurrentAtSignBuilder = UpdateVerbBuilder()
..sharedBy = currentAtSign
..atKey = '$AT_ENCRYPTION_SHARED_KEY.$sharedWithUser'
..value = encryptedSharedKeyForCurrentAtSign;
await localSecondary!
.executeVerb(updateSharedKeyForCurrentAtSignBuilder, sync: true);

//5. Encrypt value using sharedKey
var encryptedValue = EncryptionUtil.encryptBytes(value, sharedKey);
return encryptedValue;
return sharedKey;
}

List<int> decryptStream(List<int> encryptedValue, String sharedKey) {
Expand Down Expand Up @@ -293,6 +297,7 @@ class EncryptionService {
return base64Encode(dataSignature);
}

/// This function was implemented to migrate old unencrypted keys. No longer applicable.
@deprecated
Future<void> encryptUnencryptedData() async {
var atClient = await (AtClientImpl.getClient(currentAtSign));
Expand Down Expand Up @@ -389,7 +394,9 @@ class EncryptionService {
..atKey = AT_ENCRYPTION_SHARED_KEY;
encryptedSharedKey =
await localSecondary!.executeVerb(localLookupSharedKeyBuilder);
print('encryptedSharedKey from local: $encryptedSharedKey');
if (encryptedSharedKey == null || encryptedSharedKey == 'data:null') {
print('getting encrypted shared key from remote');
var sharedKeyLookUpBuilder = LookupVerbBuilder()
..atKey = AT_ENCRYPTION_SHARED_KEY
..sharedBy = sharedBy
Expand Down
8 changes: 8 additions & 0 deletions at_client/lib/src/service/stream_service.dart
Original file line number Diff line number Diff line change
@@ -0,0 +1,8 @@
import 'package:at_client/src/stream/at_stream.dart';
abstract class StreamService {

/// Create a stream for a given [streamType]. If your app is sending a file through stream
/// then pass [StreamType.SEND]. If your app is receiving a file pass [StreamType.RECEIVE].
/// Optionally pass [streamId] if you want to create a stream for a known stream transfer.
AtStream createStream(StreamType streamType, {String? streamId});
}
Loading