Catch ChunkedEncodingError in stream as well as sample
This is a refactor of stream and sample so that ChunkedEncodingError is
caught in both. Checking for disconnect errors is now also done in both,
instead of only in sample. Logging was enhanced to include the ids of
tweets that have been archived, as it was in twarc1. See #471 (again)
for background on ChunkedEncodingError.
edsu committed Jul 2, 2021
1 parent cea8138 commit 6223f85
Showing 4 changed files with 65 additions and 82 deletions.
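
Both sample and stream now carry a `catch_chunked_encoding_error` decorator. Its definition lives in twarc/decorators.py and is not part of this diff (that file only gains a trailing newline here), so the following is a minimal sketch of the pattern, assuming the decorator simply re-enters the wrapped generator whenever requests raises a ChunkedEncodingError mid-stream:

```python
import functools
import logging

import requests

log = logging.getLogger("twarc")


def catch_chunked_encoding_error(f):
    """
    Wrap a generator method so that a ChunkedEncodingError raised while
    iterating a streaming response is logged and the stream re-entered,
    instead of propagating to the caller.
    """

    @functools.wraps(f)
    def new_f(self, *args, **kwargs):
        while True:
            try:
                yield from f(self, *args, **kwargs)
                return  # generator finished normally, stop retrying
            except requests.exceptions.ChunkedEncodingError as e:
                log.warning("caught %s, reconnecting", e)

    return new_f
```

Handling reconnection in a decorator like this is also why the http_errors retry counter disappears from the Twarc2 constructor below: sample no longer needs its own while/try retry loop.
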
105 changes: 38 additions & 67 deletions twarc/client2.py
@@ -41,7 +41,6 @@ def __init__(
access_token_secret=None,
bearer_token=None,
connection_errors=0,
http_errors=0,
metadata=True,
):
"""
@@ -71,14 +70,11 @@ def __init__(
Bearer Token, can be generated from API keys.
connection_errors (int):
Number of retries for GETs
http_errors (int):
Number of retries for sample stream.
metadata (bool):
Append `__twarc` metadata to results.
"""
self.api_version = "2"
self.connection_errors = connection_errors
self.http_errors = http_errors
self.metadata = metadata
self.bearer_token = None

@@ -144,7 +140,7 @@ def _search(
made_call = time.monotonic()

for response in self.get_paginated(url, params=params):
# can return without 'data' if there are no results
# can't return without 'data' if there are no results
if "data" in response:
count += len(response["data"])
yield response
@@ -436,6 +432,7 @@ def lookup_batch(users):
if batch:
yield (lookup_batch(batch))

@catch_chunked_encoding_error
@requires_app_auth
def sample(self, event=None, record_keepalive=False):
"""
@@ -458,70 +455,30 @@ def sample(self, event=None, record_keepalive=False):
generator[dict]: a generator, dict for each tweet.
"""
url = "https://api.twitter.com/2/tweets/sample/stream"
errors = 0

while True:
try:
log.info("Connecting to V2 sample stream")
resp = self.get(url, params=expansions.EVERYTHING.copy(), stream=True)
errors = 0
for line in resp.iter_lines(chunk_size=512):

# quit & close the stream if the event is set
if event and event.is_set():
log.info("stopping sample")
resp.close()
return

# return the JSON data w/ optional keep-alive
if not line:
log.info("keep-alive")
if record_keepalive:
yield "keep-alive"
continue
else:
data = json.loads(line.decode())
if self.metadata:
data = _append_metadata(data, resp.url)
yield data

# Check for an operational disconnect error in the response
if data.get("errors", []):
for error in data["errors"]:
if (
error.get("disconnect_type")
== "OperationalDisconnect"
):
log.info(
"Received operational disconnect message: "
"This stream has fallen too far behind in "
"processing tweets. Some data may have been "
"lost."
)
# Sleep briefly, then break this get call and
# attempt to reconnect.
time.sleep(5)
break

except requests.exceptions.RequestException as e:
errors += 1
log.error("caught request error %s on %s try", e, errors)

if self.http_errors and errors == self.http_errors:
log.warning("too many errors")
raise e

                if (
                    isinstance(e, requests.exceptions.HTTPError)
                    and response.status_code == 420
                ):
                    if interruptible_sleep(errors * 60, event):
                        log.info("stopping filter")
                        return
                else:
                    if interruptible_sleep(errors * 5, event):
                        log.info("stopping filter")
                        return
        log.info("Connecting to V2 sample stream")
        resp = self.get(url, params=expansions.EVERYTHING.copy(), stream=True)
        for line in resp.iter_lines(chunk_size=512):

            # quit & close the stream if the event is set
            if event and event.is_set():
                log.info("stopping sample")
                resp.close()
                return

            # return the JSON data w/ optional keep-alive
            if not line:
                log.info("keep-alive")
                if record_keepalive:
                    yield "keep-alive"
                continue
            else:
                data = json.loads(line.decode())
                if self.metadata:
                    data = _append_metadata(data, resp.url)
                yield data
                self._check_for_disconnect(data)


@requires_app_auth
def add_stream_rules(self, rules):
@@ -568,6 +525,7 @@ def delete_stream_rule_ids(self, rule_ids):
url = "https://api.twitter.com/2/tweets/search/stream/rules"
return self.post(url, {"delete": {"ids": rule_ids}}).json()

@catch_chunked_encoding_error
@requires_app_auth
def stream(self, event=None, record_keep_alives=False):
"""
@@ -610,6 +568,7 @@ def stream(self, event=None, record_keep_alives=False):
data = _append_metadata(data, resp.url)

yield data
self._check_for_disconnect(data)

def _timeline(
self,
@@ -963,6 +922,17 @@ def id_exists(user):
else:
raise ValueError(f"No such user {user}")


def _check_for_disconnect(self, data):
"""
Look for disconnect errors in a response, and reconnect if found.
"""
for error in data.get("errors", []):
if error.get("disconnect_type") == "OperationalDisconnect":
log.info("Received operational disconnect message, reconnecting")
self.connect()
break


def _ts(dt):
"""
@@ -1007,3 +977,4 @@ def _append_metadata(result, url):
"""
result["__twarc"] = {"url": url, "version": version, "retrieved_at": _utcnow()}
return result

39 changes: 25 additions & 14 deletions twarc/command2.py
@@ -25,6 +25,7 @@
from click_config_file import configuration_option

config_provider = ConfigProvider()
log = logging.getLogger("twarc")


@with_plugins(iter_entry_points("twarc.plugins"))
@@ -66,7 +67,7 @@
"higher with user authentication, but not all endpoints are supported.",
show_default=True,
)
@click.option("--log", default="twarc.log")
@click.option("--log", "-l", "log_file", default="twarc.log")
@click.option("--verbose", is_flag=True, default=False)
@click.option(
"--metadata/--no-metadata",
@@ -85,7 +86,7 @@ def twarc2(
access_token,
access_token_secret,
bearer_token,
log,
log_file,
metadata,
app_auth,
verbose,
@@ -94,12 +95,12 @@
Collect data from the Twitter V2 API.
"""
logging.basicConfig(
filename=log,
filename=log_file,
level=logging.DEBUG if verbose else logging.INFO,
format="%(asctime)s %(levelname)s %(message)s",
)

logging.info("using config %s", config_provider.file_path)
log.info("using config %s", config_provider.file_path)

if bearer_token or (consumer_key and consumer_secret):
if app_auth and (bearer_token or (consumer_key and consumer_secret)):
@@ -152,11 +153,11 @@ def configure(ctx):
"""

config_file = config_provider.file_path
logging.info("creating config file: %s", config_file)
log.info("creating config file: %s", config_file)

config_dir = pathlib.Path(config_file).parent
if not config_dir.is_dir():
logging.info("creating config directory: %s", config_dir)
log.info("creating config directory: %s", config_dir)
config_dir.mkdir(parents=True)

keys = handshake()
@@ -255,6 +256,10 @@ def search(
query, since_id, until_id, start_time, end_time, max_results
):
_write(result, outfile)

tweet_ids = [t['id'] for t in result.get("data", [])]
log.info("archived %s", ','.join(tweet_ids))

count += len(result["data"])
if limit != 0 and count >= limit:
break
@@ -436,6 +441,8 @@ def sample(T, outfile, limit):
if limit != 0 and count >= limit:
event.set()
_write(result, outfile)
if result:
log.info("archived %s", result["data"]["id"])


@twarc2.command("hydrate")
@@ -449,6 +456,8 @@ def hydrate(T, infile, outfile):
"""
for result in T.tweet_lookup(infile):
_write(result, outfile)
tweet_ids = [t["id"] for t in result.get("data", [])]
log.info("archived %s", ','.join(tweet_ids))


@twarc2.command("users")
@@ -620,7 +629,7 @@ def timelines(
line_count += 1
line = line.strip()
if line == "":
logging.warn("skipping blank line on line %s", line_count)
log.warn("skipping blank line on line %s", line_count)
continue

users = None
@@ -634,7 +643,7 @@
if isinstance(json_data, str) and json_data:
users = set([json_data])
else:
logging.warn(
log.warn(
"ignored line %s which didn't contain users", line_count
)
continue
@@ -657,7 +666,7 @@

# only process a given user once
if user in seen:
logging.info("already processed %s, skipping", user)
log.info("already processed %s, skipping", user)
continue
seen.add(user)

@@ -802,24 +811,24 @@ def f():
for conv_id in conv_ids:

if conv_id in seen:
logging.info(f"already fetched conversation_id {conv_id}")
log.info(f"already fetched conversation_id {conv_id}")
seen.add(conv_id)

conv_count = 0

logging.info(f"fetching conversation {conv_id}")
log.info(f"fetching conversation {conv_id}")
for result in search(f"conversation_id:{conv_id}"):
_write(result, outfile, False)

count += len(result["data"])
if limit != 0 and count >= limit:
logging.info(f"reached tweet limit of {limit}")
log.info(f"reached tweet limit of {limit}")
stop = True
break

conv_count += len(result["data"])
if conversation_limit != 0 and conv_count >= conversation_limit:
logging.info(f"reached conversation limit {conversation_limit}")
log.info(f"reached conversation limit {conversation_limit}")
break


@@ -867,9 +876,11 @@ def stream(T, outfile, limit):
for result in T.stream(event=event):
count += 1
if limit != 0 and count == limit:
logging.info(f"reached limit {limit}")
log.info(f"reached limit {limit}")
event.set()
_write(result, outfile)
if "data" in result:
log.info("archived %s", result["data"]["id"])


@twarc2.group()
1 change: 1 addition & 0 deletions twarc/decorators.py
@@ -245,3 +245,4 @@ def new_f(self, *args, **kwargs):
return f(self, *args, **kwargs)

return new_f

2 changes: 1 addition & 1 deletion twarc/version.py
@@ -1 +1 @@
version = "2.3.3"
version = "2.3.4"
