Skip to content

Commit

Permalink
Better caching logic
Browse files Browse the repository at this point in the history
  • Loading branch information
Tpessia committed Aug 2, 2024
1 parent c2f364d commit 9c819a2
Show file tree
Hide file tree
Showing 13 changed files with 157 additions and 305 deletions.
2 changes: 1 addition & 1 deletion .github/settings.yml
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@ repository:
default_branch: main
name: dados-financeiros
description: Repositório de Fontes de Dados Financeiros
topics: finance, stock-market, market-data, brasil, financial-data
topics: finance, stock-market, market-data, financial-data, brazil, brasil
homepage: https://InvestTester.com/api
private: false
has_issues: true
Expand Down
61 changes: 10 additions & 51 deletions api/src/@utils/async/promise.ts
Original file line number Diff line number Diff line change
@@ -1,64 +1,21 @@
// Similar to Promise.all, but parallel
export function promiseParallel<T>(tasks: (() => Promise<T>)[], concurrencyLimit: number): Promise<T[]> {
return new Promise<T[]>((res, rej) => {
if (tasks.length === 0) res([]);

const results: T[] = [];
const pool: Promise<T>[] = [];
let canceled: boolean = false;

tasks.slice(0, concurrencyLimit).map(e => runPromise(e));

function runPromise(task: () => Promise<T>): Promise<T> {
const promise = task();

pool.push(promise);

promise.then(r => {
if (canceled) return;

results.push(r);

const poolIndex = pool.indexOf(promise);
pool.splice(poolIndex, 1);

if (tasks.length === results.length)
res(results);

const nextIndex = concurrencyLimit + results.length - 1;
const nextTask = tasks[nextIndex];

if (!nextTask) return;

runPromise(nextTask);
}).catch(err => {
canceled = true;
rej(err);
});

return promise;
}
});
}

// Similar to Promise.all, but parallel and without rejection
export function promiseParallelAll<T, TRej = Error>(tasks: (() => Promise<T>)[], concurrencyLimit: number): Promise<(T | TRej)[]> {
export function promiseParallel<T, TRej = T>(tasks: (() => Promise<T>)[], concurrencyLimit: number, noReject: boolean = false): Promise<(T | TRej)[]> {
return new Promise<(T | TRej)[]>((res, rej) => {
if (tasks.length === 0) res([]);

const results: (T | TRej)[] = [];
const pool: Promise<T>[] = [];
const pool: Promise<T | TRej>[] = [];
let canceled: boolean = false;

tasks.slice(0, concurrencyLimit).map(e => runPromise(e));

function runPromise(task: () => Promise<T>): Promise<T> {
const promise = task();
function runPromise(task: () => Promise<T>): Promise<T | TRej> {
let promise: Promise<T | TRej> = task();

pool.push(promise);

promise.catch((e: TRej) => e)
.then(r => {
if (noReject) promise = promise.catch((e: TRej) => e);

promise.then(r => {
if (canceled) return;

results.push(r);
Expand All @@ -75,7 +32,9 @@ export function promiseParallelAll<T, TRej = Error>(tasks: (() => Promise<T>)[],
if (!nextTask) return;

runPromise(nextTask);
});
})

if (!noReject) promise.catch(err => { canceled = true; rej(err); });

return promise;
}
Expand Down
186 changes: 27 additions & 159 deletions api/src/@utils/cache/memoize.ts
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
import { jsonDateReviver, readFileSync, runOrResolve, tryParseJson, tryStringifyJson, TypedFunction, TypedFunctionWrapper, writeFileSync } from '@/@utils';
import { createHash } from 'crypto';
import * as fs from 'fs';
import { isEqual, merge } from 'lodash';
import { cloneDeep, isEqual, merge } from 'lodash';
import { tmpdir } from 'os';
import * as path from 'path';

Expand All @@ -16,9 +16,9 @@ export interface MemoizeConfig {
onCall?: (config: MemoizeConfig, args: any[], cache: MemoizeCache<any, any>) => void;
cacheType?: MemoizeCacheType | MemoizeCache<any, any>;
cacheConfig?: MemoizeCacheConfig;
_instance?: Object;
_target?: Object;
_method?: PropertyDescriptor;
_instance?: Object;
}
export interface MemoizeCacheConfig {
cacheDir?: string;
Expand All @@ -36,24 +36,25 @@ export function isMemoizeCacheType(value: any): value is MemoizeCacheType {
}

export interface MemoizeCacheItem<F extends TypedFunction> {
date: Date,
value: ReturnType<F>,
date: Date;
value: ReturnType<F>;
}

export type MemoizeCacheMap<F extends TypedFunction> = Record<string, MemoizeCacheItem<F>>;

export interface MemoizeCache<F extends TypedFunction, C> {
disabled?: boolean,
config?: MemoizeConfig['cacheConfig'],
cache: C,
state: any,
memoizedFunc?: Memoized<ReturnType<F>>,
init: () => void,
get: (key: string) => MemoizeCacheItem<F> | undefined,
set: (key: string, value: MemoizeCacheItem<F> | Promise<MemoizeCacheItem<F>>) => void,
delete: (key: string) => void,
flush: () => void,
invalidate: (predicate: (key: string, value: MemoizeCacheItem<F>) => boolean) => void,
type: MemoizeCacheType | string;
disabled?: boolean;
config?: MemoizeConfig['cacheConfig'];
cache: C;
state: any;
memoizedFunc?: Memoized<ReturnType<F>>;
init: () => void;
get: (key: string) => MemoizeCacheItem<F> | undefined;
set: (key: string, value: MemoizeCacheItem<F> | Promise<MemoizeCacheItem<F>>) => void;
delete: (key: string) => void;
flush: () => void;
invalidate: (predicate: (key: string, value: MemoizeCacheItem<F>) => boolean) => void;
}

// CONSTS
Expand All @@ -62,11 +63,12 @@ export const globalMemoizeConfig: Partial<MemoizeConfig> = { // HOW-TO: globalMe
cacheType: MemoizeCacheType.Memory,
cacheConfig: {
cacheDir: path.join(tmpdir(), `/${path.basename(process.cwd())}-${path.basename(process.argv[1])}`),
}
},
};

const memoizeMemory = <F extends TypedFunction>(config: MemoizeConfig) => {
const memoryCache: MemoizeCache<F, MemoizeCacheMap<F>> = {
type: MemoizeCacheType.Memory,
config: config.cacheConfig,
cache: {},
state: {},
Expand Down Expand Up @@ -124,13 +126,16 @@ const memoizeStorage = <F extends TypedFunction>(config: MemoizeConfig) => {
const newFileInfo = getFileInfo(storageCache.config.cachePath);
storageCache.state[fileInfoKey] = newFileInfo;
return storageCache.cache;
}, err => err);
}, err => {
return err; // prevent unhandled promise rejection
});
}

if (config.cacheConfig.cacheDir == null && config.cacheConfig.cachePath == null) throw new Error('Invalid cacheDir (null)');
const defaultConfig: MemoizeCacheConfig = { cachePath: path.join(config.cacheConfig.cacheDir, `memoize-${config.funcKey(config)}.json`) };

const storageCache: MemoizeCache<F, MemoizeCacheMap<F>> = {
type: MemoizeCacheType.Storage,
config: merge(defaultConfig, config.cacheConfig),
cache: {},
state: {},
Expand Down Expand Up @@ -173,7 +178,7 @@ const cacheTypes: Record<MemoizeCacheType, (config: MemoizeConfig) => MemoizeCac
// FUNCTIONS

export function memoize<F extends TypedFunction>(func: Memoized<F>, config: MemoizeConfig): Memoized<F> {
config = merge(globalMemoizeConfig, config);
config = merge(cloneDeep(globalMemoizeConfig), config);

if (config.funcKey == null) throw new Error('Invalid funcKey');

Expand Down Expand Up @@ -220,14 +225,14 @@ export function Memoize(config?: MemoizeConfig) { // use with "@Memoize()"
configurable: true,
get() {
config ??= {} as any;
config._instance = this;
config._target = target;
config._method = descriptor;
config._instance = this;
config.funcKey ??= () => `${config._instance?.constructor?.name}:${config._method.value?.name}`;

const memoized = memoize(config._method.value.bind(config._instance), config).bind(config._instance);
const memoized = memoize(config._method.value.bind(config._instance), config);

Object.defineProperty(this, name, {
Object.defineProperty(config._instance, name, {
value: memoized,
configurable: true,
writable: true,
Expand All @@ -254,141 +259,4 @@ export function Memoize(config?: MemoizeConfig) { // use with "@Memoize()"
// const key = `${target?.constructor?.name}:${originalMethod?.name}`;
// descriptor.value = memoizeDisk(originalMethod, key);
// return descriptor;
// }




// interface DataPoint {
// value: number;
// date: Date;
// }

// interface CacheEntry {
// key: string;
// data: DataPoint[];
// }

// interface CacheResult {
// data: CacheEntry[];
// missing: { minDate: Date; maxDate: Date }[];
// }

// const cache = new Map<string, DataPoint[]>();

// function fetchData(minDate: Date, maxDate: Date): DataPoint[] {
// // Mock data
// return [{ value: 123, date: minDate }, { value: 321, date: maxDate }];
// }

// function getData(minDate: Date, maxDate: Date): DataPoint[] {
// const { data: cacheData, missing } = getDataFromCache(minDate, maxDate);
// const fetchedData = missing.map(e => fetchData(e.minDate, e.maxDate)).flatMap(e => e);

// // Flatten the cache data
// const flattenedCacheData = cacheData.map(entry => entry.data).flatMap(e => e);
// console.log(fetchedData, flattenedCacheData);

// // Combine and sort the data
// return [...fetchedData, ...flattenedCacheData].sort((a, b) => a.date.getTime() - b.date.getTime());
// }

// function getDataFromCache(minDate: Date, maxDate: Date): CacheResult {
// // (e.g. minDate: 2023-03-15, maxDate: 2023-04-20)
// const result: CacheResult = {
// data: [], // Will store cache entries that overlap with the requested date range
// missing: [], // Will store date ranges not found in the cache
// };

// // Convert cache keys to DateRange objects for easier manipulation
// // (e.g. Assume cache has: '2023-03-01>2023-03-31', '2023-04-10>2023-04-30')
// const cacheRanges = Array.from(cache.keys()).map(key => {
// const [start, end] = key.split('>');
// return {
// minDate: new Date(start),
// maxDate: new Date(end)
// };
// });
// // (e.g. cacheRanges now: [{minDate: 2023-03-01, maxDate: 2023-03-31}, {minDate: 2023-04-10, maxDate: 2023-04-30}])

// // Sort cache ranges by minDate to optimize the search process
// cacheRanges.sort((a, b) => a.minDate.getTime() - b.minDate.getTime());
// // (e.g. cacheRanges remains the same as they're already sorted)

// // Initialize currentDate to the start of the requested range
// let currentDate = new Date(minDate.getTime());
// // (e.g. currentDate: 2023-03-15)

// // Iterate through the entire requested date range
// while (currentDate <= maxDate) {
// // Find a cache range that overlaps with the current date
// const overlappingRange = cacheRanges.find(range =>
// range.minDate <= currentDate && range.maxDate >= currentDate
// );
// // (e.g. First iteration: overlappingRange is {minDate: 2023-03-01, maxDate: 2023-03-31})

// if (overlappingRange) {
// // If an overlapping range is found, add it to the result
// const key = `${overlappingRange.minDate.toISOString().slice(0, 10)}>${overlappingRange.maxDate.toISOString().slice(0, 10)}`;
// result.data.push({ key, data: cache.get(key)! });
// // (e.g. result.data now has one entry: {key: '2023-03-01>2023-03-31', data: [...]}])

// // Move currentDate to the end of the overlapping range (or maxDate if it's earlier)
// currentDate = new Date(Math.min(maxDate.getTime(), overlappingRange.maxDate.getTime()) + 86400000); // Add one day
// // (e.g. currentDate is now 2023-04-01)
// } else {
// // If no overlapping range is found, look for the next available cache range
// const nextOverlap = cacheRanges.find(range => range.minDate > currentDate);
// // (e.g. nextOverlap is {minDate: 2023-04-10, maxDate: 2023-04-30})

// // Determine the end of the missing range
// const missingEndDate = nextOverlap
// ? new Date(Math.min(maxDate.getTime(), nextOverlap.minDate.getTime() - 86400000)) // One day before next overlap
// : new Date(maxDate.getTime()); // Or maxDate if no next overlap
// // (e.g. missingEndDate is 2023-04-09)

// // Add the missing range to the result
// result.missing.push({ minDate: new Date(currentDate.getTime()), maxDate: missingEndDate });
// // (e.g. result.missing now has one entry: {minDate: 2023-04-01, maxDate: 2023-04-09})

// // Move currentDate to the start of the next day after the missing range
// currentDate = new Date(missingEndDate.getTime() + 86400000); // Add one day
// // (e.g. currentDate is now 2023-04-10)
// }
// }

// // (e.g. Final result:
// // result.data = [
// // {key: '2023-03-01>2023-03-31', data: [...]},
// // {key: '2023-04-10>2023-04-30', data: [...]}
// // ]
// // result.missing = [
// // {minDate: 2023-04-01, maxDate: 2023-04-09}
// // ])

// return result;
// }

// // Helper function to add data to the cache
// function addToCache(minDate: Date, maxDate: Date, data: DataPoint[]): void {
// const key = `${minDate.toISOString().slice(0, 10)}>${maxDate.toISOString().slice(0, 10)}`;
// cache.set(key, data);
// }

// // Example usage
// const startDate = new Date('2020-04-01');
// const endDate = new Date('2020-06-30');

// // Add some data to the cache
// addToCache(new Date('2020-04-01'), new Date('2020-04-30'), [
// { value: 100, date: new Date('2020-01-15') },
// { value: 200, date: new Date('2020-01-30') }
// ]);

// addToCache(new Date('2020-06-01'), new Date('2020-06-30'), [
// { value: 300, date: new Date('2020-03-15') },
// { value: 400, date: new Date('2020-03-30') }
// ]);

// const result = getData(startDate, endDate);
// console.log(result);
// }
5 changes: 4 additions & 1 deletion api/src/core/services/AssetTransformers.ts
Original file line number Diff line number Diff line change
Expand Up @@ -130,5 +130,8 @@ export function applyLeverage(data: AssetData[], leverage: number): AssetData[]
}

export function cleanUpData(data: AssetData[]): AssetData[] {
return data.map(e => ({ ...e, value: e.value > 1 ? round(e.value, 2) : e.value }));
for (let item of data) {
if (item.value > 1) item.value = round(item.value, 2);
}
return data;
}
Loading

0 comments on commit 9c819a2

Please sign in to comment.