Addressed comments for Theia Manager resync to TAD
Signed-off-by: Tushar Tathgur <[email protected]>
Tushar Tathgur authored and Tushar Tathgur committed Mar 27, 2023
1 parent ce87ea2 commit 80d1138
Showing 6 changed files with 389 additions and 328 deletions.
63 changes: 7 additions & 56 deletions pkg/controller/anomalydetector/controller.go
@@ -56,9 +56,7 @@ var (
     CreateSparkApplication   = controllerutil.CreateSparkApplication
     DeleteSparkApplication   = controllerutil.DeleteSparkApplication
     GetSparkApplication      = controllerutil.GetSparkApplication
-    ListSparkApplication     = controllerutil.ListSparkApplicationWithLabel
     GetSparkMonitoringSvcDNS = controllerutil.GetSparkMonitoringSvcDNS
-    GetSparkJobIds           = controllerutil.GetSparkJobIds
     // For TAD in scheduled or running state, check its status periodically
     anomalyDetectorResyncPeriod = 10 * time.Second
     sparkAppLabelMap            = map[string]string{"app": "theia-tad"}
@@ -252,7 +250,8 @@ func (c *AnomalyDetectorController) handleStaleResources(key controllerutil.GcKe
         }
     }
     if key.RemoveStaleDbEntries {
-        err = c.HandleStaleDbEntries()
+        err = controllerutil.HandleStaleDbEntries(
+            c.clickhouseConnect, c.kubeClient, "tadetector", "tadetector_local", c.IfTADexists, "tad-")
         if err != nil {
             errorList = append(errorList, err)
         } else {
@@ -261,7 +260,7 @@ func (c *AnomalyDetectorController) handleStaleResources(key controllerutil.GcKe
     }
 
     if key.RemoveStaleSparkApp {
-        err = c.handleStaleSparkApp()
+        err = controllerutil.HandleStaleSparkApp(c.kubeClient, sparkAppLabel, c.IfTADexists)
         if err != nil {
             errorList = append(errorList, err)
         } else {
@@ -276,58 +275,10 @@ func (c *AnomalyDetectorController) handleStaleResources(key controllerutil.GcKe
     }
 }
 
-func (c *AnomalyDetectorController) HandleStaleDbEntries() error {
-    if c.clickhouseConnect == nil {
-        var err error
-        c.clickhouseConnect, err = clickhouse.SetupConnection(c.kubeClient)
-        if err != nil {
-            return fmt.Errorf("failed to connect ClickHouse: %v", err)
-        }
-    }
-    idList, err := GetSparkJobIds(c.clickhouseConnect, "tadetector")
+func (c *AnomalyDetectorController) IfTADexists(namespace, id string) error {
+    _, err := c.GetThroughputAnomalyDetector(env.GetTheiaNamespace(), id)
     if err != nil {
-        return fmt.Errorf("failed to get anomaly detector ids from ClickHouse: %v", err)
-    }
-    var errorList []error
-    for _, id := range idList {
-        _, err := c.GetThroughputAnomalyDetector(env.GetTheiaNamespace(), "tad-"+id)
-        if err != nil {
-            if apimachineryerrors.IsNotFound(err) {
-                query := "ALTER TABLE tadetector ON CLUSTER '{cluster}' DELETE WHERE id = (" + id + ");"
-                err = controllerutil.DeleteSparkResult(c.clickhouseConnect, query, id)
-                if err != nil {
-                    errorList = append(errorList, err)
-                }
-            } else {
-                errorList = append(errorList, err)
-            }
-        }
-    }
-    if len(errorList) > 0 {
-        return fmt.Errorf("failed to remove all stale ClickHouse entries: %v", errorList)
-    }
-    return nil
-}
-
-func (c *AnomalyDetectorController) handleStaleSparkApp() error {
-    saList, err := ListSparkApplication(c.kubeClient, sparkAppLabel)
-    if err != nil {
-        return fmt.Errorf("failed to list Spark Application: %v", err)
-    }
-    // Remove stale Spark Applications
-    var errorList []error
-    for _, sa := range saList.Items {
-        _, err := c.GetThroughputAnomalyDetector(sa.Namespace, sa.Name)
-        if err != nil {
-            if apimachineryerrors.IsNotFound(err) {
-                DeleteSparkApplication(c.kubeClient, sa.Name, sa.Namespace)
-            } else {
-                errorList = append(errorList, err)
-            }
-        }
-    }
-    if len(errorList) > 0 {
-        return fmt.Errorf("failed to remove stale Spark Applications and database entries: %v", errorList)
+        return err
     }
     return nil
 }
@@ -443,7 +394,7 @@ func (c *AnomalyDetectorController) cleanupTADetector(namespace string, sparkApp
         }
     }
     query := "ALTER TABLE tadetector ON CLUSTER '{cluster}' DELETE WHERE id = (" + sparkApplicationId + ");"
-    return controllerutil.DeleteSparkResult(c.clickhouseConnect, query, sparkApplicationId)
+    return controllerutil.RunClickHouseQuery(c.clickhouseConnect, query, sparkApplicationId)
 }
 
 func (c *AnomalyDetectorController) finishJob(newTAD *crdv1alpha1.ThroughputAnomalyDetector) error {
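Both controllers now delegate stale ClickHouse cleanup to a single helper in the shared controllerutil package, parameterized by the table to scan for job ids, the *_local table to delete from, an existence-check callback, and the prefix that turns a job id into a CR name. The helper's own file is not among the hunks shown here; the Go sketch below reconstructs it from the per-controller code removed above and the new call sites. The parameter names, exact signature, and the Theia-internal import path are assumptions.

package controllerutil

import (
    "database/sql"
    "fmt"

    apimachineryerrors "k8s.io/apimachinery/pkg/api/errors"
    "k8s.io/client-go/kubernetes"

    "antrea.io/theia/pkg/util/clickhouse" // import path assumed
)

// HandleStaleDbEntries: sketch only, reconstructed from the removed
// per-controller HandleStaleDbEntries methods and the new call sites.
func HandleStaleDbEntries(connect *sql.DB, client kubernetes.Interface,
    tableName, localTableName string,
    checkExists func(namespace, id string) error, idPrefix string) error {
    if connect == nil {
        var err error
        connect, err = clickhouse.SetupConnection(client)
        if err != nil {
            return fmt.Errorf("failed to connect ClickHouse: %v", err)
        }
    }
    idList, err := GetSparkJobIds(connect, tableName)
    if err != nil {
        return fmt.Errorf("failed to get job ids from ClickHouse: %v", err)
    }
    var errorList []error
    for _, id := range idList {
        // The CR name is the job id plus the per-controller prefix
        // ("tad-" or "pr-"). The TAD/NPR callbacks resolve the Theia
        // namespace themselves, so the namespace argument is unused here.
        err := checkExists("", idPrefix+id)
        if err != nil {
            if apimachineryerrors.IsNotFound(err) {
                // Deletes now target the *_local table, matching the
                // updated sqlmock expectation in controller_test.go.
                query := "ALTER TABLE " + localTableName +
                    " ON CLUSTER '{cluster}' DELETE WHERE id = (" + id + ");"
                if err := RunClickHouseQuery(connect, query, id); err != nil {
                    errorList = append(errorList, err)
                }
            } else {
                errorList = append(errorList, err)
            }
        }
    }
    if len(errorList) > 0 {
        return fmt.Errorf("failed to remove all stale ClickHouse entries: %v", errorList)
    }
    return nil
}

Threading the existence check through a callback is what lets one helper serve both the ThroughputAnomalyDetector and NetworkPolicyRecommendation CRDs.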
110 changes: 2 additions & 108 deletions pkg/controller/anomalydetector/controller_test.go
@@ -18,7 +18,6 @@ import (
     "context"
     "database/sql"
     "encoding/json"
-    "errors"
     "fmt"
     "net/http"
     "net/http/httptest"
@@ -73,7 +72,7 @@ func newFakeController(t *testing.T) (*fakeController, *sql.DB) {
     tadController := NewAnomalyDetectorController(crdClient, kubeClient, taDetectorInformer)
 
     mock.ExpectQuery("SELECT DISTINCT id FROM tadetector;").WillReturnRows(sqlmock.NewRows([]string{}))
-    mock.ExpectExec("ALTER TABLE tadetector ON CLUSTER '{cluster}' DELETE WHERE id = (?);").WithArgs(tadName[3:]).WillReturnResult(sqlmock.NewResult(0, 1))
+    mock.ExpectExec("ALTER TABLE tadetector_local ON CLUSTER '{cluster}' DELETE WHERE id = (?);").WithArgs(tadName[3:]).WillReturnResult(sqlmock.NewResult(0, 1))
     return &fakeController{
         tadController,
         crdClient,
@@ -209,7 +208,7 @@ func TestTADetection(t *testing.T) {
     }
     CreateSparkApplication = fakeSAClient.create
     DeleteSparkApplication = fakeSAClient.delete
-    ListSparkApplication = fakeSAClient.list
+    controllerUtil.ListSparkApplication = fakeSAClient.list
     GetSparkApplication = fakeSAClient.get
     os.Setenv("POD_NAMESPACE", testNamespace)
     defer os.Unsetenv("POD_NAMESPACE")
@@ -406,108 +405,3 @@ func TestTADetection(t *testing.T) {
         })
     }
 }
-
-func TestValidateCluster(t *testing.T) {
-    testCases := []struct {
-        name             string
-        setupClient      func(kubernetes.Interface)
-        expectedErrorMsg string
-    }{
-        {
-            name:             "clickhouse pod not found",
-            setupClient:      func(i kubernetes.Interface) {},
-            expectedErrorMsg: "failed to find the ClickHouse Pod, please check the deployment",
-        },
-        {
-            name: "spark operator pod not found",
-            setupClient: func(client kubernetes.Interface) {
-                db, _ := clickhouse.CreateFakeClickHouse(t, client, testNamespace)
-                db.Close()
-            },
-            expectedErrorMsg: "failed to find the Spark Operator Pod, please check the deployment",
-        },
-    }
-    for _, tc := range testCases {
-        t.Run(tc.name, func(t *testing.T) {
-            kubeClient := fake.NewSimpleClientset()
-            tc.setupClient(kubeClient)
-            err := controllerUtil.ValidateCluster(kubeClient, testNamespace)
-            assert.Contains(t, err.Error(), tc.expectedErrorMsg)
-        })
-    }
-}
-
-func TestGetTADetectorProgress(t *testing.T) {
-    sparkAppID := "spark-application-id"
-    testCases := []struct {
-        name             string
-        testServer       *httptest.Server
-        expectedErrorMsg string
-    }{
-        {
-            name: "more than one spark application",
-            testServer: httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
-                switch strings.TrimSpace(r.URL.Path) {
-                case "/api/v1/applications":
-                    responses := []map[string]interface{}{
-                        {"id": sparkAppID},
-                        {"id": sparkAppID},
-                    }
-                    w.Header().Set("Content-Type", "application/json")
-                    w.WriteHeader(http.StatusOK)
-                    json.NewEncoder(w).Encode(responses)
-                }
-            })),
-            expectedErrorMsg: "wrong Spark Application number, expected 1, got 2",
-        },
-        {
-            name:             "no spark monitor service",
-            testServer:       nil,
-            expectedErrorMsg: "failed to get response from the Spark Monitoring Service",
-        },
-    }
-    for _, tc := range testCases {
-        t.Run(tc.name, func(t *testing.T) {
-            var err error
-            if tc.testServer != nil {
-                defer tc.testServer.Close()
-                _, _, err = controllerUtil.GetSparkAppProgress(tc.testServer.URL)
-            } else {
-                _, _, err = controllerUtil.GetSparkAppProgress("http://127.0.0.1")
-            }
-            assert.Contains(t, err.Error(), tc.expectedErrorMsg)
-        })
-    }
-}
-
-func TestHandleStaleDbEntries(t *testing.T) {
-    tadController, db := newFakeController(t)
-    testCases := []struct {
-        name             string
-        GetSparkJobIds   func(*sql.DB, string) ([]string, error)
-        expectedErrorMsg string
-    }{
-        {
-            name: "Anomaly Detector Ids not found",
-            GetSparkJobIds: func(db *sql.DB, tableName string) ([]string, error) {
-                return []string{}, errors.New("mock_error")
-            },
-            expectedErrorMsg: "failed to get anomaly detector ids from ClickHouse: mock_error",
-        },
-        {
-            name: "Clickhouse Stale Entries present",
-            GetSparkJobIds: func(db *sql.DB, tableName string) ([]string, error) {
-                return []string{"mock_id"}, nil
-            },
-            expectedErrorMsg: "failed to remove all stale ClickHouse entries",
-        },
-    }
-    for _, tc := range testCases {
-        t.Run(tc.name, func(t *testing.T) {
-            tadController.clickhouseConnect = db
-            GetSparkJobIds = tc.GetSparkJobIds
-            err := tadController.HandleStaleDbEntries()
-            assert.Contains(t, err.Error(), tc.expectedErrorMsg)
-        })
-    }
-}
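The three removed tests (TestValidateCluster, TestGetTADetectorProgress, TestHandleStaleDbEntries) exercised logic that now lives in controllerutil, so they drop out of this file; the surviving change installs the fake Spark client's list function on the shared package hook, controllerUtil.ListSparkApplication, instead of the deleted local variable. That hook feeds the consolidated Spark-application sweep, sketched below from the removed handleStaleSparkApp methods and the new call sites. The type of the label argument and the exact signature are assumptions.

// Sketch only: assumes this sits in controllerutil next to the package-level
// ListSparkApplication and DeleteSparkApplication hooks shown in the diff,
// and that sparkAppLabel is a label-selector string.
func HandleStaleSparkApp(client kubernetes.Interface, label string,
    checkExists func(namespace, id string) error) error {
    saList, err := ListSparkApplication(client, label)
    if err != nil {
        return fmt.Errorf("failed to list Spark Application: %v", err)
    }
    // Remove Spark Applications whose owning CR no longer exists.
    var errorList []error
    for _, sa := range saList.Items {
        err := checkExists(sa.Namespace, sa.Name)
        if err != nil {
            if apimachineryerrors.IsNotFound(err) {
                DeleteSparkApplication(client, sa.Name, sa.Namespace)
            } else {
                errorList = append(errorList, err)
            }
        }
    }
    if len(errorList) > 0 {
        return fmt.Errorf("failed to remove stale Spark Applications and database entries: %v", errorList)
    }
    return nil
}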
62 changes: 7 additions & 55 deletions pkg/controller/networkpolicyrecommendation/controller.go
@@ -59,7 +59,6 @@ var (
     ListSparkApplication     = controllerutil.ListSparkApplicationWithLabel
     GetSparkApplication      = controllerutil.GetSparkApplication
     GetSparkMonitoringSvcDNS = controllerutil.GetSparkMonitoringSvcDNS
-    GetSparkJobIds           = controllerutil.GetSparkJobIds
     // For NPR in scheduled or running state, check its status periodically
     npRecommendationResyncPeriod = 10 * time.Second
     sparkAppLabelMap             = map[string]string{"app": "theia-npr"}
@@ -205,58 +204,10 @@ func (c *NPRecommendationController) Run(stopCh <-chan struct{}) {
     <-stopCh
 }
 
-func (c *NPRecommendationController) HandleStaleDbEntries() error {
-    if c.clickhouseConnect == nil {
-        var err error
-        c.clickhouseConnect, err = clickhouse.SetupConnection(c.kubeClient)
-        if err != nil {
-            return fmt.Errorf("failed to connect ClickHouse: %v", err)
-        }
-    }
-    idList, err := GetSparkJobIds(c.clickhouseConnect, "recommendations")
+func (c *NPRecommendationController) IfNPRexists(namespace, id string) error {
+    _, err := c.GetNetworkPolicyRecommendation(env.GetTheiaNamespace(), id)
     if err != nil {
-        return fmt.Errorf("failed to get recommendation ids from ClickHouse: %v", err)
-    }
-    var errorList []error
-    for _, id := range idList {
-        _, err := c.GetNetworkPolicyRecommendation(env.GetTheiaNamespace(), "pr-"+id)
-        if err != nil {
-            if apimachineryerrors.IsNotFound(err) {
-                query := "ALTER TABLE recommendations_local ON CLUSTER '{cluster}' DELETE WHERE id = (" + id + ");"
-                err = controllerutil.DeleteSparkResult(c.clickhouseConnect, query, id)
-                if err != nil {
-                    errorList = append(errorList, err)
-                }
-            } else {
-                errorList = append(errorList, err)
-            }
-        }
-    }
-    if len(errorList) > 0 {
-        return fmt.Errorf("failed to remove all stale ClickHouse entries: %v", errorList)
-    }
-    return nil
-}
-
-func (c *NPRecommendationController) handleStaleSparkApp() error {
-    saList, err := ListSparkApplication(c.kubeClient, sparkAppLabel)
-    if err != nil {
-        return fmt.Errorf("failed to list Spark Application: %v", err)
-    }
-    // Remove stale Spark Applications
-    var errorList []error
-    for _, sa := range saList.Items {
-        _, err := c.GetNetworkPolicyRecommendation(sa.Namespace, sa.Name)
-        if err != nil {
-            if apimachineryerrors.IsNotFound(err) {
-                DeleteSparkApplication(c.kubeClient, sa.Name, sa.Namespace)
-            } else {
-                errorList = append(errorList, err)
-            }
-        }
-    }
-    if len(errorList) > 0 {
-        return fmt.Errorf("failed to remove stale Spark Applications and database entries: %v", errorList)
+        return err
     }
     return nil
 }
@@ -284,7 +235,8 @@ func (c *NPRecommendationController) handleStaleResources(key controllerutil.GcK
         }
     }
     if key.RemoveStaleDbEntries {
-        err = c.HandleStaleDbEntries()
+        err = controllerutil.HandleStaleDbEntries(
+            c.clickhouseConnect, c.kubeClient, "recommendations", "recommendations_local", c.IfNPRexists, "pr-")
         if err != nil {
             errorList = append(errorList, err)
         } else {
@@ -293,7 +245,7 @@ func (c *NPRecommendationController) handleStaleResources(key controllerutil.GcK
     }
 
     if key.RemoveStaleSparkApp {
-        err = c.handleStaleSparkApp()
+        err = controllerutil.HandleStaleSparkApp(c.kubeClient, sparkAppLabel, c.IfNPRexists)
         if err != nil {
             errorList = append(errorList, err)
         } else {
@@ -447,7 +399,7 @@ func (c *NPRecommendationController) cleanupNPRecommendation(namespace string, s
         }
     }
     query := "ALTER TABLE recommendations_local ON CLUSTER '{cluster}' DELETE WHERE id = (" + sparkApplicationId + ");"
-    return controllerutil.DeleteSparkResult(c.clickhouseConnect, query, sparkApplicationId)
+    return controllerutil.RunClickHouseQuery(c.clickhouseConnect, query, sparkApplicationId)
 }
 
 func (c *NPRecommendationController) finishJob(npReco *crdv1alpha1.NetworkPolicyRecommendation) error {
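Both cleanup paths also pick up the rename of controllerutil.DeleteSparkResult to controllerutil.RunClickHouseQuery, a better fit for a helper that now backs both per-job cleanup and the stale-entry sweep. A minimal sketch under one reading of the diff follows: the call sites concatenate the id into the statement, while the updated sqlmock expectation (WithArgs(tadName[3:])) hints that the real helper may instead bind it as a query argument; the version below executes the statement as given and uses the id only for error context.

// Minimal sketch of the renamed helper; its actual body is not shown in
// this diff, and the id handling described above is an assumption.
func RunClickHouseQuery(connect *sql.DB, query, id string) error {
    if _, err := connect.Exec(query); err != nil {
        return fmt.Errorf("failed to run ClickHouse query for job %s: %v", id, err)
    }
    return nil
}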