drastically reduce allocations in ring buffer implementation (#64)
* reuse the capacity of the `s.b` slice by shifting its elements to the left on writes (a standalone sketch of the idea follows these notes)

Before this change, `Append` allocated a new slice on every call because len(s.b) == cap(s.b). In my testing, len(s.b) stays small (usually <= 50, often near 0, with occasional spikes to 200) even in high-write benchmarks such as BenchmarkSendRecvLarge, so the copy should not be expensive. The approach is also efficient when the write rate roughly matches the read rate.

* shift the buffer only if at least ~1/4 of the slice is empty, to avoid the pathological case of shifting the slice by a single element on every write

* add more documentation to segmentedBuffer
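
To make the first bullet concrete, here is a minimal, self-contained sketch of the shift-left reuse, with a hypothetical `queue` of ints standing in for the commit's actual `segmentedBuffer` (which stores pooled `[]byte` segments):

```go
package main

import "fmt"

// queue is a hypothetical type for illustration only; the commit applies the
// same idea to segmentedBuffer's [][]byte of pooled segments.
type queue struct {
	items []int
	pos   int // next unread index; pos == len(items) means the queue is empty
}

func (q *queue) push(v int) {
	// No spare capacity at the end of the slice, but reads have freed
	// space at the front.
	if len(q.items) == cap(q.items) && q.pos > 0 {
		if q.pos == len(q.items) {
			// Everything was read: reset without copying anything.
			q.pos = 0
			q.items = q.items[:0]
		} else if q.pos > cap(q.items)/4 {
			// Shift left only when at least ~1/4 of the slice is free,
			// so we never pay a full copy to reclaim a single slot.
			n := copy(q.items, q.items[q.pos:])
			q.items = q.items[:n]
			q.pos = 0
		}
	}
	q.items = append(q.items, v) // allocates only if no space was reclaimed
}

func (q *queue) pop() (int, bool) {
	if q.pos == len(q.items) {
		return 0, false
	}
	v := q.items[q.pos]
	q.pos++
	return v, true
}

func main() {
	var q queue
	for i := 0; i < 1000; i++ {
		q.push(i)
		if v, ok := q.pop(); ok && v != i {
			fmt.Println("unexpected:", v)
		}
	}
	fmt.Println("done; len:", len(q.items), "cap:", cap(q.items))
}
```

When reads keep pace with writes, `pos` regularly reaches `len(items)` and the reset branch empties the slice without copying; the copy branch only runs once at least a quarter of the slice can be reclaimed in one shift.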
pymq authored Nov 20, 2021
1 parent 3613688 commit d6101de
Showing 2 changed files with 32 additions and 8 deletions.
1 change: 1 addition & 0 deletions bench_test.go
@@ -54,6 +54,7 @@ func BenchmarkSendRecv(b *testing.B) {
 	recvBuf := make([]byte, 512)
 
 	doneCh := make(chan struct{})
+	b.ResetTimer()
 	go func() {
 		defer close(doneCh)
 		defer server.Close()
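
An aside on this one-line change (general `testing` package behavior, not something stated in the commit): `b.ResetTimer` zeroes the benchmark's elapsed time and memory-allocation counters, so the connection and buffer setup above it no longer counts toward the reported allocation numbers. The generic pattern:

```go
package main_test

import "testing"

func BenchmarkExample(b *testing.B) {
	buf := make([]byte, 1<<20) // expensive setup we don't want measured
	b.ResetTimer()             // zeroes elapsed time and allocation counters
	for i := 0; i < b.N; i++ {
		buf[0]++ // measured work goes here
	}
}
```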
39 changes: 31 additions & 8 deletions util.go
@@ -68,15 +68,21 @@ type segmentedBuffer struct {
 	cap     uint32
 	len     uint32
 	bm      sync.Mutex
-	// read position in b[0].
+	// read position in b[bPos].
 	// We must not reslice any of the buffers in b, as we need to put them back into the pool.
 	readPos int
-	b       [][]byte
+	// bPos is an index in b slice. If bPos == len(b), it means that buffer is empty.
+	bPos int
+	// b is used as a growable buffer. Each Append adds []byte to the end of b.
+	// If there is no space available at the end of the buffer (len(b) == cap(b)), but it has space
+	// at the beginning (bPos > 0 and at least 1/4 of the buffer is empty), data inside b is shifted to the beginning.
+	// Each Read reads from b[bPos] and increments bPos if b[bPos] was fully read.
+	b [][]byte
 }
 
 // NewSegmentedBuffer allocates a ring buffer.
 func newSegmentedBuffer(initialCapacity uint32) segmentedBuffer {
-	return segmentedBuffer{cap: initialCapacity, b: make([][]byte, 0)}
+	return segmentedBuffer{cap: initialCapacity, b: make([][]byte, 0, 16)}
 }
 
 // Len is the amount of data in the receive buffer.
@@ -109,15 +115,15 @@ func (s *segmentedBuffer) GrowTo(max uint32, force bool) (bool, uint32) {
 func (s *segmentedBuffer) Read(b []byte) (int, error) {
 	s.bm.Lock()
 	defer s.bm.Unlock()
-	if len(s.b) == 0 {
+	if s.bPos == len(s.b) {
 		return 0, io.EOF
 	}
-	data := s.b[0][s.readPos:]
+	data := s.b[s.bPos][s.readPos:]
 	n := copy(b, data)
 	if n == len(data) {
-		pool.Put(s.b[0])
-		s.b[0] = nil
-		s.b = s.b[1:]
+		pool.Put(s.b[s.bPos])
+		s.b[s.bPos] = nil
+		s.bPos++
 		s.readPos = 0
 	} else {
 		s.readPos += n
@@ -152,6 +158,23 @@ func (s *segmentedBuffer) Append(input io.Reader, length uint32) error {
 	if n > 0 {
 		s.len += uint32(n)
 		s.cap -= uint32(n)
+		// s.b has no available space at the end, but has space at the beginning
+		if len(s.b) == cap(s.b) && s.bPos > 0 {
+			if s.bPos == len(s.b) {
+				// the buffer is empty, so just move pos
+				s.bPos = 0
+				s.b = s.b[:0]
+			} else if s.bPos > cap(s.b)/4 {
+				// at least 1/4 of buffer is empty, so shift data to the left to free space at the end
+				copied := copy(s.b, s.b[s.bPos:])
+				// clear references to copied data
+				for i := copied; i < len(s.b); i++ {
+					s.b[i] = nil
+				}
+				s.b = s.b[:copied]
+				s.bPos = 0
+			}
+		}
 		s.b = append(s.b, dst[0:n])
 	}
 	return err
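As a back-of-the-envelope check on the `s.bPos > cap(s.b)/4` guard in `Append` (my own arithmetic, not from the commit): with cap(s.b) == 16 and reads keeping pace with writes, an unguarded shift could fire at bPos == 1 and move 15 segment headers to reclaim a single slot on every write. With the guard, a shift waits until bPos >= 5, so one copy moves at most 11 headers and reclaims at least 5 slots — just over two moves per reclaimed slot — while a fully drained buffer (bPos == len(s.b)) is reset without copying at all.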
