Merge branch 'mattermost:master' into master
offsoc authored Sep 27, 2024
2 parents 872914d + 2b42657 commit a89cb90
Showing 64 changed files with 633 additions and 1,303 deletions.
6 changes: 6 additions & 0 deletions server/channels/api4/api.go
@@ -11,6 +11,7 @@ import (

"github.com/mattermost/mattermost/server/public/model"
"github.com/mattermost/mattermost/server/v8/channels/app"
"github.com/mattermost/mattermost/server/v8/channels/manualtesting"
"github.com/mattermost/mattermost/server/v8/channels/web"
)

@@ -337,6 +338,11 @@ func Init(srv *app.Server) (*API, error) {
api.InitOutgoingOAuthConnection()
api.InitClientPerformanceMetrics()

+ // If we allow testing then listen for manual testing URL hits
+ if *srv.Config().ServiceSettings.EnableTesting {
+ api.BaseRoutes.Root.Handle("/manualtest", api.APIHandler(manualtesting.ManualTest)).Methods(http.MethodGet)
+ }

srv.Router.Handle("/api/v4/{anything:.*}", http.HandlerFunc(api.Handle404))

InitLocal(srv)
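For orientation, here is a minimal self-contained sketch of the gating pattern this hunk introduces: a test-only HTTP route is registered only when a configuration flag is enabled, so the endpoint simply does not exist in normal deployments. The Config type, route path, and handler below are illustrative stand-ins, not the actual Mattermost types.

```go
package main

import (
	"fmt"
	"net/http"
)

// Config stands in for the slice of ServiceSettings used above.
type Config struct {
	EnableTesting *bool
}

// registerRoutes wires the debug-only endpoint behind the EnableTesting flag,
// mirroring the guard added in api.go.
func registerRoutes(mux *http.ServeMux, cfg Config) {
	if cfg.EnableTesting != nil && *cfg.EnableTesting {
		mux.HandleFunc("/manualtest", func(w http.ResponseWriter, r *http.Request) {
			if r.Method != http.MethodGet {
				http.Error(w, "method not allowed", http.StatusMethodNotAllowed)
				return
			}
			fmt.Fprintln(w, "manual test endpoint enabled")
		})
	}
}

func main() {
	enabled := true
	mux := http.NewServeMux()
	registerRoutes(mux, Config{EnableTesting: &enabled})
	fmt.Println("routes registered; pass mux to http.ListenAndServe to serve them")
}
```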
14 changes: 7 additions & 7 deletions server/channels/api4/system.go
@@ -378,21 +378,21 @@ func queryLogs(c *Context, w http.ResponseWriter, r *http.Request) {
return
}

- logs, logerr := c.App.QueryLogs(c.AppContext, c.Params.Page, c.Params.LogsPerPage, logFilter)
- if logerr != nil {
- c.Err = logerr
+ logs, appErr := c.App.QueryLogs(c.AppContext, c.Params.Page, c.Params.LogsPerPage, logFilter)
+ if appErr != nil {
+ c.Err = appErr
return
}

logsJSON := make(map[string][]interface{})
var result interface{}
for node, logLines := range logs {
for _, log := range logLines {
- err2 := json.Unmarshal([]byte(log), &result)
- if err2 == nil {
- logsJSON[node] = append(logsJSON[node], result)
+ err = json.Unmarshal([]byte(log), &result)
+ if err != nil {
+ c.Logger.Warn("Error parsing log line in Server Logs", mlog.String("from_node", node), mlog.Err(err))
} else {
- c.Logger.Warn("Error parsing log line in Server Logs")
+ logsJSON[node] = append(logsJSON[node], result)
}
}
}
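The queryLogs change above stops silently dropping malformed log lines: a parse failure is now logged with the source node and the error, and only successfully parsed lines are appended. A rough standalone sketch of that parse-and-warn pattern (the map shape and logger are simplifications, not the real Context or mlog types):

```go
package main

import (
	"encoding/json"
	"log"
)

// collectLogsJSON parses each raw log line as JSON. Lines that parse are kept;
// lines that do not are reported with their node name and the error.
func collectLogsJSON(logs map[string][]string) map[string][]any {
	out := make(map[string][]any)
	for node, lines := range logs {
		for _, line := range lines {
			var entry any
			if err := json.Unmarshal([]byte(line), &entry); err != nil {
				log.Printf("Error parsing log line in Server Logs: node=%s err=%v", node, err)
				continue
			}
			out[node] = append(out[node], entry)
		}
	}
	return out
}

func main() {
	logs := map[string][]string{
		"default": {`{"level":"info","msg":"ok"}`, "not-json"},
	}
	parsed := collectLogsJSON(logs)
	log.Printf("kept %d of %d lines for node default", len(parsed["default"]), len(logs["default"]))
}
```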
8 changes: 8 additions & 0 deletions server/channels/api4/user.go
@@ -3344,6 +3344,14 @@ func setUnreadThreadByPostId(c *Context, w http.ResponseWriter, r *http.Request)
return
}

+ // We want to make sure the thread is followed when marking as unread
+ // https://mattermost.atlassian.net/browse/MM-36430
+ err := c.App.UpdateThreadFollowForUser(c.Params.UserId, c.Params.TeamId, c.Params.ThreadId, true)
+ if err != nil {
+ c.Err = err
+ return
+ }

thread, err := c.App.UpdateThreadReadForUserByPost(c.AppContext, c.AppContext.Session().Id, c.Params.UserId, c.Params.TeamId, c.Params.ThreadId, c.Params.PostId)
if err != nil {
c.Err = err
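A condensed sketch of the ordering this hunk enforces for MM-36430: the thread is followed first, and only then is it marked unread, so a user who marks a thread unread always ends up following it. The ThreadService interface is a hypothetical stand-in for the App methods involved, not the real signatures.

```go
package threads

// ThreadService is a hypothetical stand-in for the App methods used by the handler.
type ThreadService interface {
	UpdateThreadFollowForUser(userID, teamID, threadID string, following bool) error
	UpdateThreadReadForUserByPost(userID, teamID, threadID, postID string) error
}

// MarkThreadUnreadByPost follows the thread before writing the unread state,
// mirroring the order added in setUnreadThreadByPostId.
func MarkThreadUnreadByPost(svc ThreadService, userID, teamID, threadID, postID string) error {
	if err := svc.UpdateThreadFollowForUser(userID, teamID, threadID, true); err != nil {
		return err
	}
	return svc.UpdateThreadReadForUserByPost(userID, teamID, threadID, postID)
}
```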
38 changes: 20 additions & 18 deletions server/channels/api4/user_test.go
@@ -6855,58 +6855,60 @@ func TestThreadSocketEvents(t *testing.T) {
require.Truef(t, caught, "User should have received %s event", model.WebsocketEventThreadUpdated)
})

- resp, err = th.Client.UpdateThreadFollowForUser(context.Background(), th.BasicUser.Id, th.BasicTeam.Id, rpost.Id, false)
+ _, resp, err = th.Client.UpdateThreadReadForUser(context.Background(), th.BasicUser.Id, th.BasicTeam.Id, rpost.Id, replyPost.CreateAt+1)
require.NoError(t, err)
CheckOKStatus(t, resp)

t.Run("Listed for follow event", func(t *testing.T) {
t.Run("Listed for read event", func(t *testing.T) {
var caught bool
func() {
for {
select {
case ev := <-userWSClient.EventChannel:
- if ev.EventType() == model.WebsocketEventThreadFollowChanged {
+ if ev.EventType() == model.WebsocketEventThreadReadChanged {
caught = true
- require.Equal(t, ev.GetData()["state"], false)
- require.Equal(t, ev.GetData()["reply_count"], float64(1))

+ data := ev.GetData()
+ require.EqualValues(t, replyPost.CreateAt+1, data["timestamp"])
+ require.EqualValues(t, float64(1), data["previous_unread_replies"])
+ require.EqualValues(t, float64(1), data["previous_unread_mentions"])
+ require.EqualValues(t, float64(0), data["unread_replies"])
+ require.EqualValues(t, float64(0), data["unread_mentions"])
}
case <-time.After(2 * time.Second):
return
}
}
}()
- require.Truef(t, caught, "User should have received %s event", model.WebsocketEventThreadFollowChanged)

+ require.Truef(t, caught, "User should have received %s event", model.WebsocketEventThreadReadChanged)
})

- _, resp, err = th.Client.UpdateThreadReadForUser(context.Background(), th.BasicUser.Id, th.BasicTeam.Id, rpost.Id, replyPost.CreateAt+1)
+ resp, err = th.Client.UpdateThreadFollowForUser(context.Background(), th.BasicUser.Id, th.BasicTeam.Id, rpost.Id, false)
require.NoError(t, err)
CheckOKStatus(t, resp)

t.Run("Listed for read event", func(t *testing.T) {
t.Run("Listed for follow event", func(t *testing.T) {
var caught bool
func() {
for {
select {
case ev := <-userWSClient.EventChannel:
- if ev.EventType() == model.WebsocketEventThreadReadChanged {
+ if ev.EventType() == model.WebsocketEventThreadFollowChanged {
caught = true

- data := ev.GetData()
- require.EqualValues(t, replyPost.CreateAt+1, data["timestamp"])
- require.EqualValues(t, float64(1), data["previous_unread_replies"])
- require.EqualValues(t, float64(1), data["previous_unread_mentions"])
- require.EqualValues(t, float64(0), data["unread_replies"])
- require.EqualValues(t, float64(0), data["unread_mentions"])
+ require.Equal(t, ev.GetData()["state"], false)
+ require.Equal(t, ev.GetData()["reply_count"], float64(1))
}
case <-time.After(2 * time.Second):
return
}
}
}()

- require.Truef(t, caught, "User should have received %s event", model.WebsocketEventThreadReadChanged)
+ require.Truef(t, caught, "User should have received %s event", model.WebsocketEventThreadFollowChanged)
})

+ _, err = th.Client.UpdateThreadFollowForUser(context.Background(), th.BasicUser.Id, th.BasicTeam.Id, rpost.Id, true)
+ require.NoError(t, err)
_, resp, err = th.Client.SetThreadUnreadByPostId(context.Background(), th.BasicUser.Id, th.BasicTeam.Id, rpost.Id, rpost.Id)
require.NoError(t, err)
CheckOKStatus(t, resp)
28 changes: 14 additions & 14 deletions server/channels/app/admin.go
@@ -21,7 +21,7 @@ var latestVersionCache = cache.NewLRU(&cache.CacheOptions{
Size: 1,
})

- func (s *Server) GetLogs(c request.CTX, page, perPage int) ([]string, *model.AppError) {
+ func (s *Server) GetLogs(rctx request.CTX, page, perPage int) ([]string, *model.AppError) {
var lines []string

license := s.License()
@@ -33,19 +33,19 @@ func (s *Server) GetLogs(c request.CTX, page, perPage int) ([]string, *model.App
lines = append(lines, "-----------------------------------------------------------------------------------------------------------")
lines = append(lines, "-----------------------------------------------------------------------------------------------------------")
} else {
- c.Logger().Error("Could not get cluster info")
+ rctx.Logger().Error("Could not get cluster info")
}
}

- melines, err := s.GetLogsSkipSend(page, perPage, &model.LogFilter{})
+ melines, err := s.GetLogsSkipSend(rctx, page, perPage, &model.LogFilter{})
if err != nil {
return nil, err
}

lines = append(lines, melines...)

if s.platform.Cluster() != nil && *s.platform.Config().ClusterSettings.Enable {
- clines, err := s.platform.Cluster().GetLogs(page, perPage)
+ clines, err := s.platform.Cluster().GetLogs(rctx, page, perPage)
if err != nil {
return nil, err
}
@@ -56,7 +56,7 @@ func (s *Server) GetLogs(c request.CTX, page, perPage int) ([]string, *model.App
return lines, nil
}

- func (s *Server) QueryLogs(c request.CTX, page, perPage int, logFilter *model.LogFilter) (map[string][]string, *model.AppError) {
+ func (s *Server) QueryLogs(rctx request.CTX, page, perPage int, logFilter *model.LogFilter) (map[string][]string, *model.AppError) {
logData := make(map[string][]string)

serverName := "default"
@@ -66,23 +66,23 @@ func (s *Server) QueryLogs(c request.CTX, page, perPage int, logFilter *model.Lo
if info := s.platform.Cluster().GetMyClusterInfo(); info != nil {
serverName = info.Hostname
} else {
- c.Logger().Error("Could not get cluster info")
+ rctx.Logger().Error("Could not get cluster info")
}
}

serverNames := logFilter.ServerNames
if len(serverNames) > 0 {
for _, nodeName := range serverNames {
if nodeName == "default" {
- AddLocalLogs(logData, s, page, perPage, nodeName, logFilter)
+ AddLocalLogs(rctx, logData, s, page, perPage, nodeName, logFilter)
}
}
} else {
- AddLocalLogs(logData, s, page, perPage, serverName, logFilter)
+ AddLocalLogs(rctx, logData, s, page, perPage, serverName, logFilter)
}

if s.platform.Cluster() != nil && *s.Config().ClusterSettings.Enable {
- clusterLogs, err := s.platform.Cluster().QueryLogs(page, perPage)
+ clusterLogs, err := s.platform.Cluster().QueryLogs(rctx, page, perPage)
if err != nil {
return nil, err
}
@@ -101,8 +101,8 @@ func (s *Server) QueryLogs(c request.CTX, page, perPage int, logFilter *model.Lo
return logData, nil
}

- func AddLocalLogs(logData map[string][]string, s *Server, page, perPage int, serverName string, logFilter *model.LogFilter) *model.AppError {
- currentServerLogs, err := s.GetLogsSkipSend(page, perPage, logFilter)
+ func AddLocalLogs(rctx request.CTX, logData map[string][]string, s *Server, page, perPage int, serverName string, logFilter *model.LogFilter) *model.AppError {
+ currentServerLogs, err := s.GetLogsSkipSend(rctx, page, perPage, logFilter)
if err != nil {
return err
}
@@ -119,12 +119,12 @@ func (a *App) GetLogs(rctx request.CTX, page, perPage int) ([]string, *model.App
return a.Srv().GetLogs(rctx, page, perPage)
}

- func (s *Server) GetLogsSkipSend(page, perPage int, logFilter *model.LogFilter) ([]string, *model.AppError) {
- return s.platform.GetLogsSkipSend(page, perPage, logFilter)
+ func (s *Server) GetLogsSkipSend(rctx request.CTX, page, perPage int, logFilter *model.LogFilter) ([]string, *model.AppError) {
+ return s.platform.GetLogsSkipSend(rctx, page, perPage, logFilter)
}

func (a *App) GetLogsSkipSend(rctx request.CTX, page, perPage int, logFilter *model.LogFilter) ([]string, *model.AppError) {
- return a.Srv().GetLogsSkipSend(page, perPage, logFilter)
+ return a.Srv().GetLogsSkipSend(rctx, page, perPage, logFilter)
}

func (a *App) GetClusterStatus(rctx request.CTX) []*model.ClusterInfo {
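Most of the changes in this file are mechanical: the request context is threaded from GetLogs and QueryLogs down through AddLocalLogs, GetLogsSkipSend, and the cluster interface, so warnings are emitted through the request-scoped logger instead of the global mlog package. A simplified sketch of that shape, with a stand-in context type rather than the real request.CTX:

```go
package logs

import "log/slog"

// requestCTX stands in for request.CTX; the relevant piece here is the
// request-scoped logger it carries.
type requestCTX interface {
	Logger() *slog.Logger
}

// fetchFunc represents a lower-level source of log lines, such as GetLogsSkipSend.
type fetchFunc func(rctx requestCTX, page, perPage int) ([]string, error)

// QueryLogs accepts the context at the top and hands it to every helper, so any
// message logged below is attributed to the request that triggered it.
func QueryLogs(rctx requestCTX, page, perPage int, fetch fetchFunc) (map[string][]string, error) {
	logData := make(map[string][]string)

	lines, err := fetch(rctx, page, perPage)
	if err != nil {
		rctx.Logger().Error("could not read local logs", "page", page, "err", err)
		return nil, err
	}
	logData["default"] = lines
	return logData, nil
}
```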
2 changes: 1 addition & 1 deletion server/channels/app/analytics.go
@@ -157,7 +157,7 @@ func (a *App) getAnalytics(rctx request.CTX, name string, teamID string, forSupp

// If in HA mode then aggregate all the stats
if a.Cluster() != nil && *a.Config().ClusterSettings.Enable {
- stats, err2 := a.Cluster().GetClusterStats()
+ stats, err2 := a.Cluster().GetClusterStats(rctx)
if err2 != nil {
return nil, err2
}
18 changes: 10 additions & 8 deletions server/channels/app/busy_test.go
@@ -135,16 +135,18 @@ func (c *ClusterMock) StartInterNodeCommunication() {}
func (c *ClusterMock) StopInterNodeCommunication() {}
func (c *ClusterMock) RegisterClusterMessageHandler(event model.ClusterEvent, crm einterfaces.ClusterMessageHandler) {
}
- func (c *ClusterMock) GetClusterId() string { return "cluster_mock" }
- func (c *ClusterMock) IsLeader() bool { return false }
- func (c *ClusterMock) GetMyClusterInfo() *model.ClusterInfo { return nil }
- func (c *ClusterMock) GetClusterInfos() []*model.ClusterInfo { return nil }
- func (c *ClusterMock) NotifyMsg(buf []byte) {}
- func (c *ClusterMock) GetClusterStats() ([]*model.ClusterStats, *model.AppError) { return nil, nil }
- func (c *ClusterMock) GetLogs(page, perPage int) ([]string, *model.AppError) {
+ func (c *ClusterMock) GetClusterId() string { return "cluster_mock" }
+ func (c *ClusterMock) IsLeader() bool { return false }
+ func (c *ClusterMock) GetMyClusterInfo() *model.ClusterInfo { return nil }
+ func (c *ClusterMock) GetClusterInfos() []*model.ClusterInfo { return nil }
+ func (c *ClusterMock) NotifyMsg(buf []byte) {}
+ func (c *ClusterMock) GetClusterStats(rctx request.CTX) ([]*model.ClusterStats, *model.AppError) {
+ return nil, nil
+ }
- func (c *ClusterMock) QueryLogs(page, perPage int) (map[string][]string, *model.AppError) {
+ func (c *ClusterMock) GetLogs(rctx request.CTX, page, perPage int) ([]string, *model.AppError) {
return nil, nil
}
+ func (c *ClusterMock) QueryLogs(rctx request.CTX, page, perPage int) (map[string][]string, *model.AppError) {
return nil, nil
}
func (c *ClusterMock) GenerateSupportPacket(rctx request.CTX, options *model.SupportPacketOptions) (map[string][]model.FileData, error) {
20 changes: 12 additions & 8 deletions server/channels/app/platform/busy_test.go
@@ -135,14 +135,18 @@ func (c *ClusterMock) StartInterNodeCommunication() {}
func (c *ClusterMock) StopInterNodeCommunication() {}
func (c *ClusterMock) RegisterClusterMessageHandler(event model.ClusterEvent, crm einterfaces.ClusterMessageHandler) {
}
- func (c *ClusterMock) GetClusterId() string { return "cluster_mock" }
- func (c *ClusterMock) IsLeader() bool { return false }
- func (c *ClusterMock) GetMyClusterInfo() *model.ClusterInfo { return nil }
- func (c *ClusterMock) GetClusterInfos() []*model.ClusterInfo { return nil }
- func (c *ClusterMock) NotifyMsg(buf []byte) {}
- func (c *ClusterMock) GetClusterStats() ([]*model.ClusterStats, *model.AppError) { return nil, nil }
- func (c *ClusterMock) GetLogs(page, perPage int) ([]string, *model.AppError) { return nil, nil }
- func (c *ClusterMock) QueryLogs(page, perPage int) (map[string][]string, *model.AppError) {
+ func (c *ClusterMock) GetClusterId() string { return "cluster_mock" }
+ func (c *ClusterMock) IsLeader() bool { return false }
+ func (c *ClusterMock) GetMyClusterInfo() *model.ClusterInfo { return nil }
+ func (c *ClusterMock) GetClusterInfos() []*model.ClusterInfo { return nil }
+ func (c *ClusterMock) NotifyMsg(buf []byte) {}
+ func (c *ClusterMock) GetClusterStats(rctx request.CTX) ([]*model.ClusterStats, *model.AppError) {
+ return nil, nil
+ }
+ func (c *ClusterMock) GetLogs(rctx request.CTX, page, perPage int) ([]string, *model.AppError) {
+ return nil, nil
+ }
+ func (c *ClusterMock) QueryLogs(rctx request.CTX, page, perPage int) (map[string][]string, *model.AppError) {
return nil, nil
}
func (c *ClusterMock) GenerateSupportPacket(rctx request.CTX, options *model.SupportPacketOptions) (map[string][]model.FileData, error) {
10 changes: 5 additions & 5 deletions server/channels/app/platform/log.go
@@ -121,7 +121,7 @@ func (ps *PlatformService) RemoveUnlicensedLogTargets(license *model.License) {
})
}

- func (ps *PlatformService) GetLogsSkipSend(page, perPage int, logFilter *model.LogFilter) ([]string, *model.AppError) {
+ func (ps *PlatformService) GetLogsSkipSend(rctx request.CTX, page, perPage int, logFilter *model.LogFilter) ([]string, *model.AppError) {
var lines []string

if *ps.Config().LogSettings.EnableFile {
@@ -175,10 +175,10 @@ func (ps *PlatformService) GetLogsSkipSend(page, perPage int, logFilter *model.L
var entry *model.LogEntry
err = json.Unmarshal(line, &entry)
if err != nil {
mlog.Debug("Failed to parse line, skipping")
rctx.Logger().Debug("Failed to parse line, skipping")
} else {
filtered = isLogFilteredByLevel(logFilter, entry) || filtered
- filtered = isLogFilteredByDate(logFilter, entry) || filtered
+ filtered = isLogFilteredByDate(rctx, logFilter, entry) || filtered
}

if filtered {
@@ -257,7 +257,7 @@ func isLogFilteredByLevel(logFilter *model.LogFilter, entry *model.LogEntry) boo
return true
}

- func isLogFilteredByDate(logFilter *model.LogFilter, entry *model.LogEntry) bool {
+ func isLogFilteredByDate(rctx request.CTX, logFilter *model.LogFilter, entry *model.LogEntry) bool {
if logFilter.DateFrom == "" && logFilter.DateTo == "" {
return false
}
@@ -273,7 +273,7 @@ func isLogFilteredByDate(logFilter *model.LogFilter, entry *model.LogEntry) bool

timestamp, err := time.Parse("2006-01-02 15:04:05.999 -07:00", entry.Timestamp)
if err != nil {
mlog.Debug("Cannot parse timestamp, skipping")
rctx.Logger().Debug("Cannot parse timestamp, skipping")
return false
}

9 changes: 5 additions & 4 deletions server/channels/app/platform/session.go
@@ -17,11 +17,12 @@ import (
func (ps *PlatformService) ReturnSessionToPool(session *model.Session) {
if session != nil {
session.Id = ""
- // Once the session is retrieved from the pool, all existing prop fields are cleared.
- // To avoid a race between clearing the props and accessing it, clear the props maps before returning it to the pool.
+ // All existing prop fields are cleared once the session is retrieved from the pool.
+ // To speed up that process, clear the props here to avoid doing that in the hot path.
+ //
+ // If the request handler spawns a goroutine that uses the session, it might race with this code.
+ // In that case, the handler should copy the session and use the copy in the goroutine.
clear(session.Props)
- // Also clear the team members slice to avoid a similar race condition.
- clear(session.TeamMembers)
ps.sessionPool.Put(session)
}
}
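The rewritten comment warns that a goroutine still holding the pooled session can race with this cleanup, and that such handlers should copy the session first. A hedged sketch of that copy-before-spawn advice, with simplified stand-in types rather than the real model.Session or pool wiring:

```go
package sessions

import "sync"

// Session is a simplified stand-in for model.Session.
type Session struct {
	Id    string
	Props map[string]string
}

var sessionPool = sync.Pool{New: func() any { return &Session{Props: map[string]string{}} }}

// handle processes a request and then returns the session to the pool. Because
// the cleanup clears Props, any goroutine that outlives the handler must work
// on its own copy of the session.
func handle(s *Session, audit func(Session)) {
	// Deep-copy the parts the goroutine needs before spawning it.
	cp := *s
	cp.Props = make(map[string]string, len(s.Props))
	for k, v := range s.Props {
		cp.Props[k] = v
	}
	go audit(cp)

	// The handler is done: reset the original and return it to the pool,
	// as ReturnSessionToPool does above.
	s.Id = ""
	clear(s.Props)
	sessionPool.Put(s)
}
```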
5 changes: 4 additions & 1 deletion server/channels/app/platform/web_conn.go
@@ -253,7 +253,10 @@ func (ps *PlatformService) NewWebConn(cfg *WebConnConfig, suite SuiteIFace, runn
// Create a goroutine to avoid blocking the creation of the websocket connection.
ps.Go(func() {
ps.SetStatusOnline(userID, false)
- ps.UpdateLastActivityAtIfNeeded(*wc.GetSession())
+ session := wc.GetSession()
+ if session != nil {
+ ps.UpdateLastActivityAtIfNeeded(*session)
+ }
})
}

11 changes: 4 additions & 7 deletions server/channels/app/user.go
@@ -2831,13 +2831,10 @@ func (a *App) UpdateThreadReadForUser(c request.CTX, currentSessionId, userID, t
return nil, err
}

- opts := store.ThreadMembershipOpts{
- Following: true,
- UpdateFollowing: true,
- }
- membership, storeErr := a.Srv().Store().Thread().MaintainMembership(userID, threadID, opts)
- if storeErr != nil {
- return nil, model.NewAppError("UpdateThreadReadForUser", "app.user.update_thread_read_for_user.app_error", nil, "", http.StatusInternalServerError).Wrap(storeErr)
+ // If the thread doesn't have a membership, we shouldn't try to mark it as unread
+ membership, err := a.GetThreadMembershipForUser(userID, threadID)
+ if err != nil {
+ return nil, err
}

previousUnreadMentions := membership.UnreadMentions