diff --git a/src/Service.ts b/src/Service.ts index 9a6bd1b..ab3d50f 100644 --- a/src/Service.ts +++ b/src/Service.ts @@ -670,8 +670,21 @@ export class Service< .getWorkflow(input.workflowId) .pipe(Effect.map((w) => w.context)); }, - updateWorkflowContext(context: unknown) { - return state.updateWorkflowContext(input.workflowId, context); + updateWorkflowContext(contextOrUpdater: unknown) { + return Effect.gen(function* ($) { + if (typeof contextOrUpdater === 'function') { + const workflow = yield* $(state.getWorkflow(input.workflowId)); + return yield* $( + state.updateWorkflowContext( + input.workflowId, + contextOrUpdater(workflow.context) + ) + ); + } + return yield* $( + state.updateWorkflowContext(input.workflowId, contextOrUpdater) + ); + }); }, }, queue: { diff --git a/src/State.ts b/src/State.ts index b274460..ed01c5a 100644 --- a/src/State.ts +++ b/src/State.ts @@ -61,6 +61,15 @@ export interface State { taskName: TaskName ): Effect.Effect; + getTaskPath( + workflowId: WorkflowId, + taskName: TaskName + ): Effect.Effect< + never, + TaskDoesNotExistInStore | WorkflowDoesNotExist, + string[] + >; + getCondition( workflowId: WorkflowId, conditionName: ConditionName diff --git a/src/__tests__/__snapshots__/auto-advancing-tasks.test.ts.snap b/src/__tests__/__snapshots__/auto-advancing-tasks.test.ts.snap new file mode 100644 index 0000000..569e9f6 --- /dev/null +++ b/src/__tests__/__snapshots__/auto-advancing-tasks.test.ts.snap @@ -0,0 +1,62 @@ +// Vitest Snapshot v1, https://vitest.dev/guide/snapshot.html + +exports[`handles series of auto advancing tasks 1`] = ` +{ + "conditions": [ + { + "marking": 0, + "name": "start", + "workflowId": "workflow-1", + "workflowName": "activities", + }, + { + "marking": 1, + "name": "end", + "workflowId": "workflow-1", + "workflowName": "activities", + }, + { + "marking": 0, + "name": "implicit:t1->t2", + "workflowId": "workflow-1", + "workflowName": "activities", + }, + ], + "tasks": [ + { + "generation": 1, + "name": "t1", + "state": "completed", + "workflowId": "workflow-1", + "workflowName": "activities", + }, + { + "generation": 1, + "name": "t2", + "state": "completed", + "workflowId": "workflow-1", + "workflowName": "activities", + }, + ], + "workItems": [ + { + "id": "workItem-1", + "payload": null, + "state": "completed", + "taskGeneration": 1, + "taskName": "t1", + "workflowId": "workflow-1", + "workflowName": "activities", + }, + ], + "workflows": [ + { + "context": undefined, + "id": "workflow-1", + "name": "activities", + "parent": null, + "state": "completed", + }, + ], +} +`; diff --git a/src/__tests__/__snapshots__/update-workflow-context.test.ts.snap b/src/__tests__/__snapshots__/update-workflow-context.test.ts.snap new file mode 100644 index 0000000..b524da3 --- /dev/null +++ b/src/__tests__/__snapshots__/update-workflow-context.test.ts.snap @@ -0,0 +1,223 @@ +// Vitest Snapshot v1, https://vitest.dev/guide/snapshot.html + +exports[`handles context update when updater function is passed 1`] = ` +{ + "conditions": [ + { + "marking": 0, + "name": "start", + "workflowId": "workflow-1", + "workflowName": "activities", + }, + { + "marking": 0, + "name": "end", + "workflowId": "workflow-1", + "workflowName": "activities", + }, + ], + "tasks": [ + { + "generation": 1, + "name": "t1", + "state": "started", + "workflowId": "workflow-1", + "workflowName": "activities", + }, + ], + "workItems": [], + "workflows": [ + { + "context": { + "count": 2, + }, + "id": "workflow-1", + "name": "activities", + "parent": null, + "state": "started", + }, + ], +} +`; + +exports[`handles context update when value is passed 1`] = ` +{ + "conditions": [ + { + "marking": 0, + "name": "start", + "workflowId": "workflow-1", + "workflowName": "activities", + }, + { + "marking": 0, + "name": "end", + "workflowId": "workflow-1", + "workflowName": "activities", + }, + ], + "tasks": [ + { + "generation": 1, + "name": "t1", + "state": "started", + "workflowId": "workflow-1", + "workflowName": "activities", + }, + ], + "workItems": [], + "workflows": [ + { + "context": { + "count": 2, + }, + "id": "workflow-1", + "name": "activities", + "parent": null, + "state": "started", + }, + ], +} +`; + +exports[`handles parent context update when updater function is passed 1`] = ` +{ + "conditions": [ + { + "marking": 0, + "name": "start", + "workflowId": "workflow-1", + "workflowName": "activities", + }, + { + "marking": 0, + "name": "end", + "workflowId": "workflow-1", + "workflowName": "activities", + }, + { + "marking": 1, + "name": "start", + "workflowId": "workflow-2", + "workflowName": "sub", + }, + { + "marking": 0, + "name": "end", + "workflowId": "workflow-2", + "workflowName": "sub", + }, + ], + "tasks": [ + { + "generation": 1, + "name": "t1", + "state": "started", + "workflowId": "workflow-1", + "workflowName": "activities", + }, + { + "generation": 0, + "name": "subT1", + "state": "enabled", + "workflowId": "workflow-2", + "workflowName": "sub", + }, + ], + "workItems": [], + "workflows": [ + { + "context": { + "count": 1, + }, + "id": "workflow-1", + "name": "activities", + "parent": null, + "state": "started", + }, + { + "context": undefined, + "id": "workflow-2", + "name": "sub", + "parent": { + "taskGeneration": 1, + "taskName": "t1", + "workflowId": "workflow-1", + "workflowName": "activities", + }, + "state": "started", + }, + ], +} +`; + +exports[`handles parent context update when value is passed 1`] = ` +{ + "conditions": [ + { + "marking": 0, + "name": "start", + "workflowId": "workflow-1", + "workflowName": "activities", + }, + { + "marking": 0, + "name": "end", + "workflowId": "workflow-1", + "workflowName": "activities", + }, + { + "marking": 1, + "name": "start", + "workflowId": "workflow-2", + "workflowName": "sub", + }, + { + "marking": 0, + "name": "end", + "workflowId": "workflow-2", + "workflowName": "sub", + }, + ], + "tasks": [ + { + "generation": 1, + "name": "t1", + "state": "started", + "workflowId": "workflow-1", + "workflowName": "activities", + }, + { + "generation": 0, + "name": "subT1", + "state": "enabled", + "workflowId": "workflow-2", + "workflowName": "sub", + }, + ], + "workItems": [], + "workflows": [ + { + "context": { + "count": 1, + }, + "id": "workflow-1", + "name": "activities", + "parent": null, + "state": "started", + }, + { + "context": undefined, + "id": "workflow-2", + "name": "sub", + "parent": { + "taskGeneration": 1, + "taskName": "t1", + "workflowId": "workflow-1", + "workflowName": "activities", + }, + "state": "started", + }, + ], +} +`; diff --git a/src/__tests__/auto-advancing-tasks.test.ts b/src/__tests__/auto-advancing-tasks.test.ts new file mode 100644 index 0000000..d8abc9f --- /dev/null +++ b/src/__tests__/auto-advancing-tasks.test.ts @@ -0,0 +1,74 @@ +import { Effect } from 'effect'; +import { it } from 'vitest'; + +import { Builder, IdGenerator, Service } from '../index.js'; +import { getEnabledTaskNames, makeIdGenerator } from './shared.js'; + +const workflowDefinition = Builder.workflow() + .withName('activities') + .startCondition('start') + .task('t1', (t) => + t() + .withWorkItem((w) => + w().onStart(({ startWorkItem }) => + Effect.gen(function* ($) { + const { enqueueCompleteWorkItem } = yield* $(startWorkItem()); + yield* $(enqueueCompleteWorkItem()); + }) + ) + ) + .onEnable(({ enableTask }) => + Effect.gen(function* ($) { + const { enqueueStartTask } = yield* $(enableTask()); + yield* $(enqueueStartTask()); + }) + ) + .onStart(({ startTask }) => + Effect.gen(function* ($) { + const { initializeWorkItem, enqueueStartWorkItem } = yield* $( + startTask() + ); + const { id } = yield* $(initializeWorkItem()); + yield* $(enqueueStartWorkItem(id)); + }) + ) + ) + .task('t2', (t) => + t() + .onEnable(({ enableTask }) => + Effect.gen(function* ($) { + const { enqueueStartTask } = yield* $(enableTask()); + yield* $(enqueueStartTask()); + }) + ) + .onStart(({ startTask }) => + Effect.gen(function* ($) { + yield* $(startTask()); + }) + ) + .withShouldComplete(() => Effect.succeed(true)) + ) + .endCondition('end') + .connectCondition('start', (to) => to.task('t1')) + .connectTask('t1', (to) => to.task('t2')) + .connectTask('t2', (to) => to.condition('end')); + +it('handles series of auto advancing tasks', ({ expect }) => { + const program = Effect.gen(function* ($) { + const idGenerator = makeIdGenerator(); + + const service = yield* $( + workflowDefinition.build(), + Effect.flatMap((workflow) => Service.initialize(workflow)), + Effect.provideService(IdGenerator, idGenerator) + ); + + yield* $(service.start()); + const state = yield* $(service.getState()); + expect(state).toMatchSnapshot(); + expect(getEnabledTaskNames(state)).toEqual(new Set()); + expect(state.workflows[0]?.state).toEqual('completed'); + }); + + Effect.runSync(program); +}); diff --git a/src/__tests__/update-workflow-context.test.ts b/src/__tests__/update-workflow-context.test.ts new file mode 100644 index 0000000..7aa7eef --- /dev/null +++ b/src/__tests__/update-workflow-context.test.ts @@ -0,0 +1,198 @@ +import { Effect } from 'effect'; +import { it } from 'vitest'; + +import { Builder, IdGenerator, Service } from '../index.js'; +import { makeIdGenerator } from './shared.js'; + +const workflowDefinition1 = Builder.workflow<{ count: number }>() + .withName('activities') + .startCondition('start') + .task('t1', (t) => + t().onEnable(({ enableTask, getWorkflowContext, updateWorkflowContext }) => + Effect.gen(function* ($) { + const { enqueueStartTask } = yield* $(enableTask()); + yield* $(enqueueStartTask()); + const workflowContext = yield* $(getWorkflowContext()); + const { count } = workflowContext; + yield* $(updateWorkflowContext({ count: count + 1 })); + }) + ) + ) + .endCondition('end') + .connectCondition('start', (to) => to.task('t1')) + .connectTask('t1', (to) => to.condition('end')) + .onStart(({ getWorkflowContext, updateWorkflowContext }) => + Effect.gen(function* ($) { + const workflowContext = yield* $(getWorkflowContext()); + const { count } = workflowContext; + yield* $(updateWorkflowContext({ count: count + 1 })); + }) + ); + +it('handles context update when value is passed', ({ expect }) => { + const program = Effect.gen(function* ($) { + const idGenerator = makeIdGenerator(); + + const service = yield* $( + workflowDefinition1.build(), + Effect.flatMap((workflow) => Service.initialize(workflow, { count: 0 })), + Effect.provideService(IdGenerator, idGenerator) + ); + + yield* $(service.start()); + const state = yield* $(service.getState()); + expect(state).toMatchSnapshot(); + expect(state); + }); + + Effect.runSync(program); +}); + +const workflowDefinition2 = Builder.workflow<{ count: number }>() + .withName('activities') + .startCondition('start') + .task('t1', (t) => + t().onEnable(({ enableTask, updateWorkflowContext }) => + Effect.gen(function* ($) { + const { enqueueStartTask } = yield* $(enableTask()); + yield* $(enqueueStartTask()); + yield* $(updateWorkflowContext(({ count }) => ({ count: count + 1 }))); + }) + ) + ) + .endCondition('end') + .connectCondition('start', (to) => to.task('t1')) + .connectTask('t1', (to) => to.condition('end')) + .onStart(({ updateWorkflowContext }) => + Effect.gen(function* ($) { + yield* $(updateWorkflowContext(({ count }) => ({ count: count + 1 }))); + }) + ); + +it('handles context update when updater function is passed', ({ expect }) => { + const program = Effect.gen(function* ($) { + const idGenerator = makeIdGenerator(); + + const service = yield* $( + workflowDefinition2.build(), + Effect.flatMap((workflow) => Service.initialize(workflow, { count: 0 })), + Effect.provideService(IdGenerator, idGenerator) + ); + + yield* $(service.start()); + + const state = yield* $(service.getState()); + expect(state).toMatchSnapshot(); + expect(state); + }); + + Effect.runSync(program); +}); + +const subWorkflowDefinition1 = Builder.workflow() + .withName('sub') + .withParentContext<{ count: number }>() + .startCondition('start') + .task('subT1') + .endCondition('end') + .connectCondition('start', (to) => to.task('subT1')) + .connectTask('subT1', (to) => to.condition('end')) + .onStart(({ updateParentWorkflowContext }) => + updateParentWorkflowContext(({ count }) => ({ count: count + 1 })) + ); + +const parentWorkflowDefinition1 = Builder.workflow<{ count: number }>() + .withName('activities') + .startCondition('start') + .compositeTask('t1', (ct) => + ct() + .withSubWorkflow(subWorkflowDefinition1) + .onStart(({ startTask }) => + Effect.gen(function* ($) { + const { initializeWorkflow, enqueueStartWorkflow } = yield* $( + startTask() + ); + const workflow = yield* $(initializeWorkflow()); + yield* $(enqueueStartWorkflow(workflow.id)); + }) + ) + ) + .endCondition('end') + .connectCondition('start', (to) => to.task('t1')) + .connectTask('t1', (to) => to.condition('end')); + +it('handles parent context update when updater function is passed', ({ + expect, +}) => { + const program = Effect.gen(function* ($) { + const idGenerator = makeIdGenerator(); + + const service = yield* $( + parentWorkflowDefinition1.build(), + Effect.flatMap((workflow) => Service.initialize(workflow, { count: 0 })), + Effect.provideService(IdGenerator, idGenerator) + ); + + yield* $(service.start()); + yield* $(service.startTask('t1')); + + const state = yield* $(service.getState()); + expect(state).toMatchSnapshot(); + expect(state); + }); + + Effect.runSync(program); +}); + +const subWorkflowDefinition2 = Builder.workflow() + .withName('sub') + .withParentContext<{ count: number }>() + .startCondition('start') + .task('subT1') + .endCondition('end') + .connectCondition('start', (to) => to.task('subT1')) + .connectTask('subT1', (to) => to.condition('end')) + .onStart(({ updateParentWorkflowContext }) => + updateParentWorkflowContext({ count: 1 }) + ); + +const parentWorkflowDefinition2 = Builder.workflow<{ count: number }>() + .withName('activities') + .startCondition('start') + .compositeTask('t1', (ct) => + ct() + .withSubWorkflow(subWorkflowDefinition2) + .onStart(({ startTask }) => + Effect.gen(function* ($) { + const { initializeWorkflow, enqueueStartWorkflow } = yield* $( + startTask() + ); + const workflow = yield* $(initializeWorkflow()); + yield* $(enqueueStartWorkflow(workflow.id)); + }) + ) + ) + .endCondition('end') + .connectCondition('start', (to) => to.task('t1')) + .connectTask('t1', (to) => to.condition('end')); + +it('handles parent context update when value is passed', ({ expect }) => { + const program = Effect.gen(function* ($) { + const idGenerator = makeIdGenerator(); + + const service = yield* $( + parentWorkflowDefinition2.build(), + Effect.flatMap((workflow) => Service.initialize(workflow, { count: 0 })), + Effect.provideService(IdGenerator, idGenerator) + ); + + yield* $(service.start()); + yield* $(service.startTask('t1')); + + const state = yield* $(service.getState()); + expect(state).toMatchSnapshot(); + expect(state); + }); + + Effect.runSync(program); +}); diff --git a/src/elements/BaseTask.ts b/src/elements/BaseTask.ts index c5ef541..704c0dd 100644 --- a/src/elements/BaseTask.ts +++ b/src/elements/BaseTask.ts @@ -99,6 +99,17 @@ export abstract class BaseTask { }); } + getTaskPath(workflowId: WorkflowId) { + const self = this; + return Effect.gen(function* ($) { + const stateManager = yield* $(State); + const [_, ...path] = yield* $( + stateManager.getTaskPath(workflowId, self.name) + ); + return path; + }); + } + isEnabled(workflowId: WorkflowId) { return this.isStateEqualTo(workflowId, 'enabled'); } @@ -122,7 +133,8 @@ export abstract class BaseTask { | TaskDoesNotExistInStore | ConditionDoesNotExist | ConditionDoesNotExistInStore - | InvalidTaskStateTransition, + | InvalidTaskStateTransition + | WorkflowDoesNotExist, unknown >; diff --git a/src/elements/CompositeTask.ts b/src/elements/CompositeTask.ts index 92919f8..1e989f6 100644 --- a/src/elements/CompositeTask.ts +++ b/src/elements/CompositeTask.ts @@ -65,9 +65,10 @@ export class CompositeTask extends BaseTask { const isJoinSatisfied = yield* $(self.isJoinSatisfied(workflowId)); if (isJoinSatisfied) { const executionContext = yield* $(ExecutionContext); + const path = yield* $(self.getTaskPath(workflowId)); const enqueueStartTask = (input?: unknown) => { return executionContext.queue.offer({ - path: [...executionContext.path, self.name], + path, type: 'startTask', input, }); @@ -193,9 +194,11 @@ export class CompositeTask extends BaseTask { return workflow; }).pipe(Effect.provideService(State, stateManager)); + const path = yield* $(self.getTaskPath(workflowId)); + const enqueueStartWorkflow = (id: WorkflowId, input: unknown) => { return executionContext.queue.offer({ - path: [...executionContext.path, id], + path: [...path, id], type: 'startWorkflow', input, }); diff --git a/src/elements/Task.ts b/src/elements/Task.ts index 60d72ba..852dd0d 100644 --- a/src/elements/Task.ts +++ b/src/elements/Task.ts @@ -71,9 +71,10 @@ export class Task extends BaseTask { const isJoinSatisfied = yield* $(self.isJoinSatisfied(workflowId)); if (isJoinSatisfied) { const executionContext = yield* $(ExecutionContext); + const path = yield* $(self.getTaskPath(workflowId)); const enqueueStartTask = (input?: unknown) => { return executionContext.queue.offer({ - path: [...executionContext.path, self.name], + path, type: 'startTask', input, }); @@ -185,12 +186,14 @@ export class Task extends BaseTask { .initializeWorkItem(workflowId, payload) .pipe(Effect.provideService(State, stateManager)); + const path = yield* $(self.getTaskPath(workflowId)); + const enqueueStartWorkItem = ( workItemId: WorkItemId, input?: unknown ) => { return executionContext.queue.offer({ - path: [...executionContext.path, workItemId], + path: [...path, workItemId], type: 'startWorkItem', input, }); @@ -562,6 +565,9 @@ export class Task extends BaseTask { self.getWorkItemActivityPayload(workflowId, workItemId) ); + const taskPath = yield* $(self.getTaskPath(workflowId)); + const path = [...taskPath, workItemId]; + const result = yield* $( self.workItemActivities.onStart( { @@ -572,21 +578,21 @@ export class Task extends BaseTask { return { enqueueCompleteWorkItem(input?: unknown) { return executionContext.queue.offer({ - path: executionContext.path, + path, type: 'completeWorkItem', input, }); }, enqueueFailWorkItem(input?: unknown) { return executionContext.queue.offer({ - path: executionContext.path, + path, type: 'failWorkItem', input, }); }, enqueueCancelWorkItem(input?: unknown) { return executionContext.queue.offer({ - path: executionContext.path, + path, type: 'cancelWorkItem', input, }); diff --git a/src/elements/Workflow.ts b/src/elements/Workflow.ts index 6529b8c..27c8914 100644 --- a/src/elements/Workflow.ts +++ b/src/elements/Workflow.ts @@ -387,7 +387,7 @@ export class Workflow< return yield* $(stateManager.getWorkflowContext(parent.workflowId)); }); }, - updateParentWorkflowContext(context: unknown) { + updateParentWorkflowContext(contextOrUpdater: unknown) { return Effect.gen(function* ($) { const parent = workflow.parent; if (!parent) { @@ -400,8 +400,22 @@ export class Workflow< ) ); } + if (typeof contextOrUpdater === 'function') { + const context = yield* $( + stateManager.getWorkflowContext(parent.workflowId) + ); + return yield* $( + stateManager.updateWorkflowContext( + parent.workflowId, + contextOrUpdater(context) + ) + ); + } return yield* $( - stateManager.updateWorkflowContext(parent.workflowId, context) + stateManager.updateWorkflowContext( + parent.workflowId, + contextOrUpdater + ) ); }); }, @@ -410,8 +424,21 @@ export class Workflow< return (yield* $(stateManager.getWorkflowContext(id))) as Context; }); }, - updateWorkflowContext(context: unknown) { - return stateManager.updateWorkflowContext(id, context); + updateWorkflowContext(contextOrUpdater: unknown) { + return Effect.gen(function* ($) { + if (typeof contextOrUpdater === 'function') { + const context = yield* $(stateManager.getWorkflowContext(id)); + return yield* $( + stateManager.updateWorkflowContext( + id, + contextOrUpdater(context) + ) + ); + } + return yield* $( + stateManager.updateWorkflowContext(id, contextOrUpdater) + ); + }); }, }; }); diff --git a/src/state/StateImpl.ts b/src/state/StateImpl.ts index 28fd1c4..6658522 100644 --- a/src/state/StateImpl.ts +++ b/src/state/StateImpl.ts @@ -292,6 +292,29 @@ export class StateImpl implements State { ); } + getTaskPath( + workflowId: WorkflowId, + taskName: TaskName + ): Effect.Effect< + never, + TaskDoesNotExistInStore | WorkflowDoesNotExist, + string[] + > { + const self = this; + return Effect.gen(function* ($) { + const task = yield* $(self.getTask(workflowId, taskName)); + const workflow = yield* $(self.getWorkflow(workflowId)); + const path = [workflow.id, task.name]; + if (workflow.parent) { + const parentPath = yield* $( + self.getTaskPath(workflow.parent.workflowId, workflow.parent.taskName) + ); + return [...parentPath, ...path]; + } + return path; + }); + } + getCondition(workflowId: WorkflowId, conditionName: ConditionName) { const { store } = this; return Effect.gen(function* ($) { diff --git a/src/types.ts b/src/types.ts index 4a09eb1..4dd1999 100644 --- a/src/types.ts +++ b/src/types.ts @@ -65,11 +65,13 @@ export interface WorkflowBuilderDefinition { }; } +type UpdateWorkflowContext = ( + contextOrUpdater: C | ((context: C) => C) +) => Effect.Effect; + export interface DefaultTaskOrWorkItemActivityPayload { getWorkflowContext(): Effect.Effect; - updateWorkflowContext( - context: C - ): Effect.Effect; + updateWorkflowContext: UpdateWorkflowContext; } export type TaskOnDisablePayload = @@ -534,9 +536,7 @@ export interface ExecutionContext { WorkflowDoesNotExist, unknown >; - updateWorkflowContext: ( - context: unknown - ) => Effect.Effect; + updateWorkflowContext: UpdateWorkflowContext; }; queue: { offer: ( @@ -624,16 +624,14 @@ export interface DefaultWorkflowActivityPayload { updateParentWorkflowContext: PC extends never ? never : ( - context: PC + context: PC | ((context: PC) => PC) ) => Effect.Effect< never, ParentWorkflowDoesNotExist | WorkflowDoesNotExist, void >; getWorkflowContext(): Effect.Effect; - updateWorkflowContext( - context: C - ): Effect.Effect; + updateWorkflowContext: UpdateWorkflowContext; } export type WorkflowOnStartPayload = DefaultWorkflowActivityPayload<