Skip to content

Commit

Permalink
feat: race cases fixed and bucket info print added
Browse files Browse the repository at this point in the history
  • Loading branch information
erayarslan committed Dec 14, 2023
1 parent e892dde commit ccfe694
Show file tree
Hide file tree
Showing 12 changed files with 210 additions and 107 deletions.
2 changes: 1 addition & 1 deletion .github/workflows/build.yml
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,7 @@ jobs:
run: go build -v ./...

- name: Test
run: go test -v ./...
run: go test -race -v ./...

security-gates:
uses: Trendyol/security-actions/.github/workflows/security-gates.yml@main
Expand Down
2 changes: 1 addition & 1 deletion .github/workflows/dcp.yml
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,6 @@ jobs:
run: go get .

- name: Test DCP
run: go test -run "^\QTestDcp\E$"
run: go test -race -run "^\QTestDcp\E$"
env:
CB_VERSION: ${{ matrix.version }}
2 changes: 1 addition & 1 deletion .github/workflows/integration.yml
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@ jobs:
run: make compose

- name: Integration Test
run: go test -v test/integration/integration_test.go
run: go test -race -v test/integration/integration_test.go
env:
INPUT_PUBLISH: false
GITHUB_TOKEN: ${{ secrets.GITHUB_TOKEN }}
2 changes: 1 addition & 1 deletion config/dcp.go
Original file line number Diff line number Diff line change
Expand Up @@ -110,7 +110,7 @@ type RollbackMitigation struct {
type Metadata struct {
Config map[string]string `yaml:"config"`
Type string `yaml:"type"`
ReadOnly bool `json:"readOnly"`
ReadOnly bool `yaml:"readOnly"`
}

type Logging struct {
Expand Down
75 changes: 13 additions & 62 deletions couchbase/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,15 +3,11 @@ package couchbase
import (
"context"
"crypto/x509"
"encoding/base64"
"errors"
"fmt"
"os"
"time"

jsoniter "github.com/json-iterator/go"
"github.com/valyala/fasthttp"

"github.com/Trendyol/go-dcp/helpers"

"github.com/couchbase/gocbcore/v10/connstr"
Expand All @@ -28,7 +24,6 @@ import (
)

type Client interface {
GetVersion() (string, error)
Ping() (*models.PingResult, error)
GetAgent() *gocbcore.Agent
GetMetaAgent() *gocbcore.Agent
Expand All @@ -52,91 +47,47 @@ type client struct {
config *config.Dcp
}

func (s *client) GetVersion() (string, error) {
pingResult, err := s.Ping()
if err != nil {
return "", err
}

freq := fasthttp.AcquireRequest()
defer fasthttp.ReleaseRequest(freq)

fres := fasthttp.AcquireResponse()
defer fasthttp.ReleaseResponse(fres)

uri := fmt.Sprintf("%v/pools", pingResult.MgmtEndpoint)
freq.SetRequestURI(uri)
freq.Header.SetMethod("GET")
freq.Header.Set(
"Authorization",
"Basic "+base64.StdEncoding.EncodeToString([]byte(s.config.Username+":"+s.config.Password)),
)

client := &fasthttp.Client{}
err = client.Do(freq, fres)
if err != nil {
return "", err
}

var result models.PoolsResult
err = jsoniter.Unmarshal(fres.Body(), &result)
if err != nil {
return "", err
func getServiceEndpoint(result *gocbcore.PingResult, serviceType gocbcore.ServiceType) string {
if serviceResults, ok := result.Services[serviceType]; ok {
for _, serviceResult := range serviceResults {
if serviceResult.Error == nil &&
serviceResult.State == gocbcore.PingStateOK {
return serviceResult.Endpoint
}
}
}

return result.ImplementationVersion, nil
return ""
}

func (s *client) Ping() (*models.PingResult, error) {
ctx, cancel := context.WithTimeout(context.Background(), s.config.HealthCheck.Timeout)
defer cancel()

opm := NewAsyncOp(ctx)

errorCh := make(chan error)

var pingResult models.PingResult

op, err := s.agent.Ping(gocbcore.PingOptions{
ServiceTypes: []gocbcore.ServiceType{gocbcore.MemdService, gocbcore.MgmtService},
}, func(result *gocbcore.PingResult, err error) {
memdSuccess := false
mgmtSuccess := false

if err == nil {
if memdServiceResults, ok := result.Services[gocbcore.MemdService]; ok {
for _, memdServiceResult := range memdServiceResults {
if memdServiceResult.Error == nil && memdServiceResult.State == gocbcore.PingStateOK {
memdSuccess = true
pingResult.MemdEndpoint = memdServiceResult.Endpoint
break
}
}
}

if mgmtServiceResults, ok := result.Services[gocbcore.MgmtService]; ok {
for _, mgmtServiceResult := range mgmtServiceResults {
if mgmtServiceResult.Error == nil && mgmtServiceResult.State == gocbcore.PingStateOK {
mgmtSuccess = true
pingResult.MgmtEndpoint = mgmtServiceResult.Endpoint
break
}
}
}
pingResult.MemdEndpoint = getServiceEndpoint(result, gocbcore.MemdService)
pingResult.MgmtEndpoint = getServiceEndpoint(result, gocbcore.MgmtService)
}

if !memdSuccess || !mgmtSuccess {
if pingResult.MemdEndpoint == "" || pingResult.MgmtEndpoint == "" {
if err == nil {
err = errors.New("some services are not healthy")
}
}

opm.Resolve()

errorCh <- err
})

err = opm.Wait(op, err)

if err != nil {
return nil, err
}
Expand Down
106 changes: 106 additions & 0 deletions couchbase/http_client.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,106 @@
package couchbase

import (
"encoding/base64"
"fmt"

"github.com/Trendyol/go-dcp/config"

jsoniter "github.com/json-iterator/go"
"github.com/valyala/fasthttp"
)

type PoolsResult struct {
ImplementationVersion string `json:"implementationVersion"`
}

type BucketResult struct {
BucketType string `json:"bucketType"`
StorageBackend string `json:"storageBackend"`
}

type HTTPClient interface {
Connect() error
GetVersion() (string, error)
GetBucketInformation() (*BucketResult, error)
}

type httpClient struct {
config *config.Dcp
httpClient *fasthttp.Client
client Client
baseURL string
}

func (h *httpClient) Connect() error {
pingResult, err := h.client.Ping()
if err != nil {
return err
}

h.baseURL = pingResult.MgmtEndpoint

return nil
}

func (h *httpClient) doRequest(req *fasthttp.Request, v interface{}) error {
req.Header.Set(
"Authorization",
"Basic "+base64.StdEncoding.EncodeToString([]byte(h.config.Username+":"+h.config.Password)),
)

res := fasthttp.AcquireResponse()
defer fasthttp.ReleaseResponse(res)

err := h.httpClient.Do(req, res)
if err != nil {
return err
}

err = jsoniter.Unmarshal(res.Body(), v)
if err != nil {
return err
}

return nil
}

func (h *httpClient) GetVersion() (string, error) {
req := fasthttp.AcquireRequest()
defer fasthttp.ReleaseRequest(req)

req.SetRequestURI(fmt.Sprintf("%v/pools", h.baseURL))
req.Header.SetMethod("GET")

var result PoolsResult
err := h.doRequest(req, &result)
if err != nil {
return "", err
}

return result.ImplementationVersion, nil
}

func (h *httpClient) GetBucketInformation() (*BucketResult, error) {
req := fasthttp.AcquireRequest()
defer fasthttp.ReleaseRequest(req)

req.SetRequestURI(fmt.Sprintf("%v/pools/default/buckets/%v", h.baseURL, h.config.BucketName))
req.Header.SetMethod("GET")

var result BucketResult
err := h.doRequest(req, &result)
if err != nil {
return nil, err
}

return &result, nil
}

func NewHTTPClient(config *config.Dcp, client Client) HTTPClient {
return &httpClient{
config: config,
httpClient: &fasthttp.Client{},
client: client,
}
}
10 changes: 10 additions & 0 deletions couchbase/membership.go
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@ type cbMembership struct {
config *config.Dcp
membershipConfig *config.CouchbaseMembership
monitorTicker *time.Ticker
lock *sync.Mutex
scopeName string
collectionName string
lastActiveInstances []Instance
Expand All @@ -52,10 +53,15 @@ const (
)

func (h *cbMembership) GetInfo() *membership.Model {
h.lock.Lock()

if h.info != nil {
defer h.lock.Unlock()
return h.info
}

h.lock.Unlock()

return <-h.infoChan
}

Expand Down Expand Up @@ -301,7 +307,10 @@ func (h *cbMembership) Close() {
}

func (h *cbMembership) membershipChangedListener(model *membership.Model) {
h.lock.Lock()
h.info = model
h.lock.Unlock()

go func() {
h.infoChan <- model
}()
Expand All @@ -319,6 +328,7 @@ func NewCBMembership(config *config.Dcp, client Client, bus EventBus.Bus) member
cbm := &cbMembership{
infoChan: make(chan *membership.Model),
client: client,
lock: &sync.Mutex{},
id: []byte(helpers.Prefix + config.Dcp.Group.Name + ":" + _type + ":" + uuid.New().String()),
instanceAll: []byte(helpers.Prefix + config.Dcp.Group.Name + ":" + _type + ":all"),
bus: bus,
Expand Down
Loading

0 comments on commit ccfe694

Please sign in to comment.