From 99c7cca6bc12367b651642cbc8850959fb005125 Mon Sep 17 00:00:00 2001 From: Simon Woolf Date: Fri, 17 Jan 2025 21:09:10 +0000 Subject: [PATCH] Support annotations (rest and realtime) --- ably.d.ts | 203 +++++++++++++++++- modular.d.ts | 20 ++ src/common/lib/client/baseclient.ts | 3 + src/common/lib/client/defaultrealtime.ts | 11 + src/common/lib/client/defaultrest.ts | 11 + src/common/lib/client/modularplugins.ts | 11 + src/common/lib/client/realtimeannotations.ts | 93 ++++++++ src/common/lib/client/realtimechannel.ts | 26 +++ src/common/lib/client/restannotations.ts | 75 +++++++ src/common/lib/client/restchannel.ts | 11 + src/common/lib/transport/comettransport.ts | 6 +- src/common/lib/transport/connectionmanager.ts | 2 +- src/common/lib/transport/protocol.ts | 8 +- src/common/lib/transport/transport.ts | 6 +- .../lib/transport/websockettransport.ts | 1 + src/common/lib/types/annotation.ts | 123 +++++++++++ src/common/lib/types/basemessage.ts | 3 + src/common/lib/types/defaultannotation.ts | 22 ++ src/common/lib/types/message.ts | 2 + src/common/lib/types/protocolmessage.ts | 40 +++- src/common/lib/types/protocolmessagecommon.ts | 22 +- src/platform/web/modular.ts | 1 + src/platform/web/modular/annotations.ts | 13 ++ test/browser/modular.test.js | 142 ++++++++++++ test/common/ably-common | 2 +- test/realtime/annotations.test.js | 37 ++++ test/realtime/channel.test.js | 2 +- 27 files changed, 874 insertions(+), 22 deletions(-) create mode 100644 src/common/lib/client/realtimeannotations.ts create mode 100644 src/common/lib/client/restannotations.ts create mode 100644 src/common/lib/types/annotation.ts create mode 100644 src/common/lib/types/defaultannotation.ts create mode 100644 src/platform/web/modular/annotations.ts create mode 100644 test/realtime/annotations.test.js diff --git a/ably.d.ts b/ably.d.ts index 56ddb56f4..dae75199b 100644 --- a/ably.d.ts +++ b/ably.d.ts @@ -990,6 +990,18 @@ export interface RestHistoryParams { limit?: number; } +/** + * Describes the parameters accepted by {@link RestAnnotations.get}. + */ +export interface GetAnnotationsParams { + /** + * An upper limit on the number of messages returned. The default is 100, and the maximum is 1000. + * + * @defaultValue 100 + */ + limit?: number; +} + /** * The `RestPresenceParams` interface describes the parameters accepted by {@link Presence.get}. */ @@ -2026,6 +2038,68 @@ export declare interface RealtimePresence { leaveClient(clientId: string, data?: any): Promise; } +/** + * Functionality for annotating messages with small pieces of data, such as emoji + * reactions, that the server will roll up into the message as a summary. + */ +export declare interface RealtimeAnnotations { + /** + * Registers a listener that is called each time an {@link Annotation} matching a given refType. + * Note that if you want to receive individual realtime annotations (instead of just the rolled-up summaries), you will need to request the annotation_subscribe ChannelMode in ChannelOptions, since they are not delivered by default. In general, most clients will not bother with subscribing to individual annotations, and will instead just look at the summary updates. + * + * @param refType - A specific refType string or an array of them to register the listener for. + * @param listener - An event listener function. + * @returns A promise which resolves upon success of the channel {@link RealtimeChannel.attach | `attach()`} operation and rejects with an {@link ErrorInfo} object upon its failure. + */ + subscribe(refType: string | Array, listener?: messageCallback): Promise; + /** + * Registers a listener that is called each time an {@link Annotation} is received on the channel. + * Note that if you want to receive individual realtime annotations (instead of just the rolled-up summaries), you will need to request the annotation_subscribe ChannelMode in ChannelOptions, since they are not delivered by default. In general, most clients will not bother with subscribing to individual annotations, and will instead just look at the summary updates. + * + * @param listener - An event listener function. + * @returns A promise which resolves upon success of the channel {@link RealtimeChannel.attach | `attach()`} operation and rejects with an {@link ErrorInfo} object upon its failure. + */ + subscribe(listener?: messageCallback): Promise; + /** + * Deregisters a specific listener that is registered to receive {@link Annotation} on the channel for a given refType. + * + * @param refType - A specific refType (or array of refTypes) to deregister the listener for. + * @param listener - An event listener function. + */ + unsubscribe(refType: string | Array, listener: messageCallback): void; + /** + * Deregisters any listener that is registered to receive {@link Annotation} on the channel for a specific refType + * + * @param refType - A specific refType (or array of refTypes) to deregister the listeners for. + */ + unsubscribe(refType: string | Array): void; + /** + * Deregisters a specific listener that is registered to receive {@link Annotation} on the channel. + * + * @param listener - An event listener function. + */ + unsubscribe(listener: messageCallback): void; + /** + * Deregisters all listeners currently receiving {@link Annotation} for the channel. + */ + unsubscribe(): void; + /** + * Publish a new annotation for a message. + * + * @param refSerial - The `serial` of the message to annotate. + * @param refType - What type of annotation you want. + * @param data - The contents of the annotation. + */ + publish(refSerial: string, refType: string, data: string | ArrayBuffer | Uint8Array): Promise; + /** + * Get all annotations for a given message (as a paginated result) + * + * @param serial - The `serial` of the message to get annotations for. + * @param params - Restrictions on which annotations to get (in particular a limit) + */ + get(serial: string, params: GetAnnotationsParams | null): Promise; +} + /** * Enables devices to subscribe to push notifications for a channel. */ @@ -2072,6 +2146,10 @@ export declare interface Channel { * A {@link Presence} object. */ presence: Presence; + /** + * {@link RestAnnotations} + */ + annotations: RestAnnotations; /** * A {@link PushChannel} object. */ @@ -2116,6 +2194,28 @@ export declare interface Channel { status(): Promise; } +/** + * Functionality for annotating messages with small pieces of data, such as emoji + * reactions, that the server will roll up into the message as a summary. + */ +export declare interface RestAnnotations { + /** + * Publish a new annotation for a message. + * + * @param refSerial - The `serial` of the message to annotate. + * @param refType - What type of annotation you want. + * @param data - The contents of the annotation. + */ + publish(refSerial: string, refType: string, data: string | ArrayBuffer | Uint8Array): Promise; + /** + * Get all annotations for a given message (as a paginated result) + * + * @param serial - The `serial` of the message to get annotations for. + * @param params - Restrictions on which annotations to get (in particular a limit) + */ + get(serial: string, params: GetAnnotationsParams | null): Promise; +} + /** * Enables messages to be published and subscribed to. Also enables historic messages to be retrieved and provides access to the {@link RealtimePresence} object of a channel. */ @@ -2188,6 +2288,10 @@ export declare interface RealtimeChannel extends EventEmitter>): PresenceMessage; } +/** + * Static utilities related to annotations. + */ +export interface AnnotationStatic { + /** + * Decodes and decrypts a deserialized `Annotation`-like object using the cipher in {@link ChannelOptions}. Any residual transforms that cannot be decoded or decrypted will be in the `encoding` property. Intended for users receiving messages from a source other than a REST or Realtime channel (for example a queue) to avoid having to parse the encoding string and action. + * + * @param JsonObject - The deserialized `Annotation`-like object to decode and decrypt. + * @param channelOptions - A {@link ChannelOptions} object containing the cipher. + */ + fromEncoded: (JsonObject: any, channelOptions?: ChannelOptions) => Promise; + /** + * Decodes and decrypts an array of deserialized `Annotation`-like object using the cipher in {@link ChannelOptions}. Any residual transforms that cannot be decoded or decrypted will be in the `encoding` property. Intended for users receiving messages from a source other than a REST or Realtime channel (for example a queue) to avoid having to parse the encoding string. + * + * @param JsonArray - An array of deserialized `Annotation`-like objects to decode and decrypt. + * @param channelOptions - A {@link ChannelOptions} object containing the cipher. + */ + fromEncodedArray: (JsonArray: any[], channelOptions?: ChannelOptions) => Promise; +} + /** * Cipher Key used in {@link CipherParamOptions}. If set to a `string`, the value must be base64 encoded. */ @@ -2926,6 +3119,10 @@ export declare class Rest implements RestClient { * Static utilities related to presence messages. */ static PresenceMessage: PresenceMessageStatic; + /** + * Static utilities related to annotations. + */ + static Annotation: AnnotationStatic; // Requirements of RestClient @@ -2977,6 +3174,10 @@ export declare class Realtime implements RealtimeClient { * Static utilities related to presence messages. */ static PresenceMessage: PresenceMessageStatic; + /** + * Static utilities related to annotations. + */ + static Annotation: AnnotationStatic; // Requirements of RealtimeClient diff --git a/modular.d.ts b/modular.d.ts index 94f0ba27f..25fc77e1f 100644 --- a/modular.d.ts +++ b/modular.d.ts @@ -135,6 +135,21 @@ export declare const MsgPack: unknown; */ export declare const RealtimePresence: unknown; +/** + * Provides a {@link BaseRealtime} instance with the ability to interact with message + * annotations. + * + * To create a client that includes this plugin, include it in the client options that you pass to the {@link BaseRealtime.constructor}: + * + * ```javascript + * import { BaseRealtime, WebSocketTransport, FetchRequest, Annotations } from 'ably/modular'; + * const realtime = new BaseRealtime({ ...options, plugins: { WebSocketTransport, FetchRequest, Annotations } }); + * ``` + * + * If you do not provide this plugin, then attempting to access a channel’s {@link ably!RealtimeChannel.annotations} property will cause a runtime error. + */ +export declare const Annotations: unknown; + /** * Provides a {@link BaseRealtime} instance with the ability to establish a connection with the Ably realtime service using a [WebSocket](https://developer.mozilla.org/en-US/docs/Web/API/WebSockets_API) connection. * @@ -230,6 +245,11 @@ export interface ModularPlugins { */ RealtimePresence?: typeof RealtimePresence; + /** + * See {@link Annotations | documentation for the `Annotations` plugin}. + */ + Annotations?: typeof Annotations; + /** * See {@link WebSocketTransport | documentation for the `WebSocketTransport` plugin}. */ diff --git a/src/common/lib/client/baseclient.ts b/src/common/lib/client/baseclient.ts index 0b37e1617..f7d23edf0 100644 --- a/src/common/lib/client/baseclient.ts +++ b/src/common/lib/client/baseclient.ts @@ -12,6 +12,7 @@ import * as Utils from '../util/utils'; import Platform from '../../platform'; import { Rest } from './rest'; import { IUntypedCryptoStatic } from 'common/types/ICryptoStatic'; +import { AnnotationsPlugin } from './modularplugins'; import { throwMissingPluginError } from '../util/utils'; import { MsgPack } from 'common/types/msgpack'; import { HTTPRequestImplementations } from 'platform/web/lib/http/http'; @@ -46,6 +47,7 @@ class BaseClient { // Extra HTTP request implementations available to this client, in addition to those in web’s Http.bundledRequestImplementations readonly _additionalHTTPRequestImplementations: HTTPRequestImplementations | null; private readonly __FilteredSubscriptions: typeof FilteredSubscriptions | null; + readonly _Annotations: AnnotationsPlugin | null; readonly logger: Logger; _device?: LocalDevice; @@ -98,6 +100,7 @@ class BaseClient { this._rest = options.plugins?.Rest ? new options.plugins.Rest(this) : null; this._Crypto = options.plugins?.Crypto ?? null; this.__FilteredSubscriptions = options.plugins?.MessageInteractions ?? null; + this._Annotations = options.plugins?.Annotations ?? null; } get rest(): Rest { diff --git a/src/common/lib/client/defaultrealtime.ts b/src/common/lib/client/defaultrealtime.ts index 447b82ee6..f4231cf13 100644 --- a/src/common/lib/client/defaultrealtime.ts +++ b/src/common/lib/client/defaultrealtime.ts @@ -9,10 +9,14 @@ import { DefaultMessage } from '../types/defaultmessage'; import { MsgPack } from 'common/types/msgpack'; import RealtimePresence from './realtimepresence'; import { DefaultPresenceMessage } from '../types/defaultpresencemessage'; +import { DefaultAnnotation } from '../types/defaultannotation'; import WebSocketTransport from '../transport/websockettransport'; import { FilteredSubscriptions } from './filteredsubscriptions'; import { PresenceMap } from './presencemap'; import PresenceMessage, { WirePresenceMessage } from '../types/presencemessage'; +import RealtimeAnnotations from './realtimeannotations'; +import RestAnnotations from './restannotations'; +import Annotation, { WireAnnotation } from '../types/annotation'; import { Http } from 'common/types/http'; import Defaults from '../util/defaults'; import Logger from '../util/logger'; @@ -38,6 +42,12 @@ export class DefaultRealtime extends BaseRealtime { PresenceMessage, WirePresenceMessage, }, + Annotations: { + Annotation, + WireAnnotation, + RealtimeAnnotations, + RestAnnotations, + }, WebSocketTransport, MessageInteractions: FilteredSubscriptions, }), @@ -62,6 +72,7 @@ export class DefaultRealtime extends BaseRealtime { static Message = DefaultMessage; static PresenceMessage = DefaultPresenceMessage; + static Annotation = DefaultAnnotation; static _MsgPack: MsgPack | null = null; diff --git a/src/common/lib/client/defaultrest.ts b/src/common/lib/client/defaultrest.ts index d6aa8b971..b4e1e0a22 100644 --- a/src/common/lib/client/defaultrest.ts +++ b/src/common/lib/client/defaultrest.ts @@ -5,7 +5,11 @@ import Platform from 'common/platform'; import { DefaultMessage } from '../types/defaultmessage'; import { MsgPack } from 'common/types/msgpack'; import { DefaultPresenceMessage } from '../types/defaultpresencemessage'; +import { DefaultAnnotation } from '../types/defaultannotation'; import { Http } from 'common/types/http'; +import RealtimeAnnotations from './realtimeannotations'; +import RestAnnotations from './restannotations'; +import Annotation, { WireAnnotation } from '../types/annotation'; import Defaults from '../util/defaults'; import Logger from '../util/logger'; @@ -25,6 +29,12 @@ export class DefaultRest extends BaseRest { ...allCommonModularPlugins, Crypto: DefaultRest.Crypto ?? undefined, MsgPack: DefaultRest._MsgPack ?? undefined, + Annotations: { + Annotation, + WireAnnotation, + RealtimeAnnotations, + RestAnnotations, + }, }), ); } @@ -43,6 +53,7 @@ export class DefaultRest extends BaseRest { static Message = DefaultMessage; static PresenceMessage = DefaultPresenceMessage; + static Annotation = DefaultAnnotation; static _MsgPack: MsgPack | null = null; diff --git a/src/common/lib/client/modularplugins.ts b/src/common/lib/client/modularplugins.ts index 1729d59f9..7c10f70cf 100644 --- a/src/common/lib/client/modularplugins.ts +++ b/src/common/lib/client/modularplugins.ts @@ -2,10 +2,13 @@ import { Rest } from './rest'; import { IUntypedCryptoStatic } from '../../types/ICryptoStatic'; import { MsgPack } from 'common/types/msgpack'; import RealtimePresence from './realtimepresence'; +import RealtimeAnnotations from './realtimeannotations'; +import RestAnnotations from './restannotations'; import XHRRequest from 'platform/web/lib/http/request/xhrrequest'; import fetchRequest from 'platform/web/lib/http/request/fetchrequest'; import { FilteredSubscriptions } from './filteredsubscriptions'; import PresenceMessage, { WirePresenceMessage } from '../types/presencemessage'; +import Annotation, { WireAnnotation } from '../types/annotation'; import { TransportCtor } from '../transport/transport'; import * as PushPlugin from 'plugins/push'; @@ -18,11 +21,19 @@ export type RealtimePresencePlugin = PresenceMessagePlugin & { RealtimePresence: typeof RealtimePresence; }; +export type AnnotationsPlugin = { + Annotation: typeof Annotation; + WireAnnotation: typeof WireAnnotation; + RealtimeAnnotations: typeof RealtimeAnnotations; + RestAnnotations: typeof RestAnnotations; +}; + export interface ModularPlugins { Rest?: typeof Rest; Crypto?: IUntypedCryptoStatic; MsgPack?: MsgPack; RealtimePresence?: RealtimePresencePlugin; + Annotations?: AnnotationsPlugin; WebSocketTransport?: TransportCtor; XHRPolling?: TransportCtor; XHRRequest?: typeof XHRRequest; diff --git a/src/common/lib/client/realtimeannotations.ts b/src/common/lib/client/realtimeannotations.ts new file mode 100644 index 000000000..6fa7f6888 --- /dev/null +++ b/src/common/lib/client/realtimeannotations.ts @@ -0,0 +1,93 @@ +import EventEmitter from '../util/eventemitter'; +import Logger from '../util/logger'; +import Annotation from '../types/annotation'; +import { actions, flags } from '../types/protocolmessagecommon'; +import { fromValues as protocolMessageFromValues } from '../types/protocolmessage'; +import type { CipherOptions } from '../types/basemessage'; +import ErrorInfo from '../types/errorinfo'; +import RealtimeChannel from './realtimechannel'; +import RestAnnotations, { RestGetAnnotationsParams } from './restannotations'; +import type { PaginatedResult } from './paginatedresource'; + +class RealtimeAnnotations { + private channel: RealtimeChannel; + private logger: Logger; + private subscriptions: EventEmitter; + + constructor(channel: RealtimeChannel) { + this.channel = channel; + this.logger = channel.logger; + this.subscriptions = new EventEmitter(this.logger); + } + + async publish(refSerial: string, refType: string, data: any): Promise { + const channelName = this.channel.name; + const annotation = Annotation.fromValues({ + action: 'annotation.create', + refSerial, + refType, + data, + }); + + // TODO get rid of CipherOptions type assertion, indicates channeloptions types are broken + const wireAnnotation = await annotation.encode(this.channel.channelOptions as CipherOptions); + + this.channel._throwIfUnpublishableState(); + + Logger.logAction( + this.logger, + Logger.LOG_MICRO, + 'RealtimeAnnotations.publish()', + 'channelName = ' + channelName + ', sending annotation with refSerial = ' + refSerial + ', refType = ' + refType, + ); + + const pm = protocolMessageFromValues({ + action: actions.ANNOTATION, + channel: channelName, + annotations: [wireAnnotation], + }); + return this.channel.sendMessage(pm); + } + + async subscribe(..._args: unknown[] /* [refType], listener */): Promise { + const args = RealtimeChannel.processListenerArgs(_args); + const event = args[0]; + const listener = args[1]; + const channel = this.channel; + + if (channel.state === 'failed') { + throw ErrorInfo.fromValues(channel.invalidStateError()); + } + + await channel.attach(); + + if ((this.channel._mode & flags.ANNOTATION_SUBSCRIBE) === 0) { + throw new ErrorInfo( + "You're trying to add an annotation listener, but you haven't requested the annotation_subscribe channel mode in ChannelOptions, so this won't do anything (we only deliver annotations to clients who have explicitly requested them)", + 93001, + 400, + ); + } + + this.subscriptions.on(event, listener); + } + + unsubscribe(..._args: unknown[] /* [event], listener */): void { + const args = RealtimeChannel.processListenerArgs(_args); + const event = args[0]; + const listener = args[1]; + this.subscriptions.off(event, listener); + } + + _processIncoming(annotations: Annotation[]): void { + for (const annotation of annotations) { + this.subscriptions.emit(annotation.refType || '', annotation); + } + } + + async get(serial: string, params: RestGetAnnotationsParams | null): Promise> { + return RestAnnotations.prototype.get.call(this, serial, params); + } +} + +export default RealtimeAnnotations; diff --git a/src/common/lib/client/realtimechannel.ts b/src/common/lib/client/realtimechannel.ts index 351198954..cac9eec8d 100644 --- a/src/common/lib/client/realtimechannel.ts +++ b/src/common/lib/client/realtimechannel.ts @@ -19,6 +19,7 @@ import { PaginatedResult } from './paginatedresource'; import type { PushChannel } from 'plugins/push'; import type { WirePresenceMessage } from '../types/presencemessage'; import type RealtimePresence from './realtimepresence'; +import type RealtimeAnnotations from './realtimeannotations'; interface RealtimeHistoryParams { start?: number; @@ -57,12 +58,19 @@ class RealtimeChannel extends EventEmitter { channelOptions: ChannelOptions; client: BaseRealtime; private _presence: RealtimePresence | null; + private _annotations: RealtimeAnnotations | null = null; get presence(): RealtimePresence { if (!this._presence) { Utils.throwMissingPluginError('RealtimePresence'); } return this._presence; } + get annotations(): RealtimeAnnotations { + if (!this._annotations) { + Utils.throwMissingPluginError('Annotations'); + } + return this._annotations; + } connectionManager: ConnectionManager; state: API.ChannelState; subscriptions: EventEmitter; @@ -96,6 +104,9 @@ class RealtimeChannel extends EventEmitter { this.channelOptions = normaliseChannelOptions(client._Crypto ?? null, this.logger, options); this.client = client; this._presence = client._RealtimePresence ? new client._RealtimePresence.RealtimePresence(this) : null; + if (client._Annotations) { + this._annotations = new client._Annotations.RealtimeAnnotations(this); + } this.connectionManager = client.connection.connectionManager; this.state = 'initialized'; this.subscriptions = new EventEmitter(this.logger); @@ -646,6 +657,21 @@ class RealtimeChannel extends EventEmitter { break; } + case actions.ANNOTATION: { + populateFieldsFromParent(message); + const options = this.channelOptions; + if (this._annotations) { + const annotations = await Promise.all( + (message.annotations || []).map((wpm) => { + return wpm.decode(options, this.logger); + }), + ); + + this._annotations._processIncoming(annotations); + } + break; + } + case actions.ERROR: { /* there was a channel-specific error */ const err = message.error as ErrorInfo; diff --git a/src/common/lib/client/restannotations.ts b/src/common/lib/client/restannotations.ts new file mode 100644 index 000000000..4b50a096b --- /dev/null +++ b/src/common/lib/client/restannotations.ts @@ -0,0 +1,75 @@ +import * as Utils from '../util/utils'; +import Annotation, { WireAnnotation, _fromEncodedArray } from '../types/annotation'; +import type { CipherOptions } from '../types/basemessage'; +import RestChannel from './restchannel'; +import Defaults from '../util/defaults'; +import PaginatedResource, { PaginatedResult } from './paginatedresource'; +import Resource from './resource'; + +export interface RestGetAnnotationsParams { + limit?: number; +} + +class RestAnnotations { + private channel: RestChannel; + + constructor(channel: RestChannel) { + this.channel = channel; + } + + async publish(refSerial: string, refType: string, data: any): Promise { + const annotation = Annotation.fromValues({ + action: 'annotation.create', + refSerial, + refType, + data, + }); + + // TODO get rid of CipherOptions type assertion, indicates channeloptions types are broken + const wireAnnotation = await annotation.encode(this.channel.channelOptions as CipherOptions); + + const client = this.channel.client, + options = client.options, + format = options.useBinaryProtocol ? Utils.Format.msgpack : Utils.Format.json, + headers = Defaults.defaultPostHeaders(client.options, { format }), + params = {}; + + const requestBody = Utils.encodeBody([wireAnnotation], client._MsgPack, format); + + await Resource.post(client, this.basePathForSerial(refSerial), requestBody, headers, params, null, true); + } + + async get(serial: string, params: RestGetAnnotationsParams | null): Promise> { + const client = this.channel.client, + format = client.options.useBinaryProtocol ? Utils.Format.msgpack : Utils.Format.json, + envelope = client.http.supportsLinkHeaders ? undefined : format, + headers = Defaults.defaultGetHeaders(client.options, { format }); + + Utils.mixin(headers, client.options.headers); + + return new PaginatedResource( + client, + this.basePathForSerial(serial), + headers, + envelope, + async (body, _, unpacked) => { + const decoded = ( + unpacked ? body : Utils.decodeBody(body, client._MsgPack, format) + ) as Utils.Properties[]; + + return _fromEncodedArray(decoded, this.channel); + }, + ).get(params as Record); + } + + private basePathForSerial(serial: string) { + return ( + this.channel.client.rest.channelMixin.basePath(this.channel) + + '/messages/' + + encodeURIComponent(serial) + + '/annotations' + ); + } +} + +export default RestAnnotations; diff --git a/src/common/lib/client/restchannel.ts b/src/common/lib/client/restchannel.ts index 0c14d120a..8cdab42b7 100644 --- a/src/common/lib/client/restchannel.ts +++ b/src/common/lib/client/restchannel.ts @@ -17,6 +17,7 @@ import Defaults, { normaliseChannelOptions } from '../util/defaults'; import { RestHistoryParams } from './restchannelmixin'; import { RequestBody } from 'common/types/http'; import type { PushChannel } from 'plugins/push'; +import type RestAnnotations from './restannotations'; const MSG_ID_ENTROPY_BYTES = 9; @@ -32,6 +33,13 @@ class RestChannel { presence: RestPresence; channelOptions: ChannelOptions; _push?: PushChannel; + private _annotations: RestAnnotations | null = null; + get annotations(): RestAnnotations { + if (!this._annotations) { + Utils.throwMissingPluginError('Annotations'); + } + return this._annotations; + } constructor(client: BaseRest, name: string, channelOptions?: ChannelOptions) { Logger.logAction(client.logger, Logger.LOG_MINOR, 'RestChannel()', 'started; name = ' + name); @@ -42,6 +50,9 @@ class RestChannel { if (client.options.plugins?.Push) { this._push = new client.options.plugins.Push.PushChannel(this); } + if (client._Annotations) { + this._annotations = new client._Annotations.RestAnnotations(this); + } } get push() { diff --git a/src/common/lib/transport/comettransport.ts b/src/common/lib/transport/comettransport.ts index 77f0a75f6..ce75be69f 100644 --- a/src/common/lib/transport/comettransport.ts +++ b/src/common/lib/transport/comettransport.ts @@ -353,7 +353,11 @@ abstract class CometTransport extends Transport { if (items && items.length) for (let i = 0; i < items.length; i++) this.onProtocolMessage( - protocolMessageFromDeserialized(items[i], this.connectionManager.realtime._RealtimePresence), + protocolMessageFromDeserialized( + items[i], + this.connectionManager.realtime._RealtimePresence, + this.connectionManager.realtime._Annotations, + ), ); } catch (e) { Logger.logAction( diff --git a/src/common/lib/transport/connectionmanager.ts b/src/common/lib/transport/connectionmanager.ts index 73e948379..d7e08b481 100644 --- a/src/common/lib/transport/connectionmanager.ts +++ b/src/common/lib/transport/connectionmanager.ts @@ -1805,7 +1805,7 @@ class ConnectionManager extends EventEmitter { Logger.LOG_MICRO, 'ConnectionManager.send()', - 'queueing msg; ' + stringifyProtocolMessage(msg, this.realtime._RealtimePresence), + 'queueing msg; ' + stringifyProtocolMessage(msg, this.realtime._RealtimePresence, this.realtime._Annotations), ); } this.queue(msg, callback); diff --git a/src/common/lib/transport/protocol.ts b/src/common/lib/transport/protocol.ts index 5f792b1a7..98ec9507d 100644 --- a/src/common/lib/transport/protocol.ts +++ b/src/common/lib/transport/protocol.ts @@ -21,7 +21,7 @@ export class PendingMessage { this.merged = false; const action = message.action; this.sendAttempted = false; - this.ackRequired = action == actions.MESSAGE || action == actions.PRESENCE; + this.ackRequired = action == actions.MESSAGE || action == actions.PRESENCE || action == actions.ANNOTATION; } } @@ -78,7 +78,11 @@ class Protocol extends EventEmitter { Logger.LOG_MICRO, 'Protocol.send()', 'sending msg; ' + - stringifyProtocolMessage(pendingMessage.message, this.transport.connectionManager.realtime._RealtimePresence), + stringifyProtocolMessage( + pendingMessage.message, + this.transport.connectionManager.realtime._RealtimePresence, + this.transport.connectionManager.realtime._Annotations, + ), ); } pendingMessage.sendAttempted = true; diff --git a/src/common/lib/transport/transport.ts b/src/common/lib/transport/transport.ts index 5172bd6d6..005c84e55 100644 --- a/src/common/lib/transport/transport.ts +++ b/src/common/lib/transport/transport.ts @@ -128,7 +128,11 @@ abstract class Transport extends EventEmitter { 'received on ' + this.shortName + ': ' + - stringifyProtocolMessage(message, this.connectionManager.realtime._RealtimePresence) + + stringifyProtocolMessage( + message, + this.connectionManager.realtime._RealtimePresence, + this.connectionManager.realtime._Annotations, + ) + '; connectionId = ' + this.connectionManager.connectionId, ); diff --git a/src/common/lib/transport/websockettransport.ts b/src/common/lib/transport/websockettransport.ts index 3e12f1d67..7145bf4b8 100644 --- a/src/common/lib/transport/websockettransport.ts +++ b/src/common/lib/transport/websockettransport.ts @@ -140,6 +140,7 @@ class WebSocketTransport extends Transport { data, this.connectionManager.realtime._MsgPack, this.connectionManager.realtime._RealtimePresence, + this.connectionManager.realtime._Annotations, this.format, ), ); diff --git a/src/common/lib/types/annotation.ts b/src/common/lib/types/annotation.ts new file mode 100644 index 000000000..85f86e0f9 --- /dev/null +++ b/src/common/lib/types/annotation.ts @@ -0,0 +1,123 @@ +import Logger from '../util/logger'; +import { BaseMessage, encode, decode, wireToJSON, normalizeCipherOptions, CipherOptions, strMsg } from './basemessage'; +import * as API from '../../../../ably'; +import * as Utils from '../util/utils'; + +import type { IUntypedCryptoStatic } from '../../types/ICryptoStatic'; +import type { Properties } from '../util/utils'; +import type RestChannel from '../client/restchannel'; +import type RealtimeChannel from '../client/realtimechannel'; +import type { ChannelOptions } from '../../types/channel'; +type Channel = RestChannel | RealtimeChannel; + +const actions = ['annotation.create', 'annotation.delete']; + +export async function fromEncoded( + logger: Logger, + Crypto: IUntypedCryptoStatic | null, + encoded: WireAnnotation, + inputOptions?: API.ChannelOptions, +): Promise { + const options = normalizeCipherOptions(Crypto, logger, inputOptions ?? null); + const wa = WireAnnotation.fromValues(encoded); + return wa.decode(options, logger); +} + +export async function fromEncodedArray( + logger: Logger, + Crypto: IUntypedCryptoStatic | null, + encodedArray: WireAnnotation[], + options?: API.ChannelOptions, +): Promise { + return Promise.all( + encodedArray.map(function (encoded) { + return fromEncoded(logger, Crypto, encoded, options); + }), + ); +} + +// these forms of the functions are used internally when we have a channel instance +// already, so don't need to normalise channel options +export async function _fromEncoded(encoded: Properties, channel: Channel): Promise { + return WireAnnotation.fromValues(encoded).decode(channel.channelOptions, channel.logger); +} + +export async function _fromEncodedArray( + encodedArray: Properties[], + channel: Channel, +): Promise { + return Promise.all( + encodedArray.map(function (encoded) { + return _fromEncoded(encoded, channel); + }), + ); +} + +// for tree-shakability +export function fromValues(values: Properties) { + return Annotation.fromValues(values); +} + +class Annotation extends BaseMessage { + action?: API.AnnotationAction; + serial?: string; + refSerial?: string; + refType?: string; + + async encode(options: CipherOptions): Promise { + const res = Object.assign(new WireAnnotation(), this, { + action: actions.indexOf(this.action || 'annotation.create'), + }); + return encode(res, options); + } + + static fromValues(values: Properties): Annotation { + return Object.assign(new Annotation(), values); + } + + static fromValuesArray(values: Properties[]): Annotation[] { + return values.map((v) => Annotation.fromValues(v)); + } + + toString() { + return strMsg(this, 'Annotation'); + } +} + +export class WireAnnotation extends BaseMessage { + action?: number; + serial?: string; + refSerial?: string; + refType?: string; + + toJSON(...args: any[]) { + return wireToJSON.call(this, ...args); + } + + static fromValues(values: Properties): WireAnnotation { + return Object.assign(new WireAnnotation(), values); + } + + static fromValuesArray(values: Properties[]): WireAnnotation[] { + return values.map((v) => WireAnnotation.fromValues(v)); + } + + async decode(channelOptions: ChannelOptions, logger: Logger): Promise { + const res = Object.assign(new Annotation(), { + ...this, + action: actions[this.action!], + }); + try { + await decode(res, channelOptions); + } catch (e) { + Logger.logAction(logger, Logger.LOG_ERROR, 'WireAnnotation.decode()', Utils.inspectError(e)); + } + return res; + } + + toString() { + return strMsg(this, 'WireAnnotation'); + } +} + +export default Annotation; diff --git a/src/common/lib/types/basemessage.ts b/src/common/lib/types/basemessage.ts index 7665b5ffe..421be10e2 100644 --- a/src/common/lib/types/basemessage.ts +++ b/src/common/lib/types/basemessage.ts @@ -236,6 +236,9 @@ export function populateFieldsFromParent(parent: ProtocolMessage) { case actions.SYNC: msgs = parent.presence!; break; + case actions.ANNOTATION: + msgs = parent.annotations!; + break; default: throw new ErrorInfo('Unexpected action ' + parent.action, 40000, 400); } diff --git a/src/common/lib/types/defaultannotation.ts b/src/common/lib/types/defaultannotation.ts new file mode 100644 index 000000000..2393f7431 --- /dev/null +++ b/src/common/lib/types/defaultannotation.ts @@ -0,0 +1,22 @@ +import * as API from '../../../../ably'; +import Logger from '../util/logger'; +import Annotation, { fromEncoded, fromEncodedArray, WireAnnotation } from './annotation'; +import Platform from 'common/platform'; +import type { Properties } from '../util/utils'; + +/** + `DefaultAnnotation` is the class returned by `DefaultRest` and `DefaultRealtime`’s `Annotation` static property. It introduces the static methods described in the `AnnotationStatic` interface of the public API of the non tree-shakable version of the library. + */ +export class DefaultAnnotation extends Annotation { + static async fromEncoded(encoded: unknown, inputOptions?: API.ChannelOptions): Promise { + return fromEncoded(Logger.defaultLogger, Platform.Crypto, encoded as WireAnnotation, inputOptions); + } + + static async fromEncodedArray(encodedArray: Array, options?: API.ChannelOptions): Promise { + return fromEncodedArray(Logger.defaultLogger, Platform.Crypto, encodedArray as WireAnnotation[], options); + } + + static fromValues(values: Properties): Annotation { + return Annotation.fromValues(values); + } +} diff --git a/src/common/lib/types/message.ts b/src/common/lib/types/message.ts index 41c7ccdf0..37a9028e4 100644 --- a/src/common/lib/types/message.ts +++ b/src/common/lib/types/message.ts @@ -116,6 +116,7 @@ class Message extends BaseMessage { createdAt?: number; version?: string; operation?: API.Operation; + summary?: any; // TODO improve typings after summary structure is finalised expandFields() { if (this.action === 'message.create') { @@ -160,6 +161,7 @@ export class WireMessage extends BaseMessage { createdAt?: number; version?: string; operation?: API.Operation; + summary?: any; // Overload toJSON() to intercept JSON.stringify() toJSON(...args: any[]) { diff --git a/src/common/lib/types/protocolmessage.ts b/src/common/lib/types/protocolmessage.ts index 5d6321777..dcf7eeb85 100644 --- a/src/common/lib/types/protocolmessage.ts +++ b/src/common/lib/types/protocolmessage.ts @@ -1,10 +1,14 @@ import { MsgPack } from 'common/types/msgpack'; import * as API from '../../../../ably'; import { PresenceMessagePlugin } from '../client/modularplugins'; +import { AnnotationsPlugin } from '../client/modularplugins'; import * as Utils from '../util/utils'; import ErrorInfo from './errorinfo'; import { WireMessage } from './message'; import PresenceMessage, { WirePresenceMessage } from './presencemessage'; +import Annotation, { WireAnnotation } from './annotation'; +import RealtimeAnnotations from '../client/realtimeannotations'; +import RestAnnotations from '../client/restannotations'; import { flags, flagNames, channelModes, ActionName } from './protocolmessagecommon'; import type { Properties } from '../util/utils'; @@ -24,15 +28,17 @@ export function deserialize( serialized: unknown, MsgPack: MsgPack | null, presenceMessagePlugin: PresenceMessagePlugin | null, + annotationsPlugin: AnnotationsPlugin | null, format?: Utils.Format, ): ProtocolMessage { const deserialized = Utils.decodeBody>(serialized, MsgPack, format); - return fromDeserialized(deserialized, presenceMessagePlugin); + return fromDeserialized(deserialized, presenceMessagePlugin, annotationsPlugin); } export function fromDeserialized( deserialized: Record, presenceMessagePlugin: PresenceMessagePlugin | null, + annotationsPlugin: AnnotationsPlugin | null, ): ProtocolMessage { let error: ErrorInfo | undefined; if (deserialized.error) { @@ -51,21 +57,36 @@ export function fromDeserialized( ); } - return Object.assign(new ProtocolMessage(), { ...deserialized, presence, messages, error }); + let annotations: WireAnnotation[] | undefined; + if (annotationsPlugin && deserialized.annotations) { + annotations = annotationsPlugin.WireAnnotation.fromValuesArray( + deserialized.annotations as Array>, + ); + } + + return Object.assign(new ProtocolMessage(), { ...deserialized, presence, messages, annotations, error }); } /** * Used by the tests. */ export function fromDeserializedIncludingDependencies(deserialized: Record): ProtocolMessage { - return fromDeserialized(deserialized, { PresenceMessage, WirePresenceMessage }); + return fromDeserialized( + deserialized, + { PresenceMessage, WirePresenceMessage }, + { Annotation, WireAnnotation, RealtimeAnnotations, RestAnnotations }, + ); } -export function fromValues(values: unknown): ProtocolMessage { +export function fromValues(values: Properties): ProtocolMessage { return Object.assign(new ProtocolMessage(), values); } -export function stringify(msg: any, presenceMessagePlugin: PresenceMessagePlugin | null): string { +export function stringify( + msg: any, + presenceMessagePlugin: PresenceMessagePlugin | null, + annotationsPlugin: AnnotationsPlugin | null, +): string { let result = '[ProtocolMessage'; if (msg.action !== undefined) result += '; action=' + ActionName[msg.action] || msg.action; @@ -79,6 +100,9 @@ export function stringify(msg: any, presenceMessagePlugin: PresenceMessagePlugin if (msg.messages) result += '; messages=' + toStringArray(WireMessage.fromValuesArray(msg.messages)); if (msg.presence && presenceMessagePlugin) result += '; presence=' + toStringArray(presenceMessagePlugin.WirePresenceMessage.fromValuesArray(msg.presence)); + if (msg.annotations && annotationsPlugin) { + result += '; annotations=' + toStringArray(annotationsPlugin.WireAnnotation.fromValuesArray(msg.annotations)); + } if (msg.error) result += '; error=' + ErrorInfo.fromValues(msg.error).toString(); if (msg.auth && msg.auth.accessToken) result += '; token=' + msg.auth.accessToken; if (msg.flags) result += '; flags=' + flagNames.filter(msg.hasFlag).join(','); @@ -112,8 +136,10 @@ class ProtocolMessage { messages?: WireMessage[]; // This will be undefined if we skipped decoding this property due to user not requesting presence functionality — see `fromDeserialized` presence?: WirePresenceMessage[]; + annotations?: WireAnnotation[]; auth?: unknown; connectionDetails?: Record; + params?: Record; hasFlag = (flag: string): boolean => { return ((this.flags as number) & flags[flag]) > 0; @@ -123,8 +149,8 @@ class ProtocolMessage { return (this.flags = (this.flags as number) | flags[flag]); } - getMode(): number | undefined { - return this.flags && this.flags & flags.MODE_ALL; + getMode(): number { + return (this.flags || 0) & flags.MODE_ALL; } encodeModesToFlags(modes: API.ChannelMode[]): void { diff --git a/src/common/lib/types/protocolmessagecommon.ts b/src/common/lib/types/protocolmessagecommon.ts index 559f8b0bb..bb1aaabc7 100644 --- a/src/common/lib/types/protocolmessagecommon.ts +++ b/src/common/lib/types/protocolmessagecommon.ts @@ -49,11 +49,19 @@ export const flags: { [key: string]: number } = { export const flagNames = Object.keys(flags); -flags.MODE_ALL = flags.PRESENCE - | flags.PUBLISH - | flags.SUBSCRIBE - | flags.PRESENCE_SUBSCRIBE - | flags.ANNOTATION_PUBLISH - | flags.ANNOTATION_SUBSCRIBE; +flags.MODE_ALL = + flags.PRESENCE | + flags.PUBLISH | + flags.SUBSCRIBE | + flags.PRESENCE_SUBSCRIBE | + flags.ANNOTATION_PUBLISH | + flags.ANNOTATION_SUBSCRIBE; -export const channelModes = ['PRESENCE', 'PUBLISH', 'SUBSCRIBE', 'PRESENCE_SUBSCRIBE', 'ANNOTATION_PUBLISH', 'ANNOTATION_SUBSCRIBE']; +export const channelModes = [ + 'PRESENCE', + 'PUBLISH', + 'SUBSCRIBE', + 'PRESENCE_SUBSCRIBE', + 'ANNOTATION_PUBLISH', + 'ANNOTATION_SUBSCRIBE', +]; diff --git a/src/platform/web/modular.ts b/src/platform/web/modular.ts index 19a9b513a..cb696945a 100644 --- a/src/platform/web/modular.ts +++ b/src/platform/web/modular.ts @@ -39,6 +39,7 @@ export * from './modular/message'; export * from './modular/presencemessage'; export * from './modular/msgpack'; export * from './modular/realtimepresence'; +export * from './modular/annotations'; export * from './modular/transports'; export * from './modular/http'; export { Rest } from '../../common/lib/client/rest'; diff --git a/src/platform/web/modular/annotations.ts b/src/platform/web/modular/annotations.ts new file mode 100644 index 000000000..988ab7679 --- /dev/null +++ b/src/platform/web/modular/annotations.ts @@ -0,0 +1,13 @@ +import { AnnotationsPlugin } from 'common/lib/client/modularplugins'; +import RealtimeAnnotations from '../../../common/lib/client/realtimeannotations'; +import RestAnnotations from '../../../common/lib/client/restannotations'; +import Annotation, { WireAnnotation } from '../../../common/lib/types/annotation'; + +const Annotations: AnnotationsPlugin = { + Annotation, + WireAnnotation, + RealtimeAnnotations, + RestAnnotations, +}; + +export { Annotations }; diff --git a/test/browser/modular.test.js b/test/browser/modular.test.js index e6af1530b..3fe5874be 100644 --- a/test/browser/modular.test.js +++ b/test/browser/modular.test.js @@ -21,6 +21,7 @@ import { FetchRequest, XHRRequest, MessageInteractions, + Annotations, } from '../../build/modular/index.mjs'; function registerAblyModularTests(Helper) { @@ -916,6 +917,147 @@ function registerAblyModularTests(Helper) { }); }); + describe('Annotations', () => { + describe('BaseRealtime without Annotations', () => { + /** @nospec */ + it('throws an error when attempting to access the `annotations` property', async function () { + const helper = this.test.helper; + const client = new BaseRealtime(helper.ablyClientOptions({ plugins: { WebSocketTransport, FetchRequest } })); + + await monitorConnectionThenCloseAndFinish( + helper, + async () => { + const channel = client.channels.get('channel'); + + expect(() => channel.annotations).to.throw('Annotations plugin not provided'); + }, + client, + ); + }); + + /** @nospec */ + it('doesn’t break when it receives an ANNOTATION ProtocolMessage', async function () { + const helper = this.test.helper; + const rxClient = new BaseRealtime( + helper.ablyClientOptions({ plugins: { WebSocketTransport, FetchRequest } }), + ); + const channelName = 'mutable:annotation-1'; + + await monitorConnectionThenCloseAndFinish( + helper, + async () => { + const rxChannel = rxClient.channels.get(channelName, { + modes: ['PUBLISH', 'SUBSCRIBE', 'ANNOTATION_PUBLISH', 'ANNOTATION_SUBSCRIBE'], + }); + await rxChannel.attach(); + let receivedMessagePromise = rxChannel.subscriptions.once(); + + const txClient = new BaseRealtime( + this.test.helper.ablyClientOptions({ + plugins: { + WebSocketTransport, + FetchRequest, + Annotations, + }, + }), + ); + + await monitorConnectionThenCloseAndFinish( + helper, + async () => { + const txChannel = txClient.channels.get(channelName); + const onMessage = txChannel.subscriptions.once(); + await txChannel.attach(); + await txChannel.publish('test', 'body'); + const message = await onMessage; + await txChannel.annotations.publish(message.serial, 'reaction:emoji.v1', '👍'); + + // with that received, do another round-trip pub-sub of a normal message + // to check that rxChannel is still receiving + await receivedMessagePromise; + receivedMessagePromise = rxChannel.subscriptions.once(); + txChannel.publish('test2', 'body'); + await receivedMessagePromise; + }, + txClient, + ); + }, + rxClient, + ); + }); + }); + + describe('BaseRealtime with Annotations', () => { + it('offers annotation functionality', async function () { + const helper = this.test.helper; + const channelName = 'mutable:annotation-2'; + const rxClient = new BaseRealtime( + helper.ablyClientOptions({ + plugins: { + WebSocketTransport, + FetchRequest, + Annotations, + }, + }), + ); + const rxChannel = rxClient.channels.get(channelName, { + modes: ['PUBLISH', 'SUBSCRIBE', 'ANNOTATION_PUBLISH', 'ANNOTATION_SUBSCRIBE'], + }); + + await monitorConnectionThenCloseAndFinish( + helper, + async () => { + const txRealtime = new BaseRealtime( + this.test.helper.ablyClientOptions({ + plugins: { + WebSocketTransport, + FetchRequest, + Annotations, + }, + }), + ); + const txRest = new BaseRest( + this.test.helper.ablyClientOptions({ + plugins: { + FetchRequest, + Annotations, + }, + }), + ); + + await monitorConnectionThenCloseAndFinish( + helper, + async () => { + const txChannel = txRealtime.channels.get(channelName, { + modes: ['PUBLISH', 'SUBSCRIBE', 'ANNOTATION_PUBLISH', 'ANNOTATION_SUBSCRIBE'], + }); + const onMessage = txChannel.subscriptions.once(); + let rxOnAnnotation = rxChannel.annotations.subscriptions.once(); + await txChannel.attach(); + await rxChannel.attach(); + + await txChannel.publish('test', 'body'); + const message = await onMessage; + await txChannel.annotations.publish(message.serial, 'reaction:emoji.v1', '👍'); + let annotation = await rxOnAnnotation; + expect(annotation.data).to.equal('👍'); + + // and try a rest annotation publish + rxOnAnnotation = rxChannel.annotations.subscriptions.once(); + const txRestChannel = txRest.channels.get(channelName); + await txRestChannel.annotations.publish(message.serial, 'reaction:emoji.v1', '😕'); + annotation = await rxOnAnnotation; + expect(annotation.data).to.equal('😕'); + }, + txRealtime, + ); + }, + rxClient, + ); + }); + }); + }); + describe('Transports', () => { describe('BaseRealtime', () => { describe('without a transport plugin', () => { diff --git a/test/common/ably-common b/test/common/ably-common index 0b6eb2564..496da5ead 160000 --- a/test/common/ably-common +++ b/test/common/ably-common @@ -1 +1 @@ -Subproject commit 0b6eb25646cf62d999fb42cda1fefb6af533b2e5 +Subproject commit 496da5ead0fa9d6667be0422b5a7ffa62fa7366c diff --git a/test/realtime/annotations.test.js b/test/realtime/annotations.test.js new file mode 100644 index 000000000..e32d595fe --- /dev/null +++ b/test/realtime/annotations.test.js @@ -0,0 +1,37 @@ +'use strict'; + +define(['shared_helper', 'chai'], function (Helper, chai) { + const { assert } = chai; + describe('realtime/annotations', function () { + this.timeout(10 * 1000); + + before(function (done) { + const helper = Helper.forHook(this); + helper.setupApp(function (err) { + if (err) { + done(err); + return; + } + rest = helper.AblyRest(); + done(); + }); + }); + + it('', () => { + // TODO + // const helper = this.test.helper; + // const realtime = helper.AblyRealtime(); + // const channel = realtime.channels.get('channel-with-options', { modes: ['PRESENCE'] }); + // channel.attach(); + // Helper.whenPromiseSettles(channel.whenState('attaching'), function () { + // try { + // realtime.channels.get('channel-with-options', { modes: ['PRESENCE'] }); + // helper.closeAndFinish(done, realtime); + // } catch (err) { + // helper.closeAndFinish(done, realtime, err); + // } + // }); + // assert.isFalse(presenceMap.remove(incoming)); + }); + }); +}); diff --git a/test/realtime/channel.test.js b/test/realtime/channel.test.js index 7d2f5a76f..eb56df309 100644 --- a/test/realtime/channel.test.js +++ b/test/realtime/channel.test.js @@ -1465,7 +1465,7 @@ define(['ably', 'shared_helper', 'async', 'chai'], function (Ably, Helper, async /* Stub out the channel's ability to communicate */ helper.recordPrivateApi('replace.channel.sendMessage'); - channel.sendMessage = function () {}; + channel.sendMessage = async function () {}; async.series( [