
Automatic flags based on size of BAM file #11

Open · wants to merge 9 commits into master
Conversation

@samanvp (Member) commented Mar 22, 2019

In order to make DeepVariant easier to run for new users, we introduced an automatic method for setting the computational flags based on the size of the input BAM file.

This is our best effort to reduce the cost of running DeepVariant. We ran extensive profiling experiments to find the optimal settings for both WGS and WES inputs.
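To illustrate the idea, here is a minimal, hypothetical sketch of picking a flag preset from the BAM size. The category names mirror the `BamCategories` entries visible in the diff below, but the size thresholds here are placeholders, not the tuned values from this PR.

```python
# Illustrative sketch only: the thresholds are placeholders; the real
# boundaries and per-category defaults live in _DEFAULT_FLAGS / BamCategories.
_GB = 2 ** 30


def choose_bam_category(bam_size_bytes, is_wgs):
  """Maps a BAM file size to a flag-preset category (illustrative)."""
  if not is_wgs:
    return 'WES_SMALL' if bam_size_bytes < 10 * _GB else 'WES_LARGE'
  if bam_size_bytes < 100 * _GB:
    return 'WGS_SMALL'
  if bam_size_bytes < 300 * _GB:
    return 'WGS_MEDIUM'
  return 'WGS_LARGE'
```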

'tpu': True
}
_DEFAULT_FLAGS[BamCategories.WGS_LARGE] = {
'make_examples_workers': 128,


Copy-pasting from the closed PR:

"
nmousavi@
128 looks large to me. My concern is that one can go out of IP quota if it runs several DV at the same time. Maybe we should recommend a different soln for large batch of DV runs in our cloud page, where the different soln uses less workers but more cores. WDYT?

@samanvp
I agree this is not scalable for batch runs of DV. We should emphasize in our documentations this flag should only be used for single runs. For batch runs, even 32 workers does not scale for hundreds of input BAM files.

I believe this flag will be very helpful for first time users of DV. When a customer reaches a point where they feel comfortable to run DV for hundreds of BAM file, hopefully they have enough experience to set these flags optimally based on their specific needs. For example:

If cost is the main deciding factor for a customer, still the most cost efficient solution is to run DV using 128 workers, sequentially one BAM file at a time. This approach will take weeks to finish hundreds of samples.
For another customer, weeks of processing might not be acceptable, so they will be running multiple expensive runs (using fewer workers with more cores) in parallel to finish their batch job within acceptable time frame.
I really don't think we should/can make that decision for our customers.
"


What is the cost difference between the 64-worker and 128-worker configs? If the cost gain is not much (say less than 5%), I think it's better to use fewer workers.

@samanvp (Member, Author)


For a 550 GB BAM file, moving from 64 to 128 workers reduces the processing time by more than 1.5 hours (5.83 -> 4.32 hours) and also reduces the cost by more than $2 ($8.32 -> $6.15). So I think it is worth it.

Even for a 91 GB BAM file, moving from 64 to 128 workers reduces the processing time (2.24 -> 1.25 hours) without much effect on the cost ($1.65 -> $1.77).
So I believe for large BAM files we should keep it at 128.
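The same measurements in table form:

| BAM size | Workers | Time (hours) | Cost (USD) |
|---|---|---|---|
| 550 GB | 64 | 5.83 | 8.32 |
| 550 GB | 128 | 4.32 | 6.15 |
| 91 GB | 64 | 2.24 | 1.65 |
| 91 GB | 128 | 1.25 | 1.77 |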

storage_client = storage.Client()
bucket_name = _get_gcs_bucket(gcs_obj_path)
obj_name = _get_gcs_relative_path(gcs_obj_path)
bucket = storage_client.get_bucket(bucket_name)


Original comments:

"
nmousavi@
Do this and the call below throw exceptions? If so, either catch them or note it in the comment.

@samanvp
Good point; I reuse the existing helper function.
"


Did you change it?

@samanvp (Member, Author)


I added an if statement at the beginning of this method:
if not _gcs_object_exist(gcs_obj_path):
That method checks for the existence of the file and catches all possible exceptions. If it returns True, we will not get any exception here.


Ah, I see. This function is very similar to _gcs_object_exist(). I would put the shared pieces into _get_gcs_obj() and then call .size and .exists() on the result in these two methods.

@samanvp (Member, Author)


I got rid of the redundant code by implementing _gcs_object_exist() simply as a check of whether the size is != 0.
I think this is a better way to deal with all the complexity of the exceptions raised by the storage module and the NoneType returned when a blob is missing. Otherwise, we would have to repeat a check after each call:

obj = _get_gcs_obj(...)
if obj is None:
  ...
else:
  ...

Also, I found a bug in _is_valid_gcs_path(): it currently returns True for gs:/test-bucket/ (note the missing '/'), which causes an exception further down when the URL is parsed.
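For reference, here is why the malformed path slips through the old scheme-only check (a quick check using Python 3's urllib.parse; the runner itself uses the equivalent urlparse module):

```python
import urllib.parse as urlparse

# With the '/' missing after 'gs:', the scheme still parses as 'gs',
# but the network location (the bucket name) comes back empty.
assert urlparse.urlparse('gs:/test-bucket/').scheme == 'gs'
assert urlparse.urlparse('gs:/test-bucket/').netloc == ''

# A well-formed GCS path has both a 'gs' scheme and a non-empty netloc.
assert urlparse.urlparse('gs://test-bucket/obj.bam').netloc == 'test-bucket'
```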


This is not correct; we can have an object of size 0 on GCS. In other words, size == 0 doesn't imply non-existence.

Also, _get_gcs_obj() should not return a null obj; that should be part of its design.

@samanvp (Member, Author)


We discussed offline and decided to remove the _gcs_object_exist() method and rely only on _get_gcs_object_size() for validating the flag values.

'provided with --set_optimized_flags_based_on_bam_size')
is_wes = pipeline_args.model.find(_WES_STANDARD) != -1
is_wgs = pipeline_args.model.find(_WGS_STANDARD) != -1
if is_wes == is_wgs:


Original comments:

"
nmousavi@
I don't think this check belongs here. Maybe move it to where we validate flags.

Anyway, I think it should be as below:

if not is_wes and not is_wgs:
....

It's possible to have both is_wes and is_wgs be true, though that would be a bug.

@samanvp
Here we need is_wes XOR is_wgs, meaning that exactly one of the two must be True.

As for placement, note that _set_computational_flags_based_on_bam_size is run before flag validation. This is a conscious decision: we should first run this code to set all the flags automatically and then validate the flags, regardless of whether they were set manually by the user or automatically.

Please let me know if you think this needs to change.
"


Sorry, I was not clear on the first point. What I meant is that the two cases (not is_wes and not is_wgs) and (is_wes and is_wgs) are different, and you may want to show a different error for each.

Re the second point:
I am not sure why flag validation has to be done after _set_computational_flags...(). Some flags have to be validated before setting the computational flags, and that's why these two concerns are mixed here. Can we have all flags validated first, and then set the computational flags? Or do you think that once our code sets the computational flags, we need to validate them again? Why? In what I am proposing, the two concerns (flag validation and setting computational flags) are not mixed.

@samanvp (Member, Author)


I agree with the separation of concerns; I moved those checks to _validate_and_complete_args(...).

Instead of the XOR, I also added two separate checks.
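Roughly, the two separate checks could look like the sketch below; the error messages and exact placement inside _validate_and_complete_args are illustrative, not the PR's actual wording.

```python
# Illustrative sketch of replacing the XOR with two explicit checks;
# the real checks live in _validate_and_complete_args in this PR.
if is_wes and is_wgs:
  raise ValueError(
      '--model matches both the WES and WGS standard models; it must match '
      'exactly one when --set_optimized_flags_based_on_bam_size is used.')
if not is_wes and not is_wgs:
  raise ValueError(
      '--model matches neither the WES nor the WGS standard model, so the '
      'optimized flags cannot be chosen automatically.')
```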


@samanvp left a comment


Thanks Nima.


def _gcs_object_exist(gcs_obj_path):
"""Returns true if the given path is a valid object on GCS.
def _get_gcs_object_size(gcs_obj_path):
"""Returns the size of the given GCS object.


Please add a note that 0 is returned in the case of failure.

@samanvp (Member, Author)


done.

logging.error('Unable to access GCS bucket: %s', str(e))
return 0

blob = bucket.get_blob(obj_name)


Does this raise an exception when a null blob is returned?
We had it in a try block before, but now it's not guarded.

@samanvp (Member, Author)


No, it does not raise an exception. Earlier, _gcs_object_exist() used bucket.blob(), which does not make an HTTP request; it simply instantiates a blob object. Calling blob.exists() on it would then raise an exception if the blob is missing or access is denied.
Here we use bucket.get_blob(), which returns the blob object if it exists and otherwise returns None.
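Putting the pieces of this thread together, a minimal sketch of the resulting behavior might look like the following. The stand-ins for _get_gcs_bucket() and _get_gcs_relative_path() are simplified assumptions for illustration, and the exact exceptions caught may differ from the runner's actual code.

```python
import logging
import urllib.parse as urlparse

from google.cloud import exceptions, storage


def _get_gcs_bucket(gcs_obj_path):
  # Simplified stand-in: 'gs://bucket/dir/obj.bam' -> 'bucket'.
  return urlparse.urlparse(gcs_obj_path).netloc


def _get_gcs_relative_path(gcs_obj_path):
  # Simplified stand-in: 'gs://bucket/dir/obj.bam' -> 'dir/obj.bam'.
  return urlparse.urlparse(gcs_obj_path).path.lstrip('/')


def _get_gcs_object_size(gcs_obj_path):
  """Returns the size of the given GCS object in bytes, or 0 on failure."""
  try:
    storage_client = storage.Client()
    # get_bucket() makes an HTTP request and raises if the bucket is
    # missing or access is denied.
    bucket = storage_client.get_bucket(_get_gcs_bucket(gcs_obj_path))
  except (exceptions.NotFound, exceptions.Forbidden) as e:
    logging.error('Unable to access GCS bucket: %s', str(e))
    return 0
  # get_blob() also makes an HTTP request, but returns None for a missing
  # object instead of raising, so no try block is needed here.
  blob = bucket.get_blob(_get_gcs_relative_path(gcs_obj_path))
  return blob.size if blob is not None else 0
```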


@nmousavi left a comment


Please run it on GCP manually before submitting this PR. LGTM.

@@ -256,25 +314,34 @@ def _is_valid_gcs_path(gcs_path):
   Args:
     gcs_path: (str) a path to directory or an obj on GCS.
   """
-  return urlparse.urlparse(gcs_path).scheme == 'gs'
+  return (urlparse.urlparse(gcs_path).scheme == 'gs' and
+          urlparse.urlparse(gcs_path).netloc)


This change (and the other related parts of that commit) should either be a separate PR, or you should squash your commits so the history is meaningful afterwards.

@samanvp (Member, Author)


Done in #12


@samanvp left a comment


In the last commit I modified the code so that we use GPUs for all BAM file sizes in WGS analysis.
We will switch back to TPU for WGS medium and large later, when our TPU offering is more reliable.


gcp_deepvariant_runner.run(self._argv)

def testRunFailsSetOptimizedFlagsMissingBamFile(self):
self._argv = [


I would very much like to see this duplication removed and replaced with a utility function that generates these arguments, so that we only see the parts that are expected to differ in each test.

@samanvp (Member, Author)


done.


@samanvp left a comment


I added a new method, _update_argv(), and now each unit test calls it only to modify/extend self._argv as needed for that test.
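As a rough illustration of the pattern (the baseline arguments and flag values below are hypothetical; the real tests use the runner's full argument list):

```python
import unittest


class DeepVariantRunnerTest(unittest.TestCase):
  """Sketch of sharing a common argv baseline across tests."""

  def setUp(self):
    # Hypothetical common baseline shared by all tests.
    self._argv = [
        '--project', 'test-project',
        '--docker_image', 'test-image',
    ]

  def _update_argv(self, *extra_args):
    # Each test adds only the flags it actually cares about.
    self._argv.extend(extra_args)

  def testRunFailsSetOptimizedFlagsMissingBamFile(self):
    self._update_argv('--set_optimized_flags_based_on_bam_size',
                      '--bam', 'gs://test-bucket/missing.bam')
    # The real test would now assert that gcp_deepvariant_runner.run(self._argv)
    # raises the expected error for the missing BAM file.
```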


And also fix the unit tests to reflect this change.
Also update some of the computational flags to use resources more efficiently.
...instead of the soon-to-be-deprecated flags --max_non_preemptible_tries and --max_preemptible_tries.
Due to the high preemption rate of single-core machines, especially when the zone we use to run the job is busy, I decided to use 2-core workers. This might increase the cost a bit (compared to finishing the job with single-core machines), but it avoids cases where the overall cost of preemptible machines approaches the cost of non-preemptible machines.