From aca0ee20e8793810b8ccce8a65819e306881c2b2 Mon Sep 17 00:00:00 2001 From: Tulir Asokan Date: Thu, 16 Jan 2025 16:29:51 +0200 Subject: [PATCH] client: add support for connect-once background resync interface --- pkg/connector/client.go | 42 ++++++++++++++++++++++++++++++++ pkg/connector/connector.go | 3 +++ pkg/connector/handlesignal.go | 4 +++ pkg/signalmeow/client.go | 4 +-- pkg/signalmeow/events/message.go | 3 +++ pkg/signalmeow/receiving.go | 17 +++++++++++-- 6 files changed, 69 insertions(+), 4 deletions(-) diff --git a/pkg/connector/client.go b/pkg/connector/client.go index 3b0be20c..2f2925c7 100644 --- a/pkg/connector/client.go +++ b/pkg/connector/client.go @@ -22,12 +22,14 @@ import ( "time" "github.com/rs/zerolog" + "go.mau.fi/util/exsync" "maunium.net/go/mautrix/bridge/status" "maunium.net/go/mautrix/bridgev2" "maunium.net/go/mautrix/bridgev2/networkid" "go.mau.fi/mautrix-signal/pkg/signalid" "go.mau.fi/mautrix-signal/pkg/signalmeow" + "go.mau.fi/mautrix-signal/pkg/signalmeow/web" ) type SignalClient struct { @@ -35,6 +37,8 @@ type SignalClient struct { UserLogin *bridgev2.UserLogin Client *signalmeow.Client Ghost *bridgev2.Ghost + + queueEmptyWaiter *exsync.Event } var ( @@ -51,6 +55,7 @@ var ( _ bridgev2.RoomNameHandlingNetworkAPI = (*SignalClient)(nil) _ bridgev2.RoomAvatarHandlingNetworkAPI = (*SignalClient)(nil) _ bridgev2.RoomTopicHandlingNetworkAPI = (*SignalClient)(nil) + _ bridgev2.BackgroundSyncingNetworkAPI = (*SignalClient)(nil) ) var pushCfg = &bridgev2.PushConfig{ @@ -210,6 +215,43 @@ func (s *SignalClient) Connect(ctx context.Context) { s.tryConnect(ctx, 0) } +func (s *SignalClient) ConnectBackground(ctx context.Context) error { + s.queueEmptyWaiter.Clear() + ch, err := s.Client.StartAuthedWS(ctx) + if err != nil { + return err + } + defer s.Disconnect() + log := zerolog.Ctx(ctx) + queueEmpty := s.queueEmptyWaiter.GetChan() + for { + select { + case status := <-ch: + switch status.Event { + case web.SignalWebsocketConnectionEventConnected: + log.Info().Msg("Authed websocket connected") + case web.SignalWebsocketConnectionEventDisconnected: + log.Err(status.Err).Msg("Authed websocket disconnected") + return fmt.Errorf("authed websocket disconnected: %w", status.Err) + case web.SignalWebsocketConnectionEventLoggedOut: + log.Err(status.Err).Msg("Authed websocket logged out") + return fmt.Errorf("authed websocket logged out: %w", status.Err) + case web.SignalWebsocketConnectionEventError: + log.Err(status.Err).Msg("Authed websocket error") + return fmt.Errorf("authed websocket errored: %w", status.Err) + case web.SignalWebsocketConnectionEventCleanShutdown: + log.Info().Msg("Authed websocket clean shutdown") + } + case <-ctx.Done(): + log.Warn().Msg("Context finished before queue empty event") + return ctx.Err() + case <-queueEmpty: + log.Info().Msg("Received queue empty event") + return nil + } + } +} + func (s *SignalClient) Disconnect() { if s.Client == nil { return diff --git a/pkg/connector/connector.go b/pkg/connector/connector.go index f4fc7ae1..229d4eae 100644 --- a/pkg/connector/connector.go +++ b/pkg/connector/connector.go @@ -23,6 +23,7 @@ import ( "github.com/google/uuid" "go.mau.fi/util/dbutil" + "go.mau.fi/util/exsync" "maunium.net/go/mautrix/bridgev2" "go.mau.fi/mautrix-signal/pkg/msgconv" @@ -89,6 +90,8 @@ func (s *SignalConnector) LoadUserLogin(ctx context.Context, login *bridgev2.Use sc := &SignalClient{ Main: s, UserLogin: login, + + queueEmptyWaiter: exsync.NewEvent(), } if device != nil { sc.Client = &signalmeow.Client{ diff --git a/pkg/connector/handlesignal.go b/pkg/connector/handlesignal.go index 19848620..d39f250e 100644 --- a/pkg/connector/handlesignal.go +++ b/pkg/connector/handlesignal.go @@ -52,6 +52,10 @@ func (s *SignalClient) handleSignalEvent(rawEvt events.SignalEvent) { s.handleSignalContactList(evt) case *events.ACIFound: s.handleSignalACIFound(evt) + case *events.QueueEmpty: + s.queueEmptyWaiter.Set() + default: + s.UserLogin.Log.Warn().Type("event_type", evt).Msg("Unrecognized signalmeow event type") } } diff --git a/pkg/signalmeow/client.go b/pkg/signalmeow/client.go index 8f44db05..7ef1abbd 100644 --- a/pkg/signalmeow/client.go +++ b/pkg/signalmeow/client.go @@ -73,7 +73,7 @@ func (cli *Client) IsConnected() bool { return cli.AuthedWS.IsConnected() && cli.UnauthedWS.IsConnected() } -func (cli *Client) ConnectAuthedWS(ctx context.Context, requestHandler web.RequestHandlerFunc) (chan web.SignalWebsocketConnectionStatus, error) { +func (cli *Client) connectAuthedWS(ctx context.Context, requestHandler web.RequestHandlerFunc) (chan web.SignalWebsocketConnectionStatus, error) { if cli.AuthedWS != nil { return nil, errors.New("authed websocket already connected") } @@ -90,7 +90,7 @@ func (cli *Client) ConnectAuthedWS(ctx context.Context, requestHandler web.Reque return statusChan, nil } -func (cli *Client) ConnectUnauthedWS(ctx context.Context) (chan web.SignalWebsocketConnectionStatus, error) { +func (cli *Client) connectUnauthedWS(ctx context.Context) (chan web.SignalWebsocketConnectionStatus, error) { if cli.UnauthedWS != nil { return nil, errors.New("unauthed websocket already connected") } diff --git a/pkg/signalmeow/events/message.go b/pkg/signalmeow/events/message.go index 0038657f..9b8e79da 100644 --- a/pkg/signalmeow/events/message.go +++ b/pkg/signalmeow/events/message.go @@ -35,6 +35,7 @@ func (*ReadSelf) isSignalEvent() {} func (*Call) isSignalEvent() {} func (*ContactList) isSignalEvent() {} func (*ACIFound) isSignalEvent() {} +func (*QueueEmpty) isSignalEvent() {} type MessageInfo struct { Sender uuid.UUID @@ -78,3 +79,5 @@ type ACIFound struct { PNI libsignalgo.ServiceID ACI libsignalgo.ServiceID } + +type QueueEmpty struct{} diff --git a/pkg/signalmeow/receiving.go b/pkg/signalmeow/receiving.go index b1dddea7..4d913e2a 100644 --- a/pkg/signalmeow/receiving.go +++ b/pkg/signalmeow/receiving.go @@ -68,17 +68,29 @@ type SignalConnectionStatus struct { Err error } +func (cli *Client) StartAuthedWS(ctx context.Context) (chan web.SignalWebsocketConnectionStatus, error) { + ctx, cancel := context.WithCancel(ctx) + cli.WSCancel = cancel + authChan, err := cli.connectAuthedWS(ctx, cli.incomingRequestHandler) + if err != nil { + cancel() + return nil, err + } + zerolog.Ctx(ctx).Info().Msg("Authed websocket connecting") + return authChan, nil +} + func (cli *Client) StartReceiveLoops(ctx context.Context) (chan SignalConnectionStatus, error) { log := zerolog.Ctx(ctx).With().Str("action", "start receive loops").Logger() ctx, cancel := context.WithCancel(log.WithContext(ctx)) cli.WSCancel = cancel - authChan, err := cli.ConnectAuthedWS(ctx, cli.incomingRequestHandler) + authChan, err := cli.connectAuthedWS(ctx, cli.incomingRequestHandler) if err != nil { cancel() return nil, err } log.Info().Msg("Authed websocket connecting") - unauthChan, err := cli.ConnectUnauthedWS(ctx) + unauthChan, err := cli.connectUnauthedWS(ctx) if err != nil { cancel() return nil, err @@ -257,6 +269,7 @@ func (cli *Client) incomingRequestHandler(ctx context.Context, req *signalpb.Web return cli.incomingAPIMessageHandler(ctx, req) } else if *req.Verb == http.MethodPut && *req.Path == "/api/v1/queue/empty" { log.Trace().Msg("Received queue empty") + cli.handleEvent(&events.QueueEmpty{}) } else { log.Warn().Any("req", req).Msg("Unknown websocket request message") }