Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: add tracer DRPObject #483

Merged
merged 18 commits into from
Feb 21, 2025
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 2 additions & 1 deletion examples/grid/package.json
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,8 @@
},
"dependencies": {
"@ts-drp/node": "0.7.0",
"@ts-drp/object": "0.7.0"
"@ts-drp/object": "0.7.0",
"@ts-drp/tracer": "^0.7.0"
},
"devDependencies": {
"@types/node": "^22.5.4",
Expand Down
17 changes: 14 additions & 3 deletions examples/grid/src/index.ts
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
import { DRPNode } from "@ts-drp/node";
import { enableTracing, IMetrics, OpentelemetryMetrics } from "@ts-drp/tracer";

import { Grid } from "./objects/grid";
import { render, enableUIControls, renderInfo } from "./render";
Expand Down Expand Up @@ -74,13 +75,16 @@ async function createConnectHandlers() {
});
}

async function run() {
async function run(metrics?: IMetrics) {
enableUIControls();
renderInfo();

const button_create = <HTMLButtonElement>document.getElementById("createGrid");
button_create.addEventListener("click", async () => {
gridState.drpObject = await gridState.node.createObject({ drp: new Grid() });
gridState.drpObject = await gridState.node.createObject({
drp: new Grid(),
metrics,
});
gridState.gridDRP = gridState.drpObject.drp as Grid;
await createConnectHandlers();
await addUser();
Expand All @@ -94,6 +98,7 @@ async function run() {
gridState.drpObject = await gridState.node.connectObject({
id: drpId,
drp: new Grid(),
metrics,
});
gridState.gridDRP = gridState.drpObject.drp as Grid;
await createConnectHandlers();
Expand Down Expand Up @@ -127,12 +132,18 @@ async function run() {
}

async function main() {
let metrics: IMetrics | undefined = undefined;
if (import.meta.env.VITE_ENABLE_TRACING) {
enableTracing();
metrics = new OpentelemetryMetrics("grid-service-2");
}

const networkConfig = getNetworkConfigFromEnv();
gridState.node = new DRPNode(networkConfig ? { network_config: networkConfig } : undefined);
await gridState.node.start();
await gridState.node.networkNode.isDialable(async () => {
console.log("Started node", import.meta.env);
await run();
await run(metrics);
});

setInterval(renderInfo, import.meta.env.VITE_RENDER_INFO_INTERVAL);
Expand Down
1 change: 1 addition & 0 deletions packages/node/package.json
Original file line number Diff line number Diff line change
Expand Up @@ -51,6 +51,7 @@
"@ts-drp/network": "0.7.0",
"@ts-drp/object": "0.7.0",
"@ts-drp/logger": "0.7.0",
"@ts-drp/tracer": "0.7.0",
"@ts-drp/types": "0.7.0",
"commander": "^13.0.0",
"uint8arrays": "^5.1.0"
Expand Down
10 changes: 9 additions & 1 deletion packages/node/src/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ import type { EventCallback, StreamHandler } from "@libp2p/interface";
import { Logger, type LoggerOptions } from "@ts-drp/logger";
import { DRPNetworkNode, type DRPNetworkNodeConfig } from "@ts-drp/network";
import { type ACL, type DRP, DRPObject } from "@ts-drp/object";
import { IMetrics } from "@ts-drp/tracer";
import { Message, MessageType } from "@ts-drp/types";

import { drpMessagesHandler } from "./handlers.js";
Expand Down Expand Up @@ -89,13 +90,15 @@ export class DRPNode {
enabled: boolean;
peerId?: string;
};
metrics?: IMetrics;
}) {
const object = new DRPObject({
peerId: this.networkNode.peerId,
publicCredential: options.acl ? undefined : this.credentialStore.getPublicCredential(),
acl: options.acl,
drp: options.drp,
id: options.id,
metrics: options.metrics,
});
operations.createObject(this, object);
await operations.subscribeObject(this, object.id);
Expand All @@ -118,8 +121,13 @@ export class DRPNode {
sync?: {
peerId?: string;
};
metrics?: IMetrics;
}) {
const object = operations.connectObject(this, options.id, options.drp, options.sync?.peerId);
const object = operations.connectObject(this, options.id, {
peerId: options.sync?.peerId,
drp: options.drp,
metrics: options.metrics,
});
return object;
}

Expand Down
29 changes: 18 additions & 11 deletions packages/node/src/operations.ts
Original file line number Diff line number Diff line change
@@ -1,38 +1,45 @@
import { type DRP, DRPObject, HashGraph } from "@ts-drp/object";
import { IMetrics } from "@ts-drp/tracer";
import { FetchState, Message, MessageType, Sync } from "@ts-drp/types";

import { drpMessagesHandler, drpObjectChangesHandler } from "./handlers.js";
import { type DRPNode, log } from "./index.js";

export function createObject(node: DRPNode, object: DRPObject) {
node.objectStore.put(object.id, object);
object.subscribe((obj, originFn, vertices) =>
drpObjectChangesHandler(node, obj, originFn, vertices)
);
object.subscribe((obj, originFn, vertices) => {
drpObjectChangesHandler(node, obj, originFn, vertices);
});
}

export type ConnectObjectOptions = {
drp?: DRP;
peerId?: string;
metrics?: IMetrics;
};

export async function connectObject(
node: DRPNode,
id: string,
drp?: DRP,
peerId?: string
options: ConnectObjectOptions
): Promise<DRPObject> {
const object = DRPObject.createObject({
peerId: node.networkNode.peerId,
id,
drp,
drp: options.drp,
metrics: options.metrics,
});
node.objectStore.put(id, object);

await fetchState(node, id, peerId);
await fetchState(node, id, options.peerId);
// sync process needs to finish before subscribing
const retry = setInterval(async () => {
if (object.acl) {
await syncObject(node, id, peerId);
await syncObject(node, id, options.peerId);
await subscribeObject(node, id);
object.subscribe((obj, originFn, vertices) =>
drpObjectChangesHandler(node, obj, originFn, vertices)
);
object.subscribe((obj, originFn, vertices) => {
drpObjectChangesHandler(node, obj, originFn, vertices);
});
clearInterval(retry);
}
}, 1000);
Expand Down
1 change: 1 addition & 0 deletions packages/object/package.json
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,7 @@
"@chainsafe/bls": "^8.1.0",
"@msgpack/msgpack": "^3.0.1",
"@ts-drp/logger": "^0.7.0",
"@ts-drp/tracer": "^0.7.0",
"@ts-drp/types": "^0.7.0",
"es-toolkit": "1.30.1",
"fast-deep-equal": "^3.1.3",
Expand Down
14 changes: 13 additions & 1 deletion packages/object/src/index.ts
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
import { Logger, type LoggerOptions } from "@ts-drp/logger";
import { IMetrics } from "@ts-drp/tracer";
import {
DRPObjectBase,
DRPState,
Expand All @@ -15,6 +16,7 @@ import type { ACL } from "./acl/interface.js";
import { type FinalityConfig, FinalityStore } from "./finality/index.js";
import { type Hash, HashGraph } from "./hashgraph/index.js";
import {
ConnectObjectOptions,
type DRP,
type DRPObjectCallback,
type DRPPublicCredential,
Expand Down Expand Up @@ -60,6 +62,7 @@ export class DRPObject implements DRPObjectBase {
drp?: DRP;
id?: string;
config?: DRPObjectConfig;
metrics?: IMetrics;
}) {
if (!options.acl && !options.publicCredential) {
throw new Error("Either publicCredential or acl must be provided to create a DRPObject");
Expand Down Expand Up @@ -94,6 +97,14 @@ export class DRPObject implements DRPObjectBase {
this.finalityStore = new FinalityStore(options.config?.finality_config);
this.originalObjectACL = cloneDeep(objAcl);
this.originalDRP = cloneDeep(options.drp);
this.callFn =
options.metrics?.traceFunc("drpObject.callFn", this.callFn.bind(this)) ?? this.callFn;
this._computeObjectACL =
options.metrics?.traceFunc("drpObject.computeObjectACL", this._computeObjectACL.bind(this)) ??
this._computeObjectACL;
this._computeDRP =
options.metrics?.traceFunc("drpObject.computeDRP", this._computeDRP.bind(this)) ??
this._computeDRP;
}

private _initLocalDrpInstance(peerId: string, drp: DRP, acl: DRP) {
Expand All @@ -112,7 +123,7 @@ export class DRPObject implements DRPObjectBase {
this.vertices = this.hashGraph.getAllVertices();
}

static createObject(options: { peerId: string; id?: string; drp?: DRP }) {
static createObject(options: ConnectObjectOptions) {
const aclObj = new ObjectACL({
admins: new Map(),
permissionless: true,
Expand All @@ -122,6 +133,7 @@ export class DRPObject implements DRPObjectBase {
id: options.id,
acl: aclObj,
drp: options.drp,
metrics: options.metrics,
});
return object;
}
Expand Down
7 changes: 7 additions & 0 deletions packages/object/src/interface.ts
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
import { IMetrics } from "@ts-drp/tracer";
import { ObjectPb } from "@ts-drp/types";
import { type Vertex_Operation as Operation, Vertex } from "@ts-drp/types";

Expand Down Expand Up @@ -31,3 +32,9 @@ export interface LcaAndOperations {
lca: string;
linearizedOperations: Operation[];
}
export type ConnectObjectOptions = {
peerId: string;
id?: string;
drp?: DRP;
metrics?: IMetrics;
};
107 changes: 58 additions & 49 deletions packages/tracer/src/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -10,8 +10,11 @@ import {
} from "@opentelemetry/sdk-trace-web";
import { ATTR_SERVICE_NAME } from "@opentelemetry/semantic-conventions";

import { IMetrics } from "./interface.js";

export { IMetrics };

let enabled = false;
let tracer: OtTracer | undefined;
let provider: WebTracerProvider | undefined;
let exporter: OTLPTraceExporter | undefined;

Expand All @@ -31,21 +34,16 @@ export type EnableTracingOptions = {
};
};

export const enableTracing = (tracerName: string, opts: EnableTracingOptions = {}): void => {
export const enableTracing = (opts: EnableTracingOptions = {}): void => {
enabled = true;
initContextManager();
initProvider(opts.provider);

if (provider) {
tracer = provider.getTracer(tracerName) as OtTracer;
}
};

// disableTracing should reset the tracer, provider, and exporter
// there for testing purposes
export const disableTracing = (): void => {
enabled = false;
tracer = undefined;
provider = undefined;
exporter = undefined;
};
Expand Down Expand Up @@ -179,49 +177,60 @@ function wrapAsyncGenerator<T>(gen: AsyncGenerator<T>, span: Span): AsyncGenerat
return wrapped;
}

export function traceFunc<Args extends unknown[], Return>(
name: string,
fn: (...args: Args) => Return,
setAttributes?: (span: Span, ...args: Args) => void
): (...args: Args) => Return {
return (...args: Args): Return => {
if (!tracer || !enabled) return fn(...args);
const parentContext = context.active();
const span = tracer.startSpan(name, {}, parentContext);

if (setAttributes) {
setAttributes(span, ...args);
}

let result: Return;
const childContext = trace.setSpan(parentContext, span);
try {
result = context.with(childContext, fn, undefined, ...args);
} catch (err) {
const error = err instanceof Error ? err : new Error(String(err));
span.recordException(error);
span.setStatus({
code: SpanStatusCode.ERROR,
message: error.toString(),
});
export class OpentelemetryMetrics implements IMetrics {
private tracer: OtTracer | undefined;

constructor(tracerName: string) {
if (!provider) return;
this.tracer = provider.getTracer(tracerName) as OtTracer;
}

public traceFunc<Args extends unknown[], Return>(
name: string,
fn: (...args: Args) => Return,
setAttributes?: (span: Span, ...args: Args) => void
): (...args: Args) => Return {
return (...args: Args): Return => {
if (!this.tracer || !enabled) {
return fn(...args);
}
const parentContext = context.active();
const span = this.tracer.startSpan(name, {}, parentContext);

if (setAttributes) {
setAttributes(span, ...args);
}

let result: Return;
const childContext = trace.setSpan(parentContext, span);
try {
result = context.with(childContext, fn, undefined, ...args);
} catch (err) {
const error = err instanceof Error ? err : new Error(String(err));
span.recordException(error);
span.setStatus({
code: SpanStatusCode.ERROR,
message: error.toString(),
});
span.end();
throw error;
}

if (isPromise<unknown>(result)) {
return wrapPromise(result, span) as Return;
}
if (isGenerator(result)) {
return wrapGenerator(result, span) as Return;
}
if (isAsyncGenerator(result)) {
return wrapAsyncGenerator(result, span) as Return;
}

span.setStatus({ code: SpanStatusCode.OK });
span.end();
throw error;
}

if (isPromise<unknown>(result)) {
return wrapPromise(result, span) as Return;
}
if (isGenerator(result)) {
return wrapGenerator(result, span) as Return;
}
if (isAsyncGenerator(result)) {
return wrapAsyncGenerator(result, span) as Return;
}

span.setStatus({ code: SpanStatusCode.OK });
span.end();
return result;
};
return result;
};
}
}

const initExporter = (opts: EnableTracingOptions["provider"]): OTLPTraceExporter => {
Expand Down
8 changes: 8 additions & 0 deletions packages/tracer/src/interface.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,8 @@
export interface IMetrics {
traceFunc<Args extends unknown[], Return>(
name: string,
fn: (...args: Args) => Return,
// eslint-disable-next-line @typescript-eslint/no-explicit-any
setAttributes?: (span: any, ...args: Args) => void
): (...args: Args) => Return;
}
Loading
Loading