diff --git a/src/config.ts b/src/config.ts index 6d32ea5..ddd68ac 100644 --- a/src/config.ts +++ b/src/config.ts @@ -20,6 +20,7 @@ const DEFAULTS = Object.freeze({ maxParallelTasks: 5, maxTaskRetries: 3, taskTimeout: 1000 * 60 * 5, // 5 min, + startupTaskTimeout: 1000 * 60 * 2, // 2 min, enableLogging: true, startupTimeout: 1000 * 90, // 90 sec exitOnNoProposals: false, @@ -35,6 +36,7 @@ export class ExecutorConfig { readonly package?: Package | string; readonly maxParallelTasks: number; readonly taskTimeout: number; + readonly startupTaskTimeout: number; readonly budget: number; readonly subnetTag: string; readonly networkIp?: string; @@ -87,6 +89,7 @@ export class ExecutorConfig { this.budget = options.budget || DEFAULTS.budget; this.maxParallelTasks = options.maxParallelTasks || DEFAULTS.maxParallelTasks; this.taskTimeout = options.taskTimeout || DEFAULTS.taskTimeout; + this.startupTaskTimeout = options.startupTimeout || DEFAULTS.startupTaskTimeout; this.subnetTag = options.subnetTag || processEnv.env?.YAGNA_SUBNET || DEFAULTS.subnetTag; this.networkIp = options.networkIp; this.logger = (() => { diff --git a/src/executor.spec.ts b/src/executor.spec.ts index ef2c98c..31d391c 100644 --- a/src/executor.spec.ts +++ b/src/executor.spec.ts @@ -187,6 +187,7 @@ describe("Task Executor", () => { activityReadySetupFunctions: [], maxRetries: 0, timeout: 300000, + startupTimeout: 120000, }); await executor.shutdown(); }); @@ -209,6 +210,7 @@ describe("Task Executor", () => { activityReadySetupFunctions: [], maxRetries: 0, timeout: 300000, + startupTimeout: 120000, }); await executor.shutdown(); }); diff --git a/src/executor.ts b/src/executor.ts index de1283b..0b1edc6 100644 --- a/src/executor.ts +++ b/src/executor.ts @@ -76,6 +76,13 @@ export type ExecutorOptions = { * that meets these criteria may take a bit longer. */ startupTimeout?: number; + /** + * Timeout for waiting for signing an agreement with an available provider from the moment the task initiated. + * This parameter is expressed in ms. Default is 120_000 (2 minutes). + * If it is not possible to sign an agreement within the specified time, + * the task will stop with an error and will be queued to be retried if the `maxTaskRetries` parameter > 0 + */ + taskStartupTimeout?: number; /** * If set to `true`, the executor will exit with an error when no proposals are accepted. * You can customize how long the executor will wait for proposals using the `startupTimeout` parameter. @@ -405,6 +412,7 @@ export class TaskExecutor { task = new Task((++this.lastTaskIndex).toString(), worker, { maxRetries: options?.maxRetries ?? this.options.maxTaskRetries, timeout: options?.timeout ?? this.options.taskTimeout, + startupTimeout: options?.startupTimeout ?? this.options.startupTaskTimeout, activityReadySetupFunctions: this.activityReadySetupFunctions, }); this.taskQueue.addToEnd(task); diff --git a/src/task.spec.ts b/src/task.spec.ts index 4438cf1..6cc1dde 100644 --- a/src/task.spec.ts +++ b/src/task.spec.ts @@ -1,6 +1,7 @@ import { Task, TaskState } from "./task"; -import { Activity, Result, ResultState } from "@golem-sdk/golem-js"; +import { Activity, GolemTimeoutError, Result, ResultState } from "@golem-sdk/golem-js"; import { instance, mock } from "@johanblumenberg/ts-mockito"; +import { sleep } from "./utils"; describe("Task", function () { const worker = async () => null; @@ -52,4 +53,24 @@ describe("Task", function () { task.stop(undefined, error, true); expect(task.getState()).toEqual(TaskState.Retry); }); + + it("should stop the task with a timeout error if the task does not complete within the specified time", async () => { + const task = new Task("1", worker, { timeout: 1, maxRetries: 0 }); + task.start(activity); + await sleep(2, true); + expect(task.getError()).toEqual(new GolemTimeoutError("Task 1 timeout.")); + expect(task.getState() === TaskState.Rejected); + }); + + it("should stop the task with a timeout error if the task does not started within the specified time", async () => { + const task = new Task("1", worker, { startupTimeout: 1, maxRetries: 0 }); + task.init(); + await sleep(2, true); + expect(task.getError()).toEqual( + new GolemTimeoutError( + "Task startup 1 timeout. Failed to sign an agreement with the provider within the specified time", + ), + ); + expect(task.getState() === TaskState.Rejected); + }); }); diff --git a/src/task.ts b/src/task.ts index 3a8b356..e971781 100644 --- a/src/task.ts +++ b/src/task.ts @@ -19,8 +19,10 @@ export enum TaskState { export type TaskOptions = { /** maximum number of retries if task failed due to provider reason, default = 5 */ maxRetries?: number; - /** timeout in ms for task execution, including retries, default = 300_000 (5min) */ + /** timeout in ms for task execution, measured for one attempt from start to stop, default = 300_000 (5min) */ timeout?: number; + /** timeout in ms for task startup, measured from initialization to start, default = 120_000 (2min) */ + startupTimeout?: number; /** array of setup functions to run on each activity */ activityReadySetupFunctions?: Worker[]; }; @@ -37,6 +39,7 @@ export type TaskDetails = { const DEFAULTS = { MAX_RETRIES: 5, TIMEOUT: 1000 * 60 * 5, + STARTUP_TIMEOUT: 1000 * 60 * 2, }; /** @@ -51,7 +54,9 @@ export class Task implements QueueableTask { private retriesCount = 0; private listeners = new Set<(state: TaskState) => void>(); private timeoutId?: NodeJS.Timeout; + private startupTimeoutId?: NodeJS.Timeout; private readonly timeout: number; + private readonly startupTimeout: number; private readonly maxRetries: number; private readonly activityReadySetupFunctions: Worker[]; private activity?: Activity; @@ -63,6 +68,7 @@ export class Task implements QueueableTask { options?: TaskOptions, ) { this.timeout = options?.timeout ?? DEFAULTS.TIMEOUT; + this.startupTimeout = options?.startupTimeout ?? DEFAULTS.STARTUP_TIMEOUT; this.maxRetries = options?.maxRetries ?? DEFAULTS.MAX_RETRIES; this.activityReadySetupFunctions = options?.activityReadySetupFunctions ?? []; if (this.maxRetries < 0) { @@ -79,10 +85,22 @@ export class Task implements QueueableTask { } init() { this.state = TaskState.Queued; + this.startupTimeoutId = setTimeout( + () => + this.stop( + undefined, + new GolemTimeoutError( + `Task startup ${this.id} timeout. Failed to sign an agreement with the provider within the specified time`, + ), + true, + ), + this.startupTimeout, + ); } start(activity: Activity, networkNode?: NetworkNode) { this.state = TaskState.Pending; + clearTimeout(this.startupTimeoutId); this.activity = activity; this.networkNode = networkNode; this.listeners.forEach((listener) => listener(this.state)); @@ -96,6 +114,7 @@ export class Task implements QueueableTask { return; } clearTimeout(this.timeoutId); + clearTimeout(this.startupTimeoutId); if (error) { this.error = error; if (retry && this.retriesCount < this.maxRetries) {