Skip to content

Commit 61dd820

Browse files
committed
feat: refactoring code, add adaptive bloom filter, adaptive migration and other updates
1 parent fb3cfd5 commit 61dd820

35 files changed

+1645
-330
lines changed

Readme.MD

Lines changed: 79 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -33,34 +33,53 @@ import (
3333
"github.com/arturmon/multi-tier-caching/storage"
3434
)
3535

36-
func main() {
3736

37+
func main() {
38+
ctx, cancel := context.WithCancel(context.Background())
39+
defer cancel()
3840
cfg := config.LoadConfig()
3941

42+
var debugCache = false
43+
4044
logger.InitLogger(cfg.LogLevel)
4145
logger.Info("Launching the cache system", "memoryCacheSize", cfg.MemoryCacheSize)
4246

43-
dbStorage, err := storage.NewDatabaseStorage(cfg.DatabaseDSN)
47+
memoryStorage, err := storage.NewRistrettoCache(ctx, int64(cfg.MemoryCacheSize))
48+
if err != nil {
49+
logger.Error("Failed to create Memory stoage: %v", err)
50+
}
51+
52+
dbStorage, err := storage.NewDatabaseStorage(cfg.DatabaseDSN, debugCache)
4453
if err != nil {
4554
logger.Error("Failed to connect to the database", "error", err)
4655
return
4756
}
4857
defer dbStorage.Close()
4958

50-
redisStorage, err := storage.NewRedisStorage(cfg.RedisAddr, cfg.RedisPassword)
59+
redisStorage, err := storage.NewRedisStorage(cfg.RedisAddr, cfg.RedisPassword, 1)
5160
if err != nil {
5261
logger.Error("Failed to connect to Redis", "error", err)
5362
return
5463
}
5564

56-
cache := multi_tier_caching.NewMultiTierCache(
57-
[]multi_tier_caching.CacheLayer{
58-
multi_tier_caching.NewMemoryCache(),
59-
multi_tier_caching.NewRedisCache(redisStorage),
65+
logger.Info("Launching the cache system", "memoryCacheSize", cfg.MemoryCacheSize)
66+
logger.Info("Launching the cache system", "databaseDSN", cfg.DatabaseDSN)
67+
logger.Info("Launching the cache system", "redisAddr", cfg.RedisAddr)
68+
69+
cacheConfig := multi_tier_caching.MultiTierCacheConfig{
70+
Layers: []multi_tier_caching.LayerInfo{
71+
multi_tier_caching.NewLayerInfo(multi_tier_caching.NewMemoryCache(memoryStorage)),
72+
multi_tier_caching.NewLayerInfo(multi_tier_caching.NewRedisCache(redisStorage)),
6073
},
61-
multi_tier_caching.NewDatabaseCache(dbStorage),
62-
)
74+
DB: multi_tier_caching.NewDatabaseCache(dbStorage),
75+
Thresholds: []int{10, 5},
76+
BloomSize: 100000,
77+
BloomHashes: 5,
78+
Debug: debugCache,
79+
}
6380

81+
cache := multi_tier_caching.NewMultiTierCache(ctx, cacheConfig)
82+
6483
err = cache.Set(context.Background(), "key1", "value1")
6584
if err != nil {
6685
return
@@ -90,11 +109,57 @@ fmt.Println("Cached Value:", value)
90109
```
91110

92111
## Features
93-
- **Multi-tier caching**: Uses memory, Redis, and database storage layers.
94-
- **Bloom filter**: Reduces unnecessary database queries.
95-
- **Write-behind queue**: Efficient database updates.
96-
- **TTL Management**: Supports automatic expiration of cached items.
97-
- **Cache Analytics**: Provides cache hit/miss statistics.
112+
113+
- **Multi-tier caching architecture**:
114+
- Configurable cache layers (e.g., hot, warm, cold) for optimized data access.
115+
- Prioritizes checks from the hottest layer (in-memory) to colder layers (e.g., disk, remote).
116+
117+
- **Bloom filter optimization**:
118+
- Reduces unnecessary database queries by probabilistically checking key existence.
119+
- Dynamically resizes based on request frequency, miss rate, and load factors.
120+
121+
- **Adaptive TTL management**:
122+
- Adjusts time-to-live (TTL) dynamically using key request frequency.
123+
- Longer TTL for high-frequency keys to minimize cache churn.
124+
125+
- **Intelligent data migration**:
126+
- Automatically promotes/demotes keys between layers using frequency thresholds.
127+
- Processes migrations asynchronously with adjustable intervals via background workers.
128+
129+
- **Metrics and analytics**:
130+
- Tracks cache hits, misses, migration times, and key frequency.
131+
- Exposes Prometheus metrics (`cacheHits`, `migrationTime`, `keyFrequency`).
132+
133+
- **Write-behind queue**:
134+
- Batches and asynchronously persists updates to reduce database latency.
135+
- Adaptive processing intervals based on queue load.
136+
137+
- **Self-optimizing components**:
138+
- **Bloom filter auto-scaling**: Dynamically adjusts size and hash functions.
139+
- **TTL auto-tuning**: Balances cache efficiency and storage costs.
140+
141+
- **Health monitoring**:
142+
- Health checks for all cache layers and the underlying database.
143+
- Graceful shutdown with resource cleanup.
144+
145+
- **Debugging and observability**:
146+
- Detailed logs for migrations, TTL adjustments, and cache operations.
147+
- `GetDebugInfo()` method for real-time Bloom filter stats (false positive rate, load factor).
148+
149+
- **Configurable policies**:
150+
- Customizable frequency thresholds for layer transitions.
151+
- Adjustable Bloom filter parameters (initial size, hash functions).
152+
153+
- **Database integration**:
154+
- Fallback to database on cache misses, with automatic cache refresh for fetched keys.
155+
156+
- **Concurrency and scalability**:
157+
- Thread-safe operations using mutexes and channels.
158+
- Parallel migration workers for high-throughput scenarios.
159+
160+
- **Prometheus integration**:
161+
- Built-in support for Prometheus metrics (histograms, counters, gauges).
162+
98163

99164
## Contributing
100165
Pull requests are welcome. Please open an issue first to discuss any major changes.

analytics.go

Lines changed: 46 additions & 71 deletions
Original file line numberDiff line numberDiff line change
@@ -2,94 +2,69 @@ package multi_tier_caching
22

33
import (
44
"sync"
5-
"time"
5+
6+
"github.com/prometheus/client_golang/prometheus"
7+
dto "github.com/prometheus/client_model/go"
68
)
79

810
type CacheAnalytics struct {
9-
hits, misses int
10-
frequencies map[string]int // Request frequency for each keyword
11-
recentFreq sync.Map // Request frequency in the last minute
12-
mu sync.Mutex
13-
lastResetTime time.Time
14-
}
15-
16-
func NewCacheAnalytics() *CacheAnalytics {
17-
return &CacheAnalytics{
18-
frequencies: make(map[string]int),
19-
lastResetTime: time.Now(),
20-
}
11+
cacheHits *prometheus.CounterVec
12+
cacheMisses prometheus.Counter
13+
keyFrequency *prometheus.GaugeVec
14+
migrationTime prometheus.Histogram
15+
migrationCount *prometheus.CounterVec
16+
mu sync.RWMutex
17+
keys map[string]struct{}
2118
}
2219

23-
func (ca *CacheAnalytics) LogHit(key string) {
24-
ca.mu.Lock()
25-
defer ca.mu.Unlock()
26-
ca.hits++
27-
ca.frequencies[key]++
28-
valRecent, _ := ca.recentFreq.LoadOrStore(key, 0)
29-
ca.recentFreq.Store(key, valRecent.(int)+1)
20+
// LogHit records a cache hit for a specific layer and key, updating the corresponding metrics.
21+
func (a *CacheAnalytics) LogHit(layerName, key string) {
22+
a.cacheHits.WithLabelValues(layerName).Inc()
23+
a.keyFrequency.WithLabelValues(key).Inc()
24+
a.mu.Lock()
25+
defer a.mu.Unlock()
26+
a.keys[key] = struct{}{}
3027
}
3128

32-
func (ca *CacheAnalytics) LogMiss() {
33-
ca.mu.Lock()
34-
defer ca.mu.Unlock()
35-
ca.misses++
29+
// LogMiss increments the cache miss counter.
30+
func (a *CacheAnalytics) LogMiss() {
31+
a.cacheMisses.Inc()
3632
}
3733

38-
func (ca *CacheAnalytics) GetStats() (int, int) {
39-
ca.mu.Lock()
40-
defer ca.mu.Unlock()
41-
return ca.hits, ca.misses
42-
}
34+
// GetFrequency retrieves the request frequency of a specific cache key.
35+
func (a *CacheAnalytics) GetFrequency(key string) int {
36+
gauge, err := a.keyFrequency.GetMetricWithLabelValues(key)
37+
if err != nil {
38+
return 0
39+
}
4340

44-
func (ca *CacheAnalytics) GetFrequency(key string) int {
45-
ca.mu.Lock()
46-
defer ca.mu.Unlock()
47-
return ca.frequencies[key]
48-
}
41+
var metric dto.Metric
42+
if err = gauge.Write(&metric); err != nil {
43+
return 0
44+
}
4945

50-
// GetTotalFrequency Returns the total number of times a key has been accessed.
51-
func (ca *CacheAnalytics) GetTotalFrequency(key string) int {
52-
ca.mu.Lock()
53-
defer ca.mu.Unlock()
54-
return ca.frequencies[key]
46+
return int(metric.Gauge.GetValue())
5547
}
5648

57-
// GetFrequencyPerMinute Returns the request rate per minute and resets the counters
58-
func (ca *CacheAnalytics) GetFrequencyPerMinute() map[string]int {
59-
ca.mu.Lock()
60-
defer ca.mu.Unlock()
49+
// GetFrequencyPerMinute returns a map containing the request frequency of all tracked keys.
50+
func (a *CacheAnalytics) GetFrequencyPerMinute() map[string]int {
51+
result := make(map[string]int)
6152

62-
now := time.Now()
63-
if now.Sub(ca.lastResetTime).Minutes() < 1 {
64-
return nil // Not a minute has passed yet
53+
// Create a copy of the keys for secure iteration
54+
a.mu.RLock()
55+
keysCopy := make([]string, 0, len(a.keys))
56+
for key := range a.keys {
57+
keysCopy = append(keysCopy, key)
6558
}
59+
a.mu.RUnlock()
6660

67-
freqCopy := make(map[string]int)
68-
// Iterating over the sync.Map using Range
69-
ca.recentFreq.Range(func(k, v interface{}) bool {
70-
if key, ok := k.(string); ok {
71-
if count, ok := v.(int); ok {
72-
freqCopy[key] = count
61+
for _, key := range keysCopy {
62+
if gauge, err := a.keyFrequency.GetMetricWithLabelValues(key); err == nil {
63+
var metric dto.Metric
64+
if err = gauge.Write(&metric); err == nil {
65+
result[key] = int(*metric.Gauge.Value)
7366
}
7467
}
75-
return true // continue iteration
76-
})
77-
78-
// Resetting temporary data
79-
ca.recentFreq = sync.Map{}
80-
ca.lastResetTime = now
81-
82-
return freqCopy
83-
}
84-
85-
// ResetFrequencies We reset the counters every minute
86-
func (ca *CacheAnalytics) ResetFrequencies() {
87-
ca.mu.Lock()
88-
defer ca.mu.Unlock()
89-
90-
now := time.Now()
91-
if now.Sub(ca.lastResetTime).Minutes() >= 1 {
92-
ca.frequencies = make(map[string]int) // Clearing the counters
93-
ca.lastResetTime = now
9468
}
69+
return result
9570
}

analyticsMetrics.go

Lines changed: 38 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,38 @@
1+
package multi_tier_caching
2+
3+
import (
4+
"github.com/prometheus/client_golang/prometheus"
5+
"github.com/prometheus/client_golang/prometheus/promauto"
6+
)
7+
8+
// NewCacheAnalytics initializes and returns a new CacheAnalytics instance with Prometheus metrics.
9+
func NewCacheAnalytics() *CacheAnalytics {
10+
return &CacheAnalytics{
11+
cacheHits: promauto.NewCounterVec(prometheus.CounterOpts{
12+
Name: "cache_hits_total",
13+
Help: "Total cache hits per layer",
14+
}, []string{"layer"}),
15+
16+
cacheMisses: promauto.NewCounter(prometheus.CounterOpts{
17+
Name: "cache_misses_total",
18+
Help: "Total cache misses",
19+
}),
20+
21+
keyFrequency: promauto.NewGaugeVec(prometheus.GaugeOpts{
22+
Name: "cache_key_frequency",
23+
Help: "Request frequency for keys",
24+
}, []string{"key"}),
25+
26+
migrationTime: promauto.NewHistogram(prometheus.HistogramOpts{
27+
Name: "cache_migration_duration_seconds",
28+
Help: "Time spent on data migration",
29+
Buckets: []float64{0.1, 0.5, 1, 5},
30+
}),
31+
32+
migrationCount: promauto.NewCounterVec(prometheus.CounterOpts{
33+
Name: "cache_migration_operations_total",
34+
Help: "Total migration operations",
35+
}, []string{"status"}),
36+
keys: make(map[string]struct{}),
37+
}
38+
}

analytics_test.go

Lines changed: 0 additions & 30 deletions
This file was deleted.

0 commit comments

Comments
 (0)