Skip to content

Commit

Permalink
feat: multi goroutine (#99)
Browse files Browse the repository at this point in the history
  • Loading branch information
erayarslan authored Oct 23, 2024
1 parent ee939ac commit 987b584
Show file tree
Hide file tree
Showing 17 changed files with 326 additions and 383 deletions.
2 changes: 1 addition & 1 deletion .github/workflows/dcp.yml
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@ jobs:
runs-on: ubuntu-latest
strategy:
matrix:
version: [ "5.0.1", "5.1.3", "5.5.6", "6.0.5", "6.5.2", "6.6.6", "7.0.5", "7.1.6", "7.2.5", "7.6.1" ]
version: [ "5.0.1", "5.1.3", "5.5.6", "6.0.5", "6.5.2", "6.6.6", "7.0.5", "7.1.6", "7.2.5", "7.6.2" ]
steps:
- name: Checkout
uses: actions/checkout@v4
Expand Down
2 changes: 1 addition & 1 deletion .github/workflows/integration.yml
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@ jobs:
runs-on: ubuntu-latest
strategy:
matrix:
version: [ "7.1.6", "7.2.5", "7.6.1" ]
version: [ "7.1.6", "7.2.5", "7.6.2" ]
steps:
- name: Checkout
uses: actions/checkout@v4
Expand Down
2 changes: 1 addition & 1 deletion Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@ test:
go test ./... .

race:
CB_VERSION=7.6.1 go test ./... -race .
CB_VERSION=7.6.2 go test ./... -race .

tidy:
go mod tidy
Expand Down
3 changes: 1 addition & 2 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -90,7 +90,6 @@ $ go get github.com/Trendyol/go-dcp
| `dcp.connectionBufferSize` | uint, string | no | 20mb | [gocbcore](github.com/couchbase/gocbcore) library buffer size. `20mb` is default. Check this if you get OOM Killed. |
| `dcp.connectionTimeout` | time.Duration | no | 5s | DCP connection timeout. |
| `dcp.maxQueueSize` | int | no | 2048 | The maximum number of requests that can be queued waiting to be sent to a node. `2048` is default. Check this if you get queue overflowed or queue full. |
| `dcp.listener.bufferSize` | uint | no | 1000 | Go DCP listener buffered channel size. |
| `dcp.listener.skipUntil` | time.Time | no | | Set this if you want to skip events until certain time. |
| `dcp.group.membership.type` | string | no | | DCP membership types. `couchbase`, `kubernetesHa`, `kubernetesStatefulSet`, `static` or `dynamic`. Check examples for details. |
| `dcp.group.membership.memberNumber` | int | no | 1 | Set this if membership is `static`. Other methods will ignore this field. |
Expand All @@ -114,7 +113,7 @@ $ go get github.com/Trendyol/go-dcp
| `rollbackMitigation.configWatchInterval` | time.Duration | no | 2s | Cluster config changes listener interval. |
| `metadata.type` | string | no | couchbase | Metadata storing types. `file` or `couchbase`. |
| `metadata.readOnly` | bool | no | false | Set this for debugging state purposes. |
| `metadata.config` | map[string]string | no | *not set | Set key-values of config. `bucket`,`scope`,`collection`,`connectionBufferSize`,`connectionTimeout` for `couchbase` type |
| `metadata.config` | map[string]string | no | *not set | Set key-values of config. `bucket`,`scope`,`collection`,`maxQueueSize`,`connectionBufferSize`,`connectionTimeout` for `couchbase` type |
| `api.disabled` | bool | no | false | Disable metric endpoints |
| `api.port` | int | no | 8080 | Set API port |
| `metric.path` | string | no | /metrics | Set metric endpoint path. |
Expand Down
14 changes: 8 additions & 6 deletions config/dcp.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ const (
CouchbaseMetadataBucketConfig = "bucket"
CouchbaseMetadataScopeConfig = "scope"
CouchbaseMetadataCollectionConfig = "collection"
CouchbaseMetadataMaxQueueSizeConfig = "maxQueueSize"
CouchbaseMetadataConnectionBufferSizeConfig = "connectionBufferSize"
CouchbaseMetadataConnectionTimeoutConfig = "connectionTimeout"
CheckpointTypeAuto = "auto"
Expand Down Expand Up @@ -50,8 +51,7 @@ type DCPGroup struct {
}

type DCPListener struct {
SkipUntil *time.Time `yaml:"skipUntil"`
BufferSize uint `yaml:"bufferSize"`
SkipUntil *time.Time `yaml:"skipUntil"`
}

type ExternalDcpConfig struct {
Expand Down Expand Up @@ -307,6 +307,7 @@ type CouchbaseMetadata struct {
Bucket string `yaml:"bucket"`
Scope string `yaml:"scope"`
Collection string `yaml:"collection"`
MaxQueueSize int `yaml:"maxQueueSize"`
ConnectionBufferSize uint `yaml:"connectionBufferSize"`
ConnectionTimeout time.Duration `yaml:"connectionTimeout"`
}
Expand All @@ -316,6 +317,7 @@ func (c *Dcp) GetCouchbaseMetadata() *CouchbaseMetadata {
Bucket: c.BucketName,
Scope: DefaultScopeName,
Collection: DefaultCollectionName,
MaxQueueSize: 2048,
ConnectionBufferSize: 5242880, // 5 MB
ConnectionTimeout: 5 * time.Second,
}
Expand All @@ -332,6 +334,10 @@ func (c *Dcp) GetCouchbaseMetadata() *CouchbaseMetadata {
couchbaseMetadata.Collection = collection
}

if maxQueueSize, ok := c.Metadata.Config[CouchbaseMetadataMaxQueueSizeConfig]; ok {
couchbaseMetadata.MaxQueueSize = helpers.ResolveUnionIntOrStringValue(maxQueueSize)
}

if connectionBufferSize, ok := c.Metadata.Config[CouchbaseMetadataConnectionBufferSizeConfig]; ok {
couchbaseMetadata.ConnectionBufferSize = uint(helpers.ResolveUnionIntOrStringValue(connectionBufferSize))
}
Expand Down Expand Up @@ -511,10 +517,6 @@ func (c *Dcp) applyDefaultDcp() {
if c.Dcp.MaxQueueSize == 0 {
c.Dcp.MaxQueueSize = 2048
}

if c.Dcp.Listener.BufferSize == 0 {
c.Dcp.Listener.BufferSize = 1000
}
}

func (c *Dcp) applyDefaultMetadata() {
Expand Down
8 changes: 0 additions & 8 deletions config/dcp_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,10 +27,6 @@ func TestDefaultConfig(t *testing.T) {
t.Errorf("Checkpoint.Type is not set to auto")
}

if config.Dcp.Listener.BufferSize != 1000 {
t.Errorf("Dcp.Listener.BufferSize is not set to 1000")
}

if config.Dcp.Group.Membership.Type != MembershipTypeCouchbase {
t.Errorf("Dcp.Group.Membership.Type is not set to couchbase")
}
Expand Down Expand Up @@ -310,10 +306,6 @@ func TestDcpApplyDefaultDcp(t *testing.T) {
if c.Dcp.ConnectionBufferSize.(int) != 20971520 {
t.Errorf("Dcp.ConnectionBufferSize is not set to expected value")
}

if c.Dcp.Listener.BufferSize != 1000 {
t.Errorf("Dcp.Listener.BufferSize is not set to expected value")
}
}

func TestApplyDefaultMetadata(t *testing.T) {
Expand Down
46 changes: 27 additions & 19 deletions couchbase/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,7 @@ type Client interface {
DcpClose()
GetVBucketSeqNos(awareCollection bool) (*wrapper.ConcurrentSwissMap[uint16, uint64], error)
GetNumVBuckets() int
GetFailoverLogs(vbID uint16) ([]gocbcore.FailoverEntry, error)
GetFailOverLogs(vbID uint16) ([]gocbcore.FailoverEntry, error)
OpenStream(vbID uint16, collectionIDs map[uint32]string, offset *models.Offset, observer Observer) error
CloseStream(vbID uint16) error
GetCollectionIDs(scopeName string, collectionNames []string) map[uint32]string
Expand Down Expand Up @@ -164,7 +164,7 @@ func CreateSecurityConfig(username string, password string, secureConnection boo

func CreateAgent(httpAddresses []string, bucketName string,
username string, password string, secureConnection bool, rootCAPath string,
maxQueueSize int, connectionBufferSize uint, connectionTimeout time.Duration,
maxQueueSize int, poolSize int, connectionBufferSize uint, connectionTimeout time.Duration,
) (*gocbcore.Agent, error) {
agent, err := gocbcore.CreateAgent(
&gocbcore.AgentConfig{
Expand All @@ -180,6 +180,7 @@ func CreateAgent(httpAddresses []string, bucketName string,
UseCollections: true,
},
KVConfig: gocbcore.KVConfig{
PoolSize: poolSize,
ConnectionBufferSize: connectionBufferSize,
MaxQueueSize: maxQueueSize,
},
Expand Down Expand Up @@ -211,8 +212,14 @@ func CreateAgent(httpAddresses []string, bucketName string,
return agent, nil
}

func (s *client) connect(bucketName string, maxQueueSize int, connectionBufferSize uint, connectionTimeout time.Duration) (*gocbcore.Agent, error) { //nolint:lll,unused
return CreateAgent(s.config.Hosts, bucketName, s.config.Username, s.config.Password, s.config.SecureConnection, s.config.RootCAPath, maxQueueSize, connectionBufferSize, connectionTimeout) //nolint:lll
func (s *client) connect(bucketName string,
maxQueueSize int, poolSize int, connectionBufferSize uint, connectionTimeout time.Duration,
) (*gocbcore.Agent, error) {
return CreateAgent(
s.config.Hosts, bucketName, s.config.Username, s.config.Password,
s.config.SecureConnection, s.config.RootCAPath,
maxQueueSize, poolSize, connectionBufferSize, connectionTimeout,
)
}

func resolveHostsAsHTTP(hosts []string) []string {
Expand Down Expand Up @@ -257,7 +264,7 @@ func (s *client) Connect() error {
}
}

agent, err := s.connect(s.config.BucketName, s.config.MaxQueueSize, connectionBufferSize, connectionTimeout)
agent, err := s.connect(s.config.BucketName, s.config.MaxQueueSize, 0, connectionBufferSize, connectionTimeout)
if err != nil {
logger.Log.Error("error while connect to source bucket, err: %v", err)
return err
Expand All @@ -272,7 +279,8 @@ func (s *client) Connect() error {
} else {
metaAgent, err := s.connect(
couchbaseMetadataConfig.Bucket,
0, // gocb will use default value (2048)
couchbaseMetadataConfig.MaxQueueSize,
0,
couchbaseMetadataConfig.ConnectionBufferSize,
couchbaseMetadataConfig.ConnectionTimeout,
)
Expand Down Expand Up @@ -516,20 +524,20 @@ func (s *client) GetDcpAgentConfigSnapshot() (*gocbcore.ConfigSnapshot, error) {
return s.dcpAgent.ConfigSnapshot()
}

func (s *client) GetFailoverLogs(vbID uint16) ([]gocbcore.FailoverEntry, error) {
func (s *client) GetFailOverLogs(vbID uint16) ([]gocbcore.FailoverEntry, error) {
ctx, cancel := context.WithTimeout(context.Background(), time.Second*60)
defer cancel()

opm := NewAsyncOp(ctx)

ch := make(chan error, 1)

var failoverLogs []gocbcore.FailoverEntry
var failOverLogs []gocbcore.FailoverEntry

op, err := s.dcpAgent.GetFailoverLog(
vbID,
func(entries []gocbcore.FailoverEntry, err error) {
failoverLogs = entries
failOverLogs = entries

opm.Resolve()

Expand All @@ -541,7 +549,7 @@ func (s *client) GetFailoverLogs(vbID uint16) ([]gocbcore.FailoverEntry, error)
return nil, err
}

return failoverLogs, <-ch
return failOverLogs, <-ch
}

func (s *client) openStreamWithRollback(vbID uint16,
Expand All @@ -555,16 +563,16 @@ func (s *client) openStreamWithRollback(vbID uint16,
vbID, failedSeqNo, rollbackSeqNo,
)

failoverLogs, err := s.GetFailoverLogs(vbID)
failOverLogs, err := s.GetFailOverLogs(vbID)
if err != nil {
logger.Log.Error("error while get failover logs when rollback, err: %v", err)
logger.Log.Error("error while get failOver logs when rollback, err: %v", err)
return err
}

var targetUUID gocbcore.VbUUID = 0

for i := len(failoverLogs) - 1; i >= 0; i-- {
log := failoverLogs[i]
for i := len(failOverLogs) - 1; i >= 0; i-- {
log := failOverLogs[i]
if rollbackSeqNo >= log.SeqNo {
targetUUID = log.VbUUID
}
Expand All @@ -587,10 +595,10 @@ func (s *client) openStreamWithRollback(vbID uint16,
rollbackSeqNo,
observer,
openStreamOptions,
func(failoverLogs []gocbcore.FailoverEntry, err error) {
func(failOverLogs []gocbcore.FailoverEntry, err error) {
if err == nil {
observer.SetVbUUID(vbID, failoverLogs[0].VbUUID)
observer.AddCatchup(vbID, failedSeqNo)
observer.SetVbUUID(failOverLogs[0].VbUUID)
observer.SetCatchup(failedSeqNo)
}

opm.Resolve()
Expand Down Expand Up @@ -648,9 +656,9 @@ func (s *client) OpenStream(
gocbcore.SeqNo(offset.EndSeqNo),
observer,
openStreamOptions,
func(failoverLogs []gocbcore.FailoverEntry, err error) {
func(failOverLogs []gocbcore.FailoverEntry, err error) {
if err == nil {
observer.SetVbUUID(vbID, failoverLogs[0].VbUUID)
observer.SetVbUUID(failOverLogs[0].VbUUID)
}

opm.Resolve()
Expand Down
Loading

0 comments on commit 987b584

Please sign in to comment.