Skip to content

Commit

Permalink
Move set and health packages here to avoid Felix/Typha duplication
Browse files Browse the repository at this point in the history
  • Loading branch information
Neil Jerram committed Jun 27, 2017
1 parent cf00825 commit 969076e
Show file tree
Hide file tree
Showing 6 changed files with 761 additions and 0 deletions.
171 changes: 171 additions & 0 deletions lib/health/health.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,171 @@
// Copyright (c) 2017 Tigera, Inc. All rights reserved.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package health

import (
"fmt"
"net/http"
"sync"
"time"

log "github.com/Sirupsen/logrus"
"github.com/projectcalico/libcalico-go/lib/set"
)

// HealthSource identifies a source of health indications. It can be any kind of value that can be
// used as a map key and is unique across multiple packages. For example,
// "type myHealthSource string".
type HealthSource interface{}

// HealthIndicator is a single health report from one source. MonitorHealth receives these on its
// input channel.
type HealthIndicator struct {
// The source of this health indicator.
Source HealthSource

// How long the indicator is valid for. In other words, if it continues operating normally,
// the source expects to refresh this indicator before this timeout. MonitorHealth treats a
// timeout that is not greater than zero as cancelling the source's previous indication.
Timeout time.Duration
}

// HealthProvider is implemented by a component that provides health indications. It returns the
// sources that the component provides to indicate readiness, and those that it provides to
// indicate liveness.
type HealthProvider interface {
// ReadySources returns the sources that this component provides to indicate readiness.
ReadySources() []HealthSource
// LiveSources returns the sources that this component provides to indicate liveness.
LiveSources() []HealthSource
}

// HealthState holds the overall 'ready' and 'live' status, as last computed by MonitorHealth.
// Read it through the Ready and Live methods, which are safe to call from any goroutine.
//
// The zero value is usable and reports neither ready nor live; use NewHealthState to get the
// normal starting state. A HealthState must not be copied after first use, because it contains a
// mutex.
type HealthState struct {
	// Whether we are overall 'ready'.
	ready bool

	// Whether we are overall 'live'.
	live bool

	// Mutex used to protect against concurrently reading and writing those attributes. It is
	// held by value (not by pointer) so that a zero-value HealthState does not panic with a nil
	// dereference when Ready or Live is called.
	mutex sync.Mutex
}

// Ready reports whether we are currently overall 'ready'.
func (state *HealthState) Ready() bool {
	state.mutex.Lock()
	defer state.mutex.Unlock()
	return state.ready
}

// Live reports whether we are currently overall 'live'.
func (state *HealthState) Live() bool {
	state.mutex.Lock()
	defer state.mutex.Unlock()
	return state.live
}

// NewHealthState returns a new HealthState in the normal starting state: 'live' but not 'ready'.
func NewHealthState() *HealthState {
	// Start as 'live' but not 'ready'.
	return &HealthState{ready: false, live: true}
}

// MonitorHealth consumes health indicators from c and keeps state up to date until c is closed.
//
// A source counts as currently healthy from the moment an indicator with a positive Timeout is
// received for it until that timeout elapses without a refresh, or until an indicator with a
// non-positive Timeout cancels it. After every change, state is marked ready when the currently
// healthy sources contain all of neededForReady, and live when they contain all of
// neededForLive.
//
// When c is closed, state is set to neither ready nor live and the function returns. It blocks
// until then, so callers normally run it in its own goroutine.
func MonitorHealth(
	state *HealthState,
	neededForReady set.Set,
	neededForLive set.Set,
	c <-chan HealthIndicator,
) {
	// expiry is what a timer callback reports when an indication times out. The generation
	// says which indication the timer belonged to, so that an expiry for an indication that
	// has since been refreshed or cancelled can be recognised and ignored.
	type expiry struct {
		source     HealthSource
		generation uint64
	}

	currentHealth := set.New()
	timer := map[HealthSource]*time.Timer{}
	generation := map[HealthSource]uint64{}
	timeoutC := make(chan expiry)

	// done is closed when we return, so that timer callbacks that are (or later become)
	// blocked sending on timeoutC are released instead of leaking. Timers that have not fired
	// yet are stopped as well.
	done := make(chan struct{})
	defer func() {
		close(done)
		for _, t := range timer {
			t.Stop()
		}
	}()

	for {
		select {
		case indicator, ok := <-c:
			if !ok {
				log.Warningf("Health channel closed")
				state.mutex.Lock()
				state.ready = false
				state.live = false
				state.mutex.Unlock()
				return
			}
			source := indicator.Source
			log.WithField("source", source).Debug("Health indicator current")
			if t := timer[source]; t != nil {
				t.Stop()
				delete(timer, source)
			}
			// Stop does not recall a callback that has already started, so bump the
			// generation to invalidate any expiry that is already in flight for the
			// previous indication.
			generation[source]++
			if indicator.Timeout > 0 {
				currentHealth.Add(source)
				exp := expiry{source: source, generation: generation[source]}
				timer[source] = time.AfterFunc(indicator.Timeout, func() {
					select {
					case timeoutC <- exp:
					case <-done:
					}
				})
			} else {
				// Shortcut immediate timeout. A health source can use an
				// indication with zero timeout to cancel a previous indication that
				// might otherwise take a long time to expire.
				log.WithField("source", source).Debug("Health indicator cancelled")
				currentHealth.Discard(source)
			}
		case exp := <-timeoutC:
			if generation[exp.source] != exp.generation {
				// Stale expiry: the source has been refreshed or cancelled since this
				// timer was started. Nothing has changed, so skip the recalculation.
				continue
			}
			log.WithField("source", exp.source).Debug("Health indicator expired")
			currentHealth.Discard(exp.source)
			delete(timer, exp.source)
		}
		state.mutex.Lock()
		state.ready = currentHealth.ContainsAll(neededForReady)
		state.live = currentHealth.ContainsAll(neededForLive)
		log.WithFields(log.Fields{
			"ready": state.ready,
			"live":  state.live,
		}).Debug("Health now")
		state.mutex.Unlock()
	}
}

// HTTP status codes returned by the health endpoints that ServeHealth registers.
const (
// The HTTP status that we use for 'ready' or 'live'. 204 means "No Content: The server
// successfully processed the request and is not returning any content." (Kubernetes
// interprets any 200<=status<400 as 'good'.)
STATUS_GOOD = 204

// The HTTP status that we use for 'not ready' or 'not live'. 503 means "Service
// Unavailable: The server is currently unavailable (because it is overloaded or down for
// maintenance). Generally, this is a temporary state." (Kubernetes interprets any
// status>=400 as 'bad'.)
STATUS_BAD = 503
)

// ServeHealth starts monitoring the health indicators arriving on c and serves the result over
// HTTP on the given port. It never returns.
//
// Two endpoints are registered on the default net/http mux: "/readiness" and "/liveness". Each
// answers STATUS_GOOD when the corresponding state is currently true, and STATUS_BAD otherwise.
// neededForReady and neededForLive are passed through to MonitorHealth. If the HTTP server
// stops, the error is logged and the server is restarted after a one-second pause.
func ServeHealth(port int, neededForReady set.Set, neededForLive set.Set, c <-chan HealthIndicator) {
	healthState := NewHealthState()
	go MonitorHealth(healthState, neededForReady, neededForLive, c)

	log.WithField("port", port).Info("Starting health endpoints")

	readinessHandler := func(w http.ResponseWriter, _ *http.Request) {
		log.Debug("GET /readiness")
		if !healthState.Ready() {
			w.WriteHeader(STATUS_BAD)
			return
		}
		log.Debug("Felix is ready")
		w.WriteHeader(STATUS_GOOD)
	}
	livenessHandler := func(w http.ResponseWriter, _ *http.Request) {
		log.Debug("GET /liveness")
		if !healthState.Live() {
			w.WriteHeader(STATUS_BAD)
			return
		}
		log.Debug("Felix is live")
		w.WriteHeader(STATUS_GOOD)
	}
	http.HandleFunc("/readiness", readinessHandler)
	http.HandleFunc("/liveness", livenessHandler)

	// A nil handler makes ListenAndServe use the default mux, where the two handlers above
	// were registered.
	listenAddr := fmt.Sprintf(":%v", port)
	for {
		err := http.ListenAndServe(listenAddr, nil)
		log.WithError(err).Error(
			"Readiness endpoint failed, trying to restart it...")
		time.Sleep(1 * time.Second)
	}
}
33 changes: 33 additions & 0 deletions lib/health/health_suite_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,33 @@
// Copyright (c) 2017 Tigera, Inc. All rights reserved.

// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package health_test

import (
. "github.com/onsi/ginkgo"
. "github.com/onsi/gomega"

"testing"

"github.com/projectcalico/libcalico-go/lib/testutils"
)

// init calls testutils.HookLogrusForGinkgo when the test package is loaded, i.e. before any spec
// runs (presumably so that logrus output is routed through Ginkgo; see testutils to confirm).
func init() {
testutils.HookLogrusForGinkgo()
}

// TestSet is the "go test" entry point for the "Health Suite" Ginkgo specs: it registers
// Ginkgo's Fail as the Gomega failure handler and then runs the specs.
// NOTE(review): the name looks like a carry-over from the set package's suite; something like
// TestHealth would describe this suite better.
func TestSet(t *testing.T) {
RegisterFailHandler(Fail)
RunSpecs(t, "Health Suite")
}
156 changes: 156 additions & 0 deletions lib/health/health_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,156 @@
// Copyright (c) 2017 Tigera, Inc. All rights reserved.

// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package health_test

import (
"time"

. "github.com/onsi/ginkgo"
. "github.com/onsi/gomega"
"github.com/projectcalico/libcalico-go/lib/health"
"github.com/projectcalico/libcalico-go/lib/set"
)

// healthSource is the string-based type of the health sources used by these tests.
type healthSource string

// The three health sources used by the specs below. The monitor under test is started with
// SOURCE1 and SOURCE2 needed for readiness, and SOURCE2 and SOURCE3 needed for liveness.
var (
SOURCE1 = healthSource("source1")
SOURCE2 = healthSource("source2")
SOURCE3 = healthSource("source3")
)

// Specs for health.MonitorHealth. Each spec runs a fresh monitor that needs SOURCE1 and SOURCE2
// for readiness, and SOURCE2 and SOURCE3 for liveness, and then drives it through healthChannel.
var _ = Describe("Health", func() {

	var (
		healthChannel chan health.HealthIndicator
		state         *health.HealthState
	)

	// notifySource returns a function that reports the given source as healthy for the next
	// second. Keyed fields are used so that the literal does not depend on the field order of
	// the imported struct.
	notifySource := func(source healthSource) func() {
		return func() {
			healthChannel <- health.HealthIndicator{Source: source, Timeout: 1 * time.Second}
		}
	}

	// cancelSource returns a function that sends a zero-timeout indicator for the given
	// source, which cancels its previous indication.
	cancelSource := func(source healthSource) func() {
		return func() {
			healthChannel <- health.HealthIndicator{Source: source, Timeout: 0}
		}
	}

	BeforeEach(func() {
		healthChannel = make(chan health.HealthIndicator)
		// Note: use a new HealthState, in each test. Otherwise what can happen is that the
		// closing goroutine from the previous test changes it and confuses the test that is
		// running now...
		state = health.NewHealthState()

		go health.MonitorHealth(
			state,
			set.From(SOURCE1, SOURCE2),
			set.From(SOURCE2, SOURCE3),
			healthChannel,
		)
	})

	AfterEach(func() {
		// Closing the channel stops the monitor, which reports neither ready nor live as it
		// exits.
		close(healthChannel)
		Eventually(state.Ready).Should(BeFalse())
		Eventually(state.Live).Should(BeFalse())
	})

	It("initially reports live but not ready", func() {
		Expect(state.Ready()).To(BeFalse())
		Expect(state.Live()).To(BeTrue())
	})

	Context("with indicators for readiness sources", func() {

		BeforeEach(func() {
			notifySource(SOURCE1)()
			notifySource(SOURCE2)()
		})

		It("is ready but not live", func() {
			Eventually(state.Ready).Should(BeTrue())
			Expect(state.Live()).To(BeFalse())
		})

		Context("with liveness source also", func() {

			BeforeEach(notifySource(SOURCE3))

			It("is ready and live", func() {
				Eventually(state.Ready).Should(BeTrue())
				Eventually(state.Live).Should(BeTrue())
			})
		})

		Context("with a source cancelled", func() {

			BeforeEach(cancelSource(SOURCE1))

			It("is not ready and not live", func() {
				Eventually(state.Ready).Should(BeFalse())
				Eventually(state.Live).Should(BeFalse())
			})
		})
	})

	Context("with indicators for liveness sources", func() {

		BeforeEach(func() {
			notifySource(SOURCE3)()
			notifySource(SOURCE2)()
		})

		It("is live but not ready", func() {
			Eventually(state.Live).Should(BeTrue())
			Expect(state.Ready()).To(BeFalse())
		})

		Context("with readiness source also", func() {

			BeforeEach(notifySource(SOURCE1))

			It("is ready and live", func() {
				Eventually(state.Ready).Should(BeTrue())
				Eventually(state.Live).Should(BeTrue())
			})

			Context("with time passing so that indicators expire", func() {

				BeforeEach(func() {
					// Longer than the one-second timeout used by notifySource.
					time.Sleep(2 * time.Second)
				})

				It("is not ready and not live", func() {
					Eventually(state.Ready).Should(BeFalse())
					Eventually(state.Live).Should(BeFalse())
				})
			})
		})

		Context("with a source cancelled", func() {

			BeforeEach(cancelSource(SOURCE3))

			It("is not ready and not live", func() {
				Eventually(state.Ready).Should(BeFalse())
				Eventually(state.Live).Should(BeFalse())
			})
		})
	})
})
Loading

0 comments on commit 969076e

Please sign in to comment.