cd: add a GitHub Action that generates an index for every query in the src/sql folder

The workflow checks whether the IDC version referenced by the queries is out of date. If so, the queries are updated and run against BigQuery to generate CSV and Parquet files in the repository root. CMakeLists.txt is edited to pick up the CSV and Parquet files generated by BigQuery and package them, and .gitignore gains a pattern for ignoring service account credential files. Lastly, if the queries were updated, a pull request is created to bring them in sync with the latest IDC release.
vkt1414 committed Mar 15, 2024
1 parent 01eea6f commit fd40ab3
Showing 7 changed files with 314 additions and 26 deletions.
200 changes: 200 additions & 0 deletions .github/get_latest_index.py
@@ -0,0 +1,200 @@
from __future__ import annotations

import logging
import os
import re
import uuid
from pathlib import Path

import pandas as pd
from google.cloud import bigquery

logging.basicConfig(level=logging.DEBUG)
logger = logging.getLogger(__name__)


class IDCIndexDataManager:
    def __init__(self, project_id: str):
        """
        Initializes the IDCIndexDataManager.
        Args:
            project_id (str): The Google Cloud Platform project ID.
        """
        self.project_id = project_id
        self.client = bigquery.Client(project=project_id)
        logger.debug("IDCIndexDataManager initialized with project ID: %s", project_id)

    def get_latest_idc_release_version(self) -> int:
        """
        Retrieves the latest IDC release version from BigQuery.
        Returns:
            int: The latest IDC release version.
        """
        query = """
            SELECT
                MAX(idc_version) AS latest_idc_release_version
            FROM
                `bigquery-public-data.idc_current.version_metadata`
        """
        query_job = self.client.query(query)
        result = query_job.result()
        latest_idc_release_version = int(next(result).latest_idc_release_version)
        logger.debug(
            "Retrieved latest IDC release version: %d", latest_idc_release_version
        )
        return latest_idc_release_version

    def extract_current_index_version(self, file_path: str) -> int | None:
        """
        Extracts the current index version from the specified file.
        Args:
            file_path (str): The path to the file containing the index version.
        Returns:
            int | None: The current index version, or None if it could not be
                extracted.
        """
        try:
            with Path(file_path).open("r") as file:
                for line in file:
                    if "bigquery-public-data" in line:
                        match = re.findall(r"bigquery-public-data\.(\w+)\.(\w+)", line)
                        if match:
                            dataset_name, _table_name = match[0]
                            current_index_version = int(
                                re.findall(r"idc_v(\d+)", dataset_name)[0]
                            )
                            logger.debug(
                                "Extracted current index version: %d",
                                current_index_version,
                            )
                            return current_index_version
        except FileNotFoundError:
            logger.debug("File %s not found.", file_path)
        except Exception as e:
            logger.debug("An error occurred while extracting index version: %s", str(e))
        return None

    def update_sql_queries_folder(
        self, dir_path: str, current_index_version: int, latest_idc_release_version: int
    ) -> None:
        """
        Updates SQL queries in the specified folder.
        Args:
            dir_path (str): The path to the folder containing SQL queries.
            current_index_version (int): The current index version.
            latest_idc_release_version (int): The latest IDC release version.
        """
        for file_name in os.listdir(dir_path):
            if file_name.endswith(".sql"):
                file_path = Path(dir_path) / file_name
                with file_path.open("r") as file:
                    sql_query = file.read()
                modified_sql_query = sql_query.replace(
                    f"idc_v{current_index_version}",
                    f"idc_v{latest_idc_release_version}",
                )
                with file_path.open("w") as file:
                    file.write(modified_sql_query)
                logger.debug("Updated SQL queries in file: %s", file_path)

    def execute_sql_query(self, file_path: str) -> tuple[pd.DataFrame, str, str]:
        """
        Executes the SQL query in the specified file.
        Args:
            file_path (str): The path to the file containing the SQL query.
        Returns:
            tuple[pd.DataFrame, str, str]: A tuple containing the DataFrame with
                the query results, the CSV file name, and the Parquet file name.
        """
        with Path(file_path).open("r") as file:
            sql_query = file.read()
        index_df = self.client.query(sql_query).to_dataframe()
        file_name = Path(file_path).stem
        csv_file_name = f"{file_name}.csv.zip"
        parquet_file_name = f"{file_name}.parquet"
        logger.debug("Executed SQL query from file: %s", file_path)
        return index_df, csv_file_name, parquet_file_name

    def create_csv_zip_from_df(
        self, index_df: pd.DataFrame, csv_file_name: str
    ) -> None:
        """
        Creates a compressed CSV file from a pandas DataFrame.
        Args:
            index_df (pd.DataFrame): The pandas DataFrame to be saved as a CSV.
            csv_file_name (str): The desired name for the resulting ZIP file
                (including the ".csv.zip" extension).
        """
        index_df.to_csv(csv_file_name, compression={"method": "zip"}, escapechar="\\")
        logger.debug("Created CSV zip file: %s", csv_file_name)

    def create_parquet_from_df(
        self, index_df: pd.DataFrame, parquet_file_name: str
    ) -> None:
        """
        Creates a Parquet file from a pandas DataFrame.
        Args:
            index_df (pd.DataFrame): The pandas DataFrame to be saved as a Parquet file.
            parquet_file_name (str): The desired name for the resulting Parquet file.
        """
        index_df.to_parquet(parquet_file_name)
        logger.debug("Created Parquet file: %s", parquet_file_name)

    def run_queries_folder(self, dir_path: str) -> None:
        """
        Executes SQL queries in the specified folder.
        Args:
            dir_path (str): The path to the folder containing SQL query files.
        """
        for file_name in os.listdir(dir_path):
            if file_name.endswith(".sql"):
                file_path = Path(dir_path) / file_name
                index_df, csv_file_name, parquet_file_name = self.execute_sql_query(
                    file_path
                )
                self.create_csv_zip_from_df(index_df, csv_file_name)
                self.create_parquet_from_df(index_df, parquet_file_name)
                logger.debug(
                    "Executed and processed SQL queries from folder: %s", dir_path
                )

    def set_multiline_output(self, name: str, value: str) -> None:
        """
        Sets a (possibly multiline) GitHub Actions output with a specified name and value.
        Args:
            name (str): The name of the output.
            value (str): The value of the output.
        """
        with Path(os.environ["GITHUB_OUTPUT"]).open("a") as fh:
            delimiter = uuid.uuid1()
            fh.write(f"{name}<<{delimiter}\n")
            fh.write(f"{value}\n")
            fh.write(f"{delimiter}\n")
        logger.debug("Set multiline output with name: %s and value: %s", name, value)

    def run(self) -> None:
        """
        Runs the IDCIndexDataManager process.
        """
        latest_idc_release_version = self.get_latest_idc_release_version()
        current_index_version = self.extract_current_index_version(
            "src/sql/idc_index.sql"
        )
        self.set_multiline_output("current_index_version", str(current_index_version))
        self.set_multiline_output(
            "latest_idc_release_version", str(latest_idc_release_version)
        )


if __name__ == "__main__":
    manager = IDCIndexDataManager(os.environ["GCP_PROJECT_ID"])
    manager.run()
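
For local testing outside of Actions, the manager can be exercised directly. The sketch below is hypothetical and not part of this commit: it assumes Google Cloud application-default credentials are configured and points GITHUB_OUTPUT at a scratch file, since set_multiline_output() appends GitHub's name<<delimiter heredoc blocks to whatever file that variable names.

# Hypothetical local dry run of IDCIndexDataManager (not part of this commit);
# assumes Google Cloud application-default credentials and a real GCP project ID.
import os
import sys
import tempfile

sys.path.append(".github")
from get_latest_index import IDCIndexDataManager

# set_multiline_output() appends "name<<uuid / value / uuid" blocks to the file
# named by GITHUB_OUTPUT, so point it at a scratch file when running locally.
os.environ.setdefault("GITHUB_OUTPUT", tempfile.mkstemp()[1])

manager = IDCIndexDataManager(os.environ["GCP_PROJECT_ID"])
print("latest release:", manager.get_latest_idc_release_version())
print("current index:", manager.extract_current_index_version("src/sql/idc_index.sql"))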
72 changes: 63 additions & 9 deletions .github/workflows/cd.yml
@@ -1,6 +1,5 @@
name: CD

-on:
+"on":
  workflow_dispatch:
  pull_request:
  push:
@@ -9,35 +8,90 @@ on:
  release:
    types:
      - published

concurrency:
-  group: ${{ github.workflow }}-${{ github.ref }}
+  group: "${{ github.workflow }}-${{ github.ref }}"
  cancel-in-progress: true

env:
  FORCE_COLOR: 3

jobs:
  dist:
    name: Distribution build
    runs-on: ubuntu-latest

    steps:
      - uses: actions/checkout@v4
        with:
          fetch-depth: 0

      - name: Set up Python
        uses: actions/setup-python@v5
        with:
          python-version: 3.11

      - name: Install dependencies
        run: |
          pip install requests==2.31.0 pandas==2.1.1 \
            google-cloud-bigquery==3.12.0 pyarrow==13.0.0 db-dtypes==1.1.1 \
            pygithub==2.1.1
        shell: bash

      - name: Authorize Google Cloud
        uses: google-github-actions/auth@v1
        with:
          credentials_json: "${{ secrets.SERVICE_ACCOUNT_KEY }}"
          create_credentials_file: true
          export_environment_variables: true

      - name: Run script to get the latest idc index
        id: initialize_idc_manager_class
        shell: python
        run: |
          import sys
          import os

          sys.path.append(".github")
          from get_latest_index import IDCIndexDataManager

          project_id = os.environ["GCP_PROJECT_ID"]
          manager = IDCIndexDataManager(project_id)

          current_index_version = manager.extract_current_index_version("src/sql/idc_index.sql")
          latest_idc_release_version = manager.get_latest_idc_release_version()

          if current_index_version < latest_idc_release_version:
              manager.update_sql_queries_folder(
                  "src/sql/", current_index_version, latest_idc_release_version
              )
              manager.run_queries_folder("src/sql/")

          manager.set_multiline_output("current_index_version", str(current_index_version))
          manager.set_multiline_output("latest_idc_release_version", str(latest_idc_release_version))
        env:
          GCP_PROJECT_ID: "${{ secrets.GCP_PROJECT_ID }}"

      - uses: hynek/build-and-inspect-python-package@v2

      - name: Create Pull Request
        uses: peter-evans/create-pull-request@v5
        with:
          title:
            "Update idc version to v${{
            steps.initialize_idc_manager_class.outputs.latest_idc_release_version
            }}"
          body:
            "Update idc version in the sql queries to v${{
            steps.initialize_idc_manager_class.outputs.latest_idc_release_version
            }}"
          base: main
          branch: update-sql-queries
          add-paths: |
            src/sql/*.sql
        env:
          GITHUB_TOKEN: "${{ secrets.GITHUB_TOKEN }}"

  publish:
-    needs: [dist]
+    needs:
+      - dist
    name: Publish to PyPI
    environment: pypi
    permissions:
      id-token: write
    runs-on: ubuntu-latest
    if: github.event_name == 'release' && github.event.action == 'published'

    steps:
      - uses: actions/download-artifact@v4
        with:
3 changes: 3 additions & 0 deletions .gitignore
@@ -156,3 +156,6 @@ Thumbs.db
# Common editor files
*~
*.swp

# GCP service account keys
gha-creds-**.json
22 changes: 6 additions & 16 deletions CMakeLists.txt
@@ -1,20 +1,10 @@
cmake_minimum_required(VERSION 3.15...3.26)
project(${SKBUILD_PROJECT_NAME} LANGUAGES NONE)

+# Use the csv.zip and parquet files generated from the root folder
+set(idc_index_csv_zip "${CMAKE_SOURCE_DIR}/idc_index.csv.zip")
+set(idc_index_parquet "${CMAKE_SOURCE_DIR}/idc_index.parquet")

-set(idc_index_release_version "0.3.2")
-set(idc_index_data_url "https://github.com/ImagingDataCommons/idc-index/releases/download/${idc_index_release_version}/idc_index.csv.zip")
-set(idc_index_data_sha256 "70ec9f915686a27bee3098163b8695c69c8696c05bfb7bd76943a24024cdeeb9")
-
-#
-# Download and install index
-#
-set(download_dir "${PROJECT_BINARY_DIR}")
-include(FetchContent)
-FetchContent_Populate(s5cmd
-  URL ${idc_index_data_url}
-  URL_HASH SHA256=${idc_index_data_sha256}
-  DOWNLOAD_DIR ${download_dir}
-  DOWNLOAD_NO_EXTRACT TRUE
-)
-install(FILES "${download_dir}/idc_index.csv.zip" DESTINATION "idc_index_data")
+# Install the csv.zip and parquet files
+install(FILES "${idc_index_csv_zip}" DESTINATION "idc_index_data")
+install(FILES "${idc_index_parquet}" DESTINATION "idc_index_data")
7 changes: 6 additions & 1 deletion src/idc_index_data/__init__.py
@@ -16,7 +16,11 @@

from ._version import version as __version__

__all__ = ["__version__", "IDC_INDEX_CSV_ARCHIVE_FILEPATH"]
__all__ = [
"__version__",
"IDC_INDEX_CSV_ARCHIVE_FILEPATH",
"IDC_INDEX_PARQUET_FILEPATH",
]


def _lookup(path: str) -> Path:
@@ -31,3 +35,4 @@ def _lookup(path: str) -> Path:


IDC_INDEX_CSV_ARCHIVE_FILEPATH: Path = _lookup("idc_index_data/idc_index.csv.zip")
IDC_INDEX_PARQUET_FILEPATH: Path = _lookup("idc_index_data/idc_index.parquet")
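
With both constants exported, downstream code can resolve the packaged files without knowing the install layout. A minimal consumer sketch, assuming idc-index-data and pandas are installed:

# Minimal consumer of the new Parquet constant; assumes the idc-index-data
# package and pandas are installed in the current environment.
import pandas as pd

from idc_index_data import IDC_INDEX_PARQUET_FILEPATH

index_df = pd.read_parquet(IDC_INDEX_PARQUET_FILEPATH)
print(f"{len(index_df)} series in the IDC index")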
34 changes: 34 additions & 0 deletions src/sql/idc_index.sql
@@ -0,0 +1,34 @@
SELECT
  # collection level attributes
  ANY_VALUE(collection_id) AS collection_id,
  ANY_VALUE(PatientID) AS PatientID,
  SeriesInstanceUID,
  ANY_VALUE(StudyInstanceUID) AS StudyInstanceUID,
  ANY_VALUE(source_DOI) AS source_DOI,
  # patient level attributes
  ANY_VALUE(PatientAge) AS PatientAge,
  ANY_VALUE(PatientSex) AS PatientSex,
  # study level attributes
  ANY_VALUE(StudyDate) AS StudyDate,
  ANY_VALUE(StudyDescription) AS StudyDescription,
  ANY_VALUE(dicom_curated.BodyPartExamined) AS BodyPartExamined,
  # series level attributes
  ANY_VALUE(Modality) AS Modality,
  ANY_VALUE(Manufacturer) AS Manufacturer,
  ANY_VALUE(ManufacturerModelName) AS ManufacturerModelName,
  ANY_VALUE(SAFE_CAST(SeriesDate AS STRING)) AS SeriesDate,
  ANY_VALUE(SeriesDescription) AS SeriesDescription,
  ANY_VALUE(SeriesNumber) AS SeriesNumber,
  COUNT(dicom_all.SOPInstanceUID) AS instanceCount,
  ANY_VALUE(license_short_name) AS license_short_name,
  # download related attributes
  ANY_VALUE(CONCAT("s3://", SPLIT(aws_url, "/")[SAFE_OFFSET(2)], "/", crdc_series_uuid, "/*")) AS series_aws_url,
  ROUND(SUM(SAFE_CAST(instance_size AS float64)) / 1000000, 2) AS series_size_MB,
FROM
  `bigquery-public-data.idc_v17.dicom_all` AS dicom_all
JOIN
  `bigquery-public-data.idc_v17.dicom_metadata_curated` AS dicom_curated
ON
  dicom_all.SOPInstanceUID = dicom_curated.SOPInstanceUID
GROUP BY
  SeriesInstanceUID
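
Because the query groups by SeriesInstanceUID and wraps every other selected column in ANY_VALUE, the generated index contains exactly one row per DICOM series. A quick sanity-check sketch, assuming the workflow has produced idc_index.parquet in the repository root:

# Sanity check of the generated index (illustrative, not part of this commit).
import pandas as pd

index_df = pd.read_parquet("idc_index.parquet")

# SeriesInstanceUID is the GROUP BY key in idc_index.sql, so it must be unique.
assert index_df["SeriesInstanceUID"].is_unique
print(index_df[["collection_id", "Modality", "series_size_MB"]].head())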
2 changes: 2 additions & 0 deletions tests/test_package.py
@@ -12,3 +12,5 @@ def test_version():
def test_filepath():
    assert m.IDC_INDEX_CSV_ARCHIVE_FILEPATH.is_file()
    assert m.IDC_INDEX_CSV_ARCHIVE_FILEPATH.name == "idc_index.csv.zip"
    assert m.IDC_INDEX_PARQUET_FILEPATH.is_file()
    assert m.IDC_INDEX_PARQUET_FILEPATH.name == "idc_index.parquet"
