Skip to content

Commit

Permalink
added lock when updating actor via UpdateActorHandler (#620)
Browse files Browse the repository at this point in the history
while working on incoming activity, the UpdateActorMessage could be
fired more than once per actor, and even with #602 applied there could
still be multiple UpdateActorHandler running at the same time updating
the same actor, before the added guard could kick in

this adds a bit of locking in UpdateActorHandler per actor url to try
and ensure that only one handler is actually updating an actor at a
time, and discards any following update actor message that comes after

added the force parameter to UpdateActorMessage and handler to force
actor update regardless of #602 last updated time (still have to get
past the lock though)

also use symfony rate limiter to throttle auto dispatching the
UpdateActorMessage once per 5 minutes, allowing for redispatch recovery
if the previously dispatched message+handler falied to update the actor,
as it appears that multiple UpdateActorMessage will get dispatched when
the actor is eligible for updating e.g. last fetched over an hour ago

the `mbin:actor:update` command also got some jank in there fixed
as they also used UpdateActorMessage to do its job, and the force
parameter is also used here
  • Loading branch information
asdfzdfj authored May 22, 2024
1 parent 06df5f6 commit 4d53b84
Show file tree
Hide file tree
Showing 7 changed files with 78 additions and 31 deletions.
4 changes: 4 additions & 0 deletions config/packages/rate_limiter.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -116,3 +116,7 @@ framework:
policy: 'fixed_window'
limit: 4
interval: '1 day'
ap_update_actor:
policy: 'sliding_window'
limit: 1
interval: '5 minutes'
20 changes: 11 additions & 9 deletions src/Command/ActorUpdateCommand.php
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@

#[AsCommand(
name: 'mbin:actor:update',
description: 'This command will allow you to update remote user info.',
description: 'This command will allow you to update remote actor (user/magazine) info.',
)]
class ActorUpdateCommand extends Command
{
Expand All @@ -32,28 +32,30 @@ public function __construct(

protected function configure(): void
{
$this->addArgument('user', InputArgument::OPTIONAL, 'Argument description')
->addOption('users', null, InputOption::VALUE_NONE)
->addOption('magazines', null, InputOption::VALUE_NONE);
$this->addArgument('user', InputArgument::OPTIONAL, 'AP url of the actor to update')
->addOption('users', null, InputOption::VALUE_NONE, 'update *all* known users that needs updating')
->addOption('magazines', null, InputOption::VALUE_NONE, 'update *all* known magazines that needs updating')
->addOption('force', null, InputOption::VALUE_NONE, 'force actor update even if they are recently updated');
}

protected function execute(InputInterface $input, OutputInterface $output): int
{
$io = new SymfonyStyle($input, $output);
$userArg = $input->getArgument('user');
$force = (bool) $input->getOption('force');

if ($input->getOption('users')) {
if ($userArg) {
$this->bus->dispatch(new UpdateActorMessage($userArg, $force));
} elseif ($input->getOption('users')) {
foreach ($this->repository->findRemoteForUpdate() as $u) {
$this->bus->dispatch(new UpdateActorMessage($u->apProfileId));
$this->bus->dispatch(new UpdateActorMessage($u->apProfileId, $force));
$io->info($u->username);
}
} elseif ($input->getOption('magazines')) {
foreach ($this->magazineRepository->findRemoteForUpdate() as $u) {
$this->bus->dispatch(new UpdateActorMessage($u->apProfileId));
$this->bus->dispatch(new UpdateActorMessage($u->apProfileId, $force));
$io->info($u->name);
}
} elseif ($userArg) {
$this->bus->dispatch(new UpdateActorMessage($userArg));
}

$io->success('Done.');
Expand Down
2 changes: 1 addition & 1 deletion src/Message/ActivityPub/UpdateActorMessage.php
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@

class UpdateActorMessage implements ActivityPubResolveInterface
{
public function __construct(public string $actorUrl)
public function __construct(public string $actorUrl, public bool $force = false)
{
}
}
41 changes: 38 additions & 3 deletions src/MessageHandler/ActivityPub/UpdateActorHandler.php
Original file line number Diff line number Diff line change
Expand Up @@ -5,18 +5,53 @@
namespace App\MessageHandler\ActivityPub;

use App\Message\ActivityPub\UpdateActorMessage;
use App\Repository\MagazineRepository;
use App\Repository\UserRepository;
use App\Service\ActivityPubManager;
use Psr\Log\LoggerInterface;
use Symfony\Component\Lock\LockFactory;
use Symfony\Component\Messenger\Attribute\AsMessageHandler;

#[AsMessageHandler]
class UpdateActorHandler
{
public function __construct(private readonly ActivityPubManager $manager)
{
public function __construct(
private readonly ActivityPubManager $manager,
private readonly LockFactory $lockFactory,
private readonly UserRepository $userRepository,
private readonly MagazineRepository $magazineRepository,
private readonly LoggerInterface $logger,
) {
}

public function __invoke(UpdateActorMessage $message): void
{
$this->manager->updateActor($message->actorUrl);
$actorUrl = $message->actorUrl;
$lock = $this->lockFactory->createLock('update_actor_'.hash('sha256', $actorUrl), 60);

if (!$lock->acquire()) {
$this->logger->debug(
'not updating actor at {url}: ongoing actor update is already in progress',
['url' => $actorUrl]
);

return;
}

$actor = $this->userRepository->findOneBy(['apProfileId' => $actorUrl])
?? $this->magazineRepository->findOneBy(['apProfileId' => $actorUrl]);

if ($actor) {
if ($message->force || $actor->apFetchedAt < (new \DateTime())->modify('-1 hour')) {
$this->manager->updateActor($actorUrl);
} else {
$this->logger->debug('not updating actor {url}: last updated is recent: {fetched}', [
'url' => $actorUrl,
'fetched' => $actor->apFetchedAt,
]);
}
}

$lock->release();
}
}
1 change: 1 addition & 0 deletions src/Repository/MagazineRepository.php
Original file line number Diff line number Diff line change
Expand Up @@ -524,6 +524,7 @@ public function findRemoteForUpdate(): array
->andWhere('m.apDomain IS NULL')
->andWhere('m.apDeletedAt IS NULL')
->andWhere('m.apTimeoutAt IS NULL')
->addOrderBy('m.apFetchedAt', 'ASC')
->setMaxResults(1000)
->getQuery()
->getResult();
Expand Down
1 change: 1 addition & 0 deletions src/Repository/UserRepository.php
Original file line number Diff line number Diff line change
Expand Up @@ -349,6 +349,7 @@ public function findRemoteForUpdate(): array
->andWhere('u.apDomain IS NULL')
->andWhere('u.apDeletedAt IS NULL')
->andWhere('u.apTimeoutAt IS NULL')
->addOrderBy('u.apFetchedAt', 'ASC')
->setMaxResults(1000)
->getQuery()
->getResult();
Expand Down
40 changes: 22 additions & 18 deletions src/Service/ActivityPubManager.php
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,7 @@
use League\HTMLToMarkdown\HtmlConverter;
use Psr\Log\LoggerInterface;
use Symfony\Component\Messenger\MessageBusInterface;
use Symfony\Component\RateLimiter\RateLimiterFactory;
use Symfony\Component\Routing\Generator\UrlGeneratorInterface;

class ActivityPubManager
Expand All @@ -60,6 +61,7 @@ public function __construct(
private readonly UrlGeneratorInterface $urlGenerator,
private readonly MessageBusInterface $bus,
private readonly LoggerInterface $logger,
private readonly RateLimiterFactory $apUpdateActorLimiter,
) {
}

Expand Down Expand Up @@ -133,7 +135,7 @@ public function findActorOrCreate(?string $actorUrlOrHandle): null|User|Magazine
$user = $this->userRepository->findOneBy(['username' => ltrim($actorUrl, '@')]);
if ($user instanceof User) {
if ($user->apId && (!$user->apFetchedAt || $user->apFetchedAt->modify('+1 hour') < (new \DateTime()))) {
$this->bus->dispatch(new UpdateActorMessage($user->apProfileId));
$this->dispatchUpdateActor($user->apProfileId);
}

return $user;
Expand Down Expand Up @@ -166,10 +168,7 @@ public function findActorOrCreate(?string $actorUrlOrHandle): null|User|Magazine
$user = $this->createUser($actorUrl);
} else {
if (!$user->apFetchedAt || $user->apFetchedAt->modify('+1 hour') < (new \DateTime())) {
try {
$this->bus->dispatch(new UpdateActorMessage($user->apProfileId));
} catch (\Exception $e) {
}
$this->dispatchUpdateActor($user->apProfileId);
}
}

Expand All @@ -185,10 +184,7 @@ public function findActorOrCreate(?string $actorUrlOrHandle): null|User|Magazine
$magazine = $this->createMagazine($actorUrl);
} else {
if (!$magazine->apFetchedAt || $magazine->apFetchedAt->modify('+1 hour') < (new \DateTime())) {
try {
$this->bus->dispatch(new UpdateActorMessage($magazine->apProfileId));
} catch (\Exception $e) {
}
$this->dispatchUpdateActor($magazine->apProfileId);
}
}

Expand All @@ -212,6 +208,22 @@ public function findActorOrCreate(?string $actorUrlOrHandle): null|User|Magazine
return null;
}

public function dispatchUpdateActor(string $actorUrl)
{
$limiter = $this->apUpdateActorLimiter
->create($actorUrl)
->consume(1);

if ($limiter->isAccepted()) {
$this->bus->dispatch(new UpdateActorMessage($actorUrl));
} else {
$this->logger->debug(
'not dispatching updating actor for {actor}: one has been dispatched recently',
['actor' => $actorUrl, 'retry' => $limiter->getRetryAfter()]
);
}
}

/**
* Try to find an existing actor or create a new one if the actor doesn't yet exists.
*
Expand Down Expand Up @@ -291,10 +303,6 @@ public function updateUser(string $actorUrl): ?User
$this->logger->info('updating user {name}', ['name' => $actorUrl]);
$user = $this->userRepository->findOneBy(['apProfileId' => $actorUrl]);

if ($user instanceof User && $user->apFetchedAt > (new \DateTime())->modify('-1 hour')) {
return $user;
}

$actor = $this->apHttpClient->getActorObject($actorUrl);
if (!$actor || !\is_array($actor)) {
return null;
Expand Down Expand Up @@ -426,10 +434,6 @@ public function updateMagazine(string $actorUrl): ?Magazine
$this->logger->info('updating magazine "{magName}"', ['magName' => $actorUrl]);
$magazine = $this->magazineRepository->findOneBy(['apProfileId' => $actorUrl]);

if ($magazine instanceof Magazine && $magazine->apFetchedAt > (new \DateTime())->modify('-1 hour')) {
return $magazine;
}

$actor = $this->apHttpClient->getActorObject($actorUrl);
// Check if actor isn't empty (not set/null/empty array/etc.)

Expand Down Expand Up @@ -664,7 +668,7 @@ public function findOrCreateMagazineByToAndCC(array $object): Magazine|null
$potentialGroups = self::getReceivers($object);
$magazine = $this->magazineRepository->findByApGroupProfileId($potentialGroups);
if ($magazine and $magazine->apId && (!$magazine->apFetchedAt || $magazine->apFetchedAt->modify('+1 Day') < (new \DateTime()))) {
$this->bus->dispatch(new UpdateActorMessage($magazine->apProfileId));
$this->dispatchUpdateActor($magazine->apPublicUrl);
}

if (null === $magazine) {
Expand Down

0 comments on commit 4d53b84

Please sign in to comment.