Skip to content

Commit

Permalink
Merge pull request #46 from golemfactory/feature/JST-680/task-startup…
Browse files Browse the repository at this point in the history
…-timeout

feat(task): added task startup timeout
  • Loading branch information
mgordel authored Mar 18, 2024
2 parents 8a538c6 + d7e51b9 commit 7c72f73
Show file tree
Hide file tree
Showing 5 changed files with 55 additions and 2 deletions.
3 changes: 3 additions & 0 deletions src/config.ts
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ const DEFAULTS = Object.freeze({
maxParallelTasks: 5,
maxTaskRetries: 3,
taskTimeout: 1000 * 60 * 5, // 5 min,
startupTaskTimeout: 1000 * 60 * 2, // 2 min,
enableLogging: true,
startupTimeout: 1000 * 90, // 90 sec
exitOnNoProposals: false,
Expand All @@ -35,6 +36,7 @@ export class ExecutorConfig {
readonly package?: Package | string;
readonly maxParallelTasks: number;
readonly taskTimeout: number;
readonly startupTaskTimeout: number;
readonly budget: number;
readonly subnetTag: string;
readonly networkIp?: string;
Expand Down Expand Up @@ -87,6 +89,7 @@ export class ExecutorConfig {
this.budget = options.budget || DEFAULTS.budget;
this.maxParallelTasks = options.maxParallelTasks || DEFAULTS.maxParallelTasks;
this.taskTimeout = options.taskTimeout || DEFAULTS.taskTimeout;
// Per-task startup timeout: read the dedicated `taskStartupTimeout` option, not the
// executor-wide `startupTimeout`, which is a separate option.
this.startupTaskTimeout = options.taskStartupTimeout || DEFAULTS.startupTaskTimeout;
this.subnetTag = options.subnetTag || processEnv.env?.YAGNA_SUBNET || DEFAULTS.subnetTag;
this.networkIp = options.networkIp;
this.logger = (() => {
Expand Down
2 changes: 2 additions & 0 deletions src/executor.spec.ts
Original file line number Diff line number Diff line change
Expand Up @@ -187,6 +187,7 @@ describe("Task Executor", () => {
activityReadySetupFunctions: [],
maxRetries: 0,
timeout: 300000,
startupTimeout: 120000,
});
await executor.shutdown();
});
Expand All @@ -209,6 +210,7 @@ describe("Task Executor", () => {
activityReadySetupFunctions: [],
maxRetries: 0,
timeout: 300000,
startupTimeout: 120000,
});
await executor.shutdown();
});
Expand Down
8 changes: 8 additions & 0 deletions src/executor.ts
Original file line number Diff line number Diff line change
Expand Up @@ -76,6 +76,13 @@ export type ExecutorOptions = {
* that meets these criteria may take a bit longer.
*/
startupTimeout?: number;
/**
* Timeout for waiting to sign an agreement with an available provider, measured from the moment the task is initiated.
* This parameter is expressed in ms. Default is 120_000 (2 minutes).
* If it is not possible to sign an agreement within the specified time,
* the task will stop with an error and will be queued to be retried if the `maxTaskRetries` parameter > 0
*/
taskStartupTimeout?: number;
/**
* If set to `true`, the executor will exit with an error when no proposals are accepted.
* You can customize how long the executor will wait for proposals using the `startupTimeout` parameter.
Expand Down Expand Up @@ -405,6 +412,7 @@ export class TaskExecutor {
task = new Task((++this.lastTaskIndex).toString(), worker, {
maxRetries: options?.maxRetries ?? this.options.maxTaskRetries,
timeout: options?.timeout ?? this.options.taskTimeout,
startupTimeout: options?.startupTimeout ?? this.options.startupTaskTimeout,
activityReadySetupFunctions: this.activityReadySetupFunctions,
});
this.taskQueue.addToEnd(task);
Expand Down
23 changes: 22 additions & 1 deletion src/task.spec.ts
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
import { Task, TaskState } from "./task";
import { Activity, Result, ResultState } from "@golem-sdk/golem-js";
import { Activity, GolemTimeoutError, Result, ResultState } from "@golem-sdk/golem-js";
import { instance, mock } from "@johanblumenberg/ts-mockito";
import { sleep } from "./utils";

describe("Task", function () {
const worker = async () => null;
Expand Down Expand Up @@ -52,4 +53,24 @@ describe("Task", function () {
task.stop(undefined, error, true);
expect(task.getState()).toEqual(TaskState.Retry);
});

// Execution timeout: a started task that is not stopped within `timeout` ms must end up
// rejected with a GolemTimeoutError when no retries are allowed.
it("should stop the task with a timeout error if the task does not complete within the specified time", async () => {
  const task = new Task<unknown>("1", worker, { timeout: 1, maxRetries: 0 });
  task.start(activity);
  // Wait past the 1 ms timeout (the second argument of sleep presumably selects milliseconds).
  await sleep(2, true);
  expect(task.getError()).toEqual(new GolemTimeoutError("Task 1 timeout."));
  // A bare expect(...) without a matcher asserts nothing, so compare the state explicitly.
  expect(task.getState()).toEqual(TaskState.Rejected);
});

// Startup timeout: a task that is initialized but never started within `startupTimeout` ms
// must end up rejected with a GolemTimeoutError when no retries are allowed.
it("should stop the task with a timeout error if the task does not started within the specified time", async () => {
  const task = new Task<unknown>("1", worker, { startupTimeout: 1, maxRetries: 0 });
  task.init();
  // Wait past the 1 ms startup timeout (the second argument of sleep presumably selects milliseconds).
  await sleep(2, true);
  expect(task.getError()).toEqual(
    new GolemTimeoutError(
      "Task startup 1 timeout. Failed to sign an agreement with the provider within the specified time",
    ),
  );
  // A bare expect(...) without a matcher asserts nothing, so compare the state explicitly.
  expect(task.getState()).toEqual(TaskState.Rejected);
});
});
21 changes: 20 additions & 1 deletion src/task.ts
Original file line number Diff line number Diff line change
Expand Up @@ -19,8 +19,10 @@ export enum TaskState {
/** Options accepted by the Task constructor; every field is optional and falls back to a default. */
export type TaskOptions = {
/** maximum number of retries if task failed due to provider reason, default = 5 */
maxRetries?: number;
/** timeout in ms for task execution, measured for one attempt from start to stop, default = 300_000 (5min) */
timeout?: number;
/** timeout in ms for task startup, measured from initialization to start, default = 120_000 (2min) */
startupTimeout?: number;
/** array of setup functions to run on each activity */
activityReadySetupFunctions?: Worker<unknown>[];
};
Expand All @@ -37,6 +39,7 @@ export type TaskDetails = {
/** Fallback values used by Task when the corresponding option is not supplied. */
const DEFAULTS = {
  /** Default retry limit. */
  MAX_RETRIES: 5,
  /** Default execution timeout: 5 minutes, in milliseconds. */
  TIMEOUT: 5 * 60 * 1000,
  /** Default startup timeout: 2 minutes, in milliseconds. */
  STARTUP_TIMEOUT: 2 * 60 * 1000,
};

/**
Expand All @@ -51,7 +54,9 @@ export class Task<OutputType = unknown> implements QueueableTask {
private retriesCount = 0;
private listeners = new Set<(state: TaskState) => void>();
private timeoutId?: NodeJS.Timeout;
private startupTimeoutId?: NodeJS.Timeout;
private readonly timeout: number;
private readonly startupTimeout: number;
private readonly maxRetries: number;
private readonly activityReadySetupFunctions: Worker<unknown>[];
private activity?: Activity;
Expand All @@ -63,6 +68,7 @@ export class Task<OutputType = unknown> implements QueueableTask {
options?: TaskOptions,
) {
this.timeout = options?.timeout ?? DEFAULTS.TIMEOUT;
this.startupTimeout = options?.startupTimeout ?? DEFAULTS.STARTUP_TIMEOUT;
this.maxRetries = options?.maxRetries ?? DEFAULTS.MAX_RETRIES;
this.activityReadySetupFunctions = options?.activityReadySetupFunctions ?? [];
if (this.maxRetries < 0) {
Expand All @@ -79,10 +85,22 @@ export class Task<OutputType = unknown> implements QueueableTask {
}
/**
 * Marks the task as queued and arms the startup timer.
 *
 * If the timer is not cleared within `startupTimeout` ms, the task is stopped with a
 * `GolemTimeoutError`; the `true` passed to `stop` requests a retry.
 */
init() {
  // Cancel a timer left over from an earlier init() so a re-initialized task
  // cannot be stopped later by a stale, no-longer-referenced timer.
  clearTimeout(this.startupTimeoutId);
  this.state = TaskState.Queued;
  this.startupTimeoutId = setTimeout(() => {
    // The error is created when the timer fires, not when it is armed.
    const startupError = new GolemTimeoutError(
      `Task startup ${this.id} timeout. Failed to sign an agreement with the provider within the specified time`,
    );
    this.stop(undefined, startupError, true);
  }, this.startupTimeout);
}

start(activity: Activity, networkNode?: NetworkNode) {
this.state = TaskState.Pending;
clearTimeout(this.startupTimeoutId);
this.activity = activity;
this.networkNode = networkNode;
this.listeners.forEach((listener) => listener(this.state));
Expand All @@ -96,6 +114,7 @@ export class Task<OutputType = unknown> implements QueueableTask {
return;
}
clearTimeout(this.timeoutId);
clearTimeout(this.startupTimeoutId);
if (error) {
this.error = error;
if (retry && this.retriesCount < this.maxRetries) {
Expand Down

0 comments on commit 7c72f73

Please sign in to comment.