Skip to content

Commit

Permalink
fix: Await all logging has completed before finalizing the session. (s…
Browse files Browse the repository at this point in the history
  • Loading branch information
Isakdl authored Jul 31, 2024
1 parent 2142c8b commit e1ad802
Show file tree
Hide file tree
Showing 3 changed files with 109 additions and 3 deletions.
55 changes: 53 additions & 2 deletions packages/serverpod/lib/src/server/log_manager/log_manager.dart
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
import 'dart:async';
import 'dart:io';
import 'package:meta/meta.dart';
import 'package:serverpod/database.dart';
Expand Down Expand Up @@ -27,6 +28,8 @@ class SessionLogManager {

int get _nextLogOrderId => ++_logOrderId;

final _FutureTaskManager _logTasks;

/// Creates a new [LogManager] from [RuntimeSettings].
@internal
SessionLogManager(
Expand All @@ -37,7 +40,8 @@ class SessionLogManager {
_numberOfQueries = 0,
_logWriter = logWriter,
_settingsForSession = settingsForSession,
_serverId = serverId;
_serverId = serverId,
_logTasks = _FutureTaskManager();

bool _shouldLogQuery({
required Session session,
Expand Down Expand Up @@ -242,10 +246,11 @@ class SessionLogManager {
Function(int, T) setSessionLogId,
) async {
await _attemptOpenStreamingLog(session: session);

if (_continuouslyLogging(session) && session is StreamingSession) {
try {
setSessionLogId(session.sessionLogId!, entry);
await writeLog(session, entry);
_logTasks.addTask(() => writeLog(session, entry));
} catch (exception, stackTrace) {
stderr
.writeln('${DateTime.now().toUtc()} FAILED TO LOG STREAMING $type');
Expand Down Expand Up @@ -311,6 +316,9 @@ class SessionLogManager {
String? exception,
StackTrace? stackTrace,
}) async {
await _openStreamLogLock.synchronized(() {});
await _logTasks.awaitAllTasks();

var duration = session.duration;
var cachedEntry = session.sessionLogs;
LogSettings logSettings = _settingsForSession(session);
Expand Down Expand Up @@ -424,3 +432,46 @@ class LogManager {
return logEntry;
}
}

typedef _TaskCallback = Future<void> Function();

class _FutureTaskManager {
final Set<_TaskCallback> _pendingTasks = {};

Completer<void>? _tasksCompleter;

/// Synchronously adds a task to the task manager.
void addTask(_TaskCallback task) {
_tasksCompleter ??= Completer<void>();
_pendingTasks.add(task);

task().then((value) {
_completeTask(task);
}).onError((error, stackTrace) {
_completeTask(task);
var e = error;
if (e is Exception) throw e;
if (e is Error) throw e;
});
}

void _completeTask(_TaskCallback task) {
_pendingTasks.remove(task);

var tasksCompleter = _tasksCompleter;
if (_pendingTasks.isEmpty && tasksCompleter != null) {
tasksCompleter.complete();
_tasksCompleter = null;
}
}

Future<void> awaitAllTasks() {
var completer = _tasksCompleter;

if (completer == null) {
return Future.value();
} else {
return completer.future;
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -232,13 +232,15 @@ void main() async {
await client.logging.failedQueryMethod();
} catch (_) {}

await Future.delayed(Duration(milliseconds: 100));

var logs = await LoggingUtil.findAllLogs(session);

expect(logs, hasLength(1));

expect(logs.first.sessionLogEntry.endpoint, 'logging');
expect(logs.first.sessionLogEntry.method, 'failedQueryMethod');
}, skip: 'Fail because of the synchronized lock method, not sure why.');
});

test(
'Given a log setting with everything turned on when calling a method logging a message then the log including the message log is written.',
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -51,6 +51,59 @@ void main() async {
expect(logs.first.messages, hasLength(1));
});

test(
'Given that continuous logging is turned on when sending a stream message and closing the connection then the log is created.',
() async {
var settings = RuntimeSettingsBuilder()
.withLogSettings(LogSettingsBuilder()
.withLogStreamingSessionsContinuously(true)
.build())
.build();
await server.updateRuntimeSettings(settings);
await client.openStreamingConnection(
disconnectOnLostInternetConnection: false,
);

await client.logging.sendStreamMessage(Types());

await client.closeStreamingConnection();

// Wait for the log to be written
await Future.delayed(Duration(milliseconds: 100));

var logs = await LoggingUtil.findAllLogs(session);

expect(logs, hasLength(1));
expect(logs.first.messages, hasLength(1));
});

test(
'Given that continuous logging is turned on when sending a stream message and closing the connection then the log is created.',
() async {
var settings = RuntimeSettingsBuilder()
.withLogSettings(LogSettingsBuilder()
.withLogStreamingSessionsContinuously(true)
.build())
.build();
await server.updateRuntimeSettings(settings);
await client.openStreamingConnection(
disconnectOnLostInternetConnection: false,
);

await client.logging.sendStreamMessage(Types());
await client.logging.sendStreamMessage(Types());

await client.closeStreamingConnection();

// Wait for the log to be written
await Future.delayed(Duration(milliseconds: 100));

var logs = await LoggingUtil.findAllLogs(session);

expect(logs, hasLength(1));
expect(logs.first.messages, hasLength(2));
});

test(
'Given that continuous logging is turned on when sending several stream messages without closing the connection then the logs are created with different message ids.',
() async {
Expand Down

0 comments on commit e1ad802

Please sign in to comment.