In this exercise we will learn to ingest data into the architecture in real time, so that by the end we have cryptocurrency prices flowing in real time through NiFi into Kafka.
The data source for this exercise is Coinbase, which provides a full API, including delivery of real-time data via WebSockets.
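If you want to see what the feed looks like before wiring it into NiFi, you can subscribe from the command line. This is only a sketch: it assumes the third-party "websocat" CLI and the public Coinbase Exchange feed endpoint (which may differ from the one configured in the template):

# Subscribe to the BTC-USD "ticker" channel and print the incoming JSON messages
# (-n keeps the connection open after stdin closes, so messages keep streaming)
echo '{"type":"subscribe","product_ids":["BTC-USD"],"channels":["ticker"]}' | websocat -n wss://ws-feed.exchange.coinbase.com

Each message arrives as a single-line JSON document with fields such as type, product_id and price.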
For a reference of the API, follow the link below:
Start the NiFi service:
docker compose start nifi-upv
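NOTE: "docker compose start" only works if the container was already created in a previous run. If this is the first time, create and start it instead (assuming the same service name):

docker compose up nifi-upv -d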
Once it is running, go to https://localhost:8443/nifi/
NOTE: You will find the user and password in the docker logs:
Generated Username [USERNAME]
Generated Password [PASSWORD]
You can check the logs either in the Docker Desktop console or using the docker logs command (the "grep" part is only valid on Linux, macOS or Cygwin):
docker logs nifi-upv | grep Generated
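On Windows (without Cygwin), you can filter with "findstr" instead:

docker logs nifi-upv | findstr Generated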
This will be explained during the class.
In this part we will be getting data in real time from Coinbase's WebSocket feed and storing it locally for testing purposes.
First, load the template with the Coinbase workflow:
- Stop (or even remove) the previous processors
- Upload the template:
  - Right-click on the canvas and select "Upload template"
  - Choose nifi/Coinbase-basic.xml
- Add the template to the canvas:
  - Top menu --> drag & drop "Template" --> select the uploaded template
Now we need to configure the secrets as shown in the image and explained below:
- Configure services (secrets):
- Right click on "ConnectWebsocket" processor --> "Configure" --> "Properties"
- Click on the arrow on "JettyWebSocketClient"
- Configure the "StandardRestrictedSSLContextService" and add the secrets:
- Keystore Password: FSDWYWIBfOf0beegaOXYsHnF2JHiFFEDx0UxCA5EQqU
- Key Password: FSDWYWIBfOf0beegaOXYsHnF2JHiFFEDx0UxCA5EQqU
- Truststore Password: Ba9Cuw6qam8/CziLiXmUIkjuxYmf/UzLYlJemlqrxjQ
- Save and enable the services (lightning bolt icon)
Now the workflow is configured and should look something like this (with no "warning sign"):
If everything is ok, run it:
- Right-click on the canvas and "Start"
Check the processors to confirm nothing is failing. If you want to check the results (the saved files), they will be available in the NiFi Docker container (if using Docker). Do the following to check:
docker ps
docker exec -it <nifi_container_id> /bin/bash
ls -l <folder_configured_in_PutFile_Processor>
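To peek at the content of one of the saved files in a single step, here is a quick one-liner sketch using the same placeholders as above:

docker exec -it <nifi_container_id> sh -c 'head -n 1 <folder_configured_in_PutFile_Processor>/*'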
You can also check it in the Docker Dashboard, just by clicking on the container and then the "Files" tab.
Now, your turn. You will have to update the workflow so that it sends the data to Kafka instead of to the file system.
First of all, start the Kafka services:
docker compose start zookeeper broker kafka-ui
If the components were not launched before, do so now with "docker compose up":
docker compose up zookeeper broker kafka-ui -d
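Either way, you can confirm that the three services are up before continuing:

docker compose ps zookeeper broker kafka-ui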
Once they are running, go to the Kafka UI (http://localhost:8080/) and navigate to the topics section (click on the cluster and then on "Topics"). There should be no topics or messages created by us yet.
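If you prefer the command line, you can also list the topics from inside the broker container. A sketch, assuming the Confluent Kafka image (which ships the kafka-topics tool) and the broker:29092 listener used below:

docker exec -it broker kafka-topics --bootstrap-server broker:29092 --list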
Now change the NiFi workflow to send messages to Kafka instead of saving to file:
- TIPS:
- Replace the PutFile processor with PublishKafka_2_6
- Set the Kafka Brokers property to broker:29092 (as seen in KAFKA_ADVERTISED_LISTENERS in the Kafka Docker Compose config)
- Use the "tickers" topic name (if you use a different one, make sure you are consistent throughout the exercises)
- Do not use transactions and set the delivery guarantee to "Best effort", in order to improve performance
- Auto-terminate the processor's output relationships (in the "RELATIONSHIPS" tab)
Once done, and if everything is working, go back to the Kafka UI and check that the topic has been created and that messages are flowing.
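You can also double-check from the command line by consuming a few messages directly from the topic. Again a sketch, assuming the Confluent Kafka image and the "tickers" topic:

# Print the first 5 messages from the topic and exit
docker exec -it broker kafka-console-consumer --bootstrap-server broker:29092 --topic tickers --from-beginning --max-messages 5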