Skip to content

Commit

Permalink
Merge pull request #1195 from France-ioi/move_ancestors_prop_back_into_tx
Browse files Browse the repository at this point in the history

Move ancestors recalculation back into initiating transactions + more granular locking during ancestors recalculation
  • Loading branch information
zenovich authored Oct 16, 2024
2 parents e4c7f87 + 562b0a5 commit f92e6af
Show file tree
Hide file tree
Showing 37 changed files with 317 additions and 274 deletions.
2 changes: 1 addition & 1 deletion app/api/contests/set_additional_time.go
Original file line number Diff line number Diff line change
Expand Up @@ -230,7 +230,7 @@ func setAdditionalTimeForGroupInContest(
`, itemID, itemID).Error())
service.MustNotBeError(store.Exec("DROP TEMPORARY TABLE new_expires_at").Error())
if groupsGroupsModified {
store.ScheduleGroupsAncestorsPropagation()
service.MustNotBeError(store.GroupGroups().CreateNewAncestors())
store.ScheduleResultsPropagation()
}
}
4 changes: 2 additions & 2 deletions app/api/groups/update_permissions.go
Original file line number Diff line number Diff line change
Expand Up @@ -729,8 +729,8 @@ func savePermissionsIntoDB(groupID, itemID, sourceGroupID int64, dbMap map[strin
service.MustNotBeError(permissionGrantedStore.InsertOrUpdateMap(dbMap, columnsToUpdate))
s.SchedulePermissionsPropagation()
if dbMap["can_view"] != nil && dbMap["can_view"] != none || dbMap["is_owner"] != nil && dbMap["is_owner"].(bool) {
// permissionGrantedStore.After() implicitly (via triggers) marks some attempts as to_be_propagated
// when an item becomes visible, so we should propagate attempts here
// the permissions propagation implicitly (via triggers) marks some results as to_be_propagated
// when an item becomes visible, so we should propagate results here
s.ScheduleResultsPropagation()
}
}
7 changes: 2 additions & 5 deletions app/api/items/create_item.go
Original file line number Diff line number Diff line change
Expand Up @@ -212,7 +212,6 @@ func (srv *Service) createItem(w http.ResponseWriter, r *http.Request) service.A
func validateAndInsertItem(srv *Service, r *http.Request) (itemID int64, apiError service.APIError, err error) {
user := srv.GetUser(r)
store := srv.GetStore(r)
propagationsToRun := []string{}
err = store.InTransaction(func(store *database.DataStore) error {
input := NewItemRequest{}
formData := formdata.NewFormData(&input)
Expand Down Expand Up @@ -260,14 +259,12 @@ func validateAndInsertItem(srv *Service, r *http.Request) (itemID int64, apiErro

setNewItemAsRootActivityOrSkill(store, formData, &input, itemID)

propagationsToRun = append(propagationsToRun, "items_ancestors")
service.MustNotBeError(store.ItemItems().CreateNewAncestors())

return nil
})
if err == nil {
propagationsToRun = append(propagationsToRun, "permissions", "results")

service.SchedulePropagation(store, srv.GetPropagationEndpoint(), propagationsToRun)
service.SchedulePropagation(store, srv.GetPropagationEndpoint(), []string{"permissions", "results"})
}

return itemID, apiError, err
Expand Down
4 changes: 1 addition & 3 deletions app/api/items/end_attempt.go
Original file line number Diff line number Diff line change
Expand Up @@ -91,9 +91,7 @@ func (srv *Service) endAttempt(w http.ResponseWriter, r *http.Request) service.A
participantID, attemptID, participantID, participantID).
Error())

store.ScheduleGroupsAncestorsPropagation()

return nil
return store.GroupGroups().CreateNewAncestors()
})

if apiError != service.NoError {
Expand Down
2 changes: 1 addition & 1 deletion app/api/items/enter.go
Original file line number Diff line number Diff line change
Expand Up @@ -141,7 +141,7 @@ func (srv *Service) enter(w http.ResponseWriter, r *http.Request) service.APIErr
ON DUPLICATE KEY UPDATE expires_at = VALUES(expires_at)`,
itemInfo.ParticipantsGroupID, entryState.groupID,
itemInfo.Now, itemInfo.Duration, totalAdditionalTime).Error())
store.ScheduleGroupsAncestorsPropagation()
service.MustNotBeError(store.GroupGroups().CreateNewAncestors())
// Upserting into groups_groups may mark some attempts as 'to_be_propagated',
// so we need to recompute them
store.ScheduleResultsPropagation()
Expand Down
2 changes: 1 addition & 1 deletion app/api/items/items.go
Original file line number Diff line number Diff line change
Expand Up @@ -457,7 +457,7 @@ func insertItemItems(store *database.DataStore, spec []*insertItemItemsSpec) {

// createContestParticipantsGroup creates a new contest participants group for the given item and
// gives "can_manage:content" permission on the item to this new group.
// The method doesn't update `items.participants_group_id` or run ItemItemStore.After()
// The method doesn't update `items.participants_group_id` or run items ancestors recalculation
// (a caller should do both on their own).
func createContestParticipantsGroup(store *database.DataStore, itemID int64) int64 {
var participantsGroupID int64
Expand Down
5 changes: 2 additions & 3 deletions app/api/items/path_from_root_integration_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -818,11 +818,10 @@ func Test_FindItemPath(t *testing.T) {
store := database.NewDataStore(db)
var got []items.ItemPath
assert.NoError(t, store.InTransaction(func(s *database.DataStore) error {
s.ScheduleGroupsAncestorsPropagation()
s.ScheduleItemsAncestorsPropagation()
assert.NoError(t, s.GroupGroups().CreateNewAncestors())
assert.NoError(t, s.ItemItems().CreateNewAncestors())
s.SchedulePermissionsPropagation()
s.ScheduleResultsPropagation()

return nil
}))
got = items.FindItemPaths(
Expand Down
5 changes: 1 addition & 4 deletions app/api/items/start_result_path_integration_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -445,10 +445,7 @@ func Test_getDataForResultPathStart(t *testing.T) {
store := database.NewDataStore(db)
var got []map[string]interface{}
assert.NoError(t, store.InTransaction(func(s *database.DataStore) error {
s.ScheduleGroupsAncestorsPropagation()
return nil
}))
assert.NoError(t, store.InTransaction(func(s *database.DataStore) error {
assert.NoError(t, s.GroupGroups().CreateNewAncestors())
got = items.GetDataForResultPathStart(s, tt.args.participantID, tt.args.ids)
return nil
}))
Expand Down
6 changes: 3 additions & 3 deletions app/api/items/update_item.go
Original file line number Diff line number Diff line change
Expand Up @@ -264,20 +264,20 @@ func updateChildrenAndRunListeners(
parentChildSpec := constructItemsItemsForChildren(input.Children, itemID)
insertItemItems(lockedStore, parentChildSpec)

propagationsToRun = append(propagationsToRun, "items_ancestors")
service.MustNotBeError(store.ItemItems().CreateNewAncestors())

return nil
})

propagationsToRun = append(propagationsToRun, "permissions", "results")
propagationsToRun = []string{"permissions", "results"}
} else if formData.IsSet("no_score") || formData.IsSet("validation_type") {
// results data of the task will be zeroed
service.MustNotBeError(
store.Exec("INSERT INTO results_propagate ? ON DUPLICATE KEY UPDATE state = 'to_be_recomputed'",
store.Results().Where("item_id = ?", itemID).
Select("participant_id, attempt_id, item_id, 'to_be_recomputed' AS state").QueryExpr()).Error())

propagationsToRun = append(propagationsToRun, "results")
propagationsToRun = []string{"results"}
}

return propagationsToRun, apiError, err
Expand Down
168 changes: 60 additions & 108 deletions app/database/ancestors.go
Original file line number Diff line number Diff line change
@@ -1,21 +1,11 @@
package database

import (
"time"

"github.com/France-ioi/AlgoreaBackend/v2/app/logging"
"github.com/France-ioi/AlgoreaBackend/v2/golang"
"database/sql"
)

const groups = "groups"

// createNewAncestorsQueries contains the SQL queries needed for createNewAncestors.
type createNewAncestorsQueries struct {
markAsProcessingQuery string
recomputeQueries []string
markAsDoneQuery string
}

// createNewAncestors inserts new rows into
// the objectName_ancestors table (items_ancestors or groups_ancestors)
// for all rows marked with ancestors_computation_state="todo" in objectName_propagate
Expand All @@ -28,116 +18,38 @@ type createNewAncestorsQueries struct {
// - before_insert_items_items/groups_groups
// - before_delete_items_items/groups_groups.
func (s *DataStore) createNewAncestors(objectName, singleObjectName string) { /* #nosec */
CallBeforePropagationStepHook(golang.IfElse(objectName == groups, PropagationStepGroupAncestorsInit, PropagationStepItemAncestorsInit))

mustNotBeError(s.InTransaction(func(s *DataStore) error {
initTransactionTime := time.Now()

s.createNewAncestorsInsideTransactionInitStep(objectName, singleObjectName)

logging.Debugf("Duration of %v_ancestors propagation init step: %v", objectName, time.Since(initTransactionTime))

return nil
}))

queries := s.constructCreateNewAncestorsQueries(objectName, singleObjectName)

hasChanges := true
for hasChanges {
CallBeforePropagationStepHook(golang.IfElse(objectName == groups, PropagationStepGroupAncestorsMain, PropagationStepItemAncestorsMain))

mustNotBeError(s.InTransaction(func(s *DataStore) error {
initStepTransactionTime := time.Now()

rowsAffected := s.createNewAncestorsInsideTransactionStep(queries)

logging.Debugf(
"Duration of %v_ancestors propagation step: %d rows affected, took %v",
objectName,
rowsAffected,
time.Since(initStepTransactionTime),
)

hasChanges = rowsAffected > 0

return nil
}))
}
}

// createNewAncestorsInsideTransaction does the sql work of createNewAncestors.
// It has to be called in a transaction.
// Normally, createNewAncestors is called AFTER transactions.
// But there is a case where we need to call it inside: when we import the badges of the user.
// In this case, there is a verification that there are no cycles that needs the groups ancestors to be propagated.
// For now, since we keep the whole work of createNewAncestors in a single transaction, we can use this function
// when we need to propagate inside a transaction, and createNewAncestors for the normal propagation.
// In the future, we might want to split the steps here each into its own transaction.
// At that time, we'll need a better way to either:
// - Remove the need for badges cycles detection to not depend on group ancestors
// - Or refactor those two functions in a different way.
func (s *DataStore) createNewAncestorsInsideTransaction(objectName, singleObjectName string) {
s.mustBeInTransaction()

s.createNewAncestorsInsideTransactionInitStep(objectName, singleObjectName)

queries := s.constructCreateNewAncestorsQueries(objectName, singleObjectName)

hasChanges := true
for hasChanges {
rowsAffected := s.createNewAncestorsInsideTransactionStep(queries)

hasChanges = rowsAffected > 0
}
}

// createNewAncestorsInsideTransactionInitStep does the sql work of the initialization step of createNewAncestors.
func (s *DataStore) createNewAncestorsInsideTransactionInitStep(objectName, singleObjectName string) {
s.mustBeInTransaction()

// We mark as 'todo' all descendants of objects marked as 'todo'
query := `
INSERT INTO ` + objectName + `_propagate (id, ancestors_computation_state)
SELECT descendants.id, 'todo'
FROM ` + QuoteName(objectName) + ` AS descendants
JOIN ` + QuoteName(objectName+"_ancestors") + `
ON descendants.id = ` + QuoteName(objectName+"_ancestors") + ".child_" + singleObjectName + `_id
SELECT ` + QuoteName(objectName+"_ancestors") + ".child_" + singleObjectName + `_id, 'todo'
FROM ` + QuoteName(objectName+"_ancestors") + `
JOIN ` + QuoteName(objectName+"_propagate") + ` AS ancestors
ON ancestors.id = ` + QuoteName(objectName+"_ancestors") + ".ancestor_" + singleObjectName + `_id
WHERE ancestors.ancestors_computation_state = 'todo'
FOR UPDATE
FOR SHARE OF ` + QuoteName(objectName+"_ancestors") + `
FOR UPDATE OF ancestors
ON DUPLICATE KEY UPDATE ancestors_computation_state = 'todo'` /* #nosec */

mustNotBeError(s.db.Exec(query).Error)
}

// createNewAncestorsInsideTransactionStep does the sql work of a step of createNewAncestors.
func (s *DataStore) createNewAncestorsInsideTransactionStep(queries createNewAncestorsQueries) int64 {
s.mustBeInTransaction()

mustNotBeError(s.Exec(queries.markAsProcessingQuery).Error())
for i := 0; i < len(queries.recomputeQueries); i++ {
mustNotBeError(s.Exec(queries.recomputeQueries[i]).Error())
}

return s.Exec(queries.markAsDoneQuery).RowsAffected()
}
hasChanges := true

// constructCreateNewAncestorsQueries constructs the SQL queries needed for the main steps of createNewAncestors.
func (s *DataStore) constructCreateNewAncestorsQueries(objectName, singleObjectName string) (queries createNewAncestorsQueries) {
relationsTable := objectName + "_" + objectName

var additionalJoin string
var additionalJoin, additionalLocking string
if objectName == groups {
additionalJoin = " JOIN `groups` AS parent ON parent.id = groups_groups.parent_group_id AND parent.type != 'Team' "
additionalLocking = " FOR SHARE OF parent "
}
// Next queries will be executed in the loop

// We mark as "processing" all objects that were marked as 'todo' and that have no parents not marked as 'done'
// This way we prevent infinite looping as we never process objects that are descendants of themselves

/* #nosec */
queries.markAsProcessingQuery = `
markAsProcessingQuery := `
UPDATE ` + objectName + `_propagate AS children
SET children.ancestors_computation_state='processing'
WHERE children.ancestors_computation_state = 'todo' AND NOT EXISTS (
Expand All @@ -149,10 +61,16 @@ func (s *DataStore) constructCreateNewAncestorsQueries(objectName, singleObjectN
` + objectName + `_propagate.ancestors_computation_state <> 'done'
` + additionalJoin + `
WHERE ` + relationsTable + `.child_` + singleObjectName + `_id = children.id
FOR UPDATE
) has_undone_parents FOR UPDATE
FOR SHARE OF ` + relationsTable + `
FOR UPDATE OF ` + objectName + `_propagate
` + additionalLocking + `
) has_undone_parents
)`

markAsProcessing, err := s.db.CommonDB().Prepare(markAsProcessingQuery)
mustNotBeError(err)
defer func() { mustNotBeError(markAsProcessing.Close()) }()

expiresAtColumn := ""
expiresAtValueJoin := ""
ignore := "IGNORE"
Expand All @@ -164,8 +82,8 @@ func (s *DataStore) constructCreateNewAncestorsQueries(objectName, singleObjectN
}

// For every object marked as 'processing', we compute all its ancestors
queries.recomputeQueries = make([]string, 0, 3)
queries.recomputeQueries = append(queries.recomputeQueries, `
recomputeQueries := make([]string, 0, 3)
recomputeQueries = append(recomputeQueries, `
DELETE `+objectName+`_ancestors
FROM `+objectName+`_ancestors
JOIN `+objectName+`_propagate
Expand All @@ -192,12 +110,16 @@ func (s *DataStore) constructCreateNewAncestorsQueries(objectName, singleObjectN
WHERE
`+objectName+`_propagate.ancestors_computation_state = 'processing'`) // #nosec
if objectName == groups {
queries.recomputeQueries[1] += `
recomputeQueries[1] += `
AND NOW() < groups_groups.expires_at AND
NOW() < LEAST(groups_ancestors_join.expires_at, groups_groups.expires_at)
FOR UPDATE OF ` + objectName + `_propagate
FOR SHARE OF ` + objectName + `_ancestors_join
FOR SHARE OF ` + relationsTable + `
FOR SHARE OF parent
ON DUPLICATE KEY UPDATE
expires_at = GREATEST(groups_ancestors.expires_at, LEAST(groups_ancestors_join.expires_at, groups_groups.expires_at))`
queries.recomputeQueries = append(queries.recomputeQueries, `
recomputeQueries = append(recomputeQueries, `
INSERT IGNORE INTO `+objectName+`_ancestors
(
ancestor_`+singleObjectName+`_id,
Expand All @@ -210,21 +132,51 @@ func (s *DataStore) constructCreateNewAncestorsQueries(objectName, singleObjectN
WHERE groups_propagate.ancestors_computation_state = 'processing'
FOR UPDATE`) // #nosec
} else {
queries.recomputeQueries[1] += ` FOR UPDATE`
queries.recomputeQueries = append(queries.recomputeQueries, `
recomputeQueries[1] += `
FOR UPDATE OF ` + objectName + `_propagate
FOR SHARE OF ` + objectName + `_ancestors_join
FOR SHARE OF ` + relationsTable
recomputeQueries = append(recomputeQueries, `
INSERT IGNORE INTO items_ancestors (ancestor_item_id, child_item_id)
SELECT items_items.parent_item_id, items_items.child_item_id
FROM items_items
JOIN items_propagate ON items_items.child_item_id = items_propagate.id
WHERE items_propagate.ancestors_computation_state = 'processing'
FOR UPDATE`) // #nosec
FOR UPDATE OF items_propagate
FOR SHARE OF items_items`) // #nosec
}

recomputeAncestors := make([]*sql.Stmt, len(recomputeQueries))
for i := 0; i < len(recomputeQueries); i++ {
recomputeAncestors[i], err = s.db.CommonDB().Prepare(recomputeQueries[i])
mustNotBeError(err)

defer func(i int) { mustNotBeError(recomputeAncestors[i].Close()) }(i)
}

// Objects marked as 'processing' are now marked as 'done'
queries.markAsDoneQuery = `
markAsDoneQuery := `
UPDATE ` + objectName + `_propagate
SET ancestors_computation_state = 'done'
WHERE ancestors_computation_state = 'processing'` // #nosec
markAsDone, err := s.db.CommonDB().Prepare(markAsDoneQuery)
mustNotBeError(err)
defer func() { mustNotBeError(markAsDone.Close()) }()

return queries
for hasChanges {
_, err = markAsProcessing.Exec()
mustNotBeError(err)
for i := 0; i < len(recomputeAncestors); i++ {
_, err = recomputeAncestors[i].Exec()
mustNotBeError(err)
}

var result sql.Result
result, err = markAsDone.Exec()
mustNotBeError(err)
var rowsAffected int64
rowsAffected, err = result.RowsAffected()
mustNotBeError(err)
hasChanges = rowsAffected > 0
}
}
2 changes: 1 addition & 1 deletion app/database/badges.go
Original file line number Diff line number Diff line change
Expand Up @@ -52,7 +52,7 @@ func (s *GroupStore) StoreBadges(badges []Badge, userID int64, newUser bool) (er
}

if ancestorsCalculationNeeded {
s.ScheduleGroupsAncestorsPropagation()
mustNotBeError(s.GroupGroups().CreateNewAncestors())
s.ScheduleResultsPropagation()
}

Expand Down
Loading

0 comments on commit f92e6af

Please sign in to comment.