Skip to content

Commit

Permalink
initial version
Browse files Browse the repository at this point in the history
  • Loading branch information
trinitum committed Aug 29, 2017
0 parents commit 5daa73d
Show file tree
Hide file tree
Showing 9 changed files with 1,019 additions and 0 deletions.
21 changes: 21 additions & 0 deletions LICENSE
Original file line number Diff line number Diff line change
@@ -0,0 +1,21 @@
MIT License

Copyright (c) 2017 Pavel Shaydo

Permission is hereby granted, free of charge, to any person obtaining a copy
of this software and associated documentation files (the "Software"), to deal
in the Software without restriction, including without limitation the rights
to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
copies of the Software, and to permit persons to whom the Software is
furnished to do so, subject to the following conditions:

The above copyright notice and this permission notice shall be included in all
copies or substantial portions of the Software.

THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
SOFTWARE.
71 changes: 71 additions & 0 deletions count_avg_aggregator.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,71 @@
package estream

import (
"math"
)

// CountAvgAggregator struct represents aggregator calculating simple average
// value over the number of events.
type CountAvgAggregator struct {
getValue func(interface{}) int64
sum int64
sqSum int64
count uint64
}

// NewCountAvgAggregator creates a new CountAvgAggregator and returns a
// reference to it. Argument is a function that accepts an event as the
// argument and returns the numeric value for this event.
func NewCountAvgAggregator(getValue func(interface{}) int64) *CountAvgAggregator {
return &CountAvgAggregator{getValue: getValue}
}

// Enter is called when a new event enters window
func (d *CountAvgAggregator) Enter(e Event, w Window) {
val := d.getValue(e.Value)
d.sum += val
d.sqSum += val * val
d.count++
}

// Leave is called when an event leaves window
func (d *CountAvgAggregator) Leave(e Event, w Window) {
val := d.getValue(e.Value)
d.sum -= val
d.sqSum -= val * val
d.count--
}

// Reset is called for batch aggregators when the window is full
func (d *CountAvgAggregator) Reset(w Window) {
d.sum = 0
d.sqSum = 0
d.count = 0
}

// TimeChange is called when time for the window has changed
func (d *CountAvgAggregator) TimeChange(w Window) {}

// Mean returns current sample averate of values of all events in the window
func (d *CountAvgAggregator) Mean() float64 {
if d.count == 0 {
return math.NaN()
}
return float64(d.sum) / float64(d.count)
}

// Variance returns sample variance of all events in the window
func (d *CountAvgAggregator) Variance() float64 {
if d.count == 0 {
return math.NaN()
} else if d.count == 1 {
return float64(0)
} else {
mean := d.Mean()
vari := (float64(d.sqSum) - float64(d.count)*mean*mean) / float64(d.count-1)
if vari < 0 {
vari = 0
}
return vari
}
}
41 changes: 41 additions & 0 deletions count_avg_aggregator_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,41 @@
package estream

import (
"math"
"testing"
)

func TestCountAvgAggregator(t *testing.T) {
d := NewCountAvgAggregator(func(i interface{}) int64 { return i.(int64) })

type dTest struct {
op int
value int64
mean float64
vari float64
}
dTests := []dTest{
{1, 10, 10, 0},
{1, 20, 15, 50},
{1, 0, 10, 100},
{-1, 10, 10, 200},
{0, 0, math.NaN(), math.NaN()},
}
for i, test := range dTests {
ev := Event{0, test.value}
w := Window{}
if test.op > 0 {
d.Enter(ev, w)
} else if test.op < 0 {
d.Leave(ev, w)
} else {
d.Reset(w)
}
if (math.IsNaN(test.mean) && !math.IsNaN(d.Mean())) || math.Abs(d.Mean()-test.mean) > 1e-6 {
t.Errorf("In test %d expected mean to be %f, but it's %f", i, test.mean, d.Mean())
}
if (math.IsNaN(test.vari) && !math.IsNaN(d.Variance())) || math.Abs(d.Variance()-test.vari) > 1e-6 {
t.Errorf("In test %d expected variance to be %f, but it's %f", i, test.vari, d.Variance())
}
}
}
5 changes: 5 additions & 0 deletions doc.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
/*
Package estream provides framework for processing streams of events. It
allows you to aggregate, transform and filter streams of events.
*/
package estream
54 changes: 54 additions & 0 deletions estream.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,54 @@
package estream

// Event struct represents an event. Event consists of timestamp, which is an
// abstract 64bit integer value, and abstract payload.
type Event struct {
// Timestamp of the event. Abstract integer value.
Timestamp int64
// Value. This is payload of the event.
Value interface{}
}

// Window struct represents sliding window attached to aggregator.
type Window struct {
// Window start time
StartTime int64
// Window end time
EndTime int64
// Events that are in the window
Events []Event
}

// TimeLength returns the length of the time interval that is included in
// window
func (w *Window) TimeLength() int64 {
return w.EndTime - w.StartTime
}

func (w *Window) shiftEvent() Event {
ev := w.Events[0]
w.Events = w.Events[1:]
return ev
}

// Aggregator interface represents aggregate function. The aggregator is
// attached to the Stream with window parameters specified and stream invokes
// its methods when time changes or events enter or leave aggregator's window.
type Aggregator interface {
// Enter is called when event enters aggregator's window. Event's
// timestamp is always equal to window's end time, and event is
// already in the Window
Enter(Event, Window)
// Leave is called when event is leaving aggregator's window.
// Window's start time is always equal to event's timestamp, and
// event is already removed from the Window. For batch aggregators
// Reset method is used instead.
Leave(Event, Window)
// Reset is called for batch aggregators when the window is full, and
// all events are leaving window. Window is already empty and start
// time is equal to end time.
Reset(Window)
// TimeChange is called when start or end time of the window has
// changed.
TimeChange(Window)
}
Loading

0 comments on commit 5daa73d

Please sign in to comment.