diff --git a/internal/batchlet/batchlet.go b/internal/batchlet/batchlet.go deleted file mode 100644 index d698be748..000000000 --- a/internal/batchlet/batchlet.go +++ /dev/null @@ -1,21 +0,0 @@ -package batchlet - -var ( - LengthClasses = []int{ - 1 << 4, - 1 << 6, - 1 << 8, - 1 << 10, - } -) - -func SelectLengthClass(size int) (idx, batchletLen int) { - idx = 0 - for idx < len(LengthClasses)-1 { - if size <= LengthClasses[idx] { - break - } - idx++ - } - return idx, LengthClasses[idx] -} diff --git a/internal/storagenode/log_server.go b/internal/storagenode/log_server.go index fa58be2df..6789df4c4 100644 --- a/internal/storagenode/log_server.go +++ b/internal/storagenode/log_server.go @@ -162,7 +162,7 @@ func (ls *logServer) appendStreamSendLoop(stream snpb.LogIO_AppendServer, cq <-c appendTask.Release() goto RecordMetric } - appendTask.ReleaseWriteWaitGroups() + appendTask.ReleaseWriteWaitGroup() appendTask.Release() rsp.Results = res diff --git a/internal/storagenode/logstream/append.go b/internal/storagenode/logstream/append.go index 6ca5b0877..4f1fbd765 100644 --- a/internal/storagenode/logstream/append.go +++ b/internal/storagenode/logstream/append.go @@ -9,7 +9,6 @@ import ( "go.uber.org/zap" "google.golang.org/grpc/codes" - "github.com/kakao/varlog/internal/batchlet" snerrors "github.com/kakao/varlog/internal/storagenode/errors" "github.com/kakao/varlog/internal/storagenode/telemetry" "github.com/kakao/varlog/pkg/types" @@ -52,10 +51,8 @@ func (at *AppendTask) Release() { appendTaskPool.Put(at) } -func (at *AppendTask) ReleaseWriteWaitGroups() { - for i := range at.apc.wwgs { - at.apc.wwgs[i].release() - } +func (at *AppendTask) ReleaseWriteWaitGroup() { + at.apc.wwg.release() } func (at *AppendTask) WaitForCompletion(ctx context.Context) (res []snpb.AppendResult, err error) { @@ -118,8 +115,14 @@ func appendTaskDeferredFunc(at *AppendTask) { } type appendContext struct { - sts []*sequenceTask - wwgs []*writeWaitGroup + st *sequenceTask + wwg *writeWaitGroup + // NOTE(jun): awgs represents a collection of wait groups corresponding to + // each log entry in the batch. While storage typically writes log entries + // in a batch simultaneously, the commit operation, although expected to + // handle all entries in a batch, is not strictly enforced to do so. + // Therefore, we should maintain awgs until we can guarantee batch-level + // atomic commits. awgs []*appendWaitGroup totalBytes int64 } @@ -146,15 +149,7 @@ func (lse *Executor) AppendAsync(ctx context.Context, dataBatch [][]byte, append return snerrors.ErrNotPrimary } - _, batchletLen := batchlet.SelectLengthClass(dataBatchLen) - batchletCount := dataBatchLen / batchletLen - if dataBatchLen%batchletLen > 0 { - batchletCount++ - } - appendTask.apc = appendContext{ - sts: make([]*sequenceTask, 0, batchletCount), - wwgs: make([]*writeWaitGroup, 0, batchletCount), awgs: make([]*appendWaitGroup, 0, dataBatchLen), } @@ -173,7 +168,7 @@ func (lse *Executor) AppendAsync(ctx context.Context, dataBatch [][]byte, append lse.prepareAppendContext(dataBatch, &appendTask.apc) preparationDuration = time.Since(startTime) - lse.sendSequenceTasks(ctx, appendTask.apc.sts) + lse.sendSequenceTask(ctx, appendTask.apc.st) return nil } @@ -202,15 +197,7 @@ func (lse *Executor) Append(ctx context.Context, dataBatch [][]byte) ([]snpb.App var preparationDuration time.Duration dataBatchLen := len(dataBatch) - _, batchletLen := batchlet.SelectLengthClass(dataBatchLen) - batchletCount := dataBatchLen / batchletLen - if dataBatchLen%batchletLen > 0 { - batchletCount++ - } - apc := appendContext{ - sts: make([]*sequenceTask, 0, batchletCount), - wwgs: make([]*writeWaitGroup, 0, batchletCount), awgs: make([]*appendWaitGroup, 0, dataBatchLen), } @@ -231,39 +218,23 @@ func (lse *Executor) Append(ctx context.Context, dataBatch [][]byte) ([]snpb.App lse.prepareAppendContext(dataBatch, &apc) preparationDuration = time.Since(startTime) - lse.sendSequenceTasks(ctx, apc.sts) + lse.sendSequenceTask(ctx, apc.st) res, err := lse.waitForCompletionOfAppends(ctx, dataBatchLen, apc.awgs) if err == nil { - for i := range apc.wwgs { - apc.wwgs[i].release() - } + apc.wwg.release() } return res, err } func (lse *Executor) prepareAppendContext(dataBatch [][]byte, apc *appendContext) { begin, end := 0, len(dataBatch) - for begin < end { - batchletClassIdx, batchletLen := batchlet.SelectLengthClass(end - begin) - batchletEndIdx := begin + batchletLen - if batchletEndIdx > end { - batchletEndIdx = end - } - - lse.prepareAppendContextInternal(dataBatch, begin, batchletEndIdx, batchletClassIdx, apc) - begin = batchletEndIdx - } -} - -func (lse *Executor) prepareAppendContextInternal(dataBatch [][]byte, begin, end, batchletClassIdx int, apc *appendContext) { numBackups := len(lse.primaryBackups) - 1 - batchletData := dataBatch[begin:end] st := newSequenceTask() - apc.sts = append(apc.sts, st) + apc.st = st // data batch - st.dataBatch = batchletData + st.dataBatch = dataBatch // replicate tasks st.rts = newReplicateTaskSlice() @@ -271,20 +242,20 @@ func (lse *Executor) prepareAppendContextInternal(dataBatch [][]byte, begin, end rt := newReplicateTask(end - begin) rt.tpid = lse.tpid rt.lsid = lse.lsid - rt.dataList = batchletData + rt.dataList = dataBatch st.rts.tasks = append(st.rts.tasks, rt) } // write wait group st.wwg = newWriteWaitGroup() - apc.wwgs = append(apc.wwgs, st.wwg) + apc.wwg = st.wwg // st.dwb = lse.stg.NewWriteBatch().Deferred(batchletClassIdx) st.wb = lse.stg.NewWriteBatch() st.cwts = newListQueue() - for i := 0; i < len(batchletData); i++ { + for i := 0; i < len(dataBatch); i++ { // st.dwb.PutData(batchletData[i]) - logEntrySize := int64(len(batchletData[i])) + logEntrySize := int64(len(dataBatch[i])) apc.totalBytes += logEntrySize if lse.lsm != nil { // TODO: Set the correct status code. @@ -297,20 +268,9 @@ func (lse *Executor) prepareAppendContextInternal(dataBatch [][]byte, begin, end st.awgs = apc.awgs[begin:end] } -func (lse *Executor) sendSequenceTasks(ctx context.Context, sts []*sequenceTask) { - var err error - sendIdx := 0 - for sendIdx < len(sts) { - err = lse.sq.send(ctx, sts[sendIdx]) - if err != nil { - break - } - sendIdx++ - } - for stIdx := sendIdx; stIdx < len(sts); stIdx++ { - st := sts[stIdx] +func (lse *Executor) sendSequenceTask(ctx context.Context, st *sequenceTask) { + if err := lse.sq.send(ctx, st); err != nil { st.wwg.done(err) - // _ = st.dwb.Close() _ = st.wb.Close() releaseCommitWaitTaskList(st.cwts) releaseReplicateTasks(st.rts.tasks) diff --git a/internal/storagenode/logstream/executor_test.go b/internal/storagenode/logstream/executor_test.go index 6eb3e6570..35b328b2e 100644 --- a/internal/storagenode/logstream/executor_test.go +++ b/internal/storagenode/logstream/executor_test.go @@ -14,7 +14,6 @@ import ( "google.golang.org/grpc/codes" "google.golang.org/grpc/status" - "github.com/kakao/varlog/internal/batchlet" "github.com/kakao/varlog/internal/storage" snerrors "github.com/kakao/varlog/internal/storagenode/errors" "github.com/kakao/varlog/pkg/types" @@ -488,9 +487,15 @@ func TestExecutor_Append(t *testing.T) { } // primary - batchLens := make([]int, 0, len(batchlet.LengthClasses)*3+1) + lengthClasses := []int{ + 1 << 4, + 1 << 6, + 1 << 8, + 1 << 10, + } + batchLens := make([]int, 0, len(lengthClasses)*3+1) batchLens = append(batchLens, 1) - for _, batchletLen := range batchlet.LengthClasses { + for _, batchletLen := range lengthClasses { batchLens = append(batchLens, batchletLen-1) batchLens = append(batchLens, batchletLen) batchLens = append(batchLens, batchletLen+1) @@ -621,9 +626,15 @@ func TestExecutor_Replicate(t *testing.T) { return } + lengthClasses := []int{ + 1 << 4, + 1 << 6, + 1 << 8, + 1 << 10, + } // backup llsn := types.MinLLSN - for _, batchLen := range batchlet.LengthClasses { + for _, batchLen := range lengthClasses { dataList := TestNewBatchData(t, batchLen, 0) err := lse.Replicate(context.Background(), llsn, dataList) assert.NoError(t, err) @@ -636,7 +647,7 @@ func TestExecutor_Replicate(t *testing.T) { lastGLSN = types.InvalidGLSN lastVersion = types.InvalidVersion ) - for _, batchLen := range batchlet.LengthClasses { + for _, batchLen := range lengthClasses { assert.Eventually(t, func() bool { _ = lse.Commit(context.Background(), snpb.LogStreamCommitResult{ TopicID: lse.tpid, diff --git a/internal/storagenode/logstream/replicate_client.go b/internal/storagenode/logstream/replicate_client.go index 5209a0d58..3280461f9 100644 --- a/internal/storagenode/logstream/replicate_client.go +++ b/internal/storagenode/logstream/replicate_client.go @@ -3,14 +3,13 @@ package logstream import ( "context" "fmt" + "slices" "sync/atomic" "time" "go.uber.org/zap" - "github.com/kakao/varlog/internal/batchlet" "github.com/kakao/varlog/pkg/rpc" - "github.com/kakao/varlog/pkg/types" "github.com/kakao/varlog/pkg/util/runner" "github.com/kakao/varlog/pkg/verrors" "github.com/kakao/varlog/proto/snpb" @@ -107,12 +106,9 @@ func (rc *replicateClient) sendLoop(ctx context.Context) { defer func() { _ = rc.streamClient.CloseSend() }() - // NOTE: To reuse the request struct, we need to initialize the field LLSN. - maxBatchletLength := batchlet.LengthClasses[len(batchlet.LengthClasses)-1] req := &snpb.ReplicateRequest{ TopicID: rc.lse.tpid, LogStreamID: rc.lse.lsid, - LLSN: make([]types.LLSN, maxBatchletLength), } streamCtx := rc.streamClient.Context() for { @@ -136,10 +132,7 @@ func (rc *replicateClient) sendLoopInternal(_ context.Context, rt *replicateTask // Remove maxAppendSubBatchSize, since rt already has batched data. startTime := time.Now() // TODO(jun): Since (snpb.ReplicateRequest).LLSN is deprecated, it will disappear soon. - // NOTE: We need to copy the LLSN array, since the array is reused. - req.LLSN = req.LLSN[0:len(rt.llsnList)] - copy(req.LLSN, rt.llsnList) - // req.LLSN = rt.llsnList + req.LLSN = slices.Clone(rt.llsnList) req.Data = rt.dataList if len(rt.llsnList) > 0 { req.BeginLLSN = rt.llsnList[0] diff --git a/internal/storagenode/logstream/replicate_client_test.go b/internal/storagenode/logstream/replicate_client_test.go index 04f275900..32759f66b 100644 --- a/internal/storagenode/logstream/replicate_client_test.go +++ b/internal/storagenode/logstream/replicate_client_test.go @@ -123,7 +123,7 @@ func TestReplicateClientRPCError(t *testing.T) { rc.queue = make(chan *replicateTask, 1) rc.streamClient = mockStreamClient - rt := newReplicateTaskDeprecated(0) + rt := newReplicateTask(1) rt.tpid = lse.tpid rt.lsid = lse.lsid rt.llsnList = append(rt.llsnList, 1) @@ -156,7 +156,7 @@ func TestReplicateClientDrain(t *testing.T) { rc.queue = make(chan *replicateTask, numTasks) for i := 0; i < numTasks; i++ { - rt := newReplicateTaskDeprecated(0) + rt := newReplicateTask(1) err := rc.send(context.Background(), rt) assert.NoError(t, err) } @@ -206,7 +206,7 @@ func TestReplicateClient(t *testing.T) { assert.NoError(t, err) defer rc.stop() - rt := newReplicateTaskDeprecated(0) + rt := newReplicateTask(1) rt.tpid = lse.tpid rt.lsid = lse.lsid rt.llsnList = append(rt.llsnList, 1) diff --git a/internal/storagenode/logstream/replicate_task.go b/internal/storagenode/logstream/replicate_task.go index c5fbe9c23..a0922bc55 100644 --- a/internal/storagenode/logstream/replicate_task.go +++ b/internal/storagenode/logstream/replicate_task.go @@ -3,7 +3,6 @@ package logstream import ( "sync" - "github.com/kakao/varlog/internal/batchlet" "github.com/kakao/varlog/pkg/types" ) @@ -13,28 +12,6 @@ type replicateTask struct { lsid types.LogStreamID llsnList []types.LLSN dataList [][]byte - - poolIdx int -} - -// newReplicateTaskDeprecated returns a new replicateTask. -// The argument poolIdx should be the index of replicateTaskPools, which is returned from batchlet.SelectLengthClass. -// -// Deprecated: Use newReplicateTask. -func newReplicateTaskDeprecated(poolIdx int) *replicateTask { - rt := replicateTaskPools[poolIdx].Get().(*replicateTask) - return rt -} - -// releaseDeprecated releases the task to the pool. -// -// Deprecated: Use release. -func (rt *replicateTask) releaseDeprecated() { - rt.tpid = 0 - rt.lsid = 0 - rt.llsnList = rt.llsnList[0:0] - rt.dataList = nil - replicateTaskPools[rt.poolIdx].Put(rt) } // newReplicateTask returns a new replicateTask. The capacity of the returned @@ -81,7 +58,6 @@ func (p *replicateTaskPool) get(size int) *replicateTask { } return &replicateTask{ llsnList: make([]types.LLSN, 0, size), - poolIdx: 0, } } @@ -89,43 +65,6 @@ func (p *replicateTaskPool) put(rt *replicateTask) { p.pool.Put(rt) } -// replicateTaskPools is a set of pools for replicateTask. -// Deprecated: Use defaultReplicateTaskPool. -var replicateTaskPools = [...]sync.Pool{ - { - New: func() interface{} { - return &replicateTask{ - llsnList: make([]types.LLSN, 0, batchlet.LengthClasses[0]), - poolIdx: 0, - } - }, - }, - { - New: func() interface{} { - return &replicateTask{ - llsnList: make([]types.LLSN, 0, batchlet.LengthClasses[1]), - poolIdx: 1, - } - }, - }, - { - New: func() interface{} { - return &replicateTask{ - llsnList: make([]types.LLSN, 0, batchlet.LengthClasses[2]), - poolIdx: 2, - } - }, - }, - { - New: func() interface{} { - return &replicateTask{ - llsnList: make([]types.LLSN, 0, batchlet.LengthClasses[3]), - poolIdx: 3, - } - }, - }, -} - const defaultLengthOfReplicationTaskSlice = 3 type replicateTaskSlice struct { diff --git a/internal/storagenode/logstream/replicate_task_test.go b/internal/storagenode/logstream/replicate_task_test.go index 0e5228d10..80a823a5b 100644 --- a/internal/storagenode/logstream/replicate_task_test.go +++ b/internal/storagenode/logstream/replicate_task_test.go @@ -4,24 +4,22 @@ import ( "testing" "github.com/stretchr/testify/assert" - - "github.com/kakao/varlog/internal/batchlet" ) func TestReplicateTaskPools(t *testing.T) { const repeatCount = 1000 + lengthList := []int{ + 1 << 4, + 1 << 6, + 1 << 8, + 1 << 10, + } for range repeatCount { - for poolIdx, batchletLen := range batchlet.LengthClasses { - rt1 := newReplicateTaskDeprecated(poolIdx) - assert.Empty(t, rt1.llsnList) - assert.Equal(t, batchletLen, cap(rt1.llsnList)) - - rt2 := newReplicateTask(batchletLen) + for _, length := range lengthList { + rt2 := newReplicateTask(length) assert.Empty(t, rt2.llsnList) - assert.GreaterOrEqual(t, cap(rt2.llsnList), batchletLen) - - rt1.releaseDeprecated() + assert.GreaterOrEqual(t, cap(rt2.llsnList), length) rt2.release() } }