From 9b88a8688a06200afb94a6a51d1465973da821cf Mon Sep 17 00:00:00 2001 From: Guilherme Souza Date: Mon, 16 Dec 2024 17:34:04 -0300 Subject: [PATCH] integration tests --- Sources/Realtime/V2/RealtimeClientV2.swift | 8 +- .../RealtimeIntegrationTests.swift | 284 +++++++++--------- 2 files changed, 154 insertions(+), 138 deletions(-) diff --git a/Sources/Realtime/V2/RealtimeClientV2.swift b/Sources/Realtime/V2/RealtimeClientV2.swift index aecf6902..e0564bdc 100644 --- a/Sources/Realtime/V2/RealtimeClientV2.swift +++ b/Sources/Realtime/V2/RealtimeClientV2.swift @@ -209,7 +209,10 @@ public final class RealtimeClientV2: Sendable { } private func onClose(code: Int?, reason: String?) { - // TODO: implement + options.logger?.debug( + "WebSocket closed. Code: \(code?.description ?? ""), Reason: \(reason ?? "")") + + reconnect() } private func reconnect() { @@ -302,7 +305,8 @@ public final class RealtimeClientV2: Sendable { switch event { case .binary: - fatalError("Unsupported binary event") + self.options.logger?.error("Unsupported binary event received.") + break case .text(let text): let data = Data(text.utf8) let message = try JSONDecoder().decode(RealtimeMessageV2.self, from: data) diff --git a/Tests/IntegrationTests/RealtimeIntegrationTests.swift b/Tests/IntegrationTests/RealtimeIntegrationTests.swift index e337eb01..92c08694 100644 --- a/Tests/IntegrationTests/RealtimeIntegrationTests.swift +++ b/Tests/IntegrationTests/RealtimeIntegrationTests.swift @@ -7,6 +7,8 @@ import ConcurrencyExtras import CustomDump +import Helpers +import InlineSnapshotTesting import PostgREST import Supabase import TestHelpers @@ -14,11 +16,22 @@ import XCTest @testable import Realtime +struct TestLogger: SupabaseLogger { + func log(message: SupabaseLogMessage) { + print(message.description) + } +} + final class RealtimeIntegrationTests: XCTestCase { + + static let reconnectDelay: TimeInterval = 1 + let realtime = RealtimeClientV2( url: URL(string: "\(DotEnv.SUPABASE_URL)/realtime/v1")!, options: RealtimeClientOptions( - headers: ["apikey": DotEnv.SUPABASE_ANON_KEY] + headers: ["apikey": DotEnv.SUPABASE_ANON_KEY], + reconnectDelay: reconnectDelay, + logger: TestLogger() ) ) @@ -35,23 +48,26 @@ final class RealtimeIntegrationTests: XCTestCase { } } - func testBroadcast() async throws { - let expectation = expectation(description: "receivedBroadcastMessages") - expectation.expectedFulfillmentCount = 3 + func testDisconnectByUser_shouldNotReconnect() async { + await realtime.connect() + XCTAssertEqual(realtime.status, .connected) + + realtime.disconnect() + /// Wait for the reconnection delay + try? await Task.sleep( + nanoseconds: NSEC_PER_SEC * UInt64(Self.reconnectDelay) + 1) + + XCTAssertEqual(realtime.status, .disconnected) + } + + func testBroadcast() async throws { let channel = realtime.channel("integration") { $0.broadcast.receiveOwnBroadcasts = true } - let receivedMessages = LockIsolated<[JSONObject]>([]) - - Task { - for await message in channel.broadcastStream(event: "test") { - receivedMessages.withValue { - $0.append(message) - } - expectation.fulfill() - } + let receivedMessagesTask = Task { + await channel.broadcastStream(event: "test").prefix(3).collect() } await Task.yield() @@ -66,35 +82,38 @@ final class RealtimeIntegrationTests: XCTestCase { try await channel.broadcast(event: "test", message: Message(value: 2)) try await channel.broadcast(event: "test", message: ["value": 3, "another_value": 42]) - await fulfillment(of: [expectation], timeout: 0.5) + let receivedMessages = try await withTimeout(interval: 5) { + await receivedMessagesTask.value + } - expectNoDifference( - receivedMessages.value, + assertInlineSnapshot(of: receivedMessages, as: .json) { + """ [ - [ - "event": "test", - "payload": [ - "value": 1 - ], - "type": "broadcast", - ], - [ - "event": "test", - "payload": [ - "value": 2 - ], - "type": "broadcast", - ], - [ - "event": "test", - "payload": [ - "value": 3, - "another_value": 42, - ], - "type": "broadcast", - ], + { + "event" : "test", + "payload" : { + "value" : 1 + }, + "type" : "broadcast" + }, + { + "event" : "test", + "payload" : { + "value" : 2 + }, + "type" : "broadcast" + }, + { + "event" : "test", + "payload" : { + "another_value" : 42, + "value" : 3 + }, + "type" : "broadcast" + } ] - ) + """ + } await channel.unsubscribe() } @@ -118,18 +137,8 @@ final class RealtimeIntegrationTests: XCTestCase { $0.broadcast.receiveOwnBroadcasts = true } - let expectation = expectation(description: "presenceChange") - expectation.expectedFulfillmentCount = 4 - - let receivedPresenceChanges = LockIsolated<[any PresenceAction]>([]) - - Task { - for await presence in channel.presenceChange() { - receivedPresenceChanges.withValue { - $0.append(presence) - } - expectation.fulfill() - } + let receivedPresenceChangesTask = Task { + await channel.presenceChange().prefix(4).collect() } await Task.yield() @@ -145,10 +154,12 @@ final class RealtimeIntegrationTests: XCTestCase { await channel.untrack() - await fulfillment(of: [expectation], timeout: 0.5) + let receivedPresenceChanges = try await withTimeout(interval: 5) { + await receivedPresenceChangesTask.value + } - let joins = try receivedPresenceChanges.value.map { try $0.decodeJoins(as: UserState.self) } - let leaves = try receivedPresenceChanges.value.map { try $0.decodeLeaves(as: UserState.self) } + let joins = try receivedPresenceChanges.map { try $0.decodeJoins(as: UserState.self) } + let leaves = try receivedPresenceChanges.map { try $0.decodeLeaves(as: UserState.self) } expectNoDifference( joins, [ @@ -172,86 +183,87 @@ final class RealtimeIntegrationTests: XCTestCase { await channel.unsubscribe() } - // FIXME: Test getting stuck - // func testPostgresChanges() async throws { - // let channel = realtime.channel("db-changes") - // - // let receivedInsertActions = Task { - // await channel.postgresChange(InsertAction.self, schema: "public").prefix(1).collect() - // } - // - // let receivedUpdateActions = Task { - // await channel.postgresChange(UpdateAction.self, schema: "public").prefix(1).collect() - // } - // - // let receivedDeleteActions = Task { - // await channel.postgresChange(DeleteAction.self, schema: "public").prefix(1).collect() - // } - // - // let receivedAnyActionsTask = Task { - // await channel.postgresChange(AnyAction.self, schema: "public").prefix(3).collect() - // } - // - // await Task.yield() - // await channel.subscribe() - // - // struct Entry: Codable, Equatable { - // let key: String - // let value: AnyJSON - // } - // - // let key = try await ( - // db.from("key_value_storage") - // .insert(["key": AnyJSON.string(UUID().uuidString), "value": "value1"]).select().single() - // .execute().value as Entry - // ).key - // try await db.from("key_value_storage").update(["value": "value2"]).eq("key", value: key) - // .execute() - // try await db.from("key_value_storage").delete().eq("key", value: key).execute() - // - // let insertedEntries = try await receivedInsertActions.value.map { - // try $0.decodeRecord( - // as: Entry.self, - // decoder: JSONDecoder() - // ) - // } - // let updatedEntries = try await receivedUpdateActions.value.map { - // try $0.decodeRecord( - // as: Entry.self, - // decoder: JSONDecoder() - // ) - // } - // let deletedEntryIds = await receivedDeleteActions.value.compactMap { - // $0.oldRecord["key"]?.stringValue - // } - // - // expectNoDifference(insertedEntries, [Entry(key: key, value: "value1")]) - // expectNoDifference(updatedEntries, [Entry(key: key, value: "value2")]) - // expectNoDifference(deletedEntryIds, [key]) - // - // let receivedAnyActions = await receivedAnyActionsTask.value - // XCTAssertEqual(receivedAnyActions.count, 3) - // - // if case let .insert(action) = receivedAnyActions[0] { - // let record = try action.decodeRecord(as: Entry.self, decoder: JSONDecoder()) - // expectNoDifference(record, Entry(key: key, value: "value1")) - // } else { - // XCTFail("Expected a `AnyAction.insert` on `receivedAnyActions[0]`") - // } - // - // if case let .update(action) = receivedAnyActions[1] { - // let record = try action.decodeRecord(as: Entry.self, decoder: JSONDecoder()) - // expectNoDifference(record, Entry(key: key, value: "value2")) - // } else { - // XCTFail("Expected a `AnyAction.update` on `receivedAnyActions[1]`") - // } - // - // if case let .delete(action) = receivedAnyActions[2] { - // expectNoDifference(key, action.oldRecord["key"]?.stringValue) - // } else { - // XCTFail("Expected a `AnyAction.delete` on `receivedAnyActions[2]`") - // } - // - // await channel.unsubscribe() - // } + func testPostgresChanges() async throws { + let channel = realtime.channel("db-changes") + + let receivedInsertActions = Task { + await channel.postgresChange(InsertAction.self, schema: "public").prefix(1).collect() + } + + let receivedUpdateActions = Task { + await channel.postgresChange(UpdateAction.self, schema: "public").prefix(1).collect() + } + + let receivedDeleteActions = Task { + await channel.postgresChange(DeleteAction.self, schema: "public").prefix(1).collect() + } + + let receivedAnyActionsTask = Task { + await channel.postgresChange(AnyAction.self, schema: "public").prefix(3).collect() + } + + await Task.yield() + await channel.subscribe() + + struct Entry: Codable, Equatable { + let key: String + let value: AnyJSON + } + + // Wait until a system event for makind sure DB change listeners are set before making DB changes. + _ = await channel.system().first(where: { _ in true }) + + let key = try await + (db.from("key_value_storage") + .insert(["key": AnyJSON.string(UUID().uuidString), "value": "value1"]).select().single() + .execute().value as Entry).key + try await db.from("key_value_storage").update(["value": "value2"]).eq("key", value: key) + .execute() + try await db.from("key_value_storage").delete().eq("key", value: key).execute() + + let insertedEntries = try await receivedInsertActions.value.map { + try $0.decodeRecord( + as: Entry.self, + decoder: JSONDecoder() + ) + } + let updatedEntries = try await receivedUpdateActions.value.map { + try $0.decodeRecord( + as: Entry.self, + decoder: JSONDecoder() + ) + } + let deletedEntryIds = await receivedDeleteActions.value.compactMap { + $0.oldRecord["key"]?.stringValue + } + + expectNoDifference(insertedEntries, [Entry(key: key, value: "value1")]) + expectNoDifference(updatedEntries, [Entry(key: key, value: "value2")]) + expectNoDifference(deletedEntryIds, [key]) + + let receivedAnyActions = await receivedAnyActionsTask.value + XCTAssertEqual(receivedAnyActions.count, 3) + + if case let .insert(action) = receivedAnyActions[0] { + let record = try action.decodeRecord(as: Entry.self, decoder: JSONDecoder()) + expectNoDifference(record, Entry(key: key, value: "value1")) + } else { + XCTFail("Expected a `AnyAction.insert` on `receivedAnyActions[0]`") + } + + if case let .update(action) = receivedAnyActions[1] { + let record = try action.decodeRecord(as: Entry.self, decoder: JSONDecoder()) + expectNoDifference(record, Entry(key: key, value: "value2")) + } else { + XCTFail("Expected a `AnyAction.update` on `receivedAnyActions[1]`") + } + + if case let .delete(action) = receivedAnyActions[2] { + expectNoDifference(key, action.oldRecord["key"]?.stringValue) + } else { + XCTFail("Expected a `AnyAction.delete` on `receivedAnyActions[2]`") + } + + await channel.unsubscribe() + } }