Skip to content

Commit

Permalink
Aggregated Throughput Anomaly detection (#184)
Browse files Browse the repository at this point in the history
* Aggregated Throughput Anomaly Detection

This PR does the following:
- Implements argument "agg-flow" and "p2p-label" for aggregated flow.
- Aggregated flow contains Pods to external, pods to pods based of labels and pod to service flows.
- New retrieve table has been added for aggregated TAD.
- Modified retrieve table for TAD, new fields include agg_type and algo_type for better understanding.
- TAD delete command can now take multiple tad ids to delete.

partially solves: #168

Signed-off-by: Tushar Tathgur <[email protected]>

* Addressed comments in Aggregated Throughput Anomaly Detection

partially solves: #168

Signed-off-by: Tushar Tathgur <[email protected]>

* Addressed comments with Table and datasources changes

Signed-off-by: Tushar Tathgur <[email protected]>

* Resolving Jenkins e2e issue

Signed-off-by: Tushar Tathgur <[email protected]>

---------

Signed-off-by: Tushar Tathgur <[email protected]>
Co-authored-by: Tushar Tathgur <[email protected]>
  • Loading branch information
tushartathgur and Tushar Tathgur authored Jul 12, 2023
1 parent 31f4752 commit aa09d11
Show file tree
Hide file tree
Showing 23 changed files with 1,505 additions and 329 deletions.
12 changes: 12 additions & 0 deletions build/charts/theia/crds/anomaly-detector-crd.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,18 @@ spec:
type: array
items:
type: string
aggFlow:
type: string
podLabel:
type: string
externalIp:
type: string
podName:
type: string
podNameSpace:
type: string
servicePortName:
type: string
executorInstances:
type: integer
driverCoreRequest:
Expand Down
6 changes: 6 additions & 0 deletions build/charts/theia/provisioning/datasources/create_table.sh
Original file line number Diff line number Diff line change
Expand Up @@ -367,8 +367,14 @@ clickhouse client -n -h 127.0.0.1 <<-EOSQL
destinationTransportPort UInt16,
protocolIdentifier UInt16,
flowStartSeconds DateTime,
podNamespace String,
podLabels String,
podName String,
destinationServicePortName String,
direction String,
flowEndSeconds DateTime,
throughputStandardDeviation Float64,
aggType String,
algoType String,
algoCalc Float64,
throughput Float64,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -198,3 +198,17 @@ ALTER TABLE flows
ALTER TABLE flows_local
DROP COLUMN egressName,
DROP COLUMN egressIP;
ALTER TABLE tadetector
DROP COLUMN podNamespace,
DROP COLUMN podLabels,
DROP COLUMN destinationServicePortName,
DROP COLUMN aggType,
DROP COLUMN direction,
DROP COLUMN podName;
ALTER TABLE tadetector_local
DROP COLUMN podNamespace,
DROP COLUMN podLabels,
DROP COLUMN destinationServicePortName,
DROP COLUMN aggType,
DROP COLUMN direction,
DROP COLUMN podName;
Original file line number Diff line number Diff line change
Expand Up @@ -137,3 +137,17 @@ ALTER TABLE flows
ALTER TABLE flows_local
ADD COLUMN egressName String,
ADD COLUMN egressIP String;
ALTER TABLE tadetector
ADD COLUMN podNamespace String,
ADD COLUMN podLabels String,
ADD COLUMN destinationServicePortName String,
ADD COLUMN aggType String,
ADD COLUMN direction String,
ADD COLUMN podName String;
ALTER TABLE tadetector_local
ADD COLUMN podNamespace String,
ADD COLUMN podLabels String,
ADD COLUMN destinationServicePortName String,
ADD COLUMN aggType String,
ADD COLUMN direction String,
ADD COLUMN podName String;
34 changes: 34 additions & 0 deletions build/yamls/flow-visibility.yml
Original file line number Diff line number Diff line change
Expand Up @@ -589,6 +589,20 @@ data:
ALTER TABLE flows_local
DROP COLUMN egressName,
DROP COLUMN egressIP;
ALTER TABLE tadetector
DROP COLUMN podNamespace,
DROP COLUMN podLabels,
DROP COLUMN destinationServicePortName,
DROP COLUMN aggType,
DROP COLUMN direction,
DROP COLUMN podName;
ALTER TABLE tadetector_local
DROP COLUMN podNamespace,
DROP COLUMN podLabels,
DROP COLUMN destinationServicePortName,
DROP COLUMN aggType,
DROP COLUMN direction,
DROP COLUMN podName;
000005_0-6-0.up.sql: |
-- Create underlying tables for Materialized Views to attach data
CREATE TABLE IF NOT EXISTS pod_view_table_local (
Expand Down Expand Up @@ -729,6 +743,20 @@ data:
ALTER TABLE flows_local
ADD COLUMN egressName String,
ADD COLUMN egressIP String;
ALTER TABLE tadetector
ADD COLUMN podNamespace String,
ADD COLUMN podLabels String,
ADD COLUMN destinationServicePortName String,
ADD COLUMN aggType String,
ADD COLUMN direction String,
ADD COLUMN podName String;
ALTER TABLE tadetector_local
ADD COLUMN podNamespace String,
ADD COLUMN podLabels String,
ADD COLUMN destinationServicePortName String,
ADD COLUMN aggType String,
ADD COLUMN direction String,
ADD COLUMN podName String;
create_table.sh: |
#!/usr/bin/env bash
Expand Down Expand Up @@ -1090,8 +1118,14 @@ data:
destinationTransportPort UInt16,
protocolIdentifier UInt16,
flowStartSeconds DateTime,
podNamespace String,
podLabels String,
podName String,
destinationServicePortName String,
direction String,
flowEndSeconds DateTime,
throughputStandardDeviation Float64,
aggType String,
algoType String,
algoCalc Float64,
throughput Float64,
Expand Down
2 changes: 1 addition & 1 deletion ci/jenkins/test-vmc.sh
Original file line number Diff line number Diff line change
Expand Up @@ -313,7 +313,7 @@ function deliver_antrea {
antrea_yml="antrea.yml"
# Enable verbose log for troubleshooting.
sed -i "s/--v=0/--v=4/g" $GIT_CHECKOUT_DIR/build/yamls/$antrea_yml
perl -i -p0e 's/ # feature, you need to set "enable" to true, and ensure that the FlowExporter\n # feature gate is also enabled.\n enable: false/ # feature, you need to set "enable" to true, and ensure that the FlowExporter\n # feature gate is also enabled.\n enable: true/' $TMP_DIR/antrea.yml
perl -i -p0e 's/ # feature, you need to set "enable" to true, and ensure that the FlowExporter\n # feature gate is also enabled.\n enable: false/ # feature, you need to set "enable" to true, and ensure that the FlowExporter\n # feature gate is also enabled.\n enable: true/' $GIT_CHECKOUT_DIR/build/yamls/antrea.yml
sed -i -e "s/flowPollInterval: \"5s\"/flowPollInterval: \"1s\"/g" $GIT_CHECKOUT_DIR/build/yamls/$antrea_yml
sed -i -e "s/activeFlowExportTimeout: \"5s\"/activeFlowExportTimeout: \"2s\"/g" $GIT_CHECKOUT_DIR/build/yamls/$antrea_yml
sed -i -e "s/idleFlowExportTimeout: \"15s\"/idleFlowExportTimeout: \"1s\"/g" $GIT_CHECKOUT_DIR/build/yamls/$antrea_yml
Expand Down
2 changes: 1 addition & 1 deletion ci/kind/test-upgrade-theia.sh
Original file line number Diff line number Diff line change
Expand Up @@ -255,7 +255,7 @@ popd
rm -rf $TMP_THEIA_DIR

rc=0
go test -v -run=TestUpgrade antrea.io/theia/test/e2e -provider=kind --logs-export-dir=$ANTREA_LOG_DIR --upgrade.toVersion=$CURRENT_VERSION --upgrade.fromVersion=$THEIA_FROM_TAG || rc=$?
go test -v -timeout=15m -run=TestUpgrade antrea.io/theia/test/e2e -provider=kind --logs-export-dir=$ANTREA_LOG_DIR --upgrade.toVersion=$CURRENT_VERSION --upgrade.fromVersion=$THEIA_FROM_TAG || rc=$?

$THIS_DIR/kind-setup.sh destroy kind

Expand Down
6 changes: 6 additions & 0 deletions pkg/apis/crd/v1alpha1/types.go
Original file line number Diff line number Diff line change
Expand Up @@ -98,6 +98,12 @@ type ThroughputAnomalyDetectorSpec struct {
StartInterval metav1.Time `json:"startInterval,omitempty"`
EndInterval metav1.Time `json:"endInterval,omitempty"`
NSIgnoreList []string `json:"nsIgnoreList,omitempty"`
AggregatedFlow string `json:"aggFlow,omitempty"`
PodLabel string `json:"podLabel,omitempty"`
PodName string `json:"podName,omitempty"`
PodNameSpace string `json:"podNameSpace,omitempty"`
ExternalIP string `json:"externalIp,omitempty"`
ServicePortName string `json:"servicePortName,omitempty"`
ExecutorInstances int `json:"executorInstances,omitempty"`
DriverCoreRequest string `json:"driverCoreRequest,omitempty"`
DriverMemory string `json:"driverMemory,omitempty"`
Expand Down
33 changes: 23 additions & 10 deletions pkg/apis/intelligence/v1alpha1/types.go
Original file line number Diff line number Diff line change
Expand Up @@ -73,6 +73,12 @@ type ThroughputAnomalyDetector struct {
EndInterval metav1.Time `json:"endInterval,omitempty"`
ExecutorInstances int `json:"executorInstances,omitempty"`
NSIgnoreList []string `json:"nsIgnoreList,omitempty"`
AggregatedFlow string `json:"aggFlow,omitempty"`
PodLabel string `json:"podLabel,omitempty"`
PodName string `json:"podName,omitempty"`
PodNameSpace string `json:"podNameSpace,omitempty"`
ExternalIP string `json:"externalIp,omitempty"`
ServicePortName string `json:"servicePortName,omitempty"`
DriverCoreRequest string `json:"driverCoreRequest,omitempty"`
DriverMemory string `json:"driverMemory,omitempty"`
ExecutorCoreRequest string `json:"executorCoreRequest,omitempty"`
Expand Down Expand Up @@ -100,14 +106,21 @@ type ThroughputAnomalyDetectorList struct {
}

type ThroughputAnomalyDetectorStats struct {
Id string `json:"id,omitempty"`
SourceIP string `json:"sourceIP,omitempty"`
SourceTransportPort string `json:"sourceTransportPort,omitempty"`
DestinationIP string `json:"destinationIP,omitempty"`
DestinationTransportPort string `json:"destinationTransportPort,omitempty"`
FlowStartSeconds string `json:"FlowStartSeconds,omitempty"`
FlowEndSeconds string `json:"FlowEndSeconds,omitempty"`
Throughput string `json:"Throughput,omitempty"`
AlgoCalc string `json:"AlgoCalc,omitempty"`
Anomaly string `json:"anomaly,omitempty"`
Id string `json:"id,omitempty"`
SourceIP string `json:"sourceIP,omitempty"`
SourceTransportPort string `json:"sourceTransportPort,omitempty"`
DestinationIP string `json:"destinationIP,omitempty"`
DestinationTransportPort string `json:"destinationTransportPort,omitempty"`
FlowStartSeconds string `json:"FlowStartSeconds,omitempty"`
PodNamespace string `json:"podNamespace,omitempty"`
PodLabels string `json:"podLabels,omitempty"`
PodName string `json:"podName,omitempty"`
Direction string `json:"direction,omitempty"`
DestinationServicePortName string `json:"destinationServicePortName,omitempty"`
FlowEndSeconds string `json:"FlowEndSeconds,omitempty"`
Throughput string `json:"throughput,omitempty"`
AggType string `json:"aggType,omitempty"`
AlgoType string `json:"algoType,omitempty"`
AlgoCalc string `json:"AlgoCalc,omitempty"`
Anomaly string `json:"anomaly,omitempty"`
}
124 changes: 117 additions & 7 deletions pkg/apiserver/registry/intelligence/throughputanomalydetector/rest.go
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,10 @@ import (
const (
defaultNameSpace = "flow-visibility"
tadQuery int = iota
aggTadExternalQuery
aggTadPodLabelQuery
aggTadPodNameQuery
aggTadSvcQuery
)

// REST implements rest.Storage for anomalydetector.
Expand All @@ -54,7 +58,7 @@ var (

var queryMap = map[int]string{
tadQuery: `
SELECT
SELECT
id,
sourceIP,
sourceTransportPort,
Expand All @@ -63,6 +67,56 @@ var queryMap = map[int]string{
flowStartSeconds,
flowEndSeconds,
throughput,
aggType,
algoType,
algoCalc,
anomaly
FROM tadetector WHERE id = (?);`,
aggTadExternalQuery: `
SELECT
id,
destinationIP,
flowEndSeconds,
throughput,
aggType,
algoType,
algoCalc,
anomaly
FROM tadetector WHERE id = (?);`,
aggTadPodLabelQuery: `
SELECT
id,
podNamespace,
podLabels,
direction,
flowEndSeconds,
throughput,
aggType,
algoType,
algoCalc,
anomaly
FROM tadetector WHERE id = (?);`,
aggTadPodNameQuery: `
SELECT
id,
podNamespace,
podName,
direction,
flowEndSeconds,
throughput,
aggType,
algoType,
algoCalc,
anomaly
FROM tadetector WHERE id = (?);`,
aggTadSvcQuery: `
SELECT
id,
destinationServicePortName,
flowEndSeconds,
throughput,
aggType,
algoType,
algoCalc,
anomaly
FROM tadetector WHERE id = (?);`,
Expand Down Expand Up @@ -102,6 +156,12 @@ func (r *REST) copyThroughputAnomalyDetector(tad *v1alpha1.ThroughputAnomalyDete
tad.EndInterval = crd.Spec.EndInterval
tad.ExecutorInstances = crd.Spec.ExecutorInstances
tad.NSIgnoreList = crd.Spec.NSIgnoreList
tad.AggregatedFlow = crd.Spec.AggregatedFlow
tad.PodLabel = crd.Spec.PodLabel
tad.PodName = crd.Spec.PodName
tad.PodNameSpace = crd.Spec.PodNameSpace
tad.ExternalIP = crd.Spec.ExternalIP
tad.ServicePortName = crd.Spec.ServicePortName
tad.DriverCoreRequest = crd.Spec.DriverCoreRequest
tad.DriverMemory = crd.Spec.DriverMemory
tad.ExecutorCoreRequest = crd.Spec.ExecutorCoreRequest
Expand Down Expand Up @@ -170,6 +230,12 @@ func (r *REST) Create(ctx context.Context, obj runtime.Object, createValidation
job.Spec.DriverMemory = newTAD.DriverMemory
job.Spec.ExecutorCoreRequest = newTAD.ExecutorCoreRequest
job.Spec.ExecutorMemory = newTAD.ExecutorMemory
job.Spec.AggregatedFlow = newTAD.AggregatedFlow
job.Spec.PodLabel = newTAD.PodLabel
job.Spec.PodName = newTAD.PodName
job.Spec.PodNameSpace = newTAD.PodNameSpace
job.Spec.ExternalIP = newTAD.ExternalIP
job.Spec.ServicePortName = newTAD.ServicePortName
_, err := r.ThroughputAnomalyDetectorQuerier.CreateThroughputAnomalyDetector(defaultNameSpace, job)
if err != nil {
return nil, errors.NewBadRequest(fmt.Sprintf("error when creating ThroughputAnomalyDetection job: %+v, err: %v", job, err))
Expand All @@ -179,24 +245,68 @@ func (r *REST) Create(ctx context.Context, obj runtime.Object, createValidation

func (r *REST) getTADetectorResult(id string, tad *v1alpha1.ThroughputAnomalyDetector) error {
var err error
query := tadQuery
switch tad.AggregatedFlow {
case "external":
query = aggTadExternalQuery
case "pod":
if tad.PodName != "" {
query = aggTadPodNameQuery
} else {
query = aggTadPodLabelQuery
}
case "svc":
query = aggTadSvcQuery
}
if r.clickhouseConnect == nil {
r.clickhouseConnect, err = setupClickHouseConnection(nil)
if err != nil {
return err
}
}
rows, err := r.clickhouseConnect.Query(queryMap[tadQuery], id)
rows, err := r.clickhouseConnect.Query(queryMap[query], id)
if err != nil {
return fmt.Errorf("failed to get Throughput Anomaly Detector results with id %s: %v", id, err)
}
defer rows.Close()
for rows.Next() {
res := v1alpha1.ThroughputAnomalyDetectorStats{}
err := rows.Scan(&res.Id, &res.SourceIP, &res.SourceTransportPort, &res.DestinationIP, &res.DestinationTransportPort, &res.FlowStartSeconds, &res.FlowEndSeconds, &res.Throughput, &res.AlgoCalc, &res.Anomaly)
if err != nil {
return fmt.Errorf("failed to scan Throughput Anomaly Detector results: %v", err)
switch query {
case tadQuery:
res := v1alpha1.ThroughputAnomalyDetectorStats{}
err := rows.Scan(&res.Id, &res.SourceIP, &res.SourceTransportPort, &res.DestinationIP, &res.DestinationTransportPort, &res.FlowStartSeconds, &res.FlowEndSeconds, &res.Throughput, &res.AggType, &res.AlgoType, &res.AlgoCalc, &res.Anomaly)
if err != nil {
return fmt.Errorf("failed to scan Throughput Anomaly Detector results: %v", err)
}
tad.Stats = append(tad.Stats, res)
case aggTadExternalQuery:
res := v1alpha1.ThroughputAnomalyDetectorStats{}
err := rows.Scan(&res.Id, &res.DestinationIP, &res.FlowEndSeconds, &res.Throughput, &res.AggType, &res.AlgoType, &res.AlgoCalc, &res.Anomaly)
if err != nil {
return fmt.Errorf("failed to scan Throughput Anomaly Detector External IP Aggregate results: %v", err)
}
tad.Stats = append(tad.Stats, res)
case aggTadPodLabelQuery:
res := v1alpha1.ThroughputAnomalyDetectorStats{}
err := rows.Scan(&res.Id, &res.PodNamespace, &res.PodLabels, &res.Direction, &res.FlowEndSeconds, &res.Throughput, &res.AggType, &res.AlgoType, &res.AlgoCalc, &res.Anomaly)
if err != nil {
return fmt.Errorf("failed to scan Throughput Anomaly Detector Pod Aggregate results: %v", err)
}
tad.Stats = append(tad.Stats, res)
case aggTadPodNameQuery:
res := v1alpha1.ThroughputAnomalyDetectorStats{}
err := rows.Scan(&res.Id, &res.PodNamespace, &res.PodName, &res.Direction, &res.FlowEndSeconds, &res.Throughput, &res.AggType, &res.AlgoType, &res.AlgoCalc, &res.Anomaly)
if err != nil {
return fmt.Errorf("failed to scan Throughput Anomaly Detector Pod Aggregate results: %v", err)
}
tad.Stats = append(tad.Stats, res)
case aggTadSvcQuery:
res := v1alpha1.ThroughputAnomalyDetectorStats{}
err := rows.Scan(&res.Id, &res.DestinationServicePortName, &res.FlowEndSeconds, &res.Throughput, &res.AggType, &res.AlgoType, &res.AlgoCalc, &res.Anomaly)
if err != nil {
return fmt.Errorf("failed to scan Throughput Anomaly Detector Service Aggregate results: %v", err)
}
tad.Stats = append(tad.Stats, res)
}
tad.Stats = append(tad.Stats, res)
}
return nil
}
Expand Down
Loading

0 comments on commit aa09d11

Please sign in to comment.