diff --git a/tests/test_metadata_serialization.py b/tests/test_metadata_serialization.py index 9f228d4c8f..c5eafbd296 100644 --- a/tests/test_metadata_serialization.py +++ b/tests/test_metadata_serialization.py @@ -242,10 +242,9 @@ def test_snapshot_serialization(self, test_case_data: str): "no path attribute": '{"keyids": ["keyid"], "name": "a", "terminating": false, \ "path_hash_prefixes": ["h1", "h2"], "threshold": 99}', - "no hash or path prefix": - '{"keyids": ["keyid"], "name": "a", "terminating": true, "threshold": 3}', "unrecognized field": - '{"keyids": ["keyid"], "name": "a", "terminating": true, "threshold": 3, "foo": "bar"}', + '{"keyids": ["keyid"], "name": "a", "paths": ["fn1", "fn2"], \ + "terminating": true, "threshold": 3, "foo": "bar"}', } @run_sub_tests_with_dataset(valid_delegated_roles) @@ -255,12 +254,27 @@ def test_delegated_role_serialization(self, test_case_data: str): self.assertDictEqual(case_dict, deserialized_role.to_dict()) + invalid_delegated_roles: DataSet = { + "missing hash prefixes and paths": + '{"name": "a", "keyids": ["keyid"], "threshold": 1, "terminating": false}', + "both hash prefixes and paths": + '{"name": "a", "keyids": ["keyid"], "threshold": 1, "terminating": false, \ + "paths": ["fn1", "fn2"], "path_hash_prefixes": ["h1", "h2"]}', + } + + @run_sub_tests_with_dataset(invalid_delegated_roles) + def test_invalid_delegated_role_serialization(self, test_case_data: str): + case_dict = json.loads(test_case_data) + with self.assertRaises(ValueError): + DelegatedRole.from_dict(copy.copy(case_dict)) + + valid_delegations: DataSet = { "all": '{"keys": {"keyid" : {"keytype": "rsa", "scheme": "rsassa-pss-sha256", "keyval": {"public": "foo"}}}, \ - "roles": [ {"keyids": ["keyid"], "name": "a", "terminating": true, "threshold": 3} ]}', + "roles": [ {"keyids": ["keyid"], "name": "a", "paths": ["fn1", "fn2"], "terminating": true, "threshold": 3} ]}', "unrecognized field": '{"keys": {"keyid" : {"keytype": "rsa", "scheme": "rsassa-pss-sha256", "keyval": {"public": "foo"}}}, \ - "roles": [ {"keyids": ["keyid"], "name": "a", "terminating": true, "threshold": 3} ], \ + "roles": [ {"keyids": ["keyid"], "name": "a", "paths": ["fn1", "fn2"], "terminating": true, "threshold": 3} ], \ "foo": "bar"}', } @@ -305,13 +319,13 @@ def test_targetfile_serialization(self, test_case_data: str): "targets": { "file.txt": {"length": 12, "hashes": {"sha256" : "abc"} } }, \ "delegations": {"keys": {"keyid" : {"keytype": "rsa", \ "scheme": "rsassa-pss-sha256", "keyval": {"public": "foo"} }}, \ - "roles": [ {"keyids": ["keyid"], "name": "a", "terminating": true, "threshold": 3} ]} \ + "roles": [ {"keyids": ["keyid"], "name": "a", "paths": ["fn1", "fn2"], "terminating": true, "threshold": 3} ]} \ }', "empty targets": '{"_type": "targets", "spec_version": "1.0.0", "version": 1, "expires": "2030-01-01T00:00:00Z", \ "targets": {}, \ "delegations": {"keys": {"keyid" : {"keytype": "rsa", \ "scheme": "rsassa-pss-sha256", "keyval": {"public": "foo"} }}, \ - "roles": [ {"keyids": ["keyid"], "name": "a", "terminating": true, "threshold": 3} ]} \ + "roles": [ {"keyids": ["keyid"], "name": "a", "paths": ["fn1", "fn2"], "terminating": true, "threshold": 3} ]} \ }', "no delegations": '{"_type": "targets", "spec_version": "1.0.0", "version": 1, "expires": "2030-01-01T00:00:00Z", \ "targets": { "file.txt": {"length": 12, "hashes": {"sha256" : "abc"} } } \ diff --git a/tuf/api/metadata.py b/tuf/api/metadata.py index a7d967d8d9..edf1376ec2 100644 --- a/tuf/api/metadata.py +++ b/tuf/api/metadata.py @@ -16,8 +16,10 @@ """ import abc +import fnmatch import io import logging +import os import tempfile from collections import OrderedDict from datetime import datetime, timedelta @@ -960,12 +962,12 @@ def update(self, rolename: str, role_info: MetaFile) -> None: class DelegatedRole(Role): """A container with information about a delegated role. - A delegation can happen in three ways: - - paths is None and path_hash_prefixes is None: delegates all targets + A delegation can happen in two ways: - paths is set: delegates targets matching any path pattern in paths - path_hash_prefixes is set: delegates targets whose target path hash starts with any of the prefixes in path_hash_prefixes - paths and path_hash_prefixes are mutually exclusive: both cannot be set. + paths and path_hash_prefixes are mutually exclusive: both cannot be set, + at least one of them must be set. Attributes: name: A string giving the name of the delegated role. @@ -990,10 +992,11 @@ def __init__( self.name = name self.terminating = terminating if paths is not None and path_hash_prefixes is not None: - raise ValueError( - "Only one of the attributes 'paths' and" - "'path_hash_prefixes' can be set!" - ) + raise ValueError("Either paths or path_hash_prefixes can be set") + + if paths is None and path_hash_prefixes is None: + raise ValueError("One of paths or path_hash_prefixes must be set") + self.paths = paths self.path_hash_prefixes = path_hash_prefixes @@ -1031,6 +1034,35 @@ def to_dict(self) -> Dict[str, Any]: res_dict["path_hash_prefixes"] = self.path_hash_prefixes return res_dict + def is_delegated_path(self, target_filepath: str) -> bool: + """Determines whether the given 'target_filepath' is in one of + the paths that DelegatedRole is trusted to provide""" + + if self.path_hash_prefixes is not None: + # Calculate the hash of the filepath + # to determine in which bin to find the target. + digest_object = sslib_hash.digest(algorithm="sha256") + digest_object.update(target_filepath.encode("utf-8")) + target_filepath_hash = digest_object.hexdigest() + + for path_hash_prefix in self.path_hash_prefixes: + if target_filepath_hash.startswith(path_hash_prefix): + return True + + elif self.paths is not None: + for pathpattern in self.paths: + # A delegated role path may be an explicit path or glob + # pattern (Unix shell-style wildcards). Explicit filepaths + # are also considered matches. Make sure to strip any leading + # path separators so that a match is made. + # Example: "foo.tgz" should match with "/*.tgz". + if fnmatch.fnmatch( + target_filepath.lstrip(os.sep), pathpattern.lstrip(os.sep) + ): + return True + + return False + class Delegations: """A container object storing information about all delegations. diff --git a/tuf/ngclient/updater.py b/tuf/ngclient/updater.py index 850f46b9cf..ebf5e6264f 100644 --- a/tuf/ngclient/updater.py +++ b/tuf/ngclient/updater.py @@ -58,16 +58,15 @@ updater.download_target(targetinfo, "~/tufclient/downloads/") """ -import fnmatch import logging import os -from typing import Any, Dict, List, Optional +from typing import Any, Dict, List, Optional, Set, Tuple from urllib import parse -from securesystemslib import hash as sslib_hash from securesystemslib import util as sslib_util from tuf import exceptions +from tuf.api.metadata import Targets from tuf.ngclient._internal import requests_fetcher, trusted_metadata_set from tuf.ngclient.config import UpdaterConfig from tuf.ngclient.fetcher import FetcherInterface @@ -143,7 +142,9 @@ def refresh(self) -> None: self._load_snapshot() self._load_targets("targets", "root") - def get_one_valid_targetinfo(self, target_path: str) -> Dict: + def get_one_valid_targetinfo( + self, target_path: str + ) -> Optional[Dict[str, Any]]: """Returns target information for 'target_path'. The return value can be used as an argument to @@ -169,6 +170,9 @@ def get_one_valid_targetinfo(self, target_path: str) -> Dict: OSError: New metadata could not be written to disk RepositoryError: Metadata failed to verify in some way TODO: download-related errors + + Returns: + A targetinfo dictionary or None """ return self._preorder_depth_first_walk(target_path) @@ -368,26 +372,27 @@ def _load_targets(self, role: str, parent_role: str) -> None: self._trusted_set.update_delegated_targets(data, role, parent_role) self._persist_metadata(role, data) - def _preorder_depth_first_walk(self, target_filepath) -> Dict: + def _preorder_depth_first_walk( + self, target_filepath: str + ) -> Optional[Dict[str, Any]]: """ Interrogates the tree of target delegations in order of appearance (which implicitly order trustworthiness), and returns the matching target found in the most trusted role. """ - target = None - role_names = [("targets", "root")] - visited_role_names = set() + # List of delegations to be interrogated. A (role, parent role) pair + # is needed to load and verify the delegated targets metadata. + delegations_to_visit = [("targets", "root")] + visited_role_names: Set[Tuple[str, str]] = set() number_of_delegations = self.config.max_delegations # Preorder depth-first traversal of the graph of target delegations. - while ( - target is None and number_of_delegations > 0 and len(role_names) > 0 - ): + while number_of_delegations > 0 and len(delegations_to_visit) > 0: # Pop the role name from the top of the stack. - role_name, parent_role = role_names.pop(-1) - self._load_targets(role_name, parent_role) + role_name, parent_role = delegations_to_visit.pop(-1) + # Skip any visited current role to prevent cycles. if (role_name, parent_role) in visited_role_names: logger.debug("Skipping visited current role %s", role_name) @@ -395,183 +400,52 @@ def _preorder_depth_first_walk(self, target_filepath) -> Dict: # The metadata for 'role_name' must be downloaded/updated before # its targets, delegations, and child roles can be inspected. + self._load_targets(role_name, parent_role) - role_metadata = self._trusted_set[role_name].signed + role_metadata: Targets = self._trusted_set[role_name].signed target = role_metadata.targets.get(target_filepath) + if target is not None: + logger.debug("Found target in current role %s", role_name) + return {"filepath": target_filepath, "fileinfo": target} + # After preorder check, add current role to set of visited roles. visited_role_names.add((role_name, parent_role)) # And also decrement number of visited roles. number_of_delegations -= 1 - child_roles = [] - if role_metadata.delegations is not None: - child_roles = role_metadata.delegations.roles - - if target is None: + if role_metadata.delegations is not None: child_roles_to_visit = [] # NOTE: This may be a slow operation if there are many # delegated roles. - for child_role in child_roles: - child_role_name = _visit_child_role( - child_role, target_filepath - ) - - if child_role.terminating and child_role_name is not None: - logger.debug( - "Adding child role %s.\n" - "Not backtracking to other roles.", - child_role_name, - ) - role_names = [] - child_roles_to_visit.append( - (child_role_name, role_name) - ) - break - - if child_role_name is None: - logger.debug("Skipping child role %s", child_role_name) + for child_role in role_metadata.delegations.roles: + if child_role.is_delegated_path(target_filepath): + logger.debug("Adding child role %s", child_role.name) - else: - logger.debug("Adding child role %s", child_role_name) child_roles_to_visit.append( - (child_role_name, role_name) + (child_role.name, role_name) ) - + if child_role.terminating: + logger.debug("Not backtracking to other roles.") + delegations_to_visit = [] + break # Push 'child_roles_to_visit' in reverse order of appearance - # onto 'role_names'. Roles are popped from the end of - # the 'role_names' list. + # onto 'delegations_to_visit'. Roles are popped from the end of + # the list. child_roles_to_visit.reverse() - role_names.extend(child_roles_to_visit) - - else: - logger.debug("Found target in current role %s", role_name) + delegations_to_visit.extend(child_roles_to_visit) - if ( - target is None - and number_of_delegations == 0 - and len(role_names) > 0 - ): + if number_of_delegations == 0 and len(delegations_to_visit) > 0: logger.debug( "%d roles left to visit, but allowed to " "visit at most %d delegations.", - len(role_names), + len(delegations_to_visit), self.config.max_delegations, ) - return {"filepath": target_filepath, "fileinfo": target} - - -def _visit_child_role(child_role: Dict, target_filepath: str) -> str: - """ - - Non-public method that determines whether the given 'target_filepath' - is an allowed path of 'child_role'. - - Ensure that we explore only delegated roles trusted with the target. The - metadata for 'child_role' should have been refreshed prior to this point, - however, the paths/targets that 'child_role' signs for have not been - verified (as intended). The paths/targets that 'child_role' is allowed - to specify in its metadata depends on the delegating role, and thus is - left to the caller to verify. We verify here that 'target_filepath' - is an allowed path according to the delegated 'child_role'. - - TODO: Should the TUF spec restrict the repository to one particular - algorithm? Should we allow the repository to specify in the role - dictionary the algorithm used for these generated hashed paths? - - - child_role: - The delegation targets role object of 'child_role', containing its - paths, path_hash_prefixes, keys, and so on. - - target_filepath: - The path to the target file on the repository. This will be relative to - the 'targets' (or equivalent) directory on a given mirror. - - - None. - - - None. - - - If 'child_role' has been delegated the target with the name - 'target_filepath', then we return the role name of 'child_role'. - - Otherwise, we return None. - """ - - child_role_name = child_role.name - child_role_paths = child_role.paths - child_role_path_hash_prefixes = child_role.path_hash_prefixes - - if child_role_path_hash_prefixes is not None: - target_filepath_hash = _get_filepath_hash(target_filepath) - for child_role_path_hash_prefix in child_role_path_hash_prefixes: - if not target_filepath_hash.startswith(child_role_path_hash_prefix): - continue - - return child_role_name - - elif child_role_paths is not None: - # Is 'child_role_name' allowed to sign for 'target_filepath'? - for child_role_path in child_role_paths: - # A child role path may be an explicit path or glob pattern (Unix - # shell-style wildcards). The child role 'child_role_name' is - # returned if 'target_filepath' is equal to or matches - # 'child_role_path'. Explicit filepaths are also considered - # matches. A repo maintainer might delegate a glob pattern with a - # leading path separator, while the client requests a matching - # target without a leading path separator - make sure to strip any - # leading path separators so that a match is made. - # Example: "foo.tgz" should match with "/*.tgz". - if fnmatch.fnmatch( - target_filepath.lstrip(os.sep), child_role_path.lstrip(os.sep) - ): - logger.debug( - "Child role %s is allowed to sign for %s", - repr(child_role_name), - repr(target_filepath), - ) - - return child_role_name - - logger.debug( - "The given target path %s " - "does not match the trusted path or glob pattern: %s", - repr(target_filepath), - repr(child_role_path), - ) - continue - - else: - # 'role_name' should have been validated when it was downloaded. - # The 'paths' or 'path_hash_prefixes' fields should not be missing, - # so we raise a format error here in case they are both missing. - raise exceptions.FormatError( - repr(child_role_name) + " " - 'has neither a "paths" nor "path_hash_prefixes". At least' - " one of these attributes must be present." - ) - - return None - - -def _get_filepath_hash(target_filepath, hash_function="sha256"): - """ - Calculate the hash of the filepath to determine which bin to find the - target. - """ - # The client currently assumes the repository (i.e., repository - # tool) uses 'hash_function' to generate hashes and UTF-8. - digest_object = sslib_hash.digest(hash_function) - encoded_target_filepath = target_filepath.encode("utf-8") - digest_object.update(encoded_target_filepath) - target_filepath_hash = digest_object.hexdigest() - - return target_filepath_hash + # If this point is reached then target is not found, return None + return None def _ensure_trailing_slash(url: str):