diff --git a/clientapi/routing/data_retention.go b/clientapi/routing/data_retention.go index 794919c685..c46fc46cca 100644 --- a/clientapi/routing/data_retention.go +++ b/clientapi/routing/data_retention.go @@ -10,19 +10,11 @@ import ( "github.com/matrix-org/dendrite/setup/config" "github.com/matrix-org/gomatrixserverlib/spec" "github.com/matrix-org/util" + "github.com/sirupsen/logrus" ) type DataRetentionRequest struct { - DataRetentions DataRetention `json:"data_retentions"` -} - -type DataRetention struct { - SpaceID string `json:"space_id"` - Enabled bool `json:"enabled"` - MaxAge int32 `json:"max_age,required"` - Teams bool `json:"teams"` - Operations bool `json:"operations"` - Dms bool `json:"dms"` + DataRetention api.PerformDataRetentionRequest `json:"data_retention"` } // Triggred by an application service job. @@ -37,8 +29,9 @@ func PostDataRetention( if reqErr := httputil.UnmarshalJSONRequest(req, &body); reqErr != nil { return *reqErr } + dr := body.DataRetention - if body.DataRetentions.MaxAge <= 0 || body.DataRetentions.SpaceID == "" { + if dr.MaxAge <= 0 || dr.SpaceID == "" { return util.JSONResponse{ Code: http.StatusBadRequest, JSON: spec.BadJSON("missing max_age or space_id"), @@ -46,7 +39,7 @@ func PostDataRetention( } // Validate the roomID - validRoomID, err := spec.NewRoomID(body.DataRetentions.SpaceID) + validRoomID, err := spec.NewRoomID(dr.SpaceID) if err != nil { return util.JSONResponse{ Code: http.StatusBadRequest, @@ -67,28 +60,37 @@ func PostDataRetention( } } - if body.DataRetentions.Teams { - // TODO: Replace with PerformDataRetention once it's implemented + if dr.Teams { + logrus.Infof("Performing data retention on teams in space %s", dr.SpaceID) for _, roomId := range queryRes.Teams { - if err = rsAPI.PerformAdminPurgeRoom(context.Background(), roomId); err != nil { + if err = rsAPI.PerformDataRetention(context.Background(), &dr, roomId); err != nil { + logrus.WithError(err).WithFields(logrus.Fields{ + "space_id": dr.SpaceID, + }).Errorf("Failed to perform data retention on team with id %s", roomId) return util.ErrorResponse(err) } } } - if body.DataRetentions.Operations { + if dr.Operations { + logrus.Infof("Performing data retention on operations in space %s", dr.SpaceID) for _, roomId := range queryRes.Operations { - // TODO: Replace with PerformDataRetention once it's implemented - if err = rsAPI.PerformAdminPurgeRoom(context.Background(), roomId); err != nil { + if err = rsAPI.PerformDataRetention(context.Background(), &dr, roomId); err != nil { + logrus.WithError(err).WithFields(logrus.Fields{ + "space_id": dr.SpaceID, + }).Errorf("Failed to perform data retention on operation with id %s", roomId) return util.ErrorResponse(err) } } } - if body.DataRetentions.Dms { + if dr.Dms { + logrus.Infof("Performing data retention on dms in space %s", dr.SpaceID) for _, roomId := range queryRes.DMs { - // TODO: Replace with PerformDataRetention once it's implemented - if err = rsAPI.PerformAdminPurgeRoom(context.Background(), roomId); err != nil { + if err = rsAPI.PerformDataRetention(context.Background(), &dr, roomId); err != nil { + logrus.WithError(err).WithFields(logrus.Fields{ + "space_id": dr.SpaceID, + }).Errorf("Failed to perform data retention on dm with id %s", roomId) return util.ErrorResponse(err) } } diff --git a/federationapi/storage/shared/storage.go b/federationapi/storage/shared/storage.go index 8c73967c6f..1ac920c226 100644 --- a/federationapi/storage/shared/storage.go +++ b/federationapi/storage/shared/storage.go @@ -389,3 +389,10 @@ func (d *Database) PurgeRoom(ctx context.Context, roomID string) error { return nil }) } + +func (d *Database) DataRetentionInRoom(ctx context.Context, roomID string) error { + return d.Writer.Do(d.DB, nil, func(txn *sql.Tx) error { + // TODO: Implement this to support federation + return nil + }) +} diff --git a/go.mod b/go.mod index 20fd944e24..df576e4cfe 100644 --- a/go.mod +++ b/go.mod @@ -55,10 +55,7 @@ require ( modernc.org/sqlite v1.29.5 ) -require ( - github.com/Azure/go-ntlmssp v0.0.0-20221128193559-754e69321358 // indirect - github.com/go-co-op/gocron v1.25.0 -) +require github.com/Azure/go-ntlmssp v0.0.0-20221128193559-754e69321358 // indirect require ( github.com/HdrHistogram/hdrhistogram-go v1.1.2 // indirect @@ -119,7 +116,6 @@ require ( github.com/prometheus/common v0.48.0 // indirect github.com/prometheus/procfs v0.12.0 // indirect github.com/remyoudompheng/bigfft v0.0.0-20230129092748-24d4a6f8daec // indirect - github.com/robfig/cron/v3 v3.0.1 // indirect github.com/rs/zerolog v1.29.1 // indirect github.com/tidwall/match v1.1.1 // indirect github.com/tidwall/pretty v1.2.1 // indirect diff --git a/go.sum b/go.sum index 451330a812..971617a84a 100644 --- a/go.sum +++ b/go.sum @@ -117,8 +117,6 @@ github.com/glycerine/go-unsnap-stream v0.0.0-20180323001048-9f0cb55181dd/go.mod github.com/glycerine/goconvey v0.0.0-20180728074245-46e3a41ad493/go.mod h1:Ogl1Tioa0aV7gstGFO7KhffUsb9M4ydbEbbxpcEDc24= github.com/go-asn1-ber/asn1-ber v1.5.5 h1:MNHlNMBDgEKD4TcKr36vQN68BA00aDfjIt3/bD50WnA= github.com/go-asn1-ber/asn1-ber v1.5.5/go.mod h1:hEBeB/ic+5LoWskz+yKT7vGhhPYkProFKoKdwZRWMe0= -github.com/go-co-op/gocron v1.25.0 h1:pzAdtily1JVIf6lGby6K0JKzhishgLOllQgNxoYbR+8= -github.com/go-co-op/gocron v1.25.0/go.mod h1:JHrQDY4iE1HZPkgTyoccY4xtDgLbrUwL+xODIbEQdnc= github.com/go-errors/errors v1.4.2 h1:J6MZopCL4uSllY1OfXM374weqZFFItUbrImctkmUxIA= github.com/go-gl/glfw v0.0.0-20190409004039-e6da0acd62b1/go.mod h1:vR7hzQXu2zJy9AVAgeJqvqgH9Q5CA+iKCZ2gyEVpxRU= github.com/go-ldap/ldap/v3 v3.4.6 h1:ert95MdbiG7aWo/oPYp9btL3KJlMPKnP58r09rI8T+A= @@ -261,8 +259,6 @@ github.com/prometheus/procfs v0.12.0 h1:jluTpSng7V9hY0O2R9DzzJHYb2xULk9VTR1V1R/k github.com/prometheus/procfs v0.12.0/go.mod h1:pcuDEFsWDnvcgNzo4EEweacyhjeA9Zk3cnaOZAZEfOo= github.com/remyoudompheng/bigfft v0.0.0-20230129092748-24d4a6f8daec h1:W09IVJc94icq4NjY3clb7Lk8O1qJ8BdBEF8z0ibU0rE= github.com/remyoudompheng/bigfft v0.0.0-20230129092748-24d4a6f8daec/go.mod h1:qqbHyh8v60DhA7CoWK5oRCqLrMHRGoxYCSS9EjAz6Eo= -github.com/robfig/cron/v3 v3.0.1 h1:WdRxkvbJztn8LMz/QEvLN5sBU+xKpSqwwUO1Pjr4qDs= -github.com/robfig/cron/v3 v3.0.1/go.mod h1:eQICP3HwyT7UooqI/z+Ov+PtYAWygg1TEWWzGIFLtro= github.com/rogpeppe/go-internal v1.10.0 h1:TMyTOH3F/DB16zRVcYyreMH6GnZZrwQVAoYjRBZyWFQ= github.com/rs/xid v1.4.0/go.mod h1:trrq9SKmegXys3aeAKXMUTdJsYXVwGY3RLcfgqegfbg= github.com/rs/zerolog v1.29.1 h1:cO+d60CHkknCbvzEWxP0S9K6KqyTjrCNUy1LdQLCGPc= diff --git a/roomserver/api/api.go b/roomserver/api/api.go index 850b302a58..21a068a03e 100644 --- a/roomserver/api/api.go +++ b/roomserver/api/api.go @@ -249,6 +249,8 @@ type ClientRoomserverAPI interface { PerformAdminEvacuateRoom(ctx context.Context, roomID string) (affected []string, err error) PerformAdminEvacuateUser(ctx context.Context, userID string) (affected []string, err error) PerformAdminPurgeRoom(ctx context.Context, roomID string) error + //! GlobeKeeper Customization + PerformDataRetention(ctx context.Context, dr *PerformDataRetentionRequest, roomID string) error PerformAdminDownloadState(ctx context.Context, roomID, userID string, serverName spec.ServerName) error PerformPeek(ctx context.Context, req *PerformPeekRequest) (roomID string, err error) PerformUnpeek(ctx context.Context, roomID, userID, deviceID string) error diff --git a/roomserver/api/output.go b/roomserver/api/output.go index 852b64206d..227680756a 100644 --- a/roomserver/api/output.go +++ b/roomserver/api/output.go @@ -59,6 +59,8 @@ const ( OutputTypeRetirePeek OutputType = "retire_peek" // OutputTypePurgeRoom indicates the event is an OutputPurgeRoom OutputTypePurgeRoom OutputType = "purge_room" + // OutputTypeDataRetention indicates the event is an OutputDataRetention + OutputTypeDataRetention OutputType = "data_retention" ) // An OutputEvent is an entry in the roomserver output kafka log. @@ -84,6 +86,8 @@ type OutputEvent struct { RetirePeek *OutputRetirePeek `json:"retire_peek,omitempty"` // The content of the event with type OutputPurgeRoom PurgeRoom *OutputPurgeRoom `json:"purge_room,omitempty"` + // The content of the event with type OutputDataRetention + DataRetentionInRoom *OutputDataRetention `json:"data_retention,omitempty"` } // Type of the OutputNewRoomEvent. @@ -269,3 +273,8 @@ type OutputRetirePeek struct { type OutputPurgeRoom struct { RoomID string } + +type OutputDataRetention struct { + RoomID string + DR *PerformDataRetentionRequest +} diff --git a/roomserver/api/perform.go b/roomserver/api/perform.go index d6caec08c0..9c3145832b 100644 --- a/roomserver/api/perform.go +++ b/roomserver/api/perform.go @@ -171,3 +171,12 @@ type PerformForgetRequest struct { } type PerformForgetResponse struct{} + +type PerformDataRetentionRequest struct { + SpaceID string `json:"space_id"` + Enabled bool `json:"enabled"` + MaxAge int32 `json:"max_age"` + Teams bool `json:"teams"` + Operations bool `json:"operations"` + Dms bool `json:"dms"` +} diff --git a/roomserver/internal/perform/perform_admin.go b/roomserver/internal/perform/perform_admin.go index 1b88172343..84426bb75c 100644 --- a/roomserver/internal/perform/perform_admin.go +++ b/roomserver/internal/perform/perform_admin.go @@ -222,6 +222,36 @@ func (r *Admin) PerformAdminPurgeRoom( }) } +// PerformDataRetention removes all stale encrypted chat messages from the given room according to the data retention policy. +func (r *Admin) PerformDataRetention( + ctx context.Context, + dr *api.PerformDataRetentionRequest, + roomID string, +) error { + // Validate we actually got a room ID and nothing else + if _, _, err := gomatrixserverlib.SplitID('!', roomID); err != nil { + return err + } + + logrus.WithField("room_id", roomID).Warn("Performing data retention on room from roomserver") + if err := r.DB.DataRetentionInRoom(ctx, dr, roomID); err != nil { + logrus.WithField("room_id", roomID).WithError(err).Warn("Failed to perform data retention on room from roomserver") + return err + } + + logrus.WithField("room_id", roomID).Warn("Performed data retention on room from roomserver, informing other components") + + return r.Inputer.OutputProducer.ProduceRoomEvents(roomID, []api.OutputEvent{ + { + Type: api.OutputTypeDataRetention, + DataRetentionInRoom: &api.OutputDataRetention{ + RoomID: roomID, + DR: dr, + }, + }, + }) +} + func (r *Admin) PerformAdminDownloadState( ctx context.Context, roomID, userID string, serverName spec.ServerName, diff --git a/roomserver/storage/interface.go b/roomserver/storage/interface.go index 9595081ee4..a3fd96c6c6 100644 --- a/roomserver/storage/interface.go +++ b/roomserver/storage/interface.go @@ -180,6 +180,8 @@ type Database interface { GetHistoryVisibilityState(ctx context.Context, roomInfo *types.RoomInfo, eventID string, domain string) ([]gomatrixserverlib.PDU, error) GetLeftUsers(ctx context.Context, userIDs []string) ([]string, error) PurgeRoom(ctx context.Context, roomID string) error + //! GlobeKeeper Customization + DataRetentionInRoom(ctx context.Context, dr *api.PerformDataRetentionRequest, roomID string) error UpgradeRoom(ctx context.Context, oldRoomID, newRoomID, eventSender string) error // GetMembershipForHistoryVisibility queries the membership events for the given eventIDs. diff --git a/roomserver/storage/postgres/data_retention_statements.go b/roomserver/storage/postgres/data_retention_statements.go new file mode 100644 index 0000000000..dc47c7eeac --- /dev/null +++ b/roomserver/storage/postgres/data_retention_statements.go @@ -0,0 +1,102 @@ +// Copyright 2022 The Matrix.org Foundation C.I.C. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package postgres + +import ( + "context" + "database/sql" + + "github.com/matrix-org/dendrite/internal/sqlutil" + "github.com/matrix-org/dendrite/roomserver/api" + "github.com/matrix-org/dendrite/roomserver/types" +) + +const dataRetentionEventJSONSQL = "" + + "DELETE FROM roomserver_event_json WHERE event_nid = ANY(" + + " SELECT re.event_nid" + + " FROM roomserver_events AS re" + + " JOIN roomserver_event_json AS rej ON re.event_nid = rej.event_nid" + + " WHERE rej.event_json LIKE '%\"type\":\"m.room.encrypted\"%'" + + " AND CAST(rej.event_json::jsonb->>'origin_server_ts' AS BIGINT) >= EXTRACT(EPOCH FROM NOW()) * 1000 - $2" + + " AND re.room_nid = $1" + + ")" + +const dataRetentionEventsSQL = "" + + "DELETE FROM roomserver_events WHERE event_nid = ANY(" + + " SELECT re.event_nid" + + " FROM roomserver_events AS re" + + " JOIN roomserver_event_json AS rej ON re.event_nid = rej.event_nid" + + " WHERE rej.event_json LIKE '%\"type\":\"m.room.encrypted\"%'" + + " AND CAST(rej.event_json::jsonb->>'origin_server_ts' AS BIGINT) >= EXTRACT(EPOCH FROM NOW()) * 1000 - $2" + + " AND re.room_nid = $1" + + ")" + +const dataRetentionPreviousEventsSQL = "" + + "UPDATE roomserver_previous_events SET event_nids = array_remove(event_nids, subquery.event_nid) FROM (" + + " SELECT re.event_nid" + + " FROM roomserver_events AS re" + + " JOIN roomserver_event_json AS rej ON re.event_nid = rej.event_nid" + + " WHERE rej.event_json LIKE '%\"type\":\"m.room.encrypted\"%'" + + " AND CAST(rej.event_json::jsonb->>'origin_server_ts' AS BIGINT) >= EXTRACT(EPOCH FROM NOW()) * 1000 - $2" + + " AND re.room_nid = $1" + + ") AS subquery" + + "WHERE event_nids @> ARRAY[subquery.event_nid]" + +const dataRetentionStateBlockEntriesSQL = "" + + "UPDATE roomserver_state_block SET event_nids = array_remove(event_nids, subquery.event_nid) FROM (" + + " SELECT re.event_nid" + + " FROM roomserver_events AS re" + + " JOIN roomserver_event_json AS rej ON re.event_nid = rej.event_nid" + + " WHERE rej.event_json LIKE '%\"type\":\"m.room.encrypted\"%'" + + " AND CAST(rej.event_json::jsonb->>'origin_server_ts' AS BIGINT) >= EXTRACT(EPOCH FROM NOW()) * 1000 - $2" + + " AND re.room_nid = $1" + + ") AS subquery" + + "WHERE event_nids @> ARRAY[subquery.event_nid]" + +type dataRetentionStatements struct { + dataRetentionEventJSONStmt *sql.Stmt + dataRetentionEventsStmt *sql.Stmt + dataRetentionPreviousEventsStmt *sql.Stmt + dataRetentionStateBlockEntriesStmt *sql.Stmt +} + +func PrepareDataRetentionStatements(db *sql.DB) (*dataRetentionStatements, error) { + s := &dataRetentionStatements{} + + return s, sqlutil.StatementList{ + {&s.dataRetentionEventJSONStmt, dataRetentionEventJSONSQL}, + {&s.dataRetentionEventsStmt, dataRetentionEventsSQL}, + {&s.dataRetentionPreviousEventsStmt, dataRetentionPreviousEventsSQL}, + {&s.dataRetentionStateBlockEntriesStmt, dataRetentionStateBlockEntriesSQL}, + }.Prepare(db) +} + +func (s *dataRetentionStatements) DataRetentionInRoom( + ctx context.Context, txn *sql.Tx, dr *api.PerformDataRetentionRequest, roomNID types.RoomNID, roomID string, +) error { + dataRetentionByRoomNID := []*sql.Stmt{ + s.dataRetentionEventJSONStmt, + s.dataRetentionEventsStmt, + s.dataRetentionPreviousEventsStmt, + s.dataRetentionStateBlockEntriesStmt, + } + for _, stmt := range dataRetentionByRoomNID { + _, err := sqlutil.TxStmt(txn, stmt).ExecContext(ctx, roomNID) + if err != nil { + return err + } + } + return nil +} diff --git a/roomserver/storage/shared/storage.go b/roomserver/storage/shared/storage.go index 1c9ec75bbb..e7a2ce7268 100644 --- a/roomserver/storage/shared/storage.go +++ b/roomserver/storage/shared/storage.go @@ -46,6 +46,7 @@ type Database struct { MembershipTable tables.Membership PublishedTable tables.Published Purge tables.Purge + DataRetention tables.DataRetention UserRoomKeyTable tables.UserRoomKeys GetRoomUpdaterFn func(ctx context.Context, roomInfo *types.RoomInfo) (*RoomUpdater, error) } @@ -1686,6 +1687,22 @@ func (d *Database) PurgeRoom(ctx context.Context, roomID string) error { }) } +// DataRetentionInRoom removes all stale encrypted chat messages within a given room from the roomserver. +// For large rooms this operation may take a considerable amount of time. +func (d *Database) DataRetentionInRoom(ctx context.Context, dr *api.PerformDataRetentionRequest, roomID string) error { + return d.Writer.Do(d.DB, nil, func(txn *sql.Tx) error { + // Remove `ForUpdate` to execute without locking the records (might be needed since data retention is a long running operation) + roomNID, err := d.RoomsTable.SelectRoomNIDForUpdate(ctx, txn, roomID) + if err != nil { + if err == sql.ErrNoRows { + return fmt.Errorf("room %s does not exist", roomID) + } + return fmt.Errorf("failed to lock the room: %w", err) + } + return d.DataRetention.DataRetentionInRoom(ctx, txn, dr, roomNID, roomID) + }) +} + func (d *Database) UpgradeRoom(ctx context.Context, oldRoomID, newRoomID, eventSender string) error { return d.Writer.Do(d.DB, nil, func(txn *sql.Tx) error { diff --git a/roomserver/storage/sqlite3/data_retention_statements.go b/roomserver/storage/sqlite3/data_retention_statements.go new file mode 100644 index 0000000000..1c14a94419 --- /dev/null +++ b/roomserver/storage/sqlite3/data_retention_statements.go @@ -0,0 +1,102 @@ +// Copyright 2022 The Matrix.org Foundation C.I.C. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package sqlite3 + +import ( + "context" + "database/sql" + + "github.com/matrix-org/dendrite/internal/sqlutil" + "github.com/matrix-org/dendrite/roomserver/types" +) + +const dataRetentionEventJSONSQL = "" + + "DELETE FROM roomserver_event_json WHERE event_nid = ANY(" + + " SELECT re.event_nid" + + " FROM roomserver_events AS re" + + " JOIN roomserver_event_json AS rej ON re.event_nid = rej.event_nid" + + " WHERE rej.event_json LIKE '%\"type\":\"m.room.encrypted\"%'" + + " AND CAST(rej.event_json::jsonb->>'origin_server_ts' AS BIGINT) >= EXTRACT(EPOCH FROM NOW()) * 1000 - $2" + + " AND re.room_nid = $1" + + ")" + +const dataRetentionEventsSQL = "" + + "DELETE FROM roomserver_events WHERE event_nid = ANY(" + + " SELECT re.event_nid" + + " FROM roomserver_events AS re" + + " JOIN roomserver_event_json AS rej ON re.event_nid = rej.event_nid" + + " WHERE rej.event_json LIKE '%\"type\":\"m.room.encrypted\"%'" + + " AND CAST(rej.event_json::jsonb->>'origin_server_ts' AS BIGINT) >= EXTRACT(EPOCH FROM NOW()) * 1000 - $2" + + " AND re.room_nid = $1" + + ")" + +const dataRetentionPreviousEventsSQL = "" + + "UPDATE roomserver_previous_events SET event_nids = array_remove(event_nids, subquery.event_nid) FROM (" + + " SELECT re.event_nid" + + " FROM roomserver_events AS re" + + " JOIN roomserver_event_json AS rej ON re.event_nid = rej.event_nid" + + " WHERE rej.event_json LIKE '%\"type\":\"m.room.encrypted\"%'" + + " AND CAST(rej.event_json::jsonb->>'origin_server_ts' AS BIGINT) >= EXTRACT(EPOCH FROM NOW()) * 1000 - $2" + + " AND re.room_nid = $1" + + ") AS subquery" + + "WHERE event_nids @> ARRAY[subquery.event_nid]" + +const dataRetentionStateBlockEntriesSQL = "" + + "UPDATE roomserver_state_block SET event_nids = array_remove(event_nids, subquery.event_nid) FROM (" + + " SELECT re.event_nid" + + " FROM roomserver_events AS re" + + " JOIN roomserver_event_json AS rej ON re.event_nid = rej.event_nid" + + " WHERE rej.event_json LIKE '%\"type\":\"m.room.encrypted\"%'" + + " AND CAST(rej.event_json::jsonb->>'origin_server_ts' AS BIGINT) >= EXTRACT(EPOCH FROM NOW()) * 1000 - $2" + + " AND re.room_nid = $1" + + ") AS subquery" + + "WHERE event_nids @> ARRAY[subquery.event_nid]" + +type dataRetentionStatements struct { + dataRetentionEventJSONStmt *sql.Stmt + dataRetentionEventsStmt *sql.Stmt + dataRetentionPreviousEventsStmt *sql.Stmt + dataRetentionStateBlockEntriesStmt *sql.Stmt +} + +func PrepareDataRetentionStatements(db *sql.DB) (*dataRetentionStatements, error) { + s := &dataRetentionStatements{} + + return s, sqlutil.StatementList{ + {&s.dataRetentionEventJSONStmt, dataRetentionEventJSONSQL}, + {&s.dataRetentionEventsStmt, dataRetentionEventsSQL}, + {&s.dataRetentionPreviousEventsStmt, dataRetentionPreviousEventsSQL}, + {&s.dataRetentionStateBlockEntriesStmt, dataRetentionStateBlockEntriesSQL}, + }.Prepare(db) +} + +func (s *dataRetentionStatements) DataRetentionInRoom( + ctx context.Context, txn *sql.Tx, roomNID types.RoomNID, roomID string, +) error { + + dataRetentionByRoomNID := []*sql.Stmt{ + s.dataRetentionEventJSONStmt, + s.dataRetentionEventsStmt, + s.dataRetentionPreviousEventsStmt, + s.dataRetentionStateBlockEntriesStmt, + } + for _, stmt := range dataRetentionByRoomNID { + _, err := sqlutil.TxStmt(txn, stmt).ExecContext(ctx, roomNID) + if err != nil { + return err + } + } + return nil +} diff --git a/roomserver/storage/tables/interface.go b/roomserver/storage/tables/interface.go index 8243fbeee1..e4bbd65dcd 100644 --- a/roomserver/storage/tables/interface.go +++ b/roomserver/storage/tables/interface.go @@ -215,6 +215,12 @@ type Purge interface { ) error } +type DataRetention interface { + DataRetentionInRoom( + ctx context.Context, txn *sql.Tx, dr *api.PerformDataRetentionRequest, roomNID types.RoomNID, roomID string, + ) error +} + type UserRoomKeys interface { // InsertUserRoomPrivatePublicKey inserts the given private key as well as the public key for it. This should be used // when creating keys locally.