Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Cluster lock + tests fix #40

Open
wants to merge 15 commits into
base: master
Choose a base branch
from
6 changes: 2 additions & 4 deletions .travis.yml
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
dist: bionic
sudo: required

services:
Expand All @@ -6,13 +7,10 @@ services:
language: go

go:
- 1.8.3
- 1.14.x

sudo: false

install:
- make deps

script:
- make test-no-race
- make compatibility
Expand Down
5 changes: 0 additions & 5 deletions Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -4,11 +4,6 @@ build:
install:
@go install .

deps:
@go get -u -v github.com/kardianos/govendor
@govendor sync
@cd compatibility; govendor sync

test: build
@go fmt . ./check
@go vet . ./check
Expand Down
6 changes: 4 additions & 2 deletions check/broker_health_check.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,14 +16,16 @@ func (check *HealthCheck) checkBrokerHealth(metadata *proto.MetadataResp) Broker
message := &proto.Message{Value: []byte(payload)}

if _, err := check.producer.Produce(check.config.topicName, check.partitionID, message); err != nil {
log.Println("producer failure - broker unhealthy:", err)
log.Warnf("producer failure - broker unhealthy: %s", err)
} else {
status = check.waitForMessage(message)
}

brokerStatus := BrokerStatus{ID: int32(check.config.brokerID), Status: status}
if status == healthy {
check.producer.Produce(check.config.replicationTopicName, check.replicationPartitionID, message)
if _, err := check.producer.Produce(check.config.replicationTopicName, check.replicationPartitionID, message); err != nil {
log.Warnf("producer failure - replication topic: %s", err)
}
check.brokerInSync(&brokerStatus, metadata)
check.brokerReplicates(&brokerStatus, metadata)
}
Expand Down
6 changes: 0 additions & 6 deletions check/cluster_health_check.go
Original file line number Diff line number Diff line change
Expand Up @@ -34,8 +34,6 @@ func (check *HealthCheck) checkBrokerMetadata(metadata *proto.MetadataResp, zkBr
cluster.Status = red
}
}

return
}

func (check *HealthCheck) checkTopics(metadata *proto.MetadataResp, zkTopics []ZkTopic, cluster *ClusterStatus) {
Expand Down Expand Up @@ -76,15 +74,11 @@ func (check *HealthCheck) checkTopics(metadata *proto.MetadataResp, zkTopics []Z
cluster.Status = worstStatus(cluster.Status, status.Status)
}
}

return
}

func checkPartition(partition *proto.MetadataRespPartition, zkTopic *ZkTopic, topicStatus *TopicStatus) {
status := PartitionStatus{Status: green}

replicas := partition.Replicas

replicas, ok := zkTopic.Partitions[partition.ID]
if !ok {
status.Status = red
Expand Down
29 changes: 29 additions & 0 deletions check/connectors.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
package check

import (
"fmt"
"time"

"github.com/optiopay/kafka"
Expand Down Expand Up @@ -60,11 +61,14 @@ type ZkConnection interface {
Create(path string, data []byte, flags int32, acl []zk.ACL) (string, error)
Children(path string) ([]string, *zk.Stat, error)
Get(path string) ([]byte, *zk.Stat, error)
Lock(path string) error
Unlock(path string) error
}

// Actual implementation based on samuel/go-zookeeper/zk
type zkConnection struct {
connection *zk.Conn
locks map[string]*zk.Lock
}

type zkNullLogger struct {
Expand All @@ -78,11 +82,13 @@ func (zkConn *zkConnection) Connect(servers []string, sessionTimeout time.Durati
}
connection, events, err := zk.Connect(servers, sessionTimeout, loggerOption)
zkConn.connection = connection
zkConn.locks = map[string]*zk.Lock{}
return events, err
}

func (zkConn *zkConnection) Close() {
zkConn.connection.Close()
zkConn.locks = nil
}

func (zkConn *zkConnection) Exists(path string) (bool, *zk.Stat, error) {
Expand All @@ -104,3 +110,26 @@ func (zkConn *zkConnection) Children(path string) ([]string, *zk.Stat, error) {
func (zkConn *zkConnection) Get(path string) ([]byte, *zk.Stat, error) {
return zkConn.connection.Get(path)
}

func (zkConn *zkConnection) Lock(path string) error {
if zkConn.locks == nil {
return fmt.Errorf("connection not initialized")
}
if _, ok := zkConn.locks[path]; !ok {
zkConn.locks[path] = zk.NewLock(zkConn.connection, path, zk.WorldACL(zk.PermAll))
}
lock := zkConn.locks[path]

return lock.Lock()
}

func (zkConn *zkConnection) Unlock(path string) error {
if zkConn.locks == nil {
return fmt.Errorf("connection not initialized")
}
if lock, ok := zkConn.locks[path]; !ok {
return fmt.Errorf("not locked")
} else {
return lock.Unlock()
}
}
Loading