Skip to content

Commit

Permalink
Addressed comments in Aggregated Throughput Anomaly Detection
Browse files Browse the repository at this point in the history
partially solves: #168

Signed-off-by: Tushar Tathgur <[email protected]>
  • Loading branch information
Tushar Tathgur authored and Tushar Tathgur committed Mar 30, 2023
1 parent cda0b2b commit 4c2299f
Show file tree
Hide file tree
Showing 18 changed files with 783 additions and 545 deletions.
12 changes: 9 additions & 3 deletions build/charts/theia/crds/anomaly-detector-crd.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -33,10 +33,16 @@ spec:
type: array
items:
type: string
aggflow:
aggFlow:
type: string
podLabel:
type: string
externalIp:
type: string
podName:
type: string
servicePortName:
type: string
pod2podlabel:
type: string
executorInstances:
type: integer
driverCoreRequest:
Expand Down
7 changes: 3 additions & 4 deletions build/charts/theia/provisioning/datasources/create_table.sh
Original file line number Diff line number Diff line change
Expand Up @@ -293,11 +293,10 @@ clickhouse client -n -h 127.0.0.1 <<-EOSQL
destinationTransportPort UInt16,
protocolIdentifier UInt16,
flowStartSeconds DateTime,
sourcePodNamespace String,
sourcePodLabels String,
destinationPodNamespace String,
destinationPodLabels String,
podNamespace String,
podLabels String,
destinationServicePortName String,
direction String,
flowEndSeconds DateTime,
throughputStandardDeviation Float64,
aggType String,
Expand Down
Original file line number Diff line number Diff line change
@@ -1,13 +1,11 @@
--Drop aggregated TAD columns
ALTER TABLE tadetector DROP COLUMN sourcePodNamespace String;
ALTER TABLE tadetector_local DROP COLUMN sourcePodNamespace String;
ALTER TABLE tadetector DROP COLUMN sourcePodLabels String;
ALTER TABLE tadetector_local DROP COLUMN sourcePodLabels String;
ALTER TABLE tadetector DROP COLUMN destinationPodNamespace String;
ALTER TABLE tadetector_local DROP COLUMN destinationPodNamespace String;
ALTER TABLE tadetector DROP COLUMN destinationPodLabels String;
ALTER TABLE tadetector_local DROP COLUMN destinationPodLabels String;
ALTER TABLE tadetector DROP COLUMN podNamespace String;
ALTER TABLE tadetector_local DROP COLUMN podNamespace String;
ALTER TABLE tadetector DROP COLUMN podLabels String;
ALTER TABLE tadetector_local DROP COLUMN podLabels String;
ALTER TABLE tadetector DROP COLUMN destinationServicePortName String;
ALTER TABLE tadetector_local DROP COLUMN destinationServicePortName String;
ALTER TABLE tadetector DROP COLUMN aggType String;
ALTER TABLE tadetector_local DROP COLUMN aggType String;
ALTER TABLE tadetector DROP COLUMN direction String;
ALTER TABLE tadetector_local DROP COLUMN direction String;
Original file line number Diff line number Diff line change
@@ -1,13 +1,11 @@
--Add aggregated TAD columns
ALTER TABLE tadetector ADD COLUMN sourcePodNamespace String;
ALTER TABLE tadetector_local ADD COLUMN sourcePodNamespace String;
ALTER TABLE tadetector ADD COLUMN sourcePodLabels String;
ALTER TABLE tadetector_local ADD COLUMN sourcePodLabels String;
ALTER TABLE tadetector ADD COLUMN destinationPodNamespace String;
ALTER TABLE tadetector_local ADD COLUMN destinationPodNamespace String;
ALTER TABLE tadetector ADD COLUMN destinationPodLabels String;
ALTER TABLE tadetector_local ADD COLUMN destinationPodLabels String;
ALTER TABLE tadetector ADD COLUMN podNamespace String;
ALTER TABLE tadetector_local ADD COLUMN podNamespace String;
ALTER TABLE tadetector ADD COLUMN podLabels String;
ALTER TABLE tadetector_local ADD COLUMN podLabels String;
ALTER TABLE tadetector ADD COLUMN destinationServicePortName String;
ALTER TABLE tadetector_local ADD COLUMN destinationServicePortName String;
ALTER TABLE tadetector ADD COLUMN aggType String;
ALTER TABLE tadetector_local ADD COLUMN aggType String;
ALTER TABLE tadetector ADD COLUMN direction String;
ALTER TABLE tadetector_local ADD COLUMN direction String;
35 changes: 15 additions & 20 deletions build/yamls/flow-visibility.yml
Original file line number Diff line number Diff line change
Expand Up @@ -390,32 +390,28 @@ data:
engine=Distributed('{cluster}', default, tadetector_local, rand());
000005_0-5-0.down.sql: |
--Drop aggregated TAD columns
ALTER TABLE tadetector DROP COLUMN sourcePodNamespace String;
ALTER TABLE tadetector_local DROP COLUMN sourcePodNamespace String;
ALTER TABLE tadetector DROP COLUMN sourcePodLabels String;
ALTER TABLE tadetector_local DROP COLUMN sourcePodLabels String;
ALTER TABLE tadetector DROP COLUMN destinationPodNamespace String;
ALTER TABLE tadetector_local DROP COLUMN destinationPodNamespace String;
ALTER TABLE tadetector DROP COLUMN destinationPodLabels String;
ALTER TABLE tadetector_local DROP COLUMN destinationPodLabels String;
ALTER TABLE tadetector DROP COLUMN podNamespace String;
ALTER TABLE tadetector_local DROP COLUMN podNamespace String;
ALTER TABLE tadetector DROP COLUMN podLabels String;
ALTER TABLE tadetector_local DROP COLUMN podLabels String;
ALTER TABLE tadetector DROP COLUMN destinationServicePortName String;
ALTER TABLE tadetector_local DROP COLUMN destinationServicePortName String;
ALTER TABLE tadetector DROP COLUMN aggType String;
ALTER TABLE tadetector_local DROP COLUMN aggType String;
ALTER TABLE tadetector DROP COLUMN direction String;
ALTER TABLE tadetector_local DROP COLUMN direction String;
000005_0-5-0.up.sql: |
--Add aggregated TAD columns
ALTER TABLE tadetector ADD COLUMN sourcePodNamespace String;
ALTER TABLE tadetector_local ADD COLUMN sourcePodNamespace String;
ALTER TABLE tadetector ADD COLUMN sourcePodLabels String;
ALTER TABLE tadetector_local ADD COLUMN sourcePodLabels String;
ALTER TABLE tadetector ADD COLUMN destinationPodNamespace String;
ALTER TABLE tadetector_local ADD COLUMN destinationPodNamespace String;
ALTER TABLE tadetector ADD COLUMN destinationPodLabels String;
ALTER TABLE tadetector_local ADD COLUMN destinationPodLabels String;
ALTER TABLE tadetector ADD COLUMN podNamespace String;
ALTER TABLE tadetector_local ADD COLUMN podNamespace String;
ALTER TABLE tadetector ADD COLUMN podLabels String;
ALTER TABLE tadetector_local ADD COLUMN podLabels String;
ALTER TABLE tadetector ADD COLUMN destinationServicePortName String;
ALTER TABLE tadetector_local ADD COLUMN destinationServicePortName String;
ALTER TABLE tadetector ADD COLUMN aggType String;
ALTER TABLE tadetector_local ADD COLUMN aggType String;
ALTER TABLE tadetector ADD COLUMN direction String;
ALTER TABLE tadetector_local ADD COLUMN direction String;
create_table.sh: |
#!/usr/bin/env bash
Expand Down Expand Up @@ -703,11 +699,10 @@ data:
destinationTransportPort UInt16,
protocolIdentifier UInt16,
flowStartSeconds DateTime,
sourcePodNamespace String,
sourcePodLabels String,
destinationPodNamespace String,
destinationPodLabels String,
podNamespace String,
podLabels String,
destinationServicePortName String,
direction String,
flowEndSeconds DateTime,
throughputStandardDeviation Float64,
aggType String,
Expand Down
7 changes: 5 additions & 2 deletions pkg/apis/crd/v1alpha1/types.go
Original file line number Diff line number Diff line change
Expand Up @@ -98,8 +98,11 @@ type ThroughputAnomalyDetectorSpec struct {
StartInterval metav1.Time `json:"startInterval,omitempty"`
EndInterval metav1.Time `json:"endInterval,omitempty"`
NSIgnoreList []string `json:"nsIgnoreList,omitempty"`
AggregatedFlow string `json:"aggflow,omitempty"`
Pod2PodLabel string `json:"pod2podlabel,omitempty"`
AggregatedFlow string `json:"aggFlow,omitempty"`
PodLabel string `json:"podLabel,omitempty"`
PodName string `json:"podName,omitempty"`
ExternalIP string `json:"externalIp,omitempty"`
ServicePortName string `json:"servicePortName,omitempty"`
ExecutorInstances int `json:"executorInstances,omitempty"`
DriverCoreRequest string `json:"driverCoreRequest,omitempty"`
DriverMemory string `json:"driverMemory,omitempty"`
Expand Down
14 changes: 8 additions & 6 deletions pkg/apis/intelligence/v1alpha1/types.go
Original file line number Diff line number Diff line change
Expand Up @@ -73,8 +73,11 @@ type ThroughputAnomalyDetector struct {
EndInterval metav1.Time `json:"endInterval,omitempty"`
ExecutorInstances int `json:"executorInstances,omitempty"`
NSIgnoreList []string `json:"nsIgnoreList,omitempty"`
AggregatedFlow string `json:"aggflow,omitempty"`
Pod2PodLabel string `json:"pod2podlabel,omitempty"`
AggregatedFlow string `json:"aggFlow,omitempty"`
PodLabel string `json:"podLabel,omitempty"`
PodName string `json:"podName,omitempty"`
ExternalIP string `json:"externalIp,omitempty"`
ServicePortName string `json:"servicePortName,omitempty"`
DriverCoreRequest string `json:"driverCoreRequest,omitempty"`
DriverMemory string `json:"driverMemory,omitempty"`
ExecutorCoreRequest string `json:"executorCoreRequest,omitempty"`
Expand Down Expand Up @@ -108,10 +111,9 @@ type ThroughputAnomalyDetectorStats struct {
DestinationIP string `json:"destinationIP,omitempty"`
DestinationTransportPort string `json:"destinationTransportPort,omitempty"`
FlowStartSeconds string `json:"FlowStartSeconds,omitempty"`
SourcePodNamespace string `json:"sourcePodNamespace,omitempty"`
SourcePodLabels string `json:"sourcePodLabels,omitempty"`
DestinationPodNamespace string `json:"destinationPodNamespace,omitempty"`
DestinationPodLabels string `json:"destinationPodLabels,omitempty"`
PodNamespace string `json:"podNamespace,omitempty"`
PodLabels string `json:"podLabels,omitempty"`
Direction string `json:"direction,omitempty"`
DestinationServicePortName string `json:"destinationServicePortName,omitempty"`
FlowEndSeconds string `json:"FlowEndSeconds,omitempty"`
Throughput string `json:"throughput,omitempty"`
Expand Down
102 changes: 52 additions & 50 deletions pkg/apiserver/registry/intelligence/throughputanomalydetector/rest.go
Original file line number Diff line number Diff line change
Expand Up @@ -34,9 +34,9 @@ import (
const (
defaultNameSpace = "flow-visibility"
tadQuery int = iota
aggtadpodQuery
aggtadpod2podQuery
aggtadpod2svcQuery
aggTadExternalQuery
aggTadPodQuery
aggTadSvcQuery
)

// REST implements rest.Storage for anomalydetector.
Expand All @@ -57,7 +57,7 @@ var (

var queryMap = map[int]string{
tadQuery: `
SELECT
SELECT
id,
sourceIP,
sourceTransportPort,
Expand All @@ -71,44 +71,40 @@ var queryMap = map[int]string{
algoCalc,
anomaly
FROM tadetector WHERE id = (?);`,
aggtadpodQuery: `
aggTadExternalQuery: `
SELECT
id,
sourcePodNamespace,
sourcePodLabels,
flowEndSeconds,
throughput,
id,
destinationIP,
flowEndSeconds,
throughput,
aggType,
algoType,
algoCalc,
anomaly
algoType,
algoCalc,
anomaly
FROM tadetector WHERE id = (?);`,
aggtadpod2podQuery: `
aggTadPodQuery: `
SELECT
id,
sourcePodNamespace,
sourcePodLabels,
destinationPodNamespace,
destinationPodLabels,
flowEndSeconds,
throughput,
id,
podNamespace,
podLabels,
direction,
flowEndSeconds,
throughput,
aggType,
algoType,
algoCalc,
anomaly
algoType,
algoCalc,
anomaly
FROM tadetector WHERE id = (?);`,
aggtadpod2svcQuery: `
aggTadSvcQuery: `
SELECT
id,
sourcePodNamespace,
sourcePodLabels,
destinationServicePortName,
flowEndSeconds,
throughput,
id,
destinationServicePortName,
flowEndSeconds,
throughput,
aggType,
algoType,
algoCalc,
anomaly
algoType,
algoCalc,
anomaly
FROM tadetector WHERE id = (?);`,
}

Expand Down Expand Up @@ -147,7 +143,10 @@ func (r *REST) copyThroughputAnomalyDetector(tad *v1alpha1.ThroughputAnomalyDete
tad.ExecutorInstances = crd.Spec.ExecutorInstances
tad.NSIgnoreList = crd.Spec.NSIgnoreList
tad.AggregatedFlow = crd.Spec.AggregatedFlow
tad.Pod2PodLabel = crd.Spec.Pod2PodLabel
tad.PodLabel = crd.Spec.PodLabel
tad.PodName = crd.Spec.PodName
tad.ExternalIP = crd.Spec.ExternalIP
tad.ServicePortName = crd.Spec.ServicePortName
tad.DriverCoreRequest = crd.Spec.DriverCoreRequest
tad.DriverMemory = crd.Spec.DriverMemory
tad.ExecutorCoreRequest = crd.Spec.ExecutorCoreRequest
Expand Down Expand Up @@ -217,7 +216,10 @@ func (r *REST) Create(ctx context.Context, obj runtime.Object, createValidation
job.Spec.ExecutorCoreRequest = newTAD.ExecutorCoreRequest
job.Spec.ExecutorMemory = newTAD.ExecutorMemory
job.Spec.AggregatedFlow = newTAD.AggregatedFlow
job.Spec.Pod2PodLabel = newTAD.Pod2PodLabel
job.Spec.PodLabel = newTAD.PodLabel
job.Spec.PodName = newTAD.PodName
job.Spec.ExternalIP = newTAD.ExternalIP
job.Spec.ServicePortName = newTAD.ServicePortName
_, err := r.ThroughputAnomalyDetectorQuerier.CreateThroughputAnomalyDetector(defaultNameSpace, job)
if err != nil {
return nil, errors.NewBadRequest(fmt.Sprintf("error when creating ThroughputAnomalyDetection job: %+v, err: %v", job, err))
Expand All @@ -229,12 +231,12 @@ func (r *REST) getTADetectorResult(id string, tad *v1alpha1.ThroughputAnomalyDet
var err error
query := tadQuery
switch tad.AggregatedFlow {
case "external":
query = aggTadExternalQuery
case "pod":
query = aggtadpodQuery
case "pod2pod":
query = aggtadpod2podQuery
case "pod2svc":
query = aggtadpod2svcQuery
query = aggTadPodQuery
case "svc":
query = aggTadSvcQuery
}
if r.clickhouseConnect == nil {
r.clickhouseConnect, err = setupClickHouseConnection(nil)
Expand All @@ -256,25 +258,25 @@ func (r *REST) getTADetectorResult(id string, tad *v1alpha1.ThroughputAnomalyDet
return fmt.Errorf("failed to scan Throughput Anomaly Detector results: %v", err)
}
tad.Stats = append(tad.Stats, res)
case aggtadpodQuery:
case aggTadExternalQuery:
res := v1alpha1.ThroughputAnomalyDetectorStats{}
err := rows.Scan(&res.Id, &res.SourcePodNamespace, &res.SourcePodLabels, &res.FlowEndSeconds, &res.Throughput, &res.AggType, &res.AlgoType, &res.AlgoCalc, &res.Anomaly)
err := rows.Scan(&res.Id, &res.DestinationIP, &res.FlowEndSeconds, &res.Throughput, &res.AggType, &res.AlgoType, &res.AlgoCalc, &res.Anomaly)
if err != nil {
return fmt.Errorf("failed to scan Throughput Anomaly Detector pod Aggregate results: %v", err)
return fmt.Errorf("failed to scan Throughput Anomaly Detector External IP Aggregate results: %v", err)
}
tad.Stats = append(tad.Stats, res)
case aggtadpod2podQuery:
case aggTadPodQuery:
res := v1alpha1.ThroughputAnomalyDetectorStats{}
err := rows.Scan(&res.Id, &res.SourcePodNamespace, &res.SourcePodLabels, &res.DestinationPodNamespace, &res.DestinationPodLabels, &res.FlowEndSeconds, &res.Throughput, &res.AggType, &res.AlgoType, &res.AlgoCalc, &res.Anomaly)
err := rows.Scan(&res.Id, &res.PodNamespace, &res.PodLabels, &res.Direction, &res.FlowEndSeconds, &res.Throughput, &res.AggType, &res.AlgoType, &res.AlgoCalc, &res.Anomaly)
if err != nil {
return fmt.Errorf("failed to scan Throughput Anomaly Detector pod to pod Aggregate results: %v", err)
return fmt.Errorf("failed to scan Throughput Anomaly Detector pod Aggregate results: %v", err)
}
tad.Stats = append(tad.Stats, res)
case aggtadpod2svcQuery:
case aggTadSvcQuery:
res := v1alpha1.ThroughputAnomalyDetectorStats{}
err := rows.Scan(&res.Id, &res.SourcePodNamespace, &res.SourcePodLabels, &res.DestinationServicePortName, &res.FlowEndSeconds, &res.Throughput, &res.AggType, &res.AlgoType, &res.AlgoCalc, &res.Anomaly)
err := rows.Scan(&res.Id, &res.DestinationServicePortName, &res.FlowEndSeconds, &res.Throughput, &res.AggType, &res.AlgoType, &res.AlgoCalc, &res.Anomaly)
if err != nil {
return fmt.Errorf("failed to scan Throughput Anomaly Detector pod to svc Aggregate results: %v", err)
return fmt.Errorf("failed to scan Throughput Anomaly Detector service Aggregate results: %v", err)
}
tad.Stats = append(tad.Stats, res)
}
Expand Down
Loading

0 comments on commit 4c2299f

Please sign in to comment.