Skip to content

Commit

Permalink
Merge pull request #1871 from atsign-foundation/gkc/fix-async-socket-…
Browse files Browse the repository at this point in the history
…exceptions
  • Loading branch information
gkc authored Mar 30, 2024
2 parents 3cc839f + c6458f6 commit 153eb53
Show file tree
Hide file tree
Showing 17 changed files with 77 additions and 39 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -37,12 +37,13 @@ abstract class BaseSocketConnection<T extends Socket> extends AtConnection {
T get underlying => _socket;

@override
void write(String data) {
Future<void> write(String data) async {
if (isInValid()) {
throw ConnectionInvalidException('Connection is invalid');
}
try {
underlying.write(data);
await underlying.flush();
metaData.lastAccessed = DateTime.now().toUtc();
} on Exception catch (e) {
metaData.isStale = true;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -43,7 +43,7 @@ class DummyInboundConnection implements InboundConnection {

String? lastWrittenData;
@override
void write(String data) {
Future<void> write(String data) async {
lastWrittenData = data;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -86,6 +86,17 @@ class InboundConnectionImpl<T extends Socket> extends BaseSocketConnection
maxRequestsPerTimeFrame = AtSecondaryConfig.maxEnrollRequestsAllowed;
timeFrameInMillis = AtSecondaryConfig.timeFrameInMills;
requestTimestampQueue = Queue();

logger.info(logger.getAtConnectionLogMessage(
metaData, 'New connection ('
'this side: ${underlying.address}:${underlying.port}'
' remote side: ${underlying.remoteAddress}:${underlying.remotePort}'
')'));

socket.done.onError((error, stackTrace) {
logger.info('socket.done.onError called with $error. Calling this.close()');
this.close();
});
}

/// Returns true if the underlying socket is not null and socket's remote address and port match.
Expand Down Expand Up @@ -221,11 +232,12 @@ class InboundConnectionImpl<T extends Socket> extends BaseSocketConnection
}

try {
var address = underlying.remoteAddress;
var port = underlying.remotePort;
logger.info(logger.getAtConnectionLogMessage(
metaData, 'destroying socket ('
'this side: ${underlying.address}:${underlying.port}'
' remote side: ${underlying.remoteAddress}:${underlying.remotePort}'
')'));
underlying.destroy();
logger.finer(logger.getAtConnectionLogMessage(
metaData, '$address:$port Disconnected'));
} catch (_) {
// Ignore exception on a connection close
metaData.isStale = true;
Expand All @@ -235,8 +247,8 @@ class InboundConnectionImpl<T extends Socket> extends BaseSocketConnection
}

@override
void write(String data) {
super.write(data);
Future<void> write(String data) async {
await super.write(data);
if (metaData is InboundConnectionMetadata) {
logger.info(logger.getAtConnectionLogMessage(
metaData, 'SENT: ${BaseSocketConnection.truncateForLogging(data)}'));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -106,13 +106,12 @@ class InboundMessageListener {

/// Closes the [InboundConnection]
Future<void> _finishedHandler() async {
logger.info('_finishedHandler called - closing connection');
await _closeConnection();
}

Future<void> _closeConnection() async {
if (!connection.isInValid()) {
await connection.close();
}
await connection.close();
// Removes the connection from the InboundConnectionPool.
InboundConnectionPool.getInstance().remove(connection);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -177,7 +177,7 @@ class OutboundClient {
}
try {
//1. create from request
outboundConnection!.write(AtRequestFormatter.createFromRequest(
await outboundConnection!.write(AtRequestFormatter.createFromRequest(
AtSecondaryServerImpl.getInstance().currentAtSign));

//2. Receive proof
Expand All @@ -200,7 +200,7 @@ class OutboundClient {
}

//4. Create pol request
outboundConnection!.write(AtRequestFormatter.createPolRequest());
await outboundConnection!.write(AtRequestFormatter.createPolRequest());

// 5. wait for handshake result - @<current_atsign>@
var handShakeResult = await messageListener.read();
Expand Down Expand Up @@ -242,7 +242,7 @@ class OutboundClient {
}
var lookUpRequest = AtRequestFormatter.createLookUpRequest(key);
try {
outboundConnection!.write(lookUpRequest);
await outboundConnection!.write(lookUpRequest);
} on AtIOException catch (e) {
await outboundConnection!.close();
throw LookupException(
Expand Down Expand Up @@ -271,7 +271,7 @@ class OutboundClient {
scanRequest = 'scan $regex\n';
}
try {
outboundConnection!.write(scanRequest);
await outboundConnection!.write(scanRequest);
} on AtIOException catch (e) {
await outboundConnection!.close();
throw LookupException(
Expand Down Expand Up @@ -326,7 +326,7 @@ class OutboundClient {
}
try {
var notificationRequest = 'notify:$notifyCommandBody\n';
outboundConnection!.write(notificationRequest);
await outboundConnection!.write(notificationRequest);
} on AtIOException catch (e) {
await outboundConnection!.close();
throw LookupException(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,17 @@ class OutboundConnectionImpl<T extends Socket>
..toAtSign = toAtSign
..created = DateTime.now().toUtc()
..isCreated = true;

logger.info(logger.getAtConnectionLogMessage(
metaData, 'New connection ('
'this side: ${underlying.address}:${underlying.port}'
' remote side: ${underlying.remoteAddress}:${underlying.remotePort}'
')'));

socket.done.onError((error, stackTrace) {
logger.info('socket.done.onError called with $error. Calling this.close()');
this.close();
});
}

int _getIdleTimeMillis() {
Expand Down Expand Up @@ -47,10 +58,12 @@ class OutboundConnectionImpl<T extends Socket>

try {
var socket = underlying;
var address = socket.remoteAddress;
var port = socket.remotePort;
logger.info(logger.getAtConnectionLogMessage(
metaData, 'destroying socket ('
'this side: ${underlying.address}:${underlying.port}'
' remote side: ${underlying.remoteAddress}:${underlying.remotePort}'
')'));
socket.destroy();
logger.finer('$address:$port Disconnected');
} catch (_) {
// Ignore exception on a connection close
metaData.isStale = true;
Expand All @@ -60,8 +73,8 @@ class OutboundConnectionImpl<T extends Socket>
}

@override
void write(String data) {
super.write(data);
Future<void> write(String data) async {
await super.write(data);
logger.info(logger.getAtConnectionLogMessage(
metaData, 'SENT: ${BaseSocketConnection.truncateForLogging(data)}'));
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -132,6 +132,7 @@ class OutboundMessageListener {

/// Closes the [OutboundClient]
void _finishedHandler() async {
logger.info('_finishedHandler called - closing connection');
_closeOutboundClient();
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -352,8 +352,8 @@ class AtSecondaryServerImpl implements AtSecondaryServer {
Future<void> initDynamicConfigListeners() async {
//only works if testingMode is set to true
if (AtSecondaryConfig.testingMode) {
logger.warning(
'UNSAFE: testingMode in config.yaml is set to true. Please set to false if not required.');
logger.warning('testingMode in config.yaml is set to true.'
' Please set to false if not required.');

//subscriber for inbound_max_limit change
logger.finest('Subscribing to dynamic changes made to inbound_max_limit');
Expand Down Expand Up @@ -442,7 +442,7 @@ class AtSecondaryServerImpl implements AtSecondaryServer {
/// Throws [Exception] for any other exceptions.
/// @param - ServerSocket
void _listen(var serverSocket) {
logger.finer('serverSocket _listen : ${serverSocket.runtimeType}');
logger.info('serverSocket _listen : ${serverSocket.runtimeType}');
serverSocket.listen(((clientSocket) {
var sessionID = '_${Uuid().v4()}';
InboundConnection? connection;
Expand All @@ -459,8 +459,8 @@ class AtSecondaryServerImpl implements AtSecondaryServer {
.handle(e, atConnection: connection, clientSocket: clientSocket);
}
}), onError: (error) {
// We've got no action to take here, let's just log a warning
logger.warning("ServerSocket.listen called onError with '$error'");
// We've got no action to take here, let's just log a message
logger.info("ServerSocket.listen called onError with '$error'");
});
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -177,8 +177,8 @@ class MonitorVerbHandler extends AbstractVerbHandler {
(notification.notification!.contains(RegExp(regex)) ||
(notification.fromAtSign != null &&
notification.fromAtSign!.contains(RegExp(regex))))) {
atConnection
.write('notification: ${jsonEncode(notification.toJson())}\n');
await atConnection.write('notification:'
' ${jsonEncode(notification.toJson())}\n');
}
} on FormatException {
logger.severe('Invalid regular expression : $regex');
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,7 @@ abstract class BaseResponseHandler implements ResponseHandler {
} else {
responseMessage = getResponseMessage(result, prompt)!;
}
connection.write(responseMessage);
await connection.write(responseMessage);
} on Exception catch (e, st) {
logger.severe('exception in writing response to socket:${e.toString()}');
await GlobalExceptionHandler.getInstance()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -59,7 +59,7 @@ class StreamVerbHandler extends AbstractVerbHandler {
logger.severe('sender connection is null for stream id:$streamId');
throw UnAuthenticatedException('Invalid stream id');
}
StreamManager.senderSocketMap[streamId]!
await StreamManager.senderSocketMap[streamId]!
.write('stream:done $streamId\n');
_cleanUp(streamId);
break;
Expand Down
2 changes: 1 addition & 1 deletion packages/at_secondary_server/pubspec.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@ dependencies:
at_utils: 3.0.16
at_chops: 2.0.0
at_lookup: 3.0.46
at_server_spec: 4.0.1
at_server_spec: 5.0.0
at_persistence_spec: 2.0.14
at_persistence_secondary_server: 3.0.61
intl: ^0.19.0
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -295,7 +295,7 @@ class MockInboundConnectionImpl extends InboundConnectionImpl {
}

@override
void write(String data) {
Future<void> write(String data) async {
metaData.lastAccessed = DateTime.now().toUtc();
}
}
6 changes: 3 additions & 3 deletions packages/at_secondary_server/test/resource_manager_test.dart
Original file line number Diff line number Diff line change
Expand Up @@ -76,11 +76,11 @@ void main() async {
while (itr.moveNext()) {
atNotificationList.add(itr.current);
}
// Expecting that the atNotificationList[0] returned from the dequeue() is same as the notification we passed i.e., atNotificationid1.id
// Expecting that the atNotificationList[0] returned from the dequeue() is same as the notification id we passed
expect(atNotificationList[0].id, '121');
// Expecting that the atNotificationList[1] returned from the dequeue() is same as the notification we passed i.e., atNotificationid2.id
// Expecting that the atNotificationList[1] returned from the dequeue() is same as the notification id we passed
expect(atNotificationList[1].id, '122');
// Expecting that the atNotificationList[2] returned from the dequeue() is same as the notification we passed i.e., atNotificationid3.id
// Expecting that the atNotificationList[2] returned from the dequeue() is same as the notification id we passed
expect(atNotificationList[2].id, '123');
}, timeout: Timeout(Duration(seconds: 10)));
});
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -44,13 +44,13 @@ void main() {
.thenAnswer((_) => 4);

when(() => mockInboundConnection1.write(
any(that: startsWith('notification:')))).thenAnswer((invocation) {
any(that: startsWith('notification:')))).thenAnswer((invocation) async {
inboundConn1Written = true;
});

when(() => mockInboundConnection2
.write(any(that: startsWith('notification:'))))
.thenAnswer((Invocation invocation) {
.thenAnswer((Invocation invocation) async {
inboundConn2Written = true;
});

Expand Down
14 changes: 13 additions & 1 deletion packages/at_secondary_server/test/test_utils.dart
Original file line number Diff line number Diff line change
Expand Up @@ -40,7 +40,19 @@ class MockOutboundConnection extends Mock implements OutboundSocketConnection {}

class MockSecureSocket extends Mock implements SecureSocket {}

class MockSocket extends Mock implements Socket {}
class MockSocket extends Mock implements Socket {
Completer completer = Completer();
@override
Future get done => completer.future;
@override
InternetAddress get remoteAddress => InternetAddress('127.0.0.1');
@override
int get remotePort => 9999;
@override
InternetAddress get address => InternetAddress('127.0.0.1');
@override
int get port => 5555;
}

class MockStreamSubscription<T> extends Mock implements StreamSubscription<T> {}

Expand Down
2 changes: 1 addition & 1 deletion tests/at_functional_test/test/enroll_verb_test.dart
Original file line number Diff line number Diff line change
Expand Up @@ -460,7 +460,7 @@ void main() {
3. On sending a cram request, server returns "data:success"
4. On sending monitor request, server returns enrollment request
*/
}, count: 4));
}, count: 5));
monitorSocket.write('from:${firstAtSign.toString().trim()}\n');
});

Expand Down

0 comments on commit 153eb53

Please sign in to comment.