-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathgcs.go
158 lines (147 loc) · 4.33 KB
/
gcs.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
// GCS KV is a simple, persistent, key-value store built on top of Google Cloud Storage
// It stores all data flatly in the configure GCS bucket.
package gcskv
import (
"context"
"fmt"
"strings"
"cloud.google.com/go/storage"
"google.golang.org/api/iterator"
)
// GcsStore is an implementation of a persistent Key Value store.
//
// Each key is stored as the name of a GCS object.
// Each value is stored in the content of a GCS object.
// One GCS object contains one key/value pair.
//
// Write and scan operations are not safe for concurrent mutation by multiple
// goroutines, but Read operations are.
//
// One should use the New() method to create the GCSStore
type GcsStore struct {
client *storage.Client
bucketName string
basepath string
}
// New creates and returns a new GCSStore, initializing the GCS storage client.
//
// bucket specifies the name of the GCS bucket to store keys and values
//
// basepath is prepended to the name of each created object.
// For example, if the value of basepath is "gcskv/", all the key/value objects will be created in the gcskv/ folder
func New(bucket string, basepath string) (GcsStore, error) {
ctx := context.Background()
client, gcsErr := storage.NewClient(ctx)
if gcsErr != nil {
return GcsStore{}, gcsErr
}
return GcsStore{
client: client,
bucketName: bucket,
basepath: basepath,
}, nil
}
// Get returns the value of the given key. Error is returned if the key is not found
//
// add more docs
// even more docs
// feature 1
func (store GcsStore) Get(key string) ([]byte, error) {
ctx := context.Background()
key = store.basepath + key
rc, err := store.client.Bucket(store.bucketName).Object(key).NewReader(ctx)
if err != nil {
return nil, fmt.Errorf("file not found")
}
defer rc.Close()
out := make([]byte, rc.Attrs.Size)
size, err := rc.Read(out)
if int64(size) != rc.Attrs.Size {
return nil, fmt.Errorf("reading incomplete")
}
return out, err
}
// Set creates or overwrites a key/value pair
func (store GcsStore) Set(key string, value []byte) error {
ctx := context.Background()
key = store.basepath + key
writer := store.client.Bucket(store.bucketName).Object(key).NewWriter(ctx)
defer writer.Close()
size, err := writer.Write(value)
if size != len(value) {
return fmt.Errorf("writing incomplete")
}
return err
}
// Del removes the key/value pair from the GcsStore
func (store GcsStore) Del(key string) error {
ctx := context.Background()
key = store.basepath + key
return store.client.Bucket(store.bucketName).Object(key).Delete(ctx)
}
// Size returns the number of key/value pairs in the GcsStore
func (store GcsStore) Size() (int, error) {
ctx := context.Background()
query := &storage.Query{Prefix: store.basepath}
iter := store.client.Bucket(store.bucketName).Objects(ctx, query)
count := 0
for {
_, err := iter.Next()
if err == iterator.Done {
break
}
if err != nil {
return 0, err
}
count++
}
return count, nil
}
// Scan returns all the keys >= startKey and < endKey in lexicographic order
//
// prefix is prepended to both the startKey and endKey to form 2 complete keys.
// For example, Scan("foo/", "a", "b") will return all the keys >= "foo/a" and < "foo/b"
func (store GcsStore) Scan(prefix, startKey, endKey string) ([]string, error) {
ctx := context.Background()
query := &storage.Query{
StartOffset: store.basepath + prefix + startKey,
EndOffset: store.basepath + prefix + endKey,
}
query.SetAttrSelection([]string{"Name"})
iter := store.client.Bucket(store.bucketName).Objects(ctx, query)
names := make([]string, 0, iter.PageInfo().MaxSize)
for {
objAttrs, err := iter.Next()
if err == iterator.Done {
break
}
if err != nil {
return names, err
}
name := strings.Replace(objAttrs.Name, store.basepath, "", 1)
names = append(names, name)
}
return names, nil
}
// Clear remove all key/value pairs from GcsStore.
//
// It should not affect other objects in the bucket not prefixed by store.basepath
func (store GcsStore) Clear() error {
ctx := context.Background()
query := &storage.Query{Prefix: store.basepath}
iter := store.client.Bucket(store.bucketName).Objects(ctx, query)
for {
objAttrs, err := iter.Next()
if err == iterator.Done {
break
}
if err != nil {
return err
}
err = store.client.Bucket(store.bucketName).Object(objAttrs.Name).Delete(ctx)
if err != nil {
return err
}
}
return nil
}