Support storage types (#35)
* rename workflowPersistence to Storage

* convert wfPerf to forked

* eradicate serial, parallel, recover and step logs

* README++

* chkpt1

* fsstorage impl and test

* switch snapshot manager to using FS store
zhirafovod authored May 24, 2024
1 parent 9a2b6e6 commit 7ac245d
Showing 26 changed files with 1,104 additions and 877 deletions.
53 changes: 44 additions & 9 deletions README.md
@@ -5,11 +5,11 @@
* [Getting Started](#getting-started)
* [Installation](#installation)
* [Running the REPL](#running-the-repl)
* [Why Not Ordinary Stated Templates?](#why-not-ordinary-stated-templates)
* [Stated Templates](#stated-templates)
* [High-level design of Stated Workflows](#high-level-design-of-stated-workflows)
* [Stated Workflow Concurrency](#stated-workflow-concurrency)
* [Concurrent, Event Driven, Non-blocking](#concurrent-event-driven-non-blocking)
* [Atomic State Updates](#atomic-state-updates)
* [Pure Function Pipelines - $serial and $parallel](#pure-function-pipelines---serial-and-parallel)
* [Pub/Sub Clients Configuration](#pubsub-clients-configuration)
* [Test Data](#test-data)
* [Dispatcher mode](#dispatcher-mode)
@@ -18,10 +18,11 @@
* [Test Client Durability](#test-client-durability)
* [Pulsar Client Durability](#pulsar-client-durability)
* [Kafka Client Durability](#kafka-client-durability)
* [Workflow Step Logs](#workflow-step-logs)
* [Workflow snapshots](#workflow-snapshots)
* [Explicit Acknowledgement](#explicit-acknowledgement)
* [Workflow APIs](#workflow-apis)
* [Architecture](#architecture)
* [Components Design](#components-design)
<!-- TOC -->

# Overview
@@ -72,11 +73,11 @@ The REPL will launch, allowing you to interact with the stated-js library. In or

For example you can enter this command in the REPL:
```bash
> .init -f "example/homeworld.json"
> .init -f "example/homeworld.yaml"
```

# Why Not Ordinary Stated Templates?
Ordinary [stated templates](https://github.com/cisco-open/stated?tab=readme-ov-file#stated) run a change graph called a
# Stated Templates
The [Stated Templates Processor](https://github.com/cisco-open/stated?tab=readme-ov-file#stated) runs a change graph called a
[DAG](https://github.com/cisco-open/stated?tab=readme-ov-file#dag).
Stated flattens the DAG and executes it as a sequence of expressions called the `plan`. The example below illustrates how a plan executes a sequence of REST calls and transformations in an ordinary Stated
template.
@@ -106,7 +107,7 @@ the work where it left off. We can see that for "workflows", which implies lots
need a way to address these concerns.

# High-level design of Stated Workflows
[Stated Workflow High-level diagram](https://raw.githubusercontent.com/zhirafovod/shtuff/main/images/stated-worlkflows/stated-workflow-high-level.jpeg).
![Stated Workflow High-level diagram](https://raw.githubusercontent.com/zhirafovod/shtuff/main/images/stated-worlkflows/stated-workflow-high-level.jpeg).
Stated Workflows extends the Stated template processor with the notions of Cloud Events, Pub/Sub functions, durability, and scalability.

# Stated Workflow Concurrency
@@ -145,7 +146,7 @@ events and dispatched to a subscriber with a settable `parallelism` factor. The
the Star Wars character's full name from the REST response.

```yaml
start: ${ (produceParams.data; $millis()) } #record start time, after test dataset has been computed
start: ${ (produceParams.client.data; $millis()) } #record start time, after test dataset has been computed
# producer will be sending some test data
produceParams:
type: "my-topic"
@@ -471,7 +472,41 @@ report: "${( $console.log('result added: ' & $$.results) )}"
]
```


# Workflow APIs
[README.API.md](README.API.md) describes the REST API for managing workflows. The API can be used to start, stop, and view
workflows, as well as to fetch the latest snapshot and restore a workflow from it.

# Architecture
The overall architecture focuses on providing simple and scalable event-driven workflows.
It is a single-service architecture with pluggable, easy-to-extend support
for different persistence stores, custom functions, and Pub/Sub clients.

At the same time, the operator can restrict the functions available to the workflow and
choose among different parallelism and durability models.

## Components Design
Internally, Stated Workflows consists of the following modules:
- StatedWorkflow extends the Stated Template Processor functionality to support event-driven workflows.
- WorkflowManager is a central component that manages the workflow lifecycle. It is
responsible for starting, stopping, and restoring workflows from a snapshot.
- WorkflowAPI provides a REST API to manage the workflow.
- SnapshotManager is responsible for the workflow snapshot lifecycle.
- Storage is a pluggable module that supports different storage types to persist
workflow state.

Stated Workflows supports different storage types to persist its state, including
workflow definitions, workflow snapshots, and node state.
- workflow - a unique workflow id, name, and workflow template definition
- snapshot - a snapshot of a workflow. A snapshot includes its workflow definition; if snapshots are enabled, the snapshot supersedes the workflow definition.
- node - workflow node state, which is used for workflow-to-node allocation and load balancing
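
As a rough illustration of the three storage types listed above, the sketch below keeps one keyed collection per type and resolves the template to run by preferring a snapshot over the workflow definition. It is only a minimal in-memory sketch: `InMemoryStorage`, `resolveTemplate`, and the `write`/`read` method names are hypothetical and are not taken from the Stated Workflows code; the `template` field on a snapshot follows the shape of `example/resistanceSnapshot.json`.

```javascript
// Hypothetical sketch: class, function, and method names are assumptions,
// not the actual Storage module of Stated Workflows.
class InMemoryStorage {
  constructor() {
    // One keyed collection per storage type: workflow, snapshot, node.
    this.records = new Map([
      ["workflow", new Map()],
      ["snapshot", new Map()],
      ["node", new Map()],
    ]);
  }

  // Look up the collection for a storage type, rejecting unknown types.
  bucket(type) {
    const bucket = this.records.get(type);
    if (bucket === undefined) throw new Error(`unknown storage type: ${type}`);
    return bucket;
  }

  // Store a deep copy so later mutations by the caller do not leak in.
  write(type, id, value) {
    this.bucket(type).set(id, JSON.parse(JSON.stringify(value)));
  }

  // Return a deep copy of the stored value, or undefined when absent.
  read(type, id) {
    const value = this.bucket(type).get(id);
    return value === undefined ? undefined : JSON.parse(JSON.stringify(value));
  }
}

// A stored snapshot, when present, supersedes the workflow definition.
function resolveTemplate(storage, workflowId) {
  const snapshot = storage.read("snapshot", workflowId);
  if (snapshot !== undefined) return snapshot.template;
  const workflow = storage.read("workflow", workflowId);
  return workflow === undefined ? undefined : workflow.template;
}
```

A filesystem-backed or database-backed store could expose the same two methods, which is one way the "pluggable" property might be achieved.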

![Stated Workflow Components](https://raw.githubusercontent.com/zhirafovod/shtuff/main/images/stated-worlkflows/stated-workflow-components.jpeg).

## Scalability
The Workflow Manager can scale linearly by leveraging persistent storage.
![Stated Workflow Components](https://raw.githubusercontent.com/zhirafovod/shtuff/main/images/stated-worlkflows/stated-workflow-new.jpeg).

## Failover
A node failure is detected when the node fails to update its heartbeat within a timeout.
![Stated Workflow Failover](https://raw.githubusercontent.com/zhirafovod/shtuff/main/images/stated-worlkflows/stated-workflow-failover.jpeg).
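
The heartbeat check described above could look roughly like the following. This is only a sketch under assumptions: the `lastHeartbeatMs` field, the `findFailedNodes` function, and the timeout value are all hypothetical and are not taken from the Stated Workflows code.

```javascript
// Hypothetical sketch: field and function names are assumptions.
// A node counts as failed when its last heartbeat is older than the timeout.
function findFailedNodes(nodes, nowMs, timeoutMs) {
  return nodes
    .filter((node) => nowMs - node.lastHeartbeatMs > timeoutMs)
    .map((node) => node.id);
}

// Example node state records, as they might be read from the "node" storage type.
const nodes = [
  { id: "node-a", lastHeartbeatMs: 10000 }, // 2 s old at check time
  { id: "node-b", lastHeartbeatMs: 4000 },  // 8 s old at check time
];

const failed = findFailedNodes(nodes, 12000, 5000);
console.log(failed); // [ 'node-b' ]
```

Workflows allocated to the nodes returned here would then be candidates for reallocation to healthy nodes.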

2 changes: 1 addition & 1 deletion example/joinResistance.yaml
@@ -1,4 +1,4 @@
start: ${ (produceParams.data; $millis()) } #record start time, after test dataset has been computed
start: ${ (produceParams.client.data; $millis()) } #record start time, after test dataset has been computed
# producer will be sending some test data
produceParams:
type: "my-topic"
2 changes: 1 addition & 1 deletion example/joinResistanceBug.yaml
@@ -1,4 +1,4 @@
start: ${ (produceParams.data; $millis()) } #record start time, after test dataset has been computed
start: ${ (produceParams.client.data; $millis()) } #record start time, after test dataset has been computed
# producer will be sending some test data
produceParams:
type: "my-topic"
2 changes: 1 addition & 1 deletion example/joinResistanceFast.yaml
@@ -1,4 +1,4 @@
start: ${ (produceParams.data; $millis()) } #record start time, after test dataset has been computed
start: ${ (produceParams.client.data; $millis()) } #record start time, after test dataset has been computed
# producer will be sending some test data
produceParams:
type: "my-topic"
2 changes: 1 addition & 1 deletion example/joinResistanceRecovery.yaml
@@ -1,4 +1,4 @@
start: ${ (produceParams.data; $millis()) } #record start time, after test dataset has been computed
start: ${ (produceParams.client.data; $millis()) } #record start time, after test dataset has been computed
# producer will be sending some test data
produceParams:
type: "rebelDispatch"
2 changes: 1 addition & 1 deletion example/obsolete/joinResistanceRecoverySerial.json
@@ -1,5 +1,5 @@
{
"start": "${ (produceParams.data; $millis()) }",
"start": "${ (produceParams.client.data; $millis()) }",
"produceParams": {
"type": "rebelDispatch",
"client": {
2 changes: 1 addition & 1 deletion example/obsolete/joinResistanceRecoverySerial.yaml
@@ -1,4 +1,4 @@
start: ${ (produceParams.data; $millis()) } #record start time, after test dataset has been computed
start: ${ (produceParams.client.data; $millis()) } #record start time, after test dataset has been computed
# producer will be sending some test data
produceParams:
type: "rebelDispatch"
2 changes: 1 addition & 1 deletion example/resistanceSnapshot.json
@@ -1,6 +1,6 @@
{
"template": {
"start": "${ (produceParams.data; $millis()) }",
"start": "${ (produceParams.client.data; $millis()) }",
"produceParams": {
"type": "my-topic",
"client": {
20 changes: 7 additions & 13 deletions example/wfPerf01.yaml
@@ -3,20 +3,14 @@ name: nozzleWork
subscribeParams: #parameters for subscribing to a cloud event
type: sys:cron
filter$: function($e){ $e.name='nozzleTime' }
to: ../${myWorkflow$}
to: /${ function($input) { $forked( "/nozzle", $input) } }
parallelism: 8
source: cloudEvent
client:
type: test
testData: "${ [1..300].({'name': 'nozzleTime', 'order':$}) }"

myWorkflow$: |
function($e){
$e ~> $serial([step1, step2])
}
step1:
name: primeTheNozzle
function: ${function($e){ ($e~>|$|{'primed':true}|) }}
step2:
name: sprayTheNozzle
function: ${function($e){ $e~>|$|{'sprayed':true}| }}
testData: "${ [1..1000].({'name': 'nozzleTime', 'order':$}) }"
nozzle: {'name': 'defaultNozzle', 'order': 0}
step1: /${nozzle~>|$|{'primed':true}|}
step2: /${step1~>|$|{'sprayed':true}|}
step3: /${$joined("/nozzles/-", step2)}
nozzles: []