diff --git a/cmd/timeliner/main.go b/cmd/timeliner/main.go index c8a3d8b..cb3e4ff 100644 --- a/cmd/timeliner/main.go +++ b/cmd/timeliner/main.go @@ -30,7 +30,7 @@ func init() { flag.BoolVar(&reprocess, "reprocess", reprocess, "Reprocess every item that has not been modified locally (download-all or import only)") flag.BoolVar(&twitterRetweets, "twitter-retweets", twitterRetweets, "Twitter: include retweets") - flag.BoolVar(&twitterReplies, "twitter-replies", twitterReplies, "Twitter: include replies") + flag.BoolVar(&twitterReplies, "twitter-replies", twitterReplies, "Twitter: include replies that are not just replies to self") } func main() { diff --git a/datasource.go b/datasource.go index 242f0c9..5a62dce 100644 --- a/datasource.go +++ b/datasource.go @@ -148,7 +148,8 @@ type Client interface { // relationships can be stored. If the relationships are not // discovered until later, that's OK: item processing is // idempotent, so repeating an item from earlier will have no - // adverse effects. + // adverse effects (this is possible because a unique ID is + // required for each item). // // Implementations must honor the context's cancellation. If // ctx.Done() is closed, the function should return. Typically, @@ -169,10 +170,15 @@ type Client interface { // filename is not specified but required, an error should be // returned. // - // opt.Timeframe consists of two optional timestamp values. - // If set, item listings should be bounded in the respective - // direction by that timestamp. If timeframes are not supported, - // this should be documented, but an error need not be returned. + // opt.Timeframe consists of two optional timestamp and/or item + // ID values. If set, item listings should be bounded in the + // respective direction by that timestamp / item ID. 
(Items
 // are assumed to be part of a chronology; both timestamp and
 // item ID *may be* provided, when possible, to accommodate
 // data sources which do not constrain by timestamp but which
 // do by item ID instead.) It should be documented if timeframes
 // are not supported, but an error need not be returned if it
 // cannot be honored.
 //
 // opt.Checkpoint consists of the last checkpoint for this
 // account if the last call to ListItems did not finish and
 @@ -186,11 +192,22 @@ type Client interface { ListItems(ctx context.Context, itemChan chan<- *ItemGraph, opt Options) error } -// Timeframe represents a start and end time, where -// either value could be nil, meaning unbounded in -// that direction. +// Timeframe represents a start and end time and/or +// a start and end item, where either value could be +// nil which means unbounded in that direction. +// When items are used as the timeframe boundaries, +// the ItemID fields will be populated. It is not +// guaranteed that any particular field will be set +// or unset just because other fields are set or unset. +// However, if both Since or both Until fields are +// set, that means the timestamp and items are +// correlated; i.e. the Since timestamp is (approx.) +// that of the item ID. Or, put another way: there +// will never be conflicts among the fields which +// are non-nil. 
type Timeframe struct { - Since, Until *time.Time + Since, Until *time.Time + SinceItemID, UntilItemID *string } var dataSources = make(map[string]DataSource) diff --git a/datasources/twitter/api.go b/datasources/twitter/api.go new file mode 100644 index 0000000..af4f506 --- /dev/null +++ b/datasources/twitter/api.go @@ -0,0 +1,149 @@ +package twitter + +import ( + "context" + "encoding/json" + "fmt" + "net/http" + "net/url" + "strconv" + "strings" + + "github.com/mholt/timeliner" +) + +func (c *Client) getFromAPI(ctx context.Context, itemChan chan<- *timeliner.ItemGraph, opt timeliner.Options) error { + // load any previous checkpoint + c.checkpoint.load(opt.Checkpoint) + + // get account owner information + cleanedScreenName := strings.TrimPrefix(c.acc.UserID, "@") + ownerAccount, err := c.getOwnerAccountFromAPI(cleanedScreenName) + if err != nil { + return fmt.Errorf("getting user account information for @%s: %v", cleanedScreenName, err) + } + c.ownerAccount = ownerAccount + + // get the starting bounds of this operation + var maxTweet, minTweet string + if opt.Timeframe.SinceItemID != nil { + minTweet = *opt.Timeframe.SinceItemID + } + if c.checkpoint.LastTweetID != "" { + // by default, start off at the last checkpoint + maxTweet = c.checkpoint.LastTweetID + if opt.Timeframe.UntilItemID != nil { + // if both a timeframe UntilItemID and a checkpoint are set, + // we will choose the one with a tweet ID that is higher, + // meaning more recent, to avoid potentially skipping + // a chunk of the timeline + maxTweet = maxTweetID(c.checkpoint.LastTweetID, *opt.Timeframe.UntilItemID) + } + } + + for { + select { + case <-ctx.Done(): + return nil + default: + tweets, err := c.nextPageOfTweetsFromAPI(maxTweet, minTweet) + if err != nil { + return fmt.Errorf("getting next page of tweets: %v", err) + } + + // TODO: Is this the right finish criteria? 
+ if len(tweets) == 0 { + return nil + } + + for _, t := range tweets { + skip, err := c.prepareTweet(&t, "api") + if err != nil { + return fmt.Errorf("preparing tweet: %v", err) + } + if skip { + continue + } + + c.processTweet(itemChan, t, "") + } + + // since max_id is inclusive, subtract 1 from the tweet ID + // https://developer.twitter.com/en/docs/tweets/timelines/guides/working-with-timelines + nextTweetID := tweets[len(tweets)-1].TweetID - 1 + c.checkpoint.LastTweetID = strconv.FormatInt(int64(nextTweetID), 10) + c.checkpoint.save(ctx) + + // decrease maxTweet to get the next page on next iteration + maxTweet = c.checkpoint.LastTweetID + } + } +} + +// nextPageOfTweetsFromAPI returns the next page of tweets starting at maxTweet +// and going for a full page or until minTweet, whichever comes first. Generally, +// iterating over this function will involve decreasing maxTweet and leaving +// minTweet the same, if set at all (maxTweet = "until", minTweet = "since"). +// Either or both can be empty strings, for no boundaries. This function returns +// zero tweets when iteration is done, or up to a full page of tweets. +func (c *Client) nextPageOfTweetsFromAPI(maxTweet, minTweet string) ([]tweet, error) { + q := url.Values{ + "user_id": {c.ownerAccount.id()}, // TODO + "count": {"200"}, + "tweet_mode": {"extended"}, // https://developer.twitter.com/en/docs/tweets/tweet-updates + "exclude_replies": {"false"}, // always include replies in case it's a self-reply; we can filter all others + "include_rts": {"false"}, + } + if maxTweet != "" { + q.Set("max_id", maxTweet) + } + if minTweet != "" { + q.Set("since_id", minTweet) + } + if c.Retweets { + q.Set("include_rts", "true") + } + + resp, err := c.HTTPClient.Get("https://api.twitter.com/1.1/statuses/user_timeline.json?" + q.Encode()) + if err != nil { + return nil, fmt.Errorf("performing API request: %v", err) + } + defer resp.Body.Close() + + // TODO: handle HTTP errors, esp. 
rate limiting, a lot better + if resp.StatusCode != http.StatusOK { + return nil, fmt.Errorf("HTTP error: %s", resp.Status) + } + + var tweets []tweet + err = json.NewDecoder(resp.Body).Decode(&tweets) + if err != nil { + return nil, fmt.Errorf("reading response body: %v", err) + } + + return tweets, nil +} + +func (c *Client) getOwnerAccountFromAPI(screenName string) (twitterAccount, error) { + var ta twitterAccount + + q := url.Values{"screen_name": {screenName}} + + resp, err := c.HTTPClient.Get("https://api.twitter.com/1.1/users/show.json?" + q.Encode()) + if err != nil { + return ta, fmt.Errorf("performing API request: %v", err) + } + defer resp.Body.Close() + + // TODO: handle HTTP errors, esp. rate limiting, a lot better + if resp.StatusCode != http.StatusOK { + return ta, fmt.Errorf("HTTP error: %s", resp.Status) + } + + err = json.NewDecoder(resp.Body).Decode(&ta) + if err != nil { + return ta, fmt.Errorf("reading response body: %v", err) + } + + return ta, nil +} diff --git a/datasources/twitter/archives.go b/datasources/twitter/archives.go new file mode 100644 index 0000000..9702ded --- /dev/null +++ b/datasources/twitter/archives.go @@ -0,0 +1,168 @@ +package twitter + +import ( + "encoding/json" + "fmt" + "io" + + "github.com/mholt/archiver" + "github.com/mholt/timeliner" +) + +func (c *Client) getFromArchiveFile(itemChan chan<- *timeliner.ItemGraph, opt timeliner.Options) error { + // load the user's account ID + var err error + c.ownerAccount, err = c.getOwnerAccountFromArchive(opt.Filename) + if err != nil { + return fmt.Errorf("unable to get user account ID: %v", err) + } + + // first pass - add tweets to timeline + err = c.processArchive(opt.Filename, itemChan, c.processTweet) + if err != nil { + return fmt.Errorf("processing tweets: %v", err) + } + + // second pass - add tweet relationships to timeline + err = c.processArchive(opt.Filename, itemChan, c.processReplyRelationFromArchive) + if err != nil { + return fmt.Errorf("processing tweets: %v", 
err) + } + + return nil +} + +func (c *Client) processArchive(archiveFilename string, itemChan chan<- *timeliner.ItemGraph, processFunc archiveProcessFn) error { + err := archiver.Walk(archiveFilename, func(f archiver.File) error { + defer f.Close() + if f.Name() != "tweet.js" { + return nil + } + + // consume non-JSON preface (JavaScript variable definition) + err := stripPreface(f, tweetFilePreface) + if err != nil { + return fmt.Errorf("reading tweet file preface: %v", err) + } + + err = c.processTweetsFromArchive(itemChan, f, archiveFilename, processFunc) + if err != nil { + return fmt.Errorf("processing tweet file: %v", err) + } + + return archiver.ErrStopWalk + }) + if err != nil { + return fmt.Errorf("walking archive file %s: %v", archiveFilename, err) + } + + return nil +} + +func (c *Client) processTweetsFromArchive(itemChan chan<- *timeliner.ItemGraph, f io.Reader, + archiveFilename string, processFunc archiveProcessFn) error { + + dec := json.NewDecoder(f) + + // read array opening bracket '[' + _, err := dec.Token() + if err != nil { + return fmt.Errorf("decoding opening token: %v", err) + } + + for dec.More() { + var t tweet + err := dec.Decode(&t) + if err != nil { + return fmt.Errorf("decoding tweet element: %v", err) + } + + skip, err := c.prepareTweet(&t, "archive") + if err != nil { + return fmt.Errorf("preparing tweet: %v", err) + } + if skip { + continue + } + + err = processFunc(itemChan, t, archiveFilename) + if err != nil { + return fmt.Errorf("processing tweet: %v", err) + } + } + + return nil +} + +func (c *Client) processReplyRelationFromArchive(itemChan chan<- *timeliner.ItemGraph, t tweet, archiveFilename string) error { + if t.InReplyToStatusIDStr == "" { + // current tweet is not a reply, so no relationship to add + return nil + } + if !c.topLevelTweets.Lookup([]byte(t.InReplyToStatusIDStr)) { + // reply is not to a top-level tweet by self, so doesn't qualify for what we want + return nil + } + + ig := &timeliner.ItemGraph{ + 
Relations: []timeliner.RawRelation{ + { + FromItemID: t.TweetIDStr, + ToItemID: t.InReplyToStatusIDStr, + Relation: timeliner.RelReplyTo, + }, + }, + } + + itemChan <- ig + + return nil +} + +func (c *Client) getOwnerAccountFromArchive(filename string) (twitterAccount, error) { + var ta twitterAccount + err := archiver.Walk(filename, func(f archiver.File) error { + defer f.Close() + if f.Name() != "account.js" { + return nil + } + + // consume non-JSON preface (JavaScript variable definition) + err := stripPreface(f, accountFilePreface) + if err != nil { + return fmt.Errorf("reading account file preface: %v", err) + } + + var accFile twitterAccountFile + err = json.NewDecoder(f).Decode(&accFile) + if err != nil { + return fmt.Errorf("decoding account file: %v", err) + } + if len(accFile) == 0 { + return fmt.Errorf("account file was empty") + } + + ta = accFile[0].Account + + return archiver.ErrStopWalk + }) + return ta, err +} + +func stripPreface(f io.Reader, preface string) error { + buf := make([]byte, len(preface)) + _, err := io.ReadFull(f, buf) + return err +} + +// archiveProcessFn is a function that processes a +// tweet from a Twitter export archive. +type archiveProcessFn func(itemChan chan<- *timeliner.ItemGraph, t tweet, archiveFilename string) error + +// Variable definitions that are intended for +// use with JavaScript but which are of no use +// to us and would break the JSON parser. 
+const ( + tweetFilePreface = "window.YTD.tweet.part0 =" + accountFilePreface = "window.YTD.account.part0 =" +) diff --git a/datasources/twitter/models.go b/datasources/twitter/models.go index 09cd20b..b133d97 100644 --- a/datasources/twitter/models.go +++ b/datasources/twitter/models.go @@ -1,11 +1,12 @@ package twitter import ( + "bytes" + "encoding/json" "fmt" "io" "net/url" "path" - "strconv" "strings" "time" @@ -13,40 +14,44 @@ import ( ) type tweet struct { - Retweeted bool `json:"retweeted"` // always false (at least, with the export file) - Source string `json:"source"` - Entities *tweetEntities `json:"entities,omitempty"` // DO NOT USE (https://developer.twitter.com/en/docs/tweets/data-dictionary/overview/entities-object.html#media) - DisplayTextRange []string `json:"display_text_range"` - FavoriteCount string `json:"favorite_count"` - TweetIDStr string `json:"id_str"` - Truncated bool `json:"truncated"` - Geo *tweetGeo `json:"geo,omitempty"` // deprecated, see coordinates + Contributors interface{} `json:"contributors"` Coordinates *tweetGeo `json:"coordinates,omitempty"` - RetweetCount string `json:"retweet_count"` - TweetID string `json:"id"` - InReplyToStatusID string `json:"in_reply_to_status_id,omitempty"` - InReplyToStatusIDStr string `json:"in_reply_to_status_id_str,omitempty"` CreatedAt string `json:"created_at"` + DisplayTextRange []transInt `json:"display_text_range"` + Entities *twitterEntities `json:"entities,omitempty"` // DO NOT USE (https://developer.twitter.com/en/docs/tweets/data-dictionary/overview/entities-object.html#media) + ExtendedEntities *extendedEntities `json:"extended_entities,omitempty"` + FavoriteCount transInt `json:"favorite_count"` Favorited bool `json:"favorited"` - FullText string `json:"full_text"` - Lang string `json:"lang"` + FullText string `json:"full_text"` // tweet_mode=extended (https://developer.twitter.com/en/docs/tweets/tweet-updates) + Geo *tweetGeo `json:"geo,omitempty"` // deprecated, see coordinates 
InReplyToScreenName string `json:"in_reply_to_screen_name,omitempty"` - InReplyToUserID string `json:"in_reply_to_user_id,omitempty"` + InReplyToStatusID transInt `json:"in_reply_to_status_id,omitempty"` + InReplyToStatusIDStr string `json:"in_reply_to_status_id_str,omitempty"` + InReplyToUserID transInt `json:"in_reply_to_user_id,omitempty"` InReplyToUserIDStr string `json:"in_reply_to_user_id_str,omitempty"` + IsQuoteStatus bool `json:"is_quote_status"` + Lang string `json:"lang"` + Place interface{} `json:"place"` PossiblySensitive bool `json:"possibly_sensitive,omitempty"` - ExtendedEntities *extendedEntities `json:"extended_entities,omitempty"` + RetweetCount transInt `json:"retweet_count"` + Retweeted bool `json:"retweeted"` // always false for some reason + RetweetedStatus *tweet `json:"retweeted_status"` // API: contains full_text of a retweet (otherwise is truncated) + Source string `json:"source"` + Text string `json:"text"` // As of Feb. 2019, Twitter API default; truncated at ~140 chars (see FullText) + Truncated bool `json:"truncated"` // API: always false in tweet_mode=extended, even if full_text is truncated (retweets) + TweetID transInt `json:"id"` + TweetIDStr string `json:"id_str"` + User *twitterUser `json:"user"` WithheldCopyright bool `json:"withheld_copyright,omitempty"` WithheldInCountries []string `json:"withheld_in_countries,omitempty"` WithheldScope string `json:"withheld_scope,omitempty"` createdAtParsed time.Time ownerAccount twitterAccount + source string // "api|archive" } func (t *tweet) ID() string { - if t.TweetID != "" { - return t.TweetID - } return t.TweetIDStr } @@ -59,11 +64,22 @@ func (t *tweet) Class() timeliner.ItemClass { } func (t *tweet) Owner() (id *string, name *string) { - return &t.ownerAccount.AccountID, &t.ownerAccount.Username + idStr := t.ownerAccount.id() + nameStr := t.ownerAccount.screenName() + if idStr != "" { + id = &idStr + } + if nameStr != "" { + name = &nameStr + } + return } func (t *tweet) DataText() 
(*string, error) { - return &t.FullText, nil + if txt := t.text(); txt != "" { + return &txt, nil + } + return nil, nil } func (t *tweet) DataFileName() *string { @@ -103,14 +119,15 @@ func (t *tweet) Location() (*timeliner.Location, error) { } func (t *tweet) isRetweet() bool { - if t.Retweeted { + if t.Retweeted || t.RetweetedStatus != nil { return true } // TODO: For some reason, when exporting one's Twitter data, // it always sets "retweeted" to false, even when "full_text" // clearly shows it's a retweet by prefixing it with "RT @" - // - this seems like a bug with Twitter's exporter - return strings.HasPrefix(t.FullText, "RT @") + // - this seems like a bug with Twitter's exporter... okay + // actually the API does it too, that's dumb + return strings.HasPrefix(t.text(), "RT @") } func (t *tweet) hasExactlyOneMediaItem() bool { @@ -123,6 +140,19 @@ func (t *tweet) hasExactlyOneMediaItem() bool { return t.ExtendedEntities != nil && len(t.ExtendedEntities.Media) == 1 } +func (t *tweet) text() string { + // sigh, retweets get truncated if they're tall, + // so we have to get the full text from a subfield + if t.RetweetedStatus != nil { + return strings.TrimSpace(fmt.Sprintf("RT @%s %s", + t.RetweetedStatus.User.ScreenName, t.RetweetedStatus.text())) + } + if t.FullText != "" { + return t.FullText + } + return t.Text +} + type tweetGeo struct { Type string `json:"type"` Coordinates []string `json:"coordinates"` // "latitude, then a longitude" @@ -150,7 +180,7 @@ type boundingBox struct { Coordinates [][][]float64 `json:"coordinates"` } -type tweetEntities struct { +type twitterEntities struct { Hashtags []hashtagEntity `json:"hashtags"` Symbols []symbolEntity `json:"symbols"` UserMentions []userMentionEntity `json:"user_mentions"` @@ -159,13 +189,13 @@ type tweetEntities struct { } type hashtagEntity struct { - Indices []string `json:"indices"` - Text string `json:"text"` + Indices []transInt `json:"indices"` + Text string `json:"text"` } type symbolEntity struct 
{ - Indices []string `json:"indices"` - Text string `json:"text"` + Indices []transInt `json:"indices"` + Text string `json:"text"` } type urlEntity struct { @@ -173,7 +203,7 @@ type urlEntity struct { ExpandedURL string `json:"expanded_url"` DisplayURL string `json:"display_url"` Unwound *urlEntityUnwound `json:"unwound,omitempty"` - Indices []string `json:"indices"` + Indices []transInt `json:"indices"` } type urlEntityUnwound struct { @@ -184,11 +214,11 @@ type urlEntityUnwound struct { } type userMentionEntity struct { - Name string `json:"name"` - ScreenName string `json:"screen_name"` - Indices []string `json:"indices"` - IDStr string `json:"id_str"` - ID string `json:"id"` + Name string `json:"name"` + ScreenName string `json:"screen_name"` + Indices []transInt `json:"indices"` + IDStr string `json:"id_str"` + ID transInt `json:"id"` } type pollEntity struct { @@ -207,31 +237,28 @@ type extendedEntities struct { } type mediaItem struct { + AdditionalMediaInfo *additionalMediaInfo `json:"additional_media_info,omitempty"` + DisplayURL string `json:"display_url"` ExpandedURL string `json:"expanded_url"` - SourceStatusID string `json:"source_status_id"` - Indices []string `json:"indices"` - URL string `json:"url"` - MediaURL string `json:"media_url"` + Indices []transInt `json:"indices"` + MediaID transInt `json:"id"` MediaIDStr string `json:"id_str"` - VideoInfo *videoInfo `json:"video_info,omitempty"` - SourceUserID string `json:"source_user_id"` - AdditionalMediaInfo *additionalMediaInfo `json:"additional_media_info,omitempty"` - MediaID string `json:"id"` + MediaURL string `json:"media_url"` MediaURLHTTPS string `json:"media_url_https"` - SourceUserIDStr string `json:"source_user_id_str"` Sizes mediaSizes `json:"sizes"` - Type string `json:"type"` + SourceStatusID transInt `json:"source_status_id"` SourceStatusIDStr string `json:"source_status_id_str"` - DisplayURL string `json:"display_url"` + SourceUserID transInt `json:"source_user_id"` + SourceUserIDStr 
string `json:"source_user_id_str"` + Type string `json:"type"` + URL string `json:"url"` + VideoInfo *videoInfo `json:"video_info,omitempty"` - parent *tweet - reader io.Reader + parent *tweet + readCloser io.ReadCloser // access to the media contents } func (m *mediaItem) ID() string { - if m.MediaID != "" { - return m.MediaID - } return m.MediaIDStr } @@ -252,7 +279,10 @@ func (m *mediaItem) Class() timeliner.ItemClass { } func (m *mediaItem) Owner() (id *string, name *string) { - return &m.SourceUserID, nil + if m.SourceUserIDStr == "" { + return m.parent.Owner() + } + return &m.SourceUserIDStr, nil } func (m *mediaItem) DataText() (*string, error) { @@ -260,37 +290,26 @@ func (m *mediaItem) DataText() (*string, error) { } func (m *mediaItem) DataFileName() *string { - var source string - switch m.Type { - case "animated_gif": - fallthrough - case "video": - _, _, source = m.getLargestVideo() - u, err := url.Parse(source) - if err == nil { - source = path.Base(u.Path) - } else { - source = path.Base(source) - } - case "photo": - // TODO -- how to get the largest, will there be multiple?? 
- mURL := m.getURL() - u, err := url.Parse(mURL) - if err == nil { - source = path.Base(u.Path) - } else { - source = path.Base(mURL) - } + source := m.getURL() + u, err := url.Parse(source) + if err == nil { + source = path.Base(u.Path) + } else { + source = path.Base(source) + } + // media in the export archives are prefixed by the + // tweet ID they were posted with and a hyphen + if m.parent.source == "archive" { + source = fmt.Sprintf("%s-%s", m.parent.TweetIDStr, source) } - filename := fmt.Sprintf("%s-%s", m.parent.TweetID, source) - return &filename + return &source } func (m *mediaItem) DataFileReader() (io.ReadCloser, error) { - if m.reader == nil { - return nil, fmt.Errorf("missing data file reader; this is probably a bug: %+v - video info: %+v", m, m.VideoInfo) + if m.readCloser == nil { + return nil, fmt.Errorf("missing data file reader; this is probably a bug: %+v -- video info (if any): %+v", m, m.VideoInfo) } - return timeliner.FakeCloser(m.reader), nil + return m.readCloser, nil } func (m *mediaItem) DataFileHash() []byte { @@ -331,34 +350,42 @@ func (m *mediaItem) Location() (*timeliner.Location, error) { return nil, nil // TODO } -// TODO: How to get the largest image file? (The importer only has a single copy available to it, but videos have multiple variants....) - func (m *mediaItem) getLargestVideo() (bitrate int, contentType, source string) { if m.VideoInfo == nil { return } bitrate = -1 // so that greater-than comparison below works for video bitrate=0 (animated_gif) for _, v := range m.VideoInfo.Variants { - brInt, err := strconv.Atoi(v.Bitrate) - if err != nil { - continue - } - if brInt > bitrate { + if int(v.Bitrate) > bitrate { source = v.URL contentType = v.ContentType - bitrate = brInt + bitrate = int(v.Bitrate) } } return } -// TODO: This works only for images... 
func (m *mediaItem) getURL() string { - if m.MediaURLHTTPS != "" { - return m.MediaURLHTTPS + switch m.Type { + case "animated_gif": + fallthrough + case "video": + _, _, source := m.getLargestVideo() + return source + case "photo": + // the size of the photo can be adjusted + // when downloading by appending a size + // to the end of the URL: ":thumb", ":small", + // ":medium", ":large", or ":orig" -- but + // we don't do that here, only do that when + // actually downloading + if m.MediaURLHTTPS != "" { + return m.MediaURLHTTPS + } + return m.MediaURL } - return m.MediaURL + return "" } type additionalMediaInfo struct { @@ -366,15 +393,15 @@ type additionalMediaInfo struct { } type videoInfo struct { - AspectRatio []string `json:"aspect_ratio"` - DurationMillis string `json:"duration_millis"` + AspectRatio []transFloat `json:"aspect_ratio"` + DurationMillis transInt `json:"duration_millis"` Variants []videoVariants `json:"variants"` } type videoVariants struct { - Bitrate string `json:"bitrate,omitempty"` - ContentType string `json:"content_type,omitempty"` - URL string `json:"url"` + Bitrate transInt `json:"bitrate,omitempty"` + ContentType string `json:"content_type,omitempty"` + URL string `json:"url"` } type mediaSizes struct { @@ -385,44 +412,173 @@ type mediaSizes struct { } type mediaSize struct { - W string `json:"w"` - H string `json:"h"` - Resize string `json:"resize"` // fit|crop + W transInt `json:"w"` + H transInt `json:"h"` + Resize string `json:"resize"` // fit|crop } type twitterUser struct { - ID int64 `json:"id"` - IDStr string `json:"id_str"` - Name string `json:"name"` - ScreenName string `json:"screen_name"` - Location string `json:"location"` - URL string `json:"url"` - Description string `json:"description"` - Verified bool `json:"verified"` - FollowersCount int `json:"followers_count"` - FriendsCount int `json:"friends_count"` - ListedCount int `json:"listed_count"` - FavouritesCount int `json:"favourites_count"` - StatusesCount int 
`json:"statuses_count"` - CreatedAt string `json:"created_at"` - UTCOffset interface{} `json:"utc_offset"` - TimeZone interface{} `json:"time_zone"` - GeoEnabled bool `json:"geo_enabled"` - Lang string `json:"lang"` - ProfileImageURLHTTPS string `json:"profile_image_url_https"` - - // TODO: more fields exist; need to get actual example to build struct from + ContributorsEnabled bool `json:"contributors_enabled"` + CreatedAt string `json:"created_at"` + DefaultProfile bool `json:"default_profile"` + DefaultProfileImage bool `json:"default_profile_image"` + Description string `json:"description"` + Entities *twitterEntities `json:"entities"` + FavouritesCount int `json:"favourites_count"` + FollowersCount int `json:"followers_count"` + Following interface{} `json:"following"` + FollowRequestSent interface{} `json:"follow_request_sent"` + FriendsCount int `json:"friends_count"` + GeoEnabled bool `json:"geo_enabled"` + HasExtendedProfile bool `json:"has_extended_profile"` + IsTranslationEnabled bool `json:"is_translation_enabled"` + IsTranslator bool `json:"is_translator"` + Lang string `json:"lang"` + ListedCount int `json:"listed_count"` + Location string `json:"location"` + Name string `json:"name"` + Notifications interface{} `json:"notifications"` + ProfileBackgroundColor string `json:"profile_background_color"` + ProfileBackgroundImageURL string `json:"profile_background_image_url"` + ProfileBackgroundImageURLHTTPS string `json:"profile_background_image_url_https"` + ProfileBackgroundTile bool `json:"profile_background_tile"` + ProfileBannerURL string `json:"profile_banner_url"` + ProfileImageURL string `json:"profile_image_url"` + ProfileImageURLHTTPS string `json:"profile_image_url_https"` + ProfileLinkColor string `json:"profile_link_color"` + ProfileSidebarBorderColor string `json:"profile_sidebar_border_color"` + ProfileSidebarFillColor string `json:"profile_sidebar_fill_color"` + ProfileTextColor string `json:"profile_text_color"` + 
ProfileUseBackgroundImage bool `json:"profile_use_background_image"` + Protected bool `json:"protected"` + ScreenName string `json:"screen_name"` + StatusesCount int `json:"statuses_count"` + TimeZone interface{} `json:"time_zone"` + TranslatorType string `json:"translator_type"` + URL string `json:"url"` + UserID transInt `json:"id"` + UserIDStr string `json:"id_str"` + UtcOffset interface{} `json:"utc_offset"` + Verified bool `json:"verified"` } + type twitterAccountFile []struct { Account twitterAccount `json:"account"` } type twitterAccount struct { - PhoneNumber string `json:"phoneNumber"` - Email string `json:"email"` - CreatedVia string `json:"createdVia"` - Username string `json:"username"` - AccountID string `json:"accountId"` - CreatedAt time.Time `json:"createdAt"` - AccountDisplayName string `json:"accountDisplayName"` + // fields from export archive file: account.js + PhoneNumber string `json:"phoneNumber"` + Email string `json:"email"` + CreatedVia string `json:"createdVia"` + Username string `json:"username"` + AccountID string `json:"accountId"` + AccountDisplayName string `json:"accountDisplayName"` + + // fields from API endpoint: GET users/show + ID int `json:"id"` + IDStr string `json:"id_str"` + Name string `json:"name"` + ScreenName string `json:"screen_name"` + Location string `json:"location"` + ProfileLocation interface{} `json:"profile_location"` + Description string `json:"description"` + URL string `json:"url"` + Protected bool `json:"protected"` + FollowersCount int `json:"followers_count"` + FriendsCount int `json:"friends_count"` + ListedCount int `json:"listed_count"` + FavouritesCount int `json:"favourites_count"` + UtcOffset interface{} `json:"utc_offset"` + TimeZone interface{} `json:"time_zone"` + GeoEnabled bool `json:"geo_enabled"` + Verified bool `json:"verified"` + StatusesCount int `json:"statuses_count"` + Lang string `json:"lang"` + Status *tweet `json:"status"` + ContributorsEnabled bool `json:"contributors_enabled"` + 
IsTranslator bool `json:"is_translator"` + IsTranslationEnabled bool `json:"is_translation_enabled"` + ProfileBackgroundColor string `json:"profile_background_color"` + ProfileBackgroundImageURL string `json:"profile_background_image_url"` + ProfileBackgroundImageURLHTTPS string `json:"profile_background_image_url_https"` + ProfileBackgroundTile bool `json:"profile_background_tile"` + ProfileImageURL string `json:"profile_image_url"` + ProfileImageURLHTTPS string `json:"profile_image_url_https"` + ProfileBannerURL string `json:"profile_banner_url"` + ProfileLinkColor string `json:"profile_link_color"` + ProfileSidebarBorderColor string `json:"profile_sidebar_border_color"` + ProfileSidebarFillColor string `json:"profile_sidebar_fill_color"` + ProfileTextColor string `json:"profile_text_color"` + ProfileUseBackgroundImage bool `json:"profile_use_background_image"` + HasExtendedProfile bool `json:"has_extended_profile"` + DefaultProfile bool `json:"default_profile"` + DefaultProfileImage bool `json:"default_profile_image"` + Following interface{} `json:"following"` + FollowRequestSent interface{} `json:"follow_request_sent"` + Notifications interface{} `json:"notifications"` + TranslatorType string `json:"translator_type"` + + // fields in both export archive file and API + CreatedAt string `json:"createdAt"` // NOTE: string with API, time.Time from archive +} + +func (ta twitterAccount) screenName() string { + if ta.ScreenName != "" { + return ta.ScreenName + } + return ta.Username +} + +func (ta twitterAccount) id() string { + if ta.IDStr != "" { + return ta.IDStr + } + return ta.AccountID +} + +func (ta twitterAccount) name() string { + if ta.Name != "" { + return ta.Name + } + return ta.AccountDisplayName +} + +// transInt is an integer that could be +// unmarshaled from a string, too. This +// is needed because the archive JSON +// from Twitter uses all string values, +// but the same fields are integers with +// the API. 
+type transInt int + +func (ti *transInt) UnmarshalJSON(b []byte) error { + if len(b) == 0 { + return fmt.Errorf("no value") + } + b = bytes.Trim(b, "\"") + var i int + err := json.Unmarshal(b, &i) + if err != nil { + return err + } + *ti = transInt(i) + return nil +} + +// transFloat is like transInt but for floats. +type transFloat float64 + +func (tf *transFloat) UnmarshalJSON(b []byte) error { + if len(b) == 0 { + return fmt.Errorf("no value") + } + b = bytes.Trim(b, "\"") + var f float64 + err := json.Unmarshal(b, &f) + if err != nil { + return err + } + *tf = transFloat(f) + return nil } diff --git a/datasources/twitter/twitter.go b/datasources/twitter/twitter.go index 02573ca..09849fe 100644 --- a/datasources/twitter/twitter.go +++ b/datasources/twitter/twitter.go @@ -6,11 +6,12 @@ import ( "archive/zip" "bytes" "context" - "encoding/json" "fmt" "io" "log" + "net/http" "path" + "strconv" "time" "github.com/mholt/archiver" @@ -27,8 +28,21 @@ const ( var dataSource = timeliner.DataSource{ ID: DataSourceID, Name: DataSourceName, + OAuth2: timeliner.OAuth2{ + ProviderID: "twitter", + }, + RateLimit: timeliner.RateLimit{ + // from https://developer.twitter.com/en/docs/basics/rate-limits + // with some leeway since it's actually a pretty generous limit + RequestsPerHour: 5900, + }, NewClient: func(acc timeliner.Account) (timeliner.Client, error) { + httpClient, err := acc.NewHTTPClient() + if err != nil { + return nil, err + } return &Client{ + HTTPClient: httpClient, acc: acc, topLevelTweets: cuckoo.NewFilter(1000000), }, nil @@ -49,115 +63,50 @@ type Client struct { Replies bool // TODO: replies should include context, like the surrounding conversation, as part of the graph... 
// Threads bool // TODO: this requires more tweets, using the API - acc timeliner.Account + HTTPClient *http.Client - ownerAccount twitterAccount + checkpoint checkpointInfo + acc timeliner.Account + ownerAccount twitterAccount topLevelTweets *cuckoo.Filter } -// ListItems lists items from opt.Filename. TODO: support API too +// ListItems lists items from opt.Filename if specified, or from the API otherwise. func (c *Client) ListItems(ctx context.Context, itemChan chan<- *timeliner.ItemGraph, opt timeliner.Options) error { defer close(itemChan) - // TODO: integrate with the API too - if opt.Filename == "" { - return fmt.Errorf("filename is required") + if opt.Filename != "" { + return c.getFromArchiveFile(itemChan, opt) } - // load the user's account ID - var err error - c.ownerAccount, err = c.getOwnerAccount(opt.Filename) - if err != nil { - return fmt.Errorf("unable to get user account ID: %v", err) - } - - // first pass - add tweets to timeline - err = c.processArchive(opt.Filename, itemChan, c.processTweet) - if err != nil { - return fmt.Errorf("processing tweets: %v", err) - } - - // second pass - add tweet relationships to timeline - err = c.processArchive(opt.Filename, itemChan, c.processReplyRelation) - if err != nil { - return fmt.Errorf("processing tweets: %v", err) - } - - return nil + return c.getFromAPI(ctx, itemChan, opt) } -func (c *Client) processArchive(archiveFilename string, itemChan chan<- *timeliner.ItemGraph, processFunc processFn) error { - err := archiver.Walk(archiveFilename, func(f archiver.File) error { - defer f.Close() - if f.Name() != "tweet.js" { - return nil - } - - // consume non-JSON preface (JavaScript variable definition) - err := stripPreface(f, tweetFilePreface) - if err != nil { - return fmt.Errorf("reading tweet file preface: %v", err) - } +func (c *Client) prepareTweet(t *tweet, source string) (skip bool, err error) { + // mark whether this tweet came from the API or an export file + t.source = source - err = 
c.processTweets(itemChan, f, archiveFilename, processFunc) - if err != nil { - return fmt.Errorf("processing tweet file: %v", err) - } + // tweets from an import file are presumed to all be from the account owner + t.ownerAccount = c.ownerAccount - return archiver.ErrStopWalk - }) - if err != nil { - return fmt.Errorf("walking archive file %s: %v", archiveFilename, err) + // skip tweets we aren't interested in + if !c.Retweets && t.isRetweet() { + return true, nil } - - return nil -} - -func (c *Client) processTweets(itemChan chan<- *timeliner.ItemGraph, f io.Reader, - archiveFilename string, processFunc processFn) error { - - dec := json.NewDecoder(f) - - // read array opening bracket '[' - _, err := dec.Token() - if err != nil { - return fmt.Errorf("decoding opening token: %v", err) + if !c.Replies && t.InReplyToUserIDStr != "" && t.InReplyToUserIDStr != t.ownerAccount.id() { + // TODO: Replies should have more context, like what are we replying to, etc... the whole thread, even? + // this option is about replies to tweets other than our own, which are like a continuation of one thought + return true, nil } - for dec.More() { - var t tweet - err := dec.Decode(&t) - if err != nil { - return fmt.Errorf("decoding tweet element: %v", err) - } - - // tweets from an import file are presumed to all be from the account owner - t.ownerAccount = c.ownerAccount - - // skip tweets we aren't interested in - if !c.Retweets && t.isRetweet() { - continue // retweets - } - if !c.Replies && t.InReplyToUserID != "" && t.InReplyToUserID != t.ownerAccount.AccountID { - // TODO: Replies should have more context, like what are we replying to, etc... the whole thread, even? 
- // this option is about replies to tweets other than our own, which are like a continuation of one thought - continue // replies - } - - // parse Twitter's time string into an actual time value - t.createdAtParsed, err = time.Parse("Mon Jan 2 15:04:05 -0700 2006", t.CreatedAt) - if err != nil { - return fmt.Errorf("parsing created_at time: %v", err) - } - - err = processFunc(itemChan, t, archiveFilename) - if err != nil { - return fmt.Errorf("processing tweet: %v", err) - } + // parse Twitter's time string into an actual time value + t.createdAtParsed, err = time.Parse("Mon Jan 2 15:04:05 -0700 2006", t.CreatedAt) + if err != nil { + return false, fmt.Errorf("parsing created_at time: %v", err) } - return nil + return false, nil } func (c *Client) processTweet(itemChan chan<- *timeliner.ItemGraph, t tweet, archiveFilename string) error { @@ -168,7 +117,7 @@ func (c *Client) processTweet(itemChan chan<- *timeliner.ItemGraph, t tweet, arc // as a separate item, unless there's exactly 1, in which case we // in-line it into the tweet itself) var ig *timeliner.ItemGraph - if t.FullText != "" || !oneMediaItem { + if t.text() != "" || !oneMediaItem { ig = timeliner.NewItemGraph(&t) } @@ -188,37 +137,56 @@ func (c *Client) processTweet(itemChan chan<- *timeliner.ItemGraph, t tweet, arc dataFileName = *dfn } - targetFileInArchive := path.Join("tweet_media", dataFileName) + switch t.source { + case "archive": + targetFileInArchive := path.Join("tweet_media", dataFileName) - err := archiver.Walk(archiveFilename, func(f archiver.File) error { - if f.Header.(zip.FileHeader).Name != targetFileInArchive { - return nil - } + err := archiver.Walk(archiveFilename, func(f archiver.File) error { + if f.Header.(zip.FileHeader).Name != targetFileInArchive { + return nil + } - buf := new(bytes.Buffer) - _, err := io.Copy(buf, f) + buf := new(bytes.Buffer) + _, err := io.Copy(buf, f) + if err != nil { + return fmt.Errorf("copying item into memory: %v", err) + } + m.readCloser = 
timeliner.FakeCloser(buf) + + return archiver.ErrStopWalk + }) if err != nil { - return fmt.Errorf("copying item into memory: %v", err) + return fmt.Errorf("walking archive file %s in search of tweet media: %v", + archiveFilename, err) } - m.reader = buf - if !oneMediaItem { - if ig != nil { - ig.Add(m, timeliner.RelAttached) - } - collItems = append(collItems, timeliner.CollectionItem{ - Item: m, - Position: i, - }) + case "api": + mediaURL := m.getURL() + if m.Type == "photo" { + mediaURL += ":orig" // get original file, with metadata } + resp, err := http.Get(mediaURL) + if err != nil { + return fmt.Errorf("getting media resource %s: %v", m.MediaURLHTTPS, err) + } + if resp.StatusCode != http.StatusOK { + return fmt.Errorf("media resource returned HTTP status %s: %s", resp.Status, m.MediaURLHTTPS) + } + m.readCloser = resp.Body - return archiver.ErrStopWalk - }) - if err != nil { - return fmt.Errorf("walking archive file %s in search of tweet media: %v", - archiveFilename, err) + default: + return fmt.Errorf("unrecognized source value: must be api or archive: %s", t.source) } + if !oneMediaItem { + if ig != nil { + ig.Add(m, timeliner.RelAttached) + } + collItems = append(collItems, timeliner.CollectionItem{ + Item: m, + Position: i, + }) + } } if len(collItems) > 0 { @@ -229,6 +197,12 @@ func (c *Client) processTweet(itemChan chan<- *timeliner.ItemGraph, t tweet, arc } } + // if we're using the API, go ahead and get the + // 'parent' tweet to which this tweet is a reply + if t.source == "api" && t.InReplyToStatusIDStr != "" { + // TODO: link up replies when processing via the API + } + // send the tweet for processing if ig != nil { itemChan <- ig @@ -237,81 +211,56 @@ func (c *Client) processTweet(itemChan chan<- *timeliner.ItemGraph, t tweet, arc // if this is a top-level tweet (i.e. 
not a reply), mark // it so that we can use it to get replies from our own // top level tweets, as they can be a continuation of thought - if t.InReplyToStatusID == "" { - c.topLevelTweets.InsertUnique([]byte(t.TweetID)) + if t.InReplyToStatusIDStr == "" { + c.topLevelTweets.InsertUnique([]byte(t.TweetIDStr)) } return nil } -func (c *Client) processReplyRelation(itemChan chan<- *timeliner.ItemGraph, t tweet, archiveFilename string) error { - if t.InReplyToStatusID == "" { - // current tweet is not a reply, so no relationship to add - return nil - } - if !c.topLevelTweets.Lookup([]byte(t.InReplyToStatusID)) { - // reply is not to a top-level tweet by self, so doesn't qualify for what we want - return nil - } +// Assuming checkpoints are short-lived (i.e. are resumed +// somewhat quickly, before the page tokens/cursors expire), +// we can just store the page tokens. +type checkpointInfo struct { + LastTweetID string +} - ig := &timeliner.ItemGraph{ - Relations: []timeliner.RawRelation{ - { - FromItemID: t.TweetID, - ToItemID: t.InReplyToStatusID, - Relation: timeliner.RelReplyTo, - }, - }, +// save records the checkpoint. 
+func (ch *checkpointInfo) save(ctx context.Context) { + gobBytes, err := timeliner.MarshalGob(ch) + if err != nil { + log.Printf("[ERROR][%s] Encoding checkpoint: %v", DataSourceID, err) } - - itemChan <- ig - - return nil + timeliner.Checkpoint(ctx, gobBytes) } -func (c *Client) getOwnerAccount(filename string) (twitterAccount, error) { - var ta twitterAccount - err := archiver.Walk(filename, func(f archiver.File) error { - defer f.Close() - if f.Name() != "account.js" { - return nil - } - - // consume non-JSON preface (JavaScript variable definition) - err := stripPreface(f, accountFilePreface) - if err != nil { - return fmt.Errorf("reading account file preface: %v", err) - } - - var accFile twitterAccountFile - err = json.NewDecoder(f).Decode(&accFile) - if err != nil { - return fmt.Errorf("decoding account file: %v", err) - } - if len(accFile) == 0 { - return fmt.Errorf("account file was empty") - } - - ta = accFile[0].Account - - return archiver.ErrStopWalk - }) - return ta, err +// load decodes the checkpoint. +func (ch *checkpointInfo) load(checkpointGob []byte) { + if len(checkpointGob) == 0 { + return + } + err := timeliner.UnmarshalGob(checkpointGob, ch) + if err != nil { + log.Printf("[ERROR][%s] Decoding checkpoint: %v", DataSourceID, err) + } } -func stripPreface(f io.Reader, preface string) error { - buf := make([]byte, len(preface)) - _, err := io.ReadFull(f, buf) - return err +// maxTweetID returns the higher of the two tweet IDs. +// Errors parsing the strings as integers are ignored. +// Empty string inputs are ignored so the other value +// will win automatically. If both are empty, an empty +// string is returned. +func maxTweetID(id1, id2 string) string { + if id1 == "" { + return id2 + } + if id2 == "" { + return id1 + } + id1int, _ := strconv.ParseInt(id1, 10, 64) + id2int, _ := strconv.ParseInt(id2, 10, 64) + if id1int > id2int { + return id1 + } + return id2 } - -// processFn is a function that processes a tweet. 
-type processFn func(itemChan chan<- *timeliner.ItemGraph, t tweet, archiveFilename string) error - -// Variable definitions that are intended for -// use with JavaScript but which are of no use -// to us and would break the JSON parser. -const ( - tweetFilePreface = "window.YTD.tweet.part0 =" - accountFilePreface = "window.YTD.account.part0 =" -) diff --git a/itemgraph.go b/itemgraph.go index 662b806..748ef35 100644 --- a/itemgraph.go +++ b/itemgraph.go @@ -145,14 +145,14 @@ const ( // These are the standard relationships that Timeliner // recognizes. Using these known relationships is not -// required, but it makes it easier to translate them +// required, but it makes it easier to translate them to // human-friendly phrases when visualizing the timeline. var ( - RelReplyTo = Relation{Label: "reply_to"} // " is in reply " - RelAttached = Relation{Label: "attached", Bidirectional: true} // " is attached " + RelReplyTo = Relation{Label: "reply_to"} // " is in reply to " + RelAttached = Relation{Label: "attached", Bidirectional: true} // " is attached to " ) -// ItemRow mirrors an item's row in our DB. +// ItemRow has the structure of an item's row in our DB. 
type ItemRow struct { ID int64 AccountID int64 diff --git a/persons.go b/persons.go index bf980ee..d86864f 100644 --- a/persons.go +++ b/persons.go @@ -27,7 +27,7 @@ func (t *Timeline) getPerson(dataSourceID, userID, name string) (Person, error) if err != nil { return Person{}, fmt.Errorf("getting person ID: %v", err) } - _, err = t.db.Exec(`INSERT INTO person_identities + _, err = t.db.Exec(`INSERT OR IGNORE INTO person_identities (person_id, data_source_id, user_id) VALUES (?, ?, ?)`, p.ID, dataSourceID, userID) if err != nil { diff --git a/ratelimit.go b/ratelimit.go index 0491053..a38f566 100644 --- a/ratelimit.go +++ b/ratelimit.go @@ -23,7 +23,11 @@ func (acc Account) NewRateLimitedRoundTripper(rt http.RoundTripper) http.RoundTr if !ok && acc.ds.RateLimit.RequestsPerHour > 0 { secondsBetweenReqs := 60.0 / (float64(acc.ds.RateLimit.RequestsPerHour) / 60.0) - reqInterval := time.Duration(secondsBetweenReqs) * time.Second + millisBetweenReqs := secondsBetweenReqs * 1000.0 + reqInterval := time.Duration(millisBetweenReqs) * time.Millisecond + if reqInterval < minInterval { + reqInterval = minInterval + } rl.ticker = time.NewTicker(reqInterval) rl.token = make(chan struct{}, rl.BurstSize) @@ -57,3 +61,5 @@ func (rt rateLimitedRoundTripper) RoundTrip(req *http.Request) (*http.Response, } var rateLimiters = make(map[string]RateLimit) + +const minInterval = 100 * time.Millisecond diff --git a/timeliner.go b/timeliner.go index 4510a6f..c602d13 100644 --- a/timeliner.go +++ b/timeliner.go @@ -116,11 +116,11 @@ type CheckpointFn func(checkpoint []byte) error // with the provided context. It overwrites any previous // checkpoint. Any errors are logged. 
func Checkpoint(ctx context.Context, checkpoint []byte) { - wc, ok := ctx.Value(wrappedClientCtxKey).(WrappedClient) + wc, ok := ctx.Value(wrappedClientCtxKey).(*WrappedClient) if !ok { - log.Printf("[ERROR][%s/%s] Checkpoint function not available; got %T", - wc.ds.ID, wc.acc.UserID, wc) + log.Printf("[ERROR][%s/%s] Checkpoint function not available; got type %T (%#v)", + wc.ds.ID, wc.acc.UserID, wc, wc) return } diff --git a/wrappedclient.go b/wrappedclient.go index 4d0d569..31363b8 100644 --- a/wrappedclient.go +++ b/wrappedclient.go @@ -35,12 +35,14 @@ func (wc *WrappedClient) GetLatest(ctx context.Context) error { // get date of most recent item for this account var mostRecent int64 - err := wc.tl.db.QueryRow(`SELECT timestamp FROM items + var mostRecentOriginalID string + err := wc.tl.db.QueryRow(`SELECT timestamp, original_id + FROM items WHERE account_id=? ORDER BY timestamp DESC - LIMIT 1`, wc.acc.ID).Scan(&mostRecent) + LIMIT 1`, wc.acc.ID).Scan(&mostRecent, &mostRecentOriginalID) if err != nil && err != sql.ErrNoRows { - return fmt.Errorf("getting most recent item timestamp: %v", err) + return fmt.Errorf("getting most recent item: %v", err) } // constrain the pull to the recent timeframe @@ -49,6 +51,9 @@ func (wc *WrappedClient) GetLatest(ctx context.Context) error { ts := time.Unix(mostRecent, 0) timeframe.Since = &ts } + if mostRecentOriginalID != "" { + timeframe.SinceItemID = &mostRecentOriginalID + } wg, ch := wc.beginProcessing(concurrentCuckoo{}, false, false)