-
Notifications
You must be signed in to change notification settings - Fork 552
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Scheduler/default scheduled #4871
base: develop
Are you sure you want to change the base?
Conversation
WalkthroughThe changes in this pull request focus on enhancing the command-line interface (CLI) of the FiftyOne library and improving the handling of delegated operations. Key updates include the introduction of new command classes for managing delegated operations, modifications to existing methods for better error handling and argument parsing, and the addition of utility functions for service context checks. The changes also involve renaming operations from "queued" to "scheduled" to align with the updated operational semantics, affecting both the CLI functionality and unit tests. Changes
Possibly related PRs
Suggested labels
Suggested reviewers
📜 Recent review detailsConfiguration used: .coderabbit.yaml 📒 Files selected for processing (1)
🚧 Files skipped from review as they are similar to previous changes (1)
Thank you for using CodeRabbit. We offer it for free to the OSS community and would appreciate your support in helping us grow. If you find it useful, would you consider giving us a shout-out on your favorite social media? 🪧 TipsChatThere are 3 ways to chat with CodeRabbit:
Note: Be mindful of the bot's finite context window. It's strongly recommended to break down tasks such as reading entire modules into smaller chunks. For a focused discussion, use review comments to chat about specific files and their changes, instead of using the PR comments. CodeRabbit Commands (Invoked using PR comments)
Other keywords and placeholders
Documentation and Community
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Actionable comments posted: 7
🧹 Outside diff range and nitpick comments (4)
fiftyone/operators/delegated.py (2)
Line range hint
330-339
: LGTM! Consider clarifying the docstring.The method rename from
execute_queued_operations
toexecute_scheduled_operations
aligns well with the PR objectives. The docstring has been appropriately updated to reflect this change.Consider updating the docstring to explicitly mention that this method executes operations in the "Scheduled" state, for clarity:
- """Executes scheduled delegated operations matching the given criteria. + """Executes delegated operations in the "Scheduled" state matching the given criteria.Also applies to: 339-358
367-368
: LGTM! Consider adding a comment for clarity.The loop update to iterate over
scheduled_ops
is consistent with the earlier changes. The functionality remains unchanged, correctly executing each scheduled operation.Consider adding a brief comment before the loop to enhance readability:
+ # Execute each scheduled operation for op in scheduled_ops: self.execute_operation(operation=op, log=log)
fiftyone/core/cli.py (1)
Line range hint
3103-3112
: Consider handling unexpected exceptionsCurrently, the
try
block only catchesKeyboardInterrupt
exceptions. If any other exception occurs within thetry
block, it will not be caught and could cause the program to terminate unexpectedly without any logging or cleanup. To enhance robustness, consider adding a general exception handler to catch unexpected exceptions and log them appropriately.Apply this diff to add a general exception handler:
try: dos = food.DelegatedOperationService() print(_WELCOME_MESSAGE.format(foc.VERSION)) print("Delegated operation service running") print("\nTo exit, press ctrl + c") while True: dos.execute_scheduled_operations(limit=1, log=True) time.sleep(0.5) except KeyboardInterrupt: pass +except Exception as e: + print(f"An unexpected error occurred: {e}") + # Optionally, perform additional cleanup or logging heretests/unittests/delegated_operators_tests.py (1)
813-819
: Correct the assertion order to maintain consistencyIn the assertion
self.assertEqual(docs[0].id, scheduled[0].id)
, ensure that the expected value is the second argument for clarity in test output.Apply this diff:
-self.assertEqual(docs[0].id, scheduled[0].id) +self.assertEqual(scheduled[0].id, docs[0].id)
📜 Review details
Configuration used: .coderabbit.yaml
Review profile: CHILL
📒 Files selected for processing (6)
- fiftyone/core/cli.py (1 hunks)
- fiftyone/factory/repos/delegated_operation_doc.py (2 hunks)
- fiftyone/internal/init.py (1 hunks)
- fiftyone/internal/util.py (1 hunks)
- fiftyone/operators/delegated.py (3 hunks)
- tests/unittests/delegated_operators_tests.py (26 hunks)
🧰 Additional context used
🪛 Ruff
fiftyone/internal/__init__.py
10-10:
.util.is_remote_service
imported but unused; consider removing, adding to__all__
, or using a redundant alias(F401)
tests/unittests/delegated_operators_tests.py
727-727: Do not use bare
except
(E722)
822-822: f-string without any placeholders
Remove extraneous
f
prefix(F541)
🔇 Additional comments (14)
fiftyone/internal/util.py (5)
1-7
: LGTM: File structure and documentation are well-organized.The file structure follows Python best practices with a clear module docstring, copyright notice, and proper spacing between functions.
10-16
: LGTM:is_remote_service()
function is well-implemented.The function correctly determines if the SDK is running in a remote service context by checking for both encryption and API keys. This aligns with the PR objective of modifying the default state for delegated operations.
19-25
: Verify the implementation ofhas_encryption_key()
.The current implementation always returns
False
. Is this intended as a placeholder? If so, consider adding a TODO comment to remind updating this function with the actual implementation.If this is not a placeholder, please explain the rationale behind always returning
False
.
28-34
: Verify the implementation ofhas_api_key()
.Similar to
has_encryption_key()
, this function always returnsFalse
. Is this intended as a placeholder? If so, consider adding a TODO comment to remind updating this function with the actual implementation.If this is not a placeholder, please explain the rationale behind always returning
False
.
10-34
:⚠️ Potential issuePotential issue: Current implementations may not meet PR objectives.
The current implementations of
has_encryption_key()
andhas_api_key()
always returnFalse
, which meansis_remote_service()
will always returnFalse
. This seems to contradict the PR objective of modifying the default state for delegated operations when executed locally vs. in a service environment.Please verify if this is the intended behavior or if these functions need to be updated to correctly differentiate between local and remote service contexts.
fiftyone/operators/delegated.py (1)
358-362
: LGTM! Consistent changes for scheduled operations.The variable rename from
queued_ops
toscheduled_ops
and the update of therun_state
parameter toExecutionRunState.SCHEDULED
are consistent with the method rename and PR objectives. These changes ensure that only operations in the "Scheduled" state are retrieved for execution.Also applies to: 362-367
fiftyone/factory/repos/delegated_operation_doc.py (4)
11-11
: Import of 'is_remote_service' function is appropriate and necessary.The import statement correctly brings in the
is_remote_service
function needed for the new conditional logic.
40-42
: Logic forrun_state
initialization is correct.The updated logic correctly sets
run_state
toSCHEDULED
when running locally and toQUEUED
when running remotely, aligning with the intended behavior.
44-44
: Initialization ofqueued_at
is appropriate.Setting
queued_at
to the current UTC time when running remotely and toNone
when running locally is consistent with the desired operation state tracking.
52-54
: Initialization ofscheduled_at
is appropriate.Assigning
scheduled_at
to the current UTC time when running locally andNone
when running remotely correctly reflects the scheduling state based on the execution context.fiftyone/core/cli.py (1)
3105-3105
: Change toexecute_scheduled_operations
is appropriateThe replacement of
dos.execute_queued_operations()
withdos.execute_scheduled_operations()
aligns with the updated processing of delegated operations as per the PR objectives.tests/unittests/delegated_operators_tests.py (3)
208-210
: LGTM: Assertions correctly updated to 'scheduled' stateThe test assertions have been appropriately updated to reflect the change to
ExecutionRunState.SCHEDULED
anddoc.scheduled_at
.
223-225
: LGTM: Second document assertions are consistentAssertions for
doc2
correctly checkscheduled_at
,run_state
, and metadata, ensuring consistency with the updated terminology.
480-482
: LGTM: Execution of scheduled operations is correctThe test correctly executes the scheduled operations using
execute_scheduled_operations
and verifies the completion state.
if is_remote_service() | ||
else ExecutionRunState.SCHEDULED | ||
) # if running locally use SCHEDULED otherwise QUEUED | ||
self.run_link = None | ||
self.queued_at = datetime.utcnow() | ||
self.queued_at = datetime.utcnow() if is_remote_service() else None |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
🛠️ Refactor suggestion
Consider caching the result of is_remote_service()
to improve efficiency and readability.
The function is_remote_service()
is called multiple times within the __init__
method. If it's an expensive operation or for better readability, consider calling it once and storing the result in a local variable.
Apply this diff to modify the code:
+ is_remote = is_remote_service()
self.run_state = (
ExecutionRunState.QUEUED
- if is_remote_service()
+ if is_remote
else ExecutionRunState.SCHEDULED
) # if running locally use SCHEDULED otherwise QUEUED
self.run_link = None
- self.queued_at = datetime.utcnow() if is_remote_service() else None
+ self.queued_at = datetime.utcnow() if is_remote else None
self.scheduled_at = (
- datetime.utcnow() if not is_remote_service() else None
+ datetime.utcnow() if not is_remote else None
)
Add the following line at the beginning of the __init__
method:
is_remote = is_remote_service()
Also applies to: 52-54
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I think we will want to inject these values in, instead of determining them inside the __init__()
function.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
@swheaton, understood. Injecting these values into the __init__()
function makes sense and improves flexibility. Let me know if you need any assistance with refactoring.
🐰🤔
✏️ Learnings added
Learnt from: swheaton
PR: voxel51/fiftyone#4871
File: fiftyone/factory/repos/delegated_operation_doc.py:40-44
Timestamp: 2024-10-02T20:55:27.055Z
Learning: When initializing classes like `DelegatedOperationDocument`, prefer to inject values as parameters into the `__init__()` method, rather than determining them inside the method.
Note: Learnings are effective only in the context of similar code segments. To apply general coding guidelines, please configure review instructions. You can manage existing learnings in the app.
) # ensure that the scheduled_at times are different | ||
self.docs_to_delete.append(doc) | ||
if i == 0: | ||
queued.append(doc) | ||
scheduled.append(doc) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
🛠️ Refactor suggestion
Avoid using time.sleep
in tests for timestamp differentiation
Using time.sleep(0.01)
to ensure unique timestamps can slow down the test suite and is unreliable. Consider mocking the timestamp or using a counter to generate unique scheduled_at
times for testing purposes.
Example using unittest.mock
to patch the datetime
module:
with mock.patch('fiftyone.operators.delegated.datetime') as mock_datetime:
mock_datetime.utcnow.return_value = fixed_time
# proceed with test code
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
not a bad suggestion actually
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
@swheaton, glad you find the suggestion helpful!
(^‿^)/
@@ -289,31 +289,30 @@ | |||
self.docs_to_delete.append(doc) | |||
|
|||
queued = self.svc.get_queued_operations() | |||
self.assertEqual(len(queued), 20 + initial_queued) | |||
self.assertEqual(len(queued), initial_queued) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Remove or update 'queued' variable in assertions
The variable queued
is no longer defined due to the removal of initial_queued
. This may cause a NameError
. Ensure that any references to queued
are removed or updated to reflect the new terminology.
Apply this diff to remove the assertion:
- self.assertEqual(len(queued), initial_queued)
📝 Committable suggestion
‼️ IMPORTANT
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation. Thoroughly test & benchmark the code to ensure it meets the requirements.
self.assertEqual(len(queued), initial_queued) |
except: | ||
pytest.fail( | ||
"Should not fail when executing queued operation with renamed dataset" | ||
"Should not fail when executing scheduled operation with renamed dataset" |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Specify exception type instead of using bare except
Using a bare except:
can catch unexpected exceptions, including system-exiting ones. It's best practice to catch specific exceptions to prevent masking errors.
Apply this diff to catch generic exceptions:
-except:
+except Exception:
Alternatively, catch the specific exception you expect:
-except:
+except SomeSpecificException:
📝 Committable suggestion
‼️ IMPORTANT
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation. Thoroughly test & benchmark the code to ensure it meets the requirements.
except: | |
pytest.fail( | |
"Should not fail when executing queued operation with renamed dataset" | |
"Should not fail when executing scheduled operation with renamed dataset" | |
except Exception: | |
pytest.fail( | |
"Should not fail when executing scheduled operation with renamed dataset" |
🧰 Tools
🪛 Ruff
727-727: Do not use bare
except
(E722)
# get all the existing counts of scheduled operations | ||
initial_queued = len(self.svc.get_queued_operations()) | ||
initial_running = len(self.svc.get_running_operations()) | ||
initial_scheduled = len(self.svc.get_scheduled_operations()) | ||
initial_dataset_queued = len( | ||
self.svc.get_queued_operations(dataset_name=dataset_name) | ||
initial_dataset_scheduled = len( | ||
self.svc.get_scheduled_operations(dataset_name=dataset_name) | ||
) | ||
initial_operator_queued = len( | ||
self.svc.get_queued_operations(operator=operator) | ||
initial_operator_scheduled = len( | ||
self.svc.get_scheduled_operations(operator=operator) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Remove redundant 'initial_queued' variable and update method call
Since the method get_queued_operations
has been deprecated in favor of get_scheduled_operations
, the variable initial_queued
is no longer needed. Additionally, ensure all method calls use the updated terminology.
Apply this diff to remove the redundant variable:
- initial_queued = len(self.svc.get_queued_operations())
initial_running = len(self.svc.get_running_operations())
initial_scheduled = len(self.svc.get_scheduled_operations())
Update the assertion accordingly:
- self.assertEqual(len(queued), initial_queued)
Committable suggestion was skipped due to low confidence.
) # default to queued state on create | ||
if is_remote_service() | ||
else ExecutionRunState.SCHEDULED | ||
) # if running locally use SCHEDULED otherwise QUEUED |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
this is backwards. queued for local run, scheduled for remote service.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
oh yup my bad for some reason got it swapped again thinking about how we had it before
fiftyone/operators/delegated.py
Outdated
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This file should not change. We will be executing queued operations as always.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Actionable comments posted: 0
🧹 Outside diff range and nitpick comments (1)
fiftyone/factory/repos/delegated_operation.py (1)
Line range hint
197-252
: PotentialTypeError
Whenupdate
isNone
inupdate_run_state
MethodIn the
update_run_state
method, ifoutputs_schema
is present andupdate
is stillNone
(e.g., whenrun_state
isRUNNING
orSCHEDULED
), attempting to executeupdate["$set"]["metadata.outputs_schema"] = {...}
will raise aTypeError
becauseupdate
isNone
. To prevent this error, ensure thatupdate
is initialized before it's used or adjust the logic accordingly.Apply this diff to fix the issue:
def update_run_state( self, _id: ObjectId, run_state: ExecutionRunState, result: ExecutionResult = None, run_link: str = None, progress: ExecutionProgress = None, ) -> DelegatedOperationDocument: - update = None + update = {} execution_result = result if result is not None and not isinstance(result, ExecutionResult): execution_result = ExecutionResult(result=result) execution_result_json = ( execution_result.to_json() if execution_result else None ) outputs_schema = ( execution_result_json.pop("outputs_schema", None) if execution_result_json else None ) if run_state == ExecutionRunState.COMPLETED: update = { "$set": { "run_state": run_state, "completed_at": datetime.utcnow(), "updated_at": datetime.utcnow(), "result": execution_result_json, } } elif run_state == ExecutionRunState.FAILED: update = { "$set": { "run_state": run_state, "failed_at": datetime.utcnow(), "updated_at": datetime.utcnow(), "result": execution_result_json, } } elif run_state == ExecutionRunState.RUNNING: update = { "$set": { "run_state": run_state, "started_at": datetime.utcnow(), "updated_at": datetime.utcnow(), } } elif run_state == ExecutionRunState.SCHEDULED: update = { "$set": { "run_state": run_state, "scheduled_at": datetime.utcnow(), "updated_at": datetime.utcnow(), } } else: raise ValueError("Invalid run_state: {}".format(run_state)) + if outputs_schema: + update["$set"]["metadata.outputs_schema"] = outputs_schema or {} if run_link is not None: update["$set"]["run_link"] = run_link if progress is not None: update["$set"]["status"] = progress update["$set"]["status"]["updated_at"] = datetime.utcnow() doc = self._collection.find_one_and_update( filter={"_id": _id}, update=[update], return_document=pymongo.ReturnDocument.AFTER, ) return DelegatedOperationDocument().from_pymongo(doc)
📜 Review details
Configuration used: .coderabbit.yaml
Review profile: CHILL
📒 Files selected for processing (4)
- fiftyone/core/cli.py (0 hunks)
- fiftyone/factory/repos/delegated_operation.py (3 hunks)
- fiftyone/factory/repos/delegated_operation_doc.py (2 hunks)
- tests/unittests/delegated_operators_tests.py (0 hunks)
💤 Files with no reviewable changes (2)
- fiftyone/core/cli.py
- tests/unittests/delegated_operators_tests.py
🚧 Files skipped from review as they are similar to previous changes (1)
- fiftyone/factory/repos/delegated_operation_doc.py
🔇 Additional comments (3)
fiftyone/factory/repos/delegated_operation.py (3)
18-18
: Importingis_remote_service
FunctionThe import statement correctly imports
is_remote_service
fromfiftyone.internal.util
.
173-173
: EnsureDelegatedOperationDocument
Acceptsis_remote
ParameterWhen initializing
DelegatedOperationDocument
withis_remote=self.is_remote
, verify that theDelegatedOperationDocument
class constructor accepts theis_remote
parameter and handles it appropriately.To confirm that
DelegatedOperationDocument
acceptsis_remote
in its constructor, run the following script:#!/bin/bash # Description: Verify the constructor of `DelegatedOperationDocument` includes `is_remote`. # Expected result: Constructor of `DelegatedOperationDocument` accepts `is_remote`. # Search for the class definition and constructor rg --type python -A 5 'class DelegatedOperationDocument' # Search for the `__init__` method accepting `is_remote` rg --type python 'def __init__\(self, .*is_remote' # Alternatively, inspect the parameters of `DelegatedOperationDocument` ast-grep --lang python --pattern 'class DelegatedOperationDocument: def __init__(self, $_, is_remote, $$$):'
137-137
: Caching the Result ofis_remote_service()
Assigning
self.is_remote = is_remote_service()
in the constructor caches the remote service status at the time of object creation. If the remote service status can change during runtime, consider retrieving this value dynamically within the methods that require it to ensure accuracy.To verify whether
is_remote_service()
returns a static value or if it can change during runtime, run the following script:
What changes are proposed in this pull request?
Changing default state of delegated operations to now be "Scheduled" when running locally and not in a service environment. Pointed cli execution of delegated operations to point to Scheduled.
How is this patch tested? If it is not, please explain why.
Launched FO locally and created a delegated operation through the UI which output the following item into mongo:
Release Notes
Is this a user-facing change that should be mentioned in the release notes?
notes for FiftyOne users.
When running delegated operations locally they will start in the "Scheduled" state. If running these using the cli, then continue to use your commands as normal you'll just see the items are added to the collection as "Scheduled".
What areas of FiftyOne does this PR affect?
fiftyone
Python library changesSummary by CodeRabbit
New Features
Bug Fixes
Documentation
Tests