From 4dba2f70329d86bb885f6fac75f7b8272860c0a6 Mon Sep 17 00:00:00 2001 From: Gaurav Kumar <57063469+GrayFlash@users.noreply.github.com> Date: Fri, 3 Jun 2022 11:48:49 +0530 Subject: [PATCH] chore: improve code coverage (#350) * chore: improve code coverage for registry package * chore: test for empty discover plugin * chore: add test for invalid labels in compass sink * chore: add build plugins label to tests for tableau extractor * chore: add build plugins label to tests for bigquery extractor * chore: add test for new stats client in metrics * chore: add build plugins label to tests for bigquery extractor * fix(lint): revert back one commit * chore: improve test coverage for reistry package to 100% * chore: improve code-coverage for agent package * chore: remove unused error file from recipe * chore: add test for checking invalid template path --- agent/agent_test.go | 98 ++++++++++++++++++ metrics/statsd_test.go | 59 +++++++++++ plugins/external/discover_test.go | 4 + plugins/extractors/bigquery/profile_test.go | 3 + plugins/extractors/tableau/builder_test.go | 3 + plugins/extractors/tableau/client_test.go | 3 + plugins/sinks/compass/sink.go | 7 +- plugins/sinks/compass/sink_test.go | 77 ++++++++++++++ recipe/errors.go | 14 --- recipe/generator_test.go | 23 +++++ registry/extractors_test.go | 38 ++++++- registry/processors_test.go | 108 ++++++++++++++++++++ registry/sinks_test.go | 108 ++++++++++++++++++++ 13 files changed, 526 insertions(+), 19 deletions(-) delete mode 100644 recipe/errors.go create mode 100644 registry/processors_test.go create mode 100644 registry/sinks_test.go diff --git a/agent/agent_test.go b/agent/agent_test.go index df2fd5ac2..9370ccc6e 100644 --- a/agent/agent_test.go +++ b/agent/agent_test.go @@ -776,6 +776,91 @@ func TestAgentRunMultiple(t *testing.T) { }) } +func TestValidate(t *testing.T) { + t.Run("should return error if plugins in recipe not found in Factory", func(t *testing.T) { + r := agent.NewAgent(agent.Config{ + ExtractorFactory: registry.NewExtractorFactory(), + ProcessorFactory: registry.NewProcessorFactory(), + SinkFactory: registry.NewSinkFactory(), + Logger: utils.Logger, + }) + var expectedErrs []error + errs := r.Validate(validRecipe) + expectedErrs = append(expectedErrs, plugins.NotFoundError{Type: plugins.PluginTypeExtractor, Name: "test-extractor"}) + expectedErrs = append(expectedErrs, plugins.NotFoundError{Type: plugins.PluginTypeSink, Name: "test-sink"}) + expectedErrs = append(expectedErrs, plugins.NotFoundError{Type: plugins.PluginTypeProcessor, Name: "test-processor"}) + assert.Equal(t, 3, len(errs)) + assert.Equal(t, expectedErrs, errs) + }) + t.Run("", func(t *testing.T) { + var invalidRecipe = recipe.Recipe{ + Name: "sample", + Source: recipe.PluginRecipe{ + Name: "test-extractor", + Config: map[string]interface{}{ + "proc-foo": "proc-bar", + }, + }, + Processors: []recipe.PluginRecipe{ + { + Name: "test-processor", + Config: map[string]interface{}{ + "proc-foo": "proc-bar", + }, + }, + }, + Sinks: []recipe.PluginRecipe{ + { + Name: "test-sink", + Config: map[string]interface{}{ + "url": "http://localhost:3000/data", + }, + }, + }, + } + + extr := mocks.NewExtractor() + err := plugins.InvalidConfigError{} + extr.On("Validate", invalidRecipe.Source.Config).Return(err).Once() + ef := registry.NewExtractorFactory() + if err := ef.Register("test-extractor", newExtractor(extr)); err != nil { + t.Fatal(err) + } + + proc := mocks.NewProcessor() + proc.On("Validate", invalidRecipe.Processors[0].Config).Return(err).Once() + defer proc.AssertExpectations(t) + pf := registry.NewProcessorFactory() + if err := pf.Register("test-processor", newProcessor(proc)); err != nil { + t.Fatal(err) + } + + sink := mocks.NewSink() + sink.On("Validate", invalidRecipe.Sinks[0].Config).Return(err).Once() + defer sink.AssertExpectations(t) + sf := registry.NewSinkFactory() + if err := sf.Register("test-sink", newSink(sink)); err != nil { + t.Fatal(err) + } + + r := agent.NewAgent(agent.Config{ + ExtractorFactory: ef, + ProcessorFactory: pf, + SinkFactory: sf, + Logger: utils.Logger, + Monitor: newMockMonitor(), + }) + + var expectedErrs []error + errs := r.Validate(invalidRecipe) + assert.Equal(t, 3, len(errs)) + expectedErrs = append(expectedErrs, enrichInvalidConfigError(err, invalidRecipe.Source.Name, plugins.PluginTypeExtractor)) + expectedErrs = append(expectedErrs, enrichInvalidConfigError(err, invalidRecipe.Sinks[0].Name, plugins.PluginTypeSink)) + expectedErrs = append(expectedErrs, enrichInvalidConfigError(err, invalidRecipe.Processors[0].Name, plugins.PluginTypeProcessor)) + assert.Equal(t, expectedErrs, errs) + }) +} + func newExtractor(extr plugins.Extractor) func() plugins.Extractor { return func() plugins.Extractor { return extr @@ -821,3 +906,16 @@ type panicProcessor struct { func (p *panicProcessor) Process(_ context.Context, _ models.Record) (dst models.Record, err error) { panic("panicking") } + +// enrichInvalidConfigError enrich the error with plugin information +func enrichInvalidConfigError(err error, pluginName string, pluginType plugins.PluginType) error { + if errors.As(err, &plugins.InvalidConfigError{}) { + icErr := err.(plugins.InvalidConfigError) + icErr.PluginName = pluginName + icErr.Type = pluginType + + return icErr + } + + return err +} diff --git a/metrics/statsd_test.go b/metrics/statsd_test.go index 384eaa3d0..409688674 100644 --- a/metrics/statsd_test.go +++ b/metrics/statsd_test.go @@ -2,11 +2,18 @@ package metrics_test import ( "fmt" + "log" + "os" "testing" + "github.com/odpf/meteor/test/utils" + "github.com/odpf/meteor/agent" "github.com/odpf/meteor/metrics" "github.com/odpf/meteor/recipe" + "github.com/ory/dockertest/v3" + "github.com/ory/dockertest/v3/docker" + "github.com/stretchr/testify/assert" "github.com/stretchr/testify/mock" ) @@ -26,6 +33,47 @@ func (c *mockStatsdClient) Increment(name string) { c.Called(name) } +var port = "8125" + +func TestMain(m *testing.M) { + // setup test + opts := dockertest.RunOptions{ + Repository: "statsd/statsd", + Tag: "latest", + Env: []string{ + "MYSQL_ALLOW_EMPTY_PASSWORD=true", + }, + ExposedPorts: []string{"8125", port}, + PortBindings: map[docker.Port][]docker.PortBinding{ + "8125": { + {HostIP: "0.0.0.0", HostPort: port}, + }, + }, + } + // exponential backoff-retry, because the application in the container might not be ready to accept connections yet + retryFn := func(resource *dockertest.Resource) (err error) { + c, err := metrics.NewStatsdClient("127.0.0.1:" + port) + if err != nil { + return + } + c.Open() + return + } + purgeFn, err := utils.CreateContainer(opts, retryFn) + if err != nil { + log.Fatal(err) + } + + // run tests + code := m.Run() + + // clean tests + if err := purgeFn(); err != nil { + log.Fatal(err) + } + os.Exit(code) +} + func TestStatsdMonitorRecordRun(t *testing.T) { statsdPrefix := "testprefix" @@ -117,3 +165,14 @@ func TestStatsdMonitorRecordRun(t *testing.T) { monitor.RecordRun(agent.Run{Recipe: recipe, DurationInMs: duration, RecordCount: 2, Success: true}) }) } + +func TestNewStatsClient(t *testing.T) { + t.Run("should return error for invalid address", func(t *testing.T) { + _, err := metrics.NewStatsdClient("127.0.0.1") + assert.Error(t, err) + }) + t.Run("should return error for invalid port", func(t *testing.T) { + _, err := metrics.NewStatsdClient("127.0.0.1:81A5") + assert.Error(t, err) + }) +} diff --git a/plugins/external/discover_test.go b/plugins/external/discover_test.go index 785a2a0ec..1df695ec0 100644 --- a/plugins/external/discover_test.go +++ b/plugins/external/discover_test.go @@ -6,11 +6,15 @@ package plugins import ( "testing" + "github.com/odpf/meteor/registry" "github.com/stretchr/testify/assert" ) func TestDiscoverPlugins(t *testing.T) { // TODO: add test + factory := registry.NewProcessorFactory() + _, err := DiscoverPlugins(factory) + assert.Nil(t, err) } // once we already setup a test for DiscoverPlugins this test will not be needed diff --git a/plugins/extractors/bigquery/profile_test.go b/plugins/extractors/bigquery/profile_test.go index 6b54445d2..1e5601edb 100644 --- a/plugins/extractors/bigquery/profile_test.go +++ b/plugins/extractors/bigquery/profile_test.go @@ -1,3 +1,6 @@ +//go:build plugins +// +build plugins + package bigquery import ( diff --git a/plugins/extractors/tableau/builder_test.go b/plugins/extractors/tableau/builder_test.go index a7d5bd50b..3dc8f9f4a 100644 --- a/plugins/extractors/tableau/builder_test.go +++ b/plugins/extractors/tableau/builder_test.go @@ -1,3 +1,6 @@ +//go:build plugins +// +build plugins + package tableau import ( diff --git a/plugins/extractors/tableau/client_test.go b/plugins/extractors/tableau/client_test.go index 181ac160c..aa3abc8e8 100644 --- a/plugins/extractors/tableau/client_test.go +++ b/plugins/extractors/tableau/client_test.go @@ -1,3 +1,6 @@ +//go:build plugins +// +build plugins + package tableau import ( diff --git a/plugins/sinks/compass/sink.go b/plugins/sinks/compass/sink.go index 78ca26df9..5aca21a46 100644 --- a/plugins/sinks/compass/sink.go +++ b/plugins/sinks/compass/sink.go @@ -6,15 +6,16 @@ import ( _ "embed" "encoding/json" "fmt" + "io/ioutil" + "net/http" + "strings" + "github.com/odpf/meteor/models" "github.com/odpf/meteor/plugins" "github.com/odpf/meteor/registry" "github.com/odpf/meteor/utils" "github.com/odpf/salt/log" "github.com/pkg/errors" - "io/ioutil" - "net/http" - "strings" ) //go:embed README.md diff --git a/plugins/sinks/compass/sink_test.go b/plugins/sinks/compass/sink_test.go index bf584495c..9a0a5eff3 100644 --- a/plugins/sinks/compass/sink_test.go +++ b/plugins/sinks/compass/sink_test.go @@ -97,6 +97,83 @@ func TestSink(t *testing.T) { } }) + t.Run("should return error for various invalid labels", func(t *testing.T) { + testData := &assetsv1beta1.User{ + Resource: &commonv1beta1.Resource{ + Urn: "my-topic-urn", + Name: "my-topic", + Service: "kafka", + Type: "topic", + Description: "topic information", + }, + Properties: &facetsv1beta1.Properties{ + Attributes: utils.TryParseMapToProto(map[string]interface{}{ + "attrA": "valueAttrA", + "attrB": "valueAttrB", + }), + Labels: map[string]string{ + "labelA": "valueLabelA", + "labelB": "valueLabelB", + }, + }, + } + testPayload := compass.RequestPayload{ + Asset: compass.Asset{ + URN: "my-topic-urn", + Name: "my-topic", + Service: "kafka", + Type: "topic", + Description: "topic information", + }, + } + invalidConfigs := []map[string]interface{}{ + { + "host": host, + "labels": map[string]string{ + "foo": "$properties.attributes", + }, + }, + { + "host": host, + "labels": map[string]string{ + "foo": "$properties.attributes.12", + }, + }, + { + "host": host, + "labels": map[string]string{ + "foo": "$properties.attributes.attrC", + }, + }, + { + "host": host, + "labels": map[string]string{ + "foo": "$invalid.attributes.attrC", + }, + }, + { + "host": host, + "labels": map[string]string{ + "bar": "$properties.labels.labelC", + }, + }, + } + for _, c := range invalidConfigs { + client := newMockHTTPClient(c, http.MethodPatch, url, testPayload) + client.SetupResponse(200, "") + ctx := context.TODO() + compassSink := compass.New(client, testUtils.Logger) + err := compassSink.Init(ctx, c) + if err != nil { + t.Fatal(err) + } + err = compassSink.Sink(ctx, []models.Record{models.NewRecord(testData)}) + assert.Error(t, err) + fmt.Println(err) + } + + }) + successTestCases := []struct { description string data models.Metadata diff --git a/recipe/errors.go b/recipe/errors.go deleted file mode 100644 index 98b69b1dd..000000000 --- a/recipe/errors.go +++ /dev/null @@ -1,14 +0,0 @@ -package recipe - -import ( - "fmt" -) - -// InvalidRecipeError hold the field to show the error message -type InvalidRecipeError struct { - Message string -} - -func (err InvalidRecipeError) Error() string { - return fmt.Sprintf("invalid recipe: \"%s\"", err.Message) -} diff --git a/recipe/generator_test.go b/recipe/generator_test.go index 5940b560f..6896adac4 100644 --- a/recipe/generator_test.go +++ b/recipe/generator_test.go @@ -14,6 +14,29 @@ import ( ) func TestFromTemplate(t *testing.T) { + t.Run("should throw error for invalid template path", func(t *testing.T) { + templatePath := "./testdata/template.yaml" + outputDir := "./test/temp" + bytes, err := ioutil.ReadFile("./testdata/generator/data-3.yaml") + if err != nil { + fmt.Println(fmt.Errorf("error reading data: %w", err)) + return + } + + var data []recipe.TemplateData + if err := yaml.Unmarshal(bytes, &data); err != nil { + fmt.Println(fmt.Errorf("error parsing data: %w", err)) + return + } + + err = recipe.FromTemplate(recipe.TemplateConfig{ + TemplateFilePath: templatePath, + OutputDirPath: outputDir, + Data: data, + }) + assert.Error(t, err) + }) + t.Run("should output recipe files using template to output directory", func(t *testing.T) { templatePath := "./testdata/generator/template.yaml" outputDir := "./testdata/generator/temp" diff --git a/registry/extractors_test.go b/registry/extractors_test.go index 16874b7ba..221dd0688 100644 --- a/registry/extractors_test.go +++ b/registry/extractors_test.go @@ -9,7 +9,7 @@ import ( "github.com/stretchr/testify/assert" ) -func TestFactoryGet(t *testing.T) { +func TestExtractorFactoryGet(t *testing.T) { t.Run("should return not found error if extractor does not exist", func(t *testing.T) { name := "wrong-name" @@ -39,7 +39,7 @@ func TestFactoryGet(t *testing.T) { }) } -func TestFactoryRegister(t *testing.T) { +func TestExtractorFactoryRegister(t *testing.T) { t.Run("should add extractor factory with given key", func(t *testing.T) { factory := registry.Extractors err := factory.Register("mock1", newExtractor(mocks.NewExtractor())) @@ -64,6 +64,40 @@ func TestFactoryRegister(t *testing.T) { } assert.Equal(t, mocks.NewExtractor(), mock2) // Same type assert.True(t, mocks.NewExtractor() != mock2) // Different instance + + err = factory.Register("mock1", newExtractor(mocks.NewExtractor())) //error for duplicate extractor + assert.Error(t, err) + }) +} + +func TestExtractorFactoryList(t *testing.T) { + t.Run("return list for a extractor factory", func(t *testing.T) { + factory := registry.NewExtractorFactory() + extr := mocks.NewExtractor() + mockInfo := plugins.Info{ + Description: "Mock Extractor 1", + } + extr.On("Info").Return(mockInfo, nil).Once() + defer extr.AssertExpectations(t) + err := factory.Register("mock1", newExtractor(extr)) + if err != nil { + t.Error(err.Error()) + } + list := factory.List() + assert.Equal(t, mockInfo, list["mock1"]) + }) +} + +func TestExtractorFactoryInfo(t *testing.T) { + t.Run("return error for a extractor not found", func(t *testing.T) { + factory := registry.NewExtractorFactory() + extr := mocks.NewExtractor() + err := factory.Register("mock1", newExtractor(extr)) + if err != nil { + t.Error(err.Error()) + } + _, err = factory.Info("mock") + assert.Equal(t, plugins.NotFoundError{Type: plugins.PluginTypeExtractor, Name: "mock"}, err) }) } diff --git a/registry/processors_test.go b/registry/processors_test.go new file mode 100644 index 000000000..002365a21 --- /dev/null +++ b/registry/processors_test.go @@ -0,0 +1,108 @@ +package registry_test + +import ( + "testing" + + "github.com/odpf/meteor/plugins" + "github.com/odpf/meteor/registry" + "github.com/odpf/meteor/test/mocks" + "github.com/stretchr/testify/assert" +) + +func TestProcessorFactoryGet(t *testing.T) { + t.Run("should return not found error if processor does not exist", func(t *testing.T) { + name := "wrong-name" + + factory := registry.Processors + if err := factory.Register("mock", newProcessor(mocks.NewProcessor())); err != nil { + t.Error(err.Error()) + } + _, err := factory.Get(name) + assert.Equal(t, plugins.NotFoundError{Type: "processor", Name: name}, err) + }) + + t.Run("should return a new instance of processor with given name", func(t *testing.T) { + name := "mock3" + + factory := registry.Processors + if err := factory.Register(name, newProcessor(mocks.NewProcessor())); err != nil { + t.Error(err.Error()) + } + + extr, err := factory.Get(name) + if err != nil { + t.Error(err.Error()) + } + + assert.Equal(t, mocks.NewProcessor(), extr) // Same type + assert.True(t, mocks.NewProcessor() != extr) // Different instance + }) +} + +func TestProcessorFactoryRegister(t *testing.T) { + t.Run("should add processor factory with given key", func(t *testing.T) { + factory := registry.Processors + err := factory.Register("mock1", newProcessor(mocks.NewProcessor())) + if err != nil { + t.Error(err.Error()) + } + err = factory.Register("mock2", newProcessor(mocks.NewProcessor())) + if err != nil { + t.Error(err.Error()) + } + + mock1, err := factory.Get("mock1") + if err != nil { + t.Error(err.Error()) + } + assert.Equal(t, mocks.NewProcessor(), mock1) // Same type + assert.True(t, mocks.NewProcessor() != mock1) // Different instance + + mock2, err := factory.Get("mock2") + if err != nil { + t.Error(err.Error()) + } + assert.Equal(t, mocks.NewProcessor(), mock2) // Same type + assert.True(t, mocks.NewProcessor() != mock2) // Different instance + + err = factory.Register("mock1", newProcessor(mocks.NewProcessor())) //error for duplicate processor + assert.Error(t, err) + }) +} + +func TestProcessorFactoryList(t *testing.T) { + t.Run("return list for a processor factory", func(t *testing.T) { + factory := registry.NewProcessorFactory() + extr := mocks.NewProcessor() + mockInfo := plugins.Info{ + Description: "Mock Processor 1", + } + extr.On("Info").Return(mockInfo, nil).Once() + defer extr.AssertExpectations(t) + err := factory.Register("mock1", newProcessor(extr)) + if err != nil { + t.Error(err.Error()) + } + list := factory.List() + assert.Equal(t, mockInfo, list["mock1"]) + }) +} + +func TestProcessorFactoryInfo(t *testing.T) { + t.Run("return error for a processor not found", func(t *testing.T) { + factory := registry.NewProcessorFactory() + extr := mocks.NewProcessor() + err := factory.Register("mock1", newProcessor(extr)) + if err != nil { + t.Error(err.Error()) + } + _, err = factory.Info("mock") + assert.Equal(t, plugins.NotFoundError{Type: plugins.PluginTypeProcessor, Name: "mock"}, err) + }) +} + +func newProcessor(extr plugins.Processor) func() plugins.Processor { + return func() plugins.Processor { + return extr + } +} diff --git a/registry/sinks_test.go b/registry/sinks_test.go new file mode 100644 index 000000000..5a613898c --- /dev/null +++ b/registry/sinks_test.go @@ -0,0 +1,108 @@ +package registry_test + +import ( + "testing" + + "github.com/odpf/meteor/plugins" + "github.com/odpf/meteor/registry" + "github.com/odpf/meteor/test/mocks" + "github.com/stretchr/testify/assert" +) + +func TestSinkFactoryGet(t *testing.T) { + t.Run("should return not found error if sink does not exist", func(t *testing.T) { + name := "wrong-name" + + factory := registry.Sinks + if err := factory.Register("mock", newSink(mocks.NewSink())); err != nil { + t.Error(err.Error()) + } + _, err := factory.Get(name) + assert.Equal(t, plugins.NotFoundError{Type: "sink", Name: name}, err) + }) + + t.Run("should return a new instance of sink with given name", func(t *testing.T) { + name := "mock3" + + factory := registry.Sinks + if err := factory.Register(name, newSink(mocks.NewSink())); err != nil { + t.Error(err.Error()) + } + + extr, err := factory.Get(name) + if err != nil { + t.Error(err.Error()) + } + + assert.Equal(t, mocks.NewSink(), extr) // Same type + assert.True(t, mocks.NewSink() != extr) // Different instance + }) +} + +func TestSinkFactoryRegister(t *testing.T) { + t.Run("should add sink factory with given key", func(t *testing.T) { + factory := registry.Sinks + err := factory.Register("mock1", newSink(mocks.NewSink())) + if err != nil { + t.Error(err.Error()) + } + err = factory.Register("mock2", newSink(mocks.NewSink())) + if err != nil { + t.Error(err.Error()) + } + + mock1, err := factory.Get("mock1") + if err != nil { + t.Error(err.Error()) + } + assert.Equal(t, mocks.NewSink(), mock1) // Same type + assert.True(t, mocks.NewSink() != mock1) // Different instance + + mock2, err := factory.Get("mock2") + if err != nil { + t.Error(err.Error()) + } + assert.Equal(t, mocks.NewSink(), mock2) // Same type + assert.True(t, mocks.NewSink() != mock2) // Different instance + + err = factory.Register("mock1", newSink(mocks.NewSink())) //error for duplicate sink + assert.Error(t, err) + }) +} + +func TestSinkFactoryList(t *testing.T) { + t.Run("return list for a sink factory", func(t *testing.T) { + factory := registry.NewSinkFactory() + extr := mocks.NewSink() + mockInfo := plugins.Info{ + Description: "Mock Sink 1", + } + extr.On("Info").Return(mockInfo, nil).Once() + defer extr.AssertExpectations(t) + err := factory.Register("mock1", newSink(extr)) + if err != nil { + t.Error(err.Error()) + } + list := factory.List() + assert.Equal(t, mockInfo, list["mock1"]) + }) +} + +func TestSinkFactoryInfo(t *testing.T) { + t.Run("return error for a sink not found", func(t *testing.T) { + factory := registry.NewSinkFactory() + extr := mocks.NewSink() + err := factory.Register("mock1", newSink(extr)) + if err != nil { + t.Error(err.Error()) + } + _, err = factory.Info("mock") + assert.Equal(t, plugins.NotFoundError{Type: plugins.PluginTypeSink, Name: "mock"}, err) + }) +} + +func newSink(extr plugins.Syncer) func() plugins.Syncer { + return func() plugins.Syncer { + return extr + } +}