DirectKafkaInputDStream is an input dstream of KafkaRDD batches.
DirectKafkaInputDStream is also a CanCommitOffsets object.
As an input dstream, DirectKafkaInputDStream implements the mandatory abstract methods (from the DStream Contract and InputDStream Contract):
- dependencies returns an empty collection, i.e. it has no dependencies on other streams (other than the Kafka brokers it reads data from).
- slideDuration passes all calls on to DStreamGraph.batchDuration.
- compute creates a KafkaRDD per batch.
- start starts polling for messages from Kafka.
- stop closes the Kafka consumer (and therefore stops polling for messages from Kafka).
The name of a DirectKafkaInputDStream is Kafka 0.10 direct stream [id], which you can use to differentiate between the implementations for Kafka 0.10+ and older releases.
Tip
|
You can find the name of an input dstream in the Streaming tab of the web UI (in the Input Metadata section of a batch’s details). |
It uses the spark.streaming.kafka.maxRetries setting while computing latestLeaderOffsets (i.e. a mapping of kafka.common.TopicAndPartition and LeaderOffset).
Tip
|
Enable logging for the DirectKafkaInputDStream logger to see what happens inside.
Refer to Logging. |
You can create a DirectKafkaInputDStream instance using the KafkaUtils.createDirectStream factory method.
import org.apache.spark.streaming.kafka010.{ConsumerStrategies, KafkaUtils}
// WARN: Incomplete to show only relevant parts
// streamingContext, hosts (a LocationStrategy), topics, kafkaParams and offsets are assumed to be defined
val dstream = KafkaUtils.createDirectStream[String, String](
  ssc = streamingContext,
  locationStrategy = hosts,
  consumerStrategy = ConsumerStrategies.Subscribe[String, String](topics, kafkaParams, offsets))
Internally, when a DirectKafkaInputDStream instance is created, it initializes the internal executorKafkaParams using the input consumerStrategy’s executorKafkaParams.
Tip
|
Use ConsumerStrategy for a Kafka Consumer configuration. |
With WARN logging level enabled for the KafkaUtils logger, you may see the following WARN messages and one ERROR in the logs (the number of messages depends on how correct the Kafka Consumer configuration is):
WARN KafkaUtils: overriding enable.auto.commit to false for executor
WARN KafkaUtils: overriding auto.offset.reset to none for executor
ERROR KafkaUtils: group.id is null, you should probably set it
WARN KafkaUtils: overriding executor group.id to spark-executor-null
WARN KafkaUtils: overriding receive.buffer.bytes to 65536 see KAFKA-3135
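The log messages above hint at how the Kafka parameters for executors get corrected. As a rough illustration only, here is a Scala sketch that applies the same four overrides to an immutable Map. ExecutorParamsSketch and fixForExecutor are hypothetical names (the messages come from the KafkaUtils logger, whose actual internals may differ), and parameter values are modeled as plain Scala values.

```scala
object ExecutorParamsSketch {
  // Hypothetical model of the overrides reported by the KafkaUtils log messages
  def fixForExecutor(params: Map[String, Any]): Map[String, Any] = {
    // Executors neither auto-commit nor reset offsets on their own
    val noAutoCommit =
      params + ("enable.auto.commit" -> false) + ("auto.offset.reset" -> "none")

    // Executors use a separate consumer group; a missing group.id ends up as "null"
    val groupId = params.get("group.id").map(_.toString).getOrElse("null")
    val withGroup = noAutoCommit + ("group.id" -> s"spark-executor-$groupId")

    // receive.buffer.bytes is raised to at least 65536 (see KAFKA-3135)
    params.get("receive.buffer.bytes") match {
      case Some(n: Int) if n >= 65536 => withGroup
      case _                          => withGroup + ("receive.buffer.bytes" -> 65536)
    }
  }
}
```

With an empty configuration, the sketch yields group.id set to spark-executor-null, which matches the ERROR and WARN lines shown above.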
Tip
|
You should always set |
It initializes the internal currentOffsets property.
It creates an instance of DirectKafkaInputDStreamCheckpointData as checkpointData.
It sets up rateController as DirectKafkaRateController when backpressure is enabled.
It sets up maxRateLimitPerPartition as the value of the spark.streaming.kafka.maxRatePerPartition setting.
It initializes the commitQueue and commitCallback properties.
currentOffsets: Map[TopicPartition, Long]
currentOffsets holds the current offsets for all the topic partitions the dstream is subscribed to, i.e. the offsets the next batch starts reading from (as set by latestOffsets and compute).
currentOffsets is initialized when DirectKafkaInputDStream is created afresh (it could also be re-created from a checkpoint).
The ConsumerStrategy (that was used to initialize DirectKafkaInputDStream) uses it to create a Kafka Consumer.
If it is still empty, it is then set to the available offsets when DirectKafkaInputDStream is started.
commitCallback: AtomicReference[OffsetCommitCallback]
commitCallback is initialized when DirectKafkaInputDStream is created. It is set to the OffsetCommitCallback that is passed in as the input parameter of commitAsync when it is called (as part of the CanCommitOffsets contract that DirectKafkaInputDStream implements).
commitQueue: ConcurrentLinkedQueue[OffsetRange]
commitQueue is initialized when DirectKafkaInputDStream is created. It is used in commitAsync (that is part of the CanCommitOffsets contract that DirectKafkaInputDStream implements) to queue up offsets for commit to Kafka at a future time (i.e. when the internal commitAll is called).
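To make the roles of commitQueue and commitCallback more concrete, the following Scala sketch models what commitAsync is described as doing: store the callback in an AtomicReference and queue the OffsetRanges in a ConcurrentLinkedQueue for a later commit. TopicPartition, OffsetRange and CommitCallback below are simplified, hypothetical stand-ins for the Kafka and Spark types, not their real definitions.

```scala
import java.util.concurrent.ConcurrentLinkedQueue
import java.util.concurrent.atomic.AtomicReference

object CommitQueueSketch {
  // Hypothetical stand-ins for Kafka's TopicPartition and Spark's OffsetRange
  final case class TopicPartition(topic: String, partition: Int)
  final case class OffsetRange(tp: TopicPartition, fromOffset: Long, untilOffset: Long)

  // Hypothetical stand-in for Kafka's OffsetCommitCallback
  type CommitCallback = Map[TopicPartition, Long] => Unit

  final class CommitBuffer {
    // Thread-safe containers: queuing and draining may happen on different threads
    val commitQueue = new ConcurrentLinkedQueue[OffsetRange]()
    val commitCallback = new AtomicReference[CommitCallback]()

    // Models commitAsync: remember the latest callback, queue the ranges for a later commit
    def commitAsync(ranges: Seq[OffsetRange], callback: CommitCallback): Unit = {
      commitCallback.set(callback)
      ranges.foreach(r => commitQueue.add(r))
    }
  }
}
```

Nothing is sent to Kafka at this point; in this model the queue only fills up until something drains it.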
Tip
|
Read the java.util.concurrent.ConcurrentLinkedQueue javadoc. |
executorKafkaParams: HashMap[String, Object]
executorKafkaParams
is a collection of …FIXME
When DirectKafkaInputDStream is created, it initializes executorKafkaParams with the executorKafkaParams of the given ConsumerStrategy (that was used to create the DirectKafkaInputDStream instance).
executorKafkaParams is then reviewed and corrected where needed.
Note
|
executorKafkaParams is used when computing a KafkaRDD for a batch and restoring KafkaRDDs from checkpoint.
|
start(): Unit
start creates a Kafka consumer and fetches available records in the subscribed list of topics and partitions (using Kafka’s Consumer.poll with 0 timeout, which returns immediately with any records that are currently available).
Note
|
start is part of the InputDStream Contract.
|
After the polling, start checks if the internal currentOffsets is empty, and if it is, it requests the topic partitions assigned to the Kafka consumer (using Kafka’s Consumer.assignment) and builds a map of topic partitions and their offsets (using Kafka’s Consumer.position).
Ultimately, start pauses all partitions (using Kafka’s Consumer.pause with the topic partitions in currentOffsets).
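The offset bookkeeping in start can be sketched in Scala without a Kafka cluster by passing the results of Consumer.assignment and Consumer.position in as plain values. The names below (StartSketch, initialOffsets) are hypothetical, and the poll and pause calls are deliberately left out.

```scala
object StartSketch {
  // Hypothetical stand-in for Kafka's TopicPartition
  final case class TopicPartition(topic: String, partition: Int)

  // Models how start fills an empty currentOffsets:
  // `assignment` stands in for Consumer.assignment, `position` for Consumer.position
  def initialOffsets(
      currentOffsets: Map[TopicPartition, Long],
      assignment: Set[TopicPartition],
      position: TopicPartition => Long): Map[TopicPartition, Long] =
    if (currentOffsets.isEmpty) assignment.map(tp => tp -> position(tp)).toMap
    else currentOffsets // non-empty offsets are kept as they are
}
```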
compute(validTime: Time): Option[KafkaRDD[K, V]]
Note
|
compute is a part of the DStream Contract.
|
compute always computes a KafkaRDD (despite the return type that allows for no RDDs and irrespective of the number of records inside). It is left to the KafkaRDD itself to decide what to do when no Kafka records exist in topic partitions to process for a given batch.
When compute is called, it calls latestOffsets and clamp. The resulting topic partition offsets are then mapped to OffsetRanges, each with a topic, a partition, the current offset of the partition (as the start) and the resulting offset (as the end). The OffsetRanges are in turn used to create a KafkaRDD (with the current SparkContext, executorKafkaParams, the OffsetRanges, preferred hosts, and useConsumerCache enabled).
Caution
|
FIXME We all would appreciate if Jacek made the above less technical. |
Caution
|
FIXME What’s useConsumerCache ?
|
With that, compute informs InputInfoTracker about the state of an input stream (as StreamInputInfo with metadata with offsets and a human-friendly description).
In the end, compute sets the just-calculated offsets as current offsets, asynchronously commits all queued offsets (from commitQueue) and returns the newly-created KafkaRDD.
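The core of compute, turning offsets into OffsetRanges and advancing the current offsets, can be modeled in a few lines of Scala. This is a simplified sketch under the assumption that untilOffsets is the already clamped result of latestOffsets; ComputeSketch and nextBatch are hypothetical names, and OffsetRange here only mimics the fields (topic, partition, fromOffset, untilOffset) of Spark’s class.

```scala
object ComputeSketch {
  // Hypothetical stand-ins for Kafka's TopicPartition and Spark's OffsetRange
  final case class TopicPartition(topic: String, partition: Int)
  final case class OffsetRange(topic: String, partition: Int, fromOffset: Long, untilOffset: Long) {
    def count: Long = untilOffset - fromOffset
  }

  // Models one compute step: one OffsetRange per partition, from the current offset
  // to the (clamped) latest offset; the latter become the new current offsets
  def nextBatch(
      currentOffsets: Map[TopicPartition, Long],
      untilOffsets: Map[TopicPartition, Long]): (Seq[OffsetRange], Map[TopicPartition, Long]) = {
    val ranges = untilOffsets.toSeq.map { case (tp, until) =>
      OffsetRange(tp.topic, tp.partition, currentOffsets(tp), until)
    }
    (ranges, untilOffsets)
  }
}
```

A partition with no new records simply yields an empty range (fromOffset equal to untilOffset), which is why a KafkaRDD is still created for every batch.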
commitAll(): Unit
commitAll commits all queued OffsetRanges in commitQueue (using Kafka’s Consumer.commitAsync).
Note
|
commitAll is used for every batch interval (when compute is called to generate a KafkaRDD).
|
Internally, commitAll walks through the OffsetRanges in commitQueue and calculates the offset to commit for every topic partition (the highest untilOffset queued for it). It uses them to create a collection of Kafka’s TopicPartition and OffsetAndMetadata pairs for Kafka’s Consumer.commitAsync using the internal Kafka consumer reference.
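The aggregation step of commitAll can be sketched as follows, assuming the offset to commit for a partition is the highest untilOffset queued for it (committing the largest offset covers all smaller ones). The types and the drainToCommit name are hypothetical simplifications; in DirectKafkaInputDStream the result would be turned into TopicPartition and OffsetAndMetadata pairs for Consumer.commitAsync.

```scala
import java.util.concurrent.ConcurrentLinkedQueue

object CommitAllSketch {
  // Hypothetical stand-ins for Kafka's TopicPartition and Spark's OffsetRange
  final case class TopicPartition(topic: String, partition: Int)
  final case class OffsetRange(tp: TopicPartition, fromOffset: Long, untilOffset: Long)

  // Models commitAll: drain the queue, keeping the highest untilOffset per partition.
  // The result is what would be handed over to a commit call (skipped when empty).
  def drainToCommit(queue: ConcurrentLinkedQueue[OffsetRange]): Map[TopicPartition, Long] = {
    var toCommit = Map.empty[TopicPartition, Long]
    var range = queue.poll()
    while (range != null) {
      val best = toCommit.get(range.tp).fold(range.untilOffset)(math.max(_, range.untilOffset))
      toCommit += range.tp -> best
      range = queue.poll()
    }
    toCommit
  }
}
```

Draining with poll (rather than iterating) empties the queue, so ranges queued after the drain starts are left for the next batch.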
clamp(offsets: Map[TopicPartition, Long]): Map[TopicPartition, Long]
clamp calls maxMessagesPerPartition on the input offsets collection (of topic partitions with their offsets)…
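The text above stops short of what clamp does with the result. Assuming backpressure is disabled, so that only spark.streaming.kafka.maxRatePerPartition applies, the following Scala sketch models a clamp that caps how far each partition can advance in one batch. It is a simplification (the real maxMessagesPerPartition also takes the backpressure rate estimate into account), and all names are hypothetical.

```scala
object ClampSketch {
  // Hypothetical stand-in for Kafka's TopicPartition
  final case class TopicPartition(topic: String, partition: Int)

  // Simplified clamp without backpressure: each partition may advance by at most
  // maxRatePerPartition * batch seconds records; a non-positive rate means "no limit"
  def clamp(
      currentOffsets: Map[TopicPartition, Long],
      latestOffsets: Map[TopicPartition, Long],
      maxRatePerPartition: Long,
      batchDurationMs: Long): Map[TopicPartition, Long] =
    if (maxRatePerPartition <= 0) latestOffsets
    else {
      val maxMessages = (batchDurationMs.toDouble / 1000 * maxRatePerPartition).toLong
      latestOffsets.map { case (tp, latest) =>
        tp -> math.min(currentOffsets(tp) + maxMessages, latest)
      }
    }
}
```

For example, with a 2-second batch and a rate of 1000 records per second, a partition at offset 100 with 10000 as its latest offset is clamped to 2100, while a partition whose latest offset is 150 stays at 150.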
Caution
|
FIXME |
consumer(): Consumer[K, V]
consumer creates a Kafka Consumer with keys of type K and values of type V (specified when the DirectKafkaInputDStream is created).
consumer starts the ConsumerStrategy (that was used when the DirectKafkaInputDStream was created). It passes the internal collection of TopicPartitions and their offsets.
Caution
|
FIXME A note with What ConsumerStrategy is for?
|
getPreferredHosts: java.util.Map[TopicPartition, String]
getPreferredHosts calculates preferred hosts per topic partition (that are later used to map KafkaRDD partitions to host leaders of topic partitions that Spark executors read records from).
getPreferredHosts relies exclusively on the LocationStrategy that was passed in when creating a DirectKafkaInputDStream instance.
Location Strategy | DirectKafkaInputDStream.getPreferredHosts |
---|---|
PreferBrokers | Uses getBrokers to request Kafka broker(s) for the leader host per topic partition. |
PreferConsistent | No host preference. Returns an empty collection of preferred hosts per topic partition. It does not call Kafka broker(s) for topic assignments. |
PreferFixed | Returns the preferred hosts that were passed in when the strategy was created. It does not call Kafka broker(s) for topic assignments. |
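How getPreferredHosts picks its result per location strategy can be sketched as a simple pattern match. The sketch assumes the three strategies offered by LocationStrategies in spark-streaming-kafka-0-10 (PreferBrokers, PreferConsistent and PreferFixed), modeled here as hypothetical case objects and a case class. brokers is a hypothetical stand-in for getBrokers and is passed as a function so that it is only invoked when needed.

```scala
object PreferredHostsSketch {
  // Hypothetical stand-in for Kafka's TopicPartition
  final case class TopicPartition(topic: String, partition: Int)

  // Hypothetical mirrors of the three location strategies
  sealed trait LocationStrategy
  case object PreferBrokers extends LocationStrategy
  case object PreferConsistent extends LocationStrategy
  final case class PreferFixed(hostMap: Map[TopicPartition, String]) extends LocationStrategy

  // `brokers` stands in for getBrokers (leader host per assigned partition);
  // it is only evaluated for PreferBrokers, so the other strategies never contact Kafka
  def preferredHosts(
      strategy: LocationStrategy,
      brokers: () => Map[TopicPartition, String]): Map[TopicPartition, String] =
    strategy match {
      case PreferBrokers        => brokers()
      case PreferConsistent     => Map.empty
      case PreferFixed(hostMap) => hostMap
    }
}
```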
Note
|
getPreferredHosts is used when creating a KafkaRDD for a batch interval.
|
getBrokers: ju.Map[TopicPartition, String]
getBrokers uses the internal Kafka Consumer instance to request Kafka broker(s) for partition assignments, i.e. the leader host per topic partition.
Note
|
getBrokers uses Kafka’s Consumer.assignment().
|
stop(): Unit
stop closes the internal Kafka consumer.
Note
|
stop is a part of the InputDStream Contract.
|
latestOffsets(): Map[TopicPartition, Long]
latestOffsets uses the internal Kafka consumer to poll for the latest topic partition offsets, including partitions that have been added recently.
latestOffsets calculates the topic partitions that are new (compared to current offsets) and adds them to currentOffsets.
Note
|
latestOffsets uses poll(0), assignment, position (twice for every TopicPartition), pause and seekToEnd method calls. They seem quite performance-heavy. Are they?
|
The new partitions are paused and all the partitions in currentOffsets are moved to their end offsets (using seekToEnd).
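The offset arithmetic of latestOffsets can be sketched in Scala with the Kafka consumer calls replaced by plain functions: position for Consumer.position before seeking and endOffset for Consumer.position after seekToEnd. The sketch is hypothetical and leaves out poll(0) and pause; it only shows how new partitions join currentOffsets and how the latest offsets are read.

```scala
object LatestOffsetsSketch {
  // Hypothetical stand-in for Kafka's TopicPartition
  final case class TopicPartition(topic: String, partition: Int)

  // Returns (updated currentOffsets, latest offsets per assigned partition)
  def latestOffsets(
      currentOffsets: Map[TopicPartition, Long],
      assignment: Set[TopicPartition],
      position: TopicPartition => Long,
      endOffset: TopicPartition => Long): (Map[TopicPartition, Long], Map[TopicPartition, Long]) = {
    // Partitions assigned since the last batch start from their current position
    val newPartitions = assignment.diff(currentOffsets.keySet)
    val updatedCurrent = currentOffsets ++ newPartitions.map(tp => tp -> position(tp))
    // The latest available offset per assigned partition (what seekToEnd + position report)
    val latest = assignment.map(tp => tp -> endOffset(tp)).toMap
    (updatedCurrent, latest)
  }
}
```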
Caution
|
FIXME Why are new partitions paused? Make the description more user-friendly. |
Note
|
latestOffsets is used when computing a KafkaRDD for batch intervals.
|
Caution
|
FIXME |
Backpressure for the direct Kafka input dstream can be enabled using the spark.streaming.backpressure.enabled setting.
Note
|
Backpressure is disabled by default. |