Skip to content

Commit

Permalink
Move loop handling from plugin to main.
Browse files Browse the repository at this point in the history
  • Loading branch information
aknrdureegaesr committed Jan 4, 2024
1 parent b28c2bc commit 8e60abc
Show file tree
Hide file tree
Showing 3 changed files with 152 additions and 79 deletions.
138 changes: 128 additions & 10 deletions nikola/__main__.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
# -*- coding: utf-8 -*-

# Copyright © 2012-2023 Roberto Alsina and others.
# Copyright © 2012-2024 Roberto Alsina and others.

# Permission is hereby granted, free of charge, to any
# person obtaining a copy of this software and associated
Expand All @@ -26,16 +26,11 @@

"""The main function of Nikola."""

import importlib.util
import os
import shutil
import sys
import textwrap
import traceback
import doit.cmd_base
import asyncio
from blinker import signal
from collections import defaultdict

from blinker import signal
import doit.cmd_base
from doit.cmd_base import TaskLoader2, _wrap
from doit.cmd_clean import Clean as DoitClean
from doit.cmd_completion import TabCompletion
Expand All @@ -45,6 +40,14 @@
from doit.loader import generate_tasks
from doit.reporter import ExecutedOnlyReporter

import importlib.util
import os
import shutil
import sys
import textwrap
import traceback
from typing import Optional, Any

from . import __version__
from .nikola import Nikola
from .plugin_categories import Command
Expand All @@ -56,6 +59,13 @@
except ImportError:
pass # This is only so raw_input/input does nicer things if it's available

try:
from asyncio import AbstractChildWatcher
except ImportError:
# This type does not exist under Windows,
# so the methods needing it will also not be called under Windows.
AbstractChildWatcher = Any


config = {}

Expand Down Expand Up @@ -254,6 +264,113 @@ def clean_tasks(self, tasks, dryrun, *a):
DoitAuto = None


class FixingEventLoopPolicy(asyncio.AbstractEventLoopPolicy):
"""A context handler and event loop policy to mitigate some problems.
First problem: Python 3.7 and earlier under Windows
service the Ctrl+C interrupt signal in a polling fashion,
only when there is some action on the loop. So see to it
that there is some action on the loop every second.
See https://bugs.python.org/issue23057 (fixed in Python 3.8).
Second problem: Under Windows, the asyncio.ProactorEventLoop
is the loop to use. This happens automatically in Python 3.8
and later, but not yet in 3.7.
General sanity: When all processing is to stop, the event loop
might be closed down as well. On the other hand,
this might not be a good idea if the loop is to be re-used later.
So this can be controlled via an __init__ parameter.
"""

def __init__(self, close_loop_at_exit: bool):
"""Initialize the object."""
self._close_loop_at_exit = close_loop_at_exit

# The previous event loop, to be restored later:
self._original_event_loop_policy: Optional[asyncio.AbstractEventLoopPolicy] = None
# The event loop to delegate to, mostly the same as the original:
self._base_event_loop_policy: Optional[asyncio.AbstractEventLoopPolicy] = None

self._loop: Optional[asyncio.AbstractEventLoop] = None
self._ctrl_c_fix_future: Optional[asyncio.Future[None]] = None

def __enter__(self):
"""Set ourselves as an event loop policy.
Save enough state to clean up later.
"""
self._original_event_loop_policy = asyncio.get_event_loop_policy()
if sys.platform == 'win32' and sys.version_info < (3, 8):
# This is the sane default for Windows,
# but it has become the default only with Python 3.8:
self._base_event_loop_policy = asyncio.WindowsProactorEventLoopPolicy()
else:
self._base_event_loop_policy = self._original_event_loop_policy
asyncio.set_event_loop_policy(self)
return self

def _new_loop(self, new_loop: asyncio.AbstractEventLoop):
"""Let a new loop enter the stage."""
if self._ctrl_c_fix_future is not None:
self._ctrl_c_fix_future.cancel()
if new_loop is not None:
if sys.platform == 'win32' and sys.version_info < (3, 8):
# Fix the problem that Ctrl+C is only noticed when there's activity,
# i.e., work around bpo-23057, https://bugs.python.org/issue23057 ."""
async def ctrl_c_fix() -> None:
while True:
await asyncio.sleep(1)
self._ctrl_c_fix_future = new_loop.run_in_executor(None, ctrl_c_fix)
self._loop = new_loop

def __exit__(self, _ex_type, _ex_value, _ex_traceback):
"""Clean up after ourselves."""
try:
if self._loop is not None:
if self._loop.is_running():
if self._ctrl_c_fix_future is not None:
self._ctrl_c_fix_future.cancel()
if self._close_loop_at_exit:
self._loop.stop()
if self._close_loop_at_exit:
self._loop.close()
finally:
# Clean up:
asyncio.set_event_loop_policy(self._original_event_loop_policy)
# __exit__ should not be called again, but just to make sure:
self._loop = None
self._ctrl_c_fix_future = None
return False

def get_event_loop(self) -> asyncio.AbstractEventLoop:
"""See asyncio.AbstractEventLoopPolicy documentation."""
loop = self._base_event_loop_policy.get_event_loop()
if loop != self._loop:
self._new_loop(loop)
return self._loop

def set_event_loop(self, loop: asyncio.AbstractEventLoop) -> None:
"""See asyncio.AbstractEventLoopPolicy documentation."""
if loop != self._loop:
self._new_loop(loop)
self._base_event_loop_policy.set_event_loop(loop)

def new_event_loop(self) -> asyncio.AbstractEventLoop:
"""See asyncio.AbstractEventLoopPolicy documentation."""
# As the loop is just constructed, but not yet set,
# we dare to not call _new_loop() yet:
return self._base_event_loop_policy.new_event_loop()

def get_child_watcher(self) -> AbstractChildWatcher:
"""See asyncio.AbstractEventLoopPolicy documentation."""
return self._base_event_loop_policy.get_child_watcher()

def set_child_watcher(self, watcher: AbstractChildWatcher) -> None:
"""See asyncio.AbstractEventLoopPolicy documentation."""
self._base_event_loop_policy.set_child_watcher(watcher)


class NikolaTaskLoader(TaskLoader2):
"""Nikola-specific task loader."""

Expand Down Expand Up @@ -379,7 +496,8 @@ def run(self, cmd_args):
"existing Nikola site.")
return 3
try:
return super().run(cmd_args)
with FixingEventLoopPolicy(close_loop_at_exit=True):
return super().run(cmd_args)
except Exception:
LOGGER.error('An unhandled exception occurred.')
if self.nikola.debug or self.nikola.show_tracebacks:
Expand Down
29 changes: 6 additions & 23 deletions nikola/plugins/command/auto/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -64,9 +64,6 @@
REBUILDING_REFRESH_DELAY = 0.35
IDLE_REFRESH_DELAY = 0.05

if sys.platform == 'win32':
asyncio.set_event_loop(asyncio.ProactorEventLoop())


def base_path_from_siteuri(siteuri: str) -> str:
"""Extract the path part from a URI such as site['SITE_URL'].
Expand Down Expand Up @@ -275,27 +272,18 @@ def _execute(self, options, args):
self.wd_observer.schedule(ConfigEventHandler(_conf_fn, self.queue_rebuild, loop), _conf_dn, recursive=False)
self.wd_observer.start()

win_sleeper = None
# https://bugs.python.org/issue23057 (fixed in Python 3.8)
if sys.platform == 'win32' and sys.version_info < (3, 8):
win_sleeper = asyncio.ensure_future(windows_ctrlc_workaround())

if not self.has_server:
self.logger.info("Watching for changes...")
# Run the event loop forever (no server mode).
try:
# Run rebuild queue
loop.run_until_complete(self.run_rebuild_queue())

loop.run_forever()
except KeyboardInterrupt:
pass
finally:
if win_sleeper:
win_sleeper.cancel()
self.wd_observer.stop()
self.wd_observer.join()
loop.close()
return

if options['ipv6'] or '::' in host:
Expand Down Expand Up @@ -326,16 +314,17 @@ def _execute(self, options, args):
pass
finally:
self.logger.info("Server is shutting down.")
if win_sleeper:
win_sleeper.cancel()
if self.dns_sd:
self.dns_sd.Reset()
rebuild_queue_fut.cancel()
reload_queue_fut.cancel()

# Not sure why this isn't done by the web_runner.cleanup() code:
loop.run_until_complete(self.remove_websockets(None))

loop.run_until_complete(self.web_runner.cleanup())
self.wd_observer.stop()
self.wd_observer.join()
loop.close()

async def set_up_server(self, host: str, port: int, base_path: str, out_folder: str) -> None:
"""Set up aiohttp server and start it."""
Expand Down Expand Up @@ -470,6 +459,7 @@ async def websocket_handler(self, request):
elif msg.type == aiohttp.WSMsgType.CLOSE:
self.logger.debug("Closing WebSocket")
await ws.close()

break
elif msg.type == aiohttp.WSMsgType.ERROR:
self.logger.error('WebSocket connection closed with exception {0}'.format(ws.exception()))
Expand All @@ -482,7 +472,7 @@ async def websocket_handler(self, request):

return ws

async def remove_websockets(self, app) -> None:
async def remove_websockets(self, _app) -> None:
"""Remove all websockets."""
for ws in self.sockets:
await ws.close()
Expand Down Expand Up @@ -512,13 +502,6 @@ async def send_to_websockets(self, message: dict) -> None:
self.sockets.remove(ws)


async def windows_ctrlc_workaround() -> None:
"""Work around bpo-23057."""
# https://bugs.python.org/issue23057
while True:
await asyncio.sleep(1)


class IndexHtmlStaticResource(StaticResource):
"""A StaticResource implementation that serves /index.html in directory roots."""

Expand Down
64 changes: 18 additions & 46 deletions tests/integration/test_dev_server.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,6 @@
import pathlib
import requests
import socket
import sys
from typing import Optional, Tuple, Any, Dict

from ..helper import FakeSite
Expand Down Expand Up @@ -38,6 +37,7 @@ def find_unused_port() -> int:

class MyFakeSite(FakeSite):
def __init__(self, config: Dict[str, Any], configuration_filename="conf.py"):
super(MyFakeSite, self).__init__()
self.configured = True
self.debug = True
self.THEMES = []
Expand Down Expand Up @@ -70,13 +70,13 @@ def test_serves_root_dir(
test_was_successful = False
test_problem_description = "Async test setup apparently broken"
test_inner_error: Optional[BaseException] = None
loop_for_this_test = None
loop = None

async def grab_loop_and_run_test() -> None:
nonlocal test_problem_description, loop_for_this_test
nonlocal test_problem_description, loop

loop_for_this_test = asyncio.get_running_loop()
watchdog_handle = loop_for_this_test.call_later(TEST_MAX_DURATION, lambda: loop_for_this_test.stop())
loop = asyncio.get_running_loop()
watchdog_handle = loop.call_later(TEST_MAX_DURATION, loop.stop)
test_problem_description = f"Test did not complete within {TEST_MAX_DURATION} seconds."

def run_test() -> None:
Expand Down Expand Up @@ -113,13 +113,11 @@ def run_test() -> None:
LOGGER.info("Test completed successfully.")
else:
LOGGER.error("Test failed: %s", test_problem_description)
loop_for_this_test.call_soon_threadsafe(lambda: watchdog_handle.cancel())
loop.call_soon_threadsafe(watchdog_handle.cancel)
# Simulate Ctrl+C:
loop.call_soon_threadsafe(lambda: loop.call_later(0.01, loop.stop))

# We give the outer grab_loop_and_run_test a chance to complete
# before burning the bridge:
loop_for_this_test.call_soon_threadsafe(lambda: loop_for_this_test.call_later(0.05, lambda: loop_for_this_test.stop()))

await loop_for_this_test.run_in_executor(None, run_test)
await loop.run_in_executor(None, run_test)

# We defeat the nikola site building functionality, so this does not actually get called.
# But the code setting up site building wants a command list:
Expand All @@ -128,40 +126,14 @@ def run_test() -> None:
# Defeat the site building functionality, and instead insert the test:
command_auto.run_initial_rebuild = grab_loop_and_run_test

try:
# Start the development server
# which under the hood runs our test when trying to build the site:
command_auto.execute(options=options)

# Verify the test succeeded:
if test_inner_error is not None:
raise test_inner_error
assert test_was_successful, test_problem_description
finally:
# Nikola is written with the assumption that it can
# create the event loop at will without ever cleaning it up.
# As this tests runs several times in succession,
# that assumption becomes a problem.
LOGGER.info("Cleaning up loop.")
# Loop cleanup:
assert loop_for_this_test is not None
assert not loop_for_this_test.is_running()
loop_for_this_test.close()
asyncio.set_event_loop(None)
# We would like to leave it at that,
# but doing so causes the next test to fail.
#
# We did not find asyncio - API to reset the loop
# to "back to square one, as if just freshly started".
#
# The following code does not feel right, it's a kludge,
# but it apparently works for now:
if sys.platform == 'win32':
# For this case, the auto module has special code
# (at module load time! 😟) which we reluctantly reproduce here:
asyncio.set_event_loop(asyncio.ProactorEventLoop())
else:
asyncio.set_event_loop(asyncio.new_event_loop())
# Start the development server
# which under the hood runs our test when trying to build the site:
command_auto.execute(options=options)

# Verify the test succeeded:
if test_inner_error is not None:
raise test_inner_error
assert test_was_successful, test_problem_description


@pytest.fixture(scope="module",
Expand All @@ -184,7 +156,7 @@ def site_and_base_path(request) -> Tuple[MyFakeSite, str]:
"SITE_URL": request.param,
"OUTPUT_FOLDER": OUTPUT_FOLDER.as_posix(),
}
return (MyFakeSite(config), auto.base_path_from_siteuri(request.param))
return MyFakeSite(config), auto.base_path_from_siteuri(request.param)


@pytest.fixture(scope="module")
Expand Down

0 comments on commit 8e60abc

Please sign in to comment.