Skip to content

Commit

Permalink
Avoid using late var, avoid sending malformed token in test (#64)
Browse files Browse the repository at this point in the history
  • Loading branch information
FZambia authored Jul 28, 2022
1 parent eb31740 commit 81405fc
Show file tree
Hide file tree
Showing 5 changed files with 49 additions and 36 deletions.
7 changes: 7 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
@@ -1,3 +1,10 @@
## [0.9.1]

* Fix setting initial connection token
* Fix setting connection data
* Fix LateInitializationError if transport is not initialized yet
* Better string representation of `UnsubscribedEvent`

## [0.9.0]

**Breaking changes**
Expand Down
17 changes: 9 additions & 8 deletions example/console/example.dart
Original file line number Diff line number Diff line change
Expand Up @@ -16,11 +16,12 @@ void main() async {
final client = centrifuge.createClient(
url,
centrifuge.ClientConfig(
// Uncomment to use example token based on secret key `secret`.
// token:
// 'eyJ0eXAiOiJKV1QiLCJhbGciOiJIUzI1NiJ9.eyJzdWIiOiJ0ZXN0c3VpdGVfand0In0.hPmHsVqvtY88PvK4EmJlcdwNuKFuy3BGaF7dMaKdPlw',
// Uncomment to use example token based on secret key `secret` for user `testsuite_jwt`.
token:
'eyJ0eXAiOiJKV1QiLCJhbGciOiJIUzI1NiJ9.eyJzdWIiOiJ0ZXN0c3VpdGVfand0In0.hPmHsVqvtY88PvK4EmJlcdwNuKFuy3BGaF7dMaKdPlw',
// getToken: (centrifuge.ConnectionTokenEvent event) {
// return Future.value('');
// return Future.value(
// 'eyJ0eXAiOiJKV1QiLCJhbGciOiJIUzI1NiJ9.eyJzdWIiOiJ0ZXN0c3VpdGVfand0In0.hPmHsVqvtY88PvK4EmJlcdwNuKFuy3BGaF7dMaKdPlw');
// },
headers: <String, dynamic>{'X-Example-Header': 'example'},
),
Expand All @@ -45,10 +46,10 @@ void main() async {
final subscription = client.newSubscription(
channel,
centrifuge.SubscriptionConfig(
// getToken: (centrifuge.SubscriptionTokenEvent event) {
// return Future.value('');
// },
),
getToken: (centrifuge.SubscriptionTokenEvent event) {
return Future.value('');
},
),
);

final onSubscriptionEvent = (dynamic event) async {
Expand Down
55 changes: 30 additions & 25 deletions lib/src/client.dart
Original file line number Diff line number Diff line change
Expand Up @@ -107,7 +107,7 @@ class ClientImpl implements Client {
final _subscriptions = <String, SubscriptionImpl>{};
final _serverSubs = <String, ServerSubscription>{};

late Transport _transport;
Transport? _transport;

final String _url;
ClientConfig _config;
Expand Down Expand Up @@ -195,7 +195,7 @@ class ClientImpl implements Client {
Future<void> disconnect() async {
_reconnectAttempts = 0;
_processDisconnect(code: disconnectedCodeDisconnectCalled, reason: 'disconnect called', reconnect: false);
await _transport.close();
await _transport?.close();
}

@override
Expand All @@ -217,7 +217,7 @@ class ClientImpl implements Client {
final request = protocol.PublishRequest()
..channel = channel
..data = data;
final result = await _transport.sendMessage(
final result = await _transport!.sendMessage(
request,
protocol.PublishResult(),
);
Expand All @@ -230,7 +230,7 @@ class ClientImpl implements Client {
final request = protocol.RPCRequest();
request.method = method;
request.data = data;
final result = await _transport.sendMessage(request, protocol.RPCResult());
final result = await _transport!.sendMessage(request, protocol.RPCResult());
return RPCResult.from(result);
}

Expand All @@ -247,7 +247,7 @@ class ClientImpl implements Client {
sp.epoch = since.epoch;
request.since = sp;
}
final result = await _transport.sendMessage(
final result = await _transport!.sendMessage(
request,
protocol.HistoryResult(),
);
Expand All @@ -258,7 +258,7 @@ class ClientImpl implements Client {
Future<PresenceResult> presence(String channel) async {
await ready().timeout(_config.timeout);
final request = protocol.PresenceRequest()..channel = channel;
final result = await _transport.sendMessage(
final result = await _transport!.sendMessage(
request,
protocol.PresenceResult(),
);
Expand All @@ -269,7 +269,7 @@ class ClientImpl implements Client {
Future<PresenceStatsResult> presenceStats(String channel) async {
await ready().timeout(_config.timeout);
final request = protocol.PresenceStatsRequest()..channel = channel;
final result = await _transport.sendMessage(
final result = await _transport!.sendMessage(
request,
protocol.PresenceStatsResult(),
);
Expand All @@ -280,7 +280,7 @@ class ClientImpl implements Client {
Future<void> send(List<int> data) async {
await ready().timeout(_config.timeout);
final request = protocol.Message()..data = data;
await _transport.sendAsyncMessage(request);
await _transport!.sendAsyncMessage(request);
}

@override
Expand Down Expand Up @@ -360,7 +360,10 @@ class ClientImpl implements Client {

void _failUnauthorized() {
_processDisconnect(code: disconnectedCodeUnauthorized, reason: 'unauthorized', reconnect: false);
_transport.close();
if (_transport == null) {
return;
}
_transport!.close();
}

void _scheduleReconnect() {
Expand Down Expand Up @@ -392,7 +395,9 @@ class ClientImpl implements Client {
} catch (ex) {
final event = ErrorEvent(RefreshError(ex));
_errorController.add(event);
await _transport.close();
if (_transport != null) {
await _transport!.close();
}
_scheduleReconnect();
return;
}
Expand All @@ -402,14 +407,14 @@ class ClientImpl implements Client {
url: _url, config: TransportConfig(headers: _config.headers, timeout: _config.timeout));

try {
await _transport.open(_onPush, onError: (dynamic error) {
await _transport!.open(_onPush, onError: (dynamic error) {
final event = ErrorEvent(TransportError(error));
_errorController.add(event);
if (state != State.connected) {
return;
}
_processDisconnect(code: connectingCodeTransportClosed, reason: "connection closed", reconnect: true);
_transport.close();
_transport!.close();
}, onDone: (code, reason, reconnect) {
if (state == State.disconnected) {
return;
Expand All @@ -419,7 +424,7 @@ class ClientImpl implements Client {
} catch (ex) {
final event = ErrorEvent(TransportError(ex));
_errorController.add(event);
await _transport.close();
await _transport!.close();
_scheduleReconnect();
return;
}
Expand All @@ -445,7 +450,7 @@ class ClientImpl implements Client {
}

try {
final result = await _transport.sendMessage(
final result = await _transport!.sendMessage(
request,
protocol.ConnectResult(),
);
Expand Down Expand Up @@ -507,11 +512,11 @@ class ClientImpl implements Client {
return;
} else if (!err.temporary) {
_processDisconnect(code: err.code, reason: err.message, reconnect: false);
_transport.close();
_transport!.close();
return;
} else {
_processDisconnect(code: err.code, reason: err.message, reconnect: false);
_transport.close();
_transport!.close();
return;
}
} else {
Expand All @@ -527,7 +532,7 @@ class ClientImpl implements Client {
return;
}
processDisconnect(code: connectingCodeNoPing, reason: 'no ping', reconnect: true);
await _transport.close();
await _transport!.close();
});
}

Expand Down Expand Up @@ -565,7 +570,7 @@ class ClientImpl implements Client {
}

try {
final result = await _transport.sendMessage(
final result = await _transport!.sendMessage(
request,
protocol.RefreshResult(),
);
Expand Down Expand Up @@ -595,7 +600,7 @@ class ClientImpl implements Client {
return;
}
_processDisconnect(code: err.code, reason: err.message, reconnect: false);
_transport.close();
_transport!.close();
return;
}
_refreshTimer = Timer(backoffDelay(0, Duration(seconds: 5), Duration(seconds: 10)), () {
Expand Down Expand Up @@ -673,7 +678,7 @@ class ClientImpl implements Client {
final code = disconnect.code;
final bool reconnect = code < 3500 || code >= 5000 || (code >= 4000 && code < 4500);
_processDisconnect(code: disconnect.code, reason: disconnect.reason, reconnect: reconnect);
_transport.close();
_transport!.close();
}

void _handleSubscribe(String channel, protocol.Subscribe subscribe) {
Expand Down Expand Up @@ -706,7 +711,7 @@ class ClientImpl implements Client {
_setPingTimer();

if (_sendPong) {
_transport.sendAsyncMessage(
_transport!.sendAsyncMessage(
protocol.Command(),
);
}
Expand Down Expand Up @@ -736,23 +741,23 @@ class ClientImpl implements Client {

@internal
Future<protocol.UnsubscribeResult> sendUnsubscribe(protocol.UnsubscribeRequest request) async {
return await _transport.sendMessage(
return await _transport!.sendMessage(
request,
protocol.UnsubscribeResult(),
);
}

@internal
Future<protocol.SubscribeResult> sendSubscribe(protocol.SubscribeRequest request) async {
return await _transport.sendMessage(
return await _transport!.sendMessage(
request,
protocol.SubscribeResult(),
);
}

@internal
Future<protocol.SubRefreshResult> sendSubRefresh(protocol.SubRefreshRequest request) async {
return await _transport.sendMessage(
return await _transport!.sendMessage(
request,
protocol.SubRefreshResult(),
);
Expand All @@ -764,7 +769,7 @@ class ClientImpl implements Client {
}

@internal
void closeTransport() async => await _transport.close();
void closeTransport() async => await _transport!.close();
}

final _random = new Random();
Expand Down
2 changes: 1 addition & 1 deletion lib/src/events.dart
Original file line number Diff line number Diff line change
Expand Up @@ -373,7 +373,7 @@ class UnsubscribedEvent {

@override
String toString() {
return 'UnsubscribedEvent{}';
return 'UnsubscribedEvent{code $code, reason: $reason}';
}
}

Expand Down
4 changes: 2 additions & 2 deletions test/client_test.dart
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@ void main() {
test('connect emits error event when url invalid', () async {
final client = centrifuge.createClient(
"invalid",
centrifuge.ClientConfig(token: "test", data: utf8.encode('test connect data')),
centrifuge.ClientConfig(data: utf8.encode('test connect data')),
);
final errorFinish = client.error.first;
client.connect();
Expand All @@ -31,7 +31,7 @@ void main() {
test('can connect and disconnect', () async {
final client = centrifuge.createClient(
url,
centrifuge.ClientConfig(token: "test", data: utf8.encode('test connect data')),
centrifuge.ClientConfig(data: utf8.encode('test connect data')),
);
final connectFinish = client.connected.first;
client.connect();
Expand Down

0 comments on commit 81405fc

Please sign in to comment.