Skip to content

Commit

Permalink
Merge pull request #3829 from canonical/fix-http2-connection-disconne…
Browse files Browse the repository at this point in the history
…cted

Fix `Connection is being forcefully terminated` error in GUI
  • Loading branch information
andrei-toterman authored Dec 20, 2024
2 parents 1c0f995 + 7f7d7fd commit b240d73
Show file tree
Hide file tree
Showing 2 changed files with 97 additions and 127 deletions.
221 changes: 97 additions & 124 deletions src/client/gui/lib/grpc_client.dart
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
import 'dart:async';
import 'dart:io';

import 'package:async/async.dart';
import 'package:fpdart/fpdart.dart';
import 'package:grpc/grpc.dart';
import 'package:protobuf/protobuf.dart' hide RpcClient;
Expand Down Expand Up @@ -91,189 +91,148 @@ class GrpcClient {
}
}

Future<Rep?> doRpc<Req extends RpcMessage, Rep extends RpcMessage>(
ResponseStream<Rep> Function(Stream<Req> request) action,
Req request, {
bool checkUpdates = false,
bool log = true,
}) {
if (log) logger.i('Sent ${request.repr}');
Stream<Rep> replyStream = action(Stream.value(request));
if (checkUpdates) replyStream = replyStream.doOnData(checkForUpdate);
if (log) replyStream = replyStream.doOnEach(logGrpc(request));
return replyStream.lastOrNull;
}

Future<StartReply?> start(Iterable<String> names) {
final request = StartRequest(
instanceNames: InstanceNames(instanceName: names),
return doRpc(
_client.start,
StartRequest(instanceNames: InstanceNames(instanceName: names)),
checkUpdates: true,
);
logger.i('Sent ${request.repr}');
return _client
.start(Stream.value(request))
.doOnData(checkForUpdate)
.doOnEach(logGrpc(request))
.firstOrNull;
}

Future<StopReply?> stop(Iterable<String> names) {
final request = StopRequest(
instanceNames: InstanceNames(instanceName: names),
return doRpc(
_client.stop,
StopRequest(instanceNames: InstanceNames(instanceName: names)),
);
logger.i('Sent ${request.repr}');
return _client
.stop(Stream.value(request))
.doOnEach(logGrpc(request))
.firstOrNull;
}

Future<SuspendReply?> suspend(Iterable<String> names) {
final request = SuspendRequest(
instanceNames: InstanceNames(instanceName: names),
return doRpc(
_client.suspend,
SuspendRequest(instanceNames: InstanceNames(instanceName: names)),
);
logger.i('Sent ${request.repr}');
return _client
.suspend(Stream.value(request))
.doOnEach(logGrpc(request))
.firstOrNull;
}

Future<RestartReply?> restart(Iterable<String> names) {
final request = RestartRequest(
instanceNames: InstanceNames(instanceName: names),
return doRpc(
_client.restart,
RestartRequest(instanceNames: InstanceNames(instanceName: names)),
checkUpdates: true,
);
logger.i('Sent ${request.repr}');
return _client
.restart(Stream.value(request))
.doOnData(checkForUpdate)
.doOnEach(logGrpc(request))
.firstOrNull;
}

Future<DeleteReply?> delete(Iterable<String> names) {
final request = DeleteRequest(
instanceSnapshotPairs: names.map(
(name) => InstanceSnapshotPair(instanceName: name),
return doRpc(
_client.delet,
DeleteRequest(
instanceSnapshotPairs: names.map(
(name) => InstanceSnapshotPair(instanceName: name),
),
),
);
logger.i('Sent ${request.repr}');
return _client
.delet(Stream.value(request))
.doOnEach(logGrpc(request))
.firstOrNull;
}

Future<RecoverReply?> recover(Iterable<String> names) {
final request = RecoverRequest(
instanceNames: InstanceNames(instanceName: names),
return doRpc(
_client.recover,
RecoverRequest(instanceNames: InstanceNames(instanceName: names)),
);
logger.i('Sent ${request.repr}');
return _client
.recover(Stream.value(request))
.doOnEach(logGrpc(request))
.firstOrNull;
}

Future<DeleteReply?> purge(Iterable<String> names) {
final request = DeleteRequest(
instanceSnapshotPairs: names.map(
(name) => InstanceSnapshotPair(instanceName: name),
return doRpc(
_client.delet,
DeleteRequest(
purge: true,
instanceSnapshotPairs: names.map(
(name) => InstanceSnapshotPair(instanceName: name),
),
),
purge: true,
);
logger.i('Sent ${request.repr}');
return _client
.delet(Stream.value(request))
.doOnEach(logGrpc(request))
.firstOrNull;
}

Future<List<VmInfo>> info([Iterable<String> names = const []]) {
final request = InfoRequest(
instanceSnapshotPairs: names.map(
(name) => InstanceSnapshotPair(instanceName: name),
return doRpc(
_client.info,
checkUpdates: true,
log: false,
InfoRequest(
instanceSnapshotPairs: names.map(
(name) => InstanceSnapshotPair(instanceName: name),
),
),
);
return _client
.info(Stream.value(request))
.doOnData(checkForUpdate)
.last
.then((r) => r.details.toList());
).then((r) => r!.details.toList());
}

Future<MountReply?> mount(MountRequest request) {
logger.i('Sent ${request.repr}');
return _client
.mount(Stream.value(request))
.doOnEach(logGrpc(request))
.firstOrNull;
return doRpc(_client.mount, request);
}

Future<void> umount(String name, [String? path]) {
final request = UmountRequest(
targetPaths: [TargetPathInfo(instanceName: name, targetPath: path)],
return doRpc(
_client.umount,
UmountRequest(
targetPaths: [TargetPathInfo(instanceName: name, targetPath: path)],
),
);
logger.i('Sent ${request.repr}');
return _client
.umount(Stream.value(request))
.doOnEach(logGrpc(request))
.firstOrNull;
}

Future<FindReply> find({bool images = true, bool blueprints = true}) {
final request = FindRequest(
showImages: images,
showBlueprints: blueprints,
);
logger.i('Sent ${request.repr}');
return _client.find(Stream.value(request)).doOnEach(logGrpc(request)).last;
return doRpc(
_client.find,
FindRequest(
showImages: images,
showBlueprints: blueprints,
),
).then((r) => r!);
}

Future<List<NetInterface>> networks() {
final request = NetworksRequest();
logger.i('Sent ${request.repr}');
return _client
.networks(Stream.value(request))
.doOnData(checkForUpdate)
.doOnEach(logGrpc(request))
.last
.then((r) => r.interfaces);
return doRpc(
_client.networks,
NetworksRequest(),
checkUpdates: true,
).then((r) => r!.interfaces);
}

Future<String> version() {
final request = VersionRequest();
logger.i('Sent ${request.repr}');
return _client
.version(Stream.value(request))
.doOnData(checkForUpdate)
.doOnEach(logGrpc(request))
.last
.then((reply) => reply.version);
return doRpc(
_client.version,
VersionRequest(),
checkUpdates: true,
).then((r) => r!.version);
}

Future<String> get(String key) {
final request = GetRequest(key: key);
logger.i('Sent ${request.repr}');
return _client
.get(Stream.value(request))
.doOnEach(logGrpc(request))
.last
.then((reply) => reply.value);
return doRpc(_client.get, GetRequest(key: key)).then((r) => r!.value);
}

Future<void> set(String key, String value) {
final request = SetRequest(key: key, val: value);
logger.i('Sent ${request.repr}');
return _client
.set(Stream.value(request))
.doOnEach(logGrpc(request))
.firstOrNull;
return doRpc(_client.set, SetRequest(key: key, val: value));
}

Future<SSHInfo?> sshInfo(String name) {
final request = SSHInfoRequest(instanceName: [name]);
logger.i('Sent ${request.repr}');
return _client
.ssh_info(Stream.value(request))
.doOnEach(logGrpc(request))
.first
.then((reply) => reply.sshInfo[name]);
return doRpc(
_client.ssh_info,
SSHInfoRequest(instanceName: [name]),
).then((r) => r!.sshInfo[name]);
}

Future<DaemonInfoReply> daemonInfo() {
final request = DaemonInfoRequest();
logger.i('Sent ${request.repr}');
return _client
.daemon_info(Stream.value(request))
.doOnEach(logGrpc(request))
.last;
return doRpc(_client.daemon_info, DaemonInfoRequest()).then((r) => r!);
}
}

Expand All @@ -299,3 +258,17 @@ class CustomChannelCredentials extends ChannelCredentials {
return ctx;
}
}

extension<T> on Stream<T> {
Future<T?> get lastOrNull {
final completer = Completer<T?>.sync();
T? result;
listen(
(event) => result = event,
onError: completer.completeError,
onDone: () => completer.complete(result),
cancelOnError: true,
);
return completer.future;
}
}
3 changes: 0 additions & 3 deletions src/client/gui/lib/providers.dart
Original file line number Diff line number Diff line change
Expand Up @@ -68,9 +68,6 @@ final daemonAvailableProvider = Provider((ref) {
if (message.contains('failed to obtain exit status for remote process')) {
return true;
}
if (message.contains('Connection is being forcefully terminated')) {
return true;
}
}
return false;
});
Expand Down

0 comments on commit b240d73

Please sign in to comment.