Skip to content

Commit

Permalink
feat: process queued items concurrently
Browse files Browse the repository at this point in the history
Queued items are now processed concurrently, and state changes are emitted more often.

BREAKING CHANGE: If a user provides one of the activity functions (for workflows, tasks or work
items), state change will be emitted immediatelly after user "advances" the state (manually calls
`startTask`, `completeWorkItem` or one of the similar functions.

Whatever was enqueued during the current "turn" will be processed concurrently. For instance if you initialize
and enqueue start of multiple work items, they will be started concurrently. Previously, they were processed
sequentially.
  • Loading branch information
retro committed Jan 27, 2024
1 parent 1207d16 commit de94a89
Show file tree
Hide file tree
Showing 7 changed files with 564 additions and 48 deletions.
70 changes: 40 additions & 30 deletions src/Service.ts
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
import { Effect, Match, Option, Queue, pipe } from 'effect';
import { Chunk, Effect, Match, Option, Queue, pipe } from 'effect';
import { Get } from 'type-fest';

import { State } from './State.js';
Expand Down Expand Up @@ -613,42 +613,51 @@ export class Service<
const self = this;

return Effect.gen(function* ($) {
yield* $(self.emitStateChanges());
while (true) {
const item = yield* $(
Queue.poll(self.queue),
Effect.map(Option.getOrNull)
yield* $(self.emitStateChanges());

const queued = yield* $(
Queue.takeAll(self.queue),
Effect.map(Chunk.toReadonlyArray)
);

if (item === null) {
if (queued.length === 0) {
return;
}

const match = pipe(
Match.type<ExecutionContextQueueItem>(),
Match.when({ type: 'startTask' }, ({ path, input }) =>
self.unsafeStartTask(path, input, false)
),
Match.when({ type: 'startWorkflow' }, ({ path, input }) =>
self.unsafeStartWorkflow(path, input, false)
),
Match.when({ type: 'startWorkItem' }, ({ path, input }) =>
self.unsafeStartWorkItem(path, input, false)
),
Match.when({ type: 'completeWorkItem' }, ({ path, input }) =>
self.unsafeCompleteWorkItem(path, input, false)
),
Match.when({ type: 'cancelWorkItem' }, ({ path, input }) =>
self.unsafeCancelWorkItem(path, input, false)
),
Match.when({ type: 'failWorkItem' }, ({ path, input }) =>
self.unsafeFailWorkItem(path, input, false)
),
Match.exhaustive
);
const queuedFx = queued.map((item) => {
const match = pipe(
Match.type<ExecutionContextQueueItem>(),
Match.when({ type: 'startTask' }, ({ path, input }) =>
self.unsafeStartTask(path, input, false)
),
Match.when({ type: 'startWorkflow' }, ({ path, input }) =>
self.unsafeStartWorkflow(path, input, false)
),
Match.when({ type: 'startWorkItem' }, ({ path, input }) =>
self.unsafeStartWorkItem(path, input, false)
),
Match.when({ type: 'completeWorkItem' }, ({ path, input }) =>
self.unsafeCompleteWorkItem(path, input, false)
),
Match.when({ type: 'cancelWorkItem' }, ({ path, input }) =>
self.unsafeCancelWorkItem(path, input, false)
),
Match.when({ type: 'failWorkItem' }, ({ path, input }) =>
self.unsafeFailWorkItem(path, input, false)
),
Match.exhaustive
);

yield* $(match(item));
yield* $(self.emitStateChanges());
return Effect.gen(function* ($) {
yield* $(match(item));
yield* $(self.emitStateChanges());
});
});

yield* $(
Effect.all(queuedFx, { concurrency: 'inherit', discard: true })
);
}
});
}
Expand All @@ -664,6 +673,7 @@ export class Service<
const { queue, state } = this;
return {
...input,
emitStateChanges: () => this.emitStateChanges(),
defaultActivityPayload: {
getWorkflowContext() {
return state
Expand Down
Loading

0 comments on commit de94a89

Please sign in to comment.