With this plugin, you can easily publish or subscribe to Kafka streaming services. It supports serialization and deserialization of the following data types:
- DolphinDB scalar types
- Built-in types of the Kafka Java API: String (UTF-8), Short, Integer, Long, Float, Double, Bytes, byte[] and ByteBuffer
- Vectors of the types above
Download and unzip the folder to the root directory, and run the following command on Linux:
export LD_LIBRARY_PATH="$LD_LIBRARY_PATH:/path/to/bin/linux"
Start the DolphinDB server on Linux, then run the following script in DolphinDB to load the plugin:
loadPlugin("/path/to/PluginKafka.txt")
Install CMake. On Ubuntu (replace apt with yum if you use CentOS):
sudo apt install cmake
The project depends on cppkafka, which in turn depends on boost and librdkafka. Install the dependencies with the following commands:
# On older Ubuntu releases such as 14.04, apt cannot find librdkafka,
# so you need to compile librdkafka manually.
# The source is at https://github.com/edenhill/librdkafka
# On Ubuntu
sudo apt install librdkafka-dev
sudo apt install libboost-dev
sudo apt install libssl-dev
# On CentOS
sudo yum install librdkafka-devel
sudo yum install boost-devel
sudo yum install openssl-devel
cd /path/to/the/main/project/
git submodule update --init --recursive
If downloading the submodule is too slow, you can clone cppkafka directly from the URL listed in the hidden file .gitmodules:
git clone https://github.com/mfontanini/cppkafka.git
Copy libDolphinDB.so to bin/linux64 or /lib:
cp /path/to/dolphindb/server/libDolphinDB.so /path/to/kafka/bin/linux64
Build the project:
cd /path/to/DolphinDBPlugin/kafka
cd cppkafka
mkdir build
cd build
cmake ..
make
sudo make install
cd ../..
mkdir build
# copy the libDolphinDB.so to ./build
cp /path/to/dolphindb/server/libDolphinDB.so ./build
cd build
cmake ..
make
Copy the .so and .txt files to bin/linux64:
cp /path/to/libPluginKafka.so /path/to/kafka/bin/linux64
cp /path/to/PluginKafka.txt /path/to/kafka/bin/linux64
Before loading the Kafka plugin, download Kafka and start the ZooKeeper and Kafka servers. Please refer to this link for details.
Run the following script in DolphinDB to load the plugin (replace the directory with the path of PluginKafka.txt):
loadPlugin("/path/to/PluginKafka.txt")
kafka::producer(config);
'config' is a dictionary indicating the Kafka producer configuration, whose keys are strings and whose values are strings or Booleans. Please refer to CONFIGURATION for more about Kafka configuration.
Create a Kafka producer with the specified configuration and return the handler.
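A minimal sketch of a producer configuration that mixes a string value and a Boolean value. It assumes a broker reachable at localhost; socket.keepalive.enable is a standard librdkafka property chosen here only to illustrate a Boolean value, not a recommended setting.

```dolphindb
producerCfg = dict(STRING, ANY)
// string value: broker address (placeholder)
producerCfg["metadata.broker.list"] = "localhost"
// Boolean value: enable TCP keep-alives on broker sockets (librdkafka property)
producerCfg["socket.keepalive.enable"] = true
producer = kafka::producer(producerCfg)
```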
kafka::produce(producer, topic, key, value, json, [partition] );
- 'producer' is a Kafka producer handler.
- 'topic' is a string indicating the Kafka topic.
- 'key' indicates a Kafka key.
- 'value' indicates a Kafka value.
- 'json' is a Boolean indicating whether to serialize the data in JSON format.
- 'partition' is an optional parameter. It is an integer indicating the Kafka broker partition number.
Produce key-value data, optionally in a specified partition.
Note:
Do not send too many messages at once; otherwise the exception Local: Queue full might be thrown.
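One way to reduce the risk of Local: Queue full is to send in batches and flush the producer between batches. This is a sketch under assumptions: `producer` was created with kafka::producer, the topic "test" exists, and the batch size of 1000 is arbitrary. The call passes only the five required arguments of kafka::produce as documented above, so no partition is specified.

```dolphindb
// Send 10,000 integer messages, flushing after every 1,000
// so the local send queue is drained before it fills up.
batchSize = 1000
total = 10000
for (i in 0:total) {
    // kafka::produce(producer, topic, key, value, json)
    kafka::produce(producer, "test", string(i), i, false)
    if ((i + 1) % batchSize == 0) {
        kafka::producerFlush(producer)
    }
}
// flush anything left over from a final partial batch
kafka::producerFlush(producer)
```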
kafka::producerFlush(producer);
- 'producer' is a Kafka producer handler.
Flush all the messages of the producer.
kafka::getProducerTime(producer)
- 'producer' is a Kafka producer handler.
Get the timeout of the producer.
kafka::setProducerTime(producer, timeout)
- 'producer' is a Kafka producer handler.
- 'timeout' is the maximum amount of time to wait for the response to a request.
Set the timeout of the producer.
kafka::consumer(config)
- 'config' is a dictionary indicating the Kafka consumer configuration, whose key is a string and value is an anyVector. Please refer to CONFIGURATION for more about Kafka configuration.
Create a Kafka consumer and return the handler.
The following example is for users who authenticate with the SASL protocol:
consumerCfg = dict(string, any);
consumerCfg["metadata.broker.list"] = "localhost";
consumerCfg["group.id"] = "test";
consumerCfg["sasl.mechanisms"] = "PLAIN";
consumerCfg["security.protocol"] = "sasl_plaintext";
consumerCfg["sasl.username"] = "admin";
consumerCfg["sasl.password"] = "admin";
consumer = kafka::consumer(consumerCfg);
topics=["test"];
kafka::subscribe(consumer, topics);
kafka::consumerPoll(consumer);
kafka::subscribe(consumer, topics)
- 'consumer' is a Kafka consumer handler.
- 'topics' is a STRING vector indicating the topics to subscribe to.
Subscribe to Kafka topics.
kafka::unsubscribe(consumer)
- 'consumer' is a Kafka consumer handler.
Unsubscribe all topics.
kafka::consumerPoll(consumer, [timeout])
- 'consumer' is a Kafka consumer handler.
- 'timeout' is the maximum amount of time to wait for a polling.
Save the subscribed data to DolphinDB, and return a tuple. The first element is a string indicating the error message. The second element is a tuple including the following elements: topic, partition, key, value and the timestamp when the consumer received the data.
kafka::consumerPoll blocks the current thread, and the default poll timeout is 1000 milliseconds. It is recommended to use kafka::consumerPollBatch, which submits multiple kafka::consumerPoll tasks at once.
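A minimal sketch of unpacking the result of kafka::consumerPoll according to the layout described above (error message first, then a tuple of topic, partition, key, value and receive time). It assumes `consumer` is already subscribed, that the timeout is given in milliseconds like the default, and that an empty error string means the poll succeeded; that last point is an assumption, not something this document states.

```dolphindb
// Poll once, waiting up to 1000 ms (assumed unit).
res = kafka::consumerPoll(consumer, 1000)
errMsg = res[0]
// assumption: an empty error string means no error occurred
if (strlen(errMsg) == 0) {
    msg = res[1]
    msgTopic = msg[0]
    msgPartition = msg[1]
    msgKey = msg[2]
    msgValue = msg[3]
    recvTime = msg[4]   // time when the consumer received the message
} else {
    print(errMsg)
}
```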
kafka::consumerPollBatch(consumer, batch_size, [timeout])
- 'consumer' is a Kafka consumer handler.
- 'batch_size' is the number of messages you want to get.
- 'timeout' indicates the maximum amount of time to get messages.
kafka::createSubJob(consumer, table, parser, description, [timeout])
- 'consumer' is a Kafka consumer handler.
- 'table' is a table to store the messages.
- 'parser' is a function that processes the input data and returns a table. You can use mseed::parse or a user-defined function.
- 'description' is a string to describe the thread.
- 'timeout' indicates the maximum amount of time to get each message.
The parser takes a string as input and returns a table.
Note:
If a task you created subscribes to a partition that is already subscribed by another task, the messages will be split and distributed among all tasks that subscribe to this partition. Therefore, the messages obtained by each task may be incomplete.
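Besides the mseed plugin's parser, a user-defined function can serve as the parser. The sketch below is hypothetical: the name `csvParser` and the message format `id,epochMillis,value` are assumptions for illustration, chosen to match the schema of `tab` (SYMBOL, TIMESTAMP, INT) in the multithreading example at the end of this document.

```dolphindb
// Hypothetical parser: turns one message string "id,epochMillis,value"
// into a one-row table with columns id (SYMBOL), time (TIMESTAMP), value (INT).
def csvParser(msg) {
    parts = split(msg, ",")
    return table(symbol([parts[0]]) as id, timestamp([long(parts[1])]) as time, [int(parts[2])] as value)
}

// usage, assuming `consumer` is subscribed and `tab` has the schema above:
// conn = kafka::createSubJob(consumer, tab, csvParser, "test:0:csv data")
```

Because the parser is a plain function, it can be checked on a sample string before being handed to kafka::createSubJob.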
kafka::getJobStat()
- No parameters.
Get the status of the subscription jobs, including their subscription IDs.
kafka::cancelSubJob(connection)
- 'connection' is the value returned by kafka::createSubJob, or the subscription ID obtained from kafka::getJobStat(). It can be of LONG, INT or STRING type.
Cancel the subscription job.
kafka::pollDict(consumer, batch_size, [timeout])
- 'consumer' is a Kafka consumer handler.
- 'batch_size' is the number of messages you want to get.
- 'timeout' indicates the maximum amount of time to get messages.
Save the subscribed data to DolphinDB. It returns a DolphinDB dictionary containing the messages as key-value pairs.
kafka::commit(consumer)
- 'consumer' is a Kafka consumer handler.
Commit the offset of the last processed message to the broker synchronously. If there is no latest offset, an exception will be thrown.
kafka::commitTopic(consumer, topics, partitions, offsets)
- 'consumer' is a Kafka consumer handler.
- 'topics' is a STRING vector indicating the topics to subscribe to.
- 'partitions' is an INT vector indicating the partition corresponding to each topic.
- 'offsets' is an INT vector indicating the offset corresponding to each topic.
Commit the specified offsets for the given topic/partitions synchronously.
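A sketch of committing an explicit offset instead of the last processed one. It assumes `consumer` was created with kafka::consumer and has consumed from topic "test", and that kafka::commitTopic is synchronous like kafka::commit; the topic, partition and offset values are placeholders.

```dolphindb
topics = ["test"]
partitions = [0]
offsets = [100]
// commit offset 100 for partition 0 of topic "test"
kafka::commitTopic(consumer, topics, partitions, offsets)
// non-blocking alternative with the same arguments:
// kafka::asyncCommitTopic(consumer, topics, partitions, offsets)
// read back the committed offsets for the same topic/partition list
kafka::getOffsetCommitted(consumer, topics, partitions, offsets)
```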
kafka::asyncCommit(consumer)
- 'consumer' is a Kafka consumer handler.
Commit the offset of the last processed message asynchronously to __consumer_offsets, the internal topic in which Kafka stores committed offsets.
kafka::asyncCommitTopic(consumer, topics, partitions, offsets)
- 'consumer' is a Kafka consumer handler.
- 'topics' is a STRING vector indicating the topics to subscribe to.
- 'partitions' is an INT vector indicating the partition corresponding to each topic.
- 'offsets' is an INT vector indicating the offset corresponding to each topic.
Commit the specified offsets for the given topic/partitions asynchronously.
kafka::getConsumerTime(consumer)
- 'consumer' is a Kafka consumer handler.
Get the timeout of the consumer.
kafka::setConsumerTime(consumer, timeout)
- 'consumer' is a Kafka consumer handler.
- 'timeout' is the maximum amount of time to get messages.
Set the timeout of the consumer.
kafka::assign(consumer, topics, partitions, offsets)
- 'consumer' is a Kafka consumer handler.
- 'topics' is a STRING vector indicating the topics to subscribe to.
- 'partitions' is an INT vector indicating the partition corresponding to each topic.
- 'offsets' is an INT vector indicating the offset corresponding to each topic.
Unlike kafka::subscribe(consumer, topics), this function enables you to assign specific topics, partitions and offsets to the consumer.
kafka::unassign(consumer)
- 'consumer' is a Kafka consumer handler.
Unassign all topics of the consumer.
kafka::getAssignment(consumer)
- 'consumer' is a Kafka consumer handler.
Get the topic/partitions currently assigned to the consumer.
kafka::getOffset(consumer, topic, partition)
- 'consumer' is a Kafka consumer handler.
- 'topic' is a string indicating the topic.
- 'partition' is an integer indicating the partition.
Print the offsets of the consumer.
kafka::getOffsetCommitted(consumer, topics, partitions, offsets, [timeout])
- 'consumer' is a Kafka consumer handler.
- 'topics' is a STRING vector indicating the topics to subscribe to.
- 'partitions' is an INT vector indicating the partition corresponding to each topic.
- 'offsets' is an INT vector indicating the offset corresponding to each topic.
- 'timeout' is the maximum amount of time to get messages.
Get the offsets committed for the given topic/partition list.
kafka::getOffsetPosition(consumer, topics, partitions)
- 'consumer' is a Kafka consumer handler.
- 'topics' is a STRING vector indicating the topics to subscribe to.
- 'partitions' is an INT vector indicating the partition corresponding to each topic.
Get the offset positions for the given topic/partition list.
kafka::storeConsumedOffset(consumer)
- 'consumer' is a Kafka consumer handler.
Store the offsets on the currently assigned topic/partitions (legacy).
Set enable.auto.offset.store=false and enable.auto.commit=true in the consumer configuration; otherwise an error will be reported.
kafka::storeOffset(consumer, topics, partitions, offsets)
- 'consumer' is a Kafka consumer handler.
- 'topics' is a STRING vector indicating the topics to subscribe to.
- 'partitions' is an INT vector indicating the partition corresponding to each topic.
- 'offsets' is an INT vector indicating the offset corresponding to each topic.
Store the offsets on the given topic/partitions (legacy).
Set enable.auto.offset.store=false and enable.auto.commit=true in the consumer configuration; otherwise an error will be reported.
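A sketch of a consumer configuration that meets this requirement, followed by storing offsets after processing. The broker address, group ID and topic are placeholders, and the configuration values are given as strings, as in the SASL example above.

```dolphindb
consumerCfg = dict(STRING, ANY)
consumerCfg["metadata.broker.list"] = "localhost"
consumerCfg["group.id"] = "test"
// offsets are stored explicitly, then committed by auto-commit
consumerCfg["enable.auto.offset.store"] = "false"
consumerCfg["enable.auto.commit"] = "true"
consumer = kafka::consumer(consumerCfg)
kafka::subscribe(consumer, ["test"])
msg = kafka::consumerPoll(consumer)
// after processing `msg`, store the offsets of the current assignment
kafka::storeConsumedOffset(consumer)
```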
kafka::getMemId(consumer)
- 'consumer' is a Kafka consumer handler.
Get the group member ID.
kafka::getMainQueue(consumer)
- 'consumer' is a Kafka consumer handler.
Get the global event queue corresponding to the consumer.
kafka::getConsumerQueue(consumer)
- 'consumer' is a Kafka consumer handler.
Get the consumer group queue.
kafka::getPartitionQueue(consumer, topic, partition)
- 'consumer' is a Kafka consumer handler.
- 'topic' is a string indicating the topic.
- 'partition' is an integer indicating the partition.
Get the queue of this partition. If the consumer is not assigned to this partition, an empty queue will be returned.
kafka::queueLength(queue)
- 'queue' is a Kafka queue handler.
Return the length of the queue.
Note:
This function is not recommended on servers with ARM architecture, as the result may not be as expected.
kafka::forToQueue(queue, forward_queue)
- 'queue' is a Kafka queue handler.
- 'forward_queue' is a Kafka queue handler indicating the target queue to forward to.
Forward the queue to forward_queue.
kafka::disforToQueue(queue)
- 'queue' is a Kafka queue handler.
Stop forwarding to another queue.
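A sketch of forwarding a partition queue into the consumer group queue and then stopping. It assumes `consumer` was created with kafka::consumer; the topic "test" and partition 0 are placeholders. The partition is assigned first because kafka::getPartitionQueue returns an empty queue for an unassigned partition.

```dolphindb
kafka::unassign(consumer)
kafka::assign(consumer, ["test"], [0], [0])
partQueue = kafka::getPartitionQueue(consumer, "test", 0)
groupQueue = kafka::getConsumerQueue(consumer)
// route messages of partition 0 into the consumer group queue
kafka::forToQueue(partQueue, groupQueue)
kafka::queuePoll(groupQueue)
// stop forwarding
kafka::disforToQueue(partQueue)
```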
kafka::setQueueTime(queue, timeout)
- 'queue' is a Kafka queue handler.
- 'timeout' is the maximum amount of time for the queue.
Set the timeout of the queue.
kafka::getQueueTime(queue)
- 'queue' is a Kafka queue handler.
Get the configured timeout.
kafka::queuePoll(queue, [timeout])
- 'queue' is a Kafka queue handler.
- 'timeout' is the maximum amount of time for the queue.
Poll a message from the queue.
kafka::queuePollBatch(queue, batch_size, [timeout])
- 'queue' is a Kafka queue handler.
- 'batch_size' is the number of messages you want to get.
- 'timeout' is the maximum amount of time for the queue.
Poll a batch of messages from the queue.
kafka::queueEvent(queue)
- 'queue' is a Kafka queue handler.
Extract the next event in this queue.
Note:
Before deleting a consumer, make sure that all events generated by it have been released (set event=NULL to release the resources); otherwise the program may crash.
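A sketch of inspecting an event and releasing it before the consumer is deleted. It assumes `consumer` was created with kafka::consumer; the main queue is used only as an example source of events.

```dolphindb
queue = kafka::getMainQueue(consumer)
event = kafka::queueEvent(queue)
kafka::getEventName(event)
kafka::getEventMessageCount(event)
// release the event first ...
event = NULL
// ... only then is it safe to delete the consumer
```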
kafka::getEventName(event)
- 'event' is a Kafka event handler.
Return the name of the event.
kafka::eventGetMessage(event)
- 'event' is a Kafka event handler.
Get all messages in this event (if any).
kafka::getEventMessageCount(event)
- 'event' is a Kafka event handler.
Get the number of messages in this event.
kafka::eventGetError(event)
- 'event' is a Kafka event handler.
Return error messages in this event.
kafka::eventGetPart(event)
- 'event' is a Kafka event handler.
Get the topic partition of this event.
kafka::eventGetParts(event)
- 'event' is a Kafka event handler.
Get the list of topic partitions of this event.
kafka::eventBool(event)
- 'event' is a Kafka event handler.
kafka::getBufferSize()
Get the buffer size.
kafka::setBufferSize(size)
- 'size' is the buffer size you want to set. It must not exceed the buffer size of the broker, and the default value is 900k.
kafka::getMessageSize()
Get the message size.
kafka::setMessageSize(size)
- 'size' is the message size you want to set. message_size must not exceed buffer_size, and the default value is 10k.
#create producer
producerCfg = dict(STRING, ANY);
producerCfg["metadata.broker.list"] = "localhost";
producer = kafka::producer(producerCfg);
#create consumer
consumerCfg = dict(string, any);
consumerCfg["metadata.broker.list"] = "localhost";
consumerCfg["group.id"] = "test";
consumer = kafka::consumer(consumerCfg);
#subscribe
topics=["test"];
kafka::subscribe(consumer, topics);
kafka::consumerPoll(consumer);
#produce and consume English string
kafka::produce(producer, "test", "1", "producer1:i'm producer",false,false);
kafka::consumerPoll(consumer);
#produce and consume Chinese string
kafka::produce(producer, "test", "2", "I am a producer",false,false);
kafka::consumerPoll(consumer);
#produce and consume integer
kafka::produce(producer, "test", "3", 10086,false,false);
kafka::consumerPoll(consumer);
#produce and consume float
kafka::produce(producer, "test", "4", 123.456,false,false);
kafka::consumerPoll(consumer);
#produce and consume integer vector
message=[1,2,3,4];
kafka::produce(producer, "test", 1, message,false,false);
kafka::consumerPoll(consumer);
#produce and consume float vector
message=[1.1,2.2,3.3,4.4];
kafka::produce(producer, "test", 1, message,false,false);
kafka::consumerPoll(consumer);
#produce and consume Chinese string vector
message=["I","I am","I am a","I am a producer","I am a producer"];
kafka::produce(producer, "test", 1, message,false,false);
kafka::consumerPoll(consumer);
#produce and consume table
tab=table(1 2 3 as a, `x`y`z as b, 10.8 7.6 3.5 as c, "I" "I am" "I am a" as d);
kafka::produce(producer, "test", 1, tab,false,false);
kafka::consumerPoll(consumer);
#produce and consume two messages
kafka::produce(producer, "test", 1, "producer1:i'm producer",false,false);
kafka::produce(producer, "test", 1, "I am a producer",false,false);
kafka::consumerPollBatch(consumer,2);
#assign specific partition and offset
topics = ["test"];
partitions = [0];
offsets = [0];
kafka::unassign(consumer);
kafka::assign(consumer,topics,partitions,offsets);
#produce and consume messages
kafka::produce(producer, "test", "1", "producer1:i'm producer",false,0);
kafka::produce(producer, "test", "2", "I am a producer",false,0);
kafka::produce(producer, "test", "3", 10086,false,0);
kafka::produce(producer, "test", "4", 123.456,false,0);
kafka::consumerPoll(consumer);
kafka::consumerPoll(consumer);
kafka::consumerPoll(consumer);
kafka::consumerPoll(consumer);
#get the committed offsets of specific partitions
kafka::getOffsetCommitted(consumer,topics,partitions,offsets);
kafka::getAssignment(consumer);
kafka::getOffset(consumer,"test",2);
#get the current offset positions
kafka::getOffsetPosition(consumer,topics,partitions);
#deal with queue
queue=kafka::getConsumerQueue(consumer);
kafka::queueLength(queue);
kafka::queuePoll(queue);
#deal with event
event=kafka::queueEvent(queue);
kafka::getEventName(event);
kafka::eventGetMessage(event);
kafka::getEventMessageCount(event);
kafka::eventGetPart(event);
kafka::eventGetError(event);
kafka::eventBool(event);
#get a dictionary
kafka::produce(producer, "test", "1", "producer1:i'm producer",false,false,0);
kafka::produce(producer, "test", "2", "I am a producer",false,false,0);
kafka::produce(producer, "test", "3", 10086,false,false,0);
kafka::produce(producer, "test", "4", 123.456,false,false,0);
kafka::pollDict(consumer,4);
#get messages in json format
tab=table(1 2 3 as a, `x`y`z as b, 10.8 7.6 3.5 as c, "I" "I am" "I am a" as d);
dict={"1":1,"2":2,"3":3};
message=[1.1,2.2,3.3,4.4];
vec=[1,message,tab];
kafka::produce(producer, "test", "1", tab,true,false,0);
kafka::consumerPoll(consumer);
kafka::produce(producer, "test", "1", dict,true,false,0);
kafka::consumerPoll(consumer);
kafka::produce(producer, "test", "1", message,true,false,0);
kafka::consumerPoll(consumer);
kafka::produce(producer, "test", "1", vec,true,false,0);
kafka::consumerPoll(consumer);
#change the buffer_size and message_size
kafka::getBufferSize();
kafka::getMessageSize();
kafka::setBufferSize(100);
kafka::setMessageSize(20);
a=[];
for(i in 0:120){a.append!(i%10)};
kafka::produce(producer,"test","1",a,false,false,0);
kafka::consumerPoll(consumer);
kafka::produce(producer,"test","1",tab,false,false,0);
kafka::consumerPoll(consumer);
#multithreading
#the multithreaded subscription job needs a parser; the mseed plugin is installed here as an example
loadPlugin("/path/to/PluginKafka.txt");
loadPlugin("/path/to/PluginMseed.txt")
consumerCfg = dict(string, any);
consumerCfg["metadata.broker.list"] = "115.239.209.234";
consumerCfg["group.id"] = "test";
consumer = kafka::consumer(consumerCfg);
topics=["test"];
kafka::subscribe(consumer, topics);
tab = table(40000000:0,`id`time`value,[SYMBOL,TIMESTAMP,INT])
conn = kafka::createSubJob(consumer,tab,mseed::parse,"test:0:get mseed data");
kafka::getJobStat();
kafka::cancelSubJob(conn);