diff --git a/exegol/console/cli/SyntaxFormat.py b/exegol/console/cli/SyntaxFormat.py new file mode 100644 index 00000000..d165308f --- /dev/null +++ b/exegol/console/cli/SyntaxFormat.py @@ -0,0 +1,10 @@ +from enum import Enum + + +class SyntaxFormat(Enum): + port_sharing = "[default green not bold][:][-][:[-]][:][/default green not bold]" + desktop_config = "[blue]proto[:ip[:port]][/blue]" + volume = "/path/on/host/:/path/in/container/[blue][:ro|rw][/blue]" + + def __str__(self): + return self.value diff --git a/exegol/console/cli/actions/GenericParameters.py b/exegol/console/cli/actions/GenericParameters.py index ca584b30..76a7bedd 100644 --- a/exegol/console/cli/actions/GenericParameters.py +++ b/exegol/console/cli/actions/GenericParameters.py @@ -4,6 +4,7 @@ from exegol.config.UserConfig import UserConfig from exegol.console.cli.ExegolCompleter import ContainerCompleter, ImageCompleter, VoidCompleter, DesktopConfigCompleter +from exegol.console.cli.SyntaxFormat import SyntaxFormat from exegol.console.cli.actions.Command import Option, GroupArg @@ -191,12 +192,12 @@ def __init__(self, groupArgs: List[GroupArg]): action="append", default=[], dest="volumes", - help="Share a new volume between host and exegol (format: --volume /path/on/host/:/path/in/container/[blue][:ro|rw][/blue])") + help=f"Share a new volume between host and exegol (format: --volume {SyntaxFormat.volume})") self.ports = Option("-p", "--port", action="append", default=[], dest="ports", - help="Share a network port between host and exegol (format: --port [:][:][:]. This configuration will disable the shared network with the host.", + help=f"Share a network port between host and exegol (format: --port {SyntaxFormat.port_sharing}). This configuration will disable the default host network.", completer=VoidCompleter) self.hostname = Option("--hostname", dest="hostname", @@ -274,7 +275,7 @@ def __init__(self, groupArgs: List[GroupArg]): default="", action="store", help=f"Configure your exegol desktop ([blue]{'[/blue] or [blue]'.join(UserConfig.desktop_available_proto)}[/blue]) and its exposure " - f"(format: [blue]proto[:ip[:port]][/blue]) " + f"(format: {SyntaxFormat.desktop_config}) " f"(default: [blue]{UserConfig().desktop_default_proto}[/blue]:[blue]{'127.0.0.1' if UserConfig().desktop_default_localhost else '0.0.0.0'}[/blue]:[blue][/blue])", completer=DesktopConfigCompleter) groupArgs.append(GroupArg({"arg": self.desktop, "required": False}, diff --git a/exegol/model/ContainerConfig.py b/exegol/model/ContainerConfig.py index 34fa1131..5d074a74 100644 --- a/exegol/model/ContainerConfig.py +++ b/exegol/model/ContainerConfig.py @@ -14,6 +14,7 @@ from docker.types import Mount from rich.prompt import Prompt +from exegol.console.cli.SyntaxFormat import SyntaxFormat from exegol.config.ConstantConfig import ConstantConfig from exegol.config.EnvInfo import EnvInfo from exegol.config.UserConfig import UserConfig @@ -85,7 +86,7 @@ def __init__(self, container: Optional[Container] = None): self.__sysctls: Dict[str, str] = {} self.__envs: Dict[str, str] = {} self.__labels: Dict[str, str] = {} - self.__ports: Dict[str, Optional[Union[int, Tuple[str, int], List[int], List[Dict[str, Union[int, str]]]]]] = {} + self.__ports: Dict[str, Optional[Union[int, Tuple[str, int], List[Union[int, Tuple[str, int], Dict[str, Union[int, str]]]]]]] = {} self.__extra_host: Dict[str, str] = {} self.interactive: bool = True self.tty: bool = True @@ -512,7 +513,7 @@ def enableDesktop(self, desktop_config: str = ""): def configureDesktop(self, desktop_config: str, create_mode: bool = False): """Configure the exegol desktop feature from user parameters. - Accepted format: 'mode:host:port' + Accepted format: 'proto:host:port' """ self.__desktop_proto = UserConfig().desktop_default_proto self.__desktop_host = "127.0.0.1" if UserConfig().desktop_default_localhost else "0.0.0.0" @@ -537,7 +538,7 @@ def configureDesktop(self, desktop_config: str, create_mode: bool = False): except ValueError: logger.critical(f"Invalid desktop port: '{data}' is not a valid port.") else: - logger.critical(f"Your configuration is invalid, please use the following format:[green]mode:host:port[/green]") + logger.critical(f"Your configuration is invalid, please use the following format: {SyntaxFormat.desktop_config}") if self.__desktop_port is None: logger.debug(f"Desktop port will be set automatically") @@ -1117,9 +1118,13 @@ def addPort(self, if protocol.lower() not in ['tcp', 'udp', 'sctp']: raise ProtocolNotSupported(f"Unknown protocol '{protocol}'") logger.debug(f"Adding port {host_ip}:{port_host} -> {port_container}/{protocol}") - self.__ports[f"{port_container}/{protocol}"] = (host_ip, port_host) + # Casting type because at this stage, the data is only controlled by the wrapper itself. + existing_config = self.__ports.get(f"{port_container}/{protocol}", []) + assert type(existing_config) is list + existing_config.append((host_ip, port_host)) + self.__ports[f"{port_container}/{protocol}"] = existing_config - def getPorts(self) -> Dict[str, Optional[Union[int, Tuple[str, int], List[int], List[Dict[str, Union[int, str]]]]]]: + def getPorts(self) -> Dict[str, Optional[Union[int, Tuple[str, int], List[Union[int, Tuple[str, int], Dict[str, Union[int, str]]]]]]]: """Ports config getter""" return self.__ports @@ -1234,26 +1239,46 @@ def addUserDevice(self, user_device_config: str): self.__addDevice(user_device_config) def addRawPort(self, user_test_port: str): - """Add port config from user input. - Format must be [:][:][:] + """Add port config or range of ports from user input. + Format must be [:][-][:[-]][:] If host_ipv4 is not set, default to 0.0.0.0 - If container_port is not set, default is the same as host port + If container_port is not set, the same port(s) as host port(s) will be used If protocol is not set, default is 'tcp'""" - match = re.search(r"^((\d{1,3}\.\d{1,3}\.\d{1,3}\.\d{1,3}):)?(\d+):?(\d+)?:?(udp|tcp|sctp)?$", user_test_port) + # Regex to capture port ranges and protocols correctly + match = re.search(r"^((\d{1,3}\.\d{1,3}\.\d{1,3}\.\d{1,3}):)?(\d+)(-(\d+))?:?(\d+)?(-(\d+))?:?(udp|tcp|sctp)?$", user_test_port) if match is None: - logger.critical(f"Incorrect port syntax ({user_test_port}). Please use this format: [green][:][:][:][/green]") + logger.critical(f"Incorrect port syntax ({user_test_port}). Please use this format: '{SyntaxFormat.port_sharing}'") return host_ip = "0.0.0.0" if match.group(2) is None else match.group(2) - protocol = "tcp" if match.group(5) is None else match.group(5) + protocol = match.group(9) if match.group(9) else 'tcp' try: - host_port = int(match.group(3)) - container_port = host_port if match.group(4) is None else int(match.group(4)) - if host_port > 65535 or container_port > 65535: - raise ValueError - except ValueError: - logger.critical(f"The syntax for opening prot in NAT is incorrect. The ports must be numbers between 0 and 65535. ({match.group(3)}:{match.group(4)})") + start_host_port = int(match.group(3)) + end_host_port = int(match.group(5)) if match.group(5) else start_host_port + start_container_port_defined = match.group(6) is not None + end_container_port_defined = match.group(8) is not None + start_container_port = int(match.group(6)) if start_container_port_defined else start_host_port + # If start_container_port is not defined, use end_host_port, otherwise use start_container_port or end_container_port if defined + if not start_container_port_defined: + end_container_port = end_host_port + else: + end_container_port = int(match.group(8)) if match.group(8) else start_container_port + # check port consistency + if (len(range(start_host_port, end_host_port)) != len(range(start_container_port, end_container_port))) or ( + start_host_port != end_host_port and (not start_container_port_defined and end_container_port_defined)) or ( + start_host_port != end_host_port and (start_container_port_defined and not end_container_port_defined)): + logger.info( + f"Port sharing configuration does not respect standard usage ({user_test_port}). The configuration in the 'Container sumamry' below will be applied. Please consult the help section for more information on using the -p/--port option.") + # Check if start port is lower than end port + if end_host_port < start_host_port or end_container_port < start_container_port: + raise ValueError("End port cannot be less than start port.") + # Check if any port in the range exceeds the valid range + if end_host_port > 65535 or end_container_port > 65535: + raise ValueError(f"The syntax for opening port in NAT is incorrect. The ports must be numbers between 0 and 65535. ({end_host_port}:{end_container_port})") + except ValueError as e: + logger.critical(e) return - self.addPort(host_port, container_port, protocol=protocol, host_ip=host_ip) + for host_port, container_port in zip(range(start_host_port, end_host_port + 1), range(start_container_port, end_container_port + 1)): + self.addPort(host_port, container_port, protocol=protocol, host_ip=host_ip) def addRawEnv(self, env: str): """Parse and add an environment variable from raw user input""" @@ -1375,35 +1400,109 @@ def getTextPorts(self) -> str: Dict Port key = container port/protocol Dict Port Values: None = Random port - int = open port ont he host + int = open port on the host tuple = (host_ip, port) list of int = open multiple host port list of dict = open one or more ports on host, key ('HostIp' / 'HostPort') and value ip or port""" result = '' + # TODO if network bridge and container not started, ports config cannot be printed: add a user warning message + + start_host_ip = None + start_host_port = None + previous_host_port: Optional[Union[str, int]] = None + + start_container_protocole = None + start_container_port = None + previous_container_port = None + + previous_entry = None + for container_config, host_config in self.__ports.items(): - host_info = "Unknown" + # Parse config + current_container_port = int(container_config.split('/')[0]) + current_container_protocole = container_config.split('/')[-1] + # We might have multiple host context config at the same time for the same container config + current_host_contexts: List[Dict[str, Union[str, int]]] = [] + # Init range context, container side + if start_container_port is None: + start_container_port = current_container_port + previous_container_port = current_container_port + start_container_protocole = current_container_protocole + + # Parse host config multiple format if host_config is None: - host_info = "0.0.0.0:" - elif type(host_config) is int: - host_info = f"0.0.0.0:{host_config}" - elif type(host_config) is tuple: - assert len(host_config) == 2 - host_info = f"{host_config[0]}:{host_config[1]}" - elif type(host_config) is list: - sub_info = [] - for sub_host_config in host_config: - if type(sub_host_config) is int: - sub_info.append(f"0.0.0.0:{sub_host_config}") - elif type(sub_host_config) is dict: - sub_port = sub_host_config.get('HostPort', '') + current_host_contexts.append({"ip": "0.0.0.0", + "port": ""}) + else: + if type(host_config) is list: + host_configs: List[Union[int, Tuple[str, int], Dict[str, Union[int, str]]]] = host_config + else: + host_configs = cast(List[Union[int, Tuple[str, int], Dict[str, Union[int, str]]]], [host_config]) + + for current_host_config in host_configs: + if type(current_host_config) is int: + current_host_contexts.append({"ip": "0.0.0.0", + "port": current_host_config}) + elif type(current_host_config) is tuple: + assert len(current_host_config) == 2 + current_host_contexts.append({"ip": current_host_config[0], + "port": int(current_host_config[1])}) + elif type(current_host_config) is dict: + sub_port = current_host_config.get('HostPort') if sub_port is None: sub_port = "" - sub_info.append(f"{sub_host_config.get('HostIp', '0.0.0.0')}:{sub_port}") - if len(sub_info) > 0: - host_info = ", ".join(sub_info) - else: - logger.debug(f"Unknown port config: {type(host_config)}={host_config}") - result += f"{host_info} :right_arrow: {container_config}{os.linesep}" + elif type(sub_port) is str: + sub_port = int(sub_port) + current_host_contexts.append({"ip": current_host_config.get('HostIp', '0.0.0.0'), + "port": sub_port}) + else: + logger.debug(f"Unknown port config: {type(host_config)}={host_config} :right_arrow: {container_config}") + continue + + for current_context in current_host_contexts: + current_host_port = current_context.get("port") + current_host_ip = current_context.get('ip') + + # Init range context + if start_host_port is None: + start_host_port = current_host_port + previous_host_port = current_host_port + start_host_ip = current_host_ip + # Check if range continue + elif (start_host_ip == current_host_ip and + current_container_protocole == start_container_protocole and + (current_host_port == previous_host_port or + current_host_port == previous_host_port + 1) and + (current_container_port == previous_container_port or + current_container_port == previous_container_port + 1)): + previous_host_port = current_host_port + previous_container_port = current_container_port + # If range exit, submit previous entry + reset new range context + else: + # Register previous range + if previous_entry: + result += previous_entry + # reset context host and container side + start_host_port = current_host_port + previous_host_port = current_host_port + start_host_ip = current_host_ip + start_container_port = current_container_port + previous_container_port = current_container_port + start_container_protocole = current_container_protocole + + # Register last range + range_host_port = "" + if type(start_host_port) is int: + assert type(previous_host_port) is int + range_host_port = "" if previous_host_port - start_host_port <= 0 else f"-{previous_host_port}" + range_container_port = "" if previous_container_port - start_container_port <= 0 else f"-{previous_container_port}" + previous_entry = (f"{start_host_ip}:{start_host_port}{range_host_port} :right_arrow: " + f"{start_container_port}{range_container_port}/{start_container_protocole}{os.linesep}") + + # Submit last entry is any + if previous_entry: + result += previous_entry + return result def __str__(self):