diff --git a/Dockerfiles/DockerfileKubeLocal b/Dockerfiles/DockerfileKubeLocal new file mode 100644 index 00000000..2e785a1a --- /dev/null +++ b/Dockerfiles/DockerfileKubeLocal @@ -0,0 +1,12 @@ +FROM golang:1.21 +WORKDIR /app +RUN apt-get update && apt-get install -y ca-certificates openssl +RUN update-ca-certificates +COPY /build/mw-kube-agent /usr/bin/mw-agent +COPY configyamls-k8s/otel-config.yaml /app/otel-config.yaml +COPY configyamls-k8s/otel-config-nodocker.yaml /app/otel-config-nodocker.yaml +COPY configyamls-k8s/otel-config-daemonset.yaml /app/otel-config-daemonset.yaml +COPY configyamls-k8s/otel-config-deployment.yaml /app/otel-config-deployment.yaml +# A symlink to support existing k8s agent users +RUN ln -s /usr/bin/mw-agent /usr/bin/api-server +CMD ["mw-agent", "start"] \ No newline at end of file diff --git a/README.md b/README.md index c2a1a72d..2f5eb145 100644 --- a/README.md +++ b/README.md @@ -13,3 +13,17 @@ This agent currently supports [Linux & Windows](cmd/host-agent/) & [Kubernetes]( `mw-agent` can take configuration from environment variables, CLI flags or configuration file. Details of how to configure the agent can be found [here](docs). 
+ +---- + +https://test-keval.free.beeceptor.com/aws-jobs + + helm install --set mw.target=https://4plo493.middleware.io:443 --set mw.apiKey=${MW_API_KEY} --set clusterMetadata.name=minikube-keval --wait mw-aws-data-scraper -n mw-agent-ns --create-namespace + +helm upgrade --install --set mw.target=https://4plo493.middleware.io:443 --set mw.apiKey=${MW_API_KEY} --set clusterMetadata.name=minikube-keval --wait my-mw-aws-data-scraper mw-aws-data-scraper -n mw-agent-ns + + minikube addons enable metrics-server + + https://linear.app/middleware/issue/ENG-2933/create-a-server-side-component-to-do-aws-metrics-scrapping + + minikube image load ghcr.io/middleware-labs/mw-kube-agent:awscloudwatchmetricsreceiver-keval \ No newline at end of file diff --git a/go.mod b/go.mod index 83f533cf..a466b4c1 100644 --- a/go.mod +++ b/go.mod @@ -64,6 +64,7 @@ require ( github.com/open-telemetry/opentelemetry-collector-contrib/processor/resourceprocessor v0.84.0 github.com/open-telemetry/opentelemetry-collector-contrib/processor/transformprocessor v0.84.0 github.com/open-telemetry/opentelemetry-collector-contrib/receiver/apachereceiver v0.84.0 + github.com/open-telemetry/opentelemetry-collector-contrib/receiver/awscloudwatchmetricsreceiver v0.84.0 github.com/open-telemetry/opentelemetry-collector-contrib/receiver/awsecscontainermetricsreceiver v0.84.0 github.com/open-telemetry/opentelemetry-collector-contrib/receiver/dockerstatsreceiver v0.84.0 github.com/open-telemetry/opentelemetry-collector-contrib/receiver/elasticsearchreceiver v0.84.0 diff --git a/go.sum b/go.sum index 4adeeada..f2fd7bc3 100644 --- a/go.sum +++ b/go.sum @@ -754,6 +754,8 @@ github.com/open-telemetry/opentelemetry-collector-contrib/processor/resourcedete github.com/open-telemetry/opentelemetry-collector-contrib/processor/resourcedetectionprocessor v0.84.0/go.mod h1:CXioiwzuR9YaxCtT+bobWx3aeNd6KTlFpIWlnNpfriI= 
github.com/open-telemetry/opentelemetry-collector-contrib/processor/resourceprocessor v0.84.0 h1:63ocfTOX403x0RlOIxiqoIxfhlnz5bYNjKFinaMUAFc= github.com/open-telemetry/opentelemetry-collector-contrib/processor/resourceprocessor v0.84.0/go.mod h1:mcxeCwBWxyIFEOfrHczRwmWknNU/mr7X/p1SKz5oKc8= +github.com/open-telemetry/opentelemetry-collector-contrib/receiver/awscloudwatchmetricsreceiver v0.84.0 h1:+hBhKVUNe8hWgQcvkw/feHEHphvPKvtpdr5KYN9R/v8= +github.com/open-telemetry/opentelemetry-collector-contrib/receiver/awscloudwatchmetricsreceiver v0.84.0/go.mod h1:u7HeCy6ykHxhj4mIGttoMKmWb6Jj2s0IwoQb0oU1pnI= github.com/open-telemetry/opentelemetry-collector-contrib/receiver/elasticsearchreceiver v0.84.0 h1:4Rplr/R6Z9E/e6/F2jG4c4E7Vy9dTC576wi8cuVDG8M= github.com/open-telemetry/opentelemetry-collector-contrib/receiver/elasticsearchreceiver v0.84.0/go.mod h1:RPq12Un/5T3BUNQXO2445cJtFW8mTOjKY81+7wdWFEM= github.com/open-telemetry/opentelemetry-collector-contrib/receiver/filelogreceiver v0.84.0 h1:Nv9JywcY7nBSZ3tIw/SjW0GUlgy/SeS05CIdRg0GqWk= diff --git a/otel-config.yaml b/otel-config.yaml new file mode 100644 index 00000000..4fdf7bce --- /dev/null +++ b/otel-config.yaml @@ -0,0 +1,35 @@ +exporters: + logging: + loglevel: debug + otlp/2: + endpoint: ${MW_TARGET} + headers: + authorization: ${MW_API_KEY} + sending_queue: + enabled: true + num_consumers: 100 + queue_size: 10000 + +receivers: + awscloudwatchmetrics/378448125786_us-east-1_AWS-EC2: + region: us-east-1 + profile: "default" + imds_endpoint: "" + poll_interval: "5m" + metrics: + named: + - namespace: "AWS/EC2" + metric_name: "CPUUtilization" + period: "5m" + aws_aggregation: "Sum" + +service: + pipelines: + metrics: + exporters: + - logging + receivers: + - awscloudwatchmetrics/378448125786_us-east-1_AWS-EC2 + telemetry: + logs: + level: debug diff --git a/pkg/agent/hostagent_linux.go b/pkg/agent/hostagent_linux.go index 0ffed24a..ff9d4780 100644 --- a/pkg/agent/hostagent_linux.go +++ b/pkg/agent/hostagent_linux.go @@ -2,6 +2,7 
@@ package agent import ( "context" + "github.com/open-telemetry/opentelemetry-collector-contrib/exporter/fileexporter" "github.com/open-telemetry/opentelemetry-collector-contrib/exporter/kafkaexporter" "github.com/open-telemetry/opentelemetry-collector-contrib/extension/healthcheckextension" @@ -13,6 +14,7 @@ import ( "github.com/open-telemetry/opentelemetry-collector-contrib/processor/resourceprocessor" "github.com/open-telemetry/opentelemetry-collector-contrib/processor/transformprocessor" "github.com/open-telemetry/opentelemetry-collector-contrib/receiver/apachereceiver" + "github.com/open-telemetry/opentelemetry-collector-contrib/receiver/awscloudwatchmetricsreceiver" "github.com/open-telemetry/opentelemetry-collector-contrib/receiver/awsecscontainermetricsreceiver" "github.com/open-telemetry/opentelemetry-collector-contrib/receiver/dockerstatsreceiver" "github.com/open-telemetry/opentelemetry-collector-contrib/receiver/elasticsearchreceiver" @@ -69,6 +71,7 @@ func (c *HostAgent) GetFactories(_ context.Context) (otelcol.Factories, error) { jmxreceiver.NewFactory(), apachereceiver.NewFactory(), oracledbreceiver.NewFactory(), + awscloudwatchmetricsreceiver.NewFactory(), } // if the host agent is running on ECS EC2, add diff --git a/pkg/agent/kubeagent.go b/pkg/agent/kubeagent.go index ed1851be..aad1a63c 100644 --- a/pkg/agent/kubeagent.go +++ b/pkg/agent/kubeagent.go @@ -5,10 +5,14 @@ import ( "encoding/json" "fmt" "io" + "io/ioutil" "net/http" "net/url" + "os" + "time" yaml "gopkg.in/yaml.v2" + autoscalingv1 "k8s.io/api/autoscaling/v1" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" "github.com/open-telemetry/opentelemetry-collector-contrib/exporter/kafkaexporter" @@ -41,10 +45,22 @@ import ( "go.opentelemetry.io/collector/receiver" "go.opentelemetry.io/collector/receiver/otlpreceiver" "go.uber.org/zap" + "k8s.io/client-go/informers" "k8s.io/client-go/kubernetes" "k8s.io/client-go/rest" + "k8s.io/client-go/tools/cache" ) +type Response struct { + Data 
[]AccountData `json:"data"` +} + +type AccountData struct { + AccountID string `json:"accountId"` + Region string `json:"region"` + Namespace string `json:"namespace"` +} + const Timestamp string = "timestamp" // KubeAgent implements Agent interface for Kubernetes @@ -184,70 +200,77 @@ func (k *KubeAgent) GetFactories(_ context.Context) (otelcol.Factories, error) { // agent on the Middleware backend and restarts the agent if configuration // has changed. func (c *KubeAgentMonitor) ListenForKubeOtelConfigChanges(ctx context.Context) error { - err := c.callRestartStatusAPI(ctx) - if err != nil { - c.logger.Info("error restarting agent on config change", - zap.Error(err)) - } + fmt.Println("Listening...") + c.callRestartStatusAPI(ctx) return nil } // callRestartStatusAPI checks if there is an update in the otel-config at Middleware Backend // For a particular account -func (c *KubeAgentMonitor) callRestartStatusAPI(ctx context.Context) error { - - u, err := url.Parse(c.APIURLForConfigCheck) - if err != nil { - return err - } - - baseURL := u.JoinPath(apiPathForRestart) - baseURL = baseURL.JoinPath(c.APIKey) - params := url.Values{} - params.Add("platform", "k8s") - params.Add("host_id", c.ClusterName) - params.Add("cluster", c.ClusterName) - params.Add("agent_version", c.Version) - - // Add Query Parameters to the URL - baseURL.RawQuery = params.Encode() // Escape Query Parameters - - resp, err := http.Get(baseURL.String()) - if err != nil { - c.logger.Error("failed to call Restart-API", zap.String("url", baseURL.String()), zap.Error(err)) - return err - } - - defer resp.Body.Close() - - if resp.StatusCode != http.StatusOK { - c.logger.Error("failed to call Restart-API", zap.Int("code", resp.StatusCode)) - return ErrRestartStatusAPINotOK - } - - var apiResponse apiResponseForRestart - if err := json.NewDecoder(resp.Body).Decode(&apiResponse); err != nil { - c.logger.Error("failed unmarshal Restart-API response", zap.Error(err)) - return err - } - - if 
apiResponse.Rollout.Daemonset { - c.logger.Info("restarting mw-agent") - if err := c.restartKubeAgent(ctx, DaemonSet); err != nil { - c.logger.Error("error restarting mw-agent daemonset", zap.Error(err)) - return err - } - } - - if apiResponse.Rollout.Deployment { - c.logger.Info("restarting mw-agent") - if err := c.restartKubeAgent(ctx, Deployment); err != nil { - c.logger.Error("error restarting mw-agent deployment", zap.Error(err)) - return err - } - } - - return err +func (c *KubeAgentMonitor) callRestartStatusAPI(ctx context.Context) { + fmt.Println("Restarting...") + // Create a shared informer factory for HPA resources + factory := informers.NewSharedInformerFactory(c.Clientset, time.Minute) + hpaInformer := factory.Autoscaling().V1().HorizontalPodAutoscalers().Informer() + + stopCh := make(chan struct{}) + defer close(stopCh) + + hpaInformer.AddEventHandler(cache.ResourceEventHandlerFuncs{ + UpdateFunc: func(oldObj, newObj interface{}) { + oldHPA := oldObj.(*autoscalingv1.HorizontalPodAutoscaler) + newHPA := newObj.(*autoscalingv1.HorizontalPodAutoscaler) + if oldHPA.Status.CurrentReplicas != newHPA.Status.CurrentReplicas { + fmt.Printf("HPA updated: %s/%s, replicas: %d -> %d\n", newHPA.Namespace, newHPA.Name, oldHPA.Status.CurrentReplicas, newHPA.Status.CurrentReplicas) + // Add your custom code here + // RunSomeCode() + + resp, err := http.Get(os.Getenv("MW_AWS_JOBS_LIST_URL")) + if err != nil { + fmt.Printf("Error making GET request: %v\n", err) + return + } + defer resp.Body.Close() + + // Read the response body + body, err := ioutil.ReadAll(resp.Body) + if err != nil { + fmt.Printf("Error reading response body: %v\n", err) + return + } + + // Parse the JSON response into the custom struct + var response Response + err = json.Unmarshal(body, &response) + if err != nil { + fmt.Printf("Error unmarshalling JSON response: %v\n", err) + return + } + + // Print the parsed data + for _, data := range response.Data { + fmt.Printf("AccountID: %s, Region: %s, 
Namespace: %s\n", data.AccountID, data.Region, data.Namespace) + } + + } + }, + }) + + // Start the informer + factory.Start(stopCh) + factory.WaitForCacheSync(stopCh) + + // Block forever + <-stopCh + // if apiResponse.Rollout.Deployment { + // c.logger.Info("restarting mw-agent") + // if err := c.restartKubeAgent(ctx, Deployment); err != nil { + // c.logger.Error("error restarting mw-agent deployment", zap.Error(err)) + // return err + // } + // } + + // return err } // restartKubeAgent rewrites the configmaps and rollout restarts agent's data scraping components