Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

#32 Add cron for cleanup of keys from DiceDB #43

Merged
merged 3 commits into from
Oct 22, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8 changes: 6 additions & 2 deletions .env.sample
Original file line number Diff line number Diff line change
@@ -1,8 +1,12 @@
DICEDB_ADDR=localhost:7379
DICEDB_ADMIN_ADDR=localhost:7379
DICEDB_ADMIN_USERNAME=diceadmin
DICEDB_ADMIN_PASSWORD=
DICEDB_ADDR=localhost:7380
DICEDB_USERNAME=dice
DICEDB_PASSWORD=
SERVER_PORT=:8080
IS_TEST_ENVIRONMENT=false
REQUEST_LIMIT_PER_MIN=1000
REQUEST_WINDOW_SEC=60
ALLOWED_ORIGINS=http://localhost:3000
ALLOWED_ORIGINS=http://localhost:3000
CRON_CLEANUP_FREQUENCY_MINS=15
58 changes: 40 additions & 18 deletions config/config.go
Original file line number Diff line number Diff line change
@@ -1,59 +1,81 @@
package config

import (
"fmt"
"log/slog"
"os"
"strconv"
"strings"
"time"

"github.com/joho/godotenv"
)

// Config holds the application configuration
type Config struct {
// Config for DiceDBAdmin instance. This instance holds internal keys
// and is separate from DiceDB hosting global key pool i.e. user facing.
DiceDBAdmin struct {
Addr string // Field for the Dice address
Username string // Field for the username
Password string // Field for the password
}
// Config for DiceDB User instance. This instance holds internal keys
// and is separate from DiceDB hosting global key pool i.e. user facing.
DiceDB struct {
Addr string // Field for the Dice address
Username string // Field for the username
Password string // Field for the password
}
Server struct {
Port string // Field for the server port
IsTestEnv bool
RequestLimitPerMin int64 // Field for the request limit
RequestWindowSec float64 // Field for the time window in float64
AllowedOrigins []string // Field for the allowed origins
Port string // Field for the server port
IsTestEnv bool
RequestLimitPerMin int64 // Field for the request limit
RequestWindowSec float64 // Field for the time window in float64
AllowedOrigins []string // Field for the allowed origins
CronCleanupFrequency time.Duration // Field for configuring key cleanup cron
}
}

// LoadConfig loads the application configuration from environment variables or defaults
func LoadConfig() *Config {
err := godotenv.Load()
if err != nil {
fmt.Println("Warning: .env file not found, falling back to system environment variables.")
slog.Debug("Warning: .env file not found, falling back to system environment variables.")
}

return &Config{
DiceDBAdmin: struct {
Addr string
Username string
Password string
}{
Addr: getEnv("DICEDB_ADMIN_ADDR", "localhost:7379"), // Default DiceDB Admin address
Username: getEnv("DICEDB_ADMIN_USERNAME", "diceadmin"), // Default DiceDB Admin username
Password: getEnv("DICEDB_ADMIN_PASSWORD", ""), // Default DiceDB Admin password
},
DiceDB: struct {
Addr string
Username string
Password string
}{
Addr: getEnv("DICEDB_ADDR", "localhost:7379"), // Default Dice address
Addr: getEnv("DICEDB_ADDR", "localhost:7380"), // Default DiceDB address
Username: getEnv("DICEDB_USERNAME", "dice"), // Default username
Password: getEnv("DICEDB_PASSWORD", ""), // Default password
},
Server: struct {
Port string
IsTestEnv bool
RequestLimitPerMin int64
RequestWindowSec float64
AllowedOrigins []string
Port string
IsTestEnv bool
RequestLimitPerMin int64
RequestWindowSec float64
AllowedOrigins []string
CronCleanupFrequency time.Duration
}{
Port: getEnv("SERVER_PORT", ":8080"),
IsTestEnv: getEnvBool("IS_TEST_ENVIRONMENT", false), // Default server port
RequestLimitPerMin: getEnvInt("REQUEST_LIMIT_PER_MIN", 1000), // Default request limit
RequestWindowSec: getEnvFloat64("REQUEST_WINDOW_SEC", 60), // Default request window in float64
AllowedOrigins: getEnvArray("ALLOWED_ORIGINS", []string{"http://localhost:3000"}), // Default allowed origins
Port: getEnv("SERVER_PORT", ":8080"),
IsTestEnv: getEnvBool("IS_TEST_ENVIRONMENT", false), // Default server port
RequestLimitPerMin: getEnvInt("REQUEST_LIMIT_PER_MIN", 1000), // Default request limit
RequestWindowSec: getEnvFloat64("REQUEST_WINDOW_SEC", 60), // Default request window in float64
AllowedOrigins: getEnvArray("ALLOWED_ORIGINS", []string{"http://localhost:3000"}), // Default allowed origins
CronCleanupFrequency: time.Duration(getEnvInt("CRON_CLEANUP_FREQUENCY_MINS", 15)) * time.Minute, // Default cron cleanup frequency
},
}
}
Expand Down
20 changes: 18 additions & 2 deletions docker-compose.yml
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
services:
dicedb:
dicedbadmin:
image: dicedb/dicedb:latest
ports:
- "7379:7379"
Expand All @@ -11,15 +11,31 @@ services:
networks:
- dice-network

dicedb:
image: dicedb/dicedb:latest
ports:
- "7380:7379"
healthcheck:
test: [ "CMD", "PING" ]
interval: 10s
timeout: 3s
retries: 3
networks:
- dice-network

backend:
build:
context: .
ports:
- "8080:8080"
depends_on:
- dicedbadmin
- dicedb
environment:
- DICEDB_ADDR=localhost:7379
- DICEDB_ADMIN_ADDR=localhost:7379
- DICEDB_ADMIN_USERNAME=${DICEDB_ADMIN_USERNAME}
- DICEDB_ADMIN_PASSWORD=${DICEDB_ADMIN_PASSWORD}
- DICEDB_ADDR=localhost:7380
- DICEDB_USERNAME=${DICEDB_USERNAME}
- DICEDB_PASSWORD=${DICEDB_PASSWORD}
networks:
Expand Down
30 changes: 21 additions & 9 deletions internal/db/dicedb.go
Original file line number Diff line number Diff line change
Expand Up @@ -33,15 +33,27 @@ func (db *DiceDB) CloseDiceDB() {
}
}

func InitDiceClient(configValue *config.Config) (*DiceDB, error) {
diceClient := dicedb.NewClient(&dicedb.Options{
Addr: configValue.DiceDB.Addr,
Username: configValue.DiceDB.Username,
Password: configValue.DiceDB.Password,
DialTimeout: 10 * time.Second,
MaxRetries: 10,
EnablePrettyResponse: true,
})
func InitDiceClient(configValue *config.Config, isAdmin bool) (*DiceDB, error) {
var diceClient *dicedb.Client
if isAdmin {
diceClient = dicedb.NewClient(&dicedb.Options{
Addr: configValue.DiceDBAdmin.Addr,
Username: configValue.DiceDBAdmin.Username,
Password: configValue.DiceDBAdmin.Password,
DialTimeout: 10 * time.Second,
MaxRetries: 10,
EnablePrettyResponse: true,
})
} else {
diceClient = dicedb.NewClient(&dicedb.Options{
Addr: configValue.DiceDB.Addr,
Username: configValue.DiceDB.Username,
Password: configValue.DiceDB.Password,
DialTimeout: 10 * time.Second,
MaxRetries: 10,
EnablePrettyResponse: true,
})
}

// Ping the dicedb client to verify the connection
err := diceClient.Ping(context.Background()).Err()
Expand Down
30 changes: 24 additions & 6 deletions internal/middleware/ratelimiter.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ import (
"log/slog"
"net/http"
"server/internal/db"
"server/internal/server/utils"
mock "server/internal/tests/dbmocks"
"strconv"
"strings"
Expand Down Expand Up @@ -58,7 +59,7 @@ func RateLimiter(client *db.DiceDB, next http.Handler, limit int64, window float
// Check if the request count exceeds the limit
if requestCount >= limit {
slog.Warn("Request limit exceeded", "count", requestCount)
addRateLimitHeaders(w, limit, limit-(requestCount+1), requestCount+1, currentWindow+int64(window))
addRateLimitHeaders(w, limit, limit-(requestCount+1), requestCount+1, currentWindow+int64(window), 0)
http.Error(w, "429 - Too Many Requests", http.StatusTooManyRequests)
return
}
Expand All @@ -77,7 +78,22 @@ func RateLimiter(client *db.DiceDB, next http.Handler, limit int64, window float
}
}

addRateLimitHeaders(w, limit, limit-(requestCount+1), requestCount+1, currentWindow+int64(window))
// Get the cron last cleanup run time
var lastCronCleanupTime int64
resp := client.Client.Get(ctx, utils.LastCronCleanupTimeUnixMs)
if resp.Err() != nil && !errors.Is(resp.Err(), dicedb.Nil) {
slog.Error("Failed to get last cron cleanup time for headers", slog.Any("err", resp.Err().Error()))
}

if resp.Val() != "" {
lastCronCleanupTime, err = strconv.ParseInt(resp.Val(), 10, 64)
if err != nil {
slog.Error("Error converting last cron cleanup time", "error", err)
}
}

addRateLimitHeaders(w, limit, limit-(requestCount+1), requestCount+1, currentWindow+int64(window),
lastCronCleanupTime)

slog.Info("Request processed", "count", requestCount+1)
next.ServeHTTP(w, r)
Expand Down Expand Up @@ -126,7 +142,7 @@ func MockRateLimiter(client *mock.DiceDBMock, next http.Handler, limit int64, wi
// Check if the request limit has been exceeded
if requestCount >= limit {
slog.Warn("Request limit exceeded", "count", requestCount)
addRateLimitHeaders(w, limit, limit-(requestCount+1), requestCount+1, currentWindow+int64(window))
addRateLimitHeaders(w, limit, limit-(requestCount+1), requestCount+1, currentWindow+int64(window), 0)
http.Error(w, "429 - Too Many Requests", http.StatusTooManyRequests)
return
}
Expand All @@ -147,19 +163,21 @@ func MockRateLimiter(client *mock.DiceDBMock, next http.Handler, limit int64, wi
}
}

addRateLimitHeaders(w, limit, limit-(requestCount+1), requestCount+1, currentWindow+int64(window))
addRateLimitHeaders(w, limit, limit-(requestCount+1), requestCount+1, currentWindow+int64(window), 0)

slog.Info("Request processed", "count", requestCount)
next.ServeHTTP(w, r)
})
}

func addRateLimitHeaders(w http.ResponseWriter, limit, remaining, used, resetTime int64) {
func addRateLimitHeaders(w http.ResponseWriter, limit, remaining, used, resetTime, cronLastCleanupTime int64) {
w.Header().Set("x-ratelimit-limit", strconv.FormatInt(limit, 10))
w.Header().Set("x-ratelimit-remaining", strconv.FormatInt(remaining, 10))
w.Header().Set("x-ratelimit-used", strconv.FormatInt(used, 10))
w.Header().Set("x-ratelimit-reset", strconv.FormatInt(resetTime, 10))
w.Header().Set("x-last-cleanup-time", strconv.FormatInt(cronLastCleanupTime, 10))

// Expose the rate limit headers to the client
w.Header().Set("Access-Control-Expose-Headers", "x-ratelimit-limit, x-ratelimit-remaining, x-ratelimit-used, x-ratelimit-reset")
w.Header().Set("Access-Control-Expose-Headers", "x-ratelimit-limit, x-ratelimit-remaining,"+
"x-ratelimit-used, x-ratelimit-reset, x-last-cleanup-time")
}
83 changes: 83 additions & 0 deletions internal/server/cleanup_manager.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,83 @@
package server

import (
"context"
"errors"
"log/slog"
"server/internal/db"
"server/internal/server/utils"
"strconv"
"sync"
"time"

"github.com/dicedb/dicedb-go"
)

type CleanupManager struct {
diceDBAdminClient *db.DiceDB
diceDBClient *db.DiceDB
cronFrequency time.Duration
}

func NewCleanupManager(diceDBAdminClient *db.DiceDB,
diceDBClient *db.DiceDB, cronFrequency time.Duration) *CleanupManager {
return &CleanupManager{
diceDBAdminClient: diceDBAdminClient,
diceDBClient: diceDBClient,
cronFrequency: cronFrequency,
}
}

func (c *CleanupManager) Run(ctx context.Context, wg *sync.WaitGroup) {
defer wg.Done()
c.start(ctx)
}

func (c *CleanupManager) start(ctx context.Context) {
ticker := time.NewTicker(c.cronFrequency)
defer ticker.Stop()

// Get the last cron run time
resp := c.diceDBAdminClient.Client.Get(ctx, utils.LastCronCleanupTimeUnixMs)
if resp.Err() != nil {
if errors.Is(resp.Err(), dicedb.Nil) {
// Default to current time
cleanupTime := strconv.FormatInt(time.Now().UnixMilli(), 10)
slog.Debug("Defaulting last cron cleanup time key since not set", slog.Any("cleanupTime", cleanupTime))
resp := c.diceDBAdminClient.Client.Set(ctx, utils.LastCronCleanupTimeUnixMs, cleanupTime, -1)
if resp.Err() != nil {
slog.Error("Failed to set default value for last cron cleanup time key",
slog.Any("err", resp.Err().Error()))
}
} else {
slog.Error("Failed to get last cron cleanup time", slog.Any("err", resp.Err().Error()))
}
}

for {
select {
case <-ticker.C:
c.runCronTasks()
case <-ctx.Done():
slog.Info("Shutting down cleanup manager")
return
}
}
}

func (c *CleanupManager) runCronTasks() {
// Flush the user DiceDB instance
resp := c.diceDBClient.Client.FlushDB(c.diceDBClient.Ctx)
if resp.Err() != nil {
slog.Error("Failed to flush keys from DiceDB user instance.")
}

// Update last cron run time on DiceDB instance
cleanupTime := strconv.FormatInt(time.Now().UnixMilli(), 10)
resp = c.diceDBAdminClient.Client.Set(c.diceDBClient.Ctx, utils.LastCronCleanupTimeUnixMs,
cleanupTime, -1)
slog.Debug("Updating last cron cleanup time key", slog.Any("cleanupTime", cleanupTime))
if resp.Err() != nil {
slog.Error("Failed to set LastCronCleanupTimeUnixMs")
}
}
Loading