From b3770edc307098418beae7875c87d1b5a29a1348 Mon Sep 17 00:00:00 2001 From: Andriy Gapon Date: Thu, 25 Jun 2015 19:10:13 +0300 Subject: [PATCH 01/23] zfs: reimplement Filesystem._exists using lzc_exists This is a start of the conversion of the ZFS backend to libzfs_core / pyzfs. --- flocker/volume/filesystems/zfs.py | 10 ++++------ 1 file changed, 4 insertions(+), 6 deletions(-) diff --git a/flocker/volume/filesystems/zfs.py b/flocker/volume/filesystems/zfs.py index a28b229330..2f32fe0ea9 100644 --- a/flocker/volume/filesystems/zfs.py +++ b/flocker/volume/filesystems/zfs.py @@ -7,10 +7,12 @@ from __future__ import absolute_import import os +import libzfs_core + from contextlib import contextmanager from uuid import uuid4 from subprocess import ( - CalledProcessError, STDOUT, PIPE, Popen, check_call, check_output + STDOUT, PIPE, Popen, check_call, check_output ) from characteristic import attributes, with_cmp, with_repr @@ -198,11 +200,7 @@ def _exists(self): :return: ``True`` if there is a filesystem with this name, ``False`` otherwise. """ - try: - check_output([b"zfs", b"list", self.name], stderr=STDOUT) - except CalledProcessError: - return False - return True + return libzfs_core.lzc_exists(self.name) def snapshots(self): if self._exists(): From d6cbfaf0b86ac8ee16270fd5dd3a9050a1b0fcd1 Mon Sep 17 00:00:00 2001 From: Andriy Gapon Date: Fri, 17 Jul 2015 17:38:22 +0300 Subject: [PATCH 02/23] zfs: add utility code for running lzc functions in a thread pool The helper wraps lzc functions but also provides a method to schedule any function without creating a wrapper. Currently a thread pool of a reactor is used, but a dedicated pool could be used. Test reactor(s) without a thread pool are also supported. --- flocker/volume/filesystems/zfs.py | 65 ++++++++++++++++++++++++++++++- 1 file changed, 64 insertions(+), 1 deletion(-) diff --git a/flocker/volume/filesystems/zfs.py b/flocker/volume/filesystems/zfs.py index 2f32fe0ea9..b6c7bec468 100644 --- a/flocker/volume/filesystems/zfs.py +++ b/flocker/volume/filesystems/zfs.py @@ -9,6 +9,7 @@ import os import libzfs_core +from functools import wraps from contextlib import contextmanager from uuid import uuid4 from subprocess import ( @@ -24,8 +25,12 @@ from twisted.python.failure import Failure from twisted.python.filepath import FilePath from twisted.internet.endpoints import ProcessEndpoint, connectProtocol +from twisted.internet.interfaces import IReactorThreads from twisted.internet.protocol import Protocol -from twisted.internet.defer import Deferred, succeed, gatherResults +from twisted.internet.defer import ( + Deferred, succeed, gatherResults, maybeDeferred +) +from twisted.internet.threads import deferToThreadPool from twisted.internet.error import ConnectionDone, ProcessTerminated from twisted.application.service import Service @@ -69,6 +74,64 @@ def connectionLost(self, reason): del self._result +class _AsyncLZC(object): + def __init__(self, reactor): + self._reactor = reactor + self._cache = {} + + def callDeferred(self, func, *args, **kwargs): + return deferToThreadPool(self._reactor, self._reactor.getThreadPool(), + func, *args, **kwargs) + + def __getattr__(self, name): + try: + return self._cache[name] + except KeyError: + func = getattr(libzfs_core, name) + + @wraps(func) + def _async_wrapper(*args, **kwargs): + return self.callDeferred(func, *args, **kwargs) + + self._cache[name] = _async_wrapper + return self._cache[name] + + +class _FakeAsyncLZC(object): + def __init__(self): + self._cache = {} + + def callDeferred(self, func, 
*args, **kwargs): + return maybeDeferred(func, *args, **kwargs) + + def __getattr__(self, name): + try: + return self._cache[name] + except KeyError: + func = getattr(libzfs_core, name) + + @wraps(func) + def _async_wrapper(*args, **kwargs): + return maybeDeferred(func, *args, **kwargs) + + self._cache[name] = _async_wrapper + return self._cache[name] + + +_reactor_to_alzc = {} + + +def _async_lzc(reactor): + try: + return _reactor_to_alzc[reactor] + except KeyError: + if IReactorThreads.providedBy(reactor): + _reactor_to_alzc[reactor] = _AsyncLZC(reactor) + else: + _reactor_to_alzc[reactor] = _FakeAsyncLZC() + return _reactor_to_alzc[reactor] + + def zfs_command(reactor, arguments): """ Asynchronously run the ``zfs`` command-line tool with the given arguments. From e5531f6868ffb4f6ab4195bd7b28799322612cd7 Mon Sep 17 00:00:00 2001 From: Andriy Gapon Date: Tue, 30 Jun 2015 17:50:20 +0300 Subject: [PATCH 03/23] zfs: switch snapshot creation to lzc Also, remove all unit tests for snapshots. The unit tests were based on the fact that the operations were done via zfs command, so they checked that the command was spawned with the correct arguments and the output was correctly parsed. To do: invent the way to test the new implementation --- flocker/volume/filesystems/zfs.py | 3 +- flocker/volume/test/test_filesystems_zfs.py | 102 +------------------- 2 files changed, 3 insertions(+), 102 deletions(-) diff --git a/flocker/volume/filesystems/zfs.py b/flocker/volume/filesystems/zfs.py index b6c7bec468..f50bc39ceb 100644 --- a/flocker/volume/filesystems/zfs.py +++ b/flocker/volume/filesystems/zfs.py @@ -385,11 +385,12 @@ class ZFSSnapshots(object): def __init__(self, reactor, filesystem): self._reactor = reactor + self._async_lzc = _async_lzc(self._reactor) self._filesystem = filesystem def create(self, name): encoded_name = b"%s@%s" % (self._filesystem.name, name) - d = zfs_command(self._reactor, [b"snapshot", encoded_name]) + d = self._async_lzc.lzc_snapshot([encoded_name]) d.addCallback(lambda _: None) return d diff --git a/flocker/volume/test/test_filesystems_zfs.py b/flocker/volume/test/test_filesystems_zfs.py index c4b7dd6454..4c90fceadd 100644 --- a/flocker/volume/test/test_filesystems_zfs.py +++ b/flocker/volume/test/test_filesystems_zfs.py @@ -24,7 +24,7 @@ from ..filesystems.zfs import ( _DatasetInfo, - zfs_command, CommandFailed, BadArguments, Filesystem, ZFSSnapshots, + zfs_command, CommandFailed, BadArguments, Filesystem, _sync_command_error_squashed, _latest_common_snapshot, ZFS_ERROR, Snapshot, ) @@ -224,106 +224,6 @@ def test_success(self): self.assertIs(None, result) -class ZFSSnapshotsTests(SynchronousTestCase): - """Unit tests for ``ZFSSnapshotsTests``.""" - - def test_create(self): - """ - ``ZFSSnapshots.create()`` calls the ``zfs snapshot`` command with the - given ``bytes`` as the snapshot name. - """ - reactor = FakeProcessReactor() - snapshots = ZFSSnapshots(reactor, Filesystem(b"pool", "fs")) - snapshots.create(b"myname") - arguments = reactor.processes[0] - self.assertEqual(arguments.args, [b"zfs", b"snapshot", - b"pool/fs@myname"]) - - def test_create_no_result_yet(self): - """ - The result of ``ZFSSnapshots.create()`` is a ``Deferred`` that does not - fire if the creation is unfinished. 
- """ - reactor = FakeProcessReactor() - snapshots = ZFSSnapshots(reactor, Filesystem(b"mypool", None)) - d = snapshots.create(b"name") - self.assertNoResult(d) - - def test_create_result(self): - """ - The result of ``ZFSSnapshots.create()`` is a ``Deferred`` that fires - when creation has finished. - """ - reactor = FakeProcessReactor() - snapshots = ZFSSnapshots(reactor, Filesystem(b"mypool", None)) - d = snapshots.create(b"name") - reactor.processes[0].processProtocol.processEnded( - Failure(ProcessDone(0))) - self.assertEqual(self.successResultOf(d), None) - - def test_list(self): - """ - ``ZFSSnapshots.list()`` calls the ``zfs list`` command with the pool - name. - """ - reactor = FakeProcessReactor() - snapshots = ZFSSnapshots(reactor, Filesystem(b"mypool", None)) - snapshots.list() - self.assertEqual(reactor.processes[0].args, - [b"zfs", b"list", b"-H", b"-r", b"-t", b"snapshot", - b"-o", b"name", b"-s", b"creation", b"mypool"]) - - def test_list_result_root_dataset(self): - """ - ``ZFSSnapshots.list`` parses out the snapshot names of the root dataset - from the results of the command. - """ - reactor = FakeProcessReactor() - snapshots = ZFSSnapshots(reactor, Filesystem(b"mypool", None)) - - d = snapshots.list() - process_protocol = reactor.processes[0].processProtocol - process_protocol.childDataReceived(1, b"mypool@name\n") - process_protocol.childDataReceived(1, b"mypool@name2\n") - reactor.processes[0].processProtocol.processEnded( - Failure(ProcessDone(0))) - self.assertEqual(self.successResultOf(d), [b"name", b"name2"]) - - def test_list_result_child_dataset(self): - """ - ``ZFSSnapshots.list`` parses out the snapshot names of a non-root - dataset from the results of the command. - """ - reactor = FakeProcessReactor() - snapshots = ZFSSnapshots(reactor, Filesystem(b"mypool", b"myfs")) - - d = snapshots.list() - process_protocol = reactor.processes[0].processProtocol - process_protocol.childDataReceived(1, b"mypool/myfs@name\n") - process_protocol.childDataReceived(1, b"mypool/myfs@name2\n") - reactor.processes[0].processProtocol.processEnded( - Failure(ProcessDone(0))) - self.assertEqual(self.successResultOf(d), [b"name", b"name2"]) - - def test_list_result_ignores_other_pools(self): - """ - ``ZFSSnapshots.list`` skips snapshots of other pools. - - In particular, we are likely to see snapshot names of sub-pools in - the output. - """ - reactor = FakeProcessReactor() - snapshots = ZFSSnapshots(reactor, Filesystem(b"mypool", None)) - - d = snapshots.list() - process_protocol = reactor.processes[0].processProtocol - process_protocol.childDataReceived(1, b"mypool/child@name\n") - process_protocol.childDataReceived(1, b"mypool@name2\n") - reactor.processes[0].processProtocol.processEnded( - Failure(ProcessDone(0))) - self.assertEqual(self.successResultOf(d), [b"name2"]) - - class LatestCommonSnapshotTests(SynchronousTestCase): """ Tests for ``_latest_common_snapshot``. From 9a8d17cdc827232c5f4905c23f17a5e81bdd4135 Mon Sep 17 00:00:00 2001 From: Andriy Gapon Date: Tue, 30 Jun 2015 17:53:23 +0300 Subject: [PATCH 04/23] zfs: switch filesystem creation to lzc Note that lzc_create() never mounts the filesystem. 'zfs create' used to mount the filesystem, so now we have to do that explicitly and at the moment that is done via 'zfs mount'. 
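For illustration only, the new flow is roughly the following (a minimal
sketch, not the code in the diff below; the create_and_mount() name is
hypothetical, and the props keyword follows the pyzfs lzc_create()
signature adopted later in this series):

    import libzfs_core
    from subprocess import check_call

    def create_and_mount(name, mountpoint):
        # lzc_create() leaves the new filesystem unmounted, so mount it
        # explicitly with the zfs command-line tool afterwards.
        libzfs_core.lzc_create(name, props={b"mountpoint": mountpoint})
        check_call([b"zfs", b"mount", name])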
--- flocker/volume/filesystems/zfs.py | 15 +++++++-------- 1 file changed, 7 insertions(+), 8 deletions(-) diff --git a/flocker/volume/filesystems/zfs.py b/flocker/volume/filesystems/zfs.py index f50bc39ceb..5461a611aa 100644 --- a/flocker/volume/filesystems/zfs.py +++ b/flocker/volume/filesystems/zfs.py @@ -508,6 +508,7 @@ def __init__(self, reactor, name, mount_root): mounted. """ self._reactor = reactor + self._async_lzc = _async_lzc(self._reactor) self._name = name self._mount_root = mount_root @@ -554,17 +555,15 @@ def _check_for_out_of_space(self, reason): def create(self, volume): filesystem = self.get(volume) mount_path = filesystem.get_path().path - properties = [b"-o", b"mountpoint=" + mount_path] + properties = {b"mountpoint": mount_path} if volume.locally_owned(): - properties.extend([b"-o", b"readonly=off"]) + properties[b"readonly"] = 0 if volume.size.maximum_size is not None: - properties.extend([ - b"-o", u"refquota={0}".format( - volume.size.maximum_size).encode("ascii") - ]) - d = zfs_command(self._reactor, - [b"create"] + properties + [filesystem.name]) + properties[b"refquota"] = volume.size.maximum_size + d = self._async_lzc.lzc_create(filesystem.name, False, properties) d.addErrback(self._check_for_out_of_space) + d.addCallback( + lambda _: zfs_command(self._reactor, [b"mount", filesystem.name])) d.addCallback(lambda _: filesystem) return d From 5f66c2dbe69cb49848f4e05520609be3f56b164e Mon Sep 17 00:00:00 2001 From: Andriy Gapon Date: Thu, 9 Jul 2015 13:32:14 +0300 Subject: [PATCH 05/23] zfs: convert set_maximum_size() to libzfs_core --- flocker/volume/filesystems/zfs.py | 12 ++++-------- 1 file changed, 4 insertions(+), 8 deletions(-) diff --git a/flocker/volume/filesystems/zfs.py b/flocker/volume/filesystems/zfs.py index 5461a611aa..31f0a5d67e 100644 --- a/flocker/volume/filesystems/zfs.py +++ b/flocker/volume/filesystems/zfs.py @@ -585,16 +585,12 @@ def got_snapshots(snapshots): def set_maximum_size(self, volume): filesystem = self.get(volume) - properties = [] if volume.size.maximum_size is not None: - properties.extend([ - u"refquota={0}".format( - volume.size.maximum_size).encode("ascii") - ]) + requota = volume.size.maximum_size else: - properties.extend([u"refquota=none"]) - d = zfs_command(self._reactor, - [b"set"] + properties + [filesystem.name]) + # zero means no quota + requota = 0 + d = self._async_lzc.lzc_set_prop(filesystem.name, b"refquota", requota) d.addErrback(self._check_for_out_of_space) d.addCallback(lambda _: filesystem) return d From fa0a0ff7884de20a514f282c5cf270ec2438ea5f Mon Sep 17 00:00:00 2001 From: Andriy Gapon Date: Tue, 30 Jun 2015 18:37:01 +0300 Subject: [PATCH 06/23] zfs: delete a comment paragraph that was meant to be deleted earlier --- flocker/volume/filesystems/zfs.py | 6 ------ 1 file changed, 6 deletions(-) diff --git a/flocker/volume/filesystems/zfs.py b/flocker/volume/filesystems/zfs.py index 31f0a5d67e..7eb5409c68 100644 --- a/flocker/volume/filesystems/zfs.py +++ b/flocker/volume/filesystems/zfs.py @@ -350,12 +350,6 @@ def writer(self): # a hack. When we replace this mechanism with a proper API we # should make it include that information. # - # -e means "if the stream says it is for foo/bar/baz then receive - # into baz". I don't know why self.name is also required, - # then. XXX try -d self.pool instead. XXX it works without -e w/ - # self.name too. XXX Delete this paragraph if we go ahead with just - # `-F` in the implementation. - # # -F means force. 
If the stream is based on not-quite-the-latest # snapshot then we have to throw away all the snapshots newer than # it in order to receive the stream. To do that you have to From 9a80397fb1eb201b2ef33221a005fcb696e81740 Mon Sep 17 00:00:00 2001 From: Andriy Gapon Date: Tue, 30 Jun 2015 17:59:01 +0300 Subject: [PATCH 07/23] zfs: switch filesystem sending and receiving to lzc Note: lzc_send()/lzc_receive() do not close their end of the pipe (the file descriptor, in general) when they are done writing to it. Note: this API is synchrnous at the moment including a requirement for a synchronous cleanup of the file descriptors. There are functional tests to verify that. --- flocker/volume/filesystems/zfs.py | 83 ++++++++++++++++++++----------- 1 file changed, 55 insertions(+), 28 deletions(-) diff --git a/flocker/volume/filesystems/zfs.py b/flocker/volume/filesystems/zfs.py index 7eb5409c68..a404427f07 100644 --- a/flocker/volume/filesystems/zfs.py +++ b/flocker/volume/filesystems/zfs.py @@ -15,6 +15,7 @@ from subprocess import ( STDOUT, PIPE, Popen, check_call, check_output ) +from Queue import Queue from characteristic import attributes, with_cmp, with_repr @@ -255,6 +256,7 @@ def __init__(self, pool, dataset, mountpoint=None, size=None, if reactor is None: from twisted.internet import reactor self._reactor = reactor + self._async_lzc = _async_lzc(self._reactor) def _exists(self): """ @@ -302,7 +304,7 @@ def reader(self, remote_snapshots=None): # I'm just using UUIDs, and hopefully requirements will become # clearer as we iterate. snapshot = b"%s@%s" % (self.name, uuid4()) - check_call([b"zfs", b"snapshot", snapshot]) + libzfs_core.lzc_snapshot([snapshot]) # Determine whether there is a shared snapshot which can be used as the # basis for an incremental send. @@ -318,23 +320,29 @@ def reader(self, remote_snapshots=None): latest_common_snapshot = _latest_common_snapshot( remote_snapshots, local_snapshots) - - if latest_common_snapshot is None: - identifier = [snapshot] - else: - identifier = [ - b"-i", - u"{}@{}".format( - self.name, latest_common_snapshot.name).encode("ascii"), - snapshot, - ] - - process = Popen([b"zfs", b"send"] + identifier, stdout=PIPE) + latest_common_name = None + if latest_common_snapshot is not None: + latest_common_name = b"%s@%s" % (self.name, + latest_common_snapshot.name) + + (rfd, wfd) = os.pipe() + out = os.fdopen(rfd) + queue = Queue() + + def send_and_close(): + try: + libzfs_core.lzc_send(snapshot, latest_common_name, wfd) + finally: + os.close(wfd) + queue.put(None) + + d = self._async_lzc.callDeferred(send_and_close) + d.addBoth(lambda _: None) try: - yield process.stdout + yield out finally: - process.stdout.close() - process.wait() + out.close() + queue.get() @contextmanager def writer(self): @@ -350,27 +358,46 @@ def writer(self): # a hack. When we replace this mechanism with a proper API we # should make it include that information. # - # -F means force. If the stream is based on not-quite-the-latest + # If the stream is based on not-quite-the-latest # snapshot then we have to throw away all the snapshots newer than # it in order to receive the stream. To do that you have to # force. # - cmd = [b"zfs", b"receive", b"-F", self.name] + force = True else: # If the filesystem doesn't already exist then this is a complete # data stream. 
- cmd = [b"zfs", b"receive", self.name] - process = Popen(cmd, stdin=PIPE) - succeeded = False + force = False + + (rfd, wfd) = os.pipe() + wfile = os.fdopen(wfd, "w") + queue = Queue() + + def recv_and_close(): + try: + libzfs_core.lzc_receive(self.name, rfd, force) + success = True + except Exception: + success = False + finally: + os.close(rfd) + queue.put(success) + + d = self._async_lzc.callDeferred(recv_and_close) + d.addBoth(lambda _: None) try: - yield process.stdin + yield wfile finally: - process.stdin.close() - succeeded = not process.wait() - if succeeded: - check_call([b"zfs", b"set", - b"mountpoint=" + self._mountpoint.path, - self.name]) + try: + wfile.close() + except: + pass + succeeded = queue.get() + if succeeded and not force: + # a new filesystem + libzfs_core.lzc_set_prop(self.name, b"mountpoint", + self._mountpoint.path) + check_call([b"zfs", b"mount", self.name]) @implementer(IFilesystemSnapshots) From a44576fd13ebe739d882621595aaeb333116faf9 Mon Sep 17 00:00:00 2001 From: Andriy Gapon Date: Fri, 17 Jul 2015 18:37:52 +0300 Subject: [PATCH 08/23] zfs: switch cloning, renaming and setting some properties to lzc The properties are readonly and mountpoint properties. Note that one thing that all these operations have in common is that they may require a dataset to be unmounted and/or mounted. --- flocker/volume/filesystems/zfs.py | 33 ++++++++++++++----------------- 1 file changed, 15 insertions(+), 18 deletions(-) diff --git a/flocker/volume/filesystems/zfs.py b/flocker/volume/filesystems/zfs.py index a404427f07..39223ba37a 100644 --- a/flocker/volume/filesystems/zfs.py +++ b/flocker/volume/filesystems/zfs.py @@ -622,13 +622,9 @@ def clone_to(self, parent, volume): zfs_snapshots = ZFSSnapshots(self._reactor, parent_filesystem) snapshot_name = bytes(uuid4()) d = zfs_snapshots.create(snapshot_name) - clone_command = [b"clone", - # Snapshot we're cloning from: - b"%s@%s" % (parent_filesystem.name, snapshot_name), - # New filesystem we're cloning to: - new_filesystem.name, - ] - d.addCallback(lambda _: zfs_command(self._reactor, clone_command)) + full_snap_name = b"%s@%s" % (parent_filesystem.name, snapshot_name) + d.addCallback(lambda _: self._async_lzc.lzc_clone(new_filesystem.name, + full_snap_name)) self._created(d, volume) d.addCallback(lambda _: new_filesystem) return d @@ -637,7 +633,9 @@ def change_owner(self, volume, new_volume): old_filesystem = self.get(volume) new_filesystem = self.get(new_volume) d = zfs_command(self._reactor, - [b"rename", old_filesystem.name, new_filesystem.name]) + [b"umount", old_filesystem.name]) + d.addCallback(lambda _: self._async_lzc.lzc_rename( + old_filesystem.name, new_filesystem.name)) self._created(d, new_volume) def remounted(ignored): @@ -665,7 +663,7 @@ def _created(self, result, new_volume): new_mount_path = new_filesystem.get_path().path def creation_failed(f): - if f.check(CommandFailed): + if (f.check(libzfs_core.exceptions.FilesystemExists)): # This isn't the only reason the operation could fail. We # should figure out why and report it appropriately. 
# https://clusterhq.atlassian.net/browse/FLOC-199 @@ -675,16 +673,15 @@ def creation_failed(f): def exists(ignored): if new_volume.locally_owned(): - result = zfs_command(self._reactor, - [b"set", b"readonly=off", - new_filesystem.name]) + result = self._async_lzc.lzc_set_prop(new_filesystem.name, + b"readonly", 0) else: - result = zfs_command(self._reactor, - [b"inherit", b"readonly", - new_filesystem.name]) - result.addCallback(lambda _: zfs_command(self._reactor, - [b"set", b"mountpoint=" + new_mount_path, - new_filesystem.name])) + result = self._async_lzc.lzc_inherit_prop(new_filesystem.name, + b"readonly") + result.addCallback(lambda _: self._async_lzc.lzc_set_prop( + new_filesystem.name, b"mountpoint", new_mount_path)) + result.addCallback(lambda _: zfs_command( + self._reactor, [b"mount", new_filesystem.name])) return result result.addCallback(exists) From bcd8bf1797b2a48f3198d33a7ea0138c2a556953 Mon Sep 17 00:00:00 2001 From: Andriy Gapon Date: Fri, 17 Jul 2015 17:44:56 +0300 Subject: [PATCH 09/23] zfs: replace snapshot listing via 'zfs' with lzc_list_snaps() --- flocker/volume/filesystems/zfs.py | 59 +++++++++++-------------------- 1 file changed, 21 insertions(+), 38 deletions(-) diff --git a/flocker/volume/filesystems/zfs.py b/flocker/volume/filesystems/zfs.py index 39223ba37a..475a49ede5 100644 --- a/flocker/volume/filesystems/zfs.py +++ b/flocker/volume/filesystems/zfs.py @@ -12,9 +12,7 @@ from functools import wraps from contextlib import contextmanager from uuid import uuid4 -from subprocess import ( - STDOUT, PIPE, Popen, check_call, check_output -) +from subprocess import STDOUT, PIPE, Popen, check_call from Queue import Queue from characteristic import attributes, with_cmp, with_repr @@ -310,10 +308,8 @@ def reader(self, remote_snapshots=None): # basis for an incremental send. local_snapshots = list( Snapshot(name=name) for name in - _parse_snapshots( - check_output([b"zfs"] + _list_snapshots_command(self)), - self - )) + _parse_snapshots(_do_list_snapshots(self), self) + ) if remote_snapshots is None: remote_snapshots = [] @@ -422,56 +418,43 @@ def list(self): return _list_snapshots(self._reactor, self._filesystem) -def _list_snapshots_command(filesystem): +def _do_list_snapshots(filesystem): """ - Construct a ``zfs`` command which will output the names of the snapshots of - the given filesystem. + Produce a list of snapshots of the given filesystem sorted by their + creation order. :param Filesystem filesystem: The ZFS filesystem the snapshots of which to list. - :return list: An argument list (of ``bytes``) which can be passed to - ``zfs`` to produce the desired list of snapshots. ``zfs`` is not - included as the first element. + :return list: A ``list`` of ``bytes`` corresponding to the + names of the snapshots. """ - return [ - b"list", - # Format the output without a header. - b"-H", - # Recurse to datasets beneath the named dataset. - b"-r", - # Only output datasets of type snapshot. - b"-t", b"snapshot", - # Only output the name of each dataset encountered. The name is the - # only thing we currently store in our snapshot model. - b"-o", b"name", - # Sort by the creation property. This gives us the snapshots in the - # order they were taken. - b"-s", b"creation", - # Start with this the dataset we're interested in. 
- filesystem.name, - ] + snaps = [] + for snap in libzfs_core.lzc_list_snaps(filesystem.name): + creation = libzfs_core.lzc_get_props(snap)[b"creation"] + snaps.append((snap, creation)) + return [x[0] for x in sorted(snaps, key=lambda x: x[1])] def _parse_snapshots(data, filesystem): """ - Parse the output of a ``zfs list`` command (like the one defined by - ``_list_snapshots_command`` into a ``list`` of ``bytes`` (the snapshot - names only). + Transform the list of fully qualified snapshot names to a list of + snapshot short names that are relative to the given filesystem. - :param bytes data: The output to parse. + :param bytes data: A ``list`` of ``bytes`` corresponding to the names + of the snapshots. :param Filesystem filesystem: The filesystem from which to extract snapshots. If the output includes snapshots for other filesystems (eg siblings or children) they are excluded from the result. :return list: A ``list`` of ``bytes`` corresponding to the - names of the snapshots in the output. The order of the list is the + short names of the snapshots. The order of the list is the same as the order of the snapshots in the data being parsed. """ result = [] - for line in data.splitlines(): - dataset, snapshot = line.split(b'@', 1) + for snap in data: + dataset, snapshot = snap.split(b'@', 1) if dataset == filesystem.name: result.append(snapshot) return result @@ -490,7 +473,7 @@ def _list_snapshots(reactor, filesystem): :return: A ``Deferred`` which fires with a ``list`` of ``Snapshot`` instances giving the requested snapshot information. """ - d = zfs_command(reactor, _list_snapshots_command(filesystem)) + d = _async_lzc(reactor).callDeferred(_do_list_snapshots, filesystem) d.addCallback(_parse_snapshots, filesystem) return d From 8d8a6d8486049bb63f8919758b736a110fb248aa Mon Sep 17 00:00:00 2001 From: Andriy Gapon Date: Wed, 8 Jul 2015 17:34:57 +0300 Subject: [PATCH 10/23] zfs: make snapshot sorting more reliable by using createtxg property In theory two snapshots can the same 'creation' time because its resolution is on second. In fact, I've seen a test failing because of that. createtxg is always unique, because ZFS does not allow to created more than one snapshot for the same filesystem within the same transaction group. --- flocker/volume/filesystems/zfs.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/flocker/volume/filesystems/zfs.py b/flocker/volume/filesystems/zfs.py index 475a49ede5..5fb95d3697 100644 --- a/flocker/volume/filesystems/zfs.py +++ b/flocker/volume/filesystems/zfs.py @@ -431,7 +431,7 @@ def _do_list_snapshots(filesystem): """ snaps = [] for snap in libzfs_core.lzc_list_snaps(filesystem.name): - creation = libzfs_core.lzc_get_props(snap)[b"creation"] + creation = libzfs_core.lzc_get_props(snap)[b"createtxg"] snaps.append((snap, creation)) return [x[0] for x in sorted(snaps, key=lambda x: x[1])] From 1349bfdf5d0bdbd514c89651e29807d97af726bd Mon Sep 17 00:00:00 2001 From: Andriy Gapon Date: Thu, 9 Jul 2015 15:51:15 +0300 Subject: [PATCH 11/23] zfs: convert filesystem listing to libzfs_core Also, internal _list_filesystems() method is changed to return an iterator rather than a deferred that produces an iterator. It's not clear why we need the iterator at all because the single user of this method consumes the whole iterator and builds a set. 
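For illustration, the single caller now does roughly the following (a
simplified sketch of the diff below, not the literal code):

    # _list_filesystems() is now an ordinary iterator; the caller defers
    # it to the reactor's thread pool and collects the yielded
    # _DatasetInfo entries into a set in a callback.
    listing = self._async_lzc.callDeferred(_list_filesystems, self._name)
    listing.addCallback(lambda filesystems: set(filesystems))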
--- flocker/volume/filesystems/zfs.py | 46 +++++++++---------------------- 1 file changed, 13 insertions(+), 33 deletions(-) diff --git a/flocker/volume/filesystems/zfs.py b/flocker/volume/filesystems/zfs.py index 5fb95d3697..1457abfe6d 100644 --- a/flocker/volume/filesystems/zfs.py +++ b/flocker/volume/filesystems/zfs.py @@ -675,7 +675,7 @@ def get(self, volume): self._name, dataset, mount_path, volume.size) def enumerate(self): - listing = _list_filesystems(self._reactor, self._name) + listing = self._async_lzc.callDeferred(_list_filesystems, self._name) def listed(filesystems): result = set() @@ -701,40 +701,20 @@ class _DatasetInfo(object): """ -def _list_filesystems(reactor, pool): +def _list_filesystems(pool): """Get a listing of all filesystems on a given pool. :param pool: A `flocker.volume.filesystems.interface.IStoragePool` provider. - :return: A ``Deferred`` that fires with an iterator, the elements - of which are ``tuples`` containing the name and mountpoint of each - filesystem. + :return: An iterator, the elements of which are ``tuples`` containing + the name and mountpoint of each filesystem. """ - listing = zfs_command( - reactor, - [b"list", - # Descend the hierarchy to a depth of one (ie, list the direct - # children of the pool) - b"-d", b"1", - # Omit the output header - b"-H", - # Output exact, machine-parseable values (eg 65536 instead of 64K) - b"-p", - # Output each dataset's name, mountpoint and refquota - b"-o", b"name,mountpoint,refquota", - # Look at this pool - pool]) - - def listed(output, pool): - for line in output.splitlines(): - name, mountpoint, refquota = line.split(b'\t') - name = name[len(pool) + 1:] - if name: - refquota = int(refquota.decode("ascii")) - if refquota == 0: - refquota = None - yield _DatasetInfo( - dataset=name, mountpoint=mountpoint, refquota=refquota) - - listing.addCallback(listed, pool) - return listing + for child in libzfs_core.lzc_list_children(pool): + props = libzfs_core.lzc_get_props(child) + name = child[len(pool) + 1:] + refquota = props[b"refquota"] + mountpoint = props[b"mountpoint"] + if refquota == 0: + refquota = None + yield _DatasetInfo( + dataset=name, mountpoint=mountpoint, refquota=refquota) From 35ce7c247a07ab69615e094faf8cd3a617b70e15 Mon Sep 17 00:00:00 2001 From: Andriy Gapon Date: Wed, 8 Jul 2015 17:37:13 +0300 Subject: [PATCH 12/23] zfs: convert filesystem (with snapshots) destruction to libzfs_core Note: that we need to explicitly unmount the filesystem before destroying it. --- flocker/volume/filesystems/zfs.py | 18 +++++++++--------- 1 file changed, 9 insertions(+), 9 deletions(-) diff --git a/flocker/volume/filesystems/zfs.py b/flocker/volume/filesystems/zfs.py index 1457abfe6d..052aec67e7 100644 --- a/flocker/volume/filesystems/zfs.py +++ b/flocker/volume/filesystems/zfs.py @@ -26,9 +26,7 @@ from twisted.internet.endpoints import ProcessEndpoint, connectProtocol from twisted.internet.interfaces import IReactorThreads from twisted.internet.protocol import Protocol -from twisted.internet.defer import ( - Deferred, succeed, gatherResults, maybeDeferred -) +from twisted.internet.defer import Deferred, succeed, maybeDeferred from twisted.internet.threads import deferToThreadPool from twisted.internet.error import ConnectionDone, ProcessTerminated from twisted.application.service import Service @@ -578,13 +576,15 @@ def destroy(self, volume): # It would be better to have snapshot destruction logic as part of # IFilesystemSnapshots, but that isn't really necessary yet. 
def got_snapshots(snapshots): - return gatherResults(list(zfs_command( - self._reactor, - [b"destroy", b"%s@%s" % (filesystem.name, snapshot.name)]) - for snapshot in snapshots)) + return self._async_lzc.lzc_destroy_snaps([ + b"%s@%s" % (filesystem.name, snapshot.name) + for snapshot in snapshots + ], defer=False) d.addCallback(got_snapshots) - d.addCallback(lambda _: zfs_command( - self._reactor, [b"destroy", filesystem.name])) + d.addCallback( + lambda _: zfs_command(self._reactor, [b"umount", filesystem.name])) + d.addCallback( + lambda _: self._async_lzc.lzc_destroy(filesystem.name)) return d def set_maximum_size(self, volume): From f4eb274e326f116b127846cf7f8b0b71145b4af9 Mon Sep 17 00:00:00 2001 From: Andriy Gapon Date: Thu, 9 Jul 2015 17:35:43 +0300 Subject: [PATCH 13/23] zfs: convert the initial setup of the pool root filesystem to libzfs_core _sync_command_error_squashed() became obsolete and was removed. Its unit tests are removed as well. --- flocker/volume/filesystems/zfs.py | 50 +++++++----------- flocker/volume/test/test_filesystems_zfs.py | 56 +-------------------- 2 files changed, 20 insertions(+), 86 deletions(-) diff --git a/flocker/volume/filesystems/zfs.py b/flocker/volume/filesystems/zfs.py index 052aec67e7..fc52e02fdc 100644 --- a/flocker/volume/filesystems/zfs.py +++ b/flocker/volume/filesystems/zfs.py @@ -12,7 +12,7 @@ from functools import wraps from contextlib import contextmanager from uuid import uuid4 -from subprocess import STDOUT, PIPE, Popen, check_call +from subprocess import call, check_call from Queue import Queue from characteristic import attributes, with_cmp, with_repr @@ -162,33 +162,6 @@ def zfs_command(reactor, arguments): u"The zfs command signaled an error.") -def _sync_command_error_squashed(arguments, logger): - """ - Synchronously run a command-line tool with the given arguments. - - :param arguments: A ``list`` of ``bytes``, command-line arguments to - execute. - - :param eliot.Logger logger: The log writer to use to log errors running the - zfs command. - """ - message = None - log_arguments = b" ".join(arguments) - try: - process = Popen(arguments, stdout=PIPE, stderr=STDOUT) - output = process.stdout.read() - status = process.wait() - except Exception as e: - message = ZFS_ERROR( - zfs_command=log_arguments, output=str(e), status=1) - else: - if status: - message = ZFS_ERROR( - zfs_command=log_arguments, output=output, status=status) - if message is not None: - message.write(logger) - - @attributes(["name"]) class Snapshot(object): """ @@ -532,18 +505,31 @@ def startService(self): # for StoragePool being an IService implementation). # https://clusterhq.atlassian.net/browse/FLOC-635 + # First, actually unmount the dataset. + # See the explanation below where 'canmount' is set to 'off'. + # At the moment all errors are ignored. + call([b"umount", self._name]) + # Set the root dataset to be read only; IService.startService # doesn't support Deferred results, and in any case startup can be # synchronous with no ill effects. - _sync_command_error_squashed( - [b"zfs", b"set", b"readonly=on", self._name], self.logger) + try: + libzfs_core.lzc_set_prop(self._name, b"readonly", 1) + except libzfs_core.exceptions.ZFSError as e: + message = ZFS_ERROR(zfs_command="set readonly=on " + self._name, + output=str(e), status=e.errno) + message.write(self.logger) # If the root dataset is read-only then it's not possible to create # mountpoints in it for its child datasets. Avoid mounting it to avoid # this problem. 
This should be fine since we don't ever intend to put # any actual data into the root dataset. - _sync_command_error_squashed( - [b"zfs", b"set", b"canmount=off", self._name], self.logger) + try: + libzfs_core.lzc_set_prop(self._name, b"canmount", 0) + except libzfs_core.exceptions.ZFSError as e: + message = ZFS_ERROR(zfs_command="set canmount=off" + self._name, + output=str(e), status=e.errno) + message.write(self.logger) def _check_for_out_of_space(self, reason): """ diff --git a/flocker/volume/test/test_filesystems_zfs.py b/flocker/volume/test/test_filesystems_zfs.py index 4c90fceadd..eee8a378f4 100644 --- a/flocker/volume/test/test_filesystems_zfs.py +++ b/flocker/volume/test/test_filesystems_zfs.py @@ -13,10 +13,7 @@ from twisted.python.failure import Failure from twisted.python.filepath import FilePath -from eliot import Logger -from eliot.testing import ( - LoggedMessage, validateLogging, assertHasMessage, - ) +from eliot.testing import LoggedMessage, assertHasMessage from ...testtools import ( FakeProcessReactor, assert_equal_comparison, assert_not_equal_comparison @@ -25,8 +22,7 @@ from ..filesystems.zfs import ( _DatasetInfo, zfs_command, CommandFailed, BadArguments, Filesystem, - _sync_command_error_squashed, _latest_common_snapshot, ZFS_ERROR, - Snapshot, + _latest_common_snapshot, ZFS_ERROR, Snapshot, ) @@ -176,54 +172,6 @@ def no_such_executable_logged(case, logger): case.assertEqual(len(LoggedMessage.ofType(logger.messages, ZFS_ERROR)), 1) -def error_status_logged(case, logger): - """ - Validate the error logging behavior of ``_sync_command_error_squashed``. - """ - assertHasMessage(case, logger, ZFS_ERROR, { - 'status': 1, - 'zfs_command': 'python -c raise SystemExit(1)', - 'output': ''}) - case.assertEqual(len(LoggedMessage.ofType(logger.messages, ZFS_ERROR)), 1) - - -class SyncCommandTests(SynchronousTestCase): - """ - Tests for ``_sync_command_error_squashed``. - """ - @validateLogging(no_such_executable_logged) - def test_no_such_executable(self, logger): - """ - If the executable specified to ``_sync_command_error_squashed`` cannot - be found then the function nevertheless returns ``None``. - """ - result = _sync_command_error_squashed( - [b"nonsense garbage made up no such command"], - logger) - self.assertIs(None, result) - - @validateLogging(error_status_logged) - def test_error_exit(self, logger): - """ - If the child process run by ``_sync_command_error_squashed`` exits with - an an error status then the function nevertheless returns ``None``. - """ - result = _sync_command_error_squashed( - [b"python", b"-c", b"raise SystemExit(1)"], - logger) - self.assertIs(None, result) - - def test_success(self): - """ - ``_sync_command_error_squashed`` runs the given command and returns - ``None``. - """ - result = _sync_command_error_squashed( - [b"python", b"-c", b""], - Logger()) - self.assertIs(None, result) - - class LatestCommonSnapshotTests(SynchronousTestCase): """ Tests for ``_latest_common_snapshot``. From 186eff406b9c2cd27260ceba602d826fc42ef6d8 Mon Sep 17 00:00:00 2001 From: Andriy Gapon Date: Mon, 13 Jul 2015 17:49:58 +0300 Subject: [PATCH 14/23] zfs: fix a problem with unmounting of some filesystems 'zfs umount' refuses to work on a ZFS filesystem if its mountpoint property is set to legacy or none. On the other hand, 'umount' can handle all property settings. So, prefer using the latter. 
The ZFS backend itself never sets mountpoint to those values, but there is at least one funcitonal test case that sets up a requried scenario by directly fiddling with the property. To implement the change zfs_command() function is split into ext_command() that can handle any external command and zfs_command() convenience facade. --- flocker/volume/filesystems/zfs.py | 30 +++++++++++++++++++++++------- 1 file changed, 23 insertions(+), 7 deletions(-) diff --git a/flocker/volume/filesystems/zfs.py b/flocker/volume/filesystems/zfs.py index fc52e02fdc..2a3ef75ee5 100644 --- a/flocker/volume/filesystems/zfs.py +++ b/flocker/volume/filesystems/zfs.py @@ -129,26 +129,42 @@ def _async_lzc(reactor): return _reactor_to_alzc[reactor] -def zfs_command(reactor, arguments): +def ext_command(reactor, arguments): """ - Asynchronously run the ``zfs`` command-line tool with the given arguments. + Asynchronously run the given command-line tool with the given arguments. :param reactor: A ``IReactorProcess`` provider. - :param arguments: A ``list`` of ``bytes``, command-line arguments to - ``zfs``. + :param arguments: A ``list`` of ``bytes``, the command and command-line + arguments. :return: A :class:`Deferred` firing with the bytes of the result (on exit code 0), or errbacking with :class:`CommandFailed` or :class:`BadArguments` depending on the exit code (1 or 2). """ - endpoint = ProcessEndpoint(reactor, b"zfs", [b"zfs"] + arguments, + endpoint = ProcessEndpoint(reactor, arguments[0], arguments, os.environ) d = connectProtocol(endpoint, _AccumulatingProtocol()) d.addCallback(lambda protocol: protocol._result) return d +def zfs_command(reactor, arguments): + """ + Asynchronously run the ``zfs`` command-line tool with the given arguments. + + :param reactor: A ``IReactorProcess`` provider. + + :param arguments: A ``list`` of ``bytes``, command-line arguments to + ``zfs``. + + :return: A :class:`Deferred` firing with the bytes of the result (on + exit code 0), or errbacking with :class:`CommandFailed` or + :class:`BadArguments` depending on the exit code (1 or 2). 
+ """ + return ext_command(reactor, [b"zfs"] + arguments) + + _ZFS_COMMAND = Field.forTypes( "zfs_command", [bytes], u"The command which was run.") _OUTPUT = Field.forTypes( @@ -568,7 +584,7 @@ def got_snapshots(snapshots): ], defer=False) d.addCallback(got_snapshots) d.addCallback( - lambda _: zfs_command(self._reactor, [b"umount", filesystem.name])) + lambda _: ext_command(self._reactor, [b"umount", filesystem.name])) d.addCallback( lambda _: self._async_lzc.lzc_destroy(filesystem.name)) return d @@ -601,7 +617,7 @@ def clone_to(self, parent, volume): def change_owner(self, volume, new_volume): old_filesystem = self.get(volume) new_filesystem = self.get(new_volume) - d = zfs_command(self._reactor, + d = ext_command(self._reactor, [b"umount", old_filesystem.name]) d.addCallback(lambda _: self._async_lzc.lzc_rename( old_filesystem.name, new_filesystem.name)) From 8308e8a1e4d839de9e0f93ccf1a9de28930b18b8 Mon Sep 17 00:00:00 2001 From: Andriy Gapon Date: Fri, 7 Aug 2015 18:05:20 +0300 Subject: [PATCH 15/23] zfs: add basic docstrings for _AsyncLZC and the related code --- flocker/volume/filesystems/zfs.py | 44 +++++++++++++++++++++++++++++++ 1 file changed, 44 insertions(+) diff --git a/flocker/volume/filesystems/zfs.py b/flocker/volume/filesystems/zfs.py index 2a3ef75ee5..0c5a8aed3e 100644 --- a/flocker/volume/filesystems/zfs.py +++ b/flocker/volume/filesystems/zfs.py @@ -72,15 +72,40 @@ def connectionLost(self, reason): class _AsyncLZC(object): + """ + A proxy class for the asynchronous execution using a given reactor and its + thread pool. + + Primarily this class dispatches its method calls to the functions in + :mod:`libzfs_core`. But it can also be used for the asynchronous execution + of an arbitrary function. + """ + def __init__(self, reactor): + """ + :param reactor: the reactor that is to be used for the asynchronous + execution. + """ self._reactor = reactor self._cache = {} def callDeferred(self, func, *args, **kwargs): + """ + This is a thin wrapper around :func:`deferToThreadPool`. + + Its primary advantage is that the reactor is already associated with + an instance of :class:`_AsyncLZC` and :meth:`getThreadPool` is called + to get the reactor's thread pool. + """ return deferToThreadPool(self._reactor, self._reactor.getThreadPool(), func, *args, **kwargs) def __getattr__(self, name): + """ + Pretend that this class provides the same methods as the functions + in :mod:`libzfs_core`. The proxy methods execute the functions + in the asynchronous mode using the reactor and its thread pool. + """ try: return self._cache[name] except KeyError: @@ -95,6 +120,16 @@ def _async_wrapper(*args, **kwargs): class _FakeAsyncLZC(object): + """ + A proxy class that emulates the asynchronous execution. + + This class simulates behavior of :class:`_AsyncLZC`, but all the calls + are actually synchronous and returned :class:`Deferred` objects already + have results. + + This is useful for testing when a reactor used does not have a thread pool. + """ + def __init__(self): self._cache = {} @@ -119,6 +154,15 @@ def _async_wrapper(*args, **kwargs): def _async_lzc(reactor): + """ + Return an instance of either :class:`_AsyncLZC` or :class:`_FakeAsyncLZC` + for the given reactor depending on its capabilities. + + :param reactor: the reactor. + + The instance gets associated with the reactor and the same instance will + be returned for subsequent calls with the same ``reactor`` argument. 
+ """ try: return _reactor_to_alzc[reactor] except KeyError: From 3d8f9cb348b7927638f7f83243f178ab940fc4f9 Mon Sep 17 00:00:00 2001 From: Andriy Gapon Date: Fri, 7 Aug 2015 20:10:58 +0300 Subject: [PATCH 16/23] zfs: catch up with lzc_create signature change --- flocker/volume/filesystems/zfs.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/flocker/volume/filesystems/zfs.py b/flocker/volume/filesystems/zfs.py index 0c5a8aed3e..a18276ea71 100644 --- a/flocker/volume/filesystems/zfs.py +++ b/flocker/volume/filesystems/zfs.py @@ -608,7 +608,7 @@ def create(self, volume): properties[b"readonly"] = 0 if volume.size.maximum_size is not None: properties[b"refquota"] = volume.size.maximum_size - d = self._async_lzc.lzc_create(filesystem.name, False, properties) + d = self._async_lzc.lzc_create(filesystem.name, props=properties) d.addErrback(self._check_for_out_of_space) d.addCallback( lambda _: zfs_command(self._reactor, [b"mount", filesystem.name])) From dcc914fd2515491e5b38d598e0cda04d1a68dfc0 Mon Sep 17 00:00:00 2001 From: Andriy Gapon Date: Mon, 10 Aug 2015 13:20:35 +0300 Subject: [PATCH 17/23] log lzc_send and lzc_receive exceptions --- flocker/volume/filesystems/zfs.py | 11 ++++++++++- 1 file changed, 10 insertions(+), 1 deletion(-) diff --git a/flocker/volume/filesystems/zfs.py b/flocker/volume/filesystems/zfs.py index a18276ea71..0537f7e0d1 100644 --- a/flocker/volume/filesystems/zfs.py +++ b/flocker/volume/filesystems/zfs.py @@ -265,6 +265,8 @@ class Filesystem(object): filesystem. This will likely grow into a more sophisticiated implementation over time. """ + logger = Logger() + def __init__(self, pool, dataset, mountpoint=None, size=None, reactor=None): """ @@ -359,6 +361,10 @@ def reader(self, remote_snapshots=None): def send_and_close(): try: libzfs_core.lzc_send(snapshot, latest_common_name, wfd) + except Exception as e: + message = ZFS_ERROR(zfs_command="lzc_send " + snapshot, + output=str(e), status=e.errno) + message.write(self.logger) finally: os.close(wfd) queue.put(None) @@ -404,8 +410,11 @@ def recv_and_close(): try: libzfs_core.lzc_receive(self.name, rfd, force) success = True - except Exception: + except Exception as e: success = False + message = ZFS_ERROR(zfs_command="lzc_receive " + self.name, + output=str(e), status=e.errno) + message.write(self.logger) finally: os.close(rfd) queue.put(success) From 4d2bbdb604c88bc07daa4935aefff64cc8dd86e0 Mon Sep 17 00:00:00 2001 From: Andriy Gapon Date: Mon, 10 Aug 2015 13:29:36 +0300 Subject: [PATCH 18/23] zfs backend: remove _FakeAsyncLZC that fell out of use --- flocker/volume/filesystems/zfs.py | 42 +++---------------------------- 1 file changed, 3 insertions(+), 39 deletions(-) diff --git a/flocker/volume/filesystems/zfs.py b/flocker/volume/filesystems/zfs.py index 0537f7e0d1..e119438dbd 100644 --- a/flocker/volume/filesystems/zfs.py +++ b/flocker/volume/filesystems/zfs.py @@ -24,9 +24,8 @@ from twisted.python.failure import Failure from twisted.python.filepath import FilePath from twisted.internet.endpoints import ProcessEndpoint, connectProtocol -from twisted.internet.interfaces import IReactorThreads from twisted.internet.protocol import Protocol -from twisted.internet.defer import Deferred, succeed, maybeDeferred +from twisted.internet.defer import Deferred, succeed from twisted.internet.threads import deferToThreadPool from twisted.internet.error import ConnectionDone, ProcessTerminated from twisted.application.service import Service @@ -119,44 +118,12 @@ def _async_wrapper(*args, **kwargs): return 
self._cache[name] -class _FakeAsyncLZC(object): - """ - A proxy class that emulates the asynchronous execution. - - This class simulates behavior of :class:`_AsyncLZC`, but all the calls - are actually synchronous and returned :class:`Deferred` objects already - have results. - - This is useful for testing when a reactor used does not have a thread pool. - """ - - def __init__(self): - self._cache = {} - - def callDeferred(self, func, *args, **kwargs): - return maybeDeferred(func, *args, **kwargs) - - def __getattr__(self, name): - try: - return self._cache[name] - except KeyError: - func = getattr(libzfs_core, name) - - @wraps(func) - def _async_wrapper(*args, **kwargs): - return maybeDeferred(func, *args, **kwargs) - - self._cache[name] = _async_wrapper - return self._cache[name] - - _reactor_to_alzc = {} def _async_lzc(reactor): """ - Return an instance of either :class:`_AsyncLZC` or :class:`_FakeAsyncLZC` - for the given reactor depending on its capabilities. + Return an instance of :class:`_AsyncLZC` for the given reactor. :param reactor: the reactor. @@ -166,10 +133,7 @@ def _async_lzc(reactor): try: return _reactor_to_alzc[reactor] except KeyError: - if IReactorThreads.providedBy(reactor): - _reactor_to_alzc[reactor] = _AsyncLZC(reactor) - else: - _reactor_to_alzc[reactor] = _FakeAsyncLZC() + _reactor_to_alzc[reactor] = _AsyncLZC(reactor) return _reactor_to_alzc[reactor] From e387cf504b68bad090f5c4e01f4d3068916f675b Mon Sep 17 00:00:00 2001 From: Andriy Gapon Date: Fri, 14 Aug 2015 13:21:00 +0300 Subject: [PATCH 19/23] lzc_send/lzc_receive: log traceback in addition to exception --- flocker/volume/filesystems/zfs.py | 4 +++- 1 file changed, 3 insertions(+), 1 deletion(-) diff --git a/flocker/volume/filesystems/zfs.py b/flocker/volume/filesystems/zfs.py index e119438dbd..d5c9d7fff8 100644 --- a/flocker/volume/filesystems/zfs.py +++ b/flocker/volume/filesystems/zfs.py @@ -19,7 +19,7 @@ from zope.interface import implementer -from eliot import Field, MessageType, Logger +from eliot import Field, MessageType, Logger, write_traceback from twisted.python.failure import Failure from twisted.python.filepath import FilePath @@ -329,6 +329,7 @@ def send_and_close(): message = ZFS_ERROR(zfs_command="lzc_send " + snapshot, output=str(e), status=e.errno) message.write(self.logger) + write_traceback(self.logger) finally: os.close(wfd) queue.put(None) @@ -379,6 +380,7 @@ def recv_and_close(): message = ZFS_ERROR(zfs_command="lzc_receive " + self.name, output=str(e), status=e.errno) message.write(self.logger) + write_traceback(self.logger) finally: os.close(rfd) queue.put(success) From 47dfab0fa7b28a0a8033145d0351aef19c7284ff Mon Sep 17 00:00:00 2001 From: Andriy Gapon Date: Tue, 29 Sep 2015 16:43:45 +0300 Subject: [PATCH 20/23] add pyzfs to requirements --- requirements.txt | 1 + 1 file changed, 1 insertion(+) diff --git a/requirements.txt b/requirements.txt index e0d1275aff..cf165fdeb1 100644 --- a/requirements.txt +++ b/requirements.txt @@ -37,6 +37,7 @@ python-keystoneclient==1.4.0 python-keystoneclient-rackspace==0.1.3 python-novaclient==2.24.1 pytz==2015.4 +pyzfs==0.2 PyYAML==3.10 requests==2.4.3 service-identity==14.0.0 From 7cfdc3a9bc0f58fbc38e9d55db19f1716df7c129 Mon Sep 17 00:00:00 2001 From: Andriy Gapon Date: Thu, 1 Oct 2015 11:22:16 +0300 Subject: [PATCH 21/23] update pyzfs requirements --- requirements.txt | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/requirements.txt b/requirements.txt index db72bec2c2..baf404dedc 100644 --- a/requirements.txt +++ 
b/requirements.txt @@ -40,7 +40,7 @@ python-keystoneclient==1.4.0 python-keystoneclient-rackspace==0.1.3 python-novaclient==2.24.1 pytz==2015.4 -pyzfs==0.2 +pyzfs==0.2.1 PyYAML==3.10 repoze.lru==0.6 requests==2.7.0 From 05fb5d504b907897b8945ea5c6d73ad32605327c Mon Sep 17 00:00:00 2001 From: Andriy Gapon Date: Thu, 8 Oct 2015 16:01:59 +0300 Subject: [PATCH 22/23] flocker-volume: ensure that the reactor is running deferLater() unlike maybeDeferred ensures that the reactor is actually running. This is required by the new ZFS backend, because it uses the reactor's thread pool to run lzc_send() and lzc_receive() in separate threads. That has to be done because those calls lend their context to the kernel for doing the actual send / receive work. The main thread pumps the stream data between the userspace and the kernel. --- flocker/volume/script.py | 5 +++-- 1 file changed, 3 insertions(+), 2 deletions(-) diff --git a/flocker/volume/script.py b/flocker/volume/script.py index c667c9cf73..7d79c0865e 100644 --- a/flocker/volume/script.py +++ b/flocker/volume/script.py @@ -6,7 +6,8 @@ from twisted.python.usage import Options from twisted.python.filepath import FilePath -from twisted.internet.defer import succeed, maybeDeferred +from twisted.internet import task +from twisted.internet.defer import succeed from zope.interface import implementer @@ -240,7 +241,7 @@ def main(self, reactor, options, service): documentation. """ if options.subCommand is not None: - return maybeDeferred(options.subOptions.run, service) + return task.deferLater(reactor, 0, options.subOptions.run, service) else: return succeed(None) From 8ba38077c4583c0b2f9a5bc50f4b1b8c2606e776 Mon Sep 17 00:00:00 2001 From: Andriy Gapon Date: Fri, 30 Oct 2015 13:42:14 +0200 Subject: [PATCH 23/23] use lzc_receive_with_header() instead of lzc_receive The latter can not automatically derive a snapshot name from the given stream. See ZFS-30 for more details. --- flocker/volume/filesystems/zfs.py | 8 +++++++- 1 file changed, 7 insertions(+), 1 deletion(-) diff --git a/flocker/volume/filesystems/zfs.py b/flocker/volume/filesystems/zfs.py index d5c9d7fff8..d204c3f26f 100644 --- a/flocker/volume/filesystems/zfs.py +++ b/flocker/volume/filesystems/zfs.py @@ -373,7 +373,13 @@ def writer(self): def recv_and_close(): try: - libzfs_core.lzc_receive(self.name, rfd, force) + (header, c_header) = libzfs_core.receive_header(rfd) + # drr_toname is a full snapshot name, but we need only the part + # after '@' that we use to construct a local snapshot name. + snapname = header['drr_toname'].split('@', 1)[1] + snapname = self.name + '@' + snapname + libzfs_core.lzc_receive_with_header(snapname, rfd, c_header, + force) success = True except Exception as e: success = False