Skip to content

Commit

Permalink
WIP: Sets eventgenerator healthenpoint on server port
Browse files Browse the repository at this point in the history
  • Loading branch information
bonzofenix committed Jun 17, 2024
1 parent a25c18c commit dc4566c
Show file tree
Hide file tree
Showing 4 changed files with 88 additions and 34 deletions.
29 changes: 14 additions & 15 deletions src/autoscaler/eventgenerator/cmd/eventgenerator/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,6 @@ package main

import (
"io"
"time"

"code.cloudfoundry.org/app-autoscaler/src/autoscaler/db/sqldb"
"code.cloudfoundry.org/app-autoscaler/src/autoscaler/eventgenerator/aggregator"
Expand All @@ -21,7 +20,6 @@ import (

"code.cloudfoundry.org/clock"
"code.cloudfoundry.org/lager/v3"
"github.com/prometheus/client_golang/prometheus"
"github.com/tedsuo/ifrit"
"github.com/tedsuo/ifrit/grouper"
"github.com/tedsuo/ifrit/sigmon"
Expand Down Expand Up @@ -57,12 +55,7 @@ func main() {
defer func() { _ = policyDb.Close() }()

httpStatusCollector := healthendpoint.NewHTTPStatusCollector("autoscaler", "eventgenerator")
promRegistry := prometheus.NewRegistry()
healthendpoint.RegisterCollectors(promRegistry, []prometheus.Collector{
healthendpoint.NewDatabaseStatusCollector("autoscaler", "eventgenerator", "appMetricDB", appMetricDB),
healthendpoint.NewDatabaseStatusCollector("autoscaler", "eventgenerator", "policyDB", policyDb),
httpStatusCollector,
}, true, logger.Session("eventgenerator-prometheus"))
promRegistry := server.CreatePrometheusRegistry(appMetricDB, policyDb, httpStatusCollector, logger)

appManager := aggregator.NewAppManager(logger, egClock, conf.Aggregator.PolicyPollerInterval, len(conf.Server.NodeAddrs), conf.Server.NodeIndex, conf.Aggregator.MetricCacheSizePerApp, policyDb, appMetricDB)

Expand Down Expand Up @@ -101,15 +94,10 @@ func main() {
logger.Error("failed to create http server", err)
os.Exit(1)
}
healthServer, err := healthendpoint.NewServerWithBasicAuth(conf.Health, []healthendpoint.Checker{}, logger.Session("health-server"), promRegistry, time.Now)
if err != nil {
logger.Error("failed to create health server", err)
os.Exit(1)
}

members := grouper.Members{
{"eventGenerator", eventGenerator},
{"http_server", httpServer},
{"health_server", healthServer},
}
monitor := ifrit.Invoke(sigmon.New(grouper.NewOrdered(os.Interrupt, members)))

Expand Down Expand Up @@ -155,7 +143,8 @@ func loadConfig(path string) (*config.Config, error) {
}

configFileBytes, err := io.ReadAll(configFile)
_ = configFile.Close()
defer func() { _ = configFile.Close() }()

if err != nil {
return nil, fmt.Errorf("failed to read data from config file %q: %w", path, err)
}
Expand All @@ -172,6 +161,16 @@ func loadConfig(path string) (*config.Config, error) {
return conf, nil
}

func createEventgeneratorServer(logger lager.Logger, conf *config.Config, queryMetrics aggregator.QueryAppMetricsFunc, httpStatusCollector *healthendpoint.HTTPStatusCollector) ifrit.Runner {
httpServer, err := server.NewServer(logger.Session("http_server"), conf, queryMetrics, *httpStatusCollector)
if err != nil {
logger.Fatal("failed to create http server", err)
os.Exit(1)
}

return httpServer
}

func createEvaluators(logger lager.Logger, conf *config.Config, triggersChan chan []*models.Trigger, queryMetrics aggregator.QueryAppMetricsFunc, getBreaker func(string) *circuit.Breaker, setCoolDownExpired func(string, int64)) ([]*generator.Evaluator, error) {
count := conf.Evaluator.EvaluatorCount

Expand Down
48 changes: 45 additions & 3 deletions src/autoscaler/eventgenerator/server/server.go
Original file line number Diff line number Diff line change
@@ -1,8 +1,11 @@
package server

import (
"fmt"
"net/http"
"time"

"code.cloudfoundry.org/app-autoscaler/src/autoscaler/db"
"code.cloudfoundry.org/app-autoscaler/src/autoscaler/eventgenerator/aggregator"
"code.cloudfoundry.org/app-autoscaler/src/autoscaler/helpers"
"go.opentelemetry.io/contrib/instrumentation/github.com/gorilla/mux/otelmux"
Expand All @@ -13,6 +16,7 @@ import (

"code.cloudfoundry.org/lager/v3"
"github.com/gorilla/mux"
"github.com/prometheus/client_golang/prometheus"
"github.com/tedsuo/ifrit"
)

Expand All @@ -23,18 +27,56 @@ func (vh VarsFunc) ServeHTTP(w http.ResponseWriter, r *http.Request) {
vh(w, r, vars)
}

func NewServer(logger lager.Logger, conf *config.Config, queryAppMetric aggregator.QueryAppMetricsFunc, httpStatusCollector healthendpoint.HTTPStatusCollector) (ifrit.Runner, error) {
func NewServer(logger lager.Logger, conf *config.Config, appMetricDB db.AppMetricDB, policyDb db.PolicyDB, queryAppMetric aggregator.QueryAppMetricsFunc, httpStatusCollector healthendpoint.HTTPStatusCollector) (ifrit.Runner, error) {
eh := NewEventGenHandler(logger, queryAppMetric)
httpStatusCollectMiddleware := healthendpoint.NewHTTPStatusCollectMiddleware(httpStatusCollector)
r := routes.EventGeneratorRoutes()
r.Use(otelmux.Middleware("eventgenerator"))
r.Use(httpStatusCollectMiddleware.Collect)
r.Get(routes.GetAggregatedMetricHistoriesRouteName).Handler(VarsFunc(eh.GetAggregatedMetricHistories))

httpServerConfig := helpers.ServerConfig{
healthRouter, err := createHealthRouter(appMetricDB, policyDb, logger, conf, httpStatusCollector)
if err != nil {
return nil, fmt.Errorf("failed to create health router: %w", err)
}

mainRouter := setupMainRouter(r, healthRouter)

return helpers.NewHTTPServer(logger, serverConfigFrom(conf), mainRouter)
}

func serverConfigFrom(conf *config.Config) helpers.ServerConfig {
return helpers.ServerConfig{
Port: conf.Server.Port,
TLS: conf.Server.TLS,
}
}

func createHealthRouter(appMetricDB db.AppMetricDB, policyDb db.PolicyDB, logger lager.Logger, conf *config.Config, httpStatusCollector healthendpoint.HTTPStatusCollector) (*mux.Router, error) {
checkers := []healthendpoint.Checker{}
gatherer := CreatePrometheusRegistry(appMetricDB, policyDb, httpStatusCollector, logger)
healthRouter, err := healthendpoint.NewHealthRouter(conf.Health, checkers, logger.Session("health-server"), gatherer, time.Now)
if err != nil {
return nil, fmt.Errorf("failed to create health router: %w", err)
}

return healthRouter, nil
}

func CreatePrometheusRegistry(appMetricDB db.AppMetricDB, policyDb db.PolicyDB, httpStatusCollector healthendpoint.HTTPStatusCollector, logger lager.Logger) *prometheus.Registry {
promRegistry := prometheus.NewRegistry()
healthendpoint.RegisterCollectors(promRegistry, []prometheus.Collector{
healthendpoint.NewDatabaseStatusCollector("autoscaler", "eventgenerator", "appMetricDB", appMetricDB),
healthendpoint.NewDatabaseStatusCollector("autoscaler", "eventgenerator", "policyDB", policyDb),
httpStatusCollector,
}, true, logger.Session("eventgenerator-prometheus"))
return promRegistry
}

return helpers.NewHTTPServer(logger, httpServerConfig, r)
func setupMainRouter(egRouter, healthRouter *mux.Router) *mux.Router {
mainRouter := mux.NewRouter()
mainRouter.PathPrefix("/v1").Handler(egRouter)
mainRouter.PathPrefix("/health").Handler(healthRouter)
mainRouter.PathPrefix("/").Handler(healthRouter)
return mainRouter
}
7 changes: 6 additions & 1 deletion src/autoscaler/eventgenerator/server/server_suite_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,9 @@ import (
var (
serverProcess ifrit.Process
serverUrl *url.URL
policyDB *fakes.FakePolicyDB

appMetricDB *fakes.FakeAppMetricDB
)

func TestServer(t *testing.T) {
Expand All @@ -43,7 +46,9 @@ var _ = BeforeSuite(func() {
}

httpStatusCollector := &fakes.FakeHTTPStatusCollector{}
httpServer, err := server.NewServer(lager.NewLogger("test"), conf, queryAppMetrics, httpStatusCollector)
policyDB = &fakes.FakePolicyDB{}
appMetricDB = &fakes.FakeAppMetricDB{}
httpServer, err := server.NewServer(lager.NewLogger("test"), conf, appMetricDB, policyDB, queryAppMetrics, httpStatusCollector)
Expect(err).NotTo(HaveOccurred())

serverUrl, err = url.Parse("http://127.0.0.1:" + strconv.Itoa(port))
Expand Down
38 changes: 23 additions & 15 deletions src/autoscaler/metricsforwarder/cmd/metricsforwarder/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -33,23 +33,10 @@ func main() {
_, _ = fmt.Fprintln(os.Stderr, "missing config file")
os.Exit(1)
}
configFile, err := os.Open(path)
if err != nil {
_, _ = fmt.Fprintf(os.Stdout, "failed to open config file '%s' : %s\n", path, err.Error())
os.Exit(1)
}

var conf *config.Config
conf, err = config.LoadConfig(configFile)
if err != nil {
_, _ = fmt.Fprintf(os.Stdout, "failed to read config file '%s' : %s\n", path, err.Error())
os.Exit(1)
}
_ = configFile.Close()

err = conf.Validate()
conf, err := loadConfig(path)
if err != nil {
_, _ = fmt.Fprintf(os.Stdout, "failed to validate configuration : %s\n", err.Error())
_, _ = fmt.Fprintf(os.Stdout, "%s\n", err.Error())
os.Exit(1)
}

Expand Down Expand Up @@ -108,3 +95,24 @@ func createCustomMetricsServer(conf *config.Config, logger lager.Logger, policyD
}
return httpServer
}

func loadConfig(path string) (*config.Config, error) {
configFile, err := os.Open(path)
if err != nil {
return nil, fmt.Errorf("failed to open config file '%s' : %s", path, err.Error())
}
defer func() { _ = configFile.Close() }()

conf, err := config.LoadConfig(configFile)
if err != nil {
return nil, fmt.Errorf("failed to read config file '%s' : %s", path, err.Error())
}

err = conf.Validate()
if err != nil {
return nil, fmt.Errorf("failed to validate configuration: %w", err)
}

return conf, nil

}

0 comments on commit dc4566c

Please sign in to comment.