diff --git a/principalmapper/graphing/datapipeline_edges.py b/principalmapper/graphing/datapipeline_edges.py new file mode 100644 index 0000000..c9ce735 --- /dev/null +++ b/principalmapper/graphing/datapipeline_edges.py @@ -0,0 +1,85 @@ +"""Code to identify if a principal in an AWS account can use access to AWS Data Pipeline to access other principals.""" + + +# Copyright (c) NCC Group and Erik Steringer 2019. This file is part of Principal Mapper. +# +# Principal Mapper is free software: you can redistribute it and/or modify +# it under the terms of the GNU Affero General Public License as published by +# the Free Software Foundation, either version 3 of the License, or +# (at your option) any later version. +# +# Principal Mapper is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +# GNU Affero General Public License for more details. +# +# You should have received a copy of the GNU Affero General Public License +# along with Principal Mapper. If not, see . + +import logging +from typing import Dict, List, Optional + +from botocore.exceptions import ClientError + +from principalmapper.common import Edge, Node +from principalmapper.graphing.edge_checker import EdgeChecker +from principalmapper.querying import query_interface +from principalmapper.querying.local_policy_simulation import resource_policy_authorization, ResourcePolicyEvalResult +from principalmapper.util import arns, botocore_tools + +logger = logging.getLogger(__name__) + + +class DataPipelineEdgeChecker(EdgeChecker): + """Class for identifying if Data Pipeline can be used by IAM principals to gain access to other IAM principals.""" + + def return_edges(self, nodes: List[Node], region_allow_list: Optional[List[str]] = None, + region_deny_list: Optional[List[str]] = None, scps: Optional[List[List[dict]]] = None, + client_args_map: Optional[dict] = None, partition: str = 'aws') -> List[Edge]: + """Fulfills expected method return_edges.""" + + logger.info('Generating Edges based on Data Pipeline.') + + result = generate_edges_locally(nodes, scps) + + for edge in result: + logger.info("Found new edge: {}".format(edge.describe_edge())) + + return result + + +def generate_edges_locally(nodes: List[Node], scps: Optional[List[List[dict]]] = None) -> List[Edge]: + """For Data Pipeline, we do something a little different. The way people can use DataPipeline to pivot is + to create a pipeline, then put a definition on the pipeline that creates an EC2 instance resource. The + role that's used by the EC2 instance is the ultimate target. This requires: + + * datapipeline:CreatePipeline (resource "*") + * datapipeline:PutPipelineDefinition (resource "*") + * iam:PassRole for the Data Pipeline Role (which must trust datapipeline.amazonaws.com) + * (TODO: Verify) iam:PassRole for the EC2 Data Pipeline Role (which must trust ec2.amazonaws.com and have an instance profile) + + Note that we have two roles involved. Data Pipeline Role, which seems to be a sorta service role but + doesn't have the same path/naming convention as other service roles, is used to actually call EC2 and + spin up the target instance. It's meant to be accessible to datapipeline.amazonaws.com. Then, we have + the EC2 Data Pipeline Role, which actually is accessible to the EC2 instance doing the computational + work of the pipeline. + + Other works seemed to indicate the Data Pipeline Role was accessible, however that might not be true + anymore? In any case, recent experimentation only allowed me access to the EC2 Data Pipeline Role. + + To create the list of edges, we gather our: + + * Potential Data Pipeline Roles + * Potential EC2 Data Pipeline Roles + + Then we determine which of the EC2 roles are accessible to the Data Pipeline Roles, then run through all + potential source nodes to see if they have the correct datapipeline:* + iam:PassRole permissions, then generate + edges that have the EC2 roles as destinations. + + This vector is neat because even if specific EC2-accessible roles are blocked via ec2:RunInstances, this might be + an alternative option the same as autoscaling was. + """ + + results = [] + + return results diff --git a/principalmapper/querying/query_interface.py b/principalmapper/querying/query_interface.py index fd5366e..376be89 100644 --- a/principalmapper/querying/query_interface.py +++ b/principalmapper/querying/query_interface.py @@ -288,7 +288,7 @@ def local_check_authorization_full(principal: Node, action_to_check: str, resour prepped_condition_keys = _prepare_condition_context(conditions_keys_copy) prepped_condition_keys.update(_infer_condition_keys(principal, prepped_condition_keys)) - is_not_service_linked_role = not _check_if_service_linked_role(principal) + is_not_service_linked_role = not query_utils.check_if_service_linked_role(principal) logger.debug( 'Testing authorization for: principal: {}, action: {}, resource: {}, conditions: {}, Resource Policy: {}, SCPs: {}, Session Policy: {}'.format( @@ -402,17 +402,6 @@ def local_check_authorization_full(principal: Node, action_to_check: str, resour return False -def _check_if_service_linked_role(principal: Node) -> bool: - """Given a Node, determine if it should be treated as a service-linked role. This affects SCP policy decisions as - described in - https://docs.aws.amazon.com/organizations/latest/userguide/orgs_manage_policies_scps.html#not-restricted-by-scp""" - - if ':role/' in principal.arn: - role_name = principal.arn.split('/')[-1] - return role_name.startswith('AWSServiceRoleFor') - return False - - def simulation_api_check_authorization(iamclient, principal: Node, action_to_check: str, resource_to_check: str, condition_keys_to_check: dict) -> bool: """DO NOT USE THIS FUNCTION, IT WILL ONLY THROW A NotImplementedError.""" diff --git a/principalmapper/querying/query_utils.py b/principalmapper/querying/query_utils.py index 36b336c..5ecc6bf 100644 --- a/principalmapper/querying/query_utils.py +++ b/principalmapper/querying/query_utils.py @@ -36,10 +36,11 @@ def get_search_list(graph: Graph, node: Node) -> List[List[Edge]]: result = [] explored_nodes = [] - # Special-case: node is an "admin", so we make up admin edges and return them all + # Special-case: node is an "admin", so we make up admin edges and return them all. BUT, if the destination + # node is the original node or a service-linked role, then we skip those if node.is_admin: for other_node in graph.nodes: - if node == other_node: + if node == other_node or check_if_service_linked_role(other_node): continue result.append([Edge(node, other_node, 'can access through administrative actions', 'Admin')]) return result @@ -225,9 +226,8 @@ def get_interaccount_search_list(all_graphs: List[Graph], inter_account_edges: L def get_edges_interaccount(source_graph: Graph, inter_account_edges: List[Edge], node: Node, ignored_nodes: List[Node]) -> List[Edge]: """Given a Node, the Graph it belongs to, a list of inter-account Edges, and a list of Nodes to skip, this returns - any Edges where the Node is the source element as long as the destination element isn't included in the skipped Nodes. - - If the given node is an admin, those Edge objects get generated and returned. + any Edges where the Node is the source element as long as the destination element isn't included in the skipped + Nodes. """ result = [] @@ -241,3 +241,14 @@ def get_edges_interaccount(source_graph: Graph, inter_account_edges: List[Edge], result.append(inter_account_edge) return result + + +def check_if_service_linked_role(principal: Node) -> bool: + """Given a Node, determine if it should be treated as a service-linked role. This affects SCP policy decisions as + described in + https://docs.aws.amazon.com/organizations/latest/userguide/orgs_manage_policies_scps.html#not-restricted-by-scp""" + + if ':role/' in principal.arn: + role_name = principal.arn.split('/')[-1] + return role_name.startswith('AWSServiceRoleFor') + return False