From 5768cc96df6a240b10fdc295cf15bf345f4f77c7 Mon Sep 17 00:00:00 2001 From: CodeDoctorDE Date: Tue, 1 Oct 2024 18:07:32 +0200 Subject: [PATCH] Change server to be used as library --- server/bin/quokka_server.dart | 92 +---------- server/lib/events.dart | 44 ++++++ server/lib/main.dart | 263 ++++++++++--------------------- server/lib/programs/kick.dart | 6 +- server/lib/programs/packs.dart | 2 +- server/lib/programs/players.dart | 2 +- server/lib/programs/save.dart | 2 +- server/lib/programs/say.dart | 2 +- server/lib/programs/stop.dart | 2 +- server/lib/quokka_server.dart | 12 ++ server/lib/server.dart | 201 +++++++++++++++++++++++ 11 files changed, 351 insertions(+), 277 deletions(-) create mode 100644 server/lib/events.dart create mode 100644 server/lib/quokka_server.dart create mode 100644 server/lib/server.dart diff --git a/server/bin/quokka_server.dart b/server/bin/quokka_server.dart index ff85c85..c2e6d6b 100644 --- a/server/bin/quokka_server.dart +++ b/server/bin/quokka_server.dart @@ -1,93 +1,5 @@ -import 'package:args/args.dart'; -import 'package:quokka_api/quokka_api.dart'; import 'package:quokka_server/main.dart'; -const String version = '0.0.1'; - -ArgParser buildParser() { - return ArgParser() - ..addFlag( - 'help', - abbr: 'h', - negatable: false, - help: 'Print this usage information.', - ) - ..addFlag( - 'verbose', - abbr: 'v', - negatable: false, - help: 'Show additional command output.', - ) - ..addFlag( - 'version', - abbr: 'V', - negatable: false, - help: 'Print the tool version.', - ) - ..addOption( - 'port', - abbr: 'p', - help: 'The port to run the server on. Defaults to $kDefaultPort.', - ) - ..addOption( - 'description', - abbr: 'd', - help: - 'A description of the server. Will be displayed in the server list.', - ) - ..addOption('autosave', - abbr: 'a', help: "Disable saving of the world automatically"); -} - -void printUsage(ArgParser argParser) { - print('Usage: server [arguments]'); - print(argParser.usage); -} - -const welcomeText = """ - ____ __ __ - / __ \\__ _____ / /__ / /_____ _ -/ /_/ / // / _ \\/ '_// '_/ _ `/ -\\___\\_\\_,_/\\___/_/\\_\\/_/\\_\\\\_,_/ -"""; -Future main(List arguments) async { - final ArgParser argParser = buildParser(); - try { - final ArgResults results = argParser.parse(arguments); - bool verbose = false, autosave = false; - - // Process the parsed arguments. - if (results.wasParsed('help')) { - printUsage(argParser); - return; - } - if (results.wasParsed('version')) { - print('server version: $version'); - return; - } - if (results.wasParsed('verbose')) { - verbose = true; - } - if (results.wasParsed('autosave')) { - autosave = true; - } - String description = ''; - if (results.wasParsed('description')) { - description = results['description']; - } - print(welcomeText); - final server = await QuokkaServer.load(); - await server.init( - port: int.tryParse(results['port'] ?? '') ?? kDefaultPort, - verbose: verbose, - autosave: autosave, - description: description, - ); - await server.run(); - } on FormatException catch (e) { - // Print usage information if an invalid argument was provided. - print(e.message); - print(''); - printUsage(argParser); - } +Future main(List arguments) { + return runServer(arguments); } diff --git a/server/lib/events.dart b/server/lib/events.dart new file mode 100644 index 0000000..3ac610a --- /dev/null +++ b/server/lib/events.dart @@ -0,0 +1,44 @@ +import 'dart:async'; + +import 'package:networker/networker.dart'; +import 'package:quokka_server/server.dart'; + +typedef ListenerResult = (S, Channel)?; + +typedef EventListener = FutureOr> Function( + S serverEvent, + Channel target, + T clientEvent, + Channel source, + QuokkaServer server, +); + +final class EventSystem { + final Map>> _listeners = {}; + + void addListener(T event, EventListener listener) { + _listeners.putIfAbsent(event, () => []).add(listener); + } + + void removeListener(T event, EventListener listener) { + _listeners[event]?.remove(listener); + } + + Future> dispatch( + S serverEvent, + Channel target, + T clientEvent, + Channel source, + QuokkaServer server, + ) async { + final listeners = _listeners[clientEvent]; + if (listeners == null) return null; + ListenerResult result = (serverEvent, target); + for (final listener in listeners) { + if (result == null) return null; + result = + await listener(result.$1, result.$2, clientEvent, source, server); + } + return result; + } +} diff --git a/server/lib/main.dart b/server/lib/main.dart index 9504112..a8d4835 100644 --- a/server/lib/main.dart +++ b/server/lib/main.dart @@ -1,192 +1,97 @@ import 'dart:async'; -import 'dart:io'; -import 'dart:isolate'; -import 'package:bloc/bloc.dart'; -import 'package:bloc_concurrency/bloc_concurrency.dart'; -import 'package:consoler/consoler.dart'; -import 'package:networker/networker.dart'; -import 'package:networker_socket/server.dart'; +import 'package:args/args.dart'; import 'package:quokka_api/quokka_api.dart'; -import 'package:quokka_server/asset.dart'; -import 'package:quokka_server/programs/packs.dart'; -import 'package:quokka_server/programs/players.dart'; -import 'package:quokka_server/programs/save.dart'; -import 'package:quokka_server/programs/say.dart'; -import 'package:quokka_server/programs/stop.dart'; - -Future _computeEvent(ServerWorldEvent event, WorldState state, - Map signature) { - return Isolate.run( - () => processServerEvent(event, state, signature: signature)); +import 'package:quokka_server/server.dart'; + +const String version = '0.0.1'; + +ArgParser buildParser() { + return ArgParser() + ..addFlag( + 'help', + abbr: 'h', + negatable: false, + help: 'Print this usage information.', + ) + ..addFlag( + 'verbose', + abbr: 'v', + negatable: false, + help: 'Show additional command output.', + ) + ..addFlag( + 'version', + abbr: 'V', + negatable: false, + help: 'Print the tool version.', + ) + ..addOption( + 'port', + abbr: 'p', + help: 'The port to run the server on. Defaults to $kDefaultPort.', + ) + ..addOption( + 'description', + abbr: 'd', + help: + 'A description of the server. Will be displayed in the server list.', + ) + ..addOption('autosave', + abbr: 'a', help: "Disable saving of the world automatically"); } -final class QuokkaServer extends Bloc { - final Consoler consoler; - final ServerAssetManager assetManager; - final String? worldFile; - bool _temp = false; - - NetworkerSocketServer? _server; - NetworkerPipe? _pipe; - - QuokkaServer._( - this.worldFile, this.consoler, QuokkaData data, this.assetManager) - : super(WorldState( - data: data, - table: data.getTableOrDefault(), - metadata: data.getMetadataOrDefault(), - info: data.getInfoOrDefault(), - )) { - on((event, emit) async { - final signature = assetManager.createSignature(); - final newState = await _computeEvent(event, state, signature); - if (newState == null) return; - emit(newState); - return save(); - }, transformer: sequential()); - } +void printUsage(ArgParser argParser) { + print('Usage: server [arguments]'); + print(argParser.usage); +} - static Future load({ - String? worldFile, - bool disableLoading = false, - }) async { - final assetManager = ServerAssetManager(); - final consoler = Consoler( - defaultProgramConfig: DefaultProgramConfiguration( - description: "Quokka server", - ), - ); - await _runStaticLogZone( - consoler, () => assetManager.init(console: consoler)); - worldFile ??= defaultWorldFile; - final file = File(worldFile); - QuokkaData? data; - if (!disableLoading && await file.exists()) { - final bytes = await file.readAsBytes(); - data = QuokkaData.fromData(bytes); +typedef ServerLoader = FutureOr Function(QuokkaServer server); + +const welcomeText = """ + ____ __ __ + / __ \\__ _____ / /__ / /_____ _ +/ /_/ / // / _ \\/ '_// '_/ _ `/ +\\___\\_\\_,_/\\___/_/\\_\\/_/\\_\\\\_,_/ +"""; +Future runServer(List arguments, [ServerLoader? onLoad]) async { + final ArgParser argParser = buildParser(); + try { + final ArgResults results = argParser.parse(arguments); + bool verbose = false, autosave = false; + + // Process the parsed arguments. + if (results.wasParsed('help')) { + printUsage(argParser); + return; } - data ??= QuokkaData.empty().setInfo(GameInfo( - packs: assetManager.packs.map((e) => e.key).toList(), - )); - return QuokkaServer._(worldFile, consoler, data, assetManager); - } - - void log(Object? message, {LogLevel? level}) => - consoler.print(message, level: level); - - static final String defaultWorldFile = 'world.qka'; - - Map get players => - Map.fromEntries((_server?.clientConnections ?? {}) - .map((e) => MapEntry(e, _server!.getConnectionInfo(e)!))); - - Future init({ - int port = kDefaultPort, - bool verbose = false, - bool autosave = false, - String description = '', - }) async { - if (verbose) { - consoler.minLogLevel = LogLevel.verbose; + if (results.wasParsed('version')) { + print('server version: $version'); + return; } - log("Starting server on port $port", level: LogLevel.info); - log('Verbose logging activated', level: LogLevel.verbose); - _temp = autosave; - final server = - _server = NetworkerSocketServer(InternetAddress.anyIPv4, port, - filterConnections: buildFilterConnections( - property: GameProperty.defaultProperty.copyWith( - description: description, - ))); - final transformer = _pipe = NetworkerPipeTransformer( - WorldEventMapper.fromJson, (e) => e.toJson()); - transformer.read.listen(_onClientEvent); - server - ..clientConnect.listen(_onJoin) - ..clientDisconnect.listen(_onLeave) - ..connect(StringNetworkerPlugin()..connect(transformer)); - await _server?.init(); - - consoler.registerProgram('stop', StopProgram(this)); - consoler.registerProgram('save', SaveProgram(this)); - consoler.registerProgram('packs', PacksProgram(this)); - consoler.registerProgram('players', PlayersProgram(this)); - consoler.registerProgram('say', SayProgram(this)); - consoler.registerProgram(null, UnknownProgram()); - } - - static R _runStaticLogZone(Consoler consoler, R Function() body) => - runZoned(body, zoneSpecification: ZoneSpecification( - print: (self, parent, zone, message) { - consoler.print(message); - }, - )); - - Future run() async { - consoler.run(); - log('Server running on ${_server?.address}', level: LogLevel.info); - await _server?.onClosed.first; - } - - void _onClientEvent(NetworkerPacket event) async { - final data = event.data; - final process = processClientEvent( - data, - event.channel, - state, - assetManager: assetManager, - ); - if (process == null) return; - if (data != null) { - log('Processing event by ${event.channel}: $process', - level: LogLevel.verbose); + if (results.wasParsed('verbose')) { + verbose = true; } - switch (event.data) { - case MessageRequest data: - log("Message by ${event.channel}: ${data.message}", - level: LogLevel.info); - default: + if (results.wasParsed('autosave')) { + autosave = true; } - _pipe?.sendMessage(process.$1, process.$2); - if (process.$2 == kAnyChannel || process.$2 == kAuthorityChannel) { - add(process.$1); + String description = ''; + if (results.wasParsed('description')) { + description = results['description']; } - } - - void _onJoin((Channel, ConnectionInfo) event) { - final (user, info) = event; - log('${info.address} ($user) joined the game', level: LogLevel.info); - _onClientEvent(NetworkerPacket(null, event.$1)); - } - - void _onLeave((Channel, ConnectionInfo) event) { - final (user, info) = event; - log('${info.address} ($user) left the game', level: LogLevel.info); - } - - Future save({bool force = false}) async { - if (!force && _temp) return; - final bytes = state.save().exportAsBytes(); - await File(defaultWorldFile).writeAsBytes(bytes); - } - - @override - Future close() async { - await super.close(); - log('Closing...', level: LogLevel.info); - _server?.close(); - consoler.dispose(); - } - - void process(WorldEvent event) { - _onClientEvent(NetworkerPacket(event, kAuthorityChannel)); - } - - bool kick(int id) { - final info = _server?.getConnectionInfo(id); - if (info == null) return false; - info.close(); - return true; + final server = await QuokkaServer.load(); + await server.init( + port: int.tryParse(results['port'] ?? '') ?? kDefaultPort, + verbose: verbose, + autosave: autosave, + description: description, + ); + await onLoad?.call(server); + await server.run(); + } on FormatException catch (e) { + // Print usage information if an invalid argument was provided. + print(e.message); + print(''); + printUsage(argParser); } } diff --git a/server/lib/programs/kick.dart b/server/lib/programs/kick.dart index c3fca67..a50c462 100644 --- a/server/lib/programs/kick.dart +++ b/server/lib/programs/kick.dart @@ -1,10 +1,10 @@ import 'package:consoler/consoler.dart'; -import 'package:quokka_server/main.dart'; +import 'package:quokka_server/server.dart'; -class PlayersProgram extends ConsoleProgram { +class KickProgram extends ConsoleProgram { final QuokkaServer server; - PlayersProgram(this.server); + KickProgram(this.server); @override String getDescription() => "Kick a player"; diff --git a/server/lib/programs/packs.dart b/server/lib/programs/packs.dart index 98c174e..1060163 100644 --- a/server/lib/programs/packs.dart +++ b/server/lib/programs/packs.dart @@ -1,6 +1,6 @@ import 'package:consoler/consoler.dart'; import 'package:quokka_api/quokka_api.dart'; -import 'package:quokka_server/main.dart'; +import 'package:quokka_server/server.dart'; class PacksProgram extends ConsoleProgram { final QuokkaServer server; diff --git a/server/lib/programs/players.dart b/server/lib/programs/players.dart index eb2824d..b8fd806 100644 --- a/server/lib/programs/players.dart +++ b/server/lib/programs/players.dart @@ -1,5 +1,5 @@ import 'package:consoler/consoler.dart'; -import 'package:quokka_server/main.dart'; +import 'package:quokka_server/server.dart'; class PlayersProgram extends ConsoleProgram { final QuokkaServer server; diff --git a/server/lib/programs/save.dart b/server/lib/programs/save.dart index 0dbf05d..935475b 100644 --- a/server/lib/programs/save.dart +++ b/server/lib/programs/save.dart @@ -1,5 +1,5 @@ import 'package:consoler/consoler.dart'; -import 'package:quokka_server/main.dart'; +import 'package:quokka_server/server.dart'; class SaveProgram extends ConsoleProgram { final QuokkaServer server; diff --git a/server/lib/programs/say.dart b/server/lib/programs/say.dart index d3915ab..cdfefe0 100644 --- a/server/lib/programs/say.dart +++ b/server/lib/programs/say.dart @@ -1,6 +1,6 @@ import 'package:consoler/consoler.dart'; import 'package:quokka_api/quokka_api.dart'; -import 'package:quokka_server/main.dart'; +import 'package:quokka_server/server.dart'; class SayProgram extends ConsoleProgram { final QuokkaServer server; diff --git a/server/lib/programs/stop.dart b/server/lib/programs/stop.dart index 878cef5..12b56e8 100644 --- a/server/lib/programs/stop.dart +++ b/server/lib/programs/stop.dart @@ -1,5 +1,5 @@ import 'package:consoler/consoler.dart'; -import 'package:quokka_server/main.dart'; +import 'package:quokka_server/server.dart'; class StopProgram extends ConsoleProgram { final QuokkaServer server; diff --git a/server/lib/quokka_server.dart b/server/lib/quokka_server.dart new file mode 100644 index 0000000..e6a51ef --- /dev/null +++ b/server/lib/quokka_server.dart @@ -0,0 +1,12 @@ +/// Official quokka server library +library; + +export 'asset.dart'; +export 'main.dart'; +export 'server.dart'; +export 'programs/kick.dart'; +export 'programs/packs.dart'; +export 'programs/players.dart'; +export 'programs/save.dart'; +export 'programs/say.dart'; +export 'programs/stop.dart'; diff --git a/server/lib/server.dart b/server/lib/server.dart new file mode 100644 index 0000000..1f332cb --- /dev/null +++ b/server/lib/server.dart @@ -0,0 +1,201 @@ +import 'dart:async'; +import 'dart:io'; +import 'dart:isolate'; + +import 'package:bloc/bloc.dart'; +import 'package:bloc_concurrency/bloc_concurrency.dart'; +import 'package:consoler/consoler.dart'; +import 'package:networker/networker.dart'; +import 'package:networker_socket/server.dart'; +import 'package:quokka_api/quokka_api.dart'; +import 'package:quokka_server/asset.dart'; +import 'package:quokka_server/events.dart'; +import 'package:quokka_server/programs/packs.dart'; +import 'package:quokka_server/programs/players.dart'; +import 'package:quokka_server/programs/save.dart'; +import 'package:quokka_server/programs/say.dart'; +import 'package:quokka_server/programs/stop.dart'; + +Future _computeEvent(ServerWorldEvent event, WorldState state, + Map signature) { + return Isolate.run( + () => processServerEvent(event, state, signature: signature)); +} + +final class QuokkaServer extends Bloc { + final Consoler consoler; + final ServerAssetManager assetManager; + final String? worldFile; + final EventSystem eventSystem = EventSystem(); + bool _temp = false; + + NetworkerSocketServer? _server; + NetworkerPipe? _pipe; + + QuokkaServer._( + this.worldFile, this.consoler, QuokkaData data, this.assetManager) + : super(WorldState( + data: data, + table: data.getTableOrDefault(), + metadata: data.getMetadataOrDefault(), + info: data.getInfoOrDefault(), + )) { + on((event, emit) async { + final signature = assetManager.createSignature(); + final newState = await _computeEvent(event, state, signature); + if (newState == null) return; + emit(newState); + return save(); + }, transformer: sequential()); + } + + static Future load({ + String? worldFile, + bool disableLoading = false, + }) async { + final assetManager = ServerAssetManager(); + final consoler = Consoler( + defaultProgramConfig: DefaultProgramConfiguration( + description: "Quokka server", + ), + ); + await _runStaticLogZone( + consoler, () => assetManager.init(console: consoler)); + worldFile ??= defaultWorldFile; + final file = File(worldFile); + QuokkaData? data; + if (!disableLoading && await file.exists()) { + final bytes = await file.readAsBytes(); + data = QuokkaData.fromData(bytes); + } + data ??= QuokkaData.empty().setInfo(GameInfo( + packs: assetManager.packs.map((e) => e.key).toList(), + )); + return QuokkaServer._(worldFile, consoler, data, assetManager); + } + + void log(Object? message, {LogLevel? level}) => + consoler.print(message, level: level); + + static final String defaultWorldFile = 'world.qka'; + + Map get players => + Map.fromEntries((_server?.clientConnections ?? {}) + .map((e) => MapEntry(e, _server!.getConnectionInfo(e)!))); + + Future init({ + int port = kDefaultPort, + bool verbose = false, + bool autosave = false, + String description = '', + }) async { + if (verbose) { + consoler.minLogLevel = LogLevel.verbose; + } + log("Starting server on port $port", level: LogLevel.info); + log('Verbose logging activated', level: LogLevel.verbose); + _temp = autosave; + final server = + _server = NetworkerSocketServer(InternetAddress.anyIPv4, port, + filterConnections: buildFilterConnections( + property: GameProperty.defaultProperty.copyWith( + description: description, + ))); + final transformer = _pipe = NetworkerPipeTransformer( + WorldEventMapper.fromJson, (e) => e.toJson()); + transformer.read.listen(_onClientEvent); + server + ..clientConnect.listen(_onJoin) + ..clientDisconnect.listen(_onLeave) + ..connect(StringNetworkerPlugin()..connect(transformer)); + await _server?.init(); + + consoler.registerProgram('stop', StopProgram(this)); + consoler.registerProgram('save', SaveProgram(this)); + consoler.registerProgram('packs', PacksProgram(this)); + consoler.registerProgram('players', PlayersProgram(this)); + consoler.registerProgram('say', SayProgram(this)); + consoler.registerProgram(null, UnknownProgram()); + } + + static R _runStaticLogZone(Consoler consoler, R Function() body) => + runZoned(body, zoneSpecification: ZoneSpecification( + print: (self, parent, zone, message) { + consoler.print(message); + }, + )); + + Future run() async { + consoler.run(); + log('Server running on ${_server?.address}', level: LogLevel.info); + await _server?.onClosed.first; + } + + void _onClientEvent(NetworkerPacket event) async { + final data = event.data; + if (data == null) return; + ListenerResult process = processClientEvent( + data, + event.channel, + state, + assetManager: assetManager, + ); + if (process == null) return; + process = await eventSystem.dispatch( + process.$1, + process.$2, + data, + event.channel, + this, + ); + if (process == null) return; + log('Processing event by ${event.channel}: $process', + level: LogLevel.verbose); + switch (event.data) { + case MessageRequest data: + log("Message by ${event.channel}: ${data.message}", + level: LogLevel.info); + default: + } + _pipe?.sendMessage(process.$1, process.$2); + if (process.$2 == kAnyChannel || process.$2 == kAuthorityChannel) { + add(process.$1); + } + } + + void _onJoin((Channel, ConnectionInfo) event) { + final (user, info) = event; + log('${info.address} ($user) joined the game', level: LogLevel.info); + _onClientEvent(NetworkerPacket(null, event.$1)); + } + + void _onLeave((Channel, ConnectionInfo) event) { + final (user, info) = event; + log('${info.address} ($user) left the game', level: LogLevel.info); + } + + Future save({bool force = false}) async { + if (!force && _temp) return; + final bytes = state.save().exportAsBytes(); + await File(defaultWorldFile).writeAsBytes(bytes); + } + + @override + Future close() async { + await super.close(); + log('Closing...', level: LogLevel.info); + _server?.close(); + consoler.dispose(); + } + + void process(WorldEvent event) { + _onClientEvent(NetworkerPacket(event, kAuthorityChannel)); + } + + bool kick(int id) { + final info = _server?.getConnectionInfo(id); + if (info == null) return false; + info.close(); + return true; + } +}