diff --git a/internal/storagenode/client/log_client.go b/internal/storagenode/client/log_client.go index 730b7d5ec..8c2a3123a 100644 --- a/internal/storagenode/client/log_client.go +++ b/internal/storagenode/client/log_client.go @@ -18,7 +18,7 @@ type SubscribeResult struct { } var InvalidSubscribeResult = SubscribeResult{ - LogEntry: varlogpb.InvalidLogEntry(), + LogEntry: varlogpb.LogEntry{}, Error: errors.New("invalid subscribe result"), } diff --git a/internal/storagenode/logstream/executor.go b/internal/storagenode/logstream/executor.go index 4c57b1023..4174bbe81 100644 --- a/internal/storagenode/logstream/executor.go +++ b/internal/storagenode/logstream/executor.go @@ -160,7 +160,7 @@ func NewExecutor(opts ...ExecutorOption) (lse *Executor, err error) { return lse, err } -func (lse *Executor) Replicate(ctx context.Context, llsnList []types.LLSN, dataList [][]byte) error { +func (lse *Executor) Replicate(ctx context.Context, beginLLSN types.LLSN, dataList [][]byte) error { lse.inflight.Add(1) defer lse.inflight.Add(-1) @@ -178,7 +178,7 @@ func (lse *Executor) Replicate(ctx context.Context, llsnList []types.LLSN, dataL var preparationDuration time.Duration startTime := time.Now() dataBytes := int64(0) - batchSize := len(llsnList) + batchSize := len(dataList) defer func() { if lse.lsm == nil { return @@ -190,11 +190,11 @@ func (lse *Executor) Replicate(ctx context.Context, llsnList []types.LLSN, dataL lse.lsm.ReplicatePreparationMicro.Add(preparationDuration.Microseconds()) }() - oldLLSN, newLLSN := llsnList[0], llsnList[batchSize-1]+1 + oldLLSN, newLLSN := beginLLSN, beginLLSN+types.LLSN(batchSize) wb := lse.stg.NewWriteBatch() cwts := newListQueue() - for i := 0; i < len(llsnList); i++ { - _ = wb.Set(llsnList[i], dataList[i]) + for i := 0; i < batchSize; i++ { + _ = wb.Set(beginLLSN+types.LLSN(i), dataList[i]) dataBytes += int64(len(dataList[i])) cwts.PushFront(newCommitWaitTask(nil)) } @@ -329,7 +329,7 @@ func (lse *Executor) Unseal(_ context.Context, replicas []varlogpb.LogStreamRepl replica: replicas[i], rpcConn: rpcConn, queueCapacity: lse.replicateClientQueueCapacity, - //grpcDialOptions: lse.replicateClientGRPCOptions, + // grpcDialOptions: lse.replicateClientGRPCOptions, lse: lse, logger: lse.logger.Named("replicate client"), }) diff --git a/internal/storagenode/logstream/executor_test.go b/internal/storagenode/logstream/executor_test.go index 3384ce885..6eb3e6570 100644 --- a/internal/storagenode/logstream/executor_test.go +++ b/internal/storagenode/logstream/executor_test.go @@ -84,7 +84,7 @@ func TestExecutor_Closed(t *testing.T) { _, err := lse.Append(context.Background(), TestNewBatchData(t, 1, 0)) assert.ErrorIs(t, err, verrors.ErrClosed) - err = lse.Replicate(context.Background(), []types.LLSN{1}, TestNewBatchData(t, 1, 0)) + err = lse.Replicate(context.Background(), types.LLSN(1), TestNewBatchData(t, 1, 0)) assert.ErrorIs(t, err, verrors.ErrClosed) _, _, err = lse.Seal(context.Background(), types.MinGLSN) @@ -182,7 +182,7 @@ func TestExecutor_Sealing(t *testing.T) { assert.Equal(t, varlogpb.LogStreamStatusSealing, st) assert.Equal(t, executorStateSealing, lse.esm.load()) - err = lse.Replicate(context.Background(), []types.LLSN{1}, TestNewBatchData(t, 1, 0)) + err = lse.Replicate(context.Background(), types.LLSN(1), TestNewBatchData(t, 1, 0)) assert.ErrorIs(t, err, verrors.ErrSealed) }, }, @@ -286,7 +286,7 @@ func TestExecutor_Sealed(t *testing.T) { _, err = lse.Append(context.Background(), TestNewBatchData(t, 1, 0)) assert.ErrorIs(t, err, verrors.ErrSealed) - err = lse.Replicate(context.Background(), []types.LLSN{1}, TestNewBatchData(t, 1, 0)) + err = lse.Replicate(context.Background(), types.LLSN(1), TestNewBatchData(t, 1, 0)) assert.ErrorIs(t, err, verrors.ErrSealed) } @@ -616,22 +616,18 @@ func TestExecutor_Replicate(t *testing.T) { // primary if tc.isErr { - err := lse.Replicate(context.Background(), []types.LLSN{1}, [][]byte{nil}) + err := lse.Replicate(context.Background(), types.LLSN(1), [][]byte{nil}) assert.Error(t, err) return } // backup - var llsn types.LLSN + llsn := types.MinLLSN for _, batchLen := range batchlet.LengthClasses { dataList := TestNewBatchData(t, batchLen, 0) - llsnList := make([]types.LLSN, batchLen) - for i := 0; i < batchLen; i++ { - llsn++ - llsnList[i] = llsn - } - err := lse.Replicate(context.Background(), llsnList, dataList) + err := lse.Replicate(context.Background(), llsn, dataList) assert.NoError(t, err) + llsn += types.LLSN(len(dataList)) } // Commit @@ -875,7 +871,7 @@ func TestExecutor_ReplicateSeal(t *testing.T) { go func() { defer wg.Done() for llsn := lastLLSN + 1; llsn < types.MaxLLSN; llsn++ { - err := lse.Replicate(context.Background(), []types.LLSN{llsn}, [][]byte{nil}) + err := lse.Replicate(context.Background(), llsn, [][]byte{nil}) if err != nil { break } @@ -935,7 +931,7 @@ func TestExecutor_ReplicateSeal(t *testing.T) { go func() { defer wg.Done() for llsn := lastLLSN + 1; llsn < types.MaxLLSN; llsn++ { - err := lse.Replicate(context.Background(), []types.LLSN{llsn}, [][]byte{nil}) + err := lse.Replicate(context.Background(), llsn, [][]byte{nil}) if err != nil { break } @@ -3091,7 +3087,6 @@ func TestExecutorSyncInit(t *testing.T) { rpt.UncommittedLLSNLength == 0 && rpt.HighWatermark == types.GLSN(dstLastLSN) && rpt.Version == types.Version(1) - }, time.Second, 10*time.Millisecond) wg.Wait() @@ -3500,7 +3495,6 @@ func TestExecutorSyncReplicate(t *testing.T) { rpt.UncommittedLLSNLength == 0 && rpt.HighWatermark == types.GLSN(numLogs) && rpt.Version == types.Version(1) - }, time.Second, 10*time.Millisecond) wg.Wait() diff --git a/internal/storagenode/logstream/replicate_client.go b/internal/storagenode/logstream/replicate_client.go index c125b81bb..5209a0d58 100644 --- a/internal/storagenode/logstream/replicate_client.go +++ b/internal/storagenode/logstream/replicate_client.go @@ -26,7 +26,7 @@ type replicateClient struct { rpcClient snpb.ReplicatorClient streamClient snpb.Replicator_ReplicateClient - //req *snpb.ReplicateRequest + // req *snpb.ReplicateRequest } // newReplicateClient creates a new client to replicate logs to backup replica. @@ -52,7 +52,7 @@ func newReplicateClient(ctx context.Context, cfg replicateClientConfig) (*replic replicateClientConfig: cfg, queue: make(chan *replicateTask, cfg.queueCapacity), runner: runner.New("replicate client", cfg.logger), - //rpcConn: rpcConn, + // rpcConn: rpcConn, rpcClient: rpcClient, streamClient: streamClient, // NOTE: To reuse the request struct, we need to initialize the field LLSN. @@ -135,11 +135,15 @@ func (rc *replicateClient) sendLoop(ctx context.Context) { func (rc *replicateClient) sendLoopInternal(_ context.Context, rt *replicateTask, req *snpb.ReplicateRequest) error { // Remove maxAppendSubBatchSize, since rt already has batched data. startTime := time.Now() + // TODO(jun): Since (snpb.ReplicateRequest).LLSN is deprecated, it will disappear soon. // NOTE: We need to copy the LLSN array, since the array is reused. req.LLSN = req.LLSN[0:len(rt.llsnList)] copy(req.LLSN, rt.llsnList) - //req.LLSN = rt.llsnList + // req.LLSN = rt.llsnList req.Data = rt.dataList + if len(rt.llsnList) > 0 { + req.BeginLLSN = rt.llsnList[0] + } rt.release() err := rc.streamClient.Send(req) inflight := rc.inflight.Add(-1) diff --git a/internal/storagenode/replication_server.go b/internal/storagenode/replication_server.go index 12274f50a..07f957356 100644 --- a/internal/storagenode/replication_server.go +++ b/internal/storagenode/replication_server.go @@ -173,7 +173,7 @@ func (rs *replicationServer) replicate(ctx context.Context, requestC <-chan *rep lse.Metrics().ReplicateServerOperations.Add(1) - err = lse.Replicate(ctx, rst.req.LLSN, rst.req.Data) + err = lse.Replicate(ctx, rst.req.BeginLLSN, rst.req.Data) if err != nil { rst.release() return diff --git a/pkg/rpc/codec.go b/pkg/rpc/codec.go index cc215968e..38da7c50a 100644 --- a/pkg/rpc/codec.go +++ b/pkg/rpc/codec.go @@ -1,35 +1,62 @@ package rpc import ( - gogoproto "github.com/gogo/protobuf/proto" - "github.com/golang/protobuf/proto" //nolint:staticcheck "google.golang.org/grpc/encoding" + "google.golang.org/grpc/mem" ) const name = "proto" -type codec struct{} +type gogoprotoMessage interface { + MarshalToSizedBuffer([]byte) (int, error) + Unmarshal([]byte) error + ProtoSize() int +} + +var pool = mem.DefaultBufferPool() + +type codec struct { + fallback encoding.CodecV2 +} -var _ encoding.Codec = codec{} +var _ encoding.CodecV2 = &codec{} func init() { - encoding.RegisterCodec(codec{}) + encoding.RegisterCodecV2(&codec{ + fallback: encoding.GetCodecV2(name), + }) } -func (codec) Marshal(v interface{}) ([]byte, error) { - if m, ok := v.(gogoproto.Marshaler); ok { - return m.Marshal() +func (c *codec) Marshal(v any) (mem.BufferSlice, error) { + if m, ok := v.(gogoprotoMessage); ok { + size := m.ProtoSize() + if mem.IsBelowBufferPoolingThreshold(size) { + buf := make([]byte, size) + if _, err := m.MarshalToSizedBuffer(buf[:size]); err != nil { + return nil, err + } + return mem.BufferSlice{mem.SliceBuffer(buf)}, nil + } + + buf := pool.Get(size) + if _, err := m.MarshalToSizedBuffer((*buf)[:size]); err != nil { + pool.Put(buf) + return nil, err + } + return mem.BufferSlice{mem.NewBuffer(buf, pool)}, nil } - return proto.Marshal(v.(proto.Message)) + return c.fallback.Marshal(v) } -func (codec) Unmarshal(data []byte, v interface{}) error { - if m, ok := v.(gogoproto.Unmarshaler); ok { - return m.Unmarshal(data) +func (c *codec) Unmarshal(data mem.BufferSlice, v any) error { + if m, ok := v.(gogoprotoMessage); ok { + buf := data.MaterializeToBuffer(pool) + defer buf.Free() + return m.Unmarshal(buf.ReadOnlyData()) } - return proto.Unmarshal(data, v.(proto.Message)) + return c.fallback.Unmarshal(data, v) } -func (codec) Name() string { +func (*codec) Name() string { return name } diff --git a/pkg/util/netutil/netutil_test.go b/pkg/util/netutil/netutil_test.go index b06722f83..0695ea3c1 100644 --- a/pkg/util/netutil/netutil_test.go +++ b/pkg/util/netutil/netutil_test.go @@ -2,15 +2,97 @@ package netutil import ( "context" + "net" "strconv" + "sync" "testing" + "time" "github.com/stretchr/testify/require" "go.uber.org/goleak" "github.com/kakao/varlog/pkg/types" + "github.com/kakao/varlog/pkg/verrors" ) +func TestStoppableListener(t *testing.T) { + tcs := []struct { + name string + addr string + wantErr bool + }{ + { + name: "ValidAddress", + addr: "127.0.0.1:0", + wantErr: false, + }, + { + name: "InvalidAddress", + addr: "127.0.0.1:-1", + wantErr: true, + }, + } + + for _, tc := range tcs { + t.Run(tc.name, func(t *testing.T) { + lis, err := NewStoppableListener(context.Background(), tc.addr) + if tc.wantErr { + require.Error(t, err) + return + } + require.NoError(t, err) + require.NotNil(t, lis) + + err = lis.Close() + require.NoError(t, err) + }) + } +} + +func TestStoppableListener_AcceptStopped(t *testing.T) { + const expireDuration = 10 * time.Millisecond + + ctx, cancel := context.WithTimeout(context.Background(), expireDuration) + defer cancel() + + lis, err := NewStoppableListener(ctx, "127.0.0.1:0") + require.NoError(t, err) + t.Cleanup(func() { + err := lis.Close() + require.NoError(t, err) + }) + + _, err = lis.Accept() + require.Equal(t, verrors.ErrStopped, err) +} + +func TestStoppableListener_AcceptSucceed(t *testing.T) { + lis, err := NewStoppableListener(context.Background(), "127.0.0.1:0") + require.NoError(t, err) + t.Cleanup(func() { + err := lis.Close() + require.NoError(t, err) + }) + + addr := lis.Addr().String() + var wg sync.WaitGroup + wg.Add(1) + go func() { + defer wg.Done() + conn, err := net.Dial("tcp", addr) + require.NoError(t, err) + err = conn.Close() + require.NoError(t, err) + }() + + conn, err := lis.Accept() + require.NoError(t, err) + require.NotNil(t, conn) + + err = conn.Close() + require.NoError(t, err) +} + func TestGetListenerAddr(t *testing.T) { tests := []struct { in string diff --git a/pkg/util/testutil/testutil.go b/pkg/util/testutil/testutil.go index dbd3521f2..d245d513e 100644 --- a/pkg/util/testutil/testutil.go +++ b/pkg/util/testutil/testutil.go @@ -6,8 +6,6 @@ import ( "strings" "time" - "github.com/pkg/errors" - "github.com/kakao/varlog/internal/vtesting" ) @@ -36,62 +34,6 @@ func CompareWaitN(factor int64, cmp func() bool) bool { return CompareWait(cmp, vtesting.TimeoutUnitTimesFactor(factor)) } -func CompareWait100(cmp func() bool) bool { - return CompareWaitN(100, cmp) -} - -func CompareWait10(cmp func() bool) bool { - return CompareWaitN(10, cmp) -} - -func CompareWait1(cmp func() bool) bool { - return CompareWaitN(1, cmp) -} - -func CompareWaitErrorWithRetryInterval(cmp func() (bool, error), timeout time.Duration, retryInterval time.Duration) error { - after := time.NewTimer(timeout) - defer after.Stop() - - numTries := 0 - for { - select { - case <-after.C: - return errors.Errorf("compare wait timeout (%s,tries=%d)", timeout.String(), numTries) - default: - numTries++ - ok, err := cmp() - if err != nil { - return err - } - - if ok { - return nil - } - time.Sleep(retryInterval) - } - } -} - -func CompareWaitError(cmp func() (bool, error), timeout time.Duration) error { - return CompareWaitErrorWithRetryInterval(cmp, timeout, time.Millisecond) -} - -func CompareWaitErrorWithRetryIntervalN(factor int64, retryInterval time.Duration, cmp func() (bool, error)) error { - if factor < 1 { - factor = 1 - } - - return CompareWaitErrorWithRetryInterval(cmp, vtesting.TimeoutUnitTimesFactor(factor), retryInterval) -} - -func CompareWaitErrorN(factor int64, cmp func() (bool, error)) error { - if factor < 1 { - factor = 1 - } - - return CompareWaitError(cmp, vtesting.TimeoutUnitTimesFactor(factor)) -} - func GetFunctionName(i interface{}) string { a := runtime.FuncForPC(reflect.ValueOf(i).Pointer()).Name() s := strings.Split(a, "/") diff --git a/pkg/util/testutil/testutil_test.go b/pkg/util/testutil/testutil_test.go new file mode 100644 index 000000000..392389bd5 --- /dev/null +++ b/pkg/util/testutil/testutil_test.go @@ -0,0 +1,62 @@ +package testutil_test + +import ( + "testing" + "time" + + "github.com/stretchr/testify/require" + + "github.com/kakao/varlog/pkg/util/testutil" +) + +func TestCompareWait(t *testing.T) { + tcs := []struct { + cmp func() bool + want bool + }{ + {cmp: func() bool { return true }, want: true}, + {cmp: func() bool { return false }, want: false}, + } + + for _, tc := range tcs { + got := testutil.CompareWait(tc.cmp, time.Second) + require.Equal(t, tc.want, got) + } +} + +func TestCompareWaitN_Factor0(t *testing.T) { + ts := time.Now() + testutil.CompareWaitN(0, func() bool { + return false + }) + factor0 := time.Since(ts) + + ts = time.Now() + testutil.CompareWaitN(1, func() bool { + return false + }) + factor1 := time.Since(ts) + + require.InEpsilon(t, 1.0, factor1/factor0, float64((10 * time.Millisecond).Nanoseconds())) +} + +func TestCompareWaitN_Factor2(t *testing.T) { + ts := time.Now() + testutil.CompareWaitN(1, func() bool { + return false + }) + factor1 := time.Since(ts) + + ts = time.Now() + testutil.CompareWaitN(2, func() bool { + return false + }) + factor2 := time.Since(ts) + + require.InEpsilon(t, 2.0, factor2/factor1, float64((10 * time.Millisecond).Nanoseconds())) +} + +func TestGetFunctionName(t *testing.T) { + got := testutil.GetFunctionName(TestGetFunctionName) + require.Equal(t, "testutil_test.TestGetFunctionName", got) +} diff --git a/pkg/varlog/subscribe.go b/pkg/varlog/subscribe.go index 533981018..481ef784e 100644 --- a/pkg/varlog/subscribe.go +++ b/pkg/varlog/subscribe.go @@ -518,7 +518,7 @@ func (p *dispatcher) dispatch(_ context.Context) { sentErr = sentErr || res.Error != nil } if !sentErr { - p.onNextFunc(varlogpb.InvalidLogEntry(), io.EOF) + p.onNextFunc(varlogpb.LogEntry{}, io.EOF) } } @@ -532,7 +532,7 @@ type invalidSubscriber struct { } func (s invalidSubscriber) Next() (varlogpb.LogEntry, error) { - return varlogpb.InvalidLogEntry(), s.err + return varlogpb.LogEntry{}, s.err } func (s invalidSubscriber) Close() error { diff --git a/pkg/varlogtest/admin.go b/pkg/varlogtest/admin.go index 04d55d5ff..33bdb7883 100644 --- a/pkg/varlogtest/admin.go +++ b/pkg/varlogtest/admin.go @@ -82,6 +82,7 @@ func (c *testAdmin) ListStorageNodes(ctx context.Context, opts ...varlog.AdminCa return ret, nil } + func (c *testAdmin) GetStorageNodes(ctx context.Context, opts ...varlog.AdminCallOption) (map[types.StorageNodeID]admpb.StorageNodeMetadata, error) { snms, err := c.ListStorageNodes(ctx) if err != nil { @@ -182,8 +183,7 @@ func (c *testAdmin) AddTopic(ctx context.Context, opts ...varlog.AdminCallOption c.vt.topics[topicID] = topicDesc c.vt.trimGLSNs[topicID] = types.InvalidGLSN - invalidLogEntry := varlogpb.InvalidLogEntry() - c.vt.globalLogEntries[topicID] = []*varlogpb.LogEntry{&invalidLogEntry} + c.vt.globalLogEntries[topicID] = []*varlogpb.LogEntry{{}} return proto.Clone(&topicDesc).(*varlogpb.TopicDescriptor), nil } @@ -294,8 +294,7 @@ func (c *testAdmin) AddLogStream(_ context.Context, topicID types.TopicID, logSt c.vt.logStreams[logStreamID] = lsd - invalidLogEntry := varlogpb.InvalidLogEntry() - c.vt.localLogEntries[logStreamID] = []*varlogpb.LogEntry{&invalidLogEntry} + c.vt.localLogEntries[logStreamID] = []*varlogpb.LogEntry{{}} topicDesc.LogStreams = append(topicDesc.LogStreams, logStreamID) c.vt.topics[topicID] = topicDesc diff --git a/pkg/varlogtest/log.go b/pkg/varlogtest/log.go index f7cc50b0f..2e6810c2f 100644 --- a/pkg/varlogtest/log.go +++ b/pkg/varlogtest/log.go @@ -215,7 +215,7 @@ func (c *testLog) Subscribe(ctx context.Context, topicID types.TopicID, begin ty for _, logEntry := range copiedLogEntries { onNextFunc(logEntry, nil) } - onNextFunc(varlogpb.InvalidLogEntry(), io.EOF) + onNextFunc(varlogpb.LogEntry{}, io.EOF) }() return func() { @@ -319,7 +319,6 @@ func (c *testLog) PeekLogStream(ctx context.Context, tpid types.TopicID, lsid ty GLSN: tail.GLSN, } return first, last, nil - } func (c *testLog) AppendableLogStreams(tpid types.TopicID) map[types.LogStreamID]struct{} { @@ -463,7 +462,7 @@ func newErrSubscriber(err error) *errSubscriber { } func (s errSubscriber) Next() (varlogpb.LogEntry, error) { - return varlogpb.InvalidLogEntry(), s.err + return varlogpb.LogEntry{}, s.err } func (s errSubscriber) Close() error { @@ -496,7 +495,7 @@ func (s *subscriberImpl) Next() (varlogpb.LogEntry, error) { logEntry, err := s.next() if err != nil { s.setErr(err) - return varlogpb.InvalidLogEntry(), err + return varlogpb.LogEntry{}, err } if s.cursor == s.end { s.setErr(io.EOF) diff --git a/proto/snpb/metadata.go b/proto/snpb/metadata.go index 3e89df940..cd8710708 100644 --- a/proto/snpb/metadata.go +++ b/proto/snpb/metadata.go @@ -5,6 +5,9 @@ import ( "github.com/kakao/varlog/proto/varlogpb" ) +// ToStorageNodeDescriptor converts a StorageNodeMetadataDescriptor to a +// varlogpb.StorageNodeDescriptor. It returns nil if the +// StorageNodeMetadataDescriptor is nil. func (snmd *StorageNodeMetadataDescriptor) ToStorageNodeDescriptor() *varlogpb.StorageNodeDescriptor { if snmd == nil { return nil @@ -19,6 +22,9 @@ func (snmd *StorageNodeMetadataDescriptor) ToStorageNodeDescriptor() *varlogpb.S return snd } +// GetLogStream retrieves a LogStreamReplicaMetadataDescriptor by its +// LogStreamID. It returns the LogStreamReplicaMetadataDescriptor and true if +// found, otherwise an empty descriptor and false. func (snmd *StorageNodeMetadataDescriptor) GetLogStream(logStreamID types.LogStreamID) (LogStreamReplicaMetadataDescriptor, bool) { logStreams := snmd.GetLogStreamReplicas() for i := range logStreams { @@ -29,6 +35,10 @@ func (snmd *StorageNodeMetadataDescriptor) GetLogStream(logStreamID types.LogStr return LogStreamReplicaMetadataDescriptor{}, false } +// Head returns the varlogpb.LogEntryMeta corresponding to the local low +// watermark of the LogStreamReplicaMetadataDescriptor. The "head" represents +// the earliest log entry in the log stream replica. It returns an empty +// varlogpb.LogEntryMeta if the LogStreamReplicaMetadataDescriptor is nil. func (lsrmd *LogStreamReplicaMetadataDescriptor) Head() varlogpb.LogEntryMeta { if lsrmd == nil { return varlogpb.LogEntryMeta{} @@ -41,6 +51,10 @@ func (lsrmd *LogStreamReplicaMetadataDescriptor) Head() varlogpb.LogEntryMeta { } } +// Tail returns the varlogpb.LogEntryMeta corresponding to the local high +// watermark of the LogStreamReplicaMetadataDescriptor. The "tail" represents +// the latest log entry in the log stream replica. It returns an empty +// varlogpb.LogEntryMeta if the LogStreamReplicaMetadataDescriptor is nil. func (lsrmd *LogStreamReplicaMetadataDescriptor) Tail() varlogpb.LogEntryMeta { if lsrmd == nil { return varlogpb.LogEntryMeta{} diff --git a/proto/snpb/metadata_test.go b/proto/snpb/metadata_test.go new file mode 100644 index 000000000..293af33d6 --- /dev/null +++ b/proto/snpb/metadata_test.go @@ -0,0 +1,191 @@ +package snpb_test + +import ( + "testing" + + "github.com/stretchr/testify/require" + + "github.com/kakao/varlog/pkg/types" + "github.com/kakao/varlog/proto/snpb" + "github.com/kakao/varlog/proto/varlogpb" +) + +func TestStorageNodeMetadataDescriptor_ToStorageNodeDescriptor(t *testing.T) { + tcs := []struct { + snmd *snpb.StorageNodeMetadataDescriptor + want *varlogpb.StorageNodeDescriptor + name string + }{ + { + name: "Nil", + snmd: nil, + want: nil, + }, + { + name: "NonNil", + snmd: &snpb.StorageNodeMetadataDescriptor{ + StorageNode: varlogpb.StorageNode{ + StorageNodeID: types.MinStorageNodeID, + Address: "node1", + }, + Storages: []varlogpb.StorageDescriptor{ + {Path: "/path1"}, + {Path: "/path2"}, + }, + }, + want: &varlogpb.StorageNodeDescriptor{ + StorageNode: varlogpb.StorageNode{ + StorageNodeID: types.MinStorageNodeID, + Address: "node1", + }, + Paths: []string{"/path1", "/path2"}, + }, + }, + } + + for _, tc := range tcs { + t.Run(tc.name, func(t *testing.T) { + got := tc.snmd.ToStorageNodeDescriptor() + require.Equal(t, tc.want, got) + }) + } +} + +func TestStorageNodeMetadataDescriptor_GetLogStream(t *testing.T) { + tcs := []struct { + name string + logStreamID types.LogStreamID + want snpb.LogStreamReplicaMetadataDescriptor + wantFound bool + }{ + { + name: "Found", + logStreamID: types.LogStreamID(1), + want: snpb.LogStreamReplicaMetadataDescriptor{ + LogStreamReplica: varlogpb.LogStreamReplica{ + TopicLogStream: varlogpb.TopicLogStream{ + LogStreamID: types.LogStreamID(1), + }, + }, + }, + wantFound: true, + }, + { + name: "NotFound", + logStreamID: types.LogStreamID(3), + want: snpb.LogStreamReplicaMetadataDescriptor{}, + wantFound: false, + }, + } + + for _, tc := range tcs { + t.Run(tc.name, func(t *testing.T) { + snmd := &snpb.StorageNodeMetadataDescriptor{ + LogStreamReplicas: []snpb.LogStreamReplicaMetadataDescriptor{ + { + LogStreamReplica: varlogpb.LogStreamReplica{ + TopicLogStream: varlogpb.TopicLogStream{ + LogStreamID: types.LogStreamID(1), + }, + }, + }, + { + LogStreamReplica: varlogpb.LogStreamReplica{ + TopicLogStream: varlogpb.TopicLogStream{ + LogStreamID: types.LogStreamID(2), + }, + }, + }, + }, + } + + got, found := snmd.GetLogStream(tc.logStreamID) + require.Equal(t, tc.wantFound, found) + require.Equal(t, tc.want, got) + }) + } +} + +func TestLogStreamReplicaMetadataDescriptor_Head(t *testing.T) { + tcs := []struct { + lsrmd *snpb.LogStreamReplicaMetadataDescriptor + name string + want varlogpb.LogEntryMeta + }{ + { + name: "Nil", + lsrmd: nil, + want: varlogpb.LogEntryMeta{}, + }, + { + name: "NonNil", + lsrmd: &snpb.LogStreamReplicaMetadataDescriptor{ + LogStreamReplica: varlogpb.LogStreamReplica{ + TopicLogStream: varlogpb.TopicLogStream{ + TopicID: types.TopicID(1), + LogStreamID: types.LogStreamID(2), + }, + }, + LocalLowWatermark: varlogpb.LogSequenceNumber{ + LLSN: types.LLSN(3), + GLSN: types.GLSN(4), + }, + }, + want: varlogpb.LogEntryMeta{ + TopicID: 1, + LogStreamID: 2, + LLSN: 3, + GLSN: 4, + }, + }, + } + + for _, tc := range tcs { + t.Run(tc.name, func(t *testing.T) { + got := tc.lsrmd.Head() + require.Equal(t, tc.want, got) + }) + } +} + +func TestLogStreamReplicaMetadataDescriptor_Tail(t *testing.T) { + tcs := []struct { + lsrmd *snpb.LogStreamReplicaMetadataDescriptor + name string + want varlogpb.LogEntryMeta + }{ + { + name: "Nil", + lsrmd: nil, + want: varlogpb.LogEntryMeta{}, + }, + { + name: "NonNil", + lsrmd: &snpb.LogStreamReplicaMetadataDescriptor{ + LogStreamReplica: varlogpb.LogStreamReplica{ + TopicLogStream: varlogpb.TopicLogStream{ + TopicID: types.TopicID(1), + LogStreamID: types.LogStreamID(2), + }, + }, + LocalHighWatermark: varlogpb.LogSequenceNumber{ + LLSN: types.LLSN(5), + GLSN: types.GLSN(6), + }, + }, + want: varlogpb.LogEntryMeta{ + TopicID: 1, + LogStreamID: 2, + LLSN: 5, + GLSN: 6, + }, + }, + } + + for _, tc := range tcs { + t.Run(tc.name, func(t *testing.T) { + got := tc.lsrmd.Tail() + require.Equal(t, tc.want, got) + }) + } +} diff --git a/proto/snpb/replicator.go b/proto/snpb/replicator.go index 6d9800cf8..f146355f0 100644 --- a/proto/snpb/replicator.go +++ b/proto/snpb/replicator.go @@ -6,26 +6,42 @@ import ( "github.com/kakao/varlog/pkg/types" ) +// InvalidSyncPosition returns a SyncPosition with both LLSN and GLSN set to +// types.InvalidGLSN. func InvalidSyncPosition() SyncPosition { return SyncPosition{LLSN: types.InvalidLLSN, GLSN: types.InvalidGLSN} } +// Invalid checks if the SyncPosition is invalid. A SyncPosition is considered +// invalid if either LLSN or GLSN is invalid. func (sp SyncPosition) Invalid() bool { return sp.LLSN.Invalid() || sp.GLSN.Invalid() } +// LessThan checks if the current SyncPosition "sp" is less than another +// SyncPosition "other". It returns true if both LLSN and GLSN of the current +// SyncPosition are less than those of the other SyncPosition. func (sp SyncPosition) LessThan(other SyncPosition) bool { return sp.LLSN < other.LLSN && sp.GLSN < other.GLSN } +// InvalidSyncRange returns a SyncRange with both FirstLLSN and LastLLSN set to +// types.InvalidLLSN. func InvalidSyncRange() SyncRange { return SyncRange{FirstLLSN: types.InvalidLLSN, LastLLSN: types.InvalidLLSN} } +// Invalid determines if the SyncRange is invalid. A SyncRange is considered +// invalid if either FirstLLSN or LastLLSN is invalid, or if FirstLLSN is +// greater than LastLLSN. func (sr SyncRange) Invalid() bool { return sr.FirstLLSN.Invalid() || sr.LastLLSN.Invalid() || sr.FirstLLSN > sr.LastLLSN } +// Validate checks the validity of the SyncRange. It returns an error if +// FirstLLSN is greater than LastLLSN, or if FirstLLSN is invalid while +// LastLLSN is valid. If both FirstLLSN and LastLLSN are invalid, it returns +// nil, indicating that the entire log range is considered trimmed. func (sr SyncRange) Validate() error { if sr.FirstLLSN > sr.LastLLSN { return fmt.Errorf("invalid sync range: first %d, last %d", sr.FirstLLSN, sr.LastLLSN) diff --git a/proto/snpb/replicator.pb.go b/proto/snpb/replicator.pb.go index 139a31eb6..709bfb0f7 100644 --- a/proto/snpb/replicator.pb.go +++ b/proto/snpb/replicator.pb.go @@ -70,8 +70,12 @@ func (SyncState) EnumDescriptor() ([]byte, []int) { type ReplicateRequest struct { TopicID github_com_kakao_varlog_pkg_types.TopicID `protobuf:"varint,1,opt,name=topic_id,json=topicId,proto3,casttype=github.com/kakao/varlog/pkg/types.TopicID" json:"topic_id,omitempty"` LogStreamID github_com_kakao_varlog_pkg_types.LogStreamID `protobuf:"varint,2,opt,name=log_stream_id,json=logStreamId,proto3,casttype=github.com/kakao/varlog/pkg/types.LogStreamID" json:"log_stream_id,omitempty"` - LLSN []github_com_kakao_varlog_pkg_types.LLSN `protobuf:"varint,3,rep,packed,name=llsn,proto3,casttype=github.com/kakao/varlog/pkg/types.LLSN" json:"llsn,omitempty"` - Data [][]byte `protobuf:"bytes,4,rep,name=data,proto3" json:"data,omitempty"` + // LLSN is a list of local log sequence numbers where the log entries are + // replicated. The primary replica fills this field for backward + // compatibility, but it will disappear soon. + LLSN []github_com_kakao_varlog_pkg_types.LLSN `protobuf:"varint,3,rep,packed,name=llsn,proto3,casttype=github.com/kakao/varlog/pkg/types.LLSN" json:"llsn,omitempty"` // Deprecated: Do not use. + Data [][]byte `protobuf:"bytes,4,rep,name=data,proto3" json:"data,omitempty"` + BeginLLSN github_com_kakao_varlog_pkg_types.LLSN `protobuf:"varint,5,opt,name=begin_llsn,json=beginLlsn,proto3,casttype=github.com/kakao/varlog/pkg/types.LLSN" json:"begin_llsn,omitempty"` } func (m *ReplicateRequest) Reset() { *m = ReplicateRequest{} } @@ -121,6 +125,7 @@ func (m *ReplicateRequest) GetLogStreamID() github_com_kakao_varlog_pkg_types.Lo return 0 } +// Deprecated: Do not use. func (m *ReplicateRequest) GetLLSN() []github_com_kakao_varlog_pkg_types.LLSN { if m != nil { return m.LLSN @@ -135,6 +140,13 @@ func (m *ReplicateRequest) GetData() [][]byte { return nil } +func (m *ReplicateRequest) GetBeginLLSN() github_com_kakao_varlog_pkg_types.LLSN { + if m != nil { + return m.BeginLLSN + } + return 0 +} + type ReplicateResponse struct { } @@ -664,71 +676,73 @@ func init() { func init() { proto.RegisterFile("proto/snpb/replicator.proto", fileDescriptor_85705cb817486b63) } var fileDescriptor_85705cb817486b63 = []byte{ - // 1012 bytes of a gzipped FileDescriptorProto - 0x1f, 0x8b, 0x08, 0x00, 0x00, 0x00, 0x00, 0x00, 0x02, 0xff, 0xe4, 0x56, 0xcf, 0x6f, 0xe3, 0x44, - 0x14, 0x8e, 0x13, 0x67, 0x9b, 0xbc, 0xb4, 0x25, 0x9d, 0xb2, 0x34, 0x04, 0x6a, 0x67, 0x83, 0x84, - 0xc2, 0x8f, 0x8d, 0xa5, 0xae, 0x58, 0x96, 0x6a, 0xa5, 0x85, 0x96, 0xb4, 0x44, 0x0a, 0x6d, 0x35, - 0xae, 0x10, 0x82, 0x43, 0x71, 0x9d, 0x59, 0x63, 0xd5, 0xf1, 0x18, 0x7b, 0x82, 0xe8, 0x7f, 0x80, - 0x7a, 0x42, 0xdc, 0x2b, 0x2a, 0x51, 0x21, 0x8e, 0xdc, 0x58, 0xfe, 0x83, 0x1e, 0xf7, 0xc8, 0x29, - 0x12, 0xe9, 0x85, 0x3f, 0x01, 0xed, 0x09, 0xcd, 0x78, 0xec, 0xa4, 0xc9, 0x96, 0xb6, 0x82, 0x1b, - 0x37, 0x7b, 0xde, 0xf7, 0xbe, 0x79, 0xf3, 0xbe, 0xef, 0x8d, 0x0d, 0xaf, 0x04, 0x21, 0x65, 0xd4, - 0x88, 0xfc, 0x60, 0xdf, 0x08, 0x49, 0xe0, 0xb9, 0xb6, 0xc5, 0x68, 0xd8, 0x14, 0xab, 0xa8, 0xf4, - 0xb5, 0x15, 0x7a, 0xd4, 0x69, 0xf2, 0x68, 0x55, 0x77, 0x28, 0x75, 0x3c, 0x62, 0x88, 0xd0, 0x7e, - 0xff, 0xb1, 0xc1, 0xdc, 0x1e, 0x89, 0x98, 0xd5, 0x0b, 0x62, 0x74, 0xf5, 0xae, 0xe3, 0xb2, 0x2f, - 0xfb, 0xfb, 0x4d, 0x9b, 0xf6, 0x0c, 0x87, 0x3a, 0x74, 0x84, 0xe4, 0x6f, 0xf1, 0x3e, 0xfc, 0x49, - 0xc2, 0x97, 0x62, 0xf2, 0x60, 0xdf, 0xe8, 0x11, 0x66, 0x75, 0x2d, 0x66, 0xc5, 0x81, 0xfa, 0xaf, - 0x59, 0x28, 0x63, 0x59, 0x0a, 0xc1, 0xe4, 0xab, 0x3e, 0x89, 0x18, 0x32, 0xa1, 0xc0, 0x68, 0xe0, - 0xda, 0x7b, 0x6e, 0xb7, 0xa2, 0xd4, 0x94, 0x46, 0x7e, 0xed, 0xc1, 0x70, 0xa0, 0xcf, 0xec, 0xf2, - 0xb5, 0xf6, 0x87, 0xcf, 0x06, 0xfa, 0x1b, 0x63, 0xbb, 0x1f, 0x58, 0x07, 0x16, 0x35, 0x62, 0x7e, - 0x23, 0x38, 0x70, 0x0c, 0x76, 0x18, 0x90, 0xa8, 0x29, 0xc1, 0x78, 0x46, 0x30, 0xb5, 0xbb, 0xa8, - 0x0b, 0x73, 0x1e, 0x75, 0xf6, 0x22, 0x16, 0x12, 0xab, 0xc7, 0x99, 0xb3, 0x82, 0xf9, 0xfd, 0xe1, - 0x40, 0x2f, 0x75, 0xa8, 0x63, 0x8a, 0x75, 0xc1, 0x7e, 0xf7, 0x6a, 0xf6, 0xb1, 0x04, 0x5c, 0xf2, - 0xd2, 0x97, 0x2e, 0xda, 0x00, 0xd5, 0xf3, 0x22, 0xbf, 0x92, 0xab, 0xe5, 0x1a, 0xea, 0xda, 0xca, - 0x70, 0xa0, 0xab, 0x9d, 0x8e, 0xb9, 0xf5, 0x6c, 0xa0, 0xbf, 0x7e, 0x0d, 0xd6, 0x8e, 0xb9, 0x85, - 0x45, 0x3e, 0x42, 0xa0, 0xf2, 0x2e, 0x55, 0xd4, 0x5a, 0xae, 0x31, 0x8b, 0xc5, 0xf3, 0xea, 0xec, - 0x93, 0x13, 0x5d, 0xf9, 0xf3, 0x44, 0x57, 0xfe, 0x3a, 0xd1, 0x95, 0xfa, 0x22, 0x2c, 0x8c, 0x35, - 0x2e, 0x0a, 0xa8, 0x1f, 0x91, 0xfa, 0xa9, 0x02, 0xb3, 0xe6, 0xa1, 0x6f, 0xef, 0xd0, 0xc8, 0x65, - 0x2e, 0xf5, 0xd3, 0x7a, 0x78, 0x1b, 0xff, 0x4d, 0x3d, 0x1b, 0xa0, 0x3a, 0x9c, 0x27, 0x3b, 0xe2, - 0xd9, 0xbc, 0x36, 0xcf, 0xa6, 0xe0, 0xe1, 0xf9, 0xab, 0x2a, 0xaf, 0xbf, 0xfe, 0x44, 0x81, 0x22, - 0x2f, 0x13, 0x5b, 0xbe, 0x43, 0xd0, 0x27, 0x00, 0x8f, 0xdd, 0x30, 0x62, 0x7b, 0x63, 0x95, 0xbe, - 0x3b, 0x1c, 0xe8, 0xc5, 0x0d, 0xbe, 0x7a, 0xc3, 0x72, 0x8b, 0x82, 0xaa, 0xc3, 0x6b, 0x36, 0xa1, - 0xe8, 0x59, 0x09, 0x6d, 0x5c, 0xf8, 0xfd, 0xe1, 0x40, 0x2f, 0x74, 0xac, 0x1b, 0xb3, 0x16, 0x38, - 0x11, 0x27, 0xad, 0xff, 0x90, 0x83, 0x17, 0x78, 0xe9, 0x6d, 0xdf, 0x65, 0x89, 0x5f, 0x3f, 0x07, - 0xb0, 0xbd, 0x7e, 0xc4, 0x48, 0x38, 0x72, 0xec, 0x43, 0x7e, 0x80, 0xf5, 0x78, 0x55, 0xb8, 0xea, - 0xad, 0xab, 0xb7, 0x4a, 0xe1, 0xb8, 0x28, 0xf9, 0xda, 0x5d, 0xf4, 0x08, 0x6e, 0x45, 0xb4, 0x1f, - 0xda, 0x44, 0x1c, 0xa1, 0xb4, 0x72, 0xa7, 0x29, 0x07, 0x35, 0x19, 0xa9, 0x91, 0x19, 0xa5, 0x1f, - 0xd6, 0xd4, 0xb3, 0x81, 0x9e, 0xc1, 0x32, 0x0d, 0xb5, 0xa1, 0xd4, 0x25, 0x11, 0x73, 0x7d, 0x8b, - 0x3b, 0xa2, 0x92, 0xbb, 0x19, 0xcb, 0x78, 0x2e, 0x5a, 0x81, 0x7c, 0xc8, 0x25, 0xab, 0xa8, 0x82, - 0xe4, 0xa5, 0xe6, 0xd8, 0x9d, 0xd1, 0x4c, 0x05, 0x95, 0x99, 0x31, 0x14, 0x51, 0x58, 0x14, 0x2a, - 0xd8, 0xb4, 0xd7, 0x73, 0x19, 0x23, 0xdd, 0x58, 0x8f, 0xbc, 0xd0, 0xe3, 0xd1, 0x70, 0xa0, 0x2f, - 0x70, 0x3d, 0xd6, 0x93, 0xe8, 0x0d, 0x85, 0x59, 0xf0, 0x2e, 0x24, 0x73, 0x85, 0x36, 0xa0, 0x3c, - 0x12, 0x28, 0x9e, 0x8b, 0x51, 0xe1, 0xca, 0xb5, 0x0b, 0xaf, 0xff, 0xa1, 0x00, 0xf0, 0x90, 0xc9, - 0x2c, 0xd6, 0x8f, 0xd0, 0xdb, 0x90, 0x8f, 0x98, 0xc5, 0x62, 0x8a, 0xf9, 0xe7, 0x50, 0x70, 0x1c, - 0xc1, 0x31, 0x08, 0xbd, 0x03, 0x79, 0x61, 0x44, 0x29, 0xda, 0xcb, 0x53, 0xe8, 0x64, 0x42, 0x93, - 0x3d, 0x05, 0x1a, 0xdd, 0x03, 0x95, 0x1f, 0x48, 0x8a, 0x74, 0x65, 0x96, 0x00, 0xa3, 0xf7, 0x60, - 0xc6, 0xee, 0x87, 0x21, 0xf1, 0x99, 0xd4, 0xe5, 0xca, 0xbc, 0x04, 0x5f, 0xff, 0x5e, 0x81, 0x92, - 0x88, 0x5b, 0x87, 0x1e, 0xb5, 0xba, 0xa8, 0x05, 0xf3, 0xb1, 0x4e, 0x7b, 0x36, 0xf5, 0x19, 0xf9, - 0x86, 0xc9, 0x86, 0x69, 0x53, 0x76, 0x89, 0x7b, 0xbe, 0x1e, 0xa3, 0xf0, 0x9c, 0x3d, 0xfe, 0x8a, - 0xee, 0x43, 0x91, 0xdf, 0xb5, 0xc4, 0x67, 0xe1, 0xe1, 0x64, 0x07, 0xc6, 0x0d, 0xd7, 0xe2, 0x00, - 0x5c, 0xf0, 0xe4, 0xd3, 0xaa, 0x7a, 0xc6, 0x6f, 0x87, 0xdf, 0xb2, 0xf0, 0xa2, 0xd0, 0x64, 0xf2, - 0xbb, 0xf0, 0xbf, 0x99, 0xb3, 0x07, 0x30, 0x13, 0xc4, 0x8a, 0x48, 0x45, 0x2b, 0xd3, 0x8a, 0xc6, - 0xf1, 0x44, 0x50, 0x09, 0xaf, 0x7f, 0x04, 0xb7, 0x27, 0x5a, 0x27, 0x27, 0xc0, 0x80, 0x5b, 0x91, - 0x30, 0xb2, 0x54, 0x74, 0xe9, 0xb9, 0xfe, 0xed, 0x47, 0x58, 0xc2, 0xde, 0xfc, 0x49, 0xde, 0xd1, - 0xc2, 0xd6, 0x68, 0x19, 0xf2, 0x2d, 0x8c, 0xb7, 0x71, 0x39, 0x53, 0x45, 0x47, 0xc7, 0xb5, 0xf9, - 0x34, 0xd2, 0x0a, 0x43, 0x1a, 0xa2, 0x06, 0x94, 0xda, 0x5b, 0x7b, 0x3b, 0x78, 0x7b, 0x13, 0xb7, - 0x4c, 0xb3, 0xac, 0x54, 0x97, 0x8e, 0x8e, 0x6b, 0x8b, 0x29, 0xa8, 0xed, 0xef, 0x84, 0xd4, 0x09, - 0x49, 0x14, 0xa1, 0xd7, 0xa0, 0xb0, 0xbe, 0xfd, 0xf1, 0x4e, 0xa7, 0xb5, 0xdb, 0x2a, 0x67, 0xab, - 0xb7, 0x8f, 0x8e, 0x6b, 0x0b, 0x29, 0x6c, 0x9d, 0xf6, 0x02, 0x8f, 0xc4, 0xbb, 0x99, 0xbb, 0x1f, - 0xe0, 0xdd, 0x72, 0x6e, 0x62, 0x37, 0x93, 0x59, 0x21, 0xab, 0xce, 0x7e, 0xfb, 0xa3, 0x96, 0xf9, - 0xf9, 0x54, 0xcb, 0xfc, 0x72, 0xaa, 0x29, 0x2b, 0xe7, 0x59, 0x00, 0x9c, 0xfe, 0xcd, 0xa0, 0x2d, - 0x28, 0xa6, 0xa7, 0x47, 0xcb, 0x17, 0x4e, 0x39, 0x69, 0xa8, 0xaa, 0x76, 0x59, 0x58, 0x7e, 0x4e, - 0x33, 0x0d, 0x05, 0xb5, 0xa1, 0x90, 0x5c, 0x27, 0xe8, 0xd5, 0xa9, 0xa6, 0x8d, 0x7d, 0x06, 0xaa, - 0xcb, 0x97, 0x44, 0x13, 0x32, 0xf4, 0x29, 0xcc, 0x5d, 0x10, 0x07, 0xdd, 0x99, 0xbe, 0x87, 0x26, - 0x4b, 0xac, 0xff, 0x13, 0x24, 0x65, 0xfe, 0x02, 0x16, 0x2f, 0x84, 0x62, 0x87, 0xfd, 0x67, 0xfc, - 0x0d, 0x65, 0xed, 0xe1, 0xd9, 0x50, 0x53, 0x9e, 0x0e, 0x35, 0xe5, 0xbb, 0x73, 0x2d, 0x73, 0x72, - 0xae, 0x29, 0x4f, 0xcf, 0xb5, 0xcc, 0xef, 0xe7, 0x5a, 0xe6, 0xb3, 0xfa, 0xa5, 0x03, 0x97, 0xfe, - 0x6d, 0xee, 0xdf, 0x12, 0xcf, 0xf7, 0xfe, 0x0e, 0x00, 0x00, 0xff, 0xff, 0x1f, 0x25, 0x79, 0x9a, - 0x82, 0x0a, 0x00, 0x00, + // 1042 bytes of a gzipped FileDescriptorProto + 0x1f, 0x8b, 0x08, 0x00, 0x00, 0x00, 0x00, 0x00, 0x02, 0xff, 0xe4, 0x56, 0x4f, 0x6f, 0xe3, 0x44, + 0x14, 0x8f, 0x1b, 0xa7, 0x4d, 0x5e, 0xda, 0x92, 0x4e, 0x59, 0x1a, 0x02, 0xb5, 0xb3, 0x41, 0x42, + 0xe1, 0xcf, 0x26, 0x52, 0x17, 0x96, 0xa5, 0x5a, 0x69, 0x21, 0x25, 0xed, 0x46, 0x0a, 0x6d, 0x35, + 0xae, 0x10, 0x82, 0x43, 0x70, 0x92, 0x59, 0x63, 0xd5, 0xf1, 0x18, 0xcf, 0x04, 0xd1, 0x6f, 0x80, + 0x7a, 0x02, 0xee, 0x15, 0x95, 0xa8, 0x10, 0x47, 0x8e, 0xcb, 0x37, 0xe8, 0x71, 0x8f, 0x9c, 0x22, + 0x91, 0x5e, 0xf8, 0x08, 0x68, 0x4f, 0x68, 0xc6, 0x93, 0x3f, 0x4d, 0xb6, 0xb4, 0x91, 0xb8, 0xed, + 0xcd, 0x9e, 0xf7, 0x7b, 0xbf, 0xf7, 0xe7, 0xf7, 0xde, 0xd8, 0xf0, 0x5a, 0x10, 0x52, 0x4e, 0xcb, + 0xcc, 0x0f, 0x9a, 0xe5, 0x90, 0x04, 0x9e, 0xdb, 0xb2, 0x39, 0x0d, 0x4b, 0xf2, 0x14, 0xa5, 0xbf, + 0xb5, 0x43, 0x8f, 0x3a, 0x25, 0x61, 0xcd, 0x99, 0x0e, 0xa5, 0x8e, 0x47, 0xca, 0xd2, 0xd4, 0xec, + 0x3e, 0x2e, 0x73, 0xb7, 0x43, 0x18, 0xb7, 0x3b, 0x41, 0x84, 0xce, 0xdd, 0x71, 0x5c, 0xfe, 0x75, + 0xb7, 0x59, 0x6a, 0xd1, 0x4e, 0xd9, 0xa1, 0x0e, 0x1d, 0x21, 0xc5, 0x5b, 0x14, 0x47, 0x3c, 0x29, + 0xf8, 0x5a, 0x44, 0x1e, 0x34, 0xcb, 0x1d, 0xc2, 0xed, 0xb6, 0xcd, 0xed, 0xc8, 0x50, 0xf8, 0x31, + 0x0e, 0x19, 0xac, 0x52, 0x21, 0x98, 0x7c, 0xd3, 0x25, 0x8c, 0x23, 0x0b, 0x92, 0x9c, 0x06, 0x6e, + 0xab, 0xe1, 0xb6, 0xb3, 0x5a, 0x5e, 0x2b, 0x26, 0x2a, 0xf7, 0xfb, 0x3d, 0x73, 0xe1, 0x40, 0x9c, + 0xd5, 0x3e, 0x79, 0xd6, 0x33, 0xdf, 0x1a, 0x8b, 0x7e, 0x68, 0x1f, 0xda, 0xb4, 0x1c, 0xf1, 0x97, + 0x83, 0x43, 0xa7, 0xcc, 0x8f, 0x02, 0xc2, 0x4a, 0x0a, 0x8c, 0x17, 0x24, 0x53, 0xad, 0x8d, 0xda, + 0xb0, 0xe4, 0x51, 0xa7, 0xc1, 0x78, 0x48, 0xec, 0x8e, 0x60, 0x9e, 0x93, 0xcc, 0x1f, 0xf5, 0x7b, + 0x66, 0xba, 0x4e, 0x1d, 0x4b, 0x9e, 0x4b, 0xf6, 0x3b, 0xd7, 0xb3, 0x8f, 0x39, 0xe0, 0xb4, 0x37, + 0x7c, 0x69, 0xa3, 0x47, 0xa0, 0x7b, 0x1e, 0xf3, 0xb3, 0xf1, 0x7c, 0xbc, 0xa8, 0x57, 0xde, 0xeb, + 0xf7, 0x4c, 0xbd, 0x5e, 0xb7, 0x76, 0x9f, 0xf5, 0xcc, 0x37, 0x6f, 0xc0, 0x5a, 0xb7, 0x76, 0xb3, + 0x1a, 0x96, 0x0c, 0x08, 0x81, 0x2e, 0xfa, 0x94, 0xd5, 0xf3, 0xf1, 0xe2, 0x22, 0x96, 0xcf, 0xe8, + 0x33, 0x80, 0x26, 0x71, 0x5c, 0xbf, 0x21, 0x63, 0x24, 0xf2, 0x5a, 0x51, 0xaf, 0x7c, 0xd0, 0xef, + 0x99, 0xa9, 0x8a, 0x38, 0x9d, 0x2d, 0x10, 0x4e, 0x49, 0xaa, 0xba, 0xc7, 0xfc, 0xcd, 0xc5, 0x27, + 0xa7, 0xa6, 0xf6, 0xf7, 0xa9, 0xa9, 0xfd, 0x73, 0x6a, 0x6a, 0x85, 0x55, 0x58, 0x19, 0x93, 0x84, + 0x05, 0xd4, 0x67, 0xa4, 0x70, 0xa6, 0xc1, 0xa2, 0x75, 0xe4, 0xb7, 0xf6, 0x29, 0x73, 0xb9, 0x4b, + 0x7d, 0xb4, 0xad, 0x2a, 0xd5, 0x64, 0x16, 0x1b, 0xb3, 0x57, 0xaa, 0xea, 0xdc, 0x06, 0xdd, 0x11, + 0x3c, 0x73, 0x23, 0x9e, 0x9d, 0x1b, 0xf3, 0xec, 0x48, 0x1e, 0xe1, 0xbf, 0xa9, 0x8b, 0xfc, 0x0b, + 0x4f, 0x34, 0x48, 0x89, 0x34, 0xb1, 0xed, 0x3b, 0x44, 0xf4, 0xeb, 0xb1, 0x1b, 0x32, 0xde, 0x18, + 0xcb, 0x54, 0xf6, 0x6b, 0x5b, 0x9c, 0xce, 0xda, 0x2f, 0x49, 0x25, 0xfa, 0x85, 0x2c, 0x48, 0x79, + 0xf6, 0x80, 0x36, 0x4a, 0xfc, 0x5e, 0xbf, 0x67, 0x26, 0xeb, 0xf6, 0xcc, 0xac, 0x49, 0x41, 0x24, + 0x48, 0x0b, 0x3f, 0xc7, 0xe1, 0x25, 0x91, 0x7a, 0xcd, 0x77, 0xf9, 0x60, 0x13, 0xbe, 0x04, 0x68, + 0x79, 0x5d, 0xc6, 0x49, 0x38, 0xda, 0x85, 0x07, 0xa2, 0x80, 0xad, 0xe8, 0x54, 0xce, 0xeb, 0x3b, + 0xd7, 0x87, 0x1a, 0xc2, 0x71, 0x4a, 0xf1, 0xd5, 0xda, 0xe8, 0x21, 0xcc, 0x33, 0xda, 0x0d, 0x5b, + 0x44, 0x96, 0x90, 0xde, 0xb8, 0x5d, 0x52, 0x57, 0xc0, 0x60, 0x59, 0x47, 0x63, 0xae, 0xe6, 0xa1, + 0xa2, 0x9f, 0xf7, 0xcc, 0x18, 0x56, 0x6e, 0xa8, 0x06, 0xe9, 0x36, 0x61, 0xdc, 0xf5, 0x6d, 0x31, + 0x11, 0xd9, 0xf8, 0x6c, 0x2c, 0xe3, 0xbe, 0x68, 0x03, 0x12, 0xa1, 0x90, 0x2c, 0xab, 0x4b, 0x92, + 0x57, 0x4a, 0x63, 0xb7, 0x51, 0x69, 0x28, 0xa8, 0xf2, 0x8c, 0xa0, 0x88, 0xc2, 0xaa, 0x54, 0xa1, + 0x45, 0x3b, 0x1d, 0x97, 0x73, 0xd2, 0x1e, 0x5f, 0x8b, 0x87, 0xfd, 0x9e, 0xb9, 0x22, 0xf4, 0xd8, + 0x1a, 0x58, 0x67, 0x14, 0x66, 0xc5, 0xbb, 0xe4, 0x2c, 0x14, 0xda, 0x86, 0xcc, 0x48, 0xa0, 0x68, + 0x2f, 0x46, 0x89, 0x6b, 0x37, 0x4e, 0xbc, 0xf0, 0x97, 0x06, 0x20, 0x4c, 0x16, 0xb7, 0x79, 0x97, + 0xa1, 0x77, 0x21, 0xc1, 0xb8, 0xcd, 0x23, 0x8a, 0xe5, 0xe7, 0x50, 0x08, 0x1c, 0xc1, 0x11, 0x08, + 0xbd, 0x0f, 0x09, 0x39, 0x88, 0x4a, 0xb4, 0x57, 0xa7, 0xd0, 0x83, 0x0d, 0x1d, 0xc4, 0x94, 0x68, + 0x74, 0x17, 0x74, 0x51, 0x90, 0x12, 0xe9, 0x5a, 0x2f, 0x09, 0x46, 0x1f, 0xc2, 0x42, 0xab, 0x1b, + 0x86, 0xc4, 0xe7, 0x4a, 0x97, 0x6b, 0xfd, 0x06, 0xf8, 0xc2, 0x4f, 0x1a, 0xa4, 0xa5, 0xdd, 0x3e, + 0xf2, 0xa8, 0xdd, 0x46, 0x55, 0x58, 0x8e, 0x74, 0x6a, 0xb4, 0xa8, 0xcf, 0xc9, 0x77, 0x5c, 0x35, + 0xcc, 0x98, 0x1a, 0x97, 0xa8, 0xe7, 0x5b, 0x11, 0x0a, 0x2f, 0xb5, 0xc6, 0x5f, 0xd1, 0x3d, 0x48, + 0x89, 0x5b, 0x9c, 0xf8, 0x3c, 0x3c, 0x9a, 0xec, 0xc0, 0xf8, 0xc0, 0x55, 0x05, 0x00, 0x27, 0x3d, + 0xf5, 0xb4, 0xa9, 0x9f, 0x8b, 0xdb, 0xe1, 0x8f, 0x39, 0x78, 0x59, 0x6a, 0x32, 0xf9, 0xc5, 0x79, + 0x61, 0xf6, 0xec, 0x3e, 0x2c, 0x04, 0x91, 0x22, 0x4a, 0xd1, 0xec, 0xb4, 0xa2, 0x91, 0x7d, 0x20, + 0xa8, 0x82, 0x17, 0x1e, 0xc1, 0xad, 0x89, 0xd6, 0xa9, 0x0d, 0x28, 0xc3, 0x3c, 0x93, 0x83, 0xac, + 0x14, 0x5d, 0x7b, 0xee, 0xfc, 0x76, 0x19, 0x56, 0xb0, 0xb7, 0x7f, 0x55, 0x77, 0xb4, 0x1c, 0x6b, + 0xb4, 0x0e, 0x89, 0x2a, 0xc6, 0x7b, 0x38, 0x13, 0xcb, 0xa1, 0xe3, 0x93, 0xfc, 0xf2, 0xd0, 0x52, + 0x0d, 0x43, 0x1a, 0xa2, 0x22, 0xa4, 0x6b, 0xbb, 0x8d, 0x7d, 0xbc, 0xb7, 0x83, 0xab, 0x96, 0x95, + 0xd1, 0x72, 0x6b, 0xc7, 0x27, 0xf9, 0xd5, 0x21, 0xa8, 0xe6, 0xef, 0x87, 0xd4, 0x09, 0x09, 0x63, + 0xe8, 0x0d, 0x48, 0x6e, 0xed, 0x7d, 0xba, 0x5f, 0xaf, 0x1e, 0x54, 0x33, 0x73, 0xb9, 0x5b, 0xc7, + 0x27, 0xf9, 0x95, 0x21, 0x6c, 0x8b, 0x76, 0x02, 0x8f, 0x44, 0xd1, 0xac, 0x83, 0x8f, 0xf1, 0x41, + 0x26, 0x3e, 0x11, 0xcd, 0xe2, 0x76, 0xc8, 0x73, 0x8b, 0xdf, 0xff, 0x62, 0xc4, 0x7e, 0x3b, 0x33, + 0x62, 0xbf, 0x9f, 0x19, 0xda, 0xc6, 0xc5, 0x1c, 0x00, 0x1e, 0xfe, 0x27, 0xa1, 0x5d, 0x48, 0x0d, + 0xab, 0x47, 0xeb, 0x97, 0xaa, 0x9c, 0x1c, 0xa8, 0x9c, 0x71, 0x95, 0x59, 0x7d, 0x4e, 0x63, 0x45, + 0x0d, 0xd5, 0x20, 0x39, 0xb8, 0x4e, 0xd0, 0xeb, 0x53, 0x4d, 0x1b, 0xfb, 0x0c, 0xe4, 0xd6, 0xaf, + 0xb0, 0x0e, 0xc8, 0xd0, 0xe7, 0xb0, 0x74, 0x49, 0x1c, 0x74, 0x7b, 0xfa, 0x1e, 0x9a, 0x4c, 0xb1, + 0xf0, 0x5f, 0x90, 0x21, 0xf3, 0x57, 0xb0, 0x7a, 0xc9, 0x14, 0x4d, 0xd8, 0xff, 0xc6, 0x5f, 0xd4, + 0x2a, 0x0f, 0xce, 0xfb, 0x86, 0xf6, 0xb4, 0x6f, 0x68, 0x3f, 0x5c, 0x18, 0xb1, 0xd3, 0x0b, 0x43, + 0x7b, 0x7a, 0x61, 0xc4, 0xfe, 0xbc, 0x30, 0x62, 0x5f, 0x14, 0xae, 0x5c, 0xb8, 0xe1, 0x7f, 0x6c, + 0x73, 0x5e, 0x3e, 0xdf, 0xfd, 0x37, 0x00, 0x00, 0xff, 0xff, 0x92, 0x00, 0xbc, 0x2a, 0xdc, 0x0a, + 0x00, 0x00, } func (x SyncState) String() string { @@ -779,6 +793,9 @@ func (this *ReplicateRequest) Equal(that interface{}) bool { return false } } + if this.BeginLLSN != that1.BeginLLSN { + return false + } return true } func (this *SyncPosition) Equal(that interface{}) bool { @@ -1178,6 +1195,11 @@ func (m *ReplicateRequest) MarshalToSizedBuffer(dAtA []byte) (int, error) { _ = i var l int _ = l + if m.BeginLLSN != 0 { + i = encodeVarintReplicator(dAtA, i, uint64(m.BeginLLSN)) + i-- + dAtA[i] = 0x28 + } if len(m.Data) > 0 { for iNdEx := len(m.Data) - 1; iNdEx >= 0; iNdEx-- { i -= len(m.Data[iNdEx]) @@ -1636,6 +1658,7 @@ func NewPopulatedReplicateRequest(r randyReplicator, easy bool) *ReplicateReques this.Data[i][j] = byte(r.Intn(256)) } } + this.BeginLLSN = github_com_kakao_varlog_pkg_types.LLSN(uint64(r.Uint32())) if !easy && r.Intn(10) != 0 { } return this @@ -1738,6 +1761,9 @@ func (m *ReplicateRequest) ProtoSize() (n int) { n += 1 + l + sovReplicator(uint64(l)) } } + if m.BeginLLSN != 0 { + n += 1 + sovReplicator(uint64(m.BeginLLSN)) + } return n } @@ -2080,6 +2106,25 @@ func (m *ReplicateRequest) Unmarshal(dAtA []byte) error { m.Data = append(m.Data, make([]byte, postIndex-iNdEx)) copy(m.Data[len(m.Data)-1], dAtA[iNdEx:postIndex]) iNdEx = postIndex + case 5: + if wireType != 0 { + return fmt.Errorf("proto: wrong wireType = %d for field BeginLLSN", wireType) + } + m.BeginLLSN = 0 + for shift := uint(0); ; shift += 7 { + if shift >= 64 { + return ErrIntOverflowReplicator + } + if iNdEx >= l { + return io.ErrUnexpectedEOF + } + b := dAtA[iNdEx] + iNdEx++ + m.BeginLLSN |= github_com_kakao_varlog_pkg_types.LLSN(b&0x7F) << shift + if b < 0x80 { + break + } + } default: iNdEx = preIndex skippy, err := skipReplicator(dAtA[iNdEx:]) diff --git a/proto/snpb/replicator.proto b/proto/snpb/replicator.proto index 0e8c3a637..6b4048cbc 100644 --- a/proto/snpb/replicator.proto +++ b/proto/snpb/replicator.proto @@ -29,11 +29,19 @@ message ReplicateRequest { (gogoproto.casttype) = "github.com/kakao/varlog/pkg/types.LogStreamID", (gogoproto.customname) = "LogStreamID" ]; + // LLSN is a list of local log sequence numbers where the log entries are + // replicated. The primary replica fills this field for backward + // compatibility, but it will disappear soon. repeated uint64 llsn = 3 [ + deprecated = true, (gogoproto.casttype) = "github.com/kakao/varlog/pkg/types.LLSN", (gogoproto.customname) = "LLSN" ]; repeated bytes data = 4; + uint64 begin_llsn = 5 [ + (gogoproto.casttype) = "github.com/kakao/varlog/pkg/types.LLSN", + (gogoproto.customname) = "BeginLLSN" + ]; } message ReplicateResponse {} diff --git a/proto/snpb/replicator_test.go b/proto/snpb/replicator_test.go index 437c06d5d..5915e3876 100644 --- a/proto/snpb/replicator_test.go +++ b/proto/snpb/replicator_test.go @@ -4,6 +4,8 @@ import ( "testing" "github.com/stretchr/testify/require" + + "github.com/kakao/varlog/pkg/types" ) func TestSyncPositionInvalid(t *testing.T) { @@ -80,3 +82,73 @@ func TestSyncPositionLessThan(t *testing.T) { }) } } + +func TestInvalidSyncRange(t *testing.T) { + sr := InvalidSyncRange() + require.True(t, sr.Invalid()) +} + +func TestSyncRangeInvalid(t *testing.T) { + tcs := []struct { + sr SyncRange + expected bool + }{ + { + sr: SyncRange{FirstLLSN: types.InvalidLLSN, LastLLSN: types.InvalidLLSN}, + expected: true, + }, + { + sr: SyncRange{FirstLLSN: types.LLSN(1), LastLLSN: types.InvalidLLSN}, + expected: true, + }, + { + sr: SyncRange{FirstLLSN: types.InvalidLLSN, LastLLSN: types.LLSN(1)}, + expected: true, + }, + { + sr: SyncRange{FirstLLSN: types.LLSN(2), LastLLSN: types.LLSN(1)}, + expected: true, + }, + { + sr: SyncRange{FirstLLSN: types.LLSN(1), LastLLSN: types.LLSN(2)}, + expected: false, + }, + } + + for _, tc := range tcs { + t.Run(tc.sr.String(), func(t *testing.T) { + require.Equal(t, tc.expected, tc.sr.Invalid()) + }) + } +} + +func TestSyncRangeValidate(t *testing.T) { + tcs := []struct { + sr SyncRange + wantErr bool + }{ + { + sr: SyncRange{FirstLLSN: types.LLSN(2), LastLLSN: types.LLSN(1)}, + wantErr: true, + }, + { + sr: SyncRange{FirstLLSN: types.InvalidLLSN, LastLLSN: types.LLSN(1)}, + wantErr: true, + }, + { + sr: SyncRange{FirstLLSN: types.LLSN(1), LastLLSN: types.LLSN(2)}, + wantErr: false, + }, + } + + for _, tc := range tcs { + t.Run(tc.sr.String(), func(t *testing.T) { + err := tc.sr.Validate() + if tc.wantErr { + require.Error(t, err) + return + } + require.NoError(t, err) + }) + } +} diff --git a/proto/varlogpb/log_entry.go b/proto/varlogpb/log_entry.go deleted file mode 100644 index ee73f76ff..000000000 --- a/proto/varlogpb/log_entry.go +++ /dev/null @@ -1,13 +0,0 @@ -package varlogpb - -func InvalidLogEntryMeta() LogEntryMeta { - return LogEntryMeta{} -} - -func InvalidLogEntry() LogEntry { - return LogEntry{} -} - -func (le LogEntry) Invalid() bool { - return le.GLSN.Invalid() && le.LLSN.Invalid() && len(le.Data) == 0 -} diff --git a/proto/varlogpb/metadata.go b/proto/varlogpb/metadata.go index 87ec0c1fa..1a5570bd0 100644 --- a/proto/varlogpb/metadata.go +++ b/proto/varlogpb/metadata.go @@ -288,20 +288,6 @@ func (m *MetadataDescriptor) MustHaveStorageNode(id types.StorageNodeID) (*Stora return m.Must().HaveStorageNode(id) } -func (m *MetadataDescriptor) NotHaveStorageNode(id types.StorageNodeID) error { - if m == nil { - return errors.New("MetadataDescriptor is nil") - } - if snd := m.GetStorageNode(id); snd == nil { - return nil - } - return errors.Wrap(verrors.ErrExist, "storage node") -} - -func (m *MetadataDescriptor) MustNotHaveStorageNode(id types.StorageNodeID) error { - return m.Must().NotHaveStorageNode(id) -} - func (m *MetadataDescriptor) InsertLogStream(ls *LogStreamDescriptor) error { if m == nil || ls == nil { return nil @@ -519,20 +505,6 @@ func (m *MetadataDescriptor) MustHaveTopic(id types.TopicID) (*TopicDescriptor, return m.Must().HaveTopic(id) } -func (m *MetadataDescriptor) NotHaveTopic(id types.TopicID) error { - if m == nil { - return errors.New("MetadataDescriptor is nil") - } - if tnd := m.GetTopic(id); tnd == nil { - return nil - } - return errors.Wrap(verrors.ErrExist, "storage node") -} - -func (m *MetadataDescriptor) MustNotHaveTopic(id types.TopicID) error { - return m.Must().NotHaveTopic(id) -} - func (t *TopicDescriptor) searchLogStream(id types.LogStreamID) (int, bool) { i := sort.Search(len(t.LogStreams), func(i int) bool { return t.LogStreams[i] >= id diff --git a/proto/varlogpb/metadata_test.go b/proto/varlogpb/metadata_test.go index affd2fbec..556b42f38 100644 --- a/proto/varlogpb/metadata_test.go +++ b/proto/varlogpb/metadata_test.go @@ -1,39 +1,2094 @@ package varlogpb import ( + "encoding/json" "testing" + + "github.com/stretchr/testify/require" + + "github.com/kakao/varlog/pkg/types" ) +func TestLogStreamStatusMarshalJSON(t *testing.T) { + tcs := []struct { + in LogStreamStatus + want string + isErr bool + }{ + {in: LogStreamStatusRunning, want: `"running"`, isErr: false}, + {in: LogStreamStatusSealing, want: `"sealing"`, isErr: false}, + {in: LogStreamStatusSealed, want: `"sealed"`, isErr: false}, + {in: LogStreamStatusDeleted, want: `"deleted"`, isErr: false}, + {in: LogStreamStatusUnsealing, want: `"unsealing"`, isErr: false}, + {in: LogStreamStatus(LogStreamStatusUnsealing + 1), isErr: true}, + } + + for _, tc := range tcs { + t.Run(tc.in.String(), func(t *testing.T) { + got, err := json.Marshal(tc.in) + if tc.isErr { + require.Error(t, err) + return + } + require.NoError(t, err) + require.Equal(t, tc.want, string(got)) + }) + } +} + +func TestLogStreamStatusUnmarshalJSON(t *testing.T) { + tcs := []struct { + in string + want LogStreamStatus + isErr bool + }{ + {in: `"running"`, want: LogStreamStatusRunning, isErr: false}, + {in: `"sealing"`, want: LogStreamStatusSealing, isErr: false}, + {in: `"sealed"`, want: LogStreamStatusSealed, isErr: false}, + {in: `"deleted"`, want: LogStreamStatusDeleted, isErr: false}, + {in: `"unsealing"`, want: LogStreamStatusUnsealing, isErr: false}, + {in: `"unknown"`, want: LogStreamStatus(0), isErr: true}, + {in: `{malformed}`, want: LogStreamStatus(0), isErr: true}, + } + + for _, tc := range tcs { + t.Run(tc.in, func(t *testing.T) { + var got LogStreamStatus + err := json.Unmarshal([]byte(tc.in), &got) + if tc.isErr { + require.Error(t, err) + return + } + require.NoError(t, err) + require.Equal(t, tc.want, got) + }) + } +} + func TestLogStreamStatus(t *testing.T) { - var tests = []struct { - in LogStreamStatus - f func(st LogStreamStatus) bool - out bool + tcs := []struct { + in LogStreamStatus + f func(st LogStreamStatus) bool + want bool + }{ + {in: LogStreamStatusRunning, f: LogStreamStatus.Deleted, want: false}, + {in: LogStreamStatusRunning, f: LogStreamStatus.Running, want: true}, + {in: LogStreamStatusRunning, f: LogStreamStatus.Sealed, want: false}, + + {in: LogStreamStatusSealing, f: LogStreamStatus.Deleted, want: false}, + {in: LogStreamStatusSealing, f: LogStreamStatus.Running, want: false}, + {in: LogStreamStatusSealing, f: LogStreamStatus.Sealed, want: true}, + + {in: LogStreamStatusSealed, f: LogStreamStatus.Deleted, want: false}, + {in: LogStreamStatusSealed, f: LogStreamStatus.Running, want: false}, + {in: LogStreamStatusSealed, f: LogStreamStatus.Sealed, want: true}, + + {in: LogStreamStatusDeleted, f: LogStreamStatus.Deleted, want: true}, + {in: LogStreamStatusDeleted, f: LogStreamStatus.Running, want: false}, + {in: LogStreamStatusDeleted, f: LogStreamStatus.Sealed, want: false}, + } + + for _, tc := range tcs { + t.Run(tc.in.String(), func(t *testing.T) { + got := tc.f(tc.in) + require.Equal(t, tc.want, got) + }) + } +} + +func TestStorageNodeStatus(t *testing.T) { + tcs := []struct { + in StorageNodeStatus + f func(st StorageNodeStatus) bool + want bool + }{ + {in: StorageNodeStatusRunning, f: StorageNodeStatus.Running, want: true}, + {in: StorageNodeStatusRunning, f: StorageNodeStatus.Deleted, want: false}, + {in: StorageNodeStatusDeleted, f: StorageNodeStatus.Running, want: false}, + {in: StorageNodeStatusDeleted, f: StorageNodeStatus.Deleted, want: true}, + } + + for _, tc := range tcs { + t.Run(tc.in.String(), func(t *testing.T) { + got := tc.f(tc.in) + require.Equal(t, tc.want, got) + }) + } +} + +func TestTopicStatus(t *testing.T) { + tcs := []struct { + in TopicStatus + f func(ts TopicStatus) bool + want bool + }{ + {in: TopicStatusDeleted, f: TopicStatus.Deleted, want: true}, + {in: TopicStatus(0), f: TopicStatus.Deleted, want: false}, // Assuming 0 is not a deleted status + } + + for _, tc := range tcs { + t.Run(tc.in.String(), func(t *testing.T) { + got := tc.f(tc.in) + require.Equal(t, tc.want, got) + }) + } +} + +func TestLogStreamDescriptor_Validate(t *testing.T) { + tcs := []struct { + name string + lsd LogStreamDescriptor + wantErr bool + }{ + { + name: "Valid", + lsd: LogStreamDescriptor{ + Replicas: []*ReplicaDescriptor{ + {StorageNodeID: types.StorageNodeID(1), StorageNodePath: "path"}, + }, + }, + wantErr: false, + }, + { + name: "NoReplicas", + lsd: LogStreamDescriptor{Replicas: []*ReplicaDescriptor{}}, + wantErr: true, + }, + { + name: "NilReplica", + lsd: LogStreamDescriptor{Replicas: []*ReplicaDescriptor{nil}}, + wantErr: true, + }, + { + name: "DuplicateStorageNodes", + lsd: LogStreamDescriptor{ + Replicas: []*ReplicaDescriptor{ + {StorageNodeID: types.StorageNodeID(1), StorageNodePath: "path"}, + {StorageNodeID: types.StorageNodeID(1), StorageNodePath: "path"}, + }, + }, + wantErr: true, + }, + { + name: "InvalidReplica", + lsd: LogStreamDescriptor{ + Replicas: []*ReplicaDescriptor{ + {StorageNodeID: types.StorageNodeID(1), StorageNodePath: "path"}, + {StorageNodeID: types.StorageNodeID(2), StorageNodePath: ""}, + }, + }, + wantErr: true, + }, + } + + for _, tc := range tcs { + t.Run(tc.name, func(t *testing.T) { + err := tc.lsd.Validate() + if tc.wantErr { + require.Error(t, err) + } else { + require.NoError(t, err) + } + }) + } +} + +func TestLogStreamDescriptor_Valid(t *testing.T) { + tcs := []struct { + name string + lsd *LogStreamDescriptor + want bool + }{ + { + name: "Valid", + lsd: &LogStreamDescriptor{ + Replicas: []*ReplicaDescriptor{ + {StorageNodeID: types.StorageNodeID(1), StorageNodePath: "path"}, + }, + }, + want: true, + }, + { + name: "Nil", + lsd: nil, + want: false, + }, + { + name: "NoReplicas", + lsd: &LogStreamDescriptor{ + Replicas: []*ReplicaDescriptor{}, + }, + want: false, + }, + { + name: "InvalidReplica", + lsd: &LogStreamDescriptor{ + Replicas: []*ReplicaDescriptor{ + {StorageNodeID: types.StorageNodeID(1), StorageNodePath: ""}, + }, + }, + want: false, + }, + } + + for _, tc := range tcs { + t.Run(tc.name, func(t *testing.T) { + require.Equal(t, tc.want, tc.lsd.Valid()) + }) + } +} + +func TestLogStreamDescriptor_IsReplica(t *testing.T) { + tcs := []struct { + name string + lsd *LogStreamDescriptor + snid types.StorageNodeID + want bool + }{ + { + name: "is replica", + lsd: &LogStreamDescriptor{ + Replicas: []*ReplicaDescriptor{ + {StorageNodeID: types.StorageNodeID(1), StorageNodePath: "path"}, + }, + }, + snid: types.StorageNodeID(1), + want: true, + }, + { + name: "not a replica", + lsd: &LogStreamDescriptor{ + Replicas: []*ReplicaDescriptor{ + {StorageNodeID: types.StorageNodeID(1), StorageNodePath: "path"}, + }, + }, + snid: types.StorageNodeID(2), + want: false, + }, + } + + for _, tc := range tcs { + t.Run(tc.name, func(t *testing.T) { + require.Equal(t, tc.want, tc.lsd.IsReplica(tc.snid)) + }) + } +} + +func TestReplicaDescriptor_Validate(t *testing.T) { + tcs := []struct { + name string + rd ReplicaDescriptor + wantErr bool + }{ + { + name: "Valid", + rd: ReplicaDescriptor{StorageNodeID: types.StorageNodeID(1), StorageNodePath: "path"}, + wantErr: false, + }, + { + name: "InvalidStorageNodeId", + rd: ReplicaDescriptor{StorageNodeID: types.StorageNodeID(0), StorageNodePath: "path"}, + wantErr: true, + }, + { + name: "NoPath", + rd: ReplicaDescriptor{StorageNodeID: types.StorageNodeID(1), StorageNodePath: ""}, + wantErr: true, + }, + } + + for _, tc := range tcs { + t.Run(tc.name, func(t *testing.T) { + err := tc.rd.Validate() + if tc.wantErr { + require.Error(t, err) + } else { + require.NoError(t, err) + } + }) + } +} + +func TestReplicaDescriptor_Valid(t *testing.T) { + tcs := []struct { + name string + rd *ReplicaDescriptor + want bool + }{ + { + name: "Valid", + rd: &ReplicaDescriptor{StorageNodeID: types.StorageNodeID(1), StorageNodePath: "path"}, + want: true, + }, + { + name: "Nil", + rd: nil, + want: false, + }, + { + name: "NoPath", + rd: &ReplicaDescriptor{StorageNodeID: types.StorageNodeID(1), StorageNodePath: ""}, + want: false, + }, + } + + for _, tc := range tcs { + t.Run(tc.name, func(t *testing.T) { + require.Equal(t, tc.want, tc.rd.valid()) + }) + } +} + +func TestMetadataDescriptor_Must(t *testing.T) { + tcs := []struct { + name string + md *MetadataDescriptor + want *MetadataDescriptor + }{ + { + name: "Valid", + md: &MetadataDescriptor{}, + want: &MetadataDescriptor{}, + }, + { + name: "Nil", + md: nil, + want: nil, + }, + } + + for _, tc := range tcs { + t.Run(tc.name, func(t *testing.T) { + if tc.md == nil { + require.Panics(t, func() { tc.md.Must() }) + } else { + require.Equal(t, tc.want, tc.md.Must()) + } + }) + } +} + +func TestMetadataDescriptor_InsertStorageNode(t *testing.T) { + tcs := []struct { + name string + md *MetadataDescriptor + sn *StorageNodeDescriptor + want *MetadataDescriptor + wantErr bool + }{ + { + name: "ValidStorageNode", + md: &MetadataDescriptor{}, + sn: &StorageNodeDescriptor{ + StorageNode: StorageNode{StorageNodeID: types.StorageNodeID(1)}, + }, + want: &MetadataDescriptor{ + StorageNodes: []*StorageNodeDescriptor{ + { + StorageNode: StorageNode{StorageNodeID: types.StorageNodeID(1)}, + }, + }, + }, + wantErr: false, + }, + { + name: "NilStorageNode", + md: &MetadataDescriptor{}, + sn: nil, + want: &MetadataDescriptor{}, + wantErr: false, + }, + { + name: "NilMetadataDescriptor", + md: nil, + sn: &StorageNodeDescriptor{ + StorageNode: StorageNode{StorageNodeID: types.StorageNodeID(1)}, + }, + want: nil, + wantErr: false, + }, + { + name: "DuplicateStorageNode", + md: &MetadataDescriptor{ + StorageNodes: []*StorageNodeDescriptor{ + { + StorageNode: StorageNode{StorageNodeID: types.StorageNodeID(1)}, + }, + }, + }, + sn: &StorageNodeDescriptor{ + StorageNode: StorageNode{StorageNodeID: types.StorageNodeID(1)}, + }, + want: &MetadataDescriptor{ + StorageNodes: []*StorageNodeDescriptor{ + { + StorageNode: StorageNode{StorageNodeID: types.StorageNodeID(1)}, + }, + }, + }, + + wantErr: true, + }, + } + + for _, tc := range tcs { + t.Run(tc.name, func(t *testing.T) { + err := tc.md.InsertStorageNode(tc.sn) + if tc.wantErr { + require.Error(t, err) + } else { + require.NoError(t, err) + } + require.Equal(t, tc.want, tc.md) + }) + } +} + +func TestMetadataDescriptor_UpdateStorageNode(t *testing.T) { + tcs := []struct { + name string + initialNodes []*StorageNodeDescriptor + updateNode *StorageNodeDescriptor + want []*StorageNodeDescriptor + wantErr bool + }{ + { + name: "UpdateExistingNode", + initialNodes: []*StorageNodeDescriptor{ + { + StorageNode: StorageNode{StorageNodeID: types.StorageNodeID(1)}, + Paths: []string{"path1"}, + }, + { + StorageNode: StorageNode{StorageNodeID: types.StorageNodeID(2)}, + Paths: []string{"path2"}, + }, + }, + updateNode: &StorageNodeDescriptor{ + StorageNode: StorageNode{StorageNodeID: types.StorageNodeID(1)}, + Paths: []string{"newpath1"}, + }, + want: []*StorageNodeDescriptor{ + { + StorageNode: StorageNode{StorageNodeID: types.StorageNodeID(1)}, + Paths: []string{"newpath1"}, + }, + { + StorageNode: StorageNode{StorageNodeID: types.StorageNodeID(2)}, + Paths: []string{"path2"}, + }, + }, + wantErr: false, + }, + { + name: "UpdateNonExistingNode", + initialNodes: []*StorageNodeDescriptor{ + { + StorageNode: StorageNode{StorageNodeID: types.StorageNodeID(1)}, + Paths: []string{"path1"}, + }, + }, + updateNode: &StorageNodeDescriptor{ + StorageNode: StorageNode{StorageNodeID: types.StorageNodeID(2)}, + Paths: []string{"newpath2"}, + }, + want: []*StorageNodeDescriptor{ + { + StorageNode: StorageNode{StorageNodeID: types.StorageNodeID(1)}, + Paths: []string{"path1"}, + }, + }, + wantErr: true, + }, + { + name: "NilMetadataDescriptor", + initialNodes: nil, + updateNode: &StorageNodeDescriptor{ + StorageNode: StorageNode{StorageNodeID: types.StorageNodeID(1)}, + Paths: []string{"newpath1"}, + }, + want: nil, + wantErr: true, + }, + { + name: "NilNode", + initialNodes: []*StorageNodeDescriptor{ + { + StorageNode: StorageNode{StorageNodeID: types.StorageNodeID(1)}, + Paths: []string{"path1"}, + }, + }, + updateNode: nil, + want: []*StorageNodeDescriptor{ + { + StorageNode: StorageNode{StorageNodeID: types.StorageNodeID(1)}, + Paths: []string{"path1"}, + }, + }, + wantErr: true, + }, + } + + for _, tc := range tcs { + t.Run(tc.name, func(t *testing.T) { + md := &MetadataDescriptor{ + StorageNodes: tc.initialNodes, + } + + err := md.UpdateStorageNode(tc.updateNode) + if tc.wantErr { + require.Error(t, err) + } else { + require.NoError(t, err) + } + require.Equal(t, tc.want, md.StorageNodes) + }) + } +} + +func TestMetadataDescriptor_UpsertStorageNode(t *testing.T) { + tcs := []struct { + name string + md *MetadataDescriptor + sn *StorageNodeDescriptor + want *MetadataDescriptor + }{ + { + name: "NilMetadataDescriptor", + md: nil, + sn: &StorageNodeDescriptor{ + StorageNode: StorageNode{ + StorageNodeID: types.StorageNodeID(1), + }, + }, + want: nil, + }, + { + name: "NilStorageNode", + md: &MetadataDescriptor{}, + sn: nil, + want: &MetadataDescriptor{}, + }, + { + name: "InsertNew", + md: &MetadataDescriptor{}, + sn: &StorageNodeDescriptor{ + StorageNode: StorageNode{ + StorageNodeID: types.StorageNodeID(1), + }, + }, + want: &MetadataDescriptor{ + StorageNodes: []*StorageNodeDescriptor{ + { + StorageNode: StorageNode{ + StorageNodeID: types.StorageNodeID(1), + }, + }, + }, + }, + }, + { + name: "UpdateExisting", + md: &MetadataDescriptor{ + StorageNodes: []*StorageNodeDescriptor{ + { + StorageNode: StorageNode{ + StorageNodeID: types.StorageNodeID(1), + Address: "old-address", + }, + }, + }, + }, + sn: &StorageNodeDescriptor{ + StorageNode: StorageNode{ + StorageNodeID: types.StorageNodeID(1), + Address: "new-address", + }, + }, + want: &MetadataDescriptor{ + StorageNodes: []*StorageNodeDescriptor{ + { + StorageNode: StorageNode{ + StorageNodeID: types.StorageNodeID(1), + Address: "new-address", + }, + }, + }, + }, + }, + } + + for _, tc := range tcs { + t.Run(tc.name, func(t *testing.T) { + err := tc.md.UpsertStorageNode(tc.sn) + require.NoError(t, err) + require.Equal(t, tc.want, tc.md) + }) + } +} + +func TestMetadataDescriptor_DeleteStorageNode(t *testing.T) { + tcs := []struct { + name string + md *MetadataDescriptor + id types.StorageNodeID + want *MetadataDescriptor + wantErr bool }{ - {LogStreamStatusRunning, LogStreamStatus.Deleted, false}, - {LogStreamStatusRunning, LogStreamStatus.Running, true}, - {LogStreamStatusRunning, LogStreamStatus.Sealed, false}, + { + name: "NilMetadataDescriptor", + md: nil, + id: types.StorageNodeID(1), + want: nil, + wantErr: false, + }, + { + name: "NotExists", + md: &MetadataDescriptor{}, + id: types.StorageNodeID(1), + want: &MetadataDescriptor{}, + wantErr: true, + }, + { + name: "DeleteExisting", + md: &MetadataDescriptor{ + StorageNodes: []*StorageNodeDescriptor{ + { + StorageNode: StorageNode{ + StorageNodeID: types.StorageNodeID(1), + }, + }, + }, + }, + id: types.StorageNodeID(1), + want: &MetadataDescriptor{ + StorageNodes: []*StorageNodeDescriptor{}, + }, + wantErr: false, + }, + { + name: "DeleteExistingWithMultipleNodes", + md: &MetadataDescriptor{ + StorageNodes: []*StorageNodeDescriptor{ + { + StorageNode: StorageNode{ + StorageNodeID: types.StorageNodeID(1), + }, + }, + { + StorageNode: StorageNode{ + StorageNodeID: types.StorageNodeID(2), + }, + }, + }, + }, + id: types.StorageNodeID(1), + want: &MetadataDescriptor{ + StorageNodes: []*StorageNodeDescriptor{ + { + StorageNode: StorageNode{ + StorageNodeID: types.StorageNodeID(2), + }, + }, + }, + }, + wantErr: false, + }, + } - {LogStreamStatusSealing, LogStreamStatus.Deleted, false}, - {LogStreamStatusSealing, LogStreamStatus.Running, false}, - {LogStreamStatusSealing, LogStreamStatus.Sealed, true}, + for _, tc := range tcs { + t.Run(tc.name, func(t *testing.T) { + err := tc.md.DeleteStorageNode(tc.id) + if tc.wantErr { + require.Error(t, err) + } else { + require.NoError(t, err) + } + require.Equal(t, tc.want, tc.md) + }) + } +} - {LogStreamStatusSealed, LogStreamStatus.Deleted, false}, - {LogStreamStatusSealed, LogStreamStatus.Running, false}, - {LogStreamStatusSealed, LogStreamStatus.Sealed, true}, +func TestMetadataDescriptor_GetStorageNode(t *testing.T) { + tcs := []struct { + name string + md *MetadataDescriptor + id types.StorageNodeID + want *StorageNodeDescriptor + }{ + { + name: "NilMetadataDescriptor", + md: nil, + id: types.StorageNodeID(1), + want: nil, + }, + { + name: "NotExists", + md: &MetadataDescriptor{}, + id: types.StorageNodeID(1), + want: nil, + }, + { + name: "GetExisting", + md: &MetadataDescriptor{ + StorageNodes: []*StorageNodeDescriptor{ + { + StorageNode: StorageNode{ + StorageNodeID: types.StorageNodeID(1), + Address: "addr1", + }, + }, + }, + }, + id: types.StorageNodeID(1), + want: &StorageNodeDescriptor{ + StorageNode: StorageNode{ + StorageNodeID: types.StorageNodeID(1), + Address: "addr1", + }, + }, + }, + } - {LogStreamStatusDeleted, LogStreamStatus.Deleted, true}, - {LogStreamStatusDeleted, LogStreamStatus.Running, false}, - {LogStreamStatusDeleted, LogStreamStatus.Sealed, false}, + for _, tc := range tcs { + t.Run(tc.name, func(t *testing.T) { + got := tc.md.GetStorageNode(tc.id) + require.Equal(t, tc.want, got) + }) } +} - for i := range tests { - tt := tests[i] - t.Run(tt.in.String(), func(t *testing.T) { - s := tt.f(tt.in) - if s != tt.out { - t.Errorf("input=%v, expected=%v, actual=%v", tt.in, tt.out, s) +func TestMetadataDescriptor_HaveStorageNode(t *testing.T) { + tcs := []struct { + name string + md *MetadataDescriptor + id types.StorageNodeID + want *StorageNodeDescriptor + wantError bool + }{ + { + name: "NilMetadataDescriptor", + md: nil, + id: types.StorageNodeID(1), + want: nil, + wantError: true, + }, + { + name: "NotExists", + md: &MetadataDescriptor{}, + id: types.StorageNodeID(1), + want: nil, + wantError: true, + }, + { + name: "Exists", + md: &MetadataDescriptor{ + StorageNodes: []*StorageNodeDescriptor{ + { + StorageNode: StorageNode{ + StorageNodeID: types.StorageNodeID(1), + Address: "addr1", + }, + }, + }, + }, + id: types.StorageNodeID(1), + want: &StorageNodeDescriptor{ + StorageNode: StorageNode{ + StorageNodeID: types.StorageNodeID(1), + Address: "addr1", + }, + }, + wantError: false, + }, + } + + for _, tc := range tcs { + t.Run(tc.name, func(t *testing.T) { + got, err := tc.md.HaveStorageNode(tc.id) + if tc.wantError { + require.Error(t, err) + } else { + require.NoError(t, err) } + require.Equal(t, tc.want, got) + }) + } +} + +func TestMetadataDescriptor_MustHaveStorageNode(t *testing.T) { + tcs := []struct { + name string + md *MetadataDescriptor + id types.StorageNodeID + want *StorageNodeDescriptor + wantPanic bool + wantError bool + }{ + { + name: "NilMetadataDescriptor", + md: nil, + id: types.StorageNodeID(1), + want: nil, + wantPanic: true, + wantError: false, + }, + { + name: "NotExists", + md: &MetadataDescriptor{}, + id: types.StorageNodeID(1), + want: nil, + wantPanic: false, + wantError: true, + }, + { + name: "Exists", + md: &MetadataDescriptor{ + StorageNodes: []*StorageNodeDescriptor{ + { + StorageNode: StorageNode{ + StorageNodeID: types.StorageNodeID(1), + Address: "addr1", + }, + }, + }, + }, + id: types.StorageNodeID(1), + want: &StorageNodeDescriptor{ + StorageNode: StorageNode{ + StorageNodeID: types.StorageNodeID(1), + Address: "addr1", + }, + }, + wantPanic: false, + wantError: false, + }, + } + + for _, tc := range tcs { + t.Run(tc.name, func(t *testing.T) { + if tc.wantPanic { + require.Panics(t, func() { + _, _ = tc.md.MustHaveStorageNode(tc.id) + }) + return + } + + got, err := tc.md.MustHaveStorageNode(tc.id) + if tc.wantError { + require.Error(t, err) + } else { + require.NoError(t, err) + } + require.Equal(t, tc.want, got) + }) + } +} + +func TestMetadataDescriptor_InsertLogStream(t *testing.T) { + tcs := []struct { + name string + md *MetadataDescriptor + ls *LogStreamDescriptor + want *MetadataDescriptor + wantErr bool + }{ + { + name: "NilMetadataDescriptor", + md: nil, + ls: &LogStreamDescriptor{ + TopicID: types.TopicID(1), + LogStreamID: types.LogStreamID(1), + }, + want: nil, + wantErr: false, + }, + { + name: "NilLogStream", + md: &MetadataDescriptor{}, + ls: nil, + want: &MetadataDescriptor{}, + wantErr: false, + }, + { + name: "InsertNew", + md: &MetadataDescriptor{}, + ls: &LogStreamDescriptor{ + TopicID: types.TopicID(1), + LogStreamID: types.LogStreamID(1), + }, + want: &MetadataDescriptor{ + LogStreams: []*LogStreamDescriptor{ + { + TopicID: types.TopicID(1), + LogStreamID: types.LogStreamID(1), + }, + }, + }, + wantErr: false, + }, + { + name: "AlreadyExists", + md: &MetadataDescriptor{ + LogStreams: []*LogStreamDescriptor{ + { + TopicID: types.TopicID(1), + LogStreamID: types.LogStreamID(1), + }, + }, + }, + ls: &LogStreamDescriptor{ + TopicID: types.TopicID(1), + LogStreamID: types.LogStreamID(1), + }, + want: &MetadataDescriptor{ + LogStreams: []*LogStreamDescriptor{ + { + TopicID: types.TopicID(1), + LogStreamID: types.LogStreamID(1), + }, + }, + }, + wantErr: true, + }, + } + + for _, tc := range tcs { + t.Run(tc.name, func(t *testing.T) { + err := tc.md.InsertLogStream(tc.ls) + if tc.wantErr { + require.Error(t, err) + } else { + require.NoError(t, err) + } + require.Equal(t, tc.want, tc.md) + }) + } +} + +func TestMetadataDescriptor_UpdateLogStream(t *testing.T) { + tcs := []struct { + name string + md *MetadataDescriptor + ls *LogStreamDescriptor + want *MetadataDescriptor + wantErr bool + }{ + { + name: "NilMetadataDescriptor", + md: nil, + ls: &LogStreamDescriptor{ + TopicID: types.TopicID(1), + LogStreamID: types.LogStreamID(1), + }, + want: nil, + wantErr: true, + }, + { + name: "NilLogStream", + md: &MetadataDescriptor{}, + ls: nil, + want: &MetadataDescriptor{}, + wantErr: true, + }, + { + name: "NotExists", + md: &MetadataDescriptor{}, + ls: &LogStreamDescriptor{ + TopicID: types.TopicID(1), + LogStreamID: types.LogStreamID(1), + }, + want: &MetadataDescriptor{}, + wantErr: true, + }, + { + name: "UpdateExisting", + md: &MetadataDescriptor{ + LogStreams: []*LogStreamDescriptor{ + { + TopicID: types.TopicID(1), + LogStreamID: types.LogStreamID(1), + Status: LogStreamStatusRunning, + }, + }, + }, + ls: &LogStreamDescriptor{ + TopicID: types.TopicID(1), + LogStreamID: types.LogStreamID(1), + Status: LogStreamStatusSealed, + }, + want: &MetadataDescriptor{ + LogStreams: []*LogStreamDescriptor{ + { + TopicID: types.TopicID(1), + LogStreamID: types.LogStreamID(1), + Status: LogStreamStatusSealed, + }, + }, + }, + wantErr: false, + }, + } + + for _, tc := range tcs { + t.Run(tc.name, func(t *testing.T) { + err := tc.md.UpdateLogStream(tc.ls) + if tc.wantErr { + require.Error(t, err) + } else { + require.NoError(t, err) + } + require.Equal(t, tc.want, tc.md) + }) + } +} + +func TestMetadataDescriptor_UpsertLogStream(t *testing.T) { + tcs := []struct { + name string + md *MetadataDescriptor + ls *LogStreamDescriptor + want *MetadataDescriptor + }{ + { + name: "NilMetadataDescriptor", + md: nil, + ls: &LogStreamDescriptor{ + TopicID: types.TopicID(1), + LogStreamID: types.LogStreamID(1), + }, + want: nil, + }, + { + name: "NilLogStream", + md: &MetadataDescriptor{}, + ls: nil, + want: &MetadataDescriptor{}, + }, + { + name: "InsertNew", + md: &MetadataDescriptor{}, + ls: &LogStreamDescriptor{ + TopicID: types.TopicID(1), + LogStreamID: types.LogStreamID(1), + Status: LogStreamStatusRunning, + }, + want: &MetadataDescriptor{ + LogStreams: []*LogStreamDescriptor{ + { + TopicID: types.TopicID(1), + LogStreamID: types.LogStreamID(1), + Status: LogStreamStatusRunning, + }, + }, + }, + }, + { + name: "UpdateExisting", + md: &MetadataDescriptor{ + LogStreams: []*LogStreamDescriptor{ + { + TopicID: types.TopicID(1), + LogStreamID: types.LogStreamID(1), + Status: LogStreamStatusRunning, + }, + }, + }, + ls: &LogStreamDescriptor{ + TopicID: types.TopicID(1), + LogStreamID: types.LogStreamID(1), + Status: LogStreamStatusSealed, + }, + want: &MetadataDescriptor{ + LogStreams: []*LogStreamDescriptor{ + { + TopicID: types.TopicID(1), + LogStreamID: types.LogStreamID(1), + Status: LogStreamStatusSealed, + }, + }, + }, + }, + } + + for _, tc := range tcs { + t.Run(tc.name, func(t *testing.T) { + err := tc.md.UpsertLogStream(tc.ls) + require.NoError(t, err) + require.Equal(t, tc.want, tc.md) + }) + } +} + +func TestMetadataDescriptor_DeleteLogStream(t *testing.T) { + tcs := []struct { + name string + md *MetadataDescriptor + id types.LogStreamID + want *MetadataDescriptor + wantErr bool + }{ + { + name: "NilMetadataDescriptor", + md: nil, + id: types.LogStreamID(1), + want: nil, + wantErr: false, + }, + { + name: "NotExists", + md: &MetadataDescriptor{}, + id: types.LogStreamID(1), + want: &MetadataDescriptor{}, + wantErr: true, + }, + { + name: "DeleteExisting", + md: &MetadataDescriptor{ + LogStreams: []*LogStreamDescriptor{ + { + TopicID: types.TopicID(1), + LogStreamID: types.LogStreamID(1), + }, + }, + }, + id: types.LogStreamID(1), + want: &MetadataDescriptor{ + LogStreams: []*LogStreamDescriptor{}, + }, + wantErr: false, + }, + { + name: "DeleteExistingWithMultipleStreams", + md: &MetadataDescriptor{ + LogStreams: []*LogStreamDescriptor{ + { + TopicID: types.TopicID(1), + LogStreamID: types.LogStreamID(1), + }, + { + TopicID: types.TopicID(1), + LogStreamID: types.LogStreamID(2), + }, + }, + }, + id: types.LogStreamID(1), + want: &MetadataDescriptor{ + LogStreams: []*LogStreamDescriptor{ + { + TopicID: types.TopicID(1), + LogStreamID: types.LogStreamID(2), + }, + }, + }, + wantErr: false, + }, + } + + for _, tc := range tcs { + t.Run(tc.name, func(t *testing.T) { + err := tc.md.DeleteLogStream(tc.id) + if tc.wantErr { + require.Error(t, err) + } else { + require.NoError(t, err) + } + require.Equal(t, tc.want, tc.md) + }) + } +} + +func TestMetadataDescriptor_GetLogStream(t *testing.T) { + tcs := []struct { + name string + md *MetadataDescriptor + id types.LogStreamID + want *LogStreamDescriptor + }{ + { + name: "NilMetadataDescriptor", + md: nil, + id: types.LogStreamID(1), + want: nil, + }, + { + name: "ExistingLogStream", + md: &MetadataDescriptor{ + LogStreams: []*LogStreamDescriptor{ + {LogStreamID: types.LogStreamID(1)}, + {LogStreamID: types.LogStreamID(2)}, + {LogStreamID: types.LogStreamID(3)}, + }, + }, + id: types.LogStreamID(2), + want: &LogStreamDescriptor{LogStreamID: types.LogStreamID(2)}, + }, + { + name: "NotExistingLogStream", + md: &MetadataDescriptor{ + LogStreams: []*LogStreamDescriptor{ + {LogStreamID: types.LogStreamID(1)}, + {LogStreamID: types.LogStreamID(2)}, + {LogStreamID: types.LogStreamID(3)}, + }, + }, + + id: types.LogStreamID(4), + want: nil, + }, + } + + for _, tc := range tcs { + t.Run(tc.name, func(t *testing.T) { + got := tc.md.GetLogStream(tc.id) + require.Equal(t, tc.want, got) + }) + } +} + +func TestMetadataDescriptor_HaveLogStream(t *testing.T) { + tcs := []struct { + name string + md *MetadataDescriptor + id types.LogStreamID + want *LogStreamDescriptor + wantErr bool + }{ + { + name: "NilMetadataDescriptor", + md: nil, + id: types.LogStreamID(1), + want: nil, + wantErr: true, + }, + { + name: "ExistingLogStream", + md: &MetadataDescriptor{ + LogStreams: []*LogStreamDescriptor{ + {LogStreamID: types.LogStreamID(1)}, + {LogStreamID: types.LogStreamID(2)}, + {LogStreamID: types.LogStreamID(3)}, + }, + }, + id: types.LogStreamID(2), + want: &LogStreamDescriptor{LogStreamID: types.LogStreamID(2)}, + wantErr: false, + }, + { + name: "NotExistingLogStream", + md: &MetadataDescriptor{ + LogStreams: []*LogStreamDescriptor{ + {LogStreamID: types.LogStreamID(1)}, + {LogStreamID: types.LogStreamID(2)}, + {LogStreamID: types.LogStreamID(3)}, + }, + }, + id: types.LogStreamID(4), + want: nil, + wantErr: true, + }, + } + + for _, tc := range tcs { + t.Run(tc.name, func(t *testing.T) { + got, err := tc.md.HaveLogStream(tc.id) + if tc.wantErr { + require.Error(t, err) + } else { + require.NoError(t, err) + } + require.Equal(t, tc.want, got) + }) + } +} + +func TestMetadataDescriptor_MustHaveLogStream(t *testing.T) { + tcs := []struct { + name string + md *MetadataDescriptor + id types.LogStreamID + want *LogStreamDescriptor + wantPanic bool + wantError bool + }{ + { + name: "NilMetadataDescriptor", + md: nil, + id: types.LogStreamID(1), + want: nil, + wantPanic: true, + wantError: false, + }, + { + name: "NotExists", + md: &MetadataDescriptor{}, + id: types.LogStreamID(1), + want: nil, + wantPanic: false, + wantError: true, + }, + { + name: "Exists", + md: &MetadataDescriptor{ + LogStreams: []*LogStreamDescriptor{ + { + LogStreamID: types.LogStreamID(1), + }, + }, + }, + id: types.LogStreamID(1), + want: &LogStreamDescriptor{LogStreamID: types.LogStreamID(1)}, + wantPanic: false, + wantError: false, + }, + } + + for _, tc := range tcs { + t.Run(tc.name, func(t *testing.T) { + if tc.wantPanic { + require.Panics(t, func() { + _, _ = tc.md.MustHaveLogStream(tc.id) + }) + return + } + + got, err := tc.md.MustHaveLogStream(tc.id) + if tc.wantError { + require.Error(t, err) + } else { + require.NoError(t, err) + } + require.Equal(t, tc.want, got) + }) + } +} + +func TestMetadataDescriptor_NotHaveLogStream(t *testing.T) { + tcs := []struct { + name string + md *MetadataDescriptor + id types.LogStreamID + wantError bool + }{ + { + name: "NilMetadataDescriptor", + md: nil, + id: types.LogStreamID(1), + wantError: true, + }, + { + name: "NotExists", + md: &MetadataDescriptor{}, + id: types.LogStreamID(1), + wantError: false, + }, + { + name: "Exists", + md: &MetadataDescriptor{ + LogStreams: []*LogStreamDescriptor{ + { + LogStreamID: types.LogStreamID(1), + }, + }, + }, + id: types.LogStreamID(1), + wantError: true, + }, + } + + for _, tc := range tcs { + t.Run(tc.name, func(t *testing.T) { + err := tc.md.NotHaveLogStream(tc.id) + if tc.wantError { + require.Error(t, err) + } else { + require.NoError(t, err) + } + }) + } +} + +func TestMetadataDescriptor_MustNotHaveLogStream(t *testing.T) { + tcs := []struct { + name string + md *MetadataDescriptor + id types.LogStreamID + wantPanic bool + wantError bool + }{ + { + name: "NilMetadataDescriptor", + md: nil, + id: types.LogStreamID(1), + wantPanic: true, + wantError: false, + }, + { + name: "NotExists", + md: &MetadataDescriptor{}, + id: types.LogStreamID(1), + wantPanic: false, + wantError: false, + }, + { + name: "Exists", + md: &MetadataDescriptor{ + LogStreams: []*LogStreamDescriptor{ + { + LogStreamID: types.LogStreamID(1), + }, + }, + }, + id: types.LogStreamID(1), + wantPanic: false, + wantError: true, + }, + } + + for _, tc := range tcs { + t.Run(tc.name, func(t *testing.T) { + if tc.wantPanic { + require.Panics(t, func() { + _ = tc.md.MustNotHaveLogStream(tc.id) + }) + return + } + + err := tc.md.MustNotHaveLogStream(tc.id) + if tc.wantError { + require.Error(t, err) + } else { + require.NoError(t, err) + } + }) + } +} + +func TestMetadataDescriptor_GetReplicasByStorageNodeID(t *testing.T) { + tcs := []struct { + name string + md *MetadataDescriptor + id types.StorageNodeID + want []*ReplicaDescriptor + }{ + { + name: "NilMetadataDescriptor", + md: nil, + id: types.StorageNodeID(1), + want: nil, + }, + { + name: "NoReplicas", + md: &MetadataDescriptor{}, + id: types.StorageNodeID(1), + want: []*ReplicaDescriptor{}, + }, + { + name: "ReplicasExist", + md: &MetadataDescriptor{ + LogStreams: []*LogStreamDescriptor{ + { + Replicas: []*ReplicaDescriptor{ + {StorageNodeID: types.StorageNodeID(1), StorageNodePath: "path1"}, + {StorageNodeID: types.StorageNodeID(2), StorageNodePath: "path2"}, + }, + }, + { + Replicas: []*ReplicaDescriptor{ + {StorageNodeID: types.StorageNodeID(1), StorageNodePath: "path3"}, + }, + }, + }, + }, + id: types.StorageNodeID(1), + want: []*ReplicaDescriptor{ + {StorageNodeID: types.StorageNodeID(1), StorageNodePath: "path1"}, + {StorageNodeID: types.StorageNodeID(1), StorageNodePath: "path3"}, + }, + }, + } + + for _, tc := range tcs { + t.Run(tc.name, func(t *testing.T) { + got := tc.md.GetReplicasByStorageNodeID(tc.id) + require.Equal(t, tc.want, got) + }) + } +} + +func TestMetadataDescriptor_GetTopic(t *testing.T) { + tcs := []struct { + name string + md *MetadataDescriptor + id types.TopicID + want *TopicDescriptor + }{ + { + name: "NilMetadataDescriptor", + md: nil, + id: types.TopicID(1), + want: nil, + }, + { + name: "TopicNotExists", + md: &MetadataDescriptor{}, + id: types.TopicID(1), + want: nil, + }, + { + name: "TopicExists", + md: &MetadataDescriptor{ + Topics: []*TopicDescriptor{ + {TopicID: types.TopicID(1)}, + {TopicID: types.TopicID(2)}, + }, + }, + id: types.TopicID(1), + want: &TopicDescriptor{TopicID: types.TopicID(1)}, + }, + } + + for _, tc := range tcs { + t.Run(tc.name, func(t *testing.T) { + got := tc.md.GetTopic(tc.id) + require.Equal(t, tc.want, got) + }) + } +} + +func TestMetadataDescriptor_InsertTopic(t *testing.T) { + tcs := []struct { + name string + md *MetadataDescriptor + topic *TopicDescriptor + want *MetadataDescriptor + wantErr bool + }{ + { + name: "NilMetadataDescriptor", + md: nil, + topic: &TopicDescriptor{TopicID: types.TopicID(1)}, + want: nil, + wantErr: false, + }, + { + name: "NilTopic", + md: &MetadataDescriptor{}, + topic: nil, + want: &MetadataDescriptor{}, + wantErr: false, + }, + { + name: "InsertNew", + md: &MetadataDescriptor{}, + topic: &TopicDescriptor{TopicID: types.TopicID(1)}, + want: &MetadataDescriptor{ + Topics: []*TopicDescriptor{ + {TopicID: types.TopicID(1)}, + }, + }, + wantErr: false, + }, + { + name: "AlreadyExists", + md: &MetadataDescriptor{ + Topics: []*TopicDescriptor{ + {TopicID: types.TopicID(1)}, + }, + }, + topic: &TopicDescriptor{TopicID: types.TopicID(1)}, + want: &MetadataDescriptor{ + Topics: []*TopicDescriptor{ + {TopicID: types.TopicID(1)}, + }, + }, + wantErr: true, + }, + } + + for _, tc := range tcs { + t.Run(tc.name, func(t *testing.T) { + err := tc.md.InsertTopic(tc.topic) + if tc.wantErr { + require.Error(t, err) + } else { + require.NoError(t, err) + } + require.Equal(t, tc.want, tc.md) + }) + } +} + +func TestMetadataDescriptor_DeleteTopic(t *testing.T) { + tcs := []struct { + name string + md *MetadataDescriptor + id types.TopicID + want *MetadataDescriptor + wantErr bool + }{ + { + name: "NilMetadataDescriptor", + md: nil, + id: types.TopicID(1), + want: nil, + wantErr: false, + }, + { + name: "NotExists", + md: &MetadataDescriptor{}, + id: types.TopicID(1), + want: &MetadataDescriptor{}, + wantErr: true, + }, + { + name: "DeleteExisting", + md: &MetadataDescriptor{ + Topics: []*TopicDescriptor{ + {TopicID: types.TopicID(1)}, + }, + }, + id: types.TopicID(1), + want: &MetadataDescriptor{ + Topics: []*TopicDescriptor{}, + }, + wantErr: false, + }, + { + name: "DeleteExistingWithMultipleTopics", + md: &MetadataDescriptor{ + Topics: []*TopicDescriptor{ + {TopicID: types.TopicID(1)}, + {TopicID: types.TopicID(2)}, + }, + }, + id: types.TopicID(1), + want: &MetadataDescriptor{ + Topics: []*TopicDescriptor{ + {TopicID: types.TopicID(2)}, + }, + }, + wantErr: false, + }, + } + + for _, tc := range tcs { + t.Run(tc.name, func(t *testing.T) { + err := tc.md.DeleteTopic(tc.id) + if tc.wantErr { + require.Error(t, err) + } else { + require.NoError(t, err) + } + require.Equal(t, tc.want, tc.md) + }) + } +} + +func TestMetadataDescriptor_UpdateTopic(t *testing.T) { + tcs := []struct { + name string + md *MetadataDescriptor + topic *TopicDescriptor + want *MetadataDescriptor + wantErr bool + }{ + { + name: "ValidUpdate", + md: &MetadataDescriptor{ + Topics: []*TopicDescriptor{ + {TopicID: types.TopicID(1)}, + }, + }, + topic: &TopicDescriptor{TopicID: types.TopicID(1), Status: TopicStatusDeleted}, + want: &MetadataDescriptor{ + Topics: []*TopicDescriptor{ + {TopicID: types.TopicID(1), Status: TopicStatusDeleted}, + }, + }, + wantErr: false, + }, + { + name: "NonExistingTopic", + md: &MetadataDescriptor{ + Topics: []*TopicDescriptor{ + {TopicID: types.TopicID(1)}, + }, + }, + topic: &TopicDescriptor{TopicID: types.TopicID(2), Status: TopicStatusDeleted}, + want: &MetadataDescriptor{ + Topics: []*TopicDescriptor{ + {TopicID: types.TopicID(1)}, + }, + }, + wantErr: true, + }, + { + name: "NilMetadataDescriptor", + md: nil, + topic: &TopicDescriptor{TopicID: types.TopicID(1)}, + want: nil, + wantErr: true, + }, + { + name: "NilTopicDescriptor", + md: &MetadataDescriptor{ + Topics: []*TopicDescriptor{ + {TopicID: types.TopicID(1)}, + }, + }, + topic: nil, + want: &MetadataDescriptor{ + Topics: []*TopicDescriptor{ + {TopicID: types.TopicID(1)}, + }, + }, + wantErr: true, + }, + } + + for _, tc := range tcs { + t.Run(tc.name, func(t *testing.T) { + err := tc.md.UpdateTopic(tc.topic) + if tc.wantErr { + require.Error(t, err) + } else { + require.NoError(t, err) + } + require.Equal(t, tc.want, tc.md) + }) + } +} + +func TestMetadataDescriptor_UpsertTopic(t *testing.T) { + tcs := []struct { + name string + md *MetadataDescriptor + topic *TopicDescriptor + want *MetadataDescriptor + wantErr bool + }{ + { + name: "InsertNewTopic", + md: &MetadataDescriptor{}, + topic: &TopicDescriptor{ + TopicID: types.TopicID(1), + Status: TopicStatusRunning, + }, + want: &MetadataDescriptor{ + Topics: []*TopicDescriptor{ + { + TopicID: types.TopicID(1), + Status: TopicStatusRunning, + }, + }, + }, + wantErr: false, + }, + { + name: "UpdateExistingTopic", + md: &MetadataDescriptor{ + Topics: []*TopicDescriptor{ + { + TopicID: types.TopicID(1), + Status: TopicStatusRunning, + }, + }, + }, + topic: &TopicDescriptor{ + TopicID: types.TopicID(1), + Status: TopicStatusDeleted, + }, + want: &MetadataDescriptor{ + Topics: []*TopicDescriptor{ + { + TopicID: types.TopicID(1), + Status: TopicStatusDeleted, + }, + }, + }, + wantErr: false, + }, + { + name: "NilMetadataDescriptor", + md: nil, + topic: &TopicDescriptor{TopicID: types.TopicID(1)}, + want: nil, + wantErr: false, + }, + { + name: "NilTopicDescriptor", + md: &MetadataDescriptor{ + Topics: []*TopicDescriptor{ + {TopicID: types.TopicID(1)}, + }, + }, + topic: nil, + want: &MetadataDescriptor{ + Topics: []*TopicDescriptor{ + {TopicID: types.TopicID(1)}, + }, + }, + wantErr: false, + }, + } + + for _, tc := range tcs { + t.Run(tc.name, func(t *testing.T) { + err := tc.md.UpsertTopic(tc.topic) + if tc.wantErr { + require.Error(t, err) + } else { + require.NoError(t, err) + } + require.Equal(t, tc.want, tc.md) + }) + } +} + +func TestMetadataDescriptor_HaveTopic(t *testing.T) { + tcs := []struct { + name string + md *MetadataDescriptor + topicID types.TopicID + want *TopicDescriptor + wantErr bool + }{ + { + name: "TopicExists", + md: &MetadataDescriptor{ + Topics: []*TopicDescriptor{ + {TopicID: types.TopicID(1)}, + }, + }, + topicID: types.TopicID(1), + want: &TopicDescriptor{TopicID: types.TopicID(1)}, + wantErr: false, + }, + { + name: "TopicNotExists", + md: &MetadataDescriptor{ + Topics: []*TopicDescriptor{ + {TopicID: types.TopicID(1)}, + }, + }, + topicID: types.TopicID(2), + want: nil, + wantErr: true, + }, + { + name: "NilMetadataDescriptor", + md: nil, + topicID: types.TopicID(1), + want: nil, + wantErr: true, + }, + } + + for _, tc := range tcs { + t.Run(tc.name, func(t *testing.T) { + got, err := tc.md.HaveTopic(tc.topicID) + if tc.wantErr { + require.Error(t, err) + } else { + require.NoError(t, err) + } + require.Equal(t, tc.want, got) + }) + } +} + +func TestMetadataDescriptor_MustHaveTopic(t *testing.T) { + tcs := []struct { + name string + md *MetadataDescriptor + topicID types.TopicID + want *TopicDescriptor + wantPanic bool + wantErr bool + }{ + { + name: "TopicExists", + md: &MetadataDescriptor{ + Topics: []*TopicDescriptor{ + {TopicID: types.TopicID(1)}, + }, + }, + topicID: types.TopicID(1), + want: &TopicDescriptor{TopicID: types.TopicID(1)}, + wantPanic: false, + wantErr: false, + }, + { + name: "TopicNotExists", + md: &MetadataDescriptor{ + Topics: []*TopicDescriptor{ + {TopicID: types.TopicID(1)}, + }, + }, + topicID: types.TopicID(2), + want: nil, + wantPanic: false, + wantErr: true, + }, + { + name: "NilMetadataDescriptor", + md: nil, + topicID: types.TopicID(1), + want: nil, + wantPanic: true, + wantErr: false, + }, + } + + for _, tc := range tcs { + t.Run(tc.name, func(t *testing.T) { + if tc.wantPanic { + require.Panics(t, func() { + _, _ = tc.md.MustHaveTopic(tc.topicID) + }) + return + } + + got, err := tc.md.MustHaveTopic(tc.topicID) + if tc.wantErr { + require.Error(t, err) + } else { + require.NoError(t, err) + } + require.Equal(t, tc.want, got) + }) + } +} + +func TestTopicDescriptor_InsertLogStream(t *testing.T) { + tcs := []struct { + name string + td *TopicDescriptor + lsID types.LogStreamID + want *TopicDescriptor + wantErr bool + }{ + { + name: "InsertNewLogStream", + td: &TopicDescriptor{}, + lsID: types.LogStreamID(1), + want: &TopicDescriptor{ + LogStreams: []types.LogStreamID{types.LogStreamID(1)}, + }, + wantErr: false, + }, + { + name: "InsertExistingLogStream", + td: &TopicDescriptor{ + LogStreams: []types.LogStreamID{types.LogStreamID(1)}, + }, + lsID: types.LogStreamID(1), + want: &TopicDescriptor{ + LogStreams: []types.LogStreamID{types.LogStreamID(1)}, + }, + wantErr: false, + }, + { + name: "NilTopicDescriptor", + td: nil, + lsID: types.LogStreamID(1), + want: nil, + wantErr: false, + }, + } + + for _, tc := range tcs { + t.Run(tc.name, func(t *testing.T) { + tc.td.InsertLogStream(tc.lsID) + require.Equal(t, tc.want, tc.td) + }) + } +} + +func TestTopicDescriptor_HasLogStream(t *testing.T) { + tcs := []struct { + name string + td *TopicDescriptor + lsID types.LogStreamID + want bool + }{ + { + name: "LogStreamExists", + td: &TopicDescriptor{ + LogStreams: []types.LogStreamID{types.LogStreamID(1)}, + }, + lsID: types.LogStreamID(1), + want: true, + }, + { + name: "LogStreamNotExists", + td: &TopicDescriptor{ + LogStreams: []types.LogStreamID{types.LogStreamID(1)}, + }, + lsID: types.LogStreamID(2), + want: false, + }, + { + name: "NilTopicDescriptor", + td: nil, + lsID: types.LogStreamID(1), + want: false, + }, + } + + for _, tc := range tcs { + t.Run(tc.name, func(t *testing.T) { + got := tc.td.HasLogStream(tc.lsID) + require.Equal(t, tc.want, got) + }) + } +} + +func TestLogSequenceNumber_Invalid(t *testing.T) { + tcs := []struct { + name string + lsn LogSequenceNumber + want bool + }{ + { + name: "BothValid", + lsn: LogSequenceNumber{ + LLSN: types.LLSN(1), + GLSN: types.GLSN(1), + }, + want: false, + }, + { + name: "LLSNInvalid", + lsn: LogSequenceNumber{ + LLSN: types.InvalidLLSN, + GLSN: types.GLSN(1), + }, + want: true, + }, + { + name: "GLSNInvalid", + lsn: LogSequenceNumber{ + LLSN: types.LLSN(1), + GLSN: types.InvalidGLSN, + }, + want: true, + }, + { + name: "BothInvalid", + lsn: LogSequenceNumber{ + LLSN: types.InvalidLLSN, + GLSN: types.InvalidGLSN, + }, + want: true, + }, + } + + for _, tc := range tcs { + t.Run(tc.name, func(t *testing.T) { + got := tc.lsn.Invalid() + require.Equal(t, tc.want, got) }) } } diff --git a/proto/varlogpb/replica_test.go b/proto/varlogpb/replica_test.go new file mode 100644 index 000000000..ca9a337c5 --- /dev/null +++ b/proto/varlogpb/replica_test.go @@ -0,0 +1,152 @@ +package varlogpb_test + +import ( + "testing" + + "github.com/stretchr/testify/require" + + "github.com/kakao/varlog/proto/varlogpb" +) + +func TestEqualReplicas(t *testing.T) { + tcs := []struct { + name string + xs []varlogpb.LogStreamReplica + ys []varlogpb.LogStreamReplica + want bool + }{ + { + name: "EqualReplicas", + xs: []varlogpb.LogStreamReplica{ + { + StorageNode: varlogpb.StorageNode{StorageNodeID: 1}, + TopicLogStream: varlogpb.TopicLogStream{LogStreamID: 1}, + }, + }, + ys: []varlogpb.LogStreamReplica{ + { + StorageNode: varlogpb.StorageNode{StorageNodeID: 1}, + TopicLogStream: varlogpb.TopicLogStream{LogStreamID: 1}, + }, + }, + want: true, + }, + { + name: "DifferentLengths", + xs: []varlogpb.LogStreamReplica{ + { + StorageNode: varlogpb.StorageNode{StorageNodeID: 1}, + TopicLogStream: varlogpb.TopicLogStream{LogStreamID: 1}, + }, + }, + ys: []varlogpb.LogStreamReplica{}, + want: false, + }, + { + name: "DifferentStorageNodeID", + xs: []varlogpb.LogStreamReplica{ + { + StorageNode: varlogpb.StorageNode{StorageNodeID: 1}, + TopicLogStream: varlogpb.TopicLogStream{LogStreamID: 1}, + }, + }, + ys: []varlogpb.LogStreamReplica{ + { + StorageNode: varlogpb.StorageNode{StorageNodeID: 2}, + TopicLogStream: varlogpb.TopicLogStream{LogStreamID: 1}, + }, + }, + want: false, + }, + { + name: "DifferentLogStreamID", + xs: []varlogpb.LogStreamReplica{ + { + StorageNode: varlogpb.StorageNode{StorageNodeID: 1}, + TopicLogStream: varlogpb.TopicLogStream{LogStreamID: 1}, + }, + }, + ys: []varlogpb.LogStreamReplica{ + { + StorageNode: varlogpb.StorageNode{StorageNodeID: 1}, + TopicLogStream: varlogpb.TopicLogStream{LogStreamID: 2}, + }, + }, + want: false, + }, + } + + for _, tc := range tcs { + t.Run(tc.name, func(t *testing.T) { + got := varlogpb.EqualReplicas(tc.xs, tc.ys) + require.Equal(t, tc.want, got) + }) + } +} + +func TestValidReplicas(t *testing.T) { + tcs := []struct { + name string + replicas []varlogpb.LogStreamReplica + wantErr bool + }{ + { + name: "ValidReplicas", + replicas: []varlogpb.LogStreamReplica{ + { + StorageNode: varlogpb.StorageNode{StorageNodeID: 1}, + TopicLogStream: varlogpb.TopicLogStream{LogStreamID: 1}, + }, + { + StorageNode: varlogpb.StorageNode{StorageNodeID: 2}, + TopicLogStream: varlogpb.TopicLogStream{LogStreamID: 1}, + }, + }, + wantErr: false, + }, + { + name: "NoReplicas", + replicas: []varlogpb.LogStreamReplica{}, + wantErr: true, + }, + { + name: "LogStreamIDMismatch", + replicas: []varlogpb.LogStreamReplica{ + { + StorageNode: varlogpb.StorageNode{StorageNodeID: 1}, + TopicLogStream: varlogpb.TopicLogStream{LogStreamID: 1}, + }, + { + StorageNode: varlogpb.StorageNode{StorageNodeID: 2}, + TopicLogStream: varlogpb.TopicLogStream{LogStreamID: 2}, + }, + }, + wantErr: true, + }, + { + name: "StorageNodeIDDuplicated", + replicas: []varlogpb.LogStreamReplica{ + { + StorageNode: varlogpb.StorageNode{StorageNodeID: 1}, + TopicLogStream: varlogpb.TopicLogStream{LogStreamID: 1}, + }, + { + StorageNode: varlogpb.StorageNode{StorageNodeID: 1}, + TopicLogStream: varlogpb.TopicLogStream{LogStreamID: 1}, + }, + }, + wantErr: true, + }, + } + + for _, tc := range tcs { + t.Run(tc.name, func(t *testing.T) { + err := varlogpb.ValidReplicas(tc.replicas) + if tc.wantErr { + require.Error(t, err) + } else { + require.NoError(t, err) + } + }) + } +} diff --git a/proto/varlogpb/storage_node_test.go b/proto/varlogpb/storage_node_test.go new file mode 100644 index 000000000..1f30ccbbd --- /dev/null +++ b/proto/varlogpb/storage_node_test.go @@ -0,0 +1,62 @@ +package varlogpb_test + +import ( + "testing" + + "github.com/stretchr/testify/require" + + "github.com/kakao/varlog/proto/varlogpb" +) + +func TestStorageNodeDescriptor_Valid(t *testing.T) { + tcs := []struct { + name string + snd *varlogpb.StorageNodeDescriptor + want bool + }{ + { + name: "Valid", + snd: &varlogpb.StorageNodeDescriptor{ + StorageNode: varlogpb.StorageNode{StorageNodeID: 1, Address: "127.0.0.1"}, + Paths: []string{"/path/to/storage"}, + }, + want: true, + }, + { + name: "NilDescriptor", + snd: nil, + want: false, + }, + { + name: "EmptyAddress", + snd: &varlogpb.StorageNodeDescriptor{ + StorageNode: varlogpb.StorageNode{StorageNodeID: 1, Address: ""}, + Paths: []string{"/path/to/storage"}, + }, + want: false, + }, + { + name: "EmptyPaths", + snd: &varlogpb.StorageNodeDescriptor{ + StorageNode: varlogpb.StorageNode{StorageNodeID: 1, Address: "127.0.0.1"}, + Paths: []string{}, + }, + want: false, + }, + { + name: "EmptyPathInPaths", + snd: &varlogpb.StorageNodeDescriptor{ + StorageNode: varlogpb.StorageNode{StorageNodeID: 1, Address: "127.0.0.1"}, + Paths: []string{""}, + }, + want: false, + }, + } + + for _, tc := range tcs { + t.Run(tc.name, func(t *testing.T) { + got := tc.snd.Valid() + require.Equal(t, tc.want, got) + }) + } +}