forked from ably/spaces
-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathCursorHistory.ts
86 lines (71 loc) · 2.46 KB
/
CursorHistory.ts
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
import { Types } from 'ably';
import type { CursorUpdate } from './types.js';
import type { CursorsOptions } from './types.js';
import type { OutgoingBuffer } from './CursorBatching.js';
type ConnectionId = string;
type ConnectionsLastPosition = Record<ConnectionId, null | CursorUpdate>;
export default class CursorHistory {
constructor() {}
private positionsMissing(connections: ConnectionsLastPosition) {
return Object.keys(connections).some((connectionId) => connections[connectionId] === null);
}
private messageToUpdate(
connectionId: string,
clientId: string,
update: Pick<CursorUpdate, 'position' | 'data'>,
): CursorUpdate {
return {
clientId,
connectionId,
position: update.position,
data: update.data,
};
}
private allCursorUpdates(
connections: ConnectionsLastPosition,
page: Types.PaginatedResult<Types.Message>,
): ConnectionsLastPosition {
return Object.fromEntries(
Object.entries(connections).map(([connectionId, cursors]) => {
const lastMessage = page.items.find((item) => item.connectionId === connectionId);
if (!lastMessage) return [connectionId, cursors];
const { data = [], clientId }: { data: OutgoingBuffer[] } & Pick<Types.Message, 'clientId'> = lastMessage;
const lastPositionSet = data[data.length - 1]?.cursor;
const lastUpdate = lastPositionSet
? {
clientId,
connectionId,
position: lastPositionSet.position,
data: lastPositionSet.data,
}
: null;
return [connectionId, lastUpdate];
}),
);
}
async getLastCursorUpdate(
channel: Types.RealtimeChannelPromise,
paginationLimit: CursorsOptions['paginationLimit'],
): Promise<ConnectionsLastPosition> {
const members = await channel.presence.get();
if (members.length === 0) return {};
let connections: ConnectionsLastPosition = members.reduce(
(acc, member) => ({
...acc,
[member.connectionId]: null,
}),
{},
);
const history = await channel.history();
let pageNo = 1;
let page = await history.current();
connections = this.allCursorUpdates(connections, page);
pageNo++;
while (pageNo <= paginationLimit && this.positionsMissing(connections) && history.hasNext()) {
page = await history.next();
connections = this.allCursorUpdates(connections, page);
pageNo++;
}
return connections;
}
}