forked from badlogic/skychat
-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathfirehose.ts
100 lines (88 loc) · 3.52 KB
/
firehose.ts
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
import { ComAtprotoSyncSubscribeRepos, FollowRecord } from "@atproto/api";
import { CarReader } from "@ipld/car/reader";
import { decode as cborDecode } from "@ipld/dag-cbor";
export { ComAtprotoSyncSubscribeRepos } from "@atproto/api";
import { AtpBaseClient } from "@atproto/api";
import { decodeMultiple } from "cbor-x";
import { cborToLexRecord, readCar } from "@atproto/repo";
export const startEventStream = (onPost: (post: Post) => void, onClose: () => void) => {
return new BskyEventStream(onPost, onClose);
};
export type Post = {
rkey: string;
authorDid: string;
text: string;
createdAt: string | number;
};
export class BskyEventStream {
serviceUri = "bsky.social";
nsid = "com.atproto.sync.subscribeRepos";
closed: boolean = false;
protected ws: WebSocket;
protected baseClient = new AtpBaseClient();
constructor(private onPost: (post: Post) => void, private onClose: () => void) {
this.serviceUri = "bsky.social";
this.nsid = "com.atproto.sync.subscribeRepos";
this.ws = new WebSocket(`wss://${this.serviceUri}/xrpc/${this.nsid}`);
this.ws.binaryType = "arraybuffer";
this.ws.onmessage = (ev) => this.handleMessage(ev.data);
this.ws.onerror = (ev) => this.handleError("Error");
this.ws.onclose = (ev) => this.handleClose(ev.code, ev.reason);
}
close(code?: number, reason?: string) {
this.ws.close(code, reason);
}
async decode(message: any) {
if (message["$type"] == "com.atproto.sync.subscribeRepos#commit") {
for (const op of message.ops) {
if (op.action == "create" || op.action == "update") {
const cr = await CarReader.fromBytes(message.blocks);
if (op.cid) {
const blocks = cr._blocks.map((block) => cborDecode(block.bytes)) as any[];
if (blocks.length < 2) continue;
const payload = blocks[blocks.length - 2];
if (payload["$type"] != "app.bsky.feed.post") continue;
const payloadDid = blocks[blocks.length - 1];
if (!payloadDid["did"]) continue;
op.post = {
authorDid: payloadDid["did"],
rkey: op.path.split("/")[1],
...payload,
} as Post;
}
}
}
return message;
} else {
return undefined;
}
}
private async handleMessage(data: ArrayBuffer) {
const [header, payload] = decodeMultiple(new Uint8Array(data)) as any;
if (header["op"] == 1) {
const t = header["t"];
if (t && t == "#commit") {
const message = {
$type: `${this.nsid}${t}`,
...payload,
};
const decoded = await this.decode(message);
if (!decoded) return;
for (const op of decoded.ops) {
const post: Post = op.post;
if (!post) continue;
this.onPost(post);
}
}
} else {
this.handleError(header["error"], header["message"]);
}
}
private handleError(error: Error | string, message?: string) {
this.close();
}
private handleClose(code: number | undefined, reason: string | undefined) {
this.closed = true;
this.onClose();
}
}