diff --git a/README.md b/README.md index daa7209..71ecd6b 100644 --- a/README.md +++ b/README.md @@ -13,9 +13,9 @@ This repository contains go implementation of a Couchbase Database Change Protoc + Our main goal is to build a dcp client for faster and stateful systems. We're already using this repository in below implementations: - + [Elastic Connector](https://github.com/Trendyol/go-dcp-elasticsearch) - + [Kafka Connector](https://github.com/Trendyol/go-dcp-kafka) - + [Couchbase Connector](https://github.com/Trendyol/go-dcp-couchbase) + + [Elastic Connector](https://github.com/Trendyol/go-dcp-elasticsearch) + + [Kafka Connector](https://github.com/Trendyol/go-dcp-kafka) + + [Couchbase Connector](https://github.com/Trendyol/go-dcp-couchbase) ### Example @@ -71,50 +71,51 @@ $ go get github.com/Trendyol/go-dcp ### Configuration -| Variable | Type | Required | Default | Description | -|------------------------------------------|:-----------------:|:--------:|:----------:|-------------------------------------------------------------------------------------------------------------------------| -| `hosts` | []string | yes | - | Couchbase host like `localhost:8091`. | -| `username` | string | yes | - | Couchbase username. | -| `password` | string | yes | - | Couchbase password. | -| `bucketName` | string | yes | - | Couchbase DCP bucket. | -| `dcp.group.name` | string | yes | | DCP group name for vbuckets. | -| `scopeName` | string | no | _default | Couchbase scope name. | -| `collectionNames` | []string | no | _default | Couchbase collection names. | -| `connectionBufferSize` | uint | no | 20971520 | [gocbcore](github.com/couchbase/gocbcore) library buffer size. `20mb` is default. Check this if you get OOM Killed. | -| `connectionTimeout` | time.Duration | no | 5s | Couchbase connection timeout. | -| `secureConnection` | bool | no | false | Enable TLS connection of Couchbase. | -| `rootCAPath` | string | no | *not set | if `secureConnection` set `true` this field is required. | -| `debug` | bool | no | false | For debugging purpose. | -| `dcp.bufferSize` | int | no | 16777216 | Go DCP listener pre-allocated buffer size. `16mb` is default. Check this if you get OOM Killed. | -| `dcp.connectionBufferSize` | uint | no | 20971520 | [gocbcore](github.com/couchbase/gocbcore) library buffer size. `20mb` is default. Check this if you get OOM Killed. | -| `dcp.connectionTimeout` | time.Duration | no | 5s | DCP connection timeout. | -| `dcp.listener.bufferSize` | uint | no | 1000 | Go DCP listener buffered channel size. | -| `dcp.group.membership.type` | string | no | | DCP membership types. `couchbase`, `kubernetesHa`, `kubernetesStatefulSet` or `static`. Check examples for details. | -| `dcp.group.membership.memberNumber` | int | no | 1 | Set this if membership is `static`. Other methods will ignore this field. | -| `dcp.group.membership.totalMembers` | int | no | 1 | Set this if membership is `static` or `kubernetesStatefulSet`. Other methods will ignore this field. | -| `dcp.group.membership.rebalanceDelay` | time.Duration | no | 20s | Works for autonomous mode. | -| `leaderElection.enabled` | bool | no | false | Set this true for memberships `kubernetesHa`. | -| `leaderElection.type` | string | no | kubernetes | Leader Election types. `kubernetes` | -| `leaderElection.config` | map[string]string | no | *not set | Set lease key-values like `leaseLockName`,`leaseLockNamespace`. | -| `leaderElection.rpc.port` | int | no | 8081 | This field is usable for `kubernetesStatefulSet` membership. | -| `checkpoint.type` | string | no | auto | Set checkpoint type `auto` or `manual`. | -| `checkpoint.autoReset` | string | no | earliest | Set checkpoint start point to `earliest` or `latest`. | -| `checkpoint.interval` | time.Duration | no | 20s | Checkpoint checking interval. | -| `checkpoint.timeout` | time.Duration | no | 60s | Checkpoint checking timeout. | -| `healthCheck.disabled` | bool | no | false | Disable Couchbase connection health check. | -| `healthCheck.interval` | time.Duration | no | 20s | Couchbase connection health checking interval duration. | -| `healthCheck.timeout` | time.Duration | no | 5s | Couchbase connection health checking timeout duration. | -| `rollbackMitigation.disabled` | bool | no | false | Disable reprocessing for roll-backed Vbucket offsets. | -| `rollbackMitigation.interval` | time.Duration | no | 500ms | Persisted sequence numbers polling interval. | -| `rollbackMitigation.configWatchInterval` | time.Duration | no | 2s | Cluster config changes listener interval. | -| `metadata.type` | string | no | couchbase | Metadata storing types. `file` or `couchbase`. | -| `metadata.readOnly` | bool | no | false | Set this for debugging state purposes. | -| `metadata.config` | map[string]string | no | *not set | Set key-values of config. `bucket`,`scope`,`collection`,`connectionBufferSize`,`connectionTimeout` for `couchbase` type | -| `api.disabled` | bool | no | false | Disable metric endpoints | -| `api.port` | int | no | 8080 | Set API port | -| `metric.path` | string | no | /metrics | Set metric endpoint path. | -| `metric.averageWindowSec` | float64 | no | 10.0 | Set metric window range. | -| `logging.level` | string | no | info | Set logging level. | +| Variable | Type | Required | Default | Description | +|------------------------------------------|:-----------------:|:--------:|:----------:|---------------------------------------------------------------------------------------------------------------------------------------------| +| `hosts` | []string | yes | - | Couchbase host like `localhost:8091`. | +| `username` | string | yes | - | Couchbase username. | +| `password` | string | yes | - | Couchbase password. | +| `bucketName` | string | yes | - | Couchbase DCP bucket. | +| `dcp.group.name` | string | yes | | DCP group name for vbuckets. | +| `scopeName` | string | no | _default | Couchbase scope name. | +| `collectionNames` | []string | no | _default | Couchbase collection names. | +| `connectionBufferSize` | uint | no | 20971520 | [gocbcore](github.com/couchbase/gocbcore) library buffer size. `20mb` is default. Check this if you get OOM Killed. | +| `connectionTimeout` | time.Duration | no | 5s | Couchbase connection timeout. | +| `secureConnection` | bool | no | false | Enable TLS connection of Couchbase. | +| `rootCAPath` | string | no | *not set | if `secureConnection` set `true` this field is required. | +| `debug` | bool | no | false | For debugging purpose. | +| `dcp.bufferSize` | int | no | 16mb | Go DCP listener pre-allocated buffer size. `16mb` is default. Check this if you get OOM Killed. | +| `dcp.connectionBufferSize` | uint | no | 20mb | [gocbcore](github.com/couchbase/gocbcore) library buffer size. `20mb` is default. Check this if you get OOM Killed. | +| `dcp.connectionTimeout` | time.Duration | no | 5s | DCP connection timeout. | +| `dcp.listener.bufferSize` | uint | no | 1000 | Go DCP listener buffered channel size. | +| `dcp.group.membership.type` | string | no | | DCP membership types. `couchbase`, `kubernetesHa`, `kubernetesStatefulSet` or `static`. Check examples for details. | +| `dcp.group.membership.memberNumber` | int | no | 1 | Set this if membership is `static`. Other methods will ignore this field. | +| `dcp.group.membership.totalMembers` | int | no | 1 | Set this if membership is `static` or `kubernetesStatefulSet`. Other methods will ignore this field. | +| `dcp.group.membership.rebalanceDelay` | time.Duration | no | 20s | Works for autonomous mode. | +| `dcp.group.membership.config` | map[string]string | no | *not set | Set key-values of config. `expirySeconds`,`heartbeatInterval`,`heartbeatToleranceDuration`,`monitorInterval`,`timeout` for `couchbase` type | +| `leaderElection.enabled` | bool | no | false | Set this true for memberships `kubernetesHa`. | +| `leaderElection.type` | string | no | kubernetes | Leader Election types. `kubernetes` | +| `leaderElection.config` | map[string]string | no | *not set | Set lease key-values like `leaseLockName`,`leaseLockNamespace`. | +| `leaderElection.rpc.port` | int | no | 8081 | This field is usable for `kubernetesStatefulSet` membership. | +| `checkpoint.type` | string | no | auto | Set checkpoint type `auto` or `manual`. | +| `checkpoint.autoReset` | string | no | earliest | Set checkpoint start point to `earliest` or `latest`. | +| `checkpoint.interval` | time.Duration | no | 20s | Checkpoint checking interval. | +| `checkpoint.timeout` | time.Duration | no | 60s | Checkpoint checking timeout. | +| `healthCheck.disabled` | bool | no | false | Disable Couchbase connection health check. | +| `healthCheck.interval` | time.Duration | no | 20s | Couchbase connection health checking interval duration. | +| `healthCheck.timeout` | time.Duration | no | 5s | Couchbase connection health checking timeout duration. | +| `rollbackMitigation.disabled` | bool | no | false | Disable reprocessing for roll-backed Vbucket offsets. | +| `rollbackMitigation.interval` | time.Duration | no | 500ms | Persisted sequence numbers polling interval. | +| `rollbackMitigation.configWatchInterval` | time.Duration | no | 2s | Cluster config changes listener interval. | +| `metadata.type` | string | no | couchbase | Metadata storing types. `file` or `couchbase`. | +| `metadata.readOnly` | bool | no | false | Set this for debugging state purposes. | +| `metadata.config` | map[string]string | no | *not set | Set key-values of config. `bucket`,`scope`,`collection`,`connectionBufferSize`,`connectionTimeout` for `couchbase` type | +| `api.disabled` | bool | no | false | Disable metric endpoints | +| `api.port` | int | no | 8080 | Set API port | +| `metric.path` | string | no | /metrics | Set metric endpoint path. | +| `metric.averageWindowSec` | float64 | no | 10.0 | Set metric window range. | +| `logging.level` | string | no | info | Set logging level. | ### Environment Variables @@ -122,8 +123,8 @@ These environment variables will **overwrite** the corresponding configs. | Variable | Type | Corresponding Config | Description | |---------------------------------------------|:----:|:---------------------------------:|:------------------------------------------------------------:| -| `GO_DCP__DCP_GROUP_MEMBERSHIP_MEMBERNUMBER` | int | dcp.group.membership.memberNumber | To be able to prevent making deployment to scale up or down. -| `GO_DCP__DCP_GROUP_MEMBERSHIP_TOTALMEMBERS` | int | dcp.group.membership.totalMembers | To be able to prevent making deployment to scale up or down. +| `GO_DCP__DCP_GROUP_MEMBERSHIP_MEMBERNUMBER` | int | dcp.group.membership.memberNumber | To be able to prevent making deployment to scale up or down. | +| `GO_DCP__DCP_GROUP_MEMBERSHIP_TOTALMEMBERS` | int | dcp.group.membership.totalMembers | To be able to prevent making deployment to scale up or down. | ### Monitoring @@ -172,4 +173,4 @@ You can adjust the average window time for the metrics by specifying the value o - [couchbase membership config](example/config.yml) - thanks to [@onursak](https://github.com/onursak) - [kubernetesStatefulSet membership config](example/config_k8s_stateful_set.yml) - [kubernetesHa membership config](example/config_k8s_leader_election.yml) -- [static membership config](example/config_static.yml) +- [static membership config](example/config_static.yml) \ No newline at end of file diff --git a/config/dcp.go b/config/dcp.go index 5cab019..a4c1556 100644 --- a/config/dcp.go +++ b/config/dcp.go @@ -6,6 +6,8 @@ import ( "strconv" "time" + "github.com/Trendyol/go-dcp/helpers" + "github.com/Trendyol/go-dcp/logger" ) @@ -22,13 +24,19 @@ const ( CouchbaseMetadataConnectionBufferSizeConfig = "connectionBufferSize" CouchbaseMetadataConnectionTimeoutConfig = "connectionTimeout" CheckpointTypeAuto = "auto" + CouchbaseMembershipExpirySecondsConfig = "expirySeconds" + CouchbaseMembershipHeartbeatIntervalConfig = "heartbeatInterval" + CouchbaseMembershipHeartbeatToleranceConfig = "heartbeatToleranceDuration" + CouchbaseMembershipMonitorIntervalConfig = "monitorInterval" + CouchbaseMembershipTimeoutConfig = "timeout" ) type DCPGroupMembership struct { - Type string `yaml:"type"` - MemberNumber int `yaml:"memberNumber"` - TotalMembers int `yaml:"totalMembers"` - RebalanceDelay time.Duration `yaml:"rebalanceDelay"` + Config map[string]string `yaml:"config"` + Type string `yaml:"type"` + MemberNumber int `yaml:"memberNumber"` + TotalMembers int `yaml:"totalMembers"` + RebalanceDelay time.Duration `yaml:"rebalanceDelay"` } type DCPGroup struct { @@ -41,9 +49,9 @@ type DCPListener struct { } type ExternalDcp struct { + BufferSize any `yaml:"bufferSize"` + ConnectionBufferSize any `yaml:"connectionBufferSize"` Group DCPGroup `yaml:"group"` - BufferSize int `yaml:"bufferSize"` - ConnectionBufferSize uint `yaml:"connectionBufferSize"` ConnectionTimeout time.Duration `yaml:"connectionTimeout"` Listener DCPListener `yaml:"listener"` } @@ -89,9 +97,9 @@ type RollbackMitigation struct { } type Metadata struct { - Config map[string]string `yaml:"config"` - Type string `yaml:"type"` - ReadOnly bool `json:"readOnly"` + Config map[string]any `yaml:"config"` + Type string `yaml:"type"` + ReadOnly bool `json:"readOnly"` } type Logging struct { @@ -99,26 +107,25 @@ type Logging struct { } type Dcp struct { - Logging Logging `yaml:"logging"` - BucketName string `yaml:"bucketName"` - ScopeName string `yaml:"scopeName"` - Password string `yaml:"password"` - RootCAPath string `yaml:"rootCAPath"` - Username string `yaml:"username"` - Metadata Metadata `yaml:"metadata"` - Hosts []string `yaml:"hosts"` - CollectionNames []string `yaml:"collectionNames"` - Metric Metric `yaml:"metric"` - Checkpoint Checkpoint `yaml:"checkpoint"` - LeaderElection LeaderElection `yaml:"leaderElector"` - Dcp ExternalDcp `yaml:"dcp"` - HealthCheck HealthCheck `yaml:"healthCheck"` - RollbackMitigation RollbackMitigation `yaml:"rollbackMitigation"` - API API `yaml:"api"` - ConnectionTimeout time.Duration `yaml:"connectionTimeout"` - ConnectionBufferSize uint `yaml:"connectionBufferSize"` - SecureConnection bool `yaml:"secureConnection"` - Debug bool `yaml:"debug"` + BucketName string `yaml:"bucketName"` + ScopeName string `yaml:"scopeName"` + Password string `yaml:"password"` + RootCAPath string `yaml:"rootCAPath"` + Username string `yaml:"username"` + Logging Logging `yaml:"logging"` + Metadata Metadata `yaml:"metadata"` + Hosts []string `yaml:"hosts"` + CollectionNames []string `yaml:"collectionNames"` + Metric Metric `yaml:"metric"` + Checkpoint Checkpoint `yaml:"checkpoint"` + LeaderElection LeaderElection `yaml:"leaderElector"` + Dcp ExternalDcp `yaml:"dcp"` + HealthCheck HealthCheck `yaml:"healthCheck"` + RollbackMitigation RollbackMitigation `yaml:"rollbackMitigation"` + API API `yaml:"api"` + ConnectionTimeout time.Duration `yaml:"connectionTimeout"` + SecureConnection bool `yaml:"secureConnection"` + Debug bool `yaml:"debug"` } func (c *Dcp) IsCollectionModeEnabled() bool { @@ -137,7 +144,7 @@ func (c *Dcp) GetFileMetadata() string { var fileName string if _, ok := c.Metadata.Config[FileMetadataFileNameConfig]; ok { - fileName = c.Metadata.Config[FileMetadataFileNameConfig] + fileName = c.Metadata.Config[FileMetadataFileNameConfig].(string) } else { err := errors.New("file metadata file name is not set") logger.Log.Error("failed to get metadata file name: %v", err) @@ -153,39 +160,105 @@ func (c *Dcp) GetFileMetadata() string { return fileName } -func (c *Dcp) GetCouchbaseMetadata() (string, string, string, uint, time.Duration) { - return c.getMetadataBucket(), - c.getMetadataScope(), - c.getMetadataCollection(), - c.getMetadataConnectionBufferSize(), - c.getMetadataConnectionTimeout() +type CouchbaseMembership struct { + ExpirySeconds uint32 `yaml:"expirySeconds"` + HeartbeatInterval time.Duration `yaml:"heartbeatInterval"` + HeartbeatToleranceDuration time.Duration `yaml:"heartbeatToleranceDuration"` + MonitorInterval time.Duration `yaml:"monitorInterval"` + Timeout time.Duration `yaml:"timeout"` } -func (c *Dcp) getMetadataBucket() string { - if bucket, ok := c.Metadata.Config[CouchbaseMetadataBucketConfig]; ok { - return bucket +func (c *Dcp) GetCouchbaseMembership() *CouchbaseMembership { + couchbaseMembership := CouchbaseMembership{ + ExpirySeconds: 10, + HeartbeatInterval: 5 * time.Second, + HeartbeatToleranceDuration: 2 * time.Second, + MonitorInterval: 500 * time.Millisecond, + Timeout: 10 * time.Second, } - return c.BucketName -} + if expirySeconds, ok := c.Dcp.Group.Membership.Config[CouchbaseMembershipExpirySecondsConfig]; ok { + parsedExpirySeconds, err := strconv.ParseUint(expirySeconds, 10, 32) + if err != nil { + logger.Log.Error("failed to parse membership expiry seconds: %v", err) + panic(err) + } -func (c *Dcp) getMetadataScope() string { - if scope, ok := c.Metadata.Config[CouchbaseMetadataScopeConfig]; ok { - return scope + couchbaseMembership.ExpirySeconds = uint32(parsedExpirySeconds) } - return DefaultScopeName -} + if heartbeatInterval, ok := c.Dcp.Group.Membership.Config[CouchbaseMembershipHeartbeatIntervalConfig]; ok { + parsedHeartbeatInterval, err := time.ParseDuration(heartbeatInterval) + if err != nil { + logger.Log.Error("failed to parse membership heartbeat interval: %v", err) + panic(err) + } -func (c *Dcp) getMetadataCollection() string { - if collection, ok := c.Metadata.Config[CouchbaseMetadataCollectionConfig]; ok { - return collection + couchbaseMembership.HeartbeatInterval = parsedHeartbeatInterval + } + + if heartbeatToleranceDuration, ok := c.Dcp.Group.Membership.Config[CouchbaseMembershipHeartbeatToleranceConfig]; ok { + parsedHeartbeatToleranceDuration, err := time.ParseDuration(heartbeatToleranceDuration) + if err != nil { + logger.Log.Error("failed to parse membership heartbeat tolerance duration: %v", err) + panic(err) + } + + couchbaseMembership.HeartbeatToleranceDuration = parsedHeartbeatToleranceDuration + } + + if monitorInterval, ok := c.Dcp.Group.Membership.Config[CouchbaseMembershipMonitorIntervalConfig]; ok { + parsedMonitorInterval, err := time.ParseDuration(monitorInterval) + if err != nil { + logger.Log.Error("failed to parse membership monitor interval: %v", err) + panic(err) + } + + couchbaseMembership.MonitorInterval = parsedMonitorInterval + } + + if timeout, ok := c.Dcp.Group.Membership.Config[CouchbaseMembershipTimeoutConfig]; ok { + parsedTimeout, err := time.ParseDuration(timeout) + if err != nil { + logger.Log.Error("failed to parse membership timeout: %v", err) + panic(err) + } + + couchbaseMembership.Timeout = parsedTimeout } - return DefaultCollectionName + return &couchbaseMembership +} + +type CouchbaseMetadata struct { + Bucket string `yaml:"bucket"` + Scope string `yaml:"scope"` + Collection string `yaml:"collection"` + ConnectionBufferSize uint `yaml:"connectionBufferSize"` + ConnectionTimeout time.Duration `yaml:"connectionTimeout"` } -func (c *Dcp) getMetadataConnectionBufferSize() uint { +func (c *Dcp) GetCouchbaseMetadata() CouchbaseMetadata { + couchbaseMetadata := CouchbaseMetadata{ + Bucket: c.BucketName, + Scope: DefaultScopeName, + Collection: DefaultCollectionName, + ConnectionBufferSize: 5242880, // 5 MB + ConnectionTimeout: 5 * time.Second, + } + + if bucket, ok := c.Metadata.Config[CouchbaseMetadataBucketConfig]; ok { + couchbaseMetadata.Bucket = bucket + } + + if scope, ok := c.Metadata.Config[CouchbaseMetadataScopeConfig]; ok { + couchbaseMetadata.Scope = scope + } + + if collection, ok := c.Metadata.Config[CouchbaseMetadataCollectionConfig]; ok { + couchbaseMetadata.Collection = collection + } + if connectionBufferSize, ok := c.Metadata.Config[CouchbaseMetadataConnectionBufferSizeConfig]; ok { parsedConnectionBufferSize, err := strconv.ParseUint(connectionBufferSize, 10, 32) if err != nil { @@ -193,13 +266,9 @@ func (c *Dcp) getMetadataConnectionBufferSize() uint { panic(err) } - return uint(parsedConnectionBufferSize) + couchbaseMetadata.ConnectionBufferSize = uint(parsedConnectionBufferSize) } - return 5242880 // 5 MB -} - -func (c *Dcp) getMetadataConnectionTimeout() time.Duration { if connectionTimeout, ok := c.Metadata.Config[CouchbaseMetadataConnectionTimeoutConfig]; ok { parsedConnectionTimeout, err := time.ParseDuration(connectionTimeout) if err != nil { @@ -207,10 +276,18 @@ func (c *Dcp) getMetadataConnectionTimeout() time.Duration { panic(err) } - return parsedConnectionTimeout + couchbaseMetadata.ConnectionTimeout = parsedConnectionTimeout } - return 5 * time.Second + return couchbaseMetadata +} + +func (c *Dcp) getMetadataBucket() string { + if bucket, ok := c.Metadata.Config[CouchbaseMetadataBucketConfig].(string); ok { + return bucket + } + + return c.BucketName } func (c *Dcp) ApplyDefaults() { @@ -325,8 +402,9 @@ func (c *Dcp) applyDefaultScopeName() { } func (c *Dcp) applyDefaultConnectionBufferSize() { - if c.ConnectionBufferSize == 0 { - c.ConnectionBufferSize = 20971520 + if c.Dcp.ConnectionBufferSize == nil { + defaultValue, _ := helpers.ConvertSizeUnitToByte("20mb") + c.Dcp.ConnectionBufferSize = defaultValue } } @@ -357,13 +435,11 @@ func (c *Dcp) applyDefaultLeaderElection() { } func (c *Dcp) applyDefaultDcp() { - if c.Dcp.BufferSize == 0 { - c.Dcp.BufferSize = 16777216 + if c.Dcp.BufferSize == nil { + c.Dcp.BufferSize = helpers.ResolveUnionIntOrStringValue("16mb") } - if c.Dcp.ConnectionBufferSize == 0 { - c.Dcp.ConnectionBufferSize = 20971520 - } + c.applyDefaultConnectionBufferSize() if c.Dcp.Listener.BufferSize == 0 { c.Dcp.Listener.BufferSize = 1000 diff --git a/config/dcp_test.go b/config/dcp_test.go index 27530da..a5fc272 100644 --- a/config/dcp_test.go +++ b/config/dcp_test.go @@ -37,7 +37,7 @@ func TestDefaultConfig(t *testing.T) { func TestGetCouchbaseMetadata(t *testing.T) { dcp := &Dcp{ Metadata: Metadata{ - Config: map[string]string{ + Config: map[string]any{ CouchbaseMetadataBucketConfig: "mybucket", CouchbaseMetadataScopeConfig: "myscope", }, @@ -45,7 +45,7 @@ func TestGetCouchbaseMetadata(t *testing.T) { BucketName: "mybucket2", } - bucket, scope, collection, connectionBufferSize, connectionTimeout := dcp.GetCouchbaseMetadata() + couchbaseMetadata := dcp.GetCouchbaseMetadata() expectedBucket := "mybucket" expectedScope := "myscope" @@ -53,31 +53,76 @@ func TestGetCouchbaseMetadata(t *testing.T) { expectedConnectionBufferSize := uint(5242880) expectedConnectionTimeout := 5 * time.Second - if bucket != expectedBucket { + if couchbaseMetadata.Bucket != expectedBucket { t.Errorf("Bucket is not set to expected value") } - if scope != expectedScope { + if couchbaseMetadata.Scope != expectedScope { t.Errorf("Scope is not set to expected value") } - if collection != expectedCollection { + if couchbaseMetadata.Collection != expectedCollection { t.Errorf("Collection is not set to expected value") } - if connectionBufferSize != expectedConnectionBufferSize { + if couchbaseMetadata.ConnectionBufferSize != expectedConnectionBufferSize { t.Errorf("ConnectionBufferSize is not set to expected value") } - if connectionTimeout != expectedConnectionTimeout { + if couchbaseMetadata.ConnectionTimeout != expectedConnectionTimeout { t.Errorf("ConnectionTimeout is not set to expected value") } } +func TestGetCouchbaseMembership(t *testing.T) { + dcp := &Dcp{ + Dcp: ExternalDcp{ + Group: DCPGroup{ + Membership: DCPGroupMembership{ + Config: map[string]string{ + CouchbaseMembershipExpirySecondsConfig: "5", + CouchbaseMembershipHeartbeatIntervalConfig: "5s", + CouchbaseMembershipMonitorIntervalConfig: "1s", + CouchbaseMembershipTimeoutConfig: "10s", + }, + }, + }, + }, + } + + couchbaseMembership := dcp.GetCouchbaseMembership() + + expectedExpiryDuration := uint32(5) + expectedHeartbeatInterval := 5 * time.Second + expectedHeartbeatTolerance := 2 * time.Second + expectedMonitorInterval := 1 * time.Second + expectedTimeout := 10 * time.Second + + if couchbaseMembership.ExpirySeconds != expectedExpiryDuration { + t.Errorf("ExpiryDuration is not set to expected value") + } + + if couchbaseMembership.HeartbeatInterval != expectedHeartbeatInterval { + t.Errorf("HeartbeatInterval is not set to expected value") + } + + if couchbaseMembership.HeartbeatToleranceDuration != expectedHeartbeatTolerance { + t.Errorf("HeartbeatToleranceDuration is not set to expected value") + } + + if couchbaseMembership.MonitorInterval != expectedMonitorInterval { + t.Errorf("MonitorInterval is not set to expected value") + } + + if couchbaseMembership.Timeout != expectedTimeout { + t.Errorf("Timeout is not set to expected value") + } +} + func TestDcp_GetFileMetadata(t *testing.T) { dcp := &Dcp{ Metadata: Metadata{ - Config: map[string]string{ + Config: map[string]any{ FileMetadataFileNameConfig: "testfile.json", }, }, @@ -199,7 +244,7 @@ func TestDcpApplyDefaultConnectionBufferSize(t *testing.T) { c := &Dcp{} c.applyDefaultConnectionBufferSize() - if c.ConnectionBufferSize != 20971520 { + if c.Dcp.ConnectionBufferSize.(int) != 20971520 { t.Errorf("ConnectionBufferSize is not set to expected value") } } @@ -251,11 +296,11 @@ func TestDcpApplyDefaultDcp(t *testing.T) { c := &Dcp{} c.applyDefaultDcp() - if c.Dcp.BufferSize != 16777216 { + if c.Dcp.BufferSize.(int) != 16777216 { t.Errorf("Dcp.BufferSize is not set to expected value") } - if c.Dcp.ConnectionBufferSize != 20971520 { + if c.Dcp.ConnectionBufferSize.(int) != 20971520 { t.Errorf("Dcp.ConnectionBufferSize is not set to expected value") } diff --git a/couchbase/client.go b/couchbase/client.go index 09dee70..04a023d 100644 --- a/couchbase/client.go +++ b/couchbase/client.go @@ -8,6 +8,8 @@ import ( "os" "time" + "github.com/Trendyol/go-dcp/helpers" + "github.com/couchbase/gocbcore/v10/connstr" "github.com/Trendyol/go-dcp/config" @@ -215,7 +217,7 @@ func resolveHostsAsHTTP(hosts []string) []string { } func (s *client) Connect() error { - connectionBufferSize := s.config.ConnectionBufferSize + connectionBufferSize := uint(helpers.ResolveUnionIntOrStringValue(s.config.Dcp.ConnectionBufferSize)) connectionTimeout := s.config.ConnectionTimeout if s.config.IsCouchbaseMetadata() { @@ -283,11 +285,11 @@ func (s *client) DcpConnect() error { Enabled: true, }, DCPConfig: gocbcore.DCPConfig{ - BufferSize: s.config.Dcp.BufferSize, + BufferSize: helpers.ResolveUnionIntOrStringValue(s.config.Dcp.BufferSize), UseExpiryOpcode: true, }, KVConfig: gocbcore.KVConfig{ - ConnectionBufferSize: s.config.Dcp.ConnectionBufferSize, + ConnectionBufferSize: uint(helpers.ResolveUnionIntOrStringValue(s.config.Dcp.ConnectionBufferSize)), }, } diff --git a/helpers/data_units.go b/helpers/data_units.go new file mode 100644 index 0000000..86b6ced --- /dev/null +++ b/helpers/data_units.go @@ -0,0 +1,61 @@ +package helpers + +import ( + "fmt" + "strconv" + "strings" +) + +func ResolveUnionIntOrStringValue(input any) int { + switch value := input.(type) { + case int: + return value + case uint: + return int(value) + case string: + intValue, err := strconv.ParseInt(value, 10, 64) + if err == nil { + return int(intValue) + } + + result, err := ConvertSizeUnitToByte(value) + if err != nil { + panic(err) + } + + return result + } + + return 0 +} + +func ConvertSizeUnitToByte(str string) (int, error) { + if len(str) < 2 { + return 0, fmt.Errorf("invalid input: %s", str) + } + + // Extract the numeric part of the input + sizeStr := str[:len(str)-2] + sizeStr = strings.TrimSpace(sizeStr) + sizeStr = strings.ReplaceAll(sizeStr, ",", ".") + + size, err := strconv.ParseFloat(sizeStr, 64) + if err != nil { + return 0, fmt.Errorf("cannot extract numeric part for the input %s, err = %w", str, err) + } + + // Determine the unit (B, KB, MB, GB) + unit := str[len(str)-2:] + switch strings.ToUpper(unit) { + case "B": + return int(size), nil + case "KB": + return int(size * 1024), nil + case "MB": + return int(size * 1024 * 1024), nil + case "GB": + return int(size * 1024 * 1024 * 1024), nil + default: + return 0, fmt.Errorf("unsupported unit: %s, you can specify one of B, KB, MB and GB", unit) + } +} diff --git a/helpers/data_units_test.go b/helpers/data_units_test.go new file mode 100644 index 0000000..5e43d68 --- /dev/null +++ b/helpers/data_units_test.go @@ -0,0 +1,85 @@ +package helpers + +import "testing" + +func TestDcp_ResolveConnectionBufferSize(t *testing.T) { + tests := []struct { + input any + name string + want int + }{ + { + name: "When_Client_Gives_Int_Value", + input: 20971520, + want: 20971520, + }, + { + name: "When_Client_Gives_UInt_Value", + input: uint(10971520), + want: 10971520, + }, + { + name: "When_Client_Gives_StringInt_Value", + input: "15971520", + want: 15971520, + }, + { + name: "When_Client_Gives_KB_Value", + input: "500kb", + want: 500 * 1024, + }, + { + name: "When_Client_Gives_MB_Value", + input: "10mb", + want: 10 * 1024 * 1024, + }, + } + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + if got := ResolveUnionIntOrStringValue(tt.input); got != tt.want { + t.Errorf("ResolveConnectionBufferSize() = %v, want %v", got, tt.want) + } + }) + } +} + +func TestConvertToBytes(t *testing.T) { + testCases := []struct { + input string + expected int + err bool + }{ + {"1kb", 1024, false}, + {"5mb", 5 * 1024 * 1024, false}, + {"5,5mb", 5.5 * 1024 * 1024, false}, + {"8.5mb", 8.5 * 1024 * 1024, false}, + {"10,25 mb", 10.25 * 1024 * 1024, false}, + {"10gb", 10 * 1024 * 1024 * 1024, false}, + {"1KB", 1024, false}, + {"5MB", 5 * 1024 * 1024, false}, + {"12 MB", 12 * 1024 * 1024, false}, + {"10GB", 10 * 1024 * 1024 * 1024, false}, + {"123", 0, true}, + {"15TB", 0, true}, + {"invalid", 0, true}, + {"", 0, true}, + {"123 KB", 123 * 1024, false}, + {"1 MB", 1 * 1024 * 1024, false}, + } + + for _, tc := range testCases { + result, err := ConvertSizeUnitToByte(tc.input) + + if tc.err && err == nil { + t.Errorf("Expected an error for input %s, but got none", tc.input) + } + + if !tc.err && err != nil { + t.Errorf("Unexpected error for input %s: %v", tc.input, err) + } + + if result != tc.expected { + t.Errorf("For input %s, expected %d bytes, but got %d", tc.input, tc.expected, result) + } + } +}