From 56ee80435f8c5e6c0c7dde63b4a8bcd20605bf9a Mon Sep 17 00:00:00 2001 From: Mihael Konjevic Date: Sun, 28 Jan 2024 09:16:07 +0100 Subject: [PATCH] feat: emit state changes after each user triggered state update --- src/Service.ts | 29 ++++++++++++++++++++--------- src/elements/BaseTask.ts | 11 ++++++----- src/elements/Task.ts | 21 ++++++++++++++------- src/elements/Workflow.ts | 17 ++++++++++------- 4 files changed, 50 insertions(+), 28 deletions(-) diff --git a/src/Service.ts b/src/Service.ts index 84589de..07f38c2 100644 --- a/src/Service.ts +++ b/src/Service.ts @@ -656,7 +656,11 @@ export class Service< }); yield* $( - Effect.all(queuedFx, { concurrency: 'inherit', discard: true }) + Effect.all(queuedFx, { + concurrency: 'inherit', + batching: 'inherit', + discard: true, + }) ); } }); @@ -670,36 +674,43 @@ export class Service< path: readonly string[]; workflowId: WorkflowId; }): ExecutionContext { - const { queue, state } = this; + const self = this; return { ...input, - emitStateChanges: () => this.emitStateChanges(), + emitStateChanges: () => self.emitStateChanges(), defaultActivityPayload: { getWorkflowContext() { - return state + return self.state .getWorkflow(input.workflowId) .pipe(Effect.map((w) => w.context)); }, updateWorkflowContext(contextOrUpdater: unknown) { return Effect.gen(function* ($) { if (typeof contextOrUpdater === 'function') { - const workflow = yield* $(state.getWorkflow(input.workflowId)); + const workflow = yield* $( + self.state.getWorkflow(input.workflowId) + ); return yield* $( - state.updateWorkflowContext( + self.state.updateWorkflowContext( input.workflowId, contextOrUpdater(workflow.context) - ) + ), + Effect.tap(() => self.emitStateChanges()) ); } return yield* $( - state.updateWorkflowContext(input.workflowId, contextOrUpdater) + self.state.updateWorkflowContext( + input.workflowId, + contextOrUpdater + ), + Effect.tap(() => self.emitStateChanges()) ); }); }, }, queue: { offer(item: ExecutionContextQueueItem) { - return queue.offer(item); + return self.queue.offer(item); }, }, }; diff --git a/src/elements/BaseTask.ts b/src/elements/BaseTask.ts index 704c0dd..f2e3ce5 100644 --- a/src/elements/BaseTask.ts +++ b/src/elements/BaseTask.ts @@ -273,7 +273,7 @@ export abstract class BaseTask { discard: true, }), ], - { discard: true, concurrency: 'inherit' } + { discard: true, concurrency: 'inherit', batching: 'inherit' } ); } @@ -310,8 +310,8 @@ export abstract class BaseTask { return yield* $( Effect.all(updates, { - batching: 'inherit', discard: true, + batching: 'inherit', concurrency: 'inherit', }) ); @@ -349,8 +349,8 @@ export abstract class BaseTask { return flow.nextElement.incrementMarking(workflowId); }); return Effect.all(updates, { - batching: 'inherit', concurrency: 'inherit', + batching: 'inherit', discard: true, }); } @@ -378,7 +378,7 @@ export abstract class BaseTask { Array.from(self.incomingFlows).map((flow) => flow.priorElement.getMarking(workflowId) ), - { batching: 'inherit', concurrency: 'inherit' } + { concurrency: 'inherit', batching: 'inherit' } ) ); return markings.filter((m) => m > 0).length === 1 ? true : false; @@ -407,7 +407,8 @@ export abstract class BaseTask { return Effect.allSuccesses( Array.from(this.outgoingFlows).map((flow) => flow.nextElement.enableTasks(workflowId) - ) + ), + { concurrency: 'inherit', batching: 'inherit' } ); } } diff --git a/src/elements/Task.ts b/src/elements/Task.ts index d71ae96..b25d1c1 100644 --- a/src/elements/Task.ts +++ b/src/elements/Task.ts @@ -177,7 +177,13 @@ export class Task extends BaseTask { const updates = preSet.map((condition) => condition.decrementMarking(workflowId) ); - yield* $(Effect.all(updates, { discard: true, batching: true })); + yield* $( + Effect.all(updates, { + discard: true, + concurrency: 'inherit', + batching: 'inherit', + }) + ); }).pipe( Effect.provideService(State, stateManager), Effect.provideService(ExecutionContext, executionContext) @@ -279,7 +285,7 @@ export class Task extends BaseTask { workItems.map(({ id }) => self.cancelWorkItem(workflowId, id, undefined, false) ), - { batching: true } + { concurrency: 'inherit', batching: true } ) ); yield* $(stateManager.completeTask(workflowId, self.name)); @@ -358,7 +364,7 @@ export class Task extends BaseTask { workItems.map(({ id }) => self.cancelWorkItem(workflowId, id, undefined, false) ), - { concurrency: 'inherit' } + { concurrency: 'inherit', batching: 'inherit' } ) ); yield* $(stateManager.cancelTask(workflowId, self.name)); @@ -423,7 +429,7 @@ export class Task extends BaseTask { workItems.map(({ id }) => self.cancelWorkItem(workflowId, id, undefined, false) ), - { concurrency: 'inherit' } + { concurrency: 'inherit', batching: 'inherit' } ) ); yield* $(stateManager.failTask(workflowId, self.name)); @@ -528,9 +534,10 @@ export class Task extends BaseTask { .pipe(Effect.provideService(State, stateManager)); }, updateWorkItemPayload(payload: unknown) { - return self - .updateWorkItem(workflowId, workItemId, payload) - .pipe(Effect.provideService(State, stateManager)); + return self.updateWorkItem(workflowId, workItemId, payload).pipe( + Effect.tap(() => executionContext.emitStateChanges()), + Effect.provideService(State, stateManager) + ); }, }; }); diff --git a/src/elements/Workflow.ts b/src/elements/Workflow.ts index b44ca1b..b48402c 100644 --- a/src/elements/Workflow.ts +++ b/src/elements/Workflow.ts @@ -189,7 +189,7 @@ export class Workflow< Object.values(self.tasks).map((task) => task.maybeCancelOrDisable(id) ), - { batching: true } + { concurrency: 'inherit', batching: 'inherit' } ) ); yield* $(stateManager.updateWorkflowState(id, 'completed')); @@ -253,7 +253,7 @@ export class Workflow< Object.values(self.tasks).map((task) => task.maybeCancelOrDisable(id) ), - { batching: true } + { concurrency: 'inherit', batching: 'inherit' } ) ); yield* $( @@ -261,7 +261,7 @@ export class Workflow< Object.values(self.conditions).map((condition) => condition.cancel(id) ), - { batching: true } + { concurrency: 'inherit', batching: 'inherit' } ) ); yield* $(stateManager.updateWorkflowState(id, 'canceled')); @@ -336,7 +336,7 @@ export class Workflow< Object.values(self.tasks).map((task) => task.maybeCancelOrDisable(id) ), - { batching: true } + { concurrency: 'inherit', batching: 'inherit' } ) ); yield* $( @@ -344,7 +344,7 @@ export class Workflow< Object.values(self.conditions).map((condition) => condition.cancel(id) ), - { batching: true } + { concurrency: 'inherit', batching: 'inherit' } ) ); yield* $(stateManager.updateWorkflowState(id, 'failed')); @@ -380,6 +380,7 @@ export class Workflow< getDefaultActivityPayload(id: WorkflowId) { return Effect.gen(function* ($) { const stateManager = yield* $(State); + const executionContext = yield* $(ExecutionContext); const workflow = yield* $(stateManager.getWorkflow(id)); return { @@ -444,11 +445,13 @@ export class Workflow< stateManager.updateWorkflowContext( id, contextOrUpdater(context) - ) + ), + Effect.tap(() => executionContext.emitStateChanges()) ); } return yield* $( - stateManager.updateWorkflowContext(id, contextOrUpdater) + stateManager.updateWorkflowContext(id, contextOrUpdater), + Effect.tap(() => executionContext.emitStateChanges()) ); }); },