Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Added ora file decompression fastq pair component #624

Open
wants to merge 4 commits into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions .gitattributes
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
/.yarn/releases/** binary
/.yarn/plugins/** binary
*.json.gz filter=lfs diff=lfs merge=lfs -text
*.tar.gz filter=lfs diff=lfs merge=lfs -text
9 changes: 9 additions & 0 deletions config/config.ts
Original file line number Diff line number Diff line change
Expand Up @@ -58,6 +58,11 @@ import {
getOncoanalyserPipelineManagerStackProps,
getOncoanalyserPipelineTableStackProps,
} from './stacks/oncoanalyser';
import {
getOraCompressionIcav2PipelineManagerStackProps,
getOraCompressionIcav2PipelineTableStackProps,
} from './stacks/oraCompressionPipelineManager';
import { getOraDecompressionManagerStackProps } from './stacks/oraDecompressionPipelineManager';

interface EnvironmentConfig {
name: string;
Expand Down Expand Up @@ -92,6 +97,7 @@ export const getEnvironmentConfig = (stage: AppStage): EnvironmentConfig | null
wtsIcav2PipelineTableStackProps: getWtsIcav2PipelineTableStackProps(),
umccriseIcav2PipelineTableStackProps: getUmccriseIcav2PipelineTableStackProps(),
rnasumIcav2PipelineTableStackProps: getRnasumIcav2PipelineTableStackProps(),
oraCompressionIcav2PipelineTableStackProps: getOraCompressionIcav2PipelineTableStackProps(),
BclConvertTableStackProps: getBclConvertManagerTableStackProps(stage),
stackyStatefulTablesStackProps: getStatefulGlueStackProps(),
pierianDxPipelineTableStackProps: getPierianDxPipelineTableStackProps(),
Expand All @@ -115,6 +121,9 @@ export const getEnvironmentConfig = (stage: AppStage): EnvironmentConfig | null
pieriandxPipelineManagerStackProps: getPierianDxPipelineManagerStackProps(stage),
oncoanalyserPipelineManagerStackProps: getOncoanalyserPipelineManagerStackProps(stage),
sashPipelineManagerStackProps: getSashPipelineManagerStackProps(stage),
oraCompressionIcav2PipelineManagerStackProps:
getOraCompressionIcav2PipelineManagerStackProps(stage),
oraDecompressionManagerStackProps: getOraDecompressionManagerStackProps(stage),
eventSchemaStackProps: getEventSchemaStackProps(),
dataSchemaStackProps: getDataSchemaStackProps(),
bclConvertManagerStackProps: getBclConvertManagerStackProps(stage),
Expand Down
50 changes: 50 additions & 0 deletions config/constants.ts
Original file line number Diff line number Diff line change
Expand Up @@ -808,3 +808,53 @@ export const stackyAnalysisLogsUriSsmParameterName = '/orcabus/stacky/analysis_l
// stg: s3://pipeline-stg-cache-503977275616-ap-southeast-2/byob-icav2/staging/cache/__workflow_name__/__portal_run_id__/
// prod: s3://pipeline-prod-cache-503977275616-ap-southeast-2/byob-icav2/production/cache/__workflow_name__/__portal_run_id__/
export const stackyAnalysisCacheUriSsmParameterName = '/orcabus/stacky/analysis_cache_uri';

/*
Resources generated by the ORA Compression pipeline
*/

/** Root of the SSM parameter names derived below for the ORA compression pipeline. */
export const oraCompressionSSMRoot = '/orcabus/ora_compression/';

/** Name of the DynamoDB table used by the ORA compression ICAv2 pipeline manager. */
export const oraCompressionIcav2PipelineManagerDynamodbTableName =
  'oraCompressionICAv2AnalysesDynamoDBTable';

// Stateful
// SSM parameter names always use '/' as the separator. `path.posix.join` keeps
// that true on every host OS, whereas the platform-dependent `path.join` emits
// backslashes when run on Windows.
export const oraCompressionDynamoDbTableSSMName = path.posix.join(
  oraCompressionSSMRoot,
  'dynamodb_table_name'
);
export const oraCompressionDynamoDbTableSSMArn = path.posix.join(
  oraCompressionSSMRoot,
  'dynamodb_table_arn'
);

// Stateless
export const oraCompressionIcav2PipelineWorkflowType = 'ora-compression';
export const oraCompressionIcav2PipelineWorkflowTypeVersion = '4.2.4--v2';
export const oraCompressionIcav2ServiceVersion = '2024.07.01';

// Event sources / detail type and the state machine name prefix
export const oraCompressionIcav2ReadyEventSource = 'orcabus.workflowmanager';
export const oraCompressionIcav2EventSource = 'orcabus.oracompression';
export const oraCompressionIcav2EventDetailType = 'WorkflowRunStateChange';
export const oraCompressionStateMachinePrefix = 'oraCompressionSfn';

/*
Resources used by the ora compression pipeline
*/

/**
 * SSM parameter path holding the ICAv2 pipeline id for ORA compression.
 *
 * Release: https://github.com/umccr/cwl-ica/releases/tag/dragen-instrument-run-fastq-to-ora-pipeline%2F4.2.4__20241030041958
 * Pipeline ID: ba8f618a-842f-4a2f-9b2f-a074c0472218
 * FIXME not in stg/prod
 */
export const oraCompressionIcav2PipelineIdSSMParameterPath = '/icav2/umccr-prod/ora_compression_pipeline_id';

/**
 * SSM parameter path holding the default reference uri used when compressing ORA files.
 *
 * FIXME not in stg/prod
 */
export const oraCompressionDefaultReferenceUriSSmParameterPath = '/icav2/umccr-prod/ora_compression_default_reference_version_uri';

/*
Resources generated by the ora decompression manager
*/

/** Event source configured as the trigger for the decompression manager. */
export const oraDecompressionIcav2ReadyEventSource = 'orcabus.workflowmanager';

/** Event source configured for events emitted by the decompression manager. */
export const oraDecompressionIcav2EventSource = 'orcabus.oradecompression';

/** Event detail type configured for the decompression manager. */
export const oraDecompressionIcav2EventDetailType = 'FastqListRowDecompressed';

/** Prefix for the decompression state machine names. */
export const oraDecompressionStateMachinePrefix = 'oraDecompressionSfn';
57 changes: 57 additions & 0 deletions config/stacks/oraCompressionPipelineManager.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,57 @@
import {
AppStage,
eventBusName,
icaEventPipeStackName,
icav2AccessTokenSecretName,
oraCompressionIcav2PipelineIdSSMParameterPath,
oraCompressionIcav2PipelineManagerDynamodbTableName,
oraCompressionIcav2PipelineWorkflowType,
oraCompressionIcav2PipelineWorkflowTypeVersion,
oraCompressionIcav2ServiceVersion,
oraCompressionIcav2ReadyEventSource,
oraCompressionIcav2EventSource,
oraCompressionIcav2EventDetailType,
oraCompressionDynamoDbTableSSMArn,
oraCompressionDynamoDbTableSSMName,
oraCompressionDefaultReferenceUriSSmParameterPath,
oraCompressionStateMachinePrefix,
} from '../constants';
import { OraCompressionIcav2PipelineTableConfig } from '../../lib/workload/stateful/stacks/ora-decompression-dynamodb/deploy/stack';
import { OraCompressionIcav2PipelineManagerConfig } from '../../lib/workload/stateless/stacks/ora-compression-manager/deploy';

// Stateful
/**
 * Build the configuration for the stateful ORA compression DynamoDB table stack.
 *
 * NOTE(review): the config keys (and the module the config type is imported from)
 * are named "Decompression" while the values are the ORA *compression* SSM
 * constants — confirm this naming is intentional.
 *
 * @returns The table name plus the SSM parameter paths for the table name and ARN.
 */
export const getOraCompressionIcav2PipelineTableStackProps =
  (): OraCompressionIcav2PipelineTableConfig => ({
    dynamodbTableName: oraCompressionIcav2PipelineManagerDynamodbTableName,
    oraDecompressionIcav2DynamodbTableArnSsmParameterPath: oraCompressionDynamoDbTableSSMArn,
    oraDecompressionIcav2DynamodbTableNameSsmParameterPath: oraCompressionDynamoDbTableSSMName,
  });

// Stateless
/**
 * Build the configuration for the stateless ORA compression ICAv2 pipeline manager stack.
 *
 * @param stage Deployment stage; selects the ICAv2 access token secret name.
 * @returns The manager configuration assembled from the shared constants.
 */
export const getOraCompressionIcav2PipelineManagerStackProps = (
  stage: AppStage
): OraCompressionIcav2PipelineManagerConfig => ({
  // ICAv2 access token secret for this stage,
  // e.g. "/icav2/umccr-prod/service-production-jwt-token-secret-arn"
  icav2TokenSecretId: icav2AccessTokenSecretName[stage],

  // Table that stores the analysis metadata
  dynamodbTableName: oraCompressionIcav2PipelineManagerDynamodbTableName,

  // Internal and external buses
  eventBusName,
  icaEventPipeName: `${icaEventPipeStackName}Pipe`,

  // Event handling
  workflowName: oraCompressionIcav2PipelineWorkflowType,
  workflowVersion: oraCompressionIcav2PipelineWorkflowTypeVersion,
  serviceVersion: oraCompressionIcav2ServiceVersion,
  triggerLaunchSource: oraCompressionIcav2ReadyEventSource,
  internalEventSource: oraCompressionIcav2EventSource,
  detailType: oraCompressionIcav2EventDetailType,

  // Prefix for the state machine names
  stateMachinePrefix: oraCompressionStateMachinePrefix,

  // SSM parameters holding the workflow inputs (default reference uri and pipeline id)
  referenceUriSsmPath: oraCompressionDefaultReferenceUriSSmParameterPath,
  pipelineIdSsmPath: oraCompressionIcav2PipelineIdSSMParameterPath,
});
28 changes: 28 additions & 0 deletions config/stacks/oraDecompressionPipelineManager.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,28 @@
import {
AppStage,
eventBusName,
icav2AccessTokenSecretName,
oraDecompressionIcav2ReadyEventSource,
oraDecompressionIcav2EventSource,
oraDecompressionIcav2EventDetailType,
oraDecompressionStateMachinePrefix,
} from '../constants';
import { OraDecompressionPipelineManagerConfig } from '../../lib/workload/stateless/stacks/ora-decompression-manager/deploy';

// Stateless
/**
 * Build the configuration for the stateless ORA decompression manager stack.
 *
 * @param stage Deployment stage; selects the ICAv2 access token secret name.
 * @returns The manager configuration assembled from the shared constants.
 */
export const getOraDecompressionManagerStackProps = (
  stage: AppStage
): OraDecompressionPipelineManagerConfig => ({
  // ICAv2 access token secret for this stage,
  // e.g. "/icav2/umccr-prod/service-production-jwt-token-secret-arn"
  icav2TokenSecretId: icav2AccessTokenSecretName[stage],

  // Event bus, plus the trigger and output event sources
  eventBusName,
  triggerEventSource: oraDecompressionIcav2ReadyEventSource,
  outputEventSource: oraDecompressionIcav2EventSource,

  // Event handling
  detailType: oraDecompressionIcav2EventDetailType,

  // Prefix for the state machine names
  stateMachinePrefix: oraDecompressionStateMachinePrefix,
});
4 changes: 2 additions & 2 deletions lib/workload/components/icav2-copy-files-batch/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -54,7 +54,7 @@ export class ICAv2CopyBatchUtilityConstruct extends Construct {
});

// Add execution permissions to stateMachine role
manifestInverterLambda.currentVersion.grantInvoke(this.icav2CopyFilesBatchSfnObj.role);
manifestInverterLambda.currentVersion.grantInvoke(this.icav2CopyFilesBatchSfnObj);

// Because we run a nested state machine, we need to add the permissions to the state machine role
// See https://stackoverflow.com/questions/60612853/nested-step-function-in-a-step-function-unknown-error-not-authorized-to-cr
Expand All @@ -68,6 +68,6 @@ export class ICAv2CopyBatchUtilityConstruct extends Construct {
);

// Add state machine execution permissions to stateMachineBatch role
this.icav2CopyFilesSfnObj.grantStartExecution(this.icav2CopyFilesBatchSfnObj.role);
this.icav2CopyFilesSfnObj.grantStartExecution(this.icav2CopyFilesBatchSfnObj);
}
}
7 changes: 2 additions & 5 deletions lib/workload/components/icav2-copy-files/index.ts
Original file line number Diff line number Diff line change
@@ -1,7 +1,6 @@
import { Duration } from 'aws-cdk-lib';
import { Construct } from 'constructs';
import * as lambda from 'aws-cdk-lib/aws-lambda';
import * as iam from 'aws-cdk-lib/aws-iam';
import * as sfn from 'aws-cdk-lib/aws-stepfunctions';
import * as secretsManager from 'aws-cdk-lib/aws-secretsmanager';
import { PythonFunction } from '@aws-cdk/aws-lambda-python-alpha';
Expand Down Expand Up @@ -35,9 +34,7 @@ export class ICAv2CopyFilesConstruct extends Construct {
});

// Allow launch job lambda to read the secret
props.icav2JwtSecretParameterObj.grantRead(
<iam.Role>check_or_launch_job_lambda.currentVersion.role
);
props.icav2JwtSecretParameterObj.grantRead(check_or_launch_job_lambda.currentVersion);

// Specify the single statemachine and replace the arn placeholders with the lambda arns defined above
this.icav2CopyFilesSfnObj = new sfn.StateMachine(this, 'copy_single_state_machine', {
Expand All @@ -54,6 +51,6 @@ export class ICAv2CopyFilesConstruct extends Construct {
});

// Add execution permissions to stateMachine role
check_or_launch_job_lambda.currentVersion.grantInvoke(this.icav2CopyFilesSfnObj.role);
check_or_launch_job_lambda.currentVersion.grantInvoke(this.icav2CopyFilesSfnObj);
}
}
114 changes: 114 additions & 0 deletions lib/workload/components/ora-file-decompression-fq-pair-sfn/index.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,114 @@
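// CDK construct that decompresses an ORA-compressed fastq pair: an ECS Fargate task driven by a Step Functions state machine.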

import { Construct } from 'constructs';
import * as ecs from 'aws-cdk-lib/aws-ecs';
import * as ec2 from 'aws-cdk-lib/aws-ec2';
import * as sfn from 'aws-cdk-lib/aws-stepfunctions';
import * as secretsManager from 'aws-cdk-lib/aws-secretsmanager';
import * as ecrAssets from 'aws-cdk-lib/aws-ecr-assets';
import path from 'path';
import * as lambda from 'aws-cdk-lib/aws-lambda';
import { RetentionDays } from 'aws-cdk-lib/aws-logs';
import * as iam from 'aws-cdk-lib/aws-iam';
import * as cdk from 'aws-cdk-lib';

/** Properties accepted by {@link OraDecompressionConstruct}. */
export interface OraDecompressionConstructProps {
  /** Secrets Manager secret name of the ICAv2 access token; the ECS task role is granted read access to it. */
  icav2AccessTokenSecretId: string;
  /** Prefix applied to the state machine name and to the ECS container name. */
  sfnPrefix: string;
}

/**
 * Provisions everything needed to decompress an ORA fastq pair:
 * a Fargate cluster and task definition (with its container image built from
 * `tasks/ora_decompression`), a security group, and a state machine whose
 * definition is loaded from the ASL template with the ARNs / ids substituted in.
 */
export class OraDecompressionConstruct extends Construct {
  /** The state machine that launches the decompression task. */
  public readonly sfnObject: sfn.StateMachine;

  /**
   * @param scope Parent construct.
   * @param id Construct id.
   * @param props State machine prefix and ICAv2 access token secret name.
   */
  constructor(scope: Construct, id: string, props: OraDecompressionConstructProps) {
    super(scope, id);

    /* Networking + compute: the task runs on Fargate inside the existing 'main-vpc' */
    const mainVpc = ec2.Vpc.fromLookup(this, 'MainVpc', { vpcName: 'main-vpc' });

    const fargateCluster = new ecs.Cluster(this, 'FargateCluster', {
      vpc: mainVpc,
      enableFargateCapacityProviders: true,
      containerInsights: true,
    });

    // 8192 CPU units maps to 8 CPUs. With 8 CPUs the memory must lie between
    // 16384 (16 GB) and 61440 (60 GB) in increments of 4096 (4 GB).
    const oradTaskDefinition = new ecs.FargateTaskDefinition(this, 'FargateTaskDefinition', {
      runtimePlatform: { cpuArchitecture: ecs.CpuArchitecture.ARM64 },
      cpu: 8192,
      memoryLimitMiB: 16384,
    });

    // Security group the task is launched with
    const taskSecurityGroup = new ec2.SecurityGroup(this, 'SecurityGroup', { vpc: mainVpc });

    /* Container image, built for the ARM64 docker platform to match the task definition */
    const oradImageAsset = new ecrAssets.DockerImageAsset(this, 'OraDecompression', {
      directory: path.join(__dirname, 'tasks', 'ora_decompression'),
      buildArgs: { TARGETPLATFORM: lambda.Architecture.ARM_64.dockerPlatform },
    });

    /* The task role must be able to read the ICAv2 access token */
    const icav2Secret = secretsManager.Secret.fromSecretNameV2(
      this,
      'icav2SecretObject',
      props.icav2AccessTokenSecretId
    );
    icav2Secret.grantRead(oradTaskDefinition.taskRole);

    /* Register the container on the task definition; logs are kept for one week */
    const oradContainer = oradTaskDefinition.addContainer('oraDecompressionContainer', {
      containerName: `${props.sfnPrefix}-orad-container`,
      image: ecs.ContainerImage.fromDockerImageAsset(oradImageAsset),
      logging: ecs.LogDriver.awsLogs({
        streamPrefix: 'orad',
        logRetention: RetentionDays.ONE_WEEK,
      }),
    });

    /* State machine: load the ASL template and fill in its placeholders */
    const aslTemplatePath = path.join(
      __dirname,
      'step_functions_templates/decompress_ora_fastq_pair_sfn_template.asl.json'
    );
    // The template receives the private subnet ids as one comma-separated string
    const privateSubnetIds = fargateCluster.vpc.privateSubnets
      .map((subnet) => subnet.subnetId)
      .join(',');

    this.sfnObject = new sfn.StateMachine(this, 'state_machine', {
      stateMachineName: `${props.sfnPrefix}-ora-decompression-sfn`,
      definitionBody: sfn.DefinitionBody.fromFile(aslTemplatePath),
      definitionSubstitutions: {
        /* ICAv2 Secret ID */
        __icav2_access_token_secret_id__: icav2Secret.secretName,
        /* Task Definition and Cluster ARNs */
        __ora_decompression_cluster_arn__: fargateCluster.clusterArn,
        __ora_task_definition_arn__: oradTaskDefinition.taskDefinitionArn,
        __ora_container_name__: oradContainer.containerName,
        __subnets__: privateSubnetIds,
        __sg_group__: taskSecurityGroup.securityGroupId,
      },
    });

    /* Permissions: the state machine may run the ECS task ... */
    oradTaskDefinition.grantRun(this.sfnObject);

    /* ... and may manage the events rule used to monitor the tasks */
    // NOTE(review): presumably required because the template runs the task with a
    // synchronous (.sync) integration — confirm against the ASL template.
    const monitorTasksStatement = new iam.PolicyStatement({
      resources: [
        `arn:aws:events:${cdk.Aws.REGION}:${cdk.Aws.ACCOUNT_ID}:rule/StepFunctionsGetEventsForECSTaskRule`,
      ],
      actions: ['events:PutTargets', 'events:PutRule', 'events:DescribeRule'],
    });
    this.sfnObject.addToRolePolicy(monitorTasksStatement);
  }
}
Loading
Loading