Skip to content

Commit

Permalink
Use sync.Map in the endpointregistry (#2795)
Browse files Browse the repository at this point in the history
Signed-off-by: Roman Zavodskikh <[email protected]>
Co-authored-by: Roman Zavodskikh <[email protected]>
  • Loading branch information
RomanZavodskikh and Roman Zavodskikh authored Dec 21, 2023
1 parent 0dc97af commit 377b9e0
Show file tree
Hide file tree
Showing 8 changed files with 254 additions and 99 deletions.
18 changes: 16 additions & 2 deletions filters/fadein/fadein.go
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,7 @@ type (
}

postProcessor struct {
endpointRegisty *routing.EndpointRegistry
// "http://10.2.1.53:1234": {t0 60s t0-10s}
detected map[string]detectedFadeIn
}
Expand Down Expand Up @@ -192,11 +193,20 @@ func (endpointCreated) CreateFilter(args []interface{}) (filters.Filter, error)
func (endpointCreated) Request(filters.FilterContext) {}
func (endpointCreated) Response(filters.FilterContext) {}

type PostProcessorOptions struct {
EndpointRegistry *routing.EndpointRegistry
}

// NewPostProcessor creates post-processor for maintaining the detection time of LB endpoints with fade-in
// behavior.
func NewPostProcessor() routing.PostProcessor {
func NewPostProcessor(options PostProcessorOptions) routing.PostProcessor {
if options.EndpointRegistry == nil {
options.EndpointRegistry = routing.NewEndpointRegistry(routing.RegistryOptions{})
}

return &postProcessor{
detected: make(map[string]detectedFadeIn),
endpointRegisty: options.EndpointRegistry,
detected: make(map[string]detectedFadeIn),
}
}

Expand Down Expand Up @@ -235,6 +245,10 @@ func (p *postProcessor) Do(r []*routing.Route) []*routing.Route {
}

ep.Detected = detected
metrics := p.endpointRegisty.GetMetrics(ep.Host)
if endpointsCreated[key].After(metrics.DetectedTime()) {
metrics.SetDetected(endpointsCreated[key])
}
p.detected[key] = detectedFadeIn{
when: detected,
duration: ri.LBFadeInDuration,
Expand Down
4 changes: 2 additions & 2 deletions filters/fadein/fadein_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -198,7 +198,7 @@ func TestPostProcessor(t *testing.T) {
PostProcessors: []routing.PostProcessor{
loadbalancer.NewAlgorithmProvider(),
endpointRegistry,
NewPostProcessor(),
NewPostProcessor(PostProcessorOptions{EndpointRegistry: endpointRegistry}),
},
SignalFirstLoad: true,
})
Expand Down Expand Up @@ -431,7 +431,7 @@ func TestPostProcessor(t *testing.T) {
if !ep.Detected.After(firstDetected) {
t.Fatal("Failed to reset detection time.")
}
if endpointRegistry.GetMetrics(ep.Host).DetectedTime().After(firstDetected) {
if !endpointRegistry.GetMetrics(ep.Host).DetectedTime().After(firstDetected) {
t.Fatal("Failed to reset detection time.")
}

Expand Down
4 changes: 2 additions & 2 deletions loadbalancer/algorithm_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -419,7 +419,7 @@ func TestConsistentHashBoundedLoadDistribution(t *testing.T) {
t.Errorf("Expected in-flight requests for each endpoint to be less than %d. In-flight request counts: %d, %d, %d", limit, ifr0, ifr1, ifr2)
}
ep.Metrics.IncInflightRequest()
ctx.Registry.IncInflightRequest(ep.Host)
ctx.Registry.GetMetrics(ep.Host).IncInflightRequest()
}
}

Expand All @@ -441,7 +441,7 @@ func TestConsistentHashKeyDistribution(t *testing.T) {
func addInflightRequests(registry *routing.EndpointRegistry, endpoint routing.LBEndpoint, count int) {
for i := 0; i < count; i++ {
endpoint.Metrics.IncInflightRequest()
registry.IncInflightRequest(endpoint.Host)
registry.GetMetrics(endpoint.Host).IncInflightRequest()
}
}

Expand Down
4 changes: 2 additions & 2 deletions loadbalancer/fadein_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -70,7 +70,7 @@ func initializeEndpoints(endpointAges []time.Duration, fadeInDuration time.Durat
Host: eps[i],
Detected: detectionTimes[i],
})
ctx.Registry.SetDetectedTime(eps[i], detectionTimes[i])
ctx.Registry.GetMetrics(eps[i]).SetDetected(detectionTimes[i])
}
ctx.LBEndpoints = ctx.Route.LBEndpoints

Expand Down Expand Up @@ -332,7 +332,7 @@ func benchmarkFadeIn(
Host: eps[i],
Detected: detectionTimes[i],
})
registry.SetDetectedTime(eps[i], detectionTimes[i])
registry.GetMetrics(eps[i]).SetDetected(detectionTimes[i])
}

var wg sync.WaitGroup
Expand Down
2 changes: 1 addition & 1 deletion proxy/fadeintesting_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -270,7 +270,7 @@ func (p *fadeInProxy) addInstances(n int) {
DataClients: []routing.DataClient{client},
PostProcessors: []routing.PostProcessor{
loadbalancer.NewAlgorithmProvider(),
fadein.NewPostProcessor(),
fadein.NewPostProcessor(fadein.PostProcessorOptions{}),
},
})

Expand Down
125 changes: 58 additions & 67 deletions routing/endpointregistry.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ package routing

import (
"sync"
"sync/atomic"
"time"

"github.com/zalando/skipper/eskip"
Expand All @@ -13,32 +14,65 @@ const defaultLastSeenTimeout = 1 * time.Minute
// used to perform better load balancing, fadeIn, etc.
type Metrics interface {
DetectedTime() time.Time
SetDetected(detected time.Time)

LastSeen() time.Time
SetLastSeen(lastSeen time.Time)

InflightRequests() int64
IncInflightRequest()
DecInflightRequest()
}

type entry struct {
detected time.Time
inflightRequests int64
detected atomic.Value // time.Time
lastSeen atomic.Value // time.Time
inflightRequests atomic.Int64
}

var _ Metrics = &entry{}

func (e *entry) DetectedTime() time.Time {
return e.detected
return e.detected.Load().(time.Time)
}

func (e *entry) LastSeen() time.Time {
return e.lastSeen.Load().(time.Time)
}

func (e *entry) InflightRequests() int64 {
return e.inflightRequests
return e.inflightRequests.Load()
}

func (e *entry) IncInflightRequest() {
e.inflightRequests.Add(1)
}

func (e *entry) DecInflightRequest() {
e.inflightRequests.Add(-1)
}

func (e *entry) SetDetected(detected time.Time) {
e.detected.Store(detected)
}

func (e *entry) SetLastSeen(ts time.Time) {
e.lastSeen.Store(ts)
}

func newEntry() (result *entry) {
result = &entry{}
result.SetDetected(time.Time{})
result.SetLastSeen(time.Time{})
return
}

type EndpointRegistry struct {
lastSeen map[string]time.Time
lastSeenTimeout time.Duration
now func() time.Time

mu sync.Mutex

data map[string]*entry
// map[string]*entry
data sync.Map
}

var _ PostProcessor = &EndpointRegistry{}
Expand All @@ -55,24 +89,23 @@ func (r *EndpointRegistry) Do(routes []*Route) []*Route {
for _, epi := range route.LBEndpoints {
metrics := r.GetMetrics(epi.Host)
if metrics.DetectedTime().IsZero() {
r.SetDetectedTime(epi.Host, now)
metrics.SetDetected(now)
}

r.lastSeen[epi.Host] = now
metrics.SetLastSeen(now)
}
}
}

for host, ts := range r.lastSeen {
if ts.Add(r.lastSeenTimeout).Before(now) {
r.mu.Lock()
if r.data[host].inflightRequests == 0 {
delete(r.lastSeen, host)
delete(r.data, host)
}
r.mu.Unlock()
removeOlder := now.Add(-r.lastSeenTimeout)
r.data.Range(func(key, value any) bool {
e := value.(*entry)
if e.LastSeen().Before(removeOlder) {
r.data.Delete(key)
}
}

return true
})

return routes
}
Expand All @@ -82,58 +115,16 @@ func NewEndpointRegistry(o RegistryOptions) *EndpointRegistry {
o.LastSeenTimeout = defaultLastSeenTimeout
}

return &EndpointRegistry{
data: map[string]*entry{},
lastSeen: map[string]time.Time{},
result := &EndpointRegistry{
data: sync.Map{},
lastSeenTimeout: o.LastSeenTimeout,
now: time.Now,
}
}

func (r *EndpointRegistry) GetMetrics(key string) Metrics {
r.mu.Lock()
defer r.mu.Unlock()

e := r.getOrInitEntryLocked(key)
copy := &entry{}
*copy = *e
return copy
}

func (r *EndpointRegistry) SetDetectedTime(key string, detected time.Time) {
r.mu.Lock()
defer r.mu.Unlock()

e := r.getOrInitEntryLocked(key)
e.detected = detected
return result
}

func (r *EndpointRegistry) IncInflightRequest(key string) {
r.mu.Lock()
defer r.mu.Unlock()

e := r.getOrInitEntryLocked(key)
e.inflightRequests++
}

func (r *EndpointRegistry) DecInflightRequest(key string) {
r.mu.Lock()
defer r.mu.Unlock()

e := r.getOrInitEntryLocked(key)
e.inflightRequests--
}

// getOrInitEntryLocked returns pointer to endpoint registry entry
// which contains the information about endpoint representing the
// following key. r.mu must be held while calling this function and
// using of the entry returned. In general, key represents the "host:port"
// string
func (r *EndpointRegistry) getOrInitEntryLocked(key string) *entry {
e, ok := r.data[key]
if !ok {
e = &entry{}
r.data[key] = e
}
return e
func (r *EndpointRegistry) GetMetrics(key string) Metrics {
e, _ := r.data.LoadOrStore(key, newEntry())
return e.(*entry)
}
Loading

0 comments on commit 377b9e0

Please sign in to comment.