spark-kafka-writer

Join the chat at https://gitter.im/BenFradet/spark-kafka-writer

Write your RDDs and DStreams to Kafka seamlessly

Installation

spark-kafka-writer is available on Maven Central. The coordinates depend on your version of Spark and on whether you're using Kafka 0.8 or 0.10:

| | Kafka 0.8 | Kafka 0.10 |
|---|---|---|
| Spark 1.6.X | "com.github.benfradet" %% "spark-kafka-writer" % "0.1.0" | - |
| Spark 2.0.X | "com.github.benfradet" %% "spark-kafka-0-8-writer" % "0.2.0" | "com.github.benfradet" %% "spark-kafka-0-10-writer" % "0.2.0" |
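
As an illustration of how these coordinates are used, here is a minimal sketch of a build.sbt entry. It assumes an sbt build targeting Spark 2.0.X and Kafka 0.10; swap in the coordinates that match your own setup.

```scala
// build.sbt (sketch): assumes Spark 2.0.X with Kafka 0.10
libraryDependencies += "com.github.benfradet" %% "spark-kafka-0-10-writer" % "0.2.0"
```

The `%%` operator makes sbt append your project's Scala binary version to the artifact name.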

Usage

  • To save an RDD to Kafka:
import java.util.Properties

// replace kafka010 with kafka08 if you're using Kafka 0.8
import com.github.benfradet.spark.kafka010.writer._
import org.apache.kafka.clients.producer.ProducerRecord
import org.apache.kafka.common.serialization.StringSerializer
import org.apache.spark.rdd.RDD

val topic = "my-topic"
val producerConfig = {
  val p = new Properties()
  p.setProperty("bootstrap.servers", "127.0.0.1:9092")
  p.setProperty("key.serializer", classOf[StringSerializer].getName)
  p.setProperty("value.serializer", classOf[StringSerializer].getName)
  p
}

val rdd: RDD[String] = ...
rdd.writeToKafka(
  producerConfig,
  s => new ProducerRecord[String, String](topic, s)
)
  • To save a DStream to Kafka:
import java.util.Properties

// replace kafka010 with kafka08 if you're using Kafka 0.8
import com.github.benfradet.spark.kafka010.writer._
import org.apache.kafka.clients.producer.ProducerRecord
import org.apache.kafka.common.serialization.StringSerializer
import org.apache.spark.streaming.dstream.DStream

val topic = "my-topic"
val producerConfig = {
  val p = new Properties()
  p.setProperty("bootstrap.servers", "127.0.0.1:9092")
  p.setProperty("key.serializer", classOf[StringSerializer].getName)
  p.setProperty("value.serializer", classOf[StringSerializer].getName)
  p
}

val dStream: DStream[String] = ...
dStream.writeToKafka(
  producerConfig,
  s => new ProducerRecord[String, String](topic, s)
)
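
Both snippets above build the same producer configuration. If you write both RDDs and DStreams, one option is to move that setup into a small helper. The sketch below is only an illustration: `ProducerConfigs` and `stringProducerConfig` are hypothetical names, not part of spark-kafka-writer, and the serializer is given by its fully qualified class name so that the block depends on nothing but the JDK.

```scala
import java.util.Properties

// Hypothetical helper (not provided by spark-kafka-writer): builds the
// producer configuration used in the RDD and DStream examples.
object ProducerConfigs {
  // Same value as classOf[StringSerializer].getName
  private val StringSerializerName =
    "org.apache.kafka.common.serialization.StringSerializer"

  // Returns producer properties that serialize both keys and values as strings.
  def stringProducerConfig(bootstrapServers: String): Properties = {
    val p = new Properties()
    p.setProperty("bootstrap.servers", bootstrapServers)
    p.setProperty("key.serializer", StringSerializerName)
    p.setProperty("value.serializer", StringSerializerName)
    p
  }
}
```

Either example could then replace its `producerConfig` definition with `val producerConfig = ProducerConfigs.stringProducerConfig("127.0.0.1:9092")`.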

The full Scaladoc is available at https://benfradet.github.io/spark-kafka-writer.

Credit

The original code was written by Hari Shreedharan.