-
Notifications
You must be signed in to change notification settings - Fork 23
/
Copy pathshuttle.go
117 lines (104 loc) · 3.05 KB
/
shuttle.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
package shuttle
import (
"io"
"io/ioutil"
"log"
"sync"
metrics "github.com/rcrowley/go-metrics"
)
// Default logger to /dev/null
var (
discardLogger = log.New(ioutil.Discard, "", 0)
)
// Shuttle is the main entry point into the library
type Shuttle struct {
LogLineReader
config Config
Batches chan Batch
readers []*LogLineReader
MetricsRegistry metrics.Registry
oWaiter, rWaiter *sync.WaitGroup
Drops, Lost *Counter
NewFormatterFunc NewHTTPFormatterFunc
Logger *log.Logger
ErrLogger *log.Logger
}
// NewShuttle returns a properly constructed Shuttle with a given config
func NewShuttle(config Config) *Shuttle {
b := make(chan Batch, config.BackBuff)
mr := metrics.NewRegistry()
return &Shuttle{
config: config,
Batches: b,
Drops: NewCounter(0),
Lost: NewCounter(0),
MetricsRegistry: mr,
NewFormatterFunc: config.FormatterFunc,
readers: make([]*LogLineReader, 0),
oWaiter: new(sync.WaitGroup),
rWaiter: new(sync.WaitGroup),
Logger: discardLogger,
ErrLogger: discardLogger,
}
}
// Launch a shuttle by spawing it's outlets and batchers (in that order), which
// is the reverse of shutdown.
func (s *Shuttle) Launch() {
s.startOutlets()
for _, rdr := range s.readers {
s.rWaiter.Add(1)
go func(rdr *LogLineReader) {
rdr.ReadLines()
s.rWaiter.Done()
}(rdr)
}
}
// startOutlet launches config.NumOutlets number of outlets. When inbox is
// closed the outlets will finish up their output and exit.
func (s *Shuttle) startOutlets() {
for i := 0; i < s.config.NumOutlets; i++ {
s.oWaiter.Add(1)
go func() {
outlet := NewHTTPOutlet(s)
outlet.Outlet()
s.oWaiter.Done()
}()
}
}
// LoadReader into the shuttle for processing it's lines. Use this if you want
// log-shuttle to track the readers for you. The errors returned by ReadLogLines
// are discarded.
func (s *Shuttle) LoadReader(rdr io.ReadCloser) {
r := NewLogLineReader(rdr, s)
s.readers = append(s.readers, r)
}
// CloseReaders closes all tracked readers and returns any errors returned by
// Close()ing the readers
func (s *Shuttle) CloseReaders() []error {
var errors []error
for _, closer := range s.readers {
if err := closer.Close(); err != nil {
errors = append(errors, err)
}
}
return errors
}
// WaitForReadersToFinish to finish reading
func (s *Shuttle) WaitForReadersToFinish() {
s.rWaiter.Wait()
}
// DockReaders closes all tracked readers and waits for all reading go routines
// to finish.
func (s *Shuttle) DockReaders() []error {
errors := s.CloseReaders()
s.WaitForReadersToFinish()
return errors
}
// Land gracefully terminates the shuttle instance, ensuring that anything
// read is batched and delivered. A panic is likely to happen if Land() is
// called before any readers passed to any ReadLogLines() calls aren't closed.
func (s *Shuttle) Land() {
s.DockReaders()
close(s.Batches) // Close the batch channel, all of the outlets will stop once they are done
s.oWaiter.Wait() // Wait for them to be done
}