Skip to content

Commit

Permalink
Merge branch 'main' into fix-bug
Browse files Browse the repository at this point in the history
  • Loading branch information
hanahmily authored Jul 17, 2023
2 parents 344676c + 482831f commit e2f255d
Show file tree
Hide file tree
Showing 10 changed files with 2,354 additions and 15 deletions.
1 change: 1 addition & 0 deletions CHANGES.md
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ Release Notes.
### Features

- List all properties in a group.
- Implement Write-ahead Logging

### Bugs

Expand Down
26 changes: 26 additions & 0 deletions api/common/id.go
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,32 @@ func (s SeriesID) Marshal() []byte {
return convert.Uint64ToBytes(uint64(s))
}

// GlobalSeriesID identities a series in a shard.
type GlobalSeriesID struct {
Name string
SeriesID SeriesID
}

// Marshal encodes global series id to bytes.
func (s GlobalSeriesID) Marshal() []byte {
seriesIDBytes := convert.Uint64ToBytes(uint64(s.SeriesID))
nameBytes := []byte(s.Name)
return append(seriesIDBytes, nameBytes...)
}

// Volume returns the estimated bytes volume of global series id.
func (s GlobalSeriesID) Volume() int {
return 8 + len(s.Name)
}

// ParseGlobalSeriesID parses global series id from bytes.
func ParseGlobalSeriesID(b []byte) GlobalSeriesID {
return GlobalSeriesID{
SeriesID: SeriesID(convert.BytesToUint64(b[:8])),
Name: string(b[8:]),
}
}

// positionKey is a context key to store the module position.
var positionKey = contextPositionKey{}

Expand Down
2 changes: 1 addition & 1 deletion go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -74,7 +74,7 @@ require (
github.com/golang/glog v1.1.0 // indirect
github.com/golang/groupcache v0.0.0-20210331224755-41bb18bfe9da // indirect
github.com/golang/protobuf v1.5.3 // indirect
github.com/golang/snappy v0.0.3 // indirect
github.com/golang/snappy v0.0.3
github.com/google/btree v1.1.2 // indirect
github.com/google/flatbuffers v1.12.1 // indirect
github.com/google/pprof v0.0.0-20230705174524-200ffdc848b8 // indirect
Expand Down
124 changes: 124 additions & 0 deletions pkg/run/channel_closer.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,124 @@
// Licensed to Apache Software Foundation (ASF) under one or more contributor
// license agreements. See the NOTICE file distributed with
// this work for additional information regarding copyright
// ownership. Apache Software Foundation (ASF) licenses this file to you under
// the Apache License, Version 2.0 (the "License"); you may
// not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.

package run

import (
"context"
"sync"
)

var dummyChannelCloserChan <-chan struct{}

// ChannelCloser can close a goroutine then wait for it to stop.
type ChannelCloser struct {
ctx context.Context
cancel context.CancelFunc
sender sync.WaitGroup
receiver sync.WaitGroup
lock sync.RWMutex
closed bool
}

// NewChannelCloser instances a new ChannelCloser.
func NewChannelCloser() *ChannelCloser {
c := &ChannelCloser{}
c.ctx, c.cancel = context.WithCancel(context.Background())
c.sender.Add(1)
c.receiver.Add(1)
return c
}

// AddSender adds a running sender.
func (c *ChannelCloser) AddSender() bool {
if c == nil {
return false
}
c.lock.RLock()
defer c.lock.RUnlock()
if c.closed {
return false
}
c.sender.Add(1)
return true
}

// SenderDone notifies that running sender is done.
func (c *ChannelCloser) SenderDone() {
if c == nil {
return
}
c.sender.Done()
}

// AddReceiver adds a running receiver.
func (c *ChannelCloser) AddReceiver() bool {
if c == nil {
return false
}
c.lock.RLock()
defer c.lock.RUnlock()
if c.closed {
return false
}
c.receiver.Add(1)
return true
}

// ReceiverDone notifies that receiver task is done.
func (c *ChannelCloser) ReceiverDone() {
if c == nil {
return
}
c.receiver.Done()
}

// CloseNotify receives a signal from Close.
func (c *ChannelCloser) CloseNotify() <-chan struct{} {
if c == nil {
return dummyChannelCloserChan
}
return c.ctx.Done()
}

// CloseThenWait closes all tasks then waits till they are done.
func (c *ChannelCloser) CloseThenWait() {
if c == nil {
return
}

c.lock.Lock()
c.closed = true
c.lock.Unlock()

c.sender.Done()
c.sender.Wait()

c.cancel()
c.receiver.Done()
c.receiver.Wait()
}

// Closed returns whether the ChannelCloser is closed.
func (c *ChannelCloser) Closed() bool {
if c == nil {
return true
}
c.lock.RLock()
defer c.lock.RUnlock()
return c.closed
}
Loading

0 comments on commit e2f255d

Please sign in to comment.