Skip to content

Commit

Permalink
Ignore SO service and ES status when polling for tasks
Browse files Browse the repository at this point in the history
  • Loading branch information
ersin-erdal committed Jan 19, 2025
1 parent ba92d08 commit 47d16ba
Show file tree
Hide file tree
Showing 3 changed files with 10 additions and 112 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -356,7 +356,6 @@ export class TaskManagerPlugin
taskStore,
usageCounter: this.usageCounter,
middleware: this.middleware,
elasticsearchAndSOAvailability$: this.elasticsearchAndSOAvailability$!,
...managedConfiguration,
taskPartitioner,
});
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -139,26 +139,16 @@ describe('TaskPollingLifecycle', () => {
},
});

test('begins polling once the ES and SavedObjects services are available', () => {
const elasticsearchAndSOAvailability$ = new Subject<boolean>();
new TaskPollingLifecycle({ ...taskManagerOpts, elasticsearchAndSOAvailability$ });

clock.tick(150);
expect(mockTaskClaiming.claimAvailableTasksIfCapacityIsAvailable).not.toHaveBeenCalled();

elasticsearchAndSOAvailability$.next(true);

clock.tick(150);
test('begins polling', () => {
new TaskPollingLifecycle(taskManagerOpts);
expect(mockTaskClaiming.claimAvailableTasksIfCapacityIsAvailable).toHaveBeenCalled();
});

test('provides TaskClaiming with the capacity available when strategy = CLAIM_STRATEGY_UPDATE_BY_QUERY', () => {
const elasticsearchAndSOAvailability$ = new Subject<boolean>();
const capacity$ = new Subject<number>();

new TaskPollingLifecycle({
...taskManagerOpts,
elasticsearchAndSOAvailability$,
capacityConfiguration$: capacity$,
});

Expand All @@ -182,13 +172,11 @@ describe('TaskPollingLifecycle', () => {
});

test('provides TaskClaiming with the capacity available when strategy = CLAIM_STRATEGY_MGET', () => {
const elasticsearchAndSOAvailability$ = new Subject<boolean>();
const capacity$ = new Subject<number>();

new TaskPollingLifecycle({
...taskManagerOpts,
config: { ...taskManagerOpts.config, claim_strategy: CLAIM_STRATEGY_MGET },
elasticsearchAndSOAvailability$,
capacityConfiguration$: capacity$,
});

Expand All @@ -213,68 +201,22 @@ describe('TaskPollingLifecycle', () => {
});

describe('stop', () => {
test('stops polling once the ES and SavedObjects services become unavailable', () => {
const elasticsearchAndSOAvailability$ = new Subject<boolean>();
new TaskPollingLifecycle({ elasticsearchAndSOAvailability$, ...taskManagerOpts });

elasticsearchAndSOAvailability$.next(true);

clock.tick(150);
expect(mockTaskClaiming.claimAvailableTasksIfCapacityIsAvailable).toHaveBeenCalled();

elasticsearchAndSOAvailability$.next(false);

mockTaskClaiming.claimAvailableTasksIfCapacityIsAvailable.mockClear();
clock.tick(150);
expect(mockTaskClaiming.claimAvailableTasksIfCapacityIsAvailable).not.toHaveBeenCalled();
});

test('stops polling if stop() is called', () => {
const elasticsearchAndSOAvailability$ = new Subject<boolean>();
const pollingLifecycle = new TaskPollingLifecycle({
elasticsearchAndSOAvailability$,
...taskManagerOpts,
config: {
...taskManagerOpts.config,
poll_interval: 100,
},
});

expect(mockTaskClaiming.claimAvailableTasksIfCapacityIsAvailable).toHaveBeenCalledTimes(0);
elasticsearchAndSOAvailability$.next(true);

clock.tick(50);
expect(mockTaskClaiming.claimAvailableTasksIfCapacityIsAvailable).toHaveBeenCalledTimes(1);

pollingLifecycle.stop();

clock.tick(100);
expect(mockTaskClaiming.claimAvailableTasksIfCapacityIsAvailable).toHaveBeenCalledTimes(1);
});

test('restarts polling once the ES and SavedObjects services become available again', () => {
const elasticsearchAndSOAvailability$ = new Subject<boolean>();
new TaskPollingLifecycle({
elasticsearchAndSOAvailability$,
...taskManagerOpts,
});

elasticsearchAndSOAvailability$.next(true);

clock.tick(150);
expect(mockTaskClaiming.claimAvailableTasksIfCapacityIsAvailable).toHaveBeenCalled();

elasticsearchAndSOAvailability$.next(false);
mockTaskClaiming.claimAvailableTasksIfCapacityIsAvailable.mockClear();
clock.tick(150);

expect(mockTaskClaiming.claimAvailableTasksIfCapacityIsAvailable).not.toHaveBeenCalled();

elasticsearchAndSOAvailability$.next(true);
clock.tick(150);

expect(mockTaskClaiming.claimAvailableTasksIfCapacityIsAvailable).toHaveBeenCalled();
});
});

describe('claimAvailableTasks', () => {
Expand Down Expand Up @@ -370,19 +312,14 @@ describe('TaskPollingLifecycle', () => {
})
)
);
const elasticsearchAndSOAvailability$ = new Subject<boolean>();
const taskPollingLifecycle = new TaskPollingLifecycle({
...taskManagerOpts,
elasticsearchAndSOAvailability$,
});
const taskPollingLifecycle = new TaskPollingLifecycle(taskManagerOpts);

const emittedEvents: TaskLifecycleEvent[] = [];

taskPollingLifecycle.events.subscribe((event: TaskLifecycleEvent) =>
emittedEvents.push(event)
);

elasticsearchAndSOAvailability$.next(true);
expect(mockTaskClaiming.claimAvailableTasksIfCapacityIsAvailable).toHaveBeenCalled();
await retryUntil('workerUtilizationEvent emitted', () => {
return !!emittedEvents.find(
Expand Down Expand Up @@ -410,19 +347,14 @@ describe('TaskPollingLifecycle', () => {
})
)
);
const elasticsearchAndSOAvailability$ = new Subject<boolean>();
const taskPollingLifecycle = new TaskPollingLifecycle({
...taskManagerOpts,
elasticsearchAndSOAvailability$,
});
const taskPollingLifecycle = new TaskPollingLifecycle(taskManagerOpts);

const emittedEvents: TaskLifecycleEvent[] = [];

taskPollingLifecycle.events.subscribe((event: TaskLifecycleEvent) =>
emittedEvents.push(event)
);

elasticsearchAndSOAvailability$.next(true);
expect(mockTaskClaiming.claimAvailableTasksIfCapacityIsAvailable).toHaveBeenCalled();
await retryUntil('workerUtilizationEvent emitted', () => {
return !!emittedEvents.find(
Expand All @@ -445,19 +377,15 @@ describe('TaskPollingLifecycle', () => {
mockTaskClaiming.claimAvailableTasksIfCapacityIsAvailable.mockImplementation(() => {
throw new Error('booo');
});
const elasticsearchAndSOAvailability$ = new Subject<boolean>();
const taskPollingLifecycle = new TaskPollingLifecycle({
...taskManagerOpts,
elasticsearchAndSOAvailability$,
});

const taskPollingLifecycle = new TaskPollingLifecycle(taskManagerOpts);

const emittedEvents: TaskLifecycleEvent[] = [];

taskPollingLifecycle.events.subscribe((event: TaskLifecycleEvent) =>
emittedEvents.push(event)
);

elasticsearchAndSOAvailability$.next(true);
expect(mockTaskClaiming.claimAvailableTasksIfCapacityIsAvailable).toHaveBeenCalled();
await retryUntil('workerUtilizationEvent emitted', () => {
return !!emittedEvents.find(
Expand All @@ -478,19 +406,14 @@ describe('TaskPollingLifecycle', () => {
})
)
);
const elasticsearchAndSOAvailability$ = new Subject<boolean>();
const taskPollingLifecycle = new TaskPollingLifecycle({
...taskManagerOpts,
elasticsearchAndSOAvailability$,
});
const taskPollingLifecycle = new TaskPollingLifecycle(taskManagerOpts);

const emittedEvents: TaskLifecycleEvent[] = [];

taskPollingLifecycle.events.subscribe((event: TaskLifecycleEvent) =>
emittedEvents.push(event)
);

elasticsearchAndSOAvailability$.next(true);
expect(mockTaskClaiming.claimAvailableTasksIfCapacityIsAvailable).toHaveBeenCalled();
await retryUntil('pollingCycleEvent emitted', () => {
return !!emittedEvents.find(
Expand Down Expand Up @@ -520,19 +443,14 @@ describe('TaskPollingLifecycle', () => {
mockTaskClaiming.claimAvailableTasksIfCapacityIsAvailable.mockImplementation(() => {
throw new Error('booo');
});
const elasticsearchAndSOAvailability$ = new Subject<boolean>();
const taskPollingLifecycle = new TaskPollingLifecycle({
...taskManagerOpts,
elasticsearchAndSOAvailability$,
});
const taskPollingLifecycle = new TaskPollingLifecycle(taskManagerOpts);

const emittedEvents: TaskLifecycleEvent[] = [];

taskPollingLifecycle.events.subscribe((event: TaskLifecycleEvent) =>
emittedEvents.push(event)
);

elasticsearchAndSOAvailability$.next(true);
expect(mockTaskClaiming.claimAvailableTasksIfCapacityIsAvailable).toHaveBeenCalled();
await retryUntil('pollingCycleEvent emitted', () => {
return !!emittedEvents.find(
Expand Down Expand Up @@ -561,19 +479,14 @@ describe('TaskPollingLifecycle', () => {
})
)
);
const elasticsearchAndSOAvailability$ = new Subject<boolean>();
const taskPollingLifecycle = new TaskPollingLifecycle({
...taskManagerOpts,
elasticsearchAndSOAvailability$,
});
const taskPollingLifecycle = new TaskPollingLifecycle(taskManagerOpts);

const emittedEvents: TaskLifecycleEvent[] = [];

taskPollingLifecycle.events.subscribe((event: TaskLifecycleEvent) =>
emittedEvents.push(event)
);

elasticsearchAndSOAvailability$.next(true);
expect(mockTaskClaiming.claimAvailableTasksIfCapacityIsAvailable).toHaveBeenCalled();
await retryUntil('pollingCycleEvent emitted', () => {
return !!emittedEvents.find(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -57,7 +57,6 @@ export type TaskPollingLifecycleOpts = {
taskStore: TaskStore;
config: TaskManagerConfig;
middleware: Middleware;
elasticsearchAndSOAvailability$: Observable<boolean>;
executionContext: ExecutionContextStart;
usageCounter?: UsageCounter;
taskPartitioner: TaskPartitioner;
Expand Down Expand Up @@ -107,8 +106,6 @@ export class TaskPollingLifecycle implements ITaskEventEmitter<TaskLifecycleEven
middleware,
capacityConfiguration$,
pollIntervalConfiguration$,
// Elasticsearch and SavedObjects availability status
elasticsearchAndSOAvailability$,
config,
taskStore,
definitions,
Expand Down Expand Up @@ -193,18 +190,7 @@ export class TaskPollingLifecycle implements ITaskEventEmitter<TaskLifecycleEven

this.subscribeToPoller(this.poller.events$);

elasticsearchAndSOAvailability$.subscribe((areESAndSOAvailable) => {
if (areESAndSOAvailable) {
// start polling for work
this.poller.start();
} else if (!areESAndSOAvailable) {
this.logger.info(
`Stopping the task poller because Elasticsearch and/or saved-objects service became unavailable`
);
this.poller.stop();
this.pool.cancelRunningTasks();
}
});
this.poller.start();
}

public get events(): Observable<TaskLifecycleEvent> {
Expand Down

0 comments on commit 47d16ba

Please sign in to comment.