Skip to content

Commit

Permalink
log-server: limit the number logs loaded in mem
Browse files Browse the repository at this point in the history
  • Loading branch information
QuentinRoy committed Jan 31, 2024
1 parent 0a6a337 commit a1f6832
Showing 1 changed file with 75 additions and 50 deletions.
125 changes: 75 additions & 50 deletions packages/log-server/src/store.ts
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@ import {
} from 'kysely';
import { JsonObject } from 'type-fest';
import loglevel, { LogLevelDesc } from 'loglevel';
import { groupBy } from 'remeda';
import { groupBy, last, pick } from 'remeda';
import { arrayify } from './utils.js';

const __dirname = url.fileURLToPath(new URL('.', import.meta.url));
Expand Down Expand Up @@ -68,13 +68,18 @@ type Database = {

export class SQLiteStore {
#db: Kysely<Database>;
#selectQueryLimit: number;

constructor(
db: string,
{ logLevel = loglevel.getLevel() }: { logLevel?: LogLevelDesc } = {},
{
logLevel = loglevel.getLevel(),
selectQueryLimit = 1000,
}: { logLevel?: LogLevelDesc; selectQueryLimit?: number } = {},
) {
const logger = loglevel.getLogger('store');
logger.setLevel(logLevel);
this.#selectQueryLimit = selectQueryLimit;
this.#db = new Kysely({
dialect: new SqliteDialect({ database: new SQLiteDB(db) }),
log: (event) => {
Expand Down Expand Up @@ -449,63 +454,83 @@ export class SQLiteStore {
});
}

async *#getLogValues(filter: LogFilter) {
let lastRow: {
number: number;
logId: number;
experimentId: string;
runId: string;
} | null = null;
let isFirst = true;
let isDone = false;
while (!isDone) {
let result = await this.#db
.selectFrom('runLogView as l')
.innerJoin('logValue as v', 'l.logId', 'v.logId')
.$if(filter.experimentId != null, (qb) =>
qb.where('l.experimentId', 'in', arrayify(filter.experimentId, true)),
)
.$if(filter.runId != null, (qb) =>
qb.where('l.runId', 'in', arrayify(filter.runId, true)),
)
.$if(filter.type != null, (qb) =>
qb.where('l.type', 'in', arrayify(filter.type, true)),
)
.$if(!isFirst, (qb) => {
if (lastRow === null) throw new Error('lastRow is null');
return qb
.where('l.experimentId', '>=', lastRow.experimentId)
.where('l.runId', '>=', lastRow.runId)
.where('l.logNumber', '>', lastRow.number);
})
.where('l.type', 'is not', null)
.select([
'l.experimentId as experimentId',
'l.runId as runId',
'l.logId as logId',
'l.type as type',
'l.logNumber as number',
'v.name',
'v.value',
])
.$narrowType<{ type: string }>()
.orderBy('experimentId')
.orderBy('runId')
.orderBy('number')
.limit(this.#selectQueryLimit)
.execute();
isFirst = false;
lastRow = last(result) ?? null;
isDone = lastRow == null;
yield* result;
}
}

async *getLogs(filter: LogFilter = {}): AsyncGenerator<Log> {
// It would probably be better not to read everything at once because
// this could be a lot of data. Instead we could read a few yield, and
// restart with the remaining. However until this becomes a problem, this
// is good enough.
let result = await this.#db
.selectFrom('runLogView as l')
.innerJoin('logValue as v', 'l.logId', 'v.logId')
.$if(filter.experimentId != null, (qb) =>
qb.where('l.experimentId', 'in', arrayify(filter.experimentId, true)),
)
.$if(filter.runId != null, (qb) =>
qb.where('l.runId', 'in', arrayify(filter.runId, true)),
)
.$if(filter.type != null, (qb) =>
qb.where('l.type', 'in', arrayify(filter.type, true)),
)
.where('l.type', 'is not', null)
.select([
'l.experimentId as experimentId',
'l.runId as runId',
'l.logId as logId',
'l.type as type',
'l.logNumber as number',
'v.name',
'v.value',
])
.$narrowType<{ type: string }>()
.orderBy('experimentId')
.orderBy('runId')
.orderBy('number')
.execute();
const logValuesIterator = this.#getLogValues(filter);

if (result.length === 0) return;
let first = await logValuesIterator.next();
if (first.done) return;
let currentValues = [first.value];

function reconstructLog(start: number, end: number) {
let first = result[start];
function getLogFromCurrentValues() {
return {
experimentId: first.experimentId,
runId: first.runId,
type: first.type,
number: first.number,
values: reconstructValues(result.slice(start, end)),
...pick(currentValues[0], ['experimentId', 'runId', 'number', 'type']),
values: reconstructValues(currentValues),
};
}

let currentLogStart = 0;
let currentLogId = result[0].logId;
for (let i = 1; i < result.length; i++) {
let row = result[i];
if (row.logId !== currentLogId) {
yield reconstructLog(currentLogStart, i);
currentLogStart = i;
currentLogId = row.logId;
for await (let logValue of logValuesIterator) {
let logFirst = currentValues[0];
if (logValue.logId !== logFirst.logId) {
yield getLogFromCurrentValues();
currentValues = [logValue];
} else {
currentValues.push(logValue);
}
}
yield reconstructLog(currentLogStart, result.length);

yield getLogFromCurrentValues();
}

async migrateDatabase() {
Expand Down

0 comments on commit a1f6832

Please sign in to comment.