Skip to content

Commit dd5deee

Browse files
author
Jussi Kukkonen
authored
Merge pull request #1672 from ivanayov/ivanayov/rolenames
Create constants for top-level rolenames
2 parents d991362 + 00589f0 commit dd5deee

9 files changed

+257
-246
lines changed

tests/repository_simulator.py

+13-13
Original file line numberDiff line numberDiff line change
@@ -139,7 +139,7 @@ def targets(self) -> Targets:
139139

140140
def all_targets(self) -> Iterator[Tuple[str, Targets]]:
141141
"""Yield role name and signed portion of targets one by one."""
142-
yield "targets", self.md_targets.signed
142+
yield Targets.type, self.md_targets.signed
143143
for role, md in self.md_delegates.items():
144144
yield role, md.signed
145145

@@ -181,7 +181,7 @@ def _initialize(self) -> None:
181181
def publish_root(self) -> None:
182182
"""Sign and store a new serialized version of root."""
183183
self.md_root.signatures.clear()
184-
for signer in self.signers["root"].values():
184+
for signer in self.signers[Root.type].values():
185185
self.md_root.sign(signer, append=True)
186186

187187
self.signed_roots.append(self.md_root.to_bytes(JSONSerializer()))
@@ -197,8 +197,8 @@ def fetch(self, url: str) -> Iterator[bytes]:
197197
ver_and_name = path[len("/metadata/") :][: -len(".json")]
198198
version_str, _, role = ver_and_name.partition(".")
199199
# root is always version-prefixed while timestamp is always NOT
200-
if role == "root" or (
201-
self.root.consistent_snapshot and ver_and_name != "timestamp"
200+
if role == Root.type or (
201+
self.root.consistent_snapshot and ver_and_name != Timestamp.type
202202
):
203203
version: Optional[int] = int(version_str)
204204
else:
@@ -248,7 +248,7 @@ def _fetch_metadata(
248248
249249
If version is None, non-versioned metadata is being requested.
250250
"""
251-
if role == "root":
251+
if role == Root.type:
252252
# return a version previously serialized in publish_root()
253253
if version is None or version > len(self.signed_roots):
254254
raise FetcherHTTPError(f"Unknown root version {version}", 404)
@@ -257,11 +257,11 @@ def _fetch_metadata(
257257

258258
# sign and serialize the requested metadata
259259
md: Optional[Metadata]
260-
if role == "timestamp":
260+
if role == Timestamp.type:
261261
md = self.md_timestamp
262-
elif role == "snapshot":
262+
elif role == Snapshot.type:
263263
md = self.md_snapshot
264-
elif role == "targets":
264+
elif role == Targets.type:
265265
md = self.md_targets
266266
else:
267267
md = self.md_delegates.get(role)
@@ -297,7 +297,7 @@ def update_timestamp(self) -> None:
297297
self.timestamp.snapshot_meta.version = self.snapshot.version
298298

299299
if self.compute_metafile_hashes_length:
300-
hashes, length = self._compute_hashes_and_length("snapshot")
300+
hashes, length = self._compute_hashes_and_length(Snapshot.type)
301301
self.timestamp.snapshot_meta.hashes = hashes
302302
self.timestamp.snapshot_meta.length = length
303303

@@ -320,7 +320,7 @@ def update_snapshot(self) -> None:
320320

321321
def add_target(self, role: str, data: bytes, path: str) -> None:
322322
"""Create a target from data and add it to the target_files."""
323-
if role == "targets":
323+
if role == Targets.type:
324324
targets = self.targets
325325
else:
326326
targets = self.md_delegates[role].signed
@@ -339,7 +339,7 @@ def add_delegation(
339339
hash_prefixes: Optional[List[str]],
340340
) -> None:
341341
"""Add delegated target role to the repository."""
342-
if delegator_name == "targets":
342+
if delegator_name == Targets.type:
343343
delegator = self.targets
344344
else:
345345
delegator = self.md_delegates[delegator_name].signed
@@ -375,9 +375,9 @@ def write(self) -> None:
375375

376376
for ver in range(1, len(self.signed_roots) + 1):
377377
with open(os.path.join(dest_dir, f"{ver}.root.json"), "wb") as f:
378-
f.write(self._fetch_metadata("root", ver))
378+
f.write(self._fetch_metadata(Root.type, ver))
379379

380-
for role in ["timestamp", "snapshot", "targets"]:
380+
for role in [Timestamp.type, Snapshot.type, Targets.type]:
381381
with open(os.path.join(dest_dir, f"{role}.json"), "wb") as f:
382382
f.write(self._fetch_metadata(role))
383383

tests/test_api.py

+44-44
Original file line numberDiff line numberDiff line change
@@ -78,7 +78,7 @@ def setUpClass(cls) -> None:
7878

7979
# Load keys into memory
8080
cls.keystore = {}
81-
for role in ["delegation", "snapshot", "targets", "timestamp"]:
81+
for role in ["delegation", Snapshot.type, Targets.type, Timestamp.type]:
8282
cls.keystore[role] = import_ed25519_privatekey_from_file(
8383
os.path.join(cls.keystore_dir, role + "_key"),
8484
password="password",
@@ -92,10 +92,10 @@ def tearDownClass(cls) -> None:
9292

9393
def test_generic_read(self) -> None:
9494
for metadata, inner_metadata_cls in [
95-
("root", Root),
96-
("snapshot", Snapshot),
97-
("timestamp", Timestamp),
98-
("targets", Targets),
95+
(Root.type, Root),
96+
(Snapshot.type, Snapshot),
97+
(Timestamp.type, Timestamp),
98+
(Targets.type, Targets),
9999
]:
100100

101101
# Load JSON-formatted metdata of each supported type from file
@@ -136,7 +136,7 @@ def test_compact_json(self) -> None:
136136
)
137137

138138
def test_read_write_read_compare(self) -> None:
139-
for metadata in ["root", "snapshot", "timestamp", "targets"]:
139+
for metadata in [Root.type, Snapshot.type, Timestamp.type, Targets.type]:
140140
path = os.path.join(self.repo_dir, "metadata", metadata + ".json")
141141
md_obj = Metadata.from_file(path)
142142

@@ -148,7 +148,7 @@ def test_read_write_read_compare(self) -> None:
148148
os.remove(path_2)
149149

150150
def test_to_from_bytes(self) -> None:
151-
for metadata in ["root", "snapshot", "timestamp", "targets"]:
151+
for metadata in [Root.type, Snapshot.type, Timestamp.type, Targets.type]:
152152
path = os.path.join(self.repo_dir, "metadata", metadata + ".json")
153153
with open(path, "rb") as f:
154154
metadata_bytes = f.read()
@@ -169,11 +169,11 @@ def test_sign_verify(self) -> None:
169169
root = Metadata[Root].from_file(root_path).signed
170170

171171
# Locate the public keys we need from root
172-
targets_keyid = next(iter(root.roles["targets"].keyids))
172+
targets_keyid = next(iter(root.roles[Targets.type].keyids))
173173
targets_key = root.keys[targets_keyid]
174-
snapshot_keyid = next(iter(root.roles["snapshot"].keyids))
174+
snapshot_keyid = next(iter(root.roles[Snapshot.type].keyids))
175175
snapshot_key = root.keys[snapshot_keyid]
176-
timestamp_keyid = next(iter(root.roles["timestamp"].keyids))
176+
timestamp_keyid = next(iter(root.roles[Timestamp.type].keyids))
177177
timestamp_key = root.keys[timestamp_keyid]
178178

179179
# Load sample metadata (targets) and assert ...
@@ -192,7 +192,7 @@ def test_sign_verify(self) -> None:
192192
with self.assertRaises(exceptions.UnsignedMetadataError):
193193
targets_key.verify_signature(md_obj, JSONSerializer()) # type: ignore[arg-type]
194194

195-
sslib_signer = SSlibSigner(self.keystore["snapshot"])
195+
sslib_signer = SSlibSigner(self.keystore[Snapshot.type])
196196
# Append a new signature with the unrelated key and assert that ...
197197
sig = md_obj.sign(sslib_signer, append=True)
198198
# ... there are now two signatures, and
@@ -203,7 +203,7 @@ def test_sign_verify(self) -> None:
203203
# ... the returned (appended) signature is for snapshot key
204204
self.assertEqual(sig.keyid, snapshot_keyid)
205205

206-
sslib_signer = SSlibSigner(self.keystore["timestamp"])
206+
sslib_signer = SSlibSigner(self.keystore[Timestamp.type])
207207
# Create and assign (don't append) a new signature and assert that ...
208208
md_obj.sign(sslib_signer, append=False)
209209
# ... there now is only one signature,
@@ -218,7 +218,7 @@ def test_verify_failures(self) -> None:
218218
root = Metadata[Root].from_file(root_path).signed
219219

220220
# Locate the timestamp public key we need from root
221-
timestamp_keyid = next(iter(root.roles["timestamp"].keyids))
221+
timestamp_keyid = next(iter(root.roles[Timestamp.type].keyids))
222222
timestamp_key = root.keys[timestamp_keyid]
223223

224224
# Load sample metadata (timestamp)
@@ -369,20 +369,20 @@ def test_metadata_verify_delegate(self) -> None:
369369
role2 = Metadata[Targets].from_file(role2_path)
370370

371371
# test the expected delegation tree
372-
root.verify_delegate("root", root)
373-
root.verify_delegate("snapshot", snapshot)
374-
root.verify_delegate("targets", targets)
372+
root.verify_delegate(Root.type, root)
373+
root.verify_delegate(Snapshot.type, snapshot)
374+
root.verify_delegate(Targets.type, targets)
375375
targets.verify_delegate("role1", role1)
376376
role1.verify_delegate("role2", role2)
377377

378378
# only root and targets can verify delegates
379379
with self.assertRaises(TypeError):
380-
snapshot.verify_delegate("snapshot", snapshot)
380+
snapshot.verify_delegate(Snapshot.type, snapshot)
381381
# verify fails for roles that are not delegated by delegator
382382
with self.assertRaises(ValueError):
383383
root.verify_delegate("role1", role1)
384384
with self.assertRaises(ValueError):
385-
targets.verify_delegate("targets", targets)
385+
targets.verify_delegate(Targets.type, targets)
386386
# verify fails when delegator has no delegations
387387
with self.assertRaises(ValueError):
388388
role2.verify_delegate("role1", role1)
@@ -391,31 +391,31 @@ def test_metadata_verify_delegate(self) -> None:
391391
expires = snapshot.signed.expires
392392
snapshot.signed.bump_expiration()
393393
with self.assertRaises(exceptions.UnsignedMetadataError):
394-
root.verify_delegate("snapshot", snapshot)
394+
root.verify_delegate(Snapshot.type, snapshot)
395395
snapshot.signed.expires = expires
396396

397397
# verify fails if roles keys do not sign the metadata
398398
with self.assertRaises(exceptions.UnsignedMetadataError):
399-
root.verify_delegate("timestamp", snapshot)
399+
root.verify_delegate(Timestamp.type, snapshot)
400400

401401
# Add a key to snapshot role, make sure the new sig fails to verify
402-
ts_keyid = next(iter(root.signed.roles["timestamp"].keyids))
403-
root.signed.add_key("snapshot", root.signed.keys[ts_keyid])
402+
ts_keyid = next(iter(root.signed.roles[Timestamp.type].keyids))
403+
root.signed.add_key(Snapshot.type, root.signed.keys[ts_keyid])
404404
snapshot.signatures[ts_keyid] = Signature(ts_keyid, "ff" * 64)
405405

406406
# verify succeeds if threshold is reached even if some signatures
407407
# fail to verify
408-
root.verify_delegate("snapshot", snapshot)
408+
root.verify_delegate(Snapshot.type, snapshot)
409409

410410
# verify fails if threshold of signatures is not reached
411-
root.signed.roles["snapshot"].threshold = 2
411+
root.signed.roles[Snapshot.type].threshold = 2
412412
with self.assertRaises(exceptions.UnsignedMetadataError):
413-
root.verify_delegate("snapshot", snapshot)
413+
root.verify_delegate(Snapshot.type, snapshot)
414414

415415
# verify succeeds when we correct the new signature and reach the
416416
# threshold of 2 keys
417-
snapshot.sign(SSlibSigner(self.keystore["timestamp"]), append=True)
418-
root.verify_delegate("snapshot", snapshot)
417+
snapshot.sign(SSlibSigner(self.keystore[Timestamp.type]), append=True)
418+
root.verify_delegate(Snapshot.type, snapshot)
419419

420420
def test_key_class(self) -> None:
421421
# Test if from_securesystemslib_key removes the private key from keyval
@@ -441,44 +441,44 @@ def test_root_add_key_and_remove_key(self) -> None:
441441
)
442442

443443
# Assert that root does not contain the new key
444-
self.assertNotIn(keyid, root.signed.roles["root"].keyids)
444+
self.assertNotIn(keyid, root.signed.roles[Root.type].keyids)
445445
self.assertNotIn(keyid, root.signed.keys)
446446

447447
# Add new root key
448-
root.signed.add_key("root", key_metadata)
448+
root.signed.add_key(Root.type, key_metadata)
449449

450450
# Assert that key is added
451-
self.assertIn(keyid, root.signed.roles["root"].keyids)
451+
self.assertIn(keyid, root.signed.roles[Root.type].keyids)
452452
self.assertIn(keyid, root.signed.keys)
453453

454454
# Confirm that the newly added key does not break
455455
# the object serialization
456456
root.to_dict()
457457

458458
# Try adding the same key again and assert its ignored.
459-
pre_add_keyid = root.signed.roles["root"].keyids.copy()
460-
root.signed.add_key("root", key_metadata)
461-
self.assertEqual(pre_add_keyid, root.signed.roles["root"].keyids)
459+
pre_add_keyid = root.signed.roles[Root.type].keyids.copy()
460+
root.signed.add_key(Root.type, key_metadata)
461+
self.assertEqual(pre_add_keyid, root.signed.roles[Root.type].keyids)
462462

463463
# Add the same key to targets role as well
464-
root.signed.add_key("targets", key_metadata)
464+
root.signed.add_key(Targets.type, key_metadata)
465465

466466
# Add the same key to a nonexistent role.
467467
with self.assertRaises(ValueError):
468468
root.signed.add_key("nosuchrole", key_metadata)
469469

470470
# Remove the key from root role (targets role still uses it)
471-
root.signed.remove_key("root", keyid)
472-
self.assertNotIn(keyid, root.signed.roles["root"].keyids)
471+
root.signed.remove_key(Root.type, keyid)
472+
self.assertNotIn(keyid, root.signed.roles[Root.type].keyids)
473473
self.assertIn(keyid, root.signed.keys)
474474

475475
# Remove the key from targets as well
476-
root.signed.remove_key("targets", keyid)
477-
self.assertNotIn(keyid, root.signed.roles["targets"].keyids)
476+
root.signed.remove_key(Targets.type, keyid)
477+
self.assertNotIn(keyid, root.signed.roles[Targets.type].keyids)
478478
self.assertNotIn(keyid, root.signed.keys)
479479

480480
with self.assertRaises(ValueError):
481-
root.signed.remove_key("root", "nosuchkey")
481+
root.signed.remove_key(Root.type, "nosuchkey")
482482
with self.assertRaises(ValueError):
483483
root.signed.remove_key("nosuchrole", keyid)
484484

@@ -670,7 +670,7 @@ def test_length_and_hash_validation(self) -> None:
670670
targets_path = os.path.join(self.repo_dir, "metadata", "targets.json")
671671
targets = Metadata[Targets].from_file(targets_path)
672672
file1_targetfile = targets.signed.targets["file1.txt"]
673-
filepath = os.path.join(self.repo_dir, "targets", "file1.txt")
673+
filepath = os.path.join(self.repo_dir, Targets.type, "file1.txt")
674674

675675
with open(filepath, "rb") as file1:
676676
file1_targetfile.verify_length_and_hashes(file1)
@@ -688,7 +688,7 @@ def test_length_and_hash_validation(self) -> None:
688688

689689
def test_targetfile_from_file(self) -> None:
690690
# Test with an existing file and valid hash algorithm
691-
file_path = os.path.join(self.repo_dir, "targets", "file1.txt")
691+
file_path = os.path.join(self.repo_dir, Targets.type, "file1.txt")
692692
targetfile_from_file = TargetFile.from_file(
693693
file_path, file_path, ["sha256"]
694694
)
@@ -697,20 +697,20 @@ def test_targetfile_from_file(self) -> None:
697697
targetfile_from_file.verify_length_and_hashes(file)
698698

699699
# Test with a non-existing file
700-
file_path = os.path.join(self.repo_dir, "targets", "file123.txt")
700+
file_path = os.path.join(self.repo_dir, Targets.type, "file123.txt")
701701
with self.assertRaises(FileNotFoundError):
702702
TargetFile.from_file(
703703
file_path, file_path, [sslib_hash.DEFAULT_HASH_ALGORITHM]
704704
)
705705

706706
# Test with an unsupported algorithm
707-
file_path = os.path.join(self.repo_dir, "targets", "file1.txt")
707+
file_path = os.path.join(self.repo_dir, Targets.type, "file1.txt")
708708
with self.assertRaises(exceptions.UnsupportedAlgorithmError):
709709
TargetFile.from_file(file_path, file_path, ["123"])
710710

711711
def test_targetfile_from_data(self) -> None:
712712
data = b"Inline test content"
713-
target_file_path = os.path.join(self.repo_dir, "targets", "file1.txt")
713+
target_file_path = os.path.join(self.repo_dir, Targets.type, "file1.txt")
714714

715715
# Test with a valid hash algorithm
716716
targetfile_from_data = TargetFile.from_data(

0 commit comments

Comments
 (0)