diff --git a/.github/workflows/build.yml b/.github/workflows/build.yml index c58c252..99e84a3 100644 --- a/.github/workflows/build.yml +++ b/.github/workflows/build.yml @@ -37,3 +37,6 @@ jobs: - name: Test run: go test -v ./... + + - name: Upload coverage + run: bash <(curl -s https://codecov.io/bash) \ No newline at end of file diff --git a/README.md b/README.md index daa7209..861958a 100644 --- a/README.md +++ b/README.md @@ -71,50 +71,51 @@ $ go get github.com/Trendyol/go-dcp ### Configuration -| Variable | Type | Required | Default | Description | -|------------------------------------------|:-----------------:|:--------:|:----------:|-------------------------------------------------------------------------------------------------------------------------| -| `hosts` | []string | yes | - | Couchbase host like `localhost:8091`. | -| `username` | string | yes | - | Couchbase username. | -| `password` | string | yes | - | Couchbase password. | -| `bucketName` | string | yes | - | Couchbase DCP bucket. | -| `dcp.group.name` | string | yes | | DCP group name for vbuckets. | -| `scopeName` | string | no | _default | Couchbase scope name. | -| `collectionNames` | []string | no | _default | Couchbase collection names. | -| `connectionBufferSize` | uint | no | 20971520 | [gocbcore](github.com/couchbase/gocbcore) library buffer size. `20mb` is default. Check this if you get OOM Killed. | -| `connectionTimeout` | time.Duration | no | 5s | Couchbase connection timeout. | -| `secureConnection` | bool | no | false | Enable TLS connection of Couchbase. | -| `rootCAPath` | string | no | *not set | if `secureConnection` set `true` this field is required. | -| `debug` | bool | no | false | For debugging purpose. | -| `dcp.bufferSize` | int | no | 16777216 | Go DCP listener pre-allocated buffer size. `16mb` is default. Check this if you get OOM Killed. | -| `dcp.connectionBufferSize` | uint | no | 20971520 | [gocbcore](github.com/couchbase/gocbcore) library buffer size. `20mb` is default. Check this if you get OOM Killed. | -| `dcp.connectionTimeout` | time.Duration | no | 5s | DCP connection timeout. | -| `dcp.listener.bufferSize` | uint | no | 1000 | Go DCP listener buffered channel size. | -| `dcp.group.membership.type` | string | no | | DCP membership types. `couchbase`, `kubernetesHa`, `kubernetesStatefulSet` or `static`. Check examples for details. | -| `dcp.group.membership.memberNumber` | int | no | 1 | Set this if membership is `static`. Other methods will ignore this field. | -| `dcp.group.membership.totalMembers` | int | no | 1 | Set this if membership is `static` or `kubernetesStatefulSet`. Other methods will ignore this field. | -| `dcp.group.membership.rebalanceDelay` | time.Duration | no | 20s | Works for autonomous mode. | -| `leaderElection.enabled` | bool | no | false | Set this true for memberships `kubernetesHa`. | -| `leaderElection.type` | string | no | kubernetes | Leader Election types. `kubernetes` | -| `leaderElection.config` | map[string]string | no | *not set | Set lease key-values like `leaseLockName`,`leaseLockNamespace`. | -| `leaderElection.rpc.port` | int | no | 8081 | This field is usable for `kubernetesStatefulSet` membership. | -| `checkpoint.type` | string | no | auto | Set checkpoint type `auto` or `manual`. | -| `checkpoint.autoReset` | string | no | earliest | Set checkpoint start point to `earliest` or `latest`. | -| `checkpoint.interval` | time.Duration | no | 20s | Checkpoint checking interval. | -| `checkpoint.timeout` | time.Duration | no | 60s | Checkpoint checking timeout. | -| `healthCheck.disabled` | bool | no | false | Disable Couchbase connection health check. | -| `healthCheck.interval` | time.Duration | no | 20s | Couchbase connection health checking interval duration. | -| `healthCheck.timeout` | time.Duration | no | 5s | Couchbase connection health checking timeout duration. | -| `rollbackMitigation.disabled` | bool | no | false | Disable reprocessing for roll-backed Vbucket offsets. | -| `rollbackMitigation.interval` | time.Duration | no | 500ms | Persisted sequence numbers polling interval. | -| `rollbackMitigation.configWatchInterval` | time.Duration | no | 2s | Cluster config changes listener interval. | -| `metadata.type` | string | no | couchbase | Metadata storing types. `file` or `couchbase`. | -| `metadata.readOnly` | bool | no | false | Set this for debugging state purposes. | -| `metadata.config` | map[string]string | no | *not set | Set key-values of config. `bucket`,`scope`,`collection`,`connectionBufferSize`,`connectionTimeout` for `couchbase` type | -| `api.disabled` | bool | no | false | Disable metric endpoints | -| `api.port` | int | no | 8080 | Set API port | -| `metric.path` | string | no | /metrics | Set metric endpoint path. | -| `metric.averageWindowSec` | float64 | no | 10.0 | Set metric window range. | -| `logging.level` | string | no | info | Set logging level. | +| Variable | Type | Required | Default | Description | +|------------------------------------------|:-----------------:|:--------:|:----------:|---------------------------------------------------------------------------------------------------------------------------------------------------| +| `hosts` | []string | yes | - | Couchbase host like `localhost:8091`. | +| `username` | string | yes | - | Couchbase username. | +| `password` | string | yes | - | Couchbase password. | +| `bucketName` | string | yes | - | Couchbase DCP bucket. | +| `dcp.group.name` | string | yes | | DCP group name for vbuckets. | +| `scopeName` | string | no | _default | Couchbase scope name. | +| `collectionNames` | []string | no | _default | Couchbase collection names. | +| `connectionBufferSize` | uint | no | 20971520 | [gocbcore](github.com/couchbase/gocbcore) library buffer size. `20mb` is default. Check this if you get OOM Killed. | +| `connectionTimeout` | time.Duration | no | 5s | Couchbase connection timeout. | +| `secureConnection` | bool | no | false | Enable TLS connection of Couchbase. | +| `rootCAPath` | string | no | *not set | if `secureConnection` set `true` this field is required. | +| `debug` | bool | no | false | For debugging purpose. | +| `dcp.bufferSize` | int | no | 16777216 | Go DCP listener pre-allocated buffer size. `16mb` is default. Check this if you get OOM Killed. | +| `dcp.connectionBufferSize` | uint | no | 20971520 | it will be depracated. Please use `dcp.connectionBufferSizeMB` | +| `dcp.connectionBufferSizeMB` | string | no | "20mb" | [gocbcore](github.com/couchbase/gocbcore) library buffer size. `20mb` is default. Check this if you get OOM Killed. | +| `dcp.connectionTimeout` | time.Duration | no | 5s | DCP connection timeout. | +| `dcp.listener.bufferSize` | uint | no | 1000 | Go DCP listener buffered channel size. | +| `dcp.group.membership.type` | string | no | | DCP membership types. `couchbase`, `kubernetesHa`, `kubernetesStatefulSet` or `static`. Check examples for details. | +| `dcp.group.membership.memberNumber` | int | no | 1 | Set this if membership is `static`. Other methods will ignore this field. | +| `dcp.group.membership.totalMembers` | int | no | 1 | Set this if membership is `static` or `kubernetesStatefulSet`. Other methods will ignore this field. | +| `dcp.group.membership.rebalanceDelay` | time.Duration | no | 20s | Works for autonomous mode. | +| `leaderElection.enabled` | bool | no | false | Set this true for memberships `kubernetesHa`. | +| `leaderElection.type` | string | no | kubernetes | Leader Election types. `kubernetes` | +| `leaderElection.config` | map[string]string | no | *not set | Set lease key-values like `leaseLockName`,`leaseLockNamespace`. | +| `leaderElection.rpc.port` | int | no | 8081 | This field is usable for `kubernetesStatefulSet` membership. | +| `checkpoint.type` | string | no | auto | Set checkpoint type `auto` or `manual`. | +| `checkpoint.autoReset` | string | no | earliest | Set checkpoint start point to `earliest` or `latest`. | +| `checkpoint.interval` | time.Duration | no | 20s | Checkpoint checking interval. | +| `checkpoint.timeout` | time.Duration | no | 60s | Checkpoint checking timeout. | +| `healthCheck.disabled` | bool | no | false | Disable Couchbase connection health check. | +| `healthCheck.interval` | time.Duration | no | 20s | Couchbase connection health checking interval duration. | +| `healthCheck.timeout` | time.Duration | no | 5s | Couchbase connection health checking timeout duration. | +| `rollbackMitigation.disabled` | bool | no | false | Disable reprocessing for roll-backed Vbucket offsets. | +| `rollbackMitigation.interval` | time.Duration | no | 500ms | Persisted sequence numbers polling interval. | +| `rollbackMitigation.configWatchInterval` | time.Duration | no | 2s | Cluster config changes listener interval. | +| `metadata.type` | string | no | couchbase | Metadata storing types. `file` or `couchbase`. | +| `metadata.readOnly` | bool | no | false | Set this for debugging state purposes. | +| `metadata.config` | map[string]string | no | *not set | Set key-values of config. `bucket`,`scope`,`collection`,`connectionBufferSize`,`connectionBufferSizeMB`, `connectionTimeout` for `couchbase` type | +| `api.disabled` | bool | no | false | Disable metric endpoints | +| `api.port` | int | no | 8080 | Set API port | +| `metric.path` | string | no | /metrics | Set metric endpoint path. | +| `metric.averageWindowSec` | float64 | no | 10.0 | Set metric window range. | +| `logging.level` | string | no | info | Set logging level. | ### Environment Variables diff --git a/config/dcp.go b/config/dcp.go index 5cab019..8103ff2 100644 --- a/config/dcp.go +++ b/config/dcp.go @@ -2,26 +2,32 @@ package config import ( "errors" + "fmt" "os" "strconv" "time" + "github.com/Trendyol/go-dcp/helpers" + "github.com/Trendyol/go-dcp/logger" ) const ( - DefaultScopeName = "_default" - DefaultCollectionName = "_default" - FileMetadataFileNameConfig = "fileName" - MetadataTypeCouchbase = "couchbase" - MetadataTypeFile = "file" - MembershipTypeCouchbase = "couchbase" - CouchbaseMetadataBucketConfig = "bucket" - CouchbaseMetadataScopeConfig = "scope" - CouchbaseMetadataCollectionConfig = "collection" - CouchbaseMetadataConnectionBufferSizeConfig = "connectionBufferSize" - CouchbaseMetadataConnectionTimeoutConfig = "connectionTimeout" - CheckpointTypeAuto = "auto" + DefaultScopeName = "_default" + DefaultCollectionName = "_default" + FileMetadataFileNameConfig = "fileName" + MetadataTypeCouchbase = "couchbase" + MetadataTypeFile = "file" + MembershipTypeCouchbase = "couchbase" + CouchbaseMetadataBucketConfig = "bucket" + CouchbaseMetadataScopeConfig = "scope" + CouchbaseMetadataCollectionConfig = "collection" + CouchbaseMetadataConnectionBufferSizeConfig = "connectionBufferSize" + CouchbaseMetadataConnectionBufferSizeMBConfig = "connectionBufferSizeMB" + CouchbaseMetadataConnectionTimeoutConfig = "connectionTimeout" + CheckpointTypeAuto = "auto" + + DefaultConnectionBufferSize = "20MB" ) type DCPGroupMembership struct { @@ -41,11 +47,12 @@ type DCPListener struct { } type ExternalDcp struct { - Group DCPGroup `yaml:"group"` - BufferSize int `yaml:"bufferSize"` - ConnectionBufferSize uint `yaml:"connectionBufferSize"` - ConnectionTimeout time.Duration `yaml:"connectionTimeout"` - Listener DCPListener `yaml:"listener"` + ConnectionBufferSizeMB string `yaml:"connectionBufferSizeMB"` + Group DCPGroup `yaml:"group"` + BufferSize int `yaml:"bufferSize"` + ConnectionBufferSize uint `yaml:"connectionBufferSize"` + ConnectionTimeout time.Duration `yaml:"connectionTimeout"` + Listener DCPListener `yaml:"listener"` } type API struct { @@ -99,26 +106,33 @@ type Logging struct { } type Dcp struct { - Logging Logging `yaml:"logging"` - BucketName string `yaml:"bucketName"` - ScopeName string `yaml:"scopeName"` - Password string `yaml:"password"` - RootCAPath string `yaml:"rootCAPath"` - Username string `yaml:"username"` - Metadata Metadata `yaml:"metadata"` - Hosts []string `yaml:"hosts"` - CollectionNames []string `yaml:"collectionNames"` - Metric Metric `yaml:"metric"` - Checkpoint Checkpoint `yaml:"checkpoint"` - LeaderElection LeaderElection `yaml:"leaderElector"` - Dcp ExternalDcp `yaml:"dcp"` - HealthCheck HealthCheck `yaml:"healthCheck"` - RollbackMitigation RollbackMitigation `yaml:"rollbackMitigation"` - API API `yaml:"api"` - ConnectionTimeout time.Duration `yaml:"connectionTimeout"` - ConnectionBufferSize uint `yaml:"connectionBufferSize"` - SecureConnection bool `yaml:"secureConnection"` - Debug bool `yaml:"debug"` + Logging Logging `yaml:"logging"` + BucketName string `yaml:"bucketName"` + ScopeName string `yaml:"scopeName"` + Password string `yaml:"password"` + RootCAPath string `yaml:"rootCAPath"` + Username string `yaml:"username"` + Metadata Metadata `yaml:"metadata"` + Hosts []string `yaml:"hosts"` + CollectionNames []string `yaml:"collectionNames"` + Metric Metric `yaml:"metric"` + Checkpoint Checkpoint `yaml:"checkpoint"` + LeaderElection LeaderElection `yaml:"leaderElector"` + Dcp ExternalDcp `yaml:"dcp"` + HealthCheck HealthCheck `yaml:"healthCheck"` + RollbackMitigation RollbackMitigation `yaml:"rollbackMitigation"` + API API `yaml:"api"` + ConnectionTimeout time.Duration `yaml:"connectionTimeout"` + SecureConnection bool `yaml:"secureConnection"` + Debug bool `yaml:"debug"` +} + +func (c *Dcp) GetConnectionBufferSize() uint { + if c.Dcp.ConnectionBufferSize != 0 { + return c.Dcp.ConnectionBufferSize + } + + return helpers.MBToBytes(c.Dcp.ConnectionBufferSizeMB) } func (c *Dcp) IsCollectionModeEnabled() bool { @@ -189,14 +203,17 @@ func (c *Dcp) getMetadataConnectionBufferSize() uint { if connectionBufferSize, ok := c.Metadata.Config[CouchbaseMetadataConnectionBufferSizeConfig]; ok { parsedConnectionBufferSize, err := strconv.ParseUint(connectionBufferSize, 10, 32) if err != nil { - logger.Log.Error("failed to parse metadata connection buffer size: %v", err) - panic(err) + panic(fmt.Errorf("failed to parse metadata connection buffer size: %v", err)) } return uint(parsedConnectionBufferSize) } - return 5242880 // 5 MB + if connectionBufferSize, ok := c.Metadata.Config[CouchbaseMetadataConnectionBufferSizeMBConfig]; ok { + return helpers.MBToBytes(connectionBufferSize) + } + + return helpers.MBToBytes("5MB") } func (c *Dcp) getMetadataConnectionTimeout() time.Duration { @@ -325,8 +342,8 @@ func (c *Dcp) applyDefaultScopeName() { } func (c *Dcp) applyDefaultConnectionBufferSize() { - if c.ConnectionBufferSize == 0 { - c.ConnectionBufferSize = 20971520 + if c.Dcp.ConnectionBufferSizeMB == "" { + c.Dcp.ConnectionBufferSizeMB = DefaultConnectionBufferSize } } @@ -358,12 +375,10 @@ func (c *Dcp) applyDefaultLeaderElection() { func (c *Dcp) applyDefaultDcp() { if c.Dcp.BufferSize == 0 { - c.Dcp.BufferSize = 16777216 + c.Dcp.BufferSize = 16_777_216 } - if c.Dcp.ConnectionBufferSize == 0 { - c.Dcp.ConnectionBufferSize = 20971520 - } + c.applyDefaultConnectionBufferSize() if c.Dcp.Listener.BufferSize == 0 { c.Dcp.Listener.BufferSize = 1000 diff --git a/config/dcp_test.go b/config/dcp_test.go index 27530da..19c3dc2 100644 --- a/config/dcp_test.go +++ b/config/dcp_test.go @@ -3,6 +3,8 @@ package config import ( "testing" "time" + + "github.com/Trendyol/go-dcp/helpers" ) func TestDefaultConfig(t *testing.T) { @@ -199,8 +201,8 @@ func TestDcpApplyDefaultConnectionBufferSize(t *testing.T) { c := &Dcp{} c.applyDefaultConnectionBufferSize() - if c.ConnectionBufferSize != 20971520 { - t.Errorf("ConnectionBufferSize is not set to expected value") + if c.Dcp.ConnectionBufferSizeMB != "20MB" { + t.Errorf("ConnectionBufferSizeMB is not set to expected value") } } @@ -255,8 +257,8 @@ func TestDcpApplyDefaultDcp(t *testing.T) { t.Errorf("Dcp.BufferSize is not set to expected value") } - if c.Dcp.ConnectionBufferSize != 20971520 { - t.Errorf("Dcp.ConnectionBufferSize is not set to expected value") + if c.Dcp.ConnectionBufferSizeMB != "20MB" { + t.Errorf("Dcp.ConnectionBufferSizeMB is not set to expected value") } if c.Dcp.Listener.BufferSize != 1000 { @@ -279,3 +281,104 @@ func TestApplyDefaultMetadata(t *testing.T) { t.Errorf("Metadata.Type is not set to expected value") } } + +func TestDcp_getMetadataConnectionBufferSize(t *testing.T) { + t.Run("When_User_Cannot_Specify_ConnectionBufferSize", func(t *testing.T) { + // Given + c := &Dcp{} + expected := helpers.MBToBytes("5MB") + + // When + actual := c.getMetadataConnectionBufferSize() + + // Then + if expected != actual { + t.Errorf("Default metadata connection buffer size must be 5mb") + } + }) + t.Run("When_User_Specify_ConnectionBufferSize", func(t *testing.T) { + // Given + c := &Dcp{ + Metadata: Metadata{ + Config: map[string]string{ + CouchbaseMetadataConnectionBufferSizeConfig: "1048576", + }, + }, + } + expected := helpers.MBToBytes("1MB") + + // When + actual := c.getMetadataConnectionBufferSize() + + // Then + if expected != actual { + t.Errorf("Metadata connection buffer size must be 1mb") + } + }) + t.Run("When_User_Specify_ConnectionBufferSizeMB", func(t *testing.T) { + // Given + c := &Dcp{ + Metadata: Metadata{ + Config: map[string]string{ + CouchbaseMetadataConnectionBufferSizeMBConfig: "10MB", + }, + }, + } + expected := helpers.MBToBytes("10MB") + + // When + actual := c.getMetadataConnectionBufferSize() + + // Then + if expected != actual { + t.Errorf("Metadata connection buffer size must be 10MB") + } + }) +} + +func TestDcp_GetConnectionBufferSize(t *testing.T) { + t.Run("When_User_Cannot_Specify_ConnectionBufferSize", func(t *testing.T) { + // Given + c := &Dcp{} + c.applyDefaultConnectionBufferSize() + expected := helpers.MBToBytes("20MB") + + // When + actual := c.GetConnectionBufferSize() + + // Then + if expected != actual { + t.Errorf("Default connection buffer size must be 20MB") + } + }) + t.Run("When_User_Specify_ConnectionBufferSize", func(t *testing.T) { + // Given + c := &Dcp{ + Dcp: ExternalDcp{ConnectionBufferSize: 1048576}, + } + expected := helpers.MBToBytes("1MB") + + // When + actual := c.GetConnectionBufferSize() + + // Then + if expected != actual { + t.Errorf("Connection buffer size must be 1MB") + } + }) + t.Run("When_User_Specify_ConnectionBufferSizeMB", func(t *testing.T) { + // Given + c := &Dcp{ + Dcp: ExternalDcp{ConnectionBufferSizeMB: "15MB"}, + } + expected := helpers.MBToBytes("15MB") + + // When + actual := c.GetConnectionBufferSize() + + // Then + if expected != actual { + t.Errorf("Connection buffer size must be 15MB") + } + }) +} diff --git a/couchbase/client.go b/couchbase/client.go index 09dee70..96f2091 100644 --- a/couchbase/client.go +++ b/couchbase/client.go @@ -215,7 +215,7 @@ func resolveHostsAsHTTP(hosts []string) []string { } func (s *client) Connect() error { - connectionBufferSize := s.config.ConnectionBufferSize + connectionBufferSize := s.config.GetConnectionBufferSize() connectionTimeout := s.config.ConnectionTimeout if s.config.IsCouchbaseMetadata() { diff --git a/helpers/utils.go b/helpers/utils.go index fe51cd2..96b32fd 100644 --- a/helpers/utils.go +++ b/helpers/utils.go @@ -3,6 +3,8 @@ package helpers import ( "bytes" "reflect" + "strconv" + "strings" "time" ) @@ -52,3 +54,16 @@ func Retry(f func() error, attempts int, sleep time.Duration) (err error) { return err } + +const OneMbInBytes = 1_048_576 + +func MBToBytes(str string) uint { + str = strings.ToUpper(str) + + s := strings.Split(str, "MB") + + s[0] = strings.ReplaceAll(s[0], ",", ".") + + mb, _ := strconv.ParseFloat(s[0], 64) + return uint(mb * OneMbInBytes) +} diff --git a/helpers/utils_test.go b/helpers/utils_test.go index b0fa7f5..6ed19e4 100644 --- a/helpers/utils_test.go +++ b/helpers/utils_test.go @@ -31,3 +31,62 @@ func TestIsMetadata_ReturnsFalse_WhenStructHasNoKeyPrefix(t *testing.T) { t.Errorf("IsMetadata() = %v, want %v", IsMetadata(testData), false) } } + +func TestMBToBytes(t *testing.T) { + type args struct { + str string + } + tests := []struct { + name string + args args + want uint + }{ + { + name: "Case#1", + args: args{str: "1mb"}, + want: 1048576, + }, + { + name: "Case#2", + args: args{str: "1MB"}, + want: 1048576, + }, + { + name: "Case#3", + args: args{str: "15MB"}, + want: 15 * 1048576, + }, + { + name: "Case#4", + args: args{str: "20mb"}, + want: 20 * 1048576, + }, + { + name: "Case#5", + args: args{str: "2.5mb"}, + want: 2.5 * 1048576, + }, + { + name: "Case#6", + args: args{str: "2,5mb"}, + want: 2.5 * 1048576, + }, + { + name: "Case#7", + args: args{str: "20.0mb"}, + want: 20 * 1048576, + }, + { + name: "Case#8", + args: args{str: "20.5MB"}, + want: 20.5 * 1048576, + }, + } + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + if got := MBToBytes(tt.args.str); got != tt.want { + t.Errorf("MBToBytes() = %v, want %v", got, tt.want) + } + }) + } +}