feat: Add integration test for MySQL2Spanner notebook #1001

Open · wants to merge 15 commits into main
1 change: 1 addition & 0 deletions README.md
@@ -7,6 +7,7 @@
![Python Dataproc Serverless Integration Test Status](https://dataproctemplatesci.com/buildStatus/icon?job=dataproc-templates-build%2Fintegration-tests-python&&subject=python-serverless-integration-tests)
![Python Dataproc Cluster Integration Tests Status](https://dataproctemplatesci.com/buildStatus/icon?job=dataproc-templates-build%2Fcluster-integration-tests-python&&subject=python-cluster-integration-tests)

![Notebooks Integration Test Status](https://dataproctemplatesci.com/buildStatus/icon?job=dataproc-templates-build%2Fintegration-tests-notebooks&&subject=integration-tests-notebooks)

# Dataproc Templates
Dataproc templates are designed to address various in-cloud data tasks, including data import/export/backup/restore and bulk API operations. These templates leverage the power of [Google Cloud's Dataproc](https://cloud.google.com/dataproc/), supporting both Dataproc Serverless and Dataproc clusters.
117 changes: 117 additions & 0 deletions notebooks/.ci/Jenkinsfile
@@ -0,0 +1,117 @@
def stageRetryCount = 3

pipeline {

agent any

environment {
DATAPROC_TELEPORT_WEBHOOK_URL = credentials('dataproc-teleport-webhook-url')

TEST_JDBC_URL = credentials('env-test-jdbc-url')

GIT_BRANCH_LOCAL = sh (
script: "echo $branchName | sed -e 's|origin/||g' | sed -e 's|^null\$|main|'", // Remove "origin/" and set the default branch to main
returnStdout: true
).trim()
}

stages {
stage('Checkout') {
steps{
git branch: "${GIT_BRANCH_LOCAL}", changelog: false, poll: false, url: 'https://github.com/GoogleCloudPlatform/dataproc-templates/'
}
}
stage('Build'){
steps {
catchError {
sh '''
python3.8 -m pip install --user virtualenv
python3.8 -m venv env
source env/bin/activate

export PACKAGE_EGG_FILE=dist/dataproc_templates_distribution.egg

cd python
python setup.py bdist_egg --output=$PACKAGE_EGG_FILE

cd ../notebooks
pip install --upgrade pip ipython ipykernel
ipython kernel install --name "python3" --user
pip install -r requirements.txt

'''
}
}
}
stage('Extract JDBC URL Parameters') {
steps {
script {
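// Hypothetical example of the JDBC URL shape this stage assumes:
//   jdbc:mysql://10.0.0.5:3306/testdb?user=ci_user&password=ci_pass
// The substring parsing below relies on "user=" being the first query
// parameter and "password=" being the last.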
def host = TEST_JDBC_URL.substring(TEST_JDBC_URL.indexOf("//") + 2, TEST_JDBC_URL.indexOf(":", TEST_JDBC_URL.indexOf("//") + 2))
def database = TEST_JDBC_URL.substring(TEST_JDBC_URL.indexOf("/", TEST_JDBC_URL.indexOf("//") + 2) + 1, TEST_JDBC_URL.indexOf("?"))
def user = TEST_JDBC_URL.substring(TEST_JDBC_URL.indexOf("user=") + 5, TEST_JDBC_URL.indexOf("&"))
def password = TEST_JDBC_URL.substring(TEST_JDBC_URL.indexOf("password=") + 9)
env.DB_HOST = host
env.DB_NAME = database
env.DB_USER = user
env.DB_PASSWORD = password
}
}
}
stage('Parallel Execution'){
parallel{
stage('MYSQL TO SPANNER') {
steps{
retry(count: stageRetryCount) {
sh '''
source env/bin/activate

export GCS_STAGING_LOCATION=gs://dataproc-templates/integration-testing
export JARS="gs://datproc_template_nk/jars/mysql-connector-java-8.0.29.jar,gs://datproc_template_nk/jars/postgresql-42.2.6.jar,gs://datproc_template_nk/jars/mssql-jdbc-6.4.0.jre8.jar"
export SKIP_BUILD=true

cd notebooks/mysql2spanner
pip install -r requirements.txt
cd ..

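# Download the Cloud SQL Auth Proxy and run it in the background so the
# notebook can reach the test MySQL instance on 127.0.0.1:3306.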
wget https://dl.google.com/cloudsql/cloud_sql_proxy.linux.amd64 -O ./cloud_sql_proxy
chmod +x cloud_sql_proxy
nohup ./cloud_sql_proxy -instances=$ENV_TEST_MYSQL_INSTANCE_CONNECTION_NAME=tcp:3306 &

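# The primary-key mapping below is double-escaped so the shell passes the
# literal JSON {"employee":"empno"} through to the script.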
python run_notebook.py --script=MYSQLTOSPANNER \
--mysql.host="${DB_HOST}" \
--mysql.port="3306" \
--mysql.username="${DB_USER}" \
--mysql.password="${DB_PASSWORD}" \
--mysql.database="${DB_NAME}" \
--mysql.table.list="employee" \
--mysql.read.partition.columns="{}" \
--use.cloud.sql.proxy="true" \
--spanner.instance="dataproc-spark-test" \
--spanner.database="spark-ci-db" \
--spanner.table.primary.keys="{\\"employee\\":\\"empno\\"}"

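# Stop the proxy so a retried run can rebind port 3306.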
kill $(pgrep cloud_sql_proxy)

'''
}
}
}
}
}
}
post {
always{
script {
if( env.GIT_BRANCH_LOCAL == 'main' ){
googlechatnotification url: DATAPROC_TELEPORT_WEBHOOK_URL,
message: 'Jenkins: ${JOB_NAME}\nBuild status is ${BUILD_STATUS}\nSee ${BUILD_URL}\n',
notifyFailure: 'true',
notifyAborted: 'true',
notifyUnstable: 'true',
notifyNotBuilt: 'true',
notifyBackToNormal: 'true'
}
}
}
}
}
32 changes: 12 additions & 20 deletions notebooks/mysql2spanner/MySqlToSpanner_notebook.ipynb
@@ -100,10 +100,7 @@
},
"outputs": [],
"source": [
"!pip3 install pymysql SQLAlchemy\n",
"!pip3 install --upgrade google-cloud-pipeline-components kfp --user -q\n",
"!pip3 install google-cloud-spanner\n",
"!pip3 install --upgrade google-cloud-storage"
"!pip3 install --upgrade google-cloud-storage google-cloud-aiplatform kfp google-cloud-pipeline-components pandas google-cloud-spanner pymysql SQLAlchemy"
]
},
{
@@ -115,9 +112,9 @@
},
"outputs": [],
"source": [
"!sudo apt-get update -y\n",
"!sudo apt-get install default-jdk -y\n",
"!sudo apt-get install maven -y"
"#!sudo apt-get update -y\n",
"#!sudo apt-get install default-jdk -y\n",
"#!sudo apt-get install maven -y"
]
},
{
@@ -179,7 +176,10 @@
"execution_count": null,
"id": "2703b502-1b41-44f1-bf21-41069255bc32",
"metadata": {
"tags": []
"tags": [],
"pycharm": {
"is_executing": true
}
},
"outputs": [],
"source": [
@@ -312,6 +312,7 @@
" MYSQL_PASSWORD = \"\"\n",
" MYSQL_DATABASE = \"\"\n",
" MYSQL_TABLE_LIST = [] # Leave list empty for migrating complete database else provide tables as ['table1','table2']\n",
" USE_CLOUD_SQL_PROXY = \"false\"\n",
" MYSQL_READ_PARTITION_COLUMNS = {} # Leave empty for default read partition columns\n",
" MYSQL_OUTPUT_SPANNER_MODE = \"overwrite\" # one of overwrite|append (Use append when schema already exists in Spanner)\n",
"\n",
@@ -399,7 +400,7 @@
" username=MYSQL_USERNAME,\n",
" password=MYSQL_PASSWORD,\n",
" database=MYSQL_DATABASE,\n",
" host=MYSQL_HOST,\n",
" host=MYSQL_HOST if USE_CLOUD_SQL_PROXY==\"false\" else \"127.0.0.1\",\n",
" port=MYSQL_PORT\n",
" )\n",
")\n",
@@ -541,7 +542,7 @@
"metadata": {},
"outputs": [],
"source": [
"!wget https://downloads.mysql.com/archives/get/p/3/file/mysql-connector-java-8.0.29.tar.gz\n",
"!wget --backups=1 https://downloads.mysql.com/archives/get/p/3/file/mysql-connector-java-8.0.29.tar.gz\n",
"!tar -xf mysql-connector-java-8.0.29.tar.gz\n",
"!mvn clean spotless:apply install -DskipTests "
]
@@ -658,9 +659,8 @@
" file_uris=FILE_URIS,\n",
" subnetwork_uri=SUBNETWORK_URI,\n",
" runtime_config_version=\"1.1\", # issue 665\n",
" service_account=DATAPROC_SERVICE_ACCOUNT,\n",
" args=TEMPLATE_SPARK_ARGS\n",
" )\n",
" ) # add DATAPROC_SERVICE_ACCOUNT if needed\n",
" time.sleep(1)\n",
"\n",
" compiler.Compiler().compile(pipeline_func=pipeline, package_path=\"pipeline.json\")\n",
@@ -858,14 +858,6 @@
"- You may create relationships (FKs), constraints and indexes (as needed).\n",
"- You may configure countinuous replication with [DataStream](https://cloud.google.com/datastream/docs/configure-your-source-mysql-database) or any other 3rd party tools."
]
},
{
"cell_type": "code",
"execution_count": null,
"id": "89ec62c1-0b95-4536-9339-03a4a8de035e",
"metadata": {},
"outputs": [],
"source": []
}
],
"metadata": {
19 changes: 18 additions & 1 deletion notebooks/mysql2spanner/MySqlToSpanner_parameterize_script.py
@@ -71,7 +71,6 @@ def parse_args(args: Optional[Sequence[str]] = None) -> Dict[str, Any]:
required=True,
help="MySQL database name",
)

parser.add_argument(
f"--{constants.MYSQL_TABLE_LIST_ARG}",
dest=constants.MYSQL_TABLE_LIST,
@@ -91,6 +90,14 @@
choices=[constants.OUTPUT_MODE_OVERWRITE, constants.OUTPUT_MODE_APPEND],
)

parser.add_argument(
f"--{constants.USE_CLOUD_SQL_PROXY_ARG}",
dest=constants.USE_CLOUD_SQL_PROXY,
default="false",
required=False,
help="Flag to reach mysql instance using cloud sql proxy"
)

parser.add_argument(
f"--{constants.SPANNER_INSTANCE_ARG}",
dest=constants.SPANNER_INSTANCE,
@@ -112,6 +119,13 @@
help='Provide the primary-key column for tables that have no PK in MySQL, e.g. {"table_name":"primary_key"}',
)

parser.add_argument(
f"--{constants.MYSQL_READ_PARTITION_COLUMNS_ARG}",
dest=constants.MYSQL_READ_PARTITION_COLUMNS,
required=True,
help='Dictionary of custom read partition columns, e.g.: {"table2": "secondary_id"}',
)
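
# Note: spanner.table.primary.keys and mysql.read.partition.columns arrive
# as JSON strings; run() parses them into dicts below.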

parser.add_argument(
f"--{constants.MAX_PARALLELISM_ARG}",
dest=constants.MAX_PARALLELISM,
@@ -141,6 +155,9 @@ def run(self, args: Dict[str, Any]) -> None:
args[constants.SPANNER_TABLE_PRIMARY_KEYS] = json.loads(
args[constants.SPANNER_TABLE_PRIMARY_KEYS]
)
args[constants.MYSQL_READ_PARTITION_COLUMNS] = json.loads(
args[constants.MYSQL_READ_PARTITION_COLUMNS]
)

# Exclude arguments that are not needed to be passed to the notebook
ignore_keys = {constants.LOG_LEVEL_ARG, constants.OUTPUT_NOTEBOOK_ARG}
8 changes: 8 additions & 0 deletions notebooks/mysql2spanner/requirements.txt
@@ -0,0 +1,8 @@
google-cloud-storage
google-cloud-aiplatform
kfp
google-cloud-pipeline-components
pandas
google-cloud-spanner
pymysql
SQLAlchemy
4 changes: 4 additions & 0 deletions notebooks/parameterize_script/util/notebook_constants.py
@@ -51,7 +51,9 @@
MYSQL_PASSWORD_ARG = "mysql.password"
MYSQL_DATABASE_ARG = "mysql.database"
MYSQL_TABLE_LIST_ARG = "mysql.table.list"
MYSQL_READ_PARTITION_COLUMNS_ARG = "mysql.read.partition.columns"
MYSQL_OUTPUT_SPANNER_MODE_ARG = "mysql.output.spanner.mode"
USE_CLOUD_SQL_PROXY_ARG = "use.cloud.sql.proxy"
SPANNER_INSTANCE_ARG = "spanner.instance"
SPANNER_DATABASE_ARG = "spanner.database"
# Provide the table & PK column for tables that have no PK in MySQL, e.g. {"table_name":"primary_key"}
Expand All @@ -64,7 +66,9 @@
MYSQL_PASSWORD = "MYSQL_PASSWORD"
MYSQL_DATABASE = "MYSQL_DATABASE"
MYSQL_TABLE_LIST = "MYSQL_TABLE_LIST"
MYSQL_READ_PARTITION_COLUMNS = "MYSQL_READ_PARTITION_COLUMNS"
MYSQL_OUTPUT_SPANNER_MODE = "MYSQL_OUTPUT_SPANNER_MODE"
USE_CLOUD_SQL_PROXY = "USE_CLOUD_SQL_PROXY"
SPANNER_INSTANCE = "SPANNER_INSTANCE"
SPANNER_DATABASE = "SPANNER_DATABASE"
SPANNER_TABLE_PRIMARY_KEYS = "SPANNER_TABLE_PRIMARY_KEYS"
1 change: 1 addition & 0 deletions notebooks/requirements.txt
@@ -4,3 +4,4 @@ google-cloud-spanner>=3.35.1
google-cloud-storage>=2.9.0
pandas>=1.3.5
papermill>=2.4.0
ipykernel>=6.29.5