diff --git a/examples/grid/package.json b/examples/grid/package.json index 3b87c0f9..d5f6013b 100644 --- a/examples/grid/package.json +++ b/examples/grid/package.json @@ -10,7 +10,8 @@ }, "dependencies": { "@ts-drp/node": "0.7.0", - "@ts-drp/object": "0.7.0" + "@ts-drp/object": "0.7.0", + "@ts-drp/tracer": "^0.7.0" }, "devDependencies": { "@types/node": "^22.5.4", diff --git a/examples/grid/src/index.ts b/examples/grid/src/index.ts index 4e59e1a6..6387d25a 100644 --- a/examples/grid/src/index.ts +++ b/examples/grid/src/index.ts @@ -1,4 +1,5 @@ import { DRPNode } from "@ts-drp/node"; +import { enableTracing, IMetrics, OpentelemetryMetrics } from "@ts-drp/tracer"; import { Grid } from "./objects/grid"; import { render, enableUIControls, renderInfo } from "./render"; @@ -74,13 +75,16 @@ async function createConnectHandlers() { }); } -async function run() { +async function run(metrics?: IMetrics) { enableUIControls(); renderInfo(); const button_create = document.getElementById("createGrid"); button_create.addEventListener("click", async () => { - gridState.drpObject = await gridState.node.createObject({ drp: new Grid() }); + gridState.drpObject = await gridState.node.createObject({ + drp: new Grid(), + metrics, + }); gridState.gridDRP = gridState.drpObject.drp as Grid; await createConnectHandlers(); await addUser(); @@ -94,6 +98,7 @@ async function run() { gridState.drpObject = await gridState.node.connectObject({ id: drpId, drp: new Grid(), + metrics, }); gridState.gridDRP = gridState.drpObject.drp as Grid; await createConnectHandlers(); @@ -127,12 +132,18 @@ async function run() { } async function main() { + let metrics: IMetrics | undefined = undefined; + if (import.meta.env.VITE_ENABLE_TRACING) { + enableTracing(); + metrics = new OpentelemetryMetrics("grid-service-2"); + } + const networkConfig = getNetworkConfigFromEnv(); gridState.node = new DRPNode(networkConfig ? { network_config: networkConfig } : undefined); await gridState.node.start(); await gridState.node.networkNode.isDialable(async () => { console.log("Started node", import.meta.env); - await run(); + await run(metrics); }); setInterval(renderInfo, import.meta.env.VITE_RENDER_INFO_INTERVAL); diff --git a/packages/node/package.json b/packages/node/package.json index be709c46..c44e9c21 100644 --- a/packages/node/package.json +++ b/packages/node/package.json @@ -51,6 +51,7 @@ "@ts-drp/network": "0.7.0", "@ts-drp/object": "0.7.0", "@ts-drp/logger": "0.7.0", + "@ts-drp/tracer": "0.7.0", "@ts-drp/types": "0.7.0", "commander": "^13.0.0", "uint8arrays": "^5.1.0" diff --git a/packages/node/src/index.ts b/packages/node/src/index.ts index 31b6d7bd..8e64c30c 100644 --- a/packages/node/src/index.ts +++ b/packages/node/src/index.ts @@ -3,6 +3,7 @@ import type { EventCallback, StreamHandler } from "@libp2p/interface"; import { Logger, type LoggerOptions } from "@ts-drp/logger"; import { DRPNetworkNode, type DRPNetworkNodeConfig } from "@ts-drp/network"; import { type ACL, type DRP, DRPObject } from "@ts-drp/object"; +import { IMetrics } from "@ts-drp/tracer"; import { Message, MessageType } from "@ts-drp/types"; import { drpMessagesHandler } from "./handlers.js"; @@ -89,6 +90,7 @@ export class DRPNode { enabled: boolean; peerId?: string; }; + metrics?: IMetrics; }) { const object = new DRPObject({ peerId: this.networkNode.peerId, @@ -96,6 +98,7 @@ export class DRPNode { acl: options.acl, drp: options.drp, id: options.id, + metrics: options.metrics, }); operations.createObject(this, object); await operations.subscribeObject(this, object.id); @@ -118,8 +121,13 @@ export class DRPNode { sync?: { peerId?: string; }; + metrics?: IMetrics; }) { - const object = operations.connectObject(this, options.id, options.drp, options.sync?.peerId); + const object = operations.connectObject(this, options.id, { + peerId: options.sync?.peerId, + drp: options.drp, + metrics: options.metrics, + }); return object; } diff --git a/packages/node/src/operations.ts b/packages/node/src/operations.ts index bcb331b5..92f8df8a 100644 --- a/packages/node/src/operations.ts +++ b/packages/node/src/operations.ts @@ -1,4 +1,5 @@ import { type DRP, DRPObject, HashGraph } from "@ts-drp/object"; +import { IMetrics } from "@ts-drp/tracer"; import { FetchState, Message, MessageType, Sync } from "@ts-drp/types"; import { drpMessagesHandler, drpObjectChangesHandler } from "./handlers.js"; @@ -6,33 +7,39 @@ import { type DRPNode, log } from "./index.js"; export function createObject(node: DRPNode, object: DRPObject) { node.objectStore.put(object.id, object); - object.subscribe((obj, originFn, vertices) => - drpObjectChangesHandler(node, obj, originFn, vertices) - ); + object.subscribe((obj, originFn, vertices) => { + drpObjectChangesHandler(node, obj, originFn, vertices); + }); } +export type ConnectObjectOptions = { + drp?: DRP; + peerId?: string; + metrics?: IMetrics; +}; + export async function connectObject( node: DRPNode, id: string, - drp?: DRP, - peerId?: string + options: ConnectObjectOptions ): Promise { const object = DRPObject.createObject({ peerId: node.networkNode.peerId, id, - drp, + drp: options.drp, + metrics: options.metrics, }); node.objectStore.put(id, object); - await fetchState(node, id, peerId); + await fetchState(node, id, options.peerId); // sync process needs to finish before subscribing const retry = setInterval(async () => { if (object.acl) { - await syncObject(node, id, peerId); + await syncObject(node, id, options.peerId); await subscribeObject(node, id); - object.subscribe((obj, originFn, vertices) => - drpObjectChangesHandler(node, obj, originFn, vertices) - ); + object.subscribe((obj, originFn, vertices) => { + drpObjectChangesHandler(node, obj, originFn, vertices); + }); clearInterval(retry); } }, 1000); diff --git a/packages/object/package.json b/packages/object/package.json index 3f9aa9ed..257f6051 100644 --- a/packages/object/package.json +++ b/packages/object/package.json @@ -39,6 +39,7 @@ "@chainsafe/bls": "^8.1.0", "@msgpack/msgpack": "^3.0.1", "@ts-drp/logger": "^0.7.0", + "@ts-drp/tracer": "^0.7.0", "@ts-drp/types": "^0.7.0", "es-toolkit": "1.30.1", "fast-deep-equal": "^3.1.3", diff --git a/packages/object/src/index.ts b/packages/object/src/index.ts index 0d843a16..ea4f369f 100644 --- a/packages/object/src/index.ts +++ b/packages/object/src/index.ts @@ -1,4 +1,5 @@ import { Logger, type LoggerOptions } from "@ts-drp/logger"; +import { IMetrics } from "@ts-drp/tracer"; import { DRPObjectBase, DRPState, @@ -15,6 +16,7 @@ import type { ACL } from "./acl/interface.js"; import { type FinalityConfig, FinalityStore } from "./finality/index.js"; import { type Hash, HashGraph } from "./hashgraph/index.js"; import { + ConnectObjectOptions, type DRP, type DRPObjectCallback, type DRPPublicCredential, @@ -60,6 +62,7 @@ export class DRPObject implements DRPObjectBase { drp?: DRP; id?: string; config?: DRPObjectConfig; + metrics?: IMetrics; }) { if (!options.acl && !options.publicCredential) { throw new Error("Either publicCredential or acl must be provided to create a DRPObject"); @@ -94,6 +97,14 @@ export class DRPObject implements DRPObjectBase { this.finalityStore = new FinalityStore(options.config?.finality_config); this.originalObjectACL = cloneDeep(objAcl); this.originalDRP = cloneDeep(options.drp); + this.callFn = + options.metrics?.traceFunc("drpObject.callFn", this.callFn.bind(this)) ?? this.callFn; + this._computeObjectACL = + options.metrics?.traceFunc("drpObject.computeObjectACL", this._computeObjectACL.bind(this)) ?? + this._computeObjectACL; + this._computeDRP = + options.metrics?.traceFunc("drpObject.computeDRP", this._computeDRP.bind(this)) ?? + this._computeDRP; } private _initLocalDrpInstance(peerId: string, drp: DRP, acl: DRP) { @@ -112,7 +123,7 @@ export class DRPObject implements DRPObjectBase { this.vertices = this.hashGraph.getAllVertices(); } - static createObject(options: { peerId: string; id?: string; drp?: DRP }) { + static createObject(options: ConnectObjectOptions) { const aclObj = new ObjectACL({ admins: new Map(), permissionless: true, @@ -122,6 +133,7 @@ export class DRPObject implements DRPObjectBase { id: options.id, acl: aclObj, drp: options.drp, + metrics: options.metrics, }); return object; } diff --git a/packages/object/src/interface.ts b/packages/object/src/interface.ts index e51b0a5b..76edfd15 100644 --- a/packages/object/src/interface.ts +++ b/packages/object/src/interface.ts @@ -1,3 +1,4 @@ +import { IMetrics } from "@ts-drp/tracer"; import { ObjectPb } from "@ts-drp/types"; import { type Vertex_Operation as Operation, Vertex } from "@ts-drp/types"; @@ -31,3 +32,9 @@ export interface LcaAndOperations { lca: string; linearizedOperations: Operation[]; } +export type ConnectObjectOptions = { + peerId: string; + id?: string; + drp?: DRP; + metrics?: IMetrics; +}; diff --git a/packages/tracer/src/index.ts b/packages/tracer/src/index.ts index 85f80826..0ab6d83a 100644 --- a/packages/tracer/src/index.ts +++ b/packages/tracer/src/index.ts @@ -10,8 +10,11 @@ import { } from "@opentelemetry/sdk-trace-web"; import { ATTR_SERVICE_NAME } from "@opentelemetry/semantic-conventions"; +import { IMetrics } from "./interface.js"; + +export { IMetrics }; + let enabled = false; -let tracer: OtTracer | undefined; let provider: WebTracerProvider | undefined; let exporter: OTLPTraceExporter | undefined; @@ -31,21 +34,16 @@ export type EnableTracingOptions = { }; }; -export const enableTracing = (tracerName: string, opts: EnableTracingOptions = {}): void => { +export const enableTracing = (opts: EnableTracingOptions = {}): void => { enabled = true; initContextManager(); initProvider(opts.provider); - - if (provider) { - tracer = provider.getTracer(tracerName) as OtTracer; - } }; // disableTracing should reset the tracer, provider, and exporter // there for testing purposes export const disableTracing = (): void => { enabled = false; - tracer = undefined; provider = undefined; exporter = undefined; }; @@ -179,49 +177,60 @@ function wrapAsyncGenerator(gen: AsyncGenerator, span: Span): AsyncGenerat return wrapped; } -export function traceFunc( - name: string, - fn: (...args: Args) => Return, - setAttributes?: (span: Span, ...args: Args) => void -): (...args: Args) => Return { - return (...args: Args): Return => { - if (!tracer || !enabled) return fn(...args); - const parentContext = context.active(); - const span = tracer.startSpan(name, {}, parentContext); - - if (setAttributes) { - setAttributes(span, ...args); - } - - let result: Return; - const childContext = trace.setSpan(parentContext, span); - try { - result = context.with(childContext, fn, undefined, ...args); - } catch (err) { - const error = err instanceof Error ? err : new Error(String(err)); - span.recordException(error); - span.setStatus({ - code: SpanStatusCode.ERROR, - message: error.toString(), - }); +export class OpentelemetryMetrics implements IMetrics { + private tracer: OtTracer | undefined; + + constructor(tracerName: string) { + if (!provider) return; + this.tracer = provider.getTracer(tracerName) as OtTracer; + } + + public traceFunc( + name: string, + fn: (...args: Args) => Return, + setAttributes?: (span: Span, ...args: Args) => void + ): (...args: Args) => Return { + return (...args: Args): Return => { + if (!this.tracer || !enabled) { + return fn(...args); + } + const parentContext = context.active(); + const span = this.tracer.startSpan(name, {}, parentContext); + + if (setAttributes) { + setAttributes(span, ...args); + } + + let result: Return; + const childContext = trace.setSpan(parentContext, span); + try { + result = context.with(childContext, fn, undefined, ...args); + } catch (err) { + const error = err instanceof Error ? err : new Error(String(err)); + span.recordException(error); + span.setStatus({ + code: SpanStatusCode.ERROR, + message: error.toString(), + }); + span.end(); + throw error; + } + + if (isPromise(result)) { + return wrapPromise(result, span) as Return; + } + if (isGenerator(result)) { + return wrapGenerator(result, span) as Return; + } + if (isAsyncGenerator(result)) { + return wrapAsyncGenerator(result, span) as Return; + } + + span.setStatus({ code: SpanStatusCode.OK }); span.end(); - throw error; - } - - if (isPromise(result)) { - return wrapPromise(result, span) as Return; - } - if (isGenerator(result)) { - return wrapGenerator(result, span) as Return; - } - if (isAsyncGenerator(result)) { - return wrapAsyncGenerator(result, span) as Return; - } - - span.setStatus({ code: SpanStatusCode.OK }); - span.end(); - return result; - }; + return result; + }; + } } const initExporter = (opts: EnableTracingOptions["provider"]): OTLPTraceExporter => { diff --git a/packages/tracer/src/interface.ts b/packages/tracer/src/interface.ts new file mode 100644 index 00000000..29198a7a --- /dev/null +++ b/packages/tracer/src/interface.ts @@ -0,0 +1,8 @@ +export interface IMetrics { + traceFunc( + name: string, + fn: (...args: Args) => Return, + // eslint-disable-next-line @typescript-eslint/no-explicit-any + setAttributes?: (span: any, ...args: Args) => void + ): (...args: Args) => Return; +} diff --git a/packages/tracer/tests/index.spec.ts b/packages/tracer/tests/index.spec.ts index 6e9e1129..6be54b91 100644 --- a/packages/tracer/tests/index.spec.ts +++ b/packages/tracer/tests/index.spec.ts @@ -1,4 +1,4 @@ -import type { Span } from "@opentelemetry/api"; +import { Span } from "@opentelemetry/api"; import { OTLPTraceExporter } from "@opentelemetry/exporter-trace-otlp-http"; import { WebTracerProvider } from "@opentelemetry/sdk-trace-web"; import { beforeEach, describe, expect, test, vi } from "vitest"; @@ -10,8 +10,9 @@ import { isAsyncGenerator, isGenerator, isPromise, - traceFunc, + OpentelemetryMetrics, } from "../src/index.js"; +import { IMetrics } from "../src/interface.js"; // Mock OpenTelemetry dependencies vi.mock("@opentelemetry/api", () => { @@ -230,12 +231,15 @@ describe("isAsyncGenerator", () => { }); describe("tracing lifecycle", () => { + let metrics: IMetrics; + beforeEach(() => { vi.clearAllMocks(); + metrics = new OpentelemetryMetrics("metric"); }); test("should enable and disable tracing", async () => { - enableTracing("test-service", { + enableTracing({ provider: { serviceName: "test", exporterUrl: "http://localhost:9999", @@ -249,7 +253,7 @@ describe("tracing lifecycle", () => { headers: expect.any(Object), }); - const fn = traceFunc("test", (a: number) => a + 1); + const fn = metrics.traceFunc("test", (a: number) => a + 1); expect(fn(1)).toBe(2); disableTracing(); @@ -260,7 +264,7 @@ describe("tracing lifecycle", () => { }); test("should allow flushing traces", async () => { - enableTracing("test-service"); + enableTracing(); expect(WebTracerProvider).toHaveBeenCalled(); const mockProvider = vi.mocked(WebTracerProvider).mock.results[0].value; @@ -273,21 +277,21 @@ describe("tracing lifecycle", () => { describe("wrapping functions", () => { beforeEach(() => { vi.clearAllMocks(); - enableTracing("test-service"); + enableTracing(); }); test("should wrap synchronous functions", () => { - const fn = traceFunc("test", (a: number, b: number) => a + b); + const fn = metrics.traceFunc("test", (a: number, b: number) => a + b); expect(fn(1, 2)).toBe(3); }); test("should wrap async functions", async () => { - const fn = traceFunc("test", async (a: number, b: number) => a + b); + const fn = metrics.traceFunc("test", async (a: number, b: number) => a + b); expect(await fn(1, 2)).toBe(3); }); test("should wrap generator functions", () => { - const fn = traceFunc("test", function* (a: number) { + const fn = metrics.traceFunc("test", function* (a: number) { yield a + 1; yield a + 2; }); @@ -298,7 +302,7 @@ describe("tracing lifecycle", () => { }); test("should wrap async generator functions", async () => { - const fn = traceFunc("test", async function* (a: number) { + const fn = metrics.traceFunc("test", async function* (a: number) { yield a + 1; yield a + 2; }); @@ -309,21 +313,21 @@ describe("tracing lifecycle", () => { }); test("should handle errors in synchronous functions", () => { - const fn = traceFunc("test", () => { + const fn = metrics.traceFunc("test", () => { throw new Error("test error"); }); expect(() => fn()).toThrow("test error"); }); test("should handle errors in async functions", async () => { - const fn = traceFunc("test", async () => { + const fn = metrics.traceFunc("test", async () => { throw new Error("test error"); }); await expect(fn()).rejects.toThrow("test error"); }); test("should apply custom attributes", () => { - const fn = traceFunc( + const fn = metrics.traceFunc( "test", (a: number) => a + 1, (span: Span, a: number) => { @@ -334,13 +338,13 @@ describe("tracing lifecycle", () => { }); test("should trace functions that return promises", async () => { - const tracedPromise = traceFunc("promise-test", () => Promise.resolve(42)); + const tracedPromise = metrics.traceFunc("promise-test", () => Promise.resolve(42)); const result = await tracedPromise(); expect(result).toBe(42); }); test("should trace functions that return generators", () => { - const tracedGenerator = traceFunc("generator-test", function* () { + const tracedGenerator = metrics.traceFunc("generator-test", function* () { yield 1; yield 2; return 3; @@ -354,7 +358,7 @@ describe("tracing lifecycle", () => { }); test("should trace functions that return async generators", async () => { - const tracedAsyncGenerator = traceFunc("async-generator-test", async function* () { + const tracedAsyncGenerator = metrics.traceFunc("async-generator-test", async function* () { yield 1; yield 2; return 3; @@ -368,14 +372,14 @@ describe("tracing lifecycle", () => { }); test("should handle errors in returned promises", async () => { - const tracedPromise = traceFunc("error-promise-test", () => + const tracedPromise = metrics.traceFunc("error-promise-test", () => Promise.reject(new Error("promise error")) ); await expect(tracedPromise()).rejects.toThrow("promise error"); }); test("should handle errors in returned generators", () => { - const tracedGenerator = traceFunc("error-generator-test", function* () { + const tracedGenerator = metrics.traceFunc("error-generator-test", function* () { yield 1; throw new Error("generator error"); }); @@ -385,10 +389,13 @@ describe("tracing lifecycle", () => { }); test("should handle errors in returned async generators", async () => { - const tracedAsyncGenerator = traceFunc("error-async-generator-test", async function* () { - yield 1; - throw new Error("async generator error"); - }); + const tracedAsyncGenerator = metrics.traceFunc( + "error-async-generator-test", + async function* () { + yield 1; + throw new Error("async generator error"); + } + ); const gen = tracedAsyncGenerator(); expect((await gen.next()).value).toBe(1); await expect(gen.next()).rejects.toThrow("async generator error"); diff --git a/pnpm-lock.yaml b/pnpm-lock.yaml index 5109fefa..fc700a47 100644 --- a/pnpm-lock.yaml +++ b/pnpm-lock.yaml @@ -130,6 +130,9 @@ importers: '@ts-drp/object': specifier: 0.7.0 version: link:../../packages/object + '@ts-drp/tracer': + specifier: ^0.7.0 + version: link:../../packages/tracer devDependencies: '@types/node': specifier: ^22.5.4 @@ -305,6 +308,9 @@ importers: '@ts-drp/object': specifier: 0.7.0 version: link:../object + '@ts-drp/tracer': + specifier: 0.7.0 + version: link:../tracer '@ts-drp/types': specifier: 0.7.0 version: link:../types @@ -348,6 +354,9 @@ importers: '@ts-drp/logger': specifier: ^0.7.0 version: link:../logger + '@ts-drp/tracer': + specifier: ^0.7.0 + version: link:../tracer '@ts-drp/types': specifier: ^0.7.0 version: link:../types @@ -11657,7 +11666,7 @@ snapshots: ky: 1.7.5 registry-auth-token: 5.1.0 registry-url: 6.0.1 - semver: 7.6.3 + semver: 7.7.1 pako@1.0.11: {}