Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

build(deps): bump k8s.io/klog/v2 from 2.60.1 to 2.80.1 in /executor-plugins/resource-tagger #4

Open
wants to merge 8 commits into
base: main
Choose a base branch
from
2 changes: 1 addition & 1 deletion executor-plugins/resource-tagger/go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@ require (
google.golang.org/grpc v1.47.0
k8s.io/apimachinery v0.24.2
k8s.io/client-go v0.24.2
k8s.io/klog/v2 v2.60.1
k8s.io/klog/v2 v2.80.1
)

require (
Expand Down
3 changes: 2 additions & 1 deletion executor-plugins/resource-tagger/go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -676,8 +676,9 @@ k8s.io/client-go v0.24.2/go.mod h1:zg4Xaoo+umDsfCWr4fCnmLEtQXyCNXCvJuSsglNcV30=
k8s.io/gengo v0.0.0-20210813121822-485abfe95c7c/go.mod h1:FiNAH4ZV3gBg2Kwh89tzAEV2be7d5xI0vBa/VySYy3E=
k8s.io/klog/v2 v2.0.0/go.mod h1:PBfzABfn139FHAV07az/IF9Wp1bkk3vpT2XSJ76fSDE=
k8s.io/klog/v2 v2.2.0/go.mod h1:Od+F08eJP+W3HUb4pSrPpgp9DGU4GzlpG/TmITuYh/Y=
k8s.io/klog/v2 v2.60.1 h1:VW25q3bZx9uE3vvdL6M8ezOX79vA2Aq1nEWLqNQclHc=
k8s.io/klog/v2 v2.60.1/go.mod h1:y1WjHnz7Dj687irZUWR/WLkLc5N1YHtjLdmgWjndZn0=
k8s.io/klog/v2 v2.80.1 h1:atnLQ121W371wYYFawwYx1aEY2eUfs4l3J72wtgAwV4=
k8s.io/klog/v2 v2.80.1/go.mod h1:y1WjHnz7Dj687irZUWR/WLkLc5N1YHtjLdmgWjndZn0=
k8s.io/kube-openapi v0.0.0-20220328201542-3ee0da9b0b42 h1:Gii5eqf+GmIEwGNKQYQClCayuJCe2/4fZUvF7VG99sU=
k8s.io/kube-openapi v0.0.0-20220328201542-3ee0da9b0b42/go.mod h1:Z/45zLw8lUo4wdiUkI+v/ImEGAvu3WatcZl3lPMR4Rk=
k8s.io/utils v0.0.0-20210802155522-efc7438f0176/go.mod h1:jPW/WVKK9YHAvNhRxK0md/EJ228hCsBRufyofKtW8HA=
Expand Down
33 changes: 32 additions & 1 deletion observer-plugins/metric-server/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,9 @@ import (
"flag"
"log"
"net"
"os"
"os/signal"
"syscall"

//"github.com/smoky8/pkg/lib/go/obi"
"google.golang.org/grpc"
Expand All @@ -37,6 +40,9 @@ var (
endpoint = flag.String("endpoint", "/var/run/observer.sock", "unix socket domain for current server")
kubeconfig = flag.String("kubeconfig", "", "kubernetes auth config file")
)
var (
shutdownSignals = []os.Signal{os.Interrupt, syscall.SIGTERM}
)

func main() {
klog.InitFlags(flag.CommandLine)
Expand All @@ -61,8 +67,11 @@ func main() {
if err != nil {
log.Fatalf("%s create metric client error: %s", server.PluginName, err)
}
metricServer := grpc.NewServer()

// Setup signal watcher to handle cleanup
SetupSignalHandler(*endpoint)

metricServer := grpc.NewServer()
obi.RegisterServerServer(metricServer, server.NewServer(clientSet))
listen, err := net.Listen("unix", *endpoint)
if err != nil {
Expand All @@ -72,3 +81,25 @@ func main() {

klog.Fatalln(metricServer.Serve(listen))
}

// SetupSignalHandler registered for SIGTERM and SIGINT. A stop channel is returned
// which is closed on one of these signals. If a second signal is caught, the program
// is terminated with exit code 1.
func SetupSignalHandler(socketFile string) {
c := make(chan os.Signal)
signal.Notify(c, shutdownSignals...)
go func() {
for s := range c {
switch s {
case os.Interrupt, syscall.SIGTERM:
klog.Infoln("Shutting down normally...")
if err := os.RemoveAll(socketFile); err != nil {
klog.Fatal(err)
}
os.Exit(1)
default:
klog.Infoln("Got signal", s)
}
}
}()
}
32 changes: 31 additions & 1 deletion observer-plugins/prometheus/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,9 @@ import (
"flag"
"log"
"net"
"os"
"os/signal"
"syscall"

"google.golang.org/grpc"
"k8s.io/client-go/kubernetes"
Expand All @@ -38,6 +41,9 @@ var (
stepSeconds = flag.Int64("step", 60, "query steps")
rangeMinute = flag.Int64("range", 2, "prometheus, the maximum time between two slices within the boundaries.")
)
var (
shutdownSignals = []os.Signal{os.Interrupt, syscall.SIGTERM}
)

func main() {
klog.InitFlags(flag.CommandLine)
Expand All @@ -64,6 +70,8 @@ func main() {
if err != nil {
klog.Fatal(err)
}
// Setup signal watcher to handle cleanup
SetupSignalHandler(*endpoint)

server := grpc.NewServer()
obi.RegisterServerServer(server, prometheus.NewPrometheusServer(*address, conf, *stepSeconds, *rangeMinute))
Expand All @@ -72,6 +80,28 @@ func main() {
log.Fatal(err)
}

klog.Infof("%s starting work...", prometheus.PluginName)
klog.Infof("%s plugin started ...", prometheus.PluginName)
klog.Fatalln(server.Serve(listen))
}

// SetupSignalHandler registered for SIGTERM and SIGINT. A stop channel is returned
// which is closed on one of these signals. If a second signal is caught, the program
// is terminated with exit code 1.
func SetupSignalHandler(socketFile string) {
c := make(chan os.Signal)
signal.Notify(c, shutdownSignals...)
go func() {
for s := range c {
switch s {
case os.Interrupt, syscall.SIGTERM:
klog.Infoln("Shutting down normally...")
if err := os.RemoveAll(socketFile); err != nil {
klog.Fatal(err)
}
os.Exit(1)
default:
klog.Infoln("Got signal", s)
}
}
}()
}
30 changes: 21 additions & 9 deletions observer-plugins/prometheus/prometheus/prometheus.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ limitations under the License.
package prometheus

import (
"encoding/json"
"fmt"
"time"

Expand Down Expand Up @@ -65,7 +66,7 @@ type CalculateAux struct {
Value float64
}

func (p *prometheusServer) Query(startTime, endTime time.Time, query, op string) (DataSeries, error) {
func (p *prometheusServer) Query(startTime, endTime time.Time, kind, query, op string) (DataSeries, error) {
method := "prometheusServer.Query"
ans := DataSeries{Timestamp: endTime.UnixMilli()}
prometheusAPI, err := p.NewPrometheusAPI()
Expand All @@ -86,18 +87,29 @@ func (p *prometheusServer) Query(startTime, endTime time.Time, query, op string)
klog.V(4).Infof("%s quer '%s' result with warnings %v\n", method, warnings)
}

data, err := formatRawValues(result, op)
if err != nil {
return ans, err
}

if f, ok := actionFuncs[op]; ok {
f(data, &ans)
// TODO: Use kind as the raw data query, may add a 'rawData: true' property for this?
if kind == "Pod" || kind == "Node" {
data, err := formatRawValues(result)
if err != nil {
return ans, err
}
if f, ok := actionFuncs[op]; ok {
f(data, &ans)
}
} else {
// Handle raw data if no aggregation defined, just return the json data
jsonValue, err := json.Marshal(result)
if err != nil {
klog.Errorf("failed to marshal result to json: %s", err)
ans.Value = fmt.Sprintf("failed to get json value: %s " + result.String())
} else {
ans.Value = string(jsonValue)
}
}
return ans, nil
}

func formatRawValues(rawValue model.Value, op string) ([]CalculateAux, error) {
func formatRawValues(rawValue model.Value) ([]CalculateAux, error) {
ans := make([]CalculateAux, 0)
switch rawValue.Type() {
case model.ValScalar:
Expand Down
16 changes: 10 additions & 6 deletions observer-plugins/prometheus/prometheus/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@ const (
MaxAction = "max"
MinAction = "min"
AvgAction = "avg"
NoneAction = "none"
)

// impl obi interface
Expand Down Expand Up @@ -77,24 +78,27 @@ func (p *prometheusServer) GetMetrics(ctx context.Context, req *obi.GetMetricsRe

var err error
klog.V(4).Infof("prometheus query: %s\n", req.Query)
var resourceName string
if len(req.ResourceNames) > 0 {
resourceName = req.ResourceNames[0]
}
result := &obi.GetMetricsResponse{
ResourceName: req.ResourceNames[0],
ResourceName: resourceName,
Namespace: req.Namespace,
Unit: req.Unit,
Records: []*obi.GetMetricsResponseRecord{},
}

// use avgerage as the default aggregation action
op := AvgAction
if len(req.Aggregation) > 0 {
op = req.Aggregation[0]
}
klog.Infof("exec aggregation is: %s\n", op)
metricData, err := p.Query(startTime, endTime, req.Query, op)
metricData, err := p.Query(startTime, endTime, req.Kind, req.Query, op)
if err != nil {
klog.Errorf("%s query error: %s\n", method, err)
return result, err
}

// only return the latest record
result.Records = append(result.Records, &obi.GetMetricsResponseRecord{Timestamp: metricData.Timestamp, Value: metricData.Value})
/*
Expand All @@ -103,8 +107,8 @@ func (p *prometheusServer) GetMetrics(ctx context.Context, req *obi.GetMetricsRe
}
*/

klog.Infof("query by %s successfully", req.MetricName)
klog.V(5).Infof("%s query by %s result: %v\n", method, req.MetricName, metricData)
klog.Infof("query by metric '%s', query '%s' successfully", req.MetricName, req.Query)
klog.V(5).Infof("%s query by %s, %s result: %v\n", method, req.MetricName, req.Query, metricData)

return result, nil
}