This example shows how to save an input dataset with features into a MySQL cluster with MySQLSink, and how to provide online serving based on the saved features with MySQLSource. It involves the following steps:
-
Read a batch of historical item price events from a file.
Each item price event has the following fields:
- item_id, unique identifier of the item.
- price, the new price of this item.
- timestamp, time when the new price is used for this item.
-
For each item price event, save the event into a MySQL cluster. If a feature with the same item_id has been saved to MySQL before, FeatHub will update the price and timestamp values of that entry in MySQL.
-
Read the latest item prices from the MySQL cluster and use them to create an OnDemandFeatureView to provide online serving. In this example, the features displayed by online serving are printed out in the terminal.
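The update-on-duplicate behavior described in the second step can be illustrated with a small, self-contained sketch. This is not FeatHub's implementation: it uses a plain Python dictionary as a stand-in for the MySQL table, assumes events are applied in the order they are read, and uses made-up sample events (`item_a`, `item_b`) rather than the example's real input file. The `ItemPriceEvent` type and `upsert_events` function are hypothetical names introduced only for illustration.

```python
from typing import Dict, Iterable, NamedTuple


class ItemPriceEvent(NamedTuple):
    item_id: str    # unique identifier of the item
    price: float    # the new price of this item
    timestamp: str  # time when the new price is used for this item


def upsert_events(events: Iterable[ItemPriceEvent]) -> Dict[str, ItemPriceEvent]:
    """Keep one row per item_id; a later event overwrites the earlier row."""
    table: Dict[str, ItemPriceEvent] = {}  # stand-in for the MySQL table
    for event in events:
        # Insert the row if item_id is new, otherwise replace price and timestamp.
        table[event.item_id] = event
    return table


# Hypothetical sample events, not the data shipped with this example.
events = [
    ItemPriceEvent("item_a", 10.0, "2022-01-01 00:00:00"),
    ItemPriceEvent("item_b", 20.0, "2022-01-01 00:00:00"),
    ItemPriceEvent("item_a", 15.0, "2022-01-02 00:00:00"),
]

latest = upsert_events(events)
for item_id, row in sorted(latest.items()):
    print(item_id, row.price, row.timestamp)
```

After all events are applied, the table holds exactly one entry per item_id, which is what the online serving step then reads.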
Prerequisites for running this example:
- Unix-like operating system (e.g. Linux, Mac OS X)
- Python 3.7
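A quick way to check these prerequisites from Python is sketched below. It assumes that "Unix-like" can be approximated by `os.name == "posix"` and that the stated Python version is treated as a minimum; both are assumptions of this sketch, not requirements stated by FeatHub. The `check_prerequisites` helper is a hypothetical name.

```python
import os
import sys
from typing import List, Tuple


def check_prerequisites(os_name: str, python_version: Tuple[int, int]) -> List[str]:
    """Return a list of human-readable problems; an empty list means OK."""
    problems = []
    if os_name != "posix":
        # Assumption: Linux and macOS both report "posix" here.
        problems.append("a Unix-like operating system is required")
    if python_version < (3, 7):
        # Assumption: 3.7 is treated as the minimum supported version.
        problems.append("Python 3.7 is required")
    return problems


issues = check_prerequisites(os.name, tuple(sys.version_info[:2]))
print("OK" if not issues else "; ".join(issues))
```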
Please execute the following commands under the flink-read-write-mysql folder to run this example.
-
Install the FeatHub pip package with the FlinkProcessor dependencies.
$ python -m pip install --upgrade "feathub-nightly[flink]"
-
Start the Flink cluster and MySQL cluster.
$ docker-compose up -d
After the Flink cluster has started, you should be able to navigate to the web UI at localhost:8081 to view the Flink dashboard.
-
Initialize the MySQL table.
$ python initialize_mysql_table.py
-
Run the FeatHub program to save the item price features to MySQL and provide online serving accordingly.
$ python main.py
You should see the following output at the end of the terminal output:
  item_id  price
0  item_1  400.0
1  item_2  200.0
2  item_3  300.0
-
Tear down the Flink cluster and MySQL cluster after the FeatHub program has finished.
$ docker-compose down
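When rerunning the example, it can help to confirm that the Flink web UI is actually reachable after starting the clusters and before running main.py. The sketch below is one way to do that with only the Python standard library. It assumes the dashboard answers plain HTTP at localhost:8081, as stated in the steps above; the `is_reachable` helper is a hypothetical name and is not part of FeatHub.

```python
import urllib.error
import urllib.request


def is_reachable(url: str, timeout: float = 3.0) -> bool:
    """Return True if an HTTP GET to url succeeds with a 2xx status."""
    try:
        with urllib.request.urlopen(url, timeout=timeout) as response:
            return 200 <= response.status < 300
    except (urllib.error.URLError, OSError):
        # Connection refused, timeout, DNS failure, or an HTTP error status.
        return False


# Address of the Flink dashboard as given in this example.
print("Flink web UI reachable:", is_reachable("http://localhost:8081"))
```

If this prints False shortly after `docker-compose up -d`, the containers may still be starting; waiting a few seconds and retrying is usually enough.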