Skip to content

Commit

Permalink
Revert "remove allocator"
Browse files Browse the repository at this point in the history
This reverts commit f893f03.
  • Loading branch information
xtaci committed Dec 18, 2019
1 parent f893f03 commit 3627305
Show file tree
Hide file tree
Showing 4 changed files with 154 additions and 1 deletion.
69 changes: 69 additions & 0 deletions alloc.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,69 @@
package smux

import (
"sync"

"github.com/pkg/errors"
)

var defaultAllocator *Allocator

func init() {
defaultAllocator = NewAllocator()
}

// Allocator for incoming frames, optimized to prevent overwriting after zeroing
type Allocator struct {
buffers []sync.Pool
}

// NewAllocator initiates a []byte allocator for frames less than 65536 bytes,
// the waste(memory fragmentation) of space allocation is guaranteed to be
// no more than 50%.
func NewAllocator() *Allocator {
alloc := new(Allocator)
alloc.buffers = make([]sync.Pool, 17) // 1B -> 64K
for k := range alloc.buffers {
i := k
alloc.buffers[k].New = func() interface{} {
return make([]byte, 1<<uint32(i))
}
}
return alloc
}

// Get a []byte from pool with most appropriate cap
func (alloc *Allocator) Get(size int) []byte {
if size <= 0 || size > 65536 {
return nil
}

bits := msb(size)
if size == 1<<bits {
return alloc.buffers[bits].Get().([]byte)[:size]
} else {
return alloc.buffers[bits+1].Get().([]byte)[:size]
}
}

// Put returns a []byte to pool for future use,
// which the cap must be exactly 2^n
func (alloc *Allocator) Put(buf []byte) error {
bits := msb(cap(buf))
if cap(buf) == 0 || cap(buf) > 65536 || cap(buf) != 1<<bits {
return errors.New("allocator Put() incorrect buffer size")
}
alloc.buffers[bits].Put(buf)
return nil
}

// msb return the pos of most significiant bit
func msb(size int) uint16 {
var pos uint16
size >>= 1
for size > 0 {
size >>= 1
pos++
}
return pos
}
75 changes: 75 additions & 0 deletions alloc_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,75 @@
package smux

import (
"math/rand"
"testing"
)

func TestAllocGet(t *testing.T) {
alloc := NewAllocator()
if alloc.Get(0) != nil {
t.Fatal(0)
}
if len(alloc.Get(1)) != 1 {
t.Fatal(1)
}
if len(alloc.Get(2)) != 2 {
t.Fatal(2)
}
if len(alloc.Get(3)) != 3 || cap(alloc.Get(3)) != 4 {
t.Fatal(3)
}
if len(alloc.Get(4)) != 4 {
t.Fatal(4)
}
if len(alloc.Get(1023)) != 1023 || cap(alloc.Get(1023)) != 1024 {
t.Fatal(1023)
}
if len(alloc.Get(1024)) != 1024 {
t.Fatal(1024)
}
if len(alloc.Get(65536)) != 65536 {
t.Fatal(65536)
}
if alloc.Get(65537) != nil {
t.Fatal(65537)
}
}

func TestAllocPut(t *testing.T) {
alloc := NewAllocator()
if err := alloc.Put(nil); err == nil {
t.Fatal("put nil misbehavior")
}
if err := alloc.Put(make([]byte, 3, 3)); err == nil {
t.Fatal("put elem:3 []bytes misbehavior")
}
if err := alloc.Put(make([]byte, 4, 4)); err != nil {
t.Fatal("put elem:4 []bytes misbehavior")
}
if err := alloc.Put(make([]byte, 1023, 1024)); err != nil {
t.Fatal("put elem:1024 []bytes misbehavior")
}
if err := alloc.Put(make([]byte, 65536, 65536)); err != nil {
t.Fatal("put elem:65536 []bytes misbehavior")
}
if err := alloc.Put(make([]byte, 65537, 65537)); err == nil {
t.Fatal("put elem:65537 []bytes misbehavior")
}
}

func TestAllocPutThenGet(t *testing.T) {
alloc := NewAllocator()
data := alloc.Get(4)
alloc.Put(data)
newData := alloc.Get(4)
if cap(data) != cap(newData) {
t.Fatal("different cap while alloc.Get()")
}
}

func BenchmarkMSB(b *testing.B) {
for i := 0; i < b.N; i++ {
msb(rand.Int())
}
}
2 changes: 1 addition & 1 deletion session.go
Original file line number Diff line number Diff line change
Expand Up @@ -340,7 +340,7 @@ func (s *Session) recvLoop() {
s.streamLock.Unlock()
case cmdPSH:
if hdr.Length() > 0 {
newbuf := make([]byte, hdr.Length())
newbuf := defaultAllocator.Get(int(hdr.Length()))
if written, err := io.ReadFull(s.conn, newbuf); err == nil {
s.streamLock.Lock()
if stream, ok := s.streams[sid]; ok {
Expand Down
9 changes: 9 additions & 0 deletions stream.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ type Stream struct {
sess *Session

buffers [][]byte
heads [][]byte // slice heads kept for recycle

bufferLock sync.Mutex
frameSize int
Expand Down Expand Up @@ -70,6 +71,9 @@ func (s *Stream) Read(b []byte) (n int, err error) {
if len(s.buffers[0]) == 0 {
s.buffers[0] = nil
s.buffers = s.buffers[1:]
// full recycle
defaultAllocator.Put(s.heads[0])
s.heads = s.heads[1:]
}
}
s.bufferLock.Unlock()
Expand All @@ -93,11 +97,13 @@ func (s *Stream) WriteTo(w io.Writer) (n int64, err error) {
if len(s.buffers) > 0 {
buf = s.buffers[0]
s.buffers = s.buffers[1:]
s.heads = s.heads[1:]
}
s.bufferLock.Unlock()

if buf != nil {
nw, ew := w.Write(buf)
defaultAllocator.Put(buf)
s.sess.returnTokens(len(buf))
if nw > 0 {
n += int64(nw)
Expand Down Expand Up @@ -256,6 +262,7 @@ func (s *Stream) RemoteAddr() net.Addr {
func (s *Stream) pushBytes(buf []byte) (written int, err error) {
s.bufferLock.Lock()
s.buffers = append(s.buffers, buf)
s.heads = append(s.heads, buf)
s.bufferLock.Unlock()
return
}
Expand All @@ -265,8 +272,10 @@ func (s *Stream) recycleTokens() (n int) {
s.bufferLock.Lock()
for k := range s.buffers {
n += len(s.buffers[k])
defaultAllocator.Put(s.heads[k])
}
s.buffers = nil
s.heads = nil
s.bufferLock.Unlock()
return
}
Expand Down

0 comments on commit 3627305

Please sign in to comment.