Skip to content

Commit

Permalink
AWS scraper
Browse files Browse the repository at this point in the history
  • Loading branch information
bhogayatakb committed May 24, 2024
1 parent 1597d5c commit 0645d35
Show file tree
Hide file tree
Showing 7 changed files with 149 additions and 59 deletions.
12 changes: 12 additions & 0 deletions Dockerfiles/DockerfileKubeLocal
Original file line number Diff line number Diff line change
@@ -0,0 +1,12 @@
FROM golang:1.21
WORKDIR /app
RUN apt-get update && apt-get install -y ca-certificates openssl
RUN update-ca-certificates
COPY /build/mw-kube-agent /usr/bin/mw-agent
COPY configyamls-k8s/otel-config.yaml /app/otel-config.yaml
COPY configyamls-k8s/otel-config-nodocker.yaml /app/otel-config-nodocker.yaml
COPY configyamls-k8s/otel-config-daemonset.yaml /app/otel-config-daemonset.yaml
COPY configyamls-k8s/otel-config-deployment.yaml /app/otel-config-deployment.yaml
# A symlink to support existing k8s agent users
RUN ln -s /usr/bin/mw-agent /usr/bin/api-server
CMD ["mw-agent", "start"]
14 changes: 14 additions & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -13,3 +13,17 @@ This agent currently supports [Linux & Windows](cmd/host-agent/) & [Kubernetes](

`mw-agent` can take configuration from environment variables, CLI flags or configuration file. Details of how to configure the agent can be found [here](docs).


----

https://test-keval.free.beeceptor.com/aws-jobs

helm install --set mw.target=https://4plo493.middleware.io:443 --set mw.apiKey=evaddjfmazsdz8qip2cxva99muxv30wq6g6c --set clusterMetadata.name=minikube-keval --wait mw-aws-data-scraper -n mw-agent-ns --create-namespace

helm upgrade --install --set mw.target=https://4plo493.middleware.io:443 --set mw.apiKey=evaddjfmazsdz8qip2cxva99muxv30wq6g6c --set clusterMetadata.name=minikube-keval --wait my-mw-aws-data-scraper mw-aws-data-scraper -n mw-agent-ns

minikube addons enable metrics-server

https://linear.app/middleware/issue/ENG-2933/create-a-server-side-component-to-do-aws-metrics-scrapping

minikube image load ghcr.io/middleware-labs/mw-kube-agent:awscloudwatchmetricsreceiver-keval
1 change: 1 addition & 0 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -64,6 +64,7 @@ require (
github.com/open-telemetry/opentelemetry-collector-contrib/processor/resourceprocessor v0.84.0
github.com/open-telemetry/opentelemetry-collector-contrib/processor/transformprocessor v0.84.0
github.com/open-telemetry/opentelemetry-collector-contrib/receiver/apachereceiver v0.84.0
github.com/open-telemetry/opentelemetry-collector-contrib/receiver/awscloudwatchmetricsreceiver v0.84.0
github.com/open-telemetry/opentelemetry-collector-contrib/receiver/awsecscontainermetricsreceiver v0.84.0
github.com/open-telemetry/opentelemetry-collector-contrib/receiver/dockerstatsreceiver v0.84.0
github.com/open-telemetry/opentelemetry-collector-contrib/receiver/elasticsearchreceiver v0.84.0
Expand Down
2 changes: 2 additions & 0 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -754,6 +754,8 @@ github.com/open-telemetry/opentelemetry-collector-contrib/processor/resourcedete
github.com/open-telemetry/opentelemetry-collector-contrib/processor/resourcedetectionprocessor v0.84.0/go.mod h1:CXioiwzuR9YaxCtT+bobWx3aeNd6KTlFpIWlnNpfriI=
github.com/open-telemetry/opentelemetry-collector-contrib/processor/resourceprocessor v0.84.0 h1:63ocfTOX403x0RlOIxiqoIxfhlnz5bYNjKFinaMUAFc=
github.com/open-telemetry/opentelemetry-collector-contrib/processor/resourceprocessor v0.84.0/go.mod h1:mcxeCwBWxyIFEOfrHczRwmWknNU/mr7X/p1SKz5oKc8=
github.com/open-telemetry/opentelemetry-collector-contrib/receiver/awscloudwatchmetricsreceiver v0.84.0 h1:+hBhKVUNe8hWgQcvkw/feHEHphvPKvtpdr5KYN9R/v8=
github.com/open-telemetry/opentelemetry-collector-contrib/receiver/awscloudwatchmetricsreceiver v0.84.0/go.mod h1:u7HeCy6ykHxhj4mIGttoMKmWb6Jj2s0IwoQb0oU1pnI=
github.com/open-telemetry/opentelemetry-collector-contrib/receiver/elasticsearchreceiver v0.84.0 h1:4Rplr/R6Z9E/e6/F2jG4c4E7Vy9dTC576wi8cuVDG8M=
github.com/open-telemetry/opentelemetry-collector-contrib/receiver/elasticsearchreceiver v0.84.0/go.mod h1:RPq12Un/5T3BUNQXO2445cJtFW8mTOjKY81+7wdWFEM=
github.com/open-telemetry/opentelemetry-collector-contrib/receiver/filelogreceiver v0.84.0 h1:Nv9JywcY7nBSZ3tIw/SjW0GUlgy/SeS05CIdRg0GqWk=
Expand Down
35 changes: 35 additions & 0 deletions otel-config.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,35 @@
exporters:
logging:
loglevel: debug
otlp/2:
endpoint: ${MW_TARGET}
headers:
authorization: ${MW_API_KEY}
sending_queue:
enabled: true
num_consumers: 100
queue_size: 10000

receivers:
awscloudwatchmetrics/378448125786_us-east-1_AWS-EC2:
region: us-east-1
profile: "default"
imds_endpoint: ""
poll_interval: "5m"
metrics:
named:
- namespace: "AWS/EC2"
metric_name: "CPUUtilization"
period: "5m"
aws_aggregation: "Sum"

service:
pipelines:
metrics:
exporters:
- logging
receivers:
- awscloudwatchmetrics/378448125786_us-east-1_ec2
telemetry:
logs:
level: debug
3 changes: 3 additions & 0 deletions pkg/agent/hostagent_linux.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ package agent

import (
"context"

"github.com/open-telemetry/opentelemetry-collector-contrib/exporter/fileexporter"
"github.com/open-telemetry/opentelemetry-collector-contrib/exporter/kafkaexporter"
"github.com/open-telemetry/opentelemetry-collector-contrib/extension/healthcheckextension"
Expand All @@ -13,6 +14,7 @@ import (
"github.com/open-telemetry/opentelemetry-collector-contrib/processor/resourceprocessor"
"github.com/open-telemetry/opentelemetry-collector-contrib/processor/transformprocessor"
"github.com/open-telemetry/opentelemetry-collector-contrib/receiver/apachereceiver"
"github.com/open-telemetry/opentelemetry-collector-contrib/receiver/awscloudwatchmetricsreceiver"
"github.com/open-telemetry/opentelemetry-collector-contrib/receiver/awsecscontainermetricsreceiver"
"github.com/open-telemetry/opentelemetry-collector-contrib/receiver/dockerstatsreceiver"
"github.com/open-telemetry/opentelemetry-collector-contrib/receiver/elasticsearchreceiver"
Expand Down Expand Up @@ -69,6 +71,7 @@ func (c *HostAgent) GetFactories(_ context.Context) (otelcol.Factories, error) {
jmxreceiver.NewFactory(),
apachereceiver.NewFactory(),
oracledbreceiver.NewFactory(),
awscloudwatchmetricsreceiver.NewFactory(),
}

// if the host agent is running on ECS EC2, add
Expand Down
141 changes: 82 additions & 59 deletions pkg/agent/kubeagent.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,10 +5,14 @@ import (
"encoding/json"
"fmt"
"io"
"io/ioutil"
"net/http"
"net/url"
"os"
"time"

yaml "gopkg.in/yaml.v2"
autoscalingv1 "k8s.io/api/autoscaling/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"

"github.com/open-telemetry/opentelemetry-collector-contrib/exporter/kafkaexporter"
Expand Down Expand Up @@ -41,10 +45,22 @@ import (
"go.opentelemetry.io/collector/receiver"
"go.opentelemetry.io/collector/receiver/otlpreceiver"
"go.uber.org/zap"
"k8s.io/client-go/informers"
"k8s.io/client-go/kubernetes"
"k8s.io/client-go/rest"
"k8s.io/client-go/tools/cache"
)

type Response struct {
Data []AccountData `json:"data"`
}

type AccountData struct {
AccountID string `json:"accountId"`
Region string `json:"region"`
Namespace string `json:"namespace"`
}

const Timestamp string = "timestamp"

// KubeAgent implements Agent interface for Kubernetes
Expand Down Expand Up @@ -184,70 +200,77 @@ func (k *KubeAgent) GetFactories(_ context.Context) (otelcol.Factories, error) {
// agent on the Middleware backend and restarts the agent if configuration
// has changed.
func (c *KubeAgentMonitor) ListenForKubeOtelConfigChanges(ctx context.Context) error {
err := c.callRestartStatusAPI(ctx)
if err != nil {
c.logger.Info("error restarting agent on config change",
zap.Error(err))
}
fmt.Println("Listening...")
c.callRestartStatusAPI(ctx)
return nil
}

// callRestartStatusAPI checks if there is an update in the otel-config at Middleware Backend
// For a particular account
func (c *KubeAgentMonitor) callRestartStatusAPI(ctx context.Context) error {

u, err := url.Parse(c.APIURLForConfigCheck)
if err != nil {
return err
}

baseURL := u.JoinPath(apiPathForRestart)
baseURL = baseURL.JoinPath(c.APIKey)
params := url.Values{}
params.Add("platform", "k8s")
params.Add("host_id", c.ClusterName)
params.Add("cluster", c.ClusterName)
params.Add("agent_version", c.Version)

// Add Query Parameters to the URL
baseURL.RawQuery = params.Encode() // Escape Query Parameters

resp, err := http.Get(baseURL.String())
if err != nil {
c.logger.Error("failed to call Restart-API", zap.String("url", baseURL.String()), zap.Error(err))
return err
}

defer resp.Body.Close()

if resp.StatusCode != http.StatusOK {
c.logger.Error("failed to call Restart-API", zap.Int("code", resp.StatusCode))
return ErrRestartStatusAPINotOK
}

var apiResponse apiResponseForRestart
if err := json.NewDecoder(resp.Body).Decode(&apiResponse); err != nil {
c.logger.Error("failed unmarshal Restart-API response", zap.Error(err))
return err
}

if apiResponse.Rollout.Daemonset {
c.logger.Info("restarting mw-agent")
if err := c.restartKubeAgent(ctx, DaemonSet); err != nil {
c.logger.Error("error restarting mw-agent daemonset", zap.Error(err))
return err
}
}

if apiResponse.Rollout.Deployment {
c.logger.Info("restarting mw-agent")
if err := c.restartKubeAgent(ctx, Deployment); err != nil {
c.logger.Error("error restarting mw-agent deployment", zap.Error(err))
return err
}
}

return err
func (c *KubeAgentMonitor) callRestartStatusAPI(ctx context.Context) {
fmt.Println("Restarting...")
// Create a shared informer factory for HPA resources
factory := informers.NewSharedInformerFactory(c.Clientset, time.Minute)
hpaInformer := factory.Autoscaling().V1().HorizontalPodAutoscalers().Informer()

stopCh := make(chan struct{})
defer close(stopCh)

hpaInformer.AddEventHandler(cache.ResourceEventHandlerFuncs{
UpdateFunc: func(oldObj, newObj interface{}) {
oldHPA := oldObj.(*autoscalingv1.HorizontalPodAutoscaler)
newHPA := newObj.(*autoscalingv1.HorizontalPodAutoscaler)
if oldHPA.Status.CurrentReplicas != newHPA.Status.CurrentReplicas {
fmt.Printf("HPA updated: %s/%s, replicas: %d -> %d\n", newHPA.Namespace, newHPA.Name, oldHPA.Status.CurrentReplicas, newHPA.Status.CurrentReplicas)
// Add your custom code here
// RunSomeCode()

resp, err := http.Get(os.Getenv("MW_AWS_JOBS_LIST_URL"))
if err != nil {
fmt.Printf("Error making GET request: %v\n", err)
return
}
defer resp.Body.Close()

// Read the response body
body, err := ioutil.ReadAll(resp.Body)
if err != nil {
fmt.Printf("Error reading response body: %v\n", err)
return
}

// Parse the JSON response into the custom struct
var response Response
err = json.Unmarshal(body, &response)
if err != nil {
fmt.Printf("Error unmarshalling JSON response: %v\n", err)
return
}

// Print the parsed data
for _, data := range response.Data {
fmt.Printf("AccountID: %s, Region: %s, Namespace: %s\n", data.AccountID, data.Region, data.Namespace)
}

}
},
})

// Start the informer
factory.Start(stopCh)
factory.WaitForCacheSync(stopCh)

// Block forever
<-stopCh
// if apiResponse.Rollout.Deployment {
// c.logger.Info("restarting mw-agent")
// if err := c.restartKubeAgent(ctx, Deployment); err != nil {
// c.logger.Error("error restarting mw-agent deployment", zap.Error(err))
// return err
// }
// }

// return err
}

// restartKubeAgent rewrites the configmaps and rollout restarts agent's data scraping components
Expand Down

0 comments on commit 0645d35

Please sign in to comment.