Skip to content

Commit

Permalink
switch to pointer arithmetic
Browse files Browse the repository at this point in the history
  • Loading branch information
alphadose committed Jul 26, 2022
1 parent 00197d6 commit d98439f
Show file tree
Hide file tree
Showing 2 changed files with 105 additions and 13 deletions.
93 changes: 93 additions & 0 deletions benchmarks/cgo_test/cgobench.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,93 @@
package main

import (
"fmt"
"runtime"
"sync"
"unsafe"
_ "unsafe"
)

/*
#include <stdlib.h>
*/
import "C"

//go:linkname noescape runtime.noescape
func noescape(p unsafe.Pointer) unsafe.Pointer

//go:linkname memmove runtime.memmove
func memmove(to, from unsafe.Pointer, n uintptr)

//go:linkname memclrNoHeapPointers runtime.memclrNoHeapPointers
func memclrNoHeapPointers(ptr unsafe.Pointer, n uintptr)

//go:linkname mallocgc runtime.mallocgc
func mallocgc(size uintptr, typ unsafe.Pointer, needzero bool) unsafe.Pointer

func alloc[T any](sample T, size uintptr) unsafe.Pointer {
length := unsafe.Sizeof(sample) * size
return mallocgc(length, nil, true)
}

func getIndexAt[T any](ptr unsafe.Pointer, offset uintptr) unsafe.Pointer {
return unsafe.Pointer(uintptr(ptr) + offset)
}

type block struct {
Data int
Kooky string
Endy float64
// Last *uint
}

func main() {
// a := make([]int32, 0, 3)
// a = append(a, 10, 20, 30)
// t := unsafe.Pointer(&a[0])
// fmt.Println(*(*int32)(unsafe.Pointer(uintptr(t) + 2*unsafe.Sizeof(int32(0)))))
// return

const n = uintptr(100)
t := make([]block, n, n)
k := unsafe.Pointer(&t[0])
// k := alloc(block{Data: 1, Kooky: "2", Endy: 3.2, Last: new(uint)}, n)
// k := C.calloc(C.ulong(unsafe.Sizeof(block{})), C.ulong(n))
// unsafe.Slice(k, n)
// memclrNoHeapPointers(k, n)
// t := (*[]block)(k)
// runtime.KeepAlive((*[n]block)(k))
// for i := uintptr(0); i < n; i++ {
// slot := getIndexAt[block](k, i*unsafe.Sizeof(block{}))
// slot.Data = int(i)
// slot.Kooky = fmt.Sprintf("wutface%d", i)
// slot.Endy = float64(i)
// slot.Last = new(uint)
// *slot.Last = uint(i)
// }
// for i := uintptr(0); i < n; i++ {
// fmt.Printf("%#v\n", t[i])
// }
// return
var wg sync.WaitGroup
wg.Add(int(n))
for i := uintptr(0); i < n; i++ {
slot := unsafe.Pointer(uintptr(k) + i*unsafe.Sizeof(block{}))
(*block)(slot).Data = int(i)
(*block)(slot).Kooky = fmt.Sprintf("wutface%d", i)
(*block)(slot).Endy = float64(i)
// (*block)(slot).Last = new(uint)
// *(*block)(slot).Last = uint(i)
}
for i := uintptr(0); i < n; i++ {
j := i
go func() {
slot := unsafe.Pointer(uintptr(k) + j*unsafe.Sizeof(block{}))
// *(*block)(slot).Last++
fmt.Println(uintptr(slot), " ", *(*block)(slot))
runtime.GC()
wg.Done()
}()
}
wg.Wait()
}
25 changes: 12 additions & 13 deletions zenq.go
Original file line number Diff line number Diff line change
Expand Up @@ -74,14 +74,13 @@ type (
_p2 [constants.CacheLinePadSize - unsafe.Sizeof(uint32(0))]byte
globalState uint32
indexMask uint32
contents unsafe.Pointer
// memory pool refs for storing and leasing parking spots for goroutines
alloc func() any
free func(any)
_p3 [constants.CacheLinePadSize - 2*unsafe.Sizeof(uint32(0)) - 2*unsafe.Sizeof(func() {})]byte
_p3 [constants.CacheLinePadSize - 2*unsafe.Sizeof(uint32(0)) - 2*unsafe.Sizeof(func() {}) - unsafe.Sizeof(unsafe.Pointer(nil))]byte
selectFactory
_p4 [constants.CacheLinePadSize - unsafe.Sizeof(selectFactory{})]byte
contents []*slot[T]
_p5 cacheLinePadding
_p4 [constants.CacheLinePadSize - unsafe.Sizeof(selectFactory{})]byte
}
)

Expand All @@ -94,16 +93,16 @@ func nextGreaterPowerOf2(val uint32) uint32 {
func New[T any](size uint32) *ZenQ[T] {
var (
queueSize uint32 = nextGreaterPowerOf2(size)
contents = make([]*slot[T], queueSize, queueSize)
contents = make([]slot[T], queueSize, queueSize)
parkPool = sync.Pool{New: func() any { return new(parkSpot[T]) }}
)
for idx := uint32(0); idx < queueSize; idx++ {
n := parkPool.Get().(*parkSpot[T])
n.threadPtr, n.next = nil, nil
contents[idx] = &slot[T]{WriteParker: NewThreadParker[T](unsafe.Pointer(n))}
contents[idx].WriteParker = NewThreadParker[T](unsafe.Pointer(n))
}
zenq := &ZenQ[T]{
contents: contents,
contents: unsafe.Pointer(&contents[0]),
alloc: parkPool.Get,
free: parkPool.Put,
selectFactory: selectFactory{waitList: NewList()},
Expand Down Expand Up @@ -148,7 +147,7 @@ direct_send:
goto direct_send
}

slot := self.contents[atomic.AddUint32(&self.writerIndex, 1)&self.indexMask]
slot := (*slot[T])(unsafe.Pointer(uintptr(self.contents) + uintptr(atomic.AddUint32(&self.writerIndex, 1)&self.indexMask)*unsafe.Sizeof(slot[T]{})))

// CAS -> change slot_state to busy if slot_state == empty
for !atomic.CompareAndSwapUint32(&slot.State, SlotEmpty, SlotBusy) {
Expand All @@ -174,7 +173,7 @@ direct_send:

// Read reads a value from the queue, you can once read once per object
func (self *ZenQ[T]) Read() (data T, queueOpen bool) {
slot := self.contents[atomic.AddUint32(&self.readerIndex, 1)&self.indexMask]
slot := (*slot[T])(unsafe.Pointer(uintptr(self.contents) + uintptr(atomic.AddUint32(&self.readerIndex, 1)&self.indexMask)*unsafe.Sizeof(slot[T]{})))

// CAS -> change slot_state to busy if slot_state == committed
for !atomic.CompareAndSwapUint32(&slot.State, SlotCommitted, SlotBusy) {
Expand Down Expand Up @@ -223,7 +222,7 @@ func (self *ZenQ[T]) Close() (alreadyClosedForWrites bool) {
alreadyClosedForWrites = true
return
}
slot := self.contents[atomic.AddUint32(&self.writerIndex, 1)&self.indexMask]
slot := (*slot[T])(unsafe.Pointer(uintptr(self.contents) + uintptr(atomic.AddUint32(&self.writerIndex, 1)&self.indexMask)*unsafe.Sizeof(slot[T]{})))

// CAS -> change slot_state to busy if slot_state == empty
for !atomic.CompareAndSwapUint32(&slot.State, SlotEmpty, SlotBusy) {
Expand Down Expand Up @@ -291,9 +290,9 @@ func (self *ZenQ[T]) Reset() {
// Unsafe to be called from multiple goroutines
func (self *ZenQ[T]) Dump() {
fmt.Printf("writerIndex: %3d, readerIndex: %3d\n contents:-\n\n", self.writerIndex, self.readerIndex)
for idx := range self.contents {
fmt.Printf("%5v : State -> %5v, Item -> %5v\n", idx, self.contents[idx].State, self.contents[idx].Item)
}
// for idx := range self.contents {
// fmt.Printf("%5v : State -> %5v, Item -> %5v\n", idx, self.contents[idx].State, self.contents[idx].Item)
// }
}

// selectSender is an auxillary thread which remains parked by default
Expand Down

0 comments on commit d98439f

Please sign in to comment.