Skip to content

Commit

Permalink
Created HealthyEndpoint* fields in routing.Routing
Browse files Browse the repository at this point in the history
Signed-off-by: Roman Zavodskikh <[email protected]>
  • Loading branch information
Roman Zavodskikh committed Aug 17, 2023
1 parent 8b981f3 commit d3af8fa
Show file tree
Hide file tree
Showing 2 changed files with 100 additions and 0 deletions.
58 changes: 58 additions & 0 deletions routing/datasource.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,11 +3,13 @@ package routing
import (
"errors"
"fmt"
"math/rand"
"net/url"
"sort"
"sync"
"time"

log "github.com/sirupsen/logrus"
"github.com/zalando/skipper/eskip"
"github.com/zalando/skipper/filters"
"github.com/zalando/skipper/logging"
Expand Down Expand Up @@ -528,6 +530,62 @@ type routeTable struct {
created time.Time
}

func healthyEndpoint(rnd *rand.Rand, ep LBEndpoint, route *Route) bool {
return true
}

func updateHealthyEndpointsOnce(r *Routing) {
rnd := rand.New(rand.NewSource(time.Now().UnixNano())) // #nosec

var rt *routeTable
rt, ok := r.routeTable.Load().(*routeTable)
if !ok {
return
}

for _, ri := range rt.routes {
if ri.Route.BackendType != eskip.LBBackend {
continue
}

if len(ri.Route.LBEndpoints) == 0 {
log.Debugf("failed to post-process LB route: %s, no endpoints defined", ri.Id)
continue
}

newHealthyEndpoints := make([]LBEndpoint, 0, len(ri.LBEndpoints))
newHealthyEndpointsSet := make(map[LBEndpoint]struct{}, len(ri.LBEndpoints))
for i := range ri.LBEndpoints {
if healthyEndpoint(rnd, ri.LBEndpoints[i], ri) {
newHealthyEndpoints = append(newHealthyEndpoints, ri.LBEndpoints[i])
newHealthyEndpointsSet[ri.LBEndpoints[i]] = struct{}{}
}
}
if len(newHealthyEndpoints) == 0 {
newHealthyEndpoints = ri.LBEndpoints
for i := range ri.LBEndpoints {
newHealthyEndpointsSet[ri.LBEndpoints[i]] = struct{}{}
}
}

ri.mx.Lock()
ri.HealthyEndpoints = newHealthyEndpoints
ri.HealthyEndpointsSet = newHealthyEndpointsSet
ri.mx.Unlock()
}
}

func updateHealthyEndpoints(r *Routing, updateCh <-chan time.Time) {
for {
select {
case <-r.quit:
return
case <-updateCh:
go updateHealthyEndpointsOnce(r)
}
}
}

// close routeTable will cleanup all underlying resources, that could
// leak goroutines.
func (rt *routeTable) close() {
Expand Down
42 changes: 42 additions & 0 deletions routing/routing.go
Original file line number Diff line number Diff line change
Expand Up @@ -133,6 +133,8 @@ type Options struct {
// SignalFirstLoad enables signaling on the first load
// of the routing configuration during the startup.
SignalFirstLoad bool

UpdateHealthyEndpointsChan <-chan time.Time
}

// RouteFilter contains extensions to generic filter
Expand Down Expand Up @@ -230,6 +232,10 @@ type Route struct {
// balanced route.
LBEndpoints []LBEndpoint

// LBEndpoints which are checked for not being unhealthy or fading-in
HealthyEndpoints []LBEndpoint
HealthyEndpointsSet map[LBEndpoint]struct{}

// LBAlgorithm is the selected load balancing algorithm
// of a load balanced route.
LBAlgorithm LBAlgorithm
Expand All @@ -248,6 +254,30 @@ type Route struct {
LBFadeInExponent float64
}

func (r *Route) GetHealthyEndpoints() []LBEndpoint {
r.mx.Lock()
defer r.mx.Unlock()

if len(r.HealthyEndpoints) == 0 {
return r.LBEndpoints
}
return r.HealthyEndpoints
}

func (r *Route) GetHealthyEndpointsSet() map[LBEndpoint]struct{} {
r.mx.Lock()
defer r.mx.Unlock()

if len(r.HealthyEndpointsSet) == 0 {
endpointsSet := make(map[LBEndpoint]struct{}, len(r.LBEndpoints))
for i := range r.LBEndpoints {
endpointsSet[r.LBEndpoints[i]] = struct{}{}
}
return endpointsSet
}
return r.HealthyEndpointsSet
}

// PostProcessor is an interface for custom post-processors applying changes
// to the routes after they were created from their data representation and
// before they were passed to the proxy.
Expand Down Expand Up @@ -359,11 +389,23 @@ func (r *Routing) startReceivingUpdates(o Options) {
dc := len(o.DataClients)
c := make(chan *routeTable)
go receiveRouteMatcher(o, c, r.quit)

go func() {
if o.UpdateHealthyEndpointsChan == nil {
ticker := time.NewTicker(500 * time.Millisecond)
updateHealthyEndpoints(r, ticker.C)
ticker.Stop()
} else {
updateHealthyEndpoints(r, o.UpdateHealthyEndpointsChan)
}
}()

go func() {
for {
select {
case rt := <-c:
r.routeTable.Store(rt)
go updateHealthyEndpointsOnce(r)
if !r.firstLoadSignaled {
dc--
if dc == 0 {
Expand Down

0 comments on commit d3af8fa

Please sign in to comment.