From e50f0ba4651fd1eea2738b8b662db4cf7dee3d61 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Carlos=20Mart=C3=ADn=20Nieto?= Date: Tue, 13 Jun 2017 22:40:24 +0200 Subject: [PATCH] Add a function call for listing objects under a prefix Given a list of prefixes, `ListObjects` goes off in parallel and returns a list of the keys which exist under them. --- list_objects.go | 231 +++++++++++++++++++++++++++++++++++++++++++ list_objects_test.go | 85 ++++++++++++++++ s3gof3r.go | 12 +++ 3 files changed, 328 insertions(+) create mode 100644 list_objects.go create mode 100644 list_objects_test.go diff --git a/list_objects.go b/list_objects.go new file mode 100644 index 0000000..1aa3949 --- /dev/null +++ b/list_objects.go @@ -0,0 +1,231 @@ +package s3gof3r + +import ( + "encoding/xml" + "math" + "net/http" + "strconv" + "sync" + "time" +) + +func newObjectLister(c *Config, b *Bucket, prefixes []string, maxKeys int) (*ObjectLister, error) { + l := new(ObjectLister) + l.c, l.b = new(Config), new(Bucket) + *l.c, *l.b = *c, *b + l.c.NTry = max(c.NTry, 1) + l.c.Concurrency = max(c.Concurrency, 1) + l.getCh, l.putCh = make(chan string), make(chan []string, 1) + l.quit = make(chan struct{}) + l.prefixes = prefixes + l.maxKeys = maxKeys + + for i := 0; i < l.c.Concurrency; i++ { + l.wg.Add(1) + go l.worker() + } + go l.initPrefixes() + + return l, nil +} + +type ObjectLister struct { + b *Bucket + c *Config + prefixes []string + maxKeys int + + next []string + err error + getCh chan string + putCh chan []string + wg sync.WaitGroup + quit chan struct{} + quitOnce sync.Once +} + +func (l *ObjectLister) closeQuit() { + l.quitOnce.Do(func() { close(l.quit) }) +} + +func (l *ObjectLister) initPrefixes() { + // We first enqueue all of the prefixes we were given + for _, p := range l.prefixes { + l.getCh <- p + } + close(l.getCh) + + l.wg.Wait() + close(l.putCh) +} + +func (l *ObjectLister) worker() { + for p := range l.getCh { + var continuation string + retries: + for { + res, err := l.retryListObjects(p, continuation) + if err != nil { + select { + case <-l.quit: + return + default: + l.err = err + l.closeQuit() + return + } + } + + keys := make([]string, 0, len(res.Contents)) + for _, c := range res.Contents { + keys = append(keys, c.Key) + } + + select { + case <-l.quit: + return + case l.putCh <- keys: + continuation = res.NextContinuationToken + if continuation != "" { + continue + } + + // Break from this prefix and grab the next one + break retries + } + } + } + + l.wg.Done() +} + +func (l *ObjectLister) retryListObjects(p, continuation string) (*listBucketResult, error) { + var err error + var res *listBucketResult + for i := 0; i < l.c.NTry; i++ { + opts := listObjectsOptions{MaxKeys: l.maxKeys, Prefix: p, ContinuationToken: continuation} + res, err = listObjects(l.c, l.b, opts) + if err == nil { + return res, nil + } + + time.Sleep(time.Duration(math.Exp2(float64(i))) * 100 * time.Millisecond) // exponential back-off + } + + return nil, err +} + +// Next moves the iterator to the next set of results. It returns true if there +// are more results, or false if there are no more results or there was an +// error. +func (l *ObjectLister) Next() bool { + if l.err != nil { + return false + } + + select { + case n, ok := <-l.putCh: + if !ok { + l.err = nil + return false + } + + l.next = n + return true + case <-l.quit: + return false + } +} + +func (l *ObjectLister) Value() []string { + return l.next +} + +func (l *ObjectLister) Error() error { + return l.err +} + +func (l *ObjectLister) Close() { + l.closeQuit() +} + +// ListObjectsOptions specifies the options for a ListObjects operation on a S3 +// bucket +type listObjectsOptions struct { + // Maximum number of keys to return per request + MaxKeys int + // Only list those keys that start with the given prefix + Prefix string + // Continuation token from the previous request + ContinuationToken string +} + +type listBucketResult struct { + Name string `xml:"Name"` + Prefix string `xml:"Prefix"` + KeyCount int `xml:"KeyCount"` + MaxKeys int `xml:"MaxKeys"` + IsTruncated bool `xml:"IsTrucated"` + NextContinuationToken string `xml:"NextContinuationToken"` + Contents []listBucketContents `xml:"Contents"` +} + +type listBucketContents struct { + Key string `xml:"Key"` + LastModified time.Time `xml:"LastModified"` + ETag string `xml:"ETag"` + Size int64 `xml:"Size"` + StorageClass string `xml:"StorageClass"` + CommonPrefixes []CommonPrefix `xml:"CommonPrefixes"` +} + +type CommonPrefix struct { + Prefix string `xml:"Prefix"` +} + +type ListObjectsResult struct { + result *listBucketResult +} + +func listObjects(c *Config, b *Bucket, opts listObjectsOptions) (result *listBucketResult, err error) { + result = new(listBucketResult) + u, err := b.url("", c) + if err != nil { + return nil, err + } + + q := u.Query() + q.Set("list-type", "2") + if opts.MaxKeys > 0 { + q.Set("max-keys", strconv.Itoa(opts.MaxKeys)) + } + if opts.Prefix != "" { + q.Set("prefix", opts.Prefix) + } + if opts.ContinuationToken != "" { + q.Set("continuation-token", opts.ContinuationToken) + } + u.RawQuery = q.Encode() + + r := http.Request{ + Method: "GET", + URL: u, + } + b.Sign(&r) + + resp, err := b.conf().Do(&r) + if err != nil { + return nil, err + } + defer checkClose(resp.Body, err) + if resp.StatusCode != 200 { + return nil, newRespError(resp) + } + + decoder := xml.NewDecoder(resp.Body) + if err := decoder.Decode(result); err != nil { + return nil, err + } + + return result, nil +} diff --git a/list_objects_test.go b/list_objects_test.go new file mode 100644 index 0000000..464c1b4 --- /dev/null +++ b/list_objects_test.go @@ -0,0 +1,85 @@ +package s3gof3r + +import ( + "log" + "sort" + "sync" + "testing" +) + +var keysForListing = []string{ + "list/one/two/three", + "list/one/two/four", + "list/two/three/four", + "list/two/three/five", + "list/three/four/five", + "list/three/four/six", + "list/four/five/six", + "list/four/five/seven", +} + +func uploadListerFiles() { + var wg sync.WaitGroup + for _, tt := range keysForListing { + wg.Add(1) + go func(path string) { + err := b.putReader(path, &randSrc{Size: 20}) + if err != nil { + log.Fatal(err) + } + wg.Done() + }(tt) + } + wg.Wait() +} + +func testListObjects(t *testing.T, prefixes []string, iterations, concurrency int) { + config := Config{ + Concurrency: 1, + Scheme: "https", + } + l, err := b.ListObjects(prefixes, 5, &config) + if err != nil { + t.Error(err) + } + + actual := make([]string, 0, len(keysForListing)) + actualIterations := 0 + for l.Next() { + actualIterations++ + actual = append(actual, l.Value()...) + } + + err = l.Error() + if err != nil { + t.Error(err) + } + + if actualIterations != iterations { + t.Errorf("expected %d iterations, got %d", iterations, actualIterations) + } + + if len(actual) != len(keysForListing) { + t.Errorf("expected %d keys, got %d", len(keysForListing), len(actual)) + } + + sort.Strings(keysForListing) + sort.Strings(actual) + + for i, a := range keysForListing { + if a != actual[i] { + t.Errorf("result mismatch, expected '%s', got '%s'", a, actual[i]) + } + } +} + +func TestListObjects(t *testing.T) { + t.Parallel() + + uploadListerFiles() + + testListObjects(t, []string{"list/"}, 2, 1) + testListObjects(t, []string{"list/"}, 2, 5) + testListObjects(t, []string{"list/one/", "list/two/", "list/three", "list/four"}, 4, 1) + testListObjects(t, []string{"list/one/", "list/two/", "list/three", "list/four"}, 4, 5) +} diff --git a/s3gof3r.go b/s3gof3r.go index af66f8f..1a3a59c 100644 --- a/s3gof3r.go +++ b/s3gof3r.go @@ -226,6 +226,18 @@ func (b *Bucket) delete(path string) error { return nil } +// ListObjects returns a list of objects under the given prefixes using parallel +// requests for each prefix and any continuations. +// +// maxKeys indicates how many keys should be returned per request +func (b *Bucket) ListObjects(prefixes []string, maxKeys int, c *Config) (*ObjectLister, error) { + if c == nil { + c = b.conf() + } + + return newObjectLister(c, b, prefixes, maxKeys) +} + // SetLogger wraps the standard library log package. // // It allows the internal logging of s3gof3r to be set to a desired output and format.