diff --git a/x-pack/platform/plugins/shared/task_manager/server/plugin.ts b/x-pack/platform/plugins/shared/task_manager/server/plugin.ts index e8ed5aefbe6f9..13b4f51cdc880 100644 --- a/x-pack/platform/plugins/shared/task_manager/server/plugin.ts +++ b/x-pack/platform/plugins/shared/task_manager/server/plugin.ts @@ -356,7 +356,6 @@ export class TaskManagerPlugin taskStore, usageCounter: this.usageCounter, middleware: this.middleware, - elasticsearchAndSOAvailability$: this.elasticsearchAndSOAvailability$!, ...managedConfiguration, taskPartitioner, }); diff --git a/x-pack/platform/plugins/shared/task_manager/server/polling_lifecycle.test.ts b/x-pack/platform/plugins/shared/task_manager/server/polling_lifecycle.test.ts index 1ccbe57debe24..27bf2f3ba6feb 100644 --- a/x-pack/platform/plugins/shared/task_manager/server/polling_lifecycle.test.ts +++ b/x-pack/platform/plugins/shared/task_manager/server/polling_lifecycle.test.ts @@ -139,26 +139,16 @@ describe('TaskPollingLifecycle', () => { }, }); - test('begins polling once the ES and SavedObjects services are available', () => { - const elasticsearchAndSOAvailability$ = new Subject(); - new TaskPollingLifecycle({ ...taskManagerOpts, elasticsearchAndSOAvailability$ }); - - clock.tick(150); - expect(mockTaskClaiming.claimAvailableTasksIfCapacityIsAvailable).not.toHaveBeenCalled(); - - elasticsearchAndSOAvailability$.next(true); - - clock.tick(150); + test('begins polling', () => { + new TaskPollingLifecycle(taskManagerOpts); expect(mockTaskClaiming.claimAvailableTasksIfCapacityIsAvailable).toHaveBeenCalled(); }); test('provides TaskClaiming with the capacity available when strategy = CLAIM_STRATEGY_UPDATE_BY_QUERY', () => { - const elasticsearchAndSOAvailability$ = new Subject(); const capacity$ = new Subject(); new TaskPollingLifecycle({ ...taskManagerOpts, - elasticsearchAndSOAvailability$, capacityConfiguration$: capacity$, }); @@ -182,13 +172,11 @@ describe('TaskPollingLifecycle', () => { }); test('provides TaskClaiming with the capacity available when strategy = CLAIM_STRATEGY_MGET', () => { - const elasticsearchAndSOAvailability$ = new Subject(); const capacity$ = new Subject(); new TaskPollingLifecycle({ ...taskManagerOpts, config: { ...taskManagerOpts.config, claim_strategy: CLAIM_STRATEGY_MGET }, - elasticsearchAndSOAvailability$, capacityConfiguration$: capacity$, }); @@ -213,26 +201,8 @@ describe('TaskPollingLifecycle', () => { }); describe('stop', () => { - test('stops polling once the ES and SavedObjects services become unavailable', () => { - const elasticsearchAndSOAvailability$ = new Subject(); - new TaskPollingLifecycle({ elasticsearchAndSOAvailability$, ...taskManagerOpts }); - - elasticsearchAndSOAvailability$.next(true); - - clock.tick(150); - expect(mockTaskClaiming.claimAvailableTasksIfCapacityIsAvailable).toHaveBeenCalled(); - - elasticsearchAndSOAvailability$.next(false); - - mockTaskClaiming.claimAvailableTasksIfCapacityIsAvailable.mockClear(); - clock.tick(150); - expect(mockTaskClaiming.claimAvailableTasksIfCapacityIsAvailable).not.toHaveBeenCalled(); - }); - test('stops polling if stop() is called', () => { - const elasticsearchAndSOAvailability$ = new Subject(); const pollingLifecycle = new TaskPollingLifecycle({ - elasticsearchAndSOAvailability$, ...taskManagerOpts, config: { ...taskManagerOpts.config, @@ -240,10 +210,6 @@ describe('TaskPollingLifecycle', () => { }, }); - expect(mockTaskClaiming.claimAvailableTasksIfCapacityIsAvailable).toHaveBeenCalledTimes(0); - elasticsearchAndSOAvailability$.next(true); - - clock.tick(50); expect(mockTaskClaiming.claimAvailableTasksIfCapacityIsAvailable).toHaveBeenCalledTimes(1); pollingLifecycle.stop(); @@ -251,30 +217,6 @@ describe('TaskPollingLifecycle', () => { clock.tick(100); expect(mockTaskClaiming.claimAvailableTasksIfCapacityIsAvailable).toHaveBeenCalledTimes(1); }); - - test('restarts polling once the ES and SavedObjects services become available again', () => { - const elasticsearchAndSOAvailability$ = new Subject(); - new TaskPollingLifecycle({ - elasticsearchAndSOAvailability$, - ...taskManagerOpts, - }); - - elasticsearchAndSOAvailability$.next(true); - - clock.tick(150); - expect(mockTaskClaiming.claimAvailableTasksIfCapacityIsAvailable).toHaveBeenCalled(); - - elasticsearchAndSOAvailability$.next(false); - mockTaskClaiming.claimAvailableTasksIfCapacityIsAvailable.mockClear(); - clock.tick(150); - - expect(mockTaskClaiming.claimAvailableTasksIfCapacityIsAvailable).not.toHaveBeenCalled(); - - elasticsearchAndSOAvailability$.next(true); - clock.tick(150); - - expect(mockTaskClaiming.claimAvailableTasksIfCapacityIsAvailable).toHaveBeenCalled(); - }); }); describe('claimAvailableTasks', () => { @@ -370,11 +312,7 @@ describe('TaskPollingLifecycle', () => { }) ) ); - const elasticsearchAndSOAvailability$ = new Subject(); - const taskPollingLifecycle = new TaskPollingLifecycle({ - ...taskManagerOpts, - elasticsearchAndSOAvailability$, - }); + const taskPollingLifecycle = new TaskPollingLifecycle(taskManagerOpts); const emittedEvents: TaskLifecycleEvent[] = []; @@ -382,7 +320,6 @@ describe('TaskPollingLifecycle', () => { emittedEvents.push(event) ); - elasticsearchAndSOAvailability$.next(true); expect(mockTaskClaiming.claimAvailableTasksIfCapacityIsAvailable).toHaveBeenCalled(); await retryUntil('workerUtilizationEvent emitted', () => { return !!emittedEvents.find( @@ -410,11 +347,7 @@ describe('TaskPollingLifecycle', () => { }) ) ); - const elasticsearchAndSOAvailability$ = new Subject(); - const taskPollingLifecycle = new TaskPollingLifecycle({ - ...taskManagerOpts, - elasticsearchAndSOAvailability$, - }); + const taskPollingLifecycle = new TaskPollingLifecycle(taskManagerOpts); const emittedEvents: TaskLifecycleEvent[] = []; @@ -422,7 +355,6 @@ describe('TaskPollingLifecycle', () => { emittedEvents.push(event) ); - elasticsearchAndSOAvailability$.next(true); expect(mockTaskClaiming.claimAvailableTasksIfCapacityIsAvailable).toHaveBeenCalled(); await retryUntil('workerUtilizationEvent emitted', () => { return !!emittedEvents.find( @@ -445,11 +377,8 @@ describe('TaskPollingLifecycle', () => { mockTaskClaiming.claimAvailableTasksIfCapacityIsAvailable.mockImplementation(() => { throw new Error('booo'); }); - const elasticsearchAndSOAvailability$ = new Subject(); - const taskPollingLifecycle = new TaskPollingLifecycle({ - ...taskManagerOpts, - elasticsearchAndSOAvailability$, - }); + + const taskPollingLifecycle = new TaskPollingLifecycle(taskManagerOpts); const emittedEvents: TaskLifecycleEvent[] = []; @@ -457,7 +386,6 @@ describe('TaskPollingLifecycle', () => { emittedEvents.push(event) ); - elasticsearchAndSOAvailability$.next(true); expect(mockTaskClaiming.claimAvailableTasksIfCapacityIsAvailable).toHaveBeenCalled(); await retryUntil('workerUtilizationEvent emitted', () => { return !!emittedEvents.find( @@ -478,11 +406,7 @@ describe('TaskPollingLifecycle', () => { }) ) ); - const elasticsearchAndSOAvailability$ = new Subject(); - const taskPollingLifecycle = new TaskPollingLifecycle({ - ...taskManagerOpts, - elasticsearchAndSOAvailability$, - }); + const taskPollingLifecycle = new TaskPollingLifecycle(taskManagerOpts); const emittedEvents: TaskLifecycleEvent[] = []; @@ -490,7 +414,6 @@ describe('TaskPollingLifecycle', () => { emittedEvents.push(event) ); - elasticsearchAndSOAvailability$.next(true); expect(mockTaskClaiming.claimAvailableTasksIfCapacityIsAvailable).toHaveBeenCalled(); await retryUntil('pollingCycleEvent emitted', () => { return !!emittedEvents.find( @@ -520,11 +443,7 @@ describe('TaskPollingLifecycle', () => { mockTaskClaiming.claimAvailableTasksIfCapacityIsAvailable.mockImplementation(() => { throw new Error('booo'); }); - const elasticsearchAndSOAvailability$ = new Subject(); - const taskPollingLifecycle = new TaskPollingLifecycle({ - ...taskManagerOpts, - elasticsearchAndSOAvailability$, - }); + const taskPollingLifecycle = new TaskPollingLifecycle(taskManagerOpts); const emittedEvents: TaskLifecycleEvent[] = []; @@ -532,7 +451,6 @@ describe('TaskPollingLifecycle', () => { emittedEvents.push(event) ); - elasticsearchAndSOAvailability$.next(true); expect(mockTaskClaiming.claimAvailableTasksIfCapacityIsAvailable).toHaveBeenCalled(); await retryUntil('pollingCycleEvent emitted', () => { return !!emittedEvents.find( @@ -561,11 +479,7 @@ describe('TaskPollingLifecycle', () => { }) ) ); - const elasticsearchAndSOAvailability$ = new Subject(); - const taskPollingLifecycle = new TaskPollingLifecycle({ - ...taskManagerOpts, - elasticsearchAndSOAvailability$, - }); + const taskPollingLifecycle = new TaskPollingLifecycle(taskManagerOpts); const emittedEvents: TaskLifecycleEvent[] = []; @@ -573,7 +487,6 @@ describe('TaskPollingLifecycle', () => { emittedEvents.push(event) ); - elasticsearchAndSOAvailability$.next(true); expect(mockTaskClaiming.claimAvailableTasksIfCapacityIsAvailable).toHaveBeenCalled(); await retryUntil('pollingCycleEvent emitted', () => { return !!emittedEvents.find( diff --git a/x-pack/platform/plugins/shared/task_manager/server/polling_lifecycle.ts b/x-pack/platform/plugins/shared/task_manager/server/polling_lifecycle.ts index 91f32d7201ea9..d502452c198d9 100644 --- a/x-pack/platform/plugins/shared/task_manager/server/polling_lifecycle.ts +++ b/x-pack/platform/plugins/shared/task_manager/server/polling_lifecycle.ts @@ -57,7 +57,6 @@ export type TaskPollingLifecycleOpts = { taskStore: TaskStore; config: TaskManagerConfig; middleware: Middleware; - elasticsearchAndSOAvailability$: Observable; executionContext: ExecutionContextStart; usageCounter?: UsageCounter; taskPartitioner: TaskPartitioner; @@ -107,8 +106,6 @@ export class TaskPollingLifecycle implements ITaskEventEmitter { - if (areESAndSOAvailable) { - // start polling for work - this.poller.start(); - } else if (!areESAndSOAvailable) { - this.logger.info( - `Stopping the task poller because Elasticsearch and/or saved-objects service became unavailable` - ); - this.poller.stop(); - this.pool.cancelRunningTasks(); - } - }); + this.poller.start(); } public get events(): Observable {