Skip to content

Commit

Permalink
sourcery
Browse files Browse the repository at this point in the history
  • Loading branch information
abhisrkckl committed Jan 21, 2025
1 parent a622afa commit 76d0193
Showing 1 changed file with 48 additions and 69 deletions.
117 changes: 48 additions & 69 deletions src/pint/pint_matrix.py
Original file line number Diff line number Diff line change
Expand Up @@ -69,15 +69,13 @@ def shape(self):
@property
def labels(self):
labels = []
for dim in range(len(self.axis_labels)):
labels.append(self.get_axis_labels(dim))
labels.extend(self.get_axis_labels(dim) for dim in range(len(self.axis_labels)))
return labels

@property
def label_units(self):
units = []
for dim in range(len(self.axis_labels)):
units.append(self.get_axis_labels(dim))
units.extend(self.get_axis_labels(dim) for dim in range(len(self.axis_labels)))
return units

def diag(self, k=0):
Expand Down Expand Up @@ -114,14 +112,12 @@ def get_label_names(self, axis=None):
labels = []
if axis is None:
r = range(len(self.axis_labels))
elif isinstance(axis, int):
return [x[0] for x in self.get_axis_labels(axis)]
else:
if isinstance(axis, int):
return [x[0] for x in self.get_axis_labels(axis)]
else:
# assume it's an iterable (how to check?)
r = axis
for dim in r:
labels.append([x[0] for x in self.get_axis_labels(dim)])
# assume it's an iterable (how to check?)
r = axis
labels.extend([x[0] for x in self.get_axis_labels(dim)] for dim in r)
return labels

def get_unique_label_names(self):
Expand Down Expand Up @@ -156,9 +152,7 @@ def get_label_size(self, label, axis=None):
"""
lb_sizes = []
lbs = self.get_label(label, axis=axis)
for ii, lb in enumerate(lbs):
size = lb[3] - lb[2]
lb_sizes.append((ii, size))
lb_sizes.extend((ii, lb[3] - lb[2]) for ii, lb in enumerate(lbs))
return lb_sizes

def _check_index_overlap(self):
Expand All @@ -167,7 +161,7 @@ def _check_index_overlap(self):
comb = combinations(axis_labels, 2)
for cb in comb:
if cb[0][1][0] <= cb[1][1][1] and cb[1][1][0] <= cb[0][1][1] - 1:
raise ValueError("Label index in dim {} has" " overlap".format(ii))
raise ValueError(f"Label index in dim {ii} has overlap")

def _get_label_start(self, label_entry):
return label_entry[1][0]
Expand Down Expand Up @@ -207,19 +201,17 @@ def get_label(self, label, axis=None):

# We assume the labels are unique in the matrix.
all_label = []
for ii, dim in enumerate(self.axis_labels):
if label in dim.keys():
if axis is None or axis == ii:
all_label.append((label, ii) + dim[label])
if all_label == []:
if axis is None:
raise KeyError("Label {} is not in the matrix".format(label))
else:
raise KeyError(
"Label {} is not in the matrix in axis {}".format(label, axis)
)
else:
all_label.extend(
(label, ii) + dim[label]
for ii, dim in enumerate(self.axis_labels)
if label in dim.keys() and (axis is None or axis == ii)
)
if all_label != []:
return all_label
if axis is None:
raise KeyError(f"Label {label} is not in the matrix")
else:
raise KeyError(f"Label {label} is not in the matrix in axis {axis}")

def get_label_along_axis(self, axis, label_name):
"""Get the request label from on axis.
Expand All @@ -234,9 +226,7 @@ def get_label_along_axis(self, axis, label_name):
if label_name in label_in_one_axis.keys():
return (label_name, axis) + label_in_one_axis[label_name]
else:
raise ValueError(
"Label '{}' is not in the axis {}".format(label_name, axis)
)
raise ValueError(f"Label '{label_name}' is not in the axis {axis}")

def get_label_slice(self, labels):
"""Return the given label slices.
Expand Down Expand Up @@ -362,7 +352,7 @@ def derivative_params(self):


class DesignMatrixMaker:
"""Class for pint design matrix maker class.
"""Class for generating design matrices.
Parameters
----------
Expand Down Expand Up @@ -390,7 +380,7 @@ def __init__(self, derivative_quantity=None, quantity_unit=None):
# The derivative function should be a wrapper function like d_phase_d_param()
if derivative_quantity is None:
raise ValueError("Argument 'derivative_quantity' can not be None.")
self.deriv_func_name = "d_{}_d_param".format(self.derivative_quantity)
self.deriv_func_name = f"d_{self.derivative_quantity}_d_param"

def __call__(
self, data, model, derivative_params, offset=False, offset_padding=0.0
Expand Down Expand Up @@ -420,9 +410,8 @@ def __call__(

params = ["Offset"] if offset else []
params += derivative_params
labels = []
M = np.zeros((len(data), len(params)))
labels.append({self.derivative_quantity: (0, M.shape[0], self.quantity_unit)})
labels = [{self.derivative_quantity: (0, M.shape[0], self.quantity_unit)}]
labels_dim2 = {}
for ii, param in enumerate(params):
if param == "Offset":
Expand Down Expand Up @@ -466,9 +455,8 @@ def __call__(self, data, model, derivative_params, offset=True, offset_padding=1

params = ["Offset"] if offset else []
params += derivative_params
labels = []
M = np.zeros((data.ntoas, len(params)))
labels.append({self.derivative_quantity: (0, M.shape[0], self.quantity_unit)})
labels = [{self.derivative_quantity: (0, M.shape[0], self.quantity_unit)}]
labels_dim2 = {}
delay = model.delay(data)
for ii, param in enumerate(params):
Expand All @@ -489,19 +477,14 @@ def __call__(self, data, model, derivative_params, offset=True, offset_padding=1
labels_dim2[param] = (ii, ii + 1, param_unit)

labels.append(labels_dim2)
mask = []
for ii, param in enumerate(params):
if param == "Offset":
continue
mask.append(ii)
mask = [ii for ii, param in enumerate(params) if param != "Offset"]
M[:, mask] /= model.F0.value
# TODO maybe use defined label is better
labels[0] = {
self.derivative_quantity: (0, M.shape[0], self.quantity_unit * u.s)
}

d_matrix = DesignMatrix(M, labels)
return d_matrix
return DesignMatrix(M, labels)


class TOADesignMatrixMaker(PhaseDesignMatrixMaker):
Expand All @@ -517,10 +500,13 @@ def __init__(self, derivative_quantity, quantity_unit):
self.deriv_func_name = "d_phase_d_param"

def __call__(self, data, model, derivative_params, offset=True, offset_padding=1.0):
d_matrix = super().__call__(
data, model, derivative_params, offset=offset, offset_padding=offset_padding
return super().__call__(
data,
model,
derivative_params,
offset=offset,
offset_padding=offset_padding,
)
return d_matrix


class NoiseDesignMatrixMaker(DesignMatrixMaker):
Expand All @@ -532,13 +518,11 @@ class NoiseDesignMatrixMaker(DesignMatrixMaker):
"""

def __call__(self, data, model):
result = []
if len(model.basis_funcs) == 0:
return None

for nf in model.basis_funcs:
result.append(nf(data)[0])
M = np.hstack([r for r in result])
result = [nf(data)[0] for nf in model.basis_funcs]
M = np.hstack(list(result))
labels = [
{"toa": (0, M.shape[0], u.s)},
{"toa_noise_params": (0, M.shape[1], u.s)},
Expand Down Expand Up @@ -587,8 +571,7 @@ def combine_design_matrices_by_quantity(design_matrices):
off_set = new_labels[-1][1][1]
axis_labels[0].update(dict(new_labels))
all_matrix.append(d_matrix.matrix)
result = DesignMatrix(np.vstack(all_matrix), axis_labels)
return result
return DesignMatrix(np.vstack(all_matrix), axis_labels)


def combine_design_matrices_by_param(matrix1, matrix2, padding=0.0):
Expand Down Expand Up @@ -632,9 +615,7 @@ def combine_design_matrices_by_param(matrix1, matrix2, padding=0.0):

if d_quantity_size != base_size:
raise ValueError(
"Input design matrix's label "
"{} has different size with matrix "
"{}".format(d_quantity, 0)
f"Input design matrix's label {d_quantity} has different size with matrix 0"
)
else:
# assign new index for combined matrix
Expand Down Expand Up @@ -664,7 +645,7 @@ def combine_design_matrices_by_param(matrix1, matrix2, padding=0.0):
new_matrix = np.zeros((base_matrix.shape[0], matrix2.shape[1]))
new_matrix.fill(padding)
# Fill up the new_matrix with matrix2
for quantity, new_idx in new_quantity_index.items():
for new_idx in new_quantity_index.values():
old_idx = matrix2.get_label_along_axis(0, d_quantity)[2:4]
new_matrix[new_idx[0] : new_idx[1], :] = matrix2.matrix[
old_idx[0] : old_idx[1], :
Expand Down Expand Up @@ -751,10 +732,9 @@ def prettyprint(self, prec=3, coordinatefirst=False, offset=False, usecolor=True
elif ("ELONG" in fps) and ("ELAT" in fps):
coordinates = ["ELONG", "ELAT"]
# TODO: allow for other coordinates
if not (
(fps.index(coordinates[0]) == (0 + int(offsetfirst)))
and (fps.index(coordinates[1]) == (1 + int(offsetfirst)))
):
if fps.index(coordinates[0]) != 0 + int(offsetfirst) or fps.index(
coordinates[1]
) != 1 + int(offsetfirst):
if offsetfirst:
neworder = [fps.index("Offset")]
start = 1
Expand Down Expand Up @@ -787,10 +767,10 @@ def prettyprint(self, prec=3, coordinatefirst=False, offset=False, usecolor=True
base = "{0: {width}.{prec}f}"
lens = [max(len(fp) + 2, prec + 4) for fp in fps]
maxlen = max(lens)
sout = "\nParameter {} matrix:\n".format(self.matrix_type)
line = "{0:^{width}}".format("", width=maxlen)
for fp, ln in zip(fps, lens):
line += "{0:^{width}}".format(fp, width=ln)
sout = f"\nParameter {self.matrix_type} matrix:\n"
sout += line + "\n"
for ii, fp1 in enumerate(fps):
line = "{0:^{width}}".format(fp1, width=maxlen)
Expand Down Expand Up @@ -847,7 +827,7 @@ def __init__(self, covariance_quantity, quantity_unit):
self.covariance_quantity = covariance_quantity
self.quantity_unit = quantity_unit
# The derivative function should be a wrapper function like d_phase_d_param()
self.cov_func_name = "{}_covariance_matrix".format(self.covariance_quantity)
self.cov_func_name = f"{self.covariance_quantity}_covariance_matrix"

def __call__(self, data, model):
"""A general method to make design matrix.
Expand Down Expand Up @@ -886,7 +866,7 @@ def combine_covariance_matrix(covariance_matrices, crossterm={}, crossterm_paddi
# Since covariance matrix are symmtric, only use dim1
new_size += cm.shape[0]
cm_labels = cm.get_axis_labels(1)
label_entry = tuple()
label_entry = ()
for cmlb in cm_labels:
label_entry = (
cmlb[0],
Expand All @@ -908,11 +888,10 @@ def combine_covariance_matrix(covariance_matrices, crossterm={}, crossterm_paddi
new_cm[lb1[1][0] : lb1[1][1], lb2[1][0] : lb2[1][1]] = (
covariance_matrices[ii].matrix
)
else:
if crossterm != {}:
cross_m = crossterm.get((lb1, lb2), None)
if cross_m is None:
cross_m = crossterm.get((lb2, lb1), None).T
elif crossterm != {}:
cross_m = crossterm.get((lb1, lb2), None)
if cross_m is None:
cross_m = crossterm.get((lb2, lb1), None).T

new_cm[lb1[1][0] : lb1[1][1], lb2[1][0] : lb2[1][1]] = cross_m
new_cm[lb1[1][0] : lb1[1][1], lb2[1][0] : lb2[1][1]] = cross_m
return CovarianceMatrix(new_cm, [OrderedDict(new_label)] * 2)

0 comments on commit 76d0193

Please sign in to comment.