Skip to content

Commit

Permalink
🗂️ DB functionality to fetch rooms under space sorted by room types.
Browse files Browse the repository at this point in the history
  • Loading branch information
Danieloni1 committed Aug 6, 2024
1 parent aafeb8c commit 216950d
Show file tree
Hide file tree
Showing 10 changed files with 261 additions and 69 deletions.
106 changes: 43 additions & 63 deletions clientapi/routing/data_retention.go
Original file line number Diff line number Diff line change
@@ -1,11 +1,13 @@
package routing

import (
"context"
"net/http"

"github.com/matrix-org/dendrite/clientapi/httputil"
"github.com/matrix-org/dendrite/roomserver/api"
roomserverAPI "github.com/matrix-org/dendrite/roomserver/api"
"github.com/matrix-org/dendrite/setup/config"
"github.com/matrix-org/dendrite/userapi/api"
"github.com/matrix-org/gomatrixserverlib/spec"
"github.com/matrix-org/util"
)
Expand All @@ -29,8 +31,7 @@ type DataRetention struct {
func PostDataRetention(
req *http.Request,
cfg *config.ClientAPI,
deviceAPI *api.Device,
userAPI api.ClientUserAPI,
rsAPI roomserverAPI.ClientRoomserverAPI,
) util.JSONResponse {
var body DataRetentionRequest
if reqErr := httputil.UnmarshalJSONRequest(req, &body); reqErr != nil {
Expand All @@ -44,74 +45,53 @@ func PostDataRetention(
}
}

// TODO: Fetch dms, operators and teams under the provided space.
// WITH room_ids AS (
// SELECT DISTINCT
// (REGEXP_MATCHES(event_json, '"room_id":"([^"]+)"'))[1]::text AS room_id
// FROM roomserver_event_json
// WHERE event_json LIKE '%"state_key":"$1"%'
// AND event_json LIKE '%"type":"m.space.parent"%'
// ),
// dm_rooms AS (
// SELECT
// ARRAY_AGG(DISTINCT r.room_id) AS dm_array
// FROM roomserver_event_json e
// CROSS JOIN LATERAL (
// SELECT (REGEXP_MATCHES(e.event_json, '"room_id":"([^"]+)"'))[1]::text AS room_id
// ) AS r
// WHERE e.event_json LIKE '%"is_direct":true%'
// AND r.room_id = ANY (
// SELECT room_id FROM room_ids
// )
// ),
// operation_rooms AS (
// SELECT
// ARRAY_AGG(DISTINCT r.room_id) AS operation_array
// FROM roomserver_event_json e
// CROSS JOIN LATERAL (
// SELECT (REGEXP_MATCHES(e.event_json, '"room_id":"([^"]+)"'))[1]::text AS room_id
// ) AS r
// WHERE e.event_json LIKE '%"type":"connect.operation"%'
// AND r.room_id = ANY (
// SELECT room_id FROM room_ids
// )
// ),
// team_rooms AS (
// SELECT
// ARRAY_AGG(DISTINCT r.room_id) AS team_array
// FROM roomserver_event_json e
// CROSS JOIN LATERAL (
// SELECT (REGEXP_MATCHES(e.event_json, '"room_id":"([^"]+)"'))[1]::text AS room_id
// ) AS r
// WHERE r.room_id = ANY (
// SELECT room_id FROM room_ids
// )
// AND r.room_id NOT IN (
// SELECT UNNEST(operation_rooms.operation_array) FROM operation_rooms
// )
// AND r.room_id NOT IN (
// SELECT UNNEST(dm_rooms.dm_array) FROM dm_rooms
// )
// )
// SELECT
// dm_rooms.dm_array,
// operation_rooms.operation_array,
// team_rooms.team_array
// FROM
// dm_rooms,
// operation_rooms,
// team_rooms;
// Validate the roomID
validRoomID, err := spec.NewRoomID(body.DataRetentions.SpaceID)
if err != nil {
return util.JSONResponse{
Code: http.StatusBadRequest,
JSON: spec.InvalidParam("space_id is invalid"),
}
}

queryReq := api.QueryRoomsUnderSpaceRequest{
SpaceID: validRoomID.String(),
}

var queryRes api.QueryRoomsUnderSpaceResponse
if queryErr := rsAPI.QueryRoomsUnderSpace(req.Context(), &queryReq, &queryRes); queryErr != nil {
util.GetLogger(req.Context()).WithError(queryErr).Error("rsAPI.QueryRoomsUnderSpace failed")
return util.JSONResponse{
Code: http.StatusInternalServerError,
JSON: spec.InternalServerError{},
}
}

if body.DataRetentions.Teams {
// TODO: Iterate and purge stale data from teams
// TODO: Replace with PerformDataRetention once it's implemented
for _, roomId := range queryRes.Teams {
if err = rsAPI.PerformAdminPurgeRoom(context.Background(), roomId); err != nil {
return util.ErrorResponse(err)
}
}
}

if body.DataRetentions.Operations {
// TODO: Iterate and purge stale data from operations
for _, roomId := range queryRes.Operations {
// TODO: Replace with PerformDataRetention once it's implemented
if err = rsAPI.PerformAdminPurgeRoom(context.Background(), roomId); err != nil {
return util.ErrorResponse(err)
}
}
}

if body.DataRetentions.Dms {
// TODO: Iterate and purge stale data from dms
for _, roomId := range queryRes.DMs {
// TODO: Replace with PerformDataRetention once it's implemented
if err = rsAPI.PerformAdminPurgeRoom(context.Background(), roomId); err != nil {
return util.ErrorResponse(err)
}
}
}

return util.JSONResponse{
Expand Down
2 changes: 1 addition & 1 deletion clientapi/routing/routing.go
Original file line number Diff line number Diff line change
Expand Up @@ -1549,7 +1549,7 @@ func Setup(

v3mux.Handle("/rooms/{roomID}/dataRetention",
httputil.MakeAuthAPI("data_retention", userAPI, func(req *http.Request, device *userapi.Device) util.JSONResponse {
return PostDataRetention(req, cfg, device, userAPI)
return PostDataRetention(req, cfg, rsAPI)
}),
).Methods(http.MethodPost, http.MethodOptions)

Expand Down
2 changes: 2 additions & 0 deletions roomserver/api/api.go
Original file line number Diff line number Diff line change
Expand Up @@ -231,6 +231,8 @@ type ClientRoomserverAPI interface {

QueryMembershipForUser(ctx context.Context, req *QueryMembershipForUserRequest, res *QueryMembershipForUserResponse) error
QueryMembershipsForRoom(ctx context.Context, req *QueryMembershipsForRoomRequest, res *QueryMembershipsForRoomResponse) error
//! GlobeKeeper Customization
QueryRoomsUnderSpace(ctx context.Context, req *QueryRoomsUnderSpaceRequest, res *QueryRoomsUnderSpaceResponse) error
QueryRoomsForUser(ctx context.Context, userID spec.UserID, desiredMembership string) ([]spec.RoomID, error)
QueryStateAfterEvents(ctx context.Context, req *QueryStateAfterEventsRequest, res *QueryStateAfterEventsResponse) error
// QueryKnownUsers returns a list of users that we know about from our joined rooms.
Expand Down
11 changes: 11 additions & 0 deletions roomserver/api/query.go
Original file line number Diff line number Diff line change
Expand Up @@ -346,6 +346,17 @@ type QueryServerBannedFromRoomResponse struct {
Banned bool `json:"banned"`
}

// ! GlobeKeeper Customization
type QueryRoomsUnderSpaceRequest struct {
SpaceID string `json:"space_id"`
}

type QueryRoomsUnderSpaceResponse struct {
DMs []string `json:"dm_rooms"`
Operations []string `json:"operation_rooms"`
Teams []string `json:"team_rooms"`
}

type QueryAdminEventReportsResponse struct {
ID int64 `json:"id"`
Score int64 `json:"score"`
Expand Down
15 changes: 15 additions & 0 deletions roomserver/internal/query/query.go
Original file line number Diff line number Diff line change
Expand Up @@ -493,6 +493,21 @@ func (r *Queryer) QueryMembershipsForRoom(
return nil
}

// QueryRoomsUnderSpace implements api.RoomserverInternalAPI
func (r *Queryer) QueryRoomsUnderSpace(
ctx context.Context,
request *api.QueryRoomsUnderSpaceRequest,
response *api.QueryRoomsUnderSpaceResponse,
) error {
var err error
response.DMs, response.Operations, response.Teams, err = r.DB.QueryRoomsUnderSpace(ctx, request.SpaceID)
if err != nil {
return err
}

return nil
}

// QueryServerJoinedToRoom implements api.RoomserverInternalAPI
func (r *Queryer) QueryServerJoinedToRoom(
ctx context.Context,
Expand Down
3 changes: 3 additions & 0 deletions roomserver/storage/interface.go
Original file line number Diff line number Diff line change
Expand Up @@ -140,6 +140,9 @@ type Database interface {
// joinOnly is set to true.
// Returns an error if there was a problem talking to the database.
GetMembershipEventNIDsForRoom(ctx context.Context, roomNID types.RoomNID, joinOnly bool, localOnly bool) ([]types.EventNID, error)
//! GlobeKeeper Customization
// QueryRoomsUnderSpace looks up rooms under the given space and returns all of them sorted by their type (DMs, operations and teams).
QueryRoomsUnderSpace(ctx context.Context, spaceID string) (dms, operations, teams []string, err error)
// EventsFromIDs looks up the Events for a list of event IDs. Does not error if event was
// not found.
// Returns an error if the retrieval went wrong.
Expand Down
91 changes: 89 additions & 2 deletions roomserver/storage/postgres/event_json_table.go
Original file line number Diff line number Diff line change
Expand Up @@ -54,9 +54,68 @@ const bulkSelectEventJSONSQL = "" +
" WHERE event_nid = ANY($1)" +
" ORDER BY event_nid ASC"

const selectRoomsUnderSpaceSQL = "" +
"WITH room_ids AS (" +
" SELECT DISTINCT" +
" (REGEXP_MATCHES(event_json, '\"room_id\":\"([^\"]+)\"'))[1]::text AS room_id" +
" FROM roomserver_event_json" +
" WHERE event_json LIKE '%\"state_key\":\"$1\"%'" +
" AND event_json LIKE '%\"type\":\"m.space.parent\"%'" +
")," +
"dm_rooms AS (" +
" SELECT" +
" ARRAY_AGG(DISTINCT r.room_id) AS dm_array" +
" FROM roomserver_event_json e" +
" CROSS JOIN LATERAL (" +
" SELECT (REGEXP_MATCHES(e.event_json, '\"room_id\":\"([^\"]+)\"'))[1]::text AS room_id" +
" ) AS r" +
" WHERE e.event_json LIKE '%\"is_direct\":true%'" +
" AND r.room_id = ANY (" +
" SELECT room_id FROM room_ids" +
" )" +
")," +
"operation_rooms AS (" +
" SELECT" +
" ARRAY_AGG(DISTINCT r.room_id) AS operation_array" +
" FROM roomserver_event_json e" +
" CROSS JOIN LATERAL (" +
" SELECT (REGEXP_MATCHES(e.event_json, '\"room_id\":\"([^\"]+)\"'))[1]::text AS room_id" +
" ) AS r" +
" WHERE e.event_json LIKE '%\"type\":\"connect.operation\"%'" +
" AND r.room_id = ANY (" +
" SELECT room_id FROM room_ids" +
" )" +
")," +
"team_rooms AS (" +
" SELECT" +
" ARRAY_AGG(DISTINCT r.room_id) AS team_array" +
" FROM roomserver_event_json e" +
" CROSS JOIN LATERAL (" +
" SELECT (REGEXP_MATCHES(e.event_json, '\"room_id\":\"([^\"]+)\"'))[1]::text AS room_id" +
" ) AS r" +
" WHERE r.room_id = ANY (" +
" SELECT room_id FROM room_ids" +
" )" +
" AND r.room_id NOT IN (" +
" SELECT UNNEST(operation_rooms.operation_array) FROM operation_rooms" +
" )" +
" AND r.room_id NOT IN (" +
" SELECT UNNEST(dm_rooms.dm_array) FROM dm_rooms" +
" )" +
")" +
"SELECT" +
" dm_rooms.dm_array," +
" operation_rooms.operation_array," +
" team_rooms.team_array" +
"FROM" +
" dm_rooms," +
" operation_rooms," +
" team_rooms;"

type eventJSONStatements struct {
insertEventJSONStmt *sql.Stmt
bulkSelectEventJSONStmt *sql.Stmt
insertEventJSONStmt *sql.Stmt
bulkSelectEventJSONStmt *sql.Stmt
selectRoomsUnderSpaceSQLStmt *sql.Stmt
}

func CreateEventJSONTable(db *sql.DB) error {
Expand All @@ -70,6 +129,7 @@ func PrepareEventJSONTable(db *sql.DB) (tables.EventJSON, error) {
return s, sqlutil.StatementList{
{&s.insertEventJSONStmt, insertEventJSONSQL},
{&s.bulkSelectEventJSONStmt, bulkSelectEventJSONSQL},
{&s.selectRoomsUnderSpaceSQLStmt, selectRoomsUnderSpaceSQL},
}.Prepare(db)
}

Expand Down Expand Up @@ -107,3 +167,30 @@ func (s *eventJSONStatements) BulkSelectEventJSON(
}
return results[:i], rows.Err()
}

func (s *eventJSONStatements) SelectRoomsUnderSpace(
ctx context.Context, txn *sql.Tx, spaceID string,
) ([]string, []string, []string, error) {
stmt := sqlutil.TxStmt(txn, s.selectRoomsUnderSpaceSQLStmt)
rows, err := stmt.QueryContext(ctx, spaceID)
if err != nil {
return nil, nil, nil, err
}
defer internal.CloseAndLogIfError(ctx, rows, "SelectRoomsUnderSpaceSQL: rows.close() failed")

var (
Dms []string
Operations []string
Teams []string
)

if err := rows.Scan(&Dms, &Operations, &Teams); err != nil {
return nil, nil, nil, err
}

if err := rows.Err(); err != nil {
return nil, nil, nil, err
}

return Dms, Operations, Teams, nil
}
6 changes: 6 additions & 0 deletions roomserver/storage/shared/storage.go
Original file line number Diff line number Diff line change
Expand Up @@ -522,6 +522,12 @@ func (d *Database) GetMembershipEventNIDsForRoom(
return d.getMembershipEventNIDsForRoom(ctx, nil, roomNID, joinOnly, localOnly)
}

func (d *Database) QueryRoomsUnderSpace(
ctx context.Context, spaceId string,
) ([]string, []string, []string, error) {
return d.EventJSONTable.SelectRoomsUnderSpace(ctx, nil, spaceId)
}

func (d *Database) getMembershipEventNIDsForRoom(
ctx context.Context, txn *sql.Tx, roomNID types.RoomNID, joinOnly bool, localOnly bool,
) ([]types.EventNID, error) {
Expand Down
Loading

0 comments on commit 216950d

Please sign in to comment.