Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

refactor(server): filter on-event handlers by worker #14085

Merged
merged 1 commit into from
Nov 20, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 3 additions & 1 deletion server/src/decorators.ts
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@ import { SetMetadata, applyDecorators } from '@nestjs/common';
import { ApiExtension, ApiOperation, ApiProperty, ApiTags } from '@nestjs/swagger';
import _ from 'lodash';
import { ADDED_IN_PREFIX, DEPRECATED_IN_PREFIX, LIFECYCLE_EXTENSION } from 'src/constants';
import { MetadataKey } from 'src/enum';
import { ImmichWorker, MetadataKey } from 'src/enum';
import { EmitEvent } from 'src/interfaces/event.interface';
import { JobName, QueueName } from 'src/interfaces/job.interface';
import { setUnion } from 'src/utils/set';
Expand Down Expand Up @@ -120,6 +120,8 @@ export type EventConfig = {
server?: boolean;
/** lower value has higher priority, defaults to 0 */
priority?: number;
/** register events for these workers, defaults to all workers */
workers?: ImmichWorker[];
};
export const OnEvent = (config: EventConfig) => SetMetadata(MetadataKey.EVENT_CONFIG, config);

Expand Down
16 changes: 14 additions & 2 deletions server/src/repositories/event.repository.ts
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,8 @@ import { ClassConstructor } from 'class-transformer';
import _ from 'lodash';
import { Server, Socket } from 'socket.io';
import { EventConfig } from 'src/decorators';
import { MetadataKey } from 'src/enum';
import { ImmichWorker, MetadataKey } from 'src/enum';
import { IConfigRepository } from 'src/interfaces/config.interface';
import {
ArgsOf,
ClientEventMap,
Expand All @@ -23,6 +24,7 @@ import {
ServerEvents,
} from 'src/interfaces/event.interface';
import { ILoggerRepository } from 'src/interfaces/logger.interface';
import { ConfigRepository } from 'src/repositories/config.repository';
import { AuthService } from 'src/services/auth.service';
import { handlePromiseError } from 'src/utils/misc';

Expand Down Expand Up @@ -50,6 +52,7 @@ export class EventRepository implements OnGatewayConnection, OnGatewayDisconnect

constructor(
private moduleRef: ModuleRef,
@Inject(IConfigRepository) private configRepository: ConfigRepository,
@Inject(ILoggerRepository) private logger: ILoggerRepository,
) {
this.logger.setContext(EventRepository.name);
Expand All @@ -58,6 +61,10 @@ export class EventRepository implements OnGatewayConnection, OnGatewayDisconnect
setup({ services }: { services: ClassConstructor<unknown>[] }) {
const reflector = this.moduleRef.get(Reflector, { strict: false });
const items: Item<EmitEvent>[] = [];
const worker = this.configRepository.getWorker();
if (!worker) {
throw new Error('Unable to determine worker type');
}

// discovery
for (const Service of services) {
Expand All @@ -79,6 +86,11 @@ export class EventRepository implements OnGatewayConnection, OnGatewayDisconnect
continue;
}

const workers = event.workers ?? Object.values(ImmichWorker);
if (!workers.includes(worker)) {
continue;
}

items.push({
event: event.name,
priority: event.priority || 0,
Expand Down Expand Up @@ -133,7 +145,7 @@ export class EventRepository implements OnGatewayConnection, OnGatewayDisconnect
await client.leave(client.nsp.name);
}

private addHandler<T extends EmitEvent>(item: EventItem<T>): void {
private addHandler<T extends EmitEvent>(item: Item<T>): void {
const event = item.event;

if (!this.emitHandlers[event]) {
Expand Down
6 changes: 1 addition & 5 deletions server/src/services/backup.service.ts
Original file line number Diff line number Diff line change
Expand Up @@ -13,16 +13,12 @@ import { handlePromiseError } from 'src/utils/misc';
export class BackupService extends BaseService {
private backupLock = false;

@OnEvent({ name: 'config.init' })
@OnEvent({ name: 'config.init', workers: [ImmichWorker.MICROSERVICES] })
async onConfigInit({
newConfig: {
backup: { database },
},
}: ArgOf<'config.init'>) {
if (this.worker !== ImmichWorker.API) {
return;
}

this.backupLock = await this.databaseRepository.tryLock(DatabaseLock.BackupDatabase);

if (this.backupLock) {
Expand Down
6 changes: 1 addition & 5 deletions server/src/services/job.service.ts
Original file line number Diff line number Diff line change
Expand Up @@ -38,12 +38,8 @@ const asJobItem = (dto: JobCreateDto): JobItem => {

@Injectable()
export class JobService extends BaseService {
@OnEvent({ name: 'config.init' })
@OnEvent({ name: 'config.init', workers: [ImmichWorker.MICROSERVICES] })
onConfigInit({ newConfig: config }: ArgOf<'config.init'>) {
if (this.worker !== ImmichWorker.MICROSERVICES) {
return;
}

this.logger.debug(`Updating queue concurrency settings`);
for (const queueName of Object.values(QueueName)) {
let concurrency = 1;
Expand Down
7 changes: 0 additions & 7 deletions server/src/services/library.service.spec.ts
Original file line number Diff line number Diff line change
Expand Up @@ -122,13 +122,6 @@ describe(LibraryService.name, () => {

expect(cronMock.create).not.toHaveBeenCalled();
});

it('should not initialize watcher or library scan job when running on api', async () => {
configMock.getWorker.mockReturnValue(ImmichWorker.API);
await sut.onConfigInit({ newConfig: systemConfigStub.libraryScan as SystemConfig });

expect(cronMock.create).not.toHaveBeenCalled();
});
});

describe('onConfigUpdateEvent', () => {
Expand Down
6 changes: 1 addition & 5 deletions server/src/services/library.service.ts
Original file line number Diff line number Diff line change
Expand Up @@ -31,16 +31,12 @@ export class LibraryService extends BaseService {
private lock = false;
private watchers: Record<string, () => Promise<void>> = {};

@OnEvent({ name: 'config.init' })
@OnEvent({ name: 'config.init', workers: [ImmichWorker.MICROSERVICES] })
async onConfigInit({
newConfig: {
library: { watch, scan },
},
}: ArgOf<'config.init'>) {
if (this.worker !== ImmichWorker.MICROSERVICES) {
return;
}

// This ensures that library watching only occurs in one microservice
this.lock = await this.databaseRepository.tryLock(DatabaseLock.Library);

Expand Down
9 changes: 0 additions & 9 deletions server/src/services/metadata.service.spec.ts
Original file line number Diff line number Diff line change
Expand Up @@ -94,15 +94,6 @@ describe(MetadataService.name, () => {
expect(mapMock.init).toHaveBeenCalledTimes(1);
expect(jobMock.resume).toHaveBeenCalledTimes(1);
});

it('should return if running on api', async () => {
configMock.getWorker.mockReturnValue(ImmichWorker.API);
await sut.onBootstrap();

expect(jobMock.pause).not.toHaveBeenCalled();
expect(mapMock.init).not.toHaveBeenCalled();
expect(jobMock.resume).not.toHaveBeenCalled();
});
});

describe('handleLivePhotoLinking', () => {
Expand Down
5 changes: 1 addition & 4 deletions server/src/services/metadata.service.ts
Original file line number Diff line number Diff line change
Expand Up @@ -68,11 +68,8 @@ const validateRange = (value: number | undefined, min: number, max: number): Non

@Injectable()
export class MetadataService extends BaseService {
@OnEvent({ name: 'app.bootstrap' })
@OnEvent({ name: 'app.bootstrap', workers: [ImmichWorker.MICROSERVICES] })
async onBootstrap() {
if (this.worker !== ImmichWorker.MICROSERVICES) {
return;
}
this.logger.log('Bootstrapping metadata service');
await this.init();
}
Expand Down
29 changes: 0 additions & 29 deletions server/src/services/smart-info.service.spec.ts
Original file line number Diff line number Diff line change
Expand Up @@ -67,19 +67,6 @@ describe(SmartInfoService.name, () => {
});

describe('onConfigInit', () => {
it('should return if not microservices', async () => {
configMock.getWorker.mockReturnValue(ImmichWorker.API);
await sut.onConfigInit({ newConfig: systemConfigStub.machineLearningEnabled as SystemConfig });

expect(searchMock.getDimensionSize).not.toHaveBeenCalled();
expect(searchMock.setDimensionSize).not.toHaveBeenCalled();
expect(searchMock.deleteAllSearchEmbeddings).not.toHaveBeenCalled();
expect(jobMock.getQueueStatus).not.toHaveBeenCalled();
expect(jobMock.pause).not.toHaveBeenCalled();
expect(jobMock.waitForQueueCompletion).not.toHaveBeenCalled();
expect(jobMock.resume).not.toHaveBeenCalled();
});

it('should return if machine learning is disabled', async () => {
await sut.onConfigInit({ newConfig: systemConfigStub.machineLearningDisabled as SystemConfig });

Expand Down Expand Up @@ -136,22 +123,6 @@ describe(SmartInfoService.name, () => {
});

describe('onConfigUpdateEvent', () => {
it('should return if not microservices', async () => {
configMock.getWorker.mockReturnValue(ImmichWorker.API);
await sut.onConfigUpdate({
newConfig: systemConfigStub.machineLearningEnabled as SystemConfig,
oldConfig: systemConfigStub.machineLearningEnabled as SystemConfig,
});

expect(searchMock.getDimensionSize).not.toHaveBeenCalled();
expect(searchMock.setDimensionSize).not.toHaveBeenCalled();
expect(searchMock.deleteAllSearchEmbeddings).not.toHaveBeenCalled();
expect(jobMock.getQueueStatus).not.toHaveBeenCalled();
expect(jobMock.pause).not.toHaveBeenCalled();
expect(jobMock.waitForQueueCompletion).not.toHaveBeenCalled();
expect(jobMock.resume).not.toHaveBeenCalled();
});

it('should return if machine learning is disabled', async () => {
systemMock.get.mockResolvedValue(systemConfigStub.machineLearningDisabled);

Expand Down
6 changes: 3 additions & 3 deletions server/src/services/smart-info.service.ts
Original file line number Diff line number Diff line change
Expand Up @@ -13,12 +13,12 @@ import { usePagination } from 'src/utils/pagination';

@Injectable()
export class SmartInfoService extends BaseService {
@OnEvent({ name: 'config.init' })
@OnEvent({ name: 'config.init', workers: [ImmichWorker.MICROSERVICES] })
async onConfigInit({ newConfig }: ArgOf<'config.init'>) {
await this.init(newConfig);
}

@OnEvent({ name: 'config.update', server: true })
@OnEvent({ name: 'config.update', workers: [ImmichWorker.MICROSERVICES], server: true })
async onConfigUpdate({ oldConfig, newConfig }: ArgOf<'config.update'>) {
await this.init(newConfig, oldConfig);
}
Expand All @@ -35,7 +35,7 @@ export class SmartInfoService extends BaseService {
}

private async init(newConfig: SystemConfig, oldConfig?: SystemConfig) {
if (this.worker !== ImmichWorker.MICROSERVICES || !isSmartSearchEnabled(newConfig.machineLearning)) {
if (!isSmartSearchEnabled(newConfig.machineLearning)) {
return;
}

Expand Down
Loading