Skip to content

Commit

Permalink
refactor
Browse files Browse the repository at this point in the history
  • Loading branch information
alphadose committed Jul 25, 2022
1 parent 9de5a3a commit 00197d6
Show file tree
Hide file tree
Showing 3 changed files with 27 additions and 22 deletions.
4 changes: 3 additions & 1 deletion README.md
Original file line number Diff line number Diff line change
@@ -1,6 +1,8 @@
# ZenQ

> A low-latency thread-safe queue in golang implemented using a lock-free ringbuffer
> A low-latency thread-safe queue in golang implemented using a lock-free ringbuffer and runtime internals
Based on the [LMAX Disruptor Pattern](https://lmax-exchange.github.io/disruptor/disruptor.html)

## Features

Expand Down
6 changes: 3 additions & 3 deletions lib_runtime_linkage.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,9 +8,9 @@ import (
"github.com/alphadose/zenq/v2/constants"
)

const cacheLinePadSize = constants.CacheLinePadSize

type cacheLinePadding struct{ _ [cacheLinePadSize]byte }
type cacheLinePadding struct {
_ [constants.CacheLinePadSize]byte
}

// Linking ZenQ with golang internal runtime library to allow usage of scheduling primitives
// like goready(), mcall() etc to allow low-level scheduling of goroutines
Expand Down
39 changes: 21 additions & 18 deletions zenq.go
Original file line number Diff line number Diff line change
@@ -1,15 +1,16 @@
// A minimalist thread-safe queue implemented using a lock-free ringbuffer which is even faster
// and more resource friendly than golang's native channels
// A minimalist thread-safe queue implemented using a lock-free ringbuffer which is faster
// and has lower memory allocations than golang's native channels
// Based on the LMAX disruptor pattern https://lmax-exchange.github.io/disruptor/disruptor.html

// Known Limitations:-
//
// 1. Current max queue_size = 2^31
// 2. The size of the queue must be a power of 2
// 1. Max queue_size = 2^31
// 2. The queue_size is a power of 2, in case a different size is provided then queue_size is rounded up to the next greater power of 2

// Suggestions:-
//
// 1. Use runtime.LockOSThread() on the goroutine calling ZenQ.Read() for best performance provided you have > 1 cpu cores
// 2. Use large queue sizes (>= 2^14) in SPSC mode for best gains
// 1. Use runtime.LockOSThread() on the goroutine calling ZenQ.Read() for lowest latency provided you have > 1 cpu cores
//

package zenq

Expand All @@ -19,6 +20,8 @@ import (
"sync"
"sync/atomic"
"unsafe"

"github.com/alphadose/zenq/v2/constants"
)

// ZenQ global state enums
Expand Down Expand Up @@ -48,13 +51,13 @@ const (
)

type (
Slot[T any] struct {
slot[T any] struct {
State uint32
WriteParker *ThreadParker[T]
Item T
}

SelectFactory struct {
selectFactory struct {
state uint32
auxThread unsafe.Pointer
backlog unsafe.Pointer
Expand All @@ -66,44 +69,44 @@ type (
// The padding members 1 to 5 below are here to ensure each item is on a separate cache line.
// This prevents false sharing and hence improves performance.
writerIndex uint32
_p1 [cacheLinePadSize - unsafe.Sizeof(uint32(0))]byte
_p1 [constants.CacheLinePadSize - unsafe.Sizeof(uint32(0))]byte
readerIndex uint32
_p2 [cacheLinePadSize - unsafe.Sizeof(uint32(0))]byte
_p2 [constants.CacheLinePadSize - unsafe.Sizeof(uint32(0))]byte
globalState uint32
indexMask uint32
// memory pool refs for storing and leasing parking spots for goroutines
alloc func() any
free func(any)
_p3 [cacheLinePadSize - 2*unsafe.Sizeof(uint32(0)) - 2*unsafe.Sizeof(func() {})]byte
SelectFactory
_p4 [cacheLinePadSize - unsafe.Sizeof(SelectFactory{})]byte
contents []*Slot[T]
_p3 [constants.CacheLinePadSize - 2*unsafe.Sizeof(uint32(0)) - 2*unsafe.Sizeof(func() {})]byte
selectFactory
_p4 [constants.CacheLinePadSize - unsafe.Sizeof(selectFactory{})]byte
contents []*slot[T]
_p5 cacheLinePadding
}
)

// returns the next greater power of 2 relative to val
func nextGreaterPowerOf2(val uint32) uint32 {
return 1 << int32(math.Min(math.Ceil(Fastlog2(math.Max(float64(val), 1))), 31))
return 1 << uint32(math.Min(math.Ceil(Fastlog2(math.Max(float64(val), 1))), 31))
}

// New returns a new queue given its payload type passed as a generic parameter
func New[T any](size uint32) *ZenQ[T] {
var (
queueSize uint32 = nextGreaterPowerOf2(size)
contents = make([]*Slot[T], queueSize, queueSize)
contents = make([]*slot[T], queueSize, queueSize)
parkPool = sync.Pool{New: func() any { return new(parkSpot[T]) }}
)
for idx := uint32(0); idx < queueSize; idx++ {
n := parkPool.Get().(*parkSpot[T])
n.threadPtr, n.next = nil, nil
contents[idx] = &Slot[T]{WriteParker: NewThreadParker[T](unsafe.Pointer(n))}
contents[idx] = &slot[T]{WriteParker: NewThreadParker[T](unsafe.Pointer(n))}
}
zenq := &ZenQ[T]{
contents: contents,
alloc: parkPool.Get,
free: parkPool.Put,
SelectFactory: SelectFactory{waitList: NewList()},
selectFactory: selectFactory{waitList: NewList()},
indexMask: queueSize - 1,
}
go zenq.selectSender()
Expand Down

0 comments on commit 00197d6

Please sign in to comment.