diff --git a/zenq.go b/zenq.go index 449d179..af9ebd8 100644 --- a/zenq.go +++ b/zenq.go @@ -66,19 +66,21 @@ type ( // ZenQ is the CPU cache optimized ringbuffer implementation ZenQ[T any] struct { - // The padding members 1 to 5 below are here to ensure each item is on a separate cache line. + // The padding members 0 to 4 below are here to ensure each item is on a separate cache line. // This prevents false sharing and hence improves performance. - writerIndex uint32 - _p1 [constants.CacheLinePadSize - unsafe.Sizeof(uint32(0))]byte - readerIndex uint32 - _p2 [constants.CacheLinePadSize - unsafe.Sizeof(uint32(0))]byte - globalState uint32 - indexMask uint32 - contents unsafe.Pointer + _p0 cacheLinePadding + writerIndex uint32 + _p1 [constants.CacheLinePadSize - unsafe.Sizeof(uint32(0))]byte + readerIndex uint32 + _p2 [constants.CacheLinePadSize - unsafe.Sizeof(uint32(0))]byte + globalState uint32 + indexMask uint32 + strideLength uintptr + contents unsafe.Pointer // memory pool refs for storing and leasing parking spots for goroutines alloc func() any free func(any) - _p3 [constants.CacheLinePadSize - 2*unsafe.Sizeof(uint32(0)) - 2*unsafe.Sizeof(func() {}) - unsafe.Sizeof(unsafe.Pointer(nil))]byte + _p3 [constants.CacheLinePadSize - 2*unsafe.Sizeof(uint32(0)) - 2*unsafe.Sizeof(func() {}) - unsafe.Sizeof(unsafe.Pointer(nil)) - unsafe.Sizeof(uintptr(0))]byte selectFactory _p4 [constants.CacheLinePadSize - unsafe.Sizeof(selectFactory{})]byte } @@ -102,6 +104,7 @@ func New[T any](size uint32) *ZenQ[T] { contents[idx].WriteParker = NewThreadParker[T](unsafe.Pointer(n)) } zenq := &ZenQ[T]{ + strideLength: unsafe.Sizeof(slot[T]{}), contents: unsafe.Pointer(&contents[0]), alloc: parkPool.Get, free: parkPool.Put, @@ -147,7 +150,7 @@ direct_send: goto direct_send } - slot := (*slot[T])(unsafe.Pointer(uintptr(self.contents) + uintptr(atomic.AddUint32(&self.writerIndex, 1)&self.indexMask)*unsafe.Sizeof(slot[T]{}))) + slot := (*slot[T])(unsafe.Pointer(uintptr(atomic.AddUint32(&self.writerIndex, 1)&self.indexMask)*self.strideLength + uintptr(self.contents))) // CAS -> change slot_state to busy if slot_state == empty for !atomic.CompareAndSwapUint32(&slot.State, SlotEmpty, SlotBusy) { @@ -173,7 +176,7 @@ direct_send: // Read reads a value from the queue, you can once read once per object func (self *ZenQ[T]) Read() (data T, queueOpen bool) { - slot := (*slot[T])(unsafe.Pointer(uintptr(self.contents) + uintptr(atomic.AddUint32(&self.readerIndex, 1)&self.indexMask)*unsafe.Sizeof(slot[T]{}))) + slot := (*slot[T])(unsafe.Pointer(uintptr(atomic.AddUint32(&self.readerIndex, 1)&self.indexMask)*self.strideLength + uintptr(self.contents))) // CAS -> change slot_state to busy if slot_state == committed for !atomic.CompareAndSwapUint32(&slot.State, SlotCommitted, SlotBusy) { @@ -222,7 +225,7 @@ func (self *ZenQ[T]) Close() (alreadyClosedForWrites bool) { alreadyClosedForWrites = true return } - slot := (*slot[T])(unsafe.Pointer(uintptr(self.contents) + uintptr(atomic.AddUint32(&self.writerIndex, 1)&self.indexMask)*unsafe.Sizeof(slot[T]{}))) + slot := (*slot[T])(unsafe.Pointer(uintptr(atomic.AddUint32(&self.writerIndex, 1)&self.indexMask)*self.strideLength + uintptr(self.contents))) // CAS -> change slot_state to busy if slot_state == empty for !atomic.CompareAndSwapUint32(&slot.State, SlotEmpty, SlotBusy) {