Skip to content

Add TestNotificationClientDupeOTKUpload #134

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 1 commit into from
Sep 24, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
15 changes: 11 additions & 4 deletions internal/api/rust/rust.go
Original file line number Diff line number Diff line change
Expand Up @@ -49,6 +49,7 @@ type RustRoomInfo struct {

type RustClient struct {
FFIClient *matrix_sdk_ffi.Client
syncService *matrix_sdk_ffi.SyncService
roomsListener *RoomsListener
allRooms *matrix_sdk_ffi.RoomList
rooms map[string]*RustRoomInfo
Expand All @@ -57,7 +58,7 @@ type RustClient struct {
persistentStoragePath string
opts api.ClientCreationOpts

// for NSE tests
// for push notification tests (single/multi-process)
notifClient *matrix_sdk_ffi.NotificationClient
}

Expand Down Expand Up @@ -132,9 +133,14 @@ func (c *RustClient) Opts() api.ClientCreationOpts {

func (c *RustClient) GetNotification(t ct.TestLike, roomID, eventID string) (*api.Notification, error) {
if c.notifClient == nil {
t.Errorf("RustClient misconfigured. You can only call GetNotification if this is an NSE process. " +
"Ensure opts.EnableCrossProcessRefreshLockProcessName and opts.AccessToken are set!")
return nil, fmt.Errorf("misconfigured rust client")
var err error
c.Logf(t, "creating NotificationClient")
c.notifClient, err = c.FFIClient.NotificationClient(matrix_sdk_ffi.NotificationProcessSetupSingleProcess{
SyncService: c.syncService,
})
if err != nil {
return nil, fmt.Errorf("GetNotification: failed to create NotificationClient: %s", err)
}
}
notifItem, err := c.notifClient.GetNotification(roomID, eventID)
if err != nil {
Expand Down Expand Up @@ -350,6 +356,7 @@ func (c *RustClient) StartSyncing(t ct.TestLike) (stopSyncing func(), err error)
}
go syncService.Start()
c.allRooms = roomList
c.syncService = syncService
// track new rooms when they are made
allRoomsListener := newGenericStateListener[[]matrix_sdk_ffi.RoomListEntriesUpdate]()
go func() {
Expand Down
66 changes: 29 additions & 37 deletions tests/notification_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,6 @@ package tests
import (
"fmt"
"math/rand"
"sync"
"testing"
"time"

Expand Down Expand Up @@ -535,35 +534,42 @@ func TestMultiprocessNSEOlmSessionWedge(t *testing.T) {
})
}

func TestMultiprocessDupeOTKUpload(t *testing.T) {
// Test that the notification client doesn't cause duplicate OTK uploads.
// Regression test for https://github.com/matrix-org/matrix-rust-sdk/issues/1415
//
// This test creates a normal Alice rust client and lets it upload OTKs. It then:
// - hooks into /keys/upload requests and artificially delays them by adding 4s of latency
// - creates a Bob client who sends a message to Alice, consuming 1 OTK in the process
// - immediately calls GetNotification on Bob's event as soon as it 200 OKs, which creates
// a NotificationClient inside the same process.
//
// This means there are 2 sync loops: the main Client and the NotificationClient. Both clients
// will see the OTK count being lowered so both may try to upload a new OTK. Because we are
// delaying upload requests by 4s, this increases the chance of both uploads being in-flight at
// the same time. If they do not synchronise this operation, they will both try to upload
// _different keys_ with the _same_ key ID, which causes synapse to HTTP 400 with:
//
// > One time key signed_curve25519:AAAAAAAAADI already exists
//
// Which will fail the test.
func TestNotificationClientDupeOTKUpload(t *testing.T) {
if !Instance().ShouldTest(api.ClientTypeRust) {
t.Skipf("rust only")
return
}
t.Skipf("skipped until it is no longer flakey")
tc, roomID := createAndJoinRoom(t)

// start the "main" app
alice := tc.MustLoginClient(t, &cc.ClientCreationRequest{
User: tc.Alice,
Opts: api.ClientCreationOpts{
PersistentStorage: true,
EnableCrossProcessRefreshLockProcessName: "main",
PersistentStorage: true,
},
})
stopSyncing := alice.MustStartSyncing(t)
defer stopSyncing()
aliceAccessToken := alice.Opts().AccessToken

// prep nse process
nseAlice := tc.MustCreateClient(t, &cc.ClientCreationRequest{
User: tc.Alice,
Multiprocess: true,
Opts: api.ClientCreationOpts{
PersistentStorage: true,
EnableCrossProcessRefreshLockProcessName: api.ProcessNameNSE,
AccessToken: aliceAccessToken,
},
})

aliceUploadedNewKeys := false
// artificially slow down the HTTP responses, such that we will potentially have 2 in-flight /keys/upload requests
// at once. If the NSE and main apps are talking to each other, they should be using the same key ID + key.
Expand All @@ -588,35 +594,21 @@ func TestMultiprocessDupeOTKUpload(t *testing.T) {
return nil
},
}, func() {
var eventID string
// Bob appears and sends a message, causing Bob to claim one of Alice's OTKs.
// The main app will see this in /sync and then try to upload another OTK, which we will tarpit.
tc.WithClientSyncing(t, &cc.ClientCreationRequest{
User: tc.Bob,
}, func(bob api.Client) {
eventID = bob.SendMessage(t, roomID, "Hello world!")
})
var wg sync.WaitGroup
wg.Add(2)
go func() { // nse process
defer wg.Done()
// wake up NSE process as if it got a push notification. Calling this function
// should cause the NSE process to upload a OTK as it would have seen 1 has been used.
// The NSE and main app must talk to each other to ensure they use the same key.
nseAlice.Logf(t, "GetNotification %s, %s", roomID, eventID)
notif, err := nseAlice.GetNotification(t, roomID, eventID)
eventID := bob.SendMessage(t, roomID, "Hello world!")
// create a NotificationClient in the same process to fetch this "push notification".
// It might make the NotificationClient upload a OTK as it would have seen 1 has been used.
// The NotificationClient and main Client must talk to each other to ensure they use the same key.
alice.Logf(t, "GetNotification %s, %s", roomID, eventID)
notif, err := alice.GetNotification(t, roomID, eventID)
must.NotError(t, "failed to get notification", err)
must.Equal(t, notif.Text, "Hello world!", "failed to decrypt msg body")
must.Equal(t, notif.FailedToDecrypt, false, "FailedToDecrypt but we should be able to decrypt")
}()
go func() { // app process
defer wg.Done()
stopSyncing := alice.MustStartSyncing(t)
// let alice upload new OTK
time.Sleep(5 * time.Second)
stopSyncing()
}()
wg.Wait()
})
})
if !aliceUploadedNewKeys {
t.Errorf("Alice did not upload new OTKs")
Expand Down
Loading