Aggregated Throughput Anomaly Detection #184
Conversation
Codecov Report
@@ Coverage Diff @@
## main #184 +/- ##
==========================================
- Coverage 66.54% 66.25% -0.30%
==========================================
Files 38 38
Lines 4783 5049 +266
==========================================
+ Hits 3183 3345 +162
- Misses 1453 1548 +95
- Partials 147 156 +9
		algoCalc,
		anomaly
	FROM tadetector WHERE id = (?);`,
	aggtadpodQuery: `
	SELECT
code formatter missing in .go files?
no, make fmt does cover the rest files:

tathgurt@tathgurtFLVDL theia % find . -type d -name '.cache' -prune -o -type f -name '*.go' -print | grep rest
./pkg/apiserver/registry/intelligence/throughputanomalydetector/rest.go
./pkg/apiserver/registry/intelligence/throughputanomalydetector/rest_test.go
./pkg/apiserver/registry/intelligence/networkpolicyrecommendation/rest.go
./pkg/apiserver/registry/intelligence/networkpolicyrecommendation/rest_test.go
./pkg/apiserver/registry/system/supportbundle/rest.go
./pkg/apiserver/registry/system/supportbundle/rest_test.go
./pkg/apiserver/registry/stats/clickhouse/rest.go
./pkg/apiserver/registry/stats/clickhouse/rest_test.go
/theia-test-e2e
	algoType String,
	algoCalc Float64,
	throughput Float64,
	anomaly String,
	id String
) engine=ReplicatedMergeTree('/clickhouse/tables/{shard}/{database}/{table}', '{replica}')
-ORDER BY (flowStartSeconds);
+ORDER BY (flowEndSeconds);
Just out of curiosity, why do we change this from flowStartSeconds to flowEndSeconds?
That is because we are reusing the same table for the aggregated flow output, which will not have a flowStartSeconds column; the column common to both outputs is flowEndSeconds.
SELECT
	id,
	sourcePodNamespace,
	sourcePodLabels,
	flowEndSeconds,
	throughput,
I think it's the mix of spaces and tabs that makes the indentation look strange here. Could you unify them?
aggtadpodQuery
aggtadpod2podQuery
aggtadpod2svcQuery
Keep the camel-case naming to make them aggTadPodQuery, aggTadPod2PodQuery, aggTadPod2SvcQuery
)
}

func deleteTADid(cmd *cobra.Command, tadName string) error {
Suggested change:
-func deleteTADid(cmd *cobra.Command, tadName string) error {
+func deleteTADId(cmd *cobra.Command, tadName string) error {
case "e2e":
	fmt.Fprintf(w, "id\tsourceIP\tsourceTransportPort\tdestinationIP\tdestinationTransportPort\tflowStartSeconds\tflowEndSeconds\tthroughput\taggType\talgoType\talgoCalc\tanomaly\n")
	for _, p := range tad.Stats {
		fmt.Fprintf(w, "%v\t%v\t%v\t%v\t%v\t%v\t%v\t%v\t%v\t%v\t%v\t%v\n", p.Id, p.SourceIP, p.SourceTransportPort, p.DestinationIP, p.DestinationTransportPort, p.FlowStartSeconds, p.FlowEndSeconds, p.Throughput, p.AggType, p.AlgoType, p.AlgoCalc, p.Anomaly)
	}
I feel this is not quite readable. Is it possible to construct a matrix and reuse the TableOutput function here?
throughputAnomalyDetectionAlgoCmd.Flags().String(
	"agg-flow",
	"",
	`Specifies which aggregated flow to perform anomaly detection on, options are pods/pod2pod/pod2svc`,
)
throughputAnomalyDetectionAlgoCmd.Flags().String(
	"p2p-label",
	"",
	`On choosing agg-flow as pod2pod, user need to mention labels for inbound/outbound throughput`,
For the newly added functionality, could you include it in the documentation?
Thanks for the comment, I have added it in the other PR #192.
    f.col("new.anomaly").alias("anomaly"))

def filter_df_with_true_anomalies(spark, plotDF, algo_type, agg_flow=None):
    if agg_flow:
        plotDF = plotDF.withColumn(
I recommend merging the shared operations found in lines 317-343. It looks like the different branches only select different columns in 322-324 and 336-338.
 f.collect_list("flowEndSeconds").alias("flowEndSeconds"),
 f.stddev_samp("max(throughput)").alias("throughputStandardDeviation"),
-f.collect_list(f.struct(["Diff_Secs", "max(throughput)"])).alias(
-    "Diff_Secs, Throughput"))
+f.collect_list(f.struct(["max(throughput)"])).alias("Throughput"))
/theia-test-e2e
I think the main issue is that the implementation has deviated from our initial design. If you prefer, we can discuss this further offline.
	result = append(result, []string{p.Id, p.SourcePodNamespace, p.SourcePodLabels, p.DestinationPodNamespace, p.DestinationPodLabels, p.FlowEndSeconds, p.Throughput, p.AggType, p.AlgoType, p.AlgoCalc, p.Anomaly})
	}
case "pod_to_svc":
	result = append(result, []string{"id", "sourcePodNamespace", "sourcePodLabels", "destinationServicePortName", "flowEndSeconds", "throughput", "aggType", "algoType", "algoCalc", "anomaly"})
I don't think the 'pod_to_svc' type aligns with our design. Our goal was to monitor the aggregated throughput of all traffic directed to a Service, so the 'sourcePodNamespace' and 'sourcePodLabels' parameters are not relevant in this case.
	for _, p := range tad.Stats {
		result = append(result, []string{p.Id, p.SourceIP, p.SourceTransportPort, p.DestinationIP, p.DestinationTransportPort, p.FlowStartSeconds, p.FlowEndSeconds, p.Throughput, p.AggType, p.AlgoType, p.AlgoCalc, p.Anomaly})
	}
case "pod_to_external":
Same concern for the 'pod_to_external' type. I think our goal was to monitor the aggregated throughput of all traffic directed to an external IP.
	for _, p := range tad.Stats {
		result = append(result, []string{p.Id, p.SourcePodNamespace, p.SourcePodLabels, p.FlowEndSeconds, p.Throughput, p.AggType, p.AlgoType, p.AlgoCalc, p.Anomaly})
	}
case "pod_to_pod":
"pod_to_pod" seems a fair use case here; just want to point out that our initial goal was to monitor the aggregated inbound/outbound throughput of a set of specific pod labels.
	throughputAnomalyDetection.AggregatedFlow = aggregatedFlow
	throughputAnomalyDetection.ExternalIP = externalIp
case "svc":
	throughputAnomalyDetection.AggregatedFlow = aggregatedFlow
Why do we require users to input Pod labels and an IP for the "pod" and "external" cases, but not require a service name for the "svc" case?
I feel Pod labels and external IP should be optional inputs: if the user doesn't provide this info, we consider all possible pod labels and external IPs, like what we have done in the "svc" case.
Sure, I can add them as optional parameters. I just wanted to confirm: if the user doesn't provide labels or an external IP, should we check whether the corresponding columns are empty, or should we even include those columns in the DF?
For example, if the user chooses the "external" agg type and doesn't provide any input IP, we will consider all toExternal flows, group by the destinationIP, aggregate the throughput for each destinationIP, and find anomalies if any.
Hope this example answered your question.
For the "external" case it may work, as we have a flow type to tell whether the traffic is external; but there could be pods with no labels as well as pods with any type of label. Should we add both of them? They would each need a different kind of query.
I think we should handle both the empty and non-empty cases for labels. We can also add the service name as an argument so the user can provide a specific service; but for the service agg_type, we should only consider the non-empty cases.
I see, we can consider only the non-empty case for now if the empty case needs big changes.
Maybe you can add another parameter "podname" in the pod agg case; it would help cover the cases of pods with no labels.
Currently, we should not consider the empty cases, as we look for pods based on labels; if there are no labels, it would be a little misleading to still collect them based on their names.
What I meant was to have a user input parameter 'podname' that would cover cases where the user knows the pods they're interested in don't have labels.
Just a nice-to-have.
@@ -198,3 +198,17 @@ ALTER TABLE flows
 ALTER TABLE flows_local
 DROP COLUMN egressName,
 DROP COLUMN egressIP;
+ALTER TABLE tadetector
+DROP COLUMN podNamespace;
It should be a comma instead of a semicolon between the DROP COLUMNs; please check my previous comment: #184 (comment)
@@ -137,3 +137,17 @@ ALTER TABLE flows
 ALTER TABLE flows_local
 ADD COLUMN egressName String,
 ADD COLUMN egressIP String;
+ALTER TABLE tadetector
+ADD COLUMN podNamespace;
Same as above: use a comma and add the data type in the ADD COLUMNs; please check my previous comment: #184 (comment)
/theia-test-e2e
Signed-off-by: Tushar Tathgur <[email protected]>
This PR does the following: