From e44a43beae55439f49b087284dd11d6ac3baaf7f Mon Sep 17 00:00:00 2001 From: Lee Peuker Date: Tue, 19 Jul 2022 20:56:48 +0200 Subject: [PATCH 1/6] Add job proccessing setup --- Makefile | 3 + bin/console.php | 1 + .../20220719134322_AddJobQueueTable.php | 29 +++++++ src/Application/Service/Tmdb/SyncMovies.php | 2 +- src/Command/ProcessJobs.php | 65 ++++++++++++++ src/Worker/Job.php | 84 +++++++++++++++++++ src/Worker/Repository.php | 45 ++++++++++ src/Worker/Service.php | 50 +++++++++++ 8 files changed, 278 insertions(+), 1 deletion(-) create mode 100644 db/migrations/20220719134322_AddJobQueueTable.php create mode 100644 src/Command/ProcessJobs.php create mode 100644 src/Worker/Job.php create mode 100644 src/Worker/Repository.php create mode 100644 src/Worker/Service.php diff --git a/Makefile b/Makefile index ab8b3961..4d684905 100644 --- a/Makefile +++ b/Makefile @@ -82,6 +82,9 @@ app_sync_tmdb: app_sync_letterboxd: make exec_app_cmd CMD="php bin/console.php letterboxd:sync $(CSV_PATH)" +app_jobs_process: + make exec_app_cmd CMD="php bin/console.php jobs:process" + # Tests ####### test: test_phpcs test_psalm test_phpstan diff --git a/bin/console.php b/bin/console.php index dff65729..39f721ef 100644 --- a/bin/console.php +++ b/bin/console.php @@ -13,5 +13,6 @@ $application->add($container->get(Movary\Command\UserDelete::class)); $application->add($container->get(Movary\Command\UserUpdate::class)); $application->add($container->get(Movary\Command\UserList::class)); +$application->add($container->get(Movary\Command\ProcessJobs::class)); $application->run(); diff --git a/db/migrations/20220719134322_AddJobQueueTable.php b/db/migrations/20220719134322_AddJobQueueTable.php new file mode 100644 index 00000000..eb9abdb4 --- /dev/null +++ b/db/migrations/20220719134322_AddJobQueueTable.php @@ -0,0 +1,29 @@ +execute( + <<execute( + <<movieApi->fetchAllOrderedByLastUpdatedAtTmdbAsc(); diff --git a/src/Command/ProcessJobs.php b/src/Command/ProcessJobs.php new file mode 100644 index 00000000..733f885c --- /dev/null +++ b/src/Command/ProcessJobs.php @@ -0,0 +1,65 @@ +setDescription('Process job from the queue.'); + } + + // phpcs:ignore Generic.CodeAnalysis.UnusedFunctionParameter + protected function execute(InputInterface $input, OutputInterface $output) : int + { + sleep(1); // For now to keep supervisor happy + + $this->generateOutput($output, 'Processing job...'); + + try { + $processedJobType = $this->processJob(); + } catch (\Exception $e) { + $this->logger->error('Could not process job.', ['exception' => $e]); + + return Command::FAILURE; + } + + $processedMessage = 'Nothing to process.'; + if ($processedJobType !== null) { + $processedMessage = 'Processed job of type: ' . $processedJobType; + } + + $this->generateOutput($output, $processedMessage); + + return Command::SUCCESS; + } + + private function processJob() : ?string + { + $job = $this->repository->fetchOldestJob(); + + if ($job === null) { + return null; + } + + $this->workerService->processJob($job); + + /** @noinspection PhpUnreachableStatementInspection */ + return $job->getType(); + } +} diff --git a/src/Worker/Job.php b/src/Worker/Job.php new file mode 100644 index 00000000..5a458e3a --- /dev/null +++ b/src/Worker/Job.php @@ -0,0 +1,84 @@ + $userId]); + } + + public static function createTraktRatingsSync(int $userId) : self + { + return new self((string)Uuid::uuid4(), self::TYPE_TRAKT_SYNC_RATINGS, ['userId' => $userId]); + } + + public function getParameters() : array + { + return $this->parameters; + } + + public function getType() : string + { + return $this->type; + } + + public function isOfTypeTmdbSync() : bool + { + return $this->type === self::TYPE_TMDB_SYNC; + } + + public function isOfTypeTraktSyncHistory() : bool + { + return $this->type === self::TYPE_TRAKT_SYNC_HISTORY; + } + + public function isOfTypeTraktSyncRankings() : bool + { + return $this->type === self::TYPE_TRAKT_SYNC_RATINGS; + } + + public function jsonSerialize() : array + { + return [ + 'uuid' => $this->uuid, + 'type' => $this->type, + 'parameters' => $this->parameters, + ]; + } +} diff --git a/src/Worker/Repository.php b/src/Worker/Repository.php new file mode 100644 index 00000000..c3c95c20 --- /dev/null +++ b/src/Worker/Repository.php @@ -0,0 +1,45 @@ +dbConnection->insert( + 'job_queue', + [ + 'job' => Json::encode($job), + ] + ); + } + + public function fetchOldestJob() : ?Job + { + $this->dbConnection->beginTransaction(); + + $data = $this->dbConnection->fetchAssociative('SELECT * FROM `job_queue` ORDER BY `created_at` LIMIT 1'); + + if ($data === false) { + return null; + } + + $this->deleteJob($data['id']); + + $this->dbConnection->commit(); + + return Job::createFromArray(Json::decode($data['job'])); + } + + private function deleteJob(int $id) : void + { + $this->dbConnection->delete('job_queue', ['id' => $id]); + } +} diff --git a/src/Worker/Service.php b/src/Worker/Service.php new file mode 100644 index 00000000..a0de9520 --- /dev/null +++ b/src/Worker/Service.php @@ -0,0 +1,50 @@ +repository->addJob($job); + } + + public function addTraktHistorySyncJob(int $userId) : void + { + $job = Job::createTraktHistorySync($userId); + + $this->repository->addJob($job); + } + + public function addTraktRatingsSyncJob(int $userId) : void + { + $job = Job::createTraktRatingsSync($userId); + + $this->repository->addJob($job); + } + + public function processJob(Job $job) : void + { + $parameters = $job->getParameters(); + + match (true) { + $job->isOfTypeTraktSyncRankings() => $this->traktSyncRatings->execute($parameters['userId']), + $job->isOfTypeTraktSyncHistory() => $this->traktSyncWatchedMovies->execute($parameters['userId']), + $job->isOfTypeTmdbSync() => $this->tmdbSyncMovies->syncMovies(), + default => throw new \RuntimeException('Job type not supported: ' . $job->getType()), + }; + } +} From 63d3d54d8a1523fdda78128aa66849e4e3fc6ce7 Mon Sep 17 00:00:00 2001 From: Lee Peuker Date: Tue, 19 Jul 2022 22:05:31 +0200 Subject: [PATCH 2/6] Add supervisor runner for job processing --- .dockerignore | 1 - .env.development.example | 1 + .env.production.example | 1 + bootstrap.php | 1 + build/php/Dockerfile | 1 + build/php/supervisor.d/movary.conf | 12 ++++++ settings/routes.php | 10 +++++ src/Application/User/Entity.php | 8 ++-- src/Command/ProcessJobs.php | 23 ++++++++-- src/Factory.php | 16 +++++++ src/HttpController/JobController.php | 52 +++++++++++++++++++++++ src/HttpController/SettingsController.php | 8 +++- templates/page/settings.html.twig | 12 ++++++ 13 files changed, 137 insertions(+), 9 deletions(-) create mode 100644 build/php/supervisor.d/movary.conf create mode 100644 src/HttpController/JobController.php diff --git a/.dockerignore b/.dockerignore index 020f11c3..a25e8b14 100644 --- a/.dockerignore +++ b/.dockerignore @@ -1,3 +1,2 @@ -build/ tests/ tmp/* diff --git a/.env.development.example b/.env.development.example index 5fe4cab1..84566e41 100644 --- a/.env.development.example +++ b/.env.development.example @@ -3,6 +3,7 @@ ENV=development USER_ID=1000 HTTP_PORT=80 TIMEZONE="Europe/Berlin" +MIN_RUNTIME_IN_SECONDS_FOR_JOB_PROCESSING=10 # Database DATABASE_HOST="mysql" diff --git a/.env.production.example b/.env.production.example index efdaf917..30fdaa3e 100644 --- a/.env.production.example +++ b/.env.production.example @@ -1,6 +1,7 @@ # Enviroment ENV=production TIMEZONE="Europe/Berlin" +MIN_RUNTIME_IN_SECONDS_FOR_JOB_PROCESSING=10 # Database DATABASE_HOST= diff --git a/bootstrap.php b/bootstrap.php index 8e79090b..b276d660 100644 --- a/bootstrap.php +++ b/bootstrap.php @@ -15,6 +15,7 @@ \Movary\Command\DatabaseMigrationStatus::class => DI\factory([Factory::class, 'createDatabaseMigrationStatusCommand']), \Movary\Command\DatabaseMigrationMigrate::class => DI\factory([Factory::class, 'createDatabaseMigrationMigrateCommand']), \Movary\Command\DatabaseMigrationRollback::class => DI\factory([Factory::class, 'createDatabaseMigrationRollbackCommand']), + \Movary\Command\ProcessJobs::class => DI\factory([Factory::class, 'createProcessJobCommand']), \Psr\Http\Client\ClientInterface::class => DI\factory([Factory::class, 'createHttpClient']), \Psr\Log\LoggerInterface::class => DI\factory([Factory::class, 'createFileLogger']), \Doctrine\DBAL\Connection::class => DI\factory([Factory::class, 'createDbConnection']), diff --git a/build/php/Dockerfile b/build/php/Dockerfile index a7021c03..46bafa24 100644 --- a/build/php/Dockerfile +++ b/build/php/Dockerfile @@ -9,4 +9,5 @@ ARG APPLICATION_VERSION ENV APPLICATION_VERSION=${APPLICATION_VERSION} COPY --chown=application ./ ./ COPY .env.production.example .env +COPY build/php/supervisor.d/movary.conf /opt/docker/etc/supervisor.d/movary.conf RUN composer install --no-dev diff --git a/build/php/supervisor.d/movary.conf b/build/php/supervisor.d/movary.conf new file mode 100644 index 00000000..47dbd8a1 --- /dev/null +++ b/build/php/supervisor.d/movary.conf @@ -0,0 +1,12 @@ +[program:movary] +command=/usr/local/bin/php /app/bin/console.php jobs:process +numprocs=1 +user=application +autostart=true +autorestart=true +startsecs=1 +startretries=10 +stdout_logfile=/dev/stdout +stdout_logfile_maxbytes=0 +stderr_logfile=/dev/stderr +stderr_logfile_maxbytes=0 diff --git a/settings/routes.php b/settings/routes.php index c6d6f6f0..640318d6 100644 --- a/settings/routes.php +++ b/settings/routes.php @@ -132,6 +132,16 @@ '/user/delete-account', [\Movary\HttpController\SettingsController::class, 'deleteAccount'] ); + $routeCollector->addRoute( + 'GET', + '/jobs/schedule/trakt-history-sync', + [\Movary\HttpController\JobController::class, 'scheduleTraktHistorySync'] + ); + $routeCollector->addRoute( + 'GET', + '/jobs/schedule/trakt-ratings-sync', + [\Movary\HttpController\JobController::class, 'scheduleTraktRatingsSync'] + ); $routeCollector->addRoute( 'POST', '/user/date-format', diff --git a/src/Application/User/Entity.php b/src/Application/User/Entity.php index e003943d..33943dc5 100644 --- a/src/Application/User/Entity.php +++ b/src/Application/User/Entity.php @@ -9,7 +9,7 @@ private function __construct( private readonly string $passwordHash, private readonly bool $areCoreAccountChangesDisabled, private readonly ?string $plexWebhookUuid, - private readonly ?string $dateFormat, + private readonly ?int $dateFormatId, private readonly ?string $TraktUserName, private readonly ?string $TraktClientId, ) { @@ -22,7 +22,7 @@ public static function createFromArray(array $data) : self $data['password'], (bool)$data['core_account_changes_disabled'], $data['plex_webhook_uuid'], - $data['date_format'], + $data['date_format_id'], $data['trakt_user_name'], $data['trakt_client_id'], ); @@ -33,9 +33,9 @@ public function areCoreAccountChangesDisabled() : bool return $this->areCoreAccountChangesDisabled; } - public function getDateFormat() : ?string + public function getDateFormatId() : ?int { - return $this->dateFormat; + return $this->dateFormatId; } public function getId() : int diff --git a/src/Command/ProcessJobs.php b/src/Command/ProcessJobs.php index 733f885c..924c221d 100644 --- a/src/Command/ProcessJobs.php +++ b/src/Command/ProcessJobs.php @@ -5,29 +5,37 @@ use Movary\Worker; use Psr\Log\LoggerInterface; use Symfony\Component\Console\Input\InputInterface; +use Symfony\Component\Console\Input\InputOption; use Symfony\Component\Console\Output\OutputInterface; class ProcessJobs extends Command { + private const OPTION_NAME_MIN_RUNTIME = 'minRuntime'; + protected static $defaultName = 'jobs:process'; public function __construct( private readonly Worker\Repository $repository, private readonly Worker\Service $workerService, - private readonly LoggerInterface $logger + private readonly LoggerInterface $logger, + private readonly ?int $minRuntimeInSeconds = null ) { parent::__construct(); } protected function configure() : void { - $this->setDescription('Process job from the queue.'); + $this + ->setDescription('Process job from the queue.') + ->addOption(self::OPTION_NAME_MIN_RUNTIME, 'minRuntime', InputOption::VALUE_REQUIRED, 'Minimum runtime of command.'); } // phpcs:ignore Generic.CodeAnalysis.UnusedFunctionParameter protected function execute(InputInterface $input, OutputInterface $output) : int { - sleep(1); // For now to keep supervisor happy + $minRuntime = $input->getOption(self::OPTION_NAME_MIN_RUNTIME) ?? $this->minRuntimeInSeconds; + + $timeStart = microtime(true); $this->generateOutput($output, 'Processing job...'); @@ -46,6 +54,15 @@ protected function execute(InputInterface $input, OutputInterface $output) : int $this->generateOutput($output, $processedMessage); + $missingTime = (int)$minRuntime - (microtime(true) - $timeStart); + if ($missingTime > 0) { + $waitTime = max((int)ceil($missingTime * 1000000), 0); + + $this->generateOutput($output, 'Sleeping for ' . $waitTime / 1000000 . ' seconds to reach min runtime...'); + + usleep($waitTime); + } + return Command::SUCCESS; } diff --git a/src/Factory.php b/src/Factory.php index ac9136ae..94611138 100644 --- a/src/Factory.php +++ b/src/Factory.php @@ -169,4 +169,20 @@ public static function createTwigFilesystemLoader() : Twig\Loader\FilesystemLoad { return new Twig\Loader\FilesystemLoader(__DIR__ . '/../templates'); } + + public function createProcessJobCommand(ContainerInterface $container, Config $config) : Command\ProcessJobs + { + try { + $minRuntimeInSeconds = $config->getAsInt('MIN_RUNTIME_IN_SECONDS_FOR_JOB_PROCESSING'); + } catch (\OutOfBoundsException) { + $minRuntimeInSeconds = null; + } + + return new Command\ProcessJobs( + $container->get(Worker\Repository::class), + $container->get(Worker\Service::class), + $container->get(LoggerInterface::class), + $minRuntimeInSeconds, + ); + } } diff --git a/src/HttpController/JobController.php b/src/HttpController/JobController.php new file mode 100644 index 00000000..1daa9fe2 --- /dev/null +++ b/src/HttpController/JobController.php @@ -0,0 +1,52 @@ +authenticationService->isUserAuthenticated() === false) { + return Response::createFoundRedirect('/'); + } + + $this->workerService->addTraktHistorySyncJob($this->authenticationService->getCurrentUserId()); + + $_SESSION['scheduledTraktHistorySync'] = true; + + return Response::create( + StatusCode::createSeeOther(), + null, + [Header::createLocation($_SERVER['HTTP_REFERER'])] + ); + } + + public function scheduleTraktRatingsSync() : Response + { + if ($this->authenticationService->isUserAuthenticated() === false) { + return Response::createFoundRedirect('/'); + } + + $this->workerService->addTraktRatingsSyncJob($this->authenticationService->getCurrentUserId()); + + $_SESSION['scheduledTraktRatingsSync'] = true; + + return Response::create( + StatusCode::createSeeOther(), + null, + [Header::createLocation($_SERVER['HTTP_REFERER'])] + ); + } +} diff --git a/src/HttpController/SettingsController.php b/src/HttpController/SettingsController.php index 0a303aa4..113d6b92 100644 --- a/src/HttpController/SettingsController.php +++ b/src/HttpController/SettingsController.php @@ -105,6 +105,8 @@ public function render() : Response $deletedUserHistory = empty($_SESSION['deletedUserHistory']) === false ? $_SESSION['deletedUserHistory'] : null; $deletedUserRatings = empty($_SESSION['deletedUserRatings']) === false ? $_SESSION['deletedUserRatings'] : null; $dateFormatUpdated = empty($_SESSION['dateFormatUpdated']) === false ? $_SESSION['dateFormatUpdated'] : null; + $scheduledTraktHistorySync = empty($_SESSION['scheduledTraktHistorySync']) === false ? $_SESSION['scheduledTraktHistorySync'] : null; + $scheduledTraktRatingsSync = empty($_SESSION['scheduledTraktRatingsSync']) === false ? $_SESSION['scheduledTraktRatingsSync'] : null; unset( $_SESSION['passwordUpdated'], $_SESSION['passwordErrorCurrentInvalid'], @@ -117,6 +119,8 @@ public function render() : Response $_SESSION['deletedUserHistory'], $_SESSION['deletedUserRatings'], $_SESSION['dateFormatUpdated'], + $_SESSION['scheduledTraktHistorySync'], + $_SESSION['scheduledTraktRatingsSync'], ); $user = $this->userApi->fetchUser($userId); @@ -126,13 +130,15 @@ public function render() : Response $this->twig->render('page/settings.html.twig', [ 'coreAccountChangesDisabled' => $user->areCoreAccountChangesDisabled(), 'dateFormats' => DateFormat::getFormats(), - 'dateFormatSelected' => $user->getDateFormat(), + 'dateFormatSelected' => $user->getDateFormatId(), 'dateFormatUpdated' => $dateFormatUpdated, 'plexWebhookUrl' => $user->getPlexWebhookId() ?? '-', 'passwordErrorNotEqual' => $passwordErrorNotEqual, 'passwordErrorMinLength' => $passwordErrorMinLength, 'passwordErrorCurrentInvalid' => $passwordErrorCurrentInvalid, 'traktCredentialsUpdated' => $traktCredentialsUpdated, + 'traktScheduleHistorySyncSuccessful' => $scheduledTraktHistorySync, + 'traktScheduleRatingsSyncSuccessful' => $scheduledTraktRatingsSync, 'importHistorySuccessful' => $importHistorySuccessful, 'importRatingsSuccessful' => $importRatingsSuccessful, 'passwordUpdated' => $passwordUpdated, diff --git a/templates/page/settings.html.twig b/templates/page/settings.html.twig index 61f281f3..d34074b1 100644 --- a/templates/page/settings.html.twig +++ b/templates/page/settings.html.twig @@ -119,6 +119,18 @@ + +
+ Schedule history sync + Schedule ratings sync + + {% if traktScheduleHistorySyncSuccessful == true or traktScheduleRatingsSyncSuccessful == true %} + + {% endif %} +

From fa7f6905d861bc95113f1ac4312c9f2cef368e3e Mon Sep 17 00:00:00 2001 From: Lee Peuker Date: Tue, 19 Jul 2022 22:08:09 +0200 Subject: [PATCH 3/6] Cleanup --- .dockerignore | 1 + .env.development.example | 2 +- .env.production.example | 2 +- build/php/Dockerfile | 2 +- {build/php/supervisor.d => settings/supervisor}/movary.conf | 0 5 files changed, 4 insertions(+), 3 deletions(-) rename {build/php/supervisor.d => settings/supervisor}/movary.conf (100%) diff --git a/.dockerignore b/.dockerignore index a25e8b14..020f11c3 100644 --- a/.dockerignore +++ b/.dockerignore @@ -1,2 +1,3 @@ +build/ tests/ tmp/* diff --git a/.env.development.example b/.env.development.example index 84566e41..77d46b82 100644 --- a/.env.development.example +++ b/.env.development.example @@ -3,7 +3,7 @@ ENV=development USER_ID=1000 HTTP_PORT=80 TIMEZONE="Europe/Berlin" -MIN_RUNTIME_IN_SECONDS_FOR_JOB_PROCESSING=10 +MIN_RUNTIME_IN_SECONDS_FOR_JOB_PROCESSING=15 # Database DATABASE_HOST="mysql" diff --git a/.env.production.example b/.env.production.example index 30fdaa3e..bad5e1d8 100644 --- a/.env.production.example +++ b/.env.production.example @@ -1,7 +1,7 @@ # Enviroment ENV=production TIMEZONE="Europe/Berlin" -MIN_RUNTIME_IN_SECONDS_FOR_JOB_PROCESSING=10 +MIN_RUNTIME_IN_SECONDS_FOR_JOB_PROCESSING=15 # Database DATABASE_HOST= diff --git a/build/php/Dockerfile b/build/php/Dockerfile index 46bafa24..e8824577 100644 --- a/build/php/Dockerfile +++ b/build/php/Dockerfile @@ -9,5 +9,5 @@ ARG APPLICATION_VERSION ENV APPLICATION_VERSION=${APPLICATION_VERSION} COPY --chown=application ./ ./ COPY .env.production.example .env -COPY build/php/supervisor.d/movary.conf /opt/docker/etc/supervisor.d/movary.conf +COPY settings/supervisor/movary.conf /opt/docker/etc/supervisor.d/movary.conf RUN composer install --no-dev diff --git a/build/php/supervisor.d/movary.conf b/settings/supervisor/movary.conf similarity index 100% rename from build/php/supervisor.d/movary.conf rename to settings/supervisor/movary.conf From ce2cd63ac6a47c792c43e79b06c601824fd7a882 Mon Sep 17 00:00:00 2001 From: Lee Peuker Date: Wed, 20 Jul 2022 16:48:36 +0200 Subject: [PATCH 4/6] Disable sync buttons if trakt credentials are not complete --- templates/page/settings.html.twig | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/templates/page/settings.html.twig b/templates/page/settings.html.twig index d34074b1..d39b835a 100644 --- a/templates/page/settings.html.twig +++ b/templates/page/settings.html.twig @@ -121,8 +121,8 @@
- Schedule history sync - Schedule ratings sync + Schedule history sync + Schedule ratings sync {% if traktScheduleHistorySyncSuccessful == true or traktScheduleRatingsSyncSuccessful == true %} {% endif %} - +
- Schedule history sync - Schedule ratings sync + Schedule history sync + Schedule ratings sync {% if traktScheduleHistorySyncSuccessful == true or traktScheduleRatingsSyncSuccessful == true %}