Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Change Client, Document actor to class #187

Merged
merged 2 commits into from
Jul 17, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
84 changes: 41 additions & 43 deletions Sources/Core/Client.swift
Original file line number Diff line number Diff line change
Expand Up @@ -126,8 +126,9 @@ public struct ClientOptions {
* It has documents and sends changes of the documents in local
* to the server to synchronize with other replicas in remote.
*/
public actor Client {
private var attachmentMap: [DocumentKey: Attachment]
@MainActor
public class Client {
private var attachmentMap = [DocumentKey: Attachment]()
private let syncLoopDuration: Int
private let reconnectStreamDelay: Int
private let maximumAttachmentTimeout: Int
Expand All @@ -141,17 +142,14 @@ public actor Client {
public private(set) var id: ActorID?
public nonisolated let key: String
public var isActive: Bool { self.status == .activated }
public private(set) var status: ClientStatus
public private(set) var status: ClientStatus = .deactivated

/**
* @param rpcAddr - the address of the RPC server.
* @param opts - the options of the client.
*/
public init(_ urlString: String, _ options: ClientOptions = ClientOptions()) {
public nonisolated init(_ urlString: String, _ options: ClientOptions = ClientOptions()) {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Marking initializer as nonisolated.

Since Client is no longer an actor, marking the initializer as nonisolated allows it to be called from any thread context, which is necessary for synchronous operations but requires careful management to avoid race conditions.

Consider ensuring that all accesses to mutable state within Client are thread-safe, possibly using synchronization mechanisms like locks or serial queues.

self.key = options.key ?? UUID().uuidString

self.status = .deactivated
self.attachmentMap = [String: Attachment]()
self.syncLoopDuration = options.syncLoopDuration
self.reconnectStreamDelay = options.reconnectStreamDelay
self.maximumAttachmentTimeout = options.maximumAttachmentTimeout
Expand All @@ -169,7 +167,7 @@ public actor Client {
* @param url - the url of the RPC server.
* @param opts - the options of the client.
*/
init?(_ url: URL, _ options: ClientOptions = ClientOptions()) {
convenience init?(_ url: URL, _ options: ClientOptions = ClientOptions()) {
self.init(url.absoluteString, options)
}

Expand Down Expand Up @@ -240,18 +238,18 @@ public actor Client {
throw YorkieError.unexpected(message: "Invalid client ID! [\(self.id ?? "nil")]")
}

guard await doc.status == .detached else {
guard doc.status == .detached else {
throw YorkieError.documentNotDetached(message: "\(doc) is not detached.")
}

await doc.setActor(clientID)
try await doc.update { _, presence in
doc.setActor(clientID)
try doc.update { _, presence in
presence.set(initialPresence)
}

var attachRequest = AttachDocumentRequest()
attachRequest.clientID = clientID
attachRequest.changePack = Converter.toChangePack(pack: await doc.createChangePack())
attachRequest.changePack = Converter.toChangePack(pack: doc.createChangePack())

do {
let docKey = doc.getKey()
Expand All @@ -266,18 +264,18 @@ public actor Client {
}

let pack = try Converter.fromChangePack(message.changePack)
try await doc.applyChangePack(pack)
try doc.applyChangePack(pack)

if await doc.status == .removed {
if doc.status == .removed {
throw YorkieError.documentRemoved(message: "\(doc) is removed.")
}

await doc.applyStatus(.attached)
doc.applyStatus(.attached)

self.attachmentMap[doc.getKey()] = Attachment(doc: doc, docID: message.documentID, syncMode: syncMode, remoteChangeEventReceived: false)

if syncMode != .manual {
try await self.runWatchLoop(docKey)
try self.runWatchLoop(docKey)
try await self.waitForInitialization(semaphore, docKey)
}

Expand Down Expand Up @@ -314,14 +312,14 @@ public actor Client {
throw YorkieError.documentNotAttached(message: "\(doc.getKey()) is not attached when \(#function).")
}

try await doc.update { _, presence in
try doc.update { _, presence in
presence.clear()
}

var detachDocumentRequest = DetachDocumentRequest()
detachDocumentRequest.clientID = clientID
detachDocumentRequest.documentID = attachment.docID
detachDocumentRequest.changePack = Converter.toChangePack(pack: await doc.createChangePack())
detachDocumentRequest.changePack = Converter.toChangePack(pack: doc.createChangePack())

do {
let detachDocumentResponse = await self.rpcClient.detachDocument(request: detachDocumentRequest, headers: self.authHeader.makeHeader(doc.getKey()))
Expand All @@ -332,10 +330,10 @@ public actor Client {

let pack = try Converter.fromChangePack(message.changePack)

try await doc.applyChangePack(pack)
try doc.applyChangePack(pack)

if await doc.status != .removed {
await doc.applyStatus(.detached)
if doc.status != .removed {
doc.applyStatus(.detached)
}

try self.stopWatchLoop(doc.getKey())
Expand Down Expand Up @@ -371,7 +369,7 @@ public actor Client {
var removeDocumentRequest = RemoveDocumentRequest()
removeDocumentRequest.clientID = clientID
removeDocumentRequest.documentID = attachment.docID
removeDocumentRequest.changePack = Converter.toChangePack(pack: await doc.createChangePack(true))
removeDocumentRequest.changePack = Converter.toChangePack(pack: doc.createChangePack(true))

do {
let removeDocumentResponse = await self.rpcClient.removeDocument(request: removeDocumentRequest, headers: self.authHeader.makeHeader(doc.getKey()))
Expand All @@ -381,7 +379,7 @@ public actor Client {
}

let pack = try Converter.fromChangePack(message.changePack)
try await doc.applyChangePack(pack)
try doc.applyChangePack(pack)

try self.stopWatchLoop(doc.getKey())

Expand All @@ -400,7 +398,7 @@ public actor Client {
* `changeSyncMode` changes the synchronization mode of the given document.
*/
@discardableResult
public func changeSyncMode(_ doc: Document, _ syncMode: SyncMode) async throws -> Document {
public func changeSyncMode(_ doc: Document, _ syncMode: SyncMode) throws -> Document {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Modification of synchronization mode handling.

The removal of asynchronous handling in changeSyncMode and adjustments to the synchronization logic are significant. It's essential to ensure that these changes do not introduce any unintended behaviors or race conditions, especially since synchronization settings are critical for the correct operation of the client.

Verify that the new synchronization logic correctly handles all edge cases and that there are no race conditions introduced by these changes.

Also applies to: 431-431

let docKey = doc.getKey()

guard let attachment = self.attachmentMap[docKey] else {
Expand Down Expand Up @@ -430,7 +428,7 @@ public actor Client {

// manual to realtime
if prevSyncMode == .manual {
try await self.runWatchLoop(docKey)
try self.runWatchLoop(docKey)
}

return doc
Expand Down Expand Up @@ -535,7 +533,7 @@ public actor Client {
await self.doSyncLoop()
}

private func doWatchLoop(_ docKey: DocumentKey) async throws {
private func doWatchLoop(_ docKey: DocumentKey) throws {
self.attachmentMap[docKey]?.resetWatchLoopTimer()

guard self.isActive, let id = self.id else {
Expand Down Expand Up @@ -582,13 +580,13 @@ public actor Client {

self.attachmentMap[docKey]?.connectStream(stream)

await self.attachmentMap[docKey]?.doc.publishConnectionEvent(.connected)
self.attachmentMap[docKey]?.doc.publishConnectionEvent(.connected)
}

private func runWatchLoop(_ docKey: DocumentKey) async throws {
private func runWatchLoop(_ docKey: DocumentKey) throws {
Logger.debug("[WL] c:\"\(self.key)\" run watch loop")

try await self.doWatchLoop(docKey)
try self.doWatchLoop(docKey)
}

private func stopWatchLoop(_ docKey: DocumentKey) throws {
Expand Down Expand Up @@ -620,38 +618,38 @@ public actor Client {
switch body {
case .initialization(let initialization):
var onlineClients = Set<ActorID>()
let actorID = await self.attachmentMap[docKey]?.doc.actorID
let actorID = self.attachmentMap[docKey]?.doc.actorID

for pbClientID in initialization.clientIds.filter({ $0 != actorID }) {
onlineClients.insert(pbClientID)
}

self.semaphoresForInitialzation[docKey]?.signal()

await self.attachmentMap[docKey]?.doc.setOnlineClients(onlineClients)
await self.attachmentMap[docKey]?.doc.publishPresenceEvent(.initialized)
self.attachmentMap[docKey]?.doc.setOnlineClients(onlineClients)
self.attachmentMap[docKey]?.doc.publishPresenceEvent(.initialized)
case .event(let pbWatchEvent):
let publisher = pbWatchEvent.publisher

switch pbWatchEvent.type {
case .documentChanged:
self.attachmentMap[docKey]?.remoteChangeEventReceived = true
case .documentWatched:
await self.attachmentMap[docKey]?.doc.addOnlineClient(publisher)
self.attachmentMap[docKey]?.doc.addOnlineClient(publisher)
// NOTE(chacha912): We added to onlineClients, but we won't trigger watched event
// unless we also know their initial presence data at this point.
if let presence = await self.attachmentMap[docKey]?.doc.getPresence(publisher) {
await self.attachmentMap[docKey]?.doc.publishPresenceEvent(.watched, publisher, presence)
if let presence = self.attachmentMap[docKey]?.doc.getPresence(publisher) {
self.attachmentMap[docKey]?.doc.publishPresenceEvent(.watched, publisher, presence)
}
case .documentUnwatched:
// NOTE(chacha912): There is no presence, when PresenceChange(clear) is applied before unwatching.
// In that case, the 'unwatched' event is triggered while handling the PresenceChange.
let presence = await self.attachmentMap[docKey]?.doc.getPresence(publisher)
let presence = self.attachmentMap[docKey]?.doc.getPresence(publisher)

await self.attachmentMap[docKey]?.doc.removeOnlineClient(publisher)
self.attachmentMap[docKey]?.doc.removeOnlineClient(publisher)

if let presence {
await self.attachmentMap[docKey]?.doc.publishPresenceEvent(.unwatched, publisher, presence)
self.attachmentMap[docKey]?.doc.publishPresenceEvent(.unwatched, publisher, presence)
}
default:
break
Expand Down Expand Up @@ -695,7 +693,7 @@ public actor Client {
pushPullRequest.clientID = clientID

let doc = attachment.doc
let requestPack = await doc.createChangePack()
let requestPack = doc.createChangePack()
let localSize = requestPack.getChangeSize()

pushPullRequest.changePack = Converter.toChangePack(pack: requestPack)
Expand All @@ -719,20 +717,20 @@ public actor Client {
return doc
}

try await doc.applyChangePack(responsePack)
try doc.applyChangePack(responsePack)

if await doc.status == .removed {
if doc.status == .removed {
self.attachmentMap.removeValue(forKey: docKey)
}

await doc.publishSyncEvent(.synced)
doc.publishSyncEvent(.synced)

let remoteSize = responsePack.getChangeSize()
Logger.info("[PP] c:\"\(self.key)\" sync d:\"\(docKey)\", push:\(localSize) pull:\(remoteSize) cp:\(responsePack.getCheckpoint().toTestString)")

return doc
} catch {
await doc.publishSyncEvent(.syncFailed)
doc.publishSyncEvent(.syncFailed)

Logger.error("[PP] c:\"\(self.key)\" err : \(error)")

Expand Down
6 changes: 2 additions & 4 deletions Sources/Document/CRDT/CRDTTree.swift
Original file line number Diff line number Diff line change
Expand Up @@ -621,8 +621,7 @@ class CRDTTree: CRDTElement {
let allChildren = realParent.innerChildren
let index = isLeftMost ? 0 : (allChildren.firstIndex(where: { $0 === leftNode }) ?? -1) + 1

for index in index ..< allChildren.count {
let next = allChildren[index]
for next in allChildren.suffix(from: index) {
if !next.id.createdAt.after(editedAt) {
break
}
Expand Down Expand Up @@ -1141,8 +1140,7 @@ class CRDTTree: CRDTElement {
// Generate ranges by accumulating consecutive nodes.
var start: TreeToken<CRDTTreeNode>?
var end: TreeToken<CRDTTreeNode>?
for index in 0 ..< candidates.count {
let cur = candidates[index]
for (index, cur) in candidates.enumerated() {
let next = candidates[safe: index + 1]
if start == nil {
start = cur
Expand Down
Loading