Skip to content

Commit

Permalink
Merge branch 'main' into chore/database/housekeeping
Browse files Browse the repository at this point in the history
  • Loading branch information
jbrockopp authored Jul 12, 2023
2 parents e75947d + 8a25e31 commit f3970d2
Show file tree
Hide file tree
Showing 23 changed files with 378 additions and 117 deletions.
29 changes: 28 additions & 1 deletion api/build/list_org.go
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,33 @@ import (
// required: true
// type: string
// - in: query
// name: event
// description: Filter by build event
// type: string
// enum:
// - comment
// - deployment
// - pull_request
// - push
// - schedule
// - tag
// - in: query
// name: branch
// description: Filter builds by branch
// type: string
// - in: query
// name: status
// description: Filter by build status
// type: string
// enum:
// - canceled
// - error
// - failure
// - killed
// - pending
// - running
// - success
// - in: query
// name: page
// description: The page of results to retrieve
// type: integer
Expand Down Expand Up @@ -109,7 +136,7 @@ func ListBuildsForOrg(c *gin.Context) {
// verify the event provided is a valid event type
if event != constants.EventComment && event != constants.EventDeploy &&
event != constants.EventPush && event != constants.EventPull &&
event != constants.EventTag {
event != constants.EventTag && event != constants.EventSchedule {
retErr := fmt.Errorf("unable to process event %s: invalid event type provided", event)

util.HandleError(c, http.StatusBadRequest, retErr)
Expand Down
9 changes: 5 additions & 4 deletions api/build/list_repo.go
Original file line number Diff line number Diff line change
Expand Up @@ -45,11 +45,12 @@ import (
// description: Filter by build event
// type: string
// enum:
// - push
// - comment
// - deployment
// - pull_request
// - push
// - schedule
// - tag
// - deployment
// - comment
// - in: query
// name: commit
// description: Filter builds based on the commit hash
Expand Down Expand Up @@ -159,7 +160,7 @@ func ListBuildsForRepo(c *gin.Context) {
// verify the event provided is a valid event type
if event != constants.EventComment && event != constants.EventDeploy &&
event != constants.EventPush && event != constants.EventPull &&
event != constants.EventTag {
event != constants.EventTag && event != constants.EventSchedule {
retErr := fmt.Errorf("unable to process event %s: invalid event type provided", event)

util.HandleError(c, http.StatusBadRequest, retErr)
Expand Down
38 changes: 24 additions & 14 deletions api/schedule/create.go
Original file line number Diff line number Diff line change
Expand Up @@ -170,7 +170,7 @@ func CreateSchedule(c *gin.Context) {
dbSchedule.SetActive(true)

// send API call to update the schedule
err = database.FromContext(c).UpdateSchedule(dbSchedule)
err = database.FromContext(c).UpdateSchedule(dbSchedule, true)
if err != nil {
retErr := fmt.Errorf("unable to set schedule %s to active: %w", dbSchedule.GetName(), err)

Expand Down Expand Up @@ -209,21 +209,31 @@ func validateEntry(minimum time.Duration, entry string) error {
return fmt.Errorf("invalid entry of %s", entry)
}

// check the previous occurrence of the entry
prevTime, err := gronx.PrevTick(entry, true)
if err != nil {
return err
}
// iterate 5 times through ticks in an effort to catch scalene entries
tickForward := 5

// check the next occurrence of the entry
nextTime, err := gronx.NextTick(entry, true)
if err != nil {
return err
}
// start with now
t := time.Now().UTC()

for i := 0; i < tickForward; i++ {
// check the previous occurrence of the entry
prevTime, err := gronx.PrevTickBefore(entry, t, true)
if err != nil {
return err
}

// check the next occurrence of the entry
nextTime, err := gronx.NextTickAfter(entry, t, false)
if err != nil {
return err
}

// ensure the time between previous and next schedule exceeds the minimum duration
if nextTime.Sub(prevTime) < minimum {
return fmt.Errorf("entry needs to occur less frequently than every %s", minimum)
}

// ensure the time between previous and next schedule exceeds the minimum duration
if nextTime.Sub(prevTime) < minimum {
return fmt.Errorf("entry needs to occur less frequently than every %s", minimum)
t = nextTime
}

return nil
Expand Down
16 changes: 16 additions & 0 deletions api/schedule/create_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,14 @@ func Test_validateEntry(t *testing.T) {
},
wantErr: true,
},
{
name: "exceeds minimum frequency with scalene entry pattern",
args: args{
minimum: 30 * time.Minute,
entry: "1,2,45 * * * *",
},
wantErr: true,
},
{
name: "meets minimum frequency",
args: args{
Expand All @@ -51,6 +59,14 @@ func Test_validateEntry(t *testing.T) {
},
wantErr: false,
},
{
name: "meets minimum frequency with comma entry pattern",
args: args{
minimum: 15 * time.Minute,
entry: "0,15,30,45 * * * *",
},
wantErr: false,
},
}
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
Expand Down
7 changes: 6 additions & 1 deletion api/schedule/update.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@ import (
"github.com/go-vela/server/database"
"github.com/go-vela/server/router/middleware/repo"
"github.com/go-vela/server/router/middleware/schedule"
"github.com/go-vela/server/router/middleware/user"
"github.com/go-vela/server/util"
"github.com/go-vela/types/library"
"github.com/sirupsen/logrus"
Expand Down Expand Up @@ -73,6 +74,7 @@ func UpdateSchedule(c *gin.Context) {
// capture middleware values
r := repo.Retrieve(c)
s := schedule.Retrieve(c)
u := user.Retrieve(c)
scheduleName := util.PathParameter(c, "schedule")
minimumFrequency := c.Value("scheduleminimumfrequency").(time.Duration)

Expand Down Expand Up @@ -122,8 +124,11 @@ func UpdateSchedule(c *gin.Context) {
s.SetEntry(input.GetEntry())
}

// set the updated by field using claims
s.SetUpdatedBy(u.GetName())

// update the schedule within the database
err = database.FromContext(c).UpdateSchedule(s)
err = database.FromContext(c).UpdateSchedule(s, true)
if err != nil {
retErr := fmt.Errorf("unable to update scheduled %s: %w", scheduleName, err)

Expand Down
8 changes: 7 additions & 1 deletion cmd/vela-server/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -213,9 +213,15 @@ func main() {
&cli.DurationFlag{
EnvVars: []string{"VELA_SCHEDULE_MINIMUM_FREQUENCY", "SCHEDULE_MINIMUM_FREQUENCY"},
Name: "schedule-minimum-frequency",
Usage: "minimum time between each schedule entry",
Usage: "minimum time allowed between each build triggered for a schedule",
Value: 1 * time.Hour,
},
&cli.DurationFlag{
EnvVars: []string{"VELA_SCHEDULE_INTERVAL", "SCHEDULE_INTERVAL"},
Name: "schedule-interval",
Usage: "interval at which schedules will be processed by the server to trigger builds",
Value: 5 * time.Minute,
},
&cli.StringSliceFlag{
EnvVars: []string{"VELA_SCHEDULE_ALLOWLIST"},
Name: "vela-schedule-allowlist",
Expand Down
123 changes: 73 additions & 50 deletions cmd/vela-server/schedule.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,9 +24,13 @@ import (
"k8s.io/apimachinery/pkg/util/wait"
)

const baseErr = "unable to schedule build"
const (
scheduleErr = "unable to trigger build for schedule"

func processSchedules(compiler compiler.Engine, database database.Interface, metadata *types.Metadata, queue queue.Service, scm scm.Service) error {
scheduleWait = "waiting to trigger build for schedule"
)

func processSchedules(start time.Time, compiler compiler.Engine, database database.Interface, metadata *types.Metadata, queue queue.Service, scm scm.Service) error {
logrus.Infof("processing active schedules to create builds")

// send API call to capture the list of active schedules
Expand All @@ -37,59 +41,93 @@ func processSchedules(compiler compiler.Engine, database database.Interface, met

// iterate through the list of active schedules
for _, s := range schedules {
// sleep for 1s - 2s before processing the active schedule
//
// This should prevent multiple servers from processing a schedule at the same time by
// leveraging a base duration along with a standard deviation of randomness a.k.a.
// "jitter". To create the jitter, we use a base duration of 1s with a scale factor of 1.0.
time.Sleep(wait.Jitter(time.Second, 1.0))

// send API call to capture the schedule
//
// This is needed to ensure we are not dealing with a stale schedule since we fetch
// all schedules once and iterate through that list which can take a significant
// amount of time to get to the end of the list.
schedule, err := database.GetSchedule(s.GetID())
if err != nil {
logrus.WithError(err).Warnf("%s for %s", baseErr, schedule.GetName())
logrus.WithError(err).Warnf("%s %s", scheduleErr, schedule.GetName())

continue
}

// create a variable to track if a build should be triggered based off the schedule
trigger := false
// ignore triggering a build if the schedule is no longer active
if !schedule.GetActive() {
logrus.Tracef("skipping to trigger build for inactive schedule %s", schedule.GetName())

// check if a build has already been triggered for the schedule
if schedule.GetScheduledAt() == 0 {
// trigger a build for the schedule since one has not already been scheduled
trigger = true
} else {
// parse the previous occurrence of the entry for the schedule
prevTime, err := gronx.PrevTick(schedule.GetEntry(), true)
if err != nil {
logrus.WithError(err).Warnf("%s for %s", baseErr, schedule.GetName())
continue
}

continue
}
// capture the last time a build was triggered for the schedule in UTC
scheduled := time.Unix(schedule.GetScheduledAt(), 0).UTC()

// parse the next occurrence of the entry for the schedule
nextTime, err := gronx.NextTick(schedule.GetEntry(), true)
if err != nil {
logrus.WithError(err).Warnf("%s for %s", baseErr, schedule.GetName())
// capture the previous occurrence of the entry rounded to the nearest whole interval
//
// i.e. if it's 4:02 on five minute intervals, this will be 4:00
prevTime, err := gronx.PrevTick(schedule.GetEntry(), true)
if err != nil {
logrus.WithError(err).Warnf("%s %s", scheduleErr, schedule.GetName())

continue
}
continue
}

// parse the UNIX timestamp from when the last build was triggered for the schedule
t := time.Unix(schedule.GetScheduledAt(), 0).UTC()
// capture the next occurrence of the entry after the last schedule rounded to the nearest whole interval
//
// i.e. if it's 4:02 on five minute intervals, this will be 4:05
nextTime, err := gronx.NextTickAfter(schedule.GetEntry(), scheduled, true)
if err != nil {
logrus.WithError(err).Warnf("%s %s", scheduleErr, schedule.GetName())

// check if the time since the last triggered build is greater than the entry duration for the schedule
if time.Since(t) > nextTime.Sub(prevTime) {
// trigger a build for the schedule since it has not previously ran
trigger = true
}
continue
}

if trigger && schedule.GetActive() {
err = processSchedule(schedule, compiler, database, metadata, queue, scm)
if err != nil {
logrus.WithError(err).Warnf("%s for %s", baseErr, schedule.GetName())
// check if we should wait to trigger a build for the schedule
//
// The current time must be after the next occurrence of the schedule.
if !time.Now().After(nextTime) {
logrus.Tracef("%s %s: current time not past next occurrence", scheduleWait, schedule.GetName())

continue
}
continue
}

// check if we should wait to trigger a build for the schedule
//
// The previous occurrence of the schedule must be after the starting time of processing schedules.
if !prevTime.After(start) {
logrus.Tracef("%s %s: previous occurence not after starting point", scheduleWait, schedule.GetName())

continue
}

// update the scheduled_at field with the current timestamp
//
// This should help prevent multiple servers from processing a schedule at the same time
// by updating the schedule with a new timestamp to reflect the current state.
schedule.SetScheduledAt(time.Now().UTC().Unix())

// send API call to update schedule for ensuring scheduled_at field is set
err = database.UpdateSchedule(schedule, false)
if err != nil {
logrus.WithError(err).Warnf("%s %s", scheduleErr, schedule.GetName())

continue
}

// process the schedule and trigger a new build
err = processSchedule(schedule, compiler, database, metadata, queue, scm)
if err != nil {
logrus.WithError(err).Warnf("%s %s", scheduleErr, schedule.GetName())

continue
}
}

Expand All @@ -98,13 +136,6 @@ func processSchedules(compiler compiler.Engine, database database.Interface, met

//nolint:funlen // ignore function length and number of statements
func processSchedule(s *library.Schedule, compiler compiler.Engine, database database.Interface, metadata *types.Metadata, queue queue.Service, scm scm.Service) error {
// sleep for 1s - 3s before processing the schedule
//
// This should prevent multiple servers from processing a schedule at the same time by
// leveraging a base duration along with a standard deviation of randomness a.k.a.
// "jitter". To create the jitter, we use a base duration of 1s with a scale factor of 3.0.
time.Sleep(wait.Jitter(time.Second, 3.0))

// send API call to capture the repo for the schedule
r, err := database.GetRepo(s.GetRepoID())
if err != nil {
Expand Down Expand Up @@ -337,8 +368,6 @@ func processSchedule(s *library.Schedule, compiler compiler.Engine, database dat
return err
}

s.SetScheduledAt(time.Now().UTC().Unix())

// break the loop because everything was successful
break
} // end of retry loop
Expand All @@ -349,12 +378,6 @@ func processSchedule(s *library.Schedule, compiler compiler.Engine, database dat
return fmt.Errorf("unable to update repo %s: %w", r.GetFullName(), err)
}

// send API call to update schedule for ensuring scheduled_at field is set
err = database.UpdateSchedule(s)
if err != nil {
return fmt.Errorf("unable to update schedule %s/%s: %w", r.GetFullName(), s.GetName(), err)
}

// send API call to capture the triggered build
b, err = database.GetBuildForRepo(r, b.GetNumber())
if err != nil {
Expand Down
Loading

0 comments on commit f3970d2

Please sign in to comment.