diff --git a/tests/soak/Dockerfile b/tests/soak/Dockerfile
new file mode 100644
index 000000000..eb89ebb0e
--- /dev/null
+++ b/tests/soak/Dockerfile
@@ -0,0 +1,12 @@
+FROM debian:bookworm
+ARG LK_VERSION
+ARG CKPY_VERSION
+RUN test -n "${LK_VERSION}" || (echo "LK_VERSION build argument is required" && exit 1)
+RUN test -n "${CKPY_VERSION}" || (echo "CKPY_VERSION build argument is required" && exit 1)
+ENV DEBIAN_FRONTEND=noninteractive
+RUN apt update && apt install -y sudo
+RUN mkdir -p /soaktests
+COPY bootstrap.sh /soaktests
+WORKDIR /soaktests
+RUN /soaktests/bootstrap.sh ${CKPY_VERSION} ${LK_VERSION}
+ENTRYPOINT [ "/soaktests/confluent-kafka-python/tests/soak/run.sh" ]
diff --git a/tests/soak/README.md b/tests/soak/README.md
index a26a2fc45..29adebc2e 100644
--- a/tests/soak/README.md
+++ b/tests/soak/README.md
@@ -6,26 +6,14 @@ of time, typically 2+ weeks, to vet out any resource leaks, etc.
 The soak testing client is made up of a producer, producing messages to the
 configured topic, and a consumer, consuming the same messages back.
 
-DataDog reporting supported by setting datadog.api_key a and datadog.app_key
-in the soak client configuration file.
+OpenTelemetry reporting is supported through OTLP.
 
+# Installation
 
-There are some convenience script to get you started.
-
-On the host (ec2) where you aim to run the soaktest, do:
-
-$ git clone https://github.com/confluentinc/librdkafka
-$ git clone https://github.com/confluentinc/confluent-kafka-python
-
-# Build librdkafka and python
-$ ~/confluent-kafka-python/tests/soak/build.sh
-
-# Set up config:
-$ cp ~/confluent-kafka-python/tests/soak/ccloud.config.example ~/confluent-kafka-python/ccloud.config
-
-# Start a screen session
-$ screen bash
-
-# Within the screen session, run the soak client
-(screen)$ ~/run.sh
-(screen)$ Ctrl-A d  # to detach
+TESTID=normal \
+LK_VERSION=v2.2.0 \
+CKPY_VERSION=v2.2.0 \
+CC_BOOTSTRAP_SERVERS=_ \
+CC_USERNAME=_ \
+CC_PASSWORD=_ \
+DOCKER_REPOSITORY=_ ./install.sh
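For reference, a filled-in invocation might look like the sketch below; the broker address, credentials, and registry are illustrative placeholders, not real values. `install.sh` builds the image, pushes it when `DOCKER_REPOSITORY` is set, and deploys the Helm chart:

```sh
# Hypothetical values -- substitute your own cluster details and registry.
TESTID=normal \
LK_VERSION=v2.2.0 \
CKPY_VERSION=v2.2.0 \
CC_BOOTSTRAP_SERVERS=my-cluster.us-west-2.aws.confluent.cloud:9092 \
CC_USERNAME=my-api-key \
CC_PASSWORD=my-api-secret \
DOCKER_REPOSITORY=registry.example.com/njc-py-soak-tests ./install.sh
```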
diff --git a/tests/soak/ubuntu-bootstrap.sh b/tests/soak/bootstrap.sh
similarity index 80%
rename from tests/soak/ubuntu-bootstrap.sh
rename to tests/soak/bootstrap.sh
index 260df926f..e110fabeb 100755
--- a/tests/soak/ubuntu-bootstrap.sh
+++ b/tests/soak/bootstrap.sh
@@ -16,12 +16,11 @@ fi
 
 python_branch=$1
 librdkafka_branch=$2
+venv=$PWD/venv
 
 sudo apt update
-sudo apt install -y make gcc g++ zlib1g-dev libssl-dev libzstd-dev screen \
-     python3.6-dev python3-pip python3-virtualenv
-
-pushd $HOME
+sudo apt install -y git curl make gcc g++ zlib1g-dev libssl-dev libzstd-dev \
+     python3-dev python3-pip python3-venv
 
 if [[ ! -d confluent-kafka-python ]]; then
     git clone https://github.com/confluentinc/confluent-kafka-python
@@ -33,14 +32,14 @@ git checkout $python_branch
 
 echo "Installing librdkafka $librdkafka_branch"
 tools/bootstrap-librdkafka.sh --require-ssl $librdkafka_branch /usr
+rm -rf tmp-build
 
-echo "Installing interceptors"
-tools/install-interceptors.sh
+# echo "Installing interceptors"
+# tools/install-interceptors.sh
 
-venv=$HOME/venv
 echo "Setting up virtualenv in $venv"
 if [[ ! -d $venv ]]; then
-    virtualenv -p python3.6 $venv
+    python3 -m venv $venv
 fi
 
 source $venv/bin/activate
@@ -57,8 +56,6 @@ python -c "import confluent_kafka; print(confluent_kafka.version(), confluent_ka
 
 deactivate
 
-popd # $HOME
-
 echo "All done, activate the virtualenv in $venv before running the client:"
 echo "source $venv/bin/activate"
diff --git a/tests/soak/ccloud.config.example b/tests/soak/ccloud.config.example
deleted file mode 100644
index 328642a22..000000000
--- a/tests/soak/ccloud.config.example
+++ /dev/null
@@ -1,14 +0,0 @@
-bootstrap.servers=
-sasl.mechanisms=PLAIN
-security.protocol=SASL_SSL
-sasl.username=
-sasl.password=
-enable.idempotence=true
-debug=eos,generic,broker,security,consumer
-linger.ms=2
-compression.type=lz4
-# DataDog options/config
-datadog.api_key=
-datadog.app_key=
-
-
diff --git a/tests/soak/install.sh b/tests/soak/install.sh
new file mode 100755
index 000000000..0bd5b7b8a
--- /dev/null
+++ b/tests/soak/install.sh
@@ -0,0 +1,48 @@
+#!/bin/bash
+set -e
+
+DOCKER_REPOSITORY_DEFAULT=${DOCKER_REPOSITORY:-docker.io/library/njc-py-soak-tests}
+NAMESPACE=njc-soak-tests
+NOCACHE=${NOCACHE:---no-cache}
+
+for var in LK_VERSION CKPY_VERSION CC_BOOTSTRAP_SERVERS CC_USERNAME CC_PASSWORD TESTID \
+DOCKER_REPOSITORY_DEFAULT; do
+    VAR_VALUE=$(eval echo \$$var)
+    if [ -z "$VAR_VALUE" ]; then
+        echo "env variable $var is required"
+        exit 1
+    fi
+done
+
+TAG=${LK_VERSION}-${CKPY_VERSION}
+
+COMMAND="docker build . $NOCACHE --build-arg LK_VERSION=${LK_VERSION} \
+--build-arg CKPY_VERSION=${CKPY_VERSION} \
+-t ${DOCKER_REPOSITORY_DEFAULT}:${TAG}"
+echo $COMMAND
+$COMMAND
+
+if [ ! -z "$DOCKER_REPOSITORY" ]; then
+    COMMAND="docker push ${DOCKER_REPOSITORY}:${TAG}"
+    echo $COMMAND
+    $COMMAND
+fi
+
+if [ "$(uname -p)" = "x86_64" ]; then
+    NODE_ARCH="amd64"
+else
+    NODE_ARCH="arm64"
+fi
+
+COMMAND="helm upgrade --install njc-py-soak-tests-${TESTID} ./njc-py-soak-tests \
+--set "cluster.bootstrapServers=${CC_BOOTSTRAP_SERVERS}" \
+--set "cluster.username=${CC_USERNAME}" \
+--set "cluster.password=${CC_PASSWORD}" \
+--set "image.repository=${DOCKER_REPOSITORY_DEFAULT}" \
+--set "testid=${TESTID}" \
+--set "fullnameOverride=njc-py-soak-tests-${TESTID}" \
+--set "image.tag=${TAG}" \
+--set "nodeSelector.kubernetes\\.io/arch=${NODE_ARCH}" \
+--namespace "${NAMESPACE}" --create-namespace"
echo $COMMAND
+$COMMAND
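Once `install.sh` finishes, the deployment can be sanity-checked with standard Helm/kubectl commands. A minimal sketch, assuming the script's default release name (`njc-py-soak-tests-$TESTID`) and namespace (`njc-soak-tests`), with `TESTID=normal`:

```sh
# Confirm the release deployed and the pod is running.
helm status njc-py-soak-tests-normal --namespace njc-soak-tests
kubectl get pods --namespace njc-soak-tests

# Follow the soak client's output (run.sh logs the topic name on startup).
kubectl logs --namespace njc-soak-tests deploy/njc-py-soak-tests-normal -f
```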
diff --git a/tests/soak/njc-py-soak-tests/.helmignore b/tests/soak/njc-py-soak-tests/.helmignore
new file mode 100644
index 000000000..0e8a0eb36
--- /dev/null
+++ b/tests/soak/njc-py-soak-tests/.helmignore
@@ -0,0 +1,23 @@
+# Patterns to ignore when building packages.
+# This supports shell glob matching, relative path matching, and
+# negation (prefixed with !). Only one pattern per line.
+.DS_Store
+# Common VCS dirs
+.git/
+.gitignore
+.bzr/
+.bzrignore
+.hg/
+.hgignore
+.svn/
+# Common backup files
+*.swp
+*.bak
+*.tmp
+*.orig
+*~
+# Various IDEs
+.project
+.idea/
+*.tmproj
+.vscode/
diff --git a/tests/soak/njc-py-soak-tests/Chart.yaml b/tests/soak/njc-py-soak-tests/Chart.yaml
new file mode 100644
index 000000000..71e9d4248
--- /dev/null
+++ b/tests/soak/njc-py-soak-tests/Chart.yaml
@@ -0,0 +1,24 @@
+apiVersion: v2
+name: njc-py-soak-tests
+description: A Helm chart for Kubernetes
+
+# A chart can be either an 'application' or a 'library' chart.
+#
+# Application charts are a collection of templates that can be packaged into versioned archives
+# to be deployed.
+#
+# Library charts provide useful utilities or functions for the chart developer. They're included as
+# a dependency of application charts to inject those utilities and functions into the rendering
+# pipeline. Library charts do not define any templates and therefore cannot be deployed.
+type: application

+# This is the chart version. This version number should be incremented each time you make changes
+# to the chart and its templates, including the app version.
+# Versions are expected to follow Semantic Versioning (https://semver.org/)
+version: 0.1.0
+
+# This is the version number of the application being deployed. This version number should be
+# incremented each time you make changes to the application. Versions are not expected to
+# follow Semantic Versioning. They should reflect the version the application is using.
+# It is recommended to use it with quotes.
+appVersion: "1.0.0"
diff --git a/tests/soak/njc-py-soak-tests/templates/NOTES.txt b/tests/soak/njc-py-soak-tests/templates/NOTES.txt
new file mode 100644
index 000000000..fb58cd2ce
--- /dev/null
+++ b/tests/soak/njc-py-soak-tests/templates/NOTES.txt
@@ -0,0 +1 @@
+NJC Python soak tests installed!
diff --git a/tests/soak/njc-py-soak-tests/templates/_helpers.tpl b/tests/soak/njc-py-soak-tests/templates/_helpers.tpl
new file mode 100644
index 000000000..e1b849e97
--- /dev/null
+++ b/tests/soak/njc-py-soak-tests/templates/_helpers.tpl
@@ -0,0 +1,62 @@
+{{/*
+Expand the name of the chart.
+*/}}
+{{- define "njc-py-soak-tests.name" -}}
+{{- default .Chart.Name .Values.nameOverride | trunc 63 | trimSuffix "-" }}
+{{- end }}
+
+{{/*
+Create a default fully qualified app name.
+We truncate at 63 chars because some Kubernetes name fields are limited to this (by the DNS naming spec).
+If release name contains chart name it will be used as a full name.
+*/}}
+{{- define "njc-py-soak-tests.fullname" -}}
+{{- if .Values.fullnameOverride }}
+{{- .Values.fullnameOverride | trunc 63 | trimSuffix "-" }}
+{{- else }}
+{{- $name := default .Chart.Name .Values.nameOverride }}
+{{- if contains $name .Release.Name }}
+{{- .Release.Name | trunc 63 | trimSuffix "-" }}
+{{- else }}
+{{- printf "%s-%s" .Release.Name $name | trunc 63 | trimSuffix "-" }}
+{{- end }}
+{{- end }}
+{{- end }}
+
+{{/*
+Create chart name and version as used by the chart label.
+*/}}
+{{- define "njc-py-soak-tests.chart" -}}
+{{- printf "%s-%s" .Chart.Name .Chart.Version | replace "+" "_" | trunc 63 | trimSuffix "-" }}
+{{- end }}
+
+{{/*
+Common labels
+*/}}
+{{- define "njc-py-soak-tests.labels" -}}
+helm.sh/chart: {{ include "njc-py-soak-tests.chart" . }}
+{{ include "njc-py-soak-tests.selectorLabels" . }}
+{{- if .Chart.AppVersion }}
+app.kubernetes.io/version: {{ .Chart.AppVersion | quote }}
+{{- end }}
+app.kubernetes.io/managed-by: {{ .Release.Service }}
+{{- end }}
+
+{{/*
+Selector labels
+*/}}
+{{- define "njc-py-soak-tests.selectorLabels" -}}
+app.kubernetes.io/name: {{ include "njc-py-soak-tests.name" . }}
+app.kubernetes.io/instance: {{ .Release.Name }}
+{{- end }}
+
+{{/*
+Create the name of the service account to use
+*/}}
+{{- define "njc-py-soak-tests.serviceAccountName" -}}
+{{- if .Values.serviceAccount.create }}
+{{- default (include "njc-py-soak-tests.fullname" .) .Values.serviceAccount.name }}
+{{- else }}
+{{- default "default" .Values.serviceAccount.name }}
+{{- end }}
+{{- end }}
diff --git a/tests/soak/njc-py-soak-tests/templates/deployment.yaml b/tests/soak/njc-py-soak-tests/templates/deployment.yaml
new file mode 100644
index 000000000..837aab835
--- /dev/null
+++ b/tests/soak/njc-py-soak-tests/templates/deployment.yaml
@@ -0,0 +1,87 @@
+apiVersion: apps/v1
+kind: Deployment
+metadata:
+  name: {{ include "njc-py-soak-tests.fullname" . }}
+  labels:
+    {{- include "njc-py-soak-tests.labels" . | nindent 4 }}
+spec:
+  replicas: {{ .Values.replicaCount }}
+  selector:
+    matchLabels:
+      {{- include "njc-py-soak-tests.selectorLabels" . | nindent 6 }}
+  strategy:
+    type: RollingUpdate
+    rollingUpdate:
+      maxSurge: 0
+      maxUnavailable: 1
+  template:
+    metadata:
+      {{- with .Values.podAnnotations }}
+      annotations:
+        {{- toYaml . | nindent 8 }}
+      {{- end }}
+      labels:
+        {{- include "njc-py-soak-tests.selectorLabels" . | nindent 8 }}
+    spec:
+      {{- with .Values.imagePullSecrets }}
+      imagePullSecrets:
+        {{- toYaml . | nindent 8 }}
+      {{- end }}
+      serviceAccountName: {{ include "njc-py-soak-tests.serviceAccountName" . }}
+      securityContext:
+        {{- toYaml .Values.podSecurityContext | nindent 8 }}
+      volumes:
+        - name: secret
+          secret:
+            secretName: {{ include "njc-py-soak-tests.fullname" $ }}-secret
+      containers:
+        - name: {{ .Chart.Name }}
+          securityContext:
+            {{- toYaml .Values.securityContext | nindent 12 }}
+          image: "{{ .Values.image.repository }}:{{ .Values.image.tag | default .Chart.AppVersion }}"
+          imagePullPolicy: {{ .Values.image.pullPolicy }}
+          env:
+            - name: NODEIP
+              valueFrom:
+                fieldRef:
+                  apiVersion: v1
+                  fieldPath: status.hostIP
+            - name: OTEL_METRICS_EXPORTER
+              value: "otlp"
+            - name: OTEL_RESOURCE_ATTRIBUTES
+              value: "service.name={{ include "njc-py-soak-tests.fullname" . }},service.version={{ .Values.image.tag | default .Chart.AppVersion }}"
+            - name: OTEL_TRACES_EXPORTER
+              value: "none"
+            - name: OTEL_EXPORTER_OTLP_ENDPOINT
+              value: "http://$(NODEIP):14317"
+            - name: OTEL_METRIC_EXPORT_INTERVAL
+              value: "10000"
+            - name: TESTID
+              value: "{{ .Values.testid }}"
+          volumeMounts:
+            - name: "secret"
+              mountPath: "/soaktests/confluent-kafka-python/ccloud.config"
+              subPath: ccloud.config
+              readOnly: true
+          # livenessProbe:
+          #   httpGet:
+          #     path: /
+          #     port: http
+          # readinessProbe:
+          #   httpGet:
+          #     path: /
+          #     port: http
+          resources:
+            {{- toYaml .Values.resources | nindent 12 }}
+      {{- with .Values.nodeSelector }}
+      nodeSelector:
+        {{- toYaml . | nindent 8 }}
+      {{- end }}
+      {{- with .Values.affinity }}
+      affinity:
+        {{- toYaml . | nindent 8 }}
+      {{- end }}
+      {{- with .Values.tolerations }}
+      tolerations:
+        {{- toYaml . | nindent 8 }}
+      {{- end }}
diff --git a/tests/soak/njc-py-soak-tests/templates/secrets.yaml b/tests/soak/njc-py-soak-tests/templates/secrets.yaml
new file mode 100644
index 000000000..16eefd49b
--- /dev/null
+++ b/tests/soak/njc-py-soak-tests/templates/secrets.yaml
@@ -0,0 +1,35 @@
+{{- range $nameSuffix, $values := .Values.secrets }}
+---
+apiVersion: v1
+kind: Secret
+metadata:
+  name: {{ include "njc-py-soak-tests.fullname" $ }}-{{ $nameSuffix }}
+  {{- with $values.annotations }}
+  annotations:
+    {{- range $key, $value := . }}
+    {{- printf "%s: %s" $key (tpl $value $ | quote) | nindent 4 }}
+    {{- end }}
+  {{- end }}
+  labels:
+    {{- range $key, $value := $values.labels }}
+    {{- printf "%s: %s" $key (tpl $value $ | quote) | nindent 4 }}
+    {{- end }}
+type: {{ default "Opaque" $values.type }}
+{{- with $values.data }}
+data:
+  {{- toYaml . | nindent 2 }}
+{{- end }}
+stringData:
+  ccloud.config: |-
+    bootstrap.servers={{ $.Values.cluster.bootstrapServers }}
+    sasl.mechanisms=PLAIN
+    security.protocol=SASL_SSL
+    sasl.username={{ $.Values.cluster.username }}
+    sasl.password={{ $.Values.cluster.password }}
+    {{- $.Values.properties | nindent 4 -}}
+{{- with $values.stringData }}
+  {{- range $key, $value := . }}
+  {{- printf "%s: %s" $key (tpl $value $ | quote) | nindent 2 }}
+  {{- end }}
+{{- end }}
+{{- end -}}
diff --git a/tests/soak/njc-py-soak-tests/templates/serviceaccount.yaml b/tests/soak/njc-py-soak-tests/templates/serviceaccount.yaml
new file mode 100644
index 000000000..88e498c8a
--- /dev/null
+++ b/tests/soak/njc-py-soak-tests/templates/serviceaccount.yaml
@@ -0,0 +1,12 @@
+{{- if .Values.serviceAccount.create -}}
+apiVersion: v1
+kind: ServiceAccount
+metadata:
+  name: {{ include "njc-py-soak-tests.serviceAccountName" . }}
+  labels:
+    {{- include "njc-py-soak-tests.labels" . | nindent 4 }}
+  {{- with .Values.serviceAccount.annotations }}
+  annotations:
+    {{- toYaml . | nindent 4 }}
+  {{- end }}
+{{- end }}
diff --git a/tests/soak/njc-py-soak-tests/values.yaml b/tests/soak/njc-py-soak-tests/values.yaml
new file mode 100644
index 000000000..0640bfc86
--- /dev/null
+++ b/tests/soak/njc-py-soak-tests/values.yaml
@@ -0,0 +1,66 @@
+# Default values for njc-py-soak-tests.
+# This is a YAML-formatted file.
+# Declare variables to be passed into your templates.
+
+replicaCount: 1
+
+image:
+  # Local image repository (usable in minikube)
+  repository: docker.io/library/njc-py-soak-tests
+  pullPolicy: Always
+  # Overrides the image tag whose default is the chart appVersion.
+  tag: ""
+
+imagePullSecrets: []
+nameOverride: ""
+fullnameOverride: "njc-py-soak-tests"
+
+serviceAccount:
+  # Specifies whether a service account should be created
+  create: true
+  # Annotations to add to the service account
+  annotations: {}
+  # The name of the service account to use.
+  # If not set and create is true, a name is generated using the fullname template
+  name: ""
+
+podAnnotations: {}
+
+podSecurityContext: {}
+  # fsGroup: 2000
+
+securityContext: {}
+  # capabilities:
+  #   drop:
+  #   - ALL
+  # readOnlyRootFilesystem: true
+  # runAsNonRoot: true
+  # runAsUser: 1000
+
+resources:
+  limits:
+    cpu: 100m
+    memory: 512Mi
+  requests:
+    cpu: 100m
+    memory: 128Mi
+
+cluster:
+  bootstrapServers: ""
+  username: ""
+  password: ""
+
+properties: |-
+  enable.idempotence=true
+  debug=eos,generic,broker,security,consumer
+  linger.ms=2
+  compression.type=lz4
+
+secrets:
+  secret: {}
+
+nodeSelector: {}
+
+tolerations: []
+
+affinity: {}
diff --git a/tests/soak/requirements.txt b/tests/soak/requirements.txt
index c7a209847..c34e8ede5 100644
--- a/tests/soak/requirements.txt
+++ b/tests/soak/requirements.txt
@@ -1,2 +1,3 @@
-datadog
 psutil
+opentelemetry-distro
+opentelemetry-exporter-otlp
diff --git a/tests/soak/run.sh b/tests/soak/run.sh
index bac5fa822..47826e590 100755
--- a/tests/soak/run.sh
+++ b/tests/soak/run.sh
@@ -11,19 +11,12 @@ if [[ -z $librdkafka_version ]]; then
     exit 1
 fi
 
-if [[ -z $STY ]]; then
-    echo "This script should be run from inside a screen session"
-    exit 1
-fi
-
 set -u
-topic="pysoak-$librdkafka_version"
-logfile="${topic}.log.bz2"
+topic="pysoak-$TESTID-$librdkafka_version"
 
-echo "Starting soak client using topic $topic with logs written to $logfile"
+echo "Starting soak client using topic $topic"
 
 set +x
-time confluent-kafka-python/tests/soak/soakclient.py -t $topic -r 80 -f confluent-kafka-python/ccloud.config 2>&1 \
-    | tee /dev/stderr | bzip2 > $logfile
+time opentelemetry-instrument confluent-kafka-python/tests/soak/soakclient.py -i $TESTID -t $topic -r 80 -f confluent-kafka-python/ccloud.config 2>&1
 ret=$?
 echo "Python client exited with status $ret"
 exit $ret
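Note that `run.sh` stays exporter-agnostic: the OTLP endpoint, export interval, and exporter selection all come from the `OTEL_*` environment variables set in the chart's deployment template. To exercise the client outside Kubernetes, the same entry point can be pointed at the console exporter instead. A sketch, assuming a populated `ccloud.config` in the checkout root (`HOSTNAME` must be exported explicitly because `soakclient.py` reads it from the environment):

```sh
export OTEL_METRICS_EXPORTER=console   # print metrics to stdout instead of OTLP
export OTEL_TRACES_EXPORTER=none
export HOSTNAME=${HOSTNAME:-$(hostname)}
opentelemetry-instrument tests/soak/soakclient.py \
    -i localtest -t pysoak-localtest -r 10 -f ccloud.config
```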
diff --git a/tests/soak/soakclient.py b/tests/soak/soakclient.py
index e7e914cee..453c7e5a5 100755
--- a/tests/soak/soakclient.py
+++ b/tests/soak/soakclient.py
@@ -20,7 +20,7 @@
 # long term validation testing.
 #
 # Usage:
-#  tests/soak/soakclient.py -t <topic> -r <rate> -f <conf-file>
+#  tests/soak/soakclient.py -i <testid> -t <topic> -r <rate> -f <conf-file>
 #
 # A unique topic should be used for each soakclient instance.
 #
@@ -30,6 +30,7 @@
 from confluent_kafka.admin import AdminClient, NewTopic
 from collections import defaultdict
 from builtins import int
+from opentelemetry import metrics
 import argparse
 import threading
 import time
@@ -40,7 +41,6 @@
 import resource
 import os
 import psutil
-import datadog
 
 
 class SoakRecord (object):
@@ -72,8 +72,8 @@ class SoakClient (object):
         The producer and consumer run in separate background threads.
     """
 
-    # DataDog metric name prefix
-    DD_PFX = "kafka.client.soak.python."
+    # metric name prefix
+    METRIC_PFX = "kafka.client.soak.python."
 
     def dr_cb(self, err, msg):
         """ Producer delivery report callback """
@@ -81,17 +81,18 @@ def dr_cb(self, err, msg):
             self.logger.warning("producer: delivery failed: {} [{}]: {}".
                                 format(msg.topic(), msg.partition(), err))
             self.dr_err_cnt += 1
-            self.dd_incr("producer.drerr", 1)
-            self.dd.event("Message delivery failure",
-                          "Message delivery failed: {} [{}]: {}".
-                          format(msg.topic(), msg.partition(), err),
-                          hostname=self.hostname)
+            self.incr_counter("producer.drerr", 1)
+            self.incr_counter("producer.delivery.failure", 1, {
+                "topic": msg.topic(),
+                "partition": str(msg.partition()),
+                "err": str(err)
+            })
         else:
             self.dr_cnt += 1
-            self.dd_incr("producer.drok", 1)
-            self.dd_gauge("producer.latency", msg.latency(),
-                          tags=["partition:{}".format(msg.partition())])
+            self.incr_counter("producer.drok", 1)
+            self.set_gauge("producer.latency", msg.latency(),
+                           tags={"partition": "{}".format(msg.partition())})
             if (self.dr_cnt % self.disprate) == 0:
                 self.logger.debug("producer: delivered message to {} [{}] at offset {} in {}s".format(
                     msg.topic(), msg.partition(), msg.offset(), msg.latency()))
@@ -118,7 +119,7 @@ def produce_record(self):
                 continue
 
             self.producer_msgid += 1
-            self.dd_incr("producer.send", 1)
+            self.incr_counter("producer.send", 1)
 
     def producer_status(self):
         """ Print producer status """
@@ -218,7 +219,7 @@ def consumer_run(self):
                 if msg.error() is not None:
                     self.logger.error("consumer: error: {}".format(msg.error()))
                     self.consumer_err_cnt += 1
-                    self.dd_incr("consumer.error", 1)
+                    self.incr_counter("consumer.error", 1)
                     continue
 
                 try:
@@ -229,18 +230,18 @@ def consumer_run(self):
                                       "{} [{}] at offset {} (headers {}): {}".format(
                                           msg.topic(), msg.partition(), msg.offset(), msg.headers(), ex))
                     self.msg_err_cnt += 1
-                    self.dd_incr("consumer.msgerr", 1)
+                    self.incr_counter("consumer.msgerr", 1)
 
                 self.msg_cnt += 1
-                self.dd_incr("consumer.msg", 1)
+                self.incr_counter("consumer.msg", 1)
 
                 # end-to-end latency
                 headers = dict(msg.headers())
                 txtime = headers.get('time', None)
                 if txtime is not None:
                     latency = time.time() - float(txtime)
-                    self.dd_gauge("consumer.e2e_latency", latency,
-                                  tags=["partition:{}".format(msg.partition())])
+                    self.set_gauge("consumer.e2e_latency", latency,
+                                   tags={"partition": "{}".format(msg.partition())})
                 else:
                     latency = None
 
@@ -265,7 +266,7 @@ def consumer_run(self):
                                             msg.offset(), msg.headers(), hw,
                                             self.last_committed))
                     self.msg_dup_cnt += (hw + 1) - msg.offset()
-                    self.dd_incr("consumer.msgdup", 1)
+                    self.incr_counter("consumer.msgdup", 1)
                 elif msg.offset() > hw + 1:
                     self.logger.warning("consumer: Lost messages, now at {} "
                                         "[{}] at offset {} (headers {}): "
@@ -274,7 +275,7 @@ def consumer_run(self):
                                             msg.offset(), msg.headers(), hw,
                                             self.last_committed))
                     self.msg_miss_cnt += msg.offset() - (hw + 1)
-                    self.dd_incr("consumer.missedmsg", 1)
+                    self.incr_counter("consumer.missedmsg", 1)
 
                 hwmarks[hwkey] = msg.offset()
 
@@ -297,14 +298,14 @@ def consumer_error_cb(self, err):
         """ Consumer error callback """
         self.logger.error("consumer: error_cb: {}".format(err))
         self.consumer_error_cb_cnt += 1
-        self.dd_incr("consumer.errorcb", 1)
+        self.incr_counter("consumer.errorcb", 1)
 
     def consumer_commit_cb(self, err, partitions):
         """ Auto commit result callback """
         if err is not None:
             self.logger.error("consumer: offset commit failed for {}: {}".format(partitions, err))
             self.consumer_err_cnt += 1
-            self.dd_incr("consumer.error", 1)
+            self.incr_counter("consumer.error", 1)
         else:
             self.last_committed = partitions
 
@@ -312,7 +313,7 @@ def producer_error_cb(self, err):
         """ Producer error callback """
         self.logger.error("producer: error_cb: {}".format(err))
         self.producer_error_cb_cnt += 1
-        self.dd_incr("producer.errorcb", 1)
+        self.incr_counter("producer.errorcb", 1)
 
     def rtt_stats(self, d):
         """ Extract broker rtt statistics from the stats dict in @param d """
@@ -324,14 +325,14 @@ def rtt_stats(self, d):
             parts = ','.join([str(x['partition']) for x in
                               broker['toppars'].values()])
 
-            tags = ["broker:{}".format(broker['nodeid']),
-                    "partitions:{}".format(parts),
-                    "type:{}".format(d['type'])]
+            tags = {"broker": "{}".format(broker['nodeid']),
+                    "partitions": "{}".format(parts),
+                    "type": "{}".format(d['type'])}
 
-            self.dd_gauge("broker.rtt.p99",
-                          float(broker['rtt']['p99']) / 1000000.0, tags=tags)
-            self.dd_gauge("broker.rtt.avg",
-                          float(broker['rtt']['avg']) / 1000000.0, tags=tags)
+            self.set_gauge("broker.rtt.p99",
+                           float(broker['rtt']['p99']) / 1000000.0, tags=tags)
+            self.set_gauge("broker.rtt.avg",
+                           float(broker['rtt']['avg']) / 1000000.0, tags=tags)
 
     def stats_cb(self, json_str):
         """ Common statistics callback. """
@@ -350,16 +351,15 @@ def stats_cb(self, json_str):
 
             self.logger.info("{} stats: {}/{} brokers UP, {} partition leaders: {}".format(
                 d['name'], len(up_brokers), broker_cnt, self.topic, leaders))
 
-        # Emit the full raw stats every now and then for troubleshooting.
+        # Emit the full raw stats for troubleshooting.
         self.stats_cnt[d['type']] += 1
-        if (self.stats_cnt[d['type']] % 11) == 0:
-            self.logger.info("{} raw stats: {}".format(d['name'], json_str))
+        self.logger.info("{} raw stats: {}".format(d['name'], json_str))
 
         self.rtt_stats(d)
 
         # Sample the producer queue length
         if d['type'] == 'producer':
-            self.dd_gauge("producer.outq", len(self.producer))
+            self.set_gauge("producer.outq", len(self.producer))
 
     def create_topic(self, topic, conf):
         """ Create the topic if it doesn't already exist """
@@ -374,7 +374,7 @@ def create_topic(self, topic, conf):
             else:
                 raise
 
-    def __init__(self, topic, rate, conf):
+    def __init__(self, testid, topic, rate, conf):
         """ SoakClient constructor. conf is the client configuration """
         self.topic = topic
         self.rate = rate
@@ -383,6 +383,12 @@ def __init__(self, topic, rate, conf):
         self.stats_cnt = {'producer': 0, 'consumer': 0}
         self.start_time = time.time()
 
+        # OTEL instruments
+        self.counters = {}
+        self.gauges = {}
+        self.gauge_cbs = {}
+        self.gauge_values = {}
+
         self.last_rusage = None
         self.last_rusage_time = None
         self.proc = psutil.Process(os.getpid())
@@ -395,8 +401,10 @@ def __init__(self, topic, rate, conf):
 
         # Construct a unique id to use for metrics hostname so that
        # multiple instances of the SoakClient can run on the same machine.
-        hostname = datadog.util.hostname.get_hostname()
+        hostname = os.environ["HOSTNAME"]
         self.hostname = "py-{}-{}".format(hostname, self.topic)
+        self.testid = testid
+        self.meter = metrics.get_meter("njc.python.soak.tests")
 
         self.logger.info("SoakClient id {}".format(self.hostname))
 
@@ -405,13 +413,7 @@ def __init__(self, topic, rate, conf):
         conf['group.id'] = 'soakclient-{}-{}-{}'.format(
             self.hostname, version()[0], sys.version.split(' ')[0])
 
-        # Separate datadog config from client config
-        datadog_conf = {k[len("datadog."):]: conf[k]
-                        for k in conf.keys() if k.startswith("datadog.")}
-        conf = {k: v for k, v in conf.items() if not k.startswith("datadog.")}
-
-        # Set up datadog agent
-        self.init_datadog(datadog_conf)
+        conf = {k: v for k, v in conf.items()}
 
         def filter_config(conf, filter_out, strip_prefix):
             len_sp = len(strip_prefix)
@@ -432,7 +434,7 @@ def filter_config(conf, filter_out, strip_prefix):
         # Create Producer and Consumer, each running in its own thread.
         #
         conf['stats_cb'] = self.stats_cb
-        conf['statistics.interval.ms'] = 10000
+        conf['statistics.interval.ms'] = 120000
 
         # Producer
         pconf = filter_config(conf, ["consumer.", "admin."], "producer.")
@@ -465,36 +467,66 @@ def terminate(self):
         # Final resource usage
         soak.get_rusage()
 
-    def init_datadog(self, options):
-        """ Initialize datadog agent """
-        datadog.initialize(**options)
-
-        self.dd = datadog.ThreadStats()
-        self.dd.start()
-
-    def dd_incr(self, metric_name, incrval):
-        """ Increment datadog metric counter by incrval """
-        self.dd.increment(self.DD_PFX + metric_name, incrval, host=self.hostname)
-
-    def dd_gauge(self, metric_name, val, tags=None):
-        """ Set datadog metric gauge to val """
-        self.dd.gauge(self.DD_PFX + metric_name, val,
-                      tags=tags, host=self.hostname)
+    def incr_counter(self, metric_name, incrval, tags=None):
+        """ Increment metric counter by incrval """
+        if not tags:
+            tags = {}
+        tags.update({
+            "host": self.hostname,
+            "testid": self.testid
+        })
+
+        full_metric_name = self.METRIC_PFX + metric_name
+        if full_metric_name not in self.counters:
+            self.counters[full_metric_name] = self.meter.create_counter(
+                full_metric_name,
+                description=full_metric_name,
+            )
+        counter = self.counters[full_metric_name]
+        counter.add(incrval, tags)
+
+    def set_gauge(self, metric_name, val, tags=None):
+        """ Set metric gauge to val """
+        if not tags:
+            tags = {}
+        tags.update({
+            "host": self.hostname,
+            "testid": self.testid
+        })
+
+        full_metric_name = self.METRIC_PFX + metric_name
+        if full_metric_name not in self.gauge_values:
+            self.gauge_values[full_metric_name] = []
+
+        self.gauge_values[full_metric_name].append([val, tags])
+
+        if full_metric_name not in self.gauges:
+            def cb(_):
+                for value in self.gauge_values[full_metric_name]:
+                    yield metrics.Observation(value[0], value[1])
+                self.gauge_values[full_metric_name] = []
+
+            self.gauge_cbs[full_metric_name] = cb
+            self.gauges[full_metric_name] = self.meter.create_observable_gauge(
+                callbacks=[self.gauge_cbs[full_metric_name]],
+                name=full_metric_name,
+                description=full_metric_name
+            )
 
     def calc_rusage_deltas(self, curr, prev, elapsed):
         """ Calculate deltas between previous and current resource usage """
 
         # User CPU %
         user_cpu = ((curr.ru_utime - prev.ru_utime) / elapsed) * 100.0
-        self.dd_gauge("cpu.user", user_cpu)
+        self.set_gauge("cpu.user", user_cpu)
 
         # System CPU %
         sys_cpu = ((curr.ru_stime - prev.ru_stime) / elapsed) * 100.0
-        self.dd_gauge("cpu.system", sys_cpu)
+        self.set_gauge("cpu.system", sys_cpu)
 
         # Max RSS memory (monotonic)
         max_rss = curr.ru_maxrss / 1024.0
-        self.dd_gauge("memory.rss.max", max_rss)
+        self.set_gauge("memory.rss.max", max_rss)
 
         self.logger.info("User CPU: {:.1f}%, System CPU: {:.1f}%, MaxRSS {:.3f}MiB".format(
             user_cpu, sys_cpu, max_rss))
@@ -513,12 +545,13 @@ def get_rusage(self):
 
         # Current RSS memory
         rss = float(self.proc.memory_info().rss) / (1024.0*1024.0)
-        self.dd_gauge("memory.rss", rss)
+        self.set_gauge("memory.rss", rss)
 
 
 if __name__ == '__main__':
 
     parser = argparse.ArgumentParser(description='Kafka client soak test')
+    parser.add_argument('-i', dest='testid', type=str, required=True, help='Test id')
     parser.add_argument('-b', dest='brokers', type=str, default=None, help='Bootstrap servers')
     parser.add_argument('-t', dest='topic', type=str, required=True, help='Topic to use')
     parser.add_argument('-r', dest='rate', type=float, default=10, help='Message produce rate per second')
@@ -554,7 +587,7 @@ def get_rusage(self):
     conf['enable.partition.eof'] = False
 
     # Create SoakClient
-    soak = SoakClient(args.topic, args.rate, conf)
+    soak = SoakClient(args.testid, args.topic, args.rate, conf)
 
     # Get initial resource usage
     soak.get_rusage()
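For readers new to the OpenTelemetry metrics API adopted above: `incr_counter()` maps onto a synchronous Counter, while `set_gauge()` buffers samples that an ObservableGauge callback drains on each export cycle. A minimal, self-contained sketch of that pattern is below; it uses the console exporter and illustrative metric values, whereas the real client inherits its OTLP exporter wiring from `opentelemetry-instrument` and the `OTEL_*` environment variables set in the Helm chart.

```python
from opentelemetry import metrics
from opentelemetry.sdk.metrics import MeterProvider
from opentelemetry.sdk.metrics.export import (ConsoleMetricExporter,
                                              PeriodicExportingMetricReader)

# Export to stdout every 10s (the deployment uses OTLP via env vars instead).
reader = PeriodicExportingMetricReader(ConsoleMetricExporter(),
                                       export_interval_millis=10000)
metrics.set_meter_provider(MeterProvider(metric_readers=[reader]))
meter = metrics.get_meter("njc.python.soak.tests")

# Counter: created once per metric name, then incremented with attribute
# dicts (the "tags" in incr_counter()).
sends = meter.create_counter("kafka.client.soak.python.producer.send")
sends.add(1, {"host": "py-example", "testid": "normal"})

# Observable gauge: values are buffered by the measuring thread and drained
# by the callback each time the reader collects, mirroring set_gauge().
buffered = [(0.25, {"partition": "0"})]  # illustrative sample

def latency_cb(options):
    while buffered:
        val, attrs = buffered.pop(0)
        yield metrics.Observation(val, attrs)

meter.create_observable_gauge("kafka.client.soak.python.producer.latency",
                              callbacks=[latency_cb])
```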