-
Notifications
You must be signed in to change notification settings - Fork 114
/
Copy pathupload_arbitrary_file.go
106 lines (91 loc) · 2.45 KB
/
upload_arbitrary_file.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
package datasync
import (
"context"
"io"
"os"
"path/filepath"
"github.com/pkg/errors"
v1 "go.viam.com/api/app/datasync/v1"
)
// UploadChunkSize defines the size, in bytes, of the data included in each message of a FileUpload stream.
// It is a variable rather than a constant, and readNextFileChunk reads it anew
// for every chunk, so changing it affects subsequent reads.
var UploadChunkSize = 64 * 1024
// uploadArbitraryFile uploads the contents of f over a client-streaming
// FileUpload call.
//
// The first message on the stream carries the upload metadata (part ID, the
// file's base name and extension, and tags). The file contents follow as a
// series of messages produced by sendFileUploadRequests, after which the
// stream is closed and the server's response is awaited.
//
// It returns nil once CloseAndRecv succeeds. A failure to open the stream or
// to send the metadata is returned unwrapped; a failure while streaming the
// contents or in the final response is wrapped with the file name.
func uploadArbitraryFile(ctx context.Context, client v1.DataSyncServiceClient, f *os.File, partID string, tags []string) error {
	// Cancel the stream's context on every return path so that bailing out
	// early (failed send, failed file read) does not leave the stream and its
	// resources open. After a successful CloseAndRecv this is a no-op.
	ctx, cancel := context.WithCancel(ctx)
	defer cancel()

	stream, err := client.FileUpload(ctx)
	if err != nil {
		return err
	}

	// With gRPC client streams, a Send that fails with io.EOF only signals
	// that the server ended the stream; the actual status has to be fetched
	// from the receive side. Prefer that status over the bare EOF.
	resolveSendErr := func(sendErr error) error {
		if !errors.Is(sendErr, io.EOF) {
			return sendErr
		}
		if _, recvErr := stream.CloseAndRecv(); recvErr != nil {
			return recvErr
		}
		return sendErr
	}

	name := f.Name()
	md := &v1.UploadMetadata{
		PartId:        partID,
		Type:          v1.DataType_DATA_TYPE_FILE,
		FileName:      filepath.Base(name),
		FileExtension: filepath.Ext(name),
		Tags:          tags,
	}

	// Send metadata FileUploadRequest.
	req := &v1.FileUploadRequest{
		UploadPacket: &v1.FileUploadRequest_Metadata{
			Metadata: md,
		},
	}
	if err := stream.Send(req); err != nil {
		return resolveSendErr(err)
	}

	// sendFileUploadRequests maps end-of-file on f to nil, so an io.EOF
	// coming back from it can only originate from a failed Send.
	if err := sendFileUploadRequests(ctx, stream, f); err != nil {
		return errors.Wrapf(resolveSendErr(err), "error syncing %s", name)
	}

	if _, err := stream.CloseAndRecv(); err != nil {
		return errors.Wrapf(err, "received error response while syncing %s", name)
	}

	return nil
}
// sendFileUploadRequests reads f chunk by chunk and sends each chunk on
// stream as a FileUploadRequest.
//
// It returns nil once f is exhausted (the reader reports io.EOF). If ctx is
// done before the next chunk is read it returns ctx.Err(), so a deadline
// expiry is reported as context.DeadlineExceeded rather than as a
// cancellation. Any other read error, or any error from stream.Send, is
// returned unchanged.
func sendFileUploadRequests(ctx context.Context, stream v1.DataSyncService_FileUploadClient, f *os.File) error {
	// Loop until there is no more content to be read from file.
	for {
		// Stop promptly between chunks when the caller has given up.
		if err := ctx.Err(); err != nil {
			return err
		}

		// Get the next UploadRequest from the file.
		uploadReq, err := getNextFileUploadRequest(ctx, f)
		// EOF means we've completed successfully.
		if errors.Is(err, io.EOF) {
			return nil
		}
		if err != nil {
			return err
		}

		if err := stream.Send(uploadReq); err != nil {
			return err
		}
	}
}
// getNextFileUploadRequest reads the next chunk of f and wraps it in a
// FileUploadRequest carrying file contents.
//
// If ctx is already done it returns ctx.Err() without touching the file, so
// the caller can tell cancellation from deadline expiry. Errors from
// readNextFileChunk, including io.EOF at end of file, are returned unchanged
// with a nil request.
func getNextFileUploadRequest(ctx context.Context, f *os.File) (*v1.FileUploadRequest, error) {
	if err := ctx.Err(); err != nil {
		return nil, err
	}

	// Get the next file data reading from file, check for an error.
	chunk, err := readNextFileChunk(f)
	if err != nil {
		return nil, err
	}

	// Otherwise, return an UploadRequest and no error.
	return &v1.FileUploadRequest{
		UploadPacket: &v1.FileUploadRequest_FileContents{
			FileContents: chunk,
		},
	}, nil
}
// readNextFileChunk reads up to UploadChunkSize bytes from f's current
// position and returns them as a FileData message.
//
// The returned data is trimmed to the number of bytes actually read. At end
// of file it returns io.EOF. If a read yields bytes together with io.EOF,
// the bytes are returned with a nil error and the EOF is left for the next
// call to report. Any other read error is returned with a nil message. A
// non-positive UploadChunkSize is rejected with an error.
func readNextFileChunk(f *os.File) (*v1.FileData, error) {
	// A negative size would panic in make, and a zero-length read returns
	// (0, nil) forever, so the caller would never reach end of file.
	if UploadChunkSize <= 0 {
		return nil, errors.New("upload chunk size must be positive")
	}

	buf := make([]byte, UploadChunkSize)
	n, err := f.Read(buf)
	// Keep bytes that arrive alongside io.EOF; fail on everything else.
	if err != nil && !(n > 0 && errors.Is(err, io.EOF)) {
		return nil, err
	}
	return &v1.FileData{Data: buf[:n]}, nil
}