-
-
Notifications
You must be signed in to change notification settings - Fork 184
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
support moto client wrappers #755
Comments
The # Use dummy AWS credentials
AWS_REGION = "us-west-2"
AWS_ACCESS_KEY_ID = "dummy_AWS_ACCESS_KEY_ID"
AWS_SECRET_ACCESS_KEY = "dummy_AWS_SECRET_ACCESS_KEY"
@pytest.fixture
def aws_credentials(monkeypatch):
    """Export dummy AWS credentials into the environment for a single test.

    Uses pytest's ``monkeypatch`` fixture so the variables are set through
    ``setenv`` rather than by assigning to ``os.environ`` directly.
    """
    monkeypatch.setenv("AWS_ACCESS_KEY_ID", AWS_ACCESS_KEY_ID)
    monkeypatch.setenv("AWS_SECRET_ACCESS_KEY", AWS_SECRET_ACCESS_KEY)
    # Token variables are given a placeholder value as well.
    monkeypatch.setenv("AWS_SECURITY_TOKEN", "testing")
    monkeypatch.setenv("AWS_SESSION_TOKEN", "testing")
@pytest.fixture(scope="session")
def aws_region():
    """Return the module-level ``AWS_REGION`` constant (session-scoped)."""
    return AWS_REGION
@pytest.fixture
def aio_aws_session(aws_credentials, aws_region, event_loop):
    """Yield an aiobotocore session set up with dummy credentials.

    The session is bound to ``event_loop``, gets an ``AioConfig`` with a
    single pooled connection in ``aws_region`` as its default client
    config, and has the dummy key pair installed explicitly.
    """
    session = aiobotocore.get_session(loop=event_loop)
    session.user_agent_name = "aiobotocore-pytest"

    # A fresh session is expected to have no default client config yet.
    assert session.get_default_client_config() is None
    aioconfig = aiobotocore.config.AioConfig(max_pool_connections=1, region_name=aws_region)
    session.set_default_client_config(aioconfig)
    assert session.get_default_client_config() == aioconfig

    # ensure fake credentials
    session.set_credentials(AWS_ACCESS_KEY_ID, AWS_SECRET_ACCESS_KEY)

    # Try this debug logger, but it might be overkill
    session.set_debug_logger(logger_name="aiobotocore-pytest")

    # # Add custom response parser factory
    # aio_response_parser_factory = AioResponseParserFactory()
    # session.register_component("response_parser_factory", aio_response_parser_factory)

    yield session
@pytest.fixture
async def aio_aws_batch_client(aio_aws_session):
    """Yield an aiobotocore ``batch`` client created while ``mock_batch`` is active.

    NOTE: the ``@pytest.mark.asyncio`` decorator that used to sit on this
    fixture was removed. Marks applied to fixture functions have no effect
    and pytest deprecates the practice; only the test function needs it.
    """
    with mock_batch():
        async with aio_aws_session.create_client("batch") as client:
            yield client
@pytest.mark.asyncio
async def test_aio_aws_batch_client(aio_aws_batch_client):
assert isinstance(aio_aws_batch_client, BaseClient)
job_queues = await aio_aws_batch_client.describe_job_queues()
assert job_queues == {
"ResponseMetadata": {
"HTTPStatusCode": 200,
"HTTPHeaders": {"server": "amazon.com"},
"RetryAttempts": 0,
},
"jobQueues": [],
} patched site-package code for
|
It's possible that Although this documentation is on boto3, the event system is also in botocore: By editing site-packages as follows and then running async def _do_get_response(self, request, operation_model):
try:
logger.debug("Sending http request: %s", request)
history_recorder.record('HTTP_REQUEST', {
'method': request.method,
'headers': request.headers,
'streaming': operation_model.has_streaming_input,
'url': request.url,
'body': request.body
})
service_id = operation_model.service_model.service_id.hyphenize()
event_name = 'before-send.%s.%s' % (service_id, operation_model.name)
responses = self._event_emitter.emit(event_name, request=request)
http_response = first_non_none_response(responses)
assert False
if http_response is None:
http_response = await self._send(request) (Pdb) http_response = first_non_none_response(responses)
(Pdb) http_response
<botocore.awsrequest.AWSResponse object at 0x7fafc23bcf98> So this test never hits the It's not clear whether the event emitter has any details about the registered callable that returns the (Pdb) dir(self._event_emitter)
['__class__', '__copy__', '__delattr__', '__dict__', '__dir__', '__doc__', '__eq__', '__format__', '__ge__', '__getattribute__', '__gt__', '__hash__', '__init__', '__init_subclass__', '__le__', '__lt__', '__module__', '__ne__', '__new__', '__reduce__', '__reduce_ex__', '__repr__', '__setattr__', '__sizeof__', '__str__', '__subclasshook__', '__weakref__', '_alias_event_name', '_emitter', '_event_aliases', '_replace_subsection', '_verify_accept_kwargs', '_verify_and_register', '_verify_is_callable', 'emit', 'emit_until_response', 'register', 'register_first', 'register_last', 'unregister'] If the hack to add the To add a proxy requires an
To get that working requires using moto-server or something. Somehow there must be an easy way to:
In the
moto uses: from botocore.handlers import BUILTIN_HANDLERS
botocore_stubber = BotocoreStubber()
BUILTIN_HANDLERS.append(("before-send", botocore_stubber)) where For example, an AWS
Note esp. the moto callback in:
|
My understanding is that botocore is using the former (what Moto uses) going forward and deprecating the use of requests. |
It's possible to detect when moto mocks are active, e.g. def has_moto_mocks(client, event_name):
# moto registers mock callbacks with the `before-send` event-name, using
# specific callbacks for the methods that are generated dynamically. By
# checking that the first callback is a BotocoreStubber, this verifies
# that moto mocks are intercepting client requests.
callbacks = client.meta.events._emitter._lookup_cache[event_name]
if len(callbacks) > 0:
stub = callbacks[0]
assert isinstance(stub, BotocoreStubber)
return stub.enabled
return False I don't know if it's possible to simply disable it with When I find some time to craft a full PR on this, there are better ways to work around this using the # assumes python >= py3.6 (async generators are OK)
@pytest.fixture
async def aio_aws_s3_server():
    """Enter a ``MotoService("s3")`` context and yield its ``endpoint_url``."""
    async with MotoService("s3") as svc:
        yield svc.endpoint_url
@pytest.fixture
def aio_aws_session(aws_credentials, aws_region, event_loop):
    """Yield an aiobotocore session named ``aiomoto`` with dummy credentials.

    The default client config is an ``AioConfig`` with one pooled
    connection in ``aws_region``; no proxy settings are added.
    """
    # pytest-asyncio provides and manages the `event_loop`
    session = aiobotocore.get_session(loop=event_loop)
    session.user_agent_name = "aiomoto"

    assert session.get_default_client_config() is None
    aioconfig = aiobotocore.config.AioConfig(max_pool_connections=1, region_name=aws_region)
    # forget about adding any proxies for moto.server, that doesn't work
    session.set_default_client_config(aioconfig)
    assert session.get_default_client_config() == aioconfig

    session.set_credentials(AWS_ACCESS_KEY_ID, AWS_SECRET_ACCESS_KEY)
    session.set_debug_logger(logger_name="aiomoto")

    yield session
@pytest.fixture
async def aio_aws_s3_client(aio_aws_session, aio_aws_s3_server):
    """Yield an ``s3`` client whose ``endpoint_url`` is the moto server URI."""
    # aio_aws_s3_server is just a string URI for the new `moto.server` for this client
    async with aio_aws_session.create_client("s3", endpoint_url=aio_aws_s3_server) as client:
        yield client
@pytest.mark.asyncio
async def test_aio_aws_s3_client(aio_aws_s3_client):
client = aio_aws_s3_client
assert isinstance(client, AioBaseClient)
assert client.meta.config.region_name == AWS_REGION
assert client.meta.region_name == AWS_REGION
resp = await client.list_buckets()
assert response_success(resp)
assert resp.get("Buckets") == []
# the event-name mocks are dynamically generated after calling the method;
# for aio-clients, they should be disabled for aiohttp to hit moto.server.
assert not has_moto_mocks(client, "before-send.s3.ListBuckets") The only minor drawback is that |
In case helpful, moto does have a reset API: http://docs.getmoto.org/en/latest/docs/moto_apis.html#reset-api |
seems to work: #773 |
|
ugh, forgot again this is my in-proc version. So we basically don't support the moto wrapper based tests, but that's not a big deal in my opinion as we expose fixtures for each client (see my PR above). I'm going to close this given I don't see any benefits of supporting the moto wrappers. Feel free to re-open if I'm missing something. |
Just hit the same issue. I am replacing something written using botocore and a set of monkeypatches with aiobotocore. For testing my project I thought of using moto in its mocking (default) configuration. However, I got hit by this. My thinking is that even if you guys decide not to support testing with moto, you should conform to the return type of whatever stock botocore returns now, in case there is some client code which registers a custom handler for some reason, which as a result produces |
Going to re-open for more investigation |
Hi just wanted to check in if there's any progress on this ? |
haven't had a chance yet to look into this, been swamped at work |
@kkopachev totally agree |
Adding my voice here. Working on making s3fs async, and would like to test via moto. |
It is most definitely possible, I do it extensively at work; however, it requires you to use the server model instead of the wrapper, which requires a bit more work |
I honestly didn't know about that - works just fine! |
Since the working snippet was with s3fs, consider implementing it there as part of the test fixture (currently uses moto as separate process). |
Is there a fix I can use with little to no boilerplate that I can use instead of skipping tests of functions that use |
Note that fsspec/s3fs does use moto for testing, so perhaps set it up as those packages' CIs do? |
Interesting observation. On my CI pipeline the tests don't fail but when run locally I get the AttributeError about Moto response. I will sniff around a bit to see if there is anything I can find. |
This is how I fixed it for class MockedAWSResponse(AWSResponse):
raw_headers = {} # type: ignore
async def read(self): # type: ignore
return self.text
@contextmanager
def patch_async_botocore_moto():
with ExitStack() as stack:
target_botocore = "botocore.awsrequest.AWSResponse"
patch_botocore = patch(target_botocore, MockedAWSResponse)
stack.enter_context(patch_botocore)
target_moto = "moto.core.botocore_stubber.AWSResponse"
patch_moto = patch(target_moto, MockedAWSResponse)
stack.enter_context(patch_moto)
yield |
Here's the 2023 version of my patch, passes strict mypy and works with a bunch of functions from S3, SQS and CloudFront. from collections.abc import Awaitable, Callable, Iterator
from dataclasses import dataclass
from typing import TypeVar
import aiobotocore
import aiobotocore.endpoint
import botocore
T = TypeVar("T")
R = TypeVar("R")
@dataclass
class _PatchedAWSReponseContent:
"""Patched version of `botocore.awsrequest.AWSResponse.content`"""
content: bytes | Awaitable[bytes]
def __await__(self) -> Iterator[bytes]:
async def _generate_async() -> bytes:
if isinstance(self.content, Awaitable):
return await self.content
else:
return self.content
return _generate_async().__await__()
def decode(self, encoding: str) -> str:
assert isinstance(self.content, bytes)
return self.content.decode(encoding)
class PatchedAWSResponse:
    """Patched version of `botocore.awsrequest.AWSResponse`

    Wraps a synchronous ``AWSResponse`` and exposes ``status_code``,
    an awaitable ``content`` and ``raw`` for async consumers.
    """

    def __init__(self, response: botocore.awsrequest.AWSResponse) -> None:
        """Copy the needed attributes from ``response`` onto this wrapper."""
        self._response = response
        self.status_code = response.status_code
        # Make the body awaitable, whatever form it arrived in.
        self.content = _PatchedAWSReponseContent(response.content)
        self.raw = response.raw
        # Supply an empty ``raw_headers`` when the underlying raw object has
        # none; note this sets the attribute on the shared ``raw`` object.
        if not hasattr(self.raw, "raw_headers"):
            self.raw.raw_headers = {}
def _factory(
    original: Callable[[botocore.awsrequest.AWSResponse, T], Awaitable[R]]
) -> Callable[[botocore.awsrequest.AWSResponse, T], Awaitable[R]]:
    """Build a replacement for ``original`` that wraps the HTTP response first.

    The returned coroutine function wraps ``http_response`` in
    ``PatchedAWSResponse`` and then awaits ``original`` with it.
    """

    async def patched_convert_to_response_dict(http_response: botocore.awsrequest.AWSResponse, operation_model: T) -> R:
        return await original(PatchedAWSResponse(http_response), operation_model)  # type: ignore[arg-type]

    return patched_convert_to_response_dict
aiobotocore.endpoint.convert_to_response_dict = _factory(aiobotocore.endpoint.convert_to_response_dict) # type: ignore[assignment] |
Right now the unit tests via Stubber are broken: #939 as well as moto (which was a great surprise when I started work on migrating unit tests from one to the other). I'm also frustrated that Amazon doesn't provide async support natively. It would be much easier for them to add it to boto3 than for @thehesiod to patch the package up. There's a ticket that has been open for 8 years now: boto/botocore#458. It's not like AWS services are free to use. Stuff like this should be provided by Amazon, not volunteers. @Apakottur I'm glad that it looks like there's a workaround for moto at least; so far it looks like it resolved my issue. |
Hi I was just wondering if there was gonna be a fix for this. @juftin fix helped me get past it, but I'm not sure I understand if this is something that needs to be fixed or just a weird quirk of everything. |
This is great @Apakottur, I needed a few slight modifications to work with my tests (mainly had to also patch the response object used in the RetryContext). Sharing below. This has been tested with calls to DynamoDB and SQS, with the following package versions: from collections.abc import Awaitable, Callable, Iterator
from dataclasses import dataclass
from typing import TypeVar
import aiobotocore
import aiobotocore.endpoint
import botocore
import botocore.retries.standard
T = TypeVar("T")
R = TypeVar("R")
@dataclass
class _PatchedAWSReponseContent:
"""Patched version of `botocore.awsrequest.AWSResponse.content`"""
content: bytes | Awaitable[bytes]
def __await__(self) -> Iterator[bytes]:
async def _generate_async() -> bytes:
if isinstance(self.content, Awaitable):
return await self.content
else:
return self.content
return _generate_async().__await__()
def decode(self, encoding: str) -> str:
assert isinstance(self.content, bytes)
return self.content.decode(encoding)
class PatchedAWSResponse:
    """Patched version of `botocore.awsrequest.AWSResponse`

    Wraps a synchronous ``AWSResponse`` and exposes ``status_code``,
    ``headers``, ``url``, an awaitable ``content`` and ``raw``.
    """

    def __init__(self, response: botocore.awsrequest.AWSResponse) -> None:
        """Copy the needed attributes from ``response`` onto this wrapper."""
        self._response = response
        self.status_code = response.status_code
        self.headers = response.headers
        self.url = response.url
        # Make the body awaitable, whatever form it arrived in.
        self.content = _PatchedAWSReponseContent(response.content)
        self.raw = response.raw
        # Supply an empty ``raw_headers`` when the underlying raw object has
        # none; note this sets the attribute on the shared ``raw`` object.
        if not hasattr(self.raw, "raw_headers"):
            self.raw.raw_headers = {}
class PatchedRetryContext(botocore.retries.standard.RetryContext):
    """Patched version of `botocore.retries.standard.RetryContext`

    Wraps a truthy ``http_response`` keyword argument in
    ``PatchedAWSResponse`` before delegating to the base initializer.
    """

    def __init__(self, *args, **kwargs):
        # Only a keyword-passed, truthy ``http_response`` is wrapped;
        # positional arguments are forwarded untouched.
        if kwargs.get("http_response"):
            kwargs["http_response"] = PatchedAWSResponse(kwargs["http_response"])
        super().__init__(*args, **kwargs)
def _factory(
    original: Callable[[botocore.awsrequest.AWSResponse, T], Awaitable[R]]
) -> Callable[[botocore.awsrequest.AWSResponse, T], Awaitable[R]]:
    """Build a replacement for ``original`` that wraps the HTTP response first.

    The returned coroutine function wraps ``http_response`` in
    ``PatchedAWSResponse`` and then awaits ``original`` with it.
    """

    async def patched_convert_to_response_dict(http_response: botocore.awsrequest.AWSResponse, operation_model: T) -> R:
        return await original(PatchedAWSResponse(http_response), operation_model)  # type: ignore[arg-type]

    return patched_convert_to_response_dict
aiobotocore.endpoint.convert_to_response_dict = _factory(aiobotocore.endpoint.convert_to_response_dict) # type: ignore[assignment]
botocore.retries.standard.RetryContext = PatchedRetryContext |
raw_headers isn't technically supported by aws standard, so probably it should be removed from aiobotocore anyway |
raw headers is necessary as aiohttp munges the response header names which breaks the AWS API calls. I expressed to the aiohttp group they should stop doing this but they decided instead to expose this attribute :( |
Thanks for this! Wrapped up with the # Attempt to import optional dependencies
from collections.abc import Awaitable, Callable, Iterator
from contextlib import contextmanager
from dataclasses import dataclass
from typing import Generator, TypeVar
import aiobotocore
import aiobotocore.endpoint
import botocore
import botocore.retries.standard
import pytest
from moto import mock_aws
T = TypeVar("T")
R = TypeVar("R")
@dataclass
class _PatchedAWSReponseContent:
"""Patched version of `botocore.awsrequest.AWSResponse.content`"""
content: bytes | Awaitable[bytes]
def __await__(self) -> Iterator[bytes]:
async def _generate_async() -> bytes:
if isinstance(self.content, Awaitable):
return await self.content
else:
return self.content
return _generate_async().__await__()
def decode(self, encoding: str) -> str:
assert isinstance(self.content, bytes)
return self.content.decode(encoding)
class PatchedAWSResponse:
    """Patched version of `botocore.awsrequest.AWSResponse`

    Wraps a synchronous ``AWSResponse`` and exposes ``status_code``,
    ``headers``, ``url``, an awaitable ``content`` and ``raw``.
    """

    def __init__(self, response: botocore.awsrequest.AWSResponse) -> None:
        """Copy the needed attributes from ``response`` onto this wrapper."""
        self._response = response
        self.status_code = response.status_code
        self.headers = response.headers
        self.url = response.url
        # Make the body awaitable, whatever form it arrived in.
        self.content = _PatchedAWSReponseContent(response.content)
        self.raw = response.raw
        # Supply an empty ``raw_headers`` when the underlying raw object has
        # none; note this sets the attribute on the shared ``raw`` object.
        if not hasattr(self.raw, "raw_headers"):
            self.raw.raw_headers = {}
class PatchedRetryContext(botocore.retries.standard.RetryContext):
    """Patched version of `botocore.retries.standard.RetryContext`

    Wraps a truthy ``http_response`` keyword argument in
    ``PatchedAWSResponse`` before delegating to the base initializer.
    """

    def __init__(self, *args, **kwargs):
        # Only a keyword-passed, truthy ``http_response`` is wrapped;
        # positional arguments are forwarded untouched.
        if kwargs.get("http_response"):
            kwargs["http_response"] = PatchedAWSResponse(kwargs["http_response"])
        super().__init__(*args, **kwargs)
def _factory(
    original: Callable[[botocore.awsrequest.AWSResponse, T], Awaitable[R]],
) -> Callable[[botocore.awsrequest.AWSResponse, T], Awaitable[R]]:
    """Build a replacement for ``original`` that wraps the HTTP response first.

    The returned coroutine function wraps ``http_response`` in
    ``PatchedAWSResponse`` and then awaits ``original`` with it.
    """

    async def patched_convert_to_response_dict(http_response: botocore.awsrequest.AWSResponse, operation_model: T) -> R:
        return await original(PatchedAWSResponse(http_response), operation_model)  # type: ignore[arg-type]

    return patched_convert_to_response_dict
@contextmanager
def mock_aio_aws(monkeypatch: pytest.MonkeyPatch) -> Generator[None, None, None]:
# Patch aiobotocore and botocore
monkeypatch.setattr(
aiobotocore.endpoint, "convert_to_response_dict", _factory(aiobotocore.endpoint.convert_to_response_dict)
)
monkeypatch.setattr(botocore.retries.standard, "RetryContext", PatchedRetryContext)
with mock_aws():
yield Which can be used like: import boto3
import pytest
from botocore.exceptions import ClientError
from .test_utils import mock_aio_aws
@pytest.fixture()
def mock_aws(monkeypatch):
with mock_aio_aws(monkeypatch):
yield |
Just building on whats before I needed to remove the self.headers["x-amz-crc32"] = None moto_patch.py # Attempt to import optional dependencies
from collections.abc import Awaitable, Callable, Generator, Iterator
from contextlib import contextmanager
from dataclasses import dataclass
from typing import TypeVar
from unittest.mock import patch
import aiobotocore
import aiobotocore.endpoint
import botocore
import botocore.retries.standard
from moto import mock_aws
T = TypeVar("T")
R = TypeVar("R")
@dataclass
class _PatchedAWSReponseContent:
"""Patched version of `botocore.awsrequest.AWSResponse.content`"""
content: bytes | Awaitable[bytes]
def __await__(self) -> Iterator[bytes]:
async def _generate_async() -> bytes:
if isinstance(self.content, Awaitable):
return await self.content
return self.content
return _generate_async().__await__()
def decode(self, encoding: str) -> str:
assert isinstance(self.content, bytes)
return self.content.decode(encoding)
class PatchedAWSResponse:
    """Patched version of `botocore.awsrequest.AWSResponse`

    Wraps a synchronous ``AWSResponse`` and exposes ``status_code``,
    ``headers``, ``url``, an awaitable ``content`` (also as ``_content``),
    ``raw`` and ``text``.
    """

    def __init__(self, response: botocore.awsrequest.AWSResponse) -> None:
        """Copy the needed attributes from ``response`` onto this wrapper."""
        self._response = response
        self.status_code = response.status_code
        # Example of the headers seen here:
        # {'server': 'amazon.com', 'date': 'Thu, 29 Aug 2024 17:10:05 GMT', 'x-amzn-requestid': 'Rlz2JkR24Tzbh5GEFyIBCKempp5HjXw6uh17z5J5EtoGhW4Udr97', 'x-amz-crc32': '317822581'}
        self.headers = response.headers
        # NOTE(review): this writes into the same mapping object as
        # ``response.headers``, so the wrapped response sees the change too.
        # Presumably it is meant to switch off CRC32 validation - confirm.
        self.headers["x-amz-crc32"] = None
        self.url = response.url
        # Make the body awaitable, whatever form it arrived in.
        self.content = _PatchedAWSReponseContent(response.content)
        self._content = self.content
        self.raw = response.raw
        self.text = response.text
        # Supply an empty ``raw_headers`` when the underlying raw object has
        # none; note this sets the attribute on the shared ``raw`` object.
        if not hasattr(self.raw, "raw_headers"):
            self.raw.raw_headers = {}
class PatchedRetryContext(botocore.retries.standard.RetryContext):
    """Patched version of `botocore.retries.standard.RetryContext`

    Wraps a truthy ``http_response`` keyword argument in
    ``PatchedAWSResponse`` before delegating to the base initializer.
    """

    def __init__(self, *args, **kwargs):
        # Only a keyword-passed, truthy ``http_response`` is wrapped;
        # positional arguments are forwarded untouched.
        if kwargs.get("http_response"):
            kwargs["http_response"] = PatchedAWSResponse(kwargs["http_response"])
        super().__init__(*args, **kwargs)
def _factory(
    original: Callable[[botocore.awsrequest.AWSResponse, T], Awaitable[R]],
) -> Callable[[botocore.awsrequest.AWSResponse, T], Awaitable[R]]:
    """Build a replacement for ``original`` that wraps the HTTP response first.

    The returned coroutine function wraps ``http_response`` in
    ``PatchedAWSResponse`` and then awaits ``original`` with it.
    """

    async def patched_convert_to_response_dict(http_response: botocore.awsrequest.AWSResponse, operation_model: T) -> R:
        return await original(PatchedAWSResponse(http_response), operation_model)  # type: ignore[arg-type]

    return patched_convert_to_response_dict
@contextmanager
def mock_aio_aws() -> Generator[None, None, None]:
with (
patch(
"aiobotocore.endpoint.convert_to_response_dict", new=_factory(aiobotocore.endpoint.convert_to_response_dict)
),
patch("botocore.retries.standard.RetryContext", new=PatchedRetryContext),
mock_aws(),
):
yield |
Can we revisit this to make them part of, I don't know, I'm pretty sure this is 100% required when using |
I was trying to use @harvey251's approach for fsspec but I can't get it play nice :/ Patch from collections.abc import Awaitable, Callable, Iterator
from contextlib import contextmanager
from dataclasses import dataclass
from typing import Generator, TypeVar
import aiobotocore
import aiobotocore.endpoint
import botocore
import botocore.retries.standard
from moto import mock_aws
import pytest
T = TypeVar("T")
R = TypeVar("R")
@dataclass
class _PatchedAWSReponseContent:
"""Patched version of `botocore.awsrequest.AWSResponse.content`"""
content: bytes | Awaitable[bytes]
def __await__(self) -> Iterator[bytes]:
async def _generate_async() -> bytes:
if isinstance(self.content, Awaitable):
return await self.content
else:
return self.content
return _generate_async().__await__()
def decode(self, encoding: str) -> str:
assert isinstance(self.content, bytes)
return self.content.decode(encoding)
class PatchedAWSResponse:
    """Patched version of `botocore.awsrequest.AWSResponse`

    Wraps a synchronous ``AWSResponse`` and exposes ``status_code``,
    ``headers``, ``url``, an awaitable ``content`` and ``raw``.
    """

    def __init__(self, response: botocore.awsrequest.AWSResponse) -> None:
        """Copy the needed attributes from ``response`` onto this wrapper."""
        self._response = response
        self.status_code = response.status_code
        self.headers = response.headers
        # NOTE(review): this writes into the same mapping object as
        # ``response.headers``, so the wrapped response sees the change too.
        # Presumably it is meant to switch off CRC32 validation - confirm.
        self.headers["x-amz-crc32"] = None
        self.url = response.url
        # Make the body awaitable, whatever form it arrived in.
        self.content = _PatchedAWSReponseContent(response.content)
        self.raw = response.raw
        # Supply an empty ``raw_headers`` when the underlying raw object has
        # none; note this sets the attribute on the shared ``raw`` object.
        if not hasattr(self.raw, "raw_headers"):
            self.raw.raw_headers = {}
class PatchedRetryContext(botocore.retries.standard.RetryContext):
    """Patched version of `botocore.retries.standard.RetryContext`

    Wraps a truthy ``http_response`` keyword argument in
    ``PatchedAWSResponse`` before delegating to the base initializer.
    """

    def __init__(self, *args, **kwargs):
        # Only a keyword-passed, truthy ``http_response`` is wrapped;
        # positional arguments are forwarded untouched.
        if kwargs.get("http_response"):
            kwargs["http_response"] = PatchedAWSResponse(kwargs["http_response"])
        super().__init__(*args, **kwargs)
def _factory(
    original: Callable[[botocore.awsrequest.AWSResponse, T], Awaitable[R]],
) -> Callable[[botocore.awsrequest.AWSResponse, T], Awaitable[R]]:
    """Build a replacement for ``original`` that wraps the HTTP response first.

    The returned coroutine function wraps ``http_response`` in
    ``PatchedAWSResponse`` and then awaits ``original`` with it.
    """

    async def patched_convert_to_response_dict(
        http_response: botocore.awsrequest.AWSResponse, operation_model: T
    ) -> R:
        return await original(PatchedAWSResponse(http_response), operation_model)  # type: ignore[arg-type]

    return patched_convert_to_response_dict
@contextmanager
def mock_aio_aws(monkeypatch: pytest.MonkeyPatch) -> Generator[None, None, None]:
# Patch aiobotocore and botocore
monkeypatch.setattr(
aiobotocore.endpoint,
"convert_to_response_dict",
_factory(aiobotocore.endpoint.convert_to_response_dict),
)
monkeypatch.setattr(botocore.retries.standard, "RetryContext", PatchedRetryContext)
with mock_aws():
yield Test that fails import boto3
import fsspec
import pytest
from moto_patch import mock_aio_aws
@pytest.fixture()
def mock_aws(monkeypatch):
    """Keep the ``mock_aio_aws`` context active for the duration of a test."""
    with mock_aio_aws(monkeypatch):
        yield
def test_fsspec_s3(mock_aws):
# Create a mock S3 bucket
conn = boto3.client("s3", region_name="us-east-1")
conn.create_bucket(Bucket="test-bucket")
# Write data to a file in the mock S3 bucket using fsspec
with fsspec.open("s3://test-bucket/test.txt", "w") as f:
f.write("Hello, mocked S3!")
# Read data from the mock S3 bucket using fsspec
with fsspec.open("s3://test-bucket/test.txt", "r") as f:
content = f.read()
assert content == "Hello, mocked S3!" Error
|
This bug arises in pytest with moto
1.3.14
and although requirements-dev.txt
has a dev-version, that fix is for something else, i.e. this is irrelevant: See also:
lowercase_dict
function already
Below is an exception detail, when testing the following pytest fixtures:
This raises a simple exception when trying to parse a moto response (below) and the source code for botocore seems to match (there is no
AWSResponse.raw_headers
attr). Maybe there are API version differences between aiobotocore, botocore and moto (at the time of posting this issue). In the project, the requirements pull in the aiobotocore deps for boto3/botocore and moto is the latest release:The simple test function is:
The moto job-queues should be an empty list (and it is, see pdb details below).
Note that the moto response is a
botocore.awsrequest.AWSResponse
and not a `:type http_response: botocore.vendored.requests.model.Response`
The text was updated successfully, but these errors were encountered: