From b04f564788a066341f61100a6ee2d959f4fe6e13 Mon Sep 17 00:00:00 2001 From: Matthew Robertson Date: Thu, 25 Apr 2024 13:26:43 -0700 Subject: [PATCH] feat: AbortController to signal request timeouts This commit adds a nodejs AbortController to the HTTP request object that is used to signal cancellation for a function execution in the event of a timeout or client disconnect. --- docs/generated/api.md | 1 + src/functions.ts | 4 +++ src/main.ts | 10 +++---- src/middleware/timeout.ts | 37 ++++++++++++++++++++++++ src/options.ts | 20 +++++++++++++ src/server.ts | 28 +++++++++--------- src/testing.ts | 14 +++++---- test/integration/legacy_event.ts | 33 +++++++++++---------- test/middleware/timeout.ts | 49 ++++++++++++++++++++++++++++++++ test/options.ts | 34 ++++++++++++++++++++-- 10 files changed, 189 insertions(+), 41 deletions(-) create mode 100644 src/middleware/timeout.ts create mode 100644 test/middleware/timeout.ts diff --git a/docs/generated/api.md b/docs/generated/api.md index 56ea685d..52d9f2e1 100644 --- a/docs/generated/api.md +++ b/docs/generated/api.md @@ -116,6 +116,7 @@ interface Request_2 extends Request_3 { rawBody?: Buffer; spanId?: string; traceId?: string; + abortController?: AbortController; } export { Request_2 as Request } diff --git a/src/functions.ts b/src/functions.ts index cd37b8f4..75ddcd27 100644 --- a/src/functions.ts +++ b/src/functions.ts @@ -44,6 +44,10 @@ export interface Request extends ExpressRequest { * Cloud Trace span ID. */ spanId?: string; + /** + * An AbortController used to signal cancellation of a function invocation (e.g. in case of time out). + */ + abortController?: AbortController; } /** diff --git a/src/main.ts b/src/main.ts index 3dd98398..2f1684c7 100644 --- a/src/main.ts +++ b/src/main.ts @@ -49,12 +49,12 @@ export const main = async () => { // eslint-disable-next-line no-process-exit process.exit(1); } + const {userFunction, signatureType} = loadedFunction; - const server = getServer( - userFunction!, - signatureType, - options.enableExecutionId - ); + // It is possible to overwrite the configured signature type in code so we + // reset it here based on what we loaded. + options.signatureType = signatureType; + const server = getServer(userFunction!, options); const errorHandler = new ErrorHandler(server); server .listen(options.port, () => { diff --git a/src/middleware/timeout.ts b/src/middleware/timeout.ts new file mode 100644 index 00000000..fd0197b3 --- /dev/null +++ b/src/middleware/timeout.ts @@ -0,0 +1,37 @@ +import {Request, Response} from '../functions'; +import {NextFunction} from 'express'; + +export const timeoutMiddleware = (timeoutMilliseconds: number) => { + return (req: Request, res: Response, next: NextFunction) => { + // In modern versions of Node.js that support the AbortController API we add one to + // signal function timeout. + if (timeoutMilliseconds > 0 && AbortController) { + req.abortController = new AbortController(); + req.setTimeout(timeoutMilliseconds); + let executionComplete = false; + res.on('timeout', () => { + // This event is triggered when the underlying socket times out due to inactivity. + if (!executionComplete) { + executionComplete = true; + req.abortController?.abort('timeout'); + } + }); + req.on('close', () => { + // This event is triggered when the underlying HTTP connection is closed. This can + // happen if the data plane times out the request, the client disconnects or the + // response is complete. + if (!executionComplete) { + executionComplete = true; + req.abortController?.abort('request closed'); + } + }); + req.on('end', () => { + // This event is triggered when the function execution completes and we + // write an HTTP response. + executionComplete = true; + }); + } + // Always call next to continue middleware processing. + next(); + }; +}; diff --git a/src/options.ts b/src/options.ts index d761b86f..7f1cd7e3 100644 --- a/src/options.ts +++ b/src/options.ts @@ -52,6 +52,10 @@ export interface FrameworkOptions { * Whether or not to enable execution id support. */ enableExecutionId: boolean; + /** + * The request timeout. + */ + timeoutMilliseconds: number; } /** @@ -112,6 +116,20 @@ const SignatureOption = new ConfigurableOption( ); } ); +const TimeoutOption = new ConfigurableOption( + 'timeout', + 'CLOUD_RUN_TIMEOUT_SECONDS', + 0, + (x: string | number) => { + if (typeof x === 'string') { + x = parseInt(x, 10); + } + if (isNaN(x) || x < 0) { + throw new OptionsError('Timeout must be a positive integer'); + } + return x * 1000; + } +); export const requiredNodeJsVersionForLogExecutionID = '13.0.0'; const ExecutionIdOption = new ConfigurableOption( @@ -158,6 +176,7 @@ export const parseOptions = ( FunctionTargetOption.cliOption, SignatureOption.cliOption, SourceLocationOption.cliOption, + TimeoutOption.cliOption, ], }); return { @@ -165,6 +184,7 @@ export const parseOptions = ( target: FunctionTargetOption.parse(argv, envVars), sourceLocation: SourceLocationOption.parse(argv, envVars), signatureType: SignatureOption.parse(argv, envVars), + timeoutMilliseconds: TimeoutOption.parse(argv, envVars), printHelp: cliArgs[2] === '-h' || cliArgs[2] === '--help', enableExecutionId: ExecutionIdOption.parse(argv, envVars), }; diff --git a/src/server.ts b/src/server.ts index e4080d5c..9ce9396a 100644 --- a/src/server.ts +++ b/src/server.ts @@ -17,27 +17,27 @@ import * as express from 'express'; import * as http from 'http'; import * as onFinished from 'on-finished'; import {HandlerFunction, Request, Response} from './functions'; -import {SignatureType} from './types'; import {setLatestRes} from './invoker'; import {legacyPubSubEventMiddleware} from './pubsub_middleware'; import {cloudEventToBackgroundEventMiddleware} from './middleware/cloud_event_to_background_event'; import {backgroundEventToCloudEventMiddleware} from './middleware/background_event_to_cloud_event'; +import {timeoutMiddleware} from './middleware/timeout'; import {wrapUserFunction} from './function_wrappers'; import {asyncLocalStorageMiddleware} from './async_local_storage'; import {executionContextMiddleware} from './execution_context'; import {errorHandler} from './logger'; +import {FrameworkOptions} from './options'; /** * Creates and configures an Express application and returns an HTTP server * which will run it. * @param userFunction User's function. - * @param functionSignatureType Type of user's function signature. + * @param options the configured Function Framework options. * @return HTTP server. */ export function getServer( userFunction: HandlerFunction, - functionSignatureType: SignatureType, - enableExecutionId: boolean + options: FrameworkOptions ): http.Server { // App to use for function executions. const app = express(); @@ -89,7 +89,7 @@ export function getServer( }; // Apply middleware - if (functionSignatureType !== 'typed') { + if (options.signatureType !== 'typed') { // If the function is not typed then JSON parsing can be done automatically, otherwise the // functions format must determine deserialization. app.use(bodyParser.json(cloudEventsBodySavingOptions)); @@ -120,8 +120,8 @@ export function getServer( app.use(asyncLocalStorageMiddleware); if ( - functionSignatureType === 'event' || - functionSignatureType === 'cloudevent' + options.signatureType === 'event' || + options.signatureType === 'cloudevent' ) { // If a Pub/Sub subscription is configured to invoke a user's function directly, the request body // needs to be marshalled into the structure that wrapEventFunction expects. This unblocks local @@ -129,14 +129,14 @@ export function getServer( app.use(legacyPubSubEventMiddleware); } - if (functionSignatureType === 'event') { + if (options.signatureType === 'event') { app.use(cloudEventToBackgroundEventMiddleware); } - if (functionSignatureType === 'cloudevent') { + if (options.signatureType === 'cloudevent') { app.use(backgroundEventToCloudEventMiddleware); } - if (functionSignatureType === 'http') { + if (options.signatureType === 'http') { app.use('/favicon.ico|/robots.txt', (req, res) => { // Neither crawlers nor browsers attempting to pull the icon find the body // contents particularly useful, so we send nothing in the response body. @@ -151,16 +151,18 @@ export function getServer( }); } + app.use(timeoutMiddleware(options.timeoutMilliseconds)); + // Set up the routes for the user's function - const requestHandler = wrapUserFunction(userFunction, functionSignatureType); - if (functionSignatureType === 'http') { + const requestHandler = wrapUserFunction(userFunction, options.signatureType); + if (options.signatureType === 'http') { app.all('/*', requestHandler); } else { app.post('/*', requestHandler); } // Error Handler - if (enableExecutionId) { + if (options.enableExecutionId) { app.use(errorHandler); } diff --git a/src/testing.ts b/src/testing.ts index f66baad7..3360a000 100644 --- a/src/testing.ts +++ b/src/testing.ts @@ -48,9 +48,13 @@ export const getTestServer = (functionName: string): Server => { `The provided function "${functionName}" was not registered. Did you forget to require the module that defined it?` ); } - return getServer( - registeredFunction.userFunction, - registeredFunction.signatureType, - /*enableExecutionId=*/ false - ); + return getServer(registeredFunction.userFunction, { + signatureType: registeredFunction.signatureType, + enableExecutionId: false, + timeoutMilliseconds: 0, + port: '0', + target: '', + sourceLocation: '', + printHelp: false, + }); }; diff --git a/test/integration/legacy_event.ts b/test/integration/legacy_event.ts index 3213acd9..a25250e6 100644 --- a/test/integration/legacy_event.ts +++ b/test/integration/legacy_event.ts @@ -17,6 +17,7 @@ import * as functions from '../../src/functions'; import * as sinon from 'sinon'; import {getServer} from '../../src/server'; import * as supertest from 'supertest'; +import {SignatureType} from '../../src/types'; const TEST_CLOUD_EVENT = { specversion: '1.0', @@ -31,6 +32,16 @@ const TEST_CLOUD_EVENT = { }, }; +const testOptions = { + signatureType: 'event' as SignatureType, + enableExecutionId: false, + timeoutMilliseconds: 0, + port: '0', + target: '', + sourceLocation: '', + printHelp: false, +}; + describe('Event Function', () => { beforeEach(() => { // Prevent log spew from the PubSub emulator request. @@ -181,14 +192,10 @@ describe('Event Function', () => { it(test.name, async () => { let receivedData: {} | null = null; let receivedContext: functions.CloudFunctionsContext | null = null; - const server = getServer( - (data: {}, context: functions.Context) => { - receivedData = data; - receivedContext = context as functions.CloudFunctionsContext; - }, - 'event', - /*enableExecutionId=*/ false - ); + const server = getServer((data: {}, context: functions.Context) => { + receivedData = data; + receivedContext = context as functions.CloudFunctionsContext; + }, testOptions); const requestHeaders = { 'Content-Type': 'application/json', ...test.headers, @@ -204,13 +211,9 @@ describe('Event Function', () => { }); it('returns a 500 if the function throws an exception', async () => { - const server = getServer( - () => { - throw 'I crashed'; - }, - 'event', - /*enableExecutionId=*/ false - ); + const server = getServer(() => { + throw 'I crashed'; + }, testOptions); await supertest(server) .post('/') .send({ diff --git a/test/middleware/timeout.ts b/test/middleware/timeout.ts new file mode 100644 index 00000000..8997cdca --- /dev/null +++ b/test/middleware/timeout.ts @@ -0,0 +1,49 @@ +import * as assert from 'assert'; +import * as sinon from 'sinon'; +import {NextFunction} from 'express'; +import {Request, Response} from '../../src/functions'; + +import {timeoutMiddleware} from '../../src/middleware/timeout'; + +describe('timeoutMiddleware', () => { + let request: Request; + let response: Response; + let next: NextFunction; + beforeEach(() => { + request = { + setTimeout: sinon.spy(), + on: sinon.spy(), + } as unknown as Request; + response = { + on: sinon.spy(), + } as unknown as Response; + next = sinon.spy(); + }); + + it('calls the next function', () => { + const middleware = timeoutMiddleware(1000); + middleware(request, response, next); + assert.strictEqual((next as sinon.SinonSpy).called, true); + }); + + it('adds an abort controller to the request', () => { + const middleware = timeoutMiddleware(1000); + middleware(request, response, next); + assert.strictEqual(!!request.abortController, true); + }); + + it('adds an abort controller to the request', () => { + const middleware = timeoutMiddleware(1000); + middleware(request, response, next); + assert.strictEqual(!!request.abortController, true); + }); + + it('sets the request timeout', () => { + const middleware = timeoutMiddleware(1000); + middleware(request, response, next); + assert.strictEqual( + (request.setTimeout as sinon.SinonSpy).calledWith(1000), + true + ); + }); +}); diff --git a/test/options.ts b/test/options.ts index e928a0e4..7193f8fc 100644 --- a/test/options.ts +++ b/test/options.ts @@ -59,6 +59,7 @@ describe('parseOptions', () => { signatureType: 'http', printHelp: false, enableExecutionId: false, + timeoutMilliseconds: 0, }, }, { @@ -72,6 +73,8 @@ describe('parseOptions', () => { '--signature-type', 'cloudevent', '--source=/source', + '--timeout', + '6', ], envVars: {}, expectedOptions: { @@ -81,6 +84,7 @@ describe('parseOptions', () => { signatureType: 'cloudevent', printHelp: false, enableExecutionId: false, + timeoutMilliseconds: 6000, }, }, { @@ -91,6 +95,7 @@ describe('parseOptions', () => { FUNCTION_TARGET: 'helloWorld', FUNCTION_SIGNATURE_TYPE: 'cloudevent', FUNCTION_SOURCE: '/source', + CLOUD_RUN_TIMEOUT_SECONDS: '2', }, expectedOptions: { port: '1234', @@ -99,6 +104,7 @@ describe('parseOptions', () => { signatureType: 'cloudevent', printHelp: false, enableExecutionId: false, + timeoutMilliseconds: 2000, }, }, { @@ -112,12 +118,14 @@ describe('parseOptions', () => { '--signature-type', 'cloudevent', '--source=/source', + '--timeout=3', ], envVars: { PORT: '4567', FUNCTION_TARGET: 'fooBar', FUNCTION_SIGNATURE_TYPE: 'event', FUNCTION_SOURCE: '/somewhere/else', + CLOUD_RUN_TIMEOUT_SECONDS: '5', }, expectedOptions: { port: '1234', @@ -126,6 +134,7 @@ describe('parseOptions', () => { signatureType: 'cloudevent', printHelp: false, enableExecutionId: false, + timeoutMilliseconds: 3000, }, }, ]; @@ -197,9 +206,28 @@ describe('parseOptions', () => { }); }); - it('throws an exception for invalid signature types', () => { - assert.throws(() => { - parseOptions(['bin/node', 'index.js', '--signature-type=monkey']); + const validationErrorTestCases: TestData[] = [ + { + name: 'signature type is invalid', + cliOpts: ['bin/node', 'index.js', '--signature-type=monkey'], + envVars: {}, + }, + { + name: 'timeout is not a number', + cliOpts: ['bin/node', '/index.js', '--timeout=foobar'], + envVars: {}, + }, + { + name: 'timeout is a negative number', + cliOpts: ['bin/node', '/index.js', '--timeout=-10'], + envVars: {}, + }, + ]; + validationErrorTestCases.forEach(testCase => { + it('throws an exception when ' + testCase.name, () => { + assert.throws(() => { + parseOptions(testCase.cliOpts, testCase.envVars); + }); }); }); });