Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Adds a base sigint_handler() method to the CommandSet. #1279

Merged
merged 1 commit into from
Sep 27, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
17 changes: 15 additions & 2 deletions cmd2/cmd2.py
Original file line number Diff line number Diff line change
Expand Up @@ -535,6 +535,9 @@ def __init__(
self.suggest_similar_command = suggest_similar_command
self.default_suggestion_message = "Did you mean {}?"

# the current command being executed
self.current_command: Optional[Statement] = None

def find_commandsets(self, commandset_type: Type[CommandSet], *, subclass_match: bool = False) -> List[CommandSet]:
"""
Find all CommandSets that match the provided CommandSet type.
Expand Down Expand Up @@ -2363,7 +2366,13 @@ def sigint_handler(self, signum: int, _: FrameType) -> None:

# Check if we are allowed to re-raise the KeyboardInterrupt
if not self.sigint_protection:
self._raise_keyboard_interrupt()
raise_interrupt = True
if self.current_command is not None:
command_set = self.find_commandset_for_command(self.current_command.command)
if command_set is not None:
raise_interrupt = not command_set.sigint_handler()
if raise_interrupt:
self._raise_keyboard_interrupt()

def _raise_keyboard_interrupt(self) -> None:
"""Helper function to raise a KeyboardInterrupt"""
Expand Down Expand Up @@ -2953,7 +2962,11 @@ def onecmd(self, statement: Union[Statement, str], *, add_to_history: bool = Tru
):
self.history.append(statement)

stop = func(statement)
try:
self.current_command = statement
stop = func(statement)
finally:
self.current_command = None

else:
stop = self.default(statement)
Expand Down
9 changes: 9 additions & 0 deletions cmd2/command_definition.py
Original file line number Diff line number Diff line change
Expand Up @@ -165,3 +165,12 @@ def remove_settable(self, name: str) -> None:
del self._settables[name]
except KeyError:
raise KeyError(name + " is not a settable parameter")

def sigint_handler(self) -> bool:
"""
Handle a SIGINT that occurred for a command in this CommandSet.

:return: True if this completes the interrupt handling and no KeyboardInterrupt will be raised.
False to raise a KeyboardInterrupt.
"""
return False
36 changes: 36 additions & 0 deletions tests_isolated/test_commandset/test_commandset.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@
"""

import argparse
import signal
from typing import (
List,
)
Expand Down Expand Up @@ -197,6 +198,9 @@ def test_load_commands(command_sets_manual, capsys):
assert command_sets_manual.find_commandsets(CommandSetA)[0] is cmd_set
assert command_sets_manual.find_commandset_for_command('elderberry') is cmd_set

out = command_sets_manual.app_cmd('apple')
assert 'Apple!' in out.stdout

# Make sure registration callbacks ran
out, err = capsys.readouterr()
assert "in on_register now" in out
Expand Down Expand Up @@ -573,6 +577,38 @@ def test_subcommands(command_sets_manual):
command_sets_manual.unregister_command_set(base_cmds)


def test_commandset_sigint(command_sets_manual):
# shows that the command is able to continue execution if the sigint_handler
# returns True that we've handled interrupting the command.
class SigintHandledCommandSet(cmd2.CommandSet):
def do_foo(self, _):
self._cmd.poutput('in foo')
self._cmd.sigint_handler(signal.SIGINT, None)
self._cmd.poutput('end of foo')

def sigint_handler(self) -> bool:
return True

cs1 = SigintHandledCommandSet()
command_sets_manual.register_command_set(cs1)
out = command_sets_manual.app_cmd('foo')
assert 'in foo' in out.stdout
assert 'end of foo' in out.stdout

# shows that the command is interrupted if we don't report we've handled the sigint
class SigintUnhandledCommandSet(cmd2.CommandSet):
def do_bar(self, _):
self._cmd.poutput('in do bar')
self._cmd.sigint_handler(signal.SIGINT, None)
self._cmd.poutput('end of do bar')

cs2 = SigintUnhandledCommandSet()
command_sets_manual.register_command_set(cs2)
out = command_sets_manual.app_cmd('bar')
assert 'in do bar' in out.stdout
assert 'end of do bar' not in out.stdout


def test_nested_subcommands(command_sets_manual):
base_cmds = LoadableBase(1)
pasta_cmds = LoadablePastaStir(1)
Expand Down
Loading