Skip to content

Commit

Permalink
[#3750] Migrate Approval List
Browse files Browse the repository at this point in the history
- Updated migration of approval list items

Signed-off-by: Harold Wanyama <[email protected]>
  • Loading branch information
nickmango committed Apr 3, 2024
1 parent 28c8796 commit 745e0dc
Show file tree
Hide file tree
Showing 6 changed files with 405 additions and 85 deletions.
150 changes: 118 additions & 32 deletions cla-backend-go/cmd/migrate_approval_list/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,8 +5,10 @@ package main

import (
"context"
"flag"
"fmt"
"os"
"strings"
"sync"
"time"

Expand Down Expand Up @@ -55,11 +57,25 @@ type combinedRepo struct {
projects_cla_groups.Repository
}

var emailDomainSuccessCount int
var emailSuccessCount int
var githubOrgSuccessCount int
var githubUsernameSuccessCount int
var gitlabOrgSuccessCount int
var gitlabUsernameSuccessCount int

// var emailFailureCount int
// var githubOrgFailureCount int
// var githubUsernameFailureCount int
// var gitlabOrgFailureCount int
// var gitlabUsernameFailureCount int

func init() {
stage = os.Getenv("STAGE")
if stage == "" {
log.Fatal("stage not set")
}

log.Infof("STAGE set to %s\n", stage)
approvalsTableName = fmt.Sprintf("cla-%s-approvals", stage)
approvalRepo = approvals.NewRepository(stage, awsSession, approvalsTableName)
Expand Down Expand Up @@ -91,11 +107,27 @@ func main() {
log.Info("Fetching ccla signatures")
signed := true
approved := true
// signatureID := flag.String("signature-id", "ALL", "signature ID to migrate")
delete := flag.Bool("delete", false, "delete approval items")
flag.Parse()

if *delete {
log.Info("Deleting approval items")
err := approvalRepo.DeleteAll()
if err != nil {
log.WithFields(f).WithError(err).Error("error deleting approval items")
return
}
log.Info("Deleted all approval items")
return
}

log.Info("Fetching all ccla signatures...")
cclaSignatures, err := signatureRepo.GetCCLASignatures(context.Background(), &signed, &approved)
if err != nil {
log.Fatalf("Error fetching ccla signatures : %v", err)
}
log.Info("Fetched ccla signatures")
log.Infof("Fetched %d ccla signatures", len(cclaSignatures))
eventFound = 0
eventNotFound = 0
recordExists = 0
Expand All @@ -106,6 +138,7 @@ func main() {
wg.Add(1)
go func(signature *signatures.ItemSignature) {
defer wg.Done()
log.WithFields(f).Debugf("Processing company : %s, project : %s", signature.SignatureReferenceName, signature.SignatureProjectID)
err := updateApprovalsTable(signature)
if err != nil {
log.WithFields(f).Warnf("Error updating approvals table for signature : %s, error: %v", signature.SignatureID, err)
Expand All @@ -114,6 +147,39 @@ func main() {
}
wg.Wait()
log.WithFields(f).Debugf("Events found : %d, Events not found : %d , existing Record count: %d", eventFound, eventNotFound, recordExists)



}

func counter(listType string) {
switch listType {
case utils.DomainApprovalCriteria:
emailDomainSuccessCount++
case utils.EmailApprovalCriteria:
emailSuccessCount++
case utils.GithubOrgApprovalCriteria:
githubOrgSuccessCount++
case utils.GithubUsernameApprovalCriteria:
githubUsernameSuccessCount++
case utils.GitlabOrgApprovalCriteria:
gitlabOrgSuccessCount++
case utils.GitlabUsernameApprovalCriteria:
gitlabUsernameSuccessCount++
}
}

func approvalExists(approvalItems []approvals.ApprovalItem, listType, item, projectID, signatureID string) bool {
if len(approvalItems) == 0 {
return false
}
for _, approval := range approvalItems {
if approval.ApprovalCriteria == listType && approval.ApprovalName == item && approval.ProjectID == projectID && approval.SignatureID == signatureID {
return true
}
}

return false
}

func updateApprovalsTable(signature *signatures.ItemSignature) error {
Expand All @@ -126,38 +192,62 @@ func updateApprovalsTable(signature *signatures.ItemSignature) error {
var errMutex sync.Mutex
var err error

approvalItems, err := approvalRepo.GetApprovalListBySignature(signature.SignatureID)
if err != nil {
log.WithFields(f).Warnf("Error fetching approval list items for signature : %s, error: %v", signature.SignatureID, err)
return err
}

log.WithFields(f).Debugf("Fetched %d approval list items for signature: %s", len(approvalItems), signature.SignatureID)

company, err := companyRepo.GetCompany(context.Background(), signature.SignatureReferenceID)

if err != nil {
log.WithFields(f).Warnf("Error fetching company : %s, error: %v", signature.SignatureReferenceID, err)
return err
}

if company == nil {
log.WithFields(f).Warnf("Company not found for : %s", signature.SignatureReferenceID)
return fmt.Errorf("company not found for : %s", signature.SignatureReferenceID)
}

searchTerm := "was added to the approval list"

// Get Company Project list
companyEvents, err := eventsRepo.GetCompanyClaGroupEvents(signature.SignatureProjectID, company.CompanyExternalID, nil, nil, &searchTerm, true)

if err != nil {
log.WithFields(f).Warnf("Error fetching company events : %s, error: %v", signature.SignatureProjectID, err)
return err
}

log.WithFields(f).Debugf("Fetched %d company events for project : %s and company: %s", len(companyEvents.Events), signature.SignatureProjectID, company.CompanyName)

update := func(approvalList []string, listType string) {
defer wg.Done()
for _, item := range approvalList {
searchTerm := fmt.Sprintf("%s was added to the approval list", item)
pageSize := int64(1000)
eventType := events.ClaApprovalListUpdated

// check if approval item already exists
approvalItems, searchErr := approvalRepo.SearchApprovalList(listType, item, signature.SignatureProjectID, "", signature.SignatureID)
if err != nil {
errMutex.Lock()
err = searchErr
errMutex.Unlock()
log.WithFields(f).Warnf("Error searching approval list for item : %s, error: %v", item, err)
return
}

if len(approvalItems) > 0 {
if approvalExists(approvalItems, listType, item, signature.SignatureProjectID, signature.SignatureID) {
log.WithFields(f).Debugf("Approval item already exists for : %s, %s", listType, item)
recordExists++
return
}

log.WithFields(f).Debugf("searching for events with search term : %s, projectID: %s, eventType: %s ", searchTerm, signature.SignatureProjectID, eventType)

events, eventErr := eventsRepo.GetCCLAEvents(signature.SignatureProjectID, signature.SignatureReferenceID, searchTerm, eventType, pageSize)
dateAdded := signature.DateModified

if eventErr != nil {
errMutex.Lock()
err = eventErr
errMutex.Unlock()
return
// search events for the item
for _, event := range companyEvents.Events {
if event.EventType == eventType && event.EventCompanyID == company.CompanyID && event.EventCLAGroupID == signature.SignatureProjectID && strings.Contains(strings.ToLower(event.EventData), strings.ToLower(searchTerm)) {
log.WithFields(f).Debugf("found event with id: %s, event : %+v ", event.EventID, event)
dateAdded = event.EventTime
eventFound++
break
}
}

approvalID, approvalErr := uuid.NewV4()
Expand All @@ -169,6 +259,7 @@ func updateApprovalsTable(signature *signatures.ItemSignature) error {
return
}
currentTime := time.Now().UTC().String()
note := fmt.Sprintf("Approval item added by migration script on %s", currentTime)
approvalItem := approvals.ApprovalItem{
ApprovalID: approvalID.String(),
SignatureID: signature.SignatureID,
Expand All @@ -179,22 +270,12 @@ func updateApprovalsTable(signature *signatures.ItemSignature) error {
CompanyID: signature.SignatureReferenceID,
ProjectID: signature.SignatureProjectID,
ApprovalCompanyName: signature.SignatureReferenceName,
DateAdded: dateAdded,
Note: note,
Active: true,
}

log.WithFields(f).Debugf("Adding approval item : %+v", approvalItem)

if len(events) > 0 {
event := getLatestEvent(events)
approvalItem.DateAdded = event.EventTime
log.WithFields(f).Debugf("found event with id: %s , approval: %+v ", event.EventID, approvalItem)
eventFound++
} else {
log.WithFields(f).Debugf("no events found for %s: %s", listType, item)
approvalItem.DateAdded = signature.DateModified
eventNotFound++
}

log.WithFields(f).Debugf("adding approval item : %+v", approvalItem)
approvalErr = approvalRepo.AddApprovalList(approvalItem)
if err != nil {
errMutex.Lock()
Expand All @@ -203,6 +284,8 @@ func updateApprovalsTable(signature *signatures.ItemSignature) error {
log.WithFields(f).Warnf("Error adding approval item : %v", err)
return
}

counter(listType)
}
}

Expand All @@ -226,6 +309,9 @@ func updateApprovalsTable(signature *signatures.ItemSignature) error {

wg.Wait()

log.WithFields(f).Debugf("processed for company : %s, project : %s", signature.SignatureReferenceName, signature.SignatureProjectID)
log.WithFields(f).Debugf("Email domain success count : %d, Email success count : %d, Github org success count : %d, Github username success count : %d, Gitlab org success count : %d, Gitlab username success count : %d", emailDomainSuccessCount, emailSuccessCount, githubOrgSuccessCount, githubUsernameSuccessCount, gitlabOrgSuccessCount, gitlabUsernameSuccessCount)

return err
}

Expand Down
60 changes: 57 additions & 3 deletions cla-backend-go/events/repository.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,10 +4,12 @@
package events

import (
"crypto/rand"
"encoding/base64"
"encoding/json"
"errors"
"fmt"
"math/big"
"strconv"
"strings"
"time"
Expand All @@ -27,6 +29,7 @@ import (
"github.com/aws/aws-sdk-go/aws/session"
"github.com/aws/aws-sdk-go/service/dynamodb"

"github.com/aws/aws-sdk-go/aws/awserr"
"github.com/communitybridge/easycla/cla-backend-go/gen/v1/models"
eventOps "github.com/communitybridge/easycla/cla-backend-go/gen/v1/restapi/operations/events"
)
Expand Down Expand Up @@ -249,6 +252,48 @@ func addTimeExpression(keyCond expression.KeyConditionBuilder, params *eventOps.
return keyCond
}

func isProvisionedThroughputExceeded(err error) bool {
f := logrus.Fields{
"functionName": "v1.events.repository.isProvisionedThroughputExceeded",
}

if err == nil {
return false
}
if aerr, ok := err.(awserr.Error); ok && aerr.Code() == "ProvisionedThroughputExceededException" {
log.WithFields(f).WithError(err).Warn("provisioned throughput exceeded")
return true
}

log.WithFields(f).WithError(err).Warn("error checking for provisioned throughput exceeded")
return false
}

func exponentialBackoffSleep(retry int) {
// Base delay
baseDelay := 100

// Mx backoff time
maxBackoff := 20000

// Calculate delay
delay := baseDelay * (1 << retry)

if delay > maxBackoff {
delay = maxBackoff
}

// Add jitter
jitter, err := rand.Int(rand.Reader, big.NewInt(int64(delay)))
if err != nil {
log.Warnf("error generating random number: %v", err)
return
}
totalDelay := time.Duration(int64(delay)+jitter.Int64()) * time.Millisecond

time.Sleep(totalDelay)
}

// GetEvents
func (repo *repository) GetCCLAEvents(claGroupId, companyID, searchTerm, eventType string, pageSize int64) ([]*models.Event, error) {
f := logrus.Fields{
Expand Down Expand Up @@ -290,13 +335,21 @@ func (repo *repository) GetCCLAEvents(claGroupId, companyID, searchTerm, eventTy

var results *dynamodb.QueryOutput

for {
maxRetries := 5

for retry := 0; retry < maxRetries; retry++ {
// Perform the query...
log.WithFields(f).Debugf("retrying query: %d", retry)
var errQuery error
results, errQuery = repo.dynamoDBClient.Query(queryInput)
if errQuery != nil {
log.WithFields(f).WithError(errQuery).Warn("error retrieving events")
return nil, errQuery
if retry == maxRetries || isProvisionedThroughputExceeded(errQuery) {
log.WithFields(f).WithError(errQuery).Warn("error retrieving events")
return nil, errQuery
}
log.WithFields(f).WithError(errQuery).Warn("error retrieving events - retrying...")
exponentialBackoffSleep(retry)
continue
}

// Build the result models
Expand All @@ -313,6 +366,7 @@ func (repo *repository) GetCCLAEvents(claGroupId, companyID, searchTerm, eventTy
log.WithFields(f).Debugf("last evaluated key %+v", results.LastEvaluatedKey)
if len(results.LastEvaluatedKey) > 0 {
queryInput.ExclusiveStartKey = results.LastEvaluatedKey
retry = 0
} else {
break
}
Expand Down
Loading

0 comments on commit 745e0dc

Please sign in to comment.