We will practice Apache Flink with the following simple activities:
- set up Apache Flink on a local machine
- write streaming applications with Flink
- run Flink streaming applications
- change the application to run it on a remote server
- understand the relationship between developers and platform providers through tools and support
Note: there are many Apache Flink tutorials on the Internet that you can look at, e.g. the Flink DataStream API Tutorial or Apache Flink in AWS.
Download Apache Flink from the Apache Flink website and follow the installation guide for a local machine. In this simple tutorial, we use Apache Flink 1.14.3 for Scala 2.11.
The example we run uses the BTS data, and we will use Kafka and RabbitMQ as the message brokers through which the streaming analytics application obtains data.
You can set up your own RabbitMQ or use a test RabbitMQ during the tutorial. A very simple way to start a test RabbitMQ is via Docker. Use the command:
$ docker run -d -p 5672:5672 --hostname my-rabbit --name rabbitMQ rabbitmq
This starts a RabbitMQ container in which both the username and the password are guest.
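Later steps pass the RabbitMQ connection details as an AMQP URI such as amqp://guest:guest@localhost:5672, which encodes the guest credentials of this container. The minimal Python sketch below is our own illustration, not part of the course scripts (parse_amqp_uri is a hypothetical name); it shows how such a URI breaks down into host, port, credentials and virtual host. The fallback to guest/guest and to the default virtual host "/" follows the usual RabbitMQ client defaults.

```python
from urllib.parse import unquote, urlsplit


def parse_amqp_uri(uri: str) -> dict:
    """Split an AMQP URI into its connection parameters (hypothetical helper)."""
    parts = urlsplit(uri)
    if parts.scheme not in ("amqp", "amqps"):
        raise ValueError(f"not an AMQP URI: {uri!r}")
    # 5672 is the plain AMQP port, 5671 the TLS (amqps) port.
    default_port = 5671 if parts.scheme == "amqps" else 5672
    # No path at all means the default vhost "/"; otherwise the path holds the
    # percent-encoded vhost name (so "/%2f" also selects "/").
    vhost = "/" if parts.path == "" else unquote(parts.path[1:])
    return {
        "host": parts.hostname or "localhost",
        "port": parts.port or default_port,
        # urlsplit does not decode the userinfo part, so unquote it here.
        "username": unquote(parts.username) if parts.username else "guest",
        "password": unquote(parts.password) if parts.password else "guest",
        "vhost": vhost,
    }


print(parse_amqp_uri("amqp://guest:guest@localhost:5672"))
# -> {'host': 'localhost', 'port': 5672, 'username': 'guest', 'password': 'guest', 'vhost': '/'}
```

Percent-decoding matters once you move away from guest/guest: a password containing "@" or ":" must be written as %40 or %3A inside the URI.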
You can also follow the Kafka instructions to start a Kafka cluster or use our simple Kafka tutorial. Then create a few topics before running the experiment, and test whether your Kafka server works:
$ bin/kafka-topics.sh --create --topic <your topic name> --bootstrap-server <your Kafka host ip>:<Kafka port>
$ bin/kafka-topics.sh --list --bootstrap-server <your Kafka host ip>:<Kafka port>
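Before creating topics, it can help to confirm that the broker port is reachable at all. Below is a minimal standard-library sketch (broker_reachable is a hypothetical helper name). It only proves that something accepts TCP connections on host:port, not that it is a working Kafka broker, so the kafka-topics.sh commands remain the real test. The same check works for RabbitMQ on port 5672.

```python
import socket


def broker_reachable(address: str, timeout: float = 3.0) -> bool:
    """Return True if a TCP connection to "host:port" can be opened."""
    host, _, port = address.rpartition(":")
    try:
        # create_connection resolves the host name and tries each address.
        with socket.create_connection((host, int(port)), timeout=timeout):
            return True
    except OSError:  # connection refused, timed out, or host not resolvable
        return False
```

For example, broker_reachable("localhost:9092") should return True once Kafka is listening on the default port. IPv6 literals in brackets are not handled by this simple split.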
Follow the Flink guide to check that the installation works. Move into the directory of your Flink installation and start Flink:
/opt/flink$ bin/start-cluster.sh
then check the web UI, which by default is at http://localhost:8081.
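Besides the web UI, the JobManager serves Flink's REST API on the same port (8081 by default), and its /overview endpoint reports the number of TaskManagers, slots and jobs. The sketch below is our own illustration; fetch_overview and summarize are hypothetical helper names, and localhost:8081 assumes the default configuration.

```python
import json
import urllib.request


def fetch_overview(base_url: str = "http://localhost:8081") -> dict:
    """GET <base_url>/overview from the Flink REST API and decode the JSON body."""
    with urllib.request.urlopen(f"{base_url}/overview", timeout=5) as resp:
        return json.load(resp)


def summarize(overview: dict) -> str:
    """Condense the cluster overview into a one-line health summary."""
    return (
        f"Flink {overview.get('flink-version', '?')}: "
        f"{overview.get('taskmanagers', 0)} TaskManager(s), "
        f"{overview.get('slots-available', 0)}/{overview.get('slots-total', 0)} slots free, "
        f"{overview.get('jobs-running', 0)} job(s) running"
    )
```

Once the cluster is up, print(summarize(fetch_overview())) should report at least one TaskManager; zero TaskManagers usually means the TaskExecutor failed to start, and its log is the place to look.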
Alternatively, you can use docker-compose to start a cluster. The compose configuration file is in code/docker-compose.yml. Start the cluster with:
$ docker-compose up -d
and then check the web UI.
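Containers started with docker-compose up -d need a few seconds before the JobManager answers, so checking the UI immediately may fail. A minimal polling sketch, assuming the web UI is published on http://localhost:8081 (check the ports section of code/docker-compose.yml). http_ok and wait_until_ready are hypothetical names; the probe parameter exists only so the check can be swapped out.

```python
import time
import urllib.error
import urllib.request
from typing import Callable


def http_ok(url: str, timeout: float = 2.0) -> bool:
    """True if url answers with HTTP 200."""
    try:
        with urllib.request.urlopen(url, timeout=timeout) as resp:
            return resp.status == 200
    except (urllib.error.URLError, OSError):  # refused, reset, HTTP error, timeout
        return False


def wait_until_ready(url: str, total_timeout: float = 60.0, interval: float = 2.0,
                     probe: Callable[[str], bool] = http_ok) -> bool:
    """Poll url with probe until it succeeds or total_timeout seconds pass."""
    deadline = time.monotonic() + total_timeout
    while True:
        if probe(url):
            return True
        if time.monotonic() >= deadline:
            return False
        time.sleep(interval)
```

For example, wait_until_ready("http://localhost:8081/overview") returns True as soon as the REST endpoint responds, or False after one minute.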
You can also run one of the examples shipped with Flink, e.g. bin/flink run examples/streaming/WordCount.jar, to see how it works.
Hint: You can also use the web UI to submit a job to a session cluster. Alternatively, use the Flink CLI on the host if it is installed: flink run -d -m ${FLINK_JOBMANAGER_URL} /job.jar [jar_arguments]
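If you script the submission instead of typing it, building the argument list explicitly avoids shell-quoting problems. A sketch assuming the CLI options shown in the hint (-d for detached mode, -m for the JobManager address); flink_run_cmd and submit are hypothetical helpers, and /opt/flink is only an example installation path.

```python
import subprocess
from typing import List, Optional, Sequence


def flink_run_cmd(flink_home: str, jar: str, jobmanager: Optional[str] = None,
                  detached: bool = True, job_args: Sequence[str] = ()) -> List[str]:
    """Build the argument list for `flink run`; program arguments go after the jar."""
    cmd = [f"{flink_home}/bin/flink", "run"]
    if detached:
        cmd.append("-d")  # return right after the job is submitted
    if jobmanager:
        cmd += ["-m", jobmanager]  # target a specific JobManager
    cmd.append(jar)
    cmd += list(job_args)  # passed to the job's main method, not to the CLI
    return cmd


def submit(cmd: List[str]) -> None:
    """Run the command; raises CalledProcessError if flink exits non-zero."""
    subprocess.run(cmd, check=True)
```

Keeping CLI options before the jar and program arguments after it matters: anything after the jar path is handed to the job, not interpreted by flink.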
Check the source code of the BTS example in our Git repository. It is a simple example for illustration purposes. The program is built with Maven:
$ mvn install
$ ls target/simplebts-0.1-SNAPSHOT.jar
The file target/simplebts-0.1-SNAPSHOT.jar is the one that will be submitted to Flink.
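flink run takes the entry class from the jar manifest unless you pass -c/--class, so it is worth checking that the Maven build recorded a Main-Class. A small standard-library sketch; jar_main_class is a hypothetical helper, and whether simplebts sets Main-Class depends on its pom.xml.

```python
import zipfile


def jar_main_class(jar):
    """Return the Main-Class from a jar's manifest, or None if it has none.

    jar may be a path or a binary file object.
    """
    with zipfile.ZipFile(jar) as zf:
        try:
            text = zf.read("META-INF/MANIFEST.MF").decode("utf-8")
        except KeyError:  # the archive has no manifest
            return None
    # Manifest lines longer than 72 bytes continue on lines starting with a space.
    text = text.replace("\r\n ", "").replace("\n ", "")
    for line in text.splitlines():
        if line.startswith("Main-Class:"):
            return line.split(":", 1)[1].strip()
    return None
```

For example, jar_main_class("target/simplebts-0.1-SNAPSHOT.jar") returns the class name, or None, in which case you would add -c with the entry class to the flink run command.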
Before running the BTS Flink program, check whether you can send data to and receive data from the message brokers. The Python test programs are in scripts/ and the data file is in cs-e4640/data/bts:
Start a BTS test producer using the Kafka client:
$ python3 test_kafka_producer.py --queue_name [your_selected_queue_name] --input_file [cs-e4640/data/bts/bts-data-alarm-2017.csv] --kafka [your_kafka_host]
Then start the BTS test receivers for both Kafka and RabbitMQ:
$ python3 test_kafka_consumer.py --queue_name [your_selected_queue_name] --kafka [your_kafka_host]
$ python3 test_amqp_consumer.py --queue_name [your_selected_queue_name] --rabbit [your_rabbitmq_uri]
If you see a receiver output data, the corresponding message broker is working.
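The test scripts are not reproduced here. As a rough illustration of what a producer like test_kafka_producer.py does, the sketch below turns each CSV row into one JSON message payload, using the header line for the field names. This message format is an assumption for illustration and may differ from the course script; csv_to_messages is a hypothetical name.

```python
import csv
import json
from typing import Iterable, Iterator


def csv_to_messages(lines: Iterable[str]) -> Iterator[bytes]:
    """Yield one UTF-8 JSON payload per CSV data row (the first line is the header)."""
    for row in csv.DictReader(lines):
        # Each row becomes {"column": "value", ...}; all values stay strings.
        yield json.dumps(row).encode("utf-8")
```

To feed it a file, open the CSV with newline="" and pass the file object. With the kafka-python package (again an assumption about the client library), each payload would then be sent with KafkaProducer(bootstrap_servers="localhost:9092").send(topic, payload), followed by a final flush().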
Now assume that you choose the following settings:
- iqueue123: the queue to which we send the data
- oqueue123: the queue from which we receive the alerts
- amqp://guest:guest@localhost:5672: the AMQP URI
- localhost:9092: the Kafka URL
Run the Flink BTS program:
$ bin/flink run simplebts-0.1-SNAPSHOT.jar --amqpurl amqp://guest:guest@localhost:5672 --iqueue iqueue123 --oqueue oqueue123 --kafkaurl localhost:9092 --parallelism 1
Now start the test producer again with iqueue123 as the queue name:
$ python3 test_kafka_producer.py --queue_name iqueue123 --input_file cs-e4640/data/bts/bts-data-alarm-2017.csv --kafka localhost:9092
Then, to check whether you receive any alerts, start the BTS test receivers with oqueue123 as the queue name:
$ python3 test_kafka_consumer.py --queue_name oqueue123 --kafka localhost:9092
$ python3 test_amqp_consumer.py --queue_name oqueue123 --rabbit amqp://guest:guest@localhost:5672
Check the logs under flink/log:
- flink-*-taskexecutor-*.log
- flink-*-taskexecutor-*.out
to see errors and printed output. Alternatively, you can view the logs in the Flink web UI.
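When there are many log files, a short script can pull out the suspicious lines. A minimal sketch, assuming Flink's standard naming flink-<user>-taskexecutor-<n>-<host>.log for the log and .out for stdout; find_problems and the keyword list are our own choices, so adjust both to your setup.

```python
import glob
import os
from typing import List, Sequence, Tuple

PATTERNS = ("flink-*-taskexecutor-*.log", "flink-*-taskexecutor-*.out")
KEYWORDS = ("ERROR", "Exception")


def find_problems(log_dir: str, patterns: Sequence[str] = PATTERNS,
                  keywords: Sequence[str] = KEYWORDS) -> List[Tuple[str, int, str]]:
    """Return (file name, line number, line) for every line containing a keyword."""
    hits = []
    for pattern in patterns:
        for path in sorted(glob.glob(os.path.join(log_dir, pattern))):
            with open(path, encoding="utf-8", errors="replace") as f:
                for lineno, line in enumerate(f, start=1):
                    if any(k in line for k in keywords):
                        hits.append((os.path.basename(path), lineno, line.rstrip()))
    return hits
```

For example, iterate over find_problems("/opt/flink/log") and print each hit as name:line: text to get a grep-like listing.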
Change the code to submit the Flink job to a remote server.
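A large part of moving to a remote server is removing the hard-coded localhost addresses. One way, sketched below in Python for the submission side, is to read the endpoints from environment variables and fall back to the local values used in this tutorial. FLINK_JOBMANAGER_URL comes from the hint on the Flink CLI; AMQP_URL and KAFKA_URL are hypothetical variable names. If the BTS program creates its execution environment in code instead, Flink's Java API also offers StreamExecutionEnvironment.createRemoteEnvironment(host, port, jarFiles).

```python
import os
from typing import List, Mapping, Optional

# Local defaults from this tutorial; override them with environment variables.
DEFAULTS = {
    "FLINK_JOBMANAGER_URL": "localhost:8081",
    "AMQP_URL": "amqp://guest:guest@localhost:5672",  # hypothetical variable name
    "KAFKA_URL": "localhost:9092",  # hypothetical variable name
}


def resolve_settings(env: Optional[Mapping[str, str]] = None) -> dict:
    """Pick each endpoint from env (default: os.environ), else the local default."""
    env = os.environ if env is None else env
    return {key: env.get(key, default) for key, default in DEFAULTS.items()}


def bts_job_args(settings: dict, iqueue: str, oqueue: str) -> List[str]:
    """Program arguments in the form used by the simplebts run command."""
    return ["--amqpurl", settings["AMQP_URL"], "--iqueue", iqueue,
            "--oqueue", oqueue, "--kafkaurl", settings["KAFKA_URL"],
            "--parallelism", "1"]
```

The resolved FLINK_JOBMANAGER_URL goes to flink run -m. Keep in mind that the broker addresses must be reachable from the remote TaskManagers, not only from your own machine, so localhost values will not work there.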