diff --git a/README.md b/README.md
index 999c236..45baa4f 100644
--- a/README.md
+++ b/README.md
@@ -1,6 +1,8 @@
 # ZenQ
 
-> A low-latency thread-safe queue in golang implemented using a lock-free ringbuffer
+> A low-latency thread-safe queue in golang implemented using a lock-free ringbuffer and runtime internals
+
+Based on the [LMAX Disruptor Pattern](https://lmax-exchange.github.io/disruptor/disruptor.html)
 
 ## Features
 
diff --git a/lib_runtime_linkage.go b/lib_runtime_linkage.go
index 550f108..e1c2181 100644
--- a/lib_runtime_linkage.go
+++ b/lib_runtime_linkage.go
@@ -8,9 +8,9 @@ import (
 	"github.com/alphadose/zenq/v2/constants"
 )
 
-const cacheLinePadSize = constants.CacheLinePadSize
-
-type cacheLinePadding struct{ _ [cacheLinePadSize]byte }
+type cacheLinePadding struct {
+	_ [constants.CacheLinePadSize]byte
+}
 
 // Linking ZenQ with golang internal runtime library to allow usage of scheduling primitives
 // like goready(), mcall() etc to allow low-level scheduling of goroutines
diff --git a/zenq.go b/zenq.go
index 13f7c85..88e355a 100644
--- a/zenq.go
+++ b/zenq.go
@@ -1,15 +1,16 @@
-// A minimalist thread-safe queue implemented using a lock-free ringbuffer which is even faster
-// and more resource friendly than golang's native channels
+// A minimalist thread-safe queue implemented using a lock-free ringbuffer which is faster
+// and allocates less memory than golang's native channels
+// Based on the LMAX disruptor pattern https://lmax-exchange.github.io/disruptor/disruptor.html
 
 // Known Limitations:-
 //
-// 1. Current max queue_size = 2^31
-// 2. The size of the queue must be a power of 2
+// 1. Max queue_size = 2^31
+// 2. The queue_size must be a power of 2; if a different size is provided, it is rounded up to the next greater power of 2
 
 // Suggestions:-
 //
-// 1. Use runtime.LockOSThread() on the goroutine calling ZenQ.Read() for best performance provided you have > 1 cpu cores
-// 2. Use large queue sizes (>= 2^14) in SPSC mode for best gains
+// 1. Use runtime.LockOSThread() on the goroutine calling ZenQ.Read() for the lowest latency, provided you have more than one CPU core
+//
 
 package zenq
 
@@ -19,6 +20,8 @@ import (
 	"sync"
 	"sync/atomic"
 	"unsafe"
+
+	"github.com/alphadose/zenq/v2/constants"
 )
 
 // ZenQ global state enums
@@ -48,13 +51,13 @@
 )
 
 type (
-	Slot[T any] struct {
+	slot[T any] struct {
 		State       uint32
 		WriteParker *ThreadParker[T]
 		Item        T
 	}
 
-	SelectFactory struct {
+	selectFactory struct {
 		state     uint32
 		auxThread unsafe.Pointer
 		backlog   unsafe.Pointer
@@ -66,44 +69,44 @@ type (
 		// The padding members 1 to 5 below are here to ensure each item is on a separate cache line.
 		// This prevents false sharing and hence improves performance.
 		writerIndex uint32
-		_p1         [cacheLinePadSize - unsafe.Sizeof(uint32(0))]byte
+		_p1         [constants.CacheLinePadSize - unsafe.Sizeof(uint32(0))]byte
 		readerIndex uint32
-		_p2         [cacheLinePadSize - unsafe.Sizeof(uint32(0))]byte
+		_p2         [constants.CacheLinePadSize - unsafe.Sizeof(uint32(0))]byte
 		globalState uint32
 		indexMask   uint32
 		// memory pool refs for storing and leasing parking spots for goroutines
 		alloc func() any
 		free  func(any)
-		_p3   [cacheLinePadSize - 2*unsafe.Sizeof(uint32(0)) - 2*unsafe.Sizeof(func() {})]byte
-		SelectFactory
-		_p4      [cacheLinePadSize - unsafe.Sizeof(SelectFactory{})]byte
-		contents []*Slot[T]
+		_p3   [constants.CacheLinePadSize - 2*unsafe.Sizeof(uint32(0)) - 2*unsafe.Sizeof(func() {})]byte
+		selectFactory
+		_p4      [constants.CacheLinePadSize - unsafe.Sizeof(selectFactory{})]byte
+		contents []*slot[T]
 		_p5      cacheLinePadding
 	}
 )
 
 // returns the next greater power of 2 relative to val
 func nextGreaterPowerOf2(val uint32) uint32 {
-	return 1 << int32(math.Min(math.Ceil(Fastlog2(math.Max(float64(val), 1))), 31))
+	return 1 << uint32(math.Min(math.Ceil(Fastlog2(math.Max(float64(val), 1))), 31))
 }
 
 // New returns a new queue given its payload type passed as a generic parameter
 func New[T any](size uint32) *ZenQ[T] {
 	var (
 		queueSize uint32 = nextGreaterPowerOf2(size)
-		contents         = make([]*Slot[T], queueSize, queueSize)
+		contents         = make([]*slot[T], queueSize, queueSize)
 		parkPool         = sync.Pool{New: func() any { return new(parkSpot[T]) }}
 	)
 	for idx := uint32(0); idx < queueSize; idx++ {
 		n := parkPool.Get().(*parkSpot[T])
 		n.threadPtr, n.next = nil, nil
-		contents[idx] = &Slot[T]{WriteParker: NewThreadParker[T](unsafe.Pointer(n))}
+		contents[idx] = &slot[T]{WriteParker: NewThreadParker[T](unsafe.Pointer(n))}
 	}
 	zenq := &ZenQ[T]{
 		contents:      contents,
 		alloc:         parkPool.Get,
 		free:          parkPool.Put,
-		SelectFactory: SelectFactory{waitList: NewList()},
+		selectFactory: selectFactory{waitList: NewList()},
 		indexMask:     queueSize - 1,
 	}
 	go zenq.selectSender()
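Reviewer note on the padding fields in `ZenQ[T]` above: each hot field is followed by a filler byte array that rounds it up to a full cache line, so a producer writing `writerIndex` never dirties the cache line holding `readerIndex` (false sharing). Below is a minimal, self-contained sketch of the same layout trick; it assumes a 64-byte cache line, whereas the real value comes from `constants.CacheLinePadSize` and is platform-dependent.

```go
package main

import (
	"fmt"
	"unsafe"
)

// Assumed stand-in for constants.CacheLinePadSize; 64 bytes is
// typical on x86-64 and most arm64 CPUs, but not guaranteed.
const cacheLinePadSize = 64

// paddedIndices mirrors the writerIndex/readerIndex layout in ZenQ[T]:
// the filler arrays push each index onto its own cache line.
type paddedIndices struct {
	writerIndex uint32
	_p1         [cacheLinePadSize - unsafe.Sizeof(uint32(0))]byte
	readerIndex uint32
	_p2         [cacheLinePadSize - unsafe.Sizeof(uint32(0))]byte
}

func main() {
	var p paddedIndices
	fmt.Println(unsafe.Offsetof(p.writerIndex)) // 0
	fmt.Println(unsafe.Offsetof(p.readerIndex)) // 64: a separate cache line
	fmt.Println(unsafe.Sizeof(p))               // 128
}
```

The same arithmetic explains `_p4`: the embedded `selectFactory` plus its padding must fill out whole cache lines, hence the `unsafe.Sizeof(selectFactory{})` subtraction.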
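The power-of-2 rounding in `nextGreaterPowerOf2` is what makes `indexMask = queueSize - 1` valid: for a power-of-2 capacity, `index & indexMask` is a branch-free substitute for `index % queueSize` when wrapping around the ring. Here is a sketch of the same rounding and masking, written with `math/bits` instead of the package's internal `Fastlog2` helper; the function name `nextPow2` is illustrative, not part of the library.

```go
package main

import (
	"fmt"
	"math/bits"
)

// nextPow2 rounds val up to the next power of 2, capped at 2^31,
// matching the behaviour documented for nextGreaterPowerOf2.
func nextPow2(val uint32) uint32 {
	if val <= 1 {
		return 1
	}
	if val > 1<<31 {
		return 1 << 31
	}
	return 1 << bits.Len32(val-1)
}

func main() {
	for _, size := range []uint32{1, 3, 1000, 1 << 14} {
		queueSize := nextPow2(size)
		indexMask := queueSize - 1
		// Wrapping a raw index into the ring is a single AND.
		fmt.Printf("size=%d queueSize=%d index 70000 -> slot %d\n",
			size, queueSize, 70000&indexMask)
	}
}
```

This context also motivates the `int32` to `uint32` cast fix in the shift expression: a signed shift count is legal in modern Go, but the unsigned cast better matches the value's meaning, a bit count in [0, 31].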