diff --git a/src/Service.ts b/src/Service.ts index ab3d50f..84589de 100644 --- a/src/Service.ts +++ b/src/Service.ts @@ -1,4 +1,4 @@ -import { Effect, Match, Option, Queue, pipe } from 'effect'; +import { Chunk, Effect, Match, Option, Queue, pipe } from 'effect'; import { Get } from 'type-fest'; import { State } from './State.js'; @@ -613,42 +613,51 @@ export class Service< const self = this; return Effect.gen(function* ($) { - yield* $(self.emitStateChanges()); while (true) { - const item = yield* $( - Queue.poll(self.queue), - Effect.map(Option.getOrNull) + yield* $(self.emitStateChanges()); + + const queued = yield* $( + Queue.takeAll(self.queue), + Effect.map(Chunk.toReadonlyArray) ); - if (item === null) { + if (queued.length === 0) { return; } - const match = pipe( - Match.type(), - Match.when({ type: 'startTask' }, ({ path, input }) => - self.unsafeStartTask(path, input, false) - ), - Match.when({ type: 'startWorkflow' }, ({ path, input }) => - self.unsafeStartWorkflow(path, input, false) - ), - Match.when({ type: 'startWorkItem' }, ({ path, input }) => - self.unsafeStartWorkItem(path, input, false) - ), - Match.when({ type: 'completeWorkItem' }, ({ path, input }) => - self.unsafeCompleteWorkItem(path, input, false) - ), - Match.when({ type: 'cancelWorkItem' }, ({ path, input }) => - self.unsafeCancelWorkItem(path, input, false) - ), - Match.when({ type: 'failWorkItem' }, ({ path, input }) => - self.unsafeFailWorkItem(path, input, false) - ), - Match.exhaustive - ); + const queuedFx = queued.map((item) => { + const match = pipe( + Match.type(), + Match.when({ type: 'startTask' }, ({ path, input }) => + self.unsafeStartTask(path, input, false) + ), + Match.when({ type: 'startWorkflow' }, ({ path, input }) => + self.unsafeStartWorkflow(path, input, false) + ), + Match.when({ type: 'startWorkItem' }, ({ path, input }) => + self.unsafeStartWorkItem(path, input, false) + ), + Match.when({ type: 'completeWorkItem' }, ({ path, input }) => + self.unsafeCompleteWorkItem(path, input, false) + ), + Match.when({ type: 'cancelWorkItem' }, ({ path, input }) => + self.unsafeCancelWorkItem(path, input, false) + ), + Match.when({ type: 'failWorkItem' }, ({ path, input }) => + self.unsafeFailWorkItem(path, input, false) + ), + Match.exhaustive + ); - yield* $(match(item)); - yield* $(self.emitStateChanges()); + return Effect.gen(function* ($) { + yield* $(match(item)); + yield* $(self.emitStateChanges()); + }); + }); + + yield* $( + Effect.all(queuedFx, { concurrency: 'inherit', discard: true }) + ); } }); } @@ -664,6 +673,7 @@ export class Service< const { queue, state } = this; return { ...input, + emitStateChanges: () => this.emitStateChanges(), defaultActivityPayload: { getWorkflowContext() { return state diff --git a/src/__tests__/__snapshots__/concurrency.test.ts.snap b/src/__tests__/__snapshots__/concurrency.test.ts.snap new file mode 100644 index 0000000..0362a90 --- /dev/null +++ b/src/__tests__/__snapshots__/concurrency.test.ts.snap @@ -0,0 +1,378 @@ +// Vitest Snapshot v1, https://vitest.dev/guide/snapshot.html + +exports[`will start work items concurrently and emit state change on each work item start 1`] = ` +{ + "conditions": [ + { + "marking": 0, + "name": "start", + "workflowId": "workflow-1", + "workflowName": "activities", + }, + { + "marking": 1, + "name": "end", + "workflowId": "workflow-1", + "workflowName": "activities", + }, + ], + "tasks": [ + { + "generation": 1, + "name": "t1", + "state": "completed", + "workflowId": "workflow-1", + "workflowName": "activities", + }, + ], + "workItems": [ + { + "id": "workItem-1", + "payload": 1, + "state": "completed", + "taskGeneration": 1, + "taskName": "t1", + "workflowId": "workflow-1", + "workflowName": "activities", + }, + { + "id": "workItem-2", + "payload": 2, + "state": "completed", + "taskGeneration": 1, + "taskName": "t1", + "workflowId": "workflow-1", + "workflowName": "activities", + }, + { + "id": "workItem-3", + "payload": 3, + "state": "completed", + "taskGeneration": 1, + "taskName": "t1", + "workflowId": "workflow-1", + "workflowName": "activities", + }, + ], + "workflows": [ + { + "context": undefined, + "id": "workflow-1", + "name": "activities", + "parent": null, + "state": "completed", + }, + ], +} +`; + +exports[`will start work items concurrently and emit state change on each work item start 2`] = ` +[ + [ + { + "change": { + "type": "WORKFLOW_INITIALIZED", + "workflow": { + "context": undefined, + "id": "workflow-1", + "name": "activities", + "parent": null, + "state": "initialized", + }, + }, + "getState": [Function], + }, + { + "change": { + "task": { + "generation": 0, + "name": "t1", + "state": "disabled", + "workflowId": "workflow-1", + "workflowName": "activities", + }, + "type": "TASK_INITIALIZED", + }, + "getState": [Function], + }, + { + "change": { + "condition": { + "marking": 0, + "name": "start", + "workflowId": "workflow-1", + "workflowName": "activities", + }, + "type": "CONDITION_INITIALIZED", + }, + "getState": [Function], + }, + { + "change": { + "condition": { + "marking": 0, + "name": "end", + "workflowId": "workflow-1", + "workflowName": "activities", + }, + "type": "CONDITION_INITIALIZED", + }, + "getState": [Function], + }, + ], + [ + { + "change": { + "type": "WORKFLOW_STATE_UPDATED", + "workflow": { + "context": undefined, + "id": "workflow-1", + "name": "activities", + "parent": null, + "state": "started", + }, + }, + "getState": [Function], + }, + { + "change": { + "condition": { + "marking": 1, + "name": "start", + "workflowId": "workflow-1", + "workflowName": "activities", + }, + "type": "CONDITION_MARKING_UPDATED", + }, + "getState": [Function], + }, + { + "change": { + "task": { + "generation": 0, + "name": "t1", + "state": "enabled", + "workflowId": "workflow-1", + "workflowName": "activities", + }, + "type": "TASK_STATE_UPDATED", + }, + "getState": [Function], + }, + ], + [ + { + "change": { + "task": { + "generation": 1, + "name": "t1", + "state": "started", + "workflowId": "workflow-1", + "workflowName": "activities", + }, + "type": "TASK_STATE_UPDATED", + }, + "getState": [Function], + }, + { + "change": { + "condition": { + "marking": 0, + "name": "start", + "workflowId": "workflow-1", + "workflowName": "activities", + }, + "type": "CONDITION_MARKING_UPDATED", + }, + "getState": [Function], + }, + ], + [ + { + "change": { + "type": "WORK_ITEM_INITIALIZED", + "workItem": { + "id": "workItem-1", + "payload": 1, + "state": "initialized", + "taskGeneration": 1, + "taskName": "t1", + "workflowId": "workflow-1", + "workflowName": "activities", + }, + }, + "getState": [Function], + }, + { + "change": { + "type": "WORK_ITEM_INITIALIZED", + "workItem": { + "id": "workItem-2", + "payload": 2, + "state": "initialized", + "taskGeneration": 1, + "taskName": "t1", + "workflowId": "workflow-1", + "workflowName": "activities", + }, + }, + "getState": [Function], + }, + { + "change": { + "type": "WORK_ITEM_INITIALIZED", + "workItem": { + "id": "workItem-3", + "payload": 3, + "state": "initialized", + "taskGeneration": 1, + "taskName": "t1", + "workflowId": "workflow-1", + "workflowName": "activities", + }, + }, + "getState": [Function], + }, + ], + [ + { + "change": { + "type": "WORK_ITEM_STATE_UPDATED", + "workItem": { + "id": "workItem-1", + "payload": 1, + "state": "started", + "taskGeneration": 1, + "taskName": "t1", + "workflowId": "workflow-1", + "workflowName": "activities", + }, + }, + "getState": [Function], + }, + ], + [ + { + "change": { + "type": "WORK_ITEM_STATE_UPDATED", + "workItem": { + "id": "workItem-2", + "payload": 2, + "state": "started", + "taskGeneration": 1, + "taskName": "t1", + "workflowId": "workflow-1", + "workflowName": "activities", + }, + }, + "getState": [Function], + }, + ], + [ + { + "change": { + "type": "WORK_ITEM_STATE_UPDATED", + "workItem": { + "id": "workItem-3", + "payload": 3, + "state": "started", + "taskGeneration": 1, + "taskName": "t1", + "workflowId": "workflow-1", + "workflowName": "activities", + }, + }, + "getState": [Function], + }, + ], + [ + { + "change": { + "type": "WORK_ITEM_STATE_UPDATED", + "workItem": { + "id": "workItem-1", + "payload": 1, + "state": "completed", + "taskGeneration": 1, + "taskName": "t1", + "workflowId": "workflow-1", + "workflowName": "activities", + }, + }, + "getState": [Function], + }, + ], + [ + { + "change": { + "type": "WORK_ITEM_STATE_UPDATED", + "workItem": { + "id": "workItem-2", + "payload": 2, + "state": "completed", + "taskGeneration": 1, + "taskName": "t1", + "workflowId": "workflow-1", + "workflowName": "activities", + }, + }, + "getState": [Function], + }, + ], + [ + { + "change": { + "type": "WORK_ITEM_STATE_UPDATED", + "workItem": { + "id": "workItem-3", + "payload": 3, + "state": "completed", + "taskGeneration": 1, + "taskName": "t1", + "workflowId": "workflow-1", + "workflowName": "activities", + }, + }, + "getState": [Function], + }, + { + "change": { + "task": { + "generation": 1, + "name": "t1", + "state": "completed", + "workflowId": "workflow-1", + "workflowName": "activities", + }, + "type": "TASK_STATE_UPDATED", + }, + "getState": [Function], + }, + { + "change": { + "condition": { + "marking": 1, + "name": "end", + "workflowId": "workflow-1", + "workflowName": "activities", + }, + "type": "CONDITION_MARKING_UPDATED", + }, + "getState": [Function], + }, + { + "change": { + "type": "WORKFLOW_STATE_UPDATED", + "workflow": { + "context": undefined, + "id": "workflow-1", + "name": "activities", + "parent": null, + "state": "completed", + }, + }, + "getState": [Function], + }, + ], +] +`; diff --git a/src/__tests__/concurrency.test.ts b/src/__tests__/concurrency.test.ts new file mode 100644 index 0000000..e361570 --- /dev/null +++ b/src/__tests__/concurrency.test.ts @@ -0,0 +1,70 @@ +import { Effect } from 'effect'; +import { it } from 'vitest'; + +import { Builder, IdGenerator, Service } from '../index.js'; +import { getEnabledTaskNames, makeIdGenerator } from './shared.js'; + +const workflowDefinition = Builder.workflow() + .withName('activities') + .startCondition('start') + .task('t1', (t) => + t() + .withWorkItem((w) => + w().onStart(({ startWorkItem, getWorkItem }) => + Effect.gen(function* ($) { + const { enqueueCompleteWorkItem } = yield* $(startWorkItem()); + const workItem = yield* $(getWorkItem()); + yield* $(Effect.sleep(`${workItem.payload * 100} millis`)); + yield* $(enqueueCompleteWorkItem()); + }) + ) + ) + .onEnable(({ enableTask }) => + Effect.gen(function* ($) { + const { enqueueStartTask } = yield* $(enableTask()); + yield* $(enqueueStartTask()); + }) + ) + .onStart(({ startTask }) => + Effect.gen(function* ($) { + const { initializeWorkItem, enqueueStartWorkItem } = yield* $( + startTask() + ); + for (const i of [1, 2, 3]) { + const { id } = yield* $(initializeWorkItem(i)); + yield* $(enqueueStartWorkItem(id)); + } + }) + ) + ) + .endCondition('end') + .connectCondition('start', (to) => to.task('t1')) + .connectTask('t1', (to) => to.condition('end')); + +it('will start work items concurrently and emit state change on each work item start', async ({ + expect, +}) => { + const program = Effect.gen(function* ($) { + const idGenerator = makeIdGenerator(); + const logs: unknown[] = []; + + const service = yield* $( + workflowDefinition.build(), + Effect.flatMap((workflow) => Service.initialize(workflow)), + Effect.provideService(IdGenerator, idGenerator) + ); + + service.onStateChange((changes) => { + return Effect.succeed(logs.push(changes)); + }); + + yield* $(service.start()); + const state = yield* $(service.getState()); + expect(state).toMatchSnapshot(); + expect(logs).toMatchSnapshot(); + expect(getEnabledTaskNames(state)).toEqual(new Set()); + expect(state.workflows[0]?.state).toEqual('completed'); + }); + + await Effect.runPromise(program); +}); diff --git a/src/elements/CompositeTask.ts b/src/elements/CompositeTask.ts index 1e989f6..b2c01bf 100644 --- a/src/elements/CompositeTask.ts +++ b/src/elements/CompositeTask.ts @@ -84,6 +84,7 @@ export class CompositeTask extends BaseTask { enableTask() { return pipe( perform, + Effect.tap(() => executionContext.emitStateChanges()), Effect.map(() => ({ enqueueStartTask })) ); }, @@ -126,7 +127,10 @@ export class CompositeTask extends BaseTask { self.activities.onDisable({ ...executionContext.defaultActivityPayload, disableTask() { - return perform; + return pipe( + perform, + Effect.tap(() => executionContext.emitStateChanges()) + ); }, }) as Effect.Effect ); @@ -211,6 +215,7 @@ export class CompositeTask extends BaseTask { startTask() { return pipe( perform, + Effect.tap(() => executionContext.emitStateChanges()), Effect.map(() => ({ initializeWorkflow, enqueueStartWorkflow, @@ -306,6 +311,7 @@ export class CompositeTask extends BaseTask { completeTask() { return pipe( perform, + Effect.tap(() => executionContext.emitStateChanges()), Effect.provideService(State, stateManager), Effect.provideService(ExecutionContext, executionContext) ); @@ -369,7 +375,10 @@ export class CompositeTask extends BaseTask { self.activities.onCancel({ ...executionContext.defaultActivityPayload, cancelTask() { - return perform; + return pipe( + perform, + Effect.tap(() => executionContext.emitStateChanges()) + ); }, }) as Effect.Effect ); @@ -430,7 +439,10 @@ export class CompositeTask extends BaseTask { self.activities.onFail({ ...executionContext.defaultActivityPayload, failTask() { - return perform; + return pipe( + perform, + Effect.tap(() => executionContext.emitStateChanges()) + ); }, }) as Effect.Effect ); diff --git a/src/elements/Task.ts b/src/elements/Task.ts index 852dd0d..d71ae96 100644 --- a/src/elements/Task.ts +++ b/src/elements/Task.ts @@ -90,6 +90,7 @@ export class Task extends BaseTask { enableTask() { return pipe( perform, + Effect.tap(() => executionContext.emitStateChanges()), Effect.map(() => ({ enqueueStartTask })) ); }, @@ -132,7 +133,10 @@ export class Task extends BaseTask { self.activities.onDisable({ ...executionContext.defaultActivityPayload, disableTask() { - return perform; + return pipe( + perform, + Effect.tap(() => executionContext.emitStateChanges()) + ); }, }) as Effect.Effect ); @@ -206,6 +210,7 @@ export class Task extends BaseTask { startTask() { return pipe( perform, + Effect.tap(() => executionContext.emitStateChanges()), Effect.map(() => ({ initializeWorkItem, enqueueStartWorkItem, @@ -303,7 +308,10 @@ export class Task extends BaseTask { self.activities.onComplete({ ...executionContext.defaultActivityPayload, completeTask() { - return perform; + return pipe( + perform, + Effect.tap(() => executionContext.emitStateChanges()) + ); }, }) as Effect.Effect ); @@ -350,7 +358,7 @@ export class Task extends BaseTask { workItems.map(({ id }) => self.cancelWorkItem(workflowId, id, undefined, false) ), - { batching: true } + { concurrency: 'inherit' } ) ); yield* $(stateManager.cancelTask(workflowId, self.name)); @@ -365,7 +373,10 @@ export class Task extends BaseTask { self.activities.onCancel({ ...executionContext.defaultActivityPayload, cancelTask() { - return perform; + return pipe( + perform, + Effect.tap(() => executionContext.emitStateChanges()) + ); }, }) as Effect.Effect ); @@ -412,7 +423,7 @@ export class Task extends BaseTask { workItems.map(({ id }) => self.cancelWorkItem(workflowId, id, undefined, false) ), - { batching: true } + { concurrency: 'inherit' } ) ); yield* $(stateManager.failTask(workflowId, self.name)); @@ -428,7 +439,10 @@ export class Task extends BaseTask { self.activities.onFail({ ...executionContext.defaultActivityPayload, failTask() { - return perform; + return pipe( + perform, + Effect.tap(() => executionContext.emitStateChanges()) + ); }, }) as Effect.Effect ); @@ -574,7 +588,10 @@ export class Task extends BaseTask { ...workItemActivityPayload, startWorkItem() { return Effect.gen(function* ($) { - yield* $(perform); + yield* $( + perform, + Effect.tap(() => executionContext.emitStateChanges()) + ); return { enqueueCompleteWorkItem(input?: unknown) { return executionContext.queue.offer({ @@ -622,6 +639,8 @@ export class Task extends BaseTask { yield* $(self.ensureIsStarted(workflowId)); const stateManager = yield* $(State); + const executionContext = yield* $(ExecutionContext); + const perform = yield* $( Effect.once( Effect.gen(function* ($) { @@ -646,7 +665,10 @@ export class Task extends BaseTask { { ...workItemActivityPayload, completeWorkItem() { - return perform; + return pipe( + perform, + Effect.tap(() => executionContext.emitStateChanges()) + ); }, }, input @@ -671,6 +693,8 @@ export class Task extends BaseTask { yield* $(self.ensureIsStarted(workflowId)); const stateManager = yield* $(State); + const executionContext = yield* $(ExecutionContext); + const perform = yield* $( Effect.once( Effect.gen(function* ($) { @@ -695,7 +719,10 @@ export class Task extends BaseTask { { ...workItemActivityPayload, cancelWorkItem() { - return perform; + return pipe( + perform, + Effect.tap(() => executionContext.emitStateChanges()) + ); }, }, input @@ -722,6 +749,8 @@ export class Task extends BaseTask { yield* $(self.ensureIsStarted(workflowId)); const stateManager = yield* $(State); + const executionContext = yield* $(ExecutionContext); + const perform = yield* $( Effect.once( Effect.gen(function* ($) { @@ -746,7 +775,10 @@ export class Task extends BaseTask { { ...workItemActivityPayload, failWorkItem() { - return perform; + return pipe( + perform, + Effect.tap(() => executionContext.emitStateChanges()) + ); }, }, input @@ -754,6 +786,7 @@ export class Task extends BaseTask { ); yield* $(perform); + if (autoFailTask) { yield* $(self.maybeFail(workflowId)); } diff --git a/src/elements/Workflow.ts b/src/elements/Workflow.ts index 27c8914..b44ca1b 100644 --- a/src/elements/Workflow.ts +++ b/src/elements/Workflow.ts @@ -1,4 +1,4 @@ -import { Effect } from 'effect'; +import { Effect, pipe } from 'effect'; import { State } from '../State.js'; import { E2WFOJNet } from '../e2wfojnet.js'; @@ -140,7 +140,10 @@ export class Workflow< { ...defaultActivityPayload, startWorkflow() { - return perform; + return pipe( + perform, + Effect.tap(() => executionContext.emitStateChanges()) + ); }, }, input @@ -206,7 +209,10 @@ export class Workflow< self.activities.onComplete({ ...defaultActivityPayload, completeWorkflow() { - return perform; + return pipe( + perform, + Effect.tap(() => executionContext.emitStateChanges()) + ); }, }) as Effect.Effect ); @@ -280,7 +286,10 @@ export class Workflow< { ...defaultActivityPayload, cancelWorkflow() { - return perform; + return pipe( + perform, + Effect.tap(() => executionContext.emitStateChanges()) + ); }, }, input @@ -354,7 +363,10 @@ export class Workflow< { ...defaultActivityPayload, failWorkflow() { - return perform; + return pipe( + perform, + Effect.tap(() => executionContext.emitStateChanges()) + ); }, }, input diff --git a/src/types.ts b/src/types.ts index 4dd1999..cfc25d6 100644 --- a/src/types.ts +++ b/src/types.ts @@ -530,6 +530,7 @@ export type ExecutionContextQueueItem = export interface ExecutionContext { path: readonly string[]; workflowId: WorkflowId; + emitStateChanges: () => Effect.Effect; defaultActivityPayload: { getWorkflowContext: () => Effect.Effect< never,