Skip to content
This repository has been archived by the owner on Oct 23, 2023. It is now read-only.

Commit

Permalink
Feature/integration testing (#34)
Browse files Browse the repository at this point in the history
Added automated integration testing to the library, running tests against a dockerized (localstack) Kinesis instance.
  • Loading branch information
markglh authored Dec 3, 2017
1 parent 5bff161 commit 2b25839
Show file tree
Hide file tree
Showing 11 changed files with 359 additions and 68 deletions.
14 changes: 13 additions & 1 deletion .travis.yml
Original file line number Diff line number Diff line change
Expand Up @@ -2,16 +2,21 @@ sudo: false
language: scala
scala:
- 2.11.11
- 2.12.2
- 2.12.4
jdk:
- oraclejdk8
cache:
directories:
- "$HOME/.m2/repository"
- "$HOME/.sbt"
- "$HOME/.ivy2"
before_script:
- docker-compose -f localstack/docker-compose.yml pull
- docker-compose -f localstack/docker-compose.yml up -d
- docker ps -a
script:
- sbt ++$TRAVIS_SCALA_VERSION clean coverage scalafmtTest test
- sbt ++$TRAVIS_SCALA_VERSION it:test
# Tagged releases
- if [ $TRAVIS_TEST_RESULT -eq 0 -a "$TRAVIS_PULL_REQUEST" = "false" ] && [[ "$TRAVIS_TAG"
=~ ^v[0-9]+\.[0-9]+(\.[0-9]+)?(-M[0-9]+)?$ ]]; then echo "** Publishing Release
Expand All @@ -23,14 +28,21 @@ script:
= "master" ]; then echo "** Publishing Snapshot from master **" && tar xf secrets.tar
&& mkdir ~/.bintray && cp publishing/.credentials ~/.bintray/ && cp publishing/.artifactory
~/.bintray/ && sbt ++$TRAVIS_SCALA_VERSION publish; fi
after_script:
- docker-compose -f localstack/docker-compose.yml down
after_success:
- sbt ++$TRAVIS_SCALA_VERSION coverageReport coveralls
before_cache:
- find $HOME/.ivy2/cache -name "ivydata-*.properties" -print -delete
- find $HOME/.sbt -name "*.lock" -print -delete
env:
global:
- DOCKER_COMPOSE_VERSION=1.17.1
- secure: KffThTeOg0ELt+iw5ZNtDW6x4/1iKIj1v91Yq6earF+YQquusROqt/MtdmyXRG9ho0c/xnB82BJHrBsvac6SwsPUZ/cRvQ0KML3bWyCMCL2B/SurhjxbahAWdgQGj67KO1n72q24cqyRMb9wCCOwy4p/7qomBYM+m8YRLc6YOxxj6DKriIiqOq8jFmt/nktpCUhXVURYvFxHtHQvLutaWWICwGZX1ibyloCmC3wWPMNOFc/1FBqLXNQTUTKuYOOC46TJ5janON1E2obDKZyGf/Q/ZEuNymcJYKi6/7XWzN24Ep194PLGj29J44Zpp9P36hnIiET8dQ1s2Dafo+alubqFEUaP4fV8XyYu5ZtTt5A0wxKEe/u7N4SdxzUZF/zBszCLRL2uJhQlfC7af6keCpIVoKLDi+9b6zpSVLDZmE1UmFuu8LLoMY2s2xRt6vHXTSiyPrIJaIFyaryxjhH+KmKNE0GW1jzVkPc6NpxjnVtF2E7LYogHWHzJLoMpBtTQoAeJlKvtbEmWrZXO9TGPFzu0fb+x/H0Ocre2QDSnjKobtMZs6S8zvD+STfmrZ8ZtZ0H0HNbGJW/rPkSZFLunOPXWxSZzNoWNrzg9LCoPtdkVsa7BYiAY9fM9xNX8SOx/kRZtw52el71jK8mDCV/lQjzjBwyo3AZl/jOx3cbWrAM=
- secure: KClWy/cCpMSKyJmVNOt8Dmwfatgx/9KsffLcL/BlXUYsBkB3jwdr9SzFok1ArGd02FzrFQU739/wnSKpmsW1fEO64tlh+qJwW76+gFlqWmOjc6r7ENH2u6LGLfRYeD4r76CiEV9mnU5GVMUPJIfbxlP0zB7ukzSGZwbkJ4WBqlxmurRr70jWz2F3ytJM0r31rpqc+ypHi58vj0NPBO7bR6XI4kquW9cDpCCgj4CVVshpsxUB1N4RPzcGsxUPucigH71lJ0sZjnEjSB+E95/Ovbf7PuPYnY0N8S6hLEgTyh7xAcxNpMsFgd1GInb92hrXLujUZU5N2BeX7jnYVa90q55AZd+IQqmJU+/4tBBrgWvppMIiTbbU5pnDoNqHNWmfB4wtbxwNW4y4tQH+/wSVPhgGYOWfsj3nLNnyfcjRAClQzEWCoAbx7Tkgj1OmvrOqwwmiPz3dFMInawQ6RkisqwgjT/sAhNGa2i26Tpj5Sd8XAjxU61MAzsr8gpVdA6TJcSdHkHC2zZVn+64Cc9WHJ9I6WbRD0fR3pFeTbQv+0SOUWV1owqtdGLl6z0gdI5rLZIYO7SMeRPjTYf/5NV/WxF2LcJu06LHYpzURaGQHp6T5NH2eEZof1Dt5tqa2eQ7KfZyui3rz3WB0AQlJSPK9yL78BvZzNlCNWbv5wLiW6hQ=
before_install:
- sudo rm /usr/local/bin/docker-compose
- curl -L https://github.com/docker/compose/releases/download/${DOCKER_COMPOSE_VERSION}/docker-compose-`uname -s`-`uname -m` > docker-compose
- chmod +x docker-compose
- sudo mv docker-compose /usr/local/bin
- 'if [ "$TRAVIS_PULL_REQUEST" = "false" ]; then openssl aes-256-cbc -K $encrypted_09ef3e55c311_key -iv $encrypted_09ef3e55c311_iv -in secrets.tar.enc -out secrets.tar -d; fi'
7 changes: 7 additions & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@ It's worth familiarising yourself with [Sequence numbers and Sub sequence number
* [FAQ](#faq)
* [Contributor Guide](#contributor-guide)
* [Code Formatting](#contributor-guide-code-formatting)
* [Integration Tests](#contributor-guide-integration-tests)
* [Tag Requirements](#contributor-guide-tag-requirements)
* [Version information](#contributor-guide-tag-requirements-version-information)
* [Valid Release Tag Examples:](#contributor-guide-tag-requirements-valid-release-tag-examples)
Expand Down Expand Up @@ -521,6 +522,12 @@ This project uses [scalafmt](http://scalameta.org/scalafmt/) and will automatica

Please run `sbt scalafmt` before committing and pushing changes.

<a name="contributor-guide-integration-tests"></a>
## Integration tests
As part of the travis build, integration tests will run against a Kinesis localstack instance. You can run these locally as follows:
* `docker-compose -f localstack/docker-compose.yml up`
* `sbt it:test`

<a name="contributor-guide-tag-requirements"></a>
## Tag Requirements
Uses tags and [sbt-git](https://github.com/sbt/sbt-git) to determine the current version.
Expand Down
16 changes: 10 additions & 6 deletions build.sbt
Original file line number Diff line number Diff line change
Expand Up @@ -68,10 +68,10 @@ lazy val library =
"org.scalactic" %% "scalactic" % Version.scalaTest % Compile)

val testing = Seq(
"org.scalatest" %% "scalatest" % Version.scalaTest % Test,
"org.scalacheck" %% "scalacheck" % Version.scalaCheck % Test,
"com.typesafe.akka" %% "akka-testkit" % Version.akka % Test,
"org.mockito" % "mockito-core" % "2.7.15" % Test,
"org.scalatest" %% "scalatest" % Version.scalaTest % "it,test",
"org.scalacheck" %% "scalacheck" % Version.scalaCheck % "it,test",
"com.typesafe.akka" %% "akka-testkit" % Version.akka % "it,test",
"org.mockito" % "mockito-core" % "2.7.15" % "it,test",
"io.kamon" %% "kamon-core" % Version.kamon % Test,
"io.kamon" %% "kamon-akka-2.4" % Version.kamon % Test,
"io.kamon" %% "kamon-statsd" % Version.kamon % Test,
Expand Down Expand Up @@ -131,7 +131,7 @@ lazy val commonSettings =
"-Ywarn-infer-any", // Warn when a type argument is inferred to be `Any`.
"-Ywarn-nullary-override", // Warn when non-nullary `def f()' overrides nullary `def f'.
"-Ywarn-nullary-unit", // Warn when nullary methods return Unit.
"-Ywarn-numeric-widen" // Warn when numerics are widened.
"-Ywarn-numeric-widen" // Warn when numerics are widened.
),
scalacOptions in (Compile, doc) ++= Seq(
"-no-link-warnings" // Suppresses problems with Scaladoc @throws links
Expand All @@ -146,7 +146,11 @@ lazy val commonSettings =
val project = Project.extract(state).currentRef.project
s"[$project]> "
},
parallelExecution in Test := false
parallelExecution in Test := false,
parallelExecution in IntegrationTest := false,
fork in IntegrationTest := true,
javaOptions in IntegrationTest += "-Dcom.amazonaws.sdk.disableCertChecking=true",
envVars in IntegrationTest += ("AWS_CBOR_DISABLE" -> "true")
)

/* This allows to derive an sbt version string from the git information.
Expand Down
23 changes: 23 additions & 0 deletions localstack/.env
Original file line number Diff line number Diff line change
@@ -0,0 +1,23 @@
AWS_REGION=us-east-1
AWS_ACCESS_KEY_ID=<AWS_ACCESS_KEY_ID>
AWS_SECRET_ACCESS_KEY=<AWS_SECRET_ACCESS_KEY>
AWS_CBOR_DISABLE=true
APPLICATION_NAME=myReallyAwesomeApplication
KINESIS_STREAM_NAME=my-stream
KPL_KINESIS_ENDPOINT=localhost
KPL_KINESIS_PORT=4568
KPL_CLOUDWATCH_ENDPOINT=localhost
KPL_CLOUDWATCH_PORT=4582
KPL_VERIFY_CERTIFICATE=false
KCL_KINESIS_ENDPOINT=https://localhost:4568
KCL_DYNAMODB_ENDPOINT=https://localhost:4569
ADDITIONAL_JAVA_OPTS=-Dcom.amazonaws.sdk.disableCertChecking=true
LOCALSTACK_SERVICES=kinesis,dynamodb,sqs,cloudwatch,cloudformation
LOCALSTACK_HOSTNAME=localhost
LOCALSTACK_USE_SSL=true
LOCALSTACK_KINESIS_ERROR_PROBABILITY=0.0
LOCALSTACK_DYNAMODB_ERROR_PROBABILITY=0.0
LOCALSTACK_LAMBDA_EXECUTOR=local
LOCALSTACK_LAMBDA_REMOTE_DOCKER=false
LOCALSTACK_DATA_DIR=/tmp/localstack/data
LOCALSTACK_DOCKER_IMAGE_TAG=0.7.5
27 changes: 27 additions & 0 deletions localstack/docker-compose.yml
Original file line number Diff line number Diff line change
@@ -0,0 +1,27 @@
version: "2.3"

services:
localstack:
image: markglh/initialised-localstack:0.8.3
volumes:
- ./templates:/opt/bootstrap/templates
#network_mode: "host"
environment:
- "SERVICES=${LOCALSTACK_SERVICES:-kinesis,dynamodb,cloudwatch,cloudformation}"
- "DEFAULT_REGION=${AWS_REGION:-us-east-1}"
- "HOSTNAME=${LOCALSTACK_HOSTNAME:-localhost}"
- "HOSTNAME_EXTERNAL=${LOCALSTACK_HOSTNAME_EXTERNAL:-localhost}"
- "USE_SSL=true"
#- "DATA_DIR=${LOCALSTACK_DATA_DIR:-/tmp/localstack/data}"
ports:
- "4567-4582:4567-4582"
- "8080:8080"

# Ensures the health check runs
dummy-service:
image: alpine:3.5
depends_on:
localstack:
condition: service_healthy


23 changes: 23 additions & 0 deletions localstack/templates/cftemplate.yml
Original file line number Diff line number Diff line change
@@ -0,0 +1,23 @@
AWSTemplateFormatVersion: '2010-09-09'
Description: CloudFormation Int Test Template
Resources:
KinesisStream1:
Type: AWS::Kinesis::Stream
Properties:
Name: int-test-stream-1
ShardCount: 1
KinesisStream2:
Type: AWS::Kinesis::Stream
Properties:
Name: int-test-stream-2
ShardCount: 1
KinesisStream3:
Type: AWS::Kinesis::Stream
Properties:
Name: int-test-stream-3
ShardCount: 1
KinesisStream4:
Type: AWS::Kinesis::Stream
Properties:
Name: int-test-stream-4
ShardCount: 1
58 changes: 0 additions & 58 deletions src/it/resources/application.conf
Original file line number Diff line number Diff line change
@@ -1,59 +1 @@

kamon {
metric.filters {
akka-actor {
includes = ["test-system/user/simple-consumer/consumer-worker", "test-system/user/simple-consumer/consumer-worker*/checkpoint-worker*"]
excludes = ["test-system/system/**"]
}

akka-dispatcher {
includes = ["test-system/akka.actor.default-dispatcher", "test-system/kinesis.akka.default-dispatcher"]
}

trace {
includes = [ "**" ]
excludes = []
}

#akka-router {
# includes = [ "test-system/user/some-router" ]
#}
}

statsd {
# Hostname and port in which your StatsD is running. Remember that StatsD packets are sent using UDP and
# setting unreachable hosts and/or not open ports wont be warned by the Kamon, your data wont go anywhere.
hostname = "127.0.0.1"
port = 8125

# Interval between metrics data flushes to StatsD. It's value must be equal or greater than the
# kamon.metrics.tick-interval setting.
flush-interval = 1 second

# Max packet size for UDP metrics data sent to StatsD.
max-packet-size = 1024 bytes

# Subscription patterns used to select which metrics will be pushed to StatsD. Note that first, metrics
# collection for your desired entities must be activated under the kamon.metrics.filters settings.
includes {
actor = ["*"]
trace = ["*"]
dispatcher = ["*"]
}

simple-metric-key-generator {
# Application prefix for all metrics pushed to StatsD. The default namespacing scheme for metrics follows
# this pattern:
# application.host.entity.entity-name.metric-name
application = "kinesis-test"
}
}

# modules can be disabled at startup using yes/no arguments.
modules {
kamon-log-reporter.auto-start = no
kamon-system-metrics.auto-start = no
kamon-statsd.auto-start = no
kamon-akka.auto-start = no
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,126 @@
/*
* Copyright 2017 WeightWatchers
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package com.weightwatchers.reactive.kinesis

import java.io.File

import com.amazonaws.services.kinesis.producer.{KinesisProducer => AWSKinesisProducer}
import com.typesafe.config.ConfigFactory
import com.weightwatchers.reactive.kinesis.common.{KinesisTestConsumer, TestCredentials}
import com.weightwatchers.reactive.kinesis.consumer.KinesisConsumer.ConsumerConf
import com.weightwatchers.reactive.kinesis.models.ProducerEvent
import com.weightwatchers.reactive.kinesis.producer.{KinesisProducer, ProducerConf}
import org.scalatest.concurrent.Eventually
import org.scalatest.mockito.MockitoSugar
import org.scalatest.time.{Millis, Seconds, Span}
import org.scalatest.{BeforeAndAfterAll, FreeSpec, Matchers}

import scala.concurrent.duration._
import scala.language.postfixOps
import scala.util.Random

//scalastyle:off magic.number
class KinesisProducerIntegrationSpec
extends FreeSpec
with Matchers
with MockitoSugar
with BeforeAndAfterAll
with Eventually {

implicit val ece = scala.concurrent.ExecutionContext.global

val defaultKinesisConfig =
ConfigFactory.parseFile(new File("src/main/resources/reference.conf")).getConfig("kinesis")

val kinesisConfig = ConfigFactory
.parseString(
"""
|kinesis {
|
| application-name = "ScalaProducerTestSpec"
|
| testProducer {
| stream-name = "int-test-stream-1"
|
| kpl {
| Region = us-east-1
|
| CloudwatchEndpoint = localhost
| CloudwatchPort = 4582
|
| KinesisEndpoint = localhost
| KinesisPort = 4568
|
| VerifyCertificate = false
| }
| }
|
| testConsumer {
| stream-name = "int-test-stream-1"
|
| kcl {
| AWSCredentialsProvider = "com.weightwatchers.reactive.kinesis.common.TestCredentials|foo|bar"
| regionName = us-east-1
| KinesisEndpoint = "https://localhost:4568"
| DynamoDBEndpoint = "https://localhost:4569"
|
|
| metricsLevel = None
| }
|
| }
|}
""".stripMargin
)
.getConfig("kinesis")
.withFallback(defaultKinesisConfig)

implicit override val patienceConfig: PatienceConfig =
PatienceConfig(timeout = Span(5, Seconds), interval = Span(100, Millis))

val consumer: KinesisTestConsumer =
KinesisTestConsumer.from(ConsumerConf(kinesisConfig, "testConsumer"), Some(100 millis))

override protected def afterAll(): Unit = {
consumer.shutdown
}

"The KinesisProducer" - {

"Should publish a message to a stream" in {

val producerConf =
ProducerConf(kinesisConfig, "testProducer", Some(TestCredentials.Credentials))
val producer = KinesisProducer(producerConf)

val existingRecordCount = consumer.retrieveRecords(producerConf.streamName, 10).size

val event = ProducerEvent("1234", Random.alphanumeric.take(10).mkString)
producer.addUserRecord(event)

eventually {
val records: Seq[String] = consumer.retrieveRecords(producerConf.streamName, 10)
records.size shouldBe (existingRecordCount + 1)
records should contain(
new String(event.payload.array(), java.nio.charset.StandardCharsets.UTF_8)
)
}
}
}
}

//scalastyle:on
Original file line number Diff line number Diff line change
Expand Up @@ -13,14 +13,12 @@ import com.weightwatchers.reactive.kinesis.consumer.ConsumerWorker.{
import com.weightwatchers.reactive.kinesis.consumer.KinesisConsumer
import com.weightwatchers.reactive.kinesis.consumer.KinesisConsumer.ConsumerConf
import com.weightwatchers.reactive.kinesis.models.CompoundSequenceNumber
import kamon.Kamon
import org.joda.time.{DateTime, DateTimeZone, Period}

import scala.collection.mutable.ListBuffer
import com.weightwatchers.eventing.system

object RunSimpleConsumer extends App {
Kamon.start()
val consumer = system.actorOf(SimpleKinesisConsumer.props, "simple-consumer")
}

Expand Down Expand Up @@ -109,7 +107,6 @@ class SimpleKinesisConsumer(kinesisConfig: Config) extends Actor with LazyLoggin
logger.error(s"\n\nFailed pit stop check @ $totalReceived!\n\n")
logger.error(s"\n\nFailed pit stop check @ $totalReceived!\n\n")
context.stop(self)
Kamon.shutdown()
System.exit(3)
} else {
logger.warn(s"\n\n**** PIT STOP OK: $totalVerified records verified\n\n")
Expand Down
Loading

0 comments on commit 2b25839

Please sign in to comment.