Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Supports deep copy for time window #58

Merged
merged 2 commits into from
Oct 11, 2024
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
30 changes: 29 additions & 1 deletion metrics/time_window.go
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,16 @@ type SlotAggregator[T any] interface {
Sub(acc, v T) T
}

// Cloneable is used by time window to return cloned value of pointer type for thread safe.
type Cloneable[T any] interface {
Clone(v T) T
}

type SlotAggregatorCloneable[T any] interface {
SlotAggregator[T]
Cloneable[T]
}

type SimpleSlotData interface {
int | int64 | uint | uint64 | float32 | float64
}
Expand Down Expand Up @@ -82,8 +92,11 @@ type TimeWindow[T any] struct {

aggData T // aggregation data within the time window scope
aggregator SlotAggregator[T] // to aggregate slot data

dataCloneable Cloneable[T] // deep copy for thread safe
}

// NewTimeWindow creates a new time window.
func NewTimeWindow[T any](slotInterval time.Duration, numSlots int, aggregator SlotAggregator[T], val ...T) *TimeWindow[T] {
tw := TimeWindow[T]{
slots: list.New(),
Expand All @@ -99,10 +112,18 @@ func NewTimeWindow[T any](slotInterval time.Duration, numSlots int, aggregator S
return &tw
}

// NewSimpleTimeWindow creates a new time window with default SimpleSlotData aggregator.
func NewSimpleTimeWindow[T SimpleSlotData](slotInterval time.Duration, numSlots int, val ...T) *TimeWindow[T] {
return NewTimeWindow(slotInterval, numSlots, simpleSlotAggregator[T]{}, val...)
}

// NewTimeWindowCloneable create a new time window that supports to return cloned data.
func NewTimeWindowCloneable[T any](slotInterval time.Duration, numSlots int, aggregator SlotAggregatorCloneable[T], val ...T) *TimeWindow[T] {
tw := NewTimeWindow(slotInterval, numSlots, aggregator, val...)
tw.dataCloneable = aggregator
return tw
}

// Add adds data sample to time window
func (tw *TimeWindow[T]) Add(sample T) {
tw.mu.Lock()
Expand All @@ -127,7 +148,14 @@ func (tw *TimeWindow[T]) Data() T {
tw.mu.Lock()
defer tw.mu.Unlock()

return tw.data(time.Now())
data := tw.data(time.Now())

if tw.dataCloneable == nil {
return data
}

// return cloned data if specified
return tw.dataCloneable.Clone(data)
}

func (tw *TimeWindow[T]) data(now time.Time) T {
Expand Down
Loading