Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add IBM DB2 migration #481

Open
wants to merge 4 commits into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
93 changes: 93 additions & 0 deletions python/migrate.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@
import oracledb
import pyodbc
import psycopg2
import ibm_db
import threading

from typing import Any, Dict
Expand Down Expand Up @@ -342,6 +343,94 @@ def cleanup_migrate_postgresql():
mgp.add_batch_read_proc(postgresql, init_migrate_postgresql, cleanup_migrate_postgresql)


# IBM DB2 dictionary to store connections and cursors by thread
db2_dict = {}


def init_migrate_db2(
table_or_sql: str,
config: mgp.Map,
config_path: str = "",
params: mgp.Nullable[mgp.Any] = None,
):
global db2_dict

if params:
_check_params_type(params)

if len(config_path) > 0:
config = _combine_config(config=config, config_path=config_path)

if _query_is_table(table_or_sql):
table_or_sql = f"SELECT * FROM {table_or_sql}"

if threading.get_native_id not in db2_dict:
db2_dict[threading.get_native_id] = {}

if Constants.CURSOR not in db2_dict[threading.get_native_id]:
db2_dict[threading.get_native_id][Constants.CURSOR] = None

if db2_dict[threading.get_native_id][Constants.CURSOR] is None:
connection_string = _create_connection_string(config)
connection = ibm_db.connect(connection_string, "", "")
stmt = ibm_db.prepare(connection, table_or_sql)

if params:
for i, param in enumerate(params):
ibm_db.bind_param(stmt, i + 1, param)

ibm_db.execute(stmt)

db2_dict[threading.get_native_id][Constants.CONNECTION] = connection
db2_dict[threading.get_native_id][Constants.CURSOR] = stmt
db2_dict[threading.get_native_id][Constants.COLUMN_NAMES] = [
column[Constants.I_COLUMN_NAME] for column in ibm_db.fetch_assoc(stmt)
]


def db2(
table_or_sql: str,
config: mgp.Map,
config_path: str = "",
params: mgp.Nullable[mgp.Any] = None,
) -> mgp.Record(row=mgp.Map):
"""
With migrate.db2 you can access IBM DB2 and execute queries. The result table is converted into a stream,
and returned rows can be used to create or create graph structures. Config must be at least empty map.
If config_path is passed, every key,value pair from JSON file will overwrite any values in config file.

:param table_or_sql: Table name or an SQL query
:param config: Connection configuration parameters (as in ibm_db.connect),
:param config_path: Path to the JSON file containing configuration parameters (as in ibm_db.connect)
:param params: Optionally, queries may be parameterized. In that case, `params` provides parameter values
:return: The result table as a stream of rows
"""
global db2_dict
cursor = db2_dict[threading.get_native_id][Constants.CURSOR]
column_names = db2_dict[threading.get_native_id][Constants.COLUMN_NAMES]

rows = []
for _ in range(Constants.BATCH_SIZE):
row = ibm_db.fetch_assoc(cursor)
if not row:
break
rows.append(row)

return [mgp.Record(row=_name_row_cells(row, column_names)) for row in rows]


def cleanup_migrate_db2():
global db2_dict
db2_dict[threading.get_native_id][Constants.CURSOR] = None
ibm_db.commit(db2_dict[threading.get_native_id][Constants.CONNECTION])
ibm_db.close(db2_dict[threading.get_native_id][Constants.CONNECTION])
db2_dict[threading.get_native_id][Constants.CONNECTION] = None
db2_dict[threading.get_native_id][Constants.COLUMN_NAMES] = None


mgp.add_batch_read_proc(db2, init_migrate_db2, cleanup_migrate_db2)


def _query_is_table(table_or_sql: str) -> bool:
return len(table_or_sql.split()) == 1

Expand Down Expand Up @@ -375,3 +464,7 @@ def _check_params_type(params: Any, types=(dict, list, tuple)) -> None:
raise TypeError(
"Database query parameter values must be passed in a container of type List[Any] (or Map, if migrating from MySQL or Oracle DB)"
)


def _create_connection_string(config: mgp.Map) -> str:
return ";".join(f"{key}={value}" for key, value in config.items())
1 change: 1 addition & 0 deletions python/requirements.txt
Original file line number Diff line number Diff line change
Expand Up @@ -13,5 +13,6 @@ mysql-connector-python==8.0.32
oracledb==1.2.2
pyodbc==4.0.35
psycopg2-binary==2.9.9
ibm-db==3.2.3
defusedxml==0.7.1
scipy==1.12.0
1 change: 1 addition & 0 deletions python/requirements_no_ml.txt
Original file line number Diff line number Diff line change
Expand Up @@ -11,5 +11,6 @@ mysql-connector-python==8.0.32
oracledb==1.2.2
pyodbc==4.0.35
psycopg2-binary==2.9.9
ibm-db==3.2.3
defusedxml==0.7.1
scipy==1.12.0
Loading