Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

refactor(logstream): use beginLLSN in Replicate method #958

Open
wants to merge 10 commits into
base: main
Choose a base branch
from
2 changes: 1 addition & 1 deletion internal/storagenode/client/log_client.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@ type SubscribeResult struct {
}

var InvalidSubscribeResult = SubscribeResult{
LogEntry: varlogpb.InvalidLogEntry(),
LogEntry: varlogpb.LogEntry{},
Error: errors.New("invalid subscribe result"),
}

Expand Down
12 changes: 6 additions & 6 deletions internal/storagenode/logstream/executor.go
Original file line number Diff line number Diff line change
Expand Up @@ -160,7 +160,7 @@ func NewExecutor(opts ...ExecutorOption) (lse *Executor, err error) {
return lse, err
}

func (lse *Executor) Replicate(ctx context.Context, llsnList []types.LLSN, dataList [][]byte) error {
func (lse *Executor) Replicate(ctx context.Context, beginLLSN types.LLSN, dataList [][]byte) error {
lse.inflight.Add(1)
defer lse.inflight.Add(-1)

Expand All @@ -178,7 +178,7 @@ func (lse *Executor) Replicate(ctx context.Context, llsnList []types.LLSN, dataL
var preparationDuration time.Duration
startTime := time.Now()
dataBytes := int64(0)
batchSize := len(llsnList)
batchSize := len(dataList)
defer func() {
if lse.lsm == nil {
return
Expand All @@ -190,11 +190,11 @@ func (lse *Executor) Replicate(ctx context.Context, llsnList []types.LLSN, dataL
lse.lsm.ReplicatePreparationMicro.Add(preparationDuration.Microseconds())
}()

oldLLSN, newLLSN := llsnList[0], llsnList[batchSize-1]+1
oldLLSN, newLLSN := beginLLSN, beginLLSN+types.LLSN(batchSize)
wb := lse.stg.NewWriteBatch()
cwts := newListQueue()
for i := 0; i < len(llsnList); i++ {
_ = wb.Set(llsnList[i], dataList[i])
for i := 0; i < batchSize; i++ {
_ = wb.Set(beginLLSN+types.LLSN(i), dataList[i])
dataBytes += int64(len(dataList[i]))
cwts.PushFront(newCommitWaitTask(nil))
}
Expand Down Expand Up @@ -329,7 +329,7 @@ func (lse *Executor) Unseal(_ context.Context, replicas []varlogpb.LogStreamRepl
replica: replicas[i],
rpcConn: rpcConn,
queueCapacity: lse.replicateClientQueueCapacity,
//grpcDialOptions: lse.replicateClientGRPCOptions,
// grpcDialOptions: lse.replicateClientGRPCOptions,
lse: lse,
logger: lse.logger.Named("replicate client"),
})
Expand Down
24 changes: 9 additions & 15 deletions internal/storagenode/logstream/executor_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -84,7 +84,7 @@ func TestExecutor_Closed(t *testing.T) {
_, err := lse.Append(context.Background(), TestNewBatchData(t, 1, 0))
assert.ErrorIs(t, err, verrors.ErrClosed)

err = lse.Replicate(context.Background(), []types.LLSN{1}, TestNewBatchData(t, 1, 0))
err = lse.Replicate(context.Background(), types.LLSN(1), TestNewBatchData(t, 1, 0))
assert.ErrorIs(t, err, verrors.ErrClosed)

_, _, err = lse.Seal(context.Background(), types.MinGLSN)
Expand Down Expand Up @@ -182,7 +182,7 @@ func TestExecutor_Sealing(t *testing.T) {
assert.Equal(t, varlogpb.LogStreamStatusSealing, st)
assert.Equal(t, executorStateSealing, lse.esm.load())

err = lse.Replicate(context.Background(), []types.LLSN{1}, TestNewBatchData(t, 1, 0))
err = lse.Replicate(context.Background(), types.LLSN(1), TestNewBatchData(t, 1, 0))
assert.ErrorIs(t, err, verrors.ErrSealed)
},
},
Expand Down Expand Up @@ -286,7 +286,7 @@ func TestExecutor_Sealed(t *testing.T) {
_, err = lse.Append(context.Background(), TestNewBatchData(t, 1, 0))
assert.ErrorIs(t, err, verrors.ErrSealed)

err = lse.Replicate(context.Background(), []types.LLSN{1}, TestNewBatchData(t, 1, 0))
err = lse.Replicate(context.Background(), types.LLSN(1), TestNewBatchData(t, 1, 0))
assert.ErrorIs(t, err, verrors.ErrSealed)
}

Expand Down Expand Up @@ -616,22 +616,18 @@ func TestExecutor_Replicate(t *testing.T) {

// primary
if tc.isErr {
err := lse.Replicate(context.Background(), []types.LLSN{1}, [][]byte{nil})
err := lse.Replicate(context.Background(), types.LLSN(1), [][]byte{nil})
assert.Error(t, err)
return
}

// backup
var llsn types.LLSN
llsn := types.MinLLSN
for _, batchLen := range batchlet.LengthClasses {
dataList := TestNewBatchData(t, batchLen, 0)
llsnList := make([]types.LLSN, batchLen)
for i := 0; i < batchLen; i++ {
llsn++
llsnList[i] = llsn
}
err := lse.Replicate(context.Background(), llsnList, dataList)
err := lse.Replicate(context.Background(), llsn, dataList)
assert.NoError(t, err)
llsn += types.LLSN(len(dataList))
}

// Commit
Expand Down Expand Up @@ -875,7 +871,7 @@ func TestExecutor_ReplicateSeal(t *testing.T) {
go func() {
defer wg.Done()
for llsn := lastLLSN + 1; llsn < types.MaxLLSN; llsn++ {
err := lse.Replicate(context.Background(), []types.LLSN{llsn}, [][]byte{nil})
err := lse.Replicate(context.Background(), llsn, [][]byte{nil})
if err != nil {
break
}
Expand Down Expand Up @@ -935,7 +931,7 @@ func TestExecutor_ReplicateSeal(t *testing.T) {
go func() {
defer wg.Done()
for llsn := lastLLSN + 1; llsn < types.MaxLLSN; llsn++ {
err := lse.Replicate(context.Background(), []types.LLSN{llsn}, [][]byte{nil})
err := lse.Replicate(context.Background(), llsn, [][]byte{nil})
if err != nil {
break
}
Expand Down Expand Up @@ -3091,7 +3087,6 @@ func TestExecutorSyncInit(t *testing.T) {
rpt.UncommittedLLSNLength == 0 &&
rpt.HighWatermark == types.GLSN(dstLastLSN) &&
rpt.Version == types.Version(1)

}, time.Second, 10*time.Millisecond)
wg.Wait()

Expand Down Expand Up @@ -3500,7 +3495,6 @@ func TestExecutorSyncReplicate(t *testing.T) {
rpt.UncommittedLLSNLength == 0 &&
rpt.HighWatermark == types.GLSN(numLogs) &&
rpt.Version == types.Version(1)

}, time.Second, 10*time.Millisecond)
wg.Wait()

Expand Down
10 changes: 7 additions & 3 deletions internal/storagenode/logstream/replicate_client.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@ type replicateClient struct {

rpcClient snpb.ReplicatorClient
streamClient snpb.Replicator_ReplicateClient
//req *snpb.ReplicateRequest
// req *snpb.ReplicateRequest
}

// newReplicateClient creates a new client to replicate logs to backup replica.
Expand All @@ -52,7 +52,7 @@ func newReplicateClient(ctx context.Context, cfg replicateClientConfig) (*replic
replicateClientConfig: cfg,
queue: make(chan *replicateTask, cfg.queueCapacity),
runner: runner.New("replicate client", cfg.logger),
//rpcConn: rpcConn,
// rpcConn: rpcConn,
rpcClient: rpcClient,
streamClient: streamClient,
// NOTE: To reuse the request struct, we need to initialize the field LLSN.
Expand Down Expand Up @@ -135,11 +135,15 @@ func (rc *replicateClient) sendLoop(ctx context.Context) {
func (rc *replicateClient) sendLoopInternal(_ context.Context, rt *replicateTask, req *snpb.ReplicateRequest) error {
// Remove maxAppendSubBatchSize, since rt already has batched data.
startTime := time.Now()
// TODO(jun): Since (snpb.ReplicateRequest).LLSN is deprecated, it will disappear soon.
// NOTE: We need to copy the LLSN array, since the array is reused.
req.LLSN = req.LLSN[0:len(rt.llsnList)]
copy(req.LLSN, rt.llsnList)
//req.LLSN = rt.llsnList
// req.LLSN = rt.llsnList
req.Data = rt.dataList
if len(rt.llsnList) > 0 {
req.BeginLLSN = rt.llsnList[0]
}
rt.release()
err := rc.streamClient.Send(req)
inflight := rc.inflight.Add(-1)
Expand Down
2 changes: 1 addition & 1 deletion internal/storagenode/replication_server.go
Original file line number Diff line number Diff line change
Expand Up @@ -173,7 +173,7 @@ func (rs *replicationServer) replicate(ctx context.Context, requestC <-chan *rep

lse.Metrics().ReplicateServerOperations.Add(1)

err = lse.Replicate(ctx, rst.req.LLSN, rst.req.Data)
err = lse.Replicate(ctx, rst.req.BeginLLSN, rst.req.Data)
if err != nil {
rst.release()
return
Expand Down
55 changes: 41 additions & 14 deletions pkg/rpc/codec.go
Original file line number Diff line number Diff line change
@@ -1,35 +1,62 @@
package rpc

import (
gogoproto "github.com/gogo/protobuf/proto"
"github.com/golang/protobuf/proto" //nolint:staticcheck
"google.golang.org/grpc/encoding"
"google.golang.org/grpc/mem"
)

const name = "proto"

type codec struct{}
type gogoprotoMessage interface {
MarshalToSizedBuffer([]byte) (int, error)
Unmarshal([]byte) error
ProtoSize() int
}

var pool = mem.DefaultBufferPool()

type codec struct {
fallback encoding.CodecV2
}

var _ encoding.Codec = codec{}
var _ encoding.CodecV2 = &codec{}

func init() {
encoding.RegisterCodec(codec{})
encoding.RegisterCodecV2(&codec{
fallback: encoding.GetCodecV2(name),
})
}

func (codec) Marshal(v interface{}) ([]byte, error) {
if m, ok := v.(gogoproto.Marshaler); ok {
return m.Marshal()
func (c *codec) Marshal(v any) (mem.BufferSlice, error) {
if m, ok := v.(gogoprotoMessage); ok {
size := m.ProtoSize()
if mem.IsBelowBufferPoolingThreshold(size) {
buf := make([]byte, size)
if _, err := m.MarshalToSizedBuffer(buf[:size]); err != nil {
return nil, err
}
return mem.BufferSlice{mem.SliceBuffer(buf)}, nil
}

buf := pool.Get(size)
if _, err := m.MarshalToSizedBuffer((*buf)[:size]); err != nil {
pool.Put(buf)
return nil, err
}
return mem.BufferSlice{mem.NewBuffer(buf, pool)}, nil
}
return proto.Marshal(v.(proto.Message))
return c.fallback.Marshal(v)
}

func (codec) Unmarshal(data []byte, v interface{}) error {
if m, ok := v.(gogoproto.Unmarshaler); ok {
return m.Unmarshal(data)
func (c *codec) Unmarshal(data mem.BufferSlice, v any) error {
if m, ok := v.(gogoprotoMessage); ok {
buf := data.MaterializeToBuffer(pool)
defer buf.Free()
return m.Unmarshal(buf.ReadOnlyData())
}
return proto.Unmarshal(data, v.(proto.Message))
return c.fallback.Unmarshal(data, v)
}

func (codec) Name() string {
func (*codec) Name() string {
return name
}
82 changes: 82 additions & 0 deletions pkg/util/netutil/netutil_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,15 +2,97 @@ package netutil

import (
"context"
"net"
"strconv"
"sync"
"testing"
"time"

"github.com/stretchr/testify/require"
"go.uber.org/goleak"

"github.com/kakao/varlog/pkg/types"
"github.com/kakao/varlog/pkg/verrors"
)

func TestStoppableListener(t *testing.T) {
tcs := []struct {
name string
addr string
wantErr bool
}{
{
name: "ValidAddress",
addr: "127.0.0.1:0",
wantErr: false,
},
{
name: "InvalidAddress",
addr: "127.0.0.1:-1",
wantErr: true,
},
}

for _, tc := range tcs {
t.Run(tc.name, func(t *testing.T) {
lis, err := NewStoppableListener(context.Background(), tc.addr)
if tc.wantErr {
require.Error(t, err)
return
}
require.NoError(t, err)
require.NotNil(t, lis)

err = lis.Close()
require.NoError(t, err)
})
}
}

func TestStoppableListener_AcceptStopped(t *testing.T) {
const expireDuration = 10 * time.Millisecond

ctx, cancel := context.WithTimeout(context.Background(), expireDuration)
defer cancel()

lis, err := NewStoppableListener(ctx, "127.0.0.1:0")
require.NoError(t, err)
t.Cleanup(func() {
err := lis.Close()
require.NoError(t, err)
})

_, err = lis.Accept()
require.Equal(t, verrors.ErrStopped, err)
}

func TestStoppableListener_AcceptSucceed(t *testing.T) {
lis, err := NewStoppableListener(context.Background(), "127.0.0.1:0")
require.NoError(t, err)
t.Cleanup(func() {
err := lis.Close()
require.NoError(t, err)
})

addr := lis.Addr().String()
var wg sync.WaitGroup
wg.Add(1)
go func() {
defer wg.Done()
conn, err := net.Dial("tcp", addr)
require.NoError(t, err)
err = conn.Close()
require.NoError(t, err)
}()

conn, err := lis.Accept()
require.NoError(t, err)
require.NotNil(t, conn)

err = conn.Close()
require.NoError(t, err)
}

func TestGetListenerAddr(t *testing.T) {
tests := []struct {
in string
Expand Down
Loading
Loading