From 77bc0dc80721e8b6dc92ebf2308b2dccb0546e2e Mon Sep 17 00:00:00 2001 From: charlie4284 Date: Sat, 27 Jan 2024 18:57:25 +0800 Subject: [PATCH 01/13] fix: wait for machine address to be ready --- juju/machine.py | 20 ++++++++++++++++---- 1 file changed, 16 insertions(+), 4 deletions(-) diff --git a/juju/machine.py b/juju/machine.py index 60554fd09..4534e331b 100644 --- a/juju/machine.py +++ b/juju/machine.py @@ -6,11 +6,11 @@ import pyrfc3339 -from . import model, tag, jasyncio +from . import jasyncio, model, tag from .annotationhelper import _get_annotations, _set_annotations from .client import client from .errors import JujuError -from juju.utils import juju_ssh_key_paths +from juju.utils import juju_ssh_key_paths, block_until log = logging.getLogger(__name__) @@ -82,7 +82,8 @@ async def scp_to(self, source, destination, user='ubuntu', proxy=False, """ if proxy: raise NotImplementedError('proxy option is not implemented') - + if not self.dns_name: + raise JujuError("Machine address not yet ready, please call await machine.wait()") try: # if dns_name is an IP address format it appropriately address = self._format_addr(self.dns_name) @@ -105,7 +106,8 @@ async def scp_from(self, source, destination, user='ubuntu', proxy=False, """ if proxy: raise NotImplementedError('proxy option is not implemented') - + if not self.dns_name: + raise JujuError("Machine address not yet ready, please call await machine.wait()") try: # if dns_name is an IP address format it appropriately address = self._format_addr(self.dns_name) @@ -147,6 +149,8 @@ async def ssh( if proxy: raise NotImplementedError('proxy option is not implemented') address = self.dns_name + if not address: + raise JujuError("Machine address not yet ready, please call await machine.wait()") destination = "{}@{}".format(user, address) _, id_path = juju_ssh_key_paths() cmd = [ @@ -167,6 +171,14 @@ async def ssh( # stdout is a bytes-like object, returning a string might be more useful return stdout.decode() + async def wait(self, timeout: int=300) -> None: + """Waits until the machie is ready to take ssh/scp commands. + + :param int timeout: Timeout in seconds to wait for. + """ + await block_until(lambda: self.safe_data["addresses"] and + self.agent_status == "started", timeout=timeout) + @property def agent_status(self): """Returns the current Juju agent status string. From 0dc32a5fccbe64ec6bab57c67d113fcad3f5c89a Mon Sep 17 00:00:00 2001 From: charlie4284 Date: Sat, 27 Jan 2024 19:07:52 +0800 Subject: [PATCH 02/13] fix: assertion --- tests/integration/test_machine.py | 10 ++++++++++ 1 file changed, 10 insertions(+) diff --git a/tests/integration/test_machine.py b/tests/integration/test_machine.py index ca5959b17..b35046f81 100644 --- a/tests/integration/test_machine.py +++ b/tests/integration/test_machine.py @@ -7,6 +7,7 @@ import pytest from .. import base +from juju.machine import Machine @base.bootstrapped @@ -65,3 +66,12 @@ async def test_scp(event_loop): with NamedTemporaryFile() as f: await machine.scp_from('testfile', f.name, scp_opts='-p') assert f.read() == b'testcontents' + + +async def test_machine_ssh(): + async with base.CleanModel() as model: + machine: Machine = await model.add_machine() + await machine.wait() + out = await machine.ssh("echo hello world!") + + assert out == "hello world!\n" From e1b84108b6b7314ce45f67e5fd8503b23d3109eb Mon Sep 17 00:00:00 2001 From: charlie4284 Date: Sat, 27 Jan 2024 19:21:49 +0800 Subject: [PATCH 03/13] fix: lint --- juju/machine.py | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/juju/machine.py b/juju/machine.py index 4534e331b..0b1ff5c22 100644 --- a/juju/machine.py +++ b/juju/machine.py @@ -171,12 +171,12 @@ async def ssh( # stdout is a bytes-like object, returning a string might be more useful return stdout.decode() - async def wait(self, timeout: int=300) -> None: + async def wait(self, timeout: int = 300) -> None: """Waits until the machie is ready to take ssh/scp commands. - + :param int timeout: Timeout in seconds to wait for. """ - await block_until(lambda: self.safe_data["addresses"] and + await block_until(lambda: self.safe_data["addresses"] and self.agent_status == "started", timeout=timeout) @property From a7d9e48408753973bd8aa540e79dd7d9d5a34c1b Mon Sep 17 00:00:00 2001 From: charlie4284 Date: Sat, 27 Jan 2024 19:29:05 +0800 Subject: [PATCH 04/13] fix: typo in docstring --- juju/machine.py | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/juju/machine.py b/juju/machine.py index 0b1ff5c22..c81cbce65 100644 --- a/juju/machine.py +++ b/juju/machine.py @@ -172,9 +172,9 @@ async def ssh( return stdout.decode() async def wait(self, timeout: int = 300) -> None: - """Waits until the machie is ready to take ssh/scp commands. + """Waits until the machine is ready to take ssh/scp commands. - :param int timeout: Timeout in seconds to wait for. + :param int timeout: Timeout in seconds. """ await block_until(lambda: self.safe_data["addresses"] and self.agent_status == "started", timeout=timeout) From ef5edf76118e1bb93434f1c754ef7478c5f37f7b Mon Sep 17 00:00:00 2001 From: charlie4284 Date: Wed, 31 Jan 2024 18:02:25 +0800 Subject: [PATCH 05/13] fix: use self.addresses --- juju/machine.py | 41 +++++++++++++++++++++++------------------ 1 file changed, 23 insertions(+), 18 deletions(-) diff --git a/juju/machine.py b/juju/machine.py index c81cbce65..1545dd735 100644 --- a/juju/machine.py +++ b/juju/machine.py @@ -70,7 +70,7 @@ def _format_addr(self, addr): return fmt.format(ipaddr) async def scp_to(self, source, destination, user='ubuntu', proxy=False, - scp_opts=''): + scp_opts='', wait_for_active=True, timeout=None): """Transfer files to this machine. :param str source: Local path of file(s) to transfer @@ -79,11 +79,13 @@ async def scp_to(self, source, destination, user='ubuntu', proxy=False, :param bool proxy: Proxy through the Juju API server :param scp_opts: Additional options to the `scp` command :type scp_opts: str or list + :param bool wait_for_active: Wait until the machine is ready to take in ssh commands. + :param int timeout: Time in seconds to wait until the machine becomes ready. """ if proxy: raise NotImplementedError('proxy option is not implemented') - if not self.dns_name: - raise JujuError("Machine address not yet ready, please call await machine.wait()") + if wait_for_active: + await block_until(lambda: self.addresses, timeout=timeout) try: # if dns_name is an IP address format it appropriately address = self._format_addr(self.dns_name) @@ -94,7 +96,7 @@ async def scp_to(self, source, destination, user='ubuntu', proxy=False, await self._scp(source, destination, scp_opts) async def scp_from(self, source, destination, user='ubuntu', proxy=False, - scp_opts=''): + scp_opts='', wait_for_active=True, timeout=None): """Transfer files from this machine. :param str source: Remote path of file(s) to transfer @@ -103,11 +105,13 @@ async def scp_from(self, source, destination, user='ubuntu', proxy=False, :param bool proxy: Proxy through the Juju API server :param scp_opts: Additional options to the `scp` command :type scp_opts: str or list + :param bool wait_for_active: Wait until the machine is ready to take in ssh commands. + :param int timeout: Time in seconds to wait until the machine becomes ready. """ if proxy: raise NotImplementedError('proxy option is not implemented') - if not self.dns_name: - raise JujuError("Machine address not yet ready, please call await machine.wait()") + if wait_for_active: + await block_until(lambda: self.addresses, timeout=timeout) try: # if dns_name is an IP address format it appropriately address = self._format_addr(self.dns_name) @@ -137,20 +141,21 @@ async def _scp(self, source, destination, scp_opts): raise JujuError("command failed: %s" % cmd) async def ssh( - self, command, user='ubuntu', proxy=False, ssh_opts=None): + self, command, user='ubuntu', proxy=False, ssh_opts=None,wait_for_active=True, timeout=None): """Execute a command over SSH on this machine. :param str command: Command to execute :param str user: Remote username :param bool proxy: Proxy through the Juju API server :param str ssh_opts: Additional options to the `ssh` command - + :param bool wait_for_active: Wait until the machine is ready to take in ssh commands. + :param int timeout: Time in seconds to wait until the machine becomes ready. """ if proxy: raise NotImplementedError('proxy option is not implemented') address = self.dns_name - if not address: - raise JujuError("Machine address not yet ready, please call await machine.wait()") + if wait_for_active: + await block_until(lambda: self.addresses, timeout=timeout) destination = "{}@{}".format(user, address) _, id_path = juju_ssh_key_paths() cmd = [ @@ -171,13 +176,14 @@ async def ssh( # stdout is a bytes-like object, returning a string might be more useful return stdout.decode() - async def wait(self, timeout: int = 300) -> None: - """Waits until the machine is ready to take ssh/scp commands. - - :param int timeout: Timeout in seconds. + + @property + def addresses(self) -> typing.List[str]: + """Returns the machine addresses. + """ - await block_until(lambda: self.safe_data["addresses"] and - self.agent_status == "started", timeout=timeout) + return self.safe_data['addresses'] or [] + @property def agent_status(self): @@ -233,11 +239,10 @@ def dns_name(self): May return None if no suitable address is found. """ - addresses = self.safe_data['addresses'] or [] ordered_addresses = [] ordered_scopes = ['public', 'local-cloud', 'local-fan'] for scope in ordered_scopes: - for address in addresses: + for address in self.addresses: if scope == address['scope']: ordered_addresses.append(address) for address in ordered_addresses: From 98b6d445d370d1b1f52d78766b28be8e07dba926 Mon Sep 17 00:00:00 2001 From: charlie4284 Date: Thu, 1 Feb 2024 11:09:56 +0800 Subject: [PATCH 06/13] fix: import typing --- juju/machine.py | 1 + 1 file changed, 1 insertion(+) diff --git a/juju/machine.py b/juju/machine.py index 1545dd735..2b3a8e7e6 100644 --- a/juju/machine.py +++ b/juju/machine.py @@ -3,6 +3,7 @@ import ipaddress import logging +import typing import pyrfc3339 From f163603fa6fdbc1ed1496bc94b6eca3d763bee67 Mon Sep 17 00:00:00 2001 From: charlie4284 Date: Thu, 1 Feb 2024 11:10:57 +0800 Subject: [PATCH 07/13] fix: lint --- juju/machine.py | 6 ++---- 1 file changed, 2 insertions(+), 4 deletions(-) diff --git a/juju/machine.py b/juju/machine.py index 2b3a8e7e6..c82e66a3c 100644 --- a/juju/machine.py +++ b/juju/machine.py @@ -142,7 +142,7 @@ async def _scp(self, source, destination, scp_opts): raise JujuError("command failed: %s" % cmd) async def ssh( - self, command, user='ubuntu', proxy=False, ssh_opts=None,wait_for_active=True, timeout=None): + self, command, user='ubuntu', proxy=False, ssh_opts=None, wait_for_active=True, timeout=None): """Execute a command over SSH on this machine. :param str command: Command to execute @@ -177,15 +177,13 @@ async def ssh( # stdout is a bytes-like object, returning a string might be more useful return stdout.decode() - @property def addresses(self) -> typing.List[str]: """Returns the machine addresses. - + """ return self.safe_data['addresses'] or [] - @property def agent_status(self): """Returns the current Juju agent status string. From d7c7f2a1f0b3b9f4be1b8601ae78a7fd5ba8cfc3 Mon Sep 17 00:00:00 2001 From: charlie4284 Date: Thu, 1 Feb 2024 11:23:00 +0800 Subject: [PATCH 08/13] fix: tests --- juju/machine.py | 2 +- tests/integration/test_machine.py | 1 - 2 files changed, 1 insertion(+), 2 deletions(-) diff --git a/juju/machine.py b/juju/machine.py index c82e66a3c..db596710d 100644 --- a/juju/machine.py +++ b/juju/machine.py @@ -154,9 +154,9 @@ async def ssh( """ if proxy: raise NotImplementedError('proxy option is not implemented') - address = self.dns_name if wait_for_active: await block_until(lambda: self.addresses, timeout=timeout) + address = self.dns_name destination = "{}@{}".format(user, address) _, id_path = juju_ssh_key_paths() cmd = [ diff --git a/tests/integration/test_machine.py b/tests/integration/test_machine.py index b35046f81..a68cbead3 100644 --- a/tests/integration/test_machine.py +++ b/tests/integration/test_machine.py @@ -71,7 +71,6 @@ async def test_scp(event_loop): async def test_machine_ssh(): async with base.CleanModel() as model: machine: Machine = await model.add_machine() - await machine.wait() out = await machine.ssh("echo hello world!") assert out == "hello world!\n" From b2fd4de09fa4a0a2953073f3522fbe600a184c1e Mon Sep 17 00:00:00 2001 From: charlie4284 Date: Thu, 22 Feb 2024 21:10:34 +0800 Subject: [PATCH 09/13] test: remove redundant test & use wait_for_active --- tests/integration/test_machine.py | 30 +----------------------------- 1 file changed, 1 insertion(+), 29 deletions(-) diff --git a/tests/integration/test_machine.py b/tests/integration/test_machine.py index 16c5e583f..67fd8cfb5 100644 --- a/tests/integration/test_machine.py +++ b/tests/integration/test_machine.py @@ -39,37 +39,9 @@ async def test_status(): timeout=480) -@base.bootstrapped -async def test_scp(event_loop): - # ensure that asyncio.subprocess will work; - try: - asyncio.get_child_watcher().attach_loop(event_loop) - except RuntimeError: - pytest.skip('test_scp will always fail outside of MainThread') - async with base.CleanModel() as model: - await model.add_machine() - await asyncio.wait_for( - model.block_until(lambda: model.machines), - timeout=240) - machine = model.machines['0'] - await asyncio.wait_for( - model.block_until(lambda: (machine.status == 'running' and - machine.agent_status == 'started')), - timeout=480) - - with NamedTemporaryFile() as f: - f.write(b'testcontents') - f.flush() - await machine.scp_to(f.name, 'testfile', scp_opts='-p') - - with NamedTemporaryFile() as f: - await machine.scp_from('testfile', f.name, scp_opts='-p') - assert f.read() == b'testcontents' - - async def test_machine_ssh(): async with base.CleanModel() as model: machine: Machine = await model.add_machine() - out = await machine.ssh("echo hello world!") + out = await machine.ssh("echo hello world!", wait_for_active=True) assert out == "hello world!\n" From 92ab788133d3cb63c9b95efb3ade9438d867f8a5 Mon Sep 17 00:00:00 2001 From: charlie4284 Date: Mon, 4 Mar 2024 13:28:12 +0800 Subject: [PATCH 10/13] chore: change default value of wait_for_actie --- juju/machine.py | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/juju/machine.py b/juju/machine.py index db596710d..6e71bcc58 100644 --- a/juju/machine.py +++ b/juju/machine.py @@ -71,7 +71,7 @@ def _format_addr(self, addr): return fmt.format(ipaddr) async def scp_to(self, source, destination, user='ubuntu', proxy=False, - scp_opts='', wait_for_active=True, timeout=None): + scp_opts='', wait_for_active=False, timeout=None): """Transfer files to this machine. :param str source: Local path of file(s) to transfer @@ -97,7 +97,7 @@ async def scp_to(self, source, destination, user='ubuntu', proxy=False, await self._scp(source, destination, scp_opts) async def scp_from(self, source, destination, user='ubuntu', proxy=False, - scp_opts='', wait_for_active=True, timeout=None): + scp_opts='', wait_for_active=False, timeout=None): """Transfer files from this machine. :param str source: Remote path of file(s) to transfer @@ -142,7 +142,7 @@ async def _scp(self, source, destination, scp_opts): raise JujuError("command failed: %s" % cmd) async def ssh( - self, command, user='ubuntu', proxy=False, ssh_opts=None, wait_for_active=True, timeout=None): + self, command, user='ubuntu', proxy=False, ssh_opts=None, wait_for_active=False, timeout=None): """Execute a command over SSH on this machine. :param str command: Command to execute From 57b27a7c899c3e01b6491f698bc12fd11eb79a4d Mon Sep 17 00:00:00 2001 From: Caner Derici Date: Mon, 4 Mar 2024 12:33:56 -0700 Subject: [PATCH 11/13] Add retry for the machine ssh There's a bit of a gap between the time that the machine is assigned an IP and the ssh service is up and listening, which creates a race for the ssh command (i.e. we may run the ssh command after the machine gets the IP but before the ssh service is up). So we retry a couple of times to mitigate that effect until either the ssh command succeeds, or we run out of attempts (i.e. something else is going on). --- juju/machine.py | 36 +++++++++++++++++++++++++++++------- 1 file changed, 29 insertions(+), 7 deletions(-) diff --git a/juju/machine.py b/juju/machine.py index 6e71bcc58..0c3a2f242 100644 --- a/juju/machine.py +++ b/juju/machine.py @@ -136,10 +136,21 @@ async def _scp(self, source, destination, scp_opts): ] cmd.extend(scp_opts.split() if isinstance(scp_opts, str) else scp_opts) cmd.extend([source, destination]) - process = await jasyncio.create_subprocess_exec(*cmd) - await process.wait() + # There's a bit of a gap between the time that the machine is assigned an IP and the ssh + # service is up and listening, which creates a race for the ssh command. So we retry a + # couple of times until either we run out of attempts, or the ssh command succeeds to + # mitigate that effect. + # TODO (cderici): refactor the ssh and scp subcommand processing into a single method. + retry_backoff = 1 + retries = 10 + for _ in range(retries): + process = await jasyncio.create_subprocess_exec(*cmd) + await process.wait() + if process.returncode == 0: + break + await jasyncio.sleep(retry_backoff) if process.returncode != 0: - raise JujuError("command failed: %s" % cmd) + raise JujuError(f"command failed after {retries} attempts: {cmd}") async def ssh( self, command, user='ubuntu', proxy=False, ssh_opts=None, wait_for_active=False, timeout=None): @@ -169,11 +180,22 @@ async def ssh( if ssh_opts: cmd.extend(ssh_opts.split() if isinstance(ssh_opts, str) else ssh_opts) cmd.extend([command]) - process = await jasyncio.create_subprocess_exec( - *cmd, stdout=jasyncio.subprocess.PIPE, stderr=jasyncio.subprocess.PIPE) - stdout, stderr = await process.communicate() + + # There's a bit of a gap between the time that the machine is assigned an IP and the ssh + # service is up and listening, which creates a race for the ssh command. So we retry a + # couple of times until either we run out of attempts, or the ssh command succeeds to + # mitigate that effect. + retry_backoff = 1 + retries = 10 + for _ in range(retries): + process = await jasyncio.create_subprocess_exec( + *cmd, stdout=jasyncio.subprocess.PIPE, stderr=jasyncio.subprocess.PIPE) + stdout, stderr = await process.communicate() + if process.returncode == 0: + break + await jasyncio.sleep(retry_backoff) if process.returncode != 0: - raise JujuError("command failed: %s with %s" % (cmd, stderr.decode())) + raise JujuError(f"command failed: {cmd} after {retries} attempts, with {stderr.decode()}") # stdout is a bytes-like object, returning a string might be more useful return stdout.decode() From 8092fa140c1ab5cdf26ba6b2c0991dda32e89b4e Mon Sep 17 00:00:00 2001 From: Caner Derici Date: Mon, 4 Mar 2024 13:27:43 -0700 Subject: [PATCH 12/13] Extend the retry backoff for machine ssh scp In a real world scenario, this will work in the first iteration (maybe the second after 2sec backoff), however the github workers are particularly slow so extending this to make the tests pass. --- juju/machine.py | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/juju/machine.py b/juju/machine.py index 0c3a2f242..b0d77ef65 100644 --- a/juju/machine.py +++ b/juju/machine.py @@ -141,7 +141,7 @@ async def _scp(self, source, destination, scp_opts): # couple of times until either we run out of attempts, or the ssh command succeeds to # mitigate that effect. # TODO (cderici): refactor the ssh and scp subcommand processing into a single method. - retry_backoff = 1 + retry_backoff = 2 retries = 10 for _ in range(retries): process = await jasyncio.create_subprocess_exec(*cmd) @@ -185,7 +185,7 @@ async def ssh( # service is up and listening, which creates a race for the ssh command. So we retry a # couple of times until either we run out of attempts, or the ssh command succeeds to # mitigate that effect. - retry_backoff = 1 + retry_backoff = 2 retries = 10 for _ in range(retries): process = await jasyncio.create_subprocess_exec( From 1f9b6b3b7df972baf4f7f262bae37253aa721295 Mon Sep 17 00:00:00 2001 From: Caner Derici Date: Mon, 4 Mar 2024 19:20:33 -0700 Subject: [PATCH 13/13] Run the test only if we have a bootstrapped environment --- tests/integration/test_machine.py | 1 + 1 file changed, 1 insertion(+) diff --git a/tests/integration/test_machine.py b/tests/integration/test_machine.py index 67fd8cfb5..1a6804152 100644 --- a/tests/integration/test_machine.py +++ b/tests/integration/test_machine.py @@ -39,6 +39,7 @@ async def test_status(): timeout=480) +@base.bootstrapped async def test_machine_ssh(): async with base.CleanModel() as model: machine: Machine = await model.add_machine()