diff --git a/Sources/Core/Client.swift b/Sources/Core/Client.swift index f606c799..03c38349 100644 --- a/Sources/Core/Client.swift +++ b/Sources/Core/Client.swift @@ -126,8 +126,9 @@ public struct ClientOptions { * It has documents and sends changes of the documents in local * to the server to synchronize with other replicas in remote. */ -public actor Client { - private var attachmentMap: [DocumentKey: Attachment] +@MainActor +public class Client { + private var attachmentMap = [DocumentKey: Attachment]() private let syncLoopDuration: Int private let reconnectStreamDelay: Int private let maximumAttachmentTimeout: Int @@ -141,17 +142,14 @@ public actor Client { public private(set) var id: ActorID? public nonisolated let key: String public var isActive: Bool { self.status == .activated } - public private(set) var status: ClientStatus + public private(set) var status: ClientStatus = .deactivated /** * @param rpcAddr - the address of the RPC server. * @param opts - the options of the client. */ - public init(_ urlString: String, _ options: ClientOptions = ClientOptions()) { + public nonisolated init(_ urlString: String, _ options: ClientOptions = ClientOptions()) { self.key = options.key ?? UUID().uuidString - - self.status = .deactivated - self.attachmentMap = [String: Attachment]() self.syncLoopDuration = options.syncLoopDuration self.reconnectStreamDelay = options.reconnectStreamDelay self.maximumAttachmentTimeout = options.maximumAttachmentTimeout @@ -169,7 +167,7 @@ public actor Client { * @param url - the url of the RPC server. * @param opts - the options of the client. */ - init?(_ url: URL, _ options: ClientOptions = ClientOptions()) { + convenience init?(_ url: URL, _ options: ClientOptions = ClientOptions()) { self.init(url.absoluteString, options) } @@ -240,18 +238,18 @@ public actor Client { throw YorkieError.unexpected(message: "Invalid client ID! [\(self.id ?? "nil")]") } - guard await doc.status == .detached else { + guard doc.status == .detached else { throw YorkieError.documentNotDetached(message: "\(doc) is not detached.") } - await doc.setActor(clientID) - try await doc.update { _, presence in + doc.setActor(clientID) + try doc.update { _, presence in presence.set(initialPresence) } var attachRequest = AttachDocumentRequest() attachRequest.clientID = clientID - attachRequest.changePack = Converter.toChangePack(pack: await doc.createChangePack()) + attachRequest.changePack = Converter.toChangePack(pack: doc.createChangePack()) do { let docKey = doc.getKey() @@ -266,18 +264,18 @@ public actor Client { } let pack = try Converter.fromChangePack(message.changePack) - try await doc.applyChangePack(pack) + try doc.applyChangePack(pack) - if await doc.status == .removed { + if doc.status == .removed { throw YorkieError.documentRemoved(message: "\(doc) is removed.") } - await doc.applyStatus(.attached) + doc.applyStatus(.attached) self.attachmentMap[doc.getKey()] = Attachment(doc: doc, docID: message.documentID, syncMode: syncMode, remoteChangeEventReceived: false) if syncMode != .manual { - try await self.runWatchLoop(docKey) + try self.runWatchLoop(docKey) try await self.waitForInitialization(semaphore, docKey) } @@ -314,14 +312,14 @@ public actor Client { throw YorkieError.documentNotAttached(message: "\(doc.getKey()) is not attached when \(#function).") } - try await doc.update { _, presence in + try doc.update { _, presence in presence.clear() } var detachDocumentRequest = DetachDocumentRequest() detachDocumentRequest.clientID = clientID detachDocumentRequest.documentID = attachment.docID - detachDocumentRequest.changePack = Converter.toChangePack(pack: await doc.createChangePack()) + detachDocumentRequest.changePack = Converter.toChangePack(pack: doc.createChangePack()) do { let detachDocumentResponse = await self.rpcClient.detachDocument(request: detachDocumentRequest, headers: self.authHeader.makeHeader(doc.getKey())) @@ -332,10 +330,10 @@ public actor Client { let pack = try Converter.fromChangePack(message.changePack) - try await doc.applyChangePack(pack) + try doc.applyChangePack(pack) - if await doc.status != .removed { - await doc.applyStatus(.detached) + if doc.status != .removed { + doc.applyStatus(.detached) } try self.stopWatchLoop(doc.getKey()) @@ -371,7 +369,7 @@ public actor Client { var removeDocumentRequest = RemoveDocumentRequest() removeDocumentRequest.clientID = clientID removeDocumentRequest.documentID = attachment.docID - removeDocumentRequest.changePack = Converter.toChangePack(pack: await doc.createChangePack(true)) + removeDocumentRequest.changePack = Converter.toChangePack(pack: doc.createChangePack(true)) do { let removeDocumentResponse = await self.rpcClient.removeDocument(request: removeDocumentRequest, headers: self.authHeader.makeHeader(doc.getKey())) @@ -381,7 +379,7 @@ public actor Client { } let pack = try Converter.fromChangePack(message.changePack) - try await doc.applyChangePack(pack) + try doc.applyChangePack(pack) try self.stopWatchLoop(doc.getKey()) @@ -400,7 +398,7 @@ public actor Client { * `changeSyncMode` changes the synchronization mode of the given document. */ @discardableResult - public func changeSyncMode(_ doc: Document, _ syncMode: SyncMode) async throws -> Document { + public func changeSyncMode(_ doc: Document, _ syncMode: SyncMode) throws -> Document { let docKey = doc.getKey() guard let attachment = self.attachmentMap[docKey] else { @@ -430,7 +428,7 @@ public actor Client { // manual to realtime if prevSyncMode == .manual { - try await self.runWatchLoop(docKey) + try self.runWatchLoop(docKey) } return doc @@ -535,7 +533,7 @@ public actor Client { await self.doSyncLoop() } - private func doWatchLoop(_ docKey: DocumentKey) async throws { + private func doWatchLoop(_ docKey: DocumentKey) throws { self.attachmentMap[docKey]?.resetWatchLoopTimer() guard self.isActive, let id = self.id else { @@ -582,13 +580,13 @@ public actor Client { self.attachmentMap[docKey]?.connectStream(stream) - await self.attachmentMap[docKey]?.doc.publishConnectionEvent(.connected) + self.attachmentMap[docKey]?.doc.publishConnectionEvent(.connected) } - private func runWatchLoop(_ docKey: DocumentKey) async throws { + private func runWatchLoop(_ docKey: DocumentKey) throws { Logger.debug("[WL] c:\"\(self.key)\" run watch loop") - try await self.doWatchLoop(docKey) + try self.doWatchLoop(docKey) } private func stopWatchLoop(_ docKey: DocumentKey) throws { @@ -620,7 +618,7 @@ public actor Client { switch body { case .initialization(let initialization): var onlineClients = Set() - let actorID = await self.attachmentMap[docKey]?.doc.actorID + let actorID = self.attachmentMap[docKey]?.doc.actorID for pbClientID in initialization.clientIds.filter({ $0 != actorID }) { onlineClients.insert(pbClientID) @@ -628,8 +626,8 @@ public actor Client { self.semaphoresForInitialzation[docKey]?.signal() - await self.attachmentMap[docKey]?.doc.setOnlineClients(onlineClients) - await self.attachmentMap[docKey]?.doc.publishPresenceEvent(.initialized) + self.attachmentMap[docKey]?.doc.setOnlineClients(onlineClients) + self.attachmentMap[docKey]?.doc.publishPresenceEvent(.initialized) case .event(let pbWatchEvent): let publisher = pbWatchEvent.publisher @@ -637,21 +635,21 @@ public actor Client { case .documentChanged: self.attachmentMap[docKey]?.remoteChangeEventReceived = true case .documentWatched: - await self.attachmentMap[docKey]?.doc.addOnlineClient(publisher) + self.attachmentMap[docKey]?.doc.addOnlineClient(publisher) // NOTE(chacha912): We added to onlineClients, but we won't trigger watched event // unless we also know their initial presence data at this point. - if let presence = await self.attachmentMap[docKey]?.doc.getPresence(publisher) { - await self.attachmentMap[docKey]?.doc.publishPresenceEvent(.watched, publisher, presence) + if let presence = self.attachmentMap[docKey]?.doc.getPresence(publisher) { + self.attachmentMap[docKey]?.doc.publishPresenceEvent(.watched, publisher, presence) } case .documentUnwatched: // NOTE(chacha912): There is no presence, when PresenceChange(clear) is applied before unwatching. // In that case, the 'unwatched' event is triggered while handling the PresenceChange. - let presence = await self.attachmentMap[docKey]?.doc.getPresence(publisher) + let presence = self.attachmentMap[docKey]?.doc.getPresence(publisher) - await self.attachmentMap[docKey]?.doc.removeOnlineClient(publisher) + self.attachmentMap[docKey]?.doc.removeOnlineClient(publisher) if let presence { - await self.attachmentMap[docKey]?.doc.publishPresenceEvent(.unwatched, publisher, presence) + self.attachmentMap[docKey]?.doc.publishPresenceEvent(.unwatched, publisher, presence) } default: break @@ -695,7 +693,7 @@ public actor Client { pushPullRequest.clientID = clientID let doc = attachment.doc - let requestPack = await doc.createChangePack() + let requestPack = doc.createChangePack() let localSize = requestPack.getChangeSize() pushPullRequest.changePack = Converter.toChangePack(pack: requestPack) @@ -719,20 +717,20 @@ public actor Client { return doc } - try await doc.applyChangePack(responsePack) + try doc.applyChangePack(responsePack) - if await doc.status == .removed { + if doc.status == .removed { self.attachmentMap.removeValue(forKey: docKey) } - await doc.publishSyncEvent(.synced) + doc.publishSyncEvent(.synced) let remoteSize = responsePack.getChangeSize() Logger.info("[PP] c:\"\(self.key)\" sync d:\"\(docKey)\", push:\(localSize) pull:\(remoteSize) cp:\(responsePack.getCheckpoint().toTestString)") return doc } catch { - await doc.publishSyncEvent(.syncFailed) + doc.publishSyncEvent(.syncFailed) Logger.error("[PP] c:\"\(self.key)\" err : \(error)") diff --git a/Sources/Document/CRDT/CRDTTree.swift b/Sources/Document/CRDT/CRDTTree.swift index 32ddec48..03005e4f 100644 --- a/Sources/Document/CRDT/CRDTTree.swift +++ b/Sources/Document/CRDT/CRDTTree.swift @@ -621,8 +621,7 @@ class CRDTTree: CRDTElement { let allChildren = realParent.innerChildren let index = isLeftMost ? 0 : (allChildren.firstIndex(where: { $0 === leftNode }) ?? -1) + 1 - for index in index ..< allChildren.count { - let next = allChildren[index] + for next in allChildren.suffix(from: index) { if !next.id.createdAt.after(editedAt) { break } @@ -1141,8 +1140,7 @@ class CRDTTree: CRDTElement { // Generate ranges by accumulating consecutive nodes. var start: TreeToken? var end: TreeToken? - for index in 0 ..< candidates.count { - let cur = candidates[index] + for (index, cur) in candidates.enumerated() { let next = candidates[safe: index + 1] if start == nil { start = cur diff --git a/Sources/Document/Document.swift b/Sources/Document/Document.swift index b5391e97..2f4db13f 100644 --- a/Sources/Document/Document.swift +++ b/Sources/Document/Document.swift @@ -16,7 +16,6 @@ import Combine import Foundation -import Semaphore /** * `DocumentOptions` are the options to create a new document. @@ -71,65 +70,49 @@ public enum PresenceSubscriptionType: String { * of the application. And we can edit it even while offline. * */ -public actor Document { - public typealias SubscribeCallback = (DocEvent, isolated Document) -> Void +@MainActor +public class Document { + public typealias SubscribeCallback = @MainActor (DocEvent, Document) -> Void private let key: DocumentKey - private(set) var status: DocumentStatus - private let opts: DocumentOptions - private var changeID: ChangeID - var checkpoint: Checkpoint - private var localChanges: [Change] + private(set) var status: DocumentStatus = .detached + private let disableGC: Bool + private var changeID: ChangeID = .initial + var checkpoint: Checkpoint = .initial + private var localChanges = [Change]() - private var root: CRDTRoot + private var root = CRDTRoot() private var clone: (root: CRDTRoot, presences: [ActorID: StringValueTypeDictionary])? private var defaultSubscribeCallback: SubscribeCallback? - private var subscribeCallbacks: [String: SubscribeCallback] - private var presenceSubscribeCallback: [String: SubscribeCallback] + private var subscribeCallbacks = [String: SubscribeCallback]() + private var presenceSubscribeCallback = [String: SubscribeCallback]() private var connectionSubscribeCallback: SubscribeCallback? private var syncSubscribeCallback: SubscribeCallback? /** * `onlineClients` is a set of client IDs that are currently online. */ - public var onlineClients: Set + public var onlineClients = Set() /** * `presences` is a map of client IDs to their presence information. */ - private var presences: [ActorID: StringValueTypeDictionary] + private var presences = [ActorID: StringValueTypeDictionary]() - private let workSemaphore = AsyncSemaphore(value: 1) - - public init(key: String) { + public convenience nonisolated init(key: String) { self.init(key: key, opts: DocumentOptions(disableGC: false)) } - public init(key: String, opts: DocumentOptions) { + public nonisolated init(key: DocumentKey, opts: DocumentOptions) { self.key = key - self.status = .detached - self.opts = opts - self.root = CRDTRoot() - self.changeID = ChangeID.initial - self.checkpoint = Checkpoint.initial - self.localChanges = [] - self.subscribeCallbacks = [:] - self.presenceSubscribeCallback = [:] - self.onlineClients = Set() - self.presences = [:] + self.disableGC = opts.disableGC } /** * `update` executes the given updater to update this document. */ - public func update(_ updater: (_ root: JSONObject, _ presence: inout Presence) async throws -> Void, _ message: String? = nil) async throws { - await self.workSemaphore.wait() - - defer { - self.workSemaphore.signal() - } - + public func update(_ updater: (_ root: JSONObject, _ presence: inout Presence) throws -> Void, _ message: String? = nil) throws { guard self.status != .removed else { throw YorkieError.documentRemoved(message: "\(self) is removed.") } @@ -150,7 +133,7 @@ public actor Document { var presence = Presence(changeContext: context, presence: self.clone?.presences[actorID] ?? [:]) - try await updater(proxy, &presence) + try updater(proxy, &presence) self.clone?.presences[actorID] = presence.presence @@ -259,13 +242,7 @@ public actor Document { * * - Parameter pack: change pack */ - func applyChangePack(_ pack: ChangePack) async throws { - await self.workSemaphore.wait() - - defer { - self.workSemaphore.signal() - } - + func applyChangePack(_ pack: ChangePack) throws { if let snapshot = pack.getSnapshot() { try self.applySnapshot(pack.getCheckpoint().getServerSeq(), snapshot) } else if pack.hasChanges() { @@ -387,7 +364,7 @@ public actor Document { */ @discardableResult func garbageCollect(_ ticket: TimeTicket) -> Int { - if self.opts.disableGC { + if self.disableGC { return 0 } diff --git a/Sources/Util/IndexTree.swift b/Sources/Util/IndexTree.swift index c13b6907..83421db3 100644 --- a/Sources/Util/IndexTree.swift +++ b/Sources/Util/IndexTree.swift @@ -93,9 +93,7 @@ func addSizeOfLeftSiblings(parent: T, offset: Int) -> Int { let siblings = parent.children - for index in 0 ..< offset { - let leftSibling = siblings[index] - + for leftSibling in siblings.prefix(upTo: offset) { if leftSibling.isRemoved { continue } @@ -750,9 +748,7 @@ func findTextPos(node: T, pathElement: Int) throws -> TreePos< throw YorkieError.unexpected(message: "unacceptable path") } - for index in 0 ..< node.children.count { - let child = node.children[index] - + for child in node.children { if child.size < pathElement { pathElement -= child.size } else { @@ -859,14 +855,11 @@ class IndexTree { } var node = self.root - for index in 0 ..< path.count - 1 { - let pathElement = path[index] - - if node.children[safe: pathElement] == nil { + for pathElement in path.dropLast() { + guard let child = node.children[safe: pathElement] else { throw YorkieError.unexpected(message: "unacceptable path") } - - node = node.children[pathElement] + node = child } if node.hasTextChild { diff --git a/Tests/Unit/Document/DocumentTests.swift b/Tests/Unit/Document/DocumentTests.swift index 40586210..96cc0c8a 100644 --- a/Tests/Unit/Document/DocumentTests.swift +++ b/Tests/Unit/Document/DocumentTests.swift @@ -738,11 +738,9 @@ class DocumentTests: XCTestCase { } await target.subscribe("$.") { _, _ in - Task { - let array = try? await target.getValueByPath("$.") as? JSONObject + let array = try? target.getValueByPath("$.") as? JSONObject - XCTAssertTrue(array != nil) - } + XCTAssertTrue(array != nil) } try await target.update { root, _ in