Finding anomalies can be tricky; with Apache Kafka® and Apache Flink® we can detect them in streaming mode.
The first step is to create an Aiven for Apache Kafka® and an Aiven for Apache Flink® service. You are free to use any Apache Kafka® or Apache Flink® service out there: the anomaly detection SQL queries will work on any Apache Flink® 1.14+ version, although the table definitions might require some minor tweaks.
The following commands create the Aiven for Apache Kafka® and Aiven for Apache Flink® services, and the integration between them.
avn service create demo-kafka \
-t kafka \
--cloud google-europe-west3 \
-p business-4 \
-c kafka.auto_create_topics_enable=true \
-c kafka_connect=true \
-c kafka_rest=true \
-c schema_registry=true
avn service create demo-flink -t flink --cloud google-europe-west3 -p business-4
avn service integration-create \
-t flink \
-s demo-kafka \
-d demo-flink
To be able to push data to Aiven for Apache Kafka® we need the SSL certificates, which can be downloaded into the `certs` folder with the following:
avn service user-creds-download demo-kafka --username avnadmin -d certs
Now it's time to clone the fake data producer for Apache Kafka® on Docker and start creating pizza orders.
rm -rf fake-data-producer-for-apache-kafka-docker
git clone https://github.com/aiven/fake-data-producer-for-apache-kafka-docker.git
You need to copy the `env.conf.sample` file within the `fake-data-producer-for-apache-kafka-docker/conf/` folder to `env.conf` and include all the required parameters. Then we can build the container and run it.
cd fake-data-producer-for-apache-kafka-docker/
docker build -t fake-data-producer-for-apache-kafka-docker .
docker run -it fake-data-producer-for-apache-kafka-docker
If everything works correctly, we should see the flow of pizza orders being generated.
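Each order is a JSON document whose fields match the source table definition we'll use below. A sample record (all values here are invented for illustration) might look like:
{
  "id": 1,
  "shop": "Luigis Pizza",
  "name": "Mario Rossi",
  "phoneNumber": "+39 111 2233445",
  "address": "Main Street 1",
  "pizzas": [
    {
      "pizzaName": "Margherita",
      "additionalToppings": ["🍍 pineapple", "🍌 banana"]
    }
  ]
}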
To be able to define Flink data pipelines we need to retrieve the ID of the integration between Apache Kafka® and Apache Flink® with:
KAFKA_FLINK_SI=$(avn service integration-list --json demo-kafka | jq -r '.[] | select(.dest == "demo-flink").service_integration_id')
To make the following calls easier to run, assign the name of your project to the `PROJECT` variable with:
PROJECT=my_aiven_project_name
The first anomaly to detect is any occurrence of 🍍 pineapple, 🍓 strawberry or 🍌 banana, redirecting the offending orders to a dedicated topic. We can do so by:
- Create an application named `BasicFiltering` with:
avn service flink create-application demo-flink \
--project $PROJECT \
"{\"name\":\"BasicFiltering\"}"
- Retrieve the application id with:
APP_ID=$(avn service flink list-applications demo-flink \
--project $PROJECT | jq -r '.applications[] | select(.name == "BasicFiltering").id')
- Replace the integration id in the application definition file named `01-basic-filtering.json`:
mkdir -p tmp
sed "s/KAFKA_INTEGRATION_ID/$KAFKA_FLINK_SI/" 'flink-app/01-basic-filtering.json' > tmp/01-basic-filtering.json
- Create the Apache Flink application version filtering the data with:
avn service flink create-application-version demo-flink \
--project $PROJECT \
--application-id $APP_ID \
@tmp/01-basic-filtering.json
The `flink-app/01-basic-filtering.json` file contains:
- A source table definition, pointing to the `pizzas` topic. The `orderTimestamp` column is read from the Kafka message timestamp metadata, `orderProctime` is a processing time column, and the watermark allows events arriving up to 5 seconds late:
CREATE TABLE pizza_source (
id INT,
shop VARCHAR,
name VARCHAR,
phoneNumber VARCHAR,
address VARCHAR,
pizzas ARRAY <ROW (pizzaName VARCHAR, additionalToppings ARRAY <VARCHAR>)>,
orderTimestamp TIMESTAMP(3) METADATA FROM 'timestamp',
orderProctime AS PROCTIME(),
WATERMARK FOR orderTimestamp AS orderTimestamp - INTERVAL '5' SECOND
) WITH (
'connector' = 'kafka',
'properties.bootstrap.servers' = '',
'scan.startup.mode' = 'earliest-offset',
'topic' = 'pizzas',
'value.format' = 'json'
)
- A target table definition, pointing to a new Kafka topic named `pizza_stream_out_filter`:
CREATE TABLE pizza_filtered (
id INT,
name VARCHAR,
topping VARCHAR,
orderTimestamp TIMESTAMP(3)
) WITH (
'connector' = 'kafka',
'properties.bootstrap.servers' = '',
'scan.startup.mode' = 'earliest-offset',
'topic' = 'pizza_stream_out_filter',
'value.format' = 'json'
)
- The filtering transformation logic:
insert into pizza_filtered
select
id,
name,
c.topping,
orderTimestamp
from pizza_source
cross join UNNEST(pizzas) b
cross join UNNEST(b.additionalToppings) as c(topping)
where c.topping in ('🍍 pineapple', '🍓 strawberry','🍌 banana')
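The double UNNEST flattens each order into one row per additional topping before the filter applies. Here's a minimal, self-contained sketch of the same pattern, using a hypothetical inline row instead of the pizza_source table:
-- flatten one order with two additional toppings into two rows
select id, name, c.topping
from (values
    (1, 'Mario Rossi', ARRAY[ROW('Margherita', ARRAY['🍍 pineapple', '🍌 banana'])])
  ) as t(id, name, pizzas)
cross join UNNEST(pizzas) as b(pizzaName, additionalToppings)
cross join UNNEST(b.additionalToppings) as c(topping)
-- yields (1, 'Mario Rossi', '🍍 pineapple') and (1, 'Mario Rossi', '🍌 banana')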
Run the application
We can run the application by following the steps below:
- Retrieve the application version id you want to run, e.g. for version `1` of the `BasicFiltering` application:
APP_VERSION_1=$(avn service flink get-application demo-flink \
--project $PROJECT --application-id $APP_ID | jq -r '.application_versions[] | select(.version == 1).id')
- Create a deployment of the application version with:
avn service flink create-application-deployment demo-flink \
--project $PROJECT \
--application-id $APP_ID \
"{\"parallelism\": 1,\"restart_enabled\": true, \"version_id\": \"$APP_VERSION_1\"}"
- Retrieve the deployment id with:
APP_DEPLOYMENT=$(avn service flink list-application-deployments demo-flink \
--project $PROJECT \
--application-id $APP_ID | jq -r ".deployments[] | select(.version_id == \"$APP_VERSION_1\").id")
- Retrieve the deployment status with:
avn service flink get-application-deployment demo-flink \
--project $PROJECT \
--application-id $APP_ID \
--deployment-id $APP_DEPLOYMENT | jq '.status'
The application should be in the `RUNNING` state.
We can verify the presence of data in the target `pizza_stream_out_filter` topic using kcat (after properly setting the `kcat.config` file as per the dedicated documentation):
kcat -F $PINEAPPLE_PATH/kcat.config -C -t pizza_stream_out_filter
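If the pipeline is running, each record should resemble the following (values invented; the exact timestamp rendering depends on the JSON format settings):
{"id": 1, "name": "Mario Rossi", "topping": "🍍 pineapple", "orderTimestamp": "2023-03-15 12:00:03.123"}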
What if we want to flag only orders containing more than three prohibited toppings? We can do so by:
- Create an application named `Aggregating` with:
avn service flink create-application demo-flink \
--project $PROJECT \
"{\"name\":\"Aggregating\"}"
- Retrieve the application id with:
APP_ID=$(avn service flink list-applications demo-flink \
--project $PROJECT | jq -r '.applications[] | select(.name == "Aggregating").id')
- Replace the integration id in the application definition file named `02-aggregating.json`:
mkdir -p tmp
sed "s/KAFKA_INTEGRATION_ID/$KAFKA_FLINK_SI/" 'flink-app/02-aggregating.json' > tmp/02-aggregating.json
- Create the Apache Flink application version aggregating the data with:
avn service flink create-application-version demo-flink \
--project $PROJECT \
--application-id $APP_ID \
@tmp/02-aggregating.json
The `flink-app/02-aggregating.json` file contains:
- A source table definition, pointing to the `pizzas` topic:
CREATE TABLE pizza_source (
id INT,
shop VARCHAR,
name VARCHAR,
phoneNumber VARCHAR,
address VARCHAR,
pizzas ARRAY <ROW (pizzaName VARCHAR, additionalToppings ARRAY <VARCHAR>)>,
orderTimestamp TIMESTAMP(3) METADATA FROM 'timestamp',
orderProctime AS PROCTIME(),
WATERMARK FOR orderTimestamp AS orderTimestamp - INTERVAL '5' SECOND
) WITH (
'connector' = 'kafka',
'properties.bootstrap.servers' = '',
'scan.startup.mode' = 'earliest-offset',
'topic' = 'pizzas',
'value.format' = 'json'
)
- A target table definition, pointing to a new Kafka topic named `pizza_stream_out_agg`, using the upsert mode with the order `id` as the primary key (the aggregate for an order is continuously refined, so later results for the same key overwrite earlier ones):
CREATE TABLE pizzas_aggs (
id INT PRIMARY KEY NOT ENFORCED,
name VARCHAR,
nr_bad_items BIGINT,
toppings VARCHAR,
orderTimestamp TIMESTAMP(3)
) WITH (
'connector' = 'upsert-kafka',
'properties.bootstrap.servers' = '',
'topic' = 'pizza_stream_out_agg',
'value.format' = 'json',
'key.format' = 'json'
)
- The aggregating transformation logic:
insert into pizzas_aggs
select
id,
name,
count(*) nr_bad_items,
LISTAGG(c.topping) toppings,
orderTimestamp
from pizza_source
cross join UNNEST(pizzas) b
cross join UNNEST(b.additionalToppings) as c(topping)
where c.topping in ('🍍 pineapple', '🍓 strawberry','🍌 banana')
group by id, name, orderTimestamp
having count(*) > 3
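To see what the aggregation emits, here's a minimal sketch of the same GROUP BY/HAVING logic over hypothetical pre-flattened rows (the shape the UNNEST above produces):
select id,
  name,
  count(*) nr_bad_items,
  LISTAGG(topping) toppings
from (values
    (1, 'Mario Rossi', '🍍 pineapple'),
    (1, 'Mario Rossi', '🍍 pineapple'),
    (1, 'Mario Rossi', '🍓 strawberry'),
    (1, 'Mario Rossi', '🍌 banana')
  ) as t(id, name, topping)
group by id, name
having count(*) > 3
-- yields one row: (1, 'Mario Rossi', 4, '🍍 pineapple,🍍 pineapple,🍓 strawberry,🍌 banana')
LISTAGG uses a comma separator by default, which is what fills the toppings column.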
Run the application
We can run the application by following the steps below:
- Retrieve the application version id you want to run, e.g. for version `1` of the `Aggregating` application:
APP_VERSION_1=$(avn service flink get-application demo-flink \
--project $PROJECT --application-id $APP_ID | jq -r '.application_versions[] | select(.version == 1).id')
- Create a deployment of the application version with:
avn service flink create-application-deployment demo-flink \
--project $PROJECT \
--application-id $APP_ID \
"{\"parallelism\": 1,\"restart_enabled\": true, \"version_id\": \"$APP_VERSION_1\"}"
- Retrieve the deployment id with:
APP_DEPLOYMENT=$(avn service flink list-application-deployments demo-flink \
--project $PROJECT \
--application-id $APP_ID | jq -r ".deployments[] | select(.version_id == \"$APP_VERSION_1\").id")
- Retrieve the deployment status with:
avn service flink get-application-deployment demo-flink \
--project $PROJECT \
--application-id $APP_ID \
--deployment-id $APP_DEPLOYMENT | jq '.status'
The application should be in the `RUNNING` state.
We can verify the presence of data in the target `pizza_stream_out_agg` topic using kcat (after properly setting the `kcat.config` file as per the dedicated documentation):
kcat -F $PINEAPPLE_PATH/kcat.config -C -t pizza_stream_out_agg
We might want to check orders over time and raise a flag only if certain thresholds are met within a precise time window. We can do so by:
- Create an application named `Windowing` with:
avn service flink create-application demo-flink \
--project $PROJECT \
"{\"name\":\"Windowing\"}"
- Retrieve the application id with:
APP_ID=$(avn service flink list-applications demo-flink \
--project $PROJECT | jq -r '.applications[] | select(.name == "Windowing").id')
- Replace the integration id in the application definition file named `03-windowing.json`:
mkdir -p tmp
sed "s/KAFKA_INTEGRATION_ID/$KAFKA_FLINK_SI/" 'flink-app/03-windowing.json' > tmp/03-windowing.json
- Create the Apache Flink application version performing the windowed aggregation with:
avn service flink create-application-version demo-flink \
--project $PROJECT \
--application-id $APP_ID \
@tmp/03-windowing.json
The `flink-app/03-windowing.json` file contains:
- A source table definition, pointing to the `pizzas` topic:
CREATE TABLE pizza_source (
id INT,
shop VARCHAR,
name VARCHAR,
phoneNumber VARCHAR,
address VARCHAR,
pizzas ARRAY <ROW (pizzaName VARCHAR, additionalToppings ARRAY <VARCHAR>)>,
orderTimestamp TIMESTAMP(3) METADATA FROM 'timestamp',
orderProctime AS PROCTIME(),
WATERMARK FOR orderTimestamp AS orderTimestamp - INTERVAL '5' SECOND
) WITH (
'connector' = 'kafka',
'properties.bootstrap.servers' = '',
'scan.startup.mode' = 'earliest-offset',
'topic' = 'pizzas',
'value.format' = 'json'
)
- A target table definition, pointing to a new Kafka topic named `pizza_stream_out_agg_windows`, using the upsert mode with `topping`, `window_start`, and `window_end` as the composite primary key:
CREATE TABLE pizza_windows (
window_time TIMESTAMP(3),
window_start TIMESTAMP(3),
window_end TIMESTAMP(3),
topping VARCHAR,
nr_orders BIGINT,
PRIMARY KEY(topping, window_start, window_end) NOT ENFORCED
) WITH (
'connector' = 'upsert-kafka',
'properties.bootstrap.servers' = '',
'topic' = 'pizza_stream_out_agg_windows',
'value.format' = 'json',
'key.format' = 'json'
)
- The windowed aggregation logic:
insert into pizza_windows
with raw_data as (
select orderTimestamp,
id,
c.topping
from pizza_source
cross join UNNEST(pizzas) b
cross join UNNEST(b.additionalToppings) as c(topping)
)
SELECT window_time,
window_start,
window_end,
topping,
count(*) nr_orders
FROM TABLE(TUMBLE(TABLE raw_data, DESCRIPTOR(orderTimestamp), interval '5' seconds))
where topping in ('🍍 pineapple', '🍓 strawberry','🍌 banana')
group by window_time,
topping,
window_start,
window_end
having count(*) > 10
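As a rough illustration of the window behaviour (timestamps invented):
-- with a 5 second tumbling window, an order flattened at 12:00:03 lands in
-- the window [12:00:00, 12:00:05), one at 12:00:07 in [12:00:05, 12:00:10);
-- a (topping, window) row is emitted only once a prohibited topping occurs
-- more than 10 times in the same window, and the upsert key
-- (topping, window_start, window_end) makes any later refinement of the
-- same window overwrite the previous record in the topic.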
Run the application
We can run the application by following the steps below:
- Retrieve the application version id you want to run, e.g. for version `1` of the `Windowing` application:
APP_VERSION_1=$(avn service flink get-application demo-flink \
--project $PROJECT --application-id $APP_ID | jq -r '.application_versions[] | select(.version == 1).id')
- Create a deployment of the application version with:
avn service flink create-application-deployment demo-flink \
--project $PROJECT \
--application-id $APP_ID \
"{\"parallelism\": 1,\"restart_enabled\": true, \"version_id\": \"$APP_VERSION_1\"}"
- Retrieve the deployment id with:
APP_DEPLOYMENT=$(avn service flink list-application-deployments demo-flink \
--project $PROJECT \
--application-id $APP_ID | jq -r ".deployments[] | select(.version_id == \"$APP_VERSION_1\").id")
- Retrieve the deployment status with:
avn service flink get-application-deployment demo-flink \
--project $PROJECT \
--application-id $APP_ID \
--deployment-id $APP_DEPLOYMENT | jq '.status'
The application should be in the `RUNNING` state.
We can verify the presence of data in the target `pizza_stream_out_agg_windows` topic using kcat (after properly setting the `kcat.config` file as per the dedicated documentation):
kcat -F $PINEAPPLE_PATH/kcat.config -C -t pizza_stream_out_agg_windows -u | jq -c
The last anomaly detection use case is to check for particular trends, using the `MATCH_RECOGNIZE` function. Let's create the application:
- Create an application named `Trends` with:
avn service flink create-application demo-flink \
--project $PROJECT \
"{\"name\":\"Trends\"}"
- Retrieve the application id with:
APP_ID=$(avn service flink list-applications demo-flink \
--project $PROJECT | jq -r '.applications[] | select(.name == "Trends").id')
- Replace the integration id in the application definition file named `04-trends.json`:
mkdir -p tmp
sed "s/KAFKA_INTEGRATION_ID/$KAFKA_FLINK_SI/" 'flink-app/04-trends.json' > tmp/04-trends.json
- Create the Apache Flink application version detecting the trends with:
avn service flink create-application-version demo-flink \
--project $PROJECT \
--application-id $APP_ID \
@tmp/04-trends.json
The `flink-app/04-trends.json` file contains:
- A source table definition, pointing to the `pizzas` topic:
CREATE TABLE pizza_source (
id INT,
shop VARCHAR,
name VARCHAR,
phoneNumber VARCHAR,
address VARCHAR,
pizzas ARRAY <ROW (pizzaName VARCHAR, additionalToppings ARRAY <VARCHAR>)>,
orderTimestamp TIMESTAMP(3) METADATA FROM 'timestamp',
orderProctime AS PROCTIME(),
WATERMARK FOR orderTimestamp AS orderTimestamp - INTERVAL '5' SECOND
) WITH (
'connector' = 'kafka',
'properties.bootstrap.servers' = '',
'scan.startup.mode' = 'earliest-offset',
'topic' = 'pizzas',
'value.format' = 'json'
)
- A target table definition, pointing to a new Kafka topic named `pizza_stream_out_trends`, using the upsert mode with `topping` and `trend_start` as the composite primary key:
CREATE TABLE pizza_orders_trend (
topping VARCHAR,
trend_start TIMESTAMP(3),
trend_detail VARCHAR,
PRIMARY KEY(topping, trend_start) NOT ENFORCED
) WITH (
'connector' = 'upsert-kafka',
'properties.bootstrap.servers' = '',
'topic' = 'pizza_stream_out_trends',
'value.format' = 'json',
'key.format' = 'json'
)
- The trend detection logic, using the Apache Flink `MATCH_RECOGNIZE` function:
insert into pizza_orders_trend
with raw_data as (
select orderTimestamp,
id,
c.topping
from pizza_source
cross join UNNEST(pizzas) b
cross join UNNEST(b.additionalToppings) as c(topping)
)
, windowing as
(SELECT window_time,
window_start,
window_end,
topping,
count(*) nr_orders
FROM TABLE(TUMBLE(TABLE raw_data, DESCRIPTOR(orderTimestamp), interval '1' seconds))
where topping in ('🍍 pineapple', '🍓 strawberry','🍌 banana')
group by window_time,
topping,
window_start,
window_end)
select * from windowing
MATCH_RECOGNIZE (
PARTITION BY topping
ORDER BY window_time
MEASURES
START_ROW.window_time as start_tstamp,
LISTAGG(cast(nr_orders as string)) as various_nr_orders
ONE ROW PER MATCH
AFTER MATCH SKIP PAST LAST ROW
PATTERN (START_ROW NR_UP NR_DOWN)
DEFINE
NR_UP AS
NR_UP.nr_orders > START_ROW.nr_orders,
NR_DOWN AS
(NR_UP.nr_orders - NR_DOWN.nr_orders)*100.0/NR_UP.nr_orders > 30
)
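To make the pattern concrete, here's a walkthrough with invented per-window counts:
-- hypothetical consecutive windows for '🍍 pineapple' with nr_orders 5, 12, 7:
--   START_ROW matches the window with nr_orders = 5
--   NR_UP matches nr_orders = 12, since 12 > 5
--   NR_DOWN matches nr_orders = 7, since (12 - 7) * 100.0 / 12 ≈ 41.7 > 30
-- ONE ROW PER MATCH emits a single row: topping '🍍 pineapple', trend_start
-- equal to the first window's window_time, and trend_detail '5,12,7'
-- (LISTAGG concatenates the counts of every row in the match).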
Run the application
We can run the application by following the steps below:
- Retrieve the application version id you want to run, e.g. for version `1` of the `Trends` application:
APP_VERSION_1=$(avn service flink get-application demo-flink \
--project $PROJECT --application-id $APP_ID | jq -r '.application_versions[] | select(.version == 1).id')
- Create a deployment of the application version with:
avn service flink create-application-deployment demo-flink \
--project $PROJECT \
--application-id $APP_ID \
"{\"parallelism\": 1,\"restart_enabled\": true, \"version_id\": \"$APP_VERSION_1\"}"
- Retrieve the deployment id with:
APP_DEPLOYMENT=$(avn service flink list-application-deployments demo-flink \
--project $PROJECT \
--application-id $APP_ID | jq -r ".deployments[] | select(.version_id == \"$APP_VERSION_1\").id")
- Retrieve the deployment status with:
avn service flink get-application-deployment demo-flink \
--project $PROJECT \
--application-id $APP_ID \
--deployment-id $APP_DEPLOYMENT | jq '.status'
The application should be in the `RUNNING` state.
We can verify the presence of data in the target `pizza_stream_out_trends` topic using kcat (after properly setting the `kcat.config` file as per the dedicated documentation):
kcat -F $PINEAPPLE_PATH/kcat.config -C -t pizza_stream_out_trends -u | jq -c
Once done, we can delete our services with:
avn service terminate demo-kafka --force
avn service terminate demo-flink --force