From eb4a81ddb1d7f644660857107daf8d28200fab35 Mon Sep 17 00:00:00 2001
From: Joe Ranalli
Date: Thu, 3 Oct 2024 16:31:51 -0400
Subject: [PATCH] Initializing tests for cloudfield, and cleaning up. Still
 need tests for scale_field_lave, and then to define what to do with the
 data.

---
 src/solarspatialtools/synthirrad/__init__.py  |   0
 .../{ => synthirrad}/cloudfield.py            | 250 ++++++++++--------
 tests/synthirrad/test_cloudfield.py           | 215 +++++++++++++++
 3 files changed, 360 insertions(+), 105 deletions(-)
 create mode 100644 src/solarspatialtools/synthirrad/__init__.py
 rename src/solarspatialtools/{ => synthirrad}/cloudfield.py (65%)
 create mode 100644 tests/synthirrad/test_cloudfield.py

diff --git a/src/solarspatialtools/synthirrad/__init__.py b/src/solarspatialtools/synthirrad/__init__.py
new file mode 100644
index 0000000..e69de29
diff --git a/src/solarspatialtools/cloudfield.py b/src/solarspatialtools/synthirrad/cloudfield.py
similarity index 65%
rename from src/solarspatialtools/cloudfield.py
rename to src/solarspatialtools/synthirrad/cloudfield.py
index b323eff..495416f 100644
--- a/src/solarspatialtools/cloudfield.py
+++ b/src/solarspatialtools/synthirrad/cloudfield.py
@@ -10,20 +10,23 @@ def _random_at_scale(rand_size, final_size, plot=False):
     """
-    Generate a 2D array of random. Generate initially at the size of rand_size
-    and then linearly interpolate to the size of final_size.
+    Generate a 2D array of random values at an initial size and linearly interpolate up to a larger final size.
 
     Parameters
     ----------
     rand_size : tuple
-        The size of the random array to generate (rows, cols)
+        The size of the random field (rows, cols)
     final_size : tuple
-        The size of the final array to return (rows, cols)
+        The size of the final interpolated field (rows, cols)
+    plot : bool
+        Whether to plot the results
 
     Returns
    -------
-    np.ndarray
-        A 2D array of random values
+    random : np.ndarray
+        The original random field
+    random_new : np.ndarray
+        The interpolated random field
     """
 
     # Generate random values at the scale of rand_size
@@ -36,7 +39,7 @@ def _random_at_scale(rand_size, final_size, plot=False):
     xnew = np.linspace(0, 1, final_size[0])
     ynew = np.linspace(0, 1, final_size[1])
 
-    # # New Scipy Method
+    # # Latest Recommended Scipy Method
     interp_f = RegularGridInterpolator((x, y), random, method='linear')
     Xnew, Ynew = np.meshgrid(xnew, ynew, indexing='ij')
     random_new = interp_f((Xnew, Ynew))
@@ -66,19 +69,24 @@ def _random_at_scale(rand_size, final_size, plot=False):
 
 def _calc_vs_weights(scales, vs):
     """
-    Calculate the weight for each scale
+    Calculate the weights for each scale based on the variability score
 
     Parameters
     ----------
-    scales : int
-        The space scale
+    scales : np.ndarray
+        The scales of the wavelets
     vs : float
         The variability score
 
     Returns
     -------
-
+    weights : np.ndarray
+        The weights for each scale
     """
+
+    if vs < 0:
+        raise ValueError("VS must be non-negative.")
+
     VS1 = -np.log(1-vs/180)/0.6
     weight = scales ** (1/VS1)
     return weight / np.sum(weight)
@@ -86,16 +94,18 @@ def _calc_vs_weights(scales, vs):
 
 def _calc_wavelet_weights(waves):
     """
-    Calculate the weights for each wavelet
+    Calculate the weights for each wavelet scale based on the wavelet values.
 
     Parameters
     ----------
     waves : np.ndarray
-        The wavelet coefficients
+        The wavelet values. Should be the timeseries for each mode. Consider
+        generating from `pvlib.scaling._compute_wavelet`.
 
    Returns
     -------
-
+    weights : np.ndarray
+        The weights for each wavelet scale
     """
     scales = np.nanmean(waves**2, axis=1)
     return scales / np.sum(scales)
@@ -131,25 +141,49 @@ def space_to_time(pixres=1, cloud_speed=50):
 
     # max time = max space / velocity
 
 
-def stacked_field(vs, size, weights=None, scales=(1, 2, 3, 4, 5, 6, 7), plot=False):
+def _stack_random_field(weights, scales, size, normalize=False, plot=False):
+    """
+    Generate a field of random clouds by stacking weighted random fields at multiple scales, similar to Perlin noise.
+    This runs relatively slowly for large arrays.
 
-    field = np.zeros(size, dtype=float)
+    Parameters
+    ----------
+    weights : np.ndarray
+        The weights for each scale. See _calc_wavelet_weights or _calc_vs_weights.
+        Should add to unity.
+    scales : np.ndarray
+        The scales for each weight. Should be the same length as weights.
+    size : tuple
+        The size of the field
+    normalize : bool
+        Whether to normalize the field to 0-1
+    plot : bool
+        Whether to plot the field
 
-    if weights is None:
-        weights = _calc_vs_weights(scales, vs)
-    else:
-        assert len(weights) == len(scales)
+    Returns
+    -------
+    field : np.ndarray
+        The field of random clouds
+    """
+
+    if len(weights) != len(scales):
+        raise ValueError("Number of weights must match number of scales.")
+
+
+    # Calculate a field for each scale, and add them together in a weighted manner to form the field
+    field = np.zeros(size, dtype=float)
 
     for scale, weight in zip(scales, weights):
-        prop = 2**(-scale+1)  # proportion for this scale
+        prop = 2.0**(-scale+1)  # proportion for this scale
         _, i_field = _random_at_scale((int(size[0]*prop), int(size[1]*prop)), size)
         field += i_field * weight
 
-    # Scale it zero to 1??
-    field = (field - np.min(field))
-    field = field / np.max(field)
-    assert np.min(field) == 0
-    assert np.max(field) == 1
+    # Optionally scale it zero to 1
+    if normalize:
+        field = (field - np.min(field))
+        field = field / np.max(field)
+        assert np.min(field) == 0
+        assert np.max(field) == 1
 
     if plot:
         # Plot the field
@@ -158,120 +192,95 @@ def stacked_field(vs, size, weights=None, scales=(1, 2, 3, 4, 5, 6, 7), plot=Fal
 
     return field
 
 
-def _clip_field(field, clear_frac=0.5, plot=False):
+def _calc_clear_mask(field, clear_frac=0.5, plot=False):
     """
     Find the value in the field that will produce an X% clear sky mask.
     The mask is 1 where clear, and 0 where cloudy.
 
     Parameters
     ----------
-    field
-    clear_frac
-    plot
+    field : np.ndarray
+        The field of clouds
+    clear_frac : float
+        The fraction of the field that should be clear
+    plot : bool
+        Whether to plot the field
 
     Returns
     -------
-
+    field_mask : np.ndarray
+        The clear sky mask.
+        Zero indicates cloudy, one indicates clear
     """
 
+    if clear_frac > 1 or clear_frac < 0:
+        raise ValueError("Clear fraction must be between 0 and 1.")
+
     # Zero where clouds, 1 where clear
-    # clipping needs to be based on pixel fraction, which thus needs to be
-    # done on quantile because the field has a normal distribution
-    quant = np.quantile(field, clear_frac)
+    if clear_frac == 0:
+        field_mask = np.zeros_like(field)
+
+    else:
+        # clipping needs to be based on pixel fraction, which thus needs to be
+        # done on quantile because the field has a normal distribution
+        quant = np.quantile(field, clear_frac)
 
-    # Find that quantile and cap it
-    field_out = np.ones_like(field)
-    field_out[field > quant] = 0
+        # Find that quantile and cap it
+        field_mask = np.ones_like(field)
+        field_mask[field > quant] = 0
 
     # Test to make sure that we're close to matching the desired fraction
-    assert (np.isclose(clear_frac, np.sum(field_out) / field.size, rtol=1e-3))
+    assert (np.isclose(clear_frac, np.sum(field_mask) / field.size, rtol=1e-3))
 
     if plot:
-        plt.imshow(field_out, extent=(0, field.shape[1], 0, field.shape[0]))
+        plt.imshow(field_mask, extent=(0, field.shape[1], 0, field.shape[0]))
         plt.show()
 
-    return field_out
+    return field_mask
 
 
 def _find_edges(base_mask, size, plot=False):
     """
-    Find the edges of the field using a sobel filter and then smooth it with a
+    Find the edges of a mask using a Sobel filter, then broaden them with a
     uniform filter.
+
     Parameters
     ----------
-    size
-    plot
+    base_mask : np.ndarray
+        The mask to find the edges of
+    size : int
+        The size of the uniform filter (effectively doubles the size of the filter).
 
     Returns
     -------
-
+    edges : np.ndarray
+        The edges of the mask
+    smoothed_binary : np.ndarray
+        The smoothed binary mask
     """
 
     # This gets us roughly 50% overlapping with mask and 50% outside
-    edges = np.abs(sobel(base_mask))
+    edges = np.abs(sobel(base_mask, 0)**2 + sobel(base_mask, 1)**2)
     smoothed = uniform_filter(edges, size=size)
 
     # We want to binarize it
     smoothed[smoothed < 1e-5] = 0  # Zero out the small floating point values
     # Calculate a threshold based on quantile, because otherwise we get the whole clouds
-    baseline = np.quantile(smoothed[smoothed>0], 0.5)
-    smoothed_binary = smoothed > baseline
+    smoothed_binary = smoothed > 0
 
     if plot:
+        xsiz, ysiz = base_mask.shape
         # Compare the edges and uniform filtered edges side by side
-        fig, axs = plt.subplots(1, 2, figsize=(10, 5))
+        fig, axs = plt.subplots(1, 3, figsize=(10, 5))
         axs[0].imshow(edges, extent=(0, ysiz, 0, xsiz))
         axs[0].set_title('Edges')
         axs[1].imshow(smoothed_binary, extent=(0, ysiz, 0, xsiz))
         axs[1].set_title('Uniform Filtered Edges')
+        axs[2].imshow(base_mask, extent=(0, ysiz, 0, xsiz))
+        axs[2].set_title('Original Mask')
         plt.show()
 
     return edges, smoothed_binary
 
 
-def shift_mean_lave(field, ktmean, max_overshoot=1.4, ktmin=0.2, min_quant=0.005, max_quant=0.995, plot=True):
-    # ##### Shift values of kt to range from 0.2 - 1
-
-    # Calc the "max" and "min", excluding clear values
-    field_max = np.quantile(field[field < 1], max_quant)
-    field_min = np.quantile(field[field < 1], min_quant)
-
-    # Scale it between ktmin and max_overshoot
-    field_out = (field - field_min) / (field_max - field_min) * (1-ktmin) + ktmin
-
-    # # Clip limits to sensible boundaries
-    field_out[field_out > 1] = 1
-    field_out[field_out < 0] = 0
-
-    # ##### Apply multiplier to shift mean to ktmean #####
-
-    # Rescale the mean
-    tgtsum = field_out.size * ktmean  # Mean scaled over whole field
-    diff_sum = tgtsum - np.sum(field_out == 1)  # Shifting to
exclude fully clear values - tgt_mean = diff_sum / np.sum(field_out < 1) # Recalculating the expected mean of the cloudy-only aareas - current_cloud_mean = np.mean(field_out[field_out < 1]) # Actual cloud mean - - if diff_sum > 0: - field_out[field_out!=1] = tgt_mean / current_cloud_mean * field_out[field_out!=1] - - # print(diff_sum) - # print(current_cloud_mean) - print(f"Desired Mean: {ktmean}, actual global mean {np.mean(field_out)}.") - - - if plot: - plt.hist(field_out[field_out<1].flatten(), bins=100) - plt.show() - - # plot field and field_out side by side - fig, axs = plt.subplots(1, 2, figsize=(10, 5)) - axs[0].imshow(field, extent=(0, ysiz, 0, xsiz)) - axs[0].set_title('Original Field') - axs[1].imshow(field_out, extent=(0, ysiz, 0, xsiz)) - axs[1].set_title('Shifted Field') - plt.show() - return field_out - - -def lave_scaling_exact(field, clear_mask, edge_mask, ktmean, ktmax=1.4, kt1pct=0.2, max_quant=0.99, plot=True): +def scale_field_lave(field, clear_mask, edge_mask, ktmean, ktmax=1.4, kt1pct=0.2, max_quant=0.99, plot=True): # ##### Shift values of kt to range from 0.2 - 1 @@ -334,19 +343,50 @@ def lave_scaling_exact(field, clear_mask, edge_mask, ktmean, ktmax=1.4, kt1pct=0 return clouds5 -def get_settings_from_timeseries(kt_ts, clear_threshold=0.95, plot=True): +def get_timeseries_props(kt_ts, clear_threshold=0.95, plot=True): + """ + Get the properties of a time series of kt values. + + Parameters + ---------- + kt_ts : pandas.Series + The time series of kt values + clear_threshold : float + The threshold in kt for what is considered clear + plot : bool + Whether to plot the wavelets + + Returns + ------- + ktmean : float + The mean of the time series + kt_1_pct : float + The 1st percentile of the time series + ktmax : float + The maximum of the time series + frac_clear : float + The fraction of clear sky + vs : float + The variability score + weights : np.ndarray + The wavelet weights + scales : list + The timescales of the wavelets + """ + import pvlib + # Get the mean and standard deviation of the time series ktmean = np.mean(kt_ts) # represents mean of kt ktstd = np.std(kt_ts) ktmax = np.max(kt_ts) # represents peak cloud enhancement ktmin = np.min(kt_ts) - kt1pct = np.nanquantile(kt_ts, 0.01) # represents "lowest" kt + kt_1_pct = np.nanquantile(kt_ts, 0.01) # represents "lowest" kt # Get the fraction of clear sky with a threshold frac_clear = np.sum(kt_ts > clear_threshold) / kt_ts.size - vs = variability_score(kt) * 1e4 + vs = variability_score(kt_ts) * 1e4 # Compute the wavelet weights # should be the mean(wavelet squared) for all modes except the steady mode @@ -361,10 +401,10 @@ def get_settings_from_timeseries(kt_ts, clear_threshold=0.95, plot=True): plt.show() waves = waves[:-1, :] # remove the steady mode - tmscales = [i+1 for i, _ in enumerate(tmscales[:-1])] + scales = [i+1 for i, _ in enumerate(tmscales[:-1])] weights = _calc_wavelet_weights(waves) - return ktmean, kt1pct, ktmax, frac_clear, vs, weights, tmscales + return ktmean, kt_1_pct, ktmax, frac_clear, vs, weights, scales @@ -374,7 +414,7 @@ def get_settings_from_timeseries(kt_ts, clear_threshold=0.95, plot=True): import pandas as pd import pvlib - datafn = "../../demos/data/hope_melpitz_1s.h5" + datafn = "../../../demos/data/hope_melpitz_1s.h5" twin = pd.date_range('2013-09-08 9:15:00', '2013-09-08 10:15:00', freq='1s', tz='UTC') data = pd.read_hdf(datafn, mode="r", key="data") data = data[40] @@ -397,7 +437,7 @@ def get_settings_from_timeseries(kt_ts, clear_threshold=0.95, plot=True): # plt.hist(kt, 
bins=100)
     # plt.show()
 
-    ktmean, kt1pct, ktmax, frac_clear, vs, weights, scales = get_settings_from_timeseries(kt, plot=False)
+    ktmean, kt1pct, ktmax, frac_clear, vs, weights, scales = get_timeseries_props(kt, plot=False)
 
     print(f"Clear Fraction is: {frac_clear}")
@@ -406,10 +446,10 @@ def get_settings_from_timeseries(kt_ts, clear_threshold=0.95, plot=True):
 
     xsiz = 2**12
     ysiz = 2**14
 
-    cfield = stacked_field(vs, (xsiz, ysiz), weights, scales)
+    cfield = _stack_random_field(weights, scales, (xsiz, ysiz))
 
-    clear_mask = stacked_field(vs, (xsiz, ysiz), weights, scales)
-    clear_mask = _clip_field(clear_mask, frac_clear, plot=False)  # 0 is cloudy, 1 is clear
+    clear_mask = _stack_random_field(weights, scales, (xsiz, ysiz))
+    clear_mask = _calc_clear_mask(clear_mask, frac_clear, plot=False)  # 0 is cloudy, 1 is clear
 
 
     # Clear Everywhere
@@ -423,7 +463,7 @@ def get_settings_from_timeseries(kt_ts, clear_threshold=0.95, plot=True):
     edges, smoothed = _find_edges(clear_mask, 3, plot=False)
 
     # field_final = shift_mean_lave(out_field, ktmean)
-    field_final = lave_scaling_exact(cfield, clear_mask, smoothed, ktmean, ktmax, kt1pct, plot=False)
+    field_final = scale_field_lave(cfield, clear_mask, smoothed, ktmean, ktmax, kt1pct, plot=False)
 
     plt.plot(field_final[1,:])
     plt.show()
@@ -433,7 +473,7 @@ def get_settings_from_timeseries(kt_ts, clear_threshold=0.95, plot=True):
     plt.show()
 
     plt.hist(np.diff(kt), bins=50)
-    plt.hist(np.diff(field_final[1,:]), bins=50, alpha=0.5)
+    plt.hist(np.diff(field_final[1,:]), bins=200, alpha=0.5)
     plt.show()
 
     # assert np.all(r == rnew)
\ No newline at end of file
diff --git a/tests/synthirrad/test_cloudfield.py b/tests/synthirrad/test_cloudfield.py
new file mode 100644
index 0000000..84c6805
--- /dev/null
+++ b/tests/synthirrad/test_cloudfield.py
@@ -0,0 +1,215 @@
+import pytest
+from pytest import approx
+from numpy import testing as npt
+import numpy as np
+
+from solarspatialtools.synthirrad import cloudfield
+
+import pandas as pd
+
+
+class TestFieldGeneration:
+    def test_random_at_scale_identity(self):
+        base, interp = cloudfield._random_at_scale((10, 10), (10, 10), False)
+        assert base.shape == (10, 10)
+        assert interp.shape == (10, 10)
+        npt.assert_allclose(base, interp)
+
+    def test_random_at_scale_shape(self):
+        rand_size = (10, 10)
+        final_size = (20, 20)
+        base, interp = cloudfield._random_at_scale(rand_size, final_size)
+        assert base.shape == rand_size
+        assert interp.shape == final_size
+
+    def test_random_at_scale_values(self):
+        rand_size = (10, 10)
+        final_size = (20, 20)
+        base, interp = cloudfield._random_at_scale(rand_size, final_size)
+        assert base.min() >= 0 and base.max() <= 1
+        assert interp.min() >= 0 and interp.max() <= 1
+
+    def test_random_at_scale_interpolation(self):
+        rand_size = (10, 10)
+        final_size = (20, 20)
+        base, interp = cloudfield._random_at_scale(rand_size, final_size)
+        # Check that the interpolated values are within the range of the original values
+        assert interp.min() >= base.min() and interp.max() <= base.max()
+
+    def test_stack_random_field_with_weights(self):
+        size = (100, 100)
+        scales = np.array([1, 2, 3])
+        weights = np.array([0.2, 0.3, 0.5])
+        field = cloudfield._stack_random_field(weights, scales, size)
+        assert field.shape == size, "Field size should match the input size"
+        assert np.min(field) >= 0 and np.max(field) <= 1, "Field values should lie between 0 and 1 when weights sum to unity"
+
+    def test_stack_random_field_weights_mismatch(self):
+        size = (100, 100)
+        scales = np.array([1, 2, 3])
+        weights = np.array([0.2, 0.3])
+        with pytest.raises(ValueError):
+            cloudfield._stack_random_field(weights, scales, size)
+
+    def test_stack_random_field_normalize(self):
+        size = (100, 100)
+        scales = np.array([1, 2, 3])
+        weights = np.array([0.2, 0.3, 0.5])
+        field = cloudfield._stack_random_field(weights, scales, size, normalize=True)
+        assert field.shape == size, "Field size should match the input size"
+        assert np.min(field) == 0 and np.max(field) == 1, "Field values should be normalized between 0 and 1"
+
+class TestFieldProcessing:
+    def test_calc_clear_mask_basic(self):
+        field = np.random.rand(100, 100)
+        clear_frac = 0.5
+        mask = cloudfield._calc_clear_mask(field, clear_frac)
+        assert mask.shape == field.shape, "Mask size should match the input field size"
+        assert np.isclose(np.mean(mask), clear_frac, rtol=1e-3), "Clear fraction should be close to the specified value"
+
+    def test_calc_clear_mask_all_clear(self):
+        field = np.random.rand(100, 100)
+        clear_frac = 1.0
+        mask = cloudfield._calc_clear_mask(field, clear_frac)
+        assert np.all(mask == 1), "All values should be clear (1) when clear_frac is 1.0"
+
+    def test_calc_clear_mask_all_cloudy(self):
+        field = np.random.rand(100, 100)
+        clear_frac = 0.0
+        mask = cloudfield._calc_clear_mask(field, clear_frac)
+        assert np.all(mask == 0), "All values should be cloudy (0) when clear_frac is 0.0"
+
+    def test_calc_clear_mask_invalid_clear_frac(self):
+        field = np.random.rand(100, 100)
+        with pytest.raises(ValueError):
+            cloudfield._calc_clear_mask(field, 1.1)
+        with pytest.raises(ValueError):
+            cloudfield._calc_clear_mask(field, -0.1)
+
+    def test_find_edges_basic(self):
+        base_mask = np.random.randint(0, 2, (100, 100))
+        size = 3
+        edges, smoothed_binary = cloudfield._find_edges(base_mask, size)
+        assert edges.shape == base_mask.shape, "Edges size should match the input mask size"
+        assert smoothed_binary.shape == base_mask.shape, "Smoothed binary size should match the input mask size"
+
+    def test_find_edges_all_zeros(self):
+        base_mask = np.zeros((100, 100))
+        size = 3
+        edges, smoothed_binary = cloudfield._find_edges(base_mask, size)
+        assert np.all(edges == 0), "Edges should be all zeros for an all-zero input mask"
+        assert np.all(smoothed_binary == 0), "Smoothed binary should be all zeros for an all-zero input mask"
+
+    def test_find_edges_all_ones(self):
+        base_mask = np.ones((100, 100))
+        size = 3
+        edges, smoothed_binary = cloudfield._find_edges(base_mask, size)
+        assert np.all(edges == 0), "Edges should be all zeros for an all-one input mask"
+        assert np.all(smoothed_binary == 0), "Smoothed binary should be all zeros for an all-one input mask"
+
+    def test_find_edges_half(self):
+        # make a mask with the top half one and the bottom half zero
+        base_mask = np.zeros((100, 100))
+        base_mask[:50, :] = 1
+        size = 3
+        edges, smoothed_binary = cloudfield._find_edges(base_mask, size)
+        assert np.sum(edges > 0) == 200
+        assert np.sum(smoothed_binary) == 400
+
+    def test_find_edges_square(self):
+        # make a square mask
+        base_mask = np.zeros((100, 100))
+        base_mask[25:70, 25:75] = 1
+        size = 3
+        edges, smoothed_binary = cloudfield._find_edges(base_mask, size)
+        assert np.sum(edges > 0) == 380
+        assert np.sum(smoothed_binary) == 760
+
+
+class TestWeights:
+    """Tests generated with AI assistance"""
+
+    def test_calc_vs_weights_basic(self):
+        scales = np.array([1, 2, 3])
+        vs = 0.5
+        weights = cloudfield._calc_vs_weights(scales, vs)
+        assert np.isclose(np.sum(weights), 1), "Weights should sum to 1"
+        assert len(weights) == len(scales), "Weights length should match scales length"
+
+    def
test_calc_vs_weights_zero_vs(self): + scales = np.array([1, 2, 3]) + vs = 0 + weights = cloudfield._calc_vs_weights(scales, vs) + assert np.isclose(np.sum(weights), 1), "Weights should sum to 1" + assert len(weights) == len(scales), "Weights length should match scales length" + + def test_calc_vs_weights_high_vs(self): + scales = np.array([1, 2, 3]) + vs = 10 + weights = cloudfield._calc_vs_weights(scales, vs) + assert np.isclose(np.sum(weights), 1), "Weights should sum to 1" + assert len(weights) == len(scales), "Weights length should match scales length" + + def test_calc_vs_weights_large_scales(self): + scales = np.array([1, 10, 100]) + vs = 1 + weights = cloudfield._calc_vs_weights(scales, vs) + assert np.isclose(np.sum(weights), 1), "Weights should sum to 1" + assert len(weights) == len(scales), "Weights length should match scales length" + + def test_calc_vs_weights_negative_vs(self): + scales = np.array([1, 2, 3]) + vs = -0.5 + with pytest.raises(ValueError): + cloudfield._calc_vs_weights(scales, vs) + + def test_calc_wavelet_weights_basic(self): + waves = np.array([[1, 2, 3], [4, 5, 6], [7, 8, 9]]) + weights = cloudfield._calc_wavelet_weights(waves) + assert np.isclose(np.sum(weights), 1), "Weights should sum to 1" + assert len(weights) == waves.shape[0], "Weights length should match number of wavelet scales" + + def test_calc_wavelet_weights_zeros(self): + waves = np.zeros((3, 3)) + weights = cloudfield._calc_wavelet_weights(waves) + assert np.all(np.isnan(weights)), "Weights will be nan for zero input" + + def test_calc_wavelet_weights_large_values(self): + waves = np.array([[1e10, 2e10, 3e10], [4e10, 5e10, 6e10], [7e10, 8e10, 9e10]]) + weights = cloudfield._calc_wavelet_weights(waves) + assert np.isclose(np.sum(weights), 1), "Weights should sum to 1" + assert len(weights) == waves.shape[0], "Weights length should match number of wavelet scales" + + def test_calc_wavelet_weights_negative_values(self): + waves = np.array([[-1, -2, -3], [-4, -5, -6], [-7, -8, -9]]) + weights = cloudfield._calc_wavelet_weights(waves) + assert np.isclose(np.sum(weights), 1), "Weights should sum to 1" + assert len(weights) == waves.shape[0], "Weights length should match number of wavelet scales" + + def test_calc_wavelet_weights_mixed_values(self): + waves = np.array([[1, -2, 3], [-4, 5, -6], [7, -8, 9]]) + weights = cloudfield._calc_wavelet_weights(waves) + assert np.isclose(np.sum(weights), 1), "Weights should sum to 1" + assert len(weights) == waves.shape[0], "Weights length should match number of wavelet scales" + + def test_get_timeseries_props_basic(self): + kt_ts = pd.Series(np.random.rand(1000)) + ktmean, kt_1_pct, ktmax, frac_clear, vs, weights, scales = cloudfield.get_timeseries_props(kt_ts, plot=False) + assert isinstance(ktmean, float), "ktmean should be a float" + assert isinstance(kt_1_pct, float), "kt_1_pct should be a float" + assert isinstance(ktmax, float), "ktmax should be a float" + assert isinstance(frac_clear, float), "frac_clear should be a float" + assert isinstance(vs, float), "vs should be a float" + assert isinstance(weights, np.ndarray), "weights should be a numpy array" + assert isinstance(scales, list), "scales should be a list" + + def test_get_timeseries_props_all_clear(self): + kt_ts = pd.Series(np.ones(1000)) + ktmean, kt_1_pct, ktmax, frac_clear, vs, weights, scales = cloudfield.get_timeseries_props(kt_ts, plot=False) + assert frac_clear == 1.0, "frac_clear should be 1.0 for all-clear time series" + + def test_get_timeseries_props_all_cloudy(self): + kt_ts = 
pd.Series(np.zeros(1000)) + ktmean, kt_1_pct, ktmax, frac_clear, vs, weights, scales = cloudfield.get_timeseries_props(kt_ts, plot=False) + assert frac_clear == 0.0, "frac_clear should be 0.0 for all-cloudy time series" +
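
Example usage with synthetic inputs (a minimal sketch, not part of the patch): this mirrors the `__main__` demo in cloudfield.py but skips the Melpitz HDF5 file. The variability score (vs=30), clear fraction (0.4), and kt statistics (ktmean=0.7, ktmax=1.4, kt1pct=0.2) below are hypothetical stand-ins for values that get_timeseries_props would normally derive from a measured kt timeseries.

    import numpy as np
    from solarspatialtools.synthirrad import cloudfield

    # Scale weights from a hypothetical variability score (vs=30 is illustrative)
    scales = np.array([1, 2, 3, 4, 5])
    weights = cloudfield._calc_vs_weights(scales, 30)

    # Stack weighted random fields; a second realization drives the clear-sky mask
    size = (256, 512)  # (rows, cols), kept small so the sketch runs quickly
    cfield = cloudfield._stack_random_field(weights, scales, size)
    clear_mask = cloudfield._stack_random_field(weights, scales, size)
    clear_mask = cloudfield._calc_clear_mask(clear_mask, clear_frac=0.4)  # 1 = clear

    # Broaden the cloud edges, then scale the raw field to kt-like values
    edges, smoothed = cloudfield._find_edges(clear_mask, 3)
    kt_field = cloudfield.scale_field_lave(cfield, clear_mask, smoothed,
                                           ktmean=0.7, ktmax=1.4, kt1pct=0.2,
                                           plot=False)
    print(kt_field.shape, float(np.mean(kt_field)))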