Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Offline mode #2472

Closed
wants to merge 9 commits into from
Closed
39 changes: 26 additions & 13 deletions tests/test_trusted_metadata_set.py
Original file line number Diff line number Diff line change
Expand Up @@ -192,22 +192,35 @@ def test_out_of_order_ops(self) -> None:
self.metadata["role1"], "role1", Targets.type
)

def test_initial_root_with_invalid_json(self) -> None:
# root is not json
with self.assertRaises(exceptions.RepositoryError):
TrustedMetadataSet(b"")

# root is invalid
root = Metadata.from_bytes(self.metadata[Root.type])
root.signed.version += 1
with self.assertRaises(exceptions.UnsignedMetadataError):
TrustedMetadataSet(root.to_bytes())

# metadata is of wrong type
with self.assertRaises(exceptions.RepositoryError):
TrustedMetadataSet(self.metadata[Snapshot.type])

def test_root_with_invalid_json(self) -> None:
# Test loading initial root and root update
for test_func in [TrustedMetadataSet, self.trusted_set.update_root]:
# root is not json
with self.assertRaises(exceptions.RepositoryError):
test_func(b"")
# root is not json
with self.assertRaises(exceptions.RepositoryError):
self.trusted_set.update_root(b"")

# root is invalid
root = Metadata.from_bytes(self.metadata[Root.type])
root.signed.version += 1
with self.assertRaises(exceptions.UnsignedMetadataError):
test_func(root.to_bytes())
# root is invalid
root = Metadata.from_bytes(self.metadata[Root.type])
root.signed.version += 1
with self.assertRaises(exceptions.UnsignedMetadataError):
self.trusted_set.update_root(root.to_bytes())

# metadata is of wrong type
with self.assertRaises(exceptions.RepositoryError):
test_func(self.metadata[Snapshot.type])
# metadata is of wrong type
with self.assertRaises(exceptions.RepositoryError):
self.trusted_set.update_root(self.metadata[Snapshot.type])

def test_top_level_md_with_invalid_json(self) -> None:
top_level_md: List[Tuple[bytes, Callable[[bytes], Metadata]]] = [
Expand Down
207 changes: 207 additions & 0 deletions tests/test_updater_offline.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,207 @@
#!/usr/bin/env python

# Copyright 2021, New York University and the TUF contributors
# SPDX-License-Identifier: MIT OR Apache-2.0


"""Test ngclient Updater offline mode"""

import datetime
import os
import sys
import tempfile
import unittest
from typing import Optional
from unittest.mock import Mock, patch

from tests import utils
from tests.repository_simulator import RepositorySimulator
from tuf.api.exceptions import DownloadError, ExpiredMetadataError
from tuf.api.metadata import SPECIFICATION_VERSION, DelegatedRole, Targets
from tuf.ngclient import Updater, UpdaterConfig


class TestOffline(unittest.TestCase):
    """Test ngclient Updater behavior in offline mode (UpdaterConfig(offline=True)).

    Each test first primes the local metadata/target cache with a normal
    "online" refresh, then verifies that an offline Updater never attempts
    a network fetch (checked via the simulator's fetch_tracker).
    """

    # set dump_dir to trigger repository state dumps
    dump_dir: Optional[str] = None

    def setUp(self) -> None:
        # pylint: disable=consider-using-with
        self.temp_dir = tempfile.TemporaryDirectory()
        self.metadata_dir = os.path.join(self.temp_dir.name, "metadata")
        self.targets_dir = os.path.join(self.temp_dir.name, "targets")
        os.mkdir(self.metadata_dir)
        os.mkdir(self.targets_dir)

        self.sim = RepositorySimulator()

        # Add a delegated role and two targets to repository
        self.sim.targets.version += 1
        spec_version = ".".join(SPECIFICATION_VERSION)
        targets = Targets(1, spec_version, self.sim.safe_expiry, {}, None)
        role = DelegatedRole("delegated", [], 1, False, ["delegated/*"], None)
        self.sim.add_delegation("targets", role, targets)
        self.sim.add_target("targets", b"hello world", "file")
        self.sim.add_target("delegated", b"content", "delegated/file2")
        self.sim.update_snapshot()

        # bootstrap client with initial root metadata
        with open(os.path.join(self.metadata_dir, "root.json"), "bw") as f:
            f.write(self.sim.signed_roots[0])

        if self.dump_dir is not None:
            # create test specific dump directory
            name = self.id().split(".")[-1]
            self.sim.dump_dir = os.path.join(self.dump_dir, name)
            os.mkdir(self.sim.dump_dir)

    def tearDown(self) -> None:
        self.temp_dir.cleanup()

    def _run_refresh(self) -> Updater:
        """Create a new online (default config) Updater instance and refresh"""
        if self.dump_dir is not None:
            self.sim.write()

        updater = Updater(
            self.metadata_dir,
            "https://example.com/metadata/",
            self.targets_dir,
            "https://example.com/targets/",
            self.sim,
        )
        updater.refresh()
        return updater

    def _run_offline_refresh(self) -> Updater:
        """Create a new Updater instance in offline mode and refresh"""
        if self.dump_dir is not None:
            self.sim.write()

        updater = Updater(
            self.metadata_dir,
            "https://example.com/metadata/",
            self.targets_dir,
            "https://example.com/targets/",
            self.sim,
            UpdaterConfig(offline=True),
        )
        updater.refresh()
        return updater

    @patch.object(datetime, "datetime", wraps=datetime.datetime)
    def test_refresh(self, mock_time: Mock) -> None:
        """Test metadata refresh() in offline mode"""
        # Run a "online" updater refresh to get toplevel metadata in local cache
        self._run_refresh()

        self.sim.fetch_tracker.metadata.clear()

        # Refresh works in Offline mode (at this point metadata is not expired)
        self._run_offline_refresh()
        # Expect no download attempts
        self.assertListEqual(self.sim.fetch_tracker.metadata, [])

        # Move current time a year into the future: all metadata is now expired
        mock_time.utcnow.return_value = (
            datetime.datetime.utcnow() + datetime.timedelta(weeks=52)
        )

        # Refresh in default online mode fails when metadata has expired
        with self.assertRaises(ExpiredMetadataError):
            self._run_refresh()

        self.sim.fetch_tracker.metadata.clear()

        # Refresh in offline mode succeeds when local metadata has expired
        self._run_offline_refresh()
        # Expect no download attempts
        self.assertListEqual(self.sim.fetch_tracker.metadata, [])

    def test_refresh_with_missing_top_level_metadata(self) -> None:
        """Test metadata refresh in offline mode when cache does not contain all top level metadata"""
        # Run a "online" updater refresh to get toplevel metadata in local cache
        self._run_refresh()

        self.sim.fetch_tracker.metadata.clear()

        for role in ["targets", "snapshot", "timestamp"]:
            fname = os.path.join(self.metadata_dir, f"{role}.json")
            os.remove(fname)

        # Refresh in offline mode fails since top level metadata is not in cache
        with self.assertRaises(DownloadError):
            self._run_offline_refresh()
        # Expect no download attempts
        self.assertListEqual(self.sim.fetch_tracker.metadata, [])

    def test_download(self) -> None:
        """Test download in offline mode"""

        # Run a "online" updater refresh to get toplevel metadata in local cache
        self._run_refresh()

        self.sim.fetch_tracker.metadata.clear()
        self.sim.fetch_tracker.targets.clear()

        # Downloading a target file while in offline mode fails
        updater = self._run_offline_refresh()
        info = updater.get_targetinfo("file")
        assert info
        with self.assertRaises(DownloadError):
            updater.download_target(info)

        # Expect no download attempts
        self.assertListEqual(self.sim.fetch_tracker.metadata, [])
        self.assertListEqual(self.sim.fetch_tracker.targets, [])

    def test_find_cached_target(self) -> None:
        """Test find_cached_target() in offline mode"""

        # Run a "online" refresh to get metadata in local cache
        self._run_refresh()

        # offline find_cached_target() returns None because target is not cached
        updater = self._run_offline_refresh()
        info = updater.get_targetinfo("file")
        assert info
        self.assertIsNone(updater.find_cached_target(info))

        # Run a "online" download to get target in local cache
        updater = self._run_refresh()
        info = updater.get_targetinfo("file")
        assert info
        updater.download_target(info)

        self.sim.fetch_tracker.metadata.clear()
        self.sim.fetch_tracker.targets.clear()

        # offline find_cached_target() succeeds now
        updater = self._run_offline_refresh()
        info = updater.get_targetinfo("file")
        assert info
        self.assertIsNotNone(updater.find_cached_target(info))
        # Expect no download attempts
        self.assertListEqual(self.sim.fetch_tracker.metadata, [])
        self.assertListEqual(self.sim.fetch_tracker.targets, [])

    def test_get_targetinfo_with_delegated_metadata(self) -> None:
        """Test get_targetinfo() in offline mode when delegated metadata is not cached"""
        # Run a "online" refresh to get toplevel metadata in local cache
        self._run_refresh()

        # offline get_targetinfo() fails because delegated metadata is not cached
        updater = self._run_offline_refresh()
        with self.assertRaises(DownloadError):
            updater.get_targetinfo("delegated/file2")


if __name__ == "__main__":
    # "--dump" enables repository state dumps for debugging; the flag is
    # removed from argv so that unittest does not try to parse it.
    if "--dump" in sys.argv:
        TestOffline.dump_dir = tempfile.mkdtemp()
        print(f"Repository Simulator dumps in {TestOffline.dump_dir}")
        sys.argv.remove("--dump")

    utils.configure_test_logging(sys.argv)
    unittest.main()
50 changes: 27 additions & 23 deletions tuf/ngclient/_internal/trusted_metadata_set.py
Original file line number Diff line number Diff line change
Expand Up @@ -78,20 +78,24 @@ class TrustedMetadataSet(abc.Mapping):
to update the metadata with the caller making decisions on what is updated.
"""

def __init__(self, root_data: bytes):
def __init__(self, root_data: bytes, respect_expiry: bool = True):
"""Initialize ``TrustedMetadataSet`` by loading trusted root metadata.

Args:
root_data: Trusted root metadata as bytes. Note that this metadata
will only be verified by itself: it is the source of trust for
all metadata in the ``TrustedMetadataSet``
respect_expiry: If set to False, expired Metadata is considered valid
(all other security checks are still done). This mode should NOT be
used when loading new metadata from remote repository.

Raises:
RepositoryError: Metadata failed to load or verify. The actual
error type and content will contain more details.
"""
self._trusted_set: Dict[str, Metadata] = {}
self.reference_time = datetime.datetime.utcnow()
self._respect_expiry = respect_expiry

# Load and validate the local root metadata. Valid initial trusted root
# metadata is required
Expand Down Expand Up @@ -207,8 +211,8 @@ def update_timestamp(self, data: bytes) -> Metadata[Timestamp]:
raise RuntimeError("Cannot update timestamp after snapshot")

# client workflow 5.3.10: Make sure final root is not expired.
if self.root.signed.is_expired(self.reference_time):
raise exceptions.ExpiredMetadataError("Final root.json is expired")
self._handle_expiry("root", self.root)

# No need to check for 5.3.11 (fast forward attack recovery):
# timestamp/snapshot can not yet be loaded at this point

Expand Down Expand Up @@ -245,25 +249,19 @@ def update_timestamp(self, data: bytes) -> Metadata[Timestamp]:
f", got version {new_snapshot_meta.version}"
)

# expiry not checked to allow old timestamp to be used for rollback
# protection of new timestamp: expiry is checked in update_snapshot()

# Timestamp is set as trusted and then the expiry is checked. The order is this
# so local expired timestamp can be loaded and used to verify new non-expired one
self._trusted_set[Timestamp.type] = new_timestamp
logger.debug("Updated timestamp v%d", new_timestamp.signed.version)

# timestamp is loaded: raise if it is not valid _final_ timestamp
self._check_final_timestamp()
self._handle_expiry("timestamp", self.timestamp)

return new_timestamp

def _check_final_timestamp(self) -> None:
"""Raise if timestamp is expired."""

if self.timestamp.signed.is_expired(self.reference_time):
raise exceptions.ExpiredMetadataError("timestamp.json is expired")

def update_snapshot(
self, data: bytes, trusted: Optional[bool] = False
self,
data: bytes,
trusted: Optional[bool] = False,
) -> Metadata[Snapshot]:
"""Verify and load ``data`` as new snapshot metadata.

Expand Down Expand Up @@ -300,7 +298,7 @@ def update_snapshot(
logger.debug("Updating snapshot")

# Snapshot cannot be loaded if final timestamp is expired
self._check_final_timestamp()
self._handle_expiry("timestamp", self.timestamp)

snapshot_meta = self.timestamp.signed.snapshot_meta

Expand Down Expand Up @@ -341,9 +339,8 @@ def update_snapshot(
f"{new_fileinfo.version}, got {fileinfo.version}."
)

# expiry not checked to allow old snapshot to be used for rollback
# protection of new snapshot: it is checked when targets is updated

# Snapshot is set as trusted and then the expiry is checked. The order is this
# so local expired snapshot can be loaded and used to verify new non-expired one
self._trusted_set[Snapshot.type] = new_snapshot
logger.debug("Updated snapshot v%d", new_snapshot.signed.version)

Expand All @@ -352,11 +349,19 @@ def update_snapshot(

return new_snapshot

def _handle_expiry(self, rolename: str, md: Metadata) -> None:
    """Handle an expired metadata role.

    Expiry is evaluated against ``self.reference_time``. When
    ``self._respect_expiry`` is True, expired metadata raises
    ``ExpiredMetadataError``; otherwise the expiry is only logged
    as a warning.
    """
    if not md.signed.is_expired(self.reference_time):
        return

    message = f"Metadata for {rolename} is expired"
    if not self._respect_expiry:
        logger.warning(message)
        return

    raise exceptions.ExpiredMetadataError(message)

def _check_final_snapshot(self) -> None:
"""Raise if snapshot is expired or meta version does not match."""

if self.snapshot.signed.is_expired(self.reference_time):
raise exceptions.ExpiredMetadataError("snapshot.json is expired")
self._handle_expiry("snapshot", self.snapshot)

snapshot_meta = self.timestamp.signed.snapshot_meta
if self.snapshot.signed.version != snapshot_meta.version:
raise exceptions.BadVersionNumberError(
Expand Down Expand Up @@ -436,8 +441,7 @@ def update_delegated_targets(
f"Expected {role_name} v{meta.version}, got v{version}."
)

if new_delegate.signed.is_expired(self.reference_time):
raise exceptions.ExpiredMetadataError(f"New {role_name} is expired")
self._handle_expiry(role_name, new_delegate)

self._trusted_set[role_name] = new_delegate
logger.debug("Updated %s v%d", role_name, version)
Expand Down
2 changes: 2 additions & 0 deletions tuf/ngclient/config.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@


@dataclass
# pylint: disable=too-many-instance-attributes
class UpdaterConfig:
"""Used to store ``Updater`` configuration.

Expand All @@ -33,3 +34,4 @@ class UpdaterConfig:
snapshot_max_length: int = 2000000 # bytes
targets_max_length: int = 5000000 # bytes
prefix_targets_with_hash: bool = True
offline: bool = False
jku marked this conversation as resolved.
Show resolved Hide resolved
Loading