diff --git a/dlt/common/schema/schema.py b/dlt/common/schema/schema.py index 3f076541ad..82dd9e73cc 100644 --- a/dlt/common/schema/schema.py +++ b/dlt/common/schema/schema.py @@ -6,7 +6,7 @@ from dlt.common.typing import DictStrAny, StrAny, REPattern, SupportsVariant, VARIANT_FIELD_FORMAT from dlt.common.normalizers.names import TNormalizeBreakPath, TNormalizeMakePath, TNormalizeNameFunc from dlt.common.normalizers.json import TNormalizeJSONFunc -from dlt.common.schema.typing import (LOADS_TABLE_NAME, VERSION_TABLE_NAME, TNormalizersConfig, TPartialTableSchema, TSchemaSettings, TSimpleRegex, TStoredSchema, +from dlt.common.schema.typing import (SCHEMA_ENGINE_VERSION, LOADS_TABLE_NAME, VERSION_TABLE_NAME, TNormalizersConfig, TPartialTableSchema, TSchemaSettings, TSimpleRegex, TStoredSchema, TSchemaTables, TTableSchema, TTableSchemaColumns, TColumnSchema, TColumnProp, TDataType, TColumnHint, TWriteDisposition) from dlt.common.schema import utils @@ -16,7 +16,7 @@ class Schema: - ENGINE_VERSION: ClassVar[int] = 5 + ENGINE_VERSION: ClassVar[int] = SCHEMA_ENGINE_VERSION # name normalization functions normalize_table_name: TNormalizeNameFunc diff --git a/dlt/common/schema/typing.py b/dlt/common/schema/typing.py index e0d91c6e8a..c554e509ad 100644 --- a/dlt/common/schema/typing.py +++ b/dlt/common/schema/typing.py @@ -2,6 +2,12 @@ from dlt.common.typing import StrAny +# current version of schema engine +SCHEMA_ENGINE_VERSION = 5 + +# dlt tables +VERSION_TABLE_NAME = "_dlt_version" +LOADS_TABLE_NAME = "_dlt_loads" TDataType = Literal["text", "double", "bool", "timestamp", "bigint", "binary", "complex", "decimal", "wei"] TColumnHint = Literal["not_null", "partition", "cluster", "primary_key", "foreign_key", "sort", "unique"] @@ -15,10 +21,6 @@ COLUMN_HINTS: Set[TColumnHint] = set(["partition", "cluster", "primary_key", "foreign_key", "sort", "unique"]) WRITE_DISPOSITIONS: Set[TWriteDisposition] = set(get_args(TWriteDisposition)) -# dlt tables -VERSION_TABLE_NAME = 
"_dlt_version" -LOADS_TABLE_NAME = "_dlt_loads" - class TColumnSchemaBase(TypedDict, total=True): name: Optional[str] diff --git a/dlt/common/schema/utils.py b/dlt/common/schema/utils.py index d7e9a9be7b..90108dccd4 100644 --- a/dlt/common/schema/utils.py +++ b/dlt/common/schema/utils.py @@ -21,7 +21,7 @@ from dlt.common.utils import map_nested_in_place, str2bool from dlt.common.validation import TCustomValidator, validate_dict from dlt.common.schema import detections -from dlt.common.schema.typing import LOADS_TABLE_NAME, SIMPLE_REGEX_PREFIX, VERSION_TABLE_NAME, TColumnName, TNormalizersConfig, TPartialTableSchema, TSimpleRegex, TStoredSchema, TTableSchema, TTableSchemaColumns, TColumnSchemaBase, TColumnSchema, TColumnProp, TDataType, TColumnHint, TTypeDetectionFunc, TTypeDetections, TWriteDisposition +from dlt.common.schema.typing import SCHEMA_ENGINE_VERSION, LOADS_TABLE_NAME, SIMPLE_REGEX_PREFIX, VERSION_TABLE_NAME, TColumnName, TNormalizersConfig, TPartialTableSchema, TSimpleRegex, TStoredSchema, TTableSchema, TTableSchemaColumns, TColumnSchemaBase, TColumnSchema, TColumnProp, TDataType, TColumnHint, TTypeDetectionFunc, TTypeDetections, TWriteDisposition from dlt.common.schema.exceptions import CannotCoerceColumnException, ParentTableNotFoundException, SchemaEngineNoUpgradePathException, SchemaException, TablePropertiesConflictException @@ -96,14 +96,16 @@ def generate_version_hash(stored_schema: TStoredSchema) -> str: return base64.b64encode(h.digest()).decode('ascii') -def verify_schema_hash(stored_schema: DictStrAny, empty_hash_verifies: bool = True) -> bool: +def verify_schema_hash(loaded_schema_dict: DictStrAny, verifies_if_not_migrated: bool = False) -> bool: # generates content hash and compares with existing - current_hash: str = stored_schema.get("version_hash") - if not current_hash and empty_hash_verifies: + engine_version: str = loaded_schema_dict.get("engine_version") + # if upgrade is needed, the hash cannot be compared + if 
verifies_if_not_migrated and engine_version != SCHEMA_ENGINE_VERSION: return True # if hash is present we can assume at least v4 engine version so hash is computable - hash_ = generate_version_hash(cast(TStoredSchema, stored_schema)) - return hash_ == current_hash + stored_schema = cast(TStoredSchema, loaded_schema_dict) + hash_ = generate_version_hash(stored_schema) + return hash_ == stored_schema["version_hash"] def simple_regex_validator(path: str, pk: str, pv: Any, t: Any) -> bool: diff --git a/dlt/common/storages/schema_storage.py b/dlt/common/storages/schema_storage.py index e7b6e53260..cd4aaec377 100644 --- a/dlt/common/storages/schema_storage.py +++ b/dlt/common/storages/schema_storage.py @@ -30,7 +30,7 @@ def load_schema(self, name: str) -> Schema: try: storage_schema = json.loads(self.storage.load(schema_file)) # prevent external modifications of schemas kept in storage - if not verify_schema_hash(storage_schema, empty_hash_verifies=True): + if not verify_schema_hash(storage_schema, verifies_if_not_migrated=True): raise InStorageSchemaModified(name, self.config.schema_volume_path) except FileNotFoundError: # maybe we can import from external storage diff --git a/tests/common/cases/schemas/sheets/google_spreadsheet_v4.schema.json b/tests/common/cases/schemas/sheets/google_spreadsheet_v4.schema.json new file mode 100644 index 0000000000..b74a4a5c51 --- /dev/null +++ b/tests/common/cases/schemas/sheets/google_spreadsheet_v4.schema.json @@ -0,0 +1,397 @@ +{ + "version": 3, + "version_hash": "vgxR7fdg9HNUY/7iHKNQTS2qkYaEvKgrSiHIMWoIx7Y=", + "engine_version": 4, + "name": "google_spreadsheet", + "tables": { + "_dlt_version": { + "name": "_dlt_version", + "columns": { + "version": { + "partition": false, + "cluster": false, + "unique": false, + "sort": false, + "primary_key": false, + "foreign_key": false, + "name": "version", + "data_type": "bigint", + "nullable": false + }, + "engine_version": { + "partition": false, + "cluster": false, + "unique": false, + 
"sort": false, + "primary_key": false, + "foreign_key": false, + "name": "engine_version", + "data_type": "bigint", + "nullable": false + }, + "inserted_at": { + "partition": false, + "cluster": false, + "unique": false, + "sort": false, + "primary_key": false, + "foreign_key": false, + "name": "inserted_at", + "data_type": "timestamp", + "nullable": false + }, + "schema_name": { + "partition": false, + "cluster": false, + "unique": false, + "sort": false, + "primary_key": false, + "foreign_key": false, + "name": "schema_name", + "data_type": "text", + "nullable": false + }, + "version_hash": { + "partition": false, + "cluster": false, + "unique": false, + "sort": false, + "primary_key": false, + "foreign_key": false, + "name": "version_hash", + "data_type": "text", + "nullable": false + }, + "schema": { + "partition": false, + "cluster": false, + "unique": false, + "sort": false, + "primary_key": false, + "foreign_key": false, + "name": "schema", + "data_type": "text", + "nullable": false + } + }, + "write_disposition": "skip", + "description": "Created by DLT. Tracks schema updates" + }, + "_dlt_loads": { + "name": "_dlt_loads", + "columns": { + "load_id": { + "partition": false, + "cluster": false, + "unique": false, + "sort": false, + "primary_key": false, + "foreign_key": false, + "name": "load_id", + "data_type": "text", + "nullable": false + }, + "status": { + "partition": false, + "cluster": false, + "unique": false, + "sort": false, + "primary_key": false, + "foreign_key": false, + "name": "status", + "data_type": "bigint", + "nullable": false + }, + "inserted_at": { + "partition": false, + "cluster": false, + "unique": false, + "sort": false, + "primary_key": false, + "foreign_key": false, + "name": "inserted_at", + "data_type": "timestamp", + "nullable": false + } + }, + "write_disposition": "skip", + "description": "Created by DLT. 
Tracks completed loads" + }, + "_2022_05": { + "name": "_2022_05", + "columns": { + "sender_id": { + "partition": false, + "cluster": false, + "unique": false, + "sort": false, + "primary_key": false, + "foreign_key": false, + "name": "sender_id", + "data_type": "text", + "nullable": true + }, + "message_id": { + "partition": false, + "cluster": false, + "unique": false, + "sort": false, + "primary_key": false, + "foreign_key": false, + "name": "message_id", + "data_type": "bigint", + "nullable": true + }, + "annotation": { + "partition": false, + "cluster": false, + "unique": false, + "sort": false, + "primary_key": false, + "foreign_key": false, + "name": "annotation", + "data_type": "text", + "nullable": true + }, + "confidence": { + "partition": false, + "cluster": false, + "unique": false, + "sort": false, + "primary_key": false, + "foreign_key": false, + "name": "confidence", + "data_type": "double", + "nullable": true + }, + "count": { + "partition": false, + "cluster": false, + "unique": false, + "sort": false, + "primary_key": false, + "foreign_key": false, + "name": "count", + "data_type": "bigint", + "nullable": true + }, + "added_at": { + "partition": false, + "cluster": false, + "unique": false, + "sort": false, + "primary_key": false, + "foreign_key": false, + "name": "added_at", + "data_type": "text", + "nullable": true + }, + "reviewed": { + "partition": false, + "cluster": false, + "unique": false, + "sort": false, + "primary_key": false, + "foreign_key": false, + "name": "reviewed", + "data_type": "bool", + "nullable": true + }, + "_dlt_load_id": { + "partition": false, + "cluster": false, + "unique": false, + "sort": false, + "primary_key": false, + "foreign_key": false, + "name": "_dlt_load_id", + "data_type": "text", + "nullable": false + }, + "_dlt_id": { + "partition": false, + "cluster": false, + "unique": true, + "sort": false, + "primary_key": false, + "foreign_key": false, + "name": "_dlt_id", + "data_type": "text", + "nullable": false + 
} + }, + "write_disposition": "replace" + }, + "model_metadata": { + "name": "model_metadata", + "columns": { + "model_name": { + "partition": false, + "cluster": false, + "unique": false, + "sort": false, + "primary_key": false, + "foreign_key": false, + "name": "model_name", + "data_type": "text", + "nullable": true + }, + "accuracy": { + "partition": false, + "cluster": false, + "unique": false, + "sort": false, + "primary_key": false, + "foreign_key": false, + "name": "accuracy", + "data_type": "double", + "nullable": true + }, + "last_run_time": { + "partition": false, + "cluster": false, + "unique": false, + "sort": false, + "primary_key": false, + "foreign_key": false, + "name": "last_run_time", + "data_type": "text", + "nullable": true + }, + "_dlt_load_id": { + "partition": false, + "cluster": false, + "unique": false, + "sort": false, + "primary_key": false, + "foreign_key": false, + "name": "_dlt_load_id", + "data_type": "text", + "nullable": false + }, + "_dlt_id": { + "partition": false, + "cluster": false, + "unique": true, + "sort": false, + "primary_key": false, + "foreign_key": false, + "name": "_dlt_id", + "data_type": "text", + "nullable": false + } + }, + "write_disposition": "replace" + }, + "_dlt_pipeline_state": { + "name": "_dlt_pipeline_state", + "columns": { + "version": { + "partition": false, + "cluster": false, + "unique": false, + "sort": false, + "primary_key": false, + "foreign_key": false, + "name": "version", + "data_type": "bigint", + "nullable": true + }, + "engine_version": { + "partition": false, + "cluster": false, + "unique": false, + "sort": false, + "primary_key": false, + "foreign_key": false, + "name": "engine_version", + "data_type": "bigint", + "nullable": true + }, + "pipeline_name": { + "partition": false, + "cluster": false, + "unique": false, + "sort": false, + "primary_key": false, + "foreign_key": false, + "name": "pipeline_name", + "data_type": "text", + "nullable": true + }, + "state": { + "partition": false, + 
"cluster": false, + "unique": false, + "sort": false, + "primary_key": false, + "foreign_key": false, + "name": "state", + "data_type": "text", + "nullable": true + }, + "created_at": { + "partition": false, + "cluster": false, + "unique": false, + "sort": false, + "primary_key": false, + "foreign_key": false, + "name": "created_at", + "data_type": "timestamp", + "nullable": true + }, + "_dlt_load_id": { + "partition": false, + "cluster": false, + "unique": false, + "sort": false, + "primary_key": false, + "foreign_key": false, + "name": "_dlt_load_id", + "data_type": "text", + "nullable": false + }, + "_dlt_id": { + "partition": false, + "cluster": false, + "unique": true, + "sort": false, + "primary_key": false, + "foreign_key": false, + "name": "_dlt_id", + "data_type": "text", + "nullable": false + } + }, + "write_disposition": "append" + } + }, + "settings": { + "default_hints": { + "not_null": [ + "_dlt_id", + "_dlt_root_id", + "_dlt_parent_id", + "_dlt_list_idx", + "_dlt_load_id" + ], + "foreign_key": [ + "_dlt_parent_id" + ], + "unique": [ + "_dlt_id" + ] + } + }, + "normalizers": { + "detections": [ + "timestamp", + "iso_timestamp" + ], + "names": "dlt.common.normalizers.names.snake_case", + "json": { + "module": "dlt.common.normalizers.json.relational" + } + } +} \ No newline at end of file diff --git a/tests/common/storages/test_schema_storage.py b/tests/common/storages/test_schema_storage.py index 254a3a8449..6e4cb42545 100644 --- a/tests/common/storages/test_schema_storage.py +++ b/tests/common/storages/test_schema_storage.py @@ -48,6 +48,14 @@ def test_load_non_existing(storage: SchemaStorage) -> None: storage.load_schema("nonexisting") +def test_load_schema_with_upgrade() -> None: + # point the storage root to v4 schema google_spreadsheet_v4.schema + storage = LiveSchemaStorage(SchemaVolumeConfiguration(COMMON_TEST_CASES_PATH + "schemas/sheets")) + # the hash when computed on the schema does not match the version_hash in the file so it should raise 
InStorageSchemaModified + # but because the version upgrade is required, the check is skipped and the load succeeds + storage.load_schema("google_spreadsheet_v4") + + def test_import_non_existing(synced_storage: SchemaStorage) -> None: with pytest.raises(SchemaNotFoundError): synced_storage.load_schema("nonexisting")