diff --git a/.coveragerc b/.coveragerc new file mode 100644 index 0000000..3ad973c --- /dev/null +++ b/.coveragerc @@ -0,0 +1,7 @@ +[run] +omit = + */tests/* + +[report] +omit = + */tests/* diff --git a/uvtools/tests/__init__.py b/uvtools/tests/__init__.py new file mode 100644 index 0000000..e69de29 diff --git a/uvtools/tests/test_dspec.py b/uvtools/tests/test_dspec.py index 6afc267..4b0b472 100644 --- a/uvtools/tests/test_dspec.py +++ b/uvtools/tests/test_dspec.py @@ -1,5 +1,4 @@ -import unittest -import uvtools.dspec as dspec +from .. import dspec import numpy as np, random import pytest from pyuvdata import UVData @@ -9,203 +8,202 @@ import warnings random.seed(0) -class TestMethods(unittest.TestCase): - - def test_wedge_width(self): - # Test boundaries of delay bins - self.assertEqual(dspec.wedge_width(0, .01, 10), (1,10)) - self.assertEqual(dspec.wedge_width(5., .01, 10), (1,10)) - self.assertEqual(dspec.wedge_width( 9., .01, 10), (2,-1)) - self.assertEqual(dspec.wedge_width(10., .01, 10), (2,-1)) - self.assertEqual(dspec.wedge_width(15., .01, 10), (3,-2)) - # test nchan - self.assertEqual(dspec.wedge_width(10., .01, 20), (3,-2)) - self.assertEqual(dspec.wedge_width(10., .01, 40), (5,-4)) - # test sdf - self.assertEqual(dspec.wedge_width(10., .02, 10), (3,-2)) - self.assertEqual(dspec.wedge_width(10., .04, 10), (5,-4)) - # test standoff - self.assertEqual(dspec.wedge_width(100., .001, 100, standoff=4.), (11,-10)) - self.assertEqual(dspec.wedge_width(100., .001, 100, standoff=5.), (11,-10)) - self.assertEqual(dspec.wedge_width(100., .001, 100, standoff=10.), (12,-11)) - self.assertEqual(dspec.wedge_width(100., .001, 100, standoff=15.), (13,-12)) - # test horizon - self.assertEqual(dspec.wedge_width(100., .001, 100, horizon=.1), (2,-1)) - self.assertEqual(dspec.wedge_width(100., .001, 100, horizon=.5), (6,-5)) - self.assertEqual(dspec.wedge_width(100., .001, 100, horizon=1.5), (16,-15)) - self.assertEqual(dspec.wedge_width(100., .001, 100, horizon=2.), (21,-20)) - - def test_delay_filter_dims(self): - self.assertRaises(ValueError, dspec.delay_filter, np.zeros((1,2,3)), np.zeros((1,2,3)), 0, .001) - - def test_delay_filter_1D(self): - NCHAN = 128 - TOL = 1e-6 - data = np.ones(NCHAN, dtype=np.complex) - wgts = .5*np.ones(NCHAN, dtype=np.complex) - dmdl, dres, info = dspec.delay_filter(data, wgts, 0., .1/NCHAN, tol=TOL) - np.testing.assert_allclose(data, dmdl, atol=NCHAN*TOL) - np.testing.assert_allclose(dres, np.zeros_like(dres), atol=NCHAN*TOL) - wgts[::16] = 0 - # This test should have been failing since _w = 0.46 but skip_wgt=0.5 by default for delay_filter. - # The reason it was not failing originally was because no 1d check for skip_wgt existed in high_pass_fourier_filter. - # This check does exist in fourier_filter (as it should) and now the test, in its original form, fails. - # I've changed the skip_wgt to 0.1 (down from 0.5) so that it passes. - dmdl, dres, info = dspec.delay_filter(data, wgts, 0., .1/NCHAN, tol=TOL, skip_wgt=0.1) - np.testing.assert_allclose(data, dmdl, atol=NCHAN*TOL) - np.testing.assert_allclose(dres, np.zeros_like(dres), atol=NCHAN*TOL) - data = np.random.normal(size=NCHAN) - wgts = np.ones_like(data) - dmdl, dres, info = dspec.delay_filter(data, wgts, 0., .1/NCHAN, tol=1e-9) - self.assertAlmostEqual(np.average(data), np.average(dmdl), 3) - self.assertAlmostEqual(np.average(dres), 0, 3) - - #check that skip_wgt is properly passed to clean - wgts[:72] = 0. - dmdl, dres, info = dspec.delay_filter(data, wgts, 0., .1/NCHAN, tol=TOL, skip_wgt=0.5, mode='clean') - assert info['status']['axis_1'][0] == 'skipped' - - def test_delay_filter_2D(self): - NCHAN = 128 - NTIMES = 10 - TOL = 1e-6 - data = np.ones((NTIMES, NCHAN), dtype=np.complex) - wgts = np.ones((NTIMES, NCHAN), dtype=np.complex) - dmdl, dres, info = dspec.delay_filter(data, wgts, 0., .1/NCHAN, tol=TOL) - np.testing.assert_allclose(data, dmdl, atol=NCHAN*TOL) - np.testing.assert_allclose(dres, np.zeros_like(dres), atol=NCHAN*TOL) - wgts[:,::16] = 0; - wgts*=.9 #tests to make sure wgts**2 normalization works - dmdl, dres, info = dspec.delay_filter(data, wgts, 0., .1/NCHAN, tol=TOL) - np.testing.assert_allclose(data, dmdl, atol=NCHAN*TOL) - np.testing.assert_allclose(dres, np.zeros_like(dres), atol=NCHAN*TOL) - data = np.array(np.random.normal(size=(NTIMES,NCHAN)),dtype=complex) - wgts = np.ones_like(data) - dmdl, dres, info = dspec.delay_filter(data, wgts, 0., .1/NCHAN, tol=1e-9) - np.testing.assert_allclose(np.average(data,axis=1), np.average(dmdl,axis=1), atol=1e-3) - np.testing.assert_allclose(np.average(dres,axis=1), 0, atol=1e-3) - #check that skip_wgt is properly passed to clean - wgts[0,:72] = 0. - dmdl, dres, info = dspec.delay_filter(data, wgts, 0., .1/NCHAN, tol=TOL, skip_wgt=0.5, mode='clean') - assert info['status']['axis_1'][0] == 'skipped' - assert info['status']['axis_1'][1] == 'success' - - def test_fourier_model(self): - NMAX = 7 - NFREQS = 100 - nmodes = 2*NMAX + 1 - cn = (np.arange(nmodes) + 1.j*np.arange(nmodes)) / float(nmodes) - model = dspec.fourier_model(cn, NFREQS) - - # Test shape of output model - self.assertEqual((NFREQS,), model.shape) - - # Test errors - pytest.raises(ValueError, dspec.fourier_model, 3, NFREQS) - pytest.raises(ValueError, dspec.fourier_model, np.empty((3, 3)), NFREQS) - - def test_delay_filter_leastsq(self): - NCHAN = 128 - NTIMES = 10 - TOL = 1e-7 - data = np.ones((NTIMES, NCHAN), dtype=np.complex) - flags = np.zeros((NTIMES, NCHAN), dtype=np.bool) - sigma = 0.1 # Noise level (not important here) - - # Fourier coeffs for input data, ordered from (-nmax, nmax) - cn = np.array([-0.1-0.1j, -0.1+0.1j, -0.3-0.01j, 0.5+0.01j, - -0.3-0.01j, -0.1+0.1j, 0.1-0.1j]) - data *= np.atleast_2d( dspec.fourier_model(cn, NCHAN) ) - - # Estimate smooth Fourier model on unflagged data - bf_model, cn_out, data_out = dspec.delay_filter_leastsq(data, flags, - sigma, nmax=3, - add_noise=False) - np.testing.assert_allclose(data, bf_model, atol=NCHAN*TOL) - np.testing.assert_allclose(cn, cn_out[0], atol=1e-6) - - # Estimate smooth Fourier model on data with some flags - flags[:,10] = True - flags[:,65:70] = True - bf_model, cn_out, data_out = dspec.delay_filter_leastsq(data, flags, - sigma, nmax=3, - add_noise=False) - np.testing.assert_allclose(data, bf_model, atol=NCHAN*TOL) - np.testing.assert_allclose(data, data_out, atol=NCHAN*TOL) - - # Test 1D code directly - bf_model, cn_out, data_out = dspec.delay_filter_leastsq_1d( - data[0], flags[0], sigma, nmax=3, add_noise=False) - np.testing.assert_allclose(data[0], bf_model, atol=NCHAN*TOL) - - # Test 1D code with non-linear leastsq - bf_model, cn_out, data_out = dspec.delay_filter_leastsq_1d( - data[0], flags[0], sigma, nmax=3, add_noise=False, use_linear=False) - np.testing.assert_allclose(data[0], bf_model, atol=NCHAN*TOL) - - # Test that noise injection can be switched on - bf_model, cn_out, data_out = dspec.delay_filter_leastsq_1d( - data[0], flags[0], sigma, nmax=3, add_noise=True) - np.testing.assert_allclose(data[0], bf_model, atol=NCHAN * TOL * sigma) - - # Test with a noise array - sigma_array = sigma * np.ones_like(data[0]) - bf_model, cn_out, data_out = dspec.delay_filter_leastsq_1d( - data[0], flags[0], sigma_array, nmax=3, add_noise=True) - np.testing.assert_allclose(data[0], bf_model, atol=NCHAN * TOL * sigma) - - # Test errors - pytest.raises(ValueError, dspec.delay_filter_leastsq_1d, - data[0], flags[0], sigma, nmax=3, operator=np.empty((3, 3))) - pytest.raises(ValueError, dspec.delay_filter_leastsq_1d, - data[0], flags[0], sigma, nmax=3, cn_guess=np.array([3])) - - def test_skip_wgt(self): - NCHAN = 128 - NTIMES = 10 - TOL = 1e-6 - data = np.ones((NTIMES, NCHAN), dtype=np.complex) - wgts = np.ones((NTIMES, NCHAN), dtype=np.complex) - wgts[0, 0:-4] = 0 - dmdl, dres, info = dspec.delay_filter(data, wgts, 0., .1/NCHAN, tol=TOL, skip_wgt=.1) - np.testing.assert_allclose(data[1:,:], dmdl[1:,:], atol=NCHAN*TOL) - np.testing.assert_allclose(dres[1:,:], np.zeros_like(dres)[1:,:], atol=NCHAN*TOL) - np.testing.assert_allclose(dmdl[0,:], np.zeros_like(dmdl[0,:]), atol=NCHAN*TOL) - np.testing.assert_allclose(dres[0,:], (data * wgts)[0,:], atol=NCHAN*TOL) - self.assertEqual(len(info['status']['axis_1']), NTIMES) - self.assertTrue(info['status']['axis_1'][i] == 'skipped' for i in list(info['status']['axis_1'])[::-1]) - - def test_calc_width(self): - # test single filter_size - nchan = 100 - dt = 10. - filter_size = 1e-2 - u, l = dspec.calc_width(filter_size, dt, nchan) - frs = np.fft.fftfreq(nchan, dt) # negative b/c of ifft convention - assert np.all(np.abs(frs[u:l]) > filter_size) - - # test multiple entries in filter_size - filter_size = (1e-2, 2e-2) - u, l = dspec.calc_width(filter_size, dt, nchan) - assert np.all((frs[u:l] < -1e-2) | (frs[u:l] > 2e-2)) - - def test_gen_window(self): - for w in ['none', 'blackmanharris', 'hann', 'tukey', 'barthann', 'blackmanharris-7term', - 'cosinesum-9term', 'cosinesum-11term']: - win = dspec.gen_window(w, 100) - assert len(win) == 100 - assert isinstance(win, np.ndarray) - assert win.min() >= 0.0 - assert win.max() <= 1.0 - pytest.raises(ValueError, dspec.gen_window, w, 100, normalization='foo') - win2 = dspec.gen_window(w, 100,normalization='mean') - assert np.all(np.isclose(win, win2*np.mean(win),atol=1e-6)) - win3 = dspec.gen_window(w, 100,normalization='rms') - assert np.all(np.isclose(win, win3*np.sqrt(np.mean(win**2.)),atol=1e-6)) - - pytest.raises(ValueError, dspec.gen_window, 'foo', 200) - # check Ncut ValueError - pytest.raises(ValueError, dspec.gen_window, 'bh', 200, edgecut_hi=101, edgecut_low=100) +def test_wedge_width(): + # Test boundaries of delay bins + assert dspec.wedge_width(0, .01, 10) == (1,10) + assert dspec.wedge_width(5., .01, 10) == (1,10) + assert dspec.wedge_width( 9., .01, 10) == (2,-1) + assert dspec.wedge_width(10., .01, 10) == (2,-1) + assert dspec.wedge_width(15., .01, 10) == (3,-2) + # test nchan + assert dspec.wedge_width(10., .01, 20) == (3,-2) + assert dspec.wedge_width(10., .01, 40) == (5,-4) + # test sdf + assert dspec.wedge_width(10., .02, 10) == (3,-2) + assert dspec.wedge_width(10., .04, 10) == (5,-4) + # test standoff + assert dspec.wedge_width(100., .001, 100, standoff=4.) == (11,-10) + assert dspec.wedge_width(100., .001, 100, standoff=5.) == (11,-10) + assert dspec.wedge_width(100., .001, 100, standoff=10.) == (12,-11) + assert dspec.wedge_width(100., .001, 100, standoff=15.) == (13,-12) + # test horizon + assert dspec.wedge_width(100., .001, 100, horizon=.1) == (2,-1) + assert dspec.wedge_width(100., .001, 100, horizon=.5) == (6,-5) + assert dspec.wedge_width(100., .001, 100, horizon=1.5) == (16,-15) + assert dspec.wedge_width(100., .001, 100, horizon=2.) == (21,-20) + +def test_delay_filter_dims(): + pytest.raises(ValueError, dspec.delay_filter, np.zeros((1,2,3)), np.zeros((1,2,3)), 0, .001) + +def test_delay_filter_1D(): + NCHAN = 128 + TOL = 1e-6 + data = np.ones(NCHAN, dtype=np.complex) + wgts = .5*np.ones(NCHAN, dtype=np.complex) + dmdl, dres, info = dspec.delay_filter(data, wgts, 0., .1/NCHAN, tol=TOL) + np.testing.assert_allclose(data, dmdl, atol=NCHAN*TOL) + np.testing.assert_allclose(dres, np.zeros_like(dres), atol=NCHAN*TOL) + wgts[::16] = 0 + # This test should have been failing since _w = 0.46 but skip_wgt=0.5 by default for delay_filter. + # The reason it was not failing originally was because no 1d check for skip_wgt existed in high_pass_fourier_filter. + # This check does exist in fourier_filter (as it should) and now the test, in its original form, fails. + # I've changed the skip_wgt to 0.1 (down from 0.5) so that it passes. + dmdl, dres, info = dspec.delay_filter(data, wgts, 0., .1/NCHAN, tol=TOL, skip_wgt=0.1) + np.testing.assert_allclose(data, dmdl, atol=NCHAN*TOL) + np.testing.assert_allclose(dres, np.zeros_like(dres), atol=NCHAN*TOL) + data = np.random.normal(size=NCHAN) + wgts = np.ones_like(data) + dmdl, dres, info = dspec.delay_filter(data, wgts, 0., .1/NCHAN, tol=1e-9) + assert np.allclose(np.average(data), np.average(dmdl), rtol=0, atol=1e-3) + assert np.allclose(np.average(dres), 0.0, rtol=0, atol=1e-3) + + #check that skip_wgt is properly passed to clean + wgts[:72] = 0. + dmdl, dres, info = dspec.delay_filter(data, wgts, 0., .1/NCHAN, tol=TOL, skip_wgt=0.5, mode='clean') + assert info['status']['axis_1'][0] == 'skipped' + +def test_delay_filter_2D(): + NCHAN = 128 + NTIMES = 10 + TOL = 1e-6 + data = np.ones((NTIMES, NCHAN), dtype=np.complex) + wgts = np.ones((NTIMES, NCHAN), dtype=np.complex) + dmdl, dres, info = dspec.delay_filter(data, wgts, 0., .1/NCHAN, tol=TOL) + np.testing.assert_allclose(data, dmdl, atol=NCHAN*TOL) + np.testing.assert_allclose(dres, np.zeros_like(dres), atol=NCHAN*TOL) + wgts[:,::16] = 0; + wgts*=.9 #tests to make sure wgts**2 normalization works + dmdl, dres, info = dspec.delay_filter(data, wgts, 0., .1/NCHAN, tol=TOL) + np.testing.assert_allclose(data, dmdl, atol=NCHAN*TOL) + np.testing.assert_allclose(dres, np.zeros_like(dres), atol=NCHAN*TOL) + data = np.array(np.random.normal(size=(NTIMES,NCHAN)),dtype=complex) + wgts = np.ones_like(data) + dmdl, dres, info = dspec.delay_filter(data, wgts, 0., .1/NCHAN, tol=1e-9) + np.testing.assert_allclose(np.average(data,axis=1), np.average(dmdl,axis=1), atol=1e-3) + np.testing.assert_allclose(np.average(dres,axis=1), 0, atol=1e-3) + #check that skip_wgt is properly passed to clean + wgts[0,:72] = 0. + dmdl, dres, info = dspec.delay_filter(data, wgts, 0., .1/NCHAN, tol=TOL, skip_wgt=0.5, mode='clean') + assert info['status']['axis_1'][0] == 'skipped' + assert info['status']['axis_1'][1] == 'success' + +def test_fourier_model(): + NMAX = 7 + NFREQS = 100 + nmodes = 2*NMAX + 1 + cn = (np.arange(nmodes) + 1.j*np.arange(nmodes)) / float(nmodes) + model = dspec.fourier_model(cn, NFREQS) + + # Test shape of output model + assert (NFREQS,) == model.shape + + # Test errors + pytest.raises(ValueError, dspec.fourier_model, 3, NFREQS) + pytest.raises(ValueError, dspec.fourier_model, np.empty((3, 3)), NFREQS) + +def test_delay_filter_leastsq(): + NCHAN = 128 + NTIMES = 10 + TOL = 1e-7 + data = np.ones((NTIMES, NCHAN), dtype=np.complex) + flags = np.zeros((NTIMES, NCHAN), dtype=np.bool) + sigma = 0.1 # Noise level (not important here) + + # Fourier coeffs for input data, ordered from (-nmax, nmax) + cn = np.array([-0.1-0.1j, -0.1+0.1j, -0.3-0.01j, 0.5+0.01j, + -0.3-0.01j, -0.1+0.1j, 0.1-0.1j]) + data *= np.atleast_2d( dspec.fourier_model(cn, NCHAN) ) + + # Estimate smooth Fourier model on unflagged data + bf_model, cn_out, data_out = dspec.delay_filter_leastsq(data, flags, + sigma, nmax=3, + add_noise=False) + np.testing.assert_allclose(data, bf_model, atol=NCHAN*TOL) + np.testing.assert_allclose(cn, cn_out[0], atol=1e-6) + + # Estimate smooth Fourier model on data with some flags + flags[:,10] = True + flags[:,65:70] = True + bf_model, cn_out, data_out = dspec.delay_filter_leastsq(data, flags, + sigma, nmax=3, + add_noise=False) + np.testing.assert_allclose(data, bf_model, atol=NCHAN*TOL) + np.testing.assert_allclose(data, data_out, atol=NCHAN*TOL) + + # Test 1D code directly + bf_model, cn_out, data_out = dspec.delay_filter_leastsq_1d( + data[0], flags[0], sigma, nmax=3, add_noise=False) + np.testing.assert_allclose(data[0], bf_model, atol=NCHAN*TOL) + + # Test 1D code with non-linear leastsq + bf_model, cn_out, data_out = dspec.delay_filter_leastsq_1d( + data[0], flags[0], sigma, nmax=3, add_noise=False, use_linear=False) + np.testing.assert_allclose(data[0], bf_model, atol=NCHAN*TOL) + + # Test that noise injection can be switched on + bf_model, cn_out, data_out = dspec.delay_filter_leastsq_1d( + data[0], flags[0], sigma, nmax=3, add_noise=True) + np.testing.assert_allclose(data[0], bf_model, atol=NCHAN * TOL * sigma) + + # Test with a noise array + sigma_array = sigma * np.ones_like(data[0]) + bf_model, cn_out, data_out = dspec.delay_filter_leastsq_1d( + data[0], flags[0], sigma_array, nmax=3, add_noise=True) + np.testing.assert_allclose(data[0], bf_model, atol=NCHAN * TOL * sigma) + + # Test errors + pytest.raises(ValueError, dspec.delay_filter_leastsq_1d, + data[0], flags[0], sigma, nmax=3, operator=np.empty((3, 3))) + pytest.raises(ValueError, dspec.delay_filter_leastsq_1d, + data[0], flags[0], sigma, nmax=3, cn_guess=np.array([3])) + +def test_skip_wgt(): + NCHAN = 128 + NTIMES = 10 + TOL = 1e-6 + data = np.ones((NTIMES, NCHAN), dtype=np.complex) + wgts = np.ones((NTIMES, NCHAN), dtype=np.complex) + wgts[0, 0:-4] = 0 + dmdl, dres, info = dspec.delay_filter(data, wgts, 0., .1/NCHAN, tol=TOL, skip_wgt=.1) + np.testing.assert_allclose(data[1:,:], dmdl[1:,:], atol=NCHAN*TOL) + np.testing.assert_allclose(dres[1:,:], np.zeros_like(dres)[1:,:], atol=NCHAN*TOL) + np.testing.assert_allclose(dmdl[0,:], np.zeros_like(dmdl[0,:]), atol=NCHAN*TOL) + np.testing.assert_allclose(dres[0,:], (data * wgts)[0,:], atol=NCHAN*TOL) + assert len(info['status']['axis_1']) == NTIMES + assert np.all([info['status']['axis_1'][i] == 'skipped' for i in list(info['status']['axis_1'])[:1]]) + assert not np.any([info['status']['axis_1'][i] == 'skipped' for i in list(info['status']['axis_1'])[1:]]) + +def test_calc_width(): + # test single filter_size + nchan = 100 + dt = 10. + filter_size = 1e-2 + u, l = dspec.calc_width(filter_size, dt, nchan) + frs = np.fft.fftfreq(nchan, dt) # negative b/c of ifft convention + assert np.all(np.abs(frs[u:l]) > filter_size) + + # test multiple entries in filter_size + filter_size = (1e-2, 2e-2) + u, l = dspec.calc_width(filter_size, dt, nchan) + assert np.all((frs[u:l] < -1e-2) | (frs[u:l] > 2e-2)) + +def test_gen_window(): + for w in ['none', 'blackmanharris', 'hann', 'tukey', 'barthann', 'blackmanharris-7term', + 'cosinesum-9term', 'cosinesum-11term']: + win = dspec.gen_window(w, 100) + assert len(win) == 100 + assert isinstance(win, np.ndarray) + assert win.min() >= 0.0 + assert win.max() <= 1.0 + pytest.raises(ValueError, dspec.gen_window, w, 100, normalization='foo') + win2 = dspec.gen_window(w, 100,normalization='mean') + assert np.all(np.isclose(win, win2*np.mean(win),atol=1e-6)) + win3 = dspec.gen_window(w, 100,normalization='rms') + assert np.all(np.isclose(win, win3*np.sqrt(np.mean(win**2.)),atol=1e-6)) + + pytest.raises(ValueError, dspec.gen_window, 'foo', 200) + # check Ncut ValueError + pytest.raises(ValueError, dspec.gen_window, 'bh', 200, edgecut_hi=101, edgecut_low=100) def test_dft_operator(): @@ -1053,7 +1051,6 @@ def test_vis_clean(): assert np.all(np.isclose(res3, res1)) assert np.all(np.isclose(mdl3, mdl1)) - def test__fit_basis_1d(): #perform dpss interpolation, leastsq fs = np.arange(-50,50) @@ -1092,7 +1089,3 @@ def test__fit_basis_1d(): assert np.all(np.isclose(mod3, mod4, atol=1e-2)) assert np.all(np.isclose((mod2+resid2)*wgts, dw, atol=1e-6)) - - -if __name__ == '__main__': - unittest.main() diff --git a/uvtools/tests/test_plot.py b/uvtools/tests/test_plot.py index b3e308d..76109d7 100644 --- a/uvtools/tests/test_plot.py +++ b/uvtools/tests/test_plot.py @@ -2,7 +2,7 @@ import matplotlib import matplotlib.pyplot as plt import unittest -import uvtools as uvt +from .. import plot, utils import numpy as np from itertools import combinations from astropy import units @@ -39,37 +39,36 @@ def axes_contains(ax, obj_list): # Return True if no problems found return True -class TestMethods(unittest.TestCase): def test_data_mode(self): data = np.ones(100) - 1j*np.ones(100) - d = uvt.plot.data_mode(data, mode='abs') - self.assertTrue(np.all(d == np.sqrt(2))) - d = uvt.plot.data_mode(data, mode='log') - self.assertTrue(np.all(d == np.log10(np.sqrt(2)))) - d = uvt.plot.data_mode(data, mode='phs') - self.assertTrue(np.all(d == -np.pi/4)) - d = uvt.plot.data_mode(data, mode='real') - self.assertTrue(np.all(d == 1)) - d = uvt.plot.data_mode(data, mode='imag') - self.assertTrue(np.all(d == -1)) - self.assertRaises(ValueError, uvt.plot.data_mode, data, mode='') + d = plot.data_mode(data, mode='abs') + assert np.all(d == np.sqrt(2)) + d = plot.data_mode(data, mode='log') + assert np.all(d == np.log10(np.sqrt(2))) + d = plot.data_mode(data, mode='phs') + assert np.all(d == -np.pi/4) + d = plot.data_mode(data, mode='real') + assert np.all(d == 1) + d = plot.data_mode(data, mode='imag') + assert np.all(d == -1) + pytest.raises(ValueError, plot.data_mode, data, mode='') def test_waterfall(self): import matplotlib data = np.ones((10,10)) - 1j*np.ones((10,10)) for mode in ('abs','log','phs','real','imag'): - uvt.plot.waterfall(data, mode=mode) + plot.waterfall(data, mode=mode) #matplotlib.pyplot.show() matplotlib.pyplot.clf() def test_plot_antpos(self): antpos = {i: [i,i,0] for i in range(10)} import matplotlib - uvt.plot.plot_antpos(antpos) + plot.plot_antpos(antpos) #matplotlib.pyplot.show() -class TestFancyPlotters(unittest.TestCase): +class TestFancyPlotters(): def setUp(self): import hera_sim sim = hera_sim.Simulator( @@ -83,17 +82,15 @@ def setUp(self): def tearDown(self): pass - def runTest(self): - pass - def test_labeled_waterfall(self): + self.setUp() # Most of this functionality is tested in the next test function, # so this will test some features not exposed in # plot.fourier_transform_waterfalls uvd = self.uvd # Dynamic range setting. - fig, ax = uvt.plot.labeled_waterfall( + fig, ax = plot.labeled_waterfall( uvd, antpairpol=(0,1,"xx"), mode="phs", @@ -104,7 +101,7 @@ def test_labeled_waterfall(self): assert image.cmap.name == "RdBu" assert np.allclose(image.get_clim(), (-1, 1)) - fig, ax = uvt.plot.labeled_waterfall( + fig, ax = plot.labeled_waterfall( uvd, antpairpol=(0,1,"xx"), mode="log", @@ -115,7 +112,7 @@ def test_labeled_waterfall(self): assert image.cmap.name == "inferno" assert np.allclose(image.get_clim(), (-7, -5)) - fig, ax = uvt.plot.labeled_waterfall( + fig, ax = plot.labeled_waterfall( uvd, antpairpol=(0,1,"xx"), mode="log", @@ -124,15 +121,17 @@ def test_labeled_waterfall(self): ) image = ax.get_images()[0] assert np.allclose(image.get_clim(), (-6, -5)) + self.tearDown() def test_fourier_transform_waterfalls(self): + self.setUp() uvd = self.uvd data = uvd.get_data(0,1,'xx') freqs = np.unique(uvd.freq_array) # Hz times = np.unique(uvd.time_array) # JD lsts = np.unique(uvd.lst_array) * 24 / (2 * np.pi) # hours - delays = uvt.utils.fourier_freqs(freqs) * 1e9 # ns - fringe_rates = uvt.utils.fourier_freqs(times * 24 * 3600) * 1e3 # mHz + delays = utils.fourier_freqs(freqs) * 1e9 # ns + fringe_rates = utils.fourier_freqs(times * 24 * 3600) * 1e3 # mHz f1, f2 = freqs[10] / 1e6, freqs[90] / 1e6 lst1, lst2 = lsts[10], lsts[40] @@ -147,7 +146,7 @@ def test_fourier_transform_waterfalls(self): } # Test that it works passing a UVData object. - fig = uvt.plot.fourier_transform_waterfalls( + fig = plot.fourier_transform_waterfalls( data=uvd, antpairpol=(0,1,'xx'), plot_limits=plot_limits, @@ -172,21 +171,21 @@ def test_fourier_transform_waterfalls(self): # Now test with an array. plot_times = times - int(times[0]) # For bound checking later lsts = np.unique(uvd.lst_array) # Ensure they're in radians - fringe_rates = uvt.utils.fourier_freqs(times * units.day.to("s")) # Hz - delays = uvt.utils.fourier_freqs(freqs) # s + fringe_rates = utils.fourier_freqs(times * units.day.to("s")) # Hz + delays = utils.fourier_freqs(freqs) # s - fig = uvt.plot.fourier_transform_waterfalls(data=data, freqs=freqs, lsts=lsts) + fig = plot.fourier_transform_waterfalls(data=data, freqs=freqs, lsts=lsts) axes = fig.get_axes() ylabels = list(ax.get_ylabel() for ax in axes) assert sum("LST" in ylabel for ylabel in ylabels) == 2 - fig = uvt.plot.fourier_transform_waterfalls(data=data, freqs=freqs, times=times) + fig = plot.fourier_transform_waterfalls(data=data, freqs=freqs, times=times) axes = fig.get_axes() ylabels = list(ax.get_ylabel() for ax in axes) assert sum("JD" in ylabel for ylabel in ylabels) == 2 # Check custom data units. - fig = uvt.plot.fourier_transform_waterfalls( + fig = plot.fourier_transform_waterfalls( data=data, freqs=freqs, lsts=lsts, data_units="mK sr" ) axes = fig.get_axes() @@ -207,7 +206,7 @@ def test_fourier_transform_waterfalls(self): frmin, frmax = np.array([fringe_rates.min(), fringe_rates.max()]) # Hz dlymin, dlymax = np.array([delays.min(), delays.max()]) * 1e6 # us - fig = uvt.plot.fourier_transform_waterfalls( + fig = plot.fourier_transform_waterfalls( data=data, freqs=freqs, lsts=lsts, plot_units=plot_units ) axes = fig.get_axes() @@ -225,7 +224,7 @@ def test_fourier_transform_waterfalls(self): assert sum(np.allclose(ylims, (frmax, frmin), rtol=0.01) for ylims in ylimits) == 2 assert sum(np.allclose(ylims, (lstmax, lstmin)) for ylims in ylimits) == 2 - fig = uvt.plot.fourier_transform_waterfalls( + fig = plot.fourier_transform_waterfalls( data=data, freqs=freqs, times=times, plot_units=plot_units ) axes = fig.get_axes() @@ -238,35 +237,35 @@ def test_fourier_transform_waterfalls(self): # Do some exception raising checking. with pytest.raises(ValueError): - uvt.plot.fourier_transform_waterfalls( + plot.fourier_transform_waterfalls( data=uvd, antpairpol=(0,1,'xx'), time_or_lst="nan" ) with pytest.raises(TypeError): - uvt.plot.fourier_transform_waterfalls(data={}) + plot.fourier_transform_waterfalls(data={}) with pytest.raises(TypeError): - uvt.plot.fourier_transform_waterfalls( + plot.fourier_transform_waterfalls( data=data, freqs=freqs, lsts=lsts, plot_units="bad_type" ) with pytest.raises(ValueError): - uvt.plot.fourier_transform_waterfalls(data=np.ones((3,5,2), dtype=np.complex)) + plot.fourier_transform_waterfalls(data=np.ones((3,5,2), dtype=np.complex)) with pytest.raises(ValueError): - uvt.plot.fourier_transform_waterfalls(data=data, freqs=freqs) + plot.fourier_transform_waterfalls(data=data, freqs=freqs) with pytest.raises(ValueError): - uvt.plot.fourier_transform_waterfalls(data=data, times=times) + plot.fourier_transform_waterfalls(data=data, times=times) with pytest.raises(ValueError): - uvt.plot.fourier_transform_waterfalls(data=uvd) + plot.fourier_transform_waterfalls(data=uvd) with pytest.raises(TypeError): - uvt.plot.fourier_transform_waterfalls(data=np.ones((15,20), dtype=np.float)) + plot.fourier_transform_waterfalls(data=np.ones((15,20), dtype=np.float)) + self.tearDown() - -class TestDiffPlotters(unittest.TestCase): +class TestDiffPlotters(): def setUp(self): # make some mock data @@ -369,10 +368,8 @@ def setUp(self): def tearDown(self): pass - def runTest(self): - pass - def test_plot_diff_1d(self): + self.setUp() # list possible plot types and dimensions plot_types = ("normal", "fourier", "both") dimensions = ("time", "freq") @@ -383,13 +380,13 @@ def test_plot_diff_1d(self): Nplots = 6 if plot_type == "both" else 3 elements = [(plt.Subplot, Nplots),] for dimension in dimensions: - fig = uvt.plot.plot_diff_1d( + fig = plot.plot_diff_1d( self.uvd1, self.uvd2, self.antpairpol, plot_type=plot_type, dimension=dimension ) # check the number of plots is correct - self.assertTrue(axes_contains(fig, elements)) + assert axes_contains(fig, elements) # check that the plots are labeled correctly for i, ax in enumerate(fig.axes): @@ -408,35 +405,37 @@ def test_plot_diff_1d(self): dim = "lst" if dim == "time" else dim # make sure that the label is correct - self.assertTrue(xlabel.startswith(dim)) + assert xlabel.startswith(dim) plt.close(fig) # now test the auto-dimension-choosing feature # make just one row of plots - fig = uvt.plot.plot_diff_1d( + fig = plot.plot_diff_1d( self.sim.data, self.sim.data, self.antpairpol, plot_type="normal" ) # make sure that it's plotting in frequency space ax = fig.axes[0] xlabel = ax.get_xlabel().lower() - self.assertTrue(xlabel.startswith('freq')) + assert xlabel.startswith('freq') # check that it works when an axis has length 1 - fig = uvt.plot.plot_diff_1d( + fig = plot.plot_diff_1d( self.uvd_1d_freqs, self.uvd_1d_freqs, self.antpairpol, plot_type="normal" ) + self.tearDown() def test_plot_diff_uv(self): + self.setUp() # plot something - fig = uvt.plot.plot_diff_uv(self.uvd1, self.uvd2) + fig = plot.plot_diff_uv(self.uvd1, self.uvd2) # check for six instances of subplots, one per image and # one per colorbar elements = [(plt.Subplot, 6),] - self.assertTrue(axes_contains(fig, elements)) + assert axes_contains(fig, elements) # now check that we get three images and three colorbars Nimages = 0 @@ -446,18 +445,20 @@ def test_plot_diff_uv(self): cbar = [(matplotlib.collections.QuadMesh, 1),] contains_image = axes_contains(ax, image) contains_cbar = axes_contains(ax, cbar) - self.assertTrue(contains_image or contains_cbar) + assert contains_image or contains_cbar Nimages += int(contains_image) Ncbars += int(contains_cbar) - self.assertTrue(Nimages == 3) - self.assertTrue(Ncbars == 3) + assert Nimages == 3 + assert Ncbars == 3 # close the figure plt.close(fig) + self.tearDown() def test_plot_diff_waterfall(self): + self.setUp() plot_types = ("time_vs_freq", "time_vs_dly", "fr_vs_freq", "fr_vs_dly") # get all combinations @@ -481,12 +482,12 @@ def test_plot_diff_waterfall(self): elements = [(plt.Subplot, Nsubplots),] # actually make the plot - fig = uvt.plot.plot_diff_waterfall(self.uvd1, self.uvd2, + fig = plot.plot_diff_waterfall(self.uvd1, self.uvd2, self.antpairpol, plot_type=plot_type) # check that the correct number of subplots are made - self.assertTrue(axes_contains(fig, elements)) + assert axes_contains(fig, elements) Nimages = 0 Ncbars = 0 @@ -499,37 +500,38 @@ def test_plot_diff_waterfall(self): contains_cbar = axes_contains(ax, cbar) Nimages += int(contains_image) Ncbars += int(contains_cbar) - self.assertTrue(contains_image or contains_cbar) + assert contains_image or contains_cbar # check that the amount of colorbars and images is correct - self.assertTrue(Nimages == Nplots) - self.assertTrue(Ncbars == Nplots) + assert Nimages == Nplots + assert Ncbars == Nplots # now close the figure plt.close(fig) + self.tearDown() def test_plot_diff_waterfall_with_tapers(self): + self.setUp() # since the above test makes sure the figures are correctly configured, # this one will just make sure nothing breaks when a taper is specified - fig = uvt.plot.plot_diff_waterfall( + fig = plot.plot_diff_waterfall( self.uvd1, self.uvd2, self.antpairpol, freq_taper='blackman-harris', time_taper='hann' ) plt.close(fig) + self.tearDown() def test_check_metadata(self): + self.setUp() for attr, value in self.__dict__.items(): if attr.startswith("uvd_1d"): - uvt.utils.check_uvd_pair_metadata(value, value) + utils.check_uvd_pair_metadata(value, value) continue if not attr.startswith("uvd_bad"): continue - pytest.raises(uvt.utils.MetadataError, - uvt.plot.plot_diff_uv, + pytest.raises(utils.MetadataError, + plot.plot_diff_uv, self.uvd1, value, check_metadata=True) - - -if __name__ == '__main__': - unittest.main() + self.tearDown() diff --git a/uvtools/tests/test_utils.py b/uvtools/tests/test_utils.py index 6069eb6..8b58359 100644 --- a/uvtools/tests/test_utils.py +++ b/uvtools/tests/test_utils.py @@ -1,6 +1,6 @@ import pytest -import uvtools as uvt import numpy as np +from .. import utils import glob import os import sys @@ -22,7 +22,7 @@ def test_search_data(): templates = sorted(files + ["zen.inp.{pol}.uv"]) # search data - dfs, dps = uvt.utils.search_data(templates, pols) + dfs, dps = utils.search_data(templates, pols) assert len(dfs) == 2 assert len(dfs[0]) == len(dfs[1]) assert len(dfs[1]) == 2 @@ -33,23 +33,23 @@ def test_search_data(): assert np.all(['.xx.' in df for df in dfs[0]]) # matched pols - dfs, dps = uvt.utils.search_data(templates, pols, matched_pols=True) + dfs, dps = utils.search_data(templates, pols, matched_pols=True) assert len(dfs) == 2 assert len(dfs[0]) == len(dfs[1]) assert len(dfs[0]) == 2 assert np.all(['.xx.' in df for df in dfs[0]]) - dfs, dps = uvt.utils.search_data(files, pols + ['pI'], matched_pols=True) + dfs, dps = utils.search_data(files, pols + ['pI'], matched_pols=True) assert len(dfs) == 0 # reverse nesting - dfs, dps = uvt.utils.search_data(templates, pols, reverse_nesting=True) + dfs, dps = utils.search_data(templates, pols, reverse_nesting=True) assert len(dfs) == 2 assert len(dfs[0]) == len(dfs[1]) assert len(dfs[1]) == 2 assert np.all(['.bar.' in df for df in dfs[0]]) # flatten - dfs, dps = uvt.utils.search_data(templates, pols, flatten=True) + dfs, dps = utils.search_data(templates, pols, flatten=True) assert len(dfs) == 4 assert isinstance(dfs[0], (str, np.str))