Skip to content

Commit

Permalink
use better cache line pads
Browse files Browse the repository at this point in the history
  • Loading branch information
alphadose committed Jul 26, 2022
1 parent d98439f commit 7fa4e6c
Showing 1 changed file with 15 additions and 12 deletions.
27 changes: 15 additions & 12 deletions zenq.go
Original file line number Diff line number Diff line change
Expand Up @@ -66,19 +66,21 @@ type (

// ZenQ is the CPU cache optimized ringbuffer implementation
ZenQ[T any] struct {
// The padding members 1 to 5 below are here to ensure each item is on a separate cache line.
// The padding members 0 to 4 below are here to ensure each item is on a separate cache line.
// This prevents false sharing and hence improves performance.
writerIndex uint32
_p1 [constants.CacheLinePadSize - unsafe.Sizeof(uint32(0))]byte
readerIndex uint32
_p2 [constants.CacheLinePadSize - unsafe.Sizeof(uint32(0))]byte
globalState uint32
indexMask uint32
contents unsafe.Pointer
_p0 cacheLinePadding
writerIndex uint32
_p1 [constants.CacheLinePadSize - unsafe.Sizeof(uint32(0))]byte
readerIndex uint32
_p2 [constants.CacheLinePadSize - unsafe.Sizeof(uint32(0))]byte
globalState uint32
indexMask uint32
strideLength uintptr
contents unsafe.Pointer
// memory pool refs for storing and leasing parking spots for goroutines
alloc func() any
free func(any)
_p3 [constants.CacheLinePadSize - 2*unsafe.Sizeof(uint32(0)) - 2*unsafe.Sizeof(func() {}) - unsafe.Sizeof(unsafe.Pointer(nil))]byte
_p3 [constants.CacheLinePadSize - 2*unsafe.Sizeof(uint32(0)) - 2*unsafe.Sizeof(func() {}) - unsafe.Sizeof(unsafe.Pointer(nil)) - unsafe.Sizeof(uintptr(0))]byte
selectFactory
_p4 [constants.CacheLinePadSize - unsafe.Sizeof(selectFactory{})]byte
}
Expand All @@ -102,6 +104,7 @@ func New[T any](size uint32) *ZenQ[T] {
contents[idx].WriteParker = NewThreadParker[T](unsafe.Pointer(n))
}
zenq := &ZenQ[T]{
strideLength: unsafe.Sizeof(slot[T]{}),
contents: unsafe.Pointer(&contents[0]),
alloc: parkPool.Get,
free: parkPool.Put,
Expand Down Expand Up @@ -147,7 +150,7 @@ direct_send:
goto direct_send
}

slot := (*slot[T])(unsafe.Pointer(uintptr(self.contents) + uintptr(atomic.AddUint32(&self.writerIndex, 1)&self.indexMask)*unsafe.Sizeof(slot[T]{})))
slot := (*slot[T])(unsafe.Pointer(uintptr(atomic.AddUint32(&self.writerIndex, 1)&self.indexMask)*self.strideLength + uintptr(self.contents)))

// CAS -> change slot_state to busy if slot_state == empty
for !atomic.CompareAndSwapUint32(&slot.State, SlotEmpty, SlotBusy) {
Expand All @@ -173,7 +176,7 @@ direct_send:

// Read reads a value from the queue, you can once read once per object
func (self *ZenQ[T]) Read() (data T, queueOpen bool) {
slot := (*slot[T])(unsafe.Pointer(uintptr(self.contents) + uintptr(atomic.AddUint32(&self.readerIndex, 1)&self.indexMask)*unsafe.Sizeof(slot[T]{})))
slot := (*slot[T])(unsafe.Pointer(uintptr(atomic.AddUint32(&self.readerIndex, 1)&self.indexMask)*self.strideLength + uintptr(self.contents)))

// CAS -> change slot_state to busy if slot_state == committed
for !atomic.CompareAndSwapUint32(&slot.State, SlotCommitted, SlotBusy) {
Expand Down Expand Up @@ -222,7 +225,7 @@ func (self *ZenQ[T]) Close() (alreadyClosedForWrites bool) {
alreadyClosedForWrites = true
return
}
slot := (*slot[T])(unsafe.Pointer(uintptr(self.contents) + uintptr(atomic.AddUint32(&self.writerIndex, 1)&self.indexMask)*unsafe.Sizeof(slot[T]{})))
slot := (*slot[T])(unsafe.Pointer(uintptr(atomic.AddUint32(&self.writerIndex, 1)&self.indexMask)*self.strideLength + uintptr(self.contents)))

// CAS -> change slot_state to busy if slot_state == empty
for !atomic.CompareAndSwapUint32(&slot.State, SlotEmpty, SlotBusy) {
Expand Down

0 comments on commit 7fa4e6c

Please sign in to comment.