Skip to content

Commit

Permalink
Revert "Avoid non-preemptible retries when '--preemptible' (#18)" (#24)
Browse files Browse the repository at this point in the history
This reverts commit 72d3f90.
Why: without non-preemptible retries, especially when we use many workers
for the make_examples stage (such as 64 or 128), there is a high chance that one
of the workers gets preempted more than --attempts times. This will cause the
whole operation to fail.

A more efficient recovery would be to have a final retry using a
non-preemptible worker to avoid wasting all the work done by other workers.
  • Loading branch information
samanvp authored Jul 16, 2019
1 parent 72d3f90 commit b1cbbc9
Show file tree
Hide file tree
Showing 2 changed files with 41 additions and 88 deletions.
106 changes: 41 additions & 65 deletions gcp_deepvariant_runner.py
Original file line number Diff line number Diff line change
Expand Up @@ -192,19 +192,17 @@ def _get_staging_called_variants_folder(pipeline_args):

def _get_base_job_args(pipeline_args):
"""Base arguments that are common among all jobs."""
pvm_attempts = 0
if pipeline_args.preemptible:
attempts_args = ['--attempts', '0',
'--pvm-attempts', str(pipeline_args.attempts)]
else:
attempts_args = ['--attempts', str(pipeline_args.attempts),
'--pvm-attempts', '0']

job_args = (['pipelines', '--project', pipeline_args.project, 'run'] +
attempts_args +
['--boot-disk-size', _DEFAULT_BOOT_DISK_SIZE_GB,
'--output-interval',
str(pipeline_args.logging_interval_sec) + 's', '--zones'] +
pipeline_args.zones)
pvm_attempts = pipeline_args.max_preemptible_tries

job_args = [
'pipelines', '--project', pipeline_args.project, 'run', '--attempts',
str(pipeline_args.max_non_preemptible_tries), '--pvm-attempts',
str(pvm_attempts), '--boot-disk-size', _DEFAULT_BOOT_DISK_SIZE_GB,
'--output-interval',
str(pipeline_args.logging_interval_sec) + 's', '--zones'
] + pipeline_args.zones
if pipeline_args.network:
job_args.extend(['--network', pipeline_args.network])
if pipeline_args.subnetwork:
Expand Down Expand Up @@ -515,10 +513,15 @@ def _deploy_call_variants_pod(pod_name, cluster, pipeline_args):
pipeline_args.preemptible else 'cloud-tpus.google.com/v2'),
BATCH_SIZE=pipeline_args.call_variants_batch_size)

if pipeline_args.preemptible:
num_tries = pipeline_args.max_preemptible_tries
else:
num_tries = pipeline_args.max_non_preemptible_tries

cluster.deploy_pod(
pod_config=pod_config,
pod_name=pod_name,
retries=pipeline_args.attempts - 1,
retries=num_tries - 1,
wait=True)


Expand Down Expand Up @@ -678,31 +681,10 @@ def _validate_and_complete_args(pipeline_args):
raise ValueError(
'--job_name_prefix must meet GCP label restrictions: {}'.format(
pipeline_args.job_name_prefix))
# TODO(#22): Modify this logic after deprecating the two flags.
if (pipeline_args.max_non_preemptible_tries > 0 or
pipeline_args.max_preemptible_tries > 0):
logging.warning('These two flags will be deprecated soon: '
'\n--max_non_preemptible_tries \n --max_preemptible_tries '
'\nPlease instead use --attempts and --preemptible flags.')
if pipeline_args.attempts > 0:
raise ValueError(
'--attempts cannot be used with --max_non_preemptible_tries or '
'--max_preemptible_tries flags. Please set those two to zero.')
if pipeline_args.preemptible:
if pipeline_args.max_preemptible_tries <= 0:
raise ValueError('--max_preemptible_tries must be greater than zero '
'when --preemptible is requested.')
else:
pipeline_args.attempts = pipeline_args.max_preemptible_tries
else:
if pipeline_args.max_non_preemptible_tries <= 0:
raise ValueError('--max_non_preemptible_tries must be greater zero. ')
else:
pipeline_args.attempts = pipeline_args.max_non_preemptible_tries
else:
if pipeline_args.attempts <= 0:
raise ValueError('--attempts must be greater than zero.')

if pipeline_args.preemptible and pipeline_args.max_preemptible_tries <= 0:
raise ValueError('--max_preemptible_tries must be greater than zero.')
if pipeline_args.max_non_preemptible_tries <= 0:
raise ValueError('--max_non_preemptible_tries must be greater than zero.')
if pipeline_args.make_examples_workers <= 0:
raise ValueError('--make_examples_workers must be greater than zero.')
if pipeline_args.call_variants_workers <= 0:
Expand Down Expand Up @@ -893,37 +875,16 @@ def run(argv=None):
help=('Optional space-separated list of regions to process. Elements can '
'be region literals (chr20:10-20) or Google Cloud Storage paths '
'to BED/BEDPE files.'))
parser.add_argument(
'--preemptible',
default=False,
action='store_true',
help=('Use preemptible VMs for the pipeline and all re-tries in case '
'--attempts is set to larger than 1.'))
parser.add_argument(
'--attempts',
type=int,
default=0,
help=('Maximum number of times to attempt running each worker (within a '
'job). Depending on --preemptible flag re-attempts will be using '
'regular (non-preemptible) VMs or preemptible VMs. Note regular '
'VMs may still crash unexpectedly, so it may be worth to retry on '
'transient failures.'))
# TODO(#22): deprecate this flag in next release in favour of --attempts.
parser.add_argument(
'--max_non_preemptible_tries',
type=int,
default=2,
help=('WARNING: This flag is deprecated and will be removed in a future '
'release. Use --attempts and --preemptible to control retry '
'behaviour.'))
# TODO(#22): deprecate this flag in next release in favour of --attempts.
parser.add_argument(
'--max_preemptible_tries',
type=int,
default=3,
help=('WARNING: This flag is deprecated and will be removed in a future '
'release. Use --attempts and --preemptible to control retry '
'behaviour.'))
help=('Maximum number of times to try running each worker (within a job) '
'with regular (non-preemptible) VMs. Regular VMs may still crash '
'unexpectedly, so it may be worth to retry on transient failures. '
'Note that if max_preemptible_tries is also specified, then '
'the pipeline would first be run with preemptible VMs, and then '
'with regular VMs following the value provided here.'))
parser.add_argument(
'--network', help=('Optional. The VPC network on GCP to use.'))
parser.add_argument(
Expand Down Expand Up @@ -971,6 +932,21 @@ def run(argv=None):
help=('GKE cluster zone used for searching an existing cluster or '
'creating a new one. This is relevant only if --tpu is set.'))

# Optional preemptible args.
parser.add_argument(
'--preemptible',
default=False,
action='store_true',
help=('Use preemptible VMs for the pipeline.'))
parser.add_argument(
'--max_preemptible_tries',
type=int,
default=3,
help=('Maximum number of times to try running each worker (within a job) '
'with preemptible VMs. Regular VMs will be used (for the '
'particular shards assigned to that worker) after this many '
'preemptions.'))

# Optional pipeline sharding and machine shapes.
parser.add_argument(
'--shards',
Expand Down
23 changes: 0 additions & 23 deletions gcp_deepvariant_runner_test.py
Original file line number Diff line number Diff line change
Expand Up @@ -116,9 +116,6 @@ def setUp(self):
'gs://bucket/bam',
'--ref',
'gs://bucket/ref',
'--attempts',
'2',
'--max_preemptible_tries', '0', '--max_non_preemptible_tries', '0'
]

@mock.patch('gcp_deepvariant_runner._run_job')
Expand All @@ -143,7 +140,6 @@ def testRunPipeline(self, mock_can_write_to_bucket, mock_obj_exist, mock_pool,
'INPUT_REF=gs://bucket/ref',
'INPUT_REF_FAI=gs://bucket/ref.fai',
'EXAMPLES=gs://bucket/staging/examples/0/*',
'--attempts', '2', '--pvm-attempts', '0',
'--output-interval', '60s'),
'gs://bucket/staging/logs/make_examples/0'
]),
Expand All @@ -152,7 +148,6 @@ def testRunPipeline(self, mock_can_write_to_bucket, mock_obj_exist, mock_pool,
'MODEL=gs://bucket/model',
'EXAMPLES=gs://bucket/staging/examples/0/*',
'CALLED_VARIANTS=gs://bucket/staging/called_variants/*',
'--attempts', '2', '--pvm-attempts', '0',
'--output-interval', '60s'),
'gs://bucket/staging/logs/call_variants/0'
]),
Expand All @@ -162,7 +157,6 @@ def testRunPipeline(self, mock_can_write_to_bucket, mock_obj_exist, mock_pool,
mock_run_job.assert_called_once_with(
_HasAllOf('postprocess_variants', 'gcr.io/dockerimage',
'CALLED_VARIANTS=gs://bucket/staging/called_variants/*',
'--attempts', '2', '--pvm-attempts', '0',
'OUTFILE=gs://bucket/output.vcf', '--output-interval', '60s'),
'gs://bucket/staging/logs/postprocess_variants')

Expand Down Expand Up @@ -196,15 +190,13 @@ def testRunPipeline_WithGVCFOutFile(self, mock_can_write_to_bucket,
'INPUT_REF=gs://bucket/ref',
'INPUT_REF_FAI=gs://bucket/ref.fai',
'EXAMPLES=gs://bucket/staging/examples/0/*',
'--attempts', '2', '--pvm-attempts', '0',
'GVCF=gs://bucket/staging/gvcf/*'),
'gs://bucket/staging/logs/make_examples/0'
]),
mock.call(mock_run_job, [
_HasAllOf('call_variants', 'gcr.io/dockerimage',
'MODEL=gs://bucket/model',
'EXAMPLES=gs://bucket/staging/examples/0/*',
'--attempts', '2', '--pvm-attempts', '0',
'CALLED_VARIANTS=gs://bucket/staging/called_variants/*'),
'gs://bucket/staging/logs/call_variants/0'
]),
Expand All @@ -216,7 +208,6 @@ def testRunPipeline_WithGVCFOutFile(self, mock_can_write_to_bucket,
'CALLED_VARIANTS=gs://bucket/staging/called_variants/*',
'OUTFILE=gs://bucket/output.vcf',
'GVCF=gs://bucket/staging/gvcf/*',
'--attempts', '2', '--pvm-attempts', '0',
'GVCF_OUTFILE=gs://bucket/gvcf_output.vcf'),
'gs://bucket/staging/logs/postprocess_variants')

Expand Down Expand Up @@ -265,7 +256,6 @@ def testRunMakeExamples(self, mock_can_write_to_bucket, mock_obj_exist,
'INPUT_BAI=gs://bucket/bam.bai',
'INPUT_REGIONS_0=gs://bucket/region-1.bed',
'INPUT_REGIONS_1=gs://bucket/region-2.bed',
'--attempts', '2', '--pvm-attempts', '0',
new_json_files[0]),
'gs://bucket/staging/logs/make_examples/0'
]),
Expand All @@ -276,7 +266,6 @@ def testRunMakeExamples(self, mock_can_write_to_bucket, mock_obj_exist,
'INPUT_BAI=gs://bucket/bam.bai',
'INPUT_REGIONS_0=gs://bucket/region-1.bed',
'INPUT_REGIONS_1=gs://bucket/region-2.bed',
'--attempts', '2', '--pvm-attempts', '0',
new_json_files[1]),
'gs://bucket/staging/logs/make_examples/1'
]),
Expand All @@ -287,7 +276,6 @@ def testRunMakeExamples(self, mock_can_write_to_bucket, mock_obj_exist,
'INPUT_BAI=gs://bucket/bam.bai',
'INPUT_REGIONS_0=gs://bucket/region-1.bed',
'INPUT_REGIONS_1=gs://bucket/region-2.bed',
'--attempts', '2', '--pvm-attempts', '0',
new_json_files[2]),
'gs://bucket/staging/logs/make_examples/2'
]),
Expand Down Expand Up @@ -359,7 +347,6 @@ def testRunMakeExamples_WithGcsfuse(self, mock_can_write_to_bucket,
'EXAMPLES=gs://bucket/staging/examples/0/*',
'INPUT_REF=gs://bucket/ref',
'INPUT_BAI=gs://bucket/bam.bai',
'--attempts', '2', '--pvm-attempts', '0',
new_json_files[0]),
'gs://bucket/staging/logs/make_examples/0'
]),
Expand All @@ -368,7 +355,6 @@ def testRunMakeExamples_WithGcsfuse(self, mock_can_write_to_bucket,
'EXAMPLES=gs://bucket/staging/examples/0/*',
'INPUT_REF=gs://bucket/ref',
'INPUT_BAI=gs://bucket/bam.bai',
'--attempts', '2', '--pvm-attempts', '0',
new_json_files[1]),
'gs://bucket/staging/logs/make_examples/1'
]),
Expand All @@ -377,7 +363,6 @@ def testRunMakeExamples_WithGcsfuse(self, mock_can_write_to_bucket,
'EXAMPLES=gs://bucket/staging/examples/1/*',
'INPUT_REF=gs://bucket/ref',
'INPUT_BAI=gs://bucket/bam.bai',
'--attempts', '2', '--pvm-attempts', '0',
new_json_files[2]),
'gs://bucket/staging/logs/make_examples/2'
]),
Expand All @@ -386,7 +371,6 @@ def testRunMakeExamples_WithGcsfuse(self, mock_can_write_to_bucket,
'EXAMPLES=gs://bucket/staging/examples/1/*',
'INPUT_REF=gs://bucket/ref',
'INPUT_BAI=gs://bucket/bam.bai',
'--attempts', '2', '--pvm-attempts', '0',
new_json_files[3]),
'gs://bucket/staging/logs/make_examples/3'
]),
Expand Down Expand Up @@ -466,19 +450,16 @@ def testRunCallVariants(self, mock_can_write_to_bucket, mock_obj_exist,
mock_apply_async.assert_has_calls([
mock.call(mock.ANY, [
_HasAllOf('call_variants', 'gcr.io/dockerimage',
'--attempts', '2', '--pvm-attempts', '0',
'CALL_VARIANTS_SHARD_INDEX=0'),
'gs://bucket/staging/logs/call_variants/0'
]),
mock.call(mock.ANY, [
_HasAllOf('call_variants', 'gcr.io/dockerimage',
'--attempts', '2', '--pvm-attempts', '0',
'CALL_VARIANTS_SHARD_INDEX=1'),
'gs://bucket/staging/logs/call_variants/1'
]),
mock.call(mock.ANY, [
_HasAllOf('call_variants', 'gcr.io/dockerimage',
'--attempts', '2', '--pvm-attempts', '0',
'CALL_VARIANTS_SHARD_INDEX=2'),
'gs://bucket/staging/logs/call_variants/2'
]),
Expand Down Expand Up @@ -514,19 +495,16 @@ def testRunCallVariants_GPU(self, mock_can_write_to_bucket, mock_obj_exist,
mock_apply_async.assert_has_calls([
mock.call(mock.ANY, [
_HasAllOf('call_variants', 'gcr.io/dockerimage_gpu',
'--attempts', '2', '--pvm-attempts', '0',
'nvidia-tesla-k80', 'CALL_VARIANTS_SHARD_INDEX=0'),
'gs://bucket/staging/logs/call_variants/0'
]),
mock.call(mock.ANY, [
_HasAllOf('call_variants', 'gcr.io/dockerimage_gpu',
'--attempts', '2', '--pvm-attempts', '0',
'nvidia-tesla-k80', 'CALL_VARIANTS_SHARD_INDEX=1'),
'gs://bucket/staging/logs/call_variants/1'
]),
mock.call(mock.ANY, [
_HasAllOf('call_variants', 'gcr.io/dockerimage_gpu',
'--attempts', '2', '--pvm-attempts', '0',
'nvidia-tesla-k80', 'CALL_VARIANTS_SHARD_INDEX=2'),
'gs://bucket/staging/logs/call_variants/2'
]),
Expand Down Expand Up @@ -615,7 +593,6 @@ def testRunPostProcessVariants(self, mock_can_write_to_bucket, mock_obj_exist,
'CALLED_VARIANTS=gs://bucket/staging/called_variants/*',
'INPUT_REF=gs://bucket/ref', 'SHARDS=15',
'CALL_VARIANTS_SHARDS=1', 'INPUT_REF_FAI=gs://bucket/ref.fai',
'--attempts', '2', '--pvm-attempts', '0',
'OUTFILE=gs://bucket/output.vcf'),
'gs://bucket/staging/logs/postprocess_variants')

Expand Down

0 comments on commit b1cbbc9

Please sign in to comment.