Skip to content

Commit

Permalink
fix: migrate set -> sorted set for existing nonce-recycled keys (#693)
Browse files Browse the repository at this point in the history
* fix: migrate set -> sorted set for existing nonce-recycled keys

* blocking poll

* exit 0
  • Loading branch information
arcoraven authored Oct 2, 2024
1 parent fbf6d5f commit 26397db
Show file tree
Hide file tree
Showing 6 changed files with 122 additions and 11 deletions.
3 changes: 2 additions & 1 deletion package.json
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,8 @@
"generate:sdk": "npx tsx ./src/scripts/generate-sdk && cd ./sdk && yarn build",
"prisma:setup:dev": "npx tsx ./src/scripts/setup-db.ts",
"prisma:setup:prod": "npx tsx ./dist/scripts/setup-db.js",
"start": "yarn prisma:setup:prod && yarn start:run",
"start": "yarn prisma:setup:prod && yarn start:migrations && yarn start:run",
"start:migrations": "npx tsx ./dist/scripts/apply-migrations.js",
"start:run": "node --experimental-specifier-resolution=node ./dist/index.js",
"start:docker": "docker compose --profile engine --env-file ./.env up --remove-orphans",
"start:docker-force-build": "docker compose --profile engine --env-file ./.env up --remove-orphans --build",
Expand Down
16 changes: 8 additions & 8 deletions src/db/wallets/walletNonce.ts
Original file line number Diff line number Diff line change
@@ -1,8 +1,8 @@
import {
Address,
eth_getTransactionCount,
getAddress,
getRpcClient,
type Address,
} from "thirdweb";
import { getChain } from "../../utils/chain";
import { logger } from "../../utils/logger";
Expand Down Expand Up @@ -37,7 +37,7 @@ export const getUsedBackendWallets = async (
return keys.map((key) => {
const tokens = key.split(":");
return {
chainId: parseInt(tokens[1]),
chainId: Number.parseInt(tokens[1]),
walletAddress: getAddress(tokens[2]),
};
});
Expand All @@ -61,7 +61,7 @@ export const lastUsedNonceKey = (chainId: number, walletAddress: Address) =>
export const splitLastUsedNonceKey = (key: string) => {
const _splittedKeys = key.split(":");
const walletAddress = normalizeAddress(_splittedKeys[2]);
const chainId = parseInt(_splittedKeys[1]);
const chainId = Number.parseInt(_splittedKeys[1]);
return { walletAddress, chainId };
};

Expand All @@ -87,7 +87,7 @@ export const sentNoncesKey = (chainId: number, walletAddress: Address) =>
export const splitSentNoncesKey = (key: string) => {
const _splittedKeys = key.split(":");
const walletAddress = normalizeAddress(_splittedKeys[2]);
const chainId = parseInt(_splittedKeys[1]);
const chainId = Number.parseInt(_splittedKeys[1]);
return { walletAddress, chainId };
};

Expand Down Expand Up @@ -166,7 +166,7 @@ export const acquireNonce = async (args: {
nonce,
queueId,
});
return { nonce, isRecycledNonce: false };
return { nonce, isRecycledNonce };
};

/**
Expand All @@ -181,7 +181,7 @@ export const recycleNonce = async (
walletAddress: Address,
nonce: number,
) => {
if (isNaN(nonce)) {
if (Number.isNaN(nonce)) {
logger({
level: "warn",
message: `[recycleNonce] Invalid nonce: ${nonce}`,
Expand Down Expand Up @@ -209,7 +209,7 @@ const _acquireRecycledNonce = async (
if (result.length === 0) {
return null;
}
return parseInt(result[0]);
return Number.parseInt(result[0]);
};

/**
Expand Down Expand Up @@ -246,7 +246,7 @@ export const syncLatestNonceFromOnchain = async (
export const inspectNonce = async (chainId: number, walletAddress: Address) => {
const key = lastUsedNonceKey(chainId, walletAddress);
const nonce = await redis.get(key);
return nonce ? parseInt(nonce) : 0;
return nonce ? Number.parseInt(nonce) : 0;
};

/**
Expand Down
69 changes: 69 additions & 0 deletions src/scripts/apply-migrations.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,69 @@
import { logger } from "../utils/logger";
import { acquireLock, releaseLock, waitForLock } from "../utils/redis/lock";
import { redis } from "../utils/redis/redis";

const MIGRATION_LOCK_TTL_SECONDS = 60;

const main = async () => {
// Acquire a lock to allow only one host to run migrations.
// Other hosts block until the migration is completed or lock times out.
const acquiredLock = await acquireLock(
"lock:apply-migrations",
MIGRATION_LOCK_TTL_SECONDS,
);
if (!acquiredLock) {
logger({
level: "info",
message: "Migration in progress. Waiting for the lock to release...",
service: "server",
});
await waitForLock("lock:apply-migrations");
process.exit(0);
}

try {
await migrateRecycledNonces();

logger({
level: "info",
message: "Completed migrations without errors.",
service: "server",
});
} catch (e) {
logger({
level: "error",
message: `Failed to complete migrations: ${e}`,
service: "server",
});
process.exit(1);
} finally {
await releaseLock("lock:apply-migrations");
}

process.exit(0);
};

const migrateRecycledNonces = async () => {
const keys = await redis.keys("nonce-recycled:*");

// For each `nonce-recycled:*` key that is a "set" in Redis,
// migrate all members to a sorted set with score == nonce.
for (const key of keys) {
const type = await redis.type(key);
if (type !== "set") {
continue;
}

const members = await redis.smembers(key);
await redis.del(key);
if (members.length > 0) {
const args = members.flatMap((member) => {
const score = Number.parseInt(member);
return Number.isNaN(score) ? [] : [score, member];
});
await redis.zadd(key, ...args);
}
}
};

main();
2 changes: 1 addition & 1 deletion src/server/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -97,7 +97,7 @@ export const initServer = async () => {
logger({
service: "server",
level: "fatal",
message: `Failed to start server`,
message: "Failed to start server",
error: err,
});
process.exit(1);
Expand Down
41 changes: 41 additions & 0 deletions src/utils/redis/lock.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,41 @@
import { redis } from "./redis";

// Add more locks here.
type LockType = "lock:apply-migrations";

/**
* Acquire a lock to prevent duplicate runs of a workflow.
*
* @param key string The lock identifier.
* @param ttlSeconds number The number of seconds before the lock is automatically released.
* @returns true if the lock was acquired. Else false.
*/
export const acquireLock = async (
key: LockType,
ttlSeconds: number,
): Promise<boolean> => {
const result = await redis.set(key, Date.now(), "EX", ttlSeconds, "NX");
return result === "OK";
};

/**
* Release a lock.
*
* @param key The lock identifier.
* @returns true if the lock was active before releasing.
*/
export const releaseLock = async (key: LockType) => {
const result = await redis.del(key);
return result > 0;
};

/**
* Blocking polls a lock every second until it's released.
*
* @param key The lock identifier.
*/
export const waitForLock = async (key: LockType) => {
while (await redis.get(key)) {
await new Promise((resolve) => setTimeout(resolve, 1_000));
}
};
2 changes: 1 addition & 1 deletion src/utils/redis/redis.ts
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,7 @@ export const isRedisReachable = async () => {
try {
await redis.ping();
return true;
} catch (error) {
} catch {
return false;
}
};

0 comments on commit 26397db

Please sign in to comment.