Skip to content

Commit

Permalink
Add verbose mode (-v flag) (#24)
Browse files Browse the repository at this point in the history
The central processor and Google Photos data sources now implement more verbose (but rudimentary) logging. We should probably switch to zap at some point.
  • Loading branch information
mholt committed Dec 31, 2020
1 parent 0828cdb commit 41cce90
Show file tree
Hide file tree
Showing 10 changed files with 123 additions and 61 deletions.
2 changes: 1 addition & 1 deletion README.md
Original file line number Diff line number Diff line change
Expand Up @@ -152,7 +152,7 @@ $ timeliner get-all google_photos/you@example.com

This process can take weeks if you have a large library. Even if you have a fast Internet connection, the client is carefully rate-limited to be a good API citizen, so the process will be slow.

If you open your timeline folder in a file browser, you will see it start to fill up with your photos from Google Photos.
If you open your timeline folder in a file browser, you will see it start to fill up with your photos from Google Photos. To see more verbose logging, use the `-v` flag (NOTE: this will drastically slow down processing that isn't bottlenecked by the network).

Data sources may create checkpoints as they go. If so, `get-all` or `get-latest` will automatically resume the last listing if it was interrupted, but only if the same command is repeated (you can't resume a `get-latest` with `get-all`, for example, or with different timeframe parameters). In the case of Google Photos, each page of API results is checkpointed. Checkpoints are not intended for long-term pauses. In other words, a resume should happen fairly shortly after being interrupted, and should be resumed using the same command as before. (A checkpoint will be automatically resumed only if the command parameters are identical.)

Expand Down
4 changes: 4 additions & 0 deletions account.go
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,10 @@ func (acc Account) NewHTTPClient() (*http.Client, error) {
return httpClient, nil
}

// String returns a human-readable identifier for the account in the
// form "dataSourceID/userID" (e.g. "google_photos/user@example.com"),
// matching the prefix format used in this program's log messages.
func (acc Account) String() string {
return acc.DataSourceID + "/" + acc.UserID
}

// AddAccount authenticates userID with the service identified
// within the application by dataSourceID, and then stores it in the
// database. The account must not yet exist.
Expand Down
7 changes: 5 additions & 2 deletions cmd/timeliner/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@ func init() {
flag.StringVar(&repoDir, "repo", repoDir, "The path to the folder of the repository")
flag.IntVar(&maxRetries, "max-retries", maxRetries, "If > 0, will retry on failure at most this many times")
flag.DurationVar(&retryAfter, "retry-after", retryAfter, "If > 0, will wait this long between retries")
flag.BoolVar(&verbose, "v", verbose, "Verbose output (can be very slow if data source isn't bottlenecked by network)")

flag.BoolVar(&prune, "prune", prune, "When finishing, delete items not found on remote (download-all or import only)")
flag.BoolVar(&integrity, "integrity", integrity, "Perform integrity check on existing items and reprocess if needed (download-all or import only)")
Expand Down Expand Up @@ -147,6 +148,7 @@ func main() {
Integrity: integrity,
Timeframe: tf,
Merge: mergeOpt,
Verbose: verbose,
}

// make a client for each account
Expand All @@ -157,7 +159,7 @@ func main() {
log.Fatalf("[FATAL][%s/%s] Creating data source client: %v", a.dataSourceID, a.userID, err)
}

// configure the client
// configure the client (TODO: this is not good design; should happen in their own packages)
switch v := wc.Client.(type) {
case *twitter.Client:
v.Retweets = twitterRetweets
Expand Down Expand Up @@ -185,7 +187,7 @@ func main() {
if retryNum > 0 {
log.Println("[INFO] Retrying command")
}
err := wc.GetLatest(ctx, tf.Until)
err := wc.GetLatest(ctx, procOpt)
if err != nil {
log.Printf("[ERROR][%s/%s] Getting latest: %v",
wc.DataSourceID(), wc.UserID(), err)
Expand Down Expand Up @@ -375,6 +377,7 @@ var (
configFile = "timeliner.toml"
maxRetries int
retryAfter time.Duration
verbose bool

integrity bool
prune bool
Expand Down
57 changes: 36 additions & 21 deletions datasources/googlephotos/googlephotos.go
Original file line number Diff line number Diff line change
Expand Up @@ -84,11 +84,11 @@ func (c *Client) ListItems(ctx context.Context, itemChan chan<- *timeliner.ItemG
// get items and collections
errChan := make(chan error)
go func() {
err := c.listItems(ctx, itemChan, opt.Timeframe)
err := c.listItems(ctx, itemChan, opt)
errChan <- err
}()
go func() {
err := c.listCollections(ctx, itemChan, opt.Timeframe)
err := c.listCollections(ctx, itemChan, opt)
errChan <- err
}()

Expand All @@ -98,7 +98,7 @@ func (c *Client) ListItems(ctx context.Context, itemChan chan<- *timeliner.ItemG
for i := 0; i < 2; i++ {
err := <-errChan
if err != nil {
log.Printf("[ERROR][%s/%s] A listing goroutine errored: %v", DataSourceID, c.userID, err)
log.Printf("[ERROR] %s/%s: a listing goroutine errored: %v", DataSourceID, c.userID, err)
errs = append(errs, err.Error())
}
}
Expand All @@ -109,8 +109,7 @@ func (c *Client) ListItems(ctx context.Context, itemChan chan<- *timeliner.ItemG
return nil
}

func (c *Client) listItems(ctx context.Context, itemChan chan<- *timeliner.ItemGraph,
timeframe timeliner.Timeframe) error {
func (c *Client) listItems(ctx context.Context, itemChan chan<- *timeliner.ItemGraph, opt timeliner.ListingOptions) error {
c.checkpoint.mu.Lock()
pageToken := c.checkpoint.ItemsNextPage
c.checkpoint.mu.Unlock()
Expand All @@ -121,7 +120,7 @@ func (c *Client) listItems(ctx context.Context, itemChan chan<- *timeliner.ItemG
return nil
default:
var err error
pageToken, err = c.getItemsNextPage(itemChan, pageToken, timeframe)
pageToken, err = c.getItemsNextPage(itemChan, pageToken, opt.Timeframe)
if err != nil {
return fmt.Errorf("getting items on next page: %v", err)
}
Expand Down Expand Up @@ -173,8 +172,7 @@ func (c *Client) getItemsNextPage(itemChan chan<- *timeliner.ItemGraph,
// a timeframe in this search.
//
// See https://developers.google.com/photos/library/reference/rest/v1/mediaItems/search.
func (c *Client) listCollections(ctx context.Context,
itemChan chan<- *timeliner.ItemGraph, timeframe timeliner.Timeframe) error {
func (c *Client) listCollections(ctx context.Context, itemChan chan<- *timeliner.ItemGraph, opt timeliner.ListingOptions) error {
c.checkpoint.mu.Lock()
albumPageToken := c.checkpoint.AlbumsNextPage
c.checkpoint.mu.Unlock()
Expand All @@ -184,8 +182,13 @@ func (c *Client) listCollections(ctx context.Context,
case <-ctx.Done():
return nil
default:
if opt.Verbose {
log.Printf("[DEBUG] %s/%s: listing albums: next page (page_token=%s)",
DataSourceID, c.userID, albumPageToken)
}

var err error
albumPageToken, err = c.getAlbumsAndTheirItemsNextPage(itemChan, albumPageToken, timeframe)
albumPageToken, err = c.getAlbumsAndTheirItemsNextPage(itemChan, albumPageToken, opt)
if err != nil {
return err
}
Expand All @@ -202,7 +205,7 @@ func (c *Client) listCollections(ctx context.Context,
}

func (c *Client) getAlbumsAndTheirItemsNextPage(itemChan chan<- *timeliner.ItemGraph,
pageToken string, timeframe timeliner.Timeframe) (string, error) {
pageToken string, opt timeliner.ListingOptions) (string, error) {
vals := url.Values{
"pageToken": {pageToken},
"pageSize": {"50"},
Expand All @@ -215,7 +218,12 @@ func (c *Client) getAlbumsAndTheirItemsNextPage(itemChan chan<- *timeliner.ItemG
}

for _, album := range respBody.Albums {
err = c.getAlbumItems(itemChan, album, timeframe)
if opt.Verbose {
log.Printf("[DEBUG] %s/%s: listing items in album: '%s' (album_id=%s item_count=%s)",
DataSourceID, c.userID, album.Title, album.ID, album.MediaItemsCount)
}

err = c.getAlbumItems(itemChan, album, opt)
if err != nil {
return "", err
}
Expand All @@ -224,15 +232,22 @@ func (c *Client) getAlbumsAndTheirItemsNextPage(itemChan chan<- *timeliner.ItemG
return respBody.NextPageToken, nil
}

func (c *Client) getAlbumItems(itemChan chan<- *timeliner.ItemGraph, album gpAlbum, timeframe timeliner.Timeframe) error {
func (c *Client) getAlbumItems(itemChan chan<- *timeliner.ItemGraph, album gpAlbum, opt timeliner.ListingOptions) error {
var albumItemsNextPage string
var counter int

const pageSize = 100

for {
reqBody := listMediaItemsRequest{
AlbumID: album.ID,
PageToken: albumItemsNextPage,
PageSize: 100,
PageSize: pageSize,
}

if opt.Verbose {
log.Printf("[DEBUG] %s/%s: getting next page of media items in album (album_id=%s page_size=%d page_token=%s)",
DataSourceID, c.userID, album.ID, pageSize, albumItemsNextPage)
}

page, err := c.pageOfMediaItems(reqBody)
Expand All @@ -248,10 +263,10 @@ func (c *Client) getAlbumItems(itemChan chan<- *timeliner.ItemGraph, album gpAlb
// have to iterate all items in all albums, but at least we
// can just skip items that fall outside the timeframe...
ts := it.Timestamp()
if timeframe.Since != nil && ts.Before(*timeframe.Since) {
if opt.Timeframe.Since != nil && ts.Before(*opt.Timeframe.Since) {
continue
}
if timeframe.Until != nil && ts.After(*timeframe.Until) {
if opt.Timeframe.Until != nil && ts.After(*opt.Timeframe.Until) {
continue
}

Expand Down Expand Up @@ -297,7 +312,7 @@ func (c *Client) apiRequestWithRetry(method, endpoint string, reqBodyData, respI
var resp *http.Response
resp, err = c.apiRequest(method, endpoint, reqBodyData)
if err != nil {
log.Printf("[ERROR][%s/%s] Doing API request: >>> %v <<< - retrying... (attempt %d/%d)",
log.Printf("[ERROR] %s/%s: doing API request: >>> %v <<< - retrying... (attempt %d/%d)",
DataSourceID, c.userID, err, i+1, maxTries)
time.Sleep(10 * time.Second)
continue
Expand All @@ -315,14 +330,14 @@ func (c *Client) apiRequestWithRetry(method, endpoint string, reqBodyData, respI

// extra-long pause for rate limiting errors
if resp.StatusCode == http.StatusTooManyRequests {
log.Printf("[ERROR][%s/%s] Rate limited: HTTP %d: %s: %s - retrying in 35 seconds... (attempt %d/%d)",
log.Printf("[ERROR] %s/%s: rate limited: HTTP %d: %s: %s - retrying in 35 seconds... (attempt %d/%d)",
DataSourceID, c.userID, resp.StatusCode, resp.Status, bodyText, i+1, maxTries)
time.Sleep(35 * time.Second)
continue
}

// for any other error, wait a couple seconds and retry
log.Printf("[ERROR][%s/%s] Bad API response: %v - retrying... (attempt %d/%d)",
log.Printf("[ERROR] %s/%s: bad API response: %v - retrying... (attempt %d/%d)",
DataSourceID, c.userID, err, i+1, maxTries)
time.Sleep(10 * time.Second)
continue
Expand All @@ -333,7 +348,7 @@ func (c *Client) apiRequestWithRetry(method, endpoint string, reqBodyData, respI
if err != nil {
resp.Body.Close()
err = fmt.Errorf("decoding JSON: %v", err)
log.Printf("[ERROR][%s/%s] Reading API response: %v - retrying... (attempt %d/%d)",
log.Printf("[ERROR] %s/%s: reading API response: %v - retrying... (attempt %d/%d)",
DataSourceID, c.userID, err, i+1, maxTries)
time.Sleep(10 * time.Second)
continue
Expand Down Expand Up @@ -418,7 +433,7 @@ type checkpointInfo struct {
// save gob-encodes the checkpoint and persists it via the timeliner
// checkpoint mechanism. Checkpointing is best-effort, so errors are
// logged rather than returned.
func (ch *checkpointInfo) save(ctx context.Context) {
	gobBytes, err := timeliner.MarshalGob(ch)
	if err != nil {
		log.Printf("[ERROR] %s: encoding checkpoint: %v", DataSourceID, err)
		// don't persist an empty/garbage checkpoint over a good one
		return
	}
	timeliner.Checkpoint(ctx, gobBytes)
}
Expand All @@ -431,6 +446,6 @@ func (ch *checkpointInfo) load(checkpointGob []byte) {
}
err := timeliner.UnmarshalGob(checkpointGob, ch)
if err != nil {
log.Printf("[ERROR][%s] Decoding checkpoint: %v", DataSourceID, err)
log.Printf("[ERROR] %s: decoding checkpoint: %v", DataSourceID, err)
}
}
4 changes: 2 additions & 2 deletions datasources/googlephotos/media.go
Original file line number Diff line number Diff line change
Expand Up @@ -70,7 +70,7 @@ func (m mediaItem) DataFileReader() (io.ReadCloser, error) {
resp, err = http.Get(u)
if err != nil {
err = fmt.Errorf("getting media contents: %v", err)
log.Printf("[ERROR][%s] %s: %v - retrying... (attempt %d/%d)", DataSourceID, u, err, i+1, maxTries)
log.Printf("[ERROR] %s: %s: %v - retrying... (attempt %d/%d)", DataSourceID, u, err, i+1, maxTries)
time.Sleep(30 * time.Second)
continue
}
Expand All @@ -84,7 +84,7 @@ func (m mediaItem) DataFileReader() (io.ReadCloser, error) {
err = fmt.Errorf("HTTP %d: %s", resp.StatusCode, resp.Status)
}

log.Printf("[ERROR][%s] %s: Bad response: %v - waiting and retrying... (attempt %d/%d)",
log.Printf("[ERROR] %s: %s: Bad response: %v - waiting and retrying... (attempt %d/%d)",
DataSourceID, u, err, i+1, maxTries)
time.Sleep(15 * time.Second)
continue
Expand Down
15 changes: 8 additions & 7 deletions itemfiles.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,31 +18,32 @@ import (
)

// downloadItemFile ... TODO: finish godoc.
func (t *Timeline) downloadItemFile(src io.ReadCloser, dest *os.File, h hash.Hash) error {
func (t *Timeline) downloadItemFile(src io.ReadCloser, dest *os.File, h hash.Hash) (int64, error) {
if src == nil {
return fmt.Errorf("missing reader with which to download file")
return 0, fmt.Errorf("missing reader with which to download file")
}
if dest == nil {
return fmt.Errorf("missing file to download into")
return 0, fmt.Errorf("missing file to download into")
}

// TODO: What if file already exists on disk (byte-for-byte)? - i.e. data_hash in DB has a duplicate

// give the hasher a copy of the file bytes
tr := io.TeeReader(src, h)

if _, err := io.Copy(dest, tr); err != nil {
n, err := io.Copy(dest, tr)
if err != nil {
os.Remove(dest.Name())
return fmt.Errorf("copying contents: %v", err)
return n, fmt.Errorf("copying contents: %v", err)
}
if err := dest.Sync(); err != nil {
os.Remove(dest.Name())
return fmt.Errorf("syncing file: %v", err)
return n, fmt.Errorf("syncing file: %v", err)
}

// TODO: If mime type is photo or video, extract most important EXIF data and return it for storage in DB?

return nil
return n, nil
}

// makeUniqueCanonicalItemDataFileName returns an available
Expand Down
Loading

0 comments on commit 41cce90

Please sign in to comment.