diff --git a/gcp_deepvariant_runner.py b/gcp_deepvariant_runner.py
index d1ef5ac..731c069 100644
--- a/gcp_deepvariant_runner.py
+++ b/gcp_deepvariant_runner.py
@@ -62,6 +62,7 @@
 import time
 import urllib
 import uuid
+import enum
 
 import gke_cluster
 from google.api_core import exceptions as google_exceptions
@@ -161,6 +162,66 @@
 }}
 """
 
+# The following constants are used to automatically set computational flags.
+_WGS_STANDARD = 'wgs_standard'
+_WES_STANDARD = 'wes_standard'
+_WES_LARGE_THR = 12 * 1024 * 1024 * 1024
+_WGS_SMALL_THR = 25 * 1024 * 1024 * 1024
+_WGS_LARGE_THR = 200 * 1024 * 1024 * 1024
+
+
+class BamCategories(enum.Enum):
+  """List of BAM categories that determine automatically assigned flags."""
+  WES_SMALL = 0
+  WES_LARGE = 1
+  WGS_SMALL = 2
+  WGS_MEDIUM = 3
+  WGS_LARGE = 4
+
+
+# Default optimal computational flag values, one per BAM category.
+_DEFAULT_FLAGS = {}
+_DEFAULT_FLAGS[BamCategories.WES_SMALL] = {
+    'make_examples_workers': 8,
+    'make_examples_cores_per_worker': 2,
+    'call_variants_workers': 1,
+    'call_variants_cores_per_worker': 2,
+    'gpu': True
+}
+_DEFAULT_FLAGS[BamCategories.WES_LARGE] = {
+    'make_examples_workers': 8,
+    'make_examples_cores_per_worker': 2,
+    'call_variants_workers': 2,
+    'call_variants_cores_per_worker': 2,
+    'gpu': True
+}
+_DEFAULT_FLAGS[BamCategories.WGS_SMALL] = {
+    'make_examples_workers': 16,
+    'make_examples_cores_per_worker': 2,
+    'call_variants_workers': 2,
+    'call_variants_cores_per_worker': 2,
+    'gpu': True
+}
+_DEFAULT_FLAGS[BamCategories.WGS_MEDIUM] = {
+    'make_examples_workers': 32,
+    'make_examples_cores_per_worker': 2,
+    'call_variants_workers': 4,
+    'call_variants_cores_per_worker': 2,
+    'gpu': True
+}
+_DEFAULT_FLAGS[BamCategories.WGS_LARGE] = {
+    'make_examples_workers': 64,
+    'make_examples_cores_per_worker': 2,
+    'call_variants_workers': 8,
+    'call_variants_cores_per_worker': 2,
+    'gpu': True
+}
+
+# Common computational flag values across all BAM categories.
+_RAM_PER_CORE = 4
+_MAKE_EXAMPLES_DISK_PER_WORKER = 200
+_CALL_VARIANTS_DISK_PER_WORKER = 50
+_POSTPROCESS_VARIANTS_DISK_GVCF = 200
 
 
 def _get_staging_examples_folder_to_write(pipeline_args,
                                           make_example_worker_index):
@@ -313,22 +374,30 @@ def _is_valid_gcs_path(gcs_path):
           urllib.parse.urlparse(gcs_path).netloc != '')
 
 
-def _gcs_object_exist(gcs_obj_path):
-  """Returns true if the given path is a valid object on GCS.
+def _get_gcs_object_size(gcs_obj_path):
+  """Returns the size of a GCS object, or 0 if missing or access is denied.
 
   Args:
     gcs_obj_path: (str) a path to an obj on GCS.
   """
   try:
-    storage_client = storage.Client()
     bucket_name = _get_gcs_bucket(gcs_obj_path)
     obj_name = _get_gcs_relative_path(gcs_obj_path)
-    bucket = storage_client.bucket(bucket_name)
-    obj = bucket.blob(obj_name)
-    return obj.exists()
-  except google_exceptions.Forbidden as e:
-    logging.error('Missing GCS object: %s', str(e))
-    return False
+  except ValueError as e:
+    logging.error('Invalid GCS path: %s', str(e))
+    return 0
+
+  storage_client = storage.Client()
+  bucket = storage_client.bucket(bucket_name)
+  try:
+    blob = bucket.get_blob(obj_name)
+  except (google_exceptions.NotFound, google_exceptions.Forbidden) as e:
+    logging.error('Unable to access GCS bucket: %s', str(e))
+    return 0
+
+  if blob is None:
+    return 0
+  return blob.size
 
 
 def _can_write_to_bucket(bucket_name):
@@ -386,6 +455,69 @@ def _meets_gcp_label_restrictions(label):
                   label) is not None
 
 
+def _get_bam_category(pipeline_args):
+  """Returns the category that the input BAM file belongs to."""
+  bam_size = _get_gcs_object_size(pipeline_args.bam)
+  if bam_size == 0:
+    logging.warning('Size of input BAM file is 0.')
+
+  is_wes = pipeline_args.model.find(_WES_STANDARD) != -1
+  is_wgs = pipeline_args.model.find(_WGS_STANDARD) != -1
+
+  if is_wes:
+    if bam_size < _WES_LARGE_THR:
+      return BamCategories.WES_SMALL
+    else:
+      return BamCategories.WES_LARGE
+
+  if is_wgs:
+    if bam_size < _WGS_SMALL_THR:
+      return BamCategories.WGS_SMALL
+    elif bam_size > _WGS_LARGE_THR:
+      return BamCategories.WGS_LARGE
+    else:
+      return BamCategories.WGS_MEDIUM
+
+
+def _set_computational_flags_based_on_bam_size(pipeline_args):
+  """Automatically sets computational flags based on the input BAM size."""
+  bam_category = _get_bam_category(pipeline_args)
+  default_flags = _DEFAULT_FLAGS[bam_category]
+
+  pipeline_args.shards = (
+      default_flags['make_examples_workers'] *
+      default_flags['make_examples_cores_per_worker'])
+  pipeline_args.make_examples_workers = default_flags['make_examples_workers']
+  pipeline_args.make_examples_cores_per_worker = (
+      default_flags['make_examples_cores_per_worker'])
+  pipeline_args.make_examples_ram_per_worker_gb = (
+      default_flags['make_examples_cores_per_worker'] * _RAM_PER_CORE)
+  pipeline_args.make_examples_disk_per_worker_gb = (
+      _MAKE_EXAMPLES_DISK_PER_WORKER)
+  if 'gpu' in default_flags:
+    pipeline_args.gpu = default_flags['gpu']
+    pipeline_args.call_variants_workers = (
+        default_flags['call_variants_workers'])
+    pipeline_args.call_variants_cores_per_worker = (
+        default_flags['call_variants_cores_per_worker'])
+    pipeline_args.call_variants_ram_per_worker_gb = (
+        default_flags['call_variants_cores_per_worker'] * _RAM_PER_CORE)
+    pipeline_args.call_variants_disk_per_worker_gb = (
+        _CALL_VARIANTS_DISK_PER_WORKER)
+  elif 'tpu' in default_flags:
+    pipeline_args.tpu = default_flags['tpu']
+    pipeline_args.gke_cluster_zone = pipeline_args.zones[0]
+  else:
+    raise ValueError('Either gpu or tpu is needed for default flag settings.')
+  # The following flags are independent of the BAM file category.
+  pipeline_args.gcsfuse = True
+  pipeline_args.preemptible = True
+  pipeline_args.max_preemptible_tries = 2
+  pipeline_args.max_non_preemptible_tries = 1
+  if pipeline_args.gvcf_outfile:
+    pipeline_args.postprocess_variants_disk_gb = _POSTPROCESS_VARIANTS_DISK_GVCF
 
 
 def _run_make_examples(pipeline_args):
   """Runs the make_examples job."""
@@ -423,6 +555,8 @@ def get_extra_args():
       extra_args.extend(['--sample_name', pipeline_args.sample_name])
     if pipeline_args.hts_block_size:
       extra_args.extend(['--hts_block_size', str(pipeline_args.hts_block_size)])
+    if pipeline_args.bam.endswith(_CRAM_FILE_SUFFIX):
+      extra_args.extend(['--use_ref_for_cram'])
     return extra_args
 
   command = _MAKE_EXAMPLES_COMMAND.format(
@@ -675,6 +809,23 @@ def get_extra_args():
 
 def _validate_and_complete_args(pipeline_args):
   """Validates pipeline arguments and fills some missing args (if any)."""
+  if pipeline_args.set_optimized_flags_based_on_bam_size:
+    # First, validate that all necessary flags are present.
+    if not (pipeline_args.docker_image and pipeline_args.docker_image_gpu):
+      raise ValueError('Both --docker_image and --docker_image_gpu must be '
+                       'provided with --set_optimized_flags_based_on_bam_size')
+    is_wes = pipeline_args.model.find(_WES_STANDARD) != -1
+    is_wgs = pipeline_args.model.find(_WGS_STANDARD) != -1
+    if not is_wes and not is_wgs:
+      raise ValueError('Unable to automatically set computational flags. Given '
+                       'model is neither WGS nor WES: %s' % pipeline_args.model)
+    if is_wes and is_wgs:
+      raise ValueError('Unable to automatically set computational flags. Given '
+                       'model matches both WGS & WES: %s' % pipeline_args.model)
+    if not pipeline_args.bam.endswith(_BAM_FILE_SUFFIX):
+      raise ValueError(
+          'Only able to automatically set computational flags for BAM files.')
+    _set_computational_flags_based_on_bam_size(pipeline_args)
   # Basic validation logic. More detailed validation is done by pipelines API.
   if (pipeline_args.job_name_prefix and
       not _meets_gcp_label_restrictions(pipeline_args.job_name_prefix)):
@@ -743,20 +894,21 @@ def _validate_and_complete_args(pipeline_args):
     pipeline_args.ref_gzi = pipeline_args.ref + _GZI_FILE_SUFFIX
   if not pipeline_args.bai:
     pipeline_args.bai = pipeline_args.bam + _BAI_FILE_SUFFIX
-    if not _gcs_object_exist(pipeline_args.bai):
+    if _get_gcs_object_size(pipeline_args.bai) == 0:
       pipeline_args.bai = pipeline_args.bam.replace(_BAM_FILE_SUFFIX,
                                                     _BAI_FILE_SUFFIX)
 
   # Ensuring all input files exist...
-  if not _gcs_object_exist(pipeline_args.ref):
+  if _get_gcs_object_size(pipeline_args.ref) == 0:
     raise ValueError('Given reference file via --ref does not exist')
-  if not _gcs_object_exist(pipeline_args.ref_fai):
+  if _get_gcs_object_size(pipeline_args.ref_fai) == 0:
     raise ValueError('Given FAI index file via --ref_fai does not exist')
-  if (pipeline_args.ref_gzi and not _gcs_object_exist(pipeline_args.ref_gzi)):
+  if (pipeline_args.ref_gzi and
+      _get_gcs_object_size(pipeline_args.ref_gzi) == 0):
     raise ValueError('Given GZI index file via --ref_gzi does not exist')
-  if not _gcs_object_exist(pipeline_args.bam):
+  if _get_gcs_object_size(pipeline_args.bam) == 0:
     raise ValueError('Given BAM file via --bam does not exist')
-  if not _gcs_object_exist(pipeline_args.bai):
+  if _get_gcs_object_size(pipeline_args.bai) == 0:
     raise ValueError('Given BAM index file via --bai does not exist')
 
   # ...and we can write to output buckets.
   if not _can_write_to_bucket(_get_gcs_bucket(pipeline_args.staging)):
@@ -896,6 +1048,15 @@ def run(argv=None):
       help=('Optional. If non-zero, specifies the time interval in seconds for '
             'writing workers log. Otherwise, log is written when the job is '
            'finished.'))
+  parser.add_argument(
+      '--set_optimized_flags_based_on_bam_size',
+      default=False,
+      action='store_true',
+      help=('Automatically sets the best values for computational flags, such '
+            'as the number of workers and cores and the amount of RAM and '
+            'disk per worker, for both the make_examples and call_variants '
+            'steps, based on the size of the input BAM file. It also decides '
+            'whether to use a TPU or a GPU for the call_variants stage.'))
 
   # Optional GPU args.
   parser.add_argument(
@@ -1000,12 +1161,12 @@ def run(argv=None):
   parser.add_argument(
       '--postprocess_variants_cores',
       type=int,
-      default=8,
+      default=4,
       help='Number of cores to use for postprocess_variants.')
   parser.add_argument(
       '--postprocess_variants_ram_gb',
       type=int,
-      default=30,
+      default=16,
      help='RAM (in GB) to use for postprocess_variants.')
   parser.add_argument(
       '--postprocess_variants_disk_gb',
diff --git a/gcp_deepvariant_runner_test.py b/gcp_deepvariant_runner_test.py
index a47ea7d..be24572 100644
--- a/gcp_deepvariant_runner_test.py
+++ b/gcp_deepvariant_runner_test.py
@@ -118,9 +118,14 @@ def setUp(self):
         'gs://bucket/ref',
     ]
 
+  def _update_argv(self, bam_suffix, model_suffix, extend_auto_flags):
+    self._argv[self._argv.index('--bam') + 1] += bam_suffix
+    self._argv[self._argv.index('--model') + 1] += model_suffix
+    self._argv.extend(extend_auto_flags)
+
   @mock.patch('gcp_deepvariant_runner._run_job')
   @mock.patch.object(multiprocessing, 'Pool')
-  @mock.patch('gcp_deepvariant_runner._gcs_object_exist')
+  @mock.patch('gcp_deepvariant_runner._get_gcs_object_size')
   @mock.patch('gcp_deepvariant_runner._can_write_to_bucket')
   def testRunPipeline(self, mock_can_write_to_bucket, mock_obj_exist, mock_pool,
                       mock_run_job):
@@ -162,7 +167,7 @@
 
   @mock.patch('gcp_deepvariant_runner._run_job')
   @mock.patch.object(multiprocessing, 'Pool')
-  @mock.patch('gcp_deepvariant_runner._gcs_object_exist')
+  @mock.patch('gcp_deepvariant_runner._get_gcs_object_size')
   @mock.patch('gcp_deepvariant_runner._can_write_to_bucket')
   def testRunPipeline_WithGVCFOutFile(self, mock_can_write_to_bucket,
                                       mock_obj_exist, mock_pool, mock_run_job):
@@ -212,7 +217,7 @@ def testRunPipeline_WithGVCFOutFile(self, mock_can_write_to_bucket,
           'gs://bucket/staging/logs/postprocess_variants')
 
   @mock.patch.object(multiprocessing, 'Pool')
-  @mock.patch('gcp_deepvariant_runner._gcs_object_exist')
+  @mock.patch('gcp_deepvariant_runner._get_gcs_object_size')
   @mock.patch('gcp_deepvariant_runner._can_write_to_bucket')
   def testRunMakeExamples(self, mock_can_write_to_bucket, mock_obj_exist,
                           mock_pool):
@@ -308,7 +313,7 @@ def testRunMakeExamples(self, mock_can_write_to_bucket, mock_obj_exist,
                        sorted(recieved_actions_list[i].items()))
 
   @mock.patch.object(multiprocessing, 'Pool')
-  @mock.patch('gcp_deepvariant_runner._gcs_object_exist')
+  @mock.patch('gcp_deepvariant_runner._get_gcs_object_size')
   @mock.patch('gcp_deepvariant_runner._can_write_to_bucket')
   def testRunMakeExamples_WithGcsfuse(self, mock_can_write_to_bucket,
                                       mock_obj_exist, mock_pool):
@@ -425,7 +430,7 @@ def testRunMakeExamples_WithGcsfuse(self, mock_can_write_to_bucket,
                        sorted(recieved_actions_list[i].items()))
 
   @mock.patch.object(multiprocessing, 'Pool')
-  @mock.patch('gcp_deepvariant_runner._gcs_object_exist')
+  @mock.patch('gcp_deepvariant_runner._get_gcs_object_size')
   @mock.patch('gcp_deepvariant_runner._can_write_to_bucket')
   def testRunCallVariants(self, mock_can_write_to_bucket, mock_obj_exist,
                           mock_pool):
@@ -467,7 +472,7 @@ def testRunCallVariants(self, mock_can_write_to_bucket, mock_obj_exist,
     self.assertEqual(mock_apply_async.call_count, 3)
 
   @mock.patch.object(multiprocessing, 'Pool')
-  @mock.patch('gcp_deepvariant_runner._gcs_object_exist')
+  @mock.patch('gcp_deepvariant_runner._get_gcs_object_size')
   @mock.patch('gcp_deepvariant_runner._can_write_to_bucket')
   def testRunCallVariants_GPU(self, mock_can_write_to_bucket, mock_obj_exist,
                               mock_pool):
@@ -514,7 +519,7 @@ def testRunCallVariants_GPU(self, mock_can_write_to_bucket, mock_obj_exist,
 
   @mock.patch.object(gke_cluster.GkeCluster, '__init__', return_value=None)
   @mock.patch.object(gke_cluster.GkeCluster, 'deploy_pod')
   @mock.patch.object(gke_cluster.GkeCluster, '_cluster_exists')
-  @mock.patch('gcp_deepvariant_runner._gcs_object_exist')
+  @mock.patch('gcp_deepvariant_runner._get_gcs_object_size')
   @mock.patch('gcp_deepvariant_runner._can_write_to_bucket')
   def testRunCallVariants_TPU(self, mock_can_write_to_bucket, mock_obj_exist,
                               mock_cluster_exists, mock_deploy_pod, mock_init):
@@ -570,7 +575,7 @@ def testRunFailCallVariants_TPU(self):
       gcp_deepvariant_runner.run(self._argv)
 
   @mock.patch('gcp_deepvariant_runner._run_job')
-  @mock.patch('gcp_deepvariant_runner._gcs_object_exist')
+  @mock.patch('gcp_deepvariant_runner._get_gcs_object_size')
   @mock.patch('gcp_deepvariant_runner._can_write_to_bucket')
   def testRunPostProcessVariants(self, mock_can_write_to_bucket,
                                  mock_obj_exist, mock_run_job):
@@ -631,6 +636,743 @@ def testRunFailsCannotWriteOutputBucket(self, mock_blob_exists):
     with self.assertRaises(ValueError):
       gcp_deepvariant_runner.run(self._argv)
 
+  def testRunFailsSetOptimizedFlagsMissingExpectedModel(self):
+    self._update_argv(gcp_deepvariant_runner._BAM_FILE_SUFFIX, '',
+                      ['--set_optimized_flags_based_on_bam_size'])
+    with self.assertRaises(ValueError):
+      gcp_deepvariant_runner.run(self._argv)
+
+  def testRunFailsSetOptimizedFlagsMissingBamFile(self):
+    self._update_argv('', gcp_deepvariant_runner._WES_STANDARD,
+                      ['--set_optimized_flags_based_on_bam_size'])
+    with self.assertRaises(ValueError):
+      gcp_deepvariant_runner.run(self._argv)
+
+  @mock.patch.object(multiprocessing, 'Pool')
+  @mock.patch('gcp_deepvariant_runner._can_write_to_bucket')
+  @mock.patch('gcp_deepvariant_runner._get_gcs_object_size')
+  def testRunSetOptimizedFlagsBasedOnBamSizeWesSmall_makeExamples(
+      self, mock_object_size, mock_can_write_to_bucket, mock_pool):
+    mock_apply_async = mock_pool.return_value.apply_async
+    mock_apply_async.return_value = None
+    mock_can_write_to_bucket.return_value = True
+    mock_object_size.return_value = 12 * 1024 * 1024 * 1024 - 1
+    expected_workers = 8
+    expected_cores = 2
+    expected_shards = expected_workers * expected_cores
+    expected_ram = expected_cores * gcp_deepvariant_runner._RAM_PER_CORE * 1024
+    self._update_argv(gcp_deepvariant_runner._BAM_FILE_SUFFIX,
+                      gcp_deepvariant_runner._WES_STANDARD,
+                      ['--jobs_to_run',
+                       'make_examples',
+                       '--set_optimized_flags_based_on_bam_size',
+                       '--docker_image_gpu',
+                       'gcr.io/dockerimage_gpu',
+                       '--job_name_prefix',
+                       'auto_flags_'])
+
+    temp_dir = tempfile.gettempdir()
+    before_temp_files = os.listdir(temp_dir)
+    gcp_deepvariant_runner.run(self._argv)
+    after_temp_files = os.listdir(tempfile.gettempdir())
+    new_json_files = [os.path.join(temp_dir, item) for item in
+                      after_temp_files if item not in before_temp_files]
+    new_json_files.sort()
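+    # One actions JSON file is expected per make_examples worker; diffing the
+    # temp dir listing before and after the run isolates the files it created.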
+    self.assertEqual(len(new_json_files), expected_workers)
+
+    expected_mock_calls = []
+    for i in range(expected_workers):
+      expected_mock_calls.append(
+          mock.call(mock.ANY, [
+              _HasAllOf('auto_flags_make_examples', 'gcr.io/dockerimage',
+                        'EXAMPLES=gs://bucket/staging/examples/0/*',
+                        'INPUT_REF=gs://bucket/ref',
+                        'INPUT_BAI=gs://bucket/bam.bam.bai',
+                        'custom-{}-{}'.format(expected_cores, expected_ram),
+                        new_json_files[i]),
+              'gs://bucket/staging/logs/make_examples/{}'.format(i)
+          ]))
+    mock_apply_async.assert_has_calls(expected_mock_calls,)
+    self.assertEqual(mock_apply_async.call_count, expected_workers)
+    # Verify json files contain correct actions_list.
+    make_examples_template = gcp_deepvariant_runner._MAKE_EXAMPLES_COMMAND.format(
+        NUM_SHARDS=expected_shards, EXTRA_ARGS='')
+    for worker_index in range(expected_workers):
+      with open(new_json_files[worker_index]) as json_file:
+        received_actions_list = json.load(json_file)
+
+      expected_actions_list = []
+      for core_index in range(expected_cores):
+        local_dir = gcp_deepvariant_runner._GCSFUSE_LOCAL_DIR_TEMPLATE.format(
+            SHARD_INDEX=worker_index * expected_cores + core_index)
+        gcsfuse_create_command = gcp_deepvariant_runner._GCSFUSE_CREATE_COMMAND_TEMPLATE.format(
+            BUCKET='bucket', LOCAL_DIR=local_dir)
+        expected_actions_list.append(
+            {'commands':
+             ['-c', gcsfuse_create_command],
+             'entrypoint': '/bin/sh',
+             'flags': ['RUN_IN_BACKGROUND', 'ENABLE_FUSE'],
+             'mounts': [{'disk': 'google', 'path': '/mnt/google'}],
+             'imageUri': 'gcr.io/cloud-genomics-pipelines/gcsfuse'})
+        gcsfuse_verify_command = gcp_deepvariant_runner._GCSFUSE_VERIFY_COMMAND_TEMPLATE.format(
+            LOCAL_DIR=local_dir)
+        expected_actions_list.append(
+            {'commands':
+             ['-c', gcsfuse_verify_command],
+             'entrypoint': '/bin/sh',
+             'mounts': [{'disk': 'google', 'path': '/mnt/google'}],
+             'imageUri': 'gcr.io/cloud-genomics-pipelines/gcsfuse'})
+      make_examples_command = make_examples_template.format(
+          SHARD_START_INDEX=worker_index * expected_cores,
+          SHARD_END_INDEX=(worker_index + 1) * expected_cores - 1,
+          TASK_INDEX='{}',
+          INPUT_BAM='/mnt/google/input-gcsfused-{}/bam.bam')
+      expected_actions_list.append(
+          {'commands':
+           ['-c', make_examples_command],
+           'entrypoint': 'bash',
+           'mounts': [{'disk': 'google', 'path': '/mnt/google'}],
+           'imageUri': 'gcr.io/dockerimage'})
+      self.assertEqual(len(expected_actions_list), len(received_actions_list))
+      for i in range(len(expected_actions_list)):
+        self.assertEqual(sorted(expected_actions_list[i].items()),
+                         sorted(received_actions_list[i].items()))
+
+  @mock.patch.object(multiprocessing, 'Pool')
+  @mock.patch('gcp_deepvariant_runner._can_write_to_bucket')
+  @mock.patch('gcp_deepvariant_runner._get_gcs_object_size')
+  def testRunSetOptimizedFlagsBasedOnBamSizeWesLarge_makeExamples(
+      self, mock_object_size, mock_can_write_to_bucket, mock_pool):
+    mock_apply_async = mock_pool.return_value.apply_async
+    mock_apply_async.return_value = None
+    mock_can_write_to_bucket.return_value = True
+    mock_object_size.return_value = 12 * 1024 * 1024 * 1024 + 1
+    expected_workers = 8
+    expected_cores = 2
+    expected_shards = expected_workers * expected_cores
+    expected_ram = expected_cores * gcp_deepvariant_runner._RAM_PER_CORE * 1024
+    self._update_argv(gcp_deepvariant_runner._BAM_FILE_SUFFIX,
+                      gcp_deepvariant_runner._WES_STANDARD,
+                      ['--jobs_to_run',
+                       'make_examples',
+                       '--set_optimized_flags_based_on_bam_size',
+                       '--docker_image_gpu',
+                       'gcr.io/dockerimage_gpu',
+                       '--job_name_prefix',
+                       'auto_flags_'])
+
+    temp_dir = tempfile.gettempdir()
+    before_temp_files = os.listdir(temp_dir)
+    gcp_deepvariant_runner.run(self._argv)
+    after_temp_files = os.listdir(tempfile.gettempdir())
+    new_json_files = [os.path.join(temp_dir, item) for item in
+                      after_temp_files if item not in before_temp_files]
+    new_json_files.sort()
+    self.assertEqual(len(new_json_files), expected_workers)
+
+    expected_mock_calls = []
+    for i in range(expected_workers):
+      expected_mock_calls.append(
+          mock.call(mock.ANY, [
+              _HasAllOf('auto_flags_make_examples', 'gcr.io/dockerimage',
+                        'EXAMPLES=gs://bucket/staging/examples/{}/*'.format(
+                            int(i / (expected_workers / 2))),  # 2 call_variants
+                        'INPUT_REF=gs://bucket/ref',
+                        'INPUT_BAI=gs://bucket/bam.bam.bai',
+                        'custom-{}-{}'.format(expected_cores, expected_ram),
+                        new_json_files[i]),
+              'gs://bucket/staging/logs/make_examples/{}'.format(i)
+          ]))
+    mock_apply_async.assert_has_calls(expected_mock_calls,)
+    self.assertEqual(mock_apply_async.call_count, expected_workers)
+    # Verify json files contain correct actions_list.
+    make_examples_template = gcp_deepvariant_runner._MAKE_EXAMPLES_COMMAND.format(
+        NUM_SHARDS=expected_shards, EXTRA_ARGS='')
+    for worker_index in range(expected_workers):
+      with open(new_json_files[worker_index]) as json_file:
+        received_actions_list = json.load(json_file)
+
+      expected_actions_list = []
+      for core_index in range(expected_cores):
+        local_dir = gcp_deepvariant_runner._GCSFUSE_LOCAL_DIR_TEMPLATE.format(
+            SHARD_INDEX=worker_index * expected_cores + core_index)
+        gcsfuse_create_command = gcp_deepvariant_runner._GCSFUSE_CREATE_COMMAND_TEMPLATE.format(
+            BUCKET='bucket', LOCAL_DIR=local_dir)
+        expected_actions_list.append(
+            {'commands':
+             ['-c', gcsfuse_create_command],
+             'entrypoint': '/bin/sh',
+             'flags': ['RUN_IN_BACKGROUND', 'ENABLE_FUSE'],
+             'mounts': [{'disk': 'google', 'path': '/mnt/google'}],
+             'imageUri': 'gcr.io/cloud-genomics-pipelines/gcsfuse'})
+        gcsfuse_verify_command = gcp_deepvariant_runner._GCSFUSE_VERIFY_COMMAND_TEMPLATE.format(
+            LOCAL_DIR=local_dir)
+        expected_actions_list.append(
+            {'commands':
+             ['-c', gcsfuse_verify_command],
+             'entrypoint': '/bin/sh',
+             'mounts': [{'disk': 'google', 'path': '/mnt/google'}],
+             'imageUri': 'gcr.io/cloud-genomics-pipelines/gcsfuse'})
+      make_examples_command = make_examples_template.format(
+          SHARD_START_INDEX=worker_index * expected_cores,
+          SHARD_END_INDEX=(worker_index + 1) * expected_cores - 1,
+          TASK_INDEX='{}',
+          INPUT_BAM='/mnt/google/input-gcsfused-{}/bam.bam')
+      expected_actions_list.append(
+          {'commands':
+           ['-c', make_examples_command],
+           'entrypoint': 'bash',
+           'mounts': [{'disk': 'google', 'path': '/mnt/google'}],
+           'imageUri': 'gcr.io/dockerimage'})
+      self.assertEqual(len(expected_actions_list), len(received_actions_list))
+      for i in range(len(expected_actions_list)):
+        self.assertEqual(sorted(expected_actions_list[i].items()),
+                         sorted(received_actions_list[i].items()))
+
+  @mock.patch.object(multiprocessing, 'Pool')
+  @mock.patch('gcp_deepvariant_runner._can_write_to_bucket')
+  @mock.patch('gcp_deepvariant_runner._get_gcs_object_size')
+  def testRunSetOptimizedFlagsBasedOnBamSizeWgsSmall_makeExamples(
+      self, mock_object_size, mock_can_write_to_bucket, mock_pool):
+    mock_apply_async = mock_pool.return_value.apply_async
+    mock_apply_async.return_value = None
+    mock_can_write_to_bucket.return_value = True
+    mock_object_size.return_value = 25 * 1024 * 1024 * 1024 - 1
+    expected_workers = 16
+    expected_cores = 2
+    expected_shards = expected_workers * expected_cores
+    expected_ram = expected_cores * gcp_deepvariant_runner._RAM_PER_CORE * 1024
+    self._update_argv(gcp_deepvariant_runner._BAM_FILE_SUFFIX,
+                      gcp_deepvariant_runner._WGS_STANDARD,
+                      ['--jobs_to_run',
+                       'make_examples',
+                       '--set_optimized_flags_based_on_bam_size',
+                       '--docker_image_gpu',
+                       'gcr.io/dockerimage_gpu',
+                       '--job_name_prefix',
+                       'auto_flags_'])
+
+    temp_dir = tempfile.gettempdir()
+    before_temp_files = os.listdir(temp_dir)
+    gcp_deepvariant_runner.run(self._argv)
+    after_temp_files = os.listdir(tempfile.gettempdir())
+    new_json_files = [os.path.join(temp_dir, item) for item in
+                      after_temp_files if item not in before_temp_files]
+    new_json_files.sort()
+    self.assertEqual(len(new_json_files), expected_workers)
+
+    expected_mock_calls = []
+    for i in range(expected_workers):
+      expected_mock_calls.append(
+          mock.call(mock.ANY, [
+              _HasAllOf('auto_flags_make_examples', 'gcr.io/dockerimage',
+                        'EXAMPLES=gs://bucket/staging/examples/{}/*'.format(
+                            int(i / (expected_workers / 2))),  # 2 call_variants
+                        'INPUT_REF=gs://bucket/ref',
+                        'INPUT_BAI=gs://bucket/bam.bam.bai',
+                        'custom-{}-{}'.format(expected_cores, expected_ram),
+                        new_json_files[i]),
+              'gs://bucket/staging/logs/make_examples/{}'.format(i)
+          ]))
+    mock_apply_async.assert_has_calls(expected_mock_calls,)
+    self.assertEqual(mock_apply_async.call_count, expected_workers)
+    # Verify json files contain correct actions_list.
+    make_examples_template = gcp_deepvariant_runner._MAKE_EXAMPLES_COMMAND.format(
+        NUM_SHARDS=expected_shards, EXTRA_ARGS='')
+    for worker_index in range(expected_workers):
+      with open(new_json_files[worker_index]) as json_file:
+        received_actions_list = json.load(json_file)
+
+      expected_actions_list = []
+      for core_index in range(expected_cores):
+        local_dir = gcp_deepvariant_runner._GCSFUSE_LOCAL_DIR_TEMPLATE.format(
+            SHARD_INDEX=worker_index * expected_cores + core_index)
+        gcsfuse_create_command = gcp_deepvariant_runner._GCSFUSE_CREATE_COMMAND_TEMPLATE.format(
+            BUCKET='bucket', LOCAL_DIR=local_dir)
+        expected_actions_list.append(
+            {'commands':
+             ['-c', gcsfuse_create_command],
+             'entrypoint': '/bin/sh',
+             'flags': ['RUN_IN_BACKGROUND', 'ENABLE_FUSE'],
+             'mounts': [{'disk': 'google', 'path': '/mnt/google'}],
+             'imageUri': 'gcr.io/cloud-genomics-pipelines/gcsfuse'})
+        gcsfuse_verify_command = gcp_deepvariant_runner._GCSFUSE_VERIFY_COMMAND_TEMPLATE.format(
+            LOCAL_DIR=local_dir)
+        expected_actions_list.append(
+            {'commands':
+             ['-c', gcsfuse_verify_command],
+             'entrypoint': '/bin/sh',
+             'mounts': [{'disk': 'google', 'path': '/mnt/google'}],
+             'imageUri': 'gcr.io/cloud-genomics-pipelines/gcsfuse'})
+      make_examples_command = make_examples_template.format(
+          SHARD_START_INDEX=worker_index * expected_cores,
+          SHARD_END_INDEX=(worker_index + 1) * expected_cores - 1,
+          TASK_INDEX='{}',
+          INPUT_BAM='/mnt/google/input-gcsfused-{}/bam.bam')
+      expected_actions_list.append(
+          {'commands':
+           ['-c', make_examples_command],
+           'entrypoint': 'bash',
+           'mounts': [{'disk': 'google', 'path': '/mnt/google'}],
+           'imageUri': 'gcr.io/dockerimage'})
+      self.assertEqual(len(expected_actions_list), len(received_actions_list))
+      for i in range(len(expected_actions_list)):
+        self.assertEqual(sorted(expected_actions_list[i].items()),
+                         sorted(received_actions_list[i].items()))
+
+  @mock.patch.object(multiprocessing, 'Pool')
+  @mock.patch('gcp_deepvariant_runner._can_write_to_bucket')
+  @mock.patch('gcp_deepvariant_runner._get_gcs_object_size')
+  def testRunSetOptimizedFlagsBasedOnBamSizeWgsMedium_makeExamples(
+      self, mock_object_size, mock_can_write_to_bucket, mock_pool):
+    mock_apply_async = mock_pool.return_value.apply_async
+    mock_apply_async.return_value = None
+    mock_can_write_to_bucket.return_value = True
+    mock_object_size.return_value = 25 * 1024 * 1024 * 1024 + 1
+    expected_workers = 32
+    expected_cores = 2
+    expected_shards = expected_workers * expected_cores
+    expected_ram = expected_cores * gcp_deepvariant_runner._RAM_PER_CORE * 1024
+    self._update_argv(gcp_deepvariant_runner._BAM_FILE_SUFFIX,
+                      gcp_deepvariant_runner._WGS_STANDARD,
+                      ['--jobs_to_run',
+                       'make_examples',
+                       '--set_optimized_flags_based_on_bam_size',
+                       '--docker_image_gpu',
+                       'gcr.io/dockerimage_gpu',
+                       '--job_name_prefix',
+                       'auto_flags_'])
+
+    temp_dir = tempfile.gettempdir()
+    before_temp_files = os.listdir(temp_dir)
+    gcp_deepvariant_runner.run(self._argv)
+    after_temp_files = os.listdir(tempfile.gettempdir())
+    new_json_files = [os.path.join(temp_dir, item) for item in
+                      after_temp_files if item not in before_temp_files]
+    new_json_files.sort()
+    self.assertEqual(len(new_json_files), expected_workers)
+
+    expected_mock_calls = []
+    for i in range(expected_workers):
+      expected_mock_calls.append(
+          mock.call(mock.ANY, [
+              _HasAllOf('auto_flags_make_examples', 'gcr.io/dockerimage',
+                        'EXAMPLES=gs://bucket/staging/examples/{}/*'.format(
+                            int(i / (expected_workers / 4))),  # 4 call_variants
+                        'INPUT_REF=gs://bucket/ref',
+                        'INPUT_BAI=gs://bucket/bam.bam.bai',
+                        'custom-{}-{}'.format(expected_cores, expected_ram),
+                        new_json_files[i]),
+              'gs://bucket/staging/logs/make_examples/{}'.format(i)
+          ]))
+    mock_apply_async.assert_has_calls(expected_mock_calls,)
+    self.assertEqual(mock_apply_async.call_count, expected_workers)
+    # Verify json files contain correct actions_list.
+    make_examples_template = gcp_deepvariant_runner._MAKE_EXAMPLES_COMMAND.format(
+        NUM_SHARDS=expected_shards, EXTRA_ARGS='')
+    for worker_index in range(expected_workers):
+      with open(new_json_files[worker_index]) as json_file:
+        received_actions_list = json.load(json_file)
+
+      expected_actions_list = []
+      for core_index in range(expected_cores):
+        local_dir = gcp_deepvariant_runner._GCSFUSE_LOCAL_DIR_TEMPLATE.format(
+            SHARD_INDEX=worker_index * expected_cores + core_index)
+        gcsfuse_create_command = gcp_deepvariant_runner._GCSFUSE_CREATE_COMMAND_TEMPLATE.format(
+            BUCKET='bucket', LOCAL_DIR=local_dir)
+        expected_actions_list.append(
+            {'commands':
+             ['-c', gcsfuse_create_command],
+             'entrypoint': '/bin/sh',
+             'flags': ['RUN_IN_BACKGROUND', 'ENABLE_FUSE'],
+             'mounts': [{'disk': 'google', 'path': '/mnt/google'}],
+             'imageUri': 'gcr.io/cloud-genomics-pipelines/gcsfuse'})
+        gcsfuse_verify_command = gcp_deepvariant_runner._GCSFUSE_VERIFY_COMMAND_TEMPLATE.format(
+            LOCAL_DIR=local_dir)
+        expected_actions_list.append(
+            {'commands':
+             ['-c', gcsfuse_verify_command],
+             'entrypoint': '/bin/sh',
+             'mounts': [{'disk': 'google', 'path': '/mnt/google'}],
+             'imageUri': 'gcr.io/cloud-genomics-pipelines/gcsfuse'})
+      make_examples_command = make_examples_template.format(
+          SHARD_START_INDEX=worker_index * expected_cores,
+          SHARD_END_INDEX=(worker_index + 1) * expected_cores - 1,
+          TASK_INDEX='{}',
+          INPUT_BAM='/mnt/google/input-gcsfused-{}/bam.bam')
+      expected_actions_list.append(
+          {'commands':
+           ['-c', make_examples_command],
+           'entrypoint': 'bash',
+           'mounts': [{'disk': 'google', 'path': '/mnt/google'}],
+           'imageUri': 'gcr.io/dockerimage'})
+      self.assertEqual(len(expected_actions_list), len(received_actions_list))
+      for i in range(len(expected_actions_list)):
+        self.assertEqual(sorted(expected_actions_list[i].items()),
+                         sorted(received_actions_list[i].items()))
+
+  @mock.patch.object(multiprocessing, 'Pool')
+  @mock.patch('gcp_deepvariant_runner._can_write_to_bucket')
+  @mock.patch('gcp_deepvariant_runner._get_gcs_object_size')
+  def testRunSetOptimizedFlagsBasedOnBamSizeWgsLarge_makeExamples(
+      self, mock_object_size, mock_can_write_to_bucket, mock_pool):
+    mock_apply_async = mock_pool.return_value.apply_async
+    mock_apply_async.return_value = None
+    mock_can_write_to_bucket.return_value = True
+    mock_object_size.return_value = 200 * 1024 * 1024 * 1024 + 1
+    expected_workers = 64
+    expected_cores = 2
+    expected_shards = expected_workers * expected_cores
+    expected_ram = expected_cores * gcp_deepvariant_runner._RAM_PER_CORE * 1024
+    self._update_argv(gcp_deepvariant_runner._BAM_FILE_SUFFIX,
+                      gcp_deepvariant_runner._WGS_STANDARD,
+                      ['--jobs_to_run',
+                       'make_examples',
+                       '--set_optimized_flags_based_on_bam_size',
+                       '--docker_image_gpu',
+                       'gcr.io/dockerimage_gpu',
+                       '--job_name_prefix',
+                       'auto_flags_'])
+
+    temp_dir = tempfile.gettempdir()
+    before_temp_files = os.listdir(temp_dir)
+    gcp_deepvariant_runner.run(self._argv)
+    after_temp_files = os.listdir(tempfile.gettempdir())
+    new_json_files = [os.path.join(temp_dir, item) for item in
+                      after_temp_files if item not in before_temp_files]
+    new_json_files.sort()
+    self.assertEqual(len(new_json_files), expected_workers)
+
+    expected_mock_calls = []
+    for i in range(expected_workers):
+      expected_mock_calls.append(
+          mock.call(mock.ANY, [
+              _HasAllOf('auto_flags_make_examples', 'gcr.io/dockerimage',
+                        'EXAMPLES=gs://bucket/staging/examples/{}/*'.format(
+                            int(i / (expected_workers / 8))),  # 8 call_variants
+                        'INPUT_REF=gs://bucket/ref',
+                        'INPUT_BAI=gs://bucket/bam.bam.bai',
+                        'custom-{}-{}'.format(expected_cores, expected_ram),
+                        new_json_files[i]),
+              'gs://bucket/staging/logs/make_examples/{}'.format(i)
+          ]))
+    mock_apply_async.assert_has_calls(expected_mock_calls,)
+    self.assertEqual(mock_apply_async.call_count, expected_workers)
+    # Verify json files contain correct actions_list.
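+    # Each worker's file should list two gcsfuse actions (mount and verify)
+    # per core, followed by one make_examples action covering the worker's
+    # shard range.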
+    make_examples_template = gcp_deepvariant_runner._MAKE_EXAMPLES_COMMAND.format(
+        NUM_SHARDS=expected_shards, EXTRA_ARGS='')
+    for worker_index in range(expected_workers):
+      with open(new_json_files[worker_index]) as json_file:
+        received_actions_list = json.load(json_file)
+
+      expected_actions_list = []
+      for core_index in range(expected_cores):
+        local_dir = gcp_deepvariant_runner._GCSFUSE_LOCAL_DIR_TEMPLATE.format(
+            SHARD_INDEX=worker_index * expected_cores + core_index)
+        gcsfuse_create_command = gcp_deepvariant_runner._GCSFUSE_CREATE_COMMAND_TEMPLATE.format(
+            BUCKET='bucket', LOCAL_DIR=local_dir)
+        expected_actions_list.append(
+            {'commands':
+             ['-c', gcsfuse_create_command],
+             'entrypoint': '/bin/sh',
+             'flags': ['RUN_IN_BACKGROUND', 'ENABLE_FUSE'],
+             'mounts': [{'disk': 'google', 'path': '/mnt/google'}],
+             'imageUri': 'gcr.io/cloud-genomics-pipelines/gcsfuse'})
+        gcsfuse_verify_command = gcp_deepvariant_runner._GCSFUSE_VERIFY_COMMAND_TEMPLATE.format(
+            LOCAL_DIR=local_dir)
+        expected_actions_list.append(
+            {'commands':
+             ['-c', gcsfuse_verify_command],
+             'entrypoint': '/bin/sh',
+             'mounts': [{'disk': 'google', 'path': '/mnt/google'}],
+             'imageUri': 'gcr.io/cloud-genomics-pipelines/gcsfuse'})
+      make_examples_command = make_examples_template.format(
+          SHARD_START_INDEX=worker_index * expected_cores,
+          SHARD_END_INDEX=(worker_index + 1) * expected_cores - 1,
+          TASK_INDEX='{}',
+          INPUT_BAM='/mnt/google/input-gcsfused-{}/bam.bam')
+      expected_actions_list.append(
+          {'commands':
+           ['-c', make_examples_command],
+           'entrypoint': 'bash',
+           'mounts': [{'disk': 'google', 'path': '/mnt/google'}],
+           'imageUri': 'gcr.io/dockerimage'})
+      self.assertEqual(len(expected_actions_list), len(received_actions_list))
+      for i in range(len(expected_actions_list)):
+        self.assertEqual(sorted(expected_actions_list[i].items()),
+                         sorted(received_actions_list[i].items()))
+
+  @mock.patch.object(multiprocessing, 'Pool')
+  @mock.patch('gcp_deepvariant_runner._can_write_to_bucket')
+  @mock.patch('gcp_deepvariant_runner._get_gcs_object_size')
+  def testRunSetOptimizedFlagsBasedOnBamSizeWesSmall_callVariants(
+      self, mock_object_size, mock_can_write_to_bucket, mock_pool):
+    mock_apply_async = mock_pool.return_value.apply_async
+    mock_apply_async.return_value = None
+    mock_can_write_to_bucket.return_value = True
+
+    mock_object_size.return_value = 12 * 1024 * 1024 * 1024 - 1
+    expected_workers = 1
+    expected_cores = 2
+    expected_shards = 16  # equals make_examples: num_workers * num_cores
+    expected_ram = expected_cores * gcp_deepvariant_runner._RAM_PER_CORE * 1024
+    self._update_argv(gcp_deepvariant_runner._BAM_FILE_SUFFIX,
+                      gcp_deepvariant_runner._WES_STANDARD,
+                      ['--jobs_to_run',
+                       'call_variants',
+                       '--set_optimized_flags_based_on_bam_size',
+                       '--docker_image_gpu',
+                       'gcr.io/dockerimage_gpu'])
+    expected_mock_calls = []
+    for i in range(expected_workers):
+      expected_mock_calls.append(
+          mock.call(mock.ANY, [
+              _HasAllOf(
+                  'call_variants', 'gcr.io/dockerimage_gpu', 'nvidia-tesla-k80',
+                  'SHARDS={}'.format(expected_shards),
+                  'CALL_VARIANTS_SHARDS={}'.format(expected_workers),
+                  'CALL_VARIANTS_SHARD_INDEX={}'.format(i),
+                  'EXAMPLES=gs://bucket/staging/examples/{}/*'.format(i),
+                  'CALLED_VARIANTS=gs://bucket/staging/called_variants/*',
+                  '--machine-type', 'custom-{c}-{r}'.format(
+                      c=expected_cores, r=expected_ram), '--disk-size', '50'),
+              'gs://bucket/staging/logs/call_variants/{}'.format(i)
+          ]))
+
+    gcp_deepvariant_runner.run(self._argv)
+    mock_apply_async.assert_has_calls(expected_mock_calls,)
+    self.assertEqual(mock_apply_async.call_count, expected_workers)
+
+  @mock.patch.object(multiprocessing, 'Pool')
+  @mock.patch('gcp_deepvariant_runner._can_write_to_bucket')
+  @mock.patch('gcp_deepvariant_runner._get_gcs_object_size')
+  def testRunSetOptimizedFlagsBasedOnBamSizeWesLarge_callVariants(
+      self, mock_object_size, mock_can_write_to_bucket, mock_pool):
+    mock_apply_async = mock_pool.return_value.apply_async
+    mock_apply_async.return_value = None
+    mock_can_write_to_bucket.return_value = True
+
+    mock_object_size.return_value = 12 * 1024 * 1024 * 1024 + 1
+    expected_workers = 2
+    expected_cores = 2
+    expected_shards = 16  # equals make_examples: num_workers * num_cores
+    expected_ram = expected_cores * gcp_deepvariant_runner._RAM_PER_CORE * 1024
+    self._update_argv(gcp_deepvariant_runner._BAM_FILE_SUFFIX,
+                      gcp_deepvariant_runner._WES_STANDARD,
+                      ['--jobs_to_run',
+                       'call_variants',
+                       '--set_optimized_flags_based_on_bam_size',
+                       '--docker_image_gpu',
+                       'gcr.io/dockerimage_gpu'])
+    expected_mock_calls = []
+    for i in range(expected_workers):
+      expected_mock_calls.append(
+          mock.call(mock.ANY, [
+              _HasAllOf(
+                  'call_variants', 'gcr.io/dockerimage_gpu', 'nvidia-tesla-k80',
+                  'SHARDS={}'.format(expected_shards),
+                  'CALL_VARIANTS_SHARDS={}'.format(expected_workers),
+                  'CALL_VARIANTS_SHARD_INDEX={}'.format(i),
+                  'EXAMPLES=gs://bucket/staging/examples/{}/*'.format(i),
+                  'CALLED_VARIANTS=gs://bucket/staging/called_variants/*',
+                  '--machine-type', 'custom-{c}-{r}'.format(
+                      c=expected_cores, r=expected_ram), '--disk-size', '50'),
+              'gs://bucket/staging/logs/call_variants/{}'.format(i)
+          ]))
+
+    gcp_deepvariant_runner.run(self._argv)
+    mock_apply_async.assert_has_calls(expected_mock_calls,)
+    self.assertEqual(mock_apply_async.call_count, expected_workers)
+
+  @mock.patch.object(multiprocessing, 'Pool')
+  @mock.patch('gcp_deepvariant_runner._can_write_to_bucket')
+  @mock.patch('gcp_deepvariant_runner._get_gcs_object_size')
+  def testRunSetOptimizedFlagsBasedOnBamSizeWgsSmall_callVariants(
+      self, mock_object_size, mock_can_write_to_bucket, mock_pool):
+    mock_apply_async = mock_pool.return_value.apply_async
+    mock_apply_async.return_value = None
+    mock_can_write_to_bucket.return_value = True
+
+    mock_object_size.return_value = 25 * 1024 * 1024 * 1024 - 1
+    expected_workers = 2
+    expected_cores = 2
+    expected_shards = 32  # equals make_examples: num_workers * num_cores
+    expected_ram = expected_cores * gcp_deepvariant_runner._RAM_PER_CORE * 1024
+    self._update_argv(gcp_deepvariant_runner._BAM_FILE_SUFFIX,
+                      gcp_deepvariant_runner._WGS_STANDARD,
+                      ['--jobs_to_run',
+                       'call_variants',
+                       '--set_optimized_flags_based_on_bam_size',
+                       '--docker_image_gpu',
+                       'gcr.io/dockerimage_gpu'])
+    expected_mock_calls = []
+    for i in range(expected_workers):
+      expected_mock_calls.append(
+          mock.call(mock.ANY, [
+              _HasAllOf(
+                  'call_variants', 'gcr.io/dockerimage_gpu', 'nvidia-tesla-k80',
+                  'SHARDS={}'.format(expected_shards),
+                  'CALL_VARIANTS_SHARDS={}'.format(expected_workers),
+                  'CALL_VARIANTS_SHARD_INDEX={}'.format(i),
+                  'EXAMPLES=gs://bucket/staging/examples/{}/*'.format(i),
+                  'CALLED_VARIANTS=gs://bucket/staging/called_variants/*',
+                  '--machine-type', 'custom-{c}-{r}'.format(
+                      c=expected_cores, r=expected_ram), '--disk-size', '50'),
+              'gs://bucket/staging/logs/call_variants/{}'.format(i)
+          ]))
+
+    gcp_deepvariant_runner.run(self._argv)
+    mock_apply_async.assert_has_calls(expected_mock_calls,)
+    self.assertEqual(mock_apply_async.call_count, expected_workers)
+
+  @mock.patch.object(multiprocessing, 'Pool')
+  @mock.patch('gcp_deepvariant_runner._can_write_to_bucket')
+  @mock.patch('gcp_deepvariant_runner._get_gcs_object_size')
+  def testRunSetOptimizedFlagsBasedOnBamSizeWgsMedium_callVariants(
+      self, mock_object_size, mock_can_write_to_bucket, mock_pool):
+    mock_apply_async = mock_pool.return_value.apply_async
+    mock_apply_async.return_value = None
+    mock_can_write_to_bucket.return_value = True
+
+    mock_object_size.return_value = 25 * 1024 * 1024 * 1024 + 1
+    expected_workers = 4
+    expected_cores = 2
+    expected_shards = 64  # equals make_examples: num_workers * num_cores
+    expected_ram = expected_cores * gcp_deepvariant_runner._RAM_PER_CORE * 1024
+    self._update_argv(gcp_deepvariant_runner._BAM_FILE_SUFFIX,
+                      gcp_deepvariant_runner._WGS_STANDARD,
+                      ['--jobs_to_run',
+                       'call_variants',
+                       '--set_optimized_flags_based_on_bam_size',
+                       '--docker_image_gpu',
+                       'gcr.io/dockerimage_gpu'])
+    expected_mock_calls = []
+    for i in range(expected_workers):
+      expected_mock_calls.append(
+          mock.call(mock.ANY, [
+              _HasAllOf(
+                  'call_variants', 'gcr.io/dockerimage_gpu', 'nvidia-tesla-k80',
+                  'SHARDS={}'.format(expected_shards),
+                  'CALL_VARIANTS_SHARDS={}'.format(expected_workers),
+                  'CALL_VARIANTS_SHARD_INDEX={}'.format(i),
+                  'EXAMPLES=gs://bucket/staging/examples/{}/*'.format(i),
+                  'CALLED_VARIANTS=gs://bucket/staging/called_variants/*',
+                  '--machine-type', 'custom-{c}-{r}'.format(
+                      c=expected_cores, r=expected_ram), '--disk-size', '50'),
+              'gs://bucket/staging/logs/call_variants/{}'.format(i)
+          ]))
+
+    gcp_deepvariant_runner.run(self._argv)
+    mock_apply_async.assert_has_calls(expected_mock_calls,)
+    self.assertEqual(mock_apply_async.call_count, expected_workers)
+
+  @mock.patch.object(multiprocessing, 'Pool')
+  @mock.patch('gcp_deepvariant_runner._can_write_to_bucket')
+  @mock.patch('gcp_deepvariant_runner._get_gcs_object_size')
+  def testRunSetOptimizedFlagsBasedOnBamSizeWgsLarge_callVariants(
+      self, mock_object_size, mock_can_write_to_bucket, mock_pool):
+    mock_apply_async = mock_pool.return_value.apply_async
+    mock_apply_async.return_value = None
+    mock_can_write_to_bucket.return_value = True
+
+    mock_object_size.return_value = 200 * 1024 * 1024 * 1024 + 1
+    expected_workers = 8
+    expected_cores = 2
+    expected_shards = 128  # equals make_examples: num_workers * num_cores
+    expected_ram = expected_cores * gcp_deepvariant_runner._RAM_PER_CORE * 1024
+    self._update_argv(gcp_deepvariant_runner._BAM_FILE_SUFFIX,
+                      gcp_deepvariant_runner._WGS_STANDARD,
+                      ['--jobs_to_run',
+                       'call_variants',
+                       '--set_optimized_flags_based_on_bam_size',
+                       '--docker_image_gpu',
+                       'gcr.io/dockerimage_gpu'])
+    expected_mock_calls = []
+    for i in range(expected_workers):
+      expected_mock_calls.append(
+          mock.call(mock.ANY, [
+              _HasAllOf(
+                  'call_variants', 'gcr.io/dockerimage_gpu', 'nvidia-tesla-k80',
+                  'SHARDS={}'.format(expected_shards),
+                  'CALL_VARIANTS_SHARDS={}'.format(expected_workers),
+                  'CALL_VARIANTS_SHARD_INDEX={}'.format(i),
+                  'EXAMPLES=gs://bucket/staging/examples/{}/*'.format(i),
+                  'CALLED_VARIANTS=gs://bucket/staging/called_variants/*',
+                  '--machine-type', 'custom-{c}-{r}'.format(
+                      c=expected_cores, r=expected_ram), '--disk-size', '50'),
+              'gs://bucket/staging/logs/call_variants/{}'.format(i)
+          ]))
+
+    gcp_deepvariant_runner.run(self._argv)
+    mock_apply_async.assert_has_calls(expected_mock_calls,)
+    self.assertEqual(mock_apply_async.call_count, expected_workers)
+
+  @mock.patch('gcp_deepvariant_runner._run_job')
+  @mock.patch('gcp_deepvariant_runner._can_write_to_bucket')
+  @mock.patch('gcp_deepvariant_runner._get_gcs_object_size')
+  def testRunSetOptimizedFlagsBasedOnBamSizeWgsSmall_postProcessVariants(
+      self, mock_object_size, mock_can_write_to_bucket, mock_run_job):
+    mock_can_write_to_bucket.return_value = True
+    mock_object_size.return_value = 25 * 1024 * 1024 * 1024 - 1
+    call_variant_workers = 2
+    expected_shards = 32  # equals make_examples: num_workers * num_cores
+    # Default flag values.
+    expected_cores = 4
+    expected_ram = 16 * 1024
+    expected_gvcf_disk = gcp_deepvariant_runner._POSTPROCESS_VARIANTS_DISK_GVCF
+    self._update_argv(gcp_deepvariant_runner._BAM_FILE_SUFFIX,
+                      gcp_deepvariant_runner._WGS_STANDARD,
+                      ['--jobs_to_run',
+                       'postprocess_variants',
+                       '--set_optimized_flags_based_on_bam_size',
+                       '--docker_image_gpu',
+                       'gcr.io/dockerimage_gpu',
+                       '--gvcf_outfile',
+                       'gvcf-folder-path'])
+    gcp_deepvariant_runner.run(self._argv)
+    mock_run_job.assert_called_once_with(
+        _HasAllOf(
+            'postprocess_variants', 'gcr.io/dockerimage',
+            'CALLED_VARIANTS=gs://bucket/staging/called_variants/*',
+            'SHARDS={}'.format(expected_shards),
+            'CALL_VARIANTS_SHARDS={}'.format(call_variant_workers),
+            '--machine-type', 'custom-{c}-{r}'.format(
+                c=expected_cores,
+                r=expected_ram), '--disk-size', '{}'.format(expected_gvcf_disk),
+            'OUTFILE=gs://bucket/output.vcf', 'GVCF_OUTFILE=gvcf-folder-path'),
+        'gs://bucket/staging/logs/postprocess_variants')
+
+  @mock.patch('gcp_deepvariant_runner._run_job')
+  @mock.patch('gcp_deepvariant_runner._can_write_to_bucket')
+  @mock.patch('gcp_deepvariant_runner._get_gcs_object_size')
+  def testRunSetOptimizedFlagsBasedOnBamSizeWgsLarge_postProcessVariants(
+      self, mock_object_size, mock_can_write_to_bucket, mock_run_job):
+    mock_can_write_to_bucket.return_value = True
+    mock_object_size.return_value = 200 * 1024 * 1024 * 1024 + 1
+    call_variant_workers = 8
+    expected_shards = 128  # equals make_examples: num_workers * num_cores
+    # Default flag values.
+    expected_cores = 4
+    expected_ram = 16 * 1024
+    expected_disk = 30
+    self._update_argv(gcp_deepvariant_runner._BAM_FILE_SUFFIX,
+                      gcp_deepvariant_runner._WGS_STANDARD,
+                      ['--jobs_to_run',
+                       'postprocess_variants',
+                       '--set_optimized_flags_based_on_bam_size',
+                       '--docker_image_gpu',
+                       'gcr.io/dockerimage_gpu'])
+    gcp_deepvariant_runner.run(self._argv)
+    mock_run_job.assert_called_once_with(
+        _HasAllOf('postprocess_variants', 'gcr.io/dockerimage',
+                  'CALLED_VARIANTS=gs://bucket/staging/called_variants/*',
+                  'SHARDS={}'.format(expected_shards),
+                  'CALL_VARIANTS_SHARDS={}'.format(call_variant_workers),
+                  '--machine-type', 'custom-{c}-{r}'.format(
+                      c=expected_cores, r=expected_ram), '--disk-size',
+                  '{}'.format(expected_disk), 'OUTFILE=gs://bucket/output.vcf'),
+        'gs://bucket/staging/logs/postprocess_variants')
+
 
 class UtilsTest(unittest.TestCase):