Skip to content

Commit

Permalink
support serial running
Browse files Browse the repository at this point in the history
  • Loading branch information
karlicoss committed Jan 9, 2024
1 parent 32637fa commit 0525d0e
Show file tree
Hide file tree
Showing 2 changed files with 58 additions and 27 deletions.
59 changes: 32 additions & 27 deletions exobrain/src/build.py
Original file line number Diff line number Diff line change
@@ -1,25 +1,28 @@
#!/usr/bin/env python3
import argparse
from concurrent.futures import ProcessPoolExecutor
from concurrent.futures import Executor, ProcessPoolExecutor
from dataclasses import dataclass
from pathlib import Path
import re
import sys
from shutil import rmtree, copy
from subprocess import check_call, check_output, run, DEVNULL
from typing import Optional


from bs4 import BeautifulSoup # type: ignore[import]
from more_itertools import divide
from loguru import logger

from utils import make_soup
from dummy_executor import DummyExecutor


def ccall(*args, **kwargs):
print(args, file=sys.stderr)
return check_call(*args, **kwargs)


_root = Path(__file__).absolute().parent.parent
src = _root / 'src'

Expand Down Expand Up @@ -95,6 +98,7 @@ def main() -> None:
p.add_argument('--under-entr', action='store_true') # ugh.
p.add_argument('--add', action='store_true')
p.add_argument('--filter', type=str, default=None)
p.add_argument('--workers', type=int, default=None, help='by default, uses all available cores')
args = p.parse_args()

assert not args.md # broken for now
Expand All @@ -119,11 +123,14 @@ def main() -> None:
if not args.under_entr:
clean(cfg=cfg, skip_org_export=not args.org)

if args.org:
publish_org(cfg)
workers: Optional[int] = args.workers
pool = ProcessPoolExecutor(max_workers=workers) if workers != 0 else DummyExecutor()
with pool:
if args.org:
publish_org(cfg, pool=pool)

if args.html:
publish_html(cfg)
if args.html:
publish_html(cfg, pool=pool)


@dataclass
Expand Down Expand Up @@ -357,7 +364,7 @@ def sort_key(x):
fo.write(' ' * level + f'- [[file:{rp}][{title}]]\n')


def publish_org(cfg: Config) -> None:
def publish_org(cfg: Config, *, pool: Executor) -> None:
"""
Publishies intermediate ('public') org-mode
"""
Expand All @@ -368,22 +375,21 @@ def publish_org(cfg: Config) -> None:
ctx = Context(input_dir=input_dir, output_dir=public_dir)
inputs = sorted(input_dir.rglob('*.org'))

with ProcessPoolExecutor() as pool:
workers = pool._max_workers # type: ignore[attr-defined]
logger.debug(f'using {workers} workers')
groups = [list(group) for group in divide(workers, inputs)]
futures = [pool.submit(compile_org_to_org, ctx, group) for group in groups]
for group, fut in zip(groups, futures):
try:
fut.result()
except Exception as e:
raise RuntimeError(f'error while processing {group}') from e
workers = pool._max_workers # type: ignore[attr-defined]
logger.debug(f'using {workers} workers')
groups = [list(group) for group in divide(workers, inputs)]
futures = [pool.submit(compile_org_to_org, ctx, group) for group in groups]
for group, fut in zip(groups, futures):
try:
fut.result()
except Exception as e:
raise RuntimeError(f'error while processing {group}') from e

# TODO maybe collect titles from compile_org_to_org calls?
publish_org_sitemap(public_dir)


def publish_html(cfg: Config) -> None:
def publish_html(cfg: Config, *, pool: Executor) -> None:
"""
Publishes html from 'public' org-mode
"""
Expand All @@ -400,16 +406,15 @@ def publish_html(cfg: Config) -> None:
compile_org_to_html(ctx, [sitemap])
##

with ProcessPoolExecutor() as pool:
workers = pool._max_workers # type: ignore[attr-defined]
logger.debug(f'using {workers} workers')
groups = [list(group) for group in divide(workers, inputs)]
futures = [pool.submit(compile_org_to_html, ctx, group) for group in groups]
for group, fut in zip(groups, futures):
try:
fut.result()
except Exception as e:
raise RuntimeError(f'error while processing {group}') from e
workers = pool._max_workers # type: ignore[attr-defined]
logger.debug(f'using {workers} workers')
groups = [list(group) for group in divide(workers, inputs)]
futures = [pool.submit(compile_org_to_html, ctx, group) for group in groups]
for group, fut in zip(groups, futures):
try:
fut.result()
except Exception as e:
raise RuntimeError(f'error while processing {group}') from e

# for f in public_dir.rglob('*.org'):
# assert not f.is_symlink(), f # just in case?
Expand Down
26 changes: 26 additions & 0 deletions exobrain/src/dummy_executor.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,26 @@
# https://stackoverflow.com/a/10436851/706389
from typing import Any, Optional
from concurrent.futures import Future, Executor
class DummyExecutor(Executor):
def __init__(self, max_workers: Optional[int]=1) -> None:
self._shutdown = False
self._max_workers = max_workers

def submit(self, fn, *args, **kwargs):
if self._shutdown:
raise RuntimeError('cannot schedule new futures after shutdown')

f: Future[Any] = Future()
try:
result = fn(*args, **kwargs)
except KeyboardInterrupt:
raise
except BaseException as e:
f.set_exception(e)
else:
f.set_result(result)

return f

def shutdown(self, wait: bool=True, **kwargs) -> None:
self._shutdown = True

0 comments on commit 0525d0e

Please sign in to comment.