From de94a897f63da778818ae7b055614dfa676b8413 Mon Sep 17 00:00:00 2001 From: Mihael Konjevic Date: Sat, 27 Jan 2024 23:08:42 +0100 Subject: [PATCH] feat: process queued items concurrently Queued items are now processed concurrently, and state changes are emitted more often. BREAKING CHANGE: If a user provides one of the activity functions (for workflows, tasks or work items), state change will be emitted immediatelly after user "advances" the state (manually calls `startTask`, `completeWorkItem` or one of the similar functions. Whatever was enqueued during the current "turn" will be processed concurrently. For instance if you initialize and enqueue start of multiple work items, they will be started concurrently. Previously, they were processed sequentially. --- src/Service.ts | 70 ++-- .../__snapshots__/concurrency.test.ts.snap | 378 ++++++++++++++++++ src/__tests__/concurrency.test.ts | 70 ++++ src/elements/CompositeTask.ts | 18 +- src/elements/Task.ts | 53 ++- src/elements/Workflow.ts | 22 +- src/types.ts | 1 + 7 files changed, 564 insertions(+), 48 deletions(-) create mode 100644 src/__tests__/__snapshots__/concurrency.test.ts.snap create mode 100644 src/__tests__/concurrency.test.ts diff --git a/src/Service.ts b/src/Service.ts index ab3d50f..84589de 100644 --- a/src/Service.ts +++ b/src/Service.ts @@ -1,4 +1,4 @@ -import { Effect, Match, Option, Queue, pipe } from 'effect'; +import { Chunk, Effect, Match, Option, Queue, pipe } from 'effect'; import { Get } from 'type-fest'; import { State } from './State.js'; @@ -613,42 +613,51 @@ export class Service< const self = this; return Effect.gen(function* ($) { - yield* $(self.emitStateChanges()); while (true) { - const item = yield* $( - Queue.poll(self.queue), - Effect.map(Option.getOrNull) + yield* $(self.emitStateChanges()); + + const queued = yield* $( + Queue.takeAll(self.queue), + Effect.map(Chunk.toReadonlyArray) ); - if (item === null) { + if (queued.length === 0) { return; } - const match = pipe( - Match.type(), - Match.when({ type: 'startTask' }, ({ path, input }) => - self.unsafeStartTask(path, input, false) - ), - Match.when({ type: 'startWorkflow' }, ({ path, input }) => - self.unsafeStartWorkflow(path, input, false) - ), - Match.when({ type: 'startWorkItem' }, ({ path, input }) => - self.unsafeStartWorkItem(path, input, false) - ), - Match.when({ type: 'completeWorkItem' }, ({ path, input }) => - self.unsafeCompleteWorkItem(path, input, false) - ), - Match.when({ type: 'cancelWorkItem' }, ({ path, input }) => - self.unsafeCancelWorkItem(path, input, false) - ), - Match.when({ type: 'failWorkItem' }, ({ path, input }) => - self.unsafeFailWorkItem(path, input, false) - ), - Match.exhaustive - ); + const queuedFx = queued.map((item) => { + const match = pipe( + Match.type(), + Match.when({ type: 'startTask' }, ({ path, input }) => + self.unsafeStartTask(path, input, false) + ), + Match.when({ type: 'startWorkflow' }, ({ path, input }) => + self.unsafeStartWorkflow(path, input, false) + ), + Match.when({ type: 'startWorkItem' }, ({ path, input }) => + self.unsafeStartWorkItem(path, input, false) + ), + Match.when({ type: 'completeWorkItem' }, ({ path, input }) => + self.unsafeCompleteWorkItem(path, input, false) + ), + Match.when({ type: 'cancelWorkItem' }, ({ path, input }) => + self.unsafeCancelWorkItem(path, input, false) + ), + Match.when({ type: 'failWorkItem' }, ({ path, input }) => + self.unsafeFailWorkItem(path, input, false) + ), + Match.exhaustive + ); - yield* $(match(item)); - yield* $(self.emitStateChanges()); + return Effect.gen(function* ($) { + yield* $(match(item)); + yield* $(self.emitStateChanges()); + }); + }); + + yield* $( + Effect.all(queuedFx, { concurrency: 'inherit', discard: true }) + ); } }); } @@ -664,6 +673,7 @@ export class Service< const { queue, state } = this; return { ...input, + emitStateChanges: () => this.emitStateChanges(), defaultActivityPayload: { getWorkflowContext() { return state diff --git a/src/__tests__/__snapshots__/concurrency.test.ts.snap b/src/__tests__/__snapshots__/concurrency.test.ts.snap new file mode 100644 index 0000000..0362a90 --- /dev/null +++ b/src/__tests__/__snapshots__/concurrency.test.ts.snap @@ -0,0 +1,378 @@ +// Vitest Snapshot v1, https://vitest.dev/guide/snapshot.html + +exports[`will start work items concurrently and emit state change on each work item start 1`] = ` +{ + "conditions": [ + { + "marking": 0, + "name": "start", + "workflowId": "workflow-1", + "workflowName": "activities", + }, + { + "marking": 1, + "name": "end", + "workflowId": "workflow-1", + "workflowName": "activities", + }, + ], + "tasks": [ + { + "generation": 1, + "name": "t1", + "state": "completed", + "workflowId": "workflow-1", + "workflowName": "activities", + }, + ], + "workItems": [ + { + "id": "workItem-1", + "payload": 1, + "state": "completed", + "taskGeneration": 1, + "taskName": "t1", + "workflowId": "workflow-1", + "workflowName": "activities", + }, + { + "id": "workItem-2", + "payload": 2, + "state": "completed", + "taskGeneration": 1, + "taskName": "t1", + "workflowId": "workflow-1", + "workflowName": "activities", + }, + { + "id": "workItem-3", + "payload": 3, + "state": "completed", + "taskGeneration": 1, + "taskName": "t1", + "workflowId": "workflow-1", + "workflowName": "activities", + }, + ], + "workflows": [ + { + "context": undefined, + "id": "workflow-1", + "name": "activities", + "parent": null, + "state": "completed", + }, + ], +} +`; + +exports[`will start work items concurrently and emit state change on each work item start 2`] = ` +[ + [ + { + "change": { + "type": "WORKFLOW_INITIALIZED", + "workflow": { + "context": undefined, + "id": "workflow-1", + "name": "activities", + "parent": null, + "state": "initialized", + }, + }, + "getState": [Function], + }, + { + "change": { + "task": { + "generation": 0, + "name": "t1", + "state": "disabled", + "workflowId": "workflow-1", + "workflowName": "activities", + }, + "type": "TASK_INITIALIZED", + }, + "getState": [Function], + }, + { + "change": { + "condition": { + "marking": 0, + "name": "start", + "workflowId": "workflow-1", + "workflowName": "activities", + }, + "type": "CONDITION_INITIALIZED", + }, + "getState": [Function], + }, + { + "change": { + "condition": { + "marking": 0, + "name": "end", + "workflowId": "workflow-1", + "workflowName": "activities", + }, + "type": "CONDITION_INITIALIZED", + }, + "getState": [Function], + }, + ], + [ + { + "change": { + "type": "WORKFLOW_STATE_UPDATED", + "workflow": { + "context": undefined, + "id": "workflow-1", + "name": "activities", + "parent": null, + "state": "started", + }, + }, + "getState": [Function], + }, + { + "change": { + "condition": { + "marking": 1, + "name": "start", + "workflowId": "workflow-1", + "workflowName": "activities", + }, + "type": "CONDITION_MARKING_UPDATED", + }, + "getState": [Function], + }, + { + "change": { + "task": { + "generation": 0, + "name": "t1", + "state": "enabled", + "workflowId": "workflow-1", + "workflowName": "activities", + }, + "type": "TASK_STATE_UPDATED", + }, + "getState": [Function], + }, + ], + [ + { + "change": { + "task": { + "generation": 1, + "name": "t1", + "state": "started", + "workflowId": "workflow-1", + "workflowName": "activities", + }, + "type": "TASK_STATE_UPDATED", + }, + "getState": [Function], + }, + { + "change": { + "condition": { + "marking": 0, + "name": "start", + "workflowId": "workflow-1", + "workflowName": "activities", + }, + "type": "CONDITION_MARKING_UPDATED", + }, + "getState": [Function], + }, + ], + [ + { + "change": { + "type": "WORK_ITEM_INITIALIZED", + "workItem": { + "id": "workItem-1", + "payload": 1, + "state": "initialized", + "taskGeneration": 1, + "taskName": "t1", + "workflowId": "workflow-1", + "workflowName": "activities", + }, + }, + "getState": [Function], + }, + { + "change": { + "type": "WORK_ITEM_INITIALIZED", + "workItem": { + "id": "workItem-2", + "payload": 2, + "state": "initialized", + "taskGeneration": 1, + "taskName": "t1", + "workflowId": "workflow-1", + "workflowName": "activities", + }, + }, + "getState": [Function], + }, + { + "change": { + "type": "WORK_ITEM_INITIALIZED", + "workItem": { + "id": "workItem-3", + "payload": 3, + "state": "initialized", + "taskGeneration": 1, + "taskName": "t1", + "workflowId": "workflow-1", + "workflowName": "activities", + }, + }, + "getState": [Function], + }, + ], + [ + { + "change": { + "type": "WORK_ITEM_STATE_UPDATED", + "workItem": { + "id": "workItem-1", + "payload": 1, + "state": "started", + "taskGeneration": 1, + "taskName": "t1", + "workflowId": "workflow-1", + "workflowName": "activities", + }, + }, + "getState": [Function], + }, + ], + [ + { + "change": { + "type": "WORK_ITEM_STATE_UPDATED", + "workItem": { + "id": "workItem-2", + "payload": 2, + "state": "started", + "taskGeneration": 1, + "taskName": "t1", + "workflowId": "workflow-1", + "workflowName": "activities", + }, + }, + "getState": [Function], + }, + ], + [ + { + "change": { + "type": "WORK_ITEM_STATE_UPDATED", + "workItem": { + "id": "workItem-3", + "payload": 3, + "state": "started", + "taskGeneration": 1, + "taskName": "t1", + "workflowId": "workflow-1", + "workflowName": "activities", + }, + }, + "getState": [Function], + }, + ], + [ + { + "change": { + "type": "WORK_ITEM_STATE_UPDATED", + "workItem": { + "id": "workItem-1", + "payload": 1, + "state": "completed", + "taskGeneration": 1, + "taskName": "t1", + "workflowId": "workflow-1", + "workflowName": "activities", + }, + }, + "getState": [Function], + }, + ], + [ + { + "change": { + "type": "WORK_ITEM_STATE_UPDATED", + "workItem": { + "id": "workItem-2", + "payload": 2, + "state": "completed", + "taskGeneration": 1, + "taskName": "t1", + "workflowId": "workflow-1", + "workflowName": "activities", + }, + }, + "getState": [Function], + }, + ], + [ + { + "change": { + "type": "WORK_ITEM_STATE_UPDATED", + "workItem": { + "id": "workItem-3", + "payload": 3, + "state": "completed", + "taskGeneration": 1, + "taskName": "t1", + "workflowId": "workflow-1", + "workflowName": "activities", + }, + }, + "getState": [Function], + }, + { + "change": { + "task": { + "generation": 1, + "name": "t1", + "state": "completed", + "workflowId": "workflow-1", + "workflowName": "activities", + }, + "type": "TASK_STATE_UPDATED", + }, + "getState": [Function], + }, + { + "change": { + "condition": { + "marking": 1, + "name": "end", + "workflowId": "workflow-1", + "workflowName": "activities", + }, + "type": "CONDITION_MARKING_UPDATED", + }, + "getState": [Function], + }, + { + "change": { + "type": "WORKFLOW_STATE_UPDATED", + "workflow": { + "context": undefined, + "id": "workflow-1", + "name": "activities", + "parent": null, + "state": "completed", + }, + }, + "getState": [Function], + }, + ], +] +`; diff --git a/src/__tests__/concurrency.test.ts b/src/__tests__/concurrency.test.ts new file mode 100644 index 0000000..e361570 --- /dev/null +++ b/src/__tests__/concurrency.test.ts @@ -0,0 +1,70 @@ +import { Effect } from 'effect'; +import { it } from 'vitest'; + +import { Builder, IdGenerator, Service } from '../index.js'; +import { getEnabledTaskNames, makeIdGenerator } from './shared.js'; + +const workflowDefinition = Builder.workflow() + .withName('activities') + .startCondition('start') + .task('t1', (t) => + t() + .withWorkItem((w) => + w().onStart(({ startWorkItem, getWorkItem }) => + Effect.gen(function* ($) { + const { enqueueCompleteWorkItem } = yield* $(startWorkItem()); + const workItem = yield* $(getWorkItem()); + yield* $(Effect.sleep(`${workItem.payload * 100} millis`)); + yield* $(enqueueCompleteWorkItem()); + }) + ) + ) + .onEnable(({ enableTask }) => + Effect.gen(function* ($) { + const { enqueueStartTask } = yield* $(enableTask()); + yield* $(enqueueStartTask()); + }) + ) + .onStart(({ startTask }) => + Effect.gen(function* ($) { + const { initializeWorkItem, enqueueStartWorkItem } = yield* $( + startTask() + ); + for (const i of [1, 2, 3]) { + const { id } = yield* $(initializeWorkItem(i)); + yield* $(enqueueStartWorkItem(id)); + } + }) + ) + ) + .endCondition('end') + .connectCondition('start', (to) => to.task('t1')) + .connectTask('t1', (to) => to.condition('end')); + +it('will start work items concurrently and emit state change on each work item start', async ({ + expect, +}) => { + const program = Effect.gen(function* ($) { + const idGenerator = makeIdGenerator(); + const logs: unknown[] = []; + + const service = yield* $( + workflowDefinition.build(), + Effect.flatMap((workflow) => Service.initialize(workflow)), + Effect.provideService(IdGenerator, idGenerator) + ); + + service.onStateChange((changes) => { + return Effect.succeed(logs.push(changes)); + }); + + yield* $(service.start()); + const state = yield* $(service.getState()); + expect(state).toMatchSnapshot(); + expect(logs).toMatchSnapshot(); + expect(getEnabledTaskNames(state)).toEqual(new Set()); + expect(state.workflows[0]?.state).toEqual('completed'); + }); + + await Effect.runPromise(program); +}); diff --git a/src/elements/CompositeTask.ts b/src/elements/CompositeTask.ts index 1e989f6..b2c01bf 100644 --- a/src/elements/CompositeTask.ts +++ b/src/elements/CompositeTask.ts @@ -84,6 +84,7 @@ export class CompositeTask extends BaseTask { enableTask() { return pipe( perform, + Effect.tap(() => executionContext.emitStateChanges()), Effect.map(() => ({ enqueueStartTask })) ); }, @@ -126,7 +127,10 @@ export class CompositeTask extends BaseTask { self.activities.onDisable({ ...executionContext.defaultActivityPayload, disableTask() { - return perform; + return pipe( + perform, + Effect.tap(() => executionContext.emitStateChanges()) + ); }, }) as Effect.Effect ); @@ -211,6 +215,7 @@ export class CompositeTask extends BaseTask { startTask() { return pipe( perform, + Effect.tap(() => executionContext.emitStateChanges()), Effect.map(() => ({ initializeWorkflow, enqueueStartWorkflow, @@ -306,6 +311,7 @@ export class CompositeTask extends BaseTask { completeTask() { return pipe( perform, + Effect.tap(() => executionContext.emitStateChanges()), Effect.provideService(State, stateManager), Effect.provideService(ExecutionContext, executionContext) ); @@ -369,7 +375,10 @@ export class CompositeTask extends BaseTask { self.activities.onCancel({ ...executionContext.defaultActivityPayload, cancelTask() { - return perform; + return pipe( + perform, + Effect.tap(() => executionContext.emitStateChanges()) + ); }, }) as Effect.Effect ); @@ -430,7 +439,10 @@ export class CompositeTask extends BaseTask { self.activities.onFail({ ...executionContext.defaultActivityPayload, failTask() { - return perform; + return pipe( + perform, + Effect.tap(() => executionContext.emitStateChanges()) + ); }, }) as Effect.Effect ); diff --git a/src/elements/Task.ts b/src/elements/Task.ts index 852dd0d..d71ae96 100644 --- a/src/elements/Task.ts +++ b/src/elements/Task.ts @@ -90,6 +90,7 @@ export class Task extends BaseTask { enableTask() { return pipe( perform, + Effect.tap(() => executionContext.emitStateChanges()), Effect.map(() => ({ enqueueStartTask })) ); }, @@ -132,7 +133,10 @@ export class Task extends BaseTask { self.activities.onDisable({ ...executionContext.defaultActivityPayload, disableTask() { - return perform; + return pipe( + perform, + Effect.tap(() => executionContext.emitStateChanges()) + ); }, }) as Effect.Effect ); @@ -206,6 +210,7 @@ export class Task extends BaseTask { startTask() { return pipe( perform, + Effect.tap(() => executionContext.emitStateChanges()), Effect.map(() => ({ initializeWorkItem, enqueueStartWorkItem, @@ -303,7 +308,10 @@ export class Task extends BaseTask { self.activities.onComplete({ ...executionContext.defaultActivityPayload, completeTask() { - return perform; + return pipe( + perform, + Effect.tap(() => executionContext.emitStateChanges()) + ); }, }) as Effect.Effect ); @@ -350,7 +358,7 @@ export class Task extends BaseTask { workItems.map(({ id }) => self.cancelWorkItem(workflowId, id, undefined, false) ), - { batching: true } + { concurrency: 'inherit' } ) ); yield* $(stateManager.cancelTask(workflowId, self.name)); @@ -365,7 +373,10 @@ export class Task extends BaseTask { self.activities.onCancel({ ...executionContext.defaultActivityPayload, cancelTask() { - return perform; + return pipe( + perform, + Effect.tap(() => executionContext.emitStateChanges()) + ); }, }) as Effect.Effect ); @@ -412,7 +423,7 @@ export class Task extends BaseTask { workItems.map(({ id }) => self.cancelWorkItem(workflowId, id, undefined, false) ), - { batching: true } + { concurrency: 'inherit' } ) ); yield* $(stateManager.failTask(workflowId, self.name)); @@ -428,7 +439,10 @@ export class Task extends BaseTask { self.activities.onFail({ ...executionContext.defaultActivityPayload, failTask() { - return perform; + return pipe( + perform, + Effect.tap(() => executionContext.emitStateChanges()) + ); }, }) as Effect.Effect ); @@ -574,7 +588,10 @@ export class Task extends BaseTask { ...workItemActivityPayload, startWorkItem() { return Effect.gen(function* ($) { - yield* $(perform); + yield* $( + perform, + Effect.tap(() => executionContext.emitStateChanges()) + ); return { enqueueCompleteWorkItem(input?: unknown) { return executionContext.queue.offer({ @@ -622,6 +639,8 @@ export class Task extends BaseTask { yield* $(self.ensureIsStarted(workflowId)); const stateManager = yield* $(State); + const executionContext = yield* $(ExecutionContext); + const perform = yield* $( Effect.once( Effect.gen(function* ($) { @@ -646,7 +665,10 @@ export class Task extends BaseTask { { ...workItemActivityPayload, completeWorkItem() { - return perform; + return pipe( + perform, + Effect.tap(() => executionContext.emitStateChanges()) + ); }, }, input @@ -671,6 +693,8 @@ export class Task extends BaseTask { yield* $(self.ensureIsStarted(workflowId)); const stateManager = yield* $(State); + const executionContext = yield* $(ExecutionContext); + const perform = yield* $( Effect.once( Effect.gen(function* ($) { @@ -695,7 +719,10 @@ export class Task extends BaseTask { { ...workItemActivityPayload, cancelWorkItem() { - return perform; + return pipe( + perform, + Effect.tap(() => executionContext.emitStateChanges()) + ); }, }, input @@ -722,6 +749,8 @@ export class Task extends BaseTask { yield* $(self.ensureIsStarted(workflowId)); const stateManager = yield* $(State); + const executionContext = yield* $(ExecutionContext); + const perform = yield* $( Effect.once( Effect.gen(function* ($) { @@ -746,7 +775,10 @@ export class Task extends BaseTask { { ...workItemActivityPayload, failWorkItem() { - return perform; + return pipe( + perform, + Effect.tap(() => executionContext.emitStateChanges()) + ); }, }, input @@ -754,6 +786,7 @@ export class Task extends BaseTask { ); yield* $(perform); + if (autoFailTask) { yield* $(self.maybeFail(workflowId)); } diff --git a/src/elements/Workflow.ts b/src/elements/Workflow.ts index 27c8914..b44ca1b 100644 --- a/src/elements/Workflow.ts +++ b/src/elements/Workflow.ts @@ -1,4 +1,4 @@ -import { Effect } from 'effect'; +import { Effect, pipe } from 'effect'; import { State } from '../State.js'; import { E2WFOJNet } from '../e2wfojnet.js'; @@ -140,7 +140,10 @@ export class Workflow< { ...defaultActivityPayload, startWorkflow() { - return perform; + return pipe( + perform, + Effect.tap(() => executionContext.emitStateChanges()) + ); }, }, input @@ -206,7 +209,10 @@ export class Workflow< self.activities.onComplete({ ...defaultActivityPayload, completeWorkflow() { - return perform; + return pipe( + perform, + Effect.tap(() => executionContext.emitStateChanges()) + ); }, }) as Effect.Effect ); @@ -280,7 +286,10 @@ export class Workflow< { ...defaultActivityPayload, cancelWorkflow() { - return perform; + return pipe( + perform, + Effect.tap(() => executionContext.emitStateChanges()) + ); }, }, input @@ -354,7 +363,10 @@ export class Workflow< { ...defaultActivityPayload, failWorkflow() { - return perform; + return pipe( + perform, + Effect.tap(() => executionContext.emitStateChanges()) + ); }, }, input diff --git a/src/types.ts b/src/types.ts index 4dd1999..cfc25d6 100644 --- a/src/types.ts +++ b/src/types.ts @@ -530,6 +530,7 @@ export type ExecutionContextQueueItem = export interface ExecutionContext { path: readonly string[]; workflowId: WorkflowId; + emitStateChanges: () => Effect.Effect; defaultActivityPayload: { getWorkflowContext: () => Effect.Effect< never,