Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Handle absent lzma library #1318

Merged
merged 3 commits into from
Sep 15, 2024
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
78 changes: 52 additions & 26 deletions cmd2/cmd2.py
Original file line number Diff line number Diff line change
Expand Up @@ -4627,7 +4627,7 @@ def do_ipy(self, _: argparse.Namespace) -> Optional[bool]: # pragma: no cover
)

# Start IPython
start_ipython(config=config, argv=[], user_ns=local_vars)
start_ipython(config=config, argv=[], user_ns=local_vars) # type: ignore[no-untyped-call]
self.poutput("Now exiting IPython shell...")

# The IPython application is a singleton and won't be recreated next time
Expand Down Expand Up @@ -4814,11 +4814,9 @@ def _initialize_history(self, hist_file: str) -> None:
previous sessions will be included. Additionally, all history will be written
to this file when the application exits.
"""
import json
import lzma

self.history = History()
# with no persistent history, nothing else in this method is relevant

# With no persistent history, nothing else in this method is relevant
if not hist_file:
self.persistent_history_file = hist_file
return
Expand All @@ -4839,28 +4837,60 @@ def _initialize_history(self, hist_file: str) -> None:
self.perror(f"Error creating persistent history file directory '{hist_file_dir}': {ex}")
return

# Read and process history file
# Read history file
try:
with open(hist_file, 'rb') as fobj:
compressed_bytes = fobj.read()
history_json = lzma.decompress(compressed_bytes).decode(encoding='utf-8')
self.history = History.from_json(history_json)
except FileNotFoundError:
# Just use an empty history
pass
compressed_bytes = bytes()
except OSError as ex:
self.perror(f"Cannot read persistent history file '{hist_file}': {ex}")
return
except (json.JSONDecodeError, lzma.LZMAError, KeyError, UnicodeDecodeError, ValueError) as ex:

# Register a function to write history at save
import atexit

self.persistent_history_file = hist_file
atexit.register(self._persist_history)

# Empty or nonexistent history file. Nothing more to do.
if not compressed_bytes:
return

# Decompress history data
try:
import lzma as decompress_lib

decompress_exceptions: Tuple[type[Exception]] = (decompress_lib.LZMAError,)
except ModuleNotFoundError: # pragma: no cover
import bz2 as decompress_lib # type: ignore[no-redef]

decompress_exceptions: Tuple[type[Exception]] = (OSError, ValueError) # type: ignore[no-redef]

try:
history_json = decompress_lib.decompress(compressed_bytes).decode(encoding='utf-8')
except decompress_exceptions as ex:
self.perror(
f"Error decompressing persistent history data '{hist_file}': {ex}\n"
f"The history file will be recreated when this application exits."
)
return

# Decode history json
import json

try:
self.history = History.from_json(history_json)
except (json.JSONDecodeError, KeyError, ValueError) as ex:
self.perror(
f"Error processing persistent history file '{hist_file}': {ex}\n"
f"Error processing persistent history data '{hist_file}': {ex}\n"
f"The history file will be recreated when this application exits."
)
return

self.history.start_session()
self.persistent_history_file = hist_file

# populate readline history
# Populate readline history
if rl_type != RlType.NONE:
last = None
for item in self.history:
Expand All @@ -4872,25 +4902,21 @@ def _initialize_history(self, hist_file: str) -> None:
readline.add_history(line)
last = line

# register a function to write history at save
# if the history file is in plain text format from 0.9.12 or lower
# this will fail, and the history in the plain text file will be lost
import atexit

atexit.register(self._persist_history)

def _persist_history(self) -> None:
"""Write history out to the persistent history file as compressed JSON"""
import lzma

if not self.persistent_history_file:
return

self.history.truncate(self._persistent_history_length)
try:
history_json = self.history.to_json()
compressed_bytes = lzma.compress(history_json.encode(encoding='utf-8'))
import lzma as compress_lib
except ModuleNotFoundError: # pragma: no cover
import bz2 as compress_lib # type: ignore[no-redef]

self.history.truncate(self._persistent_history_length)
history_json = self.history.to_json()
compressed_bytes = compress_lib.compress(history_json.encode(encoding='utf-8'))

try:
with open(self.persistent_history_file, 'wb') as fobj:
fobj.write(compressed_bytes)
except OSError as ex:
Expand Down
Loading