Skip to content

Commit

Permalink
feat: support 5.x.x
Browse files Browse the repository at this point in the history
  • Loading branch information
erayarslan committed Dec 2, 2023
1 parent b5daccc commit a43ebc8
Show file tree
Hide file tree
Showing 15 changed files with 277 additions and 135 deletions.
23 changes: 0 additions & 23 deletions .github/workflows/benchmark.yml

This file was deleted.

2 changes: 1 addition & 1 deletion .github/workflows/build.yml
Original file line number Diff line number Diff line change
Expand Up @@ -36,4 +36,4 @@ jobs:
run: go build -v ./...

- name: Test
run: go test -v ./...
run: go test -parallel 1 -v ./...
9 changes: 9 additions & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -94,6 +94,8 @@ $ go get github.com/Trendyol/go-dcp
| `dcp.group.membership.totalMembers` | int | no | 1 | Set this if membership is `static` or `kubernetesStatefulSet`. Other methods will ignore this field. |
| `dcp.group.membership.rebalanceDelay` | time.Duration | no | 20s | Works for autonomous mode. |
| `dcp.group.membership.config` | map[string]string | no | *not set | Set key-values of config. `expirySeconds`,`heartbeatInterval`,`heartbeatToleranceDuration`,`monitorInterval`,`timeout` for `couchbase` type |
| `dcp.config.disableExpiryOpcode` | bool | no | false | Set this to true if Couchbase Server lower then 6.5.0 |
| `dcp.config.disableStreamEndByClient` | bool | no | false | Set this to true if Couchbase Server lower then 5.5.0 |
| `leaderElection.enabled` | bool | no | false | Set this true for memberships `kubernetesHa`. |
| `leaderElection.type` | string | no | kubernetes | Leader Election types. `kubernetes` |
| `leaderElection.config` | map[string]string | no | *not set | Set key-values of config. `leaseLockName`,`leaseLockNamespace`, `leaseDuration`, `renewDeadline`, `retryPeriod` for `kubernetes` type. |
Expand Down Expand Up @@ -164,6 +166,13 @@ In case you haven't configured a metric.path, the metrics will be exposed at the
| cbgo_offset_write_current | The latest number of the offset write | N/A | Gauge |
| cbgo_offset_write_latency_ms_current | The latest offset write latency in milliseconds | N/A | Gauge |

### Compatibility

| Go DCP Version | Minimum Couchbase Server Version |
|----------------|----------------------------------|
| x<1.1.16 | 6.5.x |
| 1.1.16>=x | 5.x.x |

### Examples

- [example with couchbase membership](example/main.go)
Expand Down
16 changes: 11 additions & 5 deletions config/dcp.go
Original file line number Diff line number Diff line change
Expand Up @@ -53,12 +53,18 @@ type DCPListener struct {
BufferSize uint `yaml:"bufferSize"`
}

type ExternalDcpConfig struct {
DisableExpiryOpcode bool `yaml:"disableExpiryOpcode"`
DisableStreamEndByClient bool `yaml:"disableStreamEndByClient"`
}

type ExternalDcp struct {
BufferSize any `yaml:"bufferSize"`
ConnectionBufferSize any `yaml:"connectionBufferSize"`
Group DCPGroup `yaml:"group"`
ConnectionTimeout time.Duration `yaml:"connectionTimeout"`
Listener DCPListener `yaml:"listener"`
BufferSize any `yaml:"bufferSize"`
ConnectionBufferSize any `yaml:"connectionBufferSize"`
Group DCPGroup `yaml:"group"`
ConnectionTimeout time.Duration `yaml:"connectionTimeout"`
Listener DCPListener `yaml:"listener"`
Config ExternalDcpConfig `yaml:"config"`
}

type API struct {
Expand Down
2 changes: 1 addition & 1 deletion couchbase/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -290,7 +290,7 @@ func (s *client) DcpConnect() error {
},
DCPConfig: gocbcore.DCPConfig{
BufferSize: helpers.ResolveUnionIntOrStringValue(s.config.Dcp.BufferSize),
UseExpiryOpcode: true,
UseExpiryOpcode: !s.config.Dcp.Config.DisableExpiryOpcode,
},
IoConfig: gocbcore.IoConfig{
UseCollections: true,
Expand Down
152 changes: 116 additions & 36 deletions dcp_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,50 +3,111 @@ package dcp
import (
"context"
"fmt"
"strconv"
"strings"
"sync"
"testing"
"time"

"github.com/Trendyol/go-dcp/models"

"github.com/Trendyol/go-dcp/logger"

"github.com/Trendyol/go-dcp/config"

"github.com/Trendyol/go-dcp/couchbase"
"github.com/Trendyol/go-dcp/models"

"github.com/couchbase/gocbcore/v10"
"github.com/testcontainers/testcontainers-go"
"github.com/testcontainers/testcontainers-go/wait"
)

var c = &config.Dcp{
Hosts: []string{"localhost:8091"},
Username: "user",
Password: "password",
BucketName: "dcp-test",
Dcp: config.ExternalDcp{
Group: config.DCPGroup{
Name: "groupName",
Membership: config.DCPGroupMembership{
RebalanceDelay: 3 * time.Second,
func panicVersion(version string) {
panic(fmt.Sprintf("invalid version: %v", version))
}

func parseVersion(version string) (int, int, int) {
parse := strings.Split(version, ".")
if len(parse) < 3 {
panicVersion(version)
}

major, err := strconv.Atoi(parse[0])
if err != nil {
panicVersion(version)
}

minor, err := strconv.Atoi(parse[1])
if err != nil {
panicVersion(version)
}

patch, err := strconv.Atoi(parse[2])
if err != nil {
panicVersion(version)
}

return major, minor, patch
}

// true when version lower than 6.5.0
func disableExpiryOpcode(c *config.Dcp, version string) {
major, minor, _ := parseVersion(version)

if major < 6 || (major == 6 && minor < 5) {
c.Dcp.Config.DisableExpiryOpcode = true
}
}

// true when version lower than 5.5.0
func disableStreamEndByClient(c *config.Dcp, version string) {
major, minor, _ := parseVersion(version)

if major < 5 || (major == 5 && minor < 5) {
c.Dcp.Config.DisableStreamEndByClient = true
}
}

func isVersion5xx(version string) bool {
major, _, _ := parseVersion(version)
return major == 5
}

func getConfig() *config.Dcp {
return &config.Dcp{
Hosts: []string{"localhost:8091"},
Username: "user",
Password: "password",
BucketName: "dcp-test",
Dcp: config.ExternalDcp{
Group: config.DCPGroup{
Name: "groupName",
Membership: config.DCPGroupMembership{
RebalanceDelay: 3 * time.Second,
},
},
},
},
Debug: true,
Debug: true,
}
}

func setupContainer(ctx context.Context) (testcontainers.Container, error) {
func setupContainer(c *config.Dcp, ctx context.Context, version string) (testcontainers.Container, error) {
var entrypoint string
if isVersion5xx(version) {
entrypoint = "scripts/entrypoint_5.sh"
} else {
entrypoint = "scripts/entrypoint.sh"
}

req := testcontainers.ContainerRequest{
Image: "couchbase:6.5.1",
Image: fmt.Sprintf("couchbase:%v", version),
ExposedPorts: []string{"8091:8091/tcp", "8093:8093/tcp", "11210:11210/tcp"},
WaitingFor: wait.ForLog("/entrypoint.sh couchbase-server").WithStartupTimeout(20 * time.Second),
WaitingFor: wait.ForLog("/entrypoint.sh couchbase-server").WithStartupTimeout(30 * time.Second),
Env: map[string]string{
"USERNAME": c.Username,
"PASSWORD": c.Password,
"BUCKET_NAME": c.BucketName,
"BUCKET_TYPE": "couchbase",
"BUCKET_RAMSIZE": "1024",
"SERVICES": "data,index,query,fts,analytics,eventing",
"CLUSTER_RAMSIZE": "1024",
"CLUSTER_INDEX_RAMSIZE": "512",
"CLUSTER_EVENTING_RAMSIZE": "256",
Expand All @@ -66,7 +127,7 @@ func setupContainer(ctx context.Context) (testcontainers.Container, error) {
},
Files: []testcontainers.ContainerFile{
{
HostFilePath: "scripts/entrypoint.sh",
HostFilePath: entrypoint,
ContainerFilePath: "/config-entrypoint.sh",
FileMode: 600,
},
Expand All @@ -79,14 +140,14 @@ func setupContainer(ctx context.Context) (testcontainers.Container, error) {
})
}

func insertDataToContainer(b *testing.B, iteration int, chunkSize int, bulkSize int) {
func insertDataToContainer(c *config.Dcp, t *testing.T, iteration int, chunkSize int, bulkSize int) {
logger.Log.Info("mock data stream started with iteration=%v", iteration)

client := couchbase.NewClient(c)

err := client.Connect()
if err != nil {
b.Fatal(err)
t.Fatal(err)
}

var iter int
Expand Down Expand Up @@ -114,13 +175,13 @@ func insertDataToContainer(b *testing.B, iteration int, chunkSize int, bulkSize
err = opm.Wait(op, err)

if err != nil {
b.Error(err)
t.Error(err)
}

err = <-ch

if err != nil {
b.Error(err)
t.Error(err)
}

wg.Done()
Expand All @@ -139,32 +200,32 @@ func insertDataToContainer(b *testing.B, iteration int, chunkSize int, bulkSize
}

//nolint:funlen
func BenchmarkDcp(b *testing.B) {
func test(t *testing.T, version string) {
chunkSize := 4
bulkSize := 1024
iteration := 24
iteration := 96
mockDataSize := iteration * bulkSize * chunkSize
totalNotify := 10
notifySize := mockDataSize / totalNotify

c := getConfig()
c.ApplyDefaults()

disableExpiryOpcode(c, version)
disableStreamEndByClient(c, version)

ctx := context.Background()

container, err := setupContainer(ctx)
container, err := setupContainer(c, ctx, version)
if err != nil {
b.Fatal(err)
t.Fatal(err)
}

counter := 0
finish := make(chan struct{}, 1)

c.ApplyDefaults()

dcp, err := NewDcp(c, func(ctx *models.ListenerContext) {
if _, ok := ctx.Event.(models.DcpMutation); ok {
if counter == 0 {
b.ResetTimer()
}

ctx.Ack()

counter++
Expand All @@ -179,12 +240,12 @@ func BenchmarkDcp(b *testing.B) {
}
})
if err != nil {
b.Fatal(err)
t.Fatal(err)
}

go func() {
<-dcp.WaitUntilReady()
insertDataToContainer(b, iteration, chunkSize, bulkSize)
insertDataToContainer(c, t, iteration, chunkSize, bulkSize)
}()

go func() {
Expand All @@ -196,6 +257,25 @@ func BenchmarkDcp(b *testing.B) {

err = container.Terminate(ctx)
if err != nil {
b.Fatal(err)
t.Fatal(err)
}
}

func TestDcp(t *testing.T) {
for _, version := range []string{
"5.0.1",
"5.1.0",
"5.5.0",
"6.0.0",
"6.5.0",
"6.6.0",
"7.0.0",
"7.1.0",
"7.2.0",
"7.2.3",
} {
t.Run(version, func(t *testing.T) {
test(t, version)
})
}
}
14 changes: 7 additions & 7 deletions example/go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -12,15 +12,15 @@ require (
github.com/asaskevich/EventBus v0.0.0-20200907212545-49d423059eef // indirect
github.com/beorn7/perks v1.0.1 // indirect
github.com/cespare/xxhash/v2 v2.2.0 // indirect
github.com/couchbase/gocbcore/v10 v10.2.9 // indirect
github.com/couchbase/gocbcore/v10 v10.3.0 // indirect
github.com/davecgh/go-spew v1.1.1 // indirect
github.com/emicklei/go-restful/v3 v3.10.1 // indirect
github.com/go-logr/logr v1.2.4 // indirect
github.com/go-openapi/jsonpointer v0.19.6 // indirect
github.com/go-openapi/jsonreference v0.20.2 // indirect
github.com/go-openapi/swag v0.22.3 // indirect
github.com/gofiber/adaptor/v2 v2.2.1 // indirect
github.com/gofiber/fiber/v2 v2.50.0 // indirect
github.com/gofiber/fiber/v2 v2.51.0 // indirect
github.com/gogo/protobuf v1.3.2 // indirect
github.com/golang/protobuf v1.5.3 // indirect
github.com/golang/snappy v0.0.4 // indirect
Expand All @@ -33,7 +33,7 @@ require (
github.com/klauspost/compress v1.16.7 // indirect
github.com/mailru/easyjson v0.7.7 // indirect
github.com/mattn/go-colorable v0.1.13 // indirect
github.com/mattn/go-isatty v0.0.19 // indirect
github.com/mattn/go-isatty v0.0.20 // indirect
github.com/mattn/go-runewidth v0.0.15 // indirect
github.com/matttproud/golang_protobuf_extensions v1.0.4 // indirect
github.com/mhmtszr/concurrent-swiss-map v1.0.5 // indirect
Expand All @@ -52,7 +52,7 @@ require (
golang.org/x/net v0.17.0 // indirect
golang.org/x/oauth2 v0.11.0 // indirect
golang.org/x/sync v0.5.0 // indirect
golang.org/x/sys v0.13.0 // indirect
golang.org/x/sys v0.14.0 // indirect
golang.org/x/term v0.13.0 // indirect
golang.org/x/text v0.13.0 // indirect
golang.org/x/time v0.3.0 // indirect
Expand All @@ -61,9 +61,9 @@ require (
gopkg.in/inf.v0 v0.9.1 // indirect
gopkg.in/yaml.v2 v2.4.0 // indirect
gopkg.in/yaml.v3 v3.0.1 // indirect
k8s.io/api v0.28.3 // indirect
k8s.io/apimachinery v0.28.3 // indirect
k8s.io/client-go v0.28.3 // indirect
k8s.io/api v0.28.4 // indirect
k8s.io/apimachinery v0.28.4 // indirect
k8s.io/client-go v0.28.4 // indirect
k8s.io/klog/v2 v2.100.1 // indirect
k8s.io/kube-openapi v0.0.0-20230717233707-2695361300d9 // indirect
k8s.io/utils v0.0.0-20230406110748-d93618cff8a2 // indirect
Expand Down
Loading

0 comments on commit a43ebc8

Please sign in to comment.