Skip to content

Commit

Permalink
Merge pull request #203 from LOLOLEKIK/master
Browse files Browse the repository at this point in the history
FEATURE - Add range port
  • Loading branch information
Dramelac authored Feb 24, 2024
2 parents e144b3b + 355d04c commit eeede32
Show file tree
Hide file tree
Showing 3 changed files with 152 additions and 42 deletions.
10 changes: 10 additions & 0 deletions exegol/console/cli/SyntaxFormat.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,10 @@
from enum import Enum


class SyntaxFormat(Enum):
port_sharing = "[default green not bold][<host_ipv4>:]<host_port>[-<end_host_port>][:<container_port>[-<end_container_port>]][:<protocol>][/default green not bold]"
desktop_config = "[blue]proto[:ip[:port]][/blue]"
volume = "/path/on/host/:/path/in/container/[blue][:ro|rw][/blue]"

def __str__(self):
return self.value
7 changes: 4 additions & 3 deletions exegol/console/cli/actions/GenericParameters.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@

from exegol.config.UserConfig import UserConfig
from exegol.console.cli.ExegolCompleter import ContainerCompleter, ImageCompleter, VoidCompleter, DesktopConfigCompleter
from exegol.console.cli.SyntaxFormat import SyntaxFormat
from exegol.console.cli.actions.Command import Option, GroupArg


Expand Down Expand Up @@ -191,12 +192,12 @@ def __init__(self, groupArgs: List[GroupArg]):
action="append",
default=[],
dest="volumes",
help="Share a new volume between host and exegol (format: --volume /path/on/host/:/path/in/container/[blue][:ro|rw][/blue])")
help=f"Share a new volume between host and exegol (format: --volume {SyntaxFormat.volume})")
self.ports = Option("-p", "--port",
action="append",
default=[],
dest="ports",
help="Share a network port between host and exegol (format: --port [<host_ipv4>:]<host_port>[:<container_port>][:<protocol>]. This configuration will disable the shared network with the host.",
help=f"Share a network port between host and exegol (format: --port {SyntaxFormat.port_sharing}). This configuration will disable the default host network.",
completer=VoidCompleter)
self.hostname = Option("--hostname",
dest="hostname",
Expand Down Expand Up @@ -274,7 +275,7 @@ def __init__(self, groupArgs: List[GroupArg]):
default="",
action="store",
help=f"Configure your exegol desktop ([blue]{'[/blue] or [blue]'.join(UserConfig.desktop_available_proto)}[/blue]) and its exposure "
f"(format: [blue]proto[:ip[:port]][/blue]) "
f"(format: {SyntaxFormat.desktop_config}) "
f"(default: [blue]{UserConfig().desktop_default_proto}[/blue]:[blue]{'127.0.0.1' if UserConfig().desktop_default_localhost else '0.0.0.0'}[/blue]:[blue]<random>[/blue])",
completer=DesktopConfigCompleter)
groupArgs.append(GroupArg({"arg": self.desktop, "required": False},
Expand Down
177 changes: 138 additions & 39 deletions exegol/model/ContainerConfig.py
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@
from docker.types import Mount
from rich.prompt import Prompt

from exegol.console.cli.SyntaxFormat import SyntaxFormat
from exegol.config.ConstantConfig import ConstantConfig
from exegol.config.EnvInfo import EnvInfo
from exegol.config.UserConfig import UserConfig
Expand Down Expand Up @@ -85,7 +86,7 @@ def __init__(self, container: Optional[Container] = None):
self.__sysctls: Dict[str, str] = {}
self.__envs: Dict[str, str] = {}
self.__labels: Dict[str, str] = {}
self.__ports: Dict[str, Optional[Union[int, Tuple[str, int], List[int], List[Dict[str, Union[int, str]]]]]] = {}
self.__ports: Dict[str, Optional[Union[int, Tuple[str, int], List[Union[int, Tuple[str, int], Dict[str, Union[int, str]]]]]]] = {}
self.__extra_host: Dict[str, str] = {}
self.interactive: bool = True
self.tty: bool = True
Expand Down Expand Up @@ -512,7 +513,7 @@ def enableDesktop(self, desktop_config: str = ""):

def configureDesktop(self, desktop_config: str, create_mode: bool = False):
"""Configure the exegol desktop feature from user parameters.
Accepted format: 'mode:host:port'
Accepted format: 'proto:host:port'
"""
self.__desktop_proto = UserConfig().desktop_default_proto
self.__desktop_host = "127.0.0.1" if UserConfig().desktop_default_localhost else "0.0.0.0"
Expand All @@ -537,7 +538,7 @@ def configureDesktop(self, desktop_config: str, create_mode: bool = False):
except ValueError:
logger.critical(f"Invalid desktop port: '{data}' is not a valid port.")
else:
logger.critical(f"Your configuration is invalid, please use the following format:[green]mode:host:port[/green]")
logger.critical(f"Your configuration is invalid, please use the following format: {SyntaxFormat.desktop_config}")

if self.__desktop_port is None:
logger.debug(f"Desktop port will be set automatically")
Expand Down Expand Up @@ -1117,9 +1118,13 @@ def addPort(self,
if protocol.lower() not in ['tcp', 'udp', 'sctp']:
raise ProtocolNotSupported(f"Unknown protocol '{protocol}'")
logger.debug(f"Adding port {host_ip}:{port_host} -> {port_container}/{protocol}")
self.__ports[f"{port_container}/{protocol}"] = (host_ip, port_host)
# Casting type because at this stage, the data is only controlled by the wrapper itself.
existing_config = self.__ports.get(f"{port_container}/{protocol}", [])
assert type(existing_config) is list
existing_config.append((host_ip, port_host))
self.__ports[f"{port_container}/{protocol}"] = existing_config

def getPorts(self) -> Dict[str, Optional[Union[int, Tuple[str, int], List[int], List[Dict[str, Union[int, str]]]]]]:
def getPorts(self) -> Dict[str, Optional[Union[int, Tuple[str, int], List[Union[int, Tuple[str, int], Dict[str, Union[int, str]]]]]]]:
"""Ports config getter"""
return self.__ports

Expand Down Expand Up @@ -1234,26 +1239,46 @@ def addUserDevice(self, user_device_config: str):
self.__addDevice(user_device_config)

def addRawPort(self, user_test_port: str):
"""Add port config from user input.
Format must be [<host_ipv4>:]<host_port>[:<container_port>][:<protocol>]
"""Add port config or range of ports from user input.
Format must be [<host_ipv4>:]<host_port>[-<end_host_port>][:<container_port>[-<end_container_port>]][:<protocol>]
If host_ipv4 is not set, default to 0.0.0.0
If container_port is not set, default is the same as host port
If container_port is not set, the same port(s) as host port(s) will be used
If protocol is not set, default is 'tcp'"""
match = re.search(r"^((\d{1,3}\.\d{1,3}\.\d{1,3}\.\d{1,3}):)?(\d+):?(\d+)?:?(udp|tcp|sctp)?$", user_test_port)
# Regex to capture port ranges and protocols correctly
match = re.search(r"^((\d{1,3}\.\d{1,3}\.\d{1,3}\.\d{1,3}):)?(\d+)(-(\d+))?:?(\d+)?(-(\d+))?:?(udp|tcp|sctp)?$", user_test_port)
if match is None:
logger.critical(f"Incorrect port syntax ({user_test_port}). Please use this format: [green][<host_ipv4>:]<host_port>[:<container_port>][:<protocol>][/green]")
logger.critical(f"Incorrect port syntax ({user_test_port}). Please use this format: '{SyntaxFormat.port_sharing}'")
return
host_ip = "0.0.0.0" if match.group(2) is None else match.group(2)
protocol = "tcp" if match.group(5) is None else match.group(5)
protocol = match.group(9) if match.group(9) else 'tcp'
try:
host_port = int(match.group(3))
container_port = host_port if match.group(4) is None else int(match.group(4))
if host_port > 65535 or container_port > 65535:
raise ValueError
except ValueError:
logger.critical(f"The syntax for opening prot in NAT is incorrect. The ports must be numbers between 0 and 65535. ({match.group(3)}:{match.group(4)})")
start_host_port = int(match.group(3))
end_host_port = int(match.group(5)) if match.group(5) else start_host_port
start_container_port_defined = match.group(6) is not None
end_container_port_defined = match.group(8) is not None
start_container_port = int(match.group(6)) if start_container_port_defined else start_host_port
# If start_container_port is not defined, use end_host_port, otherwise use start_container_port or end_container_port if defined
if not start_container_port_defined:
end_container_port = end_host_port
else:
end_container_port = int(match.group(8)) if match.group(8) else start_container_port
# check port consistency
if (len(range(start_host_port, end_host_port)) != len(range(start_container_port, end_container_port))) or (
start_host_port != end_host_port and (not start_container_port_defined and end_container_port_defined)) or (
start_host_port != end_host_port and (start_container_port_defined and not end_container_port_defined)):
logger.info(
f"Port sharing configuration does not respect standard usage ({user_test_port}). The configuration in the 'Container sumamry' below will be applied. Please consult the help section for more information on using the -p/--port option.")
# Check if start port is lower than end port
if end_host_port < start_host_port or end_container_port < start_container_port:
raise ValueError("End port cannot be less than start port.")
# Check if any port in the range exceeds the valid range
if end_host_port > 65535 or end_container_port > 65535:
raise ValueError(f"The syntax for opening port in NAT is incorrect. The ports must be numbers between 0 and 65535. ({end_host_port}:{end_container_port})")
except ValueError as e:
logger.critical(e)
return
self.addPort(host_port, container_port, protocol=protocol, host_ip=host_ip)
for host_port, container_port in zip(range(start_host_port, end_host_port + 1), range(start_container_port, end_container_port + 1)):
self.addPort(host_port, container_port, protocol=protocol, host_ip=host_ip)

def addRawEnv(self, env: str):
"""Parse and add an environment variable from raw user input"""
Expand Down Expand Up @@ -1375,35 +1400,109 @@ def getTextPorts(self) -> str:
Dict Port key = container port/protocol
Dict Port Values:
None = Random port
int = open port ont he host
int = open port on the host
tuple = (host_ip, port)
list of int = open multiple host port
list of dict = open one or more ports on host, key ('HostIp' / 'HostPort') and value ip or port"""
result = ''
# TODO if network bridge and container not started, ports config cannot be printed: add a user warning message

start_host_ip = None
start_host_port = None
previous_host_port: Optional[Union[str, int]] = None

start_container_protocole = None
start_container_port = None
previous_container_port = None

previous_entry = None

for container_config, host_config in self.__ports.items():
host_info = "Unknown"
# Parse config
current_container_port = int(container_config.split('/')[0])
current_container_protocole = container_config.split('/')[-1]
# We might have multiple host context config at the same time for the same container config
current_host_contexts: List[Dict[str, Union[str, int]]] = []
# Init range context, container side
if start_container_port is None:
start_container_port = current_container_port
previous_container_port = current_container_port
start_container_protocole = current_container_protocole

# Parse host config multiple format
if host_config is None:
host_info = "0.0.0.0:<Random port>"
elif type(host_config) is int:
host_info = f"0.0.0.0:{host_config}"
elif type(host_config) is tuple:
assert len(host_config) == 2
host_info = f"{host_config[0]}:{host_config[1]}"
elif type(host_config) is list:
sub_info = []
for sub_host_config in host_config:
if type(sub_host_config) is int:
sub_info.append(f"0.0.0.0:{sub_host_config}")
elif type(sub_host_config) is dict:
sub_port = sub_host_config.get('HostPort', '<Random port>')
current_host_contexts.append({"ip": "0.0.0.0",
"port": "<Random port>"})
else:
if type(host_config) is list:
host_configs: List[Union[int, Tuple[str, int], Dict[str, Union[int, str]]]] = host_config
else:
host_configs = cast(List[Union[int, Tuple[str, int], Dict[str, Union[int, str]]]], [host_config])

for current_host_config in host_configs:
if type(current_host_config) is int:
current_host_contexts.append({"ip": "0.0.0.0",
"port": current_host_config})
elif type(current_host_config) is tuple:
assert len(current_host_config) == 2
current_host_contexts.append({"ip": current_host_config[0],
"port": int(current_host_config[1])})
elif type(current_host_config) is dict:
sub_port = current_host_config.get('HostPort')
if sub_port is None:
sub_port = "<Random port>"
sub_info.append(f"{sub_host_config.get('HostIp', '0.0.0.0')}:{sub_port}")
if len(sub_info) > 0:
host_info = ", ".join(sub_info)
else:
logger.debug(f"Unknown port config: {type(host_config)}={host_config}")
result += f"{host_info} :right_arrow: {container_config}{os.linesep}"
elif type(sub_port) is str:
sub_port = int(sub_port)
current_host_contexts.append({"ip": current_host_config.get('HostIp', '0.0.0.0'),
"port": sub_port})
else:
logger.debug(f"Unknown port config: {type(host_config)}={host_config} :right_arrow: {container_config}")
continue

for current_context in current_host_contexts:
current_host_port = current_context.get("port")
current_host_ip = current_context.get('ip')

# Init range context
if start_host_port is None:
start_host_port = current_host_port
previous_host_port = current_host_port
start_host_ip = current_host_ip
# Check if range continue
elif (start_host_ip == current_host_ip and
current_container_protocole == start_container_protocole and
(current_host_port == previous_host_port or
current_host_port == previous_host_port + 1) and
(current_container_port == previous_container_port or
current_container_port == previous_container_port + 1)):
previous_host_port = current_host_port
previous_container_port = current_container_port
# If range exit, submit previous entry + reset new range context
else:
# Register previous range
if previous_entry:
result += previous_entry
# reset context host and container side
start_host_port = current_host_port
previous_host_port = current_host_port
start_host_ip = current_host_ip
start_container_port = current_container_port
previous_container_port = current_container_port
start_container_protocole = current_container_protocole

# Register last range
range_host_port = ""
if type(start_host_port) is int:
assert type(previous_host_port) is int
range_host_port = "" if previous_host_port - start_host_port <= 0 else f"-{previous_host_port}"
range_container_port = "" if previous_container_port - start_container_port <= 0 else f"-{previous_container_port}"
previous_entry = (f"{start_host_ip}:{start_host_port}{range_host_port} :right_arrow: "
f"{start_container_port}{range_container_port}/{start_container_protocole}{os.linesep}")

# Submit last entry is any
if previous_entry:
result += previous_entry

return result

def __str__(self):
Expand Down

0 comments on commit eeede32

Please sign in to comment.