From 231912e118b18ff482a08ae29dc47c047ac8f893 Mon Sep 17 00:00:00 2001 From: raghavTayal Date: Fri, 17 Feb 2023 14:28:55 +0530 Subject: [PATCH 1/3] circuit breaker rolling window config --- .idea/.gitignore | 8 ++++++ .idea/hystrix-go.iml | 9 +++++++ .idea/modules.xml | 8 ++++++ .idea/vcs.xml | 6 +++++ hystrix/rolling/rolling.go | 48 ++++++++++++++++++++++++--------- hystrix/rolling/rolling_test.go | 16 +++++++++-- 6 files changed, 81 insertions(+), 14 deletions(-) create mode 100644 .idea/.gitignore create mode 100644 .idea/hystrix-go.iml create mode 100644 .idea/modules.xml create mode 100644 .idea/vcs.xml diff --git a/.idea/.gitignore b/.idea/.gitignore new file mode 100644 index 0000000..13566b8 --- /dev/null +++ b/.idea/.gitignore @@ -0,0 +1,8 @@ +# Default ignored files +/shelf/ +/workspace.xml +# Editor-based HTTP Client requests +/httpRequests/ +# Datasource local storage ignored files +/dataSources/ +/dataSources.local.xml diff --git a/.idea/hystrix-go.iml b/.idea/hystrix-go.iml new file mode 100644 index 0000000..5e764c4 --- /dev/null +++ b/.idea/hystrix-go.iml @@ -0,0 +1,9 @@ + + + + + + + + + \ No newline at end of file diff --git a/.idea/modules.xml b/.idea/modules.xml new file mode 100644 index 0000000..7b79fde --- /dev/null +++ b/.idea/modules.xml @@ -0,0 +1,8 @@ + + + + + + + + \ No newline at end of file diff --git a/.idea/vcs.xml b/.idea/vcs.xml new file mode 100644 index 0000000..94a25f7 --- /dev/null +++ b/.idea/vcs.xml @@ -0,0 +1,6 @@ + + + + + + \ No newline at end of file diff --git a/hystrix/rolling/rolling.go b/hystrix/rolling/rolling.go index eef9c62..2f65104 100644 --- a/hystrix/rolling/rolling.go +++ b/hystrix/rolling/rolling.go @@ -1,15 +1,26 @@ package rolling import ( + "errors" "sync" "time" ) -// Number tracks a numberBucket over a bounded number of -// time buckets. Currently the buckets are one second long and only the last 10 seconds are kept. +const ( + defaultWindow = 10 +) + +var ( + ErrNegativeWindow = errors.New("window must be positive integer") +) + +// Number tracks a numberBucket over a bounded Number of +// time buckets. Currently the buckets are one second long and only the last {window} seconds are kept. + type Number struct { Buckets map[int64]*numberBucket Mutex *sync.RWMutex + window int64 } type numberBucket struct { @@ -19,12 +30,25 @@ type numberBucket struct { // NewNumber initializes a RollingNumber struct. func NewNumber() *Number { r := &Number{ - Buckets: make(map[int64]*numberBucket), + Buckets: make(map[int64]*numberBucket, defaultWindow), Mutex: &sync.RWMutex{}, + window: defaultWindow, } return r } +// NewNumberWithWindow initializes a RollingNumber with window given +func NewNumberWithWindow(window int64) (*Number, error) { + if window <= 0 { + return nil, ErrNegativeWindow + } + return &Number{ + Buckets: make(map[int64]*numberBucket, window), + Mutex: &sync.RWMutex{}, + window: window, + }, nil +} + func (r *Number) getCurrentBucket() *numberBucket { now := time.Now().Unix() var bucket *numberBucket @@ -39,10 +63,9 @@ func (r *Number) getCurrentBucket() *numberBucket { } func (r *Number) removeOldBuckets() { - now := time.Now().Unix() - 10 + now := time.Now().Unix() - r.window for timestamp := range r.Buckets { - // TODO: configurable rolling window if timestamp <= now { delete(r.Buckets, timestamp) } @@ -75,7 +98,7 @@ func (r *Number) UpdateMax(n float64) { r.removeOldBuckets() } -// Sum sums the values over the buckets in the last 10 seconds. +// Sum sums the values over the buckets in the last {window} seconds. func (r *Number) Sum(now time.Time) float64 { sum := float64(0) @@ -83,8 +106,7 @@ func (r *Number) Sum(now time.Time) float64 { defer r.Mutex.RUnlock() for timestamp, bucket := range r.Buckets { - // TODO: configurable rolling window - if timestamp >= now.Unix()-10 { + if timestamp >= now.Unix()-r.window { sum += bucket.Value } } @@ -92,7 +114,7 @@ func (r *Number) Sum(now time.Time) float64 { return sum } -// Max returns the maximum value seen in the last 10 seconds. +// Max returns the maximum value seen in the last window seconds. func (r *Number) Max(now time.Time) float64 { var max float64 @@ -100,8 +122,7 @@ func (r *Number) Max(now time.Time) float64 { defer r.Mutex.RUnlock() for timestamp, bucket := range r.Buckets { - // TODO: configurable rolling window - if timestamp >= now.Unix()-10 { + if timestamp >= now.Unix()-r.window { if bucket.Value > max { max = bucket.Value } @@ -112,5 +133,8 @@ func (r *Number) Max(now time.Time) float64 { } func (r *Number) Avg(now time.Time) float64 { - return r.Sum(now) / 10 + if r.window == 0 { // unexpected 0 + return r.Sum(now) + } + return r.Sum(now) / float64(r.window) } diff --git a/hystrix/rolling/rolling_test.go b/hystrix/rolling/rolling_test.go index f784024..557f94b 100644 --- a/hystrix/rolling/rolling_test.go +++ b/hystrix/rolling/rolling_test.go @@ -3,10 +3,22 @@ package rolling import ( "testing" "time" - - . "github.com/smartystreets/goconvey/convey" ) +func TestWindowAvg(t *testing.T) { + Convey("when adding values to a rolling number", t, func() { + n, _ := NewNumberWithWindow(2) + for _, x := range []float64{0.5, 1.5, 2.5, 3.5, 4.5} { + n.Increment(x) + time.Sleep(1 * time.Second) + } + + Convey("it should calculate the average over the number of configured buckets", func() { + So(n.Avg(time.Now()), ShouldEqual, 4) // (3.5+4.5)/2 + }) + }) +} + func TestMax(t *testing.T) { Convey("when adding values to a rolling number", t, func() { From ba9e20198572678b4a108725c4108eb7095e3515 Mon Sep 17 00:00:00 2001 From: raghavTayal Date: Fri, 17 Feb 2023 15:53:20 +0530 Subject: [PATCH 2/3] fixed imports --- hystrix/eventstream.go | 2 +- hystrix/metric_collector/default_metric_collector.go | 2 +- hystrix/metrics.go | 3 +-- hystrix/pool_metrics.go | 2 +- loadtest/service/main.go | 5 ++--- plugins/datadog_collector.go | 1 - plugins/graphite_aggregator.go | 3 --- plugins/statsd_collector.go | 1 - 8 files changed, 6 insertions(+), 13 deletions(-) diff --git a/hystrix/eventstream.go b/hystrix/eventstream.go index 0b49de5..fe60fce 100644 --- a/hystrix/eventstream.go +++ b/hystrix/eventstream.go @@ -7,7 +7,7 @@ import ( "sync" "time" - "github.com/afex/hystrix-go/hystrix/rolling" + "github.com/raghavTayal/hystrix-go/hystrix/rolling" ) const ( diff --git a/hystrix/metric_collector/default_metric_collector.go b/hystrix/metric_collector/default_metric_collector.go index d874aae..b5cfb01 100644 --- a/hystrix/metric_collector/default_metric_collector.go +++ b/hystrix/metric_collector/default_metric_collector.go @@ -3,7 +3,7 @@ package metricCollector import ( "sync" - "github.com/afex/hystrix-go/hystrix/rolling" + "github.com/raghavTayal/hystrix-go/hystrix/rolling" ) // DefaultMetricCollector holds information about the circuit state. diff --git a/hystrix/metrics.go b/hystrix/metrics.go index d289fe6..a4865c9 100644 --- a/hystrix/metrics.go +++ b/hystrix/metrics.go @@ -4,8 +4,7 @@ import ( "sync" "time" - "github.com/afex/hystrix-go/hystrix/metric_collector" - "github.com/afex/hystrix-go/hystrix/rolling" + "github.com/raghavTayal/hystrix-go/hystrix/rolling" ) type commandExecution struct { diff --git a/hystrix/pool_metrics.go b/hystrix/pool_metrics.go index 93e97d9..4013f2a 100644 --- a/hystrix/pool_metrics.go +++ b/hystrix/pool_metrics.go @@ -3,7 +3,7 @@ package hystrix import ( "sync" - "github.com/afex/hystrix-go/hystrix/rolling" + "github.com/raghavTayal/hystrix-go/hystrix/rolling" ) type poolMetrics struct { diff --git a/loadtest/service/main.go b/loadtest/service/main.go index 5ac2af5..237b022 100644 --- a/loadtest/service/main.go +++ b/loadtest/service/main.go @@ -11,10 +11,9 @@ import ( "runtime" "time" - "github.com/afex/hystrix-go/hystrix" - "github.com/afex/hystrix-go/hystrix/metric_collector" - "github.com/afex/hystrix-go/plugins" "github.com/cactus/go-statsd-client/statsd" + "github.com/raghavTayal/hystrix-go/hystrix" + "github.com/raghavTayal/hystrix-go/plugins" ) const ( diff --git a/plugins/datadog_collector.go b/plugins/datadog_collector.go index 68e170b..7ec1228 100644 --- a/plugins/datadog_collector.go +++ b/plugins/datadog_collector.go @@ -4,7 +4,6 @@ import ( // Developed on https://github.com/DataDog/datadog-go/tree/a27810dd518c69be741a7fd5d0e39f674f615be8 "github.com/DataDog/datadog-go/statsd" - "github.com/afex/hystrix-go/hystrix/metric_collector" ) // These metrics are constants because we're leveraging the Datadog tagging diff --git a/plugins/graphite_aggregator.go b/plugins/graphite_aggregator.go index 3a3b74f..7f119bd 100644 --- a/plugins/graphite_aggregator.go +++ b/plugins/graphite_aggregator.go @@ -6,9 +6,6 @@ import ( "net" "strings" "time" - - "github.com/afex/hystrix-go/hystrix/metric_collector" - "github.com/rcrowley/go-metrics" ) var makeTimerFunc = func() interface{} { return metrics.NewTimer() } diff --git a/plugins/statsd_collector.go b/plugins/statsd_collector.go index 6f835ab..3900026 100644 --- a/plugins/statsd_collector.go +++ b/plugins/statsd_collector.go @@ -5,7 +5,6 @@ import ( "strings" "time" - "github.com/afex/hystrix-go/hystrix/metric_collector" "github.com/cactus/go-statsd-client/statsd" ) From c02aa80a8548460d8e02ec30f78a245d77f9df96 Mon Sep 17 00:00:00 2001 From: raghavTayal Date: Mon, 20 Feb 2023 11:06:37 +0530 Subject: [PATCH 3/3] changed default to 20 --- hystrix/rolling/rolling.go | 14 +++++++------- 1 file changed, 7 insertions(+), 7 deletions(-) diff --git a/hystrix/rolling/rolling.go b/hystrix/rolling/rolling.go index 2f65104..5098ff8 100644 --- a/hystrix/rolling/rolling.go +++ b/hystrix/rolling/rolling.go @@ -63,7 +63,7 @@ func (r *Number) getCurrentBucket() *numberBucket { } func (r *Number) removeOldBuckets() { - now := time.Now().Unix() - r.window + now := time.Now().Unix() - 20 for timestamp := range r.Buckets { if timestamp <= now { @@ -106,7 +106,7 @@ func (r *Number) Sum(now time.Time) float64 { defer r.Mutex.RUnlock() for timestamp, bucket := range r.Buckets { - if timestamp >= now.Unix()-r.window { + if timestamp >= now.Unix()-20 { sum += bucket.Value } } @@ -122,7 +122,7 @@ func (r *Number) Max(now time.Time) float64 { defer r.Mutex.RUnlock() for timestamp, bucket := range r.Buckets { - if timestamp >= now.Unix()-r.window { + if timestamp >= now.Unix()-20 { if bucket.Value > max { max = bucket.Value } @@ -133,8 +133,8 @@ func (r *Number) Max(now time.Time) float64 { } func (r *Number) Avg(now time.Time) float64 { - if r.window == 0 { // unexpected 0 - return r.Sum(now) - } - return r.Sum(now) / float64(r.window) + //if r.window == 0 { // unexpected 0 + // return r.Sum(now) + //} + return r.Sum(now) / float64(20) }