-
Notifications
You must be signed in to change notification settings - Fork 0
/
channel.go
157 lines (125 loc) · 2.58 KB
/
channel.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
package pool
import (
"context"
"errors"
"fmt"
"net"
"sync"
)
type channelPool struct {
//保证并发安全(connCh的修改)
mu sync.RWMutex
//存储未使用的conn
connCh chan net.Conn
closed bool // pool是否已关闭
// net.Conn 生产者
factory Factory
maxConn int64 // 最大conn数量, <= 0 不限制
maxFree int64 // 最大空闲conn数量
openNum int64 // 已创建连接数
}
var (
ErrTimeOut = errors.New("time out")
)
// Factory net.Conn 生产者
type Factory func() (net.Conn, error)
func NewChannelPool(maxFree, maxConn int64, factory Factory) (*channelPool, error) {
if maxFree <= 0 || maxConn < 0 || maxFree > maxConn {
return nil, errors.New("invalid capacity settings")
}
p := &channelPool{
connCh: make(chan net.Conn, maxFree),
factory: factory,
maxConn: maxConn,
maxFree: maxFree,
}
// 初始化链接
for i := 0; i < int(maxFree); i++ {
conn, err := factory()
if err != nil {
_ = p.Close()
return nil, fmt.Errorf("factory is not able to fill the pool: %s", err)
}
p.connCh <- conn
}
p.openNum = maxFree
return p, nil
}
func (p *channelPool) Get() (net.Conn, error) {
return p.GetWitchContext(context.Background())
}
func (p *channelPool) GetWitchContext(ctx context.Context) (net.Conn, error) {
p.mu.Lock()
if p.closed {
p.mu.Unlock()
return nil, ErrClosed
}
// 有空闲链接, 或者已达到最大链接数,都只能从connCh中获取
if len(p.connCh) > 0 || (p.maxConn > 0 && p.openNum >= p.maxConn) {
p.mu.Unlock()
select {
case <-ctx.Done():
return nil, ErrTimeOut
case conn := <-p.connCh:
if conn == nil {
return nil, ErrClosed
}
return conn, nil
}
}
defer p.mu.Unlock()
// 未达到最大链接数,可以创建新链接
conn, err := p.factory()
if err != nil {
return nil, err
}
p.openNum++
return conn, nil
}
func (p *channelPool) Put(conn net.Conn) error {
if conn == nil {
return errors.New("connection is nil. rejecting")
}
p.mu.Lock()
defer p.mu.Unlock()
// 已关闭
if p.closed {
err := conn.Close()
if err == nil {
p.openNum--
}
return err
}
select {
case p.connCh <- conn:
return nil
default:
err := conn.Close()
if err == nil {
p.openNum--
}
return err
}
}
func (p *channelPool) Close() error {
p.mu.Lock()
defer p.mu.Unlock()
if p.closed {
return ErrClosed
}
p.closed = true
close(p.connCh)
for c := range p.connCh {
if err := c.Close(); err != nil {
return err
}
p.openNum--
}
return nil
}
func (p *channelPool) Len() int {
return len(p.connCh)
}
func (p *channelPool) OpenNum() int {
return int(p.openNum)
}