Skip to content

Commit

Permalink
feat: add tracer DRPObject (#483)
Browse files Browse the repository at this point in the history
Co-authored-by: Sacha Froment <[email protected]>
  • Loading branch information
anhnd350309 and sfroment authored Feb 21, 2025
1 parent 44b7e5b commit 321dd72
Show file tree
Hide file tree
Showing 12 changed files with 170 additions and 89 deletions.
3 changes: 2 additions & 1 deletion examples/grid/package.json
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,8 @@
},
"dependencies": {
"@ts-drp/node": "0.7.0",
"@ts-drp/object": "0.7.0"
"@ts-drp/object": "0.7.0",
"@ts-drp/tracer": "^0.7.0"
},
"devDependencies": {
"@types/node": "^22.5.4",
Expand Down
17 changes: 14 additions & 3 deletions examples/grid/src/index.ts
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
import { DRPNode } from "@ts-drp/node";
import { enableTracing, IMetrics, OpentelemetryMetrics } from "@ts-drp/tracer";

import { Grid } from "./objects/grid";
import { render, enableUIControls, renderInfo } from "./render";
Expand Down Expand Up @@ -74,13 +75,16 @@ async function createConnectHandlers() {
});
}

async function run() {
async function run(metrics?: IMetrics) {
enableUIControls();
renderInfo();

const button_create = <HTMLButtonElement>document.getElementById("createGrid");
button_create.addEventListener("click", async () => {
gridState.drpObject = await gridState.node.createObject({ drp: new Grid() });
gridState.drpObject = await gridState.node.createObject({
drp: new Grid(),
metrics,
});
gridState.gridDRP = gridState.drpObject.drp as Grid;
await createConnectHandlers();
await addUser();
Expand All @@ -94,6 +98,7 @@ async function run() {
gridState.drpObject = await gridState.node.connectObject({
id: drpId,
drp: new Grid(),
metrics,
});
gridState.gridDRP = gridState.drpObject.drp as Grid;
await createConnectHandlers();
Expand Down Expand Up @@ -127,12 +132,18 @@ async function run() {
}

async function main() {
let metrics: IMetrics | undefined = undefined;
if (import.meta.env.VITE_ENABLE_TRACING) {
enableTracing();
metrics = new OpentelemetryMetrics("grid-service-2");
}

const networkConfig = getNetworkConfigFromEnv();
gridState.node = new DRPNode(networkConfig ? { network_config: networkConfig } : undefined);
await gridState.node.start();
await gridState.node.networkNode.isDialable(async () => {
console.log("Started node", import.meta.env);
await run();
await run(metrics);
});

setInterval(renderInfo, import.meta.env.VITE_RENDER_INFO_INTERVAL);
Expand Down
1 change: 1 addition & 0 deletions packages/node/package.json
Original file line number Diff line number Diff line change
Expand Up @@ -51,6 +51,7 @@
"@ts-drp/network": "0.7.0",
"@ts-drp/object": "0.7.0",
"@ts-drp/logger": "0.7.0",
"@ts-drp/tracer": "0.7.0",
"@ts-drp/types": "0.7.0",
"commander": "^13.0.0",
"uint8arrays": "^5.1.0"
Expand Down
10 changes: 9 additions & 1 deletion packages/node/src/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ import type { EventCallback, StreamHandler } from "@libp2p/interface";
import { Logger, type LoggerOptions } from "@ts-drp/logger";
import { DRPNetworkNode, type DRPNetworkNodeConfig } from "@ts-drp/network";
import { type ACL, type DRP, DRPObject } from "@ts-drp/object";
import { IMetrics } from "@ts-drp/tracer";
import { Message, MessageType } from "@ts-drp/types";

import { drpMessagesHandler } from "./handlers.js";
Expand Down Expand Up @@ -89,13 +90,15 @@ export class DRPNode {
enabled: boolean;
peerId?: string;
};
metrics?: IMetrics;
}) {
const object = new DRPObject({
peerId: this.networkNode.peerId,
publicCredential: options.acl ? undefined : this.credentialStore.getPublicCredential(),
acl: options.acl,
drp: options.drp,
id: options.id,
metrics: options.metrics,
});
operations.createObject(this, object);
await operations.subscribeObject(this, object.id);
Expand All @@ -118,8 +121,13 @@ export class DRPNode {
sync?: {
peerId?: string;
};
metrics?: IMetrics;
}) {
const object = operations.connectObject(this, options.id, options.drp, options.sync?.peerId);
const object = operations.connectObject(this, options.id, {
peerId: options.sync?.peerId,
drp: options.drp,
metrics: options.metrics,
});
return object;
}

Expand Down
29 changes: 18 additions & 11 deletions packages/node/src/operations.ts
Original file line number Diff line number Diff line change
@@ -1,38 +1,45 @@
import { type DRP, DRPObject, HashGraph } from "@ts-drp/object";
import { IMetrics } from "@ts-drp/tracer";
import { FetchState, Message, MessageType, Sync } from "@ts-drp/types";

import { drpMessagesHandler, drpObjectChangesHandler } from "./handlers.js";
import { type DRPNode, log } from "./index.js";

export function createObject(node: DRPNode, object: DRPObject) {
node.objectStore.put(object.id, object);
object.subscribe((obj, originFn, vertices) =>
drpObjectChangesHandler(node, obj, originFn, vertices)
);
object.subscribe((obj, originFn, vertices) => {
drpObjectChangesHandler(node, obj, originFn, vertices);
});
}

export type ConnectObjectOptions = {
drp?: DRP;
peerId?: string;
metrics?: IMetrics;
};

export async function connectObject(
node: DRPNode,
id: string,
drp?: DRP,
peerId?: string
options: ConnectObjectOptions
): Promise<DRPObject> {
const object = DRPObject.createObject({
peerId: node.networkNode.peerId,
id,
drp,
drp: options.drp,
metrics: options.metrics,
});
node.objectStore.put(id, object);

await fetchState(node, id, peerId);
await fetchState(node, id, options.peerId);
// sync process needs to finish before subscribing
const retry = setInterval(async () => {
if (object.acl) {
await syncObject(node, id, peerId);
await syncObject(node, id, options.peerId);
await subscribeObject(node, id);
object.subscribe((obj, originFn, vertices) =>
drpObjectChangesHandler(node, obj, originFn, vertices)
);
object.subscribe((obj, originFn, vertices) => {
drpObjectChangesHandler(node, obj, originFn, vertices);
});
clearInterval(retry);
}
}, 1000);
Expand Down
1 change: 1 addition & 0 deletions packages/object/package.json
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,7 @@
"@chainsafe/bls": "^8.1.0",
"@msgpack/msgpack": "^3.0.1",
"@ts-drp/logger": "^0.7.0",
"@ts-drp/tracer": "^0.7.0",
"@ts-drp/types": "^0.7.0",
"es-toolkit": "1.30.1",
"fast-deep-equal": "^3.1.3",
Expand Down
14 changes: 13 additions & 1 deletion packages/object/src/index.ts
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
import { Logger, type LoggerOptions } from "@ts-drp/logger";
import { IMetrics } from "@ts-drp/tracer";
import {
DRPObjectBase,
DRPState,
Expand All @@ -15,6 +16,7 @@ import type { ACL } from "./acl/interface.js";
import { type FinalityConfig, FinalityStore } from "./finality/index.js";
import { type Hash, HashGraph } from "./hashgraph/index.js";
import {
ConnectObjectOptions,
type DRP,
type DRPObjectCallback,
type DRPPublicCredential,
Expand Down Expand Up @@ -60,6 +62,7 @@ export class DRPObject implements DRPObjectBase {
drp?: DRP;
id?: string;
config?: DRPObjectConfig;
metrics?: IMetrics;
}) {
if (!options.acl && !options.publicCredential) {
throw new Error("Either publicCredential or acl must be provided to create a DRPObject");
Expand Down Expand Up @@ -94,6 +97,14 @@ export class DRPObject implements DRPObjectBase {
this.finalityStore = new FinalityStore(options.config?.finality_config);
this.originalObjectACL = cloneDeep(objAcl);
this.originalDRP = cloneDeep(options.drp);
this.callFn =
options.metrics?.traceFunc("drpObject.callFn", this.callFn.bind(this)) ?? this.callFn;
this._computeObjectACL =
options.metrics?.traceFunc("drpObject.computeObjectACL", this._computeObjectACL.bind(this)) ??
this._computeObjectACL;
this._computeDRP =
options.metrics?.traceFunc("drpObject.computeDRP", this._computeDRP.bind(this)) ??
this._computeDRP;
}

private _initLocalDrpInstance(peerId: string, drp: DRP, acl: DRP) {
Expand All @@ -112,7 +123,7 @@ export class DRPObject implements DRPObjectBase {
this.vertices = this.hashGraph.getAllVertices();
}

static createObject(options: { peerId: string; id?: string; drp?: DRP }) {
static createObject(options: ConnectObjectOptions) {
const aclObj = new ObjectACL({
admins: new Map(),
permissionless: true,
Expand All @@ -122,6 +133,7 @@ export class DRPObject implements DRPObjectBase {
id: options.id,
acl: aclObj,
drp: options.drp,
metrics: options.metrics,
});
return object;
}
Expand Down
7 changes: 7 additions & 0 deletions packages/object/src/interface.ts
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
import { IMetrics } from "@ts-drp/tracer";
import { ObjectPb } from "@ts-drp/types";
import { type Vertex_Operation as Operation, Vertex } from "@ts-drp/types";

Expand Down Expand Up @@ -31,3 +32,9 @@ export interface LcaAndOperations {
lca: string;
linearizedOperations: Operation[];
}
export type ConnectObjectOptions = {
peerId: string;
id?: string;
drp?: DRP;
metrics?: IMetrics;
};
107 changes: 58 additions & 49 deletions packages/tracer/src/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -10,8 +10,11 @@ import {
} from "@opentelemetry/sdk-trace-web";
import { ATTR_SERVICE_NAME } from "@opentelemetry/semantic-conventions";

import { IMetrics } from "./interface.js";

export { IMetrics };

let enabled = false;
let tracer: OtTracer | undefined;
let provider: WebTracerProvider | undefined;
let exporter: OTLPTraceExporter | undefined;

Expand All @@ -31,21 +34,16 @@ export type EnableTracingOptions = {
};
};

export const enableTracing = (tracerName: string, opts: EnableTracingOptions = {}): void => {
export const enableTracing = (opts: EnableTracingOptions = {}): void => {
enabled = true;
initContextManager();
initProvider(opts.provider);

if (provider) {
tracer = provider.getTracer(tracerName) as OtTracer;
}
};

// disableTracing should reset the tracer, provider, and exporter
// there for testing purposes
export const disableTracing = (): void => {
enabled = false;
tracer = undefined;
provider = undefined;
exporter = undefined;
};
Expand Down Expand Up @@ -179,49 +177,60 @@ function wrapAsyncGenerator<T>(gen: AsyncGenerator<T>, span: Span): AsyncGenerat
return wrapped;
}

export function traceFunc<Args extends unknown[], Return>(
name: string,
fn: (...args: Args) => Return,
setAttributes?: (span: Span, ...args: Args) => void
): (...args: Args) => Return {
return (...args: Args): Return => {
if (!tracer || !enabled) return fn(...args);
const parentContext = context.active();
const span = tracer.startSpan(name, {}, parentContext);

if (setAttributes) {
setAttributes(span, ...args);
}

let result: Return;
const childContext = trace.setSpan(parentContext, span);
try {
result = context.with(childContext, fn, undefined, ...args);
} catch (err) {
const error = err instanceof Error ? err : new Error(String(err));
span.recordException(error);
span.setStatus({
code: SpanStatusCode.ERROR,
message: error.toString(),
});
export class OpentelemetryMetrics implements IMetrics {
private tracer: OtTracer | undefined;

constructor(tracerName: string) {
if (!provider) return;
this.tracer = provider.getTracer(tracerName) as OtTracer;
}

public traceFunc<Args extends unknown[], Return>(
name: string,
fn: (...args: Args) => Return,
setAttributes?: (span: Span, ...args: Args) => void
): (...args: Args) => Return {
return (...args: Args): Return => {
if (!this.tracer || !enabled) {
return fn(...args);
}
const parentContext = context.active();
const span = this.tracer.startSpan(name, {}, parentContext);

if (setAttributes) {
setAttributes(span, ...args);
}

let result: Return;
const childContext = trace.setSpan(parentContext, span);
try {
result = context.with(childContext, fn, undefined, ...args);
} catch (err) {
const error = err instanceof Error ? err : new Error(String(err));
span.recordException(error);
span.setStatus({
code: SpanStatusCode.ERROR,
message: error.toString(),
});
span.end();
throw error;
}

if (isPromise<unknown>(result)) {
return wrapPromise(result, span) as Return;
}
if (isGenerator(result)) {
return wrapGenerator(result, span) as Return;
}
if (isAsyncGenerator(result)) {
return wrapAsyncGenerator(result, span) as Return;
}

span.setStatus({ code: SpanStatusCode.OK });
span.end();
throw error;
}

if (isPromise<unknown>(result)) {
return wrapPromise(result, span) as Return;
}
if (isGenerator(result)) {
return wrapGenerator(result, span) as Return;
}
if (isAsyncGenerator(result)) {
return wrapAsyncGenerator(result, span) as Return;
}

span.setStatus({ code: SpanStatusCode.OK });
span.end();
return result;
};
return result;
};
}
}

const initExporter = (opts: EnableTracingOptions["provider"]): OTLPTraceExporter => {
Expand Down
8 changes: 8 additions & 0 deletions packages/tracer/src/interface.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,8 @@
export interface IMetrics {
traceFunc<Args extends unknown[], Return>(
name: string,
fn: (...args: Args) => Return,
// eslint-disable-next-line @typescript-eslint/no-explicit-any
setAttributes?: (span: any, ...args: Args) => void
): (...args: Args) => Return;
}
Loading

0 comments on commit 321dd72

Please sign in to comment.