From 8ab2573c1607dd626c3f20327ba7c50abc7532c2 Mon Sep 17 00:00:00 2001 From: loay Date: Tue, 26 Sep 2023 13:07:27 +0200 Subject: [PATCH] add discovery mix --- .github/workflows/publish.yml | 37 ++++ .github/workflows/tests.yml | 34 ++++ .gitignore | 3 + README.md | 19 +- pyproject.toml | 53 +++++ test/__init__.py | 0 test/discovery_mix/__init__.py | 0 .../test_daily_update_transformation.py | 43 ++++ .../test_observed_tracks_aggregator.py | 55 +++++ test/discovery_mix/test_post_processor.py | 141 +++++++++++++ test/pyspark_test.py | 28 +++ tidal_algorithmic_mixes/__init__.py | 0 .../discovery_mix/__init__.py | 1 + .../daily_update_transformation.py | 80 ++++++++ ...served_tracks_aggregator_transformation.py | 62 ++++++ .../post_processor_transformation.py | 191 ++++++++++++++++++ .../sasrec_model_transformation.py | 120 +++++++++++ tidal_algorithmic_mixes/etl_model.py | 53 +++++ tidal_algorithmic_mixes/utils/__init__.py | 0 tidal_algorithmic_mixes/utils/config.py | 45 +++++ tidal_algorithmic_mixes/utils/constants.py | 53 +++++ tidal_algorithmic_mixes/utils/mix_utils.py | 81 ++++++++ .../utils/transformers/__init__.py | 0 .../utils/transformers/artist/__init__.py | 0 .../artist_index_enricher_transformer.py | 23 +++ ...rtist_top_tracks_mix_output_transformer.py | 35 ++++ .../artist/enrich_with_artist_cluster.py | 22 ++ .../artist/enrich_with_similar_artist.py | 29 +++ ...ain_artist_compound_mapping_transformer.py | 38 ++++ .../utils/transformers/blacklist/__init__.py | 0 .../user_blacklist_filter_transformer.py | 102 ++++++++++ .../transformers/discovery_mix/__init__.py | 0 .../discovery_mix_output_transformer.py | 30 +++ .../discovery_mix_sort_transformer.py | 29 +++ .../flag_known_artists_transformer.py | 44 ++++ .../split_by_known_artists_transformer.py | 48 +++++ .../diversity_sort_transformer.py | 50 +++++ .../transformers/posexplode_transformer.py | 27 +++ .../utils/transformers/user/__init__.py | 0 .../user/enrich_user_transformer.py | 34 ++++ .../filter_streamed_albums_transformer.py | 27 +++ .../filter_streamed_tracks_transformer.py | 47 +++++ 42 files changed, 1682 insertions(+), 2 deletions(-) create mode 100644 .github/workflows/publish.yml create mode 100644 .github/workflows/tests.yml create mode 100644 pyproject.toml create mode 100644 test/__init__.py create mode 100644 test/discovery_mix/__init__.py create mode 100644 test/discovery_mix/test_daily_update_transformation.py create mode 100644 test/discovery_mix/test_observed_tracks_aggregator.py create mode 100644 test/discovery_mix/test_post_processor.py create mode 100644 test/pyspark_test.py create mode 100644 tidal_algorithmic_mixes/__init__.py create mode 100644 tidal_algorithmic_mixes/discovery_mix/__init__.py create mode 100644 tidal_algorithmic_mixes/discovery_mix/daily_update_transformation.py create mode 100644 tidal_algorithmic_mixes/discovery_mix/observed_tracks_aggregator_transformation.py create mode 100644 tidal_algorithmic_mixes/discovery_mix/post_processor_transformation.py create mode 100644 tidal_algorithmic_mixes/discovery_mix/sasrec_model_transformation.py create mode 100644 tidal_algorithmic_mixes/etl_model.py create mode 100644 tidal_algorithmic_mixes/utils/__init__.py create mode 100644 tidal_algorithmic_mixes/utils/config.py create mode 100644 tidal_algorithmic_mixes/utils/constants.py create mode 100644 tidal_algorithmic_mixes/utils/mix_utils.py create mode 100644 tidal_algorithmic_mixes/utils/transformers/__init__.py create mode 100644 tidal_algorithmic_mixes/utils/transformers/artist/__init__.py 
create mode 100644 tidal_algorithmic_mixes/utils/transformers/artist/artist_index_enricher_transformer.py create mode 100644 tidal_algorithmic_mixes/utils/transformers/artist/artist_top_tracks_mix_output_transformer.py create mode 100644 tidal_algorithmic_mixes/utils/transformers/artist/enrich_with_artist_cluster.py create mode 100644 tidal_algorithmic_mixes/utils/transformers/artist/enrich_with_similar_artist.py create mode 100644 tidal_algorithmic_mixes/utils/transformers/artist/main_artist_compound_mapping_transformer.py create mode 100644 tidal_algorithmic_mixes/utils/transformers/blacklist/__init__.py create mode 100644 tidal_algorithmic_mixes/utils/transformers/blacklist/user_blacklist_filter_transformer.py create mode 100644 tidal_algorithmic_mixes/utils/transformers/discovery_mix/__init__.py create mode 100644 tidal_algorithmic_mixes/utils/transformers/discovery_mix/discovery_mix_output_transformer.py create mode 100644 tidal_algorithmic_mixes/utils/transformers/discovery_mix/discovery_mix_sort_transformer.py create mode 100644 tidal_algorithmic_mixes/utils/transformers/discovery_mix/flag_known_artists_transformer.py create mode 100644 tidal_algorithmic_mixes/utils/transformers/discovery_mix/split_by_known_artists_transformer.py create mode 100644 tidal_algorithmic_mixes/utils/transformers/diversity_sort_transformer.py create mode 100644 tidal_algorithmic_mixes/utils/transformers/posexplode_transformer.py create mode 100644 tidal_algorithmic_mixes/utils/transformers/user/__init__.py create mode 100644 tidal_algorithmic_mixes/utils/transformers/user/enrich_user_transformer.py create mode 100644 tidal_algorithmic_mixes/utils/transformers/user/filter_streamed_albums_transformer.py create mode 100644 tidal_algorithmic_mixes/utils/transformers/user/filter_streamed_tracks_transformer.py diff --git a/.github/workflows/publish.yml b/.github/workflows/publish.yml new file mode 100644 index 0000000..21dd431 --- /dev/null +++ b/.github/workflows/publish.yml @@ -0,0 +1,37 @@ +name: Pypi publish + +on: + push: + branches: + - main + +jobs: + build-n-publish: + name: Build and publish Python distributions to PyPI + runs-on: ubuntu-latest + + steps: + - name: Set up Python + uses: actions/setup-python@v2 + with: + python-version: '3.8' + + - name: Check out code + uses: actions/checkout@v2 + + - name: Upgrade pip + run: python -m pip install --upgrade pip + + - name: Install poetry + run: pip install poetry==1.5.1 + + - name: Install dependencies + run: poetry install --no-root + +# - name: Build package +# run: python3 -m build +# +# - name: Publish distribution to PyPI +# uses: pypa/gh-action-pypi-publish@release/v1 +# with: +# password: ${{ secrets.PYPI_API_TOKEN }} diff --git a/.github/workflows/tests.yml b/.github/workflows/tests.yml new file mode 100644 index 0000000..ee11bea --- /dev/null +++ b/.github/workflows/tests.yml @@ -0,0 +1,34 @@ +name: Unit tests + +on: + push: + branches: + - main + pull_request: + branches: + - main + +jobs: + build: + runs-on: ubuntu-latest + + steps: + - name: Set up Python + uses: actions/setup-python@v2 + with: + python-version: '3.8' + + - name: Check out code + uses: actions/checkout@v2 + + - name: Upgrade pip + run: python -m pip install --upgrade pip + + - name: Install poetry + run: pip install poetry==1.5.1 + + - name: Install dependencies + run: poetry install --no-root + + - name: Run unit tests + run: PYTHONPATH=tidal_algorithmic_mixes:test poetry run pytest test/ diff --git a/.gitignore b/.gitignore index 68bc17f..f385d67 100644 --- a/.gitignore 
+++ b/.gitignore @@ -158,3 +158,6 @@ cython_debug/ # and can be added to the global gitignore or merged into this file. For a more nuclear # option (not recommended) you can uncomment the following to ignore the entire idea folder. #.idea/ +/.python-version +/poetry.lock +*.pyc \ No newline at end of file diff --git a/README.md b/README.md index 357cf0c..96ba2da 100644 --- a/README.md +++ b/README.md @@ -1,2 +1,17 @@ -# tidal-algorithmic-mixes -Tidal algorithmic mixes +# Tidal algorithmic mixes + +This repository contains the logic behind how TIDAL creates its algorithmic offline mixes: +how it combines different machine learning models +with business rules to build different mixes for different use cases, +including personalized mixes (such as My Mix, My New Arrivals and Daily Discovery) +and non-personalized mixes such as track radio and artist radio. + +- Make sure you have [pyenv](https://github.com/pyenv/pyenv) and [pyenv-virtualenv](https://github.com/pyenv/pyenv-virtualenv) installed on your local environment. +- Install Python 3.8.16 with pyenv: `pyenv install 3.8.16`. +- Set up a new virtual env: `pyenv virtualenv 3.8.16 mixes` +- Set the local pyenv version: `pyenv local mixes` +- Activate the virtualenv: `pyenv activate mixes` +- Upgrade the pip package installer: `pip install --upgrade pip` +- Install poetry for package management: `pip install poetry==1.5.1` +- Install dependencies from the lock file: `poetry install --no-root` + diff --git a/pyproject.toml b/pyproject.toml new file mode 100644 index 0000000..11b49fc --- /dev/null +++ b/pyproject.toml @@ -0,0 +1,53 @@ +[project] +requires-python = ">=3.8" +classifiers = [ + "Programming Language :: Python :: 3", + "License :: OSI Approved :: Apache Software License", + "Operating System :: OS Independent", +] + +[project.urls] +"GitHub" = "https://github.com/tidal-music/tidal-algorithmic-mixes" + +[tool.poetry] +name = "tidal_algorithmic_mixes" +version = "0.0.1" +description = "Common transformers used by the TIDAL personalization team."
+authors = [ + "Loay ", + "Jing ", + "Tao ", + "Thomas ", + "Yuhua yuhua@squareup.com" +] + +license = "Apache License V 2.0" +readme = "README.md" + +[tool.poetry.dependencies] +python = ">=3.8.0" +pyspark = "3.4.0" +numpy = ">=1.16.4" +s3fs = "2022.11.0" +boto3 = "1.24.59" +pandas = ">=1.4.2" +great-expectations = "0.16.15" +scikit-learn = "1.1.1" +alphabet-detector = "0.0.7" +pyarrow = "7.0.0" +tidal-per-transformers = "0.0.4" +torch = "1.9.1" +mlflow = "2.1.1" + +[tool.poetry.group.dev.dependencies] +pytest = "6.1.2" +coverage = ">=4.5.2" +pytest-cov = ">=2.6.1" +coveralls = ">=1.6.0" +mock = ">=2.0.0" +moto = ">=3.1.11" + + +[build-system] +requires = ["poetry-core"] +build-backend = "poetry.core.masonry.api" diff --git a/test/__init__.py b/test/__init__.py new file mode 100644 index 0000000..e69de29 diff --git a/test/discovery_mix/__init__.py b/test/discovery_mix/__init__.py new file mode 100644 index 0000000..e69de29 diff --git a/test/discovery_mix/test_daily_update_transformation.py b/test/discovery_mix/test_daily_update_transformation.py new file mode 100644 index 0000000..0221dce --- /dev/null +++ b/test/discovery_mix/test_daily_update_transformation.py @@ -0,0 +1,43 @@ +from datetime import date + +import tidal_algorithmic_mixes.utils.constants as c + +from test.pyspark_test import PySparkTest +from tidal_algorithmic_mixes.discovery_mix.daily_update_transformation import DiscoveryMixDailyUpdateTransformation, \ + DiscoveryMixDailyUpdateTransformationData + + +class DiscoveryMixDailyUpdateTransformationTestInterface(DiscoveryMixDailyUpdateTransformation): + def extract(self, *args, **kwargs): + ... + + def validate(self, *args, **kwargs): + ... + + def load(self, *args, **kwargs): + ... + + +class DiscoveryMixDailyUpdateTest(PySparkTest): + + def test_slicer(self): + mixes = self.spark.createDataFrame([ + (0, [10, 11, 12, 13, 14, 15, 16]), + (1, [10, 11, 12, 13, 14, 15, 16]), + (2, [10, 11, 12, 13, 14, 15, 16]), + (3, [10, 11, 12, 13, 14, 15, 16]) + ], [c.USER, c.TRACKS]) + + runner = DiscoveryMixDailyUpdateTransformationTestInterface(self.spark) + + runner._data = DiscoveryMixDailyUpdateTransformationData(mixes) + + self.assertEqual(runner.slicer(mixes, date(2021, 2, 15), 1).collect()[0][c.TRACKS][0], 10) + self.assertEqual(runner.slicer(mixes, date(2021, 2, 18), 1).collect()[0][c.TRACKS][0], 13) + self.assertEqual(runner.slicer(mixes, date(2021, 2, 21), 1).collect()[0][c.TRACKS][0], 16) + + def test_offset(self): + runner = DiscoveryMixDailyUpdateTransformationTestInterface(self.spark) + self.assertEqual(runner.offset(date(2021, 2, 15), 10), 0) + self.assertEqual(runner.offset(date(2021, 2, 18), 10), 30) + self.assertEqual(runner.offset(date(2021, 2, 21), 10), 60) diff --git a/test/discovery_mix/test_observed_tracks_aggregator.py b/test/discovery_mix/test_observed_tracks_aggregator.py new file mode 100644 index 0000000..d3c1929 --- /dev/null +++ b/test/discovery_mix/test_observed_tracks_aggregator.py @@ -0,0 +1,55 @@ +import tidal_algorithmic_mixes.utils.constants as c +from pyspark_test import PySparkTest +from tidal_algorithmic_mixes.discovery_mix.observed_tracks_aggregator_transformation import \ + ObservedDiscoveryMixTracksAggregatorTransformation, ObservedDiscoveryMixTracksAggregatorTransformationData + + +class ObservedDiscoveryMixTracksAggregatorTransformationTestInterface( + ObservedDiscoveryMixTracksAggregatorTransformation): + def extract(self, *args, **kwargs): + ... + + def validate(self, *args, **kwargs): + ... + + def load(self, *args, **kwargs): + ... 
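+ +# Note: the abstract ETL steps (extract/validate/load) are stubbed with `...` in the +# interface subclass above so that transform() can be unit-tested in isolation +# against in-memory DataFrames.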
+ + +class ObservedDiscoveryMixTracksAggregatorTest(PySparkTest): + + def setUp(self): + super().setUp() + + def test_transform(self): + user_1 = 26129743 + user_2 = 43727840 + + user_1_mix = "5b5b0f74b66cbecf46de5f00297" + user_2_mix = "c71e7c0b5f8daeaff1bdea48f9f" + + tracks_user_1 = [1, 2, 3, 4, 5, 6] + tracks_user_2 = [3, 4, 5, 6, 7, 9] + + mixes = self.spark.createDataFrame([ + (user_1_mix,), + (user_2_mix,), + ], [c.MIX_ID]) + + observed_mixes = self.spark.createDataFrame([ + (user_1_mix, user_1, tracks_user_1), + (user_2_mix, user_2, tracks_user_2), + ("xvxfewfwsdf34r3sf3jfaae4tgs", 1664, [11, 22, 33, 44]), + ("a71e7xffw4rzdzdf34zsz23ead3", 1984, [55, 66, 77, 11, 22]), + ], [c.MIX_ID, c.USER, c.TRACKS]) + + runner = ObservedDiscoveryMixTracksAggregatorTransformationTestInterface(self.spark) + runner._data = ObservedDiscoveryMixTracksAggregatorTransformationData(observed_mixes=observed_mixes, + mixes=mixes) + runner.transform() + res = runner.output.output + + self.assertEqual(res.columns, [c.USER, c.TRACK_GROUP]) + self.assertEqual(res.count(), len(tracks_user_1) + len(tracks_user_2)) + + self.assertEqual([user_1, user_2], ([x[c.USER] for x in res.select(c.USER).distinct().collect()])) diff --git a/test/discovery_mix/test_post_processor.py b/test/discovery_mix/test_post_processor.py new file mode 100644 index 0000000..74bca47 --- /dev/null +++ b/test/discovery_mix/test_post_processor.py @@ -0,0 +1,141 @@ +from datetime import datetime +from pyspark.sql.types import Row +from pyspark_test import PySparkTest +from tidal_algorithmic_mixes.discovery_mix.post_processor_transformation import\ + DiscoveryMixPostProcessorTransformation, DiscoveryMixPostProcessorTransformationData + + +class DiscoveryMixPostProcessorTransformationTestInterface(DiscoveryMixPostProcessorTransformation): + def extract(self, *args, **kwargs): + ... + + def validate(self, *args, **kwargs): + ... + + def load(self, *args, **kwargs): + ... 
+ + +class DiscoveryMixPostProcessorTest(PySparkTest): + + def setUp(self): + tracks_metadata = self.spark.createDataFrame([Row(id=1, + title='Chime again', + popularityWW=0, + trackNumber=16, + volumeNumber=1, + numAlbums=3, + explicit=False, + generatedFromVideo=False, + trackGroup='xxx', + audioQuality='LOSSLESS', + available=True, + version='x', + duration=192, + mixes={'x': 'y'}, + mainArtistsIds=[1], + mainArtistsNames=['Me'], + mainArtistId=1, + mainArtistPicture='xxx', + featuringArtistsIds=[''], + albumId=1, + masterBundleId='x', + albumTitle='Victorian', + albumCover='be7c307bc938', + releaseDate=datetime(2010, 6, 8, 0, 0), + albumReleaseDate=datetime(2010, 6, 8, 0, 0), + creditsArtistId=[1], + creditsName=['La La'], + creditsRole=['Main Artist'], + creditsRoleCategory=['HIDDEN'], + numTrackStreams=0, + numTrackStreamers=0, + voicenessScore=0, + voice=1, + genre='Christmas', + originalGenre='Christmas', + AvailableCountryCodes=['AD', 'AE'])]) + track_groups_metadata = self.spark.createDataFrame([Row(trackGroup='xxx', + AvailableCountryCodes=['AD', 'AE'])]) + + precomputed_recs = self.spark.createDataFrame([Row(user=1, + recommendations=['xxx', 'xyz'])]) + + user_history_tracks = self.spark.createDataFrame([Row(userId=1, + productId=2, + artistId=2, + trackGroup='xyz', + title="Don't Let The Sun Go Down On Me", + cleanedTitle="dd", + count=2, + source='UserTracksHistory', + dt=datetime(2020, 12, 21, 13, 3, 36, 534000))]) + user_history_artists = self.spark.createDataFrame([Row(userId=1, + artistId=3, + count=10, + source='UserArtistsHistory', + dt=datetime(2022, 5, 2, 20, 28, 23, 516000))]) + user_fav_tracks = self.spark.createDataFrame([Row(userId=1, + productId=5, + artistId=7, + trackGroup='aaa', + title='Breathing Underwater', + cleanedTitle='aa', + count=1, + source='UserTracksFavourite', + dt=datetime(2020, 10, 23, 6, 49, 33))]) + user_fav_artists = self.spark.createDataFrame([Row(userId=1, + artistId=111, + count=1, + source='UserArtistsFavourite', + dt=datetime(2019, 11, 21, 13, 31, 11))]) + + artist_clusters = self.spark.createDataFrame([Row(artistId=1, cluster=42)]) + + user_observed_tracks = self.spark.createDataFrame([Row(userId=1, + productId=5, + artistId=7, + trackGroup='aaa', + title='Breathing Underwater', + cleanedTitle='aa', + count=1, + source='UserTracksDiscoveryObserved', + dt=datetime(2020, 10, 23, 6, 49, 33))]) + + user_table = self.spark.createDataFrame([Row(id=1, countrycode='AD')]) + + user_blacklist_table = self.spark.createDataFrame([Row(artifactId='111', + artifactType='TRACK', + created=1568546619349, + userId='3')]) + + artist_compound_mapping_table = self.spark.createDataFrame([Row(id=4, + artistid=5, + artistcompoundid=6, + priority=1, + mainartist=False)]) + + self.data = DiscoveryMixPostProcessorTransformationData(tracks_metadata, + track_groups_metadata, + precomputed_recs, + user_history_tracks, + user_history_artists, + user_fav_tracks, + user_fav_artists, + artist_clusters, + user_observed_tracks, + user_table, + user_blacklist_table, + artist_compound_mapping_table + ) + + def test_transform(self): + post_processor = DiscoveryMixPostProcessorTransformationTestInterface(self.spark, + threshold_known_artists=1, + mix_size=1, + min_mix_size=0) + post_processor._data = self.data + post_processor.transform() + res = post_processor.output.output.collect()[0] + self.assertEqual(Row(user=1, tracks=['xxx'], mixId='1f1451b3b417516e9e4b4423958', atDate=res.atDate), + res) diff --git a/test/pyspark_test.py b/test/pyspark_test.py new file mode 100644 
index 0000000..6a83876 --- /dev/null +++ b/test/pyspark_test.py @@ -0,0 +1,28 @@ +import logging +import unittest + +from pyspark.sql import SparkSession + + +class PySparkTest(unittest.TestCase): + + @classmethod + def suppress_py4j_logging(cls): + logger = logging.getLogger('py4j') + logger.setLevel(logging.WARN) + + @classmethod + def create_testing_pyspark_session(cls): + return (SparkSession + .builder + # fix a bug UnsupportedOperationException + .config("spark.executor.extraJavaOptions", "-Dio.netty.tryReflectionSetAccessible=true") + .config("spark.driver.extraJavaOptions", "-Dio.netty.tryReflectionSetAccessible=true") + .appName(__name__) + .enableHiveSupport() + .getOrCreate()) + + @classmethod + def setUpClass(cls): + cls.suppress_py4j_logging() + cls.spark = cls.create_testing_pyspark_session() diff --git a/tidal_algorithmic_mixes/__init__.py b/tidal_algorithmic_mixes/__init__.py new file mode 100644 index 0000000..e69de29 diff --git a/tidal_algorithmic_mixes/discovery_mix/__init__.py b/tidal_algorithmic_mixes/discovery_mix/__init__.py new file mode 100644 index 0000000..8b13789 --- /dev/null +++ b/tidal_algorithmic_mixes/discovery_mix/__init__.py @@ -0,0 +1 @@ + diff --git a/tidal_algorithmic_mixes/discovery_mix/daily_update_transformation.py b/tidal_algorithmic_mixes/discovery_mix/daily_update_transformation.py new file mode 100644 index 0000000..2067e57 --- /dev/null +++ b/tidal_algorithmic_mixes/discovery_mix/daily_update_transformation.py @@ -0,0 +1,80 @@ +import abc +import time +import pyspark.sql.functions as F + +from dataclasses import dataclass +from datetime import date +from pyspark.sql import SparkSession, DataFrame +from tidal_algorithmic_mixes.etl_model import ETLModel +from tidal_algorithmic_mixes.utils import mix_utils +from tidal_algorithmic_mixes.utils.config import Config +import tidal_algorithmic_mixes.utils.constants as c + + +@dataclass +class DiscoveryMixDailyUpdateTransformationData: + mixes: DataFrame + + +@dataclass +class DiscoveryMixDailyUpdateTransformationOutput: + output: DataFrame + + +class DiscoveryMixDailyUpdateTransformationConfig(Config): + def __init__(self, **kwargs): + self.current_date = kwargs.get('current_date') + self.mix_size = int(kwargs.get('mix_size', 70)) + Config.__init__(self, **kwargs) + + +class DiscoveryMixDailyUpdateTransformation(ETLModel): + """ + The main discovery mix pipeline is responsible for outputting a large number of recommendations for each user + on a weekly basis. This job simply takes a subset of these recommendations each day making sure each user + gets 10 new recommendations each day. E.g. on Monday we use recommendations from 0-9, Tuesday 10-19, etc. 
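+ +    Note that the daily slice size is controlled by the ``mix_size`` config (default 70) +    rather than being fixed at 10; the 0-9/10-19 example above assumes a mix size of 10. +    With the default, the weekly list is consumed as Monday -> tracks[0:70], +    Tuesday -> tracks[70:140], ..., Sunday -> tracks[420:490].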
+ """ + + # noinspection PyTypeChecker + def __init__(self, spark: SparkSession, **kwargs): + self.spark = spark + self._data: DiscoveryMixDailyUpdateTransformationData = None + self._output: DiscoveryMixDailyUpdateTransformationOutput = None + self.config = DiscoveryMixDailyUpdateTransformationConfig(**kwargs) + + @abc.abstractmethod + def extract(self, *args, **kwargs): + pass + + @abc.abstractmethod + def validate(self, *args, **kwargs): + pass + + @abc.abstractmethod + def load(self, *args, **kwargs): + pass + + def transform(self, *args, **kwargs): + discovery_mix = (self.slicer(self.data.mixes, + self.config.current_date, + self.config.mix_size) + .withColumn(c.UPDATED, F.lit(mix_utils.updated(time.time()))) + .where(F.size(c.TRACKS) >= self.config.mix_size - 2)) + self._output = DiscoveryMixDailyUpdateTransformationOutput(output=discovery_mix) + + def slicer(self, mixes: DataFrame, current_date: date, mix_size: int) -> DataFrame: + """ Extract the tracks of the day from the weekly computed list """ + offset = self.offset(current_date, mix_size) + 1 # slice starts from 1 + return mixes.withColumn(c.TRACKS, F.slice(c.TRACKS, offset, mix_size)) + + @staticmethod + def offset(current_date, mix_size): + return current_date.weekday() * mix_size + + @property + def data(self) -> DiscoveryMixDailyUpdateTransformationData: + return self._data + + @property + def output(self) -> DiscoveryMixDailyUpdateTransformationOutput: + return self._output diff --git a/tidal_algorithmic_mixes/discovery_mix/observed_tracks_aggregator_transformation.py b/tidal_algorithmic_mixes/discovery_mix/observed_tracks_aggregator_transformation.py new file mode 100644 index 0000000..4a5835c --- /dev/null +++ b/tidal_algorithmic_mixes/discovery_mix/observed_tracks_aggregator_transformation.py @@ -0,0 +1,62 @@ + +import abc +from dataclasses import dataclass + +import pyspark.sql.functions as F +from pyspark.sql import DataFrame +from pyspark.sql.session import SparkSession + +import tidal_algorithmic_mixes.utils.constants as c +from tidal_algorithmic_mixes.etl_model import ETLModel + + +@dataclass +class ObservedDiscoveryMixTracksAggregatorTransformationData: + observed_mixes: DataFrame + mixes: DataFrame + + +@dataclass +class ObservedDiscoveryMixTracksAggregatorTransformationOutput: + output: DataFrame + + +class ObservedDiscoveryMixTracksAggregatorTransformation(ETLModel): + """ + Daily storing of tracks that a user has observed after opening a discovery mix + """ + + # noinspection PyUnusedLocal,PyTypeChecker + def __init__(self, spark: SparkSession, **kwargs): + self.spark = spark + self._data: ObservedDiscoveryMixTracksAggregatorTransformationData = None + self._output: ObservedDiscoveryMixTracksAggregatorTransformationOutput = None + + @abc.abstractmethod + def extract(self, *args, **kwargs): + pass + + @abc.abstractmethod + def validate(self, *args, **kwargs): + pass + + @abc.abstractmethod + def load(self, *args, **kwargs): + pass + + def transform(self): + """ + Fetch tracks for the mixes that have been observed + """ + self._output = ObservedDiscoveryMixTracksAggregatorTransformationOutput( + self.data.mixes + .join(self.data.observed_mixes, c.MIX_ID) + .select(c.USER, F.explode(c.TRACKS).alias(c.TRACK_GROUP))) + + @property + def data(self) -> ObservedDiscoveryMixTracksAggregatorTransformationData: + return self._data + + @property + def output(self) -> ObservedDiscoveryMixTracksAggregatorTransformationOutput: + return self._output diff --git 
a/tidal_algorithmic_mixes/discovery_mix/post_processor_transformation.py b/tidal_algorithmic_mixes/discovery_mix/post_processor_transformation.py new file mode 100644 index 0000000..024a81f --- /dev/null +++ b/tidal_algorithmic_mixes/discovery_mix/post_processor_transformation.py @@ -0,0 +1,191 @@ +import abc +from datetime import datetime + +import pyspark.sql.functions as F +from pyspark.ml import Pipeline +from tidal_per_transformers.transformers import WithColumnRenamedTransformer, JoinTransformer, CleanTextTransformer, \ + TrackGroupAvailabilityByCountryTransformer, TopItemsTransformer, AggregateTransformer, SelectTransformer + +import tidal_algorithmic_mixes.utils.constants as c + +from dataclasses import dataclass +from pyspark.sql import SparkSession, DataFrame +from tidal_algorithmic_mixes.etl_model import ETLModel +from tidal_algorithmic_mixes.utils.config import Config +from tidal_algorithmic_mixes.utils.transformers.artist.enrich_with_artist_cluster import EnrichWithArtistCluster +from tidal_algorithmic_mixes.utils.transformers.blacklist.user_blacklist_filter_transformer import \ + UserBlacklistFilterTransformer +from tidal_algorithmic_mixes.utils.transformers.discovery_mix.discovery_mix_output_transformer import \ + DiscoveryMixOutputTransformer +from tidal_algorithmic_mixes.utils.transformers.discovery_mix.discovery_mix_sort_transformer import \ + DiscoveryMixSortTransformer +from tidal_algorithmic_mixes.utils.transformers.discovery_mix.flag_known_artists_transformer import \ + FlagKnownArtistsTransformer +from tidal_algorithmic_mixes.utils.transformers.discovery_mix.split_by_known_artists_transformer import \ + SplitByKnownArtistsTransformer +from tidal_algorithmic_mixes.utils.transformers.diversity_sort_transformer import DiversitySortTransformer +from tidal_algorithmic_mixes.utils.transformers.posexplode_transformer import PosExplodeTransformer +from tidal_algorithmic_mixes.utils.transformers.user.filter_streamed_tracks_transformer import \ + FilterStreamedTracksTransformer + + +@dataclass +class DiscoveryMixPostProcessorTransformationData: + tracks_metadata: DataFrame + track_groups_metadata: DataFrame + precomputed_recs: DataFrame + user_history_tracks: DataFrame + user_history_artists: DataFrame + user_fav_tracks: DataFrame + user_fav_artists: DataFrame + artist_clusters: DataFrame + user_observed_tracks: DataFrame + user_table: DataFrame + user_blacklist_table: DataFrame + artist_compound_mapping_table: DataFrame + + +@dataclass +class DiscoveryMixPostProcessorTransformationOutput: + output: DataFrame + + +class DiscoveryMixPostProcessorTransformationConfig(Config): + + def __init__(self, **kwargs): + self.mix_size = int(kwargs.get('mix_size', 70)) + self.min_mix_size = int(kwargs.get('min_mix_size', 30)) + self.threshold_known_artists = float(kwargs.get('threshold_known_artists', 0.2)) + self.max_artist_items = int(kwargs.get('max_artist_items', 1)) + self.known_artist_streams = int(kwargs.get('known_artist_streams', 3)) + self.known_artist_recency = int(kwargs.get('known_artist_recency', 6)) + self.known_track_streams = int(kwargs.get('known_track_streams', 2)) + self.known_track_recency = int(kwargs.get('known_track_recency', 12)) + self.now = datetime.utcnow().date().isoformat() + + Config.__init__(self, **kwargs) + + +class DiscoveryMixPostProcessorTransformation(ETLModel): + + # noinspection PyTypeChecker + def __init__(self, spark: SparkSession, **kwargs): + self.spark = spark + self.conf = DiscoveryMixPostProcessorTransformationConfig(**kwargs) + self._data: 
DiscoveryMixPostProcessorTransformationData = None + self._output: DiscoveryMixPostProcessorTransformationOutput = None + + @abc.abstractmethod + def extract(self, *args, **kwargs): + pass + + @abc.abstractmethod + def validate(self, *args, **kwargs): + pass + + @abc.abstractmethod + def load(self, *args, **kwargs): + pass + + def transform(self): + user_country = DiscoveryMixPostProcessorTransformation.get_user_country(self.data.user_table) + track_group_metadata = DiscoveryMixPostProcessorTransformation.get_track_group_metadata( + self.data.tracks_metadata) + track_group_available_countries = DiscoveryMixPostProcessorTransformation.get_track_group_available_countries( + self.data.track_groups_metadata) + + pipeline = Pipeline(stages=[ + WithColumnRenamedTransformer(c.USER, c.USER_ID), + PosExplodeTransformer(explode_col=c.RECOMMENDATIONS, alias=c.TRACK_GROUP), + JoinTransformer(user_country, on=c.USER_ID), + JoinTransformer(track_group_metadata, on=c.TRACK_GROUP), + CleanTextTransformer(output_col=c.CLEANED_TITLE), + UserBlacklistFilterTransformer(self.data.user_blacklist_table, + self.data.artist_compound_mapping_table, + filter_track=True, + filter_artist=True, + user_col=c.USER_ID), + TrackGroupAvailabilityByCountryTransformer(track_group_available_countries), + FilterStreamedTracksTransformer(self.data.user_observed_tracks, + playback_threshold=0, + recency_threshold=0, + last_stream_date_column=c.DT), + FilterStreamedTracksTransformer(self.data.user_history_tracks, + playback_threshold=self.conf.known_track_streams, + recency_threshold=self.conf.known_track_recency, + last_stream_date_column=c.DT), + FilterStreamedTracksTransformer(self.data.user_history_tracks, + playback_threshold=self.conf.known_track_streams, + recency_threshold=self.conf.known_track_recency, + join_columns=[c.USER_ID, c.CLEANED_TITLE], + last_stream_date_column=c.DT), + FilterStreamedTracksTransformer(self.data.user_fav_tracks, + playback_threshold=0, + recency_threshold=0, + last_stream_date_column=c.DT), + FlagKnownArtistsTransformer(self.data.user_history_artists, + self.conf.known_artist_streams, + self.conf.known_artist_recency, + self.data.user_fav_artists, + last_stream_date_column=c.DT), + TopItemsTransformer([c.USER_ID, c.ARTIST_ID], + F.col(c.POS), + self.conf.max_artist_items), + EnrichWithArtistCluster(self.data.artist_clusters), + SplitByKnownArtistsTransformer(self.conf.mix_size, + self.conf.threshold_known_artists), + DiversitySortTransformer([c.USER_ID], + [c.CLUSTER], + [c.ARTIST_ID], + c.POS, + gap=int(self.conf.mix_size / 7)), + # spread out the clusters + DiscoveryMixSortTransformer(self.conf.mix_size, sort_column=c.RANK), + DiscoveryMixOutputTransformer(self.conf.now, sort_column=c.RANK, min_mix_size=self.conf.min_mix_size) + ]) + output = pipeline.fit(self.data.precomputed_recs).transform(self.data.precomputed_recs).persist() + self._output = DiscoveryMixPostProcessorTransformationOutput(output) + + @staticmethod + def get_user_country(user_table: DataFrame): + """Returns user id and user country + + :param user_table: user table df + :return: user id and user country DF + """ + user_country_pipeline = Pipeline(stages=[ + SelectTransformer([c.ID, c.COUNTRY_CODE]), + WithColumnRenamedTransformer(c.ID, c.USER_ID)]) + return user_country_pipeline.fit(user_table).transform(user_table) + + @staticmethod + def get_track_group_metadata(tracks_metadata: DataFrame): + """Returns track group and all main artists + + :param tracks_metadata: tracks metadata fs table + :return: track group and all main 
artists + """ + track_group_artist_pipeline = Pipeline(stages=[ + AggregateTransformer(c.TRACK_GROUP, + [F.first(c.MAIN_ARTISTS_IDS).getItem(0).alias(c.ARTIST_ID), + F.first(c.TITLE).alias(c.TITLE)]), + ]) + return track_group_artist_pipeline.fit(tracks_metadata).transform(tracks_metadata) + + @staticmethod + def get_track_group_available_countries(track_group_metadata: DataFrame): + """Gets all available countries for a track group + + :param track_group_metadata: tracks group metadata fs table + :return: track group available countries + """ + select = SelectTransformer([c.TRACK_GROUP, c.AVAILABLE_COUNTRY_CODES]) + return select.transform(track_group_metadata) + + @property + def data(self) -> DiscoveryMixPostProcessorTransformationData: + return self._data + + @property + def output(self) -> DiscoveryMixPostProcessorTransformationOutput: + return self._output diff --git a/tidal_algorithmic_mixes/discovery_mix/sasrec_model_transformation.py b/tidal_algorithmic_mixes/discovery_mix/sasrec_model_transformation.py new file mode 100644 index 0000000..0f78374 --- /dev/null +++ b/tidal_algorithmic_mixes/discovery_mix/sasrec_model_transformation.py @@ -0,0 +1,120 @@ +import abc +from dataclasses import dataclass + +from mlflow.pyfunc.spark_model_cache import SparkModelCache +from mlflow.tracking.artifact_utils import _download_artifact_from_uri +from mlflow.utils.file_utils import TempDir +import numpy as np +import pandas +import pyspark.sql.functions as F +import torch +from pyspark.sql.pandas.functions import pandas_udf +from pyspark.sql.types import ArrayType, StringType +from pyspark.sql import SparkSession, DataFrame + +import tidal_algorithmic_mixes.utils.constants as c +from tidal_algorithmic_mixes.etl_model import ETLModel +from tidal_algorithmic_mixes.utils.config import Config +from tidal_algorithmic_mixes.utils.mix_utils import last_n_items + + +@dataclass +class DiscoveryMixSasRecModelTransformationData: + inference: DataFrame + + +@dataclass +class DiscoveryMixSasRecModelTransformationOutput: + output: DataFrame + + +class DiscoveryMixSasRecModelTransformationConfig(Config): + def __init__(self, **kwargs): + self.max_seq_len = int(kwargs.get("max_seq_len", 500)) + self.batch_size = int(kwargs.get("batch_size", 128)) + self.n_recs = int(kwargs.get("n_recs", 6000)) + self.n_partitions = int(kwargs.get("n_partitions", 60)) + self.inference_device = kwargs.get("inference_device", "cuda:0") # cpu or cuda:0 + self.model_path = kwargs.get("model_path") + Config.__init__(self, **kwargs) + + +class DiscoveryMixSasRecModelTransformation(ETLModel): + + # noinspection PyTypeChecker + def __init__(self, spark: SparkSession, **kwargs): + self.spark = spark + self.conf = DiscoveryMixSasRecModelTransformationConfig(**kwargs) + self._data: DiscoveryMixSasRecModelTransformationData = None + self._output: DiscoveryMixSasRecModelTransformationOutput = None + self.spark.conf.set("spark.sql.execution.arrow.maxRecordsPerBatch", self.conf.batch_size) + + @abc.abstractmethod + def extract(self, *args, **kwargs): + pass + + @abc.abstractmethod + def validate(self, *args, **kwargs): + pass + + @abc.abstractmethod + def load(self, *args, **kwargs): + pass + + def transform(self): + + user_items = (self.data.inference + .withColumn(c.ITEMS, last_n_items(c.ITEMS, F.lit(self.conf.max_seq_len))) + .repartition(self.conf.n_partitions)) + + predictions = self.predict(user_items, + self.conf.n_recs, + f'runs:/{self.conf.run_id}/script_model', + self.conf.inference_device, + self.conf.model_path) + + self._output = 
DiscoveryMixSasRecModelTransformationOutput(predictions) + + def predict(self, user_items, n_recs, model_uri, device, path): + spark_udf = _get_spark_pandas_udf(self.spark, n_recs, model_uri, device, path) + return user_items.select(F.col(c.USER), spark_udf(c.ITEMS).alias(c.RECOMMENDATIONS)) + + @property + def data(self) -> DiscoveryMixSasRecModelTransformationData: + return self._data + + @property + def output(self) -> DiscoveryMixSasRecModelTransformationOutput: + return self._output + + +def _get_spark_pandas_udf(spark, n_recs, model_uri, device, path): + with TempDir() as local_tmpdir: + local_model_path = _download_artifact_from_uri( + artifact_uri=model_uri, output_path=local_tmpdir.path() + ) + archive_path = SparkModelCache.add_local_model(spark, local_model_path) + + vocabulary = spark.read.parquet(f"{path}/vocabulary") + + spark.conf.set("spark.sql.execution.arrow.pyspark.enabled", "true") # performance improvement + + ix2item = vocabulary.select(c.ITEM, c.INDEX).toPandas() + ix2item_bc = spark.sparkContext.broadcast(dict(zip(ix2item[c.INDEX], ix2item[c.ITEM]))) + + def predict(items_batch): + model, _ = SparkModelCache.get_or_load(archive_path) + max_len = max( + map(lambda x: len(x), items_batch)) # get the longest sequence length in this batch, used for padding + items_batch = list(map(lambda x: x.tolist()[-max_len:] + [0] * (max_len - len(x)), items_batch)) + items_batch = torch.tensor(items_batch) + results = model.predict({c.SEQ_MODEL_ITEMS_SEQ: items_batch, + c.SEQ_MODEL_TOP_K: n_recs, + c.DEVICE: device}) + res = [] + for items in results: + res.append(np.vectorize(ix2item_bc.value.get)(np.array(items))) + return pandas.Series(res) + + # noinspection PyTypeChecker + return pandas_udf(predict, ArrayType(StringType())) diff --git a/tidal_algorithmic_mixes/etl_model.py b/tidal_algorithmic_mixes/etl_model.py new file mode 100644 index 0000000..3e8667c --- /dev/null +++ b/tidal_algorithmic_mixes/etl_model.py @@ -0,0 +1,53 @@ +import abc +import mlflow +import tidal_algorithmic_mixes.utils.constants as c + +from typing import Dict + + +class ETLModel(abc.ABC): + + @abc.abstractmethod + def extract(self, *args, **kwargs): + """ Read the data from the source tables. + """ + + @abc.abstractmethod + def transform(self, *args, **kwargs): + """ Apply the transformations to the dataset. Use a pipeline if possible. 
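+ +        Illustrative sketch (transformer and output names below are hypothetical, +        not part of this codebase): + +            pipeline = Pipeline(stages=[SomeFilterTransformer(), SomeSortTransformer()]) +            self._output = SomeTransformationOutput(pipeline.fit(df).transform(df))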
+ """ + + @abc.abstractmethod + def load(self, *args, **kwargs): + """ Load the data to the end target + """ + + @abc.abstractmethod + def validate(self, *args, **kwargs): + """ Validate data, generally using great expectations + """ + + @staticmethod + def evaluate(metrics: Dict = None, params: Dict = None, experiment_path: str = ''): + """Log metrics and parameters to mlflow + + + :param metrics: ETL pipeline metrics to be logged + :param params: ETL pipeline parameters to be logged + :param experiment_path: MLFlow Experiment path to log metrics + """ + mlflow.set_tracking_uri(c.DATABRICKS_MLFLOW_URI) + mlflow.set_experiment(experiment_path) + mlflow.start_run() + mlflow.log_metrics(metrics) if metrics else None + mlflow.log_params(params) if params else None + mlflow.end_run() + + def run(self): + """Runs the ETL job + + """ + self.extract() + self.transform() + self.validate() + self.load() diff --git a/tidal_algorithmic_mixes/utils/__init__.py b/tidal_algorithmic_mixes/utils/__init__.py new file mode 100644 index 0000000..e69de29 diff --git a/tidal_algorithmic_mixes/utils/config.py b/tidal_algorithmic_mixes/utils/config.py new file mode 100644 index 0000000..8a165e9 --- /dev/null +++ b/tidal_algorithmic_mixes/utils/config.py @@ -0,0 +1,45 @@ +import json +from datetime import datetime + + +class Config: + def __init__(self, **kwargs): + self.run_id = kwargs.get('run_id', None) + self.git_hash = kwargs.get('git_hash', None) + self.n_partitions = int(kwargs.get('n_partitions', 800)) + self.validate_kwargs(**kwargs) + + def validate_kwargs(self, **kwargs): + """ Check that no unknown arguments are passed to the job. """ + for k in kwargs: + if not hasattr(self, k): + raise AttributeError('arg %s is not a member of %s' % (k, self.__class__.__name__)) + + def as_dict(self, git_hash_prefix=None): + """ Convert the config object to a dictionary with all members. """ + d = {} + + for k, v in self.__dict__.items(): + if isinstance(v, dict): + for h, val in v.items(): + d[f"{k}_{h}"] = val + elif isinstance(v, (bool, list)): + d[k] = str(v) + elif isinstance(v, (str, float, int)): + d[k] = v + elif isinstance(v, type(None)): + d[k] = "None" + + if git_hash_prefix: + ts = datetime.utcnow().replace(microsecond=0).isoformat() + d[f"{git_hash_prefix}_git_hash_{ts}"] = d.pop('git_hash') + + return d + + @staticmethod + def parse_boolean(kwarg): + """ Parse a boolean kwarg. 
""" + if isinstance(kwarg, bool): + return kwarg + + return json.loads(kwarg.lower()) diff --git a/tidal_algorithmic_mixes/utils/constants.py b/tidal_algorithmic_mixes/utils/constants.py new file mode 100644 index 0000000..78e17d8 --- /dev/null +++ b/tidal_algorithmic_mixes/utils/constants.py @@ -0,0 +1,53 @@ +DATABRICKS_MLFLOW_URI = "databricks" +DISCOVERY_MIX = "discovery_mix" +DISCOVERY_MIX_SEQ = "discovery_mix_sequence" +UPDATED = "updated" +MIX_ID = "mixId" +MIX_ID_LENGTH = 27 +TRACK = "track" +TRACKS = "tracks" +NAME = "name" +COVER = "cover" +ID = "id" +IMAGE = "image" +DISCOVERY_MIX_INTERNAL_ID_PREFIX = "016" +TRACK_GROUP = "trackGroup" +USER = "user" +ARTIFACT_TYPE = "artifactType" +COMPOUND_ID = "compoundId" +ARTIST_ID = "artistId" +ARTIST_COMPOUND_ID = "artistCompoundId" +MAIN_ARTIST = "mainArtist" +MAIN_ARTISTS = "mainArtists" +MAIN_ARTIST_ID = "mainArtistId" +MAIN_ARTISTS_IDS = "mainArtistsIds" +MAIN_ARTISTS_NAMES = "mainArtistsNames" +MAIN_ARTIST_PICTURE = "mainArtistPicture" +USER_ID = "userId" +VIDEO_ID = "videoId" +AT_DATE = "atDate" +POS = "pos" +KNOWN_ARTIST = "knownArtist" +LAST_STREAMED_DATE = "lastStreamedDate" +COUNT = 'count' +DT = 'dt' +CLEANED_TITLE = "cleanedTitle" +RECOMMENDATIONS = "recommendations" +CLUSTER = "cluster" +RANK = "rank" +COUNTRY_CODE = "countryCode" +TITLE = "title" +AVAILABLE_COUNTRY_CODES = "AvailableCountryCodes" +MASTER_BUNDLE_ID = "masterBundleId" +PICTURE = "picture" +NEIGHBOURS = "neighbours" +ENRICHED_POSITION = "enriched_position" +ENRICHED = "enriched" +RESOLVED_ARTIST_ID = "resolvedArtistId" +PRIORITY = "priority" +ITEMS = "items" +ITEM = "item" +INDEX = "index" +SEQ_MODEL_ITEMS_SEQ = "items_seq" +SEQ_MODEL_TOP_K = "top_k" +DEVICE = "device" diff --git a/tidal_algorithmic_mixes/utils/mix_utils.py b/tidal_algorithmic_mixes/utils/mix_utils.py new file mode 100644 index 0000000..0c7883b --- /dev/null +++ b/tidal_algorithmic_mixes/utils/mix_utils.py @@ -0,0 +1,81 @@ +from typing import List, Any + +import pyspark.sql.functions as F +import pyspark.sql.types as T +import tidal_algorithmic_mixes.utils.constants as c + + +def at_date(date): + """ + Standardized function for generating the atDate columns in DynamoDB mix tables. + + :type date: datetime.datetime + :rtype: str + """ + return date.isoformat(timespec="seconds") + + +def updated(time): + """ + Standardized function for generating the updated columns in the DynamoDB mix tables. + + :type time: time.time + :rtype: int + """ + return int(round(time * 1000)) + + +def mix_id(col1, col2): + """ + Standardized function for generating the mixId column in the DynamoDB mix tables. + + :param col1: The first combination key (e.g. a timestamp) + :type col1: pyspark.sql.Column + :param col2: The column containing the userId + :type col2: pyspark.sql.Column + :rtype: pyspark.sql.Column + """ + return F.substring(F.md5(F.concat(col1, col2)), 0, c.MIX_ID_LENGTH) + + +def mix_id_stable(col1): + """ + Standardized function for generating the mixId column in the DynamoDB mix tables when IDs should be stable. + + :param col1: The key used to generate the ID (e.g. 
artistId or contributorMixType) + :rtype: pyspark.sql.Column + """ + return F.substring(F.md5(F.col(col1)), 0, c.MIX_ID_LENGTH) + + +@F.udf(T.ArrayType(T.StructType([ + T.StructField(c.ID, T.IntegerType(), False), + T.StructField(c.NAME, T.StringType(), False), + T.StructField(c.IMAGE, T.StringType(), False)]))) +def pick_top_artists_udf(artists, num_artists): + distinct, seen = [], {} + for a in artists: + if a[c.ID] not in seen and a[c.IMAGE]: + seen.update({a[c.ID]: True}) + distinct.append(a) + return distinct[:num_artists] + + +@F.udf(T.ArrayType(T.StructType([ + T.StructField(c.ID, T.IntegerType(), False), + T.StructField(c.NAME, T.StringType(), False), + T.StructField(c.IMAGE, T.StringType(), False), + T.StructField(c.COVER, T.StringType(), False)]))) +def pick_top_artists_with_cover(artists, num_artists): + distinct, seen = [], {} + for a in artists: + if a[c.ID] not in seen and a[c.IMAGE] and a[c.COVER]: + seen.update({a[c.ID]: True}) + distinct.append(a) + return distinct[:num_artists] + + +@F.udf(returnType=T.ArrayType(T.IntegerType())) +def last_n_items(items: List[Any], n): + """ F.slice does not work for large negative numbers (e.g. last 100 items) and returns empty lists!""" + return items[-n:] diff --git a/tidal_algorithmic_mixes/utils/transformers/__init__.py b/tidal_algorithmic_mixes/utils/transformers/__init__.py new file mode 100644 index 0000000..e69de29 diff --git a/tidal_algorithmic_mixes/utils/transformers/artist/__init__.py b/tidal_algorithmic_mixes/utils/transformers/artist/__init__.py new file mode 100644 index 0000000..e69de29 diff --git a/tidal_algorithmic_mixes/utils/transformers/artist/artist_index_enricher_transformer.py b/tidal_algorithmic_mixes/utils/transformers/artist/artist_index_enricher_transformer.py new file mode 100644 index 0000000..d9b39a8 --- /dev/null +++ b/tidal_algorithmic_mixes/utils/transformers/artist/artist_index_enricher_transformer.py @@ -0,0 +1,23 @@ +# TODO: move to per-transformers + +from pyspark.ml.base import Transformer +from pyspark.sql import DataFrame + +import tidal_algorithmic_mixes.utils.constants as c + + +class ArtistMetaDataEnricherTransformer(Transformer): + + def __init__(self, artist_metadata: DataFrame, artist_id_col=c.ARTIST_ID, cols=(c.NAME, c.IMAGE)): + self.artist_metadata = artist_metadata + self.artist_id_col = artist_id_col + self.cols = list(cols) + super().__init__() + + def _transform(self, dataset): + index = (self.artist_metadata + .withColumnRenamed(c.ID, self.artist_id_col) + .withColumnRenamed(c.PICTURE, c.IMAGE) + .select([self.artist_id_col] + self.cols)) + + return dataset.join(index, self.artist_id_col) diff --git a/tidal_algorithmic_mixes/utils/transformers/artist/artist_top_tracks_mix_output_transformer.py b/tidal_algorithmic_mixes/utils/transformers/artist/artist_top_tracks_mix_output_transformer.py new file mode 100644 index 0000000..2c4413a --- /dev/null +++ b/tidal_algorithmic_mixes/utils/transformers/artist/artist_top_tracks_mix_output_transformer.py @@ -0,0 +1,35 @@ +# TODO: move to per-transformers + +import pyspark.sql.functions as F +from pyspark.ml import Transformer +from pyspark.sql import Window +from pyspark.sql.types import StringType + +import tidal_algorithmic_mixes.utils.constants as c +from tidal_algorithmic_mixes.utils import mix_utils + + +class ArtistTopTracksMixOutputTransformer(Transformer): + + def __init__(self, today, sort_column=c.RANK): + super(ArtistTopTracksMixOutputTransformer, self).__init__() + self.today = today + self.sort_column = sort_column + + def
_transform(self, dataset): + dataset = dataset.withColumn(c.TRACK_GROUP, F.col(c.TRACK_GROUP).astype(StringType())) + + w = Window.partitionBy(c.ARTIST_ID).orderBy(self.sort_column) + + return (dataset + .withColumn(c.TRACKS, F.collect_list(c.TRACK_GROUP).over(w)) + .groupBy(c.ARTIST_ID) + .agg(F.max(c.TRACKS).alias(c.TRACKS)) + .withColumnRenamed(c.USER_ID, c.USER) + .withColumn(c.ARTIST_ID, F.col(c.ARTIST_ID).astype('int')) + .withColumn(c.MIX_ID, mix_utils.mix_id(F.lit(self.get_prefix()), F.col(c.ARTIST_ID))) + .withColumn(c.AT_DATE, F.lit(self.today).cast(StringType()))) + # .withColumn(c.AT_DATE, F.lit("latest"))) # TODO change to latest when doing PER-1686 + + def get_prefix(self): + return "artist_top_tracks_mix_" diff --git a/tidal_algorithmic_mixes/utils/transformers/artist/enrich_with_artist_cluster.py b/tidal_algorithmic_mixes/utils/transformers/artist/enrich_with_artist_cluster.py new file mode 100644 index 0000000..1c31006 --- /dev/null +++ b/tidal_algorithmic_mixes/utils/transformers/artist/enrich_with_artist_cluster.py @@ -0,0 +1,22 @@ +# TODO: move to per-transformers + +from pyspark.ml.base import Transformer + +import tidal_algorithmic_mixes.utils.constants as c + + +class EnrichWithArtistCluster(Transformer): + """ Enrich with data from the artist clustering model """ + + def __init__(self, artist_clusters, artist_id_col=c.ARTIST_ID, artist_clusters_cols=(c.CLUSTER,)): + super().__init__() + self.artist_clusters = artist_clusters + self.artist_id_col = artist_id_col + self.artist_clusters_cols = list(artist_clusters_cols) + + def _transform(self, dataset): + artist_cluster = (self.artist_clusters + .withColumnRenamed(c.ARTIST_ID, self.artist_id_col) + .select([self.artist_id_col] + self.artist_clusters_cols)) + + return dataset.join(artist_cluster, self.artist_id_col, "left_outer") diff --git a/tidal_algorithmic_mixes/utils/transformers/artist/enrich_with_similar_artist.py b/tidal_algorithmic_mixes/utils/transformers/artist/enrich_with_similar_artist.py new file mode 100644 index 0000000..a5d4dde --- /dev/null +++ b/tidal_algorithmic_mixes/utils/transformers/artist/enrich_with_similar_artist.py @@ -0,0 +1,29 @@ +# TODO: move to per-transformers + +from pyspark.ml.base import Transformer +from pyspark.sql import functions as F + +import tidal_algorithmic_mixes.utils.constants as c + + +class EnrichWithSimilarArtists(Transformer): + """ + Enrich with similar artists from the artist clustering model; + adds a column ENRICHED=1 and ENRICHED_POSITION=neighbour position. + """ + + def __init__(self, similar_artists, max_similars): + super().__init__() + self.similar_artists = similar_artists + self.max_similars = max_similars + + def _transform(self, dataset): + similar_artists_exploded = (self.similar_artists + .withColumn(c.NEIGHBOURS, F.slice(F.col(c.NEIGHBOURS), 1, self.max_similars)) + .select(c.ARTIST_ID, F.posexplode(c.NEIGHBOURS))) + + return (dataset.join(similar_artists_exploded, c.ARTIST_ID) + .withColumnRenamed("pos", c.ENRICHED_POSITION) # pos from posexplode + .withColumn(c.ARTIST_ID, F.col("col")) # col from posexplode + .drop(c.NEIGHBOURS, "col") + .withColumn(c.ENRICHED, F.lit(1))) diff --git a/tidal_algorithmic_mixes/utils/transformers/artist/main_artist_compound_mapping_transformer.py b/tidal_algorithmic_mixes/utils/transformers/artist/main_artist_compound_mapping_transformer.py new file mode 100644 index 0000000..b133602 --- /dev/null +++ b/tidal_algorithmic_mixes/utils/transformers/artist/main_artist_compound_mapping_transformer.py @@ -0,0 +1,38 @@ +# TODO: move to
per-transformers + +from pyspark.ml.base import Transformer +from pyspark.sql import functions as F, DataFrame +from tidal_per_transformers.transformers.utils.spark_utils import get_top_items + +import tidal_algorithmic_mixes.utils.constants as c + + +class MainArtistCompoundMappingTransformer(Transformer): + """ + Map artist compound IDs to a single main artist (e.g. Miguel feat. Travis Scott -> Miguel) + + :returns DataFrame where the compound IDs have been mapped to their main artist + """ + def __init__(self, artist_compound_mapping: DataFrame): + super().__init__() + self.artist_compound_mapping = artist_compound_mapping + + def _transform(self, dataset): + compound_map = (self.artist_compound_mapping + .where("mainartist = 'true'") + .withColumnRenamed(c.ARTIST_COMPOUND_ID, c.RESOLVED_ARTIST_ID) + .drop(c.ID)) + + # Unfortunately the compound table contains duplicates (multiple main artists); keep only 1 (lowest priority) + deduped = get_top_items(compound_map, [c.ARTIST_ID], c.PRIORITY, 1) + + # If there is no compound entry we already have the main artist + joined = dataset.join(deduped, c.ARTIST_ID, "left") + + mapped = (joined + .withColumn(c.ARTIST_ID, F.when( + F.col(c.RESOLVED_ARTIST_ID).isNull(), F.col(c.ARTIST_ID)) + .otherwise(F.col(c.RESOLVED_ARTIST_ID))) + .drop(c.RESOLVED_ARTIST_ID, c.PRIORITY, c.MAIN_ARTIST, c.ARTIST_COMPOUND_ID)) + + return mapped diff --git a/tidal_algorithmic_mixes/utils/transformers/blacklist/__init__.py b/tidal_algorithmic_mixes/utils/transformers/blacklist/__init__.py new file mode 100644 index 0000000..e69de29 diff --git a/tidal_algorithmic_mixes/utils/transformers/blacklist/user_blacklist_filter_transformer.py b/tidal_algorithmic_mixes/utils/transformers/blacklist/user_blacklist_filter_transformer.py new file mode 100644 index 0000000..56f9e4b --- /dev/null +++ b/tidal_algorithmic_mixes/utils/transformers/blacklist/user_blacklist_filter_transformer.py @@ -0,0 +1,102 @@ +# TODO: move to per-transformers + +import pyspark.sql.functions as F +from pyspark.ml.base import Transformer +from pyspark.sql import DataFrame +import tidal_algorithmic_mixes.utils.constants as c + + +ARTIFACT_ID = "artifactId" +TRACK = "TRACK" +VIDEO = "VIDEO" +ARTIST = "ARTIST" + + +class UserBlacklistFilterTransformer(Transformer): + """ Transformer for removing content blacklisted by users from a DataFrame. + """ + def __init__(self, + user_blacklist: DataFrame, # backup_tables.get_user_blacklist_table(self.sc) + compound_table: DataFrame, # backup_tables.get_artist_compound_mapping_table(self.sc) + filter_track: bool = True, + filter_artist: bool = True, + filter_video: bool = False, + user_col: str = c.USER_ID): + super().__init__() + self.filter_track = filter_track + self.filter_video = filter_video + self.filter_artist = filter_artist + self.user_col = user_col + self.user_blacklist = user_blacklist + self.compound_table = compound_table + + def _transform(self, dataset): + + if self.user_col != c.USER_ID: + self.user_blacklist = self.user_blacklist.withColumnRenamed(c.USER_ID, self.user_col) + + if self.filter_track: + dataset = self.filter_blacklisted_tracks(dataset, self.user_blacklist) + + if self.filter_video: + dataset = self.filter_blacklisted_videos(dataset, self.user_blacklist) + + if self.filter_artist: + dataset = self.filter_blacklisted_artists(dataset, self.user_blacklist) + + return dataset + + def filter_blacklisted_tracks(self, dataset, blacklist): + """ + Remove blacklisted tracks from the dataset.
Maps the productIds -> trackGroups before removing the + blacklisted content from the dataset. + + :type dataset: pyspark.sql.DataFrame + :param blacklist: DataFrame containing all blacklisted content + :type blacklist: pyspark.sql.DataFrame + """ + track_blacklist = (blacklist + .where(F.col(c.ARTIFACT_TYPE) == TRACK) + .select(self.user_col, F.col(ARTIFACT_ID).alias(c.TRACK_GROUP))) + + return dataset.join(track_blacklist, [self.user_col, c.TRACK_GROUP], "left_anti") + + def filter_blacklisted_videos(self, dataset, blacklist): + """ + Remove blacklisted videos from the dataset. + + :type dataset: pyspark.sql.DataFrame + :param blacklist: DataFrame containing all blacklisted content + :type blacklist: pyspark.sql.DataFrame + """ + video_blacklist = (blacklist + .where(F.col(c.ARTIFACT_TYPE) == VIDEO) + .withColumnRenamed(ARTIFACT_ID, c.VIDEO_ID)) + + return dataset.join(video_blacklist, [self.user_col, c.VIDEO_ID], "left_anti") + + def filter_blacklisted_artists(self, dataset, blacklist): + """ + Remove blacklisted artists from the dataset. Filters any tracks by the artist or any compound the artist + is part of. + + :type dataset: pyspark.sql.DataFrame + :param blacklist: DataFrame containing all blacklisted content + :type blacklist: pyspark.sql.DataFrame + """ + main_artist_blacklist = (blacklist + .where(F.col(c.ARTIFACT_TYPE) == ARTIST) + .select(F.col(self.user_col), F.col(ARTIFACT_ID).alias(c.ARTIST_ID)) + .persist()) + + compound_table = (self.compound_table + .withColumnRenamed(c.ARTIST_ID, c.COMPOUND_ID)) + + compound_blacklist = (main_artist_blacklist + .join(compound_table, main_artist_blacklist[c.ARTIST_ID] == compound_table[c.ARTIST_COMPOUND_ID]) + .select(F.col(self.user_col), F.col(c.COMPOUND_ID).alias(c.ARTIST_ID))) + + # Keep both the main artists and any compounds they are part of + full_blacklist = main_artist_blacklist.union(compound_blacklist) + + return dataset.join(full_blacklist, [self.user_col, c.ARTIST_ID], "left_anti") diff --git a/tidal_algorithmic_mixes/utils/transformers/discovery_mix/__init__.py b/tidal_algorithmic_mixes/utils/transformers/discovery_mix/__init__.py new file mode 100644 index 0000000..e69de29 diff --git a/tidal_algorithmic_mixes/utils/transformers/discovery_mix/discovery_mix_output_transformer.py b/tidal_algorithmic_mixes/utils/transformers/discovery_mix/discovery_mix_output_transformer.py new file mode 100644 index 0000000..f1f15a6 --- /dev/null +++ b/tidal_algorithmic_mixes/utils/transformers/discovery_mix/discovery_mix_output_transformer.py @@ -0,0 +1,30 @@ +import pyspark.sql.functions as F +from pyspark.ml import Transformer +from pyspark.sql import Window +from pyspark.sql.types import IntegerType, StringType + +import tidal_algorithmic_mixes.utils.constants as c +from tidal_algorithmic_mixes.utils import mix_utils + + +class DiscoveryMixOutputTransformer(Transformer): + + def __init__(self, today, sort_column=c.POS, min_mix_size=66): + super(DiscoveryMixOutputTransformer, self).__init__() + self.today = today + self.sort_column = sort_column + self.min_mix_size = min_mix_size + + def _transform(self, dataset): + w = Window.partitionBy(c.USER_ID).orderBy(self.sort_column) + + return (dataset + .withColumn(c.TRACK_GROUP, F.col(c.TRACK_GROUP).astype(StringType())) + .withColumn(c.TRACKS, F.collect_list(c.TRACK_GROUP).over(w)) + .groupBy(c.USER_ID) + .agg(F.max(c.TRACKS).alias(c.TRACKS)) + .where(F.size(c.TRACKS) >= self.min_mix_size) + .withColumnRenamed(c.USER_ID, c.USER) + .withColumn(c.USER, F.col(c.USER).astype(IntegerType())) + 
diff --git a/tidal_algorithmic_mixes/utils/transformers/discovery_mix/__init__.py b/tidal_algorithmic_mixes/utils/transformers/discovery_mix/__init__.py
new file mode 100644
index 0000000..e69de29
diff --git a/tidal_algorithmic_mixes/utils/transformers/discovery_mix/discovery_mix_output_transformer.py b/tidal_algorithmic_mixes/utils/transformers/discovery_mix/discovery_mix_output_transformer.py
new file mode 100644
index 0000000..f1f15a6
--- /dev/null
+++ b/tidal_algorithmic_mixes/utils/transformers/discovery_mix/discovery_mix_output_transformer.py
@@ -0,0 +1,30 @@
+import pyspark.sql.functions as F
+from pyspark.ml import Transformer
+from pyspark.sql import Window
+from pyspark.sql.types import IntegerType, StringType
+
+import tidal_algorithmic_mixes.utils.constants as c
+from tidal_algorithmic_mixes.utils import mix_utils
+
+
+class DiscoveryMixOutputTransformer(Transformer):
+    """ Collect each user's sorted tracks into a single mix row, dropping users with fewer
+    than `min_mix_size` tracks. """
+
+    def __init__(self, today, sort_column=c.POS, min_mix_size=66):
+        super(DiscoveryMixOutputTransformer, self).__init__()
+        self.today = today
+        self.sort_column = sort_column
+        self.min_mix_size = min_mix_size
+
+    def _transform(self, dataset):
+        w = Window.partitionBy(c.USER_ID).orderBy(self.sort_column)
+
+        return (dataset
+                .withColumn(c.TRACK_GROUP, F.col(c.TRACK_GROUP).astype(StringType()))
+                .withColumn(c.TRACKS, F.collect_list(c.TRACK_GROUP).over(w))
+                # the max aggregate keeps the longest collected list, i.e. the complete ordered track list
+                .groupBy(c.USER_ID)
+                .agg(F.max(c.TRACKS).alias(c.TRACKS))
+                .where(F.size(c.TRACKS) >= self.min_mix_size)
+                .withColumnRenamed(c.USER_ID, c.USER)
+                .withColumn(c.USER, F.col(c.USER).astype(IntegerType()))
+                .withColumn(c.MIX_ID, mix_utils.mix_id(F.lit("discovery_mix_"), F.col(c.USER)))
+                .withColumn(c.AT_DATE, F.lit(self.today).cast(StringType())))
diff --git a/tidal_algorithmic_mixes/utils/transformers/discovery_mix/discovery_mix_sort_transformer.py b/tidal_algorithmic_mixes/utils/transformers/discovery_mix/discovery_mix_sort_transformer.py
new file mode 100644
index 0000000..571bf51
--- /dev/null
+++ b/tidal_algorithmic_mixes/utils/transformers/discovery_mix/discovery_mix_sort_transformer.py
@@ -0,0 +1,29 @@
+import pyspark.sql.functions as F
+from pyspark.ml import Transformer
+from pyspark.sql.window import Window
+
+import tidal_algorithmic_mixes.utils.constants as c
+
+KNOWN_PERCENTAGE = "knownPercentage"
+
+
+class DiscoveryMixSortTransformer(Transformer):
+    """
+    We don't want all the recommended tracks from artists known to a user to end up in a single mix.
+    Tracks from known artists should instead be distributed evenly across the different days of the week.
+    """
+    def __init__(self, precompute_size=70, sort_column=c.POS):
+        super(DiscoveryMixSortTransformer, self).__init__()
+        self.precompute_size = precompute_size
+        self.sort_column = sort_column
+
+    def _transform(self, dataset):
+        # scale known-artist ranks by (1 - known share) and unknown-artist ranks by the known share,
+        # which interleaves the two groups proportionally when sorting on the result
+        return (dataset
+                .withColumn(KNOWN_PERCENTAGE,
+                            F.sum(c.KNOWN_ARTIST).over(Window.partitionBy(c.USER_ID)) / self.precompute_size)
+                .withColumn(self.sort_column,
+                            F.row_number().over(Window.partitionBy(c.USER_ID, c.KNOWN_ARTIST).orderBy(self.sort_column)))
+                .withColumn(self.sort_column,
+                            F.when(F.col(c.KNOWN_ARTIST) == 1,
+                                   F.col(self.sort_column) * (1 - F.col(KNOWN_PERCENTAGE)))
+                            .otherwise(F.col(self.sort_column) * F.col(KNOWN_PERCENTAGE)))
+                .drop(KNOWN_PERCENTAGE))
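
A sketch of how these two stages might chain at the end of the pipeline. flagged_recs is a hypothetical upstream DataFrame with userId, artistId, trackGroup, knownArtist and pos columns, as produced by the FlagKnownArtistsTransformer below:

    from datetime import date

    sorted_recs = DiscoveryMixSortTransformer(precompute_size=70).transform(flagged_recs)
    mixes = DiscoveryMixOutputTransformer(today=date.today().isoformat(),
                                          min_mix_size=66).transform(sorted_recs)
    # -> one row per user: user, tracks (ordered list), mixId, atDate
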
diff --git a/tidal_algorithmic_mixes/utils/transformers/discovery_mix/flag_known_artists_transformer.py b/tidal_algorithmic_mixes/utils/transformers/discovery_mix/flag_known_artists_transformer.py
new file mode 100644
index 0000000..8318648
--- /dev/null
+++ b/tidal_algorithmic_mixes/utils/transformers/discovery_mix/flag_known_artists_transformer.py
@@ -0,0 +1,44 @@
+from datetime import datetime
+
+import pyspark.sql.functions as F
+from dateutil.relativedelta import relativedelta
+from pyspark.ml import Transformer
+
+import tidal_algorithmic_mixes.utils.constants as c
+
+
+class FlagKnownArtistsTransformer(Transformer):
+    """ Flag the dataset with artists that are known to / previously observed by a user """
+    def __init__(self,
+                 all_time_tracks,
+                 artist_playback_threshold: int,
+                 recency_threshold: int,
+                 fav_artists=None,
+                 last_stream_date_column: str = c.LAST_STREAMED_DATE):
+        """
+        :param all_time_tracks: the user's all-time listening history
+        :param artist_playback_threshold: hard limit on # streams before an artist is considered known/observed
+        :param last_stream_date_column: column holding the track's last stream date
+        :param fav_artists: DataFrame containing the users' favourite artists
+        :param recency_threshold: months since the last stream before lower stream counts are "forgotten"
+        """
+        super().__init__()
+        self.all_time_tracks = all_time_tracks
+        self.playback_threshold = artist_playback_threshold
+        self.recency_threshold = datetime.now() - relativedelta(months=recency_threshold)
+        self.fav_artists = fav_artists
+        self.last_stream_date_column = last_stream_date_column
+
+    def _transform(self, dataset):
+        # an artist is known if streamed often enough overall, or at all recently
+        known_artists = (self.all_time_tracks
+                         .where((F.col(c.COUNT) >= self.playback_threshold)
+                                | (F.col(self.last_stream_date_column) >= self.recency_threshold))
+                         .select(c.USER_ID, c.ARTIST_ID, F.lit(1).alias(c.KNOWN_ARTIST)))
+
+        if self.fav_artists is not None:
+            fav_artists = self.fav_artists.select(c.USER_ID, c.ARTIST_ID, F.lit(1).alias(c.KNOWN_ARTIST))
+            known_artists = known_artists.union(fav_artists).distinct()
+
+        return (dataset.join(known_artists, [c.USER_ID, c.ARTIST_ID], "left_outer")
+                .select(c.USER_ID, c.ARTIST_ID, c.TRACK_GROUP, c.KNOWN_ARTIST, c.POS)
+                .na.fill(0, c.KNOWN_ARTIST))
diff --git a/tidal_algorithmic_mixes/utils/transformers/discovery_mix/split_by_known_artists_transformer.py b/tidal_algorithmic_mixes/utils/transformers/discovery_mix/split_by_known_artists_transformer.py
new file mode 100644
index 0000000..8d90ee8
--- /dev/null
+++ b/tidal_algorithmic_mixes/utils/transformers/discovery_mix/split_by_known_artists_transformer.py
@@ -0,0 +1,48 @@
+import pyspark.sql.functions as F
+from pyspark.ml import Transformer
+from pyspark.sql import Window
+from tidal_per_transformers.transformers import TopItemsTransformer
+
+import tidal_algorithmic_mixes.utils.constants as c
+
+
+NR_KNOWN_ARTIST = "nr_known_artist"
+NR_UNKNOWN_ARTIST = "nr_unknown_artist"
+
+
+class SplitByKnownArtistsTransformer(Transformer):
+    """ Cap the share of tracks by known artists in each user's mix.
+
+    :param mix_size: number of tracks to keep per user
+    :param threshold_known_artists: fraction (0-1) of the mix reserved for known artists
+    """
+
+    def __init__(self, mix_size, threshold_known_artists):
+        super(SplitByKnownArtistsTransformer, self).__init__()
+        self.mix_size = mix_size
+        self.threshold_known_artists = threshold_known_artists
+
+    def _transform(self, dataset):
+        window = Window.partitionBy(c.USER_ID)
+        dataset = dataset.withColumn(NR_KNOWN_ARTIST, F.sum(c.KNOWN_ARTIST).over(window))
+        dataset = dataset.withColumn("total_artists", F.count(c.ARTIST_ID).over(window))
+        dataset = (dataset
+                   .withColumn(NR_UNKNOWN_ARTIST, F.expr(f"total_artists - {NR_KNOWN_ARTIST}"))
+                   .drop("total_artists"))
+
+        optimal_max_known_artists = int(self.threshold_known_artists * self.mix_size)
+        optimal_max_unknown_artists = self.mix_size - optimal_max_known_artists
+
+        # for users without enough unknown artists, fill the remaining slots with known ones
+        max_known_artists = F.expr(
+            f"if({optimal_max_unknown_artists} > {NR_UNKNOWN_ARTIST}, {self.mix_size} - {NR_UNKNOWN_ARTIST}, {self.mix_size} - {optimal_max_unknown_artists})")
+
+        # for users without enough known artists, fill the remaining slots with unknown ones
+        max_unknown_artists = F.expr(
+            f"if({optimal_max_known_artists} > {NR_KNOWN_ARTIST}, {self.mix_size} - {NR_KNOWN_ARTIST}, {self.mix_size} - {optimal_max_known_artists})")
+
+        known_artist_recs = (TopItemsTransformer(c.USER_ID, F.col(c.POS), max_known_artists)
+                             .transform(dataset.where(f"{c.KNOWN_ARTIST} = 1")))
+
+        unknown_artist_recs = (TopItemsTransformer(c.USER_ID, F.col(c.POS), max_unknown_artists)
+                               .transform(dataset.where(f"{c.KNOWN_ARTIST} = 0")))
+
+        return (known_artist_recs.union(unknown_artist_recs)
+                .drop(NR_KNOWN_ARTIST, NR_UNKNOWN_ARTIST))
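
A worked sketch of the split arithmetic under assumed settings (numbers invented): with mix_size=70 and threshold_known_artists=0.3 the optimal caps are int(0.3 * 70) = 21 known and 49 unknown tracks. A user with only 10 unknown-artist candidates instead gets 70 - 10 = 60 known slots, so the mix can still be filled. history, favourites and candidates are hypothetical DataFrames carrying the columns referenced above:

    flagged = FlagKnownArtistsTransformer(all_time_tracks=history,
                                          artist_playback_threshold=2,
                                          recency_threshold=12,
                                          fav_artists=favourites).transform(candidates)
    split = SplitByKnownArtistsTransformer(mix_size=70,
                                           threshold_known_artists=0.3).transform(flagged)
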
diff --git a/tidal_algorithmic_mixes/utils/transformers/diversity_sort_transformer.py b/tidal_algorithmic_mixes/utils/transformers/diversity_sort_transformer.py
new file mode 100644
index 0000000..96c68de
--- /dev/null
+++ b/tidal_algorithmic_mixes/utils/transformers/diversity_sort_transformer.py
@@ -0,0 +1,50 @@
+# TODO: move to per-transformers
+
+from pyspark.ml.base import Transformer
+from pyspark.sql import functions as F
+from pyspark.sql.window import Window
+
+import tidal_algorithmic_mixes.utils.constants as c
+
+
+class DiversitySortTransformer(Transformer):
+    """
+    Sorts a mix in an 'intelligent' manner by spacing out tracks that share a key, e.g. tracks
+    from the same artist or album, depending on the partition arguments.
+
+    :param id_col: list with the column(s) identifying one list to sort (e.g. [mixId] or [userId])
+    :param partition_one: list with the first column to space out (e.g. [artistId])
+    :param partition_two: list with the second column to space out (e.g. [trackBundleId])
+    :param order_by: column containing the original relevance-based ordering
+    :param gap: the number of slots to leave between recommendations from the same artist/album
+    :return: a DataFrame with a new 'rank' column that can be used to sort the list
+    """
+
+    def __init__(self, id_col, partition_one, partition_two, order_by, gap=5):
+        super(DiversitySortTransformer, self).__init__()
+        self.id_col = id_col
+        self.partition_one = self.id_col + partition_one
+        self.partition_two = self.id_col + partition_two
+        self.order_by = order_by
+        self.gap = gap
+
+    def _transform(self, dataset):
+        w1 = Window.partitionBy(self.partition_one).orderBy(self.order_by)
+        w1_rank_window = Window.partitionBy(self.id_col).orderBy("w1_first_rank")
+        w2 = Window.partitionBy(self.partition_two).orderBy(self.order_by)
+        w2_rank_window = Window.partitionBy(self.id_col).orderBy("w2_first_rank")
+
+        # rank tracks within each artist/album partition, then push repeats `gap` slots down the list
+        ordered = (dataset
+                   .withColumn("w1_first_rank", F.first(self.order_by, True).over(w1) - 1)
+                   .withColumn("w2_first_rank", F.first(self.order_by, True).over(w2) - 1)
+                   .withColumn("w1_rank", F.dense_rank().over(w1_rank_window) - 1)
+                   .withColumn("w2_rank", F.dense_rank().over(w2_rank_window) - 1)
+                   .withColumn("w1_inter_rank", F.row_number().over(w1) - 1)
+                   .withColumn("w2_inter_rank", F.row_number().over(w2) - 1)
+                   .withColumn("ordering", (F.least(F.col("w1_rank"), F.col("w2_rank")) +
+                                            self.gap * F.greatest(F.col("w1_inter_rank"), F.col("w2_inter_rank"))))
+                   .withColumn(c.RANK, F.row_number().over(Window.partitionBy(self.id_col).orderBy(
+                       F.col('ordering'), F.desc(self.order_by))))
+                   .drop("w1_inter_rank", "w2_inter_rank", "w2_rank", "w1_rank", "w1_first_rank", "w2_first_rank",
+                         "ordering"))
+        return ordered
diff --git a/tidal_algorithmic_mixes/utils/transformers/posexplode_transformer.py b/tidal_algorithmic_mixes/utils/transformers/posexplode_transformer.py
new file mode 100644
index 0000000..3ac7376
--- /dev/null
+++ b/tidal_algorithmic_mixes/utils/transformers/posexplode_transformer.py
@@ -0,0 +1,27 @@
+# TODO: move to per-transformers
+
+import pyspark.sql.functions as F
+from pyspark.ml.base import Transformer
+
+
+class PosExplodeTransformer(Transformer):
+    """ Returns a new row for each element (with its position) in the given array or map
+
+    :param explode_col: name of the column to explode
+    :param alias: name for the exploded value column, or a dict mapping output columns to new names
+    :type alias: str|dict
+    """
+    def __init__(self, explode_col=None, alias='col'):
+        super(PosExplodeTransformer, self).__init__()
+        self.explode_col = explode_col
+        self.alias = alias
+
+    def _transform(self, dataset):
+        columns = [col for col in dataset.columns if col != self.explode_col]
+        exploded = dataset.select(F.posexplode(self.explode_col), *columns)
+
+        if isinstance(self.alias, dict):
+            for k, v in self.alias.items():
+                exploded = exploded.withColumn(v, F.col(k))
+            exploded = exploded.drop('col')
+        else:
+            exploded = exploded.withColumnRenamed('col', self.alias)
+
+        return exploded
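
A sketch combining the two utilities above. The column names are illustrative (c.RANK is assumed to resolve to 'rank'), and enriched_recs is a hypothetical DataFrame where artistId/trackBundleId have been joined on upstream:

    recs = spark.createDataFrame([(7, [101, 102, 103])], ["mixId", "trackGroups"])

    exploded = PosExplodeTransformer("trackGroups", alias="trackGroup").transform(recs)
    # -> columns: pos, trackGroup, mixId

    spaced = DiversitySortTransformer(id_col=["mixId"],
                                      partition_one=["artistId"],
                                      partition_two=["trackBundleId"],
                                      order_by="pos").transform(enriched_recs)
    spaced.orderBy("rank").show()
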
diff --git a/tidal_algorithmic_mixes/utils/transformers/user/__init__.py b/tidal_algorithmic_mixes/utils/transformers/user/__init__.py
new file mode 100644
index 0000000..e69de29
diff --git a/tidal_algorithmic_mixes/utils/transformers/user/enrich_user_transformer.py b/tidal_algorithmic_mixes/utils/transformers/user/enrich_user_transformer.py
new file mode 100644
index 0000000..138abbb
--- /dev/null
+++ b/tidal_algorithmic_mixes/utils/transformers/user/enrich_user_transformer.py
@@ -0,0 +1,34 @@
+# TODO: move to per-transformers
+
+from pyspark.ml import Transformer
+from pyspark.sql import DataFrame
+
+
+class EnrichUserTransformer(Transformer):
+    """ Join the dataset with fields from the user table. """
+    def __init__(self, user_table: DataFrame, userid_col: str, mapping=None):
+        """
+        :param user_table: the user table; its 'id' column is matched against `userid_col`
+        :param userid_col: name of the dataset column containing the user id
+        :param mapping: list of user-table columns to include in the join. Optionally a dict can be passed
+            mapping user-table column names to the new names they should get after the join.
+        :type mapping: tuple[str]|List[str]|dict[str, str]
+        """
+        super().__init__()
+        self.user_table = user_table
+        self.userid_col = userid_col
+
+        if isinstance(mapping, dict):
+            self.mapping = mapping
+        elif isinstance(mapping, (tuple, list)):
+            self.mapping = {k: k for k in mapping}
+        else:
+            raise ValueError("mapping must be a dict, list or tuple of column names")
+
+    def _transform(self, dataset):
+        user_table = self.user_table.withColumnRenamed('id', self.userid_col)
+
+        for k, v in self.mapping.items():
+            user_table = user_table.withColumnRenamed(k, v)
+
+        # keep only the user id and the requested columns, per the mapping
+        user_table = user_table.select(self.userid_col, *self.mapping.values())
+
+        return dataset.join(user_table, self.userid_col, 'left')
diff --git a/tidal_algorithmic_mixes/utils/transformers/user/filter_streamed_albums_transformer.py b/tidal_algorithmic_mixes/utils/transformers/user/filter_streamed_albums_transformer.py
new file mode 100644
index 0000000..1e6ebc9
--- /dev/null
+++ b/tidal_algorithmic_mixes/utils/transformers/user/filter_streamed_albums_transformer.py
@@ -0,0 +1,27 @@
+# TODO: move to per-transformers
+
+import pyspark.sql.functions as F
+from pyspark.ml import Transformer
+
+import tidal_algorithmic_mixes.utils.constants as c
+
+
+class FilterStreamedAlbumsTransformer(Transformer):
+    """
+    Filter out albums the user has streamed at least `filter_min_album_streams` times.
+    """
+
+    def __init__(self, filter_min_album_streams, user_history):
+        super().__init__()
+        self.filter_min_album_streams = filter_min_album_streams
+        self.user_history = user_history
+
+    def _transform(self, dataset):
+        # albums with enough lifetime streams are anti-joined away per user
+        user_history = (self.user_history
+                        .groupBy(c.USER_ID, c.MASTER_BUNDLE_ID)
+                        .agg(F.sum("count").alias(c.COUNT))
+                        .where(f"count >= {self.filter_min_album_streams}")
+                        .drop("count"))
+
+        return dataset.join(user_history, on=[c.USER_ID, c.MASTER_BUNDLE_ID], how="left_anti")
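
A sketch of the enrichment join (toy user table; the user-table column names are invented for illustration):

    users = spark.createDataFrame([(1, "NO", 1990)], ["id", "countryCode", "birthYear"])
    recs = spark.createDataFrame([(1, "trackGroup-a")], ["userId", "trackGroup"])

    enriched = EnrichUserTransformer(users, "userId",
                                     mapping={"countryCode": "country"}).transform(recs)
    # -> recs now carry the 'country' column from the user table
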
diff --git a/tidal_algorithmic_mixes/utils/transformers/user/filter_streamed_tracks_transformer.py b/tidal_algorithmic_mixes/utils/transformers/user/filter_streamed_tracks_transformer.py
new file mode 100644
index 0000000..32053ce
--- /dev/null
+++ b/tidal_algorithmic_mixes/utils/transformers/user/filter_streamed_tracks_transformer.py
@@ -0,0 +1,47 @@
+# TODO: move to per-transformers
+
+from datetime import datetime
+
+import pyspark.sql.functions as F
+from dateutil.relativedelta import relativedelta
+from pyspark.ml import Transformer
+
+import tidal_algorithmic_mixes.utils.constants as c
+
+
+class FilterStreamedTracksTransformer(Transformer):
+
+    def __init__(self,
+                 track_dataset,
+                 playback_threshold: int = 1,
+                 recency_threshold: int = 12,
+                 join_columns=None,
+                 last_stream_date_column: str = c.LAST_STREAMED_DATE,
+                 filter_streamed_tracks=True):
+        """
+        :param track_dataset: dataset containing the user's track listening history
+        :param playback_threshold: hard limit on # streams before a track is considered known
+        :param recency_threshold: for lower stream counts we can apply a recency filter (months)
+        :param join_columns: columns used to remove known tracks (defaults to [c.USER_ID, c.TRACK_GROUP])
+        :param last_stream_date_column: column holding the track's last stream date
+        :param filter_streamed_tracks: toggle the filtering on/off for pipelines where this is configurable
+        """
+        super(FilterStreamedTracksTransformer, self).__init__()
+        self.filter_streamed_tracks = filter_streamed_tracks
+        self.track_dataset = track_dataset
+        # avoid a mutable default argument for the join columns
+        self.join_columns = join_columns if join_columns is not None else [c.USER_ID, c.TRACK_GROUP]
+        self.playback_threshold = playback_threshold
+        self.recency_threshold = recency_threshold
+        self.last_stream_date_column = last_stream_date_column
+
+    def _transform(self, dataset):
+        if not self.filter_streamed_tracks or self.track_dataset is None:
+            return dataset
+
+        track_dataset = self.track_dataset
+        if self.playback_threshold and self.recency_threshold:
+            # a track is considered known if streamed often enough, or at all recently
+            memory_window = datetime.now() - relativedelta(months=self.recency_threshold)
+            track_dataset = track_dataset.where(
+                (F.col(c.COUNT) >= self.playback_threshold) |
+                (F.col(self.last_stream_date_column) >= memory_window))
+
+        return dataset.join(track_dataset, on=self.join_columns, how="left_anti")
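
Finally, a sketch of the track-history filter in use. history and candidates are hypothetical DataFrames carrying the columns referenced above (userId, trackGroup, count, lastStreamedDate):

    fresh = FilterStreamedTracksTransformer(
        track_dataset=history,
        playback_threshold=2,    # 2+ lifetime streams -> always treated as known
        recency_threshold=12,    # any stream in the last 12 months -> known
    ).transform(candidates)
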