From a04ad40b7216f483453ff99eead76453780672db Mon Sep 17 00:00:00 2001 From: Neil Jerram Date: Mon, 26 Jun 2017 17:31:36 +0100 Subject: [PATCH 1/7] Implement health endpoints for Typha --- pkg/config/config_params.go | 2 + pkg/daemon/daemon.go | 26 +++++ pkg/health/health.go | 137 +++++++++++++++++++++++++ pkg/health/health_suite_test.go | 33 ++++++ pkg/health/health_test.go | 171 ++++++++++++++++++++++++++++++++ pkg/set/set.go | 13 +++ 6 files changed, 382 insertions(+) create mode 100644 pkg/health/health.go create mode 100644 pkg/health/health_suite_test.go create mode 100644 pkg/health/health_test.go diff --git a/pkg/config/config_params.go b/pkg/config/config_params.go index 57bba813..96a48bcc 100644 --- a/pkg/config/config_params.go +++ b/pkg/config/config_params.go @@ -103,6 +103,8 @@ type Config struct { LogSeverityScreen string `config:"oneof(DEBUG,INFO,WARNING,ERROR,CRITICAL);INFO"` LogSeveritySys string `config:"oneof(DEBUG,INFO,WARNING,ERROR,CRITICAL);INFO"` + HealthEnabled bool `config:"bool;false"` + HealthPort int `config:"int(0,65535);9099"` PrometheusMetricsEnabled bool `config:"bool;false"` PrometheusMetricsPort int `config:"int(0,65535);9093"` PrometheusGoMetricsEnabled bool `config:"bool;true"` diff --git a/pkg/daemon/daemon.go b/pkg/daemon/daemon.go index feef43c1..e316923d 100644 --- a/pkg/daemon/daemon.go +++ b/pkg/daemon/daemon.go @@ -39,9 +39,11 @@ import ( "github.com/projectcalico/typha/pkg/buildinfo" "github.com/projectcalico/typha/pkg/calc" "github.com/projectcalico/typha/pkg/config" + "github.com/projectcalico/typha/pkg/health" "github.com/projectcalico/typha/pkg/jitter" "github.com/projectcalico/typha/pkg/k8s" "github.com/projectcalico/typha/pkg/logutils" + "github.com/projectcalico/typha/pkg/set" "github.com/projectcalico/typha/pkg/snapcache" "github.com/projectcalico/typha/pkg/syncserver" ) @@ -77,6 +79,11 @@ type TyphaDaemon struct { NewBackendClient func(config api.CalicoAPIConfig) (BackendClient, error) ConfigureEarlyLogging func() ConfigureLogging func(configParams *config.Config) + + // Health monitoring. + healthChannel chan health.HealthIndicator + neededForReady set.Set + neededForLive set.Set } func New() *TyphaDaemon { @@ -215,6 +222,15 @@ configRetry: func (t *TyphaDaemon) CreateServer() { // Now create the Syncer; our caching layer and the TCP server. + // Health monitoring: as we create Typha components after this point, we'll add health + // sources that we expect those components to report, to feed into the determination of + // Typha's liveness and readiness. + t.neededForReady = set.New() + t.neededForLive = set.New() + if t.ConfigParams.HealthEnabled { + t.healthChannel = make(chan health.HealthIndicator) + } + // Get a Syncer from the datastore, which will feed the validator layer with updates. t.SyncerToValidator = calc.NewSyncerCallbacksDecoupler() t.Syncer = t.DatastoreClient.Syncer(t.SyncerToValidator) @@ -268,6 +284,16 @@ func (t *TyphaDaemon) Start(cxt context.Context) { log.Info("Prometheus metrics enabled. Starting server.") go servePrometheusMetrics(t.ConfigParams) } + + if t.ConfigParams.HealthEnabled { + log.Info("Health enabled. Starting server.") + go health.ServeHealth( + t.ConfigParams.HealthPort, + t.neededForReady, + t.neededForLive, + t.healthChannel, + ) + } } // WaitAndShutDown waits for OS signals or context.Done() and exits as appropriate. diff --git a/pkg/health/health.go b/pkg/health/health.go new file mode 100644 index 00000000..d5367cbf --- /dev/null +++ b/pkg/health/health.go @@ -0,0 +1,137 @@ +// Copyright (c) 2017 Tigera, Inc. All rights reserved. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package health + +import ( + "fmt" + "net/http" + "sync" + "time" + + log "github.com/Sirupsen/logrus" + "github.com/projectcalico/typha/pkg/set" +) + +// Any kind of value that can be used as a map key and is unique across multiple packages. For +// example, "type myHealthSource string". +type HealthSource interface{} + +type HealthIndicator struct { + // The source of this health indicator. + Source HealthSource + + // How long the indicator is valid for. In other words, if it continues operating normally, + // the source expects to refresh this indicator before this timeout. + Timeout time.Duration +} + +// For a component that provides health indications, return the sources that it provides to indicate +// readiness, and those that it provides to indicate liveness. +type HealthProvider interface { + ReadySources() []HealthSource + LiveSources() []HealthSource +} + +func MonitorHealth( + ready *bool, + live *bool, + mutex *sync.Mutex, + neededForReady set.Set, + neededForLive set.Set, + c <-chan HealthIndicator, +) { + currentHealth := set.New() + timer := map[HealthSource]*time.Timer{} + timeoutC := make(chan HealthSource) + + for { + select { + case indicator, ok := <-c: + if !ok { + log.Warningf("Health channel closed") + mutex.Lock() + *ready = false + *live = false + mutex.Unlock() + return + } + log.WithField("source", indicator.Source).Debug("Health indicator current") + if timer[indicator.Source] != nil { + timer[indicator.Source].Stop() + } + if indicator.Timeout > 0 { + currentHealth.Add(indicator.Source) + timer[indicator.Source] = time.AfterFunc(indicator.Timeout, func() { + timeoutC <- indicator.Source + }) + } else { + // Shortcut immediate timeout. A health source can use an + // indication with zero timeout to cancel a previous indication that + // might otherwise take a long time to expire. + log.WithField("source", indicator.Source).Debug("Health indicator cancelled") + currentHealth.Discard(indicator.Source) + } + case source := <-timeoutC: + log.WithField("source", source).Debug("Health indicator expired") + currentHealth.Discard(source) + } + mutex.Lock() + *ready = currentHealth.ContainsAll(neededForReady) + *live = currentHealth.ContainsAll(neededForLive) + log.WithFields(log.Fields{ + "ready": *ready, + "live": *live, + }).Debug("Health now") + mutex.Unlock() + } +} + +func ServeHealth(port int, neededForReady set.Set, neededForLive set.Set, c <-chan HealthIndicator) { + ready := false + live := false + mutex := &sync.Mutex{} + + go MonitorHealth(&ready, &live, mutex, neededForReady, neededForLive, c) + + log.WithField("port", port).Info("Starting health endpoints") + http.HandleFunc("/readiness", func(rsp http.ResponseWriter, req *http.Request) { + log.Debug("GET /readiness") + status := 500 + mutex.Lock() + if ready { + log.Debug("Typha is ready") + status = 200 + } + mutex.Unlock() + rsp.WriteHeader(status) + }) + http.HandleFunc("/liveness", func(rsp http.ResponseWriter, req *http.Request) { + log.Debug("GET /liveness") + status := 500 + mutex.Lock() + if live { + log.Debug("Typha is live") + status = 200 + } + mutex.Unlock() + rsp.WriteHeader(status) + }) + for { + err := http.ListenAndServe(fmt.Sprintf(":%v", port), nil) + log.WithError(err).Error( + "Readiness endpoint failed, trying to restart it...") + time.Sleep(1 * time.Second) + } +} diff --git a/pkg/health/health_suite_test.go b/pkg/health/health_suite_test.go new file mode 100644 index 00000000..f7c065d0 --- /dev/null +++ b/pkg/health/health_suite_test.go @@ -0,0 +1,33 @@ +// Copyright (c) 2017 Tigera, Inc. All rights reserved. + +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package health_test + +import ( + . "github.com/onsi/ginkgo" + . "github.com/onsi/gomega" + + "testing" + + "github.com/projectcalico/libcalico-go/lib/testutils" +) + +func init() { + testutils.HookLogrusForGinkgo() +} + +func TestSet(t *testing.T) { + RegisterFailHandler(Fail) + RunSpecs(t, "Health Suite") +} diff --git a/pkg/health/health_test.go b/pkg/health/health_test.go new file mode 100644 index 00000000..06a01876 --- /dev/null +++ b/pkg/health/health_test.go @@ -0,0 +1,171 @@ +// Copyright (c) 2017 Tigera, Inc. All rights reserved. + +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package health_test + +import ( + "sync" + "time" + + . "github.com/onsi/ginkgo" + . "github.com/onsi/gomega" + "github.com/projectcalico/typha/pkg/health" + "github.com/projectcalico/typha/pkg/set" +) + +type healthSource string + +var ( + SOURCE1 = healthSource("source1") + SOURCE2 = healthSource("source2") + SOURCE3 = healthSource("source3") +) + +var _ = Describe("Health", func() { + + var ( + healthChannel chan health.HealthIndicator + outputs []bool + mutex *sync.Mutex + ) + + getReady := func() bool { + mutex.Lock() + defer mutex.Unlock() + return outputs[0] + } + + getLive := func() bool { + mutex.Lock() + defer mutex.Unlock() + return outputs[1] + } + + notifySource := func(source healthSource) func() { + return func() { + healthChannel <- health.HealthIndicator{source, 1 * time.Second} + } + } + + cancelSource := func(source healthSource) func() { + return func() { + healthChannel <- health.HealthIndicator{source, 0} + } + } + + BeforeEach(func() { + healthChannel = make(chan health.HealthIndicator) + // Note: use a different pair of locations, in each test, for the calculated "ready" + // and "live" values. Otherwise what can happen is that the closing goroutine from + // the previous test sets them to false and confuses the test that is running now... + outputs = make([]bool, 2, 2) + mutex = &sync.Mutex{} + + go health.MonitorHealth( + &outputs[0], &outputs[1], mutex, + set.From(SOURCE1, SOURCE2), + set.From(SOURCE2, SOURCE3), + healthChannel, + ) + }) + + AfterEach(func() { + close(healthChannel) + Eventually(getReady).Should(BeFalse()) + Eventually(getLive).Should(BeFalse()) + }) + + It("initially reports false", func() { + Expect(getReady()).To(BeFalse()) + Expect(getLive()).To(BeFalse()) + }) + + Context("with indicators for readiness sources", func() { + + BeforeEach(func() { + notifySource(SOURCE1)() + notifySource(SOURCE2)() + }) + + It("is ready but not live", func() { + Eventually(getReady).Should(BeTrue()) + Expect(getLive()).To(BeFalse()) + }) + + Context("with liveness source also", func() { + + BeforeEach(notifySource(SOURCE3)) + + It("is ready and live", func() { + Eventually(getReady).Should(BeTrue()) + Eventually(getLive).Should(BeTrue()) + }) + }) + + Context("with a source cancelled", func() { + + BeforeEach(cancelSource(SOURCE1)) + + It("is not ready and not live", func() { + Eventually(getReady).Should(BeFalse()) + Eventually(getLive).Should(BeFalse()) + }) + }) + }) + + Context("with indicators for liveness sources", func() { + + BeforeEach(func() { + notifySource(SOURCE3)() + notifySource(SOURCE2)() + }) + + It("is live but not ready", func() { + Eventually(getLive).Should(BeTrue()) + Expect(getReady()).To(BeFalse()) + }) + + Context("with readiness source also", func() { + + BeforeEach(notifySource(SOURCE1)) + + It("is ready and live", func() { + Eventually(getReady).Should(BeTrue()) + Eventually(getLive).Should(BeTrue()) + }) + + Context("with time passing so that indicators expire", func() { + + BeforeEach(func() { + time.Sleep(2 * time.Second) + }) + + It("is not ready and not live", func() { + Eventually(getReady).Should(BeFalse()) + Eventually(getLive).Should(BeFalse()) + }) + }) + }) + + Context("with a source cancelled", func() { + + BeforeEach(cancelSource(SOURCE3)) + + It("is not ready and not live", func() { + Eventually(getReady).Should(BeFalse()) + Eventually(getLive).Should(BeFalse()) + }) + }) + }) +}) diff --git a/pkg/set/set.go b/pkg/set/set.go index 0cdc70a3..a3fc94f9 100644 --- a/pkg/set/set.go +++ b/pkg/set/set.go @@ -31,6 +31,7 @@ type Set interface { Iter(func(item interface{}) error) Copy() Set Equals(Set) bool + ContainsAll(Set) bool } type empty struct{} @@ -131,3 +132,15 @@ func (set mapSet) Equals(other Set) bool { } return true } + +func (set mapSet) ContainsAll(other Set) bool { + result := true + other.Iter(func(item interface{}) error { + if !set.Contains(item) { + result = false + return StopIteration + } + return nil + }) + return result +} From 859150aaef20d979bb77cebd9523b22220f9c8a6 Mon Sep 17 00:00:00 2001 From: Neil Jerram Date: Tue, 27 Jun 2017 14:07:54 +0100 Subject: [PATCH 2/7] Use set and health packages from libcalico-go --- check-licenses/check_licenses.go | 2 +- glide.lock | 12 +- glide.yaml | 2 +- pkg/daemon/daemon.go | 5 +- pkg/health/health.go | 137 -------------------- pkg/health/health_suite_test.go | 33 ----- pkg/health/health_test.go | 171 ------------------------ pkg/k8s/lookup.go | 2 +- pkg/set/set.go | 146 --------------------- pkg/set/set_suite_test.go | 33 ----- pkg/set/set_test.go | 216 ------------------------------- 11 files changed, 12 insertions(+), 747 deletions(-) delete mode 100644 pkg/health/health.go delete mode 100644 pkg/health/health_suite_test.go delete mode 100644 pkg/health/health_test.go delete mode 100644 pkg/set/set.go delete mode 100644 pkg/set/set_suite_test.go delete mode 100644 pkg/set/set_test.go diff --git a/check-licenses/check_licenses.go b/check-licenses/check_licenses.go index f71b9764..8bfb11f9 100644 --- a/check-licenses/check_licenses.go +++ b/check-licenses/check_licenses.go @@ -23,8 +23,8 @@ import ( "regexp" "strings" + "github.com/projectcalico/libcalico-go/lib/set" "github.com/projectcalico/typha/pkg/logutils" - "github.com/projectcalico/typha/pkg/set" ) var ( diff --git a/glide.lock b/glide.lock index 269fe680..5b9df276 100644 --- a/glide.lock +++ b/glide.lock @@ -1,12 +1,12 @@ -hash: 1847cdf106df6e8085d241e5910354da8346074adbb0dda7033ad4708701a92e -updated: 2017-07-19T10:15:13.551306815Z +hash: bf35a63bb5c663298d6bc89a2682099a7d0abbc895f71d84a4d76d55e009e3de +updated: 2017-07-20T08:42:22.874117758Z imports: - name: github.com/beorn7/perks version: 4c0e84591b9aa9e6dcfdf3e020114cd81f89d5f9 subpackages: - quantile - name: github.com/coreos/etcd - version: ae23b0ef2f1f6708f9a34d287a28ab02767c2a16 + version: c31bec0f29facff13f7c3e3d948e55dd6689ed42 subpackages: - client - pkg/pathutil @@ -114,7 +114,7 @@ imports: - name: github.com/projectcalico/go-yaml-wrapper version: 598e54215bee41a19677faa4f0c32acd2a87eb56 - name: github.com/projectcalico/libcalico-go - version: 639aa7b2d33b94419143a2e6987d2d77aa395f2f + version: e1ab5e5bf4a57ea6fa71bf4a7712af27b824a005 subpackages: - lib - lib/api @@ -131,6 +131,7 @@ imports: - lib/converter - lib/errors - lib/hash + - lib/health - lib/hwm - lib/ipip - lib/net @@ -139,6 +140,7 @@ imports: - lib/selector - lib/selector/parser - lib/selector/tokenizer + - lib/set - lib/testutils - lib/validator - name: github.com/prometheus/client_golang @@ -194,7 +196,7 @@ imports: - idna/ - lex/httplex - name: golang.org/x/sys - version: cd2c276457edda6df7fb04895d3fd6a6add42926 + version: 7a4fde3fda8ef580a89dbae8138c26041be14299 subpackages: - unix - name: golang.org/x/text diff --git a/glide.yaml b/glide.yaml index ace7d439..34f5dc1c 100644 --- a/glide.yaml +++ b/glide.yaml @@ -27,7 +27,7 @@ import: - package: github.com/go-ini/ini version: ^1.21.0 - package: github.com/projectcalico/libcalico-go - version: 639aa7b2d33b94419143a2e6987d2d77aa395f2f + version: e1ab5e5bf4a57ea6fa71bf4a7712af27b824a005 subpackages: - lib - package: github.com/Sirupsen/logrus diff --git a/pkg/daemon/daemon.go b/pkg/daemon/daemon.go index e316923d..679dc704 100644 --- a/pkg/daemon/daemon.go +++ b/pkg/daemon/daemon.go @@ -29,21 +29,20 @@ import ( "time" log "github.com/Sirupsen/logrus" - "github.com/docopt/docopt-go" "github.com/prometheus/client_golang/prometheus" "github.com/prometheus/client_golang/prometheus/promhttp" "github.com/projectcalico/libcalico-go/lib/api" "github.com/projectcalico/libcalico-go/lib/backend" bapi "github.com/projectcalico/libcalico-go/lib/backend/api" + "github.com/projectcalico/libcalico-go/lib/health" + "github.com/projectcalico/libcalico-go/lib/set" "github.com/projectcalico/typha/pkg/buildinfo" "github.com/projectcalico/typha/pkg/calc" "github.com/projectcalico/typha/pkg/config" - "github.com/projectcalico/typha/pkg/health" "github.com/projectcalico/typha/pkg/jitter" "github.com/projectcalico/typha/pkg/k8s" "github.com/projectcalico/typha/pkg/logutils" - "github.com/projectcalico/typha/pkg/set" "github.com/projectcalico/typha/pkg/snapcache" "github.com/projectcalico/typha/pkg/syncserver" ) diff --git a/pkg/health/health.go b/pkg/health/health.go deleted file mode 100644 index d5367cbf..00000000 --- a/pkg/health/health.go +++ /dev/null @@ -1,137 +0,0 @@ -// Copyright (c) 2017 Tigera, Inc. All rights reserved. -// -// Licensed under the Apache License, Version 2.0 (the "License"); -// you may not use this file except in compliance with the License. -// You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, software -// distributed under the License is distributed on an "AS IS" BASIS, -// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -// See the License for the specific language governing permissions and -// limitations under the License. - -package health - -import ( - "fmt" - "net/http" - "sync" - "time" - - log "github.com/Sirupsen/logrus" - "github.com/projectcalico/typha/pkg/set" -) - -// Any kind of value that can be used as a map key and is unique across multiple packages. For -// example, "type myHealthSource string". -type HealthSource interface{} - -type HealthIndicator struct { - // The source of this health indicator. - Source HealthSource - - // How long the indicator is valid for. In other words, if it continues operating normally, - // the source expects to refresh this indicator before this timeout. - Timeout time.Duration -} - -// For a component that provides health indications, return the sources that it provides to indicate -// readiness, and those that it provides to indicate liveness. -type HealthProvider interface { - ReadySources() []HealthSource - LiveSources() []HealthSource -} - -func MonitorHealth( - ready *bool, - live *bool, - mutex *sync.Mutex, - neededForReady set.Set, - neededForLive set.Set, - c <-chan HealthIndicator, -) { - currentHealth := set.New() - timer := map[HealthSource]*time.Timer{} - timeoutC := make(chan HealthSource) - - for { - select { - case indicator, ok := <-c: - if !ok { - log.Warningf("Health channel closed") - mutex.Lock() - *ready = false - *live = false - mutex.Unlock() - return - } - log.WithField("source", indicator.Source).Debug("Health indicator current") - if timer[indicator.Source] != nil { - timer[indicator.Source].Stop() - } - if indicator.Timeout > 0 { - currentHealth.Add(indicator.Source) - timer[indicator.Source] = time.AfterFunc(indicator.Timeout, func() { - timeoutC <- indicator.Source - }) - } else { - // Shortcut immediate timeout. A health source can use an - // indication with zero timeout to cancel a previous indication that - // might otherwise take a long time to expire. - log.WithField("source", indicator.Source).Debug("Health indicator cancelled") - currentHealth.Discard(indicator.Source) - } - case source := <-timeoutC: - log.WithField("source", source).Debug("Health indicator expired") - currentHealth.Discard(source) - } - mutex.Lock() - *ready = currentHealth.ContainsAll(neededForReady) - *live = currentHealth.ContainsAll(neededForLive) - log.WithFields(log.Fields{ - "ready": *ready, - "live": *live, - }).Debug("Health now") - mutex.Unlock() - } -} - -func ServeHealth(port int, neededForReady set.Set, neededForLive set.Set, c <-chan HealthIndicator) { - ready := false - live := false - mutex := &sync.Mutex{} - - go MonitorHealth(&ready, &live, mutex, neededForReady, neededForLive, c) - - log.WithField("port", port).Info("Starting health endpoints") - http.HandleFunc("/readiness", func(rsp http.ResponseWriter, req *http.Request) { - log.Debug("GET /readiness") - status := 500 - mutex.Lock() - if ready { - log.Debug("Typha is ready") - status = 200 - } - mutex.Unlock() - rsp.WriteHeader(status) - }) - http.HandleFunc("/liveness", func(rsp http.ResponseWriter, req *http.Request) { - log.Debug("GET /liveness") - status := 500 - mutex.Lock() - if live { - log.Debug("Typha is live") - status = 200 - } - mutex.Unlock() - rsp.WriteHeader(status) - }) - for { - err := http.ListenAndServe(fmt.Sprintf(":%v", port), nil) - log.WithError(err).Error( - "Readiness endpoint failed, trying to restart it...") - time.Sleep(1 * time.Second) - } -} diff --git a/pkg/health/health_suite_test.go b/pkg/health/health_suite_test.go deleted file mode 100644 index f7c065d0..00000000 --- a/pkg/health/health_suite_test.go +++ /dev/null @@ -1,33 +0,0 @@ -// Copyright (c) 2017 Tigera, Inc. All rights reserved. - -// Licensed under the Apache License, Version 2.0 (the "License"); -// you may not use this file except in compliance with the License. -// You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, software -// distributed under the License is distributed on an "AS IS" BASIS, -// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -// See the License for the specific language governing permissions and -// limitations under the License. - -package health_test - -import ( - . "github.com/onsi/ginkgo" - . "github.com/onsi/gomega" - - "testing" - - "github.com/projectcalico/libcalico-go/lib/testutils" -) - -func init() { - testutils.HookLogrusForGinkgo() -} - -func TestSet(t *testing.T) { - RegisterFailHandler(Fail) - RunSpecs(t, "Health Suite") -} diff --git a/pkg/health/health_test.go b/pkg/health/health_test.go deleted file mode 100644 index 06a01876..00000000 --- a/pkg/health/health_test.go +++ /dev/null @@ -1,171 +0,0 @@ -// Copyright (c) 2017 Tigera, Inc. All rights reserved. - -// Licensed under the Apache License, Version 2.0 (the "License"); -// you may not use this file except in compliance with the License. -// You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, software -// distributed under the License is distributed on an "AS IS" BASIS, -// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -// See the License for the specific language governing permissions and -// limitations under the License. - -package health_test - -import ( - "sync" - "time" - - . "github.com/onsi/ginkgo" - . "github.com/onsi/gomega" - "github.com/projectcalico/typha/pkg/health" - "github.com/projectcalico/typha/pkg/set" -) - -type healthSource string - -var ( - SOURCE1 = healthSource("source1") - SOURCE2 = healthSource("source2") - SOURCE3 = healthSource("source3") -) - -var _ = Describe("Health", func() { - - var ( - healthChannel chan health.HealthIndicator - outputs []bool - mutex *sync.Mutex - ) - - getReady := func() bool { - mutex.Lock() - defer mutex.Unlock() - return outputs[0] - } - - getLive := func() bool { - mutex.Lock() - defer mutex.Unlock() - return outputs[1] - } - - notifySource := func(source healthSource) func() { - return func() { - healthChannel <- health.HealthIndicator{source, 1 * time.Second} - } - } - - cancelSource := func(source healthSource) func() { - return func() { - healthChannel <- health.HealthIndicator{source, 0} - } - } - - BeforeEach(func() { - healthChannel = make(chan health.HealthIndicator) - // Note: use a different pair of locations, in each test, for the calculated "ready" - // and "live" values. Otherwise what can happen is that the closing goroutine from - // the previous test sets them to false and confuses the test that is running now... - outputs = make([]bool, 2, 2) - mutex = &sync.Mutex{} - - go health.MonitorHealth( - &outputs[0], &outputs[1], mutex, - set.From(SOURCE1, SOURCE2), - set.From(SOURCE2, SOURCE3), - healthChannel, - ) - }) - - AfterEach(func() { - close(healthChannel) - Eventually(getReady).Should(BeFalse()) - Eventually(getLive).Should(BeFalse()) - }) - - It("initially reports false", func() { - Expect(getReady()).To(BeFalse()) - Expect(getLive()).To(BeFalse()) - }) - - Context("with indicators for readiness sources", func() { - - BeforeEach(func() { - notifySource(SOURCE1)() - notifySource(SOURCE2)() - }) - - It("is ready but not live", func() { - Eventually(getReady).Should(BeTrue()) - Expect(getLive()).To(BeFalse()) - }) - - Context("with liveness source also", func() { - - BeforeEach(notifySource(SOURCE3)) - - It("is ready and live", func() { - Eventually(getReady).Should(BeTrue()) - Eventually(getLive).Should(BeTrue()) - }) - }) - - Context("with a source cancelled", func() { - - BeforeEach(cancelSource(SOURCE1)) - - It("is not ready and not live", func() { - Eventually(getReady).Should(BeFalse()) - Eventually(getLive).Should(BeFalse()) - }) - }) - }) - - Context("with indicators for liveness sources", func() { - - BeforeEach(func() { - notifySource(SOURCE3)() - notifySource(SOURCE2)() - }) - - It("is live but not ready", func() { - Eventually(getLive).Should(BeTrue()) - Expect(getReady()).To(BeFalse()) - }) - - Context("with readiness source also", func() { - - BeforeEach(notifySource(SOURCE1)) - - It("is ready and live", func() { - Eventually(getReady).Should(BeTrue()) - Eventually(getLive).Should(BeTrue()) - }) - - Context("with time passing so that indicators expire", func() { - - BeforeEach(func() { - time.Sleep(2 * time.Second) - }) - - It("is not ready and not live", func() { - Eventually(getReady).Should(BeFalse()) - Eventually(getLive).Should(BeFalse()) - }) - }) - }) - - Context("with a source cancelled", func() { - - BeforeEach(cancelSource(SOURCE3)) - - It("is not ready and not live", func() { - Eventually(getReady).Should(BeFalse()) - Eventually(getLive).Should(BeFalse()) - }) - }) - }) -}) diff --git a/pkg/k8s/lookup.go b/pkg/k8s/lookup.go index 0c867e8a..cd17d348 100644 --- a/pkg/k8s/lookup.go +++ b/pkg/k8s/lookup.go @@ -20,7 +20,7 @@ import ( "k8s.io/client-go/kubernetes" "k8s.io/client-go/rest" - "github.com/projectcalico/typha/pkg/set" + "github.com/projectcalico/libcalico-go/lib/set" ) func NewK8sAPI() *RealK8sAPI { diff --git a/pkg/set/set.go b/pkg/set/set.go deleted file mode 100644 index a3fc94f9..00000000 --- a/pkg/set/set.go +++ /dev/null @@ -1,146 +0,0 @@ -// Copyright (c) 2016-2017 Tigera, Inc. All rights reserved. -// -// Licensed under the Apache License, Version 2.0 (the "License"); -// you may not use this file except in compliance with the License. -// You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, software -// distributed under the License is distributed on an "AS IS" BASIS, -// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -// See the License for the specific language governing permissions and -// limitations under the License. - -package set - -import ( - "errors" - "reflect" - - log "github.com/Sirupsen/logrus" -) - -type Set interface { - Len() int - Add(interface{}) - AddAll(itemArray interface{}) - Discard(interface{}) - Clear() - Contains(interface{}) bool - Iter(func(item interface{}) error) - Copy() Set - Equals(Set) bool - ContainsAll(Set) bool -} - -type empty struct{} - -var emptyValue = empty{} - -var ( - StopIteration = errors.New("Stop iteration") - RemoveItem = errors.New("Remove item") -) - -func New() Set { - return make(mapSet) -} - -func From(members ...interface{}) Set { - s := New() - s.AddAll(members) - return s -} - -func FromArray(membersArray interface{}) Set { - s := New() - s.AddAll(membersArray) - return s -} - -func Empty() Set { - return mapSet(nil) -} - -type mapSet map[interface{}]empty - -func (set mapSet) Len() int { - return len(set) -} - -func (set mapSet) Add(item interface{}) { - set[item] = emptyValue -} - -func (set mapSet) AddAll(itemArray interface{}) { - - arrVal := reflect.ValueOf(itemArray) - for i := 0; i < arrVal.Len(); i++ { - set.Add(arrVal.Index(i).Interface()) - } -} - -func (set mapSet) Discard(item interface{}) { - delete(set, item) -} - -func (set mapSet) Clear() { - for item := range set { - delete(set, item) - } -} - -func (set mapSet) Contains(item interface{}) bool { - _, present := set[item] - return present -} - -func (set mapSet) Iter(visitor func(item interface{}) error) { -loop: - for item := range set { - err := visitor(item) - switch err { - case StopIteration: - break loop - case RemoveItem: - delete(set, item) - case nil: - break - default: - log.WithError(err).Panic("Unexpected iteration error") - } - } -} - -func (set mapSet) Copy() Set { - cpy := New() - for item := range set { - cpy.Add(item) - } - return cpy -} - -func (set mapSet) Equals(other Set) bool { - if set.Len() != other.Len() { - return false - } - for item := range set { - if !other.Contains(item) { - return false - } - } - return true -} - -func (set mapSet) ContainsAll(other Set) bool { - result := true - other.Iter(func(item interface{}) error { - if !set.Contains(item) { - result = false - return StopIteration - } - return nil - }) - return result -} diff --git a/pkg/set/set_suite_test.go b/pkg/set/set_suite_test.go deleted file mode 100644 index 072c66c8..00000000 --- a/pkg/set/set_suite_test.go +++ /dev/null @@ -1,33 +0,0 @@ -// Copyright (c) 2016-2017 Tigera, Inc. All rights reserved. - -// Licensed under the Apache License, Version 2.0 (the "License"); -// you may not use this file except in compliance with the License. -// You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, software -// distributed under the License is distributed on an "AS IS" BASIS, -// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -// See the License for the specific language governing permissions and -// limitations under the License. - -package set_test - -import ( - . "github.com/onsi/ginkgo" - . "github.com/onsi/gomega" - - "testing" - - "github.com/projectcalico/libcalico-go/lib/testutils" -) - -func init() { - testutils.HookLogrusForGinkgo() -} - -func TestSet(t *testing.T) { - RegisterFailHandler(Fail) - RunSpecs(t, "Set Suite") -} diff --git a/pkg/set/set_test.go b/pkg/set/set_test.go deleted file mode 100644 index 0994702f..00000000 --- a/pkg/set/set_test.go +++ /dev/null @@ -1,216 +0,0 @@ -// Copyright (c) 2016-2017 Tigera, Inc. All rights reserved. - -// Licensed under the Apache License, Version 2.0 (the "License"); -// you may not use this file except in compliance with the License. -// You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, software -// distributed under the License is distributed on an "AS IS" BASIS, -// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -// See the License for the specific language governing permissions and -// limitations under the License. - -package set_test - -import ( - "github.com/projectcalico/typha/pkg/set" - - . "github.com/onsi/ginkgo" - . "github.com/onsi/gomega" -) - -var _ = Describe("Set", func() { - var s set.Set - BeforeEach(func() { - s = set.New() - }) - - It("should be empty", func() { - Expect(s.Len()).To(BeZero()) - }) - It("should iterate over no items", func() { - called := false - s.Iter(func(item interface{}) error { - called = true - return nil - }) - Expect(called).To(BeFalse()) - }) - It("should do nothing on clear", func() { - s.Clear() - Expect(s.Len()).To(BeZero()) - }) - - Describe("Set created by FromArray", func() { - BeforeEach(func() { - s = set.FromArray([]int{1, 2}) - }) - It("should contain 1", func() { - Expect(s.Contains(1)).To(BeTrue()) - }) - It("should contain 2", func() { - Expect(s.Contains(2)).To(BeTrue()) - }) - It("should not contain 3", func() { - Expect(s.Contains(3)).To(BeFalse()) - }) - }) - - Describe("Set created by From", func() { - BeforeEach(func() { - s = set.From(1, 2) - }) - It("should contain 1", func() { - Expect(s.Contains(1)).To(BeTrue()) - }) - It("should contain 2", func() { - Expect(s.Contains(2)).To(BeTrue()) - }) - It("should not contain 3", func() { - Expect(s.Contains(3)).To(BeFalse()) - }) - }) - - Describe("after adding 1 and 2", func() { - BeforeEach(func() { - s.Add(1) - s.Add(2) - s.Add(2) // Duplicate should have no effect - }) - It("should contain 1", func() { - Expect(s.Contains(1)).To(BeTrue()) - }) - It("should contain 2", func() { - Expect(s.Contains(2)).To(BeTrue()) - }) - It("should not contain 3", func() { - Expect(s.Contains(3)).To(BeFalse()) - }) - It("should iterate over 1 and 2 in some order", func() { - seen1 := false - seen2 := false - s.Iter(func(item interface{}) error { - if item.(int) == 1 { - Expect(seen1).To(BeFalse()) - seen1 = true - } else if item.(int) == 2 { - Expect(seen2).To(BeFalse()) - seen2 = true - } else { - Fail("Unexpected item") - } - return nil - }) - Expect(seen1).To(BeTrue()) - Expect(seen2).To(BeTrue()) - }) - It("should allow remove during iteration", func() { - s.Iter(func(item interface{}) error { - if item.(int) == 1 { - return set.RemoveItem - } - return nil - }) - Expect(s.Contains(1)).To(BeFalse()) - Expect(s.Contains(2)).To(BeTrue()) - }) - It("should support stopping iteration", func() { - iterationStarted := false - s.Iter(func(item interface{}) error { - if iterationStarted { - Fail("Iteration continued after stop") - } - iterationStarted = true - return set.StopIteration - }) - Expect(s.Contains(1)).To(BeTrue()) - Expect(s.Contains(2)).To(BeTrue()) - }) - It("can copy a Set", func() { - c := s.Copy() - Expect(c.Len()).To(Equal(s.Len())) - Expect(c).NotTo(BeIdenticalTo(s)) // Check they're not the same object. - Expect(c).To(Equal(s)) // DeepEquals, will check the contents. - }) - It("should correctly determine set equality", func() { - c := s.Copy() - Expect(c.Equals(s)).To(BeTrue()) - Expect(s.Equals(c)).To(BeTrue()) - c.Add(3) - Expect(c.Equals(s)).To(BeFalse()) - Expect(s.Equals(c)).To(BeFalse()) - c.Discard(2) - Expect(c.Equals(s)).To(BeFalse()) - Expect(s.Equals(c)).To(BeFalse()) - c.Add(2) - c.Discard(3) - Expect(c.Equals(s)).To(BeTrue()) - Expect(s.Equals(c)).To(BeTrue()) - }) - - Describe("after removing 2", func() { - BeforeEach(func() { - s.Discard(2) - }) - It("should contain 1", func() { - Expect(s.Contains(1)).To(BeTrue()) - }) - It("should not contain 2", func() { - Expect(s.Contains(2)).To(BeFalse()) - }) - It("should not contain 3", func() { - Expect(s.Contains(3)).To(BeFalse()) - }) - }) - Describe("after using AddAll to add 2, 3, 4", func() { - BeforeEach(func() { - s.AddAll([]int{2, 3, 4}) - }) - It("should contain 1", func() { - Expect(s.Contains(1)).To(BeTrue()) - }) - It("should contain 2", func() { - Expect(s.Contains(2)).To(BeTrue()) - }) - It("should contain 3", func() { - Expect(s.Contains(3)).To(BeTrue()) - }) - It("should contain 4", func() { - Expect(s.Contains(4)).To(BeTrue()) - }) - }) - - Describe("after Clear()", func() { - BeforeEach(func() { - s.Clear() - }) - It("should be empty", func() { - Expect(s.Len()).To(BeZero()) - }) - }) - }) -}) - -var _ = Describe("EmptySet", func() { - var empty set.Set - BeforeEach(func() { - empty = set.Empty() - }) - It("has length 0", func() { - Expect(empty.Len()).To(Equal(0)) - }) - It("should panic on add", func() { - Expect(func() { empty.Add("foo") }).To(Panic()) - }) - It("should ignore discard", func() { - Expect(func() { empty.Discard("foo") }).NotTo(Panic()) - }) - It("should iterate 0 times", func() { - empty.Iter(func(item interface{}) error { - Fail("Iterated > 0 times") - return nil - }) - }) -}) From e87dd940a45c5dad911d3ed1959fda5502928f1e Mon Sep 17 00:00:00 2001 From: Neil Jerram Date: Tue, 27 Jun 2017 14:51:14 +0100 Subject: [PATCH 3/7] Typha's default health port should be different from Felix's --- pkg/config/config_params.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/pkg/config/config_params.go b/pkg/config/config_params.go index 96a48bcc..d0652b78 100644 --- a/pkg/config/config_params.go +++ b/pkg/config/config_params.go @@ -104,7 +104,7 @@ type Config struct { LogSeveritySys string `config:"oneof(DEBUG,INFO,WARNING,ERROR,CRITICAL);INFO"` HealthEnabled bool `config:"bool;false"` - HealthPort int `config:"int(0,65535);9099"` + HealthPort int `config:"int(0,65535);9098"` PrometheusMetricsEnabled bool `config:"bool;false"` PrometheusMetricsPort int `config:"int(0,65535);9093"` PrometheusGoMetricsEnabled bool `config:"bool;true"` From e025af382c3ae53deded5b2d5b3646b5ca8c7ec9 Mon Sep 17 00:00:00 2001 From: Neil Jerram Date: Tue, 27 Jun 2017 15:18:47 +0100 Subject: [PATCH 4/7] Monitor health from the Typha sync server --- pkg/daemon/daemon.go | 4 ++++ pkg/syncserver/sync_server.go | 40 +++++++++++++++++++++++++++++++++++ 2 files changed, 44 insertions(+) diff --git a/pkg/daemon/daemon.go b/pkg/daemon/daemon.go index 679dc704..d6a638b9 100644 --- a/pkg/daemon/daemon.go +++ b/pkg/daemon/daemon.go @@ -29,6 +29,7 @@ import ( "time" log "github.com/Sirupsen/logrus" + docopt "github.com/docopt/docopt-go" "github.com/prometheus/client_golang/prometheus" "github.com/prometheus/client_golang/prometheus/promhttp" @@ -256,8 +257,11 @@ func (t *TyphaDaemon) CreateServer() { DropInterval: t.ConfigParams.ConnectionDropIntervalSecs, MaxConns: t.ConfigParams.MaxConnectionsUpperLimit, Port: t.ConfigParams.ServerPort, + HealthChannel: t.healthChannel, }, ) + t.neededForReady.AddAll(t.Server.ReadySources()) + t.neededForLive.AddAll(t.Server.LiveSources()) } // Start starts all the server components in background goroutines. diff --git a/pkg/syncserver/sync_server.go b/pkg/syncserver/sync_server.go index bbb53df2..806037f9 100644 --- a/pkg/syncserver/sync_server.go +++ b/pkg/syncserver/sync_server.go @@ -29,6 +29,7 @@ import ( "github.com/projectcalico/libcalico-go/lib/backend/api" "github.com/projectcalico/typha/pkg/buildinfo" + "github.com/projectcalico/typha/pkg/health" "github.com/projectcalico/typha/pkg/jitter" "github.com/projectcalico/typha/pkg/snapcache" "github.com/projectcalico/typha/pkg/syncproto" @@ -132,6 +133,7 @@ type Config struct { PongTimeout time.Duration DropInterval time.Duration MaxConns int + HealthChannel chan<- health.HealthIndicator } func (c *Config) ApplyDefaults() { @@ -320,12 +322,14 @@ func (s *Server) recordConnection(conn *connection) { s.connTrackingLock.Lock() s.connIDToConn[conn.ID] = conn s.connTrackingLock.Unlock() + s.reportHealth() } func (s *Server) discardConnection(conn *connection) { s.connTrackingLock.Lock() delete(s.connIDToConn, conn.ID) s.connTrackingLock.Unlock() + s.reportHealth() } func (s *Server) governNumberOfConnections(cxt context.Context) { @@ -333,6 +337,11 @@ func (s *Server) governNumberOfConnections(cxt context.Context) { logCxt := log.WithField("thread", "numConnsGov") maxConns := s.maxConns ticker := jitter.NewTicker(s.dropInterval, s.dropInterval/10) + var healthTicks <-chan time.Time + if s.config.HealthChannel != nil { + healthTicks = time.NewTicker(10 * time.Second).C + } + s.reportHealth() for { select { case newMax := <-s.maxConnsC: @@ -370,6 +379,37 @@ func (s *Server) governNumberOfConnections(cxt context.Context) { case <-cxt.Done(): logCxt.Info("Context asked us to stop") return + case <-healthTicks: + s.reportHealth() + } + } +} + +type healthSource string + +var ( + SERVER_RUNNING = healthSource("running") + SERVER_READY = healthSource("ready") +) + +func (s *Server) ReadySources() []health.HealthSource { + return []health.HealthSource{SERVER_READY} +} + +func (s *Server) LiveSources() []health.HealthSource { + return []health.HealthSource{SERVER_RUNNING} +} + +func (s *Server) reportHealth() { + if s.config.HealthChannel != nil { + s.config.HealthChannel <- health.HealthIndicator{SERVER_RUNNING, 20 * time.Second} + if !s.atConnLimit() { + // Ready to accept more connections. + s.config.HealthChannel <- health.HealthIndicator{SERVER_READY, 20 * time.Second} + } else { + // Immediately report that we're now out of capacity - rather than waiting + // for a previous 'ready' indication to expire. + s.config.HealthChannel <- health.HealthIndicator{SERVER_READY, 0} } } } From cd9662bd520dba908d51157c6dbeb95788680b40 Mon Sep 17 00:00:00 2001 From: Neil Jerram Date: Tue, 4 Jul 2017 18:06:46 +0100 Subject: [PATCH 5/7] Updates for reviewed health code --- pkg/daemon/daemon.go | 28 ++++--------------- pkg/syncserver/sync_server.go | 52 +++++++++++++---------------------- 2 files changed, 25 insertions(+), 55 deletions(-) diff --git a/pkg/daemon/daemon.go b/pkg/daemon/daemon.go index d6a638b9..e1db5bfe 100644 --- a/pkg/daemon/daemon.go +++ b/pkg/daemon/daemon.go @@ -37,7 +37,6 @@ import ( "github.com/projectcalico/libcalico-go/lib/backend" bapi "github.com/projectcalico/libcalico-go/lib/backend/api" "github.com/projectcalico/libcalico-go/lib/health" - "github.com/projectcalico/libcalico-go/lib/set" "github.com/projectcalico/typha/pkg/buildinfo" "github.com/projectcalico/typha/pkg/calc" "github.com/projectcalico/typha/pkg/config" @@ -81,9 +80,7 @@ type TyphaDaemon struct { ConfigureLogging func(configParams *config.Config) // Health monitoring. - healthChannel chan health.HealthIndicator - neededForReady set.Set - neededForLive set.Set + healthAggregator *health.HealthAggregator } func New() *TyphaDaemon { @@ -222,14 +219,8 @@ configRetry: func (t *TyphaDaemon) CreateServer() { // Now create the Syncer; our caching layer and the TCP server. - // Health monitoring: as we create Typha components after this point, we'll add health - // sources that we expect those components to report, to feed into the determination of - // Typha's liveness and readiness. - t.neededForReady = set.New() - t.neededForLive = set.New() - if t.ConfigParams.HealthEnabled { - t.healthChannel = make(chan health.HealthIndicator) - } + // Health monitoring, for liveness and readiness endpoints. + t.healthAggregator = health.NewHealthAggregator() // Get a Syncer from the datastore, which will feed the validator layer with updates. t.SyncerToValidator = calc.NewSyncerCallbacksDecoupler() @@ -257,11 +248,9 @@ func (t *TyphaDaemon) CreateServer() { DropInterval: t.ConfigParams.ConnectionDropIntervalSecs, MaxConns: t.ConfigParams.MaxConnectionsUpperLimit, Port: t.ConfigParams.ServerPort, - HealthChannel: t.healthChannel, + HealthAggregator: t.healthAggregator, }, ) - t.neededForReady.AddAll(t.Server.ReadySources()) - t.neededForLive.AddAll(t.Server.LiveSources()) } // Start starts all the server components in background goroutines. @@ -289,13 +278,8 @@ func (t *TyphaDaemon) Start(cxt context.Context) { } if t.ConfigParams.HealthEnabled { - log.Info("Health enabled. Starting server.") - go health.ServeHealth( - t.ConfigParams.HealthPort, - t.neededForReady, - t.neededForLive, - t.healthChannel, - ) + log.WithField("port", t.ConfigParams.HealthPort).Info("Health enabled. Starting server.") + go t.healthAggregator.ServeHTTP(t.ConfigParams.HealthPort) } } diff --git a/pkg/syncserver/sync_server.go b/pkg/syncserver/sync_server.go index 806037f9..daf807b8 100644 --- a/pkg/syncserver/sync_server.go +++ b/pkg/syncserver/sync_server.go @@ -28,8 +28,8 @@ import ( "math" "github.com/projectcalico/libcalico-go/lib/backend/api" + "github.com/projectcalico/libcalico-go/lib/health" "github.com/projectcalico/typha/pkg/buildinfo" - "github.com/projectcalico/typha/pkg/health" "github.com/projectcalico/typha/pkg/jitter" "github.com/projectcalico/typha/pkg/snapcache" "github.com/projectcalico/typha/pkg/syncproto" @@ -133,9 +133,14 @@ type Config struct { PongTimeout time.Duration DropInterval time.Duration MaxConns int - HealthChannel chan<- health.HealthIndicator + HealthAggregator *health.HealthAggregator } +const ( + healthName = "sync_server" + healthInterval = 10 * time.Second +) + func (c *Config) ApplyDefaults() { if c.MaxMessageSize < 1 { log.WithFields(log.Fields{ @@ -216,6 +221,15 @@ func New(cache BreadcrumbProvider, config Config) *Server { connIDToConn: map[uint64]*connection{}, listeningC: make(chan struct{}), } + + // Register that we will report liveness. + if config.HealthAggregator != nil { + config.HealthAggregator.RegisterReporter( + healthName, + &health.HealthReport{Live: true}, + healthInterval*2, + ) + } } func (s *Server) Start(cxt context.Context) { @@ -322,14 +336,12 @@ func (s *Server) recordConnection(conn *connection) { s.connTrackingLock.Lock() s.connIDToConn[conn.ID] = conn s.connTrackingLock.Unlock() - s.reportHealth() } func (s *Server) discardConnection(conn *connection) { s.connTrackingLock.Lock() delete(s.connIDToConn, conn.ID) s.connTrackingLock.Unlock() - s.reportHealth() } func (s *Server) governNumberOfConnections(cxt context.Context) { @@ -337,10 +349,7 @@ func (s *Server) governNumberOfConnections(cxt context.Context) { logCxt := log.WithField("thread", "numConnsGov") maxConns := s.maxConns ticker := jitter.NewTicker(s.dropInterval, s.dropInterval/10) - var healthTicks <-chan time.Time - if s.config.HealthChannel != nil { - healthTicks = time.NewTicker(10 * time.Second).C - } + healthTicks := time.NewTicker(healthInterval).C s.reportHealth() for { select { @@ -385,32 +394,9 @@ func (s *Server) governNumberOfConnections(cxt context.Context) { } } -type healthSource string - -var ( - SERVER_RUNNING = healthSource("running") - SERVER_READY = healthSource("ready") -) - -func (s *Server) ReadySources() []health.HealthSource { - return []health.HealthSource{SERVER_READY} -} - -func (s *Server) LiveSources() []health.HealthSource { - return []health.HealthSource{SERVER_RUNNING} -} - func (s *Server) reportHealth() { - if s.config.HealthChannel != nil { - s.config.HealthChannel <- health.HealthIndicator{SERVER_RUNNING, 20 * time.Second} - if !s.atConnLimit() { - // Ready to accept more connections. - s.config.HealthChannel <- health.HealthIndicator{SERVER_READY, 20 * time.Second} - } else { - // Immediately report that we're now out of capacity - rather than waiting - // for a previous 'ready' indication to expire. - s.config.HealthChannel <- health.HealthIndicator{SERVER_READY, 0} - } + if s.config.HealthAggregator != nil { + s.config.HealthAggregator.Report(healthName, &health.HealthReport{Live: true}) } } From 611d9d0fc45680c4b8ae0ba777a6969bb044190d Mon Sep 17 00:00:00 2001 From: Neil Jerram Date: Tue, 4 Jul 2017 18:33:05 +0100 Subject: [PATCH 6/7] Typha readiness depending on API server connection / sync --- pkg/daemon/daemon.go | 3 ++- pkg/snapcache/cache.go | 31 ++++++++++++++++++++++++++++--- pkg/syncserver/sync_server.go | 4 +++- 3 files changed, 33 insertions(+), 5 deletions(-) diff --git a/pkg/daemon/daemon.go b/pkg/daemon/daemon.go index e1db5bfe..4bd981c7 100644 --- a/pkg/daemon/daemon.go +++ b/pkg/daemon/daemon.go @@ -233,7 +233,8 @@ func (t *TyphaDaemon) CreateServer() { // Create our snapshot cache, which stores point-in-time copies of the datastore contents. t.Cache = snapcache.New(snapcache.Config{ - MaxBatchSize: t.ConfigParams.SnapshotCacheMaxBatchSize, + MaxBatchSize: t.ConfigParams.SnapshotCacheMaxBatchSize, + HealthAggregator: t.healthAggregator, }) // Create the server, which listens for connections from Felix. diff --git a/pkg/snapcache/cache.go b/pkg/snapcache/cache.go index ded77484..6d82adf2 100644 --- a/pkg/snapcache/cache.go +++ b/pkg/snapcache/cache.go @@ -26,6 +26,7 @@ import ( "github.com/prometheus/client_golang/prometheus" "github.com/projectcalico/libcalico-go/lib/backend/api" + "github.com/projectcalico/libcalico-go/lib/health" "github.com/projectcalico/typha/pkg/jitter" "github.com/projectcalico/typha/pkg/syncproto" ) @@ -125,11 +126,18 @@ type Cache struct { currentBreadcrumb unsafe.Pointer wakeUpTicker *jitter.Ticker + healthTicks <-chan time.Time } +const ( + healthName = "cache" + healthInterval = 10 * time.Second +) + type Config struct { - MaxBatchSize int - WakeUpInterval time.Duration + MaxBatchSize int + WakeUpInterval time.Duration + HealthAggregator *health.HealthAggregator } func (config *Config) ApplyDefaults() { @@ -158,14 +166,20 @@ func New(config Config) *Cache { nextCond: cond, KVs: kvs.ReadOnlySnapshot(), } - return &Cache{ + c := &Cache{ config: config, inputC: make(chan interface{}, config.MaxBatchSize*2), breadcrumbCond: cond, kvs: kvs, currentBreadcrumb: (unsafe.Pointer)(snap), wakeUpTicker: jitter.NewTicker(config.WakeUpInterval, config.WakeUpInterval/10), + healthTicks: time.NewTicker(healthInterval).C, + } + if config.HealthAggregator != nil { + config.HealthAggregator.RegisterReporter(healthName, &health.HealthReport{Live: true, Ready: true}, healthInterval*2) } + c.reportHealth() + return c } // CurrentBreadcrumb returns the current Breadcrumb, which contains a snapshot of the datastore @@ -252,10 +266,21 @@ func (c *Cache) fillBatchFromInputQueue(ctx context.Context) error { // wake all the clients so they can check if their Context is done. log.Debug("Waking all clients.") c.breadcrumbCond.Broadcast() + case <-c.healthTicks: + c.reportHealth() } return ctx.Err() } +func (c *Cache) reportHealth() { + if c.config.HealthAggregator != nil { + c.config.HealthAggregator.Report(healthName, &health.HealthReport{ + Live: true, + Ready: c.pendingStatus == api.InSync, + }) + } +} + // publishBreadcrumbs sends a series of Breadcrumbs, draining the pending updates list. func (c *Cache) publishBreadcrumbs() { for { diff --git a/pkg/syncserver/sync_server.go b/pkg/syncserver/sync_server.go index daf807b8..6e8d415f 100644 --- a/pkg/syncserver/sync_server.go +++ b/pkg/syncserver/sync_server.go @@ -212,7 +212,7 @@ func (c *Config) ListenPort() int { func New(cache BreadcrumbProvider, config Config) *Server { config.ApplyDefaults() log.WithField("config", config).Info("Creating server") - return &Server{ + s := &Server{ config: config, cache: cache, maxConnsC: make(chan int), @@ -230,6 +230,8 @@ func New(cache BreadcrumbProvider, config Config) *Server { healthInterval*2, ) } + + return s } func (s *Server) Start(cxt context.Context) { From a8300f8c2a9daa4ab9905ac21541b632359f4acb Mon Sep 17 00:00:00 2001 From: Neil Jerram Date: Tue, 4 Jul 2017 19:28:42 +0100 Subject: [PATCH 7/7] Remove gomega pin --- glide.yaml | 1 - 1 file changed, 1 deletion(-) diff --git a/glide.yaml b/glide.yaml index 34f5dc1c..3cfc0ec9 100644 --- a/glide.yaml +++ b/glide.yaml @@ -58,4 +58,3 @@ import: version: ^0.3.0 testImport: - package: github.com/onsi/gomega - version: 9b8c753e8dfb382618ba8fa19b4197b5dcb0434c