From fc654f15da548c75c26e77b91148cb10ae0ffa5e Mon Sep 17 00:00:00 2001 From: Ryan Hall Date: Fri, 3 Dec 2021 07:52:48 -0800 Subject: [PATCH] Remove m3collector (#3962) The m3collector is a dead project and never graduated out of the alpha stage. It doesn't seem to have been updated in over 4 years, but people still have to make refactoring changes to it when they are upgrading some other part of m3. We should remove this dead code to increase development velocity. --- .buildkite/pipeline.yml | 4 +- .codecov.yml | 8 - .fossa.yml | 6 - Makefile | 2 - docker/m3collector/Dockerfile | 29 - docker/m3collector/development.Dockerfile | 10 - .../development/m3_stack/docker-compose.yml | 15 - scripts/development/m3_stack/m3collector.yml | 89 -- scripts/development/m3_stack/start_m3.sh | 15 - scripts/process-cover.sh | 3 +- src/cmd/services/m3collector/config/config.go | 51 - src/cmd/services/m3collector/main/main.go | 48 - src/collector/README.md | 6 - src/collector/api/v1/handler/json/report.go | 183 ---- .../api/v1/handler/json/report_test.go | 231 ----- src/collector/api/v1/httpd/handler.go | 94 -- src/collector/config/m3collector.yml | 88 -- src/collector/generated/mocks/generate.go | 24 - src/collector/integration/README.md | 11 - src/collector/integration/client.go | 64 -- src/collector/integration/data.go | 60 -- src/collector/integration/defaults.go | 236 ----- src/collector/integration/integration.go | 49 - src/collector/integration/options.go | 211 ---- ..._match_mapping_rollup_rule_updates_test.go | 111 --- .../report_match_mapping_rule_updates_test.go | 90 -- .../report_match_rollup_rule_updates_test.go | 102 -- .../report_no_match_rule_updates_test.go | 73 -- .../report_with_rule_updates_test.go | 259 ----- src/collector/integration/server/options.go | 169 ---- src/collector/integration/server/server.go | 100 -- src/collector/integration/setup.go | 134 --- .../reporter/m3aggregator/options.go | 74 -- .../reporter/m3aggregator/reporter.go | 345 ------- .../reporter/m3aggregator/reporter_test.go | 941 ------------------ src/collector/reporter/reporter.go | 43 - src/collector/reporter/reporter_mock.go | 126 --- src/collector/server/server.go | 216 ---- 38 files changed, 3 insertions(+), 4317 deletions(-) delete mode 100644 docker/m3collector/Dockerfile delete mode 100644 docker/m3collector/development.Dockerfile delete mode 100644 scripts/development/m3_stack/m3collector.yml delete mode 100644 src/cmd/services/m3collector/config/config.go delete mode 100644 src/cmd/services/m3collector/main/main.go delete mode 100644 src/collector/README.md delete mode 100644 src/collector/api/v1/handler/json/report.go delete mode 100644 src/collector/api/v1/handler/json/report_test.go delete mode 100644 src/collector/api/v1/httpd/handler.go delete mode 100644 src/collector/config/m3collector.yml delete mode 100644 src/collector/generated/mocks/generate.go delete mode 100644 src/collector/integration/README.md delete mode 100644 src/collector/integration/client.go delete mode 100644 src/collector/integration/data.go delete mode 100644 src/collector/integration/defaults.go delete mode 100644 src/collector/integration/integration.go delete mode 100644 src/collector/integration/options.go delete mode 100644 src/collector/integration/report_match_mapping_rollup_rule_updates_test.go delete mode 100644 src/collector/integration/report_match_mapping_rule_updates_test.go delete mode 100644 src/collector/integration/report_match_rollup_rule_updates_test.go delete mode 100644 src/collector/integration/report_no_match_rule_updates_test.go delete mode 100644 src/collector/integration/report_with_rule_updates_test.go delete mode 100644 src/collector/integration/server/options.go delete mode 100644 src/collector/integration/server/server.go delete mode 100644 src/collector/integration/setup.go delete mode 100644 src/collector/reporter/m3aggregator/options.go delete mode 100644 src/collector/reporter/m3aggregator/reporter.go delete mode 100644 src/collector/reporter/m3aggregator/reporter_test.go delete mode 100644 src/collector/reporter/reporter.go delete mode 100644 src/collector/reporter/reporter_mock.go delete mode 100644 src/collector/server/server.go diff --git a/.buildkite/pipeline.yml b/.buildkite/pipeline.yml index 64669fb52f..ea8ebba498 100644 --- a/.buildkite/pipeline.yml +++ b/.buildkite/pipeline.yml @@ -114,9 +114,9 @@ steps: run: app workdir: /go/src/github.com/m3db/m3 <<: *common - - label: "Integration (collector, m3em, cluster, msg, metrics) %n" + - label: "Integration (m3em, cluster, msg, metrics) %n" parallelism: 4 - command: make clean install-vendor-m3 test-ci-integration-collector test-ci-integration-m3em test-ci-integration-cluster test-ci-integration-msg test-ci-integration-metrics + command: make clean install-vendor-m3 test-ci-integration-m3em test-ci-integration-cluster test-ci-integration-msg test-ci-integration-metrics plugins: docker-compose#v2.5.1: run: app diff --git a/.codecov.yml b/.codecov.yml index 329a94e74f..df15cf5a86 100644 --- a/.codecov.yml +++ b/.codecov.yml @@ -28,10 +28,6 @@ coverage: target: auto threshold: 5% flags: aggregator - collector: - target: auto - threshold: 5% - flags: collector query: target: auto threshold: 5% @@ -56,10 +52,6 @@ coverage: default: off flags: - collector: - paths: - - src/collector/ - - src/cmd/services/m3collector/ aggregator: paths: - src/aggregator/ diff --git a/.fossa.yml b/.fossa.yml index f9a41210d6..bc55c15227 100755 --- a/.fossa.yml +++ b/.fossa.yml @@ -30,12 +30,6 @@ analyze: path: src/cmd/services/m3aggregator/main options: allow-unresolved: true - - name: github.com/m3db/m3/src/cmd/services/m3collector/main - type: go - target: github.com/m3db/m3/src/cmd/services/m3collector/main - path: src/cmd/services/m3collector/main - options: - allow-unresolved: true - name: github.com/m3db/m3/src/cmd/services/m3coordinator/main type: go target: github.com/m3db/m3/src/cmd/services/m3coordinator/main diff --git a/Makefile b/Makefile index 2d3f4c17a6..a5b3e8eb64 100644 --- a/Makefile +++ b/Makefile @@ -50,7 +50,6 @@ SERVICES := \ m3coordinator \ m3aggregator \ m3query \ - m3collector \ m3em_agent \ m3comparator \ r2ctl \ @@ -61,7 +60,6 @@ SUBDIRS := \ msg \ metrics \ cmd \ - collector \ dbnode \ query \ m3em \ diff --git a/docker/m3collector/Dockerfile b/docker/m3collector/Dockerfile deleted file mode 100644 index 1e06a719c6..0000000000 --- a/docker/m3collector/Dockerfile +++ /dev/null @@ -1,29 +0,0 @@ -# stage 1: build -FROM golang1.16.5-alpine3.13 AS builder -LABEL maintainer="The M3DB Authors " - -# Install deps -RUN apk add --update git make bash - -# Add source code -RUN mkdir -p /go/src/github.com/m3db/m3 -ADD . /go/src/github.com/m3db/m3 - -# Build m3dbnode binary -RUN cd /go/src/github.com/m3db/m3/ && \ - git submodule update --init && \ - make m3collector-linux-amd64 - -# stage 2: lightweight "release" -FROM alpine:3.11 -LABEL maintainer="The M3DB Authors " - -EXPOSE 7206-7207/tcp - -RUN apk add --no-cache curl jq - -COPY --from=builder /go/src/github.com/m3db/m3/bin/m3collector /bin/ -COPY --from=builder /go/src/github.com/m3db/m3/src/collector/config/m3collector.yml /etc/m3collector/m3collector.yml - -ENTRYPOINT [ "/bin/m3collector" ] -CMD [ "-f", "/etc/m3collector/m3collector.yml" ] diff --git a/docker/m3collector/development.Dockerfile b/docker/m3collector/development.Dockerfile deleted file mode 100644 index 47a0a6942d..0000000000 --- a/docker/m3collector/development.Dockerfile +++ /dev/null @@ -1,10 +0,0 @@ -FROM alpine:3.11 -LABEL maintainer="The M3DB Authors " - -EXPOSE 7206/tcp 7207/tcp - -ADD ./m3collector /bin/m3collector -ADD ./config/m3collector.yml /etc/m3collector/m3collector.yml - -ENTRYPOINT [ "/bin/m3collector" ] -CMD [ "-f", "/etc/m3collector/m3collector.yml" ] diff --git a/scripts/development/m3_stack/docker-compose.yml b/scripts/development/m3_stack/docker-compose.yml index 2c2017ae91..6be43d5039 100644 --- a/scripts/development/m3_stack/docker-compose.yml +++ b/scripts/development/m3_stack/docker-compose.yml @@ -96,21 +96,6 @@ services: # Note: Use ".tmp" suffix is git ignored. - "./m3coordinator.yml.tmp:/etc/m3coordinator/m3coordinator.yml" - "./schema.proto:/etc/m3coordinator/schema.proto" - m3collector01: - expose: - - "7206" - - "7207" - ports: - - "0.0.0.0:7206:7206" - - "0.0.0.0:7207:7207" - networks: - - backend - build: - context: ../../../bin - dockerfile: ./docker/m3collector/development.Dockerfile - image: m3collector:dev - volumes: - - "./m3collector.yml:/etc/m3collector/m3collector.yml" prometheus01: expose: - "9090" diff --git a/scripts/development/m3_stack/m3collector.yml b/scripts/development/m3_stack/m3collector.yml deleted file mode 100644 index 667b5d18d9..0000000000 --- a/scripts/development/m3_stack/m3collector.yml +++ /dev/null @@ -1,89 +0,0 @@ -listenAddress: 0.0.0.0:7206 - -metrics: - scope: - prefix: collector - prometheus: - onError: none - handlerPath: /metrics - listenAddress: 0.0.0.0:7207 # until https://github.com/m3db/m3/issues/682 is resolved - sanitization: prometheus - samplingRate: 1.0 - extended: none - -etcd: - env: default_env - zone: embedded - service: m3collector - cacheDir: /var/lib/m3kv - etcdClusters: - - zone: embedded - endpoints: - - m3db_seed:2379 - -reporter: - cache: - capacity: 200000 - freshDuration: 5m - stutterDuration: 1m - - matcher: - initWatchTimeout: 10s - rulesKVConfig: - namespace: /rules - namespacesKey: namespaces - ruleSetKeyFmt: rulesets/%s - namespaceTag: application - defaultNamespace: global - nameTagKey: __name__ - matchRangePast: 2m - sortedTagIteratorPool: - size: 8192 - watermark: - low: 0.7 - high: 1.0 - - client: - placementKV: - namespace: /placement - placementWatcher: - key: m3aggregator - initWatchTimeout: 10s - hashType: murmur32 - shardCutoffLingerDuration: 1m - maxTimerBatchSize: 1120 - queueSize: 10000 - queueDropType: oldest - encoder: - initBufferSize: 2048 - maxMessageSize: 10485760 - bytesPool: - buckets: - - capacity: 2048 - count: 4096 - - capacity: 4096 - count: 4096 - watermark: - low: 0.7 - high: 1.0 - connection: - writeTimeout: 250ms - - clock: - maxPositiveSkew: 2m - maxNegativeSkew: 2m - - sortedTagIteratorPool: - size: 8192 - watermark: - low: 0.7 - high: 1.0 - -logging: - level: info - encoding: json - outputPaths: - - stdout - errorOutputPaths: - - stderr - diff --git a/scripts/development/m3_stack/start_m3.sh b/scripts/development/m3_stack/start_m3.sh index 740aca4c2d..94708191f0 100755 --- a/scripts/development/m3_stack/start_m3.sh +++ b/scripts/development/m3_stack/start_m3.sh @@ -30,7 +30,6 @@ fi M3DBNODE_DEV_IMG=$(docker images m3dbnode:dev | fgrep -iv repository | wc -l | xargs) M3COORDINATOR_DEV_IMG=$(docker images m3coordinator:dev | fgrep -iv repository | wc -l | xargs) M3AGGREGATOR_DEV_IMG=$(docker images m3aggregator:dev | fgrep -iv repository | wc -l | xargs) -M3COLLECTOR_DEV_IMG=$(docker images m3collector:dev | fgrep -iv repository | wc -l | xargs) if [[ "$M3DBNODE_DEV_IMG" == "0" ]] || [[ "$FORCE_BUILD" == true ]] || [[ "$BUILD_M3DBNODE" == true ]]; then prepare_build_cmd "make m3dbnode-linux-amd64" @@ -157,16 +156,6 @@ if [[ "$USE_AGGREGATOR" = true ]]; then # Bring up the second replica docker-compose -f docker-compose.yml up $DOCKER_ARGS m3aggregator02 fi - - if [[ "$M3COLLECTOR_DEV_IMG" == "0" ]] || [[ "$FORCE_BUILD" == true ]] || [[ "$BUILD_M3COLLECTOR" == true ]]; then - prepare_build_cmd "make m3collector-linux-amd64" - echo "Building m3collector binary first" - bash -c "$build_cmd" - - docker-compose -f docker-compose.yml up --build $DOCKER_ARGS m3collector01 - else - docker-compose -f docker-compose.yml up $DOCKER_ARGS m3collector01 - fi else echo "Not running aggregator pipeline" fi @@ -357,10 +346,6 @@ if [[ "$USE_AGGREGATOR" = true ]]; then fi docker-compose -f docker-compose.yml up $DOCKER_ARGS m3coordinator01 - - # May not necessarily flush - echo "Sending unaggregated metric to m3collector" - curl http://localhost:7206/api/v1/json/report -X POST -d '{"metrics":[{"type":"gauge","value":42,"tags":{"__name__":"foo_metric","foo":"bar"}}]}' fi echo "Starting Prometheus" diff --git a/scripts/process-cover.sh b/scripts/process-cover.sh index 8e6ea0a9c0..3f3ff79a75 100755 --- a/scripts/process-cover.sh +++ b/scripts/process-cover.sh @@ -8,10 +8,9 @@ fi COVERFILE=$1 SUBMIT_COVER="$(dirname $0)/../.ci/codecov.sh" -TARGETS=("aggregator" "dbnode" "query" "collector" "cluster" "m3ninx" "m3em" "x") +TARGETS=("aggregator" "dbnode" "query" "cluster" "m3ninx" "m3em" "x") target_patterns() { case $1 in - 'collector') echo "^mode|github.com/m3db/m3/src/collector|github.com/m3db/m3/src/cmd/services/m3collector";; 'cluster') echo "^mode|github.com/m3db/m3/src/cluster";; 'aggregator') echo "^mode|github.com/m3db/m3/src/aggregator|github.com/m3db/m3/src/cmd/services/m3aggregator";; 'dbnode') echo "^mode|github.com/m3db/m3/src/dbnode|github.com/m3db/m3/src/cmd/services/m3dbnode";; diff --git a/src/cmd/services/m3collector/config/config.go b/src/cmd/services/m3collector/config/config.go deleted file mode 100644 index 2d607524ad..0000000000 --- a/src/cmd/services/m3collector/config/config.go +++ /dev/null @@ -1,51 +0,0 @@ -// Copyright (c) 2018 Uber Technologies, Inc. -// -// Permission is hereby granted, free of charge, to any person obtaining a copy -// of this software and associated documentation files (the "Software"), to deal -// in the Software without restriction, including without limitation the rights -// to use, copy, modify, merge, publish, distribute, sublicense, and/or sell -// copies of the Software, and to permit persons to whom the Software is -// furnished to do so, subject to the following conditions: -// -// The above copyright notice and this permission notice shall be included in -// all copies or substantial portions of the Software. -// -// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR -// IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, -// FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE -// AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER -// LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, -// OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN -// THE SOFTWARE. - -package config - -import ( - "github.com/m3db/m3/src/aggregator/client" - etcdclient "github.com/m3db/m3/src/cluster/client/etcd" - "github.com/m3db/m3/src/metrics/matcher" - "github.com/m3db/m3/src/metrics/matcher/cache" - "github.com/m3db/m3/src/x/clock" - "github.com/m3db/m3/src/x/instrument" - "github.com/m3db/m3/src/x/pool" - - "go.uber.org/zap" -) - -// Configuration is configuration for the collector. -type Configuration struct { - Metrics instrument.MetricsConfiguration `yaml:"metrics"` - Logging zap.Config `yaml:"logging"` - ListenAddress string `yaml:"listenAddress" validate:"nonzero"` - Etcd etcdclient.Configuration `yaml:"etcd"` - Reporter ReporterConfiguration `yaml:"reporter"` -} - -// ReporterConfiguration is the collector -type ReporterConfiguration struct { - Cache cache.Configuration `yaml:"cache" validate:"nonzero"` - Matcher matcher.Configuration `yaml:"matcher" validate:"nonzero"` - Client client.Configuration `yaml:"client"` - SortedTagIteratorPool pool.ObjectPoolConfiguration `yaml:"sortedTagIteratorPool"` - Clock clock.Configuration `yaml:"clock"` -} diff --git a/src/cmd/services/m3collector/main/main.go b/src/cmd/services/m3collector/main/main.go deleted file mode 100644 index 9c7ede6be2..0000000000 --- a/src/cmd/services/m3collector/main/main.go +++ /dev/null @@ -1,48 +0,0 @@ -// Copyright (c) 2018 Uber Technologies, Inc. -// -// Permission is hereby granted, free of charge, to any person obtaining a copy -// of this software and associated documentation files (the "Software"), to deal -// in the Software without restriction, including without limitation the rights -// to use, copy, modify, merge, publish, distribute, sublicense, and/or sell -// copies of the Software, and to permit persons to whom the Software is -// furnished to do so, subject to the following conditions: -// -// The above copyright notice and this permission notice shall be included in -// all copies or substantial portions of the Software. -// -// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR -// IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, -// FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE -// AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER -// LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, -// OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN -// THE SOFTWARE. - -package main - -import ( - "flag" - "log" - _ "net/http/pprof" // pprof: for debug listen server if configured - - "github.com/m3db/m3/src/cmd/services/m3collector/config" - "github.com/m3db/m3/src/collector/server" - xconfig "github.com/m3db/m3/src/x/config" - "github.com/m3db/m3/src/x/config/configflag" -) - -func main() { - var configOpts configflag.Options - configOpts.Register() - - flag.Parse() - - var cfg config.Configuration - if err := configOpts.MainLoad(&cfg, xconfig.Options{}); err != nil { - log.Fatalf("error loading config: %v", err) - } - - server.Run(server.RunOptions{ - Config: cfg, - }) -} diff --git a/src/collector/README.md b/src/collector/README.md deleted file mode 100644 index 4ab5895fe6..0000000000 --- a/src/collector/README.md +++ /dev/null @@ -1,6 +0,0 @@ -## WARNING: This is Alpha software and not intended for use until a stable release. - -# m3collector - -Metrics collection agent. Responsible for collecting metrics and forwarding them to -downstream services (e.g., for aggregation or permanent storage). diff --git a/src/collector/api/v1/handler/json/report.go b/src/collector/api/v1/handler/json/report.go deleted file mode 100644 index c314117a45..0000000000 --- a/src/collector/api/v1/handler/json/report.go +++ /dev/null @@ -1,183 +0,0 @@ -// Copyright (c) 2018 Uber Technologies, Inc. -// -// Permission is hereby granted, free of charge, to any person obtaining a copy -// of this software and associated documentation files (the "Software"), to deal -// in the Software without restriction, including without limitation the rights -// to use, copy, modify, merge, publish, distribute, sublicense, and/or sell -// copies of the Software, and to permit persons to whom the Software is -// furnished to do so, subject to the following conditions: -// -// The above copyright notice and this permission notice shall be included in -// all copies or substantial portions of the Software. -// -// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR -// IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, -// FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE -// AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER -// LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, -// OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN -// THE SOFTWARE. - -package json - -import ( - "encoding/json" - "errors" - "fmt" - "math" - "net/http" - - "github.com/m3db/m3/src/collector/reporter" - "github.com/m3db/m3/src/metrics/metric/id" - "github.com/m3db/m3/src/query/api/v1/route" - "github.com/m3db/m3/src/query/models" - "github.com/m3db/m3/src/query/storage" - xerrors "github.com/m3db/m3/src/x/errors" - "github.com/m3db/m3/src/x/instrument" - xhttp "github.com/m3db/m3/src/x/net/http" - "github.com/m3db/m3/src/x/serialize" -) - -const ( - // ReportURL is the url for the report json handler - ReportURL = route.Prefix + "/json/report" - - // ReportHTTPMethod is the HTTP method used with this resource. - ReportHTTPMethod = http.MethodPost - - counterType = "counter" - gaugeType = "gauge" - timerType = "timer" -) - -var ( - errEncoderNoBytes = errors.New("tags encoder has no access to bytes") -) - -type reportHandler struct { - reporter reporter.Reporter - encoderPool serialize.TagEncoderPool - decoderPool serialize.TagDecoderPool - instrumentOpts instrument.Options -} - -// NewReportHandler returns a new instance of the report handler. -func NewReportHandler( - reporter reporter.Reporter, - encoderPool serialize.TagEncoderPool, - decoderPool serialize.TagDecoderPool, - instrumentOpts instrument.Options, -) http.Handler { - return &reportHandler{ - reporter: reporter, - encoderPool: encoderPool, - decoderPool: decoderPool, - instrumentOpts: instrumentOpts, - } -} - -// reportRequest represents the report request from the caller. -type reportRequest struct { - Metrics []metricValue `json:"metrics"` -} - -// metricValue is a reportable metric value. -type metricValue struct { - Type string `json:"type" validate:"nonzero"` - Tags map[string]string `json:"tags" validate:"nonzero"` - Value float64 `json:"value" validate:"nonzero"` -} - -// reportResponse represents the report response. -type reportResponse struct { - Reported int `json:"reported"` -} - -func (h *reportHandler) ServeHTTP(w http.ResponseWriter, r *http.Request) { - req, err := h.parseRequest(r) - if err != nil { - xhttp.WriteError(w, err) - return - } - - for _, metric := range req.Metrics { - id, err := h.newMetricID(metric) - if err != nil { - xhttp.WriteError(w, err) - return - } - - if err := h.reportMetric(id, metric); err != nil { - xhttp.WriteError(w, err) - return - } - } - - resp := &reportResponse{Reported: len(req.Metrics)} - xhttp.WriteJSONResponse(w, resp, h.instrumentOpts.Logger()) -} - -func (h *reportHandler) parseRequest(r *http.Request) (*reportRequest, error) { - if r.Body == nil { - err := fmt.Errorf("empty request body") - return nil, xerrors.NewInvalidParamsError(err) - } - - defer r.Body.Close() - - req := new(reportRequest) - if err := json.NewDecoder(r.Body).Decode(req); err != nil { - return nil, xerrors.NewInvalidParamsError(err) - } - - return req, nil -} - -func (h *reportHandler) newMetricID(metric metricValue) (id.ID, error) { - tags := models.NewTags(len(metric.Tags), models.NewTagOptions()) - for n, v := range metric.Tags { - tags = tags.AddTag(models.Tag{Name: []byte(n), Value: []byte(v)}) - } - tagsIter := storage.TagsToIdentTagIterator(tags) - - encoder := h.encoderPool.Get() - encoder.Reset() - defer encoder.Finalize() - - if err := encoder.Encode(tagsIter); err != nil { - return nil, err - } - - data, ok := encoder.Data() - if !ok { - return nil, errEncoderNoBytes - } - - // Take a copy of the pooled encoder's bytes - bytes := append([]byte(nil), data.Bytes()...) - - metricTagsIter := serialize.NewMetricTagsIterator(h.decoderPool.Get(), nil) - metricTagsIter.Reset(bytes) - return metricTagsIter, nil -} - -func (h *reportHandler) reportMetric(id id.ID, metric metricValue) error { - switch metric.Type { - case counterType: - roundedValue := math.Ceil(metric.Value) - if roundedValue != metric.Value { - // Not an int - badReqErr := fmt.Errorf("counter value not a float: %v", metric.Value) - return xerrors.NewInvalidParamsError(badReqErr) - } - - return h.reporter.ReportCounter(id, int64(roundedValue)) - case gaugeType: - return h.reporter.ReportGauge(id, metric.Value) - case timerType: - return h.reporter.ReportBatchTimer(id, []float64{metric.Value}) - default: - badReqErr := fmt.Errorf("invalid metric type: %s", metric.Type) - return xerrors.NewInvalidParamsError(badReqErr) - } -} diff --git a/src/collector/api/v1/handler/json/report_test.go b/src/collector/api/v1/handler/json/report_test.go deleted file mode 100644 index db9a16c607..0000000000 --- a/src/collector/api/v1/handler/json/report_test.go +++ /dev/null @@ -1,231 +0,0 @@ -// Copyright (c) 2018 Uber Technologies, Inc. -// -// Permission is hereby granted, free of charge, to any person obtaining a copy -// of this software and associated documentation files (the "Software"), to deal -// in the Software without restriction, including without limitation the rights -// to use, copy, modify, merge, publish, distribute, sublicense, and/or sell -// copies of the Software, and to permit persons to whom the Software is -// furnished to do so, subject to the following conditions: -// -// The above copyright notice and this permission notice shall be included in -// all copies or substantial portions of the Software. -// -// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR -// IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, -// FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE -// AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER -// LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, -// OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN -// THE SOFTWARE. - -package json - -import ( - "encoding/json" - "fmt" - "net/http" - "net/http/httptest" - "strings" - "testing" - - "github.com/m3db/m3/src/collector/reporter" - "github.com/m3db/m3/src/metrics/metric/id" - "github.com/m3db/m3/src/x/instrument" - "github.com/m3db/m3/src/x/pool" - "github.com/m3db/m3/src/x/serialize" - - "github.com/golang/mock/gomock" - "github.com/stretchr/testify/assert" - "github.com/stretchr/testify/require" -) - -const ( - reportRequestJSON = ` -{ - "metrics": [ - { - "type": "counter", - "tags": {"foo": "bar"}, - "value": 1 - }, - { - "type": "gauge", - "tags": {"foo": "baz"}, - "value": 2.42 - }, - { - "type": "timer", - "tags": {"foo": "qux"}, - "value": 3.42 - } - ] -} -` -) - -func TestReportHandler(t *testing.T) { - ctrl := gomock.NewController(t) - defer ctrl.Finish() - - test := newTestReportHandler(ctrl) - handler := test.handler - reporter := test.reporter - - req := newTestReportHandlerRequest(t, reportRequestJSON) - - reporter.EXPECT(). - ReportCounter(gomock.Any(), gomock.Any()). - DoAndReturn(func(id id.ID, value int64) error { - tagValue, ok := id.TagValue([]byte("foo")) - require.True(t, ok) - assert.Equal(t, "bar", string(tagValue)) - assert.Equal(t, int64(1), value) - return nil - }) - - reporter.EXPECT(). - ReportGauge(gomock.Any(), gomock.Any()). - DoAndReturn(func(id id.ID, value float64) error { - tagValue, ok := id.TagValue([]byte("foo")) - require.True(t, ok) - assert.Equal(t, "baz", string(tagValue)) - assert.Equal(t, 2.42, value) - return nil - }) - - reporter.EXPECT(). - ReportBatchTimer(gomock.Any(), gomock.Any()). - DoAndReturn(func(id id.ID, values []float64) error { - tagValue, ok := id.TagValue([]byte("foo")) - require.True(t, ok) - assert.Equal(t, "qux", string(tagValue)) - require.Equal(t, 1, len(values)) - assert.Equal(t, 3.42, values[0]) - return nil - }) - - recorder := httptest.NewRecorder() - handler.ServeHTTP(recorder, req) - - resp := decodeParseReportResponse(t, http.StatusOK, recorder) - assert.Equal(t, 3, resp.Reported) -} - -func TestReportHandlerInvalidCounterValue(t *testing.T) { - ctrl := gomock.NewController(t) - defer ctrl.Finish() - - test := newTestReportHandler(ctrl) - handler := test.handler - - req := newTestReportHandlerRequest(t, ` - { - "metrics": [ - { - "type": "counter", - "tags": {"foo": "bar"}, - "value": 1.42 - } - ] - } -`) - - recorder := httptest.NewRecorder() - handler.ServeHTTP(recorder, req) - - assert.Equal(t, http.StatusBadRequest, recorder.Code, - fmt.Sprintf("not 400 status: status=%d\nbody=%s\n", - recorder.Code, recorder.Body.String())) -} - -func TestReportHandlerUnknownMetricType(t *testing.T) { - ctrl := gomock.NewController(t) - defer ctrl.Finish() - - test := newTestReportHandler(ctrl) - handler := test.handler - - req := newTestReportHandlerRequest(t, ` - { - "metrics": [ - { - "type": "invalid", - "tags": {"foo": "bar"}, - "value": 1.42 - } - ] - } -`) - - recorder := httptest.NewRecorder() - handler.ServeHTTP(recorder, req) - - assert.Equal(t, http.StatusBadRequest, recorder.Code, - fmt.Sprintf("not 400 status: status=%d\nbody=%s\n", - recorder.Code, recorder.Body.String())) -} - -func TestReportHandlerInvalidJSONBody(t *testing.T) { - ctrl := gomock.NewController(t) - defer ctrl.Finish() - - test := newTestReportHandler(ctrl) - handler := test.handler - - req := newTestReportHandlerRequest(t, "plain text") - - recorder := httptest.NewRecorder() - handler.ServeHTTP(recorder, req) - - assert.Equal(t, http.StatusBadRequest, recorder.Code, - fmt.Sprintf("not 400 status: status=%d\nbody=%s\n", - recorder.Code, recorder.Body.String())) -} - -type testReportHandler struct { - handler *reportHandler - reporter *reporter.MockReporter -} - -func newTestReportHandler(ctrl *gomock.Controller) testReportHandler { - reporter := reporter.NewMockReporter(ctrl) - poolOpts := pool.NewObjectPoolOptions().SetSize(1) - tagEncoderPool := serialize.NewTagEncoderPool( - serialize.NewTagEncoderOptions(), - poolOpts) - tagEncoderPool.Init() - tagDecoderPool := serialize.NewTagDecoderPool( - serialize.NewTagDecoderOptions(serialize.TagDecoderOptionsConfig{}), - poolOpts) - tagDecoderPool.Init() - instrumentOpts := instrument.NewOptions() - - handler := NewReportHandler(reporter, - tagEncoderPool, tagDecoderPool, instrumentOpts) - return testReportHandler{ - handler: handler.(*reportHandler), - reporter: reporter, - } -} - -func newTestReportHandlerRequest(t *testing.T, body string) *http.Request { - req, err := http.NewRequest(ReportHTTPMethod, ReportURL, - strings.NewReader(body)) - require.NoError(t, err) - return req -} - -func decodeParseReportResponse( - t *testing.T, - expectStatusCode int, - recorder *httptest.ResponseRecorder, -) reportResponse { - assert.Equal(t, expectStatusCode, recorder.Code, - fmt.Sprintf("not %d status: status=%d\nbody=%s\n", - expectStatusCode, recorder.Code, recorder.Body.String())) - - var resp reportResponse - err := json.NewDecoder(recorder.Body).Decode(&resp) - require.NoError(t, err) - return resp -} diff --git a/src/collector/api/v1/httpd/handler.go b/src/collector/api/v1/httpd/handler.go deleted file mode 100644 index 376d7b7688..0000000000 --- a/src/collector/api/v1/httpd/handler.go +++ /dev/null @@ -1,94 +0,0 @@ -// Copyright (c) 2018 Uber Technologies, Inc. -// -// Permission is hereby granted, free of charge, to any person obtaining a copy -// of this software and associated documentation files (the "Software"), to deal -// in the Software without restriction, including without limitation the rights -// to use, copy, modify, merge, publish, distribute, sublicense, and/or sell -// copies of the Software, and to permit persons to whom the Software is -// furnished to do so, subject to the following conditions: -// -// The above copyright notice and this permission notice shall be included in -// all copies or substantial portions of the Software. -// -// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR -// IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, -// FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE -// AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER -// LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, -// OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN -// THE SOFTWARE. - -package httpd - -import ( - "encoding/json" - "net/http" - "time" - - jsonhandler "github.com/m3db/m3/src/collector/api/v1/handler/json" - "github.com/m3db/m3/src/collector/reporter" - "github.com/m3db/m3/src/x/serialize" - "github.com/m3db/m3/src/x/instrument" - - "github.com/gorilla/mux" -) - -const ( - healthURL = "/health" -) - -// Handler represents an HTTP handler. -type Handler struct { - router *mux.Router - reporter reporter.Reporter - encoderPool serialize.TagEncoderPool - decoderPool serialize.TagDecoderPool - instrumentOpts instrument.Options - createdAt time.Time -} - -// NewHandler returns a new instance of handler with routes. -func NewHandler( - reporter reporter.Reporter, - encoderPool serialize.TagEncoderPool, - decoderPool serialize.TagDecoderPool, - instrumentOpts instrument.Options, -) (*Handler, error) { - return &Handler{ - router: mux.NewRouter(), - reporter: reporter, - encoderPool: encoderPool, - decoderPool: decoderPool, - instrumentOpts: instrumentOpts, - createdAt: time.Now(), - }, nil -} - -// Router returns the handler router. -func (h *Handler) Router() *mux.Router { - return h.router -} - -// RegisterRoutes registers all http routes. -func (h *Handler) RegisterRoutes() error { - // Report handler - reportHandler := jsonhandler.NewReportHandler(h.reporter, h.encoderPool, - h.decoderPool, h.instrumentOpts) - h.router. - Handle(jsonhandler.ReportURL, reportHandler). - Methods(jsonhandler.ReportHTTPMethod) - - h.registerHealthEndpoints() - - return nil -} - -func (h *Handler) registerHealthEndpoints() { - h.router.HandleFunc(healthURL, func(w http.ResponseWriter, r *http.Request) { - json.NewEncoder(w).Encode(struct { - Uptime string `json:"uptime"` - }{ - Uptime: time.Since(h.createdAt).String(), - }) - }).Methods(http.MethodGet) -} diff --git a/src/collector/config/m3collector.yml b/src/collector/config/m3collector.yml deleted file mode 100644 index b9a0181b8d..0000000000 --- a/src/collector/config/m3collector.yml +++ /dev/null @@ -1,88 +0,0 @@ -listenAddress: 0.0.0.0:7206 - -metrics: - scope: - prefix: collector - prometheus: - onError: none - handlerPath: /metrics - listenAddress: 0.0.0.0:7207 # until https://github.com/m3db/m3/issues/682 is resolved - sanitization: prometheus - samplingRate: 1.0 - extended: none - -etcd: - env: default_env - zone: embedded - service: m3collector - cacheDir: /var/lib/m3kv - etcdClusters: - - zone: embedded - endpoints: - - m3db_seed:2379 - -reporter: - cache: - capacity: 200000 - freshDuration: 5m - stutterDuration: 1m - - matcher: - initWatchTimeout: 10s - rulesKVConfig: - namespace: /rules - namespacesKey: namespaces - ruleSetKeyFmt: rulesets/%s - namespaceTag: application - defaultNamespace: global - nameTagKey: __name__ - matchRangePast: 2m - sortedTagIteratorPool: - size: 8192 - watermark: - low: 0.7 - high: 1.0 - - client: - placementKV: - namespace: /placement - placementWatcher: - key: m3aggregator - initWatchTimeout: 10s - hashType: murmur32 - shardCutoffLingerDuration: 1m - maxTimerBatchSize: 1120 - queueSize: 10000 - queueDropType: oldest - encoder: - initBufferSize: 2048 - maxMessageSize: 10485760 - bytesPool: - buckets: - - capacity: 2048 - count: 4096 - - capacity: 4096 - count: 4096 - watermark: - low: 0.7 - high: 1.0 - connection: - writeTimeout: 250ms - - clock: - maxPositiveSkew: 2m - maxNegativeSkew: 2m - - sortedTagIteratorPool: - size: 8192 - watermark: - low: 0.7 - high: 1.0 - -logging: - level: info - encoding: json - outputPaths: - - stdout - errorOutputPaths: - - stderr diff --git a/src/collector/generated/mocks/generate.go b/src/collector/generated/mocks/generate.go deleted file mode 100644 index 51484f7549..0000000000 --- a/src/collector/generated/mocks/generate.go +++ /dev/null @@ -1,24 +0,0 @@ -// Copyright (c) 2018 Uber Technologies, Inc. -// -// Permission is hereby granted, free of charge, to any person obtaining a copy -// of this software and associated documentation files (the "Software"), to deal -// in the Software without restriction, including without limitation the rights -// to use, copy, modify, merge, publish, distribute, sublicense, and/or sell -// copies of the Software, and to permit persons to whom the Software is -// furnished to do so, subject to the following conditions: -// -// The above copyright notice and this permission notice shall be included in -// all copies or substantial portions of the Software. -// -// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR -// IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, -// FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE -// AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER -// LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, -// OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN -// THE SOFTWARE. - -// mockgen rules for generating mocks for exported interfaces (reflection mode). -//go:generate sh -c "mockgen -package=reporter github.com/m3db/m3/src/collector/reporter Reporter | genclean -pkg github.com/m3db/m3/src/collector/reporter -out $GOPATH/src/github.com/m3db/m3/src/collector/reporter/reporter_mock.go" - -package mocks diff --git a/src/collector/integration/README.md b/src/collector/integration/README.md deleted file mode 100644 index 4d81d27780..0000000000 --- a/src/collector/integration/README.md +++ /dev/null @@ -1,11 +0,0 @@ -Package integration contains integration tests for the collector. - -The directory is structured as follows: -* server/: simple server receiving traffic sent from the collector for integration testing. -* client.go: simple client for testing TCP connections. -* data.go: data structures and utility functions for test data generation and validation. -* defaults.go: default values for various testing parameters. -* integration.go: general utility functions. -* options.go: configurable options for tuning test parameters. -* setup.go: general initialization logic for test setup. -* *_test.go: integration tests. diff --git a/src/collector/integration/client.go b/src/collector/integration/client.go deleted file mode 100644 index 22fe018103..0000000000 --- a/src/collector/integration/client.go +++ /dev/null @@ -1,64 +0,0 @@ -// Copyright (c) 2017 Uber Technologies, Inc. -// -// Permission is hereby granted, free of charge, to any person obtaining a copy -// of this software and associated documentation files (the "Software"), to deal -// in the Software without restriction, including without limitation the rights -// to use, copy, modify, merge, publish, distribute, sublicense, and/or sell -// copies of the Software, and to permit persons to whom the Software is -// furnished to do so, subject to the following conditions: -// -// The above copyright notice and this permission notice shall be included in -// all copies or substantial portions of the Software. -// -// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR -// IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, -// FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE -// AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER -// LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, -// OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN -// THE SOFTWARE. - -package integration - -import ( - "net" - "time" -) - -type client struct { - address string - connectTimeout time.Duration - conn net.Conn -} - -func newClient(address string, connectTimeout time.Duration) *client { - return &client{ - address: address, - connectTimeout: connectTimeout, - } -} - -func (c *client) connect() error { - conn, err := net.DialTimeout("tcp", c.address, c.connectTimeout) - if err != nil { - return err - } - c.conn = conn - return nil -} - -func (c *client) testConnection() bool { - if err := c.connect(); err != nil { - return false - } - c.conn.Close() - c.conn = nil - return true -} - -func (c *client) close() { - if c.conn != nil { - c.conn.Close() - c.conn = nil - } -} diff --git a/src/collector/integration/data.go b/src/collector/integration/data.go deleted file mode 100644 index 711dcca7eb..0000000000 --- a/src/collector/integration/data.go +++ /dev/null @@ -1,60 +0,0 @@ -// Copyright (c) 2017 Uber Technologies, Inc. -// -// Permission is hereby granted, free of charge, to any person obtaining a copy -// of this software and associated documentation files (the "Software"), to deal -// in the Software without restriction, including without limitation the rights -// to use, copy, modify, merge, publish, distribute, sublicense, and/or sell -// copies of the Software, and to permit persons to whom the Software is -// furnished to do so, subject to the following conditions: -// -// The above copyright notice and this permission notice shall be included in -// all copies or substantial portions of the Software. -// -// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR -// IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, -// FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE -// AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER -// LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, -// OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN -// THE SOFTWARE. - -package integration - -import ( - "bytes" - - "github.com/m3db/m3/src/metrics/metadata" - "github.com/m3db/m3/src/metrics/metric/id" - "github.com/m3db/m3/src/metrics/metric/unaggregated" -) - -type idGenerator func(int) id.ID - -// nolint:structcheck -type outputResult struct { - idGen idGenerator - metadatas metadata.StagedMetadatas -} - -type outputResults []outputResult - -// nolint:structcheck -type metricWithMetadatas struct { - metric unaggregated.MetricUnion - metadatas metadata.StagedMetadatas -} - -type resultsByTypeAscIDAsc []metricWithMetadatas - -func (r resultsByTypeAscIDAsc) Len() int { return len(r) } -func (r resultsByTypeAscIDAsc) Swap(i, j int) { r[i], r[j] = r[j], r[i] } - -func (r resultsByTypeAscIDAsc) Less(i, j int) bool { - if r[i].metric.Type < r[j].metric.Type { - return true - } - if r[i].metric.Type > r[j].metric.Type { - return false - } - return bytes.Compare(r[i].metric.ID, r[j].metric.ID) < 0 -} diff --git a/src/collector/integration/defaults.go b/src/collector/integration/defaults.go deleted file mode 100644 index acf8b353d5..0000000000 --- a/src/collector/integration/defaults.go +++ /dev/null @@ -1,236 +0,0 @@ -// Copyright (c) 2017 Uber Technologies, Inc. -// -// Permission is hereby granted, free of charge, to any person obtaining a copy -// of this software and associated documentation files (the "Software"), to deal -// in the Software without restriction, including without limitation the rights -// to use, copy, modify, merge, publish, distribute, sublicense, and/or sell -// copies of the Software, and to permit persons to whom the Software is -// furnished to do so, subject to the following conditions: -// -// The above copyright notice and this permission notice shall be included in -// all copies or substantial portions of the Software. -// -// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR -// IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, -// FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE -// AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER -// LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, -// OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN -// THE SOFTWARE. - -package integration - -import ( - "fmt" - "hash/adler32" - "time" - - aggclient "github.com/m3db/m3/src/aggregator/client" - "github.com/m3db/m3/src/cluster/generated/proto/placementpb" - "github.com/m3db/m3/src/cluster/kv" - "github.com/m3db/m3/src/cluster/placement" - "github.com/m3db/m3/src/cluster/shard" - "github.com/m3db/m3/src/metrics/encoding/protobuf" - "github.com/m3db/m3/src/metrics/filters" - "github.com/m3db/m3/src/metrics/generated/proto/pipelinepb" - "github.com/m3db/m3/src/metrics/generated/proto/policypb" - "github.com/m3db/m3/src/metrics/generated/proto/rulepb" - "github.com/m3db/m3/src/metrics/matcher" - "github.com/m3db/m3/src/metrics/metric/id" - "github.com/m3db/m3/src/metrics/metric/id/m3" - "github.com/m3db/m3/src/metrics/rules" - "github.com/m3db/m3/src/x/pool" -) - -const ( - defaultNumShards = 4096 - defaultInstanceID = "localhost:6789" - defaultPlacementKey = "placement" - defaultNamespace = "integration" - defaultNamespacesKey = "namespaces" - defaultRuleSetKeyFmt = "rulesets/%s" - // nolint: varcheck - defaultRuleSetKey = "rulesets/integration" - defaultNamespaceTag = "namespace" - defaultNameTag = "name" -) - -func defaultStagedPlacementProto() (*placementpb.PlacementSnapshots, error) { - shardSet := make([]shard.Shard, defaultNumShards) - for i := 0; i < defaultNumShards; i++ { - shardSet[i] = shard.NewShard(uint32(i)).SetState(shard.Available) - } - shards := shard.NewShards(shardSet) - instance := placement.NewInstance(). - SetID(defaultInstanceID). - SetEndpoint(defaultInstanceID). - SetShards(shards) - testPlacement := placement.NewPlacement(). - SetInstances([]placement.Instance{instance}). - SetShards(shards.AllIDs()) - stagedPlacement, err := placement.NewPlacementsFromLatest(testPlacement) - if err != nil { - return nil, err - } - return stagedPlacement.Proto() -} - -func defaultNamespaces() *rulepb.Namespaces { - return &rulepb.Namespaces{ - Namespaces: []*rulepb.Namespace{ - { - Name: defaultNamespace, - Snapshots: []*rulepb.NamespaceSnapshot{ - { - ForRulesetVersion: 1, - Tombstoned: false, - }, - }, - }, - }, - } -} - -func defaultMappingRulesConfig() []*rulepb.MappingRule { - return []*rulepb.MappingRule{ - { - Uuid: "mappingRule1", - Snapshots: []*rulepb.MappingRuleSnapshot{ - { - Name: "mappingRule1.snapshot1", - Tombstoned: false, - CutoverNanos: 1000, - Filter: "mtagName1:mtagValue1", - StoragePolicies: []*policypb.StoragePolicy{ - { - Resolution: policypb.Resolution{ - WindowSize: int64(10 * time.Second), - Precision: int64(time.Second), - }, - Retention: policypb.Retention{ - Period: int64(24 * time.Hour), - }, - }, - }, - }, - }, - }, - } -} - -func defaultRollupRulesConfig() []*rulepb.RollupRule { - return []*rulepb.RollupRule{ - { - Uuid: "rollupRule1", - Snapshots: []*rulepb.RollupRuleSnapshot{ - { - Name: "rollupRule1.snapshot1", - Tombstoned: false, - CutoverNanos: 500, - Filter: "rtagName1:rtagValue1", - KeepOriginal: true, - TargetsV2: []*rulepb.RollupTargetV2{ - { - Pipeline: &pipelinepb.Pipeline{ - Ops: []pipelinepb.PipelineOp{ - { - Type: pipelinepb.PipelineOp_ROLLUP, - Rollup: &pipelinepb.RollupOp{ - NewName: "newRollupName1", - Tags: []string{"namespace", "rtagName1"}, - }, - }, - }, - }, - StoragePolicies: []*policypb.StoragePolicy{ - { - Resolution: policypb.Resolution{ - WindowSize: int64(time.Minute), - Precision: int64(time.Minute), - }, - Retention: policypb.Retention{ - Period: int64(48 * time.Hour), - }, - }, - }, - }, - }, - }, - }, - }, - } -} - -func defaultRuleSet() *rulepb.RuleSet { - return &rulepb.RuleSet{ - Uuid: "07592642-a105-40a5-a5c5-7c416ccb56c5", - Namespace: defaultNamespace, - Tombstoned: false, - CutoverNanos: 1000, - MappingRules: defaultMappingRulesConfig(), - RollupRules: defaultRollupRulesConfig(), - } -} - -func defaultSortedTagIteratorPool() id.SortedTagIteratorPool { - poolOpts := pool.NewObjectPoolOptions() - sortedTagIteratorPool := id.NewSortedTagIteratorPool(poolOpts) - sortedTagIteratorPool.Init(func() id.SortedTagIterator { - return m3.NewPooledSortedTagIterator(nil, sortedTagIteratorPool) - }) - return sortedTagIteratorPool -} - -func defaultMatcherOptions( - store kv.Store, - iterPool id.SortedTagIteratorPool, -) matcher.Options { - sortedTagIteratorFn := func(tagPairs []byte) id.SortedTagIterator { - it := iterPool.Get() - it.Reset(tagPairs) - return it - } - tagsFilterOpts := filters.TagsFilterOptions{ - NameTagKey: []byte(defaultNameTag), - NameAndTagsFn: m3.NameAndTags, - SortedTagIteratorFn: sortedTagIteratorFn, - } - ruleSetOpts := rules.NewOptions(). - SetTagsFilterOptions(tagsFilterOpts). - SetNewRollupIDFn(m3.NewRollupID) - return matcher.NewOptions(). - SetKVStore(store). - SetNamespacesKey(defaultNamespacesKey). - SetRuleSetKeyFn(func(namespace []byte) string { - return fmt.Sprintf(defaultRuleSetKeyFmt, namespace) - }). - SetNamespaceTag([]byte(defaultNamespaceTag)). - SetRuleSetOptions(ruleSetOpts) -} - -func defaultShardFn(id []byte, numShards uint32) uint32 { - return adler32.Checksum(id) % numShards -} - -func defaultBytesPool() pool.BytesPool { - return pool.NewBytesPool([]pool.Bucket{ - {Capacity: 128, Count: 1000}, - {Capacity: 256, Count: 1000}, - {Capacity: 4096, Count: 1000}, - }, pool.NewObjectPoolOptions()) -} - -func defaultAggregatorClientOptions( - store kv.Store, -) aggclient.Options { - watcherOpts := placement.NewWatcherOptions(). - SetStagedPlacementKey(defaultPlacementKey). - SetStagedPlacementStore(store) - bytesPool := defaultBytesPool() - bytesPool.Init() - encoderOpts := protobuf.NewUnaggregatedOptions().SetBytesPool(bytesPool) - return aggclient.NewOptions(). - SetShardFn(defaultShardFn). - SetWatcherOptions(watcherOpts). - SetEncoderOptions(encoderOpts) -} diff --git a/src/collector/integration/integration.go b/src/collector/integration/integration.go deleted file mode 100644 index 173ec5e865..0000000000 --- a/src/collector/integration/integration.go +++ /dev/null @@ -1,49 +0,0 @@ -// Copyright (c) 2017 Uber Technologies, Inc. -// -// Permission is hereby granted, free of charge, to any person obtaining a copy -// of this software and associated documentation files (the "Software"), to deal -// in the Software without restriction, including without limitation the rights -// to use, copy, modify, merge, publish, distribute, sublicense, and/or sell -// copies of the Software, and to permit persons to whom the Software is -// furnished to do so, subject to the following conditions: -// -// The above copyright notice and this permission notice shall be included in -// all copies or substantial portions of the Software. -// -// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR -// IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, -// FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE -// AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER -// LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, -// OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN -// THE SOFTWARE. - -package integration - -import ( - "testing" - "time" - - "github.com/m3db/m3/src/cluster/kv" - - "github.com/golang/protobuf/proto" - "github.com/stretchr/testify/require" -) - -type conditionFn func() bool - -func waitUntil(fn conditionFn, timeout time.Duration) bool { - deadline := time.Now().Add(timeout) - for time.Now().Before(deadline) { - if fn() { - return true - } - time.Sleep(time.Second) - } - return false -} - -func updateStore(t *testing.T, store kv.Store, key string, proto proto.Message) { - _, err := store.Set(key, proto) - require.NoError(t, err) -} diff --git a/src/collector/integration/options.go b/src/collector/integration/options.go deleted file mode 100644 index 1a6a1b6178..0000000000 --- a/src/collector/integration/options.go +++ /dev/null @@ -1,211 +0,0 @@ -// Copyright (c) 2017 Uber Technologies, Inc. -// -// Permission is hereby granted, free of charge, to any person obtaining a copy -// of this software and associated documentation files (the "Software"), to deal -// in the Software without restriction, including without limitation the rights -// to use, copy, modify, merge, publish, distribute, sublicense, and/or sell -// copies of the Software, and to permit persons to whom the Software is -// furnished to do so, subject to the following conditions: -// -// The above copyright notice and this permission notice shall be included in -// all copies or substantial portions of the Software. -// -// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR -// IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, -// FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE -// AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER -// LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, -// OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN -// THE SOFTWARE. - -package integration - -import ( - "time" - - aggclient "github.com/m3db/m3/src/aggregator/client" - "github.com/m3db/m3/src/cluster/kv" - "github.com/m3db/m3/src/cluster/kv/mem" - "github.com/m3db/m3/src/collector/integration/server" - aggreporter "github.com/m3db/m3/src/collector/reporter/m3aggregator" - "github.com/m3db/m3/src/metrics/matcher" - "github.com/m3db/m3/src/metrics/matcher/cache" - "github.com/m3db/m3/src/x/instrument" -) - -const ( - defaultServerStateChangeTimeout = 5 * time.Second -) - -type testOptions interface { - // SetInstrumentOptions sets the instrument options. - SetInstrumentOptions(value instrument.Options) testOptions - - // InstrumentOptions returns the instrument options. - InstrumentOptions() instrument.Options - - // SetServerAddr sets the server listening address. - SetServerAddr(value string) testOptions - - // ServerAddr returns the server listening address. - ServerAddr() string - - // SetServerOptions sets the server options. - SetServerOptions(value server.Options) testOptions - - // ServerOptions returns the server options. - ServerOptions() server.Options - - // SetServerStateChangeTimeout sets the server state change timeout. - SetServerStateChangeTimeout(value time.Duration) testOptions - - // ServerStateChangeTimeout returns the server state change timeout. - ServerStateChangeTimeout() time.Duration - - // SetKVStore sets the key value store. - SetKVStore(value kv.Store) testOptions - - // KVStore returns the key value store. - KVStore() kv.Store - - // SetCacheOptions sets the cache options. - SetCacheOptions(value cache.Options) testOptions - - // CacheOptions returns the cache options. - CacheOptions() cache.Options - - // SetMatcherOptions sets the matcher options. - SetMatcherOptions(value matcher.Options) testOptions - - // MatcherOptions returns the matcher options. - MatcherOptions() matcher.Options - - // SetAggregatorClientOptions sets the aggregator client options. - SetAggregatorClientOptions(value aggclient.Options) testOptions - - // AggregatorClientOptions returns the aggregator client options. - AggregatorClientOptions() aggclient.Options - - // SetAggregatorReporterOptions sets the aggregator reporter options. - SetAggregatorReporterOptions(value aggreporter.ReporterOptions) testOptions - - // AggregatorReporterOptions returns the reporter options. - AggregatorReporterOptions() aggreporter.ReporterOptions -} - -type options struct { - instrumentOpts instrument.Options - serverAddr string - serverOpts server.Options - serverStateChangeTimeout time.Duration - store kv.Store - cacheOpts cache.Options - matcherOpts matcher.Options - aggClientOpts aggclient.Options - aggReporterOpts aggreporter.ReporterOptions -} - -func newTestOptions() testOptions { - return &options{ - instrumentOpts: instrument.NewOptions(), - serverOpts: server.NewOptions(), - serverStateChangeTimeout: defaultServerStateChangeTimeout, - store: mem.NewStore(), - cacheOpts: cache.NewOptions(), - matcherOpts: matcher.NewOptions(), - aggClientOpts: aggclient.NewOptions(). - SetMaxBatchSize(65536). - SetFlushWorkerCount(4), - aggReporterOpts: aggreporter.NewReporterOptions(), - } -} - -func (o *options) SetInstrumentOptions(value instrument.Options) testOptions { - opts := *o - opts.instrumentOpts = value - return &opts -} - -func (o *options) InstrumentOptions() instrument.Options { - return o.instrumentOpts -} - -func (o *options) SetServerAddr(value string) testOptions { - opts := *o - opts.serverAddr = value - return &opts -} - -func (o *options) ServerAddr() string { - return o.serverAddr -} - -func (o *options) SetServerOptions(value server.Options) testOptions { - opts := *o - opts.serverOpts = value - return &opts -} - -func (o *options) ServerOptions() server.Options { - return o.serverOpts -} - -func (o *options) SetServerStateChangeTimeout(value time.Duration) testOptions { - opts := *o - opts.serverStateChangeTimeout = value - return &opts -} - -func (o *options) ServerStateChangeTimeout() time.Duration { - return o.serverStateChangeTimeout -} - -func (o *options) SetKVStore(value kv.Store) testOptions { - opts := *o - opts.store = value - return &opts -} - -func (o *options) KVStore() kv.Store { - return o.store -} - -func (o *options) SetCacheOptions(value cache.Options) testOptions { - opts := *o - opts.cacheOpts = value - return &opts -} - -func (o *options) CacheOptions() cache.Options { - return o.cacheOpts -} - -func (o *options) SetMatcherOptions(value matcher.Options) testOptions { - opts := *o - opts.matcherOpts = value - return &opts -} - -func (o *options) MatcherOptions() matcher.Options { - return o.matcherOpts -} - -func (o *options) SetAggregatorClientOptions(value aggclient.Options) testOptions { - opts := *o - opts.aggClientOpts = value - return &opts -} - -func (o *options) AggregatorClientOptions() aggclient.Options { - return o.aggClientOpts -} - -func (o *options) SetAggregatorReporterOptions(value aggreporter.ReporterOptions) testOptions { - opts := *o - opts.aggReporterOpts = value - return &opts -} - -func (o *options) AggregatorReporterOptions() aggreporter.ReporterOptions { - return o.aggReporterOpts -} diff --git a/src/collector/integration/report_match_mapping_rollup_rule_updates_test.go b/src/collector/integration/report_match_mapping_rollup_rule_updates_test.go deleted file mode 100644 index b7f135ed7b..0000000000 --- a/src/collector/integration/report_match_mapping_rollup_rule_updates_test.go +++ /dev/null @@ -1,111 +0,0 @@ -// +build integration - -// Copyright (c) 2017 Uber Technologies, Inc. -// -// Permission is hereby granted, free of charge, to any person obtaining a copy -// of this software and associated documentation files (the "Software"), to deal -// in the Software without restriction, including without limitation the rights -// to use, copy, modify, merge, publish, distribute, sublicense, and/or sell -// copies of the Software, and to permit persons to whom the Software is -// furnished to do so, subject to the following conditions: -// -// The above copyright notice and this permission notice shall be included in -// all copies or substantial portions of the Software. -// -// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR -// IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, -// FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE -// AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER -// LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, -// OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN -// THE SOFTWARE. - -package integration - -import ( - "fmt" - "testing" - - "github.com/m3db/m3/src/cluster/kv/mem" - "github.com/m3db/m3/src/metrics/aggregation" - "github.com/m3db/m3/src/metrics/metadata" - "github.com/m3db/m3/src/metrics/metric/id" - "github.com/m3db/m3/src/metrics/metric/id/m3" - "github.com/m3db/m3/src/metrics/policy" - - "github.com/stretchr/testify/require" -) - -// TestReportMatchMappingRollupWithRuleUpdates tests that the collector can perform -// rule matching and report metrics correctly when the metrics match against both -// mapping rules and rollup rules, while the rules are updated at a high frequency. -func TestReportMatchMappingRollupWithRuleUpdates(t *testing.T) { - namespaces := defaultNamespaces() - ruleSet := defaultRuleSet() - placement, err := defaultStagedPlacementProto() - require.NoError(t, err) - - // Initialize the kv store with default namespaces and ruleset. - store := mem.NewStore() - updateStore(t, store, defaultNamespacesKey, namespaces) - updateStore(t, store, defaultRuleSetKey, ruleSet) - updateStore(t, store, defaultPlacementKey, placement) - - // Initialize test parameters. - iterPool := defaultSortedTagIteratorPool() - idGen := func(i int) id.ID { - return m3.NewID([]byte(fmt.Sprintf("m3+matchmappingrollup%d+mtagName1=mtagValue1,namespace=%s,rtagName1=rtagValue1", i, defaultNamespace)), iterPool) - } - outputRes := []outputResult{ - { - idGen: idGen, - metadatas: metadata.StagedMetadatas{ - { - CutoverNanos: 1000, - Tombstoned: false, - Metadata: metadata.Metadata{ - Pipelines: []metadata.PipelineMetadata{ - { - AggregationID: aggregation.DefaultID, - StoragePolicies: policy.StoragePolicies{ - policy.MustParseStoragePolicy("10s:1d"), - }, - }, - }, - }, - }, - }, - }, - { - idGen: func(i int) id.ID { - return m3.NewID([]byte(fmt.Sprintf("m3+newRollupName1+m3_rollup=true,namespace=%s,rtagName1=rtagValue1", defaultNamespace)), iterPool) - }, - metadatas: metadata.StagedMetadatas{ - { - CutoverNanos: 500, - Tombstoned: false, - Metadata: metadata.Metadata{ - Pipelines: []metadata.PipelineMetadata{ - { - AggregationID: aggregation.DefaultID, - StoragePolicies: policy.StoragePolicies{ - policy.MustParseStoragePolicy("1m:2d"), - }, - }, - }, - }, - }, - }, - }, - } - - testReportWithRuleUpdates(t, testReportWithRuleUpdatesOptions{ - Description: "test reporting metrics with mapping and rollup rule matches and rule updates", - Store: store, - MatcherOpts: defaultMatcherOptions(store, iterPool), - AggClientOpts: defaultAggregatorClientOptions(store), - InputIDGen: idGen, - OutputRes: outputRes, - RuleUpdateFn: func() { updateStore(t, store, defaultRuleSetKey, ruleSet) }, - }) -} diff --git a/src/collector/integration/report_match_mapping_rule_updates_test.go b/src/collector/integration/report_match_mapping_rule_updates_test.go deleted file mode 100644 index bc16137e1a..0000000000 --- a/src/collector/integration/report_match_mapping_rule_updates_test.go +++ /dev/null @@ -1,90 +0,0 @@ -// +build integration - -// Copyright (c) 2017 Uber Technologies, Inc. -// -// Permission is hereby granted, free of charge, to any person obtaining a copy -// of this software and associated documentation files (the "Software"), to deal -// in the Software without restriction, including without limitation the rights -// to use, copy, modify, merge, publish, distribute, sublicense, and/or sell -// copies of the Software, and to permit persons to whom the Software is -// furnished to do so, subject to the following conditions: -// -// The above copyright notice and this permission notice shall be included in -// all copies or substantial portions of the Software. -// -// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR -// IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, -// FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE -// AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER -// LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, -// OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN -// THE SOFTWARE. - -package integration - -import ( - "fmt" - "testing" - - "github.com/m3db/m3/src/cluster/kv/mem" - "github.com/m3db/m3/src/metrics/aggregation" - "github.com/m3db/m3/src/metrics/metadata" - "github.com/m3db/m3/src/metrics/metric/id" - "github.com/m3db/m3/src/metrics/metric/id/m3" - "github.com/m3db/m3/src/metrics/policy" - - "github.com/stretchr/testify/require" -) - -// TestReportMatchMappingWithRuleUpdates tests that the collector can perform -// rule matching and report metrics correctly when the metrics match against -// mapping rules, while the rules are updated at a high frequency. -func TestReportMatchMappingWithRuleUpdates(t *testing.T) { - namespaces := defaultNamespaces() - ruleSet := defaultRuleSet() - placement, err := defaultStagedPlacementProto() - require.NoError(t, err) - - // Initialize the kv store with default namespaces and ruleset. - store := mem.NewStore() - updateStore(t, store, defaultNamespacesKey, namespaces) - updateStore(t, store, defaultRuleSetKey, ruleSet) - updateStore(t, store, defaultPlacementKey, placement) - - // Initialize test parameters. - iterPool := defaultSortedTagIteratorPool() - idGen := func(i int) id.ID { - return m3.NewID([]byte(fmt.Sprintf("m3+matchmapping%d+mtagName1=mtagValue1,namespace=%s", i, defaultNamespace)), iterPool) - } - outputRes := []outputResult{ - { - idGen: idGen, - metadatas: metadata.StagedMetadatas{ - { - CutoverNanos: 1000, - Tombstoned: false, - Metadata: metadata.Metadata{ - Pipelines: []metadata.PipelineMetadata{ - { - AggregationID: aggregation.DefaultID, - StoragePolicies: policy.StoragePolicies{ - policy.MustParseStoragePolicy("10s:1d"), - }, - }, - }, - }, - }, - }, - }, - } - - testReportWithRuleUpdates(t, testReportWithRuleUpdatesOptions{ - Description: "test reporting metrics with mapping rule match and rule updates", - Store: store, - MatcherOpts: defaultMatcherOptions(store, iterPool), - AggClientOpts: defaultAggregatorClientOptions(store), - InputIDGen: idGen, - OutputRes: outputRes, - RuleUpdateFn: func() { updateStore(t, store, defaultRuleSetKey, ruleSet) }, - }) -} diff --git a/src/collector/integration/report_match_rollup_rule_updates_test.go b/src/collector/integration/report_match_rollup_rule_updates_test.go deleted file mode 100644 index 55c54a25cd..0000000000 --- a/src/collector/integration/report_match_rollup_rule_updates_test.go +++ /dev/null @@ -1,102 +0,0 @@ -// +build integration - -// Copyright (c) 2017 Uber Technologies, Inc. -// -// Permission is hereby granted, free of charge, to any person obtaining a copy -// of this software and associated documentation files (the "Software"), to deal -// in the Software without restriction, including without limitation the rights -// to use, copy, modify, merge, publish, distribute, sublicense, and/or sell -// copies of the Software, and to permit persons to whom the Software is -// furnished to do so, subject to the following conditions: -// -// The above copyright notice and this permission notice shall be included in -// all copies or substantial portions of the Software. -// -// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR -// IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, -// FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE -// AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER -// LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, -// OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN -// THE SOFTWARE. - -package integration - -import ( - "fmt" - "testing" - - "github.com/m3db/m3/src/cluster/kv/mem" - "github.com/m3db/m3/src/metrics/aggregation" - "github.com/m3db/m3/src/metrics/metadata" - "github.com/m3db/m3/src/metrics/metric/id" - "github.com/m3db/m3/src/metrics/metric/id/m3" - "github.com/m3db/m3/src/metrics/policy" - - "github.com/stretchr/testify/require" -) - -// TestReportMatchRollupWithRuleUpdates tests that the collector can perform -// rule matching and report metrics correctly when the metrics match against -// rollup rules, while the rules are updated at a high frequency. -func TestReportMatchRollupWithRuleUpdates(t *testing.T) { - namespaces := defaultNamespaces() - ruleSet := defaultRuleSet() - placement, err := defaultStagedPlacementProto() - require.NoError(t, err) - - // Initialize the kv store with default namespaces and ruleset. - store := mem.NewStore() - updateStore(t, store, defaultNamespacesKey, namespaces) - updateStore(t, store, defaultRuleSetKey, ruleSet) - updateStore(t, store, defaultPlacementKey, placement) - - // Initialize test parameters. - iterPool := defaultSortedTagIteratorPool() - idGen := func(i int) id.ID { - return m3.NewID([]byte(fmt.Sprintf("m3+matchrollup%d+namespace=%s,rtagName1=rtagValue1", i, defaultNamespace)), iterPool) - } - outputRes := []outputResult{ - { - idGen: idGen, - metadatas: metadata.StagedMetadatas{ - { - CutoverNanos: 500, - Tombstoned: false, - Metadata: metadata.DefaultMetadata, - }, - }, - }, - { - idGen: func(i int) id.ID { - return m3.NewID([]byte(fmt.Sprintf("m3+newRollupName1+m3_rollup=true,namespace=%s,rtagName1=rtagValue1", defaultNamespace)), iterPool) - }, - metadatas: metadata.StagedMetadatas{ - { - CutoverNanos: 500, - Tombstoned: false, - Metadata: metadata.Metadata{ - Pipelines: []metadata.PipelineMetadata{ - { - AggregationID: aggregation.DefaultID, - StoragePolicies: policy.StoragePolicies{ - policy.MustParseStoragePolicy("1m:2d"), - }, - }, - }, - }, - }, - }, - }, - } - - testReportWithRuleUpdates(t, testReportWithRuleUpdatesOptions{ - Description: "test reporting metrics with rollup rule match and rule updates", - Store: store, - MatcherOpts: defaultMatcherOptions(store, iterPool), - AggClientOpts: defaultAggregatorClientOptions(store), - InputIDGen: idGen, - OutputRes: outputRes, - RuleUpdateFn: func() { updateStore(t, store, defaultRuleSetKey, ruleSet) }, - }) -} diff --git a/src/collector/integration/report_no_match_rule_updates_test.go b/src/collector/integration/report_no_match_rule_updates_test.go deleted file mode 100644 index 7c24e1ca47..0000000000 --- a/src/collector/integration/report_no_match_rule_updates_test.go +++ /dev/null @@ -1,73 +0,0 @@ -// +build integration - -// Copyright (c) 2017 Uber Technologies, Inc. -// -// Permission is hereby granted, free of charge, to any person obtaining a copy -// of this software and associated documentation files (the "Software"), to deal -// in the Software without restriction, including without limitation the rights -// to use, copy, modify, merge, publish, distribute, sublicense, and/or sell -// copies of the Software, and to permit persons to whom the Software is -// furnished to do so, subject to the following conditions: -// -// The above copyright notice and this permission notice shall be included in -// all copies or substantial portions of the Software. -// -// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR -// IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, -// FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE -// AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER -// LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, -// OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN -// THE SOFTWARE. - -package integration - -import ( - "fmt" - "testing" - - "github.com/m3db/m3/src/cluster/kv/mem" - "github.com/m3db/m3/src/metrics/metadata" - "github.com/m3db/m3/src/metrics/metric/id" - "github.com/m3db/m3/src/metrics/metric/id/m3" - - "github.com/stretchr/testify/require" -) - -// TestReportNoMatchWithRuleUpdates tests that the collector can perform -// rule matching and report metrics correctly when the metrics match no -// rules, while the rules are updated at a high frequency. -func TestReportNoMatchWithRuleUpdates(t *testing.T) { - namespaces := defaultNamespaces() - ruleSet := defaultRuleSet() - placement, err := defaultStagedPlacementProto() - require.NoError(t, err) - - // Initialize the kv store with default namespaces and ruleset. - store := mem.NewStore() - updateStore(t, store, defaultNamespacesKey, namespaces) - updateStore(t, store, defaultRuleSetKey, ruleSet) - updateStore(t, store, defaultPlacementKey, placement) - - // Initialize test parameters. - iterPool := defaultSortedTagIteratorPool() - idGen := func(i int) id.ID { - return m3.NewID([]byte(fmt.Sprintf("m3+nomatch%d+namespace=%s", i, defaultNamespace)), iterPool) - } - outputRes := []outputResult{ - { - idGen: idGen, - metadatas: metadata.DefaultStagedMetadatas, - }, - } - - testReportWithRuleUpdates(t, testReportWithRuleUpdatesOptions{ - Description: "test reporting metrics with no rule match and rule updates", - Store: store, - MatcherOpts: defaultMatcherOptions(store, iterPool), - AggClientOpts: defaultAggregatorClientOptions(store), - InputIDGen: idGen, - OutputRes: outputRes, - RuleUpdateFn: func() { updateStore(t, store, defaultRuleSetKey, ruleSet) }, - }) -} diff --git a/src/collector/integration/report_with_rule_updates_test.go b/src/collector/integration/report_with_rule_updates_test.go deleted file mode 100644 index 6fbbcca1cb..0000000000 --- a/src/collector/integration/report_with_rule_updates_test.go +++ /dev/null @@ -1,259 +0,0 @@ -// +build integration - -// Copyright (c) 2017 Uber Technologies, Inc. -// -// Permission is hereby granted, free of charge, to any person obtaining a copy -// of this software and associated documentation files (the "Software"), to deal -// in the Software without restriction, including without limitation the rights -// to use, copy, modify, merge, publish, distribute, sublicense, and/or sell -// copies of the Software, and to permit persons to whom the Software is -// furnished to do so, subject to the following conditions: -// -// The above copyright notice and this permission notice shall be included in -// all copies or substantial portions of the Software. -// -// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR -// IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, -// FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE -// AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER -// LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, -// OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN -// THE SOFTWARE. - -package integration - -import ( - "sort" - "sync" - "testing" - "time" - - aggclient "github.com/m3db/m3/src/aggregator/client" - "github.com/m3db/m3/src/cluster/kv" - aggserver "github.com/m3db/m3/src/collector/integration/server" - "github.com/m3db/m3/src/metrics/encoding/protobuf" - "github.com/m3db/m3/src/metrics/matcher" - "github.com/m3db/m3/src/metrics/metadata" - "github.com/m3db/m3/src/metrics/metric" - "github.com/m3db/m3/src/metrics/metric/id" - "github.com/m3db/m3/src/metrics/metric/unaggregated" - "github.com/m3db/m3/src/metrics/policy" - - "github.com/google/go-cmp/cmp" - "github.com/google/go-cmp/cmp/cmpopts" - "github.com/stretchr/testify/require" -) - -type testReportWithRuleUpdatesOptions struct { - Description string - Store kv.Store - MatcherOpts matcher.Options - AggClientOpts aggclient.Options - InputIDGen idGenerator - OutputRes outputResults - RuleUpdateFn func() -} - -func testReportWithRuleUpdates( - t *testing.T, - opts testReportWithRuleUpdatesOptions, -) { - var ( - resultsLock sync.Mutex - received int - results []metricWithMetadatas - ) - handleFn := func( - metric unaggregated.MetricUnion, - metadatas metadata.StagedMetadatas, - ) error { - // The metric ID and metadatas decoded from the iterator are only valid till the - // next decoding iteration, and as such we make a copy here for validation later. - clonedMetric := cloneMetric(metric) - clonedMetadatas := cloneMetadatas(metadatas) - - resultsLock.Lock() - defer resultsLock.Unlock() - results = append(results, metricWithMetadatas{ - metric: clonedMetric, - metadatas: clonedMetadatas, - }) - received++ - return nil - } - - // Set up test. - bytesPool := defaultBytesPool() - bytesPool.Init() - decodingIterOpts := protobuf.NewUnaggregatedOptions().SetBytesPool(bytesPool) - handlerOpts := aggserver.NewHandlerOptions(). - SetHandleFn(handleFn). - SetProtobufUnaggregatedIteratorOptions(decodingIterOpts) - serverOpts := aggserver.NewOptions().SetHandlerOptions(handlerOpts) - testOpts := newTestOptions(). - SetKVStore(opts.Store). - SetMatcherOptions(opts.MatcherOpts). - SetAggregatorClientOptions(opts.AggClientOpts). - SetServerOptions(serverOpts) - testSetup := newTestSetup(t, testOpts) - defer testSetup.close() - - // Start the server. - log := testOpts.InstrumentOptions().Logger() - log.Info(opts.Description) - require.NoError(t, testSetup.startServer()) - log.Info("server is now up") - - // Report metrics. - var ( - reporter = testSetup.Reporter() - reportIter = 100000 - ruleUpdatesIter = 10000 - types = []metric.Type{metric.CounterType, metric.TimerType, metric.GaugeType} - counterVal = int64(1234) - batchTimerVals = []float64{1.57, 2.38, 99.102} - gaugeVal = 9.345 - wg sync.WaitGroup - expectedResults []metricWithMetadatas - expectedResultsLock sync.Mutex - ) - - wg.Add(2) - - go func() { - defer wg.Done() - - var subWg sync.WaitGroup - subWg.Add(3) - go func() { - defer subWg.Done() - for i := 0; i < reportIter; i++ { - metricID := opts.InputIDGen(i) - - require.NoError(t, reporter.ReportCounter(metricID, counterVal)) - expectedResultsLock.Lock() - for _, result := range opts.OutputRes { - resID := id.RawID(result.idGen(i).Bytes()) - expectedResults = append(expectedResults, metricWithMetadatas{ - metric: unaggregated.Counter{ - ID: resID, - Value: counterVal, - }.ToUnion(), - metadatas: result.metadatas, - }) - } - expectedResultsLock.Unlock() - } - }() - - go func() { - defer subWg.Done() - for i := reportIter; i < 2*reportIter; i++ { - metricID := opts.InputIDGen(i) - require.NoError(t, reporter.ReportBatchTimer(metricID, batchTimerVals)) - expectedResultsLock.Lock() - for _, result := range opts.OutputRes { - resID := id.RawID(result.idGen(i).Bytes()) - expectedResults = append(expectedResults, metricWithMetadatas{ - metric: unaggregated.BatchTimer{ - ID: resID, - Values: batchTimerVals, - }.ToUnion(), - metadatas: result.metadatas, - }) - } - expectedResultsLock.Unlock() - } - }() - - go func() { - defer subWg.Done() - for i := 2 * reportIter; i < 3*reportIter; i++ { - metricID := opts.InputIDGen(i) - require.NoError(t, reporter.ReportGauge(metricID, gaugeVal)) - expectedResultsLock.Lock() - for _, result := range opts.OutputRes { - resID := id.RawID(result.idGen(i).Bytes()) - expectedResults = append(expectedResults, metricWithMetadatas{ - metric: unaggregated.Gauge{ - ID: resID, - Value: gaugeVal, - }.ToUnion(), - metadatas: result.metadatas, - }) - } - expectedResultsLock.Unlock() - require.NoError(t, reporter.Flush()) - } - }() - - // Flush everything to the server. - subWg.Wait() - require.NoError(t, reporter.Flush()) - }() - - go func() { - defer wg.Done() - - for i := 0; i < ruleUpdatesIter; i++ { - opts.RuleUpdateFn() - } - }() - wg.Wait() - - // Wait until all metrics are processed. - for { - resultsLock.Lock() - currReceived := received - resultsLock.Unlock() - if currReceived == reportIter*len(types)*len(opts.OutputRes) { - break - } - time.Sleep(50 * time.Millisecond) - } - - // Stop the server - require.NoError(t, testSetup.stopServer()) - log.Info("server is now down") - - // Validate results. - testCmpOpts := []cmp.Option{ - cmpopts.EquateEmpty(), - cmp.AllowUnexported(metricWithMetadatas{}), - cmp.AllowUnexported(policy.StoragePolicy{}), - } - sort.Sort(resultsByTypeAscIDAsc(results)) - sort.Sort(resultsByTypeAscIDAsc(expectedResults)) - require.True(t, cmp.Equal(expectedResults, results, testCmpOpts...)) -} - -func cloneMetric(metric unaggregated.MetricUnion) unaggregated.MetricUnion { - clonedMetric := metric - clonedID := make(id.RawID, len(metric.ID)) - copy(clonedID, metric.ID) - clonedMetric.ID = clonedID - clonedBatchTimerVal := make([]float64, len(metric.BatchTimerVal)) - copy(clonedBatchTimerVal, metric.BatchTimerVal) - clonedMetric.BatchTimerVal = clonedBatchTimerVal - return clonedMetric -} - -func cloneMetadatas(metadatas metadata.StagedMetadatas) metadata.StagedMetadatas { - clonedMetadatas := make(metadata.StagedMetadatas, 0, len(metadatas)) - for _, stagedMetadata := range metadatas { - clonedStagedMetadata := stagedMetadata - pipelines := stagedMetadata.Pipelines - clonedPipelines := make([]metadata.PipelineMetadata, 0, len(pipelines)) - for _, pipeline := range pipelines { - clonedPipeline := metadata.PipelineMetadata{ - AggregationID: pipeline.AggregationID, - StoragePolicies: pipeline.StoragePolicies.Clone(), - Pipeline: pipeline.Pipeline.Clone(), - } - clonedPipelines = append(clonedPipelines, clonedPipeline) - } - clonedStagedMetadata.Pipelines = clonedPipelines - clonedMetadatas = append(clonedMetadatas, clonedStagedMetadata) - } - return clonedMetadatas -} diff --git a/src/collector/integration/server/options.go b/src/collector/integration/server/options.go deleted file mode 100644 index c47c42f56d..0000000000 --- a/src/collector/integration/server/options.go +++ /dev/null @@ -1,169 +0,0 @@ -// Copyright (c) 2017 Uber Technologies, Inc. -// -// Permission is hereby granted, free of charge, to any person obtaining a copy -// of this software and associated documentation files (the "Software"), to deal -// in the Software without restriction, including without limitation the rights -// to use, copy, modify, merge, publish, distribute, sublicense, and/or sell -// copies of the Software, and to permit persons to whom the Software is -// furnished to do so, subject to the following conditions: -// -// The above copyright notice and this permission notice shall be included in -// all copies or substantial portions of the Software. -// -// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR -// IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, -// FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE -// AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER -// LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, -// OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN -// THE SOFTWARE. - -package server - -import ( - "github.com/m3db/m3/src/metrics/encoding/protobuf" - "github.com/m3db/m3/src/metrics/metadata" - "github.com/m3db/m3/src/metrics/metric/unaggregated" - "github.com/m3db/m3/src/x/instrument" - xserver "github.com/m3db/m3/src/x/server" -) - -const ( - defaultReadBufferSize = 1440 -) - -// Options provide a set of options for the server. -type Options interface { - // SetHandlerOptions sets the handler options. - SetHandlerOptions(value HandlerOptions) Options - - // HandlerOptions returns the handler options. - HandlerOptions() HandlerOptions - - // SetTCPServerOptions sets the TCP server options. - SetTCPServerOptions(value xserver.Options) Options - - // TCPServerOptions returns the TCP server options. - TCPServerOptions() xserver.Options -} - -type serverOptions struct { - handlerOpts HandlerOptions - serverOpts xserver.Options -} - -// NewOptions create a new set of server options. -func NewOptions() Options { - return &serverOptions{ - handlerOpts: NewHandlerOptions(), - serverOpts: xserver.NewOptions(), - } -} - -func (o *serverOptions) SetHandlerOptions(value HandlerOptions) Options { - opts := *o - opts.handlerOpts = value - return &opts -} - -func (o *serverOptions) HandlerOptions() HandlerOptions { - return o.handlerOpts -} - -func (o *serverOptions) SetTCPServerOptions(value xserver.Options) Options { - opts := *o - opts.serverOpts = value - return &opts -} - -func (o *serverOptions) TCPServerOptions() xserver.Options { - return o.serverOpts -} - -// HandleFn handles a metric alongside its staged metadatas. -type HandleFn func(unaggregated.MetricUnion, metadata.StagedMetadatas) error - -// HandlerOptions provide a set of options for the handler. -type HandlerOptions interface { - // SetInstrumentOptions sets the instrument options. - SetInstrumentOptions(value instrument.Options) HandlerOptions - - // InstrumentOptions returns the instrument options. - InstrumentOptions() instrument.Options - - // SetProtobufUnaggregatedIteratorOptions sets the protobuf unaggregated iterator options. - SetProtobufUnaggregatedIteratorOptions(value protobuf.UnaggregatedOptions) HandlerOptions - - // ProtobufUnaggregatedIteratorOptions returns the protobuf unaggregated iterator options. - ProtobufUnaggregatedIteratorOptions() protobuf.UnaggregatedOptions - - // SetReadBufferSize sets the read buffer size. - SetReadBufferSize(value int) HandlerOptions - - // ReadBufferSize returns the read buffer size. - ReadBufferSize() int - - // SetHandleFn sets the handle fn. - SetHandleFn(value HandleFn) HandlerOptions - - // HandleFn returns the handle fn. - HandleFn() HandleFn -} - -type handlerOptions struct { - handleFn HandleFn - instrumentOpts instrument.Options - protobufItOpts protobuf.UnaggregatedOptions - readBufferSize int -} - -// NewHandlerOptions create a new set of options for the handler. -func NewHandlerOptions() HandlerOptions { - return &handlerOptions{ - instrumentOpts: instrument.NewOptions(), - protobufItOpts: protobuf.NewUnaggregatedOptions(), - readBufferSize: defaultReadBufferSize, - } -} - -func (o *handlerOptions) SetInstrumentOptions(value instrument.Options) HandlerOptions { - opts := *o - opts.instrumentOpts = value - return &opts -} - -func (o *handlerOptions) InstrumentOptions() instrument.Options { - return o.instrumentOpts -} - -func (o *handlerOptions) SetProtobufUnaggregatedIteratorOptions( - value protobuf.UnaggregatedOptions, -) HandlerOptions { - opts := *o - opts.protobufItOpts = value - return &opts -} - -func (o *handlerOptions) ProtobufUnaggregatedIteratorOptions() protobuf.UnaggregatedOptions { - return o.protobufItOpts -} - -func (o *handlerOptions) SetReadBufferSize(value int) HandlerOptions { - opts := *o - opts.readBufferSize = value - return &opts -} - -func (o *handlerOptions) ReadBufferSize() int { - return o.readBufferSize -} - -func (o *handlerOptions) SetHandleFn(value HandleFn) HandlerOptions { - opts := *o - opts.handleFn = value - return &opts -} - -func (o *handlerOptions) HandleFn() HandleFn { - return o.handleFn -} diff --git a/src/collector/integration/server/server.go b/src/collector/integration/server/server.go deleted file mode 100644 index 1da99d82c3..0000000000 --- a/src/collector/integration/server/server.go +++ /dev/null @@ -1,100 +0,0 @@ -// Copyright (c) 2017 Uber Technologies, Inc. -// -// Permission is hereby granted, free of charge, to any person obtaining a copy -// of this software and associated documentation files (the "Software"), to deal -// in the Software without restriction, including without limitation the rights -// to use, copy, modify, merge, publish, distribute, sublicense, and/or sell -// copies of the Software, and to permit persons to whom the Software is -// furnished to do so, subject to the following conditions: -// -// The above copyright notice and this permission notice shall be included in -// all copies or substantial portions of the Software. -// -// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR -// IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, -// FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE -// AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER -// LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, -// OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN -// THE SOFTWARE. - -package server - -import ( - "bufio" - "io" - "net" - - "github.com/m3db/m3/src/metrics/encoding" - "github.com/m3db/m3/src/metrics/encoding/protobuf" - "github.com/m3db/m3/src/metrics/metadata" - "github.com/m3db/m3/src/metrics/metric/unaggregated" - xserver "github.com/m3db/m3/src/x/server" - - "go.uber.org/zap" -) - -// NewServer creates a new server. -func NewServer(addr string, opts Options) xserver.Server { - handler := newHandler(opts.HandlerOptions()) - return xserver.NewServer(addr, handler, opts.TCPServerOptions()) -} - -type handler struct { - logger *zap.Logger - itOpts protobuf.UnaggregatedOptions - readBufferSize int - handleFn HandleFn -} - -func newHandler(opts HandlerOptions) *handler { - return &handler{ - logger: opts.InstrumentOptions().Logger(), - itOpts: opts.ProtobufUnaggregatedIteratorOptions(), - readBufferSize: opts.ReadBufferSize(), - handleFn: opts.HandleFn(), - } -} - -func (h *handler) Handle(conn net.Conn) { - reader := bufio.NewReaderSize(conn, h.readBufferSize) - it := protobuf.NewUnaggregatedIterator(reader, h.itOpts) - defer it.Close() - - for it.Next() { - current := it.Current() - var ( - metric unaggregated.MetricUnion - metadatas metadata.StagedMetadatas - ) - switch current.Type { - case encoding.CounterWithMetadatasType: - metric = current.CounterWithMetadatas.Counter.ToUnion() - metadatas = current.CounterWithMetadatas.StagedMetadatas - case encoding.BatchTimerWithMetadatasType: - metric = current.BatchTimerWithMetadatas.BatchTimer.ToUnion() - metadatas = current.BatchTimerWithMetadatas.StagedMetadatas - case encoding.GaugeWithMetadatasType: - metric = current.GaugeWithMetadatas.Gauge.ToUnion() - metadatas = current.GaugeWithMetadatas.StagedMetadatas - default: - h.logger.Error("unrecognized message type", - zap.Any("messageType", current.Type), - ) - } - if err := h.handleFn(metric, metadatas); err != nil { - h.logger.Error("error handling metrics", - zap.Any("metric", metric), - zap.Any("metadatas", metadatas), - zap.Error(err), - ) - } - } - - if err := it.Err(); err != nil && err != io.EOF { - h.logger.Error("decode error", - zap.Error(err)) - } -} - -func (h *handler) Close() {} diff --git a/src/collector/integration/setup.go b/src/collector/integration/setup.go deleted file mode 100644 index d2227b88de..0000000000 --- a/src/collector/integration/setup.go +++ /dev/null @@ -1,134 +0,0 @@ -// Copyright (c) 2017 Uber Technologies, Inc. -// -// Permission is hereby granted, free of charge, to any person obtaining a copy -// of this software and associated documentation files (the "Software"), to deal -// in the Software without restriction, including without limitation the rights -// to use, copy, modify, merge, publish, distribute, sublicense, and/or sell -// copies of the Software, and to permit persons to whom the Software is -// furnished to do so, subject to the following conditions: -// -// The above copyright notice and this permission notice shall be included in -// all copies or substantial portions of the Software. -// -// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR -// IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, -// FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE -// AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER -// LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, -// OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN -// THE SOFTWARE. - -package integration - -import ( - "errors" - "flag" - "testing" - "time" - - aggclient "github.com/m3db/m3/src/aggregator/client" - aggserver "github.com/m3db/m3/src/collector/integration/server" - "github.com/m3db/m3/src/collector/reporter" - aggreporter "github.com/m3db/m3/src/collector/reporter/m3aggregator" - "github.com/m3db/m3/src/metrics/matcher" - "github.com/m3db/m3/src/metrics/matcher/cache" - "github.com/m3db/m3/src/x/server" - - "github.com/stretchr/testify/require" -) - -var ( - serverAddrArg = flag.String("serverAddr", "localhost:6789", "server address") - errServerStartTimedOut = errors.New("server took too long to start") - errServerStopTimedOut = errors.New("server took too long to stop") -) - -type testSetup struct { - opts testOptions - client aggclient.Client - reporter reporter.Reporter - serverAddr string - server server.Server -} - -func newTestSetup(t *testing.T, opts testOptions) *testSetup { - if opts == nil { - opts = newTestOptions() - } - - // Create reporter. - cache := cache.NewCache(opts.CacheOptions()) - matcher, err := matcher.NewMatcher(cache, opts.MatcherOptions()) - require.NoError(t, err) - clientOpts := opts.AggregatorClientOptions(). - SetConnectionOptions(opts.AggregatorClientOptions().ConnectionOptions(). - SetWriteTimeout(time.Second)) // CI is slow, prefer slow write than drop data. - client, err := aggclient.NewClient(clientOpts) - require.NoError(t, err) - reporter := aggreporter.NewReporter(matcher, client, opts.AggregatorReporterOptions()) - - // Create server. - serverAddr := *serverAddrArg - if addr := opts.ServerAddr(); addr != "" { - serverAddr = addr - } - server := aggserver.NewServer(serverAddr, opts.ServerOptions()) - - return &testSetup{ - opts: opts, - client: client, - reporter: reporter, - serverAddr: serverAddr, - server: server, - } -} - -func (ts *testSetup) Reporter() reporter.Reporter { return ts.reporter } - -func (ts *testSetup) newClient() *client { - aggClientOpts := ts.opts.AggregatorClientOptions() - connectTimeout := aggClientOpts.ConnectionOptions().ConnectionTimeout() - return newClient(ts.serverAddr, connectTimeout) -} - -func (ts *testSetup) waitUntilServerIsUp() error { - c := ts.newClient() - defer c.close() - - serverIsUp := func() bool { return c.testConnection() } - if waitUntil(serverIsUp, ts.opts.ServerStateChangeTimeout()) { - return nil - } - return errServerStartTimedOut -} - -func (ts *testSetup) waitUntilServerIsDown() error { - c := ts.newClient() - defer c.close() - - serverIsDown := func() bool { return !c.testConnection() } - if waitUntil(serverIsDown, ts.opts.ServerStateChangeTimeout()) { - return nil - } - return errServerStopTimedOut -} - -func (ts *testSetup) startServer() error { - if err := ts.server.ListenAndServe(); err != nil { - return err - } - if err := ts.waitUntilServerIsUp(); err != nil { - return err - } - return ts.client.Init() -} - -func (ts *testSetup) stopServer() error { - ts.server.Close() - return ts.waitUntilServerIsDown() -} - -func (ts *testSetup) close() { - ts.reporter.Close() - ts.server.Close() -} diff --git a/src/collector/reporter/m3aggregator/options.go b/src/collector/reporter/m3aggregator/options.go deleted file mode 100644 index 9515fafd7e..0000000000 --- a/src/collector/reporter/m3aggregator/options.go +++ /dev/null @@ -1,74 +0,0 @@ -// Copyright (c) 2017 Uber Technologies, Inc. -// -// Permission is hereby granted, free of charge, to any person obtaining a copy -// of this software and associated documentation files (the "Software"), to deal -// in the Software without restriction, including without limitation the rights -// to use, copy, modify, merge, publish, distribute, sublicense, and/or sell -// copies of the Software, and to permit persons to whom the Software is -// furnished to do so, subject to the following conditions: -// -// The above copyright notice and this permission notice shall be included in -// all copies or substantial portions of the Software. -// -// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR -// IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, -// FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE -// AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER -// LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, -// OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN -// THE SOFTWARE. - -package m3aggregator - -import ( - "github.com/m3db/m3/src/x/clock" - "github.com/m3db/m3/src/x/instrument" -) - -// ReporterOptions provide a set of options for the reporter. -type ReporterOptions interface { - // SetClockOptions sets the clock options. - SetClockOptions(value clock.Options) ReporterOptions - - // ClockOptions returns the clock options. - ClockOptions() clock.Options - - // SetInstrumentOptions sets the instrument options. - SetInstrumentOptions(value instrument.Options) ReporterOptions - - // InstrumentOptions returns the instrument options. - InstrumentOptions() instrument.Options -} - -type options struct { - clockOpts clock.Options - instrumentOpts instrument.Options -} - -// NewReporterOptions creates a new set of options. -func NewReporterOptions() ReporterOptions { - return &options{ - clockOpts: clock.NewOptions(), - instrumentOpts: instrument.NewOptions(), - } -} - -func (o *options) SetClockOptions(value clock.Options) ReporterOptions { - opts := *o - opts.clockOpts = value - return &opts -} - -func (o *options) ClockOptions() clock.Options { - return o.clockOpts -} - -func (o *options) SetInstrumentOptions(value instrument.Options) ReporterOptions { - opts := *o - opts.instrumentOpts = value - return &opts -} - -func (o *options) InstrumentOptions() instrument.Options { - return o.instrumentOpts -} diff --git a/src/collector/reporter/m3aggregator/reporter.go b/src/collector/reporter/m3aggregator/reporter.go deleted file mode 100644 index 905abb921c..0000000000 --- a/src/collector/reporter/m3aggregator/reporter.go +++ /dev/null @@ -1,345 +0,0 @@ -// Copyright (c) 2020 Uber Technologies, Inc. -// -// Permission is hereby granted, free of charge, to any person obtaining a copy -// of this software and associated documentation files (the "Software"), to deal -// in the Software without restriction, including without limitation the rights -// to use, copy, modify, merge, publish, distribute, sublicense, and/or sell -// copies of the Software, and to permit persons to whom the Software is -// furnished to do so, subject to the following conditions: -// -// The above copyright notice and this permission notice shall be included in -// all copies or substantial portions of the Software. -// -// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR -// IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, -// FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE -// AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER -// LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, -// OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN -// THE SOFTWARE. - -package m3aggregator - -import ( - "errors" - "os" - "sync" - "sync/atomic" - "time" - - "github.com/m3db/m3/src/aggregator/client" - creporter "github.com/m3db/m3/src/collector/reporter" - "github.com/m3db/m3/src/metrics/matcher" - "github.com/m3db/m3/src/metrics/metadata" - "github.com/m3db/m3/src/metrics/metric/id" - "github.com/m3db/m3/src/metrics/metric/unaggregated" - "github.com/m3db/m3/src/x/clock" - xerrors "github.com/m3db/m3/src/x/errors" - "github.com/m3db/m3/src/x/instrument" - - "github.com/uber-go/tally" - "go.uber.org/zap" -) - -var ( - errReporterAlreadyClosed = errors.New("reporter is already closed") -) - -type reporterMetrics struct { - reportCounter instrument.MethodMetrics - reportBatchTimer instrument.MethodMetrics - reportGauge instrument.MethodMetrics - reportPending tally.Gauge - flush instrument.MethodMetrics - - counterWritesCounter tally.Counter - r2counterWritesCounter tally.Counter - timerWritesCounter tally.Counter - r2timerWritesCounter tally.Counter - gaugeWritesCounter tally.Counter - r2gaugeWritesCounter tally.Counter -} - -func newReporterMetrics(instrumentOpts instrument.Options) reporterMetrics { - scope := instrumentOpts.MetricsScope() - timerOpts := instrumentOpts.TimerOptions() - hostName := "unknown" - if name, err := os.Hostname(); err == nil { - hostName = name - } else { - instrumentOpts.Logger().Warn("unable to determine hostname when creating reporter", zap.Error(err)) - } - hostScope := scope.Tagged(map[string]string{"host": hostName}) - return reporterMetrics{ - reportCounter: instrument.NewMethodMetrics(scope, "report-counter", timerOpts), - reportBatchTimer: instrument.NewMethodMetrics(scope, "report-batch-timer", timerOpts), - reportGauge: instrument.NewMethodMetrics(scope, "report-gauge", timerOpts), - flush: instrument.NewMethodMetrics(scope, "flush", timerOpts), - reportPending: hostScope.Gauge("report-pending"), - counterWritesCounter: scope.Tagged(map[string]string{ - "metric_type": "counter", - }).Counter("writes"), - r2counterWritesCounter: scope.Tagged(map[string]string{ - "metric_type": "counter", - "r2": "true", - }).Counter("writes"), - timerWritesCounter: scope.Tagged(map[string]string{ - "metric_type": "timer", - }).Counter("writes"), - r2timerWritesCounter: scope.Tagged(map[string]string{ - "metric_type": "timer", - "r2": "true", - }).Counter("writes"), - gaugeWritesCounter: scope.Tagged(map[string]string{ - "metric_type": "gauge", - }).Counter("writes"), - r2gaugeWritesCounter: scope.Tagged(map[string]string{ - "metric_type": "gauge", - "r2": "true", - }).Counter("writes"), - } -} - -type reporter struct { - matcher matcher.Matcher - client client.Client - nowFn clock.NowFn - maxPositiveSkew time.Duration - maxNegativeSkew time.Duration - reportInterval time.Duration - - closed int32 - doneCh chan struct{} - wg sync.WaitGroup - reportPending int64 - metrics reporterMetrics -} - -// NewReporter creates a new reporter. -func NewReporter( - matcher matcher.Matcher, - client client.Client, - opts ReporterOptions, -) creporter.Reporter { - clockOpts := opts.ClockOptions() - instrumentOpts := opts.InstrumentOptions() - r := &reporter{ - matcher: matcher, - client: client, - nowFn: clockOpts.NowFn(), - maxPositiveSkew: clockOpts.MaxPositiveSkew(), - maxNegativeSkew: clockOpts.MaxNegativeSkew(), - reportInterval: instrumentOpts.ReportInterval(), - doneCh: make(chan struct{}), - metrics: newReporterMetrics(instrumentOpts), - } - - r.wg.Add(1) - go r.reportMetrics() - return r -} - -func (r *reporter) ReportCounter(id id.ID, value int64) error { - var ( - reportAt = r.nowFn() - fromNanos = reportAt.Add(-r.maxNegativeSkew).UnixNano() - toNanos = reportAt.Add(r.maxPositiveSkew).UnixNano() - multiErr = xerrors.NewMultiError() - ) - - r.incrementReportPending() - - var ( - counter = unaggregated.Counter{ID: id.Bytes(), Value: value} - matchResult = r.matcher.ForwardMatch(id, fromNanos, toNanos) - numNewIDs = matchResult.NumNewRollupIDs() - stagedMetadatas = matchResult.ForExistingIDAt(fromNanos) - hasDropPolicy = stagedMetadatas.IsDropPolicyApplied() - dropOriginal = numNewIDs > 0 && (!matchResult.KeepOriginal() || hasDropPolicy) - ) - - r.metrics.counterWritesCounter.Inc(1) - r.metrics.r2counterWritesCounter.Inc(int64(numNewIDs)) - - if !dropOriginal { - err := r.client.WriteUntimedCounter(counter, stagedMetadatas) - if err != nil { - multiErr = multiErr.Add(err) - } - } - - for idx := 0; idx < matchResult.NumNewRollupIDs(); idx++ { - var ( - rollupIDWithMetadatas = matchResult.ForNewRollupIDsAt(idx, fromNanos) - rollupID = rollupIDWithMetadatas.ID - metadatas = rollupIDWithMetadatas.Metadatas - ) - if isTombstoned(metadatas, fromNanos) { - continue - } - newRollupCounter := unaggregated.Counter{ID: rollupID, Value: value} - if err := r.client.WriteUntimedCounter(newRollupCounter, metadatas); err != nil { - multiErr = multiErr.Add(err) - } - } - - err := multiErr.FinalError() - r.metrics.reportCounter.ReportSuccessOrError(err, r.nowFn().Sub(reportAt)) - r.decrementReportPending() - return err -} - -func (r *reporter) ReportBatchTimer(id id.ID, value []float64) error { - var ( - reportAt = r.nowFn() - fromNanos = reportAt.Add(-r.maxNegativeSkew).UnixNano() - toNanos = reportAt.Add(r.maxPositiveSkew).UnixNano() - multiErr = xerrors.NewMultiError() - ) - - r.incrementReportPending() - - var ( - batchTimer = unaggregated.BatchTimer{ID: id.Bytes(), Values: value} - matchResult = r.matcher.ForwardMatch(id, fromNanos, toNanos) - numNewIDs = matchResult.NumNewRollupIDs() - stagedMetadatas = matchResult.ForExistingIDAt(fromNanos) - hasDropPolicy = stagedMetadatas.IsDropPolicyApplied() - dropOriginal = numNewIDs > 0 && (!matchResult.KeepOriginal() || hasDropPolicy) - ) - - r.metrics.timerWritesCounter.Inc(1) - r.metrics.r2timerWritesCounter.Inc(int64(numNewIDs)) - - if !dropOriginal { - err := r.client.WriteUntimedBatchTimer(batchTimer, stagedMetadatas) - if err != nil { - multiErr = multiErr.Add(err) - } - } - - for idx := 0; idx < matchResult.NumNewRollupIDs(); idx++ { - var ( - rollupIDWithMetadatas = matchResult.ForNewRollupIDsAt(idx, fromNanos) - rollupID = rollupIDWithMetadatas.ID - metadatas = rollupIDWithMetadatas.Metadatas - ) - if isTombstoned(metadatas, fromNanos) { - continue - } - newRollupTimer := unaggregated.BatchTimer{ID: rollupID, Values: value} - if err := r.client.WriteUntimedBatchTimer(newRollupTimer, metadatas); err != nil { - multiErr = multiErr.Add(err) - } - } - err := multiErr.FinalError() - r.metrics.reportBatchTimer.ReportSuccessOrError(err, r.nowFn().Sub(reportAt)) - r.decrementReportPending() - return err -} - -func (r *reporter) ReportGauge(id id.ID, value float64) error { - var ( - reportAt = r.nowFn() - fromNanos = reportAt.Add(-r.maxNegativeSkew).UnixNano() - toNanos = reportAt.Add(r.maxPositiveSkew).UnixNano() - multiErr = xerrors.NewMultiError() - ) - - r.incrementReportPending() - - var ( - gauge = unaggregated.Gauge{ID: id.Bytes(), Value: value} - matchResult = r.matcher.ForwardMatch(id, fromNanos, toNanos) - numNewIDs = matchResult.NumNewRollupIDs() - stagedMetadatas = matchResult.ForExistingIDAt(fromNanos) - hasDropPolicy = stagedMetadatas.IsDropPolicyApplied() - dropOriginal = numNewIDs > 0 && (!matchResult.KeepOriginal() || hasDropPolicy) - ) - - r.metrics.gaugeWritesCounter.Inc(1) - r.metrics.r2gaugeWritesCounter.Inc(int64(numNewIDs)) - - if !dropOriginal { - err := r.client.WriteUntimedGauge(gauge, stagedMetadatas) - if err != nil { - multiErr = multiErr.Add(err) - } - } - - for idx := 0; idx < matchResult.NumNewRollupIDs(); idx++ { - var ( - rollupIDWithMetadatas = matchResult.ForNewRollupIDsAt(idx, fromNanos) - rollupID = rollupIDWithMetadatas.ID - metadatas = rollupIDWithMetadatas.Metadatas - ) - if isTombstoned(metadatas, fromNanos) { - continue - } - newRollupGauge := unaggregated.Gauge{ID: rollupID, Value: value} - if err := r.client.WriteUntimedGauge(newRollupGauge, metadatas); err != nil { - multiErr = multiErr.Add(err) - } - } - err := multiErr.FinalError() - r.metrics.reportGauge.ReportSuccessOrError(err, r.nowFn().Sub(reportAt)) - r.decrementReportPending() - return err -} - -func (r *reporter) Flush() error { - callStart := r.nowFn() - err := r.client.Flush() - r.metrics.flush.ReportSuccessOrError(err, r.nowFn().Sub(callStart)) - return err -} - -func (r *reporter) Close() error { - if !atomic.CompareAndSwapInt32(&r.closed, 0, 1) { - return errReporterAlreadyClosed - } - close(r.doneCh) - multiErr := xerrors.NewMultiError() - if err := r.client.Close(); err != nil { - multiErr = multiErr.Add(err) - } - if err := r.matcher.Close(); err != nil { - multiErr = multiErr.Add(err) - } - r.wg.Wait() - return multiErr.FinalError() -} - -func (r *reporter) currentReportPending() int64 { return atomic.LoadInt64(&r.reportPending) } -func (r *reporter) incrementReportPending() { atomic.AddInt64(&r.reportPending, 1) } -func (r *reporter) decrementReportPending() { atomic.AddInt64(&r.reportPending, -1) } - -func (r *reporter) reportMetrics() { - defer r.wg.Done() - - ticker := time.NewTicker(r.reportInterval) - for { - select { - case <-ticker.C: - r.metrics.reportPending.Update(float64(r.currentReportPending())) - case <-r.doneCh: - ticker.Stop() - return - } - } -} - -// isTombstoned checks to see if the last metadata is currently active and indicates -// the metric ID has been tombstoned. This is a small optimization so that we don't -// send tombstoned rollup metrics to the m3aggregator only to be rejected there to -// save CPU cycles and network bandwidth. This optimization doesn't affect correctness. -func isTombstoned( - metadatas metadata.StagedMetadatas, - fromNanos int64, -) bool { - if len(metadatas) == 0 { - return false - } - lastMetadata := metadatas[len(metadatas)-1] - return lastMetadata.CutoverNanos <= fromNanos && lastMetadata.Tombstoned -} diff --git a/src/collector/reporter/m3aggregator/reporter_test.go b/src/collector/reporter/m3aggregator/reporter_test.go deleted file mode 100644 index d6feca1b71..0000000000 --- a/src/collector/reporter/m3aggregator/reporter_test.go +++ /dev/null @@ -1,941 +0,0 @@ -// Copyright (c) 2020 Uber Technologies, Inc. -// -// Permission is hereby granted, free of charge, to any person obtaining a copy -// of this software and associated documentation files (the "Software"), to deal -// in the Software without restriction, including without limitation the rights -// to use, copy, modify, merge, publish, distribute, sublicense, and/or sell -// copies of the Software, and to permit persons to whom the Software is -// furnished to do so, subject to the following conditions: -// -// The above copyright notice and this permission notice shall be included in -// all copies or substantial portions of the Software. -// -// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR -// IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, -// FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE -// AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER -// LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, -// OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN -// THE SOFTWARE. - -package m3aggregator - -import ( - "errors" - "fmt" - "math" - "os" - "strings" - "sync" - "testing" - "time" - - "github.com/m3db/m3/src/aggregator/client" - "github.com/m3db/m3/src/metrics/aggregation" - "github.com/m3db/m3/src/metrics/matcher" - "github.com/m3db/m3/src/metrics/metadata" - "github.com/m3db/m3/src/metrics/metric" - "github.com/m3db/m3/src/metrics/metric/id" - "github.com/m3db/m3/src/metrics/metric/unaggregated" - "github.com/m3db/m3/src/metrics/policy" - "github.com/m3db/m3/src/metrics/rules" - "github.com/m3db/m3/src/x/clock" - "github.com/m3db/m3/src/x/instrument" - xtime "github.com/m3db/m3/src/x/time" - - "github.com/fortytw2/leaktest" - "github.com/golang/mock/gomock" - "github.com/stretchr/testify/assert" - "github.com/stretchr/testify/require" - "github.com/uber-go/tally" -) - -var ( - testNow = time.Unix(1234, 0) - testNowFn = func() time.Time { return testNow } - testPositiveSkew = 10 * time.Second - testNegativeSkew = 10 * time.Second - testFromNanos = testNow.Add(-testNegativeSkew).UnixNano() - testToNanos = testNow.Add(testPositiveSkew).UnixNano() - testClockOpts = clock.NewOptions(). - SetNowFn(testNowFn). - SetMaxPositiveSkew(testPositiveSkew). - SetMaxNegativeSkew(testNegativeSkew) - testReporterOptions = NewReporterOptions(). - SetClockOptions(testClockOpts) - - testMatchForExistingID = metadata.StagedMetadatas{ - { - CutoverNanos: 0, - Tombstoned: false, - Metadata: metadata.Metadata{ - Pipelines: []metadata.PipelineMetadata{ - { - AggregationID: aggregation.DefaultID, - StoragePolicies: policy.StoragePolicies{ - policy.NewStoragePolicy(20*time.Second, xtime.Second, 6*time.Hour), - policy.NewStoragePolicy(10*time.Minute, xtime.Minute, 25*24*time.Hour), - }, - }, - { - AggregationID: aggregation.MustCompressTypes(aggregation.Max), - StoragePolicies: policy.StoragePolicies{ - policy.NewStoragePolicy(time.Minute, xtime.Minute, 2*24*time.Hour), - }, - }, - }, - }, - }, - { - CutoverNanos: math.MaxInt64, - Tombstoned: false, - Metadata: metadata.Metadata{ - Pipelines: []metadata.PipelineMetadata{ - { - AggregationID: aggregation.MustCompressTypes(aggregation.Max, aggregation.P9999), - StoragePolicies: policy.StoragePolicies{ - policy.NewStoragePolicy(time.Second, xtime.Second, time.Hour), - }, - }, - }, - }, - }, - } - - testMatchForNewRollupIDs = []rules.IDWithMetadatas{ - { - ID: []byte("foo"), - Metadatas: metadata.DefaultStagedMetadatas, - }, - { - ID: []byte("bar"), - Metadatas: metadata.StagedMetadatas{ - { - CutoverNanos: 100, - Tombstoned: false, - Metadata: metadata.Metadata{ - Pipelines: []metadata.PipelineMetadata{ - { - AggregationID: aggregation.DefaultID, - StoragePolicies: policy.StoragePolicies{ - policy.NewStoragePolicy(time.Minute, xtime.Minute, 2*24*time.Hour), - policy.NewStoragePolicy(10*time.Minute, xtime.Minute, 25*24*time.Hour), - }, - }, - { - AggregationID: aggregation.MustCompressTypes(aggregation.Max, aggregation.P9999), - StoragePolicies: policy.StoragePolicies{ - policy.NewStoragePolicy(20*time.Second, xtime.Second, 6*time.Hour), - }, - }, - }, - }, - }, - { - CutoverNanos: 200, - Tombstoned: true, - Metadata: metadata.Metadata{ - Pipelines: []metadata.PipelineMetadata{ - { - AggregationID: aggregation.MustCompressTypes(aggregation.P9999), - StoragePolicies: policy.StoragePolicies{ - policy.NewStoragePolicy(time.Second, xtime.Second, time.Hour), - }, - }, - }, - }, - }, - }, - }, - } - - testMatchForRollupWithKeepOriginalIDs = []rules.IDWithMetadatas{ - { - ID: []byte("bar"), - Metadatas: metadata.DefaultStagedMetadatas, - }, - } - - testMatchResult = rules.NewMatchResult( - 0, - math.MaxInt64, - testMatchForExistingID, - testMatchForNewRollupIDs, - true, - ) - - testMatchDropPolicyAppliedResult = rules.NewMatchResult( - 0, - math.MaxInt64, - metadata.StagedMetadatas{metadata.StagedMetadata{ - Metadata: metadata.DropMetadata, - CutoverNanos: testNow.UnixNano() / 2, - }}, - testMatchForNewRollupIDs, - true, - ) - - testMatchDropPolicyNotYetEffectiveResult = rules.NewMatchResult( - 0, - math.MaxInt64, - testMatchForExistingID, - testMatchForNewRollupIDs, - true, - ) - - testMatchKeepOriginalResult = rules.NewMatchResult( - 0, - 50, - testMatchForExistingID, - testMatchForRollupWithKeepOriginalIDs, - true, - ) - - testMatchNoKeepOriginalResult = rules.NewMatchResult( - 0, - 50, - testMatchForExistingID, - testMatchForRollupWithKeepOriginalIDs, - false, - ) -) - -func TestReporterReportCounter(t *testing.T) { - leakCheck := leaktest.Check(t) - defer leakCheck() - - ctrl := gomock.NewController(t) - defer ctrl.Finish() - - var ( - errReportCounter = errors.New("test report counter error") - actual []unaggregated.CounterWithMetadatas - ) - mockID := id.NewMockID(ctrl) - mockID.EXPECT().Bytes().Return([]byte("testCounter")) - mockMatcher := matcher.NewMockMatcher(ctrl) - mockMatcher.EXPECT().ForwardMatch(mockID, testFromNanos, testToNanos).Return(testMatchResult) - mockMatcher.EXPECT().Close().Return(nil).AnyTimes() - mockClient := client.NewMockClient(ctrl) - mockClient.EXPECT(). - WriteUntimedCounter(gomock.Any(), gomock.Any()). - DoAndReturn(func(counter unaggregated.Counter, metadatas metadata.StagedMetadatas) error { - actual = append(actual, unaggregated.CounterWithMetadatas{ - Counter: counter, - StagedMetadatas: metadatas, - }) - return errReportCounter - }).MinTimes(1) - mockClient.EXPECT().Close().Return(nil).AnyTimes() - reporter := NewReporter(mockMatcher, mockClient, testReporterOptions) - defer reporter.Close() - err := reporter.ReportCounter(mockID, 1234) - require.Error(t, err) - require.True(t, strings.Contains(err.Error(), errReportCounter.Error())) - - expected := []unaggregated.CounterWithMetadatas{ - { - Counter: unaggregated.Counter{ - ID: []byte("testCounter"), - Value: 1234, - }, - StagedMetadatas: metadata.StagedMetadatas{ - { - CutoverNanos: 0, - Tombstoned: false, - Metadata: metadata.Metadata{ - Pipelines: []metadata.PipelineMetadata{ - { - AggregationID: aggregation.DefaultID, - StoragePolicies: policy.StoragePolicies{ - policy.NewStoragePolicy(20*time.Second, xtime.Second, 6*time.Hour), - policy.NewStoragePolicy(10*time.Minute, xtime.Minute, 25*24*time.Hour), - }, - }, - { - AggregationID: aggregation.MustCompressTypes(aggregation.Max), - StoragePolicies: policy.StoragePolicies{ - policy.NewStoragePolicy(time.Minute, xtime.Minute, 2*24*time.Hour), - }, - }, - }, - }, - }, - { - CutoverNanos: math.MaxInt64, - Tombstoned: false, - Metadata: metadata.Metadata{ - Pipelines: []metadata.PipelineMetadata{ - { - AggregationID: aggregation.MustCompressTypes(aggregation.Max, aggregation.P9999), - StoragePolicies: policy.StoragePolicies{ - policy.NewStoragePolicy(time.Second, xtime.Second, time.Hour), - }, - }, - }, - }, - }, - }, - }, - { - Counter: unaggregated.Counter{ - ID: []byte("foo"), - Value: 1234, - }, - StagedMetadatas: metadata.DefaultStagedMetadatas, - }, - } - require.Equal(t, expected, actual) -} - -func TestReporterReportBatchTimer(t *testing.T) { - leakCheck := leaktest.Check(t) - defer leakCheck() - - ctrl := gomock.NewController(t) - defer ctrl.Finish() - - var ( - errReportBatchTimer = errors.New("test report batch timer error") - actual []unaggregated.BatchTimerWithMetadatas - ) - mockID := id.NewMockID(ctrl) - mockID.EXPECT().Bytes().Return([]byte("testBatchTimer")) - mockMatcher := matcher.NewMockMatcher(ctrl) - mockMatcher.EXPECT().ForwardMatch(mockID, testFromNanos, testToNanos).Return(testMatchResult) - mockMatcher.EXPECT().Close().Return(nil).AnyTimes() - mockClient := client.NewMockClient(ctrl) - mockClient.EXPECT(). - WriteUntimedBatchTimer(gomock.Any(), gomock.Any()). - DoAndReturn( - func( - batchTimer unaggregated.BatchTimer, - metadatas metadata.StagedMetadatas, - ) error { - actual = append(actual, unaggregated.BatchTimerWithMetadatas{ - BatchTimer: batchTimer, - StagedMetadatas: metadatas, - }) - return errReportBatchTimer - }). - MinTimes(1) - mockClient.EXPECT().Close().Return(nil).AnyTimes() - reporter := NewReporter(mockMatcher, mockClient, testReporterOptions) - defer reporter.Close() - err := reporter.ReportBatchTimer(mockID, []float64{1.3, 2.4}) - require.Error(t, err) - require.True(t, strings.Contains(err.Error(), errReportBatchTimer.Error())) - - expected := []unaggregated.BatchTimerWithMetadatas{ - { - BatchTimer: unaggregated.BatchTimer{ - ID: []byte("testBatchTimer"), - Values: []float64{1.3, 2.4}, - }, - StagedMetadatas: metadata.StagedMetadatas{ - { - CutoverNanos: 0, - Tombstoned: false, - Metadata: metadata.Metadata{ - Pipelines: []metadata.PipelineMetadata{ - { - AggregationID: aggregation.DefaultID, - StoragePolicies: policy.StoragePolicies{ - policy.NewStoragePolicy(20*time.Second, xtime.Second, 6*time.Hour), - policy.NewStoragePolicy(10*time.Minute, xtime.Minute, 25*24*time.Hour), - }, - }, - { - AggregationID: aggregation.MustCompressTypes(aggregation.Max), - StoragePolicies: policy.StoragePolicies{ - policy.NewStoragePolicy(time.Minute, xtime.Minute, 2*24*time.Hour), - }, - }, - }, - }, - }, - { - CutoverNanos: math.MaxInt64, - Tombstoned: false, - Metadata: metadata.Metadata{ - Pipelines: []metadata.PipelineMetadata{ - { - AggregationID: aggregation.MustCompressTypes(aggregation.Max, aggregation.P9999), - StoragePolicies: policy.StoragePolicies{ - policy.NewStoragePolicy(time.Second, xtime.Second, time.Hour), - }, - }, - }, - }, - }, - }, - }, - { - BatchTimer: unaggregated.BatchTimer{ - ID: []byte("foo"), - Values: []float64{1.3, 2.4}, - }, - StagedMetadatas: metadata.DefaultStagedMetadatas, - }, - } - require.Equal(t, expected, actual) -} - -func TestReporterReportGauge(t *testing.T) { - leakCheck := leaktest.Check(t) - defer leakCheck() - - ctrl := gomock.NewController(t) - defer ctrl.Finish() - - var ( - errReportCounter = errors.New("test report gauge error") - actual []unaggregated.GaugeWithMetadatas - ) - mockID := id.NewMockID(ctrl) - mockID.EXPECT().Bytes().Return([]byte("testCounter")) - mockMatcher := matcher.NewMockMatcher(ctrl) - mockMatcher.EXPECT().ForwardMatch(mockID, testFromNanos, testToNanos).Return(testMatchResult) - mockMatcher.EXPECT().Close().Return(nil).AnyTimes() - mockClient := client.NewMockClient(ctrl) - mockClient.EXPECT(). - WriteUntimedGauge(gomock.Any(), gomock.Any()). - DoAndReturn(func(gauge unaggregated.Gauge, metadatas metadata.StagedMetadatas) error { - actual = append(actual, unaggregated.GaugeWithMetadatas{ - Gauge: gauge, - StagedMetadatas: metadatas, - }) - return errReportCounter - }).MinTimes(1) - mockClient.EXPECT().Close().Return(nil).AnyTimes() - reporter := NewReporter(mockMatcher, mockClient, testReporterOptions) - defer reporter.Close() - err := reporter.ReportGauge(mockID, 1.8) - require.Error(t, err) - require.True(t, strings.Contains(err.Error(), errReportCounter.Error())) - - expected := []unaggregated.GaugeWithMetadatas{ - { - Gauge: unaggregated.Gauge{ - ID: []byte("testCounter"), - Value: 1.8, - }, - StagedMetadatas: metadata.StagedMetadatas{ - { - CutoverNanos: 0, - Tombstoned: false, - Metadata: metadata.Metadata{ - Pipelines: []metadata.PipelineMetadata{ - { - AggregationID: aggregation.DefaultID, - StoragePolicies: policy.StoragePolicies{ - policy.NewStoragePolicy(20*time.Second, xtime.Second, 6*time.Hour), - policy.NewStoragePolicy(10*time.Minute, xtime.Minute, 25*24*time.Hour), - }, - }, - { - AggregationID: aggregation.MustCompressTypes(aggregation.Max), - StoragePolicies: policy.StoragePolicies{ - policy.NewStoragePolicy(time.Minute, xtime.Minute, 2*24*time.Hour), - }, - }, - }, - }, - }, - { - CutoverNanos: math.MaxInt64, - Tombstoned: false, - Metadata: metadata.Metadata{ - Pipelines: []metadata.PipelineMetadata{ - { - AggregationID: aggregation.MustCompressTypes(aggregation.Max, aggregation.P9999), - StoragePolicies: policy.StoragePolicies{ - policy.NewStoragePolicy(time.Second, xtime.Second, time.Hour), - }, - }, - }, - }, - }, - }, - }, - { - Gauge: unaggregated.Gauge{ - ID: []byte("foo"), - Value: 1.8, - }, - StagedMetadatas: metadata.DefaultStagedMetadatas, - }, - } - require.Equal(t, expected, actual) -} - -func TestReporterFlush(t *testing.T) { - leakCheck := leaktest.Check(t) - defer leakCheck() - - ctrl := gomock.NewController(t) - defer ctrl.Finish() - - var numFlushes int - mockMatcher := matcher.NewMockMatcher(ctrl) - mockMatcher.EXPECT().Close().Return(nil).AnyTimes() - mockClient := client.NewMockClient(ctrl) - mockClient.EXPECT(). - Flush(). - DoAndReturn(func() error { - numFlushes++ - return nil - }) - mockClient.EXPECT().Close().Return(nil).AnyTimes() - reporter := NewReporter(mockMatcher, mockClient, NewReporterOptions()) - defer reporter.Close() - - require.NoError(t, reporter.Flush()) - require.Equal(t, 1, numFlushes) -} - -func TestReporterClose(t *testing.T) { - leakCheck := leaktest.Check(t) - defer leakCheck() - - ctrl := gomock.NewController(t) - defer ctrl.Finish() - - errClientClose := errors.New("test client close error") - mockMatcher := matcher.NewMockMatcher(ctrl) - mockMatcher.EXPECT().Close().Return(nil) - mockClient := client.NewMockClient(ctrl) - mockClient.EXPECT().Close().Return(errClientClose) - reporter := NewReporter(mockMatcher, mockClient, NewReporterOptions()) - err := reporter.Close() - require.Error(t, err) - require.True(t, strings.Contains(err.Error(), errClientClose.Error())) -} - -func TestReporterMultipleCloses(t *testing.T) { - defer leaktest.Check(t)() - - ctrl := gomock.NewController(t) - defer ctrl.Finish() - - numWorkers := 10 - mockMatcher := matcher.NewMockMatcher(ctrl) - mockMatcher.EXPECT().Close().Return(nil) - mockClient := client.NewMockClient(ctrl) - mockClient.EXPECT().Close().Return(nil) - r := NewReporter(mockMatcher, mockClient, NewReporterOptions()) - - var wg sync.WaitGroup - for i := 0; i < numWorkers; i++ { - wg.Add(1) - go func() { - defer wg.Done() - r.Close() - }() - } - wg.Wait() - require.Equal(t, int32(1), r.(*reporter).closed) -} - -func TestReporterReportPending(t *testing.T) { - leakCheck := leaktest.Check(t) - defer leakCheck() - - ctrl := gomock.NewController(t) - defer ctrl.Finish() - - mockMatcher := matcher.NewMockMatcher(ctrl) - mockMatcher.EXPECT().Close().Return(nil) - mockClient := client.NewMockClient(ctrl) - mockClient.EXPECT().Close().Return(nil) - - reportInterval := 50 * time.Millisecond - scope := tally.NewTestScope("", map[string]string{"component": "reporter"}) - instrumentOpts := instrument.NewOptions(). - SetMetricsScope(scope). - SetReportInterval(reportInterval) - opts := NewReporterOptions().SetInstrumentOptions(instrumentOpts) - r := NewReporter(mockMatcher, mockClient, opts).(*reporter) - defer r.Close() - require.Equal(t, int64(0), r.currentReportPending()) - - hostname, err := os.Hostname() - require.NoError(t, err) - expectedID := fmt.Sprintf("report-pending+component=reporter,host=%s", hostname) - - // Increment report pending and wait for the metric to be reported. - iter := 10 - for i := 0; i < iter; i++ { - r.incrementReportPending() - } - require.Equal(t, int64(iter), r.currentReportPending()) - time.Sleep(2 * reportInterval) - gauges := scope.Snapshot().Gauges() - require.Equal(t, 1, len(gauges)) - res, exists := gauges[expectedID] - require.True(t, exists) - require.Equal(t, float64(iter), res.Value()) - - // Decrement report pending and wait for the metric to be reported. - for i := 0; i < iter; i++ { - r.decrementReportPending() - } - require.Equal(t, int64(0), r.currentReportPending()) - time.Sleep(2 * reportInterval) - gauges = scope.Snapshot().Gauges() - require.Equal(t, 1, len(gauges)) - res, exists = gauges[expectedID] - require.True(t, exists) - require.Equal(t, 0.0, res.Value()) -} - -func TestReporterReportMetricsWithKeepOriginal(t *testing.T) { - cases := map[string]struct { - id string - value int - mtype metric.Type - result rules.MatchResult - expect map[string]int - }{ - "counter-keep-original": { - id: "foo", - value: 1234, - mtype: metric.CounterType, - result: testMatchKeepOriginalResult, - expect: map[string]int{ - "foo": 1234, - "bar": 1234, - }, - }, - "counter-no-keep-original": { - id: "foo", - value: 1234, - mtype: metric.CounterType, - result: testMatchNoKeepOriginalResult, - expect: map[string]int{ - "bar": 1234, - }, - }, - "gauge-keep-original": { - id: "foo", - value: 1234, - mtype: metric.GaugeType, - result: testMatchKeepOriginalResult, - expect: map[string]int{ - "foo": 1234, - "bar": 1234, - }, - }, - "gauge-no-keep-original": { - id: "foo", - value: 1234, - mtype: metric.GaugeType, - result: testMatchNoKeepOriginalResult, - expect: map[string]int{ - "bar": 1234, - }, - }, - "timer-keep-original": { - id: "foo", - value: 1234, - mtype: metric.TimerType, - result: testMatchKeepOriginalResult, - expect: map[string]int{ - "foo": 1234, - "bar": 1234, - }, - }, - "timer-no-keep-original": { - id: "foo", - value: 1234, - mtype: metric.TimerType, - result: testMatchNoKeepOriginalResult, - expect: map[string]int{ - "bar": 1234, - }, - }, - } - - for name, tt := range cases { - t.Run(name, func(t *testing.T) { - ctrl := gomock.NewController(t) - defer ctrl.Finish() - - mockID := id.NewMockID(ctrl) - mockID.EXPECT().Bytes().Return([]byte(tt.id)).MinTimes(1) - - mockMatcher := matcher.NewMockMatcher(ctrl) - mockMatcher.EXPECT().Close().Return(nil).AnyTimes() - mockMatcher.EXPECT(). - ForwardMatch(mockID, testFromNanos, testToNanos). - Return(tt.result) - - actual := make(map[string]int) - mockClient := client.NewMockClient(ctrl) - mockClient.EXPECT().Close().Return(nil).AnyTimes() - - switch tt.mtype { - case metric.CounterType: - mockClient.EXPECT(). - WriteUntimedCounter(gomock.Any(), gomock.Any()). - DoAndReturn(func( - metric unaggregated.Counter, - metadatas metadata.StagedMetadatas, - ) error { - actual[string(metric.ID)] = int(metric.Value) - return nil - }).Times(len(tt.expect)) - case metric.GaugeType: - mockClient.EXPECT(). - WriteUntimedGauge(gomock.Any(), gomock.Any()). - DoAndReturn(func( - metric unaggregated.Gauge, - metadatas metadata.StagedMetadatas, - ) error { - actual[string(metric.ID)] = int(metric.Value) - return nil - }).Times(len(tt.expect)) - case metric.TimerType: - mockClient.EXPECT(). - WriteUntimedBatchTimer(gomock.Any(), gomock.Any()). - DoAndReturn(func( - metric unaggregated.BatchTimer, - metadatas metadata.StagedMetadatas, - ) error { - require.Equal(t, 1, len(metric.Values)) - actual[string(metric.ID)] = int(metric.Values[0]) - return nil - }).Times(len(tt.expect)) - default: - require.Fail(t, "unexpected metric type") - } - - reporter := NewReporter(mockMatcher, mockClient, testReporterOptions) - defer reporter.Close() - - var err error - switch tt.mtype { - case metric.CounterType: - err = reporter.ReportCounter(mockID, 1234) - case metric.GaugeType: - err = reporter.ReportGauge(mockID, 1234.0) - case metric.TimerType: - err = reporter.ReportBatchTimer(mockID, []float64{1234.0}) - default: - require.Fail(t, "unexpected metric type") - } - require.NoError(t, err) - - require.Equal(t, tt.expect, actual) - require.Equal(t, len(tt.expect), len(actual)) - }) - } -} - -func TestReporterReportCounterWithDropPolicyApplied(t *testing.T) { - leakCheck := leaktest.Check(t) - defer leakCheck() - - ctrl := gomock.NewController(t) - defer ctrl.Finish() - - var actual []unaggregated.CounterWithMetadatas - mockID := id.NewMockID(ctrl) - mockID.EXPECT().Bytes().Return([]byte("testCounter")) - mockMatcher := matcher.NewMockMatcher(ctrl) - mockMatcher.EXPECT(). - ForwardMatch(mockID, testFromNanos, testToNanos). - Return(testMatchDropPolicyAppliedResult) - mockMatcher.EXPECT().Close().Return(nil).AnyTimes() - mockClient := client.NewMockClient(ctrl) - mockClient.EXPECT(). - WriteUntimedCounter(gomock.Any(), gomock.Any()). - DoAndReturn(func(counter unaggregated.Counter, metadatas metadata.StagedMetadatas) error { - actual = append(actual, unaggregated.CounterWithMetadatas{ - Counter: counter, - StagedMetadatas: metadatas, - }) - return nil - }).MinTimes(1) - mockClient.EXPECT().Close().Return(nil).AnyTimes() - reporter := NewReporter(mockMatcher, mockClient, testReporterOptions) - defer reporter.Close() - err := reporter.ReportCounter(mockID, 1234) - require.NoError(t, err) - - // Ensure just the single non-tombstoned rollup ID is emitted and not the raw ID - require.Equal(t, 1, len(actual)) - - metric := actual[0] - assert.Equal(t, "foo", string(metric.ID)) - assert.Equal(t, 1234, int(metric.Value)) - assert.True(t, metric.StagedMetadatas.IsDefault()) -} - -func TestReporterReportGaugeWithDropPolicyApplied(t *testing.T) { - leakCheck := leaktest.Check(t) - defer leakCheck() - - ctrl := gomock.NewController(t) - defer ctrl.Finish() - - var actual []unaggregated.GaugeWithMetadatas - mockID := id.NewMockID(ctrl) - mockID.EXPECT().Bytes().Return([]byte("testGauge")) - mockMatcher := matcher.NewMockMatcher(ctrl) - mockMatcher.EXPECT(). - ForwardMatch(mockID, testFromNanos, testToNanos). - Return(testMatchDropPolicyAppliedResult) - mockMatcher.EXPECT().Close().Return(nil).AnyTimes() - mockClient := client.NewMockClient(ctrl) - mockClient.EXPECT(). - WriteUntimedGauge(gomock.Any(), gomock.Any()). - DoAndReturn(func(gauge unaggregated.Gauge, metadatas metadata.StagedMetadatas) error { - actual = append(actual, unaggregated.GaugeWithMetadatas{ - Gauge: gauge, - StagedMetadatas: metadatas, - }) - return nil - }).MinTimes(1) - mockClient.EXPECT().Close().Return(nil).AnyTimes() - reporter := NewReporter(mockMatcher, mockClient, testReporterOptions) - defer reporter.Close() - err := reporter.ReportGauge(mockID, 1234.5678) - require.NoError(t, err) - - // Ensure just the single non-tombstoned rollup ID is emitted and not the raw ID - require.Equal(t, 1, len(actual)) - - metric := actual[0] - assert.Equal(t, "foo", string(metric.ID)) - assert.Equal(t, 1234.5678, metric.Value) - assert.True(t, metric.StagedMetadatas.IsDefault()) -} - -func TestReporterReportBatchTimerWithDropPolicyApplied(t *testing.T) { - leakCheck := leaktest.Check(t) - defer leakCheck() - - ctrl := gomock.NewController(t) - defer ctrl.Finish() - - var actual []unaggregated.BatchTimerWithMetadatas - mockID := id.NewMockID(ctrl) - mockID.EXPECT().Bytes().Return([]byte("testTimer")) - mockMatcher := matcher.NewMockMatcher(ctrl) - mockMatcher.EXPECT(). - ForwardMatch(mockID, testFromNanos, testToNanos). - Return(testMatchDropPolicyAppliedResult) - mockMatcher.EXPECT().Close().Return(nil).AnyTimes() - mockClient := client.NewMockClient(ctrl) - mockClient.EXPECT(). - WriteUntimedBatchTimer(gomock.Any(), gomock.Any()). - DoAndReturn(func(batchTimer unaggregated.BatchTimer, metadatas metadata.StagedMetadatas) error { - actual = append(actual, unaggregated.BatchTimerWithMetadatas{ - BatchTimer: batchTimer, - StagedMetadatas: metadatas, - }) - return nil - }).MinTimes(1) - mockClient.EXPECT().Close().Return(nil).AnyTimes() - reporter := NewReporter(mockMatcher, mockClient, testReporterOptions) - defer reporter.Close() - err := reporter.ReportBatchTimer(mockID, []float64{12.34, 56.78}) - require.NoError(t, err) - - // Ensure just the single non-tombstoned rollup ID is emitted and not the raw ID - require.Equal(t, 1, len(actual)) - - metric := actual[0] - assert.Equal(t, "foo", string(metric.ID)) - assert.Equal(t, []float64{12.34, 56.78}, metric.Values) - assert.True(t, metric.StagedMetadatas.IsDefault()) -} - -func TestReporterReportCounterWithDropPolicyNotEffective(t *testing.T) { - leakCheck := leaktest.Check(t) - defer leakCheck() - - ctrl := gomock.NewController(t) - defer ctrl.Finish() - - var actual []unaggregated.CounterWithMetadatas - mockID := id.NewMockID(ctrl) - mockID.EXPECT().Bytes().Return([]byte("testCounter")) - mockMatcher := matcher.NewMockMatcher(ctrl) - mockMatcher.EXPECT(). - ForwardMatch(mockID, testFromNanos, testToNanos). - Return(testMatchDropPolicyNotYetEffectiveResult) - mockMatcher.EXPECT().Close().Return(nil).AnyTimes() - mockClient := client.NewMockClient(ctrl) - mockClient.EXPECT(). - WriteUntimedCounter(gomock.Any(), gomock.Any()). - DoAndReturn(func(counter unaggregated.Counter, metadatas metadata.StagedMetadatas) error { - actual = append(actual, unaggregated.CounterWithMetadatas{ - Counter: counter, - StagedMetadatas: metadatas, - }) - return nil - }).MinTimes(1) - mockClient.EXPECT().Close().Return(nil).AnyTimes() - reporter := NewReporter(mockMatcher, mockClient, testReporterOptions) - defer reporter.Close() - err := reporter.ReportCounter(mockID, 1234) - require.NoError(t, err) - - // Ensure just the default and staged policies are sent, stripping the staged - // metadatas with the drop policy - expected := []unaggregated.CounterWithMetadatas{ - { - Counter: unaggregated.Counter{ - ID: []byte("testCounter"), - Value: 1234, - }, - StagedMetadatas: metadata.StagedMetadatas{ - { - CutoverNanos: 0, - Tombstoned: false, - Metadata: metadata.Metadata{ - Pipelines: []metadata.PipelineMetadata{ - { - AggregationID: aggregation.DefaultID, - StoragePolicies: policy.StoragePolicies{ - policy.NewStoragePolicy(20*time.Second, xtime.Second, 6*time.Hour), - policy.NewStoragePolicy(10*time.Minute, xtime.Minute, 25*24*time.Hour), - }, - }, - { - AggregationID: aggregation.MustCompressTypes(aggregation.Max), - StoragePolicies: policy.StoragePolicies{ - policy.NewStoragePolicy(time.Minute, xtime.Minute, 2*24*time.Hour), - }, - }, - }, - }, - }, - { - CutoverNanos: math.MaxInt64, - Tombstoned: false, - Metadata: metadata.Metadata{ - Pipelines: []metadata.PipelineMetadata{ - { - AggregationID: aggregation.MustCompressTypes(aggregation.Max, aggregation.P9999), - StoragePolicies: policy.StoragePolicies{ - policy.NewStoragePolicy(time.Second, xtime.Second, time.Hour), - }, - }, - }, - }, - }, - }, - }, - { - Counter: unaggregated.Counter{ - ID: []byte("foo"), - Value: 1234, - }, - StagedMetadatas: metadata.DefaultStagedMetadatas, - }, - } - require.Equal(t, expected, actual) -} diff --git a/src/collector/reporter/reporter.go b/src/collector/reporter/reporter.go deleted file mode 100644 index 916478e38f..0000000000 --- a/src/collector/reporter/reporter.go +++ /dev/null @@ -1,43 +0,0 @@ -// Copyright (c) 2017 Uber Technologies, Inc. -// -// Permission is hereby granted, free of charge, to any person obtaining a copy -// of this software and associated documentation files (the "Software"), to deal -// in the Software without restriction, including without limitation the rights -// to use, copy, modify, merge, publish, distribute, sublicense, and/or sell -// copies of the Software, and to permit persons to whom the Software is -// furnished to do so, subject to the following conditions: -// -// The above copyright notice and this permission notice shall be included in -// all copies or substantial portions of the Software. -// -// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR -// IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, -// FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE -// AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER -// LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, -// OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN -// THE SOFTWARE. - -package reporter - -import ( - "github.com/m3db/m3/src/metrics/metric/id" -) - -// Reporter reports aggregated metrics. -type Reporter interface { - // ReportCounter reports a counter metric. - ReportCounter(id id.ID, value int64) error - - // ReportBatchTimer reports a batched timer metric. - ReportBatchTimer(id id.ID, value []float64) error - - // ReportGauge reports a gauge metric. - ReportGauge(id id.ID, value float64) error - - // Flush flushes any buffered metrics. - Flush() error - - // Close closes the reporter. - Close() error -} diff --git a/src/collector/reporter/reporter_mock.go b/src/collector/reporter/reporter_mock.go deleted file mode 100644 index 4a98810787..0000000000 --- a/src/collector/reporter/reporter_mock.go +++ /dev/null @@ -1,126 +0,0 @@ -// Code generated by MockGen. DO NOT EDIT. -// Source: github.com/m3db/m3/src/collector/reporter (interfaces: Reporter) - -// Copyright (c) 2021 Uber Technologies, Inc. -// -// Permission is hereby granted, free of charge, to any person obtaining a copy -// of this software and associated documentation files (the "Software"), to deal -// in the Software without restriction, including without limitation the rights -// to use, copy, modify, merge, publish, distribute, sublicense, and/or sell -// copies of the Software, and to permit persons to whom the Software is -// furnished to do so, subject to the following conditions: -// -// The above copyright notice and this permission notice shall be included in -// all copies or substantial portions of the Software. -// -// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR -// IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, -// FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE -// AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER -// LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, -// OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN -// THE SOFTWARE. - -// Package reporter is a generated GoMock package. -package reporter - -import ( - "reflect" - - "github.com/m3db/m3/src/metrics/metric/id" - - "github.com/golang/mock/gomock" -) - -// MockReporter is a mock of Reporter interface. -type MockReporter struct { - ctrl *gomock.Controller - recorder *MockReporterMockRecorder -} - -// MockReporterMockRecorder is the mock recorder for MockReporter. -type MockReporterMockRecorder struct { - mock *MockReporter -} - -// NewMockReporter creates a new mock instance. -func NewMockReporter(ctrl *gomock.Controller) *MockReporter { - mock := &MockReporter{ctrl: ctrl} - mock.recorder = &MockReporterMockRecorder{mock} - return mock -} - -// EXPECT returns an object that allows the caller to indicate expected use. -func (m *MockReporter) EXPECT() *MockReporterMockRecorder { - return m.recorder -} - -// Close mocks base method. -func (m *MockReporter) Close() error { - m.ctrl.T.Helper() - ret := m.ctrl.Call(m, "Close") - ret0, _ := ret[0].(error) - return ret0 -} - -// Close indicates an expected call of Close. -func (mr *MockReporterMockRecorder) Close() *gomock.Call { - mr.mock.ctrl.T.Helper() - return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "Close", reflect.TypeOf((*MockReporter)(nil).Close)) -} - -// Flush mocks base method. -func (m *MockReporter) Flush() error { - m.ctrl.T.Helper() - ret := m.ctrl.Call(m, "Flush") - ret0, _ := ret[0].(error) - return ret0 -} - -// Flush indicates an expected call of Flush. -func (mr *MockReporterMockRecorder) Flush() *gomock.Call { - mr.mock.ctrl.T.Helper() - return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "Flush", reflect.TypeOf((*MockReporter)(nil).Flush)) -} - -// ReportBatchTimer mocks base method. -func (m *MockReporter) ReportBatchTimer(arg0 id.ID, arg1 []float64) error { - m.ctrl.T.Helper() - ret := m.ctrl.Call(m, "ReportBatchTimer", arg0, arg1) - ret0, _ := ret[0].(error) - return ret0 -} - -// ReportBatchTimer indicates an expected call of ReportBatchTimer. -func (mr *MockReporterMockRecorder) ReportBatchTimer(arg0, arg1 interface{}) *gomock.Call { - mr.mock.ctrl.T.Helper() - return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "ReportBatchTimer", reflect.TypeOf((*MockReporter)(nil).ReportBatchTimer), arg0, arg1) -} - -// ReportCounter mocks base method. -func (m *MockReporter) ReportCounter(arg0 id.ID, arg1 int64) error { - m.ctrl.T.Helper() - ret := m.ctrl.Call(m, "ReportCounter", arg0, arg1) - ret0, _ := ret[0].(error) - return ret0 -} - -// ReportCounter indicates an expected call of ReportCounter. -func (mr *MockReporterMockRecorder) ReportCounter(arg0, arg1 interface{}) *gomock.Call { - mr.mock.ctrl.T.Helper() - return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "ReportCounter", reflect.TypeOf((*MockReporter)(nil).ReportCounter), arg0, arg1) -} - -// ReportGauge mocks base method. -func (m *MockReporter) ReportGauge(arg0 id.ID, arg1 float64) error { - m.ctrl.T.Helper() - ret := m.ctrl.Call(m, "ReportGauge", arg0, arg1) - ret0, _ := ret[0].(error) - return ret0 -} - -// ReportGauge indicates an expected call of ReportGauge. -func (mr *MockReporterMockRecorder) ReportGauge(arg0, arg1 interface{}) *gomock.Call { - mr.mock.ctrl.T.Helper() - return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "ReportGauge", reflect.TypeOf((*MockReporter)(nil).ReportGauge), arg0, arg1) -} diff --git a/src/collector/server/server.go b/src/collector/server/server.go deleted file mode 100644 index c8b1d53189..0000000000 --- a/src/collector/server/server.go +++ /dev/null @@ -1,216 +0,0 @@ -// Copyright (c) 2018 Uber Technologies, Inc. -// -// Permission is hereby granted, free of charge, to any person obtaining a copy -// of this software and associated documentation files (the "Software"), to deal -// in the Software without restriction, including without limitation the rights -// to use, copy, modify, merge, publish, distribute, sublicense, and/or sell -// copies of the Software, and to permit persons to whom the Software is -// furnished to do so, subject to the following conditions: -// -// The above copyright notice and this permission notice shall be included in -// all copies or substantial portions of the Software. -// -// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR -// IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, -// FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE -// AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER -// LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, -// OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN -// THE SOFTWARE. - -package server - -import ( - "context" - "fmt" - "net/http" - "os" - "os/signal" - "syscall" - - "github.com/m3db/m3/src/aggregator/server" - clusterclient "github.com/m3db/m3/src/cluster/client" - "github.com/m3db/m3/src/cmd/services/m3aggregator/serve" - "github.com/m3db/m3/src/cmd/services/m3collector/config" - "github.com/m3db/m3/src/collector/api/v1/httpd" - "github.com/m3db/m3/src/collector/reporter" - "github.com/m3db/m3/src/collector/reporter/m3aggregator" - xconfig "github.com/m3db/m3/src/x/config" - "github.com/m3db/m3/src/x/instrument" - xio "github.com/m3db/m3/src/x/io" - "github.com/m3db/m3/src/x/pool" - "github.com/m3db/m3/src/x/serialize" - - "go.uber.org/zap" -) - -// RunOptions provides options for running the server -// with backwards compatibility if only solely adding fields. -type RunOptions struct { - // Config will be used to configure the application. - Config config.Configuration - - // InterruptCh is a programmatic interrupt channel to supply to - // interrupt and shutdown the server. - InterruptCh <-chan error - - // AggregatorServerOptions are server options for aggregator. - AggregatorServerOptions []server.AdminOption -} - -// Run runs the server programmatically given a filename for the configuration file. -func Run(runOpts RunOptions) { - cfg := runOpts.Config - - ctx := context.Background() - logger, err := cfg.Logging.Build() - if err != nil { - // NB(r): Use fmt.Fprintf(os.Stderr, ...) to avoid etcd.SetGlobals() - // sending stdlib "log" to black hole. Don't remove unless with good reason. - fmt.Fprintf(os.Stderr, "unable to create logger: %v", err) - os.Exit(1) - } - defer logger.Sync() - - xconfig.WarnOnDeprecation(cfg, logger) - - logger.Info("creating metrics scope") - scope, closer, err := cfg.Metrics.NewRootScope() - if err != nil { - logger.Fatal("could not connect to metrics", zap.Error(err)) - } - defer closer.Close() - - instrumentOpts := instrument.NewOptions(). - SetMetricsScope(scope). - SetLogger(logger) - - logger.Info("creating etcd client") - clusterClient, err := cfg.Etcd.NewClient(instrumentOpts) - if err != nil { - logger.Fatal("could not create etcd client", zap.Error(err)) - } - - serveOptions := serve.NewOptions(instrumentOpts) - for i, transform := range runOpts.AggregatorServerOptions { - if opts, err := transform(serveOptions); err != nil { - logger.Fatal("could not apply transform", - zap.Int("index", i), zap.Error(err)) - } else { - serveOptions = opts - } - } - - rwOpts := serveOptions.RWOptions() - logger.Info("creating reporter") - reporter, err := newReporter(cfg.Reporter, clusterClient, instrumentOpts, rwOpts) - if err != nil { - logger.Fatal("could not create reporter", zap.Error(err)) - } - - tagEncoderOptions := serialize.NewTagEncoderOptions() - tagDecoderOptions := serialize.NewTagDecoderOptions(serialize.TagDecoderOptionsConfig{}) - tagEncoderPoolOptions := pool.NewObjectPoolOptions(). - SetInstrumentOptions(instrumentOpts. - SetMetricsScope(instrumentOpts.MetricsScope(). - SubScope("tag-encoder-pool"))) - tagDecoderPoolOptions := pool.NewObjectPoolOptions(). - SetInstrumentOptions(instrumentOpts. - SetMetricsScope(instrumentOpts.MetricsScope(). - SubScope("tag-decoder-pool"))) - tagEncoderPool := serialize.NewTagEncoderPool(tagEncoderOptions, - tagEncoderPoolOptions) - tagEncoderPool.Init() - tagDecoderPool := serialize.NewTagDecoderPool(tagDecoderOptions, - tagDecoderPoolOptions) - tagDecoderPool.Init() - - logger.Info("creating http handlers and registering routes") - handler, err := httpd.NewHandler(reporter, tagEncoderPool, - tagDecoderPool, instrumentOpts) - if err != nil { - logger.Fatal("unable to set up handlers", zap.Error(err)) - } - - if err := handler.RegisterRoutes(); err != nil { - logger.Fatal("unable to register routes", zap.Error(err)) - } - - srv := &http.Server{Addr: cfg.ListenAddress, Handler: handler.Router()} - defer func() { - logger.Info("closing server") - if err := srv.Shutdown(ctx); err != nil { - logger.Error("error closing server", zap.Error(err)) - } - }() - - go func() { - logger.Info("starting server", zap.String("address", cfg.ListenAddress)) - if err := srv.ListenAndServe(); err != nil && err != http.ErrServerClosed { - logger.Fatal("server error while listening", - zap.String("address", cfg.ListenAddress), zap.Error(err)) - } - }() - - var interruptCh <-chan error = make(chan error) - if runOpts.InterruptCh != nil { - interruptCh = runOpts.InterruptCh - } - - var interruptErr error - if runOpts.InterruptCh != nil { - interruptErr = <-interruptCh - } else { - // Only use this if running standalone, as otherwise it will - // obfuscate signal channel for the db - sigChan := make(chan os.Signal, 1) - signal.Notify(sigChan, os.Interrupt, syscall.SIGTERM) - select { - case sig := <-sigChan: - interruptErr = fmt.Errorf("%v", sig) - case interruptErr = <-interruptCh: - } - } - - logger.Info("interrupt", zap.String("cause", interruptErr.Error())) -} - -func newReporter( - cfg config.ReporterConfiguration, - clusterClient clusterclient.Client, - instrumentOpts instrument.Options, - rwOpts xio.Options, -) (reporter.Reporter, error) { - scope := instrumentOpts.MetricsScope() - logger := instrumentOpts.Logger() - clockOpts := cfg.Clock.NewOptions() - - logger.Info("creating metrics matcher cache") - cache := cfg.Cache.NewCache(clockOpts, - instrumentOpts.SetMetricsScope(scope.SubScope("cache"))) - - logger.Info("creating metrics matcher") - matcher, err := cfg.Matcher.NewMatcher(cache, clusterClient, clockOpts, - instrumentOpts.SetMetricsScope(scope.SubScope("matcher"))) - if err != nil { - return nil, fmt.Errorf("unable to create matcher: %v", err) - } - - logger.Info("creating aggregator client") - aggClient, err := cfg.Client.NewClient(clusterClient, clockOpts, - instrumentOpts.SetMetricsScope(scope.SubScope("backend")), rwOpts) - if err != nil { - return nil, fmt.Errorf("unable to create agg tier client: %v", err) - } - - logger.Info("connecting to aggregator cluster") - if err := aggClient.Init(); err != nil { - return nil, fmt.Errorf("unable to initialize agg tier client: %v", err) - } - - logger.Info("creating aggregator reporter") - reporterOpts := m3aggregator.NewReporterOptions(). - SetClockOptions(clockOpts). - SetInstrumentOptions(instrumentOpts) - return m3aggregator.NewReporter(matcher, aggClient, reporterOpts), nil -}