-
Notifications
You must be signed in to change notification settings - Fork 1
/
Copy pathdiskqueue.go
126 lines (111 loc) · 2.28 KB
/
diskqueue.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
package diskqueue
import (
"context"
"errors"
"io"
"os"
"sync"
"time"
)
type Diskqueue struct {
sync.RWMutex
close bool
ticker *time.Ticker
wg *sync.WaitGroup
ctx context.Context
cancel context.CancelFunc
}
var (
Reader = &reader{}
Writer = &writer{mtime: time.Now()}
Config = &config{
Path: "data",
FilePerm: 0600,
BatchSize: 100,
BatchTime: time.Second,
SegmentSize: 50 * 1024 * 1024,
SegmentLimit: 2048,
WriteTimeout: 300,
CheckpointFile: ".checkpoint",
MinRequiredSpace: 1024 * 1024 * 1024,
}
)
// Start diskqueue
func Start() (*Diskqueue, error) {
if _, err := os.Stat(Config.Path); err != nil {
return nil, err
}
queue := &Diskqueue{close: false, wg: &sync.WaitGroup{}}
queue.ticker = time.NewTicker(Config.BatchTime)
queue.ctx, queue.cancel = context.WithCancel(context.TODO())
_ = Reader.restore()
go queue.schedule()
return queue, nil
}
// Write data
func (queue *Diskqueue) Write(data []byte) error {
if queue.close {
return errors.New("closed")
}
queue.Lock()
defer queue.Unlock()
return Writer.write(data)
}
// Read data
func (queue *Diskqueue) Read() (int64, int64, []byte, error) {
if queue.close {
return 0, 0, nil, errors.New("closed")
}
index, offset, data, err := Reader.read()
if err == nil {
return index, offset, data, err
}
queue.RLock()
defer queue.RUnlock()
if err == io.EOF && (Writer.file == nil || Reader.file.Name() != Writer.file.Name()) {
_ = Reader.rotate()
}
return index, offset, data, err
}
// Commit index and offset
func (queue *Diskqueue) Commit(index int64, offset int64) {
if queue.close {
return
}
ck := &Reader.checkpoint
ck.Index, ck.Offset = index, offset
Reader.sync()
}
// Close diskqueue
func (queue *Diskqueue) Close() {
if queue.close {
return
}
queue.close = true
queue.ticker.Stop()
queue.cancel()
queue.wg.Wait()
Writer.close()
Reader.close()
}
// scheduled task
func (queue *Diskqueue) schedule() {
queue.wg.Add(1)
defer queue.wg.Done()
for {
select {
case <-queue.ticker.C:
func() {
queue.Lock()
defer queue.Unlock()
Writer.sync()
since := int64(time.Since(Writer.mtime).Seconds())
if since > Config.WriteTimeout {
Writer.close()
}
}()
case <-queue.ctx.Done():
return
}
}
}