Skip to content

Commit

Permalink
refactor + add new config vars
Browse files Browse the repository at this point in the history
  • Loading branch information
jamesbryer committed Jan 20, 2025
1 parent 6b3d9b8 commit 0b6b3c0
Show file tree
Hide file tree
Showing 2 changed files with 58 additions and 25 deletions.
5 changes: 4 additions & 1 deletion publish-schema/src/config.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,8 +3,11 @@

class Config:
    """
    Settings for the publish-schema function.
    Each attribute is populated via get_value_from_env; presumably the first
    argument is the environment variable name and the second is the fallback
    used when that variable is unset (confirm against the helper).
    """

    PROJECT_ID = get_value_from_env("PROJECT_ID", "ons-sds-jb")
    # Defined once: a preceding assignment with a "3400" default was
    # immediately overwritten by this one and had no effect.
    # NOTE(review): unit is presumably seconds - confirm.
    PROCESS_TIMEOUT = int(get_value_from_env("PROCESS_TIMEOUT", "540"))
    API_URL = get_value_from_env("API_URL", "https://34.120.41.215.nip.io")
    SECRET_ID = get_value_from_env("SECRET_ID", "oauth-client-name")
    # Base URL that schema file paths are appended to when fetching from GitHub.
    GITHUB_URL = get_value_from_env(
        "GITHUB_URL",
        "https://raw.githubusercontent.com/ONSdigital/sds-prototype-schema/refs/heads/SDSS-823-schema-publication-automation-spike/",
    )
    # Endpoint fragments are appended to API_URL and followed by the survey id.
    # NOTE(review): the environment variable names end in _URL while the
    # attributes end in _ENDPOINT - confirm this mismatch is intended.
    POST_SCHEMA_ENDPOINT = get_value_from_env(
        "POST_SCHEMA_URL", "/v1/schema?survey_id="
    )
    GET_SCHEMA_METADATA_ENDPOINT = get_value_from_env(
        "GET_SCHEMA_METADATA_URL", "/v1/schema_metadata?survey_id="
    )

config = Config()
78 changes: 54 additions & 24 deletions publish-schema/src/main.py
Original file line number Diff line number Diff line change
Expand Up @@ -9,15 +9,16 @@

logger = logging.getLogger(__name__)


@functions_framework.cloud_event
def publish_schema(cloud_event: CloudEvent) -> None:
"""
Method to retrieve, verify, and publish a schema to SDS.
Parameters:
cloud_event (CloudEvent): the CloudEvent containing the message.
"""
filepath = base64.b64decode(cloud_event.data["message"]["data"]).decode('utf-8')
filepath = base64.b64decode(cloud_event.data["message"]["data"]).decode("utf-8")

schema = fetch_raw_schema(filepath)

Expand All @@ -30,6 +31,11 @@ def publish_schema(cloud_event: CloudEvent) -> None:
return

survey_id = fetch_survey_id(schema)

if not check_duplicate_versions(schema, survey_id):
logger.error("Stopping execution due to duplicate schema version.")
return

response = post_schema(schema, survey_id)
if response.status_code == 200:
logger.info(f"Schema {filepath} posted successfully")
Expand All @@ -41,15 +47,15 @@ def publish_schema(cloud_event: CloudEvent) -> None:
def fetch_raw_schema(path) -> dict:
"""
Method to fetch the schema from the ONSdigital GitHub repository.
Parameters:
path (str): the path to the schema JSON.
Returns:
dict: the schema JSON.
"""

url = f'https://raw.githubusercontent.com/ONSdigital/sds-prototype-schema/refs/heads/SDSS-823-schema-publication-automation-spike/{path}'
url = Config.GITHUB_URL + path
logger.info(f"Fetching schema from {url}")
try:
response = requests.get(url)
Expand All @@ -58,7 +64,7 @@ def fetch_raw_schema(path) -> dict:
logger.error(f"Failed to fetch schema from {url}")
logger.error(e)
return None

try:
schema = response.json()
except json.JSONDecodeError as e:
Expand All @@ -71,7 +77,7 @@ def fetch_raw_schema(path) -> dict:
def fetch_survey_id(schema) -> str:
"""
Method to fetch the survey ID from the schema JSON.
Parameters:
schema (dict): the schema JSON.
"""
Expand All @@ -98,12 +104,13 @@ def post_schema(schema, survey_id) -> requests.Response:
headers = generate_headers()
logger.info(f"Posting schema for survey {survey_id}")
response = session.post(
f"{Config.API_URL}/v1/schema?survey_id={survey_id}",
f"{Config.API_URL}{Config.POST_SCHEMA_ENDPOINT}{survey_id}",
json=schema,
headers=headers,
)
return response


def split_filename(path) -> str:
"""
Method to split the filename without extension from the path.
Expand Down Expand Up @@ -138,9 +145,12 @@ def verify_version(filepath, schema) -> bool:
logger.info(f"Schema version for {filename} verified.")
return True
else:
logger.error(f"Schema version for {filepath} does not match. Expected {filename}, got {schema['properties']['schema_version']['const']}")
logger.error(
f"Schema version for {filepath} does not match. Expected {filename}, got {schema['properties']['schema_version']['const']}"
)
return False


def check_duplicate_versions(schema, survey_id) -> bool:
    """
    Method to call the schema_metadata endpoint and check that the schema_version for the new schema is not already present in SDS.

    Parameters:
        schema (dict): the schema JSON; its version is read from properties.schema_version.const.
        survey_id (str): the survey_id of the schema.

    Returns:
        bool: True if there are no duplicate versions, False otherwise
        (including when the existing metadata could not be fetched).
    """
    logger.debug(f"Checking for duplicate schema versions for survey {survey_id}")

    logger.debug(f"Fetching schema metadata for survey {survey_id}")
    schema_metadata = get_schema_metadata(survey_id)

    # get_schema_metadata returns None when the request fails. Without this
    # guard the loop below raises TypeError; treat an unverifiable version as
    # a failed check so the caller stops instead of crashing.
    if schema_metadata is None:
        logger.error(
            f"Failed to fetch schema metadata for survey {survey_id}. Cannot verify schema version. Exiting."
        )
        return False

    new_schema_version = schema["properties"]["schema_version"]["const"]

    for version in schema_metadata:
        if new_schema_version == version["schema_version"]:
            logger.error(
                f"Schema version {new_schema_version} already exists for survey {survey_id}"
            )
            return False

    logger.info(
        f"Verified schema_version {new_schema_version} for survey {survey_id} is unique."
    )
    return True



def get_schema_metadata(survey_id) -> "list[dict] | None":
    """
    Method to call the schema_metadata endpoint and return the metadata for the survey.

    Parameters:
        survey_id (str): the survey_id of the schema.

    Returns:
        list[dict] | None: the decoded JSON body of the response (the caller
        iterates it and reads "schema_version" from each record), or None if
        the request failed, returned an error status, or the body was not
        valid JSON.
    """

    session = setup_session()
    headers = generate_headers()
    try:
        response = session.get(
            f"{Config.API_URL}{Config.GET_SCHEMA_METADATA_ENDPOINT}{survey_id}",
            headers=headers,
        )
        response.raise_for_status()
        # Decode inside the try so a malformed body is reported like any other
        # fetch failure. ValueError covers the JSON decode errors raised by
        # requests versions that do not derive them from RequestException.
        return response.json()
    except (requests.exceptions.RequestException, ValueError) as e:
        logger.error(f"Failed to fetch schema metadata for survey {survey_id}")
        logger.error(e)
        return None

0 comments on commit 0b6b3c0

Please sign in to comment.