diff --git a/.github/workflows/swift-integration.yml b/.github/workflows/swift-integration.yml index daa874a1..892e694b 100644 --- a/.github/workflows/swift-integration.yml +++ b/.github/workflows/swift-integration.yml @@ -8,10 +8,11 @@ on: jobs: build: - runs-on: macos-12 + runs-on: macos-13 steps: - uses: actions/checkout@v3 - - uses: docker-practice/actions-setup-docker@master + - name: Setup Docker on macOS + uses: douglascamata/setup-docker-macos-action@v1-alpha - run: docker-compose -f docker/docker-compose-ci.yml up --build -d - name: Run tests run: swift test --enable-code-coverage -v --filter YorkieIntegrationTests diff --git a/.github/workflows/swift.yml b/.github/workflows/swift.yml index 690ded2d..fe8aaa4d 100644 --- a/.github/workflows/swift.yml +++ b/.github/workflows/swift.yml @@ -8,7 +8,7 @@ on: jobs: build: - runs-on: macos-12 + runs-on: macos-13 steps: - uses: actions/checkout@v3 - name: SwiftLint diff --git a/Examples/TextEditorApp/TextEditorApp/TextEditor/TextViewModel.swift b/Examples/TextEditorApp/TextEditorApp/TextEditor/TextViewModel.swift index 4b8e42c1..21b83fe8 100644 --- a/Examples/TextEditorApp/TextEditorApp/TextEditor/TextViewModel.swift +++ b/Examples/TextEditorApp/TextEditorApp/TextEditor/TextViewModel.swift @@ -56,13 +56,14 @@ class TextViewModel { } // subscribe document event. - let clientID = await self.client.id - await self.document.subscribe { [weak self] event in - if event.type == .snapshot { + switch event.type { + case .snapshot, .remoteChange: Task { [weak self] in await self?.syncText() } + default: + break } } @@ -73,15 +74,9 @@ class TextViewModel { var textChanges = [TextOperation]() - event.value.filter { $0.actorID != clientID }.forEach { changeInfo in + event.value.forEach { changeInfo in changeInfo.operations.forEach { - if let op = $0 as? EditOpInfo { - let range = NSRange(location: op.from, length: op.to - op.from) - let content = op.content ?? "" - - textChanges.append(.edit(range: range, content: content)) - } else if let _ = $0 as? StyleOpInfo { - } else if let op = $0 as? SelectOpInfo { + if let op = $0 as? SelectOpInfo { let range: NSRange if op.from <= op.to { @@ -146,10 +141,10 @@ class TextViewModel { } func pause() async { - try! await self.client.pause(self.document) + try? await self.client.pauseRemoteChanges(doc: self.document) } func resume() async { - try! await self.client.resume(self.document) + try? await self.client.resumeRemoteChanges(doc: self.document) } } diff --git a/Sources/API/V1/yorkie/v1/yorkie.pb.swift b/Sources/API/V1/yorkie/v1/yorkie.pb.swift index 8694e555..04ae066c 100644 --- a/Sources/API/V1/yorkie/v1/yorkie.pb.swift +++ b/Sources/API/V1/yorkie/v1/yorkie.pb.swift @@ -334,6 +334,8 @@ struct Yorkie_V1_PushPullChangesRequest { /// Clears the value of `changePack`. Subsequent reads from it will return its default value. mutating func clearChangePack() {self._changePack = nil} + var pushOnly: Bool = false + var unknownFields = SwiftProtobuf.UnknownStorage() init() {} @@ -976,6 +978,7 @@ extension Yorkie_V1_PushPullChangesRequest: SwiftProtobuf.Message, SwiftProtobuf 1: .standard(proto: "client_id"), 2: .standard(proto: "document_id"), 3: .standard(proto: "change_pack"), + 4: .standard(proto: "push_only"), ] mutating func decodeMessage(decoder: inout D) throws { @@ -987,6 +990,7 @@ extension Yorkie_V1_PushPullChangesRequest: SwiftProtobuf.Message, SwiftProtobuf case 1: try { try decoder.decodeSingularBytesField(value: &self.clientID) }() case 2: try { try decoder.decodeSingularStringField(value: &self.documentID) }() case 3: try { try decoder.decodeSingularMessageField(value: &self._changePack) }() + case 4: try { try decoder.decodeSingularBoolField(value: &self.pushOnly) }() default: break } } @@ -1006,6 +1010,9 @@ extension Yorkie_V1_PushPullChangesRequest: SwiftProtobuf.Message, SwiftProtobuf try { if let v = self._changePack { try visitor.visitSingularMessageField(value: v, fieldNumber: 3) } }() + if self.pushOnly != false { + try visitor.visitSingularBoolField(value: self.pushOnly, fieldNumber: 4) + } try unknownFields.traverse(visitor: &visitor) } @@ -1013,6 +1020,7 @@ extension Yorkie_V1_PushPullChangesRequest: SwiftProtobuf.Message, SwiftProtobuf if lhs.clientID != rhs.clientID {return false} if lhs.documentID != rhs.documentID {return false} if lhs._changePack != rhs._changePack {return false} + if lhs.pushOnly != rhs.pushOnly {return false} if lhs.unknownFields != rhs.unknownFields {return false} return true } diff --git a/Sources/API/V1/yorkie/v1/yorkie.proto b/Sources/API/V1/yorkie/v1/yorkie.proto index 7b91d24f..efb5fec1 100644 --- a/Sources/API/V1/yorkie/v1/yorkie.proto +++ b/Sources/API/V1/yorkie/v1/yorkie.proto @@ -108,6 +108,7 @@ message PushPullChangesRequest { bytes client_id = 1; string document_id = 2; ChangePack change_pack = 3; + bool push_only = 4; } message PushPullChangesResponse { diff --git a/Sources/Core/Auth.swift b/Sources/Core/Auth.swift index 48a08711..b5190110 100644 --- a/Sources/Core/Auth.swift +++ b/Sources/Core/Auth.swift @@ -21,10 +21,12 @@ import NIOCore class AuthClientInterceptor: ClientInterceptor { let apiKey: String? let token: String? + let docKey: String? - init(apiKey: String? = nil, token: String? = nil) { + init(apiKey: String? = nil, token: String? = nil, docKey: String? = nil) { self.apiKey = apiKey self.token = token + self.docKey = docKey } override func send(_ part: GRPCClientRequestPart, promise: EventLoopPromise?, context: ClientInterceptorContext) { @@ -34,6 +36,14 @@ class AuthClientInterceptor: ClientInterceptor: ClientInterceptor AuthClientInterceptors { + AuthClientInterceptors(apiKey: self.apiKey, token: self.token, docKey: docKey) } func makeActivateClientInterceptors() -> [GRPC.ClientInterceptor] { @@ -68,26 +84,26 @@ final class AuthClientInterceptors: YorkieServiceClientInterceptorFactoryProtoco } func makeUpdatePresenceInterceptors() -> [GRPC.ClientInterceptor] { - [AuthClientInterceptor(apiKey: self.apiKey, token: self.token)] + [AuthClientInterceptor(apiKey: self.apiKey, token: self.token, docKey: self.docKey)] } func makeAttachDocumentInterceptors() -> [GRPC.ClientInterceptor] { - [AuthClientInterceptor(apiKey: self.apiKey, token: self.token)] + [AuthClientInterceptor(apiKey: self.apiKey, token: self.token, docKey: self.docKey)] } func makeDetachDocumentInterceptors() -> [GRPC.ClientInterceptor] { - [AuthClientInterceptor(apiKey: self.apiKey, token: self.token)] + [AuthClientInterceptor(apiKey: self.apiKey, token: self.token, docKey: self.docKey)] } func makeWatchDocumentInterceptors() -> [GRPC.ClientInterceptor] { - [AuthClientInterceptor(apiKey: self.apiKey, token: self.token)] + [AuthClientInterceptor(apiKey: self.apiKey, token: self.token, docKey: self.docKey)] } func makeRemoveDocumentInterceptors() -> [GRPC.ClientInterceptor] { - [AuthClientInterceptor(apiKey: self.apiKey, token: self.token)] + [AuthClientInterceptor(apiKey: self.apiKey, token: self.token, docKey: self.docKey)] } func makePushPullChangesInterceptors() -> [GRPC.ClientInterceptor] { - [AuthClientInterceptor(apiKey: self.apiKey, token: self.token)] + [AuthClientInterceptor(apiKey: self.apiKey, token: self.token, docKey: self.docKey)] } } diff --git a/Sources/Core/Client.swift b/Sources/Core/Client.swift index 077685ca..fc4cea8d 100644 --- a/Sources/Core/Client.swift +++ b/Sources/Core/Client.swift @@ -51,14 +51,38 @@ enum StreamConnectionStatus { case disconnected } +/** + * `SyncMode` is the mode of synchronization. It is used to determine + * whether to push and pull changes in PushPullChanges API. + */ +public enum SyncMode { + /** + * `PushPull` is the mode that pushes and pulls changes. + */ + case pushPull + + /** + * `PushOnly` is the mode that pushes changes only. + */ + case pushOnly +} + struct Attachment { var doc: Document var docID: String var isRealtimeSync: Bool + var realtimeSyncMode: SyncMode var peerPresenceMap: [ActorID: PresenceInfo] var remoteChangeEventReceived: Bool var remoteWatchStream: GRPCAsyncServerStreamingCall? var watchLoopReconnectTimer: Timer? + + /** + * `getPresence` returns the presence information of the client. + */ + func getPresence(clientID: ActorID) -> Presence? { + self.peerPresenceMap[clientID]?.data + } } /** @@ -162,7 +186,7 @@ public actor Client { private let reconnectStreamDelay: Int private let maximumAttachmentTimeout: Int - private let rpcClient: YorkieServiceAsyncClient + private var rpcClient: YorkieServiceAsyncClient private let group: EventLoopGroup @@ -230,6 +254,7 @@ public actor Client { activateRequest.clientKey = self.key do { + self.changeDocKeyOfAuthInterceptors(nil) let activateResponse = try await self.rpcClient.activateClient(activateRequest, callOptions: nil) self.id = activateResponse.clientID.toHexString @@ -267,6 +292,7 @@ public actor Client { deactivateRequest.clientID = clientIDData do { + self.changeDocKeyOfAuthInterceptors(nil) _ = try await self.rpcClient.deactivateClient(deactivateRequest) } catch { Logger.error("Failed to request deactivate client(\(self.key)).", error: error) @@ -311,6 +337,7 @@ public actor Client { self.semaphoresForInitialzation[docKey] = semaphore + self.changeDocKeyOfAuthInterceptors(docKey) let result = try await self.rpcClient.attachDocument(attachDocumentRequest) let pack = try Converter.fromChangePack(result.changePack) @@ -322,7 +349,7 @@ public actor Client { await doc.setStatus(.attached) - self.attachmentMap[doc.getKey()] = Attachment(doc: doc, docID: result.documentID, isRealtimeSync: isRealtimeSync, peerPresenceMap: [String: PresenceInfo](), remoteChangeEventReceived: false) + self.attachmentMap[doc.getKey()] = Attachment(doc: doc, docID: result.documentID, isRealtimeSync: isRealtimeSync, realtimeSyncMode: .pushPull, peerPresenceMap: [String: PresenceInfo](), remoteChangeEventReceived: false) try self.runWatchLoop(docKey) Logger.info("[AD] c:\"\(self.key))\" attaches d:\"\(doc.getKey())\"") @@ -368,6 +395,7 @@ public actor Client { detachDocumentRequest.changePack = Converter.toChangePack(pack: await doc.createChangePack()) do { + self.changeDocKeyOfAuthInterceptors(doc.getKey()) let result = try await self.rpcClient.detachDocument(detachDocumentRequest) let pack = try Converter.fromChangePack(result.changePack) @@ -394,14 +422,22 @@ public actor Client { * `pause` pause the realtime syncronization of the given document. */ public func pause(_ doc: Document) throws { - try self.changeRealtimeSyncSetting(doc, false) + guard self.isActive else { + throw YorkieError.clientNotActive(message: "\(self.key) is not active") + } + + try self.changeRealtimeSync(doc, false) } /** * `resume` resume the realtime syncronization of the given document. */ public func resume(_ doc: Document) throws { - try self.changeRealtimeSyncSetting(doc, true) + guard self.isActive else { + throw YorkieError.clientNotActive(message: "\(self.key) is not active") + } + + try self.changeRealtimeSync(doc, true) } /** @@ -427,6 +463,7 @@ public actor Client { removeDocumentRequest.changePack = Converter.toChangePack(pack: await doc.createChangePack(true)) do { + self.changeDocKeyOfAuthInterceptors(doc.getKey()) let result = try await self.rpcClient.removeDocument(removeDocumentRequest) let pack = try Converter.fromChangePack(result.changePack) @@ -445,11 +482,10 @@ public actor Client { } } - private func changeRealtimeSyncSetting(_ doc: Document, _ isRealtimeSync: Bool) throws { - guard self.isActive else { - throw YorkieError.clientNotActive(message: "\(self.key) is not active") - } - + /** + * `changeRealtimeSync` changes the synchronization mode of the given document. + */ + private func changeRealtimeSync(_ doc: Document, _ isRealtimeSync: Bool) throws { let docKey = doc.getKey() guard self.attachmentMap[docKey] != nil else { @@ -465,20 +501,57 @@ public actor Client { } } + /** + * `pauseRemoteChanges` pauses the synchronization of remote changes, + * allowing only local changes to be applied. + */ + public func pauseRemoteChanges(doc: Document) throws { + guard self.isActive else { + throw YorkieError.clientNotActive(message: "\(self.key) is not active") + } + + let docKey = doc.getKey() + + guard self.attachmentMap[docKey] != nil else { + throw YorkieError.documentNotAttached(message: "\(docKey) is not attached") + } + + self.attachmentMap[docKey]?.realtimeSyncMode = .pushOnly + } + + /** + * `resumeRemoteChanges` resumes the synchronization of remote changes, + * allowing both local and remote changes to be applied. + */ + public func resumeRemoteChanges(doc: Document) throws { + guard self.isActive else { + throw YorkieError.clientNotActive(message: "\(self.key) is not active") + } + + let docKey = doc.getKey() + + guard self.attachmentMap[docKey] != nil else { + throw YorkieError.documentNotAttached(message: "\(docKey) is not attached") + } + + self.attachmentMap[docKey]?.realtimeSyncMode = .pushPull + self.attachmentMap[docKey]?.remoteChangeEventReceived = true + } + /** * `sync` pushes local changes of the attached documents to the server and * receives changes of the remote replica from the server then apply them to * local documents. */ @discardableResult - public func sync() async throws -> [Document] { + public func sync(_ syncModes: [DocumentKey: SyncMode] = [:]) async throws -> [Document] { let attachments = self.attachmentMap.values do { try await withThrowingTaskGroup(of: Void.self) { group in attachments.forEach { attachment in group.addTask { - try await self.syncInternal(attachment) + try await self.syncInternal(attachment, syncModes[attachment.doc.getKey()] ?? .pushPull) } } @@ -523,6 +596,7 @@ public actor Client { self.sendPeerChangeEvent(.presenceChanged, [docKey], id) do { + self.changeDocKeyOfAuthInterceptors(docKey) _ = try await self.rpcClient.updatePresence(updatePresenceRequest) Logger.info("[UM] c\"\(self.key)\" updated") } catch { @@ -532,13 +606,26 @@ public actor Client { } /** - * `getPeers` returns the peers of the given document. + * `getPeerPresence` returns the presence of the given document and client. */ - public func getPeers(key: String) -> PresenceMap { + public func getPeerPresence(docKey: DocumentKey, clientID: ActorID) -> Presence? { + self.attachmentMap[docKey]?.getPresence(clientID: clientID) + } + + /** + * `getPeersByDocKey` returns the peers of the given document. + */ + public func getPeersByDocKey(docKey: DocumentKey) throws -> PresenceMap { + guard let attachment = self.attachmentMap[docKey] else { + throw YorkieError.documentNotAttached(message: "\(docKey) is not attached.") + } + var peers = PresenceMap() - self.attachmentMap[key]?.peerPresenceMap.forEach { + + attachment.peerPresenceMap.forEach { peers[$0.key] = $0.value.data } + return peers } @@ -591,7 +678,7 @@ public actor Client { if docChanged || attachment.remoteChangeEventReceived { self.clearAttachmentRemoteChangeEventReceived(key) group.addTask { - try await self.syncInternal(attachment) + try await self.syncInternal(attachment, attachment.realtimeSyncMode) } } } @@ -633,6 +720,7 @@ public actor Client { request.client = Converter.toClient(id: id, presence: self.presenceInfo) request.documentID = docID + self.changeDocKeyOfAuthInterceptors(docKey) self.attachmentMap[docKey]?.remoteWatchStream = self.rpcClient.makeWatchDocumentCall(request) let event = StreamConnectionStatusChangedEvent(value: .connected) @@ -775,7 +863,7 @@ public actor Client { } @discardableResult - private func syncInternal(_ attachment: Attachment) async throws -> Document { + private func syncInternal(_ attachment: Attachment, _ syncMode: SyncMode) async throws -> Document { guard let clientID = self.id, let clientIDData = clientID.toData else { throw YorkieError.unexpected(message: "Invalid Client ID!") } @@ -789,21 +877,31 @@ public actor Client { pushPullRequest.changePack = Converter.toChangePack(pack: requestPack) pushPullRequest.documentID = attachment.docID + pushPullRequest.pushOnly = syncMode == .pushOnly do { + let docKey = doc.getKey() + + self.changeDocKeyOfAuthInterceptors(docKey) let response = try await self.rpcClient.pushPullChanges(pushPullRequest) let responsePack = try Converter.fromChangePack(response.changePack) + + // NOTE(chacha912, hackerwins): If syncLoop already executed with + // PushPull, ignore the response when the syncMode is PushOnly. + if responsePack.hasChanges(), syncMode == .pushOnly { + return doc + } + try await doc.applyChangePack(pack: responsePack) if await doc.status == .removed { - self.attachmentMap.removeValue(forKey: doc.getKey()) + self.attachmentMap.removeValue(forKey: docKey) } let event = DocumentSyncedEvent(value: .synced) self.eventStream.send(event) - let docKey = doc.getKey() let remoteSize = responsePack.getChangeSize() Logger.info("[PP] c:\"\(self.key)\" sync d:\"\(docKey)\", push:\(localSize) pull:\(remoteSize) cp:\(responsePack.getCheckpoint().structureAsString)") @@ -814,4 +912,8 @@ public actor Client { throw error } } + + private func changeDocKeyOfAuthInterceptors(_ docKey: String?) { + self.rpcClient.interceptors = (self.rpcClient.interceptors as? AuthClientInterceptors)?.docKeyChangedInterceptors(docKey) + } } diff --git a/Sources/Document/DocEvent.swift b/Sources/Document/DocEvent.swift index 72107733..32581e54 100644 --- a/Sources/Document/DocEvent.swift +++ b/Sources/Document/DocEvent.swift @@ -58,7 +58,7 @@ public struct SnapshotEvent: DocEvent { public var value: Data } -protocol ChangeEventable: DocEvent { +protocol ChangeEvent: DocEvent { var type: DocEventType { get } var value: ChangeInfo { get } } @@ -78,7 +78,7 @@ public struct ChangeInfo { * by local changes. * */ -public struct LocalChangeEvent: ChangeEventable { +public struct LocalChangeEvent: ChangeEvent { /** * ``DocEventType/localChange`` */ @@ -94,7 +94,7 @@ public struct LocalChangeEvent: ChangeEventable { * by remote changes. * */ -public struct RemoteChangeEvent: ChangeEventable { +public struct RemoteChangeEvent: ChangeEvent { /** * ``DocEventType/remoteChange`` */ diff --git a/Sources/Document/Document.swift b/Sources/Document/Document.swift index de46c2d8..cd15b2b6 100644 --- a/Sources/Document/Document.swift +++ b/Sources/Document/Document.swift @@ -62,7 +62,7 @@ public actor Document { private var root: CRDTRoot private var clone: CRDTRoot? private var changeID: ChangeID - private var checkpoint: Checkpoint + internal var checkpoint: Checkpoint private var localChanges: [Change] private var defaultSubscribeCallback: SubscribeCallback? private var subscribeCallbacks: [String: SubscribeCallback] @@ -121,6 +121,17 @@ public actor Document { } } + /** + * `unsubscribe` unregisters a callback to subscribe to events on the document. + */ + public func unsubscribe(targetPath: String? = nil) { + if let targetPath { + self.subscribeCallbacks[targetPath] = nil + } else { + self.defaultSubscribeCallback = nil + } + } + /** * `applyChangePack` applies the given change pack into this document. * 1. Remove local changes applied to server. @@ -319,7 +330,7 @@ public actor Document { self.changeID.syncLamport(with: $0.id.getLamport()) } - + changeInfos.forEach { self.processDocEvent(RemoteChangeEvent(value: $0)) } @@ -372,9 +383,9 @@ public actor Document { private func processDocEvent(_ event: DocEvent) { if event.type != .snapshot { - if let event = event as? ChangeEventable { + if let event = event as? ChangeEvent { var operations = [String: [any OperationInfo]]() - + event.value.operations.forEach { operationInfo in self.subscribeCallbacks.keys.forEach { targetPath in if self.isSameElementOrChildOf(operationInfo.path, targetPath) { @@ -385,13 +396,15 @@ public actor Document { } } } - + operations.forEach { key, value in let info = ChangeInfo(message: event.value.message, operations: value, actorID: event.value.actorID) - + self.subscribeCallbacks[key]?(event.type == .localChange ? LocalChangeEvent(value: info) : RemoteChangeEvent(value: info)) } } + } else { + self.subscribeCallbacks["$"]?(event) } self.defaultSubscribeCallback?(event) diff --git a/Tests/Integration/ClientIntegrationTests.swift b/Tests/Integration/ClientIntegrationTests.swift index fe2979c5..71afc3e1 100644 --- a/Tests/Integration/ClientIntegrationTests.swift +++ b/Tests/Integration/ClientIntegrationTests.swift @@ -290,7 +290,7 @@ final class ClientIntegrationTests: XCTestCase { try await c1.sync() try await c2.sync() - let presence1: PresenceType = self.decodePresence(await c2.getPeers(key: d2.getKey())[c1.id!]!)! + let presence1: PresenceType = self.decodePresence(await c2.getPeerPresence(docKey: docKey, clientID: c1.id!)!)! XCTAssert(c1Name == presence1.name) @@ -300,12 +300,12 @@ final class ClientIntegrationTests: XCTestCase { try await c2.sync() try await c1.sync() - let presence2: PresenceType = self.decodePresence(await c1.getPeers(key: d1.getKey())[c2.id!]!)! + let presence2: PresenceType = self.decodePresence(await c1.getPeerPresence(docKey: docKey, clientID: c2.id!)!)! XCTAssert(c2Name == presence2.name) - let c1Peer = await c1.getPeers(key: d1.getKey()) as NSDictionary - let c2Peer = (await c2.getPeers(key: d2.getKey()) as NSDictionary) as! [AnyHashable: Any] + let c1Peer = try await c1.getPeersByDocKey(docKey: d1.getKey()) as NSDictionary + let c2Peer = try (await c2.getPeersByDocKey(docKey: d2.getKey()) as NSDictionary) as! [AnyHashable: Any] XCTAssert(c1Peer.isEqual(to: c2Peer)) @@ -360,6 +360,139 @@ final class ClientIntegrationTests: XCTestCase { try await c1.deactivate() } + func test_can_change_sync_mode_in_realtime_sync() async throws { + let c1 = Client(rpcAddress: self.rpcAddress, options: ClientOptions()) + let c2 = Client(rpcAddress: self.rpcAddress, options: ClientOptions()) + let c3 = Client(rpcAddress: self.rpcAddress, options: ClientOptions()) + + try await c1.activate() + try await c2.activate() + try await c3.activate() + + let docKey = "\(self.description)-\(Date().description)".toDocKey + + let d1 = Document(key: docKey) + let d2 = Document(key: docKey) + let d3 = Document(key: docKey) + + // 01. c1, c2, c3 attach to the same document in realtime sync. + try await c1.attach(d1) + try await c2.attach(d2) + try await c3.attach(d3) + + // 02. c1, c2 sync in realtime. + try await d1.update { root in + root.c1 = Int64(0) + } + + try await d2.update { root in + root.c2 = Int64(0) + } + + try await Task.sleep(nanoseconds: 1_500_000_000) + + var d1Doc = await d1.toSortedJSON() + var d2Doc = await d2.toSortedJSON() + + XCTAssertEqual(d1Doc, "{\"c1\":0,\"c2\":0}") + XCTAssertEqual(d2Doc, "{\"c1\":0,\"c2\":0}") + + // 03. c1 and c2 sync with push-only mode. So, the changes of c1 and c2 + // are not reflected to each other. + // But, c can get the changes of c1 and c2, because c3 sync with push-pull mode. + try await c1.pauseRemoteChanges(doc: d1) + try await c2.pauseRemoteChanges(doc: d2) + + try await d1.update { root in + root.c1 = Int64(1) + } + + try await d2.update { root in + root.c2 = Int64(1) + } + + try await Task.sleep(nanoseconds: 1_500_000_000) + + d1Doc = await d1.toSortedJSON() + d2Doc = await d2.toSortedJSON() + let d3Doc = await d3.toSortedJSON() + + XCTAssertEqual(d1Doc, "{\"c1\":1,\"c2\":0}") + XCTAssertEqual(d2Doc, "{\"c1\":0,\"c2\":1}") + XCTAssertEqual(d3Doc, "{\"c1\":1,\"c2\":1}") + + // 04. c1 and c2 sync with push-pull mode. + try await c1.resumeRemoteChanges(doc: d1) + try await c2.resumeRemoteChanges(doc: d2) + + try await Task.sleep(nanoseconds: 1_500_000_000) + + d1Doc = await d1.toSortedJSON() + d2Doc = await d2.toSortedJSON() + + XCTAssertEqual(d1Doc, "{\"c1\":1,\"c2\":1}") + XCTAssertEqual(d2Doc, "{\"c1\":1,\"c2\":1}") + + try await c1.deactivate() + try await c2.deactivate() + try await c3.deactivate() + } + + func test_can_get_peers_presence() async throws { + struct Cursor: Codable, Equatable { + // swiftlint: disable identifier_name + var x: Int + var y: Int + // swiftlint: enable identifier_name + } + + struct PresenceType: Codable, Equatable { + static func == (lhs: PresenceType, rhs: PresenceType) -> Bool { + lhs.name == rhs.name && lhs.cursor == rhs.cursor + } + + var name: String + var cursor: Cursor + } + + var option = ClientOptions() + option.presence = PresenceType(name: "a", cursor: Cursor(x: 0, y: 0)).createdDictionary + + let c1 = Client(rpcAddress: rpcAddress, options: option) + + option.presence = PresenceType(name: "b", cursor: Cursor(x: 1, y: 1)).createdDictionary + + let c2 = Client(rpcAddress: rpcAddress, options: option) + + try await c1.activate() + try await c2.activate() + + let docKey = "\(self.description)-\(Date().description)".toDocKey + + let d1 = Document(key: docKey) + let d2 = Document(key: docKey) + + try await c1.attach(d1) + + let presence1 = await c1.getPeerPresence(docKey: docKey, clientID: c2.id!) + + XCTAssert(presence1 == nil) + + try await c2.attach(d2) + + try await Task.sleep(nanoseconds: 1_500_000_000) + + let presence2: PresenceType? = self.decodePresence(await c1.getPeerPresence(docKey: docKey, clientID: c2.id!)!) + + XCTAssertTrue(presence2 == PresenceType(name: "b", cursor: Cursor(x: 1, y: 1))) + + try await c1.detach(d1) + try await c2.detach(d2) + + try await c1.deactivate() + try await c2.deactivate() + } + private func decodePresence(_ dictionary: [String: Any]) -> T? { guard let data = try? JSONSerialization.data(withJSONObject: dictionary, options: []) else { return nil diff --git a/Tests/Integration/ClientTests.swift b/Tests/Integration/ClientTests.swift index 72a99b74..90e02dc3 100644 --- a/Tests/Integration/ClientTests.swift +++ b/Tests/Integration/ClientTests.swift @@ -155,4 +155,148 @@ class ClientTests: XCTestCase { XCTAssertFalse(isActive) XCTAssert(status == .deactivated) } + + func test_sync_option_with_multiple_clients() async throws { + let rpcAddress = RPCAddress(host: "localhost", port: 8080) + let docKey = "\(self.description)-\(Date().description)".toDocKey + + let c1 = Client(rpcAddress: rpcAddress, options: ClientOptions()) + let c2 = Client(rpcAddress: rpcAddress, options: ClientOptions()) + let c3 = Client(rpcAddress: rpcAddress, options: ClientOptions()) + + try await c1.activate() + try await c2.activate() + try await c3.activate() + + // 01. c1, c2, c3 attach to the same document. + let d1 = Document(key: docKey) + try await c1.attach(d1, false) + let d2 = Document(key: docKey) + try await c2.attach(d2, false) + let d3 = Document(key: docKey) + try await c3.attach(d3, false) + + // 02. c1, c2 sync with push-pull mode. + try await d1.update { root in + root.c1 = Int64(0) + } + + try await d1.update { root in + root.c2 = Int64(0) + } + + try await c1.sync() + try await c2.sync() + try await c1.sync() + + var result1 = await d1.getRoot().debugDescription + var result2 = await d2.getRoot().debugDescription + + XCTAssertEqual(result1, result2) + + // 03. c1 and c2 sync with push-only mode. So, the changes of c1 and c2 + // are not reflected to each other. + // But, c3 can get the changes of c1 and c2, because c3 sync with pull-pull mode. + try await d1.update { root in + root.c1 = Int64(1) + } + + try await d2.update { root in + root.c2 = Int64(1) + } + + try await c1.sync([docKey: .pushOnly]) + try await c2.sync([docKey: .pushOnly]) + try await c3.sync() + + result1 = await d1.getRoot().debugDescription + result2 = await d2.getRoot().debugDescription + + XCTAssertNotEqual(result1, result2) + + let result3 = await d3.getRoot().debugDescription + + XCTAssertEqual(result3, "{\"c1\":1,\"c2\":1}") + + // 04. c1 and c2 sync with push-pull mode. + try await c1.sync() + try await c2.sync() + + result1 = await d1.getRoot().debugDescription + result2 = await d2.getRoot().debugDescription + + XCTAssertEqual(result1, result3) + XCTAssertEqual(result2, result3) + + try await c1.detach(d1) + try await c2.detach(d2) + try await c3.detach(d3) + + try await c1.deactivate() + try await c2.deactivate() + try await c3.deactivate() + } + + func test_sync_option_with_mixed_mode() async throws { + let rpcAddress = RPCAddress(host: "localhost", port: 8080) + let docKey = "\(self.description)-\(Date().description)".toDocKey + + let c1 = Client(rpcAddress: rpcAddress, options: ClientOptions()) + + try await c1.activate() + + // 01. cli attach to the same document having counter. + let d1 = Document(key: docKey) + try await c1.attach(d1, false) + + // 02. cli update the document with creating a counter + // and sync with push-pull mode: CP(0, 0) -> CP(1, 1) + try await d1.update { root in + root.counter = JSONCounter(value: Int64(0)) + } + + var checkpoint = await d1.checkpoint + XCTAssertEqual(Checkpoint(serverSeq: 0, clientSeq: 0), checkpoint) + + try await c1.sync() + + checkpoint = await d1.checkpoint + XCTAssertEqual(Checkpoint(serverSeq: 1, clientSeq: 1), checkpoint) + + // 03. cli update the document with increasing the counter(0 -> 1) + // and sync with push-only mode: CP(1, 1) -> CP(2, 1) + try await d1.update { root in + (root.counter as? JSONCounter)!.increase(value: 1) + } + + var changePack = await d1.createChangePack() + + XCTAssertEqual(changePack.getChanges().count, 1) + + try await c1.sync([docKey: .pushOnly]) + + checkpoint = await d1.checkpoint + XCTAssertEqual(Checkpoint(serverSeq: 1, clientSeq: 2), checkpoint) + + // 04. cli update the document with increasing the counter(1 -> 2) + // and sync with push-pull mode. CP(2, 1) -> CP(3, 3) + try await d1.update { root in + (root.counter as? JSONCounter)!.increase(value: 1) + } + + // The previous increase(0 -> 1) is already pushed to the server, + // so the ChangePack of the request only has the increase(1 -> 2). + changePack = await d1.createChangePack() + + XCTAssertEqual(changePack.getChanges().count, 1) + + try await c1.sync() + + checkpoint = await d1.checkpoint + XCTAssertEqual(Checkpoint(serverSeq: 3, clientSeq: 3), checkpoint) + + let counter = await(d1.getRoot().get(key: "counter") as? JSONCounter)! + + XCTAssertEqual(2, counter.value) + } } diff --git a/Tests/Integration/DocumentIntegrationTests.swift b/Tests/Integration/DocumentIntegrationTests.swift index d8dbf30d..f64003c1 100644 --- a/Tests/Integration/DocumentIntegrationTests.swift +++ b/Tests/Integration/DocumentIntegrationTests.swift @@ -365,4 +365,115 @@ final class DocumentIntegrationTests: XCTestCase { try await self.c1.deactivate() } + + func test_specify_the_topic_to_subscribe_to() async throws { + let options = ClientOptions() + let docKey = "\(self.description)-\(Date().description)".toDocKey + + self.c1 = Client(rpcAddress: self.rpcAddress, options: options) + self.c2 = Client(rpcAddress: self.rpcAddress, options: options) + + try await self.c1.activate() + try await self.c2.activate() + + self.d1 = Document(key: docKey) + self.d2 = Document(key: docKey) + + try await self.c1.attach(self.d1) + try await self.c2.attach(self.d2) + + var d1Events = [any OperationInfo]() + var d2Events = [any OperationInfo]() + var d3Events = [any OperationInfo]() + + await self.d1.subscribe { event in + d1Events.append(contentsOf: (event as? ChangeEvent)?.value.operations ?? []) + } + + await self.d1.subscribe(targetPath: "$.todos") { event in + d2Events.append(contentsOf: (event as? ChangeEvent)?.value.operations ?? []) + } + + await self.d1.subscribe(targetPath: "$.counter") { event in + d3Events.append(contentsOf: (event as? ChangeEvent)?.value.operations ?? []) + } + + try await self.d2.update { root in + root.counter = JSONCounter(value: Int32(0)) + root.todos = ["todo1", "todo2"] + } + + // Wait sync. + try await Task.sleep(nanoseconds: 1_500_000_000) + + XCTAssertEqual(d1Events[0] as? SetOpInfo, SetOpInfo(path: "$", key: "counter")) + XCTAssertEqual(d1Events[1] as? SetOpInfo, SetOpInfo(path: "$", key: "todos")) + XCTAssertEqual(d1Events[2] as? AddOpInfo, AddOpInfo(path: "$.todos", index: 0)) + XCTAssertEqual(d1Events[3] as? AddOpInfo, AddOpInfo(path: "$.todos", index: 1)) + XCTAssertEqual(d2Events[0] as? AddOpInfo, AddOpInfo(path: "$.todos", index: 0)) + XCTAssertEqual(d2Events[1] as? AddOpInfo, AddOpInfo(path: "$.todos", index: 1)) + + d1Events = [] + d2Events = [] + + try await self.d2.update { root in + (root.counter as? JSONCounter)?.increase(value: 10) + } + + // Wait sync. + try await Task.sleep(nanoseconds: 1_500_000_000) + + XCTAssertEqual(d1Events[0] as? IncreaseOpInfo, IncreaseOpInfo(path: "$.counter", value: 10)) + XCTAssertEqual(d3Events[0] as? IncreaseOpInfo, IncreaseOpInfo(path: "$.counter", value: 10)) + + d1Events = [] + d3Events = [] + + try await self.d2.update { root in + (root.todos as? JSONArray)?.append("todo3") + } + + // Wait sync. + try await Task.sleep(nanoseconds: 1_500_000_000) + + XCTAssertEqual(d1Events[0] as? AddOpInfo, AddOpInfo(path: "$.todos", index: 2)) + XCTAssertEqual(d2Events[0] as? AddOpInfo, AddOpInfo(path: "$.todos", index: 2)) + + d1Events = [] + d2Events = [] + + await self.d1.unsubscribe(targetPath: "$.todos") + + try await self.d2.update { root in + (root.todos as? JSONArray)?.append("todo4") + } + + // Wait sync. + try await Task.sleep(nanoseconds: 1_500_000_000) + + XCTAssertEqual(d1Events[0] as? AddOpInfo, AddOpInfo(path: "$.todos", index: 3)) + XCTAssertTrue(d2Events.isEmpty) + + d1Events = [] + + await self.d1.unsubscribe(targetPath: "$.counter") + + try await self.d2.update { root in + (root.counter as? JSONCounter)?.increase(value: 10) + } + + // Wait sync. + try await Task.sleep(nanoseconds: 1_500_000_000) + + XCTAssertEqual(d1Events[0] as? IncreaseOpInfo, IncreaseOpInfo(path: "$.counter", value: 10)) + XCTAssertTrue(d3Events.isEmpty) + + await self.d1.unsubscribe() + + try await self.c1.detach(self.d1) + try await self.c2.detach(self.d2) + + try await self.c1.deactivate() + try await self.c2.deactivate() + } } diff --git a/Tests/Unit/Document/DocumentTests.swift b/Tests/Unit/Document/DocumentTests.swift index 445381dc..1c7330e4 100644 --- a/Tests/Unit/Document/DocumentTests.swift +++ b/Tests/Unit/Document/DocumentTests.swift @@ -38,7 +38,7 @@ class DocumentTests: XCTestCase { func test_can_input_nil() async throws { let target = Document(key: "test-doc") try await target.update { root in - root.data = ["": nil, "null": nil] + root.data = ["": nil, "null": nil] as [String: Any?] } let result = await target.toSortedJSON() @@ -734,7 +734,7 @@ class DocumentTests: XCTestCase { } try await target.update { root in - root[""] = [:] + root[""] = [:] as [String: Any] XCTAssertEqual(root.debugDescription, """ @@ -742,7 +742,7 @@ class DocumentTests: XCTestCase { """) let emptyKey = root[""] as? JSONObject - emptyKey!.obj = [:] + emptyKey!.obj = [:] as [String: Any] XCTAssertEqual(root.debugDescription, """ @@ -819,12 +819,12 @@ class DocumentTests: XCTestCase { } try await target.update { root in - root.arr = [] + root.arr = [] as [String] let arr = root.arr as? JSONArray arr?.append(Int64(0)) arr?.append(Int64(1)) arr?.remove(index: 1) - root["$$...hello"] = [] + root["$$...hello"] = [] as [String] let hello = root["$$...hello"] as? JSONArray hello?.append(Int64(0)) diff --git a/Tests/Unit/Document/JSONObjectTests.swift b/Tests/Unit/Document/JSONObjectTests.swift index e6c392c1..b0a1b1c2 100644 --- a/Tests/Unit/Document/JSONObjectTests.swift +++ b/Tests/Unit/Document/JSONObjectTests.swift @@ -95,7 +95,7 @@ class JSONObjectTests: XCTestCase { "long": Int64(9_999_999), "double": Double(1.2222222), "string": "abc", - "compB": ["id": "b", "compC": ["id": "c", "compD": ["id": "d-1"]]]]) + "compB": ["id": "b", "compC": ["id": "c", "compD": ["id": "d-1"]] as [String: Any]] as [String: Any]] as [String: Any]) XCTAssertEqual(root.debugDescription, """ @@ -126,7 +126,7 @@ class JSONObjectTests: XCTestCase { "long": Int64(9_999_999), "double": Double(1.2222222), "string": "abc", - "compB": ["id": "b", "compC": ["id": "c", "compD": ["id": "d-1"]]]]) + "compB": ["id": "b", "compC": ["id": "c", "compD": ["id": "d-1"]] as [String: Any]] as [String: Any]] as [String: Any]) XCTAssertEqual(root.debugDescription, """ diff --git a/Tests/Unit/Document/JSONTextTest.swift b/Tests/Unit/Document/JSONTextTest.swift index 904462c2..f2385783 100644 --- a/Tests/Unit/Document/JSONTextTest.swift +++ b/Tests/Unit/Document/JSONTextTest.swift @@ -114,7 +114,7 @@ final class JSONTextTest: XCTestCase { try await doc.update { root in root.text = JSONText() } await doc.subscribe(targetPath: "$.text") { - view.applyChanges(operations: ($0 as! ChangeEventable).value.operations) + view.applyChanges(operations: ($0 as! ChangeEvent).value.operations) } let commands: [(from: Int, to: Int, content: String)] = [ @@ -141,7 +141,7 @@ final class JSONTextTest: XCTestCase { try await doc.update { root in root.text = JSONText() } await doc.subscribe(targetPath: "$.text") { - view.applyChanges(operations: ($0 as! ChangeEventable).value.operations) + view.applyChanges(operations: ($0 as! ChangeEvent).value.operations) } let commands: [(from: Int, to: Int, content: String)] = [ @@ -176,7 +176,7 @@ final class JSONTextTest: XCTestCase { try await doc.update { root in root.text = JSONText() } await doc.subscribe(targetPath: "$.text") { - view.applyChanges(operations: ($0 as! ChangeEventable).value.operations) + view.applyChanges(operations: ($0 as! ChangeEvent).value.operations) } let commands: [(from: Int, to: Int, content: String)] = [ @@ -210,7 +210,7 @@ final class JSONTextTest: XCTestCase { } await doc.subscribe(targetPath: "$.text") { event in - XCTAssertEqual((event as! ChangeEventable).value.operations[0] as! SelectOpInfo, SelectOpInfo(path: "$.text", from: 2, to: 4)) + XCTAssertEqual((event as! ChangeEvent).value.operations[0] as! SelectOpInfo, SelectOpInfo(path: "$.text", from: 2, to: 4)) } try await doc.update { root in (root.text as? JSONText)?.select(2, 4) }