Skip to content

Commit

Permalink
feat: AbortController to signal request timeouts
Browse files Browse the repository at this point in the history
This commit adds a nodejs AbortController to the HTTP request object that is
used to signal cancellation for a function execution in the event of a timeout
or client disconnect.
  • Loading branch information
matthewrobertson committed Apr 26, 2024
1 parent 1c48074 commit b04f564
Show file tree
Hide file tree
Showing 10 changed files with 189 additions and 41 deletions.
1 change: 1 addition & 0 deletions docs/generated/api.md
Original file line number Diff line number Diff line change
Expand Up @@ -116,6 +116,7 @@ interface Request_2 extends Request_3 {
rawBody?: Buffer;
spanId?: string;
traceId?: string;
abortController?: AbortController;
}
export { Request_2 as Request }

Expand Down
4 changes: 4 additions & 0 deletions src/functions.ts
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,10 @@ export interface Request extends ExpressRequest {
* Cloud Trace span ID.
*/
spanId?: string;
/**
* An AbortController used to signal cancellation of a function invocation (e.g. in case of time out).
*/
abortController?: AbortController;
}

/**
Expand Down
10 changes: 5 additions & 5 deletions src/main.ts
Original file line number Diff line number Diff line change
Expand Up @@ -49,12 +49,12 @@ export const main = async () => {
// eslint-disable-next-line no-process-exit
process.exit(1);
}

const {userFunction, signatureType} = loadedFunction;
const server = getServer(
userFunction!,
signatureType,
options.enableExecutionId
);
// It is possible to overwrite the configured signature type in code so we
// reset it here based on what we loaded.
options.signatureType = signatureType;
const server = getServer(userFunction!, options);
const errorHandler = new ErrorHandler(server);
server
.listen(options.port, () => {
Expand Down
37 changes: 37 additions & 0 deletions src/middleware/timeout.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,37 @@
import {Request, Response} from '../functions';
import {NextFunction} from 'express';

export const timeoutMiddleware = (timeoutMilliseconds: number) => {
return (req: Request, res: Response, next: NextFunction) => {
// In modern versions of Node.js that support the AbortController API we add one to
// signal function timeout.
if (timeoutMilliseconds > 0 && AbortController) {
req.abortController = new AbortController();
req.setTimeout(timeoutMilliseconds);
let executionComplete = false;
res.on('timeout', () => {
// This event is triggered when the underlying socket times out due to inactivity.
if (!executionComplete) {
executionComplete = true;
req.abortController?.abort('timeout');
}
});
req.on('close', () => {
// This event is triggered when the underlying HTTP connection is closed. This can
// happen if the data plane times out the request, the client disconnects or the
// response is complete.
if (!executionComplete) {
executionComplete = true;
req.abortController?.abort('request closed');
}
});
req.on('end', () => {
// This event is triggered when the function execution completes and we
// write an HTTP response.
executionComplete = true;
});
}
// Always call next to continue middleware processing.
next();
};
};
20 changes: 20 additions & 0 deletions src/options.ts
Original file line number Diff line number Diff line change
Expand Up @@ -52,6 +52,10 @@ export interface FrameworkOptions {
* Whether or not to enable execution id support.
*/
enableExecutionId: boolean;
/**
* The request timeout.
*/
timeoutMilliseconds: number;
}

/**
Expand Down Expand Up @@ -112,6 +116,20 @@ const SignatureOption = new ConfigurableOption(
);
}
);
const TimeoutOption = new ConfigurableOption(
'timeout',
'CLOUD_RUN_TIMEOUT_SECONDS',
0,
(x: string | number) => {
if (typeof x === 'string') {
x = parseInt(x, 10);
}
if (isNaN(x) || x < 0) {
throw new OptionsError('Timeout must be a positive integer');
}
return x * 1000;
}
);

export const requiredNodeJsVersionForLogExecutionID = '13.0.0';
const ExecutionIdOption = new ConfigurableOption(
Expand Down Expand Up @@ -158,13 +176,15 @@ export const parseOptions = (
FunctionTargetOption.cliOption,
SignatureOption.cliOption,
SourceLocationOption.cliOption,
TimeoutOption.cliOption,
],
});
return {
port: PortOption.parse(argv, envVars),
target: FunctionTargetOption.parse(argv, envVars),
sourceLocation: SourceLocationOption.parse(argv, envVars),
signatureType: SignatureOption.parse(argv, envVars),
timeoutMilliseconds: TimeoutOption.parse(argv, envVars),
printHelp: cliArgs[2] === '-h' || cliArgs[2] === '--help',
enableExecutionId: ExecutionIdOption.parse(argv, envVars),
};
Expand Down
28 changes: 15 additions & 13 deletions src/server.ts
Original file line number Diff line number Diff line change
Expand Up @@ -17,27 +17,27 @@ import * as express from 'express';
import * as http from 'http';
import * as onFinished from 'on-finished';
import {HandlerFunction, Request, Response} from './functions';
import {SignatureType} from './types';
import {setLatestRes} from './invoker';
import {legacyPubSubEventMiddleware} from './pubsub_middleware';
import {cloudEventToBackgroundEventMiddleware} from './middleware/cloud_event_to_background_event';
import {backgroundEventToCloudEventMiddleware} from './middleware/background_event_to_cloud_event';
import {timeoutMiddleware} from './middleware/timeout';
import {wrapUserFunction} from './function_wrappers';
import {asyncLocalStorageMiddleware} from './async_local_storage';
import {executionContextMiddleware} from './execution_context';
import {errorHandler} from './logger';
import {FrameworkOptions} from './options';

/**
* Creates and configures an Express application and returns an HTTP server
* which will run it.
* @param userFunction User's function.
* @param functionSignatureType Type of user's function signature.
* @param options the configured Function Framework options.
* @return HTTP server.
*/
export function getServer(
userFunction: HandlerFunction,
functionSignatureType: SignatureType,
enableExecutionId: boolean
options: FrameworkOptions
): http.Server {
// App to use for function executions.
const app = express();
Expand Down Expand Up @@ -89,7 +89,7 @@ export function getServer(
};

// Apply middleware
if (functionSignatureType !== 'typed') {
if (options.signatureType !== 'typed') {
// If the function is not typed then JSON parsing can be done automatically, otherwise the
// functions format must determine deserialization.
app.use(bodyParser.json(cloudEventsBodySavingOptions));
Expand Down Expand Up @@ -120,23 +120,23 @@ export function getServer(
app.use(asyncLocalStorageMiddleware);

if (
functionSignatureType === 'event' ||
functionSignatureType === 'cloudevent'
options.signatureType === 'event' ||
options.signatureType === 'cloudevent'
) {
// If a Pub/Sub subscription is configured to invoke a user's function directly, the request body
// needs to be marshalled into the structure that wrapEventFunction expects. This unblocks local
// development with the Pub/Sub emulator
app.use(legacyPubSubEventMiddleware);
}

if (functionSignatureType === 'event') {
if (options.signatureType === 'event') {
app.use(cloudEventToBackgroundEventMiddleware);
}
if (functionSignatureType === 'cloudevent') {
if (options.signatureType === 'cloudevent') {
app.use(backgroundEventToCloudEventMiddleware);
}

if (functionSignatureType === 'http') {
if (options.signatureType === 'http') {
app.use('/favicon.ico|/robots.txt', (req, res) => {
// Neither crawlers nor browsers attempting to pull the icon find the body
// contents particularly useful, so we send nothing in the response body.
Expand All @@ -151,16 +151,18 @@ export function getServer(
});
}

app.use(timeoutMiddleware(options.timeoutMilliseconds));

// Set up the routes for the user's function
const requestHandler = wrapUserFunction(userFunction, functionSignatureType);
if (functionSignatureType === 'http') {
const requestHandler = wrapUserFunction(userFunction, options.signatureType);
if (options.signatureType === 'http') {
app.all('/*', requestHandler);
} else {
app.post('/*', requestHandler);
}

// Error Handler
if (enableExecutionId) {
if (options.enableExecutionId) {
app.use(errorHandler);
}

Expand Down
14 changes: 9 additions & 5 deletions src/testing.ts
Original file line number Diff line number Diff line change
Expand Up @@ -48,9 +48,13 @@ export const getTestServer = (functionName: string): Server => {
`The provided function "${functionName}" was not registered. Did you forget to require the module that defined it?`
);
}
return getServer(
registeredFunction.userFunction,
registeredFunction.signatureType,
/*enableExecutionId=*/ false
);
return getServer(registeredFunction.userFunction, {
signatureType: registeredFunction.signatureType,
enableExecutionId: false,
timeoutMilliseconds: 0,
port: '0',
target: '',
sourceLocation: '',
printHelp: false,
});
};
33 changes: 18 additions & 15 deletions test/integration/legacy_event.ts
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ import * as functions from '../../src/functions';
import * as sinon from 'sinon';
import {getServer} from '../../src/server';
import * as supertest from 'supertest';
import {SignatureType} from '../../src/types';

const TEST_CLOUD_EVENT = {
specversion: '1.0',
Expand All @@ -31,6 +32,16 @@ const TEST_CLOUD_EVENT = {
},
};

const testOptions = {
signatureType: 'event' as SignatureType,
enableExecutionId: false,
timeoutMilliseconds: 0,
port: '0',
target: '',
sourceLocation: '',
printHelp: false,
};

describe('Event Function', () => {
beforeEach(() => {
// Prevent log spew from the PubSub emulator request.
Expand Down Expand Up @@ -181,14 +192,10 @@ describe('Event Function', () => {
it(test.name, async () => {
let receivedData: {} | null = null;
let receivedContext: functions.CloudFunctionsContext | null = null;
const server = getServer(
(data: {}, context: functions.Context) => {
receivedData = data;
receivedContext = context as functions.CloudFunctionsContext;
},
'event',
/*enableExecutionId=*/ false
);
const server = getServer((data: {}, context: functions.Context) => {
receivedData = data;
receivedContext = context as functions.CloudFunctionsContext;
}, testOptions);
const requestHeaders = {
'Content-Type': 'application/json',
...test.headers,
Expand All @@ -204,13 +211,9 @@ describe('Event Function', () => {
});

it('returns a 500 if the function throws an exception', async () => {
const server = getServer(
() => {
throw 'I crashed';
},
'event',
/*enableExecutionId=*/ false
);
const server = getServer(() => {
throw 'I crashed';
}, testOptions);
await supertest(server)
.post('/')
.send({
Expand Down
49 changes: 49 additions & 0 deletions test/middleware/timeout.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,49 @@
import * as assert from 'assert';
import * as sinon from 'sinon';
import {NextFunction} from 'express';
import {Request, Response} from '../../src/functions';

import {timeoutMiddleware} from '../../src/middleware/timeout';

describe('timeoutMiddleware', () => {
let request: Request;
let response: Response;
let next: NextFunction;
beforeEach(() => {
request = {
setTimeout: sinon.spy(),
on: sinon.spy(),
} as unknown as Request;
response = {
on: sinon.spy(),
} as unknown as Response;
next = sinon.spy();
});

it('calls the next function', () => {
const middleware = timeoutMiddleware(1000);
middleware(request, response, next);
assert.strictEqual((next as sinon.SinonSpy).called, true);
});

it('adds an abort controller to the request', () => {
const middleware = timeoutMiddleware(1000);
middleware(request, response, next);
assert.strictEqual(!!request.abortController, true);
});

it('adds an abort controller to the request', () => {
const middleware = timeoutMiddleware(1000);
middleware(request, response, next);
assert.strictEqual(!!request.abortController, true);
});

it('sets the request timeout', () => {
const middleware = timeoutMiddleware(1000);
middleware(request, response, next);
assert.strictEqual(
(request.setTimeout as sinon.SinonSpy).calledWith(1000),
true
);
});
});
Loading

0 comments on commit b04f564

Please sign in to comment.