From 1efc832efeeb6ef0c8c1eaf17715aea4e424f61f Mon Sep 17 00:00:00 2001
From: Trent Smith <1429913+Bento007@users.noreply.github.com>
Date: Fri, 12 Jan 2024 14:26:52 -0800
Subject: [PATCH 1/4] fix build-images-and-create-deployment.yml and push-rdev.yml (#6509)

---
 .github/workflows/build-images-and-create-deployment.yml | 1 -
 .github/workflows/push-rdev.yml                          | 1 -
 2 files changed, 2 deletions(-)

diff --git a/.github/workflows/build-images-and-create-deployment.yml b/.github/workflows/build-images-and-create-deployment.yml
index b3a3fd09dd91e..57186bc40203a 100644
--- a/.github/workflows/build-images-and-create-deployment.yml
+++ b/.github/workflows/build-images-and-create-deployment.yml
@@ -24,7 +24,6 @@ jobs:
   build_images:
     uses: ./.github/workflows/build-images.yml
     secrets: inherit
-    needs: get_previous_image_digests
 
   create_deployment:
     needs:
diff --git a/.github/workflows/push-rdev.yml b/.github/workflows/push-rdev.yml
index 1a296fa3ed04b..e3698f3929432 100644
--- a/.github/workflows/push-rdev.yml
+++ b/.github/workflows/push-rdev.yml
@@ -26,7 +26,6 @@ jobs:
   build_images:
     uses: ./.github/workflows/build-images.yml
     secrets: inherit
-    needs: get_previous_image_digests
 
   summarize:
     runs-on: ubuntu-22.04

From 0ea34e9b4521b737f81680418fe7d765f38dcc5a Mon Sep 17 00:00:00 2001
From: Trent Smith <1429913+Bento007@users.noreply.github.com>
Date: Fri, 12 Jan 2024 15:49:29 -0800
Subject: [PATCH 2/4] chore: matrix build processing image (#6502)

---
 .../workflows/rebuild-processing-image.yml | 68 ++-----------
 .github/workflows/schema-migration.yml     | 96 ++-----------------
 2 files changed, 12 insertions(+), 152 deletions(-)

diff --git a/.github/workflows/rebuild-processing-image.yml b/.github/workflows/rebuild-processing-image.yml
index 2c02439c04889..651782f3430d0 100644
--- a/.github/workflows/rebuild-processing-image.yml
+++ b/.github/workflows/rebuild-processing-image.yml
@@ -16,9 +16,13 @@ permissions:
 on:
   repository_dispatch:
     types: [rebuild-processing]
+  workflow_call:
 
 jobs:
-  rebuild-and-push-processing-image-dev:
+  rebuild-and-push-processing-image:
+    strategy:
+      matrix:
+        branch: [main, staging, prod]
     runs-on: ubuntu-22.04
     steps:
       - name: Configure AWS Credentials
         uses: aws-actions/configure-aws-credentials@v4
         with:
           aws-region: us-west-2
           role-to-assume: ${{ secrets.AWS_ROLE_TO_ASSUME }}
           role-duration-seconds: 1800
       - name: Login to ECR
         uses: docker/login-action@v3
         with:
           registry: ${{ secrets.ECR_REPO }}
-      - name: Checkout Dev (main)
+      - name: Checkout ${{ matrix.branch }}
         uses: actions/checkout@v2
         with:
-          ref: main
-          fetch-depth: 1
-      - name: Install happy
-        uses: chanzuckerberg/github-actions/.github/actions/install-happy@install-happy-v1.4.2
-        with:
-          happy_version: "0.110.1"
-      - name: Docker build, push, and tag
-        shell: bash
-        run: |
-          export BRANCH_SHA=$(git rev-parse --short=8 HEAD)
-          happy push "" --aws-profile "" --tag sha-${BRANCH_SHA} --slice processing
-
-  rebuild-and-push-processing-image-staging:
-    runs-on: ubuntu-22.04
-    steps:
-      - name: Configure AWS Credentials
-        uses: aws-actions/configure-aws-credentials@v4
-        with:
-          aws-region: us-west-2
-          role-to-assume: ${{ secrets.AWS_ROLE_TO_ASSUME }}
-          role-duration-seconds: 1800
-      - name: Login to ECR
-        uses: docker/login-action@v3
-        with:
-          registry: ${{ secrets.ECR_REPO }}
-      - name: Checkout Staging
-        uses: actions/checkout@v2
-        with:
-          ref: staging
-          fetch-depth: 1
-      - name: Install happy
-        uses: chanzuckerberg/github-actions/.github/actions/install-happy@install-happy-v1.4.2
-        with:
-          happy_version: "0.110.1"
-      - name: Docker build, push, and tag
-        shell: bash
-        run: |
-          export BRANCH_SHA=$(git rev-parse --short=8 HEAD)
-          happy push "" --aws-profile "" --tag sha-${BRANCH_SHA} --slice processing
-
-  rebuild-and-push-processing-image-prod:
-    runs-on: ubuntu-22.04
-    steps:
-      - name: Configure AWS Credentials
-        uses: aws-actions/configure-aws-credentials@v4
-        with:
-          aws-region: us-west-2
-          role-to-assume: ${{ secrets.AWS_ROLE_TO_ASSUME }}
-          role-duration-seconds: 1800
-      - name: Login to ECR
-        uses: docker/login-action@v3
-        with:
-          registry: ${{ secrets.ECR_REPO }}
-      - name: Checkout Prod
-        uses: actions/checkout@v2
-        with:
-          ref: prod
+          ref: ${{ matrix.branch }}
           fetch-depth: 1
       - name: Install happy
         uses: chanzuckerberg/github-actions/.github/actions/install-happy@install-happy-v1.4.2
         with:
           happy_version: "0.110.1"
@@ -108,4 +56,4 @@ jobs:
           fields: repo,eventName,workflow,job
         env:
           SLACK_WEBHOOK_URL: ${{ secrets.SLACK_WEBHOOK }}
-        if: always() # Pick up events even if the job fails or is canceled.
+        if: matrix.branch == 'prod' && always() # Pick up events even if the job fails or is canceled.
diff --git a/.github/workflows/schema-migration.yml b/.github/workflows/schema-migration.yml
index 9cd1edde1a960..fcb6eb3883c61 100644
--- a/.github/workflows/schema-migration.yml
+++ b/.github/workflows/schema-migration.yml
@@ -18,101 +18,13 @@ on:
   types: [schema-migration]
 
 jobs:
-  rebuild-and-push-processing-image-dev:
-    runs-on: ubuntu-22.04
-    steps:
-      - name: Configure AWS Credentials
-        uses: aws-actions/configure-aws-credentials@v4
-        with:
-          aws-region: us-west-2
-          role-to-assume: ${{ secrets.AWS_ROLE_TO_ASSUME }}
-          role-duration-seconds: 1800
-      - name: Login to ECR
-        uses: docker/login-action@v3
-        with:
-          registry: ${{ secrets.ECR_REPO }}
-      - name: Checkout Dev (main)
-        uses: actions/checkout@v2
-        with:
-          ref: main
-          fetch-depth: 1
-      - name: Install happy
-        uses: chanzuckerberg/github-actions/.github/actions/install-happy@install-happy-v1.4.2
-        with:
-          happy_version: "0.110.1"
-      - name: Docker build, push, and tag
-        shell: bash
-        run: |
-          export BRANCH_SHA=$(git rev-parse --short=8 HEAD)
-          happy push "" --aws-profile "" --tag sha-${BRANCH_SHA} --slice processing
-
-  rebuild-and-push-processing-image-staging:
-    runs-on: ubuntu-22.04
-    steps:
-      - name: Configure AWS Credentials
-        uses: aws-actions/configure-aws-credentials@v4
-        with:
-          aws-region: us-west-2
-          role-to-assume: ${{ secrets.AWS_ROLE_TO_ASSUME }}
-          role-duration-seconds: 1800
-      - name: Login to ECR
-        uses: docker/login-action@v3
-        with:
-          registry: ${{ secrets.ECR_REPO }}
-      - name: Checkout Staging
-        uses: actions/checkout@v2
-        with:
-          ref: staging
-          fetch-depth: 1
-      - name: Install happy
-        uses: chanzuckerberg/github-actions/.github/actions/install-happy@install-happy-v1.4.2
-        with:
-          happy_version: "0.110.1"
-      - name: Docker build, push, and tag
-        shell: bash
-        run: |
-          export BRANCH_SHA=$(git rev-parse --short=8 HEAD)
-          happy push "" --aws-profile "" --tag sha-${BRANCH_SHA} --slice processing
-
-  rebuild-and-push-processing-image-prod:
-    runs-on: ubuntu-22.04
-    steps:
-      - name: Configure AWS Credentials
-        uses: aws-actions/configure-aws-credentials@v4
-        with:
-          aws-region: us-west-2
-          role-to-assume: ${{ secrets.AWS_ROLE_TO_ASSUME }}
-          role-duration-seconds: 1800
-      - name: Login to ECR
-        uses: docker/login-action@v3
-        with:
-          registry: ${{ secrets.ECR_REPO }}
-      - name: Checkout Prod
-        uses: actions/checkout@v2
-        with:
-          ref: prod
-          fetch-depth: 1
-      - name: Install happy
-        uses: chanzuckerberg/github-actions/.github/actions/install-happy@install-happy-v1.4.2
-        with:
-          happy_version: "0.110.1"
-      - name: Docker build, push, and tag
-        shell: bash
-        run: |
-          export BRANCH_SHA=$(git rev-parse --short=8 HEAD)
-          happy push "" --aws-profile "" --tag sha-${BRANCH_SHA} --slice processing
-      - name: Alert in Slack
-        uses: 8398a7/action-slack@v3
-        with:
-          status: ${{ job.status }}
-          fields: repo,eventName,workflow,job
-        env:
-          SLACK_WEBHOOK_URL: ${{ secrets.SLACK_WEBHOOK }}
-        if: always() # Pick up events even if the job fails or is canceled.
+  rebuild-processing-images:
+    uses: ./.github/workflows/rebuild-processing-image.yml
+    secrets: inherit
 
   trigger-schema-migration:
     runs-on: ubuntu-22.04
-    needs: rebuild-and-push-processing-image-prod
+    needs: rebuild-processing-images
     steps:
       - name: Configure AWS Credentials
         uses: aws-actions/configure-aws-credentials@v4

From 13406018fb1c789ef8350b2ee7a490c864d76129 Mon Sep 17 00:00:00 2001
From: Trent Smith <1429913+Bento007@users.noreply.github.com>
Date: Fri, 12 Jan 2024 17:23:08 -0800
Subject: [PATCH 3/4] fix: fix upgrade during deployment (#6511)

---
 .github/workflows/build-images.yml | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git a/.github/workflows/build-images.yml b/.github/workflows/build-images.yml
index 874c1eedeb83b..ea1d586265c48 100644
--- a/.github/workflows/build-images.yml
+++ b/.github/workflows/build-images.yml
@@ -55,7 +55,7 @@ jobs:
       run: |
         echo "HAPPY_COMMIT=$(git rev-parse --verify HEAD)" >> envfile
         echo "HAPPY_BRANCH=$(git branch --show-current)" >> envfile
-        export IMAGE_TAG=sha-${GITHUB_SHA:0:7}
+        export IMAGE_TAG=sha-${GITHUB_SHA:0:8}
         export BRANCH_TAG=branch-$(echo ${GITHUB_REF_NAME} | sed 's/[\+\/]/-/g')
         echo "IMAGE_TAG=${IMAGE_TAG}" >> $GITHUB_OUTPUT
         happy push devstack --env dev --slice ${{ matrix.image }} \

From 3b2f46314b10eb397efd53ea1876a279bf00a2d8 Mon Sep 17 00:00:00 2001
From: prathap sridharan
Date: Tue, 16 Jan 2024 10:09:14 -0800
Subject: [PATCH 4/4] feat: Cache WMG snapshot on web server's local disk (#6447)

Co-authored-by: atarashansky
---
 .happy/terraform/modules/ecs-stack/main.tf    |  10 +
 .../terraform/modules/ecs-stack/variables.tf  |   6 +
 .happy/terraform/modules/service/main.tf      |   3 +
 .happy/terraform/modules/service/variables.tf |   6 +
 Dockerfile.backend                            |   2 +
 backend/wmg/api/v2.py                         |  18 +-
 backend/wmg/api/wmg_api_config.py             |   9 +
 backend/wmg/data/snapshot.py                  | 327 ++++++++++++++----
 container_init.sh                             |  24 ++
 tests/unit/backend/wmg/data/test_snapshot.py  |  37 ++
 .../unit/wmg_processing/test_cube_pipeline.py |  19 +-
 11 files changed, 378 insertions(+), 83 deletions(-)
 create mode 100644 tests/unit/backend/wmg/data/test_snapshot.py

diff --git a/.happy/terraform/modules/ecs-stack/main.tf b/.happy/terraform/modules/ecs-stack/main.tf
index 2c7448398f05c..0e4cd6d3d9da4 100644
--- a/.happy/terraform/modules/ecs-stack/main.tf
+++ b/.happy/terraform/modules/ecs-stack/main.tf
@@ -112,6 +112,9 @@ module frontend_service {
   task_role_arn     = local.ecs_role_arn
   service_port      = 9000
   memory            = var.frontend_memory
+
+  # 30 GB of disk storage allocated for the task running the frontend container
+  task_storage_size_gb = 30
   cpu               = 2048
   deployment_stage  = local.deployment_stage
   step_function_arn = module.upload_sfn.step_function_arn
@@ -141,6 +144,9 @@ module backend_service {
   security_groups  = local.security_groups
   task_role_arn    = local.ecs_role_arn
   service_port     = 5000
+
+  # 100 GB of disk storage allocated for the task running the backend container
+  task_storage_size_gb = 100
   memory           = var.backend_memory
   cpu              = var.backend_cpus * 1024
   cmd              = local.backend_cmd
@@ -154,6 +160,10 @@ module backend_service {
   dataset_submissions_bucket = local.dataset_submissions_bucket
   datasets_bucket            = local.datasets_bucket
   execution_role             = local.ecs_execution_role
+
+  # Bump health_check_interval from 15 seconds to 30 seconds so that the WMG snapshot download,
+  # which at the time of this writing is around 27 GB, has time to complete.
+  health_check_interval = 30
   dd_key_secret_arn          = var.dd_key_secret_arn
   wait_for_steady_state      = local.wait_for_steady_state
diff --git a/.happy/terraform/modules/ecs-stack/variables.tf b/.happy/terraform/modules/ecs-stack/variables.tf
index 86c96ed432eb6..8dcf45f1bfeaa 100644
--- a/.happy/terraform/modules/ecs-stack/variables.tf
+++ b/.happy/terraform/modules/ecs-stack/variables.tf
@@ -35,6 +35,12 @@ variable happy_config_secret {
   description = "Happy Path configuration secret name"
 }
 
+variable "task_storage_size_gb" {
+  type        = number
+  description = "ephemeral disk storage in GB available for the task"
+  default     = 30
+}
+
 variable deployment_stage {
   type        = string
   description = "Deployment stage for the app"
diff --git a/.happy/terraform/modules/service/main.tf b/.happy/terraform/modules/service/main.tf
index 0978514ee2877..16be0132df44e 100644
--- a/.happy/terraform/modules/service/main.tf
+++ b/.happy/terraform/modules/service/main.tf
@@ -39,6 +39,9 @@ resource aws_ecs_task_definition task_definition {
   task_role_arn            = var.task_role_arn
   execution_role_arn       = var.execution_role
   requires_compatibilities = ["FARGATE"]
+  ephemeral_storage {
+    size_in_gib = var.task_storage_size_gb
+  }
   container_definitions = < WmgSnapshot:
     """
     Loads and caches the snapshot identified by the snapshot schema version and a snapshot id.
@@ -100,103 +100,195 @@ def load_snapshot(
     The snapshot representation is cached in memory. Therefore, multiple calls to this
     function will simply return the cached snapshot if there isn't a newer snapshot id.
+
+    Args:
+        snapshot_schema_version (str): The version of the snapshot schema.
+        explicit_snapshot_id_to_load (str, optional): The explicit snapshot id to load. Defaults to None.
+        snapshot_fs_root_path (str, optional): The root path of the snapshot on the local filesystem. Defaults to None.
+
+    Returns:
+        WmgSnapshot: The loaded snapshot.
+
     """
     global cached_snapshot
 
+    if not (
+        snapshot_fs_root_path
+        and _local_disk_snapshot_is_valid(
+            snapshot_fs_root_path=snapshot_fs_root_path,
+            snapshot_schema_version=snapshot_schema_version,
+            explicit_snapshot_id_to_load=explicit_snapshot_id_to_load,
+        )
+    ):
+        snapshot_fs_root_path = None
+
     should_reload, snapshot_id = _should_reload_snapshot(
         snapshot_schema_version=snapshot_schema_version,
         explicit_snapshot_id_to_load=explicit_snapshot_id_to_load,
-        read_versioned_snapshot=read_versioned_snapshot,
+        snapshot_fs_root_path=snapshot_fs_root_path,
     )
 
     if should_reload:
         cached_snapshot = _load_snapshot(
             snapshot_schema_version=snapshot_schema_version,
             snapshot_id=snapshot_id,
-            read_versioned_snapshot=read_versioned_snapshot,
+            snapshot_fs_root_path=snapshot_fs_root_path,
         )
 
     return cached_snapshot
 
 
 ###################################### PRIVATE INTERFACE #################################
-def _get_latest_snapshot_id(*, snapshot_schema_version: str, read_versioned_snapshot: bool) -> str:
+def _get_latest_snapshot_id(snapshot_schema_version: str, snapshot_fs_root_path: Optional[str] = None) -> str:
     """
     Get latest snapshot id for a given snapshot schema version
+
+    Args:
+        snapshot_schema_version (str): The version of the snapshot schema.
+        snapshot_fs_root_path (str, optional): The root path of the snapshot on the local filesystem. Defaults to None.
+
+    Returns:
+        str: The latest snapshot id for the given snapshot schema version.
     """
-    data_schema_dir_path = _get_wmg_snapshot_schema_dir_path(
-        snapshot_schema_version=snapshot_schema_version,
-        read_versioned_snapshot=read_versioned_snapshot,
-    )
+    data_schema_dir_path = _get_wmg_snapshot_schema_dir_rel_path(snapshot_schema_version)
 
     file_name = "latest_snapshot_identifier"
-    key_path = f"{data_schema_dir_path}/{file_name}" if data_schema_dir_path else file_name
+    rel_path = f"{data_schema_dir_path}/{file_name}"
 
-    latest_snapshot_id = _read_value_at_s3_key(key_path)
+    latest_snapshot_id = _read_wmg_data_file(rel_path, snapshot_fs_root_path)
     return latest_snapshot_id
 
 
-def _get_wmg_snapshot_schema_dir_path(*, snapshot_schema_version: str, read_versioned_snapshot: bool) -> str:
+def _get_wmg_snapshot_schema_dir_rel_path(snapshot_schema_version: str) -> str:
     """
-    Get path to a particular snapshot schema version.
+    Get relative path to a particular snapshot schema version.
+
+    That is, the relative path is the path that does not include the root path where the data
+    resides.
+
+    Examples:
+
+    1. An S3 fullpath to a snapshot schema version may be s3://env-rdev-wmg/pr-6447/snapshots/v3.
+    Here, "s3://env-rdev-wmg" is the S3 bucket. The S3 bucket is considered the "root" of the
+    fullpath. Therefore, "pr-6447/snapshots/v3" would be the relative path returned by this function.
+
+    2. A filesystem fullpath to a snapshot schema version may be:
+    /single-cell-data-portal/wmg_snapshot_cache/snapshots/v3.
+    Here, "/single-cell-data-portal/wmg_snapshot_cache" is considered the "root" of the
+    fullpath. Therefore "snapshots/v3" would be the relative path returned by
+    this function.
+
+    Args:
+        snapshot_schema_version (str): The version of the snapshot schema.
+
+    Returns:
+        str: The relative path to the snapshot schema version.
+
     """
-    data_schema_dir_path = f"snapshots/{snapshot_schema_version}"
+    data_schema_dir_rel_path = f"snapshots/{snapshot_schema_version}"
 
     if WMG_ROOT_DIR_PATH:
-        data_schema_dir_path = f"{WMG_ROOT_DIR_PATH}/{data_schema_dir_path}"
+        data_schema_dir_rel_path = f"{WMG_ROOT_DIR_PATH}/{data_schema_dir_rel_path}"
 
-    if read_versioned_snapshot:
-        return data_schema_dir_path
-    return WMG_ROOT_DIR_PATH
+    return data_schema_dir_rel_path
 
 
-def _get_wmg_snapshot_dir_path(*, snapshot_schema_version: str, snapshot_id: str, read_versioned_snapshot: bool) -> str:
+def _get_wmg_snapshot_rel_path(snapshot_schema_version: str, snapshot_id: str) -> str:
     """
-    Get path to the snapshot id directory for a the given snapshot schema version.
+    Get relative path to the snapshot id directory for the given snapshot schema version.
+
+    That is, the relative path is the path that does not include the root path where the data
+    resides.
+
+    Examples:
+
+    1. An S3 fullpath to a particular snapshot_id may be: s3://env-rdev-wmg/pr-6447/snapshots/v3/1704754452.
+    Here, "s3://env-rdev-wmg" is the S3 bucket. The S3 bucket is considered the "root" of the
+    fullpath. Therefore, "pr-6447/snapshots/v3/1704754452" would be the relative path
+    returned by this function.
+
+    2. A filesystem fullpath to a particular snapshot_id may be:
+    /single-cell-data-portal/wmg_snapshot_cache/snapshots/v3/1704754452.
+    Here, "/single-cell-data-portal/wmg_snapshot_cache" is considered the "root" of the
+    fullpath. Therefore "snapshots/v3/1704754452" would be the relative path returned by
+    this function.
+
+    Args:
+        snapshot_schema_version (str): The version of the snapshot schema.
+        snapshot_id (str): The unique identifier of the snapshot.
+
+    Returns:
+        str: The relative path to the snapshot id directory for the given snapshot schema version.
+
    """
-    data_schema_dir_path = _get_wmg_snapshot_schema_dir_path(
-        snapshot_schema_version=snapshot_schema_version,
-        read_versioned_snapshot=read_versioned_snapshot,
-    )
+    data_schema_dir_rel_path = _get_wmg_snapshot_schema_dir_rel_path(snapshot_schema_version)
 
-    snapshot_id_dir_path = f"{data_schema_dir_path}/{snapshot_id}" if data_schema_dir_path else snapshot_id
-    return snapshot_id_dir_path
+    snapshot_id_dir_rel_path = f"{data_schema_dir_rel_path}/{snapshot_id}"
+    return snapshot_id_dir_rel_path
 
 
-def _get_wmg_snapshot_dir_s3_uri(snapshot_dir_path: str) -> str:
+def _get_wmg_snapshot_fullpath(snapshot_rel_path: str, snapshot_fs_root_path: Optional[str] = None) -> str:
     """
-    Get the s3 uri of the snapshot directory
+    Return the full path of the snapshot on local disk, or the S3 URI of the snapshot.
+
+    Examples:
+
+    1. For a snapshot on S3, this may be: s3://env-rdev-wmg/pr-6447/snapshots/v3/1704754452.
+
+    2. For a snapshot on local disk, this may be:
+    /single-cell-data-portal/wmg_snapshot_cache/snapshots/v3/1704754452
+
+    Args:
+        snapshot_rel_path (str): The relative path of the snapshot.
+        snapshot_fs_root_path (Optional[str]): The root path of the snapshot in the filesystem. Defaults to None.
+
+    Returns:
+        str: The full path of the snapshot on local disk, or the S3 URI of the snapshot.
     """
+
+    if snapshot_fs_root_path:
+        return os.path.join(snapshot_fs_root_path, snapshot_rel_path)
+
     wmg_config = WmgConfig()
+    return os.path.join("s3://", wmg_config.bucket, snapshot_rel_path)
 
-    return os.path.join("s3://", wmg_config.bucket, snapshot_dir_path)
 
+def _load_snapshot(
+    *, snapshot_schema_version: str, snapshot_id: str, snapshot_fs_root_path: Optional[str] = None
+) -> WmgSnapshot:
+    """
+    Load a snapshot given its schema version, id, and root path in the filesystem.
 
-def _load_snapshot(*, snapshot_schema_version: str, snapshot_id: str, read_versioned_snapshot: bool) -> WmgSnapshot:
-    snapshot_dir_path = _get_wmg_snapshot_dir_path(
-        snapshot_schema_version=snapshot_schema_version,
-        snapshot_id=snapshot_id,
-        read_versioned_snapshot=read_versioned_snapshot,
-    )
+    Args:
+        snapshot_schema_version (str): The version of the snapshot schema.
+        snapshot_id (str): The unique identifier of the snapshot.
+        snapshot_fs_root_path (Optional[str]): The root path of the snapshot in the filesystem. Defaults to None.
 
-    cell_type_orderings = _load_cell_type_order(snapshot_dir_path)
-    primary_filter_dimensions = _load_primary_filter_data(snapshot_dir_path)
-    filter_relationships = _load_filter_graph_data(snapshot_dir_path)
-    cell_type_ancestors = _load_cell_type_ancestors(snapshot_dir_path)
-    dataset_metadata = _load_dataset_metadata(snapshot_dir_path)
+    Returns:
+        WmgSnapshot: The loaded snapshot.
+    """
 
-    snapshot_base_uri = _get_wmg_snapshot_dir_s3_uri(snapshot_dir_path)
-    logger.info(f"Loading WMG snapshot from directory path {snapshot_dir_path} into URI {snapshot_base_uri}")
+    snapshot_rel_path = _get_wmg_snapshot_rel_path(snapshot_schema_version, snapshot_id)
+
+    cell_type_orderings = _load_cell_type_order(snapshot_rel_path, snapshot_fs_root_path)
+    primary_filter_dimensions = _load_primary_filter_data(snapshot_rel_path, snapshot_fs_root_path)
+    filter_relationships = _load_filter_graph_data(snapshot_rel_path, snapshot_fs_root_path)
+    cell_type_ancestors = _load_cell_type_ancestors(snapshot_rel_path, snapshot_fs_root_path)
+    dataset_metadata = _load_dataset_metadata(snapshot_rel_path, snapshot_fs_root_path)
+
+    snapshot_uri = _get_wmg_snapshot_fullpath(snapshot_rel_path, snapshot_fs_root_path)
+    logger.info(f"Loading WMG snapshot from absolute path: {snapshot_uri}")
 
     # TODO: Okay to keep TileDB arrays open indefinitely? Is it faster than re-opening each request?
     #  https://app.zenhub.com/workspaces/single-cell-5e2a191dad828d52cc78b028/issues/chanzuckerberg/single-cell
     #  -data-portal/2134
     return WmgSnapshot(
         snapshot_identifier=snapshot_id,
-        expression_summary_cube=_open_cube(f"{snapshot_base_uri}/{EXPRESSION_SUMMARY_CUBE_NAME}"),
-        expression_summary_default_cube=_open_cube(f"{snapshot_base_uri}/{EXPRESSION_SUMMARY_DEFAULT_CUBE_NAME}"),
-        marker_genes_cube=_open_cube(f"{snapshot_base_uri}/{MARKER_GENES_CUBE_NAME}"),
-        cell_counts_cube=_open_cube(f"{snapshot_base_uri}/{CELL_COUNTS_CUBE_NAME}"),
+        expression_summary_cube=_open_cube(f"{snapshot_uri}/{EXPRESSION_SUMMARY_CUBE_NAME}"),
+        expression_summary_default_cube=_open_cube(f"{snapshot_uri}/{EXPRESSION_SUMMARY_DEFAULT_CUBE_NAME}"),
+        marker_genes_cube=_open_cube(f"{snapshot_uri}/{MARKER_GENES_CUBE_NAME}"),
+        cell_counts_cube=_open_cube(f"{snapshot_uri}/{CELL_COUNTS_CUBE_NAME}"),
         cell_type_orderings=cell_type_orderings.set_index(["tissue_ontology_term_id", "cell_type_ontology_term_id"])[
             "order"
         ].to_dict(),
@@ -207,44 +299,110 @@ def _load_snapshot(*, snapshot_schema_version: str, snapshot_id: str, read_versi
     )
 
 
+def _local_disk_snapshot_is_valid(
+    *,
+    snapshot_fs_root_path: str,
+    snapshot_schema_version: str,
+    explicit_snapshot_id_to_load: Optional[str] = None,
+) -> bool:
+    """
+    Checks that the path on local disk contains a valid WMG snapshot.
+
+    Args:
+        snapshot_fs_root_path (str): The root path of the snapshot on the local filesystem.
+        snapshot_schema_version (str): The version of the snapshot schema.
+        explicit_snapshot_id_to_load (Optional[str]): The explicit snapshot id to load. Defaults to None.
+
+    Returns:
+        bool: True if the path on local disk contains a valid WMG snapshot, False otherwise.
+    """
+    if not os.path.exists(snapshot_fs_root_path):
+        logger.warning(f"{snapshot_fs_root_path} does not exist. Falling back to S3 to load WMG snapshot...")
+        return False
+    else:
+        if explicit_snapshot_id_to_load:
+            snapshot_id = explicit_snapshot_id_to_load
+        else:
+            snapshot_id = _get_latest_snapshot_id(snapshot_schema_version, snapshot_fs_root_path)
+
+        snapshot_rel_path = _get_wmg_snapshot_rel_path(snapshot_schema_version, snapshot_id)
+        snapshot_full_path = _get_wmg_snapshot_fullpath(snapshot_rel_path, snapshot_fs_root_path)
+
+        if not os.path.exists(snapshot_full_path):
+            logger.warning(f"{snapshot_full_path} does not exist. Falling back to S3 to load WMG snapshot...")
+            return False
+
+    return True
+
+
 def _open_cube(cube_uri) -> Array:
     return tiledb.open(cube_uri, ctx=create_ctx(json.loads(WmgConfig().tiledb_config_overrides)))
 
 
-def _load_cell_type_order(snapshot_dir_path: str) -> DataFrame:
-    key_path = f"{snapshot_dir_path}/{CELL_TYPE_ORDERINGS_FILENAME}"
-    return pd.read_json(_read_value_at_s3_key(key_path))
+def _load_cell_type_order(snapshot_rel_path: str, snapshot_fs_root_path: Optional[str] = None) -> DataFrame:
+    rel_path = f"{snapshot_rel_path}/{CELL_TYPE_ORDERINGS_FILENAME}"
+    return pd.read_json(_read_wmg_data_file(rel_path, snapshot_fs_root_path))
 
 
-def _load_primary_filter_data(snapshot_dir_path: str) -> Dict:
-    key_path = f"{snapshot_dir_path}/{PRIMARY_FILTER_DIMENSIONS_FILENAME}"
-    return json.loads(_read_value_at_s3_key(key_path))
+def _load_primary_filter_data(snapshot_rel_path: str, snapshot_fs_root_path: Optional[str] = None) -> Dict:
+    rel_path = f"{snapshot_rel_path}/{PRIMARY_FILTER_DIMENSIONS_FILENAME}"
+    return json.loads(_read_wmg_data_file(rel_path, snapshot_fs_root_path))
 
 
-def _load_dataset_metadata(snapshot_dir_path: str) -> Dict:
-    key_path = f"{snapshot_dir_path}/{DATASET_METADATA_FILENAME}"
-    return json.loads(_read_value_at_s3_key(key_path))
+def _load_dataset_metadata(snapshot_rel_path: str, snapshot_fs_root_path: Optional[str] = None) -> Dict:
+    rel_path = f"{snapshot_rel_path}/{DATASET_METADATA_FILENAME}"
+    return json.loads(_read_wmg_data_file(rel_path, snapshot_fs_root_path))
 
 
-def _load_cell_type_ancestors(snapshot_dir_path: str) -> Dict:
-    key_path = f"{snapshot_dir_path}/{CELL_TYPE_ANCESTORS_FILENAME}"
-    return json.loads(_read_value_at_s3_key(key_path))
+def _load_cell_type_ancestors(snapshot_rel_path: str, snapshot_fs_root_path: Optional[str] = None) -> Dict:
+    rel_path = f"{snapshot_rel_path}/{CELL_TYPE_ANCESTORS_FILENAME}"
+    return json.loads(_read_wmg_data_file(rel_path, snapshot_fs_root_path))
 
 
-def _load_filter_graph_data(snapshot_dir_path: str) -> str:
+def _load_filter_graph_data(snapshot_rel_path: str, snapshot_fs_root_path: Optional[str] = None) -> str:
     try:
-        key_path = f"{snapshot_dir_path}/{FILTER_RELATIONSHIPS_FILENAME}"
-        return json.loads(_read_value_at_s3_key(key_path))
+        rel_path = f"{snapshot_rel_path}/{FILTER_RELATIONSHIPS_FILENAME}"
+        return json.loads(_read_wmg_data_file(rel_path, snapshot_fs_root_path))
     except Exception:
         logger.warning(
-            f"{_get_wmg_snapshot_dir_s3_uri(snapshot_dir_path)}/{FILTER_RELATIONSHIPS_FILENAME} could not be loaded"
+            f"{_get_wmg_snapshot_fullpath(snapshot_rel_path)}/{FILTER_RELATIONSHIPS_FILENAME} could not be loaded"
         )
         return None
 
 
-def _read_value_at_s3_key(key_path: str):
+def _read_wmg_data_file(rel_path: str, snapshot_fs_root_path: Optional[str] = None) -> str:
+    """
+    Read the file from local disk if snapshot_fs_root_path is provided. Otherwise, read from S3.
+
+    When reading from S3, the 'rel_path' argument is the S3 key of the object to read.
+    When reading from the local filesystem, 'rel_path' is suffixed to the 'snapshot_fs_root_path'
+    to derive the full path of the file to read.
+
+    Args:
+        rel_path (str): The relative path of the file to read.
+        snapshot_fs_root_path (Optional[str]): The root path of the snapshot in the filesystem. Defaults to None.
+
+    Returns:
+        str: The content of the file as a string.
+    """
+    if snapshot_fs_root_path:
+        full_path = os.path.join(snapshot_fs_root_path, rel_path)
+        with open(full_path, encoding="utf-8") as f:
+            data = f.read().strip()
+        return data
+
+    return _read_value_at_s3_key(key_path=rel_path)
+
+
+def _read_value_at_s3_key(key_path: str) -> str:
     """
     Read value at an s3 key
+
+    Args:
+        key_path (str): The S3 key path to read the value from.
+
+    Returns:
+        str: The value read from the specified S3 key path.
     """
 
     s3 = buckets.portal_resource
@@ -259,20 +417,55 @@ def _should_reload_snapshot(
     *,
     snapshot_schema_version: str,
     explicit_snapshot_id_to_load: Optional[str] = None,
-    read_versioned_snapshot: bool,
+    snapshot_fs_root_path: Optional[str] = None,
 ) -> tuple[bool, str]:
     """
-    Returns a pair: (, ) where  is a boolean indicating
-    whether then in-memory snapshot should be reloaded and  is the id of the snapshot that
-    the in-memory data structure represents.
+    Determine whether the in-memory snapshot should be reloaded and provide the id of the snapshot.
+
+    Args:
+        snapshot_schema_version (str): The version of the snapshot schema.
+        explicit_snapshot_id_to_load (Optional[str]): The explicit snapshot id to load. Defaults to None.
+        snapshot_fs_root_path (Optional[str]): The root path of the snapshot in the filesystem. Defaults to None.
+
+    Returns:
+        tuple[bool, str]: A pair of values. The first is a boolean indicating whether the in-memory snapshot should be reloaded. The second is the id of the snapshot that the in-memory data structure represents.
     """
+
     snapshot_id = explicit_snapshot_id_to_load or _get_latest_snapshot_id(
-        snapshot_schema_version=snapshot_schema_version, read_versioned_snapshot=read_versioned_snapshot
+        snapshot_schema_version, snapshot_fs_root_path
     )
 
     if cached_snapshot is None:
         logger.info(f"Loading snapshot id: {snapshot_id}")
         return (True, snapshot_id)
 
+    ######################### AN IMPORTANT NOTE #################################
+    # 1. As of this writing on 01/12/2024, when the app is configured to read
+    # the WMG snapshot from the local filesystem, the latest snapshot id will always
+    # equal the snapshot id of the `cached_snapshot` object.
+    #
+    # This is because the scheme of downloading all the snapshots to the local filesystem
+    # is done ONLY ONCE ON APP CONTAINER INITIALIZATION. That is, the code in the below
+    # `elif` block will only execute if the app is configured to read from S3. If the app
+    # is changed such that the local filesystem cache is updated by another thread, then
+    # the filesystem cache will behave like S3 in that updates to the 'latest_snapshot_identifier'
+    # file will eventually be reflected on the local disk while the app is still running. But
+    # as of this writing, the WMG snapshot on the local filesystem remains static throughout the
+    # lifetime of a running application server process.
+    #
+    # 2. In the case of the application being configured to read from S3, note well that the
+    # below `elif` condition will become True if the value in the 'latest_snapshot_identifier' file
+    # is updated to be different from 'cached_snapshot.snapshot_identifier'. That is, it is possible
+    # the 'latest_snapshot_identifier' file could be modified to contain a snapshot_id that is older
+    # than what is in 'cached_snapshot.snapshot_identifier'.
+    #
+    # This might be useful if we want to remove a corrupt wmg snapshot by simply
+    # updating the latest_snapshot_identifier file to be an older snapshot_id without
+    # requiring updating the explicit_snapshot_id_to_load in code and deploying.
+    # Such an approach to rollback (updating the latest_snapshot_identifier on S3/filesystem)
+    # should only be used in EXTREME emergencies. The normal rollback of updating the code's
+    # config file and redeploying should be used in all other scenarios, as this form of
+    # rollback provides an audit log via git commit history.
+
     elif snapshot_id != cached_snapshot.snapshot_identifier:
         logger.info(
             f"Reloading snapshot. Detected snapshot id update from cached "
diff --git a/container_init.sh b/container_init.sh
index 9a20f5fbcd2d9..dcaa73312fc07 100755
--- a/container_init.sh
+++ b/container_init.sh
@@ -5,6 +5,30 @@ echo "| starting backend container"
 echo " ====="
 echo
 
+# Download the WMG data snapshot to a mounted filesystem of the compute node on AWS.
+# This is done as an optimization because retrieving data from local disk is
+# significantly faster than retrieving data from S3.
+WMG_SNAPSHOT_FS_CACHE_ROOT_PATH="/single-cell-data-portal/wmg_snapshot_cache"
+
+if [[ "${DEPLOYMENT_STAGE}" == "rdev" && -n "${REMOTE_DEV_PREFIX}" ]]; then
+  echo "| Downloading WMG data snapshot for RDEV stack: ${REMOTE_DEV_PREFIX} from S3 to filesystem path: ${WMG_SNAPSHOT_FS_CACHE_ROOT_PATH}"
+
+  strip_slash_remote_dev_prefix="${REMOTE_DEV_PREFIX//\//}" # strips ALL "/"
+
+  echo aws s3 sync "s3://env-rdev-wmg/${strip_slash_remote_dev_prefix}/snapshots" "${WMG_SNAPSHOT_FS_CACHE_ROOT_PATH}/${strip_slash_remote_dev_prefix}/snapshots"
+
+  aws s3 sync "s3://env-rdev-wmg/${strip_slash_remote_dev_prefix}/snapshots" "${WMG_SNAPSHOT_FS_CACHE_ROOT_PATH}/${strip_slash_remote_dev_prefix}/snapshots"
+elif [[ "${DEPLOYMENT_STAGE}" == "dev" || "${DEPLOYMENT_STAGE}" == "staging" || "${DEPLOYMENT_STAGE}" == "prod" ]]; then
+  echo "| Downloading WMG data snapshot for deployment env: ${DEPLOYMENT_STAGE} from S3 to filesystem path: ${WMG_SNAPSHOT_FS_CACHE_ROOT_PATH}"
+  echo aws s3 sync "s3://cellxgene-wmg-${DEPLOYMENT_STAGE}/snapshots" "${WMG_SNAPSHOT_FS_CACHE_ROOT_PATH}/snapshots"
+
+  aws s3 sync "s3://cellxgene-wmg-${DEPLOYMENT_STAGE}/snapshots" "${WMG_SNAPSHOT_FS_CACHE_ROOT_PATH}/snapshots"
+else
+  echo "| Skipping downloading WMG data snapshot for deployment env: ${DEPLOYMENT_STAGE}..."
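+  # NOTE: nothing else is required in this branch. When the local cache directory is
+  # absent, load_snapshot() in backend/wmg/data/snapshot.py detects the missing path
+  # (via _local_disk_snapshot_is_valid) and falls back to reading the snapshot from S3.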
+fi + +echo "| Finished downloading WMG data snapshot from S3 to filesystem path: ${WMG_SNAPSHOT_FS_CACHE_ROOT_PATH} for valid deployment environments" + # If user passed a command line, run it in place of the server if [ $# -ne 0 ]; then exec "$@" diff --git a/tests/unit/backend/wmg/data/test_snapshot.py b/tests/unit/backend/wmg/data/test_snapshot.py new file mode 100644 index 0000000000000..d7d0de599ff29 --- /dev/null +++ b/tests/unit/backend/wmg/data/test_snapshot.py @@ -0,0 +1,37 @@ +import os +from unittest.mock import patch + +import pytest + +from backend.wmg.data.snapshot import ( + _get_wmg_snapshot_fullpath, + _get_wmg_snapshot_rel_path, + _get_wmg_snapshot_schema_dir_rel_path, +) + + +def test_get_wmg_snapshot_schema_dir_rel_path(): + snapshot_schema_version = "1.0.0" + expected_path = f"snapshots/{snapshot_schema_version}" + assert _get_wmg_snapshot_schema_dir_rel_path(snapshot_schema_version) == expected_path + + +def test_get_wmg_snapshot_rel_path(): + snapshot_schema_version = "1.0.0" + snapshot_id = "test_id" + expected_path = f"snapshots/{snapshot_schema_version}/{snapshot_id}" + assert _get_wmg_snapshot_rel_path(snapshot_schema_version, snapshot_id) == expected_path + + +@pytest.mark.parametrize("snapshot_fs_root_path", [None, "/tmp"]) +@patch("backend.wmg.data.snapshot.WmgConfig", autospec=True) +def test_get_wmg_snapshot_fullpath(mock_wmg_config, snapshot_fs_root_path): + mock_wmg_config.return_value.bucket = "test-bucket" + snapshot_schema_version = "1.0.0" + snapshot_id = "test_id" + snapshot_rel_path = f"snapshots/{snapshot_schema_version}/{snapshot_id}" + full_path = _get_wmg_snapshot_fullpath(snapshot_rel_path, snapshot_fs_root_path) + if snapshot_fs_root_path: + assert full_path == os.path.join(snapshot_fs_root_path, snapshot_rel_path) + else: + assert full_path == os.path.join("s3://test-bucket", snapshot_rel_path) diff --git a/tests/unit/wmg_processing/test_cube_pipeline.py b/tests/unit/wmg_processing/test_cube_pipeline.py index 5af241e19d948..21af7534a82a1 100644 --- a/tests/unit/wmg_processing/test_cube_pipeline.py +++ b/tests/unit/wmg_processing/test_cube_pipeline.py @@ -6,7 +6,7 @@ from unittest import mock from unittest.mock import Mock, patch -from backend.wmg.data.snapshot import _get_wmg_snapshot_schema_dir_path +from backend.wmg.data.snapshot import _get_wmg_snapshot_schema_dir_rel_path from backend.wmg.pipeline import logger, main from backend.wmg.pipeline.constants import MAXIMUM_ADMISSIBLE_CENSUS_SCHEMA_MAJOR_VERSION from backend.wmg.pipeline.expression_summary_and_cell_counts import create_expression_summary_and_cell_counts_cubes @@ -55,8 +55,7 @@ def test_pipeline_creates_files(self): def test_versioned_s3_paths(self): """ - Tests that the path we use for writing the snapshot is the versioned path, and verifies that the - path we read from for API usage respects the read_versioned_snapshot param. + Tests that the path we use for writing the snapshot is the versioned path. NOTE: Ideally, we would want this to be a true test of the cube pipeline that actually runs the pipeline, mocks the S3 upload, and then verifies that the API reads are pulling from the correct mocked S3 files. 
@@ -71,20 +70,10 @@ def test_versioned_s3_paths(self): dest_path = _get_wmg_snapshot_s3_fullpath("v1", "snapshot-id", True) self.assertEqual(dest_path, f"s3://{wmg_bucket_name}/snapshots/v1/snapshot-id") - # Verify that we're reading from the versioned path if we pass in read_versioned_snapshot=True - verioned_read_path = _get_wmg_snapshot_schema_dir_path( - snapshot_schema_version="v1", - read_versioned_snapshot=True, - ) + # Verify that we're reading from the versioned path + verioned_read_path = _get_wmg_snapshot_schema_dir_rel_path(snapshot_schema_version="v1") self.assertEqual(verioned_read_path, "snapshots/v1") - # Verify that we're reading from the non-versioned path if we pass in read_versioned_snapshot=False - root_read_path = _get_wmg_snapshot_schema_dir_path( - snapshot_schema_version="v1", - read_versioned_snapshot=False, - ) - self.assertEqual(root_read_path, "") - def test__pipeline_fails_if_census_schema_version_unsupported(self): # test that the pipeline fails if the census schema version is unsupported with patch(
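
Reviewer note, appended after the series (not part of any patch above): below is a minimal, runnable Python sketch of the local-disk-first, S3-fallback path resolution that PATCH 4/4 adds to backend/wmg/data/snapshot.py. The snapshot id, bucket name, and cache root are illustrative values taken from the docstrings and container_init.sh; the real code obtains the bucket from WmgConfig and the cache root from the container init script.

    import os
    from typing import Optional


    def snapshot_rel_path(schema_version: str, snapshot_id: str) -> str:
        # Mirrors _get_wmg_snapshot_rel_path: the path without the S3 bucket
        # or filesystem cache root.
        return f"snapshots/{schema_version}/{snapshot_id}"


    def snapshot_fullpath(rel_path: str, fs_root: Optional[str], bucket: str) -> str:
        # Mirrors _get_wmg_snapshot_fullpath: prefer the local cache when a
        # root path is given, otherwise build an S3 URI.
        if fs_root:
            return os.path.join(fs_root, rel_path)
        return os.path.join("s3://", bucket, rel_path)


    if __name__ == "__main__":
        rel = snapshot_rel_path("v3", "1704754452")
        # Cache was populated by container_init.sh: resolve to local disk.
        print(snapshot_fullpath(rel, "/single-cell-data-portal/wmg_snapshot_cache", "cellxgene-wmg-prod"))
        # No local cache (download skipped or directory missing): fall back to S3.
        print(snapshot_fullpath(rel, None, "cellxgene-wmg-prod"))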