Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Slightly refactor latestEventsUpdater #3272

Open
wants to merge 21 commits into
base: main
Choose a base branch
from
Open
Changes from all commits
Commits
Show all changes
21 commits
Select commit Hold shift + click to select a range
a4e111c
Let doUpdateLatestEvents return the updates to send to NATS
S7evinK Nov 23, 2023
53c474f
Move makeOutputNewRoomEvent
S7evinK Nov 23, 2023
8bbd940
Remove TransactionID from struct
S7evinK Nov 23, 2023
06064e0
Move sendAsServer
S7evinK Nov 23, 2023
416dbce
No naked returns
S7evinK Nov 23, 2023
2047cca
Move roomInfo
S7evinK Nov 23, 2023
7999ef0
Move lastEventIDSent
S7evinK Nov 23, 2023
0f74cbf
Move historyVisibility
S7evinK Nov 23, 2023
7db3e9f
Move context
S7evinK Nov 23, 2023
94960c5
Move SetLatestEvents
S7evinK Nov 24, 2023
2f99680
Move checking if the event has been sent as well
S7evinK Nov 24, 2023
4fec392
Merge branch 'main' of github.com:matrix-org/dendrite into s7evink/la…
S7evinK Nov 24, 2023
1432857
Merge branch 'main' of github.com:matrix-org/dendrite into s7evink/la…
S7evinK Nov 25, 2023
4ae7335
Be more verbose when logging state resets
S7evinK Nov 29, 2023
a165781
Merge branch 'main' of github.com:matrix-org/dendrite into s7evink/la…
S7evinK Nov 29, 2023
d5204ef
Merge branch 'main' into s7evink/latest-event-updater
S7evinK Dec 13, 2023
2c81b06
Merge branch 'main' of github.com:matrix-org/dendrite into s7evink/la…
S7evinK Dec 29, 2023
925843d
Try to fix state reset sentry messages, maybe?
S7evinK Dec 29, 2023
329d15e
Revert "Try to fix state reset sentry messages, maybe?"
S7evinK Dec 29, 2023
9e2afb5
Merge branch 'main' of github.com:matrix-org/dendrite into s7evink/la…
S7evinK Jan 22, 2024
928f3a0
Merge branch 'main' of github.com:matrix-org/dendrite into s7evink/la…
S7evinK Jul 31, 2024
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
189 changes: 95 additions & 94 deletions roomserver/internal/input/input_latest_events.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,16 +21,15 @@ import (
"fmt"

"github.com/getsentry/sentry-go"
"github.com/matrix-org/gomatrixserverlib"
"github.com/matrix-org/util"
"github.com/sirupsen/logrus"

"github.com/matrix-org/dendrite/internal"
"github.com/matrix-org/dendrite/internal/sqlutil"
"github.com/matrix-org/dendrite/roomserver/api"
"github.com/matrix-org/dendrite/roomserver/state"
"github.com/matrix-org/dendrite/roomserver/storage/shared"
"github.com/matrix-org/dendrite/roomserver/types"
"github.com/matrix-org/gomatrixserverlib"
"github.com/matrix-org/util"
"github.com/sirupsen/logrus"
)

// updateLatestEvents updates the list of latest events for this room in the database and writes the
Expand Down Expand Up @@ -58,7 +57,7 @@ func (r *Inputer) updateLatestEvents(
transactionID *api.TransactionID,
rewritesState bool,
historyVisibility gomatrixserverlib.HistoryVisibility,
) (err error) {
) error {
trace, ctx := internal.StartRegion(ctx, "updateLatestEvents")
defer trace.EndRegion()

Expand All @@ -70,44 +69,72 @@ func (r *Inputer) updateLatestEvents(

defer sqlutil.EndTransactionWithCheck(updater, &succeeded, &err)

// If the event has already been written to the output log then we
// don't need to do anything, as we've handled it already.
hasBeenSent, err := updater.HasEventBeenSent(stateAtEvent.EventNID)
if err != nil {
return fmt.Errorf("u.updater.HasEventBeenSent: %w", err)
}
if hasBeenSent {
return nil
}

u := latestEventsUpdater{
ctx: ctx,
api: r,
updater: updater,
roomInfo: roomInfo,
stateAtEvent: stateAtEvent,
event: event,
sendAsServer: sendAsServer,
transactionID: transactionID,
rewritesState: rewritesState,
historyVisibility: historyVisibility,
api: r,
updater: updater,
stateAtEvent: stateAtEvent,
event: event,
rewritesState: rewritesState,
}

if err = u.doUpdateLatestEvents(); err != nil {
var updates []api.OutputEvent
updates, err = u.doUpdateLatestEvents(ctx, roomInfo)
if err != nil {
return fmt.Errorf("u.doUpdateLatestEvents: %w", err)
}

update, err := u.makeOutputNewRoomEvent(ctx, transactionID, sendAsServer, updater.LastEventIDSent(), historyVisibility)
if err != nil {
return fmt.Errorf("u.makeOutputNewRoomEvent: %w", err)
}
updates = append(updates, *update)

// Send the event to the output logs.
// We do this inside the database transaction to ensure that we only mark an event as sent if we sent it.
// (n.b. this means that it's possible that the same event will be sent twice if the transaction fails but
// the write to the output log succeeds)
// TODO: This assumes that writing the event to the output log is synchronous. It should be possible to
// send the event asynchronously but we would need to ensure that 1) the events are written to the log in
// the correct order, 2) that pending writes are resent across restarts. In order to avoid writing all the
// necessary bookkeeping we'll keep the event sending synchronous for now.
if len(updates) > 0 {
if err = u.api.OutputProducer.ProduceRoomEvents(u.event.RoomID().String(), updates); err != nil {
return fmt.Errorf("u.api.WriteOutputEvents: %w", err)
}

if err = u.updater.MarkEventAsSent(u.stateAtEvent.EventNID); err != nil {
return fmt.Errorf("u.updater.MarkEventAsSent: %w", err)
}
}

if err = u.updater.SetLatestEvents(roomInfo.RoomNID, u.latest, u.stateAtEvent.EventNID, u.newStateNID); err != nil {
return fmt.Errorf("u.updater.SetLatestEvents: %w", err)
}

succeeded = true
return
return nil
}

// latestEventsUpdater tracks the state used to update the latest events in the
// room. It mostly just ferries state between the various function calls.
// The state could be passed using function arguments, but it becomes impractical
// when there are so many variables to pass around.
type latestEventsUpdater struct {
ctx context.Context
api *Inputer
updater *shared.RoomUpdater
roomInfo *types.RoomInfo
stateAtEvent types.StateAtEvent
event gomatrixserverlib.PDU
transactionID *api.TransactionID
rewritesState bool
// Which server to send this event as.
sendAsServer string
// The eventID of the event that was processed before this one.
lastEventIDSent string
// The latest events in the room after processing this event.
oldLatest types.StateAtEventAndReferences
latest types.StateAtEventAndReferences
Expand All @@ -122,13 +149,9 @@ type latestEventsUpdater struct {
// The snapshots of current state before and after processing this event
oldStateNID types.StateSnapshotNID
newStateNID types.StateSnapshotNID
// The history visibility of the event itself (from the state before the event).
historyVisibility gomatrixserverlib.HistoryVisibility
}

func (u *latestEventsUpdater) doUpdateLatestEvents() error {
u.lastEventIDSent = u.updater.LastEventIDSent()

func (u *latestEventsUpdater) doUpdateLatestEvents(ctx context.Context, roomInfo *types.RoomInfo) ([]api.OutputEvent, error) {
// If we are doing a regular event update then we will get the
// previous latest events to use as a part of the calculation. If
// we are overwriting the latest events because we have a complete
Expand All @@ -141,79 +164,46 @@ func (u *latestEventsUpdater) doUpdateLatestEvents() error {
u.oldLatest = u.updater.LatestEvents()
}

// If the event has already been written to the output log then we
// don't need to do anything, as we've handled it already.
if hasBeenSent, err := u.updater.HasEventBeenSent(u.stateAtEvent.EventNID); err != nil {
return fmt.Errorf("u.updater.HasEventBeenSent: %w", err)
} else if hasBeenSent {
return nil
}

// Work out what the latest events are. This will include the new
// event if it is not already referenced.
extremitiesChanged, err := u.calculateLatest(
ctx,
u.oldLatest, u.event,
types.StateAtEventAndReference{
EventID: u.event.EventID(),
StateAtEvent: u.stateAtEvent,
},
)
if err != nil {
return fmt.Errorf("u.calculateLatest: %w", err)
return nil, fmt.Errorf("u.calculateLatest: %w", err)
}

// Now that we know what the latest events are, it's time to get the
// latest state.
var updates []api.OutputEvent
var membershipUpdates []api.OutputEvent
if extremitiesChanged || u.rewritesState {
if err = u.latestState(); err != nil {
return fmt.Errorf("u.latestState: %w", err)
if err = u.latestState(ctx, roomInfo); err != nil {
return nil, fmt.Errorf("u.latestState: %w", err)
}

// If we need to generate any output events then here's where we do it.
// TODO: Move this!
if updates, err = u.api.updateMemberships(u.ctx, u.updater, u.removed, u.added); err != nil {
return fmt.Errorf("u.api.updateMemberships: %w", err)
if membershipUpdates, err = u.api.updateMemberships(ctx, u.updater, u.removed, u.added); err != nil {
return nil, fmt.Errorf("u.api.updateMemberships: %w", err)
}
} else {
u.newStateNID = u.oldStateNID
}

if err = u.updater.SetLatestEvents(u.roomInfo.RoomNID, u.latest, u.stateAtEvent.EventNID, u.newStateNID); err != nil {
return fmt.Errorf("u.updater.SetLatestEvents: %w", err)
}

update, err := u.makeOutputNewRoomEvent()
if err != nil {
return fmt.Errorf("u.makeOutputNewRoomEvent: %w", err)
}
updates = append(updates, *update)

// Send the event to the output logs.
// We do this inside the database transaction to ensure that we only mark an event as sent if we sent it.
// (n.b. this means that it's possible that the same event will be sent twice if the transaction fails but
// the write to the output log succeeds)
// TODO: This assumes that writing the event to the output log is synchronous. It should be possible to
// send the event asynchronously but we would need to ensure that 1) the events are written to the log in
// the correct order, 2) that pending writes are resent across restarts. In order to avoid writing all the
// necessary bookkeeping we'll keep the event sending synchronous for now.
if err = u.api.OutputProducer.ProduceRoomEvents(u.event.RoomID().String(), updates); err != nil {
return fmt.Errorf("u.api.WriteOutputEvents: %w", err)
}

if err = u.updater.MarkEventAsSent(u.stateAtEvent.EventNID); err != nil {
return fmt.Errorf("u.updater.MarkEventAsSent: %w", err)
}

return nil
return membershipUpdates, nil
}

func (u *latestEventsUpdater) latestState() error {
trace, ctx := internal.StartRegion(u.ctx, "processEventWithMissingState")
func (u *latestEventsUpdater) latestState(ctx context.Context, roomInfo *types.RoomInfo) error {
trace, ctx := internal.StartRegion(ctx, "processEventWithMissingState")
defer trace.EndRegion()

var err error
roomState := state.NewStateResolution(u.updater, u.roomInfo, u.api.Queryer)
roomState := state.NewStateResolution(u.updater, roomInfo, u.api.Queryer)

// Work out if the state at the extremities has actually changed
// or not. If they haven't then we won't bother doing all of the
Expand Down Expand Up @@ -289,23 +279,27 @@ func (u *latestEventsUpdater) latestState() error {

if removed := len(u.removed) - len(u.added); !u.rewritesState && removed > 0 {
logrus.WithFields(logrus.Fields{
"event_id": u.event.EventID(),
"room_id": u.event.RoomID().String(),
"old_state_nid": u.oldStateNID,
"new_state_nid": u.newStateNID,
"old_latest": u.oldLatest.EventIDs(),
"new_latest": u.latest.EventIDs(),
"event_id": u.event.EventID(),
"room_id": u.event.RoomID().String(),
"old_state_nid": u.oldStateNID,
"new_state_nid": u.newStateNID,
"old_latest": u.oldLatest.EventIDs(),
"new_latest": u.latest.EventIDs(),
"rewrites_state": u.rewritesState,
"state_at_event": fmt.Sprintf("%#v", u.stateAtEvent),
}).Warnf("State reset detected (removing %d events)", removed)
sentry.WithScope(func(scope *sentry.Scope) {
scope.SetLevel("warning")
scope.SetTag("room_id", u.event.RoomID().String())
scope.SetContext("State reset", map[string]interface{}{
"Event ID": u.event.EventID(),
"Old state NID": fmt.Sprintf("%d", u.oldStateNID),
"New state NID": fmt.Sprintf("%d", u.newStateNID),
"Old latest": u.oldLatest.EventIDs(),
"New latest": u.latest.EventIDs(),
"State removed": removed,
"Event ID": u.event.EventID(),
"Old state NID": fmt.Sprintf("%d", u.oldStateNID),
"New state NID": fmt.Sprintf("%d", u.newStateNID),
"Old latest": u.oldLatest.EventIDs(),
"New latest": u.latest.EventIDs(),
"State removed": removed,
"State rewritten": fmt.Sprintf("%v", u.rewritesState),
"State at event": fmt.Sprintf("%#v", u.stateAtEvent),
})
sentry.CaptureMessage("State reset detected")
})
Expand All @@ -326,11 +320,12 @@ func (u *latestEventsUpdater) latestState() error {
// calculateLatest works out the new set of forward extremities. Returns
// true if the new event is included in those extremites, false otherwise.
func (u *latestEventsUpdater) calculateLatest(
ctx context.Context,
oldLatest []types.StateAtEventAndReference,
newEvent gomatrixserverlib.PDU,
newStateAndRef types.StateAtEventAndReference,
) (bool, error) {
trace, _ := internal.StartRegion(u.ctx, "calculateLatest")
trace, _ := internal.StartRegion(ctx, "calculateLatest")
defer trace.EndRegion()

// First of all, get a list of all of the events in our current
Expand Down Expand Up @@ -387,7 +382,13 @@ func (u *latestEventsUpdater) calculateLatest(
return true, nil
}

func (u *latestEventsUpdater) makeOutputNewRoomEvent() (*api.OutputEvent, error) {
func (u *latestEventsUpdater) makeOutputNewRoomEvent(
ctx context.Context,
transactionID *api.TransactionID,
sendAsServer string,
lastEventIDSent string,
historyVisibility gomatrixserverlib.HistoryVisibility,
) (*api.OutputEvent, error) {
latestEventIDs := make([]string, len(u.latest))
for i := range u.latest {
latestEventIDs[i] = u.latest[i].EventID
Expand All @@ -396,14 +397,14 @@ func (u *latestEventsUpdater) makeOutputNewRoomEvent() (*api.OutputEvent, error)
ore := api.OutputNewRoomEvent{
Event: &types.HeaderedEvent{PDU: u.event},
RewritesState: u.rewritesState,
LastSentEventID: u.lastEventIDSent,
LastSentEventID: lastEventIDSent,
LatestEventIDs: latestEventIDs,
TransactionID: u.transactionID,
SendAsServer: u.sendAsServer,
HistoryVisibility: u.historyVisibility,
TransactionID: transactionID,
SendAsServer: sendAsServer,
HistoryVisibility: historyVisibility,
}

eventIDMap, err := u.stateEventMap()
eventIDMap, err := u.stateEventMap(ctx)
if err != nil {
return nil, err
}
Expand All @@ -427,7 +428,7 @@ func (u *latestEventsUpdater) makeOutputNewRoomEvent() (*api.OutputEvent, error)
}

// retrieve an event nid -> event ID map for all events that need updating
func (u *latestEventsUpdater) stateEventMap() (map[types.EventNID]string, error) {
func (u *latestEventsUpdater) stateEventMap(ctx context.Context) (map[types.EventNID]string, error) {
cap := len(u.added) + len(u.removed) + len(u.stateBeforeEventRemoves) + len(u.stateBeforeEventAdds)
stateEventNIDs := make(types.EventNIDs, 0, cap)
allStateEntries := make([]types.StateEntry, 0, cap)
Expand All @@ -439,5 +440,5 @@ func (u *latestEventsUpdater) stateEventMap() (map[types.EventNID]string, error)
stateEventNIDs = append(stateEventNIDs, entry.EventNID)
}
stateEventNIDs = stateEventNIDs[:util.SortAndUnique(stateEventNIDs)]
return u.updater.EventIDs(u.ctx, stateEventNIDs)
return u.updater.EventIDs(ctx, stateEventNIDs)
}
Loading