Skip to content

Commit

Permalink
Merge pull request #246 from rainlanguage/2024-11-02-rpc-report
Browse files Browse the repository at this point in the history
update rpc recorder
  • Loading branch information
rouzwelt authored Nov 3, 2024
2 parents 5773d1c + 9772f56 commit 230a276
Show file tree
Hide file tree
Showing 5 changed files with 195 additions and 35 deletions.
2 changes: 2 additions & 0 deletions src/cli.ts
Original file line number Diff line number Diff line change
Expand Up @@ -664,10 +664,12 @@ export const main = async (argv: any, version?: string) => {
"request-count": record.req,
"success-count": record.success,
"failure-count": record.failure,
"timeout-count": Object.keys(record.cache).length,
});
record.req = 0;
record.success = 0;
record.failure = 0;
record.cache = {};
span.end();
});
}
Expand Down
81 changes: 76 additions & 5 deletions src/config.ts
Original file line number Diff line number Diff line change
Expand Up @@ -3,16 +3,25 @@ import { getSgOrderbooks } from "./sg";
import { WNATIVE } from "sushi/currency";
import { ChainId, ChainKey } from "sushi/chain";
import { DataFetcher, LiquidityProviders } from "sushi/router";
import { BotConfig, BotDataFetcher, ChainConfig, ViemClient } from "./types";
import {
createWalletClient,
BotConfig,
RpcRecord,
ViemClient,
ChainConfig,
isOkRpcError,
isRpcRequest,
isRpcResponse,
BotDataFetcher,
} from "./types";
import {
http,
fallback,
HDAccount,
http,
PrivateKeyAccount,
publicActions,
PublicClient,
publicActions,
walletActions,
PrivateKeyAccount,
createWalletClient,
} from "viem";
import {
STABLES,
Expand Down Expand Up @@ -109,6 +118,68 @@ export async function createViemClient(
}).extend(publicActions) as any as ViemClient);
}

/**
* Keeps record of http fetch requests for a http viem client
*/
export function onFetchRequest(request: Request, rpcRecords: Record<string, RpcRecord>) {
let url = request.url;
if (!request.url.endsWith("/")) url = url + "/";
let record = rpcRecords[url];
if (!record)
record = rpcRecords[url] = {
req: 0,
success: 0,
failure: 0,
cache: [],
};
record.req++;
request
.json()
.then((v) => {
if (isRpcRequest(v)) record.cache[v.id] = {};
else record.req--;
})
.catch(() => {});
}

/**
* Keeps record of http fetch responses for a http viem client
*/
export function onFetchResponse(response: Response, rpcRecords: Record<string, RpcRecord>) {
let url = response.url;
if (!response.url.endsWith("/")) url = url + "/";
let record = rpcRecords[url];
if (!record)
record = rpcRecords[url] = {
req: 0,
success: 0,
failure: 0,
cache: [],
};
if (response.status !== 200) record.failure++;

// for clearing the cache we need to explicitly parse the results even
// if response status was not 200 but still can hold valid rpc obj id
response
.json()
.then((v) => {
if (isRpcResponse(v)) {
delete record.cache[v.id];
if (response.status === 200) {
if ("error" in v) {
if (isOkRpcError(v)) record.success++;
else record.failure++;
} else {
record.success++;
}
}
} else if (response.status === 200) record.failure++;
})
.catch(() => {
if (response.status === 200) record.failure++;
});
}

/**
* Instantiates a DataFetcher
* @param configOrViemClient - The network config data or a viem public client
Expand Down
43 changes: 19 additions & 24 deletions src/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,13 @@ import { getQuery, statusCheckQuery } from "./query";
import { checkSgStatus, handleSgResults } from "./sg";
import { Tracer } from "@opentelemetry/sdk-trace-base";
import { BotConfig, CliOptions, RoundReport, RpcRecord, SgFilter } from "./types";
import { createViemClient, getChainConfig, getDataFetcher } from "./config";
import {
getChainConfig,
getDataFetcher,
onFetchRequest,
onFetchResponse,
createViemClient,
} from "./config";

/**
* Get the order details from a source, i.e array of subgraphs and/or a local json file
Expand Down Expand Up @@ -155,33 +161,22 @@ export async function getConfig(
const config = getChainConfig(chainId) as any as BotConfig;

const rpcRecords: Record<string, RpcRecord> = {};
rpcUrls.forEach((v) => (rpcRecords[v] = { req: 0, success: 0, failure: 0 }));

rpcUrls.forEach(
(v) =>
(rpcRecords[v.endsWith("/") ? v : v + "/"] = {
req: 0,
success: 0,
failure: 0,
cache: {},
}),
);
config.onFetchRequest = (request: Request) => {
const record = rpcRecords[request.url];
if (record) record.req++;
else rpcRecords[request.url] = { req: 1, success: 0, failure: 0 };
onFetchRequest(request, rpcRecords);
};
config.onFetchResponse = (response: Response) => {
const record = rpcRecords[response.url];
if (response.status === 200) {
if (record) record.success++;
else
rpcRecords[response.url] = {
req: 1,
success: 1,
failure: 0,
};
} else {
if (record) record.failure++;
else
rpcRecords[response.url] = {
req: 1,
success: 0,
failure: 1,
};
}
onFetchResponse(response.clone(), rpcRecords);
};

const lps = processLps(options.lps);
const viemClient = await createViemClient(
chainId,
Expand Down
99 changes: 94 additions & 5 deletions src/types.ts
Original file line number Diff line number Diff line change
Expand Up @@ -5,13 +5,13 @@ import { DataFetcher, LiquidityProviders } from "sushi/router";
import { ProcessPairHaltReason, ProcessPairReportStatus } from "./processOrders";
import {
Chain,
FallbackTransport,
HDAccount,
PublicActions,
PublicClient,
TestClient,
WalletActions,
WalletClient,
PublicClient,
PublicActions,
WalletActions,
FallbackTransport,
} from "viem";

export type BotError = {
Expand Down Expand Up @@ -223,4 +223,93 @@ export type SgFilter = {
orderbook?: string;
};

export type RpcRecord = { req: number; success: number; failure: number };
export type RpcRecord = {
req: number;
success: number;
failure: number;
cache: Record<number, any>;
};

export type RpcRequest = {
jsonrpc: `${number}`;
method: string;
params?: any | undefined;
id: number;
};

export type RpcResponse<result = any, error = any> = {
jsonrpc: `${number}`;
id: number;
} & (RpcSuccessResult<result> | RpcErrorResult<error>);

export type RpcSuccessResult<result> = {
method?: undefined;
result: result;
error?: undefined;
};

export type RpcErrorResult<error> = {
method?: undefined;
result?: undefined;
error: error;
};

export const RpcErrorCode = [
-32700, // Parse error
-32600, // Invalid request
-32601, // Method not found
-32602, // Invalid params
-32603, // Internal error
-32000, // Invalid input
-32001, // Resource not found
-32002, // Resource unavailable
-32003, // Transaction rejected
-32004, // Method not supported
-32005, // Limit exceeded
-32006, // JSON-RPC version not supported
-32042, // Method not found
] as const;

export const ProviderRpcErrorCode = [
4001, // User Rejected Request
4100, // Unauthorized
4200, // Unsupported Method
4900, // Disconnected
4901, // Chain Disconnected
4902, // Chain Not Recognized
] as const;

export function isOkRpcError(v: any): boolean {
if ("error" in v && "code" in v.error) {
const code = v.error.code;
if (typeof code === "number") {
return [...RpcErrorCode, ...ProviderRpcErrorCode].includes(code as any);
} else if (typeof code === "string" && /^-?[0-9]+$/.test(code)) {
return [...RpcErrorCode, ...ProviderRpcErrorCode].includes(Number(code) as any);
} else return false;
} else return false;
}

export function isRpcRequest(v: any): v is RpcRequest {
if (
typeof v === "object" &&
v !== null &&
"jsonrpc" in v &&
"id" in v &&
typeof v.id === "number"
)
return true;
else return false;
}

export function isRpcResponse(v: any): v is RpcResponse {
if (
typeof v === "object" &&
v !== null &&
"jsonrpc" in v &&
"id" in v &&
typeof v.id === "number"
)
return true;
else return false;
}
5 changes: 4 additions & 1 deletion test/cli.test.js
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ const { trace, context } = require("@opentelemetry/api");
const { Resource } = require("@opentelemetry/resources");
const { BasicTracerProvider } = require("@opentelemetry/sdk-trace-base");
const { SEMRESATTRS_SERVICE_NAME } = require("@opentelemetry/semantic-conventions");
const { sleep } = require("../src/utils");

describe("Test cli", async function () {
beforeEach(() => mockServer.start(8080));
Expand Down Expand Up @@ -193,17 +194,19 @@ describe("Test cli", async function () {
arbAddress: `0x${"1".repeat(40)}`,
route: "single",
rpcRecords: {
"https://rpc.ankr.com/polygon": {
"https://rpc.ankr.com/polygon/": {
req: 1,
success: 1,
failure: 0,
cache: {},
},
},
},
options: {
botMinBalance: "0.123",
},
};
await sleep(1000);
assert.equal(result.roundGap, expected.roundGap);
assert.equal(result.poolUpdateInterval, expected.poolUpdateInterval);
assert.equal(result.config.chain.id, expected.config.chain.id);
Expand Down

0 comments on commit 230a276

Please sign in to comment.