diff --git a/docker/docker-compose.integration.yml b/docker/docker-compose.integration.yml index 9a9d06ebb9..c7d57bf22a 100644 --- a/docker/docker-compose.integration.yml +++ b/docker/docker-compose.integration.yml @@ -14,15 +14,17 @@ services: image: redis:5.0.3 influx: - image: influxdb:1.2.4 + image: influxdb:1.7.9 environment: INFLUXDB_REPORTING_DISABLED: 'true' INFLUXDB_META_LOGGING_ENABLED: 'false' INFLUXDB_DATA_QUERY_LOG_ENABLED: 'false' INFLUXDB_DATA_WAL_LOGGING_ENABLED: 'false' INFLUXDB_DATA_DATA_LOGGING_ENABLED: 'false' - INFLUXDB_HTTP_LOG_ENABLED: 'true' + INFLUXDB_HTTP_LOG_ENABLED: 'false' INFLUXDB_CONTINUOUS_QUERIES_LOG_ENABLED: 'false' + INFLUXDB_DATA_INDEX_VERSION: 'tsi1' + INFLUXDB_DATA_MAX_INDEX_LOG_FILE_SIZE: '5m' listenbrainz: build: diff --git a/docker/docker-compose.test.yml b/docker/docker-compose.test.yml index 040f447eca..635ab3c94c 100644 --- a/docker/docker-compose.test.yml +++ b/docker/docker-compose.test.yml @@ -14,15 +14,17 @@ services: image: redis:5.0.3 influx: - image: influxdb:1.2.4 + image: influxdb:1.7.9 environment: INFLUXDB_REPORTING_DISABLED: 'true' INFLUXDB_META_LOGGING_ENABLED: 'false' INFLUXDB_DATA_QUERY_LOG_ENABLED: 'false' INFLUXDB_DATA_WAL_LOGGING_ENABLED: 'false' INFLUXDB_DATA_DATA_LOGGING_ENABLED: 'false' - INFLUXDB_HTTP_LOG_ENABLED: 'true' + INFLUXDB_HTTP_LOG_ENABLED: 'false' INFLUXDB_CONTINUOUS_QUERIES_LOG_ENABLED: 'false' + INFLUXDB_DATA_INDEX_VERSION: 'tsi1' + INFLUXDB_DATA_MAX_INDEX_LOG_FILE_SIZE: '5m' rabbitmq: image: rabbitmq:3.6.5 diff --git a/docker/docker-compose.yml b/docker/docker-compose.yml index 2e4e23fdbc..d6b32d6dd8 100644 --- a/docker/docker-compose.yml +++ b/docker/docker-compose.yml @@ -23,7 +23,7 @@ services: - redis:/data:z influx: - image: influxdb:1.2.4 + image: influxdb:1.7.9 volumes: - influxdb:/var/lib/influxdb:z environment: @@ -32,8 +32,10 @@ services: INFLUXDB_DATA_QUERY_LOG_ENABLED: 'false' INFLUXDB_DATA_WAL_LOGGING_ENABLED: 'false' INFLUXDB_DATA_DATA_LOGGING_ENABLED: 'false' - INFLUXDB_HTTP_LOG_ENABLED: 'true' + INFLUXDB_HTTP_LOG_ENABLED: 'false' INFLUXDB_CONTINUOUS_QUERIES_LOG_ENABLED: 'false' + INFLUXDB_DATA_INDEX_VERSION: 'tsi1' + INFLUXDB_DATA_MAX_INDEX_LOG_FILE_SIZE: '5m' rabbitmq: image: rabbitmq:3.6.5 diff --git a/listenbrainz/influx_writer/influx_writer.py b/listenbrainz/influx_writer/influx_writer.py index c9e9dd20ed..9934277b12 100755 --- a/listenbrainz/influx_writer/influx_writer.py +++ b/listenbrainz/influx_writer/influx_writer.py @@ -1,6 +1,5 @@ #!/usr/bin/env python3 -import json import sys import os import pika @@ -37,10 +36,18 @@ def __init__(self): self.unique_ch = None self.redis_listenstore = None + self.callback_t0 = 0.0 + self.cb_delta = 0.0 + def callback(self, ch, method, properties, body): + self.cb_delta = time() - self.callback_t0 + + t0 = time() listens = ujson.loads(body) + t1 = time() ret = self.write(listens) + t2 = time() if not ret: return ret @@ -54,7 +61,9 @@ def callback(self, ch, method, properties, body): count = len(listens) self._collect_and_log_stats(count, call_method=self.ls.update_listen_counts) + t3 = time() + self.callback_t0 = time() return ret @@ -100,7 +109,7 @@ def insert_to_listenstore(self, data, retries=5): except (InfluxDBServerError, InfluxDBClientError, ValueError, ConnectionError) as e: error_message = 'Unable to insert bad listen to listenstore: {error}, listen={json}' influx_dict = data[0].to_influx(get_measurement_name(data[0].user_name)) - current_app.logger.error(error_message.format(error=str(e), json=json.dumps(influx_dict, indent=3)), exc_info=True) + current_app.logger.error(error_message.format(error=str(e), json=ujson.dumps(influx_dict, indent=3)), exc_info=True) return 0 else: slice_index = len(data) // 2 @@ -127,7 +136,7 @@ def write(self, listen_dicts): # if the timestamp is illegal, don't use it for ranges if t.bit_length() > 32: - current_app.logger.error("timestamp %d is too large. listen: %s", t, json.dumps(listen, indent=3)) + current_app.logger.error("timestamp %d is too large. listen: %s", t, ujson.dumps(listen, indent=3)) continue if user_name not in users: @@ -163,7 +172,9 @@ def write(self, listen_dicts): while True: try: + t0 = time() results = self.influx.query(query) + t1 = time() break except Exception as e: current_app.logger.warn('Could not query influx, trying again: %s', str(e), exc_info=True) @@ -171,9 +182,11 @@ def write(self, listen_dicts): # collect all the timestamps for this given time range. + t2 = time() timestamps = defaultdict(list) # dict of list of listens indexed by timestamp for result in results.get_points(measurement=get_measurement_name(user_name)): timestamps[convert_to_unix_timestamp(result['time'])].append(result) + t3 = time() for listen in users[user_name]['listens']: # Check if a listen with the same timestamp and recording msid is already present in @@ -204,11 +217,13 @@ def write(self, listen_dicts): 'recording_msid': recording_msid }) - t0 = time() + t4 = time() submitted_count = self.insert_to_listenstore(submit) - self.time += time() - t0 + t5 = time() + self.time += t5 - t0 - current_app.logger.info("dups: %d, unique: %d, submitted: %d" % (duplicate_count, unique_count, submitted_count)) +# current_app.logger.info("dups: %d, unique: %d, submitted: %d" % (duplicate_count, unique_count, submitted_count)) + current_app.logger.info("delta: %.3f read: %.3f fetch: %.3f save: %.3f (%d, %d, %d)" % (self.cb_delta, t1 - t0, t3 - t2, t5 - t4, duplicate_count, unique_count, submitted_count)) if not unique_count: return True diff --git a/listenbrainz/spotify_updater/spotify_read_listens.py b/listenbrainz/spotify_updater/spotify_read_listens.py index 51886593b8..8bf1ba0a52 100644 --- a/listenbrainz/spotify_updater/spotify_read_listens.py +++ b/listenbrainz/spotify_updater/spotify_read_listens.py @@ -292,6 +292,8 @@ def process_one_user(user): spotify.update_latest_listened_at(user.user_id, latest_listened_at) spotify.update_last_updated(user.user_id) + current_app.logger.info('imported %d listens for %s' % (len(listens), str(user))) + def process_all_spotify_users(): """ Get a batch of users to be processed and import their Spotify plays. @@ -315,7 +317,6 @@ def process_all_spotify_users(): failure = 0 for u in users: t = time.time() - current_app.logger.info('Importing spotify listens for user %s', str(u)) try: process_one_user(u) success += 1 @@ -336,8 +337,6 @@ def process_all_spotify_users(): current_app.logger.critical('spotify_reader could not import listens: %s', str(e), exc_info=True) failure += 1 - current_app.logger.info('Took a total of %.2f seconds to process user %s', time.time() - t, str(u)) - current_app.logger.info('Processed %d users successfully!', success) current_app.logger.info('Encountered errors while processing %d users.', failure) return success, failure