forked from ashramsey/aws-lambda-graphql
-
Notifications
You must be signed in to change notification settings - Fork 0
/
DynamoDBEventProcessor.ts
144 lines (125 loc) · 5.1 KB
/
DynamoDBEventProcessor.ts
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
import { DynamoDBStreamHandler } from 'aws-lambda';
import { DynamoDB } from 'aws-sdk';
import { isAsyncIterable, getAsyncIterator } from 'iterall';
import { ExecutionResult } from 'graphql';
import { ArrayPubSub } from './ArrayPubSub';
import { IEventProcessor } from './types';
import { formatMessage } from './formatMessage';
import { execute } from './execute';
import { SERVER_EVENT_TYPES } from './protocol';
import { Server } from './Server';
import { IDynamoDBSubscriptionEvent } from './DynamoDBEventStore';
import { isTTLExpired } from './helpers/isTTLExpired';
/**
* Options accepted by the DynamoDBEventProcessor constructor.
* Every field is optional.
*/
interface DynamoDBEventProcessorOptions {
/**
* Called with the rejection reason when processing an event for a single
* subscriber fails. When omitted, the error is passed to the log function.
*/
onError?: (err: any) => void;
/**
* Enable logging of discarded, processed and sent events.
* Treated as false when omitted.
*/
debug?: boolean;
/**
* Allow injecting a logging function. console.log is used when omitted.
*/
log?: (message: any, ...optionalParams: any[]) => void;
}
/**
 * DynamoDBEventProcessor
 *
 * Processes DynamoDB stream events in order to send the events they carry
 * to subscribed clients.
 */
export class DynamoDBEventProcessor<TServer extends Server = Server>
  implements IEventProcessor<TServer, DynamoDBStreamHandler> {
  /** Receives the rejection reason when processing for one subscriber fails. */
  private onError: (err: any) => void;

  /** When true, discarded / processed / sent events are written through `log`. */
  private debug: boolean;

  /** Logging function; `console.log` unless another one was injected. */
  private log: (message: any, ...optionalParams: any[]) => void;

  /**
   * @param options Optional settings. `log` falls back to `console.log`,
   * `onError` falls back to logging the error, `debug` falls back to `false`.
   */
  constructor(options: DynamoDBEventProcessorOptions = {}) {
    this.log = options.log || console.log;
    this.onError = options.onError || ((err: any) => this.log(err));
    this.debug = options.debug || false;
  }

  /**
   * Creates the handler that consumes DynamoDB stream records.
   *
   * For every INSERT record the new image is unmarshalled into a subscription
   * event; unless its TTL is expired, the event is executed against the
   * operation of each subscriber and the first result (if any) is sent to the
   * subscriber's connection. Failures for a single subscriber are passed to
   * `onError` and do not stop processing of the other subscribers.
   *
   * @param server Server providing the connection manager, the subscription
   * manager and the GraphQL server options.
   * @returns Handler to be used for the DynamoDB stream.
   */
  public createHandler(server: TServer): DynamoDBStreamHandler {
    return async (lambdaEvent, lambdaContext) => {
      const connectionManager = server.getConnectionManager();
      const subscriptionManager = server.getSubscriptionManager();

      for (const record of lambdaEvent.Records) {
        // process only INSERT events
        if (record.eventName !== 'INSERT') {
          continue;
        }

        // The new image may be missing (e.g. the stream view type does not
        // include new images). Skip such a record instead of dereferencing it:
        // an error thrown here would fail the whole invocation and the same
        // records would be delivered again.
        const newImage = record.dynamodb && record.dynamodb.NewImage;
        if (newImage == null) {
          this.log('Discarded record : missing NewImage', record);
          continue;
        }

        // now construct event from dynamodb image
        const event: IDynamoDBSubscriptionEvent = DynamoDB.Converter.unmarshall(
          newImage as any,
        ) as any;

        // skip if event is expired
        if (isTTLExpired(event.ttl)) {
          if (this.debug) this.log('Discarded event : TTL expired', event);
          continue;
        }

        if (this.debug) this.log('Processing event', event);

        // iterate over subscribers that listen to this event
        // and for each connection:
        //  - create a schema (so we have subscribers registered in PubSub)
        //  - execute operation from event against schema
        //  - if iterator returns a result, send it to client
        //  - clean up subscriptions and follow with next page of subscriptions
        //  - if there are no more subscriptions, process next event
        // make sure that you won't throw any errors otherwise dynamo will call
        // handler with same events again
        for await (const subscribers of subscriptionManager.subscribersByEvent(
          event,
        )) {
          const promises = subscribers
            .map(async (subscriber) => {
              // create PubSub for this subscriber
              const pubSub = new ArrayPubSub([event]);

              const options = await server.createGraphQLServerOptions2(
                lambdaEvent as any,
                lambdaContext,
                {
                  // this allows createGraphQLServerOptions() to append more extra data
                  // to context from connection.data.context
                  connection: subscriber.connection,
                  operation: subscriber.operation,
                  pubSub,
                },
              );

              // execute operation by executing it and then publishing the event
              const iterable = await execute({
                connectionManager,
                subscriptionManager,
                schema: options.schema,
                event: lambdaEvent as any, // we don't have an API GW event here
                lambdaContext,
                context: options.context,
                connection: subscriber.connection,
                operation: subscriber.operation,
                pubSub,
                // subscriptions already exist, we are only replaying the event
                registerSubscriptions: false,
              });

              if (!isAsyncIterable(iterable)) {
                // something went wrong, probably there is an error
                this.log('Execution result: non iterable', event);
                return;
              }

              // only the first result is consumed: the PubSub above was
              // created with exactly one event
              const iterator = getAsyncIterator(iterable);
              const result: IteratorResult<ExecutionResult> = await iterator.next();

              if (result.value == null) {
                return;
              }

              if (this.debug) this.log('Send event ', result);

              await connectionManager.sendToConnection(
                subscriber.connection,
                formatMessage({
                  id: subscriber.operationId,
                  payload: result.value,
                  type: SERVER_EVENT_TYPES.GQL_DATA,
                }),
              );
            })
            // a failure for one subscriber must not reject the whole page
            .map((promise) => promise.catch(this.onError));

          await Promise.all(promises);
        }
      }
    };
  }
}