Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: add connectionBufferSizeMB field in order to specify support mb values #70

Closed
wants to merge 2 commits into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 3 additions & 0 deletions .github/workflows/build.yml
Original file line number Diff line number Diff line change
Expand Up @@ -37,3 +37,6 @@ jobs:

- name: Test
run: go test -v ./...

- name: Upload coverage
run: bash <(curl -s https://codecov.io/bash)
89 changes: 45 additions & 44 deletions README.md

Large diffs are not rendered by default.

107 changes: 61 additions & 46 deletions config/dcp.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,26 +2,32 @@ package config

import (
"errors"
"fmt"
"os"
"strconv"
"time"

"github.com/Trendyol/go-dcp/helpers"

"github.com/Trendyol/go-dcp/logger"
)

const (
DefaultScopeName = "_default"
DefaultCollectionName = "_default"
FileMetadataFileNameConfig = "fileName"
MetadataTypeCouchbase = "couchbase"
MetadataTypeFile = "file"
MembershipTypeCouchbase = "couchbase"
CouchbaseMetadataBucketConfig = "bucket"
CouchbaseMetadataScopeConfig = "scope"
CouchbaseMetadataCollectionConfig = "collection"
CouchbaseMetadataConnectionBufferSizeConfig = "connectionBufferSize"
CouchbaseMetadataConnectionTimeoutConfig = "connectionTimeout"
CheckpointTypeAuto = "auto"
DefaultScopeName = "_default"
DefaultCollectionName = "_default"
FileMetadataFileNameConfig = "fileName"
MetadataTypeCouchbase = "couchbase"
MetadataTypeFile = "file"
MembershipTypeCouchbase = "couchbase"
CouchbaseMetadataBucketConfig = "bucket"
CouchbaseMetadataScopeConfig = "scope"
CouchbaseMetadataCollectionConfig = "collection"
CouchbaseMetadataConnectionBufferSizeConfig = "connectionBufferSize"
CouchbaseMetadataConnectionBufferSizeMBConfig = "connectionBufferSizeMB"
CouchbaseMetadataConnectionTimeoutConfig = "connectionTimeout"
CheckpointTypeAuto = "auto"

DefaultConnectionBufferSize = "20MB"
)

type DCPGroupMembership struct {
Expand All @@ -41,11 +47,12 @@ type DCPListener struct {
}

type ExternalDcp struct {
Group DCPGroup `yaml:"group"`
BufferSize int `yaml:"bufferSize"`
ConnectionBufferSize uint `yaml:"connectionBufferSize"`
ConnectionTimeout time.Duration `yaml:"connectionTimeout"`
Listener DCPListener `yaml:"listener"`
ConnectionBufferSizeMB string `yaml:"connectionBufferSizeMB"`
Group DCPGroup `yaml:"group"`
BufferSize int `yaml:"bufferSize"`
ConnectionBufferSize uint `yaml:"connectionBufferSize"`
ConnectionTimeout time.Duration `yaml:"connectionTimeout"`
Listener DCPListener `yaml:"listener"`
}

type API struct {
Expand Down Expand Up @@ -99,26 +106,33 @@ type Logging struct {
}

type Dcp struct {
Logging Logging `yaml:"logging"`
BucketName string `yaml:"bucketName"`
ScopeName string `yaml:"scopeName"`
Password string `yaml:"password"`
RootCAPath string `yaml:"rootCAPath"`
Username string `yaml:"username"`
Metadata Metadata `yaml:"metadata"`
Hosts []string `yaml:"hosts"`
CollectionNames []string `yaml:"collectionNames"`
Metric Metric `yaml:"metric"`
Checkpoint Checkpoint `yaml:"checkpoint"`
LeaderElection LeaderElection `yaml:"leaderElector"`
Dcp ExternalDcp `yaml:"dcp"`
HealthCheck HealthCheck `yaml:"healthCheck"`
RollbackMitigation RollbackMitigation `yaml:"rollbackMitigation"`
API API `yaml:"api"`
ConnectionTimeout time.Duration `yaml:"connectionTimeout"`
ConnectionBufferSize uint `yaml:"connectionBufferSize"`
SecureConnection bool `yaml:"secureConnection"`
Debug bool `yaml:"debug"`
Logging Logging `yaml:"logging"`
BucketName string `yaml:"bucketName"`
ScopeName string `yaml:"scopeName"`
Password string `yaml:"password"`
RootCAPath string `yaml:"rootCAPath"`
Username string `yaml:"username"`
Metadata Metadata `yaml:"metadata"`
Hosts []string `yaml:"hosts"`
CollectionNames []string `yaml:"collectionNames"`
Metric Metric `yaml:"metric"`
Checkpoint Checkpoint `yaml:"checkpoint"`
LeaderElection LeaderElection `yaml:"leaderElector"`
Dcp ExternalDcp `yaml:"dcp"`
HealthCheck HealthCheck `yaml:"healthCheck"`
RollbackMitigation RollbackMitigation `yaml:"rollbackMitigation"`
API API `yaml:"api"`
ConnectionTimeout time.Duration `yaml:"connectionTimeout"`
SecureConnection bool `yaml:"secureConnection"`
Debug bool `yaml:"debug"`
}

func (c *Dcp) GetConnectionBufferSize() uint {
if c.Dcp.ConnectionBufferSize != 0 {
return c.Dcp.ConnectionBufferSize
}

return helpers.MBToBytes(c.Dcp.ConnectionBufferSizeMB)
}

func (c *Dcp) IsCollectionModeEnabled() bool {
Expand Down Expand Up @@ -189,14 +203,17 @@ func (c *Dcp) getMetadataConnectionBufferSize() uint {
if connectionBufferSize, ok := c.Metadata.Config[CouchbaseMetadataConnectionBufferSizeConfig]; ok {
parsedConnectionBufferSize, err := strconv.ParseUint(connectionBufferSize, 10, 32)
if err != nil {
logger.Log.Error("failed to parse metadata connection buffer size: %v", err)
panic(err)
panic(fmt.Errorf("failed to parse metadata connection buffer size: %v", err))
}

return uint(parsedConnectionBufferSize)
}

return 5242880 // 5 MB
if connectionBufferSize, ok := c.Metadata.Config[CouchbaseMetadataConnectionBufferSizeMBConfig]; ok {
return helpers.MBToBytes(connectionBufferSize)
}

return helpers.MBToBytes("5MB")
}

func (c *Dcp) getMetadataConnectionTimeout() time.Duration {
Expand Down Expand Up @@ -325,8 +342,8 @@ func (c *Dcp) applyDefaultScopeName() {
}

func (c *Dcp) applyDefaultConnectionBufferSize() {
if c.ConnectionBufferSize == 0 {
c.ConnectionBufferSize = 20971520
if c.Dcp.ConnectionBufferSizeMB == "" {
c.Dcp.ConnectionBufferSizeMB = DefaultConnectionBufferSize
}
}

Expand Down Expand Up @@ -358,12 +375,10 @@ func (c *Dcp) applyDefaultLeaderElection() {

func (c *Dcp) applyDefaultDcp() {
if c.Dcp.BufferSize == 0 {
c.Dcp.BufferSize = 16777216
c.Dcp.BufferSize = 16_777_216
}

if c.Dcp.ConnectionBufferSize == 0 {
c.Dcp.ConnectionBufferSize = 20971520
}
c.applyDefaultConnectionBufferSize()

if c.Dcp.Listener.BufferSize == 0 {
c.Dcp.Listener.BufferSize = 1000
Expand Down
111 changes: 107 additions & 4 deletions config/dcp_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,8 @@ package config
import (
"testing"
"time"

"github.com/Trendyol/go-dcp/helpers"
)

func TestDefaultConfig(t *testing.T) {
Expand Down Expand Up @@ -199,8 +201,8 @@ func TestDcpApplyDefaultConnectionBufferSize(t *testing.T) {
c := &Dcp{}
c.applyDefaultConnectionBufferSize()

if c.ConnectionBufferSize != 20971520 {
t.Errorf("ConnectionBufferSize is not set to expected value")
if c.Dcp.ConnectionBufferSizeMB != "20MB" {
t.Errorf("ConnectionBufferSizeMB is not set to expected value")
}
}

Expand Down Expand Up @@ -255,8 +257,8 @@ func TestDcpApplyDefaultDcp(t *testing.T) {
t.Errorf("Dcp.BufferSize is not set to expected value")
}

if c.Dcp.ConnectionBufferSize != 20971520 {
t.Errorf("Dcp.ConnectionBufferSize is not set to expected value")
if c.Dcp.ConnectionBufferSizeMB != "20MB" {
t.Errorf("Dcp.ConnectionBufferSizeMB is not set to expected value")
}

if c.Dcp.Listener.BufferSize != 1000 {
Expand All @@ -279,3 +281,104 @@ func TestApplyDefaultMetadata(t *testing.T) {
t.Errorf("Metadata.Type is not set to expected value")
}
}

func TestDcp_getMetadataConnectionBufferSize(t *testing.T) {
t.Run("When_User_Cannot_Specify_ConnectionBufferSize", func(t *testing.T) {
// Given
c := &Dcp{}
expected := helpers.MBToBytes("5MB")

// When
actual := c.getMetadataConnectionBufferSize()

// Then
if expected != actual {
t.Errorf("Default metadata connection buffer size must be 5mb")
}
})
t.Run("When_User_Specify_ConnectionBufferSize", func(t *testing.T) {
// Given
c := &Dcp{
Metadata: Metadata{
Config: map[string]string{
CouchbaseMetadataConnectionBufferSizeConfig: "1048576",
},
},
}
expected := helpers.MBToBytes("1MB")

// When
actual := c.getMetadataConnectionBufferSize()

// Then
if expected != actual {
t.Errorf("Metadata connection buffer size must be 1mb")
}
})
t.Run("When_User_Specify_ConnectionBufferSizeMB", func(t *testing.T) {
// Given
c := &Dcp{
Metadata: Metadata{
Config: map[string]string{
CouchbaseMetadataConnectionBufferSizeMBConfig: "10MB",
},
},
}
expected := helpers.MBToBytes("10MB")

// When
actual := c.getMetadataConnectionBufferSize()

// Then
if expected != actual {
t.Errorf("Metadata connection buffer size must be 10MB")
}
})
}

func TestDcp_GetConnectionBufferSize(t *testing.T) {
t.Run("When_User_Cannot_Specify_ConnectionBufferSize", func(t *testing.T) {
// Given
c := &Dcp{}
c.applyDefaultConnectionBufferSize()
expected := helpers.MBToBytes("20MB")

// When
actual := c.GetConnectionBufferSize()

// Then
if expected != actual {
t.Errorf("Default connection buffer size must be 20MB")
}
})
t.Run("When_User_Specify_ConnectionBufferSize", func(t *testing.T) {
// Given
c := &Dcp{
Dcp: ExternalDcp{ConnectionBufferSize: 1048576},
}
expected := helpers.MBToBytes("1MB")

// When
actual := c.GetConnectionBufferSize()

// Then
if expected != actual {
t.Errorf("Connection buffer size must be 1MB")
}
})
t.Run("When_User_Specify_ConnectionBufferSizeMB", func(t *testing.T) {
// Given
c := &Dcp{
Dcp: ExternalDcp{ConnectionBufferSizeMB: "15MB"},
}
expected := helpers.MBToBytes("15MB")

// When
actual := c.GetConnectionBufferSize()

// Then
if expected != actual {
t.Errorf("Connection buffer size must be 15MB")
}
})
}
2 changes: 1 addition & 1 deletion couchbase/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -215,7 +215,7 @@ func resolveHostsAsHTTP(hosts []string) []string {
}

func (s *client) Connect() error {
connectionBufferSize := s.config.ConnectionBufferSize
connectionBufferSize := s.config.GetConnectionBufferSize()
connectionTimeout := s.config.ConnectionTimeout

if s.config.IsCouchbaseMetadata() {
Expand Down
15 changes: 15 additions & 0 deletions helpers/utils.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,8 @@ package helpers
import (
"bytes"
"reflect"
"strconv"
"strings"
"time"
)

Expand Down Expand Up @@ -52,3 +54,16 @@ func Retry(f func() error, attempts int, sleep time.Duration) (err error) {

return err
}

const OneMbInBytes = 1_048_576

func MBToBytes(str string) uint {
str = strings.ToUpper(str)

s := strings.Split(str, "MB")

s[0] = strings.ReplaceAll(s[0], ",", ".")

mb, _ := strconv.ParseFloat(s[0], 64)
return uint(mb * OneMbInBytes)
}
Loading