-
Notifications
You must be signed in to change notification settings - Fork 39
Commit
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.
Merge pull request #369 from matrix-org/kegan/migrate-stuck-invites
Migrate stuck invites
- Loading branch information
Showing
2 changed files
with
315 additions
and
0 deletions.
There are no files selected for viewing
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,83 @@ | ||
package migrations | ||
|
||
import ( | ||
"context" | ||
"database/sql" | ||
"fmt" | ||
|
||
"github.com/lib/pq" | ||
"github.com/pressly/goose/v3" | ||
) | ||
|
||
func init() { | ||
goose.AddMigrationContext(upClearStuckInvites, downClearStuckInvites) | ||
} | ||
|
||
// The purpose of this migration is to find users who have rooms which have | ||
// not been properly processed by the proxy and invalidate their since token | ||
// so they will do an initial sync on the next poller startup. This is specifically | ||
// targeting stuck invites, where there is an invite in the invites table but | ||
// the room is already joined. This is usually (always?) due to missing a create | ||
// event when the room was joined, caused by a synapse bug outlined in | ||
// https://github.com/matrix-org/sliding-sync/issues/367 | ||
// This isn't exclusively a problem with invites, though it manifests more clearly there. | ||
func upClearStuckInvites(ctx context.Context, tx *sql.Tx) error { | ||
// The syncv3_unread table is updated any time A) a room is in rooms.join and B) the unread count has changed, | ||
// where nil != 0. Therefore, we can use this table as a proxy for "have we seen a v2 response which has put this | ||
// room into rooms.join"? For every room in rooms.join, we should have seen a create event for it, and hence have | ||
// an entry in syncv3_rooms. If we do not have an entry in syncv3_rooms but do have an entry in syncv3_unread, this | ||
// implies we failed to properly store this joined room and therefore the user who the unread marker is for should be | ||
// reset to force an initial sync. On matrix.org, of the users using sliding sync, this will catch around ~1.82% of users | ||
rows, err := tx.QueryContext(ctx, ` | ||
SELECT distinct(user_id) FROM syncv3_unread | ||
WHERE room_id NOT IN ( | ||
SELECT room_id | ||
FROM syncv3_rooms | ||
) | ||
`) | ||
defer rows.Close() | ||
if err != nil { | ||
return fmt.Errorf("failed to select bad users: %w", err) | ||
} | ||
|
||
var usersToInvalidate []string | ||
for rows.Next() { | ||
var userID string | ||
err = rows.Scan(&userID) | ||
if err != nil { | ||
return fmt.Errorf("failed to scan user: %w", err) | ||
} | ||
usersToInvalidate = append(usersToInvalidate, userID) | ||
} | ||
logger.Info().Int("len_invalidate_users", len(usersToInvalidate)).Msg("invalidating users") | ||
if len(usersToInvalidate) < 50 { | ||
logger.Info().Strs("invalidate_users", usersToInvalidate).Msg("invalidating users") | ||
} | ||
|
||
// for each user: | ||
// - reset their since token for all devices | ||
// - remove any outstanding invites (we'll be told about them again when they initial sync) | ||
res, err := tx.ExecContext(ctx, ` | ||
UPDATE syncv3_sync2_devices SET since='' WHERE user_id=ANY($1) | ||
`, pq.StringArray(usersToInvalidate)) | ||
if err != nil { | ||
return fmt.Errorf("failed to invalidate since tokens: %w", err) | ||
} | ||
ra, _ := res.RowsAffected() | ||
logger.Info().Int64("num_devices", ra).Msg("reset since tokens") | ||
|
||
res, err = tx.ExecContext(ctx, ` | ||
DELETE FROM syncv3_invites WHERE user_id=ANY($1) | ||
`, pq.StringArray(usersToInvalidate)) | ||
if err != nil { | ||
return fmt.Errorf("failed to remove outstanding invites: %w", err) | ||
} | ||
ra, _ = res.RowsAffected() | ||
logger.Info().Int64("num_invites", ra).Msg("reset invites") | ||
return nil | ||
} | ||
|
||
func downClearStuckInvites(ctx context.Context, tx *sql.Tx) error { | ||
// we can't roll this back | ||
return nil | ||
} |
232 changes: 232 additions & 0 deletions
232
state/migrations/20231108122539_clear_stuck_invites_test.go
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,232 @@ | ||
package migrations | ||
|
||
import ( | ||
"context" | ||
"encoding/json" | ||
"fmt" | ||
"testing" | ||
"time" | ||
|
||
"github.com/jmoiron/sqlx" | ||
"github.com/matrix-org/sliding-sync/sqlutil" | ||
"github.com/matrix-org/sliding-sync/state" | ||
"github.com/matrix-org/sliding-sync/sync2" | ||
) | ||
|
||
func TestClearStuckInvites(t *testing.T) { | ||
db, close := connectToDB(t) | ||
defer close() | ||
roomsTable := state.NewRoomsTable(db) | ||
inviteTable := state.NewInvitesTable(db) | ||
unreadTable := state.NewUnreadTable(db) | ||
deviceTable := sync2.NewDevicesTable(db) | ||
tokensTable := sync2.NewTokensTable(db, "secret") | ||
|
||
zero := 0 | ||
device1 := "TEST_1" | ||
device2 := "TEST_2" | ||
roomA := "!TestClearStuckInvites_a:localhost" | ||
roomB := "!TestClearStuckInvites_b:localhost" | ||
roomC := "!TestClearStuckInvites_c:localhost" | ||
roomD := "!TestClearStuckInvites_d:localhost" | ||
roomE := "!TestClearStuckInvites_e:localhost" | ||
roomF := "!TestClearStuckInvites_f:localhost" | ||
roomG := "!TestClearStuckInvites_g:localhost" | ||
alice := "@TestClearStuckInvites_alice:localhost" | ||
bob := "@TestClearStuckInvites_bob:localhost" | ||
charlie := "@TestClearStuckInvites_charlie:localhost" | ||
doris := "@TestClearStuckInvites_doris:localhost" | ||
users := []string{ | ||
alice, bob, charlie, doris, | ||
} | ||
|
||
// Test cases: | ||
// Room | In Invite Table? | In Unread Table? | In Room Table? | Comment | ||
// A Y Y Y OK, Genuine invite, proxy in room | ||
// B Y Y N BAD, Stuck invite, proxy never saw join | ||
// C Y N Y OK, Genuine invite, proxy in room, no unread counts (unusual but valid) | ||
// D Y N N OK, Genuine invite, proxy not in room | ||
// E N Y Y OK, Genuine joined room | ||
// F N Y N BAD, Stuck joined room, proxy never saw join | ||
// G N N Y OK, Genuine joined room, no unread counts (unusual but valid) | ||
// - N N N Impossible, room id isn't in any table! | ||
roomToInfo := map[string]struct { | ||
invitedUser string | ||
unreadUser string | ||
inRoomTable bool | ||
}{ | ||
roomA: { | ||
invitedUser: alice, | ||
unreadUser: bob, | ||
inRoomTable: true, | ||
}, | ||
roomB: { | ||
invitedUser: bob, | ||
unreadUser: bob, | ||
inRoomTable: false, | ||
}, | ||
roomC: { | ||
invitedUser: charlie, | ||
inRoomTable: true, | ||
}, | ||
roomD: { | ||
invitedUser: doris, | ||
inRoomTable: false, | ||
}, | ||
roomE: { | ||
unreadUser: alice, | ||
inRoomTable: true, | ||
}, | ||
roomF: { | ||
unreadUser: doris, | ||
}, | ||
roomG: { | ||
inRoomTable: true, | ||
}, | ||
} | ||
|
||
err := sqlutil.WithTransaction(db, func(txn *sqlx.Tx) error { | ||
for roomID, info := range roomToInfo { | ||
if info.inRoomTable { | ||
err := roomsTable.Upsert(txn, state.RoomInfo{ | ||
ID: roomID, | ||
}, 0, 0) | ||
if err != nil { | ||
return fmt.Errorf("Upsert room: %s", err) | ||
} | ||
} | ||
if info.invitedUser != "" { | ||
err := inviteTable.InsertInvite(info.invitedUser, roomID, []json.RawMessage{json.RawMessage(`{}`)}) | ||
if err != nil { | ||
return fmt.Errorf("InsertInvite: %s", err) | ||
} | ||
} | ||
if info.unreadUser != "" { | ||
err := unreadTable.UpdateUnreadCounters(info.unreadUser, roomID, &zero, &zero) | ||
if err != nil { | ||
return fmt.Errorf("UpdateUnreadCounters: %s", err) | ||
} | ||
} | ||
} | ||
for _, userID := range users { | ||
for _, deviceID := range []string{device1, device2} { | ||
// each user has 2 devices | ||
if err := deviceTable.InsertDevice(txn, userID, deviceID); err != nil { | ||
return fmt.Errorf("InsertDevice: %s", err) | ||
} | ||
_, err := tokensTable.Insert(txn, userID+deviceID, userID, deviceID, time.Now()) | ||
if err != nil { | ||
return fmt.Errorf("TokensTable.Insert: %s", err) | ||
} | ||
} | ||
} | ||
return nil | ||
}) | ||
if err != nil { | ||
t.Fatalf("failed to set up test configuration: %s", err) | ||
} | ||
// set since tokens (this is done without a txn hence cannot be bundled in as the UPDATE would fail) | ||
for _, userID := range users { | ||
for i, deviceID := range []string{device1, device2} { | ||
// each user has 2 devices | ||
since := fmt.Sprintf("since_%d", i) | ||
if err := deviceTable.UpdateDeviceSince(userID, deviceID, since); err != nil { | ||
t.Fatalf("UpdateDeviceSince: %s", err) | ||
} | ||
} | ||
} | ||
|
||
t.Log("Run the migration.") | ||
tx, err := db.Beginx() | ||
if err != nil { | ||
t.Fatal(err) | ||
} | ||
if err := upClearStuckInvites(context.Background(), tx.Tx); err != nil { | ||
t.Fatalf("upClearStuckInvites: %s", err) | ||
} | ||
tx.Commit() | ||
|
||
// make a new txn for assertions | ||
tx, err = db.Beginx() | ||
if err != nil { | ||
t.Fatal(err) | ||
} | ||
|
||
// users in room B (bob) and F (doris) should be reset. | ||
tokens, err := tokensTable.TokenForEachDevice(tx) | ||
if err != nil { | ||
t.Fatalf("TokenForEachDevice: %s", err) | ||
} | ||
wantResults := map[[2]string]struct { | ||
wantSinceReset bool | ||
}{ | ||
{bob, device1}: { | ||
wantSinceReset: true, | ||
}, | ||
{bob, device2}: { | ||
wantSinceReset: true, | ||
}, | ||
{doris, device1}: { | ||
wantSinceReset: true, | ||
}, | ||
{doris, device2}: { | ||
wantSinceReset: true, | ||
}, | ||
// everyone else should NOT have since reset | ||
{alice, device1}: { | ||
wantSinceReset: false, | ||
}, | ||
{alice, device2}: { | ||
wantSinceReset: false, | ||
}, | ||
{charlie, device1}: { | ||
wantSinceReset: false, | ||
}, | ||
{charlie, device2}: { | ||
wantSinceReset: false, | ||
}, | ||
} | ||
for _, tok := range tokens { | ||
key := [2]string{tok.UserID, tok.DeviceID} | ||
want, ok := wantResults[key] | ||
if !ok { | ||
continue // different user in another test? | ||
} | ||
if want.wantSinceReset && tok.Since != "" { | ||
t.Errorf("%s want since reset, got %+v", key, tok) | ||
} | ||
if !want.wantSinceReset && tok.Since == "" { | ||
t.Errorf("%s did not want since reset, got %+v", key, tok) | ||
} | ||
} | ||
// invites for Bob and Doris are gone | ||
for _, userID := range []string{bob, doris} { | ||
got, err := inviteTable.SelectAllInvitesForUser(userID) | ||
if err != nil { | ||
t.Fatalf("SelectAllInvitesForUser: %s", err) | ||
} | ||
if len(got) > 0 { | ||
t.Fatalf("SelectAllInvitesForUser got invites for %s, wanted none: %+v", userID, got) | ||
} | ||
} | ||
// ensure other invites remain | ||
wantInvites := map[string][]string{ | ||
alice: {roomA}, | ||
charlie: {roomC}, | ||
} | ||
for userID, wantInvitesRooms := range wantInvites { | ||
got, err := inviteTable.SelectAllInvitesForUser(userID) | ||
if err != nil { | ||
t.Fatalf("SelectAllInvitesForUser: %s", err) | ||
} | ||
if len(got) != len(wantInvitesRooms) { | ||
t.Fatalf("SelectAllInvitesForUser got %d invites for %s, wanted %d", len(got), userID, len(wantInvitesRooms)) | ||
} | ||
for _, wantRoom := range wantInvitesRooms { | ||
_, exists := got[wantRoom] | ||
if !exists { | ||
t.Fatalf("SelectAllInvitesForUser wanted invite for %s in %s, but it's missing", userID, wantRoom) | ||
} | ||
} | ||
} | ||
} |