Skip to content

Commit fe700fe

Browse files
authored
[jaeger-v2] Refactor ElasticSearch/OpenSearch configurations to have more logical groupings (jaegertracing#6090)
## Which problem is this PR solving? - Resolves jaegertracing#6059 ## Description of the changes - Added more groupings for the following sub-configurations in the ElasticSearch/OpenSearch configurations - Sniffing - Authentication - Bulk Processing - The migration guide from v1 to v2 can be viewed [here](https://docs.google.com/document/d/1rabu8zvjoZeHx-HNqvK5kjsujMEBC7DkR2MYeOvB6HI/edit?tab=t.0#heading=h.abjgb642qlsg). ## How was this change tested? - CI ## Checklist - [x] I have read https://github.com/jaegertracing/jaeger/blob/master/CONTRIBUTING_GUIDELINES.md - [x] I have signed all commits - [x] I have added unit tests for the new functionality - [x] I have run lint and test steps successfully - for `jaeger`: `make lint test` - for `jaeger-ui`: `yarn lint` and `yarn test` --------- Signed-off-by: Mahad Zaryab <[email protected]>
1 parent a9f9fcb commit fe700fe

File tree

7 files changed

+526
-284
lines changed

7 files changed

+526
-284
lines changed

cmd/query/app/token_propagation_test.go

+1-1
Original file line numberDiff line numberDiff line change
@@ -76,7 +76,7 @@ func runQueryService(t *testing.T, esURL string) *Server {
7676
}))
7777
f.InitFromViper(v, flagsSvc.Logger)
7878
// set AllowTokenFromContext manually because we don't register the respective CLI flag from query svc
79-
f.Options.Primary.AllowTokenFromContext = true
79+
f.Options.Primary.Authentication.BearerTokenAuthentication.AllowFromContext = true
8080
require.NoError(t, f.Initialize(metrics.NullFactory, flagsSvc.Logger))
8181
defer f.Close()
8282

pkg/es/config/config.go

+152-71
Original file line numberDiff line numberDiff line change
@@ -39,15 +39,25 @@ const (
3939

4040
// IndexOptions describes the index format and rollover frequency
4141
type IndexOptions struct {
42-
Priority int64 `mapstructure:"priority"`
43-
DateLayout string `mapstructure:"date_layout"`
44-
Shards int64 `mapstructure:"shards"`
45-
Replicas int64 `mapstructure:"replicas"`
46-
RolloverFrequency string `mapstructure:"rollover_frequency"` // "hour" or "day"
42+
// Priority contains the priority of index template (ESv8 only).
43+
Priority int64 `mapstructure:"priority"`
44+
DateLayout string `mapstructure:"date_layout"`
45+
// Shards is the number of shards per index in Elasticsearch.
46+
Shards int64 `mapstructure:"shards"`
47+
// Replicas is the number of replicas per index in Elasticsearch.
48+
Replicas int64 `mapstructure:"replicas"`
49+
// RolloverFrequency contains the rollover frequency setting used to fetch
50+
// indices from elasticsearch.
51+
// Valid configuration options are: [hour, day].
52+
// This setting does not affect the index rotation and is simply used for
53+
// fetching indices.
54+
RolloverFrequency string `mapstructure:"rollover_frequency"`
4755
}
4856

4957
// Indices describes different configuration options for each index type
5058
type Indices struct {
59+
// IndexPrefix is an optional prefix to prepend to Jaeger indices.
60+
// For example, setting this field to "production" creates "production-jaeger-*".
5161
IndexPrefix IndexPrefix `mapstructure:"index_prefix"`
5262
Spans IndexOptions `mapstructure:"spans"`
5363
Services IndexOptions `mapstructure:"services"`
@@ -70,34 +80,58 @@ func (p IndexPrefix) Apply(indexName string) string {
7080

7181
// Configuration describes the configuration properties needed to connect to an ElasticSearch cluster
7282
type Configuration struct {
73-
Servers []string `mapstructure:"server_urls" valid:"required,url"`
74-
RemoteReadClusters []string `mapstructure:"remote_read_clusters"`
75-
Username string `mapstructure:"username"`
76-
Password string `mapstructure:"password" json:"-"`
77-
TokenFilePath string `mapstructure:"token_file"`
78-
PasswordFilePath string `mapstructure:"password_file"`
79-
AllowTokenFromContext bool `mapstructure:"-"`
80-
Sniffer bool `mapstructure:"sniffer"` // https://github.com/olivere/elastic/wiki/Sniffing
81-
SnifferTLSEnabled bool `mapstructure:"sniffer_tls_enabled"`
82-
MaxDocCount int `mapstructure:"-"` // Defines maximum number of results to fetch from storage per query
83-
MaxSpanAge time.Duration `mapstructure:"-"` // configures the maximum lookback on span reads
84-
Timeout time.Duration `mapstructure:"-"`
85-
BulkSize int `mapstructure:"-"`
86-
BulkWorkers int `mapstructure:"-"`
87-
BulkActions int `mapstructure:"-"`
88-
BulkFlushInterval time.Duration `mapstructure:"-"`
89-
Indices Indices `mapstructure:"indices"`
90-
ServiceCacheTTL time.Duration `mapstructure:"service_cache_ttl"`
91-
AdaptiveSamplingLookback time.Duration `mapstructure:"-"`
92-
Tags TagsAsFields `mapstructure:"tags_as_fields"`
93-
Enabled bool `mapstructure:"-"`
94-
TLS configtls.ClientConfig `mapstructure:"tls"`
95-
UseReadWriteAliases bool `mapstructure:"use_aliases"`
96-
CreateIndexTemplates bool `mapstructure:"create_mappings"`
97-
UseILM bool `mapstructure:"use_ilm"`
98-
Version uint `mapstructure:"version"`
99-
LogLevel string `mapstructure:"log_level"`
100-
SendGetBodyAs string `mapstructure:"send_get_body_as"`
83+
// ---- connection related configs ----
84+
// Servers is a list of Elasticsearch servers. The strings must must contain full URLs
85+
// (i.e. http://localhost:9200).
86+
Servers []string `mapstructure:"server_urls" valid:"required,url"`
87+
// RemoteReadClusters is a list of Elasticsearch remote cluster names for cross-cluster
88+
// querying.
89+
RemoteReadClusters []string `mapstructure:"remote_read_clusters"`
90+
Authentication Authentication `mapstructure:"auth"`
91+
// TLS contains the TLS configuration for the connection to the ElasticSearch clusters.
92+
TLS configtls.ClientConfig `mapstructure:"tls"`
93+
Sniffing Sniffing `mapstructure:"sniffing"`
94+
// SendGetBodyAs is the HTTP verb to use for requests that contain a body.
95+
SendGetBodyAs string `mapstructure:"send_get_body_as"`
96+
// QueryTimeout contains the timeout used for queries. A timeout of zero means no timeout.
97+
QueryTimeout time.Duration `mapstructure:"query_timeout"`
98+
99+
// ---- elasticsearch client related configs ----
100+
BulkProcessing BulkProcessing `mapstructure:"bulk_processing"`
101+
// Version contains the major Elasticsearch version. If this field is not specified,
102+
// the value will be auto-detected from Elasticsearch.
103+
Version uint `mapstructure:"version"`
104+
// LogLevel contains the Elasticsearch client log-level. Valid values for this field
105+
// are: [debug, info, error]
106+
LogLevel string `mapstructure:"log_level"`
107+
108+
// ---- index related configs ----
109+
Indices Indices `mapstructure:"indices"`
110+
// UseReadWriteAliases, if set to true, will use read and write aliases for indices.
111+
// Use this option with Elasticsearch rollover API. It requires an external component
112+
// to create aliases before startup and then performing its management.
113+
UseReadWriteAliases bool `mapstructure:"use_aliases"`
114+
// CreateIndexTemplates, if set to true, creates index templates at application startup.
115+
// This configuration should be set to false when templates are installed manually.
116+
CreateIndexTemplates bool `mapstructure:"create_mappings"`
117+
// Option to enable Index Lifecycle Management (ILM) for Jaeger span and service indices.
118+
// Read more about ILM at
119+
// https://www.jaegertracing.io/docs/deployment/#enabling-ilm-support
120+
UseILM bool `mapstructure:"use_ilm"`
121+
122+
// ---- jaeger-specific configs ----
123+
// MaxDocCount Defines maximum number of results to fetch from storage per query.
124+
MaxDocCount int `mapstructure:"max_doc_count"`
125+
// MaxSpanAge configures the maximum lookback on span reads.
126+
MaxSpanAge time.Duration `mapstructure:"max_span_age"`
127+
// ServiceCacheTTL contains the TTL for the cache of known service names.
128+
ServiceCacheTTL time.Duration `mapstructure:"service_cache_ttl"`
129+
// AdaptiveSamplingLookback contains the duration to look back for the
130+
// latest adaptive sampling probabilities.
131+
AdaptiveSamplingLookback time.Duration `mapstructure:"adaptive_sampling_lookback"`
132+
Tags TagsAsFields `mapstructure:"tags_as_fields"`
133+
// Enabled, if set to true, enables the namespace for storage pointed to by this configuration.
134+
Enabled bool `mapstructure:"-"`
101135
}
102136

103137
// TagsAsFields holds configuration for tag schema.
@@ -114,6 +148,53 @@ type TagsAsFields struct {
114148
Include string `mapstructure:"include"`
115149
}
116150

151+
// Sniffing sets the sniffing configuration for the ElasticSearch client, which is the process
152+
// of finding all the nodes of your cluster. Read more about sniffing at
153+
// https://github.com/olivere/elastic/wiki/Sniffing.
154+
type Sniffing struct {
155+
// Enabled, if set to true, enables sniffing for the ElasticSearch client.
156+
Enabled bool `mapstructure:"enabled"`
157+
// UseHTTPS, if set to true, sets the HTTP scheme to HTTPS when performing sniffing.
158+
UseHTTPS bool `mapstructure:"use_https"`
159+
}
160+
161+
type BulkProcessing struct {
162+
// MaxBytes, contains the number of bytes which specifies when to flush.
163+
MaxBytes int `mapstructure:"max_bytes"`
164+
// MaxActions contain the number of added actions which specifies when to flush.
165+
MaxActions int `mapstructure:"max_actions"`
166+
// FlushInterval is the interval at the end of which a flush occurs.
167+
FlushInterval time.Duration `mapstructure:"flush_interval"`
168+
// Workers contains the number of concurrent workers allowed to be executed.
169+
Workers int `mapstructure:"workers"`
170+
}
171+
172+
type Authentication struct {
173+
BasicAuthentication BasicAuthentication `mapstructure:"basic"`
174+
BearerTokenAuthentication BearerTokenAuthentication `mapstructure:"bearer_token"`
175+
}
176+
177+
type BasicAuthentication struct {
178+
// Username contains the username required to connect to Elasticsearch.
179+
Username string `mapstructure:"username"`
180+
// Password contains The password required by Elasticsearch
181+
Password string `mapstructure:"password" json:"-"`
182+
// PasswordFilePath contains the path to a file containing password.
183+
// This file is watched for changes.
184+
PasswordFilePath string `mapstructure:"password_file"`
185+
}
186+
187+
// BearerTokenAuthentication contains the configuration for attaching bearer tokens
188+
// when making HTTP requests. Note that TokenFilePath and AllowTokenFromContext
189+
// should not both be enabled. If both TokenFilePath and AllowTokenFromContext are set,
190+
// the TokenFilePath will be ignored.
191+
type BearerTokenAuthentication struct {
192+
// FilePath contains the path to a file containing a bearer token.
193+
FilePath string `mapstructure:"file_path"`
194+
// AllowTokenFromContext, if set to true, enables reading bearer token from the context.
195+
AllowFromContext bool `mapstructure:"from_context"`
196+
}
197+
117198
// NewClient creates a new ElasticSearch client
118199
func NewClient(c *Configuration, logger *zap.Logger, metricsFactory metrics.Factory) (es.Client, error) {
119200
if len(c.Servers) < 1 {
@@ -171,10 +252,10 @@ func NewClient(c *Configuration, logger *zap.Logger, metricsFactory metrics.Fact
171252
zap.Any("response", response))
172253
}
173254
}).
174-
BulkSize(c.BulkSize).
175-
Workers(c.BulkWorkers).
176-
BulkActions(c.BulkActions).
177-
FlushInterval(c.BulkFlushInterval).
255+
BulkSize(c.BulkProcessing.MaxBytes).
256+
Workers(c.BulkProcessing.Workers).
257+
BulkActions(c.BulkProcessing.MaxActions).
258+
FlushInterval(c.BulkProcessing.FlushInterval).
178259
Do(context.Background())
179260
if err != nil {
180261
return nil, err
@@ -220,9 +301,9 @@ func NewClient(c *Configuration, logger *zap.Logger, metricsFactory metrics.Fact
220301
func newElasticsearchV8(c *Configuration, logger *zap.Logger) (*esV8.Client, error) {
221302
var options esV8.Config
222303
options.Addresses = c.Servers
223-
options.Username = c.Username
224-
options.Password = c.Password
225-
options.DiscoverNodesOnStart = c.Sniffer
304+
options.Username = c.Authentication.BasicAuthentication.Username
305+
options.Password = c.Authentication.BasicAuthentication.Password
306+
options.DiscoverNodesOnStart = c.Sniffing.Enabled
226307
transport, err := GetHTTPRoundTripper(c, logger)
227308
if err != nil {
228309
return nil, err
@@ -258,14 +339,14 @@ func (c *Configuration) ApplyDefaults(source *Configuration) {
258339
if len(c.RemoteReadClusters) == 0 {
259340
c.RemoteReadClusters = source.RemoteReadClusters
260341
}
261-
if c.Username == "" {
262-
c.Username = source.Username
342+
if c.Authentication.BasicAuthentication.Username == "" {
343+
c.Authentication.BasicAuthentication.Username = source.Authentication.BasicAuthentication.Username
263344
}
264-
if c.Password == "" {
265-
c.Password = source.Password
345+
if c.Authentication.BasicAuthentication.Password == "" {
346+
c.Authentication.BasicAuthentication.Password = source.Authentication.BasicAuthentication.Password
266347
}
267-
if !c.Sniffer {
268-
c.Sniffer = source.Sniffer
348+
if !c.Sniffing.Enabled {
349+
c.Sniffing.Enabled = source.Sniffing.Enabled
269350
}
270351
if c.MaxSpanAge == 0 {
271352
c.MaxSpanAge = source.MaxSpanAge
@@ -281,20 +362,20 @@ func (c *Configuration) ApplyDefaults(source *Configuration) {
281362
setDefaultIndexOptions(&c.Indices.Services, &source.Indices.Services)
282363
setDefaultIndexOptions(&c.Indices.Dependencies, &source.Indices.Dependencies)
283364

284-
if c.BulkSize == 0 {
285-
c.BulkSize = source.BulkSize
365+
if c.BulkProcessing.MaxBytes == 0 {
366+
c.BulkProcessing.MaxBytes = source.BulkProcessing.MaxBytes
286367
}
287-
if c.BulkWorkers == 0 {
288-
c.BulkWorkers = source.BulkWorkers
368+
if c.BulkProcessing.Workers == 0 {
369+
c.BulkProcessing.Workers = source.BulkProcessing.Workers
289370
}
290-
if c.BulkActions == 0 {
291-
c.BulkActions = source.BulkActions
371+
if c.BulkProcessing.MaxActions == 0 {
372+
c.BulkProcessing.MaxActions = source.BulkProcessing.MaxActions
292373
}
293-
if c.BulkFlushInterval == 0 {
294-
c.BulkFlushInterval = source.BulkFlushInterval
374+
if c.BulkProcessing.FlushInterval == 0 {
375+
c.BulkProcessing.FlushInterval = source.BulkProcessing.FlushInterval
295376
}
296-
if !c.SnifferTLSEnabled {
297-
c.SnifferTLSEnabled = source.SnifferTLSEnabled
377+
if !c.Sniffing.UseHTTPS {
378+
c.Sniffing.UseHTTPS = source.Sniffing.UseHTTPS
298379
}
299380
if !c.Tags.AllAsFields {
300381
c.Tags.AllAsFields = source.Tags.AllAsFields
@@ -361,31 +442,31 @@ func (c *Configuration) TagKeysAsFields() ([]string, error) {
361442
// getConfigOptions wraps the configs to feed to the ElasticSearch client init
362443
func (c *Configuration) getConfigOptions(logger *zap.Logger) ([]elastic.ClientOptionFunc, error) {
363444
options := []elastic.ClientOptionFunc{
364-
elastic.SetURL(c.Servers...), elastic.SetSniff(c.Sniffer),
445+
elastic.SetURL(c.Servers...), elastic.SetSniff(c.Sniffing.Enabled),
365446
// Disable health check when token from context is allowed, this is because at this time
366-
// we don' have a valid token to do the check ad if we don't disable the check the service that
447+
// we don'r have a valid token to do the check ad if we don't disable the check the service that
367448
// uses this won't start.
368-
elastic.SetHealthcheck(!c.AllowTokenFromContext),
449+
elastic.SetHealthcheck(!c.Authentication.BearerTokenAuthentication.AllowFromContext),
369450
}
370-
if c.SnifferTLSEnabled {
451+
if c.Sniffing.UseHTTPS {
371452
options = append(options, elastic.SetScheme("https"))
372453
}
373454
httpClient := &http.Client{
374-
Timeout: c.Timeout,
455+
Timeout: c.QueryTimeout,
375456
}
376457
options = append(options, elastic.SetHttpClient(httpClient))
377458

378-
if c.Password != "" && c.PasswordFilePath != "" {
459+
if c.Authentication.BasicAuthentication.Password != "" && c.Authentication.BasicAuthentication.PasswordFilePath != "" {
379460
return nil, fmt.Errorf("both Password and PasswordFilePath are set")
380461
}
381-
if c.PasswordFilePath != "" {
382-
passwordFromFile, err := loadTokenFromFile(c.PasswordFilePath)
462+
if c.Authentication.BasicAuthentication.PasswordFilePath != "" {
463+
passwordFromFile, err := loadTokenFromFile(c.Authentication.BasicAuthentication.PasswordFilePath)
383464
if err != nil {
384465
return nil, fmt.Errorf("failed to load password from file: %w", err)
385466
}
386-
c.Password = passwordFromFile
467+
c.Authentication.BasicAuthentication.Password = passwordFromFile
387468
}
388-
options = append(options, elastic.SetBasicAuth(c.Username, c.Password))
469+
options = append(options, elastic.SetBasicAuth(c.Authentication.BasicAuthentication.Username, c.Authentication.BasicAuthentication.Password))
389470

390471
if c.SendGetBodyAs != "" {
391472
options = append(options, elastic.SetSendGetBodyAs(c.SendGetBodyAs))
@@ -465,20 +546,20 @@ func GetHTTPRoundTripper(c *Configuration, logger *zap.Logger) (http.RoundTrippe
465546
}
466547

467548
token := ""
468-
if c.TokenFilePath != "" {
469-
if c.AllowTokenFromContext {
549+
if c.Authentication.BearerTokenAuthentication.FilePath != "" {
550+
if c.Authentication.BearerTokenAuthentication.AllowFromContext {
470551
logger.Warn("Token file and token propagation are both enabled, token from file won't be used")
471552
}
472-
tokenFromFile, err := loadTokenFromFile(c.TokenFilePath)
553+
tokenFromFile, err := loadTokenFromFile(c.Authentication.BearerTokenAuthentication.FilePath)
473554
if err != nil {
474555
return nil, err
475556
}
476557
token = tokenFromFile
477558
}
478-
if token != "" || c.AllowTokenFromContext {
559+
if token != "" || c.Authentication.BearerTokenAuthentication.AllowFromContext {
479560
transport = bearertoken.RoundTripper{
480561
Transport: httpTransport,
481-
OverrideFromCtx: c.AllowTokenFromContext,
562+
OverrideFromCtx: c.Authentication.BearerTokenAuthentication.AllowFromContext,
482563
StaticToken: token,
483564
}
484565
}

0 commit comments

Comments
 (0)