Skip to content

Commit

Permalink
RSDK-7045 Implement StreamTicks (#189)
Browse files Browse the repository at this point in the history
* service

* stream ticks

* override

* comment

* PR feedback

* add extra
  • Loading branch information
oliviamiller authored Apr 10, 2024
1 parent ebb5d2b commit 8037f7b
Show file tree
Hide file tree
Showing 4 changed files with 129 additions and 4 deletions.
19 changes: 18 additions & 1 deletion lib/src/components/board/board.dart
Original file line number Diff line number Diff line change
@@ -1,7 +1,9 @@
import 'dart:collection';

import 'package:fixnum/fixnum.dart';

import '../../gen/common/v1/common.pb.dart' as common;
import '../../gen/component/board/v1/board.pbenum.dart';
import '../../gen/component/board/v1/board.pbgrpc.dart';
import '../../resource/base.dart';
import '../../robot/client.dart';

Expand All @@ -26,6 +28,15 @@ class BoardStatus {
}
}

/// Tick of a digital interrupt
class Tick {
String pinName;
bool high;
Int64 time;

Tick({required this.pinName, required this.high, required this.time});
}

/// Board represents a physical general purpose compute board that contains various
/// components such as analog readers, and digital interrupts.
abstract class Board extends Resource {
Expand Down Expand Up @@ -58,6 +69,12 @@ abstract class Board extends Resource {
/// Return the current value of the interrupt which is based on the type of Interrupt.
Future<int> digitalInterruptValue(String digitalInterruptName, {Map<String, dynamic>? extra});

// Stream digital interrupts ticks.
Stream<Tick> streamTicks(List<String> interrupts, {Map<String, dynamic>? extra});

// addCallbacks adds a listener for the digital interrupts.
Future<void> addCallbacks(List<String> interrupts, Queue<Tick> tickQueue, {Map<String, dynamic>? extra});

/// Set the board to the indicated power mode.
Future<void> setPowerMode(PowerMode powerMode, int seconds, int nanos, {Map<String, dynamic>? extra});

Expand Down
20 changes: 20 additions & 0 deletions lib/src/components/board/client.dart
Original file line number Diff line number Diff line change
@@ -1,3 +1,6 @@
import 'dart:async';
import 'dart:collection';

import 'package:fixnum/fixnum.dart';
import 'package:grpc/grpc_connection_interface.dart';

Expand Down Expand Up @@ -120,6 +123,23 @@ class BoardClient extends Board implements ResourceRPCClient {
return response.value.toInt();
}

@override
Future<void> addCallbacks(List<String> interrupts, Queue<Tick> tickQueue, {Map<String, dynamic>? extra}) async {
// addCallbacks not implemented on client side since it is a helper for StreamTicks.
throw UnimplementedError();
}

@override
Stream<Tick> streamTicks(List<String> interrupts, {Map<String, dynamic>? extra}) {
final response = client.streamTicks(StreamTicksRequest()
..name = name
..pinNames.addAll(interrupts)
..extra = extra?.toStruct() ?? Struct());

final stream = response.map((resp) => Tick(pinName: resp.pinName, high: resp.high, time: resp.time));
return stream.asBroadcastStream(onCancel: (_) => response.cancel());
}

@override
Future<void> setPowerMode(PowerMode powerMode, int seconds, int nanos, {Map<String, dynamic>? extra}) async {
final duration = grpc_duration.Duration()
Expand Down
23 changes: 20 additions & 3 deletions lib/src/components/board/service.dart
Original file line number Diff line number Diff line change
@@ -1,3 +1,5 @@
import 'dart:collection';

import 'package:fixnum/fixnum.dart';
import 'package:grpc/grpc.dart';

Expand Down Expand Up @@ -112,8 +114,23 @@ class BoardService extends BoardServiceBase {
}

@override
Stream<StreamTicksResponse> streamTicks(ServiceCall call, StreamTicksRequest request) {
// TODO: implement streamTicks
throw UnimplementedError();
Stream<StreamTicksResponse> streamTicks(ServiceCall call, StreamTicksRequest request) async* {
final board = _fromManager(request.name);

final ticks = Queue<Tick>();
await board.addCallbacks(request.pinNames, ticks);

try {
while (true) {
await Future.delayed(const Duration(microseconds: 1));
if (ticks.isNotEmpty) {
final tick = ticks.first;
ticks.removeFirst();
yield StreamTicksResponse(pinName: tick.pinName, high: tick.high, time: tick.time);
}
}
} catch (error) {
rethrow;
}
}
}
71 changes: 71 additions & 0 deletions test/unit_test/components/board_test.dart
Original file line number Diff line number Diff line change
@@ -1,3 +1,5 @@
import 'dart:collection';

import 'package:fixnum/fixnum.dart';
import 'package:flutter_test/flutter_test.dart';
import 'package:grpc/grpc.dart';
Expand All @@ -16,6 +18,7 @@ class FakeBoard extends Board {
final Map<String, int> analogMap = {'pin': 0};
final BoardStatus boardStatus = const BoardStatus({'1': 0}, {'1': 0});
PowerMode powerMode = PowerMode.POWER_MODE_NORMAL;
final Map<String, Queue<Tick>> tickCallbackMap = {};
Map<String, dynamic>? extra;

@override
Expand Down Expand Up @@ -94,6 +97,24 @@ class FakeBoard extends Board {
this.extra = extra;
analogMap[pin] = value;
}

@override
// Stream digital interrupts ticks.
Stream<Tick> streamTicks(List<String> interrupts, {Map<String, dynamic>? extra}) {
throw UnimplementedError();
}

@override
Future<void> addCallbacks(List<String> interrupts, Queue<Tick> tickQueue, {Map<String, dynamic>? extra}) async {
for (final i in interrupts) {
tickCallbackMap[i] = tickQueue;
}
}

Future<void> tick(Tick tick) async {
final queue = tickCallbackMap[tick.pinName];
queue?.add(tick);
}
}

void main() {
Expand All @@ -115,6 +136,13 @@ void main() {
expect(await board.digitalInterruptValue('1'), expected);
});

test('addCallbacks', () async {
final tickQueue = Queue<Tick>();
final interrupts = ['1'];
await board.addCallbacks(interrupts, tickQueue);
expect(board.tickCallbackMap['1'], tickQueue);
});

test('gpio', () async {
const expected = false;
expect(await board.gpio('pin'), expected);
Expand Down Expand Up @@ -223,6 +251,29 @@ void main() {
expect(response.value.toInt(), expected);
});

test('streamTicks', () async {
final client = BoardServiceClient(channel);

final request = StreamTicksRequest()
..name = name
..pinNames.add('1');

final stream = client.streamTicks(request);

// Give time for server to start streaming.
await Future.delayed(const Duration(milliseconds: 100));

final tick1 = Tick(pinName: '1', high: true, time: Int64(1000));
await board.tick(tick1);
await for (var resp in stream) {
expect(resp.pinName, '1');
expect(resp.high, true);
expect(resp.time, Int64(1000));
break;
}
await stream.cancel();
});

test('gpio', () async {
final client = BoardServiceClient(channel);
const expected = false;
Expand Down Expand Up @@ -345,6 +396,26 @@ void main() {
const expected = 0;
expect(await client.digitalInterruptValue('1'), Int64(expected));
});
test('streamTicks', () async {
final client = BoardClient(name, channel);

final stream = client.streamTicks(['1']);

// Give time for server to start streaming.
await Future.delayed(const Duration(milliseconds: 100));

final testTick = Tick(pinName: '1', high: true, time: Int64(1000));
await board.tick(testTick);

final sub = stream.listen(null);

sub.onData((tick) async {
expect(tick.pinName, testTick.pinName);
expect(tick.high, testTick.high);
expect(tick.time, testTick.time);
await sub.cancel();
});
});

test('gpio', () async {
final client = BoardClient(name, channel);
Expand Down

0 comments on commit 8037f7b

Please sign in to comment.