From 602c8c45d11756e3b1c61be080da28dd505c8651 Mon Sep 17 00:00:00 2001 From: ayushsatyam146 Date: Tue, 3 Dec 2024 15:49:26 +0530 Subject: [PATCH] fix lint --- config/config.go | 26 +++++++------- internal/iothread/iothread.go | 8 +++-- internal/wal/wal.go | 2 +- internal/wal/wal_aof.go | 64 ++++++++++++++++++----------------- internal/wal/wal_null.go | 2 +- internal/wal/wal_utils.go | 22 ------------ 6 files changed, 54 insertions(+), 70 deletions(-) diff --git a/config/config.go b/config/config.go index 314fa4923..d45ce0b8a 100644 --- a/config/config.go +++ b/config/config.go @@ -177,19 +177,19 @@ type persistence struct { } type WALConfig struct { - LogDir string `config:"log_dir" default:"tmp/deicdeb-wal-lt"` - Enabled bool `config:"enabled" default:"true"` - WalMode string `config:"mode" default:"buffered" validate:"oneof=buffered unbuffered"` - WriteMode string `config:"write_mode" default:"default" validate:"oneof=default fsync"` - BufferSizeMB int `config:"buffer_size" default:"1" validate:"min=1"` - RotationMode string `config:"rotation_mode" default:"segemnt-size" validate:"oneof=segment-size time"` - MaxSegmentSizeMB int64 `config:"buffer_size" default:"16" validate:"min=1"` - SegmentRotationTimeSec time.Duration `config:"max_segment_rotation_time" default:"60" validate:"min=1"` - BufferSyncIntervalMillis time.Duration `config:"max_segment_rotation_time" default:"200" validate:"min=1"` - RetentionMode string `config:"retention_mode" default:"num-segments" validate:"oneof=num-segments time checkpoint"` - MaxSegmentCount int `config:"max_segment_count" default:"10" validate:"min=1"` - SegmentRetentionDurationSec time.Duration `config:"max_segment_retention_time" default:"600" validate:"min=1"` - RecoveryMode string `config:"recovery_mode" default:"strict" validate:"oneof=strict truncate ignore"` + LogDir string `config:"log_dir" default:"tmp/deicdeb-wal-lt"` + Enabled bool `config:"enabled" default:"true"` + WalMode string `config:"mode" default:"buffered" validate:"oneof=buffered unbuffered"` + WriteMode string `config:"write_mode" default:"default" validate:"oneof=default fsync"` + BufferSizeMB int `config:"buffer_size" default:"1" validate:"min=1"` + RotationMode string `config:"rotation_mode" default:"segemnt-size" validate:"oneof=segment-size time"` + MaxSegmentSizeMB int64 `config:"buffer_size" default:"16" validate:"min=1"` + SegmentRotationTime time.Duration `config:"max_segment_rotation_time" default:"60" validate:"min=1"` + BufferSyncInterval time.Duration `config:"max_segment_rotation_time" default:"200" validate:"min=1"` + RetentionMode string `config:"retention_mode" default:"num-segments" validate:"oneof=num-segments time checkpoint"` + MaxSegmentCount int `config:"max_segment_count" default:"10" validate:"min=1"` + SegmentRetentionDuration time.Duration `config:"max_segment_retention_time" default:"600" validate:"min=1"` + RecoveryMode string `config:"recovery_mode" default:"strict" validate:"oneof=strict truncate ignore"` } type logging struct { diff --git a/internal/iothread/iothread.go b/internal/iothread/iothread.go index e2e9842d2..3111a18c7 100644 --- a/internal/iothread/iothread.go +++ b/internal/iothread/iothread.go @@ -564,13 +564,17 @@ func (t *BaseIOThread) handleCommand(ctx context.Context, cmdMeta CmdMeta, diceD } if err == nil && t.wl != nil { - t.wl.LogCommand([]byte(fmt.Sprintf("%s %s", diceDBCmd.Cmd, strings.Join(diceDBCmd.Args, " ")))) + if err := t.wl.LogCommand([]byte(fmt.Sprintf("%s %s", diceDBCmd.Cmd, strings.Join(diceDBCmd.Args, " ")))); err != nil { + return err + } } case MultiShard, AllShard: err = t.writeResponse(ctx, cmdMeta.composeResponse(storeOp...)) if err == nil && t.wl != nil { - t.wl.LogCommand([]byte(fmt.Sprintf("%s %s", diceDBCmd.Cmd, strings.Join(diceDBCmd.Args, " ")))) + if err := t.wl.LogCommand([]byte(fmt.Sprintf("%s %s", diceDBCmd.Cmd, strings.Join(diceDBCmd.Args, " ")))); err != nil { + return err + } } default: slog.Error("Unknown command type", diff --git a/internal/wal/wal.go b/internal/wal/wal.go index 54c8f2919..32c5e3174 100644 --- a/internal/wal/wal.go +++ b/internal/wal/wal.go @@ -10,7 +10,7 @@ import ( ) type AbstractWAL interface { - LogCommand([] byte) error + LogCommand([]byte) error Close() error Init(t time.Time) error ForEachCommand(f func(c cmd.DiceDBCmd) error) error diff --git a/internal/wal/wal_aof.go b/internal/wal/wal_aof.go index 7371e48aa..f66f44b20 100644 --- a/internal/wal/wal_aof.go +++ b/internal/wal/wal_aof.go @@ -55,15 +55,14 @@ type WALAOF struct { } func NewAOFWAL(directory string) (*WALAOF, error) { - ctx, cancel := context.WithCancel(context.Background()) return &WALAOF{ logDir: directory, walMode: config.DiceConfig.WAL.WalMode, - bufferSyncTicker: time.NewTicker(config.DiceConfig.WAL.BufferSyncIntervalMillis * time.Millisecond), - segmentRotationTicker: time.NewTicker(config.DiceConfig.WAL.SegmentRotationTimeSec * time.Second), - segmentRetentionTicker: time.NewTicker(config.DiceConfig.WAL.SegmentRetentionDurationSec * time.Second), + bufferSyncTicker: time.NewTicker(config.DiceConfig.WAL.BufferSyncInterval * time.Millisecond), + segmentRotationTicker: time.NewTicker(config.DiceConfig.WAL.SegmentRotationTime * time.Second), + segmentRetentionTicker: time.NewTicker(config.DiceConfig.WAL.SegmentRetentionDuration * time.Second), writeMode: config.DiceConfig.WAL.WriteMode, maxSegmentSize: config.DiceConfig.WAL.MaxSegmentSizeMB * 1024 * 1024, maxSegmentCount: config.DiceConfig.WAL.MaxSegmentCount, @@ -76,37 +75,37 @@ func NewAOFWAL(directory string) (*WALAOF, error) { }, nil } -func (w *WALAOF) Init(t time.Time) error { - - if err := w.validateConfig(); err != nil { +func (wal *WALAOF) Init(t time.Time) error { + if err := wal.validateConfig(); err != nil { return err } // TODO - Restore existing checkpoints to memory // Create the directory if it doesn't exist - if err := os.MkdirAll(w.logDir, 0755); err != nil { + if err := os.MkdirAll(wal.logDir, 0755); err != nil { return nil } // Get the list of log segment files in the directory - files, err := filepath.Glob(filepath.Join(w.logDir, segmentPrefix+"*")) + files, err := filepath.Glob(filepath.Join(wal.logDir, segmentPrefix+"*")) if err != nil { return nil } if len(files) > 0 { + fmt.Println("Found existing log segments:", files) // TODO - Check if we have newer WAL entries after the last checkpoint and simultaneously replay and checkpoint them } var wg sync.WaitGroup - errCh := make(chan error, w.maxSegmentCount) + errCh := make(chan error, wal.maxSegmentCount) - for i := 0; i < w.maxSegmentCount; i++ { + for i := 0; i < wal.maxSegmentCount; i++ { wg.Add(1) go func(index int) { defer wg.Done() - filePath := filepath.Join(w.logDir, segmentPrefix+fmt.Sprintf("-%d", index)) + filePath := filepath.Join(wal.logDir, segmentPrefix+fmt.Sprintf("-%d", index)) file, err := os.Create(filePath) if err != nil { errCh <- fmt.Errorf("error creating segment file %s: %v", filePath, err) @@ -119,24 +118,24 @@ func (w *WALAOF) Init(t time.Time) error { wg.Wait() close(errCh) - w.lastSequenceNo = 0 - w.currentSegmentIndex = 0 - w.oldestSegmentIndex = 0 - w.byteOffset = 0 - w.currentSegmentFile, err = os.OpenFile(filepath.Join(w.logDir, "seg-0"), os.O_APPEND|os.O_CREATE|os.O_WRONLY, 0644) - if _, err = w.currentSegmentFile.Seek(0, io.SeekEnd); err != nil { + wal.lastSequenceNo = 0 + wal.currentSegmentIndex = 0 + wal.oldestSegmentIndex = 0 + wal.byteOffset = 0 + wal.currentSegmentFile, err = os.OpenFile(filepath.Join(wal.logDir, "seg-0"), os.O_APPEND|os.O_CREATE|os.O_WRONLY, 0644) + if _, err := wal.currentSegmentFile.Seek(0, io.SeekEnd); err != nil { return err } - w.bufWriter = bufio.NewWriterSize(w.currentSegmentFile, w.bufferSize) + wal.bufWriter = bufio.NewWriterSize(wal.currentSegmentFile, wal.bufferSize) - go w.keepSyncingBuffer() + go wal.keepSyncingBuffer() - if w.rotationMode == "time" { - go w.rotateSegmentPeriodically() + if wal.rotationMode == "time" { //nolint:goconst + go wal.rotateSegmentPeriodically() } - if w.retentionMode == "time" { - go w.deleteSegmentPeriodically() + if wal.retentionMode == "time" { //nolint:goconst + go wal.deleteSegmentPeriodically() } return nil @@ -161,7 +160,9 @@ func (wal *WALAOF) writeEntry(data []byte) error { } entrySize := getEntrySize(data) - wal.rotateLogIfNeeded(entrySize) + if err := wal.rotateLogIfNeeded(entrySize); err != nil { + return err + } wal.byteOffset += entrySize @@ -169,9 +170,11 @@ func (wal *WALAOF) writeEntry(data []byte) error { return err } - // if wal-mode unbuffered immediatley sync to disk - if wal.walMode == "unbuffered" { - wal.Sync() + // if wal-mode unbuffered immediately sync to disk + if wal.walMode == "unbuffered" { //nolint:goconst + if err := wal.Sync(); err != nil { + return err + } } return nil @@ -232,7 +235,6 @@ func (wal *WALAOF) rotateLog() error { } func (wal *WALAOF) deleteOldestSegment() error { - oldestSegmentFilePath := filepath.Join(wal.logDir, segmentPrefix+fmt.Sprintf("%d", wal.oldestSegmentIndex)) // TODO: checkpoint before deleting the file @@ -260,7 +262,7 @@ func (wal *WALAOF) Sync() error { if err := wal.bufWriter.Flush(); err != nil { return err } - if wal.writeMode == "fsync" { + if wal.writeMode == "fsync" { //nolint:goconst if err := wal.currentSegmentFile.Sync(); err != nil { return err } @@ -323,7 +325,7 @@ func (wal *WALAOF) deleteSegmentPeriodically() { } } -func (w *WALAOF) ForEachCommand(f func(c cmd.DiceDBCmd) error) error { +func (wal *WALAOF) ForEachCommand(f func(c cmd.DiceDBCmd) error) error { // TODO: implement this method return nil } diff --git a/internal/wal/wal_null.go b/internal/wal/wal_null.go index 46ae01b75..80f3aa849 100644 --- a/internal/wal/wal_null.go +++ b/internal/wal/wal_null.go @@ -18,7 +18,7 @@ func (w *WALNull) Init(t time.Time) error { } // LogCommand serializes a WALLogEntry and writes it to the current WAL file. -func (w *WALNull) LogCommand(b []byte) error{ +func (w *WALNull) LogCommand(b []byte) error { return nil } diff --git a/internal/wal/wal_utils.go b/internal/wal/wal_utils.go index 57375918e..ee8536f03 100644 --- a/internal/wal/wal_utils.go +++ b/internal/wal/wal_utils.go @@ -2,32 +2,10 @@ package wal import ( "fmt" - "hash/crc32" "google.golang.org/protobuf/proto" ) -// unmarshalAndVerifyEntry unmarshals the given data into a WAL entry and -// verifies the CRC of the entry. Only returns an error if the CRC is invalid. -func unmarshalAndVerifyEntry(data []byte) (*WAL_Entry, error) { - var entry WAL_Entry - MustUnmarshal(data, &entry) - - if !verifyCRC(&entry) { - return nil, fmt.Errorf("CRC mismatch: data may be corrupted") - } - - return &entry, nil -} - -// Validates whether the given entry has a valid CRC. -func verifyCRC(entry *WAL_Entry) bool { - // Reset the entry CRC for the verification. - actualCRC := crc32.ChecksumIEEE(append(entry.GetData(), byte(entry.GetLogSequenceNumber()))) - - return entry.CRC == actualCRC -} - // Marshals func MustMarshal(entry *WAL_Entry) []byte { marshaledEntry, err := proto.Marshal(entry)