-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathvisibility.go
107 lines (88 loc) · 2.14 KB
/
visibility.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
package honu
import (
"encoding/json"
"os"
"time"
)
// VisibilityBufferSize describes the maximum number of async visiblity log
// statements before the caller will have to block.
const VisibilityBufferSize = 10000
// NewVisibilityLogger creates a logger for write visibility at the path.
func NewVisibilityLogger(path string) (*VisibilityLogger, error) {
out, err := os.OpenFile(path, os.O_APPEND|os.O_CREATE|os.O_WRONLY, 0644)
if err != nil {
return nil, err
}
vl := &VisibilityLogger{
file: out,
err: nil,
msgs: make(chan *visibilityMessage, VisibilityBufferSize),
done: make(chan bool),
}
go vl.flusher()
return vl, nil
}
// VisibilityLogger records the time a write becomes visible on the local
// replica, storing the information on disk. It uses an asynchronous writer
// so it doesn't block other store operations.
type VisibilityLogger struct {
file *os.File
err error
msgs chan *visibilityMessage
done chan bool
}
// simple data structure for storing visibility information.
type visibilityMessage struct {
Key string
Version string
Timestamp time.Time
}
// Log a Put to the key/value store
func (l *VisibilityLogger) Log(key, version string) {
l.msgs <- &visibilityMessage{
Key: key, Version: version, Timestamp: time.Now(),
}
}
// Close the logger and wait until it's done writing all buffered messages.
func (l *VisibilityLogger) Close() error {
close(l.msgs)
<-l.done
if l.err != nil {
return l.err
}
if err := l.file.Sync(); err != nil {
return err
}
return l.file.Close()
}
// Error returns any issues the visibility logger had
func (l *VisibilityLogger) Error() error {
return l.err
}
// routine that reads visibility log messages off the msgs channel and
// writes them to disk.
func (l *VisibilityLogger) flusher() {
for msg := range l.msgs {
var data []byte
data, l.err = json.Marshal(msg)
if l.err != nil {
warne(l.err)
close(l.msgs)
break
}
data = append(data, byte('\n'))
_, l.err = l.file.Write(data)
if l.err != nil {
warne(l.err)
close(l.msgs)
break
}
l.err = l.file.Sync()
if l.err != nil {
warne(l.err)
close(l.msgs)
break
}
}
l.done <- true
}