Skip to content

Commit

Permalink
Aggregated Throughput Anomaly Detection
Browse files Browse the repository at this point in the history
This PR does the following:
- Implements the "agg-flow" and "p2p-label" arguments for aggregated flows.
- Aggregated flows cover pod-to-external, pod-to-pod (grouped by labels), and pod-to-service flows.
- New retrieve table has been added for aggregated TAD.
- Modified retrieve table for TAD, new fields include agg_type and algo_type for better understanding.
- The TAD delete command can now take multiple TAD IDs to delete.

partially solves: #168

Signed-off-by: Tushar Tathgur <[email protected]>
  • Loading branch information
Tushar Tathgur authored and Tushar Tathgur committed Mar 27, 2023
1 parent 80d1138 commit 842d8e8
Show file tree
Hide file tree
Showing 19 changed files with 851 additions and 182 deletions.
4 changes: 4 additions & 0 deletions build/charts/theia/crds/anomaly-detector-crd.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,10 @@ spec:
type: array
items:
type: string
aggflow:
type: string
pod2podlabel:
type: string
executorInstances:
type: integer
driverCoreRequest:
Expand Down
8 changes: 7 additions & 1 deletion build/charts/theia/provisioning/datasources/create_table.sh
Original file line number Diff line number Diff line change
Expand Up @@ -293,15 +293,21 @@ clickhouse client -n -h 127.0.0.1 <<-EOSQL
destinationTransportPort UInt16,
protocolIdentifier UInt16,
flowStartSeconds DateTime,
sourcePodNamespace String,
sourcePodLabels String,
destinationPodNamespace String,
destinationPodLabels String,
destinationServicePortName String,
flowEndSeconds DateTime,
throughputStandardDeviation Float64,
aggType String,
algoType String,
algoCalc Float64,
throughput Float64,
anomaly String,
id String
) engine=ReplicatedMergeTree('/clickhouse/tables/{shard}/{database}/{table}', '{replica}')
ORDER BY (flowStartSeconds);
ORDER BY (flowEndSeconds);
--Create distributed tables for cluster
CREATE TABLE IF NOT EXISTS flows AS flows_local
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,13 @@
--Drop aggregated TAD columns
-- Reverts migration 000005: removes the aggregation columns from both the
-- distributed table (tadetector) and the local replicated table
-- (tadetector_local).
-- NOTE: ClickHouse DROP COLUMN takes only the column name; the column type
-- must not be repeated here (it is only valid in ADD COLUMN).
ALTER TABLE tadetector DROP COLUMN sourcePodNamespace;
ALTER TABLE tadetector_local DROP COLUMN sourcePodNamespace;
ALTER TABLE tadetector DROP COLUMN sourcePodLabels;
ALTER TABLE tadetector_local DROP COLUMN sourcePodLabels;
ALTER TABLE tadetector DROP COLUMN destinationPodNamespace;
ALTER TABLE tadetector_local DROP COLUMN destinationPodNamespace;
ALTER TABLE tadetector DROP COLUMN destinationPodLabels;
ALTER TABLE tadetector_local DROP COLUMN destinationPodLabels;
ALTER TABLE tadetector DROP COLUMN destinationServicePortName;
ALTER TABLE tadetector_local DROP COLUMN destinationServicePortName;
ALTER TABLE tadetector DROP COLUMN aggType;
ALTER TABLE tadetector_local DROP COLUMN aggType;
Original file line number Diff line number Diff line change
@@ -0,0 +1,13 @@
--Add aggregated TAD columns
-- Migration 000005 (0-5-0): adds the columns used by aggregated Throughput
-- Anomaly Detection results. Each column is added to both the distributed
-- table (tadetector) and the local replicated table (tadetector_local) so
-- that queries through the distributed table can see it.
ALTER TABLE tadetector ADD COLUMN sourcePodNamespace String;
ALTER TABLE tadetector_local ADD COLUMN sourcePodNamespace String;
ALTER TABLE tadetector ADD COLUMN sourcePodLabels String;
ALTER TABLE tadetector_local ADD COLUMN sourcePodLabels String;
ALTER TABLE tadetector ADD COLUMN destinationPodNamespace String;
ALTER TABLE tadetector_local ADD COLUMN destinationPodNamespace String;
ALTER TABLE tadetector ADD COLUMN destinationPodLabels String;
ALTER TABLE tadetector_local ADD COLUMN destinationPodLabels String;
ALTER TABLE tadetector ADD COLUMN destinationServicePortName String;
ALTER TABLE tadetector_local ADD COLUMN destinationServicePortName String;
ALTER TABLE tadetector ADD COLUMN aggType String;
ALTER TABLE tadetector_local ADD COLUMN aggType String;
40 changes: 39 additions & 1 deletion build/yamls/flow-visibility.yml
Original file line number Diff line number Diff line change
Expand Up @@ -388,6 +388,34 @@ data:
CREATE TABLE IF NOT EXISTS tadetector AS tadetector_local
engine=Distributed('{cluster}', default, tadetector_local, rand());
000005_0-5-0.down.sql: |
--Drop aggregated TAD columns
-- ClickHouse DROP COLUMN takes only the column name; repeating the type
-- (String) here is a syntax error, so it has been removed from each
-- statement. This copy must stay in sync with the standalone migration file.
ALTER TABLE tadetector DROP COLUMN sourcePodNamespace;
ALTER TABLE tadetector_local DROP COLUMN sourcePodNamespace;
ALTER TABLE tadetector DROP COLUMN sourcePodLabels;
ALTER TABLE tadetector_local DROP COLUMN sourcePodLabels;
ALTER TABLE tadetector DROP COLUMN destinationPodNamespace;
ALTER TABLE tadetector_local DROP COLUMN destinationPodNamespace;
ALTER TABLE tadetector DROP COLUMN destinationPodLabels;
ALTER TABLE tadetector_local DROP COLUMN destinationPodLabels;
ALTER TABLE tadetector DROP COLUMN destinationServicePortName;
ALTER TABLE tadetector_local DROP COLUMN destinationServicePortName;
ALTER TABLE tadetector DROP COLUMN aggType;
ALTER TABLE tadetector_local DROP COLUMN aggType;
000005_0-5-0.up.sql: |
--Add aggregated TAD columns
ALTER TABLE tadetector ADD COLUMN sourcePodNamespace String;
ALTER TABLE tadetector_local ADD COLUMN sourcePodNamespace String;
ALTER TABLE tadetector ADD COLUMN sourcePodLabels String;
ALTER TABLE tadetector_local ADD COLUMN sourcePodLabels String;
ALTER TABLE tadetector ADD COLUMN destinationPodNamespace String;
ALTER TABLE tadetector_local ADD COLUMN destinationPodNamespace String;
ALTER TABLE tadetector ADD COLUMN destinationPodLabels String;
ALTER TABLE tadetector_local ADD COLUMN destinationPodLabels String;
ALTER TABLE tadetector ADD COLUMN destinationServicePortName String;
ALTER TABLE tadetector_local ADD COLUMN destinationServicePortName String;
ALTER TABLE tadetector ADD COLUMN aggType String;
ALTER TABLE tadetector_local ADD COLUMN aggType String;
create_table.sh: |
#!/usr/bin/env bash
Expand Down Expand Up @@ -675,15 +703,21 @@ data:
destinationTransportPort UInt16,
protocolIdentifier UInt16,
flowStartSeconds DateTime,
sourcePodNamespace String,
sourcePodLabels String,
destinationPodNamespace String,
destinationPodLabels String,
destinationServicePortName String,
flowEndSeconds DateTime,
throughputStandardDeviation Float64,
aggType String,
algoType String,
algoCalc Float64,
throughput Float64,
anomaly String,
id String
) engine=ReplicatedMergeTree('/clickhouse/tables/{shard}/{database}/{table}', '{replica}')
ORDER BY (flowStartSeconds);
ORDER BY (flowEndSeconds);
--Create distributed tables for cluster
CREATE TABLE IF NOT EXISTS flows AS flows_local
Expand Down Expand Up @@ -6850,6 +6884,10 @@ spec:
path: migrators/000004_0-4-0.down.sql
- key: 000004_0-4-0.up.sql
path: migrators/000004_0-4-0.up.sql
- key: 000005_0-5-0.down.sql
path: migrators/000005_0-5-0.down.sql
- key: 000005_0-5-0.up.sql
path: migrators/000005_0-5-0.up.sql
name: clickhouse-mounted-configmap
name: clickhouse-configmap-volume
- emptyDir:
Expand Down
2 changes: 2 additions & 0 deletions pkg/apis/crd/v1alpha1/types.go
Original file line number Diff line number Diff line change
Expand Up @@ -98,6 +98,8 @@ type ThroughputAnomalyDetectorSpec struct {
StartInterval metav1.Time `json:"startInterval,omitempty"`
EndInterval metav1.Time `json:"endInterval,omitempty"`
NSIgnoreList []string `json:"nsIgnoreList,omitempty"`
AggregatedFlow string `json:"aggflow,omitempty"`
Pod2PodLabel string `json:"pod2podlabel,omitempty"`
ExecutorInstances int `json:"executorInstances,omitempty"`
DriverCoreRequest string `json:"driverCoreRequest,omitempty"`
DriverMemory string `json:"driverMemory,omitempty"`
Expand Down
29 changes: 19 additions & 10 deletions pkg/apis/intelligence/v1alpha1/types.go
Original file line number Diff line number Diff line change
Expand Up @@ -73,6 +73,8 @@ type ThroughputAnomalyDetector struct {
EndInterval metav1.Time `json:"endInterval,omitempty"`
ExecutorInstances int `json:"executorInstances,omitempty"`
NSIgnoreList []string `json:"nsIgnoreList,omitempty"`
AggregatedFlow string `json:"aggflow,omitempty"`
Pod2PodLabel string `json:"pod2podlabel,omitempty"`
DriverCoreRequest string `json:"driverCoreRequest,omitempty"`
DriverMemory string `json:"driverMemory,omitempty"`
ExecutorCoreRequest string `json:"executorCoreRequest,omitempty"`
Expand Down Expand Up @@ -100,14 +102,21 @@ type ThroughputAnomalyDetectorList struct {
}

// ThroughputAnomalyDetectorStats is one result row of a Throughput Anomaly
// Detection job as read back from the ClickHouse tadetector table. All
// fields are returned as strings by the retrieval query; which fields are
// populated depends on the aggregation mode of the job (per-flow, pod,
// pod-to-pod, or pod-to-service).
//
// The diff rendering interleaved the old and new field sets, producing
// duplicate field names; this is the post-change field set only.
type ThroughputAnomalyDetectorStats struct {
	Id                         string `json:"id,omitempty"`
	SourceIP                   string `json:"sourceIP,omitempty"`
	SourceTransportPort        string `json:"sourceTransportPort,omitempty"`
	DestinationIP              string `json:"destinationIP,omitempty"`
	DestinationTransportPort   string `json:"destinationTransportPort,omitempty"`
	// NOTE(review): the FlowStartSeconds/FlowEndSeconds/AlgoCalc JSON tags
	// are capitalized unlike the other tags; preserved as-is for wire
	// compatibility — confirm before normalizing.
	FlowStartSeconds           string `json:"FlowStartSeconds,omitempty"`
	SourcePodNamespace         string `json:"sourcePodNamespace,omitempty"`
	SourcePodLabels            string `json:"sourcePodLabels,omitempty"`
	DestinationPodNamespace    string `json:"destinationPodNamespace,omitempty"`
	DestinationPodLabels       string `json:"destinationPodLabels,omitempty"`
	DestinationServicePortName string `json:"destinationServicePortName,omitempty"`
	FlowEndSeconds             string `json:"FlowEndSeconds,omitempty"`
	Throughput                 string `json:"throughput,omitempty"`
	AggType                    string `json:"aggType,omitempty"`
	AlgoType                   string `json:"algoType,omitempty"`
	AlgoCalc                   string `json:"AlgoCalc,omitempty"`
	Anomaly                    string `json:"anomaly,omitempty"`
}
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,9 @@ import (
// defaultNameSpace is the namespace in which ThroughputAnomalyDetection
// jobs are created.
const defaultNameSpace = "flow-visibility"

// Selectors into queryMap: tadQuery is the per-flow (non-aggregated)
// result query; the agg* variants select the column sets produced by the
// pod, pod-to-pod, and pod-to-service aggregation modes.
const (
	tadQuery int = iota
	aggtadpodQuery
	aggtadpod2podQuery
	aggtadpod2svcQuery
)

// REST implements rest.Storage for anomalydetector.
Expand Down Expand Up @@ -63,9 +66,50 @@ var queryMap = map[int]string{
flowStartSeconds,
flowEndSeconds,
throughput,
aggType,
algoType,
algoCalc,
anomaly
FROM tadetector WHERE id = (?);`,
aggtadpodQuery: `
SELECT
id,
sourcePodNamespace,
sourcePodLabels,
flowEndSeconds,
throughput,
aggType,
algoType,
algoCalc,
anomaly
FROM tadetector WHERE id = (?);`,
aggtadpod2podQuery: `
SELECT
id,
sourcePodNamespace,
sourcePodLabels,
destinationPodNamespace,
destinationPodLabels,
flowEndSeconds,
throughput,
aggType,
algoType,
algoCalc,
anomaly
FROM tadetector WHERE id = (?);`,
aggtadpod2svcQuery: `
SELECT
id,
sourcePodNamespace,
sourcePodLabels,
destinationServicePortName,
flowEndSeconds,
throughput,
aggType,
algoType,
algoCalc,
anomaly
FROM tadetector WHERE id = (?);`,
}

// NewREST returns a REST object that will work against API services.
Expand Down Expand Up @@ -102,6 +146,8 @@ func (r *REST) copyThroughputAnomalyDetector(tad *v1alpha1.ThroughputAnomalyDete
tad.EndInterval = crd.Spec.EndInterval
tad.ExecutorInstances = crd.Spec.ExecutorInstances
tad.NSIgnoreList = crd.Spec.NSIgnoreList
tad.AggregatedFlow = crd.Spec.AggregatedFlow
tad.Pod2PodLabel = crd.Spec.Pod2PodLabel
tad.DriverCoreRequest = crd.Spec.DriverCoreRequest
tad.DriverMemory = crd.Spec.DriverMemory
tad.ExecutorCoreRequest = crd.Spec.ExecutorCoreRequest
Expand Down Expand Up @@ -170,6 +216,8 @@ func (r *REST) Create(ctx context.Context, obj runtime.Object, createValidation
job.Spec.DriverMemory = newTAD.DriverMemory
job.Spec.ExecutorCoreRequest = newTAD.ExecutorCoreRequest
job.Spec.ExecutorMemory = newTAD.ExecutorMemory
job.Spec.AggregatedFlow = newTAD.AggregatedFlow
job.Spec.Pod2PodLabel = newTAD.Pod2PodLabel
_, err := r.ThroughputAnomalyDetectorQuerier.CreateThroughputAnomalyDetector(defaultNameSpace, job)
if err != nil {
return nil, errors.NewBadRequest(fmt.Sprintf("error when creating ThroughputAnomalyDetection job: %+v, err: %v", job, err))
Expand All @@ -179,24 +227,57 @@ func (r *REST) Create(ctx context.Context, obj runtime.Object, createValidation

func (r *REST) getTADetectorResult(id string, tad *v1alpha1.ThroughputAnomalyDetector) error {
var err error
query := tadQuery
switch tad.AggregatedFlow {
case "pod":
query = aggtadpodQuery
case "pod2pod":
query = aggtadpod2podQuery
case "pod2svc":
query = aggtadpod2svcQuery
}
if r.clickhouseConnect == nil {
r.clickhouseConnect, err = setupClickHouseConnection(nil)
if err != nil {
return err
}
}
rows, err := r.clickhouseConnect.Query(queryMap[tadQuery], id)
rows, err := r.clickhouseConnect.Query(queryMap[query], id)
if err != nil {
return fmt.Errorf("failed to get Throughput Anomaly Detector results with id %s: %v", id, err)
}
defer rows.Close()
for rows.Next() {
res := v1alpha1.ThroughputAnomalyDetectorStats{}
err := rows.Scan(&res.Id, &res.SourceIP, &res.SourceTransportPort, &res.DestinationIP, &res.DestinationTransportPort, &res.FlowStartSeconds, &res.FlowEndSeconds, &res.Throughput, &res.AlgoCalc, &res.Anomaly)
if err != nil {
return fmt.Errorf("failed to scan Throughput Anomaly Detector results: %v", err)
switch query {
case tadQuery:
res := v1alpha1.ThroughputAnomalyDetectorStats{}
err := rows.Scan(&res.Id, &res.SourceIP, &res.SourceTransportPort, &res.DestinationIP, &res.DestinationTransportPort, &res.FlowStartSeconds, &res.FlowEndSeconds, &res.Throughput, &res.AggType, &res.AlgoType, &res.AlgoCalc, &res.Anomaly)
if err != nil {
return fmt.Errorf("failed to scan Throughput Anomaly Detector results: %v", err)
}
tad.Stats = append(tad.Stats, res)
case aggtadpodQuery:
res := v1alpha1.ThroughputAnomalyDetectorStats{}
err := rows.Scan(&res.Id, &res.SourcePodNamespace, &res.SourcePodLabels, &res.FlowEndSeconds, &res.Throughput, &res.AggType, &res.AlgoType, &res.AlgoCalc, &res.Anomaly)
if err != nil {
return fmt.Errorf("failed to scan Throughput Anomaly Detector pod Aggregate results: %v", err)
}
tad.Stats = append(tad.Stats, res)
case aggtadpod2podQuery:
res := v1alpha1.ThroughputAnomalyDetectorStats{}
err := rows.Scan(&res.Id, &res.SourcePodNamespace, &res.SourcePodLabels, &res.DestinationPodNamespace, &res.DestinationPodLabels, &res.FlowEndSeconds, &res.Throughput, &res.AggType, &res.AlgoType, &res.AlgoCalc, &res.Anomaly)
if err != nil {
return fmt.Errorf("failed to scan Throughput Anomaly Detector pod to pod Aggregate results: %v", err)
}
tad.Stats = append(tad.Stats, res)
case aggtadpod2svcQuery:
res := v1alpha1.ThroughputAnomalyDetectorStats{}
err := rows.Scan(&res.Id, &res.SourcePodNamespace, &res.SourcePodLabels, &res.DestinationServicePortName, &res.FlowEndSeconds, &res.Throughput, &res.AggType, &res.AlgoType, &res.AlgoCalc, &res.Anomaly)
if err != nil {
return fmt.Errorf("failed to scan Throughput Anomaly Detector pod to svc Aggregate results: %v", err)
}
tad.Stats = append(tad.Stats, res)
}
tad.Stats = append(tad.Stats, res)
}
return nil
}
Expand Down
Loading

0 comments on commit 842d8e8

Please sign in to comment.