Skip to content

Commit

Permalink
LB-639: Send only top 1000 recordings for each user (#936)
Browse files Browse the repository at this point in the history
* Send only top 750 entities

* Add tests

* Improve logging on spark side

* Fix incorrect commit
  • Loading branch information
ishaanshah authored Jul 14, 2020
1 parent aa00c0c commit 7c744e3
Show file tree
Hide file tree
Showing 4 changed files with 53 additions and 4 deletions.
1 change: 1 addition & 0 deletions listenbrainz/webserver/views/stats_api.py
Original file line number Diff line number Diff line change
Expand Up @@ -282,6 +282,7 @@ def get_recording(user_name):
.. note::
- This endpoint is currently in beta
- We only calculate the top 1000 all_time recordings
- ``artist_mbids``, ``artist_msid``, ``release_name``, ``release_mbid``, ``release_msid``,
``recording_mbid`` and ``recording_msid`` are optional fields and may not be present in all the responses
Expand Down
20 changes: 16 additions & 4 deletions listenbrainz_spark/request_consumer/request_consumer.py
Original file line number Diff line number Diff line change
Expand Up @@ -56,21 +56,27 @@ def get_result(self, request):
try:
return query_handler(**params)
except TypeError as e:
current_app.logger.error("TypeError in the query handler for query '%s', maybe bad params. Error: %s", query, str(e), exc_info=True)
current_app.logger.error(
"TypeError in the query handler for query '%s', maybe bad params. Error: %s", query, str(e), exc_info=True)
return None
except Exception as e:
current_app.logger.error("Error in the query handler for query '%s': %s", query, str(e), exc_info=True)
return None

def push_to_result_queue(self, messages):
current_app.logger.debug("Pushing result to RabbitMQ...")
num_of_messages = 0
avg_size_of_message = 0
for message in messages:
num_of_messages += 1
body = json.dumps(message)
avg_size_of_message += len(body)
while message is not None:
try:
self.result_channel.basic_publish(
exchange=current_app.config['SPARK_RESULT_EXCHANGE'],
routing_key='',
body=json.dumps(message),
body=body,
properties=pika.BasicProperties(delivery_mode=2,),
)
break
Expand All @@ -80,7 +86,12 @@ def push_to_result_queue(self, messages):
self.rabbitmq.close()
self.connect_to_rabbitmq()
self.init_rabbitmq_channels()
current_app.logger.debug("Done!")

avg_size_of_message //= num_of_messages

current_app.logger.info("Done!")
current_app.logger.info("Number of messages sent: {}".format(num_of_messages))
current_app.logger.info("Average size of message: {} bytes".format(avg_size_of_message))

def callback(self, channel, method, properties, body):
request = json.loads(body.decode('utf-8'))
Expand Down Expand Up @@ -139,7 +150,8 @@ def run(self):
continue
self.rabbitmq.close()
except Py4JJavaError as e:
current_app.logger.critical("Critical: JAVA error in spark-request consumer: %s, message: %s", str(e), str(e.java_exception), exc_info=True)
current_app.logger.critical("Critical: JAVA error in spark-request consumer: %s, message: %s",
str(e), str(e.java_exception), exc_info=True)
time.sleep(2)
except Exception as e:
current_app.logger.critical("Error in spark-request-consumer: %s", str(e), exc_info=True)
Expand Down
5 changes: 5 additions & 0 deletions listenbrainz_spark/stats/user/entity.py
Original file line number Diff line number Diff line change
Expand Up @@ -130,6 +130,11 @@ def create_messages(data, entity: str, stats_range: str, from_ts: int, to_ts: in
"""
for entry in data:
_dict = entry.asDict(recursive=True)

# Clip the recordings to top 1000 so that we don't drop messages
if entity == "recordings" and stats_range == "all_time":
_dict[entity] = _dict[entity][:1000]

try:
model = UserEntityStatMessage(**{
'musicbrainz_id': _dict['user_name'],
Expand Down
31 changes: 31 additions & 0 deletions listenbrainz_spark/stats/user/tests/test_entity.py
Original file line number Diff line number Diff line change
Expand Up @@ -86,3 +86,34 @@ def test_get_entity_all_time(self, mock_create_messages, mock_get_listens, mock_
mock_df.createOrReplaceTempView.assert_called_with('user_test_all_time')
mock_create_messages.assert_called_with(data='user_test_all_time_data', entity='test', stats_range='all_time',
from_ts=from_date.timestamp(), to_ts=to_date.timestamp())

def test_create_messages_recordings(self):
    """ Check that create_messages caps all_time recording stats at the top 1000 entries """
    # Build a fixture of 2000 distinct recording dicts so the clipping is observable.
    recordings = [
        {
            'artist_name': 'artist_{}'.format(idx),
            'artist_msid': str(idx),
            'artist_mbids': [str(idx)],
            'release_name': 'release_{}'.format(idx),
            'release_msid': str(idx),
            'release_mbid': str(idx),
            'track_name': 'recording_{}'.format(idx),
            'recording_mbid': str(idx),
            'recording_msid': str(idx),
            'listen_count': idx,
        }
        for idx in range(0, 2000)
    ]

    # Stand in for a Spark Row: create_messages only calls .asDict() on it.
    mock_result = MagicMock()
    mock_result.asDict.return_value = {
        'user_name': "test",
        'recordings': recordings,
    }

    messages = entity_stats.create_messages([mock_result], 'recordings', 'all_time', 0, 10)

    received_list = next(messages)['data']

    # Only the first 1000 recordings (in original order) should survive.
    self.assertEqual(len(received_list), 1000)
    self.assertListEqual(recordings[:1000], received_list)

0 comments on commit 7c744e3

Please sign in to comment.