Skip to content

Commit

Permalink
Merge pull request #335 from metaDAOproject/feat/performance
Browse files Browse the repository at this point in the history
feat: aggressively backoff when db connect fails
  • Loading branch information
R-K-H authored Dec 6, 2024
2 parents d3a0abe + 5f727d7 commit 9b7365c
Show file tree
Hide file tree
Showing 9 changed files with 245 additions and 136 deletions.
46 changes: 33 additions & 13 deletions packages/database/lib/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -6,17 +6,17 @@ import "dotenv/config";
let connectionString = process.env.FUTARCHY_PG_URL;

// Add retry configuration
const RETRY_ATTEMPTS = 5;
const INITIAL_RETRY_DELAY = 100; // Start with shorter delay
const MAX_RETRY_DELAY = 2000; // Max backoff delay
const RETRY_ATTEMPTS = 12;
const INITIAL_RETRY_DELAY = 1000; // Start with shorter delay
const MAX_RETRY_DELAY = 600000; // Max backoff delay
const ACQUIRE_TIMEOUT = 10000; // 10 second timeout for acquiring connection

// Add connection pool configuration
const poolConfig = {
connectionString: connectionString,
min: 5,
max: 100, // Reduced from 1000 to a more reasonable number
idleTimeoutMillis: 30000,
min: 20,
max: 1000,
idleTimeoutMillis: 30 * 1000,
connectionTimeoutMillis: 5000,
// Add error handling for the pool
async errorHandler(err: Error) {
Expand All @@ -31,6 +31,32 @@ pool.on('error', (err) => {
console.error('Unexpected pool error:', err);
});

// Add new constants for monitoring
const POOL_STATS_INTERVAL = 30000; // Log every 30 seconds
const POOL_WARNING_THRESHOLD = 300; // Warn when total connections exceed this

setInterval(() => {
const stats = pool.totalCount;
const idle = pool.idleCount;
const waiting = pool.waitingCount;
const active = stats - idle;

console.log('Database Pool Statistics:', {
total: stats,
active,
idle,
waiting,
available: poolConfig.max - stats
});

if (stats > POOL_WARNING_THRESHOLD) {
console.warn('High connection pool usage detected', {
total: stats,
threshold: POOL_WARNING_THRESHOLD
});
}
}, POOL_STATS_INTERVAL);

export async function getClient() {
return pool.connect();
}
Expand All @@ -53,14 +79,8 @@ export async function usingDb<T>(
)
]);

const connection = drizzle(client, { schema: schemaDefs });
const connection = drizzle(pool, { schema: schemaDefs });
const result = await fn(connection);

if (client) {
client.release();
client = undefined;
}

return result;
} catch (e) {
attempts++;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,15 @@ object_relationships:
- name: program
using:
foreign_key_constraint_on: program_acct
- name: spot_market
using:
manual_configuration:
column_mapping:
base_acct: market_acct
insertion_order: null
remote_table:
name: markets
schema: public
- name: token
using:
foreign_key_constraint_on: base_acct
Expand Down
65 changes: 65 additions & 0 deletions packages/hasura/metadata/query_collections.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -50,3 +50,68 @@
dao_slug
}
}
- name: proposals_by_pk
query: |
query proposals_by_pk ($proposal_acct: String!) {
proposals_by_pk(proposal_acct: $proposal_acct) {
autocrat_version
base_vault
completed_at
created_at
dao_acct
description_url
duration_in_slots
end_slot
ended_at
fail_market_acct
initial_slot
min_base_futarchic_liquidity
min_quote_futarchic_liquidity
pass_market_acct
pass_threshold_bps
pricing_model_fail_acct
pricing_model_pass_acct
proposal_acct
proposal_num
proposer_acct
quote_vault
status
twap_initial_observation
twap_max_observation_change_per_update
updated_at
}
}
- name: MyQuery
query: |
query MyQuery {
proposals(where: {proposal_acct:{_eq:"hEoj5ca54sU14MD3QqmKp734h7EgiuoPqyeWkBTUDr1"}}) {
pass_market {
market_acct
prices(limit: 1, order_by: {updated_slot:desc}) {
price
quote_amount
base_amount
}
}
fail_market {
market_acct
prices(limit: 1, order_by: {updated_slot:desc}) {
price
quote_amount
base_amount
}
}
}
}
- name: MyQuery2
query: |
query MyQuery {
proposal_bars(where: {proposal_acct:{_eq:"hEoj5ca54sU14MD3QqmKp734h7EgiuoPqyeWkBTUDr1"}}, order_by: [{bar_start_time:desc}], limit: 1) {
pass_price
fail_price
pass_base_amount
pass_quote_amount
fail_base_amount
fail_quote_amount
}
}
27 changes: 27 additions & 0 deletions packages/hasura/metadata/rest_endpoints.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,24 @@
- GET
name: fetchLatestFourProposals
url: fetchlatestfourproposals
- comment: ""
definition:
query:
collection_name: allowed-queries
query_name: MyQuery
methods:
- GET
name: MyQuery
url: myquery
- comment: ""
definition:
query:
collection_name: allowed-queries
query_name: MyQuery2
methods:
- GET
name: MyQuery2
url: myquery2
- comment: ""
definition:
query:
Expand All @@ -34,3 +52,12 @@
- GET
name: getProposalDescription
url: proposal-description/:pk
- comment: ""
definition:
query:
collection_name: allowed-queries
query_name: proposals_by_pk
methods:
- GET
name: proposals_by_pk
url: proposals/:proposal_acct
6 changes: 2 additions & 4 deletions packages/indexer/src/v3_indexer/builders/swaps.ts
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@ import {
import { logger } from "../../logger";
import { getMainIxTypeFromTransaction } from "../transaction/watcher";
import { getHumanPrice } from "../usecases/math";
import { AmmMarketAccountUpdateIndexer } from '../indexers/amm/amm-market-account-indexer';
import { indexAmmMarketAccountWithContext } from '../indexers/amm/utils';
import { PublicKey, RpcResponseAndContext, AccountInfo } from "@solana/web3.js";
import { rpc } from "../../rpc-wrapper";

Expand Down Expand Up @@ -221,7 +221,6 @@ export class SwapBuilder {
} else {
// handle non-swap transactions (add/remove liquidity, crank, etc)
// find market account from instructions
console.log("builder::buildOrderFromSwapIx::looking for market account in non swap txn");
let marketAcct: PublicKey | undefined;
for (const ix of tx.instructions) {
const candidate = ix.accountsWithData.find((a) => a.name === "amm");
Expand All @@ -231,7 +230,6 @@ export class SwapBuilder {
}
}
if (marketAcct) {
console.log("builder::buildOrderFromSwapIx::market found for non swap txn, indexing price and twap", marketAcct);
this.indexPriceAndTWAPForAccount(marketAcct);
}
}
Expand Down Expand Up @@ -263,7 +261,7 @@ export class SwapBuilder {
) as RpcResponseAndContext<AccountInfo<Buffer> | null>;

if (accountInfo.value) {
const res = await AmmMarketAccountUpdateIndexer.index(
const res = await indexAmmMarketAccountWithContext(
accountInfo.value,
account,
accountInfo.context
Expand Down
Loading

0 comments on commit 9b7365c

Please sign in to comment.