rest_api_resources exit early when joining external schedulers #2171

Open
otosky opened this issue Dec 22, 2024 · 0 comments · May be fixed by #2172

dlt version

1.5.0

Describe the problem

Resources created dynamically by rest_api from a RESTAPIConfig with an incremental configuration exit prematurely when "joining" an external scheduler. This seems to prevent resources created through the rest_api_resources factory function from using Airflow start/end inference for incremental parameters.

Expected behavior

I would expect that resources created through rest_api_resources would behave similarly to resources defined by the more traditional dlt.resource decorator with respect to allow_external_schedulers.

The warning that I see in the logs is always:

2024-12-21 19:41:12,788|[WARNING]|188923|134449741906688|dlt|__init__.py|_join_external_scheduler:414|Specified Incremental last value type dlt.extract.incremental.Incremental[typing.Any] is not supported. Please use DateTime, Date, float, int or str to join external schedulers.(issubclass() arg 1 must be a class)

Steps to reproduce

Here is an example test case that hits an endpoint on the Gemini REST API:

from pathlib import Path
import os
from datetime import datetime
import dlt
from dlt.sources.rest_api import (
    rest_api_resources,
    RESTAPIConfig,
    ClientConfig,
    EndpointResource,
    Endpoint,
)
from dlt.sources.rest_api.typing import IncrementalConfig

DEST_PATH = Path.home() / "example"
os.environ["DESTINATION__FILESYSTEM__BUCKET_URL"] = f"file:///{DEST_PATH}"
os.environ["NORMALIZE__DATA_WRITER__DISABLE_COMPRESSION"] = "true"


@dlt.source
def gemini_btc():
    config = RESTAPIConfig(
        client=ClientConfig(base_url="https://api.gemini.com/v1"),
        resources=[
            EndpointResource(
                name="btcusd_trades",
                write_disposition="append",
                endpoint=Endpoint(
                    path="trades/btcusd",
                    incremental=IncrementalConfig(
                        allow_external_schedulers=True,
                        cursor_path="timestamp",
                        initial_value=datetime(2024, 12, 21).timestamp(),
                        start_param="timestamp",
                    ),
                ),
            ),
        ],
    )
    return rest_api_resources(config)


if __name__ == "__main__":
    pipeline = dlt.pipeline(
        pipeline_name="gemini_btc",
        dataset_name="gemini_btc",
        destination="filesystem",
    )
    pipeline.run(gemini_btc())

Stepping through the pipeline run, I reach dlt.common.typing.get_generic_type_argument_from_instance, which seems to handle the case where instance.__orig_class__ is an Incremental generic, but not the case where it is Optional[Incremental[T]], which is the actual type at this point.
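To illustrate the suspected failure mode without depending on dlt internals, here is a minimal sketch using only the standard typing module. The Incremental class below is a hypothetical stand-in for dlt's generic, and first_type_arg and unwrap_optional are illustrative helpers rather than dlt functions. The assumption is that the type argument is read from the outer hint without first stripping Optional, which would yield Incremental[Any] instead of the value type and match the issubclass() error in the warning above.

```python
from typing import Any, Generic, Optional, TypeVar, Union, get_args, get_origin

T = TypeVar("T")


class Incremental(Generic[T]):
    """Hypothetical stand-in for dlt's Incremental generic (not the real class)."""


def first_type_arg(hint):
    # Naive lookup: take the first generic argument of the hint as-is.
    return get_args(hint)[0]


def unwrap_optional(hint):
    # If the hint is Optional[X] (i.e. Union[X, None]), return X; otherwise return it unchanged.
    if get_origin(hint) is Union:
        non_none = [arg for arg in get_args(hint) if arg is not type(None)]
        if len(non_none) == 1:
            return non_none[0]
    return hint


def is_supported_value_type(value_type) -> bool:
    # Mirrors the kind of check hinted at by the warning; raises TypeError
    # when value_type is a generic alias rather than a class.
    return issubclass(value_type, (int, float, str))


# Direct generic: the first argument is the value type itself.
direct = first_type_arg(Incremental[float])

# Optional-wrapped generic: the first argument is the inner alias, not the value type.
wrapped = first_type_arg(Optional[Incremental[Any]])

try:
    is_supported_value_type(wrapped)
    wrapped_check_failed = False
except TypeError:
    # Same family of error as "issubclass() arg 1 must be a class".
    wrapped_check_failed = True

# Stripping Optional first recovers the intended type argument.
recovered = first_type_arg(unwrap_optional(Optional[Incremental[float]]))
```

If this reading is right, unwrapping Optional before extracting the generic argument would be one possible direction for a fix. Whether that is what the linked pull request does is not something this sketch assumes.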

Operating system

Linux

Runtime environment

Astronomer

Python version

3.11

dlt data source

Rest API

dlt destination

No response

Other deployment details

No response

Additional information

No response
