
Feature/issue28 test kpl properties (#30)
* Upgrading KPL/KCL libraries, replacing deprecated shutdown calls

* auto format from compile

* restructure the ProducerConf into its own file and spec

* further refactoring of Producer to use ProducerConf

* tests pass - ish

* improve test exception for manager

* Fix Intermittent failing test - #10

* refactored producer to remove pointless trait, updated readme and specs

* removed unused config

* fix scalafmt

* address potters comments

* address formatting issues

* added ConsumerConfig test plus missing consumer fields

* improved thread reference docs

* removed comment
markglh authored and agaro1121 committed Sep 11, 2017
1 parent fc7fee2 commit 6fe5371
Showing 11 changed files with 776 additions and 326 deletions.
9 changes: 4 additions & 5 deletions README.md
@@ -410,22 +410,21 @@ kpa ! Send(producerEvent) //Send without a callback confirmation

<a name="usage-usage-producer-pure-scala-based-implementation-simple-wrapper-around-kpl"></a>
### Pure Scala based implementation (simple wrapper around KPL)
*Note that throttling will be unavailable using this method.*
*Note that future throttling will be unavailable using this method.*

```scala
import java.util.UUID
import com.amazonaws.services.kinesis.producer.{UserRecordFailedException, UserRecordResult}
import com.weightwatchers.reactive.kinesis.producer.KinesisProducer
import com.weightwatchers.reactive.kinesis.producer.ProducerConf
import com.typesafe.config._
import com.weightwatchers.reactive.kinesis.models._
import com.weightwatchers.reactive.kinesis.producer.KinesisProducerKPL
import scala.concurrent.Future
import scala.concurrent.ExecutionContext.Implicits.global //Not for production

val kinesisConfig: Config = ConfigFactory.load().getConfig("kinesis")
val producerConfig: Config = kinesisConfig.getConfig("some-producer")
val streamName: String = producerConfig.getString("stream-name")

val kpl = KinesisProducerKPL(kinesisConfig.getConfig("kpl"), streamName)
val kpl = KinesisProducer(ProducerConf(kinesisConfig, "some-producer"))

val producerEvent = ProducerEvent(UUID.randomUUID.toString, "{Some Payload}")
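// Illustrative continuation (a sketch; only the imported names above come from this commit):
// send the event and handle the returned Future.
val result: Future[UserRecordResult] = kpl.addUserRecord(producerEvent)

result.onComplete {
  case scala.util.Success(r) =>
    println(s"Record sent to shard ${r.getShardId}") // UserRecordResult comes from the KPL
  case scala.util.Failure(ex: UserRecordFailedException) =>
    println(s"Record failed after exhausting its retries: $ex")
  case scala.util.Failure(ex) =>
    println(s"Unexpected failure: $ex")
}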

@@ -12,7 +12,7 @@ import com.weightwatchers.reactive.kinesis.producer.KinesisProducerActor.{
SendSuccessful,
SendWithCallback
}
import com.weightwatchers.reactive.kinesis.producer.{KinesisProducerActor, KinesisProducerKPL}
import com.weightwatchers.reactive.kinesis.producer.{KinesisProducerActor, KinesisProducer}

import scala.collection.mutable
import scala.concurrent.ExecutionContext.Implicits.global
@@ -62,7 +62,7 @@ class SimpleKinesisProducer(kConfig: Config) extends Actor with LazyLogging {
//We're creating the producer the hard way to get access to the underlying KPL
val kpaProps = KinesisProducerActor.props(kinesisConfig, "testProducer")
val kpa = context.actorOf(kpaProps)
val kinesisProducerKPL = kpaProps.args.head.asInstanceOf[KinesisProducerKPL]
val kinesisProducerKPL = kpaProps.args.head.asInstanceOf[KinesisProducer]

/* producer without actor:
val producerConfig = kinesisConfig.getConfig("testProducer")
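For comparison with the integration actor above, a minimal sketch of sending through `KinesisProducerActor` using the callback messages imported in this file. The actor wiring and payload are illustrative, and `SendWithCallback` is assumed to accept just the `ProducerEvent`:

```scala
import akka.actor.{Actor, ActorSystem, Props}
import com.typesafe.config.ConfigFactory
import com.weightwatchers.reactive.kinesis.models.ProducerEvent
import com.weightwatchers.reactive.kinesis.producer.KinesisProducerActor
import com.weightwatchers.reactive.kinesis.producer.KinesisProducerActor.{SendSuccessful, SendWithCallback}

// Forwards string payloads to the producer actor and logs the acknowledgements it sends back.
class AckLoggingProducer extends Actor {
  private val kinesisConfig = ConfigFactory.load().getConfig("kinesis")
  private val kpa           = context.actorOf(KinesisProducerActor.props(kinesisConfig, "testProducer"))

  override def receive: Receive = {
    case payload: String =>
      // Ask for a confirmation message once the KPL has successfully sent the record
      kpa ! SendWithCallback(ProducerEvent(java.util.UUID.randomUUID.toString, payload))
    case ack: SendSuccessful =>
      context.system.log.info("Record acknowledged: {}", ack)
  }
}

val system = ActorSystem("producer-example")
system.actorOf(Props[AckLoggingProducer]) ! """{"some":"payload"}"""
```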
37 changes: 30 additions & 7 deletions src/main/resources/reference.conf
@@ -166,7 +166,7 @@ kinesis {
# Default: 443
# Minimum: 1
# Maximum (inclusive): 65535
#KinesisPort = 443
# KinesisPort = 443

# If true, throttled puts are not retried. The records that got throttled
# will be failed immediately upon receiving the throttling error. This is
@@ -379,12 +379,18 @@ kinesis {

# Sets the threading model that the native process will use.
# Enum:
# ThreadingModel.PER_REQUEST: Tells the native process to create a thread for each request.
# ThreadingModel.POOLED: Tells the native process to use a thread pool. The size of the pool can be controlled by ThreadPoolSize
# Default = ThreadingModel.PER_REQUEST
# PER_REQUEST: Tells the native process to create a thread for each request.
# Under heavy load this can create a very large number of threads, which may cause resource exhaustion.
# POOLED: Tells the native process to use a thread pool. The size of the pool can be controlled by ThreadPoolSize
# This uses a queue and thread pool to execute requests to Kinesis.
# This limits the number of threads that the native process may use.
# Under extremely heavy load this can increase latency significantly more than the per request model.
#
# Default = PER_REQUEST
# ThreadingModel =

# Sets the maximum number of threads that the native process' thread pool will be configured with.
#
# Default: 0
# ThreadPoolSize =
}
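The two new threading settings can be supplied through the same Typesafe config that feeds the KPL. A minimal sketch, assuming the producer's KPL block lives at `kinesis.some-producer.kpl` (mirroring the README example in this commit); anything not overridden falls back to `reference.conf`:

```scala
import com.typesafe.config.ConfigFactory
import com.weightwatchers.reactive.kinesis.producer.{KinesisProducer, ProducerConf}

// Override the commented-out defaults documented above.
val overrides = ConfigFactory.parseString(
  """kinesis.some-producer.kpl {
    |  ThreadingModel = POOLED  # default is PER_REQUEST
    |  ThreadPoolSize = 32      # 0 leaves the pool size to the KPL
    |}""".stripMargin)

val kinesisConfig = overrides.withFallback(ConfigFactory.load()).resolve().getConfig("kinesis")
val producer      = KinesisProducer(ProducerConf(kinesisConfig, "some-producer"))
```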
@@ -527,11 +533,11 @@ kinesis {
# http://developer.amazonwebservices.com/connect/entry.jspa?externalID=3912
#
# Default: null
#kinesisEndpoint = https://kinesis
#kinesisEndpoint = "https://kinesis"

# DynamoDB endpoint
# Default: null
#DynamoDBEndpoint =
#dynamoDBEndpoint = "https://dynamo"

# Don't call processRecords() on the record processor for empty record lists.
# Enables applications to flush/checkpoint (if they have some data "in progress" but don't get new data for a while)
@@ -582,6 +588,13 @@ kinesis {
# Default: Operation, ShardId
#metricsEnabledDimensions = Operation, ShardId

# Sets the max size of the thread pool that will be used to renew leases.
# Setting this too low may starve the lease renewal process and cause the worker to lose leases at a higher rate.
#
# Min: 2
# Default: 20
#maxLeaseRenewalThreads=20


# The max number of leases (shards) this worker should process.
# This can be useful to avoid overloading (and thrashing) a worker when a host has resource constraints
@@ -628,7 +641,17 @@ kinesis {

# TableName name of the lease table in DynamoDB
# Default = <applicationName>
#TableName =
#tableName =

# A timeout when dispatching records to the client MultiLang record processor.
# If the record processor doesn't respond within the timeout, the parent Java process will be terminated.
# This is a temporary fix to handle cases where the KCL becomes blocked while waiting for a client record processor.
# Setting this can cause the KCL to exit suddenly;
# before using it, ensure that you have an automated restart for your application.
#
# Default: no timeout
#timeoutInSeconds =

}
}
}
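The new KCL worker settings above can be overridden the same way. A minimal sketch, assuming the consumer's KCL block lives at `kinesis.some-consumer.kcl`; the values shown are illustrative:

```scala
import com.typesafe.config.ConfigFactory

// Tune the lease-renewal pool, lease table name and MultiLang dispatch timeout documented above.
val consumerOverrides = ConfigFactory.parseString(
  """kinesis.some-consumer.kcl {
    |  maxLeaseRenewalThreads = 20   # min 2; too low starves lease renewal
    |  tableName = "my-app-leases"   # defaults to the applicationName
    |  timeoutInSeconds = 60         # default: no timeout; requires an automated restart strategy
    |}""".stripMargin)

val consumerKinesisConfig = consumerOverrides.withFallback(ConfigFactory.load()).resolve().getConfig("kinesis")
```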
@@ -29,64 +29,7 @@ import com.weightwatchers.reactive.kinesis.utils.{FutureUtils, TypesafeConfigExt

import scala.concurrent.{ExecutionContextExecutor, Future}

trait KinesisProducer {

/**
* Adds a message to the next batch to be sent to the configured stream.
*
* @return On success: Future{UserRecordResult}
* On failure: Future.failed(...): Any Throwable related to put.
* @see Callee `com.amazonaws.services.kinesis.producer.KinesisProducer.addUserRecord`
* @see UserRecordResult
* @see KinesisProducerConfiguration#setRecordTtl(long)
* @see UserRecordFailedException
*/
def addUserRecord(event: ProducerEvent)(
implicit ec: ExecutionContextExecutor
): Future[UserRecordResult]

/**
* Get the number of unfinished records currently being processed. The
* records could either be waiting to be sent to the child process, or have
* reached the child process and are being worked on.
*
* <p>
* This is equal to the number of futures returned from [[addUserRecord]]
* that have not finished.
*
* This is useful for applying backpressure and throttling the number of concurrent Futures.
*
* @return The number of unfinished records currently being processed.
*/
def outstandingRecordsCount(): Int

/**
* Firstly, blocks whilst all records are complete (either succeeding or failing).
*
* <p>
*
* This includes time spent while any retries are performed. Depending on
* your configuration of record TTL and request timeout, this can
* potentially take a long time if the library is having trouble delivering
* records to the backend, for example due to network problems.
*
* <p>
*
* Finally the [[KinesisProducer]] is destroyed, preventing further use.
*
* @throws com.amazonaws.services.kinesis.producer.DaemonException if the child process is dead //TODO - handle this better?
* @see [[AWSKinesisProducer]]
*/
def stop(): Unit

/**
* @return true if the [[KinesisProducer]] has been stopped & destroyed.
*/
def destroyed(): Boolean

}

object KinesisProducerKPL extends LazyLogging {
object KinesisProducer extends LazyLogging {

/**
* The config passed is expected to contain the AWS KPL properties at the top level.
@@ -100,9 +43,11 @@ object KinesisProducerKPL extends LazyLogging {
* @param credentialsProvider A specific CredentialsProvider. The KCL defaults to DefaultAWSCredentialsProviderChain.
* @return an instantiated [[KinesisProducer]]
*/
@deprecated("Use KinesisProducer(producerConf: ProducerConf) instead", "v0.5.7")
def apply(kplConfig: Config,
streamName: String,
credentialsProvider: Option[AWSCredentialsProvider] = None): KinesisProducer = {

import TypesafeConfigExtensions._

// We directly load our properties into the KPL as a Java `Properties` object
@@ -117,7 +62,32 @@ object KinesisProducerKPL extends LazyLogging {
KinesisProducerConfiguration.fromProperties(kplProps)
credentialsProvider.foreach(kplLibConfiguration.setCredentialsProvider)

new KinesisProducerKPL(new AWSKinesisProducer(kplLibConfiguration), streamName)
new KinesisProducer(new AWSKinesisProducer(kplLibConfiguration), streamName)
}

/**
* The config passed is expected to contain the AWS KPL properties at the top level.
*
* @param producerConf An instance of [[ProducerConf]] which contains all required configuration for the KPL.
* @return an instantiated [[KinesisProducer]]
*/
def apply(producerConf: ProducerConf): KinesisProducer = {
apply(producerConf.kplLibConfiguration, producerConf.streamName)
}

/**
* The [[KinesisProducerConfiguration]] argument is passed directly to the KPL library.
* This constructor makes no use of the Typesafe config.
*
* @see `src/it/resources/reference.conf` for a more detailed example.
* @param kplConfig An instance of the underlying [[KinesisProducerConfiguration]] to be passed
* directly to the library.
* @param streamName The name of the Kinesis stream, which must exist.
* @return an instantiated [[KinesisProducer]]
*/
def apply(kplConfig: KinesisProducerConfiguration, streamName: String): KinesisProducer = {
//TODO add logging
new KinesisProducer(new AWSKinesisProducer(kplConfig), streamName)
}
}
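Taken together, the companion object now offers three ways to build a producer. A brief sketch; the overload signatures come from the code above, while the region, stream name and the `some-producer.kpl` config path are assumptions:

```scala
import com.amazonaws.services.kinesis.producer.KinesisProducerConfiguration
import com.typesafe.config.ConfigFactory
import com.weightwatchers.reactive.kinesis.producer.{KinesisProducer, ProducerConf}

val kinesisConfig = ConfigFactory.load().getConfig("kinesis")

// Preferred: ProducerConf resolves the stream name and KPL properties from Typesafe config
val fromProducerConf = KinesisProducer(ProducerConf(kinesisConfig, "some-producer"))

// Bypass Typesafe config and hand the KPL its own configuration object directly
val kplConfiguration     = new KinesisProducerConfiguration().setRegion("us-east-1")
val fromKplConfiguration = KinesisProducer(kplConfiguration, "some-stream")

// Deprecated since v0.5.7: a raw config block containing the KPL properties plus the stream name
val legacy = KinesisProducer(kinesisConfig.getConfig("some-producer.kpl"), "some-stream")
```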

@@ -126,9 +96,7 @@ object KinesisProducerKPL extends LazyLogging {
*
* To create an instance of this class, we recommend using the apply method to instantiate from config.
*/
class KinesisProducerKPL(kinesis: AWSKinesisProducer, streamName: String)
extends LazyLogging
with KinesisProducer {
class KinesisProducer(kinesis: AWSKinesisProducer, streamName: String) extends LazyLogging {

val underlying = kinesis
private var _destroyed = false
@@ -137,9 +105,16 @@ class KinesisProducerKPL(kinesis: AWSKinesisProducer, streamName: String)
//TODO seems difficult to get access to stream specific operations from producer

/**
* @see [[KinesisProducer]].addUserRecord
* Adds a message to the next batch to be sent to the configured stream.
*
* @return On success: Future{UserRecordResult}
* On failure: Future.failed(...): Any Throwable related to put.
* @see Callee `com.amazonaws.services.kinesis.producer.KinesisProducer.addUserRecord`
* @see UserRecordResult
* @see KinesisProducerConfiguration#setRecordTtl(long)
* @see UserRecordFailedException
*/
override def addUserRecord(
def addUserRecord(
event: ProducerEvent
)(implicit ec: ExecutionContextExecutor): Future[UserRecordResult] = {
assert(!_destroyed, "Kinesis has been destroyed, no longer accepting messages") //TODO specific exception?
@@ -148,24 +123,47 @@ class KinesisProducerKPL(kinesis: AWSKinesisProducer, streamName: String)
}

/**
* @see [[KinesisProducer]].outstandingRecordsCount()
* Get the number of unfinished records currently being processed. The
* records could either be waiting to be sent to the child process, or have
* reached the child process and are being worked on.
*
* <p>
* This is equal to the number of futures returned from [[addUserRecord]]
* that have not finished.
*
* This is useful for applying backpressure and throttling the number of concurrent Futures.
*
* @return The number of unfinished records currently being processed.
*/
override def outstandingRecordsCount(): Int = {
def outstandingRecordsCount(): Int = {
kinesis.getOutstandingRecordsCount
}

/**
* @see [[KinesisProducer]].stop()
* Firstly, blocks whilst all records are complete (either succeeding or failing).
*
* <p>
*
* This includes time spent while any retries are performed. Depending on
* your configuration of record TTL and request timeout, this can
* potentially take a long time if the library is having trouble delivering
* records to the backend, for example due to network problems.
*
* <p>
*
* Finally the [[KinesisProducer]] is destroyed, preventing further use.
*
* @throws com.amazonaws.services.kinesis.producer.DaemonException if the child process is dead //TODO - handle this better?
* @see [[AWSKinesisProducer]]
*/
override def stop(): Unit = {
def stop(): Unit = {
kinesis.flushSync() //This blocks until all records are flushed
kinesis.destroy()
_destroyed = true
}

/**
* @see [[KinesisProducer]].destroyed()
* @return true if the [[KinesisProducer]] has been stopped & destroyed.
*/
override def destroyed(): Boolean = _destroyed
def destroyed(): Boolean = _destroyed
}
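A short sketch of how these methods compose: throttle submissions with `outstandingRecordsCount()`, then shut down with `stop()`. The threshold, payloads and config path are illustrative:

```scala
import java.util.UUID
import com.typesafe.config.ConfigFactory
import com.weightwatchers.reactive.kinesis.models.ProducerEvent
import com.weightwatchers.reactive.kinesis.producer.{KinesisProducer, ProducerConf}
import scala.concurrent.{ExecutionContext, ExecutionContextExecutor}

implicit val ec: ExecutionContextExecutor = ExecutionContext.global // not for production

val producer = KinesisProducer(ProducerConf(ConfigFactory.load().getConfig("kinesis"), "some-producer"))

(1 to 1000).foreach { i =>
  // Crude backpressure: don't let more than 500 records sit unfinished in the KPL
  while (producer.outstandingRecordsCount() > 500) Thread.sleep(10)
  producer
    .addUserRecord(ProducerEvent(UUID.randomUUID.toString, s"""{"n":$i}"""))
    .failed
    .foreach(ex => println(s"Record $i failed: $ex"))
}

producer.stop() // flushes everything still outstanding, then destroys the underlying KPL
assert(producer.destroyed())
```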