Skip to content

Commit 33de34b

Browse files
Andrew/event loop fix (#72)
* create a new event loop for async data load * create a new event loop for async data load if there is no active one
1 parent 5f84043 commit 33de34b

File tree

3 files changed

+15
-3
lines changed

3 files changed

+15
-3
lines changed

workflow/__init__.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -12,5 +12,5 @@
1212
# See the License for the specific language governing permissions and
1313
# limitations under the License.
1414

15-
__version_info__ = (0, 0, 41)
15+
__version_info__ = (0, 0, 42)
1616
__version__ = ".".join(map(str, __version_info__))

workflow/executor.py

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -19,7 +19,7 @@
1919
FileMetadata
2020
from workflow.manager import ResourceManager
2121
from workflow.utils import save_csv_output, format_duration, build_models, extract_date_range, build_relation_path, \
22-
get_common_model_relative_path
22+
get_common_model_relative_path, get_or_create_eventloop
2323

2424

2525
class WorkflowStepState(str, Enum):
@@ -358,7 +358,7 @@ def _execute(self, logger: logging.Logger, env_config: EnvConfig, rai_config: Ra
358358
self.await_pending(env_config, logger, missed_resources)
359359

360360
def await_pending(self, env_config, logger, missed_resources):
361-
loop = asyncio.get_event_loop()
361+
loop = get_or_create_eventloop()
362362
if loop.is_running():
363363
raise Exception('Waiting for resource would interrupt unexpected event loop - aborting to avoid confusion')
364364
pending = [src for src in missed_resources if self._resource_is_async(src)]

workflow/utils.py

Lines changed: 12 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -194,3 +194,15 @@ def call_with_overhead(
194194
loop = asyncio.get_event_loop()
195195
loop.run_until_complete(
196196
call_with_overhead_async(f, logger, overhead_rate, start_time, timeout, max_tries, first_delay, max_delay))
197+
198+
199+
def get_or_create_eventloop():
200+
"""Get the current event loop or create a new one."""
201+
try:
202+
return asyncio.get_event_loop()
203+
except RuntimeError as ex:
204+
if "There is no current event loop in thread" in str(ex):
205+
loop = asyncio.new_event_loop()
206+
asyncio.set_event_loop(loop)
207+
return asyncio.get_event_loop()
208+
raise ex

0 commit comments

Comments
 (0)