Skip to content

Commit

Permalink
Add sqlite support
Browse files Browse the repository at this point in the history
  • Loading branch information
msg555 committed May 13, 2024
1 parent 7a35f36 commit 02807d3
Show file tree
Hide file tree
Showing 12 changed files with 152 additions and 119 deletions.
2 changes: 1 addition & 1 deletion .github/workflows/test.yml
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@ jobs:
# sudo apt update
# sudo apt install -y pkg-config mysql-server
- name: Install package deps
run: python -m pip install -e .[mysql,postgres]
run: python -m pip install -e .[all]
- name: Install dev deps
run: python -m pip install -r requirements-dev.txt
- name: Lint
Expand Down
5 changes: 5 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,11 @@
- Improve error messaging
- Update output formatting
- Add support for creating output schema if it does not exist
- Removed `normalize_foreign_keys` option that is no longer needed
- `like` constraints within a target are now combined with 'and', as with all
  other constraints, instead of 'or'
- Changed optional dependency names to match sqlalchemy
- Added sqlite support (mostly to help with tests)

# v0.3.0

Expand Down
2 changes: 1 addition & 1 deletion Dockerfile
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@ RUN pip install -U pip tqdm
WORKDIR /subsetter

COPY . ./
RUN python3 -m pip install -e .[mysql,postgres]
RUN python3 -m pip install -e .[all]

RUN adduser -S ctruser

Expand Down
24 changes: 6 additions & 18 deletions planner_config.example.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -20,8 +20,8 @@ targets:
amount: 100
# Additional possible filters shown below. Multiple filters can be provided
# and the results will be intersected together (except all which overrides
# everything). Note that additional rows may be included beyond what is
# specified here if needed when following foreign keys.
# everything). Additional rows will only be sampled if a row from another
# targeted table depends on them.

# all: true
# percent: 5.0
Expand All @@ -43,14 +43,11 @@ select:
- db2.gadgets
- db2.gizmos-*

# Add additional constraints for some tables. The planner does not attempt to
# verify that these constraints will not break foreign key relationships. In
# general it's always safe to apply constraints to tables that have no incoming
# foreign key constraints among selected tables. This config file does not
# accept arbitrary SQL; however you can manually modify the SQL in the generated
# plan with arbitrary SQL.
# Add additional constraints for some tables. Constraints can only be applied
# for tables where filtering rows would not cause foreign key constraints to be
# violated.
table_constraints:
db1.user-data:
db1.user_data:
- column: action_date
operator: '>'
value: '2023-07-01'
Expand Down Expand Up @@ -84,12 +81,3 @@ extra_fks:
# matches the name of a primary key column (that is unique within the database)
# should function as a foreign key to that table.
infer_foreign_keys: false

# If set to true the subsetter will automatically attempt to normalize some
# foreign key relationships. In particular if there are foreign key
# relationships A->B, A->C, B->C then the subsetter will assume that the
# relationship A->C is redundant and ignore it. Without this assumption this
# sort of relationship triangle cannot be sampled with a "one table, one query"
# strategy. Note that this does not currently attempt to normalize chains that
# involve more than three tables.
normalize_foreign_keys: false
13 changes: 13 additions & 0 deletions sampler_config.example.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -7,13 +7,26 @@ source:
username: my_user # overridden by SUBSET_SOURCE_USERNAME
password: my_s3cret # overridden by SUBSET_SOURCE_PASSWORD
# database: my_dbname # overridden by SUBSET_SOURCE_DATABASE (if needed)

# For sqlite, the file named by 'database' will be mounted as the 'main'
# schema. You can mount additional databases using the 'sqlite_databases'
# mapping:
#
# sqlite_databases:
# foo: /path/to/foo.db
# bar: /path/to/bar.db

session_sqls: # Set any additional session variables; e.g.
- SET @@session.max_statement_time=0
- SET @@session.net_read_timeout=3600
- SET @@session.net_write_timeout=3600
- SET @@session.wait_timeout=28800
- SET @@session.innodb_lock_wait_timeout=3600

# Set the transaction isolation level. Defaults to REPEATABLE READ for all
# engines other than sqlite which defaults to SERIALIZABLE.
isolation_level: "REPEATABLE READ"

# Optionally specify the source database. This can also be passed on the command
# line or through environment variables.
output:
Expand Down
15 changes: 11 additions & 4 deletions setup.cfg
Original file line number Diff line number Diff line change
Expand Up @@ -31,11 +31,18 @@ install_requires =
typing-extensions

[options.extras_require]
mysql =
pymysql ~= 1.0

postgres =
psycopg2-binary ~= 2.0
all =
sqlalchemy[pymysql,postgresql_psycopg2binary]

pymysql =
sqlalchemy[pymysql]

postgresql =
sqlalchemy[postgresql]

postgresql_psycopg2binary =
sqlalchemy[postgresql_psycopg2binary]

[options.package_data]
subsetter =
Expand Down
34 changes: 27 additions & 7 deletions subsetter/common.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@
DatabaseDialect = Literal[
"mysql",
"postgres",
"sqlite",
]

LOGGER = logging.getLogger(__name__)
Expand Down Expand Up @@ -37,9 +38,9 @@ def database_url(
dialect = dialect or os.getenv(f"{env_prefix}DIALECT", None) # type: ignore
host = host or os.getenv(f"{env_prefix}HOST", "localhost")
port = port or int(os.getenv(f"{env_prefix}PORT", "0"))
database = database or os.getenv(f"{env_prefix}DATABASE", "")
username = username or os.environ[f"{env_prefix}USERNAME"]
password = os.environ[f"{env_prefix}PASSWORD"] if password is None else password
database = database or os.getenv(f"{env_prefix}DATABASE", None)
username = username or os.getenv(f"{env_prefix}USERNAME", None)
password = password or os.getenv(f"{env_prefix}PASSWORD", None)

if dialect is None:
dialect = DEFAULT_DIALECT
Expand All @@ -57,13 +58,15 @@ def database_url(
port = 5432
if database:
extra_kwargs["database"] = database
elif dialect == "sqlite":
return sa.engine.URL.create(drivername="sqlite", database=database)
else:
raise ValueError(f"Unsupported SQL dialect {dialect!r}")

return sa.engine.URL.create(
drivername=drivername,
host=host,
port=port or 3306,
port=port,
username=username,
password=password,
**extra_kwargs,
Expand All @@ -86,7 +89,8 @@ class DatabaseConfig(BaseModel):
username: Optional[str] = None
password: Optional[str] = None
session_sqls: List[str] = []
isolation_level: IsolationLevel = "READ COMMITTED"
sqlite_databases: Optional[Dict[str, str]] = {}
isolation_level: Optional[IsolationLevel] = None

def database_url(
self,
Expand All @@ -106,16 +110,32 @@ def database_engine(
self,
env_prefix: Optional[str] = None,
) -> sa.engine.Engine:
if self.isolation_level:
isolation_level = self.isolation_level
elif self.dialect == "sqlite":
isolation_level = "SERIALIZABLE"
else:
isolation_level = "READ COMMITTED"
engine = sa.create_engine(
self.database_url(env_prefix=env_prefix),
isolation_level=self.isolation_level,
isolation_level=isolation_level,
pool_pre_ping=True,
)

@sa.event.listens_for(engine, "connect")
def _set_session_sqls(dbapi_connection, _):
with dbapi_connection.cursor() as cursor:
cursor = dbapi_connection.cursor()
try:
if self.dialect == "sqlite":
for db_alias, db_file in self.sqlite_databases.items():
escaped_db_file = db_file.replace("'", "''")
cursor.execute(
f"ATTACH DATABASE '{escaped_db_file}' as {db_alias}"
)

for session_sql in self.session_sqls:
cursor.execute(session_sql)
finally:
cursor.close()

return engine
50 changes: 1 addition & 49 deletions subsetter/metadata.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@
import dataclasses
import logging
from fnmatch import fnmatch
from typing import Dict, List, Optional, Set, TextIO, Tuple
from typing import Dict, List, Optional, Set, Tuple

import sqlalchemy as sa

Expand Down Expand Up @@ -167,37 +167,6 @@ def infer_missing_foreign_keys(self) -> None:
)
table.foreign_keys.append(fk)

def normalize_foreign_keys(self) -> None:
    """
    Drop redundant foreign keys: if table A references both B and C, and B
    also references C on the same destination columns, the direct A->C
    relationship is assumed redundant and removed from A.
    """
    # Destination (schema, table, columns) triples reachable in one hop
    # from each table, keyed by that table's key.
    targets_by_table = {}
    for table_key, tbl in self.tables.items():
        targets_by_table[table_key] = {
            (fk.dst_schema, fk.dst_table, fk.dst_columns)
            for fk in tbl.foreign_keys
        }

    for tbl in self.tables.values():
        # Everything reachable in two hops through this table's children.
        via_children = set()
        for fk in tbl.foreign_keys:
            via_children.update(targets_by_table[(fk.dst_schema, fk.dst_table)])

        kept = []
        for fk in tbl.foreign_keys:
            triple = (fk.dst_schema, fk.dst_table, fk.dst_columns)
            if triple in via_children:
                # A child already reaches this destination; treat the direct
                # edge as redundant and drop it.
                LOGGER.info(
                    "Normalizing foreign key, removed %s->%s.%s on %r",
                    tbl,
                    fk.dst_schema,
                    fk.dst_table,
                    fk.columns,
                )
            else:
                kept.append(fk)
        tbl.foreign_keys = kept

def toposort(self) -> List[TableMetadata]:
return [ # type: ignore
self.tables[parse_table_name(u)] for u in toposort(self.as_graph())
Expand Down Expand Up @@ -247,20 +216,3 @@ def _context(ident: SQLTableIdentifier) -> sa.Table:
return self.tables[(ident.table_schema, ident.table_name)].table_obj

return _context

def output_graphviz(self, fout: TextIO) -> None:
    """
    Write this schema's foreign-key graph to *fout* in Graphviz DOT form:
    one edge group per table pointing at the set of tables it references.
    """

    def quoted(node) -> str:
        # DOT node identifiers are the tables' string forms, double-quoted.
        return f'"{str(node)}"'

    fout.write("digraph {\n")
    for tbl in self.tables.values():
        referenced = {
            self.tables[(fk.dst_schema, fk.dst_table)] for fk in tbl.foreign_keys
        }
        edges = ", ".join(quoted(dep) for dep in referenced)
        fout.write(" " + quoted(tbl) + " -> {" + edges + "}\n")
    fout.write("}\n")
70 changes: 43 additions & 27 deletions subsetter/planner.py
Original file line number Diff line number Diff line change
Expand Up @@ -59,10 +59,14 @@ class ColumnConstraint(BaseModel):
ignore_fks: List[IgnoreFKConfig] = []
extra_fks: List[ExtraFKConfig] = []
infer_foreign_keys: bool = False
normalize_foreign_keys: bool = False


class Planner:
"""
Class responsible for taking in a plan configuration and a source database
schema and producing a subsetting strategy.
"""

def __init__(self, config: PlannerConfig) -> None:
self.config = config
self.engine = self.config.source.database_engine(env_prefix="SUBSET_SOURCE_")
Expand Down Expand Up @@ -92,10 +96,11 @@ def plan(self) -> SubsetPlan:
extra_table[1],
)

return self._plan_internal()

def _plan_internal(self) -> SubsetPlan:
if self.config.infer_foreign_keys:
self.meta.infer_missing_foreign_keys()
if self.config.normalize_foreign_keys:
self.meta.normalize_foreign_keys()
self._remove_ignore_fks()
self._add_extra_fks()
self._check_ignore_tables()
Expand Down Expand Up @@ -280,16 +285,23 @@ def _plan_table(
assert not foreign_keys
if target.all_:
rev_foreign_keys.clear()
LOGGER.debug("Targetting %s and sampling from %s", table, rev_foreign_keys)
if rev_foreign_keys:
LOGGER.debug(
"Sampling %s as union of target parameters and references from %s",
table,
[f"{fk.dst_schema}.{fk.dst_table}" for fk in rev_foreign_keys],
)
else:
LOGGER.debug("Targetting %s", table)
elif foreign_keys:
LOGGER.debug(
"Reverse sampling %s from %s",
"Sampling %s as intersection of references from %s",
table,
[f"{fk.dst_schema}.{fk.dst_table}" for fk in foreign_keys],
)
else:
LOGGER.debug(
"Sampling %s from %s",
"Sampling %s as union of references from %s",
table,
[f"{fk.dst_schema}.{fk.dst_table}" for fk in rev_foreign_keys],
)
Expand Down Expand Up @@ -327,17 +339,26 @@ def _plan_table(
f"{table.schema}.{table.name}", []
)
conf_constraints_sql: List[SQLWhereClause] = []
all_columns = {column.name for column in table.table_obj.columns}
if conf_constraints and rev_foreign_keys:
raise ValueError(
f"Cannot apply table constraints to {table} without violating "
"foreign key constraints of previously sampled tables",
)

for conf_constraint in conf_constraints:
if conf_constraint.column in all_columns:
conf_constraints_sql.append(
SQLWhereClauseOperator(
type_="operator",
operator=conf_constraint.operator,
column=conf_constraint.column,
value=conf_constraint.value,
)
if conf_constraint.column not in table.table_obj.columns:
raise ValueError(
"Table {table} has no column {conf_constraint.column!r} for table constraint",
)

conf_constraints_sql.append(
SQLWhereClauseOperator(
type_="operator",
operator=conf_constraint.operator,
column=conf_constraint.column,
value=conf_constraint.value,
)
)

# Calculate initial foreign-key / config constraint statement
statements: List[SQLStatementSelect] = [
Expand Down Expand Up @@ -376,19 +397,14 @@ def _plan_table(
)

for column, patterns in target.like.items():
target_constraints.append(
SQLWhereClauseOr(
type_="or",
conditions=[
SQLWhereClauseOperator(
type_="operator",
operator="like",
column=column,
value=pattern,
)
for pattern in patterns
],
target_constraints.extend(
SQLWhereClauseOperator(
type_="operator",
operator="like",
column=column,
value=pattern,
)
for pattern in patterns
)

for column, in_list in target.in_.items():
Expand Down
Loading

0 comments on commit 02807d3

Please sign in to comment.