Skip to content

Commit

Permalink
Merge pull request #279 from jzwlqx/bugfix/probe-state
Browse files Browse the repository at this point in the history
bugfix: probe state
  • Loading branch information
BSWANG authored May 22, 2024
2 parents 6e34f1a + cb09e8c commit 9b0d8d8
Show file tree
Hide file tree
Showing 4 changed files with 103 additions and 89 deletions.
17 changes: 11 additions & 6 deletions pkg/exporter/cmd/eventserver.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@ type EventServer struct {
*DynamicProbeServer[probe.EventProbe]
}

func NewEventServer(sinks []sink.Sink) (*EventServer, error) {
func newEventServer(sinks []sink.Sink) (*EventServer, error) {
var sinkWrappers []*sinkWrapper

done := make(chan struct{})
Expand Down Expand Up @@ -109,13 +109,18 @@ func (m *EventProbeManager) CreateProbe(config ProbeConfig) (probe.EventProbe, e
return probe.CreateEventProbe(config.Name, m.sinkChan, config.Args)
}

func (m *EventProbeManager) StartProbe(ctx context.Context, probe probe.EventProbe) error {
log.Infof("start event probe %s", probe.Name())
return probe.Start(ctx)
func (m *EventProbeManager) StartProbe(ctx context.Context, p probe.EventProbe) error {
log.Infof("start event probe %s", p.Name())
return p.Start(ctx)
}

func (m *EventProbeManager) StopProbe(ctx context.Context, probe probe.EventProbe) error {
return probe.Stop(ctx)
func (m *EventProbeManager) StopProbe(ctx context.Context, p probe.EventProbe) error {
log.Infof("stop event probe %s", p.Name())
state := p.State()
if state == probe.ProbeStateStopped || state == probe.ProbeStateStopping || state == probe.ProbeStateFailed {
return nil
}
return p.Stop(ctx)
}

var _ ProbeManager[probe.MetricsProbe] = &MetricsProbeManager{}
24 changes: 15 additions & 9 deletions pkg/exporter/cmd/metricserver.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@ import (
log "github.com/sirupsen/logrus"
)

func NewMetricsServer() (*MetricsServer, error) {
func newMetricsServer() (*MetricsServer, error) {

r := prometheus.NewRegistry()
handler := promhttp.HandlerFor(prometheus.Gatherers{
Expand All @@ -37,21 +37,27 @@ func (m *MetricsProbeManager) CreateProbe(config ProbeConfig) (probe.MetricsProb
return probe.CreateMetricsProbe(config.Name, config.Args)
}

func (m *MetricsProbeManager) StartProbe(ctx context.Context, probe probe.MetricsProbe) error {
log.Infof("start metrics probe %s", probe.Name())
if err := probe.Start(ctx); err != nil {
func (m *MetricsProbeManager) StartProbe(ctx context.Context, p probe.MetricsProbe) error {
log.Infof("start metrics probe %s", p.Name())
if err := p.Start(ctx); err != nil {
return err
}
m.prometheusRegistry.MustRegister(probe)
m.prometheusRegistry.MustRegister(p)
return nil
}

func (m *MetricsProbeManager) StopProbe(ctx context.Context, probe probe.MetricsProbe) error {
log.Infof("stop metrics probe %s", probe.Name())
if err := probe.Stop(ctx); err != nil {
func (m *MetricsProbeManager) StopProbe(ctx context.Context, p probe.MetricsProbe) error {
log.Infof("stop metrics probe %s", p.Name())

state := p.State()
if state == probe.ProbeStateStopped || state == probe.ProbeStateStopping || state == probe.ProbeStateFailed {
return nil
}

if err := p.Stop(ctx); err != nil {
return err
}
m.prometheusRegistry.Unregister(probe)
m.prometheusRegistry.Unregister(p)
return nil
}

Expand Down
147 changes: 74 additions & 73 deletions pkg/exporter/cmd/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -280,91 +280,93 @@ func (i *inspServer) reload() error {
return nil
}

// newHTTPServer wires the inspector's HTTP endpoints onto net/http's default
// mux and returns a server addressed at cfg.Port. The server is only built
// here, not started; because no Handler is set on it, it serves the default
// mux once the caller runs it.
//
// Routes registered:
//   - /metrics  -> i.metricsServer
//   - /         -> defaultPage
//   - /status   -> i.statusPage
//   - /internal -> Go runtime and process collectors (only when cfg.DebugMode)
func (i *inspServer) newHTTPServer(cfg *InspServerConfig) *http.Server {
	http.Handle("/metrics", i.metricsServer)
	http.HandleFunc("/", defaultPage)
	http.HandleFunc("/status", i.statusPage)

	if cfg.DebugMode {
		// Debug metrics live in their own registry, separate from the
		// handler mounted on /metrics.
		debugRegistry := prometheus.NewRegistry()
		goCollector := collectors.NewGoCollector()
		processCollector := collectors.NewProcessCollector(collectors.ProcessCollectorOpts{})
		debugRegistry.MustRegister(goCollector, processCollector)

		debugHandler := promhttp.HandlerFor(debugRegistry, promhttp.HandlerOpts{Registry: debugRegistry})
		http.Handle("/internal", debugHandler)
	}

	addr := fmt.Sprintf(":%d", cfg.Port)
	log.Infof("inspector start metric server, listenAddr: %s", addr)

	server := &http.Server{Addr: addr}
	return server
}

func (i *inspServer) start(cfg *InspServerConfig) error {
if err := gops.Listen(gops.Options{}); err != nil {
log.Infof("start gops err: %v", err)
}

go func() {
var err error
ctx := context.TODO()

err = probe.InitAdditionalLabels(cfg.MetricsConfig.AdditionalLabels)
if err != nil {
log.Errorf("failed init additional labels: %v", err)
return
}

log.Infof("start metrics server")
i.metricsServer, err = NewMetricsServer()
if err != nil {
log.Errorf("failed create metrics server: %v", err)
return
}
var err error
ctx := context.TODO()
err = probe.InitAdditionalLabels(cfg.MetricsConfig.AdditionalLabels)
if err != nil {
return fmt.Errorf("failed init additional labels: %w", err)
}

defer func() {
_ = i.metricsServer.Stop(ctx)
}()
i.metricsServer, err = newMetricsServer()
if err != nil {
return fmt.Errorf("failed create metrics server: %w", err)
}

if err := i.metricsServer.Start(ctx, cfg.MetricsConfig.Probes); err != nil {
log.Errorf("failed start metrics server: %v", err)
return
}
if err := i.metricsServer.Start(ctx, cfg.MetricsConfig.Probes); err != nil {
return fmt.Errorf("failed start metrics server: %w", err)
}

//sink
sinks, err := createSink(cfg.EventConfig.EventSinks)
if err != nil {
log.Errorf("failed create sinks, err: %v", err)
} else if len(sinks) != len(cfg.EventConfig.EventSinks) {
log.Warnf("expected to create %d sinks , but %d were created", len(cfg.EventConfig.EventSinks), len(sinks))
}
defer func() {
_ = i.metricsServer.Stop(ctx)
}()

log.Infof("start event server")
//TODO create sinks from config
i.eventServer, err = NewEventServer(sinks)
if err != nil {
log.Errorf("failed create event server: %v", err)
return
}
sinks, err := createSink(cfg.EventConfig.EventSinks)
if err != nil {
return fmt.Errorf("failed create sinks, err: %w", err)
}
if len(sinks) != len(cfg.EventConfig.EventSinks) {
log.Warnf("expected to create %d sinks , but %d were created", len(cfg.EventConfig.EventSinks), len(sinks))
}

defer func() {
_ = i.eventServer.Stop(context.TODO())
}()
i.eventServer, err = newEventServer(sinks)
if err != nil {
return fmt.Errorf("failed create event server: %w", err)
}

if err := i.eventServer.Start(ctx, cfg.EventConfig.Probes); err != nil {
log.Errorf("failed start event server: %v", err)
return
}
if err = i.eventServer.Start(ctx, cfg.EventConfig.Probes); err != nil {
return fmt.Errorf("failed start event server: %w", err)
}

http.Handle("/metrics", i.metricsServer)
http.Handle("/", http.HandlerFunc(defaultPage))
http.Handle("/status", http.HandlerFunc(i.statusPage))
if cfg.DebugMode {
reg := prometheus.NewRegistry()

reg.MustRegister(
collectors.NewGoCollector(),
collectors.NewProcessCollector(collectors.ProcessCollectorOpts{}),
)
http.Handle("/internal", promhttp.HandlerFor(reg, promhttp.HandlerOpts{Registry: reg}))
}
listenAddr := fmt.Sprintf(":%d", cfg.Port)
log.Infof("inspector start metric server, listenAddr: %s", listenAddr)
srv := &http.Server{Addr: listenAddr}
if err := srv.ListenAndServe(); err != nil {
log.Errorf("inspector start metric server err: %v", err)
}
defer func() {
_ = i.eventServer.Stop(ctx)
}()

done := make(chan struct{})

if err := i.WatchConfig(done); err != nil {
if err = i.WatchConfig(done); err != nil {
log.Errorf("failed watch config, dynamic load would not work: %v", err)
}

WaitSignals(i, syscall.SIGHUP, syscall.SIGINT, syscall.SIGTERM)
close(done)
srv := i.newHTTPServer(cfg)
serverClosedChan := make(chan struct{})
serverClosed := false
go func() {
if err := srv.ListenAndServe(); err != nil && err != http.ErrServerClosed {
log.Errorf("server error: %v", err)
}

close(serverClosedChan)
serverClosed = true
}()

WaitSignals(serverClosedChan, syscall.SIGHUP, syscall.SIGINT, syscall.SIGTERM)
close(done)
if !serverClosed {
_ = srv.Shutdown(ctx)
}
return nil
}

Expand All @@ -381,16 +383,15 @@ func createSink(sinkConfigs []EventSinkConfig) ([]sink.Sink, error) {
return ret, nil
}

func WaitSignals(i *inspServer, sgs ...os.Signal) {
func WaitSignals(done <-chan struct{}, sgs ...os.Signal) {
s := make(chan os.Signal, 1)
signal.Notify(s, sgs...)
sig := <-s
log.Warnf("recive signal %s, stopping", sig.String())
if err := i.metricsServer.Stop(i.ctx); err != nil {
log.Errorf("failed stop metrics server, err: %v", err)
}
if err := i.eventServer.Stop(i.ctx); err != nil {
log.Errorf("failed stop event server, err: %v", err)
select {
case sig := <-s:
log.Warnf("recive signal %s, stopping", sig.String())
return
case <-done:
log.Warnf("recive server close signal")
}
}

Expand Down
4 changes: 3 additions & 1 deletion pkg/exporter/nettop/cache.go
Original file line number Diff line number Diff line change
Expand Up @@ -240,7 +240,7 @@ func StartCache(ctx context.Context, sidecarMode bool) error {
}

func StopCache() {
control <- struct{}{}
close(control)
}

func cacheDaemonLoop(_ context.Context, control chan struct{}) {
Expand All @@ -249,10 +249,12 @@ func cacheDaemonLoop(_ context.Context, control chan struct{}) {
t := time.NewTicker(cacheUpdateInterval)
defer t.Stop()

loop:
for {
select {
case <-control:
log.Info("cache daemon loop exit of control signal")
break loop
case <-t.C:
if err := cachePodsWithTimeout(cacheUpdateInterval); err != nil {
log.Errorf("failed cache pods: %v", err)
Expand Down

0 comments on commit 9b0d8d8

Please sign in to comment.