Skip to content

Commit

Permalink
Snappy compression converter example (#144)
Browse files Browse the repository at this point in the history
* Snappy compression converter example

* Typos in snappycompress example

Co-authored-by: Tihomir Surdilovic <[email protected]>
  • Loading branch information
cretz and tsurdilo authored Oct 27, 2021
1 parent d511cf5 commit 03b64be
Show file tree
Hide file tree
Showing 9 changed files with 279 additions and 0 deletions.
1 change: 1 addition & 0 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ go 1.16

require (
github.com/golang/mock v1.6.0
github.com/golang/snappy v0.0.4
github.com/hashicorp/go-plugin v1.4.0
github.com/opentracing/opentracing-go v1.2.0
github.com/pborman/uuid v1.2.1
Expand Down
2 changes: 2 additions & 0 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -168,6 +168,8 @@ github.com/golang/protobuf v1.5.2 h1:ROPKBNFfQgOUMifHyP+KYbvpjbdoFNs+aK7DXlji0Tw
github.com/golang/protobuf v1.5.2/go.mod h1:XVQd3VNwM+JqD3oG2Ue2ip4fOMUkwXdXDdiuN0vRsmY=
github.com/golang/snappy v0.0.0-20170215233205-553a64147049/go.mod h1:/XxbfmMg8lxefKM7IXC3fBNl/7bRcc72aCRzEWrmP2Q=
github.com/golang/snappy v0.0.3/go.mod h1:/XxbfmMg8lxefKM7IXC3fBNl/7bRcc72aCRzEWrmP2Q=
github.com/golang/snappy v0.0.4 h1:yAGX7huGHXlcLOEtBnF4w7FQwA26wojNCwOYAEhLjQM=
github.com/golang/snappy v0.0.4/go.mod h1:/XxbfmMg8lxefKM7IXC3fBNl/7bRcc72aCRzEWrmP2Q=
github.com/google/btree v0.0.0-20180813153112-4030bb1f1f0c/go.mod h1:lNA+9X1NB3Zf8V7Ke586lFgjr2dZNuvo3lPJSGZ5JPQ=
github.com/google/btree v1.0.0/go.mod h1:lNA+9X1NB3Zf8V7Ke586lFgjr2dZNuvo3lPJSGZ5JPQ=
github.com/google/go-cmp v0.2.0/go.mod h1:oXzfMopK8JAjlY9xF4vHSVASa0yLyX7SntLO5aqRK0M=
Expand Down
20 changes: 20 additions & 0 deletions snappycompress/README.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,20 @@
### Steps to run this sample:
1) You need a Temporal service running. See details in README.md
2) Compile the snappycompress plugin for tctl
```
go build -o ../bin/snappycompress-plugin plugin/main.go
```
3) Run the following command to start the worker
```
go run worker/main.go
```
4) Run the following command to start the example
```
go run starter/main.go
```
5) Run the following command and see the compressed payloads
```
export PATH="../bin:$PATH" TEMPORAL_CLI_PLUGIN_DATA_CONVERTER=snappycompress-plugin
tctl workflow show --wid snappycompress_workflowID
```
Note: plugins should normally be available in your PATH, we include the current directory in the path here for ease of testing.
62 changes: 62 additions & 0 deletions snappycompress/data_converter.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,62 @@
package snappycompress

import (
"github.com/golang/snappy"
commonpb "go.temporal.io/api/common/v1"
"go.temporal.io/sdk/converter"
)

// AlwaysCompressDataConverter is a converter that will always perform
// compression even if the compressed size is larger than the original.
var AlwaysCompressDataConverter = NewDataConverter(converter.GetDefaultDataConverter(), Options{AlwaysEncode: true})

// Options are options for Snappy compression.
type Options struct {
// If true, will always "compress" even if the compression results in a larger
// sized payload.
AlwaysEncode bool
}

// NewDataConverter creates a new data converter that wraps the given data
// converter with snappy compression.
func NewDataConverter(underlying converter.DataConverter, options Options) converter.DataConverter {
return converter.NewEncodingDataConverter(underlying, &Encoder{Options: options})
}

// Encoder implements converter.PayloadEncoder for snappy compression.
type Encoder struct {
Options Options
}

// Encode implements converter.PayloadEncoder.Encode.
func (e *Encoder) Encode(p *commonpb.Payload) error {
// Marshal proto
origBytes, err := p.Marshal()
if err != nil {
return err
}
// Compress
b := snappy.Encode(nil, origBytes)
// Only apply if the compression is smaller or always encode is set
if len(b) < len(origBytes) || e.Options.AlwaysEncode {
p.Metadata = map[string][]byte{converter.MetadataEncoding: []byte("binary/snappy")}
p.Data = b
}
return nil
}

// Decode implements converter.PayloadEncoder.Decode.
func (*Encoder) Decode(p *commonpb.Payload) error {
// Only if it's our encoding
if string(p.Metadata[converter.MetadataEncoding]) != "binary/snappy" {
return nil
}
// Uncompress
b, err := snappy.Decode(nil, p.Data)
if err != nil {
return err
}
// Unmarshal proto
p.Reset()
return p.Unmarshal(b)
}
45 changes: 45 additions & 0 deletions snappycompress/data_converter_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,45 @@
package snappycompress

import (
"testing"

"github.com/stretchr/testify/mock"
"github.com/stretchr/testify/require"
"go.temporal.io/sdk/converter"
"go.temporal.io/sdk/testsuite"
)

func Test_Workflow(t *testing.T) {
testSuite := &testsuite.WorkflowTestSuite{}
env := testSuite.NewTestWorkflowEnvironment()

// Mock activity implementation
env.OnActivity(Activity, mock.Anything, mock.Anything).Return("Hello Temporal!", nil)

env.ExecuteWorkflow(Workflow, "Temporal")

require.True(t, env.IsWorkflowCompleted())
require.NoError(t, env.GetWorkflowError())
var result string
require.NoError(t, env.GetWorkflowResult(&result))
require.Equal(t, "Hello Temporal!", result)
}

func Test_DataConverter(t *testing.T) {
defConv := converter.GetDefaultDataConverter()
snappyConv := AlwaysCompressDataConverter

defaultPayloads, err := defConv.ToPayloads("Testing")
require.NoError(t, err)

compressedPayloads, err := snappyConv.ToPayloads("Testing")
require.NoError(t, err)

require.NotEqual(t, defaultPayloads.Payloads[0].GetData(), compressedPayloads.Payloads[0].GetData())

var result string
err = snappyConv.FromPayloads(compressedPayloads, &result)
require.NoError(t, err)

require.Equal(t, "Testing", result)
}
20 changes: 20 additions & 0 deletions snappycompress/plugin/main.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,20 @@
package main

import (
"github.com/hashicorp/go-plugin"
"github.com/temporalio/samples-go/snappycompress"
cliplugin "go.temporal.io/server/tools/cli/plugin"
)

func main() {
var pluginMap = map[string]plugin.Plugin{
cliplugin.DataConverterPluginType: &cliplugin.DataConverterPlugin{
Impl: snappycompress.AlwaysCompressDataConverter,
},
}

plugin.Serve(&plugin.ServeConfig{
HandshakeConfig: cliplugin.PluginHandshakeConfig,
Plugins: pluginMap,
})
}
49 changes: 49 additions & 0 deletions snappycompress/snappycompress.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,49 @@
package snappycompress

import (
"context"
"time"

"go.temporal.io/sdk/activity"
"go.temporal.io/sdk/workflow"
)

// Workflow is a standard workflow definition.
// Note that the Workflow and Activity don't need to care that
// their inputs/results are being compressed.
func Workflow(ctx workflow.Context, name string) (string, error) {
ao := workflow.ActivityOptions{
StartToCloseTimeout: 10 * time.Second,
}
ctx = workflow.WithActivityOptions(ctx, ao)

logger := workflow.GetLogger(ctx)
logger.Info("Compressed Payloads workflow started", "name", name)

info := map[string]string{
"name": name,
}

var result string
err := workflow.ExecuteActivity(ctx, Activity, info).Get(ctx, &result)
if err != nil {
logger.Error("Activity failed.", "Error", err)
return "", err
}

logger.Info("Compressed Payloads workflow completed.", "result", result)

return result, nil
}

func Activity(ctx context.Context, info map[string]string) (string, error) {
logger := activity.GetLogger(ctx)
logger.Info("Activity", "info", info)

name, ok := info["name"]
if !ok {
name = "someone"
}

return "Hello " + name + "!", nil
}
48 changes: 48 additions & 0 deletions snappycompress/starter/main.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,48 @@
package main

import (
"context"
"log"

"github.com/temporalio/samples-go/snappycompress"
"go.temporal.io/sdk/client"
)

func main() {
// The client is a heavyweight object that should be created once per process.
c, err := client.NewClient(client.Options{
// Set DataConverter here to ensure that workflow inputs and results are
// compressed as required.
DataConverter: snappycompress.AlwaysCompressDataConverter,
})
if err != nil {
log.Fatalln("Unable to create client", err)
}
defer c.Close()

workflowOptions := client.StartWorkflowOptions{
ID: "snappycompress_workflowID",
TaskQueue: "snappycompress",
}

// The workflow input "My Compressed Friend" will be compressed by the DataConverter before being sent to Temporal
we, err := c.ExecuteWorkflow(
context.Background(),
workflowOptions,
snappycompress.Workflow,
"My Compressed Friend",
)
if err != nil {
log.Fatalln("Unable to execute workflow", err)
}

log.Println("Started workflow", "WorkflowID", we.GetID(), "RunID", we.GetRunID())

// Synchronously wait for the workflow completion.
var result string
err = we.Get(context.Background(), &result)
if err != nil {
log.Fatalln("Unable get workflow result", err)
}
log.Println("Workflow result:", result)
}
32 changes: 32 additions & 0 deletions snappycompress/worker/main.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,32 @@
package main

import (
"log"

"github.com/temporalio/samples-go/snappycompress"
"go.temporal.io/sdk/client"
"go.temporal.io/sdk/worker"
)

func main() {
// The client and worker are heavyweight objects that should be created once per process.
c, err := client.NewClient(client.Options{
// Set DataConverter here so that workflow and activity inputs/results will
// be compressed as required.
DataConverter: snappycompress.AlwaysCompressDataConverter,
})
if err != nil {
log.Fatalln("Unable to create client", err)
}
defer c.Close()

w := worker.New(c, "snappycompress", worker.Options{})

w.RegisterWorkflow(snappycompress.Workflow)
w.RegisterActivity(snappycompress.Activity)

err = w.Run(worker.InterruptCh())
if err != nil {
log.Fatalln("Unable to start worker", err)
}
}

0 comments on commit 03b64be

Please sign in to comment.