Skip to content

Commit

Permalink
Remove Client eventStream (#164)
Browse files Browse the repository at this point in the history
  • Loading branch information
humdrum authored May 17, 2024
1 parent bacbf9c commit cde554c
Show file tree
Hide file tree
Showing 7 changed files with 214 additions and 210 deletions.
52 changes: 10 additions & 42 deletions Sources/Core/Client.swift
Original file line number Diff line number Diff line change
Expand Up @@ -38,20 +38,6 @@ public enum ClientStatus: String {
case activated
}

/**
* `StreamConnectionStatus` is stream connection status types
*/
enum StreamConnectionStatus {
/**
* stream connected
*/
case connected
/**
* stream disconnected
*/
case disconnected
}

/**
* `SyncMode` defines synchronization modes for the PushPullChanges API.
*/
Expand Down Expand Up @@ -193,7 +179,6 @@ public actor Client {
public nonisolated let key: String
public var isActive: Bool { self.status == .activated }
public private(set) var status: ClientStatus
public nonisolated let eventStream: PassthroughSubject<BaseClientEvent, Never>

/**
* @param rpcAddr - the address of the RPC server.
Expand Down Expand Up @@ -226,7 +211,6 @@ public actor Client {
let authInterceptors = AuthClientInterceptors(apiKey: options.apiKey, token: options.token)

self.rpcClient = YorkieServiceAsyncClient(channel: channel, interceptors: authInterceptors)
self.eventStream = PassthroughSubject()
}

/**
Expand Down Expand Up @@ -268,9 +252,6 @@ public actor Client {
self.status = .activated
await self.runSyncLoop()

let changeEvent = StatusChangedEvent(value: self.status)
self.eventStream.send(changeEvent)

Logger.debug("Client(\(self.key)) activated")
} catch {
Logger.error("Failed to request activate client(\(self.key)).", error: error)
Expand Down Expand Up @@ -304,9 +285,6 @@ public actor Client {

self.status = .deactivated

let changeEvent = StatusChangedEvent(value: self.status)
self.eventStream.send(changeEvent)

Logger.info("Client(\(self.key) deactivated.")
}

Expand Down Expand Up @@ -353,7 +331,7 @@ public actor Client {
throw YorkieError.documentRemoved(message: "\(doc) is removed.")
}

await doc.setStatus(.attached)
await doc.applyStatus(.attached)

self.attachmentMap[doc.getKey()] = Attachment(doc: doc, docID: result.documentID, syncMode: syncMode, remoteChangeEventReceived: false)

Expand Down Expand Up @@ -412,7 +390,7 @@ public actor Client {
try await doc.applyChangePack(pack)

if await doc.status != .removed {
await doc.setStatus(.detached)
await doc.applyStatus(.detached)
}

try self.stopWatchLoop(doc.getKey())
Expand Down Expand Up @@ -533,9 +511,6 @@ public actor Client {
do {
return try await self.performSyncInternal(false, attachment)
} catch {
let event = DocumentSyncedEvent(value: .syncFailed)
self.eventStream.send(event)

throw error
}
}
Expand Down Expand Up @@ -603,11 +578,6 @@ public actor Client {

self.setSyncTimer(false)
} catch {
Logger.error("[SL] c:\"\(self.key)\" sync failed: \(error)")

let event = DocumentSyncedEvent(value: .syncFailed)
self.eventStream.send(event)

self.setSyncTimer(true)
}
}
Expand Down Expand Up @@ -639,9 +609,6 @@ public actor Client {
self.changeDocKeyOfAuthInterceptors(docKey)
self.attachmentMap[docKey]?.remoteWatchStream = self.rpcClient.makeWatchDocumentCall(request)

let event = StreamConnectionStatusChangedEvent(value: .connected)
self.eventStream.send(event)

Task {
if let stream = self.attachmentMap[docKey]?.remoteWatchStream?.responseStream {
do {
Expand All @@ -658,6 +625,8 @@ public actor Client {
}
}
}

await self.attachmentMap[docKey]?.doc.publishConnectionEvent(.connected)
}
}

Expand Down Expand Up @@ -710,9 +679,6 @@ public actor Client {
switch pbWatchEvent.type {
case .documentChanged:
self.attachmentMap[docKey]?.remoteChangeEventReceived = true

let event = DocumentChangedEvent(value: [docKey])
self.eventStream.send(event)
case .documentWatched:
await self.attachmentMap[docKey]?.doc.addOnlineClient(publisher)
// NOTE(chacha912): We added to onlineClients, but we won't trigger watched event
Expand Down Expand Up @@ -753,8 +719,9 @@ public actor Client {

Logger.debug("[WD] c:\"\(self.key)\" unwatches")

let event = StreamConnectionStatusChangedEvent(value: .disconnected)
self.eventStream.send(event)
Task {
await self.attachmentMap[docKey]?.doc.publishConnectionEvent(.disconnected)
}
}

private func onStreamDisconnect(_ docKey: DocumentKey) throws {
Expand Down Expand Up @@ -808,14 +775,15 @@ public actor Client {
self.attachmentMap.removeValue(forKey: docKey)
}

let event = DocumentSyncedEvent(value: .synced)
self.eventStream.send(event)
await doc.publishSyncEvent(.synced)

let remoteSize = responsePack.getChangeSize()
Logger.info("[PP] c:\"\(self.key)\" sync d:\"\(docKey)\", push:\(localSize) pull:\(remoteSize) cp:\(responsePack.getCheckpoint().toTestString)")

return doc
} catch {
await doc.publishSyncEvent(.syncFailed)

Logger.error("[PP] c:\"\(self.key)\" err : \(error)")

throw error
Expand Down
118 changes: 0 additions & 118 deletions Sources/Core/ClientEvent.swift

This file was deleted.

85 changes: 85 additions & 0 deletions Sources/Document/DocEvent.swift
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,21 @@ import Foundation
* `DocEventType` is document event types
*/
public enum DocEventType: String {
/**
* status changed event type
*/
case statusChanged = "status-changed"

/**
* `connectionChanged` means that the watch stream connection status has changed.
*/
case connectionChanged = "connection-changed"

/**
* `syncStatusChanged` means that the document sync status has changed.
*/
case syncStatusChanged = "sync-status-changed"

/**
* snapshot event type
*/
Expand Down Expand Up @@ -64,6 +79,76 @@ public protocol DocEvent {
var type: DocEventType { get }
}

public struct StatusInfo {
public let status: DocumentStatus
public let actorID: ActorID?
}

/**
* `StatusChangedEvent` is an event that occurs when
* the client's stream connection state changes.
*/
public struct StatusChangedEvent: DocEvent {
public let type: DocEventType = .statusChanged
/**
* StatusChangedEvent type
*/
var source: OpSource
var value: StatusInfo
}

/**
* `StreamConnectionStatus` is stream connection status types
*/
public enum StreamConnectionStatus {
/**
* stream connected
*/
case connected
/**
* stream disconnected
*/
case disconnected
}

/**
* `ConnectionChangedEvent` is an event that occurs when
* the client's stream connection state changes.
*/
public struct ConnectionChangedEvent: DocEvent {
public let type: DocEventType = .connectionChanged
/**
* ConnectionChanged type
*/
var value: StreamConnectionStatus
}

/**
* `DocumentSyncStatus` is document sync result types
*/
public enum DocumentSyncStatus: String {
/**
* type when Document synced.
*/
case synced
/**
* type when Document sync failed.
*/
case syncFailed = "sync-failed"
}

/**
* `SyncStatusChangedEvent` is an event that occurs when document
* attached to the client are synced.
*/
public struct SyncStatusChangedEvent: DocEvent {
public let type: DocEventType = .syncStatusChanged
/**
* SyncStatusChangedEvent type
*/
var value: DocumentSyncStatus
}

/**
* `SnapshotEvent` is an event that occurs when a snapshot is received from
* the server.
Expand Down
Loading

0 comments on commit cde554c

Please sign in to comment.