
Feat: transitFeedSyncProcessing implementation #819

Open. Wants to merge 3 commits into base: main.
Conversation

@AlfredNwolisa AlfredNwolisa commented Nov 12, 2024

Summary:

The pull request addresses feed sync processing, ensuring proper handling and consistency of feed data using Pub/Sub messages. It includes necessary configuration files, comprehensive tests, and documentation.

Key implementations include:

  • Created FeedProcessor class with comprehensive database interaction capabilities
  • Implemented idempotent feed processing logic that handles both new and existing feeds
  • Added support for feed URL change detection and deprecation workflow
  • Integrated with Google Cloud Pub/Sub for dataset batch processing
  • Implemented stable ID generation for feed tracking across updates
  • Added comprehensive logging and error handling at all processing stages
  • Implemented database transaction management with rollback support

Added support for:

  • Authentication type handling for feeds
  • External ID mapping and management
  • Feed redirection tracking
  • URL duplication checking
  • Feed status management (active/deprecated)
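
As a concrete illustration of the stable ID generation mentioned above, the scheme can be sketched as follows (the helper name is hypothetical; the `<source>-<external_id>` format matches the code shown later in this diff):

```python
def make_stable_id(source: str, external_id: str) -> str:
    """Build a stable ID of the form "<source>-<external_id>".

    Unlike the per-record UUID, this value stays the same across
    feed updates, so it can be used to track a feed over time.
    """
    return f"{source}-{external_id}"
```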

This pull request addresses the functionality described in issue https://github.com/MobilityData/product-tasks/issues/102, which is part of the https://github.com/MobilityData/product-tasks/issues/95 epic.

Expected behavior:

Feed Processing Flow:

  1. The Cloud Function receives a Pub/Sub event containing feed information:
  • Decodes the base64-encoded message
  • Validates and parses the payload into a FeedPayload object
  2. For each feed processing request:
  • Checks whether the feed exists, using external ID and source
  • Validates the feed URL for duplicates across the system
  3. New feed processing, if the feed doesn't exist:
  • Generates a new UUID and stable ID
  • Creates the feed record with active status
  • Creates the external ID mapping
  • Publishes to the dataset batch topic if not authenticated
  4. Feed update processing, if the feed exists with a different URL:
  • Creates a new feed record with the updated URL
  • Deprecates the old feed record
  • Updates the external ID mapping
  • Creates a redirect mapping between the old and new feed IDs
  • Publishes the update to the dataset batch topic if not authenticated
  5. Database transaction handling:
  • Commits successful operations
  • Rolls back on any errors
  • Maintains data consistency across all operations
  6. Error handling:
  • Provides detailed logging at each step
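
The decode-and-parse step at the top of this flow can be sketched as below. This is a minimal illustration assuming the standard Pub/Sub push envelope (`message.data` carrying a base64-encoded JSON body), not the PR's exact code:

```python
import base64
import json


def parse_pubsub_event(cloud_event_data: dict) -> dict:
    """Decode a Pub/Sub push event into a feed payload dict.

    Pub/Sub delivers the message body base64-encoded under
    message.data; the caller would then validate the dict and
    build a FeedPayload from it.
    """
    raw = cloud_event_data["message"]["data"]
    return json.loads(base64.b64decode(raw).decode("utf-8"))
```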

Testing tips:

Provide tips, procedures and sample files on how to test the feature.
Testers are invited to follow the tips AND to try anything they deem relevant outside the bounds of the testing tips.

Please make sure these boxes are checked before submitting your pull request - thanks!

  • Run the unit tests with ./scripts/api-tests.sh to make sure you didn't break anything
  • Add or update any needed documentation to the repo
  • Format the title like "feat: [new feature short description]". The title must follow the Conventional Commits specification (https://www.conventionalcommits.org/en/v1.0.0/).
  • Linked all relevant issues
  • Include screenshot(s) showing how this pull request works and fixes the issue(s)

This commit:
- Implements feed sync processing for Pub/Sub messages
- Ensures database consistency during sync operations
- Adds configuration files for feed sync settings
- Includes comprehensive test coverage
- Documents sync process and configuration options
@AlfredNwolisa AlfredNwolisa self-assigned this Nov 12, 2024
@AlfredNwolisa changed the title from "feat: Add Transitland feed sync processor" to "Feat: transitFeedSyncProcessing implementation" on Nov 12, 2024
Replaced raw SQL queries with SQLAlchemy ORM models for handling database operations in feed processing. Enhanced test coverage and updated mock configurations to align with the new ORM-based approach.
@AlfredNwolisa AlfredNwolisa marked this pull request as ready for review November 18, 2024 20:06
FEEDS_DATABASE_URL=postgresql://postgres:postgres@localhost:54320/MobilityDatabase
PROJECT_ID=my-project-id
PUBSUB_TOPIC_NAME=my-topic
TRANSITLAND_API_KEY=your-api-key
Contributor:
Is this variable used?

Comment on lines +33 to +40
logger = logging.getLogger("feed_processor")
if not logger.handlers:
    handler = logging.StreamHandler()
    handler.setFormatter(
        logging.Formatter("%(asctime)s - %(name)s - %(levelname)s - %(message)s")
    )
    logger.addHandler(handler)
    logger.setLevel(logging.INFO)
Contributor:
Please init the logger using the helpers.Logger.init_logger()
Example usage: https://github.com/MobilityData/mobility-feed-api/blob/main/functions-python/preprocessed_analytics/src/main.py#L36
This is important to make sure the logs are printed correctly when running in a GCP instance

Comment on lines +49 to +65
class FeedPayload:
    """Data class for feed processing payload"""

    external_id: str
    feed_id: str
    feed_url: str
    execution_id: Optional[str]
    spec: str
    auth_info_url: Optional[str]
    auth_param_name: Optional[str]
    type: Optional[str]
    operator_name: Optional[str]
    country: Optional[str]
    state_province: Optional[str]
    city_name: Optional[str]
    source: str
    payload_type: str
Contributor:
This seems to be duplicated from feed_sync_dispatcher_transitland. To avoid code duplication we should move it to a common location and keep the naming consistent (it's renamed from TransitFeedSyncPayload).

logger.error(error_msg)
if "payload" in locals():
    self.session.rollback()
    logger.debug("Database transaction rolled back due to error")
@cka-y (Contributor) commented Nov 19, 2024:
We should change this to an error level logging as it is critical

Comment on lines +306 to +323
result = (
    self.session.query(Feed.id, Feed.producer_url)
    .join(Externalid)
    .filter(
        Externalid.associated_id == external_id,
        Externalid.source == source,
        Feed.status == "active",
    )
    .first()
)
if result:
    logger.debug(
        f"Retrieved current feed info for external_id: {external_id}"
    )
    return result[0], result[1]

logger.debug(f"No existing feed found for external_id: {external_id}")
return None, None
Contributor:
[suggestion to avoid table join]

Suggested change (replacing the join-based query with a relationship filter):

result = (
    self.session.query(Feed)
    .filter(
        Feed.externalids.any(
            associated_id=external_id,
            source=source,
        ),
        Feed.status == "active",
    )
    .first()
)
if result is not None:
    logger.info(
        f"Retrieved feed {result.stable_id} info for external_id: {external_id}"
    )
    return result.id, result.producer_url
logger.info(f"No existing feed found for external_id: {external_id}")
return None, None

# Update old feed status to deprecated
old_feed = self.session.get(Feed, old_feed_id)
if old_feed:
    old_feed.status = "deprecated"
Contributor:
[question] just to confirm @emmambd in the case of the update of an existing feed during scraping. What should the status of the old feed be changed to?

Comment on lines +237 to +250
existing_external_id = (
    self.session.query(Externalid)
    .filter(
        Externalid.associated_id == payload.external_id,
        Externalid.source == payload.source,
    )
    .first()
)

if existing_external_id:
    existing_external_id.feed_id = new_feed_id
    logger.debug(
        f"Updated external ID mapping to new feed_id: {new_feed_id}"
    )
@cka-y (Contributor) commented Nov 19, 2024:
We should actually create a new entity of Externalid linked to the updated (new) feed.
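
A minimal sketch of that suggestion, using stand-in SQLAlchemy models (the real Externalid model in this repo has more columns; this only illustrates adding a new mapping row for the new feed instead of mutating the old one in place):

```python
from sqlalchemy import Column, String, create_engine
from sqlalchemy.orm import Session, declarative_base

Base = declarative_base()


class Externalid(Base):
    """Stand-in for the repo's Externalid model (columns simplified)."""

    __tablename__ = "externalid"
    associated_id = Column(String, primary_key=True)
    source = Column(String, primary_key=True)
    feed_id = Column(String, primary_key=True)


engine = create_engine("sqlite://")  # in-memory DB for illustration
Base.metadata.create_all(engine)

with Session(engine) as session:
    # The mapping for the old feed stays in place...
    session.add(Externalid(associated_id="ext-1", source="tld", feed_id="old-feed-id"))
    # ...and the update path ADDS a new mapping for the new feed,
    # rather than overwriting existing_external_id.feed_id.
    session.add(Externalid(associated_id="ext-1", source="tld", feed_id="new-feed-id"))
    session.commit()
    n_mappings = (
        session.query(Externalid).filter_by(associated_id="ext-1").count()
    )
```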

        logger.debug("Database transaction rolled back due to error")
        raise

    def process_new_feed(self, payload: FeedPayload) -> None:
Contributor:
[question] what about location information?

try:
    # Create new feed with updated URL
    new_feed_id = str(uuid.uuid4())
    stable_id = f"{payload.source}-{payload.external_id}"
Contributor:
If I'm not mistaken, following the code logic, the updated feed would have the same stable_id as the old version. Is this behaviour OK, given that we use the stable_id as a key in a lot of processes, including the feed page URL in the web UI? @emmambd @davidgamez

Comment on lines +336 to +337
{"feed_id": payload.feed_id, "execution_id": payload.execution_id}
).encode("utf-8")
Contributor:
This is not the message format that the dataset processing topic expects. It should be:

{
    "message": {
        "data": {
            "execution_id": "execution_id",
            "producer_url": "producer_url",
            "feed_stable_id": "feed_stable_id",
            "feed_id": "feed_id",
            "dataset_id": "dataset_id",
            "dataset_hash": "dataset_hash",
            "authentication_type": "authentication_type",
            "authentication_info_url": "authentication_info_url",
            "api_key_parameter_name": "api_key_parameter_name"
        }
    }
}

Refer to functions-python/batch_process_dataset for more information.
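
A hedged sketch of assembling that envelope from the payload fields (the field mapping here is my assumption; functions-python/batch_process_dataset is the authoritative reference for the format):

```python
import json


def build_dataset_batch_message(payload: dict) -> bytes:
    """Wrap feed fields in the envelope the dataset topic expects.

    dataset_id / dataset_hash are None here, since a freshly synced
    feed has no dataset yet; the consumer must tolerate that.
    """
    return json.dumps(
        {
            "message": {
                "data": {
                    "execution_id": payload.get("execution_id"),
                    "producer_url": payload.get("feed_url"),
                    "feed_stable_id": payload.get("stable_id"),
                    "feed_id": payload.get("feed_id"),
                    "dataset_id": None,
                    "dataset_hash": None,
                    "authentication_type": payload.get("type"),
                    "authentication_info_url": payload.get("auth_info_url"),
                    "api_key_parameter_name": payload.get("auth_param_name"),
                }
            }
        }
    ).encode("utf-8")
```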

@cka-y (Contributor) commented Nov 19, 2024:
New feeds and feed updates should have status="wip" so they can be manually validated before becoming public.
