Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[Heartbeat] Adds maintenance windows !! #41508

Merged
merged 49 commits into from
Feb 18, 2025
Merged
Show file tree
Hide file tree
Changes from 13 commits
Commits
Show all changes
49 commits
Select commit Hold shift + click to select a range
6824d0d
Add maint windows
andrewvc Jan 19, 2023
4593430
Initial maint win MVP
andrewvc Jan 19, 2023
b8a4d3c
Merge branch 'main' into maint-win
shahzad31 Nov 2, 2024
e75ec1d
added maintenance windows
shahzad31 Nov 4, 2024
09b0587
destroy cluster
shahzad31 Nov 4, 2024
1a722aa
format
shahzad31 Nov 4, 2024
82e3faf
revert file
shahzad31 Nov 4, 2024
3d3d14f
revert file
shahzad31 Nov 4, 2024
b16af0d
added lib
shahzad31 Nov 4, 2024
f3db602
revert
shahzad31 Nov 6, 2024
87410e9
Merge branch 'main' of https://github.com/elastic/beats into maintena…
shahzad31 Nov 6, 2024
2693d6a
space
shahzad31 Nov 6, 2024
7ae80b7
add license
shahzad31 Nov 6, 2024
b7b9f29
update
shahzad31 Nov 14, 2024
3a792e1
Merge remote-tracking branch 'upstream/main' into maintenance-windows
shahzad31 Nov 14, 2024
3767903
update
shahzad31 Nov 14, 2024
aa34408
fix tests
shahzad31 Nov 14, 2024
2a6b6e0
lint
shahzad31 Nov 21, 2024
de4df2b
skip run once
shahzad31 Nov 21, 2024
31a04f4
Fix linting
emilioalvap Feb 6, 2025
9641f53
Merge branch 'main' into maintenance-windows
shahzad31 Feb 6, 2025
c39784b
Remove unused fields
emilioalvap Feb 6, 2025
2998d8c
Merge branch 'main' into maintenance-windows
emilioalvap Feb 6, 2025
4160a74
Make update
emilioalvap Feb 6, 2025
7a3493d
add more test cases
shahzad31 Feb 7, 2025
474de92
utc
shahzad31 Feb 7, 2025
8027954
lint
shahzad31 Feb 10, 2025
e086be3
handle errors
shahzad31 Feb 13, 2025
62decb4
bit of refactor
shahzad31 Feb 13, 2025
7d17a2d
emilio suggestion
shahzad31 Feb 13, 2025
3de2b53
More PR feedback
shahzad31 Feb 13, 2025
9ba538e
lint
shahzad31 Feb 13, 2025
f581a44
add validation for map
shahzad31 Feb 13, 2025
e0e318d
nil condition
shahzad31 Feb 13, 2025
e34daec
set count to default
shahzad31 Feb 13, 2025
4da6fc1
add validation for only daily
shahzad31 Feb 13, 2025
3551154
add dt start validation
shahzad31 Feb 13, 2025
5f32cb6
revrt
shahzad31 Feb 13, 2025
88f3f3d
revrt
shahzad31 Feb 13, 2025
954b78d
revert
shahzad31 Feb 13, 2025
734e85b
revert
shahzad31 Feb 13, 2025
e9c2e61
format
shahzad31 Feb 13, 2025
ad3ec73
PR feedback
shahzad31 Feb 14, 2025
7b13c96
add docs
shahzad31 Feb 17, 2025
f0f96b3
Update heartbeat/monitors/maintwin/maintwin.go
shahzad31 Feb 17, 2025
f28455a
Update heartbeat/monitors/maintwin/maintwin.go
shahzad31 Feb 17, 2025
897f3e7
fomat
shahzad31 Feb 17, 2025
a80dc6e
Merge branch 'main' of https://github.com/elastic/beats into maintena…
shahzad31 Feb 17, 2025
364bc72
change log
shahzad31 Feb 17, 2025
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -368,6 +368,7 @@ require (
github.com/sirupsen/logrus v1.9.3 // indirect
github.com/stoewer/go-strcase v1.2.0 // indirect
github.com/stretchr/objx v0.5.2 // indirect
github.com/teambition/rrule-go v1.8.2 // indirect
github.com/tklauser/numcpus v0.4.0 // indirect
github.com/vishvananda/netlink v1.2.1-beta.2 // indirect
github.com/xdg-go/pbkdf2 v1.0.0 // indirect
Expand Down
2 changes: 2 additions & 0 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -872,6 +872,8 @@ github.com/stretchr/testify v1.8.1/go.mod h1:w2LPCIKwWwSfY2zedu0+kehJoqGctiVI29o
github.com/stretchr/testify v1.8.2/go.mod h1:w2LPCIKwWwSfY2zedu0+kehJoqGctiVI29o6fzry7u4=
github.com/stretchr/testify v1.9.0 h1:HtqpIVDClZ4nwg75+f6Lvsy/wHu+3BoSGCbBAcpTsTg=
github.com/stretchr/testify v1.9.0/go.mod h1:r2ic/lqez/lEtzL7wO/rwa5dbSLXVDPFyf8C91i36aY=
github.com/teambition/rrule-go v1.8.2 h1:lIjpjvWTj9fFUZCmuoVDrKVOtdiyzbzc93qTmRVe/J8=
github.com/teambition/rrule-go v1.8.2/go.mod h1:Ieq5AbrKGciP1V//Wq8ktsTXwSwJHDD5mD/wLBGl3p4=
github.com/tklauser/go-sysconf v0.3.10 h1:IJ1AZGZRWbY8T5Vfk04D9WOA5WSejdflXxP03OUqALw=
github.com/tklauser/go-sysconf v0.3.10/go.mod h1:C8XykCvCb+Gn0oNCWPIlcb0RuglQTYaQ2hGm7jmxEFk=
github.com/tklauser/numcpus v0.4.0 h1:E53Dm1HjH1/R2/aoCtXtPgzmElmn51aOkhCFSuZq//o=
Expand Down
2 changes: 1 addition & 1 deletion heartbeat/heartbeat.yml
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,7 @@ heartbeat.monitors:
#timeout: 16s
# Name of corresponding APM service, if Elastic APM is in use for the monitored service.
#service.name: my-apm-service-name

# Experimental: Set this to true to run heartbeat monitors exactly once at startup
#heartbeat.run_once: true

Expand Down
99 changes: 99 additions & 0 deletions heartbeat/monitors/maintwin/maintwin.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,99 @@
// Licensed to Elasticsearch B.V. under one or more contributor
// license agreements. See the NOTICE file distributed with
// this work for additional information regarding copyright
// ownership. Elasticsearch B.V. licenses this file to you under
// the Apache License, Version 2.0 (the "License"); you may
// not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.

package maintwin

import (
"time"

"github.com/teambition/rrule-go"
)

var weekdayLookup = map[string]rrule.Weekday{
"MO": rrule.MO, "TU": rrule.TU, "WE": rrule.WE, "TH": rrule.TH, "FR": rrule.FR, "SA": rrule.SA, "SU": rrule.SU,
}

type MaintWin struct {
Freq int `config:"freq" validate:"required"`
Dtstart string `config:"dtstart" validate:"required"`
Interval int `config:"interval" validate:"required"`
Duration time.Duration `config:"duration" validate:"required"`
Wkst rrule.Weekday `config:"wkst"`
Count int `config:"count"`
Bysetpos []int `config:"bysetpos"`
Bymonth []int `config:"bymonth"`
Bymonthday []int `config:"bymonthday"`
Byyearday []int `config:"byyearday"`
Byweekno []int `config:"byweekno"`
Byweekday []string `config:"byweekday"`
Byhour []int `config:"byhour"`
Byminute []int `config:"byminute"`
Bysecond []int `config:"bysecond"`
Byeaster []int `config:"byeaster"`
}

func (mw *MaintWin) Parse() (r *rrule.RRule, err error) {

dtstart, _ := time.Parse(time.RFC3339, mw.Dtstart)

// Convert the string weekdays to rrule.Weekday
weekdays := []rrule.Weekday{}
for _, wd := range mw.Byweekday {
weekdays = append(weekdays, weekdayLookup[wd])
}

r, _ = rrule.NewRRule(rrule.ROption{
Freq: rrule.Frequency(mw.Freq),
Count: mw.Count,
Dtstart: dtstart,
Interval: int(mw.Interval),
Until: dtstart.Add(mw.Duration),
Byweekday: weekdays,
Byhour: mw.Byhour,
Byminute: mw.Byminute,
Bysecond: mw.Bysecond,
Byeaster: mw.Byeaster,
Bysetpos: mw.Bysetpos,
Bymonth: mw.Bymonth,
Byweekno: mw.Byweekno,
Byyearday: mw.Byyearday,
Bymonthday: mw.Bymonthday,
Wkst: mw.Wkst,
})

return r, nil
}

type ParsedMaintWin struct {
Rules []*rrule.RRule
}

func (pmw ParsedMaintWin) IsActive(tOrig time.Time) bool {
matched := false
for _, r := range pmw.Rules {
occurrences := r.All()

for _, occ := range occurrences {
if tOrig.Equal(occ) || tOrig.After(occ) && tOrig.Before(r.GetUntil()) {
matched = true
break
}
}
}

return matched
}
84 changes: 84 additions & 0 deletions heartbeat/monitors/maintwin/maintwin_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,84 @@
// Licensed to Elasticsearch B.V. under one or more contributor
// license agreements. See the NOTICE file distributed with
// this work for additional information regarding copyright
// ownership. Elasticsearch B.V. licenses this file to you under
// the Apache License, Version 2.0 (the "License"); you may
// not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.

package maintwin

import (
"fmt"
"testing"
"time"

"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
"github.com/teambition/rrule-go"

)

func TestMaintWin(t *testing.T) {
cases := []struct {
name string
mw MaintWin
positiveMatches []string
negativeMatches []string
}{
{
"Every sunday at midnight to 1 AM",
MaintWin{
Freq: 3,
Dtstart: time.Now().Format(time.RFC3339),
Duration: mustParseDuration("2h"),
Byweekday: []string{"SU", "MO", "TU", "WE", "TH", "FR", "SA"},
Count: 10,
},
// add 30 minutes, 1 hour, 1 hour 30 minutes to the start time
[]string{time.Now().Add(30 * time.Minute).Format(time.RFC3339), time.Now().Add(60 * time.Minute).Format(time.RFC3339), time.Now().Add(90 * time.Minute).Format(time.RFC3339)},
[]string{time.Now().Add(180 * time.Minute).Format(time.RFC3339), time.Now().Add(540 * time.Minute).Format(time.RFC3339)},
},
}

for _, c := range cases {
t.Run(c.name, func(t *testing.T) {
rules := []*rrule.RRule{}
r, err := c.mw.Parse()
require.NoError(t, err)
rules = append(rules, r)
pmw := ParsedMaintWin{Rules: rules}
for _, m := range c.positiveMatches {
t.Run(fmt.Sprintf("does match %s", m), func(t *testing.T) {
pt, err := time.Parse(time.RFC3339, m)
require.NoError(t, err)
assert.True(t, pmw.IsActive(pt))
})
}
for _, m := range c.negativeMatches {
t.Run(fmt.Sprintf("does not match %s", m), func(t *testing.T) {
pt, err := time.Parse(time.RFC3339, m)
require.NoError(t, err)
assert.False(t, pmw.IsActive(pt))
})
}
})
}
}

func mustParseDuration(s string) time.Duration {
d, err := time.ParseDuration(s)
if err != nil {
panic(fmt.Sprintf("could not parse duration %s: %s", s, err))
}
return d
}
16 changes: 9 additions & 7 deletions heartbeat/monitors/monitor.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ import (
"fmt"
"sync"

"github.com/elastic/beats/v7/heartbeat/monitors/maintwin"
"github.com/elastic/beats/v7/heartbeat/monitors/wrappers/monitorstate"

"github.com/mitchellh/hashstructure"
Expand Down Expand Up @@ -49,13 +50,14 @@ const (
// Monitor represents a configured recurring monitoring configuredJob loaded from a config file. Starting it
// will cause it to run with the given scheduler until Stop() is called.
type Monitor struct {
stdFields stdfields.StdMonitorFields
pluginName string
config *conf.C
addTask scheduler.AddTask
configuredJobs []*configuredJob
enabled bool
state int
stdFields stdfields.StdMonitorFields
parsedMaintWindow maintwin.ParsedMaintWin
pluginName string
config *conf.C
addTask scheduler.AddTask
configuredJobs []*configuredJob
enabled bool
state int
// endpoints is a count of endpoints this monitor measures.
endpoints int
// internalsMtx is used to synchronize access to critical
Expand Down
31 changes: 22 additions & 9 deletions heartbeat/monitors/stdfields/stdfields.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,8 +22,10 @@ import (
"time"

hbconfig "github.com/elastic/beats/v7/heartbeat/config"
"github.com/elastic/beats/v7/heartbeat/monitors/maintwin"
"github.com/elastic/beats/v7/heartbeat/scheduler/schedule"
"github.com/elastic/elastic-agent-libs/config"
"github.com/teambition/rrule-go"
)

type ServiceFields struct {
Expand All @@ -32,15 +34,17 @@ type ServiceFields struct {

// StdMonitorFields represents the generic configuration options around a monitor plugin.
type StdMonitorFields struct {
ID string `config:"id"`
Name string `config:"name"`
Type string `config:"type" validate:"required"`
Schedule *schedule.Schedule `config:"schedule" validate:"required"`
Timeout time.Duration `config:"timeout"`
Service ServiceFields `config:"service"`
Origin string `config:"origin"`
LegacyServiceName string `config:"service_name"`
MaxAttempts uint16 `config:"max_attempts"`
ID string `config:"id"`
Name string `config:"name"`
Type string `config:"type" validate:"required"`
Schedule *schedule.Schedule `config:"schedule" validate:"required"`
MaintenanceWindows []maintwin.MaintWin `config:"maintenance_windows" `
ParsedMaintenanceWindow maintwin.ParsedMaintWin
Timeout time.Duration `config:"timeout"`
Service ServiceFields `config:"service"`
Origin string `config:"origin"`
LegacyServiceName string `config:"service_name"`
MaxAttempts uint16 `config:"max_attempts"`
// Used by zip_url and local monitors
// kibana originating monitors only run one journey at a time
// and just use the `fields` syntax / manually set monitor IDs
Expand Down Expand Up @@ -76,6 +80,15 @@ func ConfigToStdMonitorFields(conf *config.C) (StdMonitorFields, error) {
if sFields.Source.Local != nil || sFields.Source.ZipUrl != nil {
sFields.IsLegacyBrowserSource = true
}
rules := []*rrule.RRule{}
for _, mw := range sFields.MaintenanceWindows {
parsed, err := mw.Parse()
if err != nil {
return StdMonitorFields{}, fmt.Errorf("could not parse maintenance window for monitor (id:%s name:%s): %w", sFields.ID, sFields.Name, err)
}
rules = append(rules, parsed)
}
sFields.ParsedMaintenanceWindow = maintwin.ParsedMaintWin{Rules: rules}

return sFields, nil
}
2 changes: 1 addition & 1 deletion heartbeat/monitors/task.go
Original file line number Diff line number Diff line change
Expand Up @@ -84,7 +84,7 @@ func (t *configuredJob) Start(pubClient beat.Client) {
return
}

t.cancelFn, err = t.monitor.addTask(t.config.Schedule, t.monitor.stdFields.ID, t.makeSchedulerTaskFunc(), t.config.Type)
t.cancelFn, err = t.monitor.addTask(t.config.Schedule, t.monitor.stdFields.ParsedMaintenanceWindow, t.monitor.stdFields.ID, t.makeSchedulerTaskFunc(), t.config.Type)
if err != nil {
logp.L().Infof("could not start monitor: %v", err)
}
Expand Down
23 changes: 19 additions & 4 deletions heartbeat/scheduler/scheduler.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@ import (
"golang.org/x/sync/semaphore"

"github.com/elastic/beats/v7/heartbeat/config"
"github.com/elastic/beats/v7/heartbeat/monitors/maintwin"
"github.com/elastic/beats/v7/heartbeat/scheduler/timerqueue"
"github.com/elastic/elastic-agent-libs/logp"
"github.com/elastic/elastic-agent-libs/monitoring"
Expand Down Expand Up @@ -161,11 +162,11 @@ func (s *Scheduler) WaitForRunOnce() {
// has already stopped.
var ErrAlreadyStopped = errors.New("attempted to add job to already stopped scheduler")

type AddTask func(sched Schedule, id string, entrypoint TaskFunc, jobType string) (removeFn context.CancelFunc, err error)
type AddTask func(sched Schedule, pmw maintwin.ParsedMaintWin, id string, entrypoint TaskFunc, jobType string) (removeFn context.CancelFunc, err error)

// Add adds the given TaskFunc to the current scheduler. Will return an error if the scheduler
// is done.
func (s *Scheduler) Add(sched Schedule, id string, entrypoint TaskFunc, jobType string) (removeFn context.CancelFunc, err error) {
func (s *Scheduler) Add(sched Schedule, pmw maintwin.ParsedMaintWin, id string, entrypoint TaskFunc, jobType string) (removeFn context.CancelFunc, err error) {
if errors.Is(s.ctx.Err(), context.Canceled) {
return nil, ErrAlreadyStopped
}
Expand All @@ -178,7 +179,7 @@ func (s *Scheduler) Add(sched Schedule, id string, entrypoint TaskFunc, jobType

var taskFn timerqueue.TimerTaskFn

taskFn = func(_ time.Time) {
taskFn = func(now time.Time) {
select {
case <-jobCtx.Done():
debugf("Job '%v' canceled", id)
Expand All @@ -189,10 +190,24 @@ func (s *Scheduler) Add(sched Schedule, id string, entrypoint TaskFunc, jobType
debugf("Job '%s' started", id)
sj := newSchedJob(jobCtx, s, id, jobType, entrypoint)

lastRanAt := sj.run()
inMaintWin := false
if pmw.IsActive(now) {
inMaintWin = true
}

var lastRanAt time.Time
if !inMaintWin {
lastRanAt = sj.run()
} else {
logp.L().Infof("Job '%s' is in maintenance window, skipping", id)
lastRanAt = now
}
s.stats.activeJobs.Dec()

if s.runOnce {
if !inMaintWin {
// waitForPublish()
}
s.runOnceWg.Done()
} else {
// Schedule the next run
Expand Down
Loading
Loading