Skip to content

Commit

Permalink
Typescript: make parsing faster (foxglove#1185)
Browse files Browse the repository at this point in the history
### Changelog
- Typescript: improved record parsing time.

### Docs

None.

### Description

I was trying to port some of the improvements from
foxglove#1168 to typescript, and noticed
that this change to `parse.ts` improves reading speed significantly with
no additional API changes.

This change switches from `new Uint8Array(fromBuffer.slice(start, end))`
to `new Uint8Array(fromBuffer, start, length).slice()`. Both expressions have
the same effect of producing a new Uint8Array containing a copy of some part
of `fromBuffer`.

I also rewrote the typescript benchmark utility to remove the dependency
on `benny`, so that I could gather memory statistics as part of the
benchmark.

#### Before:

```
(python-sNIFi2pF) j@192-168-1-105 benchmarks % yarn bench --suite reader
Running 'reader' suite
McapStreamReader
        1.29±0.02 op/s  Heap Used: 259.38±4.95 MB/op    Heap Total: 288.73±3.47 MB/op   ArrayBuffers: 214.31±1.60 MB/op
McapIndexedReader
        0.95±0.01 op/s  Heap Used: 248.20±21.02 MB/op   Heap Total: 281.67±20.46 MB/op  ArrayBuffers: 98.33±5.42 MB/op
McapIndexedReader_reverse
        1.00±0.02 op/s  Heap Used: 260.77±17.31 MB/op   Heap Total: 295.44±17.31 MB/op  ArrayBuffers: 102.49±4.62 MB/op
```
#### After

```
(python-sNIFi2pF) j@192-168-1-105 benchmarks % yarn bench --suite reader
Running 'reader' suite
McapStreamReader
        3.04±0.02 op/s  Heap Used: 261.58±8.20 MB/op    Heap Total: 289.82±6.71 MB/op   ArrayBuffers: 214.18±1.60 MB/op
McapIndexedReader
        1.83±0.01 op/s  Heap Used: 281.22±4.06 MB/op    Heap Total: 317.18±2.63 MB/op   ArrayBuffers: 107.06±0.00 MB/op
McapIndexedReader_reverse
        1.86±0.00 op/s  Heap Used: 278.37±2.01 MB/op    Heap Total: 313.65±0.93 MB/op   ArrayBuffers: 107.06±0.00 MB/op
```
  • Loading branch information
james-rms authored Jun 24, 2024
1 parent b9c9778 commit 037ceb7
Show file tree
Hide file tree
Showing 6 changed files with 257 additions and 259 deletions.
107 changes: 107 additions & 0 deletions typescript/benchmarks/bench.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,107 @@
import { hrtime, memoryUsage } from "process";

/** Number of timed samples collected per benchmark. */
const COUNT = 5;

/**
 * Per-sample memory snapshot, mirroring the shape returned by
 * `process.memoryUsage()`. All values are in bytes.
 */
interface MemoryStats {
  rss: number;
  heapTotal: number;
  heapUsed: number;
  external: number;
  arrayBuffers: number;
}

/**
 * Result of one benchmark run, discriminated on `gcExposed`: when the V8
 * garbage collector is exposed (node run with `--expose-gc`), every sample
 * also carries a memory-usage snapshot; otherwise only durations (in
 * nanoseconds) are recorded.
 */
type BenchmarkResult =
  | {
      name: string;
      gcExposed: true;
      samples: Array<{ duration: bigint; memoryUsage: MemoryStats }>;
    }
  | {
      name: string;
      gcExposed: false;
      samples: Array<{ duration: bigint }>;
    };

/**
 * Runs a benchmark and logs statistics about runtime and memory usage afterward.
 *
 * Each of the COUNT iterations times one call of `run`. When the V8 garbage
 * collector is exposed (node run with `--expose-gc`), a full collection is
 * forced before every iteration and a `process.memoryUsage()` snapshot is
 * recorded next to each duration, so per-op memory numbers are not polluted
 * by garbage left over from earlier iterations.
 *
 * @param name A name for the benchmark.
 * @param run A routine that runs the benchmark code.
 */
export async function runBenchmark(name: string, run: () => Promise<void>): Promise<void> {
  let result: BenchmarkResult;
  if (global.gc != undefined) {
    result = { name, gcExposed: true, samples: [] };
    for (let i = 0; i < COUNT; i++) {
      global.gc();
      result.samples.push({
        // memoryUsage() is read after the timed run completes, matching the
        // sample shape documented on BenchmarkResult.
        duration: await timeOnce(run),
        memoryUsage: memoryUsage(),
      });
    }
  } else {
    result = { name, gcExposed: false, samples: [] };
    for (let i = 0; i < COUNT; i++) {
      result.samples.push({ duration: await timeOnce(run) });
    }
  }
  printStats(result);
}

/** Times a single invocation of `run`, returning elapsed wall time in nanoseconds. */
async function timeOnce(run: () => Promise<void>): Promise<bigint> {
  const before = hrtime.bigint();
  await run();
  const after = hrtime.bigint();
  return after - before;
}

/**
 * Formats a list of samples as a human-readable "mean±stderr unit" string.
 *
 * With no samples a placeholder is returned; with a single sample the raw
 * value is shown (no spread can be computed). Otherwise the sample standard
 * deviation (n-1 denominator) is used to derive the standard error.
 *
 * @param values The samples to summarize.
 * @param unit Unit label appended to the result.
 * @returns A summary string such as "1.29±0.02 op/s".
 */
function humanReadableStatistics(values: number[], unit: string): string {
  const n = values.length;
  if (n === 0) {
    return "(No samples)";
  }
  if (n === 1) {
    return `${values[0]} ${unit}`;
  }
  let sum = 0;
  for (const value of values) {
    sum += value;
  }
  const mean = sum / n;
  let squaredError = 0;
  for (const value of values) {
    squaredError += (mean - value) ** 2;
  }
  const stdDev = Math.sqrt(squaredError / (n - 1));
  const stdErr = stdDev / Math.sqrt(n);
  return `${mean.toFixed(2)}±${stdErr.toFixed(2)} ${unit}`;
}

/**
 * Prints a two-line report for one benchmark: the name, then throughput and
 * (when available) heap/ArrayBuffer statistics, tab-separated.
 *
 * Memory statistics are only present when the result was gathered with the
 * garbage collector exposed; otherwise a hint to use --expose-gc is printed.
 */
function printStats(result: BenchmarkResult) {
  let memoryResult = "(use --expose-gc to gather memory statistics)";
  if (result.gcExposed) {
    // Bytes -> mebibytes, summarized across samples.
    const stat = (select: (sample: (typeof result.samples)[number]) => number) =>
      humanReadableStatistics(
        result.samples.map((sample) => select(sample) / 2 ** 20),
        "MB/op",
      );
    const used = stat((sample) => sample.memoryUsage.heapUsed);
    const total = stat((sample) => sample.memoryUsage.heapTotal);
    const arrayBuffers = stat((sample) => sample.memoryUsage.arrayBuffers);
    memoryResult = `Heap Used: ${used}\tHeap Total: ${total}\tArrayBuffers: ${arrayBuffers}`;
  }
  // Durations are nanoseconds; convert each sample to ops/second.
  const timeStat = humanReadableStatistics(
    result.samples.map((r) => 1 / (Number(r.duration) / 1e9)),
    "op/s",
  );
  console.log(result.name);
  console.log(`\t${timeStat}\t${memoryResult}`);
}
155 changes: 112 additions & 43 deletions typescript/benchmarks/index.ts
Original file line number Diff line number Diff line change
@@ -1,5 +1,8 @@
import { McapWriter } from "@mcap/core";
import { add, complete, cycle, suite } from "benny";
import { McapIndexedReader, McapStreamReader, McapWriter, TempBuffer } from "@mcap/core";
import assert from "assert";
import { program } from "commander";

import { runBenchmark } from "./bench";

/**
* An IWritable that copies data to memory, but overwrites previous data. This allows benchmarking
Expand Down Expand Up @@ -30,7 +33,78 @@ class FakeMemoryWritable {
}
}

function addWriteBenchmark({
/**
 * Benchmarks the three reader implementations against a single in-memory MCAP
 * file containing one million 10-byte messages on one channel.
 *
 * NOTE(review): assumes @mcap/core's TempBuffer satisfies both the writable
 * interface used by McapWriter and the readable interface used by
 * McapIndexedReader — confirm against @mcap/core.
 */
async function benchmarkReaders() {
  const messageSize = 10;
  const chunkSize = 1024 * 1024 * 4;
  const numMessages = 1_000_000;
  const messageData = new Uint8Array(messageSize).fill(42);

  // Write the fixture file once; every reader benchmark reads from it.
  const buf = new TempBuffer();
  const writer = new McapWriter({ writable: buf, chunkSize });
  await writer.start({ library: "", profile: "" });
  const channelId = await writer.registerChannel({
    schemaId: 0,
    topic: "",
    messageEncoding: "",
    metadata: new Map([]),
  });
  for (let i = 0; i < numMessages; i++) {
    await writer.addMessage({
      channelId,
      sequence: i,
      logTime: BigInt(i),
      publishTime: BigInt(i),
      data: messageData,
    });
  }
  await writer.end();

  // Every benchmark must observe exactly the number of messages written.
  const assertCount = (messageCount: number) => {
    assert(messageCount === numMessages, `expected ${numMessages} messages, got ${messageCount}`);
  };

  await runBenchmark(McapStreamReader.name, async () => {
    const reader = new McapStreamReader();
    reader.append(buf.get());
    let messageCount = 0;
    let record;
    while ((record = reader.nextRecord()) != undefined) {
      if (record.type === "Message") {
        messageCount++;
      }
    }
    assertCount(messageCount);
  });

  await runBenchmark(McapIndexedReader.name, async () => {
    const reader = await McapIndexedReader.Initialize({ readable: buf });
    let messageCount = 0;
    for await (const _ of reader.readMessages()) {
      messageCount++;
    }
    assertCount(messageCount);
  });

  await runBenchmark(McapIndexedReader.name + "_reverse", async () => {
    const reader = await McapIndexedReader.Initialize({ readable: buf });
    let messageCount = 0;
    for await (const _ of reader.readMessages({ reverse: true })) {
      messageCount++;
    }
    assertCount(messageCount);
  });
}

/**
 * Runs the writer benchmark over a grid of workload shapes (message count ×
 * message size) and chunk sizes, in the same order as before: all shapes at a
 * 1 MiB chunk size, then all shapes at 10 MiB.
 */
export async function benchmarkWriter(): Promise<void> {
  const shapes = [
    { numMessages: 1_000_000, messageSize: 1 },
    { numMessages: 100_000, messageSize: 1000 },
    { numMessages: 100, messageSize: 1_000_000 },
  ];
  for (const chunkSize of [1024 * 1024, 10 * 1024 * 1024]) {
    for (const { numMessages, messageSize } of shapes) {
      await runWriteBenchmark({ numMessages, messageSize, chunkSize });
    }
  }
}

async function runWriteBenchmark({
numMessages,
messageSize,
chunkSize,
Expand All @@ -39,54 +113,49 @@ function addWriteBenchmark({
messageSize: number;
chunkSize: number;
}) {
return add(
const messageData = new Uint8Array(messageSize).fill(42);
const writable = new FakeMemoryWritable(2 * chunkSize);
await runBenchmark(
`count=${numMessages.toLocaleString()} size=${messageSize.toLocaleString()} chunkSize=${chunkSize.toLocaleString()} (1 op ≈ ${(
numMessages * messageSize
).toLocaleString()} bytes)`,
async () => {
const messageData = new Uint8Array(messageSize).fill(42);
const writable = new FakeMemoryWritable(2 * chunkSize);
return async () => {
writable.reset();
const writer = new McapWriter({ writable, chunkSize });
await writer.start({ library: "", profile: "" });
const channelId = await writer.registerChannel({
schemaId: 0,
topic: "",
messageEncoding: "",
metadata: new Map([]),
writable.reset();
const writer = new McapWriter({ writable, chunkSize });
await writer.start({ library: "", profile: "" });
const channelId = await writer.registerChannel({
schemaId: 0,
topic: "",
messageEncoding: "",
metadata: new Map([]),
});
for (let i = 0; i < numMessages; i++) {
await writer.addMessage({
channelId,
sequence: i,
logTime: BigInt(i),
publishTime: BigInt(i),
data: messageData,
});
for (let i = 0; i < numMessages; i++) {
await writer.addMessage({
channelId,
sequence: i,
logTime: BigInt(i),
publishTime: BigInt(i),
data: messageData,
});
}
await writer.end();
};
}
await writer.end();
},
);
}

async function benchmarkWriter() {
await suite(
McapWriter.name,
addWriteBenchmark({ numMessages: 1_000_000, messageSize: 1, chunkSize: 1024 * 1024 }),
addWriteBenchmark({ numMessages: 100_000, messageSize: 1000, chunkSize: 1024 * 1024 }),
addWriteBenchmark({ numMessages: 100, messageSize: 1_000_000, chunkSize: 1024 * 1024 }),
addWriteBenchmark({ numMessages: 1_000_000, messageSize: 1, chunkSize: 10 * 1024 * 1024 }),
addWriteBenchmark({ numMessages: 100_000, messageSize: 1000, chunkSize: 10 * 1024 * 1024 }),
addWriteBenchmark({ numMessages: 100, messageSize: 1_000_000, chunkSize: 10 * 1024 * 1024 }),
cycle(),
complete(),
);
}

async function main() {
await benchmarkWriter();
/**
 * Entry point: runs the selected benchmark suite(s).
 *
 * @param args.suite When given, only the named suite ("writer" or "reader")
 * runs; when omitted, both suites run in order.
 */
async function main(args: { suite?: string }) {
  const selected = args.suite;
  const shouldRun = (suiteName: string) => selected == undefined || selected === suiteName;
  if (shouldRun("writer")) {
    console.log("Running 'writer' suite");
    await benchmarkWriter();
  }
  if (shouldRun("reader")) {
    console.log("Running 'reader' suite");
    await benchmarkReaders();
  }
}

void main();
program
.addOption(program.createOption("--suite <suite>", "Name of suite to run"))
.action(main)
.parse();
6 changes: 3 additions & 3 deletions typescript/benchmarks/package.json
Original file line number Diff line number Diff line change
Expand Up @@ -17,8 +17,8 @@
"typecheck": "tsc -p tsconfig.json --noEmit",
"lint:ci": "eslint --report-unused-disable-directives .",
"lint": "eslint --report-unused-disable-directives --fix .",
"bench": "ts-node --files --project tsconfig.cjs.json index.ts",
"bench:debug": "NODE_OPTIONS='--inspect-brk' ts-node --files --project tsconfig.cjs.json index.ts"
"bench": "TS_NODE_FILES=true TS_NODE_PROJECT=tsconfig.cjs.json node --expose-gc -r 'ts-node/register' index.ts",
"bench:debug": "TS_NODE_FILES=true TS_NODE_PROJECT=tsconfig.cjs.json node --inspect-brk --expose-gc -r 'ts-node/register' index.ts"
},
"devDependencies": {
"@foxglove/eslint-plugin": "1.0.1",
Expand All @@ -27,7 +27,7 @@
"@types/node": "18.13.0",
"@typescript-eslint/eslint-plugin": "6.11.0",
"@typescript-eslint/parser": "6.11.0",
"benny": "^3.7.1",
"commander": "12.1.0",
"eslint": "8.54.0",
"eslint-config-prettier": "9.0.0",
"eslint-plugin-es": "4.1.0",
Expand Down
2 changes: 1 addition & 1 deletion typescript/core/package.json
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
{
"name": "@mcap/core",
"version": "2.1.1",
"version": "2.1.2",
"description": "MCAP file support in TypeScript",
"license": "MIT",
"repository": {
Expand Down
43 changes: 22 additions & 21 deletions typescript/core/src/parse.ts
Original file line number Diff line number Diff line change
Expand Up @@ -62,10 +62,15 @@ export function parseRecord({
}

if (!isKnownOpcode(opcode)) {
const data = new Uint8Array(
view.buffer,
view.byteOffset + headerReader.offset,
recordLengthNum,
);
const record: TypedMcapRecord = {
type: "Unknown",
opcode,
data: new Uint8Array(view.buffer, view.byteOffset + headerReader.offset, recordLengthNum),
data,
};
return { record, usedBytes: recordEndOffset - startOffset };
}
Expand Down Expand Up @@ -107,11 +112,10 @@ export function parseRecord({
throw new Error(`Schema data length ${dataLen} exceeds bounds of record`);
}
const data = new Uint8Array(
recordView.buffer.slice(
recordView.byteOffset + reader.offset,
recordView.byteOffset + reader.offset + dataLen,
),
);
recordView.buffer,
recordView.byteOffset + reader.offset,
dataLen,
).slice();
reader.offset += dataLen;

const record: TypedMcapRecord = {
Expand Down Expand Up @@ -153,11 +157,10 @@ export function parseRecord({
const logTime = reader.uint64();
const publishTime = reader.uint64();
const data = new Uint8Array(
recordView.buffer.slice(
recordView.byteOffset + reader.offset,
recordView.byteOffset + recordView.byteLength,
),
);
recordView.buffer,
recordView.byteOffset + reader.offset,
recordView.byteLength - reader.offset,
).slice();
const record: TypedMcapRecord = {
type: "Message",
channelId,
Expand All @@ -180,11 +183,10 @@ export function parseRecord({
throw new Error("Chunk records length exceeds remaining record size");
}
const records = new Uint8Array(
recordView.buffer.slice(
recordView.byteOffset + reader.offset,
recordView.byteOffset + reader.offset + recordByteLength,
),
);
recordView.buffer,
recordView.byteOffset + reader.offset,
recordByteLength,
).slice();
const record: TypedMcapRecord = {
type: "Chunk",
messageStartTime: startTime,
Expand Down Expand Up @@ -250,11 +252,10 @@ export function parseRecord({
throw new Error(`Attachment data length ${dataLen} exceeds bounds of record`);
}
const data = new Uint8Array(
recordView.buffer.slice(
recordView.byteOffset + reader.offset,
recordView.byteOffset + reader.offset + Number(dataLen),
),
);
recordView.buffer,
recordView.byteOffset + reader.offset,
Number(dataLen),
).slice();
reader.offset += Number(dataLen);
const crcLength = reader.offset;
const expectedCrc = reader.uint32();
Expand Down
Loading

0 comments on commit 037ceb7

Please sign in to comment.