Fix ClickHouse monitor e2e test
Signed-off-by: Yun-Tang Hsu <[email protected]>
yuntanghsu committed Nov 1, 2023
1 parent 5fb2867 commit 8a52653
Showing 3 changed files with 54 additions and 13 deletions.
2 changes: 1 addition & 1 deletion ci/kind/test-e2e-kind.sh
@@ -39,7 +39,7 @@ function print_usage {

TESTBED_CMD=$(dirname $0)"/kind-setup.sh"
YML_DIR=$(dirname $0)"/../../build/yamls"
-FLOW_VISIBILITY_CMD=$(dirname $0)"/../../hack/generate-manifest.sh --ch-size 100Mi --ch-monitor-threshold 0.1 --theia-manager"
+FLOW_VISIBILITY_CMD=$(dirname $0)"/../../hack/generate-manifest.sh --ch-size 100Mi --ch-monitor-threshold 0.1 --ch-monitor-exec-interval 10s --theia-manager"
FLOW_VISIBILITY_WITH_SPARK_CMD=$(dirname $0)"/../../hack/generate-manifest.sh --no-grafana --spark-operator --theia-manager"
FLOW_VISIBILITY_CH_ONLY_CMD=$(dirname $0)"/../../hack/generate-manifest.sh --no-grafana --theia-manager"
CH_OPERATOR_YML=$(dirname $0)"/../../build/charts/theia/crds/clickhouse-operator-install-bundle.yaml"
8 changes: 7 additions & 1 deletion hack/generate-manifest.sh
@@ -40,6 +40,7 @@ Kustomize, and print it to stdout.
Ei, Pi, Ti, Gi, Mi, Ki. (default is 8Gi)
--ch-monitor-threshold <threshold> Deploy the ClickHouse monitor with a specific threshold. Can
vary from 0 to 1. (default is 0.5)
+--ch-monitor-exec-interval <interval> Deploy the ClickHouse monitor with a specific execution interval. (default is 1m)
--local <path> Create the PersistentVolume for Clickhouse DB with a provided
local path.
--zookeeper-local <path> Create the PersistentVolume for ZooKeeper with a provided
@@ -70,6 +71,7 @@ CH_THRESHOLD=0.5
LOCALPATH=""
ZK_LOCALPATH=""
IP_ADDRESS=""
+EXEC_INTERVAL="1m"

while [[ $# -gt 0 ]]
do
@@ -112,6 +114,10 @@ case $key in
CH_THRESHOLD="$2"
shift 2
;;
+--ch-monitor-exec-interval)
+EXEC_INTERVAL="$2"
+shift 2
+;;
--local)
LOCALPATH="$2"
shift 2
@@ -167,7 +173,7 @@ fi

HELM_VALUES=()

-HELM_VALUES+=("clickhouse.storage.size=$CH_SIZE" "clickhouse.monitor.threshold=$CH_THRESHOLD")
+HELM_VALUES+=("clickhouse.storage.size=$CH_SIZE" "clickhouse.monitor.threshold=$CH_THRESHOLD" "clickhouse.monitor.execInterval=$EXEC_INTERVAL")

if [ "$MODE" == "dev" ] && [ -n "$IMG_NAME" ]; then
HELM_VALUES+=("clickhouse.monitor.image.repository=$IMG_NAME")
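With the new flag wired through to a Helm value, the monitor interval can be set at manifest-generation time. A minimal usage sketch, assuming the output is applied with kubectl (per the usage text above, the script itself only prints the manifest to stdout):

# Render the flow-visibility manifest with the values the e2e CI script
# above uses (100Mi ClickHouse storage, 0.1 monitor threshold, 10s monitor
# interval), then apply it to the current cluster.
./hack/generate-manifest.sh --ch-size 100Mi --ch-monitor-threshold 0.1 \
  --ch-monitor-exec-interval 10s --theia-manager | kubectl apply -f -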
57 changes: 46 additions & 11 deletions test/e2e/flowvisibility_test.go
@@ -29,7 +29,9 @@ import (
"testing"
"time"

"antrea.io/antrea/pkg/agent/openflow"
"antrea.io/antrea/pkg/apis/crd/v1beta1"
"antrea.io/antrea/test/e2e/utils"
log "github.com/sirupsen/logrus"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
@@ -39,9 +41,6 @@ import (
networkingv1 "k8s.io/api/networking/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/util/wait"

"antrea.io/antrea/pkg/agent/openflow"
"antrea.io/antrea/test/e2e/utils"
)

/* Sample record in ClickHouse table:
@@ -113,6 +112,10 @@ const (
grafanaAddr = "http://127.0.0.1:5000"
grafanaQueryTimeout = 10 * time.Second
grafanaDefaultIntervalMS = "60000"
+allocatedSpace = 100 * 1024 * 1024
+clickHouseUsageCheckTimeout = 5 * time.Second
+clickHouseUsageCheckPeriod = 500 * time.Millisecond
+execInterval = 10 * time.Second
)

var (
@@ -615,6 +618,8 @@ func checkQueryResult(t *testing.T, apiEndpoint, httpMethod string, queries []st
func checkClickHouseMonitor(t *testing.T, data *TestData, isIPv6 bool, flow testFlow) {
checkClickHouseMonitorLogs(t, data, false, 0)
var cmdStr string
+var numRecord int64
+var memUsage float64
// iperf3 caps the number of parallel streams at 128
if !isIPv6 {
cmdStr = fmt.Sprintf("iperf3 -u -c %s -P 128 -n 1", flow.dstIP)
@@ -623,21 +628,51 @@ }
}
log.Infof("Generating flow records to exceed monitor threshold...")
for i := 0; i < monitorIperfRounds; i++ {
+err := wait.Poll(clickHouseUsageCheckPeriod, clickHouseUsageCheckTimeout, func() (bool, error) {
+memUsage = getClickHouseUsedMemory(t, data)
+if memUsage > monitorThreshold {
+// Get the number of records in the database before the monitor deletes them
+stdout, stderr, err := data.RunCommandFromPod(flowVisibilityNamespace, clickHousePodName, "clickhouse", []string{"bash", "-c", "clickhouse client -q \"SELECT COUNT() FROM default.flows\""})
+require.NoErrorf(t, err, "Error when querying ClickHouse server: %v,\nstdout:%s\nstderr:%s", err, stdout, stderr)
+numRecord, err = strconv.ParseInt(strings.TrimSuffix(stdout, "\n"), 10, 64)
+require.NoErrorf(t, err, "Failed when parsing the number of records %v", err)
+log.Infof("Current total number of records is %d", numRecord)
+return true, nil
+}
+return false, nil
+})
+if err == nil {
+break
+}
log.Infof("Current memery usage is %f", memUsage)
stdout, stderr, err := data.RunCommandFromPod(testNamespace, flow.srcPodName, "perftool", []string{"bash", "-c", cmdStr})
require.NoErrorf(t, err, "Error when running iPerf3 client: %v,\nstdout:%s\nstderr:%s", err, stdout, stderr)
}
log.Infof("Waiting for the flows to be exported...")
time.Sleep(30 * time.Second)
// Get the number of records in database before the monitor deletes the records
stdout, stderr, err := data.RunCommandFromPod(flowVisibilityNamespace, clickHousePodName, "clickhouse", []string{"bash", "-c", "clickhouse client -q \"SELECT COUNT() FROM default.flows\""})
require.NoErrorf(t, err, "Error when querying ClickHouse server: %v,\nstdout:%s\nstderr:%s", err, stdout, stderr)
numRecord, err := strconv.ParseInt(strings.TrimSuffix(stdout, "\n"), 10, 64)
require.NoErrorf(t, err, "Failed when parsing the number of records %v", err)
log.Infof("Waiting for the monitor to detect and clean up the ClickHouse storage")
-time.Sleep(2 * time.Minute)
+time.Sleep(2 * execInterval)
checkClickHouseMonitorLogs(t, data, true, numRecord)
}
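The loop above replaces the old fixed 30-second sleep with the apimachinery polling helper, so the record count is captured as soon as storage usage crosses the threshold. A minimal self-contained sketch of that pattern, with getUsage as a hypothetical stand-in for getClickHouseUsedMemory:

package main

import (
	"fmt"
	"time"

	"k8s.io/apimachinery/pkg/util/wait"
)

// getUsage is a hypothetical stand-in for getClickHouseUsedMemory.
func getUsage() float64 { return 0.15 }

func main() {
	threshold := 0.1
	// Re-check the condition every 500ms for up to 5s; returning true
	// stops the poll early, and a non-nil error aborts it.
	err := wait.Poll(500*time.Millisecond, 5*time.Second, func() (bool, error) {
		return getUsage() > threshold, nil
	})
	if err != nil {
		// On timeout, wait.Poll returns wait.ErrWaitTimeout.
		fmt.Println("usage never exceeded the threshold:", err)
		return
	}
	fmt.Println("usage exceeded the threshold; record the row count now")
}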

+func getClickHouseUsedMemory(t *testing.T, data *TestData) float64 {
+stdout, stderr, err := data.RunCommandFromPod(flowVisibilityNamespace, clickHousePodName, "clickhouse", []string{"bash", "-c", "clickhouse client -q \"SELECT free_space FROM system.disks;\""})
+require.NoErrorf(t, err, "Error when querying ClickHouse server: %v,\nstdout:%s\nstderr:%s", err, stdout, stderr)
+freeSpace, err := strconv.ParseInt(strings.TrimSuffix(stdout, "\n"), 10, 64)
+require.NoErrorf(t, err, "Error when parsing freeSpace: %v,\nstdout:%s\nstderr:%s", err, stdout, stderr)
+stdout, stderr, err = data.RunCommandFromPod(flowVisibilityNamespace, clickHousePodName, "clickhouse", []string{"bash", "-c", "clickhouse client -q \"SELECT SUM(bytes) FROM system.parts;\""})
+require.NoErrorf(t, err, "Error when querying ClickHouse server: %v,\nstdout:%s\nstderr:%s", err, stdout, stderr)
+usedSpace, err := strconv.ParseInt(strings.TrimSuffix(stdout, "\n"), 10, 64)
+require.NoErrorf(t, err, "Error when parsing usedSpace: %v,\nstdout:%s\nstderr:%s", err, stdout, stderr)
+// The total space for ClickHouse is the smaller of the user-allocated size and the actual size on disk
+var totalSpace int64
+if (freeSpace + usedSpace) < allocatedSpace {
+totalSpace = freeSpace + usedSpace
+} else {
+totalSpace = allocatedSpace
+}
+usagePercentage := float64(usedSpace) / float64(totalSpace)
+return usagePercentage
+}
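For intuition, a worked example of the capping logic in getClickHouseUsedMemory, using hypothetical disk numbers (the 100Mi cap matches the --ch-size value passed by the CI script):

package main

import "fmt"

func main() {
	const allocatedSpace int64 = 100 * 1024 * 1024 // 100Mi, the user-allocated size

	// Hypothetical numbers: 20Mi of table parts, 900Mi free on the node disk.
	usedSpace := int64(20 * 1024 * 1024)
	freeSpace := int64(900 * 1024 * 1024)

	// Total space is the smaller of (free + used) and the allocated size.
	totalSpace := freeSpace + usedSpace
	if totalSpace > allocatedSpace {
		totalSpace = allocatedSpace
	}
	// 20Mi / 100Mi = 0.2, which is above the 0.1 monitor threshold.
	fmt.Println(float64(usedSpace) / float64(totalSpace))
}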

func checkClickHouseMonitorLogs(t *testing.T, data *TestData, deleted bool, numRecord int64) {
logString, err := data.GetPodLogs(flowVisibilityNamespace, clickHousePodName,
&corev1.PodLogOptions{
