-
Notifications
You must be signed in to change notification settings - Fork 1
Commit
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.
- Loading branch information
1 parent
6f6b0bf
commit 99391fe
Showing
3 changed files
with
195 additions
and
15 deletions.
There are no files selected for viewing
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,103 @@ | ||
import numpy as np | ||
import reciprocalspaceship as rs | ||
import scipy.optimize as opt | ||
from typing import Literal, overload | ||
|
||
|
||
# Overload: with inplace=True (the default), the scaling is applied to
# `dataset_to_scale` in place and nothing is returned.
@overload
def scale_structure_factors(
    reference: rs.DataSeries,
    dataset_to_scale: rs.DataSeries,
    inplace: Literal[True] = True
) -> None: ...
|
||
|
||
# Overload: with inplace=False, `dataset_to_scale` is left untouched and a new,
# scaled DataSeries is returned.
@overload
def scale_structure_factors(
    reference: rs.DataSeries,
    dataset_to_scale: rs.DataSeries,
    inplace: Literal[False] = False
) -> rs.DataSeries: ...
|
||
|
||
def scale_structure_factors(
    reference: rs.DataSeries,
    dataset_to_scale: rs.DataSeries,
    inplace: bool = True
) -> None | rs.DataSeries:
    """
    Apply an anisotropic scaling so that `dataset_to_scale` is on the same scale as `reference`.

        C * exp{ -(h**2 B11 + k**2 B22 + l**2 B33 +
                   2hk B12 + 2hl B13 + 2kl B23) }

    This is the same procedure implemented by CCP4's SCALEIT.

    Parameters:
        reference (rs.DataSeries): Single-column DataSeries to use as the reference for scaling.
        dataset_to_scale (rs.DataSeries): Single-column DataSeries to be scaled.
        inplace (bool): If `True`, modifies the original DataSeries. If `False`, returns a new
            scaled DataSeries.

    Returns:
        None if `inplace` is True, otherwise rs.DataSeries with scaled data.

    Raises:
        AssertionError: If the Miller indices of `reference` and `dataset_to_scale` differ.
    """

    def compute_scale_factors(params, miller_indices):
        # Evaluate C * exp(t) per reflection, with t the anisotropic exponent built
        # from the Miller indices (h, k, l). Shared by the residual function and the
        # final application so the formula exists in exactly one place.
        h, k, l = miller_indices[:, 0], miller_indices[:, 1], miller_indices[:, 2]  # noqa: E741
        t = -(
            np.square(h) * params[1]
            + np.square(k) * params[2]
            + np.square(l) * params[3]
            + 2 * h * k * params[4]
            + 2 * h * l * params[5]
            + 2 * k * l * params[6]
        )
        return params[0] * np.exp(t)

    def aniso_scale_func(params, x_ref, x_scale, miller_indices):
        # Residual between the reference and the currently-scaled dataset.
        return x_ref - compute_scale_factors(params, miller_indices) * x_scale

    reference_data = reference.to_numpy()
    scale_data = dataset_to_scale.to_numpy()

    # Convert the Miller indices to a 2D NumPy array if they are not already
    miller_indices_ref = np.array(list(reference.index))
    miller_indices_scale = np.array(list(dataset_to_scale.index))

    # Ensure that Miller indices match between reference and dataset_to_scale.
    # Raised explicitly rather than via `assert` so the check survives `python -O`;
    # AssertionError is kept so existing callers/tests catching it still work.
    if not np.array_equal(miller_indices_ref, miller_indices_scale):
        raise AssertionError(
            "Miller indices of reference and dataset_to_scale do not match."
        )

    # Initial guess for the parameters: [C, B11, B22, B33, B12, B13, B23]
    initial_params = np.array([1.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0], dtype=np.float32)

    # Optimize parameters to scale dataset_to_scale to reference
    result = opt.least_squares(
        aniso_scale_func,
        initial_params,
        args=(reference_data, scale_data, miller_indices_scale),
    )

    # Apply the optimized scaling to dataset_to_scale
    scaled_data = compute_scale_factors(result.x, miller_indices_scale) * scale_data

    if inplace:
        dataset_to_scale[:] = scaled_data
        return None
    else:
        scaled_dataset = dataset_to_scale.copy()
        scaled_dataset[:] = scaled_data
        return scaled_dataset
This file was deleted.
Oops, something went wrong.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,92 @@ | ||
import numpy as np | ||
import pandas as pd | ||
import pytest | ||
import reciprocalspaceship as rs | ||
|
||
from meteor import map_utils | ||
|
||
|
||
def generate_mock_dataset(miller_indices, data):
    """Build a DataSeries of `data` indexed by the given (H, K, L) Miller indices."""
    hkl_index = pd.MultiIndex.from_tuples(miller_indices, names=["H", "K", "L"])
    return rs.DataSeries(data, index=hkl_index)
|
||
|
||
@pytest.fixture
def identical_datasets():
    """Two DataSeries sharing the same Miller indices and identical values."""
    hkl = [(0, 0, 1), (1, 0, 0), (0, 1, 0), (1, 1, 1)]
    values = np.array([10.0, 20.0, 30.0, 40.0], dtype=np.float32)
    return (
        generate_mock_dataset(hkl, values),
        generate_mock_dataset(hkl, values),
    )
|
||
|
||
@pytest.fixture
def different_datasets():
    """A reference DataSeries and a second one with the same indices but different values."""
    hkl = [(0, 0, 1), (1, 0, 0), (0, 1, 0), (1, 1, 1)]
    reference_values = np.array([10.0, 20.0, 30.0, 40.0], dtype=np.float32)
    scale_values = np.array([15.0, 25.0, 35.0, 45.0], dtype=np.float32)
    return (
        generate_mock_dataset(hkl, reference_values),
        generate_mock_dataset(hkl, scale_values),
    )
|
||
|
||
@pytest.mark.parametrize("inplace", [True, False])
def test_scale_structure_factors_identical(identical_datasets, inplace):
    """Scaling a dataset against an identical reference should leave the values unchanged."""
    reference, dataset_to_scale = identical_datasets
    expected = dataset_to_scale.copy()

    returned = map_utils.scale_structure_factors(
        reference, dataset_to_scale, inplace=inplace
    )

    if inplace:
        np.testing.assert_array_almost_equal(
            dataset_to_scale.to_numpy(), expected.to_numpy()
        )
        assert returned is None
    else:
        np.testing.assert_array_almost_equal(
            returned.to_numpy(), expected.to_numpy()
        )
|
||
|
||
@pytest.mark.parametrize("inplace", [True, False])
def test_scale_structure_factors_different(different_datasets, inplace):
    """Scaling against a different reference must actually alter the data values."""
    reference, dataset_to_scale = different_datasets

    if inplace:
        before = dataset_to_scale.copy()
        map_utils.scale_structure_factors(
            reference, dataset_to_scale, inplace=inplace
        )
        assert not np.array_equal(
            before.to_numpy(), dataset_to_scale.to_numpy()
        )
    else:
        scaled = map_utils.scale_structure_factors(
            reference, dataset_to_scale, inplace=inplace
        )
        assert scaled is not None
        assert not np.array_equal(
            dataset_to_scale.to_numpy(), scaled.to_numpy()
        )
|
||
|
||
def test_miller_indices_mismatch():
    """A Miller-index mismatch between the two datasets must raise AssertionError."""
    hkl_reference = [(0, 0, 1), (1, 0, 0), (0, 1, 0), (1, 1, 1)]
    hkl_shifted = [(0, 0, 1), (1, 0, 0), (0, 1, 1), (1, 1, 1)]

    reference = generate_mock_dataset(
        hkl_reference, np.array([10.0, 20.0, 30.0, 40.0], dtype=np.float32)
    )
    dataset_to_scale = generate_mock_dataset(
        hkl_shifted, np.array([15.0, 25.0, 35.0, 45.0], dtype=np.float32)
    )

    with pytest.raises(
        AssertionError,
        match="Miller indices of reference and dataset_to_scale do not match.",
    ):
        map_utils.scale_structure_factors(reference, dataset_to_scale)
|
||
|
||
|