diff --git a/packages/grpc-js/package.json b/packages/grpc-js/package.json index 5087a692f..f0c5da780 100644 --- a/packages/grpc-js/package.json +++ b/packages/grpc-js/package.json @@ -1,6 +1,6 @@ { "name": "@grpc/grpc-js", - "version": "1.10.8", + "version": "1.10.11", "description": "gRPC Library for Node - pure JS implementation", "homepage": "https://grpc.io/", "repository": "https://github.com/grpc/grpc-node/tree/master/packages/grpc-js", diff --git a/packages/grpc-js/src/client.ts b/packages/grpc-js/src/client.ts index 995d5b328..dc75ac482 100644 --- a/packages/grpc-js/src/client.ts +++ b/packages/grpc-js/src/client.ts @@ -330,7 +330,7 @@ export class Client { // eslint-disable-next-line @typescript-eslint/no-explicit-any onReceiveMessage(message: any) { if (responseMessage !== null) { - call.cancelWithStatus(Status.INTERNAL, 'Too many responses received'); + call.cancelWithStatus(Status.UNIMPLEMENTED, 'Too many responses received'); } responseMessage = message; }, @@ -345,7 +345,7 @@ export class Client { callProperties.callback!( callErrorFromStatus( { - code: Status.INTERNAL, + code: Status.UNIMPLEMENTED, details: 'No message received', metadata: status.metadata, }, @@ -463,9 +463,10 @@ export class Client { // eslint-disable-next-line @typescript-eslint/no-explicit-any onReceiveMessage(message: any) { if (responseMessage !== null) { - call.cancelWithStatus(Status.INTERNAL, 'Too many responses received'); + call.cancelWithStatus(Status.UNIMPLEMENTED, 'Too many responses received'); } responseMessage = message; + call.startRead(); }, onReceiveStatus(status: StatusObject) { if (receivedStatus) { @@ -478,7 +479,7 @@ export class Client { callProperties.callback!( callErrorFromStatus( { - code: Status.INTERNAL, + code: Status.UNIMPLEMENTED, details: 'No message received', metadata: status.metadata, }, diff --git a/packages/grpc-js/src/compression-filter.ts b/packages/grpc-js/src/compression-filter.ts index 136311ad5..189749f03 100644 --- a/packages/grpc-js/src/compression-filter.ts +++ b/packages/grpc-js/src/compression-filter.ts @@ -21,7 +21,7 @@ import { WriteObject, WriteFlags } from './call-interface'; import { Channel } from './channel'; import { ChannelOptions } from './channel-options'; import { CompressionAlgorithms } from './compression-algorithms'; -import { LogVerbosity } from './constants'; +import { DEFAULT_MAX_RECEIVE_MESSAGE_LENGTH, DEFAULT_MAX_SEND_MESSAGE_LENGTH, LogVerbosity, Status } from './constants'; import { BaseFilter, Filter, FilterFactory } from './filter'; import * as logging from './logging'; import { Metadata, MetadataValue } from './metadata'; @@ -98,6 +98,10 @@ class IdentityHandler extends CompressionHandler { } class DeflateHandler extends CompressionHandler { + constructor(private maxRecvMessageLength: number) { + super(); + } + compressMessage(message: Buffer) { return new Promise((resolve, reject) => { zlib.deflate(message, (err, output) => { @@ -112,18 +116,34 @@ class DeflateHandler extends CompressionHandler { decompressMessage(message: Buffer) { return new Promise((resolve, reject) => { - zlib.inflate(message, (err, output) => { - if (err) { - reject(err); - } else { - resolve(output); + let totalLength = 0; + const messageParts: Buffer[] = []; + const decompresser = zlib.createInflate(); + decompresser.on('data', (chunk: Buffer) => { + messageParts.push(chunk); + totalLength += chunk.byteLength; + if (this.maxRecvMessageLength !== -1 && totalLength > this.maxRecvMessageLength) { + decompresser.destroy(); + reject({ + code: Status.RESOURCE_EXHAUSTED, + details: `Received message that decompresses to a size larger than ${this.maxRecvMessageLength}` + }); } }); + decompresser.on('end', () => { + resolve(Buffer.concat(messageParts)); + }); + decompresser.write(message); + decompresser.end(); }); } } class GzipHandler extends CompressionHandler { + constructor(private maxRecvMessageLength: number) { + super(); + } + compressMessage(message: Buffer) { return new Promise((resolve, reject) => { zlib.gzip(message, (err, output) => { @@ -138,13 +158,25 @@ class GzipHandler extends CompressionHandler { decompressMessage(message: Buffer) { return new Promise((resolve, reject) => { - zlib.unzip(message, (err, output) => { - if (err) { - reject(err); - } else { - resolve(output); + let totalLength = 0; + const messageParts: Buffer[] = []; + const decompresser = zlib.createGunzip(); + decompresser.on('data', (chunk: Buffer) => { + messageParts.push(chunk); + totalLength += chunk.byteLength; + if (this.maxRecvMessageLength !== -1 && totalLength > this.maxRecvMessageLength) { + decompresser.destroy(); + reject({ + code: Status.RESOURCE_EXHAUSTED, + details: `Received message that decompresses to a size larger than ${this.maxRecvMessageLength}` + }); } }); + decompresser.on('end', () => { + resolve(Buffer.concat(messageParts)); + }); + decompresser.write(message); + decompresser.end(); }); } } @@ -169,14 +201,14 @@ class UnknownHandler extends CompressionHandler { } } -function getCompressionHandler(compressionName: string): CompressionHandler { +function getCompressionHandler(compressionName: string, maxReceiveMessageSize: number): CompressionHandler { switch (compressionName) { case 'identity': return new IdentityHandler(); case 'deflate': - return new DeflateHandler(); + return new DeflateHandler(maxReceiveMessageSize); case 'gzip': - return new GzipHandler(); + return new GzipHandler(maxReceiveMessageSize); default: return new UnknownHandler(compressionName); } @@ -186,6 +218,8 @@ export class CompressionFilter extends BaseFilter implements Filter { private sendCompression: CompressionHandler = new IdentityHandler(); private receiveCompression: CompressionHandler = new IdentityHandler(); private currentCompressionAlgorithm: CompressionAlgorithm = 'identity'; + private maxReceiveMessageLength: number; + private maxSendMessageLength: number; constructor( channelOptions: ChannelOptions, @@ -195,6 +229,8 @@ export class CompressionFilter extends BaseFilter implements Filter { const compressionAlgorithmKey = channelOptions['grpc.default_compression_algorithm']; + this.maxReceiveMessageLength = channelOptions['grpc.max_receive_message_length'] ?? DEFAULT_MAX_RECEIVE_MESSAGE_LENGTH; + this.maxSendMessageLength = channelOptions['grpc.max_send_message_length'] ?? DEFAULT_MAX_SEND_MESSAGE_LENGTH; if (compressionAlgorithmKey !== undefined) { if (isCompressionAlgorithmKey(compressionAlgorithmKey)) { const clientSelectedEncoding = CompressionAlgorithms[ @@ -215,7 +251,8 @@ export class CompressionFilter extends BaseFilter implements Filter { ) { this.currentCompressionAlgorithm = clientSelectedEncoding; this.sendCompression = getCompressionHandler( - this.currentCompressionAlgorithm + this.currentCompressionAlgorithm, + -1 ); } } else { @@ -247,7 +284,7 @@ export class CompressionFilter extends BaseFilter implements Filter { if (receiveEncoding.length > 0) { const encoding: MetadataValue = receiveEncoding[0]; if (typeof encoding === 'string') { - this.receiveCompression = getCompressionHandler(encoding); + this.receiveCompression = getCompressionHandler(encoding, this.maxReceiveMessageLength); } } metadata.remove('grpc-encoding'); @@ -279,6 +316,12 @@ export class CompressionFilter extends BaseFilter implements Filter { * and the output is a framed and possibly compressed message. For this * reason, this filter should be at the bottom of the filter stack */ const resolvedMessage: WriteObject = await message; + if (this.maxSendMessageLength !== -1 && resolvedMessage.message.length > this.maxSendMessageLength) { + throw { + code: Status.RESOURCE_EXHAUSTED, + details: `Attempted to send message with a size larger than ${this.maxSendMessageLength}` + }; + } let compress: boolean; if (this.sendCompression instanceof IdentityHandler) { compress = false; diff --git a/packages/grpc-js/src/internal-channel.ts b/packages/grpc-js/src/internal-channel.ts index 469ace557..857f2a4eb 100644 --- a/packages/grpc-js/src/internal-channel.ts +++ b/packages/grpc-js/src/internal-channel.ts @@ -20,7 +20,7 @@ import { ChannelOptions } from './channel-options'; import { ResolvingLoadBalancer } from './resolving-load-balancer'; import { SubchannelPool, getSubchannelPool } from './subchannel-pool'; import { ChannelControlHelper } from './load-balancer'; -import { UnavailablePicker, Picker, QueuePicker } from './picker'; +import { UnavailablePicker, Picker, QueuePicker, PickArgs, PickResult, PickResultType } from './picker'; import { Metadata } from './metadata'; import { Status, LogVerbosity, Propagate } from './constants'; import { FilterStackFactory } from './filter-stack'; @@ -33,7 +33,6 @@ import { } from './resolver'; import { trace } from './logging'; import { SubchannelAddress } from './subchannel-address'; -import { MaxMessageSizeFilterFactory } from './max-message-size-filter'; import { mapProxyName } from './http_proxy'; import { GrpcUri, parseUri, uriToString } from './uri-parser'; import { ServerSurfaceCall } from './server-call'; @@ -144,6 +143,22 @@ class ChannelSubchannelWrapper } } +class ShutdownPicker implements Picker { + pick(pickArgs: PickArgs): PickResult { + return { + pickResultType: PickResultType.DROP, + status: { + code: Status.UNAVAILABLE, + details: 'Channel closed before call started', + metadata: new Metadata() + }, + subchannel: null, + onCallStarted: null, + onCallEnded: null + } + } +} + export class InternalChannel { private readonly resolvingLoadBalancer: ResolvingLoadBalancer; private readonly subchannelPool: SubchannelPool; @@ -402,7 +417,6 @@ export class InternalChannel { } ); this.filterStackFactory = new FilterStackFactory([ - new MaxMessageSizeFilterFactory(this.options), new CompressionFilterFactory(this, this.options), ]); this.trace( @@ -538,7 +552,9 @@ export class InternalChannel { } getConfig(method: string, metadata: Metadata): GetConfigResult { - this.resolvingLoadBalancer.exitIdle(); + if (this.connectivityState !== ConnectivityState.SHUTDOWN) { + this.resolvingLoadBalancer.exitIdle(); + } if (this.configSelector) { return { type: 'SUCCESS', @@ -747,6 +763,15 @@ export class InternalChannel { close() { this.resolvingLoadBalancer.destroy(); this.updateState(ConnectivityState.SHUTDOWN); + this.currentPicker = new ShutdownPicker(); + for (const call of this.configSelectionQueue) { + call.cancelWithStatus(Status.UNAVAILABLE, 'Channel closed before call started'); + } + this.configSelectionQueue = []; + for (const call of this.pickQueue) { + call.cancelWithStatus(Status.UNAVAILABLE, 'Channel closed before call started'); + } + this.pickQueue = []; clearInterval(this.callRefTimer); if (this.idleTimer) { clearTimeout(this.idleTimer); diff --git a/packages/grpc-js/src/load-balancer-pick-first.ts b/packages/grpc-js/src/load-balancer-pick-first.ts index f6c43b33d..e042e1161 100644 --- a/packages/grpc-js/src/load-balancer-pick-first.ts +++ b/packages/grpc-js/src/load-balancer-pick-first.ts @@ -32,7 +32,7 @@ import { PickResultType, UnavailablePicker, } from './picker'; -import { Endpoint, SubchannelAddress } from './subchannel-address'; +import { Endpoint, SubchannelAddress, subchannelAddressToString } from './subchannel-address'; import * as logging from './logging'; import { LogVerbosity } from './constants'; import { @@ -348,7 +348,6 @@ export class PickFirstLoadBalancer implements LoadBalancer { if (newState !== ConnectivityState.READY) { this.removeCurrentPick(); this.calculateAndReportNewState(); - this.requestReresolution(); } return; } @@ -483,6 +482,15 @@ export class PickFirstLoadBalancer implements LoadBalancer { subchannel: this.channelControlHelper.createSubchannel(address, {}), hasReportedTransientFailure: false, })); + trace('connectToAddressList([' + addressList.map(address => subchannelAddressToString(address)) + '])'); + for (const { subchannel } of newChildrenList) { + if (subchannel.getConnectivityState() === ConnectivityState.READY) { + this.channelControlHelper.addChannelzChild(subchannel.getChannelzRef()); + subchannel.addConnectivityStateListener(this.subchannelStateListener); + this.pickSubchannel(subchannel); + return; + } + } /* Ref each subchannel before resetting the list, to ensure that * subchannels shared between the list don't drop to 0 refs during the * transition. */ @@ -494,10 +502,6 @@ export class PickFirstLoadBalancer implements LoadBalancer { this.children = newChildrenList; for (const { subchannel } of this.children) { subchannel.addConnectivityStateListener(this.subchannelStateListener); - if (subchannel.getConnectivityState() === ConnectivityState.READY) { - this.pickSubchannel(subchannel); - return; - } } for (const child of this.children) { if ( @@ -527,6 +531,7 @@ export class PickFirstLoadBalancer implements LoadBalancer { const rawAddressList = ([] as SubchannelAddress[]).concat( ...endpointList.map(endpoint => endpoint.addresses) ); + trace('updateAddressList([' + rawAddressList.map(address => subchannelAddressToString(address)) + '])'); if (rawAddressList.length === 0) { throw new Error('No addresses in endpoint list passed to pick_first'); } diff --git a/packages/grpc-js/src/max-message-size-filter.ts b/packages/grpc-js/src/max-message-size-filter.ts deleted file mode 100644 index b6df374b2..000000000 --- a/packages/grpc-js/src/max-message-size-filter.ts +++ /dev/null @@ -1,88 +0,0 @@ -/* - * Copyright 2020 gRPC authors. - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - * - */ - -import { BaseFilter, Filter, FilterFactory } from './filter'; -import { WriteObject } from './call-interface'; -import { - Status, - DEFAULT_MAX_SEND_MESSAGE_LENGTH, - DEFAULT_MAX_RECEIVE_MESSAGE_LENGTH, -} from './constants'; -import { ChannelOptions } from './channel-options'; -import { Metadata } from './metadata'; - -export class MaxMessageSizeFilter extends BaseFilter implements Filter { - private maxSendMessageSize: number = DEFAULT_MAX_SEND_MESSAGE_LENGTH; - private maxReceiveMessageSize: number = DEFAULT_MAX_RECEIVE_MESSAGE_LENGTH; - constructor(options: ChannelOptions) { - super(); - if ('grpc.max_send_message_length' in options) { - this.maxSendMessageSize = options['grpc.max_send_message_length']!; - } - if ('grpc.max_receive_message_length' in options) { - this.maxReceiveMessageSize = options['grpc.max_receive_message_length']!; - } - } - - async sendMessage(message: Promise): Promise { - /* A configured size of -1 means that there is no limit, so skip the check - * entirely */ - if (this.maxSendMessageSize === -1) { - return message; - } else { - const concreteMessage = await message; - if (concreteMessage.message.length > this.maxSendMessageSize) { - throw { - code: Status.RESOURCE_EXHAUSTED, - details: `Sent message larger than max (${concreteMessage.message.length} vs. ${this.maxSendMessageSize})`, - metadata: new Metadata(), - }; - } else { - return concreteMessage; - } - } - } - - async receiveMessage(message: Promise): Promise { - /* A configured size of -1 means that there is no limit, so skip the check - * entirely */ - if (this.maxReceiveMessageSize === -1) { - return message; - } else { - const concreteMessage = await message; - if (concreteMessage.length > this.maxReceiveMessageSize) { - throw { - code: Status.RESOURCE_EXHAUSTED, - details: `Received message larger than max (${concreteMessage.length} vs. ${this.maxReceiveMessageSize})`, - metadata: new Metadata(), - }; - } else { - return concreteMessage; - } - } - } -} - -export class MaxMessageSizeFilterFactory - implements FilterFactory -{ - constructor(private readonly options: ChannelOptions) {} - - createFilter(): MaxMessageSizeFilter { - return new MaxMessageSizeFilter(this.options); - } -} diff --git a/packages/grpc-js/src/server-interceptors.ts b/packages/grpc-js/src/server-interceptors.ts index 8afc59b17..8e45a8082 100644 --- a/packages/grpc-js/src/server-interceptors.ts +++ b/packages/grpc-js/src/server-interceptors.ts @@ -30,14 +30,10 @@ import { import * as http2 from 'http2'; import { getErrorMessage } from './error'; import * as zlib from 'zlib'; -import { promisify } from 'util'; import { StreamDecoder } from './stream-decoder'; import { CallEventTracker } from './transport'; import * as logging from './logging'; -const unzip = promisify(zlib.unzip); -const inflate = promisify(zlib.inflate); - const TRACER_NAME = 'server_call'; function trace(text: string) { @@ -503,7 +499,7 @@ export class BaseServerInterceptingCall private wantTrailers = false; private cancelNotified = false; private incomingEncoding = 'identity'; - private decoder = new StreamDecoder(); + private decoder: StreamDecoder; private readQueue: ReadQueueEntry[] = []; private isReadPending = false; private receivedHalfClose = false; @@ -563,6 +559,8 @@ export class BaseServerInterceptingCall } this.host = headers[':authority'] ?? headers.host!; + this.decoder = new StreamDecoder(this.maxReceiveMessageSize); + const metadata = Metadata.fromHttp2Headers(headers); if (logging.isTracerEnabled(TRACER_NAME)) { @@ -683,18 +681,41 @@ export class BaseServerInterceptingCall message: Buffer, encoding: string ): Buffer | Promise { - switch (encoding) { - case 'deflate': - return inflate(message.subarray(5)); - case 'gzip': - return unzip(message.subarray(5)); - case 'identity': - return message.subarray(5); - default: - return Promise.reject({ - code: Status.UNIMPLEMENTED, - details: `Received message compressed with unsupported encoding "${encoding}"`, + const messageContents = message.subarray(5); + if (encoding === 'identity') { + return messageContents; + } else if (encoding === 'deflate' || encoding === 'gzip') { + let decompresser: zlib.Gunzip | zlib.Deflate; + if (encoding === 'deflate') { + decompresser = zlib.createInflate(); + } else { + decompresser = zlib.createGunzip(); + } + return new Promise((resolve, reject) => { + let totalLength = 0 + const messageParts: Buffer[] = []; + decompresser.on('data', (chunk: Buffer) => { + messageParts.push(chunk); + totalLength += chunk.byteLength; + if (this.maxReceiveMessageSize !== -1 && totalLength > this.maxReceiveMessageSize) { + decompresser.destroy(); + reject({ + code: Status.RESOURCE_EXHAUSTED, + details: `Received message that decompresses to a size larger than ${this.maxReceiveMessageSize}` + }); + } + }); + decompresser.on('end', () => { + resolve(Buffer.concat(messageParts)); }); + decompresser.write(messageContents); + decompresser.end(); + }); + } else { + return Promise.reject({ + code: Status.UNIMPLEMENTED, + details: `Received message compressed with unsupported encoding "${encoding}"`, + }); } } @@ -707,10 +728,16 @@ export class BaseServerInterceptingCall const compressedMessageEncoding = compressed ? this.incomingEncoding : 'identity'; - const decompressedMessage = await this.decompressMessage( - queueEntry.compressedMessage!, - compressedMessageEncoding - ); + let decompressedMessage: Buffer; + try { + decompressedMessage = await this.decompressMessage( + queueEntry.compressedMessage!, + compressedMessageEncoding + ); + } catch (err) { + this.sendStatus(err as PartialStatusObject); + return; + } try { queueEntry.parsedMessage = this.handler.deserialize(decompressedMessage); } catch (err) { @@ -752,23 +779,16 @@ export class BaseServerInterceptingCall ' received data frame of size ' + data.length ); - const rawMessages = this.decoder.write(data); + let rawMessages: Buffer[]; + try { + rawMessages = this.decoder.write(data); + } catch (e) { + this.sendStatus({ code: Status.RESOURCE_EXHAUSTED, details: (e as Error).message }); + return; + } for (const messageBytes of rawMessages) { this.stream.pause(); - if ( - this.maxReceiveMessageSize !== -1 && - messageBytes.length - 5 > this.maxReceiveMessageSize - ) { - this.sendStatus({ - code: Status.RESOURCE_EXHAUSTED, - details: `Received message larger than max (${ - messageBytes.length - 5 - } vs. ${this.maxReceiveMessageSize})`, - metadata: null, - }); - return; - } const queueEntry: ReadQueueEntry = { type: 'COMPRESSED', compressedMessage: messageBytes, diff --git a/packages/grpc-js/src/server.ts b/packages/grpc-js/src/server.ts index 1e4611117..cb60943ce 100644 --- a/packages/grpc-js/src/server.ts +++ b/packages/grpc-js/src/server.ts @@ -443,6 +443,14 @@ export class Server { ); } + private keepaliveTrace(text: string): void { + logging.trace( + LogVerbosity.DEBUG, + 'keepalive', + '(' + this.channelzRef.id + ') ' + text + ); + } + addProtoService(): never { throw new Error('Not implemented. Use addService() instead'); } @@ -1467,8 +1475,7 @@ export class Server { let connectionAgeTimer: NodeJS.Timeout | null = null; let connectionAgeGraceTimer: NodeJS.Timeout | null = null; - let keeapliveTimeTimer: NodeJS.Timeout | null = null; - let keepaliveTimeoutTimer: NodeJS.Timeout | null = null; + let keepaliveTimer: NodeJS.Timeout | null = null; let sessionClosedByServer = false; const idleTimeoutObj = this.enableIdleTimeout(session); @@ -1511,41 +1518,90 @@ export class Server { connectionAgeTimer.unref?.(); } - if (this.keepaliveTimeMs < KEEPALIVE_MAX_TIME_MS) { - keeapliveTimeTimer = setInterval(() => { - keepaliveTimeoutTimer = setTimeout(() => { - sessionClosedByServer = true; - session.close(); - }, this.keepaliveTimeoutMs); - keepaliveTimeoutTimer.unref?.(); + const clearKeepaliveTimeout = () => { + if (keepaliveTimer) { + clearTimeout(keepaliveTimer); + keepaliveTimer = null; + } + }; - try { - session.ping( - (err: Error | null, duration: number, payload: Buffer) => { - if (keepaliveTimeoutTimer) { - clearTimeout(keepaliveTimeoutTimer); - } - - if (err) { - sessionClosedByServer = true; - this.trace( - 'Connection dropped due to error of a ping frame ' + - err.message + - ' return in ' + - duration - ); - session.close(); - } + const canSendPing = () => { + return ( + !session.destroyed && + this.keepaliveTimeMs < KEEPALIVE_MAX_TIME_MS && + this.keepaliveTimeMs > 0 + ); + }; + + /* eslint-disable-next-line prefer-const */ + let sendPing: () => void; // hoisted for use in maybeStartKeepalivePingTimer + + const maybeStartKeepalivePingTimer = () => { + if (!canSendPing()) { + return; + } + this.keepaliveTrace( + 'Starting keepalive timer for ' + this.keepaliveTimeMs + 'ms' + ); + keepaliveTimer = setTimeout(() => { + clearKeepaliveTimeout(); + sendPing(); + }, this.keepaliveTimeMs); + keepaliveTimer.unref?.(); + }; + + sendPing = () => { + if (!canSendPing()) { + return; + } + this.keepaliveTrace( + 'Sending ping with timeout ' + this.keepaliveTimeoutMs + 'ms' + ); + let pingSendError = ''; + try { + const pingSentSuccessfully = session.ping( + (err: Error | null, duration: number, payload: Buffer) => { + clearKeepaliveTimeout(); + if (err) { + this.keepaliveTrace('Ping failed with error: ' + err.message); + sessionClosedByServer = true; + session.close(); + } else { + this.keepaliveTrace('Received ping response'); + maybeStartKeepalivePingTimer(); } - ); - } catch (e) { - clearTimeout(keepaliveTimeoutTimer); - // The ping can't be sent because the session is already closed - session.destroy(); + } + ); + if (!pingSentSuccessfully) { + pingSendError = 'Ping returned false'; } - }, this.keepaliveTimeMs); - keeapliveTimeTimer.unref?.(); - } + } catch (e) { + // grpc/grpc-node#2139 + pingSendError = + (e instanceof Error ? e.message : '') || 'Unknown error'; + } + + if (pingSendError) { + this.keepaliveTrace('Ping send failed: ' + pingSendError); + this.trace( + 'Connection dropped due to ping send error: ' + pingSendError + ); + sessionClosedByServer = true; + session.close(); + return; + } + + keepaliveTimer = setTimeout(() => { + clearKeepaliveTimeout(); + this.keepaliveTrace('Ping timeout passed without response'); + this.trace('Connection dropped by keepalive timeout'); + sessionClosedByServer = true; + session.close(); + }, this.keepaliveTimeoutMs); + keepaliveTimer.unref?.(); + }; + + maybeStartKeepalivePingTimer(); session.on('close', () => { if (!sessionClosedByServer) { @@ -1562,12 +1618,7 @@ export class Server { clearTimeout(connectionAgeGraceTimer); } - if (keeapliveTimeTimer) { - clearInterval(keeapliveTimeTimer); - if (keepaliveTimeoutTimer) { - clearTimeout(keepaliveTimeoutTimer); - } - } + clearKeepaliveTimeout(); if (idleTimeoutObj !== null) { clearTimeout(idleTimeoutObj.timeout); @@ -1612,8 +1663,7 @@ export class Server { let connectionAgeTimer: NodeJS.Timeout | null = null; let connectionAgeGraceTimer: NodeJS.Timeout | null = null; - let keeapliveTimeTimer: NodeJS.Timeout | null = null; - let keepaliveTimeoutTimer: NodeJS.Timeout | null = null; + let keepaliveTimeout: NodeJS.Timeout | null = null; let sessionClosedByServer = false; const idleTimeoutObj = this.enableIdleTimeout(session); @@ -1655,49 +1705,103 @@ export class Server { connectionAgeTimer.unref?.(); } - if (this.keepaliveTimeMs < KEEPALIVE_MAX_TIME_MS) { - keeapliveTimeTimer = setInterval(() => { - keepaliveTimeoutTimer = setTimeout(() => { - sessionClosedByServer = true; - this.channelzTrace.addTrace( - 'CT_INFO', - 'Connection dropped by keepalive timeout from ' + clientAddress - ); + const clearKeepaliveTimeout = () => { + if (keepaliveTimeout) { + clearTimeout(keepaliveTimeout); + keepaliveTimeout = null; + } + }; + + const canSendPing = () => { + return ( + !session.destroyed && + this.keepaliveTimeMs < KEEPALIVE_MAX_TIME_MS && + this.keepaliveTimeMs > 0 + ); + }; - session.close(); - }, this.keepaliveTimeoutMs); - keepaliveTimeoutTimer.unref?.(); + /* eslint-disable-next-line prefer-const */ + let sendPing: () => void; // hoisted for use in maybeStartKeepalivePingTimer - try { - session.ping( - (err: Error | null, duration: number, payload: Buffer) => { - if (keepaliveTimeoutTimer) { - clearTimeout(keepaliveTimeoutTimer); - } - - if (err) { - sessionClosedByServer = true; - this.channelzTrace.addTrace( - 'CT_INFO', - 'Connection dropped due to error of a ping frame ' + - err.message + - ' return in ' + - duration - ); - - session.close(); - } + const maybeStartKeepalivePingTimer = () => { + if (!canSendPing()) { + return; + } + this.keepaliveTrace( + 'Starting keepalive timer for ' + this.keepaliveTimeMs + 'ms' + ); + keepaliveTimeout = setTimeout(() => { + clearKeepaliveTimeout(); + sendPing(); + }, this.keepaliveTimeMs); + keepaliveTimeout.unref?.(); + }; + + sendPing = () => { + if (!canSendPing()) { + return; + } + this.keepaliveTrace( + 'Sending ping with timeout ' + this.keepaliveTimeoutMs + 'ms' + ); + let pingSendError = ''; + try { + const pingSentSuccessfully = session.ping( + (err: Error | null, duration: number, payload: Buffer) => { + clearKeepaliveTimeout(); + if (err) { + this.keepaliveTrace('Ping failed with error: ' + err.message); + this.channelzTrace.addTrace( + 'CT_INFO', + 'Connection dropped due to error of a ping frame ' + + err.message + + ' return in ' + + duration + ); + sessionClosedByServer = true; + session.close(); + } else { + this.keepaliveTrace('Received ping response'); + maybeStartKeepalivePingTimer(); } - ); - channelzSessionInfo.keepAlivesSent += 1; - } catch (e) { - clearTimeout(keepaliveTimeoutTimer); - // The ping can't be sent because the session is already closed - session.destroy(); + } + ); + if (!pingSentSuccessfully) { + pingSendError = 'Ping returned false'; } - }, this.keepaliveTimeMs); - keeapliveTimeTimer.unref?.(); - } + } catch (e) { + // grpc/grpc-node#2139 + pingSendError = + (e instanceof Error ? e.message : '') || 'Unknown error'; + } + + if (pingSendError) { + this.keepaliveTrace('Ping send failed: ' + pingSendError); + this.channelzTrace.addTrace( + 'CT_INFO', + 'Connection dropped due to ping send error: ' + pingSendError + ); + sessionClosedByServer = true; + session.close(); + return; + } + + channelzSessionInfo.keepAlivesSent += 1; + + keepaliveTimeout = setTimeout(() => { + clearKeepaliveTimeout(); + this.keepaliveTrace('Ping timeout passed without response'); + this.channelzTrace.addTrace( + 'CT_INFO', + 'Connection dropped by keepalive timeout from ' + clientAddress + ); + sessionClosedByServer = true; + session.close(); + }, this.keepaliveTimeoutMs); + keepaliveTimeout.unref?.(); + }; + + maybeStartKeepalivePingTimer(); session.on('close', () => { if (!sessionClosedByServer) { @@ -1718,12 +1822,7 @@ export class Server { clearTimeout(connectionAgeGraceTimer); } - if (keeapliveTimeTimer) { - clearInterval(keeapliveTimeTimer); - if (keepaliveTimeoutTimer) { - clearTimeout(keepaliveTimeoutTimer); - } - } + clearKeepaliveTimeout(); if (idleTimeoutObj !== null) { clearTimeout(idleTimeoutObj.timeout); @@ -1782,19 +1881,22 @@ export class Server { // for future refreshes if ( sessionInfo !== undefined && - sessionInfo.activeStreams === 0 && - Date.now() - sessionInfo.lastIdle >= ctx.sessionIdleTimeout + sessionInfo.activeStreams === 0 ) { - ctx.trace( - 'Session idle timeout triggered for ' + - socket?.remoteAddress + - ':' + - socket?.remotePort + - ' last idle at ' + - sessionInfo.lastIdle - ); + if (Date.now() - sessionInfo.lastIdle >= ctx.sessionIdleTimeout) { + ctx.trace( + 'Session idle timeout triggered for ' + + socket?.remoteAddress + + ':' + + socket?.remotePort + + ' last idle at ' + + sessionInfo.lastIdle + ); - ctx.closeSession(session); + ctx.closeSession(session); + } else { + sessionInfo.timeout.refresh(); + } } } diff --git a/packages/grpc-js/src/stream-decoder.ts b/packages/grpc-js/src/stream-decoder.ts index 671ad41ae..ea669d14c 100644 --- a/packages/grpc-js/src/stream-decoder.ts +++ b/packages/grpc-js/src/stream-decoder.ts @@ -30,6 +30,8 @@ export class StreamDecoder { private readPartialMessage: Buffer[] = []; private readMessageRemaining = 0; + constructor(private maxReadMessageLength: number) {} + write(data: Buffer): Buffer[] { let readHead = 0; let toRead: number; @@ -60,6 +62,9 @@ export class StreamDecoder { // readSizeRemaining >=0 here if (this.readSizeRemaining === 0) { this.readMessageSize = this.readPartialSize.readUInt32BE(0); + if (this.maxReadMessageLength !== -1 && this.readMessageSize > this.maxReadMessageLength) { + throw new Error(`Received message larger than max (${this.readMessageSize} vs ${this.maxReadMessageLength})`); + } this.readMessageRemaining = this.readMessageSize; if (this.readMessageRemaining > 0) { this.readState = ReadState.READING_MESSAGE; diff --git a/packages/grpc-js/src/subchannel-call.ts b/packages/grpc-js/src/subchannel-call.ts index 0ce7d72cb..bee00119f 100644 --- a/packages/grpc-js/src/subchannel-call.ts +++ b/packages/grpc-js/src/subchannel-call.ts @@ -18,7 +18,7 @@ import * as http2 from 'http2'; import * as os from 'os'; -import { Status } from './constants'; +import { DEFAULT_MAX_RECEIVE_MESSAGE_LENGTH, Status } from './constants'; import { Metadata } from './metadata'; import { StreamDecoder } from './stream-decoder'; import * as logging from './logging'; @@ -116,7 +116,7 @@ function mapHttpStatusCode(code: number): StatusObject { } export class Http2SubchannelCall implements SubchannelCall { - private decoder = new StreamDecoder(); + private decoder: StreamDecoder; private isReadFilterPending = false; private isPushPending = false; @@ -147,6 +147,8 @@ export class Http2SubchannelCall implements SubchannelCall { private readonly transport: Transport, private readonly callId: number ) { + const maxReceiveMessageLength = transport.getOptions()['grpc.max_receive_message_length'] ?? DEFAULT_MAX_RECEIVE_MESSAGE_LENGTH; + this.decoder = new StreamDecoder(maxReceiveMessageLength); http2Stream.on('response', (headers, flags) => { let headersString = ''; for (const header of Object.keys(headers)) { @@ -182,7 +184,13 @@ export class Http2SubchannelCall implements SubchannelCall { return; } this.trace('receive HTTP/2 data frame of length ' + data.length); - const messages = this.decoder.write(data); + let messages: Buffer[]; + try { + messages = this.decoder.write(data); + } catch (e) { + this.cancelWithStatus(Status.RESOURCE_EXHAUSTED, (e as Error).message); + return; + } for (const message of messages) { this.trace('parsed message of length ' + message.length); diff --git a/packages/grpc-js/src/transport.ts b/packages/grpc-js/src/transport.ts index 66a5d4556..1acbab40e 100644 --- a/packages/grpc-js/src/transport.ts +++ b/packages/grpc-js/src/transport.ts @@ -84,6 +84,7 @@ export interface TransportDisconnectListener { export interface Transport { getChannelzRef(): SocketRef; getPeerName(): string; + getOptions(): ChannelOptions; createCall( metadata: Metadata, host: string, @@ -101,28 +102,24 @@ class Http2Transport implements Transport { /** * The amount of time in between sending pings */ - private keepaliveTimeMs = -1; + private readonly keepaliveTimeMs: number; /** * The amount of time to wait for an acknowledgement after sending a ping */ - private keepaliveTimeoutMs: number = KEEPALIVE_TIMEOUT_MS; + private readonly keepaliveTimeoutMs: number; /** - * Timer reference for timeout that indicates when to send the next ping + * Indicates whether keepalive pings should be sent without any active calls + */ + private readonly keepaliveWithoutCalls: boolean; + /** + * Timer reference indicating when to send the next ping or when the most recent ping will be considered lost. */ - private keepaliveTimerId: NodeJS.Timeout | null = null; + private keepaliveTimer: NodeJS.Timeout | null = null; /** * Indicates that the keepalive timer ran out while there were no active * calls, and a ping should be sent the next time a call starts. */ private pendingSendKeepalivePing = false; - /** - * Timer reference tracking when the most recent ping will be considered lost - */ - private keepaliveTimeoutId: NodeJS.Timeout | null = null; - /** - * Indicates whether keepalive pings should be sent without any active calls - */ - private keepaliveWithoutCalls = false; private userAgent: string; @@ -147,7 +144,7 @@ class Http2Transport implements Transport { constructor( private session: http2.ClientHttp2Session, subchannelAddress: SubchannelAddress, - options: ChannelOptions, + private options: ChannelOptions, /** * Name of the remote server, if it is not the same as the subchannel * address, i.e. if connecting through an HTTP CONNECT proxy. @@ -182,9 +179,13 @@ class Http2Transport implements Transport { if ('grpc.keepalive_time_ms' in options) { this.keepaliveTimeMs = options['grpc.keepalive_time_ms']!; + } else { + this.keepaliveTimeMs = -1; } if ('grpc.keepalive_timeout_ms' in options) { this.keepaliveTimeoutMs = options['grpc.keepalive_timeout_ms']!; + } else { + this.keepaliveTimeoutMs = KEEPALIVE_TIMEOUT_MS; } if ('grpc.keepalive_permit_without_calls' in options) { this.keepaliveWithoutCalls = @@ -195,7 +196,6 @@ class Http2Transport implements Transport { session.once('close', () => { this.trace('session closed'); - this.stopKeepalivePings(); this.handleDisconnect(); }); @@ -383,6 +383,7 @@ class Http2Transport implements Transport { * Handle connection drops, but not GOAWAYs. */ private handleDisconnect() { + this.clearKeepaliveTimeout(); this.reportDisconnectToOwner(false); /* Give calls an event loop cycle to finish naturally before reporting the * disconnnection to them. */ @@ -390,6 +391,7 @@ class Http2Transport implements Transport { for (const call of this.activeCalls) { call.onDisconnect(); } + this.session.destroy(); }); } @@ -397,63 +399,58 @@ class Http2Transport implements Transport { this.disconnectListeners.push(listener); } - private clearKeepaliveTimer() { - if (!this.keepaliveTimerId) { - return; - } - clearTimeout(this.keepaliveTimerId); - this.keepaliveTimerId = null; - } - - private clearKeepaliveTimeout() { - if (!this.keepaliveTimeoutId) { - return; - } - clearTimeout(this.keepaliveTimeoutId); - this.keepaliveTimeoutId = null; - } - private canSendPing() { return ( + !this.session.destroyed && this.keepaliveTimeMs > 0 && (this.keepaliveWithoutCalls || this.activeCalls.size > 0) ); } private maybeSendPing() { - this.clearKeepaliveTimer(); if (!this.canSendPing()) { this.pendingSendKeepalivePing = true; return; } + if (this.keepaliveTimer) { + console.error('keepaliveTimeout is not null'); + return; + } if (this.channelzEnabled) { this.keepalivesSent += 1; } this.keepaliveTrace( 'Sending ping with timeout ' + this.keepaliveTimeoutMs + 'ms' ); - if (!this.keepaliveTimeoutId) { - this.keepaliveTimeoutId = setTimeout(() => { - this.keepaliveTrace('Ping timeout passed without response'); - this.handleDisconnect(); - }, this.keepaliveTimeoutMs); - this.keepaliveTimeoutId.unref?.(); - } + this.keepaliveTimer = setTimeout(() => { + this.keepaliveTimer = null; + this.keepaliveTrace('Ping timeout passed without response'); + this.handleDisconnect(); + }, this.keepaliveTimeoutMs); + this.keepaliveTimer.unref?.(); + let pingSendError = ''; try { - this.session!.ping( + const pingSentSuccessfully = this.session.ping( (err: Error | null, duration: number, payload: Buffer) => { + this.clearKeepaliveTimeout(); if (err) { this.keepaliveTrace('Ping failed with error ' + err.message); this.handleDisconnect(); + } else { + this.keepaliveTrace('Received ping response'); + this.maybeStartKeepalivePingTimer(); } - this.keepaliveTrace('Received ping response'); - this.clearKeepaliveTimeout(); - this.maybeStartKeepalivePingTimer(); } ); + if (!pingSentSuccessfully) { + pingSendError = 'Ping returned false'; + } } catch (e) { - /* If we fail to send a ping, the connection is no longer functional, so - * we should discard it. */ + // grpc/grpc-node#2139 + pingSendError = (e instanceof Error ? e.message : '') || 'Unknown error'; + } + if (pingSendError) { + this.keepaliveTrace('Ping send failed: ' + pingSendError); this.handleDisconnect(); } } @@ -471,25 +468,28 @@ class Http2Transport implements Transport { if (this.pendingSendKeepalivePing) { this.pendingSendKeepalivePing = false; this.maybeSendPing(); - } else if (!this.keepaliveTimerId && !this.keepaliveTimeoutId) { + } else if (!this.keepaliveTimer) { this.keepaliveTrace( 'Starting keepalive timer for ' + this.keepaliveTimeMs + 'ms' ); - this.keepaliveTimerId = setTimeout(() => { + this.keepaliveTimer = setTimeout(() => { + this.keepaliveTimer = null; this.maybeSendPing(); }, this.keepaliveTimeMs); - this.keepaliveTimerId.unref?.(); + this.keepaliveTimer.unref?.(); } /* Otherwise, there is already either a keepalive timer or a ping pending, * wait for those to resolve. */ } - private stopKeepalivePings() { - if (this.keepaliveTimerId) { - clearTimeout(this.keepaliveTimerId); - this.keepaliveTimerId = null; + /** + * Clears whichever keepalive timeout is currently active, if any. + */ + private clearKeepaliveTimeout() { + if (this.keepaliveTimer) { + clearTimeout(this.keepaliveTimer); + this.keepaliveTimer = null; } - this.clearKeepaliveTimeout(); } private removeActiveCall(call: Http2SubchannelCall) { @@ -533,7 +533,7 @@ class Http2Transport implements Transport { * error here. */ try { - http2Stream = this.session!.request(headers); + http2Stream = this.session.request(headers); } catch (e) { this.handleDisconnect(); throw e; @@ -617,6 +617,10 @@ class Http2Transport implements Transport { return this.subchannelAddressString; } + getOptions() { + return this.options; + } + shutdown() { this.session.close(); unregisterChannelzRef(this.channelzRef); diff --git a/packages/grpc-js/test/fixtures/test_service.proto b/packages/grpc-js/test/fixtures/test_service.proto index 64ce0d378..2a7a303f3 100644 --- a/packages/grpc-js/test/fixtures/test_service.proto +++ b/packages/grpc-js/test/fixtures/test_service.proto @@ -21,6 +21,7 @@ message Request { bool error = 1; string message = 2; int32 errorAfter = 3; + int32 responseLength = 4; } message Response { diff --git a/packages/grpc-js/test/test-client.ts b/packages/grpc-js/test/test-client.ts index 67b396015..bbb3f063f 100644 --- a/packages/grpc-js/test/test-client.ts +++ b/packages/grpc-js/test/test-client.ts @@ -97,6 +97,21 @@ describe('Client without a server', () => { } ); }); + it('close should force calls to end', done => { + client.makeUnaryRequest( + '/service/method', + x => x, + x => x, + Buffer.from([]), + new grpc.Metadata({waitForReady: true}), + (error, value) => { + assert(error); + assert.strictEqual(error?.code, grpc.status.UNAVAILABLE); + done(); + } + ); + client.close(); + }); }); describe('Client with a nonexistent target domain', () => { @@ -133,4 +148,19 @@ describe('Client with a nonexistent target domain', () => { } ); }); + it('close should force calls to end', done => { + client.makeUnaryRequest( + '/service/method', + x => x, + x => x, + Buffer.from([]), + new grpc.Metadata({waitForReady: true}), + (error, value) => { + assert(error); + assert.strictEqual(error?.code, grpc.status.UNAVAILABLE); + done(); + } + ); + client.close(); + }); }); diff --git a/packages/grpc-js/test/test-idle-timer.ts b/packages/grpc-js/test/test-idle-timer.ts index 3f2a8ed20..ed6af2cf7 100644 --- a/packages/grpc-js/test/test-idle-timer.ts +++ b/packages/grpc-js/test/test-idle-timer.ts @@ -199,7 +199,7 @@ describe('Server idle timer', () => { grpc.connectivityState.READY ); client?.waitForClientState( - Date.now() + 600, + Date.now() + 1500, grpc.connectivityState.IDLE, done ); @@ -217,7 +217,7 @@ describe('Server idle timer', () => { ); client!.waitForClientState( - Date.now() + 600, + Date.now() + 1500, grpc.connectivityState.IDLE, err => { if (err) return done(err); @@ -248,7 +248,7 @@ describe('Server idle timer', () => { ); client!.waitForClientState( - Date.now() + 600, + Date.now() + 1500, grpc.connectivityState.IDLE, done ); diff --git a/packages/grpc-js/test/test-server-errors.ts b/packages/grpc-js/test/test-server-errors.ts index 24ccfeef3..fcc5ef7ff 100644 --- a/packages/grpc-js/test/test-server-errors.ts +++ b/packages/grpc-js/test/test-server-errors.ts @@ -33,6 +33,7 @@ import { } from '../src/server-call'; import { loadProtoFile } from './common'; +import { CompressionAlgorithms } from '../src/compression-algorithms'; const protoFile = join(__dirname, 'fixtures', 'test_service.proto'); const testServiceDef = loadProtoFile(protoFile); @@ -286,6 +287,98 @@ describe('Server serialization failure handling', () => { }); }); +describe('Cardinality violations', () => { + let client: ServiceClient; + let server: Server; + let responseCount: number = 1; + const testMessage = Buffer.from([]); + before(done => { + const serverServiceDefinition = { + testMethod: { + path: '/TestService/TestMethod/', + requestStream: false, + responseStream: true, + requestSerialize: identity, + requestDeserialize: identity, + responseDeserialize: identity, + responseSerialize: identity + } + }; + const clientServiceDefinition = { + testMethod: { + path: '/TestService/TestMethod/', + requestStream: true, + responseStream: false, + requestSerialize: identity, + requestDeserialize: identity, + responseDeserialize: identity, + responseSerialize: identity + } + }; + const TestClient = grpc.makeClientConstructor(clientServiceDefinition, 'TestService'); + server = new grpc.Server(); + server.addService(serverServiceDefinition, { + testMethod(stream: ServerWritableStream) { + for (let i = 0; i < responseCount; i++) { + stream.write(testMessage); + } + stream.end(); + } + }); + server.bindAsync('localhost:0', serverInsecureCreds, (error, port) => { + assert.ifError(error); + client = new TestClient(`localhost:${port}`, clientInsecureCreds); + done(); + }); + }); + beforeEach(() => { + responseCount = 1; + }); + after(done => { + client.close(); + server.tryShutdown(done); + }); + it('Should fail if the client sends too few messages', done => { + const call = client.testMethod((err: ServiceError, data: any) => { + assert(err); + assert.strictEqual(err.code, grpc.status.UNIMPLEMENTED); + done(); + }); + call.end(); + }); + it('Should fail if the client sends too many messages', done => { + const call = client.testMethod((err: ServiceError, data: any) => { + assert(err); + assert.strictEqual(err.code, grpc.status.UNIMPLEMENTED); + done(); + }); + call.write(testMessage); + call.write(testMessage); + call.end(); + }); + it('Should fail if the server sends too few messages', done => { + responseCount = 0; + const call = client.testMethod((err: ServiceError, data: any) => { + assert(err); + assert.strictEqual(err.code, grpc.status.UNIMPLEMENTED); + done(); + }); + call.write(testMessage); + call.end(); + }); + it('Should fail if the server sends too many messages', done => { + responseCount = 2; + const call = client.testMethod((err: ServiceError, data: any) => { + assert(err); + assert.strictEqual(err.code, grpc.status.UNIMPLEMENTED); + done(); + }); + call.write(testMessage); + call.end(); + }); + +}); + describe('Other conditions', () => { let client: ServiceClient; let server: Server; @@ -310,7 +403,7 @@ describe('Other conditions', () => { trailerMetadata ); } else { - cb(null, { count: 1 }, trailerMetadata); + cb(null, { count: 1, message: 'a'.repeat(req.responseLength) }, trailerMetadata); } }, @@ -320,6 +413,7 @@ describe('Other conditions', () => { ) { let count = 0; let errored = false; + let responseLength = 0; stream.on('data', (data: any) => { if (data.error) { @@ -327,13 +421,14 @@ describe('Other conditions', () => { errored = true; cb(new Error(message) as ServiceError, null, trailerMetadata); } else { + responseLength += data.responseLength; count++; } }); stream.on('end', () => { if (!errored) { - cb(null, { count }, trailerMetadata); + cb(null, { count, message: 'a'.repeat(responseLength) }, trailerMetadata); } }); }, @@ -349,7 +444,7 @@ describe('Other conditions', () => { }); } else { for (let i = 1; i <= 5; i++) { - stream.write({ count: i }); + stream.write({ count: i, message: 'a'.repeat(req.responseLength) }); if (req.errorAfter && req.errorAfter === i) { stream.emit('error', { code: grpc.status.UNKNOWN, @@ -376,7 +471,7 @@ describe('Other conditions', () => { err.metadata.add('count', '' + count); stream.emit('error', err); } else { - stream.write({ count }); + stream.write({ count, message: 'a'.repeat(data.responseLength) }); count++; } }); @@ -740,6 +835,44 @@ describe('Other conditions', () => { }); }); }); + + describe('Max message size', () => { + const largeMessage = 'a'.repeat(10_000_000); + it('Should be enforced on the server', done => { + client.unary({ message: largeMessage }, (error?: ServiceError) => { + assert(error); + assert.strictEqual(error.code, grpc.status.RESOURCE_EXHAUSTED); + done(); + }); + }); + it('Should be enforced on the client', done => { + client.unary({ responseLength: 10_000_000 }, (error?: ServiceError) => { + assert(error); + assert.strictEqual(error.code, grpc.status.RESOURCE_EXHAUSTED); + done(); + }); + }); + describe('Compressed messages', () => { + it('Should be enforced with gzip', done => { + const compressingClient = new testServiceClient(`localhost:${port}`, clientInsecureCreds, {'grpc.default_compression_algorithm': CompressionAlgorithms.gzip}); + compressingClient.unary({ message: largeMessage }, (error?: ServiceError) => { + assert(error); + assert.strictEqual(error.code, grpc.status.RESOURCE_EXHAUSTED); + assert.match(error.details, /Received message that decompresses to a size larger/); + done(); + }); + }); + it('Should be enforced with deflate', done => { + const compressingClient = new testServiceClient(`localhost:${port}`, clientInsecureCreds, {'grpc.default_compression_algorithm': CompressionAlgorithms.deflate}); + compressingClient.unary({ message: largeMessage }, (error?: ServiceError) => { + assert(error); + assert.strictEqual(error.code, grpc.status.RESOURCE_EXHAUSTED); + assert.match(error.details, /Received message that decompresses to a size larger/); + done(); + }); + }); + }); + }); }); function identity(arg: any): any {