Skip to content

Commit

Permalink
Add initial event feed support (#290)
Browse files Browse the repository at this point in the history
  • Loading branch information
ecooper authored Oct 28, 2024
1 parent 13d7c18 commit e75ea5e
Show file tree
Hide file tree
Showing 17 changed files with 1,264 additions and 69 deletions.
161 changes: 143 additions & 18 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,11 @@ See the [Fauna Documentation](https://docs.fauna.com/fauna/current/) for additio
- [Query timeout](#query-timeout)
- [Client timeout](#client-timeout)
- [HTTP/2 session idle timeout](#http2-session-idle-timeout)
- [Event Feeds](#event-feeds)
- [Request an Event Feed](#request-a-event-feed)
- [Iterate on an Event Feed](#iterate-on-a-event-feed)
- [Error handling](#error-handling)
- [Event Feed options](#event-feed-options)
- [Event Streaming](#event-streaming)
- [Start a stream](#start-a-stream)
- [Iterate on a stream](#iterate-on-a-stream)
Expand Down Expand Up @@ -69,14 +74,12 @@ Stable versions of:
- Safari 12.1+
- Edge 79+


## API reference

API reference documentation for the driver is available at
https://fauna.github.io/fauna-js/. The docs are generated using
[TypeDoc](https://typedoc.org/).


## Install

The driver is available on [npm](https://www.npmjs.com/package/fauna). You
Expand Down Expand Up @@ -467,39 +470,161 @@ const client = new Client({ http2_session_idle_ms: 6000 });
> **Warning**
> Setting `http2_session_idle_ms` to small values can lead to a race condition where requests cannot be transmitted before the session is closed, yielding `ERR_HTTP2_GOAWAY_SESSION` errors.
## Event Feeds

The driver supports [Event Feeds](https://docs.fauna.com/fauna/current/learn/cdc/#event-feeds).

### Request an Event Feed

An Event Feed asynchronously polls an [event source](https://docs.fauna.com/fauna/current/learn/cdc/#create-an-event-source) for events.

To get an event source, append `eventSource()` or `eventsOn()` to a set from a
[supported Set](https://docs.fauna.com/fauna/current/reference/streaming_reference/#sets).

To get paginated events, pass the event source to `feed()`:

```javascript
const response = await client.query(fql`
let set = Product.all()
{
initialPage: set.pageSize(10),
eventSource: set.eventSource()
}
`);
const { initialPage, eventSource } = response.data;

const feed = client.feed(eventSource);
```

You can also pass a query that produces an event source directly to `feed()`:

```javascript
const query = fql`Product.all().eventsOn(.price, .stock)`;

const feed = client.feed(query);
```

### Iterate on a Event Feed

`feed()` returns a `FeedClient` instance that can act as an `AsyncIterator`. You can use `for await...of` to iterate through all the pages:

```ts
const query = fql`Product.all().eventsOn(.price, .stock)`;
const feed = client.feed(query);

for await (const page of feed) {
console.log("Page stats", page.stats);

for (event in page.events) {
switch (event.type) {
case "update":
case "add":
case "remove":
console.log("Event: ", event);
// ...
break;
}
}
}
```

Alternatively, use `flatten()` to get paginated results as a single, flat array:

```ts
const query = fql`Product.all().eventsOn(.price, .stock)`;
const feed = client.feed(query);

for await (const event of feed.flatten()) {
console.log("Event: ", event);
}
```

### Error handling

Exceptions can be raised at two different places:

1. While fetching a page
1. While iterating a page's events

This distinction allows for you to ignore errors originating from event processing.
For example:

```ts
const feed = client.feed(fql`
Product.all().map(.details.toUpperCase()).eventSource()
`);

try {
for await (const page of feed) {
// Pages will stop at the first error encountered.
// Therefore, its safe to handle an event failures
// and then pull more pages.
try {
for (const event of page.events) {
console.log("Event: ", event);
}
} catch (error: unknown) {
console.log("Feed event error: ", error);
}
}
} catch (error: unknown) {
console.log("Non-retryable error: ", error);
}
```

### Event Feed options

The client configuration sets the default options for `feed()`. You can pass a `FeedClientConfiguration` object to override these defaults:

```ts
const options: FeedClientConfiguration = {
long_type: "number",
max_attempts: 5,
max_backoff: 1000,
query_timeout_ms: 5000,
client_timeout_buffer_ms: 5000,
secret: "FAUNA_SECRET",
cursor: undefined,
start_ts: undefined,
};

client.feed(fql`Product.all().eventSource()`, options);
```

## Event Streaming

The driver supports [Event Streaming](https://docs.fauna.com/fauna/current/learn/streaming).

### Start a stream

To get a stream token, append
[`toStream()`](https://docs.fauna.com/fauna/current/reference/reference/schema_entities/set/tostream)
To get an event source, append
[`eventSource()`](https://docs.fauna.com/fauna/current/reference/reference/schema_entities/set/eventsource)
or
[`changesOn()`](https://docs.fauna.com/fauna/current/reference/reference/schema_entities/set/changeson)
[`eventsOn()`](https://docs.fauna.com/fauna/current/reference/reference/schema_entities/set/eventson)
to a set from a [supported
source](https://docs.fauna.com/fauna/current/reference/streaming_reference/#supported-sources).
source](https://docs.fauna.com/fauna/current/reference/streaming_reference/#sets).

To start and subscribe to the stream, pass the stream token to `stream()`:
To start and subscribe to the stream, pass the event source to `stream()`:

```javascript
const response = await client.query(fql`
let set = Product.all()
{
initialPage: set.pageSize(10),
streamToken: set.toStream()
eventSource: set.eventSource()
}
`);
const { initialPage, streamToken } = response.data;
const { initialPage, eventSource } = response.data;

client.stream(streamToken);
client.stream(eventSource);
```

You can also pass a query that produces a stream token directly to `stream()`:
You can also pass a query that produces an event source directly to `stream()`:

```javascript
const query = fql`Product.all().changesOn(.price, .stock)`;
const query = fql`Product.all().eventsOn(.price, .stock)`;

client.stream(query);
```
Expand All @@ -515,7 +640,7 @@ try {
case "update":
case "add":
case "remove":
console.log("Stream event:", event);
console.log("Event: ", event);
// ...
break;
}
Expand All @@ -537,7 +662,7 @@ stream.start(
case "update":
case "add":
case "remove":
console.log("Stream event:", event);
console.log("Event: ", event);
// ...
break;
}
Expand All @@ -556,11 +681,11 @@ stream.start(
Use `close()` to close a stream:

```javascript
const stream = await client.stream(fql`Product.all().toStream()`);
const stream = await client.stream(fql`Product.all().eventSource()`);

let count = 0;
for await (const event of stream) {
console.log("Stream event:", event);
console.log("Event: ", event);
// ...
count++;

Expand Down Expand Up @@ -589,7 +714,7 @@ const options: StreamClientConfiguration = {
cursor: null,
};

client.stream(fql`Product.all().toStream()`, options);
client.stream(fql`Product.all().eventSource()`, options);
```

For supported properties, see
Expand All @@ -598,7 +723,7 @@ in the API reference.

## Contributing

Any contributions are from the community are greatly appreciated!
Any contributions from the community are greatly appreciated!

If you have a suggestion that would make this better, please fork the repo and create a pull request. You may also simply open an issue. We provide templates, so please complete those to the best of your ability.

Expand Down
18 changes: 9 additions & 9 deletions __tests__/functional/client-configuration.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@ describe("ClientConfiguration", () => {
process.env["FAUNA_SECRET"] = "foo";
const client = new Client();
expect(Buffer.from(JSON.stringify(client)).toString()).not.toContain(
"secret"
"secret",
);
});

Expand All @@ -33,11 +33,11 @@ describe("ClientConfiguration", () => {

const client = new Client({ secret: "secret" });
expect(client.clientConfiguration.endpoint?.toString()).toEqual(
"https://localhost:9999/"
"https://localhost:9999/",
);
});

it("Client respectes passed in client configuration over defaults", () => {
it("Client respects passed in client configuration over defaults", () => {
// TODO: when the Client accepts an http client add a mock that validates
// the configuration changes were applied.
});
Expand All @@ -51,7 +51,7 @@ describe("ClientConfiguration", () => {
if ("message" in e) {
expect(e.message).toEqual(
"You must provide a secret to the driver. Set it in \
an environmental variable named FAUNA_SECRET or pass it to the Client constructor."
an environmental variable named FAUNA_SECRET or pass it to the Client constructor.",
);
}
}
Expand Down Expand Up @@ -117,10 +117,10 @@ an environmental variable named FAUNA_SECRET or pass it to the Client constructo
expect(req.headers["x-query-timeout-ms"]).toEqual("5000");
const _expectedHeader = expectedHeader;
expect(req.headers[_expectedHeader.key]).toEqual(
_expectedHeader.value
_expectedHeader.value,
);
return getDefaultHTTPClient(getDefaultHTTPClientOptions()).request(
req
req,
);
},

Expand All @@ -132,11 +132,11 @@ an environmental variable named FAUNA_SECRET or pass it to the Client constructo
query_timeout_ms: 5000,
[fieldName]: fieldValue,
},
httpClient
httpClient,
);
await client.query<number>(fql`"taco".length`);
client.close();
}
},
);

it("can accept endpoints with or without a trailing slash.", async () => {
Expand Down Expand Up @@ -173,7 +173,7 @@ an environmental variable named FAUNA_SECRET or pass it to the Client constructo
} finally {
client?.close();
}
}
},
);

it("throws a RangeError if 'client_timeout_buffer_ms' is less than or equal to zero", async () => {
Expand Down
Loading

0 comments on commit e75ea5e

Please sign in to comment.