Skip to content

Commit

Permalink
Merge branch 'production' into master
Browse files Browse the repository at this point in the history
  • Loading branch information
paramsingh committed Mar 28, 2020
2 parents 4c1d280 + c91d7d4 commit 2d023fd
Show file tree
Hide file tree
Showing 5 changed files with 35 additions and 15 deletions.
6 changes: 4 additions & 2 deletions docker/docker-compose.integration.yml
Original file line number Diff line number Diff line change
Expand Up @@ -14,15 +14,17 @@ services:
image: redis:5.0.3

influx:
image: influxdb:1.2.4
image: influxdb:1.7.9
environment:
INFLUXDB_REPORTING_DISABLED: 'true'
INFLUXDB_META_LOGGING_ENABLED: 'false'
INFLUXDB_DATA_QUERY_LOG_ENABLED: 'false'
INFLUXDB_DATA_WAL_LOGGING_ENABLED: 'false'
INFLUXDB_DATA_DATA_LOGGING_ENABLED: 'false'
INFLUXDB_HTTP_LOG_ENABLED: 'true'
INFLUXDB_HTTP_LOG_ENABLED: 'false'
INFLUXDB_CONTINUOUS_QUERIES_LOG_ENABLED: 'false'
INFLUXDB_DATA_INDEX_VERSION: 'tsi1'
INFLUXDB_DATA_MAX_INDEX_LOG_FILE_SIZE: '5m'

listenbrainz:
build:
Expand Down
6 changes: 4 additions & 2 deletions docker/docker-compose.test.yml
Original file line number Diff line number Diff line change
Expand Up @@ -14,15 +14,17 @@ services:
image: redis:5.0.3

influx:
image: influxdb:1.2.4
image: influxdb:1.7.9
environment:
INFLUXDB_REPORTING_DISABLED: 'true'
INFLUXDB_META_LOGGING_ENABLED: 'false'
INFLUXDB_DATA_QUERY_LOG_ENABLED: 'false'
INFLUXDB_DATA_WAL_LOGGING_ENABLED: 'false'
INFLUXDB_DATA_DATA_LOGGING_ENABLED: 'false'
INFLUXDB_HTTP_LOG_ENABLED: 'true'
INFLUXDB_HTTP_LOG_ENABLED: 'false'
INFLUXDB_CONTINUOUS_QUERIES_LOG_ENABLED: 'false'
INFLUXDB_DATA_INDEX_VERSION: 'tsi1'
INFLUXDB_DATA_MAX_INDEX_LOG_FILE_SIZE: '5m'

rabbitmq:
image: rabbitmq:3.6.5
Expand Down
6 changes: 4 additions & 2 deletions docker/docker-compose.yml
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@ services:
- redis:/data:z

influx:
image: influxdb:1.2.4
image: influxdb:1.7.9
volumes:
- influxdb:/var/lib/influxdb:z
environment:
Expand All @@ -32,8 +32,10 @@ services:
INFLUXDB_DATA_QUERY_LOG_ENABLED: 'false'
INFLUXDB_DATA_WAL_LOGGING_ENABLED: 'false'
INFLUXDB_DATA_DATA_LOGGING_ENABLED: 'false'
INFLUXDB_HTTP_LOG_ENABLED: 'true'
INFLUXDB_HTTP_LOG_ENABLED: 'false'
INFLUXDB_CONTINUOUS_QUERIES_LOG_ENABLED: 'false'
INFLUXDB_DATA_INDEX_VERSION: 'tsi1'
INFLUXDB_DATA_MAX_INDEX_LOG_FILE_SIZE: '5m'

rabbitmq:
image: rabbitmq:3.6.5
Expand Down
27 changes: 21 additions & 6 deletions listenbrainz/influx_writer/influx_writer.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,5 @@
#!/usr/bin/env python3

import json
import sys
import os
import pika
Expand Down Expand Up @@ -37,10 +36,18 @@ def __init__(self):
self.unique_ch = None
self.redis_listenstore = None

self.callback_t0 = 0.0
self.cb_delta = 0.0


def callback(self, ch, method, properties, body):
self.cb_delta = time() - self.callback_t0

t0 = time()
listens = ujson.loads(body)
t1 = time()
ret = self.write(listens)
t2 = time()
if not ret:
return ret

Expand All @@ -54,7 +61,9 @@ def callback(self, ch, method, properties, body):
count = len(listens)

self._collect_and_log_stats(count, call_method=self.ls.update_listen_counts)
t3 = time()

self.callback_t0 = time()
return ret


Expand Down Expand Up @@ -100,7 +109,7 @@ def insert_to_listenstore(self, data, retries=5):
except (InfluxDBServerError, InfluxDBClientError, ValueError, ConnectionError) as e:
error_message = 'Unable to insert bad listen to listenstore: {error}, listen={json}'
influx_dict = data[0].to_influx(get_measurement_name(data[0].user_name))
current_app.logger.error(error_message.format(error=str(e), json=json.dumps(influx_dict, indent=3)), exc_info=True)
current_app.logger.error(error_message.format(error=str(e), json=ujson.dumps(influx_dict, indent=3)), exc_info=True)
return 0
else:
slice_index = len(data) // 2
Expand All @@ -127,7 +136,7 @@ def write(self, listen_dicts):

# if the timestamp is illegal, don't use it for ranges
if t.bit_length() > 32:
current_app.logger.error("timestamp %d is too large. listen: %s", t, json.dumps(listen, indent=3))
current_app.logger.error("timestamp %d is too large. listen: %s", t, ujson.dumps(listen, indent=3))
continue

if user_name not in users:
Expand Down Expand Up @@ -163,17 +172,21 @@ def write(self, listen_dicts):

while True:
try:
t0 = time()
results = self.influx.query(query)
t1 = time()
break
except Exception as e:
current_app.logger.warn('Could not query influx, trying again: %s', str(e), exc_info=True)
sleep(3)

# collect all the timestamps for this given time range.

t2 = time()
timestamps = defaultdict(list) # dict of list of listens indexed by timestamp
for result in results.get_points(measurement=get_measurement_name(user_name)):
timestamps[convert_to_unix_timestamp(result['time'])].append(result)
t3 = time()

for listen in users[user_name]['listens']:
# Check if a listen with the same timestamp and recording msid is already present in
Expand Down Expand Up @@ -204,11 +217,13 @@ def write(self, listen_dicts):
'recording_msid': recording_msid
})

t0 = time()
t4 = time()
submitted_count = self.insert_to_listenstore(submit)
self.time += time() - t0
t5 = time()
self.time += t5 - t0

current_app.logger.info("dups: %d, unique: %d, submitted: %d" % (duplicate_count, unique_count, submitted_count))
# current_app.logger.info("dups: %d, unique: %d, submitted: %d" % (duplicate_count, unique_count, submitted_count))
current_app.logger.info("delta: %.3f read: %.3f fetch: %.3f save: %.3f (%d, %d, %d)" % (self.cb_delta, t1 - t0, t3 - t2, t5 - t4, duplicate_count, unique_count, submitted_count))
if not unique_count:
return True

Expand Down
5 changes: 2 additions & 3 deletions listenbrainz/spotify_updater/spotify_read_listens.py
Original file line number Diff line number Diff line change
Expand Up @@ -292,6 +292,8 @@ def process_one_user(user):
spotify.update_latest_listened_at(user.user_id, latest_listened_at)
spotify.update_last_updated(user.user_id)

current_app.logger.info('imported %d listens for %s' % (len(listens), str(user)))


def process_all_spotify_users():
""" Get a batch of users to be processed and import their Spotify plays.
Expand All @@ -315,7 +317,6 @@ def process_all_spotify_users():
failure = 0
for u in users:
t = time.time()
current_app.logger.info('Importing spotify listens for user %s', str(u))
try:
process_one_user(u)
success += 1
Expand All @@ -336,8 +337,6 @@ def process_all_spotify_users():
current_app.logger.critical('spotify_reader could not import listens: %s', str(e), exc_info=True)
failure += 1

current_app.logger.info('Took a total of %.2f seconds to process user %s', time.time() - t, str(u))

current_app.logger.info('Processed %d users successfully!', success)
current_app.logger.info('Encountered errors while processing %d users.', failure)
return success, failure
Expand Down

0 comments on commit 2d023fd

Please sign in to comment.