diff --git a/.chloggen/add-batch-api-to-azure-monitor-receiver.yaml b/.chloggen/add-batch-api-to-azure-monitor-receiver.yaml new file mode 100644 index 000000000000..9c1463bbd4e6 --- /dev/null +++ b/.chloggen/add-batch-api-to-azure-monitor-receiver.yaml @@ -0,0 +1,27 @@ +# Use this changelog template to create an entry for release notes. + +# One of 'breaking', 'deprecation', 'new_component', 'enhancement', 'bug_fix' +change_type: enhancement + +# The name of the component, or a single word describing the area of concern, (e.g. filelogreceiver) +component: azuremonitorreceiver + +# A brief description of the change. Surround your text with quotes ("") if it needs to start with a backtick (`). +note: Allow the Metrics Batch API to be used instead of the Azure Resource Manager API to run less queries and avoid being throttled. + +# Mandatory: One or more tracking issues related to the change. You can use the PR number here if no issue exists. +issues: [] + +# (Optional) One or more lines of additional information to render under the primary note. +# These lines will be padded with 2 spaces and then inserted directly into the document. +# Use pipe (|) for multiline entries. +subtext: This is intended to be a Proof-of-Concept and includes a lot of replication. It is inteded to be used to start a conversation until ideas can be integrated and issues resolved. + +# If your change doesn't affect end users or the exported elements of any package, +# you should instead start your pull request title with [chore] or use the "Skip Changelog" label. +# Optional: The change log or logs in which this entry should be included. +# e.g. '[user]' or '[user, api]' +# Include 'user' if the change is relevant to end users. +# Include 'api' if there is a change to a library API. +# Default: '[user]' +change_logs: [] diff --git a/cmd/configschema/go.mod b/cmd/configschema/go.mod index 04bc7581257d..e1b35742e260 100644 --- a/cmd/configschema/go.mod +++ b/cmd/configschema/go.mod @@ -202,8 +202,10 @@ require ( github.com/Azure/azure-sdk-for-go v68.0.0+incompatible // indirect github.com/Azure/azure-sdk-for-go/sdk/azcore v1.9.2 // indirect github.com/Azure/azure-sdk-for-go/sdk/internal v1.5.2 // indirect + github.com/Azure/azure-sdk-for-go/sdk/monitor/azquery v1.2.0-beta.1 // indirect github.com/Azure/azure-sdk-for-go/sdk/resourcemanager/compute/armcompute/v4 v4.2.1 // indirect github.com/Azure/azure-sdk-for-go/sdk/resourcemanager/network/armnetwork/v2 v2.2.1 // indirect + github.com/Azure/azure-sdk-for-go/sdk/resourcemanager/resources/armsubscriptions v1.3.0 // indirect github.com/Azure/azure-sdk-for-go/sdk/storage/azblob v1.2.1 // indirect github.com/Azure/azure-storage-queue-go v0.0.0-20230531184854-c06a8eff66fe // indirect github.com/Azure/go-amqp v1.0.4 // indirect diff --git a/cmd/configschema/go.sum b/cmd/configschema/go.sum index 855d53c418a7..28e9a273e868 100644 --- a/cmd/configschema/go.sum +++ b/cmd/configschema/go.sum @@ -93,6 +93,8 @@ github.com/Azure/azure-sdk-for-go/sdk/azidentity v1.5.1 h1:sO0/P7g68FrryJzljemN+ github.com/Azure/azure-sdk-for-go/sdk/azidentity v1.5.1/go.mod h1:h8hyGFDsU5HMivxiS2iYFZsgDbU9OnnJ163x5UGVKYo= github.com/Azure/azure-sdk-for-go/sdk/internal v1.5.2 h1:LqbJ/WzJUwBf8UiaSzgX7aMclParm9/5Vgp+TY51uBQ= github.com/Azure/azure-sdk-for-go/sdk/internal v1.5.2/go.mod h1:yInRyqWXAuaPrgI7p70+lDDgh3mlBohis29jGMISnmc= +github.com/Azure/azure-sdk-for-go/sdk/monitor/azquery v1.2.0-beta.1 h1:uG9gOhn40/Ocd12+Nm6vAZM80s9hwB2Yhjg5UM4rb/A= +github.com/Azure/azure-sdk-for-go/sdk/monitor/azquery v1.2.0-beta.1/go.mod h1:8STfZzbS0RUr8NtnAreiVeLCH3bpJSTmuARvThbTGmk= github.com/Azure/azure-sdk-for-go/sdk/resourcemanager/compute/armcompute/v4 v4.2.1 h1:UPeCRD+XY7QlaGQte2EVI2iOcWvUYA2XY8w5T/8v0NQ= github.com/Azure/azure-sdk-for-go/sdk/resourcemanager/compute/armcompute/v4 v4.2.1/go.mod h1:oGV6NlB0cvi1ZbYRR2UN44QHxWFyGk+iylgD0qaMXjA= github.com/Azure/azure-sdk-for-go/sdk/resourcemanager/internal v1.1.2 h1:mLY+pNLjCUeKhgnAJWAKhEUQM+RJQo2H1fuGSw1Ky1E= @@ -105,6 +107,8 @@ github.com/Azure/azure-sdk-for-go/sdk/resourcemanager/network/armnetwork/v2 v2.2 github.com/Azure/azure-sdk-for-go/sdk/resourcemanager/network/armnetwork/v2 v2.2.1/go.mod h1:Bzf34hhAE9NSxailk8xVeLEZbUjOXcC+GnU1mMKdhLw= github.com/Azure/azure-sdk-for-go/sdk/resourcemanager/resources/armresources v1.2.0 h1:Dd+RhdJn0OTtVGaeDLZpcumkIVCtA/3/Fo42+eoYvVM= github.com/Azure/azure-sdk-for-go/sdk/resourcemanager/resources/armresources v1.2.0/go.mod h1:5kakwfW5CjC9KK+Q4wjXAg+ShuIm2mBMua0ZFj2C8PE= +github.com/Azure/azure-sdk-for-go/sdk/resourcemanager/resources/armsubscriptions v1.3.0 h1:wxQx2Bt4xzPIKvW59WQf1tJNx/ZZKPfN+EhPX3Z6CYY= +github.com/Azure/azure-sdk-for-go/sdk/resourcemanager/resources/armsubscriptions v1.3.0/go.mod h1:TpiwjwnW/khS0LKs4vW5UmmT9OWcxaveS8U7+tlknzo= github.com/Azure/azure-sdk-for-go/sdk/resourcemanager/storage/armstorage v1.5.0 h1:AifHbc4mg0x9zW52WOpKbsHaDKuRhlI7TVl47thgQ70= github.com/Azure/azure-sdk-for-go/sdk/security/keyvault/azkeys v1.0.0 h1:yfJe15aSwEQ6Oo6J+gdfdulPNoZ3TEhmbhLIoxZcA+U= github.com/Azure/azure-sdk-for-go/sdk/security/keyvault/internal v0.8.0 h1:T028gtTPiYt/RMUfs8nVsAL7FDQrfLlrm/NnRG/zcC4= diff --git a/cmd/otelcontribcol/go.mod b/cmd/otelcontribcol/go.mod index 5041df63c807..09cb4c1a83e3 100644 --- a/cmd/otelcontribcol/go.mod +++ b/cmd/otelcontribcol/go.mod @@ -246,10 +246,12 @@ require ( github.com/Azure/azure-sdk-for-go/sdk/azcore v1.9.2 // indirect github.com/Azure/azure-sdk-for-go/sdk/azidentity v1.5.1 // indirect github.com/Azure/azure-sdk-for-go/sdk/internal v1.5.2 // indirect + github.com/Azure/azure-sdk-for-go/sdk/monitor/azquery v1.2.0-beta.1 // indirect github.com/Azure/azure-sdk-for-go/sdk/resourcemanager/compute/armcompute/v4 v4.2.1 // indirect github.com/Azure/azure-sdk-for-go/sdk/resourcemanager/monitor/armmonitor v0.11.0 // indirect github.com/Azure/azure-sdk-for-go/sdk/resourcemanager/network/armnetwork/v2 v2.2.1 // indirect github.com/Azure/azure-sdk-for-go/sdk/resourcemanager/resources/armresources v1.2.0 // indirect + github.com/Azure/azure-sdk-for-go/sdk/resourcemanager/resources/armsubscriptions v1.3.0 // indirect github.com/Azure/azure-sdk-for-go/sdk/storage/azblob v1.2.1 // indirect github.com/Azure/azure-storage-queue-go v0.0.0-20230531184854-c06a8eff66fe // indirect github.com/Azure/go-amqp v1.0.4 // indirect diff --git a/cmd/otelcontribcol/go.sum b/cmd/otelcontribcol/go.sum index 2a322c5f5222..7b4f37f71bd9 100644 --- a/cmd/otelcontribcol/go.sum +++ b/cmd/otelcontribcol/go.sum @@ -92,6 +92,8 @@ github.com/Azure/azure-sdk-for-go/sdk/azidentity v1.5.1 h1:sO0/P7g68FrryJzljemN+ github.com/Azure/azure-sdk-for-go/sdk/azidentity v1.5.1/go.mod h1:h8hyGFDsU5HMivxiS2iYFZsgDbU9OnnJ163x5UGVKYo= github.com/Azure/azure-sdk-for-go/sdk/internal v1.5.2 h1:LqbJ/WzJUwBf8UiaSzgX7aMclParm9/5Vgp+TY51uBQ= github.com/Azure/azure-sdk-for-go/sdk/internal v1.5.2/go.mod h1:yInRyqWXAuaPrgI7p70+lDDgh3mlBohis29jGMISnmc= +github.com/Azure/azure-sdk-for-go/sdk/monitor/azquery v1.2.0-beta.1 h1:uG9gOhn40/Ocd12+Nm6vAZM80s9hwB2Yhjg5UM4rb/A= +github.com/Azure/azure-sdk-for-go/sdk/monitor/azquery v1.2.0-beta.1/go.mod h1:8STfZzbS0RUr8NtnAreiVeLCH3bpJSTmuARvThbTGmk= github.com/Azure/azure-sdk-for-go/sdk/resourcemanager/compute/armcompute/v4 v4.2.1 h1:UPeCRD+XY7QlaGQte2EVI2iOcWvUYA2XY8w5T/8v0NQ= github.com/Azure/azure-sdk-for-go/sdk/resourcemanager/compute/armcompute/v4 v4.2.1/go.mod h1:oGV6NlB0cvi1ZbYRR2UN44QHxWFyGk+iylgD0qaMXjA= github.com/Azure/azure-sdk-for-go/sdk/resourcemanager/internal v1.1.2 h1:mLY+pNLjCUeKhgnAJWAKhEUQM+RJQo2H1fuGSw1Ky1E= @@ -104,6 +106,8 @@ github.com/Azure/azure-sdk-for-go/sdk/resourcemanager/network/armnetwork/v2 v2.2 github.com/Azure/azure-sdk-for-go/sdk/resourcemanager/network/armnetwork/v2 v2.2.1/go.mod h1:Bzf34hhAE9NSxailk8xVeLEZbUjOXcC+GnU1mMKdhLw= github.com/Azure/azure-sdk-for-go/sdk/resourcemanager/resources/armresources v1.2.0 h1:Dd+RhdJn0OTtVGaeDLZpcumkIVCtA/3/Fo42+eoYvVM= github.com/Azure/azure-sdk-for-go/sdk/resourcemanager/resources/armresources v1.2.0/go.mod h1:5kakwfW5CjC9KK+Q4wjXAg+ShuIm2mBMua0ZFj2C8PE= +github.com/Azure/azure-sdk-for-go/sdk/resourcemanager/resources/armsubscriptions v1.3.0 h1:wxQx2Bt4xzPIKvW59WQf1tJNx/ZZKPfN+EhPX3Z6CYY= +github.com/Azure/azure-sdk-for-go/sdk/resourcemanager/resources/armsubscriptions v1.3.0/go.mod h1:TpiwjwnW/khS0LKs4vW5UmmT9OWcxaveS8U7+tlknzo= github.com/Azure/azure-sdk-for-go/sdk/resourcemanager/storage/armstorage v1.5.0 h1:AifHbc4mg0x9zW52WOpKbsHaDKuRhlI7TVl47thgQ70= github.com/Azure/azure-sdk-for-go/sdk/security/keyvault/azkeys v1.0.0 h1:yfJe15aSwEQ6Oo6J+gdfdulPNoZ3TEhmbhLIoxZcA+U= github.com/Azure/azure-sdk-for-go/sdk/security/keyvault/internal v0.8.0 h1:T028gtTPiYt/RMUfs8nVsAL7FDQrfLlrm/NnRG/zcC4= diff --git a/go.mod b/go.mod index 39d040f0839f..31e952e4051b 100644 --- a/go.mod +++ b/go.mod @@ -214,10 +214,12 @@ require ( github.com/Azure/azure-sdk-for-go/sdk/azcore v1.9.2 // indirect github.com/Azure/azure-sdk-for-go/sdk/azidentity v1.5.1 // indirect github.com/Azure/azure-sdk-for-go/sdk/internal v1.5.2 // indirect + github.com/Azure/azure-sdk-for-go/sdk/monitor/azquery v1.2.0-beta.1 // indirect github.com/Azure/azure-sdk-for-go/sdk/resourcemanager/compute/armcompute/v4 v4.2.1 // indirect github.com/Azure/azure-sdk-for-go/sdk/resourcemanager/monitor/armmonitor v0.11.0 // indirect github.com/Azure/azure-sdk-for-go/sdk/resourcemanager/network/armnetwork/v2 v2.2.1 // indirect github.com/Azure/azure-sdk-for-go/sdk/resourcemanager/resources/armresources v1.2.0 // indirect + github.com/Azure/azure-sdk-for-go/sdk/resourcemanager/resources/armsubscriptions v1.3.0 // indirect github.com/Azure/azure-sdk-for-go/sdk/storage/azblob v1.2.1 // indirect github.com/Azure/azure-storage-queue-go v0.0.0-20230531184854-c06a8eff66fe // indirect github.com/Azure/go-amqp v1.0.4 // indirect diff --git a/go.sum b/go.sum index 5cddaf1c804f..3bbfa8212c9a 100644 --- a/go.sum +++ b/go.sum @@ -93,6 +93,8 @@ github.com/Azure/azure-sdk-for-go/sdk/azidentity v1.5.1 h1:sO0/P7g68FrryJzljemN+ github.com/Azure/azure-sdk-for-go/sdk/azidentity v1.5.1/go.mod h1:h8hyGFDsU5HMivxiS2iYFZsgDbU9OnnJ163x5UGVKYo= github.com/Azure/azure-sdk-for-go/sdk/internal v1.5.2 h1:LqbJ/WzJUwBf8UiaSzgX7aMclParm9/5Vgp+TY51uBQ= github.com/Azure/azure-sdk-for-go/sdk/internal v1.5.2/go.mod h1:yInRyqWXAuaPrgI7p70+lDDgh3mlBohis29jGMISnmc= +github.com/Azure/azure-sdk-for-go/sdk/monitor/azquery v1.2.0-beta.1 h1:uG9gOhn40/Ocd12+Nm6vAZM80s9hwB2Yhjg5UM4rb/A= +github.com/Azure/azure-sdk-for-go/sdk/monitor/azquery v1.2.0-beta.1/go.mod h1:8STfZzbS0RUr8NtnAreiVeLCH3bpJSTmuARvThbTGmk= github.com/Azure/azure-sdk-for-go/sdk/resourcemanager/compute/armcompute/v4 v4.2.1 h1:UPeCRD+XY7QlaGQte2EVI2iOcWvUYA2XY8w5T/8v0NQ= github.com/Azure/azure-sdk-for-go/sdk/resourcemanager/compute/armcompute/v4 v4.2.1/go.mod h1:oGV6NlB0cvi1ZbYRR2UN44QHxWFyGk+iylgD0qaMXjA= github.com/Azure/azure-sdk-for-go/sdk/resourcemanager/internal v1.1.2 h1:mLY+pNLjCUeKhgnAJWAKhEUQM+RJQo2H1fuGSw1Ky1E= @@ -105,6 +107,8 @@ github.com/Azure/azure-sdk-for-go/sdk/resourcemanager/network/armnetwork/v2 v2.2 github.com/Azure/azure-sdk-for-go/sdk/resourcemanager/network/armnetwork/v2 v2.2.1/go.mod h1:Bzf34hhAE9NSxailk8xVeLEZbUjOXcC+GnU1mMKdhLw= github.com/Azure/azure-sdk-for-go/sdk/resourcemanager/resources/armresources v1.2.0 h1:Dd+RhdJn0OTtVGaeDLZpcumkIVCtA/3/Fo42+eoYvVM= github.com/Azure/azure-sdk-for-go/sdk/resourcemanager/resources/armresources v1.2.0/go.mod h1:5kakwfW5CjC9KK+Q4wjXAg+ShuIm2mBMua0ZFj2C8PE= +github.com/Azure/azure-sdk-for-go/sdk/resourcemanager/resources/armsubscriptions v1.3.0 h1:wxQx2Bt4xzPIKvW59WQf1tJNx/ZZKPfN+EhPX3Z6CYY= +github.com/Azure/azure-sdk-for-go/sdk/resourcemanager/resources/armsubscriptions v1.3.0/go.mod h1:TpiwjwnW/khS0LKs4vW5UmmT9OWcxaveS8U7+tlknzo= github.com/Azure/azure-sdk-for-go/sdk/resourcemanager/storage/armstorage v1.5.0 h1:AifHbc4mg0x9zW52WOpKbsHaDKuRhlI7TVl47thgQ70= github.com/Azure/azure-sdk-for-go/sdk/security/keyvault/azkeys v1.0.0 h1:yfJe15aSwEQ6Oo6J+gdfdulPNoZ3TEhmbhLIoxZcA+U= github.com/Azure/azure-sdk-for-go/sdk/security/keyvault/internal v0.8.0 h1:T028gtTPiYt/RMUfs8nVsAL7FDQrfLlrm/NnRG/zcC4= diff --git a/receiver/azuremonitorreceiver/config.go b/receiver/azuremonitorreceiver/config.go index 5037a08d9b29..781cefa7acaf 100644 --- a/receiver/azuremonitorreceiver/config.go +++ b/receiver/azuremonitorreceiver/config.go @@ -26,6 +26,7 @@ var ( errMissingClientSecret = errors.New(`ClientSecret" is not specified in config`) errMissingFedTokenFile = errors.New(`FederatedTokenFile is not specified in config`) errInvalidCloud = errors.New(`Cloud" is invalid`) + errInvalidRegion = errors.New("`Region` is not specified in config`") monitorServices = []string{ "Microsoft.EventGrid/eventSubscriptions", @@ -244,6 +245,10 @@ type Config struct { CacheResourcesDefinitions float64 `mapstructure:"cache_resources_definitions"` MaximumNumberOfMetricsInACall int `mapstructure:"maximum_number_of_metrics_in_a_call"` AppendTagsAsAttributes bool `mapstructure:"append_tags_as_attributes"` + UseBatchApi bool `mapstructure:"use_batch_api"` + DiscoverSubscription bool `mapstructure:"discover_subscriptions"` + Region string `mapstructure:"region"` + MaximumNumberOfDimensionsInACall int `mapstructure:"maximum_number_of_dimensions_in_a_call"` } const ( @@ -253,7 +258,7 @@ const ( // Validate validates the configuration by checking for missing or invalid fields func (c Config) Validate() (err error) { - if c.SubscriptionID == "" { + if c.SubscriptionID == "" && !c.DiscoverSubscription { err = multierr.Append(err, errMissingSubscriptionID) } @@ -290,5 +295,9 @@ func (c Config) Validate() (err error) { err = multierr.Append(err, errInvalidCloud) } + if c.UseBatchApi && c.Region == "" && !c.DiscoverSubscription { + err = multierr.Append(err, errInvalidRegion) + } + return } diff --git a/receiver/azuremonitorreceiver/factory.go b/receiver/azuremonitorreceiver/factory.go index 96800cbea13c..4a44a3dc8223 100644 --- a/receiver/azuremonitorreceiver/factory.go +++ b/receiver/azuremonitorreceiver/factory.go @@ -36,14 +36,15 @@ func createDefaultConfig() component.Config { cfg.CollectionInterval = defaultCollectionInterval return &Config{ - ScraperControllerSettings: cfg, - MetricsBuilderConfig: metadata.DefaultMetricsBuilderConfig(), - CacheResources: 24 * 60 * 60, - CacheResourcesDefinitions: 24 * 60 * 60, - MaximumNumberOfMetricsInACall: 20, - Services: monitorServices, - Authentication: servicePrincipal, - Cloud: defaultCloud, + ScraperControllerSettings: cfg, + MetricsBuilderConfig: metadata.DefaultMetricsBuilderConfig(), + CacheResources: 24 * 60 * 60, + CacheResourcesDefinitions: 24 * 60 * 60, + MaximumNumberOfMetricsInACall: 20, + Services: monitorServices, + Authentication: servicePrincipal, + Cloud: defaultCloud, + MaximumNumberOfDimensionsInACall: 10, } } @@ -53,8 +54,16 @@ func createMetricsReceiver(_ context.Context, params receiver.CreateSettings, rC return nil, errConfigNotAzureMonitor } - azureScraper := newScraper(cfg, params) - scraper, err := scraperhelper.NewScraper(metadata.Type.String(), azureScraper.scrape, scraperhelper.WithStart(azureScraper.start)) + var scraper scraperhelper.Scraper + var err error + if cfg.UseBatchApi { + azureBatchScraper := newBatchScraper(cfg, params) + scraper, err = scraperhelper.NewScraper(metadata.Type.String(), azureBatchScraper.scrape, scraperhelper.WithStart(azureBatchScraper.start)) + } else { + azureScraper := newScraper(cfg, params) + scraper, err = scraperhelper.NewScraper(metadata.Type.String(), azureScraper.scrape, scraperhelper.WithStart(azureScraper.start)) + } + if err != nil { return nil, err } diff --git a/receiver/azuremonitorreceiver/go.mod b/receiver/azuremonitorreceiver/go.mod index 512b7ea2e1ad..f38edb1395e8 100644 --- a/receiver/azuremonitorreceiver/go.mod +++ b/receiver/azuremonitorreceiver/go.mod @@ -5,8 +5,10 @@ go 1.20 require ( github.com/Azure/azure-sdk-for-go/sdk/azcore v1.9.2 github.com/Azure/azure-sdk-for-go/sdk/azidentity v1.5.1 + github.com/Azure/azure-sdk-for-go/sdk/monitor/azquery v1.2.0-beta.1 github.com/Azure/azure-sdk-for-go/sdk/resourcemanager/monitor/armmonitor v0.11.0 github.com/Azure/azure-sdk-for-go/sdk/resourcemanager/resources/armresources v1.2.0 + github.com/Azure/azure-sdk-for-go/sdk/resourcemanager/resources/armsubscriptions v1.3.0 github.com/google/go-cmp v0.6.0 github.com/open-telemetry/opentelemetry-collector-contrib/pkg/golden v0.94.0 github.com/open-telemetry/opentelemetry-collector-contrib/pkg/pdatatest v0.94.0 diff --git a/receiver/azuremonitorreceiver/go.sum b/receiver/azuremonitorreceiver/go.sum index 960872620f06..03e5bbcd7205 100644 --- a/receiver/azuremonitorreceiver/go.sum +++ b/receiver/azuremonitorreceiver/go.sum @@ -4,12 +4,16 @@ github.com/Azure/azure-sdk-for-go/sdk/azidentity v1.5.1 h1:sO0/P7g68FrryJzljemN+ github.com/Azure/azure-sdk-for-go/sdk/azidentity v1.5.1/go.mod h1:h8hyGFDsU5HMivxiS2iYFZsgDbU9OnnJ163x5UGVKYo= github.com/Azure/azure-sdk-for-go/sdk/internal v1.5.2 h1:LqbJ/WzJUwBf8UiaSzgX7aMclParm9/5Vgp+TY51uBQ= github.com/Azure/azure-sdk-for-go/sdk/internal v1.5.2/go.mod h1:yInRyqWXAuaPrgI7p70+lDDgh3mlBohis29jGMISnmc= +github.com/Azure/azure-sdk-for-go/sdk/monitor/azquery v1.2.0-beta.1 h1:uG9gOhn40/Ocd12+Nm6vAZM80s9hwB2Yhjg5UM4rb/A= +github.com/Azure/azure-sdk-for-go/sdk/monitor/azquery v1.2.0-beta.1/go.mod h1:8STfZzbS0RUr8NtnAreiVeLCH3bpJSTmuARvThbTGmk= github.com/Azure/azure-sdk-for-go/sdk/resourcemanager/internal/v2 v2.0.0 h1:PTFGRSlMKCQelWwxUyYVEUqseBJVemLyqWJjvMyt0do= github.com/Azure/azure-sdk-for-go/sdk/resourcemanager/managementgroups/armmanagementgroups v1.0.0 h1:pPvTJ1dY0sA35JOeFq6TsY2xj6Z85Yo23Pj4wCCvu4o= github.com/Azure/azure-sdk-for-go/sdk/resourcemanager/monitor/armmonitor v0.11.0 h1:Ds0KRF8ggpEGg4Vo42oX1cIt/IfOhHWJBikksZbVxeg= github.com/Azure/azure-sdk-for-go/sdk/resourcemanager/monitor/armmonitor v0.11.0/go.mod h1:jj6P8ybImR+5topJ+eH6fgcemSFBmU6/6bFF8KkwuDI= github.com/Azure/azure-sdk-for-go/sdk/resourcemanager/resources/armresources v1.2.0 h1:Dd+RhdJn0OTtVGaeDLZpcumkIVCtA/3/Fo42+eoYvVM= github.com/Azure/azure-sdk-for-go/sdk/resourcemanager/resources/armresources v1.2.0/go.mod h1:5kakwfW5CjC9KK+Q4wjXAg+ShuIm2mBMua0ZFj2C8PE= +github.com/Azure/azure-sdk-for-go/sdk/resourcemanager/resources/armsubscriptions v1.3.0 h1:wxQx2Bt4xzPIKvW59WQf1tJNx/ZZKPfN+EhPX3Z6CYY= +github.com/Azure/azure-sdk-for-go/sdk/resourcemanager/resources/armsubscriptions v1.3.0/go.mod h1:TpiwjwnW/khS0LKs4vW5UmmT9OWcxaveS8U7+tlknzo= github.com/AzureAD/microsoft-authentication-library-for-go v1.2.1 h1:DzHpqpoJVaCgOUdVHxE8QB52S6NiVdDQvGlny1qvPqA= github.com/AzureAD/microsoft-authentication-library-for-go v1.2.1/go.mod h1:wP83P5OoQ5p6ip3ScPr0BAq0BvuPAvacpEuSzyouqAI= github.com/beorn7/perks v1.0.1 h1:VlbKKnNfV8bJzeqoa4cOKqO6bYr3WgKZxO8Z16+hsOM= diff --git a/receiver/azuremonitorreceiver/scraper_batch.go b/receiver/azuremonitorreceiver/scraper_batch.go new file mode 100644 index 000000000000..bddb0f32d83c --- /dev/null +++ b/receiver/azuremonitorreceiver/scraper_batch.go @@ -0,0 +1,545 @@ +// Copyright The OpenTelemetry Authors +// SPDX-License-Identifier: Apache-2.0 + +package azuremonitorreceiver // import "github.com/open-telemetry/opentelemetry-collector-contrib/receiver/azuremonitorreceiver" + +import ( + "context" + "errors" + "fmt" + "maps" + "sort" + "strings" + "sync" + "time" + + "github.com/Azure/azure-sdk-for-go/sdk/azcore" + "github.com/Azure/azure-sdk-for-go/sdk/azcore/arm" + "github.com/Azure/azure-sdk-for-go/sdk/azcore/cloud" + "github.com/Azure/azure-sdk-for-go/sdk/azcore/runtime" + "github.com/Azure/azure-sdk-for-go/sdk/azcore/to" + "github.com/Azure/azure-sdk-for-go/sdk/azidentity" + "github.com/Azure/azure-sdk-for-go/sdk/monitor/azquery" + "github.com/Azure/azure-sdk-for-go/sdk/resourcemanager/monitor/armmonitor" + "github.com/Azure/azure-sdk-for-go/sdk/resourcemanager/resources/armresources" + "github.com/Azure/azure-sdk-for-go/sdk/resourcemanager/resources/armsubscriptions" + "go.opentelemetry.io/collector/component" + "go.opentelemetry.io/collector/pdata/pcommon" + "go.opentelemetry.io/collector/pdata/pmetric" + "go.opentelemetry.io/collector/receiver" + "go.uber.org/zap" + + "github.com/open-telemetry/opentelemetry-collector-contrib/receiver/azuremonitorreceiver/internal/metadata" +) + +type azureType struct { + name *string + attributes map[string]*string + resourceIds []*string + metricsByCompositeKey map[metricsCompositeKey]*azureResourceMetrics + metricsDefinitionsUpdated time.Time +} + +func newBatchScraper(conf *Config, settings receiver.CreateSettings) *azureBatchScraper { + return &azureBatchScraper{ + cfg: conf, + settings: settings.TelemetrySettings, + mb: metadata.NewMetricsBuilder(conf.MetricsBuilderConfig, settings), + azIDCredentialsFunc: azidentity.NewClientSecretCredential, + azIDWorkloadFunc: azidentity.NewWorkloadIdentityCredential, + armMonitorDefinitionsClientFunc: armmonitor.NewMetricDefinitionsClient, + azQueryMetricsBatchClientFunc: azquery.NewMetricsBatchClient, + mutex: &sync.Mutex{}, + } +} + +type ArmsubscriptionClient interface { + NewListPager(options *armsubscriptions.ClientListOptions) *runtime.Pager[armsubscriptions.ClientListResponse] + NewListLocationsPager(subscriptionID string, options *armsubscriptions.ClientListLocationsOptions) *runtime.Pager[armsubscriptions.ClientListLocationsResponse] +} + +type azureBatchScraper struct { + cred azcore.TokenCredential + cfg *Config + settings component.TelemetrySettings + discoveredSubscriptions map[string]*armsubscriptions.Subscription + regionsFromSubscriptions map[string]map[string]struct{} + resources map[string]map[string]*azureResource + resourceTypes map[string]map[string]*azureType + resourcesUpdated time.Time + mb *metadata.MetricsBuilder + azIDCredentialsFunc func(string, string, string, *azidentity.ClientSecretCredentialOptions) (*azidentity.ClientSecretCredential, error) + azIDWorkloadFunc func(options *azidentity.WorkloadIdentityCredentialOptions) (*azidentity.WorkloadIdentityCredential, error) + armClientOptions *arm.ClientOptions + armSubscriptionclient ArmsubscriptionClient + armMonitorDefinitionsClientFunc func(string, azcore.TokenCredential, *arm.ClientOptions) (*armmonitor.MetricDefinitionsClient, error) + azQueryMetricsBatchClientOptions *azquery.MetricsBatchClientOptions + azQueryMetricsBatchClientFunc func(string, azcore.TokenCredential, *azquery.MetricsBatchClientOptions) (*azquery.MetricsBatchClient, error) + mutex *sync.Mutex +} + +func (s *azureBatchScraper) getArmClientOptions() *arm.ClientOptions { + var cloudToUse cloud.Configuration + switch s.cfg.Cloud { + case azureGovernmentCloud: + cloudToUse = cloud.AzureGovernment + default: + cloudToUse = cloud.AzurePublic + } + options := arm.ClientOptions{ + ClientOptions: azcore.ClientOptions{ + Cloud: cloudToUse, + }, + } + + return &options +} + +func (s *azureBatchScraper) getAzQueryMetricsBatchClientOptions() *azquery.MetricsBatchClientOptions { + var cloudToUse cloud.Configuration + switch s.cfg.Cloud { + case azureGovernmentCloud: + cloudToUse = cloud.AzureGovernment + default: + cloudToUse = cloud.AzurePublic + } + + options := azquery.MetricsBatchClientOptions{ + ClientOptions: azcore.ClientOptions{ + Cloud: cloudToUse, + }, + } + + return &options +} + +func (s *azureBatchScraper) getArmsubscriptionClient() ArmsubscriptionClient { + client, _ := armsubscriptions.NewClient(s.cred, s.armClientOptions) + return client +} + +func (s *azureBatchScraper) getArmClient(subscriptionId string) ArmClient { + client, _ := armresources.NewClient(subscriptionId, s.cred, s.armClientOptions) + return client +} + +func (s *azureBatchScraper) getMetricsDefinitionsClient(subscriptionId string) MetricsDefinitionsClientInterface { + client, _ := s.armMonitorDefinitionsClientFunc(subscriptionId, s.cred, s.armClientOptions) + return client +} + +type MetricBatchValuesClient interface { + QueryBatch(ctx context.Context, subscriptionID string, metricNamespace string, metricNames []string, resourceIDs azquery.ResourceIDList, options *azquery.MetricsBatchClientQueryBatchOptions) ( + azquery.MetricsBatchClientQueryBatchResponse, error, + ) +} + +func (s *azureBatchScraper) GetMetricsBatchValuesClient(region string) MetricBatchValuesClient { + endpoint := "https://" + region + ".metrics.monitor.azure.com" + s.settings.Logger.Debug("Batch Endpoint", zap.String("endpoint", endpoint)) + client, _ := azquery.NewMetricsBatchClient(endpoint, s.cred, s.azQueryMetricsBatchClientOptions) + return client +} + +func (s *azureBatchScraper) start(ctx context.Context, _ component.Host) (err error) { + if err = s.loadCredentials(); err != nil { + return err + } + + s.armClientOptions = s.getArmClientOptions() + s.azQueryMetricsBatchClientOptions = s.getAzQueryMetricsBatchClientOptions() + s.armSubscriptionclient = s.getArmsubscriptionClient() + s.resources = map[string]map[string]*azureResource{} + s.resourceTypes = map[string]map[string]*azureType{} + s.discoveredSubscriptions = map[string]*armsubscriptions.Subscription{} + s.regionsFromSubscriptions = map[string]map[string]struct{}{} + + if !s.cfg.DiscoverSubscription { + s.resources[s.cfg.SubscriptionID] = make(map[string]*azureResource) + s.resourceTypes[s.cfg.SubscriptionID] = make(map[string]*azureType) + s.discoveredSubscriptions[s.cfg.SubscriptionID] = &armsubscriptions.Subscription{ + ID: &s.cfg.SubscriptionID, + DisplayName: &s.cfg.SubscriptionID, + } + } else { + s.getSubscriptions(ctx) + } + + return +} + +func (s *azureBatchScraper) loadCredentials() (err error) { + switch s.cfg.Authentication { + case servicePrincipal: + if s.cred, err = s.azIDCredentialsFunc(s.cfg.TenantID, s.cfg.ClientID, s.cfg.ClientSecret, nil); err != nil { + return err + } + case workloadIdentity: + if s.cred, err = s.azIDWorkloadFunc(nil); err != nil { + return err + } + default: + return fmt.Errorf("unknown authentication %v", s.cfg.Authentication) + } + return nil +} + +func (s *azureBatchScraper) scrape(ctx context.Context) (pmetric.Metrics, error) { + if !(time.Since(s.resourcesUpdated).Seconds() < s.cfg.CacheResources) { + s.getSubscriptions(ctx) + } + var wg sync.WaitGroup + for _, subscription := range s.discoveredSubscriptions { + wg.Add(1) + go func(subscription *armsubscriptions.Subscription) { + defer wg.Done() + + s.getResources(ctx, *subscription.SubscriptionID) + resourceTypesWithDefinitions := make(chan string) + go func() { + defer close(resourceTypesWithDefinitions) + for resourceType := range s.resourceTypes[*subscription.SubscriptionID] { + s.getResourceMetricsDefinitionsByType(ctx, subscription, resourceType) + resourceTypesWithDefinitions <- resourceType + } + }() + + var wg2 sync.WaitGroup + for resourceType := range resourceTypesWithDefinitions { + wg2.Add(1) + go func(subscription *armsubscriptions.Subscription, resourceType string) { + defer wg2.Done() + s.getBatchMetricsValues(ctx, subscription, resourceType) + }(subscription, resourceType) + } + + wg2.Wait() + }(subscription) + + } + + wg.Wait() + return s.mb.Emit(), nil +} + +func (s *azureBatchScraper) getSubscriptions(ctx context.Context) { + opts := &armsubscriptions.ClientListOptions{} + pager := s.armSubscriptionclient.NewListPager(opts) + + for pager.More() { + nextResult, err := pager.NextPage(ctx) + if err != nil { + s.settings.Logger.Error("failed to get Azure Subscriptions", zap.Error(err)) + return + } + + for _, subscription := range nextResult.Value { + s.resources[*subscription.SubscriptionID] = make(map[string]*azureResource) + s.resourceTypes[*subscription.SubscriptionID] = make(map[string]*azureType) + s.discoveredSubscriptions[*subscription.SubscriptionID] = subscription + s.regionsFromSubscriptions[*subscription.SubscriptionID] = make(map[string]struct{}) + } + } +} + +func (s *azureBatchScraper) getResources(ctx context.Context, subscriptionId string) { + if time.Since(s.resourcesUpdated).Seconds() < s.cfg.CacheResources { + return + } + clientResources := s.getArmClient(subscriptionId) + + existingResources := map[string]void{} + for id := range s.resources[subscriptionId] { + existingResources[id] = void{} + } + + filter := s.getResourcesFilter() + opts := &armresources.ClientListOptions{ + Filter: &filter, + } + + updatedTypes := map[string]*azureType{} + pager := clientResources.NewListPager(opts) + + for pager.More() { + nextResult, err := pager.NextPage(ctx) + if err != nil { + s.settings.Logger.Error("failed to get Azure Resources data", zap.Error(err)) + return + } + + for _, resource := range nextResult.Value { + + if _, ok := s.resources[subscriptionId][*resource.ID]; !ok { + resourceGroup := getResourceGroupFromID(*resource.ID) + attributes := map[string]*string{ + attributeName: resource.Name, + attributeResourceGroup: &resourceGroup, + attributeResourceType: resource.Type, + } + + if resource.Location != nil { + s.regionsFromSubscriptions[subscriptionId][*resource.Location] = struct{}{} + attributes[attributeLocation] = resource.Location + } + + s.resources[subscriptionId][*resource.ID] = &azureResource{ + attributes: attributes, + tags: resource.Tags, + } + + if updatedTypes[*resource.Type] == nil { + updatedTypes[*resource.Type] = &azureType{ + name: resource.Type, + attributes: map[string]*string{}, + resourceIds: []*string{resource.ID}, + } + } else { + updatedTypes[*resource.Type].resourceIds = append(updatedTypes[*resource.Type].resourceIds, resource.ID) + } + } + delete(existingResources, *resource.ID) + } + } + + if len(existingResources) > 0 { + for idToDelete := range existingResources { + delete(s.resources[subscriptionId], idToDelete) + } + } + + s.resourcesUpdated = time.Now() + maps.Copy(s.resourceTypes[subscriptionId], updatedTypes) +} + +func (s *azureBatchScraper) getResourcesFilter() string { + // TODO: switch to parsing services from + // https://learn.microsoft.com/en-us/azure/azure-monitor/essentials/metrics-supported + resourcesTypeFilter := strings.Join(s.cfg.Services, "' or resourceType eq '") + + resourcesGroupFilterString := "" + if len(s.cfg.ResourceGroups) > 0 { + resourcesGroupFilterString = fmt.Sprintf(" and (resourceGroup eq '%s')", + strings.Join(s.cfg.ResourceGroups, "' or resourceGroup eq '")) + } + + return fmt.Sprintf("(resourceType eq '%s')%s", resourcesTypeFilter, resourcesGroupFilterString) +} + +func (s *azureBatchScraper) getResourceMetricsDefinitionsByType(ctx context.Context, subscription *armsubscriptions.Subscription, resourceType string) { + + if time.Since(s.resourceTypes[*subscription.SubscriptionID][resourceType].metricsDefinitionsUpdated).Seconds() < s.cfg.CacheResourcesDefinitions { + return + } + + s.resourceTypes[*subscription.SubscriptionID][resourceType].metricsByCompositeKey = map[metricsCompositeKey]*azureResourceMetrics{} + + resourceIds := s.resourceTypes[*subscription.SubscriptionID][resourceType].resourceIds + if len(resourceIds) == 0 && resourceIds[0] != nil { + return + } + + clientMetricsDefinitions := s.getMetricsDefinitionsClient(*subscription.SubscriptionID) + pager := clientMetricsDefinitions.NewListPager(*resourceIds[0], nil) + for pager.More() { + nextResult, err := pager.NextPage(ctx) + if err != nil { + s.settings.Logger.Error("failed to get Azure Metrics definitions data", zap.Error(err)) + return + } + + for _, v := range nextResult.Value { + s.settings.Logger.Info("getResourceMetricsDefinitionsByType", zap.String("resourceType", resourceType), zap.Any("v", v)) + timeGrain := *v.MetricAvailabilities[0].TimeGrain + name := *v.Name.Value + compositeKey := metricsCompositeKey{timeGrain: timeGrain} + + if len(v.Dimensions) > 0 { + var dimensionsSlice []string + for _, dimension := range v.Dimensions { + if len(strings.TrimSpace(*dimension.Value)) > 0 { + dimensionsSlice = append(dimensionsSlice, *dimension.Value) + } + } + sort.Strings(dimensionsSlice) + compositeKey.dimensions = strings.Join(dimensionsSlice, ",") + } + s.storeMetricsDefinitionByType(*subscription.SubscriptionID, resourceType, name, compositeKey) + } + } + s.resourceTypes[*subscription.SubscriptionID][resourceType].metricsDefinitionsUpdated = time.Now() +} + +func (s *azureBatchScraper) storeMetricsDefinitionByType(subscriptionid string, resourceType string, name string, compositeKey metricsCompositeKey) { + if _, ok := s.resourceTypes[subscriptionid][resourceType].metricsByCompositeKey[compositeKey]; ok { + s.resourceTypes[subscriptionid][resourceType].metricsByCompositeKey[compositeKey].metrics = append( + s.resourceTypes[subscriptionid][resourceType].metricsByCompositeKey[compositeKey].metrics, name, + ) + } else { + s.resourceTypes[subscriptionid][resourceType].metricsByCompositeKey[compositeKey] = &azureResourceMetrics{metrics: []string{name}} + } +} + +func (s *azureBatchScraper) getBatchMetricsValues(ctx context.Context, subscription *armsubscriptions.Subscription, resourceType string) { + resType := *s.resourceTypes[*subscription.SubscriptionID][resourceType] + + for compositeKey, metricsByGrain := range resType.metricsByCompositeKey { + now := time.Now().UTC() + + // Azure Metrics only allows querying every minute? + d := (60 * time.Second) + closestMinute := now.Round(d) + timeSinceClosestMinute := time.Since(closestMinute).Seconds() + if timeSinceClosestMinute < 0 { + // Skip this batch to avoid duplication (on first interval) + continue + } + + if time.Since(metricsByGrain.metricsValuesUpdated).Seconds() < float64(timeGrains[compositeKey.timeGrain]) { + continue + } + metricsByGrain.metricsValuesUpdated = closestMinute + + timeGrain := timeGrains[compositeKey.timeGrain] + startTime := closestMinute.Add(time.Duration(-timeGrain) * time.Second) + for region := range s.regionsFromSubscriptions[*subscription.SubscriptionID] { + clientMetrics := s.GetMetricsBatchValuesClient(region) + + start := 0 + for start < len(metricsByGrain.metrics) { + + end := start + s.cfg.MaximumNumberOfMetricsInACall + if end > len(metricsByGrain.metrics) { + end = len(metricsByGrain.metrics) + } + + start_resources := 0 + for start_resources < len(resType.resourceIds) { + end_resources := start_resources + 50 // getBatch API is limited to 50 resources max + if end_resources > len(resType.resourceIds) { + end_resources = len(resType.resourceIds) + } + + s.settings.Logger.Debug( + "scrape", + zap.String("subscription", *subscription.DisplayName), + zap.String("region", region), + zap.String("resourceType", resourceType), + zap.Any("resourceIds", resType.resourceIds[start_resources:end_resources]), + zap.Any("metrics", metricsByGrain.metrics[start:end]), + zap.Int("start_resources", start_resources), + zap.Int("end_resrouces", end_resources), + zap.Time("startTime", startTime), + zap.Time("end_time", closestMinute), + zap.String("interval", compositeKey.timeGrain), + ) + + response, err := clientMetrics.QueryBatch( + ctx, + *subscription.SubscriptionID, + resourceType, + metricsByGrain.metrics[start:end], + azquery.ResourceIDList{ResourceIDs: resType.resourceIds[start_resources:end_resources]}, + &azquery.MetricsBatchClientQueryBatchOptions{ + Aggregation: to.SliceOfPtrs( + azquery.AggregationTypeAverage, + azquery.AggregationTypeMaximum, + azquery.AggregationTypeMinimum, + azquery.AggregationTypeTotal, + azquery.AggregationTypeCount, + ), + StartTime: to.Ptr(startTime.Format(time.RFC3339)), + EndTime: to.Ptr(closestMinute.Format(time.RFC3339)), + Interval: to.Ptr(compositeKey.timeGrain), + Top: to.Ptr(int32(s.cfg.MaximumNumberOfDimensionsInACall)), // Defaults to 10 (may be limiting results) + }, + ) + + if err != nil { + var respErr *azcore.ResponseError + if errors.As(err, &respErr) { + s.settings.Logger.Error("failed to get Azure Metrics values data", zap.String("subscription", *subscription.SubscriptionID), zap.String("region", region), zap.String("resourceType", resourceType), zap.Any("metrics", metricsByGrain.metrics[start:end]), zap.Any("resources", resType.resourceIds[start_resources:end_resources]), zap.Any("response", response), zap.Error(err)) + } + s.settings.Logger.Error("failed to get Azure Metrics values data", zap.String("subscription", *subscription.SubscriptionID), zap.String("region", region), zap.String("resourceType", resourceType), zap.Any("metrics", metricsByGrain.metrics[start:end]), zap.Any("resources", resType.resourceIds[start_resources:end_resources]), zap.Any("response", response), zap.Any("responseError", respErr)) + break + } + + //s.settings.Logger.Debug("scrape", zap.Any("response.Values", response.Values)) + for _, metricValues := range response.Values { + for _, metric := range metricValues.Values { + + for _, timeseriesElement := range metric.TimeSeries { + if timeseriesElement.Data != nil { + if metricValues.ResourceID != nil { + res := s.resources[*subscription.SubscriptionID][*metricValues.ResourceID] + attributes := map[string]*string{} + for name, value := range res.attributes { + attributes[name] = value + } + for _, value := range timeseriesElement.MetadataValues { + name := metadataPrefix + *value.Name.Value + attributes[name] = value.Value + } + if s.cfg.AppendTagsAsAttributes { + for tagName, value := range res.tags { + name := tagPrefix + tagName + attributes[name] = value + } + } + attributes["subscription"] = subscription.DisplayName + attributes["timegrain"] = &compositeKey.timeGrain + for i := len(timeseriesElement.Data) - 1; i >= 0; i-- { // reverse for loop because newest timestamp is at the end of the slice + metricValue := timeseriesElement.Data[i] + if metricValue.Average != nil { + s.processQueryTimeseriesData(*metricValues.ResourceID, metric, metricValue, attributes) + break + } + } + } + } + } + } + } + start_resources = end_resources + } + start = end + } + } + } +} + +func (s *azureBatchScraper) processQueryTimeseriesData( + resourceID string, + metric *azquery.Metric, + metricValue *azquery.MetricValue, + attributes map[string]*string, +) { + s.mutex.Lock() + defer s.mutex.Unlock() + + ts := pcommon.NewTimestampFromTime(*metricValue.TimeStamp) + + aggregationsData := []struct { + name string + value *float64 + }{ + {"Average", metricValue.Average}, + {"Count", metricValue.Count}, + {"Maximum", metricValue.Maximum}, + {"Minimum", metricValue.Minimum}, + {"Total", metricValue.Total}, + } + for _, aggregation := range aggregationsData { + if aggregation.value != nil { + s.mb.AddDataPoint( + resourceID, + *metric.Name.Value, + aggregation.name, + string(*metric.Unit), + attributes, + ts, + *aggregation.value, + ) + } + } +}