From 5f0374b804e83c69923c708ab4130fe903f06799 Mon Sep 17 00:00:00 2001 From: gkc Date: Tue, 13 Feb 2024 22:29:35 +0530 Subject: [PATCH 1/9] feat: Added `multi` parameter to `SocketConnector.serverToSocket` - whether to create new connections on the "B" side every time there is a new "A" side connection to the bound server port. Also added `onConnect` parameter, so that callers can be informed when every new connection is made, and can thus take whatever action they require. build: updated version and CHANGELOG for v2.1.0 --- CHANGELOG.md | 7 +++ lib/src/socket_connector.dart | 62 +++++++++++++--------- pubspec.yaml | 4 +- test/socket_connector_test.dart | 91 ++++++++++++++++++++++++++++++--- 4 files changed, 130 insertions(+), 34 deletions(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index 5377d35..a892feb 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -1,3 +1,10 @@ +## 2.1.0 +- Added `multi` parameter to `SocketConnector.serverToSocket` - whether to + create new connections on the "B" side every time there is a new "A" side + connection to the bound server port. Also added `onConnect` parameter, + so that callers can be informed when every new connection is made, and + can thus take whatever action they require. + ## 2.0.1 - Removed an unnecessary dependency diff --git a/lib/src/socket_connector.dart b/lib/src/socket_connector.dart index fab09bb..73af81e 100644 --- a/lib/src/socket_connector.dart +++ b/lib/src/socket_connector.dart @@ -106,11 +106,12 @@ class SocketConnector { } } catch (e) { thisSide.authenticated = false; - _log('Error while authenticating side ${thisSide.name} : $e'); + _log('Error while authenticating side ${thisSide.name} : $e', + force: true); } } if (!thisSide.authenticated) { - _log('Authentication failed on side ${thisSide.name}'); + _log('Authentication failed on side ${thisSide.name}', force: true); _destroySide(thisSide); return; } @@ -149,7 +150,7 @@ class SocketConnector { _log('stream.onDone on side ${side.name}'); _destroySide(side); }, onError: (error) { - _log('stream.onError on side ${side.name}: $error'); + _log('stream.onError on side ${side.name}: $error', force: true); _destroySide(side); }); } @@ -214,8 +215,8 @@ class SocketConnector { pendingB.clear(); } - void _log(String s) { - if (verbose) { + void _log(String s, {bool force = false}) { + if (verbose || force) { logger.writeln('${DateTime.now()} | SocketConnector | $s'); } } @@ -377,19 +378,21 @@ class SocketConnector { /// /// - If [portA] is not provided then a port is chosen by the OS. /// - [addressA] defaults to [InternetAddress.anyIPv4] - static Future serverToSocket({ - /// Defaults to [InternetAddress.anyIPv4] - InternetAddress? addressA, - int portA = 0, - required InternetAddress addressB, - required int portB, - DataTransformer? transformAtoB, - DataTransformer? transformBtoA, - bool verbose = false, - bool logTraffic = false, - Duration timeout = SocketConnector.defaultTimeout, - IOSink? logger, - }) async { + static Future serverToSocket( + { + /// Defaults to [InternetAddress.anyIPv4] + InternetAddress? addressA, + int portA = 0, + required InternetAddress addressB, + required int portB, + DataTransformer? transformAtoB, + DataTransformer? transformBtoA, + bool verbose = false, + bool logTraffic = false, + Duration timeout = SocketConnector.defaultTimeout, + IOSink? logger, + bool multi = false, + Function(Socket sideA, Socket sideB)? onConnect}) async { IOSink logSink = logger ?? stderr; addressA ??= InternetAddress.anyIPv4; @@ -400,18 +403,27 @@ class SocketConnector { logger: logSink, ); + int connections = 0; // bind to a local port for side 'A' connector._serverSocketA = await ServerSocket.bind(addressA, portA); // listen on the local port and connect the inbound socket - connector._serverSocketA?.listen((socket) { - Side sideA = Side(socket, true, transformer: transformAtoB); + connector._serverSocketA?.listen((sideASocket) async { + if (!multi) { + unawaited(connector._serverSocketA?.close()); + } + Side sideA = Side(sideASocket, true, transformer: transformAtoB); unawaited(connector.handleSingleConnection(sideA)); - }); - // connect to the side 'B' address and port - Socket sideBSocket = await Socket.connect(addressB, portB); - Side sideB = Side(sideBSocket, false, transformer: transformBtoA); - unawaited(connector.handleSingleConnection(sideB)); + if (verbose) { + logSink.writeln('Making connection ${++connections} to the "B" side'); + } + // connect to the side 'B' address and port + Socket sideBSocket = await Socket.connect(addressB, portB); + Side sideB = Side(sideBSocket, false, transformer: transformBtoA); + unawaited(connector.handleSingleConnection(sideB)); + + onConnect?.call(sideASocket, sideBSocket); + }); return (connector); } diff --git a/pubspec.yaml b/pubspec.yaml index 473c0d7..26ff2a7 100644 --- a/pubspec.yaml +++ b/pubspec.yaml @@ -1,7 +1,7 @@ name: socket_connector -description: Package allows you to join two TCP clients or two servers this package includes all the tools you need to connect and optionally print the traffic. +description: Package for joining sockets together to create socket relays. -version: 2.0.1 +version: 2.1.0 repository: https://github.com/cconstab/socket_connector environment: diff --git a/test/socket_connector_test.dart b/test/socket_connector_test.dart index 1258d65..ffefdaa 100644 --- a/test/socket_connector_test.dart +++ b/test/socket_connector_test.dart @@ -240,7 +240,7 @@ void main() { await connector.done.timeout(Duration.zero); }); - test('Test serverToSocket', () async { + test('Test serverToSocket single', () async { // Bind to a port that SocketConnector.serverToSocket can connect to ServerSocket testExternalServer = await ServerSocket.bind('127.0.0.1', 0); @@ -265,15 +265,14 @@ void main() { }); }); - await readyB.future; - expect(connector.connections.isEmpty, true); - Socket socketA = await Socket.connect( 'localhost', connector.sideAPort!, ); // Wait for SocketConnector to handle the events await (Future.delayed(Duration(milliseconds: 10))); + await readyB.future; + expect(connector.connections.isEmpty, false); socketA.listen((List data) { @@ -296,7 +295,87 @@ void main() { expect(connector.closed, true); await connector.done.timeout(Duration.zero); }); + + test('Test serverToSocket multi', () async { + // Bind to a port that SocketConnector.serverToSocket can connect to + ServerSocket testExternalServer = await ServerSocket.bind('127.0.0.1', 0); + + int serverConnections = 0; + SocketConnector connector = await SocketConnector.serverToSocket( + addressB: testExternalServer.address, + portB: testExternalServer.port, + verbose: true, + timeout: Duration(milliseconds: 100), + multi: true, + onConnect: (Socket sideA, Socket sideB) { + serverConnections++; + print('SocketConnector.serverToSocket onConnect called back'); + }); + expect(connector.connections.isEmpty, true); + + List rcvdA = []; + List rcvdB = []; + List bSockets = []; + + Socket? currentSocketB; + testExternalServer.listen((socket) { + currentSocketB = socket; + bSockets.add(socket); + int which = bSockets.length; + socket.listen((List data) { + var msg = '$which: ${String.fromCharCodes(data)}'; + print('socket B ultimate destination received $msg'); + rcvdB.add(msg); + }); + }); + + expect(connector.connections.isEmpty, true); + + int howMany = 5; + List aSockets = []; + for (int i = 0; i < howMany; i++) { + Socket socketA = await Socket.connect( + 'localhost', + connector.sideAPort!, + ); + aSockets.add(socketA); + // Wait for SocketConnector to handle the events + await (Future.delayed(Duration(milliseconds: 10))); + expect(connector.connections.isEmpty, false); + + socketA.listen((List data) { + var msg = '${aSockets.length}: ${String.fromCharCodes(data)}'; + print('socket A ultimate client received $msg'); + rcvdA.add(msg); + }); + + // Wait for the sockets to send and receive data + await Future.delayed(Duration(milliseconds: 10)); + + socketA.write('hello world from side A'); + expect(currentSocketB != null, true); + currentSocketB?.write('hello world from side B'); + // Wait for the sockets to send and receive data + await Future.delayed(Duration(milliseconds: 10)); + + expect(rcvdA.last, "${aSockets.length}: hello world from side B"); + expect(rcvdB.last, "${bSockets.length}: hello world from side A"); + expect(rcvdA.length, i + 1); + expect(rcvdB.length, i + 1); + } + + expect(serverConnections, howMany); + + for (final s in aSockets) { + s.destroy(); + } + await (Future.delayed(Duration(milliseconds: 10))); + expect(connector.closed, true); + + await connector.done.timeout(Duration.zero); + }); }); + group('Authenticator tests', () { Future<(bool, Stream?)> goAuthVerifier(Socket socket) async { Completer<(bool, Stream?)> completer = Completer(); @@ -702,15 +781,13 @@ void main() { }); }); - await readyB.future; - expect(connector.connections.isEmpty, true); - Socket socketA = await Socket.connect( 'localhost', connector.sideAPort!, ); // Wait for SocketConnector to handle the events await (Future.delayed(Duration(milliseconds: 10))); + await readyB.future; expect(connector.connections.isEmpty, false); socketA.listen((List data) { From 9a6f67edd3a5e6aa251e4e23a997353c0afdc5df Mon Sep 17 00:00:00 2001 From: gkc Date: Tue, 13 Feb 2024 23:06:48 +0530 Subject: [PATCH 2/9] docs: improved `serverToSocket` method doc --- lib/src/socket_connector.dart | 5 ++++- 1 file changed, 4 insertions(+), 1 deletion(-) diff --git a/lib/src/socket_connector.dart b/lib/src/socket_connector.dart index 73af81e..3c69372 100644 --- a/lib/src/socket_connector.dart +++ b/lib/src/socket_connector.dart @@ -375,9 +375,12 @@ class SocketConnector { /// - Binds to [portA] on [addressA] /// - Listens for a socket connection on [portA] port and joins it to /// the 'B' side - /// /// - If [portA] is not provided then a port is chosen by the OS. /// - [addressA] defaults to [InternetAddress.anyIPv4] + /// - [multi] flag controls whether or not to allow multiple connections + /// to the bound server port [portA] + /// - [onConnect] is called when [portA] has got a new connection and a + /// corresponding outbound socket has been created to [addressB]:[portB] static Future serverToSocket( { /// Defaults to [InternetAddress.anyIPv4] From 3deb74a44d8b4b8693d4c5d0141131c8a55e8919 Mon Sep 17 00:00:00 2001 From: gkc Date: Wed, 14 Feb 2024 14:42:15 +0530 Subject: [PATCH 3/9] feat: added optional `connector` parameter to socketToSocket --- lib/src/socket_connector.dart | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/lib/src/socket_connector.dart b/lib/src/socket_connector.dart index 3c69372..d8eb1f1 100644 --- a/lib/src/socket_connector.dart +++ b/lib/src/socket_connector.dart @@ -332,6 +332,7 @@ class SocketConnector { /// - Creates socket to [portB] on [addressB] /// - Relays data between the sockets static Future socketToSocket({ + SocketConnector? connector, required InternetAddress addressA, required int portA, required InternetAddress addressB, @@ -344,7 +345,7 @@ class SocketConnector { IOSink? logger, }) async { IOSink logSink = logger ?? stderr; - SocketConnector connector = SocketConnector( + connector ??= SocketConnector( verbose: verbose, logTraffic: logTraffic, timeout: timeout, From fd2a3d9e7a97eb7504d75c97836a6507508ddb4f Mon Sep 17 00:00:00 2001 From: gkc Date: Wed, 14 Feb 2024 17:45:17 +0530 Subject: [PATCH 4/9] fix: trying to resolve some weirdness --- lib/src/socket_connector.dart | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/lib/src/socket_connector.dart b/lib/src/socket_connector.dart index d8eb1f1..3c3585b 100644 --- a/lib/src/socket_connector.dart +++ b/lib/src/socket_connector.dart @@ -426,7 +426,7 @@ class SocketConnector { Side sideB = Side(sideBSocket, false, transformer: transformBtoA); unawaited(connector.handleSingleConnection(sideB)); - onConnect?.call(sideASocket, sideBSocket); + unawaited(onConnect?.call(sideASocket, sideBSocket)); }); return (connector); From 0f2c25fa668bb4a1e1df4d4a16f66d722d9e76b1 Mon Sep 17 00:00:00 2001 From: gkc Date: Wed, 14 Feb 2024 17:52:29 +0530 Subject: [PATCH 5/9] fix: trying to resolve some weirdness --- lib/src/socket_connector.dart | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/lib/src/socket_connector.dart b/lib/src/socket_connector.dart index 3c3585b..d8eb1f1 100644 --- a/lib/src/socket_connector.dart +++ b/lib/src/socket_connector.dart @@ -426,7 +426,7 @@ class SocketConnector { Side sideB = Side(sideBSocket, false, transformer: transformBtoA); unawaited(connector.handleSingleConnection(sideB)); - unawaited(onConnect?.call(sideASocket, sideBSocket)); + onConnect?.call(sideASocket, sideBSocket); }); return (connector); From 4263504edcdb7f26bbc26b1942094f8f49c017f4 Mon Sep 17 00:00:00 2001 From: gkc Date: Wed, 14 Feb 2024 18:01:37 +0530 Subject: [PATCH 6/9] fix: trying to resolve some weirdness --- lib/src/socket_connector.dart | 4 +++- 1 file changed, 3 insertions(+), 1 deletion(-) diff --git a/lib/src/socket_connector.dart b/lib/src/socket_connector.dart index d8eb1f1..0ea115b 100644 --- a/lib/src/socket_connector.dart +++ b/lib/src/socket_connector.dart @@ -426,7 +426,9 @@ class SocketConnector { Side sideB = Side(sideBSocket, false, transformer: transformBtoA); unawaited(connector.handleSingleConnection(sideB)); - onConnect?.call(sideASocket, sideBSocket); + Future.delayed(Duration(milliseconds: 0), () { + onConnect?.call(sideASocket, sideBSocket); + }); }); return (connector); From 70925e023999186166a3c1358e7a45d33ad7cae0 Mon Sep 17 00:00:00 2001 From: gkc Date: Wed, 14 Feb 2024 18:03:29 +0530 Subject: [PATCH 7/9] fix: trying to resolve some weirdness --- lib/src/socket_connector.dart | 4 +--- 1 file changed, 1 insertion(+), 3 deletions(-) diff --git a/lib/src/socket_connector.dart b/lib/src/socket_connector.dart index 0ea115b..d8eb1f1 100644 --- a/lib/src/socket_connector.dart +++ b/lib/src/socket_connector.dart @@ -426,9 +426,7 @@ class SocketConnector { Side sideB = Side(sideBSocket, false, transformer: transformBtoA); unawaited(connector.handleSingleConnection(sideB)); - Future.delayed(Duration(milliseconds: 0), () { - onConnect?.call(sideASocket, sideBSocket); - }); + onConnect?.call(sideASocket, sideBSocket); }); return (connector); From 458086671c01cd57423b4c88b298f0e4ec4d2993 Mon Sep 17 00:00:00 2001 From: gkc Date: Thu, 15 Feb 2024 00:21:44 +0530 Subject: [PATCH 8/9] feat: add grace period so that SocketConnector doesn't close until both (a) initial timeout has expired and (b) number of established connections is zero or has dropped to zero --- lib/src/socket_connector.dart | 8 +++++-- test/socket_connector_test.dart | 39 ++++++++++++++++++++------------- 2 files changed, 30 insertions(+), 17 deletions(-) diff --git a/lib/src/socket_connector.dart b/lib/src/socket_connector.dart index d8eb1f1..927d3f8 100644 --- a/lib/src/socket_connector.dart +++ b/lib/src/socket_connector.dart @@ -21,6 +21,8 @@ import 'package:socket_connector/src/types.dart'; class SocketConnector { static const defaultTimeout = Duration(seconds: 30); + bool gracePeriodPassed = false; + SocketConnector({ this.verbose = false, this.logTraffic = false, @@ -29,6 +31,7 @@ class SocketConnector { }) { this.logger = logger ?? stderr; Timer(timeout, () { + gracePeriodPassed = true; if (connections.isEmpty) { close(); } @@ -182,8 +185,9 @@ class SocketConnector { if (connectionToRemove != null) { connections.remove(connectionToRemove); _log(chalk.brightBlue('Removed connection')); - if (connections.isEmpty) { - _log(chalk.brightBlue('No established connections remain - ' + if (connections.isEmpty && gracePeriodPassed) { + _log(chalk.brightBlue('No established connections remain' + ' and grace period has passed - ' ' will close connector')); close(); } diff --git a/test/socket_connector_test.dart b/test/socket_connector_test.dart index ffefdaa..c401ee2 100644 --- a/test/socket_connector_test.dart +++ b/test/socket_connector_test.dart @@ -67,7 +67,8 @@ void main() { }); test('Test ServerToServer', () async { - Duration timeout = Duration(milliseconds: 200); + int timeoutMs = 200; + Duration timeout = Duration(milliseconds: timeoutMs); SocketConnector connector = await SocketConnector.serverToServer( portA: 0, portB: 0, @@ -118,12 +119,13 @@ void main() { socketB.destroy(); // Wait for SocketConnector to handle the events - await (Future.delayed(Duration(milliseconds: 10))); + await (Future.delayed(Duration(milliseconds: timeoutMs))); expect(connector.closed, true); await connector.done.timeout(Duration.zero); }); test('Test socketToServer', () async { + int timeoutMs = 100; // Bind to a port that SocketConnector.socketToServer can connect to ServerSocket testExternalServer = await ServerSocket.bind('127.0.0.1', 0); @@ -131,7 +133,7 @@ void main() { addressA: testExternalServer.address, portA: testExternalServer.port, verbose: false, - timeout: Duration(milliseconds: 100), + timeout: Duration(milliseconds: timeoutMs), ); expect(connector.connections.isEmpty, true); @@ -176,12 +178,13 @@ void main() { socketB.destroy(); // Wait for SocketConnector to handle the events - await (Future.delayed(Duration(milliseconds: 10))); + await (Future.delayed(Duration(milliseconds: timeoutMs))); expect(connector.closed, true); await connector.done.timeout(Duration.zero); }); test('Test socketToSocket', () async { + int timeoutMs = 200; // Bind two ports that SocketConnector.socketToSocket can connect to ServerSocket testExternalServerA = await ServerSocket.bind('127.0.0.1', 0); @@ -194,6 +197,7 @@ void main() { addressB: testExternalServerB.address, portB: testExternalServerB.port, verbose: false, + timeout: Duration(milliseconds: timeoutMs), ); String rcvdA = ''; @@ -235,7 +239,7 @@ void main() { socketA.destroy(); // Wait for SocketConnector to handle the events - await (Future.delayed(Duration(milliseconds: 10))); + await (Future.delayed(Duration(milliseconds: timeoutMs))); expect(connector.closed, true); await connector.done.timeout(Duration.zero); }); @@ -244,11 +248,12 @@ void main() { // Bind to a port that SocketConnector.serverToSocket can connect to ServerSocket testExternalServer = await ServerSocket.bind('127.0.0.1', 0); + int timeoutMs = 100; SocketConnector connector = await SocketConnector.serverToSocket( addressB: testExternalServer.address, portB: testExternalServer.port, verbose: false, - timeout: Duration(milliseconds: 100), + timeout: Duration(milliseconds: timeoutMs), ); expect(connector.connections.isEmpty, true); @@ -291,7 +296,7 @@ void main() { socketA.destroy(); // Wait for SocketConnector to handle the events - await (Future.delayed(Duration(milliseconds: 10))); + await (Future.delayed(Duration(milliseconds: timeoutMs))); expect(connector.closed, true); await connector.done.timeout(Duration.zero); }); @@ -465,7 +470,7 @@ void main() { socketB.destroy(); // Wait for SocketConnector to handle the events - await (Future.delayed(Duration(milliseconds: 10))); + await (Future.delayed(timeout)); expect(connector.closed, true); await connector.done.timeout(Duration.zero); }); @@ -552,7 +557,7 @@ void main() { socketB.destroy(); // Wait for SocketConnector to handle the events - await (Future.delayed(Duration(milliseconds: 10))); + await (Future.delayed(timeout)); expect(connector.closed, true); await connector.done.timeout(Duration.zero); }); @@ -633,7 +638,7 @@ void main() { expect(connector.closed, false); authedB[2].destroy(); - await (Future.delayed(Duration(milliseconds: 10))); + await (Future.delayed(timeout)); expect(connector.closed, true); await connector.done.timeout(Duration.zero); }); @@ -643,11 +648,12 @@ void main() { // Bind to a port that SocketConnector.socketToServer can connect to ServerSocket testExternalServer = await ServerSocket.bind('127.0.0.1', 0); + var timeout = Duration(milliseconds: 100); SocketConnector connector = await SocketConnector.socketToServer( addressA: testExternalServer.address, portA: testExternalServer.port, transformAtoB: reverser, - timeout: Duration(milliseconds: 100), + timeout: timeout, ); expect(connector.connections.isEmpty, true); @@ -690,12 +696,13 @@ void main() { socketB.destroy(); // Wait for SocketConnector to handle the events - await (Future.delayed(Duration(milliseconds: 10))); + await (Future.delayed(timeout)); expect(connector.closed, true); await connector.done.timeout(Duration.zero); }); test('Test socketToSocket with two prefixing transformers', () async { + int timeoutMs = 100; // Bind two ports that SocketConnector.socketToSocket can connect to ServerSocket testExternalServerA = await ServerSocket.bind('127.0.0.1', 0); @@ -709,6 +716,7 @@ void main() { addressB: testExternalServerB.address, portB: testExternalServerB.port, transformBtoA: bToA, + timeout: Duration(milliseconds: timeoutMs), ); String rcvdA = ''; @@ -749,7 +757,7 @@ void main() { socketA.destroy(); // Wait for SocketConnector to handle the events - await (Future.delayed(Duration(milliseconds: 10))); + await (Future.delayed(Duration(milliseconds: timeoutMs))); expect(connector.closed, true); await connector.done.timeout(Duration.zero); }); @@ -759,12 +767,13 @@ void main() { // Bind to a port that SocketConnector.serverToSocket can connect to ServerSocket testExternalServer = await ServerSocket.bind('127.0.0.1', 0); + var timeout = Duration(milliseconds: 100); SocketConnector connector = await SocketConnector.serverToSocket( addressB: testExternalServer.address, portB: testExternalServer.port, transformAtoB: reverser, transformBtoA: reverser, - timeout: Duration(milliseconds: 100), + timeout: timeout, ); expect(connector.connections.isEmpty, true); @@ -804,7 +813,7 @@ void main() { socketA.destroy(); // Wait for SocketConnector to handle the events - await (Future.delayed(Duration(milliseconds: 10))); + await (Future.delayed(timeout)); expect(connector.closed, true); await connector.done.timeout(Duration.zero); }); From 8574a223b41575d5177531caff8fe5c5a6bb8844 Mon Sep 17 00:00:00 2001 From: gkc Date: Thu, 15 Feb 2024 08:19:07 +0530 Subject: [PATCH 9/9] docs: updated CHANGELOG re grace period --- CHANGELOG.md | 3 +++ 1 file changed, 3 insertions(+) diff --git a/CHANGELOG.md b/CHANGELOG.md index a892feb..e053796 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -4,6 +4,9 @@ connection to the bound server port. Also added `onConnect` parameter, so that callers can be informed when every new connection is made, and can thus take whatever action they require. +- feat: Added grace period so that SocketConnector doesn't close until both + (a) initial timeout has expired and (b) number of established connections + is zero or has dropped to zero ## 2.0.1 - Removed an unnecessary dependency