NotImplementedError raised when using FeatureStore.push() #4403

Closed
singsinghai opened this issue Aug 13, 2024 · 4 comments

@singsinghai

Expected Behavior

I'm starting to try out a simple use case of Feast v0.40 for local development: ingesting data from an Airflow job into Postgres.

def process():
        ...
        store = FeatureStore(repo_path=f"{mother_path}/helpers/feast/")

        driver = Entity(name="driver_id", join_keys=["driver_id"], description="driver ID")

        table_name = 'feast_driver_hourly_stats'
        create_postgres_table(
            table_name=table_name, 
            schema="""
                driver_id SERIAL PRIMARY KEY,
                event_timestamp TIMESTAMP,
                surge_score FLOAT,
                trip_cancel INT,
                number_trip INT,
                latest_completed INT
            """
        )
        

        driver_stats_source = PostgreSQLSource(
            name=table_name,
            query=f"SELECT * FROM airflow.feast.{table_name}",
            timestamp_field="event_timestamp",
        )

        push_source = PushSource(
            name="push_source",
            batch_source=driver_stats_source
        )

        # Define the feature view, fed by the push source declared above
        driver_feature_view = FeatureView(
            name="driver_feature_view",
            entities=[driver],
            ttl=None,  # Set TTL if needed
            schema=[
                Field(name="surge_score", dtype=Float32),
                Field(name="trip_cancel", dtype=Int64),
                Field(name="number_trip", dtype=Int64),
                Field(name="latest_completed", dtype=Int64),
            ],
            # online=True,
            stream_source=push_source
        )

        # Apply the feature store (register entities and feature views)
        store.apply([driver_feature_view])
        result = store.push("push_source", data, to=PushMode.OFFLINE)

I have confirmed that the database, the schema, and the table exist. I expect that after calling push(), the data will be written into the destination table.

Current Behavior

Instead, I got the following error:

  File "/opt/airflow/dags/test_feast.py", line 112, in process
    result = store.push("push_source", data, to=PushMode.OFFLINE)
             ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/home/airflow/.local/lib/python3.11/site-packages/feast/feature_store.py", line 1427, in push
    self.write_to_offline_store(
  File "/home/airflow/.local/lib/python3.11/site-packages/feast/feature_store.py", line 1511, in write_to_offline_store
    provider.ingest_df_to_offline_store(feature_view, table)
  File "/home/airflow/.local/lib/python3.11/site-packages/feast/infra/passthrough_provider.py", line 303, in ingest_df_to_offline_store
    self.offline_write_batch(self.repo_config, feature_view, table, None)
  File "/home/airflow/.local/lib/python3.11/site-packages/feast/infra/passthrough_provider.py", line 180, in offline_write_batch
    self.offline_store.__class__.offline_write_batch(
  File "/home/airflow/.local/lib/python3.11/site-packages/feast/infra/offline_stores/offline_store.py", line 353, in offline_write_batch
    raise NotImplementedError

write_to_offline_store() appears to end up calling a method on the base offline store interface rather than on a concrete implementation, which would explain the error. I'm still reviewing my config and code for mistakes and reading through the push() method, but I think this is worth raising as an issue.

Specifications

  • Version: v0.40
  • Platform: Airflow, Postgres
  • Subsystem: Python 3.11
@tokoko
Collaborator

tokoko commented Aug 13, 2024

The Postgres offline store doesn't implement offline_write_batch yet. There was a PR (#3767) a while back, but it was abandoned. The relevant ticket is #3759.
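
For readers wondering why the traceback ends in a bare NotImplementedError, here is a minimal sketch of the pattern. It is not Feast's actual code: the class names BaseOfflineStore, ReadOnlyStore, and WritableStore and the helpers supports_offline_write and write are hypothetical. Only the method name offline_write_batch and the class-level dispatch come from the traceback above.

```python
class BaseOfflineStore:
    """Hypothetical stand-in for an offline store base class."""

    @staticmethod
    def offline_write_batch(config, feature_view, table, progress):
        # Default behaviour: stores that support writes must override this.
        raise NotImplementedError


class ReadOnlyStore(BaseOfflineStore):
    """Hypothetical store that supports retrieval only (no override)."""


class WritableStore(BaseOfflineStore):
    """Hypothetical store that does implement offline writes."""

    written = []

    @staticmethod
    def offline_write_batch(config, feature_view, table, progress):
        # Record the write instead of talking to a real database.
        WritableStore.written.append((feature_view, table))


def supports_offline_write(store) -> bool:
    """Return True if the store's class overrides the base method."""
    return type(store).offline_write_batch is not BaseOfflineStore.offline_write_batch


def write(store, feature_view, table):
    # Mirrors the class-level dispatch seen in the traceback:
    # the call resolves to the base method when no override exists.
    store.__class__.offline_write_batch(None, feature_view, table, None)
```

A check like supports_offline_write could, under these assumptions, be used to fail early with a clearer message before attempting an offline push.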

@singsinghai
Author

postgres offline store doesn't implement offline_write_batch yet. There was a PR #3767 a while back, but it was abandoned. The relevant ticket is #3759.

Hi @tokoko, so with Postgres, Feast currently supports offline retrieval but not offline ingestion? Is there currently any available offline ingestion source yet?

@tokoko
Collaborator

tokoko commented Aug 20, 2024

@singsinghai sorry, I forgot to reply here. Yes, that's right: you can't ingest data into the Postgres offline store with Feast. Your best option is to populate the Postgres tables without Feast and use Feast only for retrieval.
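
As a rough illustration of populating the table without Feast, the sketch below appends rows through a plain DB-API connection. The helper insert_rows, the COLUMNS tuple, and the sample row are hypothetical. An in-memory SQLite database stands in for Postgres so that the example runs on its own. With psycopg2 you would pass a Postgres connection and placeholder="%s" instead.

```python
import re
import sqlite3

# Column names taken from the table schema in the issue.
COLUMNS = ("driver_id", "event_timestamp", "surge_score",
           "trip_cancel", "number_trip", "latest_completed")


def insert_rows(conn, table_name, rows, placeholder="?"):
    """Append rows to table_name through a DB-API connection.

    placeholder is "?" for sqlite3; psycopg2 uses "%s" instead.
    """
    # Table names cannot be bound as parameters, so validate before formatting.
    if not re.fullmatch(r"[A-Za-z_][A-Za-z0-9_.]*", table_name):
        raise ValueError(f"unsafe table name: {table_name!r}")
    marks = ", ".join([placeholder] * len(COLUMNS))
    sql = f"INSERT INTO {table_name} ({', '.join(COLUMNS)}) VALUES ({marks})"
    cur = conn.cursor()
    cur.executemany(sql, rows)
    conn.commit()
    return len(rows)


# Demo against an in-memory SQLite database standing in for Postgres.
conn = sqlite3.connect(":memory:")
conn.execute(
    "CREATE TABLE feast_driver_hourly_stats ("
    "driver_id INTEGER, event_timestamp TEXT, surge_score REAL, "
    "trip_cancel INTEGER, number_trip INTEGER, latest_completed INTEGER)"
)
rows = [(1001, "2024-08-13 10:00:00", 1.5, 0, 12, 1)]  # made-up sample row
insert_rows(conn, "feast_driver_hourly_stats", rows)
```

Once the rows are in the table, the PostgreSQLSource query from the original post should be able to read them for retrieval, assuming the schema and table names match.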

Is there currently any available offline ingestion source yet?

I'm not sure I get the question, to be honest. If you're using the Postgres offline store, then no: there's no way to ingest data into the offline store source tables with Feast.

@tokoko
Collaborator

tokoko commented Sep 25, 2024

Closing this as we already have another feature request ticket.

tokoko closed this as completed on Sep 25, 2024