Skip to content

Commit

Permalink
Merge pull request #2 from StreetContxt/add-stats
Browse files Browse the repository at this point in the history
Added Stats
  • Loading branch information
agenovese authored Jan 10, 2018
2 parents 3781129 + 3579710 commit 7a30a5d
Show file tree
Hide file tree
Showing 6 changed files with 110 additions and 23 deletions.
10 changes: 10 additions & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -4,10 +4,20 @@ A lightweight Scala wrapper around Kinesis Producer Library (KPL).
The main benefit of this library is working with Scala-native Futures when
interacting with KPL.


## Amazon Licensing Restrictions
**The KPL license is not compatible with open source licenses!** See
[this discussion](https://issues.apache.org/jira/browse/LEGAL-198) for more details.

As such, the licensing terms of this library are the Apache 2 license **PLUS** whatever restrictions
are imposed by the KPL license.


## No Message Ordering
The Kinesis Producer Library **does not provide message ordering guarantees** at a reasonable throughput;
see [this ticket](https://github.com/awslabs/amazon-kinesis-producer/issues/23) for more details.


## Integration Tests
This library is tested as part of [kcl-akka-stream](https://github.com/StreetContxt/kcl-akka-stream)
integration tests.
5 changes: 4 additions & 1 deletion build.sbt
Original file line number Diff line number Diff line change
@@ -1,10 +1,13 @@
organization in ThisBuild := "com.contxt"
scalaVersion in ThisBuild := "2.11.8"
version in ThisBuild := "1.0.0-SNAPSHOT"

val slf4j = "org.slf4j" % "slf4j-api" % "1.7.21"
val amazonKinesisProducer = "com.amazonaws" % "amazon-kinesis-producer" % "0.12.8"
val typesafeConfig = "com.typesafe" % "config" % "1.3.1"

libraryDependencies ++= Seq(
slf4j,
amazonKinesisProducer
amazonKinesisProducer,
typesafeConfig
)
5 changes: 5 additions & 0 deletions src/main/resources/reference.conf
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
# Settings read by the Scala Kinesis producer wrapper.
com.contxt.kinesis {
producer {
# Fully qualified name of the `ProducerStats` implementation to use.
# `ProducerStats.getInstance` reads this value and instantiates the class reflectively through
# its no-argument constructor; if that fails, it falls back to `NoopProducerStats`.
stats-class-name = "com.contxt.kinesis.NoopProducerStats"
}
}
33 changes: 33 additions & 0 deletions src/main/scala/com/contxt/kinesis/ProducerStats.scala
Original file line number Diff line number Diff line change
@@ -0,0 +1,33 @@
package com.contxt.kinesis

import com.amazonaws.services.kinesis.producer.UserRecordResult
import com.typesafe.config.{ Config, ConfigFactory }
import org.slf4j.LoggerFactory
import scala.concurrent.Future
import scala.util.control.NonFatal

/** Hook through which a producer reports its send operations and its shutdown.
 *
 * An implementation can be selected by class name via `ProducerStats.getInstance`.
 */
trait ProducerStats {
/** Wraps a single send operation so that it can be tracked.
 *
 * @param streamId the stream the record is being sent to.
 * @param size size of the record; `ScalaKinesisProducerImpl.send` passes `data.remaining` here.
 * @param closure by-name expression that performs the actual send.
 * @return a future of the send result.
 */
def trackSend(streamId: StreamId, size: Int)(closure: => Future[UserRecordResult]): Future[UserRecordResult]

/** Reports that the producer for `streamId` has been shut down. */
def reportShutdown(streamId: StreamId): Unit
}

object ProducerStats {
  private val log = LoggerFactory.getLogger(classOf[ProducerStats])

  /** Config path holding the fully qualified class name of the `ProducerStats` implementation. */
  private val StatsClassNamePath = "com.contxt.kinesis.producer.stats-class-name"

  /** Creates the `ProducerStats` implementation named in `config`.
   *
   * The class name is read from `com.contxt.kinesis.producer.stats-class-name` and the class is
   * instantiated through its no-argument constructor.
   *
   * @param config configuration to read the class name from.
   * @return an instance of the configured class, or a `NoopProducerStats` if the setting is missing,
   *         the class cannot be loaded, it does not implement `ProducerStats`, or it cannot be
   *         instantiated. The failure is logged at error level.
   */
  def getInstance(config: Config): ProducerStats = {
    try {
      val className = config.getString(StatsClassNamePath)
      Class.forName(className)
        // Verify the type before instantiating, so the constructor of an unrelated class never runs.
        .asSubclass(classOf[ProducerStats])
        // `Class.newInstance()` is deprecated; going through the constructor is the replacement.
        // Exceptions thrown by the constructor arrive wrapped in `InvocationTargetException`.
        .getDeclaredConstructor()
        .newInstance()
    }
    catch {
      case NonFatal(e) =>
        log.error("Could not load a `ProducerStats` instance, falling back to `NoopProducerStats`.", e)
        new NoopProducerStats
    }
  }
}

/** A `ProducerStats` implementation that records nothing. */
class NoopProducerStats extends ProducerStats {
  /** Evaluates `closure` and returns its future as is; `streamId` and `size` are ignored. */
  def trackSend(streamId: StreamId, size: Int)(closure: => Future[UserRecordResult]): Future[UserRecordResult] = {
    closure
  }

  /** Does nothing. */
  def reportShutdown(streamId: StreamId): Unit = ()
}
71 changes: 49 additions & 22 deletions src/main/scala/com/contxt/kinesis/ScalaKinesisProducer.scala
Original file line number Diff line number Diff line change
Expand Up @@ -2,68 +2,95 @@ package com.contxt.kinesis

import com.amazonaws.services.kinesis.producer.{ KinesisProducer, KinesisProducerConfiguration, UserRecordResult }
import com.google.common.util.concurrent.ListenableFuture
import com.typesafe.config.{ Config, ConfigFactory }
import java.nio.ByteBuffer
import scala.concurrent._
import scala.language.implicitConversions
import scala.util.Try
import scala.collection.JavaConversions._
import scala.concurrent.ExecutionContext.Implicits.global

/** A lightweight Scala wrapper around Kinesis Producer Library (KPL). */
trait ScalaKinesisProducer {

def streamId: StreamId

/** Sends a record to a stream. See
* [[[com.amazonaws.services.kinesis.producer.KinesisProducer.addUserRecord(String, String, String, ByteBuffer):ListenableFuture[UserRecordResult]*]]].
*/
def send(partitionKey: String, data: ByteBuffer, explicitHashKey: Option[String] = None): Future[UserRecordResult]

/** Flushes all the outgoing messages, returning a Future that completes when all the flushed messages have been sent.
* See [[com.amazonaws.services.kinesis.producer.KinesisProducer.flushSync]].
*/
def flushAll(): Future[Unit]

/** Performs an orderly shutdown, waiting for all the outgoing messages before destroying the underlying producer. */
def shutdown(): Future[Unit]
}

object ScalaKinesisProducer {
def apply(streamName: String, producerConfig: KinesisProducerConfiguration): ScalaKinesisProducer = {
val producer = new KinesisProducer(producerConfig)
new ScalaKinesisProducerImpl(streamName, producer)
def apply(
streamName: String,
kplConfig: KinesisProducerConfiguration,
config: Config = ConfigFactory.load()
): ScalaKinesisProducer = {
val producerStats = ProducerStats.getInstance(config)
ScalaKinesisProducer(streamName, kplConfig, producerStats)
}

/** Creates a producer for `streamName` that reports to the given `producerStats`.
 *
 * The stream is identified by the region of `kplConfig` together with `streamName`, and a new
 * `KinesisProducer` is created from `kplConfig`.
 */
def apply(
  streamName: String,
  kplConfig: KinesisProducerConfiguration,
  producerStats: ProducerStats
): ScalaKinesisProducer = {
  new ScalaKinesisProducerImpl(
    StreamId(kplConfig.getRegion, streamName),
    new KinesisProducer(kplConfig),
    producerStats
  )
}

private[kinesis] implicit def listenableToScalaFuture[A](listenable: ListenableFuture[A]): Future[A] = {
implicit val executionContext: ExecutionContextExecutor = scala.concurrent.ExecutionContext.global
val promise = Promise[A]
val callback = new Runnable {
override def run(): Unit = promise.tryComplete(Try(listenable.get()))
}
listenable.addListener(callback, executionContext)
listenable.addListener(callback, global)
promise.future
}
}

private[kinesis] class ScalaKinesisProducerImpl(
val streamName: String,
private val producer: KinesisProducer
val streamId: StreamId,
private val producer: KinesisProducer,
private val stats: ProducerStats
) extends ScalaKinesisProducer {
import ScalaKinesisProducer.listenableToScalaFuture

def send(partitionKey: String, data: ByteBuffer, explicitHashKey: Option[String]): Future[UserRecordResult] = {
producer.addUserRecord(streamName, partitionKey, explicitHashKey.orNull, data)
}

def flushAll(): Future[Unit] = {
import scala.concurrent.ExecutionContext.Implicits.global
Future {
blocking {
producer.flushSync()
stats.trackSend(streamId, data.remaining) {
producer.addUserRecord(streamId.streamName, partitionKey, explicitHashKey.orNull, data).map { result =>
if (!result.isSuccessful) throwSendFailedException(result) else result
}
}
}

def shutdown(): Future[Unit] = {
import scala.concurrent.ExecutionContext.Implicits.global
val allFlushedFuture = flushAll()
allFlushedFuture.onComplete(_ => producer.destroy())
allFlushedFuture.onComplete { _ =>
producer.destroy()
stats.reportShutdown(streamId)
}
allFlushedFuture
}

/** Throws a `RuntimeException` describing a failed send.
 *
 * The message contains the stream id, the number of attempts and the error message of the last
 * attempt. A placeholder is used when there were no attempts or the last attempt carries no message.
 *
 * @param result the unsuccessful result returned by the underlying producer.
 */
private def throwSendFailedException(result: UserRecordResult): Nothing = {
  val attempts = result.getAttempts
  val attemptCount = attempts.size
  // Unwrap the optional message so the text does not read `Some(...)` / `None`;
  // `Option(...)` also guards against a null error message on the attempt.
  val lastErrorMessage =
    if (attempts.isEmpty) None
    else Option(attempts.get(attemptCount - 1).getErrorMessage)
  val errorMessage = lastErrorMessage.getOrElse("<none>")
  throw new RuntimeException(
    s"Sending a record to $streamId failed after $attemptCount attempts, last error message: $errorMessage."
  )
}

/** Calls `producer.flushSync()` inside a `blocking` section of a `Future`. */
private def flushAll(): Future[Unit] = Future(blocking(producer.flushSync()))
}
9 changes: 9 additions & 0 deletions src/main/scala/com/contxt/kinesis/StreamId.scala
Original file line number Diff line number Diff line change
@@ -0,0 +1,9 @@
package com.contxt.kinesis

/** Identifies a stream by an AWS region name together with a stream name. */
case class StreamId(
/** AWS region name. */
regionName: String,

/** Stream name. */
streamName: String
)

0 comments on commit 7a30a5d

Please sign in to comment.