Skip to content

Commit

Permalink
Addressed comments in Aggregated Throughput Anomaly Detection
Browse files Browse the repository at this point in the history
partially solves: #168

Signed-off-by: Tushar Tathgur <[email protected]>
  • Loading branch information
Tushar Tathgur authored and Tushar Tathgur committed Jul 6, 2023
1 parent 5696010 commit d9a928c
Show file tree
Hide file tree
Showing 18 changed files with 1,096 additions and 595 deletions.
14 changes: 11 additions & 3 deletions build/charts/theia/crds/anomaly-detector-crd.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -33,10 +33,18 @@ spec:
type: array
items:
type: string
aggflow:
aggFlow:
type: string
podLabel:
type: string
externalIp:
type: string
podName:
type: string
podNameSpace:
type: string
servicePortName:
type: string
pod2podlabel:
type: string
executorInstances:
type: integer
driverCoreRequest:
Expand Down
10 changes: 5 additions & 5 deletions build/charts/theia/provisioning/datasources/create_table.sh
Original file line number Diff line number Diff line change
Expand Up @@ -367,11 +367,11 @@ clickhouse client -n -h 127.0.0.1 <<-EOSQL
destinationTransportPort UInt16,
protocolIdentifier UInt16,
flowStartSeconds DateTime,
sourcePodNamespace String,
sourcePodLabels String,
destinationPodNamespace String,
destinationPodLabels String,
podNamespace String,
podLabels String,
podName String,
destinationServicePortName String,
direction String,
flowEndSeconds DateTime,
throughputStandardDeviation Float64,
aggType String,
Expand All @@ -381,7 +381,7 @@ clickhouse client -n -h 127.0.0.1 <<-EOSQL
anomaly String,
id String
) engine=ReplicatedMergeTree('/clickhouse/tables/{shard}/{database}/{table}', '{replica}')
ORDER BY (flowEndSeconds);
ORDER BY (flowStartSeconds);
--Create distributed tables for cluster
CREATE TABLE IF NOT EXISTS flows AS flows_local
Expand Down
Original file line number Diff line number Diff line change
@@ -1,13 +1,13 @@
--Drop aggregated TAD columns
--Each column is removed from both the tadetector table and the
--tadetector_local table.
--ClickHouse's DROP COLUMN clause takes only the column name; a trailing
--data type (e.g. "String") is not part of the syntax and fails to parse.
ALTER TABLE tadetector DROP COLUMN sourcePodNamespace;
ALTER TABLE tadetector_local DROP COLUMN sourcePodNamespace;
ALTER TABLE tadetector DROP COLUMN sourcePodLabels;
ALTER TABLE tadetector_local DROP COLUMN sourcePodLabels;
ALTER TABLE tadetector DROP COLUMN destinationPodNamespace;
ALTER TABLE tadetector_local DROP COLUMN destinationPodNamespace;
ALTER TABLE tadetector DROP COLUMN destinationPodLabels;
ALTER TABLE tadetector_local DROP COLUMN destinationPodLabels;
ALTER TABLE tadetector DROP COLUMN podNamespace;
ALTER TABLE tadetector_local DROP COLUMN podNamespace;
ALTER TABLE tadetector DROP COLUMN podLabels;
ALTER TABLE tadetector_local DROP COLUMN podLabels;
ALTER TABLE tadetector DROP COLUMN destinationServicePortName;
ALTER TABLE tadetector_local DROP COLUMN destinationServicePortName;
ALTER TABLE tadetector DROP COLUMN aggType;
ALTER TABLE tadetector_local DROP COLUMN aggType;
ALTER TABLE tadetector DROP COLUMN direction;
ALTER TABLE tadetector_local DROP COLUMN direction;
ALTER TABLE tadetector DROP COLUMN podName;
ALTER TABLE tadetector_local DROP COLUMN podName;
Original file line number Diff line number Diff line change
@@ -1,13 +1,13 @@
--Add aggregated TAD columns
ALTER TABLE tadetector ADD COLUMN sourcePodNamespace String;
ALTER TABLE tadetector_local ADD COLUMN sourcePodNamespace String;
ALTER TABLE tadetector ADD COLUMN sourcePodLabels String;
ALTER TABLE tadetector_local ADD COLUMN sourcePodLabels String;
ALTER TABLE tadetector ADD COLUMN destinationPodNamespace String;
ALTER TABLE tadetector_local ADD COLUMN destinationPodNamespace String;
ALTER TABLE tadetector ADD COLUMN destinationPodLabels String;
ALTER TABLE tadetector_local ADD COLUMN destinationPodLabels String;
ALTER TABLE tadetector ADD COLUMN podNamespace String;
ALTER TABLE tadetector_local ADD COLUMN podNamespace String;
ALTER TABLE tadetector ADD COLUMN podLabels String;
ALTER TABLE tadetector_local ADD COLUMN podLabels String;
ALTER TABLE tadetector ADD COLUMN destinationServicePortName String;
ALTER TABLE tadetector_local ADD COLUMN destinationServicePortName String;
ALTER TABLE tadetector ADD COLUMN aggType String;
ALTER TABLE tadetector_local ADD COLUMN aggType String;
ALTER TABLE tadetector ADD COLUMN direction String;
ALTER TABLE tadetector_local ADD COLUMN direction String;
ALTER TABLE tadetector ADD COLUMN podName String;
ALTER TABLE tadetector_local ADD COLUMN podName String;
10 changes: 5 additions & 5 deletions build/yamls/flow-visibility.yml
Original file line number Diff line number Diff line change
Expand Up @@ -1090,11 +1090,11 @@ data:
destinationTransportPort UInt16,
protocolIdentifier UInt16,
flowStartSeconds DateTime,
sourcePodNamespace String,
sourcePodLabels String,
destinationPodNamespace String,
destinationPodLabels String,
podNamespace String,
podLabels String,
podName String,
destinationServicePortName String,
direction String,
flowEndSeconds DateTime,
throughputStandardDeviation Float64,
aggType String,
Expand All @@ -1104,7 +1104,7 @@ data:
anomaly String,
id String
) engine=ReplicatedMergeTree('/clickhouse/tables/{shard}/{database}/{table}', '{replica}')
ORDER BY (flowEndSeconds);
ORDER BY (flowStartSeconds);
--Create distributed tables for cluster
CREATE TABLE IF NOT EXISTS flows AS flows_local
Expand Down
8 changes: 6 additions & 2 deletions pkg/apis/crd/v1alpha1/types.go
Original file line number Diff line number Diff line change
Expand Up @@ -98,8 +98,12 @@ type ThroughputAnomalyDetectorSpec struct {
StartInterval metav1.Time `json:"startInterval,omitempty"`
EndInterval metav1.Time `json:"endInterval,omitempty"`
NSIgnoreList []string `json:"nsIgnoreList,omitempty"`
AggregatedFlow string `json:"aggflow,omitempty"`
Pod2PodLabel string `json:"pod2podlabel,omitempty"`
AggregatedFlow string `json:"aggFlow,omitempty"`
PodLabel string `json:"podLabel,omitempty"`
PodName string `json:"podName,omitempty"`
PodNameSpace string `json:"podNameSpace,omitempty"`
ExternalIP string `json:"externalIp,omitempty"`
ServicePortName string `json:"servicePortName,omitempty"`
ExecutorInstances int `json:"executorInstances,omitempty"`
DriverCoreRequest string `json:"driverCoreRequest,omitempty"`
DriverMemory string `json:"driverMemory,omitempty"`
Expand Down
16 changes: 10 additions & 6 deletions pkg/apis/intelligence/v1alpha1/types.go
Original file line number Diff line number Diff line change
Expand Up @@ -73,8 +73,12 @@ type ThroughputAnomalyDetector struct {
EndInterval metav1.Time `json:"endInterval,omitempty"`
ExecutorInstances int `json:"executorInstances,omitempty"`
NSIgnoreList []string `json:"nsIgnoreList,omitempty"`
AggregatedFlow string `json:"aggflow,omitempty"`
Pod2PodLabel string `json:"pod2podlabel,omitempty"`
AggregatedFlow string `json:"aggFlow,omitempty"`
PodLabel string `json:"podLabel,omitempty"`
PodName string `json:"podName,omitempty"`
PodNameSpace string `json:"podNameSpace,omitempty"`
ExternalIP string `json:"externalIp,omitempty"`
ServicePortName string `json:"servicePortName,omitempty"`
DriverCoreRequest string `json:"driverCoreRequest,omitempty"`
DriverMemory string `json:"driverMemory,omitempty"`
ExecutorCoreRequest string `json:"executorCoreRequest,omitempty"`
Expand Down Expand Up @@ -108,10 +112,10 @@ type ThroughputAnomalyDetectorStats struct {
DestinationIP string `json:"destinationIP,omitempty"`
DestinationTransportPort string `json:"destinationTransportPort,omitempty"`
FlowStartSeconds string `json:"FlowStartSeconds,omitempty"`
SourcePodNamespace string `json:"sourcePodNamespace,omitempty"`
SourcePodLabels string `json:"sourcePodLabels,omitempty"`
DestinationPodNamespace string `json:"destinationPodNamespace,omitempty"`
DestinationPodLabels string `json:"destinationPodLabels,omitempty"`
PodNamespace string `json:"podNamespace,omitempty"`
PodLabels string `json:"podLabels,omitempty"`
PodName string `json:"podName,omitempty"`
Direction string `json:"direction,omitempty"`
DestinationServicePortName string `json:"destinationServicePortName,omitempty"`
FlowEndSeconds string `json:"FlowEndSeconds,omitempty"`
Throughput string `json:"throughput,omitempty"`
Expand Down
129 changes: 79 additions & 50 deletions pkg/apiserver/registry/intelligence/throughputanomalydetector/rest.go
Original file line number Diff line number Diff line change
Expand Up @@ -34,9 +34,10 @@ import (
const (
// defaultNameSpace is the namespace passed to
// CreateThroughputAnomalyDetector when a detector job is created.
defaultNameSpace = "flow-visibility"
// The identifiers below are the keys of queryMap; getTADetectorResult
// picks one of them to choose which SQL statement is run and how the
// returned rows are scanned.
// iota counts const specs from the top of this block, so tadQuery is 1
// (defaultNameSpace occupies index 0) and every following name, which
// implicitly repeats "int = iota", is one higher than the previous.
tadQuery int = iota
aggtadpodQuery
aggtadpod2podQuery
aggtadpod2svcQuery
aggTadExternalQuery
aggTadPodLabelQuery
aggTadPodNameQuery
aggTadSvcQuery
)

// REST implements rest.Storage for anomalydetector.
Expand All @@ -57,7 +58,7 @@ var (

var queryMap = map[int]string{
tadQuery: `
SELECT
SELECT
id,
sourceIP,
sourceTransportPort,
Expand All @@ -71,44 +72,53 @@ var queryMap = map[int]string{
algoCalc,
anomaly
FROM tadetector WHERE id = (?);`,
aggtadpodQuery: `
aggTadExternalQuery: `
SELECT
id,
sourcePodNamespace,
sourcePodLabels,
flowEndSeconds,
throughput,
id,
destinationIP,
flowEndSeconds,
throughput,
aggType,
algoType,
algoCalc,
anomaly
algoType,
algoCalc,
anomaly
FROM tadetector WHERE id = (?);`,
aggTadPodLabelQuery: `
SELECT
id,
podNamespace,
podLabels,
direction,
flowEndSeconds,
throughput,
aggType,
algoType,
algoCalc,
anomaly
FROM tadetector WHERE id = (?);`,
aggtadpod2podQuery: `
aggTadPodNameQuery: `
SELECT
id,
sourcePodNamespace,
sourcePodLabels,
destinationPodNamespace,
destinationPodLabels,
flowEndSeconds,
throughput,
id,
podNamespace,
podName,
direction,
flowEndSeconds,
throughput,
aggType,
algoType,
algoCalc,
anomaly
algoType,
algoCalc,
anomaly
FROM tadetector WHERE id = (?);`,
aggtadpod2svcQuery: `
aggTadSvcQuery: `
SELECT
id,
sourcePodNamespace,
sourcePodLabels,
destinationServicePortName,
flowEndSeconds,
throughput,
id,
destinationServicePortName,
flowEndSeconds,
throughput,
aggType,
algoType,
algoCalc,
anomaly
algoType,
algoCalc,
anomaly
FROM tadetector WHERE id = (?);`,
}

Expand Down Expand Up @@ -147,7 +157,11 @@ func (r *REST) copyThroughputAnomalyDetector(tad *v1alpha1.ThroughputAnomalyDete
tad.ExecutorInstances = crd.Spec.ExecutorInstances
tad.NSIgnoreList = crd.Spec.NSIgnoreList
tad.AggregatedFlow = crd.Spec.AggregatedFlow
tad.Pod2PodLabel = crd.Spec.Pod2PodLabel
tad.PodLabel = crd.Spec.PodLabel
tad.PodName = crd.Spec.PodName
tad.PodNameSpace = crd.Spec.PodNameSpace
tad.ExternalIP = crd.Spec.ExternalIP
tad.ServicePortName = crd.Spec.ServicePortName
tad.DriverCoreRequest = crd.Spec.DriverCoreRequest
tad.DriverMemory = crd.Spec.DriverMemory
tad.ExecutorCoreRequest = crd.Spec.ExecutorCoreRequest
Expand Down Expand Up @@ -217,7 +231,11 @@ func (r *REST) Create(ctx context.Context, obj runtime.Object, createValidation
job.Spec.ExecutorCoreRequest = newTAD.ExecutorCoreRequest
job.Spec.ExecutorMemory = newTAD.ExecutorMemory
job.Spec.AggregatedFlow = newTAD.AggregatedFlow
job.Spec.Pod2PodLabel = newTAD.Pod2PodLabel
job.Spec.PodLabel = newTAD.PodLabel
job.Spec.PodName = newTAD.PodName
job.Spec.PodNameSpace = newTAD.PodNameSpace
job.Spec.ExternalIP = newTAD.ExternalIP
job.Spec.ServicePortName = newTAD.ServicePortName
_, err := r.ThroughputAnomalyDetectorQuerier.CreateThroughputAnomalyDetector(defaultNameSpace, job)
if err != nil {
return nil, errors.NewBadRequest(fmt.Sprintf("error when creating ThroughputAnomalyDetection job: %+v, err: %v", job, err))
Expand All @@ -229,12 +247,16 @@ func (r *REST) getTADetectorResult(id string, tad *v1alpha1.ThroughputAnomalyDet
var err error
query := tadQuery
switch tad.AggregatedFlow {
case "external":
query = aggTadExternalQuery
case "pod":
query = aggtadpodQuery
case "pod2pod":
query = aggtadpod2podQuery
case "pod2svc":
query = aggtadpod2svcQuery
if tad.PodName != "" {
query = aggTadPodNameQuery
} else {
query = aggTadPodLabelQuery
}
case "svc":
query = aggTadSvcQuery
}
if r.clickhouseConnect == nil {
r.clickhouseConnect, err = setupClickHouseConnection(nil)
Expand All @@ -256,25 +278,32 @@ func (r *REST) getTADetectorResult(id string, tad *v1alpha1.ThroughputAnomalyDet
return fmt.Errorf("failed to scan Throughput Anomaly Detector results: %v", err)
}
tad.Stats = append(tad.Stats, res)
case aggtadpodQuery:
case aggTadExternalQuery:
res := v1alpha1.ThroughputAnomalyDetectorStats{}
err := rows.Scan(&res.Id, &res.DestinationIP, &res.FlowEndSeconds, &res.Throughput, &res.AggType, &res.AlgoType, &res.AlgoCalc, &res.Anomaly)
if err != nil {
return fmt.Errorf("failed to scan Throughput Anomaly Detector External IP Aggregate results: %v", err)
}
tad.Stats = append(tad.Stats, res)
case aggTadPodLabelQuery:
res := v1alpha1.ThroughputAnomalyDetectorStats{}
err := rows.Scan(&res.Id, &res.SourcePodNamespace, &res.SourcePodLabels, &res.FlowEndSeconds, &res.Throughput, &res.AggType, &res.AlgoType, &res.AlgoCalc, &res.Anomaly)
err := rows.Scan(&res.Id, &res.PodNamespace, &res.PodLabels, &res.Direction, &res.FlowEndSeconds, &res.Throughput, &res.AggType, &res.AlgoType, &res.AlgoCalc, &res.Anomaly)
if err != nil {
return fmt.Errorf("failed to scan Throughput Anomaly Detector pod Aggregate results: %v", err)
return fmt.Errorf("failed to scan Throughput Anomaly Detector Pod Aggregate results: %v", err)
}
tad.Stats = append(tad.Stats, res)
case aggtadpod2podQuery:
case aggTadPodNameQuery:
res := v1alpha1.ThroughputAnomalyDetectorStats{}
err := rows.Scan(&res.Id, &res.SourcePodNamespace, &res.SourcePodLabels, &res.DestinationPodNamespace, &res.DestinationPodLabels, &res.FlowEndSeconds, &res.Throughput, &res.AggType, &res.AlgoType, &res.AlgoCalc, &res.Anomaly)
err := rows.Scan(&res.Id, &res.PodNamespace, &res.PodName, &res.Direction, &res.FlowEndSeconds, &res.Throughput, &res.AggType, &res.AlgoType, &res.AlgoCalc, &res.Anomaly)
if err != nil {
return fmt.Errorf("failed to scan Throughput Anomaly Detector pod to pod Aggregate results: %v", err)
return fmt.Errorf("failed to scan Throughput Anomaly Detector Pod Aggregate results: %v", err)
}
tad.Stats = append(tad.Stats, res)
case aggtadpod2svcQuery:
case aggTadSvcQuery:
res := v1alpha1.ThroughputAnomalyDetectorStats{}
err := rows.Scan(&res.Id, &res.SourcePodNamespace, &res.SourcePodLabels, &res.DestinationServicePortName, &res.FlowEndSeconds, &res.Throughput, &res.AggType, &res.AlgoType, &res.AlgoCalc, &res.Anomaly)
err := rows.Scan(&res.Id, &res.DestinationServicePortName, &res.FlowEndSeconds, &res.Throughput, &res.AggType, &res.AlgoType, &res.AlgoCalc, &res.Anomaly)
if err != nil {
return fmt.Errorf("failed to scan Throughput Anomaly Detector pod to svc Aggregate results: %v", err)
return fmt.Errorf("failed to scan Throughput Anomaly Detector Service Aggregate results: %v", err)
}
tad.Stats = append(tad.Stats, res)
}
Expand Down
Loading

0 comments on commit d9a928c

Please sign in to comment.