Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: extract connectionBufferSize DcpBufferSize to interface for support kb,mb size units #71

Closed
wants to merge 5 commits into from
Closed
Show file tree
Hide file tree
Changes from 3 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 2 additions & 2 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -85,8 +85,8 @@ $ go get github.com/Trendyol/go-dcp
| `secureConnection` | bool | no | false | Enable TLS connection of Couchbase. |
| `rootCAPath` | string | no | *not set | if `secureConnection` set `true` this field is required. |
| `debug` | bool | no | false | For debugging purpose. |
| `dcp.bufferSize` | int | no | 16777216 | Go DCP listener pre-allocated buffer size. `16mb` is default. Check this if you get OOM Killed. |
| `dcp.connectionBufferSize` | uint | no | 20971520 | [gocbcore](github.com/couchbase/gocbcore) library buffer size. `20mb` is default. Check this if you get OOM Killed. |
| `dcp.bufferSize` | int, string | no | 16777216 | Go DCP listener pre-allocated buffer size. `16mb` is default. Check this if you get OOM Killed. |
Abdulsametileri marked this conversation as resolved.
Show resolved Hide resolved
| `dcp.connectionBufferSize` | uint, string | no | 20971520 | [gocbcore](github.com/couchbase/gocbcore) library buffer size. `20mb` is default. Check this if you get OOM Killed. |
Abdulsametileri marked this conversation as resolved.
Show resolved Hide resolved
| `dcp.connectionTimeout` | time.Duration | no | 5s | DCP connection timeout. |
| `dcp.listener.bufferSize` | uint | no | 1000 | Go DCP listener buffered channel size. |
| `dcp.group.membership.type` | string | no | | DCP membership types. `couchbase`, `kubernetesHa`, `kubernetesStatefulSet` or `static`. Check examples for details. |
Expand Down
83 changes: 39 additions & 44 deletions config/dcp.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,8 @@ import (
"strconv"
"time"

"github.com/Trendyol/go-dcp/helpers"

"github.com/Trendyol/go-dcp/logger"
)

Expand Down Expand Up @@ -41,9 +43,9 @@ type DCPListener struct {
}

type ExternalDcp struct {
BufferSize any `yaml:"bufferSize"`
ConnectionBufferSize any `yaml:"connectionBufferSize"`
Group DCPGroup `yaml:"group"`
BufferSize int `yaml:"bufferSize"`
ConnectionBufferSize uint `yaml:"connectionBufferSize"`
ConnectionTimeout time.Duration `yaml:"connectionTimeout"`
Listener DCPListener `yaml:"listener"`
}
Expand Down Expand Up @@ -89,36 +91,35 @@ type RollbackMitigation struct {
}

type Metadata struct {
Config map[string]string `yaml:"config"`
Type string `yaml:"type"`
ReadOnly bool `json:"readOnly"`
Config map[string]any `yaml:"config"`
Type string `yaml:"type"`
ReadOnly bool `json:"readOnly"`
}

type Logging struct {
Level string `yaml:"level"`
}

type Dcp struct {
Logging Logging `yaml:"logging"`
BucketName string `yaml:"bucketName"`
ScopeName string `yaml:"scopeName"`
Password string `yaml:"password"`
RootCAPath string `yaml:"rootCAPath"`
Username string `yaml:"username"`
Metadata Metadata `yaml:"metadata"`
Hosts []string `yaml:"hosts"`
CollectionNames []string `yaml:"collectionNames"`
Metric Metric `yaml:"metric"`
Checkpoint Checkpoint `yaml:"checkpoint"`
LeaderElection LeaderElection `yaml:"leaderElector"`
Dcp ExternalDcp `yaml:"dcp"`
HealthCheck HealthCheck `yaml:"healthCheck"`
RollbackMitigation RollbackMitigation `yaml:"rollbackMitigation"`
API API `yaml:"api"`
ConnectionTimeout time.Duration `yaml:"connectionTimeout"`
ConnectionBufferSize uint `yaml:"connectionBufferSize"`
SecureConnection bool `yaml:"secureConnection"`
Debug bool `yaml:"debug"`
BucketName string `yaml:"bucketName"`
ScopeName string `yaml:"scopeName"`
Password string `yaml:"password"`
RootCAPath string `yaml:"rootCAPath"`
Username string `yaml:"username"`
Logging Logging `yaml:"logging"`
Metadata Metadata `yaml:"metadata"`
Hosts []string `yaml:"hosts"`
CollectionNames []string `yaml:"collectionNames"`
Metric Metric `yaml:"metric"`
Checkpoint Checkpoint `yaml:"checkpoint"`
LeaderElection LeaderElection `yaml:"leaderElector"`
Dcp ExternalDcp `yaml:"dcp"`
HealthCheck HealthCheck `yaml:"healthCheck"`
RollbackMitigation RollbackMitigation `yaml:"rollbackMitigation"`
API API `yaml:"api"`
ConnectionTimeout time.Duration `yaml:"connectionTimeout"`
SecureConnection bool `yaml:"secureConnection"`
Debug bool `yaml:"debug"`
}

func (c *Dcp) IsCollectionModeEnabled() bool {
Expand All @@ -137,7 +138,7 @@ func (c *Dcp) GetFileMetadata() string {
var fileName string

if _, ok := c.Metadata.Config[FileMetadataFileNameConfig]; ok {
fileName = c.Metadata.Config[FileMetadataFileNameConfig]
fileName = c.Metadata.Config[FileMetadataFileNameConfig].(string)
} else {
err := errors.New("file metadata file name is not set")
logger.Log.Error("failed to get metadata file name: %v", err)
Expand All @@ -162,23 +163,23 @@ func (c *Dcp) GetCouchbaseMetadata() (string, string, string, uint, time.Duratio
}

func (c *Dcp) getMetadataBucket() string {
if bucket, ok := c.Metadata.Config[CouchbaseMetadataBucketConfig]; ok {
if bucket, ok := c.Metadata.Config[CouchbaseMetadataBucketConfig].(string); ok {
return bucket
}

return c.BucketName
}

func (c *Dcp) getMetadataScope() string {
if scope, ok := c.Metadata.Config[CouchbaseMetadataScopeConfig]; ok {
if scope, ok := c.Metadata.Config[CouchbaseMetadataScopeConfig].(string); ok {
return scope
}

return DefaultScopeName
}

func (c *Dcp) getMetadataCollection() string {
if collection, ok := c.Metadata.Config[CouchbaseMetadataCollectionConfig]; ok {
if collection, ok := c.Metadata.Config[CouchbaseMetadataCollectionConfig].(string); ok {
return collection
}

Expand All @@ -187,20 +188,15 @@ func (c *Dcp) getMetadataCollection() string {

func (c *Dcp) getMetadataConnectionBufferSize() uint {
if connectionBufferSize, ok := c.Metadata.Config[CouchbaseMetadataConnectionBufferSizeConfig]; ok {
parsedConnectionBufferSize, err := strconv.ParseUint(connectionBufferSize, 10, 32)
if err != nil {
logger.Log.Error("failed to parse metadata connection buffer size: %v", err)
panic(err)
}

return uint(parsedConnectionBufferSize)
return uint(helpers.ResolveUnionIntOrStringValue(connectionBufferSize))
}

return 5242880 // 5 MB
result, _ := helpers.ConvertSizeUnitToByte("5MB")
return uint(result)
}

func (c *Dcp) getMetadataConnectionTimeout() time.Duration {
if connectionTimeout, ok := c.Metadata.Config[CouchbaseMetadataConnectionTimeoutConfig]; ok {
if connectionTimeout, ok := c.Metadata.Config[CouchbaseMetadataConnectionTimeoutConfig].(string); ok {
parsedConnectionTimeout, err := time.ParseDuration(connectionTimeout)
if err != nil {
logger.Log.Error("failed to parse metadata connection timeout: %v", err)
Expand Down Expand Up @@ -325,8 +321,9 @@ func (c *Dcp) applyDefaultScopeName() {
}

func (c *Dcp) applyDefaultConnectionBufferSize() {
if c.ConnectionBufferSize == 0 {
c.ConnectionBufferSize = 20971520
if c.Dcp.ConnectionBufferSize == nil {
defaultValue, _ := helpers.ConvertSizeUnitToByte("20mb")
c.Dcp.ConnectionBufferSize = defaultValue
}
}

Expand Down Expand Up @@ -357,13 +354,11 @@ func (c *Dcp) applyDefaultLeaderElection() {
}

func (c *Dcp) applyDefaultDcp() {
if c.Dcp.BufferSize == 0 {
if c.Dcp.BufferSize == nil {
c.Dcp.BufferSize = 16777216
Abdulsametileri marked this conversation as resolved.
Show resolved Hide resolved
}

if c.Dcp.ConnectionBufferSize == 0 {
c.Dcp.ConnectionBufferSize = 20971520
}
c.applyDefaultConnectionBufferSize()

if c.Dcp.Listener.BufferSize == 0 {
c.Dcp.Listener.BufferSize = 1000
Expand Down
10 changes: 5 additions & 5 deletions config/dcp_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,7 @@ func TestDefaultConfig(t *testing.T) {
func TestGetCouchbaseMetadata(t *testing.T) {
dcp := &Dcp{
Metadata: Metadata{
Config: map[string]string{
Config: map[string]any{
CouchbaseMetadataBucketConfig: "mybucket",
CouchbaseMetadataScopeConfig: "myscope",
},
Expand Down Expand Up @@ -77,7 +77,7 @@ func TestGetCouchbaseMetadata(t *testing.T) {
func TestDcp_GetFileMetadata(t *testing.T) {
dcp := &Dcp{
Metadata: Metadata{
Config: map[string]string{
Config: map[string]any{
FileMetadataFileNameConfig: "testfile.json",
},
},
Expand Down Expand Up @@ -199,7 +199,7 @@ func TestDcpApplyDefaultConnectionBufferSize(t *testing.T) {
c := &Dcp{}
c.applyDefaultConnectionBufferSize()

if c.ConnectionBufferSize != 20971520 {
if c.Dcp.ConnectionBufferSize.(int) != 20971520 {
t.Errorf("ConnectionBufferSize is not set to expected value")
}
}
Expand Down Expand Up @@ -251,11 +251,11 @@ func TestDcpApplyDefaultDcp(t *testing.T) {
c := &Dcp{}
c.applyDefaultDcp()

if c.Dcp.BufferSize != 16777216 {
if c.Dcp.BufferSize.(int) != 16777216 {
t.Errorf("Dcp.BufferSize is not set to expected value")
}

if c.Dcp.ConnectionBufferSize != 20971520 {
if c.Dcp.ConnectionBufferSize.(int) != 20971520 {
t.Errorf("Dcp.ConnectionBufferSize is not set to expected value")
}

Expand Down
8 changes: 5 additions & 3 deletions couchbase/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,8 @@ import (
"os"
"time"

"github.com/Trendyol/go-dcp/helpers"

"github.com/couchbase/gocbcore/v10/connstr"

"github.com/Trendyol/go-dcp/config"
Expand Down Expand Up @@ -215,7 +217,7 @@ func resolveHostsAsHTTP(hosts []string) []string {
}

func (s *client) Connect() error {
connectionBufferSize := s.config.ConnectionBufferSize
connectionBufferSize := uint(helpers.ResolveUnionIntOrStringValue(s.config.Dcp.ConnectionBufferSize))
connectionTimeout := s.config.ConnectionTimeout

if s.config.IsCouchbaseMetadata() {
Expand Down Expand Up @@ -283,11 +285,11 @@ func (s *client) DcpConnect() error {
Enabled: true,
},
DCPConfig: gocbcore.DCPConfig{
BufferSize: s.config.Dcp.BufferSize,
BufferSize: helpers.ResolveUnionIntOrStringValue(s.config.Dcp.BufferSize),
UseExpiryOpcode: true,
},
KVConfig: gocbcore.KVConfig{
ConnectionBufferSize: s.config.Dcp.ConnectionBufferSize,
ConnectionBufferSize: uint(helpers.ResolveUnionIntOrStringValue(s.config.Dcp.ConnectionBufferSize)),
},
}

Expand Down
61 changes: 61 additions & 0 deletions helpers/data_units.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,61 @@
package helpers

import (
"fmt"
"strconv"
"strings"
)

func ResolveUnionIntOrStringValue(input any) int {
switch value := input.(type) {
case int:
return value
case uint:
return int(value)
case string:
intValue, err := strconv.ParseInt(value, 10, 64)
if err == nil {
return int(intValue)
}

result, err := ConvertSizeUnitToByte(value)
if err != nil {
panic(err)
}

return result
}

return 0
}

func ConvertSizeUnitToByte(str string) (int, error) {
if len(str) < 2 {
return 0, fmt.Errorf("invalid input: %s", str)
}

// Extract the numeric part of the input
sizeStr := str[:len(str)-2]
sizeStr = strings.TrimSpace(sizeStr)
sizeStr = strings.ReplaceAll(sizeStr, ",", ".")

size, err := strconv.ParseFloat(sizeStr, 64)
if err != nil {
return 0, fmt.Errorf("cannot extract numeric part for the input %s, err = %w", str, err)
}

// Determine the unit (B, KB, MB, GB)
unit := str[len(str)-2:]
switch strings.ToUpper(unit) {
case "B":
return int(size), nil
case "KB":
return int(size * 1024), nil
case "MB":
return int(size * 1024 * 1024), nil
case "GB":
return int(size * 1024 * 1024 * 1024), nil
default:
return 0, fmt.Errorf("unsupported unit: %s, you can specify one of B, KB, MB and GB", unit)
}
}
Loading