Skip to content

Commit

Permalink
Support annotations (rest and realtime)
Browse files Browse the repository at this point in the history
  • Loading branch information
SimonWoolf committed Jan 23, 2025
1 parent a6fee8a commit 14c9963
Show file tree
Hide file tree
Showing 15 changed files with 505 additions and 15 deletions.
135 changes: 135 additions & 0 deletions ably.d.ts
Original file line number Diff line number Diff line change
Expand Up @@ -1989,6 +1989,61 @@ export declare interface RealtimePresence {
leaveClient(clientId: string, data?: any): Promise<void>;
}

/**
* Functionality for annotating messages with small pieces of data, such as emoji
* reactions, that the server will roll up into the message as a summary.
*/
export declare interface RealtimeAnnotations {
/**
* Registers a listener that is called each time an {@link Annotation} matching a given refType.
* Note that if you want to receive individual realtime annotations (instead of just the rolled-up summaries), you will need to request the annotation_subscribe ChannelMode in ChannelOptions, since they are not delivered by default. In general, most clients will not bother with subscribing to individual annotations, and will instead just look at the summary updates.
*
* @param refType - A specific refType string or an array of them to register the listener for.
* @param listener - An event listener function.
* @returns A promise which resolves upon success of the channel {@link RealtimeChannel.attach | `attach()`} operation and rejects with an {@link ErrorInfo} object upon its failure.
*/
subscribe(refType: string | Array<string>, listener?: messageCallback<PresenceMessage>): Promise<void>;
/**
* Registers a listener that is called each time an {@link Annotation} is received on the channel.
* Note that if you want to receive individual realtime annotations (instead of just the rolled-up summaries), you will need to request the annotation_subscribe ChannelMode in ChannelOptions, since they are not delivered by default. In general, most clients will not bother with subscribing to individual annotations, and will instead just look at the summary updates.
*
* @param listener - An event listener function.
* @returns A promise which resolves upon success of the channel {@link RealtimeChannel.attach | `attach()`} operation and rejects with an {@link ErrorInfo} object upon its failure.
*/
subscribe(listener?: messageCallback<Annotation>): Promise<void>;
/**
* Deregisters a specific listener that is registered to receive {@link Annotation} on the channel for a given refType.
*
* @param refType - A specific refType (or array of refTypes) to deregister the listener for.
* @param listener - An event listener function.
*/
unsubscribe(refType: string | Array<string>, listener: messageCallback<Annotation>): void;
/**
* Deregisters any listener that is registered to receive {@link Annotation} on the channel for a specific refType
*
* @param refType - A specific refType (or array of refTypes) to deregister the listeners for.
*/
unsubscribe(refType: string | Array<string>): void;
/**
* Deregisters a specific listener that is registered to receive {@link Annotation} on the channel.
*
* @param listener - An event listener function.
*/
unsubscribe(listener: messageCallback<Annotation>): void;
/**
* Deregisters all listeners currently receiving {@link Annotation} for the channel.
*/
unsubscribe(): void;
/**
* Publish a new annotation for a message.
*
* @param refSerial - The `serial` of the message to annotate.
* @param refType - What type of annotation you want.
* @param data - The contents of the annotation.
*/
publish(refSerial: string, refType: string, data: string | ArrayBuffer | Uint8Array): Promise<void>;
}

/**
* Enables devices to subscribe to push notifications for a channel.
*/
Expand Down Expand Up @@ -2035,6 +2090,10 @@ export declare interface Channel {
* A {@link Presence} object.
*/
presence: Presence;
/**
* {@link RestAnnotations}
*/
annotations: RestAnnotations;
/**
* A {@link PushChannel} object.
*/
Expand Down Expand Up @@ -2079,6 +2138,21 @@ export declare interface Channel {
status(): Promise<ChannelDetails>;
}

/**
* Functionality for annotating messages with small pieces of data, such as emoji
* reactions, that the server will roll up into the message as a summary.
*/
export declare interface RestAnnotations {
/**
* Publish a new annotation for a message.
*
* @param refSerial - The `serial` of the message to annotate.
* @param refType - What type of annotation you want.
* @param data - The contents of the annotation.
*/
publish(refSerial: string, refType: string, data: string | ArrayBuffer | Uint8Array): Promise<void>;
}

/**
* Enables messages to be published and subscribed to. Also enables historic messages to be retrieved and provides access to the {@link RealtimePresence} object of a channel.
*/
Expand Down Expand Up @@ -2151,6 +2225,10 @@ export declare interface RealtimeChannel extends EventEmitter<channelEventCallba
* A {@link RealtimePresence} object.
*/
presence: RealtimePresence;
/**
* {@link RealtimeAnnotations}
*/
annotations: RealtimeAnnotations;
/**
* Attach to this channel ensuring the channel is created in the Ably system and all messages published on the channel are received by any channel listeners registered using {@link RealtimeChannel.subscribe | `subscribe()`}. Any resulting channel state change will be emitted to any listeners registered using the {@link EventEmitter.on | `on()`} or {@link EventEmitter.once | `once()`} methods. As a convenience, `attach()` is called implicitly if {@link RealtimeChannel.subscribe | `subscribe()`} for the channel is called, or {@link RealtimePresence.enter | `enter()`} or {@link RealtimePresence.subscribe | `subscribe()`} are called on the {@link RealtimePresence} object for this channel.
*
Expand Down Expand Up @@ -2382,6 +2460,44 @@ export interface Message {
operation?: Operation;
}

/**
* An annotation to a message, received from Ably
*/
export interface Annotation {
/**
* Unique ID assigned by Ably to this annotation.
*/
id: string;
/**
* The client ID of the publisher of this annotation (if any).
*/
clientId?: string;
/**
* The annotation payload, if provided.
*/
data?: any;
/**
* This is typically empty, as all annotations received from Ably are automatically decoded client-side using this value. However, if the annotation encoding cannot be processed, this attribute contains the remaining transformations not applied to the `data` payload.
*/
encoding?: string;
/**
* Timestamp of when the annotation was received by Ably, as milliseconds since the Unix epoch.
*/
timestamp: number;
/**
* The action, whether this is an annotation being added or removed, one of the {@link AnnotationAction} enum values.
*/
action: AnnotationAction;
/**
* The message (of type message.create) that this annotation is annotating.
*/
refSerial: string;
/**
* The kind of annotation it is (for example, an emoji reaction)
*/
refType: string;
}

/**
* Contains the details of an operation, such as update or deletion, supplied by the actioning client.
*/
Expand Down Expand Up @@ -2437,6 +2553,25 @@ export type MessageAction =
| MessageActions.META_OCCUPANCY
| MessageActions.MESSAGE_SUMMARY;

/**
* The namespace containing the different types of annotation actions.
*/
declare namespace AnnotationActions {
/**
* Annotation action for a created annotation.
*/
type ANNOTATION_CREATE = 'annotation.create';
/**
* Annotation action for a deleted annotation.
*/
type ANNOTATION_DELETE = 'annotation.delete';
}

/**
* The possible values of the 'action' field of an {@link Annotation}.
*/
export type AnnotationAction = AnnotationActions.ANNOTATION_CREATE | AnnotationActions.ANNOTATION_DELETE;

/**
* A message received from Ably.
*/
Expand Down
3 changes: 3 additions & 0 deletions src/common/lib/client/baseclient.ts
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@ import * as Utils from '../util/utils';
import Platform from '../../platform';
import { Rest } from './rest';
import { IUntypedCryptoStatic } from 'common/types/ICryptoStatic';
import { AnnotationsPlugin } from './modularplugins';
import { throwMissingPluginError } from '../util/utils';
import { MsgPack } from 'common/types/msgpack';
import { HTTPRequestImplementations } from 'platform/web/lib/http/http';
Expand Down Expand Up @@ -46,6 +47,7 @@ class BaseClient {
// Extra HTTP request implementations available to this client, in addition to those in web’s Http.bundledRequestImplementations
readonly _additionalHTTPRequestImplementations: HTTPRequestImplementations | null;
private readonly __FilteredSubscriptions: typeof FilteredSubscriptions | null;
readonly _Annotations: AnnotationsPlugin | null;
readonly logger: Logger;
_device?: LocalDevice;

Expand Down Expand Up @@ -98,6 +100,7 @@ class BaseClient {
this._rest = options.plugins?.Rest ? new options.plugins.Rest(this) : null;
this._Crypto = options.plugins?.Crypto ?? null;
this.__FilteredSubscriptions = options.plugins?.MessageInteractions ?? null;
this._Annotations = options.plugins?.Annotations ?? null;
}

get rest(): Rest {
Expand Down
9 changes: 9 additions & 0 deletions src/common/lib/client/defaultrealtime.ts
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,9 @@ import WebSocketTransport from '../transport/websockettransport';
import { FilteredSubscriptions } from './filteredsubscriptions';
import { PresenceMap } from './presencemap';
import PresenceMessage, { WirePresenceMessage } from '../types/presencemessage';
import RealtimeAnnotations from './realtimeannotations';
import RestAnnotations from './restannotations';
import Annotation, { WireAnnotation } from '../types/annotation';
import { Http } from 'common/types/http';
import Defaults from '../util/defaults';
import Logger from '../util/logger';
Expand All @@ -38,6 +41,12 @@ export class DefaultRealtime extends BaseRealtime {
PresenceMessage,
WirePresenceMessage,
},
Annotations: {
Annotation,
WireAnnotation,
RealtimeAnnotations,
RestAnnotations,
},
WebSocketTransport,
MessageInteractions: FilteredSubscriptions,
}),
Expand Down
9 changes: 9 additions & 0 deletions src/common/lib/client/defaultrest.ts
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,9 @@ import { DefaultMessage } from '../types/defaultmessage';
import { MsgPack } from 'common/types/msgpack';
import { DefaultPresenceMessage } from '../types/defaultpresencemessage';
import { Http } from 'common/types/http';
import RealtimeAnnotations from './realtimeannotations';
import RestAnnotations from './restannotations';
import Annotation, { WireAnnotation } from '../types/annotation';
import Defaults from '../util/defaults';
import Logger from '../util/logger';

Expand All @@ -25,6 +28,12 @@ export class DefaultRest extends BaseRest {
...allCommonModularPlugins,
Crypto: DefaultRest.Crypto ?? undefined,
MsgPack: DefaultRest._MsgPack ?? undefined,
Annotations: {
Annotation,
WireAnnotation,
RealtimeAnnotations,
RestAnnotations,
},
}),
);
}
Expand Down
11 changes: 11 additions & 0 deletions src/common/lib/client/modularplugins.ts
Original file line number Diff line number Diff line change
Expand Up @@ -2,10 +2,13 @@ import { Rest } from './rest';
import { IUntypedCryptoStatic } from '../../types/ICryptoStatic';
import { MsgPack } from 'common/types/msgpack';
import RealtimePresence from './realtimepresence';
import RealtimeAnnotations from './realtimeannotations';
import RestAnnotations from './restannotations';
import XHRRequest from 'platform/web/lib/http/request/xhrrequest';
import fetchRequest from 'platform/web/lib/http/request/fetchrequest';
import { FilteredSubscriptions } from './filteredsubscriptions';
import PresenceMessage, { WirePresenceMessage } from '../types/presencemessage';
import Annotation, { WireAnnotation } from '../types/annotation';
import { TransportCtor } from '../transport/transport';
import * as PushPlugin from 'plugins/push';

Expand All @@ -18,11 +21,19 @@ export type RealtimePresencePlugin = PresenceMessagePlugin & {
RealtimePresence: typeof RealtimePresence;
};

export type AnnotationsPlugin = {
Annotation: typeof Annotation;
WireAnnotation: typeof WireAnnotation;
RealtimeAnnotations: typeof RealtimeAnnotations;
RestAnnotations: typeof RestAnnotations;
};

export interface ModularPlugins {
Rest?: typeof Rest;
Crypto?: IUntypedCryptoStatic;
MsgPack?: MsgPack;
RealtimePresence?: RealtimePresencePlugin;
Annotations?: AnnotationsPlugin;
WebSocketTransport?: TransportCtor;
XHRPolling?: TransportCtor;
XHRRequest?: typeof XHRRequest;
Expand Down
87 changes: 87 additions & 0 deletions src/common/lib/client/realtimeannotations.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,87 @@
import EventEmitter from '../util/eventemitter';
import Logger from '../util/logger';
import Annotation from '../types/annotation';
import { actions, flags } from '../types/protocolmessagecommon';
import { fromValues as protocolMessageFromValues } from '../types/protocolmessage';
import type { CipherOptions } from '../types/basemessage';
import ErrorInfo from '../types/errorinfo';
import RealtimeChannel from './realtimechannel';

class RealtimeAnnotations {
private channel: RealtimeChannel;
private logger: Logger;
private subscriptions: EventEmitter;

constructor(channel: RealtimeChannel) {
this.channel = channel;
this.logger = channel.logger;
this.subscriptions = new EventEmitter(this.logger);
}

async publish(refSerial: string, refType: string, data: any): Promise<void> {
const channelName = this.channel.name;
const annotation = Annotation.fromValues({
action: 'annotation.create',
refSerial,
refType,
data,
});

// TODO get rid of CipherOptions type assertion, indicates channeloptions types are broken
const wireAnnotation = await annotation.encode(this.channel.channelOptions as CipherOptions);

this.channel._throwIfUnpublishableState();

Logger.logAction(
this.logger,
Logger.LOG_MICRO,
'RealtimeAnnotations.publish()',
'channelName = ' + channelName + ', sending annotation with refSerial = ' + refSerial + ', refType = ' + refType,
);

const pm = protocolMessageFromValues({
action: actions.ANNOTATION,
channel: channelName,
annotations: [wireAnnotation],
});
return this.channel.sendMessage(pm);
}

async subscribe(..._args: unknown[] /* [refType], listener */): Promise<void> {
const args = RealtimeChannel.processListenerArgs(_args);
const event = args[0];
const listener = args[1];
const channel = this.channel;

if (channel.state === 'failed') {
throw ErrorInfo.fromValues(channel.invalidStateError());
}

await channel.attach();

if ((this.channel._mode & flags.ANNOTATION_SUBSCRIBE) === 0) {
throw new ErrorInfo(
"You're trying to add an annotation listener, but you haven't requested the annotation_subscribe channel mode in ChannelOptions, so this won't do anything (we only deliver annotations to clients who have explicitly requested them)",
93001,
400,
);
}

this.subscriptions.on(event, listener);
}

unsubscribe(..._args: unknown[] /* [event], listener */): void {
const args = RealtimeChannel.processListenerArgs(_args);
const event = args[0];
const listener = args[1];
this.subscriptions.off(event, listener);
}

_processIncoming(annotations: Annotation[]): void {
for (const annotation of annotations) {
this.subscriptions.emit(annotation.refType || '', annotation);
}
}
}

export default RealtimeAnnotations;
Loading

0 comments on commit 14c9963

Please sign in to comment.