Skip to content

Commit

Permalink
Use Detected field of endpointregistry in the loadbalancer (#2629)
Browse files Browse the repository at this point in the history
Signed-off-by: Roman Zavodskikh <[email protected]>
Co-authored-by: Roman Zavodskikh <[email protected]>
  • Loading branch information
RomanZavodskikh and Roman Zavodskikh authored Sep 26, 2023
1 parent 9e6699f commit c823514
Show file tree
Hide file tree
Showing 5 changed files with 31 additions and 21 deletions.
7 changes: 3 additions & 4 deletions filters/fadein/fadein_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -369,17 +369,16 @@ func TestPostProcessor(t *testing.T) {
* -> fadeIn("15ms") -> <"http://10.0.0.1:8080", "http://10.0.0.2:8080">
`

endpointRegistry := routing.NewEndpointRegistry(routing.RegistryOptions{})
const lastSeenTimeout = 2 * time.Second
endpointRegistry := routing.NewEndpointRegistry(routing.RegistryOptions{LastSeenTimeout: lastSeenTimeout})
rt, update := createRouting(t, initialRoutes, endpointRegistry)
firstDetected := time.Now()

const nextRoutes = `
* -> fadeIn("1m") -> <"http://10.0.0.2:8080">
`

// We need to wait routing.lastSeenTimeout to expire.
// TODO: Use mock clock like this https://pkg.go.dev/github.com/benbjohnson/clock
time.Sleep(61 * time.Second)
time.Sleep(lastSeenTimeout + 10*time.Millisecond)
update(nextRoutes)
update(initialRoutes)

Expand Down
12 changes: 8 additions & 4 deletions loadbalancer/algorithm.go
Original file line number Diff line number Diff line change
Expand Up @@ -75,15 +75,17 @@ func shiftWeighted(rnd *rand.Rand, ctx *routing.LBContext, now time.Time) routin
rt := ctx.Route
ep := ctx.Route.LBEndpoints
for _, epi := range ep {
wi := fadeIn(now, rt.LBFadeInDuration, rt.LBFadeInExponent, epi.Detected)
detected := ctx.Registry.GetMetrics(epi.Host).DetectedTime()
wi := fadeIn(now, rt.LBFadeInDuration, rt.LBFadeInExponent, detected)
sum += wi
}

choice := ep[len(ep)-1]
r := rnd.Float64() * sum
var upto float64
for i, epi := range ep {
upto += fadeIn(now, rt.LBFadeInDuration, rt.LBFadeInExponent, epi.Detected)
detected := ctx.Registry.GetMetrics(epi.Host).DetectedTime()
upto += fadeIn(now, rt.LBFadeInDuration, rt.LBFadeInExponent, detected)
if upto > r {
choice = ep[i]
break
Expand Down Expand Up @@ -112,19 +114,21 @@ func shiftToRemaining(rnd *rand.Rand, ctx *routing.LBContext, wi []int, now time
func withFadeIn(rnd *rand.Rand, ctx *routing.LBContext, choice int, algo routing.LBAlgorithm) routing.LBEndpoint {
ep := ctx.Route.LBEndpoints
now := time.Now()
detected := ctx.Registry.GetMetrics(ctx.Route.LBEndpoints[choice].Host).DetectedTime()
f := fadeIn(
now,
ctx.Route.LBFadeInDuration,
ctx.Route.LBFadeInExponent,
ctx.Route.LBEndpoints[choice].Detected,
detected,
)

if rnd.Float64() < f {
return ep[choice]
}
notFadingIndexes := make([]int, 0, len(ep))
for i := 0; i < len(ep); i++ {
if _, fadingIn := fadeInState(now, ctx.Route.LBFadeInDuration, ep[i].Detected); !fadingIn {
detected := ctx.Registry.GetMetrics(ep[i].Host).DetectedTime()
if _, fadingIn := fadeInState(now, ctx.Route.LBFadeInDuration, detected); !fadingIn {
notFadingIndexes = append(notFadingIndexes, i)
}
}
Expand Down
21 changes: 14 additions & 7 deletions routing/endpointregistry.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@ import (
"github.com/zalando/skipper/eskip"
)

const lastSeenTimeout = 1 * time.Minute
const defaultLastSeenTimeout = 1 * time.Minute

// Metrics describe the data about endpoint that could be
// used to perform better load balancing, fadeIn, etc.
Expand All @@ -32,8 +32,9 @@ func (e *entry) InflightRequests() int64 {
}

type EndpointRegistry struct {
lastSeen map[string]time.Time
now func() time.Time
lastSeen map[string]time.Time
lastSeenTimeout time.Duration
now func() time.Time

mu sync.Mutex

Expand All @@ -43,6 +44,7 @@ type EndpointRegistry struct {
var _ PostProcessor = &EndpointRegistry{}

type RegistryOptions struct {
LastSeenTimeout time.Duration
}

func (r *EndpointRegistry) Do(routes []*Route) []*Route {
Expand All @@ -62,7 +64,7 @@ func (r *EndpointRegistry) Do(routes []*Route) []*Route {
}

for host, ts := range r.lastSeen {
if ts.Add(lastSeenTimeout).Before(now) {
if ts.Add(r.lastSeenTimeout).Before(now) {
r.mu.Lock()
if r.data[host].inflightRequests == 0 {
delete(r.lastSeen, host)
Expand All @@ -76,10 +78,15 @@ func (r *EndpointRegistry) Do(routes []*Route) []*Route {
}

func NewEndpointRegistry(o RegistryOptions) *EndpointRegistry {
if o.LastSeenTimeout == 0 {
o.LastSeenTimeout = defaultLastSeenTimeout
}

return &EndpointRegistry{
data: map[string]*entry{},
lastSeen: map[string]time.Time{},
now: time.Now,
data: map[string]*entry{},
lastSeen: map[string]time.Time{},
lastSeenTimeout: o.LastSeenTimeout,
now: time.Now,
}
}

Expand Down
2 changes: 1 addition & 1 deletion routing/endpointregistry_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -78,7 +78,7 @@ func TestDoRemovesOldEntries(t *testing.T) {
r.DecInflightRequest("endpoint2.test:80")

routing.SetNow(r, func() time.Time {
return beginTestTs.Add(routing.ExportLastSeenTimeout + time.Second)
return beginTestTs.Add(routing.ExportDefaultLastSeenTimeout + time.Second)
})
route = &routing.Route{
LBEndpoints: []routing.LBEndpoint{
Expand Down
10 changes: 5 additions & 5 deletions routing/export_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,11 +3,11 @@ package routing
import "time"

var (
ExportProcessRouteDef = processRouteDef
ExportNewMatcher = newMatcher
ExportMatch = (*matcher).match
ExportProcessPredicates = processPredicates
ExportLastSeenTimeout = lastSeenTimeout
ExportProcessRouteDef = processRouteDef
ExportNewMatcher = newMatcher
ExportMatch = (*matcher).match
ExportProcessPredicates = processPredicates
ExportDefaultLastSeenTimeout = defaultLastSeenTimeout
)

func SetNow(r *EndpointRegistry, now func() time.Time) {
Expand Down

0 comments on commit c823514

Please sign in to comment.