
Commit

twitter: Add preliminary support for the Twitter API as a data source
Plus several bug fixes
mholt committed Feb 11, 2019
1 parent 210323b commit 0401be2
Showing 11 changed files with 779 additions and 329 deletions.
2 changes: 1 addition & 1 deletion cmd/timeliner/main.go
@@ -30,7 +30,7 @@ func init() {
flag.BoolVar(&reprocess, "reprocess", reprocess, "Reprocess every item that has not been modified locally (download-all or import only)")

flag.BoolVar(&twitterRetweets, "twitter-retweets", twitterRetweets, "Twitter: include retweets")
- flag.BoolVar(&twitterReplies, "twitter-replies", twitterReplies, "Twitter: include replies")
+ flag.BoolVar(&twitterReplies, "twitter-replies", twitterReplies, "Twitter: include replies that are not just replies to self")
}

func main() {
35 changes: 26 additions & 9 deletions datasource.go
@@ -148,7 +148,8 @@ type Client interface {
// relationships can be stored. If the relationships are not
// discovered until later, that's OK: item processing is
// idempotent, so repeating an item from earlier will have no
- // adverse effects.
+ // adverse effects (this is possible because a unique ID is
+ // required for each item).
//
// Implementations must honor the context's cancellation. If
// ctx.Done() is closed, the function should return. Typically,
@@ -169,10 +170,15 @@ type Client interface {
// filename is not specified but required, an error should be
// returned.
//
- // opt.Timeframe consists of two optional timestamp values.
- // If set, item listings should be bounded in the respective
- // direction by that timestamp. If timeframes are not supported,
- // this should be documented, but an error need not be returned.
+ // opt.Timeframe consists of two optional timestamp and/or item
+ // ID values. If set, item listings should be bounded in the
+ // respective direction by that timestamp / item ID. (Items
+ // are assumed to be part of a chronology; both a timestamp
+ // and an item ID may be provided, when possible, to
+ // accommodate data sources which constrain by item ID
+ // rather than by timestamp.) If timeframes are not
+ // supported, this should be documented, but an error need
+ // not be returned if they cannot be honored.
//
// opt.Checkpoint consists of the last checkpoint for this
// account if the last call to ListItems did not finish and
@@ -186,11 +192,22 @@
ListItems(ctx context.Context, itemChan chan<- *ItemGraph, opt Options) error
}

- // Timeframe represents a start and end time, where
- // either value could be nil, meaning unbounded in
- // that direction.
+ // Timeframe represents a start and end time and/or
+ // a start and end item, where either value could be
+ // nil, meaning unbounded in that direction.
+ // When items are used as the timeframe boundaries,
+ // the ItemID fields will be populated. It is not
+ // guaranteed that any particular field will be set
+ // or unset just because other fields are set or unset.
+ // However, if both Since fields or both Until fields
+ // are set, the timestamp and item ID are correlated;
+ // i.e. the Since timestamp is (approximately) that
+ // of the Since item ID. Put another way: there will
+ // never be conflicts among the fields which are
+ // non-nil.
type Timeframe struct {
- Since, Until *time.Time
+ Since, Until             *time.Time
+ SinceItemID, UntilItemID *string
}

var dataSources = make(map[string]DataSource)
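To make the new item-ID bounds concrete, here is a minimal sketch (not part of this commit) of constructing a Timeframe bounded by item IDs rather than timestamps; the literal IDs are placeholders made up for illustration.

// Minimal sketch, not from this commit: bound a listing by item IDs.
// The literal IDs are placeholders for illustration only.
since, until := "1000", "2000"
opt := timeliner.Options{
	Timeframe: timeliner.Timeframe{
		SinceItemID: &since, // only items that come after item "1000"
		UntilItemID: &until, // ...and before item "2000"
	},
}
// A data source's ListItems would then bound its listing by these IDs,
// much as the Twitter client below maps them to since_id/max_id.
_ = opt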
149 changes: 149 additions & 0 deletions datasources/twitter/api.go
@@ -0,0 +1,149 @@
package twitter

import (
"context"
"encoding/json"
"fmt"
"net/http"
"net/url"
"strconv"
"strings"

"github.com/mholt/timeliner"
)

func (c *Client) getFromAPI(ctx context.Context, itemChan chan<- *timeliner.ItemGraph, opt timeliner.Options) error {
// load any previous checkpoint
c.checkpoint.load(opt.Checkpoint)

// get account owner information
cleanedScreenName := strings.TrimPrefix(c.acc.UserID, "@")
ownerAccount, err := c.getOwnerAccountFromAPI(cleanedScreenName)
if err != nil {
return fmt.Errorf("getting user account information for @%s: %v", cleanedScreenName, err)
}
c.ownerAccount = ownerAccount

// get the starting bounds of this operation
var maxTweet, minTweet string
if opt.Timeframe.SinceItemID != nil {
minTweet = *opt.Timeframe.SinceItemID
}
if c.checkpoint.LastTweetID != "" {
// by default, start off at the last checkpoint
maxTweet = c.checkpoint.LastTweetID
if opt.Timeframe.UntilItemID != nil {
// if both a timeframe UntilItemID and a checkpoint are set,
// we will choose the one with a tweet ID that is higher,
// meaning more recent, to avoid potentially skipping
// a chunk of the timeline
maxTweet = maxTweetID(c.checkpoint.LastTweetID, *opt.Timeframe.UntilItemID)
}
}

for {
select {
case <-ctx.Done():
return nil
default:
tweets, err := c.nextPageOfTweetsFromAPI(maxTweet, minTweet)
if err != nil {
return fmt.Errorf("getting next page of tweets: %v", err)
}

// TODO: Is this the right finish criterion?
if len(tweets) == 0 {
return nil
}

for _, t := range tweets {
skip, err := c.prepareTweet(&t, "api")
if err != nil {
return fmt.Errorf("preparing tweet: %v", err)
}
if skip {
continue
}

c.processTweet(itemChan, t, "")
}

// since max_id is inclusive, subtract 1 from the tweet ID
// https://developer.twitter.com/en/docs/tweets/timelines/guides/working-with-timelines
nextTweetID := tweets[len(tweets)-1].TweetID - 1
c.checkpoint.LastTweetID = strconv.FormatInt(int64(nextTweetID), 10)
c.checkpoint.save(ctx)

// decrease maxTweet to get the next page on next iteration
maxTweet = c.checkpoint.LastTweetID
}
}
}
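The maxTweetID helper called above is defined elsewhere in this commit and does not appear on this page. A plausible sketch, assuming tweet IDs are decimal strings that fit in an int64 (as Twitter snowflake IDs do):

// Plausible sketch only; the real helper lives in a file not shown here.
// It returns whichever of the two tweet IDs is numerically greater,
// i.e. the more recent one.
func maxTweetID(a, b string) string {
	ai, _ := strconv.ParseInt(a, 10, 64)
	bi, _ := strconv.ParseInt(b, 10, 64)
	if ai > bi {
		return a
	}
	return b
}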

// nextPageOfTweetsFromAPI returns the next page of tweets starting at maxTweet
// and going back a full page or until minTweet, whichever comes first. Generally,
// iterating over this function will involve decreasing maxTweet and leaving
// minTweet the same, if it is set at all (maxTweet = "until", minTweet = "since").
// Either or both can be empty strings, meaning no boundary in that direction.
// It returns up to a full page of tweets, or zero tweets once the range appears
// to be exhausted (see the TODO above about the finish criterion).
func (c *Client) nextPageOfTweetsFromAPI(maxTweet, minTweet string) ([]tweet, error) {
q := url.Values{
"user_id": {c.ownerAccount.id()}, // TODO
"count": {"200"},
"tweet_mode": {"extended"}, // https://developer.twitter.com/en/docs/tweets/tweet-updates
"exclude_replies": {"false"}, // always include replies in case it's a self-reply; we can filter all others
"include_rts": {"false"},
}
if maxTweet != "" {
q.Set("max_id", maxTweet)
}
if minTweet != "" {
q.Set("since_id", minTweet)
}
if c.Retweets {
q.Set("include_rts", "true")
}

resp, err := c.HTTPClient.Get("https://api.twitter.com/1.1/statuses/user_timeline.json?" + q.Encode())
if err != nil {
return nil, fmt.Errorf("performing API request: %v", err)
}
defer resp.Body.Close()

// TODO: handle HTTP errors, esp. rate limiting, a lot better
if resp.StatusCode != http.StatusOK {
return nil, fmt.Errorf("HTTP error: %s", resp.Status)
}

var tweets []tweet
err = json.NewDecoder(resp.Body).Decode(&tweets)
if err != nil {
return nil, fmt.Errorf("reading response body: %v", err)
}

return tweets, nil
}
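Both HTTP helpers in this file carry a TODO about handling rate limiting better. One possible refinement, shown here only as a hedged sketch and not as what this commit does, is to detect HTTP 429 and sleep until the time reported in the v1.1 API's x-rate-limit-reset header (Unix epoch seconds) before retrying:

// Sketch only, not part of this commit (requires importing "time").
// Waits out a rate-limit window before the caller retries the request.
func waitForRateLimit(resp *http.Response) {
	if resp.StatusCode != http.StatusTooManyRequests {
		return
	}
	reset, err := strconv.ParseInt(resp.Header.Get("x-rate-limit-reset"), 10, 64)
	if err != nil {
		return
	}
	time.Sleep(time.Until(time.Unix(reset, 0)))
}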

func (c *Client) getOwnerAccountFromAPI(screenName string) (twitterAccount, error) {
var ta twitterAccount

q := url.Values{"screen_name": {screenName}}

resp, err := c.HTTPClient.Get("https://api.twitter.com/1.1/users/show.json?" + q.Encode())
if err != nil {
return ta, fmt.Errorf("performing API request: %v", err)
}
defer resp.Body.Close()

// TODO: handle HTTP errors, esp. rate limiting, a lot better
if resp.StatusCode != http.StatusOK {
return ta, fmt.Errorf("HTTP error: %s", resp.Status)
}

err = json.NewDecoder(resp.Body).Decode(&ta)
if err != nil {
return ta, fmt.Errorf("reading response body: %v", err)
}

return ta, nil
}
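The tweet and twitterAccount types decoded above are declared elsewhere in this commit and are not shown on this page. A hypothetical minimal shape that would satisfy just the fields referenced in this file might look like the following; the real types certainly carry more fields for prepareTweet and processTweet to use.

// Hypothetical minimal shapes; the real declarations live in a file not
// shown here. JSON tags follow the standard v1.1 API field names.
type tweet struct {
	TweetID              int64  `json:"id"`
	TweetIDStr           string `json:"id_str"`
	InReplyToStatusIDStr string `json:"in_reply_to_status_id_str"`
}

type twitterAccount struct {
	IDStr      string `json:"id_str"`
	ScreenName string `json:"screen_name"`
}

// id returns the value used for the user_id query parameter above.
func (a twitterAccount) id() string { return a.IDStr }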
168 changes: 168 additions & 0 deletions datasources/twitter/archives.go
@@ -0,0 +1,168 @@
package twitter

import (
"encoding/json"
"fmt"
"io"

"github.com/mholt/archiver"
"github.com/mholt/timeliner"
)

func (c *Client) getFromArchiveFile(itemChan chan<- *timeliner.ItemGraph, opt timeliner.Options) error {
// load the user's account ID
var err error
c.ownerAccount, err = c.getOwnerAccountFromArchive(opt.Filename)
if err != nil {
return fmt.Errorf("unable to get user account ID: %v", err)
}

// first pass - add tweets to timeline
err = c.processArchive(opt.Filename, itemChan, c.processTweet)
if err != nil {
return fmt.Errorf("processing tweets: %v", err)
}

// second pass - add tweet relationships to timeline
err = c.processArchive(opt.Filename, itemChan, c.processReplyRelationFromArchive)
if err != nil {
return fmt.Errorf("processing tweets: %v", err)
}

return nil
}

func (c *Client) processArchive(archiveFilename string, itemChan chan<- *timeliner.ItemGraph, processFunc archiveProcessFn) error {
err := archiver.Walk(archiveFilename, func(f archiver.File) error {
defer f.Close()
if f.Name() != "tweet.js" {
return nil
}

// consume non-JSON preface (JavaScript variable definition)
err := stripPreface(f, tweetFilePreface)
if err != nil {
return fmt.Errorf("reading tweet file preface: %v", err)
}

err = c.processTweetsFromArchive(itemChan, f, archiveFilename, processFunc)
if err != nil {
return fmt.Errorf("processing tweet file: %v", err)
}

return archiver.ErrStopWalk
})
if err != nil {
return fmt.Errorf("walking archive file %s: %v", archiveFilename, err)
}

return nil
}

func (c *Client) processTweetsFromArchive(itemChan chan<- *timeliner.ItemGraph, f io.Reader,
archiveFilename string, processFunc archiveProcessFn) error {

dec := json.NewDecoder(f)

// read array opening bracket '['
_, err := dec.Token()
if err != nil {
return fmt.Errorf("decoding opening token: %v", err)
}

for dec.More() {
var t tweet
err := dec.Decode(&t)
if err != nil {
return fmt.Errorf("decoding tweet element: %v", err)
}

skip, err := c.prepareTweet(&t, "archive")
if err != nil {
return fmt.Errorf("preparing tweet: %v", err)
}
if skip {
continue
}

err = processFunc(itemChan, t, archiveFilename)
if err != nil {
return fmt.Errorf("processing tweet: %v", err)
}
}

return nil
}

func (c *Client) processReplyRelationFromArchive(itemChan chan<- *timeliner.ItemGraph, t tweet, archiveFilename string) error {
if t.InReplyToStatusIDStr == "" {
// current tweet is not a reply, so no relationship to add
return nil
}
if !c.topLevelTweets.Lookup([]byte(t.InReplyToStatusIDStr)) {
// reply is not to a top-level tweet by self, so doesn't qualify for what we want
return nil
}

ig := &timeliner.ItemGraph{
Relations: []timeliner.RawRelation{
{
FromItemID: t.TweetIDStr,
ToItemID: t.InReplyToStatusIDStr,
Relation: timeliner.RelReplyTo,
},
},
}

itemChan <- ig

return nil
}

func (c *Client) getOwnerAccountFromArchive(filename string) (twitterAccount, error) {
var ta twitterAccount
err := archiver.Walk(filename, func(f archiver.File) error {
defer f.Close()
if f.Name() != "account.js" {
return nil
}

// consume non-JSON preface (JavaScript variable definition)
err := stripPreface(f, accountFilePreface)
if err != nil {
return fmt.Errorf("reading account file preface: %v", err)
}

var accFile twitterAccountFile
err = json.NewDecoder(f).Decode(&accFile)
if err != nil {
return fmt.Errorf("decoding account file: %v", err)
}
if len(accFile) == 0 {
return fmt.Errorf("account file was empty")
}

ta = accFile[0].Account

return archiver.ErrStopWalk
})
return ta, err
}
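For context, account.js in a Twitter export from this era begins with the JavaScript preface declared below and then holds a JSON array wrapping the account object, roughly of the form window.YTD.account.part0 = [ { "account": { ... } } ]. The twitterAccountFile type is declared elsewhere in this commit; a hypothetical minimal shape compatible with the decoding above:

// Hypothetical minimal shape; the real type is declared in a file not
// shown here. Each array element wraps one account object.
type twitterAccountFile []struct {
	Account twitterAccount `json:"account"`
}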

func stripPreface(f io.Reader, preface string) error {
	buf := make([]byte, len(preface))
	if _, err := io.ReadFull(f, buf); err != nil {
		return err
	}
	if string(buf) != preface {
		return fmt.Errorf("unexpected file preface: %q", buf)
	}
	return nil
}

// archiveProcessFn is a function that processes a
// tweet from a Twitter export archive.
type archiveProcessFn func(itemChan chan<- *timeliner.ItemGraph, t tweet, archiveFilename string) error

// Variable definitions that are intended for
// use with JavaScript but which are of no use
// to us and would break the JSON parser.
const (
tweetFilePreface = "window.YTD.tweet.part0 ="
accountFilePreface = "window.YTD.account.part0 ="
)
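As a quick illustration of how these prefaces are consumed, here is a sketch (not part of this commit) that strips the tweet-file preface from an in-memory reader and decodes what follows; it assumes the "strings" package is imported and the literal content is made up.

// Sketch only: shows that stripPreface leaves the reader positioned at the
// start of the JSON array so a json.Decoder can take over from there.
func exampleStripPreface() error {
	r := strings.NewReader(`window.YTD.tweet.part0 = [ {"id_str": "1"} ]`)
	if err := stripPreface(r, tweetFilePreface); err != nil {
		return err
	}
	var tweets []map[string]interface{}
	return json.NewDecoder(r).Decode(&tweets) // decodes the array after the preface
}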