From 795650665825c6005b1762013f5f701020d2f6c5 Mon Sep 17 00:00:00 2001 From: Aman Garg Date: Sun, 15 Aug 2021 16:07:46 +0530 Subject: [PATCH 1/3] Fix goleak in #monitor when channel close isn't communicated --- hystrix/circuit.go | 14 +++++++++----- hystrix/metrics.go | 40 ++++++++++++++++++++++++---------------- hystrix/metrics_test.go | 9 ++++++++- hystrix/pool.go | 6 ++++-- hystrix/pool_metrics.go | 24 ++++++++++++++++-------- hystrix/pool_test.go | 5 +++-- 6 files changed, 64 insertions(+), 34 deletions(-) diff --git a/hystrix/circuit.go b/hystrix/circuit.go index 87d88b9..83f1417 100644 --- a/hystrix/circuit.go +++ b/hystrix/circuit.go @@ -1,6 +1,7 @@ package hystrix import ( + "context" "fmt" "sync" "sync/atomic" @@ -15,9 +16,10 @@ type CircuitBreaker struct { forceOpen bool mutex *sync.RWMutex openedOrLastTestedTime int64 - - executorPool *executorPool - metrics *metricExchange + ctx context.Context + ctxCancelFunc context.CancelFunc + executorPool *executorPool + metrics *metricExchange } var ( @@ -60,6 +62,7 @@ func Flush() { for name, cb := range circuitBreakers { cb.metrics.Reset() cb.executorPool.Metrics.Reset() + cb.ctxCancelFunc() delete(circuitBreakers, name) } } @@ -67,9 +70,10 @@ func Flush() { // newCircuitBreaker creates a CircuitBreaker with associated Health func newCircuitBreaker(name string) *CircuitBreaker { c := &CircuitBreaker{} + c.ctx, c.ctxCancelFunc = context.WithCancel(context.TODO()) c.Name = name - c.metrics = newMetricExchange(name) - c.executorPool = newExecutorPool(name) + c.metrics = newMetricExchange(c.ctx, name) + c.executorPool = newExecutorPool(c.ctx, name) c.mutex = &sync.RWMutex{} return c diff --git a/hystrix/metrics.go b/hystrix/metrics.go index d289fe6..3faa512 100644 --- a/hystrix/metrics.go +++ b/hystrix/metrics.go @@ -1,10 +1,12 @@ package hystrix import ( + "context" + "fmt" "sync" "time" - "github.com/afex/hystrix-go/hystrix/metric_collector" + metricCollector "github.com/afex/hystrix-go/hystrix/metric_collector" "github.com/afex/hystrix-go/hystrix/rolling" ) @@ -23,7 +25,7 @@ type metricExchange struct { metricCollectors []metricCollector.MetricCollector } -func newMetricExchange(name string) *metricExchange { +func newMetricExchange(ctx context.Context, name string) *metricExchange { m := &metricExchange{} m.Name = name @@ -32,7 +34,7 @@ func newMetricExchange(name string) *metricExchange { m.metricCollectors = metricCollector.Registry.InitializeMetricCollectors(name) m.Reset() - go m.Monitor() + go m.Monitor(ctx) return m } @@ -49,20 +51,26 @@ func (m *metricExchange) DefaultCollector() *metricCollector.DefaultMetricCollec return collection } -func (m *metricExchange) Monitor() { - for update := range m.Updates { - // we only grab a read lock to make sure Reset() isn't changing the numbers. - m.Mutex.RLock() - - totalDuration := time.Since(update.Start) - wg := &sync.WaitGroup{} - for _, collector := range m.metricCollectors { - wg.Add(1) - go m.IncrementMetrics(wg, collector, update, totalDuration) +func (m *metricExchange) Monitor(ctx context.Context) { + for { + select { + case <-ctx.Done(): + fmt.Println("No longer waiting in func (m *metricExchange) Monitor(ctx context.Context) {") + return + case u := <-m.Updates: + // we only grab a read lock to make sure Reset() isn't changing the numbers. + m.Mutex.RLock() + + totalDuration := time.Since(u.Start) + wg := &sync.WaitGroup{} + for _, collector := range m.metricCollectors { + wg.Add(1) + go m.IncrementMetrics(wg, collector, u, totalDuration) + } + wg.Wait() + + m.Mutex.RUnlock() } - wg.Wait() - - m.Mutex.RUnlock() } } diff --git a/hystrix/metrics_test.go b/hystrix/metrics_test.go index 188d5ba..a1ab364 100644 --- a/hystrix/metrics_test.go +++ b/hystrix/metrics_test.go @@ -1,14 +1,17 @@ package hystrix import ( + "context" "testing" "time" + "go.uber.org/goleak" + . "github.com/smartystreets/goconvey/convey" ) func metricFailingPercent(p int) *metricExchange { - m := newMetricExchange("") + m := newMetricExchange(context.Background(), "") for i := 0; i < 100; i++ { t := "success" if i < p { @@ -43,3 +46,7 @@ func TestErrorPercent(t *testing.T) { }) }) } + +func TestMain(m *testing.M) { + goleak.VerifyTestMain(m) +} diff --git a/hystrix/pool.go b/hystrix/pool.go index d2c58c8..0773240 100644 --- a/hystrix/pool.go +++ b/hystrix/pool.go @@ -1,5 +1,7 @@ package hystrix +import "context" + type executorPool struct { Name string Metrics *poolMetrics @@ -7,10 +9,10 @@ type executorPool struct { Tickets chan *struct{} } -func newExecutorPool(name string) *executorPool { +func newExecutorPool(ctx context.Context, name string) *executorPool { p := &executorPool{} p.Name = name - p.Metrics = newPoolMetrics(name) + p.Metrics = newPoolMetrics(ctx, name) p.Max = getSettings(name).MaxConcurrentRequests p.Tickets = make(chan *struct{}, p.Max) diff --git a/hystrix/pool_metrics.go b/hystrix/pool_metrics.go index 93e97d9..187f4a9 100644 --- a/hystrix/pool_metrics.go +++ b/hystrix/pool_metrics.go @@ -1,6 +1,8 @@ package hystrix import ( + "context" + "fmt" "sync" "github.com/afex/hystrix-go/hystrix/rolling" @@ -19,7 +21,7 @@ type poolMetricsUpdate struct { activeCount int } -func newPoolMetrics(name string) *poolMetrics { +func newPoolMetrics(ctx context.Context, name string) *poolMetrics { m := &poolMetrics{} m.Name = name m.Updates = make(chan poolMetricsUpdate) @@ -27,7 +29,7 @@ func newPoolMetrics(name string) *poolMetrics { m.Reset() - go m.Monitor() + go m.Monitor(ctx) return m } @@ -40,13 +42,19 @@ func (m *poolMetrics) Reset() { m.Executed = rolling.NewNumber() } -func (m *poolMetrics) Monitor() { - for u := range m.Updates { - m.Mutex.RLock() +func (m *poolMetrics) Monitor(ctx context.Context) { + for { + select { + case <-ctx.Done(): + fmt.Println("No longer waiting in (m *poolMetrics) Monitor(ctx context.Context)") + return + case u := <-m.Updates: + m.Mutex.RLock() - m.Executed.Increment(1) - m.MaxActiveRequests.UpdateMax(float64(u.activeCount)) + m.Executed.Increment(1) + m.MaxActiveRequests.UpdateMax(float64(u.activeCount)) - m.Mutex.RUnlock() + m.Mutex.RUnlock() + } } } diff --git a/hystrix/pool_test.go b/hystrix/pool_test.go index 6aff5da..8c42d67 100644 --- a/hystrix/pool_test.go +++ b/hystrix/pool_test.go @@ -1,6 +1,7 @@ package hystrix import ( + "context" "testing" "time" @@ -11,7 +12,7 @@ func TestReturn(t *testing.T) { defer Flush() Convey("when returning a ticket to the pool", t, func() { - pool := newExecutorPool("pool") + pool := newExecutorPool(context.Background(), "pool") ticket := <-pool.Tickets pool.Return(ticket) time.Sleep(1 * time.Millisecond) @@ -25,7 +26,7 @@ func TestActiveCount(t *testing.T) { defer Flush() Convey("when 3 tickets are pulled", t, func() { - pool := newExecutorPool("pool") + pool := newExecutorPool(context.Background(), "pool") <-pool.Tickets <-pool.Tickets ticket := <-pool.Tickets From b3f5486bf91be8fad176021eb5a1b5bcfeb1f701 Mon Sep 17 00:00:00 2001 From: Aman Garg Date: Sun, 15 Aug 2021 18:31:29 +0530 Subject: [PATCH 2/3] Fix leaky tests in pool.go --- hystrix/metrics.go | 2 -- hystrix/pool.go | 4 +++- hystrix/pool_metrics.go | 4 ---- hystrix/pool_test.go | 10 ++++++---- 4 files changed, 9 insertions(+), 11 deletions(-) diff --git a/hystrix/metrics.go b/hystrix/metrics.go index 3faa512..70db83b 100644 --- a/hystrix/metrics.go +++ b/hystrix/metrics.go @@ -2,7 +2,6 @@ package hystrix import ( "context" - "fmt" "sync" "time" @@ -55,7 +54,6 @@ func (m *metricExchange) Monitor(ctx context.Context) { for { select { case <-ctx.Done(): - fmt.Println("No longer waiting in func (m *metricExchange) Monitor(ctx context.Context) {") return case u := <-m.Updates: // we only grab a read lock to make sure Reset() isn't changing the numbers. diff --git a/hystrix/pool.go b/hystrix/pool.go index 0773240..9a9672f 100644 --- a/hystrix/pool.go +++ b/hystrix/pool.go @@ -1,6 +1,8 @@ package hystrix -import "context" +import ( + "context" +) type executorPool struct { Name string diff --git a/hystrix/pool_metrics.go b/hystrix/pool_metrics.go index 187f4a9..dc4010a 100644 --- a/hystrix/pool_metrics.go +++ b/hystrix/pool_metrics.go @@ -2,7 +2,6 @@ package hystrix import ( "context" - "fmt" "sync" "github.com/afex/hystrix-go/hystrix/rolling" @@ -46,14 +45,11 @@ func (m *poolMetrics) Monitor(ctx context.Context) { for { select { case <-ctx.Done(): - fmt.Println("No longer waiting in (m *poolMetrics) Monitor(ctx context.Context)") return case u := <-m.Updates: m.Mutex.RLock() - m.Executed.Increment(1) m.MaxActiveRequests.UpdateMax(float64(u.activeCount)) - m.Mutex.RUnlock() } } diff --git a/hystrix/pool_test.go b/hystrix/pool_test.go index 8c42d67..773fed7 100644 --- a/hystrix/pool_test.go +++ b/hystrix/pool_test.go @@ -9,10 +9,11 @@ import ( ) func TestReturn(t *testing.T) { - defer Flush() + ctx, cancel := context.WithCancel(context.TODO()) + defer cancel() Convey("when returning a ticket to the pool", t, func() { - pool := newExecutorPool(context.Background(), "pool") + pool := newExecutorPool(ctx, "pool") ticket := <-pool.Tickets pool.Return(ticket) time.Sleep(1 * time.Millisecond) @@ -23,10 +24,11 @@ func TestReturn(t *testing.T) { } func TestActiveCount(t *testing.T) { - defer Flush() + ctx, cancel := context.WithCancel(context.TODO()) + defer cancel() Convey("when 3 tickets are pulled", t, func() { - pool := newExecutorPool(context.Background(), "pool") + pool := newExecutorPool(ctx, "pool") <-pool.Tickets <-pool.Tickets ticket := <-pool.Tickets From bf329b393f82e4ebf395b30e4a588a61a8cea670 Mon Sep 17 00:00:00 2001 From: Aman Garg Date: Sun, 15 Aug 2021 18:55:24 +0530 Subject: [PATCH 3/3] Fix leaky tests in hystrix.go/metrics.go --- hystrix/circuit_test.go | 3 +-- hystrix/hystrix.go | 2 +- hystrix/hystrix_test.go | 39 +++++++++++++++++++++++++++++++-------- hystrix/metrics.go | 8 +++++--- hystrix/metrics_test.go | 19 +++++++++++++++---- hystrix/pool.go | 14 ++++++++++---- hystrix/pool_metrics.go | 5 ++++- hystrix/pool_test.go | 4 ++-- 8 files changed, 69 insertions(+), 25 deletions(-) diff --git a/hystrix/circuit_test.go b/hystrix/circuit_test.go index 9f6ba54..fde2046 100644 --- a/hystrix/circuit_test.go +++ b/hystrix/circuit_test.go @@ -74,7 +74,6 @@ func TestMultithreadedGetCircuit(t *testing.T) { func TestReportEventOpenThenClose(t *testing.T) { Convey("when a circuit is closed", t, func() { defer Flush() - ConfigureCommand("", CommandConfig{ErrorPercentThreshold: 50}) cb, _, err := GetCircuit("") @@ -83,7 +82,7 @@ func TestReportEventOpenThenClose(t *testing.T) { openedTime := cb.openedOrLastTestedTime Convey("but the metrics are unhealthy", func() { - cb.metrics = metricFailingPercent(100) + cb.metrics = metricFailingPercentWithContext(cb.ctx, 100) So(cb.metrics.IsHealthy(time.Now()), ShouldBeFalse) Convey("and a success is reported", func() { diff --git a/hystrix/hystrix.go b/hystrix/hystrix.go index 8b4d68f..9acc808 100644 --- a/hystrix/hystrix.go +++ b/hystrix/hystrix.go @@ -100,7 +100,7 @@ func GoC(ctx context.Context, name string, run runFuncC, fallback fallbackFuncC) for !ticketChecked { ticketCond.Wait() } - cmd.circuit.executorPool.Return(cmd.ticket) + cmd.circuit.executorPool.Return(cmd.circuit.ctx, cmd.ticket) cmd.Unlock() } // Shared by the following two goroutines. It ensures only the faster diff --git a/hystrix/hystrix_test.go b/hystrix/hystrix_test.go index 44f03ea..dc4f0a6 100644 --- a/hystrix/hystrix_test.go +++ b/hystrix/hystrix_test.go @@ -73,7 +73,10 @@ func TestTimeout(t *testing.T) { ConfigureCommand("", CommandConfig{Timeout: 100}) resultChan := make(chan int) - errChan := GoC(context.Background(), "", func(ctx context.Context) error { + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() + + errChan := GoC(ctx, "", func(ctx context.Context) error { time.Sleep(1 * time.Second) resultChan <- 1 return nil @@ -99,7 +102,10 @@ func TestTimeoutEmptyFallback(t *testing.T) { ConfigureCommand("", CommandConfig{Timeout: 100}) resultChan := make(chan int) - errChan := GoC(context.Background(), "", func(ctx context.Context) error { + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() + + errChan := GoC(ctx, "", func(ctx context.Context) error { time.Sleep(1 * time.Second) resultChan <- 1 return nil @@ -131,12 +137,14 @@ func TestMaxConcurrent(t *testing.T) { resultChan <- 1 return nil } + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() Convey("and 3 of those commands try to execute at the same time", func() { var good, bad int for i := 0; i < 3; i++ { - errChan := GoC(context.Background(), "", run, nil) + errChan := GoC(ctx, "", run, nil) time.Sleep(10 * time.Millisecond) select { @@ -346,11 +354,20 @@ func TestReturnTicket_QuickCheck(t *testing.T) { compareTicket := func() bool { defer Flush() ConfigureCommand("", CommandConfig{Timeout: 2}) - errChan := GoC(context.Background(), "", func(ctx context.Context) error { + + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() + + errChan := GoC(ctx, "", func(ctx context.Context) error { + //there are multiple ways to block here, the following sequence of steps + //will block:: c := make(chan struct{}); <-c; return nil // should block + //however, this would leak the internal GoC.func goroutine + //another non-leaking way to do this would be to simply: return ErrTimeout c := make(chan struct{}) - <-c // should block + <-c // should block (hence we add an exception in go-leak) return nil }, nil) + err := <-errChan So(err, ShouldResemble, ErrTimeout) cb, _, err := GetCircuit("") @@ -371,10 +388,16 @@ func TestReturnTicket(t *testing.T) { defer Flush() ConfigureCommand("", CommandConfig{Timeout: 10}) - - errChan := GoC(context.Background(), "", func(ctx context.Context) error { + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() + + errChan := GoC(ctx, "", func(ctx context.Context) error { + //there are multiple ways to block here, the following sequence of steps + //will block:: c := make(chan struct{}); <-c; return nil // should block + //however, this would leak the internal GoC.func goroutine + //another non-leaking way to do this would be to simply: return ErrTimeout c := make(chan struct{}) - <-c // should block + <-c // should block (hence we add an exception in go-leak) return nil }, nil) diff --git a/hystrix/metrics.go b/hystrix/metrics.go index 70db83b..861981b 100644 --- a/hystrix/metrics.go +++ b/hystrix/metrics.go @@ -38,7 +38,7 @@ func newMetricExchange(ctx context.Context, name string) *metricExchange { return m } -// The Default Collector function will panic if collectors are not setup to specification. +// DefaultCollector will panic if collectors are not setup to specification. func (m *metricExchange) DefaultCollector() *metricCollector.DefaultMetricCollector { if len(m.metricCollectors) < 1 { panic("No Metric Collectors Registered.") @@ -55,7 +55,10 @@ func (m *metricExchange) Monitor(ctx context.Context) { select { case <-ctx.Done(): return - case u := <-m.Updates: + case u, ok := <-m.Updates: + if !ok { + return + } // we only grab a read lock to make sure Reset() isn't changing the numbers. m.Mutex.RLock() @@ -66,7 +69,6 @@ func (m *metricExchange) Monitor(ctx context.Context) { go m.IncrementMetrics(wg, collector, u, totalDuration) } wg.Wait() - m.Mutex.RUnlock() } } diff --git a/hystrix/metrics_test.go b/hystrix/metrics_test.go index a1ab364..9046bfb 100644 --- a/hystrix/metrics_test.go +++ b/hystrix/metrics_test.go @@ -11,7 +11,11 @@ import ( ) func metricFailingPercent(p int) *metricExchange { - m := newMetricExchange(context.Background(), "") + return metricFailingPercentWithContext(context.Background(), p) +} + +func metricFailingPercentWithContext(ctx context.Context, p int) *metricExchange { + m := newMetricExchange(ctx, "") for i := 0; i < 100; i++ { t := "success" if i < p { @@ -20,7 +24,7 @@ func metricFailingPercent(p int) *metricExchange { m.Updates <- &commandExecution{Types: []string{t}} } - // Updates needs to be flushed + // updates need to be flushed time.Sleep(100 * time.Millisecond) return m @@ -28,7 +32,10 @@ func metricFailingPercent(p int) *metricExchange { func TestErrorPercent(t *testing.T) { Convey("with a metric failing 40 percent of the time", t, func() { - m := metricFailingPercent(40) + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() + + m := metricFailingPercentWithContext(ctx, 40) now := time.Now() Convey("ErrorPercent() should return 40", func() { @@ -48,5 +55,9 @@ func TestErrorPercent(t *testing.T) { } func TestMain(m *testing.M) { - goleak.VerifyTestMain(m) + goleak.VerifyTestMain(m, + goleak.IgnoreTopFunction("time.Sleep"), //tests that sleep in goroutines explicitly + goleak.IgnoreTopFunction("github.com/afex/hystrix-go/hystrix.TestReturnTicket.func1.1"), //explicit leak + goleak.IgnoreTopFunction("github.com/afex/hystrix-go/hystrix.TestReturnTicket_QuickCheck.func1.1"), //explicit leak + ) } diff --git a/hystrix/pool.go b/hystrix/pool.go index 9a9672f..e3851d6 100644 --- a/hystrix/pool.go +++ b/hystrix/pool.go @@ -25,15 +25,21 @@ func newExecutorPool(ctx context.Context, name string) *executorPool { return p } -func (p *executorPool) Return(ticket *struct{}) { +func (p *executorPool) Return(ctx context.Context, ticket *struct{}) { if ticket == nil { return } - p.Metrics.Updates <- poolMetricsUpdate{ - activeCount: p.ActiveCount(), + for { + select { + case <-ctx.Done(): + return + default: + p.Metrics.Updates <- poolMetricsUpdate{activeCount: p.ActiveCount()} + p.Tickets <- ticket + return + } } - p.Tickets <- ticket } func (p *executorPool) ActiveCount() int { diff --git a/hystrix/pool_metrics.go b/hystrix/pool_metrics.go index dc4010a..41d8a23 100644 --- a/hystrix/pool_metrics.go +++ b/hystrix/pool_metrics.go @@ -46,7 +46,10 @@ func (m *poolMetrics) Monitor(ctx context.Context) { select { case <-ctx.Done(): return - case u := <-m.Updates: + case u, ok := <-m.Updates: + if !ok { + return + } m.Mutex.RLock() m.Executed.Increment(1) m.MaxActiveRequests.UpdateMax(float64(u.activeCount)) diff --git a/hystrix/pool_test.go b/hystrix/pool_test.go index 773fed7..22a4a7e 100644 --- a/hystrix/pool_test.go +++ b/hystrix/pool_test.go @@ -15,7 +15,7 @@ func TestReturn(t *testing.T) { Convey("when returning a ticket to the pool", t, func() { pool := newExecutorPool(ctx, "pool") ticket := <-pool.Tickets - pool.Return(ticket) + pool.Return(ctx, ticket) time.Sleep(1 * time.Millisecond) Convey("total executed requests should increment", func() { So(pool.Metrics.Executed.Sum(time.Now()), ShouldEqual, 1) @@ -38,7 +38,7 @@ func TestActiveCount(t *testing.T) { }) Convey("and one is returned", func() { - pool.Return(ticket) + pool.Return(ctx, ticket) Convey("max active requests should be 3", func() { time.Sleep(1 * time.Millisecond) // allow poolMetrics to process channel