Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat(gcp): improve docstrings #5716

Open
wants to merge 2 commits into
base: master
Choose a base branch
from
Open
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
143 changes: 143 additions & 0 deletions prowler/providers/gcp/gcp_provider.py
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,31 @@


class GcpProvider(Provider):
"""
GCP Provider class to handle the GCP provider

Attributes:
- _type: str -> The provider type
- _session: Credentials -> The GCP credentials session
- _project_ids: list -> The list of project IDs
- _excluded_project_ids: list -> The list of excluded project IDs
- _identity: GCPIdentityInfo -> The GCP identity info
- _audit_config: dict -> The audit config
- _mutelist: GCPMutelist -> The GCP mutelist
- audit_metadata: Audit_Metadata -> The audit metadata

Methods:
- __init__ -> GCP Provider constructora
- setup_session -> Setup the GCP session
- test_connection -> Test the connection to GCP
- print_credentials -> Print the GCP credentials
- get_projects -> Get the projects accessible by the provided credentials
- update_projects_with_organizations -> Update the projects with organizations
- is_project_matching -> Check if the input project matches the project to match
- validate_static_arguments -> Validate the static arguments
- validate_project_id -> Validate the provider ID
"""

_type: str = "gcp"
_session: Credentials
_project_ids: list
Expand Down Expand Up @@ -81,6 +106,54 @@ def __init__(
client_id: str
client_secret: str
refresh_token: str

Raises:
GCPNoAccesibleProjectsError if no project IDs can be accessed via Google Credentials
GCPSetUpSessionError if an error occurs during the setup session
GCPLoadCredentialsFromDictError if an error occurs during the loading credentials from dict
GCPGetProjectError if an error occurs during the get project

Returns:
None

Usage:
- Authentication: Prowler will use by default your User Account credentials, you can configure it using:
- gcloud init to use a new account
- gcloud config set account <account> to use a specific account
- gcloud auth application-default login to use the Application Default Credentials
- Prowler will use the Application Default Credentials if no credentials are provided
- Using static credentials:
>>> GcpProvider(
... client_id="client_id",
... client_secret="client_secret",
... refresh_token="refresh_token"
... )
- Using a credentials file:
>>> GcpProvider(
... credentials_file="credentials_file"
... )
- Impersonating a service account: If you want to impersonate a GCP service account, you can use the impersonate_service_account parameter:
>>> GcpProvider(
... impersonate_service_account="service_account"
... )
- Projects: Prowler is multi-project, which means that is going to scan all the Google Cloud projects that the authenticated user has access to.
- If you want to scan a specific project(s), you can use the project-ids argument.
>>> GcpProvider(
... project_ids=["project_id1", "project_id2"]
... )
- If you want to exclude a specific project(s), you can use the excluded-project-ids argument.
>>> GcpProvider(
... excluded_project_ids=["project_id1", "project_id2"]
... )
* Note: You can use asterisk * to exclude projects that match a pattern. For example, using "sys*" will exclude all the projects that start with sys.
- If you want to list all the available project IDs, you can use the list-project-ids argument.
>>> GcpProvider(
... list_project_ids=True
... )
- Organizations: If you want to scan a specific organization, you can use the organization-id argument. With this argument, Prowler will scan all the projects under that organization.
>>> GcpProvider(
... organization_id="organization_id"
... )
"""
logger.info("Instantiating GCP Provider ...")
self._impersonated_service_account = impersonate_service_account
Expand Down Expand Up @@ -233,11 +306,21 @@ def setup_session(
) -> tuple:
"""
Setup the GCP session with the provided credentials file or service account to impersonate

Args:
credentials_file: str
service_account: str

Returns:
Credentials object and default project ID

Raises:
GCPLoadCredentialsFromDictError if an error occurs during the loading credentials from dict
GCPSetUpSessionError if an error occurs during the setup session

Usage:
>>> GcpProvider.setup_session(credentials_file, service_account)
>>> GcpProvider.setup_session(service_account, gcp_credentials)
"""
try:
scopes = ["https://www.googleapis.com/auth/cloud-platform"]
Expand Down Expand Up @@ -305,6 +388,7 @@ def test_connection(
If the connection is successful, return a Connection object with is_connected set to True. If the connection fails, return a Connection object with error set to the exception.
Raise an exception if raise_on_exception is set to True.
If the Cloud Resource Manager API has not been used before or it is disabled, log a critical message and return a Connection object with error set to the exception.

Args:
credentials_file: str
service_account: str
Expand All @@ -313,8 +397,32 @@ def test_connection(
client_secret: str
refresh_token: str
provider_id: Optional[str] -> The provider ID, for GCP it is the project ID

Returns:
Connection object with is_connected set to True if the connection is successful, or error set to the exception if the connection fails

Raises:
GCPLoadCredentialsFromDictError if an error occurs during the loading credentials from dict
GCPSetUpSessionError if an error occurs during the setup session
GCPCloudResourceManagerAPINotUsedError if the Cloud Resource Manager API has not been used before or it is disabled
GCPInvalidProviderIdError if the provider ID does not match with the expected project_id
GCPTestConnectionError if an error occurs during the test connection

Usage:
- Using static credentials:
>>> GcpProvider.test_connection(
... client_id="client_id",
... client_secret="client_secret",
... refresh_token="refresh_token"
... )
- Using a credentials file:
>>> GcpProvider.test_connection(
... credentials_file="credentials_file"
... )
- Using a service account to impersonate:
>>> GcpProvider.test_connection(
... service_account="service_account"
... )
"""
try:
# Set the GCP credentials using the provided client_id, client_secret and refresh_token
Expand Down Expand Up @@ -415,11 +523,22 @@ def get_projects(
) -> dict[str, GCPProject]:
"""
Get the projects accessible by the provided credentials. If an organization ID is provided, only the projects under that organization are returned.

Args:
credentials: Credentials
organization_id: str

Returns:
dict[str, GCPProject]

Raises:
GCPCloudResourceManagerAPINotUsedError if the Cloud Resource Manager API has not been used before or it is disabled
GCPCloudAssetAPINotUsedError if the Cloud Asset API has not been used before or it is disabled
GCPHTTPError if an error occurs during the HTTP request
GCPGetProjectError if an error occurs during the get project

Usage:
>>> GcpProvider.get_projects(credentials=credentials, organization_id=organization_id)
"""
try:
projects = {}
Expand Down Expand Up @@ -535,6 +654,19 @@ def get_projects(
return projects

def update_projects_with_organizations(self):
"""
Update the projects with organizations

Returns:
None

Raises:
GCPHTTPError if an error occurs during the HTTP request
GCPGetProjectError if an error occurs during the get project

Usage:
>>> GcpProvider.update_projects_with_organizations()
"""
try:
service = discovery.build(
"cloudresourcemanager", "v1", credentials=self._session
Expand Down Expand Up @@ -571,11 +703,16 @@ def update_projects_with_organizations(self):
def is_project_matching(self, input_project: str, project_to_match: str) -> bool:
"""
Check if the input project matches the project to match

Args:
input_project: str
project_to_match: str

Returns:
bool

Usage:
>>> GcpProvider.is_project_matching(input_project, project_to_match)
"""
return (
"*" in input_project
Expand All @@ -602,6 +739,9 @@ def validate_static_arguments(

Raises:
GCPStaticCredentialsError if any of the static arguments is missing

Usage:
>>> GcpProvider.validate_static_arguments(client_id, client_secret, refresh_token)
"""

if not client_id or not client_secret or not refresh_token:
Expand Down Expand Up @@ -631,6 +771,9 @@ def validate_project_id(provider_id: str, credentials: str = None) -> None:

Raises:
GCPInvalidProviderIdError if the provider ID does not match with the expected project_id

Usage:
>>> GcpProvider.validate_project_id(provider_id, credentials)
"""

available_projects = list(
Expand Down