Feat: transitFeedSyncProcessing implementation #819
base: main
Conversation
This commit:
- Implements feed sync processing for Pub/Sub messages
- Ensures database consistency during sync operations
- Adds configuration files for feed sync settings
- Includes comprehensive test coverage
- Documents sync process and configuration options
Replaced raw SQL queries with SQLAlchemy ORM models for handling database operations in feed processing. Enhanced test coverage and updated mock configurations to align with the new ORM-based approach.
FEEDS_DATABASE_URL=postgresql://postgres:postgres@localhost:54320/MobilityDatabase
PROJECT_ID=my-project-id
PUBSUB_TOPIC_NAME=my-topic
TRANSITLAND_API_KEY=your-api-key
Is this variable used?
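For context, a minimal sketch of how these variables would typically be consumed, assuming standard environment lookup (the os.getenv usage here is an assumption, not the PR's code):

import os

FEEDS_DATABASE_URL = os.getenv("FEEDS_DATABASE_URL")
PROJECT_ID = os.getenv("PROJECT_ID")
PUBSUB_TOPIC_NAME = os.getenv("PUBSUB_TOPIC_NAME")
# The reviewer asks whether this one is actually consumed anywhere:
TRANSITLAND_API_KEY = os.getenv("TRANSITLAND_API_KEY")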
logger = logging.getLogger("feed_processor")
if not logger.handlers:
    handler = logging.StreamHandler()
    handler.setFormatter(
        logging.Formatter("%(asctime)s - %(name)s - %(levelname)s - %(message)s")
    )
    logger.addHandler(handler)
    logger.setLevel(logging.INFO)
Please initialize the logger using helpers.Logger.init_logger().
Example usage: https://github.com/MobilityData/mobility-feed-api/blob/main/functions-python/preprocessed_analytics/src/main.py#L36
This is important to make sure the logs are printed correctly when running in a GCP instance.
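A minimal sketch of the suggested setup, assuming the helper is importable the same way as in the linked example (the exact import path is an assumption):

import logging

from helpers.logger import Logger  # import path assumed from the linked example

Logger.init_logger()  # sets up handlers so logs surface correctly in GCP
logger = logging.getLogger("feed_processor")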
class FeedPayload:
    """Data class for feed processing payload"""

    external_id: str
    feed_id: str
    feed_url: str
    execution_id: Optional[str]
    spec: str
    auth_info_url: Optional[str]
    auth_param_name: Optional[str]
    type: Optional[str]
    operator_name: Optional[str]
    country: Optional[str]
    state_province: Optional[str]
    city_name: Optional[str]
    source: str
    payload_type: str
This seems to be duplicated from feed_sync_dispatcher_transitland. To avoid code duplication we should move it to a common location and keep the naming consistent (it's renamed from TransitFeedSyncPayload).
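A sketch of the suggested refactor, assuming a shared helpers package that both functions can import (the module path and file name are assumptions):

# helpers/feed_sync/feed_sync_common.py (hypothetical shared module)
from dataclasses import dataclass
from typing import Optional

@dataclass
class FeedPayload:
    """Single payload definition shared by dispatcher and processor."""
    external_id: str
    feed_id: str
    feed_url: str
    execution_id: Optional[str]
    spec: str
    source: str
    payload_type: str
    # ... remaining optional fields as listed above

# feed_sync_process_transitland/src/main.py
from helpers.feed_sync.feed_sync_common import FeedPayload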
logger.error(error_msg)
if "payload" in locals():
    self.session.rollback()
    logger.debug("Database transaction rolled back due to error")
We should change this to error-level logging, as it is critical.
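i.e., keeping the message but raising the severity:

self.session.rollback()
logger.error("Database transaction rolled back due to error")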
result = (
    self.session.query(Feed.id, Feed.producer_url)
    .join(Externalid)
    .filter(
        Externalid.associated_id == external_id,
        Externalid.source == source,
        Feed.status == "active",
    )
    .first()
)
if result:
    logger.debug(f"Retrieved current feed info for external_id: {external_id}")
    return result[0], result[1]

logger.debug(f"No existing feed found for external_id: {external_id}")
return None, None
[suggestion to avoid table join]
Suggested change:
result = (
    self.session.query(Feed)
    .filter(
        Feed.externalids.any(
            associated_id=external_id,
            source=source,
        ),
        Feed.status == "active",
    )
    .first()
)
if result is not None:
    logger.info(
        f"Retrieved feed {result.stable_id} info for external_id: {external_id}"
    )
    return result.id, result.producer_url
logger.info(f"No existing feed found for external_id: {external_id}")
return None, None
# Update old feed status to deprecated
old_feed = self.session.get(Feed, old_feed_id)
if old_feed:
    old_feed.status = "deprecated"
[question] Just to confirm, @emmambd: when an existing feed is updated during scraping, what should the status of the old feed be changed to?
existing_external_id = (
    self.session.query(Externalid)
    .filter(
        Externalid.associated_id == payload.external_id,
        Externalid.source == payload.source,
    )
    .first()
)

if existing_external_id:
    existing_external_id.feed_id = new_feed_id
    logger.debug(
        f"Updated external ID mapping to new feed_id: {new_feed_id}"
    )
We should actually create a new entity of Externalid linked to the updated (new) feed.
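A sketch of what that could look like, assuming the Externalid constructor accepts the same column names used in the queries above (constructor arguments are an assumption):

# Hypothetical: create a fresh mapping row instead of mutating the old one
new_external_id = Externalid(
    feed_id=new_feed_id,
    associated_id=payload.external_id,
    source=payload.source,
)
self.session.add(new_external_id)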
logger.debug("Database transaction rolled back due to error")
raise

def process_new_feed(self, payload: FeedPayload) -> None:
[question] what about location information?
try:
    # Create new feed with updated URL
    new_feed_id = str(uuid.uuid4())
    stable_id = f"{payload.source}-{payload.external_id}"
If I'm not mistaken, following the code logic, the updated feed would have the same stable_id as the old version. Is this behaviour ok, given that we use the stable_id as a key in a lot of processes, including the feed page URL in the web UI? @emmambd @davidgamez
{"feed_id": payload.feed_id, "execution_id": payload.execution_id} | ||
).encode("utf-8") |
This is not the message format that the dataset processing topic expects. It should be:
{
    "message": {
        "data": {
            "execution_id": "execution_id",
            "producer_url": "producer_url",
            "feed_stable_id": "feed_stable_id",
            "feed_id": "feed_id",
            "dataset_id": "dataset_id",
            "dataset_hash": "dataset_hash",
            "authentication_type": "authentication_type",
            "authentication_info_url": "authentication_info_url",
            "api_key_parameter_name": "api_key_parameter_name"
        }
    }
}
Refer to functions-python/batch_process_dataset for more information.
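For illustration, a hedged sketch of building that payload on the publisher side; the mapping from FeedPayload fields to message keys is an assumption to be verified against batch_process_dataset, not the actual contract:

import json

# Hypothetical field mapping; verify against batch_process_dataset.
data = {
    "execution_id": payload.execution_id,
    "producer_url": payload.feed_url,
    "feed_stable_id": stable_id,
    "feed_id": payload.feed_id,
    "dataset_id": None,
    "dataset_hash": None,
    "authentication_type": payload.type,
    "authentication_info_url": payload.auth_info_url,
    "api_key_parameter_name": payload.auth_param_name,
}
message_bytes = json.dumps(data).encode("utf-8")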
New feeds and feed updates should have
Summary:
The pull request addresses feed sync processing, ensuring proper handling and consistency of feed data using Pub/Sub messages. It includes necessary configuration files, comprehensive tests, and documentation.
Key implementations include:
Added support for:
This pull request addresses the functionality described in issue https://github.com/MobilityData/product-tasks/issues/102, which is part of the https://github.com/MobilityData/product-tasks/issues/95 epic.
Expected behavior:
Feed Processing Flow:
Testing tips:
Provide tips, procedures and sample files on how to test the feature.
Testers are invited to follow the tips AND to try anything they deem relevant outside the bounds of the testing tips.
Please make sure these boxes are checked before submitting your pull request - thanks!
Run ./scripts/api-tests.sh to make sure you didn't break anything