From a5563c63d257f0feb27b37a71ae9e09d17b14fdd Mon Sep 17 00:00:00 2001 From: Injun Song Date: Mon, 20 Jan 2025 19:23:50 +0900 Subject: [PATCH] refactor(storagenode): remove unused lines Removed dead code that was commented out. This improves code readability and maintainability by eliminating dead code. --- internal/storage/write_batch.go | 106 ------------------ internal/storagenode/logstream/append.go | 2 - .../logstream/commit_wait_queue.go | 73 ------------ internal/storagenode/logstream/sequencer.go | 17 +-- .../storagenode/logstream/sequencer_test.go | 3 +- internal/storagenode/logstream/writer.go | 2 - 6 files changed, 3 insertions(+), 200 deletions(-) diff --git a/internal/storage/write_batch.go b/internal/storage/write_batch.go index 6af964a0f..228d942b0 100644 --- a/internal/storage/write_batch.go +++ b/internal/storage/write_batch.go @@ -36,40 +36,11 @@ func (wb *WriteBatch) release() { writeBatchPool.Put(wb) } -/* -// Deferred returns a new DeferredWriteBatch that can be used to defer assigning LLSNs. -func (wb *WriteBatch) Deferred(poolIdx int) *DeferredWriteBatch { - if wb.count != 0 { - panic("storage: non empty write batch could not be deferred") - } - dwb := newDeferredWriteBatch(wb, poolIdx) - return dwb -} -*/ - -// Put writes the given LLSN and data to the batch. -//func (wb *WriteBatch) Put(llsn types.LLSN, data []byte) error { -// dk := encodeDataKeyInternal(llsn, wb.dk) -// if err := wb.batch.Set(dk, data, nil); err != nil { -// return err -// } -// return nil -//} - // Set writes the given LLSN and data to the batch. func (wb *WriteBatch) Set(llsn types.LLSN, data []byte) error { return wb.batch.Set(encodeDataKeyInternal(llsn, wb.dk), data, nil) } -// SetDeferred writes the given LLSN and data to the batch. -//func (wb *WriteBatch) SetDeferred(llsn types.LLSN, data []byte) error { -// op := wb.batch.SetDeferred(dataKeyLength, len(data)) -// op.Key = encodeDataKeyInternal(llsn, op.Key) -// copy(op.Value, data) -// op.Finish() -// return nil -//} - // Apply applies the batch to the underlying storage. func (wb *WriteBatch) Apply() error { return wb.batch.Commit(wb.writeOpts) @@ -81,80 +52,3 @@ func (wb *WriteBatch) Close() error { wb.release() return err } - -/* -var deferredWriteBatchPools = [...]sync.Pool{ - { - New: func() interface{} { - return &DeferredWriteBatch{ - deferredOps: make([]*pebble.DeferredBatchOp, 0, batchlet.LengthClasses[0]), - poolIdx: 0, - } - }, - }, - { - New: func() interface{} { - return &DeferredWriteBatch{ - deferredOps: make([]*pebble.DeferredBatchOp, 0, batchlet.LengthClasses[1]), - poolIdx: 1, - } - }, - }, - { - New: func() interface{} { - return &DeferredWriteBatch{ - deferredOps: make([]*pebble.DeferredBatchOp, 0, batchlet.LengthClasses[2]), - poolIdx: 2, - } - }, - }, - { - New: func() interface{} { - return &DeferredWriteBatch{ - deferredOps: make([]*pebble.DeferredBatchOp, 0, batchlet.LengthClasses[3]), - poolIdx: 3, - } - }, - }, -} - -type DeferredWriteBatch struct { - wb *WriteBatch - deferredOps []*pebble.DeferredBatchOp - poolIdx int -} - -func newDeferredWriteBatch(wb *WriteBatch, poolIdx int) *DeferredWriteBatch { - dwb := deferredWriteBatchPools[poolIdx].Get().(*DeferredWriteBatch) - dwb.wb = wb - return dwb -} - -func (dwb *DeferredWriteBatch) release() { - dwb.wb = nil - dwb.deferredOps = dwb.deferredOps[0:0] - deferredWriteBatchPools[dwb.poolIdx].Put(dwb) -} - -func (dwb *DeferredWriteBatch) PutData(data []byte) { - deferredOp := dwb.wb.batch.SetDeferred(dataKeyLength, len(data)) - copy(deferredOp.Value, data) - dwb.deferredOps = append(dwb.deferredOps, deferredOp) -} - -func (dwb *DeferredWriteBatch) SetLLSN(idx int, llsn types.LLSN) { - dwb.deferredOps[idx].Key = encodeDataKeyInternal(llsn, dwb.deferredOps[idx].Key) - dwb.deferredOps[idx].Finish() - dwb.deferredOps[idx] = nil -} - -func (dwb *DeferredWriteBatch) Apply() error { - return dwb.wb.Apply() -} - -func (dwb *DeferredWriteBatch) Close() error { - err := dwb.wb.Close() - dwb.release() - return err -} -*/ diff --git a/internal/storagenode/logstream/append.go b/internal/storagenode/logstream/append.go index e4d7a2599..dee35d6d3 100644 --- a/internal/storagenode/logstream/append.go +++ b/internal/storagenode/logstream/append.go @@ -249,11 +249,9 @@ func (lse *Executor) prepareAppendContext(dataBatch [][]byte, apc *appendContext st.wwg = newWriteWaitGroup() apc.wwg = st.wwg - // st.dwb = lse.stg.NewWriteBatch().Deferred(batchletClassIdx) st.wb = lse.stg.NewWriteBatch() st.cwts = newListQueue() for i := 0; i < len(dataBatch); i++ { - // st.dwb.PutData(batchletData[i]) logEntrySize := int64(len(dataBatch[i])) apc.totalBytes += logEntrySize if lse.lsm != nil { diff --git a/internal/storagenode/logstream/commit_wait_queue.go b/internal/storagenode/logstream/commit_wait_queue.go index 0962fefa5..f351a1abc 100644 --- a/internal/storagenode/logstream/commit_wait_queue.go +++ b/internal/storagenode/logstream/commit_wait_queue.go @@ -89,76 +89,3 @@ func (cwq *commitWaitQueue) size() int { cwq.mu.Unlock() return ret } - -//type commitWaitQueueIterator struct { -// curr *list.Element -// queue *commitWaitQueue -//} -// -//func (iter commitWaitQueueIterator) task() *commitWaitTask { -// if !iter.valid() { -// return nil -// } -// return iter.curr.Value.(*commitWaitTask) -//} -// -//func (iter *commitWaitQueueIterator) next() bool { -// iter.queue.mu.Lock() -// defer iter.queue.mu.Unlock() -// if iter.curr != nil { -// iter.curr = iter.curr.Prev() -// } -// return iter.valid() -//} -// -//func (iter commitWaitQueueIterator) valid() bool { -// return iter.curr != nil -//} -// -//type commitWaitQueue struct { -// queue *list.List -// mu sync.Mutex -//} -// -//func newCommitWaitQueue() *commitWaitQueue { -// return &commitWaitQueue{ -// queue: list.New(), -// } -//} -// -//func (cwq *commitWaitQueue) push(cwt *commitWaitTask) error { -// if cwt == nil { -// panic("log stream: commit wait queue: task is nil") -// } -// cwq.mu.Lock() -// cwq.queue.PushFront(cwt) -// cwq.mu.Unlock() -// return nil -//} -// -//func (cwq *commitWaitQueue) peekIterator() commitWaitQueueIterator { -// cwq.mu.Lock() -// iter := commitWaitQueueIterator{ -// curr: cwq.queue.Back(), -// queue: cwq, -// } -// cwq.mu.Unlock() -// return iter -//} -// -//func (cwq *commitWaitQueue) pop() *commitWaitTask { -// cwq.mu.Lock() -// defer cwq.mu.Unlock() -// elem := cwq.queue.Back() -// if elem == nil { -// return nil -// } -// return cwq.queue.Remove(elem).(*commitWaitTask) -//} -// -//func (cwq *commitWaitQueue) size() int { -// cwq.mu.Lock() -// ret := cwq.queue.Len() -// cwq.mu.Unlock() -// return ret -//} diff --git a/internal/storagenode/logstream/sequencer.go b/internal/storagenode/logstream/sequencer.go index 9c2cb6bf4..806bdf75e 100644 --- a/internal/storagenode/logstream/sequencer.go +++ b/internal/storagenode/logstream/sequencer.go @@ -116,7 +116,6 @@ func (sq *sequencer) sequenceLoopInternal(ctx context.Context, st *sequenceTask) if err := st.wb.Set(sq.llsn, st.dataBatch[dataIdx]); err != nil { // TODO: handle error } - // st.dwb.SetLLSN(dataIdx, sq.llsn) } operationEndTime = time.Now() @@ -248,9 +247,8 @@ var sequenceTaskPool = sync.Pool{ } type sequenceTask struct { - wwg *writeWaitGroup - awgs []*appendWaitGroup - // dwb *storage.DeferredWriteBatch + wwg *writeWaitGroup + awgs []*appendWaitGroup wb *storage.WriteBatch dataBatch [][]byte cwts *listQueue @@ -262,20 +260,9 @@ func newSequenceTask() *sequenceTask { return st } -//func newSequenceTask(wwg *writeWaitGroup, dwb *storage.DeferredWriteBatch, awgs []*appendWaitGroup, cwts *listQueue, rts []*replicateTask) *sequenceTask { -// st := sequenceTaskPool.Get().(*sequenceTask) -// st.wwg = wwg -// st.awgs = awgs -// st.dwb = dwb -// st.cwts = cwts -// st.rts = rts -// return st -//} - func (st *sequenceTask) release() { st.wwg = nil st.awgs = nil - // st.dwb = nil st.wb = nil st.dataBatch = nil st.cwts = nil diff --git a/internal/storagenode/logstream/sequencer_test.go b/internal/storagenode/logstream/sequencer_test.go index 58fec044e..3b962b592 100644 --- a/internal/storagenode/logstream/sequencer_test.go +++ b/internal/storagenode/logstream/sequencer_test.go @@ -73,9 +73,8 @@ func testSequenceTask(stg *storage.Storage) *sequenceTask { awg := newAppendWaitGroup(st.wwg) st.awgs = append(st.awgs, awg) - st.wb = stg.NewWriteBatch() // .Deferred(0) + st.wb = stg.NewWriteBatch() st.dataBatch = [][]byte{nil} - // st.dwb.PutData(nil) st.cwts = newListQueue() st.cwts.PushFront(newCommitWaitTask(awg)) diff --git a/internal/storagenode/logstream/writer.go b/internal/storagenode/logstream/writer.go index ac0c81311..d5f628fe5 100644 --- a/internal/storagenode/logstream/writer.go +++ b/internal/storagenode/logstream/writer.go @@ -88,7 +88,6 @@ func (w *writer) writeLoopInternal(_ context.Context, st *sequenceTask) { cnt := len(st.awgs) defer func() { st.wwg.done(err) - // _ = st.dwb.Close() _ = st.wb.Close() st.release() inflight := w.inflight.Add(-1) @@ -100,7 +99,6 @@ func (w *writer) writeLoopInternal(_ context.Context, st *sequenceTask) { w.lse.lsm.WriterInflightOperations.Store(inflight) }() - // err = st.dwb.Apply() err = st.wb.Apply() if err != nil { w.logger.Error("could not apply data", zap.Error(err))