Skip to content

Commit

Permalink
Add Flink-Hive use case in Jupyter notebook (apache#91)
Browse files Browse the repository at this point in the history
This commit adds a Flink-Hive use case to the Jupyter notebook
  • Loading branch information
TungYuChiang committed Nov 5, 2024
1 parent 4c3352a commit 440ffd2
Show file tree
Hide file tree
Showing 4 changed files with 258 additions and 0 deletions.
1 change: 1 addition & 0 deletions .gitignore
Original file line number Diff line number Diff line change
@@ -1,2 +1,3 @@
**/.idea
**/.DS_Store
**/packages/**
2 changes: 2 additions & 0 deletions docker-compose.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -145,6 +145,8 @@ services:
volumes:
- ./init/jupyter:/tmp/gravitino
entrypoint: /bin/bash /tmp/gravitino/init.sh
environment:
- HADOOP_CLASSPATH=/tmp/gravitino/packages/hadoop-2.7.3/etc/hadoop:/tmp/gravitino/packages/hadoop-2.7.3/share/hadoop/common/lib/*:/tmp/gravitino/packages/hadoop-2.7.3/share/hadoop/common/*:/tmp/gravitino/packages/hadoop-2.7.3/share/hadoop/hdfs:/tmp/gravitino/packages/hadoop-2.7.3/share/hadoop/hdfs/lib/*:/tmp/gravitino/packages/hadoop-2.7.3/share/hadoop/hdfs/*:/tmp/gravitino/packages/hadoop-2.7.3/share/hadoop/yarn/lib/*:/tmp/gravitino/packages/hadoop-2.7.3/share/hadoop/yarn/*:/tmp/gravitino/packages/hadoop-2.7.3/share/hadoop/mapreduce/lib/*:/tmp/gravitino/packages/hadoop-2.7.3/share/hadoop/mapreduce/*:/tmp/gravitino/packages/contrib/capacity-scheduler/*.jar
depends_on:
hive :
condition: service_healthy
Expand Down
244 changes: 244 additions & 0 deletions init/jupyter/gravitino-flink-hive-example.ipynb
Original file line number Diff line number Diff line change
@@ -0,0 +1,244 @@
{
"cells": [
{
"cell_type": "markdown",
"id": "27e1e593-bf96-4977-a272-c6884d46b9e3",
"metadata": {},
"source": [
"# Gravitino Flink-Hive Example"
]
},
{
"cell_type": "markdown",
"id": "e604cee4-43d6-472c-8227-75e339b92c9f",
"metadata": {},
"source": [
"## Setting Up PyFlink with Hive and Gravitino Connectors"
]
},
{
"cell_type": "code",
"execution_count": 1,
"id": "f0cf8f3e-14f9-4209-8103-a3a0c598a21a",
"metadata": {},
"outputs": [],
"source": [
"from pyflink.table import EnvironmentSettings, TableEnvironment\n",
"from pyflink.common import Configuration\n",
"from pyflink.table.expressions import col\n",
"from pyflink.table import DataTypes\n",
"\n",
"configuration = Configuration()\n",
" \n",
"configuration.set_string(\n",
" \"pipeline.jars\",\n",
" \"file:///tmp/gravitino/packages/gravitino-flink-connector-runtime-1.18_2.12-0.6.1-incubating.jar;\"\n",
" \"file:///tmp/gravitino/packages/gravitino-flink-1.18_2.12-0.6.1-incubating.jar;\"\n",
" \"file:///tmp/gravitino/packages/flink-sql-connector-hive-2.3.10_2.12-1.20.0.jar\"\n",
" )\n",
"configuration.set_string(\"table.catalog-store.kind\", \"gravitino\")\n",
"configuration.set_string(\"table.catalog-store.gravitino.gravitino.uri\", \"http://gravitino:8090\")\n",
"configuration.set_string(\"table.catalog-store.gravitino.gravitino.metalake\", \"metalake_demo\")\n",
"\n",
"env_settings = EnvironmentSettings.new_instance().with_configuration(configuration)\n",
"table_env = TableEnvironment.create(env_settings.in_batch_mode().build())\n"
]
},
{
"cell_type": "markdown",
"id": "d2f87a66-6bd7-4d54-8912-7c03e0661b0f",
"metadata": {},
"source": [
"## Write Queries "
]
},
{
"cell_type": "code",
"execution_count": 2,
"id": "f1037708-56a3-4b7a-80a1-b1015e928a03",
"metadata": {},
"outputs": [
{
"data": {
"text/plain": [
"<pyflink.table.table_result.TableResult at 0xffff67efd290>"
]
},
"execution_count": 2,
"metadata": {},
"output_type": "execute_result"
}
],
"source": [
"table_env.use_catalog(\"catalog_hive\")\n",
"table_env.execute_sql(\"CREATE DATABASE IF NOT EXISTS Reading_System\")\n",
"table_env.execute_sql(\"USE Reading_System\")\n",
"table_env.execute_sql(\"\"\"\n",
" CREATE TABLE IF NOT EXISTS books (\n",
" id INT,\n",
" title STRING,\n",
" author STRING,\n",
" publish_date STRING\n",
" ) \n",
"\"\"\")"
]
},
{
"cell_type": "code",
"execution_count": 3,
"id": "0996b4d2-35dc-456c-9b08-52b8beb8fe86",
"metadata": {},
"outputs": [
{
"name": "stdout",
"output_type": "stream",
"text": [
"<Row('default')>\n",
"<Row('reading_system')>\n",
"<Row('sales')>\n"
]
}
],
"source": [
"result = table_env.execute_sql(\"SHOW DATABASES\")\n",
"with result.collect() as results:\n",
" for row in results:\n",
" print(row)\n"
]
},
{
"cell_type": "markdown",
"id": "b860f1dc-cb63-46a4-aff1-bbbe8d47fee8",
"metadata": {},
"source": [
"### Write Table API Queries"
]
},
{
"cell_type": "code",
"execution_count": 4,
"id": "69eea5be-73c9-489a-b294-74bdea0f6bf7",
"metadata": {},
"outputs": [],
"source": [
"new_books = table_env.from_elements(\n",
" [\n",
" (4, 'The Great Gatsby', 'F. Scott Fitzgerald', '1925-04-10'),\n",
" (5, 'Moby Dick', 'Herman Melville', '1851-11-14')\n",
" ],\n",
" DataTypes.ROW([\n",
" DataTypes.FIELD(\"id\", DataTypes.INT()),\n",
" DataTypes.FIELD(\"title\", DataTypes.STRING()),\n",
" DataTypes.FIELD(\"author\", DataTypes.STRING()),\n",
" DataTypes.FIELD(\"publish_date\", DataTypes.STRING())\n",
" ])\n",
")\n",
"\n",
"\n",
"new_books.execute_insert('books').wait()"
]
},
{
"cell_type": "markdown",
"id": "46c223b3-a08b-4ffc-b6b0-1e73451036d6",
"metadata": {},
"source": [
"### Write SQL Queries"
]
},
{
"cell_type": "code",
"execution_count": 5,
"id": "228d0ca3-8ad2-4b53-ae99-c430713aeb02",
"metadata": {},
"outputs": [
{
"data": {
"text/plain": [
"<pyflink.table.table_result.TableResult at 0xffff67f14a50>"
]
},
"execution_count": 5,
"metadata": {},
"output_type": "execute_result"
}
],
"source": [
"table_env.execute_sql(\"\"\"\n",
" INSERT INTO books VALUES \n",
" (6, 'Pride and Prejudice', 'Jane Austen', '1813-01-28'),\n",
" (7, 'The Catcher in the Rye', 'J.D. Salinger', '1951-07-16')\n",
"\"\"\")"
]
},
{
"cell_type": "markdown",
"id": "d27ed14a-4b77-42c6-8c7f-b3e75cb92519",
"metadata": {},
"source": [
"### Result"
]
},
{
"cell_type": "code",
"execution_count": 6,
"id": "5232e493-a699-4d50-b489-4de4652bf344",
"metadata": {},
"outputs": [
{
"name": "stdout",
"output_type": "stream",
"text": [
"<Row(4, 'The Great Gatsby', 'F. Scott Fitzgerald', '1925-04-10')>\n",
"<Row(5, 'Moby Dick', 'Herman Melville', '1851-11-14')>\n",
"<Row(6, 'Pride and Prejudice', 'Jane Austen', '1813-01-28')>\n",
"<Row(7, 'The Catcher in the Rye', 'J.D. Salinger', '1951-07-16')>\n",
"<Row(6, 'Pride and Prejudice', 'Jane Austen', '1813-01-28')>\n",
"<Row(7, 'The Catcher in the Rye', 'J.D. Salinger', '1951-07-16')>\n",
"<Row(6, 'Pride and Prejudice', 'Jane Austen', '1813-01-28')>\n",
"<Row(7, 'The Catcher in the Rye', 'J.D. Salinger', '1951-07-16')>\n",
"<Row(4, 'The Great Gatsby', 'F. Scott Fitzgerald', '1925-04-10')>\n",
"<Row(5, 'Moby Dick', 'Herman Melville', '1851-11-14')>\n",
"<Row(4, 'The Great Gatsby', 'F. Scott Fitzgerald', '1925-04-10')>\n",
"<Row(5, 'Moby Dick', 'Herman Melville', '1851-11-14')>\n"
]
}
],
"source": [
"result = table_env.execute_sql(\"SELECT * FROM books\")\n",
"with result.collect() as results:\n",
" for row in results:\n",
" print(row)"
]
},
{
"cell_type": "code",
"execution_count": null,
"id": "2f90cdec-a989-4e3f-b7cb-262a82e88e4f",
"metadata": {},
"outputs": [],
"source": []
}
],
"metadata": {
"kernelspec": {
"display_name": "Python 3 (ipykernel)",
"language": "python",
"name": "python3"
},
"language_info": {
"codemirror_mode": {
"name": "ipython",
"version": 3
},
"file_extension": ".py",
"mimetype": "text/x-python",
"name": "python",
"nbconvert_exporter": "python",
"pygments_lexer": "ipython3",
"version": "3.11.5"
}
},
"nbformat": 4,
"nbformat_minor": 5
}
11 changes: 11 additions & 0 deletions init/jupyter/jupyter-dependency.sh
Original file line number Diff line number Diff line change
Expand Up @@ -32,3 +32,14 @@ if [[ ! -d "${jupyter_dir}/packages" ]]; then
find "${jupyter_dir}/../spark/packages/" | grep jar | xargs -I {} ln {} "${jupyter_dir}/packages/"
fi

FLINK_HIVE_CONNECTOR_JAR="https://repo1.maven.org/maven2/org/apache/flink/flink-sql-connector-hive-2.3.10_2.12/1.20.0/flink-sql-connector-hive-2.3.10_2.12-1.20.0.jar"
FLINK_HIVE_CONNECTOR_MD5="${FLINK_HIVE_CONNECTOR_JAR}.md5"
download_and_verify "${FLINK_HIVE_CONNECTOR_JAR}" "${FLINK_HIVE_CONNECTOR_MD5}" "${script_dir}/packages"

GRAVITINO_FLINK_JAR="https://repo1.maven.org/maven2/org/apache/gravitino/gravitino-flink-1.18_2.12/0.6.1-incubating/gravitino-flink-1.18_2.12-0.6.1-incubating.jar"
GRAVITINO_FLINK_MD5="${GRAVITINO_FLINK_JAR}.md5"
download_and_verify "${GRAVITINO_FLINK_JAR}" "${GRAVITINO_FLINK_MD5}" "${script_dir}/packages"

GRAVITINO_FLINK_CONNECTOR_RUNTIME_JAR="https://repo1.maven.org/maven2/org/apache/gravitino/gravitino-flink-connector-runtime-1.18_2.12/0.6.1-incubating/gravitino-flink-connector-runtime-1.18_2.12-0.6.1-incubating.jar"
GRAVITINO_FLINK_CONNECTOR_RUNTIME_MD5="${GRAVITINO_FLINK_CONNECTOR_RUNTIME_JAR}.md5"
download_and_verify "${GRAVITINO_FLINK_CONNECTOR_RUNTIME_JAR}" "${GRAVITINO_FLINK_CONNECTOR_RUNTIME_MD5}" "${script_dir}/packages"

0 comments on commit 440ffd2

Please sign in to comment.