Skip to content

confluentinc/kafka-rest-node

Folders and files

NameName
Last commit message
Last commit date

Latest commit

 

History

60 Commits
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 

Repository files navigation

kafka-rest

kafka-rest is a node.js library for the Kafka REST Proxy. It provides a thin wrapper around the REST API, providing a more convenient interface for accessing cluster metadata and producing and consuming Avro and binary data.

NOTE: This library was written to demonstrate how to create a language-specific wrapper around the REST Proxy. It is no longer maintained or kept up to date (e.g. the REST Proxy has a v2 API that is now recommended, but support has not been added to this wrapper library). Since this library was created, better JavaScript clients have been developed that do not require the REST Proxy. In particular, https://github.com/Blizzard/node-rdkafka, which wraps the librdkafka C client is a good choice and actively maintained.

Getting Started

You can install the kafka-rest library via npm:

npm install kafka-rest

The only required dependency is async. However, some additional libraries are required to run the examples and are included as devDependencies.

Next, make sure you have Kafka, the Schema Registry (if using Avro), and the Kafka REST Proxy running. The Confluent Quickstart and REST Proxy documentation explains how to do this step-by-step.

API

Start by requiring the library and creating a KafkaRest object to interact with the server:

var KafkaRest = require('kafka-rest');
var kafka = new KafkaRest({ 'url': 'http://localhost:8082' });

Metadata

The API mirrors the REST API closely: there are resources for Brokers, Topics, Partitions, and Consumers, and these are available in the KafkaRest object. For example, to get a list of brokers (we'll skip error checking to keep things simple):

// kafka.brokers is a Brokers instance, list() returns a list of Broker instances
kafka.brokers.list(function(err, brokers) {
    for(var i = 0; i < brokers.length; i++)
        console.log(brokers[i].toString());
});

Objects generated from API responses will have a field raw where you can get at the raw response data. For the brokers, brokers[i].raw would just be a number because the broker list API only returns broker IDs.

All types have both collection (Topics) and singular (Topic) types corresponding to the /topics and /topics/<topic> API endpoints. The collection types primary method is list() which returns a list of singular types which are initialize only with the data returned by the API call to get the list of objects. Singular objects can be constructed directly, or there is a shorthand via the collection type:

kafka.topics.get('test', function(err, topic) {
    // topic is a Topic object
});

And you can create an incomplete singular object (i.e. no API call is made, only its identifying properties are stored):

kafka.topics.topic('test');

This can be useful when you want to get directly at a nested resource you know the path to and don't want to do API calls on the intermediate resources (i.e. get a single partition, but don't look up topic metadata):

// The complete version makes the resource hierarchy clear
kafka.topics.topic('test').partitions.partition(0);
// Or use the shorter version. Under the hood they do the same thing, so you
// can use whichever is clearer based on context
kafka.topic('test').partition(0);
// For partitions there's also:
kafka.topicPartition('test', 0)

If you have an incomplete singular object returned by a list() call, you can fill request its data using get().

Producing

You can produce messages by calling produce() on Topic or Partition objects. They can be incomplete instances, e.g.:

kafka.topic('test').produce('message')

With this one method you can:

  • Specify values by passing them in directly, or records with any combination of key, value, and partition fields. (Only Topic objects support the partition field).
  • Send a batch of messages by passing them as separate arguments or as an array. Messages may be of mixed form, i.e. some may be raw values, others may be records with key and value fields.
  • Send raw binary data (null, string, or Buffer objects) or Avro data (in JSON form) with a schema. For Avro data, you can pass up to two AvroSchema objects. If one is included, it is used as the value schema; if two are include the first is the key schema, the second is the value schema.
  • Add a callback of the form function(err, res) anywhere in the argument list to be notified when the request completes. res is the JSON response returned by the proxy.

Here are a few examples showing these features:

// With binary data:

// Single message with a string value
topic.produce('msg1');

// Single message with key, value, and partition, with callback
topic.produce({'key': 'key1', 'value': 'msg1', 'partition': 0}, function(err,res){});

// Any record fields can be omitted
topic.produce({'partition': 0, 'value': 'msg1'});

// Multiple messages containing only values
topic.produce('msg1', 'msg2', 'msg3');

// Multiple messages containing only values, passed as array
topic.produce(['msg1', 'msg2', 'msg3']);

// Multiple messages, mixed format
topic.produce('msg1', {'partition': 0, 'value': 'msg2'});


// With Avro data:

var userIdSchema = new KafkaRest.AvroSchema("int");
var userInfoSchema = new KafkaRest.AvroSchema({
    "name": "UserInfo",
    "type": "record",
    "fields": [
        { "name": "id", "type": "int" },
        { "name": "name", "type": "string" }]
});

// Avro value schema followed by messages containing only values
topic.produce(userInfoSchema, {'avro': 'record'}, {'avro': 'another record'});

// Avro key and value schema.
topic.produce(userIdSchema, userInfoSchema, {'key': 1, 'value': {'id': 1, 'name': 'Bob'}});

To avoid sending schemas with every request, the REST API supports schema IDs. Use of schema IDs is handled transparently for you -- as long as you use the same AvroSchema object across calls to produce(), the IDs will be used instead of the full schemas.

Note that because this API is a thin wrapper around the REST Proxy, you must batch your messages to improve performance. The twitter/stream_tweets.js example performs this type of batching.

Consumers

Currently the REST proxy supports the high-level consumer interface using consumer groups. To start consuming data, join a consumer group, optionally specifying some configuration options (passed directly to the API call):

kafka.consumer("my-consumer-group").join({
    "format": "avro",
    "auto.commit.enable": "true",
}, function(err, consumer_instance) {
    // consumer_instance is a ConsumerInstance object
});

The group doesn't have to exist yet -- if you use a new consumer group name, it will be created. You can then subscribe to a topic, resulting in a ConsumerStream, and setup event handlers:

var stream = consumer_instance.subscribe('my-topic')
stream.on('data', function(msgs) {
    for(var i = 0; i < msgs.length; i++)
        console.log("Got a message: key=" + msgs[i].key + " value=" + msgs[i].value + " partition=" + msgs[i].partition);
});
stream.on('error', function(err) {
    console.log("Something broke: " + err);
});

The exact type for each messages key/value depends on the data format you're reading. Binary data will have been decoded from its base64 representation into a Buffer (or null). For Avro, you'll get an object.

Finally, when you're ready to clean up, request a graceful shutdown of the consumer instance, which also cleans up the stream:

consumer_instance.shutdown(function() {
    console.log("Shutdown complete.");
});

Examples

A few examples are included in the examples/ directory:

  • metadata.js - A simple demo of some of the metadata APIs, covering brokers, topics, and partitions.
  • console_producer.js - Reads from stdin and produces each line as a message to a Kafka topic.
  • console_consumer.js - Consumes a Kafka topic and writes each message to stdout.
  • twitter/stream_tweets.js - Uses Twitter's API to get a realtime feed of tweets which it produces to a Kafka topic.
  • twitter/trending.js - Uses the tweet data produced by the previous example to generate a list of trending hashtags, which it prints every 10 seconds to stdout.

Contribute

License

The project is licensed under the Apache 2 license.