-
Notifications
You must be signed in to change notification settings - Fork 26
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
New integration receiver and processor #96
base: main
Are you sure you want to change the base?
Conversation
This looks really promising! I think having the templates as a receiver solves two main issues compared to the "Converter" approach:
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Thanks for adding this! I wanted to check if a receiverscreator would also work, template config:
extensions:
host_observer:
receivers:
receiver_creator/2:
# Name of the extensions to watch for endpoints to start and stop.
watch_observers: [host_observer]
receivers:
httpcheck/on_host:
# If this rule matches an instance of this receiver will be started.
rule: type == "port" && port == 8080
resource_attributes:
service.name: redis_on_host
processors:
attributes/example:
actions:
- key: account_id
value: 2245
action: insert
pipelines:
extensions: host_observer
metrics:
receiver: ""
processors:
- attributes/example
For the previous configuration to work we would need to add support for extensions in the configuration. I think that supporting extensions could be added in the future, we can focus on templates with just plain receivers + processors for the moment.
@rogercoll opening for review, feel free to involve more people from your team. From the previous version I have fixed the issue with detection of missing variables. They were being ignored so resolution wouldn't fail for parts of the template that were not being used. Now template resolution is done in two steps:
This approach might be seen as a bit inefficient, because it unmarshals twice, but in principle it is not in the hot path, and it could be optimized while keeping the public interface. For optimizing, if required at some point, I think we could filter while resolving, in a single step, but we'd need some kind of custom decoder or resolver. A possible criticism to this filtering during resolution is that these components need to be aware of the structure of the configuration, but they are definitions specific to these components, that mimic OTel collector configuration for convenience, but doesn't need to be the same. With these changes I also took the opportunity to remove some code duplication between the receiver and the processor, and I have also added some tests for template resolution. More testing will be needed, but we could maybe add it as follow up. |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
The code looks good to me, just added some minors comments.
Do we plan to add tests and components documentation in a follow-up PR? It would be the best to have some guidance on how to use all these components.
} | ||
|
||
func (e *fileTemplateExtension) FindTemplate(ctx context.Context, name, version string) (integrations.Template, error) { | ||
path := filepath.Join(e.config.Path, name+".yml") |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Have you considered using the upstream file provider? (open question, not sure if it's feasible)
I feel like upstream unmarshalling process has capabilities that we might lose if we do our own unmarshall (e.g. embedded file uris)
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Have you considered using the upstream file provider? (open question, not sure if it's feasible)
Yes, actually I was using it in a previous version, where the template returned provider factories, and I was not doing any processing on the source template.
In the current version template resolution is done in two steps, first the template is partially parsed to remove the unused components, then the template is resolved using the confmap resolver.
I feel like upstream unmarshalling process has capabilities that we might lose if we do our own unmarshall
By using the confmap resolver on the second step we keep many unmarshalling capabilities, specially related to yaml-aware variable resolution.
e.g. embedded file uris
I am instantiating a "sandboxed" resolver, with a limited set of confmap providers. I think it may be risky to allow the use of any provider in templates.
We may decide later if we want to allow the use of other providers as file or env. I don't see particular use cases for them now.
} | ||
|
||
func (e *fileTemplateExtension) FindTemplate(ctx context.Context, name, version string) (integrations.Template, error) { | ||
path := filepath.Join(e.config.Path, name+".yml") |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Could we add a README that documents the required .yml
file extension?
} | ||
|
||
type PipelineConfig struct { | ||
Receiver component.ID `mapstructure:"receiver"` |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Could this be an array of receivers as well? I find it a bit confusing the fact that you can define a map of receivers but only reference one in the pipelines' configuration (differently than the main config).
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Now I see the examples below, if only one receiver is meant to be used in integrations pipelines I would add some documentation on the why.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Yes, a pipeline in a template is slightly different to the one in the main config. A pipeline here defines the set of components that are internally instantiated by an integration receiver.
I will add docs, yes. We can also consider using a different name instead of pipelines in the template if it helps to avoid confusion with the pipelines in the main config.
var ErrNotFound = errors.New("not found") | ||
|
||
type Template interface { | ||
Resolve(ctx context.Context, params map[string]any, pipelines []component.ID) (*Config, error) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Could we add some comments here? What should be the value of params/pipelines?
switch id.Type().String() { | ||
case "logs": | ||
return r.nextLogsConsumer != nil | ||
case "metrics": | ||
return r.nextMetricsConsumer != nil | ||
case "traces": | ||
return r.nextTracesConsumer != nil |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Could you explain why this component ID's types are set to either logs, metrics or traces? AFAIK component.Type is the generic name of the component and it does not contain the signal type: https://github.com/open-telemetry/opentelemetry-collector/blob/main/component/identifiable_test.go#L29
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I will comment on this function, and maybe make the code more strict around this.
This method is only intended to be used with pipeline ids, not with any component. And pipelines are expected to be of type logs, metrics or traces in the otel config: https://opentelemetry.io/docs/collector/configuration/#pipelines
We take advantage of this to know what receivers to instantiate for each pipeline in the main config.
type Config struct { | ||
Receivers map[component.ID]map[string]any `mapstructure:"receivers"` | ||
Processors map[component.ID]map[string]any `mapstructure:"processors"` | ||
Pipelines map[component.ID]PipelineConfig `mapstructure:"pipelines"` |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Pipelines map[component.ID]PipelineConfig `mapstructure:"pipelines"` | |
Pipelines map[pipeline.ID]PipelineConfig `mapstructure:"pipelines"` |
I think we could use the pipeline package here https://github.com/open-telemetry/opentelemetry-collector/blob/main/pipeline/pipeline.go#L20
We can assert the signal type afterward.
err := component.Start(ctx, host) | ||
if err != nil { | ||
return fmt.Errorf("failed to start component %q: %w", component, err) | ||
} | ||
} | ||
r.components = append(r.components, components...) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Should we append the components to the receiver's state as soon as are being started? There might be some components that won't be shutdown if there's an error after the first started component.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Sounds like a good idea, yes, I will do it.
} | ||
|
||
func (r *integrationReceiver) Shutdown(ctx context.Context) error { | ||
return shutdownComponents(ctx, r.components) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Should we clean r.components
after shutdown?
params.Logger = params.Logger.With(zap.String("name", params.ID.String())) | ||
receiversCreated := 0 | ||
if consumerChain.logs != nil { | ||
logs, err := receiverFactory.CreateLogs(ctx, params, preparedConfig, consumerChain.logs) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
nit: we could wrap these components constructors into an anonymous function so we could share the switch logic (we will need to duplicate the logic for any new signal type like profiles).
not needed for this PR though
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I will give a try.
} | ||
|
||
type TemplateFinder interface { | ||
FindTemplate(ctx context.Context, name, version string) (Template, error) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Is the version parameter used in any of the extensions?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Not used yet, we still have to think about versioning of templates.
I can remove it till we decide how to handle this.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Sounds good, I would vote for removing it then. (a bit less code :))
Thanks for the review. Yes, I will add tests in a follow up. Regarding docs, I will add at least docs for the areas you mentioned in your comments. |
Summary
These components introduce an implementation of open-telemetry/opentelemetry-collector#11631 to use a receiver to render templates as a mechanism to distribute reusable configurations as part of integrations. Templates are here similar to OTel collector configuration files, with placeholders for variables.
This change includes the following new components:
internal/integrations
with helper code to work with integrations in other components.extension/fileintegrationextension
with an extension that can be used by other components to obtain integrations from local disk.extension/configintegrationextension
with an extension that can be used to embed integration templates in the main OTel collector configuration.processor/integrationprocessor
processor that is composed by instantiating processors from a integration.receiver/integrationreceiver
receiver that is composed by instantiating receivers and processors from a integration.Open questions:
Purpose and use-cases of the new components
The main purpose of integrations is to provide an abstraction to share configurations intended to observe specific applications and services.
Templates are interpreted by components created with this purpose. Two main components are implemented for that, a receiver and a processor. They can use extensions to discover integrations.
These components are intended to be used with minimal configuration, so users can leverage integrations without extensive knowledge of the OTEL collector. The minimal configuration users need to provide is one or more sources of integrations, configured as extensions, the name of the integrations and the parameters it needs.
The use of receiver as the main component to use integrations will allow to leverage any other feature built for receivers, such as autodiscover features.
The main use case is the distribution of configurations for specific services, these configurations include receivers and processors, and pipelines to indicate how the receivers and processors have to be combined. A single integration can include multiple pipelines, users can chose which ones to use.
Find below an example integration, mostly copied from the one in open-telemetry/opentelemetry-collector#8372.
Example configuration for the component
Using the integration receiver:
Using the integration processor:
Telemetry data types supported
These are generic components that support any data type provided by other components.
Is this a vendor-specific component?
It is not vendor-specific.
Code owners
Elastic?
Sponsor
Elastic?
Additional context
This proposal is a follow-up of open-telemetry/opentelemetry-collector-contrib#26312 and open-telemetry/opentelemetry-collector#8372.