This repository has been archived by the owner on Sep 21, 2023. It is now read-only.
-
Notifications
You must be signed in to change notification settings - Fork 17
/
Copy pathhelpers.go
90 lines (78 loc) · 2.64 KB
/
helpers.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
// Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
// or more contributor license agreements. Licensed under the Elastic License;
// you may not use this file except in compliance with the Elastic License.
package controller
import (
"context"
"fmt"
"reflect"
"github.com/elastic/elastic-agent-client/v7/pkg/client"
cfglib "github.com/elastic/elastic-agent-libs/config"
"github.com/elastic/elastic-agent-libs/logp"
"github.com/elastic/elastic-agent-libs/logp/configure"
"github.com/elastic/elastic-agent-libs/paths"
"github.com/elastic/elastic-agent-shipper/config"
"github.com/elastic/elastic-agent-shipper/output/elasticsearch"
)
func createOutputHealthReporter(unit *client.Unit) elasticsearch.WatchReporter {
return func(state elasticsearch.WatchState, msg string) {
if state == elasticsearch.WatchDegraded {
_ = unit.UpdateState(client.UnitStateDegraded, fmt.Sprintf("elasticsearch has failed with: %s", msg), nil)
} else {
_ = unit.UpdateState(client.UnitStateHealthy, "elasticsearch has recovered", nil)
}
}
}
// I am not net sure how this should work or what it should do, but we need to read from that error channel
func reportErrors(ctx context.Context, agentClient client.V2) {
log := logp.L()
for {
select {
case <-ctx.Done():
return
case err := <-agentClient.Errors():
log.Errorf("Got error from controller: %s", err)
}
}
}
// initialize the global logging variables
func setLogging() error {
wrapper := struct {
Logging *cfglib.C `config:"logging"`
}{}
err := config.Overwrites.Unpack(&wrapper)
if err != nil {
return fmt.Errorf("error unpacking CLI overwrites for logger: %w", err)
}
err = configure.Logging("shipper", wrapper.Logging)
if err != nil {
return fmt.Errorf("error setting up logging config: %w", err)
}
return nil
}
// check to see if a unit update only updates a unit level change
// logic is: if the two units have idential configs _and_ different log levels, then true
func onlyLogLevelUpdated(newUnit, currentUnit map[string]interface{}, newLog client.UnitLogLevel) bool {
if newUnit == nil || currentUnit == nil {
return false
}
if reflect.DeepEqual(newUnit, currentUnit) && config.ZapFromUnitLogLevel(newLog) != logp.GetLevel() {
return true
}
return false
}
// setPaths sets the global path variables
func setPaths() error {
partialConfig := struct {
Path paths.Path `config:"path"`
}{}
err := config.Overwrites.Unpack(&partialConfig)
if err != nil {
return fmt.Errorf("error extracting default paths: %w", err)
}
err = paths.InitPaths(&partialConfig.Path)
if err != nil {
return fmt.Errorf("error setting default paths: %w", err)
}
return nil
}