Add ALWAYS_CONTINUE (#169)
* Update run.py

* Update config.py

* Update cp-worker.py

* add JOB_RETRIES, docs

---------

Co-authored-by: ErinWeisbart <[email protected]>
bethac07 and ErinWeisbart authored Jul 15, 2024
1 parent f45fd58 commit 707b3cd
Showing 7 changed files with 35 additions and 5 deletions.
4 changes: 4 additions & 0 deletions config.py
@@ -35,6 +35,7 @@
SQS_QUEUE_NAME = APP_NAME + 'Queue'
SQS_MESSAGE_VISIBILITY = 1*60 # Timeout (secs) for messages in flight (average time to be processed)
SQS_DEAD_LETTER_QUEUE = 'user_DeadMessages'
JOB_RETRIES = 3 # Number of times to retry a job before sending it to DEAD_LETTER_QUEUE

# MONITORING
AUTO_MONITOR = 'True'
@@ -49,6 +50,9 @@
MIN_FILE_SIZE_BYTES = 1 #What is the minimal number of bytes an object should be to "count"?
NECESSARY_STRING = '' #Is there any string that should be in the file name to "count"?

# CELLPROFILER SETTINGS
ALWAYS_CONTINUE = 'False' # Whether or not to run CellProfiler with the --always-continue flag, which will keep CellProfiler from crashing if it errors

# PLUGINS
USE_PLUGINS = 'False' # True to use any plugin from CellProfiler-plugins repo
UPDATE_PLUGINS = 'False' # True to download updates from CellProfiler-plugins repo
7 changes: 7 additions & 0 deletions documentation/DCP-documentation/SQS_QUEUE_information.md
@@ -68,6 +68,13 @@ To confirm that multiple Dockers are never processing the same job, you can keep
Once you have run a pipeline once, you can check its execution time (either by noting how long after you started your jobs the first ones begin to finish, or by checking the logs of individual jobs and noting the start and end times). You will then have an accurate idea of roughly how long that pipeline needs to execute, and can set your message visibility accordingly.
You can even do this on the fly while jobs are currently processing; the updated visibility time won’t affect the jobs already out for processing (i.e. if the time was set to 3 hours and you change it to 1 hour, the jobs already processing will remain hidden for 3 hours or until finished), but any job that begins processing AFTER the change will use the new visibility timeout setting.

## JOB_RETRIES

**JOB_RETRIES** is the number of times that a job will be retried before it is sent to the Dead Letter Queue.
The count increases each time a message goes "In Flight"; once the SQS_MESSAGE_VISIBILITY times out, a message whose count exceeds JOB_RETRIES will not be made "Available" again but will instead go to your SQS_DEAD_LETTER_QUEUE.
We recommend setting this larger than 1 because stochastic job failures are possible (e.g. the EC2 machine running the job becomes unavailable mid-run).
However, allowing large numbers of retries tends to waste compute, as most failure modes are not stochastic.
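In SQS terms, JOB_RETRIES becomes the `maxReceiveCount` of the queue's redrive policy. A minimal sketch of that mapping (the helper name and the ARN below are illustrative, not from the project's code):

```python
import json

# Sketch of how JOB_RETRIES maps onto an SQS redrive policy; the helper
# name and the ARN are illustrative placeholders, not the project's code.
def make_redrive_policy(dead_arn, job_retries):
    # After a message has gone "In Flight" job_retries times without being
    # deleted, SQS routes it to the dead-letter queue instead of re-queueing it.
    return json.dumps(
        {"deadLetterTargetArn": dead_arn, "maxReceiveCount": str(job_retries)}
    )

policy = make_redrive_policy(
    "arn:aws:sqs:us-east-1:123456789012:user_DeadMessages", 3
)
```

The resulting JSON string is what SQS expects as the `RedrivePolicy` queue attribute.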

## Example SQS Queue

[[images/Sample_SQS_Queue.png|alt="Sample_SQS_Queue"]]
4 changes: 2 additions & 2 deletions documentation/DCP-documentation/advanced_configuration.md
@@ -11,8 +11,8 @@ Alternate locations can be designated in the run script.
* **Log configuration and location of exported logs:** Distributed-CellProfiler creates log groups with a default retention of 60 days (to avoid hitting the AWS limit of 250) and after finishing the run exports them into your bucket with a prefix of 'exportedlogs/LOG_GROUP_NAME/'.
These may be modified in the run script.
* **Advanced EC2 configuration:** Any additional configuration of your EC2 spot fleet (such as installing additional packages or running scripts on startup) can be done by modifying the userData parameter in the run script.
* **SQS queue detailed configuration:** Distributed-CellProfiler creates a queue where messages will be tried 10 times before being consigned to a DeadLetterQueue, and unprocessed messages will expire after 14 days (the AWS maximum).
These values can be modified in run.py .
* **SQS queue detailed configuration:** Distributed-CellProfiler creates a queue where unprocessed messages will expire after 14 days (the AWS maximum).
This value can be modified in run.py .
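A sketch (not the project's exact code) of the queue attributes such a setup passes to SQS; the ARN is a placeholder and the numeric values are illustrative defaults:

```python
import json

# Illustrative queue attributes; the ARN is a placeholder and the values
# mirror the documented defaults, not necessarily your configuration.
SQS_MESSAGE_VISIBILITY = 1 * 60
JOB_RETRIES = 3
dead_arn = "arn:aws:sqs:us-east-1:123456789012:user_DeadMessages"

queue_attributes = {
    "MaximumMessageSize": "262144",       # 256 KiB, the SQS maximum
    "MessageRetentionPeriod": "1209600",  # 14 days in seconds (AWS maximum)
    "ReceiveMessageWaitTimeSeconds": "0",
    "RedrivePolicy": json.dumps(
        {"deadLetterTargetArn": dead_arn, "maxReceiveCount": str(JOB_RETRIES)}
    ),
    "VisibilityTimeout": str(SQS_MESSAGE_VISIBILITY),
}
# This dict would be passed as the Attributes argument of
# boto3.client("sqs").create_queue(QueueName=..., Attributes=queue_attributes)
```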

***

2 changes: 2 additions & 0 deletions documentation/DCP-documentation/config_examples.md
@@ -53,13 +53,15 @@ Our internal configurations for each pipeline are as follows:
| SQS_QUEUE_NAME | APP_NAME + 'Queue' | APP_NAME + 'Queue' | APP_NAME + 'Queue' | APP_NAME + 'Queue' | APP_NAME + 'Queue' | We never change this. |
| SQS_MESSAGE_VISIBILITY | 3*60 | 240*60 | 15*60 | 10*60 | 120*60 | About how long you expect a job to take * 1.5 in seconds |
| SQS_DEAD_LETTER_QUEUE | 'YOURNAME_DEADMESSAGES' | 'YOURNAME_DEADMESSAGES' | 'YOURNAME_DEADMESSAGES' | 'YOURNAME_DEADMESSAGES' |'YOURNAME_DEADMESSAGES' | |
| JOB_RETRIES | 3 | 3 | 3 | 3 | 3 | |
| AUTO_MONITOR | 'True' | 'True' | 'True' | 'True' | 'True' | Can be turned off if manually running Monitor. |
| CREATE_DASHBOARD | 'True' | 'True' | 'True' | 'True' | 'True' | |
| CLEAN_DASHBOARD | 'True' | 'True' | 'True' | 'True' | 'True' | |
| CHECK_IF_DONE_BOOL | 'True' | 'True' | 'True' | 'True' | 'True' | Can be turned off if wanting to overwrite old data. |
| EXPECTED_NUMBER_FILES | 1 (an image) | number channels + 1 (an .npy for each channel and isdone) | 3 (Experiment.csv, Image.csv, and isdone) | 1 (an image) | 5 (Experiment, Image, Cells, Nuclei, and Cytoplasm .csvs) | Better to underestimate than overestimate. |
| MIN_FILE_SIZE_BYTES | 1 | 1 | 1 | 1 | 1 | Count files of any size. |
| NECESSARY_STRING | '' | '' | '' | '' | '' | Not necessary for standard workflows. |
| ALWAYS_CONTINUE | 'False' | 'False' | 'False' | 'False' | 'False' | Use with caution. |
| USE_PLUGINS | 'False' | 'False' | 'False' | 'False' | 'False' | Not necessary for standard workflows. |
| UPDATE_PLUGINS | 'False' | 'False' | 'False' | 'False' | 'False' | Not necessary for standard workflows. |
| PLUGINS_COMMIT | '' | '' | '' | '' | '' | Not necessary for standard workflows. |
10 changes: 10 additions & 0 deletions documentation/DCP-documentation/step_1_configuration.md
@@ -73,6 +73,7 @@ We recommend setting this to slightly longer than the average amount of time it
* **SQS_DEAD_LETTER_QUEUE:** The name of the queue to send jobs to if they fail to process correctly multiple times; this keeps a single bad job (such as one where a single file has been corrupted) from keeping your cluster active indefinitely.
This queue will be automatically made if it doesn't exist already.
See [Step 0: Prep](step_0_prep.md) for more information.
* **JOB_RETRIES:** The number of times that a job will be retried before it is sent to the Dead Letter Queue.

***

@@ -109,6 +110,15 @@ Useful when trying to detect jobs that may have exported smaller corrupted files

***

### CELLPROFILER SETTINGS
* **ALWAYS_CONTINUE:** Whether or not to run CellProfiler with the --always-continue flag, which keeps CellProfiler from crashing when it hits an error.
This can be particularly helpful in jobs where a large number of files are loaded in a single run (such as during illumination correction), so that a corrupted or missing file doesn't prevent the whole job from completing.
However, it can make it harder to notice jobs that are not completing successfully, so it should be used with caution.
We suggest using this setting in conjunction with a small number of JOB_RETRIES.
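A minimal sketch of how the worker appends the flag; the base command here is illustrative, not the project's exact invocation:

```python
ALWAYS_CONTINUE = "True"  # read from the environment in the real worker

# Illustrative base command; the actual worker builds a longer invocation.
cmd = "cellprofiler -c -r -p pipeline.cppipe"
if ALWAYS_CONTINUE.lower() == "true":
    # With this flag, CellProfiler logs module errors but keeps processing
    # the remaining image sets instead of exiting.
    cmd += " --always-continue"
```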

***

### PLUGINS
* **USE_PLUGINS:** Whether or not you will be using external plugins from the CellProfiler-plugins repository.
* **UPDATE_PLUGINS:** Whether or not to update the plugins repository before use.
7 changes: 4 additions & 3 deletions run.py
@@ -18,6 +18,8 @@
CREATE_DASHBOARD = 'False'
CLEAN_DASHBOARD = 'False'
AUTO_MONITOR = 'False'
ALWAYS_CONTINUE = 'False'
JOB_RETRIES = 10

from config import *

@@ -125,6 +127,7 @@ def generate_task_definition(AWS_PROFILE):
{"name": "USE_PLUGINS", "value": str(USE_PLUGINS)},
{"name": "NECESSARY_STRING", "value": NECESSARY_STRING},
{"name": "DOWNLOAD_FILES", "value": DOWNLOAD_FILES},
{"name": "ALWAYS_CONTINUE", "value": ALWAYS_CONTINUE},
]
if SOURCE_BUCKET.lower()!='false':
task_definition['containerDefinitions'][0]['environment'] += [
@@ -219,9 +222,7 @@ def get_or_create_queue(sqs):
"MaximumMessageSize": "262144",
"MessageRetentionPeriod": "1209600",
"ReceiveMessageWaitTimeSeconds": "0",
"RedrivePolicy": '{"deadLetterTargetArn":"'
+ dead_arn
+ '","maxReceiveCount":"10"}',
"RedrivePolicy": f'{{"deadLetterTargetArn":"{dead_arn}","maxReceiveCount":"{str(JOB_RETRIES)}"}}',
"VisibilityTimeout": str(SQS_MESSAGE_VISIBILITY),
}
sqs.create_queue(QueueName=SQS_QUEUE_NAME, Attributes=SQS_DEFINITION)
6 changes: 6 additions & 0 deletions worker/cp-worker.py
@@ -50,6 +50,10 @@
DOWNLOAD_FILES = 'False'
else:
DOWNLOAD_FILES = os.environ['DOWNLOAD_FILES']
if 'ALWAYS_CONTINUE' not in os.environ:
ALWAYS_CONTINUE = 'False'
else:
ALWAYS_CONTINUE = os.environ['ALWAYS_CONTINUE']

localIn = '/home/ubuntu/local_input'

@@ -276,6 +280,8 @@ def runCellProfiler(message):
printandlog("Didn't recognize input file",logger)
if USE_PLUGINS.lower() == 'true':
cmd += f' --plugins-directory={PLUGIN_DIR}'
if ALWAYS_CONTINUE.lower() == 'true':
cmd += ' --always-continue'
print(f'Running {cmd}')
logger.info(cmd)
