Skip to content

Commit

Permalink
Rename SeriesIDV2 to GlobalSeriesID
Browse files Browse the repository at this point in the history
  • Loading branch information
hailin0 committed Jul 6, 2023
1 parent 4762af1 commit c86be30
Show file tree
Hide file tree
Showing 3 changed files with 32 additions and 32 deletions.
14 changes: 7 additions & 7 deletions api/common/id.go
Original file line number Diff line number Diff line change
Expand Up @@ -40,22 +40,22 @@ func (s SeriesID) Marshal() []byte {
return convert.Uint64ToBytes(uint64(s))
}

// SeriesIDV2 identities a series in a shard.
type SeriesIDV2 struct {
// GlobalSeriesID identities a series in a shard.
type GlobalSeriesID struct {
Name string
SeriesID SeriesID
}

// Marshal encodes series id v2 to bytes.
func (s SeriesIDV2) Marshal() []byte {
// Marshal encodes global series id to bytes.
func (s GlobalSeriesID) Marshal() []byte {
seriesIDBytes := convert.Uint64ToBytes(uint64(s.SeriesID))
nameBytes := []byte(s.Name)
return append(seriesIDBytes, nameBytes...)
}

// ParseSeriesIDV2 parses series id v2 from bytes.
func ParseSeriesIDV2(b []byte) SeriesIDV2 {
return SeriesIDV2{
// ParseGlobalSeriesID parses global series id from bytes.
func ParseGlobalSeriesID(b []byte) GlobalSeriesID {
return GlobalSeriesID{
SeriesID: SeriesID(convert.BytesToUint64(b[:8])),
Name: string(b[8:]),
}
Expand Down
38 changes: 19 additions & 19 deletions pkg/wal/wal.go
Original file line number Diff line number Diff line change
Expand Up @@ -80,7 +80,7 @@ type WAL interface {
// Write a logging entity.
// It will return immediately when the data is written in the buffer,
// The callback function will be called when the entity is flushed on the persistent storage.
Write(seriesID common.SeriesIDV2, timestamp time.Time, data []byte, callback func(common.SeriesIDV2, time.Time, []byte, error))
Write(seriesID common.GlobalSeriesID, timestamp time.Time, data []byte, callback func(common.GlobalSeriesID, time.Time, []byte, error))
// Read specified segment by SegmentID.
Read(segmentID SegmentID) (Segment, error)
// ReadAllSegments reads all segments sorted by their creation time in ascending order.
Expand All @@ -104,7 +104,7 @@ type Segment interface {

// LogEntry used for attain detail value of WAL entry.
type LogEntry interface {
GetSeriesID() common.SeriesIDV2
GetSeriesID() common.GlobalSeriesID
GetTimestamps() []time.Time
GetValues() *list.List
}
Expand Down Expand Up @@ -136,24 +136,24 @@ type segment struct {
}

type logRequest struct {
seriesID common.SeriesIDV2
seriesID common.GlobalSeriesID
timestamp time.Time
callback func(common.SeriesIDV2, time.Time, []byte, error)
callback func(common.GlobalSeriesID, time.Time, []byte, error)
data []byte
}

type logEntry struct {
timestamps []time.Time
values *list.List
seriesID common.SeriesIDV2
seriesID common.GlobalSeriesID
entryLength uint64
count uint32
}

type buffer struct {
timestampMap map[common.SeriesIDV2][]time.Time
valueMap map[common.SeriesIDV2][]byte
callbackMap map[common.SeriesIDV2][]func(common.SeriesIDV2, time.Time, []byte, error)
timestampMap map[common.GlobalSeriesID][]time.Time
valueMap map[common.GlobalSeriesID][]byte
callbackMap map[common.GlobalSeriesID][]func(common.GlobalSeriesID, time.Time, []byte, error)
count int
}

Expand Down Expand Up @@ -205,9 +205,9 @@ func New(path string, options *Options) (WAL, error) {
flushCloser: flushCloser,
chanGroupCloser: chanGroupCloser,
buffer: buffer{
timestampMap: make(map[common.SeriesIDV2][]time.Time),
valueMap: make(map[common.SeriesIDV2][]byte),
callbackMap: make(map[common.SeriesIDV2][]func(common.SeriesIDV2, time.Time, []byte, error)),
timestampMap: make(map[common.GlobalSeriesID][]time.Time),
valueMap: make(map[common.GlobalSeriesID][]byte),
callbackMap: make(map[common.GlobalSeriesID][]func(common.GlobalSeriesID, time.Time, []byte, error)),
count: 0,
},
}
Expand All @@ -223,7 +223,7 @@ func New(path string, options *Options) (WAL, error) {
// Write a logging entity.
// It will return immediately when the data is written in the buffer,
// The callback function will be called when the entity is flushed on the persistent storage.
func (log *log) Write(seriesID common.SeriesIDV2, timestamp time.Time, data []byte, callback func(common.SeriesIDV2, time.Time, []byte, error)) {
func (log *log) Write(seriesID common.GlobalSeriesID, timestamp time.Time, data []byte, callback func(common.GlobalSeriesID, time.Time, []byte, error)) {
if !log.writeCloser.AddSender() {
return
}
Expand Down Expand Up @@ -434,9 +434,9 @@ func (log *log) triggerFlushing() {

func (log *log) newBuffer() {
log.buffer = buffer{
timestampMap: make(map[common.SeriesIDV2][]time.Time),
valueMap: make(map[common.SeriesIDV2][]byte),
callbackMap: make(map[common.SeriesIDV2][]func(common.SeriesIDV2, time.Time, []byte, error)),
timestampMap: make(map[common.GlobalSeriesID][]time.Time),
valueMap: make(map[common.GlobalSeriesID][]byte),
callbackMap: make(map[common.GlobalSeriesID][]func(common.GlobalSeriesID, time.Time, []byte, error)),
count: 0,
}
}
Expand Down Expand Up @@ -632,7 +632,7 @@ func (segment *segment) parseLogEntries() error {
var batchLen uint64
var entryLen uint64
var seriesIDLen uint16
var seriesID common.SeriesIDV2
var seriesID common.GlobalSeriesID
var seriesCount uint32
var timestampsBinaryLen uint16
var entryEndPosition uint64
Expand Down Expand Up @@ -772,8 +772,8 @@ func (segment *segment) parseSeriesIDLength(data []byte) (uint16, error) {
return seriesIDLen, nil
}

func (segment *segment) parseSeriesID(data []byte) common.SeriesIDV2 {
return common.ParseSeriesIDV2(data)
func (segment *segment) parseSeriesID(data []byte) common.GlobalSeriesID {
return common.ParseGlobalSeriesID(data)
}

func (segment *segment) parseSeriesCountLength(data []byte) (uint32, error) {
Expand Down Expand Up @@ -826,7 +826,7 @@ func (segment *segment) parseValuesBinary(data []byte) (*list.List, error) {
return values, nil
}

func (logEntry *logEntry) GetSeriesID() common.SeriesIDV2 {
func (logEntry *logEntry) GetSeriesID() common.GlobalSeriesID {
return logEntry.seriesID
}

Expand Down
12 changes: 6 additions & 6 deletions pkg/wal/wal_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -83,15 +83,15 @@ var _ = ginkgo.Describe("WAL", func() {
wg.Add(writeLogCount)
baseTime := time.Now()
for i := 0; i < seriesIDCount; i++ {
seriesID := &common.SeriesIDV2{
seriesID := &common.GlobalSeriesID{
SeriesID: common.SeriesID(i),
Name: fmt.Sprintf("series-%d", i),
}
go func() {
for j := 0; j < seriesIDElementCount; j++ {
timestamp := time.UnixMilli(baseTime.UnixMilli() + int64(j))
value := []byte(fmt.Sprintf("value-%d", j))
callback := func(seriesID common.SeriesIDV2, t time.Time, bytes []byte, err error) {
callback := func(seriesID common.GlobalSeriesID, t time.Time, bytes []byte, err error) {
gomega.Expect(err).ToNot(gomega.HaveOccurred())

wg.Done()
Expand All @@ -115,7 +115,7 @@ var _ = ginkgo.Describe("WAL", func() {
for _, entity := range entries {
seriesID := entity.GetSeriesID()
seriesIDSequence := seriesID.SeriesID
expectSeriesID := common.SeriesIDV2{
expectSeriesID := common.GlobalSeriesID{
SeriesID: seriesIDSequence,
Name: fmt.Sprintf("series-%d", seriesIDSequence),
}
Expand Down Expand Up @@ -171,15 +171,15 @@ var _ = ginkgo.Describe("WAL", func() {
writeLogCount := 3

wg.Add(writeLogCount)
expectSegments := make(map[wal.SegmentID]common.SeriesIDV2)
expectSegments := make(map[wal.SegmentID]common.GlobalSeriesID)
for i := 0; i < writeLogCount; i++ {
seriesID := &common.SeriesIDV2{
seriesID := &common.GlobalSeriesID{
SeriesID: common.SeriesID(i),
Name: fmt.Sprintf("series-%d", i),
}
timestamp := time.Now()
value := []byte(fmt.Sprintf("value-%d", i))
callback := func(seriesID common.SeriesIDV2, t time.Time, bytes []byte, err error) {
callback := func(seriesID common.GlobalSeriesID, t time.Time, bytes []byte, err error) {
gomega.Expect(err).ToNot(gomega.HaveOccurred())

// Rotate
Expand Down

0 comments on commit c86be30

Please sign in to comment.