From d7a971c607d194911014bf34a84bfd6886fe9228 Mon Sep 17 00:00:00 2001 From: Paul Puey Date: Fri, 9 Feb 2024 22:54:55 -0800 Subject: [PATCH 1/7] Change getAnalytic to take just one partnerId We never used more than one partnerId and it simplifies the code for a future refactoring commit --- src/apiAnalytics.ts | 19 ++++------------- src/cacheEngine.ts | 8 +++---- src/dbutils.ts | 52 ++++++++++++++++----------------------------- 3 files changed, 26 insertions(+), 53 deletions(-) diff --git a/src/apiAnalytics.ts b/src/apiAnalytics.ts index e13dbbd7..117b4195 100644 --- a/src/apiAnalytics.ts +++ b/src/apiAnalytics.ts @@ -1,3 +1,5 @@ +import { AnalyticsResult } from './types' + interface UtcValues { y: number m: number @@ -22,19 +24,6 @@ interface Bucket { currencyPairs: { [currencyPair: string]: number } } -interface AnalyticsResult { - result: { - hour: Bucket[] - day: Bucket[] - month: Bucket[] - numAllTxs: number - } - appId: string - partnerId: string - start: number - end: number -} - export const getAnalytics = ( txs: DbTx[], start: number, @@ -126,14 +115,14 @@ export const getAnalytics = ( } } - const analyticsResult = { + const analyticsResult: AnalyticsResult = { result: { month: monthArray, day: dayArray, hour: hourArray, numAllTxs: txs.length }, - appId, + app: appId, partnerId, start: start, end: end diff --git a/src/cacheEngine.ts b/src/cacheEngine.ts index b860a3b4..d1ae54e1 100644 --- a/src/cacheEngine.ts +++ b/src/cacheEngine.ts @@ -65,17 +65,17 @@ export async function cacheEngine(): Promise { continue } for (const timePeriod of TIME_PERIODS) { - const data = await getAnalytic( + const result = await getAnalytic( start, end, app.appId, - [partnerId], + partnerId, timePeriod, reportsTransactions ) // Create cache docs - if (data.length > 0) { - const cacheResult = data[0].result[timePeriod].map(bucket => { + if (result != null) { + const cacheResult = result[timePeriod].map(bucket => { return { _id: `${app.appId}_${partnerId}:${bucket.isoDate}`, timestamp: bucket.start, diff --git a/src/dbutils.ts b/src/dbutils.ts index 9ee6c132..15094da9 100644 --- a/src/dbutils.ts +++ b/src/dbutils.ts @@ -54,10 +54,10 @@ export const getAnalytic = async ( start: number, end: number, appId: string, - partnerIds: string[], + partnerId: string, timePeriod: string, transactionDatabase: any -): Promise => { +): Promise => { const query = { selector: { status: { $eq: 'complete' }, @@ -75,41 +75,25 @@ export const getAnalytic = async ( sort: ['timestamp'], limit: 1000000 } - const results: any[] = [] - const promises: Array> = [] try { - for (const partnerId of partnerIds) { - const appAndPartnerId = `${appId}_${partnerId}` - const result = transactionDatabase - .partitionedFind(appAndPartnerId, query) - .then(data => { - const analytic = getAnalytics( - asDbReq(data).docs, - start, - end, - appId, - appAndPartnerId, - timePeriod - ) - if (analytic.result.numAllTxs > 0) results.push(analytic) - }) - promises.push(result) - } - console.time(`${appId} promiseAll`) - await Promise.all(promises) - console.timeEnd(`${appId} promiseAll`) - return results.sort((a, b) => { - if (a.pluginId < b.pluginId) { - return -1 - } - if (a.pluginId > b.pluginId) { - return 1 - } - return 0 - }) + const appAndPartnerId = `${appId}_${partnerId}` + const data = await transactionDatabase.partitionedFind( + appAndPartnerId, + query + ) + + const analytic = getAnalytics( + asDbReq(data).docs, + start, + end, + appId, + appAndPartnerId, + timePeriod + ) + return analytic.result.numAllTxs > 0 ? analytic : undefined } catch (e) { console.log(e) - return `Internal server error.` + throw new Error(`getAnalytic: Internal server error.`) } } From 4749ad12e37a3204452af8197adf60b7a7ecd2c3 Mon Sep 17 00:00:00 2001 From: Paul Puey Date: Fri, 9 Feb 2024 23:06:38 -0800 Subject: [PATCH 2/7] Do one data query and re-use for each timeperiod --- src/cacheEngine.ts | 48 ++++++++++++++++++++++++++++++++++++++----- src/dbutils.ts | 51 +++------------------------------------------- 2 files changed, 46 insertions(+), 53 deletions(-) diff --git a/src/cacheEngine.ts b/src/cacheEngine.ts index d1ae54e1..4843bed1 100644 --- a/src/cacheEngine.ts +++ b/src/cacheEngine.ts @@ -2,8 +2,9 @@ import startOfMonth from 'date-fns/startOfMonth' import sub from 'date-fns/sub' import nano from 'nano' +import { getAnalytics } from './apiAnalytics' import { config } from './config' -import { getAnalytic } from './dbutils' +import { asDbReq } from './dbutils' import { initDbs } from './initDbs' import { asApps } from './types' import { datelog, snooze } from './util' @@ -64,15 +65,52 @@ export async function cacheEngine(): Promise { ) { continue } + + const query = { + selector: { + status: { $eq: 'complete' }, + usdValue: { $gte: 0 }, + timestamp: { $gte: start, $lt: end } + }, + fields: [ + 'orderId', + 'depositCurrency', + 'payoutCurrency', + 'timestamp', + 'usdValue' + ], + use_index: 'timestamp-p', + sort: ['timestamp'], + limit: 1000000 + } + const appAndPartnerId = `${app.appId}_${partnerId}` + let data + try { + data = await reportsTransactions.partitionedFind( + appAndPartnerId, + query + ) + } catch (e) { + datelog('Error fetching transactions', e) + console.error(e) + continue + } + + const dbReq = asDbReq(data) + const dbTxs = dbReq.docs + for (const timePeriod of TIME_PERIODS) { - const result = await getAnalytic( + const analytic = getAnalytics( + dbTxs, start, end, app.appId, - partnerId, - timePeriod, - reportsTransactions + appAndPartnerId, + timePeriod ) + const { result } = analytic + if (result.numAllTxs === 0) continue + // Create cache docs if (result != null) { const cacheResult = result[timePeriod].map(bucket => { diff --git a/src/dbutils.ts b/src/dbutils.ts index 15094da9..7f8dbc15 100644 --- a/src/dbutils.ts +++ b/src/dbutils.ts @@ -9,7 +9,7 @@ import { datelog, promiseTimeout } from './util' const BATCH_ADVANCE = 100 const SIX_DAYS_IN_SECONDS = 6 * 24 * 60 * 60 -const asDbReq = asObject({ +export const asDbReq = asObject({ docs: asArray( asObject({ orderId: asString, @@ -21,6 +21,8 @@ const asDbReq = asObject({ ) }) +export type DbReq = ReturnType + export const pagination = async ( txArray: any[], partition: nano.DocumentScope @@ -50,53 +52,6 @@ export const pagination = async ( datelog(`total errors: ${numErrors}`) } -export const getAnalytic = async ( - start: number, - end: number, - appId: string, - partnerId: string, - timePeriod: string, - transactionDatabase: any -): Promise => { - const query = { - selector: { - status: { $eq: 'complete' }, - usdValue: { $gte: 0 }, - timestamp: { $gte: start, $lt: end } - }, - fields: [ - 'orderId', - 'depositCurrency', - 'payoutCurrency', - 'timestamp', - 'usdValue' - ], - use_index: 'timestamp-p', - sort: ['timestamp'], - limit: 1000000 - } - try { - const appAndPartnerId = `${appId}_${partnerId}` - const data = await transactionDatabase.partitionedFind( - appAndPartnerId, - query - ) - - const analytic = getAnalytics( - asDbReq(data).docs, - start, - end, - appId, - appAndPartnerId, - timePeriod - ) - return analytic.result.numAllTxs > 0 ? analytic : undefined - } catch (e) { - console.log(e) - throw new Error(`getAnalytic: Internal server error.`) - } -} - export const cacheAnalytic = async ( start: number, end: number, From bb870ed3b95783ba201693d221a806ba9111db1b Mon Sep 17 00:00:00 2001 From: Paul Puey Date: Fri, 9 Feb 2024 23:48:37 -0800 Subject: [PATCH 3/7] Paginate cache updates into 60 day chunks --- src/cacheEngine.ts | 108 +++++++++++++++++++++++++-------------------- 1 file changed, 60 insertions(+), 48 deletions(-) diff --git a/src/cacheEngine.ts b/src/cacheEngine.ts index 4843bed1..55ac9019 100644 --- a/src/cacheEngine.ts +++ b/src/cacheEngine.ts @@ -13,6 +13,7 @@ const CACHE_UPDATE_LOOKBACK_MONTHS = config.cacheLookbackMonths ?? 3 const BULK_WRITE_SIZE = 50 const UPDATE_FREQUENCY_MS = 1000 * 60 * 30 +const CACHE_UPDATE_BLOCK_S = 60 * 60 * 24 * 60 // 60 days const nanoDb = nano(config.couchDbFullpath) @@ -66,53 +67,62 @@ export async function cacheEngine(): Promise { continue } - const query = { - selector: { - status: { $eq: 'complete' }, - usdValue: { $gte: 0 }, - timestamp: { $gte: start, $lt: end } - }, - fields: [ - 'orderId', - 'depositCurrency', - 'payoutCurrency', - 'timestamp', - 'usdValue' - ], - use_index: 'timestamp-p', - sort: ['timestamp'], - limit: 1000000 - } - const appAndPartnerId = `${app.appId}_${partnerId}` - let data - try { - data = await reportsTransactions.partitionedFind( - appAndPartnerId, - query - ) - } catch (e) { - datelog('Error fetching transactions', e) - console.error(e) - continue - } + for ( + let localStart = start; + localStart < end + CACHE_UPDATE_BLOCK_S; + localStart += CACHE_UPDATE_BLOCK_S + ) { + const localEnd = localStart + CACHE_UPDATE_BLOCK_S + + const query = { + selector: { + status: { $eq: 'complete' }, + usdValue: { $gte: 0 }, + timestamp: { $gte: localStart, $lt: localEnd } + }, + fields: [ + 'orderId', + 'depositCurrency', + 'payoutCurrency', + 'timestamp', + 'usdValue' + ], + use_index: 'timestamp-p', + sort: ['timestamp'], + limit: 1000000 + } + const appAndPartnerId = `${app.appId}_${partnerId}` + + let data + try { + data = await reportsTransactions.partitionedFind( + appAndPartnerId, + query + ) + } catch (e) { + datelog('Error fetching transactions', e) + console.error(e) + continue + } - const dbReq = asDbReq(data) - const dbTxs = dbReq.docs + const dbReq = asDbReq(data) + const dbTxs = dbReq.docs + + if (dbTxs.length === 0) continue - for (const timePeriod of TIME_PERIODS) { const analytic = getAnalytics( dbTxs, - start, - end, + localStart, + localEnd, app.appId, appAndPartnerId, - timePeriod + TIME_PERIODS.join(',') ) const { result } = analytic + if (result == null) continue if (result.numAllTxs === 0) continue - - // Create cache docs - if (result != null) { + for (const timePeriod of TIME_PERIODS) { + // Create cache docs const cacheResult = result[timePeriod].map(bucket => { return { _id: `${app.appId}_${partnerId}:${bucket.isoDate}`, @@ -131,7 +141,7 @@ export async function cacheEngine(): Promise { database = reportsMonth } // Fetch existing _revs of cache - if (start !== new Date(2017, 1, 20).getTime() / 1000) { + if (localStart !== new Date(2017, 1, 20).getTime() / 1000) { const documentIds = cacheResult.map(cache => { return cache._id }) @@ -151,24 +161,26 @@ export async function cacheEngine(): Promise { ) for ( - let start = 0; - start < cacheResult.length; - start += BULK_WRITE_SIZE + let writeStart = 0; + writeStart < cacheResult.length; + writeStart += BULK_WRITE_SIZE ) { - const end = - start + BULK_WRITE_SIZE > cacheResult.length + const writeEnd = + writeStart + BULK_WRITE_SIZE > cacheResult.length ? cacheResult.length - : start + BULK_WRITE_SIZE + : writeStart + BULK_WRITE_SIZE // datelog(`Bulk writing docs ${start} to ${end - 1}`) - const docs = cacheResult.slice(start, end) + const docs = cacheResult.slice(writeStart, writeEnd) datelog( - `Bulk writing docs ${start} to ${end} of ${cacheResult.length.toString()}` + `Bulk writing docs ${writeStart} to ${writeEnd} of ${cacheResult.length.toString()}` ) await database.bulk({ docs }) } + const dateStart = new Date(localStart * 1000).toISOString() + const dateEnd = new Date(localEnd * 1000).toISOString() datelog( - `Finished updating ${timePeriod} cache for ${app.appId}_${partnerId}` + `Finished updating ${timePeriod} ${dateStart} ${dateEnd} cache for ${app.appId}_${partnerId}` ) } catch (e) { datelog('Error doing bulk cache update', e) From ffac794e302a63ec1e78575cce11f7f49c2ff53f Mon Sep 17 00:00:00 2001 From: Paul Puey Date: Sat, 10 Feb 2024 17:12:58 -0800 Subject: [PATCH 4/7] Add timestamp index for transactions db --- src/initDbs.ts | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/src/initDbs.ts b/src/initDbs.ts index 93f8d1cc..878ef7a9 100644 --- a/src/initDbs.ts +++ b/src/initDbs.ts @@ -15,7 +15,8 @@ const transactionIndexFields: string[][] = [ ['status', 'payoutCurrency', 'isoDate'], ['status', 'usdValue'], ['status', 'usdvalue', 'timestamp'], - ['usdValue'] + ['usdValue'], + ['timestamp'] ] interface Index { From ca2899d09ddd8726c3ee1266e0a74db77dd623dd Mon Sep 17 00:00:00 2001 From: Paul Puey Date: Sat, 10 Feb 2024 17:20:27 -0800 Subject: [PATCH 5/7] Fix partial cache updates Use of date-fns introduces 2 bugs 1. It incorrectly adds and subtracts months due to differences in # of days in a month. ie 2024-02-01 minus 3 months is 2023-10-31. This possibly causes missed transactions or txs getting double counted 2. Results of date manipulation are in local time zone, not UTC We instead use explicit UTC date manipulation. --- src/cacheEngine.ts | 61 +++++++++++++++++++++++++++++----------------- src/util.ts | 21 ++++++++++++++++ 2 files changed, 59 insertions(+), 23 deletions(-) diff --git a/src/cacheEngine.ts b/src/cacheEngine.ts index 55ac9019..92e9b31d 100644 --- a/src/cacheEngine.ts +++ b/src/cacheEngine.ts @@ -1,5 +1,3 @@ -import startOfMonth from 'date-fns/startOfMonth' -import sub from 'date-fns/sub' import nano from 'nano' import { getAnalytics } from './apiAnalytics' @@ -7,17 +5,26 @@ import { config } from './config' import { asDbReq } from './dbutils' import { initDbs } from './initDbs' import { asApps } from './types' -import { datelog, snooze } from './util' +import { + datelog, + getStartOfMonthsAgo, + getStartOfMonthsFromNow, + snooze +} from './util' const CACHE_UPDATE_LOOKBACK_MONTHS = config.cacheLookbackMonths ?? 3 const BULK_WRITE_SIZE = 50 const UPDATE_FREQUENCY_MS = 1000 * 60 * 30 -const CACHE_UPDATE_BLOCK_S = 60 * 60 * 24 * 60 // 60 days +const CACHE_UPDATE_BLOCK_MONTHS = 3 const nanoDb = nano(config.couchDbFullpath) const TIME_PERIODS = ['hour', 'day', 'month'] +const birthdayStart = getStartOfMonthsAgo( + new Date('2017-01Z').toISOString(), + 0 +).toISOString() export async function cacheEngine(): Promise { datelog('Starting Cache Engine') @@ -32,17 +39,18 @@ export async function cacheEngine(): Promise { const reportsMonth = nanoDb.use('reports_month') while (true) { - let start - const end = new Date(Date.now()).getTime() / 1000 + let start: string + const now = new Date().toISOString() + const end = getStartOfMonthsFromNow(now, 1).toISOString() try { await reportsMonth.get('initialized:initialized') - const monthStart = startOfMonth(new Date(Date.now())) - start = - sub(monthStart, { months: CACHE_UPDATE_LOOKBACK_MONTHS }).getTime() / - 1000 + start = getStartOfMonthsAgo( + now, + CACHE_UPDATE_LOOKBACK_MONTHS + ).toISOString() } catch (e) { - start = new Date(2017, 1, 20).getTime() / 1000 + start = birthdayStart } const query = { @@ -68,17 +76,25 @@ export async function cacheEngine(): Promise { } for ( - let localStart = start; - localStart < end + CACHE_UPDATE_BLOCK_S; - localStart += CACHE_UPDATE_BLOCK_S + let localStart: string = start; + localStart < end; + localStart = getStartOfMonthsFromNow( + localStart, + CACHE_UPDATE_BLOCK_MONTHS + ).toISOString() ) { - const localEnd = localStart + CACHE_UPDATE_BLOCK_S + const localEnd = getStartOfMonthsFromNow( + localStart, + CACHE_UPDATE_BLOCK_MONTHS + ).toISOString() + const tsStart = new Date(localStart).getTime() / 1000 + const tsEnd = new Date(localEnd).getTime() / 1000 const query = { selector: { status: { $eq: 'complete' }, usdValue: { $gte: 0 }, - timestamp: { $gte: localStart, $lt: localEnd } + timestamp: { $gte: tsStart, $lt: tsEnd } }, fields: [ 'orderId', @@ -95,6 +111,7 @@ export async function cacheEngine(): Promise { let data try { + datelog('find txs', appAndPartnerId, localStart, localEnd) data = await reportsTransactions.partitionedFind( appAndPartnerId, query @@ -102,7 +119,7 @@ export async function cacheEngine(): Promise { } catch (e) { datelog('Error fetching transactions', e) console.error(e) - continue + break } const dbReq = asDbReq(data) @@ -112,8 +129,8 @@ export async function cacheEngine(): Promise { const analytic = getAnalytics( dbTxs, - localStart, - localEnd, + tsStart, + tsEnd, app.appId, appAndPartnerId, TIME_PERIODS.join(',') @@ -141,7 +158,7 @@ export async function cacheEngine(): Promise { database = reportsMonth } // Fetch existing _revs of cache - if (localStart !== new Date(2017, 1, 20).getTime() / 1000) { + if (localStart !== birthdayStart) { const documentIds = cacheResult.map(cache => { return cache._id }) @@ -177,10 +194,8 @@ export async function cacheEngine(): Promise { await database.bulk({ docs }) } - const dateStart = new Date(localStart * 1000).toISOString() - const dateEnd = new Date(localEnd * 1000).toISOString() datelog( - `Finished updating ${timePeriod} ${dateStart} ${dateEnd} cache for ${app.appId}_${partnerId}` + `Finished updating ${timePeriod} ${localStart} ${localEnd} cache for ${app.appId}_${partnerId}` ) } catch (e) { datelog('Error doing bulk cache update', e) diff --git a/src/util.ts b/src/util.ts index 6858d24c..bd138fa4 100644 --- a/src/util.ts +++ b/src/util.ts @@ -93,3 +93,24 @@ export const retryFetch = async ( } throw err } + +export const getStartOfMonthsAgo = ( + dateString: string, + months: number +): Date => { + const date = new Date(dateString) + const year = date.getUTCFullYear() + const month = date.getUTCMonth() + + return new Date(Date.UTC(year, month - months, 1)) +} + +export const getStartOfMonthsFromNow = ( + dateString: string, + months: number +): Date => { + const date = new Date(dateString) + const year = date.getUTCFullYear() + const month = date.getUTCMonth() + return new Date(Date.UTC(year, month + months, 1, 0, 0, 0, 0)) +} From 443e72b8bf984647370c752095ed9f9e1d03f40d Mon Sep 17 00:00:00 2001 From: Paul Puey Date: Sat, 10 Feb 2024 17:31:36 -0800 Subject: [PATCH 6/7] Set initialized marker per app and partnerid --- src/cacheEngine.ts | 51 +++++++++++++++++++++++----------------------- 1 file changed, 26 insertions(+), 25 deletions(-) diff --git a/src/cacheEngine.ts b/src/cacheEngine.ts index 92e9b31d..50499985 100644 --- a/src/cacheEngine.ts +++ b/src/cacheEngine.ts @@ -43,16 +43,6 @@ export async function cacheEngine(): Promise { const now = new Date().toISOString() const end = getStartOfMonthsFromNow(now, 1).toISOString() - try { - await reportsMonth.get('initialized:initialized') - start = getStartOfMonthsAgo( - now, - CACHE_UPDATE_LOOKBACK_MONTHS - ).toISOString() - } catch (e) { - start = birthdayStart - } - const query = { selector: { appId: { $exists: true } @@ -75,6 +65,19 @@ export async function cacheEngine(): Promise { continue } + const appAndPartnerId = `${app.appId}_${partnerId}` + const initializedDocId = `${appAndPartnerId}:00000000_initialized` + + try { + await reportsMonth.get(initializedDocId) + start = getStartOfMonthsAgo( + now, + CACHE_UPDATE_LOOKBACK_MONTHS + ).toISOString() + } catch (e) { + start = birthdayStart + } + for ( let localStart: string = start; localStart < end; @@ -107,7 +110,6 @@ export async function cacheEngine(): Promise { sort: ['timestamp'], limit: 1000000 } - const appAndPartnerId = `${app.appId}_${partnerId}` let data try { @@ -203,20 +205,19 @@ export async function cacheEngine(): Promise { } } } - } - } - try { - await reportsMonth.get('initialized:initialized') - datelog('Cache Update Complete.') - } catch { - try { - await reportsMonth - .insert({ _id: 'initialized:initialized' }) - .then(() => { - datelog('Cache Initialized.') - }) - } catch { - datelog('Failed to Create Initialized Marker.') + + try { + await reportsMonth.get(initializedDocId) + datelog(`${initializedDocId} Cache Update Complete`) + } catch { + try { + await reportsMonth.insert({ _id: initializedDocId }).then(() => { + datelog(`${initializedDocId} Initialized Marker Created`) + }) + } catch { + datelog(`${initializedDocId} Initialized Marker Creation Failed`) + } + } } } console.timeEnd('cacheEngine') From 5f7f2a34bd4b60261c28d02dbdf92799cf655f14 Mon Sep 17 00:00:00 2001 From: Paul V Puey Date: Sat, 10 Feb 2024 18:43:24 -0800 Subject: [PATCH 7/7] Add more currency code mappings for rates conversion --- CHANGELOG.md | 10 ++++++++++ src/util.ts | 8 ++++++++ 2 files changed, 18 insertions(+) create mode 100644 CHANGELOG.md diff --git a/CHANGELOG.md b/CHANGELOG.md new file mode 100644 index 00000000..c223ad6e --- /dev/null +++ b/CHANGELOG.md @@ -0,0 +1,10 @@ +# edge-reports-server + +## Unreleased + +- changed: Paginate caching engine to prevent timeouts +- changed: Create caching engine 'initialized' document entry for each app:partner pair + +## 0.1.0 + +- Initial release diff --git a/src/util.ts b/src/util.ts index bd138fa4..4d9f46bf 100644 --- a/src/util.ts +++ b/src/util.ts @@ -5,10 +5,18 @@ import { config } from './config' export const SIX_DAYS = 6 const CURRENCY_CONVERSION = { + AWCBEP20: 'AWC', + AWCBSC: 'AWC', + DAIMATIC: 'DAI', + ETHOP: 'ETH', + WBTCMATIC: 'WBTC', + USDCERC20: 'USDC', USDT20: 'USDT', USDTERC20: 'USDT', USDTPOLYGON: 'USDT', USDCPOLYGON: 'USDC', + USDCTRC20: 'USDC', + USDTTRC20: 'USDT', ZADDR: 'ZEC', BCHABC: 'BCH', BCHSV: 'BSV',