Merging staging branch into prod branch
nayib-jose-gloria committed Sep 4, 2024
2 parents 6baf9a7 + b4eb214 commit a640771
Showing 19 changed files with 1,086 additions and 56 deletions.
62 changes: 62 additions & 0 deletions .happy/terraform/modules/batch/main.tf
@@ -123,6 +123,68 @@ resource aws_batch_job_definition dataset_metadata_update {
})
}

resource aws_batch_job_definition rollback {
type = "container"
name = "dp-${var.deployment_stage}-${var.custom_stack_name}-rollback"
container_properties = jsonencode({
"command": ["python3", "-m", "backend.layers.processing.rollback"],
"jobRoleArn": "${var.batch_role_arn}",
"image": "${var.image}",
"memory": 8000,
"environment": [
{
"name": "ARTIFACT_BUCKET",
"value": "${var.artifact_bucket}"
},
{
"name": "CELLXGENE_BUCKET",
"value": "${var.cellxgene_bucket}"
},
{
"name": "DATASETS_BUCKET",
"value": "${var.datasets_bucket}"
},
{
"name": "DEPLOYMENT_STAGE",
"value": "${var.deployment_stage}"
},
{
"name": "AWS_DEFAULT_REGION",
"value": "${data.aws_region.current.name}"
},
{
"name": "REMOTE_DEV_PREFIX",
"value": "${var.remote_dev_prefix}"
}
],
"vcpus": 1,
"linuxParameters": {
"maxSwap": 0,
"swappiness": 0
},
"retryStrategy": {
"attempts": 3,
"evaluateOnExit": [
{
"action": "RETRY",
"onReason": "Task failed to start"
},
{
"action": "EXIT",
"onReason": "*"
}
]
},
"logConfiguration": {
"logDriver": "awslogs",
"options": {
"awslogs-group": "${aws_cloudwatch_log_group.cloud_watch_logs_group.id}",
"awslogs-region": "${data.aws_region.current.name}"
}
}
})
}

resource aws_cloudwatch_log_group cloud_watch_logs_group {
retention_in_days = 365
name = "/dp/${var.deployment_stage}/${var.custom_stack_name}/upload"
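For reference, a sketch of how this rollback job definition might be submitted with boto3; the job name, queue, and COLLECTION_IDS override are illustrative assumptions, not part of this commit:

import boto3

batch = boto3.client("batch")

# The definition name follows the "dp-<deployment_stage>-<custom_stack_name>-rollback"
# pattern from the Terraform above; the queue and environment are assumed for illustration.
response = batch.submit_job(
    jobName="collection-rollback",
    jobQueue="dp-prod-job-queue",
    jobDefinition="dp-prod-stack-rollback",
    containerOverrides={
        "environment": [{"name": "COLLECTION_IDS", "value": "<canonical-collection-id>"}],
    },
)
print(response["jobId"])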
51 changes: 50 additions & 1 deletion backend/layers/business/business.py
@@ -40,6 +40,7 @@
DatasetVersionNotFoundException,
InvalidURIException,
MaxFileSizeExceededException,
NoPreviousCollectionVersionException,
NoPreviousDatasetVersionException,
)
from backend.layers.common import validation
@@ -273,6 +274,19 @@ def get_unpublished_collection_version_from_canonical(
unpublished_collection = collection
return unpublished_collection

def get_unpublished_collection_versions_from_canonical(
self, collection_id: CollectionId
) -> List[CollectionVersionWithDatasets]:
"""
Given a canonical collection_id, retrieves its latest unpublished versions (max of 2 with a migration_revision
and non-migration revision)
"""
unpublished_collections = []
for collection in self.get_collection_versions_from_canonical(collection_id):
if collection.published_at is None:
unpublished_collections.append(collection)
return unpublished_collections

def get_collection_url(self, collection_id: str) -> str:
return f"{CorporaConfig().collections_base_url}/collections/{collection_id}"

@@ -437,7 +451,7 @@ def update_collection_version(
):
for dataset in current_collection_version.datasets:
if dataset.status.processing_status != DatasetProcessingStatus.SUCCESS:
- self.logger.info(
+ logger.info(
f"Dataset {dataset.version_id.id} is not successfully processed. Skipping metadata update."
)
continue
@@ -1323,3 +1337,38 @@ def restore_previous_dataset_version(
self.database_provider.replace_dataset_in_collection_version(
collection_version_id, current_version.version_id, previous_version_id
)

def restore_previous_collection_version(self, collection_id: CollectionId) -> CollectionVersion:
"""
Restore the previously published collection version for a collection, if any exist.
Returns CollectionVersion that was replaced, and is no longer linked to canonical collection.
:param collection_id: The collection id to restore the previous version of.
"""
version_to_replace = self.get_collection_version_from_canonical(collection_id)
all_published_versions = list(self.get_all_published_collection_versions_from_canonical(collection_id))
if len(all_published_versions) < 2:
raise NoPreviousCollectionVersionException(f"No previous collection version for collection {collection_id}")

# get most recent previously published version
previous_version = None
previous_published_at = datetime.fromtimestamp(0)

for version in all_published_versions:
if version.version_id == version_to_replace.version_id:
continue
if version.published_at > previous_published_at:
previous_version = version
previous_published_at = version.published_at

logger.info(
{
"message": "Restoring previous collection version",
"collection_id": collection_id.id,
"replace_version_id": version_to_replace.version_id.id,
"restored_version_id": previous_version.version_id.id,
}
)
self.database_provider.replace_collection_version(collection_id, previous_version.version_id)
return version_to_replace
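
The rollback Batch job added in this commit runs backend.layers.processing.rollback, which is not part of this diff. A minimal sketch of how such an entrypoint might drive restore_previous_collection_version; the COLLECTION_IDS contract and the wiring are assumptions:

import os

from backend.layers.business.business import BusinessLogic
from backend.layers.common.entities import CollectionId


def rollback_collections(business_logic: BusinessLogic) -> None:
    # Assumed contract: a comma-separated list of canonical collection ids to roll back.
    for raw_id in filter(None, os.environ.get("COLLECTION_IDS", "").split(",")):
        replaced = business_logic.restore_previous_collection_version(CollectionId(raw_id))
        # `replaced` is the version that was unlinked; its assets could be cleaned up here.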
6 changes: 6 additions & 0 deletions backend/layers/business/exceptions.py
@@ -43,6 +43,12 @@ class NoPreviousDatasetVersionException(BusinessException):
"""


class NoPreviousCollectionVersionException(BusinessException):
"""
Raised when a previous collection version is expected but does not exist
"""


class InvalidLinkException(BusinessException):
def __init__(self, errors: Optional[List[str]] = None) -> None:
self.errors: Optional[List[str]] = errors
13 changes: 13 additions & 0 deletions backend/layers/persistence/persistence.py
@@ -1106,3 +1106,16 @@ def get_previous_dataset_version_id(self, dataset_id: DatasetId) -> Optional[DatasetVersionId]:
if version_id is None:
return None
return DatasetVersionId(str(version_id.id))

def replace_collection_version(
self, collection_id: CollectionId, new_collection_version_id: CollectionVersionId
) -> None:
"""
Replaces the version_id of a canonical collection, and deletes the replaced CollectionVersionId.
"""
with self._manage_session() as session:
collection = session.query(CollectionTable).filter_by(id=collection_id.id).one()
replaced_version_id = str(collection.version_id)
collection.version_id = new_collection_version_id.id
replaced_collection_version = session.query(CollectionVersionTable).filter_by(id=replaced_version_id).one()
session.delete(replaced_collection_version)
7 changes: 7 additions & 0 deletions backend/layers/persistence/persistence_interface.py
@@ -281,6 +281,13 @@ def replace_dataset_in_collection_version(
Replaces an existing mapping between a collection version and a dataset version
"""

def replace_collection_version(
self, collection_id: CollectionId, new_collection_version_id: CollectionVersionId
) -> None:
"""
Replaces the existing canonical Collection mapping with a new CollectionVersionId
"""

def get_dataset_mapped_version(self, dataset_id: DatasetId, get_tombstoned: bool) -> Optional[DatasetVersion]:
"""
Returns the dataset version mapped to a canonical dataset_id, or None if none exists
9 changes: 8 additions & 1 deletion backend/layers/persistence/persistence_mock.py
@@ -155,7 +155,7 @@ def get_canonical_collection(self, collection_id: CollectionId) -> Optional[CanonicalCollection]:
def get_all_mapped_collection_versions(
self, get_tombstoned: bool = False
) -> Iterable[CollectionVersion]: # TODO: add filters if needed
- for version_id, collection_version in self.collections_versions.items():
+ for version_id, collection_version in list(self.collections_versions.items()):
if version_id in [c.version_id.id for c in self.collections.values()]:
collection_id = collection_version.collection_id.id
if not get_tombstoned and self.collections[collection_id].tombstoned:
@@ -588,6 +588,13 @@ def replace_dataset_in_collection_version(
collection_version.datasets[idx] = new_dataset_version_id
return copy.deepcopy(new_dataset_version)

def replace_collection_version(
self, collection_id: CollectionId, new_collection_version_id: CollectionVersionId
) -> None:
old_version_id = self.collections[collection_id.id].version_id
self.collections[collection_id.id].version_id = new_collection_version_id
del self.collections_versions[old_version_id.id]

def set_collection_version_datasets_order(
self,
collection_version_id: CollectionVersionId,
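A quick check of replace_collection_version's semantics against this in-memory mock (a hypothetical test, not part of this commit):

def test_replace_collection_version(mock_db, collection_id, new_version_id):
    old_version_id = mock_db.collections[collection_id.id].version_id
    mock_db.replace_collection_version(collection_id, new_version_id)
    # the canonical collection now points at the new version...
    assert mock_db.collections[collection_id.id].version_id == new_version_id
    # ...and the replaced version is gone from the version table entirely
    assert old_version_id.id not in mock_db.collections_versions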
9 changes: 9 additions & 0 deletions backend/layers/processing/process_seurat.py
@@ -13,6 +13,7 @@
)
from backend.layers.processing.logger import logit
from backend.layers.processing.process_logic import ProcessingLogic
from backend.layers.processing.utils.matrix_utils import enforce_canonical_format
from backend.layers.processing.utils.rds_citation_from_h5ad import rds_citation_from_h5ad
from backend.layers.thirdparty.s3_provider import S3ProviderInterface
from backend.layers.thirdparty.uri_provider import UriProviderInterface
@@ -74,6 +75,14 @@ def process(self, dataset_version_id: DatasetVersionId, artifact_bucket: str, da
adata = anndata.read_h5ad(labeled_h5ad_filename)
if "citation" in adata.uns:
adata.uns["citation"] = rds_citation_from_h5ad(adata.uns["citation"])

# enforce canonical format before writing the labeled h5ad
logger.info("enforce canonical format in X")
enforce_canonical_format(adata)
if adata.raw:
logger.info("enforce canonical format in raw.X")
enforce_canonical_format(adata.raw)

adata.write_h5ad(labeled_h5ad_filename)

# Use Seurat to convert to RDS
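
The enforce_canonical_format helper imported above lives in backend.layers.processing.utils.matrix_utils and is not shown in this diff. A plausible sketch of what it does, assuming X is a SciPy CSR/CSC matrix:

from scipy import sparse


def enforce_canonical_format(adata_or_raw) -> None:
    # Sketch only: put X into canonical sparse form (sorted indices, no duplicate entries).
    x = adata_or_raw.X
    if isinstance(x, (sparse.csr_matrix, sparse.csc_matrix)) and not x.has_canonical_format:
        x.sum_duplicates()  # merges duplicates, sorts indices, and marks the matrix canonical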