From d8d50c8db05d2e60f43b9ff7085ad02694235c5b Mon Sep 17 00:00:00 2001 From: Prashant Shubham Date: Tue, 22 Oct 2024 18:44:47 +0530 Subject: [PATCH] #32 Add cron for cleanup of keys from DiceDB (#43) --- .env.sample | 8 +- config/config.go | 58 +++++++++----- docker-compose.yml | 20 ++++- internal/db/dicedb.go | 30 ++++--- internal/middleware/ratelimiter.go | 30 +++++-- internal/server/cleanup_manager.go | 83 ++++++++++++++++++++ internal/server/http.go | 29 ++++--- internal/server/utils/constants.go | 5 ++ internal/tests/integration/commands/setup.go | 2 +- main.go | 39 ++++++--- 10 files changed, 245 insertions(+), 59 deletions(-) create mode 100644 internal/server/cleanup_manager.go create mode 100644 internal/server/utils/constants.go diff --git a/.env.sample b/.env.sample index 513ebe4..77fc0b1 100644 --- a/.env.sample +++ b/.env.sample @@ -1,8 +1,12 @@ -DICEDB_ADDR=localhost:7379 +DICEDB_ADMIN_ADDR=localhost:7379 +DICEDB_ADMIN_USERNAME=diceadmin +DICEDB_ADMIN_PASSWORD= +DICEDB_ADDR=localhost:7380 DICEDB_USERNAME=dice DICEDB_PASSWORD= SERVER_PORT=:8080 IS_TEST_ENVIRONMENT=false REQUEST_LIMIT_PER_MIN=1000 REQUEST_WINDOW_SEC=60 -ALLOWED_ORIGINS=http://localhost:3000 \ No newline at end of file +ALLOWED_ORIGINS=http://localhost:3000 +CRON_CLEANUP_FREQUENCY_MINS=15 \ No newline at end of file diff --git a/config/config.go b/config/config.go index cc7416d..c24a152 100644 --- a/config/config.go +++ b/config/config.go @@ -1,27 +1,38 @@ package config import ( - "fmt" + "log/slog" "os" "strconv" "strings" + "time" "github.com/joho/godotenv" ) // Config holds the application configuration type Config struct { + // Config for DiceDBAdmin instance. This instance holds internal keys + // and is separate from DiceDB hosting global key pool i.e. user facing. + DiceDBAdmin struct { + Addr string // Field for the Dice address + Username string // Field for the username + Password string // Field for the password + } + // Config for DiceDB User instance. This instance holds internal keys + // and is separate from DiceDB hosting global key pool i.e. user facing. DiceDB struct { Addr string // Field for the Dice address Username string // Field for the username Password string // Field for the password } Server struct { - Port string // Field for the server port - IsTestEnv bool - RequestLimitPerMin int64 // Field for the request limit - RequestWindowSec float64 // Field for the time window in float64 - AllowedOrigins []string // Field for the allowed origins + Port string // Field for the server port + IsTestEnv bool + RequestLimitPerMin int64 // Field for the request limit + RequestWindowSec float64 // Field for the time window in float64 + AllowedOrigins []string // Field for the allowed origins + CronCleanupFrequency time.Duration // Field for configuring key cleanup cron } } @@ -29,31 +40,42 @@ type Config struct { func LoadConfig() *Config { err := godotenv.Load() if err != nil { - fmt.Println("Warning: .env file not found, falling back to system environment variables.") + slog.Debug("Warning: .env file not found, falling back to system environment variables.") } return &Config{ + DiceDBAdmin: struct { + Addr string + Username string + Password string + }{ + Addr: getEnv("DICEDB_ADMIN_ADDR", "localhost:7379"), // Default DiceDB Admin address + Username: getEnv("DICEDB_ADMIN_USERNAME", "diceadmin"), // Default DiceDB Admin username + Password: getEnv("DICEDB_ADMIN_PASSWORD", ""), // Default DiceDB Admin password + }, DiceDB: struct { Addr string Username string Password string }{ - Addr: getEnv("DICEDB_ADDR", "localhost:7379"), // Default Dice address + Addr: getEnv("DICEDB_ADDR", "localhost:7380"), // Default DiceDB address Username: getEnv("DICEDB_USERNAME", "dice"), // Default username Password: getEnv("DICEDB_PASSWORD", ""), // Default password }, Server: struct { - Port string - IsTestEnv bool - RequestLimitPerMin int64 - RequestWindowSec float64 - AllowedOrigins []string + Port string + IsTestEnv bool + RequestLimitPerMin int64 + RequestWindowSec float64 + AllowedOrigins []string + CronCleanupFrequency time.Duration }{ - Port: getEnv("SERVER_PORT", ":8080"), - IsTestEnv: getEnvBool("IS_TEST_ENVIRONMENT", false), // Default server port - RequestLimitPerMin: getEnvInt("REQUEST_LIMIT_PER_MIN", 1000), // Default request limit - RequestWindowSec: getEnvFloat64("REQUEST_WINDOW_SEC", 60), // Default request window in float64 - AllowedOrigins: getEnvArray("ALLOWED_ORIGINS", []string{"http://localhost:3000"}), // Default allowed origins + Port: getEnv("SERVER_PORT", ":8080"), + IsTestEnv: getEnvBool("IS_TEST_ENVIRONMENT", false), // Default server port + RequestLimitPerMin: getEnvInt("REQUEST_LIMIT_PER_MIN", 1000), // Default request limit + RequestWindowSec: getEnvFloat64("REQUEST_WINDOW_SEC", 60), // Default request window in float64 + AllowedOrigins: getEnvArray("ALLOWED_ORIGINS", []string{"http://localhost:3000"}), // Default allowed origins + CronCleanupFrequency: time.Duration(getEnvInt("CRON_CLEANUP_FREQUENCY_MINS", 15)) * time.Minute, // Default cron cleanup frequency }, } } diff --git a/docker-compose.yml b/docker-compose.yml index 2b00215..0a5810c 100644 --- a/docker-compose.yml +++ b/docker-compose.yml @@ -1,5 +1,5 @@ services: - dicedb: + dicedbadmin: image: dicedb/dicedb:latest ports: - "7379:7379" @@ -11,15 +11,31 @@ services: networks: - dice-network + dicedb: + image: dicedb/dicedb:latest + ports: + - "7380:7379" + healthcheck: + test: [ "CMD", "PING" ] + interval: 10s + timeout: 3s + retries: 3 + networks: + - dice-network + backend: build: context: . ports: - "8080:8080" depends_on: + - dicedbadmin - dicedb environment: - - DICEDB_ADDR=localhost:7379 + - DICEDB_ADMIN_ADDR=localhost:7379 + - DICEDB_ADMIN_USERNAME=${DICEDB_ADMIN_USERNAME} + - DICEDB_ADMIN_PASSWORD=${DICEDB_ADMIN_PASSWORD} + - DICEDB_ADDR=localhost:7380 - DICEDB_USERNAME=${DICEDB_USERNAME} - DICEDB_PASSWORD=${DICEDB_PASSWORD} networks: diff --git a/internal/db/dicedb.go b/internal/db/dicedb.go index bd040a9..6cd6d8c 100644 --- a/internal/db/dicedb.go +++ b/internal/db/dicedb.go @@ -33,15 +33,27 @@ func (db *DiceDB) CloseDiceDB() { } } -func InitDiceClient(configValue *config.Config) (*DiceDB, error) { - diceClient := dicedb.NewClient(&dicedb.Options{ - Addr: configValue.DiceDB.Addr, - Username: configValue.DiceDB.Username, - Password: configValue.DiceDB.Password, - DialTimeout: 10 * time.Second, - MaxRetries: 10, - EnablePrettyResponse: true, - }) +func InitDiceClient(configValue *config.Config, isAdmin bool) (*DiceDB, error) { + var diceClient *dicedb.Client + if isAdmin { + diceClient = dicedb.NewClient(&dicedb.Options{ + Addr: configValue.DiceDBAdmin.Addr, + Username: configValue.DiceDBAdmin.Username, + Password: configValue.DiceDBAdmin.Password, + DialTimeout: 10 * time.Second, + MaxRetries: 10, + EnablePrettyResponse: true, + }) + } else { + diceClient = dicedb.NewClient(&dicedb.Options{ + Addr: configValue.DiceDB.Addr, + Username: configValue.DiceDB.Username, + Password: configValue.DiceDB.Password, + DialTimeout: 10 * time.Second, + MaxRetries: 10, + EnablePrettyResponse: true, + }) + } // Ping the dicedb client to verify the connection err := diceClient.Ping(context.Background()).Err() diff --git a/internal/middleware/ratelimiter.go b/internal/middleware/ratelimiter.go index 8cf9594..cfc7b3b 100644 --- a/internal/middleware/ratelimiter.go +++ b/internal/middleware/ratelimiter.go @@ -7,6 +7,7 @@ import ( "log/slog" "net/http" "server/internal/db" + "server/internal/server/utils" mock "server/internal/tests/dbmocks" "strconv" "strings" @@ -58,7 +59,7 @@ func RateLimiter(client *db.DiceDB, next http.Handler, limit int64, window float // Check if the request count exceeds the limit if requestCount >= limit { slog.Warn("Request limit exceeded", "count", requestCount) - addRateLimitHeaders(w, limit, limit-(requestCount+1), requestCount+1, currentWindow+int64(window)) + addRateLimitHeaders(w, limit, limit-(requestCount+1), requestCount+1, currentWindow+int64(window), 0) http.Error(w, "429 - Too Many Requests", http.StatusTooManyRequests) return } @@ -77,7 +78,22 @@ func RateLimiter(client *db.DiceDB, next http.Handler, limit int64, window float } } - addRateLimitHeaders(w, limit, limit-(requestCount+1), requestCount+1, currentWindow+int64(window)) + // Get the cron last cleanup run time + var lastCronCleanupTime int64 + resp := client.Client.Get(ctx, utils.LastCronCleanupTimeUnixMs) + if resp.Err() != nil && !errors.Is(resp.Err(), dicedb.Nil) { + slog.Error("Failed to get last cron cleanup time for headers", slog.Any("err", resp.Err().Error())) + } + + if resp.Val() != "" { + lastCronCleanupTime, err = strconv.ParseInt(resp.Val(), 10, 64) + if err != nil { + slog.Error("Error converting last cron cleanup time", "error", err) + } + } + + addRateLimitHeaders(w, limit, limit-(requestCount+1), requestCount+1, currentWindow+int64(window), + lastCronCleanupTime) slog.Info("Request processed", "count", requestCount+1) next.ServeHTTP(w, r) @@ -126,7 +142,7 @@ func MockRateLimiter(client *mock.DiceDBMock, next http.Handler, limit int64, wi // Check if the request limit has been exceeded if requestCount >= limit { slog.Warn("Request limit exceeded", "count", requestCount) - addRateLimitHeaders(w, limit, limit-(requestCount+1), requestCount+1, currentWindow+int64(window)) + addRateLimitHeaders(w, limit, limit-(requestCount+1), requestCount+1, currentWindow+int64(window), 0) http.Error(w, "429 - Too Many Requests", http.StatusTooManyRequests) return } @@ -147,19 +163,21 @@ func MockRateLimiter(client *mock.DiceDBMock, next http.Handler, limit int64, wi } } - addRateLimitHeaders(w, limit, limit-(requestCount+1), requestCount+1, currentWindow+int64(window)) + addRateLimitHeaders(w, limit, limit-(requestCount+1), requestCount+1, currentWindow+int64(window), 0) slog.Info("Request processed", "count", requestCount) next.ServeHTTP(w, r) }) } -func addRateLimitHeaders(w http.ResponseWriter, limit, remaining, used, resetTime int64) { +func addRateLimitHeaders(w http.ResponseWriter, limit, remaining, used, resetTime, cronLastCleanupTime int64) { w.Header().Set("x-ratelimit-limit", strconv.FormatInt(limit, 10)) w.Header().Set("x-ratelimit-remaining", strconv.FormatInt(remaining, 10)) w.Header().Set("x-ratelimit-used", strconv.FormatInt(used, 10)) w.Header().Set("x-ratelimit-reset", strconv.FormatInt(resetTime, 10)) + w.Header().Set("x-last-cleanup-time", strconv.FormatInt(cronLastCleanupTime, 10)) // Expose the rate limit headers to the client - w.Header().Set("Access-Control-Expose-Headers", "x-ratelimit-limit, x-ratelimit-remaining, x-ratelimit-used, x-ratelimit-reset") + w.Header().Set("Access-Control-Expose-Headers", "x-ratelimit-limit, x-ratelimit-remaining,"+ + "x-ratelimit-used, x-ratelimit-reset, x-last-cleanup-time") } diff --git a/internal/server/cleanup_manager.go b/internal/server/cleanup_manager.go new file mode 100644 index 0000000..9e45d6a --- /dev/null +++ b/internal/server/cleanup_manager.go @@ -0,0 +1,83 @@ +package server + +import ( + "context" + "errors" + "log/slog" + "server/internal/db" + "server/internal/server/utils" + "strconv" + "sync" + "time" + + "github.com/dicedb/dicedb-go" +) + +type CleanupManager struct { + diceDBAdminClient *db.DiceDB + diceDBClient *db.DiceDB + cronFrequency time.Duration +} + +func NewCleanupManager(diceDBAdminClient *db.DiceDB, + diceDBClient *db.DiceDB, cronFrequency time.Duration) *CleanupManager { + return &CleanupManager{ + diceDBAdminClient: diceDBAdminClient, + diceDBClient: diceDBClient, + cronFrequency: cronFrequency, + } +} + +func (c *CleanupManager) Run(ctx context.Context, wg *sync.WaitGroup) { + defer wg.Done() + c.start(ctx) +} + +func (c *CleanupManager) start(ctx context.Context) { + ticker := time.NewTicker(c.cronFrequency) + defer ticker.Stop() + + // Get the last cron run time + resp := c.diceDBAdminClient.Client.Get(ctx, utils.LastCronCleanupTimeUnixMs) + if resp.Err() != nil { + if errors.Is(resp.Err(), dicedb.Nil) { + // Default to current time + cleanupTime := strconv.FormatInt(time.Now().UnixMilli(), 10) + slog.Debug("Defaulting last cron cleanup time key since not set", slog.Any("cleanupTime", cleanupTime)) + resp := c.diceDBAdminClient.Client.Set(ctx, utils.LastCronCleanupTimeUnixMs, cleanupTime, -1) + if resp.Err() != nil { + slog.Error("Failed to set default value for last cron cleanup time key", + slog.Any("err", resp.Err().Error())) + } + } else { + slog.Error("Failed to get last cron cleanup time", slog.Any("err", resp.Err().Error())) + } + } + + for { + select { + case <-ticker.C: + c.runCronTasks() + case <-ctx.Done(): + slog.Info("Shutting down cleanup manager") + return + } + } +} + +func (c *CleanupManager) runCronTasks() { + // Flush the user DiceDB instance + resp := c.diceDBClient.Client.FlushDB(c.diceDBClient.Ctx) + if resp.Err() != nil { + slog.Error("Failed to flush keys from DiceDB user instance.") + } + + // Update last cron run time on DiceDB instance + cleanupTime := strconv.FormatInt(time.Now().UnixMilli(), 10) + resp = c.diceDBAdminClient.Client.Set(c.diceDBClient.Ctx, utils.LastCronCleanupTimeUnixMs, + cleanupTime, -1) + slog.Debug("Updating last cron cleanup time key", slog.Any("cleanupTime", cleanupTime)) + if resp.Err() != nil { + slog.Error("Failed to set LastCronCleanupTimeUnixMs") + } +} diff --git a/internal/server/http.go b/internal/server/http.go index 876f959..b5209c2 100644 --- a/internal/server/http.go +++ b/internal/server/http.go @@ -7,7 +7,6 @@ import ( "log/slog" "net/http" "strings" - "sync" "time" "server/internal/db" @@ -51,11 +50,12 @@ func (cim *HandlerMux) ServeHTTP(w http.ResponseWriter, r *http.Request) { })).ServeHTTP(w, r) } -func NewHTTPServer(addr string, mux *http.ServeMux, client *db.DiceDB, limit int64, window float64) *HTTPServer { +func NewHTTPServer(addr string, mux *http.ServeMux, diceDBAdminClient *db.DiceDB, diceClient *db.DiceDB, + limit int64, window float64) *HTTPServer { handlerMux := &HandlerMux{ mux: mux, rateLimiter: func(w http.ResponseWriter, r *http.Request, next http.Handler) { - middleware.RateLimiter(client, next, limit, window).ServeHTTP(w, r) + middleware.RateLimiter(diceDBAdminClient, next, limit, window).ServeHTTP(w, r) }, } @@ -65,25 +65,30 @@ func NewHTTPServer(addr string, mux *http.ServeMux, client *db.DiceDB, limit int Handler: handlerMux, ReadHeaderTimeout: 5 * time.Second, }, - DiceClient: client, + DiceClient: diceClient, } } func (s *HTTPServer) Run(ctx context.Context) error { - var wg sync.WaitGroup + var err error - wg.Add(1) go func() { - defer wg.Done() - slog.Info("starting server at", slog.String("addr", s.httpServer.Addr)) - if err := s.httpServer.ListenAndServe(); err != nil && !errors.Is(err, http.ErrServerClosed) { + slog.Info("starting HTTP server at", slog.String("addr", s.httpServer.Addr)) + if err = s.httpServer.ListenAndServe(); err != nil && !errors.Is(err, http.ErrServerClosed) { slog.Error("http server error: %v", slog.Any("err", err)) } }() - <-ctx.Done() - slog.Info("shutting down server...") - return s.Shutdown() + go func() { + <-ctx.Done() + err = s.Shutdown() + if err != nil { + slog.Error("Failed to gracefully shutdown HTTP server", slog.Any("err", err)) + return + } + }() + + return err } func (s *HTTPServer) Shutdown() error { diff --git a/internal/server/utils/constants.go b/internal/server/utils/constants.go new file mode 100644 index 0000000..b137eed --- /dev/null +++ b/internal/server/utils/constants.go @@ -0,0 +1,5 @@ +package utils + +const ( + LastCronCleanupTimeUnixMs = "playground_mono:last_cron_cleanup_run_time_unix_ms" +) diff --git a/internal/tests/integration/commands/setup.go b/internal/tests/integration/commands/setup.go index f6a5a43..2b2420b 100644 --- a/internal/tests/integration/commands/setup.go +++ b/internal/tests/integration/commands/setup.go @@ -41,7 +41,7 @@ type TestCase struct { func NewHTTPCommandExecutor() (*HTTPCommandExecutor, error) { configValue := config.LoadConfig() - diceClient, err := db.InitDiceClient(configValue) + diceClient, err := db.InitDiceClient(configValue, false) if err != nil { return nil, fmt.Errorf("failed to initialize DiceDB client: %v", err) } diff --git a/main.go b/main.go index 6fd164d..6539f8d 100644 --- a/main.go +++ b/main.go @@ -8,31 +8,52 @@ import ( "server/config" "server/internal/db" "server/internal/server" + "sync" _ "github.com/joho/godotenv/autoload" ) func main() { configValue := config.LoadConfig() - diceClient, err := db.InitDiceClient(configValue) + diceDBAdminClient, err := db.InitDiceClient(configValue, true) + if err != nil { + slog.Error("Failed to initialize DiceDB Admin client: %v", slog.Any("err", err)) + os.Exit(1) + } + + diceDBClient, err := db.InitDiceClient(configValue, false) if err != nil { slog.Error("Failed to initialize DiceDB client: %v", slog.Any("err", err)) os.Exit(1) } + // Graceful shutdown context + ctx, cancel := context.WithCancel(context.Background()) + wg := sync.WaitGroup{} + // Register a cleanup manager, this runs user DiceDB instance cleanup job at configured frequency + cleanupManager := server.NewCleanupManager(diceDBAdminClient, diceDBClient, configValue.Server.CronCleanupFrequency) + wg.Add(1) + go cleanupManager.Run(ctx, &wg) + // Create mux and register routes mux := http.NewServeMux() - httpServer := server.NewHTTPServer(":8080", mux, diceClient, configValue.Server.RequestLimitPerMin, configValue.Server.RequestWindowSec) + httpServer := server.NewHTTPServer(":8080", mux, diceDBAdminClient, diceDBClient, configValue.Server.RequestLimitPerMin, + configValue.Server.RequestWindowSec) mux.HandleFunc("/health", httpServer.HealthCheck) mux.HandleFunc("/shell/exec/{cmd}", httpServer.CliHandler) mux.HandleFunc("/search", httpServer.SearchHandler) - // Graceful shutdown context - ctx, cancel := context.WithCancel(context.Background()) - defer cancel() + wg.Add(1) + go func() { + defer wg.Done() + // Run the HTTP Server + if err := httpServer.Run(ctx); err != nil { + slog.Error("server failed: %v\n", slog.Any("err", err)) + diceDBAdminClient.CloseDiceDB() + cancel() + } + }() - // Run the HTTP Server - if err := httpServer.Run(ctx); err != nil { - slog.Error("server failed: %v\n", slog.Any("err", err)) - } + wg.Wait() + slog.Info("Server has shut down gracefully") }