diff --git a/internal/storagenode/logstream/committer.go b/internal/storagenode/logstream/committer.go index a072997ef..0abb21a6d 100644 --- a/internal/storagenode/logstream/committer.go +++ b/internal/storagenode/logstream/committer.go @@ -218,10 +218,10 @@ func (cm *committer) commit(_ context.Context, ct *commitTask) error { CommittedLLSNBegin: uncommittedLLSNBegin, } - return cm.commitInternal(commitContext, true) + return cm.commitInternal(commitContext) } -func (cm *committer) commitInternal(cc storage.CommitContext, requireCommitWaitTasks bool) (err error) { +func (cm *committer) commitInternal(cc storage.CommitContext) (err error) { _, _, uncommittedBegin, _ := cm.lse.lsc.reportCommitBase() uncommttedLLSNBegin := uncommittedBegin.LLSN @@ -236,7 +236,7 @@ func (cm *committer) commitInternal(cc storage.CommitContext, requireCommitWaitT // size of commitWaitQueue. For instance, a batch of log entries could be written to the // storage, however, it is failed to push them into the commitWaitQueue. // See #VARLOG-444. - if requireCommitWaitTasks && cm.commitWaitQ.size() < numCommits { + if cm.commitWaitQ.size() < numCommits { return nil } @@ -269,14 +269,10 @@ func (cm *committer) commitInternal(cc storage.CommitContext, requireCommitWaitT return err } - // If requireCommitWaitTasks is true, since the number of tasks in commitWaitQ is - // inspected above, cwt must exist. - // If cwt is null, it means that there is no task in commitWaitQ anymore. When this - // method is executed by SyncReplicate, it is okay for cwt not to exist. + // Since the number of tasks in commitWaitQ is inspected above, cwt + // must exist. cwt := iter.task() - if cwt != nil { - cwt.awg.setGLSN(glsn) - } + cwt.awg.setGLSN(glsn) err = cb.Set(llsn, glsn) if err != nil { @@ -295,12 +291,7 @@ func (cm *committer) commitInternal(cc storage.CommitContext, requireCommitWaitT // NOTE: This cwt should not be nil, because the size of commitWaitQ is inspected // above. cwt := cm.commitWaitQ.pop() - if cwt != nil { - // FIXME (jun): If sync replication is occurred, cwt may be nil. - // It means that above codes tried to pop from empty commitWaitQ. - // It is very difficult to detect bug, so it should be re-organized. - committedTasks = append(committedTasks, cwt) - } + committedTasks = append(committedTasks, cwt) } if numCommits > 0 { @@ -323,7 +314,6 @@ func (cm *committer) commitInternal(cc storage.CommitContext, requireCommitWaitT cwt.release() } - // NOTE: When sync replication is occurred, it can be zero. if len(committedTasks) > 0 { cm.inflightCommitWait.Add(int64(-numCommits)) }