Skip to content

Commit

Permalink
Merge pull request #3 from StreetContxt/tweak-stats
Browse files Browse the repository at this point in the history
Tweaked stats
  • Loading branch information
anikiforovopensource authored Jan 12, 2018
2 parents 7a30a5d + 1508865 commit a0a73be
Show file tree
Hide file tree
Showing 3 changed files with 5 additions and 1 deletion.
2 changes: 1 addition & 1 deletion build.sbt
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
organization in ThisBuild := "com.contxt"
scalaVersion in ThisBuild := "2.11.8"
version in ThisBuild := "1.0.0-SNAPSHOT"
version in ThisBuild := "1.0.1-SNAPSHOT"

val slf4j = "org.slf4j" % "slf4j-api" % "1.7.21"
val amazonKinesisProducer = "com.amazonaws" % "amazon-kinesis-producer" % "0.12.8"
Expand Down
2 changes: 2 additions & 0 deletions src/main/scala/com/contxt/kinesis/ProducerStats.scala
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ import scala.util.control.NonFatal

trait ProducerStats {
def trackSend(streamId: StreamId, size: Int)(closure: => Future[UserRecordResult]): Future[UserRecordResult]
def reportInitialization(streamId: StreamId): Unit
def reportShutdown(streamId: StreamId): Unit
}

Expand All @@ -29,5 +30,6 @@ object ProducerStats {

class NoopProducerStats extends ProducerStats {
def trackSend(streamId: StreamId, size: Int)(closure: => Future[UserRecordResult]): Future[UserRecordResult] = closure
def reportInitialization(streamId: StreamId): Unit = {}
def reportShutdown(streamId: StreamId): Unit = {}
}
2 changes: 2 additions & 0 deletions src/main/scala/com/contxt/kinesis/ScalaKinesisProducer.scala
Original file line number Diff line number Diff line change
Expand Up @@ -61,6 +61,8 @@ private[kinesis] class ScalaKinesisProducerImpl(
) extends ScalaKinesisProducer {
import ScalaKinesisProducer.listenableToScalaFuture

stats.reportInitialization(streamId)

def send(partitionKey: String, data: ByteBuffer, explicitHashKey: Option[String]): Future[UserRecordResult] = {
stats.trackSend(streamId, data.remaining) {
producer.addUserRecord(streamId.streamName, partitionKey, explicitHashKey.orNull, data).map { result =>
Expand Down

0 comments on commit a0a73be

Please sign in to comment.