Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[#3750] Migrate Approval List #4284

Merged
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
150 changes: 118 additions & 32 deletions cla-backend-go/cmd/migrate_approval_list/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,8 +5,10 @@ package main

import (
"context"
"flag"
"fmt"
"os"
"strings"
"sync"
"time"

Expand Down Expand Up @@ -55,11 +57,25 @@ type combinedRepo struct {
projects_cla_groups.Repository
}

var emailDomainSuccessCount int
var emailSuccessCount int
var githubOrgSuccessCount int
var githubUsernameSuccessCount int
var gitlabOrgSuccessCount int
var gitlabUsernameSuccessCount int

// var emailFailureCount int
// var githubOrgFailureCount int
// var githubUsernameFailureCount int
// var gitlabOrgFailureCount int
// var gitlabUsernameFailureCount int

func init() {
stage = os.Getenv("STAGE")
if stage == "" {
log.Fatal("stage not set")
}

log.Infof("STAGE set to %s\n", stage)
approvalsTableName = fmt.Sprintf("cla-%s-approvals", stage)
approvalRepo = approvals.NewRepository(stage, awsSession, approvalsTableName)
Expand Down Expand Up @@ -91,11 +107,27 @@ func main() {
log.Info("Fetching ccla signatures")
signed := true
approved := true
// signatureID := flag.String("signature-id", "ALL", "signature ID to migrate")
delete := flag.Bool("delete", false, "delete approval items")
flag.Parse()

if *delete {
log.Info("Deleting approval items")
err := approvalRepo.DeleteAll()
if err != nil {
log.WithFields(f).WithError(err).Error("error deleting approval items")
return
}
log.Info("Deleted all approval items")
return
}

log.Info("Fetching all ccla signatures...")
cclaSignatures, err := signatureRepo.GetCCLASignatures(context.Background(), &signed, &approved)
if err != nil {
log.Fatalf("Error fetching ccla signatures : %v", err)
}
log.Info("Fetched ccla signatures")
log.Infof("Fetched %d ccla signatures", len(cclaSignatures))
eventFound = 0
eventNotFound = 0
recordExists = 0
Expand All @@ -106,6 +138,7 @@ func main() {
wg.Add(1)
go func(signature *signatures.ItemSignature) {
defer wg.Done()
log.WithFields(f).Debugf("Processing company : %s, project : %s", signature.SignatureReferenceName, signature.SignatureProjectID)
err := updateApprovalsTable(signature)
if err != nil {
log.WithFields(f).Warnf("Error updating approvals table for signature : %s, error: %v", signature.SignatureID, err)
Expand All @@ -114,6 +147,39 @@ func main() {
}
wg.Wait()
log.WithFields(f).Debugf("Events found : %d, Events not found : %d , existing Record count: %d", eventFound, eventNotFound, recordExists)



}

func counter(listType string) {
switch listType {
case utils.DomainApprovalCriteria:
emailDomainSuccessCount++
case utils.EmailApprovalCriteria:
emailSuccessCount++
case utils.GithubOrgApprovalCriteria:
githubOrgSuccessCount++
case utils.GithubUsernameApprovalCriteria:
githubUsernameSuccessCount++
case utils.GitlabOrgApprovalCriteria:
gitlabOrgSuccessCount++
case utils.GitlabUsernameApprovalCriteria:
gitlabUsernameSuccessCount++
}
}

func approvalExists(approvalItems []approvals.ApprovalItem, listType, item, projectID, signatureID string) bool {
if len(approvalItems) == 0 {
return false
}
for _, approval := range approvalItems {
if approval.ApprovalCriteria == listType && approval.ApprovalName == item && approval.ProjectID == projectID && approval.SignatureID == signatureID {
return true
}
}

return false
}

func updateApprovalsTable(signature *signatures.ItemSignature) error {
Expand All @@ -126,38 +192,62 @@ func updateApprovalsTable(signature *signatures.ItemSignature) error {
var errMutex sync.Mutex
var err error

approvalItems, err := approvalRepo.GetApprovalListBySignature(signature.SignatureID)
if err != nil {
log.WithFields(f).Warnf("Error fetching approval list items for signature : %s, error: %v", signature.SignatureID, err)
return err
}

log.WithFields(f).Debugf("Fetched %d approval list items for signature: %s", len(approvalItems), signature.SignatureID)

company, err := companyRepo.GetCompany(context.Background(), signature.SignatureReferenceID)

if err != nil {
log.WithFields(f).Warnf("Error fetching company : %s, error: %v", signature.SignatureReferenceID, err)
return err
}

if company == nil {
log.WithFields(f).Warnf("Company not found for : %s", signature.SignatureReferenceID)
return fmt.Errorf("company not found for : %s", signature.SignatureReferenceID)
}

searchTerm := "was added to the approval list"

// Get Company Project list
companyEvents, err := eventsRepo.GetCompanyClaGroupEvents(signature.SignatureProjectID, company.CompanyExternalID, nil, nil, &searchTerm, true)

if err != nil {
log.WithFields(f).Warnf("Error fetching company events : %s, error: %v", signature.SignatureProjectID, err)
return err
}

log.WithFields(f).Debugf("Fetched %d company events for project : %s and company: %s", len(companyEvents.Events), signature.SignatureProjectID, company.CompanyName)

update := func(approvalList []string, listType string) {
defer wg.Done()
for _, item := range approvalList {
searchTerm := fmt.Sprintf("%s was added to the approval list", item)
pageSize := int64(1000)
eventType := events.ClaApprovalListUpdated

// check if approval item already exists
approvalItems, searchErr := approvalRepo.SearchApprovalList(listType, item, signature.SignatureProjectID, "", signature.SignatureID)
if err != nil {
errMutex.Lock()
err = searchErr
errMutex.Unlock()
log.WithFields(f).Warnf("Error searching approval list for item : %s, error: %v", item, err)
return
}

if len(approvalItems) > 0 {
if approvalExists(approvalItems, listType, item, signature.SignatureProjectID, signature.SignatureID) {
log.WithFields(f).Debugf("Approval item already exists for : %s, %s", listType, item)
recordExists++
return
}

log.WithFields(f).Debugf("searching for events with search term : %s, projectID: %s, eventType: %s ", searchTerm, signature.SignatureProjectID, eventType)

events, eventErr := eventsRepo.GetCCLAEvents(signature.SignatureProjectID, signature.SignatureReferenceID, searchTerm, eventType, pageSize)
dateAdded := signature.DateModified

if eventErr != nil {
errMutex.Lock()
err = eventErr
errMutex.Unlock()
return
// search events for the item
for _, event := range companyEvents.Events {
if event.EventType == eventType && event.EventCompanyID == company.CompanyID && event.EventCLAGroupID == signature.SignatureProjectID && strings.Contains(strings.ToLower(event.EventData), strings.ToLower(searchTerm)) {
log.WithFields(f).Debugf("found event with id: %s, event : %+v ", event.EventID, event)
dateAdded = event.EventTime
eventFound++
break
}
}

approvalID, approvalErr := uuid.NewV4()
Expand All @@ -169,6 +259,7 @@ func updateApprovalsTable(signature *signatures.ItemSignature) error {
return
}
currentTime := time.Now().UTC().String()
note := fmt.Sprintf("Approval item added by migration script on %s", currentTime)
approvalItem := approvals.ApprovalItem{
ApprovalID: approvalID.String(),
SignatureID: signature.SignatureID,
Expand All @@ -179,22 +270,12 @@ func updateApprovalsTable(signature *signatures.ItemSignature) error {
CompanyID: signature.SignatureReferenceID,
ProjectID: signature.SignatureProjectID,
ApprovalCompanyName: signature.SignatureReferenceName,
DateAdded: dateAdded,
Note: note,
Active: true,
}

log.WithFields(f).Debugf("Adding approval item : %+v", approvalItem)

if len(events) > 0 {
event := getLatestEvent(events)
approvalItem.DateAdded = event.EventTime
log.WithFields(f).Debugf("found event with id: %s , approval: %+v ", event.EventID, approvalItem)
eventFound++
} else {
log.WithFields(f).Debugf("no events found for %s: %s", listType, item)
approvalItem.DateAdded = signature.DateModified
eventNotFound++
}

log.WithFields(f).Debugf("adding approval item : %+v", approvalItem)
approvalErr = approvalRepo.AddApprovalList(approvalItem)
if err != nil {
errMutex.Lock()
Expand All @@ -203,6 +284,8 @@ func updateApprovalsTable(signature *signatures.ItemSignature) error {
log.WithFields(f).Warnf("Error adding approval item : %v", err)
return
}

counter(listType)
}
}

Expand All @@ -226,6 +309,9 @@ func updateApprovalsTable(signature *signatures.ItemSignature) error {

wg.Wait()

log.WithFields(f).Debugf("processed for company : %s, project : %s", signature.SignatureReferenceName, signature.SignatureProjectID)
log.WithFields(f).Debugf("Email domain success count : %d, Email success count : %d, Github org success count : %d, Github username success count : %d, Gitlab org success count : %d, Gitlab username success count : %d", emailDomainSuccessCount, emailSuccessCount, githubOrgSuccessCount, githubUsernameSuccessCount, gitlabOrgSuccessCount, gitlabUsernameSuccessCount)

return err
}

Expand Down
60 changes: 57 additions & 3 deletions cla-backend-go/events/repository.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,10 +4,12 @@
package events

import (
"crypto/rand"
"encoding/base64"
"encoding/json"
"errors"
"fmt"
"math/big"
"strconv"
"strings"
"time"
Expand All @@ -27,6 +29,7 @@ import (
"github.com/aws/aws-sdk-go/aws/session"
"github.com/aws/aws-sdk-go/service/dynamodb"

"github.com/aws/aws-sdk-go/aws/awserr"
"github.com/communitybridge/easycla/cla-backend-go/gen/v1/models"
eventOps "github.com/communitybridge/easycla/cla-backend-go/gen/v1/restapi/operations/events"
)
Expand Down Expand Up @@ -249,6 +252,48 @@ func addTimeExpression(keyCond expression.KeyConditionBuilder, params *eventOps.
return keyCond
}

func isProvisionedThroughputExceeded(err error) bool {
f := logrus.Fields{
"functionName": "v1.events.repository.isProvisionedThroughputExceeded",
}

if err == nil {
return false
}
if aerr, ok := err.(awserr.Error); ok && aerr.Code() == "ProvisionedThroughputExceededException" {
log.WithFields(f).WithError(err).Warn("provisioned throughput exceeded")
return true
}

log.WithFields(f).WithError(err).Warn("error checking for provisioned throughput exceeded")
return false
}

func exponentialBackoffSleep(retry int) {
// Base delay
baseDelay := 100

// Mx backoff time
maxBackoff := 20000

// Calculate delay
delay := baseDelay * (1 << retry)

if delay > maxBackoff {
delay = maxBackoff
}

// Add jitter
jitter, err := rand.Int(rand.Reader, big.NewInt(int64(delay)))
if err != nil {
log.Warnf("error generating random number: %v", err)
return
}
totalDelay := time.Duration(int64(delay)+jitter.Int64()) * time.Millisecond

time.Sleep(totalDelay)
}

// GetEvents
func (repo *repository) GetCCLAEvents(claGroupId, companyID, searchTerm, eventType string, pageSize int64) ([]*models.Event, error) {
f := logrus.Fields{
Expand Down Expand Up @@ -290,13 +335,21 @@ func (repo *repository) GetCCLAEvents(claGroupId, companyID, searchTerm, eventTy

var results *dynamodb.QueryOutput

for {
maxRetries := 5

for retry := 0; retry < maxRetries; retry++ {
// Perform the query...
log.WithFields(f).Debugf("retrying query: %d", retry)
var errQuery error
results, errQuery = repo.dynamoDBClient.Query(queryInput)
if errQuery != nil {
log.WithFields(f).WithError(errQuery).Warn("error retrieving events")
return nil, errQuery
if retry == maxRetries || isProvisionedThroughputExceeded(errQuery) {
log.WithFields(f).WithError(errQuery).Warn("error retrieving events")
return nil, errQuery
}
log.WithFields(f).WithError(errQuery).Warn("error retrieving events - retrying...")
exponentialBackoffSleep(retry)
continue
}

// Build the result models
Expand All @@ -313,6 +366,7 @@ func (repo *repository) GetCCLAEvents(claGroupId, companyID, searchTerm, eventTy
log.WithFields(f).Debugf("last evaluated key %+v", results.LastEvaluatedKey)
if len(results.LastEvaluatedKey) > 0 {
queryInput.ExclusiveStartKey = results.LastEvaluatedKey
retry = 0
} else {
break
}
Expand Down
Loading
Loading