Skip to content

Commit

Permalink
Add created field to dumps (#2949)
Browse files Browse the repository at this point in the history
* Add created field to dumps

Useful in incremental generation of stats.

* fix spark tests
  • Loading branch information
amCap1712 authored Aug 27, 2024
1 parent 8183d15 commit 38287bb
Show file tree
Hide file tree
Showing 16 changed files with 11 additions and 3 deletions.
11 changes: 8 additions & 3 deletions listenbrainz/listenstore/dump_listenstore.py
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,7 @@

SPARK_LISTENS_SCHEMA = pa.schema([
pa.field("listened_at", pa.timestamp("ms"), False),
pa.field("created", pa.timestamp("ms"), False),
pa.field("user_id", pa.int64(), False),
pa.field("recording_msid", pa.string(), False),
pa.field("artist_name", pa.string(), False),
Expand Down Expand Up @@ -342,6 +343,7 @@ def write_parquet_files(self,
-- setting multiple columns at once.
WITH listen_with_mbid AS (
SELECT l.listened_at
, l.created
, l.user_id
, l.recording_msid
-- converting jsonb array to text array is non-trivial, so return a jsonb array not text
Expand All @@ -365,6 +367,7 @@ def write_parquet_files(self,
WHERE {criteria} > %(start)s
AND {criteria} <= %(end)s
) SELECT l.listened_at
, l.created
, l.user_id
, l.recording_msid::TEXT
, l_artist_credit_mbids
Expand All @@ -388,14 +391,15 @@ def write_parquet_files(self,
listen_count = 0
current_listened_at = None
conn = timescale.engine.raw_connection()
- with conn.cursor(cursor_factory=psycopg2.extras.RealDictCursor) as curs:
+ with (conn.cursor(cursor_factory=psycopg2.extras.RealDictCursor) as curs):
curs.execute(query, args)
while True:
t0 = time.monotonic()
written = 0
approx_size = 0
data = {
'listened_at': [],
'created': [],
'user_id': [],
'recording_msid': [],
'artist_name': [],
Expand Down Expand Up @@ -438,10 +442,11 @@ def write_parquet_files(self,

current_listened_at = result["listened_at"]
data["listened_at"].append(current_listened_at)
data["created"].append(result["created"])
data["user_id"].append(result["user_id"])
data["recording_msid"].append(result["recording_msid"])
- approx_size += len(str(result["listened_at"])) + len(str(result["user_id"])) \
- + len(result["recording_msid"])
+ approx_size += len(str(result["listened_at"])) + len(str(result["created"])) \
+ + len(str(result["user_id"])) + len(result["recording_msid"])

written += 1
listen_count += 1
Expand Down
1 change: 1 addition & 0 deletions listenbrainz_spark/hdfs/upload.py
Original file line number Diff line number Diff line change
Expand Up @@ -163,6 +163,7 @@ def process_full_listens_dump(self):
select extract(year from listened_at) as year
, extract(month from listened_at) as month
, listened_at
, created
, user_id
, recording_msid
, artist_name
Expand Down
1 change: 1 addition & 0 deletions listenbrainz_spark/schema.py
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@

listens_new_schema = StructType([
StructField('listened_at', TimestampType(), nullable=False),
StructField('created', TimestampType(), nullable=False),
StructField('user_id', IntegerType(), nullable=False),
StructField('recording_msid', StringType(), nullable=False),
StructField('artist_name', StringType(), nullable=False),
Expand Down
Binary file modified listenbrainz_spark/testdata/fresh_releases_listens.parquet
Binary file not shown.
Binary file modified listenbrainz_spark/testdata/full-dump/0.parquet
Binary file not shown.
Binary file modified listenbrainz_spark/testdata/full-dump/1.parquet
Binary file not shown.
Binary file modified listenbrainz_spark/testdata/full-dump/2.parquet
Binary file not shown.
Binary file modified listenbrainz_spark/testdata/full-dump/3.parquet
Binary file not shown.
Binary file modified listenbrainz_spark/testdata/full-dump/4.parquet
Binary file not shown.
Binary file modified listenbrainz_spark/testdata/full-dump/5.parquet
Binary file not shown.
Binary file modified listenbrainz_spark/testdata/full-dump/6.parquet
Binary file not shown.
Binary file modified listenbrainz_spark/testdata/incremental-dump-1/0.parquet
Binary file not shown.
Binary file modified listenbrainz_spark/testdata/incremental-dump-2/0.parquet
Binary file not shown.
Binary file modified listenbrainz_spark/testdata/mapped_listens.parquet
Binary file not shown.
Binary file modified listenbrainz_spark/testdata/rec_listens.parquet
Binary file not shown.
1 change: 1 addition & 0 deletions listenbrainz_spark/utils/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -184,6 +184,7 @@ def get_intermediate_stats_df(start: datetime, end: datetime):

query = dedent(f"""\
select listened_at
, created
, user_id
, recording_msid
, artist_name
Expand Down

0 comments on commit 38287bb

Please sign in to comment.