diff --git a/genesis/utils/pipeline/data/extraction.py b/genesis/utils/pipeline/data/extraction.py
index d6a2e66..b1ea816 100644
--- a/genesis/utils/pipeline/data/extraction.py
+++ b/genesis/utils/pipeline/data/extraction.py
@@ -36,9 +36,16 @@ class XArrayTarget3DExtraction(XArrayTarget):
     def open(self, *args, **kwargs):
         ds = super(XArrayTarget3DExtraction, self).open(*args, **kwargs)
-        if len(ds.coords) == 0:
+        if len(ds.coords) == 0 and len(ds.dims) == 0:
             raise Exception(f"{self.fn} doesn't contain any data")
 
         ds = self._ensure_coord_units(ds)
+
+        if isinstance(ds, xr.Dataset) and len(ds.variables) == 0:
+            raise Exception(
+                f"Stored 3D file for `{self.path}` is empty, please delete so "
+                "it can be recreated"
+            )
+
         return ds
 
     def _ensure_coord_units(self, da):
@@ -60,6 +67,8 @@ class ExtractField3D(luigi.Task):
     base_name = luigi.Parameter()
     field_name = luigi.Parameter()
 
+    # follows filename of uclales-utils given that we put ".tn{tn}" into "var_name"
+    # SINGLE_VAR_FILENAME_FORMAT_3D = "{file_prefix}.{var_name}.tn{tn}.nc"
     FN_FORMAT = "{experiment_name}.{field_name}.nc"
 
     @staticmethod
@@ -171,19 +180,7 @@ def output(self):
 
         p = get_workdir() / self.base_name / fn
 
-        t = XArrayTarget3DExtraction(str(p))
-
-        if t.exists():
-            data = t.open()
-            if isinstance(data, xr.Dataset):
-                if len(data.variables) == 0:
-                    warnings.warn(
-                        "Stored file for `{}` is empty, deleting..."
-                        "".format(self.field_name)
-                    )
-                    p.unlink()
-
-        return t
+        return XArrayTarget3DExtraction(str(p))
 
 
 class XArrayTarget2DCrossSection(XArrayTarget):
diff --git a/genesis/utils/pipeline/data/tracking_2d/aggregation.py b/genesis/utils/pipeline/data/tracking_2d/aggregation.py
index a976603..e5ac99a 100644
--- a/genesis/utils/pipeline/data/tracking_2d/aggregation.py
+++ b/genesis/utils/pipeline/data/tracking_2d/aggregation.py
@@ -167,6 +167,7 @@ def fn_unique_dropna(v):
 
     def run(self):
         da_labels = self.input()["tracking_labels"].open().fillna(0).astype(int)
+        op = self.op
         if self.var_name in ["xt", "yt"]:
             if self.var_name in "xt":
                 _, da_values = xr.broadcast(da_labels.xt, da_labels.yt)
@@ -187,18 +188,19 @@ def run(self):
             da_values = xr.ones_like(da_labels) * dx**2.0
             da_values.attrs["units"] = f"{da_labels.xt.units}^2"
             da_values.attrs["long_name"] = "area"
+            op = "sum_labels"
         else:
             da_values = self.input()["field"].open()
 
-        if self.op == "histogram":
+        if op == "histogram":
             da_out = self._aggregate_as_hist(
                 da_values=da_values,
                 da_labels=da_labels,
             )
             name = f"{self.var_name}__hist_{self.dx}"
-        elif self.op in vars(dmeasure):
+        elif op in vars(dmeasure):
             da_out = self._aggregate_generic(
-                da_values=da_values, da_labels=da_labels, op=self.op
+                da_values=da_values, da_labels=da_labels, op=op
             )
             name = f"{self.var_name}__{self.op}"
         else:
diff --git a/genesis/utils/pipeline/data_sources/uclales/base.py b/genesis/utils/pipeline/data_sources/uclales/base.py
index 266aa41..c36218e 100644
--- a/genesis/utils/pipeline/data_sources/uclales/base.py
+++ b/genesis/utils/pipeline/data_sources/uclales/base.py
@@ -120,15 +120,18 @@ class RawDataPathDoesNotExist(Exception):
     pass
 
 
-def _build_block_extraction_task(dataset_meta, field_name):
+def _build_block_extraction_task(dataset_meta, field_name, output_path):
     raw_data_path = Path(dataset_meta["path"]) / "raw_data"
+    dest_path = Path(dataset_meta["path"]) / "3d_blocks" / "full_domain"
 
     task_kwargs = dict(
         source_path=raw_data_path,
         file_prefix=dataset_meta["experiment_name"],
         tn=dataset_meta["timestep"],
         kind="3d",
+        dest_path=dest_path,
+        fix_units=True,
     )
 
     if field_name in ["u", "v", "w"]:
@@ -205,6 +208,7 @@ def extract_field_to_filename(dataset_meta, path_out, field_name, **kwargs):
             task = _build_block_extraction_task(
                 dataset_meta=dataset_meta,
                 field_name=field_name,
+                output_path=path_out,
             )
             # if the source file doesn't exist we return a task to create
             # it, next time we pass here the file should exist and we can
@@ -212,8 +216,13 @@
             if not task.output().exists():
                 return task
 
-            da = task.output().open(decode_times=False)
-            can_symlink = False
+            try:
+                da = task.output().open()
+            except ValueError:
+                da = task.output().open(decode_times=False)
+                can_symlink = False
+            # set `path_in` so that we ensure we try symlinking the right file
+            path_in = Path(task.output().path)
 
         except RawDataPathDoesNotExist:
             raise Exception(
@@ -310,40 +319,63 @@ def _calc_qv__norain(qt, qc):
     return qv
 
 
-@np.vectorize
-def _calc_temperature_single(q_l, p, theta_l):
+# @numba.jit(numba.float64(numba.float64, numba.float64, numba.float64), nopython=True)
+def _calc_theta_l(T, p, q_l):
+    # constants from UCLALES
+    L_v = 2.5 * 1.0e6  # [J/kg]
+    p_theta = 1.0e5
+    cp_d = 1.004 * 1.0e3  # [J/kg/K]
+    R_d = 287.04  # [J/kg/K]
+    # XXX: this is *not* the *actual* liquid potential temperature (as
+    # given in B. Steven's notes on moist thermodynamics), but instead
+    # reflects the form used in UCLALES where in place of the mixture
+    # heat-capacity the dry-air heat capacity is used
+    return T * (p_theta / p) ** (R_d / cp_d) * np.exp(-L_v * q_l / (cp_d * T))
+
+
+def _calc_temperature_single(q_l, p, theta_l, T_rtol=1.0e-6, T_abstol=1.0e-6):
     # constants from UCLALES
     cp_d = 1.004 * 1.0e3  # [J/kg/K]
     R_d = 287.04  # [J/kg/K]
-    L_v = 2.5 * 1.0e6  # [J/kg]
     p_theta = 1.0e5
 
     # XXX: this is *not* the *actual* liquid potential temperature (as
     # given in B. Steven's notes on moist thermodynamics), but instead
     # reflects the form used in UCLALES where in place of the mixture
    # heat-capacity the dry-air heat capacity is used
 
-    def temp_func(T):
-        return theta_l - T * (p_theta / p) ** (R_d / cp_d) * np.exp(
-            -L_v * q_l / (cp_d * T)
-        )
+    def temp_func(T, theta_l, p, q_l):
+        return theta_l - _calc_theta_l(T=T, p=p, q_l=q_l)
 
-    if np.all(q_l == 0.0):
+    if q_l == 0.0:
         # no need for root finding
         return theta_l / ((p_theta / p) ** (R_d / cp_d))
 
     # XXX: brentq solver requires bounds, I don't expect we'll get below -100C
     T_min = -100.0 + 273.0
     T_max = 50.0 + 273.0
-    T = scipy.optimize.brentq(f=temp_func, a=T_min, b=T_max)
+    T = scipy.optimize.brentq(f=temp_func, a=T_min, b=T_max, args=(theta_l, p, q_l), xtol=T_abstol, rtol=T_rtol)
 
     # check that we're within 1.0e-4
-    assert np.all(np.abs(temp_func(T)) < 1.0e-4)
+    assert np.all(np.abs(temp_func(T, theta_l, p, q_l)) < 1.0e-4)
 
     return T
 
 
 def _calc_temperature(qc, qr, p, theta_l):
+    if not qc.dims == qr.dims == p.dims == theta_l.dims:
+        expvars = [qc, qr, p, theta_l]
+        s = "\n\t".join([f"{v.name}: {list(v.dims)}" for v in expvars])
+        raise Exception("Incompatible dims:\n\t" + s)
+
+    raise Exception("Use julia implementation")
+    """
+    q_l = qc + qr
+
+    q_l = q_l.isel(**kws)
+    p = p.isel(**kws)
+    theta_l = theta_l.isel(**kws)
+
     arr_temperature = np.vectorize(_calc_temperature_single)(
         q_l=q_l, p=p, theta_l=theta_l
     )
@@ -352,6 +384,9 @@ def _calc_temperature(qc, qr, p, theta_l):
         arr_temperature,
         dims=p.dims,
         attrs=dict(longname="temperature", units="K"),
+        coords=p.coords,
+        name="abstemp"
     )
 
     return da_temperature
+    """
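Note on the thermodynamics hunk above: the diff factors the UCLALES liquid-water potential temperature into `_calc_theta_l` and recovers absolute temperature by inverting it with `scipy.optimize.brentq`. The standalone sketch below reproduces that round trip so the inversion can be checked in isolation; the constants and the functional form are copied from the diff, while everything else (function names, sample values, tolerances) is illustrative only and assumes just that numpy and scipy are installed.

import numpy as np
import scipy.optimize

# constants as used in the diff (UCLALES values)
L_v = 2.5e6  # latent heat of vapourisation [J/kg]
cp_d = 1.004e3  # dry-air heat capacity [J/kg/K]
R_d = 287.04  # dry-air gas constant [J/kg/K]
p_theta = 1.0e5  # reference pressure [Pa]


def calc_theta_l(T, p, q_l):
    # UCLALES-style liquid-water potential temperature: the dry-air heat
    # capacity is used in place of the mixture heat capacity (as noted in the diff)
    return T * (p_theta / p) ** (R_d / cp_d) * np.exp(-L_v * q_l / (cp_d * T))


def calc_temperature(theta_l, p, q_l, T_abstol=1.0e-6, T_rtol=1.0e-6):
    # invert theta_l(T) for T; with no liquid water the relation is analytic
    if q_l == 0.0:
        return theta_l / (p_theta / p) ** (R_d / cp_d)
    # otherwise root-find over the same temperature bracket the diff uses
    return scipy.optimize.brentq(
        f=lambda T: theta_l - calc_theta_l(T=T, p=p, q_l=q_l),
        a=-100.0 + 273.0,
        b=50.0 + 273.0,
        xtol=T_abstol,
        rtol=T_rtol,
    )


if __name__ == "__main__":
    # round trip: T -> theta_l -> T should recover the input temperature
    T_in, p, q_l = 285.0, 9.0e4, 1.0e-3  # illustrative sample values
    theta_l = calc_theta_l(T=T_in, p=p, q_l=q_l)
    T_out = calc_temperature(theta_l=theta_l, p=p, q_l=q_l)
    assert abs(T_in - T_out) < 1.0e-4
    print(f"theta_l = {theta_l:.3f} K, recovered T = {T_out:.6f} K")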