Skip to content

Commit

Permalink
add generic cell rate throttler
Browse files Browse the repository at this point in the history
  • Loading branch information
1pkg committed Aug 14, 2021
1 parent 72255b8 commit bcb2a1a
Show file tree
Hide file tree
Showing 5 changed files with 136 additions and 28 deletions.
19 changes: 7 additions & 12 deletions README.MD
Original file line number Diff line number Diff line change
Expand Up @@ -17,17 +17,6 @@

Gohalt is simple and convenient yet powerful and efficient throttling go library. Gohalt provides various throttlers and surronding tools to build throttling pipelines and rate limiters of any complexity adjusted to your specific needs. Gohalt provides an easy way to integrate throttling and rate limiting with your infrastructure through built in middlewares.

## Features

- [x] Blastly fast and efficient, Gohalt has minimal performance overhead, it was design with performance as the primary goal.
- [x] Flexible and powerful, Gohalt supports numbers of different throttling strategies and conditions that could be easily combined and customized to match your needs [link](#Throttlers).
- [x] Easy to integrate, Gohalt provides separate package with numbers of built in middlewares for simple (couple lines of code) integrations with stdlib and other libraries, among which are: io, rpc/grpc, http, sql, gin, [etc](#Integrations).
- [x] Metrics awareness, Gohalt could use Prometheus metrics as a conditions for throttling.
- [x] Queueing and delayed processing, Gohalt supports throttling queueing which means you can easily save throttled query to rabbitmq/kafka stream to process it later.

- [ ] Durable storage, Gohalt has embedded k/v storage to provide thtottling persistence and durability.
- [ ] Meta awareness, Gohalt provides easy way to access inner throttlers state in form of meta that can be later exposed to logging, headers, etc.

## Concepts

Gohalt uses `Throttler` as the core interface for all derived throttlers and surronding tools.
Expand Down Expand Up @@ -78,6 +67,10 @@ func WithTimestamp(ctx context.Context, ts time.Time) context.Context
// to differ `Acquire` priority levels.
// Resulted context is used by: `priority` throtttler.
func WithPriority(ctx context.Context, priority uint8) context.Context
// WithWeight adds the provided weight to the provided context
// to differ `Acquire` weight levels.
// Resulted context is used by: `semaphore` and `cellrate` throtttlers.
func WithWeight(ctx context.Context, weight int64) context.Context
// WithKey adds the provided key to the provided context
// to add additional call identifier to context.
// Resulted context is used by: `pattern` and `generator` throtttlers.
Expand All @@ -94,10 +87,11 @@ func WithMarshaler(ctx context.Context, mrsh Marshaler) context.Context
// WithParams facade call that respectively calls:
// - `WithTimestamp`
// - `WithPriority`
// - `WithWeight`
// - `WithKey`
// - `WithMessage`
// - `WithMarshaler`
func WithParams(ctx context.Context, ts time.Time, priority uint8, key string, message interface{}, marshaler Marshaler) context.Context
func WithParams(ctx context.Context, ts time.Time, priority uint8, weight int64, key string, message interface{}, marshaler Marshaler) context.Context
```
Also there is yet another throttling sugar `func WithThrottler(ctx context.Context, thr Throttler, freq time.Duration) context.Context` related to context. Which defines context implementation that uses parrent context plus throttler internally. Using it you can keep typical context patterns for cancelation handling and apply and combine it with throttling.
```go
Expand Down Expand Up @@ -172,6 +166,7 @@ You can find list of returning error types for all existing throttlers in thrott
| cache | `func NewThrottlerCache(thr Throttler, cache time.Duration) Throttler` | Caches provided throttler calls for the provided cache duration, throttler release resulting resets cache.<br> Only non throttling calls are cached for the provided cache duration.<br> - could return any underlying throttler error; |
| generator | `func NewThrottlerGenerator(gen Generator, capacity uint64, eviction float64) Throttler` | Creates new throttler instance that throttles if found key matching throttler throttles.<br> If no key matching throttler has been found generator used insted to provide new throttler that will be added to existing throttlers map.<br> Generated throttlers are kept in bounded map with capacity *c* defined by the specified capacity and eviction rate *e* defined by specified eviction value is normalized to [0.0, 1.0], where eviction rate affects number of throttlers that will be removed from the map after bounds overflow.<br> Use `WithKey` to specify key for throttler matching and generation.<br> - could return `ErrorInternal`;<br> - could return any underlying throttler error; |
| semaphore | `func NewThrottlerSemaphore(weight int64) Throttler` | Creates new throttler instance that throttles call if underlying semaphore throttles.<br>Use `WithWeight` to override context call weight, 1 by default.<br> - could return `ErrorThreshold`; |
| cellrate | `func NewThrottlerCellRate(threshold uint64, interval time.Duration, monotone bool) Throttler` | Creates new throttler instance that uses generic cell rate algorithm to throttles call within provided interval and threshold.<br>If provided monotone flag is set class to release will have no effect on throttler.<br>Use `WithWeight` to override context call qunatity, 1 by default.<br> - could return `ErrorThreshold`; |

## Integrations

Expand Down
35 changes: 19 additions & 16 deletions context.go
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,22 @@ func ctxPriority(ctx context.Context, limit uint8) uint8 {
return 1
}

// WithWeight adds the provided weight to the provided context
// to differ `Acquire` weight levels.
// Resulted context is used by: `semaphore` and `cellrate` throtttlers.
func WithWeight(ctx context.Context, weight int64) context.Context {
return context.WithValue(ctx, ghctxweight, weight)
}

func ctxWeight(ctx context.Context) int64 {
if val := ctx.Value(ghctxweight); val != nil {
if weight, ok := val.(int64); ok && weight > 0 {
return weight
}
}
return 1
}

// WithKey adds the provided key to the provided context
// to add additional call identifier to context.
// Resulted context is used by: `pattern` and `generator` throtttlers.
Expand Down Expand Up @@ -94,19 +110,22 @@ func ctxMarshaler(ctx context.Context) Marshaler {
// WithParams facade call that respectively calls:
// - `WithTimestamp`
// - `WithPriority`
// - `WithWeight`
// - `WithKey`
// - `WithMessage`
// - `WithMarshaler`
func WithParams(
ctx context.Context,
ts time.Time,
priority uint8,
weight int64,
key string,
message interface{},
marshaler Marshaler,
) context.Context {
ctx = WithTimestamp(ctx, ts)
ctx = WithPriority(ctx, priority)
ctx = WithWeight(ctx, weight)
ctx = WithKey(ctx, key)
ctx = WithMessage(ctx, message)
ctx = WithMarshaler(ctx, marshaler)
Expand Down Expand Up @@ -154,19 +173,3 @@ func (ctx ctxthr) Err() (err error) {
func (ctx ctxthr) Throttler() Throttler {
return ctx.thr
}

// WithWeight adds the provided weight to the provided context
// to differ `Acquire` weight levels.
// Resulted context is used by: `semaphore` throtttler.
func WithWeight(ctx context.Context, weight int64) context.Context {
return context.WithValue(ctx, ghctxweight, weight)
}

func ctxWeight(ctx context.Context) int64 {
if val := ctx.Value(ghctxweight); val != nil {
if weight, ok := val.(int64); ok && weight > 0 {
return weight
}
}
return 1
}
1 change: 1 addition & 0 deletions context_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ func TestContext(t *testing.T) {
ctx := WithParams(
context.Background(),
time.Now(),
1,
0,
"",
nil,
Expand Down
46 changes: 46 additions & 0 deletions throttlers.go
Original file line number Diff line number Diff line change
Expand Up @@ -1106,3 +1106,49 @@ func (thr tsemaphore) Release(ctx context.Context) error {
thr.sem.Release(ctxWeight(ctx))
return nil
}

type tcellrate struct {
current uint64
threshold uint64
quantum time.Duration
monotone bool
}

// NewThrottlerCellRate creates new throttler instance that
// uses generic cell rate algorithm to throttles call within provided interval and threshold.
// If provided monotone flag is set class to release will have no effect on throttler.
// Use `WithWeight` to override context call qunatity, 1 by default.
// - could return `ErrorThreshold`;
func NewThrottlerCellRate(threshold uint64, interval time.Duration, monotone bool) Throttler {
quantum := time.Duration(math.Ceil(float64(interval) / float64(threshold)))
return &tcellrate{threshold: threshold, quantum: quantum, monotone: monotone}
}

func (thr *tcellrate) Acquire(ctx context.Context) error {
current := atomicGet(&thr.current)
nowTs := uint64(time.Now().UTC().UnixNano())
if current < nowTs {
current = nowTs
}
updated := current + (uint64(thr.quantum) * uint64(ctxWeight(ctx)))
max := nowTs + (uint64(thr.quantum) * thr.threshold)
if updated > max {
current := uint64(math.Round(float64(updated-nowTs) / float64(thr.quantum)))
return ErrorThreshold{
Throttler: "cellrate",
Threshold: strpair{current: current, threshold: thr.threshold},
}
}
atomicSet(&thr.current, updated)
return nil
}

func (thr *tcellrate) Release(ctx context.Context) error {
// don't decrement calls for monotone cell.
if thr.monotone {
return nil
}
updated := atomicGet(&thr.current) - (uint64(thr.quantum) * uint64(ctxWeight(ctx)))
atomicSet(&thr.current, updated)
return nil
}
63 changes: 63 additions & 0 deletions throttlers_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ const (
ms3_0 time.Duration = 3 * time.Millisecond
ms4_0 time.Duration = 4 * time.Millisecond
ms5_0 time.Duration = 5 * time.Millisecond
ms6_0 time.Duration = 6 * time.Millisecond
ms7_0 time.Duration = 7 * time.Millisecond
ms8_0 time.Duration = 8 * time.Millisecond
ms9_0 time.Duration = 9 * time.Millisecond
Expand Down Expand Up @@ -1237,6 +1238,68 @@ func TestThrottlers(t *testing.T) {
},
},
},
"Throttler monotone cellrate should throttle on threshold": {
tms: 5,
thr: NewThrottlerCellRate(2, ms6_0, true),
ctxs: []context.Context{
context.TODO(),
context.TODO(),
context.TODO(),
context.TODO(),
WithWeight(context.TODO(), 2),
},
pres: []Runnable{
nope,
nope,
nope,
delayed(ms4_0, nope),
delayed(ms3_0, nope),
},
errs: []error{
nil,
nil,
ErrorThreshold{
Throttler: "cellrate",
Threshold: strpair{current: 3, threshold: 2},
},
nil,
ErrorThreshold{
Throttler: "cellrate",
Threshold: strpair{current: 3, threshold: 2},
},
},
},
"Throttler not monotone cellrate should throttle on threshold": {
tms: 5,
thr: NewThrottlerCellRate(2, ms9_0, false),
acts: []Runnable{
delayed(ms5_0, nope),
delayed(ms5_0, nope),
delayed(ms5_0, nope),
nope,
nope,
},
pres: []Runnable{
nope,
nope,
nope,
nope,
delayed(ms7_0, nope),
},
errs: []error{
nil,
nil,
ErrorThreshold{
Throttler: "cellrate",
Threshold: strpair{current: 3, threshold: 2},
},
ErrorThreshold{
Throttler: "cellrate",
Threshold: strpair{current: 3, threshold: 2},
},
nil,
},
},
}
for tname, ptrtcase := range table {
t.Run(tname, func(t *testing.T) {
Expand Down

0 comments on commit bcb2a1a

Please sign in to comment.