Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add support for commercial versions of Qt #878

Open
wants to merge 28 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from 26 commits
Commits
Show all changes
28 commits
Select commit Hold shift + click to select a range
4180492
Add install-qt-commercial feature and tests
Kidev Jan 6, 2025
1fe3990
Make the auto-answers parameters, fix linter issues
Kidev Jan 6, 2025
3169929
Fork and execv instead of using subprocess
Kidev Jan 7, 2025
c13bbc2
Return to simpler process execution method version
Kidev Jan 8, 2025
0790ad3
Fix test
Kidev Jan 9, 2025
93f3a11
Move commercial installer into its own file
Kidev Jan 11, 2025
cfc21ab
Fix shadowing of symbol platform causing errors
Kidev Jan 11, 2025
c8d86dc
Adapt test_cli for argparse format changes on py 3.13+
Kidev Jan 11, 2025
399acf0
Fix some errors, monkeypatch install test
Kidev Jan 11, 2025
976998d
Add --override super command
Kidev Jan 11, 2025
330683a
Properly handle --override and grab all the remaining commands when n…
Kidev Jan 11, 2025
fcb9805
Fix tests
Kidev Jan 12, 2025
0973ecc
Add base for modules, some niche features are not yet entirely implem…
Kidev Jan 12, 2025
686fc43
Fix some mistakes
Kidev Jan 12, 2025
4e3d0e2
Fix errors made with the monkeypatch, update Settings to make sure it…
Kidev Jan 12, 2025
fcd55e1
Tests commercial (#20)
Kidev Jan 18, 2025
e9ae059
Fix windows issue
Kidev Jan 18, 2025
7b04ddf
Hidden summon works
Kidev Jan 18, 2025
084e198
Remove both subprocess direct calls
Kidev Jan 18, 2025
1952019
Dipose of temp folder
Kidev Jan 18, 2025
de2baf0
Fix path issue
Kidev Jan 18, 2025
b0a5ec6
Add list-qt-commercial command
Kidev Jan 18, 2025
b126bc6
Fix help info
Kidev Jan 18, 2025
b77e810
Make no params valid for list-qt-commercial
Kidev Jan 18, 2025
f1d436c
Fix lint errors, and param overflow when no args are passed to list
Kidev Jan 18, 2025
26dd68e
Fix search
Kidev Jan 18, 2025
55d6906
Add tests for coverage, fix lint
Kidev Jan 18, 2025
90b53f2
Test for overwriting, and for cache usage coverage
Kidev Jan 18, 2025
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -18,3 +18,4 @@ Qt/
.eggs
qtaccount.ini
.pytest_cache
.run/
349 changes: 349 additions & 0 deletions aqt/commercial.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,349 @@
import json
import os
from dataclasses import dataclass
from logging import Logger, getLogger
from pathlib import Path
from typing import List, Optional

import requests
from defusedxml import ElementTree

from aqt.exceptions import DiskAccessNotPermitted
from aqt.helper import Settings, get_os_name, get_qt_account_path, get_qt_installer_name, safely_run, safely_run_save_output
from aqt.metadata import Version


@dataclass
class QtPackageInfo:
name: str
displayname: str
version: str


class QtPackageManager:
def __init__(self, arch: str, version: Version, target: str):
self.arch = arch
self.version = version
self.target = target
self.cache_dir = self._get_cache_dir()
self.packages: List[QtPackageInfo] = []

def _get_cache_dir(self) -> Path:
"""Create and return cache directory path."""
base_cache = Settings.qt_installer_cache_path
cache_path = os.path.join(base_cache, self.target, self.arch, str(self.version))
Path(cache_path).mkdir(parents=True, exist_ok=True)
return Path(cache_path)

def _get_cache_file(self) -> Path:
"""Get the cache file path."""
return self.cache_dir / "packages.json"

def _save_to_cache(self) -> None:
"""Save packages information to cache."""
cache_data = [{"name": pkg.name, "displayname": pkg.displayname, "version": pkg.version} for pkg in self.packages]

with open(self._get_cache_file(), "w") as f:
json.dump(cache_data, f, indent=2)

def _load_from_cache(self) -> bool:
"""Load packages information from cache if available."""
cache_file = self._get_cache_file()
if not cache_file.exists():
return False

try:
with open(cache_file, "r") as f:
cache_data = json.load(f)
self.packages = [
QtPackageInfo(name=pkg["name"], displayname=pkg["displayname"], version=pkg["version"])
for pkg in cache_data
]
return True
except (json.JSONDecodeError, KeyError):
return False

def _parse_packages_xml(self, xml_content: str) -> None:
"""Parse packages XML content and extract package information using defusedxml."""
try:
# Use defusedxml.ElementTree to safely parse the XML content
root = ElementTree.fromstring(xml_content)
self.packages = []

# Find all package elements using XPath-like expression
# Note: defusedxml supports a subset of XPath
for pkg in root.findall(".//package"):
name = pkg.get("name", "")
displayname = pkg.get("displayname", "")
version = pkg.get("version", "")

if all([name, displayname, version]): # Ensure all required attributes are present
self.packages.append(QtPackageInfo(name=name, displayname=displayname, version=version))
except ElementTree.ParseError as e:
raise RuntimeError(f"Failed to parse package XML: {e}")

def _get_version_string(self) -> str:
"""Get formatted version string for package names."""
return f"{self.version.major}{self.version.minor}{self.version.patch}"

def _get_base_package_name(self) -> str:
"""Get the base package name for the current configuration."""
version_str = self._get_version_string()
return f"qt.qt{self.version.major}.{version_str}"

def gather_packages(self, installer_path: str) -> None:
"""Gather package information using qt installer search command."""
if self._load_from_cache():
return

version_str = self._get_version_string()
base_package = f"qt.qt{self.version.major}.{version_str}"

cmd = [
installer_path,
"--accept-licenses",
"--accept-obligations",
"--confirm-command",
"--default-answer",
"search",
base_package,
]

try:
output = safely_run_save_output(installer_path, cmd, Settings.qt_installer_timeout)

# Extract the XML portion from the output
xml_start = output.find("<availablepackages>")
xml_end = output.find("</availablepackages>") + len("</availablepackages>")

if xml_start != -1 and xml_end != -1:
xml_content = output[xml_start:xml_end]
self._parse_packages_xml(xml_content)
self._save_to_cache()
else:
raise RuntimeError("Failed to find package information in installer output")

except Exception as e:
raise RuntimeError(f"Failed to get package information: {e}")

def get_install_command(self, modules: Optional[List[str]], temp_dir: str) -> List[str]:
"""Generate installation command based on requested modules."""
package_name = f"{self._get_base_package_name()}.{self.arch}"
cmd = ["install", package_name]

# No modules requested, return base package only
if not modules:
return cmd

# Ensure package cache exists
self.gather_packages(temp_dir)

if "all" in modules:
# Find all addon and direct module packages
for pkg in self.packages:
if f"{self._get_base_package_name()}.addons." in pkg.name or pkg.name.startswith(
f"{self._get_base_package_name()}."
):
module_name = pkg.name.split(".")[-1]
if module_name != self.arch: # Skip the base package
cmd.append(pkg.name)
else:
# Add specifically requested modules that exist in either format
for module in modules:
addon_name = f"{self._get_base_package_name()}.addons.{module}"
direct_name = f"{self._get_base_package_name()}.{module}"

# Check if either package name exists
matching_pkg = next(
(pkg.name for pkg in self.packages if pkg.name == addon_name or pkg.name == direct_name), None
)

if matching_pkg:
cmd.append(matching_pkg)

return cmd


class CommercialInstaller:
"""Qt Commercial installer that handles module installation and package management."""

def __init__(
self,
target: str,
arch: Optional[str],
version: Optional[str],
username: Optional[str] = None,
password: Optional[str] = None,
output_dir: Optional[str] = None,
logger: Optional[Logger] = None,
base_url: str = "https://download.qt.io",
override: Optional[list[str]] = None,
modules: Optional[List[str]] = None,
no_unattended: bool = False,
):
self.override = override
self.target = target
self.arch = arch or ""
self.version = Version(version) if version else Version("0.0.0")
self.username = username
self.password = password
self.output_dir = output_dir
self.logger = logger or getLogger(__name__)
self.base_url = base_url
self.modules = modules
self.no_unattended = no_unattended

# Set OS-specific properties
self.os_name = get_os_name()
self._installer_filename = get_qt_installer_name()
self.qt_account = get_qt_account_path()
self.package_manager = QtPackageManager(self.arch, self.version, self.target)

@staticmethod
def get_auto_answers() -> str:
"""Get auto-answer options from settings."""
settings_map = {
"OperationDoesNotExistError": Settings.qt_installer_operationdoesnotexisterror,
"OverwriteTargetDirectory": Settings.qt_installer_overwritetargetdirectory,
"stopProcessesForUpdates": Settings.qt_installer_stopprocessesforupdates,
"installationErrorWithCancel": Settings.qt_installer_installationerrorwithcancel,
"installationErrorWithIgnore": Settings.qt_installer_installationerrorwithignore,
"AssociateCommonFiletypes": Settings.qt_installer_associatecommonfiletypes,
"telemetry-question": Settings.qt_installer_telemetry,
}

answers = []
for key, value in settings_map.items():
answers.append(f"{key}={value}")

return ",".join(answers)

@staticmethod
def build_command(
installer_path: str,
override: Optional[List[str]] = None,
username: Optional[str] = None,
password: Optional[str] = None,
output_dir: Optional[str] = None,
no_unattended: bool = False,
) -> List[str]:
"""Build the installation command with proper safeguards."""
cmd = [installer_path]

# Add unattended flags unless explicitly disabled
if not no_unattended:
cmd.extend(["--accept-licenses", "--accept-obligations", "--confirm-command"])

if override:
# When using override, still include unattended flags unless disabled
cmd.extend(override)
return cmd

# Add authentication if provided
if username and password:
cmd.extend(["--email", username, "--pw", password])

# Add output directory if specified
if output_dir:
cmd.extend(["--root", str(Path(output_dir).resolve())])

# Add auto-answer options from settings
auto_answers = CommercialInstaller.get_auto_answers()
if auto_answers:
cmd.extend(["--auto-answer", auto_answers])

return cmd

def install(self) -> None:
"""Run the Qt installation process."""
if (
not self.qt_account.exists()
and not (self.username and self.password)
and not os.environ.get("QT_INSTALLER_JWT_TOKEN")
):
raise RuntimeError(
"No Qt account credentials found. Provide username and password or ensure qtaccount.ini exists."
)

# Check output directory if specified
if self.output_dir:
output_path = Path(self.output_dir) / str(self.version)
if output_path.exists():
if Settings.qt_installer_overwritetargetdirectory.lower() == "yes":
self.logger.warning(f"Target directory {output_path} exists - removing as overwrite is enabled")
try:
import shutil

shutil.rmtree(output_path)
except (OSError, PermissionError) as e:
raise DiskAccessNotPermitted(f"Failed to remove existing target directory {output_path}: {str(e)}")
else:
msg = (
f"Target directory {output_path} already exists. "
"Set overwrite_target_directory='Yes' in settings.ini to overwrite, or select another directory."
)
raise DiskAccessNotPermitted(msg)

# Setup cache directory
cache_path = Path(Settings.qt_installer_cache_path)
cache_path.mkdir(parents=True, exist_ok=True)

import shutil

temp_dir = Settings.qt_installer_temp_path
temp_path = Path(temp_dir)
if temp_path.exists():
shutil.rmtree(temp_dir)
temp_path.mkdir(parents=True, exist_ok=True)
installer_path = temp_path / self._installer_filename

self.logger.info(f"Downloading Qt installer to {installer_path}")
self.download_installer(installer_path, Settings.qt_installer_timeout)

try:
cmd = []
if self.override:
cmd = self.build_command(str(installer_path), override=self.override, no_unattended=self.no_unattended)
else:
# Initialize package manager and gather packages
self.package_manager.gather_packages(str(installer_path))

base_cmd = self.build_command(
str(installer_path.absolute()),
username=self.username,
password=self.password,
output_dir=self.output_dir,
no_unattended=self.no_unattended,
)

cmd = [
*base_cmd,
*self.package_manager.get_install_command(self.modules, temp_dir),
]

self.logger.info(f"Running: {cmd}")

safely_run(installer_path, cmd, Settings.qt_installer_timeout)
except Exception as e:
self.logger.error(f"Installation failed with exit code {e.__str__()}")
raise
finally:
self.logger.info("Qt installation completed successfully")

def download_installer(self, target_path: Path, timeout: int) -> None:
url = f"{self.base_url}/official_releases/online_installers/{self._installer_filename}"
try:
response = requests.get(url, stream=True, timeout=timeout)
response.raise_for_status()

with open(target_path, "wb") as f:
for chunk in response.iter_content(chunk_size=8192):
f.write(chunk)
if self.os_name != "windows":
os.chmod(target_path, 0o500)
except Exception as e:
raise RuntimeError(f"Failed to download installer: {e}")

def _get_package_name(self) -> str:
qt_version = f"{self.version.major}{self.version.minor}{self.version.patch}"
return f"qt.qt{self.version.major}.{qt_version}.{self.arch}"
Loading
Loading