FIX HBase Example

The FIX example is an Envelope pipeline that receives FIX financial messages describing order fulfillment (new orders and execution reports) and updates an HBase history table. It shows how historical order information can be maintained for each stock symbol.

The configuration for this example is in fix_hbase.conf. The messages do not fully conform to the real FIX protocol but should be sufficient to demonstrate how a complete implementation could be developed.
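
To make the message shape concrete, here is a minimal sketch of pulling one field out of a FIX-style message. It assumes fields are tag=value pairs separated by the SOH byte, as in standard FIX; the messages in this example may use a different delimiter or a different set of tags. The helper name fix_field and the sample message are made up for illustration. The tag numbers (35 MsgType, 11 ClOrdID, 55 Symbol, 38 OrderQty) are standard FIX tags.

```shell
# Hypothetical helper: print the value of one tag from a FIX-style message.
# Assumes "tag=value" fields separated by the SOH byte (octal 001).
#   $1 = message, $2 = tag number
fix_field() {
  printf '%s' "$1" | tr '\001' '\n' |
    awk -F= -v tag="$2" '$1 == tag { print substr($0, length($1) + 2); exit }'
}

# A made-up New Order Single: 35=MsgType, 11=ClOrdID, 55=Symbol, 38=OrderQty.
msg=$(printf '35=D\00111=order-1\00155=GOOG\00138=100\001')

fix_field "$msg" 55   # prints GOOG
fix_field "$msg" 38   # prints 100
```

Taking everything after the first "=" (rather than the second awk field) keeps values that themselves contain "=" intact.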

Pre-requisites

  1. Create the required HBase table using the provided HBase shell script:

    hbase shell create_hbase_tables.rb
  2. Create the required Kafka topics:

    kafka-topics --zookeeper YOURZOOKEEPERHOSTNAME:2181 --create --topic fix --partitions 9 --replication-factor 3

Running the example

On non-secure clusters, run the example by submitting the Envelope application with the example’s configuration:

$ export SPARK_KAFKA_VERSION=0.10
$ export EXTRA_CP=/etc/hbase/conf:/opt/cloudera/parcels/CDH/lib/hbase/hbase-common.jar:/opt/cloudera/parcels/CDH/lib/hbase/hbase-client.jar:/opt/cloudera/parcels/CDH/lib/hbase/hbase-protocol.jar:/opt/cloudera/parcels/CDH/lib/hbase/lib/htrace-core.jar
$ spark2-submit \
  --driver-class-path ${EXTRA_CP} \
  --conf spark.executor.extraClassPath=${EXTRA_CP} \
  envelope-*.jar fix_hbase.conf
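
The JVM silently ignores classpath entries that do not exist, so a wrong path in EXTRA_CP usually shows up only later as a class-loading error. A small pre-flight check can catch that before submitting. This is a sketch, not part of the example: the function name missing_cp_entries is hypothetical, and it only checks the host it runs on, so it says nothing about the paths on the executor nodes.

```shell
# Hypothetical helper: print every entry of a colon-separated classpath
# that does not exist on this host. Prints nothing if all entries exist.
#   $1 = classpath string, e.g. "$EXTRA_CP"
missing_cp_entries() {
  # Run in a subshell so the IFS and globbing changes do not leak out.
  (
    IFS=':'
    set -f
    for entry in $1; do
      [ -n "$entry" ] || continue
      [ -e "$entry" ] || printf '%s\n' "$entry"
    done
  )
}

# List missing entries (if any) for the classpath exported above.
missing_cp_entries "${EXTRA_CP:-}"
```

If the command prints any paths, adjust EXTRA_CP to match the parcel layout on your cluster before running spark2-submit.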

For secure clusters, we need to supply a keytab and a JAAS file to the Spark job, and ensure that Spark obtains an HBase token on startup. This requires two supporting files: first, a keytab file user.kt containing entries for the user submitting the job; second, a JAAS configuration file jaas.conf for the driver and executors to reference. A sample jaas.conf is supplied with this example.
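
For orientation, the sketch below generates the kind of JAAS section a Kafka client typically needs for keytab login. It is an assumption about what such a file looks like, not a copy of the sample shipped with this example, which may set different options. The function name write_jaas and the principal user@EXAMPLE.COM are placeholders. KafkaClient is the login context name that Kafka clients look up, and the keytab is referenced by bare file name on the assumption that it is shipped into each container's working directory.

```shell
# Hypothetical helper: write a Kafka client JAAS section to stdout.
#   $1 = Kerberos principal
#   $2 = keytab file name as seen from the container working directory
write_jaas() {
  cat <<EOF
KafkaClient {
  com.sun.security.auth.module.Krb5LoginModule required
  useKeyTab=true
  storeKey=true
  keyTab="$2"
  principal="$1";
};
EOF
}

# Placeholder principal; print the section for review.
write_jaas "user@EXAMPLE.COM" kafka.kt
```

Redirect the output to a file once it looks right, taking care not to overwrite an existing jaas.conf.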

A method that works on secure clusters in both client and cluster mode is as follows:

$ ln -s user.kt kafka.kt
$ export SPARK_KAFKA_VERSION=0.10
$ export PRINCNAME=REPLACEME
$ HADOOP_CONF_DIR=/etc/hbase/conf:/etc/hive/conf:/etc/spark2/conf:/opt/cloudera/parcels/CDH/lib/hbase/hbase-common.jar:/opt/cloudera/parcels/CDH/lib/hbase/hbase-client.jar:/opt/cloudera/parcels/CDH/lib/hbase/hbase-protocol.jar:/opt/cloudera/parcels/CDH/lib/hbase/lib/htrace-core.jar \
    spark2-submit \
    --keytab user.kt \
    --principal ${PRINCNAME} \
    --files jaas.conf,fix_hbase.conf,kafka.kt \
    --driver-java-options=-Djava.security.auth.login.config=jaas.conf \
    --conf spark.executor.extraJavaOptions=-Djava.security.auth.login.config=jaas.conf \
    --conf spark.executor.extraClassPath=/etc/hbase/conf \
    envelope-*.jar fix_hbase.conf

(Note: we symlink user.kt to kafka.kt so that the Spark YARN client does not complain about uploading the same file twice. By default the file referenced in --keytab is only distributed to the Application Master container, so we must also supply the keytab in --files.)

In another terminal session, start the data generator, which is itself an Envelope pipeline that writes one million example orders to Kafka. For non-secure clusters, this can be launched via:

$ export SPARK_KAFKA_VERSION=0.10
$ spark2-submit envelope-*.jar fix_generator.conf

For secure clusters, use something like:

$ export SPARK_KAFKA_VERSION=0.10
$ spark2-submit \
    --files jaas.conf,kafka.kt \
    --driver-java-options=-Djava.security.auth.login.config=jaas.conf \
    --conf spark.executor.extraJavaOptions=-Djava.security.auth.login.config=jaas.conf \
    envelope-*.jar fix_generator.conf

Seeing the results

You can verify the data is being loaded into HBase by running a few sample scans from the HBase shell:

$ hbase shell
hbase> scan 'envelopetest:test', { LIMIT => 100, STARTROW => 'GOOG'}
...
 GOOG:\x00\x00\x01a\x9AH\xD8\x89        column=cf2:orderqty, timestamp=1518711928433, value=\x00\x00\x14G
 GOOG:\x00\x00\x01a\x9AH\xD8\x8A        column=cf1:clordid, timestamp=1518711929062, value=74687578-c24f-4d98-930a-7f466cece584
 GOOG:\x00\x00\x01a\x9AH\xD8\x8A        column=cf2:cumqty, timestamp=1518711929062, value=\x00\x00\x08D
 GOOG:\x00\x00\x01a\x9AH\xD8\x8A        column=cf2:leavesqty, timestamp=1518711929062, value=\x00\x00\x17#
 GOOG:\x00\x00\x01a\x9AH\xD8\x8A        column=cf2:orderqty, timestamp=1518711929062, value=\x00\x00\x1E\x19
100 row(s) in 0.4580 seconds
hbase> count 'envelopetest:test'
...
Current count: 380000, row: VMW:\x00\x00\x01a\x9AJ17
Current count: 381000, row: VMW:\x00\x00\x01a\x9AJ7\xD9
Current count: 382000, row: VMW:\x00\x00\x01a\x9AJ?\x9A
382705 row(s) in 10.5010 seconds
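
The row keys and quantity values in the scan output are binary, which the HBase shell prints with printable bytes as-is and all others as \xHH escapes. The sketch below turns such a string back into a number. It rests on assumptions inferred from the output above rather than from the configuration: that the row key is the symbol, a colon, and an 8-byte big-endian millisecond timestamp, and that the quantities are 4-byte big-endian integers. The function names are hypothetical. Because awk uses floating-point arithmetic, results are exact only up to 2^53.

```shell
# Hypothetical helper: decode an HBase-shell escaped byte string
# (e.g. '\x00\x00\x14G') as a big-endian unsigned integer.
hbase_bytes_to_int() {
  printf '%s\n' "$1" | awk '
    BEGIN {
      hex = "0123456789ABCDEF"
      # Lookup table from printable ASCII character to its byte value.
      for (i = 32; i < 127; i++) ord[sprintf("%c", i)] = i
    }
    {
      value = 0
      n = length($0)
      i = 1
      while (i <= n) {
        c = substr($0, i, 1)
        if (c == "\\" && substr($0, i + 1, 1) == "x") {
          # Escaped byte: two hex digits follow the "\x".
          hi = index(hex, toupper(substr($0, i + 2, 1))) - 1
          lo = index(hex, toupper(substr($0, i + 3, 1))) - 1
          byte = hi * 16 + lo
          i += 4
        } else {
          # Printable byte shown as a literal character.
          byte = ord[c]
          i += 1
        }
        value = value * 256 + byte
      }
      printf "%.0f\n", value
    }'
}

# Hypothetical helper: timestamp part of a row key (everything after the first ":").
hbase_key_timestamp() {
  hbase_bytes_to_int "${1#*:}"
}

hbase_bytes_to_int '\x00\x00\x14G'                        # prints 5191
hbase_key_timestamp 'GOOG:\x00\x00\x01a\x9AH\xD8\x89'     # prints 1518711920777
```

Read as epoch milliseconds, 1518711920777 falls on 15 February 2018, a few seconds before the cell timestamps shown in the scan, which is consistent with the assumed key layout.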