Skip to content

Commit

Permalink
Remove the need for SQL-type backend connector in query/ingest proces…
Browse files Browse the repository at this point in the history
…sors and pipelines (#861)

Previously, the user always had to provide two backend connectors to the
query/ingest processor: Elastic connector and SQL-type connector (aka
ClickHouse/Hydrolix).

Remove that requirement - now only the Elastic connector is mandatory
(for Kibana internal queries) and the user doesn't need to have a SQL-type
backend connector in the pipeline (in that case they won't be able to
use it in the `indexes` configuration, thus only targeting queries/ingest to
Elastic).

However, this doesn't yet solve the trickier scenario of completely
removing the SQL-type backend connector from the entire configuration (it's
possible in transparent proxy mode, but not yet in dual pipeline mode).
Put another way: ClickHouse/Hydrolix is still required in the
`backendConnectors` section (except in no-op transparent proxy), but you
can now omit it from the `pipelines`/`processors` sections. The trickier
scenario is tested by the (currently skipped)
`TestQuesmaTransparentProxyWithoutNoopConfiguration`.
  • Loading branch information
avelanarius authored Oct 9, 2024
1 parent e3ca75a commit 792408f
Show file tree
Hide file tree
Showing 4 changed files with 187 additions and 20 deletions.
36 changes: 22 additions & 14 deletions quesma/quesma/config/config_v2.go
Original file line number Diff line number Diff line change
Expand Up @@ -313,6 +313,9 @@ func (c *QuesmaNewConfiguration) validatePipelines() error {
}

func (c *QuesmaNewConfiguration) validateFrontendConnector(fc FrontendConnector) error {
if len(fc.Name) == 0 {
return fmt.Errorf("frontend connector must have a non-empty name")
}
if fc.Type != ElasticsearchFrontendIngestConnectorName && fc.Type != ElasticsearchFrontendQueryConnectorName {
return fmt.Errorf(fmt.Sprintf("frontend connector's [%s] type not recognized, only `%s` and `%s` are supported at this moment", fc.Name, ElasticsearchFrontendIngestConnectorName, ElasticsearchFrontendQueryConnectorName))
}
Expand Down Expand Up @@ -344,6 +347,9 @@ func (c *QuesmaNewConfiguration) definedProcessorNames() []string {
}

func (c *QuesmaNewConfiguration) validateProcessor(p Processor) error {
if len(p.Name) == 0 {
return fmt.Errorf("processor must have a non-empty name")
}
if !slices.Contains(getAllowedProcessorTypes(), p.Type) {
return fmt.Errorf("processor type not recognized, only `quesma-v1-processor-noop`, `quesma-v1-processor-query` and `quesma-v1-processor-ingest` are supported at this moment")
}
Expand Down Expand Up @@ -374,6 +380,9 @@ func (c *QuesmaNewConfiguration) validateProcessor(p Processor) error {

func (c *QuesmaNewConfiguration) validatePipeline(pipeline Pipeline) error {
var _, errAcc error
if len(pipeline.Name) == 0 {
errAcc = multierror.Append(errAcc, fmt.Errorf("pipeline must have a non-empty name"))
}
if len(pipeline.FrontendConnectors) != 1 {
errAcc = multierror.Append(errAcc, fmt.Errorf("pipeline must have exactly one frontend connector"))
} else if len(pipeline.FrontendConnectors) == 0 {
Expand Down Expand Up @@ -411,23 +420,19 @@ func (c *QuesmaNewConfiguration) validatePipeline(pipeline Pipeline) error {
}
}
if onlyProcessorInPipeline.Type == QuesmaV1ProcessorQuery || onlyProcessorInPipeline.Type == QuesmaV1ProcessorIngest {
if len(pipeline.BackendConnectors) != 2 {
return multierror.Append(errAcc, fmt.Errorf(fmt.Sprintf("pipeline %s has a processor of type %s which requires two backend connectors", pipeline.Name, onlyProcessorInPipeline.Type)))
}
bConn1, bConn2 := c.getBackendConnectorByName(pipeline.BackendConnectors[0]), c.getBackendConnectorByName(pipeline.BackendConnectors[1])
if bConn1 == nil {
return multierror.Append(errAcc, fmt.Errorf(fmt.Sprintf("backend connector named %s referenced in %s not found in configuration", pipeline.BackendConnectors[0], pipeline.Name)))
}
if bConn2 == nil {
return multierror.Append(errAcc, fmt.Errorf(fmt.Sprintf("backend connector named %s referenced in %s not found in configuration", pipeline.BackendConnectors[1], pipeline.Name)))
foundElasticBackendConnector := false
for _, backendConnectorName := range pipeline.BackendConnectors {
backendConnector := c.getBackendConnectorByName(backendConnectorName)
if backendConnector == nil {
return multierror.Append(errAcc, fmt.Errorf(fmt.Sprintf("backend connector named %s referenced in %s not found in configuration", backendConnectorName, pipeline.Name)))
}
if backendConnector.Type == ElasticsearchBackendConnectorName {
foundElasticBackendConnector = true
}
}
backendConnTypes := []string{bConn1.Type, bConn2.Type}
if !slices.Contains(backendConnTypes, ElasticsearchBackendConnectorName) {
if !foundElasticBackendConnector {
return multierror.Append(errAcc, fmt.Errorf(fmt.Sprintf("pipeline %s has a processor of type %s which requires having one elasticsearch backend connector", pipeline.Name, onlyProcessorInPipeline.Type)))
}
if !slices.Contains(backendConnTypes, ClickHouseBackendConnectorName) && !slices.Contains(backendConnTypes, ClickHouseOSBackendConnectorName) && !slices.Contains(backendConnTypes, HydrolixBackendConnectorName) {
return multierror.Append(errAcc, fmt.Errorf(fmt.Sprintf("pipeline %s has a processor of type %s which requires having one Clickhouse-compatible backend connector", pipeline.Name, onlyProcessorInPipeline.Type)))
}
}
}

Expand Down Expand Up @@ -711,6 +716,9 @@ func (c *QuesmaNewConfiguration) getRelationalDBConf() (*RelationalDbConfigurati
func (c *QuesmaNewConfiguration) validateBackendConnectors() error {
elasticBackendConnectors, clickhouseBackendConnectors := 0, 0
for _, backendConn := range c.BackendConnectors {
if len(backendConn.Name) == 0 {
return fmt.Errorf("backend connector must have a non-empty name")
}
if backendConn.Type == ElasticsearchBackendConnectorName {
elasticBackendConnectors += 1
} else if backendConn.Type == ClickHouseBackendConnectorName || backendConn.Type == ClickHouseOSBackendConnectorName || backendConn.Type == HydrolixBackendConnectorName {
Expand Down
56 changes: 50 additions & 6 deletions quesma/quesma/config/config_v2_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -75,6 +75,29 @@ func TestQuesmaTransparentProxyConfiguration(t *testing.T) {
assert.Equal(t, false, legacyConf.CreateCommonTable)
}

// TestQuesmaTransparentProxyWithoutNoopConfiguration loads
// ./test_configs/quesma_as_transparent_proxy_without_noop.yml, validates it and
// translates it to the legacy configuration. It then checks that the result is
// not flagged as a transparent proxy, holds exactly two index configurations,
// and that both "siem" and "logs" use Elasticsearch as their only query and
// ingest target.
//
// The test is currently skipped ("not working yet").
func TestQuesmaTransparentProxyWithoutNoopConfiguration(t *testing.T) {
	t.Skip("not working yet")

	os.Setenv(configFileLocationEnvVar, "./test_configs/quesma_as_transparent_proxy_without_noop.yml")
	newConf := LoadV2Config()
	err := newConf.Validate()
	if err != nil {
		t.Fatalf("error validating config: %v", err)
	}

	legacy := newConf.TranslateToLegacyConfig()
	// Even though transparent proxy would work similarly, the user explicitly
	// requested two Quesma pipelines.
	assert.False(t, legacy.TransparentProxy)
	assert.Equal(t, 2, len(legacy.IndexConfig))

	// Same expectations for both indexes, checked in the order: siem, logs.
	elasticOnly := []string{ElasticsearchTarget}
	for _, indexName := range []string{"siem", "logs"} {
		indexConf := legacy.IndexConfig[indexName]
		assert.Equal(t, elasticOnly, indexConf.QueryTarget)
		assert.Equal(t, elasticOnly, indexConf.IngestTarget)
	}

	assert.Equal(t, true, legacy.EnableIngest)
	assert.Equal(t, false, legacy.CreateCommonTable)
}

func TestQuesmaAddingHydrolixTablesToExistingElasticsearch(t *testing.T) {
os.Setenv(configFileLocationEnvVar, "./test_configs/quesma_adding_two_hydrolix_tables.yaml")
cfg := LoadV2Config()
Expand All @@ -87,11 +110,32 @@ func TestQuesmaAddingHydrolixTablesToExistingElasticsearch(t *testing.T) {
siemIndexConf := legacyConf.IndexConfig["siem"]
logsIndexConf := legacyConf.IndexConfig["logs"]

assert.Equal(t, []string{"clickhouse"}, siemIndexConf.QueryTarget)
assert.Equal(t, []string{"elasticsearch"}, siemIndexConf.IngestTarget)
assert.Equal(t, []string{ClickhouseTarget}, siemIndexConf.QueryTarget)
assert.Equal(t, []string{ElasticsearchTarget}, siemIndexConf.IngestTarget)

assert.Equal(t, []string{ClickhouseTarget}, logsIndexConf.QueryTarget)
assert.Equal(t, []string{ElasticsearchTarget}, logsIndexConf.IngestTarget)
assert.Equal(t, true, legacyConf.EnableIngest)
assert.Equal(t, false, legacyConf.CreateCommonTable)
}

// TestIngestWithSingleConnector loads ./test_configs/ingest_with_single_connector.yaml,
// validates it and translates it to the legacy configuration. It then asserts
// that the result is not a transparent proxy, that it holds two index
// configurations ("siem" and "logs"), and checks their query/ingest targets
// along with the EnableIngest and CreateCommonTable flags.
func TestIngestWithSingleConnector(t *testing.T) {
os.Setenv(configFileLocationEnvVar, "./test_configs/ingest_with_single_connector.yaml")
cfg := LoadV2Config()
if err := cfg.Validate(); err != nil {
t.Fatalf("error validating config: %v", err)
}
legacyConf := cfg.TranslateToLegacyConfig()
assert.False(t, legacyConf.TransparentProxy)
assert.Equal(t, 2, len(legacyConf.IndexConfig))
siemIndexConf := legacyConf.IndexConfig["siem"]
logsIndexConf := legacyConf.IndexConfig["logs"]

assert.Equal(t, []string{ClickhouseTarget}, siemIndexConf.QueryTarget)
assert.Equal(t, []string{ElasticsearchTarget}, siemIndexConf.IngestTarget)

assert.Equal(t, []string{"clickhouse"}, logsIndexConf.QueryTarget)
assert.Equal(t, []string{"elasticsearch"}, logsIndexConf.IngestTarget)
assert.Equal(t, []string{ClickhouseTarget}, logsIndexConf.QueryTarget)
assert.Equal(t, []string{ElasticsearchTarget}, logsIndexConf.IngestTarget)
assert.Equal(t, true, legacyConf.EnableIngest)
assert.Equal(t, false, legacyConf.CreateCommonTable)
}
Expand All @@ -111,9 +155,9 @@ func TestQuesmaHydrolixQueryOnly(t *testing.T) {
logsIndexConf, ok := legacyConf.IndexConfig["logs"]
assert.True(t, ok)

assert.Equal(t, []string{"clickhouse"}, siemIndexConf.QueryTarget)
assert.Equal(t, []string{ClickhouseTarget}, siemIndexConf.QueryTarget)

assert.Equal(t, []string{"clickhouse"}, logsIndexConf.QueryTarget)
assert.Equal(t, []string{ClickhouseTarget}, logsIndexConf.QueryTarget)

assert.Equal(t, false, legacyConf.EnableIngest)
assert.Equal(t, false, legacyConf.IngestStatistics)
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,59 @@
# Similar to quesma_adding_two_hydrolix_tables.yaml,
# but the ingest processor has only a single backend connector.

logging:
level: info
frontendConnectors:
- name: elastic-ingest
type: elasticsearch-fe-ingest
config:
listenPort: 8080
- name: elastic-query
type: elasticsearch-fe-query
config:
listenPort: 8080
backendConnectors:
- name: my-minimal-elasticsearch
type: elasticsearch
config:
url: "http://elasticsearch:9200"
user: elastic
password: quesmaquesma
- name: my-hydrolix-instance
type: hydrolix
config:
url: "clickhouse://localhost:9000"
user: "u"
password: "p"
database: "d"
ingestStatistics: true
processors:
- name: my-query-processor
type: quesma-v1-processor-query
config:
indexes:
siem:
target: [my-hydrolix-instance]
logs:
target: [my-hydrolix-instance]
"*":
target: [ my-minimal-elasticsearch ]
- name: my-ingest-processor
type: quesma-v1-processor-ingest
config:
indexes:
siem:
target: [ my-minimal-elasticsearch ]
logs:
target: [ my-minimal-elasticsearch ]
"*":
target: [ my-minimal-elasticsearch ]
pipelines:
- name: my-elasticsearch-proxy-read
frontendConnectors: [ elastic-query ]
processors: [ my-query-processor ]
backendConnectors: [ my-minimal-elasticsearch, my-hydrolix-instance ]
- name: my-elasticsearch-proxy-write
frontendConnectors: [ elastic-ingest ]
processors: [ my-ingest-processor ]
backendConnectors: [ my-minimal-elasticsearch ] # my-hydrolix-instance is not needed here, as we don't ingest to it
Original file line number Diff line number Diff line change
@@ -0,0 +1,56 @@
# The recommended way to start Quesma in transparent proxy
# is to use the noop processor. However, the user can achieve
# the same thing by specifying query/ingest processors without
# routing anything to ClickHouse/Hydrolix - that should be supported,
# even if not recommended.

logging:
level: info
frontendConnectors:
- name: elastic-ingest
type: elasticsearch-fe-ingest
config:
listenPort: 8080
- name: elastic-query
type: elasticsearch-fe-query
config:
listenPort: 8080
backendConnectors:
- name: my-minimal-elasticsearch
type: elasticsearch
config:
url: "http://elasticsearch:9200"
user: elastic
password: quesmaquesma
# No ClickHouse/Hydrolix connector needed!
ingestStatistics: true
processors:
- name: my-query-processor
type: quesma-v1-processor-query
config:
indexes:
siem:
target: [ my-minimal-elasticsearch ]
logs:
target: [ my-minimal-elasticsearch ]
"*":
target: [ my-minimal-elasticsearch ]
- name: my-ingest-processor
type: quesma-v1-processor-ingest
config:
indexes:
siem:
target: [ my-minimal-elasticsearch ]
logs:
target: [ my-minimal-elasticsearch ]
"*":
target: [ my-minimal-elasticsearch ]
pipelines:
- name: my-elasticsearch-transparent-proxy-read
frontendConnectors: [ elastic-query ]
processors: [ my-query-processor ]
backendConnectors: [ my-minimal-elasticsearch ]
- name: my-elasticsearch-transparent-proxy-write
frontendConnectors: [ elastic-ingest ]
processors: [ my-ingest-processor ]
backendConnectors: [ my-minimal-elasticsearch ]

0 comments on commit 792408f

Please sign in to comment.