Skip to content

Commit

Permalink
Merge pull request #61 from leepeuker/add-job-queue-system
Browse files Browse the repository at this point in the history
Add job proccessing
  • Loading branch information
leepeuker authored Jul 20, 2022
2 parents 0637a7b + ca080c8 commit 70da2e3
Show file tree
Hide file tree
Showing 20 changed files with 462 additions and 38 deletions.
1 change: 1 addition & 0 deletions .env.development.example
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ ENV=development
USER_ID=1000
HTTP_PORT=80
TIMEZONE="Europe/Berlin"
MIN_RUNTIME_IN_SECONDS_FOR_JOB_PROCESSING=15

# Database
DATABASE_HOST="mysql"
Expand Down
1 change: 1 addition & 0 deletions .env.production.example
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
# Enviroment
ENV=production
TIMEZONE="Europe/Berlin"
MIN_RUNTIME_IN_SECONDS_FOR_JOB_PROCESSING=15

# Database
DATABASE_HOST=
Expand Down
3 changes: 3 additions & 0 deletions Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -82,6 +82,9 @@ app_sync_tmdb:
app_sync_letterboxd:
make exec_app_cmd CMD="php bin/console.php letterboxd:sync $(CSV_PATH)"

app_jobs_process:
make exec_app_cmd CMD="php bin/console.php jobs:process"

# Tests
#######
test: test_phpcs test_psalm test_phpstan
Expand Down
50 changes: 27 additions & 23 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -10,9 +10,9 @@ Demo installation can be found [here](https://movary-demo.leepeuker.dev/) (login
2. [Install via docker](#install-via-docker)
3. [Important: First steps](#important-first-steps)
4. [Features](#features)
1. [Plex Scrobbler](#plex-scrobbler)
2. [Trakt.tv Sync](#trakttv-sync)
3. [Tmdb Sync](#tmdb-sync)
1. [Tmdb Sync](#tmdb-sync)
2. [Plex Scrobbler](#plex-scrobbler)
3. [Trakt.tv Sync](#trakttv-sync)
5. [Development](#development)

<a name="#about"></a>
Expand Down Expand Up @@ -111,6 +111,10 @@ DATABASE_PASSWORD=
DATABASE_DRIVER=pdo_mysql
DATABASE_CHARSET=utf8

# Minimum number of seconds the job processing worker has to run
# => the smallest possible timeperiode between processing two jobs
MIN_RUNTIME_IN_SECONDS_FOR_JOB_PROCESSING=15

# https://www.themoviedb.org/settings/api
TMDB_API_KEY=

Expand All @@ -127,6 +131,26 @@ their [docs](https://dockerfile.readthedocs.io/en/latest/content/DockerImages/do
## Features
<a name="#tmdb-sync"></a>
### tmdb sync
Update movie (meta) data with themoviedb.org information.
Make sure you have added the variables `TMDB_API_KEY` to the environment.
Example:
`docker exec movary php bin/console.php tmdb:sync`
**Flags:**
- `--hours`
Only movies which were last synced X hours or longer ago will be synced
- `--threshold`
Maximum number of movies to sync
<a name="#development"></a>
<a name="#plex-scrobbler"></a>
### Plex Scrobbler
Expand Down Expand Up @@ -162,26 +186,6 @@ Example (syncing history and ratings for user with id 1):
- `--ignore-cache`
Use if you want to sync everything from trakt regardless if there was a change since the last sync.
<a name="#tmdb-sync"></a>
### tmdb sync
Update movie (meta) data with themoviedb.org information.
Make sure you have added the variables `TMDB_API_KEY` to the environment.
Example:
`docker exec movary php bin/console.php tmdb:sync`
**Flags:**
- `--hours`
Only movies which were last synced X hours or longer ago will be synced
- `--threshold`
Maximum number of movies to sync
<a name="#development"></a>
## Development
### Setup
Expand Down
1 change: 1 addition & 0 deletions bin/console.php
Original file line number Diff line number Diff line change
Expand Up @@ -13,5 +13,6 @@
$application->add($container->get(Movary\Command\UserDelete::class));
$application->add($container->get(Movary\Command\UserUpdate::class));
$application->add($container->get(Movary\Command\UserList::class));
$application->add($container->get(Movary\Command\ProcessJobs::class));

$application->run();
1 change: 1 addition & 0 deletions bootstrap.php
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@
\Movary\Command\DatabaseMigrationStatus::class => DI\factory([Factory::class, 'createDatabaseMigrationStatusCommand']),
\Movary\Command\DatabaseMigrationMigrate::class => DI\factory([Factory::class, 'createDatabaseMigrationMigrateCommand']),
\Movary\Command\DatabaseMigrationRollback::class => DI\factory([Factory::class, 'createDatabaseMigrationRollbackCommand']),
\Movary\Command\ProcessJobs::class => DI\factory([Factory::class, 'createProcessJobCommand']),
\Psr\Http\Client\ClientInterface::class => DI\factory([Factory::class, 'createHttpClient']),
\Psr\Log\LoggerInterface::class => DI\factory([Factory::class, 'createFileLogger']),
\Doctrine\DBAL\Connection::class => DI\factory([Factory::class, 'createDbConnection']),
Expand Down
1 change: 1 addition & 0 deletions build/php/Dockerfile
Original file line number Diff line number Diff line change
Expand Up @@ -9,4 +9,5 @@ ARG APPLICATION_VERSION
ENV APPLICATION_VERSION=${APPLICATION_VERSION}
COPY --chown=application ./ ./
COPY .env.production.example .env
COPY settings/supervisor/movary.conf /opt/docker/etc/supervisor.d/movary.conf
RUN composer install --no-dev
29 changes: 29 additions & 0 deletions db/migrations/20220719134322_AddJobQueueTable.php
Original file line number Diff line number Diff line change
@@ -0,0 +1,29 @@
<?php declare(strict_types=1);

use Phinx\Migration\AbstractMigration;

final class AddJobQueueTable extends AbstractMigration
{
public function down() : void
{
$this->execute(
<<<SQL
DROP TABLE `job_queue`
SQL
);
}

public function up() : void
{
$this->execute(
<<<SQL
CREATE TABLE `job_queue` (
`id` INT(10) UNSIGNED NOT NULL AUTO_INCREMENT,
`job` TEXT NOT NULL,
`created_at` DATETIME NOT NULL DEFAULT NOW(),
PRIMARY KEY (`id`)
) COLLATE="utf8mb4_unicode_ci" ENGINE=InnoDB
SQL
);
}
}
10 changes: 10 additions & 0 deletions settings/routes.php
Original file line number Diff line number Diff line change
Expand Up @@ -132,6 +132,16 @@
'/user/delete-account',
[\Movary\HttpController\SettingsController::class, 'deleteAccount']
);
$routeCollector->addRoute(
'GET',
'/jobs/schedule/trakt-history-sync',
[\Movary\HttpController\JobController::class, 'scheduleTraktHistorySync']
);
$routeCollector->addRoute(
'GET',
'/jobs/schedule/trakt-ratings-sync',
[\Movary\HttpController\JobController::class, 'scheduleTraktRatingsSync']
);
$routeCollector->addRoute(
'POST',
'/user/date-format',
Expand Down
12 changes: 12 additions & 0 deletions settings/supervisor/movary.conf
Original file line number Diff line number Diff line change
@@ -0,0 +1,12 @@
[program:movary]
command=/usr/local/bin/php /app/bin/console.php jobs:process
numprocs=1
user=application
autostart=true
autorestart=true
startsecs=1
startretries=10
stdout_logfile=/dev/stdout
stdout_logfile_maxbytes=0
stderr_logfile=/dev/stderr
stderr_logfile_maxbytes=0
2 changes: 1 addition & 1 deletion src/Application/Service/Tmdb/SyncMovies.php
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@ public function __construct(
) {
}

public function syncMovies(?int $maxAgeInHours, ?int $movieCountSyncThreshold) : void
public function syncMovies(?int $maxAgeInHours = null, ?int $movieCountSyncThreshold = null) : void
{
$movies = $this->movieApi->fetchAllOrderedByLastUpdatedAtTmdbAsc();

Expand Down
8 changes: 4 additions & 4 deletions src/Application/User/Entity.php
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@ private function __construct(
private readonly string $passwordHash,
private readonly bool $areCoreAccountChangesDisabled,
private readonly ?string $plexWebhookUuid,
private readonly ?string $dateFormat,
private readonly ?int $dateFormatId,
private readonly ?string $TraktUserName,
private readonly ?string $TraktClientId,
) {
Expand All @@ -22,7 +22,7 @@ public static function createFromArray(array $data) : self
$data['password'],
(bool)$data['core_account_changes_disabled'],
$data['plex_webhook_uuid'],
$data['date_format'],
$data['date_format_id'],
$data['trakt_user_name'],
$data['trakt_client_id'],
);
Expand All @@ -33,9 +33,9 @@ public function areCoreAccountChangesDisabled() : bool
return $this->areCoreAccountChangesDisabled;
}

public function getDateFormat() : ?string
public function getDateFormatId() : ?int
{
return $this->dateFormat;
return $this->dateFormatId;
}

public function getId() : int
Expand Down
82 changes: 82 additions & 0 deletions src/Command/ProcessJobs.php
Original file line number Diff line number Diff line change
@@ -0,0 +1,82 @@
<?php declare(strict_types=1);

namespace Movary\Command;

use Movary\Worker;
use Psr\Log\LoggerInterface;
use Symfony\Component\Console\Input\InputInterface;
use Symfony\Component\Console\Input\InputOption;
use Symfony\Component\Console\Output\OutputInterface;

class ProcessJobs extends Command
{
private const OPTION_NAME_MIN_RUNTIME = 'minRuntime';

protected static $defaultName = 'jobs:process';

public function __construct(
private readonly Worker\Repository $repository,
private readonly Worker\Service $workerService,
private readonly LoggerInterface $logger,
private readonly ?int $minRuntimeInSeconds = null
) {
parent::__construct();
}

protected function configure() : void
{
$this
->setDescription('Process job from the queue.')
->addOption(self::OPTION_NAME_MIN_RUNTIME, 'minRuntime', InputOption::VALUE_REQUIRED, 'Minimum runtime of command.');
}

// phpcs:ignore Generic.CodeAnalysis.UnusedFunctionParameter
protected function execute(InputInterface $input, OutputInterface $output) : int
{
$minRuntime = $input->getOption(self::OPTION_NAME_MIN_RUNTIME) ?? $this->minRuntimeInSeconds;

$timeStart = microtime(true);

$this->generateOutput($output, 'Processing job...');

try {
$processedJobType = $this->processJob();
} catch (\Exception $e) {
$this->logger->error('Could not process job.', ['exception' => $e]);

return Command::FAILURE;
}

$processedMessage = 'Nothing to process.';
if ($processedJobType !== null) {
$processedMessage = 'Processed job of type: ' . $processedJobType;
}

$this->generateOutput($output, $processedMessage);

$missingTime = (int)$minRuntime - (microtime(true) - $timeStart);
if ($missingTime > 0) {
$waitTime = max((int)ceil($missingTime * 1000000), 0);

$this->generateOutput($output, 'Sleeping for ' . $waitTime / 1000000 . ' seconds to reach min runtime...');

usleep($waitTime);
}

return Command::SUCCESS;
}

private function processJob() : ?string
{
$job = $this->repository->fetchOldestJob();

if ($job === null) {
return null;
}

$this->workerService->processJob($job);

/** @noinspection PhpUnreachableStatementInspection */
return $job->getType();
}
}
16 changes: 16 additions & 0 deletions src/Factory.php
Original file line number Diff line number Diff line change
Expand Up @@ -169,4 +169,20 @@ public static function createTwigFilesystemLoader() : Twig\Loader\FilesystemLoad
{
return new Twig\Loader\FilesystemLoader(__DIR__ . '/../templates');
}

public function createProcessJobCommand(ContainerInterface $container, Config $config) : Command\ProcessJobs
{
try {
$minRuntimeInSeconds = $config->getAsInt('MIN_RUNTIME_IN_SECONDS_FOR_JOB_PROCESSING');
} catch (\OutOfBoundsException) {
$minRuntimeInSeconds = null;
}

return new Command\ProcessJobs(
$container->get(Worker\Repository::class),
$container->get(Worker\Service::class),
$container->get(LoggerInterface::class),
$minRuntimeInSeconds,
);
}
}
52 changes: 52 additions & 0 deletions src/HttpController/JobController.php
Original file line number Diff line number Diff line change
@@ -0,0 +1,52 @@
<?php declare(strict_types=1);

namespace Movary\HttpController;

use Movary\Application\User\Service\Authentication;
use Movary\ValueObject\Http\Header;
use Movary\ValueObject\Http\Response;
use Movary\ValueObject\Http\StatusCode;
use Movary\Worker\Service;

class JobController
{
public function __construct(
private readonly Authentication $authenticationService,
private readonly Service $workerService
) {
}

public function scheduleTraktHistorySync() : Response
{
if ($this->authenticationService->isUserAuthenticated() === false) {
return Response::createFoundRedirect('/');
}

$this->workerService->addTraktHistorySyncJob($this->authenticationService->getCurrentUserId());

$_SESSION['scheduledTraktHistorySync'] = true;

return Response::create(
StatusCode::createSeeOther(),
null,
[Header::createLocation($_SERVER['HTTP_REFERER'])]
);
}

public function scheduleTraktRatingsSync() : Response
{
if ($this->authenticationService->isUserAuthenticated() === false) {
return Response::createFoundRedirect('/');
}

$this->workerService->addTraktRatingsSyncJob($this->authenticationService->getCurrentUserId());

$_SESSION['scheduledTraktRatingsSync'] = true;

return Response::create(
StatusCode::createSeeOther(),
null,
[Header::createLocation($_SERVER['HTTP_REFERER'])]
);
}
}
Loading

0 comments on commit 70da2e3

Please sign in to comment.