Skip to content

Commit

Permalink
Add buffer + shuffle and configurable concurrency to `RemoveExpiredLi…
Browse files Browse the repository at this point in the history
…nkedDevicesCommand`
  • Loading branch information
eager-signal authored Dec 21, 2023
1 parent 5d6bea5 commit f33a2eb
Showing 1 changed file with 30 additions and 2 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -8,9 +8,13 @@
import static org.whispersystems.textsecuregcm.metrics.MetricsUtil.name;

import io.micrometer.core.instrument.Metrics;
import io.micrometer.shaded.reactor.util.function.Tuple2;
import io.micrometer.shaded.reactor.util.function.Tuples;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Set;
import java.util.function.Function;
import java.util.function.Predicate;
import java.util.stream.Collectors;
import net.sourceforge.argparse4j.inf.Subparser;
Expand All @@ -23,9 +27,12 @@

public class RemoveExpiredLinkedDevicesCommand extends AbstractSinglePassCrawlAccountsCommand {

private static final int MAX_CONCURRENCY = 16;
private static final int DEFAULT_MAX_CONCURRENCY = 16;

private static final String DRY_RUN_ARGUMENT = "dry-run";
private static final String MAX_CONCURRENCY_ARGUMENT = "max-concurrency";
private static final String BUFFER_ARGUMENT = "buffer";

private static final String REMOVED_DEVICES_COUNTER_NAME = name(RemoveExpiredLinkedDevicesCommand.class,
"removedDevices");
private static final String UPDATED_ACCOUNTS_COUNTER_NAME = name(RemoveExpiredLinkedDevicesCommand.class,
Expand All @@ -49,15 +56,36 @@ public void configure(final Subparser subparser) {
.required(false)
.setDefault(true)
.help("If true, don’t actually modify accounts with expired linked devices");

subparser.addArgument("--max-concurrency")
.type(Integer.class)
.dest(MAX_CONCURRENCY_ARGUMENT)
.setDefault(DEFAULT_MAX_CONCURRENCY)
.help("Max concurrency for DynamoDB operations");

subparser.addArgument("--buffer")
.type(Integer.class)
.dest(BUFFER_ARGUMENT)
.setDefault(16_384)
.help("Accounts to buffer");
}

@Override
protected void crawlAccounts(final Flux<Account> accounts) {

final boolean dryRun = getNamespace().getBoolean(DRY_RUN_ARGUMENT);
final int maxConcurrency = getNamespace().getInt(MAX_CONCURRENCY_ARGUMENT);
final int bufferSize = getNamespace().getInt(BUFFER_ARGUMENT);

accounts.map(a -> Tuples.of(a, getExpiredLinkedDeviceIds(a.getDevices())))
.filter(accountAndExpiredDevices -> !accountAndExpiredDevices.getT2().isEmpty())
.buffer(bufferSize)
.map(source -> {
final List<Tuple2<Account, Set<Byte>>> shuffled = new ArrayList<>(source);
Collections.shuffle(shuffled);
return shuffled;
})
.flatMapIterable(Function.identity())
.flatMap(accountAndExpiredDevices -> {
final Account account = accountAndExpiredDevices.getT1();
final Set<Byte> expiredDevices = accountAndExpiredDevices.getT2();
Expand All @@ -73,7 +101,7 @@ protected void crawlAccounts(final Flux<Account> accounts) {
return Mono.empty();
});

}, MAX_CONCURRENCY)
}, maxConcurrency)
.doOnNext(removedDevices -> {
Metrics.counter(REMOVED_DEVICES_COUNTER_NAME, "dryRun", String.valueOf(dryRun)).increment(removedDevices);
Metrics.counter(UPDATED_ACCOUNTS_COUNTER_NAME, "dryRun", String.valueOf(dryRun)).increment();
Expand Down

0 comments on commit f33a2eb

Please sign in to comment.