Skip to content

Commit

Permalink
De-duplicate super implementation of get_url_params
Browse files Browse the repository at this point in the history
  • Loading branch information
ReubenFrankel committed Oct 4, 2023
1 parent 3051e2e commit ad16b5f
Showing 1 changed file with 17 additions and 48 deletions.
65 changes: 17 additions & 48 deletions tap_shopify/streams.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,8 +2,7 @@

from decimal import Decimal
from pathlib import Path
from typing import Any, Dict, Optional
from urllib.parse import parse_qsl, urlsplit
from typing import Optional

from tap_shopify.client import tap_shopifyStream

Expand Down Expand Up @@ -33,20 +32,16 @@ class CollectStream(tap_shopifyStream):
replication_method = "INCREMENTAL"
schema_filepath = SCHEMAS_DIR / "collect.json"

def get_url_params(
self, context: Optional[dict], next_page_token: Optional[Any]
) -> Dict[str, Any]:
def get_url_params(self, context, next_page_token):
"""Return a dictionary of values to be used in URL parameterization."""
params: dict = {}
params = super().get_url_params(context, next_page_token)

if next_page_token:
return dict(parse_qsl(urlsplit(next_page_token).query))
if not next_page_token:
context_state = self.get_context_state(context)
last_id = context_state.get("replication_key_value")

context_state = self.get_context_state(context)
last_id = context_state.get("replication_key_value")

if last_id:
params["since_id"] = last_id

return params


Expand Down Expand Up @@ -105,27 +100,14 @@ class InventoryLevelsStream(tap_shopifyStream):
def get_child_context(self, record: dict, context: Optional[dict]) -> dict:
"""Return a context dictionary for child streams."""
return {"inventory_item_id": record["inventory_item_id"]}

def get_url_params(
self, context: Optional[dict], next_page_token: Optional[Any]
) -> Dict[str, Any]:
"""Return a dictionary of values to be used in URL parameterization."""
params: dict = {}

if next_page_token:
return dict(parse_qsl(urlsplit(next_page_token).query))

context_state = self.get_context_state(context)
last_updated = context_state.get("replication_key_value")
def get_url_params(self, context, next_page_token):
"""Return a dictionary of values to be used in URL parameterization."""
params = super().get_url_params(context, next_page_token)

start_date = self.config.get("start_date")
if not next_page_token:
params["location_ids"] = context["location_id"]

if last_updated:
params["updated_at_min"] = last_updated
return params
elif start_date:
params["created_at_min"] = start_date
params["location_ids"] = context["location_id"]
return params


Expand Down Expand Up @@ -177,27 +159,14 @@ def post_process(self, row: dict, context: Optional[dict] = None):
def get_child_context(self, record: dict, context: Optional[dict]) -> dict:
"""Return a context dictionary for child streams."""
return {"order_id": record["id"]}

def get_url_params(
self, context: Optional[dict], next_page_token: Optional[Any]
) -> Dict[str, Any]:
"""Return a dictionary of values to be used in URL parameterization."""
params: dict = {}

if next_page_token:
return dict(parse_qsl(urlsplit(next_page_token).query))

context_state = self.get_context_state(context)
last_updated = context_state.get("replication_key_value")
def get_url_params(self, context, next_page_token):
"""Return a dictionary of values to be used in URL parameterization."""
params = super().get_url_params(context, next_page_token)

start_date = self.config.get("start_date")
if not next_page_token:
params["status"] = "any"

if last_updated:
params["updated_at_min"] = last_updated
return params
elif start_date:
params["created_at_min"] = start_date
params["status"] = "any"
return params


Expand Down

0 comments on commit ad16b5f

Please sign in to comment.