// changes_listener.go — forked from tleyden/elastic-thought.
package elasticthought
import (
"encoding/json"
"fmt"
"io"
"strings"
"github.com/couchbaselabs/logg"
"github.com/tleyden/go-couch"
)
// A changes listener listens for changes on the _changes feed and reacts to them.
// The changes listener currently runs as a goroutine in the httpd process, and
// so the system only currently supports having a single httpd process, because otherwise
// there would be multiple changes listeners on the same changes feed, which will
// cause duplicate jobs to get kicked off. If the system needs to support multiple
// http processes, then the changes listener needs to run in its own process.
type ChangesListener struct {
	Configuration Configuration  // app configuration; also used to build per-doc wrappers (e.g. NewClassifyJob, NewDataset)
	Database      couch.Database // database whose _changes feed is followed and whose docs are retrieved
	JobScheduler  JobScheduler   // receives a JobDescriptor for each doc found in the Pending state
}
// NewChangesListener builds a ChangesListener wired to the database described
// by the given configuration. The error result is currently always nil; it is
// kept so the signature stays stable for callers expecting a (value, error) pair.
func NewChangesListener(c Configuration, jobScheduler JobScheduler) (*ChangesListener, error) {
	listener := &ChangesListener{
		Configuration: c,
		Database:      c.DbConnection(),
		JobScheduler:  jobScheduler,
	}
	return listener, nil
}
// FollowChangesFeed tails the database _changes feed in longpoll mode forever,
// dispatching each batch of changes to processChanges. It is typically run in
// its own goroutine. If the underlying feed loop ever returns, the process
// panics, since that should never happen.
func (c ChangesListener) FollowChangesFeed() {
	logg.LogTo("CHANGES", "going to follow changes feed")

	// since holds the last sequence successfully processed; the callback must
	// return a non-nil value to keep the couch library following the feed.
	var since interface{}

	handleChange := func(reader io.Reader) interface{} {
		logg.LogTo("CHANGES", "handleChange() callback called")
		if changes, err := decodeChanges(reader); err == nil {
			c.processChanges(changes)
			since = changes.LastSequence
			logg.LogTo("CHANGES", "returning since: %v", since)
		} else {
			// it's very common for this to timeout while waiting for new changes.
			// since we want to follow the changes feed forever, just log an error
			logg.LogTo("CHANGES", "%T decoding changes: %v.", err, err)
		}
		return since
	}

	feedOptions := map[string]interface{}{"feed": "longpoll"}
	logg.LogTo("CHANGES", "Following changes feed: %+v.", feedOptions)

	// this will block until the handleChange callback returns nil
	c.Database.Changes(handleChange, feedOptions)

	logg.LogPanic("Changes listener died -- this should never happen")
}
// processChanges walks every change in the batch, skips deletions and internal
// docs (e.g. "_user/*"), fetches the underlying document, and routes it to the
// handler matching its document type. Unknown types are ignored.
func (c ChangesListener) processChanges(changes couch.Changes) {
	for _, chg := range changes.Results {
		if chg.Deleted {
			logg.LogTo("CHANGES", "change was deleted, skipping")
			continue
		}

		// ignore certain docs, like "_user/*"
		if strings.HasPrefix(chg.Id, "_user") {
			logg.LogTo("CHANGES", "Ignoring change: %v", chg.Id)
			continue
		}

		var doc ElasticThoughtDoc
		if err := c.Database.Retrieve(chg.Id, &doc); err != nil {
			logg.LogError(fmt.Errorf("Didn't retrieve: %v - %v", chg.Id, err))
			continue
		}

		switch doc.Type {
		case DOC_TYPE_DATAFILE:
			c.handleDatafileChange(chg, doc)
		case DOC_TYPE_DATASET:
			c.handleDatasetChange(chg, doc)
		case DOC_TYPE_TRAINING_JOB:
			c.handleTrainingJobChange(chg, doc)
		case DOC_TYPE_CLASSIFY_JOB:
			c.handleClassifyJobChange(chg, doc)
		}
	}
}
// handleTrainingJobChange fetches the full TrainingJob for the changed doc
// and, if it is still pending, hands a job descriptor to the scheduler.
func (c ChangesListener) handleTrainingJobChange(change couch.Change, doc ElasticThoughtDoc) {
	logg.LogTo("CHANGES", "got a training job doc: %+v", doc)

	// create a Training Job doc from the ElasticThoughtDoc
	// NOTE(review): &trainingJob is a **TrainingJob; kept as-is since Retrieve's
	// handling of the extra indirection is relied on by the original code.
	trainingJob := &TrainingJob{}
	err := c.Database.Retrieve(change.Id, &trainingJob)
	if err != nil {
		logg.LogError(fmt.Errorf("Didn't retrieve: %v - %v", change.Id, err))
		return
	}

	// check the state, only schedule if state == pending
	if trainingJob.ProcessingState != Pending {
		logg.LogTo("CHANGES", "State != pending: %+v", trainingJob)
		return
	}

	descriptor := NewJobDescriptor(doc.Id)
	c.JobScheduler.ScheduleJob(*descriptor)
}
// handleClassifyJobChange loads the ClassifyJob for the changed doc and, if it
// is still pending, schedules it.
func (c ChangesListener) handleClassifyJobChange(change couch.Change, doc ElasticThoughtDoc) {
	logg.LogTo("CHANGES", "got a classify job doc: %+v", doc)

	// load the ClassifyJob corresponding to this ElasticThoughtDoc
	classifyJob := NewClassifyJob(c.Configuration)
	if err := classifyJob.Find(change.Id); err != nil {
		logg.LogError(fmt.Errorf("Could not find: %v - %v", change.Id, err))
		return
	}

	// check the state, only schedule if state == pending
	if classifyJob.ProcessingState != Pending {
		logg.LogTo("CHANGES", "State != pending: %+v", classifyJob)
		return
	}

	c.JobScheduler.ScheduleJob(*NewJobDescriptor(doc.Id))
}
// handleDatasetChange retrieves the full Dataset for the changed doc and, if
// it is still pending, schedules a job for it.
func (c ChangesListener) handleDatasetChange(change couch.Change, doc ElasticThoughtDoc) {
	logg.LogTo("CHANGES", "got a dataset doc: %+v", doc)

	// create a Dataset doc from the ElasticThoughtDoc
	dataset := NewDataset(c.Configuration)
	if err := c.Database.Retrieve(change.Id, &dataset); err != nil {
		logg.LogError(fmt.Errorf("Didn't retrieve: %v - %v", change.Id, err))
		return
	}
	logg.LogTo("CHANGES", "convert to dataset: %+v", dataset)

	// check the state, only schedule if state == pending
	if dataset.ProcessingState != Pending {
		logg.LogTo("CHANGES", "Dataset state != pending: %+v", dataset)
		return
	}

	c.JobScheduler.ScheduleJob(*NewJobDescriptor(doc.Id))
}
// handleDatafileChange retrieves the full Datafile for the changed doc and, if
// it is still pending, schedules a job for it.
func (c ChangesListener) handleDatafileChange(change couch.Change, doc ElasticThoughtDoc) {
	logg.LogTo("CHANGES", "got a datafile doc: %+v", doc)

	// create a Datafile doc from the ElasticThoughtDoc
	datafile := NewDatafile(c.Configuration)
	if err := c.Database.Retrieve(change.Id, &datafile); err != nil {
		logg.LogError(fmt.Errorf("Didn't retrieve: %v - %v", change.Id, err))
		return
	}
	logg.LogTo("CHANGES", "convert to datafile: %+v", datafile)

	// check the state, only schedule if state == pending
	if datafile.ProcessingState != Pending {
		logg.LogTo("CHANGES", "Datafile state != pending: %+v", datafile)
		return
	}

	c.JobScheduler.ScheduleJob(*NewJobDescriptor(doc.Id))
}
// decodeChanges parses a JSON _changes feed payload from reader. On decode
// failure (commonly a longpoll timeout) the error is logged and returned
// alongside whatever was partially decoded.
func decodeChanges(reader io.Reader) (couch.Changes, error) {
	var changes couch.Changes
	if err := json.NewDecoder(reader).Decode(&changes); err != nil {
		logg.LogTo("CHANGES", "Err decoding changes: %v", err)
		return changes, err
	}
	return changes, nil
}