Skip to content

Commit

Permalink
fix: Missing low wallet balance webhook (#737)
Browse files Browse the repository at this point in the history
* fix: Missing low wallet balance webhook

* undo file

* fix build

* fix throttle logic
  • Loading branch information
arcoraven authored Oct 30, 2024
1 parent 63a3db2 commit 8c12183
Show file tree
Hide file tree
Showing 5 changed files with 119 additions and 80 deletions.
4 changes: 2 additions & 2 deletions src/schema/webhooks.ts
Original file line number Diff line number Diff line change
Expand Up @@ -10,10 +10,10 @@ export enum WebhooksEventTypes {
CONTRACT_SUBSCRIPTION = "contract_subscription",
}

export interface WalletBalanceWebhookSchema {
export type BackendWalletBalanceWebhookParams = {
walletAddress: string;
minimumBalance: string;
currentBalance: string;
chainId: number;
message: string;
}
};
79 changes: 8 additions & 71 deletions src/utils/webhook.ts
Original file line number Diff line number Diff line change
@@ -1,16 +1,9 @@
import { Webhooks } from "@prisma/client";
import crypto from "crypto";
import {
WalletBalanceWebhookSchema,
WebhooksEventTypes,
} from "../schema/webhooks";
import { getWebhooksByEventType } from "./cache/getWebhook";
import { logger } from "./logger";

let balanceNotificationLastSentAt = -1;
import type { Webhooks } from "@prisma/client";
import crypto from "node:crypto";
import { prettifyError } from "./error";

export const generateSignature = (
body: Record<string, any>,
body: Record<string, unknown>,
timestamp: string,
secret: string,
): string => {
Expand All @@ -21,7 +14,7 @@ export const generateSignature = (

export const createWebhookRequestHeaders = async (
webhook: Webhooks,
body: Record<string, any>,
body: Record<string, unknown>,
): Promise<HeadersInit> => {
const headers: {
Accept: string;
Expand Down Expand Up @@ -54,7 +47,7 @@ export interface WebhookResponse {

export const sendWebhookRequest = async (
webhook: Webhooks,
body: Record<string, any>,
body: Record<string, unknown>,
): Promise<WebhookResponse> => {
try {
const headers = await createWebhookRequestHeaders(webhook, body);
Expand All @@ -69,67 +62,11 @@ export const sendWebhookRequest = async (
status: resp.status,
body: await resp.text(),
};
} catch (e: any) {
} catch (e) {
return {
ok: false,
status: 500,
body: e.toString(),
body: prettifyError(e),
};
}
};

// TODO: Add retry logic upto
export const sendBalanceWebhook = async (
data: WalletBalanceWebhookSchema,
): Promise<void> => {
try {
const elaspsedTime = Date.now() - balanceNotificationLastSentAt;
if (elaspsedTime < 30000) {
logger({
service: "server",
level: "warn",
message: `[sendBalanceWebhook] Low wallet balance notification sent within last 30 Seconds. Skipping.`,
});
return;
}

const webhooks = await getWebhooksByEventType(
WebhooksEventTypes.BACKEND_WALLET_BALANCE,
);

if (webhooks.length === 0) {
logger({
service: "server",
level: "debug",
message: "No webhook set, skipping webhook send",
});

return;
}

webhooks.map(async (config) => {
if (!config || config.revokedAt) {
logger({
service: "server",
level: "debug",
message: "No webhook set or active, skipping webhook send",
});

return;
}

const success = await sendWebhookRequest(config, data);

if (success) {
balanceNotificationLastSentAt = Date.now();
}
});
} catch (error) {
logger({
service: "server",
level: "error",
message: `Failed to send balance webhook`,
error,
});
}
};
36 changes: 32 additions & 4 deletions src/worker/queues/sendWebhookQueue.ts
Original file line number Diff line number Diff line change
@@ -1,11 +1,14 @@
import {
import type {
ContractEventLogs,
ContractTransactionReceipts,
Webhooks,
} from "@prisma/client";
import { Queue } from "bullmq";
import SuperJSON from "superjson";
import { WebhooksEventTypes } from "../../schema/webhooks";
import {
WebhooksEventTypes,
type BackendWalletBalanceWebhookParams,
} from "../../schema/webhooks";
import { getWebhooksByEventType } from "../../utils/cache/getWebhook";
import { logger } from "../../utils/logger";
import { redis } from "../../utils/redis/redis";
Expand All @@ -27,10 +30,16 @@ export type EnqueueTransactionWebhookData = {
queueId: string;
};

export type EnqueueLowBalanceWebhookData = {
type: WebhooksEventTypes.BACKEND_WALLET_BALANCE;
body: BackendWalletBalanceWebhookParams;
};

// Add other webhook event types here.
type EnqueueWebhookData =
| EnqueueContractSubscriptionWebhookData
| EnqueueTransactionWebhookData;
| EnqueueTransactionWebhookData
| EnqueueLowBalanceWebhookData;

export interface WebhookJob {
data: EnqueueWebhookData;
Expand All @@ -56,6 +65,8 @@ export class SendWebhookQueue {
case WebhooksEventTypes.ERRORED_TX:
case WebhooksEventTypes.CANCELLED_TX:
return this._enqueueTransactionWebhook(data);
case WebhooksEventTypes.BACKEND_WALLET_BALANCE:
return this._enqueueBackendWalletBalanceWebhook(data);
default:
logger({
service: "worker",
Expand Down Expand Up @@ -105,7 +116,8 @@ export class SendWebhookQueue {

if (eventLog) {
return `${webhook.url}.${eventLog.transactionHash}.${eventLog.logIndex}`;
} else if (transactionReceipt) {
}
if (transactionReceipt) {
return `${webhook.url}.${transactionReceipt.transactionHash}`;
}
throw 'Must provide "eventLog" or "transactionReceipt".';
Expand Down Expand Up @@ -140,4 +152,20 @@ export class SendWebhookQueue {
eventType: WebhooksEventTypes;
queueId: string;
}) => `${args.webhook.url}.${args.eventType}.${args.queueId}`;

private static _enqueueBackendWalletBalanceWebhook = async (
data: EnqueueLowBalanceWebhookData,
) => {
const webhooks = await getWebhooksByEventType(
WebhooksEventTypes.BACKEND_WALLET_BALANCE,
);
for (const webhook of webhooks) {
const job: WebhookJob = { data, webhook };
const serialized = SuperJSON.stringify(job);
await this.q.add(
`${data.type}:${data.body.chainId}:${data.body.walletAddress}`,
serialized,
);
}
};
}
66 changes: 66 additions & 0 deletions src/worker/tasks/mineTransactionWorker.ts
Original file line number Diff line number Diff line change
Expand Up @@ -2,21 +2,26 @@ import { Worker, type Job, type Processor } from "bullmq";
import assert from "node:assert";
import superjson from "superjson";
import {
eth_getBalance,
eth_getTransactionByHash,
eth_getTransactionReceipt,
getAddress,
getRpcClient,
toTokens,
type Address,
} from "thirdweb";
import { stringify } from "thirdweb/utils";
import { getUserOpReceipt, getUserOpReceiptRaw } from "thirdweb/wallets/smart";
import { TransactionDB } from "../../db/transactions/db";
import { recycleNonce, removeSentNonce } from "../../db/wallets/walletNonce";
import { WebhooksEventTypes } from "../../schema/webhooks";
import { getBlockNumberish } from "../../utils/block";
import { getConfig } from "../../utils/cache/getConfig";
import { getWebhooksByEventType } from "../../utils/cache/getWebhook";
import { getChain } from "../../utils/chain";
import { msSince } from "../../utils/date";
import { env } from "../../utils/env";
import { prettifyError } from "../../utils/error";
import { logger } from "../../utils/logger";
import { recordMetrics } from "../../utils/prometheus";
import { redis } from "../../utils/redis/redis";
Expand All @@ -33,6 +38,7 @@ import {
type MineTransactionData,
} from "../queues/mineTransactionQueue";
import { SendTransactionQueue } from "../queues/sendTransactionQueue";
import { SendWebhookQueue } from "../queues/sendWebhookQueue";

/**
* Check if the submitted transaction or userOp is mined onchain.
Expand Down Expand Up @@ -66,6 +72,7 @@ const handler: Processor<any, void, string> = async (job: Job<string>) => {
if (resultTransaction.status === "mined") {
await TransactionDB.set(resultTransaction);
await enqueueTransactionWebhook(resultTransaction);
await _notifyIfLowBalance(resultTransaction);
await _reportUsageSuccess(resultTransaction);
recordMetrics({
event: "transaction_mined",
Expand Down Expand Up @@ -257,6 +264,65 @@ const _mineUserOp = async (
};
};

const _notifyIfLowBalance = async (transaction: MinedTransaction) => {
const { isUserOp, chainId, from } = transaction;
if (isUserOp) {
// Skip for userOps since they may not use the wallet's gas balance.
return;
}

try {
const webhooks = await getWebhooksByEventType(
WebhooksEventTypes.BACKEND_WALLET_BALANCE,
);
if (webhooks.length === 0) {
// Skip if no webhooks configured.
return;
}

// Set a key with 5min TTL if it doesn't exist.
// This effectively throttles this check once every 5min.
const throttleKey = `webhook:${WebhooksEventTypes.BACKEND_WALLET_BALANCE}:${chainId}:${from}`;
const isThrottled =
(await redis.set(throttleKey, "", "EX", 5 * 60, "NX")) === null;
if (isThrottled) {
return;
}

// Get the current wallet balance.
const rpcRequest = getRpcClient({
client: thirdwebClient,
chain: await getChain(chainId),
});
const currentBalance = await eth_getBalance(rpcRequest, {
address: from,
});

const config = await getConfig();
if (currentBalance >= BigInt(config.minWalletBalance)) {
// Skip if the balance is above the alert threshold.
return;
}

await SendWebhookQueue.enqueueWebhook({
type: WebhooksEventTypes.BACKEND_WALLET_BALANCE,
body: {
chainId,
walletAddress: from,
minimumBalance: config.minWalletBalance,
currentBalance: currentBalance.toString(),
message: `LowBalance: The backend wallet ${from} on chain ${chainId} has ${toTokens(currentBalance, 18)} gas remaining.`,
},
});
} catch (e) {
logger({
level: "warn",
message: `[mineTransactionWorker] Error sending low balance notification: ${prettifyError(e)}`,
service: "worker",
});
}
};

// Must be explicitly called for the worker to run on this host.
export const initMineTransactionWorker = () => {
const _worker = new Worker(MineTransactionQueue.q.name, handler, {
Expand Down
14 changes: 11 additions & 3 deletions src/worker/tasks/sendWebhookWorker.ts
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,10 @@ import type { Static } from "@sinclair/typebox";
import { Worker, type Job, type Processor } from "bullmq";
import superjson from "superjson";
import { TransactionDB } from "../../db/transactions/db";
import { WebhooksEventTypes } from "../../schema/webhooks";
import {
WebhooksEventTypes,
type BackendWalletBalanceWebhookParams,
} from "../../schema/webhooks";
import { toEventLogSchema } from "../../server/schemas/eventLog";
import {
toTransactionSchema,
Expand Down Expand Up @@ -57,10 +60,15 @@ const handler: Processor<any, void, string> = async (job: Job<string>) => {
resp = await sendWebhookRequest(webhook, webhookBody);
break;
}

case WebhooksEventTypes.BACKEND_WALLET_BALANCE: {
const webhookBody: BackendWalletBalanceWebhookParams = data.body;
resp = await sendWebhookRequest(webhook, webhookBody);
break;
}
}

const shouldRetry = resp && resp.status >= 500 && resp.status <= 599;
if (shouldRetry) {
if (resp && resp.status >= 500) {
// Throw on 5xx so it remains in the queue to retry later.
throw new Error(
`Received status ${resp.status} from webhook ${webhook.url}.`,
Expand Down

0 comments on commit 8c12183

Please sign in to comment.