From 32d748d642f09b76afa1ce3ea50f5bb14b520b9f Mon Sep 17 00:00:00 2001
From: =?UTF-8?q?Ra=C3=BAl=20Kripalani?=
Date: Sun, 15 Nov 2020 13:17:25 +0000
Subject: [PATCH 1/2] introduce GetFn and PeekFn methods for zero-copy access.

This commit introduces two methods, GetFn and PeekFn. These methods
provide zero-copy access to cache entries, avoiding allocs unless
absolutely necessary. They accept a function and call it with a slice
over the current underlying value of the key in memory. The slice is
constrained in length and capacity.

The RingBuf struct has been augmented with a Slice() method to support
these operations.

The only case where these variants allocate is when the value wraps
around the ring buffer.
---
 cache.go      | 36 ++++++++++++++++++++++++++++++
 cache_test.go | 37 +++++++++++++++++++++++++++++-
 ringbuf.go    | 30 +++++++++++++++++++++++++
 segment.go    | 62 +++++++++++++++++++++++++++++++++++----------------
 4 files changed, 145 insertions(+), 20 deletions(-)

diff --git a/cache.go b/cache.go
index bfcb846..df76d50 100644
--- a/cache.go
+++ b/cache.go
@@ -84,6 +84,24 @@ func (cache *Cache) Get(key []byte) (value []byte, err error) {
 	return
 }
 
+// GetFn is equivalent to Get or GetWithBuf, but it attempts to be zero-copy,
+// calling the provided function with a slice view over the current underlying
+// value of the key in memory. The slice is constrained in length and capacity.
+//
+// In most cases, this method will not alloc a byte buffer. The only exception
+// is when the value wraps around the underlying segment ring buffer.
+//
+// The method will return ErrNotFound if there's a miss, and the function will
+// not be called. Errors returned by the function will be propagated.
+func (cache *Cache) GetFn(key []byte, fn func([]byte) error) (err error) {
+	hashVal := hashFunc(key)
+	segID := hashVal & segmentAndOpVal
+	cache.locks[segID].Lock()
+	err = cache.segments[segID].view(key, fn, hashVal, false)
+	cache.locks[segID].Unlock()
+	return
+}
+
 // GetOrSet returns existing value or if record doesn't exist
 // it sets a new key, value and expiration for a cache entry and stores it in the cache, returns nil in that case
 func (cache *Cache) GetOrSet(key, value []byte, expireSeconds int) (retValue []byte, err error) {
@@ -109,6 +127,24 @@ func (cache *Cache) Peek(key []byte) (value []byte, err error) {
 	return
 }
 
+// PeekFn is equivalent to Peek, but it attempts to be zero-copy, calling the
+// provided function with a slice view over the current underlying value of the
+// key in memory. The slice is constrained in length and capacity.
+//
+// In most cases, this method will not alloc a byte buffer. The only exception
+// is when the value wraps around the underlying segment ring buffer.
+//
+// The method will return ErrNotFound if there's a miss, and the function will
+// not be called. Errors returned by the function will be propagated.
+func (cache *Cache) PeekFn(key []byte, fn func([]byte) error) (err error) {
+	hashVal := hashFunc(key)
+	segID := hashVal & segmentAndOpVal
+	cache.locks[segID].Lock()
+	err = cache.segments[segID].view(key, fn, hashVal, true)
+	cache.locks[segID].Unlock()
+	return
+}
+
 // GetWithBuf copies the value to the buf or returns not found error.
 // This method doesn't allocate memory when the capacity of buf is greater or equal to value.
 func (cache *Cache) GetWithBuf(key, buf []byte) (value []byte, err error) {
diff --git a/cache_test.go b/cache_test.go
index 0651efa..c12eebf 100644
--- a/cache_test.go
+++ b/cache_test.go
@@ -83,6 +83,12 @@ func TestFreeCache(t *testing.T) {
 				t.Errorf("value is %v, expected %v", string(value), expectedValStr)
 			}
 		}
+		err = cache.GetFn([]byte(keyStr), func(val []byte) error {
+			if string(val) != expectedValStr {
+				t.Errorf("getfn: value is %v, expected %v", string(val), expectedValStr)
+			}
+			return nil
+		})
 	}
 	t.Logf("hit rate is %v, evacuates %v, entries %v, average time %v, expire count %v\n",
@@ -156,6 +162,15 @@ func TestGetOrSet(t *testing.T) {
 	if err != nil || string(r) != "efgh" {
 		t.Errorf("Expected to get old record, got: value=%v, err=%v", string(r), err)
 	}
+	err = cache.GetFn(key, func(val []byte) error {
+		if string(val) != "efgh" {
+			t.Errorf("getfn: Expected to get old record, got: value=%v", string(val))
+		}
+		return nil
+	})
+	if err != nil {
+		t.Errorf("did not expect error from GetFn, got: %s", err)
+	}
 }
 
 func TestGetWithExpiration(t *testing.T) {
@@ -663,7 +678,7 @@ func BenchmarkCacheGet(b *testing.B) {
 	}
 }
 
-func BenchmarkParallelCacheGet(b *testing.B) {
+func BenchmarkCacheGetFn(b *testing.B) {
 	b.ReportAllocs()
 	b.StopTimer()
 	cache := NewCache(256 * 1024 * 1024)
@@ -674,7 +689,27 @@ func BenchmarkParallelCacheGet(b *testing.B) {
 		cache.Set(key[:], buf, 0)
 	}
 	b.StartTimer()
+	for i := 0; i < b.N; i++ {
+		binary.LittleEndian.PutUint64(key[:], uint64(i))
+		_ = cache.GetFn(key[:], func(val []byte) error {
+			_ = val
+			return nil
+		})
+	}
+	b.Logf("b.N: %d; hit rate: %f", b.N, cache.HitRate())
+}
 
+func BenchmarkParallelCacheGet(b *testing.B) {
+	b.ReportAllocs()
+	b.StopTimer()
+	cache := NewCache(256 * 1024 * 1024)
+	buf := make([]byte, 64)
+	var key [8]byte
+	for i := 0; i < b.N; i++ {
+		binary.LittleEndian.PutUint64(key[:], uint64(i))
+		cache.Set(key[:], buf, 0)
+	}
+	b.StartTimer()
 	b.RunParallel(func(pb *testing.PB) {
 		counter := 0
 		b.ReportAllocs()
diff --git a/ringbuf.go b/ringbuf.go
index 6da31a2..227f527 100644
--- a/ringbuf.go
+++ b/ringbuf.go
@@ -80,6 +80,36 @@ func (rb *RingBuf) ReadAt(p []byte, off int64) (n int, err error) {
 	return
 }
 
+// Slice returns a slice of the supplied range of the ring buffer. It will
+// not alloc unless the requested range wraps the ring buffer.
+func (rb *RingBuf) Slice(off, length int64) ([]byte, error) { + if off > rb.end || off < rb.begin { + return nil, ErrOutOfRange + } + var readOff int + if rb.end-rb.begin < int64(len(rb.data)) { + readOff = int(off - rb.begin) + } else { + readOff = rb.index + int(off-rb.begin) + } + if readOff >= len(rb.data) { + readOff -= len(rb.data) + } + readEnd := readOff + int(length) + if readEnd <= len(rb.data) { + return rb.data[readOff:readEnd:readEnd], nil + } + buf := make([]byte, length) + n := copy(buf, rb.data[readOff:]) + if n < int(length) { + n += copy(buf[n:], rb.data[:readEnd-len(rb.data)]) + } + if n < int(length) { + return nil, io.EOF + } + return buf, nil +} + func (rb *RingBuf) Write(p []byte) (n int, err error) { if len(p) > len(rb.data) { err = ErrOutOfRange diff --git a/segment.go b/segment.go index c7a56ec..be113f9 100644 --- a/segment.go +++ b/segment.go @@ -83,12 +83,11 @@ func (seg *segment) set(key, value []byte, hashVal uint64, expireSeconds int) (e slotId := uint8(hashVal >> 8) hash16 := uint16(hashVal >> 16) + slot := seg.getSlot(slotId) + idx, match := seg.lookup(slot, hash16, key) var hdrBuf [ENTRY_HDR_SIZE]byte hdr := (*entryHdr)(unsafe.Pointer(&hdrBuf[0])) - - slot := seg.getSlot(slotId) - idx, match := seg.lookup(slot, hash16, key) if match { matchedPtr := &slot[idx] seg.rb.ReadAt(hdrBuf[:], matchedPtr.offset) @@ -158,7 +157,6 @@ func (seg *segment) touch(key []byte, hashVal uint64, expireSeconds int) (err er slotId := uint8(hashVal >> 8) hash16 := uint16(hashVal >> 16) - slot := seg.getSlot(slotId) idx, match := seg.lookup(slot, hash16, key) if !match { @@ -238,6 +236,44 @@ func (seg *segment) evacuate(entryLen int64, slotId uint8, now uint32) (slotModi } func (seg *segment) get(key, buf []byte, hashVal uint64, peek bool) (value []byte, expireAt uint32, err error) { + hdr, ptr, err := seg.locate(key, hashVal, peek) + if err != nil { + return + } + expireAt = hdr.expireAt + if cap(buf) >= int(hdr.valLen) { + value = buf[:hdr.valLen] + } else { + value = make([]byte, hdr.valLen) + } + + seg.rb.ReadAt(value, ptr.offset+ENTRY_HDR_SIZE+int64(hdr.keyLen)) + if !peek { + atomic.AddInt64(&seg.hitCount, 1) + } + return +} + +// view provides zero-copy access to the element's value, without copying to +// an intermediate buffer. 
+func (seg *segment) view(key []byte, fn func([]byte) error, hashVal uint64, peek bool) (err error) { + hdr, ptr, err := seg.locate(key, hashVal, peek) + if err != nil { + return + } + start := ptr.offset + ENTRY_HDR_SIZE + int64(hdr.keyLen) + val, err := seg.rb.Slice(start, int64(hdr.valLen)) + if err != nil { + return err + } + err = fn(val) + if !peek { + atomic.AddInt64(&seg.hitCount, 1) + } + return +} + +func (seg *segment) locate(key []byte, hashVal uint64, peek bool) (hdr *entryHdr, ptr *entryPtr, err error) { slotId := uint8(hashVal >> 8) hash16 := uint16(hashVal >> 16) slot := seg.getSlot(slotId) @@ -249,15 +285,13 @@ func (seg *segment) get(key, buf []byte, hashVal uint64, peek bool) (value []byt } return } - ptr := &slot[idx] + ptr = &slot[idx] var hdrBuf [ENTRY_HDR_SIZE]byte seg.rb.ReadAt(hdrBuf[:], ptr.offset) - hdr := (*entryHdr)(unsafe.Pointer(&hdrBuf[0])) + hdr = (*entryHdr)(unsafe.Pointer(&hdrBuf[0])) if !peek { now := seg.timer.Now() - expireAt = hdr.expireAt - if hdr.expireAt != 0 && hdr.expireAt <= now { seg.delEntryPtr(slotId, slot, idx) atomic.AddInt64(&seg.totalExpired, 1) @@ -269,17 +303,7 @@ func (seg *segment) get(key, buf []byte, hashVal uint64, peek bool) (value []byt hdr.accessTime = now seg.rb.WriteAt(hdrBuf[:], ptr.offset) } - if cap(buf) >= int(hdr.valLen) { - value = buf[:hdr.valLen] - } else { - value = make([]byte, hdr.valLen) - } - - seg.rb.ReadAt(value, ptr.offset+ENTRY_HDR_SIZE+int64(hdr.keyLen)) - if !peek { - atomic.AddInt64(&seg.hitCount, 1) - } - return + return hdr, ptr, err } func (seg *segment) del(key []byte, hashVal uint64) (affected bool) { From ef5a04d39eefb0d2d0f332eb826085ba360fefd8 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Ra=C3=BAl=20Kripalani?= Date: Sun, 15 Nov 2020 16:57:36 +0000 Subject: [PATCH 2/2] more realistic benchmarks. Fixes https://github.com/coocood/freecache/issues/91. --- cache_test.go | 140 +++++++++++++++++++++++++------------------------- 1 file changed, 71 insertions(+), 69 deletions(-) diff --git a/cache_test.go b/cache_test.go index c12eebf..ad953e8 100644 --- a/cache_test.go +++ b/cache_test.go @@ -631,6 +631,7 @@ func TestEvacuateCount(t *testing.T) { func BenchmarkCacheSet(b *testing.B) { cache := NewCache(256 * 1024 * 1024) var key [8]byte + b.ReportAllocs() for i := 0; i < b.N; i++ { binary.LittleEndian.PutUint64(key[:], uint64(i)) cache.Set(key[:], make([]byte, 8), 0) @@ -655,6 +656,7 @@ func BenchmarkParallelCacheSet(b *testing.B) { func BenchmarkMapSet(b *testing.B) { m := make(map[string][]byte) var key [8]byte + b.ReportAllocs() for i := 0; i < b.N; i++ { binary.LittleEndian.PutUint64(key[:], uint64(i)) m[string(key[:])] = make([]byte, 8) @@ -662,121 +664,120 @@ func BenchmarkMapSet(b *testing.B) { } func BenchmarkCacheGet(b *testing.B) { + cache, count := populateCache() + + b.ResetTimer() b.ReportAllocs() - b.StopTimer() - cache := NewCache(256 * 1024 * 1024) + var key [8]byte - buf := make([]byte, 64) for i := 0; i < b.N; i++ { - binary.LittleEndian.PutUint64(key[:], uint64(i)) - cache.Set(key[:], buf, 0) + binary.LittleEndian.PutUint64(key[:], uint64(i%count)) + _, _ = cache.Get(key[:]) } - b.StartTimer() - for i := 0; i < b.N; i++ { + b.Logf("b.N: %d; hit rate: %f", b.N, cache.HitRate()) +} + +func populateCache() (*Cache, int) { + var ( + cache = NewCache(256 * 1024 * 1024) + buf = make([]byte, 64) + key [8]byte + ) + + // number of entries that can fit with the above parameters before an + // eviction is needed, with the standard hash function and sequential + // uint64 keys. 
+ const maxEntries = 2739652 + for i := 0; i < maxEntries; i++ { binary.LittleEndian.PutUint64(key[:], uint64(i)) - cache.Get(key[:]) + _ = cache.Set(key[:], buf, 0) } + return cache, int(cache.EntryCount()) } func BenchmarkCacheGetFn(b *testing.B) { + cache, count := populateCache() + + b.ResetTimer() b.ReportAllocs() - b.StopTimer() - cache := NewCache(256 * 1024 * 1024) - var key [8]byte - buf := make([]byte, 64) - for i := 0; i < b.N; i++ { - binary.LittleEndian.PutUint64(key[:], uint64(i)) - cache.Set(key[:], buf, 0) + + fn := func(val []byte) error { + _ = val + return nil } - b.StartTimer() + + var key [8]byte for i := 0; i < b.N; i++ { - binary.LittleEndian.PutUint64(key[:], uint64(i)) - _ = cache.GetFn(key[:], func(val []byte) error { - _ = val - return nil - }) + binary.LittleEndian.PutUint64(key[:], uint64(i%count)) + _ = cache.GetFn(key[:], fn) } b.Logf("b.N: %d; hit rate: %f", b.N, cache.HitRate()) } func BenchmarkParallelCacheGet(b *testing.B) { + cache, count := populateCache() + + b.ResetTimer() b.ReportAllocs() - b.StopTimer() - cache := NewCache(256 * 1024 * 1024) - buf := make([]byte, 64) - var key [8]byte - for i := 0; i < b.N; i++ { - binary.LittleEndian.PutUint64(key[:], uint64(i)) - cache.Set(key[:], buf, 0) - } - b.StartTimer() + b.RunParallel(func(pb *testing.PB) { - counter := 0 - b.ReportAllocs() - for pb.Next() { - binary.LittleEndian.PutUint64(key[:], uint64(counter)) - cache.Get(key[:]) - counter = counter + 1 + var key [8]byte + for i := 0; pb.Next(); i++ { + binary.LittleEndian.PutUint64(key[:], uint64(i%count)) + _, _ = cache.Get(key[:]) } }) + b.Logf("b.N: %d; hit rate: %f", b.N, cache.HitRate()) } func BenchmarkCacheGetWithBuf(b *testing.B) { + cache, count := populateCache() + + b.ResetTimer() b.ReportAllocs() - b.StopTimer() - cache := NewCache(256 * 1024 * 1024) + var key [8]byte buf := make([]byte, 64) for i := 0; i < b.N; i++ { - binary.LittleEndian.PutUint64(key[:], uint64(i)) - cache.Set(key[:], buf, 0) - } - b.StartTimer() - for i := 0; i < b.N; i++ { - binary.LittleEndian.PutUint64(key[:], uint64(i)) - cache.GetWithBuf(key[:], buf) + binary.LittleEndian.PutUint64(key[:], uint64(i%count)) + _, _ = cache.GetWithBuf(key[:], buf) } + b.Logf("b.N: %d; hit rate: %f", b.N, cache.HitRate()) } func BenchmarkParallelCacheGetWithBuf(b *testing.B) { + cache, count := populateCache() + + b.ResetTimer() b.ReportAllocs() - b.StopTimer() - cache := NewCache(256 * 1024 * 1024) - var key [8]byte - buf := make([]byte, 64) - for i := 0; i < b.N; i++ { - binary.LittleEndian.PutUint64(key[:], uint64(i)) - cache.Set(key[:], buf, 0) - } - b.StartTimer() b.RunParallel(func(pb *testing.PB) { - counter := 0 - b.ReportAllocs() - for pb.Next() { - binary.LittleEndian.PutUint64(key[:], uint64(counter)) - cache.GetWithBuf(key[:], buf) - counter = counter + 1 + var key [8]byte + buf := make([]byte, 64) + for i := 0; pb.Next(); i++ { + binary.LittleEndian.PutUint64(key[:], uint64(i%count)) + _, _ = cache.GetWithBuf(key[:], buf) } }) + b.Logf("b.N: %d; hit rate: %f", b.N, cache.HitRate()) } func BenchmarkCacheGetWithExpiration(b *testing.B) { - b.StopTimer() - cache := NewCache(256 * 1024 * 1024) + cache, count := populateCache() + + b.ResetTimer() + b.ReportAllocs() + var key [8]byte for i := 0; i < b.N; i++ { - binary.LittleEndian.PutUint64(key[:], uint64(i)) - cache.Set(key[:], make([]byte, 8), 0) - } - b.StartTimer() - for i := 0; i < b.N; i++ { - binary.LittleEndian.PutUint64(key[:], uint64(i)) - cache.GetWithExpiration(key[:]) + binary.LittleEndian.PutUint64(key[:], 
uint64(i%count)) + _, _, _ = cache.GetWithExpiration(key[:]) } + b.Logf("b.N: %d; hit rate: %f", b.N, cache.HitRate()) } func BenchmarkMapGet(b *testing.B) { + b.ReportAllocs() b.StopTimer() m := make(map[string][]byte) var key [8]byte @@ -799,6 +800,7 @@ func BenchmarkHashFunc(b *testing.B) { rand.Read(key) b.ResetTimer() + b.ReportAllocs() for i := 0; i < b.N; i++ { hashFunc(key) }
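
A minimal usage sketch of the new zero-copy API (illustrative only, not part of the patch; the key, value, cache size, and TTL below are made up, while NewCache, Set, GetFn, and ErrNotFound are the existing and newly added freecache identifiers):

package main

import (
	"fmt"

	"github.com/coocood/freecache"
)

func main() {
	cache := freecache.NewCache(32 * 1024 * 1024) // 32 MiB cache
	key := []byte("user:42")
	_ = cache.Set(key, []byte("serialized profile bytes"), 60) // expires in 60s

	// GetFn hands the callback a slice over the cache's internal ring buffer,
	// so no copy or alloc happens unless the entry wraps around the buffer.
	// The slice is only valid inside the callback and must not be retained.
	err := cache.GetFn(key, func(val []byte) error {
		fmt.Printf("value is %d bytes long\n", len(val))
		return nil
	})
	if err == freecache.ErrNotFound {
		fmt.Println("miss: the callback was not invoked")
	} else if err != nil {
		fmt.Println("callback returned an error:", err)
	}
}

PeekFn is used the same way but, like Peek, skips the expiry check and does not update access time or hit statistics.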