From 4d53b842a1bd0d16f83b43a6b6bcfec52dbcd267 Mon Sep 17 00:00:00 2001 From: asdfzdfj Date: Wed, 22 May 2024 20:13:35 +0700 Subject: [PATCH] added lock when updating actor via UpdateActorHandler (#620) while working on incoming activity, the UpdateActorMessage could be fired more than once per actor, and even with #602 applied there could still be multiple UpdateActorHandler running at the same time updating the same actor, before the added guard could kick in this adds a bit of locking in UpdateActorHandler per actor url to try and ensure that only one handler is actually updating an actor at a time, and discards any following update actor message that comes after added the force parameter to UpdateActorMessage and handler to force actor update regardless of #602 last updated time (still have to get past the lock though) also use symfony rate limiter to throttle auto dispatching the UpdateActorMessage once per 5 minutes, allowing for redispatch recovery if the previously dispatched message+handler falied to update the actor, as it appears that multiple UpdateActorMessage will get dispatched when the actor is eligible for updating e.g. last fetched over an hour ago the `mbin:actor:update` command also got some jank in there fixed as they also used UpdateActorMessage to do its job, and the force parameter is also used here --- config/packages/rate_limiter.yaml | 4 ++ src/Command/ActorUpdateCommand.php | 20 +++++---- .../ActivityPub/UpdateActorMessage.php | 2 +- .../ActivityPub/UpdateActorHandler.php | 41 +++++++++++++++++-- src/Repository/MagazineRepository.php | 1 + src/Repository/UserRepository.php | 1 + src/Service/ActivityPubManager.php | 40 ++++++++++-------- 7 files changed, 78 insertions(+), 31 deletions(-) diff --git a/config/packages/rate_limiter.yaml b/config/packages/rate_limiter.yaml index 7401a8a7a..e5db56ecc 100644 --- a/config/packages/rate_limiter.yaml +++ b/config/packages/rate_limiter.yaml @@ -116,3 +116,7 @@ framework: policy: 'fixed_window' limit: 4 interval: '1 day' + ap_update_actor: + policy: 'sliding_window' + limit: 1 + interval: '5 minutes' diff --git a/src/Command/ActorUpdateCommand.php b/src/Command/ActorUpdateCommand.php index 8f3bb226e..f84c0cfb9 100644 --- a/src/Command/ActorUpdateCommand.php +++ b/src/Command/ActorUpdateCommand.php @@ -18,7 +18,7 @@ #[AsCommand( name: 'mbin:actor:update', - description: 'This command will allow you to update remote user info.', + description: 'This command will allow you to update remote actor (user/magazine) info.', )] class ActorUpdateCommand extends Command { @@ -32,28 +32,30 @@ public function __construct( protected function configure(): void { - $this->addArgument('user', InputArgument::OPTIONAL, 'Argument description') - ->addOption('users', null, InputOption::VALUE_NONE) - ->addOption('magazines', null, InputOption::VALUE_NONE); + $this->addArgument('user', InputArgument::OPTIONAL, 'AP url of the actor to update') + ->addOption('users', null, InputOption::VALUE_NONE, 'update *all* known users that needs updating') + ->addOption('magazines', null, InputOption::VALUE_NONE, 'update *all* known magazines that needs updating') + ->addOption('force', null, InputOption::VALUE_NONE, 'force actor update even if they are recently updated'); } protected function execute(InputInterface $input, OutputInterface $output): int { $io = new SymfonyStyle($input, $output); $userArg = $input->getArgument('user'); + $force = (bool) $input->getOption('force'); - if ($input->getOption('users')) { + if ($userArg) { + $this->bus->dispatch(new UpdateActorMessage($userArg, $force)); + } elseif ($input->getOption('users')) { foreach ($this->repository->findRemoteForUpdate() as $u) { - $this->bus->dispatch(new UpdateActorMessage($u->apProfileId)); + $this->bus->dispatch(new UpdateActorMessage($u->apProfileId, $force)); $io->info($u->username); } } elseif ($input->getOption('magazines')) { foreach ($this->magazineRepository->findRemoteForUpdate() as $u) { - $this->bus->dispatch(new UpdateActorMessage($u->apProfileId)); + $this->bus->dispatch(new UpdateActorMessage($u->apProfileId, $force)); $io->info($u->name); } - } elseif ($userArg) { - $this->bus->dispatch(new UpdateActorMessage($userArg)); } $io->success('Done.'); diff --git a/src/Message/ActivityPub/UpdateActorMessage.php b/src/Message/ActivityPub/UpdateActorMessage.php index b76bc73a1..26c9e5946 100644 --- a/src/Message/ActivityPub/UpdateActorMessage.php +++ b/src/Message/ActivityPub/UpdateActorMessage.php @@ -8,7 +8,7 @@ class UpdateActorMessage implements ActivityPubResolveInterface { - public function __construct(public string $actorUrl) + public function __construct(public string $actorUrl, public bool $force = false) { } } diff --git a/src/MessageHandler/ActivityPub/UpdateActorHandler.php b/src/MessageHandler/ActivityPub/UpdateActorHandler.php index f18cb6000..c13fb530d 100644 --- a/src/MessageHandler/ActivityPub/UpdateActorHandler.php +++ b/src/MessageHandler/ActivityPub/UpdateActorHandler.php @@ -5,18 +5,53 @@ namespace App\MessageHandler\ActivityPub; use App\Message\ActivityPub\UpdateActorMessage; +use App\Repository\MagazineRepository; +use App\Repository\UserRepository; use App\Service\ActivityPubManager; +use Psr\Log\LoggerInterface; +use Symfony\Component\Lock\LockFactory; use Symfony\Component\Messenger\Attribute\AsMessageHandler; #[AsMessageHandler] class UpdateActorHandler { - public function __construct(private readonly ActivityPubManager $manager) - { + public function __construct( + private readonly ActivityPubManager $manager, + private readonly LockFactory $lockFactory, + private readonly UserRepository $userRepository, + private readonly MagazineRepository $magazineRepository, + private readonly LoggerInterface $logger, + ) { } public function __invoke(UpdateActorMessage $message): void { - $this->manager->updateActor($message->actorUrl); + $actorUrl = $message->actorUrl; + $lock = $this->lockFactory->createLock('update_actor_'.hash('sha256', $actorUrl), 60); + + if (!$lock->acquire()) { + $this->logger->debug( + 'not updating actor at {url}: ongoing actor update is already in progress', + ['url' => $actorUrl] + ); + + return; + } + + $actor = $this->userRepository->findOneBy(['apProfileId' => $actorUrl]) + ?? $this->magazineRepository->findOneBy(['apProfileId' => $actorUrl]); + + if ($actor) { + if ($message->force || $actor->apFetchedAt < (new \DateTime())->modify('-1 hour')) { + $this->manager->updateActor($actorUrl); + } else { + $this->logger->debug('not updating actor {url}: last updated is recent: {fetched}', [ + 'url' => $actorUrl, + 'fetched' => $actor->apFetchedAt, + ]); + } + } + + $lock->release(); } } diff --git a/src/Repository/MagazineRepository.php b/src/Repository/MagazineRepository.php index 7c62c4fb2..d2763ff31 100644 --- a/src/Repository/MagazineRepository.php +++ b/src/Repository/MagazineRepository.php @@ -524,6 +524,7 @@ public function findRemoteForUpdate(): array ->andWhere('m.apDomain IS NULL') ->andWhere('m.apDeletedAt IS NULL') ->andWhere('m.apTimeoutAt IS NULL') + ->addOrderBy('m.apFetchedAt', 'ASC') ->setMaxResults(1000) ->getQuery() ->getResult(); diff --git a/src/Repository/UserRepository.php b/src/Repository/UserRepository.php index 65f9ceb53..56f7b8e49 100644 --- a/src/Repository/UserRepository.php +++ b/src/Repository/UserRepository.php @@ -349,6 +349,7 @@ public function findRemoteForUpdate(): array ->andWhere('u.apDomain IS NULL') ->andWhere('u.apDeletedAt IS NULL') ->andWhere('u.apTimeoutAt IS NULL') + ->addOrderBy('u.apFetchedAt', 'ASC') ->setMaxResults(1000) ->getQuery() ->getResult(); diff --git a/src/Service/ActivityPubManager.php b/src/Service/ActivityPubManager.php index 91bc49f56..a15eb4495 100644 --- a/src/Service/ActivityPubManager.php +++ b/src/Service/ActivityPubManager.php @@ -37,6 +37,7 @@ use League\HTMLToMarkdown\HtmlConverter; use Psr\Log\LoggerInterface; use Symfony\Component\Messenger\MessageBusInterface; +use Symfony\Component\RateLimiter\RateLimiterFactory; use Symfony\Component\Routing\Generator\UrlGeneratorInterface; class ActivityPubManager @@ -60,6 +61,7 @@ public function __construct( private readonly UrlGeneratorInterface $urlGenerator, private readonly MessageBusInterface $bus, private readonly LoggerInterface $logger, + private readonly RateLimiterFactory $apUpdateActorLimiter, ) { } @@ -133,7 +135,7 @@ public function findActorOrCreate(?string $actorUrlOrHandle): null|User|Magazine $user = $this->userRepository->findOneBy(['username' => ltrim($actorUrl, '@')]); if ($user instanceof User) { if ($user->apId && (!$user->apFetchedAt || $user->apFetchedAt->modify('+1 hour') < (new \DateTime()))) { - $this->bus->dispatch(new UpdateActorMessage($user->apProfileId)); + $this->dispatchUpdateActor($user->apProfileId); } return $user; @@ -166,10 +168,7 @@ public function findActorOrCreate(?string $actorUrlOrHandle): null|User|Magazine $user = $this->createUser($actorUrl); } else { if (!$user->apFetchedAt || $user->apFetchedAt->modify('+1 hour') < (new \DateTime())) { - try { - $this->bus->dispatch(new UpdateActorMessage($user->apProfileId)); - } catch (\Exception $e) { - } + $this->dispatchUpdateActor($user->apProfileId); } } @@ -185,10 +184,7 @@ public function findActorOrCreate(?string $actorUrlOrHandle): null|User|Magazine $magazine = $this->createMagazine($actorUrl); } else { if (!$magazine->apFetchedAt || $magazine->apFetchedAt->modify('+1 hour') < (new \DateTime())) { - try { - $this->bus->dispatch(new UpdateActorMessage($magazine->apProfileId)); - } catch (\Exception $e) { - } + $this->dispatchUpdateActor($magazine->apProfileId); } } @@ -212,6 +208,22 @@ public function findActorOrCreate(?string $actorUrlOrHandle): null|User|Magazine return null; } + public function dispatchUpdateActor(string $actorUrl) + { + $limiter = $this->apUpdateActorLimiter + ->create($actorUrl) + ->consume(1); + + if ($limiter->isAccepted()) { + $this->bus->dispatch(new UpdateActorMessage($actorUrl)); + } else { + $this->logger->debug( + 'not dispatching updating actor for {actor}: one has been dispatched recently', + ['actor' => $actorUrl, 'retry' => $limiter->getRetryAfter()] + ); + } + } + /** * Try to find an existing actor or create a new one if the actor doesn't yet exists. * @@ -291,10 +303,6 @@ public function updateUser(string $actorUrl): ?User $this->logger->info('updating user {name}', ['name' => $actorUrl]); $user = $this->userRepository->findOneBy(['apProfileId' => $actorUrl]); - if ($user instanceof User && $user->apFetchedAt > (new \DateTime())->modify('-1 hour')) { - return $user; - } - $actor = $this->apHttpClient->getActorObject($actorUrl); if (!$actor || !\is_array($actor)) { return null; @@ -426,10 +434,6 @@ public function updateMagazine(string $actorUrl): ?Magazine $this->logger->info('updating magazine "{magName}"', ['magName' => $actorUrl]); $magazine = $this->magazineRepository->findOneBy(['apProfileId' => $actorUrl]); - if ($magazine instanceof Magazine && $magazine->apFetchedAt > (new \DateTime())->modify('-1 hour')) { - return $magazine; - } - $actor = $this->apHttpClient->getActorObject($actorUrl); // Check if actor isn't empty (not set/null/empty array/etc.) @@ -664,7 +668,7 @@ public function findOrCreateMagazineByToAndCC(array $object): Magazine|null $potentialGroups = self::getReceivers($object); $magazine = $this->magazineRepository->findByApGroupProfileId($potentialGroups); if ($magazine and $magazine->apId && (!$magazine->apFetchedAt || $magazine->apFetchedAt->modify('+1 Day') < (new \DateTime()))) { - $this->bus->dispatch(new UpdateActorMessage($magazine->apProfileId)); + $this->dispatchUpdateActor($magazine->apPublicUrl); } if (null === $magazine) {