From 40f9fbaaaaafcc4896f3cdcb54e3910f45b1fa13 Mon Sep 17 00:00:00 2001 From: AVVS Date: Mon, 29 Apr 2024 17:23:23 -0700 Subject: [PATCH 1/9] feat: decoder perf --- packages/grpc-js/benchmarks/README.md | 84 ++++++++++ packages/grpc-js/benchmarks/echo-unary.bin | Bin 0 -> 19 bytes packages/grpc-js/benchmarks/helpers/encode.ts | 37 +++++ packages/grpc-js/benchmarks/helpers/utils.ts | 29 ++++ packages/grpc-js/benchmarks/server.ts | 42 +++++ packages/grpc-js/package.json | 16 +- packages/grpc-js/src/server-interceptors.ts | 85 ++++++---- packages/grpc-js/src/server.ts | 2 + packages/grpc-js/src/stream-decoder.ts | 151 +++++++++++++++++- .../grpc-js/test/test-call-propagation.ts | 6 +- packages/grpc-js/tsconfig.json | 1 + packages/grpc-js/tsconfig.modern.json | 8 + 12 files changed, 417 insertions(+), 44 deletions(-) create mode 100644 packages/grpc-js/benchmarks/README.md create mode 100644 packages/grpc-js/benchmarks/echo-unary.bin create mode 100644 packages/grpc-js/benchmarks/helpers/encode.ts create mode 100644 packages/grpc-js/benchmarks/helpers/utils.ts create mode 100644 packages/grpc-js/benchmarks/server.ts create mode 100644 packages/grpc-js/tsconfig.modern.json diff --git a/packages/grpc-js/benchmarks/README.md b/packages/grpc-js/benchmarks/README.md new file mode 100644 index 000000000..a3fe7a85f --- /dev/null +++ b/packages/grpc-js/benchmarks/README.md @@ -0,0 +1,84 @@ +This folder contains basic benchmarks to approximate performance impact of changes + + +## How to test + +1. pnpm build; pnpm ts-node --transpile-only ./benchmarks/server.ts +2. pnpm tsc -p tsconfig.modern.json; pnpm ts-node --transpile-only ./benchmarks/server.ts +3. ideally run with jemalloc or memory fragmentation makes everything run slower over time + +For mac os: +`DYLD_INSERT_LIBRARIES=$(brew --prefix jemalloc)/lib/libjemalloc.dylib pnpm ts-node --transpile-only ./benchmarks/server.ts` + +2. 
h2load -n200000 -m 50 http://localhost:9999/EchoService/Echo -c10 -t 10 -H 'content-type: application/grpc' -d ./echo-unary.bin + +Baseline on M1 Max Laptop: + +``` +ES2017 & ESNext targets are within margin of error: + +finished in 4.09s, 48851.86 req/s, 2.47MB/s +requests: 200000 total, 200000 started, 200000 done, 200000 succeeded, 0 failed, 0 errored, 0 timeout +status codes: 200000 2xx, 0 3xx, 0 4xx, 0 5xx +traffic: 10.11MB (10603710) total, 978.38KB (1001860) headers (space savings 96.40%), 3.62MB (3800000) data + min max mean sd +/- sd +time for request: 2.47ms 104.19ms 10.18ms 3.46ms 94.40% +time for connect: 790us 1.13ms 879us 98us 90.00% +time to 1st byte: 12.09ms 97.17ms 52.68ms 28.04ms 60.00% +req/s : 4885.61 4922.01 4901.67 14.07 50.00% + + +``` + +--- + +Changes to stream decoder: + +1. switch -> if + +``` +h2load -n200000 -m 50 http://localhost:9999/EchoService/Echo -c10 -t 10 -H 'content-type: application/grpc' -d ./echo-unary.bin + +finished in 3.82s, 52410.67 req/s, 2.65MB/s +requests: 200000 total, 200000 started, 200000 done, 200000 succeeded, 0 failed, 0 errored, 0 timeout +status codes: 200000 2xx, 0 3xx, 0 4xx, 0 5xx +traffic: 10.11MB (10603690) total, 978.36KB (1001840) headers (space savings 96.40%), 3.62MB (3800000) data + min max mean sd +/- sd +time for request: 1.87ms 47.64ms 9.49ms 1.89ms 97.25% +time for connect: 1.75ms 3.14ms 2.43ms 410us 70.00% +time to 1st byte: 6.58ms 45.08ms 23.01ms 13.70ms 60.00% +req/s : 5242.32 5270.74 5253.00 9.37 70.00% +``` + +2. const enum is comparable to enum + +3. 
fewer buffer.concat,f unsafeAlloc + + +``` +finished in 3.40s, 58763.66 req/s, 987.33KB/s +requests: 200000 total, 200000 started, 200000 done, 200000 succeeded, 0 failed, 0 errored, 0 timeout +status codes: 200000 2xx, 0 3xx, 0 4xx, 0 5xx +traffic: 3.28MB (3441011) total, 1.01MB (1063183) headers (space savings 97.04%), 176.74KB (180986) data + min max mean sd +/- sd +time for request: 304us 41.57ms 3.28ms 1.63ms 80.98% +time for connect: 831us 1.47ms 1.14ms 181us 70.00% +time to 1st byte: 2.64ms 25.10ms 11.42ms 7.87ms 60.00% +req/s : 5877.32 6303.71 6082.75 168.23 50.00% +``` + + +``` +old decoder: + +finished in 3.83s, 52210.19 req/s, 2.64MB/s +requests: 200000 total, 200000 started, 200000 done, 200000 succeeded, 0 failed, 0 errored, 0 timeout +status codes: 200000 2xx, 0 3xx, 0 4xx, 0 5xx +traffic: 10.11MB (10603670) total, 978.34KB (1001820) headers (space savings 96.40%), 3.62MB (3800000) data + min max mean sd +/- sd +time for request: 1.16ms 18.75ms 3.82ms 1.45ms 88.89% +time for connect: 723us 1.38ms 1.18ms 191us 80.00% +time to 1st byte: 3.45ms 17.72ms 9.00ms 4.95ms 70.00% +req/s : 5221.65 5235.13 5225.05 4.23 90.00% +``` + diff --git a/packages/grpc-js/benchmarks/echo-unary.bin b/packages/grpc-js/benchmarks/echo-unary.bin new file mode 100644 index 0000000000000000000000000000000000000000..5d8a648c16d8b73b69809e2b7e590e8afb546f86 GIT binary patch literal 19 acmZQzU|`_m;wmmF%FIjGElbQ1-~s?42Lwa_ literal 0 HcmV?d00001 diff --git a/packages/grpc-js/benchmarks/helpers/encode.ts b/packages/grpc-js/benchmarks/helpers/encode.ts new file mode 100644 index 000000000..ab4fea5df --- /dev/null +++ b/packages/grpc-js/benchmarks/helpers/encode.ts @@ -0,0 +1,37 @@ +import * as fs from 'node:fs'; +import { resolve } from 'node:path'; +import { echoService } from './utils'; + +/** + * Serialize a message to a length-delimited byte string. 
+ * @param value + * @returns + */ +function serializeMessage(serialize: any, value: any) { + const messageBuffer = serialize(value); + const byteLength = messageBuffer.byteLength; + const output = Buffer.allocUnsafe(byteLength + 5); + /* Note: response compression is currently not supported, so this + * compressed bit is always 0. */ + output.writeUInt8(0, 0); + output.writeUInt32BE(byteLength, 1); + messageBuffer.copy(output, 5); + return output; +} + +const binaryMessage = serializeMessage( + echoService.service.Echo.requestSerialize, + { + value: 'string-val', + value2: 10, + } +); + +console.log( + 'Service %s\nEcho binary bytes: %d, hex: %s', + echoService.service.Echo.path, + binaryMessage.length, + binaryMessage.toString('hex') +); + +fs.writeFileSync(resolve(__dirname, '../echo-unary.bin'), binaryMessage); diff --git a/packages/grpc-js/benchmarks/helpers/utils.ts b/packages/grpc-js/benchmarks/helpers/utils.ts new file mode 100644 index 000000000..1dbf2e92d --- /dev/null +++ b/packages/grpc-js/benchmarks/helpers/utils.ts @@ -0,0 +1,29 @@ +import * as loader from '@grpc/proto-loader'; +import * as path from 'node:path'; + +import { + GrpcObject, + ServiceClientConstructor, + loadPackageDefinition, +} from '../../build/src/make-client'; + +const protoLoaderOptions = { + keepCase: true, + longs: String, + enums: String, + defaults: true, + oneofs: true, +}; + +export function loadProtoFile(file: string): GrpcObject { + const packageDefinition = loader.loadSync(file, protoLoaderOptions); + return loadPackageDefinition(packageDefinition); +} + +const protoFile = path.join( + __dirname, + '../../test/fixtures', + 'echo_service.proto' +); +export const echoService = loadProtoFile(protoFile) + .EchoService as ServiceClientConstructor; diff --git a/packages/grpc-js/benchmarks/server.ts b/packages/grpc-js/benchmarks/server.ts new file mode 100644 index 000000000..69b318398 --- /dev/null +++ b/packages/grpc-js/benchmarks/server.ts @@ -0,0 +1,42 @@ +import { + Server, 
+ ServerUnaryCall, + sendUnaryData, + ServerCredentials, +} from '../build/src/index'; +import { echoService } from './helpers/utils'; + +const serviceImpl = { + echo: (call: ServerUnaryCall, callback: sendUnaryData) => { + callback(null, call.request); + }, +}; + +async function main() { + const server = new Server({ + 'grpc.enable_channelz': 0, + }); + + server.addService(echoService.service, serviceImpl); + + const credentials = ServerCredentials.createInsecure(); + + setInterval( + () => console.log(`RSS: ${process.memoryUsage().rss / 1024 / 1024} MiB`), + 5e3 + ).unref(); + + await new Promise((resolve, reject) => { + server.bindAsync('localhost:9999', credentials, (error, port) => { + if (error) { + reject(error); + return; + } + + console.log('server listening on port %d', port); + resolve(); + }); + }); +} + +main(); diff --git a/packages/grpc-js/package.json b/packages/grpc-js/package.json index 3b8ccf2c1..337a797d9 100644 --- a/packages/grpc-js/package.json +++ b/packages/grpc-js/package.json @@ -2,6 +2,7 @@ "name": "@grpc/grpc-js", "version": "1.10.6", "description": "gRPC Library for Node - pure JS implementation", + "module": "commonjs", "homepage": "https://grpc.io/", "repository": "https://github.com/grpc/grpc-node/tree/master/packages/grpc-js", "main": "build/src/index.js", @@ -20,12 +21,12 @@ "@types/lodash": "^4.14.202", "@types/mocha": "^10.0.6", "@types/ncp": "^2.0.8", - "@types/node": ">=20.11.20", + "@types/node": "^20.12.7", "@types/pify": "^5.0.4", "@types/semver": "^7.5.8", - "@typescript-eslint/eslint-plugin": "^7.1.0", - "@typescript-eslint/parser": "^7.1.0", - "@typescript-eslint/typescript-estree": "^7.1.0", + "@typescript-eslint/eslint-plugin": "^7.7.1", + "@typescript-eslint/parser": "^7.7.1", + "@typescript-eslint/typescript-estree": "^7.7.1", "clang-format": "^1.8.0", "eslint": "^8.42.0", "eslint-config-prettier": "^8.8.0", @@ -40,10 +41,10 @@ "ncp": "^2.0.0", "pify": "^4.0.1", "prettier": "^2.8.8", - "rimraf": "^3.0.2", + "rimraf": 
"^5.0.5", "semver": "^7.6.0", "ts-node": "^10.9.2", - "typescript": "^5.3.3" + "typescript": "^5.4.5" }, "contributors": [ { @@ -67,7 +68,8 @@ }, "dependencies": { "@grpc/proto-loader": "^0.7.10", - "@js-sdsl/ordered-map": "^4.4.2" + "@js-sdsl/ordered-map": "^4.4.2", + "reusify": "^1.0.4" }, "files": [ "src/**/*.ts", diff --git a/packages/grpc-js/src/server-interceptors.ts b/packages/grpc-js/src/server-interceptors.ts index b62d55108..6a1661462 100644 --- a/packages/grpc-js/src/server-interceptors.ts +++ b/packages/grpc-js/src/server-interceptors.ts @@ -31,7 +31,7 @@ import * as http2 from 'http2'; import { getErrorMessage } from './error'; import * as zlib from 'zlib'; import { promisify } from 'util'; -import { StreamDecoder } from './stream-decoder'; +import { GrpcFrame, decoder } from './stream-decoder'; import { CallEventTracker } from './transport'; import * as logging from './logging'; @@ -44,6 +44,12 @@ function trace(text: string) { logging.trace(LogVerbosity.DEBUG, TRACER_NAME, text); } +interface GrpcWriteFrame { + header: Buffer; + message: Buffer; + size: number; +} + export interface ServerMetadataListener { (metadata: Metadata, next: (metadata: Metadata) => void): void; } @@ -478,7 +484,7 @@ type ReadQueueEntryType = 'COMPRESSED' | 'READABLE' | 'HALF_CLOSE'; interface ReadQueueEntry { type: ReadQueueEntryType; - compressedMessage: Buffer | null; + compressedMessage: GrpcFrame | null; parsedMessage: any; } @@ -496,7 +502,7 @@ export class BaseServerInterceptingCall private wantTrailers = false; private cancelNotified = false; private incomingEncoding = 'identity'; - private decoder = new StreamDecoder(); + private decoder = decoder.get(); private readQueue: ReadQueueEntry[] = []; private isReadPending = false; private receivedHalfClose = false; @@ -536,6 +542,9 @@ export class BaseServerInterceptingCall } this.notifyOnCancel(); + + // release current decoder + decoder.release(this.decoder); }); this.stream.on('data', (data: Buffer) => { @@ -632,7 
+641,7 @@ export class BaseServerInterceptingCall } this.cancelNotified = true; this.cancelled = true; - process.nextTick(() => { + queueMicrotask(() => { this.listener?.onCancel(); }); if (this.deadlineTimer) { @@ -658,16 +667,19 @@ export class BaseServerInterceptingCall * @param value * @returns */ - private serializeMessage(value: any) { + private serializeMessage(value: any): GrpcWriteFrame { const messageBuffer = this.handler.serialize(value); - const byteLength = messageBuffer.byteLength; - const output = Buffer.allocUnsafe(byteLength + 5); - /* Note: response compression is currently not supported, so this - * compressed bit is always 0. */ - output.writeUInt8(0, 0); - output.writeUInt32BE(byteLength, 1); - messageBuffer.copy(output, 5); - return output; + const { byteLength } = messageBuffer; + + const buffer = Buffer.allocUnsafe(5); + buffer.writeUint8(0, 0); + buffer.writeUint32BE(byteLength, 1); + + return { + size: byteLength, + header: buffer, + message: messageBuffer, + }; } private decompressMessage( @@ -676,11 +688,11 @@ export class BaseServerInterceptingCall ): Buffer | Promise { switch (encoding) { case 'deflate': - return inflate(message.subarray(5)); + return inflate(message); case 'gzip': - return unzip(message.subarray(5)); + return unzip(message); case 'identity': - return message.subarray(5); + return message; default: return Promise.reject({ code: Status.UNIMPLEMENTED, @@ -694,14 +706,16 @@ export class BaseServerInterceptingCall throw new Error(`Invalid queue entry type: ${queueEntry.type}`); } - const compressed = queueEntry.compressedMessage!.readUInt8(0) === 1; + const compressed = queueEntry.compressedMessage!.compressed === 1; const compressedMessageEncoding = compressed ? this.incomingEncoding : 'identity'; - const decompressedMessage = await this.decompressMessage( - queueEntry.compressedMessage!, - compressedMessageEncoding - ); + const decompressedMessage = compressed + ? 
await this.decompressMessage( + queueEntry.compressedMessage!.message, + compressedMessageEncoding + ) + : queueEntry.compressedMessage!.message; try { queueEntry.parsedMessage = this.handler.deserialize(decompressedMessage); } catch (err) { @@ -743,26 +757,27 @@ export class BaseServerInterceptingCall ' received data frame of size ' + data.length ); - const rawMessages = this.decoder.write(data); + const rawMessages: GrpcFrame[] = this.decoder.write(data); - for (const messageBytes of rawMessages) { + if (rawMessages.length > 0) { this.stream.pause(); + } + + for (const message of rawMessages) { if ( this.maxReceiveMessageSize !== -1 && - messageBytes.length - 5 > this.maxReceiveMessageSize + message.size > this.maxReceiveMessageSize ) { this.sendStatus({ code: Status.RESOURCE_EXHAUSTED, - details: `Received message larger than max (${ - messageBytes.length - 5 - } vs. ${this.maxReceiveMessageSize})`, + details: `Received message larger than max (${message.size} vs. ${this.maxReceiveMessageSize})`, metadata: null, }); return; } const queueEntry: ReadQueueEntry = { type: 'COMPRESSED', - compressedMessage: messageBytes, + compressedMessage: message, parsedMessage: null, }; this.readQueue.push(queueEntry); @@ -809,7 +824,7 @@ export class BaseServerInterceptingCall if (this.checkCancelled()) { return; } - let response: Buffer; + let response: GrpcWriteFrame; try { response = this.serializeMessage(message); } catch (e) { @@ -823,11 +838,11 @@ export class BaseServerInterceptingCall if ( this.maxSendMessageSize !== -1 && - response.length - 5 > this.maxSendMessageSize + response.size > this.maxSendMessageSize ) { this.sendStatus({ code: Status.RESOURCE_EXHAUSTED, - details: `Sent message larger than max (${response.length} vs. ${this.maxSendMessageSize})`, + details: `Sent message larger than max (${response.size} vs. 
${this.maxSendMessageSize})`, metadata: null, }); return; @@ -837,9 +852,12 @@ export class BaseServerInterceptingCall 'Request to ' + this.handler.path + ' sent data frame of size ' + - response.length + response.size ); - this.stream.write(response, error => { + const { stream } = this; + stream.cork(); + stream.write(response.header); + stream.write(response.message, error => { if (error) { this.sendStatus({ code: Status.INTERNAL, @@ -851,6 +869,7 @@ export class BaseServerInterceptingCall this.callEventTracker?.addMessageSent(); callback(); }); + stream.uncork(); } sendStatus(status: PartialStatusObject): void { if (this.checkCancelled()) { diff --git a/packages/grpc-js/src/server.ts b/packages/grpc-js/src/server.ts index feb511b41..e988f2ce4 100644 --- a/packages/grpc-js/src/server.ts +++ b/packages/grpc-js/src/server.ts @@ -1057,6 +1057,8 @@ export class Server { /** * @deprecated No longer needed as of version 1.10.x */ + // eslint-disable-next-line @typescript-eslint/ban-ts-comment + // @ts-ignore @deprecate( 'Calling start() is no longer necessary. It can be safely omitted.' 
) diff --git a/packages/grpc-js/src/stream-decoder.ts b/packages/grpc-js/src/stream-decoder.ts index 671ad41ae..de4cb0a6b 100644 --- a/packages/grpc-js/src/stream-decoder.ts +++ b/packages/grpc-js/src/stream-decoder.ts @@ -15,12 +15,21 @@ * */ -enum ReadState { +// @ts-expect-error no types +import * as reusify from 'reusify'; + +const enum ReadState { NO_DATA, READING_SIZE, READING_MESSAGE, } +export interface GrpcFrame { + compressed: number; + size: number; + message: Buffer; +} + export class StreamDecoder { private readState: ReadState = ReadState.NO_DATA; private readCompressFlag: Buffer = Buffer.alloc(1); @@ -103,3 +112,143 @@ export class StreamDecoder { return result; } } + +const kMessageSizeBytes = 4 as const; +const kEmptyMessage = Buffer.alloc(0); + +interface StreamDecoder2 { + next: StreamDecoder2 | null; + readState: ReadState; + readCompressFlag: number; + readPartialSize: Buffer; + readSizeRemaining: number; + readMessageSize: number; + readPartialMessage: Buffer | null; + readMessageRemaining: number; + + write(data: Buffer): GrpcFrame[]; +} + +function StreamDecoder2(this: StreamDecoder2) { + // reusify reference + this.next = null; + + // internal state + this.readState = ReadState.NO_DATA; + this.readCompressFlag = 0; + this.readPartialSize = Buffer.alloc(kMessageSizeBytes); + this.readSizeRemaining = kMessageSizeBytes; + this.readMessageSize = 0; + this.readPartialMessage = null; + this.readMessageRemaining = 0; + + // eslint-disable-next-line @typescript-eslint/no-this-alias + const that = this; + + this.write = function decodeInputBufferStream( + this: undefined, + data: Buffer + ): GrpcFrame[] { + let readHead = 0; + let toRead = 0; + const result: GrpcFrame[] = []; + const len = data.length; + + while (readHead < len) { + const { readState } = that; + if (readState === ReadState.NO_DATA) { + that.readCompressFlag = data.readUint8(readHead); + readHead += 1; + that.readState = ReadState.READING_SIZE; + + // size prop + 
that.readSizeRemaining = kMessageSizeBytes; + + // message body props + that.readMessageSize = 0; + that.readMessageRemaining = 0; + that.readPartialMessage = null; + } else if (readState === ReadState.READING_SIZE) { + let { readSizeRemaining } = that; + toRead = Math.min(len - readHead, readSizeRemaining); + + // we read everything in 1 go + if (toRead === kMessageSizeBytes) { + that.readMessageSize = data.readUInt32BE(readHead); + readSizeRemaining = 0; + } else { + // we only have partial bytes available to us + data.copy( + that.readPartialSize, + kMessageSizeBytes - readSizeRemaining, + readHead, + readHead + toRead + ); + + readSizeRemaining -= toRead; + if (readSizeRemaining === 0) { + that.readMessageSize = that.readPartialSize.readUInt32BE(0); + } + } + + that.readSizeRemaining = readSizeRemaining; + readHead += toRead; + + // readSizeRemaining >=0 here + if (readSizeRemaining === 0) { + if (that.readMessageRemaining > 0) { + that.readState = ReadState.READING_MESSAGE; + that.readMessageRemaining = that.readMessageSize; + } else { + that.readState = ReadState.NO_DATA; + result.push({ + compressed: that.readCompressFlag, + size: 0, + message: kEmptyMessage, + }); + } + } + } else if (readState === ReadState.READING_MESSAGE) { + const { readMessageSize } = that; + let { readMessageRemaining } = that; + toRead = Math.min(len - readHead, readMessageRemaining); + + if (toRead === readMessageRemaining) { + that.readPartialMessage = data.subarray(readHead, readHead + toRead); + } else { + if (that.readPartialMessage === null) { + that.readPartialMessage = Buffer.allocUnsafe(readMessageSize); + } + + data.copy( + that.readPartialMessage!, + readMessageSize - readMessageRemaining, + readHead, + readHead + toRead + ); + } + + readMessageRemaining -= toRead; + readHead += toRead; + + // readMessageRemaining >=0 here + if (readMessageRemaining === 0) { + // At that point, we have read a full message + result.push({ + compressed: that.readCompressFlag, + size: 
readMessageSize, + message: that.readPartialMessage, + }); + + that.readState = ReadState.NO_DATA; + } + } else { + throw new Error('Unexpected read state'); + } + } + + return result; + }; +} + +export const decoder = reusify(StreamDecoder2); diff --git a/packages/grpc-js/test/test-call-propagation.ts b/packages/grpc-js/test/test-call-propagation.ts index 9ede91318..68ce858a1 100644 --- a/packages/grpc-js/test/test-call-propagation.ts +++ b/packages/grpc-js/test/test-call-propagation.ts @@ -32,7 +32,7 @@ function multiDone(done: () => void, target: number) { }; } -describe('Call propagation', () => { +describe.only('Call propagation', () => { let server: grpc.Server; let Client: ServiceClientConstructor; let client: ServiceClient; @@ -199,8 +199,8 @@ describe('Call propagation', () => { }); }); }); - describe('Deadlines', () => { - it('should work with unary requests', done => { + describe.only('Deadlines', () => { + it.only('should work with unary requests', done => { done = multiDone(done, 2); proxyServer.addService(Client.service, { unary: ( diff --git a/packages/grpc-js/tsconfig.json b/packages/grpc-js/tsconfig.json index 763ceda98..fd224bb93 100644 --- a/packages/grpc-js/tsconfig.json +++ b/packages/grpc-js/tsconfig.json @@ -10,6 +10,7 @@ "pretty": true, "sourceMap": true, "strict": true, + "alwaysStrict": true, "lib": ["es2017"], "outDir": "build", "target": "es2017", diff --git a/packages/grpc-js/tsconfig.modern.json b/packages/grpc-js/tsconfig.modern.json new file mode 100644 index 000000000..7937cfbb0 --- /dev/null +++ b/packages/grpc-js/tsconfig.modern.json @@ -0,0 +1,8 @@ +{ + "extends": "./tsconfig.json", + "compilerOptions": { + "target": "ESNext", + "experimentalDecorators": true, + "emitDecoratorMetadata": true + } +} From 26bd140af130848e92a0b9dbf9bf0596e78fa408 Mon Sep 17 00:00:00 2001 From: AVVS Date: Mon, 29 Apr 2024 19:23:03 -0700 Subject: [PATCH 2/9] fix: logic issues in the decoder --- packages/grpc-js/src/server-interceptors.ts | 14 
+++++--------- packages/grpc-js/src/stream-decoder.ts | 8 +++++--- packages/grpc-js/test/test-call-propagation.ts | 6 +++--- 3 files changed, 13 insertions(+), 15 deletions(-) diff --git a/packages/grpc-js/src/server-interceptors.ts b/packages/grpc-js/src/server-interceptors.ts index 6a1661462..6445c1b8d 100644 --- a/packages/grpc-js/src/server-interceptors.ts +++ b/packages/grpc-js/src/server-interceptors.ts @@ -706,16 +706,12 @@ export class BaseServerInterceptingCall throw new Error(`Invalid queue entry type: ${queueEntry.type}`); } - const compressed = queueEntry.compressedMessage!.compressed === 1; - const compressedMessageEncoding = compressed - ? this.incomingEncoding - : 'identity'; + const msg = queueEntry.compressedMessage!; + const compressed = msg!.compressed === 1; const decompressedMessage = compressed - ? await this.decompressMessage( - queueEntry.compressedMessage!.message, - compressedMessageEncoding - ) - : queueEntry.compressedMessage!.message; + ? await this.decompressMessage(msg.message, this.incomingEncoding) + : msg.message; + try { queueEntry.parsedMessage = this.handler.deserialize(decompressedMessage); } catch (err) { diff --git a/packages/grpc-js/src/stream-decoder.ts b/packages/grpc-js/src/stream-decoder.ts index de4cb0a6b..26375fd5d 100644 --- a/packages/grpc-js/src/stream-decoder.ts +++ b/packages/grpc-js/src/stream-decoder.ts @@ -196,7 +196,7 @@ function StreamDecoder2(this: StreamDecoder2) { // readSizeRemaining >=0 here if (readSizeRemaining === 0) { - if (that.readMessageRemaining > 0) { + if (that.readMessageSize > 0) { that.readState = ReadState.READING_MESSAGE; that.readMessageRemaining = that.readMessageSize; } else { @@ -213,7 +213,7 @@ function StreamDecoder2(this: StreamDecoder2) { let { readMessageRemaining } = that; toRead = Math.min(len - readHead, readMessageRemaining); - if (toRead === readMessageRemaining) { + if (toRead === readMessageSize) { that.readPartialMessage = data.subarray(readHead, readHead + toRead); } else 
{ if (that.readPartialMessage === null) { @@ -221,7 +221,7 @@ function StreamDecoder2(this: StreamDecoder2) { } data.copy( - that.readPartialMessage!, + that.readPartialMessage, readMessageSize - readMessageRemaining, readHead, readHead + toRead @@ -241,6 +241,8 @@ function StreamDecoder2(this: StreamDecoder2) { }); that.readState = ReadState.NO_DATA; + } else { + that.readMessageRemaining = readMessageRemaining; } } else { throw new Error('Unexpected read state'); diff --git a/packages/grpc-js/test/test-call-propagation.ts b/packages/grpc-js/test/test-call-propagation.ts index 68ce858a1..9ede91318 100644 --- a/packages/grpc-js/test/test-call-propagation.ts +++ b/packages/grpc-js/test/test-call-propagation.ts @@ -32,7 +32,7 @@ function multiDone(done: () => void, target: number) { }; } -describe.only('Call propagation', () => { +describe('Call propagation', () => { let server: grpc.Server; let Client: ServiceClientConstructor; let client: ServiceClient; @@ -199,8 +199,8 @@ describe.only('Call propagation', () => { }); }); }); - describe.only('Deadlines', () => { - it.only('should work with unary requests', done => { + describe('Deadlines', () => { + it('should work with unary requests', done => { done = multiDone(done, 2); proxyServer.addService(Client.service, { unary: ( From 791890f268d2e61769bf302e192f693477a05191 Mon Sep 17 00:00:00 2001 From: AVVS Date: Mon, 29 Apr 2024 21:01:16 -0700 Subject: [PATCH 3/9] feat: use new decoder everywhere --- packages/grpc-js/src/server-interceptors.ts | 71 ++++++-------- packages/grpc-js/src/stream-decoder.ts | 103 +++----------------- packages/grpc-js/src/subchannel-call.ts | 17 +++- 3 files changed, 56 insertions(+), 135 deletions(-) diff --git a/packages/grpc-js/src/server-interceptors.ts b/packages/grpc-js/src/server-interceptors.ts index 6445c1b8d..6a47c5c67 100644 --- a/packages/grpc-js/src/server-interceptors.ts +++ b/packages/grpc-js/src/server-interceptors.ts @@ -44,11 +44,7 @@ function trace(text: string) { 
logging.trace(LogVerbosity.DEBUG, TRACER_NAME, text); } -interface GrpcWriteFrame { - header: Buffer; - message: Buffer; - size: number; -} +type GrpcWriteFrame = [header: Buffer, message: Buffer]; export interface ServerMetadataListener { (metadata: Metadata, next: (metadata: Metadata) => void): void; @@ -671,34 +667,29 @@ export class BaseServerInterceptingCall const messageBuffer = this.handler.serialize(value); const { byteLength } = messageBuffer; - const buffer = Buffer.allocUnsafe(5); - buffer.writeUint8(0, 0); - buffer.writeUint32BE(byteLength, 1); + const header = Buffer.allocUnsafe(5); + header.writeUint8(0, 0); + header.writeUint32BE(byteLength, 1); - return { - size: byteLength, - header: buffer, - message: messageBuffer, - }; + return [header, messageBuffer]; } private decompressMessage( message: Buffer, encoding: string - ): Buffer | Promise { - switch (encoding) { - case 'deflate': - return inflate(message); - case 'gzip': - return unzip(message); - case 'identity': - return message; - default: - return Promise.reject({ - code: Status.UNIMPLEMENTED, - details: `Received message compressed with unsupported encoding "${encoding}"`, - }); + ): Promise { + if (encoding === 'deflate') { + return inflate(message); + } + + if (encoding === 'gzip') { + return unzip(message); } + + throw { + code: Status.UNIMPLEMENTED, + details: `Received message compressed with unsupported encoding "${encoding}"`, + }; } private async decompressAndMaybePush(queueEntry: ReadQueueEntry) { @@ -708,19 +699,21 @@ export class BaseServerInterceptingCall const msg = queueEntry.compressedMessage!; const compressed = msg!.compressed === 1; - const decompressedMessage = compressed - ? await this.decompressMessage(msg.message, this.incomingEncoding) - : msg.message; try { + const decompressedMessage = compressed + ? 
await this.decompressMessage(msg.message, this.incomingEncoding) + : msg.message; + queueEntry.parsedMessage = this.handler.deserialize(decompressedMessage); - } catch (err) { + } catch (err: any) { this.sendStatus({ - code: Status.INTERNAL, - details: `Error deserializing request: ${(err as Error).message}`, + code: err.code || Status.INTERNAL, + details: err.details || `Error deserializing request: ${err.message}`, }); return; } + queueEntry.type = 'READABLE'; this.maybePushNextMessage(); } @@ -834,11 +827,11 @@ export class BaseServerInterceptingCall if ( this.maxSendMessageSize !== -1 && - response.size > this.maxSendMessageSize + response[1].length > this.maxSendMessageSize ) { this.sendStatus({ code: Status.RESOURCE_EXHAUSTED, - details: `Sent message larger than max (${response.size} vs. ${this.maxSendMessageSize})`, + details: `Sent message larger than max (${response[1].length} vs. ${this.maxSendMessageSize})`, metadata: null, }); return; @@ -848,12 +841,13 @@ export class BaseServerInterceptingCall 'Request to ' + this.handler.path + ' sent data frame of size ' + - response.size + response[1].length ); const { stream } = this; - stream.cork(); - stream.write(response.header); - stream.write(response.message, error => { + + // TODO: measure cork() / uncork() ? 
+ stream.write(response[0]); + stream.write(response[1], error => { if (error) { this.sendStatus({ code: Status.INTERNAL, @@ -865,7 +859,6 @@ export class BaseServerInterceptingCall this.callEventTracker?.addMessageSent(); callback(); }); - stream.uncork(); } sendStatus(status: PartialStatusObject): void { if (this.checkCancelled()) { diff --git a/packages/grpc-js/src/stream-decoder.ts b/packages/grpc-js/src/stream-decoder.ts index 26375fd5d..6af39d5b2 100644 --- a/packages/grpc-js/src/stream-decoder.ts +++ b/packages/grpc-js/src/stream-decoder.ts @@ -17,107 +17,23 @@ // @ts-expect-error no types import * as reusify from 'reusify'; - -const enum ReadState { - NO_DATA, - READING_SIZE, - READING_MESSAGE, -} - export interface GrpcFrame { compressed: number; size: number; message: Buffer; } -export class StreamDecoder { - private readState: ReadState = ReadState.NO_DATA; - private readCompressFlag: Buffer = Buffer.alloc(1); - private readPartialSize: Buffer = Buffer.alloc(4); - private readSizeRemaining = 4; - private readMessageSize = 0; - private readPartialMessage: Buffer[] = []; - private readMessageRemaining = 0; - - write(data: Buffer): Buffer[] { - let readHead = 0; - let toRead: number; - const result: Buffer[] = []; - - while (readHead < data.length) { - switch (this.readState) { - case ReadState.NO_DATA: - this.readCompressFlag = data.slice(readHead, readHead + 1); - readHead += 1; - this.readState = ReadState.READING_SIZE; - this.readPartialSize.fill(0); - this.readSizeRemaining = 4; - this.readMessageSize = 0; - this.readMessageRemaining = 0; - this.readPartialMessage = []; - break; - case ReadState.READING_SIZE: - toRead = Math.min(data.length - readHead, this.readSizeRemaining); - data.copy( - this.readPartialSize, - 4 - this.readSizeRemaining, - readHead, - readHead + toRead - ); - this.readSizeRemaining -= toRead; - readHead += toRead; - // readSizeRemaining >=0 here - if (this.readSizeRemaining === 0) { - this.readMessageSize = 
this.readPartialSize.readUInt32BE(0); - this.readMessageRemaining = this.readMessageSize; - if (this.readMessageRemaining > 0) { - this.readState = ReadState.READING_MESSAGE; - } else { - const message = Buffer.concat( - [this.readCompressFlag, this.readPartialSize], - 5 - ); - - this.readState = ReadState.NO_DATA; - result.push(message); - } - } - break; - case ReadState.READING_MESSAGE: - toRead = Math.min(data.length - readHead, this.readMessageRemaining); - this.readPartialMessage.push(data.slice(readHead, readHead + toRead)); - this.readMessageRemaining -= toRead; - readHead += toRead; - // readMessageRemaining >=0 here - if (this.readMessageRemaining === 0) { - // At this point, we have read a full message - const framedMessageBuffers = [ - this.readCompressFlag, - this.readPartialSize, - ].concat(this.readPartialMessage); - const framedMessage = Buffer.concat( - framedMessageBuffers, - this.readMessageSize + 5 - ); - - this.readState = ReadState.NO_DATA; - result.push(framedMessage); - } - break; - default: - throw new Error('Unexpected read state'); - } - } - - return result; - } +const enum ReadState { + NO_DATA, + READING_SIZE, + READING_MESSAGE, } const kMessageSizeBytes = 4 as const; const kEmptyMessage = Buffer.alloc(0); -interface StreamDecoder2 { - next: StreamDecoder2 | null; +interface StreamDecoder { + next: StreamDecoder | null; readState: ReadState; readCompressFlag: number; readPartialSize: Buffer; @@ -129,7 +45,7 @@ interface StreamDecoder2 { write(data: Buffer): GrpcFrame[]; } -function StreamDecoder2(this: StreamDecoder2) { +function StreamDecoder(this: StreamDecoder) { // reusify reference this.next = null; @@ -253,4 +169,7 @@ function StreamDecoder2(this: StreamDecoder2) { }; } -export const decoder = reusify(StreamDecoder2); +export const decoder = reusify(StreamDecoder) as { + get(): StreamDecoder; + release(decoder: StreamDecoder): void; +}; diff --git a/packages/grpc-js/src/subchannel-call.ts b/packages/grpc-js/src/subchannel-call.ts 
index d54a6bcbf..6befa79d5 100644 --- a/packages/grpc-js/src/subchannel-call.ts +++ b/packages/grpc-js/src/subchannel-call.ts @@ -20,7 +20,7 @@ import * as os from 'os'; import { Status } from './constants'; import { Metadata } from './metadata'; -import { StreamDecoder } from './stream-decoder'; +import { decoder } from './stream-decoder'; import * as logging from './logging'; import { LogVerbosity } from './constants'; import { @@ -83,7 +83,7 @@ export interface SubchannelCallInterceptingListener } export class Http2SubchannelCall implements SubchannelCall { - private decoder = new StreamDecoder(); + private decoder = decoder.get(); private isReadFilterPending = false; private isPushPending = false; @@ -175,9 +175,16 @@ export class Http2SubchannelCall implements SubchannelCall { const messages = this.decoder.write(data); for (const message of messages) { - this.trace('parsed message of length ' + message.length); + this.trace('parsed message of length ' + message.size); this.callEventTracker!.addMessageReceived(); - this.tryPush(message); + + // TODO: teach all the client-side interceptors to work with decoded GrpcFrames + const messageBytes = Buffer.allocUnsafe(message.size + 5); + messageBytes.writeUint8(message.compressed, 0); + messageBytes.writeUint32BE(message.size, 1); + message.message.copy(messageBytes, 5); + + this.tryPush(messageBytes); } }); http2Stream.on('end', () => { @@ -186,6 +193,8 @@ export class Http2SubchannelCall implements SubchannelCall { }); http2Stream.on('close', () => { this.serverEndedCall = true; + decoder.release(this.decoder); + /* Use process.next tick to ensure that this code happens after any * "error" event that may be emitted at about the same time, so that * we can bubble up the error message from that event. 
*/ From 4b64cb680eb5edcf52bff4563c43a8604027a27b Mon Sep 17 00:00:00 2001 From: AVVS Date: Tue, 30 Apr 2024 16:29:20 -0700 Subject: [PATCH 4/9] feat: work on benchmarks, improved metadata, stream-decoder --- packages/grpc-js/.gitignore | 1 + packages/grpc-js/benchmarks/README.md | 4 + packages/grpc-js/benchmarks/bench/metadata.js | 68 +++++ .../benchmarks/bench/stream-decoder.js | 108 ++++++++ packages/grpc-js/benchmarks/common.js | 100 +++++++ .../helpers/{encode.ts => encode.js} | 26 +- packages/grpc-js/benchmarks/helpers/utils.js | 28 ++ packages/grpc-js/benchmarks/helpers/utils.ts | 29 --- packages/grpc-js/benchmarks/markdown.js | 27 ++ packages/grpc-js/benchmarks/package.json | 20 ++ .../benchmarks/{server.ts => server-old.js} | 14 +- packages/grpc-js/benchmarks/server.js | 38 +++ packages/grpc-js/package.json | 2 +- packages/grpc-js/src/metadata.ts | 246 +++++++++++++----- packages/grpc-js/src/stream-decoder.ts | 2 +- packages/grpc-js/test/test-metadata.ts | 36 +-- packages/grpc-js/tsconfig.json | 3 +- 17 files changed, 620 insertions(+), 132 deletions(-) create mode 100644 packages/grpc-js/.gitignore create mode 100644 packages/grpc-js/benchmarks/bench/metadata.js create mode 100644 packages/grpc-js/benchmarks/bench/stream-decoder.js create mode 100644 packages/grpc-js/benchmarks/common.js rename packages/grpc-js/benchmarks/helpers/{encode.ts => encode.js} (55%) create mode 100644 packages/grpc-js/benchmarks/helpers/utils.js delete mode 100644 packages/grpc-js/benchmarks/helpers/utils.ts create mode 100644 packages/grpc-js/benchmarks/markdown.js create mode 100644 packages/grpc-js/benchmarks/package.json rename packages/grpc-js/benchmarks/{server.ts => server-old.js} (69%) create mode 100644 packages/grpc-js/benchmarks/server.js diff --git a/packages/grpc-js/.gitignore b/packages/grpc-js/.gitignore new file mode 100644 index 000000000..ca235a431 --- /dev/null +++ b/packages/grpc-js/.gitignore @@ -0,0 +1 @@ +.clinic diff --git 
a/packages/grpc-js/benchmarks/README.md b/packages/grpc-js/benchmarks/README.md index a3fe7a85f..49aa16b2e 100644 --- a/packages/grpc-js/benchmarks/README.md +++ b/packages/grpc-js/benchmarks/README.md @@ -10,6 +10,10 @@ This folder contains basic benchmarks to approximate performance impact of chang For mac os: `DYLD_INSERT_LIBRARIES=$(brew --prefix jemalloc)/lib/libjemalloc.dylib pnpm ts-node --transpile-only ./benchmarks/server.ts` +`DYLD_INSERT_LIBRARIES=$(brew --prefix jemalloc)/lib/libjemalloc.dylib NODE_ENV=production clinic flame -- node -r ts-node/register/transpile-only ./benchmarks/server.ts` + +`DYLD_INSERT_LIBRARIES=$(brew --prefix jemalloc)/lib/libjemalloc.dylib NODE_ENV=production node -r ts-node/register/transpile-only --trace-opt --trace-deopt ./benchmarks/server.ts` + 2. h2load -n200000 -m 50 http://localhost:9999/EchoService/Echo -c10 -t 10 -H 'content-type: application/grpc' -d ./echo-unary.bin Baseline on M1 Max Laptop: diff --git a/packages/grpc-js/benchmarks/bench/metadata.js b/packages/grpc-js/benchmarks/bench/metadata.js new file mode 100644 index 000000000..8277ca058 --- /dev/null +++ b/packages/grpc-js/benchmarks/bench/metadata.js @@ -0,0 +1,68 @@ +const { createBenchmarkSuite } = require('../common'); +const { + sensitiveHeaders, + constants: { + HTTP2_HEADER_ACCEPT_ENCODING, + HTTP2_HEADER_TE, + HTTP2_HEADER_CONTENT_TYPE, + }, +} = require('node:http2'); +const { + Metadata: MetadataOriginal, +} = require('@grpc/grpc-js/build/src/metadata'); +const { Metadata } = require('../../build/src/metadata'); + +const suite = createBenchmarkSuite('Metadata'); + +const GRPC_ACCEPT_ENCODING_HEADER = 'grpc-accept-encoding'; +const GRPC_ENCODING_HEADER = 'grpc-encoding'; +const GRPC_TIMEOUT_HEADER = 'grpc-timeout'; +const headers = Object.setPrototypeOf( + { + ':path': '/EchoService/Echo', + ':scheme': 'http', + ':authority': 'localhost:9999', + ':method': 'POST', + 'user-agent': 'h2load nghttp2/1.58.0', + 'content-type': 'application/grpc', + 
'content-length': '19', + [sensitiveHeaders]: [], + }, + null +); + +const ogMeta = MetadataOriginal.fromHttp2Headers(headers); +const currentMeta = Metadata.fromHttp2Headers(headers); + +suite + .add('grpc-js@1.0.6 fromHttp2Headers', function () { + return MetadataOriginal.fromHttp2Headers(headers); + }) + .add('grpc-js@1.0.6 toHttp2Headers', function () { + return ogMeta.toHttp2Headers(); + }) + .add('grpc-js@1.0.6 fromHttp2Headers + common operations', function () { + const metadata = MetadataOriginal.fromHttp2Headers(headers); + metadata.remove(GRPC_TIMEOUT_HEADER); + metadata.remove(GRPC_ENCODING_HEADER); + metadata.remove(GRPC_ACCEPT_ENCODING_HEADER); + metadata.remove(HTTP2_HEADER_ACCEPT_ENCODING); + metadata.remove(HTTP2_HEADER_TE); + metadata.remove(HTTP2_HEADER_CONTENT_TYPE); + }) + .add('current fromHttp2Headers', function () { + return Metadata.fromHttp2Headers(headers); + }) + .add('current toHttp2Headers', function () { + return currentMeta.toHttp2Headers(); + }) + .add('current + common operations', function () { + const metadata = Metadata.fromHttp2Headers(headers); + metadata.remove(GRPC_TIMEOUT_HEADER); + metadata.remove(GRPC_ENCODING_HEADER); + metadata.remove(GRPC_ACCEPT_ENCODING_HEADER); + metadata.remove(HTTP2_HEADER_ACCEPT_ENCODING); + metadata.remove(HTTP2_HEADER_TE); + metadata.remove(HTTP2_HEADER_CONTENT_TYPE); + }) + .run({ async: false }); diff --git a/packages/grpc-js/benchmarks/bench/stream-decoder.js b/packages/grpc-js/benchmarks/bench/stream-decoder.js new file mode 100644 index 000000000..1d54150e5 --- /dev/null +++ b/packages/grpc-js/benchmarks/bench/stream-decoder.js @@ -0,0 +1,108 @@ +const { createBenchmarkSuite } = require('../common'); +const { serializeMessage } = require('../helpers/encode'); +const { echoService } = require('../helpers/utils'); +const { + StreamDecoder: OGStreamDecoder, +} = require('@grpc/grpc-js/build/src/stream-decoder'); +const { + StreamDecoder: NewStreamDecoder, +} = 
require('../../build/src/stream-decoder'); + +const suite = createBenchmarkSuite('Stream Decoder'); + +const smallBinary = serializeMessage( + echoService.service.Echo.requestSerialize, + { + value: 'string-val', + value2: 10, + } +); + +const smallBinarySplitPartOne = Buffer.from(smallBinary.subarray(0, 3)); +const smallBinarySplitPartTwo = Buffer.from(smallBinary.subarray(3, 5)); +const smallBinarySplitPartThree = Buffer.from(smallBinary.subarray(5)); + +const largeBinary = serializeMessage( + echoService.service.Echo.requestSerialize, + { + value: 'a'.repeat(2 ** 16), + value2: 12803182109, + } +); + +const largeBinarySplitPartOne = Buffer.from(largeBinary.subarray(0, 4096)); +const largeBinarySplitPartTwo = Buffer.from(largeBinary.subarray(4096)); + +const cachedSD2 = new NewStreamDecoder(); +const cachedOG = new OGStreamDecoder(); + +suite + .add('original stream decoder', function () { + const decoder = new OGStreamDecoder(); + decoder.write(smallBinary); + }) + .add('original stream decoder cached', function () { + cachedOG.write(smallBinary); + }) + .add('stream decoder v2', function () { + const decoder = new NewStreamDecoder(); + decoder.write(smallBinary); + }) + .add('stream decoder v2 cached', function () { + cachedSD2.write(smallBinary); + }) + .add('original stream decoder - large', function () { + const decoder = new OGStreamDecoder(); + decoder.write(largeBinary); + }) + .add('original stream decoder cached - large', function () { + cachedOG.write(largeBinary); + }) + .add('stream decoder v2 - large', function () { + const decoder = new NewStreamDecoder(); + decoder.write(largeBinary); + }) + .add('stream decoder v2 cached - large', function () { + cachedSD2.write(largeBinary); + }) + .add('original stream decoder - small split', function () { + const decoder = new OGStreamDecoder(); + decoder.write(smallBinarySplitPartOne); + decoder.write(smallBinarySplitPartTwo); + decoder.write(smallBinarySplitPartThree); + }) + .add('original stream decoder 
cached - small split', function () { + cachedOG.write(smallBinarySplitPartOne); + cachedOG.write(smallBinarySplitPartTwo); + cachedOG.write(smallBinarySplitPartThree); + }) + .add('stream decoder v2 - small split', function () { + const decoder = new NewStreamDecoder(); + decoder.write(smallBinarySplitPartOne); + decoder.write(smallBinarySplitPartTwo); + decoder.write(smallBinarySplitPartThree); + }) + .add('stream decoder v2 cached - small split', function () { + cachedSD2.write(smallBinarySplitPartOne); + cachedSD2.write(smallBinarySplitPartTwo); + cachedSD2.write(smallBinarySplitPartThree); + }) + .add('original stream decoder - large split', function () { + const decoder = new OGStreamDecoder(); + decoder.write(largeBinarySplitPartOne); + decoder.write(largeBinarySplitPartTwo); + }) + .add('original stream decoder cached - large split', function () { + cachedOG.write(largeBinarySplitPartOne); + cachedOG.write(largeBinarySplitPartTwo); + }) + .add('stream decoder v2 - large split', function () { + const decoder = new NewStreamDecoder(); + decoder.write(largeBinarySplitPartOne); + decoder.write(largeBinarySplitPartTwo); + }) + .add('stream decoder v2 cached - large split', function () { + cachedSD2.write(largeBinarySplitPartOne); + cachedSD2.write(largeBinarySplitPartTwo); + }) + .run({ async: false }); diff --git a/packages/grpc-js/benchmarks/common.js b/packages/grpc-js/benchmarks/common.js new file mode 100644 index 000000000..2928d86a4 --- /dev/null +++ b/packages/grpc-js/benchmarks/common.js @@ -0,0 +1,100 @@ +const Benchmark = require('benchmark'); +const { createTableHeader, H2, eventToMdTable } = require('./markdown'); +const os = require('os'); + +function installMarkdownEmitter( + suite, + name, + tableHeaderColumns = ['name', 'ops/sec', 'samples'] +) { + const tableHeader = createTableHeader(tableHeaderColumns); + + suite + .on('start', function () { + console.log(H2(name)); + console.log(tableHeader); + }) + .on('cycle', function (event) { + 
console.log(eventToMdTable(event)); + }); +} + +function getMachineInfo() { + return { + platform: os.platform(), + arch: os.arch(), + cpus: os.cpus().length, + totalMemory: os.totalmem() / 1024 ** 3, + }; +} + +function installMarkdownMachineInfo(suite) { + if (!process.env.CI) return; + + const { platform, arch, cpus, totalMemory } = getMachineInfo(); + + const machineInfo = `${platform} ${arch} | ${cpus} vCPUs | ${totalMemory.toFixed( + 1 + )}GB Mem`; + + suite.on('complete', () => { + const writter = process.stdout; + + writter.write('\n\n'); + writter.write('
\n'); + writter.write('Environment'); + writter.write(`\n +* __Machine:__ ${machineInfo} +* __Run:__ ${new Date()} +`); + writter.write('
'); + writter.write('\n\n'); + }); +} + +function installMarkdownHiddenDetailedInfo(suite) { + if (!process.env.CI) return; + + const cycleEvents = []; + + suite + .on('cycle', function (event) { + cycleEvents.push({ + name: event.target.name, + opsSec: event.target.hz, + samples: event.target.cycles, + }); + }) + .on('complete', function () { + const writter = process.stdout; + + writter.write('\n'); + }); +} + +function createBenchmarkSuite( + name, + { tableHeaderColumns = ['name', 'ops/sec', 'samples'] } = {} +) { + const suite = new Benchmark.Suite(); + + installMarkdownEmitter(suite, name, tableHeaderColumns); + installMarkdownMachineInfo(suite); + installMarkdownHiddenDetailedInfo(suite); + + return suite; +} + +module.exports = { + createBenchmarkSuite, + installMarkdownEmitter, + installMarkdownMachineInfo, + installMarkdownHiddenDetailedInfo, +}; diff --git a/packages/grpc-js/benchmarks/helpers/encode.ts b/packages/grpc-js/benchmarks/helpers/encode.js similarity index 55% rename from packages/grpc-js/benchmarks/helpers/encode.ts rename to packages/grpc-js/benchmarks/helpers/encode.js index ab4fea5df..ced6fdc04 100644 --- a/packages/grpc-js/benchmarks/helpers/encode.ts +++ b/packages/grpc-js/benchmarks/helpers/encode.js @@ -1,13 +1,13 @@ -import * as fs from 'node:fs'; -import { resolve } from 'node:path'; -import { echoService } from './utils'; +const fs = require('node:fs'); +const { resolve } = require('node:path'); +const { echoService } = require('./utils'); /** * Serialize a message to a length-delimited byte string. 
* @param value * @returns */ -function serializeMessage(serialize: any, value: any) { +function serializeMessage(serialize, value) { const messageBuffer = serialize(value); const byteLength = messageBuffer.byteLength; const output = Buffer.allocUnsafe(byteLength + 5); @@ -27,11 +27,15 @@ const binaryMessage = serializeMessage( } ); -console.log( - 'Service %s\nEcho binary bytes: %d, hex: %s', - echoService.service.Echo.path, - binaryMessage.length, - binaryMessage.toString('hex') -); +if (require.main === module) { + console.log( + 'Service %s\nEcho binary bytes: %d, hex: %s', + echoService.service.Echo.path, + binaryMessage.length, + binaryMessage.toString('hex') + ); + + fs.writeFileSync(resolve(__dirname, '../echo-unary.bin'), binaryMessage); +} -fs.writeFileSync(resolve(__dirname, '../echo-unary.bin'), binaryMessage); +exports.serializeMessage = serializeMessage; diff --git a/packages/grpc-js/benchmarks/helpers/utils.js b/packages/grpc-js/benchmarks/helpers/utils.js new file mode 100644 index 000000000..ecca11af4 --- /dev/null +++ b/packages/grpc-js/benchmarks/helpers/utils.js @@ -0,0 +1,28 @@ +const loader = require('@grpc/proto-loader'); +const path = require('node:path'); + +// eslint-disable-next-line node/no-unpublished-import +const { loadPackageDefinition } = require('../../build/src/make-client'); + +const protoLoaderOptions = { + keepCase: true, + longs: String, + enums: String, + defaults: true, + oneofs: true, +}; + +function loadProtoFile(file) { + const packageDefinition = loader.loadSync(file, protoLoaderOptions); + return loadPackageDefinition(packageDefinition); +} + +const protoFile = path.join( + __dirname, + '../../test/fixtures', + 'echo_service.proto' +); +const echoService = loadProtoFile(protoFile).EchoService; + +exports.loadProtoFile = loadProtoFile; +exports.echoService = echoService; diff --git a/packages/grpc-js/benchmarks/helpers/utils.ts b/packages/grpc-js/benchmarks/helpers/utils.ts deleted file mode 100644 index 
1dbf2e92d..000000000 --- a/packages/grpc-js/benchmarks/helpers/utils.ts +++ /dev/null @@ -1,29 +0,0 @@ -import * as loader from '@grpc/proto-loader'; -import * as path from 'node:path'; - -import { - GrpcObject, - ServiceClientConstructor, - loadPackageDefinition, -} from '../../build/src/make-client'; - -const protoLoaderOptions = { - keepCase: true, - longs: String, - enums: String, - defaults: true, - oneofs: true, -}; - -export function loadProtoFile(file: string): GrpcObject { - const packageDefinition = loader.loadSync(file, protoLoaderOptions); - return loadPackageDefinition(packageDefinition); -} - -const protoFile = path.join( - __dirname, - '../../test/fixtures', - 'echo_service.proto' -); -export const echoService = loadProtoFile(protoFile) - .EchoService as ServiceClientConstructor; diff --git a/packages/grpc-js/benchmarks/markdown.js b/packages/grpc-js/benchmarks/markdown.js new file mode 100644 index 000000000..3699f5503 --- /dev/null +++ b/packages/grpc-js/benchmarks/markdown.js @@ -0,0 +1,27 @@ +function eventToMdTable(event) { + const { target } = event; + return `|${target.name}|${Math.round(target.hz).toLocaleString()}|${ + target.stats.sample.length + }|`; +} + +function createTableHeader(columns) { + let header = '|'; + let headerSep = '|'; + for (const col of columns) { + header += `${col}|`; + headerSep += '-|'; + } + + return `${header}\n${headerSep}`; +} + +function H2(title) { + return `## ${title}\n`; +} + +module.exports = { + eventToMdTable, + createTableHeader, + H2, +}; diff --git a/packages/grpc-js/benchmarks/package.json b/packages/grpc-js/benchmarks/package.json new file mode 100644 index 000000000..7ff01cce7 --- /dev/null +++ b/packages/grpc-js/benchmarks/package.json @@ -0,0 +1,20 @@ +{ + "name": "benchmarks", + "private": true, + "version": "1.0.0", + "description": "", + "main": "index.js", + "scripts": { + "test": "echo \"Error: no test specified\" && exit 1" + }, + "keywords": [], + "author": "", + "license": "ISC", + 
"devDependencies": { + "@grpc/grpc-js": "1.10.6" + }, + "dependencies": { + "@grpc/proto-loader": "^0.7.12", + "benchmark": "^2.1.4" + } +} diff --git a/packages/grpc-js/benchmarks/server.ts b/packages/grpc-js/benchmarks/server-old.js similarity index 69% rename from packages/grpc-js/benchmarks/server.ts rename to packages/grpc-js/benchmarks/server-old.js index 69b318398..3bb72b40b 100644 --- a/packages/grpc-js/benchmarks/server.ts +++ b/packages/grpc-js/benchmarks/server-old.js @@ -1,13 +1,9 @@ -import { - Server, - ServerUnaryCall, - sendUnaryData, - ServerCredentials, -} from '../build/src/index'; -import { echoService } from './helpers/utils'; +/* eslint-disable node/no-unpublished-import */ +const { Server, ServerCredentials } = require('@grpc/grpc-js'); +const { echoService } = require('./helpers/utils'); const serviceImpl = { - echo: (call: ServerUnaryCall, callback: sendUnaryData) => { + echo: (call, callback) => { callback(null, call.request); }, }; @@ -26,7 +22,7 @@ async function main() { 5e3 ).unref(); - await new Promise((resolve, reject) => { + await new Promise((resolve, reject) => { server.bindAsync('localhost:9999', credentials, (error, port) => { if (error) { reject(error); diff --git a/packages/grpc-js/benchmarks/server.js b/packages/grpc-js/benchmarks/server.js new file mode 100644 index 000000000..c9e5948c4 --- /dev/null +++ b/packages/grpc-js/benchmarks/server.js @@ -0,0 +1,38 @@ +/* eslint-disable node/no-unpublished-import */ +const { Server, ServerCredentials } = require('../build/src/index'); +const { echoService } = require('./helpers/utils'); + +const serviceImpl = { + echo: (call, callback) => { + callback(null, call.request); + }, +}; + +async function main() { + const server = new Server({ + 'grpc.enable_channelz': 0, + }); + + server.addService(echoService.service, serviceImpl); + + const credentials = ServerCredentials.createInsecure(); + + setInterval( + () => console.log(`RSS: ${process.memoryUsage().rss / 1024 / 1024} MiB`), + 
5e3 + ).unref(); + + await new Promise((resolve, reject) => { + server.bindAsync('localhost:9999', credentials, (error, port) => { + if (error) { + reject(error); + return; + } + + console.log('server listening on port %d', port); + resolve(); + }); + }); +} + +main(); diff --git a/packages/grpc-js/package.json b/packages/grpc-js/package.json index 337a797d9..5c9b12c69 100644 --- a/packages/grpc-js/package.json +++ b/packages/grpc-js/package.json @@ -67,7 +67,7 @@ "generate-test-types": "proto-loader-gen-types --keepCase --longs String --enums String --defaults --oneofs --includeComments --include-dirs test/fixtures/ -O test/generated/ --grpcLib ../../src/index test_service.proto" }, "dependencies": { - "@grpc/proto-loader": "^0.7.10", + "@grpc/proto-loader": "^0.7.12", "@js-sdsl/ordered-map": "^4.4.2", "reusify": "^1.0.4" }, diff --git a/packages/grpc-js/src/metadata.ts b/packages/grpc-js/src/metadata.ts index eabd2dff4..f80df4c56 100644 --- a/packages/grpc-js/src/metadata.ts +++ b/packages/grpc-js/src/metadata.ts @@ -19,11 +19,16 @@ import * as http2 from 'http2'; import { log } from './logging'; import { LogVerbosity } from './constants'; import { getErrorMessage } from './error'; + const LEGAL_KEY_REGEX = /^[0-9a-z_.-]+$/; const LEGAL_NON_BINARY_VALUE_REGEX = /^[ -~]*$/; +const { isArray } = Array; +// const { hasOwnProperty } = Object.prototype; export type MetadataValue = string | Buffer; -export type MetadataObject = Map; +export interface MetadataObject { + [key: string]: MetadataValue[] | undefined; +} function isLegalKey(key: string): boolean { return LEGAL_KEY_REGEX.test(key); @@ -33,12 +38,14 @@ function isLegalNonBinaryValue(value: string): boolean { return LEGAL_NON_BINARY_VALUE_REGEX.test(value); } +// https://github.com/RafaelGSS/nodejs-bench-operations/blob/main/RESULTS-v20.md#endswith-comparison function isBinaryKey(key: string): boolean { - return key.endsWith('-bin'); + // return key.endsWith('-bin'); + return key.slice(-4) === '-bin'; } 
function isCustomMetadata(key: string): boolean { - return !key.startsWith('grpc-'); + return key.slice(0, 5) !== 'grpc-'; } function normalizeKey(key: string): string { @@ -70,6 +77,42 @@ function validate(key: string, value?: MetadataValue): void { } } +function validateString(key: string, value: string): void { + if (!isLegalKey(key)) { + throw new Error('Metadata key "' + key + '" contains illegal characters'); + } + + if (!isLegalNonBinaryValue(value)) { + throw new Error( + 'Metadata string value "' + value + '" contains illegal characters' + ); + } +} + +function validateStrings(key: string, values: string[]): void { + if (!isLegalKey(key)) { + throw new Error('Metadata key "' + key + '" contains illegal characters'); + } + + for (let i = 0; i < values.length; i += 1) { + if (!isLegalNonBinaryValue(values[i])) { + throw new Error( + 'Metadata string value "' + values[i] + '" contains illegal characters' + ); + } + } +} + +function validateBinary(key: string): void { + if (!isLegalKey(key)) { + throw new Error('Metadata key "' + key + '" contains illegal characters'); + } + + if (!isBinaryKey(key)) { + throw new Error("keys that end with '-bin' must have Buffer values"); + } +} + export interface MetadataOptions { /* Signal that the request is idempotent. Defaults to false */ idempotentRequest?: boolean; @@ -83,14 +126,25 @@ export interface MetadataOptions { corked?: boolean; } +function MetadataObject() {} +MetadataObject.prototype = Object.create(null); + /** * A class for storing metadata. Keys are normalized to lowercase ASCII. 
*/ export class Metadata { - protected internalRepr: MetadataObject = new Map(); + // @ts-expect-error - cached object + protected internalRepr: MetadataObject = new MetadataObject(); private options: MetadataOptions; - constructor(options: MetadataOptions = {}) { + constructor( + options: MetadataOptions = { + idempotentRequest: false, + waitForReady: false, + cacheableRequest: false, + corked: false, + } + ) { this.options = options; } @@ -104,7 +158,7 @@ export class Metadata { set(key: string, value: MetadataValue): void { key = normalizeKey(key); validate(key, value); - this.internalRepr.set(key, [value]); + this.internalRepr[key] = [value]; } /** @@ -118,16 +172,35 @@ export class Metadata { key = normalizeKey(key); validate(key, value); - const existingValue: MetadataValue[] | undefined = - this.internalRepr.get(key); + const existingValue: MetadataValue[] | undefined = this.internalRepr[key]; if (existingValue === undefined) { - this.internalRepr.set(key, [value]); + this.internalRepr[key] = [value]; } else { existingValue.push(value); } } + addString(key: string, value: string): void { + validateString(key, value); + this.internalRepr[key] = [value]; + } + + addStrings(key: string, values: string[]): void { + validateStrings(key, values); + this.internalRepr[key] = values; + } + + addBuffer(key: string, value: Buffer): void { + validateBinary(key); + this.internalRepr[key] = [value]; + } + + addBuffers(key: string, values: Buffer[]): void { + validateBinary(key); + this.internalRepr[key] = values; + } + /** * Removes the given key and any associated values. Normalizes the key. * @param key The key whose values should be removed. 
@@ -135,7 +208,9 @@ export class Metadata { remove(key: string): void { key = normalizeKey(key); // validate(key); - this.internalRepr.delete(key); + if (this.internalRepr[key] !== undefined) { + this.internalRepr[key] = undefined; // expensive, but cheaper in new versions + } } /** @@ -144,9 +219,7 @@ export class Metadata { * @return A list of values associated with the given key. */ get(key: string): MetadataValue[] { - key = normalizeKey(key); - // validate(key); - return this.internalRepr.get(key) || []; + return this.internalRepr[normalizeKey(key)] || []; } /** @@ -156,10 +229,16 @@ export class Metadata { */ getMap(): { [key: string]: MetadataValue } { const result: { [key: string]: MetadataValue } = {}; - - for (const [key, values] of this.internalRepr) { - if (values.length > 0) { - const v = values[0]; + const keys = Object.keys(this.internalRepr); + + let values; + let key; + let v; + for (let i = 0; i < keys.length; i += 1) { + key = keys[i]; + values = this.internalRepr[key]; + if (values !== undefined && values.length > 0) { + v = values[0]; result[key] = Buffer.isBuffer(v) ? 
Buffer.from(v) : v; } } @@ -173,17 +252,24 @@ export class Metadata { clone(): Metadata { const newMetadata = new Metadata(this.options); const newInternalRepr = newMetadata.internalRepr; + const keys = Object.keys(this.internalRepr); + + let values; + let key; + for (let i = 0; i < keys.length; i += 1) { + key = keys[i]; + values = this.internalRepr[key]; + if (values !== undefined) { + const clonedValue: MetadataValue[] = values.map(v => { + if (Buffer.isBuffer(v)) { + return Buffer.from(v); + } else { + return v; + } + }); - for (const [key, value] of this.internalRepr) { - const clonedValue: MetadataValue[] = value.map(v => { - if (Buffer.isBuffer(v)) { - return Buffer.from(v); - } else { - return v; - } - }); - - newInternalRepr.set(key, clonedValue); + newInternalRepr[key] = clonedValue; + } } return newMetadata; @@ -197,12 +283,19 @@ export class Metadata { * @param other A Metadata object. */ merge(other: Metadata): void { - for (const [key, values] of other.internalRepr) { + const keys = Object.keys(other.internalRepr); + + let values; + let key; + for (let i = 0; i < keys.length; i += 1) { + key = keys[i]; + values = other.internalRepr[key] || []; + const mergedValue: MetadataValue[] = ( - this.internalRepr.get(key) || [] + this.internalRepr[key] || [] ).concat(values); - this.internalRepr.set(key, mergedValue); + this.internalRepr[key] = mergedValue; } } @@ -218,13 +311,15 @@ export class Metadata { * Creates an OutgoingHttpHeaders object that can be used with the http2 API. */ toHttp2Headers(): http2.OutgoingHttpHeaders { - // NOTE: Node <8.9 formats http2 headers incorrectly. - const result: http2.OutgoingHttpHeaders = {}; - - for (const [key, values] of this.internalRepr) { - // We assume that the user's interaction with this object is limited to - // through its public API (i.e. keys and values are already validated). 
- result[key] = values.map(bufToString); + const o = this.internalRepr; + const result: http2.OutgoingHttpHeaders = Object.create(null); + + for (const k in o) { + const cur = o[k]; + if (cur !== undefined) { + // @ts-expect-error manual buffer conversion + result[k] = isBinaryKey(k) ? cur.map(bufToString) : cur; + } } return result; @@ -236,8 +331,16 @@ export class Metadata { */ toJSON() { const result: { [key: string]: MetadataValue[] } = {}; - for (const [key, values] of this.internalRepr) { - result[key] = values; + const keys = Object.keys(this.internalRepr); + + let values; + let key; + for (let i = 0; i < keys.length; i += 1) { + key = keys[i]; + values = this.internalRepr[key]; + if (values !== undefined) { + result[key] = values; + } } return result; } @@ -249,38 +352,25 @@ export class Metadata { */ static fromHttp2Headers(headers: http2.IncomingHttpHeaders): Metadata { const result = new Metadata(); - for (const key of Object.keys(headers)) { + const keys = Object.keys(headers); + + let key: string; + let values: string | string[] | undefined; + + for (let i = 0; i < keys.length; i += 1) { + key = keys[i]; // Reserved headers (beginning with `:`) are not valid keys. 
if (key.charAt(0) === ':') { continue; } - const values = headers[key]; + values = headers[key]; + if (values === undefined) { + continue; + } try { - if (isBinaryKey(key)) { - if (Array.isArray(values)) { - values.forEach(value => { - result.add(key, Buffer.from(value, 'base64')); - }); - } else if (values !== undefined) { - if (isCustomMetadata(key)) { - values.split(',').forEach(v => { - result.add(key, Buffer.from(v.trim(), 'base64')); - }); - } else { - result.add(key, Buffer.from(values, 'base64')); - } - } - } else { - if (Array.isArray(values)) { - values.forEach(value => { - result.add(key, value); - }); - } else if (values !== undefined) { - result.add(key, values); - } - } + handleMetadataValue(result, key, values); } catch (error) { const message = `Failed to add metadata entry ${key}: ${values}. ${getErrorMessage( error @@ -293,6 +383,34 @@ export class Metadata { } } +function handleMetadataValue( + result: Metadata, + key: string, + values: string | string[] +): void { + if (isBinaryKey(key)) { + if (isArray(values)) { + result.addBuffers( + key, + values.map(v => Buffer.from(v, 'base64')) + ); + } else if (isCustomMetadata(key)) { + result.addBuffers( + key, + values.split(',').map(val => Buffer.from(val.trim(), 'base64')) + ); + } else { + result.addBuffer(key, Buffer.from(values, 'base64')); + } + } else { + if (isArray(values)) { + result.addStrings(key, values); + } else { + result.addString(key, values); + } + } +} + const bufToString = (val: string | Buffer): string => { return Buffer.isBuffer(val) ? 
val.toString('base64') : val; }; diff --git a/packages/grpc-js/src/stream-decoder.ts b/packages/grpc-js/src/stream-decoder.ts index 6af39d5b2..c4f10881c 100644 --- a/packages/grpc-js/src/stream-decoder.ts +++ b/packages/grpc-js/src/stream-decoder.ts @@ -45,7 +45,7 @@ interface StreamDecoder { write(data: Buffer): GrpcFrame[]; } -function StreamDecoder(this: StreamDecoder) { +export function StreamDecoder(this: StreamDecoder) { // reusify reference this.next = null; diff --git a/packages/grpc-js/test/test-metadata.ts b/packages/grpc-js/test/test-metadata.ts index 44182ef39..d019fa63f 100644 --- a/packages/grpc-js/test/test-metadata.ts +++ b/packages/grpc-js/test/test-metadata.ts @@ -18,7 +18,7 @@ import * as assert from 'assert'; import * as http2 from 'http2'; import { range } from 'lodash'; -import { Metadata, MetadataObject, MetadataValue } from '../src/metadata'; +import { Metadata, MetadataObject } from '../src/metadata'; class TestMetadata extends Metadata { getInternalRepresentation() { @@ -272,20 +272,26 @@ describe('Metadata', () => { metadata.add('key-bin', Buffer.from(range(16, 32))); metadata.add('key-bin', Buffer.from(range(0, 32))); const headers = metadata.toHttp2Headers(); - assert.deepStrictEqual(headers, { - key1: ['value1'], - key2: ['value2'], - key3: ['value3a', 'value3b'], - 'key-bin': [ - 'AAECAwQFBgcICQoLDA0ODw==', - 'EBESExQVFhcYGRobHB0eHw==', - 'AAECAwQFBgcICQoLDA0ODxAREhMUFRYXGBkaGxwdHh8=', - ], - }); + assert.deepStrictEqual( + headers, + Object.setPrototypeOf( + { + key1: ['value1'], + key2: ['value2'], + key3: ['value3a', 'value3b'], + 'key-bin': [ + 'AAECAwQFBgcICQoLDA0ODw==', + 'EBESExQVFhcYGRobHB0eHw==', + 'AAECAwQFBgcICQoLDA0ODxAREhMUFRYXGBkaGxwdHh8=', + ], + }, + null + ) + ); }); it('creates an empty header object from empty Metadata', () => { - assert.deepStrictEqual(metadata.toHttp2Headers(), {}); + assert.deepStrictEqual(metadata.toHttp2Headers(), Object.create(null)); }); }); @@ -304,7 +310,7 @@ describe('Metadata', () => { }; 
const metadataFromHeaders = TestMetadata.fromHttp2Headers(headers); const internalRepr = metadataFromHeaders.getInternalRepresentation(); - const expected: MetadataObject = new Map([ + const expected: MetadataObject = Object.fromEntries([ ['key1', ['value1']], ['key2', ['value2']], ['key3', ['value3a', 'value3b']], @@ -318,13 +324,13 @@ describe('Metadata', () => { ], ], ]); - assert.deepStrictEqual(internalRepr, expected); + assert.deepEqual(internalRepr, Object.setPrototypeOf(expected, null)); }); it('creates an empty Metadata object from empty headers', () => { const metadataFromHeaders = TestMetadata.fromHttp2Headers({}); const internalRepr = metadataFromHeaders.getInternalRepresentation(); - assert.deepStrictEqual(internalRepr, new Map()); + assert.deepEqual(internalRepr, Object.create(null)); }); }); }); diff --git a/packages/grpc-js/tsconfig.json b/packages/grpc-js/tsconfig.json index fd224bb93..549205826 100644 --- a/packages/grpc-js/tsconfig.json +++ b/packages/grpc-js/tsconfig.json @@ -11,9 +11,8 @@ "sourceMap": true, "strict": true, "alwaysStrict": true, - "lib": ["es2017"], "outDir": "build", - "target": "es2017", + "target": "ES2022", "module": "commonjs", "resolveJsonModule": true, "incremental": true, From 516479720f17634d6ba47a65a0454532a91124a6 Mon Sep 17 00:00:00 2001 From: AVVS Date: Tue, 30 Apr 2024 17:02:34 -0700 Subject: [PATCH 5/9] chore: add common headers --- packages/grpc-js/benchmarks/bench/metadata.js | 8 +++-- packages/grpc-js/src/metadata.ts | 30 +++++++++---------- 2 files changed, 19 insertions(+), 19 deletions(-) diff --git a/packages/grpc-js/benchmarks/bench/metadata.js b/packages/grpc-js/benchmarks/bench/metadata.js index 8277ca058..1f12ccc2f 100644 --- a/packages/grpc-js/benchmarks/bench/metadata.js +++ b/packages/grpc-js/benchmarks/bench/metadata.js @@ -26,6 +26,8 @@ const headers = Object.setPrototypeOf( 'user-agent': 'h2load nghttp2/1.58.0', 'content-type': 'application/grpc', 'content-length': '19', + 
[GRPC_ACCEPT_ENCODING_HEADER]: 'identity,deflate,gzip', + [GRPC_ENCODING_HEADER]: 'identity', [sensitiveHeaders]: [], }, null @@ -35,13 +37,13 @@ const ogMeta = MetadataOriginal.fromHttp2Headers(headers); const currentMeta = Metadata.fromHttp2Headers(headers); suite - .add('grpc-js@1.0.6 fromHttp2Headers', function () { + .add('grpc-js@1.10.6 fromHttp2Headers', function () { return MetadataOriginal.fromHttp2Headers(headers); }) - .add('grpc-js@1.0.6 toHttp2Headers', function () { + .add('grpc-js@1.10.6 toHttp2Headers', function () { return ogMeta.toHttp2Headers(); }) - .add('grpc-js@1.0.6 fromHttp2Headers + common operations', function () { + .add('grpc-js@1.10.6 fromHttp2Headers + common operations', function () { const metadata = MetadataOriginal.fromHttp2Headers(headers); metadata.remove(GRPC_TIMEOUT_HEADER); metadata.remove(GRPC_ENCODING_HEADER); diff --git a/packages/grpc-js/src/metadata.ts b/packages/grpc-js/src/metadata.ts index f80df4c56..81d27b45d 100644 --- a/packages/grpc-js/src/metadata.ts +++ b/packages/grpc-js/src/metadata.ts @@ -188,7 +188,7 @@ export class Metadata { addStrings(key: string, values: string[]): void { validateStrings(key, values); - this.internalRepr[key] = values; + this.internalRepr[key] = values.slice(); // shallow copy } addBuffer(key: string, value: Buffer): void { @@ -206,10 +206,11 @@ export class Metadata { * @param key The key whose values should be removed. */ remove(key: string): void { - key = normalizeKey(key); + const k = normalizeKey(key); // validate(key); - if (this.internalRepr[key] !== undefined) { - this.internalRepr[key] = undefined; // expensive, but cheaper in new versions + const { internalRepr } = this; + if (k in internalRepr) { + internalRepr[k] = undefined; // expensive, but cheaper in new versions } } @@ -311,14 +312,13 @@ export class Metadata { * Creates an OutgoingHttpHeaders object that can be used with the http2 API. 
*/ toHttp2Headers(): http2.OutgoingHttpHeaders { - const o = this.internalRepr; const result: http2.OutgoingHttpHeaders = Object.create(null); + const o = this.internalRepr; for (const k in o) { const cur = o[k]; if (cur !== undefined) { - // @ts-expect-error manual buffer conversion - result[k] = isBinaryKey(k) ? cur.map(bufToString) : cur; + result[k] = isBinaryKey(k) ? cur.map(bufToString) : (cur as string[]); } } @@ -390,17 +390,11 @@ function handleMetadataValue( ): void { if (isBinaryKey(key)) { if (isArray(values)) { - result.addBuffers( - key, - values.map(v => Buffer.from(v, 'base64')) - ); + result.addBuffers(key, values.map(toBufferFromBase64)); } else if (isCustomMetadata(key)) { - result.addBuffers( - key, - values.split(',').map(val => Buffer.from(val.trim(), 'base64')) - ); + result.addBuffers(key, values.split(',').map(toBufferFromBase64Trim)); } else { - result.addBuffer(key, Buffer.from(values, 'base64')); + result.addBuffer(key, toBufferFromBase64(values)); } } else { if (isArray(values)) { @@ -414,3 +408,7 @@ function handleMetadataValue( const bufToString = (val: string | Buffer): string => { return Buffer.isBuffer(val) ? 
val.toString('base64') : val; }; + +const toBufferFromBase64 = (v: string): Buffer => Buffer.from(v, 'base64'); +const toBufferFromBase64Trim = (v: string): Buffer => + Buffer.from(v.trim(), 'base64'); From 5161dfd8a1d8100351fd4bed70142e38a1461b35 Mon Sep 17 00:00:00 2001 From: AVVS Date: Wed, 1 May 2024 07:47:30 -0700 Subject: [PATCH 6/9] fix: slightly improved bench --- .../benchmarks/bench/stream-decoder.js | 148 +++++++++++------- packages/grpc-js/src/stream-decoder.ts | 21 ++- 2 files changed, 101 insertions(+), 68 deletions(-) diff --git a/packages/grpc-js/benchmarks/bench/stream-decoder.js b/packages/grpc-js/benchmarks/bench/stream-decoder.js index 1d54150e5..c0fe87098 100644 --- a/packages/grpc-js/benchmarks/bench/stream-decoder.js +++ b/packages/grpc-js/benchmarks/bench/stream-decoder.js @@ -6,103 +6,131 @@ const { } = require('@grpc/grpc-js/build/src/stream-decoder'); const { StreamDecoder: NewStreamDecoder, + decoder: decoderManager, } = require('../../build/src/stream-decoder'); +const { buffer } = require('stream/consumers'); const suite = createBenchmarkSuite('Stream Decoder'); -const smallBinary = serializeMessage( +const serializedSmallBinary = serializeMessage( echoService.service.Echo.requestSerialize, { value: 'string-val', value2: 10, } ); +const getSmallBinary = () => { + const buf = Buffer.allocUnsafe(serializedSmallBinary.length); + serializedSmallBinary.copy(buf); + return buf; +}; -const smallBinarySplitPartOne = Buffer.from(smallBinary.subarray(0, 3)); -const smallBinarySplitPartTwo = Buffer.from(smallBinary.subarray(3, 5)); -const smallBinarySplitPartThree = Buffer.from(smallBinary.subarray(5)); +const getSmallSplit = () => { + const binary = getSmallBinary(); + return [binary.subarray(0, 3), binary.subarray(3, 5), binary.subarray(5)]; +}; -const largeBinary = serializeMessage( +const largeObj = { + value: 'a'.repeat(2 ** 16), + value2: 12803182109, +}; +const serializedLargeObj = serializeMessage( 
echoService.service.Echo.requestSerialize, - { - value: 'a'.repeat(2 ** 16), - value2: 12803182109, - } + largeObj ); -const largeBinarySplitPartOne = Buffer.from(largeBinary.subarray(0, 4096)); -const largeBinarySplitPartTwo = Buffer.from(largeBinary.subarray(4096)); +const getLargeBinary = () => { + const buf = Buffer.allocUnsafeSlow(serializedLargeObj.length); + serializedLargeObj.copy(buf); + return buf; +}; + +const getLargeSplit = () => { + const binary = getLargeBinary(); + return [ + binary.subarray(0, Math.ceil(Buffer.poolSize * 0.5)), + binary.subarray(Math.ceil(Buffer.poolSize * 0.5)), + ]; +}; -const cachedSD2 = new NewStreamDecoder(); -const cachedOG = new OGStreamDecoder(); +const originalCached = new OGStreamDecoder(); +const currentCached = decoderManager.get(); suite + // mark -- original decoder, fresh copies .add('original stream decoder', function () { const decoder = new OGStreamDecoder(); - decoder.write(smallBinary); + decoder.write(getSmallBinary()); }) - .add('original stream decoder cached', function () { - cachedOG.write(smallBinary); - }) - .add('stream decoder v2', function () { - const decoder = new NewStreamDecoder(); - decoder.write(smallBinary); - }) - .add('stream decoder v2 cached', function () { - cachedSD2.write(smallBinary); + .add('original stream decoder - small split', function () { + const decoder = new OGStreamDecoder(); + for (const item of getSmallSplit()) { + decoder.write(item); + } }) .add('original stream decoder - large', function () { const decoder = new OGStreamDecoder(); - decoder.write(largeBinary); + decoder.write(getLargeBinary()); }) - .add('original stream decoder cached - large', function () { - cachedOG.write(largeBinary); + .add('original stream decoder - large split', function () { + const decoder = new OGStreamDecoder(); + for (const item of getLargeSplit()) { + decoder.write(item); + } }) - .add('stream decoder v2 - large', function () { - const decoder = new NewStreamDecoder(); - 
decoder.write(largeBinary); + // original decoder - cached instance + .add('original stream decoder cached', function () { + originalCached.write(getSmallBinary()); }) - .add('stream decoder v2 cached - large', function () { - cachedSD2.write(largeBinary); + .add('original stream decoder cached - small split', function () { + for (const item of getSmallSplit()) { + originalCached.write(item); + } }) - .add('original stream decoder - small split', function () { - const decoder = new OGStreamDecoder(); - decoder.write(smallBinarySplitPartOne); - decoder.write(smallBinarySplitPartTwo); - decoder.write(smallBinarySplitPartThree); + .add('original stream decoder cached - large', function () { + originalCached.write(getLargeBinary()); }) - .add('original stream decoder cached - small split', function () { - cachedOG.write(smallBinarySplitPartOne); - cachedOG.write(smallBinarySplitPartTwo); - cachedOG.write(smallBinarySplitPartThree); + .add('original stream decoder cached - large split', function () { + for (const item of getLargeSplit()) { + originalCached.write(item); + } }) - .add('stream decoder v2 - small split', function () { + // decoder v2 - new instance + .add('stream decoder v2', function () { const decoder = new NewStreamDecoder(); - decoder.write(smallBinarySplitPartOne); - decoder.write(smallBinarySplitPartTwo); - decoder.write(smallBinarySplitPartThree); + decoder.write(getSmallBinary()); }) - .add('stream decoder v2 cached - small split', function () { - cachedSD2.write(smallBinarySplitPartOne); - cachedSD2.write(smallBinarySplitPartTwo); - cachedSD2.write(smallBinarySplitPartThree); - }) - .add('original stream decoder - large split', function () { - const decoder = new OGStreamDecoder(); - decoder.write(largeBinarySplitPartOne); - decoder.write(largeBinarySplitPartTwo); + .add('stream decoder v2 - small split', function () { + const decoder = new NewStreamDecoder(); + for (const item of getSmallSplit()) { + decoder.write(item); + } }) - .add('original 
stream decoder cached - large split', function () { - cachedOG.write(largeBinarySplitPartOne); - cachedOG.write(largeBinarySplitPartTwo); + .add('stream decoder v2 - large', function () { + const decoder = new NewStreamDecoder(); + decoder.write(getLargeBinary()); }) .add('stream decoder v2 - large split', function () { const decoder = new NewStreamDecoder(); - decoder.write(largeBinarySplitPartOne); - decoder.write(largeBinarySplitPartTwo); + for (const item of getLargeSplit()) { + decoder.write(item); + } + }) + // decoder v2 - cached + .add('stream decoder v2 cached', function () { + currentCached.write(getSmallBinary()); + }) + .add('stream decoder v2 cached - small split', function () { + for (const item of getSmallSplit()) { + currentCached.write(item); + } + }) + .add('stream decoder v2 cached - large', function () { + currentCached.write(getLargeBinary()); }) .add('stream decoder v2 cached - large split', function () { - cachedSD2.write(largeBinarySplitPartOne); - cachedSD2.write(largeBinarySplitPartTwo); + for (const item of getLargeSplit()) { + currentCached.write(item); + } }) .run({ async: false }); diff --git a/packages/grpc-js/src/stream-decoder.ts b/packages/grpc-js/src/stream-decoder.ts index c4f10881c..1d36701cf 100644 --- a/packages/grpc-js/src/stream-decoder.ts +++ b/packages/grpc-js/src/stream-decoder.ts @@ -112,9 +112,18 @@ export function StreamDecoder(this: StreamDecoder) { // readSizeRemaining >=0 here if (readSizeRemaining === 0) { - if (that.readMessageSize > 0) { + const { readMessageSize } = that; + if (readMessageSize > 0) { that.readState = ReadState.READING_MESSAGE; - that.readMessageRemaining = that.readMessageSize; + that.readMessageRemaining = readMessageSize; + + // allocate buffer / partial message array if we don't have all the data yet + if (len - readHead < readMessageSize) { + that.readPartialMessage = + readMessageSize <= Buffer.poolSize * 0.5 + ? 
Buffer.allocUnsafe(readMessageSize) + : Buffer.allocUnsafeSlow(readMessageSize); + } } else { that.readState = ReadState.NO_DATA; result.push({ @@ -132,12 +141,8 @@ export function StreamDecoder(this: StreamDecoder) { if (toRead === readMessageSize) { that.readPartialMessage = data.subarray(readHead, readHead + toRead); } else { - if (that.readPartialMessage === null) { - that.readPartialMessage = Buffer.allocUnsafe(readMessageSize); - } - data.copy( - that.readPartialMessage, + that.readPartialMessage!, readMessageSize - readMessageRemaining, readHead, readHead + toRead @@ -153,7 +158,7 @@ export function StreamDecoder(this: StreamDecoder) { result.push({ compressed: that.readCompressFlag, size: readMessageSize, - message: that.readPartialMessage, + message: that.readPartialMessage!, }); that.readState = ReadState.NO_DATA; From 73b701acaf4316e45635413a6991a3908a69eb4f Mon Sep 17 00:00:00 2001 From: AVVS Date: Wed, 1 May 2024 08:39:29 -0700 Subject: [PATCH 7/9] fix: improve benchmark --- .../benchmarks/bench/stream-decoder.js | 1 - packages/grpc-js/benchmarks/common.js | 96 +------------------ packages/grpc-js/benchmarks/markdown.js | 27 ------ packages/grpc-js/benchmarks/package.json | 3 +- 4 files changed, 6 insertions(+), 121 deletions(-) delete mode 100644 packages/grpc-js/benchmarks/markdown.js diff --git a/packages/grpc-js/benchmarks/bench/stream-decoder.js b/packages/grpc-js/benchmarks/bench/stream-decoder.js index c0fe87098..f3353f596 100644 --- a/packages/grpc-js/benchmarks/bench/stream-decoder.js +++ b/packages/grpc-js/benchmarks/bench/stream-decoder.js @@ -8,7 +8,6 @@ const { StreamDecoder: NewStreamDecoder, decoder: decoderManager, } = require('../../build/src/stream-decoder'); -const { buffer } = require('stream/consumers'); const suite = createBenchmarkSuite('Stream Decoder'); diff --git a/packages/grpc-js/benchmarks/common.js b/packages/grpc-js/benchmarks/common.js index 2928d86a4..68491f3be 100644 --- a/packages/grpc-js/benchmarks/common.js +++ 
b/packages/grpc-js/benchmarks/common.js @@ -1,100 +1,12 @@ -const Benchmark = require('benchmark'); -const { createTableHeader, H2, eventToMdTable } = require('./markdown'); -const os = require('os'); +const Benchmarkify = require('benchmarkify'); -function installMarkdownEmitter( - suite, - name, - tableHeaderColumns = ['name', 'ops/sec', 'samples'] -) { - const tableHeader = createTableHeader(tableHeaderColumns); - - suite - .on('start', function () { - console.log(H2(name)); - console.log(tableHeader); - }) - .on('cycle', function (event) { - console.log(eventToMdTable(event)); - }); -} - -function getMachineInfo() { - return { - platform: os.platform(), - arch: os.arch(), - cpus: os.cpus().length, - totalMemory: os.totalmem() / 1024 ** 3, - }; -} - -function installMarkdownMachineInfo(suite) { - if (!process.env.CI) return; - - const { platform, arch, cpus, totalMemory } = getMachineInfo(); - - const machineInfo = `${platform} ${arch} | ${cpus} vCPUs | ${totalMemory.toFixed( - 1 - )}GB Mem`; - - suite.on('complete', () => { - const writter = process.stdout; - - writter.write('\n\n'); - writter.write('
\n'); - writter.write('Environment'); - writter.write(`\n -* __Machine:__ ${machineInfo} -* __Run:__ ${new Date()} -`); - writter.write('
'); - writter.write('\n\n'); - }); -} - -function installMarkdownHiddenDetailedInfo(suite) { - if (!process.env.CI) return; - - const cycleEvents = []; - - suite - .on('cycle', function (event) { - cycleEvents.push({ - name: event.target.name, - opsSec: event.target.hz, - samples: event.target.cycles, - }); - }) - .on('complete', function () { - const writter = process.stdout; - - writter.write('\n'); - }); -} - -function createBenchmarkSuite( - name, - { tableHeaderColumns = ['name', 'ops/sec', 'samples'] } = {} -) { - const suite = new Benchmark.Suite(); - - installMarkdownEmitter(suite, name, tableHeaderColumns); - installMarkdownMachineInfo(suite); - installMarkdownHiddenDetailedInfo(suite); +function createBenchmarkSuite(name) { + const benchmark = new Benchmarkify('grpc-js benchmarks').printHeader(); + const suite = benchmark.createSuite(name); return suite; } module.exports = { createBenchmarkSuite, - installMarkdownEmitter, - installMarkdownMachineInfo, - installMarkdownHiddenDetailedInfo, }; diff --git a/packages/grpc-js/benchmarks/markdown.js b/packages/grpc-js/benchmarks/markdown.js deleted file mode 100644 index 3699f5503..000000000 --- a/packages/grpc-js/benchmarks/markdown.js +++ /dev/null @@ -1,27 +0,0 @@ -function eventToMdTable(event) { - const { target } = event; - return `|${target.name}|${Math.round(target.hz).toLocaleString()}|${ - target.stats.sample.length - }|`; -} - -function createTableHeader(columns) { - let header = '|'; - let headerSep = '|'; - for (const col of columns) { - header += `${col}|`; - headerSep += '-|'; - } - - return `${header}\n${headerSep}`; -} - -function H2(title) { - return `## ${title}\n`; -} - -module.exports = { - eventToMdTable, - createTableHeader, - H2, -}; diff --git a/packages/grpc-js/benchmarks/package.json b/packages/grpc-js/benchmarks/package.json index 7ff01cce7..e88845616 100644 --- a/packages/grpc-js/benchmarks/package.json +++ b/packages/grpc-js/benchmarks/package.json @@ -15,6 +15,7 @@ }, 
"dependencies": { "@grpc/proto-loader": "^0.7.12", - "benchmark": "^2.1.4" + "benchmark": "^2.1.4", + "benchmarkify": "^4.0.0" } } From ff0880ce1ec501d06a9a9d081d8876cf95b67a16 Mon Sep 17 00:00:00 2001 From: AVVS Date: Wed, 1 May 2024 10:22:16 -0700 Subject: [PATCH 8/9] fix: stream-decoder benches --- .../benchmarks/bench/stream-decoder.js | 99 ++++++++++--------- packages/grpc-js/benchmarks/common.js | 4 +- 2 files changed, 54 insertions(+), 49 deletions(-) diff --git a/packages/grpc-js/benchmarks/bench/stream-decoder.js b/packages/grpc-js/benchmarks/bench/stream-decoder.js index f3353f596..62416a4da 100644 --- a/packages/grpc-js/benchmarks/bench/stream-decoder.js +++ b/packages/grpc-js/benchmarks/bench/stream-decoder.js @@ -1,4 +1,4 @@ -const { createBenchmarkSuite } = require('../common'); +const { benchmark, createBenchmarkSuite } = require('../common'); const { serializeMessage } = require('../helpers/encode'); const { echoService } = require('../helpers/utils'); const { @@ -9,8 +9,6 @@ const { decoder: decoderManager, } = require('../../build/src/stream-decoder'); -const suite = createBenchmarkSuite('Stream Decoder'); - const serializedSmallBinary = serializeMessage( echoService.service.Echo.requestSerialize, { @@ -55,81 +53,86 @@ const getLargeSplit = () => { const originalCached = new OGStreamDecoder(); const currentCached = decoderManager.get(); -suite +createBenchmarkSuite('Small Payload') // mark -- original decoder, fresh copies - .add('original stream decoder', function () { + .add('1.10.6', function () { const decoder = new OGStreamDecoder(); decoder.write(getSmallBinary()); }) - .add('original stream decoder - small split', function () { + .add('1.10.6 cached', function () { const decoder = new OGStreamDecoder(); - for (const item of getSmallSplit()) { - decoder.write(item); - } + decoder.write(getSmallBinary()); }) - .add('original stream decoder - large', function () { - const decoder = new OGStreamDecoder(); - decoder.write(getLargeBinary()); + 
.add('current', function () { + const decoder = new NewStreamDecoder(); + decoder.write(getSmallBinary()); }) - .add('original stream decoder - large split', function () { + .add('current cached', function () { + currentCached.write(getSmallBinary()); + }); + +createBenchmarkSuite('Small Payload Chunked') + .add('1.10.6', function () { const decoder = new OGStreamDecoder(); - for (const item of getLargeSplit()) { + for (const item of getSmallSplit()) { decoder.write(item); } }) - // original decoder - cached instance - .add('original stream decoder cached', function () { - originalCached.write(getSmallBinary()); - }) - .add('original stream decoder cached - small split', function () { + .add('1.10.6 cached', function () { for (const item of getSmallSplit()) { originalCached.write(item); } }) - .add('original stream decoder cached - large', function () { - originalCached.write(getLargeBinary()); - }) - .add('original stream decoder cached - large split', function () { - for (const item of getLargeSplit()) { - originalCached.write(item); - } - }) - // decoder v2 - new instance - .add('stream decoder v2', function () { - const decoder = new NewStreamDecoder(); - decoder.write(getSmallBinary()); - }) - .add('stream decoder v2 - small split', function () { + .add('current', function () { const decoder = new NewStreamDecoder(); for (const item of getSmallSplit()) { decoder.write(item); } }) - .add('stream decoder v2 - large', function () { - const decoder = new NewStreamDecoder(); + .add('current cached', function () { + for (const item of getSmallSplit()) { + currentCached.write(item); + } + }); + +createBenchmarkSuite('Large Payload') + .add('1.10.6', function () { + const decoder = new OGStreamDecoder(); decoder.write(getLargeBinary()); }) - .add('stream decoder v2 - large split', function () { + .add('1.10.6 cached', function () { + originalCached.write(getLargeBinary()); + }) + .add('current', function () { const decoder = new NewStreamDecoder(); + 
decoder.write(getLargeBinary()); + }) + .add('current cached', function () { + currentCached.write(getLargeBinary()); + }); + +createBenchmarkSuite('Large Payload Chunked') + .add('1.10.6', function () { + const decoder = new OGStreamDecoder(); for (const item of getLargeSplit()) { decoder.write(item); } }) - // decoder v2 - cached - .add('stream decoder v2 cached', function () { - currentCached.write(getSmallBinary()); - }) - .add('stream decoder v2 cached - small split', function () { - for (const item of getSmallSplit()) { - currentCached.write(item); + .add('1.10.6 cached', function () { + for (const item of getLargeSplit()) { + originalCached.write(item); } }) - .add('stream decoder v2 cached - large', function () { - currentCached.write(getLargeBinary()); + .add('current', function () { + const decoder = new NewStreamDecoder(); + for (const item of getLargeSplit()) { + decoder.write(item); + } }) - .add('stream decoder v2 cached - large split', function () { + .add('current cached', function () { for (const item of getLargeSplit()) { currentCached.write(item); } - }) - .run({ async: false }); + }); + +benchmark.run(); diff --git a/packages/grpc-js/benchmarks/common.js b/packages/grpc-js/benchmarks/common.js index 68491f3be..1b9afda6a 100644 --- a/packages/grpc-js/benchmarks/common.js +++ b/packages/grpc-js/benchmarks/common.js @@ -1,12 +1,14 @@ const Benchmarkify = require('benchmarkify'); +const benchmark = new Benchmarkify('grpc-js benchmarks').printHeader(); + function createBenchmarkSuite(name) { - const benchmark = new Benchmarkify('grpc-js benchmarks').printHeader(); const suite = benchmark.createSuite(name); return suite; } module.exports = { + benchmark, createBenchmarkSuite, }; From 3077f82dc17f5f4a31de8adfe0e18a702c8cabdb Mon Sep 17 00:00:00 2001 From: AVVS Date: Wed, 1 May 2024 10:35:03 -0700 Subject: [PATCH 9/9] chore: adjust metadata tests --- packages/grpc-js/benchmarks/bench/metadata.js | 65 ++++++++++--------- 1 file changed, 35 insertions(+), 
30 deletions(-) diff --git a/packages/grpc-js/benchmarks/bench/metadata.js b/packages/grpc-js/benchmarks/bench/metadata.js index 1f12ccc2f..a9a0991fa 100644 --- a/packages/grpc-js/benchmarks/bench/metadata.js +++ b/packages/grpc-js/benchmarks/bench/metadata.js @@ -1,4 +1,4 @@ -const { createBenchmarkSuite } = require('../common'); +const { benchmark, createBenchmarkSuite } = require('../common'); const { sensitiveHeaders, constants: { @@ -12,8 +12,6 @@ const { } = require('@grpc/grpc-js/build/src/metadata'); const { Metadata } = require('../../build/src/metadata'); -const suite = createBenchmarkSuite('Metadata'); - const GRPC_ACCEPT_ENCODING_HEADER = 'grpc-accept-encoding'; const GRPC_ENCODING_HEADER = 'grpc-encoding'; const GRPC_TIMEOUT_HEADER = 'grpc-timeout'; @@ -36,35 +34,42 @@ const headers = Object.setPrototypeOf( const ogMeta = MetadataOriginal.fromHttp2Headers(headers); const currentMeta = Metadata.fromHttp2Headers(headers); -suite - .add('grpc-js@1.10.6 fromHttp2Headers', function () { - return MetadataOriginal.fromHttp2Headers(headers); - }) - .add('grpc-js@1.10.6 toHttp2Headers', function () { - return ogMeta.toHttp2Headers(); +const removeHeaders = metadata => { + metadata.remove(GRPC_TIMEOUT_HEADER); + metadata.remove(GRPC_ENCODING_HEADER); + metadata.remove(GRPC_ACCEPT_ENCODING_HEADER); + metadata.remove(HTTP2_HEADER_ACCEPT_ENCODING); + metadata.remove(HTTP2_HEADER_TE); + metadata.remove(HTTP2_HEADER_CONTENT_TYPE); +}; + +removeHeaders(ogMeta); +removeHeaders(currentMeta); + +createBenchmarkSuite('fromHttp2Headers') + .add('1.10.6', function () { + MetadataOriginal.fromHttp2Headers(headers); }) - .add('grpc-js@1.10.6 fromHttp2Headers + common operations', function () { + .add('current', function () { + Metadata.fromHttp2Headers(headers); + }); + +createBenchmarkSuite('fromHttp2Headers + common operations') + .add('1.10.6', () => { const metadata = MetadataOriginal.fromHttp2Headers(headers); - metadata.remove(GRPC_TIMEOUT_HEADER); - 
metadata.remove(GRPC_ENCODING_HEADER); - metadata.remove(GRPC_ACCEPT_ENCODING_HEADER); - metadata.remove(HTTP2_HEADER_ACCEPT_ENCODING); - metadata.remove(HTTP2_HEADER_TE); - metadata.remove(HTTP2_HEADER_CONTENT_TYPE); - }) - .add('current fromHttp2Headers', function () { - return Metadata.fromHttp2Headers(headers); + removeHeaders(metadata); }) - .add('current toHttp2Headers', function () { - return currentMeta.toHttp2Headers(); - }) - .add('current + common operations', function () { + .add('current', () => { const metadata = Metadata.fromHttp2Headers(headers); - metadata.remove(GRPC_TIMEOUT_HEADER); - metadata.remove(GRPC_ENCODING_HEADER); - metadata.remove(GRPC_ACCEPT_ENCODING_HEADER); - metadata.remove(HTTP2_HEADER_ACCEPT_ENCODING); - metadata.remove(HTTP2_HEADER_TE); - metadata.remove(HTTP2_HEADER_CONTENT_TYPE); + removeHeaders(metadata); + }); + +createBenchmarkSuite('toHttp2Headers') + .add('1.10.6', function () { + return ogMeta.toHttp2Headers(); }) - .run({ async: false }); + .add('current', function () { + return currentMeta.toHttp2Headers(); + }); + +benchmark.run();