diff --git a/packages/homeserver/src/plugins/mongodb.ts b/packages/homeserver/src/plugins/mongodb.ts index 220d2098..4da99140 100644 --- a/packages/homeserver/src/plugins/mongodb.ts +++ b/packages/homeserver/src/plugins/mongodb.ts @@ -37,6 +37,11 @@ export const routerWithMongodb = (db: Db) => ); }; + const getEventsByIds = async (roomId: string, eventIds: string[]) => { + return eventsCollection + .find({ "event.room_id": roomId, "event._id": { $in: eventIds } }) + .toArray(); + }; const getDeepEarliestAndLatestEvents = async ( roomId: string, earliest_events: string[], @@ -165,6 +170,30 @@ export const routerWithMongodb = (db: Db) => return id; }; + const createEvent = async (event: EventBase) => { + const id = generateId(event); + await eventsCollection.insertOne({ + _id: id, + event, + }); + + return id; + }; + + const removeEventFromStaged = async (roomId: string, id: string) => { + await eventsCollection.updateOne( + { _id: id, "event.room_id": roomId }, + { $unset: { staged: 1 } }, + ); + }; + + const getOldestStagedEvent = async (roomId: string) => { + return eventsCollection.findOne( + { staged: true, "event.room_id": roomId }, + { sort: { "event.origin_server_ts": 1 } }, + ); + }; + return { serversCollection, getValidPublicKeyFromLocal, @@ -175,7 +204,12 @@ export const routerWithMongodb = (db: Db) => getMissingEventsByDeep, getLastEvent, getAuthEvents, + + removeEventFromStaged, + getEventsByIds, + getOldestStagedEvent, createStagingEvent, + createEvent, }; })(), ); diff --git a/packages/homeserver/src/procedures/processPDU.ts b/packages/homeserver/src/procedures/processPDU.ts new file mode 100644 index 00000000..ba5fcdad --- /dev/null +++ b/packages/homeserver/src/procedures/processPDU.ts @@ -0,0 +1,47 @@ +import type { EventBase } from "@hs/core/src/events/eventBase"; +import type { SignedJson } from "../signJson"; +import type { HashedEvent } from "../authentication"; +import type { EventStore } from "../plugins/mongodb"; + +export const processPDUsByRoomId = async ( + roomId: string, + pdus: SignedJson>[], + validatePdu: (pdu: SignedJson>) => Promise, + getEventsByIds: (roomId: string, eventIds: string[]) => Promise, + createStagingEvent: (event: EventBase) => Promise, + createEvent: (event: EventBase) => Promise, + processMissingEvents: (roomId: string) => Promise, + generateId: (pdu: SignedJson>) => string, +) => { + const resultPDUs = {} as { + [key: string]: Record; + }; + for (const pdu of pdus) { + const pid = generateId(pdu); + try { + await validatePdu(pdu); + resultPDUs[pid] = {}; + + const events = await getEventsByIds(roomId, pdu.prev_events); + + const missing = pdu.prev_events.filter( + (event) => !events.find((e) => e._id === event), + ); + + if (!missing.length) { + await createStagingEvent(pdu); + } else { + await createEvent(pdu); + } + } catch (error) { + resultPDUs[pid] = { error } as any; + } + void (async () => { + while (await processMissingEvents(roomId)); + })(); + } + + return { + pdus: resultPDUs, + }; +}; diff --git a/packages/homeserver/src/routes/federation/sendTransaction.spec.ts b/packages/homeserver/src/routes/federation/sendTransaction.spec.ts index 0cc68c09..96af74fa 100644 --- a/packages/homeserver/src/routes/federation/sendTransaction.spec.ts +++ b/packages/homeserver/src/routes/federation/sendTransaction.spec.ts @@ -51,6 +51,15 @@ describe("/send/:txnId", () => { createStagingEvent: async () => { return; }, + getEventsByIds: async () => { + return []; + }, + createEvent: async () => { + return; + }, + getOldestStagedEvent: async () => { + return; + }, serversCollection: { findOne: async () => { return; @@ -140,7 +149,9 @@ describe("/send/:txnId", () => { expect(resp.status).toBe(200); expect(data).toHaveProperty("pdus"); expect(data.pdus).toStrictEqual({ - [id]: {}, + pdus: { + [id]: {}, + }, }); }); @@ -188,7 +199,11 @@ describe("/send/:txnId", () => { expect(resp.status).toBe(200); expect(data).toHaveProperty("pdus"); expect(data.pdus).toStrictEqual({ - [id]: {}, + pdus: { + [id]: { + error: {}, + }, + }, }); }); }); @@ -238,6 +253,15 @@ describe("/send/:txnId using real case", () => { createStagingEvent: async () => { return; }, + getEventsByIds: async () => { + return []; + }, + createEvent: async () => { + return; + }, + getOldestStagedEvent: async () => { + return; + }, serversCollection: { findOne: async () => { return; @@ -307,7 +331,9 @@ describe("/send/:txnId using real case", () => { expect(resp.status).toBe(200); expect(data).toHaveProperty("pdus"); expect(data.pdus).toStrictEqual({ - [`${id}`]: {}, + pdus: { + [`${id}`]: {}, + }, }); }); }); diff --git a/packages/homeserver/src/routes/federation/sendTransaction.ts b/packages/homeserver/src/routes/federation/sendTransaction.ts index 408775e6..11604519 100644 --- a/packages/homeserver/src/routes/federation/sendTransaction.ts +++ b/packages/homeserver/src/routes/federation/sendTransaction.ts @@ -17,20 +17,32 @@ import { import { isConfigContext } from "../../plugins/isConfigContext"; import { MatrixError } from "../../errors"; import { isRoomMemberEvent } from "@hs/core/src/events/m.room.member"; +import { makeRequest } from "../../makeRequest"; +import { isMutexContext, routerWithMutex } from "../../plugins/mutex"; +import { processPDUsByRoomId } from "../../procedures/processPDU"; -export const sendTransactionRoute = new Elysia().put( - "/send/:txnId", - async ({ params, body, ...context }) => { +export const sendTransactionRoute = new Elysia() + .use(routerWithMutex) + .put("/send/:txnId", async ({ params, body, ...context }) => { if (!isConfigContext(context)) { throw new Error("No config context"); } if (!isMongodbContext(context)) { throw new Error("No mongodb context"); } + if (!isMutexContext(context)) { + throw new Error("No mutex context"); + } const { config, - mongo: { eventsCollection, createStagingEvent }, + mongo: { + getEventsByIds, + createStagingEvent, + createEvent, + removeEventFromStaged, + getOldestStagedEvent, + }, } = context; const { pdus, edus = [] } = body as any; @@ -39,7 +51,6 @@ export const sendTransactionRoute = new Elysia().put( throw new MatrixError("400", "Too many edus"); } - console.log("1"); const isValidPDU = ( pdu: any, ): pdu is SignedJson> => { @@ -160,27 +171,81 @@ export const sendTransactionRoute = new Elysia().put( } }; - const resultPDUs = {} as { - [key: string]: Record; + /** + * Based on the fetched events from the remote server, we check if there are any new events (that haven't been stored yet) + * @param fetchedEvents + * @returns + */ + + const getNewEvents = async ( + roomId: string, + fetchedEvents: EventBase[], + ) => { + const fetchedEventsIds = fetchedEvents.map(generateId); + const storedEvents = await getEventsByIds(roomId, fetchedEventsIds); + return fetchedEvents + .filter( + (event) => !storedEvents.find((e) => e._id === generateId(event)), + ) + .sort((a, b) => a.depth - b.depth); }; - for (const [roomId, pdus] of pdusByRoomId) { - // const roomVersion = getRoomVersion - for (const pdu of pdus) { - try { - await validatePdu(pdu); - resultPDUs[`${generateId(pdu)}`] = {}; - void createStagingEvent(pdu); - } catch (e) { - console.error("error validating pdu", e); - resultPDUs[`${generateId(pdu)}`] = e as any; - } + const processMissingEvents = async (roomId: string) => { + const lock = await context.mutex.request(roomId); + if (!lock) { + return false; + } + const event = await getOldestStagedEvent(roomId); + + if (!event) { + return false; } - } - return { - pdus: resultPDUs, + const { _id: pid, event: pdu } = event; + + const fetchedEvents = await makeRequest({ + method: "POST", + domain: pdu.origin, + uri: `/_matrix/federation/v1/get_missing_events/${pdu.room_id}`, + body: { + earliest_events: pdu.prev_events, + latest_events: [pid], + limit: 10, + min_depth: 10, + }, + signingName: config.name, + }); + + const newEvents = await getNewEvents(roomId, fetchedEvents.events); + // in theory, we have all the new events + await removeEventFromStaged(roomId, pid); + + for await (const event of newEvents) { + await createStagingEvent(event); + } + + await lock.release(); + + return true; }; + const result = { + pdus: {}, + }; + for await (const [roomId, pdus] of pdusByRoomId) { + Object.assign( + result.pdus, + await processPDUsByRoomId( + roomId, + pdus, + validatePdu, + getEventsByIds, + createStagingEvent, + createEvent, + processMissingEvents, + generateId, + ), + ); + } + return result; } - }, -); + });