Skip to content

Commit cdb651b

Browse files
authoredApr 28, 2025··
Add commands for removing devices without PQ keys
1 parent 91a36f4 commit cdb651b

9 files changed

+837
-0
lines changed
 

‎service/src/main/java/org/whispersystems/textsecuregcm/WhisperServerService.java

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -266,13 +266,16 @@
266266
import org.whispersystems.textsecuregcm.workers.CheckDynamicConfigurationCommand;
267267
import org.whispersystems.textsecuregcm.workers.DeleteUserCommand;
268268
import org.whispersystems.textsecuregcm.workers.IdleDeviceNotificationSchedulerFactory;
269+
import org.whispersystems.textsecuregcm.workers.LockAccountsWithoutPqKeysCommand;
269270
import org.whispersystems.textsecuregcm.workers.MessagePersisterServiceCommand;
270271
import org.whispersystems.textsecuregcm.workers.NotifyIdleDevicesCommand;
271272
import org.whispersystems.textsecuregcm.workers.ProcessScheduledJobsServiceCommand;
273+
import org.whispersystems.textsecuregcm.workers.RemoveAccountsWithoutPqKeysCommand;
272274
import org.whispersystems.textsecuregcm.workers.RemoveExpiredAccountsCommand;
273275
import org.whispersystems.textsecuregcm.workers.RemoveExpiredBackupsCommand;
274276
import org.whispersystems.textsecuregcm.workers.RemoveExpiredLinkedDevicesCommand;
275277
import org.whispersystems.textsecuregcm.workers.RemoveExpiredUsernameHoldsCommand;
278+
import org.whispersystems.textsecuregcm.workers.RemoveLinkedDevicesWithoutPqKeysCommand;
276279
import org.whispersystems.textsecuregcm.workers.ScheduledApnPushNotificationSenderServiceCommand;
277280
import org.whispersystems.textsecuregcm.workers.ServerVersionCommand;
278281
import org.whispersystems.textsecuregcm.workers.SetRequestLoggingEnabledTask;
@@ -335,6 +338,10 @@ public void initialize(final Bootstrap<WhisperServerConfiguration> bootstrap) {
335338
bootstrap.addCommand(new ProcessScheduledJobsServiceCommand("process-idle-device-notification-jobs",
336339
"Processes scheduled jobs to send notifications to idle devices",
337340
new IdleDeviceNotificationSchedulerFactory()));
341+
342+
bootstrap.addCommand(new RemoveLinkedDevicesWithoutPqKeysCommand());
343+
bootstrap.addCommand(new LockAccountsWithoutPqKeysCommand());
344+
bootstrap.addCommand(new RemoveAccountsWithoutPqKeysCommand());
338345
}
339346

340347
@Override
Lines changed: 103 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,103 @@
1+
/*
2+
* Copyright 2025 Signal Messenger, LLC
3+
* SPDX-License-Identifier: AGPL-3.0-only
4+
*/
5+
6+
package org.whispersystems.textsecuregcm.workers;
7+
8+
import com.google.common.annotations.VisibleForTesting;
9+
import io.micrometer.core.instrument.Metrics;
10+
import net.sourceforge.argparse4j.inf.Subparser;
11+
import org.slf4j.Logger;
12+
import org.slf4j.LoggerFactory;
13+
import org.whispersystems.textsecuregcm.identity.IdentityType;
14+
import org.whispersystems.textsecuregcm.metrics.DevicePlatformUtil;
15+
import org.whispersystems.textsecuregcm.metrics.MetricsUtil;
16+
import org.whispersystems.textsecuregcm.storage.Account;
17+
import org.whispersystems.textsecuregcm.storage.AccountsManager;
18+
import reactor.core.publisher.Flux;
19+
import reactor.core.publisher.Mono;
20+
import reactor.util.retry.Retry;
21+
import java.time.Duration;
22+
23+
public class LockAccountsWithoutPqKeysCommand extends AbstractSinglePassCrawlAccountsCommand {
24+
25+
@VisibleForTesting
26+
static final String DRY_RUN_ARGUMENT = "dry-run";
27+
28+
@VisibleForTesting
29+
static final String MAX_CONCURRENCY_ARGUMENT = "max-concurrency";
30+
31+
@VisibleForTesting
32+
static final String RETRIES_ARGUMENT = "retries";
33+
34+
private static final String LOCKED_ACCOUNT_COUNTER_NAME =
35+
MetricsUtil.name(LockAccountsWithoutPqKeysCommand.class, "lockedAccount");
36+
37+
private static final Logger log = LoggerFactory.getLogger(LockAccountsWithoutPqKeysCommand.class);
38+
39+
public LockAccountsWithoutPqKeysCommand() {
40+
super("lock-accounts-without-pq-keys", "Locks accounts with primary devices that don't have PQ keys");
41+
}
42+
43+
@Override
44+
public void configure(final Subparser subparser) {
45+
super.configure(subparser);
46+
47+
subparser.addArgument("--dry-run")
48+
.type(Boolean.class)
49+
.dest(DRY_RUN_ARGUMENT)
50+
.required(false)
51+
.setDefault(true)
52+
.help("If true, don’t actually lock accounts with expired linked devices");
53+
54+
subparser.addArgument("--max-concurrency")
55+
.type(Integer.class)
56+
.dest(MAX_CONCURRENCY_ARGUMENT)
57+
.setDefault(16)
58+
.help("Max concurrency for DynamoDB operations");
59+
60+
subparser.addArgument("--retries")
61+
.type(Integer.class)
62+
.dest(RETRIES_ARGUMENT)
63+
.setDefault(3)
64+
.help("Maximum number of DynamoDB retries permitted per device");
65+
}
66+
67+
@Override
68+
protected void crawlAccounts(final Flux<Account> accounts) {
69+
final boolean dryRun = getNamespace().getBoolean(DRY_RUN_ARGUMENT);
70+
final int maxConcurrency = getNamespace().getInt(MAX_CONCURRENCY_ARGUMENT);
71+
final int maxRetries = getNamespace().getInt(RETRIES_ARGUMENT);
72+
73+
final AccountsManager accountsManager = getCommandDependencies().accountsManager();
74+
final PqKeysUtil pqKeysUtil = new PqKeysUtil(getCommandDependencies().keysManager(), maxConcurrency, maxRetries);
75+
76+
accounts
77+
.transform(pqKeysUtil::getAccountsWithoutPqKeys)
78+
.flatMap(accountWithoutPqKeys -> {
79+
final String platform = DevicePlatformUtil.getDevicePlatform(accountWithoutPqKeys.getPrimaryDevice())
80+
.map(Enum::name)
81+
.orElse("unknown");
82+
83+
return dryRun
84+
? Mono.just(platform)
85+
: Mono.fromFuture(() -> accountsManager.updateAsync(accountWithoutPqKeys, Account::lockAuthTokenHash))
86+
.retryWhen(Retry.backoff(maxRetries, Duration.ofSeconds(1))
87+
.onRetryExhaustedThrow((spec, rs) -> rs.failure()))
88+
.thenReturn(platform)
89+
.onErrorResume(throwable -> {
90+
log.warn("Failed to lock account without PQ keys {}", accountWithoutPqKeys.getIdentifier(IdentityType.ACI), throwable);
91+
return Mono.empty();
92+
});
93+
})
94+
.doOnNext(deletedAccountPlatform -> {
95+
Metrics.counter(LOCKED_ACCOUNT_COUNTER_NAME,
96+
"dryRun", String.valueOf(dryRun),
97+
"platform", deletedAccountPlatform)
98+
.increment();
99+
})
100+
.then()
101+
.block();
102+
}
103+
}
Lines changed: 47 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,47 @@
1+
/*
2+
* Copyright 2025 Signal Messenger, LLC
3+
* SPDX-License-Identifier: AGPL-3.0-only
4+
*/
5+
6+
package org.whispersystems.textsecuregcm.workers;
7+
8+
import java.time.Duration;
9+
import java.util.Optional;
10+
import org.slf4j.Logger;
11+
import org.slf4j.LoggerFactory;
12+
import org.whispersystems.textsecuregcm.identity.IdentityType;
13+
import org.whispersystems.textsecuregcm.storage.Account;
14+
import org.whispersystems.textsecuregcm.storage.Device;
15+
import org.whispersystems.textsecuregcm.storage.KeysManager;
16+
import reactor.core.publisher.Flux;
17+
import reactor.core.publisher.Mono;
18+
import reactor.util.retry.Retry;
19+
20+
class PqKeysUtil {
21+
22+
private final KeysManager keysManager;
23+
private final int maxConcurrency;
24+
private final int maxRetries;
25+
26+
private static final Logger log = LoggerFactory.getLogger(PqKeysUtil.class);
27+
28+
PqKeysUtil(final KeysManager keysManager, final int maxConcurrency, final int maxRetries) {
29+
this.keysManager = keysManager;
30+
this.maxConcurrency = maxConcurrency;
31+
this.maxRetries = maxRetries;
32+
}
33+
34+
public Flux<Account> getAccountsWithoutPqKeys(final Flux<Account> accounts) {
35+
return accounts.flatMap(account -> Mono.fromFuture(
36+
() -> keysManager.getLastResort(account.getIdentifier(IdentityType.ACI), Device.PRIMARY_ID))
37+
.retryWhen(Retry.backoff(maxRetries, Duration.ofSeconds(1))
38+
.onRetryExhaustedThrow((spec, rs) -> rs.failure()))
39+
.onErrorResume(throwable -> {
40+
log.warn("Failed to get last-resort key for {}", account.getIdentifier(IdentityType.ACI), throwable);
41+
return Mono.empty();
42+
})
43+
.filter(Optional::isEmpty)
44+
.map(ignored -> account),
45+
maxConcurrency);
46+
}
47+
}
Lines changed: 121 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,121 @@
1+
/*
2+
* Copyright 2025 Signal Messenger, LLC
3+
* SPDX-License-Identifier: AGPL-3.0-only
4+
*/
5+
6+
package org.whispersystems.textsecuregcm.workers;
7+
8+
import com.google.common.annotations.VisibleForTesting;
9+
import io.micrometer.core.instrument.Metrics;
10+
import java.time.Duration;
11+
import net.sourceforge.argparse4j.inf.Subparser;
12+
import org.slf4j.Logger;
13+
import org.slf4j.LoggerFactory;
14+
import org.whispersystems.textsecuregcm.identity.IdentityType;
15+
import org.whispersystems.textsecuregcm.metrics.DevicePlatformUtil;
16+
import org.whispersystems.textsecuregcm.metrics.MetricsUtil;
17+
import org.whispersystems.textsecuregcm.storage.Account;
18+
import org.whispersystems.textsecuregcm.storage.AccountsManager;
19+
import reactor.core.publisher.Flux;
20+
import reactor.core.publisher.Mono;
21+
import reactor.util.retry.Retry;
22+
23+
public class RemoveAccountsWithoutPqKeysCommand extends AbstractSinglePassCrawlAccountsCommand {
24+
25+
@VisibleForTesting
26+
static final String DRY_RUN_ARGUMENT = "dry-run";
27+
28+
@VisibleForTesting
29+
static final String MAX_CONCURRENCY_ARGUMENT = "max-concurrency";
30+
31+
@VisibleForTesting
32+
static final String RETRIES_ARGUMENT = "retries";
33+
34+
@VisibleForTesting
35+
static final String MAX_ACCOUNTS_ARGUMENT = "max-accounts";
36+
37+
private static final String REMOVED_ACCOUNT_COUNTER_NAME =
38+
MetricsUtil.name(RemoveAccountsWithoutPqKeysCommand.class, "removedAccount");
39+
40+
private static final Logger log = LoggerFactory.getLogger(RemoveAccountsWithoutPqKeysCommand.class);
41+
42+
public RemoveAccountsWithoutPqKeysCommand() {
43+
super("remove-accounts-without-pq-keys", "Removes accounts with primary devices that don't have PQ keys");
44+
}
45+
46+
@Override
47+
public void configure(final Subparser subparser) {
48+
super.configure(subparser);
49+
50+
subparser.addArgument("--dry-run")
51+
.type(Boolean.class)
52+
.dest(DRY_RUN_ARGUMENT)
53+
.required(false)
54+
.setDefault(true)
55+
.help("If true, don’t actually modify accounts with expired linked devices");
56+
57+
subparser.addArgument("--max-concurrency")
58+
.type(Integer.class)
59+
.dest(MAX_CONCURRENCY_ARGUMENT)
60+
.setDefault(16)
61+
.help("Max concurrency for DynamoDB operations");
62+
63+
subparser.addArgument("--retries")
64+
.type(Integer.class)
65+
.dest(RETRIES_ARGUMENT)
66+
.setDefault(3)
67+
.help("Maximum number of DynamoDB retries permitted per device");
68+
69+
subparser.addArgument("--max-accounts")
70+
.type(Integer.class)
71+
.required(true)
72+
.dest(MAX_ACCOUNTS_ARGUMENT)
73+
.help("Maximum number of accounts to remove per run");
74+
}
75+
76+
@Override
77+
protected void crawlAccounts(final Flux<Account> accounts) {
78+
final boolean dryRun = getNamespace().getBoolean(DRY_RUN_ARGUMENT);
79+
final int maxConcurrency = getNamespace().getInt(MAX_CONCURRENCY_ARGUMENT);
80+
final int maxRetries = getNamespace().getInt(RETRIES_ARGUMENT);
81+
final int maxAccounts = getNamespace().getInt(MAX_ACCOUNTS_ARGUMENT);
82+
83+
final AccountsManager accountsManager = getCommandDependencies().accountsManager();
84+
final PqKeysUtil pqKeysUtil = new PqKeysUtil(getCommandDependencies().keysManager(), maxConcurrency, maxRetries);
85+
86+
accounts
87+
.transform(pqKeysUtil::getAccountsWithoutPqKeys)
88+
.take(maxAccounts)
89+
.filter(accountWithoutPqKeys -> {
90+
if (!accountWithoutPqKeys.hasLockedCredentials()) {
91+
log.warn("Account {} is not locked", accountWithoutPqKeys.getIdentifier(IdentityType.ACI));
92+
}
93+
94+
return accountWithoutPqKeys.hasLockedCredentials();
95+
})
96+
.flatMap(accountWithoutPqKeys -> {
97+
final String platform = DevicePlatformUtil.getDevicePlatform(accountWithoutPqKeys.getPrimaryDevice())
98+
.map(Enum::name)
99+
.orElse("unknown");
100+
101+
return dryRun
102+
? Mono.just(platform)
103+
: Mono.fromFuture(() -> accountsManager.delete(accountWithoutPqKeys, AccountsManager.DeletionReason.ADMIN_DELETED))
104+
.retryWhen(Retry.backoff(maxRetries, Duration.ofSeconds(1))
105+
.onRetryExhaustedThrow((spec, rs) -> rs.failure()))
106+
.thenReturn(platform)
107+
.onErrorResume(throwable -> {
108+
log.warn("Failed to remove account without PQ keys {}", accountWithoutPqKeys.getIdentifier(IdentityType.ACI), throwable);
109+
return Mono.empty();
110+
});
111+
})
112+
.doOnNext(deletedAccountPlatform -> {
113+
Metrics.counter(REMOVED_ACCOUNT_COUNTER_NAME,
114+
"dryRun", String.valueOf(dryRun),
115+
"platform", deletedAccountPlatform)
116+
.increment();
117+
})
118+
.then()
119+
.block();
120+
}
121+
}

0 commit comments

Comments
 (0)
Please sign in to comment.