From e75ea5e5516b95bc522e1186affb54eef25905c9 Mon Sep 17 00:00:00 2001
From: "E. Cooper"
Date: Mon, 28 Oct 2024 10:10:28 -0700
Subject: [PATCH] Add initial event feed support (#290)

https://docs.fauna.com/fauna/current/learn/cdc/
---
 README.md                                         | 161 +++++++-
 .../functional/client-configuration.test.ts       |  18 +-
 .../feed-client-configuration.test.ts             | 100 +++++
 __tests__/functional/feed-client.test.ts          | 211 +++++++++++
 __tests__/integration/feed.test.ts                | 237 ++++++++++++
 __tests__/integration/query.test.ts               |   4 +-
 __tests__/unit/retryable.test.ts                  |  69 ++++
 src/client-configuration.ts                       |  38 +-
 src/client.ts                                     | 345 +++++++++++++++++-
 src/http-client/fetch-client.ts                   |   6 +-
 src/http-client/http-client.ts                    |   8 +-
 src/http-client/node-http2-client.ts              |   8 +-
 src/http-client/paths.ts                          |   1 +
 src/index.ts                                      |   8 +-
 src/util/retryable.ts                             |  41 +++
 src/values/stream.ts                              |  62 +++-
 src/wire-protocol.ts                              |  16 +-
 17 files changed, 1264 insertions(+), 69 deletions(-)
 create mode 100644 __tests__/functional/feed-client-configuration.test.ts
 create mode 100644 __tests__/functional/feed-client.test.ts
 create mode 100644 __tests__/integration/feed.test.ts
 create mode 100644 __tests__/unit/retryable.test.ts
 create mode 100644 src/util/retryable.ts

diff --git a/README.md b/README.md
index 3819d065..60943cc8 100644
--- a/README.md
+++ b/README.md
@@ -29,6 +29,11 @@ See the [Fauna Documentation](https://docs.fauna.com/fauna/current/) for additio
     - [Query timeout](#query-timeout)
     - [Client timeout](#client-timeout)
     - [HTTP/2 session idle timeout](#http2-session-idle-timeout)
+- [Event Feeds](#event-feeds)
+  - [Request an Event Feed](#request-an-event-feed)
+  - [Iterate on an Event Feed](#iterate-on-an-event-feed)
+  - [Error handling](#error-handling)
+  - [Event Feed options](#event-feed-options)
 - [Event Streaming](#event-streaming)
   - [Start a stream](#start-a-stream)
   - [Iterate on a stream](#iterate-on-a-stream)
@@ -69,14 +74,12 @@ Stable versions of:
 - Safari 12.1+
 - Edge 79+
 
-
 ## API reference
 
 API reference documentation for the driver is available at
 https://fauna.github.io/fauna-js/. The docs are generated using
 [TypeDoc](https://typedoc.org/).
 
-
 ## Install
 
 The driver is available on [npm](https://www.npmjs.com/package/fauna). You
@@ -467,20 +470,142 @@ const client = new Client({ http2_session_idle_ms: 6000 });
 > **Warning**
 > Setting `http2_session_idle_ms` to small values can lead to a race condition where requests cannot be transmitted before the session is closed, yielding `ERR_HTTP2_GOAWAY_SESSION` errors.
 
+## Event Feeds
+
+The driver supports [Event Feeds](https://docs.fauna.com/fauna/current/learn/cdc/#event-feeds).
+
+### Request an Event Feed
+
+An Event Feed asynchronously polls an [event source](https://docs.fauna.com/fauna/current/learn/cdc/#create-an-event-source) for events.
+
+To get an event source, append `eventSource()` or `eventsOn()` to a set from a
+[supported source](https://docs.fauna.com/fauna/current/reference/streaming_reference/#sets).
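+
+For example, the following sketch gets an event source for the illustrative `Product`
+collection used throughout this README, either for all events on the set or only for
+changes to specific fields (collection and field names are placeholders):
+
+```javascript
+// Event source for all events on the set:
+const source = (await client.query(fql`Product.all().eventSource()`)).data;
+
+// Event source for changes to the price or stock fields only:
+const fieldSource = (await client.query(fql`Product.all().eventsOn(.price, .stock)`)).data;
+```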
+
+To get paginated events, pass the event source to `feed()`:
+
+```javascript
+const response = await client.query(fql`
+  let set = Product.all()
+
+  {
+    initialPage: set.pageSize(10),
+    eventSource: set.eventSource()
+  }
+`);
+const { initialPage, eventSource } = response.data;
+
+const feed = client.feed(eventSource);
+```
+
+You can also pass a query that produces an event source directly to `feed()`:
+
+```javascript
+const query = fql`Product.all().eventsOn(.price, .stock)`;
+
+const feed = client.feed(query);
+```
+
+### Iterate on an Event Feed
+
+`feed()` returns a `FeedClient` instance that can act as an `AsyncIterator`. You can use `for await...of` to iterate through all the pages:
+
+```ts
+const query = fql`Product.all().eventsOn(.price, .stock)`;
+const feed = client.feed(query);
+
+for await (const page of feed) {
+  console.log("Page stats", page.stats);
+
+  for (const event of page.events) {
+    switch (event.type) {
+      case "update":
+      case "add":
+      case "remove":
+        console.log("Event: ", event);
+        // ...
+        break;
+    }
+  }
+}
+```
+
+Alternatively, use `flatten()` to iterate through paginated results as a single, flat sequence of events:
+
+```ts
+const query = fql`Product.all().eventsOn(.price, .stock)`;
+const feed = client.feed(query);
+
+for await (const event of feed.flatten()) {
+  console.log("Event: ", event);
+}
+```
+
+### Error handling
+
+Exceptions can be raised in two different places:
+
+1. While fetching a page
+1. While iterating a page's events
+
+This distinction allows you to ignore errors originating from event processing.
+For example:
+
+```ts
+const feed = client.feed(fql`
+  Product.all().map(.details.toUpperCase()).eventSource()
+`);
+
+try {
+  for await (const page of feed) {
+    // Pages will stop at the first error encountered.
+    // Therefore, it's safe to handle event failures
+    // and then pull more pages.
+    try {
+      for (const event of page.events) {
+        console.log("Event: ", event);
+      }
+    } catch (error: unknown) {
+      console.log("Feed event error: ", error);
+    }
+  }
+} catch (error: unknown) {
+  console.log("Non-retryable error: ", error);
+}
+```
+
+### Event Feed options
+
+The client configuration sets the default options for `feed()`. You can pass a `FeedClientConfiguration` object to override these defaults:
+
+```ts
+const options: FeedClientConfiguration = {
+  long_type: "number",
+  max_attempts: 5,
+  max_backoff: 1000,
+  query_timeout_ms: 5000,
+  client_timeout_buffer_ms: 5000,
+  secret: "FAUNA_SECRET",
+  cursor: undefined,
+  start_ts: undefined,
+};
+
+client.feed(fql`Product.all().eventSource()`, options);
+```
+
 ## Event Streaming
 
 The driver supports [Event Streaming](https://docs.fauna.com/fauna/current/learn/streaming).
 
 ### Start a stream
 
-To get a stream token, append
-[`toStream()`](https://docs.fauna.com/fauna/current/reference/reference/schema_entities/set/tostream)
+To get an event source, append
+[`eventSource()`](https://docs.fauna.com/fauna/current/reference/reference/schema_entities/set/eventsource)
 or
-[`changesOn()`](https://docs.fauna.com/fauna/current/reference/reference/schema_entities/set/changeson)
+[`eventsOn()`](https://docs.fauna.com/fauna/current/reference/reference/schema_entities/set/eventson)
 to a set from a [supported
-source](https://docs.fauna.com/fauna/current/reference/streaming_reference/#supported-sources).
+source](https://docs.fauna.com/fauna/current/reference/streaming_reference/#sets).
-To start and subscribe to the stream, pass the stream token to `stream()`: +To start and subscribe to the stream, pass the event source to `stream()`: ```javascript const response = await client.query(fql` @@ -488,18 +613,18 @@ const response = await client.query(fql` { initialPage: set.pageSize(10), - streamToken: set.toStream() + eventSource: set.eventSource() } `); -const { initialPage, streamToken } = response.data; +const { initialPage, eventSource } = response.data; -client.stream(streamToken); +client.stream(eventSource); ``` -You can also pass a query that produces a stream token directly to `stream()`: +You can also pass a query that produces an event source directly to `stream()`: ```javascript -const query = fql`Product.all().changesOn(.price, .stock)`; +const query = fql`Product.all().eventsOn(.price, .stock)`; client.stream(query); ``` @@ -515,7 +640,7 @@ try { case "update": case "add": case "remove": - console.log("Stream event:", event); + console.log("Event: ", event); // ... break; } @@ -537,7 +662,7 @@ stream.start( case "update": case "add": case "remove": - console.log("Stream event:", event); + console.log("Event: ", event); // ... break; } @@ -556,11 +681,11 @@ stream.start( Use `close()` to close a stream: ```javascript -const stream = await client.stream(fql`Product.all().toStream()`); +const stream = await client.stream(fql`Product.all().eventSource()`); let count = 0; for await (const event of stream) { - console.log("Stream event:", event); + console.log("Event: ", event); // ... count++; @@ -589,7 +714,7 @@ const options: StreamClientConfiguration = { cursor: null, }; -client.stream(fql`Product.all().toStream()`, options); +client.stream(fql`Product.all().eventSource()`, options); ``` For supported properties, see @@ -598,7 +723,7 @@ in the API reference. ## Contributing -Any contributions are from the community are greatly appreciated! +Any contributions from the community are greatly appreciated! If you have a suggestion that would make this better, please fork the repo and create a pull request. You may also simply open an issue. We provide templates, so please complete those to the best of your ability. diff --git a/__tests__/functional/client-configuration.test.ts b/__tests__/functional/client-configuration.test.ts index 96caf3a9..8eb26548 100644 --- a/__tests__/functional/client-configuration.test.ts +++ b/__tests__/functional/client-configuration.test.ts @@ -24,7 +24,7 @@ describe("ClientConfiguration", () => { process.env["FAUNA_SECRET"] = "foo"; const client = new Client(); expect(Buffer.from(JSON.stringify(client)).toString()).not.toContain( - "secret" + "secret", ); }); @@ -33,11 +33,11 @@ describe("ClientConfiguration", () => { const client = new Client({ secret: "secret" }); expect(client.clientConfiguration.endpoint?.toString()).toEqual( - "https://localhost:9999/" + "https://localhost:9999/", ); }); - it("Client respectes passed in client configuration over defaults", () => { + it("Client respects passed in client configuration over defaults", () => { // TODO: when the Client accepts an http client add a mock that validates // the configuration changes were applied. }); @@ -51,7 +51,7 @@ describe("ClientConfiguration", () => { if ("message" in e) { expect(e.message).toEqual( "You must provide a secret to the driver. Set it in \ -an environmental variable named FAUNA_SECRET or pass it to the Client constructor." 
+an environmental variable named FAUNA_SECRET or pass it to the Client constructor.", ); } } @@ -117,10 +117,10 @@ an environmental variable named FAUNA_SECRET or pass it to the Client constructo expect(req.headers["x-query-timeout-ms"]).toEqual("5000"); const _expectedHeader = expectedHeader; expect(req.headers[_expectedHeader.key]).toEqual( - _expectedHeader.value + _expectedHeader.value, ); return getDefaultHTTPClient(getDefaultHTTPClientOptions()).request( - req + req, ); }, @@ -132,11 +132,11 @@ an environmental variable named FAUNA_SECRET or pass it to the Client constructo query_timeout_ms: 5000, [fieldName]: fieldValue, }, - httpClient + httpClient, ); await client.query(fql`"taco".length`); client.close(); - } + }, ); it("can accept endpoints with or without a trailing slash.", async () => { @@ -173,7 +173,7 @@ an environmental variable named FAUNA_SECRET or pass it to the Client constructo } finally { client?.close(); } - } + }, ); it("throws a RangeError if 'client_timeout_buffer_ms' is less than or equal to zero", async () => { diff --git a/__tests__/functional/feed-client-configuration.test.ts b/__tests__/functional/feed-client-configuration.test.ts new file mode 100644 index 00000000..a0c25069 --- /dev/null +++ b/__tests__/functional/feed-client-configuration.test.ts @@ -0,0 +1,100 @@ +import { + StreamToken, + getDefaultHTTPClient, + FeedClientConfiguration, + FeedClient, +} from "../../src"; +import { getDefaultHTTPClientOptions } from "../client"; + +const defaultHttpClient = getDefaultHTTPClient(getDefaultHTTPClientOptions()); +const defaultConfig: FeedClientConfiguration = { + secret: "secret", + long_type: "number", + max_attempts: 3, + max_backoff: 20, + query_timeout_ms: 5000, + client_timeout_buffer_ms: 5000, + httpClient: defaultHttpClient, +}; +const dummyStreamToken = new StreamToken("dummy"); + +describe("FeedClientConfiguration", () => { + it("can be instantiated directly with a token", () => { + new FeedClient(dummyStreamToken, defaultConfig); + }); + + it("can be instantiated directly with a lambda", async () => { + new FeedClient(() => Promise.resolve(dummyStreamToken), defaultConfig); + }); + + it("throws a RangeError if 'max_backoff' is less than or equal to zero", async () => { + expect.assertions(1); + + const config = { ...defaultConfig, max_backoff: 0 }; + try { + new FeedClient(dummyStreamToken, config); + } catch (e: any) { + expect(e).toBeInstanceOf(RangeError); + } + }); + + it.each` + fieldName + ${"long_type"} + ${"httpClient"} + ${"max_backoff"} + ${"max_attempts"} + ${"client_timeout_buffer_ms"} + ${"query_timeout_ms"} + ${"secret"} + `( + "throws a TypeError if $fieldName provided is undefined", + async ({ fieldName }: { fieldName: keyof FeedClientConfiguration }) => { + expect.assertions(1); + + const config = { ...defaultConfig }; + delete config[fieldName]; + try { + new FeedClient(dummyStreamToken, config); + } catch (e: any) { + expect(e).toBeInstanceOf(TypeError); + } + }, + ); + + it("throws a RangeError if 'max_attempts' is less than or equal to zero", async () => { + expect.assertions(1); + + const config = { ...defaultConfig, max_attempts: 0 }; + try { + new FeedClient(dummyStreamToken, config); + } catch (e: any) { + expect(e).toBeInstanceOf(RangeError); + } + }); + + it("throws a TypeError is start_ts and cursor are both provided", async () => { + const config = { ...defaultConfig, start_ts: 1, cursor: "cursor" }; + expect(() => { + new FeedClient(dummyStreamToken, config); + }).toThrow(TypeError); + }); + + it("throws a RangeError 
if 'query_timeout_ms' is less than or equal to zero", async () => { + const config = { ...defaultConfig, query_timeout_ms: 0 }; + try { + new FeedClient(dummyStreamToken, config); + } catch (e: any) { + expect(e).toBeInstanceOf(RangeError); + } + }); + + it("throws a RangeError if 'client_timeout_buffer_ms' is less than or equal to zero", async () => { + const config = { ...defaultConfig, client_timeout_buffer_ms: 0 }; + try { + new FeedClient(dummyStreamToken, config); + } catch (e: any) { + expect(e).toBeInstanceOf(RangeError); + } + }); +}); diff --git a/__tests__/functional/feed-client.test.ts b/__tests__/functional/feed-client.test.ts new file mode 100644 index 00000000..a31ef149 --- /dev/null +++ b/__tests__/functional/feed-client.test.ts @@ -0,0 +1,211 @@ +import { + FeedClient, + FeedClientConfiguration, + QueryRuntimeError, + EventSource, + StreamToken, + ThrottlingError, +} from "../../src"; + +const mockHttpResponse = { + status: 200, + body: JSON.stringify({ + events: [], + cursor: "cursor=", + has_next: false, + stats: { + test: "test", + }, + }), + headers: { + ":status": 200, + "content-type": "application/json;charset=utf-8", + }, +}; +const mockHttpClient = { + request: jest + .fn() + .mockImplementation(() => Promise.resolve({ ...mockHttpResponse })), + close: jest.fn(), +}; +const defaultConfig: FeedClientConfiguration = { + secret: "secret", + long_type: "number", + max_attempts: 3, + max_backoff: 20, + query_timeout_ms: 5000, + client_timeout_buffer_ms: 5000, + httpClient: mockHttpClient, +}; +const testEventSource: EventSource = new StreamToken("dummy"); + +const fromAsync = async (iterator: AsyncIterable): Promise => { + const res: T[] = []; + for await (const item of iterator) { + res.push(item); + } + return res; +}; + +describe("FeedClient", () => { + afterEach(() => { + jest.clearAllMocks(); + }); + + it("returns a valid page of events", async () => { + mockHttpClient.request.mockImplementationOnce(() => + Promise.resolve({ + ...mockHttpResponse, + body: JSON.stringify({ + cursor: "gsGCGmcGl+0aJPRzAAA=", + has_next: true, + events: [ + { + type: "add", + data: { + "@doc": { + id: "411279302456246784", + coll: { "@mod": "FeedTest" }, + ts: { "@time": "2024-10-09T14:49:17.620Z" }, + value: { "@int": "1" }, + }, + }, + txn_ts: 1728485357620000, + cursor: "gsGCGmcGl+0aJPRzAAA=", + stats: { + read_ops: 1, + storage_bytes_read: 66, + compute_ops: 1, + processing_time_ms: 0, + rate_limits_hit: [], + }, + }, + ], + }), + }), + ); + + const feed = new FeedClient(testEventSource, defaultConfig); + const firstPage = await feed.nextPage(); + + expect(firstPage.cursor).toBe("gsGCGmcGl+0aJPRzAAA="); + expect(firstPage.hasNext).toBe(true); + + const events = Array.from(firstPage.events); + expect(events).toHaveLength(1); + expect(events[0].type).toBe("add"); + expect(events[0].data).toEqual(expect.any(Object)); + + const secondPage = await feed.nextPage(); + + expect(secondPage.cursor).toBe("cursor="); + expect(secondPage.hasNext).toBe(false); + expect(Array.from(secondPage.events)).toHaveLength(0); + }); + + it("uses a valid HTTPRequest", async () => { + const feed = new FeedClient(testEventSource, defaultConfig); + await fromAsync(feed); + + expect(mockHttpClient.request).toHaveBeenCalledWith( + expect.objectContaining({ + client_timeout_ms: 10000, + headers: expect.objectContaining({ + Authorization: "Bearer secret", + "x-format": "tagged", + "x-driver-env": expect.any(String), + "x-query-timeout-ms": "5000", + }), + data: { + token: "dummy", + }, + }), + ); + }); + + 
it("uses page_size when set", async () => { + const feed = new FeedClient(testEventSource, { + ...defaultConfig, + page_size: 10, + }); + await fromAsync(feed.flatten()); + + expect(mockHttpClient.request).toHaveBeenCalledWith( + expect.objectContaining({ + data: { + token: "dummy", + page_size: 10, + }, + }), + ); + }); + + it("uses cursor when set", async () => { + const feed = new FeedClient(testEventSource, { + ...defaultConfig, + cursor: "some-cursor=", + }); + await fromAsync(feed.flatten()); + + expect(mockHttpClient.request).toHaveBeenCalledWith( + expect.objectContaining({ + data: { + token: "dummy", + cursor: "some-cursor=", + }, + }), + ); + }); + + it("uses start_ts when set", async () => { + const startTs = Date.now(); + const feed = new FeedClient(testEventSource, { + ...defaultConfig, + start_ts: startTs, + }); + + await fromAsync(feed.flatten()); + + expect(mockHttpClient.request).toHaveBeenCalledWith( + expect.objectContaining({ + data: { + token: "dummy", + start_ts: startTs, + }, + }), + ); + }); + + it("retries throttling errors", async () => { + mockHttpClient.request.mockImplementationOnce(() => + Promise.reject( + new ThrottlingError({ + error: { + code: "throttled", + message: "test", + }, + }), + ), + ); + + const feed = new FeedClient(testEventSource, defaultConfig); + await fromAsync(feed.flatten()); + + expect(mockHttpClient.request).toHaveBeenCalledTimes(2); + }); + + it("throws an error for an error response", () => { + mockHttpClient.request.mockImplementationOnce(() => + Promise.resolve({ + ...mockHttpResponse, + status: 400, + body: JSON.stringify({ + error: { code: "test", message: "test" }, + }), + }), + ); + + const feed = new FeedClient(testEventSource, defaultConfig); + expect(fromAsync(feed.flatten())).rejects.toThrow(QueryRuntimeError); + }); +}); diff --git a/__tests__/integration/feed.test.ts b/__tests__/integration/feed.test.ts new file mode 100644 index 00000000..7426923b --- /dev/null +++ b/__tests__/integration/feed.test.ts @@ -0,0 +1,237 @@ +import { + Client, + FeedClient, + FeedClientConfiguration, + EventSource, + fql, + getDefaultHTTPClient, + ClientError, + AbortError, + FeedPage, + QueryTimeoutError, + NetworkError, +} from "../../src"; +import { + getClient, + getDefaultHTTPClientOptions, + getDefaultSecretAndEndpoint, +} from "../client"; + +const defaultHttpClient = getDefaultHTTPClient(getDefaultHTTPClientOptions()); +const { secret } = getDefaultSecretAndEndpoint(); + +let client: Client; +const TEST_DB_NAME = "FeedTestDB"; +const TEST_SECRET = `${secret}:${TEST_DB_NAME}:admin`; +const defaultFeedConfig: FeedClientConfiguration = { + secret: TEST_SECRET, + long_type: "number", + max_attempts: 3, + max_backoff: 20, + httpClient: defaultHttpClient, + client_timeout_buffer_ms: 5000, + query_timeout_ms: 5000, +}; + +type FeedTest = { value: number }; + +const fromAsync = async (iterator: AsyncIterable): Promise => { + const res: T[] = []; + for await (const item of iterator) { + res.push(item); + } + return res; +}; + +beforeAll(async () => { + const rootClient = getClient(); + + await rootClient.query(fql` + if (Database.byName(${TEST_DB_NAME}) == null) { + Database.create({ name: ${TEST_DB_NAME} }) + } + `); + + client = getClient({ secret: TEST_SECRET }); + + await client.query(fql` + if (Collection.byName("FeedTest") != null) { + Collection.byName("FeedTest")!.delete() + } + `); + + await client.query(fql` + Collection.create({ name: "FeedTest" }); + `); +}); + +afterAll(() => { + if (client) { + client.close(); + } +}); + 
+afterEach(async () => { + await client.query(fql` + FeedTest.all().forEach(d => d.delete()); + `); +}); + +describe("Client", () => { + it("should throw a ClientError if not using a stream token", async () => { + await expect(async () => { + const res: FeedPage[] = []; + for await (const page of client.feed(fql`1+1`)) { + res.push(page); + } + }).rejects.toThrow(ClientError); + }); + + it("should return a iterable feed from a stream token", async () => { + const token = await client.query(fql`FeedTest.all().eventSource()`); + const Feed = client.feed(token.data); + + await client.query( + fql`Set.sequence(0, 3).forEach(v => FeedTest.create({ value: v + 1}));`, + ); + const pages = await fromAsync(Feed); + + expect(pages).toHaveLength(1); + + const events = Array.from(pages[0].events); + expect(events).toHaveLength(3); + expect(events[0].type).toEqual("add"); + expect(events[0].data.value).toEqual(1); + }); + + it("should return an iterable feed with a lambda", async () => { + const Feed = client.feed(fql`FeedTest.all().eventSource()`); + + await client.query(fql`FeedTest.create({ value: 1})`); + + const pages = await fromAsync(Feed); + + // Lambdas are evaluated lazily, so we should get an one page of empty events unless using start_ts + expect(pages).toHaveLength(1); + expect(Array.from(pages[0].events)).toHaveLength(0); + }); + + it("should pass configuration to the feed client", async () => { + // First document, which we don't want to include in the feed + const startAt = (await client.query(fql`FeedTest.create({ value: 1})`)) + .txn_ts; + // Second batch of documents we do want to include in the feed + await client.query( + fql`Set.sequence(0, 3).forEach(v => FeedTest.create({ value: v + 1}));`, + ); + + const pages = await fromAsync( + client.feed(fql`FeedTest.all().eventSource()`, { + page_size: 1, + start_ts: startAt, + }), + ); + + // Page size of 1 means we should get 3 pages of 1 event. 
+ expect(pages).toHaveLength(3); + expect(Array.from(pages[0].events)).toHaveLength(1); + }); +}); + +describe("FeedClient", () => { + it("can be instantiated directly with a token and client configuration", async () => { + const startAt = (await client.query(fql`FeedTest.create({ value: 1})`)) + .txn_ts; + await client.query( + fql`Set.sequence(0, 3).forEach(v => FeedTest.create({ value: v + 1}));`, + ); + + const token = await client.query( + fql`FeedTest.all().eventSource()`, + ); + const Feed = new FeedClient(token.data, { + ...defaultFeedConfig, + start_ts: startAt, + page_size: 1, + }); + const pages = await fromAsync(Feed); + + expect(pages).toHaveLength(3); + + const events = Array.from(pages[0].events); + expect(events).toHaveLength(1); + expect(events[0].data.value).toEqual(1); + }); + + it("can pass an existing cursor", async () => { + const token = await client.query( + fql`FeedTest.all().eventSource()`, + ); + const Feed = new FeedClient(token.data, { + ...defaultFeedConfig, + page_size: 1, + }); + + await client.query( + fql`Set.sequence(0, 3).forEach(v => FeedTest.create({ value: v + 1}));`, + ); + + const firstPage = await Feed[Symbol.asyncIterator]().next(); + + const FeedWithCursor = new FeedClient(token.data, { + ...defaultFeedConfig, + cursor: firstPage.value.cursor, + }); + const pages = await fromAsync(FeedWithCursor); + + expect(pages).toHaveLength(1); + + const events = Array.from(pages[0].events); + expect(events).toHaveLength(2); + }); + + it("throws an error on an error event within a page of events", async () => { + const token = await client.query( + fql`FeedTest.all().map(_ => abort('oops')).eventSource()`, + ); + const Feed = new FeedClient(token.data, defaultFeedConfig); + + await client.query(fql`FeedTest.create({ value: 1})`); + + expect(fromAsync(Feed.flatten())).rejects.toThrow(AbortError); + }); + + it("can return a flattened array of events", async () => { + const token = await client.query(fql`FeedTest.all().eventSource()`); + const Feed = new FeedClient(token.data, defaultFeedConfig); + + await client.query( + fql`Set.sequence(0, 3).forEach(v => FeedTest.create({ value: v + 1}));`, + ); + + const events = await fromAsync(Feed.flatten()); + + expect(events).toHaveLength(3); + }); + + it("throws a QueryTimeoutError if the query times out", async () => { + const token = await client.query(fql`FeedTest.all().eventSource()`); + const feed = new FeedClient(token.data, { + ...defaultFeedConfig, + query_timeout_ms: 1, + }); + + await expect(fromAsync(feed.flatten())).rejects.toThrow(QueryTimeoutError); + }); + + it("throws a NetworkError if the client times out", async () => { + const token = await client.query(fql`FeedTest.all().eventSource()`); + const feed = new FeedClient(token.data, { + ...defaultFeedConfig, + query_timeout_ms: 1, + client_timeout_buffer_ms: 0, + }); + + await expect(fromAsync(feed.flatten())).rejects.toThrow(NetworkError); + }); +}); diff --git a/__tests__/integration/query.test.ts b/__tests__/integration/query.test.ts index 9a102074..1641a226 100644 --- a/__tests__/integration/query.test.ts +++ b/__tests__/integration/query.test.ts @@ -369,8 +369,8 @@ describe("query", () => { const httpClient = getDefaultHTTPClient(getDefaultHTTPClientOptions()); const badHTTPClient = { - async request(req: HTTPRequest) { - const badRequest: HTTPRequest = { + async request(req: HTTPRequest) { + const badRequest: HTTPRequest = { ...req, client_timeout_ms: 1, }; diff --git a/__tests__/unit/retryable.test.ts b/__tests__/unit/retryable.test.ts new file 
mode 100644 index 00000000..ccc9c460 --- /dev/null +++ b/__tests__/unit/retryable.test.ts @@ -0,0 +1,69 @@ +import { withRetries } from "../../src/util/retryable"; + +describe("retryable", () => { + it("should retry", async () => { + const fn = jest + .fn() + .mockImplementation(() => { + return true; + }) + .mockImplementationOnce(() => { + throw new Error("error"); + }); + const res = await withRetries(fn, { maxAttempts: 3, maxBackoff: 2 }); + + expect(res).toBe(true); + expect(fn).toHaveBeenCalledTimes(2); + }); + + it("should throw the last error if attempts are greater than or equal to maxAttempts", async () => { + const fn = jest.fn().mockImplementation(() => { + throw new Error("error"); + }); + + await expect( + withRetries(fn, { maxAttempts: 3, maxBackoff: 2 }), + ).rejects.toThrow("error"); + expect(fn).toHaveBeenCalledTimes(3); + }); + + it("should not retry if shouldRetry returns false", async () => { + const fn = jest.fn().mockImplementation(() => { + throw new Error("error"); + }); + const shouldRetry = jest + .fn() + .mockImplementation(() => false) + .mockImplementationOnce(() => true); + + await expect( + withRetries(fn, { maxAttempts: 3, maxBackoff: 2, shouldRetry }), + ).rejects.toThrow("error"); + expect(fn).toHaveBeenCalledTimes(2); + expect(shouldRetry).toHaveBeenCalledTimes(2); + }); + + it("should backoff", async () => { + const fn = jest.fn().mockImplementation(() => { + throw new Error("max attempts reached"); + }); + + const mockSleep = jest.fn().mockImplementation((fn) => { + fn(); + }); + + await expect( + withRetries(fn, { maxAttempts: 3, maxBackoff: 2, sleepFn: mockSleep }), + ).rejects.toThrow("max attempts reached"); + + expect(mockSleep).toHaveBeenCalledTimes(2); + expect(mockSleep).toHaveBeenNthCalledWith(1, expect.any(Function), 0); // first attempt + expect(mockSleep).toHaveBeenNthCalledWith( + 2, + expect.any(Function), + expect.any(Number), + ); // second attempt + expect(mockSleep.mock.calls[1][1]).toBeGreaterThan(0); // second attempt is greater than 0 + expect(fn).toHaveBeenCalledTimes(3); + }); +}); diff --git a/src/client-configuration.ts b/src/client-configuration.ts index ecc9d887..e237d931 100644 --- a/src/client-configuration.ts +++ b/src/client-configuration.ts @@ -1,4 +1,4 @@ -import { HTTPStreamClient } from "./http-client"; +import { HTTPClient, HTTPStreamClient } from "./http-client"; import type { ValueFormat } from "./wire-protocol"; /** @@ -204,6 +204,42 @@ export type StreamClientConfiguration = { cursor?: string; }; +/** + * Configuration for an event feed client. + */ +export type FeedClientConfiguration = Required< + Pick< + ClientConfiguration, + | "long_type" + | "max_attempts" + | "max_backoff" + | "client_timeout_buffer_ms" + | "query_timeout_ms" + | "secret" + > +> & { + /** + * The underlying {@link HTTPClient} that will execute the actual HTTP calls + */ + httpClient: HTTPClient; + + /** + * The starting timestamp of the event feed, exclusive. If set, Fauna will return events starting after + the timestamp. + */ + start_ts?: number; + + /** + * The starting event cursor, exclusive. If set, Fauna will return events starting after the cursor. + */ + cursor?: string; + + /** + * The desired number of events per page. + */ + page_size?: number; +}; + /** * A extensible set of endpoints for calling Fauna. * @remarks Most clients will will not need to extend this set. 
diff --git a/src/client.ts b/src/client.ts index 99b2a3e2..e4c9d3a1 100644 --- a/src/client.ts +++ b/src/client.ts @@ -1,4 +1,5 @@ import { + FeedClientConfiguration, ClientConfiguration, StreamClientConfiguration, endpoints, @@ -20,12 +21,24 @@ import { isStreamClient, isHTTPResponse, type HTTPClient, + HTTPRequest, + FaunaAPIPaths, } from "./http-client"; import { Query } from "./query-builder"; import { TaggedTypeFormat } from "./tagged-type"; import { getDriverEnv } from "./util/environment"; -import { EmbeddedSet, Page, SetIterator, StreamToken } from "./values"; +import { withRetries } from "./util/retryable"; import { + FeedPage, + EmbeddedSet, + Page, + SetIterator, + EventSource, + isEventSource, +} from "./values"; +import { + FeedRequest, + FeedSuccess, EncodedObject, isQueryFailure, isQuerySuccess, @@ -36,6 +49,7 @@ import { StreamEventStatus, type QuerySuccess, type QueryValue, + FeedError, } from "./wire-protocol"; type RequiredClientConfig = ClientConfiguration & @@ -155,7 +169,7 @@ export class Client { } /** - * Closes the underlying HTTP client. Subsquent query or close calls + * Closes the underlying HTTP client. Subsequent query or close calls * will fail. */ close() { @@ -278,7 +292,7 @@ export class Client { * * @example * ```javascript - * const stream = client.stream(fql`MyCollection.all().toStream()`) + * const stream = client.stream(fql`MyCollection.all().eventSource()`) * * try { * for await (const event of stream) { @@ -302,7 +316,7 @@ export class Client { * * @example * ```javascript - * const stream = client.stream(fql`MyCollection.all().toStream()`) + * const stream = client.stream(fql`MyCollection.all().eventSource()`) * * stream.start( * function onEvent(event) { @@ -326,7 +340,7 @@ export class Client { * ``` */ stream( - tokenOrQuery: StreamToken | Query, + tokenOrQuery: EventSource | Query, options?: Partial, ): StreamClient { if (this.#isClosed) { @@ -355,7 +369,7 @@ export class Client { const tokenOrGetToken = tokenOrQuery instanceof Query - ? () => this.query(tokenOrQuery).then((res) => res.data) + ? () => this.query(tokenOrQuery).then((res) => res.data) : tokenOrQuery; return new StreamClient(tokenOrGetToken, streamClientConfig); @@ -364,6 +378,79 @@ export class Client { } } + /** + * Initialize a event feed in Fauna and returns an asynchronous iterator of + * feed events. + * @typeParam T - The expected type of the response from Fauna. T can be inferred + * if the provided query used a type parameter. + * @param query - A string-encoded streaming token, or a {@link Query} + * @returns A {@link FeedClient} that which can be used to listen to a feed + * of events + * + * @example + * ```javascript + * const feed = client.feed(fql`MyCollection.all().eventSource()`) + * + * try { + * for await (const page of feed) { + * for (const event of page.events) { + * // ... handle event + * } + * } + * } catch (error) { + * // An error will be handled here if Fauna returns a terminal, "error" event, or + * // if Fauna returns a non-200 response when trying to connect, or + * // if the max number of retries on network errors is reached. + * + * // ... handle fatal error + * }; + * ``` + * @example + * The {@link FeedClient.flatten} method can be used so the iterator yields + * events directly. Each event is fetched asynchronously and hides when + * additional pages are fetched. 
+ * + * ```javascript + * const feed = client.feed(fql`MyCollection.all().eventSource()`) + * + * for await (const user of feed.flatten()) { + * // do something with each event + * } + * ``` + */ + feed( + tokenOrQuery: EventSource | Query, + options?: Partial, + ): FeedClient { + if (this.#isClosed) { + throw new ClientClosedError( + "Your client is closed. No further requests can be issued.", + ); + } + + const clientConfiguration: FeedClientConfiguration = { + ...this.#clientConfiguration, + httpClient: this.#httpClient, + ...options, + }; + + if ( + clientConfiguration.cursor !== undefined && + tokenOrQuery instanceof Query + ) { + throw new ClientError( + "The `cursor` configuration can only be used with a stream token.", + ); + } + + const tokenOrGetToken = + tokenOrQuery instanceof Query + ? () => this.query(tokenOrQuery).then((res) => res.data) + : tokenOrQuery; + + return new FeedClient(tokenOrGetToken, clientConfiguration); + } + async #queryWithRetries( queryRequest: QueryRequest, queryOptions?: QueryOptions, @@ -641,33 +728,33 @@ export class StreamClient { #clientConfiguration: StreamClientConfiguration; /** A tracker for the number of connection attempts */ #connectionAttempts = 0; - /** A lambda that returns a promise for a {@link StreamToken} */ - #query: () => Promise; + /** A lambda that returns a promise for a {@link EventSource} */ + #query: () => Promise; /** The last `txn_ts` value received from events */ #last_ts?: number; /** The last `cursor` value received from events */ #last_cursor?: string; /** A common interface to operate a stream from any HTTPStreamClient */ #streamAdapter?: StreamAdapter; - /** A saved copy of the StreamToken once received */ - #streamToken?: StreamToken; + /** A saved copy of the EventSource once received */ + #eventSource?: EventSource; /** * - * @param query - A lambda that returns a promise for a {@link StreamToken} + * @param query - A lambda that returns a promise for a {@link EventSource} * @param clientConfiguration - The {@link ClientConfiguration} to apply * @param httpStreamClient - The underlying {@link HTTPStreamClient} that will * execute the actual HTTP calls * @example * ```typescript - * const streamClient = client.stream(streamToken); + * const streamClient = client.stream(eventSource); * ``` */ constructor( - token: StreamToken | (() => Promise), + token: EventSource | (() => Promise), clientConfiguration: StreamClientConfiguration, ) { - if (token instanceof StreamToken) { + if (isEventSource(token)) { this.#query = () => Promise.resolve(token); } else { this.#query = token; @@ -720,11 +807,11 @@ export class StreamClient { throw new ClientError("The stream has been closed and cannot be reused."); } - if (!this.#streamToken) { - this.#streamToken = await this.#query().then((maybeStreamToken) => { - if (!(maybeStreamToken instanceof StreamToken)) { + if (!this.#eventSource) { + this.#eventSource = await this.#query().then((maybeStreamToken) => { + if (!isEventSource(maybeStreamToken)) { throw new ClientError( - `Error requesting a stream token. Expected a StreamToken as the query result, but received ${typeof maybeStreamToken}. Your query must return the result of '.toStream' or '.changesOn')\n` + + `Error requesting a stream token. Expected a EventSource as the query result, but received ${typeof maybeStreamToken}. 
Your query must return the result of '.eventSource' or '.eventsOn')\n` + `Query result: ${JSON.stringify(maybeStreamToken, null)}`, ); } @@ -776,7 +863,7 @@ export class StreamClient { StreamEventData | StreamEventStatus > { // Safety: This method must only be called after a stream token has been acquired - const streamToken = this.#streamToken as StreamToken; + const eventSource = this.#eventSource as EventSource; const headers = { Authorization: `Bearer ${this.#clientConfiguration.secret}`, @@ -784,7 +871,7 @@ export class StreamClient { const streamAdapter = this.#clientConfiguration.httpStreamClient.stream({ data: { - token: streamToken.token, + token: eventSource.token, cursor: this.#last_cursor || this.#clientConfiguration.cursor, }, headers, @@ -852,6 +939,224 @@ export class StreamClient { } } +/** + * A class to iterate through to a Fauna event feed. + */ +export class FeedClient { + /** A static copy of the driver env header to send with each request */ + static readonly #driverEnvHeader = getDriverEnv(); + /** A lambda that returns a promise for a {@link EventSource} */ + #query: () => Promise; + /** The event feed's client options */ + #clientConfiguration: FeedClientConfiguration; + /** The last `cursor` value received for the current page */ + #lastCursor?: string; + /** A saved copy of the EventSource once received */ + #eventSource?: EventSource; + /** Whether or not another page can be fetched by the client */ + #isDone?: boolean; + + /** + * + * @param query - A lambda that returns a promise for a {@link EventSource} + * @param clientConfiguration - The {@link FeedClientConfiguration} to apply + * @example + * ```typescript + * const feed = client.feed(eventSource); + * ``` + */ + constructor( + token: EventSource | (() => Promise), + clientConfiguration: FeedClientConfiguration, + ) { + if (isEventSource(token)) { + this.#query = () => Promise.resolve(token); + } else { + this.#query = token; + } + + this.#clientConfiguration = clientConfiguration; + this.#lastCursor = clientConfiguration.cursor; + + this.#validateConfiguration(); + } + + #getHeaders(): Record { + return { + Authorization: `Bearer ${this.#clientConfiguration.secret}`, + "x-format": "tagged", + "x-driver-env": FeedClient.#driverEnvHeader, + "x-query-timeout-ms": + this.#clientConfiguration.query_timeout_ms.toString(), + }; + } + + async #nextPageHttpRequest() { + // If we never resolved the stream token, do it now since we need it here when + // building the payload + if (!this.#eventSource) { + this.#eventSource = await this.#resolveEventSource(this.#query); + } + + const headers = this.#getHeaders(); + + const req: HTTPRequest = { + headers, + client_timeout_ms: + this.#clientConfiguration.client_timeout_buffer_ms + + this.#clientConfiguration.query_timeout_ms, + data: { + token: this.#eventSource.token, + }, + method: "POST", + path: FaunaAPIPaths.EVENT_FEED, + }; + + // Set the page size if it is available + if (this.#clientConfiguration.page_size) { + req.data.page_size = this.#clientConfiguration.page_size; + } + + // If we have a cursor, use that. Otherwise, use the start_ts if available. + // When the config is validated, if both are set, an error is thrown. 
+ if (this.#lastCursor) { + req.data.cursor = this.#lastCursor; + } else if (this.#clientConfiguration.start_ts) { + req.data.start_ts = this.#clientConfiguration.start_ts; + } + + return req; + } + + async *[Symbol.asyncIterator](): AsyncGenerator> { + while (!this.#isDone) { + yield await this.nextPage(); + } + } + + /** + * Fetches the next page of the event feed. If there are no more pages to + * fetch, this method will throw a {@link ClientError}. + */ + async nextPage(): Promise> { + if (this.#isDone) { + throw new ClientError("The event feed has no more pages to fetch."); + } + + const { httpClient } = this.#clientConfiguration; + + const request = await this.#nextPageHttpRequest(); + const response = await withRetries(() => httpClient.request(request), { + maxAttempts: this.#clientConfiguration.max_attempts, + maxBackoff: this.#clientConfiguration.max_backoff, + shouldRetry: (error) => error instanceof ThrottlingError, + }); + + let body: FeedSuccess | FeedError; + + try { + body = TaggedTypeFormat.decode(response.body, { + long_type: this.#clientConfiguration.long_type, + }); + } catch (error: unknown) { + throw new ProtocolError({ + message: `Error parsing response as JSON: ${error}`, + httpStatus: response.status, + }); + } + + if (isQueryFailure(body)) { + throw getServiceError(body, response.status); + } + + const page = new FeedPage(body); + this.#lastCursor = page.cursor; + this.#isDone = !page.hasNext; + + return page; + } + + /** + * Returns an async generator that yields the events of the event feed + * directly. + * + * @example + * ```javascript + * const feed = client.feed(fql`MyCollection.all().eventSource()`) + * + * for await (const user of feed.flatten()) { + * // do something with each event + * } + * ``` + */ + async *flatten(): AsyncGenerator> { + for await (const page of this) { + for (const event of page.events) { + yield event; + } + } + } + + async #resolveEventSource( + fn: () => Promise, + ): Promise { + return await fn().then((maybeEventSource) => { + if (!isEventSource(maybeEventSource)) { + throw new ClientError( + `Error requesting a stream token. Expected a EventSource as the query result, but received ${typeof maybeEventSource}. 
Your query must return the result of '.eventSource' or '.eventsOn')\n` + + `Query result: ${JSON.stringify(maybeEventSource, null)}`, + ); + } + return maybeEventSource; + }); + } + + #validateConfiguration() { + const config = this.#clientConfiguration; + + const required_options: (keyof FeedClientConfiguration)[] = [ + "long_type", + "httpClient", + "max_backoff", + "max_attempts", + "client_timeout_buffer_ms", + "query_timeout_ms", + "secret", + ]; + required_options.forEach((option) => { + if (config[option] === undefined) { + throw new TypeError( + `ClientConfiguration option '${option}' must be defined.`, + ); + } + }); + + if (config.max_backoff <= 0) { + throw new RangeError(`'max_backoff' must be greater than zero.`); + } + + if (config.max_attempts <= 0) { + throw new RangeError(`'max_attempts' must be greater than zero.`); + } + + if (config.query_timeout_ms <= 0) { + throw new RangeError(`'query_timeout_ms' must be greater than zero.`); + } + + if (config.client_timeout_buffer_ms < 0) { + throw new RangeError( + `'client_timeout_buffer_ms' must be greater than or equal to zero.`, + ); + } + + if (config.start_ts !== undefined && config.cursor !== undefined) { + throw new TypeError( + "Only one of 'start_ts' or 'cursor' can be defined in the client configuration.", + ); + } + } +} + // Private types and constants for internal logic. function wait(ms: number) { diff --git a/src/http-client/fetch-client.ts b/src/http-client/fetch-client.ts index b96fbb42..cd3f8efe 100644 --- a/src/http-client/fetch-client.ts +++ b/src/http-client/fetch-client.ts @@ -2,7 +2,7 @@ /// import { getServiceError, NetworkError } from "../errors"; -import { QueryFailure } from "../wire-protocol"; +import { QueryFailure, QueryRequest } from "../wire-protocol"; import { FaunaAPIPaths } from "./paths"; import { HTTPClient, @@ -33,13 +33,13 @@ export class FetchClient implements HTTPClient, HTTPStreamClient { } /** {@inheritDoc HTTPClient.request} */ - async request({ + async request({ data, headers: requestHeaders, method, client_timeout_ms, path = this.#defaultRequestPath, - }: HTTPRequest): Promise { + }: HTTPRequest): Promise { const signal = AbortSignal.timeout === undefined ? (() => { diff --git a/src/http-client/http-client.ts b/src/http-client/http-client.ts index 66ed767c..0ffb7984 100644 --- a/src/http-client/http-client.ts +++ b/src/http-client/http-client.ts @@ -7,14 +7,14 @@ import { SupportedFaunaAPIPaths } from "./paths"; * An object representing an http request. * The {@link Client} provides this to the {@link HTTPClient} implementation. */ -export type HTTPRequest = { +export type HTTPRequest = { /** * The timeout of each http request, in milliseconds. */ client_timeout_ms: number; /** The encoded Fauna query to send */ - data: QueryRequest; + data: T; /** Headers in object format */ headers: Record; @@ -44,7 +44,7 @@ export type HTTPClientOptions = { }; /** - * An interface to provide implementation-specific, asyncronous http calls. + * An interface to provide implementation-specific, asynchronous http calls. * This driver provides default implementations for common environments. Users * can configure the {@link Client} to use custom implementations if desired. */ @@ -55,7 +55,7 @@ export interface HTTPClient { * @returns A Promise<{@link HTTPResponse}> * @throws {@link NetworkError} on request timeout or other network issue. 
*/ - request(req: HTTPRequest): Promise; + request(req: HTTPRequest): Promise; /** * Flags the calling {@link Client} as no longer diff --git a/src/http-client/node-http2-client.ts b/src/http-client/node-http2-client.ts index 60bbc3e7..54ae7836 100644 --- a/src/http-client/node-http2-client.ts +++ b/src/http-client/node-http2-client.ts @@ -14,7 +14,7 @@ import { StreamAdapter, } from "./http-client"; import { NetworkError, getServiceError } from "../errors"; -import { QueryFailure } from "../wire-protocol"; +import { QueryFailure, QueryRequest } from "../wire-protocol"; import { FaunaAPIPaths } from "./paths"; // alias http2 types @@ -78,7 +78,7 @@ export class NodeHTTP2Client implements HTTPClient, HTTPStreamClient { } /** {@inheritDoc HTTPClient.request} */ - async request(req: HTTPRequest): Promise { + async request(req: HTTPRequest): Promise { let retryCount = 0; let memoizedError: any; do { @@ -164,13 +164,13 @@ export class NodeHTTP2Client implements HTTPClient, HTTPStreamClient { return this.#session; } - #doRequest({ + #doRequest({ client_timeout_ms, data: requestData, headers: requestHeaders, method, path = this.#defaultRequestPath, - }: HTTPRequest): Promise { + }: HTTPRequest): Promise { return new Promise((resolvePromise, rejectPromise) => { let req: ClientHttp2Stream; const onResponse = ( diff --git a/src/http-client/paths.ts b/src/http-client/paths.ts index 0a74710e..4955c376 100644 --- a/src/http-client/paths.ts +++ b/src/http-client/paths.ts @@ -5,6 +5,7 @@ export const FaunaAPIPaths = { QUERY: "/query/1", STREAM: "/stream/1", + EVENT_FEED: "/feed/1", } as const; export type SupportedFaunaAPIPaths = diff --git a/src/index.ts b/src/index.ts index 7799cc89..cb13de4e 100644 --- a/src/index.ts +++ b/src/index.ts @@ -1,9 +1,10 @@ -export { Client, StreamClient } from "./client"; +export { Client, StreamClient, FeedClient } from "./client"; export { endpoints, type ClientConfiguration, type Endpoints, type StreamClientConfiguration, + type FeedClientConfiguration, } from "./client-configuration"; export { AbortError, @@ -38,6 +39,9 @@ export { type QuerySuccess, type Span, type ValueFragment, + type FeedRequest, + type FeedSuccess, + type FeedError, } from "./wire-protocol"; export { DateStub, @@ -52,6 +56,8 @@ export { SetIterator, StreamToken, TimeStub, + FeedPage, + type EventSource, type DocumentT, } from "./values"; export { diff --git a/src/util/retryable.ts b/src/util/retryable.ts new file mode 100644 index 00000000..021b5671 --- /dev/null +++ b/src/util/retryable.ts @@ -0,0 +1,41 @@ +export type RetryOptions = { + maxAttempts: number; + maxBackoff: number; + shouldRetry?: (error: any) => boolean; + attempt?: number; + sleepFn?: (callback: (args: void) => void, ms?: number) => void; +}; + +export const withRetries = async ( + fn: () => Promise, + { + maxAttempts, + maxBackoff, + shouldRetry = () => true, + attempt = 0, + sleepFn = setTimeout, + }: RetryOptions, +): Promise => { + const backoffMs = + attempt > 0 + ? 
Math.min(Math.random() * 2 ** attempt, maxBackoff) * 1_000 + : 0; + attempt += 1; + + try { + return await fn(); + } catch (error: any) { + if (attempt >= maxAttempts || shouldRetry(error) !== true) { + throw error; + } + + await new Promise((resolve) => sleepFn(resolve, backoffMs)); + return withRetries(fn, { + maxAttempts, + maxBackoff, + shouldRetry, + attempt, + sleepFn, + }); + } +}; diff --git a/src/values/stream.ts b/src/values/stream.ts index 544dc68d..092ff84d 100644 --- a/src/values/stream.ts +++ b/src/values/stream.ts @@ -1,7 +1,15 @@ +import { + FeedSuccess, + QueryValue, + StreamEventData, + QueryStats, +} from "../wire-protocol"; +import { getServiceError } from "../errors"; + /** - * A token used to initiate a Fauna stream at a particular snapshot in time. + * A token used to initiate a Fauna event source at a particular snapshot in time. * - * The example below shows how to request a stream token from Fauna and use it + * The example below shows how to request an event token from Fauna and use it * to establish an event steam. * * @example @@ -9,18 +17,62 @@ * const response = await client.query(fql` * Messages.byRecipient(User.byId("1234")) * `); - * const token = response.data; + * const eventSource = response.data; * - * const stream = client.stream(token) + * const stream = client.stream(eventSource) * .on("add", (event) => console.log("New message", event)) * * stream.start(); * ``` */ -export class StreamToken { +export interface EventSource { + readonly token: string; +} + +export function isEventSource(value: any): value is EventSource { + if (typeof value.token === "string") { + return true; + } + + return false; +} + +export class StreamToken implements EventSource { readonly token: string; constructor(token: string) { this.token = token; } } + +/** + * A class to represent a page of events from a Fauna stream. + */ +export class FeedPage { + readonly events: IterableIterator>; + readonly cursor: string; + readonly hasNext: boolean; + readonly stats?: QueryStats; + + constructor({ events, cursor, has_next, stats }: FeedSuccess) { + this.events = this.#toEventIterator(events); + this.cursor = cursor; + this.hasNext = has_next; + this.stats = stats; + } + + *#toEventIterator( + events: FeedSuccess["events"], + ): IterableIterator> { + // A page of events may contain an error event. These won't be reported + // at a response level, so we need to check for them here. They are + // considered fatal. Pages end at the first error event. + for (const event of events) { + if (event.type === "error") { + throw getServiceError(event); + } + + yield event; + } + } +} diff --git a/src/wire-protocol.ts b/src/wire-protocol.ts index 1330900a..0798a5c2 100644 --- a/src/wire-protocol.ts +++ b/src/wire-protocol.ts @@ -1,5 +1,4 @@ -// eslint-disable-next-line @typescript-eslint/no-unused-vars -import { fql, QueryArgumentObject } from "./query-builder"; +import { QueryArgumentObject } from "./query-builder"; import { DateStub, Document, @@ -405,6 +404,19 @@ export type StreamEvent = | StreamEventData | StreamEventError; +export type FeedRequest = StreamRequest & { + page_size?: number; +}; + +export type FeedSuccess = { + events: (StreamEventData | StreamEventError)[]; + cursor: string; + has_next: boolean; + stats?: QueryStats; +}; + +export type FeedError = QueryFailure; + export type TaggedBytes = { "@bytes": string }; export type TaggedDate = { "@date": string }; export type TaggedDouble = { "@double": string };