Skip to content

Commit

Permalink
UI driven workflow using signal proxy pattern (#106)
Browse files Browse the repository at this point in the history
* Synchronous proxy workflow.

This uses the proxy workflow pattern to allow synchronous interaction
with the main workflow from a UI. Once Temporal implements a synchronous
request/response primitive this can be simplified.
  • Loading branch information
robholland authored Jun 8, 2021
1 parent 6741b80 commit 58385d6
Show file tree
Hide file tree
Showing 7 changed files with 510 additions and 0 deletions.
25 changes: 25 additions & 0 deletions synchronous-proxy/README.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,25 @@
## Synchronous Proxy Sample

This sample demonstrates how to achieve synchronous interaction with a main workflow.

We call this pattern a proxy workflow. The proxy workflow sends a signal to the main workflow and then blocks waiting for a signal in response.

This mimics a synchronous SendAndReceiveSignal feature which Temporal does not currently provide natively.

The flow of calls is outlined in the diagram below.

![Flow Diagram](flow.png)

### Steps to run this sample:

1) You need a Temporal service running. See details in README.md
2) Run the following command to start the worker
```shell
go run worker/main.go
```
3) Run the following command to start the simple UI
```shell
go run ui/main.go
```

Once the UI has exited you will be able to see delivery details in the worker output, as might have been emailed to you in a real implementation.
66 changes: 66 additions & 0 deletions synchronous-proxy/activities.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,66 @@
package synchronousproxy

import (
"context"
"fmt"
"time"

"go.temporal.io/sdk/activity"
"go.temporal.io/sdk/temporal"
)

func RegisterEmail(ctx context.Context, email string) error {
logger := activity.GetLogger(ctx)

logger.Info("activity: registered email", email)

return nil
}

func ValidateSize(ctx context.Context, size string) error {
for _, key := range TShirtSizes {
if key == size {
return nil
}
}

return temporal.NewNonRetryableApplicationError(
fmt.Sprintf("size: %s is not valid (%v)", size, TShirtSizes),
"InvalidSize",
nil,
nil,
)
}

func ValidateColor(ctx context.Context, color string) error {
for _, key := range TShirtColors {
if key == color {
return nil
}
}

return temporal.NewNonRetryableApplicationError(
fmt.Sprintf("color: %s is not valid (%v)", color, TShirtColors),
"InvalidColor",
nil,
nil,
)
}

func ScheduleDelivery(ctx context.Context, order TShirtOrder) (time.Time, error) {
logger := activity.GetLogger(ctx)

deliveryDate := time.Now().Add(time.Hour * 48)

logger.Info("activity: scheduled delivery", order, deliveryDate)

return deliveryDate, nil
}

func SendDeliveryEmail(ctx context.Context, order TShirtOrder, deliveryDate time.Time) error {
logger := activity.GetLogger(ctx)

logger.Info(fmt.Sprintf("email to: %s order: %v scheduled delivery: %v", order.Email, order, deliveryDate))

return nil
}
Binary file added synchronous-proxy/flow.png
Loading
Sorry, something went wrong. Reload?
Sorry, we cannot display this file.
Sorry, this file is invalid so it cannot be displayed.
106 changes: 106 additions & 0 deletions synchronous-proxy/proxy/main.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,106 @@
package proxy

import (
"fmt"

"go.temporal.io/sdk/workflow"
)

const proxyRequestSignalName = "proxy-request-signal"
const proxyResponseSignalName = "proxy-response-signal"

type proxySignalRequest struct {
Key string
Value string
CallingWorkflowId string
}

type proxySignalResponse struct {
Key string
Value string
Error string
}

func SendErrorResponse(ctx workflow.Context, id string, err error) error {
logger := workflow.GetLogger(ctx)

logger.Info("Sending error response", id)

return workflow.SignalExternalWorkflow(
ctx,
id,
"",
proxyResponseSignalName,
proxySignalResponse{Error: err.Error()},
).Get(ctx, nil)
}

func SendResponse(ctx workflow.Context, id string, key string, value string) error {
logger := workflow.GetLogger(ctx)

logger.Info("Sending response", id)

return workflow.SignalExternalWorkflow(
ctx,
id,
"",
proxyResponseSignalName,
proxySignalResponse{Key: key, Value: value},
).Get(ctx, nil)
}

func SendRequest(ctx workflow.Context, targetWorkflowID string, key string, value string) error {
logger := workflow.GetLogger(ctx)

workflowID := workflow.GetInfo(ctx).WorkflowExecution.ID

logger.Info("Sending request", targetWorkflowID, workflowID)

return workflow.SignalExternalWorkflow(
ctx,
targetWorkflowID,
"",
proxyRequestSignalName,
proxySignalRequest{
CallingWorkflowId: workflowID,
Key: key,
Value: value,
},
).Get(ctx, nil)
}

func ReceiveResponse(ctx workflow.Context) (string, string, error) {
logger := workflow.GetLogger(ctx)

var res proxySignalResponse

ch := workflow.GetSignalChannel(ctx, proxyResponseSignalName)

logger.Info("Waiting for response")

ch.Receive(ctx, &res)

logger.Info("Received response")

if res.Error != "" {
return "", "", fmt.Errorf("%s", res.Error)
}

return res.Key, res.Value, nil
}

func ReceiveRequest(ctx workflow.Context) (string, string, string) {
logger := workflow.GetLogger(ctx)

var req proxySignalRequest

ch := workflow.GetSignalChannel(ctx, proxyRequestSignalName)

logger.Info("Waiting for request")

ch.Receive(ctx, &req)

logger.Info("Received request")

return req.CallingWorkflowId, req.Key, req.Value
}
108 changes: 108 additions & 0 deletions synchronous-proxy/ui/main.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,108 @@
package main

import (
"bufio"
"context"
"fmt"
"log"
"os"
"strings"

synchronousproxy "github.com/temporalio/samples-go/synchronous-proxy"
"go.temporal.io/sdk/client"
)

func main() {
c, err := client.NewClient(client.Options{})
if err != nil {
log.Fatalln("Unable to create client", err)
}
defer c.Close()

fmt.Println("T-Shirt Order")

status, err := CreateOrder(c)
if err != nil {
log.Fatalln("Unable to create order", err)
}

for {
email := PromptAndReadInput("Please enter you email address:")
status, err = UpdateOrder(c, status.OrderID, synchronousproxy.RegisterStage, email)
if err != nil {
log.Println("invalid email", err)
continue
}

break
}

for {
size := PromptAndReadInput("Please enter your requested size:")
status, err = UpdateOrder(c, status.OrderID, synchronousproxy.SizeStage, size)
if err != nil {
log.Println("invalid size", err)
continue
}

break
}

for {
color := PromptAndReadInput("Please enter your required tshirt color:")
status, err = UpdateOrder(c, status.OrderID, synchronousproxy.ColorStage, color)
if err != nil {
log.Println("invalid color", err)
continue
}

break
}

fmt.Println("Thanks for your order!")
fmt.Println("You will receive an email with shipping details shortly")
}

func PromptAndReadInput(prompt string) string {
fmt.Println(prompt)

reader := bufio.NewReader(os.Stdin)
text, _ := reader.ReadString('\n')
return strings.TrimSpace(text)
}

func CreateOrder(c client.Client) (synchronousproxy.OrderStatus, error) {
workflowOptions := client.StartWorkflowOptions{
TaskQueue: "ui-driven",
}
ctx := context.Background()
var status synchronousproxy.OrderStatus

we, err := c.ExecuteWorkflow(ctx, workflowOptions, synchronousproxy.OrderWorkflow)
if err != nil {
return status, fmt.Errorf("unable to execute order workflow: %w", err)
}

status.OrderID = we.GetID()

return status, nil
}

func UpdateOrder(c client.Client, orderID string, stage string, value string) (synchronousproxy.OrderStatus, error) {
workflowOptions := client.StartWorkflowOptions{
TaskQueue: "ui-driven",
}
ctx := context.Background()
status := synchronousproxy.OrderStatus{OrderID: orderID}

we, err := c.ExecuteWorkflow(ctx, workflowOptions, synchronousproxy.UpdateOrderWorkflow, orderID, stage, value)
if err != nil {
return status, fmt.Errorf("unable to execute workflow: %w", err)
}
err = we.Get(ctx, &status)
if err != nil {
return status, err
}

return status, nil
}
35 changes: 35 additions & 0 deletions synchronous-proxy/worker/main.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,35 @@
package main

import (
"log"

"go.temporal.io/sdk/client"
"go.temporal.io/sdk/worker"

synchronousproxy "github.com/temporalio/samples-go/synchronous-proxy"
)

func main() {
// The client and worker are heavyweight objects that should be created once per process.
c, err := client.NewClient(client.Options{})
if err != nil {
log.Fatalln("Unable to create client", err)
}
defer c.Close()

w := worker.New(c, "ui-driven", worker.Options{})

w.RegisterWorkflow(synchronousproxy.OrderWorkflow)
w.RegisterWorkflow(synchronousproxy.UpdateOrderWorkflow)
w.RegisterWorkflow(synchronousproxy.ShippingWorkflow)
w.RegisterActivity(synchronousproxy.RegisterEmail)
w.RegisterActivity(synchronousproxy.ValidateSize)
w.RegisterActivity(synchronousproxy.ValidateColor)
w.RegisterActivity(synchronousproxy.ScheduleDelivery)
w.RegisterActivity(synchronousproxy.SendDeliveryEmail)

err = w.Run(worker.InterruptCh())
if err != nil {
log.Fatalln("Unable to start worker", err)
}
}
Loading

0 comments on commit 58385d6

Please sign in to comment.