-
Notifications
You must be signed in to change notification settings - Fork 14
/
Copy pathcoordinator.go
62 lines (51 loc) · 2.12 KB
/
coordinator.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
package metafora
// CoordinatorContext is the context passed to coordinators by the core
// consumer.
type CoordinatorContext interface {
// Lost is called by the Coordinator when a claimed task is lost to another
// node. The Consumer will stop the task locally.
//
// Since this implies there is a window of time where the task is executing
// more than once, this is a sign of an unhealthy cluster.
Lost(Task)
}
// Coordinator is the core interface Metafora uses to discover, claim, and
// tasks as well as receive commands.
type Coordinator interface {
// Init is called once by the consumer to provide a Logger to Coordinator
// implementations. NewConsumer will return Init's return value.
Init(CoordinatorContext) error
// Watch the broker for claimable tasks. Watch blocks until Close is called
// or it encounters an error. Tasks are sent to consumer via the tasks chan.
Watch(tasks chan<- Task) (err error)
// Claim is called by the Consumer when a Balancer has determined that a task
// ID can be claimed. Claim returns false if another consumer has already
// claimed the ID.
Claim(Task) bool
// Release a task for other consumers to claim. May be called after Close.
Release(Task)
// Done is called by Metafora when a task has been completed and should never
// be scheduled to run again (in other words: deleted from the broker).
//
// May be called after Close.
Done(Task)
// Command blocks until a command for this node is received from the broker
// by the coordinator. Command must return (nil, nil) when Close is called.
Command() (Command, error)
// Close the coordinator. Stop waiting for tasks and commands. Remove node from broker.
//
// Do not release tasks. The consumer will handle task releasing.
Close()
// Name of the coordinator for use in logs and other tooling.
Name() string
}
type coordinatorContext struct {
*Consumer
}
// Lost is a light wrapper around Coordinator.stopTask to make it suitable for
// calling by Coordinator implementations via the CoordinatorContext interface.
func (ctx *coordinatorContext) Lost(t Task) {
tid := t.ID()
Errorf("Lost task %s", tid)
ctx.stopTask(tid)
}