
Commit

Merge pull request #3 from kedark3/working
Adding new features
kedark3 authored Nov 23, 2021
2 parents 96c0dde + b36c24f commit 0382386
Showing 7 changed files with 154 additions and 10 deletions.
7 changes: 4 additions & 3 deletions README.md
@@ -59,9 +59,10 @@ This tool allows OpenShift users to run a watcher for Prometheus queries and def
* [x] Notify/Do Something(e.g. Pause/Kill benchmark jobs to preserve cluster) when results don't match conditions
* [x] Spawn goroutines to keep running queries and evaluating results, to handle scale - e.g. when the yaml file contains a very large number of queries, they can be divided and run concurrently
* [x] If slack config is not set, it is ignored and no attempts will be made to notify via slack
* [ ] debug mode
* [ ] use env vars
* [ ] Enhance log files to include uuid/time
* [x] debug/verbose mode
* [x] Enhance log files to include uuid/time
* [x] Use env vars
* [x] RFE: come up with a basic "cluster health" profile that anyone can use. Operator monitoring + some best practice monitors from the dittybopper dashboards


## Usage:
Binary file modified bin/cpa
12 changes: 9 additions & 3 deletions cmd/analyze/analyze.go
@@ -60,17 +60,17 @@ func ReadPrometheusQueries(queriesFile string) (queriesList queryList, err error
	return queriesList, nil
}

func Queries(queryList queryList, oc *exutil.CLI, baseURL, bearerToken string, c chan string, tb chan bool, terminateBenchmark string) {
func Queries(queryList queryList, oc *exutil.CLI, baseURL, bearerToken string, c chan string, tb chan bool, terminateBenchmark string, verbose bool) {
	// start := time.Now()
	for _, item := range queryList {
		go runQuery(item, oc, baseURL, bearerToken, c, tb, terminateBenchmark)
		go runQuery(item, oc, baseURL, bearerToken, c, tb, terminateBenchmark, verbose)
	}
	wg.Wait()
	// end := time.Since(start)
	// log.Printf("\n It takes %s time to run queries", end)
}

func runQuery(q queries, oc *exutil.CLI, baseURL, bearerToken string, c chan string, tb chan bool, terminateBenchmark string) {
func runQuery(q queries, oc *exutil.CLI, baseURL, bearerToken string, c chan string, tb chan bool, terminateBenchmark string, verbose bool) {
	wg.Add(1)
	defer wg.Done()
	result, err := prometheus.RunQuery(q.Query, oc, baseURL, bearerToken)
@@ -79,6 +79,12 @@ func runQuery(q queries, oc *exutil.CLI, baseURL, bearerToken string, c chan str
		return
	}
	opMap := map[string]string{"eq": "==", "lt": "<", "gt": ">", "lte": "<=", "gte": ">="}
	if verbose {
		log.Printf("Verbose Metric values for %s are:", q.Query)
		for _, metric := range result.Data.Result {
			log.Printf("%v\n", metric)
		}
	}
	for _, metric := range result.Data.Result {
		for _, watchItems := range q.WatchFor {
			// log.Println(watchItems.Key, watchItems.Val, watchItems.Threshold)
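The loop that follows (collapsed in the diff) compares each returned metric against the watchFor conditions via opMap. Below is a minimal, self-contained sketch of that fan-out-and-compare pattern; the watchItem struct and evaluate helper are illustrative stand-ins, not the repository's actual types.

```go
package main

import (
	"fmt"
	"sync"
)

// watchItem mirrors the key/val/threshold/operator entries in config/queries.yaml;
// the field names are assumptions for illustration, not the repository's types.
type watchItem struct {
	Key       string
	Val       string
	Threshold float64
	Operator  string // one of: eq, lt, gt, lte, gte
}

// evaluate shows how an operator string plausibly maps onto a numeric comparison.
func evaluate(value float64, w watchItem) bool {
	switch w.Operator {
	case "eq":
		return value == w.Threshold
	case "lt":
		return value < w.Threshold
	case "gt":
		return value > w.Threshold
	case "lte":
		return value <= w.Threshold
	case "gte":
		return value >= w.Threshold
	}
	return false
}

func main() {
	var wg sync.WaitGroup
	// Stand-ins for values returned by a Prometheus query.
	values := []float64{0.2, 0.7, 0.4}
	w := watchItem{Key: "verb", Val: "GET", Threshold: 0.5, Operator: "lte"}
	for _, v := range values {
		wg.Add(1)
		go func(v float64) { // one goroutine per result, echoing the fan-out in Queries()
			defer wg.Done()
			fmt.Printf("value %.2f within threshold: %v\n", v, evaluate(v, w))
		}(v)
	}
	wg.Wait()
}
```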
19 changes: 19 additions & 0 deletions cmd/notify/notifications.go
@@ -27,6 +27,25 @@ func (c *slackConfig) Parse(data []byte) error {
}

func ReadslackConfig() (config slackConfig, err error) {
	userID, ok := os.LookupEnv("SLACK_USERID")
	if !ok {
		log.Println("Didn't find the Slack User ID in the Env Var. Will look it up in config/ dir")
	}
	channelID, ok := os.LookupEnv("SLACK_CHANNELID")
	if !ok {
		log.Println("Didn't find the Slack Channel ID in the Env Var. Will look it up in config/ dir")
	}
	slackToken, ok := os.LookupEnv("SLACK_TOKEN")
	if !ok {
		log.Println("Didn't find the Slack Token in the Env Var. Will look it up in config/ dir")
	}
	if userID != "" && channelID != "" && slackToken != "" {
		log.Printf("Found env vars for SLACK_USERID, SLACK_CHANNELID, and SLACK_TOKEN as: %s, %s and %s(hidden for security)", userID, channelID, string(slackToken[len(slackToken)-4:]))
		config.ChannelID = channelID
		config.UserID = userID
		config.SlackToken = slackToken
		return config, nil
	}
	data, err := ioutil.ReadFile(configPath + "slack.yaml")
	msg := fmt.Sprintf("Couldn't read %sslack.yaml", configPath)
	if err != nil {
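The three lookups above repeat the same look-up-then-warn pattern. A hedged sketch of how that pattern could be factored into one helper follows; `envOrWarn` is hypothetical and not part of this PR.

```go
package notify // placement and package name are assumptions for illustration

import (
	"log"
	"os"
)

// envOrWarn returns the environment variable's value if it is set; otherwise it
// logs that the config/ directory will be consulted as a fallback and returns "".
func envOrWarn(key string) string {
	val, ok := os.LookupEnv(key)
	if !ok {
		log.Printf("Didn't find %s in the Env Vars. Will look it up in config/ dir", key)
	}
	return val
}
```

With such a helper, the three blocks above would collapse to `userID := envOrWarn("SLACK_USERID")` and so on, while keeping the same fallback behaviour.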
15 changes: 14 additions & 1 deletion cmd/prometheus/prometheus.go
@@ -15,6 +15,7 @@ import (
"log"
"net/http"
"net/url"
"os"
"strings"
"time"

@@ -39,12 +40,24 @@ func (c *prometheusConfig) Parse(data []byte) error {
}

func readPrometheusConfig() (url, bearerToken string, err error) {
	url, ok := os.LookupEnv("PROM_URL")
	if !ok {
		log.Println("Didn't find the Prometheus URL in the Env Var. Will look it up in config/ dir")
	}
	bearerToken, ok = os.LookupEnv("BEARER_TOKEN")
	if !ok {
		log.Println("Didn't find the Prometheus BEARER_TOKEN in the Env Var. Will look it up in config/ dir")
	}
	if bearerToken != "" && url != "" {
		log.Printf("Found env vars for PROM_URL and BEARER_TOKEN as: %s and %s(hidden for security)", url, string(bearerToken[len(bearerToken)-4:]))
		return url, bearerToken, nil
	}
	var config prometheusConfig
	data, err := ioutil.ReadFile(configPath + "prometheus.yaml")
	msg := fmt.Sprintf("Couldn't read %sprometheus.yaml", configPath)
	if err != nil {
		return "", "", fmt.Errorf(msg)
	}
	var config prometheusConfig
	if err := config.Parse(data); err != nil {
		log.Fatal(err)
		return "", "", err
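Note that slicing `bearerToken[len(bearerToken)-4:]` (and the equivalent slice of the Slack token above) assumes the secret is at least four characters long; a shorter value would panic. A small defensive sketch, with `maskSecret` as a made-up name, not code from this PR:

```go
package main

import "fmt"

// maskSecret shows only the last four characters of a secret, and hides
// everything when the secret is too short to slice safely.
func maskSecret(s string) string {
	if len(s) < 4 {
		return "****"
	}
	return "****" + s[len(s)-4:]
}

func main() {
	fmt.Println(maskSecret("sha256~abcdef1234")) // ****1234
	fmt.Println(maskSecret("abc"))               // ****
}
```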
106 changes: 105 additions & 1 deletion config/queries.yaml
@@ -28,7 +28,7 @@
    - key: condition
      val: "Available"
      threshold: 33
      operator: eq
      operator: gte
- query: 'max(sum(container_memory_rss{namespace!="",name!="",container="prometheus"}) by (pod))/1073742000' # 1073742000 is bytes per GiB
  watchFor:
    - key: nil
@@ -71,5 +71,109 @@
      val: nil
      threshold: 3
      operator: lt
- query: 'histogram_quantile(0.99, sum(rate(apiserver_request_duration_seconds_bucket{subresource!="log",verb!~"WATCH|WATCHLIST|PROXY"}[5m])) by(verb,le))'
  watchFor:
    - key: verb
      val: PATCH
      threshold: 0.50
      operator: lte
    - key: verb
      val: APPLY
      threshold: 0.50
      operator: lte
    - key: verb
      val: GET
      threshold: 0.50
      operator: lte
    - key: verb
      val: LIST
      threshold: 0.50
      operator: lte
    - key: verb
      val: POST
      threshold: 0.50
      operator: lte
    - key: verb
      val: PUT
      threshold: 0.50
      operator: lte
    - key: verb
      val: DELETE
      threshold: 0.50
      operator: lte
- query: 'sum(rate(apiserver_request_total[5m])) by(code)'
  watchFor:
    - key: code
      val: 403
      threshold: 10
      operator: lte
    - key: code
      val: 404
      threshold: 10
      operator: lte
    - key: code
      val: 500
      threshold: 10
      operator: lte
    - key: code
      val: 504
      threshold: 10
      operator: lte
- query: 'sum(apiserver_current_inflight_requests) by (request_kind)'
  watchFor:
    - key: request_kind
      val: mutating
      threshold: 10
      operator: lte
    - key: request_kind
      val: readOnly
      threshold: 100
      operator: lte
- query: 'sum(rate(apiserver_dropped_requests_total[5m])) by (request_kind)'
  watchFor:
    - key: request_kind
      val: mutating
      threshold: 10
      operator: lte
    - key: request_kind
      val: readOnly
      threshold: 100
      operator: lte
- query: 'sum(apiserver_flowcontrol_current_inqueue_requests)' # Pending request count
  watchFor:
    - key: nil
      val: nil
      threshold: 10
      operator: lte
- query: 'max((etcd_mvcc_db_total_size_in_bytes{} / etcd_server_quota_backend_bytes{})*100)' # Max % DB Space used across all nodes of etcd
  watchFor:
    - key: nil
      val: nil
      threshold: 90
      operator: lte
- query: 'etcd_server_has_leader'
  watchFor:
    - key: nil
      val: nil
      threshold: 1
      operator: eq
- query: 'etcd_server_health_failures'
  watchFor:
    - key: nil
      val: nil
      threshold: 0
      operator: lte
- query: 'etcd_server_health_failures'
  watchFor:
    - key: nil
      val: nil
      threshold: 1
      operator: lte
- query: 'sum(rate(etcd_server_leader_changes_seen_total[2m]))'
  watchFor:
    - key: nil
      val: nil
      threshold: 5
      operator: lte
# Metrics of Interest: ovnkube_master_requeue_service_total, ovnkube_master_skipped_nbctl_daemon_total, ovnkube_master_sync_service_total, max(ovnkube_master_ovn_cli_latency_seconds_count) by (command)
# max(ovnkube_master_pod_creation_latency_seconds_bucket), ovnkube_master_workqueue_depth, max(ovnkube_master_workqueue_retries_total),ovnkube_node_cni_request_duration_seconds_count
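For readers unfamiliar with the file, each entry pairs a PromQL query with one or more watchFor conditions. The structs below sketch the schema as inferred from the YAML; the names and yaml tags are assumptions for illustration, not the repository's actual definitions.

```go
package config // placement is illustrative only

// WatchItem and QueryEntry sketch the assumed shape of config/queries.yaml.
type WatchItem struct {
	Key       string  `yaml:"key"`       // metric label to match, or "nil" to match any series
	Val       string  `yaml:"val"`       // label value to match, or "nil"
	Threshold float64 `yaml:"threshold"` // bound the query result is checked against
	Operator  string  `yaml:"operator"`  // one of: eq, lt, gt, lte, gte
}

type QueryEntry struct {
	Query    string      `yaml:"query"`    // PromQL expression to run each iteration
	WatchFor []WatchItem `yaml:"watchFor"` // conditions that should hold for the result
}

type QueryList []QueryEntry
```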
5 changes: 3 additions & 2 deletions main.go
@@ -28,12 +28,13 @@ func main() {
		Timeout time.Duration `arg:"-t,--timeout" help:"Duration to run Continuous Performance Analysis. You can pass values like 4h or 1h10m10s" default:"4h"`
		LogOutput bool `arg:"-l,--log-output" help:"Output will be stored in a log file (cpa.log) in addition to stdout." default:"false"`
		TerminateBenchmark string `arg:"-k,--terminate-benchmark" help:"When CPA is running in parallel with a benchmark job, let CPA know to kill the benchmark if any query fails. (E.g. -k <processID>) Helpful to preserve cluster for further analysis." default:""`
		Verbose bool `arg:"-v,--verbose" help:"When this mode is enabled, output will contain much more information about each query."`
	}
	arg.MustParse(&args)

	o.RegisterFailHandler(g.Fail)
	if args.LogOutput {
		f, err := os.OpenFile("cpa.log", os.O_WRONLY|os.O_CREATE|os.O_APPEND, 0644)
		f, err := os.OpenFile("cpa_"+time.Now().Format("2006-01-02_15:04:05")+".log", os.O_WRONLY|os.O_CREATE|os.O_APPEND, 0644)
		multiWriter := io.MultiWriter(os.Stdout, f)
		if err != nil {
			log.Fatal(err)
@@ -111,7 +112,7 @@ func main() {
	go func(c chan string) {
		for i := 1; ; i++ {
			log.Printf("\n%[2]s\nIteration no. %[1]d\n%[2]s\n", i, strings.Repeat("~", 80))
			analyze.Queries(queryList, oc, url, bearerToken, c, tb, args.TerminateBenchmark)
			analyze.Queries(queryList, oc, url, bearerToken, c, tb, args.TerminateBenchmark, args.Verbose)
			time.Sleep(args.QueryFrequency)
			if !args.NoClrscr {
				log.Print("\033[H\033[2J") // clears screen before printing next iteration
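The MultiWriter created above is presumably installed as the logger's destination in the lines collapsed from the diff. A standalone sketch of that pattern with the new timestamped file name (illustrative, not the repository's exact code):

```go
package main

import (
	"io"
	"log"
	"os"
	"time"
)

func main() {
	// Timestamped log file name, similar to the change in main.go above.
	name := "cpa_" + time.Now().Format("2006-01-02_15:04:05") + ".log"
	f, err := os.OpenFile(name, os.O_WRONLY|os.O_CREATE|os.O_APPEND, 0644)
	if err != nil {
		log.Fatal(err)
	}
	defer f.Close()

	// Every log call now writes to both stdout and the file.
	log.SetOutput(io.MultiWriter(os.Stdout, f))
	log.Println("Continuous Performance Analysis started")
}
```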
