-
Notifications
You must be signed in to change notification settings - Fork 1
/
semaphore.go
107 lines (89 loc) · 2.31 KB
/
semaphore.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
package syncx
import (
"context"
"errors"
)
// Semaphore defines semaphore interface
type Semaphore interface {
//Acquire acquires one permit, if its not available the goroutine will block till its available or Context.Done() occurs.
//You can pass context.WithTimeout() to support timeoutable acquire.
Acquire(ctx context.Context) error
//AcquireMany is similar to Acquire() but for many permits
//Returns successfully acquired permits.
AcquireMany(ctx context.Context, n int) (int, error)
//Release releases one permit
Release()
//ReleaseMany releases many permits
ReleaseMany(n int) error
//AvailablePermits returns number of available unacquired permits
AvailablePermits() int
//DrainPermits acquires all available permits and return the number of permits acquired
DrainPermits() (int, error)
}
// NewSemaphore returns new Semaphore instance
func NewSemaphore(permits int) (*semaphore, error) {
if permits < 1 {
return nil, errors.New("invalid number of permits. Less than 1")
}
return &semaphore{
channel: make(chan struct{}, permits),
}, nil
}
type semaphore struct {
channel chan struct{}
}
func (s *semaphore) Acquire(ctx context.Context) error {
select {
case s.channel <- struct{}{}:
return nil
case <-ctx.Done():
return errors.New("acquire canceled")
}
}
func (s *semaphore) AcquireMany(ctx context.Context, n int) (int, error) {
if n < 0 {
return 0, errors.New("acquir count coundn't be negative")
}
if n > s.totalPermits() {
return 0, errors.New("too many requested permits")
}
acquired := 0
for ; n > 0; n-- {
select {
case s.channel <- struct{}{}:
acquired++
continue
case <-ctx.Done():
return acquired, errors.New("acquire canceled")
}
}
return acquired, nil
}
func (s *semaphore) AvailablePermits() int {
return s.totalPermits() - len(s.channel)
}
func (s *semaphore) DrainPermits(ctx context.Context) (int, error) {
n := s.AvailablePermits()
if n > 0 {
return s.AcquireMany(ctx, n)
}
return n, nil
}
func (s *semaphore) Release() {
<-s.channel
}
func (s *semaphore) ReleaseMany(n int) error {
if n < 0 {
return errors.New("release count coundn't be negative")
}
if n > s.totalPermits() {
return errors.New("too many requested releases")
}
for ; n > 0; n-- {
s.Release()
}
return nil
}
func (s *semaphore) totalPermits() int {
return cap(s.channel)
}