diff --git a/CHANGES.md b/CHANGES.md
index afb1d8ad2..c418e9cfb 100644
--- a/CHANGES.md
+++ b/CHANGES.md
@@ -7,6 +7,7 @@ Release Notes.
### Features
- List all properties in a group.
+- Implement Write-ahead Logging.
### Bugs
diff --git a/api/common/id.go b/api/common/id.go
index 6cc879ca8..0d8c58907 100644
--- a/api/common/id.go
+++ b/api/common/id.go
@@ -40,6 +40,32 @@ func (s SeriesID) Marshal() []byte {
	return convert.Uint64ToBytes(uint64(s))
}

+// GlobalSeriesID identifies a series in a shard.
+type GlobalSeriesID struct {
+	Name     string
+	SeriesID SeriesID
+}
+
+// Marshal encodes a global series id to bytes.
+func (s GlobalSeriesID) Marshal() []byte {
+	seriesIDBytes := convert.Uint64ToBytes(uint64(s.SeriesID))
+	nameBytes := []byte(s.Name)
+	return append(seriesIDBytes, nameBytes...)
+}
+
+// Volume returns the estimated size in bytes of the global series id.
+func (s GlobalSeriesID) Volume() int {
+	return 8 + len(s.Name)
+}
+
+// ParseGlobalSeriesID parses a global series id from bytes.
+func ParseGlobalSeriesID(b []byte) GlobalSeriesID {
+	return GlobalSeriesID{
+		SeriesID: SeriesID(convert.BytesToUint64(b[:8])),
+		Name:     string(b[8:]),
+	}
+}
+
// positionKey is a context key to store the module position.
var positionKey = contextPositionKey{}
diff --git a/go.mod b/go.mod
index 373eafd9e..5eea559ec 100644
--- a/go.mod
+++ b/go.mod
@@ -74,7 +74,7 @@ require (
	github.com/golang/glog v1.1.0 // indirect
	github.com/golang/groupcache v0.0.0-20210331224755-41bb18bfe9da // indirect
	github.com/golang/protobuf v1.5.3 // indirect
-	github.com/golang/snappy v0.0.3 // indirect
+	github.com/golang/snappy v0.0.3
	github.com/google/btree v1.1.2 // indirect
	github.com/google/flatbuffers v1.12.1 // indirect
	github.com/google/pprof v0.0.0-20230705174524-200ffdc848b8 // indirect
diff --git a/pkg/run/channel_closer.go b/pkg/run/channel_closer.go
new file mode 100644
index 000000000..54c627070
--- /dev/null
+++ b/pkg/run/channel_closer.go
@@ -0,0 +1,124 @@
+// Licensed to Apache Software Foundation (ASF) under one or more contributor
+// license agreements. See the NOTICE file distributed with
+// this work for additional information regarding copyright
+// ownership. Apache Software Foundation (ASF) licenses this file to you under
+// the Apache License, Version 2.0 (the "License"); you may
+// not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+//     http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package run
+
+import (
+	"context"
+	"sync"
+)
+
+var dummyChannelCloserChan <-chan struct{}
+
+// ChannelCloser can close a group of sender and receiver goroutines, then wait for them to stop.
+type ChannelCloser struct {
+	ctx      context.Context
+	cancel   context.CancelFunc
+	sender   sync.WaitGroup
+	receiver sync.WaitGroup
+	lock     sync.RWMutex
+	closed   bool
+}
+
+// NewChannelCloser creates a new ChannelCloser.
+func NewChannelCloser() *ChannelCloser {
+	c := &ChannelCloser{}
+	c.ctx, c.cancel = context.WithCancel(context.Background())
+	c.sender.Add(1)
+	c.receiver.Add(1)
+	return c
+}
+
+// AddSender adds a running sender.
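+// It returns false once the closer is closed; in that case the caller
+// must not send on the guarded channel. A minimal, illustrative sender
+// loop (ch is a hypothetical user-owned channel, not part of this file):
+//
+//	for closer.AddSender() {
+//		ch <- item
+//		closer.SenderDone()
+//	}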
+func (c *ChannelCloser) AddSender() bool {
+	if c == nil {
+		return false
+	}
+	c.lock.RLock()
+	defer c.lock.RUnlock()
+	if c.closed {
+		return false
+	}
+	c.sender.Add(1)
+	return true
+}
+
+// SenderDone notifies that a running sender is done.
+func (c *ChannelCloser) SenderDone() {
+	if c == nil {
+		return
+	}
+	c.sender.Done()
+}
+
+// AddReceiver adds a running receiver.
+func (c *ChannelCloser) AddReceiver() bool {
+	if c == nil {
+		return false
+	}
+	c.lock.RLock()
+	defer c.lock.RUnlock()
+	if c.closed {
+		return false
+	}
+	c.receiver.Add(1)
+	return true
+}
+
+// ReceiverDone notifies that a running receiver is done.
+func (c *ChannelCloser) ReceiverDone() {
+	if c == nil {
+		return
+	}
+	c.receiver.Done()
+}
+
+// CloseNotify returns a channel that is closed when CloseThenWait is called.
+func (c *ChannelCloser) CloseNotify() <-chan struct{} {
+	if c == nil {
+		return dummyChannelCloserChan
+	}
+	return c.ctx.Done()
+}
+
+// CloseThenWait closes all tasks and then waits until they are done.
+func (c *ChannelCloser) CloseThenWait() {
+	if c == nil {
+		return
+	}
+
+	c.lock.Lock()
+	c.closed = true
+	c.lock.Unlock()
+
+	c.sender.Done()
+	c.sender.Wait()
+
+	c.cancel()
+	c.receiver.Done()
+	c.receiver.Wait()
+}
+
+// Closed returns whether the ChannelCloser is closed.
+func (c *ChannelCloser) Closed() bool {
+	if c == nil {
+		return true
+	}
+	c.lock.RLock()
+	defer c.lock.RUnlock()
+	return c.closed
+}
diff --git a/pkg/run/channel_closer_test.go b/pkg/run/channel_closer_test.go
new file mode 100644
index 000000000..9cc6bb5bb
--- /dev/null
+++ b/pkg/run/channel_closer_test.go
@@ -0,0 +1,276 @@
+// Licensed to Apache Software Foundation (ASF) under one or more contributor
+// license agreements. See the NOTICE file distributed with
+// this work for additional information regarding copyright
+// ownership. Apache Software Foundation (ASF) licenses this file to you under
+// the Apache License, Version 2.0 (the "License"); you may
+// not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+//     http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+ +package run + +import ( + "fmt" + "sync" + "time" + + "github.com/onsi/ginkgo/v2" + "github.com/onsi/gomega" + "github.com/onsi/gomega/gleak" + + "github.com/apache/skywalking-banyandb/pkg/test/flags" +) + +var _ = ginkgo.Describe("ChannelCloser", func() { + var goods []gleak.Goroutine + ginkgo.BeforeEach(func() { + goods = gleak.Goroutines() + }) + ginkgo.AfterEach(func() { + gomega.Eventually(gleak.Goroutines, flags.EventuallyTimeout).ShouldNot(gleak.HaveLeaked(goods)) + }) + + ginkgo.Context("ChannelCloser", func() { + ginkgo.It("close simple channel", func() { + var wg sync.WaitGroup + + chanL1 := make(chan struct{}) + workerNum := 10 + chanNum := 1 + wg.Add(workerNum + chanNum) + + chanCloser := NewChannelCloser() + + for i := 0; i < workerNum; i++ { + go func(index int) { + fmt.Printf("Start worker - %d\n", index) + wg.Done() + + for { + if chanCloser.AddSender() { + time.Sleep(5 * time.Millisecond) + chanL1 <- struct{}{} + chanCloser.SenderDone() + } else { + fmt.Printf("Stop worker - %d\n", index) + return + } + } + }(i) + } + + go func() { + gomega.Expect(chanCloser.AddReceiver()).To(gomega.BeTrue()) + defer func() { + fmt.Printf("Stop consumer: chanL1\n") + chanCloser.ReceiverDone() + }() + + fmt.Printf("Start consumer: chanL1\n") + wg.Done() + + for { + select { + case <-chanL1: + time.Sleep(10 * time.Millisecond) + case <-chanCloser.CloseNotify(): + return + } + } + }() + + wg.Wait() + + fmt.Printf("Start close...\n") + chanCloser.CloseThenWait() + fmt.Printf("Stop close\n") + }) + + ginkgo.It("close multiple channels", func() { + var wg sync.WaitGroup + + chanA := make(chan struct{}) + chanB := make(chan struct{}) + chanAWorkerNum := 10 + chanBWorkerNum := 10 + chanNum := 2 + wg.Add(chanAWorkerNum + chanBWorkerNum + chanNum) + + chanCloser := NewChannelCloser() + + for i := 0; i < chanAWorkerNum; i++ { + go func(index int) { + fmt.Printf("Start chan-a worker - %d\n", index) + wg.Done() + + for { + if chanCloser.AddSender() { + time.Sleep(5 * time.Millisecond) + chanA <- struct{}{} + chanCloser.SenderDone() + } else { + fmt.Printf("Stop chan-a worker - %d\n", index) + return + } + } + }(i) + } + + for i := 0; i < chanBWorkerNum; i++ { + go func(index int) { + fmt.Printf("Start chan-b worker - %d\n", index) + wg.Done() + + for { + if chanCloser.AddSender() { + time.Sleep(5 * time.Millisecond) + chanB <- struct{}{} + chanCloser.SenderDone() + } else { + fmt.Printf("Stop chan-b worker - %d\n", index) + return + } + } + }(i) + } + + go func() { + gomega.Expect(chanCloser.AddReceiver()).To(gomega.BeTrue()) + defer func() { + fmt.Printf("Stop consumer: chan-a\n") + chanCloser.ReceiverDone() + }() + + fmt.Printf("Start consumer: chan-a\n") + wg.Done() + + for { + select { + case <-chanA: + time.Sleep(10 * time.Millisecond) + case <-chanCloser.CloseNotify(): + return + } + } + }() + + go func() { + gomega.Expect(chanCloser.AddReceiver()).To(gomega.BeTrue()) + defer func() { + fmt.Printf("Stop consumer: chan-b\n") + chanCloser.ReceiverDone() + }() + + fmt.Printf("Start consumer: chan-b\n") + wg.Done() + + for { + select { + case <-chanB: + time.Sleep(10 * time.Millisecond) + case <-chanCloser.CloseNotify(): + return + } + } + }() + + wg.Wait() + + fmt.Printf("Start close...\n") + + chanCloser.CloseThenWait() + + fmt.Printf("Stop close\n") + }) + }) + + ginkgo.Context("ChannelGroupCloser", func() { + ginkgo.It("close channel group", func() { + var wg sync.WaitGroup + + chanL1 := make(chan struct{}) + chanL2 := make(chan struct{}) + workerNum := 10 + chanNum := 2 + wg.Add(workerNum + 
chanNum)
+
+			chanL1Closer := NewChannelCloser()
+			chanL2Closer := NewChannelCloser()
+			channelGroupCloser := NewChannelGroupCloser(chanL1Closer, chanL2Closer)
+
+			for i := 0; i < workerNum; i++ {
+				go func(index int) {
+					fmt.Printf("Start worker - %d\n", index)
+					wg.Done()
+
+					for {
+						if chanL1Closer.AddSender() {
+							time.Sleep(5 * time.Millisecond)
+							chanL1 <- struct{}{}
+							chanL1Closer.SenderDone()
+						} else {
+							fmt.Printf("Stop worker - %d\n", index)
+							return
+						}
+					}
+				}(i)
+			}
+
+			go func() {
+				gomega.Expect(chanL1Closer.AddReceiver()).To(gomega.BeTrue())
+				defer func() {
+					fmt.Printf("Stop consumer: chanL1\n")
+					chanL1Closer.ReceiverDone()
+				}()
+
+				fmt.Printf("Start consumer: chanL1\n")
+				wg.Done()
+
+				for {
+					select {
+					case req := <-chanL1:
+						chanL2 <- req
+					case <-chanL1Closer.CloseNotify():
+						return
+					}
+				}
+			}()
+
+			go func() {
+				gomega.Expect(chanL2Closer.AddReceiver()).To(gomega.BeTrue())
+				defer func() {
+					fmt.Printf("Stop consumer: chanL2\n")
+					chanL2Closer.ReceiverDone()
+				}()
+
+				fmt.Printf("Start consumer: chanL2\n")
+				wg.Done()
+
+				for {
+					select {
+					case <-chanL2:
+						time.Sleep(10 * time.Millisecond)
+					case <-chanL2Closer.CloseNotify():
+						return
+					}
+				}
+			}()
+
+			wg.Wait()
+
+			fmt.Printf("Start close...\n")
+
+			channelGroupCloser.CloseThenWait()
+
+			fmt.Printf("Stop close\n")
+		})
+	})
+})
diff --git a/pkg/run/channel_group_closer.go b/pkg/run/channel_group_closer.go
new file mode 100644
index 000000000..750c9f031
--- /dev/null
+++ b/pkg/run/channel_group_closer.go
@@ -0,0 +1,59 @@
+// Licensed to Apache Software Foundation (ASF) under one or more contributor
+// license agreements. See the NOTICE file distributed with
+// this work for additional information regarding copyright
+// ownership. Apache Software Foundation (ASF) licenses this file to you under
+// the Apache License, Version 2.0 (the "License"); you may
+// not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+//     http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package run
+
+import (
+	"sync"
+)
+
+// ChannelGroupCloser can close a group of ChannelClosers and then wait for them to stop.
+type ChannelGroupCloser struct {
+	group  []*ChannelCloser
+	lock   sync.RWMutex
+	closed bool
+}
+
+// NewChannelGroupCloser creates a new ChannelGroupCloser.
+func NewChannelGroupCloser(closer ...*ChannelCloser) *ChannelGroupCloser {
+	return &ChannelGroupCloser{group: closer}
+}
+
+// CloseThenWait closes all closers in sequence and then waits until they are done.
+func (c *ChannelGroupCloser) CloseThenWait() {
+	if c == nil {
+		return
+	}
+
+	c.lock.Lock()
+	c.closed = true
+	c.lock.Unlock()
+
+	for _, closer := range c.group {
+		closer.CloseThenWait()
+	}
+}
+
+// Closed returns whether the ChannelGroupCloser is closed.
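+// It reports true as soon as CloseThenWait has been invoked, even while
+// the member closers are still draining their channels.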
+func (c *ChannelGroupCloser) Closed() bool {
+	if c == nil {
+		return true
+	}
+	c.lock.RLock()
+	defer c.lock.RUnlock()
+	return c.closed
+}
diff --git a/pkg/wal/README.md b/pkg/wal/README.md
new file mode 100644
index 000000000..e5d7b8771
--- /dev/null
+++ b/pkg/wal/README.md
@@ -0,0 +1,145 @@
+# WAL
+
+## Benchmark
+
+Testing environment:
+
+```text
+MacBook Pro (13-inch, M1, 2020)
+Processor: Apple M1
+Memory: 16 GB
+CPU: 8 cores
+```
+
+Command used:
+
+```shell
+go test -bench=. -benchmem -run=^$ -benchtime=10s -count 1 -memprofile=mem_profile.out -cpuprofile=cpu_profile.out
+```
+
+Test report:
+
+```text
+goos: darwin
+goarch: arm64
+pkg: github.com/apache/skywalking-banyandb/pkg/wal
+
+Benchmark_SeriesID_1-8 299770 41357 ns/op 2654 B/op 3 allocs/op
+Benchmark_SeriesID_20-8 245113 42125 ns/op 1916 B/op 5 allocs/op
+Benchmark_SeriesID_100-8 296856 42177 ns/op 1291 B/op 7 allocs/op
+Benchmark_SeriesID_500-8 275554 42360 ns/op 1543 B/op 7 allocs/op
+Benchmark_SeriesID_1000-8 289639 39556 ns/op 1543 B/op 7 allocs/op
+Benchmark_SeriesID_1000_Buffer_64K-8 282884 38827 ns/op 1543 B/op 7 allocs/op
+Benchmark_SeriesID_1000_Buffer_128K-8 606891 20238 ns/op 1534 B/op 7 allocs/op
+Benchmark_SeriesID_1000_Buffer_512K-8 1958060 5764 ns/op 1553 B/op 7 allocs/op
+Benchmark_SeriesID_1000_Buffer_1MB-8 4478250 2738 ns/op 1522 B/op 6 allocs/op
+Benchmark_SeriesID_1000_Buffer_2MB-8 7515986 1537 ns/op 1818 B/op 5 allocs/op
+Benchmark_SeriesID_1000_Buffer_64K_NoSyncFlush-8 7249369 1676 ns/op 1542 B/op 7 allocs/op
+Benchmark_SeriesID_1000_Buffer_128K_NoSyncFlush-8 7443198 1632 ns/op 1534 B/op 7 allocs/op
+Benchmark_SeriesID_1000_Buffer_512K_NoSyncFlush-8 7239253 1631 ns/op 1553 B/op 7 allocs/op
+Benchmark_SeriesID_1000_Buffer_1MB_NoSyncFlush-8 8047040 1497 ns/op 1521 B/op 6 allocs/op
+Benchmark_SeriesID_1000_Buffer_2MB_NoSyncFlush-8 7938543 1508 ns/op 1818 B/op 5 allocs/op
+Benchmark_SeriesID_1000_Buffer_64K_NoSyncFlush_And_Rotate_16MB-8 7526020 1610 ns/op 1542 B/op 7 allocs/op
+Benchmark_SeriesID_1000_Buffer_64K_NoSyncFlush_And_Rotate_32MB-8 7432317 1591 ns/op 1542 B/op 7 allocs/op
+Benchmark_SeriesID_1000_Buffer_64K_NoSyncFlush_And_Rotate_64MB-8 7764439 1568 ns/op 1542 B/op 7 allocs/op
+```
+
+*Conclusions from the benchmark:*
+- Disabling synchronous file flushing (`NoSync`) significantly improves write performance.
+- Increasing the buffer size improves throughput.
+
+### CPU profile
+
+Open it with `go tool pprof -http="127.0.0.1:8080" cpu_profile.out` to get the following report:
+
+```text
+Flat Flat% Sum% Cum Cum% Name Inlined?
+81.19s 29.74% 29.74% 81.19s 29.74% runtime.pthread_cond_signal +65.81s 24.11% 53.84% 65.81s 24.11% runtime.pthread_cond_wait +49.78s 18.23% 72.08% 49.78s 18.23% runtime.usleep +31.62s 11.58% 83.66% 31.62s 11.58% syscall.syscall +7.06s 2.59% 86.25% 7.06s 2.59% runtime.pthread_kill +6.74s 2.47% 88.71% 6.74s 2.47% runtime.madvise +2.43s 0.89% 89.60% 2.76s 1.01% github.com/golang/snappy.encodeBlock +2.11s 0.77% 90.38% 6.77s 2.48% runtime.scanobject +1.40s 0.51% 90.89% 2.36s 0.86% runtime.greyobject +1.32s 0.48% 91.37% 1.84s 0.67% runtime.mapaccess1 +1.03s 0.38% 91.75% 1.47s 0.54% runtime.findObject +0.98s 0.36% 92.11% 38.51s 14.11% runtime.stealWork +0.64s 0.23% 92.34% 2.43s 0.89% runtime.mallocgc +0.61s 0.22% 92.57% 2.34s 0.86% runtime.mapassign +0.33s 0.12% 92.69% 14.24s 5.22% runtime.gcDrain +0.20s 0.07% 92.76% 13.41s 4.91% runtime.lock2 +0.20s 0.07% 92.84% 4.42s 1.62% github.com/apache/skywalking-banyandb/pkg/wal.(*buffer).write (inline) +0.12s 0.04% 92.88% 112.36s 41.16% runtime.findRunnable +0.10s 0.04% 92.92% 36.86s 13.50% runtime.runqgrab +0.09s 0.03% 92.95% 1.57s 0.58% runtime.growslice +0.06s 0.02% 92.97% 36.91s 13.52% github.com/apache/skywalking-banyandb/pkg/wal.(*log).flushBuffer +0.06s 0.02% 92.99% 1.76s 0.64% github.com/apache/skywalking-banyandb/pkg/wal.(*buffer).notifyRequests +0.05s 0.02% 93.01% 106.31s 38.94% runtime.systemstack +0.05s 0.02% 93.03% 6.56s 2.40% runtime.(*mheap).allocSpan +0.05s 0.02% 93.05% 5.78s 2.12% runtime.(*gcWork).balance +0.04s 0.01% 93.06% 8.32s 3.05% runtime.gcBgMarkWorker +0.03s 0.01% 93.07% 36.89s 13.51% runtime.runqsteal +0.02s 0.01% 93.08% 67.03s 24.55% runtime.semasleep +0.02s 0.01% 93.09% 117.19s 42.93% runtime.schedule +0.02s 0.01% 93.10% 6.49s 2.38% runtime.preemptone +0.02s 0.01% 93.10% 2.80s 1.03% github.com/golang/snappy.Encode +0.02s 0.01% 93.11% 7.13s 2.61% github.com/apache/skywalking-banyandb/pkg/wal.(*log).start.func1 +0.01s 0.00% 93.11% 1.41s 0.52% time.NewTimer +0.01s 0.00% 93.12% 83.77s 30.68% runtime.wakep +0.01s 0.00% 93.12% 7.07s 2.59% runtime.signalM (inline) +0.01s 0.00% 93.12% 65.70s 24.07% runtime.notesleep +0.01s 0.00% 93.13% 3.79s 1.39% runtime.newstack +0.01s 0.00% 93.13% 78.99s 28.93% runtime.goready.func1 +0.01s 0.00% 93.14% 2.60s 0.95% runtime.forEachP +0.01s 0.00% 93.14% 5.94s 2.18% runtime.(*mheap).alloc.func1 +0.01s 0.00% 93.14% 26.23s 9.61% os.(*File).Write +0.01s 0.00% 93.15% 26.22s 9.60% internal/poll.(*FD).Write +0.01s 0.00% 93.15% 31.28s 11.46% github.com/apache/skywalking-banyandb/pkg/wal.(*log).writeWorkSegment +0.01s 0.00% 93.15% 38.70s 14.18% github.com/apache/skywalking-banyandb/pkg/wal.(*log).start.func2 +0.01s 0.00% 93.16% 2.81s 1.03% github.com/apache/skywalking-banyandb/pkg/wal.(*bufferWriter).WriteData +0 0.00% 93.16% 26.21s 9.60% syscall.write +0 0.00% 93.16% 5.04s 1.85% syscall.fcntl +0 0.00% 93.16% 26.21s 9.60% syscall.Write (inline) +0 0.00% 93.16% 6.30s 2.31% runtime.sysUsedOS (inline) +0 0.00% 93.16% 6.31s 2.31% runtime.sysUsed (inline) +0 0.00% 93.16% 66.66s 24.42% runtime.stopm +0 0.00% 93.16% 80.05s 29.32% runtime.startm +0 0.00% 93.16% 1.42s 0.52% runtime.startTheWorldWithSema +0 0.00% 93.16% 81.29s 29.78% runtime.semawakeup +0 0.00% 93.16% 4.51s 1.65% runtime.resetspinning +0 0.00% 93.16% 78.98s 28.93% runtime.ready +0 0.00% 93.16% 7.07s 2.59% runtime.preemptM +0 0.00% 93.16% 115.92s 42.46% runtime.park_m +0 0.00% 93.16% 13.03s 4.77% runtime.osyield (inline) +0 0.00% 93.16% 80.58s 29.52% runtime.notewakeup +0 0.00% 93.16% 3.58s 1.31% runtime.morestack +0 0.00% 93.16% 116.10s 42.53% 
runtime.mcall
+0 0.00% 93.16% 1.45s 0.53% runtime.markroot
+0 0.00% 93.16% 65.70s 24.07% runtime.mPark (inline)
+0 0.00% 93.16% 13.41s 4.91% runtime.lockWithRank (inline)
+0 0.00% 93.16% 13.41s 4.91% runtime.lock (inline)
+0 0.00% 93.16% 3.35s 1.23% runtime.goschedImpl
+0 0.00% 93.16% 3.32s 1.22% runtime.gopreempt_m
+0 0.00% 93.16% 1.49s 0.55% runtime.gcMarkDone.func1
+0 0.00% 93.16% 14.24s 5.22% runtime.gcBgMarkWorker.func2
+0 0.00% 93.16% 5.55s 2.03% runtime.(*gcControllerState).enlistWorker
+0 0.00% 93.16% 26.22s 9.60% os.(*File).write (inline)
+0 0.00% 93.16% 5.04s 1.85% os.(*File).Sync
+0 0.00% 93.16% 26.21s 9.60% internal/poll.ignoringEINTRIO (inline)
+0 0.00% 93.16% 5.04s 1.85% internal/poll.ignoringEINTR (inline)
+0 0.00% 93.16% 5.04s 1.85% internal/poll.(*FD).Fsync.func1 (inline)
+0 0.00% 93.16% 5.04s 1.85% internal/poll.(*FD).Fsync
+```
+
+### Memory profile
+
+Open it with `go tool pprof -http="127.0.0.1:8080" mem_profile.out` to get the following report:
+
+```text
+Flat Flat% Sum% Cum Cum% Name
+117496.32MB 87.39% 87.39% 117496.32MB 87.39% github.com/apache/skywalking-banyandb/pkg/wal.(*buffer).write
+16789.80MB 12.49% 99.88% 16789.80MB 12.49% time.NewTimer
+2.50MB 0.00% 99.88% 134335.12MB 99.91% github.com/apache/skywalking-banyandb/pkg/wal.(*log).start.func1
+```
\ No newline at end of file
diff --git a/pkg/wal/wal.go b/pkg/wal/wal.go
index bf325d0d9..39b022828 100644
--- a/pkg/wal/wal.go
+++ b/pkg/wal/wal.go
@@ -19,20 +19,59 @@ package wal
import (
+	"bytes"
+	"container/list"
+	"encoding/binary"
+	"fmt"
+	"math"
+	"os"
+	"path/filepath"
+	"strconv"
+	"strings"
+	"sync"
	"time"
+
+	"github.com/golang/snappy"
+	"github.com/pkg/errors"
+	"go.uber.org/multierr"
+
	"github.com/apache/skywalking-banyandb/api/common"
+	"github.com/apache/skywalking-banyandb/pkg/encoding"
+	"github.com/apache/skywalking-banyandb/pkg/logger"
+	"github.com/apache/skywalking-banyandb/pkg/run"
)

-// SegmentID identities a segment in a WAL.
-type SegmentID uint64
+const (
+	moduleName             = "wal"
+	segmentNamePrefix      = "seg"
+	segmentNameSuffix      = ".wal"
+	batchLength            = 8
+	entryLength            = 8
+	seriesIDLength         = 2
+	seriesCountLength      = 4
+	timestampVolumeLength  = 8
+	timestampsBinaryLength = 2
+	valuesBinaryLength     = 2
+	parseTimeStr           = "2006-01-02 15:04:05"
+	maxRetries             = 3
+	maxSegmentID           = uint64(math.MaxUint64) - 1
+)

-// Options for creating Write-ahead Logging.
-type Options struct{}
+// DefaultOptions for Open().
+var DefaultOptions = &Options{
+	FileSize:            67108864, // 64MB
+	BufferSize:          65535,    // 64KB
+	BufferBatchInterval: 3 * time.Second,
+	NoSync:              false,
+}

-// Segment allows reading underlying segments that hold WAl entities.
-type Segment interface {
-	GetSegmentID() SegmentID
+// Options for creating Write-ahead Logging.
+type Options struct {
+	FileSize            int
+	BufferSize          int
+	BufferBatchInterval time.Duration
+	FlushQueueSize      int
+	NoSync              bool
}

// WAL denotes a Write-ahead logging.
@@ -42,19 +81,984 @@
type WAL interface {
	// Write a logging entity.
	// It will return immediately when the data is written in the buffer,
-	// The returned function will be called when the entity is flushed on the persistent storage.
-	Write(seriesID common.SeriesID, timestamp time.Time, data []byte) (func(), error)
+	// The callback function will be called once the entity has been flushed to persistent storage.
+	Write(seriesID common.GlobalSeriesID, timestamp time.Time, data []byte, callback func(common.GlobalSeriesID, time.Time, []byte, error))
	// Read specified segment by SegmentID.
-	Read(segmentID SegmentID) (*Segment, error)
+	Read(segmentID SegmentID) (Segment, error)
	// ReadAllSegments reads all segments sorted by their creation time in ascending order.
-	ReadAllSegments() ([]*Segment, error)
+	ReadAllSegments() ([]Segment, error)
	// Rotate closes the open segment and opens a new one, returning the closed segment details.
-	Rotate() (*Segment, error)
+	Rotate() (Segment, error)
	// Delete the specified segment.
	Delete(segmentID SegmentID) error
+	// Close all segments and stop the WAL.
+	Close() error
+}
+
+// SegmentID identifies a segment in a WAL.
+type SegmentID uint64
+
+// Segment allows reading underlying segments that hold WAL entities.
+type Segment interface {
+	GetSegmentID() SegmentID
+	GetLogEntries() []LogEntry
+}
+
+// LogEntry provides access to the details of a WAL entry.
+type LogEntry interface {
+	GetSeriesID() common.GlobalSeriesID
+	GetTimestamps() []time.Time
+	GetValues() *list.List
+}
+
+// log implements the WAL interface.
+type log struct {
+	writeCloser     *run.ChannelCloser
+	flushCloser     *run.ChannelCloser
+	chanGroupCloser *run.ChannelGroupCloser
+	buffer          buffer
+	logger          *logger.Logger
+	bufferWriter    *bufferWriter
+	segmentMap      map[SegmentID]*segment
+	workSegment     *segment
+	writeChannel    chan logRequest
+	flushChannel    chan buffer
+	path            string
+	options         Options
+	rwMutex         sync.RWMutex
+	closerOnce      sync.Once
+}
+
+type segment struct {
+	file       *os.File
+	path       string
+	logEntries []LogEntry
+	segmentID  SegmentID
+}
+
+type logRequest struct {
+	seriesID  common.GlobalSeriesID
+	timestamp time.Time
+	callback  func(common.GlobalSeriesID, time.Time, []byte, error)
+	data      []byte
+}
+
+type logEntry struct {
+	timestamps  []time.Time
+	values      *list.List
+	seriesID    common.GlobalSeriesID
+	entryLength uint64
+	count       uint32
+}
+
+type buffer struct {
+	timestampMap map[common.GlobalSeriesID][]time.Time
+	valueMap     map[common.GlobalSeriesID][]byte
+	callbackMap  map[common.GlobalSeriesID][]func(common.GlobalSeriesID, time.Time, []byte, error)
+	count        int
+}
+
+type bufferWriter struct {
+	buf           *bytes.Buffer
+	seriesIDBuf   *bytes.Buffer
+	timestampsBuf *bytes.Buffer
+	dataBuf       []byte
+	dataLen       int
+	seriesCount   uint32
+	batchLen      uint64
}

// New creates a WAL instance in the specified path.
-func New(_ string, _ Options) (WAL, error) {
-	return nil, nil
+func New(path string, options *Options) (WAL, error) {
+	// Check configuration options.
+	walOptions := DefaultOptions
+	if options != nil {
+		fileSize := options.FileSize
+		if fileSize <= 0 {
+			fileSize = DefaultOptions.FileSize
+		}
+		bufferSize := options.BufferSize
+		if bufferSize <= 0 {
+			bufferSize = DefaultOptions.BufferSize
+		}
+		bufferBatchInterval := options.BufferBatchInterval
+		if bufferBatchInterval <= 0 {
+			bufferBatchInterval = DefaultOptions.BufferBatchInterval
+		}
+		walOptions = &Options{
+			FileSize:            fileSize,
+			BufferSize:          bufferSize,
+			BufferBatchInterval: bufferBatchInterval,
+			NoSync:              options.NoSync,
+		}
+	}
+
+	// Initialize the WAL directory.
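+	// The directory holds one file per segment, named like seg<16-hex-digits>.wal
+	// (see segmentName below); load() picks the highest segment ID as the work segment.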
+	path, err := filepath.Abs(path)
+	if err != nil {
+		return nil, errors.Wrap(err, "Cannot get absolute path: "+path)
+	}
+	if err := os.MkdirAll(path, os.ModePerm); err != nil {
+		return nil, err
+	}
+
+	writeCloser := run.NewChannelCloser()
+	flushCloser := run.NewChannelCloser()
+	chanGroupCloser := run.NewChannelGroupCloser(writeCloser, flushCloser)
+	log := &log{
+		path:            path,
+		options:         *walOptions,
+		logger:          logger.GetLogger(moduleName),
+		writeChannel:    make(chan logRequest),
+		flushChannel:    make(chan buffer, walOptions.FlushQueueSize),
+		bufferWriter:    newBufferWriter(),
+		writeCloser:     writeCloser,
+		flushCloser:     flushCloser,
+		chanGroupCloser: chanGroupCloser,
+		buffer: buffer{
+			timestampMap: make(map[common.GlobalSeriesID][]time.Time),
+			valueMap:     make(map[common.GlobalSeriesID][]byte),
+			callbackMap:  make(map[common.GlobalSeriesID][]func(common.GlobalSeriesID, time.Time, []byte, error)),
+			count:        0,
+		},
+	}
+	if err := log.load(); err != nil {
+		return nil, err
+	}
+	log.start()
+
+	log.logger.Info().Str("path", path).Msg("WAL has been initialized")
+	return log, nil
+}
+
+// Write a logging entity.
+// It returns immediately once the data has been written to the buffer.
+// The callback function is called after the entity has been flushed to persistent storage.
+func (log *log) Write(seriesID common.GlobalSeriesID, timestamp time.Time, data []byte, callback func(common.GlobalSeriesID, time.Time, []byte, error)) {
+	if !log.writeCloser.AddSender() {
+		return
+	}
+	defer log.writeCloser.SenderDone()
+
+	log.writeChannel <- logRequest{
+		seriesID:  seriesID,
+		timestamp: timestamp,
+		data:      data,
+		callback:  callback,
+	}
+}
+
+// Read specified segment by SegmentID.
+func (log *log) Read(segmentID SegmentID) (Segment, error) {
+	log.rwMutex.RLock()
+	defer log.rwMutex.RUnlock()
+
+	segment := log.segmentMap[segmentID]
+	return segment, nil
+}
+
+// ReadAllSegments reads all segments sorted by their creation time in ascending order.
+func (log *log) ReadAllSegments() ([]Segment, error) {
+	log.rwMutex.RLock()
+	defer log.rwMutex.RUnlock()
+
+	segments := make([]Segment, 0)
+	for _, segment := range log.segmentMap {
+		segments = append(segments, segment)
+	}
+	return segments, nil
+}
+
+// Rotate closes the open segment and opens a new one, returning the closed segment details.
+func (log *log) Rotate() (Segment, error) {
+	log.rwMutex.Lock()
+	defer log.rwMutex.Unlock()
+
+	newSegmentID := uint64(log.workSegment.segmentID) + 1
+	if newSegmentID > maxSegmentID {
+		return nil, errors.New("Segment ID overflow uint64," +
+			" please delete all WAL segment files and restart")
+	}
+	if err := log.workSegment.file.Close(); err != nil {
+		return nil, errors.Wrap(err, "Close WAL segment error")
+	}
+
+	// Create a new segment.
+	oldWorkSegment := log.workSegment
+	segment := &segment{
+		segmentID: SegmentID(newSegmentID),
+		path:      filepath.Join(log.path, segmentName(newSegmentID)),
+	}
+	if err := segment.openFile(true); err != nil {
+		return nil, errors.Wrap(err, "Open WAL segment error")
+	}
+	log.workSegment = segment
+
+	// Update segment information.
+	log.segmentMap[log.workSegment.segmentID] = log.workSegment
+	return oldWorkSegment, nil
+}
+
+// Delete the specified segment.
+func (log *log) Delete(segmentID SegmentID) error {
+	log.rwMutex.Lock()
+	defer log.rwMutex.Unlock()
+
+	// The active work segment cannot be deleted.
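+	// (Call Rotate first if the current work segment needs to be deleted.)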
+	if segmentID == log.workSegment.segmentID {
+		return errors.New("Cannot delete the segment that is currently in use")
+	}
+
+	err := os.Remove(log.segmentMap[segmentID].path)
+	if err != nil {
+		return errors.Wrap(err, "Delete WAL segment error")
+	}
+	delete(log.segmentMap, segmentID)
+	return nil
+}
+
+// Close all segments and stop the WAL.
+func (log *log) Close() error {
+	var globalErr error
+	log.closerOnce.Do(func() {
+		log.logger.Info().Msg("Closing WAL...")
+
+		log.chanGroupCloser.CloseThenWait()
+
+		if err := log.flushBuffer(log.buffer); err != nil {
+			globalErr = multierr.Append(globalErr, err)
+		}
+		if err := log.workSegment.file.Close(); err != nil {
+			globalErr = multierr.Append(globalErr, err)
+		}
+		log.logger.Info().Msg("Closed WAL")
+	})
+	return globalErr
+}
+
+func (log *log) start() {
+	var initialTasks sync.WaitGroup
+	initialTasks.Add(2)
+
+	go func() {
+		if !log.writeCloser.AddReceiver() {
+			panic("writeCloser already closed")
+		}
+		defer log.writeCloser.ReceiverDone()
+
+		log.logger.Info().Msg("Start batch task...")
+		initialTasks.Done()
+
+		bufferVolume := 0
+		for {
+			timer := time.NewTimer(log.options.BufferBatchInterval)
+			select {
+			case request, chOpen := <-log.writeChannel:
+				if !chOpen {
+					timer.Stop()
+					log.logger.Info().Msg("Stop batch task when write-channel closed")
+					return
+				}
+
+				log.buffer.write(request)
+				if log.logger.Debug().Enabled() {
+					log.logger.Debug().Msg("Write request to buffer. elements: " + strconv.Itoa(log.buffer.count))
+				}
+
+				bufferVolume += request.seriesID.Volume() + timestampVolumeLength + len(request.data)
+				if bufferVolume > log.options.BufferSize {
+					log.triggerFlushing()
+					bufferVolume = 0
+				}
+			case <-timer.C:
+				if bufferVolume == 0 {
+					continue
+				}
+				log.triggerFlushing()
+				bufferVolume = 0
+			case <-log.writeCloser.CloseNotify():
+				timer.Stop()
+				log.logger.Info().Msg("Stop batch task when close notify")
+				return
+			}
+			timer.Stop()
+		}
+	}()
+
+	go func() {
+		if !log.flushCloser.AddReceiver() {
+			panic("flushCloser already closed")
+		}
+		defer log.flushCloser.ReceiverDone()
+
+		log.logger.Info().Msg("Start flush task...")
+		initialTasks.Done()
+
+		for {
+			select {
+			case batch, chOpen := <-log.flushChannel:
+				if !chOpen {
+					log.logger.Info().Msg("Stop flush task when flush-channel closed")
+					return
+				}
+
+				startTime := time.Now()
+				var err error
+				for i := 0; i < maxRetries; i++ {
+					if err = log.flushBuffer(batch); err != nil {
+						log.logger.Err(err).Msg("Flushing buffer failed. Retrying...")
+						time.Sleep(time.Second)
+						continue
+					}
+					break
+				}
+				if log.logger.Debug().Enabled() {
+					log.logger.Debug().Msg("Flushed buffer to WAL file. elements: " +
+						strconv.Itoa(batch.count) + ", cost: " + time.Since(startTime).String())
+				}
+
+				batch.notifyRequests(err)
+			case <-log.flushCloser.CloseNotify():
+				log.logger.Info().Msg("Stop flush task when close notify")
+				return
+			}
+		}
+	}()
+
+	initialTasks.Wait()
+	log.logger.Info().Msg("Started WAL")
+}
+
+func (log *log) triggerFlushing() {
+	log.flushChannel <- log.buffer
+	if log.logger.Debug().Enabled() {
+		log.logger.Debug().Msg("Send buffer to flush-channel. elements: " + strconv.Itoa(log.buffer.count))
+	}
+
+	log.newBuffer()
+}
+
+func (log *log) newBuffer() {
+	log.buffer = buffer{
+		timestampMap: make(map[common.GlobalSeriesID][]time.Time),
+		valueMap:     make(map[common.GlobalSeriesID][]byte),
+		callbackMap:  make(map[common.GlobalSeriesID][]func(common.GlobalSeriesID, time.Time, []byte, error)),
+		count:        0,
+	}
+}
+
+func (log *log) flushBuffer(buffer buffer) error {
+	if buffer.count == 0 {
+		return nil
+	}
+
+	var err error
+	if err = log.bufferWriter.Reset(); err != nil {
+		return errors.Wrap(err, "Reset buffer writer error")
+	}
+
+	for seriesID, timestamps := range buffer.timestampMap {
+		log.bufferWriter.ResetSeries()
+
+		if err = log.bufferWriter.WriteSeriesID(seriesID); err != nil {
+			return errors.Wrap(err, "Write seriesID error")
+		}
+		log.bufferWriter.WriteTimestamps(timestamps)
+		log.bufferWriter.WriteData(buffer.valueMap[seriesID])
+		if err = log.bufferWriter.AddSeries(); err != nil {
+			return errors.Wrap(err, "Add series error")
+		}
+	}
+
+	return log.writeWorkSegment(log.bufferWriter.Bytes())
+}
+
+func (log *log) writeWorkSegment(data []byte) error {
+	log.rwMutex.RLock()
+	defer log.rwMutex.RUnlock()
+
+	// Write batch data to the WAL segment file.
+	if _, err := log.workSegment.file.Write(data); err != nil {
+		return errors.Wrap(err, "Write WAL segment file error, file: "+log.workSegment.path)
+	}
+	if !log.options.NoSync {
+		if err := log.workSegment.file.Sync(); err != nil {
+			log.logger.Warn().Msg("Sync WAL segment file to disk failed, file: " + log.workSegment.path)
+		}
+	}
+	return nil
+}
+
+func (log *log) load() error {
+	files, err := os.ReadDir(log.path)
+	if err != nil {
+		return errors.Wrap(err, "Cannot read dir: "+log.path)
+	}
+	// Load all WAL segments.
+	var workSegmentID SegmentID
+	log.segmentMap = make(map[SegmentID]*segment)
+	for _, file := range files {
+		name := file.Name()
+		segmentID, parsePathErr := parseSegmentID(name)
+		if parsePathErr != nil {
+			return errors.Wrap(parsePathErr, "Parse file name error, name: "+name)
+		}
+		if segmentID > uint64(workSegmentID) {
+			workSegmentID = SegmentID(segmentID)
+		}
+		segment := &segment{
+			segmentID: SegmentID(segmentID),
+			path:      filepath.Join(log.path, segmentName(segmentID)),
+		}
+		if err = segment.parseLogEntries(); err != nil {
+			return errors.Wrap(err, "Failed to parse log entries")
+		}
+		log.segmentMap[SegmentID(segmentID)] = segment
+
+		if log.logger.Debug().Enabled() {
+			log.logger.Debug().Msg("Loaded segment file: " + segment.path)
+		}
+	}
+
+	// If this is the first load.
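+	// (No segment files on disk: create segment 1 as the work segment;
+	// otherwise reuse the highest-numbered segment found above.)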
+ if len(log.segmentMap) == 0 { + segmentID := SegmentID(1) + segment := &segment{ + segmentID: segmentID, + path: filepath.Join(log.path, segmentName(uint64(segmentID))), + } + log.segmentMap[segmentID] = segment + log.workSegment = segment + } else { + log.workSegment = log.segmentMap[workSegmentID] + } + if err = log.workSegment.openFile(false); err != nil { + return errors.Wrap(err, "Open WAL segment error, file: "+log.workSegment.path) + } + return nil +} + +func newBufferWriter() *bufferWriter { + return &bufferWriter{ + buf: bytes.NewBuffer([]byte{}), + seriesIDBuf: bytes.NewBuffer([]byte{}), + timestampsBuf: bytes.NewBuffer([]byte{}), + dataBuf: make([]byte, 128), + } +} + +func (w *bufferWriter) Reset() error { + w.ResetSeries() + w.buf.Reset() + w.batchLen = 0 + + // pre-placement padding + err := w.writeBatchLength(0) + return err +} + +func (w *bufferWriter) ResetSeries() { + w.seriesIDBuf.Reset() + w.timestampsBuf.Reset() + w.dataLen = 0 + w.seriesCount = 0 +} + +func (w *bufferWriter) AddSeries() error { + seriesIDBytesLen := uint16(w.seriesIDBuf.Len()) + timestampsBytesLen := uint16(w.timestampsBuf.Len()) + entryLen := seriesIDLength + uint64(seriesIDBytesLen) + seriesCountLength + timestampsBinaryLength + uint64(timestampsBytesLen) + uint64(w.dataLen) + + var err error + if err = w.writeEntryLength(entryLen); err != nil { + return err + } + if err = w.writeSeriesIDLength(seriesIDBytesLen); err != nil { + return err + } + if err = w.writeSeriesID(w.seriesIDBuf.Bytes()); err != nil { + return err + } + if err = w.writeSeriesCount(w.seriesCount); err != nil { + return err + } + if err = w.writeTimestampsLength(timestampsBytesLen); err != nil { + return err + } + if err = w.writeTimestamps(w.timestampsBuf.Bytes()); err != nil { + return err + } + if err = w.writeData(w.dataBuf[:w.dataLen]); err != nil { + return err + } + w.batchLen += entryLen + return nil +} + +func (w *bufferWriter) Bytes() []byte { + batchBytes := w.buf.Bytes() + batchLen := uint64(len(batchBytes)) - batchLength + return w.rewriteBatchLength(batchBytes, batchLen) +} + +func (w *bufferWriter) WriteSeriesID(s common.GlobalSeriesID) error { + if err := writeUint64(w.seriesIDBuf, uint64(s.SeriesID)); err != nil { + return err + } + if _, err := w.seriesIDBuf.WriteString(s.Name); err != nil { + return err + } + return nil +} + +func (w *bufferWriter) WriteTimestamps(timestamps []time.Time) { + timestampWriter := encoding.NewWriter() + timestampEncoder := encoding.NewXOREncoder(timestampWriter) + timestampWriter.Reset(w.timestampsBuf) + for _, timestamp := range timestamps { + timestampEncoder.Write(timeToUnixNano(timestamp)) + } + timestampWriter.Flush() + w.seriesCount = uint32(len(timestamps)) +} + +func (w *bufferWriter) WriteData(data []byte) { + maxEncodedLen := snappy.MaxEncodedLen(len(data)) + dataBufLen := len(w.dataBuf) + if dataBufLen < maxEncodedLen { + newCapacity := (dataBufLen * 2) - (dataBufLen / 2) + if newCapacity < maxEncodedLen { + newCapacity = maxEncodedLen + } + w.dataBuf = make([]byte, newCapacity) + } + snappyData := snappy.Encode(w.dataBuf, data) + w.dataLen = len(snappyData) +} + +func (w *bufferWriter) writeBatchLength(data uint64) error { + return writeUint64(w.buf, data) +} + +func (w *bufferWriter) rewriteBatchLength(b []byte, batchLen uint64) []byte { + _ = b[7] // early bounds check to guarantee safety of writes below + b[0] = byte(batchLen) + b[1] = byte(batchLen >> 8) + b[2] = byte(batchLen >> 16) + b[3] = byte(batchLen >> 24) + b[4] = byte(batchLen >> 32) + b[5] = byte(batchLen 
>> 40) + b[6] = byte(batchLen >> 48) + b[7] = byte(batchLen >> 56) + return b +} + +func (w *bufferWriter) writeEntryLength(data uint64) error { + return writeUint64(w.buf, data) +} + +func (w *bufferWriter) writeSeriesIDLength(data uint16) error { + return writeUint16(w.buf, data) +} + +func (w *bufferWriter) writeSeriesID(data []byte) error { + _, err := w.buf.Write(data) + return err +} + +func (w *bufferWriter) writeSeriesCount(data uint32) error { + return writeUint32(w.buf, data) +} + +func (w *bufferWriter) writeTimestampsLength(data uint16) error { + return writeUint16(w.buf, data) +} + +func (w *bufferWriter) writeTimestamps(data []byte) error { + _, err := w.buf.Write(data) + return err +} + +func (w *bufferWriter) writeData(data []byte) error { + _, err := w.buf.Write(data) + return err +} + +func (segment *segment) GetSegmentID() SegmentID { + return segment.segmentID +} + +func (segment *segment) GetLogEntries() []LogEntry { + return segment.logEntries +} + +func (segment *segment) openFile(overwrite bool) error { + var err error + if overwrite { + segment.file, err = os.OpenFile(segment.path, os.O_CREATE|os.O_RDWR|os.O_TRUNC, os.ModePerm) + } else { + segment.file, err = os.OpenFile(segment.path, os.O_CREATE|os.O_RDWR|os.O_APPEND, os.ModePerm) + } + return err +} + +func (segment *segment) parseLogEntries() error { + segmentBytes, err := os.ReadFile(segment.path) + if err != nil { + return errors.Wrap(err, "Read WAL segment failed, path: "+segment.path) + } + + var logEntries []LogEntry + var data []byte + var batchLen uint64 + var entryLen uint64 + var seriesIDLen uint16 + var seriesID common.GlobalSeriesID + var seriesCount uint32 + var timestampsBinaryLen uint16 + var entryEndPosition uint64 + + var oldPos uint64 + var pos uint64 + parseNextBatchFlag := true + segmentBytesLen := uint64(len(segmentBytes)) + + for { + if parseNextBatchFlag { + if segmentBytesLen <= batchLength { + break + } + data = segmentBytes[pos : pos+batchLength] + batchLen, err = segment.parseBatchLength(data) + if err != nil { + return errors.Wrap(err, "Parse batch length error") + } + + if segmentBytesLen <= batchLen { + break + } + + pos += batchLength + oldPos = pos + parseNextBatchFlag = false + } + + // Parse entryLength. + data = segmentBytes[pos : pos+entryLength] + + entryLen, err = segment.parseEntryLength(data) + if err != nil { + return errors.Wrap(err, "Parse entry length error") + } + pos += entryLength + + // Mark entry end-position + entryEndPosition = pos + entryLen + if segmentBytesLen < entryEndPosition { + break + } + + // Parse seriesIDLength. + data = segmentBytes[pos : pos+seriesIDLength] + seriesIDLen, err = segment.parseSeriesIDLength(data) + if err != nil { + return errors.Wrap(err, "Parse seriesID length error") + } + pos += seriesIDLength + + // Parse seriesID. + data = segmentBytes[pos : pos+uint64(seriesIDLen)] + seriesID = segment.parseSeriesID(data) + pos += uint64(seriesIDLen) + + // Parse series count. + data = segmentBytes[pos : pos+seriesCountLength] + seriesCount, err = segment.parseSeriesCountLength(data) + if err != nil { + return errors.Wrap(err, "Parse series count length error") + } + pos += seriesCountLength + + // Parse timestamps compression binary. 
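+		// Layout mirrors bufferWriter.WriteTimestamps: a 2-byte length prefix
+		// followed by an XOR-compressed block of unix-nano timestamps.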
+		data = segmentBytes[pos : pos+timestampsBinaryLength]
+		timestampsBinaryLen, err = segment.parseTimestampsLength(data)
+		if err != nil {
+			return errors.Wrap(err, "Parse timestamps length error")
+		}
+		pos += timestampsBinaryLength
+		data = segmentBytes[pos : pos+uint64(timestampsBinaryLen)]
+		var timestamps []time.Time
+		timestamps, err = segment.parseTimestamps(seriesCount, data)
+		if err != nil {
+			return errors.Wrap(err, "Parse timestamps compression binary error")
+		}
+		pos += uint64(timestampsBinaryLen)
+
+		// Parse values compression binary.
+		data = segmentBytes[pos:entryEndPosition]
+		values, err := segment.parseValuesBinary(data)
+		if err != nil {
+			return errors.Wrap(err, "Parse values compression binary error")
+		}
+		if values.Len() != int(seriesCount) {
+			return errors.New("values binary items do not match series count. series count: " +
+				strconv.Itoa(int(seriesCount)) + ", values binary items: " + strconv.Itoa(values.Len()))
+		}
+		pos = entryEndPosition
+
+		logEntry := &logEntry{
+			entryLength: entryLen,
+			seriesID:    seriesID,
+			count:       seriesCount,
+			timestamps:  timestamps,
+			values:      values,
+		}
+		logEntries = append(logEntries, logEntry)
+
+		if pos == segmentBytesLen {
+			break
+		}
+		if pos-oldPos == batchLen {
+			parseNextBatchFlag = true
+		}
+	}
+	segment.logEntries = logEntries
+	return nil
+}
+
+func (segment *segment) parseBatchLength(data []byte) (uint64, error) {
+	var batchLen uint64
+	buf := bytes.NewBuffer(data)
+	if err := binary.Read(buf, binary.LittleEndian, &batchLen); err != nil {
+		return 0, err
+	}
+	return batchLen, nil
+}
+
+func (segment *segment) parseEntryLength(data []byte) (uint64, error) {
+	var entryLen uint64
+	buf := bytes.NewBuffer(data)
+	if err := binary.Read(buf, binary.LittleEndian, &entryLen); err != nil {
+		return 0, err
+	}
+	return entryLen, nil
+}
+
+func (segment *segment) parseSeriesIDLength(data []byte) (uint16, error) {
+	var seriesIDLen uint16
+	buf := bytes.NewBuffer(data)
+	if err := binary.Read(buf, binary.LittleEndian, &seriesIDLen); err != nil {
+		return 0, err
+	}
+	return seriesIDLen, nil
+}
+
+func (segment *segment) parseSeriesID(data []byte) common.GlobalSeriesID {
+	return common.GlobalSeriesID{
+		SeriesID: common.SeriesID(bytesToUint64(data[:8])),
+		Name:     string(data[8:]),
+	}
+}
+
+func (segment *segment) parseSeriesCountLength(data []byte) (uint32, error) {
+	var seriesCount uint32
+	buf := bytes.NewBuffer(data)
+	if err := binary.Read(buf, binary.LittleEndian, &seriesCount); err != nil {
+		return 0, err
+	}
+	return seriesCount, nil
+}
+
+func (segment *segment) parseTimestampsLength(data []byte) (uint16, error) {
+	var timestampsLen uint16
+	buf := bytes.NewBuffer(data)
+	if err := binary.Read(buf, binary.LittleEndian, &timestampsLen); err != nil {
+		return 0, err
+	}
+	return timestampsLen, nil
+}
+
+func (segment *segment) parseTimestamps(seriesCount uint32, data []byte) ([]time.Time, error) {
+	timestampReader := encoding.NewReader(bytes.NewReader(data))
+	timestampDecoder := encoding.NewXORDecoder(timestampReader)
+	var timestamps []time.Time
+	for i := 0; i < int(seriesCount); i++ {
+		if !timestampDecoder.Next() {
+			return nil, errors.New("Timestamps length does not match series count")
+		}
+		timestamps = append(timestamps, unixNanoToTime(timestampDecoder.Value()))
+	}
+	return timestamps, nil
+}
+
+func (segment *segment) parseValuesBinary(data []byte) (*list.List, error) {
+	var err error
+	if data, err = snappy.Decode(nil, data); err != nil {
+		return nil, errors.Wrap(err, "Decode values compression binary error")
+	}
+
+	
values := list.New() + position := 0 + for { + nextPosition, value := readValuesBinary(data, position, valuesBinaryLength) + if value == nil { + break + } + values.PushBack(value) + position = nextPosition + } + return values, nil +} + +func (logEntry *logEntry) GetSeriesID() common.GlobalSeriesID { + return logEntry.seriesID +} + +func (logEntry *logEntry) GetTimestamps() []time.Time { + return logEntry.timestamps +} + +func (logEntry *logEntry) GetValues() *list.List { + return logEntry.values +} + +func (buffer *buffer) write(request logRequest) { + seriesID := request.seriesID + buffer.timestampMap[seriesID] = append(buffer.timestampMap[seriesID], request.timestamp) + + // Value item: binary-length(2-bytes) + binary data(n-bytes) + binaryLen := uint16(len(request.data)) + buffer.valueMap[seriesID] = append(buffer.valueMap[seriesID], byte(binaryLen), byte(binaryLen>>8)) + buffer.valueMap[seriesID] = append(buffer.valueMap[seriesID], request.data...) + + buffer.callbackMap[seriesID] = append(buffer.callbackMap[seriesID], request.callback) + buffer.count++ +} + +func (buffer *buffer) notifyRequests(err error) { + var timestamps []time.Time + var values []byte + var valueItem []byte + var valuePos int + for seriesID, callbacks := range buffer.callbackMap { + timestamps = buffer.timestampMap[seriesID] + values = buffer.valueMap[seriesID] + valuePos = 0 + for index, callback := range callbacks { + valuePos, valueItem = readValuesBinary(values, valuePos, valuesBinaryLength) + buffer.runningCallback(func() { + callback(seriesID, timestamps[index], valueItem, err) + }) + } + } +} + +func (buffer *buffer) runningCallback(callback func()) { + defer func() { + _ = recover() + }() + callback() +} + +func segmentName(segmentID uint64) string { + return fmt.Sprintf("%v%016x%v", segmentNamePrefix, segmentID, segmentNameSuffix) +} + +// Parse segment ID. segmentName example: seg0000000000000001.wal. 
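+// The 16 digits after the "seg" prefix are the segment ID rendered by
+// segmentName's %016x verb, i.e. zero-padded hexadecimal.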
+func parseSegmentID(segmentName string) (uint64, error) {
+	_ = segmentName[22:] // early bounds check to guarantee safety of reads below
+	if !strings.HasPrefix(segmentName, segmentNamePrefix) {
+		return 0, errors.New("Invalid segment name: " + segmentName)
+	}
+	if !strings.HasSuffix(segmentName, segmentNameSuffix) {
+		return 0, errors.New("Invalid segment name: " + segmentName)
+	}
+	// The ID is written with %016x, so it must be parsed as base-16.
+	return strconv.ParseUint(segmentName[3:19], 16, 64)
+}
+
+func readValuesBinary(raw []byte, position int, offsetLen int) (int, []byte) {
+	if position == len(raw) {
+		return position, nil
+	}
+
+	data := raw[position : position+offsetLen]
+	binaryLen := bytesToUint16(data)
+	position += offsetLen
+
+	data = raw[position : position+int(binaryLen)]
+	position += int(binaryLen)
+	return position, data
+}
+
+func writeUint16(buffer *bytes.Buffer, data uint16) error {
+	var err error
+	if err = buffer.WriteByte(byte(data)); err != nil {
+		return err
+	}
+	if err = buffer.WriteByte(byte(data >> 8)); err != nil {
+		return err
+	}
+	return err
+}
+
+func writeUint32(buffer *bytes.Buffer, data uint32) error {
+	var err error
+	if err = buffer.WriteByte(byte(data)); err != nil {
+		return err
+	}
+	if err = buffer.WriteByte(byte(data >> 8)); err != nil {
+		return err
+	}
+	if err = buffer.WriteByte(byte(data >> 16)); err != nil {
+		return err
+	}
+	if err = buffer.WriteByte(byte(data >> 24)); err != nil {
+		return err
+	}
+	return err
+}
+
+func writeUint64(buffer *bytes.Buffer, data uint64) error {
+	var err error
+	if err = buffer.WriteByte(byte(data)); err != nil {
+		return err
+	}
+	if err = buffer.WriteByte(byte(data >> 8)); err != nil {
+		return err
+	}
+	if err = buffer.WriteByte(byte(data >> 16)); err != nil {
+		return err
+	}
+	if err = buffer.WriteByte(byte(data >> 24)); err != nil {
+		return err
+	}
+	if err = buffer.WriteByte(byte(data >> 32)); err != nil {
+		return err
+	}
+	if err = buffer.WriteByte(byte(data >> 40)); err != nil {
+		return err
+	}
+	if err = buffer.WriteByte(byte(data >> 48)); err != nil {
+		return err
+	}
+	if err = buffer.WriteByte(byte(data >> 56)); err != nil {
+		return err
+	}
+	return err
+}
+
+func bytesToUint16(buf []byte) uint16 {
+	return binary.LittleEndian.Uint16(buf)
+}
+
+func bytesToUint64(buf []byte) uint64 {
+	return binary.LittleEndian.Uint64(buf)
+}
+
+func timeToUnixNano(time time.Time) uint64 {
+	return uint64(time.UnixNano())
+}
+
+func unixNanoToTime(unixNano uint64) time.Time {
+	return time.Unix(0, int64(unixNano))
}
diff --git a/pkg/wal/wal_benchmark_test.go b/pkg/wal/wal_benchmark_test.go
new file mode 100644
index 000000000..d6eccc396
--- /dev/null
+++ b/pkg/wal/wal_benchmark_test.go
@@ -0,0 +1,450 @@
+// Licensed to Apache Software Foundation (ASF) under one or more contributor
+// license agreements. See the NOTICE file distributed with
+// this work for additional information regarding copyright
+// ownership. Apache Software Foundation (ASF) licenses this file to you under
+// the Apache License, Version 2.0 (the "License"); you may
+// not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+//     http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+ +// Package wal (Write-ahead logging) is an independent component to ensure data reliability. +package wal + +import ( + "crypto/rand" + "fmt" + "math/big" + "os" + "path/filepath" + "testing" + "time" + + "github.com/apache/skywalking-banyandb/api/common" + "github.com/apache/skywalking-banyandb/pkg/logger" +) + +var ( + path = "benchmark" + baseTime = time.Now().UnixMilli() + data = newBinaryData() + dataLen = len(data) + seriesID1 = newSeriesIDList(1) + seriesID20 = newSeriesIDList(20) + seriesID100 = newSeriesIDList(100) + seriesID500 = newSeriesIDList(500) + seriesID1000 = newSeriesIDList(1000) + callback = func(seriesID common.GlobalSeriesID, t time.Time, bytes []byte, err error) {} +) + +func Benchmark_SeriesID_1(b *testing.B) { + wal := newWAL(nil) + defer closeWAL(wal) + + seriesID := seriesID1 + seriesIDLen := len(seriesID) + + b.ResetTimer() + for i := 0; i < b.N; i++ { + wal.Write(seriesID[i%seriesIDLen], time.UnixMilli(baseTime+1), data[i%dataLen].binary, callback) + } + b.StopTimer() +} + +func Benchmark_SeriesID_20(b *testing.B) { + wal := newWAL(nil) + defer closeWAL(wal) + + seriesID := seriesID20 + seriesIDLen := len(seriesID) + + b.ResetTimer() + for i := 0; i < b.N; i++ { + wal.Write(seriesID[i%seriesIDLen], time.UnixMilli(baseTime+1), data[i%dataLen].binary, callback) + } + b.StopTimer() +} + +func Benchmark_SeriesID_100(b *testing.B) { + wal := newWAL(nil) + defer closeWAL(wal) + + seriesID := seriesID100 + seriesIDLen := len(seriesID) + + b.ResetTimer() + for i := 0; i < b.N; i++ { + wal.Write(seriesID[i%seriesIDLen], time.UnixMilli(baseTime+1), data[i%dataLen].binary, callback) + } + b.StopTimer() +} + +func Benchmark_SeriesID_500(b *testing.B) { + wal := newWAL(nil) + defer closeWAL(wal) + + seriesID := seriesID500 + seriesIDLen := len(seriesID) + + b.ResetTimer() + for i := 0; i < b.N; i++ { + wal.Write(seriesID[i%seriesIDLen], time.UnixMilli(baseTime+1), data[i%dataLen].binary, callback) + } + b.StopTimer() +} + +func Benchmark_SeriesID_1000(b *testing.B) { + wal := newWAL(nil) + defer closeWAL(wal) + + seriesID := seriesID1000 + seriesIDLen := len(seriesID) + + b.ResetTimer() + for i := 0; i < b.N; i++ { + wal.Write(seriesID[i%seriesIDLen], time.UnixMilli(baseTime+1), data[i%dataLen].binary, callback) + } + b.StopTimer() +} + +func Benchmark_SeriesID_1000_Buffer_64K(b *testing.B) { + wal := newWAL(&Options{BufferSize: 1024 * 64}) + defer closeWAL(wal) + + seriesID := seriesID1000 + seriesIDLen := len(seriesID) + + b.ResetTimer() + for i := 0; i < b.N; i++ { + wal.Write(seriesID[i%seriesIDLen], time.UnixMilli(baseTime+1), data[i%dataLen].binary, callback) + } + b.StopTimer() +} + +func Benchmark_SeriesID_1000_Buffer_128K(b *testing.B) { + wal := newWAL(&Options{BufferSize: 1024 * 128}) + defer closeWAL(wal) + + seriesID := seriesID1000 + seriesIDLen := len(seriesID) + + b.ResetTimer() + for i := 0; i < b.N; i++ { + wal.Write(seriesID[i%seriesIDLen], time.UnixMilli(baseTime+1), data[i%dataLen].binary, callback) + } + b.StopTimer() +} + +func Benchmark_SeriesID_1000_Buffer_512K(b *testing.B) { + wal := newWAL(&Options{BufferSize: 1024 * 512}) + defer closeWAL(wal) + + seriesID := seriesID1000 + seriesIDLen := len(seriesID) + + b.ResetTimer() + for i := 0; i < b.N; i++ { + wal.Write(seriesID[i%seriesIDLen], time.UnixMilli(baseTime+1), data[i%dataLen].binary, callback) + } + b.StopTimer() +} + +func Benchmark_SeriesID_1000_Buffer_1MB(b *testing.B) { + wal := newWAL(&Options{BufferSize: 1024 * 1024}) + defer closeWAL(wal) + + seriesID := seriesID1000 + seriesIDLen 
:= len(seriesID)
+
+	b.ResetTimer()
+	for i := 0; i < b.N; i++ {
+		wal.Write(seriesID[i%seriesIDLen], time.UnixMilli(baseTime+1), data[i%dataLen].binary, callback)
+	}
+	b.StopTimer()
+}
+
+func Benchmark_SeriesID_1000_Buffer_2MB(b *testing.B) {
+	wal := newWAL(&Options{BufferSize: 1024 * 1024 * 2})
+	defer closeWAL(wal)
+
+	seriesID := seriesID1000
+	seriesIDLen := len(seriesID)
+
+	b.ResetTimer()
+	for i := 0; i < b.N; i++ {
+		wal.Write(seriesID[i%seriesIDLen], time.UnixMilli(baseTime+1), data[i%dataLen].binary, callback)
+	}
+	b.StopTimer()
+}
+
+func Benchmark_SeriesID_1000_Buffer_64K_NoSyncFlush(b *testing.B) {
+	wal := newWAL(&Options{BufferSize: 1024 * 64, NoSync: true})
+	defer closeWAL(wal)
+
+	seriesID := seriesID1000
+	seriesIDLen := len(seriesID)
+
+	b.ResetTimer()
+	for i := 0; i < b.N; i++ {
+		wal.Write(seriesID[i%seriesIDLen], time.UnixMilli(baseTime+1), data[i%dataLen].binary, callback)
+	}
+	b.StopTimer()
+}
+
+func Benchmark_SeriesID_1000_Buffer_128K_NoSyncFlush(b *testing.B) {
+	wal := newWAL(&Options{BufferSize: 1024 * 128, NoSync: true})
+	defer closeWAL(wal)
+
+	seriesID := seriesID1000
+	seriesIDLen := len(seriesID)
+
+	b.ResetTimer()
+	for i := 0; i < b.N; i++ {
+		wal.Write(seriesID[i%seriesIDLen], time.UnixMilli(baseTime+1), data[i%dataLen].binary, callback)
+	}
+	b.StopTimer()
+}
+
+func Benchmark_SeriesID_1000_Buffer_512K_NoSyncFlush(b *testing.B) {
+	wal := newWAL(&Options{BufferSize: 1024 * 512, NoSync: true})
+	defer closeWAL(wal)
+
+	seriesID := seriesID1000
+	seriesIDLen := len(seriesID)
+
+	b.ResetTimer()
+	for i := 0; i < b.N; i++ {
+		wal.Write(seriesID[i%seriesIDLen], time.UnixMilli(baseTime+1), data[i%dataLen].binary, callback)
+	}
+	b.StopTimer()
+}
+
+func Benchmark_SeriesID_1000_Buffer_1MB_NoSyncFlush(b *testing.B) {
+	wal := newWAL(&Options{BufferSize: 1024 * 1024, NoSync: true})
+	defer closeWAL(wal)
+
+	seriesID := seriesID1000
+	seriesIDLen := len(seriesID)
+
+	b.ResetTimer()
+	for i := 0; i < b.N; i++ {
+		wal.Write(seriesID[i%seriesIDLen], time.UnixMilli(baseTime+1), data[i%dataLen].binary, callback)
+	}
+	b.StopTimer()
+}
+
+func Benchmark_SeriesID_1000_Buffer_2MB_NoSyncFlush(b *testing.B) {
+	wal := newWAL(&Options{BufferSize: 1024 * 1024 * 2, NoSync: true})
+	defer closeWAL(wal)
+
+	seriesID := seriesID1000
+	seriesIDLen := len(seriesID)
+
+	b.ResetTimer()
+	for i := 0; i < b.N; i++ {
+		wal.Write(seriesID[i%seriesIDLen], time.UnixMilli(baseTime+1), data[i%dataLen].binary, callback)
+	}
+	b.StopTimer()
+}
+
+func Benchmark_SeriesID_1000_Buffer_64K_NoSyncFlush_And_Rotate_16MB(b *testing.B) {
+	wal := newWAL(&Options{BufferSize: 1024 * 64, NoSync: true})
+	defer closeWAL(wal)
+
+	seriesID := seriesID1000
+	seriesIDLen := len(seriesID)
+
+	rotateSize := 1024 * 1024 * 16 // 16MB
+	rotateChan := make(chan struct{})
+	rotateDone := make(chan struct{})
+	rotateMessage := struct{}{}
+	// Estimated per-entry overhead: ~16 bytes for the series id and 8 bytes for the timestamp.
+	seriesIDVolume := 16
+	timeVolume := 8
+	var logVolume int
+	var binaryData []byte
+
+	b.ResetTimer()
+	go func() {
+		defer close(rotateDone)
+		for range rotateChan {
+			segment, err := wal.Rotate()
+			if err != nil {
+				panic(err)
+			}
+			if err = wal.Delete(segment.GetSegmentID()); err != nil {
+				panic(err)
+			}
+		}
+	}()
+	for i := 0; i < b.N; i++ {
+		binaryData = data[i%dataLen].binary
+		wal.Write(seriesID[i%seriesIDLen], time.UnixMilli(baseTime+1), binaryData, callback)
+
+		logVolume += seriesIDVolume + timeVolume + len(binaryData)
+		if logVolume >= rotateSize {
+			rotateChan <- rotateMessage
+			logVolume = 0
+		}
+	}
+	b.StopTimer()
+	// Stop the rotating goroutine and wait for it, so it cannot race with closeWAL.
+	close(rotateChan)
+	<-rotateDone
+}
+
+func Benchmark_SeriesID_1000_Buffer_64K_NoSyncFlush_And_Rotate_32MB(b *testing.B) {
+	wal := newWAL(&Options{BufferSize: 1024 * 64, NoSync: true})
+	defer closeWAL(wal)
+
+	seriesID := seriesID1000
+	seriesIDLen := len(seriesID)
+
+	rotateSize := 1024 * 1024 * 32 // 32MB
+	rotateChan := make(chan struct{})
+	rotateDone := make(chan struct{})
+	rotateMessage := struct{}{}
+	// Estimated per-entry overhead: ~16 bytes for the series id and 8 bytes for the timestamp.
+	seriesIDVolume := 16
+	timeVolume := 8
+	var logVolume int
+	var binaryData []byte
+
+	b.ResetTimer()
+	go func() {
+		defer close(rotateDone)
+		for range rotateChan {
+			segment, err := wal.Rotate()
+			if err != nil {
+				panic(err)
+			}
+			if err = wal.Delete(segment.GetSegmentID()); err != nil {
+				panic(err)
+			}
+		}
+	}()
+	for i := 0; i < b.N; i++ {
+		binaryData = data[i%dataLen].binary
+		wal.Write(seriesID[i%seriesIDLen], time.UnixMilli(baseTime+1), binaryData, callback)
+
+		logVolume += seriesIDVolume + timeVolume + len(binaryData)
+		if logVolume >= rotateSize {
+			rotateChan <- rotateMessage
+			logVolume = 0
+		}
+	}
+	b.StopTimer()
+	// Stop the rotating goroutine and wait for it, so it cannot race with closeWAL.
+	close(rotateChan)
+	<-rotateDone
+}
+
+func Benchmark_SeriesID_1000_Buffer_64K_NoSyncFlush_And_Rotate_64MB(b *testing.B) {
+	wal := newWAL(&Options{BufferSize: 1024 * 64, NoSync: true})
+	defer closeWAL(wal)
+
+	seriesID := seriesID1000
+	seriesIDLen := len(seriesID)
+
+	rotateSize := 1024 * 1024 * 64 // 64MB
+	rotateChan := make(chan struct{})
+	rotateDone := make(chan struct{})
+	rotateMessage := struct{}{}
+	// Estimated per-entry overhead: ~16 bytes for the series id and 8 bytes for the timestamp.
+	seriesIDVolume := 16
+	timeVolume := 8
+	var logVolume int
+	var binaryData []byte
+
+	b.ResetTimer()
+	go func() {
+		defer close(rotateDone)
+		for range rotateChan {
+			segment, err := wal.Rotate()
+			if err != nil {
+				panic(err)
+			}
+			if err = wal.Delete(segment.GetSegmentID()); err != nil {
+				panic(err)
+			}
+		}
+	}()
+	for i := 0; i < b.N; i++ {
+		binaryData = data[i%dataLen].binary
+		wal.Write(seriesID[i%seriesIDLen], time.UnixMilli(baseTime+1), binaryData, callback)
+
+		logVolume += seriesIDVolume + timeVolume + len(binaryData)
+		if logVolume >= rotateSize {
+			rotateChan <- rotateMessage
+			logVolume = 0
+		}
+	}
+	b.StopTimer()
+	// Stop the rotating goroutine and wait for it, so it cannot race with closeWAL.
+	close(rotateChan)
+	<-rotateDone
+}
+
+func newWAL(options *Options) WAL {
+	if err := os.RemoveAll(path); err != nil {
+		panic(err)
+	}
+
+	logger.Init(logger.Logging{Level: "error"})
+	logPath, err := filepath.Abs(path)
+	if err != nil {
+		panic(err)
+	}
+	if options == nil {
+		options = &Options{
+			BufferSize:          1024 * 64, // 64KB
+			BufferBatchInterval: 3 * time.Second,
+		}
+	}
+	wal, err := New(logPath, options)
+	if err != nil {
+		panic(err)
+	}
+	return wal
+}
+
+func closeWAL(wal WAL) {
+	err := wal.Close()
+	if err != nil {
+		panic(err)
+	}
+
+	err = os.RemoveAll(path)
+	if err != nil {
+		panic(err)
+	}
+}
+
+func newSeriesIDList(series int) []common.GlobalSeriesID {
+	var seriesIDSet []common.GlobalSeriesID
+	for i := 0; i < series; i++ {
+		seriesID := common.GlobalSeriesID{
+			SeriesID: common.SeriesID(i),
+			Name:     fmt.Sprintf("series-%d", i),
+		}
+		seriesIDSet = append(seriesIDSet, seriesID)
+	}
+	return seriesIDSet
+}
+
+type Data struct {
+	binary []byte
+}
+
+func newBinaryData() []Data {
+	var data []Data
+	for i := 0; i < 2000; i++ {
+		data = append(data, Data{binary: []byte(randStr())})
+	}
+	return data
+}
+
+// randStr generates a random payload whose length is drawn from [0, 1024)
+// and clamped to a minimum of 200 characters.
+func randStr() string {
+	bytes := []byte("0123456789abcdefghijklmnopqrstuvwxyzABCDEFGHIJKLMNOPQRSTUVWXYZ\"',:{}[]")
+	result := []byte{}
+
+	var err error
+	var strLengthLower int64 = 200
+	var strLengthUpper int64 = 1024
+	var strLength *big.Int
+	var strRandIndex *big.Int
+	strLength, err = rand.Int(rand.Reader, big.NewInt(strLengthUpper))
+	if err != nil {
+		panic(err)
+	}
+	if strLength.Int64() < strLengthLower {
+		strLength = big.NewInt(strLengthLower)
+	}
+	for i := 0; i < int(strLength.Int64()); i++ {
+		strRandIndex, err = rand.Int(rand.Reader, big.NewInt(int64(len(bytes))))
+		if err != nil {
+			panic(err)
+		}
+
+		result = append(result, bytes[strRandIndex.Int64()])
+	}
+	return string(result)
+}
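The benchmark file above and the test file below together exercise the full WAL surface: buffered writes acknowledged through a callback, segment rotation and deletion, and replay via ReadAllSegments. What follows is a minimal end-to-end sketch of that flow; every signature in it is inferred from the call sites in these tests rather than taken from the wal package's documentation, and the "demo-wal" path is illustrative only.

package main

import (
	"fmt"
	"time"

	"github.com/apache/skywalking-banyandb/api/common"
	"github.com/apache/skywalking-banyandb/pkg/wal"
)

func main() {
	// Writes are buffered; they reach disk when the buffer fills or the
	// batch interval fires, and the callback then reports the outcome.
	log, err := wal.New("demo-wal", &wal.Options{
		BufferSize:          1024 * 64, // 64KB
		BufferBatchInterval: 3 * time.Second,
	})
	if err != nil {
		panic(err)
	}

	flushed := make(chan struct{})
	seriesID := common.GlobalSeriesID{SeriesID: common.SeriesID(1), Name: "series-1"}
	log.Write(seriesID, time.Now(), []byte("value-1"),
		func(id common.GlobalSeriesID, t time.Time, value []byte, err error) {
			// The callback is the durability signal for this entry.
			if err != nil {
				panic(err)
			}
			close(flushed)
		})
	<-flushed

	// Reopening the WAL and reading all segments replays everything that
	// was flushed, which is how recovery after a crash would proceed.
	if err = log.Close(); err != nil {
		panic(err)
	}
	if log, err = wal.New("demo-wal", &wal.Options{BufferSize: 1024 * 64}); err != nil {
		panic(err)
	}
	segments, err := log.ReadAllSegments()
	if err != nil {
		panic(err)
	}
	for _, segment := range segments {
		fmt.Printf("segment %v: %d series entries\n", segment.GetSegmentID(), len(segment.GetLogEntries()))
	}
	if err = log.Close(); err != nil {
		panic(err)
	}
}

Because writes complete asynchronously, anything that must observe durability has to wait on the callback rather than on Write returning, which is exactly why the tests below coordinate with a sync.WaitGroup.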
diff --git a/pkg/wal/wal_test.go b/pkg/wal/wal_test.go
new file mode 100644
index 000000000..f21cda561
--- /dev/null
+++ b/pkg/wal/wal_test.go
@@ -0,0 +1,254 @@
+// Licensed to Apache Software Foundation (ASF) under one or more contributor
+// license agreements. See the NOTICE file distributed with
+// this work for additional information regarding copyright
+// ownership. Apache Software Foundation (ASF) licenses this file to you under
+// the Apache License, Version 2.0 (the "License"); you may
+// not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+// Package wal_test verifies the write, read, rotate, and delete behavior of the wal package.
+package wal_test
+
+import (
+	"bytes"
+	"fmt"
+	"os"
+	"path/filepath"
+	"sync"
+	"time"
+
+	"github.com/onsi/ginkgo/v2"
+	"github.com/onsi/gomega"
+	"github.com/onsi/gomega/gleak"
+
+	"github.com/apache/skywalking-banyandb/api/common"
+	"github.com/apache/skywalking-banyandb/pkg/test/flags"
+	"github.com/apache/skywalking-banyandb/pkg/wal"
+)
+
+var _ = ginkgo.Describe("WAL", func() {
+	var (
+		path    string
+		log     wal.WAL
+		options *wal.Options
+		goods   []gleak.Goroutine
+	)
+	ginkgo.BeforeEach(func() {
+		options = &wal.Options{
+			BufferSize:          1024, // 1KB
+			BufferBatchInterval: 1 * time.Second,
+		}
+		goods = gleak.Goroutines()
+	})
+	ginkgo.AfterEach(func() {
+		err := os.RemoveAll(path)
+		gomega.Expect(err).ToNot(gomega.HaveOccurred())
+		gomega.Eventually(gleak.Goroutines, flags.EventuallyTimeout).ShouldNot(gleak.HaveLeaked(goods))
+	})
+
+	ginkgo.Context("Write and Read", func() {
+		ginkgo.BeforeEach(func() {
+			var err error
+			path, err = filepath.Abs("test1")
+			gomega.Expect(err).ToNot(gomega.HaveOccurred())
+			options = &wal.Options{
+				BufferSize:          1024, // 1KB
+				BufferBatchInterval: 1 * time.Second,
+			}
+			log, err = wal.New(path, options)
+			gomega.Expect(err).ToNot(gomega.HaveOccurred())
+		})
+
+		ginkgo.AfterEach(func() {
+			err := log.Close()
+			gomega.Expect(err).ToNot(gomega.HaveOccurred())
+		})
+
+		ginkgo.It("should write and read data correctly", func() {
+			seriesIDCount := 100
+			seriesIDElementCount := 20
+			writeLogCount := seriesIDCount * seriesIDElementCount
+
+			var wg sync.WaitGroup
+			wg.Add(writeLogCount)
+			baseTime := time.Now()
+			for i := 0; i < seriesIDCount; i++ {
+				seriesID := &common.GlobalSeriesID{
+					SeriesID: common.SeriesID(i),
+					Name:     fmt.Sprintf("series-%d", i),
+				}
+				go func() {
+					for j := 0; j < seriesIDElementCount; j++ {
+						timestamp := time.UnixMilli(baseTime.UnixMilli() + int64(j))
+						value := []byte(fmt.Sprintf("value-%d", j))
+						callback := func(seriesID common.GlobalSeriesID, t time.Time, bytes []byte, err error) {
+							gomega.Expect(err).ToNot(gomega.HaveOccurred())
+
+							wg.Done()
+						}
+						log.Write(*seriesID, timestamp, value, callback)
+					}
+				}()
+			}
+			wg.Wait()
+
+			err := log.Close()
+			gomega.Expect(err).ToNot(gomega.HaveOccurred())
+			log, err = wal.New(path, options)
+			gomega.Expect(err).ToNot(gomega.HaveOccurred())
+			segments, err := log.ReadAllSegments()
+			gomega.Expect(err).ToNot(gomega.HaveOccurred())
+			readLogCount := 0
+
+			for _, segment := range segments {
+				entries := segment.GetLogEntries()
+				for _, entity := range entries {
+					seriesID := 
entity.GetSeriesID() + seriesIDSequence := seriesID.SeriesID + expectSeriesID := common.GlobalSeriesID{ + SeriesID: seriesIDSequence, + Name: fmt.Sprintf("series-%d", seriesIDSequence), + } + // Check seriesID + gomega.Expect(expectSeriesID == seriesID).To(gomega.BeTrue()) + + timestamps := entity.GetTimestamps() + values := entity.GetValues() + var timestamp time.Time + element := values.Front() + for i := 0; i < len(timestamps); i++ { + timestamp = timestamps[i] + + // Check timestamp + gomega.Expect(timestamp.UnixMilli() >= baseTime.UnixMilli()).To(gomega.BeTrue()) + gomega.Expect(timestamp.UnixMilli() <= baseTime.UnixMilli()+int64(seriesIDElementCount)).To(gomega.BeTrue()) + + // Check binary + elementSequence := timestamp.UnixMilli() - baseTime.UnixMilli() + value := element.Value.([]byte) + gomega.Expect(bytes.Equal([]byte(fmt.Sprintf("value-%d", elementSequence)), value)).To(gomega.BeTrue()) + + readLogCount++ + element = element.Next() + } + } + } + + // Check write/read log count + gomega.Expect(writeLogCount == readLogCount).To(gomega.BeTrue()) + }) + }) + + ginkgo.Context("Rotate", func() { + ginkgo.BeforeEach(func() { + var err error + path, err = filepath.Abs("test2") + gomega.Expect(err).ToNot(gomega.HaveOccurred()) + options = &wal.Options{ + BufferSize: 1, + } + log, err = wal.New(path, options) + gomega.Expect(err).ToNot(gomega.HaveOccurred()) + }) + + ginkgo.AfterEach(func() { + err := log.Close() + gomega.Expect(err).ToNot(gomega.HaveOccurred()) + }) + + ginkgo.It("should rotate correctly", func() { + var wg sync.WaitGroup + writeLogCount := 3 + + wg.Add(writeLogCount) + expectSegments := make(map[wal.SegmentID]common.GlobalSeriesID) + for i := 0; i < writeLogCount; i++ { + seriesID := &common.GlobalSeriesID{ + SeriesID: common.SeriesID(i), + Name: fmt.Sprintf("series-%d", i), + } + timestamp := time.Now() + value := []byte(fmt.Sprintf("value-%d", i)) + callback := func(seriesID common.GlobalSeriesID, t time.Time, bytes []byte, err error) { + gomega.Expect(err).ToNot(gomega.HaveOccurred()) + + // Rotate + segment, err := log.Rotate() + gomega.Expect(err).ToNot(gomega.HaveOccurred()) + expectSegments[segment.GetSegmentID()] = seriesID + + wg.Done() + } + log.Write(*seriesID, timestamp, value, callback) + } + wg.Wait() + + err := log.Close() + gomega.Expect(err).ToNot(gomega.HaveOccurred()) + log, err = wal.New(path, options) + gomega.Expect(err).ToNot(gomega.HaveOccurred()) + + // Check segment files + gomega.Expect(len(expectSegments) == writeLogCount).To(gomega.BeTrue()) + for segmentID, seriesID := range expectSegments { + segment, err := log.Read(segmentID) + gomega.Expect(err).ToNot(gomega.HaveOccurred()) + entries := segment.GetLogEntries() + gomega.Expect(len(entries) == 1).To(gomega.BeTrue()) + gomega.Expect(entries[0].GetSeriesID() == seriesID).To(gomega.BeTrue()) + } + }) + }) + + ginkgo.Context("Delete", func() { + ginkgo.BeforeEach(func() { + var err error + path, err = filepath.Abs("test3") + gomega.Expect(err).ToNot(gomega.HaveOccurred()) + log, err = wal.New(path, options) + gomega.Expect(err).ToNot(gomega.HaveOccurred()) + }) + + ginkgo.AfterEach(func() { + err := log.Close() + gomega.Expect(err).ToNot(gomega.HaveOccurred()) + }) + + ginkgo.It("should delete correctly", func() { + var err error + + segments, err := log.ReadAllSegments() + gomega.Expect(err).ToNot(gomega.HaveOccurred()) + gomega.Expect(len(segments) == 1).To(gomega.BeTrue()) + + segment, err := log.Rotate() + gomega.Expect(err).ToNot(gomega.HaveOccurred()) + segments, err = 
log.ReadAllSegments()
+			gomega.Expect(err).ToNot(gomega.HaveOccurred())
+			gomega.Expect(len(segments) == 2).To(gomega.BeTrue())
+
+			err = log.Delete(segment.GetSegmentID())
+			gomega.Expect(err).ToNot(gomega.HaveOccurred())
+
+			err = log.Close()
+			gomega.Expect(err).ToNot(gomega.HaveOccurred())
+			log, err = wal.New(path, options)
+			gomega.Expect(err).ToNot(gomega.HaveOccurred())
+
+			segments, err = log.ReadAllSegments()
+			gomega.Expect(err).ToNot(gomega.HaveOccurred())
+
+			// Check segment files
+			gomega.Expect(len(segments) == 1).To(gomega.BeTrue())
+		})
+	})
+})
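A closing note on the Rotate benchmarks above: they drive rotation from an estimated per-entry volume, roughly 16 bytes of series id plus 8 bytes of timestamp plus the payload, and rotate once the running total crosses a threshold. Below is a standalone sketch of that bookkeeping under the same rotate-when-full policy; it swaps the fixed 16-byte estimate for GlobalSeriesID.Volume() so the series-id size stays exact for any name length. The walutil package and helper names are hypothetical, not part of this patch.

package walutil

import "github.com/apache/skywalking-banyandb/api/common"

// timestampVolume mirrors the benchmarks' estimate for an encoded
// timestamp (Unix milliseconds as a uint64).
const timestampVolume = 8

// volumeRotator tracks an estimated number of bytes written and reports
// when a size-based segment rotation is due.
type volumeRotator struct {
	rotateSize int // rotate after roughly this many bytes
	volume     int // estimated bytes since the last rotation
}

// onWrite accounts for one log entry and returns true when the caller
// should rotate; the running total resets automatically.
func (r *volumeRotator) onWrite(seriesID common.GlobalSeriesID, payload []byte) bool {
	r.volume += seriesID.Volume() + timestampVolume + len(payload)
	if r.volume >= r.rotateSize {
		r.volume = 0
		return true
	}
	return false
}

// Example wiring inside a write loop:
//
//	r := &volumeRotator{rotateSize: 16 * 1024 * 1024} // 16MB, as in the benchmarks
//	if r.onWrite(seriesID, payload) {
//		segment, err := log.Rotate()
//		// handle err, then archive or Delete the returned segment
//		_ = segment
//	}

Since the estimate only approximates the on-disk encoding, the threshold governs when rotation is triggered, not the exact size of the finished segment file.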