Skip to content

Commit

Permalink
Merge pull request #117 from kawych/master
Browse files Browse the repository at this point in the history
 Implement translation between External Metrics API and Stackdriver API
  • Loading branch information
kawych authored Mar 14, 2018
2 parents e76842e + 7352b88 commit 75b6ba3
Show file tree
Hide file tree
Showing 23 changed files with 887 additions and 421 deletions.
370 changes: 262 additions & 108 deletions custom-metrics-stackdriver-adapter/Godeps/Godeps.json

Large diffs are not rendered by default.

Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,8 @@ import (
"github.com/GoogleCloudPlatform/k8s-stackdriver/custom-metrics-stackdriver-adapter/pkg/provider"
"k8s.io/api/core/v1"
apimeta "k8s.io/apimachinery/pkg/api/meta"
"k8s.io/metrics/pkg/apis/external_metrics"
"strings"
)

// TODO(kawych):
Expand Down Expand Up @@ -201,6 +203,30 @@ func (p *StackdriverProvider) ListAllMetrics() []provider.MetricInfo {
return p.translator.GetMetricsFromSDDescriptorsResp(response)
}

// GetExternalMetric queries Stackdriver for external metrics.
func (p *StackdriverProvider) GetExternalMetric(namespace string, metricNameEscaped string, metricSelector labels.Selector) (*external_metrics.ExternalMetricValueList, error) {
metricName := unescapeMetricName(metricNameEscaped)
stackdriverRequest, err := p.translator.GetExternalMetricRequest(metricName, metricSelector)
if err != nil {
return nil, err
}
stackdriverResponse, err := stackdriverRequest.Do()
if err != nil {
return nil, err
}
externalMetricItems, err := p.translator.GetRespForExternalMetric(stackdriverResponse, metricNameEscaped)
if err != nil {
return nil, err
}
return &external_metrics.ExternalMetricValueList{
Items: externalMetricItems,
}, nil
}

func unescapeMetricName(metricName string) string {
return strings.Replace(metricName, "|", "/", -1)
}

func min(a, b int) int {
if a < b {
return a
Expand Down
174 changes: 171 additions & 3 deletions custom-metrics-stackdriver-adapter/pkg/adapter/provider/translator.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,9 +30,20 @@ import (
apimeta "k8s.io/apimachinery/pkg/api/meta"
"k8s.io/apimachinery/pkg/api/resource"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/labels"
"k8s.io/apimachinery/pkg/runtime"
"k8s.io/apimachinery/pkg/runtime/schema"
"k8s.io/apimachinery/pkg/selection"
"k8s.io/metrics/pkg/apis/custom_metrics"
"k8s.io/metrics/pkg/apis/external_metrics"
"strconv"
)

var (
// allowedLabelPrefixes and allowedFullLabelNames specify all metric labels allowed for querying
// External Metrics API.
allowedLabelPrefixes = []string{"metric.label", "resource.label", "metadata.system_label", "metadata.user_label"}
allowedFullLabelNames = []string{"resource.type"}
)

const (
Expand Down Expand Up @@ -105,6 +116,19 @@ func (t *Translator) GetSDReqForNodes(nodeList *v1.NodeList, metricName string)
return t.createListTimeseriesRequest(filter), nil
}

// GetExternalMetricRequest returns Stackdriver request for query for external metric.
func (t *Translator) GetExternalMetricRequest(metricName string, metricSelector labels.Selector) (*stackdriver.ProjectsTimeSeriesListCall, error) {
filterForMetric := t.filterForMetric(metricName)
if metricSelector.Empty() {
return t.createListTimeseriesRequest(filterForMetric), nil
}
filterForSelector, err := t.filterForSelector(metricSelector)
if err != nil {
return nil, err
}
return t.createListTimeseriesRequest(joinFilters(filterForMetric, filterForSelector)), nil
}

// GetRespForSingleObject returns translates Stackdriver response to a Custom Metric associated with
// a single object.
func (t *Translator) GetRespForSingleObject(response *stackdriver.ListTimeSeriesResponse, groupResource schema.GroupResource, metricName string, namespace string, name string) (*custom_metrics.MetricValue, error) {
Expand Down Expand Up @@ -137,6 +161,41 @@ func (t *Translator) GetRespForMultipleObjects(response *stackdriver.ListTimeSer
return t.metricsFor(values, groupResource, metricName, list)
}

// GetRespForExternalMetric translates Stackdriver response to list of External Metrics
func (t *Translator) GetRespForExternalMetric(response *stackdriver.ListTimeSeriesResponse, metricName string) ([]external_metrics.ExternalMetricValue, error) {
if len(response.TimeSeries) < 1 {
return nil, provider.NewExternalMetricNotFoundError(metricName)
}
metricValues := []external_metrics.ExternalMetricValue{}
for _, series := range response.TimeSeries {
if len(series.Points) != 1 {
// This shouldn't happen with correct query to Stackdriver
return nil, apierr.NewInternalError(fmt.Errorf("Expected exactly one Point in TimeSeries from Stackdriver, but received %v", len(series.Points)))
}
point := series.Points[0]
endTime, err := time.Parse(time.RFC3339, point.Interval.EndTime)
if err != nil {
return nil, apierr.NewInternalError(fmt.Errorf("Timeseries from Stackdriver has incorrect end time: %s", point.Interval.EndTime))
}
metricValue := external_metrics.ExternalMetricValue{
Timestamp: metav1.NewTime(endTime),
MetricName: metricName,
MetricLabels: t.getMetricLabels(series),
}
value := *point.Value
switch {
case value.Int64Value != nil:
metricValue.Value = *resource.NewQuantity(*value.Int64Value, resource.DecimalSI)
case value.DoubleValue != nil:
metricValue.Value = *resource.NewMilliQuantity(int64(*value.DoubleValue*1000), resource.DecimalSI)
default:
return nil, apierr.NewBadRequest(fmt.Sprintf("Expected metric of type DoubleValue or Int64Value, but received TypedValue: %v", value))
}
metricValues = append(metricValues, metricValue)
}
return metricValues, nil
}

// ListMetricDescriptors returns Stackdriver request for all custom metrics descriptors.
func (t *Translator) ListMetricDescriptors() *stackdriver.ProjectsMetricDescriptorsListCall {
var filter string
Expand Down Expand Up @@ -187,6 +246,29 @@ func getNodeNames(list *v1.NodeList) []string {
return resourceNames
}

func isAllowedLabelName(labelName string) bool {
for _, prefix := range allowedLabelPrefixes {
if strings.HasPrefix(labelName, prefix+".") {
return true
}
}
for _, name := range allowedFullLabelNames {
if labelName == name {
return true
}
}
return false
}

func splitMetricLabel(labelName string) (string, string, error) {
for _, prefix := range allowedLabelPrefixes {
if strings.HasPrefix(labelName, prefix+".") {
return prefix, strings.TrimPrefix(labelName, prefix+"."), nil
}
}
return "", "", apierr.NewBadRequest(fmt.Sprintf("Label name: %s is not allowed.", labelName))
}

func getResourceIDs(list *v1.PodList) []string {
resourceIDs := []string{}
for _, item := range list.Items {
Expand All @@ -195,6 +277,14 @@ func getResourceIDs(list *v1.PodList) []string {
return resourceIDs
}

func quoteAll(list []string) []string {
result := []string{}
for _, item := range list {
result = append(result, fmt.Sprintf("%q", item))
}
return result
}

func joinFilters(filters ...string) string {
return strings.Join(filters, " AND ")
}
Expand Down Expand Up @@ -265,6 +355,87 @@ func (t *Translator) legacyFilterForPods(podIDs []string) string {
return fmt.Sprintf("resource.label.pod_id = one_of(%s)", strings.Join(podIDs, ","))
}

func (t *Translator) filterForSelector(metricSelector labels.Selector) (string, error) {
requirements, selectable := metricSelector.Requirements()
if !selectable {
return "", apierr.NewBadRequest(fmt.Sprintf("Label selector is impossible to match: %s", metricSelector))
}
filters := []string{}
for _, req := range requirements {
switch req.Operator() {
case selection.Equals, selection.DoubleEquals:
if isAllowedLabelName(req.Key()) {
filters = append(filters, fmt.Sprintf("%s = %q", req.Key(), req.Values().List()[0]))
} else {
return "", provider.NewLabelNotAllowedError(req.Key())
}
case selection.NotEquals:
if isAllowedLabelName(req.Key()) {
filters = append(filters, fmt.Sprintf("%s != %q", req.Key(), req.Values().List()[0]))
} else {
return "", provider.NewLabelNotAllowedError(req.Key())
}
case selection.In:
if isAllowedLabelName(req.Key()) {
filters = append(filters, fmt.Sprintf("%s = one_of(%s)", req.Key(), strings.Join(quoteAll(req.Values().List()), ",")))
} else {
return "", provider.NewLabelNotAllowedError(req.Key())
}
case selection.NotIn:
if isAllowedLabelName(req.Key()) {
filters = append(filters, fmt.Sprintf("NOT %s = one_of(%s)", req.Key(), strings.Join(quoteAll(req.Values().List()), ",")))
} else {
return "", provider.NewLabelNotAllowedError(req.Key())
}
case selection.Exists:
prefix, suffix, err := splitMetricLabel(req.Key())
if err == nil {
filters = append(filters, fmt.Sprintf("%s : %s", prefix, suffix))
} else {
return "", provider.NewLabelNotAllowedError(req.Key())
}
case selection.DoesNotExist:
// DoesNotExist is not allowed due to Stackdriver filtering syntax limitation
return "", apierr.NewBadRequest("Label selector with operator DoesNotExist is not allowed")
case selection.GreaterThan:
if isAllowedLabelName(req.Key()) {
value, err := strconv.ParseInt(req.Values().List()[0], 10, 64)
if err != nil {
return "", apierr.NewInternalError(fmt.Errorf("Unexpected error: value %s could not be parsed to integer", req.Values().List()[0]))
}
filters = append(filters, fmt.Sprintf("%s > %v", req.Key(), value))
} else {
return "", provider.NewLabelNotAllowedError(req.Key())
}
case selection.LessThan:
if isAllowedLabelName(req.Key()) {
value, err := strconv.ParseInt(req.Values().List()[0], 10, 64)
if err != nil {
return "", apierr.NewInternalError(fmt.Errorf("Unexpected error: value %s could not be parsed to integer", req.Values().List()[0]))
}
filters = append(filters, fmt.Sprintf("%s < %v", req.Key(), value))
} else {
return "", provider.NewLabelNotAllowedError(req.Key())
}
default:
return "", provider.NewOperationNotSupportedError(fmt.Sprintf("Selector with operator %q", req.Operator()))
}
}
return strings.Join(filters, " AND "), nil
}

func (t *Translator) getMetricLabels(series *stackdriver.TimeSeries) map[string]string {
metricLabels := map[string]string{}
for label, value := range series.Metric.Labels {
metricLabels["metric.label."+label] = value
}
metricLabels["resource.type"] = series.Resource.Type
for label, value := range series.Resource.Labels {
metricLabels["resource.label."+label] = value
}
return metricLabels
}

func (t *Translator) createListTimeseriesRequest(filter string) *stackdriver.ProjectsTimeSeriesListCall {
project := fmt.Sprintf("projects/%s", t.config.Project)
endTime := t.clock.Now()
Expand All @@ -281,9 +452,6 @@ func (t *Translator) getMetricValuesFromResponse(groupResource schema.GroupResou
return nil, provider.NewMetricNotFoundError(groupResource, metricName)
}
metricValues := make(map[string]resource.Quantity)
// Find time series with specified labels matching
// Stackdriver API doesn't allow complex label filtering (i.e. "label1 = x AND (label2 = y OR label2 = z)"),
// therefore only part of the filters is passed and remaining filtering is done here.
for _, series := range response.TimeSeries {
if len(series.Points) != 1 {
// This shouldn't happen with correct query to Stackdriver
Expand Down
Loading

0 comments on commit 75b6ba3

Please sign in to comment.