Skip to content

Commit

Permalink
Merge pull request #96 from leepeuker/improve-job-processing
Browse files Browse the repository at this point in the history
Update job processing and add job queue page
  • Loading branch information
leepeuker authored Aug 15, 2022
2 parents 6aca463 + 436eee7 commit fc9d883
Show file tree
Hide file tree
Showing 17 changed files with 483 additions and 150 deletions.
51 changes: 51 additions & 0 deletions db/migrations/20220815113051_UpdateJobQueueTable.php
Original file line number Diff line number Diff line change
@@ -0,0 +1,51 @@
<?php declare(strict_types=1);

use Phinx\Migration\AbstractMigration;

final class UpdateJobQueueTable extends AbstractMigration
{
public function down() : void
{
$this->execute(
<<<SQL
DROP TABLE `job_queue`
SQL
);

$this->execute(
<<<SQL
CREATE TABLE `job_queue` (
`id` INT(10) UNSIGNED NOT NULL AUTO_INCREMENT,
`job` TEXT NOT NULL,
`created_at` DATETIME NOT NULL DEFAULT NOW(),
PRIMARY KEY (`id`)
) COLLATE="utf8mb4_unicode_ci" ENGINE=InnoDB
SQL
);
}

public function up() : void
{
$this->execute(
<<<SQL
DROP TABLE `job_queue`
SQL
);

$this->execute(
<<<SQL
CREATE TABLE `job_queue` (
`id` INT(10) UNSIGNED NOT NULL AUTO_INCREMENT,
`job_type` TEXT NOT NULL,
`job_status` TEXT NOT NULL,
`user_id` INT(10) UNSIGNED DEFAULT NULL,
`parameters` TEXT DEFAULT NULL,
`updated_at` DATETIME DEFAULT NULL ON UPDATE NOW(),
`created_at` DATETIME NOT NULL DEFAULT NOW(),
PRIMARY KEY (`id`),
FOREIGN KEY (`user_id`) REFERENCES user (`id`)
) COLLATE="utf8mb4_unicode_ci" ENGINE=InnoDB
SQL
);
}
}
5 changes: 5 additions & 0 deletions settings/routes.php
Original file line number Diff line number Diff line change
Expand Up @@ -77,6 +77,11 @@
'/{username:.+}/most-watched-directors',
[\Movary\HttpController\MostWatchedDirectorsController::class, 'renderPage']
);
$routeCollector->addRoute(
'GET',
'/job-queue',
[\Movary\HttpController\JobController::class, 'renderQueuePage']
);
$routeCollector->addRoute(
'GET',
'/log-movie',
Expand Down
13 changes: 12 additions & 1 deletion src/Application/Service/Letterboxd/ImportHistory.php
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@
use Movary\Application\Service\Letterboxd\ValueObject\CsvLineHistory;
use Movary\Application\Service\Tmdb;
use Movary\Application\Service\Trakt\PlaysPerDateDtoList;
use Movary\ValueObject\Job;
use Psr\Log\LoggerInterface;

class ImportHistory
Expand Down Expand Up @@ -40,7 +41,7 @@ public function execute(int $userId, string $historyCsvPath, bool $overwriteExis
$watchDatesToImport[$movie->getId()] = PlaysPerDateDtoList::create();
}

$watchDatesToImport[$movie->getId()]->incrementPlaysForDate($csvLineHistory->getDate ());
$watchDatesToImport[$movie->getId()]->incrementPlaysForDate($csvLineHistory->getDate());
}

foreach ($watchDates->getRecords() as $watchDate) {
Expand All @@ -67,6 +68,16 @@ public function execute(int $userId, string $historyCsvPath, bool $overwriteExis
unlink($historyCsvPath);
}

public function executeJob(Job $job) : void
{
$userId = $job->getUserId();
if ($userId === null) {
throw new \RuntimeException('Missing userId');
}

$this->execute($userId, $job->getParameters()['importFile']);
}

public function fetchMovieByLetterboxdUri(string $letterboxdUri) : Movie\Entity
{
$letterboxdId = basename($letterboxdUri);
Expand Down
11 changes: 11 additions & 0 deletions src/Application/Service/Letterboxd/ImportRatings.php
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@
use Movary\Application\Movie;
use Movary\Application\Service\Letterboxd\ValueObject\CsvLineRating;
use Movary\Application\SyncLog;
use Movary\ValueObject\Job;
use Movary\ValueObject\PersonalRating;
use Psr\Log\LoggerInterface;

Expand Down Expand Up @@ -62,6 +63,16 @@ public function execute(int $userId, string $ratingsCsvPath, bool $verbose = fal
unlink($ratingsCsvPath);
}

public function executeJob(Job $job) : void
{
$userId = $job->getUserId();
if ($userId === null) {
throw new \RuntimeException('Missing userId');
}

$this->execute($userId, $job->getParameters()['importFile']);
}

public function findMovieByLetterboxdUri(string $letterboxdUri) : ?Movie\Entity
{
$letterboxdId = basename($letterboxdUri);
Expand Down
11 changes: 11 additions & 0 deletions src/Application/Service/Trakt/SyncRatings.php
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@
use Movary\Application;
use Movary\Application\Service\Trakt\Exception\TraktClientIdNotSet;
use Movary\Application\Service\Trakt\Exception\TraktUserNameNotSet;
use Movary\ValueObject\Job;
use Movary\ValueObject\PersonalRating;

class SyncRatings
Expand Down Expand Up @@ -55,4 +56,14 @@ public function execute(int $userId, bool $overwriteExistingData = false) : void

$this->scanLogRepository->insertLogForTraktSync();
}

public function executeJob(Job $job) : void
{
$userId = $job->getUserId();
if ($userId === null) {
throw new \RuntimeException('Missing userId');
}

$this->execute($userId);
}
}
11 changes: 11 additions & 0 deletions src/Application/Service/Trakt/SyncWatchedMovies.php
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@
use Movary\Application\Service\Trakt\Exception\TraktClientIdNotSet;
use Movary\Application\Service\Trakt\Exception\TraktUserNameNotSet;
use Movary\ValueObject\Date;
use Movary\ValueObject\Job;
use Psr\Log\LoggerInterface;

class SyncWatchedMovies
Expand Down Expand Up @@ -67,6 +68,16 @@ public function execute(int $userId, bool $overwriteExistingData = false, bool $
$this->scanLogRepository->insertLogForTraktSync();
}

public function executeJob(Job $job) : void
{
$userId = $job->getUserId();
if ($userId === null) {
throw new \RuntimeException('Missing userId');
}

$this->execute($userId);
}

private function findOrCreateMovieLocally(Api\Trakt\ValueObject\Movie\Dto $watchedMovie) : Application\Movie\Entity
{
$traktId = $watchedMovie->getTraktId();
Expand Down
19 changes: 15 additions & 4 deletions src/Command/ProcessJobs.php
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,8 @@

namespace Movary\Command;

use Movary\ValueObject\JobStatus;
use Movary\ValueObject\JobType;
use Movary\Worker;
use Psr\Log\LoggerInterface;
use Symfony\Component\Console\Input\InputInterface;
Expand Down Expand Up @@ -66,17 +68,26 @@ protected function execute(InputInterface $input, OutputInterface $output) : int
return Command::SUCCESS;
}

private function processJob() : ?string
private function processJob() : ?JobType
{
$job = $this->repository->fetchOldestJob();
$job = $this->repository->fetchOldestWaitingJob();

if ($job === null) {
return null;
}

$this->workerService->processJob($job);
try {
$this->workerService->setJobToInProgress($job->getId());

$this->workerService->processJob($job);

$this->repository->updateJobStatus($job->getId(), JobStatus::createDone());
} catch (\Exception $e) {
$this->repository->updateJobStatus($job->getId(), JobStatus::createFailed());

throw $e;
}

/** @noinspection PhpUnreachableStatementInspection */
return $job->getType();
}
}
23 changes: 21 additions & 2 deletions src/HttpController/JobController.php
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@
use Movary\ValueObject\Http\Response;
use Movary\ValueObject\Http\StatusCode;
use Movary\Worker\Service;
use Twig\Environment;

class JobController
{
Expand All @@ -18,9 +19,27 @@ public function __construct(
private readonly Service $workerService,
private readonly ImportHistoryFileValidator $letterboxdImportHistoryFileValidator,
private readonly ImportRatingsFileValidator $letterboxdImportRatingsFileValidator,
private readonly Environment $twig,
) {
}

public function renderQueuePage() : Response
{
if ($this->authenticationService->isUserAuthenticated() === false) {
return Response::createFoundRedirect('/');
}

$jobs = $this->workerService->fetchJobsForStatusPage($this->authenticationService->getCurrentUserId());

return Response::create(
StatusCode::createOk(),
$this->twig->render(
'page/job-queue.html.twig',
['jobs' => $jobs]
),
);
}

public function scheduleLetterboxdHistoryImport(Request $request) : Response
{
if ($this->authenticationService->isUserAuthenticated() === false) {
Expand Down Expand Up @@ -109,7 +128,7 @@ public function scheduleTraktHistorySync() : Response
return Response::createFoundRedirect('/');
}

$this->workerService->addTraktHistorySyncJob($this->authenticationService->getCurrentUserId());
$this->workerService->addTraktImportHistoryJob($this->authenticationService->getCurrentUserId());

$_SESSION['scheduledTraktHistorySync'] = true;

Expand All @@ -126,7 +145,7 @@ public function scheduleTraktRatingsSync() : Response
return Response::createFoundRedirect('/');
}

$this->workerService->addTraktRatingsSyncJob($this->authenticationService->getCurrentUserId());
$this->workerService->addTraktImportRatingsJob($this->authenticationService->getCurrentUserId());

$_SESSION['scheduledTraktRatingsSync'] = true;

Expand Down
67 changes: 67 additions & 0 deletions src/ValueObject/Job.php
Original file line number Diff line number Diff line change
@@ -0,0 +1,67 @@
<?php declare(strict_types=1);

namespace Movary\ValueObject;

use Movary\Util\Json;

class Job
{
private function __construct(
private readonly int $id,
private readonly JobType $type,
private readonly JobStatus $status,
private readonly ?int $userId,
private readonly array $parameters,
private readonly ?DateTime $updatedAt,
private readonly DateTime $createdAt,
) {
}

public static function createFromArray(array $data) : self
{
return new self(
$data['id'],
JobType::createFromString($data['job_type']),
JobStatus::createFromString($data['job_status']),
$data['user_id'],
$data['parameters'] === null ? [] : Json::decode($data['parameters']),
$data['updated_at'] === null ? null : DateTime::createFromString($data['updated_at']),
DateTime::createFromString($data['created_at']),
);
}

public function getCreatedAt() : DateTime
{
return $this->createdAt;
}

public function getId() : int
{
return $this->id;
}

public function getParameters() : array
{
return $this->parameters;
}

public function getStatus() : JobStatus
{
return $this->status;
}

public function getType() : JobType
{
return $this->type;
}

public function getUpdatedAt() : ?DateTime
{
return $this->updatedAt;
}

public function getUserId() : ?int
{
return $this->userId;
}
}
33 changes: 33 additions & 0 deletions src/ValueObject/JobList.php
Original file line number Diff line number Diff line change
@@ -0,0 +1,33 @@
<?php declare(strict_types=1);

namespace Movary\ValueObject;

use Movary\AbstractList;

/**
* @method Job[] getIterator()
* @psalm-suppress ImplementedReturnTypeMismatch
*/
class JobList extends AbstractList
{
public static function create() : self
{
return new self();
}

public static function createFromArray(array $data) : self
{
$list = new self();

foreach ($data as $movie) {
$list->add(Job::createFromArray($movie));
}

return $list;
}

private function add(Job $dto) : void
{
$this->data[] = $dto;
}
}
Loading

0 comments on commit fc9d883

Please sign in to comment.