Skip to content

Commit

Permalink
feat: test the snowflake query for populate_product_catalog_command
Browse files Browse the repository at this point in the history
  • Loading branch information
AfaqShuaib09 committed Sep 24, 2024
1 parent 18d78aa commit 33485c9
Show file tree
Hide file tree
Showing 2 changed files with 179 additions and 11 deletions.
98 changes: 98 additions & 0 deletions course_discovery/apps/course_metadata/constants.py
Original file line number Diff line number Diff line change
Expand Up @@ -151,3 +151,101 @@ class PathwayType(Enum):
from
prod.enterprise.course_reviews
'''

SNOWFLAKE_POPULATE_PRODUCT_CATALOG_QUERY = """
WITH course_data AS (
SELECT
c.id, c.uuid as COURSE_UUID,
c.key as COURSE_KEY,
cr.key as COURSERUN_KEY,
c.title AS COURSE_TITLE,
coursetype.name AS COURSE_TYPE,
COUNT(DISTINCT org.id) AS ORGANIZATIONS_COUNT,
LISTAGG(DISTINCT org.key, ', ') AS ORGANISATION_ABBR,
LISTAGG(DISTINCT org.name, ', ') AS ORGANIZATION_NAME,
LISTAGG(DISTINCT CONCAT('https://prod-discovery.edx-cdn.org/', org.logo_image), ', ') AS ORGANIZATION_LOGO,
COUNT(DISTINCT cr.language_id) AS languagesCount,
LISTAGG(DISTINCT cr.language_id, ', ') AS Languages,
LISTAGG(DISTINCT CASE WHEN st.language_code <> 'es' THEN st.name ELSE NULL END, ', ') AS Subjects,
LISTAGG(DISTINCT CASE WHEN st.language_code = 'es' THEN st.name ELSE NULL END, ', ') AS Subject_Spanish,
LISTAGG(DISTINCT s.type, ', ') AS SEAT_TYPE,
CONCAT(p.marketing_site_url_root, cslug.url_Slug) AS MARKETING_URL,
CASE
WHEN c.image IS NOT NULL THEN CONCAT('https://prod-discovery.edx-cdn.org/', c.image)
ELSE CONCAT('https://prod-discovery.edx-cdn.org/', c.card_image_url)
END AS MARKETING_IMAGE,
CASE
WHEN cr.RUN_START IS NOT NULL AND cr.RUN_START >= CURRENT_TIMESTAMP() THEN 'True'
ELSE 'False'
END AS is_upcoming,
CASE
WHEN (cr.enrollment_end IS NULL OR cr.enrollment_end >= CURRENT_TIMESTAMP()) AND (cr.enrollment_start IS NULL OR cr.enrollment_start <= CURRENT_TIMESTAMP()) THEN 'True'
ELSE 'False'
END AS is_enrollable,
CASE
WHEN crt.is_marketable = TRUE
AND cr.draft = FALSE
AND cr.status = 'published'
AND s.id IS NOT NULL -- Checking if there are any seats
AND (cr.slug IS NOT NULL AND cr.slug != '' AND crt.is_marketable = TRUE) -- is_marketable
THEN 'True'
ELSE 'False'
END AS is_marketable,
CASE
WHEN cr.RUN_END IS NOT NULL AND cr.RUN_END < CURRENT_TIMESTAMP() THEN 'True'
ELSE 'False'
END AS has_ended,
LISTAGG(DISTINCT
CASE
WHEN cr.RUN_END < CURRENT_TIMESTAMP() THEN 'Archived'
WHEN cr.RUN_START <= CURRENT_TIMESTAMP() THEN 'Current'
WHEN cr.RUN_START < DATEADD(DAY, 60, CURRENT_TIMESTAMP()) THEN 'Starting Soon'
ELSE 'Upcoming'
END, ', ') AS availability_status,
LISTAGG(cr.pacing_type, ', ') AS pacing
FROM
discovery.course_metadata_courserun AS cr
JOIN
discovery.course_metadata_course AS c ON c.id = cr.course_id
JOIN
discovery.core_partner AS p ON c.partner_id = p.id
JOIN
discovery.course_metadata_courseruntype AS crt ON crt.id = cr.type_id
JOIN
discovery.course_metadata_seat AS s ON cr.id = s.course_run_id
JOIN
discovery.course_metadata_coursetype AS coursetype ON coursetype.id = c.type_id
JOIN
discovery.course_metadata_course_authoring_organizations AS cao ON c.id = cao.course_id
JOIN
discovery.course_metadata_organization AS org ON cao.organization_id = org.id
JOIN
discovery.course_metadata_course_subjects AS cs ON c.id = cs.course_id
JOIN
discovery.course_metadata_subject AS sb ON sb.id = cs.subject_id
JOIN
discovery.course_metadata_subjecttranslation AS st ON st.master_id = sb.id
JOIN
discovery.course_metadata_courseurlslug AS cslug ON c.id = cslug.course_id
WHERE
c.draft != 1 AND cr.hidden != 1 AND cr.status = 'published'
AND (
coursetype.slug LIKE 'audit'
OR coursetype.slug LIKE 'verified-audit'
OR coursetype.slug LIKE 'verified'
OR coursetype.slug LIKE 'credit-verified-audit'
OR coursetype.slug LIKE 'spoc-verified-audit'
OR coursetype.slug LIKE 'professional'
)
AND cslug.is_active = 1
GROUP BY
c.uuid, c.id, c.key, cr.key, c.title, coursetype.name, p.marketing_site_url_root, cslug.url_Slug, c.image, c.card_image_url, cr.RUN_START, cr.enrollment_end, cr.enrollment_start, cr.RUN_END, crt.is_marketable, cr.draft, cr.status, s.id, cr.slug
ORDER BY
c.id
)
SELECT DISTINCT *
FROM course_data
WHERE
(is_upcoming = 'True')
OR (is_enrollable = 'True' AND has_ended != 'True' AND is_marketable = 'True');
"""
Original file line number Diff line number Diff line change
Expand Up @@ -2,10 +2,14 @@
import datetime
import logging

import snowflake.connector
from django.conf import settings
from django.core.management import BaseCommand, CommandError
from django.db.models import Count, Prefetch, Q

from course_discovery.apps.course_metadata.constants import (
SNOWFLAKE_POPULATE_COURSE_LENGTH_QUERY, SNOWFLAKE_POPULATE_PRODUCT_CATALOG_QUERY
)
from course_discovery.apps.course_metadata.gspread_client import GspreadClient
from course_discovery.apps.course_metadata.models import Course, CourseType, Program, SubjectTranslation

Expand Down Expand Up @@ -63,6 +67,52 @@ def add_arguments(self, parser):
required=False,
help='Flag to overwrite the existing data in Google Sheet tab'
)
parser.add_argument(
'--use_snowflake',
dest='use_snowflake_flag',
type=bool,
default=False,
required=False,
help='Flag to use Snowflake for fetching data'
)
def get_products_via_snowflake(self, product_type='ocm_course', product_source=None):
"""
Fetch products from Snowflake for product catalog
"""
snowflake_client = snowflake.connector.connect(
user=settings.SNOWFLAKE_SERVICE_USER,
password=settings.SNOWFLAKE_SERVICE_USER_PASSWORD,
account='edx.us-east-1',
database='prod'
)
cs = snowflake_client.cursor()
try:
cs.execute(SNOWFLAKE_POPULATE_PRODUCT_CATALOG_QUERY)
rows = cs.fetchall()
except Exception as e:
logger.error('Error while fetching products from Snowflake')
finally:
cs.close()
snowflake_client.close()
return rows

def get_transformed_data_from_snowflake(self, product):
"""
Transform data fetched from Snowflake for product catalog
"""
transformed_data = {
'UUID': product[0],
'Title': product[1],
'Organizations Name': product[2],
'Organizations Logo': product[3],
'Organizations Abbr': product[4],
'Languages': product[5],
'Subjects': product[6],
'Subjects Spanish': product[7],
'Marketing URL': product[8],
'Marketing Image': product[9],
}
return transformed_data

def get_products(self, product_type, product_source):
"""
Expand Down Expand Up @@ -182,6 +232,7 @@ def handle(self, *args, **options):
product_source = options.get('product_source')
gspread_client_flag = options.get('gspread_client_flag')
overwrite = options.get('overwrite_flag')
snowflake_flag = options.get('use_snowflake_flag')
PRODUCT_CATALOG_CONFIG = {
'SHEET_ID': settings.PRODUCT_CATALOG_SHEET_ID,
'OUTPUT_TAB_ID': (
Expand All @@ -194,26 +245,45 @@ def handle(self, *args, **options):

try:
product_type = product_type.lower()
products = self.get_products(product_type, product_source)
if not products.exists():
raise CommandError('No products found for the given criteria.')
products_count = products.count()

if snowflake_flag:
products = self.get_products_via_snowflake(product_type, product_source)
if not products:
raise CommandError('No products found for the given criteria.')
products_count = len(products)

else:
products = self.get_products(product_type, product_source)
if not products.exists():
raise CommandError('No products found for the given criteria.')
products_count = products.count()

logger.info(f'Fetched {products_count} {product_type}s from the database')

if output_csv:
with open(output_csv, 'w', newline='') as output_file:
output_writer = self.write_csv_header(output_file)
for product in products:
try:
output_writer.writerow(self.get_transformed_data(product, product_type))
except Exception as e: # pylint: disable=broad-exception-caught
logger.error(f"Error writing product {product.uuid} to CSV: {str(e)}")
continue

if snowflake_flag:
for row in products:
transformed_data = self.get_transformed_data_from_snowflake(row)
output_writer.writerow(transformed_data)

else:
for product in products:
try:
output_writer.writerow(self.get_transformed_data(product, product_type))
except Exception as e: # pylint: disable=broad-exception-caught
logger.error(f"Error writing product {product.uuid} to CSV: {str(e)}")
continue

logger.info(f'Populated {products_count} {product_type}s to {output_csv}')

elif gspread_client_flag:
csv_data = [self.get_transformed_data(product, product_type) for product in products]
if snowflake_flag:
csv_data = [self.get_transformed_data_from_snowflake(row) for row in products]
else:
csv_data = [self.get_transformed_data(product, product_type) for product in products]
gspread_client.write_data(
PRODUCT_CATALOG_CONFIG,
self.CATALOG_CSV_HEADERS,
Expand Down

0 comments on commit 33485c9

Please sign in to comment.