Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[ECO-5116] Switch to V2 Chat API #122

Merged
merged 1 commit into from
Nov 26, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion AblyChat.xcworkspace/xcshareddata/swiftpm/Package.resolved
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@
"location" : "https://github.com/ably/ably-cocoa",
"state" : {
"branch" : "main",
"revision" : "f7bff4b1c941b4c7b952b9224a33674e2302e19f"
"revision" : "5a9b7ba3395ce24a002d552edcb395263fba961d"
}
},
{
Expand Down
4 changes: 2 additions & 2 deletions Example/AblyChatExample/ContentView.swift
Original file line number Diff line number Diff line change
Expand Up @@ -184,15 +184,15 @@ struct ContentView: View {

for message in previousMessages.items {
withAnimation {
messages.append(BasicListItem(id: message.timeserial, title: message.clientID, text: message.text))
messages.append(BasicListItem(id: message.serial, title: message.clientID, text: message.text))
}
}

// Continue listening for messages on a background task so this function can return
Task {
for await message in messagesSubscription {
withAnimation {
messages.insert(BasicListItem(id: message.timeserial, title: message.clientID, text: message.text), at: 0)
messages.insert(BasicListItem(id: message.serial, title: message.clientID, text: message.text), at: 0)
}
}
}
Expand Down
3 changes: 2 additions & 1 deletion Example/AblyChatExample/Mocks/Misc.swift
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,8 @@ final class MockMessagesPaginatedResult: PaginatedResult {
var items: [T] {
Array(repeating: 0, count: numberOfMockMessages).map { _ in
Message(
timeserial: "\(Date().timeIntervalSince1970)",
serial: "\(Date().timeIntervalSince1970)",
maratal marked this conversation as resolved.
Show resolved Hide resolved
latestAction: .create,
clientID: self.clientID,
roomID: self.roomID,
text: MockStrings.randomPhrase(),
Expand Down
6 changes: 4 additions & 2 deletions Example/AblyChatExample/Mocks/MockClients.swift
Original file line number Diff line number Diff line change
Expand Up @@ -103,7 +103,8 @@ actor MockMessages: Messages {
private func createSubscription() -> MockSubscription<Message> {
let subscription = MockSubscription<Message>(randomElement: {
Message(
timeserial: "\(Date().timeIntervalSince1970)",
serial: "\(Date().timeIntervalSince1970)",
latestAction: .create,
clientID: MockStrings.names.randomElement()!,
roomID: self.roomID,
text: MockStrings.randomPhrase(),
Expand All @@ -128,7 +129,8 @@ actor MockMessages: Messages {

func send(params: SendMessageParams) async throws -> Message {
let message = Message(
timeserial: "\(Date().timeIntervalSince1970)",
serial: "\(Date().timeIntervalSince1970)",
latestAction: .create,
clientID: clientID,
roomID: roomID,
text: params.text,
Expand Down
2 changes: 1 addition & 1 deletion Package.resolved
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@
"location" : "https://github.com/ably/ably-cocoa",
"state" : {
"branch" : "main",
"revision" : "f7bff4b1c941b4c7b952b9224a33674e2302e19f"
"revision" : "5a9b7ba3395ce24a002d552edcb395263fba961d"
}
},
{
Expand Down
10 changes: 6 additions & 4 deletions Sources/AblyChat/ChatAPI.swift
Original file line number Diff line number Diff line change
Expand Up @@ -3,19 +3,20 @@ import Ably
internal final class ChatAPI: Sendable {
private let realtime: RealtimeClient
private let apiVersion = "/chat/v1"
private let apiVersionV2 = "/chat/v2" // TODO: remove v1 after full transition to v2
umair-ably marked this conversation as resolved.
Show resolved Hide resolved
maratal marked this conversation as resolved.
Show resolved Hide resolved

public init(realtime: RealtimeClient) {
self.realtime = realtime
}

// (CHA-M6) Messages should be queryable from a paginated REST API.
internal func getMessages(roomId: String, params: QueryOptions) async throws -> any PaginatedResult<Message> {
let endpoint = "\(apiVersion)/rooms/\(roomId)/messages"
let endpoint = "\(apiVersionV2)/rooms/\(roomId)/messages"
return try await makePaginatedRequest(endpoint, params: params.asQueryItems())
}

internal struct SendMessageResponse: Codable {
internal let timeserial: String
internal let serial: String
maratal marked this conversation as resolved.
Show resolved Hide resolved
internal let createdAt: Int64
}

Expand All @@ -26,7 +27,7 @@ internal final class ChatAPI: Sendable {
throw ARTErrorInfo.create(withCode: 40000, message: "Ensure your Realtime instance is initialized with a clientId.")
}

let endpoint = "\(apiVersion)/rooms/\(roomId)/messages"
let endpoint = "\(apiVersionV2)/rooms/\(roomId)/messages"
var body: [String: Any] = ["text": params.text]

// (CHA-M3b) A message may be sent without metadata or headers. When these are not specified by the user, they must be omitted from the REST payload.
Expand All @@ -44,7 +45,8 @@ internal final class ChatAPI: Sendable {
let createdAtInSeconds = TimeInterval(Double(response.createdAt) / 1000)

let message = Message(
timeserial: response.timeserial,
serial: response.serial,
latestAction: .create,
clientID: clientId,
roomID: roomId,
text: params.text,
Expand Down
54 changes: 23 additions & 31 deletions Sources/AblyChat/DefaultMessages.swift
Original file line number Diff line number Diff line change
@@ -1,12 +1,9 @@
import Ably

// Typealias for the timeserial used to sync message subscriptions with. This is a string representation of a timestamp.
private typealias TimeserialString = String

// Wraps the MessageSubscription with the timeserial of when the subscription was attached or resumed.
// Wraps the MessageSubscription with the message serial of when the subscription was attached or resumed.
private struct MessageSubscriptionWrapper {
let subscription: MessageSubscription
var timeserial: TimeserialString
var serial: String
}

// TODO: Don't have a strong understanding of why @MainActor is needed here. Revisit as part of https://github.com/ably-labs/ably-chat-swift/issues/83
Expand All @@ -19,7 +16,7 @@ internal final class DefaultMessages: Messages, EmitsDiscontinuities {
private let logger: InternalLogger

// TODO: https://github.com/ably-labs/ably-chat-swift/issues/36 - Handle unsubscribing in line with CHA-M4b
// UUID acts as a unique identifier for each listener/subscription. MessageSubscriptionWrapper houses the subscription and the timeserial of when it was attached or resumed.
// UUID acts as a unique identifier for each listener/subscription. MessageSubscriptionWrapper houses the subscription and the serial of when it was attached or resumed.
private var subscriptionPoints: [UUID: MessageSubscriptionWrapper] = [:]

internal nonisolated init(featureChannel: FeatureChannel, chatAPI: ChatAPI, roomID: String, clientID: String, logger: InternalLogger) async {
Expand All @@ -42,7 +39,7 @@ internal final class DefaultMessages: Messages, EmitsDiscontinuities {
internal func subscribe(bufferingPolicy: BufferingPolicy) async throws -> MessageSubscription {
logger.log(message: "Subscribing to messages", level: .debug)
let uuid = UUID()
let timeserial = try await resolveSubscriptionStart()
let serial = try await resolveSubscriptionStart()
let messageSubscription = MessageSubscription(
bufferingPolicy: bufferingPolicy
) { [weak self] queryOptions in
Expand All @@ -51,12 +48,12 @@ internal final class DefaultMessages: Messages, EmitsDiscontinuities {
}

// (CHA-M4a) A subscription can be registered to receive incoming messages. Adding a subscription has no side effects on the status of the room or the underlying realtime channel.
subscriptionPoints[uuid] = .init(subscription: messageSubscription, timeserial: timeserial)
subscriptionPoints[uuid] = .init(subscription: messageSubscription, serial: serial)

// (CHA-M4c) When a realtime message with name set to message.created is received, it is translated into a message event, which contains a type field with the event type as well as a message field containing the Message Struct. This event is then broadcast to all subscribers.
// (CHA-M4d) If a realtime message with an unknown name is received, the SDK shall silently discard the message, though it may log at DEBUG or TRACE level.
// (CHA-M5k) Incoming realtime events that are malformed (unknown field should be ignored) shall not be emitted to subscribers.
channel.subscribe(MessageEvent.created.rawValue) { message in
channel.subscribe(RealtimeMessageName.chatMessage.rawValue) { message in
Task {
// TODO: Revisit errors thrown as part of https://github.com/ably-labs/ably-chat-swift/issues/32
guard let data = message.data as? [String: Any],
Expand All @@ -69,8 +66,8 @@ internal final class DefaultMessages: Messages, EmitsDiscontinuities {
throw ARTErrorInfo.create(withCode: 50000, status: 500, message: "Received incoming message without extras")
}

guard let timeserial = extras["timeserial"] as? String else {
throw ARTErrorInfo.create(withCode: 50000, status: 500, message: "Received incoming message without timeserial")
guard let serial = message.serial else {
throw ARTErrorInfo.create(withCode: 50000, status: 500, message: "Received incoming message without serial")
}

guard let clientID = message.clientId else {
Expand All @@ -80,8 +77,13 @@ internal final class DefaultMessages: Messages, EmitsDiscontinuities {
let metadata = data["metadata"] as? Metadata
let headers = extras["headers"] as? Headers

guard let action = MessageAction.fromRealtimeAction(message.action) else {
return
}

let message = Message(
timeserial: timeserial,
serial: serial,
latestAction: action,
clientID: clientID,
roomID: self.roomID,
text: text,
Expand Down Expand Up @@ -112,24 +114,14 @@ internal final class DefaultMessages: Messages, EmitsDiscontinuities {
}

private func getBeforeSubscriptionStart(_ uuid: UUID, params: QueryOptions) async throws -> any PaginatedResult<Message> {
guard let subscriptionPoint = subscriptionPoints[uuid]?.timeserial else {
guard let subscriptionPoint = subscriptionPoints[uuid]?.serial else {
throw ARTErrorInfo.create(
withCode: 40000,
status: 400,
message: "cannot query history; listener has not been subscribed yet"
)
}

// (CHA-M5j) If the end parameter is specified and is more recent than the subscription point timeserial, the method must throw an ErrorInfo with code 40000.
let parseSerial = try? DefaultTimeserial.calculateTimeserial(from: subscriptionPoint)
if let end = params.end, dateToMilliseconds(end) > parseSerial?.timestamp ?? 0 {
throw ARTErrorInfo.create(
withCode: 40000,
status: 400,
message: "cannot query history; end time is after the subscription point of the listener"
)
}

// (CHA-M5f) This method must accept any of the standard history query options, except for direction, which must always be backwards.
var queryOptions = params
queryOptions.orderBy = .newestFirst // newestFirst is equivalent to backwards
Expand Down Expand Up @@ -173,19 +165,19 @@ internal final class DefaultMessages: Messages, EmitsDiscontinuities {
}

do {
let timeserialOnChannelAttach = try await timeserialOnChannelAttach()
let serialOnChannelAttach = try await serialOnChannelAttach()

for uuid in subscriptionPoints.keys {
logger.log(message: "Resetting subscription point for listener: \(uuid)", level: .debug)
subscriptionPoints[uuid]?.timeserial = timeserialOnChannelAttach
subscriptionPoints[uuid]?.serial = serialOnChannelAttach
}
} catch {
logger.log(message: "Error handling attach: \(error)", level: .error)
throw ARTErrorInfo.create(from: error)
}
}

private func resolveSubscriptionStart() async throws -> TimeserialString {
private func resolveSubscriptionStart() async throws -> String {
logger.log(message: "Resolving subscription start", level: .debug)
// (CHA-M5a) If a subscription is added when the underlying realtime channel is ATTACHED, then the subscription point is the current channelSerial of the realtime channel.
if channel.state == .attached {
Expand All @@ -200,28 +192,28 @@ internal final class DefaultMessages: Messages, EmitsDiscontinuities {
}

// (CHA-M5b) If a subscription is added when the underlying realtime channel is in any other state, then its subscription point becomes the attachSerial at the the point of channel attachment.
return try await timeserialOnChannelAttach()
return try await serialOnChannelAttach()
}

// Always returns the attachSerial and not the channelSerial to also serve (CHA-M5c) - If a channel leaves the ATTACHED state and then re-enters ATTACHED with resumed=false, then it must be assumed that messages have been missed. The subscription point of any subscribers must be reset to the attachSerial.
private func timeserialOnChannelAttach() async throws -> TimeserialString {
logger.log(message: "Resolving timeserial on channel attach", level: .debug)
private func serialOnChannelAttach() async throws -> String {
logger.log(message: "Resolving serial on channel attach", level: .debug)
// If the state is already 'attached', return the attachSerial immediately
if channel.state == .attached {
if let attachSerial = channel.properties.attachSerial {
logger.log(message: "Channel is attached, returning attachSerial: \(attachSerial)", level: .debug)
return attachSerial
} else {
let error = ARTErrorInfo.create(withCode: 40000, status: 400, message: "Channel is attached, but attachSerial is not defined")
logger.log(message: "Error resolving timeserial on channel attach: \(error)", level: .error)
logger.log(message: "Error resolving serial on channel attach: \(error)", level: .error)
throw error
}
}

// (CHA-M5b) If a subscription is added when the underlying realtime channel is in any other state, then its subscription point becomes the attachSerial at the the point of channel attachment.
return try await withCheckedThrowingContinuation { continuation in
// avoids multiple invocations of the continuation
var nillableContinuation: CheckedContinuation<TimeserialString, any Error>? = continuation
var nillableContinuation: CheckedContinuation<String, any Error>? = continuation

channel.on { [weak self] stateChange in
guard let self else {
Expand Down
27 changes: 25 additions & 2 deletions Sources/AblyChat/Events.swift
Original file line number Diff line number Diff line change
@@ -1,5 +1,28 @@
internal enum MessageEvent: String {
case created = "message.created"
import Ably

public enum MessageAction: String, Codable, Sendable {
case create = "message.create"

internal static func fromRealtimeAction(_ action: ARTMessageAction) -> Self? {
switch action {
case .create:
.create
// ignore any other actions except `message.create` for now
case .unset,
.update,
.delete,
.annotationCreate,
.annotationDelete,
.metaOccupancy:
nil
@unknown default:
nil
}
}
}

internal enum RealtimeMessageName: String, Sendable {
case chatMessage = "chat.message"
}

internal enum RoomReactionEvents: String {
Expand Down
33 changes: 9 additions & 24 deletions Sources/AblyChat/Message.swift
Original file line number Diff line number Diff line change
Expand Up @@ -5,19 +5,21 @@ public typealias MessageMetadata = Metadata

// (CHA-M2) A Message corresponds to a single message in a chat room. This is analogous to a single user-specified message on an Ably channel (NOTE: not a ProtocolMessage).
public struct Message: Sendable, Codable, Identifiable, Equatable {
// id to meet Identifiable conformance. 2 messages in the same channel cannot have the same timeserial.
public var id: String { timeserial }
// id to meet Identifiable conformance. 2 messages in the same channel cannot have the same serial.
public var id: String { serial }

public var timeserial: String
public var serial: String
public var latestAction: MessageAction
public var clientID: String
public var roomID: String
public var text: String
public var createdAt: Date?
public var metadata: MessageMetadata
public var headers: MessageHeaders

public init(timeserial: String, clientID: String, roomID: String, text: String, createdAt: Date?, metadata: MessageMetadata, headers: MessageHeaders) {
self.timeserial = timeserial
public init(serial: String, latestAction: MessageAction, clientID: String, roomID: String, text: String, createdAt: Date?, metadata: MessageMetadata, headers: MessageHeaders) {
self.serial = serial
self.latestAction = latestAction
self.clientID = clientID
self.roomID = roomID
self.text = text
Expand All @@ -27,30 +29,13 @@ public struct Message: Sendable, Codable, Identifiable, Equatable {
}

internal enum CodingKeys: String, CodingKey {
case timeserial
case serial
case latestAction
case clientID = "clientId"
case roomID = "roomId"
case text
case createdAt
case metadata
case headers
}

// (CHA-M2a) A Message is considered before another Message in the global order if the timeserial of the corresponding realtime channel message comes first.
public func isBefore(_ otherMessage: Message) throws -> Bool {
let otherMessageTimeserial = try DefaultTimeserial.calculateTimeserial(from: otherMessage.timeserial)
return try DefaultTimeserial.calculateTimeserial(from: timeserial).before(otherMessageTimeserial)
}

// CHA-M2b) A Message is considered after another Message in the global order if the timeserial of the corresponding realtime channel message comes second.
public func isAfter(_ otherMessage: Message) throws -> Bool {
let otherMessageTimeserial = try DefaultTimeserial.calculateTimeserial(from: otherMessage.timeserial)
return try DefaultTimeserial.calculateTimeserial(from: timeserial).after(otherMessageTimeserial)
}

// (CHA-M2c) A Message is considered to be equal to another Message if they have the same timeserial.
public func isEqual(_ otherMessage: Message) throws -> Bool {
let otherMessageTimeserial = try DefaultTimeserial.calculateTimeserial(from: otherMessage.timeserial)
return try DefaultTimeserial.calculateTimeserial(from: timeserial).equal(otherMessageTimeserial)
}
}
Loading