From e414fed28a7e339a15b154a65929b4ea8d27c210 Mon Sep 17 00:00:00 2001 From: streamer45 Date: Mon, 29 May 2023 13:32:15 +0200 Subject: [PATCH] Implement start call API endpoint --- e2e/tests/global_widget.spec.ts | 26 +--- e2e/tests/start_call.spec.ts | 25 +++- e2e/types.ts | 1 - e2e/utils.ts | 10 +- server/api.go | 89 +++--------- server/call.go | 245 ++++++++++++++++++++++++++++++++ server/channel_state.go | 4 +- server/plugin.go | 5 + server/websocket.go | 112 ++++----------- webapp/src/reducers.ts | 5 + webapp/src/selectors.ts | 4 +- webapp/src/slash_commands.tsx | 6 +- 12 files changed, 348 insertions(+), 184 deletions(-) create mode 100644 server/call.go diff --git a/e2e/tests/global_widget.spec.ts b/e2e/tests/global_widget.spec.ts index 6fce8904f..33cb18064 100644 --- a/e2e/tests/global_widget.spec.ts +++ b/e2e/tests/global_widget.spec.ts @@ -2,17 +2,13 @@ import {test, expect} from '@playwright/test'; import {baseURL, defaultTeam, pluginID} from '../constants'; -import {getChannelNamesForTest, getUserStoragesForTest} from '../utils'; +import {getChannelNamesForTest, getUserStoragesForTest, getChannelID} from '../utils'; test.describe('global widget', () => { test.use({storageState: getUserStoragesForTest()[0]}); test('start call', async ({page, request}) => { - const channelName = getChannelNamesForTest()[0]; - const resp = await request.get(`${baseURL}/api/v4/teams/name/${defaultTeam}/channels/name/${channelName}`); - const channel = await resp.json(); - - await page.goto(`${baseURL}/plugins/${pluginID}/standalone/widget.html?call_id=${channel.id}`); + await page.goto(`${baseURL}/plugins/${pluginID}/standalone/widget.html?call_id=${await getChannelID(request)}`); await expect(page.locator('#calls-widget')).toBeVisible(); await expect(page.locator('#calls-widget-leave-button')).toBeVisible(); await page.locator('#calls-widget-leave-button').click(); @@ -21,11 +17,7 @@ test.describe('global widget', () => { test('recording widget banner', async ({page, request, context}) => { // start call - const channelName = getChannelNamesForTest()[0]; - const resp = await request.get(`${baseURL}/api/v4/teams/name/${defaultTeam}/channels/name/${channelName}`); - const channel = await resp.json(); - - await page.goto(`${baseURL}/plugins/${pluginID}/standalone/widget.html?call_id=${channel.id}`); + await page.goto(`${baseURL}/plugins/${pluginID}/standalone/widget.html?call_id=${await getChannelID(request)}`); await expect(page.locator('#calls-widget')).toBeVisible(); // open popout to control recording @@ -65,11 +57,7 @@ test.describe('global widget', () => { context, }) => { // start call - const channelName = getChannelNamesForTest()[0]; - const resp = await request.get(`${baseURL}/api/v4/teams/name/${defaultTeam}/channels/name/${channelName}`); - const channel = await resp.json(); - - await page.goto(`${baseURL}/plugins/${pluginID}/standalone/widget.html?call_id=${channel.id}`); + await page.goto(`${baseURL}/plugins/${pluginID}/standalone/widget.html?call_id=${await getChannelID(request)}`); await expect(page.locator('#calls-widget')).toBeVisible(); // open popout to control recording @@ -138,11 +126,7 @@ test.describe('global widget', () => { context, }) => { // start call - const channelName = getChannelNamesForTest()[0]; - const resp = await request.get(`${baseURL}/api/v4/teams/name/${defaultTeam}/channels/name/${channelName}`); - const channel = await resp.json(); - - await page.goto(`${baseURL}/plugins/${pluginID}/standalone/widget.html?call_id=${channel.id}`); + await page.goto(`${baseURL}/plugins/${pluginID}/standalone/widget.html?call_id=${await getChannelID(request)}`); await expect(page.locator('#calls-widget')).toBeVisible(); // open popout to control recording diff --git a/e2e/tests/start_call.spec.ts b/e2e/tests/start_call.spec.ts index b411b3e22..b6f7ca2ba 100644 --- a/e2e/tests/start_call.spec.ts +++ b/e2e/tests/start_call.spec.ts @@ -3,9 +3,9 @@ import {readFile} from 'fs/promises'; import {test, expect, chromium} from '@playwright/test'; import PlaywrightDevPage from '../page'; -import {getChannelNamesForTest, getUsernamesForTest, getUserStoragesForTest} from '../utils'; +import {getChannelNamesForTest, getUsernamesForTest, getUserStoragesForTest, getChannelID} from '../utils'; -import {adminState} from '../constants'; +import {adminState, baseURL, defaultTeam, pluginID} from '../constants'; const userStorages = getUserStoragesForTest(); const usernames = getUsernamesForTest(); @@ -54,6 +54,27 @@ test.describe('start/join call in channel with calls disabled', () => { test.describe('start new call', () => { test.use({storageState: userStorages[0]}); + test('API endpoint', async ({page, request}) => { + const channelID = await getChannelID(request); + + await request.post(`${baseURL}/plugins/${pluginID}/calls/start`, { + data: { + channel_id: channelID, + }, + headers: {'X-Requested-With': 'XMLHttpRequest'}, + }); + + // verify the call post is created. + await expect(page.getByTestId('call-thread').filter({has: page.getByText(`${usernames[0]} started a call`)})).toBeVisible(); + + await request.post(`${baseURL}/plugins/${pluginID}/calls/${channelID}/end`, { + headers: {'X-Requested-With': 'XMLHttpRequest'}, + }); + + // verify the call has ended. + await expect(page.getByTestId('call-thread').filter({has: page.getByText(`${usernames[0]} started a call`)})).toBeHidden(); + }); + test('channel header button', async ({page}) => { const devPage = new PlaywrightDevPage(page); await devPage.startCall(); diff --git a/e2e/types.ts b/e2e/types.ts index 016ac1e68..0f4503346 100644 --- a/e2e/types.ts +++ b/e2e/types.ts @@ -11,4 +11,3 @@ export type UserState = { password: string; storageStatePath: string; }; - diff --git a/e2e/utils.ts b/e2e/utils.ts index c631b3bee..e5d4edb40 100644 --- a/e2e/utils.ts +++ b/e2e/utils.ts @@ -1,8 +1,8 @@ -import {chromium} from '@playwright/test'; +import {chromium, APIRequestContext} from '@playwright/test'; import PlaywrightDevPage from './page'; -import {userPrefix, channelPrefix} from './constants'; +import {userPrefix, channelPrefix, baseURL, defaultTeam} from './constants'; export function getChannelNamesForTest() { let idx = 0; @@ -38,3 +38,9 @@ export async function startCall(userState: string) { return userPage; } +export async function getChannelID(request: APIRequestContext, channelIdx?: number) { + const channelName = getChannelNamesForTest()[channelIdx || 0]; + const resp = await request.get(`${baseURL}/api/v4/teams/name/${defaultTeam}/channels/name/${channelName}`); + const channel = await resp.json(); + return channel.id; +} diff --git a/server/api.go b/server/api.go index 813a03746..28e04280c 100644 --- a/server/api.go +++ b/server/api.go @@ -11,7 +11,6 @@ import ( "path/filepath" "regexp" "strings" - "time" "golang.org/x/time/rate" @@ -23,6 +22,7 @@ import ( var chRE = regexp.MustCompile(`^\/([a-z0-9]+)$`) var callEndRE = regexp.MustCompile(`^\/calls\/([a-z0-9]+)\/end$`) +var callStartRE = regexp.MustCompile(`^\/calls\/start$`) const requestBodyMaxSizeBytes = 1024 * 1024 // 1MB @@ -170,92 +170,42 @@ func (p *Plugin) handleGetAllChannels(w http.ResponseWriter, r *http.Request) { } } -func (p *Plugin) handleEndCall(w http.ResponseWriter, r *http.Request, channelID string) { +func (p *Plugin) handleStartCall(w http.ResponseWriter, r *http.Request) { var res httpResponse - defer p.httpAudit("handleEndCall", &res, w, r) + defer p.httpAudit("handleStartCall", &res, w, r) userID := r.Header.Get("Mattermost-User-Id") - isAdmin := p.API.HasPermissionTo(userID, model.PermissionManageSystem) - - state, err := p.kvGetChannelState(channelID) - if err != nil { + var data CallStartRequest + if err := json.NewDecoder(http.MaxBytesReader(w, r.Body, requestBodyMaxSizeBytes)).Decode(&data); err != nil { res.Err = err.Error() - res.Code = http.StatusInternalServerError - return - } - - if state == nil || state.Call == nil { - res.Err = "no call ongoing" res.Code = http.StatusBadRequest return } - if !isAdmin && state.Call.OwnerID != userID { - res.Err = "no permissions to end the call" + // TODO: consider forwarding proper error codes. + if err := p.startCall(userID, data); err != nil { + res.Err = err.Error() res.Code = http.StatusForbidden return } - callID := state.Call.ID - - if err := p.kvSetAtomicChannelState(channelID, func(state *channelState) (*channelState, error) { - if state == nil || state.Call == nil { - return nil, nil - } + res.Code = http.StatusOK + res.Msg = "success" +} - if state.Call.ID != callID { - return nil, fmt.Errorf("previous call has ended and new one has started") - } +func (p *Plugin) handleEndCall(w http.ResponseWriter, r *http.Request, channelID string) { + var res httpResponse + defer p.httpAudit("handleEndCall", &res, w, r) - if state.Call.EndAt == 0 { - state.Call.EndAt = time.Now().UnixMilli() - } + userID := r.Header.Get("Mattermost-User-Id") - return state, nil - }); err != nil { - res.Err = err.Error() + if err := p.endCall(userID, channelID); err != nil { res.Code = http.StatusForbidden + res.Msg = err.Error() return } - p.publishWebSocketEvent(wsEventCallEnd, map[string]interface{}{}, &model.WebsocketBroadcast{ChannelId: channelID, ReliableClusterSend: true}) - - go func() { - // We wait a few seconds for the call to end cleanly. If this doesn't - // happen we force end it. - time.Sleep(5 * time.Second) - - state, err := p.kvGetChannelState(channelID) - if err != nil { - p.LogError(err.Error()) - return - } - if state == nil || state.Call == nil || state.Call.ID != callID { - return - } - - p.LogInfo("call state is still in store, force ending it", "channelID", channelID) - - if state.Call.Recording != nil && state.Call.Recording.EndAt == 0 { - p.LogInfo("recording is in progress, force ending it", "channelID", channelID, "jobID", state.Call.Recording.JobID) - - if err := p.jobService.StopJob(state.Call.Recording.JobID); err != nil { - p.LogError("failed to stop recording job", "error", err.Error(), "channelID", channelID, "jobID", state.Call.Recording.JobID) - } - } - - for connID := range state.Call.Sessions { - if err := p.closeRTCSession(userID, connID, channelID, state.NodeID); err != nil { - p.LogError(err.Error()) - } - } - - if err := p.cleanCallState(channelID); err != nil { - p.LogError(err.Error()) - } - }() - res.Code = http.StatusOK res.Msg = "success" } @@ -564,6 +514,11 @@ func (p *Plugin) ServeHTTP(c *plugin.Context, w http.ResponseWriter, r *http.Req return } + if callStartRE.MatchString(r.URL.Path) { + p.handleStartCall(w, r) + return + } + if r.URL.Path == "/telemetry/track" { p.handleTrackEvent(w, r) return diff --git a/server/call.go b/server/call.go new file mode 100644 index 000000000..74fb5cd22 --- /dev/null +++ b/server/call.go @@ -0,0 +1,245 @@ +// Copyright (c) 2022-present Mattermost, Inc. All Rights Reserved. +// See LICENSE.txt for license information. + +package main + +import ( + "fmt" + "time" + + "github.com/mattermost/mattermost-server/v6/model" +) + +type CallStartRequest struct { + ChannelID string `json:"channel_id"` + Title string `json:"title,omitempty"` + ThreadID string `json:"thread_id,omitempty"` +} + +func (r CallStartRequest) IsValid() error { + if r.ChannelID == "" { + return fmt.Errorf("ChannelID should not be empty") + } + + return nil +} + +func (p *Plugin) getChannelForCall(userID string, req CallStartRequest) (*model.Channel, error) { + // We should go through only if the user has permissions to the requested channel + // or if the user is the Calls bot. + if !(p.isBot(userID) || p.API.HasPermissionToChannel(userID, req.ChannelID, model.PermissionCreatePost)) { + return nil, fmt.Errorf("forbidden") + } + channel, appErr := p.API.GetChannel(req.ChannelID) + if appErr != nil { + return nil, appErr + } + if channel.DeleteAt > 0 { + return nil, fmt.Errorf("cannot join call in archived channel") + } + + if req.ThreadID != "" { + post, appErr := p.API.GetPost(req.ThreadID) + if appErr != nil { + return nil, appErr + } + + if post.ChannelId != req.ChannelID { + return nil, fmt.Errorf("forbidden") + } + + if post.DeleteAt > 0 { + return nil, fmt.Errorf("cannot attach call to deleted thread") + } + + if post.RootId != "" { + return nil, fmt.Errorf("thread is not a root post") + } + } + + return channel, nil +} + +func (p *Plugin) startCall(userID string, req CallStartRequest) error { + if err := req.IsValid(); err != nil { + return fmt.Errorf("failed to validate call start request: %w", err) + } + + channel, err := p.getChannelForCall(userID, req) + if err != nil { + return fmt.Errorf("failed to get channel for call: %w", err) + } + + var call *callState + err = p.kvSetAtomicChannelState(req.ChannelID, func(state *channelState) (*channelState, error) { + if state == nil { + state = &channelState{} + } else if !p.userCanStartOrJoin(userID, state) { + return nil, fmt.Errorf("calls are not enabled") + } + + if state.Call != nil { + return nil, fmt.Errorf("call already ongoing") + } + + call = &callState{ + ID: model.NewId(), + ThreadID: req.ThreadID, + StartAt: time.Now().UnixMilli(), + Users: make(map[string]*userState), + Sessions: make(map[string]struct{}), + OwnerID: userID, + } + state.Call = call + state.NodeID = p.nodeID + + if p.rtcdManager != nil { + host, err := p.rtcdManager.GetHostForNewCall() + if err != nil { + return nil, fmt.Errorf("failed to get rtcd host: %w", err) + } + p.LogDebug("rtcd host has been assigned to call", "host", host) + state.Call.RTCDHost = host + } + + return state, nil + }) + if err != nil { + return fmt.Errorf("failed to start call: %w", err) + } + + if err := p.handleCallStarted(call, req, channel); err != nil { + p.LogError(err.Error()) + } + + return nil +} + +func (p *Plugin) handleCallStarted(call *callState, req CallStartRequest, channel *model.Channel) error { + p.track(evCallStarted, map[string]interface{}{ + "OwnerID": call.OwnerID, + "CallID": call.ID, + "ChannelID": channel.Id, + "ChannelType": channel.Type, + }) + + // new call has started + // If this is TestMode (DefaultEnabled=false) and sysadmin, send an ephemeral message + cfg := p.getConfiguration() + if cfg.DefaultEnabled != nil && !*cfg.DefaultEnabled && + p.API.HasPermissionTo(call.OwnerID, model.PermissionManageSystem) { + p.pluginAPI.Post.SendEphemeralPost( + call.OwnerID, + &model.Post{ + UserId: p.botSession.UserId, + ChannelId: channel.Id, + Message: "Currently calls are not enabled for non-admin users. You can change the setting through the system console", + }, + ) + } + + postID, threadID, err := p.startNewCallPost(call.OwnerID, channel.Id, call.StartAt, req.Title, req.ThreadID) + if err != nil { + return err + } + + // TODO: send all the info attached to a call. + p.publishWebSocketEvent(wsEventCallStart, map[string]interface{}{ + "channelID": channel.Id, + "start_at": call.StartAt, + "thread_id": threadID, + "post_id": postID, + "owner_id": call.OwnerID, + "host_id": call.HostID, + }, &model.WebsocketBroadcast{ChannelId: channel.Id, ReliableClusterSend: true}) + + return nil +} + +func (p *Plugin) endCall(userID, channelID string) error { + state, err := p.kvGetChannelState(channelID) + if err != nil { + return fmt.Errorf("failed to get state: %w", err) + } + + if state == nil || state.Call == nil { + return fmt.Errorf("no call ongoing") + } + + isAdmin := p.API.HasPermissionTo(userID, model.PermissionManageSystem) + if !isAdmin && state.Call.OwnerID != userID { + return fmt.Errorf("no permissions to end the call") + } + + var hasEnded bool + callID := state.Call.ID + + if err := p.kvSetAtomicChannelState(channelID, func(state *channelState) (*channelState, error) { + if state == nil || state.Call == nil { + return nil, nil + } + + if state.Call.ID != callID { + return nil, fmt.Errorf("previous call has ended and new one has started") + } + + if state.Call.EndAt == 0 { + state.Call.EndAt = time.Now().UnixMilli() + } + + if len(state.Call.Users) == 0 { + state.Call = nil + hasEnded = true + } + + return state, nil + }); err != nil { + return fmt.Errorf("failed to set state: %w", err) + } + + if _, err := p.updateCallPostEnded(state.Call.PostID); err != nil { + p.LogError(err.Error()) + } + p.publishWebSocketEvent(wsEventCallEnd, map[string]interface{}{}, &model.WebsocketBroadcast{ChannelId: channelID, ReliableClusterSend: true}) + + if hasEnded { + return nil + } + + go func() { + // We wait a few seconds for the call to end cleanly. If this doesn't + // happen we force end it. + time.Sleep(5 * time.Second) + + state, err := p.kvGetChannelState(channelID) + if err != nil { + p.LogError(err.Error()) + return + } + if state == nil || state.Call == nil || state.Call.ID != callID { + return + } + + p.LogInfo("call state is still in store, force ending it", "channelID", channelID) + + if state.Call.Recording != nil && state.Call.Recording.EndAt == 0 { + p.LogInfo("recording is in progress, force ending it", "channelID", channelID, "jobID", state.Call.Recording.JobID) + + if err := p.jobService.StopJob(state.Call.Recording.JobID); err != nil { + p.LogError("failed to stop recording job", "error", err.Error(), "channelID", channelID, "jobID", state.Call.Recording.JobID) + } + } + + for connID := range state.Call.Sessions { + if err := p.closeRTCSession(userID, connID, channelID, state.NodeID); err != nil { + p.LogError(err.Error()) + } + } + + if err := p.cleanCallState(channelID); err != nil { + p.LogError(err.Error()) + } + }() + + return nil +} diff --git a/server/channel_state.go b/server/channel_state.go index cde332453..a0292b60f 100644 --- a/server/channel_state.go +++ b/server/channel_state.go @@ -31,8 +31,8 @@ type callState struct { ID string `json:"id"` StartAt int64 `json:"create_at"` EndAt int64 `json:"end_at"` - Users map[string]*userState `json:"users,omitempty"` - Sessions map[string]struct{} `json:"sessions,omitempty"` + Users map[string]*userState `json:"users"` + Sessions map[string]struct{} `json:"sessions"` OwnerID string `json:"owner_id"` ThreadID string `json:"thread_id"` PostID string `json:"post_id"` diff --git a/server/plugin.go b/server/plugin.go index 4ead6ac6f..8a9a39b07 100644 --- a/server/plugin.go +++ b/server/plugin.go @@ -339,6 +339,11 @@ func (p *Plugin) updateCallPostEnded(postID string) (float64, error) { return 0, appErr } + if prop := post.GetProp("end_at"); prop != nil { + // Already ended. + return 0, nil + } + postMsg := "Call ended" slackAttachment := model.SlackAttachment{ Fallback: postMsg, diff --git a/server/websocket.go b/server/websocket.go index df0f53cb8..1c1e764f7 100644 --- a/server/websocket.go +++ b/server/websocket.go @@ -465,39 +465,12 @@ func (p *Plugin) handleLeave(us *session, userID, connID, channelID string) erro return nil } -func (p *Plugin) handleJoin(userID, connID, channelID, title, threadID string) error { - p.LogDebug("handleJoin", "userID", userID, "connID", connID, "channelID", channelID) +func (p *Plugin) handleJoin(userID, connID string, req CallStartRequest) error { + p.LogDebug("handleJoin", "userID", userID, "connID", connID, "channelID", req.ChannelID) - // We should go through only if the user has permissions to the requested channel - // or if the user is the Calls bot. - if !(p.isBot(userID) || p.API.HasPermissionToChannel(userID, channelID, model.PermissionCreatePost)) { - return fmt.Errorf("forbidden") - } - channel, appErr := p.API.GetChannel(channelID) - if appErr != nil { - return appErr - } - if channel.DeleteAt > 0 { - return fmt.Errorf("cannot join call in archived channel") - } - - if threadID != "" { - post, appErr := p.API.GetPost(threadID) - if appErr != nil { - return appErr - } - - if post.ChannelId != channelID { - return fmt.Errorf("forbidden") - } - - if post.DeleteAt > 0 { - return fmt.Errorf("cannot attach call to deleted thread") - } - - if post.RootId != "" { - return fmt.Errorf("thread is not a root post") - } + channel, err := p.getChannelForCall(userID, req) + if err != nil { + return fmt.Errorf("failed to get channel for call: %w", err) } state, prevState, err := p.addUserSession(userID, connID, channel) @@ -505,43 +478,10 @@ func (p *Plugin) handleJoin(userID, connID, channelID, title, threadID string) e return fmt.Errorf("failed to add user session: %w", err) } else if state.Call == nil { return fmt.Errorf("state.Call should not be nil") - } else if len(state.Call.Users) == 1 { - p.track(evCallStarted, map[string]interface{}{ - "ParticipantID": userID, - "CallID": state.Call.ID, - "ChannelID": channelID, - "ChannelType": channel.Type, - }) - - // new call has started - // If this is TestMode (DefaultEnabled=false) and sysadmin, send an ephemeral message - cfg := p.getConfiguration() - if cfg.DefaultEnabled != nil && !*cfg.DefaultEnabled && - p.API.HasPermissionTo(userID, model.PermissionManageSystem) { - p.pluginAPI.Post.SendEphemeralPost( - userID, - &model.Post{ - UserId: p.botSession.UserId, - ChannelId: channelID, - Message: "Currently calls are not enabled for non-admin users. You can change the setting through the system console", - }, - ) - } - - postID, threadID, err := p.startNewCallPost(userID, channelID, state.Call.StartAt, title, threadID) - if err != nil { + } else if prevState.Call == nil { + if err := p.handleCallStarted(state.Call, req, channel); err != nil { p.LogError(err.Error()) } - - // TODO: send all the info attached to a call. - p.publishWebSocketEvent(wsEventCallStart, map[string]interface{}{ - "channelID": channelID, - "start_at": state.Call.StartAt, - "thread_id": threadID, - "post_id": postID, - "owner_id": state.Call.OwnerID, - "host_id": state.Call.HostID, - }, &model.WebsocketBroadcast{ChannelId: channelID, ReliableClusterSend: true}) } handlerID, err := p.getHandlerID() @@ -553,12 +493,12 @@ func (p *Plugin) handleJoin(userID, connID, channelID, title, threadID string) e } p.LogDebug("got handlerID", "handlerID", handlerID) - us := newUserSession(userID, channelID, connID, p.rtcdManager == nil && handlerID == p.nodeID) + us := newUserSession(userID, req.ChannelID, connID, p.rtcdManager == nil && handlerID == p.nodeID) p.mut.Lock() p.sessions[connID] = us p.mut.Unlock() defer func() { - if err := p.handleLeave(us, userID, connID, channelID); err != nil { + if err := p.handleLeave(us, userID, connID, req.ChannelID); err != nil { p.LogError(err.Error()) } }() @@ -567,23 +507,23 @@ func (p *Plugin) handleJoin(userID, connID, channelID, title, threadID string) e msg := rtcd.ClientMessage{ Type: rtcd.ClientMessageJoin, Data: map[string]string{ - "callID": channelID, + "callID": req.ChannelID, "userID": userID, "sessionID": connID, }, } - if err := p.rtcdManager.Send(msg, channelID); err != nil { + if err := p.rtcdManager.Send(msg, req.ChannelID); err != nil { return fmt.Errorf("failed to send client join message: %w", err) } } else { if handlerID == p.nodeID { cfg := rtc.SessionConfig{ GroupID: "default", - CallID: channelID, + CallID: req.ChannelID, UserID: userID, SessionID: connID, } - p.LogDebug("initializing RTC session", "userID", userID, "connID", connID, "channelID", channelID) + p.LogDebug("initializing RTC session", "userID", userID, "connID", connID, "channelID", req.ChannelID) if err = p.rtcServer.InitSession(cfg, func() error { if atomic.CompareAndSwapInt32(&us.rtcClosed, 0, 1) { close(us.rtcCloseCh) @@ -597,7 +537,7 @@ func (p *Plugin) handleJoin(userID, connID, channelID, title, threadID string) e if err := p.sendClusterMessage(clusterMessage{ ConnID: connID, UserID: userID, - ChannelID: channelID, + ChannelID: req.ChannelID, SenderID: p.nodeID, }, clusterMessageTypeConnect, handlerID); err != nil { return fmt.Errorf("failed to send connect message: %w", err) @@ -611,26 +551,26 @@ func (p *Plugin) handleJoin(userID, connID, channelID, title, threadID string) e }, &model.WebsocketBroadcast{UserId: userID, ReliableClusterSend: true}) p.publishWebSocketEvent(wsEventUserConnected, map[string]interface{}{ "userID": userID, - }, &model.WebsocketBroadcast{ChannelId: channelID, ReliableClusterSend: true}) - p.metrics.IncWebSocketConn(channelID) - defer p.metrics.DecWebSocketConn(channelID) + }, &model.WebsocketBroadcast{ChannelId: req.ChannelID, ReliableClusterSend: true}) + p.metrics.IncWebSocketConn(req.ChannelID) + defer p.metrics.DecWebSocketConn(req.ChannelID) p.track(evCallUserJoined, map[string]interface{}{ "ParticipantID": userID, - "ChannelID": channelID, + "ChannelID": req.ChannelID, "CallID": state.Call.ID, }) if prevState.Call != nil && state.Call.HostID != prevState.Call.HostID { p.publishWebSocketEvent(wsEventCallHostChanged, map[string]interface{}{ "hostID": state.Call.HostID, - }, &model.WebsocketBroadcast{ChannelId: channelID, ReliableClusterSend: true}) + }, &model.WebsocketBroadcast{ChannelId: req.ChannelID, ReliableClusterSend: true}) } if userID == p.getBotID() && state.Call.Recording != nil { p.publishWebSocketEvent(wsEventCallRecordingState, map[string]interface{}{ - "callID": channelID, + "callID": req.ChannelID, "recState": state.Call.Recording.getClientState().toMap(), - }, &model.WebsocketBroadcast{ChannelId: channelID, ReliableClusterSend: true}) + }, &model.WebsocketBroadcast{ChannelId: req.ChannelID, ReliableClusterSend: true}) } p.wsReader(us, handlerID) @@ -759,16 +699,20 @@ func (p *Plugin) WebSocketMessageHasBeenPosted(connID, userID string, req *model return } + data := CallStartRequest{ + ChannelID: channelID, + } + // Title is optional, so if it's not present, // it will be an empty string. - title, _ := req.Data["title"].(string) + data.Title, _ = req.Data["title"].(string) // ThreadID is optional, so if it's not present, // it will be an empty string. - threadID, _ := req.Data["threadID"].(string) + data.ThreadID, _ = req.Data["threadID"].(string) go func() { - if err := p.handleJoin(userID, connID, channelID, title, threadID); err != nil { + if err := p.handleJoin(userID, connID, data); err != nil { p.LogWarn(err.Error(), "userID", userID, "connID", connID, "channelID", channelID) p.publishWebSocketEvent(wsEventError, map[string]interface{}{ "data": err.Error(), diff --git a/webapp/src/reducers.ts b/webapp/src/reducers.ts index 48b75899a..bb4365eca 100644 --- a/webapp/src/reducers.ts +++ b/webapp/src/reducers.ts @@ -604,6 +604,11 @@ const voiceChannelCalls = (state: { [channelID: string]: callState } = {}, actio hostChangeAt: action.data.startAt, }, }; + case VOICE_CHANNEL_CALL_END: + return { + ...state, + [action.data.channelID]: null, + }; default: return state; } diff --git a/webapp/src/selectors.ts b/webapp/src/selectors.ts index fedc9cfd6..6902f8052 100644 --- a/webapp/src/selectors.ts +++ b/webapp/src/selectors.ts @@ -109,8 +109,8 @@ export const voiceUsersStatusesInChannel = (state: GlobalState, channelID: strin return pluginState(state).voiceUsersStatuses[channelID] || {}; }; -export const voiceChannelCallStartAt = (state: GlobalState, channelID: string): number | undefined => { - return pluginState(state).voiceChannelCalls[channelID]?.startAt; +export const voiceChannelCallStartAt = (state: GlobalState, channelID: string) => { + return pluginState(state).voiceChannelCalls[channelID]?.startAt || 0; }; export const voiceChannelCallOwnerID = (state: GlobalState, channelID: string): string | undefined => { diff --git a/webapp/src/slash_commands.tsx b/webapp/src/slash_commands.tsx index b1f00aff7..e47684348 100644 --- a/webapp/src/slash_commands.tsx +++ b/webapp/src/slash_commands.tsx @@ -23,7 +23,7 @@ import {logDebug} from './log'; import {sendDesktopEvent, shouldRenderDesktopWidget} from './utils'; import { connectedChannelID, - voiceConnectedUsersInChannel, + voiceChannelCallStartAt, voiceChannelCallOwnerID, voiceChannelCallHostID, callRecording, @@ -51,7 +51,7 @@ export default async function slashCommandsHandler(store: Store, joinCall: joinC case 'join': case 'start': if (subCmd === 'start') { - if (voiceConnectedUsersInChannel(store.getState(), args.channel_id).length > 0) { + if (voiceChannelCallStartAt(store.getState(), args.channel_id) > 0) { store.dispatch(displayGenericErrorModal( defineMessage({defaultMessage: 'Unable to start call'}), defineMessage({defaultMessage: 'A call is already ongoing in the channel.'}), @@ -112,7 +112,7 @@ export default async function slashCommandsHandler(store: Store, joinCall: joinC )); return {}; case 'end': - if (voiceConnectedUsersInChannel(store.getState(), args.channel_id)?.length === 0) { + if (!voiceChannelCallStartAt(store.getState(), args.channel_id)) { store.dispatch(displayGenericErrorModal( defineMessage({defaultMessage: 'Unable to end the call'}), defineMessage({defaultMessage: 'There\'s no ongoing call in the channel.'}),