diff --git a/quesma/quesma/config/config_v2.go b/quesma/quesma/config/config_v2.go index 4a9be930c..9bbbb345a 100644 --- a/quesma/quesma/config/config_v2.go +++ b/quesma/quesma/config/config_v2.go @@ -313,6 +313,9 @@ func (c *QuesmaNewConfiguration) validatePipelines() error { } func (c *QuesmaNewConfiguration) validateFrontendConnector(fc FrontendConnector) error { + if len(fc.Name) == 0 { + return fmt.Errorf("frontend connector must have a non-empty name") + } if fc.Type != ElasticsearchFrontendIngestConnectorName && fc.Type != ElasticsearchFrontendQueryConnectorName { return fmt.Errorf(fmt.Sprintf("frontend connector's [%s] type not recognized, only `%s` and `%s` are supported at this moment", fc.Name, ElasticsearchFrontendIngestConnectorName, ElasticsearchFrontendQueryConnectorName)) } @@ -344,6 +347,9 @@ func (c *QuesmaNewConfiguration) definedProcessorNames() []string { } func (c *QuesmaNewConfiguration) validateProcessor(p Processor) error { + if len(p.Name) == 0 { + return fmt.Errorf("processor must have a non-empty name") + } if !slices.Contains(getAllowedProcessorTypes(), p.Type) { return fmt.Errorf("processor type not recognized, only `quesma-v1-processor-noop`, `quesma-v1-processor-query` and `quesma-v1-processor-ingest` are supported at this moment") } @@ -374,6 +380,9 @@ func (c *QuesmaNewConfiguration) validateProcessor(p Processor) error { func (c *QuesmaNewConfiguration) validatePipeline(pipeline Pipeline) error { var _, errAcc error + if len(pipeline.Name) == 0 { + errAcc = multierror.Append(errAcc, fmt.Errorf("pipeline must have a non-empty name")) + } if len(pipeline.FrontendConnectors) != 1 { errAcc = multierror.Append(errAcc, fmt.Errorf("pipeline must have exactly one frontend connector")) } else if len(pipeline.FrontendConnectors) == 0 { @@ -411,23 +420,19 @@ func (c *QuesmaNewConfiguration) validatePipeline(pipeline Pipeline) error { } } if onlyProcessorInPipeline.Type == QuesmaV1ProcessorQuery || onlyProcessorInPipeline.Type == QuesmaV1ProcessorIngest { - if len(pipeline.BackendConnectors) != 2 { - return multierror.Append(errAcc, fmt.Errorf(fmt.Sprintf("pipeline %s has a processor of type %s which requires two backend connectors", pipeline.Name, onlyProcessorInPipeline.Type))) - } - bConn1, bConn2 := c.getBackendConnectorByName(pipeline.BackendConnectors[0]), c.getBackendConnectorByName(pipeline.BackendConnectors[1]) - if bConn1 == nil { - return multierror.Append(errAcc, fmt.Errorf(fmt.Sprintf("backend connector named %s referenced in %s not found in configuration", pipeline.BackendConnectors[0], pipeline.Name))) - } - if bConn2 == nil { - return multierror.Append(errAcc, fmt.Errorf(fmt.Sprintf("backend connector named %s referenced in %s not found in configuration", pipeline.BackendConnectors[1], pipeline.Name))) + foundElasticBackendConnector := false + for _, backendConnectorName := range pipeline.BackendConnectors { + backendConnector := c.getBackendConnectorByName(backendConnectorName) + if backendConnector == nil { + return multierror.Append(errAcc, fmt.Errorf(fmt.Sprintf("backend connector named %s referenced in %s not found in configuration", backendConnectorName, pipeline.Name))) + } + if backendConnector.Type == ElasticsearchBackendConnectorName { + foundElasticBackendConnector = true + } } - backendConnTypes := []string{bConn1.Type, bConn2.Type} - if !slices.Contains(backendConnTypes, ElasticsearchBackendConnectorName) { + if !foundElasticBackendConnector { return multierror.Append(errAcc, fmt.Errorf(fmt.Sprintf("pipeline %s has a processor of type %s which requires having one elasticsearch backend connector", pipeline.Name, onlyProcessorInPipeline.Type))) } - if !slices.Contains(backendConnTypes, ClickHouseBackendConnectorName) && !slices.Contains(backendConnTypes, ClickHouseOSBackendConnectorName) && !slices.Contains(backendConnTypes, HydrolixBackendConnectorName) { - return multierror.Append(errAcc, fmt.Errorf(fmt.Sprintf("pipeline %s has a processor of type %s which requires having one Clickhouse-compatible backend connector", pipeline.Name, onlyProcessorInPipeline.Type))) - } } } @@ -711,6 +716,9 @@ func (c *QuesmaNewConfiguration) getRelationalDBConf() (*RelationalDbConfigurati func (c *QuesmaNewConfiguration) validateBackendConnectors() error { elasticBackendConnectors, clickhouseBackendConnectors := 0, 0 for _, backendConn := range c.BackendConnectors { + if len(backendConn.Name) == 0 { + return fmt.Errorf("backend connector must have a non-empty name") + } if backendConn.Type == ElasticsearchBackendConnectorName { elasticBackendConnectors += 1 } else if backendConn.Type == ClickHouseBackendConnectorName || backendConn.Type == ClickHouseOSBackendConnectorName || backendConn.Type == HydrolixBackendConnectorName { diff --git a/quesma/quesma/config/config_v2_test.go b/quesma/quesma/config/config_v2_test.go index 863f0f281..defa0ff04 100644 --- a/quesma/quesma/config/config_v2_test.go +++ b/quesma/quesma/config/config_v2_test.go @@ -75,6 +75,29 @@ func TestQuesmaTransparentProxyConfiguration(t *testing.T) { assert.Equal(t, false, legacyConf.CreateCommonTable) } +func TestQuesmaTransparentProxyWithoutNoopConfiguration(t *testing.T) { + t.Skip("not working yet") + + os.Setenv(configFileLocationEnvVar, "./test_configs/quesma_as_transparent_proxy_without_noop.yml") + cfg := LoadV2Config() + if err := cfg.Validate(); err != nil { + t.Fatalf("error validating config: %v", err) + } + legacyConf := cfg.TranslateToLegacyConfig() + assert.False(t, legacyConf.TransparentProxy) // even though transparent proxy would work similarly, the user explicitly requested two Quesma pipelines + assert.Equal(t, 2, len(legacyConf.IndexConfig)) + siemIndexConf := legacyConf.IndexConfig["siem"] + logsIndexConf := legacyConf.IndexConfig["logs"] + + assert.Equal(t, []string{ElasticsearchTarget}, siemIndexConf.QueryTarget) + assert.Equal(t, []string{ElasticsearchTarget}, siemIndexConf.IngestTarget) + + assert.Equal(t, []string{ElasticsearchTarget}, logsIndexConf.QueryTarget) + assert.Equal(t, []string{ElasticsearchTarget}, logsIndexConf.IngestTarget) + assert.Equal(t, true, legacyConf.EnableIngest) + assert.Equal(t, false, legacyConf.CreateCommonTable) +} + func TestQuesmaAddingHydrolixTablesToExistingElasticsearch(t *testing.T) { os.Setenv(configFileLocationEnvVar, "./test_configs/quesma_adding_two_hydrolix_tables.yaml") cfg := LoadV2Config() @@ -87,11 +110,32 @@ func TestQuesmaAddingHydrolixTablesToExistingElasticsearch(t *testing.T) { siemIndexConf := legacyConf.IndexConfig["siem"] logsIndexConf := legacyConf.IndexConfig["logs"] - assert.Equal(t, []string{"clickhouse"}, siemIndexConf.QueryTarget) - assert.Equal(t, []string{"elasticsearch"}, siemIndexConf.IngestTarget) + assert.Equal(t, []string{ClickhouseTarget}, siemIndexConf.QueryTarget) + assert.Equal(t, []string{ElasticsearchTarget}, siemIndexConf.IngestTarget) + + assert.Equal(t, []string{ClickhouseTarget}, logsIndexConf.QueryTarget) + assert.Equal(t, []string{ElasticsearchTarget}, logsIndexConf.IngestTarget) + assert.Equal(t, true, legacyConf.EnableIngest) + assert.Equal(t, false, legacyConf.CreateCommonTable) +} + +func TestIngestWithSingleConnector(t *testing.T) { + os.Setenv(configFileLocationEnvVar, "./test_configs/ingest_with_single_connector.yaml") + cfg := LoadV2Config() + if err := cfg.Validate(); err != nil { + t.Fatalf("error validating config: %v", err) + } + legacyConf := cfg.TranslateToLegacyConfig() + assert.False(t, legacyConf.TransparentProxy) + assert.Equal(t, 2, len(legacyConf.IndexConfig)) + siemIndexConf := legacyConf.IndexConfig["siem"] + logsIndexConf := legacyConf.IndexConfig["logs"] + + assert.Equal(t, []string{ClickhouseTarget}, siemIndexConf.QueryTarget) + assert.Equal(t, []string{ElasticsearchTarget}, siemIndexConf.IngestTarget) - assert.Equal(t, []string{"clickhouse"}, logsIndexConf.QueryTarget) - assert.Equal(t, []string{"elasticsearch"}, logsIndexConf.IngestTarget) + assert.Equal(t, []string{ClickhouseTarget}, logsIndexConf.QueryTarget) + assert.Equal(t, []string{ElasticsearchTarget}, logsIndexConf.IngestTarget) assert.Equal(t, true, legacyConf.EnableIngest) assert.Equal(t, false, legacyConf.CreateCommonTable) } @@ -111,9 +155,9 @@ func TestQuesmaHydrolixQueryOnly(t *testing.T) { logsIndexConf, ok := legacyConf.IndexConfig["logs"] assert.True(t, ok) - assert.Equal(t, []string{"clickhouse"}, siemIndexConf.QueryTarget) + assert.Equal(t, []string{ClickhouseTarget}, siemIndexConf.QueryTarget) - assert.Equal(t, []string{"clickhouse"}, logsIndexConf.QueryTarget) + assert.Equal(t, []string{ClickhouseTarget}, logsIndexConf.QueryTarget) assert.Equal(t, false, legacyConf.EnableIngest) assert.Equal(t, false, legacyConf.IngestStatistics) diff --git a/quesma/quesma/config/test_configs/ingest_with_single_connector.yaml b/quesma/quesma/config/test_configs/ingest_with_single_connector.yaml new file mode 100644 index 000000000..72a4178e1 --- /dev/null +++ b/quesma/quesma/config/test_configs/ingest_with_single_connector.yaml @@ -0,0 +1,59 @@ +# Similar to quesma_adding_two_hydrolix_tables.yaml, +# but the ingest processor has only a single backend connector. + +logging: + level: info +frontendConnectors: + - name: elastic-ingest + type: elasticsearch-fe-ingest + config: + listenPort: 8080 + - name: elastic-query + type: elasticsearch-fe-query + config: + listenPort: 8080 +backendConnectors: + - name: my-minimal-elasticsearch + type: elasticsearch + config: + url: "http://elasticsearch:9200" + user: elastic + password: quesmaquesma + - name: my-hydrolix-instance + type: hydrolix + config: + url: "clickhouse://localhost:9000" + user: "u" + password: "p" + database: "d" +ingestStatistics: true +processors: + - name: my-query-processor + type: quesma-v1-processor-query + config: + indexes: + siem: + target: [my-hydrolix-instance] + logs: + target: [my-hydrolix-instance] + "*": + target: [ my-minimal-elasticsearch ] + - name: my-ingest-processor + type: quesma-v1-processor-ingest + config: + indexes: + siem: + target: [ my-minimal-elasticsearch ] + logs: + target: [ my-minimal-elasticsearch ] + "*": + target: [ my-minimal-elasticsearch ] +pipelines: + - name: my-elasticsearch-proxy-read + frontendConnectors: [ elastic-query ] + processors: [ my-query-processor ] + backendConnectors: [ my-minimal-elasticsearch, my-hydrolix-instance ] + - name: my-elasticsearch-proxy-write + frontendConnectors: [ elastic-ingest ] + processors: [ my-ingest-processor ] + backendConnectors: [ my-minimal-elasticsearch ] # my-hydrolix-instance is not needed here, as we don't ingest to it diff --git a/quesma/quesma/config/test_configs/quesma_as_transparent_proxy_without_noop.yml b/quesma/quesma/config/test_configs/quesma_as_transparent_proxy_without_noop.yml new file mode 100644 index 000000000..b58d48f1b --- /dev/null +++ b/quesma/quesma/config/test_configs/quesma_as_transparent_proxy_without_noop.yml @@ -0,0 +1,56 @@ +# The recommended way to start Quesma in transparent proxy +# is to use the noop processor. However, the user can achieve +# the same thing by specifying query/ingest processors without +# routing anything to ClickHouse/Hydrolix - that should be supported, +# even if not recommended. + +logging: + level: info +frontendConnectors: + - name: elastic-ingest + type: elasticsearch-fe-ingest + config: + listenPort: 8080 + - name: elastic-query + type: elasticsearch-fe-query + config: + listenPort: 8080 +backendConnectors: + - name: my-minimal-elasticsearch + type: elasticsearch + config: + url: "http://elasticsearch:9200" + user: elastic + password: quesmaquesma + # No ClickHouse, Hydrolix connector needed! +ingestStatistics: true +processors: + - name: my-query-processor + type: quesma-v1-processor-query + config: + indexes: + siem: + target: [ my-minimal-elasticsearch ] + logs: + target: [ my-minimal-elasticsearch ] + "*": + target: [ my-minimal-elasticsearch ] + - name: my-ingest-processor + type: quesma-v1-processor-ingest + config: + indexes: + siem: + target: [ my-minimal-elasticsearch ] + logs: + target: [ my-minimal-elasticsearch ] + "*": + target: [ my-minimal-elasticsearch ] +pipelines: + - name: my-elasticsearch-transparent-proxy-read + frontendConnectors: [ elastic-query ] + processors: [ my-query-processor ] + backendConnectors: [ my-minimal-elasticsearch ] + - name: my-elasticsearch-transparent-proxy-write + frontendConnectors: [ elastic-ingest ] + processors: [ my-ingest-processor ] + backendConnectors: [ my-minimal-elasticsearch ]