Skip to content

Commit

Permalink
Merge pull request #1318 from stephenfin/missing-lzma
Browse files Browse the repository at this point in the history
Handle absent lzma library
  • Loading branch information
kmvanbrunt authored Sep 15, 2024
2 parents f24fe11 + 63848b9 commit b82117c
Showing 1 changed file with 52 additions and 26 deletions.
78 changes: 52 additions & 26 deletions cmd2/cmd2.py
Original file line number Diff line number Diff line change
Expand Up @@ -4627,7 +4627,7 @@ def do_ipy(self, _: argparse.Namespace) -> Optional[bool]: # pragma: no cover
)

# Start IPython
start_ipython(config=config, argv=[], user_ns=local_vars)
start_ipython(config=config, argv=[], user_ns=local_vars) # type: ignore[no-untyped-call]
self.poutput("Now exiting IPython shell...")

# The IPython application is a singleton and won't be recreated next time
Expand Down Expand Up @@ -4814,11 +4814,9 @@ def _initialize_history(self, hist_file: str) -> None:
previous sessions will be included. Additionally, all history will be written
to this file when the application exits.
"""
import json
import lzma

self.history = History()
# with no persistent history, nothing else in this method is relevant

# With no persistent history, nothing else in this method is relevant
if not hist_file:
self.persistent_history_file = hist_file
return
Expand All @@ -4839,28 +4837,60 @@ def _initialize_history(self, hist_file: str) -> None:
self.perror(f"Error creating persistent history file directory '{hist_file_dir}': {ex}")
return

# Read and process history file
# Read history file
try:
with open(hist_file, 'rb') as fobj:
compressed_bytes = fobj.read()
history_json = lzma.decompress(compressed_bytes).decode(encoding='utf-8')
self.history = History.from_json(history_json)
except FileNotFoundError:
# Just use an empty history
pass
compressed_bytes = bytes()
except OSError as ex:
self.perror(f"Cannot read persistent history file '{hist_file}': {ex}")
return
except (json.JSONDecodeError, lzma.LZMAError, KeyError, UnicodeDecodeError, ValueError) as ex:

# Register a function to write history at save
import atexit

self.persistent_history_file = hist_file
atexit.register(self._persist_history)

# Empty or nonexistent history file. Nothing more to do.
if not compressed_bytes:
return

# Decompress history data
try:
import lzma as decompress_lib

decompress_exceptions: Tuple[type[Exception]] = (decompress_lib.LZMAError,)
except ModuleNotFoundError: # pragma: no cover
import bz2 as decompress_lib # type: ignore[no-redef]

decompress_exceptions: Tuple[type[Exception]] = (OSError, ValueError) # type: ignore[no-redef]

try:
history_json = decompress_lib.decompress(compressed_bytes).decode(encoding='utf-8')
except decompress_exceptions as ex:
self.perror(
f"Error decompressing persistent history data '{hist_file}': {ex}\n"
f"The history file will be recreated when this application exits."
)
return

# Decode history json
import json

try:
self.history = History.from_json(history_json)
except (json.JSONDecodeError, KeyError, ValueError) as ex:
self.perror(
f"Error processing persistent history file '{hist_file}': {ex}\n"
f"Error processing persistent history data '{hist_file}': {ex}\n"
f"The history file will be recreated when this application exits."
)
return

self.history.start_session()
self.persistent_history_file = hist_file

# populate readline history
# Populate readline history
if rl_type != RlType.NONE:
last = None
for item in self.history:
Expand All @@ -4872,25 +4902,21 @@ def _initialize_history(self, hist_file: str) -> None:
readline.add_history(line)
last = line

# register a function to write history at save
# if the history file is in plain text format from 0.9.12 or lower
# this will fail, and the history in the plain text file will be lost
import atexit

atexit.register(self._persist_history)

def _persist_history(self) -> None:
"""Write history out to the persistent history file as compressed JSON"""
import lzma

if not self.persistent_history_file:
return

self.history.truncate(self._persistent_history_length)
try:
history_json = self.history.to_json()
compressed_bytes = lzma.compress(history_json.encode(encoding='utf-8'))
import lzma as compress_lib
except ModuleNotFoundError: # pragma: no cover
import bz2 as compress_lib # type: ignore[no-redef]

self.history.truncate(self._persistent_history_length)
history_json = self.history.to_json()
compressed_bytes = compress_lib.compress(history_json.encode(encoding='utf-8'))

try:
with open(self.persistent_history_file, 'wb') as fobj:
fobj.write(compressed_bytes)
except OSError as ex:
Expand Down

0 comments on commit b82117c

Please sign in to comment.