Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[App Feature] [JsonSchema] Enabling the overriding of the catalog on the stream [LOYAL-10211] #2

Open
wants to merge 6 commits into
base: master
Choose a base branch
from

Conversation

quocnguyendinh
Copy link

@quocnguyendinh quocnguyendinh commented Apr 9, 2024

https://kaligo.atlassian.net/browse/LOYAL-10211

Background

As described in the RFC's link above, flattening the schema is really needed. However, due to the limitation of the current tap, the stream does not contain the expected schemas for the downstream components to handle, thus the schema ends up not being flattened. This PR is to fix that.

Design

  • Writing the merging process to override the schema from the catalog on the schema from the process of discovering db.
  • Writing the Unit Tests to test for our expected behaviors.

Impact

This will help for the feature of flattening schema using flattening_enabled and flattening_max_depth from the downstream components (like mapper) to be activated, thus realizing the schema flattening feature.

Caveats

This PR does not cover one edge case in which the schema of the existing tables changes (like the column name is changed, the data type is changed,...) while the catalog is already created before. All those changes will not be synchronized onto the stream unless we delete the Catalog default file in .run/meltano/tap-postgres.
(For the case adding a new column, this can work normally).

Testing

There are 2 integration tests having been added. One is for testing the expected behavior of the stream merging mechanism and one is for testing the case in which we will add another column to see if this merging can adapt to the schema evolution or not.

Docs

RFC: https://www.notion.so/kaligo/JSON-schema-flatenning-improvement-271d7e4842d74da1a5de9ae59d3ae656

Copy link

@khoaanguyenn khoaanguyenn left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

💯 Perfect, great jobs 👏

I've put some comments to discuss with you on the unused methods.

Comment on lines 131 to 137
table_spec = {
"columns": [
{"name": "newcol", "type": "integer", "is_new_col": True}
],
"name": self.table_name
}
alter_schema_test_table(table_spec)
Copy link

@khoaanguyenn khoaanguyenn Apr 10, 2024

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
table_spec = {
"columns": [
{"name": "newcol", "type": "integer", "is_new_col": True}
],
"name": self.table_name
}
alter_schema_test_table(table_spec)
table_spec = {
"columns": [
{"name": "newcol", "type": "integer"}
],
"name": self.table_name
}
add_columns(table_spec)

Would it be more simpler if we explicitly add_columns in lieu of the generics alter_schema_test_table method ?

As reviewer, I was trying to reading alter_schema_test_table to figure out that this method is actually adding new columns in this case 🤣

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Okay sir, i will change this and fix this in my next commit 😄

tests/utils.py Outdated
Comment on lines 145 to 152
def alter_schema_test_table(table_spec, target_db='postgres'):
with get_test_connection(target_db) as conn:
with conn.cursor(cursor_factory=psycopg2.extras.DictCursor) as cur:
table = table_spec['name']
for col_spec in table_spec['columns']:
for sql in build_alter_table_sql(quote_ident(table, cur), col_spec):
LOGGER.info("alter table sql: %s", sql)
cur.execute(sql)
Copy link

@khoaanguyenn khoaanguyenn Apr 10, 2024

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
def alter_schema_test_table(table_spec, target_db='postgres'):
with get_test_connection(target_db) as conn:
with conn.cursor(cursor_factory=psycopg2.extras.DictCursor) as cur:
table = table_spec['name']
for col_spec in table_spec['columns']:
for sql in build_alter_table_sql(quote_ident(table, cur), col_spec):
LOGGER.info("alter table sql: %s", sql)
cur.execute(sql)
def add_columns(table_spec, target_db='postgres'):
with get_test_connection(target_db) as conn:
with conn.cursor(cursor_factory=psycopg2.extras.DictCursor) as cur:
table = table_spec['name']
for col_name, col_type in table_spec['columns']:
sql = "ALTER TABLE {} ADD {} {}".format(table_name, col_name, col_type)
LOGGER.info("alter table sql: %s", sql)
cur.execute(sql)

Nits

tests/utils.py Outdated
Comment on lines 135 to 143
def build_alter_table_sql(table, col_spec):
sqls = []
if altered_name:=col_spec.get('change_name'):
sqls.append("ALTER TABLE {} RENAME COLUMN {} TO {}".format(table, col_spec['name'], altered_name))
if altered_type:=col_spec.get('is_change_type'):
sqls.append("ALTER TABLE {} ALTER COLUMN {} TYPE {}".format(table, col_spec['name'], altered_type))
if col_spec.get("is_new_col"):
sqls.append("ALTER TABLE {} ADD {} {}".format(table, col_spec['name'], col_spec['type']))
return sqls

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Nits: I understand that we will test more cases, but we don't actually use them all now nor the future. Thus, I believe that we should keep what we only need for now and add more utility in latter PR 👍

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The reason why I did this is because of the reason why you've specified. So If we only care about the adding another column may be we can remove this function and merge the logic in add_columns function 😄

@khoaanguyenn khoaanguyenn changed the title Enabling the overriding of the catalog on the stream [App Feature] [JsonSchema] Enabling the overriding of the catalog on the stream [LOYAL-10211] Apr 10, 2024
@khoaanguyenn khoaanguyenn added enhancement New feature or request ready for review labels Apr 10, 2024
@khoaanguyenn
Copy link

💯 Thanks @quocnguyendinh for making the code cleaner, would you mind taking a look at this PR as well @solteszad ?

Copy link

@solteszad solteszad left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

the code looks really polished to me! i would like to see it in practice, can we check the demo on staging? 😲

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
code approved enhancement New feature or request
Projects
None yet
Development

Successfully merging this pull request may close these issues.

3 participants