diff --git a/benchmarks/cgo_test/cgobench.go b/benchmarks/cgo_test/cgobench.go new file mode 100644 index 0000000..cf46074 --- /dev/null +++ b/benchmarks/cgo_test/cgobench.go @@ -0,0 +1,93 @@ +package main + +import ( + "fmt" + "runtime" + "sync" + "unsafe" + _ "unsafe" +) + +/* +#include +*/ +import "C" + +//go:linkname noescape runtime.noescape +func noescape(p unsafe.Pointer) unsafe.Pointer + +//go:linkname memmove runtime.memmove +func memmove(to, from unsafe.Pointer, n uintptr) + +//go:linkname memclrNoHeapPointers runtime.memclrNoHeapPointers +func memclrNoHeapPointers(ptr unsafe.Pointer, n uintptr) + +//go:linkname mallocgc runtime.mallocgc +func mallocgc(size uintptr, typ unsafe.Pointer, needzero bool) unsafe.Pointer + +func alloc[T any](sample T, size uintptr) unsafe.Pointer { + length := unsafe.Sizeof(sample) * size + return mallocgc(length, nil, true) +} + +func getIndexAt[T any](ptr unsafe.Pointer, offset uintptr) unsafe.Pointer { + return unsafe.Pointer(uintptr(ptr) + offset) +} + +type block struct { + Data int + Kooky string + Endy float64 + // Last *uint +} + +func main() { + // a := make([]int32, 0, 3) + // a = append(a, 10, 20, 30) + // t := unsafe.Pointer(&a[0]) + // fmt.Println(*(*int32)(unsafe.Pointer(uintptr(t) + 2*unsafe.Sizeof(int32(0))))) + // return + + const n = uintptr(100) + t := make([]block, n, n) + k := unsafe.Pointer(&t[0]) + // k := alloc(block{Data: 1, Kooky: "2", Endy: 3.2, Last: new(uint)}, n) + // k := C.calloc(C.ulong(unsafe.Sizeof(block{})), C.ulong(n)) + // unsafe.Slice(k, n) + // memclrNoHeapPointers(k, n) + // t := (*[]block)(k) + // runtime.KeepAlive((*[n]block)(k)) + // for i := uintptr(0); i < n; i++ { + // slot := getIndexAt[block](k, i*unsafe.Sizeof(block{})) + // slot.Data = int(i) + // slot.Kooky = fmt.Sprintf("wutface%d", i) + // slot.Endy = float64(i) + // slot.Last = new(uint) + // *slot.Last = uint(i) + // } + // for i := uintptr(0); i < n; i++ { + // fmt.Printf("%#v\n", t[i]) + // } + // return + var wg sync.WaitGroup + wg.Add(int(n)) + for i := uintptr(0); i < n; i++ { + slot := unsafe.Pointer(uintptr(k) + i*unsafe.Sizeof(block{})) + (*block)(slot).Data = int(i) + (*block)(slot).Kooky = fmt.Sprintf("wutface%d", i) + (*block)(slot).Endy = float64(i) + // (*block)(slot).Last = new(uint) + // *(*block)(slot).Last = uint(i) + } + for i := uintptr(0); i < n; i++ { + j := i + go func() { + slot := unsafe.Pointer(uintptr(k) + j*unsafe.Sizeof(block{})) + // *(*block)(slot).Last++ + fmt.Println(uintptr(slot), " ", *(*block)(slot)) + runtime.GC() + wg.Done() + }() + } + wg.Wait() +} diff --git a/zenq.go b/zenq.go index 88e355a..449d179 100644 --- a/zenq.go +++ b/zenq.go @@ -74,14 +74,13 @@ type ( _p2 [constants.CacheLinePadSize - unsafe.Sizeof(uint32(0))]byte globalState uint32 indexMask uint32 + contents unsafe.Pointer // memory pool refs for storing and leasing parking spots for goroutines alloc func() any free func(any) - _p3 [constants.CacheLinePadSize - 2*unsafe.Sizeof(uint32(0)) - 2*unsafe.Sizeof(func() {})]byte + _p3 [constants.CacheLinePadSize - 2*unsafe.Sizeof(uint32(0)) - 2*unsafe.Sizeof(func() {}) - unsafe.Sizeof(unsafe.Pointer(nil))]byte selectFactory - _p4 [constants.CacheLinePadSize - unsafe.Sizeof(selectFactory{})]byte - contents []*slot[T] - _p5 cacheLinePadding + _p4 [constants.CacheLinePadSize - unsafe.Sizeof(selectFactory{})]byte } ) @@ -94,16 +93,16 @@ func nextGreaterPowerOf2(val uint32) uint32 { func New[T any](size uint32) *ZenQ[T] { var ( queueSize uint32 = nextGreaterPowerOf2(size) - contents = make([]*slot[T], queueSize, queueSize) + contents = make([]slot[T], queueSize, queueSize) parkPool = sync.Pool{New: func() any { return new(parkSpot[T]) }} ) for idx := uint32(0); idx < queueSize; idx++ { n := parkPool.Get().(*parkSpot[T]) n.threadPtr, n.next = nil, nil - contents[idx] = &slot[T]{WriteParker: NewThreadParker[T](unsafe.Pointer(n))} + contents[idx].WriteParker = NewThreadParker[T](unsafe.Pointer(n)) } zenq := &ZenQ[T]{ - contents: contents, + contents: unsafe.Pointer(&contents[0]), alloc: parkPool.Get, free: parkPool.Put, selectFactory: selectFactory{waitList: NewList()}, @@ -148,7 +147,7 @@ direct_send: goto direct_send } - slot := self.contents[atomic.AddUint32(&self.writerIndex, 1)&self.indexMask] + slot := (*slot[T])(unsafe.Pointer(uintptr(self.contents) + uintptr(atomic.AddUint32(&self.writerIndex, 1)&self.indexMask)*unsafe.Sizeof(slot[T]{}))) // CAS -> change slot_state to busy if slot_state == empty for !atomic.CompareAndSwapUint32(&slot.State, SlotEmpty, SlotBusy) { @@ -174,7 +173,7 @@ direct_send: // Read reads a value from the queue, you can once read once per object func (self *ZenQ[T]) Read() (data T, queueOpen bool) { - slot := self.contents[atomic.AddUint32(&self.readerIndex, 1)&self.indexMask] + slot := (*slot[T])(unsafe.Pointer(uintptr(self.contents) + uintptr(atomic.AddUint32(&self.readerIndex, 1)&self.indexMask)*unsafe.Sizeof(slot[T]{}))) // CAS -> change slot_state to busy if slot_state == committed for !atomic.CompareAndSwapUint32(&slot.State, SlotCommitted, SlotBusy) { @@ -223,7 +222,7 @@ func (self *ZenQ[T]) Close() (alreadyClosedForWrites bool) { alreadyClosedForWrites = true return } - slot := self.contents[atomic.AddUint32(&self.writerIndex, 1)&self.indexMask] + slot := (*slot[T])(unsafe.Pointer(uintptr(self.contents) + uintptr(atomic.AddUint32(&self.writerIndex, 1)&self.indexMask)*unsafe.Sizeof(slot[T]{}))) // CAS -> change slot_state to busy if slot_state == empty for !atomic.CompareAndSwapUint32(&slot.State, SlotEmpty, SlotBusy) { @@ -291,9 +290,9 @@ func (self *ZenQ[T]) Reset() { // Unsafe to be called from multiple goroutines func (self *ZenQ[T]) Dump() { fmt.Printf("writerIndex: %3d, readerIndex: %3d\n contents:-\n\n", self.writerIndex, self.readerIndex) - for idx := range self.contents { - fmt.Printf("%5v : State -> %5v, Item -> %5v\n", idx, self.contents[idx].State, self.contents[idx].Item) - } + // for idx := range self.contents { + // fmt.Printf("%5v : State -> %5v, Item -> %5v\n", idx, self.contents[idx].State, self.contents[idx].Item) + // } } // selectSender is an auxillary thread which remains parked by default