[App Feature] [JsonSchema] Enabling the overriding of the catalog on the stream [LOYAL-10211] #2
base: master
Conversation
💯 Perfect, great job 👏
I've left some comments to discuss the unused methods with you.
```python
table_spec = {
    "columns": [
        {"name": "newcol", "type": "integer", "is_new_col": True}
    ],
    "name": self.table_name
}
alter_schema_test_table(table_spec)
```
Suggested change:

```diff
-table_spec = {
-    "columns": [
-        {"name": "newcol", "type": "integer", "is_new_col": True}
-    ],
-    "name": self.table_name
-}
-alter_schema_test_table(table_spec)
+table_spec = {
+    "columns": [
+        {"name": "newcol", "type": "integer"}
+    ],
+    "name": self.table_name
+}
+add_columns(table_spec)
```
Would it be simpler if we explicitly called add_columns
instead of the generic alter_schema_test_table
method?
As a reviewer, I had to read through alter_schema_test_table
to figure out that this method is actually adding new columns in this case 🤣
Okay sir, I will change this and fix it in my next commit 😄
tests/utils.py
Outdated
```python
def alter_schema_test_table(table_spec, target_db='postgres'):
    with get_test_connection(target_db) as conn:
        with conn.cursor(cursor_factory=psycopg2.extras.DictCursor) as cur:
            table = table_spec['name']
            for col_spec in table_spec['columns']:
                for sql in build_alter_table_sql(quote_ident(table, cur), col_spec):
                    LOGGER.info("alter table sql: %s", sql)
                    cur.execute(sql)
```
Suggested change (note: the suggestion keeps the dict-based column specs used elsewhere in the tests, and uses the local `table` variable rather than an undefined `table_name`):

```diff
-def alter_schema_test_table(table_spec, target_db='postgres'):
-    with get_test_connection(target_db) as conn:
-        with conn.cursor(cursor_factory=psycopg2.extras.DictCursor) as cur:
-            table = table_spec['name']
-            for col_spec in table_spec['columns']:
-                for sql in build_alter_table_sql(quote_ident(table, cur), col_spec):
-                    LOGGER.info("alter table sql: %s", sql)
-                    cur.execute(sql)
+def add_columns(table_spec, target_db='postgres'):
+    with get_test_connection(target_db) as conn:
+        with conn.cursor(cursor_factory=psycopg2.extras.DictCursor) as cur:
+            table = table_spec['name']
+            for col_spec in table_spec['columns']:
+                sql = "ALTER TABLE {} ADD {} {}".format(table, col_spec['name'], col_spec['type'])
+                LOGGER.info("alter table sql: %s", sql)
+                cur.execute(sql)
```
Nits
tests/utils.py
Outdated
```python
def build_alter_table_sql(table, col_spec):
    sqls = []
    if altered_name := col_spec.get('change_name'):
        sqls.append("ALTER TABLE {} RENAME COLUMN {} TO {}".format(table, col_spec['name'], altered_name))
    if altered_type := col_spec.get('is_change_type'):
        sqls.append("ALTER TABLE {} ALTER COLUMN {} TYPE {}".format(table, col_spec['name'], altered_type))
    if col_spec.get("is_new_col"):
        sqls.append("ALTER TABLE {} ADD {} {}".format(table, col_spec['name'], col_spec['type']))
    return sqls
```
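To make the review easier to follow, here is a small usage sketch of `build_alter_table_sql` as quoted above (the function body is copied verbatim; the `table_spec` values are illustrative):

```python
def build_alter_table_sql(table, col_spec):
    # Build a list of ALTER TABLE statements from a column spec dict.
    sqls = []
    if altered_name := col_spec.get('change_name'):
        sqls.append("ALTER TABLE {} RENAME COLUMN {} TO {}".format(table, col_spec['name'], altered_name))
    if altered_type := col_spec.get('is_change_type'):
        sqls.append("ALTER TABLE {} ALTER COLUMN {} TYPE {}".format(table, col_spec['name'], altered_type))
    if col_spec.get("is_new_col"):
        sqls.append("ALTER TABLE {} ADD {} {}".format(table, col_spec['name'], col_spec['type']))
    return sqls

# Adding a new column:
print(build_alter_table_sql("t", {"name": "newcol", "type": "integer", "is_new_col": True}))
# ['ALTER TABLE t ADD newcol integer']

# Renaming a column:
print(build_alter_table_sql("t", {"name": "a", "change_name": "b"}))
# ['ALTER TABLE t RENAME COLUMN a TO b']
```

In the test under discussion only the `is_new_col` branch is exercised, which is why the reviewer suggests a dedicated `add_columns` helper instead.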
Nits: I understand that we will test more cases, but we don't actually use all of these branches now, nor in the near future. I believe we should keep only what we need for now and add more utilities in a later PR 👍
The reason I did this is exactly the one you've specified. If we only care about adding another column, maybe we can remove this function and merge the logic into the add_columns
function 😄
💯 Thanks @quocnguyendinh for making the code cleaner. Would you mind taking a look at this PR as well, @solteszad?
The code looks really polished to me! I would like to see it in practice. Can we check the demo on staging? 😲
https://kaligo.atlassian.net/browse/LOYAL-10211
Background
As described in the linked RFC, flattening the schema is needed. However, due to a limitation of the current tap, the stream does not contain the schemas that the downstream components expect, so the schema ends up not being flattened. This PR fixes that.
Design
Impact
This enables the schema-flattening feature, which is activated via
flattening_enabled
and flattening_max_depth
in the downstream components (like mapper
).

Caveats
This PR does not cover one edge case: if the schema of an existing table changes (e.g. a column is renamed, or its data type changes) while the catalog was already created, those changes will not be synchronized onto the stream unless we delete the default catalog file in
.run/meltano/tap-postgres
. (Adding a new column works normally.)
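To illustrate what the flattening settings mentioned under Impact do, here is a minimal sketch of depth-limited record flattening. `flatten_record` and the `__` separator are assumptions for illustration, not the mapper's actual implementation:

```python
def flatten_record(record, max_depth, sep="__", _depth=0):
    """Flatten nested dicts up to max_depth levels, joining keys with sep."""
    flat = {}
    for key, value in record.items():
        if isinstance(value, dict) and _depth < max_depth:
            # Recurse one level deeper and prefix the child keys.
            for sub_key, sub_value in flatten_record(value, max_depth, sep, _depth + 1).items():
                flat[key + sep + sub_key] = sub_value
        else:
            # Depth limit reached (or scalar value): keep as-is.
            flat[key] = value
    return flat

record = {"id": 1, "meta": {"source": {"name": "pos"}}}
print(flatten_record(record, max_depth=1))
# {'id': 1, 'meta__source': {'name': 'pos'}}
print(flatten_record(record, max_depth=2))
# {'id': 1, 'meta__source__name': 'pos'}
```

Without the schema fix in this PR, the stream's schema lacks the nested structure, so the downstream flattener has nothing to expand.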
Testing
Two integration tests have been added: one tests the expected behavior of the stream merging mechanism, and one adds another column to verify that the merge adapts to schema evolution.
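The merge behavior these tests exercise can be sketched as follows. `merge_stream_schema` and the schema shapes are hypothetical, written only to show the intent; the tap's actual merge code may differ:

```python
def merge_stream_schema(catalog_schema, discovered_schema):
    """Merge newly discovered column schemas into the existing catalog schema.

    New columns (schema evolution) are added; entries already present in the
    catalog are kept unchanged, so manual overrides survive the merge.
    """
    merged = dict(catalog_schema)
    merged_props = dict(catalog_schema.get("properties", {}))
    for col, col_schema in discovered_schema.get("properties", {}).items():
        merged_props.setdefault(col, col_schema)
    merged["properties"] = merged_props
    return merged

catalog = {"type": "object", "properties": {"id": {"type": "integer"}}}
# After ALTER TABLE ... ADD newcol integer, discovery sees the new column:
discovered = {"type": "object",
              "properties": {"id": {"type": "integer"},
                             "newcol": {"type": "integer"}}}
merged = merge_stream_schema(catalog, discovered)
print(sorted(merged["properties"]))
# ['id', 'newcol']
```

This also mirrors the caveat above: `setdefault` never replaces an existing entry, so a renamed column or changed type would not propagate without regenerating the catalog.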
Docs
RFC: https://www.notion.so/kaligo/JSON-schema-flatenning-improvement-271d7e4842d74da1a5de9ae59d3ae656