diff --git a/data_diff/databases/redshift.py b/data_diff/databases/redshift.py index d31258e1..44e86b17 100644 --- a/data_diff/databases/redshift.py +++ b/data_diff/databases/redshift.py @@ -122,6 +122,38 @@ def query_pg_get_cols(self, path: DbPath) -> Dict[str, tuple]: return schema_dict + def select_svv_columns_schema(self, path: DbPath) -> Dict[str, tuple]: + database, schema, table = self._normalize_table_path(path) + + db_clause = "" + if database: + db_clause = f" AND table_catalog = '{database.lower()}'" + + return ( + f""" + select + distinct + column_name, + data_type, + datetime_precision, + numeric_precision, + numeric_scale + from + svv_columns + where table_name = '{table.lower()}' and table_schema = '{schema.lower()}' + """ + + db_clause + ) + + def query_svv_columns(self, path: DbPath) -> Dict[str, tuple]: + rows = self.query(self.select_svv_columns_schema(path), list) + if not rows: + raise RuntimeError(f"{self.name}: Table '{'.'.join(path)}' does not exist, or has no columns") + + d = {r[0]: r for r in rows} + assert len(d) == len(rows) + return d + # when using a non-information_schema source, strip (N) from type(N) etc. to match # typical information_schema output def _normalize_schema_info(self, rows) -> Dict[str, tuple]: @@ -150,7 +182,10 @@ def query_table_schema(self, path: DbPath) -> Dict[str, tuple]: try: return self.query_external_table_schema(path) except RuntimeError: - return self.query_pg_get_cols(path) + try: + return self.query_pg_get_cols(path) + except Exception: + return self.query_svv_columns(path) def _normalize_table_path(self, path: DbPath) -> DbPath: if len(path) == 1: