diff --git a/src/ToDeviceMessageQueue.ts b/src/ToDeviceMessageQueue.ts index 168584894f..7326405e92 100644 --- a/src/ToDeviceMessageQueue.ts +++ b/src/ToDeviceMessageQueue.ts @@ -55,7 +55,14 @@ export class ToDeviceMessageQueue { this.client.removeListener(ClientEvent.Sync, this.onResumedSync); } - public async queueBatch(batch: ToDeviceBatch): Promise { + /** + * queues a batch of to-device messages for sending. The batch is split into + * smaller batches of size MAX_BATCH_SIZE, and each batch is given a unique + * transaction ID. + * @param batch the total (not split) batch of to-device messages. + * @param sendCallback a callback that is called once all batches are sent. + */ + public async queueBatch(batch: ToDeviceBatch, sendCallback?: (result: Error | undefined) => void): Promise { const batches: ToDeviceBatchWithTxnId[] = []; for (let i = 0; i < batch.batch.length; i += MAX_BATCH_SIZE) { const batchWithTxnId = { @@ -74,10 +81,15 @@ export class ToDeviceMessageQueue { } await this.client.store.saveToDeviceBatches(batches); - this.sendQueue(); + this.sendQueue().then(sendCallback); } - public sendQueue = async (): Promise => { + /** + * sends the queues to device messages currently saved in client.store. + * @returns resolves to undefined if the queue was sent successfully, or an error if + * the queue could not be sent. + */ + public sendQueue = async (): Promise => { if (this.retryTimeout !== null) clearTimeout(this.retryTimeout); this.retryTimeout = null; @@ -114,13 +126,14 @@ export class ToDeviceMessageQueue { } else { logger.info("Automatic retry limit reached for to-device messages."); } - return; + return Error("max to devices retries reached"); } logger.info(`Failed to send batch of to-device messages. Will retry in ${retryDelay}ms`, e); this.retryTimeout = setTimeout(this.sendQueue, retryDelay); } finally { this.sending = false; + return undefined; } }; diff --git a/src/client.ts b/src/client.ts index 6974f35fa4..2340a5fdbf 100644 --- a/src/client.ts +++ b/src/client.ts @@ -52,7 +52,16 @@ import { type GroupCallEventHandlerEventHandlerMap, } from "./webrtc/groupCallEventHandler.ts"; import * as utils from "./utils.ts"; -import { deepCompare, defer, noUnsafeEventProps, type QueryDict, replaceParam, safeSet, sleep } from "./utils.ts"; +import { + deepCompare, + defer, + MapWithDefault, + noUnsafeEventProps, + type QueryDict, + replaceParam, + safeSet, + sleep, +} from "./utils.ts"; import { Direction, EventTimeline } from "./models/event-timeline.ts"; import { type IActionsObject, PushProcessor } from "./pushprocessor.ts"; import { AutoDiscovery, type AutoDiscoveryAction } from "./autodiscovery.ts"; @@ -7948,7 +7957,8 @@ export class MatrixClient extends TypedEventEmitter> = new MapWithDefault(() => new Map()); + for (const item of batch.batch) { + contentMap.getOrCreate(item.userId).set(item.deviceId, item.payload); + } + + return new Promise((resolve, reject) => { + this.queueToDevice(batch, (result) => { + if (result === undefined) { + resolve; + } else if (result) { + reject(result); + } + }); + }); } /** @@ -7971,9 +7992,11 @@ export class MatrixClient extends TypedEventEmitter { - return this.toDeviceMessageQueue.queueBatch(batch); + public queueToDevice(batch: ToDeviceBatch, sendCallback?: (result: Error | undefined) => void): Promise { + return this.toDeviceMessageQueue.queueBatch(batch, sendCallback); } /**