Skip to content

Commit

Permalink
Merge pull request #58 from dbrattli/python-3.10
Browse files Browse the repository at this point in the history
Re-enable Python 3.10
  • Loading branch information
dbrattli authored Sep 28, 2024
2 parents ff3bea7 + c9bf511 commit 25772aa
Show file tree
Hide file tree
Showing 29 changed files with 421 additions and 391 deletions.
2 changes: 1 addition & 1 deletion .github/workflows/python-package.yml
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@ jobs:
runs-on: ubuntu-latest
strategy:
matrix:
python-version: ["3.11", "3.12"]
python-version: ["3.10", "3.11", "3.12"]

steps:
- uses: actions/checkout@v2
Expand Down
2 changes: 1 addition & 1 deletion .pre-commit-config.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -14,5 +14,5 @@ repos:
language: node
pass_filenames: false
types: [python]
additional_dependencies: ["[email protected].364"]
additional_dependencies: ["[email protected].382"]
repo: local
15 changes: 8 additions & 7 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -2,12 +2,12 @@
<img src="logo/logo.jpg" alt="drawing" width="200"/>

# aioreactive - ReactiveX for asyncio using async and await

[![PyPI](https://img.shields.io/pypi/v/aioreactive.svg)](https://pypi.python.org/pypi/aioreactive)
![Python package](https://github.com/dbrattli/aioreactive/workflows/Python%20package/badge.svg)
![Publish Package](https://github.com/dbrattli/aioreactive/actions/workflows/python-publish.yml/badge.svg)
[![codecov](https://codecov.io/gh/dbrattli/aioreactive/branch/master/graph/badge.svg)](https://codecov.io/gh/dbrattli/aioreactive)


> *NEWS: Project rebooted Nov. 2020. Rebuilt using [Expression](https://github.com/dbrattli/Expression).*
Aioreactive is [RxPY](https://github.com/ReactiveX/RxPY) for asyncio.
Expand All @@ -19,9 +19,9 @@ and, integrates naturally with the Python language.
> aioreactive is the unification of RxPY and reactive programming with
> asyncio using async and await.
## The design goals for aioreactive:
## The design goals for aioreactive

* Python 3.11+ only. We have a hard dependency [Expression v5]([https://www.python.org/dev/peps/pep-0585/](https://github.com/dbrattli/Expression)).
* Python 3.10+ only. We have a hard dependency [Expression v5]([https://www.python.org/dev/peps/pep-0585/](https://github.com/dbrattli/Expression)).
* All operators and tools are implemented as plain old functions.
* Everything is `async`. Sending values is async, subscribing to
observables is async. Disposing subscriptions is async.
Expand Down Expand Up @@ -131,9 +131,10 @@ Even more interesting, with `to_async_iterable` you can flip around from
the stream of events.

```python
obv = AsyncIteratorObserver()
subscription = subscribe_async(source, obv)
async for x in obv:
import aioreactive as rx

xs = rx.from_iterable([1, 2, 3])
async for x in xs:
print(x)
```

Expand All @@ -153,7 +154,7 @@ import aioreactive as rx
xs = rx.from_iterable([1, 2, 3])
result = []

obv = rx.AsyncIteratorObserver()
obv = rx.AsyncIteratorObserver(xs)
async with await xs.subscribe_async(obv) as subscription:
async for x in obv:
result.append(x)
Expand Down
24 changes: 12 additions & 12 deletions aioreactive/__init__.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
"""Aioreactive module.
Contains the AsyncRx chained obserable that allows method chaining of all operators.
Contains the AsyncRx chained observable that allows method chaining of all operators.
Also contains all operators as plain functions.
Expand All @@ -16,11 +16,11 @@
from __future__ import annotations

from collections.abc import AsyncIterable, Awaitable, Callable, Iterable
from typing import Any, TypeVar, TypeVarTuple
from typing import Any, TypeVar

from expression import Option, curry_flip, pipe
from expression.system.disposable import AsyncDisposable
from typing_extensions import Unpack
from typing_extensions import TypeVarTuple, Unpack

from .observables import AsyncAnonymousObservable, AsyncIterableObservable
from .observers import (
Expand Down Expand Up @@ -134,11 +134,11 @@ def empty(cls) -> AsyncRx[_TSource]:
return AsyncRx(empty())

@classmethod
def from_iterable(cls, iter: Iterable[_TSource]) -> AsyncRx[_TSource]:
def from_iterable(cls, iter: Iterable[_TOther]) -> AsyncRx[_TOther]:
return AsyncRx(from_iterable(iter))

@staticmethod
def from_async_iterable(iter: AsyncIterable[_TResult]) -> AsyncObservable[_TResult]:
def from_async_iterable(iter: AsyncIterable[_TResult]) -> AsyncRx[_TResult]:
"""Convert an async iterable to an async observable stream.
Example:
Expand All @@ -159,7 +159,7 @@ def single(cls, value: _TSource) -> AsyncRx[_TSource]:
def as_async_observable(self) -> AsyncObservable[_TSource]:
return AsyncAnonymousObservable(self.subscribe_async)

def choose(self, chooser: Callable[[_TSource], Option[_TSource]]) -> AsyncObservable[_TSource]:
def choose(self, chooser: Callable[[_TSource], Option[_TSource]]) -> AsyncRx[_TSource]:
"""Choose.
Applies the given function to each element of the stream and returns
Expand All @@ -175,7 +175,7 @@ def choose(self, chooser: Callable[[_TSource], Option[_TSource]]) -> AsyncObserv
"""
return AsyncRx(pipe(self, choose(chooser)))

def choose_async(self, chooser: Callable[[_TSource], Awaitable[Option[_TSource]]]) -> AsyncObservable[_TSource]:
def choose_async(self, chooser: Callable[[_TSource], Awaitable[Option[_TSource]]]) -> AsyncRx[_TSource]:
"""Choose async.
Applies the given async function to each element of the stream and
Expand Down Expand Up @@ -226,7 +226,7 @@ def delay(self, seconds: float) -> AsyncRx[_TSource]:

return AsyncRx(pipe(self, delay(seconds)))

def distinct_until_changed(self) -> AsyncObservable[_TSource]:
def distinct_until_changed(self) -> AsyncRx[_TSource]:
from .filtering import distinct_until_changed

return AsyncRx(distinct_until_changed(self))
Expand All @@ -250,7 +250,7 @@ def filter(self, predicate: Callable[[_TSource], bool]) -> AsyncRx[_TSource]:

return AsyncRx(pipe(self, _filter(predicate)))

def filteri(self, predicate: Callable[[_TSource, int], bool]) -> AsyncObservable[_TSource]:
def filteri(self, predicate: Callable[[_TSource, int], bool]) -> AsyncRx[_TSource]:
"""Filter with index.
Filters the elements of an observable sequence based on a predicate
Expand Down Expand Up @@ -325,7 +325,7 @@ def reduce_async(
) -> AsyncRx[_TResult]:
return pipe(self, reduce_async(accumulator, initial), AsyncRx[_TResult])

def skip(self, count: int) -> AsyncObservable[_TSource]:
def skip(self, count: int) -> AsyncRx[_TSource]:
"""Skip items from start of the stream.
Bypasses a specified number of elements in an observable sequence
Expand Down Expand Up @@ -388,7 +388,7 @@ def starmap(self: AsyncRx[tuple[Unpack[_V]]], mapper: Callable[[Unpack[_V]], _TR
"""
return AsyncRx(pipe(self, starmap(mapper)))

def take(self, count: int) -> AsyncObservable[_TSource]:
def take(self, count: int) -> AsyncRx[_TSource]:
"""Take the first elements from the stream.
Returns a specified number of contiguous elements from the start of
Expand All @@ -405,7 +405,7 @@ def take(self, count: int) -> AsyncObservable[_TSource]:

return AsyncRx(pipe(self, take(count)))

def take_last(self, count: int) -> AsyncObservable[_TSource]:
def take_last(self, count: int) -> AsyncRx[_TSource]:
"""Take last elements from stream.
Returns a specified number of contiguous elements from the end of an
Expand Down
6 changes: 3 additions & 3 deletions aioreactive/timeshift.py
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
import asyncio
import logging
from collections.abc import Awaitable, Callable, Iterable
from datetime import UTC, datetime, timedelta
from datetime import datetime, timedelta, timezone
from typing import NoReturn, TypeVar

from expression import curry_flipped
Expand Down Expand Up @@ -59,7 +59,7 @@ async def loop() -> TailCallResult[None, ...]:

ns, due_time = await inbox.receive()

diff = due_time - datetime.now(UTC)
diff = due_time - datetime.now(timezone.utc)
seconds = diff.total_seconds()
if seconds > 0:
await asyncio.sleep(seconds)
Expand All @@ -84,7 +84,7 @@ async def matcher() -> None:
agent = MailboxProcessor.start(worker, token)

async def fn(ns: Notification[_TSource]) -> None:
due_time = datetime.now(UTC) + timedelta(seconds=seconds)
due_time = datetime.now(timezone.utc) + timedelta(seconds=seconds)
agent.post((ns, due_time))

obv: AsyncNotificationObserver[_TSource] = AsyncNotificationObserver(fn)
Expand Down
Loading

0 comments on commit 25772aa

Please sign in to comment.