Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

refactor(storagenode): remove unused lines #968

Open
wants to merge 1 commit into
base: refactor_sn_no_batchlet_in_append
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
106 changes: 0 additions & 106 deletions internal/storage/write_batch.go
Original file line number Diff line number Diff line change
Expand Up @@ -36,40 +36,11 @@ func (wb *WriteBatch) release() {
writeBatchPool.Put(wb)
}

/*
// Deferred returns a new DeferredWriteBatch that can be used to defer assigning LLSNs.
func (wb *WriteBatch) Deferred(poolIdx int) *DeferredWriteBatch {
if wb.count != 0 {
panic("storage: non empty write batch could not be deferred")
}
dwb := newDeferredWriteBatch(wb, poolIdx)
return dwb
}
*/

// Put writes the given LLSN and data to the batch.
//func (wb *WriteBatch) Put(llsn types.LLSN, data []byte) error {
// dk := encodeDataKeyInternal(llsn, wb.dk)
// if err := wb.batch.Set(dk, data, nil); err != nil {
// return err
// }
// return nil
//}

// Set writes the given LLSN and data to the batch.
func (wb *WriteBatch) Set(llsn types.LLSN, data []byte) error {
return wb.batch.Set(encodeDataKeyInternal(llsn, wb.dk), data, nil)
}

// SetDeferred writes the given LLSN and data to the batch.
//func (wb *WriteBatch) SetDeferred(llsn types.LLSN, data []byte) error {
// op := wb.batch.SetDeferred(dataKeyLength, len(data))
// op.Key = encodeDataKeyInternal(llsn, op.Key)
// copy(op.Value, data)
// op.Finish()
// return nil
//}

// Apply applies the batch to the underlying storage.
func (wb *WriteBatch) Apply() error {
return wb.batch.Commit(wb.writeOpts)
Expand All @@ -81,80 +52,3 @@ func (wb *WriteBatch) Close() error {
wb.release()
return err
}

/*
var deferredWriteBatchPools = [...]sync.Pool{
{
New: func() interface{} {
return &DeferredWriteBatch{
deferredOps: make([]*pebble.DeferredBatchOp, 0, batchlet.LengthClasses[0]),
poolIdx: 0,
}
},
},
{
New: func() interface{} {
return &DeferredWriteBatch{
deferredOps: make([]*pebble.DeferredBatchOp, 0, batchlet.LengthClasses[1]),
poolIdx: 1,
}
},
},
{
New: func() interface{} {
return &DeferredWriteBatch{
deferredOps: make([]*pebble.DeferredBatchOp, 0, batchlet.LengthClasses[2]),
poolIdx: 2,
}
},
},
{
New: func() interface{} {
return &DeferredWriteBatch{
deferredOps: make([]*pebble.DeferredBatchOp, 0, batchlet.LengthClasses[3]),
poolIdx: 3,
}
},
},
}

type DeferredWriteBatch struct {
wb *WriteBatch
deferredOps []*pebble.DeferredBatchOp
poolIdx int
}

func newDeferredWriteBatch(wb *WriteBatch, poolIdx int) *DeferredWriteBatch {
dwb := deferredWriteBatchPools[poolIdx].Get().(*DeferredWriteBatch)
dwb.wb = wb
return dwb
}

func (dwb *DeferredWriteBatch) release() {
dwb.wb = nil
dwb.deferredOps = dwb.deferredOps[0:0]
deferredWriteBatchPools[dwb.poolIdx].Put(dwb)
}

func (dwb *DeferredWriteBatch) PutData(data []byte) {
deferredOp := dwb.wb.batch.SetDeferred(dataKeyLength, len(data))
copy(deferredOp.Value, data)
dwb.deferredOps = append(dwb.deferredOps, deferredOp)
}

func (dwb *DeferredWriteBatch) SetLLSN(idx int, llsn types.LLSN) {
dwb.deferredOps[idx].Key = encodeDataKeyInternal(llsn, dwb.deferredOps[idx].Key)
dwb.deferredOps[idx].Finish()
dwb.deferredOps[idx] = nil
}

func (dwb *DeferredWriteBatch) Apply() error {
return dwb.wb.Apply()
}

func (dwb *DeferredWriteBatch) Close() error {
err := dwb.wb.Close()
dwb.release()
return err
}
*/
2 changes: 0 additions & 2 deletions internal/storagenode/logstream/append.go
Original file line number Diff line number Diff line change
Expand Up @@ -249,11 +249,9 @@ func (lse *Executor) prepareAppendContext(dataBatch [][]byte, apc *appendContext
st.wwg = newWriteWaitGroup()
apc.wwg = st.wwg

// st.dwb = lse.stg.NewWriteBatch().Deferred(batchletClassIdx)
st.wb = lse.stg.NewWriteBatch()
st.cwts = newListQueue()
for i := 0; i < len(dataBatch); i++ {
// st.dwb.PutData(batchletData[i])
logEntrySize := int64(len(dataBatch[i]))
apc.totalBytes += logEntrySize
if lse.lsm != nil {
Expand Down
73 changes: 0 additions & 73 deletions internal/storagenode/logstream/commit_wait_queue.go
Original file line number Diff line number Diff line change
Expand Up @@ -89,76 +89,3 @@ func (cwq *commitWaitQueue) size() int {
cwq.mu.Unlock()
return ret
}

//type commitWaitQueueIterator struct {
// curr *list.Element
// queue *commitWaitQueue
//}
//
//func (iter commitWaitQueueIterator) task() *commitWaitTask {
// if !iter.valid() {
// return nil
// }
// return iter.curr.Value.(*commitWaitTask)
//}
//
//func (iter *commitWaitQueueIterator) next() bool {
// iter.queue.mu.Lock()
// defer iter.queue.mu.Unlock()
// if iter.curr != nil {
// iter.curr = iter.curr.Prev()
// }
// return iter.valid()
//}
//
//func (iter commitWaitQueueIterator) valid() bool {
// return iter.curr != nil
//}
//
//type commitWaitQueue struct {
// queue *list.List
// mu sync.Mutex
//}
//
//func newCommitWaitQueue() *commitWaitQueue {
// return &commitWaitQueue{
// queue: list.New(),
// }
//}
//
//func (cwq *commitWaitQueue) push(cwt *commitWaitTask) error {
// if cwt == nil {
// panic("log stream: commit wait queue: task is nil")
// }
// cwq.mu.Lock()
// cwq.queue.PushFront(cwt)
// cwq.mu.Unlock()
// return nil
//}
//
//func (cwq *commitWaitQueue) peekIterator() commitWaitQueueIterator {
// cwq.mu.Lock()
// iter := commitWaitQueueIterator{
// curr: cwq.queue.Back(),
// queue: cwq,
// }
// cwq.mu.Unlock()
// return iter
//}
//
//func (cwq *commitWaitQueue) pop() *commitWaitTask {
// cwq.mu.Lock()
// defer cwq.mu.Unlock()
// elem := cwq.queue.Back()
// if elem == nil {
// return nil
// }
// return cwq.queue.Remove(elem).(*commitWaitTask)
//}
//
//func (cwq *commitWaitQueue) size() int {
// cwq.mu.Lock()
// ret := cwq.queue.Len()
// cwq.mu.Unlock()
// return ret
//}
17 changes: 2 additions & 15 deletions internal/storagenode/logstream/sequencer.go
Original file line number Diff line number Diff line change
Expand Up @@ -116,7 +116,6 @@ func (sq *sequencer) sequenceLoopInternal(ctx context.Context, st *sequenceTask)
if err := st.wb.Set(sq.llsn, st.dataBatch[dataIdx]); err != nil {
// TODO: handle error
}
// st.dwb.SetLLSN(dataIdx, sq.llsn)
}

operationEndTime = time.Now()
Expand Down Expand Up @@ -248,9 +247,8 @@ var sequenceTaskPool = sync.Pool{
}

type sequenceTask struct {
wwg *writeWaitGroup
awgs []*appendWaitGroup
// dwb *storage.DeferredWriteBatch
wwg *writeWaitGroup
awgs []*appendWaitGroup
wb *storage.WriteBatch
dataBatch [][]byte
cwts *listQueue
Expand All @@ -262,20 +260,9 @@ func newSequenceTask() *sequenceTask {
return st
}

//func newSequenceTask(wwg *writeWaitGroup, dwb *storage.DeferredWriteBatch, awgs []*appendWaitGroup, cwts *listQueue, rts []*replicateTask) *sequenceTask {
// st := sequenceTaskPool.Get().(*sequenceTask)
// st.wwg = wwg
// st.awgs = awgs
// st.dwb = dwb
// st.cwts = cwts
// st.rts = rts
// return st
//}

func (st *sequenceTask) release() {
st.wwg = nil
st.awgs = nil
// st.dwb = nil
st.wb = nil
st.dataBatch = nil
st.cwts = nil
Expand Down
3 changes: 1 addition & 2 deletions internal/storagenode/logstream/sequencer_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -73,9 +73,8 @@ func testSequenceTask(stg *storage.Storage) *sequenceTask {
awg := newAppendWaitGroup(st.wwg)
st.awgs = append(st.awgs, awg)

st.wb = stg.NewWriteBatch() // .Deferred(0)
st.wb = stg.NewWriteBatch()
st.dataBatch = [][]byte{nil}
// st.dwb.PutData(nil)

st.cwts = newListQueue()
st.cwts.PushFront(newCommitWaitTask(awg))
Expand Down
2 changes: 0 additions & 2 deletions internal/storagenode/logstream/writer.go
Original file line number Diff line number Diff line change
Expand Up @@ -88,7 +88,6 @@ func (w *writer) writeLoopInternal(_ context.Context, st *sequenceTask) {
cnt := len(st.awgs)
defer func() {
st.wwg.done(err)
// _ = st.dwb.Close()
_ = st.wb.Close()
st.release()
inflight := w.inflight.Add(-1)
Expand All @@ -100,7 +99,6 @@ func (w *writer) writeLoopInternal(_ context.Context, st *sequenceTask) {
w.lse.lsm.WriterInflightOperations.Store(inflight)
}()

// err = st.dwb.Apply()
err = st.wb.Apply()
if err != nil {
w.logger.Error("could not apply data", zap.Error(err))
Expand Down
Loading