Skip to content

Commit

Permalink
Merge pull request #10 from flyingcircusio/PL-131744
Browse files Browse the repository at this point in the history
migration: fix crash due to failed heartbeats not propagating
  • Loading branch information
ctheune authored Nov 20, 2023
2 parents 6c56e40 + a59afaf commit e053a0c
Show file tree
Hide file tree
Showing 11 changed files with 344 additions and 125 deletions.
19 changes: 18 additions & 1 deletion CHANGES.txt
Original file line number Diff line number Diff line change
Expand Up @@ -4,8 +4,25 @@ Release notes
1.4.1 (unreleased)
------------------

- Nothing changed yet.
- Live migration: fix crash due to failed heartbeats not propagating
tests: reduce test flakiness.

- Introduce `pytest-patterns` as a replacement for the current
use of Ellipsis.

- Increase general test timeouts so tests are a bit less flaky.

- Improve some lock logging to make it easier to understand and
debug when reading the locks and I think this also fixes some race
conditions.

This also removes a file descriptor leak that was found in testing.

- Fix VM kill handling to include our little supervisor process and
selectively decide whether to kill it or not.

- Add more testing output about the environment in case something goes
wrong in the tests and we need to debug it.

1.4.0 (2023-10-12)
------------------
Expand Down
2 changes: 1 addition & 1 deletion pytest.ini
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
[pytest]
addopts = --tb=native --timeout=3 --cov=src --cov-report=html --junitxml=/tmp/fc.qemu-report.xml
addopts = --tb=native --timeout=20 --cov=src --cov-report=html --junitxml=/tmp/fc.qemu-report.xml

markers =
slow: mark test as slow.
Expand Down
3 changes: 3 additions & 0 deletions setup.py
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,9 @@
"setuptools",
"structlog>=16.1.0",
],
tests_require=[
"pytest-patterns",
],
entry_points={
"console_scripts": [
"fc-qemu = fc.qemu.main:main",
Expand Down
27 changes: 16 additions & 11 deletions src/fc/qemu/agent.py
Original file line number Diff line number Diff line change
Expand Up @@ -203,22 +203,24 @@ def locked_func(self, *args, **kw):
mode="nonblocking",
)
return os.EX_TEMPFAIL
self._lock_count += 1
self.log.debug(
"acquire-lock", target=self.lockfile, result="locked"
"acquire-lock",
target=self.lockfile,
result="locked",
count=self._lock_count,
)

self._lock_count += 1
self.log.debug("lock-status", count=self._lock_count)
try:
return f(self, *args, **kw)
finally:
self._lock_count -= 1
self.log.debug("lock-status", count=self._lock_count)

self.log.debug(
"release-lock",
target=self.lockfile,
count=self._lock_count,
)
if self._lock_count == 0:
# New
try:
self.log.debug("release-lock", target=self.lockfile)
fcntl.flock(self._lockfile_fd, fcntl.LOCK_UN)
self.log.debug(
"release-lock",
Expand All @@ -232,7 +234,8 @@ def locked_func(self, *args, **kw):
target=self.lockfile,
result="error",
)
pass
os.close(self._lockfile_fd)
self._lockfile_fd = None

return locked_func

Expand Down Expand Up @@ -1005,7 +1008,7 @@ def ensure_(self):
self.log.error(
"inconsistent-state", action="destroy", exc_info=True
)
self.qemu.destroy()
self.qemu.destroy(kill_supervisor=True)

def ensure_offline(self):
if self.qemu.is_running():
Expand Down Expand Up @@ -1335,7 +1338,7 @@ def restart(self):
def kill(self):
self.log.info("kill-vm")
timeout = TimeOut(15, interval=1, raise_on_timeout=True)
self.qemu.destroy()
self.qemu.destroy(kill_supervisor=True)
while timeout.tick():
if not self.qemu.is_running():
self.log.info("killed-vm")
Expand Down Expand Up @@ -1396,6 +1399,8 @@ def outmigrate(self):
client = Outgoing(self)
exitcode = client()
if not exitcode:
# XXX I think this can lead to inconsistent behaviour
# if the local VM is destroyed during migration?
self.consul_deregister()
self.log.info("outmigrate-finished", exitcode=exitcode)
return exitcode
Expand Down
61 changes: 58 additions & 3 deletions src/fc/qemu/conftest.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@
import shutil
import subprocess
import sys
import time
import traceback

import pkg_resources
Expand Down Expand Up @@ -38,7 +39,7 @@ def setup_structlog():
from . import util

# set to True to temporarily get detailed tracebacks
log_exceptions = False
log_exceptions = True

def test_logger(logger, method_name, event):
stack = event.pop("stack", None)
Expand Down Expand Up @@ -119,16 +120,70 @@ def pytest_assertrepr_compare(op, left, right):
@pytest.fixture
def clean_environment():
def clean():
subprocess.call("pkill -f qemu", shell=True)
subprocess.call(["pkill", "-f", "supervised-qemu"])
subprocess.call(["pkill", "-f", "qemu-system-x86_64"])
subprocess.call("rbd rm rbd.ssd/simplevm.swap", shell=True)
subprocess.call("rbd snap purge rbd.ssd/simplevm.root", shell=True)
subprocess.call("rbd rm rbd.ssd/simplevm.root", shell=True)
subprocess.call("rbd rm rbd.ssd/simplevm.tmp", shell=True)
time.sleep(1)

clean()

print(
subprocess.check_output(
"rbd ls rbd.ssd",
shell=True,
).decode("ascii")
)

yield
clean()

print(
subprocess.check_output(
"free && sync && echo 3 > /proc/sys/vm/drop_caches && free",
shell=True,
).decode("ascii")
)
print(
subprocess.check_output(
"ceph df",
shell=True,
).decode("ascii")
)
print(
subprocess.check_output(
"ps auxf",
shell=True,
).decode("ascii")
)
print(
subprocess.check_output(
"df -h",
shell=True,
).decode("ascii")
)
print(
subprocess.check_output(
"rbd showmapped",
shell=True,
).decode("ascii")
)


@pytest.fixture(autouse=True)
def clean_tmpdir_with_flakefinder(tmpdir, pytestconfig):
"""The `tmpdir` is normally not cleaned out for debugging purposes.
Running with flakefinder causes the tmpdir to quickly grow too big.
So, if flakefinder is active, we clean it out to avoid running out of
disk space.
"""
yield
if pytestconfig.getoption("flake_finder_enable") > 0:
shutil.rmtree(tmpdir)


@pytest.fixture
def vm(clean_environment, monkeypatch, tmpdir):
Expand All @@ -146,7 +201,7 @@ def vm(clean_environment, monkeypatch, tmpdir):
vm.qemu.vm_expected_overhead = 128
for snapshot in vm.ceph.root.snapshots:
snapshot.remove()
vm.qemu.destroy()
vm.qemu.destroy(kill_supervisor=True)
vm.unlock()
get_log()
yield vm
Expand Down
2 changes: 0 additions & 2 deletions src/fc/qemu/ellipsis.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,6 @@


class Report(object):

matches = None

def __init__(self):
Expand Down Expand Up @@ -49,7 +48,6 @@ def match(pattern, line):


class Ellipsis(object):

# other = other.replace('\t', ' '*8) oder allgemein white-space unsensibel
# multi-line support

Expand Down
31 changes: 27 additions & 4 deletions src/fc/qemu/hazmat/qemu.py
Original file line number Diff line number Diff line change
Expand Up @@ -53,6 +53,7 @@ def detect_current_machine_type(

def locked_global(f):
LOCK = "/run/fc-qemu.lock"

# This is thread-safe *AS LONG* as every thread uses a separate instance
# of the agent. Using multiple file descriptors will guarantee that the
# lock can only be held once even within a single process.
Expand Down Expand Up @@ -85,7 +86,6 @@ def locked(self, *args, **kw):


class Qemu(object):

prefix = ""
executable = "qemu-system-x86_64"

Expand Down Expand Up @@ -576,20 +576,43 @@ def graceful_shutdown(self):
],
)

def destroy(self):
def destroy(self, kill_supervisor=False):
# We use this destroy command in "fire-and-forget"-style because
# sometimes the init script will complain even if we achieve what
# we want: that the VM isn't running any longer. We check this
# by contacting the monitor instead.
timeout = TimeOut(100, interval=1, raise_on_timeout=True)
p = self.proc()
if not p:
return

# Check whether the parent is the supervising process.
# Kill that one first so we avoid immediate restarts.
if kill_supervisor:
parent = p.parent()
if "supervised-qemu-wrapped" in parent.cmdline()[1]:
# Do not raise on timeout so we get a chance to actually kill
# the VM even if killing the supervisor fails.
timeout = TimeOut(100, interval=2, raise_on_timeout=False)
attempt = 0
while parent.is_running() and timeout.tick():
attempt += 1
self.log.debug(
"vm-destroy-kill-supervisor", attempt=attempt
)
try:
parent.terminate()
except psutil.NoSuchProcess:
break

timeout = TimeOut(100, interval=2, raise_on_timeout=True)
attempt = 0
while p.is_running() and timeout.tick():
attempt += 1
self.log.debug("vm-destroy-kill-vm", attempt=attempt)
try:
p.terminate()
except psutil.NoSuchProcess:
pass
break

def resize_root(self, size):
self.qmp.command("block_resize", device="virtio0", size=size)
Expand Down
6 changes: 6 additions & 0 deletions src/fc/qemu/incoming.py
Original file line number Diff line number Diff line change
Expand Up @@ -139,6 +139,9 @@ def run(self):
if self.finished == "success":
return 0
else:
# Do not kill the supervisor. We will be giving up here, but the
# supervisor will restart the process, potentially with updated
# config data so we get a "free" retry.
self.qemu.destroy()
return 1

Expand Down Expand Up @@ -229,6 +232,9 @@ def destroy(self):
log.info("destroying", machine=self.name)
self.finished = "destroyed"
try:
# Do not kill the supervisor. We will be giving up here, but the
# supervisor will restart the process, potentially with updated
# config data so we get a "free" retry.
self.qemu.destroy()
except QemuNotRunning:
pass
Expand Down
22 changes: 16 additions & 6 deletions src/fc/qemu/outgoing.py
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ class Heartbeat(object):
"""

cookie = None
failed = False

def __init__(self, log):
self.thread = threading.Thread(target=self.run)
Expand Down Expand Up @@ -48,13 +49,18 @@ def run(self):
self.connection.ping(self.cookie)
except Exception:
self.log.exception("ping-failed", exc_info=True)
self.failed = True
time.sleep(10)
finally:
self.log.debug("stopped-heartbeat-ping")

def propagate(self):
if self.failed:
self.log.warning("heartbeat-propagate")
raise RuntimeError("Heartbeat failed.")

class Outgoing(object):

class Outgoing(object):
migration_exitcode = None
target = None
cookie = None
Expand Down Expand Up @@ -209,6 +215,7 @@ def acquire_migration_locks(self):
if self.agent.has_new_config():
self.target.cancel(self.cookie)
raise ConfigChanged()
self.heartbeat.propagate()

# Try to acquire local lock
if self.agent.qemu.acquire_migration_lock():
Expand All @@ -230,6 +237,7 @@ def acquire_migration_locks(self):
self.log.debug(
"acquire-remote-migration-lock", result="success"
)
self.heartbeat.propagate()
except Exception:
self.log.exception(
"acquire-remote-migration-lock",
Expand Down Expand Up @@ -264,6 +272,7 @@ def migrate(self):
mbps=mbps,
output=pprint.pformat(stat),
)
self.heartbeat.propagate()
except Exception:
self.log.exception("error-waiting-for-migration", exc_info=True)
raise
Expand All @@ -272,12 +281,13 @@ def migrate(self):
assert not status["running"], status
assert status["status"] == "postmigrate", status
self.log.info("finish-migration")
self.destroy(kill_supervisor=True)
try:
self.heartbeat.stop()
self.log.info("finish-remote")
self.target.finish_incoming(self.cookie)
except Exception:
self.log.exception("error-finish-remote", exc_info=True)
self.agent.qemu.destroy()
self.heartbeat.stop()

def rescue(self):
"""Outgoing rescue: try to rescue the remote side first."""
Expand All @@ -287,7 +297,7 @@ def rescue(self):
self.target.rescue(self.cookie)
self.target.finish_incoming(self.cookie)
self.log.info("rescue-remote-success", action="destroy local")
self.destroy()
self.destroy(kill_supervisor=True)
# We managed to rescue on the remote side - hooray!
self.migration_exitcode = 0
return
Expand Down Expand Up @@ -317,6 +327,6 @@ def rescue(self):
else:
self.log.info("continue-locally", result="success")

def destroy(self):
self.agent.qemu.destroy()
def destroy(self, kill_supervisor=False):
self.agent.qemu.destroy(kill_supervisor)
self.agent.qemu.clean_run_files()
Loading

0 comments on commit e053a0c

Please sign in to comment.