Skip to content

Commit

Permalink
feat(storagenode): remove package internal/batchlet
Browse files Browse the repository at this point in the history
This PR removes the package internal/batchlet, which implemented a kind of
small batch. As a result, the batch in an Append RPC message is no longer split.
  • Loading branch information
ijsong committed Jan 21, 2025
1 parent daf76e3 commit 485f6c6
Show file tree
Hide file tree
Showing 8 changed files with 52 additions and 172 deletions.
21 changes: 0 additions & 21 deletions internal/batchlet/batchlet.go

This file was deleted.

2 changes: 1 addition & 1 deletion internal/storagenode/log_server.go
Original file line number Diff line number Diff line change
Expand Up @@ -162,7 +162,7 @@ func (ls *logServer) appendStreamSendLoop(stream snpb.LogIO_AppendServer, cq <-c
appendTask.Release()
goto RecordMetric
}
appendTask.ReleaseWriteWaitGroups()
appendTask.ReleaseWriteWaitGroup()
appendTask.Release()

rsp.Results = res
Expand Down
82 changes: 21 additions & 61 deletions internal/storagenode/logstream/append.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,6 @@ import (
"go.uber.org/zap"
"google.golang.org/grpc/codes"

"github.com/kakao/varlog/internal/batchlet"
snerrors "github.com/kakao/varlog/internal/storagenode/errors"
"github.com/kakao/varlog/internal/storagenode/telemetry"
"github.com/kakao/varlog/pkg/types"
Expand Down Expand Up @@ -52,10 +51,8 @@ func (at *AppendTask) Release() {
appendTaskPool.Put(at)
}

func (at *AppendTask) ReleaseWriteWaitGroups() {
for i := range at.apc.wwgs {
at.apc.wwgs[i].release()
}
func (at *AppendTask) ReleaseWriteWaitGroup() {
at.apc.wwg.release()
}

func (at *AppendTask) WaitForCompletion(ctx context.Context) (res []snpb.AppendResult, err error) {
Expand Down Expand Up @@ -118,8 +115,14 @@ func appendTaskDeferredFunc(at *AppendTask) {
}

type appendContext struct {
sts []*sequenceTask
wwgs []*writeWaitGroup
st *sequenceTask
wwg *writeWaitGroup
// NOTE(jun): awgs represents a collection of wait groups corresponding to
// each log entry in the batch. While storage typically writes log entries
// in a batch simultaneously, the commit operation, although expected to
// handle all entries in a batch, is not strictly enforced to do so.
// Therefore, we should maintain awgs until we can guarantee batch-level
// atomic commits.
awgs []*appendWaitGroup
totalBytes int64
}
Expand All @@ -146,15 +149,7 @@ func (lse *Executor) AppendAsync(ctx context.Context, dataBatch [][]byte, append
return snerrors.ErrNotPrimary
}

_, batchletLen := batchlet.SelectLengthClass(dataBatchLen)
batchletCount := dataBatchLen / batchletLen
if dataBatchLen%batchletLen > 0 {
batchletCount++
}

appendTask.apc = appendContext{
sts: make([]*sequenceTask, 0, batchletCount),
wwgs: make([]*writeWaitGroup, 0, batchletCount),
awgs: make([]*appendWaitGroup, 0, dataBatchLen),
}

Expand All @@ -173,7 +168,7 @@ func (lse *Executor) AppendAsync(ctx context.Context, dataBatch [][]byte, append

lse.prepareAppendContext(dataBatch, &appendTask.apc)
preparationDuration = time.Since(startTime)
lse.sendSequenceTasks(ctx, appendTask.apc.sts)
lse.sendSequenceTask(ctx, appendTask.apc.st)
return nil
}

Expand Down Expand Up @@ -202,15 +197,7 @@ func (lse *Executor) Append(ctx context.Context, dataBatch [][]byte) ([]snpb.App
var preparationDuration time.Duration
dataBatchLen := len(dataBatch)

_, batchletLen := batchlet.SelectLengthClass(dataBatchLen)
batchletCount := dataBatchLen / batchletLen
if dataBatchLen%batchletLen > 0 {
batchletCount++
}

apc := appendContext{
sts: make([]*sequenceTask, 0, batchletCount),
wwgs: make([]*writeWaitGroup, 0, batchletCount),
awgs: make([]*appendWaitGroup, 0, dataBatchLen),
}

Expand All @@ -231,60 +218,44 @@ func (lse *Executor) Append(ctx context.Context, dataBatch [][]byte) ([]snpb.App

lse.prepareAppendContext(dataBatch, &apc)
preparationDuration = time.Since(startTime)
lse.sendSequenceTasks(ctx, apc.sts)
lse.sendSequenceTask(ctx, apc.st)
res, err := lse.waitForCompletionOfAppends(ctx, dataBatchLen, apc.awgs)
if err == nil {
for i := range apc.wwgs {
apc.wwgs[i].release()
}
apc.wwg.release()
}
return res, err
}

func (lse *Executor) prepareAppendContext(dataBatch [][]byte, apc *appendContext) {
begin, end := 0, len(dataBatch)
for begin < end {
batchletClassIdx, batchletLen := batchlet.SelectLengthClass(end - begin)
batchletEndIdx := begin + batchletLen
if batchletEndIdx > end {
batchletEndIdx = end
}

lse.prepareAppendContextInternal(dataBatch, begin, batchletEndIdx, batchletClassIdx, apc)
begin = batchletEndIdx
}
}

func (lse *Executor) prepareAppendContextInternal(dataBatch [][]byte, begin, end, batchletClassIdx int, apc *appendContext) {
numBackups := len(lse.primaryBackups) - 1
batchletData := dataBatch[begin:end]

st := newSequenceTask()
apc.sts = append(apc.sts, st)
apc.st = st

// data batch
st.dataBatch = batchletData
st.dataBatch = dataBatch

// replicate tasks
st.rts = newReplicateTaskSlice()
for i := 0; i < numBackups; i++ {
rt := newReplicateTask(end - begin)
rt.tpid = lse.tpid
rt.lsid = lse.lsid
rt.dataList = batchletData
rt.dataList = dataBatch
st.rts.tasks = append(st.rts.tasks, rt)
}

// write wait group
st.wwg = newWriteWaitGroup()
apc.wwgs = append(apc.wwgs, st.wwg)
apc.wwg = st.wwg

// st.dwb = lse.stg.NewWriteBatch().Deferred(batchletClassIdx)
st.wb = lse.stg.NewWriteBatch()
st.cwts = newListQueue()
for i := 0; i < len(batchletData); i++ {
for i := 0; i < len(dataBatch); i++ {
// st.dwb.PutData(batchletData[i])
logEntrySize := int64(len(batchletData[i]))
logEntrySize := int64(len(dataBatch[i]))
apc.totalBytes += logEntrySize
if lse.lsm != nil {
// TODO: Set the correct status code.
Expand All @@ -297,20 +268,9 @@ func (lse *Executor) prepareAppendContextInternal(dataBatch [][]byte, begin, end
st.awgs = apc.awgs[begin:end]
}

func (lse *Executor) sendSequenceTasks(ctx context.Context, sts []*sequenceTask) {
var err error
sendIdx := 0
for sendIdx < len(sts) {
err = lse.sq.send(ctx, sts[sendIdx])
if err != nil {
break
}
sendIdx++
}
for stIdx := sendIdx; stIdx < len(sts); stIdx++ {
st := sts[stIdx]
func (lse *Executor) sendSequenceTask(ctx context.Context, st *sequenceTask) {
if err := lse.sq.send(ctx, st); err != nil {
st.wwg.done(err)
// _ = st.dwb.Close()
_ = st.wb.Close()
releaseCommitWaitTaskList(st.cwts)
releaseReplicateTasks(st.rts.tasks)
Expand Down
21 changes: 16 additions & 5 deletions internal/storagenode/logstream/executor_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,6 @@ import (
"google.golang.org/grpc/codes"
"google.golang.org/grpc/status"

"github.com/kakao/varlog/internal/batchlet"
"github.com/kakao/varlog/internal/storage"
snerrors "github.com/kakao/varlog/internal/storagenode/errors"
"github.com/kakao/varlog/pkg/types"
Expand Down Expand Up @@ -488,9 +487,15 @@ func TestExecutor_Append(t *testing.T) {
}

// primary
batchLens := make([]int, 0, len(batchlet.LengthClasses)*3+1)
lengthClasses := []int{
1 << 4,
1 << 6,
1 << 8,
1 << 10,
}
batchLens := make([]int, 0, len(lengthClasses)*3+1)
batchLens = append(batchLens, 1)
for _, batchletLen := range batchlet.LengthClasses {
for _, batchletLen := range lengthClasses {
batchLens = append(batchLens, batchletLen-1)
batchLens = append(batchLens, batchletLen)
batchLens = append(batchLens, batchletLen+1)
Expand Down Expand Up @@ -621,9 +626,15 @@ func TestExecutor_Replicate(t *testing.T) {
return
}

lengthClasses := []int{
1 << 4,
1 << 6,
1 << 8,
1 << 10,
}
// backup
llsn := types.MinLLSN
for _, batchLen := range batchlet.LengthClasses {
for _, batchLen := range lengthClasses {
dataList := TestNewBatchData(t, batchLen, 0)
err := lse.Replicate(context.Background(), llsn, dataList)
assert.NoError(t, err)
Expand All @@ -636,7 +647,7 @@ func TestExecutor_Replicate(t *testing.T) {
lastGLSN = types.InvalidGLSN
lastVersion = types.InvalidVersion
)
for _, batchLen := range batchlet.LengthClasses {
for _, batchLen := range lengthClasses {
assert.Eventually(t, func() bool {
_ = lse.Commit(context.Background(), snpb.LogStreamCommitResult{
TopicID: lse.tpid,
Expand Down
11 changes: 2 additions & 9 deletions internal/storagenode/logstream/replicate_client.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,14 +3,13 @@ package logstream
import (
"context"
"fmt"
"slices"
"sync/atomic"
"time"

"go.uber.org/zap"

"github.com/kakao/varlog/internal/batchlet"
"github.com/kakao/varlog/pkg/rpc"
"github.com/kakao/varlog/pkg/types"
"github.com/kakao/varlog/pkg/util/runner"
"github.com/kakao/varlog/pkg/verrors"
"github.com/kakao/varlog/proto/snpb"
Expand Down Expand Up @@ -107,12 +106,9 @@ func (rc *replicateClient) sendLoop(ctx context.Context) {
defer func() {
_ = rc.streamClient.CloseSend()
}()
// NOTE: To reuse the request struct, we need to initialize the field LLSN.
maxBatchletLength := batchlet.LengthClasses[len(batchlet.LengthClasses)-1]
req := &snpb.ReplicateRequest{
TopicID: rc.lse.tpid,
LogStreamID: rc.lse.lsid,
LLSN: make([]types.LLSN, maxBatchletLength),
}
streamCtx := rc.streamClient.Context()
for {
Expand All @@ -136,10 +132,7 @@ func (rc *replicateClient) sendLoopInternal(_ context.Context, rt *replicateTask
// Remove maxAppendSubBatchSize, since rt already has batched data.
startTime := time.Now()
// TODO(jun): Since (snpb.ReplicateRequest).LLSN is deprecated, it will disappear soon.
// NOTE: We need to copy the LLSN array, since the array is reused.
req.LLSN = req.LLSN[0:len(rt.llsnList)]
copy(req.LLSN, rt.llsnList)
// req.LLSN = rt.llsnList
req.LLSN = slices.Clone(rt.llsnList)
req.Data = rt.dataList
if len(rt.llsnList) > 0 {
req.BeginLLSN = rt.llsnList[0]
Expand Down
6 changes: 3 additions & 3 deletions internal/storagenode/logstream/replicate_client_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -123,7 +123,7 @@ func TestReplicateClientRPCError(t *testing.T) {
rc.queue = make(chan *replicateTask, 1)
rc.streamClient = mockStreamClient

rt := newReplicateTaskDeprecated(0)
rt := newReplicateTask(1)
rt.tpid = lse.tpid
rt.lsid = lse.lsid
rt.llsnList = append(rt.llsnList, 1)
Expand Down Expand Up @@ -156,7 +156,7 @@ func TestReplicateClientDrain(t *testing.T) {
rc.queue = make(chan *replicateTask, numTasks)

for i := 0; i < numTasks; i++ {
rt := newReplicateTaskDeprecated(0)
rt := newReplicateTask(1)
err := rc.send(context.Background(), rt)
assert.NoError(t, err)
}
Expand Down Expand Up @@ -206,7 +206,7 @@ func TestReplicateClient(t *testing.T) {
assert.NoError(t, err)
defer rc.stop()

rt := newReplicateTaskDeprecated(0)
rt := newReplicateTask(1)
rt.tpid = lse.tpid
rt.lsid = lse.lsid
rt.llsnList = append(rt.llsnList, 1)
Expand Down
Loading

0 comments on commit 485f6c6

Please sign in to comment.