
New integration receiver and processor #96

Open · wants to merge 46 commits into main

Conversation

@jsoriano (Member) commented Aug 20, 2024

Summary

These components introduce an implementation of open-telemetry/opentelemetry-collector#11631, using a receiver to render templates as a mechanism to distribute reusable configurations as part of integrations. Templates here are similar to OTel collector configuration files, with placeholders for variables.

This change includes the following new components:

  • internal/integrations with helper code to work with integrations in other components.
  • extension/fileintegrationextension with an extension that can be used by other components to obtain integrations from local disk.
  • extension/configintegrationextension with an extension that can be used to embed integration templates in the main OTel collector configuration.
  • processor/integrationprocessor, a processor that is composed by instantiating the processors defined in an integration.
  • receiver/integrationreceiver, a receiver that is composed by instantiating the receivers and processors defined in an integration.

Open questions:

  • Should we explicitly declare dependencies of each integration?
  • Should we version integrations? Would it depend on the source?
  • Do we need to support extensions in integrations, or is it OK if users have to define them and pass their IDs as parameters?

Purpose and use-cases of the new components

The main purpose of integrations is to provide an abstraction to share configurations intended to observe specific applications and services.

Templates are interpreted by components created for this purpose. Two main components are implemented for that: a receiver and a processor. Both can use extensions to discover integrations.

These components are intended to be used with minimal configuration, so users can leverage integrations without extensive knowledge of the OTel collector. The minimal configuration consists of one or more sources of integrations (configured as extensions), the name of the integration, and the parameters it needs.

Using a receiver as the main component for integrations makes it possible to leverage any other feature built for receivers, such as autodiscovery.

The main use case is the distribution of configurations for specific services. These configurations include receivers and processors, plus pipelines that indicate how the receivers and processors are to be combined. A single integration can include multiple pipelines, and users can choose which ones to use.

Below is an example integration, mostly copied from the one in open-telemetry/opentelemetry-collector#8372.

receivers:
  prometheus:
    config:
      scrape_configs:
        - job_name: 'couchbase'
          scrape_interval: 5s
          static_configs:
            - targets: ${var:endpoints}
          basic_auth:
            username: ${var:username}
            password: ${var:password}
          metric_relabel_configs:
            # Include only a few key metrics
            - source_labels: [ __name__ ]
              regex: "(kv_ops)|\
                (kv_vb_curr_items)|\
                (kv_num_vbuckets)|\
                (kv_ep_cursor_memory_freed_bytes)|\
                (kv_total_memory_used_bytes)|\
                (kv_ep_num_value_ejects)|\
                (kv_ep_mem_high_wat)|\
                (kv_ep_mem_low_wat)|\
                (kv_ep_tmp_oom_errors)|\
                (kv_ep_oom_errors)"
              action: keep

processors:
  filter:
    # Filter out prometheus scraping meta-metrics.
    metrics:
      exclude:
        match_type: strict
        metric_names:
          - scrape_samples_post_metric_relabeling
          - scrape_series_added
          - scrape_duration_seconds
          - scrape_samples_scraped
          - up

  metricstransform:
    transforms:
      # Rename from prometheus metric name to OTel metric name.
      # We cannot do this with metric_relabel_configs, as the prometheus receiver does not
      # allow metric renames at this time.
      - include: kv_ops
        match_type: strict
        action: update
        new_name: "couchbase.bucket.operation.count"
      - include: kv_vb_curr_items
        match_type: strict
        action: update
        new_name: "couchbase.bucket.item.count"
      - include: kv_num_vbuckets
        match_type: strict
        action: update
        new_name: "couchbase.bucket.vbucket.count"
      - include: kv_ep_cursor_memory_freed_bytes
        match_type: strict
        action: update
        new_name: "couchbase.bucket.memory.usage.free"
      - include: kv_total_memory_used_bytes
        match_type: strict
        action: update
        new_name: "couchbase.bucket.memory.usage.used"
      - include: kv_ep_num_value_ejects
        match_type: strict
        action: update
        new_name: "couchbase.bucket.item.ejection.count"
      - include: kv_ep_mem_high_wat
        match_type: strict
        action: update
        new_name: "couchbase.bucket.memory.high_water_mark.limit"
      - include: kv_ep_mem_low_wat
        match_type: strict
        action: update
        new_name: "couchbase.bucket.memory.low_water_mark.limit"
      - include: kv_ep_tmp_oom_errors
        match_type: strict
        action: update
        new_name: "couchbase.bucket.error.oom.count.recoverable"
      - include: kv_ep_oom_errors
        match_type: strict
        action: update
        new_name: "couchbase.bucket.error.oom.count.unrecoverable"
      # Combine couchbase.bucket.error.oom.count.x and couchbase.bucket.memory.usage.x
      # metrics.
      #- include: '^couchbase\.bucket\.error\.oom\.count\.(?P<error_type>unrecoverable|recoverable)$$'
      #  match_type: regexp
      #  action: combine
      #  new_name: "couchbase.bucket.error.oom.count"
      - include: '^couchbase\.bucket\.memory\.usage\.(?P<state>free|used)$$'
        match_type: regexp
        action: combine
        new_name: "couchbase.bucket.memory.usage"
      # Aggregate "result" label on operation count to keep label sets consistent across the metric datapoints
      - include: 'couchbase.bucket.operation.count'
        match_type: strict
        action: update
        operations:
          - action: aggregate_labels
            label_set: ["bucket", "op"]
            aggregation_type: sum

  transform:
    metric_statements:
    - context: datapoint
      statements:
        - convert_gauge_to_sum("cumulative", true) where metric.name == "couchbase.bucket.operation.count"
        - set(metric.description, "Number of operations on the bucket.") where metric.name == "couchbase.bucket.operation.count"
        - set(metric.unit, "{operations}") where metric.name == "couchbase.bucket.operation.count"

        - convert_gauge_to_sum("cumulative", false) where metric.name == "couchbase.bucket.item.count"
        - set(metric.description, "Number of items that belong to the bucket.") where metric.name == "couchbase.bucket.item.count"
        - set(metric.unit, "{items}") where metric.name == "couchbase.bucket.item.count"

        - convert_gauge_to_sum("cumulative", false) where metric.name == "couchbase.bucket.vbucket.count"
        - set(metric.description, "Number of non-resident vBuckets.") where metric.name == "couchbase.bucket.vbucket.count"
        - set(metric.unit, "{vbuckets}") where metric.name == "couchbase.bucket.vbucket.count"

        - convert_gauge_to_sum("cumulative", false) where metric.name == "couchbase.bucket.memory.usage"
        - set(metric.description, "Usage of total memory available to the bucket.") where metric.name == "couchbase.bucket.memory.usage"
        - set(metric.unit, "By") where metric.name == "couchbase.bucket.memory.usage"

        - convert_gauge_to_sum("cumulative", true) where metric.name == "couchbase.bucket.item.ejection.count"
        - set(metric.description, "Number of item value ejections from memory to disk.") where metric.name == "couchbase.bucket.item.ejection.count"
        - set(metric.unit, "{ejections}") where metric.name == "couchbase.bucket.item.ejection.count"

        - convert_gauge_to_sum("cumulative", true) where metric.name == "couchbase.bucket.error.oom.count"
        - set(metric.description, "Number of out of memory errors.") where metric.name == "couchbase.bucket.error.oom.count"
        - set(metric.unit, "{errors}") where metric.name == "couchbase.bucket.error.oom.count"

        - set(metric.description, "The memory usage at which items will be ejected.") where metric.name == "couchbase.bucket.memory.high_water_mark.limit"
        - set(metric.unit, "By") where metric.name == "couchbase.bucket.memory.high_water_mark.limit"

        - set(metric.description, "The memory usage at which ejections will stop that were previously triggered by a high water mark breach.") where metric.name == "couchbase.bucket.memory.low_water_mark.limit"
        - set(metric.unit, "By") where metric.name == "couchbase.bucket.memory.low_water_mark.limit"

pipelines:
  metrics:
    receiver: prometheus
    processors:
      - filter
      - metricstransform
      - transform

Example configuration for the component

Using the integration receiver:

extensions:
  file_integrations: # <--- This extension is used to discover integrations from a local directory.
    path: "./integrations"

receivers:
  integration/couchbase:
    name: "couchbase" # <--- This is the name of the integration.
    pipelines: [metrics,logs]
    parameters:
      endpoints: [localhost:8091]
      username: Administrator
      password: foobar

exporters:
  debug:
    verbosity: detailed

service:
  extensions: [file_integrations]
  pipelines:
    metrics:
      receivers: [integration/couchbase]
      exporters: [debug]

Using the integration processor:

extensions:
  file_integrations:
    path: "./integrations"

receivers:
  prometheus/couchbase:
    config:
      scrape_configs:
        - job_name: 'couchbase'
          scrape_interval: 5s
          static_configs:
            - targets: [localhost:8091]

processors:
  integration/couchbase:
    name: "couchbase"

exporters:
  debug:
    verbosity: detailed

service:
  extensions: [file_integrations]
  pipelines:
    metrics:
      receivers: [prometheus/couchbase]
      processors: [integration/couchbase]
      exporters: [debug]

Telemetry data types supported

These are generic components that support any data type provided by other components.

Is this a vendor-specific component?

It is not vendor-specific.

Code owners

Elastic?

Sponsor

Elastic?

Additional context

This proposal is a follow-up of open-telemetry/opentelemetry-collector-contrib#26312 and open-telemetry/opentelemetry-collector#8372.

@rogercoll (Contributor) commented:

This looks really promising! I think having the templates as a receiver solves two main issues compared to the "Converter" approach:

  • Receiver creator: A template receiver could be included in a receivercreator configuration.
  • Forward connector: The template receiver does not rely on the collector having the "forward" connector in its pipeline.

@rogercoll (Contributor) left a comment:

Thanks for adding this! I wanted to check whether a receivercreator would also work; template config:

extensions:
  host_observer:

receivers:
  receiver_creator/2:
    # Name of the extensions to watch for endpoints to start and stop.
    watch_observers: [host_observer]
    receivers:
      httpcheck/on_host:
        # If this rule matches an instance of this receiver will be started.
        rule: type == "port" && port == 8080
        resource_attributes:
          service.name: redis_on_host

processors:
  attributes/example:
    actions:
      - key: account_id
        value: 2245
        action: insert

pipelines:
  extensions: host_observer
  metrics:
    receiver: ""
    processors:
      - attributes/example

For the previous configuration to work we would need to add support for extensions in the configuration. I think that support for extensions could be added in the future; for the moment we can focus on templates with just plain receivers + processors.

@jsoriano jsoriano changed the title POC of template receiver and processor POC of integration receiver and processor Sep 19, 2024
@jsoriano (Member Author):

@rogercoll opening for review; feel free to involve more people from your team.

Since the previous version I have fixed the issue with the detection of missing variables: they were being ignored, so that resolution wouldn't fail for parts of the template that were not being used.

Now template resolution is done in two steps (see the sketch after this list):

  1. The template is unmarshalled as raw YAML down to the component IDs, and unused components are filtered out.
  2. The filtered template is resolved using a confmap resolver, so variables are replaced.
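
For illustration, a minimal sketch in Go of what the first step could look like. The names (rawTemplate, selectComponents) and the exact field layout are assumptions for this sketch, not necessarily the PR's code; parsing component configs as opaque yaml.Node values is what keeps this step limited to the component IDs:

package integrations

import (
	"fmt"

	"gopkg.in/yaml.v3"
)

// templatePipeline mirrors a template pipeline: one receiver plus an
// ordered list of processors.
type templatePipeline struct {
	Receiver   string   `yaml:"receiver"`
	Processors []string `yaml:"processors"`
}

// rawTemplate unmarshals the template only down to component IDs; the
// component configs stay as opaque yaml.Node values, so ${var:...}
// placeholders inside them are left untouched at this stage.
type rawTemplate struct {
	Receivers  map[string]yaml.Node        `yaml:"receivers"`
	Processors map[string]yaml.Node        `yaml:"processors"`
	Pipelines  map[string]templatePipeline `yaml:"pipelines"`
}

// selectComponents drops the pipelines that were not requested, and any
// receiver or processor that no remaining pipeline references, then
// re-marshals the template for the resolution step.
func selectComponents(raw []byte, selected map[string]bool) ([]byte, error) {
	var tpl rawTemplate
	if err := yaml.Unmarshal(raw, &tpl); err != nil {
		return nil, fmt.Errorf("parsing template: %w", err)
	}
	usedReceivers := make(map[string]bool)
	usedProcessors := make(map[string]bool)
	for id, p := range tpl.Pipelines {
		if !selected[id] {
			delete(tpl.Pipelines, id)
			continue
		}
		usedReceivers[p.Receiver] = true
		for _, name := range p.Processors {
			usedProcessors[name] = true
		}
	}
	for id := range tpl.Receivers {
		if !usedReceivers[id] {
			delete(tpl.Receivers, id)
		}
	}
	for id := range tpl.Processors {
		if !usedProcessors[id] {
			delete(tpl.Processors, id)
		}
	}
	return yaml.Marshal(&tpl)
}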

This approach might be seen as a bit inefficient, because it unmarshals twice, but in principle it is not in the hot path, and it could be optimized while keeping the public interface. If optimization is required at some point, I think we could filter while resolving, in a single step, but we'd need some kind of custom decoder or resolver.

A possible criticism of this filtering during resolution is that these components need to be aware of the structure of the configuration. However, these definitions are specific to these components; they mimic the OTel collector configuration for convenience, but don't need to be the same.
The components still don't need to know anything about the specifics of the OTel collector configuration, because they only read their own configs and build the required subcomponents internally.

With these changes I also took the opportunity to remove some code duplication between the receiver and the processor, and I have also added some tests for template resolution. More testing will be needed, but we could maybe add it as a follow-up.

@jsoriano jsoriano marked this pull request as ready for review February 21, 2025 10:38
@jsoriano jsoriano requested a review from a team as a code owner February 21, 2025 10:38
@jsoriano jsoriano changed the title POC of integration receiver and processor New integration receiver and processor Feb 21, 2025
@jsoriano jsoriano requested a review from ChrsMark February 21, 2025 10:45
@rogercoll (Contributor) left a comment:

The code looks good to me, I just added some minor comments.

Do we plan to add tests and component documentation in a follow-up PR? It would be best to have some guidance on how to use all these components.

}

func (e *fileTemplateExtension) FindTemplate(ctx context.Context, name, version string) (integrations.Template, error) {
	path := filepath.Join(e.config.Path, name+".yml")
Contributor:

Have you considered using the upstream file provider? (open question, not sure if it's feasible)

I feel like the upstream unmarshalling process has capabilities that we might lose if we do our own unmarshalling (e.g. embedded file URIs).

Member Author (jsoriano):

> Have you considered using the upstream file provider? (open question, not sure if it's feasible)

Yes, actually I was using it in a previous version, where the template returned provider factories, and I was not doing any processing on the source template.

In the current version template resolution is done in two steps: first the template is partially parsed to remove the unused components, then the template is resolved using the confmap resolver.

> I feel like the upstream unmarshalling process has capabilities that we might lose if we do our own unmarshalling

By using the confmap resolver in the second step we keep many unmarshalling capabilities, especially those related to YAML-aware variable resolution.

> e.g. embedded file URIs

I am instantiating a "sandboxed" resolver, with a limited set of confmap providers. I think it may be risky to allow the use of any provider in templates.

We may decide later if we want to allow the use of other providers such as file or env. I don't see particular use cases for them now.
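
A rough sketch of what such a sandboxed resolver could look like, assuming only the yaml provider is allowed; the actual provider set, and the provider that serves ${var:...} parameters, are specific to this PR and omitted here:

import (
	"go.opentelemetry.io/collector/confmap"
	"go.opentelemetry.io/collector/confmap/provider/yamlprovider"
)

// newSandboxedResolver builds a confmap resolver restricted to the yaml
// provider, so templates cannot pull in arbitrary files or environment
// variables. The filtered template is passed inline via the yaml scheme.
func newSandboxedResolver(filtered []byte) (*confmap.Resolver, error) {
	return confmap.NewResolver(confmap.ResolverSettings{
		URIs:              []string{"yaml:" + string(filtered)},
		ProviderFactories: []confmap.ProviderFactory{yamlprovider.NewFactory()},
	})
}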

}

func (e *fileTemplateExtension) FindTemplate(ctx context.Context, name, version string) (integrations.Template, error) {
	path := filepath.Join(e.config.Path, name+".yml")
Contributor:

Could we add a README that documents the required .yml file extension?

}

type PipelineConfig struct {
	Receiver component.ID `mapstructure:"receiver"`
Contributor:

Could this be an array of receivers as well? I find it a bit confusing that you can define a map of receivers but only reference one in the pipelines' configuration (differently from the main config).

Contributor:

Now I see the examples below; if only one receiver is meant to be used in integration pipelines, I would add some documentation on why.

Member Author (jsoriano):

Yes, a pipeline in a template is slightly different from the one in the main config. A pipeline here defines the set of components that are internally instantiated by an integration receiver.

I will add docs, yes. We can also consider using a different name instead of pipelines in the template if it helps to avoid confusion with the pipelines in the main config.

var ErrNotFound = errors.New("not found")

type Template interface {
	Resolve(ctx context.Context, params map[string]any, pipelines []component.ID) (*Config, error)
Contributor:

Could we add some comments here? What should be the value of params/pipelines?
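
For instance, the doc comments could look like this; the wording is an assumption based on the description in this PR, not the PR's actual code:

// Template is a parsed integration template.
type Template interface {
	// Resolve renders the template and returns the configuration for the
	// selected pipelines. params maps the variable names referenced in the
	// template (as ${var:name}) to their values. pipelines lists the IDs of
	// the template pipelines to instantiate; components not referenced by
	// any of them are filtered out.
	Resolve(ctx context.Context, params map[string]any, pipelines []component.ID) (*Config, error)
}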

Comment on lines +112 to +118
switch id.Type().String() {
case "logs":
	return r.nextLogsConsumer != nil
case "metrics":
	return r.nextMetricsConsumer != nil
case "traces":
	return r.nextTracesConsumer != nil
Contributor:

Could you explain why these component IDs' types are set to either logs, metrics, or traces? AFAIK component.Type is the generic name of the component and it does not contain the signal type: https://github.com/open-telemetry/opentelemetry-collector/blob/main/component/identifiable_test.go#L29

Member Author (jsoriano):

I will comment on this function, and maybe make the code more strict around this.

This method is only intended to be used with pipeline IDs, not with any component. Pipelines are expected to be of type logs, metrics, or traces in the OTel config: https://opentelemetry.io/docs/collector/configuration/#pipelines

We take advantage of this to know what receivers to instantiate for each pipeline in the main config.
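
For example, a stricter variant could validate pipeline IDs up front; validatePipelineID is a hypothetical helper for this sketch, not code from this PR:

// validatePipelineID rejects template pipeline IDs whose type is not one of
// the signal types allowed in OTel collector pipelines.
func validatePipelineID(id component.ID) error {
	switch id.Type().String() {
	case "logs", "metrics", "traces":
		return nil
	default:
		return fmt.Errorf("unsupported pipeline type %q, expected logs, metrics or traces", id.Type())
	}
}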

type Config struct {
	Receivers  map[component.ID]map[string]any `mapstructure:"receivers"`
	Processors map[component.ID]map[string]any `mapstructure:"processors"`
	Pipelines  map[component.ID]PipelineConfig `mapstructure:"pipelines"`
Contributor:

Suggested change:
-	Pipelines map[component.ID]PipelineConfig `mapstructure:"pipelines"`
+	Pipelines map[pipeline.ID]PipelineConfig `mapstructure:"pipelines"`

I think we could use the pipeline package here https://github.com/open-telemetry/opentelemetry-collector/blob/main/pipeline/pipeline.go#L20

We can assert the signal type afterward.

Comment on lines +258 to +263
		err := component.Start(ctx, host)
		if err != nil {
			return fmt.Errorf("failed to start component %q: %w", component, err)
		}
	}
	r.components = append(r.components, components...)
Contributor:

Should we append the components to the receiver's state as soon as they are started? Otherwise some components won't be shut down if there's an error after the first component has started.

Member Author (jsoriano):

Sounds like a good idea, yes, I will do it.
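
A sketch of that change: append each subcomponent to the receiver state as soon as it starts, so a failed Start leaves the already-started ones tracked for Shutdown (variable names follow the snippet above):

for _, component := range components {
	err := component.Start(ctx, host)
	if err != nil {
		// Components appended so far will still be stopped by Shutdown.
		return fmt.Errorf("failed to start component %q: %w", component, err)
	}
	r.components = append(r.components, component)
}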

}

func (r *integrationReceiver) Shutdown(ctx context.Context) error {
	return shutdownComponents(ctx, r.components)
Contributor:

Should we clear r.components after shutdown?

params.Logger = params.Logger.With(zap.String("name", params.ID.String()))
receiversCreated := 0
if consumerChain.logs != nil {
	logs, err := receiverFactory.CreateLogs(ctx, params, preparedConfig, consumerChain.logs)
Contributor:

nit: we could wrap these component constructors into anonymous functions so we can share the switch logic (otherwise we will need to duplicate it for any new signal type, like profiles).

Not needed for this PR though.

Member Author (jsoriano):

I will give it a try.
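
One possible shape for that refactor, as a sketch rather than the PR's final code; it reuses the names from the snippet above and wraps each per-signal constructor in a closure so the shared logic is written once:

// Each closure creates the receiver for one signal, or returns nil if there
// is no next consumer for that signal.
creators := []func() (component.Component, error){
	func() (component.Component, error) {
		if consumerChain.logs == nil {
			return nil, nil
		}
		return receiverFactory.CreateLogs(ctx, params, preparedConfig, consumerChain.logs)
	},
	func() (component.Component, error) {
		if consumerChain.metrics == nil {
			return nil, nil
		}
		return receiverFactory.CreateMetrics(ctx, params, preparedConfig, consumerChain.metrics)
	},
	func() (component.Component, error) {
		if consumerChain.traces == nil {
			return nil, nil
		}
		return receiverFactory.CreateTraces(ctx, params, preparedConfig, consumerChain.traces)
	},
}
receiversCreated := 0
for _, create := range creators {
	rcvr, err := create()
	if err != nil {
		return err
	}
	if rcvr != nil {
		components = append(components, rcvr)
		receiversCreated++
	}
}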

}

type TemplateFinder interface {
	FindTemplate(ctx context.Context, name, version string) (Template, error)
Contributor:

Is the version parameter used in any of the extensions?

Member Author (jsoriano):

Not used yet; we still have to think about versioning of templates.

I can remove it until we decide how to handle this.

Contributor:

Sounds good, I would vote for removing it then. (a bit less code :))

@jsoriano (Member Author):

> Do we plan to add tests and component documentation in a follow-up PR? It would be best to have some guidance on how to use all these components.

Thanks for the review. Yes, I will add tests in a follow-up. Regarding docs, I will add at least docs for the areas you mentioned in your comments.
