-
Notifications
You must be signed in to change notification settings - Fork 1
/
Copy pathworker.go
58 lines (52 loc) · 1.66 KB
/
worker.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
package main
import (
"io/ioutil"
"sync"
"time"
"github.com/airnandez/chasqui/fileserver"
)
// DownloadReq is a HTTP download operation sent to a client worker for execution
type DownloadReq struct {
seqNumber uint64
server string
fsclient *fileserver.Client
fileID string
size uint64
notAfter time.Time
replyTo chan<- *DownloadResp
}
// DownloadResp is the report sent back by a worker after performing a download operation
// against the file server
type DownloadResp struct {
seqNumber uint64
start time.Time
end time.Time
size uint64
err error
}
// clientWorker is the goroutine executed by each client worker. It receives incoming
// download requests, performs the requested operation and sends the result back
// via the channel specified in the request
func clientWorker(workerId int, wg *sync.WaitGroup, reqChan <-chan *DownloadReq) {
defer wg.Done()
for req := range reqChan {
if time.Now().After(req.notAfter) {
continue
}
debug(1, "worker %d: processing download [seqNo:%d server:%s size:%d]", workerId, req.seqNumber, req.server, req.size)
req.replyTo <- processDownloadRequest(req)
debug(1, "worker %d seqNo:%d ended", workerId, req.seqNumber)
}
}
// processDownloadRequest perform a single file download against the server
// specified in the argument request
func processDownloadRequest(req *DownloadReq) *DownloadResp {
report := req.fsclient.DownloadFile(req.server, req.fileID, int(req.size), fileserver.ChecksumNone, fileserver.SHA256, ioutil.Discard)
return &DownloadResp{
seqNumber: req.seqNumber,
start: report.Start,
end: report.End,
size: req.size,
err: report.Err,
}
}