From 8061ef2f9e32276f0c33895580689819525796ac Mon Sep 17 00:00:00 2001 From: Future Outlier Date: Sun, 23 Jul 2023 16:24:31 +0800 Subject: [PATCH 01/18] 1st version of email publisher Signed-off-by: Future Outlier --- .../implementations/sandbox_processor.go | 57 +++++++++++++++++++ .../implementations/sandbox_publisher.go | 32 +++++++++++ 2 files changed, 89 insertions(+) create mode 100644 pkg/async/notifications/implementations/sandbox_processor.go create mode 100644 pkg/async/notifications/implementations/sandbox_publisher.go diff --git a/pkg/async/notifications/implementations/sandbox_processor.go b/pkg/async/notifications/implementations/sandbox_processor.go new file mode 100644 index 000000000..b635a678c --- /dev/null +++ b/pkg/async/notifications/implementations/sandbox_processor.go @@ -0,0 +1,57 @@ +package implementations + +import ( + "context" + "time" + + "github.com/NYTimes/gizmo/pubsub" + "github.com/flyteorg/flyteadmin/pkg/async" + "github.com/flyteorg/flyteadmin/pkg/async/notifications/interfaces" + "github.com/flyteorg/flyteidl/gen/pb-go/flyteidl/admin" + "github.com/flyteorg/flytestdlib/logger" + "github.com/flyteorg/flytestdlib/promutils" + "github.com/golang/protobuf/proto" +) + +type SandboxProcessor struct { + email interfaces.Emailer + // systemMetrics processorSystemMetrics +} + +func (p *SandboxProcessor) StartProcessing() { + for { + logger.Warningf(context.Background(), "Starting SandBox notifications processor") + err := p.run() + logger.Errorf(context.Background(), "error with running SandBox processor err: [%v] ", err) + time.Sleep(async.RetryDelay) + } +} + +func (p *SandboxProcessor) run() error { + var emailMessage admin.EmailMessage + + // use select instead + select { + case msg := <-msgChan: + err := proto.Unmarshal(msg, &emailMessage) + if err != nil { + logger.Errorf(context.Background(), "error with unmarshalling message [%v] ", err) + } + err = p.email.SendEmail(context.Background(), emailMessage) + } + + return nil + +} + +func (p *SandboxProcessor) StopProcessing() error { + logger.Debug(context.Background(), "call to sandbox stop processing.") + return nil +} + +func NewSandboxProcessor(sub pubsub.Subscriber, emailer interfaces.Emailer, scope promutils.Scope) interfaces.Processor { + return &SandboxProcessor{ + email: emailer, + // systemMetrics: newProcessorSystemMetrics(scope.NewSubScope("sandbox_processor")), + } +} diff --git a/pkg/async/notifications/implementations/sandbox_publisher.go b/pkg/async/notifications/implementations/sandbox_publisher.go new file mode 100644 index 000000000..6f876b227 --- /dev/null +++ b/pkg/async/notifications/implementations/sandbox_publisher.go @@ -0,0 +1,32 @@ +package implementations + +import ( + "context" + + "github.com/flyteorg/flytestdlib/logger" + "github.com/golang/protobuf/proto" +) + +/* + TODO: Check if SystemMetrics is necessary for the sandbox publisher. +*/ + +type SandboxPublisher struct{} + +var msgChan = make(chan []byte) + +//subscriber can get it from this channel +func (p *SandboxPublisher) Publish(ctx context.Context, notificationType string, msg proto.Message) error { + // push the marshal message to the queue + data, err := proto.Marshal(msg) + if err != nil { + logger.Errorf(ctx, "Failed to publish a message with key [%s] and message [%s] and error: %v", notificationType, msg.String(), err) + } + msgChan <- data + + return nil +} + +func NewSandboxPublisher() *SandboxPublisher { + return &SandboxPublisher{} +} From af73e9acf7eed1a8e6f2324b442758cb2fc9c0d3 Mon Sep 17 00:00:00 2001 From: Future Outlier Date: Sun, 23 Jul 2023 16:42:34 +0800 Subject: [PATCH 02/18] fix input Signed-off-by: Future Outlier --- pkg/async/notifications/factory.go | 18 ++++++++++++++++-- .../implementations/sandbox_processor.go | 6 +++--- 2 files changed, 19 insertions(+), 5 deletions(-) diff --git a/pkg/async/notifications/factory.go b/pkg/async/notifications/factory.go index 836c1ff2f..2425a3a11 100644 --- a/pkg/async/notifications/factory.go +++ b/pkg/async/notifications/factory.go @@ -67,6 +67,9 @@ func GetEmailer(config runtimeInterfaces.NotificationsConfig, scope promutils.Sc sesClient, ) case common.Local: + /* + + */ fallthrough default: logger.Infof(context.Background(), "Using default noop emailer implementation for config type [%s]", config.Type) @@ -121,7 +124,14 @@ func NewNotificationsProcessor(config runtimeInterfaces.NotificationsConfig, sco emailer = GetEmailer(config, scope) return implementations.NewGcpProcessor(sub, emailer, scope) case common.Local: - fallthrough + // TODO: Implement local processor for sandbox image. + /* + + emailer = GetEmailer(config, scope) + return implementations.NewSandboxProcessor(sub, emailer, scope) + */ + emailer = GetEmailer(config, scope) + return implementations.NewSandboxProcessor(emailer) default: logger.Infof(context.Background(), "Using default noop notifications processor implementation for config type [%s]", config.Type) @@ -129,6 +139,7 @@ func NewNotificationsProcessor(config runtimeInterfaces.NotificationsConfig, sco } } +// push notifications to a topic func NewNotificationsPublisher(config runtimeInterfaces.NotificationsConfig, scope promutils.Scope) interfaces.Publisher { reconnectAttempts := config.ReconnectAttempts reconnectDelay := time.Duration(config.ReconnectDelaySeconds) * time.Second @@ -172,7 +183,10 @@ func NewNotificationsPublisher(config runtimeInterfaces.NotificationsConfig, sco } return implementations.NewPublisher(publisher, scope) case common.Local: - fallthrough + // push notifications to a topic + // use message queue to send notifications + // chan is pass by reference in golang + return implementations.NewSandboxPublisher() default: logger.Infof(context.Background(), "Using default noop notifications publisher implementation for config type [%s]", config.Type) diff --git a/pkg/async/notifications/implementations/sandbox_processor.go b/pkg/async/notifications/implementations/sandbox_processor.go index b635a678c..0c1a4f4cc 100644 --- a/pkg/async/notifications/implementations/sandbox_processor.go +++ b/pkg/async/notifications/implementations/sandbox_processor.go @@ -4,12 +4,10 @@ import ( "context" "time" - "github.com/NYTimes/gizmo/pubsub" "github.com/flyteorg/flyteadmin/pkg/async" "github.com/flyteorg/flyteadmin/pkg/async/notifications/interfaces" "github.com/flyteorg/flyteidl/gen/pb-go/flyteidl/admin" "github.com/flyteorg/flytestdlib/logger" - "github.com/flyteorg/flytestdlib/promutils" "github.com/golang/protobuf/proto" ) @@ -38,6 +36,8 @@ func (p *SandboxProcessor) run() error { logger.Errorf(context.Background(), "error with unmarshalling message [%v] ", err) } err = p.email.SendEmail(context.Background(), emailMessage) + default: + logger.Debugf(context.Background(), "no message to process") } return nil @@ -49,7 +49,7 @@ func (p *SandboxProcessor) StopProcessing() error { return nil } -func NewSandboxProcessor(sub pubsub.Subscriber, emailer interfaces.Emailer, scope promutils.Scope) interfaces.Processor { +func NewSandboxProcessor(emailer interfaces.Emailer) interfaces.Processor { return &SandboxProcessor{ email: emailer, // systemMetrics: newProcessorSystemMetrics(scope.NewSubScope("sandbox_processor")), From 870f3820a73bd216fe8239dffb9d0e81ee347574 Mon Sep 17 00:00:00 2001 From: Future Outlier Date: Wed, 26 Jul 2023 00:01:25 +0800 Subject: [PATCH 03/18] email feature Signed-off-by: Future Outlier --- pkg/async/notifications/factory.go | 5 +-- .../implementations/sandbox_processor.go | 32 ++++++++++--------- .../implementations/sandbox_publisher.go | 11 +++---- 3 files changed, 23 insertions(+), 25 deletions(-) diff --git a/pkg/async/notifications/factory.go b/pkg/async/notifications/factory.go index 2425a3a11..0fff21c60 100644 --- a/pkg/async/notifications/factory.go +++ b/pkg/async/notifications/factory.go @@ -67,10 +67,7 @@ func GetEmailer(config runtimeInterfaces.NotificationsConfig, scope promutils.Sc sesClient, ) case common.Local: - /* - - */ - fallthrough + return implementations.NewSendGridEmailer(config, scope) default: logger.Infof(context.Background(), "Using default noop emailer implementation for config type [%s]", config.Type) return implementations.NewNoopEmail() diff --git a/pkg/async/notifications/implementations/sandbox_processor.go b/pkg/async/notifications/implementations/sandbox_processor.go index 0c1a4f4cc..d6b787d04 100644 --- a/pkg/async/notifications/implementations/sandbox_processor.go +++ b/pkg/async/notifications/implementations/sandbox_processor.go @@ -13,14 +13,13 @@ import ( type SandboxProcessor struct { email interfaces.Emailer - // systemMetrics processorSystemMetrics } func (p *SandboxProcessor) StartProcessing() { for { logger.Warningf(context.Background(), "Starting SandBox notifications processor") err := p.run() - logger.Errorf(context.Background(), "error with running SandBox processor err: [%v] ", err) + logger.Errorf(context.Background(), "error with running processor err: [%v] ", err) time.Sleep(async.RetryDelay) } } @@ -28,20 +27,24 @@ func (p *SandboxProcessor) StartProcessing() { func (p *SandboxProcessor) run() error { var emailMessage admin.EmailMessage - // use select instead - select { - case msg := <-msgChan: - err := proto.Unmarshal(msg, &emailMessage) - if err != nil { - logger.Errorf(context.Background(), "error with unmarshalling message [%v] ", err) + for { + select { + case msg := <-msgChan: + err := proto.Unmarshal(msg, &emailMessage) + if err != nil { + logger.Errorf(context.Background(), "error with unmarshalling message [%v]", err) + return err + } + + err = p.email.SendEmail(context.Background(), emailMessage) + if err != nil { + logger.Errorf(context.Background(), "error with sendemail message [%v] ", err) + return err + } + default: + logger.Debugf(context.Background(), "no message to process") } - err = p.email.SendEmail(context.Background(), emailMessage) - default: - logger.Debugf(context.Background(), "no message to process") } - - return nil - } func (p *SandboxProcessor) StopProcessing() error { @@ -52,6 +55,5 @@ func (p *SandboxProcessor) StopProcessing() error { func NewSandboxProcessor(emailer interfaces.Emailer) interfaces.Processor { return &SandboxProcessor{ email: emailer, - // systemMetrics: newProcessorSystemMetrics(scope.NewSubScope("sandbox_processor")), } } diff --git a/pkg/async/notifications/implementations/sandbox_publisher.go b/pkg/async/notifications/implementations/sandbox_publisher.go index 6f876b227..735dc054b 100644 --- a/pkg/async/notifications/implementations/sandbox_publisher.go +++ b/pkg/async/notifications/implementations/sandbox_publisher.go @@ -7,21 +7,20 @@ import ( "github.com/golang/protobuf/proto" ) -/* - TODO: Check if SystemMetrics is necessary for the sandbox publisher. -*/ - type SandboxPublisher struct{} var msgChan = make(chan []byte) -//subscriber can get it from this channel func (p *SandboxPublisher) Publish(ctx context.Context, notificationType string, msg proto.Message) error { - // push the marshal message to the queue + logger.Debugf(ctx, "Publishing the following message [%s]", msg.String()) + data, err := proto.Marshal(msg) + if err != nil { logger.Errorf(ctx, "Failed to publish a message with key [%s] and message [%s] and error: %v", notificationType, msg.String(), err) + return err } + msgChan <- data return nil From 89ccfd2bee064d55cece37450d6eca6b5b324bda Mon Sep 17 00:00:00 2001 From: Flyte Bot Date: Fri, 14 Jul 2023 16:22:26 -0700 Subject: [PATCH 04/18] Update boilerplate version (#589) Signed-off-by: Flyte-Bot Co-authored-by: eapolinario Signed-off-by: Future Outlier --- boilerplate/flyte/end2end/run-tests.py | 58 +++++++++++++------------- 1 file changed, 29 insertions(+), 29 deletions(-) diff --git a/boilerplate/flyte/end2end/run-tests.py b/boilerplate/flyte/end2end/run-tests.py index b48f2be9d..15b35e1d9 100644 --- a/boilerplate/flyte/end2end/run-tests.py +++ b/boilerplate/flyte/end2end/run-tests.py @@ -22,42 +22,42 @@ # starting with "core". FLYTESNACKS_WORKFLOW_GROUPS: Mapping[str, List[Tuple[str, dict]]] = { "lite": [ - ("core.flyte_basics.hello_world.my_wf", {}), - ("core.flyte_basics.lp.go_greet", {"day_of_week": "5", "number": 3, "am": True}), + ("basics.hello_world.my_wf", {}), + ("basics.lp.go_greet", {"day_of_week": "5", "number": 3, "am": True}), ], "core": [ - ("core.flyte_basics.deck.wf", {}), + ("basics.deck.wf", {}), # The chain_workflows example in flytesnacks expects to be running in a sandbox. - # ("core.control_flow.chain_entities.chain_workflows_wf", {}), - ("core.control_flow.dynamics.wf", {"s1": "Pear", "s2": "Earth"}), - ("core.control_flow.map_task.my_map_workflow", {"a": [1, 2, 3, 4, 5]}), + # ("control_flow.chain_entities.chain_workflows_wf", {}), + ("control_flow.dynamics.wf", {"s1": "Pear", "s2": "Earth"}), + ("control_flow.map_task.my_map_workflow", {"a": [1, 2, 3, 4, 5]}), # Workflows that use nested executions cannot be launched via flyteremote. # This issue is being tracked in https://github.com/flyteorg/flyte/issues/1482. - # ("core.control_flow.run_conditions.multiplier", {"my_input": 0.5}), - # ("core.control_flow.run_conditions.multiplier_2", {"my_input": 10}), - # ("core.control_flow.run_conditions.multiplier_3", {"my_input": 5}), - # ("core.control_flow.run_conditions.basic_boolean_wf", {"seed": 5}), - # ("core.control_flow.run_conditions.bool_input_wf", {"b": True}), - # ("core.control_flow.run_conditions.nested_conditions", {"my_input": 0.4}), - # ("core.control_flow.run_conditions.consume_outputs", {"my_input": 0.4, "seed": 7}), - # ("core.control_flow.run_merge_sort.merge_sort", {"numbers": [5, 4, 3, 2, 1], "count": 5}), - ("core.control_flow.subworkflows.parent_wf", {"a": 3}), - ("core.control_flow.subworkflows.nested_parent_wf", {"a": 3}), - ("core.flyte_basics.basic_workflow.my_wf", {"a": 50, "b": "hello"}), + # ("control_flow.run_conditions.multiplier", {"my_input": 0.5}), + # ("control_flow.run_conditions.multiplier_2", {"my_input": 10}), + # ("control_flow.run_conditions.multiplier_3", {"my_input": 5}), + # ("control_flow.run_conditions.basic_boolean_wf", {"seed": 5}), + # ("control_flow.run_conditions.bool_input_wf", {"b": True}), + # ("control_flow.run_conditions.nested_conditions", {"my_input": 0.4}), + # ("control_flow.run_conditions.consume_outputs", {"my_input": 0.4, "seed": 7}), + # ("control_flow.run_merge_sort.merge_sort", {"numbers": [5, 4, 3, 2, 1], "count": 5}), + ("control_flow.subworkflows.parent_wf", {"a": 3}), + ("control_flow.subworkflows.nested_parent_wf", {"a": 3}), + ("basics.basic_workflow.my_wf", {"a": 50, "b": "hello"}), # TODO: enable new files and folders workflows - # ("core.flyte_basics.files.rotate_one_workflow", {"in_image": "https://upload.wikimedia.org/wikipedia/commons/d/d2/Julia_set_%28C_%3D_0.285%2C_0.01%29.jpg"}), - # ("core.flyte_basics.folders.download_and_rotate", {}), - ("core.flyte_basics.hello_world.my_wf", {}), - ("core.flyte_basics.lp.my_wf", {"val": 4}), - ("core.flyte_basics.lp.go_greet", {"day_of_week": "5", "number": 3, "am": True}), - ("core.flyte_basics.named_outputs.my_wf", {}), + # ("basics.files.rotate_one_workflow", {"in_image": "https://upload.wikimedia.org/wikipedia/commons/d/d2/Julia_set_%28C_%3D_0.285%2C_0.01%29.jpg"}), + # ("basics.folders.download_and_rotate", {}), + ("basics.hello_world.my_wf", {}), + ("basics.lp.my_wf", {"val": 4}), + ("basics.lp.go_greet", {"day_of_week": "5", "number": 3, "am": True}), + ("basics.named_outputs.my_wf", {}), # # Getting a 403 for the wikipedia image - # # ("core.flyte_basics.reference_task.wf", {}), - ("core.type_system.custom_objects.wf", {"x": 10, "y": 20}), + # # ("basics.reference_task.wf", {}), + ("type_system.custom_objects.wf", {"x": 10, "y": 20}), # Enums are not supported in flyteremote - # ("core.type_system.enums.enum_wf", {"c": "red"}), - ("core.type_system.schema.df_wf", {"a": 42}), - ("core.type_system.typed_schema.wf", {}), + # ("type_system.enums.enum_wf", {"c": "red"}), + ("type_system.schema.df_wf", {"a": 42}), + ("type_system.typed_schema.wf", {}), #("my.imperative.workflow.example", {"in1": "hello", "in2": "foo"}), ], "integrations-k8s-spark": [ @@ -193,7 +193,7 @@ def run( # For a given release tag and priority, this function filters the workflow groups from the flytesnacks # manifest file. For example, for the release tag "v0.2.224" and the priority "P0" it returns [ "core" ]. manifest_url = "https://raw.githubusercontent.com/flyteorg/flytesnacks/" \ - f"{flytesnacks_release_tag}/cookbook/flyte_tests_manifest.json" + f"{flytesnacks_release_tag}/flyte_tests_manifest.json" r = requests.get(manifest_url) parsed_manifest = r.json() workflow_groups = [] From 66fb45dc09733f1c9379cb223165cfb361188c9e Mon Sep 17 00:00:00 2001 From: Flyte Bot Date: Wed, 26 Jul 2023 14:29:57 -0700 Subject: [PATCH 05/18] Update boilerplate version (#594) Signed-off-by: Flyte-Bot Co-authored-by: flyte-bot Signed-off-by: Future Outlier --- boilerplate/flyte/end2end/run-tests.py | 22 +++++++++++----------- 1 file changed, 11 insertions(+), 11 deletions(-) diff --git a/boilerplate/flyte/end2end/run-tests.py b/boilerplate/flyte/end2end/run-tests.py index 15b35e1d9..66c678fd4 100644 --- a/boilerplate/flyte/end2end/run-tests.py +++ b/boilerplate/flyte/end2end/run-tests.py @@ -61,33 +61,33 @@ #("my.imperative.workflow.example", {"in1": "hello", "in2": "foo"}), ], "integrations-k8s-spark": [ - ("k8s_spark.pyspark_pi.my_spark", {"triggered_date": datetime.datetime.now()}), + ("k8s_spark_plugin.pyspark_pi.my_spark", {"triggered_date": datetime.datetime.now()}), ], "integrations-kfpytorch": [ - ("kfpytorch.pytorch_mnist.pytorch_training_wf", {}), + ("kfpytorch_plugin.pytorch_mnist.pytorch_training_wf", {}), ], "integrations-kftensorflow": [ - ("kftensorflow.tf_mnist.mnist_tensorflow_workflow", {}), + ("kftensorflow_plugin.tf_mnist.mnist_tensorflow_workflow", {}), ], # "integrations-pod": [ # ("pod.pod.pod_workflow", {}), # ], "integrations-pandera_examples": [ - ("pandera_examples.basic_schema_example.process_data", {}), + ("pandera_plugin.basic_schema_example.process_data", {}), # TODO: investigate type mismatch float -> numpy.float64 - # ("pandera_examples.validating_and_testing_ml_pipelines.pipeline", {"data_random_state": 42, "model_random_state": 99}), + # ("pandera_plugin.validating_and_testing_ml_pipelines.pipeline", {"data_random_state": 42, "model_random_state": 99}), ], "integrations-modin_examples": [ - ("modin_examples.knn_classifier.pipeline", {}), + ("modin_plugin.knn_classifier.pipeline", {}), ], "integrations-papermilltasks": [ - ("papermilltasks.simple.nb_to_python_wf", {"f": 3.1415926535}), + ("papermill_plugin.simple.nb_to_python_wf", {"f": 3.1415926535}), ], "integrations-greatexpectations": [ - ("greatexpectations.task_example.simple_wf", {}), - ("greatexpectations.task_example.file_wf", {}), - ("greatexpectations.task_example.schema_wf", {}), - ("greatexpectations.task_example.runtime_wf", {}), + ("greatexpectations_plugin.task_example.simple_wf", {}), + ("greatexpectations_plugin.task_example.file_wf", {}), + ("greatexpectations_plugin.task_example.schema_wf", {}), + ("greatexpectations_plugin.task_example.runtime_wf", {}), ], } From b8706bbb0648bd5cea6187a6d92f6b70f8cbdc79 Mon Sep 17 00:00:00 2001 From: Future Outlier Date: Thu, 27 Jul 2023 22:26:19 +0800 Subject: [PATCH 06/18] sandbox email publisher and processor with test Signed-off-by: Future Outlier --- go.mod | 22 ++++----- go.sum | 47 ++++++++++--------- .../implementations/sandbox_processor_test.go | 21 +++++++++ .../implementations/sandbox_publisher_test.go | 40 ++++++++++++++++ 4 files changed, 95 insertions(+), 35 deletions(-) create mode 100644 pkg/async/notifications/implementations/sandbox_processor_test.go create mode 100644 pkg/async/notifications/implementations/sandbox_publisher_test.go diff --git a/go.mod b/go.mod index 13acd65cc..8dd56c593 100644 --- a/go.mod +++ b/go.mod @@ -14,7 +14,7 @@ require ( github.com/coreos/go-oidc v2.2.1+incompatible github.com/evanphx/json-patch v4.12.0+incompatible github.com/flyteorg/flyteidl v1.5.11 - github.com/flyteorg/flyteplugins v1.0.67 + github.com/flyteorg/flyteplugins v1.1.8 github.com/flyteorg/flytepropeller v1.1.98 github.com/flyteorg/flytestdlib v1.0.20 github.com/flyteorg/stow v0.3.6 @@ -79,8 +79,6 @@ require ( github.com/Azure/go-autorest/autorest/date v0.3.0 // indirect github.com/Azure/go-autorest/logger v0.2.1 // indirect github.com/Azure/go-autorest/tracing v0.6.0 // indirect - github.com/PuerkitoBio/purell v1.1.1 // indirect - github.com/PuerkitoBio/urlesc v0.0.0-20170810143723-de5bf2ad4578 // indirect github.com/asaskevich/govalidator v0.0.0-20200428143746-21a406dcc535 // indirect github.com/benlaurie/objecthash v0.0.0-20180202135721-d1e3d6079fc1 // indirect github.com/beorn7/perks v1.0.1 // indirect @@ -93,14 +91,14 @@ require ( github.com/eapache/go-resiliency v1.2.0 // indirect github.com/eapache/go-xerial-snappy v0.0.0-20180814174437-776d5712da21 // indirect github.com/eapache/queue v1.1.0 // indirect - github.com/emicklei/go-restful/v3 v3.8.0 // indirect + github.com/emicklei/go-restful/v3 v3.9.0 // indirect github.com/fatih/color v1.13.0 // indirect github.com/felixge/httpsnoop v1.0.1 // indirect github.com/fsnotify/fsnotify v1.5.1 // indirect github.com/go-logr/logr v1.2.3 // indirect - github.com/go-openapi/jsonpointer v0.19.5 // indirect - github.com/go-openapi/jsonreference v0.19.5 // indirect - github.com/go-openapi/swag v0.19.14 // indirect + github.com/go-openapi/jsonpointer v0.19.6 // indirect + github.com/go-openapi/jsonreference v0.20.1 // indirect + github.com/go-openapi/swag v0.22.3 // indirect github.com/go-sql-driver/mysql v1.7.0 // indirect github.com/go-test/deep v1.0.7 // indirect github.com/goccy/go-json v0.4.8 // indirect @@ -136,7 +134,7 @@ require ( github.com/lestrrat-go/iter v1.0.1 // indirect github.com/lestrrat-go/option v1.0.0 // indirect github.com/lib/pq v1.10.4 // indirect - github.com/mailru/easyjson v0.7.6 // indirect + github.com/mailru/easyjson v0.7.7 // indirect github.com/mattn/go-colorable v0.1.12 // indirect github.com/mattn/go-isatty v0.0.14 // indirect github.com/mattn/go-sqlite3 v2.0.3+incompatible // indirect @@ -187,7 +185,7 @@ require ( gopkg.in/yaml.v2 v2.4.0 // indirect gopkg.in/yaml.v3 v3.0.1 // indirect k8s.io/apiextensions-apiserver v0.24.1 // indirect - sigs.k8s.io/json v0.0.0-20220713155537-f223a00ba0e2 // indirect + sigs.k8s.io/json v0.0.0-20221116044647-bc3834ca7abd // indirect sigs.k8s.io/yaml v1.3.0 // indirect ) @@ -202,9 +200,9 @@ require ( go.uber.org/atomic v1.7.0 // indirect go.uber.org/multierr v1.6.0 // indirect go.uber.org/zap v1.19.1 // indirect - k8s.io/klog/v2 v2.70.1 // indirect - k8s.io/kube-openapi v0.0.0-20220803162953-67bda5d908f1 // indirect - k8s.io/utils v0.0.0-20220728103510-ee6ede2d64ed // indirect + k8s.io/klog/v2 v2.90.1 // indirect + k8s.io/kube-openapi v0.0.0-20230501164219-8b0f38b5fd1f // indirect + k8s.io/utils v0.0.0-20230209194617-a36077c30491 // indirect sigs.k8s.io/structured-merge-diff/v4 v4.2.3 // indirect ) diff --git a/go.sum b/go.sum index 1e877bf40..d0a658df6 100644 --- a/go.sum +++ b/go.sum @@ -112,9 +112,7 @@ github.com/OneOfOne/xxhash v1.2.2 h1:KMrpdQIwFcEqXDklaen+P1axHaj9BSKzvpUUfnHldSE github.com/OneOfOne/xxhash v1.2.2/go.mod h1:HSdplMjZKSmBqAxg5vPj2TmRDmfkzw+cTzAElWljhcU= github.com/PuerkitoBio/goquery v1.5.1/go.mod h1:GsLWisAFVj4WgDibEWF4pvYnkVQBpKBKeU+7zCJoLcc= github.com/PuerkitoBio/purell v1.1.0/go.mod h1:c11w/QuzBsJSee3cPx9rAFu61PvFxuPbtSwDGJws/X0= -github.com/PuerkitoBio/purell v1.1.1 h1:WEQqlqaGbrPkxLJWfBwQmfEAE1Z7ONdDLqrN38tNFfI= github.com/PuerkitoBio/purell v1.1.1/go.mod h1:c11w/QuzBsJSee3cPx9rAFu61PvFxuPbtSwDGJws/X0= -github.com/PuerkitoBio/urlesc v0.0.0-20170810143723-de5bf2ad4578 h1:d+Bc7a5rLufV/sSk/8dngufqelfh6jnri85riMAaF/M= github.com/PuerkitoBio/urlesc v0.0.0-20170810143723-de5bf2ad4578/go.mod h1:uGdkoq3SwY9Y+13GIhn11/XLaGBb4BfwItxLd5jeuXE= github.com/Selvatico/go-mocket v1.0.7 h1:sXuFMnMfVL9b/Os8rGXPgbOFbr4HJm8aHsulD/uMTUk= github.com/Selvatico/go-mocket v1.0.7/go.mod h1:4gO2v+uQmsL+jzQgLANy3tyEFzaEzHlymVbZ3GP2Oes= @@ -272,8 +270,8 @@ github.com/elazarl/goproxy v0.0.0-20180725130230-947c36da3153/go.mod h1:/Zj4wYkg github.com/elazarl/goproxy v0.0.0-20181003060214-f58a169a71a5/go.mod h1:/Zj4wYkgs4iZTTu3o/KG3Itv/qCCa8VVMlb3i9OVuzc= github.com/emicklei/go-restful v0.0.0-20170410110728-ff4f55a20633/go.mod h1:otzb+WCGbkyDHkqmQmT5YD2WR4BBwUdeQoFo8l/7tVs= github.com/emicklei/go-restful v2.9.5+incompatible/go.mod h1:otzb+WCGbkyDHkqmQmT5YD2WR4BBwUdeQoFo8l/7tVs= -github.com/emicklei/go-restful/v3 v3.8.0 h1:eCZ8ulSerjdAiaNpF7GxXIE7ZCMo1moN1qX+S609eVw= -github.com/emicklei/go-restful/v3 v3.8.0/go.mod h1:6n3XBCmQQb25CM2LCACGz8ukIrRry+4bhvbpWn3mrbc= +github.com/emicklei/go-restful/v3 v3.9.0 h1:XwGDlfxEnQZzuopoqxwSEllNcCOM9DhhFyhFIIGKwxE= +github.com/emicklei/go-restful/v3 v3.9.0/go.mod h1:6n3XBCmQQb25CM2LCACGz8ukIrRry+4bhvbpWn3mrbc= github.com/envoyproxy/go-control-plane v0.6.9/go.mod h1:SBwIajubJHhxtWwsL9s8ss4safvEdbitLhGGK48rN6g= github.com/envoyproxy/go-control-plane v0.9.0/go.mod h1:YTl/9mNaCwkRvm6d1a2C3ymFceY/DCBVvsKhRF0iEA4= github.com/envoyproxy/go-control-plane v0.9.1-0.20191026205805-5f8ba28d4473/go.mod h1:YTl/9mNaCwkRvm6d1a2C3ymFceY/DCBVvsKhRF0iEA4= @@ -295,10 +293,8 @@ github.com/felixge/httpsnoop v1.0.1 h1:lvB5Jl89CsZtGIWuTcDM1E/vkVs49/Ml7JJe07l8S github.com/felixge/httpsnoop v1.0.1/go.mod h1:m8KPJKqk1gH5J9DgRY2ASl2lWCfGKXixSwevea8zH2U= github.com/flyteorg/flyteidl v1.5.11 h1:Xcb17YqNstl+dHQsK+o0Ac+1l1U154wXivg28O3C5l0= github.com/flyteorg/flyteidl v1.5.11/go.mod h1:EtE/muM2lHHgBabjYcxqe9TWeJSL0kXwbI0RgVwI4Og= -github.com/flyteorg/flyteplugins v1.0.67 h1:d2FXpwxQwX/k4YdmhuusykOemHb/cUTPEob4WBmdpjE= -github.com/flyteorg/flyteplugins v1.0.67/go.mod h1:HHt4nKDKVwrZPKDsj99dNtDSIJL378xNotYMA3a/TFA= -github.com/flyteorg/flytepropeller v1.1.98 h1:Zk2ENYB9VZRT5tFUIFjm+aCkr0TU2EuyJ5gh52fpLoA= -github.com/flyteorg/flytepropeller v1.1.98/go.mod h1:R0CB6Uzp9F4YyvPmLRE7XyXxDebAPFD+LbHTf07mBzI= +github.com/flyteorg/flyteplugins v1.1.8 h1:UVYdqDdcIqz2JIso+m3MsaPSsTZJZyZQ6Eg7nhX9r/Y= +github.com/flyteorg/flyteplugins v1.1.8/go.mod h1:sRxeatEOHq1b9bTxTRNcwoIkVTAVN9dTz8toXkfcz2E= github.com/flyteorg/flytestdlib v1.0.20 h1:BrCQMlpdrFAPlADFJvCyn7gm+37df9WGYqLEB1mOlCQ= github.com/flyteorg/flytestdlib v1.0.20/go.mod h1:v3ua7HfHDXXTCrAt2yZERGKCuilP5Rh+L8TdAbfVcBg= github.com/flyteorg/stow v0.3.6 h1:jt50ciM14qhKBaIrB+ppXXY+SXB59FNREFgTJqCyqIk= @@ -359,14 +355,16 @@ github.com/go-openapi/jsonpointer v0.17.0/go.mod h1:cOnomiV+CVVwFLk0A/MExoFMjwds github.com/go-openapi/jsonpointer v0.18.0/go.mod h1:cOnomiV+CVVwFLk0A/MExoFMjwdsUdVpsRhURCKh+3M= github.com/go-openapi/jsonpointer v0.19.2/go.mod h1:3akKfEdA7DF1sugOqz1dVQHBcuDBPKZGEoHC/NkiQRg= github.com/go-openapi/jsonpointer v0.19.3/go.mod h1:Pl9vOtqEWErmShwVjC8pYs9cog34VGT37dQOVbmoatg= -github.com/go-openapi/jsonpointer v0.19.5 h1:gZr+CIYByUqjcgeLXnQu2gHYQC9o73G2XUeOFYEICuY= github.com/go-openapi/jsonpointer v0.19.5/go.mod h1:Pl9vOtqEWErmShwVjC8pYs9cog34VGT37dQOVbmoatg= +github.com/go-openapi/jsonpointer v0.19.6 h1:eCs3fxoIi3Wh6vtgmLTOjdhSpiqphQ+DaPn38N2ZdrE= +github.com/go-openapi/jsonpointer v0.19.6/go.mod h1:osyAmYz/mB/C3I+WsTTSgw1ONzaLJoLCyoi6/zppojs= github.com/go-openapi/jsonreference v0.17.0/go.mod h1:g4xxGn04lDIRh0GJb5QlpE3HfopLOL6uZrK/VgnsK9I= github.com/go-openapi/jsonreference v0.18.0/go.mod h1:g4xxGn04lDIRh0GJb5QlpE3HfopLOL6uZrK/VgnsK9I= github.com/go-openapi/jsonreference v0.19.2/go.mod h1:jMjeRr2HHw6nAVajTXJ4eiUwohSTlpa0o73RUL1owJc= github.com/go-openapi/jsonreference v0.19.3/go.mod h1:rjx6GuL8TTa9VaixXglHmQmIL98+wF9xc8zWvFonSJ8= -github.com/go-openapi/jsonreference v0.19.5 h1:1WJP/wi4OjB4iV8KVbH73rQaoialJrqv8gitZLxGLtM= github.com/go-openapi/jsonreference v0.19.5/go.mod h1:RdybgQwPxbL4UEjuAruzK1x3nE69AqPYEJeo/TWfEeg= +github.com/go-openapi/jsonreference v0.20.1 h1:FBLnyygC4/IZZr893oiomc9XaghoveYTrLC1F86HID8= +github.com/go-openapi/jsonreference v0.20.1/go.mod h1:Bl1zwGIM8/wsvqjsOQLJ/SH+En5Ap4rVB5KVcIDZG2k= github.com/go-openapi/loads v0.17.0/go.mod h1:72tmFy5wsWx89uEVddd0RjRWPZm92WRLhf7AC+0+OOU= github.com/go-openapi/loads v0.18.0/go.mod h1:72tmFy5wsWx89uEVddd0RjRWPZm92WRLhf7AC+0+OOU= github.com/go-openapi/loads v0.19.0/go.mod h1:72tmFy5wsWx89uEVddd0RjRWPZm92WRLhf7AC+0+OOU= @@ -397,8 +395,9 @@ github.com/go-openapi/swag v0.19.2/go.mod h1:POnQmlKehdgb5mhVOsnJFsivZCEZ/vjK9gh github.com/go-openapi/swag v0.19.5/go.mod h1:POnQmlKehdgb5mhVOsnJFsivZCEZ/vjK9gh66Z9tfKk= github.com/go-openapi/swag v0.19.7/go.mod h1:ao+8BpOPyKdpQz3AOJfbeEVpLmWAvlT1IfTe5McPyhY= github.com/go-openapi/swag v0.19.9/go.mod h1:ao+8BpOPyKdpQz3AOJfbeEVpLmWAvlT1IfTe5McPyhY= -github.com/go-openapi/swag v0.19.14 h1:gm3vOOXfiuw5i9p5N9xJvfjvuofpyvLA9Wr6QfK5Fng= github.com/go-openapi/swag v0.19.14/go.mod h1:QYRuS/SOXUCsnplDa677K7+DxSOj6IPNl/eQntq43wQ= +github.com/go-openapi/swag v0.22.3 h1:yMBqmnQ0gyZvEb/+KzuWZOXgllrXT4SADYbvDaXHv/g= +github.com/go-openapi/swag v0.22.3/go.mod h1:UzaqsxGiab7freDnrUUra0MwWfN/q7tE4j+VcZ0yl14= github.com/go-openapi/validate v0.18.0/go.mod h1:Uh4HdOzKt19xGIGm1qHf/ofbX1YQ4Y+MYsct2VUrAJ4= github.com/go-openapi/validate v0.19.2/go.mod h1:1tRCw7m3jtI8eNWEEliiAqUIcBztB2KDnRCRMUi7GTA= github.com/go-openapi/validate v0.19.3/go.mod h1:90Vh6jjkTn+OT1Eefm0ZixWNFjhtOH7vS9k0lo6zwJo= @@ -997,8 +996,9 @@ github.com/konsorten/go-windows-terminal-sequences v1.0.3/go.mod h1:T0+1ngSBFLxv github.com/kr/fs v0.1.0/go.mod h1:FFnZGqtBN9Gxj7eW1uZ42v5BccTP0vu6NEaFoC2HwRg= github.com/kr/logfmt v0.0.0-20140226030751-b84e30acd515/go.mod h1:+0opPa2QZZtGFBFZlji/RkVcI2GknAs/DXo4wKdlNEc= github.com/kr/pretty v0.1.0/go.mod h1:dAy3ld7l9f0ibDNOQOHHMYYIIbhfbHSm3C4ZsoJORNo= -github.com/kr/pretty v0.2.0 h1:s5hAObm+yFO5uHYt5dYjxi2rXrsnmRpJx4OYvIWUaQs= github.com/kr/pretty v0.2.0/go.mod h1:ipq/a2n7PKx3OHsz4KJII5eveXtPO4qwEXGdVfWzfnI= +github.com/kr/pretty v0.2.1 h1:Fmg33tUaq4/8ym9TJN1x7sLJnHVwhP33CNkpYV/7rwI= +github.com/kr/pretty v0.2.1/go.mod h1:ipq/a2n7PKx3OHsz4KJII5eveXtPO4qwEXGdVfWzfnI= github.com/kr/pty v1.1.1/go.mod h1:pFQYn66WHrOpPYNljwOMqo10TkYh1fy3cYio2l3bCsQ= github.com/kr/pty v1.1.3/go.mod h1:pFQYn66WHrOpPYNljwOMqo10TkYh1fy3cYio2l3bCsQ= github.com/kr/pty v1.1.5/go.mod h1:9r2w37qlBe7rQ6e1fg1S/9xpWHSnaqNdHD3WcMdbPDA= @@ -1042,8 +1042,9 @@ github.com/mailru/easyjson v0.0.0-20190312143242-1de009706dbe/go.mod h1:C1wdFJiN github.com/mailru/easyjson v0.0.0-20190614124828-94de47d64c63/go.mod h1:C1wdFJiN94OJF2b5HbByQZoLdCWB1Yqtg26g4irojpc= github.com/mailru/easyjson v0.0.0-20190626092158-b2ccc519800e/go.mod h1:C1wdFJiN94OJF2b5HbByQZoLdCWB1Yqtg26g4irojpc= github.com/mailru/easyjson v0.7.1/go.mod h1:KAzv3t3aY1NaHWoQz1+4F1ccyAH66Jk7yos7ldAVICs= -github.com/mailru/easyjson v0.7.6 h1:8yTIVnZgCoiM1TgqoeTl+LfU5Jg6/xL3QhGQnimLYnA= github.com/mailru/easyjson v0.7.6/go.mod h1:xzfreul335JAWq5oZzymOObrkdz5UnU4kGfJJLY9Nlc= +github.com/mailru/easyjson v0.7.7 h1:UGYAvKxe3sBsEDzO8ZeWOSlIQfWFlxbzLZe7hwFURr0= +github.com/mailru/easyjson v0.7.7/go.mod h1:xzfreul335JAWq5oZzymOObrkdz5UnU4kGfJJLY9Nlc= github.com/markbates/deplist v1.0.4/go.mod h1:gRRbPbbuA8TmMiRvaOzUlRfzfjeCCBqX2A6arxN01MM= github.com/markbates/deplist v1.0.5/go.mod h1:gRRbPbbuA8TmMiRvaOzUlRfzfjeCCBqX2A6arxN01MM= github.com/markbates/deplist v1.1.3/go.mod h1:BF7ioVzAJYEtzQN/os4rt8H8Ti3h0T7EoN+7eyALktE= @@ -1139,7 +1140,6 @@ github.com/mxk/go-flowrate v0.0.0-20140419014527-cca7078d478f/go.mod h1:ZdcZmHo+ github.com/ncw/swift v1.0.53 h1:luHjjTNtekIEvHg5KdAFIBaH7bWfNkefwFnpDffSIks= github.com/ncw/swift v1.0.53/go.mod h1:23YIA4yWVnGwv2dQlN4bB7egfYX6YLn0Yo/S6zZO/ZM= github.com/nicksnyder/go-i18n v1.10.0/go.mod h1:HrK7VCrbOvQoUAQ7Vpy7i87N7JZZZ7R2xBGjv0j365Q= -github.com/niemeyer/pretty v0.0.0-20200227124842-a10e7caefd8e h1:fD57ERR4JtEqsWbfPhv4DMiApHyliiK5xCTNVSPiaAs= github.com/niemeyer/pretty v0.0.0-20200227124842-a10e7caefd8e/go.mod h1:zD1mROLANZcx1PVRCS0qkT7pwLkGfwJo4zjcN/Tysno= github.com/nu7hatch/gouuid v0.0.0-20131221200532-179d4d0c4d8d/go.mod h1:YUTz3bUH2ZwIWBy3CJBeOBEugqcmXREj14T+iG/4k4U= github.com/nxadm/tail v1.4.4/go.mod h1:kenIhsEOeOJmVchQTgglprH7qJGnHDVpk1VPCcaMI8A= @@ -2113,8 +2113,9 @@ gopkg.in/alexcesaro/quotedprintable.v3 v3.0.0-20150716171945-2caba252f4dc/go.mod gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0= gopkg.in/check.v1 v1.0.0-20180628173108-788fd7840127/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0= gopkg.in/check.v1 v1.0.0-20190902080502-41f04d3bba15/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0= -gopkg.in/check.v1 v1.0.0-20200227125254-8fa46927fb4f h1:BLraFXnmrev5lT+xlilqcH8XK9/i0At2xKjWk4p6zsU= gopkg.in/check.v1 v1.0.0-20200227125254-8fa46927fb4f/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0= +gopkg.in/check.v1 v1.0.0-20201130134442-10cb98267c6c h1:Hei/4ADfdWqJk1ZMxUNpqntNwaWcugrBjAiHlqqRiVk= +gopkg.in/check.v1 v1.0.0-20201130134442-10cb98267c6c/go.mod h1:JHkPIbrfpd72SG/EVd6muEfDQjcINNoR0C8j2r3qZ4Q= gopkg.in/errgo.v2 v2.1.0/go.mod h1:hNsd1EY+bozCKY1Ytp96fpM3vjJbqLJn88ws8XvfDNI= gopkg.in/fsnotify.v1 v1.4.7/go.mod h1:Tz8NjZHkW78fSQdbUxIjBTcgA1z1m8ZHf0WmKUhAMys= gopkg.in/gemnasium/logrus-airbrake-hook.v2 v2.1.2/go.mod h1:Xk6kEKp8OKb+X14hQBKWaSkCsqBpgog8nAV2xsGOxlo= @@ -2213,15 +2214,15 @@ k8s.io/gengo v0.0.0-20211129171323-c02415ce4185/go.mod h1:FiNAH4ZV3gBg2Kwh89tzAE k8s.io/klog/v2 v2.0.0/go.mod h1:PBfzABfn139FHAV07az/IF9Wp1bkk3vpT2XSJ76fSDE= k8s.io/klog/v2 v2.2.0/go.mod h1:Od+F08eJP+W3HUb4pSrPpgp9DGU4GzlpG/TmITuYh/Y= k8s.io/klog/v2 v2.60.1/go.mod h1:y1WjHnz7Dj687irZUWR/WLkLc5N1YHtjLdmgWjndZn0= -k8s.io/klog/v2 v2.70.1 h1:7aaoSdahviPmR+XkS7FyxlkkXs6tHISSG03RxleQAVQ= -k8s.io/klog/v2 v2.70.1/go.mod h1:y1WjHnz7Dj687irZUWR/WLkLc5N1YHtjLdmgWjndZn0= +k8s.io/klog/v2 v2.90.1 h1:m4bYOKall2MmOiRaR1J+We67Do7vm9KiQVlT96lnHUw= +k8s.io/klog/v2 v2.90.1/go.mod h1:y1WjHnz7Dj687irZUWR/WLkLc5N1YHtjLdmgWjndZn0= k8s.io/kube-openapi v0.0.0-20220328201542-3ee0da9b0b42/go.mod h1:Z/45zLw8lUo4wdiUkI+v/ImEGAvu3WatcZl3lPMR4Rk= -k8s.io/kube-openapi v0.0.0-20220803162953-67bda5d908f1 h1:MQ8BAZPZlWk3S9K4a9NCkIFQtZShWqoha7snGixVgEA= -k8s.io/kube-openapi v0.0.0-20220803162953-67bda5d908f1/go.mod h1:C/N6wCaBHeBHkHUesQOQy2/MZqGgMAFPqGsGQLdbZBU= +k8s.io/kube-openapi v0.0.0-20230501164219-8b0f38b5fd1f h1:2kWPakN3i/k81b0gvD5C5FJ2kxm1WrQFanWchyKuqGg= +k8s.io/kube-openapi v0.0.0-20230501164219-8b0f38b5fd1f/go.mod h1:byini6yhqGC14c3ebc/QwanvYwhuMWF6yz2F8uwW8eg= k8s.io/utils v0.0.0-20210802155522-efc7438f0176/go.mod h1:jPW/WVKK9YHAvNhRxK0md/EJ228hCsBRufyofKtW8HA= k8s.io/utils v0.0.0-20220210201930-3a6ce19ff2f9/go.mod h1:jPW/WVKK9YHAvNhRxK0md/EJ228hCsBRufyofKtW8HA= -k8s.io/utils v0.0.0-20220728103510-ee6ede2d64ed h1:jAne/RjBTyawwAy0utX5eqigAwz/lQhTmy+Hr/Cpue4= -k8s.io/utils v0.0.0-20220728103510-ee6ede2d64ed/go.mod h1:jPW/WVKK9YHAvNhRxK0md/EJ228hCsBRufyofKtW8HA= +k8s.io/utils v0.0.0-20230209194617-a36077c30491 h1:r0BAOLElQnnFhE/ApUsg3iHdVYYPBjNSSOMowRZxxsY= +k8s.io/utils v0.0.0-20230209194617-a36077c30491/go.mod h1:OLgZIPagt7ERELqWJFomSt595RzquPNLL48iOWgYOg0= modernc.org/cc v1.0.0/go.mod h1:1Sk4//wdnYJiUIxnW8ddKpaOJCF37yAdqYnkxUpaYxw= modernc.org/golex v1.0.0/go.mod h1:b/QX9oBD/LhixY6NDh+IdGv17hgB+51fET1i2kPSmvk= modernc.org/mathutil v1.0.0/go.mod h1:wU0vUrJsVWBZ4P6e7xtFJEhFSNsfRLJ8H458uRjg03k= @@ -2235,8 +2236,8 @@ sigs.k8s.io/apiserver-network-proxy/konnectivity-client v0.0.30/go.mod h1:fEO7lR sigs.k8s.io/controller-runtime v0.12.1 h1:4BJY01xe9zKQti8oRjj/NeHKRXthf1YkYJAgLONFFoI= sigs.k8s.io/controller-runtime v0.12.1/go.mod h1:BKhxlA4l7FPK4AQcsuL4X6vZeWnKDXez/vp1Y8dxTU0= sigs.k8s.io/json v0.0.0-20211208200746-9f7c6b3444d2/go.mod h1:B+TnT182UBxE84DiCz4CVE26eOSDAeYCpfDnC2kdKMY= -sigs.k8s.io/json v0.0.0-20220713155537-f223a00ba0e2 h1:iXTIw73aPyC+oRdyqqvVJuloN1p0AC/kzH07hu3NE+k= -sigs.k8s.io/json v0.0.0-20220713155537-f223a00ba0e2/go.mod h1:B8JuhiUyNFVKdsE8h686QcCxMaH6HrOAZj4vswFpcB0= +sigs.k8s.io/json v0.0.0-20221116044647-bc3834ca7abd h1:EDPBXCAspyGV4jQlpZSudPeMmr1bNJefnuqLsRAsHZo= +sigs.k8s.io/json v0.0.0-20221116044647-bc3834ca7abd/go.mod h1:B8JuhiUyNFVKdsE8h686QcCxMaH6HrOAZj4vswFpcB0= sigs.k8s.io/structured-merge-diff/v4 v4.0.2/go.mod h1:bJZC9H9iH24zzfZ/41RGcq60oK1F7G282QMXDPYydCw= sigs.k8s.io/structured-merge-diff/v4 v4.2.1/go.mod h1:j/nl6xW8vLS49O8YvXW1ocPhZawJtm+Yrr7PPRQ0Vg4= sigs.k8s.io/structured-merge-diff/v4 v4.2.3 h1:PRbqxJClWWYMNV1dhaG4NsibJbArud9kFxnAMREiWFE= diff --git a/pkg/async/notifications/implementations/sandbox_processor_test.go b/pkg/async/notifications/implementations/sandbox_processor_test.go new file mode 100644 index 000000000..ff6c977c6 --- /dev/null +++ b/pkg/async/notifications/implementations/sandbox_processor_test.go @@ -0,0 +1,21 @@ +package implementations + +import ( + "testing" + + "github.com/flyteorg/flyteidl/gen/pb-go/flyteidl/admin" + "github.com/golang/protobuf/proto" + "github.com/stretchr/testify/assert" +) + +func TestSandboxProcessor_StartProcessing(t *testing.T) { + // Mock a message + var emailMessage admin.EmailMessage + err := proto.Unmarshal(msg, &emailMessage) + assert.Nil(t, err) + + assert.Equal(t, emailMessage.Body, testEmail.Body) + assert.Equal(t, emailMessage.RecipientsEmail, testEmail.RecipientsEmail) + assert.Equal(t, emailMessage.SubjectLine, testEmail.SubjectLine) + assert.Equal(t, emailMessage.SenderEmail, testEmail.SenderEmail) +} diff --git a/pkg/async/notifications/implementations/sandbox_publisher_test.go b/pkg/async/notifications/implementations/sandbox_publisher_test.go new file mode 100644 index 000000000..ede3bfe1e --- /dev/null +++ b/pkg/async/notifications/implementations/sandbox_publisher_test.go @@ -0,0 +1,40 @@ +package implementations + +import ( + "context" + "testing" + "time" + + "github.com/stretchr/testify/assert" +) + +func TestSandboxPublisher_Publish(t *testing.T) { + // Initialize the sandbox publisher + publisher := NewSandboxPublisher() + + // Create a channel to communicate failures + errChan := make(chan string) + + go func() { + select { + case <-msgChan: + // if message received, no need to send an error + case <-time.After(time.Second * 5): + errChan <- "No data was received in the channel within the expected time frame" + } + }() + + // Run the Publish method + err := publisher.Publish(context.Background(), "NOTIFICATION_TYPE", &testEmail) + + // Check if there was an error in the goroutine + select { + case errMsg := <-errChan: + t.Fatal(errMsg) + default: + // no error from the goroutine + } + + // Ensure there was no error in the Publish method + assert.Nil(t, err) +} From c4a0fcfa8937edd9d3639f7349a5e64ad971f6b8 Mon Sep 17 00:00:00 2001 From: Future Outlier Date: Thu, 27 Jul 2023 22:28:40 +0800 Subject: [PATCH 07/18] delete unnecessary comments Signed-off-by: Future Outlier --- pkg/async/notifications/factory.go | 10 ---------- 1 file changed, 10 deletions(-) diff --git a/pkg/async/notifications/factory.go b/pkg/async/notifications/factory.go index 0fff21c60..ea4d08f26 100644 --- a/pkg/async/notifications/factory.go +++ b/pkg/async/notifications/factory.go @@ -121,12 +121,6 @@ func NewNotificationsProcessor(config runtimeInterfaces.NotificationsConfig, sco emailer = GetEmailer(config, scope) return implementations.NewGcpProcessor(sub, emailer, scope) case common.Local: - // TODO: Implement local processor for sandbox image. - /* - - emailer = GetEmailer(config, scope) - return implementations.NewSandboxProcessor(sub, emailer, scope) - */ emailer = GetEmailer(config, scope) return implementations.NewSandboxProcessor(emailer) default: @@ -136,7 +130,6 @@ func NewNotificationsProcessor(config runtimeInterfaces.NotificationsConfig, sco } } -// push notifications to a topic func NewNotificationsPublisher(config runtimeInterfaces.NotificationsConfig, scope promutils.Scope) interfaces.Publisher { reconnectAttempts := config.ReconnectAttempts reconnectDelay := time.Duration(config.ReconnectDelaySeconds) * time.Second @@ -180,9 +173,6 @@ func NewNotificationsPublisher(config runtimeInterfaces.NotificationsConfig, sco } return implementations.NewPublisher(publisher, scope) case common.Local: - // push notifications to a topic - // use message queue to send notifications - // chan is pass by reference in golang return implementations.NewSandboxPublisher() default: logger.Infof(context.Background(), From dc16bb252066a1d9154db777517b1e0be7c6d78f Mon Sep 17 00:00:00 2001 From: Future Outlier Date: Fri, 28 Jul 2023 21:43:12 +0800 Subject: [PATCH 08/18] update go.sum Signed-off-by: Future Outlier --- go.sum | 2 ++ 1 file changed, 2 insertions(+) diff --git a/go.sum b/go.sum index d0a658df6..dcca85c8c 100644 --- a/go.sum +++ b/go.sum @@ -295,6 +295,8 @@ github.com/flyteorg/flyteidl v1.5.11 h1:Xcb17YqNstl+dHQsK+o0Ac+1l1U154wXivg28O3C github.com/flyteorg/flyteidl v1.5.11/go.mod h1:EtE/muM2lHHgBabjYcxqe9TWeJSL0kXwbI0RgVwI4Og= github.com/flyteorg/flyteplugins v1.1.8 h1:UVYdqDdcIqz2JIso+m3MsaPSsTZJZyZQ6Eg7nhX9r/Y= github.com/flyteorg/flyteplugins v1.1.8/go.mod h1:sRxeatEOHq1b9bTxTRNcwoIkVTAVN9dTz8toXkfcz2E= +github.com/flyteorg/flytepropeller v1.1.98 h1:Zk2ENYB9VZRT5tFUIFjm+aCkr0TU2EuyJ5gh52fpLoA= +github.com/flyteorg/flytepropeller v1.1.98/go.mod h1:R0CB6Uzp9F4YyvPmLRE7XyXxDebAPFD+LbHTf07mBzI= github.com/flyteorg/flytestdlib v1.0.20 h1:BrCQMlpdrFAPlADFJvCyn7gm+37df9WGYqLEB1mOlCQ= github.com/flyteorg/flytestdlib v1.0.20/go.mod h1:v3ua7HfHDXXTCrAt2yZERGKCuilP5Rh+L8TdAbfVcBg= github.com/flyteorg/stow v0.3.6 h1:jt50ciM14qhKBaIrB+ppXXY+SXB59FNREFgTJqCyqIk= From 8e4c50aed5f2ff96774d0d0e16d370341a831ec4 Mon Sep 17 00:00:00 2001 From: Future Outlier Date: Sat, 29 Jul 2023 23:24:42 +0800 Subject: [PATCH 09/18] add common.Sandbox type Signed-off-by: Future Outlier --- go.mod | 2 +- go.sum | 4 ++-- pkg/async/notifications/factory.go | 12 +++++++++--- .../implementations/sandbox_publisher_test.go | 4 ---- pkg/common/cloud.go | 9 +++++---- 5 files changed, 17 insertions(+), 14 deletions(-) diff --git a/go.mod b/go.mod index 8dd56c593..e0ec8d581 100644 --- a/go.mod +++ b/go.mod @@ -14,7 +14,7 @@ require ( github.com/coreos/go-oidc v2.2.1+incompatible github.com/evanphx/json-patch v4.12.0+incompatible github.com/flyteorg/flyteidl v1.5.11 - github.com/flyteorg/flyteplugins v1.1.8 + github.com/flyteorg/flyteplugins v1.0.67 github.com/flyteorg/flytepropeller v1.1.98 github.com/flyteorg/flytestdlib v1.0.20 github.com/flyteorg/stow v0.3.6 diff --git a/go.sum b/go.sum index dcca85c8c..aaf53a11e 100644 --- a/go.sum +++ b/go.sum @@ -293,8 +293,8 @@ github.com/felixge/httpsnoop v1.0.1 h1:lvB5Jl89CsZtGIWuTcDM1E/vkVs49/Ml7JJe07l8S github.com/felixge/httpsnoop v1.0.1/go.mod h1:m8KPJKqk1gH5J9DgRY2ASl2lWCfGKXixSwevea8zH2U= github.com/flyteorg/flyteidl v1.5.11 h1:Xcb17YqNstl+dHQsK+o0Ac+1l1U154wXivg28O3C5l0= github.com/flyteorg/flyteidl v1.5.11/go.mod h1:EtE/muM2lHHgBabjYcxqe9TWeJSL0kXwbI0RgVwI4Og= -github.com/flyteorg/flyteplugins v1.1.8 h1:UVYdqDdcIqz2JIso+m3MsaPSsTZJZyZQ6Eg7nhX9r/Y= -github.com/flyteorg/flyteplugins v1.1.8/go.mod h1:sRxeatEOHq1b9bTxTRNcwoIkVTAVN9dTz8toXkfcz2E= +github.com/flyteorg/flyteplugins v1.0.67 h1:d2FXpwxQwX/k4YdmhuusykOemHb/cUTPEob4WBmdpjE= +github.com/flyteorg/flyteplugins v1.0.67/go.mod h1:HHt4nKDKVwrZPKDsj99dNtDSIJL378xNotYMA3a/TFA= github.com/flyteorg/flytepropeller v1.1.98 h1:Zk2ENYB9VZRT5tFUIFjm+aCkr0TU2EuyJ5gh52fpLoA= github.com/flyteorg/flytepropeller v1.1.98/go.mod h1:R0CB6Uzp9F4YyvPmLRE7XyXxDebAPFD+LbHTf07mBzI= github.com/flyteorg/flytestdlib v1.0.20 h1:BrCQMlpdrFAPlADFJvCyn7gm+37df9WGYqLEB1mOlCQ= diff --git a/pkg/async/notifications/factory.go b/pkg/async/notifications/factory.go index ea4d08f26..88d6ead61 100644 --- a/pkg/async/notifications/factory.go +++ b/pkg/async/notifications/factory.go @@ -66,8 +66,10 @@ func GetEmailer(config runtimeInterfaces.NotificationsConfig, scope promutils.Sc scope, sesClient, ) - case common.Local: + case common.Sandbox: return implementations.NewSendGridEmailer(config, scope) + case common.Local: + fallthrough default: logger.Infof(context.Background(), "Using default noop emailer implementation for config type [%s]", config.Type) return implementations.NewNoopEmail() @@ -120,9 +122,11 @@ func NewNotificationsProcessor(config runtimeInterfaces.NotificationsConfig, sco } emailer = GetEmailer(config, scope) return implementations.NewGcpProcessor(sub, emailer, scope) - case common.Local: + case common.Sandbox: emailer = GetEmailer(config, scope) return implementations.NewSandboxProcessor(emailer) + case common.Local: + fallthrough default: logger.Infof(context.Background(), "Using default noop notifications processor implementation for config type [%s]", config.Type) @@ -172,8 +176,10 @@ func NewNotificationsPublisher(config runtimeInterfaces.NotificationsConfig, sco panic(err) } return implementations.NewPublisher(publisher, scope) - case common.Local: + case common.Sandbox: return implementations.NewSandboxPublisher() + case common.Local: + fallthrough default: logger.Infof(context.Background(), "Using default noop notifications publisher implementation for config type [%s]", config.Type) diff --git a/pkg/async/notifications/implementations/sandbox_publisher_test.go b/pkg/async/notifications/implementations/sandbox_publisher_test.go index ede3bfe1e..97c103666 100644 --- a/pkg/async/notifications/implementations/sandbox_publisher_test.go +++ b/pkg/async/notifications/implementations/sandbox_publisher_test.go @@ -9,10 +9,8 @@ import ( ) func TestSandboxPublisher_Publish(t *testing.T) { - // Initialize the sandbox publisher publisher := NewSandboxPublisher() - // Create a channel to communicate failures errChan := make(chan string) go func() { @@ -24,7 +22,6 @@ func TestSandboxPublisher_Publish(t *testing.T) { } }() - // Run the Publish method err := publisher.Publish(context.Background(), "NOTIFICATION_TYPE", &testEmail) // Check if there was an error in the goroutine @@ -35,6 +32,5 @@ func TestSandboxPublisher_Publish(t *testing.T) { // no error from the goroutine } - // Ensure there was no error in the Publish method assert.Nil(t, err) } diff --git a/pkg/common/cloud.go b/pkg/common/cloud.go index cba0a6879..93f3669a5 100644 --- a/pkg/common/cloud.go +++ b/pkg/common/cloud.go @@ -5,8 +5,9 @@ package common type CloudProvider = string const ( - AWS CloudProvider = "aws" - GCP CloudProvider = "gcp" - Local CloudProvider = "local" - None CloudProvider = "none" + AWS CloudProvider = "aws" + GCP CloudProvider = "gcp" + Sandbox CloudProvider = "sandbox" + Local CloudProvider = "local" + None CloudProvider = "none" ) From e5c12bff1c1ad0b7cb941493bbd412c99781f493 Mon Sep 17 00:00:00 2001 From: Future Outlier Date: Sun, 30 Jul 2023 15:06:19 +0800 Subject: [PATCH 10/18] remove emailer setting in factory.go and add mock emailer test in sandbox_processor_test.go Signed-off-by: Future Outlier --- pkg/async/notifications/factory.go | 2 -- .../implementations/sandbox_processor.go | 1 + .../implementations/sandbox_processor_test.go | 22 +++++++++++++++++-- 3 files changed, 21 insertions(+), 4 deletions(-) diff --git a/pkg/async/notifications/factory.go b/pkg/async/notifications/factory.go index 88d6ead61..aba360eb5 100644 --- a/pkg/async/notifications/factory.go +++ b/pkg/async/notifications/factory.go @@ -66,8 +66,6 @@ func GetEmailer(config runtimeInterfaces.NotificationsConfig, scope promutils.Sc scope, sesClient, ) - case common.Sandbox: - return implementations.NewSendGridEmailer(config, scope) case common.Local: fallthrough default: diff --git a/pkg/async/notifications/implementations/sandbox_processor.go b/pkg/async/notifications/implementations/sandbox_processor.go index d6b787d04..c654c6c3b 100644 --- a/pkg/async/notifications/implementations/sandbox_processor.go +++ b/pkg/async/notifications/implementations/sandbox_processor.go @@ -43,6 +43,7 @@ func (p *SandboxProcessor) run() error { } default: logger.Debugf(context.Background(), "no message to process") + return nil } } } diff --git a/pkg/async/notifications/implementations/sandbox_processor_test.go b/pkg/async/notifications/implementations/sandbox_processor_test.go index ff6c977c6..9ce752097 100644 --- a/pkg/async/notifications/implementations/sandbox_processor_test.go +++ b/pkg/async/notifications/implementations/sandbox_processor_test.go @@ -1,15 +1,18 @@ package implementations import ( + "context" "testing" + "github.com/flyteorg/flyteadmin/pkg/async/notifications/mocks" "github.com/flyteorg/flyteidl/gen/pb-go/flyteidl/admin" "github.com/golang/protobuf/proto" "github.com/stretchr/testify/assert" ) -func TestSandboxProcessor_StartProcessing(t *testing.T) { - // Mock a message +var mockSandboxEmailer mocks.MockEmailer + +func TestSandboxProcessor_UnmarshalMessage(t *testing.T) { var emailMessage admin.EmailMessage err := proto.Unmarshal(msg, &emailMessage) assert.Nil(t, err) @@ -19,3 +22,18 @@ func TestSandboxProcessor_StartProcessing(t *testing.T) { assert.Equal(t, emailMessage.SubjectLine, testEmail.SubjectLine) assert.Equal(t, emailMessage.SenderEmail, testEmail.SenderEmail) } + +func TestSandboxProcessor_StartProcessing(t *testing.T) { + + testSandboxProcessor := NewSandboxProcessor(&mockSandboxEmailer) + + sendEmailValidationFunc := func(ctx context.Context, email admin.EmailMessage) error { + assert.Equal(t, testEmail.Body, email.Body) + assert.Equal(t, testEmail.RecipientsEmail, email.RecipientsEmail) + assert.Equal(t, testEmail.SubjectLine, email.SubjectLine) + assert.Equal(t, testEmail.SenderEmail, email.SenderEmail) + return nil + } + mockSandboxEmailer.SetSendEmailFunc(sendEmailValidationFunc) + assert.Nil(t, testSandboxProcessor.(*SandboxProcessor).run()) +} From be74e6ec69aa4425cf2c69378160ba50cb12ce10 Mon Sep 17 00:00:00 2001 From: Future Outlier Date: Sun, 30 Jul 2023 17:06:33 +0800 Subject: [PATCH 11/18] update file go.mod and go.sum Signed-off-by: Future Outlier --- go.mod | 20 +++++++++++--------- go.sum | 41 +++++++++++++++++++---------------------- 2 files changed, 30 insertions(+), 31 deletions(-) diff --git a/go.mod b/go.mod index e0ec8d581..13acd65cc 100644 --- a/go.mod +++ b/go.mod @@ -79,6 +79,8 @@ require ( github.com/Azure/go-autorest/autorest/date v0.3.0 // indirect github.com/Azure/go-autorest/logger v0.2.1 // indirect github.com/Azure/go-autorest/tracing v0.6.0 // indirect + github.com/PuerkitoBio/purell v1.1.1 // indirect + github.com/PuerkitoBio/urlesc v0.0.0-20170810143723-de5bf2ad4578 // indirect github.com/asaskevich/govalidator v0.0.0-20200428143746-21a406dcc535 // indirect github.com/benlaurie/objecthash v0.0.0-20180202135721-d1e3d6079fc1 // indirect github.com/beorn7/perks v1.0.1 // indirect @@ -91,14 +93,14 @@ require ( github.com/eapache/go-resiliency v1.2.0 // indirect github.com/eapache/go-xerial-snappy v0.0.0-20180814174437-776d5712da21 // indirect github.com/eapache/queue v1.1.0 // indirect - github.com/emicklei/go-restful/v3 v3.9.0 // indirect + github.com/emicklei/go-restful/v3 v3.8.0 // indirect github.com/fatih/color v1.13.0 // indirect github.com/felixge/httpsnoop v1.0.1 // indirect github.com/fsnotify/fsnotify v1.5.1 // indirect github.com/go-logr/logr v1.2.3 // indirect - github.com/go-openapi/jsonpointer v0.19.6 // indirect - github.com/go-openapi/jsonreference v0.20.1 // indirect - github.com/go-openapi/swag v0.22.3 // indirect + github.com/go-openapi/jsonpointer v0.19.5 // indirect + github.com/go-openapi/jsonreference v0.19.5 // indirect + github.com/go-openapi/swag v0.19.14 // indirect github.com/go-sql-driver/mysql v1.7.0 // indirect github.com/go-test/deep v1.0.7 // indirect github.com/goccy/go-json v0.4.8 // indirect @@ -134,7 +136,7 @@ require ( github.com/lestrrat-go/iter v1.0.1 // indirect github.com/lestrrat-go/option v1.0.0 // indirect github.com/lib/pq v1.10.4 // indirect - github.com/mailru/easyjson v0.7.7 // indirect + github.com/mailru/easyjson v0.7.6 // indirect github.com/mattn/go-colorable v0.1.12 // indirect github.com/mattn/go-isatty v0.0.14 // indirect github.com/mattn/go-sqlite3 v2.0.3+incompatible // indirect @@ -185,7 +187,7 @@ require ( gopkg.in/yaml.v2 v2.4.0 // indirect gopkg.in/yaml.v3 v3.0.1 // indirect k8s.io/apiextensions-apiserver v0.24.1 // indirect - sigs.k8s.io/json v0.0.0-20221116044647-bc3834ca7abd // indirect + sigs.k8s.io/json v0.0.0-20220713155537-f223a00ba0e2 // indirect sigs.k8s.io/yaml v1.3.0 // indirect ) @@ -200,9 +202,9 @@ require ( go.uber.org/atomic v1.7.0 // indirect go.uber.org/multierr v1.6.0 // indirect go.uber.org/zap v1.19.1 // indirect - k8s.io/klog/v2 v2.90.1 // indirect - k8s.io/kube-openapi v0.0.0-20230501164219-8b0f38b5fd1f // indirect - k8s.io/utils v0.0.0-20230209194617-a36077c30491 // indirect + k8s.io/klog/v2 v2.70.1 // indirect + k8s.io/kube-openapi v0.0.0-20220803162953-67bda5d908f1 // indirect + k8s.io/utils v0.0.0-20220728103510-ee6ede2d64ed // indirect sigs.k8s.io/structured-merge-diff/v4 v4.2.3 // indirect ) diff --git a/go.sum b/go.sum index aaf53a11e..1e877bf40 100644 --- a/go.sum +++ b/go.sum @@ -112,7 +112,9 @@ github.com/OneOfOne/xxhash v1.2.2 h1:KMrpdQIwFcEqXDklaen+P1axHaj9BSKzvpUUfnHldSE github.com/OneOfOne/xxhash v1.2.2/go.mod h1:HSdplMjZKSmBqAxg5vPj2TmRDmfkzw+cTzAElWljhcU= github.com/PuerkitoBio/goquery v1.5.1/go.mod h1:GsLWisAFVj4WgDibEWF4pvYnkVQBpKBKeU+7zCJoLcc= github.com/PuerkitoBio/purell v1.1.0/go.mod h1:c11w/QuzBsJSee3cPx9rAFu61PvFxuPbtSwDGJws/X0= +github.com/PuerkitoBio/purell v1.1.1 h1:WEQqlqaGbrPkxLJWfBwQmfEAE1Z7ONdDLqrN38tNFfI= github.com/PuerkitoBio/purell v1.1.1/go.mod h1:c11w/QuzBsJSee3cPx9rAFu61PvFxuPbtSwDGJws/X0= +github.com/PuerkitoBio/urlesc v0.0.0-20170810143723-de5bf2ad4578 h1:d+Bc7a5rLufV/sSk/8dngufqelfh6jnri85riMAaF/M= github.com/PuerkitoBio/urlesc v0.0.0-20170810143723-de5bf2ad4578/go.mod h1:uGdkoq3SwY9Y+13GIhn11/XLaGBb4BfwItxLd5jeuXE= github.com/Selvatico/go-mocket v1.0.7 h1:sXuFMnMfVL9b/Os8rGXPgbOFbr4HJm8aHsulD/uMTUk= github.com/Selvatico/go-mocket v1.0.7/go.mod h1:4gO2v+uQmsL+jzQgLANy3tyEFzaEzHlymVbZ3GP2Oes= @@ -270,8 +272,8 @@ github.com/elazarl/goproxy v0.0.0-20180725130230-947c36da3153/go.mod h1:/Zj4wYkg github.com/elazarl/goproxy v0.0.0-20181003060214-f58a169a71a5/go.mod h1:/Zj4wYkgs4iZTTu3o/KG3Itv/qCCa8VVMlb3i9OVuzc= github.com/emicklei/go-restful v0.0.0-20170410110728-ff4f55a20633/go.mod h1:otzb+WCGbkyDHkqmQmT5YD2WR4BBwUdeQoFo8l/7tVs= github.com/emicklei/go-restful v2.9.5+incompatible/go.mod h1:otzb+WCGbkyDHkqmQmT5YD2WR4BBwUdeQoFo8l/7tVs= -github.com/emicklei/go-restful/v3 v3.9.0 h1:XwGDlfxEnQZzuopoqxwSEllNcCOM9DhhFyhFIIGKwxE= -github.com/emicklei/go-restful/v3 v3.9.0/go.mod h1:6n3XBCmQQb25CM2LCACGz8ukIrRry+4bhvbpWn3mrbc= +github.com/emicklei/go-restful/v3 v3.8.0 h1:eCZ8ulSerjdAiaNpF7GxXIE7ZCMo1moN1qX+S609eVw= +github.com/emicklei/go-restful/v3 v3.8.0/go.mod h1:6n3XBCmQQb25CM2LCACGz8ukIrRry+4bhvbpWn3mrbc= github.com/envoyproxy/go-control-plane v0.6.9/go.mod h1:SBwIajubJHhxtWwsL9s8ss4safvEdbitLhGGK48rN6g= github.com/envoyproxy/go-control-plane v0.9.0/go.mod h1:YTl/9mNaCwkRvm6d1a2C3ymFceY/DCBVvsKhRF0iEA4= github.com/envoyproxy/go-control-plane v0.9.1-0.20191026205805-5f8ba28d4473/go.mod h1:YTl/9mNaCwkRvm6d1a2C3ymFceY/DCBVvsKhRF0iEA4= @@ -357,16 +359,14 @@ github.com/go-openapi/jsonpointer v0.17.0/go.mod h1:cOnomiV+CVVwFLk0A/MExoFMjwds github.com/go-openapi/jsonpointer v0.18.0/go.mod h1:cOnomiV+CVVwFLk0A/MExoFMjwdsUdVpsRhURCKh+3M= github.com/go-openapi/jsonpointer v0.19.2/go.mod h1:3akKfEdA7DF1sugOqz1dVQHBcuDBPKZGEoHC/NkiQRg= github.com/go-openapi/jsonpointer v0.19.3/go.mod h1:Pl9vOtqEWErmShwVjC8pYs9cog34VGT37dQOVbmoatg= +github.com/go-openapi/jsonpointer v0.19.5 h1:gZr+CIYByUqjcgeLXnQu2gHYQC9o73G2XUeOFYEICuY= github.com/go-openapi/jsonpointer v0.19.5/go.mod h1:Pl9vOtqEWErmShwVjC8pYs9cog34VGT37dQOVbmoatg= -github.com/go-openapi/jsonpointer v0.19.6 h1:eCs3fxoIi3Wh6vtgmLTOjdhSpiqphQ+DaPn38N2ZdrE= -github.com/go-openapi/jsonpointer v0.19.6/go.mod h1:osyAmYz/mB/C3I+WsTTSgw1ONzaLJoLCyoi6/zppojs= github.com/go-openapi/jsonreference v0.17.0/go.mod h1:g4xxGn04lDIRh0GJb5QlpE3HfopLOL6uZrK/VgnsK9I= github.com/go-openapi/jsonreference v0.18.0/go.mod h1:g4xxGn04lDIRh0GJb5QlpE3HfopLOL6uZrK/VgnsK9I= github.com/go-openapi/jsonreference v0.19.2/go.mod h1:jMjeRr2HHw6nAVajTXJ4eiUwohSTlpa0o73RUL1owJc= github.com/go-openapi/jsonreference v0.19.3/go.mod h1:rjx6GuL8TTa9VaixXglHmQmIL98+wF9xc8zWvFonSJ8= +github.com/go-openapi/jsonreference v0.19.5 h1:1WJP/wi4OjB4iV8KVbH73rQaoialJrqv8gitZLxGLtM= github.com/go-openapi/jsonreference v0.19.5/go.mod h1:RdybgQwPxbL4UEjuAruzK1x3nE69AqPYEJeo/TWfEeg= -github.com/go-openapi/jsonreference v0.20.1 h1:FBLnyygC4/IZZr893oiomc9XaghoveYTrLC1F86HID8= -github.com/go-openapi/jsonreference v0.20.1/go.mod h1:Bl1zwGIM8/wsvqjsOQLJ/SH+En5Ap4rVB5KVcIDZG2k= github.com/go-openapi/loads v0.17.0/go.mod h1:72tmFy5wsWx89uEVddd0RjRWPZm92WRLhf7AC+0+OOU= github.com/go-openapi/loads v0.18.0/go.mod h1:72tmFy5wsWx89uEVddd0RjRWPZm92WRLhf7AC+0+OOU= github.com/go-openapi/loads v0.19.0/go.mod h1:72tmFy5wsWx89uEVddd0RjRWPZm92WRLhf7AC+0+OOU= @@ -397,9 +397,8 @@ github.com/go-openapi/swag v0.19.2/go.mod h1:POnQmlKehdgb5mhVOsnJFsivZCEZ/vjK9gh github.com/go-openapi/swag v0.19.5/go.mod h1:POnQmlKehdgb5mhVOsnJFsivZCEZ/vjK9gh66Z9tfKk= github.com/go-openapi/swag v0.19.7/go.mod h1:ao+8BpOPyKdpQz3AOJfbeEVpLmWAvlT1IfTe5McPyhY= github.com/go-openapi/swag v0.19.9/go.mod h1:ao+8BpOPyKdpQz3AOJfbeEVpLmWAvlT1IfTe5McPyhY= +github.com/go-openapi/swag v0.19.14 h1:gm3vOOXfiuw5i9p5N9xJvfjvuofpyvLA9Wr6QfK5Fng= github.com/go-openapi/swag v0.19.14/go.mod h1:QYRuS/SOXUCsnplDa677K7+DxSOj6IPNl/eQntq43wQ= -github.com/go-openapi/swag v0.22.3 h1:yMBqmnQ0gyZvEb/+KzuWZOXgllrXT4SADYbvDaXHv/g= -github.com/go-openapi/swag v0.22.3/go.mod h1:UzaqsxGiab7freDnrUUra0MwWfN/q7tE4j+VcZ0yl14= github.com/go-openapi/validate v0.18.0/go.mod h1:Uh4HdOzKt19xGIGm1qHf/ofbX1YQ4Y+MYsct2VUrAJ4= github.com/go-openapi/validate v0.19.2/go.mod h1:1tRCw7m3jtI8eNWEEliiAqUIcBztB2KDnRCRMUi7GTA= github.com/go-openapi/validate v0.19.3/go.mod h1:90Vh6jjkTn+OT1Eefm0ZixWNFjhtOH7vS9k0lo6zwJo= @@ -998,9 +997,8 @@ github.com/konsorten/go-windows-terminal-sequences v1.0.3/go.mod h1:T0+1ngSBFLxv github.com/kr/fs v0.1.0/go.mod h1:FFnZGqtBN9Gxj7eW1uZ42v5BccTP0vu6NEaFoC2HwRg= github.com/kr/logfmt v0.0.0-20140226030751-b84e30acd515/go.mod h1:+0opPa2QZZtGFBFZlji/RkVcI2GknAs/DXo4wKdlNEc= github.com/kr/pretty v0.1.0/go.mod h1:dAy3ld7l9f0ibDNOQOHHMYYIIbhfbHSm3C4ZsoJORNo= +github.com/kr/pretty v0.2.0 h1:s5hAObm+yFO5uHYt5dYjxi2rXrsnmRpJx4OYvIWUaQs= github.com/kr/pretty v0.2.0/go.mod h1:ipq/a2n7PKx3OHsz4KJII5eveXtPO4qwEXGdVfWzfnI= -github.com/kr/pretty v0.2.1 h1:Fmg33tUaq4/8ym9TJN1x7sLJnHVwhP33CNkpYV/7rwI= -github.com/kr/pretty v0.2.1/go.mod h1:ipq/a2n7PKx3OHsz4KJII5eveXtPO4qwEXGdVfWzfnI= github.com/kr/pty v1.1.1/go.mod h1:pFQYn66WHrOpPYNljwOMqo10TkYh1fy3cYio2l3bCsQ= github.com/kr/pty v1.1.3/go.mod h1:pFQYn66WHrOpPYNljwOMqo10TkYh1fy3cYio2l3bCsQ= github.com/kr/pty v1.1.5/go.mod h1:9r2w37qlBe7rQ6e1fg1S/9xpWHSnaqNdHD3WcMdbPDA= @@ -1044,9 +1042,8 @@ github.com/mailru/easyjson v0.0.0-20190312143242-1de009706dbe/go.mod h1:C1wdFJiN github.com/mailru/easyjson v0.0.0-20190614124828-94de47d64c63/go.mod h1:C1wdFJiN94OJF2b5HbByQZoLdCWB1Yqtg26g4irojpc= github.com/mailru/easyjson v0.0.0-20190626092158-b2ccc519800e/go.mod h1:C1wdFJiN94OJF2b5HbByQZoLdCWB1Yqtg26g4irojpc= github.com/mailru/easyjson v0.7.1/go.mod h1:KAzv3t3aY1NaHWoQz1+4F1ccyAH66Jk7yos7ldAVICs= +github.com/mailru/easyjson v0.7.6 h1:8yTIVnZgCoiM1TgqoeTl+LfU5Jg6/xL3QhGQnimLYnA= github.com/mailru/easyjson v0.7.6/go.mod h1:xzfreul335JAWq5oZzymOObrkdz5UnU4kGfJJLY9Nlc= -github.com/mailru/easyjson v0.7.7 h1:UGYAvKxe3sBsEDzO8ZeWOSlIQfWFlxbzLZe7hwFURr0= -github.com/mailru/easyjson v0.7.7/go.mod h1:xzfreul335JAWq5oZzymOObrkdz5UnU4kGfJJLY9Nlc= github.com/markbates/deplist v1.0.4/go.mod h1:gRRbPbbuA8TmMiRvaOzUlRfzfjeCCBqX2A6arxN01MM= github.com/markbates/deplist v1.0.5/go.mod h1:gRRbPbbuA8TmMiRvaOzUlRfzfjeCCBqX2A6arxN01MM= github.com/markbates/deplist v1.1.3/go.mod h1:BF7ioVzAJYEtzQN/os4rt8H8Ti3h0T7EoN+7eyALktE= @@ -1142,6 +1139,7 @@ github.com/mxk/go-flowrate v0.0.0-20140419014527-cca7078d478f/go.mod h1:ZdcZmHo+ github.com/ncw/swift v1.0.53 h1:luHjjTNtekIEvHg5KdAFIBaH7bWfNkefwFnpDffSIks= github.com/ncw/swift v1.0.53/go.mod h1:23YIA4yWVnGwv2dQlN4bB7egfYX6YLn0Yo/S6zZO/ZM= github.com/nicksnyder/go-i18n v1.10.0/go.mod h1:HrK7VCrbOvQoUAQ7Vpy7i87N7JZZZ7R2xBGjv0j365Q= +github.com/niemeyer/pretty v0.0.0-20200227124842-a10e7caefd8e h1:fD57ERR4JtEqsWbfPhv4DMiApHyliiK5xCTNVSPiaAs= github.com/niemeyer/pretty v0.0.0-20200227124842-a10e7caefd8e/go.mod h1:zD1mROLANZcx1PVRCS0qkT7pwLkGfwJo4zjcN/Tysno= github.com/nu7hatch/gouuid v0.0.0-20131221200532-179d4d0c4d8d/go.mod h1:YUTz3bUH2ZwIWBy3CJBeOBEugqcmXREj14T+iG/4k4U= github.com/nxadm/tail v1.4.4/go.mod h1:kenIhsEOeOJmVchQTgglprH7qJGnHDVpk1VPCcaMI8A= @@ -2115,9 +2113,8 @@ gopkg.in/alexcesaro/quotedprintable.v3 v3.0.0-20150716171945-2caba252f4dc/go.mod gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0= gopkg.in/check.v1 v1.0.0-20180628173108-788fd7840127/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0= gopkg.in/check.v1 v1.0.0-20190902080502-41f04d3bba15/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0= +gopkg.in/check.v1 v1.0.0-20200227125254-8fa46927fb4f h1:BLraFXnmrev5lT+xlilqcH8XK9/i0At2xKjWk4p6zsU= gopkg.in/check.v1 v1.0.0-20200227125254-8fa46927fb4f/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0= -gopkg.in/check.v1 v1.0.0-20201130134442-10cb98267c6c h1:Hei/4ADfdWqJk1ZMxUNpqntNwaWcugrBjAiHlqqRiVk= -gopkg.in/check.v1 v1.0.0-20201130134442-10cb98267c6c/go.mod h1:JHkPIbrfpd72SG/EVd6muEfDQjcINNoR0C8j2r3qZ4Q= gopkg.in/errgo.v2 v2.1.0/go.mod h1:hNsd1EY+bozCKY1Ytp96fpM3vjJbqLJn88ws8XvfDNI= gopkg.in/fsnotify.v1 v1.4.7/go.mod h1:Tz8NjZHkW78fSQdbUxIjBTcgA1z1m8ZHf0WmKUhAMys= gopkg.in/gemnasium/logrus-airbrake-hook.v2 v2.1.2/go.mod h1:Xk6kEKp8OKb+X14hQBKWaSkCsqBpgog8nAV2xsGOxlo= @@ -2216,15 +2213,15 @@ k8s.io/gengo v0.0.0-20211129171323-c02415ce4185/go.mod h1:FiNAH4ZV3gBg2Kwh89tzAE k8s.io/klog/v2 v2.0.0/go.mod h1:PBfzABfn139FHAV07az/IF9Wp1bkk3vpT2XSJ76fSDE= k8s.io/klog/v2 v2.2.0/go.mod h1:Od+F08eJP+W3HUb4pSrPpgp9DGU4GzlpG/TmITuYh/Y= k8s.io/klog/v2 v2.60.1/go.mod h1:y1WjHnz7Dj687irZUWR/WLkLc5N1YHtjLdmgWjndZn0= -k8s.io/klog/v2 v2.90.1 h1:m4bYOKall2MmOiRaR1J+We67Do7vm9KiQVlT96lnHUw= -k8s.io/klog/v2 v2.90.1/go.mod h1:y1WjHnz7Dj687irZUWR/WLkLc5N1YHtjLdmgWjndZn0= +k8s.io/klog/v2 v2.70.1 h1:7aaoSdahviPmR+XkS7FyxlkkXs6tHISSG03RxleQAVQ= +k8s.io/klog/v2 v2.70.1/go.mod h1:y1WjHnz7Dj687irZUWR/WLkLc5N1YHtjLdmgWjndZn0= k8s.io/kube-openapi v0.0.0-20220328201542-3ee0da9b0b42/go.mod h1:Z/45zLw8lUo4wdiUkI+v/ImEGAvu3WatcZl3lPMR4Rk= -k8s.io/kube-openapi v0.0.0-20230501164219-8b0f38b5fd1f h1:2kWPakN3i/k81b0gvD5C5FJ2kxm1WrQFanWchyKuqGg= -k8s.io/kube-openapi v0.0.0-20230501164219-8b0f38b5fd1f/go.mod h1:byini6yhqGC14c3ebc/QwanvYwhuMWF6yz2F8uwW8eg= +k8s.io/kube-openapi v0.0.0-20220803162953-67bda5d908f1 h1:MQ8BAZPZlWk3S9K4a9NCkIFQtZShWqoha7snGixVgEA= +k8s.io/kube-openapi v0.0.0-20220803162953-67bda5d908f1/go.mod h1:C/N6wCaBHeBHkHUesQOQy2/MZqGgMAFPqGsGQLdbZBU= k8s.io/utils v0.0.0-20210802155522-efc7438f0176/go.mod h1:jPW/WVKK9YHAvNhRxK0md/EJ228hCsBRufyofKtW8HA= k8s.io/utils v0.0.0-20220210201930-3a6ce19ff2f9/go.mod h1:jPW/WVKK9YHAvNhRxK0md/EJ228hCsBRufyofKtW8HA= -k8s.io/utils v0.0.0-20230209194617-a36077c30491 h1:r0BAOLElQnnFhE/ApUsg3iHdVYYPBjNSSOMowRZxxsY= -k8s.io/utils v0.0.0-20230209194617-a36077c30491/go.mod h1:OLgZIPagt7ERELqWJFomSt595RzquPNLL48iOWgYOg0= +k8s.io/utils v0.0.0-20220728103510-ee6ede2d64ed h1:jAne/RjBTyawwAy0utX5eqigAwz/lQhTmy+Hr/Cpue4= +k8s.io/utils v0.0.0-20220728103510-ee6ede2d64ed/go.mod h1:jPW/WVKK9YHAvNhRxK0md/EJ228hCsBRufyofKtW8HA= modernc.org/cc v1.0.0/go.mod h1:1Sk4//wdnYJiUIxnW8ddKpaOJCF37yAdqYnkxUpaYxw= modernc.org/golex v1.0.0/go.mod h1:b/QX9oBD/LhixY6NDh+IdGv17hgB+51fET1i2kPSmvk= modernc.org/mathutil v1.0.0/go.mod h1:wU0vUrJsVWBZ4P6e7xtFJEhFSNsfRLJ8H458uRjg03k= @@ -2238,8 +2235,8 @@ sigs.k8s.io/apiserver-network-proxy/konnectivity-client v0.0.30/go.mod h1:fEO7lR sigs.k8s.io/controller-runtime v0.12.1 h1:4BJY01xe9zKQti8oRjj/NeHKRXthf1YkYJAgLONFFoI= sigs.k8s.io/controller-runtime v0.12.1/go.mod h1:BKhxlA4l7FPK4AQcsuL4X6vZeWnKDXez/vp1Y8dxTU0= sigs.k8s.io/json v0.0.0-20211208200746-9f7c6b3444d2/go.mod h1:B+TnT182UBxE84DiCz4CVE26eOSDAeYCpfDnC2kdKMY= -sigs.k8s.io/json v0.0.0-20221116044647-bc3834ca7abd h1:EDPBXCAspyGV4jQlpZSudPeMmr1bNJefnuqLsRAsHZo= -sigs.k8s.io/json v0.0.0-20221116044647-bc3834ca7abd/go.mod h1:B8JuhiUyNFVKdsE8h686QcCxMaH6HrOAZj4vswFpcB0= +sigs.k8s.io/json v0.0.0-20220713155537-f223a00ba0e2 h1:iXTIw73aPyC+oRdyqqvVJuloN1p0AC/kzH07hu3NE+k= +sigs.k8s.io/json v0.0.0-20220713155537-f223a00ba0e2/go.mod h1:B8JuhiUyNFVKdsE8h686QcCxMaH6HrOAZj4vswFpcB0= sigs.k8s.io/structured-merge-diff/v4 v4.0.2/go.mod h1:bJZC9H9iH24zzfZ/41RGcq60oK1F7G282QMXDPYydCw= sigs.k8s.io/structured-merge-diff/v4 v4.2.1/go.mod h1:j/nl6xW8vLS49O8YvXW1ocPhZawJtm+Yrr7PPRQ0Vg4= sigs.k8s.io/structured-merge-diff/v4 v4.2.3 h1:PRbqxJClWWYMNV1dhaG4NsibJbArud9kFxnAMREiWFE= From 45b050edceaee0d2d9fed5355e5976539b7b3bae Mon Sep 17 00:00:00 2001 From: Future Outlier Date: Tue, 1 Aug 2023 13:21:23 +0800 Subject: [PATCH 12/18] Add test for NewNotificationsPublisher and NewNotificationsProcessor in factory_test.go Signed-off-by: Future Outlier --- pkg/async/notifications/factory_test.go | 18 ++++++++++++++++++ 1 file changed, 18 insertions(+) diff --git a/pkg/async/notifications/factory_test.go b/pkg/async/notifications/factory_test.go index 280f383f2..78e28e44f 100644 --- a/pkg/async/notifications/factory_test.go +++ b/pkg/async/notifications/factory_test.go @@ -3,11 +3,19 @@ package notifications import ( "testing" + "github.com/flyteorg/flyteadmin/pkg/async/notifications/implementations" runtimeInterfaces "github.com/flyteorg/flyteadmin/pkg/runtime/interfaces" "github.com/flyteorg/flytestdlib/promutils" "github.com/stretchr/testify/assert" ) +var ( + scope = promutils.NewScope("test_sandbox_processor") + notificationsConfig = runtimeInterfaces.NotificationsConfig{ + Type: "sandbox", + } +) + func TestGetEmailer(t *testing.T) { defer func() { r := recover(); assert.NotNil(t, r) }() cfg := runtimeInterfaces.NotificationsConfig{ @@ -23,3 +31,13 @@ func TestGetEmailer(t *testing.T) { // shouldn't reach here t.Errorf("did not panic") } + +func TestNewNotificationsProcessor(t *testing.T) { + testSandboxProcessor := NewNotificationsProcessor(notificationsConfig, scope) + assert.IsType(t, testSandboxProcessor, &implementations.SandboxProcessor{}) +} + +func TestNewNotificationPublisher(t *testing.T) { + testSandboxPublisher := NewNotificationsPublisher(notificationsConfig, scope) + assert.IsType(t, testSandboxPublisher, &implementations.SandboxPublisher{}) +} From 742c01008873b18669a2e2ef490a6e8a3eb9858b Mon Sep 17 00:00:00 2001 From: Future Outlier Date: Tue, 1 Aug 2023 14:52:58 +0800 Subject: [PATCH 13/18] complete test for StartProcessing, Publish and StopProcessing in TestNewNotificationPublisherAndProcessor Signed-off-by: Future Outlier --- pkg/async/notifications/factory_test.go | 26 ++++++++++++++++++++----- 1 file changed, 21 insertions(+), 5 deletions(-) diff --git a/pkg/async/notifications/factory_test.go b/pkg/async/notifications/factory_test.go index 78e28e44f..c10bab230 100644 --- a/pkg/async/notifications/factory_test.go +++ b/pkg/async/notifications/factory_test.go @@ -1,10 +1,12 @@ package notifications import ( + "context" "testing" "github.com/flyteorg/flyteadmin/pkg/async/notifications/implementations" runtimeInterfaces "github.com/flyteorg/flyteadmin/pkg/runtime/interfaces" + "github.com/flyteorg/flyteidl/gen/pb-go/flyteidl/admin" "github.com/flyteorg/flytestdlib/promutils" "github.com/stretchr/testify/assert" ) @@ -14,6 +16,15 @@ var ( notificationsConfig = runtimeInterfaces.NotificationsConfig{ Type: "sandbox", } + testEmail = admin.EmailMessage{ + RecipientsEmail: []string{ + "a@example.com", + "b@example.com", + }, + SenderEmail: "no-reply@example.com", + SubjectLine: "Test email", + Body: "This is a sample email.", + } ) func TestGetEmailer(t *testing.T) { @@ -32,12 +43,17 @@ func TestGetEmailer(t *testing.T) { t.Errorf("did not panic") } -func TestNewNotificationsProcessor(t *testing.T) { +func TestNewNotificationPublisherAndProcessor(t *testing.T) { + testSandboxPublisher := NewNotificationsPublisher(notificationsConfig, scope) + assert.IsType(t, testSandboxPublisher, &implementations.SandboxPublisher{}) testSandboxProcessor := NewNotificationsProcessor(notificationsConfig, scope) assert.IsType(t, testSandboxProcessor, &implementations.SandboxProcessor{}) -} -func TestNewNotificationPublisher(t *testing.T) { - testSandboxPublisher := NewNotificationsPublisher(notificationsConfig, scope) - assert.IsType(t, testSandboxPublisher, &implementations.SandboxPublisher{}) + go func() { + testSandboxProcessor.StartProcessing() + }() + + assert.Nil(t, testSandboxPublisher.Publish(context.Background(), "TEST_NOTIFICATION", &testEmail)) + + assert.Nil(t, testSandboxProcessor.StopProcessing()) } From c05a78c7717db9b97b4fa4b035206ea03ba6ae4f Mon Sep 17 00:00:00 2001 From: Future Outlier Date: Fri, 4 Aug 2023 08:55:10 +0800 Subject: [PATCH 14/18] refactor msg channel using singleton pattern and write better unit testing Signed-off-by: Future Outlier --- pkg/async/notifications/factory.go | 16 +++++++++++-- .../implementations/sandbox_processor.go | 10 ++++---- .../implementations/sandbox_processor_test.go | 4 ++-- .../implementations/sandbox_publisher.go | 14 ++++++----- .../implementations/sandbox_publisher_test.go | 24 +++---------------- 5 files changed, 33 insertions(+), 35 deletions(-) diff --git a/pkg/async/notifications/factory.go b/pkg/async/notifications/factory.go index aba360eb5..a2fd4c357 100644 --- a/pkg/async/notifications/factory.go +++ b/pkg/async/notifications/factory.go @@ -3,6 +3,7 @@ package notifications import ( "context" "fmt" + "sync" "time" "github.com/flyteorg/flyteadmin/pkg/async" @@ -27,6 +28,9 @@ const maxRetries = 3 var enable64decoding = false +var msgChan chan []byte +var once sync.Once + type PublisherConfig struct { TopicName string } @@ -41,6 +45,13 @@ type EmailerConfig struct { BaseURL string } +// For sandbox only +func CreateMsgChan() { + once.Do(func() { + msgChan = make(chan []byte) + }) +} + func GetEmailer(config runtimeInterfaces.NotificationsConfig, scope promutils.Scope) interfaces.Emailer { // If an external email service is specified use that instead. // TODO: Handling of this is messy, see https://github.com/flyteorg/flyte/issues/1063 @@ -122,7 +133,7 @@ func NewNotificationsProcessor(config runtimeInterfaces.NotificationsConfig, sco return implementations.NewGcpProcessor(sub, emailer, scope) case common.Sandbox: emailer = GetEmailer(config, scope) - return implementations.NewSandboxProcessor(emailer) + return implementations.NewSandboxProcessor(msgChan, emailer) case common.Local: fallthrough default: @@ -175,7 +186,8 @@ func NewNotificationsPublisher(config runtimeInterfaces.NotificationsConfig, sco } return implementations.NewPublisher(publisher, scope) case common.Sandbox: - return implementations.NewSandboxPublisher() + CreateMsgChan() + return implementations.NewSandboxPublisher(msgChan) case common.Local: fallthrough default: diff --git a/pkg/async/notifications/implementations/sandbox_processor.go b/pkg/async/notifications/implementations/sandbox_processor.go index c654c6c3b..112c27901 100644 --- a/pkg/async/notifications/implementations/sandbox_processor.go +++ b/pkg/async/notifications/implementations/sandbox_processor.go @@ -12,7 +12,8 @@ import ( ) type SandboxProcessor struct { - email interfaces.Emailer + email interfaces.Emailer + msgChan chan []byte } func (p *SandboxProcessor) StartProcessing() { @@ -29,7 +30,7 @@ func (p *SandboxProcessor) run() error { for { select { - case msg := <-msgChan: + case msg := <-p.msgChan: err := proto.Unmarshal(msg, &emailMessage) if err != nil { logger.Errorf(context.Background(), "error with unmarshalling message [%v]", err) @@ -53,8 +54,9 @@ func (p *SandboxProcessor) StopProcessing() error { return nil } -func NewSandboxProcessor(emailer interfaces.Emailer) interfaces.Processor { +func NewSandboxProcessor(msgChan chan []byte, emailer interfaces.Emailer) interfaces.Processor { return &SandboxProcessor{ - email: emailer, + msgChan: msgChan, + email: emailer, } } diff --git a/pkg/async/notifications/implementations/sandbox_processor_test.go b/pkg/async/notifications/implementations/sandbox_processor_test.go index 9ce752097..45f96f226 100644 --- a/pkg/async/notifications/implementations/sandbox_processor_test.go +++ b/pkg/async/notifications/implementations/sandbox_processor_test.go @@ -24,8 +24,8 @@ func TestSandboxProcessor_UnmarshalMessage(t *testing.T) { } func TestSandboxProcessor_StartProcessing(t *testing.T) { - - testSandboxProcessor := NewSandboxProcessor(&mockSandboxEmailer) + msgChan := make(chan []byte, 1) + testSandboxProcessor := NewSandboxProcessor(msgChan, &mockSandboxEmailer) sendEmailValidationFunc := func(ctx context.Context, email admin.EmailMessage) error { assert.Equal(t, testEmail.Body, email.Body) diff --git a/pkg/async/notifications/implementations/sandbox_publisher.go b/pkg/async/notifications/implementations/sandbox_publisher.go index 735dc054b..a50e5fb36 100644 --- a/pkg/async/notifications/implementations/sandbox_publisher.go +++ b/pkg/async/notifications/implementations/sandbox_publisher.go @@ -7,9 +7,9 @@ import ( "github.com/golang/protobuf/proto" ) -type SandboxPublisher struct{} - -var msgChan = make(chan []byte) +type SandboxPublisher struct { + msgChan chan []byte +} func (p *SandboxPublisher) Publish(ctx context.Context, notificationType string, msg proto.Message) error { logger.Debugf(ctx, "Publishing the following message [%s]", msg.String()) @@ -21,11 +21,13 @@ func (p *SandboxPublisher) Publish(ctx context.Context, notificationType string, return err } - msgChan <- data + p.msgChan <- data return nil } -func NewSandboxPublisher() *SandboxPublisher { - return &SandboxPublisher{} +func NewSandboxPublisher(msgChan chan []byte) *SandboxPublisher { + return &SandboxPublisher{ + msgChan: msgChan, + } } diff --git a/pkg/async/notifications/implementations/sandbox_publisher_test.go b/pkg/async/notifications/implementations/sandbox_publisher_test.go index 97c103666..fed562a0a 100644 --- a/pkg/async/notifications/implementations/sandbox_publisher_test.go +++ b/pkg/async/notifications/implementations/sandbox_publisher_test.go @@ -3,34 +3,16 @@ package implementations import ( "context" "testing" - "time" "github.com/stretchr/testify/assert" ) func TestSandboxPublisher_Publish(t *testing.T) { - publisher := NewSandboxPublisher() - - errChan := make(chan string) - - go func() { - select { - case <-msgChan: - // if message received, no need to send an error - case <-time.After(time.Second * 5): - errChan <- "No data was received in the channel within the expected time frame" - } - }() + msgChan := make(chan []byte, 1) + publisher := NewSandboxPublisher(msgChan) err := publisher.Publish(context.Background(), "NOTIFICATION_TYPE", &testEmail) - // Check if there was an error in the goroutine - select { - case errMsg := <-errChan: - t.Fatal(errMsg) - default: - // no error from the goroutine - } - + assert.NotZero(t, len(msgChan)) assert.Nil(t, err) } From 44e5e860a313f36831e5ebd318c06b89b79371b4 Mon Sep 17 00:00:00 2001 From: Future Outlier Date: Sat, 5 Aug 2023 10:37:58 +0800 Subject: [PATCH 15/18] refactor the publisher and processor both of their msg channel and rename both of them Signed-off-by: Future Outlier --- .../implementations/sandbox_processor.go | 10 +++++----- .../implementations/sandbox_processor_test.go | 12 ------------ .../implementations/sandbox_publisher.go | 8 ++++---- 3 files changed, 9 insertions(+), 21 deletions(-) diff --git a/pkg/async/notifications/implementations/sandbox_processor.go b/pkg/async/notifications/implementations/sandbox_processor.go index 112c27901..4b89f277a 100644 --- a/pkg/async/notifications/implementations/sandbox_processor.go +++ b/pkg/async/notifications/implementations/sandbox_processor.go @@ -13,7 +13,7 @@ import ( type SandboxProcessor struct { email interfaces.Emailer - msgChan chan []byte + subChan <-chan []byte } func (p *SandboxProcessor) StartProcessing() { @@ -30,7 +30,7 @@ func (p *SandboxProcessor) run() error { for { select { - case msg := <-p.msgChan: + case msg := <-p.subChan: err := proto.Unmarshal(msg, &emailMessage) if err != nil { logger.Errorf(context.Background(), "error with unmarshalling message [%v]", err) @@ -39,7 +39,7 @@ func (p *SandboxProcessor) run() error { err = p.email.SendEmail(context.Background(), emailMessage) if err != nil { - logger.Errorf(context.Background(), "error with sendemail message [%v] ", err) + logger.Errorf(context.Background(), "Error sending an email message for message [%s] with emailM with err: %v", emailMessage.String(), err) return err } default: @@ -54,9 +54,9 @@ func (p *SandboxProcessor) StopProcessing() error { return nil } -func NewSandboxProcessor(msgChan chan []byte, emailer interfaces.Emailer) interfaces.Processor { +func NewSandboxProcessor(subChan <-chan []byte, emailer interfaces.Emailer) interfaces.Processor { return &SandboxProcessor{ - msgChan: msgChan, + subChan: subChan, email: emailer, } } diff --git a/pkg/async/notifications/implementations/sandbox_processor_test.go b/pkg/async/notifications/implementations/sandbox_processor_test.go index 45f96f226..6def96e99 100644 --- a/pkg/async/notifications/implementations/sandbox_processor_test.go +++ b/pkg/async/notifications/implementations/sandbox_processor_test.go @@ -6,23 +6,11 @@ import ( "github.com/flyteorg/flyteadmin/pkg/async/notifications/mocks" "github.com/flyteorg/flyteidl/gen/pb-go/flyteidl/admin" - "github.com/golang/protobuf/proto" "github.com/stretchr/testify/assert" ) var mockSandboxEmailer mocks.MockEmailer -func TestSandboxProcessor_UnmarshalMessage(t *testing.T) { - var emailMessage admin.EmailMessage - err := proto.Unmarshal(msg, &emailMessage) - assert.Nil(t, err) - - assert.Equal(t, emailMessage.Body, testEmail.Body) - assert.Equal(t, emailMessage.RecipientsEmail, testEmail.RecipientsEmail) - assert.Equal(t, emailMessage.SubjectLine, testEmail.SubjectLine) - assert.Equal(t, emailMessage.SenderEmail, testEmail.SenderEmail) -} - func TestSandboxProcessor_StartProcessing(t *testing.T) { msgChan := make(chan []byte, 1) testSandboxProcessor := NewSandboxProcessor(msgChan, &mockSandboxEmailer) diff --git a/pkg/async/notifications/implementations/sandbox_publisher.go b/pkg/async/notifications/implementations/sandbox_publisher.go index a50e5fb36..ab94b1ee6 100644 --- a/pkg/async/notifications/implementations/sandbox_publisher.go +++ b/pkg/async/notifications/implementations/sandbox_publisher.go @@ -8,7 +8,7 @@ import ( ) type SandboxPublisher struct { - msgChan chan []byte + pubChan chan<- []byte } func (p *SandboxPublisher) Publish(ctx context.Context, notificationType string, msg proto.Message) error { @@ -21,13 +21,13 @@ func (p *SandboxPublisher) Publish(ctx context.Context, notificationType string, return err } - p.msgChan <- data + p.pubChan <- data return nil } -func NewSandboxPublisher(msgChan chan []byte) *SandboxPublisher { +func NewSandboxPublisher(pubChan chan<- []byte) *SandboxPublisher { return &SandboxPublisher{ - msgChan: msgChan, + pubChan: pubChan, } } From c2a9301bb8bfcf15485474725a87875358178e33 Mon Sep 17 00:00:00 2001 From: Future Outlier Date: Wed, 9 Aug 2023 11:01:41 +0800 Subject: [PATCH 16/18] add email error for sandbox processor test, trying to imporove the code coverage Signed-off-by: Future Outlier --- .../implementations/sandbox_processor_test.go | 23 +++++++++++++++++++ 1 file changed, 23 insertions(+) diff --git a/pkg/async/notifications/implementations/sandbox_processor_test.go b/pkg/async/notifications/implementations/sandbox_processor_test.go index 6def96e99..a875c9247 100644 --- a/pkg/async/notifications/implementations/sandbox_processor_test.go +++ b/pkg/async/notifications/implementations/sandbox_processor_test.go @@ -6,6 +6,7 @@ import ( "github.com/flyteorg/flyteadmin/pkg/async/notifications/mocks" "github.com/flyteorg/flyteidl/gen/pb-go/flyteidl/admin" + "github.com/pkg/errors" "github.com/stretchr/testify/assert" ) @@ -13,6 +14,7 @@ var mockSandboxEmailer mocks.MockEmailer func TestSandboxProcessor_StartProcessing(t *testing.T) { msgChan := make(chan []byte, 1) + msgChan <- msg testSandboxProcessor := NewSandboxProcessor(msgChan, &mockSandboxEmailer) sendEmailValidationFunc := func(ctx context.Context, email admin.EmailMessage) error { @@ -22,6 +24,27 @@ func TestSandboxProcessor_StartProcessing(t *testing.T) { assert.Equal(t, testEmail.SenderEmail, email.SenderEmail) return nil } + mockSandboxEmailer.SetSendEmailFunc(sendEmailValidationFunc) assert.Nil(t, testSandboxProcessor.(*SandboxProcessor).run()) } + +func TestSandboxProcessor_StartProcessingEmailError(t *testing.T) { + msgChan := make(chan []byte, 1) + msgChan <- msg + testSandboxProcessor := NewSandboxProcessor(msgChan, &mockSandboxEmailer) + + emailError := errors.New("error sending email") + sendEmailValidationFunc := func(ctx context.Context, email admin.EmailMessage) error { + return emailError + } + + mockSandboxEmailer.SetSendEmailFunc(sendEmailValidationFunc) + assert.NotNil(t, testSandboxProcessor.(*SandboxProcessor).run()) +} + +func TestSandboxProcessor_StopProcessing(t *testing.T) { + msgChan := make(chan []byte, 1) + testSandboxProcessor := NewSandboxProcessor(msgChan, &mockSandboxEmailer) + assert.Nil(t, testSandboxProcessor.StopProcessing()) +} From 22d84e71271641f4c88db0e9b4ebf7c5df339e87 Mon Sep 17 00:00:00 2001 From: Future Outlier Date: Wed, 9 Aug 2023 11:26:58 +0800 Subject: [PATCH 17/18] improve test code coverage by adding test function in sandbox processor adn publisher Signed-off-by: Future Outlier --- .../implementations/sandbox_processor_test.go | 9 +++++++++ .../implementations/sandbox_publisher_test.go | 18 ++++++++++++++++++ 2 files changed, 27 insertions(+) diff --git a/pkg/async/notifications/implementations/sandbox_processor_test.go b/pkg/async/notifications/implementations/sandbox_processor_test.go index a875c9247..50c808033 100644 --- a/pkg/async/notifications/implementations/sandbox_processor_test.go +++ b/pkg/async/notifications/implementations/sandbox_processor_test.go @@ -29,6 +29,15 @@ func TestSandboxProcessor_StartProcessing(t *testing.T) { assert.Nil(t, testSandboxProcessor.(*SandboxProcessor).run()) } +func TestSandboxProcessor_StartProcessingMessageError(t *testing.T) { + msgChan := make(chan []byte, 1) + invalidProtoMessage := []byte("invalid message") + msgChan <- invalidProtoMessage + testSandboxProcessor := NewSandboxProcessor(msgChan, &mockSandboxEmailer) + + assert.NotNil(t, testSandboxProcessor.(*SandboxProcessor).run()) +} + func TestSandboxProcessor_StartProcessingEmailError(t *testing.T) { msgChan := make(chan []byte, 1) msgChan <- msg diff --git a/pkg/async/notifications/implementations/sandbox_publisher_test.go b/pkg/async/notifications/implementations/sandbox_publisher_test.go index fed562a0a..5a73186c1 100644 --- a/pkg/async/notifications/implementations/sandbox_publisher_test.go +++ b/pkg/async/notifications/implementations/sandbox_publisher_test.go @@ -4,9 +4,18 @@ import ( "context" "testing" + "github.com/pkg/errors" "github.com/stretchr/testify/assert" ) +// mockMessage is a dummy proto message that will always fail to marshal +type mockMessage struct{} + +func (m *mockMessage) Reset() {} +func (m *mockMessage) String() string { return "mockMessage" } +func (m *mockMessage) ProtoMessage() {} +func (m *mockMessage) Marshal() ([]byte, error) { return nil, errors.New("forced marshal error") } + func TestSandboxPublisher_Publish(t *testing.T) { msgChan := make(chan []byte, 1) publisher := NewSandboxPublisher(msgChan) @@ -16,3 +25,12 @@ func TestSandboxPublisher_Publish(t *testing.T) { assert.NotZero(t, len(msgChan)) assert.Nil(t, err) } + +func TestSandboxPublisher_PublishMarshalError(t *testing.T) { + msgChan := make(chan []byte, 1) + publisher := NewSandboxPublisher(msgChan) + + err := publisher.Publish(context.Background(), "testMarshallError", &mockMessage{}) + assert.Error(t, err) + assert.Equal(t, "forced marshal error", err.Error()) +} From 85bbe17955c8eba3f69a9d6531fc2a8302e7c4f0 Mon Sep 17 00:00:00 2001 From: Future Outlier Date: Wed, 9 Aug 2023 14:56:21 +0800 Subject: [PATCH 18/18] make sandbox_processor and sandbox_publisher to 100% code coverage Signed-off-by: Future Outlier --- .../implementations/sandbox_processor_test.go | 29 +++++++++++++++++-- 1 file changed, 27 insertions(+), 2 deletions(-) diff --git a/pkg/async/notifications/implementations/sandbox_processor_test.go b/pkg/async/notifications/implementations/sandbox_processor_test.go index 50c808033..6bbee8cf5 100644 --- a/pkg/async/notifications/implementations/sandbox_processor_test.go +++ b/pkg/async/notifications/implementations/sandbox_processor_test.go @@ -3,6 +3,7 @@ package implementations import ( "context" "testing" + "time" "github.com/flyteorg/flyteadmin/pkg/async/notifications/mocks" "github.com/flyteorg/flyteidl/gen/pb-go/flyteidl/admin" @@ -12,7 +13,7 @@ import ( var mockSandboxEmailer mocks.MockEmailer -func TestSandboxProcessor_StartProcessing(t *testing.T) { +func TestSandboxProcessor_StartProcessingSuccess(t *testing.T) { msgChan := make(chan []byte, 1) msgChan <- msg testSandboxProcessor := NewSandboxProcessor(msgChan, &mockSandboxEmailer) @@ -29,12 +30,36 @@ func TestSandboxProcessor_StartProcessing(t *testing.T) { assert.Nil(t, testSandboxProcessor.(*SandboxProcessor).run()) } +func TestSandboxProcessor_StartProcessingNoMessage(t *testing.T) { + msgChan := make(chan []byte, 1) + testSandboxProcessor := NewSandboxProcessor(msgChan, &mockSandboxEmailer) + go testSandboxProcessor.StartProcessing() + time.Sleep(1 * time.Second) +} + +func TestSandboxProcessor_StartProcessingError(t *testing.T) { + msgChan := make(chan []byte, 1) + msgChan <- msg + + emailError := errors.New("error running processor") + sendEmailValidationFunc := func(ctx context.Context, email admin.EmailMessage) error { + return emailError + } + mockSandboxEmailer.SetSendEmailFunc(sendEmailValidationFunc) + + testSandboxProcessor := NewSandboxProcessor(msgChan, &mockSandboxEmailer) + go testSandboxProcessor.StartProcessing() + + // give time to receive the err in StartProcessing + time.Sleep(1 * time.Second) + assert.Zero(t, len(msgChan)) +} + func TestSandboxProcessor_StartProcessingMessageError(t *testing.T) { msgChan := make(chan []byte, 1) invalidProtoMessage := []byte("invalid message") msgChan <- invalidProtoMessage testSandboxProcessor := NewSandboxProcessor(msgChan, &mockSandboxEmailer) - assert.NotNil(t, testSandboxProcessor.(*SandboxProcessor).run()) }