-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathagent.go
104 lines (90 loc) · 2.42 KB
/
agent.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
package pipelines
import (
"log"
"strconv"
"time"
"github.com/bign8/pipelines/utils"
"github.com/golang/protobuf/proto"
"github.com/nats-io/nats"
)
// agent is a worker-side endpoint that receives Work over NATS and hands it
// to local consumers through the channels returned by start.
type agent struct {
// ID identifies this agent: it is embedded in the enqueue subscription
// subject and sent as the payload of the server find/stop messages.
ID string
// conn is the NATS connection used for every subscribe, request and publish.
conn *nats.Conn
// inbox carries decoded Work from the enqueue handler to the buffer goroutine.
inbox chan *Work
}
// start wires the agent to NATS and launches the buffering goroutine.
//
// The inbox channel is created before any subscription is registered so that
// an enqueue message delivered right after Subscribe returns can never send on
// a nil channel (which would block that handler forever). Subscription
// failures are logged rather than silently dropped.
//
// It returns the channel on which runnable work is delivered and the channel
// on which finished work is reported back (both produced by buffer).
func (a *agent) start() (<-chan *Work, chan<- stater) {
	a.inbox = make(chan *Work)

	// NOTE(review): "pipeliens" looks like a misspelling of "pipelines" (every
	// other subject in this file uses the latter). The literal is kept as-is
	// because the publisher is not visible here; confirm against the sender
	// and correct both sides together.
	enqueueSubject := "pipeliens.agent." + a.ID + ".enqueue"
	if _, err := a.conn.Subscribe(enqueueSubject, a.enqueue); err != nil {
		log.Printf("error subscribing to %s: %s", enqueueSubject, err)
	}

	const searchSubject = "pipelines.agent.search"
	if _, err := a.conn.Subscribe(searchSubject, a.search); err != nil {
		log.Printf("error subscribing to %s: %s", searchSubject, err)
	}

	return a.buffer()
}
// search handles a "pipelines.agent.search" message by asking the server
// (subject "pipelines.server.agent.find", one second timeout) which ID this
// agent should use, and then adopting the ID found in the reply.
//
// The incoming message m is only a trigger; its contents are not read.
func (a *agent) search(m *nats.Msg) {
	resp, err := a.conn.Request("pipelines.server.agent.find", []byte(a.ID), time.Second)
	if err != nil {
		// No reply means there is no ID to adopt. Bail out instead of
		// dereferencing a nil response and keep the current ID.
		log.Printf("Error in Agent Search Request: %s", err)
		return
	}

	newGUID := string(resp.Data)
	if newGUID != a.ID {
		// TODO: move to new subscription IDs
		log.Printf("TODO: release all existing subscriptions and start with new ID: %s -> %s", a.ID, newGUID)
	} else {
		log.Printf("Re-found UUID: %s", a.ID)
	}
	a.ID = newGUID
}
// enqueue is the NATS handler for work addressed to this agent.
//
// The message payload is decoded as a protobuf Work. On a decode failure the
// error text is logged and published to the reply subject. Otherwise the work
// is pushed onto the inbox (blocking until the buffer goroutine accepts it)
// and "+" is published to the reply subject as an acknowledgement.
func (a *agent) enqueue(m *nats.Msg) {
	item := new(Work)
	if err := proto.Unmarshal(m.Data, item); err != nil {
		log.Printf("error unmarshaling: %s", err)
		a.conn.Publish(m.Reply, []byte(err.Error()))
		return
	}

	a.inbox <- item
	a.conn.Publish(m.Reply, []byte("+"))
}
// buffer starts the goroutine that sits between the inbox and the consumers
// of work, and returns the two channels used to talk to it:
//
//   - the first is where queued work is handed out, with at most maxRunning
//     items outstanding at a time;
//   - the second is where a stater is sent for each item that has finished.
//
// Incoming work is held in an in-memory queue. Every five seconds the
// goroutine logs queue depth / active count (only when either changed) and
// publishes every non-zero counter to "pipelines.stats.<key>", resetting it
// afterwards. The goroutine runs for the life of the process.
func (a *agent) buffer() (<-chan *Work, chan<- stater) {
	outbox := make(chan *Work)
	completed := make(chan stater)

	go func() {
		const maxRunning = 200

		pending := utils.NewQueue()
		stats := make(map[string]int64)
		ticker := time.Tick(5 * time.Second)
		active, lastLength, lastActive := 0, -1, -1

		// staged is the item already taken off the queue that is waiting to be
		// handed to a consumer. It must survive loop iterations: if another
		// select case fires first, the item has to be offered again rather
		// than being dropped and replaced by the next queued one.
		var staged *Work

		for {
			if staged == nil && pending.Len() > 0 && active < maxRunning {
				staged = pending.Poll().(*Work)
			}

			// A nil channel never becomes ready in select, which disables the
			// send case whenever there is nothing to hand out.
			var starting chan *Work
			if staged != nil && active < maxRunning {
				starting = outbox
			}

			select {
			case <-ticker:
				length := pending.Len()
				if staged != nil {
					length++ // the staged item is still waiting to start
				}
				if length != lastLength || active != lastActive {
					log.Printf("Queue Depth: %d; Active: %d", length, active)
					lastLength, lastActive = length, active
				}
				// REPORT ANALYTICS ALL THE TIME!!!
				for key, value := range stats {
					if value != 0 {
						a.conn.Publish("pipelines.stats."+key, []byte(strconv.FormatInt(value, 10)))
						stats[key] = 0
					}
				}

			case work := <-a.inbox:
				pending.Push(work)
				stats["enqueued_"+work.Service]++

			case starting <- staged:
				staged = nil
				active++

			case stat := <-completed:
				stats["duration_"+stat.Subject] += stat.Duration
				stats["completed_"+stat.Subject]++
				active--
				a.conn.Publish("pipelines.server.agent.stop", []byte(a.ID))
			}
		}
	}()

	return outbox, completed
}