diff --git a/storage/grpc_client.go b/storage/grpc_client.go index 16aa59dce1d6..4748e269e2d6 100644 --- a/storage/grpc_client.go +++ b/storage/grpc_client.go @@ -35,6 +35,7 @@ import ( "google.golang.org/api/iterator" "google.golang.org/api/option" "google.golang.org/api/option/internaloption" + "google.golang.org/api/transport" "google.golang.org/grpc" "google.golang.org/grpc/codes" "google.golang.org/grpc/encoding" @@ -118,6 +119,24 @@ type grpcStorageClient struct { settings *settings } +func enableClientMetrics(ctx context.Context, s *settings, config storageConfig) (*metricsContext, error) { + var project string + // TODO: use new auth client + c, err := transport.Creds(ctx, s.clientOption...) + if err == nil { + project = c.ProjectID + } + metricsContext, err := newGRPCMetricContext(ctx, metricsConfig{ + project: project, + interval: config.metricInterval, + manualReader: config.manualReader}, + ) + if err != nil { + return nil, fmt.Errorf("gRPC Metrics: %w", err) + } + return metricsContext, nil +} + // newGRPCStorageClient initializes a new storageClient that uses the gRPC // Storage API. 
func newGRPCStorageClient(ctx context.Context, opts ...storageOption) (storageClient, error) { diff --git a/storage/grpc_metrics.go b/storage/grpc_metrics.go index 149b37807ed4..f7bebd1defa7 100644 --- a/storage/grpc_metrics.go +++ b/storage/grpc_metrics.go @@ -16,8 +16,8 @@ package storage import ( "context" + "errors" "fmt" - "log" "strings" "time" @@ -29,8 +29,8 @@ import ( "go.opentelemetry.io/otel/sdk/metric/metricdata" "go.opentelemetry.io/otel/sdk/resource" "google.golang.org/api/option" - "google.golang.org/api/transport" "google.golang.org/grpc" + "google.golang.org/grpc/experimental/stats" "google.golang.org/grpc/stats/opentelemetry" ) @@ -39,98 +39,78 @@ const ( metricPrefix = "storage.googleapis.com/client/" ) -func latencyHistogramBoundaries() []float64 { - boundaries := []float64{} - boundary := 0.0 - increment := 0.002 - // 2ms buckets for first 100ms, so we can have higher resolution for uploads and downloads in the 100 KiB range - for i := 0; i < 50; i++ { - boundaries = append(boundaries, boundary) - // increment by 2ms - boundary += increment - } - // For the remaining buckets do 10 10ms, 10 20ms, and so on, up until 5 minutes - for i := 0; i < 150 && boundary < 300; i++ { - boundaries = append(boundaries, boundary) - if i != 0 && i%10 == 0 { - increment *= 2 - } - boundary += increment - } - return boundaries +// Added to help with tests +type storageMonitoredResource struct { + project string + api string + location string + instance string + cloudPlatform string + host string + resource *resource.Resource } -func sizeHistogramBoundaries() []float64 { - kb := 1024.0 - mb := 1024.0 * kb - gb := 1024.0 * mb - boundaries := []float64{} - boundary := 0.0 - increment := 128 * kb - // 128 KiB increments up to 4MiB, then exponential growth - for len(boundaries) < 200 && boundary <= 16*gb { - boundaries = append(boundaries, boundary) - boundary += increment - if boundary >= 4*mb { - increment *= 2 - } +func (smr *storageMonitoredResource) exporter() 
(metric.Exporter, error) { + exporter, err := mexporter.New( + mexporter.WithProjectID(smr.project), + mexporter.WithMetricDescriptorTypeFormatter(metricFormatter), + mexporter.WithCreateServiceTimeSeries(), + mexporter.WithMonitoredResourceDescription(monitoredResourceName, []string{"project_id", "location", "cloud_platform", "host_id", "instance_id", "api"}), + ) + if err != nil { + return nil, fmt.Errorf("storage: creating metrics exporter: %w", err) } - return boundaries -} - -func metricFormatter(m metricdata.Metrics) string { - return metricPrefix + strings.ReplaceAll(string(m.Name), ".", "/") -} - -func gcpAttributeExpectedDefaults() []attribute.KeyValue { - return []attribute.KeyValue{ - {Key: "location", Value: attribute.StringValue("global")}, - {Key: "cloud_platform", Value: attribute.StringValue("unknown")}, - {Key: "host_id", Value: attribute.StringValue("unknown")}} -} - -// Added to help with tests -type preparedResource struct { - projectToUse string - resource *resource.Resource + return exporter, nil } -func newPreparedResource(ctx context.Context, project string, resourceOptions []resource.Option) (*preparedResource, error) { - detectedAttrs, err := resource.New(ctx, resourceOptions...) +func newStorageMonitoredResource(ctx context.Context, project, api string, opts ...resource.Option) (*storageMonitoredResource, error) { + detectedAttrs, err := resource.New(ctx, opts...) if err != nil { return nil, err } - preparedResource := &preparedResource{} + smr := &storageMonitoredResource{ + instance: uuid.New().String(), + api: api, + project: project, + } s := detectedAttrs.Set() - p, present := s.Value("cloud.account.id") - if present { - preparedResource.projectToUse = p.AsString() + // Attempt to use resource detector project id if project id wasn't + // identified using ADC as a last resort. Otherwise metrics cannot be started. 
+ if p, present := s.Value("cloud.account.id"); present && smr.project == "" { + smr.project = p.AsString() + } else if !present && smr.project == "" { + return nil, errors.New("google cloud project is required to start client-side metrics") + } + if v, ok := s.Value("cloud.region"); ok { + smr.location = v.AsString() } else { - preparedResource.projectToUse = project + smr.location = "global" } - updates := []attribute.KeyValue{} - for _, kv := range gcpAttributeExpectedDefaults() { - if val, present := s.Value(kv.Key); !present || val.AsString() == "" { - updates = append(updates, attribute.KeyValue{Key: kv.Key, Value: kv.Value}) - } + if v, ok := s.Value("cloud.platform"); ok { + smr.cloudPlatform = v.AsString() + } else { + smr.cloudPlatform = "unknown" } - r, err := resource.New( - ctx, - resource.WithAttributes( - attribute.KeyValue{Key: "gcp.resource_type", Value: attribute.StringValue(monitoredResourceName)}, - attribute.KeyValue{Key: "instance_id", Value: attribute.StringValue(uuid.New().String())}, - attribute.KeyValue{Key: "project_id", Value: attribute.StringValue(project)}, - attribute.KeyValue{Key: "api", Value: attribute.StringValue("grpc")}, - ), - resource.WithAttributes(detectedAttrs.Attributes()...), - // Last duplicate key / value wins - resource.WithAttributes(updates...), - ) + if v, ok := s.Value("host.id"); ok { + smr.host = v.AsString() + } else if v, ok := s.Value("faas.id"); ok { + smr.host = v.AsString() + } else { + smr.host = "unknown" + } + smr.resource, err = resource.New(ctx, resource.WithAttributes([]attribute.KeyValue{ + {Key: "gcp.resource_type", Value: attribute.StringValue(monitoredResourceName)}, + {Key: "project_id", Value: attribute.StringValue(smr.project)}, + {Key: "api", Value: attribute.StringValue(smr.api)}, + {Key: "instance_id", Value: attribute.StringValue(smr.instance)}, + {Key: "location", Value: attribute.StringValue(smr.location)}, + {Key: "cloud_platform", Value: attribute.StringValue(smr.cloudPlatform)}, + 
{Key: "host_id", Value: attribute.StringValue(smr.host)}, + }...)) if err != nil { return nil, err } - preparedResource.resource = r - return preparedResource, nil + return smr, nil } type metricsContext struct { @@ -142,64 +122,65 @@ type metricsContext struct { close func() } -func createHistogramView(name string, boundaries []float64) metric.View { - return metric.NewView(metric.Instrument{ - Name: name, - Kind: metric.InstrumentKindHistogram, - }, metric.Stream{ - Name: name, - Aggregation: metric.AggregationExplicitBucketHistogram{Boundaries: boundaries}, - }) +type metricsConfig struct { + project string + interval time.Duration + customExporter *metric.Exporter + manualReader *metric.ManualReader // used by tests + disableExporter bool // used by tests disables exports + resourceOpts []resource.Option // used by tests } -func newGRPCMetricContext(ctx context.Context, project string, config storageConfig) (*metricsContext, error) { +func newGRPCMetricContext(ctx context.Context, cfg metricsConfig) (*metricsContext, error) { var exporter metric.Exporter meterOpts := []metric.Option{} - if config.metricExporter != nil { - exporter = *config.metricExporter - } else { - preparedResource, err := newPreparedResource(ctx, project, []resource.Option{resource.WithDetectors(gcp.NewDetector())}) + if cfg.customExporter == nil { + var ropts []resource.Option + if cfg.resourceOpts != nil { + ropts = cfg.resourceOpts + } else { + ropts = []resource.Option{resource.WithDetectors(gcp.NewDetector())} + } + smr, err := newStorageMonitoredResource(ctx, cfg.project, "grpc", ropts...) if err != nil { return nil, err } - meterOpts = append(meterOpts, metric.WithResource(preparedResource.resource)) - // Implementation requires a project, if one is not determined possibly user - // credentials. Then we will fail stating gRPC Metrics require a project-id. 
- if project == "" && preparedResource.projectToUse == "" { - return nil, fmt.Errorf("google cloud project is required to start client-side metrics") - } - // If projectTouse isn't the same as project provided to Storage client, then - // emit a log stating which project is being used to emit metrics to. - if project != preparedResource.projectToUse { - log.Printf("The Project ID configured for metrics is %s, but the Project ID of the storage client is %s. Make sure that the service account in use has the required metric writing role (roles/monitoring.metricWriter) in the project projectIdToUse or metrics will not be written.", preparedResource.projectToUse, project) - } - meOpts := []mexporter.Option{ - mexporter.WithProjectID(preparedResource.projectToUse), - mexporter.WithMetricDescriptorTypeFormatter(metricFormatter), - mexporter.WithCreateServiceTimeSeries(), - mexporter.WithMonitoredResourceDescription(monitoredResourceName, []string{"project_id", "location", "cloud_platform", "host_id", "instance_id", "api"})} - exporter, err = mexporter.New(meOpts...) + exporter, err = smr.exporter() if err != nil { return nil, err } - } - // Metric views update histogram boundaries to be relevant to GCS - // otherwise default OTel histogram boundaries are used. 
- metricViews := []metric.View{ - createHistogramView("grpc.client.attempt.duration", latencyHistogramBoundaries()), - createHistogramView("grpc.client.attempt.rcvd_total_compressed_message_size", sizeHistogramBoundaries()), - createHistogramView("grpc.client.attempt.sent_total_compressed_message_size", sizeHistogramBoundaries()), + meterOpts = append(meterOpts, metric.WithResource(smr.resource)) + } else { + exporter = *cfg.customExporter } interval := time.Minute - if config.metricInterval > 0 { - interval = config.metricInterval + if cfg.interval > 0 { + interval = cfg.interval + } + meterOpts = append(meterOpts, + // Metric views update histogram boundaries to be relevant to GCS + // otherwise default OTel histogram boundaries are used. + metric.WithView( + createHistogramView("grpc.client.attempt.duration", latencyHistogramBoundaries()), + createHistogramView("grpc.client.attempt.rcvd_total_compressed_message_size", sizeHistogramBoundaries()), + createHistogramView("grpc.client.attempt.sent_total_compressed_message_size", sizeHistogramBoundaries())), + ) + if cfg.manualReader != nil { + meterOpts = append(meterOpts, metric.WithReader(cfg.manualReader)) + } + if !cfg.disableExporter { + meterOpts = append(meterOpts, metric.WithReader( + metric.NewPeriodicReader(&exporterLogSuppressor{Exporter: exporter}, metric.WithInterval(interval)))) } - meterOpts = append(meterOpts, metric.WithReader(metric.NewPeriodicReader(&exporterLogSuppressor{exporter: exporter}, metric.WithInterval(interval))), - metric.WithView(metricViews...)) provider := metric.NewMeterProvider(meterOpts...) 
mo := opentelemetry.MetricsOptions{ MeterProvider: provider, - Metrics: opentelemetry.DefaultMetrics().Add( + Metrics: stats.NewMetrics( + "grpc.client.attempt.started", + "grpc.client.attempt.duration", + "grpc.client.attempt.sent_total_compressed_message_size", + "grpc.client.attempt.rcvd_total_compressed_message_size", + "grpc.client.call.duration", "grpc.lb.wrr.rr_fallback", "grpc.lb.wrr.endpoint_weight_not_yet_usable", "grpc.lb.wrr.endpoint_weight_stale", @@ -208,45 +189,29 @@ func newGRPCMetricContext(ctx context.Context, project string, config storageCon "grpc.lb.rls.cache_size", "grpc.lb.rls.default_target_picks", "grpc.lb.rls.target_picks", - "grpc.lb.rls.failed_picks"), + "grpc.lb.rls.failed_picks", + ), OptionalLabels: []string{"grpc.lb.locality"}, } opts := []option.ClientOption{ - option.WithGRPCDialOption(opentelemetry.DialOption(opentelemetry.Options{MetricsOptions: mo})), - option.WithGRPCDialOption(grpc.WithDefaultCallOptions(grpc.StaticMethodCallOption{})), + option.WithGRPCDialOption( + opentelemetry.DialOption(opentelemetry.Options{MetricsOptions: mo})), + option.WithGRPCDialOption( + grpc.WithDefaultCallOptions(grpc.StaticMethodCallOption{})), } - context := &metricsContext{ + return &metricsContext{ clientOpts: opts, provider: provider, - close: createShutdown(ctx, provider), - } - return context, nil -} - -func enableClientMetrics(ctx context.Context, s *settings, config storageConfig) (*metricsContext, error) { - var project string - c, err := transport.Creds(ctx, s.clientOption...) 
- if err == nil { - project = c.ProjectID - } - // Enable client-side metrics for gRPC - metricsContext, err := newGRPCMetricContext(ctx, project, config) - if err != nil { - return nil, fmt.Errorf("gRPC Metrics: %w", err) - } - return metricsContext, nil -} - -func createShutdown(ctx context.Context, provider *metric.MeterProvider) func() { - return func() { - provider.Shutdown(ctx) - } + close: func() { + provider.Shutdown(ctx) + }, + }, nil } // Silences permission errors after initial error is emitted to prevent // chatty logs. type exporterLogSuppressor struct { - exporter metric.Exporter + metric.Exporter emittedFailure bool } @@ -254,7 +219,7 @@ type exporterLogSuppressor struct { // lack of credentials after initial failure. // https://pkg.go.dev/go.opentelemetry.io/otel/sdk/metric@v1.28.0#Exporter func (e *exporterLogSuppressor) Export(ctx context.Context, rm *metricdata.ResourceMetrics) error { - if err := e.exporter.Export(ctx, rm); err != nil && !e.emittedFailure { + if err := e.Exporter.Export(ctx, rm); err != nil && !e.emittedFailure { if strings.Contains(err.Error(), "PermissionDenied") { e.emittedFailure = true return fmt.Errorf("gRPC metrics failed due permission issue: %w", err) @@ -264,18 +229,55 @@ func (e *exporterLogSuppressor) Export(ctx context.Context, rm *metricdata.Resou return nil } -func (e *exporterLogSuppressor) Temporality(k metric.InstrumentKind) metricdata.Temporality { - return e.exporter.Temporality(k) +func latencyHistogramBoundaries() []float64 { + boundaries := []float64{} + boundary := 0.0 + increment := 0.002 + // 2ms buckets for first 100ms, so we can have higher resolution for uploads and downloads in the 100 KiB range + for i := 0; i < 50; i++ { + boundaries = append(boundaries, boundary) + // increment by 2ms + boundary += increment + } + // For the remaining buckets do 10 10ms, 10 20ms, and so on, up until 5 minutes + for i := 0; i < 150 && boundary < 300; i++ { + boundaries = append(boundaries, boundary) + if i != 0 && 
i%10 == 0 { + increment *= 2 + } + boundary += increment + } + return boundaries } -func (e *exporterLogSuppressor) Aggregation(k metric.InstrumentKind) metric.Aggregation { - return e.exporter.Aggregation(k) +func sizeHistogramBoundaries() []float64 { + kb := 1024.0 + mb := 1024.0 * kb + gb := 1024.0 * mb + boundaries := []float64{} + boundary := 0.0 + increment := 128 * kb + // 128 KiB increments up to 4MiB, then exponential growth + for len(boundaries) < 200 && boundary <= 16*gb { + boundaries = append(boundaries, boundary) + boundary += increment + if boundary >= 4*mb { + increment *= 2 + } + } + return boundaries } -func (e *exporterLogSuppressor) ForceFlush(ctx context.Context) error { - return e.exporter.ForceFlush(ctx) +func createHistogramView(name string, boundaries []float64) metric.View { + return metric.NewView(metric.Instrument{ + Name: name, + Kind: metric.InstrumentKindHistogram, + }, metric.Stream{ + Name: name, + Aggregation: metric.AggregationExplicitBucketHistogram{Boundaries: boundaries}, + }) } -func (e *exporterLogSuppressor) Shutdown(ctx context.Context) error { - return e.exporter.Shutdown(ctx) +func metricFormatter(m metricdata.Metrics) string { + return metricPrefix + strings.ReplaceAll(string(m.Name), ".", "/") } diff --git a/storage/grpc_metrics_test.go b/storage/grpc_metrics_test.go index 23b3cf981e1c..44d5ed89bd03 100644 --- a/storage/grpc_metrics_test.go +++ b/storage/grpc_metrics_test.go @@ -34,7 +34,7 @@ func TestMetricFormatter(t *testing.T) { } } -func TestNewPreparedResource(t *testing.T) { +func TestStorageMonitoredResource(t *testing.T) { ctx := context.Background() for _, test := range []struct { desc string @@ -52,16 +52,22 @@ func TestNewPreparedResource(t *testing.T) { }, attribute.KeyValue{ Key: "host_id", Value: attribute.StringValue("unknown"), + }, attribute.KeyValue{ + Key: "project_id", + Value: attribute.StringValue("project-id"), + }, attribute.KeyValue{ + Key: "api", + Value: attribute.StringValue("grpc"), }), }, 
{ - desc: "use detected values when GCP attributes are detected", + desc: "use detected values when GCE attributes are detected", detectedAttributes: []attribute.KeyValue{ - {Key: "location", + {Key: "cloud.region", Value: attribute.StringValue("us-central1")}, - {Key: "cloud_platform", - Value: attribute.StringValue("gcp")}, - {Key: "host_id", + {Key: "cloud.platform", + Value: attribute.StringValue("gce")}, + {Key: "host.id", Value: attribute.StringValue("gce-instance-id")}, }, wantAttributes: attribute.NewSet(attribute.KeyValue{ @@ -69,48 +75,60 @@ func TestNewPreparedResource(t *testing.T) { Value: attribute.StringValue("us-central1"), }, attribute.KeyValue{ Key: "cloud_platform", - Value: attribute.StringValue("gcp"), + Value: attribute.StringValue("gce"), }, attribute.KeyValue{ Key: "host_id", Value: attribute.StringValue("gce-instance-id"), + }, attribute.KeyValue{ + Key: "project_id", + Value: attribute.StringValue("project-id"), + }, attribute.KeyValue{ + Key: "api", + Value: attribute.StringValue("grpc"), }), - }, { - desc: "use default when value is empty string", + }, + { + desc: "use detected values when FAAS attributes are detected", detectedAttributes: []attribute.KeyValue{ - {Key: "location", + {Key: "cloud.region", Value: attribute.StringValue("us-central1")}, - {Key: "cloud_platform", - Value: attribute.StringValue("")}, - {Key: "host_id", - Value: attribute.StringValue("")}, + {Key: "cloud.platform", + Value: attribute.StringValue("cloud-run")}, + {Key: "faas.id", + Value: attribute.StringValue("run-instance-id")}, }, wantAttributes: attribute.NewSet(attribute.KeyValue{ Key: "location", Value: attribute.StringValue("us-central1"), }, attribute.KeyValue{ Key: "cloud_platform", - Value: attribute.StringValue("unknown"), + Value: attribute.StringValue("cloud-run"), }, attribute.KeyValue{ Key: "host_id", - Value: attribute.StringValue("unknown"), + Value: attribute.StringValue("run-instance-id"), + }, attribute.KeyValue{ + Key: "project_id", + Value: 
attribute.StringValue("project-id"), + }, attribute.KeyValue{ + Key: "api", + Value: attribute.StringValue("grpc"), }), }, } { t.Run(test.desc, func(t *testing.T) { - resourceOptions := []resource.Option{resource.WithAttributes(test.detectedAttributes...)} - result, err := newPreparedResource(ctx, "project", resourceOptions) + smr, err := newStorageMonitoredResource(ctx, "project-id", "grpc", resource.WithAttributes(test.detectedAttributes...)) if err != nil { - t.Errorf("newPreparedResource: %v", err) + t.Errorf("newStorageMonitoredResource: %v", err) } - resultSet := result.resource.Set() + resultSet := smr.resource.Set() for _, want := range test.wantAttributes.ToSlice() { got, exists := resultSet.Value(want.Key) if !exists { - t.Errorf("newPreparedResource: %v not set", want.Key) + t.Errorf("resultSet[%v] not set", want.Key) continue } if got != want.Value { - t.Errorf("newPreparedResource: want[%v] = %v, got: %v", want.Key, want.Value, got) + t.Errorf("want[%v] = %v, got: %v", want.Key, want.Value.AsString(), got.AsString()) continue } } @@ -118,9 +136,56 @@ func TestNewPreparedResource(t *testing.T) { } } +func TestNewGRPCMetricContext(t *testing.T) { + ctx := context.Background() + mr := metric.NewManualReader() + attrs := []attribute.KeyValue{ + {Key: "cloud.region", + Value: attribute.StringValue("us-central1")}, + {Key: "cloud.platform", + Value: attribute.StringValue("gcp")}, + {Key: "host.id", + Value: attribute.StringValue("gce-instance-id")}, + } + cfg := metricsConfig{ + project: "project-id", + manualReader: mr, + disableExporter: true, // disable since this is a unit test + resourceOpts: []resource.Option{resource.WithAttributes(attrs...)}, + } + mc, err := newGRPCMetricContext(ctx, cfg) + if err != nil { + t.Errorf("newGRPCMetricContext: %v", err) + } + defer mc.close() + rm := metricdata.ResourceMetrics{} + if err := mr.Collect(ctx, &rm); err != nil { + t.Errorf("ManualReader.Collect: %v", err) + } + monitoredResourceWant := map[string]string{ + 
"gcp.resource_type": "storage.googleapis.com/Client", + "api": "grpc", + "cloud_platform": "gcp", + "host_id": "gce-instance-id", + "location": "us-central1", + "project_id": "project-id", + "instance_id": "ignore", + } + for _, attr := range rm.Resource.Attributes() { + want := monitoredResourceWant[string(attr.Key)] + if want == "ignore" { + continue + } + got := attr.Value.AsString() + if want != got { + t.Errorf("got: %v want: %v", got, want) + } + } +} + func TestNewExporterLogSuppressor(t *testing.T) { ctx := context.Background() - s := &exporterLogSuppressor{exporter: &failingExporter{}} + s := &exporterLogSuppressor{Exporter: &failingExporter{}} if err := s.Export(ctx, nil); err == nil { t.Errorf("exporterLogSuppressor: did not emit an error when one was expected") } @@ -129,24 +194,10 @@ func TestNewExporterLogSuppressor(t *testing.T) { } } -type failingExporter struct{} +type failingExporter struct { + metric.Exporter +} func (f *failingExporter) Export(ctx context.Context, rm *metricdata.ResourceMetrics) error { return fmt.Errorf("PermissionDenied") } - -func (f *failingExporter) Temporality(m metric.InstrumentKind) metricdata.Temporality { - return metricdata.CumulativeTemporality -} - -func (f *failingExporter) Aggregation(ik metric.InstrumentKind) metric.Aggregation { - return metric.AggregationDefault{} -} - -func (f *failingExporter) ForceFlush(ctx context.Context) error { - return nil -} - -func (f *failingExporter) Shutdown(ctx context.Context) error { - return nil -} diff --git a/storage/integration_test.go b/storage/integration_test.go index 0a19c59e4815..ef384296d0b5 100644 --- a/storage/integration_test.go +++ b/storage/integration_test.go @@ -55,6 +55,8 @@ import ( "github.com/googleapis/gax-go/v2/apierror" "go.opentelemetry.io/contrib/detectors/gcp" "go.opentelemetry.io/otel" + "go.opentelemetry.io/otel/sdk/metric" + "go.opentelemetry.io/otel/sdk/metric/metricdata" "go.opentelemetry.io/otel/sdk/resource" sdktrace 
"go.opentelemetry.io/otel/sdk/trace" "go.opentelemetry.io/otel/sdk/trace/tracetest" @@ -416,6 +418,107 @@ func TestIntegration_DoNotDetectDirectConnectivityWhenDisabled(t *testing.T) { }, internaloption.EnableDirectPath(false)) } +func TestIntegration_MetricsEnablement(t *testing.T) { + ctx := skipHTTP("grpc only test") + mr := metric.NewManualReader() + multiTransportTest(ctx, t, func(t *testing.T, ctx context.Context, bucket string, prefix string, client *Client) { + it := client.Bucket(bucket).Objects(ctx, nil) + _, err := it.Next() + if err != iterator.Done { + t.Errorf("Objects.Next: expected iterator.Done got %v", err) + } + rm := metricdata.ResourceMetrics{} + if err := mr.Collect(ctx, &rm); err != nil { + t.Errorf("ManualReader.Collect: %v", err) + } + metricCheck := map[string]bool{ + "grpc.client.attempt.started": false, + "grpc.client.attempt.duration": false, + "grpc.client.attempt.sent_total_compressed_message_size": false, + "grpc.client.attempt.rcvd_total_compressed_message_size": false, + "grpc.client.call.duration": false, + } + for _, sm := range rm.ScopeMetrics { + for _, m := range sm.Metrics { + metricCheck[m.Name] = true + } + } + for k, v := range metricCheck { + if !v { + t.Errorf("metric %v not found", k) + } + } + }, withTestMetricReader(mr)) +} + +func TestIntegration_MetricsEnablementInGCE(t *testing.T) { + ctx := skipHTTP("grpc only test") + mr := metric.NewManualReader() + multiTransportTest(ctx, t, func(t *testing.T, ctx context.Context, bucket string, prefix string, client *Client) { + detectedAttrs, err := resource.New(ctx, resource.WithDetectors(gcp.NewDetector())) + if err != nil { + t.Fatalf("resource.New: %v", err) + } + attrs := detectedAttrs.Set() + if v, exists := attrs.Value("cloud.platform"); !exists || v.AsString() != "gcp_compute_engine" { + t.Skip("only testable in a GCE instance") + } + instance, exists := attrs.Value("host.id") + if !exists { + t.Skip("GCE instance id not detected") + } + if v, exists := 
attrs.Value("cloud.region"); !exists || !strings.Contains(strings.ToLower(v.AsString()), "us-west1") { + t.Skip("inside a GCE instance but region is not us-west1") + } + it := client.Buckets(ctx, testutil.ProjID()) + _, _ = it.Next() + rm := metricdata.ResourceMetrics{} + if err := mr.Collect(ctx, &rm); err != nil { + t.Errorf("ManualReader.Collect: %v", err) + } + + monitoredResourceWant := map[string]string{ + "gcp.resource_type": "storage.googleapis.com/Client", + "api": "grpc", + "cloud_platform": "gcp_compute_engine", + "host_id": instance.AsString(), + "location": "us-west1", + "project_id": testutil.ProjID(), + "instance_id": "ignore", // generated UUID + } + for _, attr := range rm.Resource.Attributes() { + want := monitoredResourceWant[string(attr.Key)] + if want == "ignore" { + continue + } + got := attr.Value.AsString() + if want != got { + t.Errorf("got: %v want: %v", got, want) + } + } + metricCheck := map[string]bool{ + "grpc.client.attempt.started": false, + "grpc.client.attempt.duration": false, + "grpc.client.attempt.sent_total_compressed_message_size": false, + "grpc.client.attempt.rcvd_total_compressed_message_size": false, + "grpc.client.call.duration": false, + "grpc.lb.rls.cache_entries": false, + "grpc.lb.rls.cache_size": false, + "grpc.lb.rls.default_target_picks": false, + } + for _, sm := range rm.ScopeMetrics { + for _, m := range sm.Metrics { + metricCheck[m.Name] = true + } + } + for k, v := range metricCheck { + if !v { + t.Errorf("metric %v not found", k) + } + } + }, withTestMetricReader(mr)) +} + func TestIntegration_BucketCreateDelete(t *testing.T) { ctx := skipJSONReads(context.Background(), "no reads in test") multiTransportTest(ctx, t, func(t *testing.T, ctx context.Context, _ string, prefix string, client *Client) { diff --git a/storage/option.go b/storage/option.go index 3b0cf9e71831..a7474842b78a 100644 --- a/storage/option.go +++ b/storage/option.go @@ -79,6 +79,7 @@ type storageConfig struct { disableClientMetrics bool 
metricExporter            *metric.Exporter  	metricInterval            time.Duration +	manualReader              *metric.ManualReader  	readStallTimeoutConfig *experimental.ReadStallTimeoutConfig }  @@ -192,6 +193,20 @@ func (w *withMetricExporterConfig) ApplyStorageOpt(c *storageConfig) { 	c.metricExporter = w.metricExporter }  +type withTestMetricReaderConfig struct { +	internaloption.EmbeddableAdapter +	// reader override +	metricReader *metric.ManualReader +} + +func withTestMetricReader(ex *metric.ManualReader) option.ClientOption { +	return &withTestMetricReaderConfig{metricReader: ex} +} + +func (w *withTestMetricReaderConfig) ApplyStorageOpt(c *storageConfig) { +	c.manualReader = w.metricReader +} + // WithReadStallTimeout is an option that may be passed to [NewClient]. // It enables the client to retry the stalled read request, happens as part of // storage.Reader creation. As the name suggest, timeout is adjusted dynamically diff --git a/storage/option_test.go b/storage/option_test.go index dfb30d3667cc..6bdedd3716b8 100644 --- a/storage/option_test.go +++ b/storage/option_test.go @@ -22,6 +22,7 @@ import ( 	"cloud.google.com/go/storage/experimental" 	"github.com/google/go-cmp/cmp" 	"go.opentelemetry.io/otel/exporters/stdout/stdoutmetric" +	"go.opentelemetry.io/otel/sdk/metric" 	"google.golang.org/api/option" ) @@ -171,6 +172,21 @@ func TestSetCustomExporter(t *testing.T) { 	} } +func TestSetManualReader(t *testing.T) { +	manualReader := metric.NewManualReader() +	want := storageConfig{ +		manualReader: manualReader, +	} +	var got storageConfig +	opt := withTestMetricReader(manualReader) +	if storageOpt, ok := opt.(storageClientOption); ok { +		storageOpt.ApplyStorageOpt(&got) +	} +	if got.manualReader != want.manualReader { +		t.Errorf("TestSetManualReader: manualReader want=%v, got=%v", want.manualReader, got.manualReader) +	} +} + func TestGetDynamicReadReqInitialTimeoutSecFromEnv(t *testing.T) { 	defaultValue := 10 * time.Second