Skip to content

Commit

Permalink
fix(core): Ensure worker stops picking up new jobs while shutting down (
Browse files Browse the repository at this point in the history
  • Loading branch information
ivov authored Mar 10, 2025
1 parent cbf2476 commit 4fe2495
Show file tree
Hide file tree
Showing 2 changed files with 10 additions and 6 deletions.
4 changes: 2 additions & 2 deletions packages/cli/src/scaling/__tests__/scaling.service.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -202,7 +202,7 @@ describe('ScalingService', () => {
});

describe('if worker', () => {
it('should wait for running jobs to finish', async () => {
it('should pause queue and wait for running jobs to finish', async () => {
// @ts-expect-error readonly property
instanceSettings.instanceType = 'worker';
await scalingService.setupQueue();
Expand All @@ -211,7 +211,7 @@ describe('ScalingService', () => {
await scalingService.stop();

expect(getRunningJobsCountSpy).toHaveBeenCalled();
expect(queue.pause).not.toHaveBeenCalled();
expect(queue.pause).toHaveBeenCalled();
expect(stopQueueRecoverySpy).not.toHaveBeenCalled();
});
});
Expand Down
12 changes: 8 additions & 4 deletions packages/cli/src/scaling/scaling.service.ts
Original file line number Diff line number Diff line change
Expand Up @@ -139,17 +139,21 @@ export class ScalingService {
else if (instanceType === 'worker') await this.stopWorker();
}

private async pauseQueue() {
await this.queue.pause(true, true); // no more jobs will be enqueued or picked up
this.logger.debug('Paused queue');
}

private async stopMain() {
if (this.instanceSettings.isSingleMain) {
await this.queue.pause(true, true); // no more jobs will be picked up
this.logger.debug('Queue paused');
}
if (this.instanceSettings.isSingleMain) await this.pauseQueue();

if (this.queueRecoveryContext.timeout) this.stopQueueRecovery();
if (this.isQueueMetricsEnabled) this.stopQueueMetrics();
}

private async stopWorker() {
await this.pauseQueue();

let count = 0;

while (this.getRunningJobsCount() !== 0) {
Expand Down

0 comments on commit 4fe2495

Please sign in to comment.