Skip to content

Commit 8e669a4

Browse files
author
Roger Torres
committed
fix: drain broker in serverless command
1 parent 255570a commit 8e669a4

File tree

3 files changed

+46
-34
lines changed

3 files changed

+46
-34
lines changed

libs/eventually-aws/src/lambda/command.ts

+7
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,7 @@
11
import {
22
CommandHandlerFactory,
33
app,
4+
broker,
45
camelize,
56
client
67
} from "@rotorsoft/eventually";
@@ -54,6 +55,12 @@ export const command = async ({
5455
actor
5556
}
5657
);
58+
59+
// TODO: make this optional
60+
// Since we are in a serverless world that won't wait for external async operations to complete,
61+
// we can force a broker drain here, allowing policies and projectors to consume the new events
62+
if (snap?.event) await broker().drain();
63+
5764
return Ok(
5865
snap,
5966
snap?.event?.version ? { ETag: snap?.event?.version } : undefined

libs/eventually/src/adapters/InMemoryBroker.ts

+38-33
Original file line numberDiff line numberDiff line change
@@ -16,17 +16,21 @@ const event_handler_types: Array<ArtifactType> = [
1616
* @param timeout lease expiration time (in ms) when polling the store
1717
* @param limit max number of events to drain in each try
1818
* @param delay debounce delay (in ms) to drain
19+
* @param subscribed to subscribe the broker to commit events - set to false when serverless
1920
*/
20-
export const InMemoryBroker = ({
21-
timeout,
22-
limit,
23-
delay
24-
}: {
25-
timeout: number;
26-
limit: number;
27-
delay: number;
21+
export const InMemoryBroker = (options?: {
22+
timeout?: number;
23+
limit?: number;
24+
delay?: number;
25+
subscribed?: boolean;
2826
}): Broker => {
2927
const name = "InMemoryBroker";
28+
const {
29+
timeout = 5000,
30+
limit = 10,
31+
delay = 500,
32+
subscribed = true
33+
} = options ?? {};
3034

3135
// connect private event handlers only
3236
// NOTE: public consumers should be connected by an external broker service
@@ -58,34 +62,35 @@ export const InMemoryBroker = ({
5862
const __drain = throttle(drainAll, delay);
5963

6064
// subscribe broker to commit events
61-
app().on("commit", async ({ factory, snapshot }) => {
62-
// commits STATE_EVENT - artifact must be configured in app builder
63-
if (snapshot) {
64-
const commit = app().commits.get(factory.name);
65-
if (commit && commit(snapshot)) {
66-
try {
67-
const { id, stream, name, metadata, version } = snapshot.event!;
68-
return await store().commit(
69-
stream,
70-
[
65+
subscribed &&
66+
app().on("commit", async ({ factory, snapshot }) => {
67+
// commits STATE_EVENT - artifact must be configured in app builder
68+
if (snapshot) {
69+
const commit = app().commits.get(factory.name);
70+
if (commit && commit(snapshot)) {
71+
try {
72+
const { id, stream, name, metadata, version } = snapshot.event!;
73+
return await store().commit(
74+
stream,
75+
[
76+
{
77+
name: STATE_EVENT,
78+
data: snapshot.state
79+
}
80+
],
7181
{
72-
name: STATE_EVENT,
73-
data: snapshot.state
74-
}
75-
],
76-
{
77-
correlation: metadata.correlation,
78-
causation: { event: { id, name, stream } }
79-
},
80-
version // IMPORTANT! - state events should be committed right after the snapshot's event
81-
);
82-
} catch (error) {
83-
log().error(error);
82+
correlation: metadata.correlation,
83+
causation: { event: { id, name, stream } }
84+
},
85+
version // IMPORTANT! - state events should be committed right after the snapshot's event
86+
);
87+
} catch (error) {
88+
log().error(error);
89+
}
8490
}
8591
}
86-
}
87-
__drain();
88-
});
92+
__drain();
93+
});
8994

9095
return {
9196
name,

libs/eventually/src/ports.ts

+1-1
Original file line numberDiff line numberDiff line change
@@ -111,7 +111,7 @@ export const client = port(function client(client?: Client & Disposable) {
111111
* @remarks Global port to internal broker
112112
*/
113113
export const broker = port(function broker(broker?: Broker) {
114-
return broker || InMemoryBroker({ timeout: 5000, limit: 10, delay: 500 });
114+
return broker || InMemoryBroker();
115115
});
116116

117117
/**

0 commit comments

Comments
 (0)