From b5c4d76f23b9731dec8041792330785712aa285e Mon Sep 17 00:00:00 2001 From: Saravjeet 'Aman' Singh Date: Thu, 18 Jul 2024 21:56:16 +0530 Subject: [PATCH] ci: Add linting and code coverage (#89) * ci: add config for golangci-lint * chore: lint the codebase * ci: add lint job * ci: integrate coveralls * ci: fix missing checkout step in test job * docs: add coverage badge --- .github/workflows/build.yaml | 35 ----------- .../integration-test-setup/action.yaml | 13 ---- .github/workflows/integration-test.yaml | 42 ------------- .github/workflows/lint.yaml | 20 ++++++ .github/workflows/test.yaml | 62 +++++++++++++++++++ .golangci.yml | 30 +++++++++ README.md | 1 + app/server.go | 3 +- config/load_test.go | 1 - config/publisher.go | 2 - config/server.go | 1 - config/util/key.go | 1 - deserialization/proto.go | 1 - integration/integration_test.go | 32 +++++----- metrics/metric_test.go | 1 - metrics/metrics.go | 1 - metrics/prometheus.go | 3 +- metrics/prometheus_test.go | 5 +- metrics/statsd.go | 3 +- middleware/cors.go | 1 - publisher/kafka/kafka_test.go | 3 +- publisher/kinesis/kinesis.go | 5 +- publisher/kinesis/kinesis_test.go | 3 - publisher/pubsub/pubsub.go | 4 -- publisher/pubsub/pubsub_test.go | 4 -- services/grpc/handler.go | 1 - services/rest/handler.go | 5 +- services/rest/response_test.go | 1 - services/rest/service.go | 4 +- .../websocket/connection/upgrader_test.go | 5 +- services/rest/websocket/handler_test.go | 6 +- worker/worker.go | 2 - 32 files changed, 147 insertions(+), 154 deletions(-) delete mode 100644 .github/workflows/build.yaml delete mode 100644 .github/workflows/integration-test-setup/action.yaml delete mode 100644 .github/workflows/integration-test.yaml create mode 100644 .github/workflows/lint.yaml create mode 100644 .github/workflows/test.yaml create mode 100644 .golangci.yml diff --git a/.github/workflows/build.yaml b/.github/workflows/build.yaml deleted file mode 100644 index 4f9db6f0..00000000 --- a/.github/workflows/build.yaml +++ /dev/null @@ -1,35 +0,0 @@ -name: Test - -on: - - push - - pull_request - -jobs: - test: - runs-on: ubuntu-latest - steps: - - name: Setup Go - uses: actions/setup-go@v3 - with: - go-version: "1.22.4" - - name: Checkout repo - uses: actions/checkout@v2 - with: - fetch-depth: 0 - - name: Invoking go test - run: make test - - name: Invoking go bench test - run: make test-bench - - build: - runs-on: ubuntu-latest - steps: - - name: Setup Go - uses: actions/setup-go@v3 - with: - go-version: "1.22.4" - - name: Install Protoc - uses: arduino/setup-protoc@v1 - - uses: actions/checkout@v2 - - name: Build - run: make build diff --git a/.github/workflows/integration-test-setup/action.yaml b/.github/workflows/integration-test-setup/action.yaml deleted file mode 100644 index e0a071c8..00000000 --- a/.github/workflows/integration-test-setup/action.yaml +++ /dev/null @@ -1,13 +0,0 @@ -name: Setup environment for Integration tests - -runs: - using: "composite" - steps: - - name: Setup Docker - uses: docker-practice/actions-setup-docker@master - - name: Install Protoc - uses: arduino/setup-protoc@v1 - - name: Setup Go - uses: actions/setup-go@v3 - with: - go-version: "1.22.4" \ No newline at end of file diff --git a/.github/workflows/integration-test.yaml b/.github/workflows/integration-test.yaml deleted file mode 100644 index dd930198..00000000 --- a/.github/workflows/integration-test.yaml +++ /dev/null @@ -1,42 +0,0 @@ -name: Raccoon Integration Test - -on: - - push - - pull_request - -jobs: - kafka-test: - runs-on: ubuntu-latest - steps: - - name: Checkout repo - uses: actions/checkout@v2 - - name: setup integration environment - uses: ./.github/workflows/integration-test-setup - - name: Copy integration config - run: cp .env.test .env - - name: Run Raccoon - run: make docker-run - - name: Invoking go test - run: INTEGTEST_BOOTSTRAP_SERVER=localhost:9094 INTEGTEST_HOST=localhost:8080 INTEGTEST_TOPIC_FORMAT="clickstream-%s-log" GRPC_SERVER_ADDR="localhost:8081" go test ./integration -v - pubsub-test: - runs-on: ubuntu-latest - steps: - - name: Checkout repo - uses: actions/checkout@v2 - - name: setup integration environment - uses: ./.github/workflows/integration-test-setup - - name: start pubsub emulator - run: docker compose up pubsub-emulator -d - - name: run integration tests - run: PUBSUB_EMULATOR_HOST=localhost:8085 go test ./publisher/pubsub -v - kinesis-test: - runs-on: ubuntu-latest - steps: - - name: Checkout repo - uses: actions/checkout@v2 - - name: setup integration environment - uses: ./.github/workflows/integration-test-setup - - name: start localstack (kinesis emulator) - run: docker compose up localstack -d - - name: run integration tests - run: LOCALSTACK_HOST=http://localhost:4566 go test ./publisher/kinesis/ -v \ No newline at end of file diff --git a/.github/workflows/lint.yaml b/.github/workflows/lint.yaml new file mode 100644 index 00000000..2af6fea4 --- /dev/null +++ b/.github/workflows/lint.yaml @@ -0,0 +1,20 @@ +name: Lint + +on: + - push + - pull_request + +jobs: + lint: + runs-on: ubuntu-latest + steps: + - name: checkout repo + uses: actions/checkout@v2 + - name: setup go + uses: actions/setup-go@v3 + with: + go-version: "1.22.4" + - name: golangci-lint + uses: golangci/golangci-lint-action@v3 + with: + version: v1.59.1 diff --git a/.github/workflows/test.yaml b/.github/workflows/test.yaml new file mode 100644 index 00000000..2418c888 --- /dev/null +++ b/.github/workflows/test.yaml @@ -0,0 +1,62 @@ +name: Test + +on: + - push + - pull_request + +jobs: + test: + runs-on: ubuntu-latest + steps: + - name: Setup Docker + uses: docker-practice/actions-setup-docker@master + - name: Install Protoc + uses: arduino/setup-protoc@v1 + - name: Setup Go + uses: actions/setup-go@v3 + with: + go-version: "1.22.4" + - name: Checkout code + uses: actions/checkout@v2 + - name: Initialise test config + run: cp .env.test .env + - name: Start raccoon + run: make docker-run + - name: Run tests + run: go test ./... -v -coverprofile=coverage.out + env: + INTEGTEST_BOOTSTRAP_SERVER: 'localhost:9094' + INTEGTEST_HOST: 'localhost:8080' + INTEGTEST_TOPIC_FORMAT: 'clickstream-%s-log' + GRPC_SERVER_ADDR: 'localhost:8081' + PUBSUB_EMULATOR_HOST: 'localhost:8085' + LOCALSTACK_HOST: 'http://localhost:4566' + - name: Upload coverage data + uses: shogo82148/actions-goveralls@v1 + with: + path-to-profile: coverage.out + smoke-test: + runs-on: ubuntu-latest + steps: + - name: Setup Go + uses: actions/setup-go@v3 + with: + go-version: "1.22.4" + - name: Install Protoc + uses: arduino/setup-protoc@v1 + - uses: actions/checkout@v2 + - name: Build + run: make build + benchmark: + runs-on: ubuntu-latest + steps: + - name: Setup Go + uses: actions/setup-go@v3 + with: + go-version: "1.22.4" + - name: Checkout repo + uses: actions/checkout@v2 + with: + fetch-depth: 0 + - name: Invoking go bench test + run: make test-bench \ No newline at end of file diff --git a/.golangci.yml b/.golangci.yml new file mode 100644 index 00000000..143b5476 --- /dev/null +++ b/.golangci.yml @@ -0,0 +1,30 @@ +run: + timeout: 5m + skip-dirs: +output: + formats: + - format: 'colored-line-number' +linters: + enable-all: false + disable-all: true + enable: + - govet + - goimports + - thelper + - tparallel + - unconvert + - wastedassign + - revive + - staticcheck + - unused + - gofmt + - whitespace + - misspell +linters-settings: + revive: + ignore-generated-header: true + severity: warning +issues: + fix: true +severity: + default-severity: error diff --git a/README.md b/README.md index 95f24c00..8d0e5d54 100644 --- a/README.md +++ b/README.md @@ -2,6 +2,7 @@ ![build workflow](https://github.com/raystack/raccoon/actions/workflows/build.yaml/badge.svg) ![package workflow](https://github.com/raystack/raccoon/actions/workflows/release.yaml/badge.svg) +[![Coverage Status](https://coveralls.io/repos/github/raystack/raccoon/badge.svg?branch=main)](https://coveralls.io/github/raystack/raccoon?branch=main) [![License](https://img.shields.io/badge/License-Apache%202.0-blue.svg?logo=apache)](LICENSE) [![Version](https://img.shields.io/github/v/release/raystack/raccoon?logo=semantic-release)](Version) diff --git a/app/server.go b/app/server.go index eb6eaf58..578137bf 100644 --- a/app/server.go +++ b/app/server.go @@ -10,7 +10,6 @@ import ( "syscall" "time" - "github.com/raystack/raccoon/collector" "github.com/raystack/raccoon/config" "github.com/raystack/raccoon/logger" @@ -57,7 +56,7 @@ func StartServer(ctx context.Context, cancel context.CancelFunc) { } func shutDownServer(ctx context.Context, cancel context.CancelFunc, httpServices services.Services, bufferChannel chan collector.CollectRequest, workerPool *worker.Pool, pub Publisher) { - signalChan := make(chan os.Signal) + signalChan := make(chan os.Signal, 1) signal.Notify(signalChan, syscall.SIGHUP, syscall.SIGINT, syscall.SIGTERM, syscall.SIGQUIT) for { sig := <-signalChan diff --git a/config/load_test.go b/config/load_test.go index 20480c3a..9a676c93 100644 --- a/config/load_test.go +++ b/config/load_test.go @@ -43,7 +43,6 @@ func TestServerWsConfig(t *testing.T) { assert.Equal(t, "8080", ServerWs.AppPort) assert.Equal(t, time.Duration(1)*time.Millisecond, ServerWs.PingInterval) assert.Equal(t, time.Duration(1)*time.Millisecond, ServerWs.PongWaitInterval) - } func TestGRPCServerConfig(t *testing.T) { diff --git a/config/publisher.go b/config/publisher.go index da80e510..4b38d29a 100644 --- a/config/publisher.go +++ b/config/publisher.go @@ -152,11 +152,9 @@ func publisherKinesisLoader() { DefaultShards: uint32(util.MustGetInt(envStreamDefaultShards)), PublishTimeout: util.MustGetDuration(envPublishTimeout, time.Millisecond), } - } func publisherConfigLoader() { - viper.SetDefault("PUBLISHER_TYPE", "kafka") Publisher = util.MustGetString("PUBLISHER_TYPE") diff --git a/config/server.go b/config/server.go index 784f004e..43307816 100644 --- a/config/server.go +++ b/config/server.go @@ -118,7 +118,6 @@ func serverWsConfigLoader() { } func serverGRPCConfigLoader() { - viper.SetDefault("SERVER_GRPC_PORT", "8081") ServerGRPC = serverGRPC{ Port: util.MustGetString("SERVER_GRPC_PORT"), diff --git a/config/util/key.go b/config/util/key.go index 87479783..1e7d0b53 100644 --- a/config/util/key.go +++ b/config/util/key.go @@ -46,5 +46,4 @@ func Contains(key string, slice []string) bool { } return false - } diff --git a/deserialization/proto.go b/deserialization/proto.go index 872afab3..3e759134 100644 --- a/deserialization/proto.go +++ b/deserialization/proto.go @@ -16,5 +16,4 @@ func DeserializeProto(b []byte, i interface{}) error { return ErrInvalidProtoMessage } return proto.Unmarshal(b, msg) - } diff --git a/integration/integration_test.go b/integration/integration_test.go index 4aa8c9f4..be7b4979 100644 --- a/integration/integration_test.go +++ b/integration/integration_test.go @@ -6,7 +6,6 @@ import ( "encoding/json" "fmt" "io" - "io/ioutil" "math/rand" "net/http" "os" @@ -18,6 +17,7 @@ import ( pb "github.com/raystack/raccoon/proto" "github.com/stretchr/testify/assert" "google.golang.org/grpc" + "google.golang.org/grpc/credentials/insecure" "google.golang.org/grpc/metadata" "google.golang.org/protobuf/proto" "google.golang.org/protobuf/types/known/timestamppb" @@ -48,7 +48,6 @@ func TestIntegration(t *testing.T) { "X-User-ID": []string{"1234"}, } t.Run("Should response with BadRequest when sending invalid request", func(t *testing.T) { - wss, _, err := websocket.DefaultDialer.Dial(wsurl, header) if err != nil { @@ -71,7 +70,10 @@ func TestIntegration(t *testing.T) { }) t.Run("Should response with BadRequest when sending invalid GRPC request", func(t *testing.T) { - opts := []grpc.DialOption{grpc.WithInsecure(), grpc.WithBlock()} + opts := []grpc.DialOption{ + grpc.WithTransportCredentials(insecure.NewCredentials()), + grpc.WithBlock(), + } conn, err := grpc.Dial(grpcServerAddr, opts...) if err != nil { @@ -84,7 +86,6 @@ func TestIntegration(t *testing.T) { assert.NotEmpty(t, err) assert.Empty(t, r) - }) t.Run("Should response with BadRequest when sending invalid json request", func(t *testing.T) { @@ -123,7 +124,7 @@ func TestIntegration(t *testing.T) { assert.Fail(t, fmt.Sprintf("failed to connect to http server. %v", err)) os.Exit(1) } - defer io.Copy(ioutil.Discard, res.Body) + defer io.Copy(io.Discard, res.Body) defer res.Body.Close() r := &pb.SendEventResponse{} err = json.NewDecoder(res.Body).Decode(r) @@ -148,7 +149,7 @@ func TestIntegration(t *testing.T) { assert.Fail(t, fmt.Sprintf("failed to connect to http server. %v", err)) os.Exit(1) } - defer io.Copy(ioutil.Discard, res.Body) + defer io.Copy(io.Discard, res.Body) defer res.Body.Close() r := &pb.SendEventResponse{} err = json.NewDecoder(res.Body).Decode(r) @@ -192,7 +193,7 @@ func TestIntegration(t *testing.T) { assert.Fail(t, fmt.Sprintf("failed to connect to http server. %v", err)) os.Exit(1) } - defer io.Copy(ioutil.Discard, res.Body) + defer io.Copy(io.Discard, res.Body) defer res.Body.Close() r := &pb.SendEventResponse{} err = json.NewDecoder(res.Body).Decode(r) @@ -201,7 +202,6 @@ func TestIntegration(t *testing.T) { assert.Equal(t, pb.Status_STATUS_SUCCESS, r.Status) assert.Equal(t, r.Reason, "") assert.Equal(t, r.Data, map[string]string{"req_guid": "1234"}) - }) t.Run("Should response with success when JSON request is processed", func(t *testing.T) { @@ -245,7 +245,10 @@ func TestIntegration(t *testing.T) { }) t.Run("Should response with success when correct GRPC request is processed", func(t *testing.T) { - opts := []grpc.DialOption{grpc.WithInsecure(), grpc.WithBlock()} + opts := []grpc.DialOption{ + grpc.WithTransportCredentials(insecure.NewCredentials()), + grpc.WithBlock(), + } conn, err := grpc.Dial(grpcServerAddr, opts...) if err != nil { @@ -281,7 +284,6 @@ func TestIntegration(t *testing.T) { assert.Equal(t, r.Status.String(), pb.Status_STATUS_SUCCESS.String()) assert.Equal(t, r.Reason, "") assert.Equal(t, r.Data, map[string]string{"req_guid": "1234"}) - }) t.Run("Should response with success when request is processed", func(t *testing.T) { @@ -393,7 +395,6 @@ func TestIntegration(t *testing.T) { } } }) - }) t.Run("Should close connection when client is unresponsive", func(t *testing.T) { @@ -531,7 +532,7 @@ func TestIntegration(t *testing.T) { assert.Fail(t, fmt.Sprintf("failed to connect to http server. %v", err)) os.Exit(1) } - defer io.Copy(ioutil.Discard, res.Body) + defer io.Copy(io.Discard, res.Body) defer res.Body.Close() r := &pb.SendEventResponse{} err = json.NewDecoder(res.Body).Decode(r) @@ -580,7 +581,7 @@ func TestIntegration(t *testing.T) { assert.Fail(t, fmt.Sprintf("failed to connect to http server. %v", err)) os.Exit(1) } - defer io.Copy(ioutil.Discard, res.Body) + defer io.Copy(io.Discard, res.Body) defer res.Body.Close() r := &pb.SendEventResponse{} err = json.NewDecoder(res.Body).Decode(r) @@ -610,7 +611,7 @@ func TestIntegration(t *testing.T) { assert.Fail(t, fmt.Sprintf("failed to connect to http server. %v", err)) os.Exit(1) } - defer io.Copy(ioutil.Discard, res.Body) + defer io.Copy(io.Discard, res.Body) defer res.Body.Close() assert.Equal(t, res.StatusCode, http.StatusOK) assert.Contains(t, res.Header, "Access-Control-Allow-Origin") @@ -635,10 +636,9 @@ func TestIntegration(t *testing.T) { assert.Fail(t, fmt.Sprintf("failed to connect to http server. %v", err)) os.Exit(1) } - defer io.Copy(ioutil.Discard, res.Body) + defer io.Copy(io.Discard, res.Body) defer res.Body.Close() assert.Equal(t, res.StatusCode, http.StatusForbidden) assert.NotContains(t, res.Header, "Access-Control-Allow-Origin") }) - } diff --git a/metrics/metric_test.go b/metrics/metric_test.go index 1f1e9389..b40b5650 100644 --- a/metrics/metric_test.go +++ b/metrics/metric_test.go @@ -32,7 +32,6 @@ func (m *mockMetricInstrument) Count(metricName string, count int64, labels map[ } else { return nil } - } func (m *mockMetricInstrument) Gauge(metricName string, value interface{}, labels map[string]string) error { diff --git a/metrics/metrics.go b/metrics/metrics.go index 83ec8b24..2a8e3c04 100644 --- a/metrics/metrics.go +++ b/metrics/metrics.go @@ -86,7 +86,6 @@ func Histogram(metricName string, value int64, labels map[string]string) error { } func Setup() error { - if config.MetricPrometheus.Enabled && config.MetricStatsd.Enabled { return errors.New("only one instrumentation tool can be enabled") } diff --git a/metrics/prometheus.go b/metrics/prometheus.go index 5551a187..c126ac6a 100644 --- a/metrics/prometheus.go +++ b/metrics/prometheus.go @@ -57,7 +57,6 @@ func initPrometheusCollector() (*PrometheusCollector, error) { serveMux.Handle(config.MetricPrometheus.Path, promhttp.HandlerFor(p.registry, promhttp.HandlerOpts{Registry: p.registry})) go server.ListenAndServe() return p, nil - } func (p *PrometheusCollector) Count(metricName string, count int64, labels map[string]string) error { @@ -177,7 +176,7 @@ func getCounterMap() map[string]CounterVec { Help: "Request count"}, []string{"status", "reason", "conn_group"}) counters["events_duplicate_total"] = prometheus.NewCounterVec(prometheus.CounterOpts{ Name: "events_duplicate_total", - Help: "Total Number of Duplicate events recieved by the server"}, []string{"conn_group", "reason"}) + Help: "Total Number of Duplicate events received by the server"}, []string{"conn_group", "reason"}) counters["server_ping_failure_total"] = prometheus.NewCounterVec(prometheus.CounterOpts{ Name: "server_ping_failure_total", Help: "Total ping that server fails to send"}, []string{"conn_group"}) diff --git a/metrics/prometheus_test.go b/metrics/prometheus_test.go index 1ed29b6e..42c1d948 100644 --- a/metrics/prometheus_test.go +++ b/metrics/prometheus_test.go @@ -116,7 +116,6 @@ func (m *mockObserver) Observe(f float64) { } func (promSuite *PrometheusTestSuite) Test_Prometheus_Collector_Metrics_Initialised() { - // NOTE(turtledev): what are we even testing here? numCounters := 18 numGauge := 15 @@ -183,7 +182,7 @@ func (promSuite *PrometheusTestSuite) Test_Prometheus_Counter_Working() { mockCounter1.On("Add", float64(callArg1)) mockCounterVec2.On("With", promLabels2).Return(&mockCounter2) mockCounter2.On("Inc") - err = promSuite.instrument.Count(sampleCounterMetric1, int64(callArg1), labels1) + err = promSuite.instrument.Count(sampleCounterMetric1, callArg1, labels1) assert.NoError(promSuite.T(), err) err = promSuite.instrument.Increment(sampleCounterMetric2, labels2) assert.NoError(promSuite.T(), err) @@ -210,7 +209,7 @@ func (promSuite *PrometheusTestSuite) Test_Prometheus_Gauge_Working() { promSuite.instrument.gauges[sampleGaugeMetric] = &mockGaugeVec mockGaugeVec.On("With", promLabels).Return(&mockGauge) mockGauge.On("Set", float64(callArg)) - err = promSuite.instrument.Gauge(sampleGaugeMetric, int64(callArg), labels) + err = promSuite.instrument.Gauge(sampleGaugeMetric, callArg, labels) assert.NoError(promSuite.T(), err) mockGaugeVec.AssertCalled(promSuite.T(), "With", promLabels) mockGaugeVec.AssertNumberOfCalls(promSuite.T(), "With", 1) diff --git a/metrics/statsd.go b/metrics/statsd.go index 59290265..d65ad2cf 100644 --- a/metrics/statsd.go +++ b/metrics/statsd.go @@ -38,12 +38,11 @@ func (s *Statsd) Count(metricName string, count int64, labels map[string]string) tags := translateLabelIntoTags(labels) s.c.Count(withTags(metricName, tags), int(count)) return nil - } func (s *Statsd) Histogram(metricName string, value int64, labels map[string]string) error { tags := translateLabelIntoTags(labels) - s.c.Timing(withTags(metricName,tags), value) + s.c.Timing(withTags(metricName, tags), value) return nil } diff --git a/middleware/cors.go b/middleware/cors.go index a16315f3..3d94d1c4 100644 --- a/middleware/cors.go +++ b/middleware/cors.go @@ -24,7 +24,6 @@ func loadCors() { } else { cors = func(h http.Handler) http.Handler { return h } } - } func GetCors() func(http.Handler) http.Handler { diff --git a/publisher/kafka/kafka_test.go b/publisher/kafka/kafka_test.go index 878b09aa..6826a3e8 100644 --- a/publisher/kafka/kafka_test.go +++ b/publisher/kafka/kafka_test.go @@ -38,7 +38,8 @@ func TestKafka_ProduceBulk(suite *testing.T) { suite.Parallel() topic := "test_topic" suite.Run("AllMessagesSuccessfulProduce", func(t *testing.T) { - t.Run("Should return nil when all message succesfully published", func(t *testing.T) { + t.Parallel() + t.Run("Should return nil when all message successfully published", func(t *testing.T) { client := &mockClient{} client.On("Produce", mock.Anything, mock.Anything).Return(nil).Run(func(args mock.Arguments) { go func() { diff --git a/publisher/kinesis/kinesis.go b/publisher/kinesis/kinesis.go index 7d0bd477..24af3697 100644 --- a/publisher/kinesis/kinesis.go +++ b/publisher/kinesis/kinesis.go @@ -34,7 +34,6 @@ type Publisher struct { } func (p *Publisher) ProduceBulk(events []*pb.Event, connGroup string) error { - ctx, cancel := context.WithTimeout(globalCtx, p.publishTimeout) defer cancel() errors := make([]error, len(events)) @@ -128,7 +127,6 @@ func (p *Publisher) ProduceBulk(events []*pb.Event, connGroup string) error { } func (p *Publisher) ensureStream(ctx context.Context, name string) error { - p.streamLock.RLock() exists := p.streams[name] p.streamLock.RUnlock() @@ -213,11 +211,10 @@ func WithStreamAutocreate(autocreate bool) Opt { } func WithStreamMode(mode types.StreamMode) Opt { - validModesList := types.StreamMode("").Values() validModes := map[types.StreamMode]bool{} for _, m := range validModesList { - validModes[types.StreamMode(m)] = true + validModes[m] = true } return func(p *Publisher) error { diff --git a/publisher/kinesis/kinesis_test.go b/publisher/kinesis/kinesis_test.go index 98e1fd09..2a9b98db 100644 --- a/publisher/kinesis/kinesis_test.go +++ b/publisher/kinesis/kinesis_test.go @@ -160,7 +160,6 @@ func readStream(client *kinesis_sdk.Client, arn string) ([][]byte, error) { } func TestKinesisProducer(t *testing.T) { - localstackHost := os.Getenv(envLocalstackHost) if strings.TrimSpace(localstackHost) == "" { t.Errorf("cannot run tests because %s env variable is not set", envLocalstackHost) @@ -176,7 +175,6 @@ func TestKinesisProducer(t *testing.T) { require.NoError(t, err) err = pub.ProduceBulk([]*pb.Event{testEvent}, "conn_group") require.Error(t, err) - }) t.Run("should return an error if an invalid stream mode is specified", func(t *testing.T) { @@ -211,7 +209,6 @@ func TestKinesisProducer(t *testing.T) { deleteStream(client, testEvent.Type) }) t.Run("should create the stream with mode = ON_DEMAND (default)", func(t *testing.T) { - pub, err := kinesis.New(client, kinesis.WithStreamAutocreate(true)) require.NoError(t, err) err = pub.ProduceBulk([]*pb.Event{testEvent}, "conn_group") diff --git a/publisher/pubsub/pubsub.go b/publisher/pubsub/pubsub.go index cc2a8f83..988a7ddf 100644 --- a/publisher/pubsub/pubsub.go +++ b/publisher/pubsub/pubsub.go @@ -37,7 +37,6 @@ type Publisher struct { } func (p *Publisher) ProduceBulk(events []*pb.Event, connGroup string) error { - ctx, cancel := context.WithTimeout(globalCtx, p.publishSettings.Timeout) defer cancel() @@ -67,7 +66,6 @@ func (p *Publisher) ProduceBulk(events []*pb.Event, connGroup string) error { "event_type": event.Type, }, ) - } errors[order] = err continue @@ -121,7 +119,6 @@ func (p *Publisher) ProduceBulk(events []*pb.Event, connGroup string) error { } func (p *Publisher) topic(ctx context.Context, id string) (*pubsub.Topic, error) { - p.topicLock.RLock() topic, exists := p.topics[id] p.topicLock.RUnlock() @@ -239,7 +236,6 @@ func WithTopicFormat(format string) Opt { // NewPubSub creates a new PubSub publisher func New(client *pubsub.Client, opts ...Opt) (*Publisher, error) { - p := &Publisher{ client: client, topicFormat: "%s", diff --git a/publisher/pubsub/pubsub_test.go b/publisher/pubsub/pubsub_test.go index 6353a1c7..13a62b51 100644 --- a/publisher/pubsub/pubsub_test.go +++ b/publisher/pubsub/pubsub_test.go @@ -80,7 +80,6 @@ func TestPubSubPublisher(t *testing.T) { }) t.Run("should return an error if topic doesn't exist and topic autocreate is set to false", func(t *testing.T) { - client, err := pubsubsdk.NewClient(context.Background(), testingProject) assert.NoError(t, err, "error creating pubsub client") @@ -95,7 +94,6 @@ func TestPubSubPublisher(t *testing.T) { }) t.Run("should set retention for a topic correctly", func(t *testing.T) { - client, err := pubsubsdk.NewClient(context.Background(), testingProject) assert.NoError(t, err, "error creating pubsub client") @@ -123,7 +121,6 @@ func TestPubSubPublisher(t *testing.T) { }) t.Run("should create the topic using topic format", func(t *testing.T) { - client, err := pubsubsdk.NewClient(context.Background(), testingProject) assert.NoError(t, err, "error creating pubsub client") @@ -152,7 +149,6 @@ func TestPubSubPublisher(t *testing.T) { }) t.Run("static topic creation test", func(t *testing.T) { - client, err := pubsubsdk.NewClient(context.Background(), testingProject) assert.NoError(t, err, "error creating pubsub client") diff --git a/services/grpc/handler.go b/services/grpc/handler.go index e027c9f8..1b212f32 100644 --- a/services/grpc/handler.go +++ b/services/grpc/handler.go @@ -56,7 +56,6 @@ func (h *Handler) SendEvent(ctx context.Context, req *pb.SendEventRequest) (*pb. AckFunc: h.Ack(responseChannel, req.ReqGuid, identifier.Group), }) return <-responseChannel, nil - } func (h *Handler) Ack(responseChannel chan *pb.SendEventResponse, reqGuid, connGroup string) collector.AckFunc { diff --git a/services/rest/handler.go b/services/rest/handler.go index 66e7b735..5556c19a 100644 --- a/services/rest/handler.go +++ b/services/rest/handler.go @@ -3,7 +3,6 @@ package rest import ( "fmt" "io" - "io/ioutil" "net/http" "time" @@ -93,10 +92,10 @@ func (h *Handler) RESTAPIHandler(rw http.ResponseWriter, r *http.Request) { return } - defer io.Copy(ioutil.Discard, r.Body) + defer io.Copy(io.Discard, r.Body) defer r.Body.Close() - b, err := ioutil.ReadAll(r.Body) + b, err := io.ReadAll(r.Body) if err != nil { logger.Errorf(fmt.Sprintf("[rest.GetRESTAPIHandler] %s error reading request body, error: %v", identifier, err)) metrics.Increment("batches_read_total", map[string]string{"status": "failed", "reason": "readerr", "conn_group": identifier.Group}) diff --git a/services/rest/response_test.go b/services/rest/response_test.go index 5c242b80..ff785619 100644 --- a/services/rest/response_test.go +++ b/services/rest/response_test.go @@ -213,7 +213,6 @@ func TestResponse_SetDataMap(t *testing.T) { } func TestResponse_Write(t *testing.T) { - res := &pb.SendEventResponse{ Status: pb.Status_STATUS_SUCCESS, Code: pb.Code_CODE_OK, diff --git a/services/rest/service.go b/services/rest/service.go index e762da50..8c8ca3e3 100644 --- a/services/rest/service.go +++ b/services/rest/service.go @@ -56,9 +56,7 @@ func pingHandler(w http.ResponseWriter, r *http.Request) { } func reportConnectionMetrics(conn connection.Table) { - t := time.Tick(config.MetricInfo.RuntimeStatsRecordInterval) - for { - <-t + for range time.Tick(config.MetricInfo.RuntimeStatsRecordInterval) { for k, v := range conn.TotalConnectionPerGroup() { metrics.Gauge("connections_count_current", v, map[string]string{"conn_group": k}) } diff --git a/services/rest/websocket/connection/upgrader_test.go b/services/rest/websocket/connection/upgrader_test.go index 1d4eca0f..3c449356 100644 --- a/services/rest/websocket/connection/upgrader_test.go +++ b/services/rest/websocket/connection/upgrader_test.go @@ -33,8 +33,8 @@ var config = UpgraderConfig{ WriteBufferSize: 10240, CheckOrigin: false, MaxUser: 2, - PongWaitInterval: time.Duration(60 * time.Second), - WriteWaitInterval: time.Duration(5 * time.Second), + PongWaitInterval: 60 * time.Second, + WriteWaitInterval: 5 * time.Second, ConnIDHeader: "X-User-ID", ConnGroupHeader: "", ConnGroupDefault: "--default--", @@ -176,6 +176,7 @@ func TestConnectionRejection(t *testing.T) { // Prepare a websocket server with given upgrader and establish the connections with the given headers as many as given headers. func upgradeConnectionTestHelper(t *testing.T, upgrader *Upgrader, headers []http.Header, f assertUpgrade) { + t.Helper() res := make(chan upgradeRes) m := sync.Mutex{} iteration := 0 diff --git a/services/rest/websocket/handler_test.go b/services/rest/websocket/handler_test.go index d2f4107b..b2baf957 100644 --- a/services/rest/websocket/handler_test.go +++ b/services/rest/websocket/handler_test.go @@ -114,8 +114,8 @@ func TestHandler_GETHandlerWSEvents(t *testing.T) { WriteBufferSize: 10240, CheckOrigin: false, MaxUser: 2, - PongWaitInterval: time.Duration(60 * time.Second), - WriteWaitInterval: time.Duration(5 * time.Second), + PongWaitInterval: 60 * time.Second, + WriteWaitInterval: 5 * time.Second, ConnIDHeader: "X-User-ID", ConnGroupHeader: "string", }) @@ -169,6 +169,8 @@ func TestHandler_GETHandlerWSEvents(t *testing.T) { wss, _, err := websocket.DefaultDialer.Dial(url, http.Header{ "X-User-ID": []string{"test2-user2"}, }) + require.NoError(t, err) + defer wss.Close() require.NoError(t, err) diff --git a/worker/worker.go b/worker/worker.go index e56a2c98..8bf48372 100644 --- a/worker/worker.go +++ b/worker/worker.go @@ -42,10 +42,8 @@ func CreateWorkerPool(size int, eventsChannel <-chan collector.CollectRequest, d } func (w *Pool) worker(name string) { - logger.Info("Running worker: " + name) for request := range w.EventsChannel { - metrics.Histogram( "batch_idle_in_channel_milliseconds", time.Since(request.TimePushed).Milliseconds(),