forked from influxdata/influxdb
-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathbroker.go
130 lines (110 loc) · 3.33 KB
/
broker.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
package influxdb
import (
"fmt"
"log"
"net/http"
"time"
"github.com/influxdb/influxdb/messaging"
)
// Broker represents an InfluxDB specific messaging broker.
type Broker struct {
*messaging.Broker
done chan struct{}
// send CQ processing requests to the same data node
currentCQProcessingNode *messaging.Replica
// variables to control when to trigger processing and when to timeout
TriggerInterval time.Duration
TriggerTimeout time.Duration
TriggerFailurePause time.Duration
}
const (
// DefaultContinuousQueryCheckTime is how frequently the broker will ask a data node
// in the cluster to run any continuous queries that should be run.
DefaultContinuousQueryCheckTime = 1 * time.Second
// DefaultDataNodeTimeout is how long the broker will wait before timing out on a data node
// that it has requested process continuous queries.
DefaultDataNodeTimeout = 1 * time.Second
// DefaultFailureSleep is how long the broker will sleep before trying the next data node in
// the cluster if the current data node failed to respond
DefaultFailureSleep = 100 * time.Millisecond
)
// NewBroker returns a new instance of a Broker with default values.
func NewBroker() *Broker {
b := &Broker{
TriggerInterval: 5 * time.Second,
TriggerTimeout: 20 * time.Second,
TriggerFailurePause: 1 * time.Second,
}
b.Broker = messaging.NewBroker()
return b
}
// RunContinuousQueryLoop starts running continuous queries on a background goroutine.
func (b *Broker) RunContinuousQueryLoop() {
b.done = make(chan struct{})
go b.continuousQueryLoop(b.done)
}
// Close closes the broker.
func (b *Broker) Close() error {
if b.done != nil {
close(b.done)
b.done = nil
}
return b.Broker.Close()
}
func (b *Broker) continuousQueryLoop(done chan struct{}) {
for {
// Check if broker is currently leader.
if b.Broker.IsLeader() {
b.runContinuousQueries()
}
// Sleep until either the broker is closed or we need to run continuous queries again
select {
case <-done:
return
case <-time.After(DefaultContinuousQueryCheckTime):
}
}
}
func (b *Broker) runContinuousQueries() {
next := 0
for {
// if the current node hasn't been set it's our first time or we're reset. move to the next one
if b.currentCQProcessingNode == nil {
dataNodes := b.Broker.Replicas()
if len(dataNodes) == 0 {
return // don't have any nodes to try, give it up
}
next = next % len(dataNodes)
b.currentCQProcessingNode = dataNodes[next]
next++
}
// if no error, we're all good
err := b.requestContinuousQueryProcessing()
if err == nil {
return
}
log.Printf("broker cq: error hitting data node: %s: %s\n", b.currentCQProcessingNode.URL, err.Error())
// reset and let the loop try the next data node in the cluster
b.currentCQProcessingNode = nil
<-time.After(DefaultFailureSleep)
}
}
func (b *Broker) requestContinuousQueryProcessing() error {
// Send request.
cqURL := copyURL(b.currentCQProcessingNode.URL)
cqURL.Path = "/process_continuous_queries"
cqURL.Scheme = "http"
client := &http.Client{
Timeout: DefaultDataNodeTimeout,
}
resp, err := client.Post(cqURL.String(), "application/octet-stream", nil)
if err != nil {
return err
}
defer resp.Body.Close()
// Check if created.
if resp.StatusCode != http.StatusAccepted {
return fmt.Errorf("request returned status %s", resp.Status)
}
return nil
}