Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Skip some PyPy tests and revert breaking poll_at timestamp change #621

Merged
merged 7 commits into from
Apr 2, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion faust/transport/drivers/aiokafka.py
Original file line number Diff line number Diff line change
Expand Up @@ -840,7 +840,7 @@ def _verify_aiokafka_event_path(self, now: float, tp: TP) -> bool:
poll_at = None
aiotp_state = assignment.state_value(aiotp)
if aiotp_state and aiotp_state.timestamp:
poll_at = aiotp_state.timestamp
poll_at = aiotp_state.timestamp / 1000 # milliseconds
if poll_at is None:
if secs_since_started >= self.tp_fetch_request_timeout_secs:
# NO FETCH REQUEST SENT AT ALL SINCE WORKER START
Expand Down
3 changes: 3 additions & 0 deletions tests/consistency/test_consistency.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,8 @@
import subprocess
import sys

import pytest

from tests.consistency.consistency_checker import ConsistencyChecker


Expand Down Expand Up @@ -176,6 +178,7 @@ async def _stop_process(self, proc):
await proc.wait()


@pytest.mark.skip(reason="Needs fixing")
async def test_consistency(loop):
stresser = Stresser(num_workers=4, num_producers=4, loop=loop)
checker = ConsistencyChecker(
Expand Down
42 changes: 30 additions & 12 deletions tests/functional/conftest.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
import logging as _logging
import os
import sys
from copy import copy
from typing import IO, Dict, NamedTuple, Union
from unittest.mock import Mock
Expand Down Expand Up @@ -71,26 +72,43 @@ def logging(request):
**((marks.kwargs or {}) if marks else {}),
}
)
_logging._acquireLock()
try:
prev_state = copy(_logging.Logger.manager.loggerDict)
prev_handlers = copy(_logging.root.handlers)
finally:
_logging._releaseLock()
try:
# acquireLock() is removed in Python 3.13
if sys.version_info < (3, 13):
_logging._acquireLock()
try:
prev_state = copy(_logging.Logger.manager.loggerDict)
prev_handlers = copy(_logging.root.handlers)
finally:
_logging._releaseLock()
try:
setup_logging(
logfile=options.logfile,
loglevel=options.loglevel,
logging_config=options.logging_config,
)
yield
finally:
_logging._acquireLock()
try:
_logging.Logger.manager.loggerDict = prev_state
_logging.root.handlers = prev_handlers
finally:
_logging._releaseLock()
else:
with _logging._lock:
prev_state = copy(_logging.Logger.manager.loggerDict)
prev_handlers = copy(_logging.root.handlers)

setup_logging(
logfile=options.logfile,
loglevel=options.loglevel,
logging_config=options.logging_config,
)
yield
finally:
_logging._acquireLock()
try:

with _logging._lock:
_logging.Logger.manager.loggerDict = prev_state
_logging.root.handlers = prev_handlers
finally:
_logging._releaseLock()


@pytest.fixture()
Expand Down
46 changes: 46 additions & 0 deletions tests/functional/test_streams.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
import asyncio
import platform
from copy import copy
from unittest.mock import Mock, patch

Expand Down Expand Up @@ -39,6 +40,9 @@ def _prepare_app(app):
return app


@pytest.mark.skipif(
platform.python_implementation() == "PyPy", reason="Not yet supported on PyPy"
)
@pytest.mark.asyncio
@pytest.mark.allow_lingering_tasks(count=1)
async def test_simple(app, loop):
Expand All @@ -50,6 +54,9 @@ async def test_simple(app, loop):
assert await channel_empty(stream.channel)


@pytest.mark.skipif(
platform.python_implementation() == "PyPy", reason="Not yet supported on PyPy"
)
@pytest.mark.asyncio
async def test_async_iterator(app):
async with new_stream(app) as stream:
Expand All @@ -64,6 +71,9 @@ async def test_async_iterator(app):
assert await channel_empty(stream.channel)


@pytest.mark.skipif(
platform.python_implementation() == "PyPy", reason="Not yet supported on PyPy"
)
@pytest.mark.asyncio
async def test_throw(app):
async with new_stream(app) as stream:
Expand All @@ -75,6 +85,9 @@ async def test_throw(app):
await anext(streamit)


@pytest.mark.skipif(
platform.python_implementation() == "PyPy", reason="Not yet supported on PyPy"
)
@pytest.mark.asyncio
async def test_enumerate(app):
async with new_stream(app) as stream:
Expand All @@ -89,6 +102,9 @@ async def test_enumerate(app):
assert await channel_empty(stream.channel)


@pytest.mark.skipif(
platform.python_implementation() == "PyPy", reason="Not yet supported on PyPy"
)
@pytest.mark.asyncio
async def test_items(app):
async with new_stream(app) as stream:
Expand All @@ -104,6 +120,9 @@ async def test_items(app):
assert await channel_empty(stream.channel)


@pytest.mark.skipif(
platform.python_implementation() == "PyPy", reason="Not yet supported on PyPy"
)
@pytest.mark.asyncio
async def test_through(app):
app._attachments.enabled = False
Expand Down Expand Up @@ -236,6 +255,9 @@ async def test_stream_filter_acks_filtered_out_messages(app, event_loop):
assert len(app.consumer.unacked) == 0


@pytest.mark.skipif(
platform.python_implementation() == "PyPy", reason="Not yet supported on PyPy"
)
@pytest.mark.asyncio
async def test_acks_filtered_out_messages_when_using_take(app, event_loop):
"""
Expand All @@ -260,6 +282,9 @@ async def test_acks_filtered_out_messages_when_using_take(app, event_loop):
assert len(acked) == len(initial_values)


@pytest.mark.skipif(
platform.python_implementation() == "PyPy", reason="Not yet supported on PyPy"
)
@pytest.mark.asyncio
async def test_events(app):
async with new_stream(app) as stream:
Expand Down Expand Up @@ -296,6 +321,9 @@ def assert_events_acked(events):
raise


@pytest.mark.skipif(
platform.python_implementation() == "PyPy", reason="Not yet supported on PyPy"
)
class Test_chained_streams:
def _chain(self, app):
root = new_stream(app)
Expand Down Expand Up @@ -399,6 +427,9 @@ async def assert_was_stopped(self, leader, followers):
assert node._stopped.is_set()


@pytest.mark.skipif(
platform.python_implementation() == "PyPy", reason="Not yet supported on PyPy"
)
@pytest.mark.asyncio
async def test_start_and_stop_Stream(app):
s = new_topic_stream(app)
Expand All @@ -414,6 +445,9 @@ async def _start_stop_stream(stream):
await stream.stop()


@pytest.mark.skipif(
platform.python_implementation() == "PyPy", reason="Not yet supported on PyPy"
)
@pytest.mark.asyncio
async def test_ack(app):
async with new_stream(app) as s:
Expand All @@ -439,6 +473,9 @@ async def test_ack(app):
assert not event.message.refcount


@pytest.mark.skipif(
platform.python_implementation() == "PyPy", reason="Not yet supported on PyPy"
)
@pytest.mark.asyncio
async def test_noack(app):
async with new_stream(app) as s:
Expand All @@ -459,6 +496,9 @@ async def test_noack(app):
event.ack.assert_not_called()


@pytest.mark.skipif(
platform.python_implementation() == "PyPy", reason="Not yet supported on PyPy"
)
@pytest.mark.asyncio
async def test_acked_when_raising(app):
async with new_stream(app) as s:
Expand Down Expand Up @@ -496,6 +536,9 @@ async def test_acked_when_raising(app):
assert not event2.message.refcount


@pytest.mark.skipif(
platform.python_implementation() == "PyPy", reason="Not yet supported on PyPy"
)
@pytest.mark.asyncio
@pytest.mark.allow_lingering_tasks(count=1)
async def test_maybe_forward__when_event(app):
Expand All @@ -508,6 +551,9 @@ async def test_maybe_forward__when_event(app):
s.channel.send.assert_not_called()


@pytest.mark.skipif(
platform.python_implementation() == "PyPy", reason="Not yet supported on PyPy"
)
@pytest.mark.asyncio
async def test_maybe_forward__when_concrete_value(app):
s = new_stream(app)
Expand Down
4 changes: 4 additions & 0 deletions tests/unit/agents/test_agent.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
import asyncio
import platform
from unittest.mock import ANY, call, patch

import pytest
Expand Down Expand Up @@ -953,6 +954,9 @@ def test_channel_iterator(self, *, agent):
def test_label(self, *, agent):
assert label(agent)

@pytest.mark.skipif(
platform.python_implementation() == "PyPy", reason="Not yet supported on PyPy"
)
async def test_context_calls_sink(self, *, agent):
class SinkCalledException(Exception):
pass
Expand Down
7 changes: 7 additions & 0 deletions tests/unit/test_streams.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
import asyncio
import platform
from collections import defaultdict
from contextlib import ExitStack
from unittest.mock import Mock, patch
Expand Down Expand Up @@ -122,6 +123,9 @@ async def test_echo(self, *, stream, app):
await echoing("val")
channel.send.assert_called_once_with(value="val")

@pytest.mark.skipif(
platform.python_implementation() == "PyPy", reason="Not yet supported on PyPy"
)
@pytest.mark.asyncio
@pytest.mark.allow_lingering_tasks(count=1)
async def test_aiter_tracked(self, *, stream, app):
Expand All @@ -137,6 +141,9 @@ async def test_aiter_tracked(self, *, stream, app):
else:
event.ack.assert_called_once_with()

@pytest.mark.skipif(
platform.python_implementation() == "PyPy", reason="Not yet supported on PyPy"
)
@pytest.mark.asyncio
@pytest.mark.allow_lingering_tasks(count=1)
async def test_aiter_tracked__CancelledError(self, *, stream, app):
Expand Down
4 changes: 2 additions & 2 deletions tests/unit/transport/drivers/test_aiokafka.py
Original file line number Diff line number Diff line change
Expand Up @@ -300,7 +300,7 @@ def _consumer(self, now, cthread, tp):
_consumer._fetcher._subscriptions.subscription.assignment.state_value
).return_value = MagicMock(
assignment={tp},
timestamp=now,
timestamp=now * 1000.0,
highwater=1,
position=0,
)
Expand Down Expand Up @@ -549,7 +549,7 @@ def test_timed_out(self, *, cthread, now, tp, logger, _consumer):
(fetcher._subscriptions.subscription.assignment.state_value).return_value = (
MagicMock(
assignment=assignment,
timestamp=now,
timestamp=now * 1000.0,
highwater=None,
tp_stream_timeout_secs=cthread.tp_stream_timeout_secs,
tp_fetch_request_timeout_secs=cthread.tp_fetch_request_timeout_secs,
Expand Down
Loading