Skip to content

Commit

Permalink
allows hash mismatch when loading schema that needs migration
Browse files Browse the repository at this point in the history
  • Loading branch information
rudolfix committed Jan 6, 2023
1 parent 00ef005 commit 6855453
Show file tree
Hide file tree
Showing 6 changed files with 422 additions and 13 deletions.
4 changes: 2 additions & 2 deletions dlt/common/schema/schema.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@
from dlt.common.typing import DictStrAny, StrAny, REPattern, SupportsVariant, VARIANT_FIELD_FORMAT
from dlt.common.normalizers.names import TNormalizeBreakPath, TNormalizeMakePath, TNormalizeNameFunc
from dlt.common.normalizers.json import TNormalizeJSONFunc
from dlt.common.schema.typing import (LOADS_TABLE_NAME, VERSION_TABLE_NAME, TNormalizersConfig, TPartialTableSchema, TSchemaSettings, TSimpleRegex, TStoredSchema,
from dlt.common.schema.typing import (SCHEMA_ENGINE_VERSION, LOADS_TABLE_NAME, VERSION_TABLE_NAME, TNormalizersConfig, TPartialTableSchema, TSchemaSettings, TSimpleRegex, TStoredSchema,
TSchemaTables, TTableSchema, TTableSchemaColumns, TColumnSchema, TColumnProp, TDataType,
TColumnHint, TWriteDisposition)
from dlt.common.schema import utils
Expand All @@ -16,7 +16,7 @@


class Schema:
ENGINE_VERSION: ClassVar[int] = 5
ENGINE_VERSION: ClassVar[int] = SCHEMA_ENGINE_VERSION

# name normalization functions
normalize_table_name: TNormalizeNameFunc
Expand Down
10 changes: 6 additions & 4 deletions dlt/common/schema/typing.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,12 @@

from dlt.common.typing import StrAny

# current version of schema engine
SCHEMA_ENGINE_VERSION = 5

# dlt tables
VERSION_TABLE_NAME = "_dlt_version"
LOADS_TABLE_NAME = "_dlt_loads"

TDataType = Literal["text", "double", "bool", "timestamp", "bigint", "binary", "complex", "decimal", "wei"]
TColumnHint = Literal["not_null", "partition", "cluster", "primary_key", "foreign_key", "sort", "unique"]
Expand All @@ -15,10 +21,6 @@
COLUMN_HINTS: Set[TColumnHint] = set(["partition", "cluster", "primary_key", "foreign_key", "sort", "unique"])
WRITE_DISPOSITIONS: Set[TWriteDisposition] = set(get_args(TWriteDisposition))

# dlt tables
VERSION_TABLE_NAME = "_dlt_version"
LOADS_TABLE_NAME = "_dlt_loads"


class TColumnSchemaBase(TypedDict, total=True):
name: Optional[str]
Expand Down
14 changes: 8 additions & 6 deletions dlt/common/schema/utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@
from dlt.common.utils import map_nested_in_place, str2bool
from dlt.common.validation import TCustomValidator, validate_dict
from dlt.common.schema import detections
from dlt.common.schema.typing import LOADS_TABLE_NAME, SIMPLE_REGEX_PREFIX, VERSION_TABLE_NAME, TColumnName, TNormalizersConfig, TPartialTableSchema, TSimpleRegex, TStoredSchema, TTableSchema, TTableSchemaColumns, TColumnSchemaBase, TColumnSchema, TColumnProp, TDataType, TColumnHint, TTypeDetectionFunc, TTypeDetections, TWriteDisposition
from dlt.common.schema.typing import SCHEMA_ENGINE_VERSION, LOADS_TABLE_NAME, SIMPLE_REGEX_PREFIX, VERSION_TABLE_NAME, TColumnName, TNormalizersConfig, TPartialTableSchema, TSimpleRegex, TStoredSchema, TTableSchema, TTableSchemaColumns, TColumnSchemaBase, TColumnSchema, TColumnProp, TDataType, TColumnHint, TTypeDetectionFunc, TTypeDetections, TWriteDisposition
from dlt.common.schema.exceptions import CannotCoerceColumnException, ParentTableNotFoundException, SchemaEngineNoUpgradePathException, SchemaException, TablePropertiesConflictException


Expand Down Expand Up @@ -96,14 +96,16 @@ def generate_version_hash(stored_schema: TStoredSchema) -> str:
return base64.b64encode(h.digest()).decode('ascii')


def verify_schema_hash(loaded_schema_dict: DictStrAny, verifies_if_not_migrated: bool = False) -> bool:
    """Check that the `version_hash` stored in a loaded schema matches its content.

    Args:
        loaded_schema_dict: Schema dictionary as loaded from storage
            (e.g. the result of parsing a stored schema file).
        verifies_if_not_migrated: When True, a schema whose `engine_version`
            differs from `SCHEMA_ENGINE_VERSION` is accepted without comparing
            hashes.

    Returns:
        True if the hash generated from the schema content equals the stored
        `version_hash`, or if the comparison was skipped because the schema
        is not at the current engine version. False otherwise.
    """
    # compared against the integer SCHEMA_ENGINE_VERSION; a missing key yields
    # None, which also counts as "not the current version"
    engine_version = loaded_schema_dict.get("engine_version")
    # if upgrade is needed, the hash cannot be compared
    if verifies_if_not_migrated and engine_version != SCHEMA_ENGINE_VERSION:
        return True
    # generate the content hash and compare it with the stored one
    stored_schema = cast(TStoredSchema, loaded_schema_dict)
    hash_ = generate_version_hash(stored_schema)
    # use .get() so a schema without a stored hash fails verification
    # instead of raising KeyError on the lookup
    return hash_ == loaded_schema_dict.get("version_hash")


def simple_regex_validator(path: str, pk: str, pv: Any, t: Any) -> bool:
Expand Down
2 changes: 1 addition & 1 deletion dlt/common/storages/schema_storage.py
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,7 @@ def load_schema(self, name: str) -> Schema:
try:
storage_schema = json.loads(self.storage.load(schema_file))
# prevent external modifications of schemas kept in storage
if not verify_schema_hash(storage_schema, empty_hash_verifies=True):
if not verify_schema_hash(storage_schema, verifies_if_not_migrated=True):
raise InStorageSchemaModified(name, self.config.schema_volume_path)
except FileNotFoundError:
# maybe we can import from external storage
Expand Down
Loading

0 comments on commit 6855453

Please sign in to comment.