diff --git a/gogenerics/constraints/errors.go b/gogenerics/constraints/errors.go index 369ff9e..48d9653 100644 --- a/gogenerics/constraints/errors.go +++ b/gogenerics/constraints/errors.go @@ -5,6 +5,7 @@ import ( ) // AtomicError defines an atomic error. + type AtomicError struct { err atomic.Value // error } diff --git a/gogenerics/constraints/errors_test.go b/gogenerics/constraints/errors_test.go index 5bf2b0e..07dc726 100644 --- a/gogenerics/constraints/errors_test.go +++ b/gogenerics/constraints/errors_test.go @@ -40,10 +40,7 @@ func BenchmarkAtomicError(b *testing.B) { b.Run("Load", func(b *testing.B) { var done uint32 go func() { - for { - if atomic.LoadUint32(&done) != 0 { - break - } + for atomic.LoadUint32(&done) == 0 { wg.Add(1) go func() { aerr.Set(errDummy) @@ -62,10 +59,7 @@ func BenchmarkAtomicError(b *testing.B) { b.Run("Set", func(b *testing.B) { var done uint32 go func() { - for { - if atomic.LoadUint32(&done) != 0 { - break - } + for atomic.LoadUint32(&done) == 0 { wg.Add(1) go func() { _ = aerr.Load() diff --git a/gogenerics/gconcurrent/mapx/README.md b/gogenerics/gconcurrent/mapx/README.md new file mode 100644 index 0000000..26a9323 --- /dev/null +++ b/gogenerics/gconcurrent/mapx/README.md @@ -0,0 +1,2 @@ +# mapx +mapx is the thread-safe and high perf map implementation for golang. \ No newline at end of file diff --git a/gogenerics/gconcurrent/mr/mr.go b/gogenerics/gconcurrent/mr/mr.go new file mode 100644 index 0000000..8ab7ec3 --- /dev/null +++ b/gogenerics/gconcurrent/mr/mr.go @@ -0,0 +1,391 @@ +package mr + +import ( + "context" + "errors" + "sync" + "sync/atomic" + + "github.com/miniLCT/gosb/gogenerics/constraints" +) + +const ( + defaultWorkers = 16 + minWorkers = 1 +) + +var ( + // ErrCancelWithNil is an error that mapreduce was cancelled with nil. + ErrCancelWithNil = errors.New("mapreduce cancelled with nil") + + // ErrReduceNoOutput is an error that reduce did not output a value. + ErrReduceNoOutput = errors.New("reduce not writing value") +) + +type ( + // ForEachFunc is used to do element processing, but no output. + ForEachFunc[T any] func(item T) + + // GenerateFunc is used to let callers send elements into source. + GenerateFunc[T any] func(source chan<- T) + + // MapFunc is used to do element processing and write the output to writer. + MapFunc[T, U any] func(item T, writer Writer[U]) + + // MapperFunc is used to do element processing and write the output to writer, + // use cancel func to cancel the processing. + MapperFunc[T, U any] func(item T, writer Writer[U], cancel func(error)) + + // ReducerFunc is used to reduce all the mapping output and write to writer, + // use cancel func to cancel the processing. + ReducerFunc[U, V any] func(pipe <-chan U, writer Writer[V], cancel func(error)) + + // VoidReducerFunc is used to reduce all the mapping output, but no output. + // Use cancel func to cancel the processing. + VoidReducerFunc[U any] func(pipe <-chan U, cancel func(error)) + + // Option defines the method to customize the mapreduce. + Option func(opts *mapReduceOptions) + + mapperContext[T, U any] struct { + ctx context.Context + mapper MapFunc[T, U] + source <-chan T + panicChan *onceChan + collector chan<- U + doneChan <-chan struct{} + workers int + } + + mapReduceOptions struct { + ctx context.Context + workers int + } + + // Writer interface wraps Write method. + Writer[T any] interface { + Write(v T) + } +) + +// Finish runs fns parallelly, cancelled on any error. 
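For orientation, a minimal usage sketch of Finish (defined just below): several independent steps run in parallel and the first error cancels the remaining work. The import path is assumed from the file layout in this diff, and the setup closures are placeholders.

```go
package main

import (
	"fmt"

	"github.com/miniLCT/gosb/gogenerics/gconcurrent/mr"
)

func main() {
	// Run three independent steps concurrently; the first non-nil error
	// cancels the rest and is returned.
	err := mr.Finish(
		func() error { fmt.Println("load config"); return nil },
		func() error { fmt.Println("warm cache"); return nil },
		func() error { fmt.Println("ping dependencies"); return nil },
	)
	if err != nil {
		fmt.Println("setup failed:", err)
	}
}
```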
+func Finish(fns ...func() error) error { + if len(fns) == 0 { + return nil + } + + return MapReduceVoid(func(source chan<- func() error) { + for _, fn := range fns { + source <- fn + } + }, func(fn func() error, writer Writer[any], cancel func(error)) { + if err := fn(); err != nil { + cancel(err) + } + }, func(pipe <-chan any, cancel func(error)) {}, + WithWorkers(len(fns))) +} + +// FinishVoid runs fns parallelly. +func FinishVoid(fns ...func()) { + if len(fns) == 0 { + return + } + + ForEach(func(source chan<- func()) { + for _, fn := range fns { + source <- fn + } + }, func(fn func()) { + fn() + }, WithWorkers(len(fns))) +} + +// ForEach maps all elements from given generate but no output. +func ForEach[T any](generate GenerateFunc[T], mapper ForEachFunc[T], opts ...Option) { + options := buildOptions(opts...) + panicChan := &onceChan{channel: make(chan any)} + source := buildSource(generate, panicChan) + collector := make(chan any) + done := make(chan struct{}) + + go executeMappers(mapperContext[T, any]{ + ctx: options.ctx, + mapper: func(item T, _ Writer[any]) { + mapper(item) + }, + source: source, + panicChan: panicChan, + collector: collector, + doneChan: done, + workers: options.workers, + }) + + for { + select { + case v := <-panicChan.channel: + panic(v) + case _, ok := <-collector: + if !ok { + return + } + } + } +} + +// MapReduce maps all elements generated from given generate func, +// and reduces the output elements with given reducer. +func MapReduce[T, U, V any](generate GenerateFunc[T], mapper MapperFunc[T, U], reducer ReducerFunc[U, V], + opts ...Option) (V, error) { + panicChan := &onceChan{channel: make(chan any)} + source := buildSource(generate, panicChan) + return mapReduceWithPanicChan(source, panicChan, mapper, reducer, opts...) +} + +// MapReduceChan maps all elements from source, and reduce the output elements with given reducer. +func MapReduceChan[T, U, V any](source <-chan T, mapper MapperFunc[T, U], reducer ReducerFunc[U, V], + opts ...Option) (V, error) { + panicChan := &onceChan{channel: make(chan any)} + return mapReduceWithPanicChan(source, panicChan, mapper, reducer, opts...) +} + +// mapReduceWithPanicChan maps all elements from source, and reduce the output elements with given reducer. +func mapReduceWithPanicChan[T, U, V any](source <-chan T, panicChan *onceChan, mapper MapperFunc[T, U], + reducer ReducerFunc[U, V], opts ...Option) (val V, err error) { + options := buildOptions(opts...) 
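A minimal end-to-end sketch of the MapReduce API declared above: generate integers, square them in concurrent mappers, and sum them in the reducer. Only the signatures shown in this diff are used; the import path is assumed from the directory layout.

```go
package main

import (
	"fmt"

	"github.com/miniLCT/gosb/gogenerics/gconcurrent/mr"
)

func main() {
	sum, err := mr.MapReduce(
		// generate: push the inputs into the source channel.
		func(source chan<- int) {
			for i := 1; i <= 100; i++ {
				source <- i
			}
		},
		// mapper: runs on the worker goroutines; write one output per item.
		func(item int, writer mr.Writer[int], cancel func(error)) {
			writer.Write(item * item)
		},
		// reducer: consume all mapper output and write exactly one result.
		func(pipe <-chan int, writer mr.Writer[int], cancel func(error)) {
			total := 0
			for v := range pipe {
				total += v
			}
			writer.Write(total)
		},
	)
	fmt.Println(sum, err) // 338350 <nil>
}
```

Note that the reducer must write exactly one value: writing none yields ErrReduceNoOutput and writing more than one panics, as enforced by mapReduceWithPanicChan below.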
+ // output is used to write the final result + output := make(chan V) + defer func() { + // reducer can only write once, if more, panic + for range output { + panic("more than one element written in reducer") + } + }() + + // collector is used to collect data from mapper, and consume in reducer + collector := make(chan U, options.workers) + // if done is closed, all mappers and reducer should stop processing + done := make(chan struct{}) + writer := newGuardedWriter(options.ctx, output, done) + var closeOnce sync.Once + // use atomic type to avoid data race + var retErr constraints.AtomicError + finish := func() { + closeOnce.Do(func() { + close(done) + close(output) + }) + } + cancel := once(func(err error) { + if err != nil { + retErr.Set(err) + } else { + retErr.Set(ErrCancelWithNil) + } + + drain(source) + finish() + }) + + go func() { + defer func() { + drain(collector) + if r := recover(); r != nil { + panicChan.write(r) + } + finish() + }() + + reducer(collector, writer, cancel) + }() + + go executeMappers(mapperContext[T, U]{ + ctx: options.ctx, + mapper: func(item T, w Writer[U]) { + mapper(item, w, cancel) + }, + source: source, + panicChan: panicChan, + collector: collector, + doneChan: done, + workers: options.workers, + }) + + select { + case <-options.ctx.Done(): + cancel(context.DeadlineExceeded) + err = context.DeadlineExceeded + case v := <-panicChan.channel: + // drain output here, otherwise for loop panic in defer + drain(output) + panic(v) + case v, ok := <-output: + if e := retErr.Load(); e != nil { + err = e + } else if ok { + val = v + } else { + err = ErrReduceNoOutput + } + } + + return +} + +// MapReduceVoid maps all elements generated from given generate, +// and reduce the output elements with given reducer. +func MapReduceVoid[T, U any](generate GenerateFunc[T], mapper MapperFunc[T, U], + reducer VoidReducerFunc[U], opts ...Option) error { + _, err := MapReduce(generate, mapper, func(input <-chan U, writer Writer[any], cancel func(error)) { + reducer(input, cancel) + }, opts...) + if errors.Is(err, ErrReduceNoOutput) { + return nil + } + + return err +} + +// WithContext customizes a mapreduce processing accepts a given ctx. +func WithContext(ctx context.Context) Option { + return func(opts *mapReduceOptions) { + opts.ctx = ctx + } +} + +// WithWorkers customizes a mapreduce processing with given workers. +func WithWorkers(workers int) Option { + return func(opts *mapReduceOptions) { + if workers < minWorkers { + opts.workers = minWorkers + } else { + opts.workers = workers + } + } +} + +func buildOptions(opts ...Option) *mapReduceOptions { + options := newOptions() + for _, opt := range opts { + opt(options) + } + + return options +} + +func buildSource[T any](generate GenerateFunc[T], panicChan *onceChan) chan T { + source := make(chan T) + go func() { + defer func() { + if r := recover(); r != nil { + panicChan.write(r) + } + close(source) + }() + + generate(source) + }() + + return source +} + +// drain drains the channel. 
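A companion sketch for the void variant and the two options defined above; the item type and the store helper are placeholders, not part of this diff.

```go
package main

import (
	"context"
	"fmt"
	"time"

	"github.com/miniLCT/gosb/gogenerics/gconcurrent/mr"
)

// store stands in for any per-item side effect; it is a placeholder.
func store(id int) error {
	fmt.Println("stored", id)
	return nil
}

func main() {
	ctx, stop := context.WithTimeout(context.Background(), 2*time.Second)
	defer stop()

	err := mr.MapReduceVoid(
		func(source chan<- int) {
			for id := 1; id <= 10; id++ {
				source <- id
			}
		},
		func(id int, writer mr.Writer[struct{}], cancel func(error)) {
			if err := store(id); err != nil {
				cancel(err) // the first failure stops the whole job
			}
		},
		func(pipe <-chan struct{}, cancel func(error)) {
			for range pipe { // nothing to aggregate; just drain
			}
		},
		mr.WithContext(ctx), // stop early if ctx is cancelled or times out
		mr.WithWorkers(4),   // at most 4 concurrent mappers
	)
	fmt.Println("err:", err)
}
```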
+func drain[T any](channel <-chan T) { + // drain the channel + for range channel { + } + +} + +func executeMappers[T, U any](mCtx mapperContext[T, U]) { + var wg sync.WaitGroup + defer func() { + wg.Wait() + close(mCtx.collector) + drain(mCtx.source) + }() + + var failed int32 + pool := make(chan struct{}, mCtx.workers) + writer := newGuardedWriter(mCtx.ctx, mCtx.collector, mCtx.doneChan) + for atomic.LoadInt32(&failed) == 0 { + select { + case <-mCtx.ctx.Done(): + return + case <-mCtx.doneChan: + return + case pool <- struct{}{}: + item, ok := <-mCtx.source + if !ok { + <-pool + return + } + + wg.Add(1) + go func() { + defer func() { + if r := recover(); r != nil { + atomic.AddInt32(&failed, 1) + mCtx.panicChan.write(r) + } + wg.Done() + <-pool + }() + + mCtx.mapper(item, writer) + }() + } + } +} + +func newOptions() *mapReduceOptions { + return &mapReduceOptions{ + ctx: context.Background(), + workers: defaultWorkers, + } +} + +func once(fn func(error)) func(error) { + once := new(sync.Once) + return func(err error) { + once.Do(func() { + fn(err) + }) + } +} + +type guardedWriter[T any] struct { + ctx context.Context + channel chan<- T + done <-chan struct{} +} + +func newGuardedWriter[T any](ctx context.Context, channel chan<- T, done <-chan struct{}) guardedWriter[T] { + return guardedWriter[T]{ + ctx: ctx, + channel: channel, + done: done, + } +} + +func (gw guardedWriter[T]) Write(v T) { + select { + case <-gw.ctx.Done(): + return + case <-gw.done: + return + default: + gw.channel <- v + } +} + +type onceChan struct { + channel chan any + wrote int32 +} + +func (oc *onceChan) write(val any) { + if atomic.CompareAndSwapInt32(&oc.wrote, 0, 1) { + oc.channel <- val + } +} diff --git a/gogenerics/gconcurrent/mr/mr_test.go b/gogenerics/gconcurrent/mr/mr_test.go new file mode 100644 index 0000000..70c9790 --- /dev/null +++ b/gogenerics/gconcurrent/mr/mr_test.go @@ -0,0 +1 @@ +package mr diff --git a/gogenerics/gconcurrent/sync/map.go b/gogenerics/gconcurrent/sync/map.go new file mode 100644 index 0000000..7d64677 --- /dev/null +++ b/gogenerics/gconcurrent/sync/map.go @@ -0,0 +1,524 @@ +package sync + +import ( + "sync" + "sync/atomic" + "unsafe" +) + +// Map is like a Go map[any]any but is safe for concurrent use +// by multiple goroutines without additional locking or coordination. +// Loads, stores, and deletes run in amortized constant time. +// +// The Map type is specialized. Most code should use a plain Go map instead, +// with separate locking or coordination, for better type safety and to make it +// easier to maintain other invariants along with the map content. +// +// The Map type is optimized for two common use cases: (1) when the entry for a given +// key is only ever written once but read many times, as in caches that only grow, +// or (2) when multiple goroutines read, write, and overwrite entries for disjoint +// sets of keys. In these two cases, use of a Map may significantly reduce lock +// contention compared to a Go map paired with a separate Mutex or RWMutex. +// +// The zero Map is empty and ready for use. A Map must not be copied after first use. +// +// In the terminology of the Go memory model, Map arranges that a write operation +// “synchronizes before” any read operation that observes the effect of the write, where +// read and write operations are defined as follows. 
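executeMappers above bounds concurrency with a buffered channel used as a semaphore plus a WaitGroup. A stripped-down, stdlib-only sketch of that same pattern, separate from the mr package:

```go
package main

import (
	"fmt"
	"sync"
)

// process consumes items from source with at most workers goroutines in
// flight at any time, mirroring the pool/WaitGroup shape in executeMappers.
func process(source <-chan int, workers int, handle func(int)) {
	var wg sync.WaitGroup
	pool := make(chan struct{}, workers) // semaphore: one token per running worker

	for item := range source {
		pool <- struct{}{} // acquire a slot; blocks while all workers are busy
		wg.Add(1)
		go func(item int) {
			defer func() {
				wg.Done()
				<-pool // release the slot
			}()
			handle(item)
		}(item)
	}
	wg.Wait()
}

func main() {
	source := make(chan int)
	go func() {
		for i := 0; i < 8; i++ {
			source <- i
		}
		close(source)
	}()
	process(source, 3, func(i int) { fmt.Println("handled", i) })
}
```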
+// Load, LoadAndDelete, LoadOrStore, Swap, CompareAndSwap, and CompareAndDelete +// are read operations; Delete, LoadAndDelete, Store, and Swap are write operations; +// LoadOrStore is a write operation when it returns loaded set to false; +// CompareAndSwap is a write operation when it returns swapped set to true; +// and CompareAndDelete is a write operation when it returns deleted set to true. + +type Map[K comparable, V any] struct { + mu sync.Mutex + + // read contains the portion of the map's contents that are safe for + // concurrent access (with or without mu held). + // + // The read field itself is always safe to load, but must only be stored with + // mu held. + // + // Entries stored in read may be updated concurrently without mu, but updating + // a previously-expunged entry requires that the entry be copied to the dirty + // map and unexpunged with mu held. + read atomic.Pointer[readOnly[K, V]] + + // dirty contains the portion of the map's contents that require mu to be + // held. To ensure that the dirty map can be promoted to the read map quickly, + // it also includes all of the non-expunged entries in the read map. + // + // Expunged entries are not stored in the dirty map. An expunged entry in the + // clean map must be unexpunged and added to the dirty map before a new value + // can be stored to it. + // + // If the dirty map is nil, the next write to the map will initialize it by + // making a shallow copy of the clean map, omitting stale entries. + dirty map[K]*entry[V] + + // misses counts the number of loads since the read map was last updated that + // needed to lock mu to determine whether the key was present. + // + // Once enough misses have occurred to cover the cost of copying the dirty + // map, the dirty map will be promoted to the read map (in the unamended + // state) and the next store to the map will make a new dirty copy. + misses int +} + +// readOnly is an immutable struct stored atomically in the Map.read field. +type readOnly[K comparable, V any] struct { + m map[K]*entry[V] + amended bool // true if the dirty map contains some key not in m. +} + +// expunged is an arbitrary pointer that marks entries which have been deleted +// from the dirty map. + +var expunged = unsafe.Pointer(new(any)) + +// An entry is a slot in the map corresponding to a particular key. + +type entry[V any] struct { + // p points to the any value stored for the entry. + // + // If p == nil, the entry has been deleted, and either m.dirty == nil or + // m.dirty[key] is e. + // + // If unsafe.Pointer(p) == expunged, the entry has been deleted, m.dirty != nil, and the entry + // is missing from m.dirty. + // + // Otherwise, the entry is valid and recorded in m.read.m[key] and, if m.dirty + // != nil, in m.dirty[key]. + // + // An entry can be deleted by atomic replacement with nil: when m.dirty is + // next created, it will atomically replace nil with expunged and leave + // m.dirty[key] unset. + // + // An entry's associated value can be updated by atomic replacement, provided + // p != expunged. If unsafe.Pointer(p) == expunged, an entry's associated value can be updated + // only after first setting m.dirty[key] = e so that lookups using the dirty + // map find the entry. 
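The practical benefit of this generic port shows up at the call site: keys and values keep their static types, so loads need no type assertions. A minimal usage sketch; the import path and alias are assumptions based on the file layout (the package is named sync, so it needs an alias next to the standard library).

```go
package main

import (
	"fmt"

	gsync "github.com/miniLCT/gosb/gogenerics/gconcurrent/sync"
)

func main() {
	var hits gsync.Map[string, int] // the zero Map is ready to use

	hits.Store("/home", 1)
	hits.Store("/about", 2)

	// Load returns a typed int plus an ok flag; no value.(int) assertion.
	if n, ok := hits.Load("/home"); ok {
		fmt.Println("/home:", n)
	}
}
```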
+ p atomic.Pointer[V] +} + +func newEntry[V any](i V) *entry[V] { + e := &entry[V]{} + e.p.Store(&i) + return e +} + +func (m *Map[K, V]) loadReadOnly() readOnly[K, V] { + if p := m.read.Load(); p != nil { + return *p + } + return readOnly[K, V]{} +} + +// Load returns the value stored in the map for a key, or nil if no +// value is present. +// The ok result indicates whether value was found in the map. +func (m *Map[K, V]) Load(key K) (value V, ok bool) { + read := m.loadReadOnly() + e, ok := read.m[key] + if !ok && read.amended { + m.mu.Lock() + // Avoid reporting a spurious miss if m.dirty got promoted while we were + // blocked on m.mu. (If further loads of the same key will not miss, it's + // not worth copying the dirty map for this key.) + read = m.loadReadOnly() + e, ok = read.m[key] + if !ok && read.amended { + e, ok = m.dirty[key] + // Regardless of whether the entry was present, record a miss: this key + // will take the slow path until the dirty map is promoted to the read + // map. + m.missLocked() + } + m.mu.Unlock() + } + if !ok { + var zero V + return zero, false + } + return e.load() +} + +func (e *entry[V]) load() (value V, ok bool) { + p := e.p.Load() + if p == nil || unsafe.Pointer(p) == expunged { + var zero V + return zero, false + } + return *p, true +} + +// Store sets the value for a key. +func (m *Map[K, V]) Store(key K, value V) { + _, _ = m.Swap(key, value) +} + +// tryCompareAndSwap compare the entry with the given old value and swaps +// it with a new value if the entry is equal to the old value, and the entry +// has not been expunged. +// +// If the entry is expunged, tryCompareAndSwap returns false and leaves +// the entry unchanged. +func (e *entry[V]) tryCompareAndSwap(old, new V) bool { + p := e.p.Load() + if p == nil || unsafe.Pointer(p) == expunged { + return false + } + + // Copy the interface after the first load to make this method more amenable + // to escape analysis: if the comparison fails from the start, we shouldn't + // bother heap-allocating an interface value to store. + nc := new + for { + if e.p.CompareAndSwap(p, &nc) { + return true + } + p = e.p.Load() + if p == nil || unsafe.Pointer(p) == expunged { + return false + } + } +} + +// unexpungeLocked ensures that the entry is not marked as expunged. +// +// If the entry was previously expunged, it must be added to the dirty map +// before m.mu is unlocked. +func (e *entry[V]) unexpungeLocked() (wasExpunged bool) { + return e.p.CompareAndSwap((*V)(expunged), nil) +} + +// swapLocked unconditionally swaps a value into the entry. +// +// The entry must be known not to be expunged. +func (e *entry[V]) swapLocked(i *V) *V { + return e.p.Swap(i) +} + +// LoadOrStore returns the existing value for the key if present. +// Otherwise, it stores and returns the given value. +// The loaded result is true if the value was loaded, false if stored. +func (m *Map[K, V]) LoadOrStore(key K, value V) (actual V, loaded bool) { + // Avoid locking if it's a clean hit. + read := m.loadReadOnly() + if e, ok := read.m[key]; ok { + actual, loaded, ok := e.tryLoadOrStore(value) + if ok { + return actual, loaded + } + } + + m.mu.Lock() + read = m.loadReadOnly() + if e, ok := read.m[key]; ok { + if e.unexpungeLocked() { + m.dirty[key] = e + } + actual, loaded, _ = e.tryLoadOrStore(value) + } else if e, ok := m.dirty[key]; ok { + actual, loaded, _ = e.tryLoadOrStore(value) + m.missLocked() + } else { + if !read.amended { + // We're adding the first new key to the dirty map. 
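A sketch of the get-or-create idiom that LoadOrStore (above) supports: concurrent callers race, exactly one stored value wins, and every caller observes that same value. The per-key sync.Once cache here is illustrative.

```go
package main

import (
	"fmt"
	"sync"

	gsync "github.com/miniLCT/gosb/gogenerics/gconcurrent/sync"
)

func main() {
	var cache gsync.Map[string, *sync.Once]

	var wg sync.WaitGroup
	for i := 0; i < 4; i++ {
		wg.Add(1)
		go func() {
			defer wg.Done()
			// Every goroutine gets the same *sync.Once back for the key,
			// so the initialization below runs exactly once.
			once, _ := cache.LoadOrStore("init", new(sync.Once))
			once.Do(func() { fmt.Println("initialized once") })
		}()
	}
	wg.Wait()
}
```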
+ // Make sure it is allocated and mark the read-only map as incomplete. + m.dirtyLocked() + m.read.Store(&readOnly[K, V]{m: read.m, amended: true}) + } + m.dirty[key] = newEntry(value) + actual, loaded = value, false + } + m.mu.Unlock() + + return actual, loaded +} + +// tryLoadOrStore atomically loads or stores a value if the entry is not +// expunged. +// +// If the entry is expunged, tryLoadOrStore leaves the entry unchanged and +// returns with ok==false. +func (e *entry[V]) tryLoadOrStore(i V) (actual V, loaded, ok bool) { + p := e.p.Load() + if unsafe.Pointer(p) == expunged { + var zero V + return zero, false, false + } + if p != nil { + return *p, true, true + } + + // Copy the interface after the first load to make this method more amenable + // to escape analysis: if we hit the "load" path or the entry is expunged, we + // shouldn't bother heap-allocating. + ic := i + for { + if e.p.CompareAndSwap(nil, &ic) { + return i, false, true + } + p = e.p.Load() + if unsafe.Pointer(p) == expunged { + var zero V + return zero, false, false + } + if p != nil { + return *p, true, true + } + } +} + +// LoadAndDelete deletes the value for a key, returning the previous value if any. +// The loaded result reports whether the key was present. +func (m *Map[K, V]) LoadAndDelete(key K) (value V, loaded bool) { + read := m.loadReadOnly() + e, ok := read.m[key] + if !ok && read.amended { + m.mu.Lock() + read = m.loadReadOnly() + e, ok = read.m[key] + if !ok && read.amended { + e, ok = m.dirty[key] + delete(m.dirty, key) + // Regardless of whether the entry was present, record a miss: this key + // will take the slow path until the dirty map is promoted to the read + // map. + m.missLocked() + } + m.mu.Unlock() + } + if ok { + return e.delete() + } + + var zero V + return zero, false +} + +// Delete deletes the value for a key. +func (m *Map[K, V]) Delete(key K) { + m.LoadAndDelete(key) +} + +func (e *entry[V]) delete() (value V, ok bool) { + for { + p := e.p.Load() + if p == nil || unsafe.Pointer(p) == expunged { + var zero V + return zero, false + } + if e.p.CompareAndSwap(p, nil) { + return *p, true + } + } +} + +// trySwap swaps a value if the entry has not been expunged. +// +// If the entry is expunged, trySwap returns false and leaves the entry +// unchanged. +func (e *entry[V]) trySwap(i *V) (*V, bool) { + for { + p := e.p.Load() + if unsafe.Pointer(p) == expunged { + return nil, false + } + if e.p.CompareAndSwap(p, i) { + return p, true + } + } +} + +// Swap swaps the value for a key and returns the previous value if any. +// The loaded result reports whether the key was present. +func (m *Map[K, V]) Swap(key K, value V) (previous V, loaded bool) { + read := m.loadReadOnly() + if e, ok := read.m[key]; ok { + if v, ok := e.trySwap(&value); ok { + if v == nil { + var zero V + return zero, false + } + return *v, true + } + } + + m.mu.Lock() + read = m.loadReadOnly() + if e, ok := read.m[key]; ok { + if e.unexpungeLocked() { + // The entry was previously expunged, which implies that there is a + // non-nil dirty map and this entry is not in it. + m.dirty[key] = e + } + if v := e.swapLocked(&value); v != nil { + loaded = true + previous = *v + } + } else if e, ok := m.dirty[key]; ok { + if v := e.swapLocked(&value); v != nil { + loaded = true + previous = *v + } + } else { + if !read.amended { + // We're adding the first new key to the dirty map. + // Make sure it is allocated and mark the read-only map as incomplete. 
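LoadAndDelete and Swap (above) enable an atomic take-ownership pattern: whichever caller removes the entry is the only one that receives the old value. A small sketch:

```go
package main

import (
	"fmt"

	gsync "github.com/miniLCT/gosb/gogenerics/gconcurrent/sync"
)

func main() {
	var pending gsync.Map[int, string]
	pending.Store(42, "job payload")

	// Among any number of competing consumers, exactly one sees loaded == true,
	// so the payload is handed off and processed once.
	if payload, loaded := pending.LoadAndDelete(42); loaded {
		fmt.Println("claimed:", payload)
	}

	// Swap installs a new value and reports whether one was already present.
	prev, loaded := pending.Swap(42, "new payload")
	fmt.Println(prev, loaded) // prints an empty string and false: the key was just deleted
}
```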
+ m.dirtyLocked() + m.read.Store(&readOnly[K, V]{m: read.m, amended: true}) + } + m.dirty[key] = newEntry(value) + } + m.mu.Unlock() + return previous, loaded +} + +// CompareAndSwap swaps the old and new values for key +// if the value stored in the map is equal to old. +// The old value must be of a comparable type. +func (m *Map[K, V]) CompareAndSwap(key K, old, new V) bool { + read := m.loadReadOnly() + if e, ok := read.m[key]; ok { + return e.tryCompareAndSwap(old, new) + } else if !read.amended { + return false // No existing value for key. + } + + m.mu.Lock() + defer m.mu.Unlock() + read = m.loadReadOnly() + swapped := false + if e, ok := read.m[key]; ok { + swapped = e.tryCompareAndSwap(old, new) + } else if e, ok := m.dirty[key]; ok { + swapped = e.tryCompareAndSwap(old, new) + // We needed to lock mu in order to load the entry for key, + // and the operation didn't change the set of keys in the map + // (so it would be made more efficient by promoting the dirty + // map to read-only). + // Count it as a miss so that we will eventually switch to the + // more efficient steady state. + m.missLocked() + } + return swapped +} + +// CompareAndDelete deletes the entry for key if its value is equal to old. +// The old value must be of a comparable type. +// +// If there is no current value for key in the map, CompareAndDelete +// returns false (even if the old value is the nil interface value). +func (m *Map[K, V]) CompareAndDelete(key K, old V) (deleted bool) { + read := m.loadReadOnly() + e, ok := read.m[key] + if !ok && read.amended { + m.mu.Lock() + read = m.loadReadOnly() + e, ok = read.m[key] + if !ok && read.amended { + e, ok = m.dirty[key] + // Don't delete key from m.dirty: we still need to do the “compare” part + // of the operation. The entry will eventually be expunged when the + // dirty map is promoted to the read map. + // + // Regardless of whether the entry was present, record a miss: this key + // will take the slow path until the dirty map is promoted to the read + // map. + m.missLocked() + } + m.mu.Unlock() + } + for ok { + p := e.p.Load() + if p == nil || unsafe.Pointer(p) == expunged { + return false + } + if e.p.CompareAndSwap(p, nil) { + return true + } + } + return false +} + +// Range calls f sequentially for each key and value present in the map. +// If f returns false, range stops the iteration. +// +// Range does not necessarily correspond to any consistent snapshot of the Map's +// contents: no key will be visited more than once, but if the value for any key +// is stored or deleted concurrently (including by f), Range may reflect any +// mapping for that key from any point during the Range call. Range does not +// block other methods on the receiver; even f itself may call any method on m. +// +// Range may be O(N) with the number of elements in the map even if f returns +// false after a constant number of calls. +func (m *Map[K, V]) Range(f func(key K, value V) bool) { + // We need to be able to iterate over all the keys that were already + // present at the start of the call to Range. + // If read.amended is false, then read.m satisfies that property without + // requiring us to hold m.mu for a long time. + read := m.loadReadOnly() + if read.amended { + // m.dirty contains keys not in read.m. Fortunately, Range is already O(N) + // (assuming the caller does not break out early), so a call to Range + // amortizes an entire copy of the map: we can promote the dirty copy + // immediately! 
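Per the documented contract of CompareAndSwap above, the usual call pattern is an optimistic retry loop: read the current value, compute a replacement, and retry if another writer got in first. A sketch following that contract (the counter key is illustrative):

```go
package main

import (
	"fmt"

	gsync "github.com/miniLCT/gosb/gogenerics/gconcurrent/sync"
)

// increment bumps the counter stored under key with an optimistic
// CompareAndSwap loop, per the doc comments above.
func increment(m *gsync.Map[string, int], key string) {
	for {
		old, _ := m.LoadOrStore(key, 0)
		if m.CompareAndSwap(key, old, old+1) {
			return
		}
		// Another writer changed the value in between; reload and retry.
	}
}

func main() {
	var m gsync.Map[string, int]
	increment(&m, "requests")
	increment(&m, "requests")
	n, _ := m.Load("requests")
	fmt.Println(n) // 2
}
```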
+ m.mu.Lock() + read = m.loadReadOnly() + if read.amended { + read = readOnly[K, V]{m: m.dirty} + m.read.Store(&read) + m.dirty = nil + m.misses = 0 + } + m.mu.Unlock() + } + + for k, e := range read.m { + v, ok := e.load() + if !ok { + continue + } + if !f(k, v) { + break + } + } +} + +func (m *Map[K, V]) missLocked() { + m.misses++ + if m.misses < len(m.dirty) { + return + } + m.read.Store(&readOnly[K, V]{m: m.dirty}) + m.dirty = nil + m.misses = 0 +} + +func (m *Map[K, V]) dirtyLocked() { + if m.dirty != nil { + return + } + + read := m.loadReadOnly() + m.dirty = make(map[K]*entry[V], len(read.m)) + for k, e := range read.m { + if !e.tryExpungeLocked() { + m.dirty[k] = e + } + } +} + +func (e *entry[V]) tryExpungeLocked() (isExpunged bool) { + p := e.p.Load() + for p == nil { + if e.p.CompareAndSwap(nil, (*V)(expunged)) { + return true + } + p = e.p.Load() + } + return unsafe.Pointer(p) == expunged +} diff --git a/gogenerics/gconcurrent/sync/map_bench_test.go b/gogenerics/gconcurrent/sync/map_bench_test.go new file mode 100644 index 0000000..1e6e294 --- /dev/null +++ b/gogenerics/gconcurrent/sync/map_bench_test.go @@ -0,0 +1,864 @@ +// Copyright 2016 The Go Authors. All rights reserved. +// Use of this source code is governed by a BSD-style +// license that can be found in the LICENSE file. + +package sync + +import ( + "fmt" + "reflect" + "sync" + "sync/atomic" + "testing" +) + +type bench struct { + setup func(*testing.B, mapInterface) + perG func(b *testing.B, pb *testing.PB, i int, m mapInterface) +} + +func benchMap(b *testing.B, bench bench) { + for _, m := range [...]mapInterface{&DeepCopyMap{}, &RWMutexMap{}, &SyncMap[int, int]{}, &Map[int, int]{}} { + b.Run(fmt.Sprintf("%T", m), func(b *testing.B) { + m = reflect.New(reflect.TypeOf(m).Elem()).Interface().(mapInterface) + if bench.setup != nil { + bench.setup(b, m) + } + + b.ResetTimer() + + var i int64 + b.RunParallel(func(pb *testing.PB) { + id := int(atomic.AddInt64(&i, 1) - 1) + bench.perG(b, pb, id*b.N, m) + }) + }) + } +} + +func BenchmarkLoadMostlyHits(b *testing.B) { + const hits, misses = 1023, 1 + + benchMap(b, bench{ + setup: func(_ *testing.B, m mapInterface) { + for i := 0; i < hits; i++ { + m.LoadOrStore(i, i) + } + // Prime the map to get it into a steady state. + for i := 0; i < hits*2; i++ { + m.Load(i % hits) + } + }, + + perG: func(b *testing.B, pb *testing.PB, i int, m mapInterface) { + for ; pb.Next(); i++ { + m.Load(i % (hits + misses)) + } + }, + }) +} + +func BenchmarkLoadMostlyMisses(b *testing.B) { + const hits, misses = 1, 1023 + + benchMap(b, bench{ + setup: func(_ *testing.B, m mapInterface) { + for i := 0; i < hits; i++ { + m.LoadOrStore(i, i) + } + // Prime the map to get it into a steady state. + for i := 0; i < hits*2; i++ { + m.Load(i % hits) + } + }, + + perG: func(b *testing.B, pb *testing.PB, i int, m mapInterface) { + for ; pb.Next(); i++ { + m.Load(i % (hits + misses)) + } + }, + }) +} + +func BenchmarkLoadOrStoreBalanced(b *testing.B) { + const hits, misses = 128, 128 + + benchMap(b, bench{ + setup: func(b *testing.B, m mapInterface) { + if _, ok := m.(*DeepCopyMap); ok { + b.Skip("DeepCopyMap has quadratic running time.") + } + for i := 0; i < hits; i++ { + m.LoadOrStore(i, i) + } + // Prime the map to get it into a steady state. 
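A usage sketch for Range as implemented above: it visits each key at most once but is not a consistent snapshot, so a common pattern is to copy the visible entries into a plain map and work on that copy.

```go
package main

import (
	"fmt"

	gsync "github.com/miniLCT/gosb/gogenerics/gconcurrent/sync"
)

func main() {
	var m gsync.Map[string, int]
	m.Store("a", 1)
	m.Store("b", 2)

	// Copy whatever Range observes into an ordinary map; concurrent writes
	// may or may not be reflected, per the Range documentation above.
	snapshot := make(map[string]int)
	m.Range(func(k string, v int) bool {
		snapshot[k] = v
		return true // keep iterating; returning false stops early
	})
	fmt.Println(snapshot)
}
```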
+ for i := 0; i < hits*2; i++ { + m.Load(i % hits) + } + }, + + perG: func(b *testing.B, pb *testing.PB, i int, m mapInterface) { + for ; pb.Next(); i++ { + j := i % (hits + misses) + if j < hits { + if _, ok := m.LoadOrStore(j, i); !ok { + b.Fatalf("unexpected miss for %v", j) + } + } else { + if v, loaded := m.LoadOrStore(i, i); loaded { + b.Fatalf("failed to store %v: existing value %v", i, v) + } + } + } + }, + }) +} + +func BenchmarkLoadOrStoreUnique(b *testing.B) { + benchMap(b, bench{ + setup: func(b *testing.B, m mapInterface) { + if _, ok := m.(*DeepCopyMap); ok { + b.Skip("DeepCopyMap has quadratic running time.") + } + }, + + perG: func(b *testing.B, pb *testing.PB, i int, m mapInterface) { + for ; pb.Next(); i++ { + m.LoadOrStore(i, i) + } + }, + }) +} + +func BenchmarkLoadOrStoreCollision(b *testing.B) { + benchMap(b, bench{ + setup: func(_ *testing.B, m mapInterface) { + m.LoadOrStore(0, 0) + }, + + perG: func(b *testing.B, pb *testing.PB, i int, m mapInterface) { + for ; pb.Next(); i++ { + m.LoadOrStore(0, 0) + } + }, + }) +} + +func BenchmarkLoadAndDeleteBalanced(b *testing.B) { + const hits, misses = 128, 128 + + benchMap(b, bench{ + setup: func(b *testing.B, m mapInterface) { + if _, ok := m.(*DeepCopyMap); ok { + b.Skip("DeepCopyMap has quadratic running time.") + } + for i := 0; i < hits; i++ { + m.LoadOrStore(i, i) + } + // Prime the map to get it into a steady state. + for i := 0; i < hits*2; i++ { + m.Load(i % hits) + } + }, + + perG: func(b *testing.B, pb *testing.PB, i int, m mapInterface) { + for ; pb.Next(); i++ { + j := i % (hits + misses) + if j < hits { + m.LoadAndDelete(j) + } else { + m.LoadAndDelete(i) + } + } + }, + }) +} + +func BenchmarkLoadAndDeleteUnique(b *testing.B) { + benchMap(b, bench{ + setup: func(b *testing.B, m mapInterface) { + if _, ok := m.(*DeepCopyMap); ok { + b.Skip("DeepCopyMap has quadratic running time.") + } + }, + + perG: func(b *testing.B, pb *testing.PB, i int, m mapInterface) { + for ; pb.Next(); i++ { + m.LoadAndDelete(i) + } + }, + }) +} + +func BenchmarkLoadAndDeleteCollision(b *testing.B) { + benchMap(b, bench{ + setup: func(_ *testing.B, m mapInterface) { + m.LoadOrStore(0, 0) + }, + + perG: func(b *testing.B, pb *testing.PB, i int, m mapInterface) { + for ; pb.Next(); i++ { + if _, loaded := m.LoadAndDelete(0); loaded { + m.Store(0, 0) + } + } + }, + }) +} + +func BenchmarkRange(b *testing.B) { + const mapSize = 1 << 10 + + benchMap(b, bench{ + setup: func(_ *testing.B, m mapInterface) { + for i := 0; i < mapSize; i++ { + m.Store(i, i) + } + }, + + perG: func(b *testing.B, pb *testing.PB, i int, m mapInterface) { + for ; pb.Next(); i++ { + m.Range(func(_, _ int) bool { return true }) + } + }, + }) +} + +// BenchmarkAdversarialAlloc tests performance when we store a new value +// immediately whenever the map is promoted to clean and otherwise load a +// unique, missing key. +// +// This forces the Load calls to always acquire the map's mutex. +func BenchmarkAdversarialAlloc(b *testing.B) { + benchMap(b, bench{ + perG: func(b *testing.B, pb *testing.PB, i int, m mapInterface) { + var stores, loadsSinceStore int + for ; pb.Next(); i++ { + m.Load(i) + if loadsSinceStore++; loadsSinceStore > stores { + m.LoadOrStore(i, stores) + loadsSinceStore = 0 + stores++ + } + } + }, + }) +} + +// BenchmarkAdversarialDelete tests performance when we periodically delete +// one key and add a different one in a large map. 
+// +// This forces the Load calls to always acquire the map's mutex and periodically +// makes a full copy of the map despite changing only one entry. +func BenchmarkAdversarialDelete(b *testing.B) { + const mapSize = 1 << 10 + + benchMap(b, bench{ + setup: func(_ *testing.B, m mapInterface) { + for i := 0; i < mapSize; i++ { + m.Store(i, i) + } + }, + + perG: func(b *testing.B, pb *testing.PB, i int, m mapInterface) { + for ; pb.Next(); i++ { + m.Load(i) + + if i%mapSize == 0 { + m.Range(func(k, _ int) bool { + m.Delete(k) + return false + }) + m.Store(i, i) + } + } + }, + }) +} + +func BenchmarkDeleteCollision(b *testing.B) { + benchMap(b, bench{ + setup: func(_ *testing.B, m mapInterface) { + m.LoadOrStore(0, 0) + }, + + perG: func(b *testing.B, pb *testing.PB, i int, m mapInterface) { + for ; pb.Next(); i++ { + m.Delete(0) + } + }, + }) +} + +func BenchmarkSwapCollision(b *testing.B) { + benchMap(b, bench{ + setup: func(_ *testing.B, m mapInterface) { + m.LoadOrStore(0, 0) + }, + + perG: func(b *testing.B, pb *testing.PB, i int, m mapInterface) { + for ; pb.Next(); i++ { + m.Swap(0, 0) + } + }, + }) +} + +func BenchmarkSwapMostlyHits(b *testing.B) { + const hits, misses = 1023, 1 + + benchMap(b, bench{ + setup: func(_ *testing.B, m mapInterface) { + for i := 0; i < hits; i++ { + m.LoadOrStore(i, i) + } + // Prime the map to get it into a steady state. + for i := 0; i < hits*2; i++ { + m.Load(i % hits) + } + }, + + perG: func(b *testing.B, pb *testing.PB, i int, m mapInterface) { + for ; pb.Next(); i++ { + if i%(hits+misses) < hits { + v := i % (hits + misses) + m.Swap(v, v) + } else { + m.Swap(i, i) + m.Delete(i) + } + } + }, + }) +} + +func BenchmarkSwapMostlyMisses(b *testing.B) { + const hits, misses = 1, 1023 + + benchMap(b, bench{ + setup: func(_ *testing.B, m mapInterface) { + for i := 0; i < hits; i++ { + m.LoadOrStore(i, i) + } + // Prime the map to get it into a steady state. + for i := 0; i < hits*2; i++ { + m.Load(i % hits) + } + }, + + perG: func(b *testing.B, pb *testing.PB, i int, m mapInterface) { + for ; pb.Next(); i++ { + if i%(hits+misses) < hits { + v := i % (hits + misses) + m.Swap(v, v) + } else { + m.Swap(i, i) + m.Delete(i) + } + } + }, + }) +} + +func BenchmarkCompareAndSwapCollision(b *testing.B) { + benchMap(b, bench{ + setup: func(_ *testing.B, m mapInterface) { + m.LoadOrStore(0, 0) + }, + + perG: func(b *testing.B, pb *testing.PB, i int, m mapInterface) { + for pb.Next() { + if m.CompareAndSwap(0, 0, 42) { + m.CompareAndSwap(0, 42, 0) + } + } + }, + }) +} + +func BenchmarkCompareAndSwapNoExistingKey(b *testing.B) { + benchMap(b, bench{ + perG: func(b *testing.B, pb *testing.PB, i int, m mapInterface) { + for ; pb.Next(); i++ { + if m.CompareAndSwap(i, 0, 0) { + m.Delete(i) + } + } + }, + }) +} + +func BenchmarkCompareAndSwapValueNotEqual(b *testing.B) { + benchMap(b, bench{ + setup: func(_ *testing.B, m mapInterface) { + m.Store(0, 0) + }, + + perG: func(b *testing.B, pb *testing.PB, i int, m mapInterface) { + for ; pb.Next(); i++ { + m.CompareAndSwap(0, 1, 2) + } + }, + }) +} + +func BenchmarkCompareAndSwapMostlyHits(b *testing.B) { + const hits, misses = 1023, 1 + + benchMap(b, bench{ + setup: func(b *testing.B, m mapInterface) { + if _, ok := m.(*DeepCopyMap); ok { + b.Skip("DeepCopyMap has quadratic running time.") + } + + for i := 0; i < hits; i++ { + m.LoadOrStore(i, i) + } + // Prime the map to get it into a steady state. 
+ for i := 0; i < hits*2; i++ { + m.Load(i % hits) + } + }, + + perG: func(b *testing.B, pb *testing.PB, i int, m mapInterface) { + for ; pb.Next(); i++ { + v := i + if i%(hits+misses) < hits { + v = i % (hits + misses) + } + m.CompareAndSwap(v, v, v) + } + }, + }) +} + +func BenchmarkCompareAndSwapMostlyMisses(b *testing.B) { + const hits, misses = 1, 1023 + + benchMap(b, bench{ + setup: func(_ *testing.B, m mapInterface) { + for i := 0; i < hits; i++ { + m.LoadOrStore(i, i) + } + // Prime the map to get it into a steady state. + for i := 0; i < hits*2; i++ { + m.Load(i % hits) + } + }, + + perG: func(b *testing.B, pb *testing.PB, i int, m mapInterface) { + for ; pb.Next(); i++ { + v := i + if i%(hits+misses) < hits { + v = i % (hits + misses) + } + m.CompareAndSwap(v, v, v) + } + }, + }) +} + +func BenchmarkCompareAndDeleteCollision(b *testing.B) { + benchMap(b, bench{ + setup: func(_ *testing.B, m mapInterface) { + m.LoadOrStore(0, 0) + }, + + perG: func(b *testing.B, pb *testing.PB, i int, m mapInterface) { + for ; pb.Next(); i++ { + if m.CompareAndDelete(0, 0) { + m.Store(0, 0) + } + } + }, + }) +} + +func BenchmarkCompareAndDeleteMostlyHits(b *testing.B) { + const hits, misses = 1023, 1 + + benchMap(b, bench{ + setup: func(b *testing.B, m mapInterface) { + if _, ok := m.(*DeepCopyMap); ok { + b.Skip("DeepCopyMap has quadratic running time.") + } + + for i := 0; i < hits; i++ { + m.LoadOrStore(i, i) + } + // Prime the map to get it into a steady state. + for i := 0; i < hits*2; i++ { + m.Load(i % hits) + } + }, + + perG: func(b *testing.B, pb *testing.PB, i int, m mapInterface) { + for ; pb.Next(); i++ { + v := i + if i%(hits+misses) < hits { + v = i % (hits + misses) + } + if m.CompareAndDelete(v, v) { + m.Store(v, v) + } + } + }, + }) +} + +func BenchmarkCompareAndDeleteMostlyMisses(b *testing.B) { + const hits, misses = 1, 1023 + + benchMap(b, bench{ + setup: func(_ *testing.B, m mapInterface) { + for i := 0; i < hits; i++ { + m.LoadOrStore(i, i) + } + // Prime the map to get it into a steady state. + for i := 0; i < hits*2; i++ { + m.Load(i % hits) + } + }, + + perG: func(b *testing.B, pb *testing.PB, i int, m mapInterface) { + for ; pb.Next(); i++ { + v := i + if i%(hits+misses) < hits { + v = i % (hits + misses) + } + if m.CompareAndDelete(v, v) { + m.Store(v, v) + } + } + }, + }) +} + +// below is the code contains reference map implementations for beach. + +// mapInterface is the interface Map implements. + +type mapInterface interface { + Load(int) (int, bool) + Store(key, value int) + LoadOrStore(key, value int) (actual int, loaded bool) + LoadAndDelete(key int) (value int, loaded bool) + Delete(int) + Swap(key, value int) (previous int, loaded bool) + CompareAndSwap(key, old, new int) (swapped bool) + CompareAndDelete(key, old int) (deleted bool) + Range(func(key, value int) (shouldContinue bool)) +} + +var ( + _ mapInterface = &RWMutexMap{} + _ mapInterface = &DeepCopyMap{} +) + +// RWMutexMap is an implementation of mapInterface using a sync.RWMutex. 
+ +type RWMutexMap struct { + mu sync.RWMutex + dirty map[int]int +} + +func (m *RWMutexMap) Load(key int) (value int, ok bool) { + m.mu.RLock() + value, ok = m.dirty[key] + m.mu.RUnlock() + return +} + +func (m *RWMutexMap) Store(key, value int) { + m.mu.Lock() + if m.dirty == nil { + m.dirty = make(map[int]int) + } + m.dirty[key] = value + m.mu.Unlock() +} + +func (m *RWMutexMap) LoadOrStore(key, value int) (actual int, loaded bool) { + m.mu.Lock() + actual, loaded = m.dirty[key] + if !loaded { + actual = value + if m.dirty == nil { + m.dirty = make(map[int]int) + } + m.dirty[key] = value + } + m.mu.Unlock() + return actual, loaded +} + +func (m *RWMutexMap) Swap(key, value int) (previous int, loaded bool) { + m.mu.Lock() + if m.dirty == nil { + m.dirty = make(map[int]int) + } + + previous, loaded = m.dirty[key] + m.dirty[key] = value + m.mu.Unlock() + return +} + +func (m *RWMutexMap) LoadAndDelete(key int) (value int, loaded bool) { + m.mu.Lock() + value, loaded = m.dirty[key] + if !loaded { + m.mu.Unlock() + return 0, false + } + delete(m.dirty, key) + m.mu.Unlock() + return value, loaded +} + +func (m *RWMutexMap) Delete(key int) { + m.mu.Lock() + delete(m.dirty, key) + m.mu.Unlock() +} + +func (m *RWMutexMap) CompareAndSwap(key, old, new int) (swapped bool) { + m.mu.Lock() + defer m.mu.Unlock() + if m.dirty == nil { + return false + } + + value, loaded := m.dirty[key] + if loaded && value == old { + m.dirty[key] = new + return true + } + return false +} + +func (m *RWMutexMap) CompareAndDelete(key, old int) (deleted bool) { + m.mu.Lock() + defer m.mu.Unlock() + if m.dirty == nil { + return false + } + + value, loaded := m.dirty[key] + if loaded && value == old { + delete(m.dirty, key) + return true + } + return false +} + +func (m *RWMutexMap) Range(f func(key, value int) (shouldContinue bool)) { + m.mu.RLock() + keys := make([]int, 0, len(m.dirty)) + for k := range m.dirty { + keys = append(keys, k) + } + m.mu.RUnlock() + + for _, k := range keys { + v, ok := m.Load(k) + if !ok { + continue + } + if !f(k, v) { + break + } + } +} + +// DeepCopyMap is an implementation of mapInterface using a Mutex and +// atomic.Value. It makes deep copies of the map on every write to avoid +// acquiring the Mutex in Load. + +type DeepCopyMap struct { + mu sync.Mutex + clean atomic.Value +} + +func (m *DeepCopyMap) Load(key int) (value int, ok bool) { + clean, _ := m.clean.Load().(map[int]int) + value, ok = clean[key] + return value, ok +} + +func (m *DeepCopyMap) Store(key, value int) { + m.mu.Lock() + dirty := m.dirty() + dirty[key] = value + m.clean.Store(dirty) + m.mu.Unlock() +} + +func (m *DeepCopyMap) LoadOrStore(key, value int) (actual int, loaded bool) { + clean, _ := m.clean.Load().(map[int]int) + actual, loaded = clean[key] + if loaded { + return actual, loaded + } + + m.mu.Lock() + // Reload clean in case it changed while we were waiting on m.mu. 
+ clean, _ = m.clean.Load().(map[int]int) + actual, loaded = clean[key] + if !loaded { + dirty := m.dirty() + dirty[key] = value + actual = value + m.clean.Store(dirty) + } + m.mu.Unlock() + return actual, loaded +} + +func (m *DeepCopyMap) Swap(key, value int) (previous int, loaded bool) { + m.mu.Lock() + dirty := m.dirty() + previous, loaded = dirty[key] + dirty[key] = value + m.clean.Store(dirty) + m.mu.Unlock() + return +} + +func (m *DeepCopyMap) LoadAndDelete(key int) (value int, loaded bool) { + m.mu.Lock() + dirty := m.dirty() + value, loaded = dirty[key] + delete(dirty, key) + m.clean.Store(dirty) + m.mu.Unlock() + return +} + +func (m *DeepCopyMap) Delete(key int) { + m.mu.Lock() + dirty := m.dirty() + delete(dirty, key) + m.clean.Store(dirty) + m.mu.Unlock() +} + +func (m *DeepCopyMap) CompareAndSwap(key, old, new int) (swapped bool) { + clean, _ := m.clean.Load().(map[int]int) + if previous, ok := clean[key]; !ok || previous != old { + return false + } + + m.mu.Lock() + defer m.mu.Unlock() + dirty := m.dirty() + value, loaded := dirty[key] + if loaded && value == old { + dirty[key] = new + m.clean.Store(dirty) + return true + } + return false +} + +func (m *DeepCopyMap) CompareAndDelete(key, old int) (deleted bool) { + clean, _ := m.clean.Load().(map[int]int) + if previous, ok := clean[key]; !ok || previous != old { + return false + } + + m.mu.Lock() + defer m.mu.Unlock() + + dirty := m.dirty() + value, loaded := dirty[key] + if loaded && value == old { + delete(dirty, key) + m.clean.Store(dirty) + return true + } + return false +} + +func (m *DeepCopyMap) Range(f func(key, value int) (shouldContinue bool)) { + clean, _ := m.clean.Load().(map[int]int) + for k, v := range clean { + if !f(k, v) { + break + } + } +} + +func (m *DeepCopyMap) dirty() map[int]int { + clean, _ := m.clean.Load().(map[int]int) + dirty := make(map[int]int, len(clean)+1) + for k, v := range clean { + dirty[k] = v + } + return dirty +} + +// sync.Map + +type SyncMap[K comparable, V any] struct { + dirty sync.Map +} + +func (m *SyncMap[K, V]) Load(key K) (value V, ok bool) { + v, ok := m.dirty.Load(key) + if !ok { + var v V + return v, false + } + + return v.(V), true +} + +func (m *SyncMap[K, V]) Store(key, value int) { + m.dirty.Store(key, value) +} + +func (m *SyncMap[K, V]) LoadOrStore(key K, value V) (actual V, loaded bool) { + v, loaded := m.dirty.LoadOrStore(key, value) + + return v.(V), loaded +} + +func (m *SyncMap[K, V]) Swap(key K, value V) (previous V, loaded bool) { + v, loaded := m.dirty.Swap(key, value) + + if !loaded { + var v V + return v, false + } + + return v.(V), true +} + +func (m *SyncMap[K, V]) LoadAndDelete(key K) (value V, loaded bool) { + v, loaded := m.dirty.LoadAndDelete(key) + if !loaded { + var v V + return v, false + } + + return v.(V), true +} + +func (m *SyncMap[K, V]) Delete(key K) { + m.dirty.Delete(key) +} + +func (m *SyncMap[K, V]) CompareAndSwap(key K, old, new V) (swapped bool) { + return m.dirty.CompareAndSwap(key, old, new) +} + +func (m *SyncMap[K, V]) CompareAndDelete(key K, old V) (deleted bool) { + return m.dirty.CompareAndDelete(key, old) +} + +func (m *SyncMap[K, V]) Range(f func(key K, value V) (shouldContinue bool)) { + m.dirty.Range(func(key, value any) bool { + return f(key.(K), value.(V)) + }) +} diff --git a/gogenerics/gconcurrent/sync/map_test.go b/gogenerics/gconcurrent/sync/map_test.go new file mode 100644 index 0000000..3ef0bbe --- /dev/null +++ b/gogenerics/gconcurrent/sync/map_test.go @@ -0,0 +1,157 @@ +package sync + +import ( + "reflect" + 
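One note on the SyncMap adapter above: its Store takes (key, value int), which matches the int-keyed mapInterface used by these benchmarks rather than the wrapper's own K/V parameters. A fully generic wrapper in the same spirit would presumably look like the sketch below; this is an illustration, not part of the diff.

```go
package main

import (
	"fmt"
	"sync"
)

// TypedMap is an illustrative generic wrapper over sync.Map, like the
// SyncMap benchmark adapter above but with Store using the wrapper's
// own type parameters.
type TypedMap[K comparable, V any] struct {
	inner sync.Map
}

func (m *TypedMap[K, V]) Store(key K, value V) { m.inner.Store(key, value) }

func (m *TypedMap[K, V]) Load(key K) (V, bool) {
	v, ok := m.inner.Load(key)
	if !ok {
		var zero V
		return zero, false
	}
	return v.(V), true
}

func main() {
	var m TypedMap[string, int]
	m.Store("x", 1)
	fmt.Println(m.Load("x")) // 1 true
}
```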
"runtime" + "sync" + "sync/atomic" + "testing" + + "github.com/miniLCT/gosb/hack/fastrand" +) + +func TestConcurrentRange(t *testing.T) { + const mapSize = 1 << 10 + + m := Map[int64, int64]{} + for n := int64(1); n <= mapSize; n++ { + m.Store(n, int64(n)) + } + + done := make(chan struct{}) + var wg sync.WaitGroup + defer func() { + close(done) + wg.Wait() + }() + for g := int64(runtime.GOMAXPROCS(0)); g > 0; g-- { + wg.Add(1) + go func(g int64) { + defer wg.Done() + for i := int64(0); ; i++ { + select { + case <-done: + return + default: + } + for n := int64(1); n < mapSize; n++ { + if fastrand.Int63n(mapSize) == 0 { + m.Store(n, n*i*g) + } else { + m.Load(n) + } + } + } + }(g) + } + + iters := 1 << 10 + if testing.Short() { + iters = 16 + } + for n := iters; n > 0; n-- { + seen := make(map[int64]bool, mapSize) + + m.Range(func(k, v int64) bool { + if v%k != 0 { + t.Fatalf("while Storing multiples of %v, Range saw value %v", k, v) + } + if seen[k] { + t.Fatalf("Range visited key %v twice", k) + } + seen[k] = true + return true + }) + + if len(seen) != mapSize { + t.Fatalf("Range visited %v elements of %v-element Map", len(seen), mapSize) + } + } +} + +func TestIssue40999(t *testing.T) { + var m sync.Map + + // Since the miss-counting in missLocked (via Delete) + // compares the miss count with len(m.dirty), + // add an initial entry to bias len(m.dirty) above the miss count. + m.Store(nil, struct{}{}) + + var finalized uint32 + + // Set finalizers that count for collected keys. A non-zero count + // indicates that keys have not been leaked. + for atomic.LoadUint32(&finalized) == 0 { + p := new(int) + runtime.SetFinalizer(p, func(*int) { + atomic.AddUint32(&finalized, 1) + }) + m.Store(p, struct{}{}) + m.Delete(p) + runtime.GC() + } +} + +func TestMapRangeNestedCall(t *testing.T) { // Issue 46399 + var m sync.Map + for i, v := range [3]string{"hello", "world", "Go"} { + m.Store(i, v) + } + m.Range(func(key, value any) bool { + m.Range(func(key, value any) bool { + // We should be able to load the key offered in the Range callback, + // because there are no concurrent Delete involved in this tested map. + if v, ok := m.Load(key); !ok || !reflect.DeepEqual(v, value) { + t.Fatalf("Nested Range loads unexpected value, got %+v want %+v", v, value) + } + + // We didn't keep 42 and a value into the map before, if somehow we loaded + // a value from such a key, meaning there must be an internal bug regarding + // nested range in the Map. + if _, loaded := m.LoadOrStore(42, "dummy"); loaded { + t.Fatalf("Nested Range loads unexpected value, want store a new value") + } + + // Try to Store then LoadAndDelete the corresponding value with the key + // 42 to the Map. In this case, the key 42 and associated value should be + // removed from the Map. Therefore any future range won't observe key 42 + // as we checked in above. + val := "sync.Map" + m.Store(42, val) + if v, loaded := m.LoadAndDelete(42); !loaded || !reflect.DeepEqual(v, val) { + t.Fatalf("Nested Range loads unexpected value, got %v, want %v", v, val) + } + return true + }) + + // Remove key from Map on-the-fly. + m.Delete(key) + return true + }) + + // After a Range of Delete, all keys should be removed and any + // further Range won't invoke the callback. Hence length remains 0. 
+ length := 0 + m.Range(func(key, value any) bool { + length++ + return true + }) + + if length != 0 { + t.Fatalf("Unexpected sync.Map size, got %v want %v", length, 0) + } +} + +func TestCompareAndSwap_NonExistingKey(t *testing.T) { + m := &sync.Map{} + if m.CompareAndSwap(m, nil, 42) { + // See https://go.dev/issue/51972#issuecomment-1126408637. + t.Fatalf("CompareAndSwap on an non-existing key succeeded") + } + + mm := &Map[string, any]{} + if mm.CompareAndSwap("", nil, "foo") { + // See https://go.dev/issue/51972#issuecomment-1126408637. + t.Fatalf("CompareAndSwap on an non-existing key succeeded") + } +} diff --git a/gds/grbt/rbt.go b/gogenerics/gds/grbt/rbt.go similarity index 100% rename from gds/grbt/rbt.go rename to gogenerics/gds/grbt/rbt.go diff --git a/gds/grbt/rbt_test.go b/gogenerics/gds/grbt/rbt_test.go similarity index 100% rename from gds/grbt/rbt_test.go rename to gogenerics/gds/grbt/rbt_test.go diff --git a/gds/gskiplist/README.md b/gogenerics/gds/gskiplist/README.md similarity index 74% rename from gds/gskiplist/README.md rename to gogenerics/gds/gskiplist/README.md index b8abf9b..8df0e1c 100644 --- a/gds/gskiplist/README.md +++ b/gogenerics/gds/gskiplist/README.md @@ -28,29 +28,28 @@ $ go test -run=NOTEST -bench=. -count=1 -timeout=10m goos: darwin goarch: amd64 -pkg: icode.baidu.com/baidu/passport/gogenerics/gds/gskiplist +pkg: github.com/miniLCT/gosb/gds/gskiplist cpu: VirtualApple @ 2.50GHz -BenchmarkLoadMostlyHits/*gskiplist.DeepCopyMap-8 330280492 3.658 ns/op -BenchmarkLoadMostlyHits/*gskiplist.RWMutexMap-8 13389918 89.32 ns/op -BenchmarkLoadMostlyHits/*gskiplist.SyncMap[int,int]-8 159714217 7.327 ns/op -BenchmarkLoadMostlyHits/*gskiplist.SkipList[int,int]-8 8307650 145.4 ns/op -BenchmarkLoadMostlyMisses/*gskiplist.DeepCopyMap-8 615504688 1.844 ns/op -BenchmarkLoadMostlyMisses/*gskiplist.RWMutexMap-8 14186954 81.36 ns/op -BenchmarkLoadMostlyMisses/*gskiplist.SyncMap[int,int]-8 364721040 3.372 ns/op -BenchmarkLoadMostlyMisses/*gskiplist.SkipList[int,int]-8 874448346 1.482 ns/op -BenchmarkLoadOrStoreUnique/*gskiplist.RWMutexMap-8 3651057 339.9 ns/op -BenchmarkLoadOrStoreUnique/*gskiplist.SyncMap[int,int]-8 1000000 1265 ns/op -BenchmarkLoadOrStoreUnique/*gskiplist.SkipList[int,int]-8 238684 284895 ns/op -BenchmarkLoadOrStoreCollision/*gskiplist.DeepCopyMap-8 4190437 287.9 ns/op -BenchmarkLoadOrStoreCollision/*gskiplist.RWMutexMap-8 8657164 138.2 ns/op -BenchmarkLoadOrStoreCollision/*gskiplist.SyncMap[int,int]-8 7297562 163.6 ns/op -BenchmarkLoadOrStoreCollision/*gskiplist.SkipList[int,int]-8 64378400 18.23 ns/op -BenchmarkAdversarialAlloc/*gskiplist.DeepCopyMap-8 3414529 347.9 ns/op -BenchmarkAdversarialAlloc/*gskiplist.RWMutexMap-8 23436660 73.72 ns/op -BenchmarkAdversarialAlloc/*gskiplist.SyncMap[int,int]-8 6630920 218.9 ns/op -BenchmarkAdversarialAlloc/*gskiplist.SkipList[int,int]-8 2665887 1011 ns/op -BenchmarkDeleteCollision/*gskiplist.DeepCopyMap-8 8580079 143.4 ns/op -BenchmarkDeleteCollision/*gskiplist.RWMutexMap-8 10476196 115.5 ns/op -BenchmarkDeleteCollision/*gskiplist.SyncMap[int,int]-8 438461808 2.922 ns/op -BenchmarkDeleteCollision/*gskiplist.SkipList[int,int]-8 1000000000 0.9350 ns/op -``` +BenchmarkLoadMostlyHits/*gskiplist.DeepCopyMap-8 339300584 3.648 ns/op +BenchmarkLoadMostlyHits/*gskiplist.RWMutexMap-8 15707191 82.62 ns/op +BenchmarkLoadMostlyHits/*gskiplist.SyncMap[int,int]-8 148558563 8.203 ns/op +BenchmarkLoadMostlyHits/*gskiplist.SkipList[int,int]-8 8557464 144.6 ns/op +BenchmarkLoadMostlyMisses/*gskiplist.DeepCopyMap-8 691986650 1.965 ns/op 
+BenchmarkLoadMostlyMisses/*gskiplist.RWMutexMap-8 16790496 89.97 ns/op +BenchmarkLoadMostlyMisses/*gskiplist.SyncMap[int,int]-8 252446268 4.683 ns/op +BenchmarkLoadMostlyMisses/*gskiplist.SkipList[int,int]-8 814085949 1.394 ns/op +BenchmarkLoadOrStoreUnique/*gskiplist.RWMutexMap-8 3293228 347.4 ns/op +BenchmarkLoadOrStoreUnique/*gskiplist.SyncMap[int,int]-8 1890684 782.6 ns/op +BenchmarkLoadOrStoreCollision/*gskiplist.DeepCopyMap-8 3802148 326.7 ns/op +BenchmarkLoadOrStoreCollision/*gskiplist.RWMutexMap-8 8502034 140.1 ns/op +BenchmarkLoadOrStoreCollision/*gskiplist.SyncMap[int,int]-8 7065519 169.8 ns/op +BenchmarkLoadOrStoreCollision/*gskiplist.SkipList[int,int]-8 64089868 18.39 ns/op +BenchmarkAdversarialAlloc/*gskiplist.DeepCopyMap-8 2865942 393.4 ns/op +BenchmarkAdversarialAlloc/*gskiplist.RWMutexMap-8 22905480 74.07 ns/op +BenchmarkAdversarialAlloc/*gskiplist.SyncMap[int,int]-8 6309598 212.4 ns/op +BenchmarkAdversarialAlloc/*gskiplist.SkipList[int,int]-8 2038196 932.5 ns/op +BenchmarkDeleteCollision/*gskiplist.DeepCopyMap-8 6336438 180.4 ns/op +BenchmarkDeleteCollision/*gskiplist.RWMutexMap-8 10384668 117.9 ns/op +BenchmarkDeleteCollision/*gskiplist.SyncMap[int,int]-8 267763987 4.468 ns/op +BenchmarkDeleteCollision/*gskiplist.SkipList[int,int]-8 779491996 1.544 ns/op +``` \ No newline at end of file diff --git a/gds/gskiplist/skiplist.go b/gogenerics/gds/gskiplist/skiplist.go similarity index 97% rename from gds/gskiplist/skiplist.go rename to gogenerics/gds/gskiplist/skiplist.go index c27dea1..205cba3 100644 --- a/gds/gskiplist/skiplist.go +++ b/gogenerics/gds/gskiplist/skiplist.go @@ -4,9 +4,8 @@ import ( "fmt" "math/bits" - "icode.baidu.com/baidu/passport/gogenerics/constraints" - "icode.baidu.com/baidu/passport/gogenerics/gtype" - "icode.baidu.com/baidu/passport/gogenerics/internal/fastrand" + "github.com/miniLCT/gosb/gogenerics/constraints" + "github.com/miniLCT/gosb/hack/fastrand" ) const ( @@ -144,13 +143,13 @@ func (t *SkipList[K, V]) findExtended(key K, findGreaterOrEqual bool) (*SkipList // Find Expected Complexity O(log(n)) func (t *SkipList[K, V]) Find(key K) (V, bool) { if t == nil { - return gtype.Empty[V](), false + return constraints.Empty[V](), false } if elem, ok := t.findExtended(key, false); ok { return elem.value, true } - return gtype.Empty[V](), false + return constraints.Empty[V](), false } // FindGreaterOrEqual finds the first element, that is greater or equal to the given ListElement e. @@ -225,7 +224,7 @@ func (t *SkipList[K, V]) Delete(key K) { // Insert inserts the given ListElement into the skiplist. // Insert Expected Complexity O(log(n)). // -//gocyclo:ignore +// gocyclo:ignore func (t *SkipList[K, V]) Insert(key K, e V) { if t == nil { return @@ -349,7 +348,7 @@ func (t *SkipList[K, V]) Insert(key K, e V) { // GetValue extracts the ListElement value from a skiplist node. 
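The constraints.Empty helper that replaces gtype.Empty in this hunk is not shown in the diff; presumably it simply returns the zero value of its type parameter, along the lines of this sketch:

```go
package main

import "fmt"

// Empty returns the zero value of T. This is a sketch of what a helper
// like constraints.Empty presumably does; the real implementation is not
// part of this diff.
func Empty[T any]() T {
	var zero T
	return zero
}

func main() {
	fmt.Println(Empty[int](), Empty[string]() == "", Empty[*int]() == nil) // 0 true true
}
```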
func (e *SkipListElement[K, V]) GetValue() V { if e == nil { - return gtype.Empty[V]() + return constraints.Empty[V]() } return e.value } diff --git a/gds/gskiplist/skiplist_beach_test.go b/gogenerics/gds/gskiplist/skiplist_beach_test.go similarity index 100% rename from gds/gskiplist/skiplist_beach_test.go rename to gogenerics/gds/gskiplist/skiplist_beach_test.go diff --git a/gds/gskiplist/skiplist_fuzz_test.go b/gogenerics/gds/gskiplist/skiplist_fuzz_test.go similarity index 100% rename from gds/gskiplist/skiplist_fuzz_test.go rename to gogenerics/gds/gskiplist/skiplist_fuzz_test.go diff --git a/gds/gskiplist/skiplist_test.go b/gogenerics/gds/gskiplist/skiplist_test.go similarity index 98% rename from gds/gskiplist/skiplist_test.go rename to gogenerics/gds/gskiplist/skiplist_test.go index 78976a7..aed5b64 100644 --- a/gds/gskiplist/skiplist_test.go +++ b/gogenerics/gds/gskiplist/skiplist_test.go @@ -6,7 +6,7 @@ import ( "github.com/stretchr/testify/assert" - "icode.baidu.com/baidu/passport/gogenerics/internal/fastrand" + "github.com/miniLCT/gosb/hack/fastrand" ) const (
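Finally, a usage sketch for the gskiplist API touched by these hunks (Insert, Find, and Delete appear in the diff above). The constructor name and import path are assumptions; how a SkipList is constructed is not shown in this diff.

```go
package main

import (
	"fmt"

	"github.com/miniLCT/gosb/gogenerics/gds/gskiplist"
)

func main() {
	// NOTE: gskiplist.New is an assumed constructor name; the diff only
	// shows Insert, Find, Delete and GetValue, not how a list is created.
	list := gskiplist.New[int, string]()

	list.Insert(1, "one")
	list.Insert(3, "three")

	if v, ok := list.Find(3); ok {
		fmt.Println("found:", v) // "three"
	}

	list.Delete(1)
	if _, ok := list.Find(1); !ok {
		fmt.Println("key 1 was deleted")
	}
}
```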