Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[Integration][AWS] | Fix InvalidToken Exceptions Due to Improper Token Refresh Calls #1190

Open
wants to merge 40 commits into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from 10 commits
Commits
Show all changes
40 commits
Select commit Hold shift + click to select a range
c4c8e36
Fix an issue where the integration enters an endless loop on permissi…
mk-armah Nov 21, 2024
02c9c6f
lint
mk-armah Nov 21, 2024
7f6187b
Merge branch 'main' into aws/port-11555
mk-armah Nov 21, 2024
75d6190
removed permission missing error log
mk-armah Nov 21, 2024
629af39
Merge branch 'aws/port-11555' of https://github.com/port-labs/ocean i…
mk-armah Nov 21, 2024
ddc123b
Merge branch 'main' into aws/port-11555
mk-armah Nov 21, 2024
3bac44a
bumped integration version
mk-armah Nov 21, 2024
f794011
removed unnecessary calls for token refresh
mk-armah Nov 27, 2024
6e8ae57
Merge branch 'main' into aws/port-11555
mk-armah Nov 28, 2024
186a768
Fix tests and lints
mk-armah Nov 29, 2024
a700b11
Merge branch 'main' into aws/port-11555
mk-armah Dec 3, 2024
2e65ce4
switch token refresh strategy from cache to aiobotocore refreshable c…
mk-armah Dec 3, 2024
17c9cdf
resolve conflicts
mk-armah Dec 3, 2024
5cc7258
remodel token refresh strategy
mk-armah Dec 3, 2024
7e5e81e
tests
mk-armah Dec 3, 2024
072894b
tests
mk-armah Dec 4, 2024
004a3d8
reduce sts client object creationc
mk-armah Dec 4, 2024
ecbd9a2
Merge branch 'main' into aws/port-11555
mk-armah Dec 4, 2024
b79b4de
bump integration version
mk-armah Dec 4, 2024
69c176e
lint fix
mk-armah Dec 4, 2024
418176e
use setattr to pass refreshable_credentials to botocore
mk-armah Dec 4, 2024
2c59250
restore naming convention
mk-armah Dec 10, 2024
ec9fd92
clean up codebase and resolve test failures
mk-armah Dec 11, 2024
e8a9de8
Merge branch 'main' into aws/port-11555
mk-armah Dec 11, 2024
afa2dec
Merge branch 'main' into aws/port-11555
phalbert Dec 23, 2024
12029a8
Assume role for refreshable credentials once
mk-armah Jan 3, 2025
098c2d3
Merge branch 'aws/port-11555' of https://github.com/port-labs/ocean i…
mk-armah Jan 3, 2025
7d113c5
Merge branch 'main' into aws/port-11555
mk-armah Jan 7, 2025
d0a122d
ready for rc release
mk-armah Jan 7, 2025
6b486c1
resolving UnrecognizedClientException issue
mk-armah Jan 8, 2025
aa06383
debuggin
mk-armah Jan 9, 2025
e07a666
Merge branch 'main' into aws/port-11555
mk-armah Jan 9, 2025
9a816a0
Merge branch 'aws/port-11555' of https://github.com/port-labs/ocean i…
mk-armah Jan 9, 2025
baca035
ready for rc release
mk-armah Jan 9, 2025
655a49c
lint
mk-armah Jan 9, 2025
bb63690
debug switch case for isrole
mk-armah Jan 9, 2025
42f9456
Merge branch 'main' into aws/port-11555
mk-armah Jan 13, 2025
7246b49
resolved AccountNotFoundError
mk-armah Jan 13, 2025
0c5db5d
Merge branch 'main' into aws/port-11555
mk-armah Jan 13, 2025
3204bd4
Merge branch 'main' into aws/port-11555
mk-armah Jan 14, 2025
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 5 additions & 0 deletions integrations/aws/.port/spec.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,11 @@ configurations:
require: false
description: The number of concurrent accounts to scan. By default, it is set to 50.
default: 50
- name: assumeRoleDuration
type: integer
require: false
description: The duration in seconds for which the credentials are valid. By default, it is set to 3600 seconds.
default: 3600
deploymentMethodRequirements:
- type: default
configurations: ['awsAccessKeyId', 'awsSecretAccessKey']
Expand Down
9 changes: 9 additions & 0 deletions integrations/aws/CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -7,13 +7,22 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0

<!-- towncrier release notes start -->

## 0.2.64 (2024-11-28)


### Bug Fixes

- Fixed a bug where token refresh fails because its triggered while an active session was still using the old token.


## 0.2.63 (2024-11-25)


### Bug Fixes

- Do not break delete entities when a region is not accessible


## 0.2.62 (2024-11-25)


Expand Down
11 changes: 6 additions & 5 deletions integrations/aws/aws/session_manager.py
Original file line number Diff line number Diff line change
Expand Up @@ -15,9 +15,6 @@ class AccountNotFoundError(OceanAbortException):
pass


ASSUME_ROLE_DURATION_SECONDS = 3600 # 1 hour


class SessionManager:
def __init__(self) -> None:
"""
Expand Down Expand Up @@ -98,7 +95,7 @@ async def _get_organization_session(self) -> aioboto3.Session | None:
organizations_client = await sts_client.assume_role(
RoleArn=organization_role_arn,
RoleSessionName="OceanOrgAssumeRoleSession",
DurationSeconds=ASSUME_ROLE_DURATION_SECONDS,
DurationSeconds=self._assume_role_duration_seconds(),
)

credentials = organizations_client["Credentials"]
Expand All @@ -121,6 +118,10 @@ async def _get_organization_session(self) -> aioboto3.Session | None:
def _get_account_read_role_name(self) -> str:
return ocean.integration_config.get("account_read_role_name", "")

@staticmethod
def _assume_role_duration_seconds() -> int:
return int(ocean.integration_config["assume_role_duration"])

async def _update_available_access_credentials(self) -> None:
logger.info("Updating AWS credentials")
async with (
Expand Down Expand Up @@ -156,7 +157,7 @@ async def _assume_role_and_update_credentials(
account_role = await sts_client.assume_role(
RoleArn=f'arn:aws:iam::{account["Id"]}:role/{self._get_account_read_role_name()}',
RoleSessionName="OceanMemberAssumeRoleSession",
DurationSeconds=ASSUME_ROLE_DURATION_SECONDS,
DurationSeconds=self._assume_role_duration_seconds(),
)
raw_credentials = account_role["Credentials"]
credentials = AwsCredentials(
Expand Down
10 changes: 4 additions & 6 deletions integrations/aws/main.py
Original file line number Diff line number Diff line change
Expand Up @@ -89,6 +89,9 @@ async def resync_resources_for_account(
aws_resource_config = typing.cast(AWSResourceConfig, event.resource_config)

if is_global_resource(kind):
logger.info(
f"Handling global resource {kind} for account {credentials.account_id}"
)
Comment on lines +80 to +82
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Nice

async for batch in _handle_global_resource_resync(
kind, credentials, aws_resource_config
):
Expand Down Expand Up @@ -119,6 +122,7 @@ async def resync_all(kind: str) -> ASYNC_GENERATOR_RESYNC_TYPE:
return

await update_available_access_credentials()

tasks = [
semaphore_async_iterator(
semaphore,
Expand All @@ -128,7 +132,6 @@ async def resync_all(kind: str) -> ASYNC_GENERATOR_RESYNC_TYPE:
]
if tasks:
async for batch in stream_async_iterators_tasks(*tasks):
await update_available_access_credentials()
yield batch


Expand Down Expand Up @@ -162,7 +165,6 @@ async def resync_elasticache(kind: str) -> ASYNC_GENERATOR_RESYNC_TYPE:
]
if tasks:
async for batch in stream_async_iterators_tasks(*tasks):
await update_available_access_credentials()
yield batch


Expand Down Expand Up @@ -190,7 +192,6 @@ async def resync_elv2_load_balancer(kind: str) -> ASYNC_GENERATOR_RESYNC_TYPE:

if tasks:
async for batch in stream_async_iterators_tasks(*tasks):
await update_available_access_credentials()
yield batch


Expand Down Expand Up @@ -218,7 +219,6 @@ async def resync_acm(kind: str) -> ASYNC_GENERATOR_RESYNC_TYPE:

if tasks:
async for batch in stream_async_iterators_tasks(*tasks):
await update_available_access_credentials()
yield batch


Expand Down Expand Up @@ -246,7 +246,6 @@ async def resync_ami(kind: str) -> ASYNC_GENERATOR_RESYNC_TYPE:
]
if tasks:
async for batch in stream_async_iterators_tasks(*tasks):
await update_available_access_credentials()
yield batch


Expand Down Expand Up @@ -274,7 +273,6 @@ async def resync_cloudformation(kind: str) -> ASYNC_GENERATOR_RESYNC_TYPE:

if tasks:
async for batch in stream_async_iterators_tasks(*tasks):
await update_available_access_credentials()
yield batch


Expand Down
2 changes: 1 addition & 1 deletion integrations/aws/pyproject.toml
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
[tool.poetry]
name = "aws"
version = "0.2.63"
version = "0.2.64"
description = "This integration will map all your resources in all the available accounts to your Port entities"
authors = ["Shalev Avhar <[email protected]>", "Erik Zaadi <[email protected]>"]

Expand Down
20 changes: 13 additions & 7 deletions integrations/aws/tests/utils/test_aws.py
Original file line number Diff line number Diff line change
Expand Up @@ -23,8 +23,12 @@ async def _create_iterator_tasks(func: Any, count: int) -> List[Any]:

@patch("utils.aws._session_manager.reset", new_callable=AsyncMock)
@patch("utils.aws.lock", new_callable=AsyncMock)
@patch(
"port_ocean.context.ocean.PortOceanContext.integration_config",
return_value={"assume_role_duration": 3600},
)
async def test_multiple_task_execution(
self, mock_lock: AsyncMock, mock_reset: AsyncMock
self, integration_config_mock: Any, mock_lock: AsyncMock, mock_reset: AsyncMock
) -> None:
tasks: List[Any] = await self._create_iterator_tasks(
self._run_update_access_iterator_result, 10
Expand All @@ -35,20 +39,22 @@ async def test_multiple_task_execution(
# Assert that the reset method was awaited exactly once (i.e., no thundering herd)
mock_reset.assert_awaited_once()

mock_lock.__aenter__.assert_awaited_once()
mock_lock.__aexit__.assert_awaited_once()
self.assertEqual(mock_lock.__aenter__.call_count, 10)
self.assertEqual(mock_lock.__aexit__.call_count, 10)


class TestAwsSessions(unittest.IsolatedAsyncioTestCase):
def setUp(self) -> None:
self.session_manager_mock: AsyncMock = patch(
"utils.aws._session_manager", autospec=SessionManager
).start()
patcher = patch("utils.aws._session_manager", autospec=SessionManager)
self.session_manager_mock = patcher.start()
self.addCleanup(patcher.stop)

self.session_manager_mock._assume_role_duration_seconds.return_value = 3600

self.credentials_mock: AsyncMock = AsyncMock(spec=AwsCredentials)
self.session_mock: AsyncMock = AsyncMock(spec=Session)

def tearDown(self) -> None:
async def asyncTearDown(self) -> None:
patch.stopall()

async def test_get_sessions_with_custom_account_id(self) -> None:
Expand Down
29 changes: 14 additions & 15 deletions integrations/aws/utils/aws.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,34 +4,33 @@
from port_ocean.context.ocean import ocean
from starlette.requests import Request

from aws.session_manager import SessionManager, ASSUME_ROLE_DURATION_SECONDS
from aws.session_manager import SessionManager
from aws.aws_credentials import AwsCredentials

from aiocache import cached, Cache # type: ignore
from aiocache import Cache # type: ignore
from asyncio import Lock

from port_ocean.utils.async_iterators import stream_async_iterators_tasks


_session_manager: SessionManager = SessionManager()
lock = Lock()
cache = Cache(Cache.MEMORY)

CACHE_DURATION_SECONDS = (
0.80 * ASSUME_ROLE_DURATION_SECONDS
) # Refresh role credentials after exhausting 80% of the session duration

lock = Lock()
def _get_cache_duration_seconds() -> float:
return 0.50 * _session_manager._assume_role_duration_seconds()


@cached(ttl=CACHE_DURATION_SECONDS, cache=Cache.MEMORY)
async def update_available_access_credentials() -> bool:
"""
Fetches the AWS account IDs that the current IAM role can access.
and saves them up to use as sessions

:return: List of AWS account IDs.
"""
cache_key = "update_available_access_credentials"
async with lock:
result = await cache.get(cache_key)
if result is not None:
return result

await _session_manager.reset()
# makes this run once per DurationSeconds
await cache.set(cache_key, True, ttl=_get_cache_duration_seconds())
return True


Expand All @@ -50,6 +49,7 @@ async def get_accounts() -> AsyncIterator[AwsCredentials]:
Gets the AWS account IDs that the current IAM role can access.
"""
await update_available_access_credentials()

for credentials in _session_manager._aws_credentials:
yield credentials

Expand Down Expand Up @@ -78,7 +78,6 @@ async def get_sessions(
"""
Gets boto3 sessions for the AWS regions.
"""
await update_available_access_credentials()

if custom_account_id:
credentials = _session_manager.find_credentials_by_account_id(custom_account_id)
Expand Down
4 changes: 4 additions & 0 deletions integrations/aws/utils/misc.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,10 @@
import asyncio


def get_assume_role_duration_seconds() -> int:
return int(ocean.integration_config["assume_role_duration"])


def get_semaphore() -> asyncio.BoundedSemaphore:
max_concurrent_accounts: int = int(
ocean.integration_config["maximum_concurrent_accounts"]
Expand Down