Skip to content

Commit

Permalink
chore: reverse-engineer datetime format
Browse files Browse the repository at this point in the history
  • Loading branch information
donotpush committed Oct 16, 2024
1 parent b29d75e commit 17dc69a
Show file tree
Hide file tree
Showing 2 changed files with 60 additions and 34 deletions.
24 changes: 24 additions & 0 deletions dlt/common/time.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
import contextlib
import datetime # noqa: I251
import re
from typing import Any, Optional, Union, overload, TypeVar, Callable # noqa

from pendulum.parsing import (
Expand Down Expand Up @@ -154,6 +155,29 @@ def ensure_pendulum_time(value: Union[str, datetime.time]) -> pendulum.Time:
raise TypeError(f"Cannot coerce {value} to a pendulum.Time object.")


# Ordered (pattern, strftime format) pairs, compiled once at import time instead of on
# every call. Patterns are mutually exclusive, but order is kept stable: first match wins.
_DATETIME_FORMAT_PATTERNS = (
    # UTC designator 'Z'
    (re.compile(r"^\d{4}-\d{2}-\d{2}T\d{2}:\d{2}:\d{2}Z$"), "%Y-%m-%dT%H:%M:%SZ"),
    # Timezone offset, positive or negative (e.g. +02:00, -05:00).
    # NOTE(review): Python's strftime renders %z as +HHMM (no colon), so formatting with
    # this string does not reproduce the colon of the input - confirm callers accept that.
    (
        re.compile(r"^\d{4}-\d{2}-\d{2}T\d{2}:\d{2}:\d{2}[+-]\d{2}:\d{2}$"),
        "%Y-%m-%dT%H:%M:%S%z",
    ),
    # No timezone
    (re.compile(r"^\d{4}-\d{2}-\d{2}T\d{2}:\d{2}:\d{2}$"), "%Y-%m-%dT%H:%M:%S"),
    # Date only
    (re.compile(r"^\d{4}-\d{2}-\d{2}$"), "%Y-%m-%d"),
    # Year and month
    (re.compile(r"^\d{4}-\d{2}$"), "%Y-%m"),
    # Week-based date.
    # NOTE(review): %W is the Monday-based week of the year, not the ISO 8601 week (%V);
    # the two can differ around year boundaries - confirm which one is intended.
    (re.compile(r"^\d{4}-W\d{2}$"), "%Y-W%W"),
    # Ordinal date (day of year)
    (re.compile(r"^\d{4}-\d{3}$"), "%Y-%j"),
    # Year only
    (re.compile(r"^\d{4}$"), "%Y"),
)


def detect_datetime_format(value: str) -> Optional[str]:
    """Detect which of a fixed set of ISO-8601-like layouts `value` is written in.

    The string is tested against a small list of regular expressions (full datetime
    with `Z`, with a `+HH:MM` / `-HH:MM` offset, or without timezone; date only;
    year-month; week date; ordinal date; year only).

    Args:
        value (str): The textual date or datetime to inspect.

    Returns:
        Optional[str]: The strftime-style format string associated with the first
            matching layout, or None when no layout matches.
    """
    for pattern, format_str in _DATETIME_FORMAT_PATTERNS:
        if pattern.match(value):
            return format_str

    # No known layout matched
    return None


def to_py_datetime(value: datetime.datetime) -> datetime.datetime:
"""Convert a pendulum.DateTime to a py datetime object.
Expand Down
70 changes: 36 additions & 34 deletions dlt/extract/incremental/__init__.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
import os
from datetime import datetime, timedelta # noqa: I251
from datetime import datetime, timedelta, date # noqa: I251
from typing import Generic, ClassVar, Any, Optional, Type, Dict
import dateutil.parser
from typing_extensions import get_origin, get_args

import inspect
Expand All @@ -9,6 +10,7 @@
from dlt.common import logger
from dlt.common.exceptions import MissingDependencyException
from dlt.common.pendulum import pendulum
from dlt.common.time import ensure_pendulum_datetime, detect_datetime_format
from dlt.common.jsonpath import compile_path
from dlt.common.typing import (
TDataItem,
Expand Down Expand Up @@ -298,41 +300,41 @@ def _apply_lag(self, value: TCursorValue) -> TCursorValue:
if self.lag is None:
return value

# parses string representation for datetime, int, or float
if isinstance(value, str):
try:
value = datetime.fromisoformat(value) # type: ignore
except ValueError:
try:
value = float(value) # type: ignore
except ValueError:
try:
value = int(value) # type: ignore
except ValueError:
logger.warning(
"Failed to convert incremental string cursor to datetime, float, or int"
)

if isinstance(value, datetime):
if self.last_value_func is max:
return value - timedelta(seconds=self.lag) # type: ignore
elif self.last_value_func is min:
return value + timedelta(seconds=self.lag) # type: ignore
# Determine if the input is originally a string and capture its format
is_str = isinstance(value, str)
original_format = None
if is_str:
original_format = detect_datetime_format(value)
value = ensure_pendulum_datetime(value) # type: ignore

# Apply lag based on the type of value
if isinstance(value, (datetime, date)):
delta = (
timedelta(seconds=self.lag)
if isinstance(value, datetime)
else timedelta(days=self.lag)
)
value = value - delta if self.last_value_func is max else value + delta # type: ignore

# If originally a string, convert back to the original format
if is_str and original_format:
value = value.strftime(original_format) # type: ignore

elif isinstance(value, int):
if self.last_value_func is max:
return int(value - self.lag) # type: ignore
elif self.last_value_func is min:
return int(value + self.lag) # type: ignore
# Ensure that int types remain integers
adjusted_value = value - self.lag if self.last_value_func is max else value + self.lag
value = int(adjusted_value) # type: ignore

elif isinstance(value, float):
if self.last_value_func is max:
return value - self.lag # type: ignore
elif self.last_value_func is min:
return value + self.lag # type: ignore

logger.warning(
f"Lag is not supported for last_value_func: {self.last_value_func} and cursor type:"
f" {type(value)}"
)
value = value - self.lag if self.last_value_func is max else value + self.lag # type: ignore

else:
# Handle unsupported types
logger.warning(
f"Lag is not supported for last_value_func: {self.last_value_func} and cursor type:"
f" {type(value)}"
)

return value

def get_state(self) -> IncrementalColumnState:
Expand Down

0 comments on commit 17dc69a

Please sign in to comment.