Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Aggregated Throughput Anomaly detection #184

Merged
merged 4 commits into from
Jul 12, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
12 changes: 12 additions & 0 deletions build/charts/theia/crds/anomaly-detector-crd.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,18 @@ spec:
type: array
items:
type: string
aggFlow:
type: string
podLabel:
type: string
externalIp:
type: string
podName:
type: string
podNameSpace:
type: string
servicePortName:
type: string
executorInstances:
type: integer
driverCoreRequest:
Expand Down
6 changes: 6 additions & 0 deletions build/charts/theia/provisioning/datasources/create_table.sh
Original file line number Diff line number Diff line change
Expand Up @@ -367,8 +367,14 @@ clickhouse client -n -h 127.0.0.1 <<-EOSQL
destinationTransportPort UInt16,
protocolIdentifier UInt16,
flowStartSeconds DateTime,
podNamespace String,
podLabels String,
podName String,
destinationServicePortName String,
direction String,
flowEndSeconds DateTime,
throughputStandardDeviation Float64,
aggType String,
algoType String,
algoCalc Float64,
throughput Float64,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -198,3 +198,17 @@ ALTER TABLE flows
ALTER TABLE flows_local
DROP COLUMN egressName,
DROP COLUMN egressIP;
ALTER TABLE tadetector
DROP COLUMN podNamespace,
DROP COLUMN podLabels,
DROP COLUMN destinationServicePortName,
DROP COLUMN aggType,
DROP COLUMN direction,
DROP COLUMN podName;
ALTER TABLE tadetector_local
DROP COLUMN podNamespace,
DROP COLUMN podLabels,
DROP COLUMN destinationServicePortName,
DROP COLUMN aggType,
DROP COLUMN direction,
DROP COLUMN podName;
Original file line number Diff line number Diff line change
Expand Up @@ -137,3 +137,17 @@ ALTER TABLE flows
ALTER TABLE flows_local
ADD COLUMN egressName String,
ADD COLUMN egressIP String;
ALTER TABLE tadetector
ADD COLUMN podNamespace String,
ADD COLUMN podLabels String,
ADD COLUMN destinationServicePortName String,
ADD COLUMN aggType String,
ADD COLUMN direction String,
ADD COLUMN podName String;
ALTER TABLE tadetector_local
ADD COLUMN podNamespace String,
ADD COLUMN podLabels String,
ADD COLUMN destinationServicePortName String,
ADD COLUMN aggType String,
ADD COLUMN direction String,
ADD COLUMN podName String;
34 changes: 34 additions & 0 deletions build/yamls/flow-visibility.yml
Original file line number Diff line number Diff line change
Expand Up @@ -589,6 +589,20 @@ data:
ALTER TABLE flows_local
DROP COLUMN egressName,
DROP COLUMN egressIP;
ALTER TABLE tadetector
DROP COLUMN podNamespace,
DROP COLUMN podLabels,
DROP COLUMN destinationServicePortName,
DROP COLUMN aggType,
DROP COLUMN direction,
DROP COLUMN podName;
ALTER TABLE tadetector_local
DROP COLUMN podNamespace,
DROP COLUMN podLabels,
DROP COLUMN destinationServicePortName,
DROP COLUMN aggType,
DROP COLUMN direction,
DROP COLUMN podName;
000005_0-6-0.up.sql: |
-- Create underlying tables for Materialized Views to attach data
CREATE TABLE IF NOT EXISTS pod_view_table_local (
Expand Down Expand Up @@ -729,6 +743,20 @@ data:
ALTER TABLE flows_local
ADD COLUMN egressName String,
ADD COLUMN egressIP String;
ALTER TABLE tadetector
ADD COLUMN podNamespace String,
ADD COLUMN podLabels String,
ADD COLUMN destinationServicePortName String,
ADD COLUMN aggType String,
ADD COLUMN direction String,
ADD COLUMN podName String;
ALTER TABLE tadetector_local
ADD COLUMN podNamespace String,
ADD COLUMN podLabels String,
ADD COLUMN destinationServicePortName String,
ADD COLUMN aggType String,
ADD COLUMN direction String,
ADD COLUMN podName String;
create_table.sh: |
#!/usr/bin/env bash
Expand Down Expand Up @@ -1090,8 +1118,14 @@ data:
destinationTransportPort UInt16,
protocolIdentifier UInt16,
flowStartSeconds DateTime,
podNamespace String,
podLabels String,
podName String,
destinationServicePortName String,
direction String,
flowEndSeconds DateTime,
throughputStandardDeviation Float64,
aggType String,
algoType String,
algoCalc Float64,
throughput Float64,
Expand Down
2 changes: 1 addition & 1 deletion ci/jenkins/test-vmc.sh
Original file line number Diff line number Diff line change
Expand Up @@ -313,7 +313,7 @@ function deliver_antrea {
antrea_yml="antrea.yml"
# Enable verbose log for troubleshooting.
sed -i "s/--v=0/--v=4/g" $GIT_CHECKOUT_DIR/build/yamls/$antrea_yml
perl -i -p0e 's/ # feature, you need to set "enable" to true, and ensure that the FlowExporter\n # feature gate is also enabled.\n enable: false/ # feature, you need to set "enable" to true, and ensure that the FlowExporter\n # feature gate is also enabled.\n enable: true/' $TMP_DIR/antrea.yml
perl -i -p0e 's/ # feature, you need to set "enable" to true, and ensure that the FlowExporter\n # feature gate is also enabled.\n enable: false/ # feature, you need to set "enable" to true, and ensure that the FlowExporter\n # feature gate is also enabled.\n enable: true/' $GIT_CHECKOUT_DIR/build/yamls/antrea.yml
sed -i -e "s/flowPollInterval: \"5s\"/flowPollInterval: \"1s\"/g" $GIT_CHECKOUT_DIR/build/yamls/$antrea_yml
sed -i -e "s/activeFlowExportTimeout: \"5s\"/activeFlowExportTimeout: \"2s\"/g" $GIT_CHECKOUT_DIR/build/yamls/$antrea_yml
sed -i -e "s/idleFlowExportTimeout: \"15s\"/idleFlowExportTimeout: \"1s\"/g" $GIT_CHECKOUT_DIR/build/yamls/$antrea_yml
Expand Down
2 changes: 1 addition & 1 deletion ci/kind/test-upgrade-theia.sh
Original file line number Diff line number Diff line change
Expand Up @@ -255,7 +255,7 @@ popd
rm -rf $TMP_THEIA_DIR

rc=0
go test -v -run=TestUpgrade antrea.io/theia/test/e2e -provider=kind --logs-export-dir=$ANTREA_LOG_DIR --upgrade.toVersion=$CURRENT_VERSION --upgrade.fromVersion=$THEIA_FROM_TAG || rc=$?
go test -v -timeout=15m -run=TestUpgrade antrea.io/theia/test/e2e -provider=kind --logs-export-dir=$ANTREA_LOG_DIR --upgrade.toVersion=$CURRENT_VERSION --upgrade.fromVersion=$THEIA_FROM_TAG || rc=$?

$THIS_DIR/kind-setup.sh destroy kind

Expand Down
6 changes: 6 additions & 0 deletions pkg/apis/crd/v1alpha1/types.go
Original file line number Diff line number Diff line change
Expand Up @@ -98,6 +98,12 @@ type ThroughputAnomalyDetectorSpec struct {
StartInterval metav1.Time `json:"startInterval,omitempty"`
EndInterval metav1.Time `json:"endInterval,omitempty"`
NSIgnoreList []string `json:"nsIgnoreList,omitempty"`
AggregatedFlow string `json:"aggFlow,omitempty"`
PodLabel string `json:"podLabel,omitempty"`
PodName string `json:"podName,omitempty"`
PodNameSpace string `json:"podNameSpace,omitempty"`
ExternalIP string `json:"externalIp,omitempty"`
ServicePortName string `json:"servicePortName,omitempty"`
ExecutorInstances int `json:"executorInstances,omitempty"`
DriverCoreRequest string `json:"driverCoreRequest,omitempty"`
DriverMemory string `json:"driverMemory,omitempty"`
Expand Down
33 changes: 23 additions & 10 deletions pkg/apis/intelligence/v1alpha1/types.go
Original file line number Diff line number Diff line change
Expand Up @@ -73,6 +73,12 @@ type ThroughputAnomalyDetector struct {
EndInterval metav1.Time `json:"endInterval,omitempty"`
ExecutorInstances int `json:"executorInstances,omitempty"`
NSIgnoreList []string `json:"nsIgnoreList,omitempty"`
AggregatedFlow string `json:"aggFlow,omitempty"`
PodLabel string `json:"podLabel,omitempty"`
PodName string `json:"podName,omitempty"`
PodNameSpace string `json:"podNameSpace,omitempty"`
ExternalIP string `json:"externalIp,omitempty"`
ServicePortName string `json:"servicePortName,omitempty"`
DriverCoreRequest string `json:"driverCoreRequest,omitempty"`
DriverMemory string `json:"driverMemory,omitempty"`
ExecutorCoreRequest string `json:"executorCoreRequest,omitempty"`
Expand Down Expand Up @@ -100,14 +106,21 @@ type ThroughputAnomalyDetectorList struct {
}

type ThroughputAnomalyDetectorStats struct {
Id string `json:"id,omitempty"`
SourceIP string `json:"sourceIP,omitempty"`
SourceTransportPort string `json:"sourceTransportPort,omitempty"`
DestinationIP string `json:"destinationIP,omitempty"`
DestinationTransportPort string `json:"destinationTransportPort,omitempty"`
FlowStartSeconds string `json:"FlowStartSeconds,omitempty"`
FlowEndSeconds string `json:"FlowEndSeconds,omitempty"`
Throughput string `json:"Throughput,omitempty"`
AlgoCalc string `json:"AlgoCalc,omitempty"`
Anomaly string `json:"anomaly,omitempty"`
Id string `json:"id,omitempty"`
SourceIP string `json:"sourceIP,omitempty"`
SourceTransportPort string `json:"sourceTransportPort,omitempty"`
DestinationIP string `json:"destinationIP,omitempty"`
DestinationTransportPort string `json:"destinationTransportPort,omitempty"`
FlowStartSeconds string `json:"FlowStartSeconds,omitempty"`
PodNamespace string `json:"podNamespace,omitempty"`
PodLabels string `json:"podLabels,omitempty"`
PodName string `json:"podName,omitempty"`
Direction string `json:"direction,omitempty"`
DestinationServicePortName string `json:"destinationServicePortName,omitempty"`
FlowEndSeconds string `json:"FlowEndSeconds,omitempty"`
Throughput string `json:"throughput,omitempty"`
AggType string `json:"aggType,omitempty"`
AlgoType string `json:"algoType,omitempty"`
AlgoCalc string `json:"AlgoCalc,omitempty"`
Anomaly string `json:"anomaly,omitempty"`
}
124 changes: 117 additions & 7 deletions pkg/apiserver/registry/intelligence/throughputanomalydetector/rest.go
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,10 @@ import (
// Constants for the ThroughputAnomalyDetector REST storage: the namespace used
// when creating detector jobs, and the integer keys of queryMap that identify
// which ClickHouse result query to run.
const (
// defaultNameSpace is the namespace passed to CreateThroughputAnomalyDetector
// when a new detector job is created.
defaultNameSpace = "flow-visibility"
// tadQuery keys the non-aggregated result query in queryMap; it is the
// default chosen by getTADetectorResult. iota counts constant specs from the
// top of this block, so tadQuery is 1 (not 0) because it is the second entry.
tadQuery int = iota
// The identifiers below implicitly repeat "int = iota" and therefore take the
// values 2 through 5. Reordering or inserting entries changes these values.
// aggTadExternalQuery is selected when AggregatedFlow is "external".
aggTadExternalQuery
// aggTadPodLabelQuery is selected when AggregatedFlow is "pod" and PodName is empty.
aggTadPodLabelQuery
// aggTadPodNameQuery is selected when AggregatedFlow is "pod" and PodName is set.
aggTadPodNameQuery
// aggTadSvcQuery is selected when AggregatedFlow is "svc".
aggTadSvcQuery
)

// REST implements rest.Storage for anomalydetector.
Expand All @@ -54,7 +58,7 @@ var (

var queryMap = map[int]string{
tadQuery: `
SELECT
SELECT
id,
sourceIP,
sourceTransportPort,
Expand All @@ -63,6 +67,56 @@ var queryMap = map[int]string{
flowStartSeconds,
flowEndSeconds,
throughput,
aggType,
algoType,
algoCalc,
anomaly
FROM tadetector WHERE id = (?);`,
aggTadExternalQuery: `
SELECT
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Is the code formatter not being run on the .go files?

Copy link
Contributor Author

@tushartathgur tushartathgur Mar 28, 2023

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

No, `make fmt` does cover the rest.go files:
tathgurt@tathgurtFLVDL theia % find . -type d -name '.cache' -prune -o -type f -name '*.go' -print | grep rest
./pkg/apiserver/registry/intelligence/throughputanomalydetector/rest.go
./pkg/apiserver/registry/intelligence/throughputanomalydetector/rest_test.go
./pkg/apiserver/registry/intelligence/networkpolicyrecommendation/rest.go
./pkg/apiserver/registry/intelligence/networkpolicyrecommendation/rest_test.go
./pkg/apiserver/registry/system/supportbundle/rest.go
./pkg/apiserver/registry/system/supportbundle/rest_test.go
./pkg/apiserver/registry/stats/clickhouse/rest.go
./pkg/apiserver/registry/stats/clickhouse/rest_test.go

id,
destinationIP,
flowEndSeconds,
throughput,
aggType,
algoType,
algoCalc,
anomaly
FROM tadetector WHERE id = (?);`,
aggTadPodLabelQuery: `
SELECT
id,
podNamespace,
podLabels,
direction,
flowEndSeconds,
throughput,
aggType,
algoType,
algoCalc,
anomaly
FROM tadetector WHERE id = (?);`,
aggTadPodNameQuery: `
SELECT
id,
podNamespace,
podName,
direction,
flowEndSeconds,
throughput,
aggType,
algoType,
algoCalc,
anomaly
FROM tadetector WHERE id = (?);`,
aggTadSvcQuery: `
SELECT
id,
destinationServicePortName,
flowEndSeconds,
throughput,
aggType,
algoType,
algoCalc,
anomaly
FROM tadetector WHERE id = (?);`,
Expand Down Expand Up @@ -102,6 +156,12 @@ func (r *REST) copyThroughputAnomalyDetector(tad *v1alpha1.ThroughputAnomalyDete
tad.EndInterval = crd.Spec.EndInterval
tad.ExecutorInstances = crd.Spec.ExecutorInstances
tad.NSIgnoreList = crd.Spec.NSIgnoreList
tad.AggregatedFlow = crd.Spec.AggregatedFlow
tad.PodLabel = crd.Spec.PodLabel
tad.PodName = crd.Spec.PodName
tad.PodNameSpace = crd.Spec.PodNameSpace
tad.ExternalIP = crd.Spec.ExternalIP
tad.ServicePortName = crd.Spec.ServicePortName
tad.DriverCoreRequest = crd.Spec.DriverCoreRequest
tad.DriverMemory = crd.Spec.DriverMemory
tad.ExecutorCoreRequest = crd.Spec.ExecutorCoreRequest
Expand Down Expand Up @@ -170,6 +230,12 @@ func (r *REST) Create(ctx context.Context, obj runtime.Object, createValidation
job.Spec.DriverMemory = newTAD.DriverMemory
job.Spec.ExecutorCoreRequest = newTAD.ExecutorCoreRequest
job.Spec.ExecutorMemory = newTAD.ExecutorMemory
job.Spec.AggregatedFlow = newTAD.AggregatedFlow
job.Spec.PodLabel = newTAD.PodLabel
job.Spec.PodName = newTAD.PodName
job.Spec.PodNameSpace = newTAD.PodNameSpace
job.Spec.ExternalIP = newTAD.ExternalIP
job.Spec.ServicePortName = newTAD.ServicePortName
_, err := r.ThroughputAnomalyDetectorQuerier.CreateThroughputAnomalyDetector(defaultNameSpace, job)
if err != nil {
return nil, errors.NewBadRequest(fmt.Sprintf("error when creating ThroughputAnomalyDetection job: %+v, err: %v", job, err))
Expand All @@ -179,24 +245,68 @@ func (r *REST) Create(ctx context.Context, obj runtime.Object, createValidation

func (r *REST) getTADetectorResult(id string, tad *v1alpha1.ThroughputAnomalyDetector) error {
var err error
query := tadQuery
switch tad.AggregatedFlow {
case "external":
query = aggTadExternalQuery
case "pod":
if tad.PodName != "" {
query = aggTadPodNameQuery
} else {
query = aggTadPodLabelQuery
}
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

A similar situation here: if both tad.PodLabel == "" and tad.PodName == "", then query = aggTadPodLabelQuery. I would suggest:

               if tad.PodName != "" {
			query = aggTadPodNameQuery
		} else {
			query = aggTadPodLabelQuery
		}

case "svc":
query = aggTadSvcQuery
}
if r.clickhouseConnect == nil {
r.clickhouseConnect, err = setupClickHouseConnection(nil)
if err != nil {
return err
}
}
rows, err := r.clickhouseConnect.Query(queryMap[tadQuery], id)
rows, err := r.clickhouseConnect.Query(queryMap[query], id)
if err != nil {
return fmt.Errorf("failed to get Throughput Anomaly Detector results with id %s: %v", id, err)
}
defer rows.Close()
for rows.Next() {
res := v1alpha1.ThroughputAnomalyDetectorStats{}
err := rows.Scan(&res.Id, &res.SourceIP, &res.SourceTransportPort, &res.DestinationIP, &res.DestinationTransportPort, &res.FlowStartSeconds, &res.FlowEndSeconds, &res.Throughput, &res.AlgoCalc, &res.Anomaly)
if err != nil {
return fmt.Errorf("failed to scan Throughput Anomaly Detector results: %v", err)
switch query {
case tadQuery:
res := v1alpha1.ThroughputAnomalyDetectorStats{}
err := rows.Scan(&res.Id, &res.SourceIP, &res.SourceTransportPort, &res.DestinationIP, &res.DestinationTransportPort, &res.FlowStartSeconds, &res.FlowEndSeconds, &res.Throughput, &res.AggType, &res.AlgoType, &res.AlgoCalc, &res.Anomaly)
if err != nil {
return fmt.Errorf("failed to scan Throughput Anomaly Detector results: %v", err)
}
tad.Stats = append(tad.Stats, res)
case aggTadExternalQuery:
res := v1alpha1.ThroughputAnomalyDetectorStats{}
err := rows.Scan(&res.Id, &res.DestinationIP, &res.FlowEndSeconds, &res.Throughput, &res.AggType, &res.AlgoType, &res.AlgoCalc, &res.Anomaly)
if err != nil {
return fmt.Errorf("failed to scan Throughput Anomaly Detector External IP Aggregate results: %v", err)
}
tad.Stats = append(tad.Stats, res)
case aggTadPodLabelQuery:
res := v1alpha1.ThroughputAnomalyDetectorStats{}
err := rows.Scan(&res.Id, &res.PodNamespace, &res.PodLabels, &res.Direction, &res.FlowEndSeconds, &res.Throughput, &res.AggType, &res.AlgoType, &res.AlgoCalc, &res.Anomaly)
if err != nil {
return fmt.Errorf("failed to scan Throughput Anomaly Detector Pod Aggregate results: %v", err)
}
tad.Stats = append(tad.Stats, res)
case aggTadPodNameQuery:
res := v1alpha1.ThroughputAnomalyDetectorStats{}
err := rows.Scan(&res.Id, &res.PodNamespace, &res.PodName, &res.Direction, &res.FlowEndSeconds, &res.Throughput, &res.AggType, &res.AlgoType, &res.AlgoCalc, &res.Anomaly)
if err != nil {
return fmt.Errorf("failed to scan Throughput Anomaly Detector Pod Aggregate results: %v", err)
}
tad.Stats = append(tad.Stats, res)
case aggTadSvcQuery:
res := v1alpha1.ThroughputAnomalyDetectorStats{}
err := rows.Scan(&res.Id, &res.DestinationServicePortName, &res.FlowEndSeconds, &res.Throughput, &res.AggType, &res.AlgoType, &res.AlgoCalc, &res.Anomaly)
if err != nil {
return fmt.Errorf("failed to scan Throughput Anomaly Detector Service Aggregate results: %v", err)
}
tad.Stats = append(tad.Stats, res)
}
tad.Stats = append(tad.Stats, res)
}
return nil
}
Expand Down
Loading
Loading