diff --git a/README.MD b/README.MD index 06432e2..035426a 100644 --- a/README.MD +++ b/README.MD @@ -17,17 +17,6 @@ Gohalt is simple and convenient yet powerful and efficient throttling go library. Gohalt provides various throttlers and surronding tools to build throttling pipelines and rate limiters of any complexity adjusted to your specific needs. Gohalt provides an easy way to integrate throttling and rate limiting with your infrastructure through built in middlewares. -## Features - -- [x] Blastly fast and efficient, Gohalt has minimal performance overhead, it was design with performance as the primary goal. -- [x] Flexible and powerful, Gohalt supports numbers of different throttling strategies and conditions that could be easily combined and customized to match your needs [link](#Throttlers). -- [x] Easy to integrate, Gohalt provides separate package with numbers of built in middlewares for simple (couple lines of code) integrations with stdlib and other libraries, among which are: io, rpc/grpc, http, sql, gin, [etc](#Integrations). -- [x] Metrics awareness, Gohalt could use Prometheus metrics as a conditions for throttling. -- [x] Queueing and delayed processing, Gohalt supports throttling queueing which means you can easily save throttled query to rabbitmq/kafka stream to process it later. - -- [ ] Durable storage, Gohalt has embedded k/v storage to provide thtottling persistence and durability. -- [ ] Meta awareness, Gohalt provides easy way to access inner throttlers state in form of meta that can be later exposed to logging, headers, etc. - ## Concepts Gohalt uses `Throttler` as the core interface for all derived throttlers and surronding tools. @@ -78,6 +67,10 @@ func WithTimestamp(ctx context.Context, ts time.Time) context.Context // to differ `Acquire` priority levels. // Resulted context is used by: `priority` throtttler. func WithPriority(ctx context.Context, priority uint8) context.Context +// WithWeight adds the provided weight to the provided context +// to differ `Acquire` weight levels. +// Resulted context is used by: `semaphore` and `cellrate` throtttlers. +func WithWeight(ctx context.Context, weight int64) context.Context // WithKey adds the provided key to the provided context // to add additional call identifier to context. // Resulted context is used by: `pattern` and `generator` throtttlers. @@ -94,10 +87,11 @@ func WithMarshaler(ctx context.Context, mrsh Marshaler) context.Context // WithParams facade call that respectively calls: // - `WithTimestamp` // - `WithPriority` +// - `WithWeight` // - `WithKey` // - `WithMessage` // - `WithMarshaler` -func WithParams(ctx context.Context, ts time.Time, priority uint8, key string, message interface{}, marshaler Marshaler) context.Context +func WithParams(ctx context.Context, ts time.Time, priority uint8, weight int64, key string, message interface{}, marshaler Marshaler) context.Context ``` Also there is yet another throttling sugar `func WithThrottler(ctx context.Context, thr Throttler, freq time.Duration) context.Context` related to context. Which defines context implementation that uses parrent context plus throttler internally. Using it you can keep typical context patterns for cancelation handling and apply and combine it with throttling. ```go @@ -172,6 +166,7 @@ You can find list of returning error types for all existing throttlers in thrott | cache | `func NewThrottlerCache(thr Throttler, cache time.Duration) Throttler` | Caches provided throttler calls for the provided cache duration, throttler release resulting resets cache.
Only non throttling calls are cached for the provided cache duration.
- could return any underlying throttler error; | | generator | `func NewThrottlerGenerator(gen Generator, capacity uint64, eviction float64) Throttler` | Creates new throttler instance that throttles if found key matching throttler throttles.
If no key matching throttler has been found generator used insted to provide new throttler that will be added to existing throttlers map.
Generated throttlers are kept in bounded map with capacity *c* defined by the specified capacity and eviction rate *e* defined by specified eviction value is normalized to [0.0, 1.0], where eviction rate affects number of throttlers that will be removed from the map after bounds overflow.
Use `WithKey` to specify key for throttler matching and generation.
- could return `ErrorInternal`;
- could return any underlying throttler error; | | semaphore | `func NewThrottlerSemaphore(weight int64) Throttler` | Creates new throttler instance that throttles call if underlying semaphore throttles.
Use `WithWeight` to override context call weight, 1 by default.
- could return `ErrorThreshold`; | +| cellrate | `func NewThrottlerCellRate(threshold uint64, interval time.Duration, monotone bool) Throttler` | Creates new throttler instance that uses generic cell rate algorithm to throttles call within provided interval and threshold.
If provided monotone flag is set class to release will have no effect on throttler.
Use `WithWeight` to override context call qunatity, 1 by default.
- could return `ErrorThreshold`; | ## Integrations diff --git a/context.go b/context.go index e6fa7e4..705c3d1 100644 --- a/context.go +++ b/context.go @@ -48,6 +48,22 @@ func ctxPriority(ctx context.Context, limit uint8) uint8 { return 1 } +// WithWeight adds the provided weight to the provided context +// to differ `Acquire` weight levels. +// Resulted context is used by: `semaphore` and `cellrate` throtttlers. +func WithWeight(ctx context.Context, weight int64) context.Context { + return context.WithValue(ctx, ghctxweight, weight) +} + +func ctxWeight(ctx context.Context) int64 { + if val := ctx.Value(ghctxweight); val != nil { + if weight, ok := val.(int64); ok && weight > 0 { + return weight + } + } + return 1 +} + // WithKey adds the provided key to the provided context // to add additional call identifier to context. // Resulted context is used by: `pattern` and `generator` throtttlers. @@ -94,6 +110,7 @@ func ctxMarshaler(ctx context.Context) Marshaler { // WithParams facade call that respectively calls: // - `WithTimestamp` // - `WithPriority` +// - `WithWeight` // - `WithKey` // - `WithMessage` // - `WithMarshaler` @@ -101,12 +118,14 @@ func WithParams( ctx context.Context, ts time.Time, priority uint8, + weight int64, key string, message interface{}, marshaler Marshaler, ) context.Context { ctx = WithTimestamp(ctx, ts) ctx = WithPriority(ctx, priority) + ctx = WithWeight(ctx, weight) ctx = WithKey(ctx, key) ctx = WithMessage(ctx, message) ctx = WithMarshaler(ctx, marshaler) @@ -154,19 +173,3 @@ func (ctx ctxthr) Err() (err error) { func (ctx ctxthr) Throttler() Throttler { return ctx.thr } - -// WithWeight adds the provided weight to the provided context -// to differ `Acquire` weight levels. -// Resulted context is used by: `semaphore` throtttler. -func WithWeight(ctx context.Context, weight int64) context.Context { - return context.WithValue(ctx, ghctxweight, weight) -} - -func ctxWeight(ctx context.Context) int64 { - if val := ctx.Value(ghctxweight); val != nil { - if weight, ok := val.(int64); ok && weight > 0 { - return weight - } - } - return 1 -} diff --git a/context_test.go b/context_test.go index 180cd3d..ee89b68 100644 --- a/context_test.go +++ b/context_test.go @@ -16,6 +16,7 @@ func TestContext(t *testing.T) { ctx := WithParams( context.Background(), time.Now(), + 1, 0, "", nil, diff --git a/throttlers.go b/throttlers.go index a84151f..06567d4 100644 --- a/throttlers.go +++ b/throttlers.go @@ -1106,3 +1106,49 @@ func (thr tsemaphore) Release(ctx context.Context) error { thr.sem.Release(ctxWeight(ctx)) return nil } + +type tcellrate struct { + current uint64 + threshold uint64 + quantum time.Duration + monotone bool +} + +// NewThrottlerCellRate creates new throttler instance that +// uses generic cell rate algorithm to throttles call within provided interval and threshold. +// If provided monotone flag is set class to release will have no effect on throttler. +// Use `WithWeight` to override context call qunatity, 1 by default. +// - could return `ErrorThreshold`; +func NewThrottlerCellRate(threshold uint64, interval time.Duration, monotone bool) Throttler { + quantum := time.Duration(math.Ceil(float64(interval) / float64(threshold))) + return &tcellrate{threshold: threshold, quantum: quantum, monotone: monotone} +} + +func (thr *tcellrate) Acquire(ctx context.Context) error { + current := atomicGet(&thr.current) + nowTs := uint64(time.Now().UTC().UnixNano()) + if current < nowTs { + current = nowTs + } + updated := current + (uint64(thr.quantum) * uint64(ctxWeight(ctx))) + max := nowTs + (uint64(thr.quantum) * thr.threshold) + if updated > max { + current := uint64(math.Round(float64(updated-nowTs) / float64(thr.quantum))) + return ErrorThreshold{ + Throttler: "cellrate", + Threshold: strpair{current: current, threshold: thr.threshold}, + } + } + atomicSet(&thr.current, updated) + return nil +} + +func (thr *tcellrate) Release(ctx context.Context) error { + // don't decrement calls for monotone cell. + if thr.monotone { + return nil + } + updated := atomicGet(&thr.current) - (uint64(thr.quantum) * uint64(ctxWeight(ctx))) + atomicSet(&thr.current, updated) + return nil +} diff --git a/throttlers_test.go b/throttlers_test.go index 2b93b5f..ba3fd68 100644 --- a/throttlers_test.go +++ b/throttlers_test.go @@ -20,6 +20,7 @@ const ( ms3_0 time.Duration = 3 * time.Millisecond ms4_0 time.Duration = 4 * time.Millisecond ms5_0 time.Duration = 5 * time.Millisecond + ms6_0 time.Duration = 6 * time.Millisecond ms7_0 time.Duration = 7 * time.Millisecond ms8_0 time.Duration = 8 * time.Millisecond ms9_0 time.Duration = 9 * time.Millisecond @@ -1237,6 +1238,68 @@ func TestThrottlers(t *testing.T) { }, }, }, + "Throttler monotone cellrate should throttle on threshold": { + tms: 5, + thr: NewThrottlerCellRate(2, ms6_0, true), + ctxs: []context.Context{ + context.TODO(), + context.TODO(), + context.TODO(), + context.TODO(), + WithWeight(context.TODO(), 2), + }, + pres: []Runnable{ + nope, + nope, + nope, + delayed(ms4_0, nope), + delayed(ms3_0, nope), + }, + errs: []error{ + nil, + nil, + ErrorThreshold{ + Throttler: "cellrate", + Threshold: strpair{current: 3, threshold: 2}, + }, + nil, + ErrorThreshold{ + Throttler: "cellrate", + Threshold: strpair{current: 3, threshold: 2}, + }, + }, + }, + "Throttler not monotone cellrate should throttle on threshold": { + tms: 5, + thr: NewThrottlerCellRate(2, ms9_0, false), + acts: []Runnable{ + delayed(ms5_0, nope), + delayed(ms5_0, nope), + delayed(ms5_0, nope), + nope, + nope, + }, + pres: []Runnable{ + nope, + nope, + nope, + nope, + delayed(ms7_0, nope), + }, + errs: []error{ + nil, + nil, + ErrorThreshold{ + Throttler: "cellrate", + Threshold: strpair{current: 3, threshold: 2}, + }, + ErrorThreshold{ + Throttler: "cellrate", + Threshold: strpair{current: 3, threshold: 2}, + }, + nil, + }, + }, } for tname, ptrtcase := range table { t.Run(tname, func(t *testing.T) {