Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add initial event feed support #290

Merged
merged 18 commits into from
Oct 28, 2024
Merged
Show file tree
Hide file tree
Changes from 7 commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 4 additions & 0 deletions .github/workflows/pr_validate.yml
Original file line number Diff line number Diff line change
Expand Up @@ -11,10 +11,14 @@ jobs:
image: fauna/faunadb:latest
ports:
- 8443:8443
env:
FLAG_ACCOUNT_CHANGE_FEEDS: "true"
ecooper marked this conversation as resolved.
Show resolved Hide resolved
alt_core:
image: fauna/faunadb:latest
ports:
- 7443:8443
env:
FLAG_ACCOUNT_CHANGE_FEEDS: "true"
strategy:
matrix:
node: ["18", "20"]
Expand Down
80 changes: 80 additions & 0 deletions __tests__/functional/change-feed-client-configuration.test.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,80 @@
import {
StreamToken,
getDefaultHTTPClient,
ChangeFeedClientConfiguration,
ChangeFeedClient,
} from "../../src";
import { getDefaultHTTPClientOptions } from "../client";

const defaultHttpClient = getDefaultHTTPClient(getDefaultHTTPClientOptions());
const defaultConfig: ChangeFeedClientConfiguration = {
secret: "secret",
long_type: "number",
max_attempts: 3,
max_backoff: 20,
query_timeout_ms: 5000,
httpClient: defaultHttpClient,
};
const dummyStreamToken = new StreamToken("dummy");

describe("ChangeFeedClientConfiguration", () => {
it("can be instantiated directly with a token", () => {
new ChangeFeedClient(dummyStreamToken, defaultConfig);
ecooper marked this conversation as resolved.
Show resolved Hide resolved
});

it("can be instantiated directly with a lambda", async () => {
new ChangeFeedClient(
() => Promise.resolve(dummyStreamToken),
defaultConfig,
);
});

it("throws a RangeError if 'max_backoff' is less than or equal to zero", async () => {
ecooper marked this conversation as resolved.
Show resolved Hide resolved
expect.assertions(1);

const config = { ...defaultConfig, max_backoff: 0 };
try {
new ChangeFeedClient(dummyStreamToken, config);
} catch (e: any) {
expect(e).toBeInstanceOf(RangeError);
}
});

it.each`
fieldName
${"long_type"}
${"httpClient"}
${"max_backoff"}
${"max_attempts"}
${"query_timeout_ms"}
${"secret"}
`(
"throws a TypeError if $fieldName provided is undefined",
async ({
fieldName,
}: {
fieldName: keyof ChangeFeedClientConfiguration;
}) => {
expect.assertions(1);

const config = { ...defaultConfig };
delete config[fieldName];
try {
new ChangeFeedClient(dummyStreamToken, config);
} catch (e: any) {
expect(e).toBeInstanceOf(TypeError);
}
},
);

it("throws a RangeError if 'max_attempts' is less than or equal to zero", async () => {
expect.assertions(1);

const config = { ...defaultConfig, max_attempts: 0 };
try {
new ChangeFeedClient(dummyStreamToken, config);
} catch (e: any) {
expect(e).toBeInstanceOf(RangeError);
}
});
});
238 changes: 238 additions & 0 deletions __tests__/functional/change-feed-client.test.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,238 @@
import {
ChangeFeedClient,
ChangeFeedClientConfiguration,
QueryRuntimeError,
StreamToken,
ThrottlingError,
} from "../../src";

const mockHttpResponse = {
status: 200,
body: JSON.stringify({
events: [],
cursor: "cursor=",
has_next: false,
stats: {
test: "test",
},
}),
headers: {
":status": 200,
"content-type": "application/json;charset=utf-8",
},
};
const mockHttpClient = {
request: jest
.fn()
.mockImplementation(() => Promise.resolve({ ...mockHttpResponse })),
close: jest.fn(),
};
const defaultConfig: ChangeFeedClientConfiguration = {
secret: "secret",
long_type: "number",
max_attempts: 3,
max_backoff: 20,
query_timeout_ms: 5000,
httpClient: mockHttpClient,
};
const dummyStreamToken = new StreamToken("dummy");

const fromAsync = async <T>(iterator: AsyncIterable<T>): Promise<T[]> => {
const res: T[] = [];
for await (const item of iterator) {
res.push(item);
}
return res;
};

describe("ChangeFeedClient", () => {
afterEach(() => {
jest.clearAllMocks();
});

it("can be instantiated directly with a token", () => {
new ChangeFeedClient(dummyStreamToken, defaultConfig);
});

it("can be instantiated directly with a lambda", async () => {
new ChangeFeedClient(
() => Promise.resolve(dummyStreamToken),
defaultConfig,
);
});
ecooper marked this conversation as resolved.
Show resolved Hide resolved

it("returns a valid page of events", async () => {
mockHttpClient.request.mockImplementationOnce(() =>
Promise.resolve({
...mockHttpResponse,
body: JSON.stringify({
cursor: "gsGCGmcGl+0aJPRzAAA=",
has_next: true,
events: [
{
type: "add",
data: {
"@doc": {
id: "411279302456246784",
coll: { "@mod": "ChangeFeedTest" },
ts: { "@time": "2024-10-09T14:49:17.620Z" },
value: { "@int": "1" },
},
},
txn_ts: 1728485357620000,
cursor: "gsGCGmcGl+0aJPRzAAA=",
stats: {
read_ops: 1,
storage_bytes_read: 66,
compute_ops: 1,
processing_time_ms: 0,
rate_limits_hit: [],
},
},
],
}),
}),
);

const changeFeed = new ChangeFeedClient(dummyStreamToken, defaultConfig);
const firstPage = await changeFeed.nextPage();

expect(firstPage.cursor).toBe("gsGCGmcGl+0aJPRzAAA=");
expect(firstPage.hasNext).toBe(true);

const events = Array.from(firstPage.events);
expect(events).toHaveLength(1);
expect(events[0].type).toBe("add");
expect(events[0].data).toEqual(expect.any(Object));

const secondPage = await changeFeed.nextPage();

expect(secondPage.cursor).toBe("cursor=");
expect(secondPage.hasNext).toBe(false);
expect(Array.from(secondPage.events)).toHaveLength(0);
});

it("uses a valid HTTPRequest", async () => {
const changeFeed = new ChangeFeedClient(dummyStreamToken, defaultConfig);
await fromAsync(changeFeed);

expect(mockHttpClient.request).toHaveBeenCalledWith(
expect.objectContaining({
client_timeout_ms: defaultConfig.query_timeout_ms,
ecooper marked this conversation as resolved.
Show resolved Hide resolved
headers: expect.objectContaining({
Authorization: "Bearer secret",
"x-format": "tagged",
"x-driver-env": expect.any(String),
}),
data: {
ecooper marked this conversation as resolved.
Show resolved Hide resolved
token: "dummy",
},
}),
);
});

it("uses page_size when set", async () => {
const changeFeed = new ChangeFeedClient(dummyStreamToken, {
...defaultConfig,
page_size: 10,
});
await fromAsync(changeFeed.flatten());

expect(mockHttpClient.request).toHaveBeenCalledWith(
expect.objectContaining({
data: {
token: "dummy",
page_size: 10,
},
}),
);
});

it("uses cursor when set", async () => {
const changeFeed = new ChangeFeedClient(dummyStreamToken, {
...defaultConfig,
cursor: "some-cursor=",
});
await fromAsync(changeFeed.flatten());

expect(mockHttpClient.request).toHaveBeenCalledWith(
expect.objectContaining({
data: {
token: "dummy",
cursor: "some-cursor=",
},
}),
);
});

it("uses start_ts when set", async () => {
const startTs = Date.now();
const changeFeed = new ChangeFeedClient(dummyStreamToken, {
...defaultConfig,
start_ts: startTs,
});

await fromAsync(changeFeed.flatten());

expect(mockHttpClient.request).toHaveBeenCalledWith(
expect.objectContaining({
data: {
token: "dummy",
start_ts: startTs,
},
}),
);
});

it("does not use start_ts if cursor is set", async () => {
const startTs = Date.now();
const changeFeed = new ChangeFeedClient(dummyStreamToken, {
...defaultConfig,
cursor: "cursor=",
start_ts: startTs,
});
await fromAsync(changeFeed.flatten());

expect(mockHttpClient.request).toHaveBeenCalledWith(
expect.objectContaining({
data: {
token: "dummy",
cursor: "cursor=",
},
}),
);
});

it("retries throttling errors", async () => {
mockHttpClient.request.mockImplementationOnce(() =>
Promise.reject(
new ThrottlingError({
error: {
code: "throttled",
message: "test",
},
}),
),
);

const changeFeed = new ChangeFeedClient(dummyStreamToken, defaultConfig);
await fromAsync(changeFeed.flatten());

expect(mockHttpClient.request).toHaveBeenCalledTimes(2);
ecooper marked this conversation as resolved.
Show resolved Hide resolved
});

it("throws an error for an error response", () => {
ecooper marked this conversation as resolved.
Show resolved Hide resolved
mockHttpClient.request.mockImplementationOnce(() =>
Promise.resolve({
...mockHttpResponse,
status: 400,
body: JSON.stringify({
error: { code: "test", message: "test" },
}),
}),
);

const changeFeed = new ChangeFeedClient(dummyStreamToken, defaultConfig);
expect(fromAsync(changeFeed.flatten())).rejects.toThrow(QueryRuntimeError);
});
});
Loading
Loading