Skip to content

Commit

Permalink
Merge branch 'paul/paginateCaching'
Browse files Browse the repository at this point in the history
  • Loading branch information
paullinator committed Mar 13, 2024
2 parents 2ea8e41 + 5f7f2a3 commit 2e687c4
Show file tree
Hide file tree
Showing 6 changed files with 164 additions and 130 deletions.
10 changes: 10 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,10 @@
# edge-reports-server

## Unreleased

- changed: Paginate caching engine to prevent timeouts
- changed: Create caching engine 'initialized' document entry for each app:partner pair

## 0.1.0

- Initial release
19 changes: 4 additions & 15 deletions src/apiAnalytics.ts
Original file line number Diff line number Diff line change
@@ -1,3 +1,5 @@
import { AnalyticsResult } from './types'

interface UtcValues {
y: number
m: number
Expand All @@ -22,19 +24,6 @@ interface Bucket {
currencyPairs: { [currencyPair: string]: number }
}

// One analytics report for a single app/partner pair over the half-open
// window [start, end). `Bucket` (declared above) holds the per-period
// aggregates; `numAllTxs` is the total transaction count for the window.
interface AnalyticsResult {
  result: {
    hour: Bucket[] // per-hour aggregate buckets
    day: Bucket[] // per-day aggregate buckets
    month: Bucket[] // per-month aggregate buckets
    numAllTxs: number // total transactions matched in the window
  }
  appId: string // application identifier the report was built for
  partnerId: string // partner/plugin identifier the report was built for
  start: number // window start, unix seconds (inclusive)
  end: number // window end, unix seconds (exclusive)
}

export const getAnalytics = (
txs: DbTx[],
start: number,
Expand Down Expand Up @@ -126,14 +115,14 @@ export const getAnalytics = (
}
}

const analyticsResult = {
const analyticsResult: AnalyticsResult = {
result: {
month: monthArray,
day: dayArray,
hour: hourArray,
numAllTxs: txs.length
},
appId,
app: appId,
partnerId,
start: start,
end: end
Expand Down
166 changes: 116 additions & 50 deletions src/cacheEngine.ts
Original file line number Diff line number Diff line change
@@ -1,21 +1,30 @@
import startOfMonth from 'date-fns/startOfMonth'
import sub from 'date-fns/sub'
import nano from 'nano'

import { getAnalytics } from './apiAnalytics'
import { config } from './config'
import { getAnalytic } from './dbutils'
import { asDbReq } from './dbutils'
import { initDbs } from './initDbs'
import { asApps } from './types'
import { datelog, snooze } from './util'
import {
datelog,
getStartOfMonthsAgo,
getStartOfMonthsFromNow,
snooze
} from './util'

// How many whole months back from "now" to rebuild on an incremental run;
// configurable, defaulting to 3.
const CACHE_UPDATE_LOOKBACK_MONTHS = config.cacheLookbackMonths ?? 3

// Number of cache documents written per bulk insert into CouchDB.
const BULK_WRITE_SIZE = 50
// 30 minutes in milliseconds — presumably the pause between engine runs;
// TODO confirm against the loop that uses it (not visible here).
const UPDATE_FREQUENCY_MS = 1000 * 60 * 30
// Size, in months, of each [localStart, localEnd) pagination window when
// walking the full history.
const CACHE_UPDATE_BLOCK_MONTHS = 3

// Shared CouchDB server connection for all reports_* databases below.
const nanoDb = nano(config.couchDbFullpath)

// Bucket granularities to cache, matching the reports_hour/day/month databases.
const TIME_PERIODS = ['hour', 'day', 'month']
// Start of the earliest month with any data (January 2017), used as the
// full-rebuild start when no 'initialized' marker exists.
// NOTE(review): '2017-01Z' is not standard ISO 8601 — confirm the target
// runtime parses it rather than yielding an Invalid Date.
const birthdayStart = getStartOfMonthsAgo(
  new Date('2017-01Z').toISOString(),
  0
).toISOString()

export async function cacheEngine(): Promise<void> {
datelog('Starting Cache Engine')
Expand All @@ -30,18 +39,9 @@ export async function cacheEngine(): Promise<void> {
const reportsMonth = nanoDb.use('reports_month')

while (true) {
let start
const end = new Date(Date.now()).getTime() / 1000

try {
await reportsMonth.get('initialized:initialized')
const monthStart = startOfMonth(new Date(Date.now()))
start =
sub(monthStart, { months: CACHE_UPDATE_LOOKBACK_MONTHS }).getTime() /
1000
} catch (e) {
start = new Date(2017, 1, 20).getTime() / 1000
}
let start: string
const now = new Date().toISOString()
const end = getStartOfMonthsFromNow(now, 1).toISOString()

const query = {
selector: {
Expand All @@ -64,18 +64,85 @@ export async function cacheEngine(): Promise<void> {
) {
continue
}
for (const timePeriod of TIME_PERIODS) {
const data = await getAnalytic(
start,
end,

const appAndPartnerId = `${app.appId}_${partnerId}`
const initializedDocId = `${appAndPartnerId}:00000000_initialized`

try {
await reportsMonth.get(initializedDocId)
start = getStartOfMonthsAgo(
now,
CACHE_UPDATE_LOOKBACK_MONTHS
).toISOString()
} catch (e) {
start = birthdayStart
}

for (
let localStart: string = start;
localStart < end;
localStart = getStartOfMonthsFromNow(
localStart,
CACHE_UPDATE_BLOCK_MONTHS
).toISOString()
) {
const localEnd = getStartOfMonthsFromNow(
localStart,
CACHE_UPDATE_BLOCK_MONTHS
).toISOString()

const tsStart = new Date(localStart).getTime() / 1000
const tsEnd = new Date(localEnd).getTime() / 1000
const query = {
selector: {
status: { $eq: 'complete' },
usdValue: { $gte: 0 },
timestamp: { $gte: tsStart, $lt: tsEnd }
},
fields: [
'orderId',
'depositCurrency',
'payoutCurrency',
'timestamp',
'usdValue'
],
use_index: 'timestamp-p',
sort: ['timestamp'],
limit: 1000000
}

let data
try {
datelog('find txs', appAndPartnerId, localStart, localEnd)
data = await reportsTransactions.partitionedFind(
appAndPartnerId,
query
)
} catch (e) {
datelog('Error fetching transactions', e)
console.error(e)
break
}

const dbReq = asDbReq(data)
const dbTxs = dbReq.docs

if (dbTxs.length === 0) continue

const analytic = getAnalytics(
dbTxs,
tsStart,
tsEnd,
app.appId,
[partnerId],
timePeriod,
reportsTransactions
appAndPartnerId,
TIME_PERIODS.join(',')
)
// Create cache docs
if (data.length > 0) {
const cacheResult = data[0].result[timePeriod].map(bucket => {
const { result } = analytic
if (result == null) continue
if (result.numAllTxs === 0) continue
for (const timePeriod of TIME_PERIODS) {
// Create cache docs
const cacheResult = result[timePeriod].map(bucket => {
return {
_id: `${app.appId}_${partnerId}:${bucket.isoDate}`,
timestamp: bucket.start,
Expand All @@ -93,7 +160,7 @@ export async function cacheEngine(): Promise<void> {
database = reportsMonth
}
// Fetch existing _revs of cache
if (start !== new Date(2017, 1, 20).getTime() / 1000) {
if (localStart !== birthdayStart) {
const documentIds = cacheResult.map(cache => {
return cache._id
})
Expand All @@ -113,45 +180,44 @@ export async function cacheEngine(): Promise<void> {
)

for (
let start = 0;
start < cacheResult.length;
start += BULK_WRITE_SIZE
let writeStart = 0;
writeStart < cacheResult.length;
writeStart += BULK_WRITE_SIZE
) {
const end =
start + BULK_WRITE_SIZE > cacheResult.length
const writeEnd =
writeStart + BULK_WRITE_SIZE > cacheResult.length
? cacheResult.length
: start + BULK_WRITE_SIZE
: writeStart + BULK_WRITE_SIZE
// datelog(`Bulk writing docs ${start} to ${end - 1}`)
const docs = cacheResult.slice(start, end)
const docs = cacheResult.slice(writeStart, writeEnd)
datelog(
`Bulk writing docs ${start} to ${end} of ${cacheResult.length.toString()}`
`Bulk writing docs ${writeStart} to ${writeEnd} of ${cacheResult.length.toString()}`
)
await database.bulk({ docs })
}

datelog(
`Finished updating ${timePeriod} cache for ${app.appId}_${partnerId}`
`Finished updating ${timePeriod} ${localStart} ${localEnd} cache for ${app.appId}_${partnerId}`
)
} catch (e) {
datelog('Error doing bulk cache update', e)
throw e
}
}
}
}
}
try {
await reportsMonth.get('initialized:initialized')
datelog('Cache Update Complete.')
} catch {
try {
await reportsMonth
.insert({ _id: 'initialized:initialized' })
.then(() => {
datelog('Cache Initialized.')
})
} catch {
datelog('Failed to Create Initialized Marker.')

try {
await reportsMonth.get(initializedDocId)
datelog(`${initializedDocId} Cache Update Complete`)
} catch {
try {
await reportsMonth.insert({ _id: initializedDocId }).then(() => {
datelog(`${initializedDocId} Initialized Marker Created`)
})
} catch {
datelog(`${initializedDocId} Initialized Marker Creation Failed`)
}
}
}
}
console.timeEnd('cacheEngine')
Expand Down
67 changes: 3 additions & 64 deletions src/dbutils.ts
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@ import { datelog, promiseTimeout } from './util'
const BATCH_ADVANCE = 100
const SIX_DAYS_IN_SECONDS = 6 * 24 * 60 * 60

const asDbReq = asObject({
export const asDbReq = asObject({
docs: asArray(
asObject({
orderId: asString,
Expand All @@ -21,6 +21,8 @@ const asDbReq = asObject({
)
})

export type DbReq = ReturnType<typeof asDbReq>

export const pagination = async <T>(
txArray: any[],
partition: nano.DocumentScope<T>
Expand Down Expand Up @@ -50,69 +52,6 @@ export const pagination = async <T>(
datelog(`total errors: ${numErrors}`)
}

/**
 * Query completed transactions for each partner of an app over the
 * half-open window [start, end) (unix seconds) and build an analytics
 * report per partner via getAnalytics.
 *
 * @param start Window start, unix seconds (inclusive).
 * @param end Window end, unix seconds (exclusive).
 * @param appId Application identifier; partitions are `${appId}_${partnerId}`.
 * @param partnerIds Partners to report on; partners with zero matching
 *   transactions are omitted from the result.
 * @param timePeriod Comma-joined bucket granularities passed to getAnalytics.
 * @param transactionDatabase nano partitioned database of transactions.
 * @returns Array of per-partner analytics sorted by partnerId, or the
 *   string 'Internal server error.' if any query fails (legacy contract —
 *   callers check for this string).
 */
export const getAnalytic = async (
  start: number,
  end: number,
  appId: string,
  partnerIds: string[],
  timePeriod: string,
  transactionDatabase: any
): Promise<any> => {
  // Only completed transactions with a non-negative USD value count
  // toward analytics; uses the partitioned `timestamp-p` index.
  const query = {
    selector: {
      status: { $eq: 'complete' },
      usdValue: { $gte: 0 },
      timestamp: { $gte: start, $lt: end }
    },
    fields: [
      'orderId',
      'depositCurrency',
      'payoutCurrency',
      'timestamp',
      'usdValue'
    ],
    use_index: 'timestamp-p',
    sort: ['timestamp'],
    limit: 1000000
  }
  const results: any[] = []
  const promises: Array<Promise<any>> = []
  try {
    // Kick off all partner queries concurrently; each pushes its report
    // into `results` only when it found at least one transaction.
    for (const partnerId of partnerIds) {
      const appAndPartnerId = `${appId}_${partnerId}`
      const result = transactionDatabase
        .partitionedFind(appAndPartnerId, query)
        .then(data => {
          const analytic = getAnalytics(
            asDbReq(data).docs,
            start,
            end,
            appId,
            appAndPartnerId,
            timePeriod
          )
          if (analytic.result.numAllTxs > 0) results.push(analytic)
        })
      promises.push(result)
    }
    console.time(`${appId} promiseAll`)
    await Promise.all(promises)
    console.timeEnd(`${appId} promiseAll`)
    // Bug fix: results never contain a `pluginId` field (each entry has
    // appId/partnerId), so the previous sort-by-pluginId comparator always
    // returned 0 and the sort was a no-op. Sort by partnerId, the key the
    // reports are actually built per.
    return results.sort((a, b) => {
      if (a.partnerId < b.partnerId) {
        return -1
      }
      if (a.partnerId > b.partnerId) {
        return 1
      }
      return 0
    })
  } catch (e) {
    // Preserve legacy behavior: log and return a sentinel string rather
    // than rethrowing.
    console.log(e)
    return `Internal server error.`
  }
}

export const cacheAnalytic = async (
start: number,
end: number,
Expand Down
3 changes: 2 additions & 1 deletion src/initDbs.ts
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,8 @@ const transactionIndexFields: string[][] = [
['status', 'payoutCurrency', 'isoDate'],
['status', 'usdValue'],
['status', 'usdvalue', 'timestamp'],
['usdValue']
['usdValue'],
['timestamp']
]

interface Index {
Expand Down
Loading

0 comments on commit 2e687c4

Please sign in to comment.