Skip to content

Commit

Permalink
use single dag pipeline (#10)
Browse files Browse the repository at this point in the history
  • Loading branch information
l-moamen committed Jan 19, 2024
1 parent 80dec41 commit bfbe3e3
Show file tree
Hide file tree
Showing 3 changed files with 9 additions and 9 deletions.
3 changes: 2 additions & 1 deletion README.md
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@ and non-personalized like track radio and artist radio.
- Set local pyenv version `pyenv local mixes`
- Activate the virtual pyenv using `pyenv activate mixes`
- Upgrade the pip package installer `pip install --upgrade pip`
-- Install poetry for package management `pip install poetry==1.5.1`
+- Install poetry for package management `pip install poetry==1.7.0`
- Install dependencies from the lock file `poetry install --no-root`

[pyproject.toml](pyproject.toml)
7 changes: 3 additions & 4 deletions pyproject.toml
Original file line number Diff line number Diff line change
Expand Up @@ -11,14 +11,13 @@ classifiers = [

[tool.poetry]
name = "tidal_algorithmic_mixes"
-version = "0.0.7"
+version = "0.0.8"
description = "common transformers used by the tidal personalization team."
authors = [
"Loay <[email protected]>",
"Jing <[email protected]>",
"Tao <[email protected]>",
"Thomas <[email protected]>",
"Yuhua [email protected]"
"Yuhua <[email protected]>"
]

license = "Apache License V 2.0"
Expand All @@ -35,7 +34,7 @@ great-expectations = "0.16.15"
scikit-learn = "1.1.1"
alphabet-detector = "0.0.7"
pyarrow = "7.0.0"
-tidal-per-transformers = ">=0.0.7"
+tidal-per-transformers = ">=0.0.8"
torch = "1.12.1"
mlflow = "2.1.1"

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2,9 +2,9 @@
from datetime import datetime

import pyspark.sql.functions as F
from pyspark.ml import Pipeline
from tidal_per_transformers.transformers import WithColumnRenamedTransformer, JoinTransformer, CleanTextTransformer, \
TrackGroupAvailabilityByCountryTransformer, TopItemsTransformer, AggregateTransformer, SelectTransformer
from tidal_per_transformers.transformers.single_dag_pipeline import SingleDAGPipeline as Pipeline

import tidal_algorithmic_mixes.utils.constants as c

Expand Down Expand Up @@ -143,7 +143,7 @@ def transform(self):
DiscoveryMixSortTransformer(self.conf.mix_size, sort_column=c.RANK),
DiscoveryMixOutputTransformer(self.conf.now, sort_column=c.RANK, min_mix_size=self.conf.min_mix_size)
])
output = pipeline.fit(self.data.precomputed_recs).transform(self.data.precomputed_recs).persist()
output = pipeline.fit(self.data.precomputed_recs).persist()
self._output = DiscoveryMixPostProcessorTransformationOutput(output)

@staticmethod
Expand All @@ -156,7 +156,7 @@ def get_user_country(user_table: DataFrame):
user_country_pipeline = Pipeline(stages=[
SelectTransformer([c.ID, c.COUNTRY_CODE]),
WithColumnRenamedTransformer(c.ID, c.USER_ID)])
return user_country_pipeline.fit(user_table).transform(user_table)
return user_country_pipeline.fit(user_table)

@staticmethod
def get_track_group_metadata(tracks_metadata: DataFrame):
Expand All @@ -170,7 +170,7 @@ def get_track_group_metadata(tracks_metadata: DataFrame):
[F.first(c.MAIN_ARTISTS_IDS).getItem(0).alias(c.ARTIST_ID),
F.first(c.TITLE).alias(c.TITLE)]),
])
return track_group_artist_pipeline.fit(tracks_metadata).transform(tracks_metadata)
return track_group_artist_pipeline.fit(tracks_metadata)

@staticmethod
def get_track_group_available_countries(track_group_metadata: DataFrame):
Expand Down

0 comments on commit bfbe3e3

Please sign in to comment.