Skip to content

Commit

Permalink
BUG/MEDIUM: reload agent: fix race conditions in the reload agent
Browse files Browse the repository at this point in the history
  • Loading branch information
geodimm authored and mjuraga committed Oct 6, 2021
1 parent 8949d41 commit 67e65eb
Show file tree
Hide file tree
Showing 2 changed files with 111 additions and 32 deletions.
68 changes: 36 additions & 32 deletions haproxy/reload_agent.go
Original file line number Diff line number Diff line change
Expand Up @@ -49,7 +49,6 @@ type reloadCache struct {
index int64
retention int
mu sync.RWMutex
channel chan string
}

type ReloadAgentParams struct {
Expand Down Expand Up @@ -124,46 +123,41 @@ func (ra *ReloadAgent) handleReload(id string) {
ra.cache.mu.Unlock()
}()

response, err := ra.reloadHAProxy()
response, err := ra.reloadHAProxy(id)
if err != nil {
ra.cache.failReload(response)
log.Warning("Reload failed " + err.Error())
log.Warningf("Reload %s failed: %s", id, err)
} else {
ra.cache.succeedReload(response)

d := time.Duration(ra.delay) * time.Millisecond
log.Debugf("Delaying reload for %s", d.String())
time.Sleep(d)
log.Debugf("Handling reload completed, waiting for new requests")
log.Debugf("Handling reload %s completed, waiting for new requests", id)
}
}

func (ra *ReloadAgent) handleReloads() {
defer close(ra.cache.channel)
ticker := time.NewTicker(time.Duration(ra.delay) * time.Millisecond)
for {
select {
case id, ok := <-ra.cache.channel:
if !ok {
return
case <-ticker.C:
if next := ra.cache.getNext(); next != "" {
ra.handleReload(next)
}
ra.handleReload(id)
case <-ra.done:
ticker.Stop()
return
}
}
}

func (ra *ReloadAgent) reloadHAProxy() (string, error) {
func (ra *ReloadAgent) reloadHAProxy(id string) (string, error) {
// try the reload
log.Debug("Reload started...")
log.Debugf("Reload %s started", id)
t := time.Now()
output, err := execCmd(ra.reloadCmd)
log.Debug("Reload finished.")
log.Debug("Time elapsed: ", time.Since(t))
log.Debugf("Reload %s finished in %s", id, time.Since(t))
if err != nil {
reloadFailedError := err
// if failed, return to last known good file and restart and return the original file
log.Info("Reload failed, restarting with last known good config...")
log.Infof("Reload %s failed, restarting with last known good config...", id)
if err := copyFile(ra.configFile, ra.configFile+".bck"); err != nil {
return fmt.Sprintf("Reload failed: %s, failed to backup original config file for restart.", output), err
}
Expand All @@ -182,7 +176,7 @@ func (ra *ReloadAgent) reloadHAProxy() (string, error) {
log.Debug("HAProxy restarted with last known good config.")
return output, reloadFailedError
}
log.Debug("Reload successful")
log.Debugf("Reload %s successful", id)
// if success, replace last known good file
// nolint:errcheck
copyFile(ra.configFile, ra.lkgConfigFile)
Expand Down Expand Up @@ -220,15 +214,18 @@ func execCmd(cmd string) (string, error) {

// Reload schedules a reload
func (ra *ReloadAgent) Reload() string {
if ra.cache.next == "" {
ra.cache.newReload()
next := ra.cache.getNext()
if next == "" {
next = ra.cache.newReload()
log.Debugf("Scheduling a new reload with id: %s", next)
}
return ra.cache.next

return next
}

// ForceReload calls reload directly
func (ra *ReloadAgent) ForceReload() error {
r, err := ra.reloadHAProxy()
r, err := ra.reloadHAProxy("force")
if err != nil {
return NewReloadError(fmt.Sprintf("Reload failed: %v, %v", err, r))
}
Expand All @@ -244,14 +241,20 @@ func (rc *reloadCache) Init(retention int) {
rc.lastSuccess = nil
rc.index = 0
rc.retention = retention
rc.channel = make(chan string)
}

func (rc *reloadCache) newReload() {
func (rc *reloadCache) newReload() string {
next := rc.generateID()
rc.mu.Lock()
defer rc.mu.Unlock()
rc.next = rc.generateID()
rc.channel <- rc.next
rc.next = next
rc.mu.Unlock()
return next
}

// getNext returns the value currently stored in rc.next, reading it while
// holding the cache's read lock.
func (rc *reloadCache) getNext() string {
	rc.mu.RLock()
	pending := rc.next
	rc.mu.RUnlock()

	return pending
}

func (rc *reloadCache) failReload(response string) {
Expand Down Expand Up @@ -378,10 +381,11 @@ func (ra *ReloadAgent) Restart() error {
}

func (rc *reloadCache) generateID() string {
defer func() {
rc.index++
}()
return fmt.Sprintf("%s-%v", time.Now().Format("2006-01-02"), rc.index)
rc.mu.Lock()
defer rc.mu.Unlock()
id := fmt.Sprintf("%s-%v", time.Now().Format("2006-01-02"), rc.index)
rc.index++
return id
}

func getTimeIndexFromID(id string) (time.Time, int64, error) {
Expand Down
75 changes: 75 additions & 0 deletions haproxy/reload_agent_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,75 @@
// Copyright 2019 HAProxy Technologies
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
//

package haproxy

import (
"context"
"io/ioutil"
"os"
"testing"
"time"

"github.com/stretchr/testify/assert"
)

// TestReloadAgentDoesntMissReloads checks how Reload hands out reload IDs:
// two calls made close together must return the same ID, while a call made
// after a longer pause must return a different, non-empty ID.
func TestReloadAgentDoesntMissReloads(t *testing.T) {
	ctx, cancel := context.WithCancel(context.Background())

	f, err := ioutil.TempFile("", "config.cfg")
	if !assert.Nil(t, err) || !assert.NotNil(t, f) {
		// Without a config file nothing below can run; stop here instead of
		// dereferencing a nil *os.File later on.
		cancel()
		t.FailNow()
	}
	// Only the path is used from here on. Close the handle immediately so the
	// descriptor is not leaked and the file can be removed on platforms that
	// refuse to delete open files.
	assert.Nil(t, f.Close())
	t.Cleanup(func() {
		cancel()
		assert.Nil(t, os.Remove(f.Name()))
	})

	reloadAgentParams := ReloadAgentParams{
		Delay:      1,
		ReloadCmd:  `echo "systemctl reload haproxy"`,
		RestartCmd: `echo "systemctl restart haproxy"`,
		ConfigFile: f.Name(),
		BackupDir:  "",
		Retention:  1,
		Ctx:        ctx,
	}

	ra, err := NewReloadAgent(reloadAgentParams)
	if !assert.Nil(t, err) || !assert.NotNil(t, ra) {
		// Calling Reload on a missing agent would panic; fail cleanly instead.
		t.FailNow()
	}

	var reloadID, firstReloadID, secondReloadID string

	// Trigger a reload.
	reloadID = ra.Reload()
	assert.NotEmpty(t, reloadID)
	firstReloadID = reloadID

	// Trigger another reload shortly after the first one. The test expects
	// this to still fall within the delay window and therefore to yield the
	// first reload ID.
	// NOTE(review): this test treats Delay as a number of seconds (10ms is
	// "before the delay", Delay*time.Second is "after") - confirm that this
	// matches the unit the reload agent applies to Delay.
	time.Sleep(10 * time.Millisecond)
	reloadID = ra.Reload()
	assert.EqualValues(t, firstReloadID, reloadID)

	// Sleep for Delay seconds to mimic a slightly slower DataplaneAPI
	// operation.
	time.Sleep(time.Duration(reloadAgentParams.Delay) * time.Second)

	// This call happens after the pause above, so it must create a new
	// reload ID.
	reloadID = ra.Reload()
	assert.NotEmpty(t, reloadID)
	secondReloadID = reloadID
	assert.NotEqualValues(t, firstReloadID, secondReloadID)
}

0 comments on commit 67e65eb

Please sign in to comment.