21
21
import java .util .concurrent .ExecutorService ;
22
22
import java .util .concurrent .TimeUnit ;
23
23
import java .util .concurrent .TimeoutException ;
24
+ import java .util .function .BiFunction ;
24
25
import java .util .stream .Collectors ;
25
26
import java .util .stream .IntStream ;
26
27
import javax .annotation .Nullable ;
41
42
public class MessagesManager {
42
43
43
44
private static final int RESULT_SET_CHUNK_SIZE = 100 ;
44
- final String GET_MESSAGES_FOR_DEVICE_FLUX_NAME = name (MessagesManager .class , "getMessagesForDevice" );
45
+ private final static String GET_MESSAGES_FOR_DEVICE_FLUX_NAME = name (MessagesManager .class , "getMessagesForDevice" );
46
+ // shared payloads have some overhead, which sometimes exceeds the size if we just wrote the content directly
47
+ private static final int MULTI_RECIPIENT_MESSAGE_MINIMUM_SIZE_FOR_SHARED_PAYLOAD = 150 ;
45
48
46
49
private static final Logger logger = LoggerFactory .getLogger (MessagesManager .class );
47
50
@@ -139,17 +142,50 @@ public CompletableFuture<Map<Account, Map<Byte, Boolean>>> insertMultiRecipientM
139
142
140
143
final long serverTimestamp = clock .millis ();
141
144
142
- return insertSharedMultiRecipientMessagePayload (multiRecipientMessage )
143
- .thenCompose (sharedMrmKey -> {
144
- final Envelope prototypeMessage = Envelope .newBuilder ()
145
- .setType (Envelope .Type .UNIDENTIFIED_SENDER )
146
- .setClientTimestamp (clientTimestamp == 0 ? serverTimestamp : clientTimestamp )
147
- .setServerTimestamp (serverTimestamp )
148
- .setStory (isStory )
149
- .setEphemeral (isEphemeral )
150
- .setUrgent (isUrgent )
151
- .setSharedMrmKey (ByteString .copyFrom (sharedMrmKey ))
145
+ final Envelope .Builder prototypeMessageBuilder = Envelope .newBuilder ()
146
+ .setType (Envelope .Type .UNIDENTIFIED_SENDER )
147
+ .setClientTimestamp (clientTimestamp == 0 ? serverTimestamp : clientTimestamp )
148
+ .setServerTimestamp (serverTimestamp )
149
+ .setStory (isStory )
150
+ .setEphemeral (isEphemeral )
151
+ .setUrgent (isUrgent );
152
+
153
+ final CompletableFuture <Envelope > prototypeMessageFuture ;
154
+ final BiFunction <ServiceIdentifier , Envelope , Envelope > recipientEnvelopeBuilder ;
155
+
156
+ // A shortcut -- message sizes do not vary by recipient in the current SealedSenderMultiRecipientMessage version
157
+ final int perRecipientMessageSize = multiRecipientMessage .getRecipients ().values ().stream ().findAny ()
158
+ .map (multiRecipientMessage ::messageSizeForRecipient )
159
+ .orElse (0 );
160
+
161
+ multiRecipientMessage .messageSizeForRecipient (
162
+ multiRecipientMessage .getRecipients ().values ().iterator ().next ());
163
+ if (perRecipientMessageSize >= MULTI_RECIPIENT_MESSAGE_MINIMUM_SIZE_FOR_SHARED_PAYLOAD ) {
164
+
165
+ // the message is large enough that the shared payload overhead is worth it, so insert into the cache
166
+ prototypeMessageFuture = insertSharedMultiRecipientMessagePayload ((multiRecipientMessage ))
167
+ .thenApply (sharedMrmKey -> prototypeMessageBuilder
168
+ .setSharedMrmKey (ByteString .copyFrom (sharedMrmKey ))
169
+ .build ());
170
+
171
+ recipientEnvelopeBuilder = (serviceIdentifier , prototype ) -> prototype .toBuilder ()
172
+ .setDestinationServiceId (serviceIdentifier .toServiceIdentifierString ())
173
+ .build ();
174
+
175
+ } else {
176
+
177
+ prototypeMessageFuture = CompletableFuture .completedFuture (prototypeMessageBuilder .build ());
178
+
179
+ recipientEnvelopeBuilder = (serviceIdentifier , prototype ) ->
180
+ prototype .toBuilder ()
181
+ .setDestinationServiceId (serviceIdentifier .toServiceIdentifierString ())
182
+ .setContent (ByteString .copyFrom (multiRecipientMessage .messageForRecipient (
183
+ multiRecipientMessage .getRecipients ().get (serviceIdentifier .toLibsignal ()))))
152
184
.build ();
185
+ }
186
+
187
+ return prototypeMessageFuture
188
+ .thenCompose (prototypeMessage -> {
153
189
154
190
final Map <Account , Map <Byte , Boolean >> clientPresenceByAccountAndDevice = new ConcurrentHashMap <>();
155
191
@@ -162,9 +198,7 @@ public CompletableFuture<Map<Account, Map<Byte, Boolean>>> insertMultiRecipientM
162
198
163
199
return insertAsync (resolvedRecipients .get (recipient ).getIdentifier (IdentityType .ACI ),
164
200
IntStream .range (0 , devices .length ).mapToObj (i -> devices [i ])
165
- .collect (Collectors .toMap (deviceId -> deviceId , deviceId -> prototypeMessage .toBuilder ()
166
- .setDestinationServiceId (serviceIdentifier .toServiceIdentifierString ())
167
- .build ())))
201
+ .collect (Collectors .toMap (deviceId -> deviceId , deviceId -> recipientEnvelopeBuilder .apply (serviceIdentifier , prototypeMessage ))))
168
202
.thenAccept (clientPresenceByDeviceId ->
169
203
clientPresenceByAccountAndDevice .put (resolvedRecipients .get (recipient ),
170
204
clientPresenceByDeviceId ));
0 commit comments