diff --git a/indexer/packages/postgres/src/index.ts b/indexer/packages/postgres/src/index.ts index 52b328a1e7..43637830ea 100644 --- a/indexer/packages/postgres/src/index.ts +++ b/indexer/packages/postgres/src/index.ts @@ -71,3 +71,4 @@ export * as testConstants from '../__tests__/helpers/constants'; export * as testConversionHelpers from '../__tests__/helpers/conversion-helpers'; export * as helpers from './db/helpers'; +export * as loopHelpers from './loops/loopHelper'; diff --git a/indexer/packages/redis/__tests__/caches/orderbook-mid-prices-cache.test.ts b/indexer/packages/redis/__tests__/caches/orderbook-mid-prices-cache.test.ts index 5dfd662f68..339611ae74 100644 --- a/indexer/packages/redis/__tests__/caches/orderbook-mid-prices-cache.test.ts +++ b/indexer/packages/redis/__tests__/caches/orderbook-mid-prices-cache.test.ts @@ -1,24 +1,50 @@ import { deleteAllAsync } from '../../src/helpers/redis'; import { redis as client } from '../helpers/utils'; import { - setPrice, - getMedianPrice, + fetchAndCacheOrderbookMidPrices, + getMedianPrices, ORDERBOOK_MID_PRICES_CACHE_KEY_PREFIX, } from '../../src/caches/orderbook-mid-prices-cache'; +import * as OrderbookLevelsCache from '../../src/caches/orderbook-levels-cache'; + +// Mock the OrderbookLevelsCache module +jest.mock('../../src/caches/orderbook-levels-cache', () => ({ + getOrderBookMidPrice: jest.fn(), +})); describe('orderbook-mid-prices-cache', () => { - const ticker: string = 'BTC-USD'; + const defaultTicker: string = 'BTC-USD'; + + // Helper function to set a price for a given market ticker + const setPrice = (marketTicker: string, price: string) => { + const now = Date.now(); + client.zadd(`${ORDERBOOK_MID_PRICES_CACHE_KEY_PREFIX}${marketTicker}`, now, price); + }; + + afterAll(async () => { + await deleteAllAsync(client); + }); beforeEach(async () => { await deleteAllAsync(client); + jest.resetAllMocks(); + (OrderbookLevelsCache.getOrderBookMidPrice as jest.Mock).mockReset(); }); - describe('setPrice', () => { + describe('fetchAndCacheOrderbookMidPrices', () => { it('sets a price for a ticker', async () => { - await setPrice(client, ticker, '50000'); + (OrderbookLevelsCache.getOrderBookMidPrice as jest.Mock).mockResolvedValue('50000'); + + await fetchAndCacheOrderbookMidPrices(client, [defaultTicker]); + + expect(OrderbookLevelsCache.getOrderBookMidPrice).toHaveBeenCalledTimes(1); + expect(OrderbookLevelsCache.getOrderBookMidPrice).toHaveBeenCalledWith( + defaultTicker, + client, + ); - await client.zrange( - `${ORDERBOOK_MID_PRICES_CACHE_KEY_PREFIX}${ticker}`, + client.zrange( + `${ORDERBOOK_MID_PRICES_CACHE_KEY_PREFIX}${defaultTicker}`, 0, -1, (_: any, response: string[]) => { @@ -28,109 +54,278 @@ describe('orderbook-mid-prices-cache', () => { }); it('sets multiple prices for a ticker', async () => { - await Promise.all([ - setPrice(client, ticker, '50000'), - setPrice(client, ticker, '51000'), - setPrice(client, ticker, '49000'), - ]); - - await client.zrange( - `${ORDERBOOK_MID_PRICES_CACHE_KEY_PREFIX}${ticker}`, + const mockPrices = ['49000', '50000', '51000']; + for (const price of mockPrices) { + (OrderbookLevelsCache.getOrderBookMidPrice as jest.Mock).mockResolvedValue(price); + await fetchAndCacheOrderbookMidPrices(client, [defaultTicker]); + } + + client.zrange( + `${ORDERBOOK_MID_PRICES_CACHE_KEY_PREFIX}${defaultTicker}`, 0, -1, (_: any, response: string[]) => { - expect(response).toEqual(['49000', '50000', '51000']); + expect(response).toEqual(mockPrices); }, ); + expect(OrderbookLevelsCache.getOrderBookMidPrice).toHaveBeenCalledTimes(3); + }); + + it('sets prices for multiple tickers', async () => { + const ticker2 = 'SHIB-USD'; + const ticker3 = 'SOL-USD'; + const mockPrices = { + [defaultTicker]: '49000', + [ticker2]: '50000', + [ticker3]: '51000', + }; + + // Mock the getOrderBookMidPrice function for each ticker + (OrderbookLevelsCache.getOrderBookMidPrice as jest.Mock) + .mockResolvedValueOnce(mockPrices[defaultTicker]) + .mockResolvedValueOnce(mockPrices[ticker2]) + .mockResolvedValueOnce(mockPrices[ticker3]); + + await fetchAndCacheOrderbookMidPrices(client, [defaultTicker, ticker2, ticker3]); + expect(OrderbookLevelsCache.getOrderBookMidPrice).toHaveBeenCalledTimes(3); + + for (const [key, price] of Object.entries(mockPrices)) { + client.zrange(`${ORDERBOOK_MID_PRICES_CACHE_KEY_PREFIX}${key}`, + 0, + -1, + (err: Error, res: string[]) => { + expect(res).toHaveLength(1); + expect(res[0]).toEqual(price); + }); + } }); }); describe('getMedianPrice', () => { + it('returns null when no prices are set', async () => { - const result = await getMedianPrice(client, ticker); - expect(result).toBeNull(); + const result: {[ticker: string]: string | undefined} = await getMedianPrices( + client, + [defaultTicker], + ); + expect(result).toEqual({ 'BTC-USD': undefined }); }); it('returns the median price for odd number of prices', async () => { - await Promise.all([ - setPrice(client, ticker, '50000'), - setPrice(client, ticker, '51000'), - setPrice(client, ticker, '49000'), - ]); - - const result = await getMedianPrice(client, ticker); - expect(result).toBe('50000'); + setPrice(defaultTicker, '51000'); + setPrice(defaultTicker, '50000'); + setPrice(defaultTicker, '49000'); + + const result: {[ticker: string]: string | undefined} = await getMedianPrices( + client, + [defaultTicker], + ); + expect(result).toEqual({ 'BTC-USD': '50000' }); }); it('returns the median price for even number of prices', async () => { - await Promise.all([ - setPrice(client, ticker, '50000'), - setPrice(client, ticker, '51000'), - setPrice(client, ticker, '49000'), - setPrice(client, ticker, '52000'), - ]); - - const result = await getMedianPrice(client, ticker); - expect(result).toBe('50500'); + setPrice(defaultTicker, '50000'); + setPrice(defaultTicker, '51000'); + setPrice(defaultTicker, '49000'); + setPrice(defaultTicker, '52000'); + + const result: {[ticker: string]: string | undefined} = await getMedianPrices( + client, + [defaultTicker], + ); + expect(result).toEqual({ 'BTC-USD': '50500' }); }); - it('returns the correct median price after 5 seconds', async () => { + it('returns the correct median price after 30 seconds', async () => { jest.useFakeTimers(); + // Mock the getOrderBookMidPrice function for the ticker + const mockPrices: string[] = ['50000', '51000', '49000', '48000', '52000', '53000']; + + (OrderbookLevelsCache.getOrderBookMidPrice as jest.Mock) + .mockResolvedValueOnce(mockPrices[0]) + .mockResolvedValueOnce(mockPrices[1]) + .mockResolvedValueOnce(mockPrices[2]) + .mockResolvedValueOnce(mockPrices[3]) + .mockResolvedValueOnce(mockPrices[4]) + .mockResolvedValueOnce(mockPrices[5]); - const nowSeconds = Math.floor(Date.now() / 1000); - jest.setSystemTime(nowSeconds * 1000); + // Fetch and cache initial prices + await fetchAndCacheOrderbookMidPrices(client, [defaultTicker, defaultTicker]); + expect(OrderbookLevelsCache.getOrderBookMidPrice).toHaveBeenCalledTimes(2); - await Promise.all([ - setPrice(client, ticker, '50000'), - setPrice(client, ticker, '51000'), - ]); + // Advance time and fetch more prices + jest.advanceTimersByTime(31000); // Advance time by 31 seconds + await fetchAndCacheOrderbookMidPrices( + client, + [defaultTicker, defaultTicker, defaultTicker, defaultTicker], + ); - jest.advanceTimersByTime(6000); // Advance time by 6 seconds - await Promise.all([ - setPrice(client, ticker, '49000'), - setPrice(client, ticker, '48000'), - setPrice(client, ticker, '52000'), - setPrice(client, ticker, '53000'), - ]); + client.zrange(`${ORDERBOOK_MID_PRICES_CACHE_KEY_PREFIX}${defaultTicker}`, + 0, + -1, + (err: Error, res: string[]) => { + expect(res).toHaveLength(4); + }); + expect(OrderbookLevelsCache.getOrderBookMidPrice).toHaveBeenCalledTimes(6); - const result = await getMedianPrice(client, ticker); - expect(result).toBe('50500'); + // Check the median price + const result:{[ticker: string]: string | undefined} = await getMedianPrices( + client, + [defaultTicker], + ); + // Median of last 4 prices, as first two should have expired after moving clock forward + expect(result).toEqual({ 'BTC-USD': '50500' }); jest.useRealTimers(); }); it('returns the correct median price for small numbers with even number of prices', async () => { - await Promise.all([ - setPrice(client, ticker, '0.00000000002345'), - setPrice(client, ticker, '0.00000000002346'), - ]); + setPrice(defaultTicker, '0.00000000002345'); + setPrice(defaultTicker, '0.00000000002346'); - const midPrice1 = await getMedianPrice(client, ticker); - expect(midPrice1).toEqual('0.000000000023455'); + const midPrice1: { [ticker: string]: string | undefined } = await getMedianPrices( + client, + [defaultTicker], + ); + expect(midPrice1).toEqual({ 'BTC-USD': '0.000000000023455' }); }); it('returns the correct median price for small numbers with odd number of prices', async () => { - await Promise.all([ - setPrice(client, ticker, '0.00000000001'), - setPrice(client, ticker, '0.00000000002'), - setPrice(client, ticker, '0.00000000003'), - setPrice(client, ticker, '0.00000000004'), - setPrice(client, ticker, '0.00000000005'), - ]); + setPrice(defaultTicker, '0.00000000001'); + setPrice(defaultTicker, '0.00000000002'); + setPrice(defaultTicker, '0.00000000003'); + setPrice(defaultTicker, '0.00000000004'); + setPrice(defaultTicker, '0.00000000005'); + + const midPrice1: { [ticker: string]: string | undefined } = await getMedianPrices( + client, + [defaultTicker], + ); + expect(midPrice1).toEqual({ 'BTC-USD': '0.00000000003' }); - const midPrice1 = await getMedianPrice(client, ticker); - expect(midPrice1).toEqual('0.00000000003'); + await deleteAllAsync(client); + + setPrice(defaultTicker, '0.00000847007'); + setPrice(defaultTicker, '0.00000847006'); + setPrice(defaultTicker, '0.00000847008'); + + const midPrice2: { [ticker: string]: string | undefined } = await getMedianPrices( + client, + [defaultTicker], + ); + expect(midPrice2).toEqual({ 'BTC-USD': '0.00000847007' }); + }); + }); + + describe('getMedianPrices for multiple markets', () => { + const btcUsdTicker = 'BTC-USD'; + const ethUsdTicker = 'ETH-USD'; + const solUsdTicker = 'SOL-USD'; + beforeEach(async () => { await deleteAllAsync(client); + }); + + it('returns correct median prices for multiple markets with odd number of prices', async () => { + // Set prices for BTC-USD + setPrice(btcUsdTicker, '50000'); + setPrice(btcUsdTicker, '51000'); + setPrice(btcUsdTicker, '49000'); + + // Set prices for ETH-USD + setPrice(ethUsdTicker, '3000'); + setPrice(ethUsdTicker, '3100'); + setPrice(ethUsdTicker, '2900'); + + // Set prices for SOL-USD + setPrice(solUsdTicker, '100'); + setPrice(solUsdTicker, '102'); + setPrice(solUsdTicker, '98'); + + const result: { [ticker: string]: string | undefined } = await getMedianPrices( + client, + [btcUsdTicker, ethUsdTicker, solUsdTicker], + ); + expect(result).toEqual({ + 'BTC-USD': '50000', + 'ETH-USD': '3000', + 'SOL-USD': '100', + }); + }); - await Promise.all([ - setPrice(client, ticker, '0.00000847007'), - setPrice(client, ticker, '0.00000847006'), - setPrice(client, ticker, '0.00000847008'), - ]); + it('returns correct median prices for multiple markets with even number of prices', async () => { + // Set prices for BTC-USD + setPrice(btcUsdTicker, '50000'); + setPrice(btcUsdTicker, '51000'); + setPrice(btcUsdTicker, '49000'); + setPrice(btcUsdTicker, '52000'); - const midPrice2 = await getMedianPrice(client, ticker); - expect(midPrice2).toEqual('0.00000847007'); + // Set prices for ETH-USD + setPrice(ethUsdTicker, '3000'); + setPrice(ethUsdTicker, '3100'); + setPrice(ethUsdTicker, '2900'); + setPrice(ethUsdTicker, '3200'); + + const result: { [ticker: string]: string | undefined } = await getMedianPrices( + client, + [btcUsdTicker, ethUsdTicker], + ); + expect(result).toEqual({ + 'BTC-USD': '50500', + 'ETH-USD': '3050', + }); + }); + + it('handles markets with different numbers of prices', async () => { + // Set prices for BTC-USD (odd number) + setPrice(btcUsdTicker, '50000'); + setPrice(btcUsdTicker, '51000'); + setPrice(btcUsdTicker, '49000'); + + // Set prices for ETH-USD (even number) + setPrice(ethUsdTicker, '3000'); + setPrice(ethUsdTicker, '3100'); + setPrice(ethUsdTicker, '2900'); + setPrice(ethUsdTicker, '3200'); + + // Set no prices for SOL-USD + + const result: { [ticker: string]: string | undefined } = await getMedianPrices( + client, + [btcUsdTicker, ethUsdTicker, solUsdTicker], + ); + expect(result).toEqual({ + 'BTC-USD': '50000', + 'ETH-USD': '3050', + 'SOL-USD': undefined, + }); + }); + + it('calculates correct median prices for markets with small and large numbers', async () => { + // Set prices for BTC-USD (large numbers) + setPrice(btcUsdTicker, '50000.12345'); + setPrice(btcUsdTicker, '50000.12346'); + + // Set prices for ETH-USD (medium numbers) + setPrice(ethUsdTicker, '3000.5'); + setPrice(ethUsdTicker, '3000.6'); + setPrice(ethUsdTicker, '3000.7'); + + // Set prices for SOL-USD (small numbers) + setPrice(solUsdTicker, '0.00000123'); + setPrice(solUsdTicker, '0.00000124'); + setPrice(solUsdTicker, '0.00000125'); + setPrice(solUsdTicker, '0.00000126'); + + const result: { [ticker: string]: string | undefined } = await getMedianPrices( + client, + [btcUsdTicker, ethUsdTicker, solUsdTicker], + ); + expect(result).toEqual({ + 'BTC-USD': '50000.123455', + 'ETH-USD': '3000.6', + 'SOL-USD': '0.000001245', + }); }); }); }); diff --git a/indexer/packages/redis/src/caches/orderbook-mid-prices-cache.ts b/indexer/packages/redis/src/caches/orderbook-mid-prices-cache.ts index ece95a3ca2..bec1c50fa4 100644 --- a/indexer/packages/redis/src/caches/orderbook-mid-prices-cache.ts +++ b/indexer/packages/redis/src/caches/orderbook-mid-prices-cache.ts @@ -1,9 +1,11 @@ +import { logger } from '@dydxprotocol-indexer/base'; import Big from 'big.js'; import { Callback, RedisClient } from 'redis'; +import { getOrderBookMidPrice } from './orderbook-levels-cache'; import { - addMarketPriceScript, - getMarketMedianScript, + addOrderbookMidPricesScript, + getOrderbookMidPricesScript, } from './scripts'; // Cache of orderbook prices for each clob pair @@ -20,73 +22,89 @@ function getOrderbookMidPriceCacheKey(ticker: string): string { } /** - * Adds a price to the market prices cache for a given ticker. - * Uses a Lua script to add the price with a timestamp to a sorted set in Redis. + * Fetches and caches mid prices for multiple tickers. * @param client The Redis client - * @param ticker The ticker symbol - * @param price The price to be added - * @returns A promise that resolves when the operation is complete + * @param tickers An array of ticker symbols + * @returns A promise that resolves when all prices are fetched and cached */ -export async function setPrice( +export async function fetchAndCacheOrderbookMidPrices( client: RedisClient, - ticker: string, - price: string, + tickers: string[], ): Promise { - // Number of keys for the lua script. - const numKeys: number = 1; - - let evalAsync: ( - marketCacheKey: string, - ) => Promise = (marketCacheKey) => { + // Fetch midPrices and filter out undefined values + const cacheKeyPricePairs: ({ cacheKey: string, midPrice: string } | null)[] = await Promise.all( + tickers.map(async (ticker) => { + const cacheKey: string = getOrderbookMidPriceCacheKey(ticker); + const midPrice: string | undefined = await getOrderBookMidPrice(ticker, client); + if (midPrice !== undefined) { + return { cacheKey, midPrice }; + } + return null; + }), + ); - return new Promise((resolve, reject) => { - const callback: Callback = ( - err: Error | null, - ) => { - if (err) { - return reject(err); - } - return resolve(); - }; + // Filter out null values + const validPairs: { cacheKey: string, midPrice: string }[] = cacheKeyPricePairs.filter( + (pair): pair is { cacheKey: string, midPrice: string } => pair !== null, + ); + if (validPairs.length === 0) { + // No valid midPrices to cache + return; + } - const nowSeconds = Math.floor(Date.now() / 1000); // Current time in seconds - client.evalsha( - addMarketPriceScript.hash, - numKeys, - marketCacheKey, - price, - nowSeconds, - callback, - ); + const nowSeconds: number = Math.floor(Date.now() / 1000); // Current time in seconds + // Extract cache keys and prices + const priceValues: string[] = validPairs.map((pair) => pair.midPrice); + const priceCacheKeys: string[] = validPairs.map((pair) => { + logger.info({ + at: 'orderbook-mid-prices-cache#fetchAndCacheOrderbookMidPrices', + message: 'Caching orderbook mid price', + cacheKey: pair.cacheKey, + midPrice: pair.midPrice, }); - }; - evalAsync = evalAsync.bind(client); - - return evalAsync( - getOrderbookMidPriceCacheKey(ticker), - ); + return pair.cacheKey; + }); + + return new Promise((resolve, reject) => { + client.evalsha( + addOrderbookMidPricesScript.hash, + priceCacheKeys.length, + ...priceCacheKeys, + ...priceValues, + nowSeconds, + (err: Error | null) => { + if (err) { + reject(err); + } else { + resolve(); + } + }, + ); + }); } /** - * Retrieves the median price for a given ticker from the cache. - * Uses a Lua script to fetch either the middle element (for odd number of prices) - * or the two middle elements (for even number of prices) from a sorted set in Redis. - * If two middle elements are returned, their average is calculated in JavaScript. + * Retrieves the median prices for a given array of tickers from the cache. * @param client The Redis client - * @param ticker The ticker symbol - * @returns A promise that resolves with the median price as a string, or null if not found + * @param tickers Array of ticker symbols + * @returns A promise that resolves with an object mapping tickers + * to their median prices (as strings) or undefined if not found */ -export async function getMedianPrice(client: RedisClient, ticker: string): Promise { +export async function getMedianPrices( + client: RedisClient, + tickers: string[], +): Promise<{ [ticker: string]: string | undefined }> { + let evalAsync: ( - marketCacheKey: string, - ) => Promise = ( - marketCacheKey, + marketCacheKeys: string[], + ) => Promise = ( + marketCacheKeys, ) => { return new Promise((resolve, reject) => { - const callback: Callback = ( + const callback: Callback = ( err: Error | null, - results: string[], + results: string[][], ) => { if (err) { return reject(err); @@ -95,33 +113,46 @@ export async function getMedianPrice(client: RedisClient, ticker: string): Promi }; client.evalsha( - getMarketMedianScript.hash, - 1, - marketCacheKey, + getOrderbookMidPricesScript.hash, // The Lua script to get cached prices + marketCacheKeys.length, + ...marketCacheKeys, callback, ); }); }; evalAsync = evalAsync.bind(client); - const prices = await evalAsync( - getOrderbookMidPriceCacheKey(ticker), - ); - - if (!prices || prices.length === 0) { - return null; - } - - if (prices.length === 1) { - return Big(prices[0]).toFixed(); - } - - if (prices.length === 2) { - const [price1, price2] = prices.map((price) => { - return Big(price); - }); - return price1.plus(price2).div(2).toFixed(); - } - - return null; + // Map tickers to cache keys + const marketCacheKeys: string[] = tickers.map(getOrderbookMidPriceCacheKey); + // Fetch the prices arrays from Redis (without scores) + const pricesArrays: string[][] = await evalAsync(marketCacheKeys); + + const result: { [ticker: string]: string | undefined } = {}; + tickers.forEach((ticker, index) => { + const prices = pricesArrays[index]; + + // Check if there are any prices + if (!prices || prices.length === 0) { + result[ticker] = undefined; + return; + } + + // Convert the prices to Big.js objects for precision + const bigPrices: Big[] = prices.map((price) => Big(price)); + + // Sort the prices in ascending order + bigPrices.sort((a, b) => a.cmp(b)); + + // Calculate the median + const mid: number = Math.floor(bigPrices.length / 2); + if (bigPrices.length % 2 === 1) { + // Odd number of prices: the middle one is the median + result[ticker] = bigPrices[mid].toFixed(); + } else { + // Even number of prices: average the two middle ones + result[ticker] = bigPrices[mid - 1].plus(bigPrices[mid]).div(2).toFixed(); + } + }); + + return result; } diff --git a/indexer/packages/redis/src/caches/scripts.ts b/indexer/packages/redis/src/caches/scripts.ts index f4f74bffd5..f9ff244e7c 100644 --- a/indexer/packages/redis/src/caches/scripts.ts +++ b/indexer/packages/redis/src/caches/scripts.ts @@ -63,8 +63,8 @@ export const removeOrderScript: LuaScript = newLuaScript('removeOrder', '../scri export const addCanceledOrderIdScript: LuaScript = newLuaScript('addCanceledOrderId', '../scripts/add_canceled_order_id.lua'); export const addStatefulOrderUpdateScript: LuaScript = newLuaScript('addStatefulOrderUpdate', '../scripts/add_stateful_order_update.lua'); export const removeStatefulOrderUpdateScript: LuaScript = newLuaScript('removeStatefulOrderUpdate', '../scripts/remove_stateful_order_update.lua'); -export const addMarketPriceScript: LuaScript = newLuaScript('addMarketPrice', '../scripts/add_market_price.lua'); -export const getMarketMedianScript: LuaScript = newLuaScript('getMarketMedianPrice', '../scripts/get_market_median_price.lua'); +export const addOrderbookMidPricesScript: LuaScript = newLuaScript('addOrderbookMidPrices', '../scripts/add_orderbook_mid_prices.lua'); +export const getOrderbookMidPricesScript: LuaScript = newLuaScript('getOrderbookMidPrices', '../scripts/get_orderbook_mid_prices.lua'); export const allLuaScripts: LuaScript[] = [ deleteZeroPriceLevelScript, @@ -77,6 +77,6 @@ export const allLuaScripts: LuaScript[] = [ addCanceledOrderIdScript, addStatefulOrderUpdateScript, removeStatefulOrderUpdateScript, - addMarketPriceScript, - getMarketMedianScript, + addOrderbookMidPricesScript, + getOrderbookMidPricesScript, ]; diff --git a/indexer/packages/redis/src/scripts/add_market_price.lua b/indexer/packages/redis/src/scripts/add_market_price.lua deleted file mode 100644 index 0e1467bb31..0000000000 --- a/indexer/packages/redis/src/scripts/add_market_price.lua +++ /dev/null @@ -1,17 +0,0 @@ --- Key for the ZSET storing price data -local priceCacheKey = KEYS[1] --- Price to be added -local price = tonumber(ARGV[1]) --- Current timestamp -local nowSeconds = tonumber(ARGV[2]) --- Time window (5 seconds) -local fiveSeconds = 5 - --- 1. Add the price to the sorted set (score is the current timestamp) -redis.call("zadd", priceCacheKey, nowSeconds, price) - --- 2. Remove any entries older than 5 seconds -local cutoffTime = nowSeconds - fiveSeconds -redis.call("zremrangebyscore", priceCacheKey, "-inf", cutoffTime) - -return true \ No newline at end of file diff --git a/indexer/packages/redis/src/scripts/add_orderbook_mid_prices.lua b/indexer/packages/redis/src/scripts/add_orderbook_mid_prices.lua new file mode 100644 index 0000000000..63590b3e05 --- /dev/null +++ b/indexer/packages/redis/src/scripts/add_orderbook_mid_prices.lua @@ -0,0 +1,38 @@ +-- KEYS contains the market cache keys +-- ARGV contains the prices for each market and a single timestamp at the end + +local numKeys = #KEYS +local numArgs = #ARGV + +-- Get the timestamp from the last argument +local timestamp = tonumber(ARGV[numArgs]) + +-- Time window (30 seconds) +local thirtySeconds = 30 + +-- Validate the timestamp +if not timestamp then + return redis.error_reply("Invalid timestamp") +end + +-- Calculate the cutoff time for removing old prices +local cutoffTime = timestamp - thirtySeconds + +-- Iterate through each key (market) and corresponding price +for i = 1, numKeys do + local priceCacheKey = KEYS[i] + local price = tonumber(ARGV[i]) + + -- Validate the price + if not price then + return redis.error_reply("Invalid price for key " .. priceCacheKey) + end + + -- Add the price to the sorted set with the current timestamp as the score + redis.call("ZADD", priceCacheKey, timestamp, price) + + -- Remove entries older than the cutoff time (older than 30 seconds) + redis.call("ZREMRANGEBYSCORE", priceCacheKey, "-inf", cutoffTime) +end + +return true diff --git a/indexer/packages/redis/src/scripts/get_market_median_price.lua b/indexer/packages/redis/src/scripts/get_market_median_price.lua deleted file mode 100644 index a318296f20..0000000000 --- a/indexer/packages/redis/src/scripts/get_market_median_price.lua +++ /dev/null @@ -1,22 +0,0 @@ --- Key for the sorted set storing price data -local priceCacheKey = KEYS[1] - --- Get all the prices from the sorted set (ascending order) -local prices = redis.call('zrange', priceCacheKey, 0, -1) - --- If no prices are found, return nil -if #prices == 0 then - return nil -end - --- Calculate the middle index -local middle = math.floor(#prices / 2) - --- Calculate median -if #prices % 2 == 0 then - -- If even, return both prices, division will be handled in Javascript - return {prices[middle], prices[middle + 1]} -else - -- If odd, return the middle element - return {prices[middle + 1]} -end diff --git a/indexer/packages/redis/src/scripts/get_orderbook_mid_prices.lua b/indexer/packages/redis/src/scripts/get_orderbook_mid_prices.lua new file mode 100644 index 0000000000..d897ccc9f7 --- /dev/null +++ b/indexer/packages/redis/src/scripts/get_orderbook_mid_prices.lua @@ -0,0 +1,10 @@ +-- KEYS is an array of cache keys for a market + +local results = {} +for i, key in ipairs(KEYS) do + -- Get the prices for each key, but limit to a maximum of 10 + local prices = redis.call("ZRANGE", key, 0, 9) + results[i] = prices +end + +return results diff --git a/indexer/services/ender/__tests__/caches/orderbook-mid-price-memory-cache.test.ts b/indexer/services/ender/__tests__/caches/orderbook-mid-price-memory-cache.test.ts new file mode 100644 index 0000000000..62f21dac93 --- /dev/null +++ b/indexer/services/ender/__tests__/caches/orderbook-mid-price-memory-cache.test.ts @@ -0,0 +1,83 @@ +import { OrderbookMidPricesCache } from '@dydxprotocol-indexer/redis'; +import * as orderbookMidPriceMemoryCache from '../../src/caches/orderbook-mid-price-memory-cache'; +import { + dbHelpers, + testMocks, +} from '@dydxprotocol-indexer/postgres'; +import config from '../../src/config'; +import { logger, stats } from '@dydxprotocol-indexer/base'; + +describe('orderbook-mid-price-memory-cache', () => { + + beforeAll(async () => { + await dbHelpers.migrate(); + await dbHelpers.clearData(); + }); + + beforeEach(async () => { + await testMocks.seedData(); + }); + + afterEach(async () => { + await dbHelpers.clearData(); + }); + + describe('getOrderbookMidPrice', () => { + it('should return the mid price for a given ticker', async () => { + jest.spyOn(OrderbookMidPricesCache, 'getMedianPrices') + .mockReturnValue(Promise.resolve({ 'BTC-USD': '300', 'ETH-USD': '200' })); + + await orderbookMidPriceMemoryCache.updateOrderbookMidPrices(); + + expect(orderbookMidPriceMemoryCache.getOrderbookMidPrice('BTC-USD')).toBe('300'); + expect(orderbookMidPriceMemoryCache.getOrderbookMidPrice('ETH-USD')).toBe('200'); + }); + }); + + describe('updateOrderbookMidPrices', () => { + it('should update the orderbook mid price cache', async () => { + const mockMedianPrices = { + 'BTC-USD': '50000', + 'ETH-USD': '3000', + 'SOL-USD': '1000', + }; + + jest.spyOn(OrderbookMidPricesCache, 'getMedianPrices') + .mockResolvedValue(mockMedianPrices); + + await orderbookMidPriceMemoryCache.updateOrderbookMidPrices(); + + expect(orderbookMidPriceMemoryCache.getOrderbookMidPrice('BTC-USD')).toBe('50000'); + expect(orderbookMidPriceMemoryCache.getOrderbookMidPrice('ETH-USD')).toBe('3000'); + expect(orderbookMidPriceMemoryCache.getOrderbookMidPrice('SOL-USD')).toBe('1000'); + }); + + it('should handle errors and log them', async () => { + const mockError = new Error('Test error'); + jest.spyOn(OrderbookMidPricesCache, 'getMedianPrices').mockImplementation(() => { + throw mockError; + }); + + jest.spyOn(logger, 'error'); + await orderbookMidPriceMemoryCache.updateOrderbookMidPrices(); + + expect(logger.error).toHaveBeenCalledWith( + expect.objectContaining({ + at: 'orderbook-mid-price-cache#updateOrderbookMidPrices', + message: 'Failed to fetch OrderbookMidPrices', + error: mockError, + }), + ); + }); + + it('should record timing stats', async () => { + jest.spyOn(stats, 'timing'); + await orderbookMidPriceMemoryCache.updateOrderbookMidPrices(); + + expect(stats.timing).toHaveBeenCalledWith( + `${config.SERVICE_NAME}.update_orderbook_mid_prices_cache.timing`, + expect.any(Number), + ); + }); + }); +}); diff --git a/indexer/services/ender/__tests__/lib/candles-generator.test.ts b/indexer/services/ender/__tests__/lib/candles-generator.test.ts index cd014d3eaf..f20b68354c 100644 --- a/indexer/services/ender/__tests__/lib/candles-generator.test.ts +++ b/indexer/services/ender/__tests__/lib/candles-generator.test.ts @@ -25,6 +25,7 @@ import _ from 'lodash'; import { clearCandlesMap, getCandlesMap, startCandleCache, } from '../../src/caches/candle-cache'; +import * as OrderbookMidPriceMemoryCache from '../../src/caches/orderbook-mid-price-memory-cache'; import config from '../../src/config'; import { CandlesGenerator, getOrderbookMidPriceMap } from '../../src/lib/candles-generator'; import { KafkaPublisher } from '../../src/lib/kafka-publisher'; @@ -34,8 +35,8 @@ import { contentToSingleTradeMessage, createConsolidatedKafkaEventFromTrade } fr import { redisClient } from '../../src/helpers/redis/redis-controller'; import { redis, - OrderbookMidPricesCache, } from '@dydxprotocol-indexer/redis'; +import { ORDERBOOK_MID_PRICES_CACHE_KEY_PREFIX } from '@dydxprotocol-indexer/redis/build/src/caches/orderbook-mid-prices-cache'; describe('candleHelper', () => { beforeAll(async () => { @@ -61,6 +62,12 @@ describe('candleHelper', () => { jest.resetAllMocks(); }); + // Helper function to set a price for a given market ticker + const setCachePrice = (marketTicker: string, price: string) => { + const now = Date.now(); + redisClient.zadd(`${ORDERBOOK_MID_PRICES_CACHE_KEY_PREFIX}${marketTicker}`, now, price); + }; + const defaultPrice: string = defaultTradeContent.price; const defaultPrice2: string = '15000'; const defaultCandle: CandleCreateObject = { @@ -115,11 +122,10 @@ describe('candleHelper', () => { ]); const ticker = 'BTC-USD'; - await Promise.all([ - OrderbookMidPricesCache.setPrice(redisClient, ticker, '100000'), - OrderbookMidPricesCache.setPrice(redisClient, ticker, '105000'), - OrderbookMidPricesCache.setPrice(redisClient, ticker, '110000'), - ]); + setCachePrice(ticker, '100000'); + setCachePrice(ticker, '105000'); + setCachePrice(ticker, '110000'); + await OrderbookMidPriceMemoryCache.updateOrderbookMidPrices(); await runUpdateCandles(publisher); @@ -137,8 +143,8 @@ describe('candleHelper', () => { id: CandleTable.uuid(currentStartedAt, defaultCandle.ticker, resolution), startedAt: currentStartedAt, resolution, - orderbookMidPriceClose: null, - orderbookMidPriceOpen: null, + orderbookMidPriceClose: '105000', + orderbookMidPriceOpen: '105000', }; }, ); @@ -160,11 +166,10 @@ describe('candleHelper', () => { ]); const ticker = 'BTC-USD'; - await Promise.all([ - OrderbookMidPricesCache.setPrice(redisClient, ticker, '80000'), - OrderbookMidPricesCache.setPrice(redisClient, ticker, '81000'), - OrderbookMidPricesCache.setPrice(redisClient, ticker, '80500'), - ]); + setCachePrice(ticker, '80000'); + setCachePrice(ticker, '81000'); + setCachePrice(ticker, '80500'); + await OrderbookMidPriceMemoryCache.updateOrderbookMidPrices(); // Create Perpetual Position to set open position const openInterest: string = '100'; @@ -187,8 +192,8 @@ describe('candleHelper', () => { startedAt: currentStartedAt, resolution, startingOpenInterest: openInterest, - orderbookMidPriceClose: null, - orderbookMidPriceOpen: null, + orderbookMidPriceClose: '80500', + orderbookMidPriceOpen: '80500', }; }, ); @@ -311,8 +316,8 @@ describe('candleHelper', () => { usdVolume: '0', trades: 0, startingOpenInterest: '100', - orderbookMidPriceClose: null, - orderbookMidPriceOpen: null, + orderbookMidPriceClose: '1000', + orderbookMidPriceOpen: '1000', }, true, 1000, @@ -342,8 +347,8 @@ describe('candleHelper', () => { startedAt, resolution: CandleResolution.ONE_MINUTE, startingOpenInterest: '100', - orderbookMidPriceClose: null, - orderbookMidPriceOpen: null, + orderbookMidPriceClose: '1000', + orderbookMidPriceOpen: '1000', }, true, // contains kafka messages 1000, // orderbook mid price @@ -435,7 +440,8 @@ describe('candleHelper', () => { containsKafkaMessages: boolean = true, orderbookMidPrice: number, ) => { - await OrderbookMidPricesCache.setPrice(redisClient, 'BTC-USD', orderbookMidPrice.toFixed()); + setCachePrice('BTC-USD', orderbookMidPrice.toFixed()); + await OrderbookMidPriceMemoryCache.updateOrderbookMidPrices(); if (initialCandle !== undefined) { await CandleTable.create(initialCandle); @@ -471,18 +477,209 @@ describe('candleHelper', () => { expectTimingStats(); }); - it('successfully creates an orderbook price map for each market', async () => { - await Promise.all([ - OrderbookMidPricesCache.setPrice(redisClient, 'BTC-USD', '105000'), - OrderbookMidPricesCache.setPrice(redisClient, 'ISO-USD', '115000'), - OrderbookMidPricesCache.setPrice(redisClient, 'ETH-USD', '150000'), + it('Updates previous candle orderBookMidPriceClose if startTime is past candle resolution', async () => { + // Create existing candles + const existingPrice: string = '7000'; + const startingOpenInterest: string = '200'; + const baseTokenVolume: string = '10'; + const usdVolume: string = Big(existingPrice).times(baseTokenVolume).toString(); + const orderbookMidPriceClose = '7500'; + const orderbookMidPriceOpen = '8000'; + await Promise.all( + _.map(Object.values(CandleResolution), (resolution: CandleResolution) => { + return CandleTable.create({ + startedAt: previousStartedAt, + ticker: testConstants.defaultPerpetualMarket.ticker, + resolution, + low: existingPrice, + high: existingPrice, + open: existingPrice, + close: existingPrice, + baseTokenVolume, + usdVolume, + trades: existingTrades, + startingOpenInterest, + orderbookMidPriceClose, + orderbookMidPriceOpen, + }); + }), + ); + await startCandleCache(); + + setCachePrice('BTC-USD', '10005'); + await OrderbookMidPriceMemoryCache.updateOrderbookMidPrices(); + + const publisher: KafkaPublisher = new KafkaPublisher(); + publisher.addEvents([ + defaultTradeKafkaEvent, + defaultTradeKafkaEvent2, ]); + // Create new candles, with trades + await runUpdateCandles(publisher).then(async () => { + + // Verify previous candles have orderbookMidPriceClose updated + const previousExpectedCandles: CandleFromDatabase[] = _.map( + Object.values(CandleResolution), + (resolution: CandleResolution) => { + return { + id: CandleTable.uuid(previousStartedAt, defaultCandle.ticker, resolution), + startedAt: previousStartedAt, + ticker: defaultCandle.ticker, + resolution, + low: existingPrice, + high: existingPrice, + open: existingPrice, + close: existingPrice, + baseTokenVolume, + usdVolume, + trades: existingTrades, + startingOpenInterest, + orderbookMidPriceClose: '10005', + orderbookMidPriceOpen, + }; + }, + ); + await verifyCandlesInPostgres(previousExpectedCandles); + }); + + // Verify new candles were created + const expectedCandles: CandleFromDatabase[] = _.map( + Object.values(CandleResolution), + (resolution: CandleResolution) => { + const currentStartedAt: IsoString = helpers.calculateNormalizedCandleStartTime( + testConstants.createdDateTime, + resolution, + ).toISO(); + + return { + id: CandleTable.uuid(currentStartedAt, defaultCandle.ticker, resolution), + startedAt: currentStartedAt, + ticker: defaultCandle.ticker, + resolution, + low: '10000', + high: defaultPrice2, + open: '10000', + close: defaultPrice2, + baseTokenVolume: '20', + usdVolume: '250000', + trades: 2, + startingOpenInterest: '0', + orderbookMidPriceClose: '10005', + orderbookMidPriceOpen: '10005', + }; + }, + ); + await verifyCandlesInPostgres(expectedCandles); + await validateCandlesCache(); + expectTimingStats(); + }); + + it('creates an empty candle and updates the previous candle orderBookMidPriceClose if startTime is past candle resolution', async () => { + // Create existing candles + const existingPrice: string = '7000'; + const startingOpenInterest: string = '200'; + const baseTokenVolume: string = '10'; + const usdVolume: string = Big(existingPrice).times(baseTokenVolume).toString(); + const orderbookMidPriceClose = '7500'; + const orderbookMidPriceOpen = '8000'; + + await Promise.all( + _.map(Object.values(CandleResolution), (resolution: CandleResolution) => { + return CandleTable.create({ + startedAt: previousStartedAt, + ticker: testConstants.defaultPerpetualMarket.ticker, + resolution, + low: existingPrice, + high: existingPrice, + open: existingPrice, + close: existingPrice, + baseTokenVolume, + usdVolume, + trades: existingTrades, + startingOpenInterest, + orderbookMidPriceClose, + orderbookMidPriceOpen, + }); + }), + ); + await startCandleCache(); + + setCachePrice('BTC-USD', '10005'); + await OrderbookMidPriceMemoryCache.updateOrderbookMidPrices(); + + const publisher: KafkaPublisher = new KafkaPublisher(); + publisher.addEvents([]); + + // Create new candles, without trades + await runUpdateCandles(publisher); + + // Verify previous candles have orderbookMidPriceClose updated + const previousExpectedCandles: CandleFromDatabase[] = _.map( + Object.values(CandleResolution), + (resolution: CandleResolution) => { + return { + id: CandleTable.uuid(previousStartedAt, defaultCandle.ticker, resolution), + startedAt: previousStartedAt, + ticker: defaultCandle.ticker, + resolution, + low: existingPrice, + high: existingPrice, + open: existingPrice, + close: existingPrice, + baseTokenVolume, + usdVolume, + trades: existingTrades, + startingOpenInterest, + orderbookMidPriceClose: '10005', + orderbookMidPriceOpen, + }; + }, + ); + await verifyCandlesInPostgres(previousExpectedCandles); + + // Verify new empty candle was created + const expectedCandles: CandleFromDatabase[] = _.map( + Object.values(CandleResolution), + (resolution: CandleResolution) => { + const currentStartedAt: IsoString = helpers.calculateNormalizedCandleStartTime( + testConstants.createdDateTime, + resolution, + ).toISO(); + + return { + id: CandleTable.uuid(currentStartedAt, defaultCandle.ticker, resolution), + startedAt: currentStartedAt, + ticker: defaultCandle.ticker, + resolution, + low: existingPrice, + high: existingPrice, + open: existingPrice, + close: existingPrice, + baseTokenVolume: '0', + usdVolume: '0', + trades: 0, + startingOpenInterest: '0', + orderbookMidPriceClose: '10005', + orderbookMidPriceOpen: '10005', + }; + }, + ); + await verifyCandlesInPostgres(expectedCandles); + + }); + + it('successfully creates an orderbook price map for each market', async () => { + setCachePrice('BTC-USD', '105000'); + setCachePrice('ISO-USD', '115000'); + setCachePrice('ETH-USD', '150000'); + await OrderbookMidPriceMemoryCache.updateOrderbookMidPrices(); + const map = await getOrderbookMidPriceMap(); expect(map).toEqual({ - 'BTC-USD': undefined, - 'ETH-USD': undefined, - 'ISO-USD': undefined, + 'BTC-USD': '105000', + 'ETH-USD': '150000', + 'ISO-USD': '115000', 'ISO2-USD': undefined, 'SHIB-USD': undefined, }); diff --git a/indexer/services/ender/src/caches/orderbook-mid-price-memory-cache.ts b/indexer/services/ender/src/caches/orderbook-mid-price-memory-cache.ts new file mode 100644 index 0000000000..7a6e61998e --- /dev/null +++ b/indexer/services/ender/src/caches/orderbook-mid-price-memory-cache.ts @@ -0,0 +1,69 @@ +import { logger, stats } from '@dydxprotocol-indexer/base'; +import { + PerpetualMarketFromDatabase, + perpetualMarketRefresher, + loopHelpers, +} from '@dydxprotocol-indexer/postgres'; +import { OrderbookMidPricesCache } from '@dydxprotocol-indexer/redis'; + +import config from '../config'; +import { redisClient } from '../helpers/redis/redis-controller'; + +interface OrderbookMidPriceCache { + [ticker: string]: string | undefined, +} + +let orderbookMidPriceCache: OrderbookMidPriceCache = {}; + +/** + * Refresh loop to cache the list of all perpetual markets from the database in-memory. + */ +export async function start(): Promise { + await loopHelpers.startUpdateLoop( + updateOrderbookMidPrices, + config.ORDERBOOK_MID_PRICE_REFRESH_INTERVAL_MS, + 'updateOrderbookMidPrices', + ); +} + +export function getOrderbookMidPrice(ticker: string): string | undefined { + return orderbookMidPriceCache[ticker]; +} + +export async function updateOrderbookMidPrices(): Promise { + const startTime: number = Date.now(); + try { + const perpetualMarkets: PerpetualMarketFromDatabase[] = Object.values( + perpetualMarketRefresher.getPerpetualMarketsMap(), + ); + + const tickers: string[] = perpetualMarkets.map((market) => market.ticker); + + orderbookMidPriceCache = await OrderbookMidPricesCache.getMedianPrices( + redisClient, + tickers, + ); + + // Log out each median price for each market + Object.entries(orderbookMidPriceCache).forEach(([ticker, price]) => { + logger.info({ + at: 'orderbook-mid-price-cache#updateOrderbookMidPrices', + message: `Median price for market ${ticker}`, + ticker, + price, + }); + }); + + } catch (error) { + logger.error({ + at: 'orderbook-mid-price-cache#updateOrderbookMidPrices', + message: 'Failed to fetch OrderbookMidPrices', + error, + }); + } finally { + stats.timing( + `${config.SERVICE_NAME}.update_orderbook_mid_prices_cache.timing`, + Date.now() - startTime, + ); + } +} diff --git a/indexer/services/ender/src/config.ts b/indexer/services/ender/src/config.ts index 201f7807ee..0e940650e1 100644 --- a/indexer/services/ender/src/config.ts +++ b/indexer/services/ender/src/config.ts @@ -7,6 +7,7 @@ import { baseConfigSchema, parseBoolean, parseString, + parseInteger, } from '@dydxprotocol-indexer/base'; import { kafkaConfigSchema, @@ -31,6 +32,7 @@ export const configSchema = { SKIP_STATEFUL_ORDER_UUIDS: parseString({ default: '', }), + ORDERBOOK_MID_PRICE_REFRESH_INTERVAL_MS: parseInteger({ default: 10_000 }), // 10 seconds }; export default parseSchema(configSchema); diff --git a/indexer/services/ender/src/index.ts b/indexer/services/ender/src/index.ts index c72d8542ee..07702f9b7d 100644 --- a/indexer/services/ender/src/index.ts +++ b/indexer/services/ender/src/index.ts @@ -5,6 +5,7 @@ import { } from '@dydxprotocol-indexer/postgres'; import { initializeAllCaches } from './caches/block-cache'; +import * as OrderbookMidPriceMemoryCache from './caches/orderbook-mid-price-memory-cache'; import config from './config'; import { connect } from './helpers/kafka/kafka-controller'; import { createPostgresFunctions } from './helpers/postgres/postgres-functions'; @@ -28,8 +29,10 @@ async function startKafka(): Promise { ]); // Ender does not need to refresh its caches in a loop because Ender is the only service that // writes to the key attributes of perpetual_markets, asset_refresher, and market_refresher - // The only exception are the aggregated properties of perpetual_markets + // The two exceptions are the aggregated properties of perpetual_markets and the + // OrderbookMidPriceMemoryCache await initializeAllCaches(); + wrapBackgroundTask(OrderbookMidPriceMemoryCache.start(), true, 'startUpdateOrderbookMidPrices'); await connect(); await startConsumer(); diff --git a/indexer/services/ender/src/lib/candles-generator.ts b/indexer/services/ender/src/lib/candles-generator.ts index f1daa75f06..744c17f92e 100644 --- a/indexer/services/ender/src/lib/candles-generator.ts +++ b/indexer/services/ender/src/lib/candles-generator.ts @@ -26,6 +26,7 @@ import _ from 'lodash'; import { DateTime } from 'luxon'; import { getCandle } from '../caches/candle-cache'; +import { getOrderbookMidPrice } from '../caches/orderbook-mid-price-memory-cache'; import config from '../config'; import { KafkaPublisher } from './kafka-publisher'; import { ConsolidatedKafkaEvent, SingleTradeMessage } from './types'; @@ -169,7 +170,7 @@ export class CandlesGenerator { const promises: Promise[] = []; const openInterestMap: OpenInterestMap = await this.getOpenInterestMap(); - const orderbookMidPriceMap = await getOrderbookMidPriceMap(); + const orderbookMidPriceMap = getOrderbookMidPriceMap(); _.forEach( Object.values(perpetualMarketRefresher.getPerpetualMarketsMap()), (perpetualMarket: PerpetualMarketFromDatabase) => { @@ -531,18 +532,13 @@ export class CandlesGenerator { /** * Get the cached orderbook mid price for a given ticker */ -export async function getOrderbookMidPriceMap(): Promise<{ [ticker: string]: OrderbookMidPrice }> { +export function getOrderbookMidPriceMap(): { [ticker: string]: OrderbookMidPrice } { const start: number = Date.now(); const perpetualMarkets = Object.values(perpetualMarketRefresher.getPerpetualMarketsMap()); - const promises = perpetualMarkets.map(async (perpetualMarket: PerpetualMarketFromDatabase) => { - return Promise.resolve({ [perpetualMarket.ticker]: undefined }); - }); - - const pricesArray = await Promise.all(promises); const priceMap: { [ticker: string]: OrderbookMidPrice } = {}; - pricesArray.forEach((price) => { - Object.assign(priceMap, price); + perpetualMarkets.forEach((perpetualMarket: PerpetualMarketFromDatabase) => { + priceMap[perpetualMarket.ticker] = getOrderbookMidPrice(perpetualMarket.ticker); }); stats.timing(`${config.SERVICE_NAME}.get_orderbook_mid_price_map.timing`, Date.now() - start); diff --git a/indexer/services/roundtable/__tests__/tasks/cache-orderbook-mid-prices.test.ts b/indexer/services/roundtable/__tests__/tasks/cache-orderbook-mid-prices.test.ts new file mode 100644 index 0000000000..84ffc577e0 --- /dev/null +++ b/indexer/services/roundtable/__tests__/tasks/cache-orderbook-mid-prices.test.ts @@ -0,0 +1,95 @@ +import { + dbHelpers, + PerpetualMarketFromDatabase, + PerpetualMarketTable, + testConstants, + testMocks, + perpetualMarketRefresher, +} from '@dydxprotocol-indexer/postgres'; +import { + OrderbookLevelsCache, + OrderbookMidPricesCache, + redis, +} from '@dydxprotocol-indexer/redis'; +import { redisClient } from '../../src/helpers/redis'; +import runTask from '../../src/tasks/cache-orderbook-mid-prices'; + +describe('cache-orderbook-mid-prices', () => { + beforeEach(async () => { + await redis.deleteAllAsync(redisClient); + await testMocks.seedData(); + await perpetualMarketRefresher.updatePerpetualMarkets(); + }); + + afterAll(() => { + jest.restoreAllMocks(); + }); + + beforeAll(async () => { + await dbHelpers.migrate(); + await dbHelpers.clearData(); + }); + + afterEach(async () => { + await dbHelpers.clearData(); + await perpetualMarketRefresher.clear(); + }); + + it('caches mid prices for all markets', async () => { + const market1: PerpetualMarketFromDatabase | undefined = await PerpetualMarketTable + .findByMarketId( + testConstants.defaultMarket.id, + ); + const market2: PerpetualMarketFromDatabase | undefined = await PerpetualMarketTable + .findByMarketId( + testConstants.defaultMarket2.id, + ); + const market3: PerpetualMarketFromDatabase | undefined = await PerpetualMarketTable + .findByMarketId( + testConstants.defaultMarket3.id, + ); + if (!market1 || !market2 || !market3) { + throw new Error('Test market not found'); + } + + jest.spyOn(OrderbookLevelsCache, 'getOrderBookMidPrice') + .mockReturnValueOnce(Promise.resolve('200')) + .mockReturnValueOnce(Promise.resolve('300')) + .mockReturnValueOnce(Promise.resolve('400')); + + await runTask(); + expect(OrderbookLevelsCache.getOrderBookMidPrice).toHaveBeenCalledTimes(5); + + const prices: {[ticker: string]: string | undefined} = await + OrderbookMidPricesCache.getMedianPrices( + redisClient, + [market1.ticker, market2.ticker, market3.ticker], + ); + + expect(prices[market1.ticker]).toBe('200'); + expect(prices[market2.ticker]).toBe('300'); + expect(prices[market3.ticker]).toBe('400'); + }); + + it('handles undefined prices', async () => { + const market1 = await PerpetualMarketTable + .findByMarketId( + testConstants.defaultMarket.id, + ); + + if (!market1) { + throw new Error('Market 1 not found'); + } + + jest.spyOn(PerpetualMarketTable, 'findAll') + .mockReturnValueOnce(Promise.resolve([market1] as PerpetualMarketFromDatabase[])); + + jest.spyOn(OrderbookLevelsCache, 'getOrderBookMidPrice') + .mockReturnValueOnce(Promise.resolve(undefined)); + + await runTask(); + + const price = await OrderbookMidPricesCache.getMedianPrices(redisClient, [market1.ticker]); + expect(price).toEqual({ 'BTC-USD': undefined }); + }); +}); diff --git a/indexer/services/roundtable/src/config.ts b/indexer/services/roundtable/src/config.ts index 3d03fd0e19..0e80bffbe1 100644 --- a/indexer/services/roundtable/src/config.ts +++ b/indexer/services/roundtable/src/config.ts @@ -61,6 +61,7 @@ export const configSchema = { LOOPS_ENABLED_UPDATE_AFFILIATE_INFO: parseBoolean({ default: true }), LOOPS_ENABLED_DELETE_OLD_FIREBASE_NOTIFICATION_TOKENS: parseBoolean({ default: true }), LOOPS_ENABLED_REFRESH_VAULT_PNL: parseBoolean({ default: true }), + LOOPS_ENABLED_CACHE_ORDERBOOK_MID_PRICES: parseBoolean({ default: true }), // Loop Timing LOOPS_INTERVAL_MS_MARKET_UPDATER: parseInteger({ @@ -139,7 +140,7 @@ export const configSchema = { default: 30 * ONE_DAY_IN_MILLISECONDS, }), LOOPS_INTERVAL_MS_CACHE_ORDERBOOK_MID_PRICES: parseInteger({ - default: ONE_SECOND_IN_MILLISECONDS, + default: TEN_SECONDS_IN_MILLISECONDS, }), LOOPS_INTERVAL_MS_REFRESH_VAULT_PNL: parseInteger({ default: 5 * ONE_MINUTE_IN_MILLISECONDS, diff --git a/indexer/services/roundtable/src/index.ts b/indexer/services/roundtable/src/index.ts index 5a2db7c728..f33d552f8a 100644 --- a/indexer/services/roundtable/src/index.ts +++ b/indexer/services/roundtable/src/index.ts @@ -10,6 +10,7 @@ import { connect as connectToRedis, } from './helpers/redis'; import aggregateTradingRewardsTasks from './tasks/aggregate-trading-rewards'; +import cacheOrderbookMidPrices from './tasks/cache-orderbook-mid-prices'; import cancelStaleOrdersTask from './tasks/cancel-stale-orders'; import createLeaderboardTask from './tasks/create-leaderboard'; import createPnlTicksTask from './tasks/create-pnl-ticks'; @@ -280,6 +281,13 @@ async function start(): Promise { config.LOOPS_INTERVAL_MS_REFRESH_VAULT_PNL, ); } + if (config.LOOPS_ENABLED_CACHE_ORDERBOOK_MID_PRICES) { + startLoop( + cacheOrderbookMidPrices, + 'cache-orderbook-mid-prices', + config.LOOPS_INTERVAL_MS_CACHE_ORDERBOOK_MID_PRICES, + ); + } logger.info({ at: 'index', diff --git a/indexer/services/roundtable/src/tasks/cache-orderbook-mid-prices.ts b/indexer/services/roundtable/src/tasks/cache-orderbook-mid-prices.ts new file mode 100644 index 0000000000..3705d07d3a --- /dev/null +++ b/indexer/services/roundtable/src/tasks/cache-orderbook-mid-prices.ts @@ -0,0 +1,52 @@ +import { + logger, +} from '@dydxprotocol-indexer/base'; +import { + PerpetualMarketFromDatabase, + perpetualMarketRefresher, +} from '@dydxprotocol-indexer/postgres'; +import { + OrderbookMidPricesCache, +} from '@dydxprotocol-indexer/redis'; + +import { redisClient } from '../helpers/redis'; + +/** + * Updates OrderbookMidPricesCache with current orderbook mid price for each market + */ +export default async function runTask(): Promise { + try { + let perpetualMarkets: PerpetualMarketFromDatabase[] = Object.values( + perpetualMarketRefresher.getPerpetualMarketsMap(), + ); + let marketTickers: string[] = perpetualMarkets.map( + (market: PerpetualMarketFromDatabase) => market.ticker, + ); + + // If no markets found, try updating the perpetual markets cache and fetch again + if (marketTickers.length === 0) { + await perpetualMarketRefresher.updatePerpetualMarkets(); + perpetualMarkets = Object.values(perpetualMarketRefresher.getPerpetualMarketsMap()); + marketTickers = perpetualMarkets.map( + (market: PerpetualMarketFromDatabase) => market.ticker, + ); + + if (marketTickers.length === 0) { + throw new Error('perpetualMarketRefresher is empty'); + } + } + + logger.info({ + at: 'cache-orderbook-mid-prices#runTask', + message: 'Caching orderbook mid prices for markets', + markets: marketTickers.join(', '), + }); + await OrderbookMidPricesCache.fetchAndCacheOrderbookMidPrices(redisClient, marketTickers); + } catch (error) { + logger.error({ + at: 'cache-orderbook-mid-prices#runTask', + message: (error as Error).message, + error, + }); + } +}