Skip to content

Commit

Permalink
🗂️ DB functionality to execute the data retention.
Browse files Browse the repository at this point in the history
  • Loading branch information
Danieloni1 committed Aug 6, 2024
1 parent 216950d commit 48008a2
Show file tree
Hide file tree
Showing 13 changed files with 310 additions and 30 deletions.
44 changes: 23 additions & 21 deletions clientapi/routing/data_retention.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,19 +10,11 @@ import (
"github.com/matrix-org/dendrite/setup/config"
"github.com/matrix-org/gomatrixserverlib/spec"
"github.com/matrix-org/util"
"github.com/sirupsen/logrus"
)

type DataRetentionRequest struct {
DataRetentions DataRetention `json:"data_retentions"`
}

type DataRetention struct {
SpaceID string `json:"space_id"`
Enabled bool `json:"enabled"`
MaxAge int32 `json:"max_age,required"`
Teams bool `json:"teams"`
Operations bool `json:"operations"`
Dms bool `json:"dms"`
DataRetention api.PerformDataRetentionRequest `json:"data_retention"`
}

// Triggered by an application service job.
Expand All @@ -37,16 +29,17 @@ func PostDataRetention(
if reqErr := httputil.UnmarshalJSONRequest(req, &body); reqErr != nil {
return *reqErr
}
dr := body.DataRetention

if body.DataRetentions.MaxAge <= 0 || body.DataRetentions.SpaceID == "" {
if dr.MaxAge <= 0 || dr.SpaceID == "" {
return util.JSONResponse{
Code: http.StatusBadRequest,
JSON: spec.BadJSON("missing max_age or space_id"),
}
}

// Validate the roomID
validRoomID, err := spec.NewRoomID(body.DataRetentions.SpaceID)
validRoomID, err := spec.NewRoomID(dr.SpaceID)
if err != nil {
return util.JSONResponse{
Code: http.StatusBadRequest,
Expand All @@ -67,28 +60,37 @@ func PostDataRetention(
}
}

if body.DataRetentions.Teams {
// TODO: Replace with PerformDataRetention once it's implemented
if dr.Teams {
logrus.Infof("Performing data retention on teams in space %s", dr.SpaceID)
for _, roomId := range queryRes.Teams {
if err = rsAPI.PerformAdminPurgeRoom(context.Background(), roomId); err != nil {
if err = rsAPI.PerformDataRetention(context.Background(), &dr, roomId); err != nil {
logrus.WithError(err).WithFields(logrus.Fields{
"space_id": dr.SpaceID,
}).Errorf("Failed to perform data retention on team with id %s", roomId)
return util.ErrorResponse(err)
}
}
}

if body.DataRetentions.Operations {
if dr.Operations {
logrus.Infof("Performing data retention on operations in space %s", dr.SpaceID)
for _, roomId := range queryRes.Operations {
// TODO: Replace with PerformDataRetention once it's implemented
if err = rsAPI.PerformAdminPurgeRoom(context.Background(), roomId); err != nil {
if err = rsAPI.PerformDataRetention(context.Background(), &dr, roomId); err != nil {
logrus.WithError(err).WithFields(logrus.Fields{
"space_id": dr.SpaceID,
}).Errorf("Failed to perform data retention on operation with id %s", roomId)
return util.ErrorResponse(err)
}
}
}

if body.DataRetentions.Dms {
if dr.Dms {
logrus.Infof("Performing data retention on dms in space %s", dr.SpaceID)
for _, roomId := range queryRes.DMs {
// TODO: Replace with PerformDataRetention once it's implemented
if err = rsAPI.PerformAdminPurgeRoom(context.Background(), roomId); err != nil {
if err = rsAPI.PerformDataRetention(context.Background(), &dr, roomId); err != nil {
logrus.WithError(err).WithFields(logrus.Fields{
"space_id": dr.SpaceID,
}).Errorf("Failed to perform data retention on dm with id %s", roomId)
return util.ErrorResponse(err)
}
}
Expand Down
7 changes: 7 additions & 0 deletions federationapi/storage/shared/storage.go
Original file line number Diff line number Diff line change
Expand Up @@ -389,3 +389,10 @@ func (d *Database) PurgeRoom(ctx context.Context, roomID string) error {
return nil
})
}

// DataRetentionInRoom is the federation API storage hook for data retention in
// the given room. It currently runs an empty transaction through the database
// writer and always reports success.
func (d *Database) DataRetentionInRoom(ctx context.Context, roomID string) error {
	noop := func(_ *sql.Tx) error {
		// TODO: Implement this to support federation
		return nil
	}
	return d.Writer.Do(d.DB, nil, noop)
}
6 changes: 1 addition & 5 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -55,10 +55,7 @@ require (
modernc.org/sqlite v1.29.5
)

require (
github.com/Azure/go-ntlmssp v0.0.0-20221128193559-754e69321358 // indirect
github.com/go-co-op/gocron v1.25.0
)
require github.com/Azure/go-ntlmssp v0.0.0-20221128193559-754e69321358 // indirect

require (
github.com/HdrHistogram/hdrhistogram-go v1.1.2 // indirect
Expand Down Expand Up @@ -119,7 +116,6 @@ require (
github.com/prometheus/common v0.48.0 // indirect
github.com/prometheus/procfs v0.12.0 // indirect
github.com/remyoudompheng/bigfft v0.0.0-20230129092748-24d4a6f8daec // indirect
github.com/robfig/cron/v3 v3.0.1 // indirect
github.com/rs/zerolog v1.29.1 // indirect
github.com/tidwall/match v1.1.1 // indirect
github.com/tidwall/pretty v1.2.1 // indirect
Expand Down
4 changes: 0 additions & 4 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -117,8 +117,6 @@ github.com/glycerine/go-unsnap-stream v0.0.0-20180323001048-9f0cb55181dd/go.mod
github.com/glycerine/goconvey v0.0.0-20180728074245-46e3a41ad493/go.mod h1:Ogl1Tioa0aV7gstGFO7KhffUsb9M4ydbEbbxpcEDc24=
github.com/go-asn1-ber/asn1-ber v1.5.5 h1:MNHlNMBDgEKD4TcKr36vQN68BA00aDfjIt3/bD50WnA=
github.com/go-asn1-ber/asn1-ber v1.5.5/go.mod h1:hEBeB/ic+5LoWskz+yKT7vGhhPYkProFKoKdwZRWMe0=
github.com/go-co-op/gocron v1.25.0 h1:pzAdtily1JVIf6lGby6K0JKzhishgLOllQgNxoYbR+8=
github.com/go-co-op/gocron v1.25.0/go.mod h1:JHrQDY4iE1HZPkgTyoccY4xtDgLbrUwL+xODIbEQdnc=
github.com/go-errors/errors v1.4.2 h1:J6MZopCL4uSllY1OfXM374weqZFFItUbrImctkmUxIA=
github.com/go-gl/glfw v0.0.0-20190409004039-e6da0acd62b1/go.mod h1:vR7hzQXu2zJy9AVAgeJqvqgH9Q5CA+iKCZ2gyEVpxRU=
github.com/go-ldap/ldap/v3 v3.4.6 h1:ert95MdbiG7aWo/oPYp9btL3KJlMPKnP58r09rI8T+A=
Expand Down Expand Up @@ -261,8 +259,6 @@ github.com/prometheus/procfs v0.12.0 h1:jluTpSng7V9hY0O2R9DzzJHYb2xULk9VTR1V1R/k
github.com/prometheus/procfs v0.12.0/go.mod h1:pcuDEFsWDnvcgNzo4EEweacyhjeA9Zk3cnaOZAZEfOo=
github.com/remyoudompheng/bigfft v0.0.0-20230129092748-24d4a6f8daec h1:W09IVJc94icq4NjY3clb7Lk8O1qJ8BdBEF8z0ibU0rE=
github.com/remyoudompheng/bigfft v0.0.0-20230129092748-24d4a6f8daec/go.mod h1:qqbHyh8v60DhA7CoWK5oRCqLrMHRGoxYCSS9EjAz6Eo=
github.com/robfig/cron/v3 v3.0.1 h1:WdRxkvbJztn8LMz/QEvLN5sBU+xKpSqwwUO1Pjr4qDs=
github.com/robfig/cron/v3 v3.0.1/go.mod h1:eQICP3HwyT7UooqI/z+Ov+PtYAWygg1TEWWzGIFLtro=
github.com/rogpeppe/go-internal v1.10.0 h1:TMyTOH3F/DB16zRVcYyreMH6GnZZrwQVAoYjRBZyWFQ=
github.com/rs/xid v1.4.0/go.mod h1:trrq9SKmegXys3aeAKXMUTdJsYXVwGY3RLcfgqegfbg=
github.com/rs/zerolog v1.29.1 h1:cO+d60CHkknCbvzEWxP0S9K6KqyTjrCNUy1LdQLCGPc=
Expand Down
2 changes: 2 additions & 0 deletions roomserver/api/api.go
Original file line number Diff line number Diff line change
Expand Up @@ -249,6 +249,8 @@ type ClientRoomserverAPI interface {
PerformAdminEvacuateRoom(ctx context.Context, roomID string) (affected []string, err error)
PerformAdminEvacuateUser(ctx context.Context, userID string) (affected []string, err error)
PerformAdminPurgeRoom(ctx context.Context, roomID string) error
//! GlobeKeeper Customization
PerformDataRetention(ctx context.Context, dr *PerformDataRetentionRequest, roomID string) error
PerformAdminDownloadState(ctx context.Context, roomID, userID string, serverName spec.ServerName) error
PerformPeek(ctx context.Context, req *PerformPeekRequest) (roomID string, err error)
PerformUnpeek(ctx context.Context, roomID, userID, deviceID string) error
Expand Down
9 changes: 9 additions & 0 deletions roomserver/api/output.go
Original file line number Diff line number Diff line change
Expand Up @@ -59,6 +59,8 @@ const (
OutputTypeRetirePeek OutputType = "retire_peek"
// OutputTypePurgeRoom indicates the event is an OutputPurgeRoom
OutputTypePurgeRoom OutputType = "purge_room"
// OutputTypeDataRetention indicates the event is an OutputDataRetention
OutputTypeDataRetention OutputType = "data_retention"
)

// An OutputEvent is an entry in the roomserver output kafka log.
Expand All @@ -84,6 +86,8 @@ type OutputEvent struct {
RetirePeek *OutputRetirePeek `json:"retire_peek,omitempty"`
// The content of the event with type OutputPurgeRoom
PurgeRoom *OutputPurgeRoom `json:"purge_room,omitempty"`
// The content of the event with type OutputDataRetention
DataRetentionInRoom *OutputDataRetention `json:"data_retention,omitempty"`
}

// Type of the OutputNewRoomEvent.
Expand Down Expand Up @@ -269,3 +273,8 @@ type OutputRetirePeek struct {
type OutputPurgeRoom struct {
RoomID string
}

// OutputDataRetention is the payload of an output event of type
// OutputTypeDataRetention. It is produced by the roomserver after data
// retention has been performed on a room.
type OutputDataRetention struct {
	// RoomID is the room the data retention was performed on.
	RoomID string
	// DR is the data retention request that was applied to the room.
	DR *PerformDataRetentionRequest
}
9 changes: 9 additions & 0 deletions roomserver/api/perform.go
Original file line number Diff line number Diff line change
Expand Up @@ -171,3 +171,12 @@ type PerformForgetRequest struct {
}

type PerformForgetResponse struct{}

// PerformDataRetentionRequest describes a data retention run for a space. It is
// decoded from the "data_retention" object of the client API request body and
// passed down to the roomserver for every room that is processed.
type PerformDataRetentionRequest struct {
	// SpaceID is the room ID of the space whose rooms are subject to retention.
	// The client API rejects requests where it is empty or not a valid room ID.
	SpaceID string `json:"space_id"`
	// Enabled is carried with the request.
	// NOTE(review): no visible code reads this field — confirm its intended use.
	Enabled bool `json:"enabled"`
	// MaxAge is the retention window; the client API rejects values <= 0.
	// NOTE(review): the Postgres statements subtract this value from a
	// millisecond timestamp, which suggests milliseconds — confirm the unit.
	MaxAge int32 `json:"max_age"`
	// Teams enables data retention on the team rooms of the space.
	Teams bool `json:"teams"`
	// Operations enables data retention on the operation rooms of the space.
	Operations bool `json:"operations"`
	// Dms enables data retention on the DM rooms of the space.
	Dms bool `json:"dms"`
}
30 changes: 30 additions & 0 deletions roomserver/internal/perform/perform_admin.go
Original file line number Diff line number Diff line change
Expand Up @@ -222,6 +222,36 @@ func (r *Admin) PerformAdminPurgeRoom(
})
}

// PerformDataRetention applies the data retention request dr to a single room.
//
// The room ID is validated first; the roomserver database is then asked to
// perform the retention, and on success an OutputTypeDataRetention event is
// produced so that other components are informed. Any error from validation,
// the database, or the output producer is returned to the caller.
func (r *Admin) PerformDataRetention(
	ctx context.Context,
	dr *api.PerformDataRetentionRequest,
	roomID string,
) error {
	// Validate we actually got a room ID and nothing else
	_, _, err := gomatrixserverlib.SplitID('!', roomID)
	if err != nil {
		return err
	}

	logger := logrus.WithField("room_id", roomID)

	logger.Warn("Performing data retention on room from roomserver")
	err = r.DB.DataRetentionInRoom(ctx, dr, roomID)
	if err != nil {
		logger.WithError(err).Warn("Failed to perform data retention on room from roomserver")
		return err
	}
	logger.Warn("Performed data retention on room from roomserver, informing other components")

	// Tell downstream consumers which room was processed and with which request.
	payload := &api.OutputDataRetention{
		RoomID: roomID,
		DR:     dr,
	}
	event := api.OutputEvent{
		Type:                api.OutputTypeDataRetention,
		DataRetentionInRoom: payload,
	}
	return r.Inputer.OutputProducer.ProduceRoomEvents(roomID, []api.OutputEvent{event})
}

func (r *Admin) PerformAdminDownloadState(
ctx context.Context,
roomID, userID string, serverName spec.ServerName,
Expand Down
2 changes: 2 additions & 0 deletions roomserver/storage/interface.go
Original file line number Diff line number Diff line change
Expand Up @@ -180,6 +180,8 @@ type Database interface {
GetHistoryVisibilityState(ctx context.Context, roomInfo *types.RoomInfo, eventID string, domain string) ([]gomatrixserverlib.PDU, error)
GetLeftUsers(ctx context.Context, userIDs []string) ([]string, error)
PurgeRoom(ctx context.Context, roomID string) error
//! GlobeKeeper Customization
DataRetentionInRoom(ctx context.Context, dr *api.PerformDataRetentionRequest, roomID string) error
UpgradeRoom(ctx context.Context, oldRoomID, newRoomID, eventSender string) error

// GetMembershipForHistoryVisibility queries the membership events for the given eventIDs.
Expand Down
102 changes: 102 additions & 0 deletions roomserver/storage/postgres/data_retention_statements.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,102 @@
// Copyright 2022 The Matrix.org Foundation C.I.C.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package postgres

import (
"context"
"database/sql"

"github.com/matrix-org/dendrite/internal/sqlutil"
"github.com/matrix-org/dendrite/roomserver/api"
"github.com/matrix-org/dendrite/roomserver/types"
)

// All data retention statements share the same sub-select, which picks the
// "stale" events of a room:
//
//   - $1 is the room NID,
//   - $2 is the maximum age, subtracted from the current time in milliseconds,
//   - only events whose JSON contains "type":"m.room.encrypted" are matched,
//   - only events whose origin_server_ts is OLDER than (now - $2) are matched.
//
// NOTE(review): the sub-select joins roomserver_events with
// roomserver_event_json, so once either table has been purged the remaining
// statements no longer find the stale events. The caller must take this into
// account when ordering the statements.

// dataRetentionEventJSONSQL deletes the JSON of stale encrypted events.
const dataRetentionEventJSONSQL = "" +
	"DELETE FROM roomserver_event_json WHERE event_nid = ANY(" +
	" SELECT re.event_nid" +
	" FROM roomserver_events AS re" +
	" JOIN roomserver_event_json AS rej ON re.event_nid = rej.event_nid" +
	" WHERE rej.event_json LIKE '%\"type\":\"m.room.encrypted\"%'" +
	" AND CAST(rej.event_json::jsonb->>'origin_server_ts' AS BIGINT) < EXTRACT(EPOCH FROM NOW()) * 1000 - $2" +
	" AND re.room_nid = $1" +
	")"

// dataRetentionEventsSQL deletes the event rows of stale encrypted events.
const dataRetentionEventsSQL = "" +
	"DELETE FROM roomserver_events WHERE event_nid = ANY(" +
	" SELECT re.event_nid" +
	" FROM roomserver_events AS re" +
	" JOIN roomserver_event_json AS rej ON re.event_nid = rej.event_nid" +
	" WHERE rej.event_json LIKE '%\"type\":\"m.room.encrypted\"%'" +
	" AND CAST(rej.event_json::jsonb->>'origin_server_ts' AS BIGINT) < EXTRACT(EPOCH FROM NOW()) * 1000 - $2" +
	" AND re.room_nid = $1" +
	")"

// dataRetentionPreviousEventsSQL removes stale encrypted events from the
// event_nids arrays in roomserver_previous_events.
const dataRetentionPreviousEventsSQL = "" +
	"UPDATE roomserver_previous_events SET event_nids = array_remove(event_nids, subquery.event_nid) FROM (" +
	" SELECT re.event_nid" +
	" FROM roomserver_events AS re" +
	" JOIN roomserver_event_json AS rej ON re.event_nid = rej.event_nid" +
	" WHERE rej.event_json LIKE '%\"type\":\"m.room.encrypted\"%'" +
	" AND CAST(rej.event_json::jsonb->>'origin_server_ts' AS BIGINT) < EXTRACT(EPOCH FROM NOW()) * 1000 - $2" +
	" AND re.room_nid = $1" +
	") AS subquery" +
	// The leading space is required: without it the alias and the keyword are
	// concatenated into "subqueryWHERE", which is not valid SQL.
	" WHERE event_nids @> ARRAY[subquery.event_nid]"

// dataRetentionStateBlockEntriesSQL removes stale encrypted events from the
// event_nids arrays in roomserver_state_block.
const dataRetentionStateBlockEntriesSQL = "" +
	"UPDATE roomserver_state_block SET event_nids = array_remove(event_nids, subquery.event_nid) FROM (" +
	" SELECT re.event_nid" +
	" FROM roomserver_events AS re" +
	" JOIN roomserver_event_json AS rej ON re.event_nid = rej.event_nid" +
	" WHERE rej.event_json LIKE '%\"type\":\"m.room.encrypted\"%'" +
	" AND CAST(rej.event_json::jsonb->>'origin_server_ts' AS BIGINT) < EXTRACT(EPOCH FROM NOW()) * 1000 - $2" +
	" AND re.room_nid = $1" +
	") AS subquery" +
	// The leading space is required: without it the alias and the keyword are
	// concatenated into "subqueryWHERE", which is not valid SQL.
	" WHERE event_nids @> ARRAY[subquery.event_nid]"

// dataRetentionStatements holds the prepared statements used to perform data
// retention in a room. Each field is populated by PrepareDataRetentionStatements
// from the SQL constant of the matching name.
type dataRetentionStatements struct {
	// Prepared from dataRetentionEventJSONSQL.
	dataRetentionEventJSONStmt *sql.Stmt
	// Prepared from dataRetentionEventsSQL.
	dataRetentionEventsStmt *sql.Stmt
	// Prepared from dataRetentionPreviousEventsSQL.
	dataRetentionPreviousEventsStmt *sql.Stmt
	// Prepared from dataRetentionStateBlockEntriesSQL.
	dataRetentionStateBlockEntriesStmt *sql.Stmt
}

// PrepareDataRetentionStatements prepares every data retention statement
// against db. The statement holder is returned together with the error from
// preparation, so callers must check the error before using the statements.
func PrepareDataRetentionStatements(db *sql.DB) (*dataRetentionStatements, error) {
	stmts := new(dataRetentionStatements)

	toPrepare := sqlutil.StatementList{
		{&stmts.dataRetentionEventJSONStmt, dataRetentionEventJSONSQL},
		{&stmts.dataRetentionEventsStmt, dataRetentionEventsSQL},
		{&stmts.dataRetentionPreviousEventsStmt, dataRetentionPreviousEventsSQL},
		{&stmts.dataRetentionStateBlockEntriesStmt, dataRetentionStateBlockEntriesSQL},
	}
	err := toPrepare.Prepare(db)

	return stmts, err
}

// DataRetentionInRoom runs the data retention statements for one room inside
// the transaction txn.
//
// Parameters:
//   - dr: the retention request; must be non-nil. Its MaxAge is bound to $2.
//   - roomNID: numeric ID of the room, bound to $1.
//   - roomID: currently unused by the statements.
//
// It stops at, and returns, the first statement error.
func (s *dataRetentionStatements) DataRetentionInRoom(
	ctx context.Context, txn *sql.Tx, dr *api.PerformDataRetentionRequest, roomNID types.RoomNID, roomID string,
) error {
	// Every statement selects the stale events by joining roomserver_events
	// with roomserver_event_json, so the array clean-ups must run while both
	// tables still contain those rows, i.e. before the DELETEs.
	//
	// NOTE(review): the two DELETEs depend on each other's table in the same
	// way, so whichever runs second matches nothing. The event JSON is purged
	// first (as before); dataRetentionEventsStmt therefore still needs a
	// follow-up, e.g. a single statement using a data-modifying CTE.
	dataRetentionByRoomNID := []*sql.Stmt{
		s.dataRetentionPreviousEventsStmt,
		s.dataRetentionStateBlockEntriesStmt,
		s.dataRetentionEventJSONStmt,
		s.dataRetentionEventsStmt,
	}
	for _, stmt := range dataRetentionByRoomNID {
		// The SQL references both $1 (room NID) and $2 (max age); executing
		// with only the room NID fails with a parameter-count error.
		// NOTE(review): $2 is subtracted from a millisecond timestamp — confirm
		// that MaxAge is expressed in milliseconds.
		if _, err := sqlutil.TxStmt(txn, stmt).ExecContext(ctx, roomNID, dr.MaxAge); err != nil {
			return err
		}
	}
	return nil
}
17 changes: 17 additions & 0 deletions roomserver/storage/shared/storage.go
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,7 @@ type Database struct {
MembershipTable tables.Membership
PublishedTable tables.Published
Purge tables.Purge
DataRetention tables.DataRetention
UserRoomKeyTable tables.UserRoomKeys
GetRoomUpdaterFn func(ctx context.Context, roomInfo *types.RoomInfo) (*RoomUpdater, error)
}
Expand Down Expand Up @@ -1686,6 +1687,22 @@ func (d *Database) PurgeRoom(ctx context.Context, roomID string) error {
})
}

// DataRetentionInRoom performs data retention for roomID inside a single
// writer transaction: it resolves (and locks) the room's NID and then hands
// over to the DataRetention table implementation together with the request dr.
// An error is returned when the room is unknown, when it cannot be locked, or
// when the table implementation fails.
// For large rooms this operation may take a considerable amount of time.
func (d *Database) DataRetentionInRoom(ctx context.Context, dr *api.PerformDataRetentionRequest, roomID string) error {
	retain := func(txn *sql.Tx) error {
		// SelectRoomNIDForUpdate locks the room record. Because data retention
		// can run for a long time, a non-locking lookup may be preferable here.
		roomNID, err := d.RoomsTable.SelectRoomNIDForUpdate(ctx, txn, roomID)
		switch {
		case err == sql.ErrNoRows:
			return fmt.Errorf("room %s does not exist", roomID)
		case err != nil:
			return fmt.Errorf("failed to lock the room: %w", err)
		}
		return d.DataRetention.DataRetentionInRoom(ctx, txn, dr, roomNID, roomID)
	}
	return d.Writer.Do(d.DB, nil, retain)
}

func (d *Database) UpgradeRoom(ctx context.Context, oldRoomID, newRoomID, eventSender string) error {

return d.Writer.Do(d.DB, nil, func(txn *sql.Tx) error {
Expand Down
Loading

0 comments on commit 48008a2

Please sign in to comment.