Skip to content

Commit

Permalink
implement fix for #108, initial work for Data Pipeline edges
Browse files Browse the repository at this point in the history
  • Loading branch information
Erik Steringer committed Jan 31, 2022
1 parent ef8e2ef commit dc7ab36
Showing 3 changed files with 102 additions and 17 deletions.
85 changes: 85 additions & 0 deletions principalmapper/graphing/datapipeline_edges.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,85 @@
"""Code to identify if a principal in an AWS account can use access to AWS Data Pipeline to access other principals."""


# Copyright (c) NCC Group and Erik Steringer 2019. This file is part of Principal Mapper.
#
# Principal Mapper is free software: you can redistribute it and/or modify
# it under the terms of the GNU Affero General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# Principal Mapper is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU Affero General Public License for more details.
#
# You should have received a copy of the GNU Affero General Public License
# along with Principal Mapper. If not, see <https://www.gnu.org/licenses/>.

import logging
from typing import Dict, List, Optional

from botocore.exceptions import ClientError

from principalmapper.common import Edge, Node
from principalmapper.graphing.edge_checker import EdgeChecker
from principalmapper.querying import query_interface
from principalmapper.querying.local_policy_simulation import resource_policy_authorization, ResourcePolicyEvalResult
from principalmapper.util import arns, botocore_tools

logger = logging.getLogger(__name__)


class DataPipelineEdgeChecker(EdgeChecker):
"""Class for identifying if Data Pipeline can be used by IAM principals to gain access to other IAM principals."""

def return_edges(self, nodes: List[Node], region_allow_list: Optional[List[str]] = None,
region_deny_list: Optional[List[str]] = None, scps: Optional[List[List[dict]]] = None,
client_args_map: Optional[dict] = None, partition: str = 'aws') -> List[Edge]:
"""Fulfills expected method return_edges."""

logger.info('Generating Edges based on Data Pipeline.')

result = generate_edges_locally(nodes, scps)

for edge in result:
logger.info("Found new edge: {}".format(edge.describe_edge()))

return result


def generate_edges_locally(nodes: List[Node], scps: Optional[List[List[dict]]] = None) -> List[Edge]:
"""For Data Pipeline, we do something a little different. The way people can use DataPipeline to pivot is
to create a pipeline, then put a definition on the pipeline that creates an EC2 instance resource. The
role that's used by the EC2 instance is the ultimate target. This requires:
* datapipeline:CreatePipeline (resource "*")
* datapipeline:PutPipelineDefinition (resource "*")
* iam:PassRole for the Data Pipeline Role (which must trust datapipeline.amazonaws.com)
* (TODO: Verify) iam:PassRole for the EC2 Data Pipeline Role (which must trust ec2.amazonaws.com and have an instance profile)
Note that we have two roles involved. Data Pipeline Role, which seems to be a sorta service role but
doesn't have the same path/naming convention as other service roles, is used to actually call EC2 and
spin up the target instance. It's meant to be accessible to datapipeline.amazonaws.com. Then, we have
the EC2 Data Pipeline Role, which actually is accessible to the EC2 instance doing the computational
work of the pipeline.
Other works seemed to indicate the Data Pipeline Role was accessible, however that might not be true
anymore? In any case, recent experimentation only allowed me access to the EC2 Data Pipeline Role.
To create the list of edges, we gather our:
* Potential Data Pipeline Roles
* Potential EC2 Data Pipeline Roles
Then we determine which of the EC2 roles are accessible to the Data Pipeline Roles, then run through all
potential source nodes to see if they have the correct datapipeline:* + iam:PassRole permissions, then generate
edges that have the EC2 roles as destinations.
This vector is neat because even if specific EC2-accessible roles are blocked via ec2:RunInstances, this might be
an alternative option the same as autoscaling was.
"""

results = []

return results
13 changes: 1 addition & 12 deletions principalmapper/querying/query_interface.py
Original file line number Diff line number Diff line change
@@ -288,7 +288,7 @@ def local_check_authorization_full(principal: Node, action_to_check: str, resour
prepped_condition_keys = _prepare_condition_context(conditions_keys_copy)
prepped_condition_keys.update(_infer_condition_keys(principal, prepped_condition_keys))

is_not_service_linked_role = not _check_if_service_linked_role(principal)
is_not_service_linked_role = not query_utils.check_if_service_linked_role(principal)

logger.debug(
'Testing authorization for: principal: {}, action: {}, resource: {}, conditions: {}, Resource Policy: {}, SCPs: {}, Session Policy: {}'.format(
@@ -402,17 +402,6 @@ def local_check_authorization_full(principal: Node, action_to_check: str, resour
return False


def _check_if_service_linked_role(principal: Node) -> bool:
"""Given a Node, determine if it should be treated as a service-linked role. This affects SCP policy decisions as
described in
https://docs.aws.amazon.com/organizations/latest/userguide/orgs_manage_policies_scps.html#not-restricted-by-scp"""

if ':role/' in principal.arn:
role_name = principal.arn.split('/')[-1]
return role_name.startswith('AWSServiceRoleFor')
return False


def simulation_api_check_authorization(iamclient, principal: Node, action_to_check: str, resource_to_check: str,
condition_keys_to_check: dict) -> bool:
"""DO NOT USE THIS FUNCTION, IT WILL ONLY THROW A NotImplementedError."""
21 changes: 16 additions & 5 deletions principalmapper/querying/query_utils.py
Original file line number Diff line number Diff line change
@@ -36,10 +36,11 @@ def get_search_list(graph: Graph, node: Node) -> List[List[Edge]]:
result = []
explored_nodes = []

# Special-case: node is an "admin", so we make up admin edges and return them all
# Special-case: node is an "admin", so we make up admin edges and return them all. BUT, if the destination
# node is the original node or a service-linked role, then we skip those
if node.is_admin:
for other_node in graph.nodes:
if node == other_node:
if node == other_node or check_if_service_linked_role(other_node):
continue
result.append([Edge(node, other_node, 'can access through administrative actions', 'Admin')])
return result
@@ -225,9 +226,8 @@ def get_interaccount_search_list(all_graphs: List[Graph], inter_account_edges: L

def get_edges_interaccount(source_graph: Graph, inter_account_edges: List[Edge], node: Node, ignored_nodes: List[Node]) -> List[Edge]:
"""Given a Node, the Graph it belongs to, a list of inter-account Edges, and a list of Nodes to skip, this returns
any Edges where the Node is the source element as long as the destination element isn't included in the skipped Nodes.
If the given node is an admin, those Edge objects get generated and returned.
any Edges where the Node is the source element as long as the destination element isn't included in the skipped
Nodes.
"""

result = []
@@ -241,3 +241,14 @@ def get_edges_interaccount(source_graph: Graph, inter_account_edges: List[Edge],
result.append(inter_account_edge)

return result


def check_if_service_linked_role(principal: Node) -> bool:
"""Given a Node, determine if it should be treated as a service-linked role. This affects SCP policy decisions as
described in
https://docs.aws.amazon.com/organizations/latest/userguide/orgs_manage_policies_scps.html#not-restricted-by-scp"""

if ':role/' in principal.arn:
role_name = principal.arn.split('/')[-1]
return role_name.startswith('AWSServiceRoleFor')
return False

0 comments on commit dc7ab36

Please sign in to comment.