Skip to content

Commit 11c7d08

Browse files
committed
feat(store-sync): make pending logs sync more resilient
1 parent a68eb17 commit 11c7d08

File tree

6 files changed

+1368
-346
lines changed

6 files changed

+1368
-346
lines changed

.changeset/selfish-wolves-promise.md

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,5 @@
1+
---
2+
"@latticexyz/store-sync": patch
3+
---
4+
5+
The sync stack now handles downtime in the pending logs API and reconnects once it's available again.

packages/store-sync/src/README.md

Lines changed: 22 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,22 @@
1+
# Approach
2+
3+
#### If pending logs are not available
4+
5+
- Initialize snapshot
6+
- Initialize log stream from latest block
7+
- Catch up logs between snapshot and latest block
8+
- Attempt to stream logs from indexer
9+
- On failure, fallback to streaming logs from RPC
10+
- Release initial, catchup and ongoing stream to subscribers
11+
12+
#### If pending logs are available
13+
14+
- Initialize from snapshot
15+
- Open a pending log stream
16+
- On error recreate the stream
17+
- Open a fallback log stream (indexer or RPC)
18+
- Catch up logs between snapshot and latest block
19+
- Cache processed log indices from pending logs stream
20+
- On every new block from the fallback logs stream
21+
- Verify that all logs have already been processed in the pending logs stream
22+
- If missing logs are found, release the missing logs to subscribers and recreate the pending logs stream
Lines changed: 189 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,189 @@
1+
import {
2+
catchError,
3+
combineLatest,
4+
concatMap,
5+
from,
6+
map,
7+
mergeMap,
8+
Observable,
9+
of,
10+
Subject,
11+
tap,
12+
throwError,
13+
switchMap,
14+
startWith,
15+
merge,
16+
filter,
17+
} from "rxjs";
18+
import { StorageAdapterBlock, StoreEventsLog, SyncFilter } from "./common";
19+
import { watchLogs } from "./watchLogs";
20+
import { Hex } from "viem";
21+
import { fromEventSource } from "./fromEventSource";
22+
import { isLogsApiResponse } from "./indexer-client/isLogsApiResponse";
23+
import { toStorageAdapterBlock } from "./indexer-client/toStorageAdapterBlock";
24+
import { fetchAndStoreLogs } from "./fetchAndStoreLogs";
25+
import { storeEventsAbi } from "@latticexyz/store";
26+
import { bigIntMax } from "@latticexyz/common/utils";
27+
import { GetRpcClientOptions } from "@latticexyz/block-logs-stream";
28+
import { debug } from "./debug";
29+
30+
type PendingBlockStreamOptions = GetRpcClientOptions & {
31+
fromBlock: bigint;
32+
pendingLogsUrl: string;
33+
indexerUrl?: string;
34+
chainId: number;
35+
address?: Hex;
36+
filters: SyncFilter[];
37+
latestBlockNumber$: Observable<bigint>;
38+
maxBlockRange?: bigint;
39+
};
40+
41+
export function createPendingBlockStream(opts: PendingBlockStreamOptions): Observable<StorageAdapterBlock> {
42+
const recreatePendingStream$ = new Subject<void>();
43+
const recreateLatestStream$ = new Subject<void>();
44+
45+
const processedBlockLogs: { [blockNumber: string]: { [logIndex: number]: boolean } } = {};
46+
let restartBlockNumber = opts.fromBlock;
47+
48+
const latestBlock$ = recreateLatestStream$.pipe(
49+
startWith(undefined),
50+
switchMap(() =>
51+
createLatestBlockStream({ ...opts, fromBlock: restartBlockNumber }).pipe(
52+
catchError((e) => {
53+
debug("Error in latest block stream, recreating", e);
54+
recreateLatestStream$.next();
55+
return throwError(() => e);
56+
}),
57+
),
58+
),
59+
);
60+
61+
const pendingLogs$ = recreatePendingStream$.pipe(
62+
startWith(restartBlockNumber),
63+
switchMap(() =>
64+
watchLogs({
65+
...opts,
66+
url: opts.pendingLogsUrl,
67+
fromBlock: restartBlockNumber,
68+
}).logs$.pipe(
69+
catchError((e) => {
70+
debug("Error in pending logs stream, recreating", e);
71+
recreatePendingStream$.next();
72+
return throwError(() => e);
73+
}),
74+
),
75+
),
76+
tap((block) => {
77+
restartBlockNumber = block.blockNumber;
78+
const seenLogs = (processedBlockLogs[String(block.blockNumber)] ??= {});
79+
block.logs.forEach((log) => {
80+
seenLogs[log.logIndex!] = true;
81+
});
82+
debug("got pending block", block.blockNumber, "with", block.logs.length, "logs");
83+
}),
84+
);
85+
86+
const missingLogs$ = latestBlock$.pipe(
87+
map((block) => {
88+
const seenLogs = processedBlockLogs[String(block.blockNumber)] ?? {};
89+
const missingLogs = block.logs.filter((log) => !seenLogs[log.logIndex!]);
90+
debug(
91+
"got latest block",
92+
block.blockNumber,
93+
"with",
94+
block.logs.length,
95+
"logs (missing",
96+
missingLogs.length,
97+
"logs)",
98+
);
99+
return { blockNumber: block.blockNumber, logs: missingLogs };
100+
}),
101+
tap(({ blockNumber }) => {
102+
delete processedBlockLogs[String(blockNumber)];
103+
restartBlockNumber = blockNumber + 1n;
104+
}),
105+
filter(({ logs }) => logs.length > 0),
106+
tap(({ blockNumber }) => {
107+
debug("missing logs found in latest block", blockNumber, "recreating pending stream");
108+
recreatePendingStream$.next();
109+
}),
110+
);
111+
112+
return merge(pendingLogs$, missingLogs$);
113+
}
114+
115+
// TODO: reduce duplication with indexer/rpc stream in `createStoreSync.ts`
116+
function createLatestBlockStream({
117+
fromBlock,
118+
indexerUrl,
119+
chainId,
120+
address,
121+
filters,
122+
latestBlockNumber$,
123+
maxBlockRange,
124+
...opts
125+
}: PendingBlockStreamOptions): Observable<StorageAdapterBlock> {
126+
const indexerBlocks$ = indexerUrl
127+
? of(indexerUrl).pipe(
128+
mergeMap((indexerUrl) => {
129+
const url = new URL(
130+
`api/logs-live?${new URLSearchParams({
131+
input: JSON.stringify({ chainId, address, filters }),
132+
block_num: fromBlock.toString(),
133+
include_tx_hash: "true",
134+
})}`,
135+
indexerUrl,
136+
);
137+
return fromEventSource<string>(url);
138+
}),
139+
map((messageEvent) => {
140+
const data = JSON.parse(messageEvent.data);
141+
if (!isLogsApiResponse(data)) {
142+
throw new Error("Received unexpected from indexer:" + messageEvent.data);
143+
}
144+
return toStorageAdapterBlock(data);
145+
}),
146+
)
147+
: throwError(() => new Error("No indexer URL provided"));
148+
149+
let lastBlockNumberProcessed = 0n;
150+
const ethRpcBlocks$ = combineLatest([of(fromBlock), latestBlockNumber$]).pipe(
151+
map(([startBlock, endBlock]) => ({ startBlock, endBlock })),
152+
concatMap((range) => {
153+
const storedBlocks = fetchAndStoreLogs({
154+
...opts,
155+
address,
156+
events: storeEventsAbi,
157+
maxBlockRange,
158+
fromBlock: lastBlockNumberProcessed
159+
? bigIntMax(range.startBlock, lastBlockNumberProcessed + 1n)
160+
: range.startBlock,
161+
toBlock: range.endBlock,
162+
logFilter: filters.length
163+
? (log: StoreEventsLog): boolean =>
164+
filters.some(
165+
(filter) =>
166+
filter.tableId === log.args.tableId &&
167+
(filter.key0 == null || filter.key0 === log.args.keyTuple[0]) &&
168+
(filter.key1 == null || filter.key1 === log.args.keyTuple[1]),
169+
)
170+
: undefined,
171+
storageAdapter: () => Promise.resolve(),
172+
});
173+
return from(storedBlocks);
174+
}),
175+
tap((block) => {
176+
lastBlockNumberProcessed = block.blockNumber;
177+
}),
178+
);
179+
180+
const latestBlock$ = indexerBlocks$.pipe(
181+
catchError((error) => {
182+
debug("failed to stream logs from indexer:", error.message);
183+
debug("falling back to streaming logs from ETH RPC");
184+
return ethRpcBlocks$;
185+
}),
186+
);
187+
188+
return latestBlock$;
189+
}

packages/store-sync/src/createStoreSync.ts

Lines changed: 30 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -32,6 +32,9 @@ import {
3232
throwError,
3333
mergeWith,
3434
ignoreElements,
35+
switchMap,
36+
Subject,
37+
startWith,
3538
} from "rxjs";
3639
import { debug as parentDebug } from "./debug";
3740
import { SyncStep } from "./SyncStep";
@@ -41,10 +44,10 @@ import { fromEventSource } from "./fromEventSource";
4144
import { fetchAndStoreLogs } from "./fetchAndStoreLogs";
4245
import { isLogsApiResponse } from "./indexer-client/isLogsApiResponse";
4346
import { toStorageAdapterBlock } from "./indexer-client/toStorageAdapterBlock";
44-
import { watchLogs } from "./watchLogs";
4547
import { getAction } from "viem/utils";
4648
import { getChainId, getTransactionReceipt } from "viem/actions";
4749
import packageJson from "../package.json";
50+
import { createPendingBlockStream } from "./createPendingBlockStream";
4851

4952
const debug = parentDebug.extend("createStoreSync");
5053

@@ -201,7 +204,20 @@ export async function createStoreSync({
201204
);
202205

203206
let latestBlockNumber: bigint | null = null;
204-
const latestBlock$ = createBlockStream({ ...opts, blockTag: followBlockTag }).pipe(shareReplay(1));
207+
const recreateLatestBlockStream$ = new Subject<void>();
208+
const latestBlock$ = recreateLatestBlockStream$.pipe(
209+
startWith(undefined),
210+
switchMap(() =>
211+
createBlockStream({ ...opts, blockTag: followBlockTag }).pipe(
212+
catchError((error) => {
213+
debug("error in latestBlock$, recreating");
214+
recreateLatestBlockStream$.next();
215+
return throwError(() => error);
216+
}),
217+
),
218+
),
219+
shareReplay(1),
220+
);
205221
const latestBlockNumber$ = latestBlock$.pipe(
206222
map((block) => block.number),
207223
tap((blockNumber) => {
@@ -217,15 +233,22 @@ export async function createStoreSync({
217233
const pendingLogsWebSocketUrl = publicClient.chain?.rpcUrls?.wiresaw?.webSocket?.[0];
218234
const storedPendingLogs$ = pendingLogsWebSocketUrl
219235
? startBlock$.pipe(
220-
mergeMap((startBlock) => watchLogs({ url: pendingLogsWebSocketUrl, address, fromBlock: startBlock }).logs$),
236+
switchMap((startBlock) =>
237+
createPendingBlockStream({
238+
...opts,
239+
fromBlock: startBlock,
240+
pendingLogsUrl: pendingLogsWebSocketUrl,
241+
chainId,
242+
filters,
243+
address,
244+
latestBlockNumber$,
245+
indexerUrl,
246+
}),
247+
),
221248
concatMap(async (block) => {
222249
await storageAdapter(block);
223250
return block;
224251
}),
225-
mergeWith(
226-
// The watchLogs API doesn't emit on empty logs, but consumers expect an emission on empty logs
227-
latestBlockNumber$.pipe(map((blockNumber) => ({ blockNumber, logs: [] }))),
228-
),
229252
)
230253
: throwError(() => new Error("No pending logs WebSocket RPC URL provided"));
231254

0 commit comments

Comments
 (0)