Skip to content

Commit

Permalink
Black format source code
Browse files Browse the repository at this point in the history
  • Loading branch information
saraedum committed Nov 20, 2024
1 parent 515316f commit 428b096
Show file tree
Hide file tree
Showing 15 changed files with 138 additions and 42 deletions.
1 change: 1 addition & 0 deletions flatsurvey/cache/cache.py
Original file line number Diff line number Diff line change
Expand Up @@ -105,6 +105,7 @@ def _is_empty(file):
pos = file.tell()

import os

file.seek(0, os.SEEK_END)
if file.tell() == 0:
return True
Expand Down
8 changes: 4 additions & 4 deletions flatsurvey/cache/externalize_pickles.py
Original file line number Diff line number Diff line change
Expand Up @@ -51,9 +51,7 @@ def click(jsons, pickles):
return {
"goals": [ExternalizePickles],
"bindings": [
PartialBindingSpec(ExternalizePickles)(
jsons=jsons, pickle_dir=pickles
)
PartialBindingSpec(ExternalizePickles)(jsons=jsons, pickle_dir=pickles)
],
}

Expand All @@ -78,7 +76,9 @@ def externalize(json):
import os.path

if self._pickle_dir is not None:
fname = os.path.join(self._pickle_dir, f"{hash}.pickle.gz")
fname = os.path.join(
self._pickle_dir, f"{hash}.pickle.gz"
)

import gzip

Expand Down
1 change: 1 addition & 0 deletions flatsurvey/cache/join.py
Original file line number Diff line number Diff line change
Expand Up @@ -61,6 +61,7 @@ async def resolve(self):
for json in self._jsons:
with open(json) as input:
from flatsurvey.cache.cache import Cache

if Cache._is_empty(input):
continue

Expand Down
22 changes: 18 additions & 4 deletions flatsurvey/jobs/orbit_closure.py
Original file line number Diff line number Diff line change
Expand Up @@ -218,15 +218,21 @@ def click(stale_limit, expansions_limit, deform, cache_only):
return {
"goals": [OrbitClosure],
"bindings": OrbitClosure.bindings(
stale_limit=stale_limit, expansions_limit=expansions_limit, deform=deform, cache_only=cache_only
stale_limit=stale_limit,
expansions_limit=expansions_limit,
deform=deform,
cache_only=cache_only,
),
}

@classmethod
def bindings(cls, stale_limit, expansions_limit, deform, cache_only):
return [
PartialBindingSpec(OrbitClosure)(
stale_limit=stale_limit, expansions_limit=expansions_limit, deform=deform, cache_only=cache_only
stale_limit=stale_limit,
expansions_limit=expansions_limit,
deform=deform,
cache_only=cache_only,
)
]

Expand Down Expand Up @@ -362,7 +368,11 @@ async def _consume(self, decomposition, cost):

return Goal.COMPLETED

if not self._deformed and self.dimension > 3 and self._directions >= self._stale_limit:
if (
not self._deformed
and self.dimension > 3
and self._directions >= self._stale_limit
):
self._progress.progress(message="deforming surface")

tangents = [
Expand All @@ -380,7 +390,11 @@ def upper_bound(v):
def height(v):
bound = upper_bound(v)

return max(c.height() for x in v for c in (x.parent().number_field(x) / bound).list())
return max(
c.height()
for x in v
for c in (x.parent().number_field(x) / bound).list()
)

tangents.sort(key=height)

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -185,6 +185,7 @@ def click(limit):
)
],
}

_hacks_enabled = False

@classmethod
Expand Down
20 changes: 15 additions & 5 deletions flatsurvey/limits.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@

logger = logging.getLogger()


class Limit:
def __init__(self, limit):
self._limit = limit
Expand All @@ -22,6 +23,7 @@ def start(self):
assert self._task is None

import asyncio

loop = asyncio.get_running_loop()
self._task = loop.create_task(self._run())

Expand All @@ -34,17 +36,21 @@ def check(self):
if not self._limit.check():
self._callback()
self.stop()

async def _run(self):
try:
while True:
self.check()

import asyncio

await asyncio.sleep(self._period)
except Exception as e:
import traceback
logging.error(f"limit checker crashed with {''.join(traceback.format_exception(e))}")

logging.error(
f"limit checker crashed with {''.join(traceback.format_exception(e))}"
)


class TimeLimit(Limit):
Expand All @@ -56,7 +62,8 @@ def __init__(self, limit):
@staticmethod
def parse_limit(limit):
import pandas
return pandas.Timedelta(limit).to_pytimedelta()

return pandas.Timedelta(limit).to_pytimedelta()

def check(self):
import time
Expand All @@ -78,6 +85,7 @@ class MemoryLimit(Limit):
@staticmethod
def parse_limit(limit):
import psutil

ram = psutil.virtual_memory().total
cpus = psutil.cpu_count()

Expand All @@ -91,11 +99,13 @@ def parse_limit(limit):
return int(ram / cpus * 1.5)

from datasize import DataSize

return int(DataSize(limit))

@staticmethod
def memory():
import os

pid = os.getpid()

smap = f"/proc/{pid}/smaps"
Expand All @@ -106,11 +116,11 @@ def memory():
for line in smap:
items = line.split()

if not items[0].endswith(':'):
if not items[0].endswith(":"):
continue
if len(items) != 3:
continue
if items[2] != 'kB':
if items[2] != "kB":
continue

key = items[0][:-1]
Expand Down
1 change: 1 addition & 0 deletions flatsurvey/pipeline/consumer.py
Original file line number Diff line number Diff line change
Expand Up @@ -181,6 +181,7 @@ async def resolve(self):
return not Consumer.COMPLETED

import asyncio

await asyncio.sleep(0)

return Consumer.COMPLETED
Expand Down
5 changes: 4 additions & 1 deletion flatsurvey/pipeline/util.py
Original file line number Diff line number Diff line change
Expand Up @@ -117,7 +117,10 @@ def wrap(**kwargs):
{
f"provide_{name}": provider,
"__repr__": lambda self: f"{name} binding to {prototype.__name__}",
"__reduce__": lambda self: (PartialBindingSpec_unpickle, ((prototype, name, scope), kwargs))
"__reduce__": lambda self: (
PartialBindingSpec_unpickle,
((prototype, name, scope), kwargs),
),
},
)

Expand Down
13 changes: 7 additions & 6 deletions flatsurvey/reporting/json.py
Original file line number Diff line number Diff line change
Expand Up @@ -81,8 +81,7 @@ def __init__(self, surface, output="-", pickles=False):
type=click.Path(exists=True, file_okay=False, dir_okay=True, allow_dash=False),
default=None,
)
@click.option(
"--pickles/--no-pickles", default=False)
@click.option("--pickles/--no-pickles", default=False)
def click(output, prefix, pickles):
return {
"bindings": Json.bindings(output=output, prefix=prefix, pickles=pickles),
Expand All @@ -91,12 +90,11 @@ def click(output, prefix, pickles):

@classmethod
def bindings(cls, output, prefix=None, pickles=False):
return [
JsonBindingSpec(output=output, prefix=prefix, pickles=pickles)
]
return [JsonBindingSpec(output=output, prefix=prefix, pickles=pickles)]

def deform(self, deformation):
from flatsurvey.pipeline.util import FactoryBindingSpec

return {
"bindings": [FactoryBindingSpec("json", lambda surface: self)],
"reporters": [Json],
Expand Down Expand Up @@ -207,7 +205,10 @@ def flush(self):
import json

from contextlib import nullcontext
with (open(self._output, "w") if self._output != "-" else nullcontext(sys.stdout)) as stream:

with (
open(self._output, "w") if self._output != "-" else nullcontext(sys.stdout)
) as stream:
stream.write(json.dumps(self._data, default=self._serialize_to_pickle))
stream.flush()

Expand Down
7 changes: 4 additions & 3 deletions flatsurvey/reporting/log.py
Original file line number Diff line number Diff line change
Expand Up @@ -63,6 +63,7 @@ def __init__(self, surface, stream=None):

if self._stream is None:
import sys

self._stream = sys.stdout

def _log_prefix(self, source):
Expand Down Expand Up @@ -117,12 +118,11 @@ def click(output, prefix):

@classmethod
def bindings(cls, output, prefix=None):
return [
LogBindingSpec(output=output, prefix=prefix)
]
return [LogBindingSpec(output=output, prefix=prefix)]

def deform(self, deformation):
from flatsurvey.pipeline.util import FactoryBindingSpec

return {
"bindings": [FactoryBindingSpec("log", lambda surface: self)],
"reporters": [Log],
Expand Down Expand Up @@ -210,6 +210,7 @@ def __init__(self, output, prefix):
def provide_log(self, surface):
if self._output == "-" or (self._output is None and self._prefix is None):
import sys

stream = None
elif self._output is not None:
stream = open(self._output, "w")
Expand Down
38 changes: 32 additions & 6 deletions flatsurvey/scheduler.py
Original file line number Diff line number Diff line change
Expand Up @@ -75,10 +75,20 @@ async def _create_pool(self):
TODO: Make configurable (also without environment variables.) and comment what does not need to be configured.
"""
import dask.config
dask.config.set({'distributed.worker.daemon': False})

dask.config.set({"distributed.worker.daemon": False})

import dask.distributed
pool = await dask.distributed.Client(scheduler_file=self._scheduler, direct_to_workers=True, connection_limit=2**16, asynchronous=True, n_workers=8, nthreads=1, preload="flatsurvey.worker.dask")

pool = await dask.distributed.Client(
scheduler_file=self._scheduler,
direct_to_workers=True,
connection_limit=2**16,
asynchronous=True,
n_workers=8,
nthreads=1,
preload="flatsurvey.worker.dask",
)

return pool

Expand Down Expand Up @@ -107,12 +117,19 @@ async def start(self):
what="tasks running",
) as execution_progress:
from more_itertools import roundrobin

surfaces = roundrobin(*self._generators)

pending = []

async def schedule_one():
return await self._schedule(pool, pending, surfaces, self._goals, scheduling_progress)
return await self._schedule(
pool,
pending,
surfaces,
self._goals,
scheduling_progress,
)

async def consume_one():
return await self._consume(pool, pending)
Expand Down Expand Up @@ -212,19 +229,26 @@ async def _schedule(self, pool, pending, surfaces, goals, scheduling_progress):
continue

bindings = list(self._bindings)
bindings = [binding for binding in bindings if not hasattr(binding, "provide_cache")]
bindings = [
binding for binding in bindings if not hasattr(binding, "provide_cache")
]
bindings.append(SurfaceBindingSpec(surface))

from flatsurvey.worker.dask import DaskTask
task = DaskTask(bindings=bindings, goals=self._goals, reporters=self._reporters)

task = DaskTask(
bindings=bindings, goals=self._goals, reporters=self._reporters
)

pending.append(pool.submit(task))
return True

async def _consume(self, pool, pending):
import dask.distributed

completed, still_pending = await dask.distributed.wait(pending, return_when='FIRST_COMPLETED')
completed, still_pending = await dask.distributed.wait(
pending, return_when="FIRST_COMPLETED"
)

pending.clear()
pending.extend(still_pending)
Expand Down Expand Up @@ -288,6 +312,8 @@ def __init__(self, goals):


import pinject


class SurfaceBindingSpec(pinject.BindingSpec):
def __init__(self, surface):
self._surface = surface
Expand Down
4 changes: 3 additions & 1 deletion flatsurvey/surfaces/ngons.py
Original file line number Diff line number Diff line change
Expand Up @@ -563,7 +563,9 @@ def __reduce__(self):

def __hash__(self):
if self.polygon.cache is None:
raise Exception("cannot hash Ngon whose polygon() has not been determined yet")
raise Exception(
"cannot hash Ngon whose polygon() has not been determined yet"
)

return hash((tuple(self.angles), self.polygon()))

Expand Down
4 changes: 3 additions & 1 deletion flatsurvey/survey.py
Original file line number Diff line number Diff line change
Expand Up @@ -142,7 +142,9 @@ def survey(dry_run, debug, queue, verbose, scheduler):


@survey.result_callback()
def process(subcommands, dry_run=False, debug=False, queue=128, verbose=0, scheduler=None):
def process(
subcommands, dry_run=False, debug=False, queue=128, verbose=0, scheduler=None
):
r"""
Run the specified subcommands of ``survey``.
Expand Down
Loading

0 comments on commit 428b096

Please sign in to comment.