Prepare for v0.0.2 (first public beta) (#84)
* add time chunks to gain_chunks dict

* bump version

* clean up setup.py

* check

* cleanup spotless worker

* import from qcal instead of redefining

* clean up dist options

* towards parametric models

* degrid from parametrised model

* reweight on convergence of primal dual

* add crude field, ddid, scan selection logic for init worker

* vertical parallelism in weight_data

* reinstate wavelet tests

* remove use of make_noncritical in psi funcs

* use more of the PSF to determine clean beam

* switch to default qcal branch

* add postfix to spotless outputs

* use map_blocks in l2reweight to avoid div by zero (see the sketch after this list)

* fixes to forward worker

* add min_val to model2comps

* add channel selection

* set freq range to none when not supplied

* fix failing tests

* do not optimise graph in grid

* don't use delayed, just don't

* do not print emaj emin

* Add test for spotless workflow

* move model2comps fitting into separate function and add test

* orthogonal polynomials for the win (see the sketch after this list)

* extend test for spotless workflow

* fix interp when nband!=nbasisf

* remove full model from coeff dataset

* more flexible restore worker (optional psf)

* initial implementation

* bug in restore when only single component present

* start adding Blocker wrapper

* GaussPars typo in restore

* prefer clone over inline_array

* chunk freq axis before sum overlap

* counts None

* allow changing model resolution in grid

* rechunk chan axis after concatenation

* do not chunk on list

* nx -> npix_x

* x0 -> center_x

* extrapolate with zeros

* remove pdb imports

* do not modify in place in normwsum

* remove trailing .zarr when naming outputs

* dual update to avoid duplicate arrays

* simplify dual update

* norm_diff in primal dual

* rm large islands function

* ok

* TypeError: unhashable type: 'Array' when using blocker

* still weirdness

* do not fit psf cube if no restored products cubes requested

* argmax instead of searchsorted for freq selection (see the sketch after this list)

* fix freq selection

* Add pip update step to readme

* correct chan chunks when cpi=-1

* prefer requests over packratt

* ANTENNA1->ANTENNA2 in test framework

* remove cell_scaling lie

* grid running through

* fix failing tests

* tbump attempt 1

* add tbump.toml file

* fix failing dual_update test

* add min_val to model2comps

* add CI step to publish to Test PyPI
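The map_blocks item above is about guarding a block-wise division against zero denominators. Below is a minimal, self-contained sketch of that general dask pattern; the array names and the toy reweighting are assumptions for illustration only, not the actual l2reweight code in pfb-clean.

    # Illustrative sketch only (not the pfb-clean implementation): use
    # dask.array.map_blocks for a block-wise division that returns zero
    # wherever the denominator vanishes, instead of dividing by zero.
    import numpy as np
    import dask.array as da

    def safe_divide(num_block, den_block):
        # operates on plain numpy blocks inside each dask task
        out = np.zeros_like(num_block)
        mask = den_block > 0
        out[mask] = num_block[mask] / den_block[mask]
        return out

    num = da.random.random((8, 1024), chunks=(1, 1024))
    den = da.random.random((8, 1024), chunks=(1, 1024))
    den = da.where(den > 0.5, den, 0.0)   # inject some zero denominators

    ratio = da.map_blocks(safe_divide, num, den, dtype=num.dtype)
    print(ratio.max().compute())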
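The orthogonal-polynomials item relates to the model2comps fitting mentioned above. The sketch below shows the generic idea of fitting per-pixel spectra with an orthogonal (Legendre) basis; the basis choice, array shapes and names are assumptions for illustration, not the interface used in pfb-clean.

    # Illustrative sketch only: fit per-pixel spectra with an orthogonal
    # (Legendre) polynomial basis. Shapes and names are assumptions.
    import numpy as np
    from numpy.polynomial import legendre

    nband, npix = 8, 16
    nu = np.linspace(-1.0, 1.0, nband)      # normalised frequency axis
    model = np.random.rand(nband, npix)     # toy frequency cube, one spectrum per pixel

    order = 3
    coeffs = legendre.legfit(nu, model, order)   # shape (order + 1, npix)
    recon = legendre.legval(nu, coeffs).T        # back to shape (nband, npix)
    print(np.abs(recon - model).max())           # fit residual of the toy cube

An orthogonal basis tends to keep the fit better conditioned than a raw power series, which is the usual motivation for such a switch.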
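For the frequency-selection item, the relevant property is that argmax over a boolean mask returns the index of the first True element, which yields a channel range without searchsorted's requirement of a sorted axis. A self-contained sketch with made-up frequencies and range:

    # Illustrative sketch only: pick a channel range via argmax on a boolean
    # mask (index of the first True) rather than searchsorted.
    import numpy as np

    freq = np.linspace(1.0e9, 2.0e9, 8)       # channel frequencies in Hz
    freq_min, freq_max = 1.2e9, 1.7e9         # requested range

    ilo = int(np.argmax(freq >= freq_min))                     # first channel >= freq_min
    ihi = int(freq.size - np.argmax(freq[::-1] <= freq_max))   # one past last channel <= freq_max
    print(ilo, ihi, freq[ilo:ihi])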

---------

Co-authored-by: landmanbester <[email protected]>
landmanbester authored Jul 28, 2023
1 parent 79ea206 commit c07aa50
Showing 45 changed files with 2,916 additions and 2,441 deletions.
35 changes: 31 additions & 4 deletions .github/workflows/ci.yml
@@ -35,8 +35,35 @@ jobs:
  deploy:
    needs: [test]
    runs-on: ubuntu-latest

    # Run on a push to a tag or master
    if: github.event_name == 'push' && startsWith(github.ref, 'refs/tags')
    steps:
      - name: Deploy Step
        run: >
          echo "Deploying PyPI"
      - name: Set up Python 3.8
        uses: actions/setup-python@v4
        with:
          python-version: 3.8

      - name: Install latest setuptools, wheel, pip
        run: python3 -m pip install -U pip setuptools wheel

      - name: Checkout source
        uses: actions/checkout@v4
        with:
          fetch-depth: 1

      - name: Build distributions
        run: python setup.py sdist bdist_wheel

      - name: Publish distribution to Test PyPI
        uses: pypa/[email protected]
        with:
          user: __token__
          password: ${{ secrets.TESTTOKEN }}
          repository_url: https://test.pypi.org/legacy/
        continue-on-error: false

      - name: Publish distribution 📦 to PyPI
        uses: pypa/gh-action-pypi-publish@master
        with:
          user: __token__
          password: ${{ secrets.PYPI_API_TOKEN }}
41 changes: 0 additions & 41 deletions .github/workflows/publish.yml

This file was deleted.

2 changes: 1 addition & 1 deletion Dockerfile
@@ -12,6 +12,6 @@ RUN apt -y update && \


RUN python -m pip install -U pip setuptools wheel && \
-    python -m pip install -U pfb-clean@git+https://github.com/ratt-ru/pfb-clean@mappings && \
+    python -m pip install -U pfb-clean@git+https://github.com/ratt-ru/pfb-clean@main && \
    python -m pip install numpy==1.22 && \
    python -m pip cache purge
4 changes: 4 additions & 0 deletions README.rst
@@ -8,6 +8,10 @@ Install the package by cloning and running

Note casacore needs to be installed on the system for this to work.

You may also need to update pip eg.

:code:`$ pip install -U pip setuptools wheel`

It is strongly recommended to install ducc in no binary mode eg

:code:`$ git clone https://gitlab.mpcdf.mpg.de/mtr/ducc.git`
73 changes: 25 additions & 48 deletions pfb/__init__.py
@@ -12,54 +12,32 @@
def set_client(opts, stack, log, scheduler='distributed'):

    from omegaconf import open_dict
    # number of threads per worker
    if opts.nthreads is None:
        if opts.host_address is not None:
            raise ValueError("You have to specify nthreads when using a distributed scheduler")
        import multiprocessing
        nthreads = multiprocessing.cpu_count()
        with open_dict(opts):
            opts.nthreads = nthreads
    else:
        nthreads = int(opts.nthreads)

    # deprecated for now
    # # configure memory limit
    # if opts.mem_limit is None:
    #     if opts.host_address is not None:
    #         raise ValueError("You have to specify mem-limit when using a distributed scheduler")
    #     import psutil
    #     mem_limit = int(psutil.virtual_memory()[1]/1e9)  # all available memory by default
    #     with open_dict(opts):
    #         opts.mem_limit = mem_limit
    # else:
    #     mem_limit = int(opts.mem_limit)

    # the number of chunks being read in simultaneously is equal to
    # the number of dask threads
    nthreads_dask = opts.nworkers * opts.nthreads_per_worker

    # attempt somewhat intelligent default setup
    import multiprocessing
    nthreads_max = max(multiprocessing.cpu_count(), 1)
    if opts.nvthreads is None:
        # we allocate half by default
        nthreads_tot = max(nthreads_max//2, 1)
        if opts.scheduler in ['single-threaded', 'sync']:
            nvthreads = nthreads
        elif opts.host_address is not None:
            nvthreads = max(nthreads//opts.nthreads_per_worker, 1)
            with open_dict(opts):
                opts.nvthreads = nthreads_tot
        else:
            nvthreads = max(nthreads//nthreads_dask, 1)
        with open_dict(opts):
            opts.nvthreads = nvthreads
            ndask_chunks = opts.nthreads_dask*opts.nworkers
            nvthreads = max(nthreads_tot//ndask_chunks, 1)
            with open_dict(opts):
                opts.nvthreads = nvthreads

    os.environ["OMP_NUM_THREADS"] = str(opts.nvthreads)
    os.environ["OPENBLAS_NUM_THREADS"] = str(opts.nvthreads)
    os.environ["MKL_NUM_THREADS"] = str(opts.nvthreads)
    os.environ["VECLIB_MAXIMUM_THREADS"] = str(opts.nvthreads)
    os.environ["NUMBA_NUM_THREADS"] = str(opts.nthreads)
    # avoids numexpr error, probably don't want more than 10 vthreads for ne anyway
    import numexpr as ne
    max_cores = ne.detect_number_of_cores()
    ne_threads = min(max_cores, opts.nthreads)
    os.environ["NUMEXPR_NUM_THREADS"] = str(ne_threads)
    # ne_threads = min(max_cores, opts.nvthreads)
    os.environ["NUMEXPR_NUM_THREADS"] = str(max_cores)
    os.environ["NUMBA_NUM_THREADS"] = str(max_cores)

    import dask
    if scheduler=='distributed':
        # TODO - investigate what difference this makes
        # with dask.config.set({"distributed.scheduler.worker-saturation": 1.1}):
@@ -70,31 +48,30 @@ def set_client(opts, stack, log, scheduler='distributed'):
            print("Initialising distributed client.", file=log)
            client = stack.enter_context(Client(opts.host_address))
        else:
            if nthreads_dask * opts.nvthreads > opts.nthreads:
            if opts.nthreads_dask * opts.nvthreads > nthreads_max:
                print("Warning - you are attempting to use more threads than "
                      "available. This may lead to suboptimal performance.",
                      file=log)
            from dask.distributed import Client, LocalCluster
            print("Initialising client with LocalCluster.", file=log)
            cluster = LocalCluster(processes=True, n_workers=opts.nworkers,
                                   threads_per_worker=ops.nthreads_per_worker,
                                   memory_limit=0)  # str(mem_limit/nworkers)+'GB'
            cluster = stack.enter_context(cluster)
            client = stack.enter_context(Client(cluster))
            with dask.config.set({"distributed.scheduler.worker-saturation": 1.1}):
                cluster = LocalCluster(processes=True, n_workers=opts.nworkers,
                                       threads_per_worker=opts.nthreads_dask,
                                       memory_limit=0)  # str(mem_limit/nworkers)+'GB'
                cluster = stack.enter_context(cluster)
                client = stack.enter_context(Client(cluster))

        from pfb.scheduling import install_plugin
        from quartical.scheduling import install_plugin
        client.run_on_scheduler(install_plugin)
        client.wait_for_workers(opts.nworkers)
    elif scheduler in ['sync', 'single-threaded']:
        import dask
        dask.config.set(scheduler=scheduler)
        print(f"Initialising with synchronous scheduler",
              file=log)
    elif scheduler=='threads':
        import dask
        from multiprocessing.pool import ThreadPool
        dask.config.set(pool=ThreadPool(nthreads_dask))
        print(f"Initialising ThreadPool with {nthreads_dask} threads",
        dask.config.set(pool=ThreadPool(opts.nthreads_dask))
        print(f"Initialising ThreadPool with {opts.nthreads_dask} threads",
              file=log)
    else:
        raise ValueError(f"Unknown scheduler option {opts.scheduler}")
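To make the new default thread split in set_client above concrete, here is the arithmetic of the local-cluster branch as a worked example. The host size and the nworkers/nthreads_dask values are assumptions chosen purely for illustration; only the formulae mirror the diff.

    # Worked example of the default thread allocation in set_client above,
    # assuming a 16-core host with nworkers=2 and nthreads_dask=2
    # (illustrative values only).
    import multiprocessing

    nworkers, nthreads_dask = 2, 2

    nthreads_max = max(multiprocessing.cpu_count(), 1)  # e.g. 16
    nthreads_tot = max(nthreads_max // 2, 1)            # half allocated by default -> 8
    ndask_chunks = nthreads_dask * nworkers             # chunks processed concurrently -> 4
    nvthreads = max(nthreads_tot // ndask_chunks, 1)    # compute threads per chunk -> 2

    print(nthreads_max, nthreads_tot, ndask_chunks, nvthreads)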