forked from influxdata/kapacitor
-
Notifications
You must be signed in to change notification settings - Fork 0
/
stream.go
120 lines (108 loc) · 2.72 KB
/
stream.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
package kapacitor
import (
"fmt"
"log"
"github.com/influxdata/kapacitor/models"
"github.com/influxdata/kapacitor/pipeline"
"github.com/influxdata/kapacitor/tick/stateful"
)
// StreamNode is the executing counterpart of a pipeline.StreamNode.
// Its run function (runSourceStream) forwards every point read from its
// first input edge to all of its output edges.
type StreamNode struct {
	// node is the embedded base node; newStreamNode populates its Node, et,
	// logger and runF fields.
	node
	// s is the pipeline definition this node was created from.
	s *pipeline.StreamNode
}
// newStreamNode builds the executing StreamNode for the pipeline node n.
//
// The embedded base node is given the pipeline node, the executing task et
// and the logger l, and its run function is set to runSourceStream, which
// copies all data to the node's children. The returned error is always nil.
func newStreamNode(et *ExecutingTask, n *pipeline.StreamNode, l *log.Logger) (*StreamNode, error) {
	streamNode := new(StreamNode)
	streamNode.node = node{Node: n, et: et, logger: l}
	streamNode.s = n
	streamNode.node.runF = streamNode.runSourceStream
	return streamNode, nil
}
// runSourceStream is the run function of a StreamNode. It reads points from
// the first input edge until NextPoint reports that no more are available,
// handing each point to every output edge in order.
//
// The []byte argument is unused. The first error returned by CollectPoint
// stops the loop and is returned as-is; otherwise nil is returned once the
// input is drained.
func (s *StreamNode) runSourceStream([]byte) error {
	for {
		point, more := s.ins[0].NextPoint()
		if !more {
			return nil
		}
		for _, out := range s.outs {
			if err := out.CollectPoint(point); err != nil {
				return err
			}
		}
	}
}
// FromNode is the executing counterpart of a pipeline.FromNode. It filters
// the points read from its first input edge (see matches) and forwards the
// ones that pass to all of its output edges (see runStream).
type FromNode struct {
	// node is the embedded base node; newFromNode populates its Node, et,
	// logger and runF fields.
	node
	// s is the pipeline definition this node was created from; runStream
	// reads its Truncate and Round settings.
	s *pipeline.FromNode
	// expression is the predicate compiled from the pipeline node's Lambda;
	// it stays nil when no Lambda was given.
	expression stateful.Expression
	// scopePool is built from the variables referenced by the Lambda
	// expression and is passed to EvalPredicate together with expression.
	scopePool stateful.ScopePool
	// dimensions and allDimensions are the result of determineDimensions on
	// the pipeline node's Dimensions; both are passed to setGroupOnPoint.
	dimensions []string
	allDimensions bool
	// db, rp and name hold the pipeline node's Database, RetentionPolicy and
	// Measurement. An empty string disables the corresponding check in matches.
	db string
	rp string
	name string
}
// newFromNode builds the executing FromNode, which filters data from a
// source, for the pipeline node n.
//
// The database, retention policy and measurement filters are copied from n,
// the grouping dimensions are derived with determineDimensions, and the run
// function is set to runStream. When n carries a Lambda, its expression is
// compiled and a scope pool is created for the variables it references.
//
// An error is returned only when the Lambda expression fails to compile.
func newFromNode(et *ExecutingTask, n *pipeline.FromNode, l *log.Logger) (*FromNode, error) {
	fn := &FromNode{
		node: node{Node: n, et: et, logger: l},
		s:    n,
		db:   n.Database,
		rp:   n.RetentionPolicy,
		name: n.Measurement,
	}
	fn.node.runF = fn.runStream
	fn.allDimensions, fn.dimensions = determineDimensions(n.Dimensions)

	// Without a Lambda there is no predicate to compile.
	if n.Lambda == nil {
		return fn, nil
	}

	compiled, err := stateful.NewExpression(n.Lambda.Expression)
	if err != nil {
		return nil, fmt.Errorf("Failed to compile from expression: %v", err)
	}
	fn.expression = compiled
	fn.scopePool = stateful.NewScopePool(stateful.FindReferenceVariables(n.Lambda.Expression))
	return fn, nil
}
// runStream is the run function of a FromNode. It reads points from the
// first input edge until NextPoint reports that no more are available.
//
// Points rejected by matches are dropped. For an accepted point the
// timestamp is first truncated and then rounded (each step only when the
// corresponding pipeline setting is non-zero), the group is assigned with
// setGroupOnPoint, and the point is handed to every output edge in order.
//
// The []byte argument is unused. The first error returned by CollectPoint
// is returned immediately, without resuming or stopping the timer.
func (s *FromNode) runStream([]byte) error {
	for {
		point, more := s.ins[0].NextPoint()
		if !more {
			break
		}

		s.timer.Start()
		if !s.matches(point) {
			s.timer.Stop()
			continue
		}

		if d := s.s.Truncate; d != 0 {
			point.Time = point.Time.Truncate(d)
		}
		if d := s.s.Round; d != 0 {
			point.Time = point.Time.Round(d)
		}
		point = setGroupOnPoint(point, s.allDimensions, s.dimensions)

		// The timer is paused while the point is handed to the children,
		// presumably so downstream time is not attributed to this node.
		s.timer.Pause()
		for _, out := range s.outs {
			if err := out.CollectPoint(point); err != nil {
				return err
			}
		}
		s.timer.Resume()
		s.timer.Stop()
	}
	return nil
}
// matches reports whether the point p passes this node's filters.
//
// The database, retention policy and measurement name are each compared
// against p only when the node's corresponding filter is non-empty. If all
// of those pass and an expression is configured, the result of EvalPredicate
// decides; an evaluation error is logged and treated as a non-match.
func (s *FromNode) matches(p models.Point) bool {
	switch {
	case s.db != "" && p.Database != s.db:
		return false
	case s.rp != "" && p.RetentionPolicy != s.rp:
		return false
	case s.name != "" && p.Name != s.name:
		return false
	case s.expression == nil:
		// No predicate configured: the scalar filters alone decide.
		return true
	}

	pass, err := EvalPredicate(s.expression, s.scopePool, p.Time, p.Fields, p.Tags)
	if err != nil {
		s.logger.Println("E! error while evaluating WHERE expression:", err)
		return false
	}
	return pass
}