Skip to content

Commit

Permalink
feat: emit state changes after each user triggered state update
Browse files Browse the repository at this point in the history
  • Loading branch information
retro committed Jan 28, 2024
1 parent a178053 commit 56ee804
Show file tree
Hide file tree
Showing 4 changed files with 50 additions and 28 deletions.
29 changes: 20 additions & 9 deletions src/Service.ts
Original file line number Diff line number Diff line change
Expand Up @@ -656,7 +656,11 @@ export class Service<
});

yield* $(
Effect.all(queuedFx, { concurrency: 'inherit', discard: true })
Effect.all(queuedFx, {
concurrency: 'inherit',
batching: 'inherit',
discard: true,
})
);
}
});
Expand All @@ -670,36 +674,43 @@ export class Service<
path: readonly string[];
workflowId: WorkflowId;
}): ExecutionContext {
const { queue, state } = this;
const self = this;
return {
...input,
emitStateChanges: () => this.emitStateChanges(),
emitStateChanges: () => self.emitStateChanges(),
defaultActivityPayload: {
getWorkflowContext() {
return state
return self.state
.getWorkflow(input.workflowId)
.pipe(Effect.map((w) => w.context));
},
updateWorkflowContext(contextOrUpdater: unknown) {
return Effect.gen(function* ($) {
if (typeof contextOrUpdater === 'function') {
const workflow = yield* $(state.getWorkflow(input.workflowId));
const workflow = yield* $(
self.state.getWorkflow(input.workflowId)
);
return yield* $(
state.updateWorkflowContext(
self.state.updateWorkflowContext(
input.workflowId,
contextOrUpdater(workflow.context)
)
),
Effect.tap(() => self.emitStateChanges())
);
}
return yield* $(
state.updateWorkflowContext(input.workflowId, contextOrUpdater)
self.state.updateWorkflowContext(
input.workflowId,
contextOrUpdater
),
Effect.tap(() => self.emitStateChanges())
);
});
},
},
queue: {
offer(item: ExecutionContextQueueItem) {
return queue.offer(item);
return self.queue.offer(item);
},
},
};
Expand Down
11 changes: 6 additions & 5 deletions src/elements/BaseTask.ts
Original file line number Diff line number Diff line change
Expand Up @@ -273,7 +273,7 @@ export abstract class BaseTask {
discard: true,
}),
],
{ discard: true, concurrency: 'inherit' }
{ discard: true, concurrency: 'inherit', batching: 'inherit' }
);
}

Expand Down Expand Up @@ -310,8 +310,8 @@ export abstract class BaseTask {

return yield* $(
Effect.all(updates, {
batching: 'inherit',
discard: true,
batching: 'inherit',
concurrency: 'inherit',
})
);
Expand Down Expand Up @@ -349,8 +349,8 @@ export abstract class BaseTask {
return flow.nextElement.incrementMarking(workflowId);
});
return Effect.all(updates, {
batching: 'inherit',
concurrency: 'inherit',
batching: 'inherit',
discard: true,
});
}
Expand Down Expand Up @@ -378,7 +378,7 @@ export abstract class BaseTask {
Array.from(self.incomingFlows).map((flow) =>
flow.priorElement.getMarking(workflowId)
),
{ batching: 'inherit', concurrency: 'inherit' }
{ concurrency: 'inherit', batching: 'inherit' }
)
);
return markings.filter((m) => m > 0).length === 1 ? true : false;
Expand Down Expand Up @@ -407,7 +407,8 @@ export abstract class BaseTask {
return Effect.allSuccesses(
Array.from(this.outgoingFlows).map((flow) =>
flow.nextElement.enableTasks(workflowId)
)
),
{ concurrency: 'inherit', batching: 'inherit' }
);
}
}
21 changes: 14 additions & 7 deletions src/elements/Task.ts
Original file line number Diff line number Diff line change
Expand Up @@ -177,7 +177,13 @@ export class Task extends BaseTask {
const updates = preSet.map((condition) =>
condition.decrementMarking(workflowId)
);
yield* $(Effect.all(updates, { discard: true, batching: true }));
yield* $(
Effect.all(updates, {
discard: true,
concurrency: 'inherit',
batching: 'inherit',
})
);
}).pipe(
Effect.provideService(State, stateManager),
Effect.provideService(ExecutionContext, executionContext)
Expand Down Expand Up @@ -279,7 +285,7 @@ export class Task extends BaseTask {
workItems.map(({ id }) =>
self.cancelWorkItem(workflowId, id, undefined, false)
),
{ batching: true }
{ concurrency: 'inherit', batching: true }
)
);
yield* $(stateManager.completeTask(workflowId, self.name));
Expand Down Expand Up @@ -358,7 +364,7 @@ export class Task extends BaseTask {
workItems.map(({ id }) =>
self.cancelWorkItem(workflowId, id, undefined, false)
),
{ concurrency: 'inherit' }
{ concurrency: 'inherit', batching: 'inherit' }
)
);
yield* $(stateManager.cancelTask(workflowId, self.name));
Expand Down Expand Up @@ -423,7 +429,7 @@ export class Task extends BaseTask {
workItems.map(({ id }) =>
self.cancelWorkItem(workflowId, id, undefined, false)
),
{ concurrency: 'inherit' }
{ concurrency: 'inherit', batching: 'inherit' }
)
);
yield* $(stateManager.failTask(workflowId, self.name));
Expand Down Expand Up @@ -528,9 +534,10 @@ export class Task extends BaseTask {
.pipe(Effect.provideService(State, stateManager));
},
updateWorkItemPayload(payload: unknown) {
return self
.updateWorkItem(workflowId, workItemId, payload)
.pipe(Effect.provideService(State, stateManager));
return self.updateWorkItem(workflowId, workItemId, payload).pipe(
Effect.tap(() => executionContext.emitStateChanges()),
Effect.provideService(State, stateManager)
);
},
};
});
Expand Down
17 changes: 10 additions & 7 deletions src/elements/Workflow.ts
Original file line number Diff line number Diff line change
Expand Up @@ -189,7 +189,7 @@ export class Workflow<
Object.values(self.tasks).map((task) =>
task.maybeCancelOrDisable(id)
),
{ batching: true }
{ concurrency: 'inherit', batching: 'inherit' }
)
);
yield* $(stateManager.updateWorkflowState(id, 'completed'));
Expand Down Expand Up @@ -253,15 +253,15 @@ export class Workflow<
Object.values(self.tasks).map((task) =>
task.maybeCancelOrDisable(id)
),
{ batching: true }
{ concurrency: 'inherit', batching: 'inherit' }
)
);
yield* $(
Effect.all(
Object.values(self.conditions).map((condition) =>
condition.cancel(id)
),
{ batching: true }
{ concurrency: 'inherit', batching: 'inherit' }
)
);
yield* $(stateManager.updateWorkflowState(id, 'canceled'));
Expand Down Expand Up @@ -336,15 +336,15 @@ export class Workflow<
Object.values(self.tasks).map((task) =>
task.maybeCancelOrDisable(id)
),
{ batching: true }
{ concurrency: 'inherit', batching: 'inherit' }
)
);
yield* $(
Effect.all(
Object.values(self.conditions).map((condition) =>
condition.cancel(id)
),
{ batching: true }
{ concurrency: 'inherit', batching: 'inherit' }
)
);
yield* $(stateManager.updateWorkflowState(id, 'failed'));
Expand Down Expand Up @@ -380,6 +380,7 @@ export class Workflow<
getDefaultActivityPayload(id: WorkflowId) {
return Effect.gen(function* ($) {
const stateManager = yield* $(State);
const executionContext = yield* $(ExecutionContext);
const workflow = yield* $(stateManager.getWorkflow(id));

return {
Expand Down Expand Up @@ -444,11 +445,13 @@ export class Workflow<
stateManager.updateWorkflowContext(
id,
contextOrUpdater(context)
)
),
Effect.tap(() => executionContext.emitStateChanges())
);
}
return yield* $(
stateManager.updateWorkflowContext(id, contextOrUpdater)
stateManager.updateWorkflowContext(id, contextOrUpdater),
Effect.tap(() => executionContext.emitStateChanges())
);
});
},
Expand Down

0 comments on commit 56ee804

Please sign in to comment.