Skip to content

Commit

Permalink
feat: catch error for pool.connect
Browse files Browse the repository at this point in the history
  • Loading branch information
LukasDeco committed Oct 11, 2024
1 parent a4f9a79 commit d63d50f
Show file tree
Hide file tree
Showing 21 changed files with 715 additions and 602 deletions.
14 changes: 10 additions & 4 deletions packages/database/lib/index.ts
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
import { drizzle, NodePgDatabase } from "drizzle-orm/node-postgres";
import * as schemaDefs from "./schema";
import { Pool } from "pg";
import { Pool, PoolClient } from "pg";
import "dotenv/config";

let connectionString = process.env.FUTARCHY_PG_URL;
Expand All @@ -23,10 +23,16 @@ export async function getClient() {

export async function usingDb<T>(
fn: (connection: NodePgDatabase<typeof schemaDefs>) => Promise<T>
): Promise<T> {
const client = await pool.connect();
const connection = drizzle(pool, { schema: schemaDefs });
): Promise<T | undefined> {
let client: PoolClient;
try {
client = await pool.connect();
} catch (e) {
console.error(e);
return;
}
try {
const connection = drizzle(pool, { schema: schemaDefs });
const result = await fn(connection);
return result;
} finally {
Expand Down
113 changes: 60 additions & 53 deletions packages/indexer/src/builders/swaps.ts
Original file line number Diff line number Diff line change
Expand Up @@ -40,16 +40,17 @@ export class SwapPersistable {

async persist() {
try {
const upsertResult = await usingDb((db) =>
db
.insert(schema.transactions)
.values(this.transactionRecord)
.onConflictDoUpdate({
target: schema.transactions.txSig,
set: this.transactionRecord,
})
.returning({ txSig: schema.transactions.txSig })
);
const upsertResult =
(await usingDb((db) =>
db
.insert(schema.transactions)
.values(this.transactionRecord)
.onConflictDoUpdate({
target: schema.transactions.txSig,
set: this.transactionRecord,
})
.returning({ txSig: schema.transactions.txSig })
)) ?? [];
if (
upsertResult.length !== 1 ||
upsertResult[0].txSig !== this.transactionRecord.txSig
Expand All @@ -60,13 +61,14 @@ export class SwapPersistable {
)}`
);
}
const orderInsertRes = await usingDb((db) =>
db
.insert(schema.orders)
.values(this.ordersRecord)
.onConflictDoNothing()
.returning({ txSig: schema.takes.orderTxSig })
);
const orderInsertRes =
(await usingDb((db) =>
db
.insert(schema.orders)
.values(this.ordersRecord)
.onConflictDoNothing()
.returning({ txSig: schema.takes.orderTxSig })
)) ?? [];
if (orderInsertRes.length > 0) {
console.log(
"successfully inserted swap order record",
Expand All @@ -78,13 +80,14 @@ export class SwapPersistable {
${this.ordersRecord.orderTxSig}`
);
}
const takeInsertRes = await usingDb((db) =>
db
.insert(schema.takes)
.values(this.takesRecord)
.onConflictDoNothing()
.returning({ txSig: schema.takes.orderTxSig })
);
const takeInsertRes =
(await usingDb((db) =>
db
.insert(schema.takes)
.values(this.takesRecord)
.onConflictDoNothing()
.returning({ txSig: schema.takes.orderTxSig })
)) ?? [];
if (takeInsertRes.length > 0) {
logger.log(
`successfully inserted swap take record.
Expand All @@ -110,13 +113,14 @@ export class SwapBuilder {
): Promise<Result<SwapPersistable, TaggedUnion>> {
try {
// first check to see if swap is already persisted
const swapOrder = await usingDb((db) =>
db
.select()
.from(schema.orders)
.where(eq(schema.orders.orderTxSig, signature))
.execute()
);
const swapOrder =
(await usingDb((db) =>
db
.select()
.from(schema.orders)
.where(eq(schema.orders.orderTxSig, signature))
.execute()
)) ?? [];
if (swapOrder.length > 0) {
return Err({ type: SwapPersistableError.AlreadyPersistedSwap });
}
Expand Down Expand Up @@ -266,34 +270,37 @@ export class SwapBuilder {
// determine price
// NOTE: This is estimated given the output is a min expected value
// default is input / output (buying a token with USDC or whatever)
const marketAcctRecord = await usingDb((db) =>
db
.select()
.from(schema.markets)
.where(eq(schema.markets.marketAcct, marketAcct.pubkey))
.execute()
);
const marketAcctRecord =
(await usingDb((db) =>
db
.select()
.from(schema.markets)
.where(eq(schema.markets.marketAcct, marketAcct.pubkey))
.execute()
)) ?? [];
if (marketAcctRecord.length === 0) {
return Err({ type: AmmInstructionIndexerError.MissingMarket });
}
const baseToken = await usingDb((db) =>
db
.select()
.from(schema.tokens)
.where(eq(schema.tokens.mintAcct, marketAcctRecord[0].baseMintAcct))
.execute()
);
const baseToken =
(await usingDb((db) =>
db
.select()
.from(schema.tokens)
.where(eq(schema.tokens.mintAcct, marketAcctRecord[0].baseMintAcct))
.execute()
)) ?? [];
if (baseToken.length === 0) {
return Err({ type: AmmInstructionIndexerError.MissingMarket });
}
const quoteToken = await usingDb((db) =>
db
.select()
.from(schema.tokens)
.where(eq(schema.tokens.mintAcct, marketAcctRecord[0].quoteMintAcct))
.limit(1)
.execute()
);
const quoteToken =
(await usingDb((db) =>
db
.select()
.from(schema.tokens)
.where(eq(schema.tokens.mintAcct, marketAcctRecord[0].quoteMintAcct))
.limit(1)
.execute()
)) ?? [];
if (baseToken.length === 0) {
return Err({ type: AmmInstructionIndexerError.MissingMarket });
}
Expand Down
26 changes: 15 additions & 11 deletions packages/indexer/src/cli/txw/common/select-account.ts
Original file line number Diff line number Diff line change
@@ -1,17 +1,21 @@
import { usingDb, schema } from "@metadaoproject/indexer-db";
import inquirer from 'inquirer';
import inquirer from "inquirer";

export async function selectAccount(): Promise<string> {
const accounts = (await usingDb(db => db.select().from(schema.transactionWatchers))).map(({acct}) => acct);
const accounts = (
(await usingDb((db) => db.select().from(schema.transactionWatchers))) ?? []
).map(({ acct }) => acct);
const prompt = inquirer.createPromptModule();
const ACCOUNT_ANSWER = 'account';
const account: string = (await prompt([
{
type: 'list',
name: ACCOUNT_ANSWER,
message: 'Select account to reset:',
choices: accounts
}
]))[ACCOUNT_ANSWER];
const ACCOUNT_ANSWER = "account";
const account: string = (
await prompt([
{
type: "list",
name: ACCOUNT_ANSWER,
message: "Select account to reset:",
choices: accounts,
},
])
)[ACCOUNT_ANSWER];
return account;
}
Loading

0 comments on commit d63d50f

Please sign in to comment.