Skip to content

Commit f62eebd

Browse files
committedSep 4, 2017
Version 0.6.0
Gevent is supported.
1 parent 42b8368 commit f62eebd

17 files changed

+528
-43
lines changed
 

‎CHANGELOG.rst

+4
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,9 @@
11
CHANGELOG
22
=========
3+
Version 0.6.0
4+
-------------
5+
Gevent is supported.
6+
37
Version 0.5.0
48
-------------
59
CI/CD pipelines.

‎CI_REQUIREMENTS.txt

+1
Original file line numberDiff line numberDiff line change
@@ -2,3 +2,4 @@ six
22
typing >= 3.6
33
futures>=3.1; python_version == "2.7"
44
asyncio>=3.4; python_version == "3.3"
5+
gevent >= 1.2

‎README.rst

+33-5
Original file line numberDiff line numberDiff line change
@@ -22,6 +22,7 @@ threaded is a set of decorators, which wrap functions in:
2222
* `concurrent.futures.ThreadPool`
2323
* `threading.Thread`
2424
* `asyncio.Task` in Python 3.
25+
* `gevent.threadpool.ThreadPool` if gevent is installed.
2526

2627
Why? Because copy-paste of `loop.create_task`, `threading.Thread` and `thread_pool.submit` is boring,
2728
especially if target functions is used by this way only.
@@ -46,15 +47,20 @@ Pros:
4647

4748
Decorators:
4849

49-
* `ThreadPooled` - native concurrent.futures.ThreadPool usage on Python 3 and it's backport on Python 2.7.
50+
* `ThreadPooled` - native ``concurrent.futures.ThreadPool`` usage on Python 3 and it's backport on Python 2.7.
5051
* `threadpooled` is alias for `ThreadPooled`.
5152

52-
* `Threaded` - wrap in threading.Thread.
53+
* `Threaded` - wrap in ``threading.Thread``.
5354
* `threaded` is alias for `Threaded`.
5455

55-
* `AsyncIOTask` - wrap in asyncio.Task. Uses the same API, as Python 3 `ThreadPooled`.
56+
* `AsyncIOTask` - wrap in ``asyncio.Task``. Uses the same API, as Python 3 `ThreadPooled`.
5657
* `asynciotask` is alias for `AsyncIOTask`.
5758

59+
* `GThreadPooled` - wrap function in ``gevent.threadpool.ThreadPool``.
60+
* `gthreadpooled` is alias for `GThreadPooled`.
61+
62+
.. note:: gevent is not in default package requirements.
63+
5864
Usage
5965
=====
6066

@@ -123,7 +129,7 @@ During application shutdown, pool can be stopped (while it will be recreated aut
123129
124130
Threaded
125131
--------
126-
Classic threading.Thread. Useful for running until close and self-closing threads without return.
132+
Classic ``threading.Thread``. Useful for running until close and self-closing threads without return.
127133

128134
Usage example:
129135

@@ -168,7 +174,7 @@ it can be started automatically before return:
168174
169175
AsyncIOTask
170176
-----------
171-
Wrap in asyncio.Task.
177+
Wrap in ``asyncio.Task``.
172178

173179
.. note:: Python 3 only.
174180

@@ -208,6 +214,28 @@ Usage with loop extraction from call arguments:
208214
loop = asyncio.get_event_loop()
209215
loop.run_until_complete(asyncio.wait_for(func(loop), timeout))
210216
217+
GThreadPooled
218+
-------------
219+
Post function to ``gevent.threadpool.ThreadPool``.
220+
221+
.. code-block:: python
222+
223+
threaded.GThreadPooled.configure(max_workers=3)
224+
225+
.. note:: By if executor is not configured - it configures with default parameters: ``max_workers=(CPU_COUNT or 1) * 5``
226+
227+
.. note:: Instead of standard ThreadPoolExecutor, gevent pool is not re-created during re-configuration.
228+
229+
Basic usage example:
230+
231+
.. code-block:: python
232+
233+
@threaded.GThreadPooled
234+
def func():
235+
pass
236+
237+
func().wait()
238+
211239
Testing
212240
=======
213241
The main test mechanism for the package `threaded` is using `tox`.

‎doc/source/asynciotask.rst

+1-1
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,4 @@
1-
.. PrettyFormat, pretty_repr and pretty_str
1+
.. AsyncIOTask, asynciotask.
22
33
API: Decorators: `AsyncIOTask`, `asynciotask`.
44
================================================

‎doc/source/gthreadpooled.rst

+55
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,55 @@
1+
.. GThreadPooled, gthreadpooled.
2+
3+
API: Decorators: `GThreadPooled`, `gthreadpooled`.
4+
==================================================
5+
6+
.. py:module:: pooled
7+
.. py:currentmodule:: pooled
8+
9+
.. py:class:: GThreadPooled(func, )
10+
11+
Post function to gevent.threadpool.ThreadPool.
12+
13+
:param func: function to wrap
14+
:type func: typing.Optional[typing.Callable[..., typing.Union[typing.Any, typing.Awaitable]]]
15+
16+
.. note:: Attributes is read-only
17+
18+
.. py:attribute:: executor
19+
20+
``gevent.threadpool.ThreadPool`` instance. Class-wide.
21+
22+
.. py:attribute:: _func
23+
24+
``typing.Optional[typing.Callable[..., typing.Union[typing.Any, typing.Awaitable]]]``
25+
Wrapped function. Used for inheritance only.
26+
27+
.. py:classmethod:: configure(max_workers=None, hub=None)
28+
29+
Pool executor create and configure.
30+
31+
:param max_workers: Maximum workers
32+
:type max_workers: typing.Optional[int]
33+
:param hub: Event-loop hub
34+
:type hub: typing.Optional[gevent.hub.Hub]
35+
36+
.. note:: max_workers=None means `(CPU_COUNT or 1) * 5`, it's default value.
37+
38+
.. py:classmethod:: shutdown
39+
40+
Shutdown executor.
41+
42+
.. py:method:: __call__(*args, **kwargs)
43+
44+
Decorator entry point.
45+
46+
:rtype: typing.Union[typing.Callable[..., gevent.event.AsyncResult], gevent.event.AsyncResult]
47+
48+
49+
.. py:function:: gthreadpooled(func, )
50+
51+
Post function to gevent.threadpool.ThreadPool.
52+
53+
:param func: function to wrap
54+
:type func: typing.Optional[typing.Callable]
55+
:rtype: ThreadPooled

‎doc/source/index.rst

+1
Original file line numberDiff line numberDiff line change
@@ -13,6 +13,7 @@ Contents:
1313
threadpooled
1414
threaded
1515
asynciotask
16+
gthreadpooled
1617

1718
Indices and tables
1819
==================

‎doc/source/threaded.rst

+1-1
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,4 @@
1-
.. logwrap function and LogWrap class description.
1+
.. Threaded class and threaded function.
22
33
API: Decorators: `Threaded` class and `threaded` function.
44
==========================================================

‎doc/source/threadpooled.rst

+4-4
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,4 @@
1-
.. PrettyFormat, pretty_repr and pretty_str
1+
.. ThreadPooled, threadpooled.
22
33
API: Decorators: `ThreadPooled`, `threadpooled`.
44
================================================
@@ -11,7 +11,7 @@ API: Decorators: `ThreadPooled`, `threadpooled`.
1111
Post function to ThreadPoolExecutor.
1212

1313
:param func: function to wrap
14-
:type func: typing.Optional[typing.Callable[..., typing.Union[typing.Callable, typing.Awaitable]]]
14+
:type func: typing.Optional[typing.Callable[..., typing.Union[typing.Any, typing.Awaitable]]]
1515

1616
:param loop_getter: Method to get event loop, if wrap in asyncio task
1717

@@ -46,7 +46,7 @@ API: Decorators: `ThreadPooled`, `threadpooled`.
4646

4747
.. py:attribute:: _func
4848
49-
``typing.Optional[typing.Callable[..., typing.Union[typing.Callable, typing.Awaitable]]]``
49+
``typing.Optional[typing.Callable[..., typing.Union[typing.Any, typing.Awaitable]]]``
5050
Wrapped function. Used for inheritance only.
5151

5252
.. py:classmethod:: configure(max_workers=None)
@@ -74,7 +74,7 @@ API: Decorators: `ThreadPooled`, `threadpooled`.
7474
Post function to ThreadPoolExecutor.
7575

7676
:param func: function to wrap
77-
:type func: typing.Optional[typing.Callable]
77+
:type func: typing.Optional[typing.Callable[..., typing.Union[typing.Any, typing.Awaitable]]]
7878

7979
.. note:: Next arguments is Python 3 only:
8080

‎setup.py

+3
Original file line numberDiff line numberDiff line change
@@ -134,6 +134,9 @@ def get_simple_vars_from_src(src):
134134
':python_version == "3.3"': [
135135
'asyncio>=3.4',
136136
],
137+
'gevent': [
138+
'gevent >= 1.2'
139+
],
137140
},
138141
install_requires=required,
139142
)

‎test/test_gevent_threadpooled.py

+93
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,93 @@
1+
# Copyright 2017 Alexey Stepanov aka penguinolog
2+
##
3+
# Licensed under the Apache License, Version 2.0 (the "License"); you may
4+
# not use this file except in compliance with the License. You may obtain
5+
# a copy of the License at
6+
#
7+
# http://www.apache.org/licenses/LICENSE-2.0
8+
#
9+
# Unless required by applicable law or agreed to in writing, software
10+
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
11+
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
12+
# License for the specific language governing permissions and limitations
13+
# under the License.
14+
15+
from __future__ import absolute_import
16+
from __future__ import unicode_literals
17+
18+
import threading
19+
import unittest
20+
21+
try:
22+
import gevent
23+
import gevent.threadpool
24+
except ImportError:
25+
gevent = None
26+
27+
import six
28+
29+
import threaded
30+
31+
if six.PY3:
32+
from os import cpu_count
33+
else:
34+
try:
35+
from multiprocessing import cpu_count
36+
except ImportError:
37+
def cpu_count():
38+
"""Fake CPU count."""
39+
return 1
40+
41+
42+
@unittest.skipIf(gevent is None, 'No gevent')
43+
class TestThreadPooled(unittest.TestCase):
44+
def tearDown(self):
45+
threaded.GThreadPooled.shutdown()
46+
47+
def test_thread_pooled_default(self):
48+
@threaded.gthreadpooled
49+
def test():
50+
return threading.current_thread().name
51+
52+
pooled_name = test().wait()
53+
self.assertNotEqual(pooled_name, threading.current_thread().name)
54+
55+
def test_thread_pooled_construct(self):
56+
@threaded.gthreadpooled()
57+
def test():
58+
return threading.current_thread().name
59+
60+
pooled_name = test().wait()
61+
self.assertNotEqual(pooled_name, threading.current_thread().name)
62+
63+
def test_thread_pooled_config(self):
64+
thread_pooled = threaded.gthreadpooled()
65+
thread_pooled.configure()
66+
67+
self.assertEqual(
68+
thread_pooled.executor.maxsize,
69+
(cpu_count() or 1) * 5
70+
)
71+
72+
thread_pooled.configure(max_workers=2)
73+
74+
@thread_pooled
75+
def test():
76+
return threading.current_thread().name
77+
78+
pooled_name = test().wait()
79+
self.assertNotEqual(pooled_name, threading.current_thread().name)
80+
self.assertEqual(thread_pooled.executor.maxsize, 2)
81+
82+
def test_reconfigure(self):
83+
"""Gevent worker instance is re-used."""
84+
thread_pooled = threaded.gthreadpooled()
85+
executor = thread_pooled.executor
86+
87+
thread_pooled.configure(max_workers=executor.maxsize)
88+
self.assertIs(executor, thread_pooled.executor)
89+
90+
old_size = executor.maxsize
91+
thread_pooled.configure(max_workers=old_size + 1)
92+
self.assertIs(executor, thread_pooled.executor)
93+
self.assertEqual(executor.maxsize, old_size + 1)

‎threaded/__init__.py

+15-1
Original file line numberDiff line numberDiff line change
@@ -28,13 +28,22 @@
2828
threaded,
2929
asynciotask
3030
)
31+
32+
try: # pragma: no cover
33+
from ._gthreadpooled3 import GThreadPooled, gthreadpooled
34+
except ImportError: # pragma: no cover
35+
GThreadPooled = gthreadpooled = None
3136
else: # pragma: no cover
3237
from ._threaded2 import (
3338
ThreadPooled,
3439
Threaded,
3540
threadpooled,
3641
threaded,
3742
)
43+
try: # pragma: no cover
44+
from ._gthreadpooled2 import GThreadPooled, gthreadpooled
45+
except ImportError: # pragma: no cover
46+
GThreadPooled = gthreadpooled = None
3847
# pylint: enable=no-name-in-module
3948

4049

@@ -48,6 +57,11 @@
4857
'AsyncIOTask',
4958
'asynciotask'
5059
)
60+
if GThreadPooled is not None: # pragma: no cover
61+
__all__ += (
62+
'GThreadPooled',
63+
'gthreadpooled'
64+
)
5165

52-
__version__ = '0.5.0'
66+
__version__ = '0.6.0'
5367
__author__ = "Alexey Stepanov <penguinolog@gmail.com>"

‎threaded/_base_gthreadpooled.py

+102
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,102 @@
1+
# Copyright 2017 Alexey Stepanov aka penguinolog
2+
##
3+
# Licensed under the Apache License, Version 2.0 (the "License"); you may
4+
# not use this file except in compliance with the License. You may obtain
5+
# a copy of the License at
6+
#
7+
# http://www.apache.org/licenses/LICENSE-2.0
8+
#
9+
# Unless required by applicable law or agreed to in writing, software
10+
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
11+
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
12+
# License for the specific language governing permissions and limitations
13+
# under the License.
14+
15+
"""gevent.threadpool.ThreadPool usage."""
16+
17+
from __future__ import absolute_import
18+
19+
import gevent.threadpool
20+
import six
21+
22+
from . import _base_threaded
23+
24+
__all__ = (
25+
'BaseGThreadPooled',
26+
)
27+
28+
29+
class BaseGThreadPooled(_base_threaded.APIPooled):
30+
"""Post function to gevent.threadpool.ThreadPool."""
31+
32+
__slots__ = ()
33+
34+
__executor = None
35+
36+
# pylint: disable=arguments-differ
37+
@classmethod
38+
def configure(
39+
cls,
40+
max_workers=None,
41+
hub=None
42+
):
43+
"""Pool executor create and configure.
44+
45+
:param max_workers: Maximum workers
46+
:type max_workers: typing.Optional[int]
47+
:param hub: Event-loop hub
48+
:type hub: typing.Optional[gevent.hub.Hub]
49+
"""
50+
if max_workers is None:
51+
max_workers = _base_threaded.cpu_count() * 5
52+
53+
if isinstance(cls.__executor, gevent.threadpool.ThreadPool):
54+
if hub is None or hub == cls.__executor.hub:
55+
if max_workers == cls.__executor.maxsize:
56+
return # Nothing to change)
57+
cls.__executor.maxsize = max_workers # We can use internals
58+
return
59+
# Hub change. Very special case.
60+
cls.__executor.kill() # pragma: no cover
61+
62+
cls.__executor = gevent.threadpool.ThreadPool(
63+
maxsize=max_workers,
64+
hub=hub
65+
)
66+
67+
# pylint: enable=arguments-differ
68+
69+
@classmethod
70+
def shutdown(cls):
71+
"""Shutdown executor.
72+
73+
Due to not implemented method, set maxsize to 0 (do not accept new).
74+
"""
75+
if cls.__executor is not None:
76+
cls.__executor.kill()
77+
78+
@property
79+
def executor(self):
80+
"""Executor instance.
81+
82+
:rtype: gevent.threadpool.ThreadPool
83+
"""
84+
if not isinstance(self.__executor, gevent.threadpool.ThreadPool):
85+
self.configure()
86+
return self.__executor
87+
88+
def _get_function_wrapper(self, func):
89+
"""Here should be constructed and returned real decorator.
90+
91+
:param func: Wrapped function
92+
:type func: typing.Callable
93+
:return: wrapped function
94+
:rtype: typing.Callable[..., gevent.event.AsyncResult]
95+
"""
96+
# pylint: disable=missing-docstring
97+
# noinspection PyMissingOrEmptyDocstring
98+
@six.wraps(func)
99+
def wrapper(*args, **kwargs):
100+
return self.executor.spawn(func, *args, **kwargs)
101+
# pylint: enable=missing-docstring
102+
return wrapper

‎threaded/_base_threaded.py

+37-1
Original file line numberDiff line numberDiff line change
@@ -16,6 +16,7 @@
1616

1717
from __future__ import absolute_import
1818

19+
import abc
1920
# noinspection PyCompatibility
2021
import concurrent.futures
2122
import threading
@@ -35,12 +36,47 @@ def cpu_count():
3536
return 1
3637

3738
__all__ = (
39+
'APIPooled',
3840
'BasePooled',
3941
'ThreadPoolExecutor',
42+
'cpu_count'
4043
)
4144

4245

43-
class BasePooled(_class_decorator.BaseDecorator):
46+
class APIPooled(_class_decorator.BaseDecorator):
47+
"""API description for pooled."""
48+
49+
__slots__ = ()
50+
51+
__executor = None
52+
53+
@classmethod
54+
@abc.abstractmethod
55+
def configure(
56+
cls,
57+
max_workers=None,
58+
):
59+
"""Pool executor create and configure.
60+
61+
:param max_workers: Maximum workers
62+
:type max_workers: typing.Optional[int]
63+
"""
64+
raise NotImplementedError() # pragma: no cover
65+
66+
@classmethod
67+
@abc.abstractmethod
68+
def shutdown(cls):
69+
"""Shutdown executor."""
70+
raise NotImplementedError() # pragma: no cover
71+
72+
@property
73+
@abc.abstractmethod
74+
def executor(self):
75+
"""Executor instance."""
76+
raise NotImplementedError() # pragma: no cover
77+
78+
79+
class BasePooled(APIPooled):
4480
"""Base ThreadPooled class."""
4581

4682
__slots__ = ()

‎threaded/_gthreadpooled2.py

+45
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,45 @@
1+
# Copyright 2017 Alexey Stepanov aka penguinolog
2+
##
3+
# Licensed under the Apache License, Version 2.0 (the "License"); you may
4+
# not use this file except in compliance with the License. You may obtain
5+
# a copy of the License at
6+
#
7+
# http://www.apache.org/licenses/LICENSE-2.0
8+
#
9+
# Unless required by applicable law or agreed to in writing, software
10+
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
11+
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
12+
# License for the specific language governing permissions and limitations
13+
# under the License.
14+
15+
"""Python 2 threaded implementation.
16+
17+
Uses backport of concurrent.futures.
18+
"""
19+
20+
from __future__ import absolute_import
21+
22+
from . import _base_gthreadpooled
23+
24+
__all__ = (
25+
'GThreadPooled',
26+
'gthreadpooled',
27+
)
28+
29+
30+
class GThreadPooled(_base_gthreadpooled.BaseGThreadPooled):
31+
"""Post function to gevent.threadpool.ThreadPool."""
32+
33+
__slots__ = ()
34+
35+
36+
# pylint: disable=unexpected-keyword-arg, no-value-for-parameter
37+
def gthreadpooled(func=None):
38+
"""Post function to gevent.threadpool.ThreadPool.
39+
40+
:param func: function to wrap
41+
:type func: typing.Optional[typing.Callable]
42+
:rtype: BaseGThreadPooled
43+
"""
44+
return GThreadPooled(func=func)
45+
# pylint: enable=unexpected-keyword-arg, no-value-for-parameter

‎threaded/_gthreadpooled3.py

+78
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,78 @@
1+
# Copyright 2017 Alexey Stepanov aka penguinolog
2+
##
3+
# Licensed under the Apache License, Version 2.0 (the "License"); you may
4+
# not use this file except in compliance with the License. You may obtain
5+
# a copy of the License at
6+
#
7+
# http://www.apache.org/licenses/LICENSE-2.0
8+
#
9+
# Unless required by applicable law or agreed to in writing, software
10+
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
11+
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
12+
# License for the specific language governing permissions and limitations
13+
# under the License.
14+
15+
"""Python 3 threaded implementation.
16+
17+
Asyncio is supported
18+
"""
19+
20+
import functools
21+
import typing
22+
23+
import gevent.event
24+
25+
from . import _base_gthreadpooled
26+
from . import _py3_helpers
27+
28+
__all__ = (
29+
'GThreadPooled',
30+
'gthreadpooled',
31+
)
32+
33+
34+
class GThreadPooled(_base_gthreadpooled.BaseGThreadPooled):
35+
"""Post function to ThreadPoolExecutor."""
36+
37+
__slots__ = ()
38+
39+
def _get_function_wrapper(
40+
self,
41+
func: typing.Callable
42+
) -> typing.Callable[
43+
...,
44+
gevent.event.AsyncResult
45+
]:
46+
"""Here should be constructed and returned real decorator.
47+
48+
:param func: Wrapped function
49+
:type func: typing.Callable
50+
:return: wrapped coroutine or function
51+
:rtype: typing.Callable[..., gevent.event.AsyncResult]
52+
"""
53+
prepared = _py3_helpers.await_if_required(func)
54+
55+
# pylint: disable=missing-docstring
56+
# noinspection PyMissingOrEmptyDocstring
57+
@functools.wraps(prepared)
58+
def wrapper(
59+
*args, **kwargs
60+
) -> gevent.event.AsyncResult:
61+
return self.executor.spawn(prepared, *args, **kwargs)
62+
63+
# pylint: enable=missing-docstring
64+
return wrapper
65+
66+
67+
# pylint: disable=unexpected-keyword-arg, no-value-for-parameter
68+
def gthreadpooled(
69+
func: typing.Optional[typing.Callable]=None
70+
) -> GThreadPooled:
71+
"""Post function to gevent.threadpool.ThreadPool.
72+
73+
:param func: function to wrap
74+
:type func: typing.Optional[typing.Callable]
75+
:rtype: GThreadPooled
76+
"""
77+
return GThreadPooled(func=func)
78+
# pylint: enable=unexpected-keyword-arg, no-value-for-parameter

‎threaded/_py3_helpers.py

+50
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,50 @@
1+
# Copyright 2017 Alexey Stepanov aka penguinolog
2+
##
3+
# Licensed under the Apache License, Version 2.0 (the "License"); you may
4+
# not use this file except in compliance with the License. You may obtain
5+
# a copy of the License at
6+
#
7+
# http://www.apache.org/licenses/LICENSE-2.0
8+
#
9+
# Unless required by applicable law or agreed to in writing, software
10+
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
11+
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
12+
# License for the specific language governing permissions and limitations
13+
# under the License.
14+
15+
"""Python 3 related helpers."""
16+
17+
# noinspection PyCompatibility
18+
import asyncio
19+
import functools
20+
import typing
21+
22+
__all__ = (
23+
'get_loop', 'await_if_required'
24+
)
25+
26+
27+
def get_loop(
28+
self,
29+
*args, **kwargs
30+
) -> typing.Optional[asyncio.AbstractEventLoop]:
31+
"""Get event loop in decorator class."""
32+
if callable(self.loop_getter):
33+
if self.loop_getter_need_context:
34+
return self.loop_getter(*args, **kwargs)
35+
return self.loop_getter()
36+
return self.loop_getter
37+
38+
39+
def await_if_required(target: typing.Callable) -> typing.Callable:
40+
"""Await result if coroutine was returned."""
41+
@functools.wraps(target)
42+
def wrapper(*args, **kwargs):
43+
"""Decorator/wrapper."""
44+
result = target(*args, **kwargs)
45+
if asyncio.iscoroutine(result):
46+
loop = asyncio.new_event_loop()
47+
result = loop.run_until_complete(result)
48+
loop.close()
49+
return result
50+
return wrapper

‎threaded/_threaded3.py

+5-30
Original file line numberDiff line numberDiff line change
@@ -29,6 +29,7 @@
2929

3030
from . import _base_threaded
3131
from . import _class_decorator
32+
from . import _py3_helpers
3233

3334
__all__ = (
3435
'ThreadPooled',
@@ -40,32 +41,6 @@
4041
)
4142

4243

43-
def _get_loop(
44-
self,
45-
*args, **kwargs
46-
) -> typing.Optional[asyncio.AbstractEventLoop]:
47-
"""Get event loop in decorator class."""
48-
if callable(self.loop_getter):
49-
if self.loop_getter_need_context:
50-
return self.loop_getter(*args, **kwargs)
51-
return self.loop_getter()
52-
return self.loop_getter
53-
54-
55-
def await_if_required(target: typing.Callable) -> typing.Callable:
56-
"""Await result if coroutine was returned."""
57-
@functools.wraps(target)
58-
def wrapper(*args, **kwargs):
59-
"""Decorator/wrapper."""
60-
result = target(*args, **kwargs)
61-
if asyncio.iscoroutine(result):
62-
loop = asyncio.new_event_loop()
63-
result = loop.run_until_complete(result)
64-
loop.close()
65-
return result
66-
return wrapper
67-
68-
6944
class ThreadPooled(_base_threaded.BasePooled):
7045
"""Post function to ThreadPoolExecutor."""
7146

@@ -151,7 +126,7 @@ def _get_function_wrapper(
151126
]
152127
]
153128
"""
154-
prepared = await_if_required(func)
129+
prepared = _py3_helpers.await_if_required(func)
155130

156131
# pylint: disable=missing-docstring
157132
# noinspection PyMissingOrEmptyDocstring
@@ -162,7 +137,7 @@ def wrapper(
162137
concurrent.futures.Future,
163138
asyncio.Task
164139
]:
165-
loop = _get_loop(self, *args, **kwargs)
140+
loop = _py3_helpers.get_loop(self, *args, **kwargs)
166141

167142
if loop is None:
168143
return self.executor.submit(prepared, *args, **kwargs)
@@ -210,7 +185,7 @@ def _get_function_wrapper(
210185
:return: wrapped function
211186
:rtype: typing.Callable[..., threading.Thread]
212187
"""
213-
prepared = await_if_required(func)
188+
prepared = _py3_helpers.await_if_required(func)
214189
name = self.name
215190
if name is None:
216191
name = 'Threaded: ' + getattr(
@@ -310,7 +285,7 @@ def _get_function_wrapper(
310285
# noinspection PyCompatibility,PyMissingOrEmptyDocstring
311286
@functools.wraps(func)
312287
def wrapper(*args, **kwargs) -> asyncio.Task:
313-
loop = _get_loop(self, *args, **kwargs)
288+
loop = _py3_helpers.get_loop(self, *args, **kwargs)
314289
return loop.create_task(func(*args, **kwargs))
315290

316291
# pylint: enable=missing-docstring

0 commit comments

Comments
 (0)
Please sign in to comment.