-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathfileiter.go
124 lines (105 loc) · 3.22 KB
/
fileiter.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
package fileperf
import (
"context"
"errors"
"io"
"time"
)
// fileIterUpdate is a single message received by a FileIter over its update
// channel.
type fileIterUpdate struct {
// updated is the timestamp carried by the update. FileIter stores it as its
// end time. NOTE(review): presumably the time the update was produced;
// confirm against the sender.
updated time.Time
// file is the file carried by the update. Scan only stores it when
// streamErr is nil.
file File
// stats is the job statistics snapshot carried by the update.
stats JobStats
// streamErr is nil for an ordinary file update. Scan treats io.EOF as the
// job's end-of-work marker accompanying completion stats, and records any
// other value as the iterator's error.
streamErr error
}
// ErrScanCancelled is the sentinel error that FileIter.Err() is documented to
// report when a scan has been cancelled.
//
// NOTE(review): none of the FileIter methods visible here assign this value
// themselves; presumably the job feeding the iterator delivers it as a stream
// error. Confirm against the producer.
var ErrScanCancelled = errors.New("the scan has been cancelled")
// FileIter is a file iterator returned by a Scanner. It's used to step
// through the results of a Scanner as they're produced.
//
// NOTE(review): the methods read and write the fields below without any
// locking, so a FileIter presumably must not be shared between goroutines.
// Confirm with the Scanner's documentation.
type FileIter struct {
// start is the reference time that Duration subtracts from end. It is not
// assigned by any method of FileIter, so whoever constructs the iterator
// is expected to set it.
start time.Time
// ch delivers updates from the job. Closure of the channel marks the end
// of the stream.
ch <-chan fileIterUpdate
// cancel is invoked by Scan when the stream ends and by Close to request
// that the job stop.
cancel context.CancelFunc
// end is the timestamp of the most recently received update.
end time.Time
// file is the file from the most recent update that carried no stream
// error. It is returned by File.
file File
// stats is the statistics snapshot from the most recently received update.
// It is returned by Stats.
stats JobStats
// err is the most recently recorded error. It is returned by Err.
err error
}
// Scan waits for the next file to become available from the iterator.
// It returns true when a new file is ready to be read with File(). It
// returns false if ctx is cancelled, the scanner encounters an error, or
// the end of the stream is reached.
//
// When Scan returns false, check iter.Err() for a non-nil error to
// understand the cause.
//
// Cancelling ctx only abandons the current call: the job is not cancelled
// and Scan may be called again with a fresh context. The context error
// recorded by an abandoned call is replaced as soon as a later call receives
// an update from the job, so it isn't reported as the job's outcome. If the
// channel is closed without a further update, a previously recorded error is
// left in place because it can't be told apart from a terminal stream error.
func (iter *FileIter) Scan(ctx context.Context) bool {
	select {
	case <-ctx.Done():
		// This scan call has been cancelled, but that doesn't necessarily
		// mean the entire job has been cancelled. We record the context
		// error in the iterator here so this condition isn't confused with
		// job completion, but we don't issue an iter.cancel().
		iter.err = ctx.Err()
		return false
	case update, ok := <-iter.ch:
		// Check for an end-of-stream condition, indicated by channel closure
		if !ok {
			iter.cancel()
			return false
		}

		// Stats are always updated, even if there's a stream error
		iter.stats = update.stats
		iter.end = update.updated

		// Files are only updated when there's no stream error
		if update.streamErr == nil {
			iter.file = update.file
			// A delivered file proves the job is still producing results, so
			// any error left behind by an earlier abandoned call is stale.
			iter.err = nil
			return true
		}

		// io.EOF is the job's way of telling us it's wrapping up and is
		// sending us completion stats; it is not a failure. It is compared
		// directly because it is used as an exact sentinel on this channel.
		// Either way the update determines the final error state, replacing
		// whatever an earlier abandoned call recorded.
		if update.streamErr == io.EOF {
			iter.err = nil
		} else {
			iter.err = update.streamErr
		}

		// Stream errors immediately precede channel closure, so wrap up
		iter.cancel()

		// Ranging here isn't necessary, but we do so out of caution and as a
		// nice way to make sure the channel has been drained. The end time is
		// tracked alongside the stats so Duration stays consistent with them.
		for update := range iter.ch {
			iter.stats = update.stats
			iter.end = update.updated
		}
		return false
	}
}
// File reports the file most recently delivered by the iterator. The value
// changes each time Scan() returns true, so Scan() must be called at least
// once before calling this function.
func (iter *FileIter) File() File {
	current := iter.file
	return current
}
// Err reports the error recorded by the iterator, or nil if none has been
// recorded. Call it after Scan() returns false: a non-nil result explains
// why iteration stopped, while nil means the job completed successfully.
func (iter *FileIter) Err() error {
	recorded := iter.err
	return recorded
}
// Stats reports the job statistics carried by the most recent update the
// iterator has received.
func (iter *FileIter) Stats() JobStats {
	latest := iter.stats
	return latest
}
// Duration returns the duration of the iterator's job, measured from the
// iterator's start time to the timestamp of the most recent update received.
//
// It returns 0 while no update has been received yet (or whenever the end
// time precedes the start time) rather than a meaningless negative value.
func (iter *FileIter) Duration() time.Duration {
	// Until the first update arrives, end is still the zero time; subtracting
	// start from it would saturate to a huge negative duration.
	if iter.end.Before(iter.start) {
		return 0
	}
	return iter.end.Sub(iter.start)
}
// Close causes the iterator's job to stop running and waits for the job to
// close the update channel. It always returns nil.
//
// Updates received while waiting still refresh the iterator's stats and end
// time. A stream error received while waiting is recorded and can be read
// with Err(), except for io.EOF, which only marks normal completion.
func (iter *FileIter) Close() error {
	// Request cancellation
	iter.cancel()

	// Collect stats as we wait for the job to wrap up
	for update := range iter.ch {
		iter.stats = update.stats
		iter.end = update.updated

		// io.EOF accompanies the job's completion stats and is not a failure,
		// so it must not be surfaced through Err(). This matches how Scan
		// handles the same sentinel.
		if update.streamErr != nil && update.streamErr != io.EOF {
			iter.err = update.streamErr
		}
	}
	return nil
}