Skip to content

Commit

Permalink
refactor: config init event for first config load
Browse files Browse the repository at this point in the history
  • Loading branch information
zackpollard committed Nov 5, 2024
1 parent 5edbb93 commit 3169d92
Show file tree
Hide file tree
Showing 18 changed files with 171 additions and 142 deletions.
11 changes: 9 additions & 2 deletions server/src/interfaces/event.interface.ts
Original file line number Diff line number Diff line change
Expand Up @@ -12,12 +12,12 @@ type EventMap = {
'app.bootstrap': [ImmichWorker];
'app.shutdown': [ImmichWorker];

'config.init': [{ newConfig: SystemConfig }];
// config events
'config.update': [
{
newConfig: SystemConfig;
/** When the server starts, `oldConfig` is `undefined` */
oldConfig?: SystemConfig;
oldConfig: SystemConfig;
},
];
'config.validate': [{ newConfig: SystemConfig; oldConfig: SystemConfig }];
Expand Down Expand Up @@ -89,6 +89,13 @@ export type EventItem<T extends EmitEvent> = {
server: boolean;
};

export enum BootstrapEventPriority {
// Database service should be initialized before anything else, most other services need database access
DatabaseService = -200,
// Initialise config after other bootstrap services, stop other services from using config on bootstrap
SystemConfig = 100,
}

export interface IEventRepository {
setup(options: { services: ClassConstructor<unknown>[] }): void;
emit<T extends keyof EventMap>(event: T, ...args: ArgsOf<T>): Promise<void>;
Expand Down
15 changes: 15 additions & 0 deletions server/src/repositories/map.repository.ts
Original file line number Diff line number Diff line change
Expand Up @@ -50,6 +50,7 @@ export class MapRepository implements IMapRepository {
}

await this.importGeodata();
console.log('Importing natural earth countries');
await this.importNaturalEarthCountries();

await this.metadataRepository.set(SystemMetadataKey.REVERSE_GEOCODING_STATE, {
Expand Down Expand Up @@ -215,13 +216,16 @@ export class MapRepository implements IMapRepository {
await queryRunner.connect();

const { resourcePaths } = this.configRepository.getEnv();
console.log('Importing geodata admin 1');
const admin1 = await this.loadAdmin(resourcePaths.geodata.admin1);
console.log('Importing geodata admin 2');
const admin2 = await this.loadAdmin(resourcePaths.geodata.admin2);

try {
await queryRunner.startTransaction();

await queryRunner.manager.clear(GeodataPlacesEntity);
console.log('Importing geodata cities500');
await this.loadCities500(queryRunner, admin1, admin2);

await queryRunner.commitTransaction();
Expand All @@ -240,15 +244,21 @@ export class MapRepository implements IMapRepository {
filePath: string,
options?: { entityFilter?: (linesplit: string[]) => boolean },
) {
let timerStart = performance.now();
const _entityFilter = options?.entityFilter ?? (() => true);
if (!existsSync(filePath)) {
this.logger.error(`Geodata file ${filePath} not found`);
throw new Error(`Geodata file ${filePath} not found`);
}
console.log('Startup:', performance.now() - timerStart);
timerStart = performance.now();

const input = createReadStream(filePath);
let bufferGeodata: QueryDeepPartialEntity<GeodataPlacesEntity>[] = [];
const lineReader = readLine.createInterface({ input });
console.log('Read file:', performance.now() - timerStart);
timerStart = performance.now();
const totalTime = performance.now();

for await (const line of lineReader) {
const lineSplit = line.split('\t');
Expand All @@ -258,11 +268,16 @@ export class MapRepository implements IMapRepository {
const geoData = lineToEntityMapper(lineSplit);
bufferGeodata.push(geoData);
if (bufferGeodata.length > 1000) {
console.log('Buffer Time:', performance.now() - timerStart);
timerStart = performance.now();
await queryRunner.manager.upsert(GeodataPlacesEntity, bufferGeodata, ['id']);
console.log('Upsert Time:', performance.now() - timerStart);
timerStart = performance.now();
bufferGeodata = [];
}
}
await queryRunner.manager.upsert(GeodataPlacesEntity, bufferGeodata, ['id']);
console.log('Total Time:', performance.now() - totalTime);
}

private async loadCities500(
Expand Down
24 changes: 9 additions & 15 deletions server/src/services/backup.service.spec.ts
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ import { PassThrough } from 'node:stream';
import { defaults, SystemConfig } from 'src/config';
import { StorageCore } from 'src/cores/storage.core';
import { ImmichWorker, StorageFolder } from 'src/enum';
import { IConfigRepository } from 'src/interfaces/config.interface';
import { IDatabaseRepository } from 'src/interfaces/database.interface';
import { IJobRepository, JobStatus } from 'src/interfaces/job.interface';
import { IProcessRepository } from 'src/interfaces/process.interface';
Expand All @@ -16,13 +17,14 @@ describe(BackupService.name, () => {
let sut: BackupService;

let databaseMock: Mocked<IDatabaseRepository>;
let configMock: Mocked<IConfigRepository>;
let jobMock: Mocked<IJobRepository>;
let processMock: Mocked<IProcessRepository>;
let storageMock: Mocked<IStorageRepository>;
let systemMock: Mocked<ISystemMetadataRepository>;

beforeEach(() => {
({ sut, databaseMock, jobMock, processMock, storageMock, systemMock } = newTestService(BackupService));
({ sut, configMock, databaseMock, jobMock, processMock, storageMock, systemMock } = newTestService(BackupService));
});

it('should work', () => {
Expand All @@ -32,35 +34,32 @@ describe(BackupService.name, () => {
describe('onBootstrapEvent', () => {
it('should init cron job and handle config changes', async () => {
databaseMock.tryLock.mockResolvedValue(true);
systemMock.get.mockResolvedValue(systemConfigStub.backupEnabled);

await sut.onBootstrap(ImmichWorker.API);
await sut.onConfigInit({ newConfig: systemConfigStub.backupEnabled as SystemConfig });

expect(jobMock.addCronJob).toHaveBeenCalled();
expect(systemMock.get).toHaveBeenCalled();
});

it('should not initialize backup database cron job when lock is taken', async () => {
systemMock.get.mockResolvedValue(systemConfigStub.backupEnabled);
databaseMock.tryLock.mockResolvedValue(false);

await sut.onBootstrap(ImmichWorker.API);
await sut.onConfigInit({ newConfig: systemConfigStub.backupEnabled as SystemConfig });

expect(jobMock.addCronJob).not.toHaveBeenCalled();
});

it('should not initialise backup database job when running on microservices', async () => {
await sut.onBootstrap(ImmichWorker.MICROSERVICES);
configMock.getWorker.mockReturnValue(ImmichWorker.MICROSERVICES);
await sut.onConfigInit({ newConfig: systemConfigStub.backupEnabled as SystemConfig });

expect(jobMock.addCronJob).not.toHaveBeenCalled();
});
});

describe('onConfigUpdateEvent', () => {
beforeEach(async () => {
systemMock.get.mockResolvedValue(defaults);
databaseMock.tryLock.mockResolvedValue(true);
await sut.onBootstrap(ImmichWorker.API);
await sut.onConfigInit({ newConfig: defaults });
});

it('should update cron job if backup is enabled', () => {
Expand All @@ -80,14 +79,9 @@ describe(BackupService.name, () => {
expect(jobMock.updateCronJob).toHaveBeenCalled();
});

it('should do nothing if oldConfig is not provided', () => {
sut.onConfigUpdate({ newConfig: systemConfigStub.backupEnabled as SystemConfig });
expect(jobMock.updateCronJob).not.toHaveBeenCalled();
});

it('should do nothing if instance does not have the backup database lock', async () => {
databaseMock.tryLock.mockResolvedValue(false);
await sut.onBootstrap(ImmichWorker.API);
await sut.onConfigInit({ newConfig: defaults });
sut.onConfigUpdate({ newConfig: systemConfigStub.backupEnabled as SystemConfig, oldConfig: defaults });
expect(jobMock.updateCronJob).not.toHaveBeenCalled();
});
Expand Down
17 changes: 9 additions & 8 deletions server/src/services/backup.service.ts
Original file line number Diff line number Diff line change
Expand Up @@ -14,14 +14,15 @@ import { validateCronExpression } from 'src/validation';
export class BackupService extends BaseService {
private backupLock = false;

@OnEvent({ name: 'app.bootstrap' })
async onBootstrap(workerType: ImmichWorker) {
if (workerType !== ImmichWorker.API) {
@OnEvent({ name: 'config.init' })
async onConfigInit({
newConfig: {
backup: { database },
},
}: ArgOf<'config.init'>) {
if (this.worker !== ImmichWorker.API) {
return;
}
const {
backup: { database },
} = await this.getConfig({ withCache: true });

this.backupLock = await this.databaseRepository.tryLock(DatabaseLock.BackupDatabase);

Expand All @@ -36,8 +37,8 @@ export class BackupService extends BaseService {
}

@OnEvent({ name: 'config.update', server: true })
onConfigUpdate({ newConfig: { backup }, oldConfig }: ArgOf<'config.update'>) {
if (!oldConfig || !this.backupLock) {
onConfigUpdate({ newConfig: { backup } }: ArgOf<'config.update'>) {
if (!this.backupLock) {
return;
}

Expand Down
3 changes: 2 additions & 1 deletion server/src/services/database.service.ts
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ import {
VectorExtension,
VectorIndex,
} from 'src/interfaces/database.interface';
import { BootstrapEventPriority } from 'src/interfaces/event.interface';
import { BaseService } from 'src/services/base.service';

type CreateFailedArgs = { name: string; extension: string; otherName: string };
Expand Down Expand Up @@ -64,7 +65,7 @@ const RETRY_DURATION = Duration.fromObject({ seconds: 5 });
export class DatabaseService extends BaseService {
private reconnection?: NodeJS.Timeout;

@OnEvent({ name: 'app.bootstrap', priority: -200 })
@OnEvent({ name: 'app.bootstrap', priority: BootstrapEventPriority.DatabaseService })
async onBootstrap() {
const version = await this.databaseRepository.getPostgresVersion();
const current = semver.coerce(version);
Expand Down
2 changes: 1 addition & 1 deletion server/src/services/job.service.spec.ts
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,7 @@ describe(JobService.name, () => {

describe('onConfigUpdate', () => {
it('should update concurrency', () => {
sut.onConfigUpdate({ oldConfig: defaults, newConfig: defaults });
sut.onConfigInitOrUpdate({ newConfig: defaults });

expect(jobMock.setConcurrency).toHaveBeenCalledTimes(15);
expect(jobMock.setConcurrency).toHaveBeenNthCalledWith(5, QueueName.FACIAL_RECOGNITION, 1);
Expand Down
3 changes: 2 additions & 1 deletion server/src/services/job.service.ts
Original file line number Diff line number Diff line change
Expand Up @@ -38,8 +38,9 @@ const asJobItem = (dto: JobCreateDto): JobItem => {

@Injectable()
export class JobService extends BaseService {
@OnEvent({ name: 'config.init' })
@OnEvent({ name: 'config.update', server: true })
onConfigUpdate({ newConfig: config }: ArgOf<'config.update'>) {
onConfigInitOrUpdate({ newConfig: config }: ArgOf<'config.init'>) {
if (this.worker !== ImmichWorker.MICROSERVICES) {
return;
}
Expand Down
Loading

0 comments on commit 3169d92

Please sign in to comment.