From 503f37e26d5e9988c7164bd4ee64f77ed5c260fc Mon Sep 17 00:00:00 2001 From: Prithvish Baidya Date: Tue, 18 Feb 2025 09:56:09 +0530 Subject: [PATCH] feat: Add Balance Subscriptions feature (#849) * feat: Add Balance Subscriptions feature * remove bad logs * feat: Enhance Balance Subscriptions webhook handling - Add support for creating new webhooks with optional labels - Allow using existing webhooks by ID - Validate webhook event type and revocation status - Improve error handling for webhook creation and selection * refactor: Simplify balance retrieval using thirdweb SDK - Replace manual balance fetching with `getWalletBalance` method - Support both native token and ERC20 token balance retrieval - Remove redundant contract and RPC client initialization code * generate SDK * refactor: Rename contractAddress to tokenAddress in Balance Subscriptions - Update Prisma schema, migration, and database indexes - Modify TypeScript interfaces and schemas across services and routes - Ensure consistent naming for token-related address fields - Update worker and database interaction methods * change to wallet subscriptions pattern * addressed review comments --- sdk/src/Engine.ts | 3 + sdk/src/index.ts | 1 + sdk/src/services/WalletCredentialsService.ts | 53 ++- .../services/WalletSubscriptionsService.ts | 323 ++++++++++++++++++ sdk/src/services/WebhooksService.ts | 4 +- .../migration.sql | 25 ++ src/prisma/schema.prisma | 22 ++ src/scripts/generate-sdk.ts | 4 +- src/server/middleware/admin-routes.ts | 2 + .../configuration/wallet-subscriptions/get.ts | 42 +++ .../wallet-subscriptions/update.ts | 64 ++++ src/server/routes/index.ts | 10 + src/server/routes/system/health.ts | 5 +- src/server/routes/wallet-subscriptions/add.ts | 122 +++++++ .../routes/wallet-subscriptions/delete.ts | 50 +++ .../routes/wallet-subscriptions/get-all.ts | 47 +++ .../routes/wallet-subscriptions/update.ts | 81 +++++ src/server/schemas/wallet-subscription.ts | 48 +++ .../create-wallet-subscription.ts | 55 +++ .../delete-wallet-subscription.ts | 16 + .../get-all-wallet-subscriptions.ts | 27 ++ .../update-wallet-subscription.ts | 53 +++ .../schemas/wallet-subscription-conditions.ts | 53 +++ src/shared/schemas/webhooks.ts | 10 + src/worker/index.ts | 3 +- src/worker/queues/send-webhook-queue.ts | 26 +- .../queues/wallet-subscription-queue.ts | 14 + src/worker/tasks/send-webhook-worker.ts | 15 +- .../tasks/wallet-subscription-worker.ts | 155 +++++++++ .../wallet-subscription-worker.test.ts | 222 ++++++++++++ 30 files changed, 1545 insertions(+), 10 deletions(-) create mode 100644 sdk/src/services/WalletSubscriptionsService.ts create mode 100644 src/prisma/migrations/20250212235511_wallet_subscriptions/migration.sql create mode 100644 src/server/routes/configuration/wallet-subscriptions/get.ts create mode 100644 src/server/routes/configuration/wallet-subscriptions/update.ts create mode 100644 src/server/routes/wallet-subscriptions/add.ts create mode 100644 src/server/routes/wallet-subscriptions/delete.ts create mode 100644 src/server/routes/wallet-subscriptions/get-all.ts create mode 100644 src/server/routes/wallet-subscriptions/update.ts create mode 100644 src/server/schemas/wallet-subscription.ts create mode 100644 src/shared/db/wallet-subscriptions/create-wallet-subscription.ts create mode 100644 src/shared/db/wallet-subscriptions/delete-wallet-subscription.ts create mode 100644 src/shared/db/wallet-subscriptions/get-all-wallet-subscriptions.ts create mode 100644 src/shared/db/wallet-subscriptions/update-wallet-subscription.ts create mode 100644 src/shared/schemas/wallet-subscription-conditions.ts create mode 100644 src/worker/queues/wallet-subscription-queue.ts create mode 100644 src/worker/tasks/wallet-subscription-worker.ts create mode 100644 tests/e2e/tests/workers/wallet-subscription-worker.test.ts diff --git a/sdk/src/Engine.ts b/sdk/src/Engine.ts index 72dd69d95..dba4af76a 100644 --- a/sdk/src/Engine.ts +++ b/sdk/src/Engine.ts @@ -31,6 +31,7 @@ import { PermissionsService } from './services/PermissionsService'; import { RelayerService } from './services/RelayerService'; import { TransactionService } from './services/TransactionService'; import { WalletCredentialsService } from './services/WalletCredentialsService'; +import { WalletSubscriptionsService } from './services/WalletSubscriptionsService'; import { WebhooksService } from './services/WebhooksService'; type HttpRequestConstructor = new (config: OpenAPIConfig) => BaseHttpRequest; @@ -62,6 +63,7 @@ class EngineLogic { public readonly relayer: RelayerService; public readonly transaction: TransactionService; public readonly walletCredentials: WalletCredentialsService; + public readonly walletSubscriptions: WalletSubscriptionsService; public readonly webhooks: WebhooksService; public readonly request: BaseHttpRequest; @@ -104,6 +106,7 @@ class EngineLogic { this.relayer = new RelayerService(this.request); this.transaction = new TransactionService(this.request); this.walletCredentials = new WalletCredentialsService(this.request); + this.walletSubscriptions = new WalletSubscriptionsService(this.request); this.webhooks = new WebhooksService(this.request); } } diff --git a/sdk/src/index.ts b/sdk/src/index.ts index 9a6e7cda8..992231be9 100644 --- a/sdk/src/index.ts +++ b/sdk/src/index.ts @@ -35,4 +35,5 @@ export { PermissionsService } from './services/PermissionsService'; export { RelayerService } from './services/RelayerService'; export { TransactionService } from './services/TransactionService'; export { WalletCredentialsService } from './services/WalletCredentialsService'; +export { WalletSubscriptionsService } from './services/WalletSubscriptionsService'; export { WebhooksService } from './services/WebhooksService'; diff --git a/sdk/src/services/WalletCredentialsService.ts b/sdk/src/services/WalletCredentialsService.ts index 1ac32e4ab..bb973fc73 100644 --- a/sdk/src/services/WalletCredentialsService.ts +++ b/sdk/src/services/WalletCredentialsService.ts @@ -21,9 +21,9 @@ export class WalletCredentialsService { label: string; type: 'circle'; /** - * 32-byte hex string. If not provided, a random one will be generated. + * 32-byte hex string. Consult https://developers.circle.com/w3s/entity-secret-management to create and register an entity secret. */ - entitySecret?: string; + entitySecret: string; /** * Whether this credential should be set as the default for its type. Only one credential can be default per type. */ @@ -102,7 +102,7 @@ export class WalletCredentialsService { id: string; type: string; label: (string | null); - isDefault: boolean; + isDefault: (boolean | null); createdAt: string; updatedAt: string; deletedAt: (string | null); @@ -122,4 +122,51 @@ export class WalletCredentialsService { }); } + /** + * Update wallet credential + * Update a wallet credential's label, default status, and entity secret. + * @param id The ID of the wallet credential to update. + * @param requestBody + * @returns any Default Response + * @throws ApiError + */ + public updateWalletCredential( + id: string, + requestBody?: { + label?: string; + /** + * Whether this credential should be set as the default for its type. Only one credential can be default per type. + */ + isDefault?: boolean; + /** + * 32-byte hex string. Consult https://developers.circle.com/w3s/entity-secret-management to create and register an entity secret. + */ + entitySecret?: string; + }, + ): CancelablePromise<{ + result: { + id: string; + type: string; + label: (string | null); + isDefault: (boolean | null); + createdAt: string; + updatedAt: string; + }; + }> { + return this.httpRequest.request({ + method: 'PUT', + url: '/wallet-credentials/{id}', + path: { + 'id': id, + }, + body: requestBody, + mediaType: 'application/json', + errors: { + 400: `Bad Request`, + 404: `Not Found`, + 500: `Internal Server Error`, + }, + }); + } + } diff --git a/sdk/src/services/WalletSubscriptionsService.ts b/sdk/src/services/WalletSubscriptionsService.ts new file mode 100644 index 000000000..b699eb887 --- /dev/null +++ b/sdk/src/services/WalletSubscriptionsService.ts @@ -0,0 +1,323 @@ +/* generated using openapi-typescript-codegen -- do no edit */ +/* istanbul ignore file */ +/* tslint:disable */ +/* eslint-disable */ +import type { CancelablePromise } from '../core/CancelablePromise'; +import type { BaseHttpRequest } from '../core/BaseHttpRequest'; + +export class WalletSubscriptionsService { + + constructor(public readonly httpRequest: BaseHttpRequest) {} + + /** + * Get wallet subscriptions + * Get all wallet subscriptions. + * @param page Specify the page number. + * @param limit Specify the number of results to return per page. + * @returns any Default Response + * @throws ApiError + */ + public getAllWalletSubscriptions( + page: number = 1, + limit: number = 100, + ): CancelablePromise<{ + result: Array<{ + id: string; + /** + * The chain ID of the subscription. + */ + chainId: string; + /** + * A contract or wallet address + */ + walletAddress: string; + /** + * Array of conditions to monitor for this wallet + */ + conditions: Array<({ + type: 'token_balance_lt'; + tokenAddress: (string | 'native'); + /** + * The threshold value in wei + */ + value: string; + } | { + type: 'token_balance_gt'; + tokenAddress: (string | 'native'); + /** + * The threshold value in wei + */ + value: string; + })>; + webhook?: { + url: string; + }; + createdAt: string; + updatedAt: string; + }>; + }> { + return this.httpRequest.request({ + method: 'GET', + url: '/wallet-subscriptions/get-all', + path: { + 'page': page, + 'limit': limit, + }, + errors: { + 400: `Bad Request`, + 404: `Not Found`, + 500: `Internal Server Error`, + }, + }); + } + + /** + * Add wallet subscription + * Subscribe to wallet conditions. + * @param requestBody + * @returns any Default Response + * @throws ApiError + */ + public addWalletSubscription( + requestBody?: ({ + /** + * A chain ID ("137") or slug ("polygon-amoy-testnet"). Chain ID is preferred. + */ + chain: string; + /** + * A contract or wallet address + */ + walletAddress: string; + /** + * Array of conditions to monitor for this wallet + */ + conditions: Array<({ + type: 'token_balance_lt'; + tokenAddress: (string | 'native'); + /** + * The threshold value in wei + */ + value: string; + } | { + type: 'token_balance_gt'; + tokenAddress: (string | 'native'); + /** + * The threshold value in wei + */ + value: string; + })>; + } & ({ + /** + * Webhook URL to create a new webhook + */ + webhookUrl: string; + /** + * Optional label for the webhook when creating a new one + */ + webhookLabel?: string; + } | { + /** + * ID of an existing webhook to use + */ + webhookId: number; + })), + ): CancelablePromise<{ + result: { + id: string; + /** + * The chain ID of the subscription. + */ + chainId: string; + /** + * A contract or wallet address + */ + walletAddress: string; + /** + * Array of conditions to monitor for this wallet + */ + conditions: Array<({ + type: 'token_balance_lt'; + tokenAddress: (string | 'native'); + /** + * The threshold value in wei + */ + value: string; + } | { + type: 'token_balance_gt'; + tokenAddress: (string | 'native'); + /** + * The threshold value in wei + */ + value: string; + })>; + webhook?: { + url: string; + }; + createdAt: string; + updatedAt: string; + }; + }> { + return this.httpRequest.request({ + method: 'POST', + url: '/wallet-subscriptions', + body: requestBody, + mediaType: 'application/json', + errors: { + 400: `Bad Request`, + 404: `Not Found`, + 500: `Internal Server Error`, + }, + }); + } + + /** + * Update wallet subscription + * Update an existing wallet subscription. + * @param subscriptionId The ID of the wallet subscription to update. + * @param requestBody + * @returns any Default Response + * @throws ApiError + */ + public updateWalletSubscription( + subscriptionId: string, + requestBody?: { + /** + * A chain ID ("137") or slug ("polygon-amoy-testnet"). Chain ID is preferred. + */ + chain?: string; + /** + * A contract or wallet address + */ + walletAddress?: string; + /** + * Array of conditions to monitor for this wallet + */ + conditions?: Array<({ + type: 'token_balance_lt'; + tokenAddress: (string | 'native'); + /** + * The threshold value in wei + */ + value: string; + } | { + type: 'token_balance_gt'; + tokenAddress: (string | 'native'); + /** + * The threshold value in wei + */ + value: string; + })>; + webhookId?: (number | null); + }, + ): CancelablePromise<{ + result: { + id: string; + /** + * The chain ID of the subscription. + */ + chainId: string; + /** + * A contract or wallet address + */ + walletAddress: string; + /** + * Array of conditions to monitor for this wallet + */ + conditions: Array<({ + type: 'token_balance_lt'; + tokenAddress: (string | 'native'); + /** + * The threshold value in wei + */ + value: string; + } | { + type: 'token_balance_gt'; + tokenAddress: (string | 'native'); + /** + * The threshold value in wei + */ + value: string; + })>; + webhook?: { + url: string; + }; + createdAt: string; + updatedAt: string; + }; + }> { + return this.httpRequest.request({ + method: 'POST', + url: '/wallet-subscriptions/{subscriptionId}', + path: { + 'subscriptionId': subscriptionId, + }, + body: requestBody, + mediaType: 'application/json', + errors: { + 400: `Bad Request`, + 404: `Not Found`, + 500: `Internal Server Error`, + }, + }); + } + + /** + * Delete wallet subscription + * Delete an existing wallet subscription. + * @param subscriptionId The ID of the wallet subscription to update. + * @returns any Default Response + * @throws ApiError + */ + public deleteWalletSubscription( + subscriptionId: string, + ): CancelablePromise<{ + result: { + id: string; + /** + * The chain ID of the subscription. + */ + chainId: string; + /** + * A contract or wallet address + */ + walletAddress: string; + /** + * Array of conditions to monitor for this wallet + */ + conditions: Array<({ + type: 'token_balance_lt'; + tokenAddress: (string | 'native'); + /** + * The threshold value in wei + */ + value: string; + } | { + type: 'token_balance_gt'; + tokenAddress: (string | 'native'); + /** + * The threshold value in wei + */ + value: string; + })>; + webhook?: { + url: string; + }; + createdAt: string; + updatedAt: string; + }; + }> { + return this.httpRequest.request({ + method: 'DELETE', + url: '/wallet-subscriptions/{subscriptionId}', + path: { + 'subscriptionId': subscriptionId, + }, + errors: { + 400: `Bad Request`, + 404: `Not Found`, + 500: `Internal Server Error`, + }, + }); + } + +} diff --git a/sdk/src/services/WebhooksService.ts b/sdk/src/services/WebhooksService.ts index 5e6a0b90a..d06835dbd 100644 --- a/sdk/src/services/WebhooksService.ts +++ b/sdk/src/services/WebhooksService.ts @@ -51,7 +51,7 @@ export class WebhooksService { */ url: string; name?: string; - eventType: ('queued_transaction' | 'sent_transaction' | 'mined_transaction' | 'errored_transaction' | 'cancelled_transaction' | 'all_transactions' | 'backend_wallet_balance' | 'auth' | 'contract_subscription'); + eventType: ('queued_transaction' | 'sent_transaction' | 'mined_transaction' | 'errored_transaction' | 'cancelled_transaction' | 'all_transactions' | 'backend_wallet_balance' | 'auth' | 'contract_subscription' | 'wallet_subscription'); }, ): CancelablePromise<{ result: { @@ -113,7 +113,7 @@ export class WebhooksService { * @throws ApiError */ public getEventTypes(): CancelablePromise<{ - result: Array<('queued_transaction' | 'sent_transaction' | 'mined_transaction' | 'errored_transaction' | 'cancelled_transaction' | 'all_transactions' | 'backend_wallet_balance' | 'auth' | 'contract_subscription')>; + result: Array<('queued_transaction' | 'sent_transaction' | 'mined_transaction' | 'errored_transaction' | 'cancelled_transaction' | 'all_transactions' | 'backend_wallet_balance' | 'auth' | 'contract_subscription' | 'wallet_subscription')>; }> { return this.httpRequest.request({ method: 'GET', diff --git a/src/prisma/migrations/20250212235511_wallet_subscriptions/migration.sql b/src/prisma/migrations/20250212235511_wallet_subscriptions/migration.sql new file mode 100644 index 000000000..92fdb940e --- /dev/null +++ b/src/prisma/migrations/20250212235511_wallet_subscriptions/migration.sql @@ -0,0 +1,25 @@ +-- AlterTable +ALTER TABLE "configuration" ADD COLUMN "walletSubscriptionsCronSchedule" TEXT; + +-- CreateTable +CREATE TABLE "wallet_subscriptions" ( + "id" TEXT NOT NULL, + "chainId" TEXT NOT NULL, + "walletAddress" TEXT NOT NULL, + "conditions" JSONB[], + "webhookId" INTEGER, + "createdAt" TIMESTAMP(3) NOT NULL DEFAULT CURRENT_TIMESTAMP, + "updatedAt" TIMESTAMP(3) NOT NULL, + "deletedAt" TIMESTAMP(3), + + CONSTRAINT "wallet_subscriptions_pkey" PRIMARY KEY ("id") +); + +-- CreateIndex +CREATE INDEX "wallet_subscriptions_chainId_idx" ON "wallet_subscriptions"("chainId"); + +-- CreateIndex +CREATE INDEX "wallet_subscriptions_walletAddress_idx" ON "wallet_subscriptions"("walletAddress"); + +-- AddForeignKey +ALTER TABLE "wallet_subscriptions" ADD CONSTRAINT "wallet_subscriptions_webhookId_fkey" FOREIGN KEY ("webhookId") REFERENCES "webhooks"("id") ON DELETE SET NULL ON UPDATE CASCADE; diff --git a/src/prisma/schema.prisma b/src/prisma/schema.prisma index 1c4a473c4..602edf6a4 100644 --- a/src/prisma/schema.prisma +++ b/src/prisma/schema.prisma @@ -29,6 +29,8 @@ model Configuration { cursorDelaySeconds Int @default(2) @map("cursorDelaySeconds") contractSubscriptionsRetryDelaySeconds String @default("10") @map("contractSubscriptionsRetryDelaySeconds") + walletSubscriptionsCronSchedule String? @map("walletSubscriptionsCronSchedule") + // Wallet provider specific configurations, non-credential walletProviderConfigs Json @default("{}") @map("walletProviderConfigs") /// Eg: { "aws": { "defaultAwsRegion": "us-east-1" }, "gcp": { "defaultGcpKmsLocationId": "us-east1-b" } } @@ -221,6 +223,7 @@ model Webhooks { updatedAt DateTime @updatedAt @map("updatedAt") revokedAt DateTime? @map("revokedAt") ContractSubscriptions ContractSubscriptions[] + WalletSubscriptions WalletSubscriptions[] @@map("webhooks") } @@ -286,6 +289,25 @@ model ContractEventLogs { @@map("contract_event_logs") } +model WalletSubscriptions { + id String @id @default(uuid()) + chainId String + walletAddress String + + conditions Json[] // Array of condition objects with discriminated union type + + webhookId Int? + webhook Webhooks? @relation(fields: [webhookId], references: [id], onDelete: SetNull) + + createdAt DateTime @default(now()) + updatedAt DateTime @updatedAt + deletedAt DateTime? + + @@index([chainId]) + @@index([walletAddress]) + @@map("wallet_subscriptions") +} + model ContractTransactionReceipts { chainId String blockNumber Int diff --git a/src/scripts/generate-sdk.ts b/src/scripts/generate-sdk.ts index 759dd8f00..13c3ca5fb 100644 --- a/src/scripts/generate-sdk.ts +++ b/src/scripts/generate-sdk.ts @@ -142,7 +142,9 @@ export class Engine extends EngineLogic { const ercServices: string[] = ["erc20", "erc721", "erc1155"]; for (const tag of ercServices) { - const fileName = `${tag.charAt(0).toUpperCase() + tag.slice(1)}Service.ts`; + const fileName = `${ + tag.charAt(0).toUpperCase() + tag.slice(1) + }Service.ts`; const filePath = path.join(servicesDir, fileName); const originalCode = fs.readFileSync(filePath, "utf-8"); diff --git a/src/server/middleware/admin-routes.ts b/src/server/middleware/admin-routes.ts index f503a5b9b..4fd700ab2 100644 --- a/src/server/middleware/admin-routes.ts +++ b/src/server/middleware/admin-routes.ts @@ -15,6 +15,7 @@ import { ProcessTransactionReceiptsQueue } from "../../worker/queues/process-tra import { PruneTransactionsQueue } from "../../worker/queues/prune-transactions-queue"; import { SendTransactionQueue } from "../../worker/queues/send-transaction-queue"; import { SendWebhookQueue } from "../../worker/queues/send-webhook-queue"; +import { WalletSubscriptionQueue } from "../../worker/queues/wallet-subscription-queue"; export const ADMIN_QUEUES_BASEPATH = "/admin/queues"; const ADMIN_ROUTES_USERNAME = "admin"; @@ -31,6 +32,7 @@ const QUEUES: Queue[] = [ PruneTransactionsQueue.q, NonceResyncQueue.q, NonceHealthCheckQueue.q, + WalletSubscriptionQueue.q, ]; export const withAdminRoutes = async (fastify: FastifyInstance) => { diff --git a/src/server/routes/configuration/wallet-subscriptions/get.ts b/src/server/routes/configuration/wallet-subscriptions/get.ts new file mode 100644 index 000000000..19d1932a5 --- /dev/null +++ b/src/server/routes/configuration/wallet-subscriptions/get.ts @@ -0,0 +1,42 @@ +import { type Static, Type } from "@sinclair/typebox"; +import type { FastifyInstance } from "fastify"; +import { StatusCodes } from "http-status-codes"; +import { getConfig } from "../../../../shared/utils/cache/get-config"; +import { standardResponseSchema } from "../../../schemas/shared-api-schemas"; + +const responseBodySchema = Type.Object({ + result: Type.Object({ + walletSubscriptionsCronSchedule: Type.String(), + }), +}); + +export async function getWalletSubscriptionsConfiguration( + fastify: FastifyInstance, +) { + fastify.route<{ + Reply: Static; + }>({ + method: "GET", + url: "/configuration/wallet-subscriptions", + schema: { + summary: "Get wallet subscriptions configuration", + description: + "Get wallet subscriptions configuration including cron schedule", + tags: ["Configuration"], + operationId: "getWalletSubscriptionsConfiguration", + response: { + ...standardResponseSchema, + [StatusCodes.OK]: responseBodySchema, + }, + }, + handler: async (_req, res) => { + const config = await getConfig(false); + res.status(StatusCodes.OK).send({ + result: { + walletSubscriptionsCronSchedule: + config.walletSubscriptionsCronSchedule || "*/30 * * * * *", + }, + }); + }, + }); +} diff --git a/src/server/routes/configuration/wallet-subscriptions/update.ts b/src/server/routes/configuration/wallet-subscriptions/update.ts new file mode 100644 index 000000000..94f358130 --- /dev/null +++ b/src/server/routes/configuration/wallet-subscriptions/update.ts @@ -0,0 +1,64 @@ +import { type Static, Type } from "@sinclair/typebox"; +import type { FastifyInstance } from "fastify"; +import { StatusCodes } from "http-status-codes"; +import { updateConfiguration } from "../../../../shared/db/configuration/update-configuration"; +import { getConfig } from "../../../../shared/utils/cache/get-config"; +import { isValidCron } from "../../../../shared/utils/cron/is-valid-cron"; +import { createCustomError } from "../../../middleware/error"; +import { standardResponseSchema } from "../../../schemas/shared-api-schemas"; + +const requestBodySchema = Type.Object({ + walletSubscriptionsCronSchedule: Type.String({ + description: + "Cron expression for wallet subscription checks. It should be in the format of 'ss mm hh * * *' where ss is seconds, mm is minutes and hh is hours. Seconds should not be '*' or less than 10", + default: "*/30 * * * * *", + }), +}); + +const responseBodySchema = Type.Object({ + result: Type.Object({ + walletSubscriptionsCronSchedule: Type.String(), + }), +}); + +export async function updateWalletSubscriptionsConfiguration( + fastify: FastifyInstance, +) { + fastify.route<{ + Body: Static; + }>({ + method: "POST", + url: "/configuration/wallet-subscriptions", + schema: { + summary: "Update wallet subscriptions configuration", + description: + "Update wallet subscriptions configuration including cron schedule", + tags: ["Configuration"], + operationId: "updateWalletSubscriptionsConfiguration", + body: requestBodySchema, + response: { + ...standardResponseSchema, + [StatusCodes.OK]: responseBodySchema, + }, + }, + handler: async (req, res) => { + const { walletSubscriptionsCronSchedule } = req.body; + if (isValidCron(walletSubscriptionsCronSchedule) === false) { + throw createCustomError( + "Invalid cron expression.", + StatusCodes.BAD_REQUEST, + "INVALID_CRON", + ); + } + + await updateConfiguration({ walletSubscriptionsCronSchedule }); + const config = await getConfig(false); + res.status(StatusCodes.OK).send({ + result: { + walletSubscriptionsCronSchedule: + config.walletSubscriptionsCronSchedule, + }, + }); + }, + }); +} diff --git a/src/server/routes/index.ts b/src/server/routes/index.ts index be174a78f..4a74394e1 100644 --- a/src/server/routes/index.ts +++ b/src/server/routes/index.ts @@ -117,6 +117,10 @@ import { createWalletCredentialRoute } from "./wallet-credentials/create"; import { getWalletCredentialRoute } from "./wallet-credentials/get"; import { getAllWalletCredentialsRoute } from "./wallet-credentials/get-all"; import { updateWalletCredentialRoute } from "./wallet-credentials/update"; +import { getAllWalletSubscriptionsRoute } from "./wallet-subscriptions/get-all"; +import { addWalletSubscriptionRoute } from "./wallet-subscriptions/add"; +import { updateWalletSubscriptionRoute } from "./wallet-subscriptions/update"; +import { deleteWalletSubscriptionRoute } from "./wallet-subscriptions/delete"; export async function withRoutes(fastify: FastifyInstance) { // Backend Wallets @@ -268,6 +272,12 @@ export async function withRoutes(fastify: FastifyInstance) { await fastify.register(getContractIndexedBlockRange); await fastify.register(getLatestBlock); + // Wallet Subscriptions + await fastify.register(getAllWalletSubscriptionsRoute); + await fastify.register(addWalletSubscriptionRoute); + await fastify.register(updateWalletSubscriptionRoute); + await fastify.register(deleteWalletSubscriptionRoute); + // Contract Transactions // @deprecated await fastify.register(getContractTransactionReceipts); diff --git a/src/server/routes/system/health.ts b/src/server/routes/system/health.ts index ed3718df1..3613c64de 100644 --- a/src/server/routes/system/health.ts +++ b/src/server/routes/system/health.ts @@ -12,7 +12,8 @@ type EngineFeature = | "IP_ALLOWLIST" | "HETEROGENEOUS_WALLET_TYPES" | "SMART_BACKEND_WALLETS" - | "WALLET_CREDENTIALS"; + | "WALLET_CREDENTIALS" + | "BALANCE_SUBSCRIPTIONS"; const ReplySchema = Type.Object({ db: Type.Boolean(), @@ -28,6 +29,7 @@ const ReplySchema = Type.Object({ Type.Literal("HETEROGENEOUS_WALLET_TYPES"), Type.Literal("SMART_BACKEND_WALLETS"), Type.Literal("WALLET_CREDENTIALS"), + Type.Literal("BALANCE_SUBSCRIPTIONS"), ]), ), clientId: Type.String(), @@ -80,6 +82,7 @@ const getFeatures = (): EngineFeature[] => { "CONTRACT_SUBSCRIPTIONS", "SMART_BACKEND_WALLETS", "WALLET_CREDENTIALS", + "BALANCE_SUBSCRIPTIONS", ]; if (env.ENABLE_KEYPAIR_AUTH) features.push("KEYPAIR_AUTH"); diff --git a/src/server/routes/wallet-subscriptions/add.ts b/src/server/routes/wallet-subscriptions/add.ts new file mode 100644 index 000000000..9239011cf --- /dev/null +++ b/src/server/routes/wallet-subscriptions/add.ts @@ -0,0 +1,122 @@ +import { type Static, Type } from "@sinclair/typebox"; +import type { FastifyInstance } from "fastify"; +import { StatusCodes } from "http-status-codes"; +import { createWalletSubscription } from "../../../shared/db/wallet-subscriptions/create-wallet-subscription"; +import { insertWebhook } from "../../../shared/db/webhooks/create-webhook"; +import { getWebhook } from "../../../shared/db/webhooks/get-webhook"; +import { WebhooksEventTypes } from "../../../shared/schemas/webhooks"; +import { createCustomError } from "../../middleware/error"; +import { AddressSchema } from "../../schemas/address"; +import { chainIdOrSlugSchema } from "../../schemas/chain"; +import { standardResponseSchema } from "../../schemas/shared-api-schemas"; +import { getChainIdFromChain } from "../../utils/chain"; +import { isValidWebhookUrl } from "../../utils/validator"; +import { + walletSubscriptionSchema, + toWalletSubscriptionSchema, +} from "../../schemas/wallet-subscription"; +import { WalletConditionsSchema } from "../../../shared/schemas/wallet-subscription-conditions"; + +const webhookUrlSchema = Type.Object({ + webhookUrl: Type.String({ + description: "Webhook URL to create a new webhook", + examples: ["https://example.com/webhook"], + }), + webhookLabel: Type.Optional( + Type.String({ + description: "Optional label for the webhook when creating a new one", + examples: ["My Wallet Subscription Webhook"], + minLength: 3, + }), + ), +}); + +const webhookIdSchema = Type.Object({ + webhookId: Type.Integer({ + description: "ID of an existing webhook to use", + }), +}); + +const requestBodySchema = Type.Intersect([ + Type.Object({ + chain: chainIdOrSlugSchema, + walletAddress: AddressSchema, + conditions: WalletConditionsSchema, + }), + Type.Optional(Type.Union([webhookUrlSchema, webhookIdSchema])), +]); + +const responseSchema = Type.Object({ + result: walletSubscriptionSchema, +}); + +export async function addWalletSubscriptionRoute(fastify: FastifyInstance) { + fastify.route<{ + Body: Static; + Reply: Static; + }>({ + method: "POST", + url: "/wallet-subscriptions", + schema: { + summary: "Add wallet subscription", + description: "Subscribe to wallet conditions.", + tags: ["Wallet-Subscriptions"], + operationId: "addWalletSubscription", + body: requestBodySchema, + response: { + ...standardResponseSchema, + [StatusCodes.OK]: responseSchema, + }, + }, + handler: async (request, reply) => { + const { chain, walletAddress, conditions } = request.body; + const chainId = await getChainIdFromChain(chain); + + let finalWebhookId: number | undefined; + + if ("webhookUrl" in request.body) { + const { webhookUrl, webhookLabel } = request.body; + + if (!isValidWebhookUrl(webhookUrl)) { + throw createCustomError( + "Invalid webhook URL. Make sure it starts with 'https://'.", + StatusCodes.BAD_REQUEST, + "BAD_REQUEST", + ); + } + + const webhook = await insertWebhook({ + url: webhookUrl, + name: webhookLabel, + eventType: WebhooksEventTypes.WALLET_SUBSCRIPTION, + }); + + finalWebhookId = webhook.id; + } else { + const { webhookId } = request.body; + const webhook = await getWebhook(webhookId); + + if (!webhook || webhook.revokedAt) { + throw createCustomError( + "Invalid webhook ID or webhook has been revoked.", + StatusCodes.BAD_REQUEST, + "BAD_REQUEST", + ); + } + + finalWebhookId = webhookId; + } + + const subscription = await createWalletSubscription({ + chainId: chainId.toString(), + walletAddress, + conditions, + webhookId: finalWebhookId, + }); + + reply.status(StatusCodes.OK).send({ + result: toWalletSubscriptionSchema(subscription), + }); + }, + }); +} diff --git a/src/server/routes/wallet-subscriptions/delete.ts b/src/server/routes/wallet-subscriptions/delete.ts new file mode 100644 index 000000000..947087107 --- /dev/null +++ b/src/server/routes/wallet-subscriptions/delete.ts @@ -0,0 +1,50 @@ +import { type Static, Type } from "@sinclair/typebox"; +import type { FastifyInstance } from "fastify"; +import { StatusCodes } from "http-status-codes"; +import { deleteWalletSubscription } from "../../../shared/db/wallet-subscriptions/delete-wallet-subscription"; +import { + walletSubscriptionSchema, + toWalletSubscriptionSchema, +} from "../../schemas/wallet-subscription"; +import { standardResponseSchema } from "../../schemas/shared-api-schemas"; + +const responseSchema = Type.Object({ + result: walletSubscriptionSchema, +}); + +const paramsSchema = Type.Object({ + subscriptionId: Type.String({ + description: "The ID of the wallet subscription to update.", + }), +}); + + +export async function deleteWalletSubscriptionRoute(fastify: FastifyInstance) { + fastify.route<{ + Reply: Static; + Params: Static; + }>({ + method: "DELETE", + url: "/wallet-subscriptions/:subscriptionId", + schema: { + summary: "Delete wallet subscription", + description: "Delete an existing wallet subscription.", + tags: ["Wallet-Subscriptions"], + operationId: "deleteWalletSubscription", + params: paramsSchema, + response: { + ...standardResponseSchema, + [StatusCodes.OK]: responseSchema, + }, + }, + handler: async (request, reply) => { + const { subscriptionId } = request.params; + + const subscription = await deleteWalletSubscription(subscriptionId); + + reply.status(StatusCodes.OK).send({ + result: toWalletSubscriptionSchema(subscription), + }); + }, + }); +} diff --git a/src/server/routes/wallet-subscriptions/get-all.ts b/src/server/routes/wallet-subscriptions/get-all.ts new file mode 100644 index 000000000..9f5621635 --- /dev/null +++ b/src/server/routes/wallet-subscriptions/get-all.ts @@ -0,0 +1,47 @@ +import { type Static, Type } from "@sinclair/typebox"; +import type { FastifyInstance } from "fastify"; +import { StatusCodes } from "http-status-codes"; +import { getAllWalletSubscriptions } from "../../../shared/db/wallet-subscriptions/get-all-wallet-subscriptions"; +import { + walletSubscriptionSchema, + toWalletSubscriptionSchema, +} from "../../schemas/wallet-subscription"; +import { standardResponseSchema } from "../../schemas/shared-api-schemas"; +import { PaginationSchema } from "../../schemas/pagination"; + +const responseSchema = Type.Object({ + result: Type.Array(walletSubscriptionSchema), +}); + +export async function getAllWalletSubscriptionsRoute(fastify: FastifyInstance) { + fastify.route<{ + Reply: Static; + Params: Static; + }>({ + method: "GET", + url: "/wallet-subscriptions/get-all", + schema: { + params: PaginationSchema, + summary: "Get wallet subscriptions", + description: "Get all wallet subscriptions.", + tags: ["Wallet-Subscriptions"], + operationId: "getAllWalletSubscriptions", + response: { + ...standardResponseSchema, + [StatusCodes.OK]: responseSchema, + }, + }, + handler: async (request, reply) => { + const { page, limit } = request.params; + + const subscriptions = await getAllWalletSubscriptions({ + page, + limit, + }); + + reply.status(StatusCodes.OK).send({ + result: subscriptions.map(toWalletSubscriptionSchema), + }); + }, + }); +} diff --git a/src/server/routes/wallet-subscriptions/update.ts b/src/server/routes/wallet-subscriptions/update.ts new file mode 100644 index 000000000..c4030dcbd --- /dev/null +++ b/src/server/routes/wallet-subscriptions/update.ts @@ -0,0 +1,81 @@ +import { type Static, Type } from "@sinclair/typebox"; +import type { FastifyInstance } from "fastify"; +import { StatusCodes } from "http-status-codes"; +import { updateWalletSubscription } from "../../../shared/db/wallet-subscriptions/update-wallet-subscription"; +import { WalletConditionsSchema } from "../../../shared/schemas/wallet-subscription-conditions"; +import { AddressSchema } from "../../schemas/address"; +import { chainIdOrSlugSchema } from "../../schemas/chain"; +import { + walletSubscriptionSchema, + toWalletSubscriptionSchema, +} from "../../schemas/wallet-subscription"; +import { standardResponseSchema } from "../../schemas/shared-api-schemas"; +import { getChainIdFromChain } from "../../utils/chain"; + +const requestBodySchema = Type.Object({ + chain: Type.Optional(chainIdOrSlugSchema), + walletAddress: Type.Optional(AddressSchema), + conditions: Type.Optional(WalletConditionsSchema), + webhookId: Type.Optional( + Type.Union([ + Type.Integer({ + description: "The ID of an existing webhook to use.", + }), + Type.Null(), + ]), + ), +}); + +const paramsSchema = Type.Object({ + subscriptionId: Type.String({ + description: "The ID of the wallet subscription to update.", + }), +}); + +const responseSchema = Type.Object({ + result: walletSubscriptionSchema, +}); + +export async function updateWalletSubscriptionRoute(fastify: FastifyInstance) { + fastify.route<{ + Body: Static; + Reply: Static; + Params: Static; + }>({ + method: "POST", + url: "/wallet-subscriptions/:subscriptionId", + schema: { + params: paramsSchema, + summary: "Update wallet subscription", + description: "Update an existing wallet subscription.", + tags: ["Wallet-Subscriptions"], + operationId: "updateWalletSubscription", + body: requestBodySchema, + response: { + ...standardResponseSchema, + [StatusCodes.OK]: responseSchema, + }, + }, + handler: async (request, reply) => { + const { subscriptionId } = request.params; + + const { chain, walletAddress, conditions, webhookId } = request.body; + + // Get chainId if chain is provided + const chainId = chain ? await getChainIdFromChain(chain) : undefined; + + // Update the subscription + const subscription = await updateWalletSubscription({ + id: subscriptionId, + chainId: chainId?.toString(), + walletAddress, + conditions, + webhookId, + }); + + reply.status(StatusCodes.OK).send({ + result: toWalletSubscriptionSchema(subscription), + }); + }, + }); +} diff --git a/src/server/schemas/wallet-subscription.ts b/src/server/schemas/wallet-subscription.ts new file mode 100644 index 000000000..8930c01d0 --- /dev/null +++ b/src/server/schemas/wallet-subscription.ts @@ -0,0 +1,48 @@ +import { Type } from "@sinclair/typebox"; +import type { WalletSubscriptions, Webhooks } from "@prisma/client"; +import { AddressSchema } from "./address"; +import { + WalletConditionsSchema, + validateConditions, +} from "../../shared/schemas/wallet-subscription-conditions"; + +type WalletSubscriptionWithWebhook = WalletSubscriptions & { + webhook: Webhooks | null; +}; + +export const walletSubscriptionSchema = Type.Object({ + id: Type.String(), + chainId: Type.String({ + description: "The chain ID of the subscription.", + }), + walletAddress: AddressSchema, + conditions: WalletConditionsSchema, + webhook: Type.Optional( + Type.Object({ + url: Type.String(), + }), + ), + createdAt: Type.String(), + updatedAt: Type.String(), +}); + +export type WalletSubscriptionSchema = typeof walletSubscriptionSchema; + +export function toWalletSubscriptionSchema( + subscription: WalletSubscriptionWithWebhook, +) { + return { + id: subscription.id, + chainId: subscription.chainId, + walletAddress: subscription.walletAddress, + conditions: validateConditions(subscription.conditions), + webhook: + subscription.webhookId && subscription.webhook + ? { + url: subscription.webhook.url, + } + : undefined, + createdAt: subscription.createdAt.toISOString(), + updatedAt: subscription.updatedAt.toISOString(), + }; +} diff --git a/src/shared/db/wallet-subscriptions/create-wallet-subscription.ts b/src/shared/db/wallet-subscriptions/create-wallet-subscription.ts new file mode 100644 index 000000000..437833a4f --- /dev/null +++ b/src/shared/db/wallet-subscriptions/create-wallet-subscription.ts @@ -0,0 +1,55 @@ +import type { Prisma } from "@prisma/client"; +import { prisma } from "../client"; +import type { WalletConditions } from "../../schemas/wallet-subscription-conditions"; +import { validateConditions } from "../../schemas/wallet-subscription-conditions"; +import { getWebhook } from "../webhooks/get-webhook"; +import { WebhooksEventTypes } from "../../schemas/webhooks"; + +interface CreateWalletSubscriptionParams { + chainId: string; + walletAddress: string; + conditions: WalletConditions; + webhookId?: number; +} + +export async function createWalletSubscription({ + chainId, + walletAddress, + conditions, + webhookId, +}: CreateWalletSubscriptionParams) { + // Validate conditions + const validatedConditions = validateConditions(conditions); + + if (webhookId) { + const webhook = await getWebhook(webhookId); + if (!webhook) { + throw new Error("Webhook not found"); + } + if (webhook.revokedAt) { + throw new Error("Webhook has been revoked"); + } + if (webhook.eventType !== WebhooksEventTypes.WALLET_SUBSCRIPTION) { + throw new Error("Webhook is not a wallet subscription webhook"); + } + } + + const existingSubscriptionsCount = await prisma.walletSubscriptions.count({}); + + if (existingSubscriptionsCount >= 1000) { + throw new Error("Maximum number of wallet subscriptions reached"); + } + + // Create a new subscription + return await prisma.walletSubscriptions.create({ + data: { + chainId, + walletAddress: walletAddress.toLowerCase(), + conditions: validatedConditions as Prisma.InputJsonValue[], + webhookId, + }, + include: { + webhook: true, + }, + }); +} diff --git a/src/shared/db/wallet-subscriptions/delete-wallet-subscription.ts b/src/shared/db/wallet-subscriptions/delete-wallet-subscription.ts new file mode 100644 index 000000000..65e3ecd49 --- /dev/null +++ b/src/shared/db/wallet-subscriptions/delete-wallet-subscription.ts @@ -0,0 +1,16 @@ +import { prisma } from "../client"; + +export async function deleteWalletSubscription(id: string) { + return await prisma.walletSubscriptions.update({ + where: { + id, + deletedAt: null, + }, + data: { + deletedAt: new Date(), + }, + include: { + webhook: true, + }, + }); +} \ No newline at end of file diff --git a/src/shared/db/wallet-subscriptions/get-all-wallet-subscriptions.ts b/src/shared/db/wallet-subscriptions/get-all-wallet-subscriptions.ts new file mode 100644 index 000000000..631254375 --- /dev/null +++ b/src/shared/db/wallet-subscriptions/get-all-wallet-subscriptions.ts @@ -0,0 +1,27 @@ +import { validateConditions } from "../../schemas/wallet-subscription-conditions"; +import { prisma } from "../client"; + +export async function getAllWalletSubscriptions(args?: { + page?: number; + limit?: number; +}) { + const { page, limit } = args || {}; + const subscriptions = await prisma.walletSubscriptions.findMany({ + where: { + deletedAt: null, + }, + include: { + webhook: true, + }, + skip: page && limit ? (page - 1) * limit : undefined, + take: limit, + orderBy: { + updatedAt: "desc", + }, + }); + + return subscriptions.map((subscription) => ({ + ...subscription, + conditions: validateConditions(subscription.conditions), + })); +} diff --git a/src/shared/db/wallet-subscriptions/update-wallet-subscription.ts b/src/shared/db/wallet-subscriptions/update-wallet-subscription.ts new file mode 100644 index 000000000..2082470d7 --- /dev/null +++ b/src/shared/db/wallet-subscriptions/update-wallet-subscription.ts @@ -0,0 +1,53 @@ +import type { Prisma } from "@prisma/client"; +import { prisma } from "../client"; +import type { WalletConditions } from "../../schemas/wallet-subscription-conditions"; +import { validateConditions } from "../../schemas/wallet-subscription-conditions"; +import { WebhooksEventTypes } from "../../schemas/webhooks"; +import { getWebhook } from "../webhooks/get-webhook"; + +interface UpdateWalletSubscriptionParams { + id: string; + chainId?: string; + walletAddress?: string; + conditions?: WalletConditions; + webhookId?: number | null; +} + +export async function updateWalletSubscription({ + id, + chainId, + walletAddress, + conditions, + webhookId, +}: UpdateWalletSubscriptionParams) { + if (webhookId) { + const webhook = await getWebhook(webhookId); + if (!webhook) { + throw new Error("Webhook not found"); + } + if (webhook.revokedAt) { + throw new Error("Webhook has been revoked"); + } + if (webhook.eventType !== WebhooksEventTypes.WALLET_SUBSCRIPTION) { + throw new Error("Webhook is not a wallet subscription webhook"); + } + } + + return await prisma.walletSubscriptions.update({ + where: { + id, + deletedAt: null, + }, + data: { + ...(chainId && { chainId }), + ...(walletAddress && { walletAddress: walletAddress.toLowerCase() }), + ...(conditions && { + conditions: validateConditions(conditions) as Prisma.InputJsonValue[], + }), + ...(webhookId !== undefined && { webhookId }), + }, + include: { + webhook: true, + }, + }); +} diff --git a/src/shared/schemas/wallet-subscription-conditions.ts b/src/shared/schemas/wallet-subscription-conditions.ts new file mode 100644 index 000000000..72e27f040 --- /dev/null +++ b/src/shared/schemas/wallet-subscription-conditions.ts @@ -0,0 +1,53 @@ +import { Type } from "@sinclair/typebox"; +import { z } from "zod"; +import { AddressSchema } from "../../server/schemas/address"; + +// TypeBox schemas for API validation +export const WalletConditionSchema = Type.Union([ + Type.Object({ + type: Type.Literal('token_balance_lt'), + tokenAddress: Type.Union([AddressSchema, Type.Literal('native')]), + value: Type.String({ + description: "The threshold value in wei", + examples: ["1000000000000000000"] // 1 ETH + }) + }), + Type.Object({ + type: Type.Literal('token_balance_gt'), + tokenAddress: Type.Union([AddressSchema, Type.Literal('native')]), + value: Type.String({ + description: "The threshold value in wei", + examples: ["1000000000000000000"] // 1 ETH + }) + }) +]); + +export const WalletConditionsSchema = Type.Array(WalletConditionSchema, { + maxItems: 100, + description: "Array of conditions to monitor for this wallet" +}); + +// Zod schemas for internal validation +export const WalletConditionZ = z.discriminatedUnion('type', [ + z.object({ + type: z.literal('token_balance_lt'), + tokenAddress: z.union([z.string(), z.literal('native')]), + value: z.string() + }), + z.object({ + type: z.literal('token_balance_gt'), + tokenAddress: z.union([z.string(), z.literal('native')]), + value: z.string() + }) +]); + +export const WalletConditionsZ = z.array(WalletConditionZ).max(100); + +// Type exports +export type WalletCondition = z.infer; +export type WalletConditions = z.infer; + +// Helper to validate conditions +export function validateConditions(conditions: unknown): WalletConditions { + return WalletConditionsZ.parse(conditions); +} \ No newline at end of file diff --git a/src/shared/schemas/webhooks.ts b/src/shared/schemas/webhooks.ts index 57279d378..478168430 100644 --- a/src/shared/schemas/webhooks.ts +++ b/src/shared/schemas/webhooks.ts @@ -1,3 +1,5 @@ +import type { WalletCondition } from "./wallet-subscription-conditions"; + export enum WebhooksEventTypes { QUEUED_TX = "queued_transaction", SENT_TX = "sent_transaction", @@ -8,6 +10,7 @@ export enum WebhooksEventTypes { BACKEND_WALLET_BALANCE = "backend_wallet_balance", AUTH = "auth", CONTRACT_SUBSCRIPTION = "contract_subscription", + WALLET_SUBSCRIPTION = "wallet_subscription", } export type BackendWalletBalanceWebhookParams = { @@ -17,3 +20,10 @@ export type BackendWalletBalanceWebhookParams = { chainId: number; message: string; }; +export interface WalletSubscriptionWebhookParams { + subscriptionId: string; + chainId: string; + walletAddress: string; + condition: WalletCondition; + currentValue: string; +} diff --git a/src/worker/index.ts b/src/worker/index.ts index dce96767f..41b3825d3 100644 --- a/src/worker/index.ts +++ b/src/worker/index.ts @@ -16,6 +16,7 @@ import { initProcessTransactionReceiptsWorker } from "./tasks/process-transactio import { initPruneTransactionsWorker } from "./tasks/prune-transactions-worker"; import { initSendTransactionWorker } from "./tasks/send-transaction-worker"; import { initSendWebhookWorker } from "./tasks/send-webhook-worker"; +import { initWalletSubscriptionWorker } from "./tasks/wallet-subscription-worker"; export const initWorker = async () => { initCancelRecycledNoncesWorker(); @@ -25,10 +26,10 @@ export const initWorker = async () => { initSendTransactionWorker(); initMineTransactionWorker(); initSendWebhookWorker(); - initNonceHealthCheckWorker(); await initNonceResyncWorker(); + await initWalletSubscriptionWorker(); // Listen for new & updated configuration data. await newConfigurationListener(); diff --git a/src/worker/queues/send-webhook-queue.ts b/src/worker/queues/send-webhook-queue.ts index 1f79bc224..6259670c4 100644 --- a/src/worker/queues/send-webhook-queue.ts +++ b/src/worker/queues/send-webhook-queue.ts @@ -8,6 +8,7 @@ import SuperJSON from "superjson"; import { WebhooksEventTypes, type BackendWalletBalanceWebhookParams, + type WalletSubscriptionWebhookParams, } from "../../shared/schemas/webhooks"; import { getWebhooksByEventType } from "../../shared/utils/cache/get-webhook"; import { redis } from "../../shared/utils/redis/redis"; @@ -34,11 +35,18 @@ export type EnqueueLowBalanceWebhookData = { body: BackendWalletBalanceWebhookParams; }; +export type EnqueueWalletSubscriptionWebhookData = { + type: WebhooksEventTypes.WALLET_SUBSCRIPTION; + webhook: Webhooks; + body: WalletSubscriptionWebhookParams; +}; + // Add other webhook event types here. type EnqueueWebhookData = | EnqueueContractSubscriptionWebhookData | EnqueueTransactionWebhookData - | EnqueueLowBalanceWebhookData; + | EnqueueLowBalanceWebhookData + | EnqueueWalletSubscriptionWebhookData; export interface WebhookJob { data: EnqueueWebhookData; @@ -66,6 +74,8 @@ export class SendWebhookQueue { return this._enqueueTransactionWebhook(data); case WebhooksEventTypes.BACKEND_WALLET_BALANCE: return this._enqueueBackendWalletBalanceWebhook(data); + case WebhooksEventTypes.WALLET_SUBSCRIPTION: + return this._enqueueWalletSubscriptionWebhook(data); } }; @@ -161,4 +171,18 @@ export class SendWebhookQueue { ); } }; + + private static _enqueueWalletSubscriptionWebhook = async ( + data: EnqueueWalletSubscriptionWebhookData, + ) => { + const { type, webhook, body } = data; + if (!webhook.revokedAt && type === webhook.eventType) { + const job: WebhookJob = { data, webhook }; + const serialized = SuperJSON.stringify(job); + await this.q.add( + `${type}:${body.chainId}:${body.walletAddress}:${body.subscriptionId}`, + serialized, + ); + } + }; } diff --git a/src/worker/queues/wallet-subscription-queue.ts b/src/worker/queues/wallet-subscription-queue.ts new file mode 100644 index 000000000..15f4344ae --- /dev/null +++ b/src/worker/queues/wallet-subscription-queue.ts @@ -0,0 +1,14 @@ +import { Queue } from "bullmq"; +import { redis } from "../../shared/utils/redis/redis"; +import { defaultJobOptions } from "./queues"; + +export class WalletSubscriptionQueue { + static q = new Queue("wallet-subscription", { + connection: redis, + defaultJobOptions, + }); + + constructor() { + WalletSubscriptionQueue.q.setGlobalConcurrency(1); + } +} \ No newline at end of file diff --git a/src/worker/tasks/send-webhook-worker.ts b/src/worker/tasks/send-webhook-worker.ts index dda1f1f5b..61649133f 100644 --- a/src/worker/tasks/send-webhook-worker.ts +++ b/src/worker/tasks/send-webhook-worker.ts @@ -5,6 +5,7 @@ import { TransactionDB } from "../../shared/db/transactions/db"; import { WebhooksEventTypes, type BackendWalletBalanceWebhookParams, + type WalletSubscriptionWebhookParams, } from "../../shared/schemas/webhooks"; import { toEventLogSchema } from "../../server/schemas/event-log"; import { @@ -18,7 +19,10 @@ import { sendWebhookRequest, type WebhookResponse, } from "../../shared/utils/webhook"; -import { SendWebhookQueue, type WebhookJob } from "../queues/send-webhook-queue"; +import { + SendWebhookQueue, + type WebhookJob, +} from "../queues/send-webhook-queue"; const handler: Processor = async (job: Job) => { const { data, webhook } = superjson.parse(job.data); @@ -69,6 +73,15 @@ const handler: Processor = async (job: Job) => { resp = await sendWebhookRequest(webhook, webhookBody); break; } + + case WebhooksEventTypes.WALLET_SUBSCRIPTION: { + const webhookBody: WalletSubscriptionWebhookParams = data.body; + resp = await sendWebhookRequest( + webhook, + webhookBody as unknown as Record, + ); + break; + } } // Throw on 5xx so it remains in the queue to retry later. diff --git a/src/worker/tasks/wallet-subscription-worker.ts b/src/worker/tasks/wallet-subscription-worker.ts new file mode 100644 index 000000000..9ede42425 --- /dev/null +++ b/src/worker/tasks/wallet-subscription-worker.ts @@ -0,0 +1,155 @@ +import { type Job, type Processor, Worker } from "bullmq"; +import { getAllWalletSubscriptions } from "../../shared/db/wallet-subscriptions/get-all-wallet-subscriptions"; +import { getConfig } from "../../shared/utils/cache/get-config"; +import { logger } from "../../shared/utils/logger"; +import { redis } from "../../shared/utils/redis/redis"; +import { WalletSubscriptionQueue } from "../queues/wallet-subscription-queue"; +import { logWorkerExceptions } from "../queues/queues"; +import { SendWebhookQueue } from "../queues/send-webhook-queue"; +import { WebhooksEventTypes } from "../../shared/schemas/webhooks"; +import { getChain } from "../../shared/utils/chain"; +import { thirdwebClient } from "../../shared/utils/sdk"; +import { getWalletBalance } from "thirdweb/wallets"; +import type { Chain } from "thirdweb/chains"; +import type { WalletCondition } from "../../shared/schemas/wallet-subscription-conditions"; +import type { WalletSubscriptions, Webhooks } from "@prisma/client"; +import { prettifyError } from "../../shared/utils/error"; + +type WalletSubscriptionWithWebhook = WalletSubscriptions & { + conditions: WalletCondition[]; + webhook: Webhooks | null; +}; + +// Split array into chunks of specified size +function chunk(arr: T[], size: number): T[][] { + return Array.from({ length: Math.ceil(arr.length / size) }, (_, i) => + arr.slice(i * size, i * size + size), + ); +} + +/** + * Verify if a condition is met for a given wallet + * Returns the current value if condition is met, undefined otherwise + */ +async function verifyCondition({ + condition, + walletAddress, + chain, +}: { + condition: WalletCondition; + walletAddress: string; + chain: Chain; +}): Promise { + switch (condition.type) { + case "token_balance_lt": + case "token_balance_gt": { + const currentBalanceResponse = await getWalletBalance({ + address: walletAddress, + client: thirdwebClient, + tokenAddress: + condition.tokenAddress === "native" + ? undefined + : condition.tokenAddress, + chain, + }); + + const currentBalance = currentBalanceResponse.value; + const threshold = BigInt(condition.value); + + const isConditionMet = + condition.type === "token_balance_lt" + ? currentBalance < threshold + : currentBalance > threshold; + + return isConditionMet ? currentBalance.toString() : null; + } + } +} + +/** + * Process a batch of subscriptions and trigger webhooks for any met conditions + */ +async function processSubscriptions( + subscriptions: WalletSubscriptionWithWebhook[], +) { + await Promise.all( + subscriptions.map(async (subscription) => { + try { + const chain = await getChain(Number.parseInt(subscription.chainId)); + + // Process each condition for the subscription + for (const condition of subscription.conditions) { + const currentValue = await verifyCondition({ + condition, + walletAddress: subscription.walletAddress, + chain, + }); + + if (currentValue && subscription.webhookId && subscription.webhook) { + await SendWebhookQueue.enqueueWebhook({ + type: WebhooksEventTypes.WALLET_SUBSCRIPTION, + webhook: subscription.webhook, + body: { + subscriptionId: subscription.id, + chainId: subscription.chainId, + walletAddress: subscription.walletAddress, + condition, + currentValue, + }, + }); + } + } + } catch (error) { + // Log error but continue processing other subscriptions + const message = prettifyError(error); + logger({ + service: "worker", + level: "error", + message: `Error processing wallet subscription ${subscription.id}: ${message}`, + error: error as Error, + }); + } + }), + ); +} + +// Must be explicitly called for the worker to run on this host. +export const initWalletSubscriptionWorker = async () => { + const config = await getConfig(); + const cronPattern = + config.walletSubscriptionsCronSchedule || "*/30 * * * * *"; // Default to every 30 seconds + + logger({ + service: "worker", + level: "info", + message: `Initializing wallet subscription worker with cron pattern: ${cronPattern}`, + }); + + WalletSubscriptionQueue.q.add("cron", "", { + repeat: { pattern: cronPattern }, + jobId: "wallet-subscription-cron", + }); + + const _worker = new Worker(WalletSubscriptionQueue.q.name, handler, { + connection: redis, + concurrency: 1, + }); + logWorkerExceptions(_worker); +}; + +/** + * Process all wallet subscriptions and notify webhooks when conditions are met. + */ +const handler: Processor = async (_job: Job) => { + // Get all active wallet subscriptions + const subscriptions = await getAllWalletSubscriptions(); + if (subscriptions.length === 0) { + return; + } + + // Process in batches of 50 + const batches = chunk(subscriptions, 50); + for (const batch of batches) { + await processSubscriptions(batch); + } +}; diff --git a/tests/e2e/tests/workers/wallet-subscription-worker.test.ts b/tests/e2e/tests/workers/wallet-subscription-worker.test.ts new file mode 100644 index 000000000..5bdaec1fd --- /dev/null +++ b/tests/e2e/tests/workers/wallet-subscription-worker.test.ts @@ -0,0 +1,222 @@ +import { + beforeAll, + afterAll, + describe, + expect, + test, + beforeEach, + afterEach, +} from "vitest"; +import Fastify, { type FastifyInstance } from "fastify"; +import { setup } from "../setup"; +import type { WalletSubscriptionWebhookParams } from "../../../../src/shared/schemas/webhooks"; +import type { Engine } from "../../../../sdk/dist/thirdweb-dev-engine.cjs"; +import type { WalletCondition } from "../../../../src/shared/schemas/wallet-subscription-conditions"; +import { sleep } from "bun"; + +describe("Wallet Subscription Worker", () => { + let testCallbackServer: FastifyInstance; + let engine: Engine; + let webhookPayloads: WalletSubscriptionWebhookParams[] = []; + let webhookId: number; + + beforeAll(async () => { + engine = (await setup()).engine; + testCallbackServer = await createTempCallbackServer(); + + // Create a webhook that we'll reuse for all tests + const webhook = await engine.webhooks.create({ + url: "http://localhost:3006/callback", + eventType: "wallet_subscription", + }); + webhookId = webhook.result.id; + }); + + afterAll(async () => { + await testCallbackServer.close(); + }); + + beforeEach(() => { + // Clear webhook payloads before each test + webhookPayloads = []; + }); + + afterEach(async () => { + await sleep(5000); // wait for any unsent webhooks to be sent + }); + + const createTempCallbackServer = async () => { + const tempServer = Fastify(); + + tempServer.post("/callback", async (request) => { + const payload = request.body as WalletSubscriptionWebhookParams; + webhookPayloads.push(payload); + return { success: true }; + }); + + await tempServer.listen({ port: 3006 }); + return tempServer; + }; + + const waitForWebhookPayloads = async ( + timeoutMs = 5000, + ): Promise => { + // Wait for initial webhooks to come in + await new Promise((resolve) => setTimeout(resolve, timeoutMs)); + return webhookPayloads; + }; + + const createSubscription = async (conditions: WalletCondition[]) => { + const subscription = await engine.walletSubscriptions.addWalletSubscription( + { + chain: "137", + walletAddress: "0xE52772e599b3fa747Af9595266b527A31611cebd", + conditions, + webhookId, + }, + ); + + return subscription.result; + }; + + test("should create and validate wallet subscription", async () => { + const condition: WalletCondition = { + type: "token_balance_lt", + value: "100000000000000000", // 0.1 ETH + tokenAddress: "native", + }; + + const subscription = await createSubscription([condition]); + + expect(subscription.chainId).toBe("137"); + expect(subscription.walletAddress.toLowerCase()).toBe( + "0xE52772e599b3fa747Af9595266b527A31611cebd".toLowerCase(), + ); + expect(subscription.conditions).toEqual([condition]); + expect(subscription.webhook?.url).toBe("http://localhost:3006/callback"); + + // Cleanup + await engine.walletSubscriptions.deleteWalletSubscription(subscription.id); + }); + + test("should fire webhooks for token balance less than threshold", async () => { + const condition: WalletCondition = { + type: "token_balance_lt", + value: "1000000000000000000000", // 1000 ETH (high threshold to ensure trigger) + tokenAddress: "native", + }; + + const subscription = await createSubscription([condition]); + + try { + const payloads = await waitForWebhookPayloads(); + + // Verify we got webhooks + expect(payloads.length).toBeGreaterThan(0); + + // Verify webhook data is correct + for (const payload of payloads) { + expect(payload.subscriptionId).toBe(subscription.id); + expect(payload.chainId).toBe("137"); + expect(payload.walletAddress.toLowerCase()).toBe( + "0xE52772e599b3fa747Af9595266b527A31611cebd".toLowerCase(), + ); + expect(payload.condition).toEqual(condition); + expect(BigInt(payload.currentValue)).toBeLessThan( + BigInt(condition.value), + ); + } + } finally { + await engine.walletSubscriptions.deleteWalletSubscription( + subscription.id, + ); + } + }); + + test("should fire webhooks for token balance greater than threshold", async () => { + const condition: WalletCondition = { + type: "token_balance_gt", + value: "1000000000000", // Very small threshold to ensure trigger + tokenAddress: "native", + }; + + const subscription = await createSubscription([condition]); + + try { + const payloads = await waitForWebhookPayloads(); + + // Verify we got webhooks + expect(payloads.length).toBeGreaterThan(0); + + // Verify webhook data is correct + for (const payload of payloads) { + expect(payload.subscriptionId).toBe(subscription.id); + expect(payload.chainId).toBe("137"); + expect(payload.walletAddress.toLowerCase()).toBe( + "0xE52772e599b3fa747Af9595266b527A31611cebd".toLowerCase(), + ); + expect(payload.condition).toEqual(condition); + expect(BigInt(payload.currentValue)).toBeGreaterThan( + BigInt(condition.value), + ); + } + } finally { + await engine.walletSubscriptions.deleteWalletSubscription( + subscription.id, + ); + } + }); + + test("should fire webhooks for multiple conditions", async () => { + const conditions: WalletCondition[] = [ + { + type: "token_balance_gt", + value: "1000000000000", // Very small threshold to ensure trigger + tokenAddress: "native", + }, + { + type: "token_balance_lt", + value: "1000000000000000000000", // 1000 ETH (high threshold to ensure trigger) + tokenAddress: "native", + }, + ]; + + const subscription = await createSubscription(conditions); + + try { + const payloads = await waitForWebhookPayloads(); + + // Verify we got webhooks for both conditions + expect(payloads.length).toBeGreaterThan(1); + + // Verify we got webhooks for both conditions + const uniqueConditions = new Set(payloads.map((p) => p.condition.type)); + expect(uniqueConditions.size).toBe(2); + + // Verify each webhook has correct data + for (const payload of payloads) { + expect(payload.subscriptionId).toBe(subscription.id); + expect(payload.chainId).toBe("137"); + expect(payload.walletAddress.toLowerCase()).toBe( + "0xE52772e599b3fa747Af9595266b527A31611cebd".toLowerCase(), + ); + expect(payload.currentValue).toBeDefined(); + + // Verify the value satisfies the condition + if (payload.condition.type === "token_balance_gt") { + expect(BigInt(payload.currentValue)).toBeGreaterThan( + BigInt(payload.condition.value), + ); + } else { + expect(BigInt(payload.currentValue)).toBeLessThan( + BigInt(payload.condition.value), + ); + } + } + } finally { + await engine.walletSubscriptions.deleteWalletSubscription( + subscription.id, + ); + } + }); +});