From 41cce90c698bf35d1f3985b5b50fa029d9ed9fef Mon Sep 17 00:00:00 2001
From: Matthew Holt
Date: Thu, 31 Dec 2020 13:37:16 -0700
Subject: [PATCH] Add verbose mode (-v flag) (#24)

The central processor and Google Photos data sources now implement more
verbose (but rudimentary) logging. We should probably switch to zap at
some point.
---
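A quick way to try the new flag (the account ID here is the same example
one used in the README hunk below):

    $ timeliner -v get-latest google_photos/you@gmail.com

With -v set, the Google Photos lister writes [DEBUG] lines through the
standard library logger, shaped roughly like this (token elided):

    [DEBUG] google_photos/you@gmail.com: listing albums: next page (page_token=...)

As for the zap idea mentioned above, a minimal sketch of what one of
these call sites might look like after such a switch (hypothetical;
go.uber.org/zap is not a dependency of this patch, and the field names
are illustrative):

    package main

    import "go.uber.org/zap"

    func main() {
    	// NewDevelopment writes human-readable, leveled output,
    	// similar in spirit to the [DEBUG] lines added here.
    	logger, _ := zap.NewDevelopment()
    	defer logger.Sync()

    	logger.Debug("listing albums: next page",
    		zap.String("account", "google_photos/you@gmail.com"),
    		zap.String("page_token", ""))
    }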
 README.md                                |  2 +-
 account.go                               |  4 ++
 cmd/timeliner/main.go                    |  7 ++-
 datasources/googlephotos/googlephotos.go | 57 ++++++++++++++---------
 datasources/googlephotos/media.go        |  4 +-
 itemfiles.go                             | 15 +++---
 processing.go                            | 58 +++++++++++++++++-------
 ratelimit.go                             |  6 +--
 timeliner.go                             |  4 ++
 wrappedclient.go                         | 27 +++++++----
 10 files changed, 123 insertions(+), 61 deletions(-)

diff --git a/README.md b/README.md
index 367d6f9..e469ccc 100644
--- a/README.md
+++ b/README.md
@@ -152,7 +152,7 @@ $ timeliner get-all google_photos/you@gmail.com
 
 This process can take weeks if you have a large library. Even if you have a fast Internet connection, the client is carefully rate-limited to be a good API citizen, so the process will be slow.
 
-If you open your timeline folder in a file browser, you will see it start to fill up with your photos from Google Photos.
+If you open your timeline folder in a file browser, you will see it start to fill up with your photos from Google Photos. To see more verbose logging, use the `-v` flag (NOTE: this will drastically slow down processing that isn't bottlenecked by the network).
 
 Data sources may create checkpoints as they go. If so, `get-all` or `get-latest` will automatically resume the last listing if it was interrupted, but only if the same command is repeated (you can't resume a `get-latest` with `get-all`, for example, or with different timeframe parameters). In the case of Google Photos, each page of API results is checkpointed. Checkpoints are not intended for long-term pauses. In other words, a resume should happen fairly shortly after being interrupted, and should be resumed using the same command as before. (A checkpoint will be automatically resumed only if the command parameters are identical.)
diff --git a/account.go b/account.go
index 644d113..84df677 100644
--- a/account.go
+++ b/account.go
@@ -46,6 +46,10 @@ func (acc Account) NewHTTPClient() (*http.Client, error) {
 	return httpClient, nil
 }
 
+func (acc Account) String() string {
+	return acc.DataSourceID + "/" + acc.UserID
+}
+
 // AddAccount authenticates userID with the service identified
 // within the application by dataSourceID, and then stores it in the
 // database. The account must not yet exist.
diff --git a/cmd/timeliner/main.go b/cmd/timeliner/main.go
index f89d94c..415d760 100644
--- a/cmd/timeliner/main.go
+++ b/cmd/timeliner/main.go
@@ -29,6 +29,7 @@ func init() {
 	flag.StringVar(&repoDir, "repo", repoDir, "The path to the folder of the repository")
 	flag.IntVar(&maxRetries, "max-retries", maxRetries, "If > 0, will retry on failure at most this many times")
 	flag.DurationVar(&retryAfter, "retry-after", retryAfter, "If > 0, will wait this long between retries")
+	flag.BoolVar(&verbose, "v", verbose, "Verbose output (can be very slow if data source isn't bottlenecked by network)")
 	flag.BoolVar(&prune, "prune", prune, "When finishing, delete items not found on remote (download-all or import only)")
 	flag.BoolVar(&integrity, "integrity", integrity, "Perform integrity check on existing items and reprocess if needed (download-all or import only)")
 
@@ -147,6 +148,7 @@ func main() {
 		Integrity: integrity,
 		Timeframe: tf,
 		Merge:     mergeOpt,
+		Verbose:   verbose,
 	}
 
 	// make a client for each account
@@ -157,7 +159,7 @@ func main() {
 			log.Fatalf("[FATAL][%s/%s] Creating data source client: %v", a.dataSourceID, a.userID, err)
 		}
 
-		// configure the client
+		// configure the client (TODO: this is not good design; should happen in their own packages)
 		switch v := wc.Client.(type) {
 		case *twitter.Client:
 			v.Retweets = twitterRetweets
@@ -185,7 +187,7 @@
 				if retryNum > 0 {
 					log.Println("[INFO] Retrying command")
 				}
-				err := wc.GetLatest(ctx, tf.Until)
+				err := wc.GetLatest(ctx, procOpt)
 				if err != nil {
 					log.Printf("[ERROR][%s/%s] Getting latest: %v",
 						wc.DataSourceID(), wc.UserID(), err)
@@ -375,6 +377,7 @@ var (
 	configFile = "timeliner.toml"
 	maxRetries int
 	retryAfter time.Duration
+	verbose    bool
 
 	integrity bool
 	prune     bool
diff --git a/datasources/googlephotos/googlephotos.go b/datasources/googlephotos/googlephotos.go
index 708a171..c14494a 100644
--- a/datasources/googlephotos/googlephotos.go
+++ b/datasources/googlephotos/googlephotos.go
@@ -84,11 +84,11 @@ func (c *Client) ListItems(ctx context.Context, itemChan chan<- *timeliner.ItemG
 	// get items and collections
 	errChan := make(chan error)
 	go func() {
-		err := c.listItems(ctx, itemChan, opt.Timeframe)
+		err := c.listItems(ctx, itemChan, opt)
 		errChan <- err
 	}()
 	go func() {
-		err := c.listCollections(ctx, itemChan, opt.Timeframe)
+		err := c.listCollections(ctx, itemChan, opt)
 		errChan <- err
 	}()
@@ -98,7 +98,7 @@
 	for i := 0; i < 2; i++ {
 		err := <-errChan
 		if err != nil {
-			log.Printf("[ERROR][%s/%s] A listing goroutine errored: %v", DataSourceID, c.userID, err)
+			log.Printf("[ERROR] %s/%s: a listing goroutine errored: %v", DataSourceID, c.userID, err)
 			errs = append(errs, err.Error())
 		}
 	}
@@ -109,8 +109,7 @@
 	return nil
 }
 
-func (c *Client) listItems(ctx context.Context, itemChan chan<- *timeliner.ItemGraph,
-	timeframe timeliner.Timeframe) error {
+func (c *Client) listItems(ctx context.Context, itemChan chan<- *timeliner.ItemGraph, opt timeliner.ListingOptions) error {
 	c.checkpoint.mu.Lock()
 	pageToken := c.checkpoint.ItemsNextPage
 	c.checkpoint.mu.Unlock()
@@ -121,7 +120,7 @@
 			return nil
 		default:
 			var err error
-			pageToken, err = c.getItemsNextPage(itemChan, pageToken, timeframe)
+			pageToken, err = c.getItemsNextPage(itemChan, pageToken, opt.Timeframe)
 			if err != nil {
 				return fmt.Errorf("getting items on next page: %v", err)
 			}
%v", err) } @@ -173,8 +172,7 @@ func (c *Client) getItemsNextPage(itemChan chan<- *timeliner.ItemGraph, // a timeframe in this search. // // See https://developers.google.com/photos/library/reference/rest/v1/mediaItems/search. -func (c *Client) listCollections(ctx context.Context, - itemChan chan<- *timeliner.ItemGraph, timeframe timeliner.Timeframe) error { +func (c *Client) listCollections(ctx context.Context, itemChan chan<- *timeliner.ItemGraph, opt timeliner.ListingOptions) error { c.checkpoint.mu.Lock() albumPageToken := c.checkpoint.AlbumsNextPage c.checkpoint.mu.Unlock() @@ -184,8 +182,13 @@ func (c *Client) listCollections(ctx context.Context, case <-ctx.Done(): return nil default: + if opt.Verbose { + log.Printf("[DEBUG] %s/%s: listing albums: next page (page_token=%s)", + DataSourceID, c.userID, albumPageToken) + } + var err error - albumPageToken, err = c.getAlbumsAndTheirItemsNextPage(itemChan, albumPageToken, timeframe) + albumPageToken, err = c.getAlbumsAndTheirItemsNextPage(itemChan, albumPageToken, opt) if err != nil { return err } @@ -202,7 +205,7 @@ func (c *Client) listCollections(ctx context.Context, } func (c *Client) getAlbumsAndTheirItemsNextPage(itemChan chan<- *timeliner.ItemGraph, - pageToken string, timeframe timeliner.Timeframe) (string, error) { + pageToken string, opt timeliner.ListingOptions) (string, error) { vals := url.Values{ "pageToken": {pageToken}, "pageSize": {"50"}, @@ -215,7 +218,12 @@ func (c *Client) getAlbumsAndTheirItemsNextPage(itemChan chan<- *timeliner.ItemG } for _, album := range respBody.Albums { - err = c.getAlbumItems(itemChan, album, timeframe) + if opt.Verbose { + log.Printf("[DEBUG] %s/%s: listing items in album: '%s' (album_id=%s item_count=%s)", + DataSourceID, c.userID, album.Title, album.ID, album.MediaItemsCount) + } + + err = c.getAlbumItems(itemChan, album, opt) if err != nil { return "", err } @@ -224,15 +232,22 @@ func (c *Client) getAlbumsAndTheirItemsNextPage(itemChan chan<- *timeliner.ItemG return respBody.NextPageToken, nil } -func (c *Client) getAlbumItems(itemChan chan<- *timeliner.ItemGraph, album gpAlbum, timeframe timeliner.Timeframe) error { +func (c *Client) getAlbumItems(itemChan chan<- *timeliner.ItemGraph, album gpAlbum, opt timeliner.ListingOptions) error { var albumItemsNextPage string var counter int + const pageSize = 100 + for { reqBody := listMediaItemsRequest{ AlbumID: album.ID, PageToken: albumItemsNextPage, - PageSize: 100, + PageSize: pageSize, + } + + if opt.Verbose { + log.Printf("[DEBUG] %s/%s: getting next page of media items in album (album_id=%s page_size=%d page_token=%s)", + DataSourceID, c.userID, album.ID, pageSize, albumItemsNextPage) } page, err := c.pageOfMediaItems(reqBody) @@ -248,10 +263,10 @@ func (c *Client) getAlbumItems(itemChan chan<- *timeliner.ItemGraph, album gpAlb // have to iterate all items in all albums, but at least we // can just skip items that fall outside the timeframe... ts := it.Timestamp() - if timeframe.Since != nil && ts.Before(*timeframe.Since) { + if opt.Timeframe.Since != nil && ts.Before(*opt.Timeframe.Since) { continue } - if timeframe.Until != nil && ts.After(*timeframe.Until) { + if opt.Timeframe.Until != nil && ts.After(*opt.Timeframe.Until) { continue } @@ -297,7 +312,7 @@ func (c *Client) apiRequestWithRetry(method, endpoint string, reqBodyData, respI var resp *http.Response resp, err = c.apiRequest(method, endpoint, reqBodyData) if err != nil { - log.Printf("[ERROR][%s/%s] Doing API request: >>> %v <<< - retrying... 
(attempt %d/%d)", + log.Printf("[ERROR] %s/%s: doing API request: >>> %v <<< - retrying... (attempt %d/%d)", DataSourceID, c.userID, err, i+1, maxTries) time.Sleep(10 * time.Second) continue @@ -315,14 +330,14 @@ func (c *Client) apiRequestWithRetry(method, endpoint string, reqBodyData, respI // extra-long pause for rate limiting errors if resp.StatusCode == http.StatusTooManyRequests { - log.Printf("[ERROR][%s/%s] Rate limited: HTTP %d: %s: %s - retrying in 35 seconds... (attempt %d/%d)", + log.Printf("[ERROR] %s/%s: rate limited: HTTP %d: %s: %s - retrying in 35 seconds... (attempt %d/%d)", DataSourceID, c.userID, resp.StatusCode, resp.Status, bodyText, i+1, maxTries) time.Sleep(35 * time.Second) continue } // for any other error, wait a couple seconds and retry - log.Printf("[ERROR][%s/%s] Bad API response: %v - retrying... (attempt %d/%d)", + log.Printf("[ERROR] %s/%s: bad API response: %v - retrying... (attempt %d/%d)", DataSourceID, c.userID, err, i+1, maxTries) time.Sleep(10 * time.Second) continue @@ -333,7 +348,7 @@ func (c *Client) apiRequestWithRetry(method, endpoint string, reqBodyData, respI if err != nil { resp.Body.Close() err = fmt.Errorf("decoding JSON: %v", err) - log.Printf("[ERROR][%s/%s] Reading API response: %v - retrying... (attempt %d/%d)", + log.Printf("[ERROR] %s/%s: reading API response: %v - retrying... (attempt %d/%d)", DataSourceID, c.userID, err, i+1, maxTries) time.Sleep(10 * time.Second) continue @@ -418,7 +433,7 @@ type checkpointInfo struct { func (ch *checkpointInfo) save(ctx context.Context) { gobBytes, err := timeliner.MarshalGob(ch) if err != nil { - log.Printf("[ERROR][%s] Encoding checkpoint: %v", DataSourceID, err) + log.Printf("[ERROR] %s: encoding checkpoint: %v", DataSourceID, err) } timeliner.Checkpoint(ctx, gobBytes) } @@ -431,6 +446,6 @@ func (ch *checkpointInfo) load(checkpointGob []byte) { } err := timeliner.UnmarshalGob(checkpointGob, ch) if err != nil { - log.Printf("[ERROR][%s] Decoding checkpoint: %v", DataSourceID, err) + log.Printf("[ERROR] %s: decoding checkpoint: %v", DataSourceID, err) } } diff --git a/datasources/googlephotos/media.go b/datasources/googlephotos/media.go index 121acc0..938cefb 100644 --- a/datasources/googlephotos/media.go +++ b/datasources/googlephotos/media.go @@ -70,7 +70,7 @@ func (m mediaItem) DataFileReader() (io.ReadCloser, error) { resp, err = http.Get(u) if err != nil { err = fmt.Errorf("getting media contents: %v", err) - log.Printf("[ERROR][%s] %s: %v - retrying... (attempt %d/%d)", DataSourceID, u, err, i+1, maxTries) + log.Printf("[ERROR] %s: %s: %v - retrying... (attempt %d/%d)", DataSourceID, u, err, i+1, maxTries) time.Sleep(30 * time.Second) continue } @@ -84,7 +84,7 @@ func (m mediaItem) DataFileReader() (io.ReadCloser, error) { err = fmt.Errorf("HTTP %d: %s", resp.StatusCode, resp.Status) } - log.Printf("[ERROR][%s] %s: Bad response: %v - waiting and retrying... (attempt %d/%d)", + log.Printf("[ERROR %s: %s: Bad response: %v - waiting and retrying... (attempt %d/%d)", DataSourceID, u, err, i+1, maxTries) time.Sleep(15 * time.Second) continue diff --git a/itemfiles.go b/itemfiles.go index 787cd87..276d4b3 100644 --- a/itemfiles.go +++ b/itemfiles.go @@ -18,12 +18,12 @@ import ( ) // downloadItemFile ... TODO: finish godoc. 
-func (t *Timeline) downloadItemFile(src io.ReadCloser, dest *os.File, h hash.Hash) error {
+func (t *Timeline) downloadItemFile(src io.ReadCloser, dest *os.File, h hash.Hash) (int64, error) {
 	if src == nil {
-		return fmt.Errorf("missing reader with which to download file")
+		return 0, fmt.Errorf("missing reader with which to download file")
 	}
 	if dest == nil {
-		return fmt.Errorf("missing file to download into")
+		return 0, fmt.Errorf("missing file to download into")
 	}
 
 	// TODO: What if file already exists on disk (byte-for-byte)? - i.e. data_hash in DB has a duplicate
@@ -31,18 +31,19 @@ func (t *Timeline) downloadItemFile(src io.ReadCloser, dest *os.File, h hash.Has
 	// give the hasher a copy of the file bytes
 	tr := io.TeeReader(src, h)
 
-	if _, err := io.Copy(dest, tr); err != nil {
+	n, err := io.Copy(dest, tr)
+	if err != nil {
 		os.Remove(dest.Name())
-		return fmt.Errorf("copying contents: %v", err)
+		return n, fmt.Errorf("copying contents: %v", err)
 	}
 	if err := dest.Sync(); err != nil {
 		os.Remove(dest.Name())
-		return fmt.Errorf("syncing file: %v", err)
+		return n, fmt.Errorf("syncing file: %v", err)
 	}
 
 	// TODO: If mime type is photo or video, extract most important EXIF data and return it for storage in DB?
 
-	return nil
+	return n, nil
 }
 
 // makeUniqueCanonicalItemDataFileName returns an available
diff --git a/processing.go b/processing.go
index 3714643..642b11e 100644
--- a/processing.go
+++ b/processing.go
@@ -27,9 +27,6 @@ func (wc *WrappedClient) beginProcessing(cc concurrentCuckoo, po ProcessingOptio
 		go func(i int) {
 			defer wg.Done()
 			for ig := range ch {
-				if ig == nil {
-					continue
-				}
 				_, err := wc.processItemGraph(ig, &recursiveState{
 					timestamp: time.Now(),
 					procOpt:   po,
@@ -38,8 +35,7 @@
 					cuckoo:    cc,
 				})
 				if err != nil {
-					log.Printf("[ERROR][%s/%s] Processing item graph: %v",
-						wc.ds.ID, wc.acc.UserID, err)
+					log.Printf("[ERROR] %s: processing item graph: %v", wc.acc, err)
 				}
 			}
 		}(i)
@@ -65,11 +61,27 @@ type recursiveState struct {
 }
 
 func (wc *WrappedClient) processItemGraph(ig *ItemGraph, state *recursiveState) (int64, error) {
+	if ig == nil {
+		return 0, nil
+	}
+
 	// don't visit a node twice
 	if igID, ok := state.seen[ig]; ok {
+		if state.procOpt.Verbose {
+			log.Printf("[DEBUG] %s: item graph already visited: %p", wc.acc, ig)
+		}
 		return igID, nil
 	}
 
+	if state.procOpt.Verbose {
+		var nodeItemID string
+		if ig.Node != nil {
+			nodeItemID = ig.Node.ID()
+		}
+		log.Printf("[DEBUG] %s: visiting item graph %p (node_item_id=%s edges=%d collections=%d relations=%d)",
+			wc.acc, ig, nodeItemID, len(ig.Edges), len(ig.Collections), len(ig.Relations))
+	}
+
 	var igRowID int64
 
 	if ig.Node == nil {
@@ -279,6 +291,10 @@ func (wc *WrappedClient) storeItemFromService(it Item, timestamp time.Time, proc
 
 	// already have it
 	if !wc.shouldProcessExistingItem(it, ir, doingSoftMerge, procOpt) {
+		if procOpt.Verbose {
+			log.Printf("[DEBUG] %s: skipping processing of existing item (item_id=%s item_row_id=%d soft_merge=%t)",
+				wc.acc, itemOriginalID, ir.ID, doingSoftMerge)
+		}
 		return ir.ID, nil
 	}
 
@@ -307,12 +323,12 @@ func (wc *WrappedClient) storeItemFromService(it Item, timestamp time.Time, proc
 		if err == nil {
 			err := os.Remove(bakFile)
 			if err != nil && !os.IsNotExist(err) {
-				log.Printf("[ERROR] Deleting data file backup: %v", err)
+				log.Printf("[ERROR] deleting data file backup: %v", err)
 			}
 		} else {
 			err := os.Rename(bakFile, origFile)
 			if err != nil && !os.IsNotExist(err) {
-				log.Printf("[ERROR] Restoring original data file from backup: %v", err)
backup: %v", err) + log.Printf("[ERROR] restoring original data file from backup: %v", err) } } }() @@ -352,11 +368,16 @@ func (wc *WrappedClient) storeItemFromService(it Item, timestamp time.Time, proc return 0, fmt.Errorf("getting item row ID: %v", err) } + if procOpt.Verbose { + log.Printf("[DEBUG] %s: stored or updated item in database (item_id=%s item_row_id=%d soft_merge=%t)", + wc.acc, itemOriginalID, itemRowID, doingSoftMerge) + } + // if there is a data file, download it and compute its checksum; // then update the item's row in the DB with its name and checksum if processDataFile { h := sha256.New() - err := wc.tl.downloadItemFile(rc, datafile, h) + dataFileSize, err := wc.tl.downloadItemFile(rc, datafile, h) if err != nil { return 0, fmt.Errorf("downloading data file: %v (item_id=%v)", err, itemRowID) } @@ -376,10 +397,15 @@ func (wc *WrappedClient) storeItemFromService(it Item, timestamp time.Time, proc _, err = wc.tl.db.Exec(`UPDATE items SET data_hash=? WHERE id=?`, // TODO: LIMIT 1... (see https://github.com/mattn/go-sqlite3/pull/802) b64hash, itemRowID) if err != nil { - log.Printf("[ERROR][%s/%s] Updating item's data file hash in DB: %v; cleaning up data file: %s (item_id=%d)", - wc.ds.ID, wc.acc.UserID, err, datafile.Name(), itemRowID) + log.Printf("[ERROR] %s: updating item's data file hash in DB: %v; cleaning up data file: %s (item_id=%d)", + wc.acc, err, datafile.Name(), itemRowID) os.Remove(wc.tl.fullpath(*dataFileName)) } + + if procOpt.Verbose { + log.Printf("[DEBUG] %s: downloaded data file (item_id=%s filename=%s size=%d)", + wc.acc, itemOriginalID, *dataFileName, dataFileSize) + } } return itemRowID, nil @@ -390,22 +416,22 @@ func (wc *WrappedClient) shouldProcessExistingItem(it Item, dbItem ItemRow, doin if procOpt.Integrity && dbItem.DataFile != nil && dbItem.DataHash != nil { datafile, err := os.Open(wc.tl.fullpath(*dbItem.DataFile)) if err != nil { - log.Printf("[ERROR][%s/%s] Integrity check: opening existing data file: %v; reprocessing (item_id=%d)", - wc.ds.ID, wc.acc.UserID, err, dbItem.ID) + log.Printf("[ERROR] %s: integrity check: opening existing data file: %v; reprocessing (item_id=%d)", + wc.acc, err, dbItem.ID) return true } defer datafile.Close() h := sha256.New() _, err = io.Copy(h, datafile) if err != nil { - log.Printf("[ERROR][%s/%s] Integrity check: reading existing data file: %v; reprocessing (item_id=%d)", - wc.ds.ID, wc.acc.UserID, err, dbItem.ID) + log.Printf("[ERROR] %s: integrity check: reading existing data file: %v; reprocessing (item_id=%d)", + wc.acc, err, dbItem.ID) return true } b64hash := base64.StdEncoding.EncodeToString(h.Sum(nil)) if b64hash != *dbItem.DataHash { - log.Printf("[ERROR][%s/%s] Integrity check: checksum mismatch: expected %s, got %s; reprocessing (item_id=%d)", - wc.ds.ID, wc.acc.UserID, *dbItem.DataHash, b64hash, dbItem.ID) + log.Printf("[ERROR] %s: integrity check: checksum mismatch: expected %s, got %s; reprocessing (item_id=%d)", + wc.acc, *dbItem.DataHash, b64hash, dbItem.ID) return true } } diff --git a/ratelimit.go b/ratelimit.go index a38f566..d109731 100644 --- a/ratelimit.go +++ b/ratelimit.go @@ -17,9 +17,7 @@ type RateLimit struct { // NewRateLimitedRoundTripper adds rate limiting to rt based on the rate // limiting policy registered by the data source associated with acc. 
 func (acc Account) NewRateLimitedRoundTripper(rt http.RoundTripper) http.RoundTripper {
-	rlKey := acc.DataSourceID + "_" + acc.UserID
-
-	rl, ok := acc.t.rateLimiters[rlKey]
+	rl, ok := acc.t.rateLimiters[acc.String()]
 	if !ok && acc.ds.RateLimit.RequestsPerHour > 0 {
 		secondsBetweenReqs := 60.0 / (float64(acc.ds.RateLimit.RequestsPerHour) / 60.0)
@@ -41,7 +39,7 @@ func (acc Account) NewRateLimitedRoundTripper(rt http.RoundTripper) http.RoundTr
 			}
 		}()
 
-		acc.t.rateLimiters[rlKey] = rl
+		acc.t.rateLimiters[acc.String()] = rl
 	}
 
 	return rateLimitedRoundTripper{
diff --git a/timeliner.go b/timeliner.go
index 86aa398..c4da8c3 100644
--- a/timeliner.go
+++ b/timeliner.go
@@ -148,6 +148,7 @@ type ProcessingOptions struct {
 	Integrity bool
 	Timeframe Timeframe
 	Merge     MergeOptions
+	Verbose   bool
 }
 
 // MergeOptions configures how items are merged. By
@@ -217,4 +218,7 @@ type ListingOptions struct {
 	// A checkpoint from which to resume
 	// item retrieval.
 	Checkpoint []byte
+
+	// Enable verbose output (logs).
+	Verbose bool
 }
diff --git a/wrappedclient.go b/wrappedclient.go
index 765dedc..4bcd669 100644
--- a/wrappedclient.go
+++ b/wrappedclient.go
@@ -34,16 +34,21 @@ type WrappedClient struct {
 }
 
 // GetLatest gets the most recent items from wc. It does not prune or
-// reprocess; only meant for a quick pull. If there are no items pulled
-// yet, all items will be pulled. If until is not nil, the latest only
-// up to that timestamp will be pulled, and if until is after the latest
-// item, no items will be pulled.
-func (wc *WrappedClient) GetLatest(ctx context.Context, until *time.Time) error {
+// reprocess; only meant for a quick pull (an error will be returned if
+// procOpt is not compatible). If there are no items pulled yet, all
+// items will be pulled. If procOpt.Timeframe.Until is not nil, the
+// latest only up to that timestamp will be pulled, and if that
+// timestamp is after the latest item, no items will be pulled.
+func (wc *WrappedClient) GetLatest(ctx context.Context, procOpt ProcessingOptions) error {
 	if ctx == nil {
 		ctx = context.Background()
 	}
 	ctx = context.WithValue(ctx, wrappedClientCtxKey, wc)
 
+	if procOpt.Reprocess || procOpt.Prune || procOpt.Integrity || procOpt.Timeframe.Since != nil {
+		return fmt.Errorf("get-latest does not support -reprocess, -prune, -integrity, or -start")
+	}
+
 	// get date and original ID of the most recent item for this
 	// account from the last successful run
 	var mostRecentTimestamp int64
@@ -57,7 +62,7 @@ func (wc *WrappedClient) GetLatest(ctx context.Context, until *time.Time) error
 	}
 
 	// constrain the pull to the recent timeframe
-	timeframe := Timeframe{Until: until}
+	timeframe := Timeframe{Until: procOpt.Timeframe.Until}
 	if mostRecentTimestamp > 0 {
 		ts := time.Unix(mostRecentTimestamp, 0)
 		timeframe.Since = &ts
 	}
@@ -72,11 +77,12 @@
 	checkpoint := wc.prepareCheckpoint(timeframe)
 
-	wg, ch := wc.beginProcessing(concurrentCuckoo{}, ProcessingOptions{})
+	wg, ch := wc.beginProcessing(concurrentCuckoo{}, procOpt)
 
 	err := wc.Client.ListItems(ctx, ch, ListingOptions{
 		Timeframe:  timeframe,
 		Checkpoint: checkpoint,
+		Verbose:    procOpt.Verbose,
 	})
 	if err != nil {
 		return fmt.Errorf("getting items from service: %v", err)
 	}
@@ -119,7 +125,11 @@ func (wc *WrappedClient) GetAll(ctx context.Context, procOpt ProcessingOptions)
 
 	wg, ch := wc.beginProcessing(cc, procOpt)
 
-	err := wc.Client.ListItems(ctx, ch, ListingOptions{Checkpoint: checkpoint, Timeframe: procOpt.Timeframe})
+	err := wc.Client.ListItems(ctx, ch, ListingOptions{
+		Checkpoint: checkpoint,
+		Timeframe:  procOpt.Timeframe,
+		Verbose:    procOpt.Verbose,
+	})
 	if err != nil {
 		return fmt.Errorf("getting items from service: %v", err)
 	}
@@ -198,6 +208,7 @@ func (wc *WrappedClient) Import(ctx context.Context, filename string, procOpt Pr
 		Filename:   filename,
 		Checkpoint: wc.acc.checkpoint,
 		Timeframe:  procOpt.Timeframe,
+		Verbose:    procOpt.Verbose,
 	})
 	if err != nil {
 		return fmt.Errorf("importing: %v", err)