diff --git a/cmd2/cmd2.py b/cmd2/cmd2.py index 1381f0d5..280f5382 100644 --- a/cmd2/cmd2.py +++ b/cmd2/cmd2.py @@ -4627,7 +4627,7 @@ def do_ipy(self, _: argparse.Namespace) -> Optional[bool]: # pragma: no cover ) # Start IPython - start_ipython(config=config, argv=[], user_ns=local_vars) + start_ipython(config=config, argv=[], user_ns=local_vars) # type: ignore[no-untyped-call] self.poutput("Now exiting IPython shell...") # The IPython application is a singleton and won't be recreated next time @@ -4814,11 +4814,9 @@ def _initialize_history(self, hist_file: str) -> None: previous sessions will be included. Additionally, all history will be written to this file when the application exits. """ - import json - import lzma - self.history = History() - # with no persistent history, nothing else in this method is relevant + + # With no persistent history, nothing else in this method is relevant if not hist_file: self.persistent_history_file = hist_file return @@ -4839,28 +4837,60 @@ def _initialize_history(self, hist_file: str) -> None: self.perror(f"Error creating persistent history file directory '{hist_file_dir}': {ex}") return - # Read and process history file + # Read history file try: with open(hist_file, 'rb') as fobj: compressed_bytes = fobj.read() - history_json = lzma.decompress(compressed_bytes).decode(encoding='utf-8') - self.history = History.from_json(history_json) except FileNotFoundError: - # Just use an empty history - pass + compressed_bytes = bytes() except OSError as ex: self.perror(f"Cannot read persistent history file '{hist_file}': {ex}") return - except (json.JSONDecodeError, lzma.LZMAError, KeyError, UnicodeDecodeError, ValueError) as ex: + + # Register a function to write history at save + import atexit + + self.persistent_history_file = hist_file + atexit.register(self._persist_history) + + # Empty or nonexistent history file. Nothing more to do. + if not compressed_bytes: + return + + # Decompress history data + try: + import lzma as decompress_lib + + decompress_exceptions: Tuple[type[Exception]] = (decompress_lib.LZMAError,) + except ModuleNotFoundError: # pragma: no cover + import bz2 as decompress_lib # type: ignore[no-redef] + + decompress_exceptions: Tuple[type[Exception]] = (OSError, ValueError) # type: ignore[no-redef] + + try: + history_json = decompress_lib.decompress(compressed_bytes).decode(encoding='utf-8') + except decompress_exceptions as ex: + self.perror( + f"Error decompressing persistent history data '{hist_file}': {ex}\n" + f"The history file will be recreated when this application exits." + ) + return + + # Decode history json + import json + + try: + self.history = History.from_json(history_json) + except (json.JSONDecodeError, KeyError, ValueError) as ex: self.perror( - f"Error processing persistent history file '{hist_file}': {ex}\n" + f"Error processing persistent history data '{hist_file}': {ex}\n" f"The history file will be recreated when this application exits." ) + return self.history.start_session() - self.persistent_history_file = hist_file - # populate readline history + # Populate readline history if rl_type != RlType.NONE: last = None for item in self.history: @@ -4872,25 +4902,21 @@ def _initialize_history(self, hist_file: str) -> None: readline.add_history(line) last = line - # register a function to write history at save - # if the history file is in plain text format from 0.9.12 or lower - # this will fail, and the history in the plain text file will be lost - import atexit - - atexit.register(self._persist_history) - def _persist_history(self) -> None: """Write history out to the persistent history file as compressed JSON""" - import lzma - if not self.persistent_history_file: return - self.history.truncate(self._persistent_history_length) try: - history_json = self.history.to_json() - compressed_bytes = lzma.compress(history_json.encode(encoding='utf-8')) + import lzma as compress_lib + except ModuleNotFoundError: # pragma: no cover + import bz2 as compress_lib # type: ignore[no-redef] + + self.history.truncate(self._persistent_history_length) + history_json = self.history.to_json() + compressed_bytes = compress_lib.compress(history_json.encode(encoding='utf-8')) + try: with open(self.persistent_history_file, 'wb') as fobj: fobj.write(compressed_bytes) except OSError as ex: