diff --git a/.github/workflows/lint.yml b/.github/workflows/lint.yml new file mode 100644 index 0000000..ac74403 --- /dev/null +++ b/.github/workflows/lint.yml @@ -0,0 +1,32 @@ +name: ruff + +on: [push] + +jobs: + build: + runs-on: ubuntu-latest + strategy: + matrix: + python-version: ["3.11"] + + steps: + - uses: actions/checkout@v4 + - name: Set up Python ${{ matrix.python-version }} + uses: actions/setup-python@v5 + with: + python-version: ${{ matrix.python-version }} + + - name: Install dependencies + run: | + python -m pip install --upgrade pip + + - name: Lint with Ruff + run: | + pip install ruff + ruff check --output-format=github . + + - name: Upload coverage to Codecov + uses: codecov/codecov-action@v3 + with: + file: ./coverage.xml + flags: unittests diff --git a/.github/workflows/mypy.yml b/.github/workflows/mypy.yml new file mode 100644 index 0000000..3a207d7 --- /dev/null +++ b/.github/workflows/mypy.yml @@ -0,0 +1,27 @@ +name: mypy + +on: [push] + +jobs: + build: + runs-on: ubuntu-latest + strategy: + matrix: + python-version: ["3.11"] + + steps: + - uses: actions/checkout@v4 + - name: Set up Python ${{ matrix.python-version }} + uses: actions/setup-python@v5 + with: + python-version: ${{ matrix.python-version }} + + - name: Install dependencies + run: | + python -m pip install --upgrade pip + + - name: mypy + run: | + pip install mypy + pip install . 
+ mypy --install-types --non-interactive meteor/ test/ diff --git a/.github/workflows/tests.yml b/.github/workflows/tests.yml new file mode 100644 index 0000000..87ef7f0 --- /dev/null +++ b/.github/workflows/tests.yml @@ -0,0 +1,33 @@ +name: tests + +on: [push] + +jobs: + build: + runs-on: ubuntu-latest + strategy: + matrix: + python-version: ["3.11"] + + steps: + - uses: actions/checkout@v4 + - name: Set up Python ${{ matrix.python-version }} + uses: actions/setup-python@v5 + with: + python-version: ${{ matrix.python-version }} + + - name: Install dependencies + run: | + python -m pip install --upgrade pip + + - name: Test with pytest + run: | + pip install pytest pytest-cov + pip install . + pytest test/ + + - name: Upload coverage to Codecov + uses: codecov/codecov-action@v3 + with: + file: ./coverage.xml + flags: unittests diff --git a/.gitignore b/.gitignore new file mode 100644 index 0000000..f06d0c1 --- /dev/null +++ b/.gitignore @@ -0,0 +1,163 @@ +# Byte-compiled / optimized / DLL files +__pycache__/ +*.py[cod] +*$py.class +*.ccp4 + +# C extensions +*.so + +# Distribution / packaging +.Python +build/ +develop-eggs/ +dist/ +downloads/ +eggs/ +.eggs/ +lib/ +lib64/ +parts/ +sdist/ +var/ +wheels/ +share/python-wheels/ +*.egg-info/ +.installed.cfg +*.egg +MANIFEST + +# PyInstaller +# Usually these files are written by a python script from a template +# before PyInstaller builds the exe, so as to inject date/other infos into it. 
+*.manifest +*.spec + +# Installer logs +pip-log.txt +pip-delete-this-directory.txt + +# Unit test / coverage reports +htmlcov/ +.tox/ +.nox/ +.coverage +.coverage.* +.cache +nosetests.xml +coverage.xml +*.cover +*.py,cover +.hypothesis/ +.pytest_cache/ +cover/ + +# Translations +*.mo +*.pot + +# Django stuff: +*.log +local_settings.py +db.sqlite3 +db.sqlite3-journal + +# Flask stuff: +instance/ +.webassets-cache + +# Scrapy stuff: +.scrapy + +# Sphinx documentation +docs/_build/ + +# PyBuilder +.pybuilder/ +target/ + +# Jupyter Notebook +.ipynb_checkpoints + +# IPython +profile_default/ +ipython_config.py + +# pyenv +# For a library or package, you might want to ignore these files since the code is +# intended to run in multiple environments; otherwise, check them in: +# .python-version + +# pipenv +# According to pypa/pipenv#598, it is recommended to include Pipfile.lock in version control. +# However, in case of collaboration, if having platform-specific dependencies or dependencies +# having no cross-platform support, pipenv may install dependencies that don't work, or not +# install all needed dependencies. +#Pipfile.lock + +# poetry +# Similar to Pipfile.lock, it is generally recommended to include poetry.lock in version control. +# This is especially recommended for binary packages to ensure reproducibility, and is more +# commonly ignored for libraries. +# https://python-poetry.org/docs/basic-usage/#commit-your-poetrylock-file-to-version-control +#poetry.lock + +# pdm +# Similar to Pipfile.lock, it is generally recommended to include pdm.lock in version control. +#pdm.lock +# pdm stores project-wide configurations in .pdm.toml, but it is recommended to not include it +# in version control. +# https://pdm.fming.dev/latest/usage/project/#working-with-version-control +.pdm.toml +.pdm-python +.pdm-build/ + +# PEP 582; used by e.g. 
github.com/David-OConnor/pyflow and github.com/pdm-project/pdm +__pypackages__/ + +# Celery stuff +celerybeat-schedule +celerybeat.pid + +# SageMath parsed files +*.sage.py + +# Environments +.env +.venv +env/ +venv/ +ENV/ +env.bak/ +venv.bak/ + +# Spyder project settings +.spyderproject +.spyproject + +# Rope project settings +.ropeproject + +# mkdocs documentation +/site + +# mypy +.mypy_cache/ +.dmypy.json +dmypy.json + +# Pyre type checker +.pyre/ + +# pytype static type analyzer +.pytype/ + +# Cython debug symbols +cython_debug/ + +# PyCharm +# JetBrains specific template is maintained in a separate JetBrains.gitignore that can +# be found at https://github.com/github/gitignore/blob/main/Global/JetBrains.gitignore +# and can be added to the global gitignore or merged into this file. For a more nuclear +# option (not recommended) you can uncomment the following to ignore the entire idea folder. +#.idea/ diff --git a/LICENSE b/LICENSE new file mode 100644 index 0000000..7861ca3 --- /dev/null +++ b/LICENSE @@ -0,0 +1,21 @@ +MIT License + +Copyright (c) 2024 Alisia Fadini and T. J. Lane + +Permission is hereby granted, free of charge, to any person obtaining a copy +of this software and associated documentation files (the "Software"), to deal +in the Software without restriction, including without limitation the rights +to use, copy, modify, merge, publish, distribute, sublicense, and/or sell +copies of the Software, and to permit persons to whom the Software is +furnished to do so, subject to the following conditions: + +The above copyright notice and this permission notice shall be included in all +copies or substantial portions of the Software. + +THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR +IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, +FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. 
IN NO EVENT SHALL THE +AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER +LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, +OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE +SOFTWARE. \ No newline at end of file diff --git a/meteor/TV_filtering/__init__.py b/meteor/TV_filtering/__init__.py deleted file mode 100755 index e69de29..0000000 diff --git a/meteor/TV_filtering/iterative_tv.py b/meteor/TV_filtering/iterative_tv.py deleted file mode 100755 index 967a49d..0000000 --- a/meteor/TV_filtering/iterative_tv.py +++ /dev/null @@ -1,403 +0,0 @@ -import argparse -from re import I -from tqdm import tqdm -import matplotlib.pyplot as plt -import numpy as np -import os - -import seaborn as sns - -sns.set_context("notebook", font_scale=1.8) - -from meteor import io -from meteor import dsutils -from meteor import maps -from meteor import tv - -""" - -Iterative TV algorithm to improve phase estimates for low occupancy species. -Writes out a difference map file (MTZ) with improved phases. - -""" - - -def parse_arguments(): - """Parse commandline arguments""" - parser = argparse.ArgumentParser( - formatter_class=argparse.RawTextHelpFormatter, description=__doc__ - ) - - # Required arguments - parser.add_argument( - "-mtz", - "--mtz", - nargs=6, - metavar=("mtz", "F_off", "F_on", "phi_col", "SIGF_off", "SIGF_on"), - required=True, - help=( - "MTZ to be used for initial map. Specified as (filename, F_off, F_on, Phi)" - ), - ) - - parser.add_argument( - "-ref", - "--refpdb", - nargs=1, - metavar=("pdb"), - required=True, - help=( - "PDB to be used as reference ('off') structure. " "Specified as (filename)." 
- ), - ) - - # Optional arguments - - parser.add_argument( - "-d_h", - "--highres", - type=float, - default=None, - help="If set, high res to truncate maps", - ) - - parser.add_argument( - "-fl", - "--flags", - type=str, - default=None, - help="If set, label for Rfree flags to use as test set", - ) - - parser.add_argument( - "-a", - "--alpha", - type=float, - default=0.0, - help="alpha value for computing difference map weights (default=0.0)", - ) - - parser.add_argument( - "-l", - "--lambda_tv", - type=float, - default=0.015, - help="lambda value for TV denoising weighting (default 0.015)", - ) - - parser.add_argument( - "--plot", - help="--plot for optional plotting and saving, --no-plot to skip", - action=argparse.BooleanOptionalAction, - ) - - return parser.parse_args() - - -def main(): - - # Map writing parameters - map_res = 4 - - # Parse commandline arguments - args = parse_arguments() - - path = os.path.split(args.mtz[0])[0] - name = os.path.split(args.mtz[0])[1].split(".")[0] - cell, space_group = io.get_pdbinfo(args.refpdb[0]) - - print( - "%%%%%%%%%% ANALYZING DATASET : {n} in {p} %%%%%%%%%%%".format(n=name, p=path) - ) - print("CELL : {}".format(cell)) - print("SPACEGROUP : {}".format(space_group)) - - # Apply resolution cut if specified - if args.highres is not None: - high_res = args.highres - else: - high_res = np.min(io.load_mtz(args.mtz[0]).compute_dHKL()["dHKL"]) - - # Read in mtz file - og_mtz = io.load_mtz(args.mtz[0]) - og_mtz = og_mtz.loc[og_mtz.compute_dHKL()["dHKL"] > high_res] - # og_mtz = og_mtz.loc[og_mtz.compute_dHKL()["dHKL"] < 15] - - # for ocp - # calc = io.load_mtz("ecn_dark_SFALL.mtz") - # common = og_mtz.index.intersection(calc.index).sort_values() - - # Use own R-free flags set if specified - if args.flags is not None: - og_mtz = og_mtz.loc[:, [args.mtz[1], args.mtz[2], args.mtz[3], args.flags]] - flags = og_mtz[args.flags] == 0 - - else: - og_mtz = og_mtz.loc[ - :, [args.mtz[1], args.mtz[2], args.mtz[3], args.mtz[4], args.mtz[5]] 
- ] - flags = np.random.binomial(1, 0.03, og_mtz[args.mtz[1]].shape[0]).astype(bool) - - # scale second dataset ('on') and the first ('off') to FCalcs, and calculate deltaFs (weighted or not) - # og_mtz["FC"] = calc.loc[common, "FC_ALL"] - og_mtz, ws = maps.find_w_diffs( - og_mtz, - args.mtz[2], - args.mtz[1], - args.mtz[5], - args.mtz[4], - args.refpdb[0], - high_res, - path, - args.alpha, - ) - - # in case of calculated structure factors: - # og_mtz["light-phis"] = mtr.load_mtz(args.mtz[0])["light-phis"] - - proj_mags = [] - entropies = [] - phase_changes = [] - cum_phase_changes = [] - - # in case of calculated structure factors: - # ph_err_corrs = [] - - N = 50 - l = args.lambda_tv - - with tqdm(total=N) as pbar: - for i in np.arange(N) + 1: - if i == 1: - new_amps, new_phases, proj_mag, entropy, phase_change, z = ( - tv.TV_iteration( - og_mtz, - "WDF", - args.mtz[3], - "scaled_on", - "scaled_off", - args.mtz[3], - map_res, - cell, - space_group, - flags, - l, - high_res, - ws, - ) - ) - cum_phase_change = np.abs( - np.array( - dsutils.positive_Fs( - og_mtz, args.mtz[3], "WDF", "phases-pos", "diffs-pos" - )["phases-pos"] - - new_phases - ) - ) - # ph_err_corr = np.abs(new_phases - np.array(mtr.positive_Fs(og_mtz, "light-phis", "diffs", "phases-pos", "diffs-pos")["phases-pos"])) - - cum_phase_change = dsutils.adjust_phi_interval(cum_phase_change) - # ph_err_corr = mtr.adjust_phi_interval(ph_err_corr) - - og_mtz["new_amps"] = new_amps - og_mtz["new_amps"] = og_mtz["new_amps"].astype("SFAmplitude") - og_mtz["new_phases"] = new_phases - og_mtz["new_phases"] = og_mtz["new_phases"].astype("Phase") - og_mtz.write_mtz("{name}_TVit{i}_{l}.mtz".format(name=name, i=i, l=l)) - - else: - new_amps, new_phases, proj_mag, entropy, phase_change, z = ( - tv.TV_iteration( - og_mtz, - "new_amps", - "new_phases", - "scaled_on", - "scaled_off", - args.mtz[3], - map_res, - cell, - space_group, - flags, - l, - high_res, - ws, - ) - ) - cum_phase_change = np.abs( - np.array( - 
dsutils.positive_Fs( - og_mtz, args.mtz[3], "WDF", "phases-pos", "diffs-pos" - )["phases-pos"] - - new_phases - ) - ) - # ph_err_corr = np.abs(new_phases - np.array(mtr.positive_Fs(og_mtz, "light-phis", "diffs", "phases-pos", "diffs-pos")["phases-pos"])) - - cum_phase_change = dsutils.adjust_phi_interval(cum_phase_change) - - og_mtz["new_amps"] = new_amps - og_mtz["new_amps"] = og_mtz["new_amps"].astype("SFAmplitude") - og_mtz["new_phases"] = new_phases - og_mtz["new_phases"] = og_mtz["new_phases"].astype("Phase") - - og_mtz["new-light-phi"] = z - og_mtz["new-light-phi"] = og_mtz["new-light-phi"].astype("Phase") - - og_mtz.write_mtz("{name}_TVit{i}_{l}.mtz".format(name=name, i=i, l=l)) - - # Track projection magnitude and phase change for each iteration - proj_mags.append(proj_mag) - entropies.append(entropy) - phase_changes.append(phase_change) - cum_phase_changes.append(cum_phase_change) - # ph_err_corrs.append(ph_err_corr) - pbar.update() - - # np.save("{name}_TVit{i}_{l}-Z-mags.npy".format(name=name, i=i, l=l), z) - print("FINAL ENTROPY VALUE ", entropies[-1]) - np.save("{name}_TVit{i}_{l}-entropies.npy".format(name=name, i=i, l=l), entropies) - - print("SAVING FINAL MAP") - - og_mtz["new_amps"] = new_amps * ws - - finmap = dsutils.map_from_Fs(og_mtz, "new_amps", "new_phases", map_res) - - from meteor import validate - - fin_map_afterkweight = validate.negentropy(np.array(finmap.grid).flatten()) - print("FINAL ENTROPY AFTER K WEIGHTING ", fin_map_afterkweight) - finmap.write_ccp4_map("{name}_TVit{i}_{l}.ccp4".format(name=name, i=i, l=l)) - fin_TV_map, fin_ent_TV = tv.TV_filter( - finmap, l, finmap.grid.shape, cell, space_group - ) - print("FINAL TV FILTERED ENTROPY VALUE", fin_ent_TV) - mtz_finTV = dsutils.from_gemmi(io.map2mtz(fin_TV_map, high_res)) - mtz_finTV.write_mtz( - "{name}_TVit{i}_{l}_finalTVfiltered.mtz".format(name=name, i=i, l=l) - ) - - # Optionally plot result for errors and negentropy - if args.plot is True: - - fig, ax1 = 
plt.subplots(figsize=(10, 4)) - - color = "black" - ax1.set_title("$\lambda$ = {}".format(l)) - ax1.set_xlabel(r"Iteration") - ax1.set_ylabel(r"TV Projection Magnitude (TV$_\mathrm{proj}$)", color=color) - ax1.plot( - np.arange(N - 1), - np.array( - np.mean(np.array(proj_mags), axis=1) - / np.max(np.mean(np.array(proj_mags), axis=1)) - )[1:], - color=color, - linewidth=5, - ) - ax1.tick_params(axis="y", labelcolor=color) - - ax2 = ax1.twinx() - - color = "silver" - ax2.set_ylabel("Negentropy", color="grey") - ax2.plot(np.arange(N - 1), entropies[1:], color=color, linewidth=5) - ax2.tick_params(axis="y", labelcolor="grey") - # ax2.set_xlim(0,0.11) - plt.tight_layout() - fig.savefig("{p}/{n}non-cum-error-TV.png".format(p=path, n=name)) - # np.save("/Users/alisia/Desktop/negentropy.npy", entropies[1:]) - - fig, ax1 = plt.subplots(figsize=(10, 4)) - - color = "tomato" - ax1.set_title("$\lambda$ = {}".format(l)) - ax1.set_xlabel(r"Iteration") - ax1.set_ylabel( - " Iteration < $\phi_\mathrm{c}$ - $\phi_\mathrm{TV}$ > ($^\circ$)" - ) - ax1.plot(np.arange(N), phase_changes, color=color, linewidth=5) - ax1.tick_params(axis="y") - fig.set_tight_layout(True) - fig.savefig("{p}/{n}non-cum-phi-change.png".format(p=path, n=name)) - # np.save("/Users/alisia/Desktop/non-cum-phi-change.npy", phase_changes) - - fig, ax = plt.subplots(figsize=(10, 5)) - ax.set_title("$\lambda$ = {}".format(l)) - ax.set_xlabel(r"Iteration") - ax.set_ylabel( - r"Cumulative < $\phi_\mathrm{c}$ - $\phi_\mathrm{TV}$ > ($^\circ$)" - ) - ax.plot( - np.arange(N), - np.mean(cum_phase_changes, axis=1), - color="orangered", - linewidth=5, - ) - # ax.plot(np.arange(N), np.mean(ph_err_corrs, axis=1), color='orangered', linewidth=5, linestyle='--') - plt.tight_layout() - fig.savefig("{p}/{n}cum-phi-change.png".format(p=path, n=name)) - # np.save( - # "/Users/alisia/Desktop/cum-phi-change.npy", - # np.mean(cum_phase_changes, axis=1), - # ) - - fig, ax = plt.subplots(figsize=(6, 5)) - ax.set_title("$\lambda$ = 
{}".format(l)) - ax.set_xlabel(r"1/dHKL (${\AA}^{-1}$)") - ax.set_ylabel(r"TV Projection Magnitude (TV$_\mathrm{proj}$)") - ax.scatter( - 1 / og_mtz.compute_dHKL()["dHKL"][flags], - proj_mags[N - 1], - color="black", - alpha=0.5, - ) - - res_mean, data_mean = dsutils.resolution_shells( - proj_mags[N - 1], 1 / og_mtz.compute_dHKL()["dHKL"][flags], 15 - ) - ax.plot(res_mean, data_mean, linewidth=3, linestyle="--", color="orangered") - plt.tight_layout() - fig.savefig("{p}/{n}tv-err-dhkl.png".format(p=path, n=name)) - - fig, ax = plt.subplots(figsize=(6, 5)) - ax.set_title("$\lambda$ = {}".format(l)) - ax.set_xlabel(r"1/dHKL (${\AA}^{-1}$)") - ax.set_ylabel( - r"Cumulative < $\phi_\mathrm{c}$ - $\phi_\mathrm{TV}$ > ($^\circ$)" - ) - ax.scatter( - 1 / og_mtz.compute_dHKL()["dHKL"], - np.abs(cum_phase_changes[N - 1]), - color="orangered", - alpha=0.05, - ) - # ax.scatter(1/og_mtz.compute_dHKL()["dHKL"], np.abs(ph_err_corrs[N-1]), color='blue', alpha=0.5) - - res_mean, data_mean = dsutils.resolution_shells( - np.abs(cum_phase_changes[N - 1]), 1 / og_mtz.compute_dHKL()["dHKL"], 15 - ) - ax.plot(res_mean, data_mean, linewidth=3, linestyle="--", color="black") - plt.tight_layout() - fig.savefig("{p}/{n}cum-phase-change-dhkl.png".format(p=path, n=name)) - - fig, ax = plt.subplots(figsize=(6, 5)) - ax.set_title("$\lambda$ = {}".format(l)) - ax.set_xlabel(r"|Fobs|") - ax.set_ylabel( - r"Cumulative < $\phi_\mathrm{c}$ - $\phi_\mathrm{TV}$ > ($^\circ$)" - ) - ax.scatter( - og_mtz["WDF"], - np.abs(cum_phase_changes[N - 1]), - color="mediumpurple", - alpha=0.5, - ) - plt.tight_layout() - fig.savefig("{p}/{n}cum-phase-change-Fobs.png".format(p=path, n=name)) - - print("DONE") - - -if __name__ == "__main__": - main() diff --git a/meteor/TV_filtering/tv_denoise_map.py b/meteor/TV_filtering/tv_denoise_map.py deleted file mode 100755 index b90c001..0000000 --- a/meteor/TV_filtering/tv_denoise_map.py +++ /dev/null @@ -1,303 +0,0 @@ -import argparse -import matplotlib.pyplot as plt 
-import numpy as np -import os - -import seaborn as sns - -sns.set_context("notebook", font_scale=1.8) - -from meteor import io -from meteor import dsutils -from meteor import tv - - -""" - -Apply a total variation (TV) filter to a map. -The level of filtering (determined by the regularization parameter lambda) -is chosen so as to minimize the error between denoised and measured amplitudes or maximize the map negentropy -for a free set ('test' set) of reflections. - -Write two denoised map file (MTZ), one for each optimal parameter, and optionally the plot/save the plot from the lambda determination (PNG). - -""" - - -def parse_arguments(): - """Parse commandline arguments""" - parser = argparse.ArgumentParser( - formatter_class=argparse.RawTextHelpFormatter, description=__doc__ - ) - - # Required arguments - parser.add_argument( - "-mtz", - "--mtz", - nargs=3, - metavar=("mtz", "F_col", "phi_col"), - required=True, - help=("MTZ to be used for initial map. Specified as (filename, F, Phi)"), - ) - - parser.add_argument( - "-ref", - "--refpdb", - nargs=1, - metavar=("pdb"), - required=True, - help=( - "PDB to be used as reference ('off') structure. " "Specified as (filename)." 
- ), - ) - - # Optional arguments - - parser.add_argument( - "-d_h", - "--highres", - type=float, - default=None, - help="If set, high res to truncate maps", - ) - - parser.add_argument( - "-fl", - "--flags", - type=str, - default=None, - help="If set, label for Rfree flags to use as test set", - ) - - parser.add_argument( - "--plot", - help="--plot for optional plotting and saving, --no-plot to skip", - action=argparse.BooleanOptionalAction, - ) - - return parser.parse_args() - - -def main(): - np.random.seed(42) - - # Map writing parameters - map_res = 4 - - # Parse commandline arguments - args = parse_arguments() - - path = os.path.split(args.mtz[0])[0] - name = os.path.split(args.mtz[0])[1].split(".")[0] - cell, space_group = io.get_pdbinfo(args.refpdb[0]) - - print( - "%%%%%%%%%% ANALYZING DATASET : {n} in {p} %%%%%%%%%%%".format(n=name, p=path) - ) - print("CELL : {}".format(cell)) - print("SPACEGROUP : {}".format(space_group)) - - # Apply resolution cut if specified - if args.highres is not None: - high_res = args.highres - else: - high_res = np.min(io.load_mtz(args.mtz[0]).compute_dHKL()["dHKL"]) - - # Use own R-free flags set if specified - if args.flags is not None: - og_mtz = io.subset_to_FandPhi( - *args.mtz, {args.mtz[1]: "F", args.mtz[2]: "Phi"}, args.flags - ).dropna() - ( - TVmap_best_err, - TVmap_best_entr, - lambda_best_err, - lambda_best_entr, - errors, - entropies, - amp_change, - ph_change, - ) = tv.find_TVmap( - og_mtz, "F", "Phi", name, path, map_res, cell, space_group, flags=args.flags - ) - - else: - # Read in mtz file - og_mtz = io.subset_to_FandPhi( - *args.mtz, {args.mtz[1]: "F", args.mtz[2]: "Phi"} - ).dropna() - - og_mtz = og_mtz.compute_dHKL() - # og_mtz = og_mtz[og_mtz["dHKL"] < 10] - og_mtz = og_mtz.loc[og_mtz.compute_dHKL()["dHKL"] > high_res] - - # Find and save denoised maps that (1) minimizes the map error or (2) maximizes the map negentropy - ( - TVmap_best_err, - TVmap_best_entr, - lambda_best_err, - lambda_best_entr, - 
errors, - entropies, - amp_change, - ph_change, - ) = tv.find_TVmap(og_mtz, "F", "Phi", name, path, map_res, cell, space_group) - - print("high res", high_res) - - io.map2mtzfile( - TVmap_best_err, - "{n}_TV_{l}_besterror.mtz".format( - n=name, l=np.round(lambda_best_err, decimals=3) - ), - high_res, - ) - io.map2mtzfile( - TVmap_best_entr, - "{n}_TV_{l}_bestentropy.mtz".format( - n=name, l=np.round(lambda_best_entr, decimals=3) - ), - high_res, - ) - - print( - "Writing out TV denoised map with weights={lerr} and {lentr}".format( - lerr=np.round(lambda_best_err, decimals=3), - lentr=np.round(lambda_best_entr, decimals=3), - ) - ) - - # finmap = dsutils.map_from_Fs(TVmap_best_entr, "FWT", "PHWT", 8) - # TVmap_best_entr.write_ccp4_map("{p}{n}_TV_0.67map.ccp4".format(p=path, n=name)) - - # Optionally plot result for errors and negentropy - if args.plot is True: - fig, ax1 = plt.subplots(figsize=(10, 5)) - - color = "black" - ax1.set_xlabel(r"$\lambda$") - ax1.set_ylabel( - r"$\sum (\Delta \mathrm{Fobs}_\mathrm{free} - \Delta \mathrm{Fcalc}_\mathrm{free})^2$", - color=color, - ) - ax1.plot( - np.linspace(1e-8, 0.1, len(errors)), - errors / np.max(errors), - color=color, - linewidth=5, - ) - ax1.tick_params(axis="y", labelcolor=color) - - ax2 = ax1.twinx() - - color = "silver" - ax2.set_ylabel("Negentropy", color="grey") - ax2.plot( - np.linspace(1e-8, 0.1, len(entropies)), entropies, color=color, linewidth=5 - ) - ax2.tick_params(axis="y", labelcolor="grey") - - fig.tight_layout() - fig.savefig("{p}{n}lambda-optimize.png".format(p=path, n=name)) - - print("Saving weight scan plot") - - fig, ax = plt.subplots(figsize=(9, 5)) - ax.set_xlabel(r"$\lambda$") - ax.set_ylabel(r" < | |F$_\mathrm{obs}$| - |F$_\mathrm{TV}$| | > ") - ax.plot( - np.linspace(1e-8, 0.1, len(entropies)), - np.mean(np.abs(amp_change), axis=1), - color="indigo", - linewidth=5, - ) - fig.tight_layout() - fig.savefig("{p}{n}F-change.png".format(p=path, n=name)) - - fig, ax = plt.subplots(figsize=(9, 5)) 
- ax.set_xlabel(r"$\lambda$") - ax.set_ylabel(r"< | $\phi_\mathrm{obs}$ - $\phi_\mathrm{TV}$ | > ($^\circ$)") - ax.plot( - np.linspace(1e-8, 0.1, len(entropies)), - np.mean(np.abs(ph_change), axis=1), - color="orangered", - linewidth=5, - ) - ax.axhline(y=19.52, linewidth=2, linestyle="--", color="orangered") - fig.tight_layout() - fig.savefig("{p}{n}phi-change.png".format(p=path, n=name)) - - fig, ax = plt.subplots(figsize=(9, 5)) - # ax.scatter(1/og_mtz.compute_dHKL()["dHKL"], np.abs(ph_change[np.argmin(errors)]), label="Best Error Map, mean = {}".format(np.round(np.mean(ph_change[np.argmin(errors)])), decimals=2), color='black', alpha=0.5) - # ax.scatter(1/og_mtz.compute_dHKL()["dHKL"], np.abs(ph_change[np.argmax(entropies)]), label="Best Entropy Map, mean = {}".format(np.round(np.mean(ph_change[np.argmax(entropies)])), decimals=2), color='grey', alpha=0.5) - - res_mean, data_mean = dsutils.resolution_shells( - np.abs(ph_change[np.argmin(errors)]), 1 / og_mtz.compute_dHKL()["dHKL"], 20 - ) - # ax.plot( - # res_mean, - # data_mean, - # linewidth=3, - # linestyle="--", - # color="black", - # label="Best Error", - # ) - res_mean, data_mean = dsutils.resolution_shells( - np.abs(ph_change[np.argmax(entropies)]), - 1 / og_mtz.compute_dHKL()["dHKL"], - 20, - ) - ax.plot( - res_mean, - data_mean, - linewidth=3, - linestyle="--", - color="darkgray", - label="Best Negentropy", - ) - ax.set_ylabel(r"| $\phi_\mathrm{obs}$ - $\phi_\mathrm{TV}$ | ($^\circ$)") - ax.set_xlabel(r"1/dHKL (${\AA}^{-1}$)") - fig.tight_layout() - fig.legend(handlelength=0.6, loc="center right") - fig.savefig("{p}{n}phi-change-dhkl.png".format(p=path, n=name)) - - fig, ax = plt.subplots(figsize=(9, 5)) - # ax.scatter(1/og_mtz.compute_dHKL()["dHKL"], np.abs(amp_change[np.argmin(errors)]), label="Best Error Map, mean = {}".format(np.round(np.mean(amp_change[np.argmin(errors)])), decimals=2), color='black', alpha=0.5) - # ax.scatter(1/og_mtz.compute_dHKL()["dHKL"], 
np.abs(amp_change[np.argmax(entropies)]), label="Best Entropy Map, mean = {}".format(np.round(np.mean(amp_change[np.argmax(entropies)])), decimals=2), color='darkgray', alpha=0.5) - - res_mean, data_mean = dsutils.resolution_shells( - np.abs(amp_change[np.argmin(errors)]), 1 / og_mtz.compute_dHKL()["dHKL"], 15 - ) - # ax.plot( - # res_mean, - # data_mean, - # linewidth=3, - # linestyle="--", - # color="black", - # label="Best Error", - # ) - res_mean, data_mean = dsutils.resolution_shells( - np.abs(amp_change[np.argmax(entropies)]), - 1 / og_mtz.compute_dHKL()["dHKL"], - 15, - ) - ax.plot( - res_mean, - data_mean, - linewidth=3, - linestyle="--", - color="darkgray", - label="Best Negentropy", - ) - ax.set_ylabel(r"| |F$_\mathrm{obs}$| - |F$_\mathrm{TV}$| |") - ax.set_xlabel(r"1/dHKL (${\AA}^{-1}$)") - fig.tight_layout() - fig.legend(handlelength=0.6, loc="center right") - fig.savefig("{p}{n}amp-change-dhkl.png".format(p=path, n=name)) - - # plt.show() - print("DONE.") - - -if __name__ == "__main__": - main() diff --git a/meteor/background_subtraction/__init__.py b/meteor/background_subtraction/__init__.py deleted file mode 100755 index e69de29..0000000 diff --git a/meteor/background_subtraction/make_Nbgmax_map.py b/meteor/background_subtraction/make_Nbgmax_map.py deleted file mode 100755 index 5096e87..0000000 --- a/meteor/background_subtraction/make_Nbgmax_map.py +++ /dev/null @@ -1,201 +0,0 @@ -import argparse -import numpy as np -import matplotlib.pyplot as plt -import os - -import reciprocalspaceship as rs -import seaborn as sns -from tqdm import tqdm -sns.set_context("notebook", font_scale=1.4) - -from meteor import io -from meteor import dsutils -from meteor import maps -from meteor import validate - - -""" -Make a background subtracted map with an optimal Nbg value (Nbg_max). -A local region of interest for the background subtraction needs to be specified. 
-The naming convention chosen for inputs is 'on' and 'off', such -that the generated difference map will be |F_on| - Nbg_max x |F_off|. -Phases come from a reference structure ('off' state). - -Write a background subtracted map file (MTZ) and optionally plot/save the plot from the Nbg_max determination (PNG). - -""" - -def parse_arguments(): - """Parse commandline arguments""" - parser = argparse.ArgumentParser( - formatter_class=argparse.RawTextHelpFormatter, description=__doc__ - ) - - # Required arguments - parser.add_argument( - "-on", - "--onmtz", - nargs=3, - metavar=("mtz", "data_col", "sig_col"), - required=True, - help=("MTZ to be used as 'on' data. Specified as (filename, F, SigF)"), - ) - - parser.add_argument( - "-off", - "--offmtz", - nargs=3, - metavar=("mtz", "data_col", "sig_col"), - required=True, - help=("MTZ to be used as 'off' data. Specified as (filename, F, SigF)"), - ) - - parser.add_argument( - "-ref", - "--refpdb", - nargs=1, - metavar=("pdb"), - required=True, - help=( - "PDB to be used as reference ('off') structure. " - "Specified as (filename)." - ), - ) - - parser.add_argument( - "-c", - "--center", - nargs=3, - metavar=("x", "y", "z"), - required=True, - help=( - "XYZ coordinates in PDB for the region of interest. " - "Specified as (x, y, z)." 
- ), - ) - - # Optional arguments - - parser.add_argument( - "-a", - "--alpha", - type=float, - default=0.0, - help="alpha value for computing difference map weights (default=0.0)", - ) - - parser.add_argument( - "-d_h", - "--highres", - type=float, - default=None, - help="If set, high res to truncate maps", - ) - - parser.add_argument( - "-r", - "--radius", - type=float, - default=5.0, - help="Local region radius for background subtraction", - ) - - parser.add_argument("--plot", help="--plot for optional plotting and saving, --no-plot to skip", action=argparse.BooleanOptionalAction) - - - return parser.parse_args() - -def main(): - - #Map writing parameters - map_res = 4 - - # Parse commandline arguments - args = parse_arguments() - - path = os.path.split(args.onmtz[0])[0] - name = os.path.split(args.onmtz[0])[1].split('.')[0].split('/')[-1] - - onmtz = io.subset_to_FSigF(*args.onmtz, {args.onmtz[1]: "F", args.onmtz[2]: "SIGF"}) - offmtz = io.subset_to_FSigF(*args.offmtz, {args.offmtz[1]: "F", args.offmtz[2]: "SIGF"}) - cell, space_group = io.get_pdbinfo(args.refpdb[0]) - calc = io.get_Fcalcs(args.refpdb[0], np.min(onmtz.compute_dHKL()["dHKL"]), path) - #calc = io.load_mtz("./7ar6_SFALL.mtz") - - # Join all data - alldata = onmtz.merge(offmtz, on=["H", "K", "L"], suffixes=("_on", "_off")).dropna() - common = alldata.index.intersection(calc.index).sort_values() - alldata = alldata.loc[common].compute_dHKL() - alldata["PHIC"] = calc.loc[common, "PHIC"] - alldata["FC"] = calc.loc[common, "FC"] - - #alldata["PHIC"] = calc.loc[common, "PHIC_ALL"] - #alldata["FC"] = calc.loc[common, "FC_ALL"] - - # Scale both to FCalcs and write differences - if args.highres is not None: - alldata = alldata.loc[alldata["dHKL"] > args.highres] - spacing = args.highres / map_res - h_res = args.highres - - # Screen different background subtraction values for maximum correlation difference - h_res = np.min(alldata["dHKL"]) - spacing = h_res / map_res - Nbgs = np.linspace(0,1, 100) - - 
CC_diffs = [] - CC_locs = [] - CC_globs = [] - - calc_map = dsutils.map_from_Fs(alldata, "FC" , "PHIC", map_res) #map to use as reference state - - for Nbg in tqdm(Nbgs) : - - alldata, weights = maps.find_w_diffs(alldata, "F_on", "F_off", "SIGF_on", "SIGF_off", args.refpdb[0], h_res, path, args.alpha, 1-Nbg) - Nbg_map = dsutils.map_from_Fs(alldata, "WDF", "PHIC", map_res) - - CC_diff, CC_loc, CC_glob = validate.get_corrdiff(Nbg_map, calc_map, np.array(args.center).astype(float), args.radius, args.refpdb[0], cell, spacing) - CC_diffs.append(CC_diff) - CC_locs.append(CC_loc) - CC_globs.append(CC_glob) - alldata.drop(columns=["WDF"]) - - if args.plot is True: - - #Plot and find Nbg that maximizes this difference - fig, ax = plt.subplots(2,1, figsize=(8,5.5), tight_layout=True) - - ax[0].plot(Nbgs, CC_locs, color='mediumaquamarine', label=r'R$_\mathrm{loc}$', linewidth=3) - ax[0].plot(Nbgs, CC_globs, color='lightskyblue', label=r'R$_\mathrm{glob}$', linewidth=3) - - ax[1].plot(Nbgs, CC_diffs, color='silver', linestyle= 'dashed', label=r'R$_\mathrm{glob}$ - R$_\mathrm{loc}$', linewidth=3) - ax[1].vlines(Nbgs[np.argmax(CC_diffs)], 0.22, 0, 'r', linestyle= 'dashed', linewidth=2.5, label='Max={}'.format(np.round(Nbgs[np.argmax(CC_diffs)], decimals=3))) - - #ax[0].set_title('{}'.format(name), fontsize=17) - ax[0].set_title("Reciprocal Space Weighted Amplitude Extrapolation") - #ax[0].set_xlabel('N$_{\mathrm{bg}}$', fontsize=17) - ax[0].set_xlabel(r'$\alpha$') - ax[0].set_ylabel('CC') - #ax[1].set_xlabel('N$_{\mathrm{bg}}$', fontsize=17) - ax[1].set_xlabel(r'$\alpha$') - ax[1].set_ylabel('CC Difference') - ax[0].legend(fontsize=17) - ax[1].legend(fontsize=17) - fig.savefig("{p}{n}_plotCCdiff.png".format(p=path, n=name)) - print("Saved {p}{n}_plotCCdiff.png".format(p=path, n=name)) - - - #Save map with optimal Nbg - - alldata["DF-Nbgmax"] = weights * (alldata["scaled_on"] - (1 - Nbgs[np.argmax(CC_diffs)]) * alldata["scaled_off"]) - #alldata["DF-Nbgmax"] = weights * 
(alldata["scaled_on"] - Nbgs[94] * alldata["scaled_off"]) - alldata["DF-Nbgmax"] = alldata["DF-Nbgmax"].astype("SFAmplitude") - alldata.write_mtz("{p}{n}_Nbgmax.mtz".format(p=path, n=name)) - print("Wrote {p}{n}_Nbgmax.mtz with Nbg of {N}".format(p=path, n=name, N=np.round(Nbgs[np.argmax(CC_diffs)], decimals=3))) - print("SAVING FINAL MAP") - finmap = dsutils.map_from_Fs(alldata, "DF-Nbgmax", "PHIC", 6) - finmap.write_ccp4_map("{p}{n}_Nbgmax_alpha{a}.ccp4".format(p=path, n=name, a=args.alpha)) - -if __name__ == "__main__": - main() - diff --git a/meteor/background_subtraction/make_diffmap.py b/meteor/background_subtraction/make_diffmap.py deleted file mode 100755 index 8a315af..0000000 --- a/meteor/background_subtraction/make_diffmap.py +++ /dev/null @@ -1,155 +0,0 @@ -import argparse -import numpy as np -import matplotlib.pyplot as plt -import os - -import reciprocalspaceship as rs -import seaborn as sns -from tqdm import tqdm - -sns.set_context("notebook", font_scale=1.4) - -from meteor import io -from meteor import dsutils -from meteor import maps -from meteor import validate - - -""" -Make a background subtracted map with an optimal Nbg value (Nbg_max). -A local region of interest for the background subtraction needs to be specified. -The naming convention chosen for inputs is 'on' and 'off', such -that the generated difference map will be |F_on| - Nbg_max x |F_off|. -Phases come from a reference structure ('off' state). - -Write a background subtracted map file (MTZ) and optionally plot/save the plot from the Nbg_max determination (PNG). - -""" - - -def parse_arguments(): - """Parse commandline arguments""" - parser = argparse.ArgumentParser( - formatter_class=argparse.RawTextHelpFormatter, description=__doc__ - ) - - # Required arguments - parser.add_argument( - "-on", - "--onmtz", - nargs=3, - metavar=("mtz", "data_col", "sig_col"), - required=True, - help=("MTZ to be used as 'on' data. 
Specified as (filename, F, SigF)"), - ) - - parser.add_argument( - "-off", - "--offmtz", - nargs=3, - metavar=("mtz", "data_col", "sig_col"), - required=True, - help=("MTZ to be used as 'off' data. Specified as (filename, F, SigF)"), - ) - - parser.add_argument( - "-ref", - "--refpdb", - nargs=1, - metavar=("pdb"), - required=True, - help=( - "PDB to be used as reference ('off') structure. " "Specified as (filename)." - ), - ) - - # Optional arguments - - parser.add_argument( - "-a", - "--alpha", - type=float, - default=0.0, - help="alpha value for computing difference map weights (default=0.0)", - ) - - parser.add_argument( - "-d_h", - "--highres", - type=float, - default=None, - help="If set, high res to truncate maps", - ) - - return parser.parse_args() - - -def main(): - # Map writing parameters - map_res = 4 - - # Parse commandline arguments - args = parse_arguments() - - path = os.path.split(args.onmtz[0])[0] - name = os.path.split(args.onmtz[0])[1].split(".")[0].split("/")[-1] - - onmtz = io.subset_to_FSigF(*args.onmtz, {args.onmtz[1]: "F", args.onmtz[2]: "SIGF"}) - offmtz = io.subset_to_FSigF( - *args.offmtz, {args.offmtz[1]: "F", args.offmtz[2]: "SIGF"} - ) - cell, space_group = io.get_pdbinfo(args.refpdb[0]) - # calc = io.get_Fcalcs(args.refpdb[0], np.min(onmtz.compute_dHKL()["dHKL"]), path) - - calc = io.load_mtz("can/dark_files/can_dark_SFALL.mtz") - - # Join all data - alldata = onmtz.merge(offmtz, on=["H", "K", "L"], suffixes=("_on", "_off")).dropna() - common = alldata.index.intersection(calc.index).sort_values() - alldata = alldata.loc[common].compute_dHKL() - alldata["PHIC"] = calc.loc[common, "PHIC_ALL"] - alldata["FC"] = calc.loc[common, "FC_ALL"] - # alldata["PHIC"] = calc.loc[common, "PHIC"] - # alldata["FC"] = calc.loc[common, "FC"] - - # Scale both to FCalcs and write differences - if args.highres is not None: - alldata = alldata.loc[alldata["dHKL"] > args.highres] - h_res = args.highres - - else: - h_res = np.min(alldata["dHKL"]) - - 
alldata, weights = maps.find_w_diffs( - alldata, - "F_on", - "F_off", - "SIGF_on", - "SIGF_off", - args.refpdb[0], - h_res, - path, - args.alpha, - 1.0, - ) - - # Save map with optimal Nbg - alldata.write_mtz( - "{p}/{n}_diffmap_{alpha:.2f}_{hres:.2f}.mtz".format( - p=path, n=name, alpha=args.alpha, hres=h_res - ) - ) - print( - "Wrote {p}/{n}_diffmap_{alpha:.2f}_{hres:.2f}.mtz".format( - p=path, n=name, alpha=args.alpha, hres=h_res - ) - ) - print("SAVING FINAL MAP") - # finmap = dsutils.map_from_Fs(alldata, "WDF", "PHIC", 6) - # finmap.write_ccp4_map( - # "{p}/{n}_{alpha:.2f}_diffmap.ccp4".format(p=path, n=name, alpha=args.alpha) - # ) - - -if __name__ == "__main__": - main() diff --git a/meteor/background_subtraction/screen_alphas_maps.py b/meteor/background_subtraction/screen_alphas_maps.py deleted file mode 100755 index 54b05fc..0000000 --- a/meteor/background_subtraction/screen_alphas_maps.py +++ /dev/null @@ -1,163 +0,0 @@ -import argparse -import numpy as np -import matplotlib.pyplot as plt -import os - -import reciprocalspaceship as rs -import seaborn as sns -from tqdm import tqdm - -sns.set_context("notebook", font_scale=1.4) - -from meteor import io -from meteor import dsutils -from meteor import maps -from meteor import validate - - -def parse_arguments(): - """Parse commandline arguments""" - parser = argparse.ArgumentParser( - formatter_class=argparse.RawTextHelpFormatter, description=__doc__ - ) - - # Required arguments - parser.add_argument( - "-on", - "--onmtz", - nargs=3, - metavar=("mtz", "data_col", "sig_col"), - required=True, - help=("MTZ to be used as 'on' data. Specified as (filename, F, SigF)"), - ) - - parser.add_argument( - "-off", - "--offmtz", - nargs=3, - metavar=("mtz", "data_col", "sig_col"), - required=True, - help=("MTZ to be used as 'off' data. 
Specified as (filename, F, SigF)"), - ) - - parser.add_argument( - "-ref", - "--refpdb", - nargs=1, - metavar=("pdb"), - required=True, - help=( - "PDB to be used as reference ('off') structure. " "Specified as (filename)." - ), - ) - - parser.add_argument( - "-c", - "--center", - nargs=3, - metavar=("x", "y", "z"), - required=True, - help=( - "XYZ coordinates in PDB for the region of interest. " - "Specified as (x, y, z)." - ), - ) - - # Optional arguments - - parser.add_argument( - "-d_h", - "--highres", - type=float, - default=None, - help="If set, high res to truncate maps", - ) - - parser.add_argument( - "--plot", - help="--plot for optional plotting and saving, --no-plot to skip", - action=argparse.BooleanOptionalAction, - ) - - return parser.parse_args() - - -def main(): - # Parse commandline arguments - map_res = 4 - - args = parse_arguments() - - path = os.path.split(args.onmtz[0])[0] - name = os.path.split(args.onmtz[0])[1].split(".")[0].split("/")[-1] - cell, space_group = io.get_pdbinfo(args.refpdb[0]) - - onmtz = io.subset_to_FSigF(*args.onmtz, {args.onmtz[1]: "F", args.onmtz[2]: "SIGF"}) - offmtz = io.subset_to_FSigF( - *args.offmtz, {args.offmtz[1]: "F", args.offmtz[2]: "SIGF"} - ) - # calc = io.get_Fcalcs(args.refpdb[0], np.min(onmtz.compute_dHKL()["dHKL"]), path) - calc = io.load_mtz( - "/Users/alisia/Desktop/sept29_files_for_maps/dark_cell83_tjl-r3_SFALL.mtz" - ) - - # Join all data - alldata = onmtz.merge(offmtz, on=["H", "K", "L"], suffixes=("_on", "_off")).dropna() - common = alldata.index.intersection(calc.index).sort_values() - alldata = alldata.loc[common].compute_dHKL() - # alldata["PHIC"] = calc.loc[common, "PHIC"] - # alldata["FC"] = calc.loc[common, "FC"] - alldata["PHIC"] = calc.loc[common, "PHIC_ALL"] - alldata["FC"] = calc.loc[common, "FC_ALL"] - - alldata.write_mtz("/Users/alisia/Desktop/1nsalldata.mtz") - - # Scale both to FCalcs and write differences - if args.highres is not None: - alldata = alldata.loc[alldata["dHKL"] > 
args.highres] - h_res = args.highres - - # Screen different background subtraction values for maximum correlation difference - h_res = np.min(alldata["dHKL"]) - - best_alpha_map, errors, entropies = maps.screen_alpha_weight( - alldata, - "F_on", - "F_off", - "SIGF_on", - "SIGF_off", - "PHIC", - args.refpdb[0], - path, - name, - map_res, - np.array(args.center).astype("float"), - hres=h_res, - ) - best_alpha_map.write_mtz("/Users/alisia/Desktop/test-bestalpha.mtz") - - if args.plot is True: - fig, ax1 = plt.subplots(figsize=(10, 5)) - - color = "black" - ax1.set_xlabel(r"$\lambda$") - ax1.set_ylabel( - r"$\sum (\Delta \mathrm{Fobs}_\mathrm{free} - \Delta \mathrm{Fcalc}_\mathrm{free})^2$", - color=color, - ) - ax1.plot(np.linspace(0, 1, len(errors)), errors, color=color, linewidth=5) - ax1.tick_params(axis="y", labelcolor=color) - - ax2 = ax1.twinx() - color = "silver" - ax2.set_ylabel("Negentropy", color=color) - ax2.plot(np.linspace(0, 1, len(entropies)), entropies, color=color, linewidth=5) - ax2.tick_params(axis="y", labelcolor=color) - - fig.tight_layout() - # plt.show() - fig.savefig("{p}{n}alpha-optimize.png".format(p=path, n=name)) - - -if __name__ == "__main__": - main() diff --git a/meteor/dsutils.py b/meteor/dsutils.py deleted file mode 100644 index a5d4c20..0000000 --- a/meteor/dsutils.py +++ /dev/null @@ -1,150 +0,0 @@ - -import numpy as np -import gemmi as gm -import reciprocalspaceship as rs -from scipy.stats import binned_statistic - - -def res_cutoff(df, h_res, l_res) : - """ - Apply specified low and high resolution cutoffs to rs.Dataset. 
- """ - df = df.loc[(df['dHKL'] >= h_res) & (df['dHKL'] <= l_res)] - return df - - -def resolution_shells(data, dhkl, n): - - """ Average data in n resolution shells """ - - mean_data = binned_statistic(dhkl, data, statistic='mean', bins=n, range=(np.min(dhkl), np.max(dhkl))) - bin_centers = (mean_data.bin_edges[:-1] + mean_data.bin_edges[1:]) / 2 - - return bin_centers, mean_data.statistic - - -def adjust_phi_interval(phi): - - """ Given a set of phases, return the equivalent in -180 <= phi <= 180 interval""" - - phi = phi%360 - phi[phi > 180] -=360 - - assert np.min(phi) >= -181 - assert np.max(phi) <= 181 - - return phi - - -def positive_Fs(df, phases, Fs, phases_new, Fs_new): - - """ - Convert between an MTZ format where difference structure factor amplitudes are saved as both positive and negative, to format where they are only positive. - - Parameters : - - df : (rs.Dataset) from MTZ of interest - phases, Fs : (str) labels for phases and amplitudes in original MTZ - phases_new, Fs_new : (str) labels for phases and amplitudes in new MTZ - - - Returns : - - rs.Dataset with new labels - - """ - - new_phis = df[phases].copy(deep=True) - new_Fs = df[Fs].copy(deep=True) - - negs = np.where(df[Fs]<0) - - df[phases] = adjust_phi_interval(df[phases]) - - for i in negs: - new_phis.iloc[i] = df[phases].iloc[i]+180 - new_Fs.iloc[i] = np.abs(new_Fs.iloc[i]) - - new_phis = adjust_phi_interval(new_phis) - - df_new = df.copy(deep=True) - df_new[Fs_new] = new_Fs - df_new[Fs_new] = df_new[Fs_new].astype("SFAmplitude") - df_new[phases_new] = new_phis - df_new[phases_new] = df_new[phases_new].astype("Phase") - - return df_new - - -def map_from_Fs(dataset, Fs, phis, map_res): - - """ - Return a GEMMI CCP4 map object from an rs.Dataset object - - Parameters : - - dataset : rs.Dataset of interest - Fs, phis : (str) and (str) labels for amplitudes and phases to be used - map_res : (float) to determine map spacing resolution - - """ - - mtz = dataset.to_gemmi() - ccp4 = gm.Ccp4Map() 
- ccp4.grid = mtz.transform_f_phi_to_map('{}'.format(Fs), '{}'.format(phis), sample_rate=map_res) - ccp4.update_ccp4_header(2, True) - - return ccp4 - - -def from_gemmi(gemmi_mtz): - - """ - Construct DataSet from gemmi.Mtz object - - If the gemmi.Mtz object contains an M/ISYM column and contains duplicated - Miller indices, an unmerged DataSet will be constructed. The Miller indices - will be mapped to their observed values, and a partiality flag will be - extracted and stored as a boolean column with the label, ``PARTIAL``. - Otherwise, a merged DataSet will be constructed. - If columns are found with the ``MTZInt`` dtype and are labeled ``PARTIAL`` - or ``CENTRIC``, these will be interpreted as boolean flags used to - label partial or centric reflections, respectively. - - Parameters - ---------- - gemmi_mtz : gemmi.Mtz - gemmi Mtz object - - Returns - ------- - rs.DataSet - """ - - dataset = rs.DataSet(spacegroup=gemmi_mtz.spacegroup, cell=gemmi_mtz.cell) - - # Build up DataSet - for c in gemmi_mtz.columns: - dataset[c.label] = c.array - # Special case for CENTRIC and PARTIAL flags - if c.type == "I" and c.label in ["CENTRIC", "PARTIAL"]: - dataset[c.label] = dataset[c.label].astype(bool) - else: - dataset[c.label] = dataset[c.label].astype(c.type) - dataset.set_index(["H", "K", "L"], inplace=True) - - # Handle unmerged DataSet. 
Raise ValueError if M/ISYM column is not unique - m_isym = dataset.get_m_isym_keys() - if m_isym and dataset.index.duplicated().any(): - if len(m_isym) == 1: - dataset.merged = False - dataset.hkl_to_observed(m_isym[0], inplace=True) - else: - raise ValueError( - "Only a single M/ISYM column is supported for unmerged data" - ) - else: - dataset.merged = True - - return dataset - diff --git a/meteor/io.py b/meteor/io.py deleted file mode 100644 index e25716a..0000000 --- a/meteor/io.py +++ /dev/null @@ -1,208 +0,0 @@ -import os -import gemmi as gm -import reciprocalspaceship as rs - - -def load_mtz(mtz): - """ - Load mtz file from path (str) and return rs.Dataset object - """ - dataset = rs.read_mtz(mtz) - dataset.compute_dHKL(inplace=True) - - return dataset - - -def subset_to_FSigF(mtzpath, data_col, sig_col, column_names_dict={}): - """ - Utility function for reading MTZ and returning DataSet with F and SigF. - - Parameters - ---------- - mtzpath : str, filename - Path to MTZ file to read - data_col : str, column name - Column name for data column. If Intensity is specified, it will be - French-Wilson'd. - sig_col : str, column name - Column name for sigma column. Must select for a StandardDeviationDtype. - column_names_dict : dictionary - If particular column names are desired for the output, this can be specified - as a dictionary that includes `data_col` and `sig_col` as keys and what - values they should map to. 
- - Returns - ------- - rs.DataSet - """ - mtz = rs.read_mtz(mtzpath) - - # Check dtypes - if not isinstance( - mtz[data_col].dtype, (rs.StructureFactorAmplitudeDtype, rs.IntensityDtype) - ): - raise ValueError( - f"{data_col} must specify an intensity or |F| column in {mtzpath}" - ) - if not isinstance(mtz[sig_col].dtype, rs.StandardDeviationDtype): - raise ValueError( - f"{sig_col} must specify a standard deviation column in {mtzpath}" - ) - - # Run French-Wilson if intensities are provided - if isinstance(mtz[data_col].dtype, rs.IntensityDtype): - scaled = rs.algorithms.scale_merged_intensities( - mtz, data_col, sig_col, mean_intensity_method="anisotropic" - ) - mtz = scaled.loc[:, ["FW-F", "FW-SIGF"]] - mtz.rename(columns={"FW-F": data_col, "FW-SIGF": sig_col}, inplace=True) - else: - mtz = mtz.loc[:, [data_col, sig_col]] - - mtz.rename(columns=column_names_dict, inplace=True) - return mtz - - -def subset_to_FandPhi(mtzpath, data_col, phi_col, column_names_dict={}, flags_col=None): - """ - Utility function for reading MTZ and returning DataSet with F and Phi. - - Parameters - ---------- - mtzpath : str, filename - Path to MTZ file to read - data_col : str, column name - Column name for data column. - phi_col : str, column name - Column name for phase column. Must select for a PhaseDtype. - column_names_dict : dictionary - If particular column names are desired for the output, this can be specified - as a dictionary that includes `data_col` and `phi_col` as keys and what - values they should map to. 
- - Returns - ------- - rs.DataSet - - """ - - mtz = rs.read_mtz(mtzpath) - - # Check dtypes - if not isinstance(mtz[data_col].dtype, (rs.StructureFactorAmplitudeDtype)): - raise ValueError(f"{data_col} must specify an |F| column in {mtzpath}") - if not isinstance(mtz[phi_col].dtype, rs.PhaseDtype): - raise ValueError(f"{phi_col} must specify a phase column in {mtzpath}") - - if flags_col is not None: - mtz = mtz.loc[:, [data_col, phi_col, flags_col]] - else: - mtz = mtz.loc[:, [data_col, phi_col]] - - mtz.rename(columns=column_names_dict, inplace=True) - return mtz - - -def map2mtzfile(map, mtz_name, high_res): - """ - Write an MTZ file from a GEMMI map object. - """ - - # TODO this seems to be redundant - # combine with map2mtz function - # --> should these functions be methods on the map object? - - sf = gm.transform_map_to_f_phi(map.grid, half_l=False) - data = sf.prepare_asu_data(dmin=high_res - 0.05, with_sys_abs=True) - mtz = gm.Mtz(with_base=True) - mtz.spacegroup = sf.spacegroup - mtz.set_cell_for_all(sf.unit_cell) - mtz.add_dataset("unknown") - mtz.add_column("FWT", "F") - mtz.add_column("PHWT", "P") - mtz.set_data(data) - mtz.switch_to_asu_hkl() - mtz.write_to_file(mtz_name) - - -def map2mtz(map, high_res): - """ - Return an rs.Dataset from a GEMMI map object. - """ - sf = gm.transform_map_to_f_phi(map.grid, half_l=False) - data = sf.prepare_asu_data(dmin=high_res - 0.05, with_sys_abs=True) - mtz = gm.Mtz(with_base=True) - mtz.spacegroup = sf.spacegroup - mtz.set_cell_for_all(sf.unit_cell) - mtz.add_dataset("unknown") - mtz.add_column("FWT", "F") - mtz.add_column("PHWT", "P") - mtz.set_data(data) - mtz.switch_to_asu_hkl() - - return mtz - - -def map_from_mtzfile(path, Fs, phis, map_res): - """ - Return a GEMMI CCP4 map object from a specified MTZ file path. 
- - Parameters : - - path : (str) path to MTZ of interest - Fs, phis : (str) and (str) labels for amplitudes and phases to be used - map_res : (float) to determine map spacing resolution - - """ - - mtz = gm.read_mtz_file("{}".format(path)) - ccp4 = gm.Ccp4Map() - ccp4.grid = mtz.transform_f_phi_to_map( - "{}".format(Fs), "{}".format(phis), sample_rate=map_res - ) - ccp4.update_ccp4_header(2, True) - - return ccp4 - - -def get_pdbinfo(pdb): - """ - From a PDB file path (str), return unit cell and space group information. - """ - - # TODO clean up this function and remove dependency on biopython (!) - - # pdb = PandasPdb().read_pdb(pdb) - # text = '\n\n%s\n' % pdb.pdb_text[:] - - with open(pdb, "r") as f: - for line in f: - if line.startswith("CRYST1"): - - split_line = line.strip().split() - unit_cell = [float(i) for i in split_line[1:7]] - space_group = "".join(split_line[7:11]) - - return unit_cell, space_group - - # if here, we did not find the CRYST1 record - raise IOError("could not find CRYST1 record in:", pdb) - - -def get_Fcalcs(pdb, dmin, path): - """ - From a PDB file path (str), calculate structure factors and return as rs.Dataset - """ - - os.system( - "gemmi sfcalc {pdb} --to-mtz={path}/{root}_FCalcs.mtz --dmin={d}".format( - pdb=pdb, d=dmin - 0.05, path=path, root=pdb.split(".")[0].split("/")[-1] - ) - ) - calcs = load_mtz( - "{path}/{root}_FCalcs.mtz".format( - path=path, root=pdb.split(".")[0].split("/")[-1] - ) - ) - - return calcs diff --git a/meteor/maps.py b/meteor/maps.py deleted file mode 100644 index 509ee26..0000000 --- a/meteor/maps.py +++ /dev/null @@ -1,194 +0,0 @@ -import numpy as np -import gemmi as gm -from tqdm import tqdm -from meteor import dsutils, validate, mask -from scipy.stats import kurtosis - -from . import scale -from . import io - - -def make_map(data, grid_size, cell, space_group): - """ - Create a GEMMI map object from data and grid information. 
- - Parameters : - - data : (numpy array) - grid_size : (list) specifying grid dimensions for the map - cell, space_group : (list) and (str) - - Returns : - - GEMMI CCP4 map object - - """ - og = gm.Ccp4Map() - - og.grid = gm.FloatGrid(data) - og.grid.set_unit_cell( - gm.UnitCell(cell[0], cell[1], cell[2], cell[3], cell[4], cell[5]) - ) - og.grid.set_size(grid_size[0], grid_size[1], grid_size[2]) - og.grid.spacegroup = gm.find_spacegroup_by_name(space_group) - og.grid.symmetrize_max() - og.update_ccp4_header() - - return og - - -def compute_weights(df, sigdf, alpha): - """ - Compute weights for each structure factor based on DeltaF and its uncertainty. - Parameters - ---------- - df : series-like or array-like - Array of DeltaFs (difference structure factor amplitudes) - sigdf : series-like or array-like - Array of SigDeltaFs (uncertainties in difference structure factor amplitudes) - """ - w = 1 + (sigdf**2 / (sigdf**2).mean()) + alpha * (df**2 / (df**2).mean()) - return w**-1 - - -def find_w_diffs(mtz, Fon, Foff, SIGon, SIGoff, pdb, high_res, path, a, Nbg=1.00): - """ - - Calculate weighted difference structure factors from a reference structure and input mtz. - - Parameters : - - 1. MTZ, Fon, Foff, SIGFon, SIGFoff : (rsDataset) with specified structure factor and error labels (str) - 2. pdb : reference pdb file name (str) - 3. highres : high resolution cutoff for map generation (float) - 4. path : path of directory where to store any files (string) - 5. a : alpha weighting parameter q-weighting (float) - 6. Nbg : background subtraction value if making a background subtracted map (float – default=1.00) - - - Returns : - - 1. mtz : (rs-Dataset) of original mtz + added column for weighted differences - 2. 
ws : weights applied to each structure factor difference (1D array) - - """ - calcs = io.get_Fcalcs(pdb, high_res, path)["FC"] - calcs = calcs[calcs.index.isin(mtz.index)] - # calcs = mtz["FC"] - mtx_on, t_on, scaled_on = scale.scale_aniso( - np.array(calcs), np.array(mtz[Fon]), np.array(list(mtz.index)) - ) - mtx_off, t_off, scaled_off = scale.scale_aniso( - np.array(calcs), np.array(mtz[Foff]), np.array(list(mtz.index)) - ) - - mtz["scaled_on"] = scaled_on - mtz["scaled_off"] = scaled_off - mtz["SIGF_on_s"] = (mtx_on.x[0] * np.exp(t_on)) * mtz[SIGon] - mtz["SIGF_off_s"] = (mtx_off.x[0] * np.exp(t_off)) * mtz[SIGoff] - # mtz = mtz.compute_dHKL() - # qs = 1/(2*mtz['dHKL']) - # c_on, b_on, on_s = scale.scale_iso(np.array(calcs), np.array(mtz[Fon]), np.array(mtz['dHKL'])) - # c_off, b_off, off_s = scale.scale_iso(np.array(calcs), np.array(mtz[Foff]), np.array(mtz['dHKL'])) - - # mtz["scaled_on"] = on_s - # mtz["scaled_off"] = off_s - # mtz["SIGF_on_s"] = (c_on * np.exp(-b_on*(qs**2))) * mtz[SIGon] - # mtz["SIGF_off_s"] = (c_off * np.exp(-b_off*(qs**2))) * mtz[SIGoff] - - sig_diffs = np.sqrt(mtz["SIGF_on_s"] ** 2 + (mtz["SIGF_off_s"]) ** 2) - ws = compute_weights(mtz["scaled_on"] - Nbg * mtz["scaled_off"], sig_diffs, alpha=a) - mtz["DF"] = mtz["scaled_on"] - Nbg * mtz["scaled_off"] - mtz["DF"] = mtz["DF"].astype("SFAmplitude") - mtz["WDF"] = ws * (mtz["scaled_on"] - Nbg * mtz["scaled_off"]) - mtz["WDF"] = mtz["WDF"].astype("SFAmplitude") - mtz.infer_mtz_dtypes(inplace=True) - - return mtz, ws - - -def screen_alpha_weight( - data, - F_on, - F_off, - SIGF_on, - SIGF_off, - philabel, - refpdb, - path, - name, - map_res, - center, - radius=8.0, - hres=None, - percent=0.03, - flags=None, -): - """ - This is a work in progress - - """ - # TO ADD: assertion or test for no negative amplitudes in F_on or F_off - - # if Rfree flags were specified, use these - if flags is not None: - _, _, choose_test = validate.make_test_set( - data, percent, F_off, name, path, flags - ) # 
choose_test is Boolean array to select free (test) reflections from entire set - - # Keep 3% of reflections for test set - else: - _, _, choose_test = validate.make_test_set(data, percent, F_off, name, path) - - alphas = np.linspace(0, 1.0, 20) - noweight, _ = find_w_diffs( - data, F_on, F_off, SIGF_on, SIGF_off, refpdb, hres, path, 0.0 - ) - noweight_pos = dsutils.positive_Fs( - noweight, philabel, "DF", "ogPhis_pos", "ogFs_pos" - ) - - errors = [] - entropies = [] - datasets = [] - - for a in tqdm(alphas): - diffs, _ = find_w_diffs( - data.copy(deep=True), F_on, F_off, SIGF_on, SIGF_off, refpdb, hres, path, a - ) - - # make DED map, find negentropy - fit_map = dsutils.map_from_Fs(diffs, "WDF", philabel, map_res) - - # mask chromophore region for negentropy - reg_mask = mask.get_mapmask(fit_map.grid, center, radius) - loc_reg = reg_mask.flatten().astype(bool) - - fit_map_arr = np.array(fit_map.grid).flatten()[loc_reg] - # fit_map_arr = np.array(fit_map.grid).flatten() - error = kurtosis(fit_map_arr) - # entropy = differential_entropy(fit_map_arr) - entropy = validate.negentropy(fit_map_arr) - - # backcalculate all Fs - if hres is not None: - diffs_all = dsutils.from_gemmi(io.map2mtz(fit_map, hres)) - else: - diffs_all = dsutils.from_gemmi( - io.map2mtz(fit_map, np.min(noweight.compute_dHKL()["dHKL"])) - ) - - diffs_all = diffs_all[diffs_all.index.isin(noweight_pos.index)] - # error = np.sum(np.array(noweight_pos["ogFs_pos"][np.invert(choose_test)]) - np.array(diffs_all["FWT"][np.invert(choose_test)])) ** 2 - # print(np.array(noweight_pos["ogFs_pos"][np.invert(choose_test)])[0:20], np.array(diffs_all["FWT"][np.invert(choose_test)])[0:20]) - # print(np.array(noweight_pos["ogFs_pos"][choose_test])[0:20], np.array(diffs_all["FWT"][choose_test])[0:20]) - - # error = np.sum(np.array(noweight_pos["ogFs_pos"][np.invert(choose_test)]) - np.array(diffs_all["FWT"][np.invert(choose_test)])) ** 2 - # print(diffs_all[choose_test]) - entropies.append(entropy) - 
datasets.append(diffs_all) - errors.append(error) - - print("Best Weighting Alpha Found is ", alphas[np.argmin(errors)]) - - return datasets[np.argmin(errors)], errors, entropies diff --git a/meteor/mask.py b/meteor/mask.py deleted file mode 100644 index 80119f2..0000000 --- a/meteor/mask.py +++ /dev/null @@ -1,47 +0,0 @@ - -import numpy as np -import gemmi as gm - -def get_mapmask(grid_in, position, r) : - - """ - Returns mask (numpy array) of a map (GEMMI grid element) : mask radius 'r' (float) with center at 'position' (numpy array) - """ - grid = grid_in.clone() - grid - grid.fill(0) - grid.set_points_around(gm.Position(position[0], position[1], position[2]), radius=r, value=1) - grid.symmetrize_max() - - return np.array(grid, copy=True) - -def solvent_mask(pdb, cell, map_array, spacing) : - - """ - Applies a solvent mask to an electron density map - - Parameters : - - pdb, cell : (str) and (list) PDB file name and unit cell information - map_array : (numpy array) of map to be masked - spacing : (float) spacing to generate solvent mask - - Returns : - - Flattened (1D) numpy array of mask - - """ - - st = gm.read_structure(pdb) - solventmask = gm.FloatGrid() - solventmask.setup_from(st, spacing=spacing) - solventmask.set_unit_cell(gm.UnitCell(cell[0], cell[1], cell[2], cell[3], cell[4], cell[5])) - - masker = gm.SolventMasker(gm.AtomicRadiiSet.Constant, 1.5) - masker.rprobe = 0.9 - masker.rshrink = 1.1 - - masker.put_mask_on_float_grid(solventmask, st[0]) - nosolvent = np.where(np.array(solventmask)==0, map_array, 0) - - return nosolvent.flatten() \ No newline at end of file diff --git a/meteor/scale.py b/meteor/scale.py index b8a7a4c..590cc16 100644 --- a/meteor/scale.py +++ b/meteor/scale.py @@ -1,85 +1,15 @@ +import reciprocalspaceship as rs -import numpy as np -import scipy.optimize as opt -def scale_iso(data1, data2, ds): +def scale_structure_factors(reference: rs.DataSet, dataset_to_scale: rs.DataSet) -> rs.DataSet: + """Apply an anisotropic scaling so 
that `dataset_to_scale` is on the same scale as `reference`. - """ - Isotropic resolution-dependent scaling of data2 to data1. - (minimize [dataset1 - c*exp(-B*sintheta**2/lambda**2)*dataset2] - - Input : - - 1. dataset1 in form of 1D numpy array - 2. dataset2 in form of 1D numpy array - 3. dHKLs for the datasets in form of 1D numpy array - - Returns : + C * exp{ -(h**2 B11 + k**2 B22 + l**2 B33 + + 2hk B12 + 2hl B13 + 2kl B23) } - 1. entire results from least squares fitting - 2. c (as float) - 3. B (as float) - 2. scaled dataset2 in the form of a 1D numpy array - - """ - - def scale_func(p, x1, x2, qs): - return x1 - (p[0]*np.exp(-p[1]*(qs**2)))*x2 - - p0 = np.array([1.0, -20]) - qs = 1/(2*ds) - matrix = opt.least_squares(scale_func, p0, args=(data1, data2, qs)) - - return matrix.x[0], matrix.x[1], (matrix.x[0]*np.exp(-matrix.x[1]*(qs**2)))*data2 + This is the same procedure implemented by CCP4's SCALEIT. -def scale_aniso(x_dataset, y_dataset, Miller_indx): - - """" - Author: Virginia Apostolopoulou - Anisotropically scales y_dataset to x_dataset given an ndarray of Miller indices. + .. 
https://www.ccp4.ac.uk/html/scaleit.html """ - - p0 = np.array([1.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0], dtype=np.float32) - matrix_ani = opt.least_squares(aniso_scale_func, p0, args=(x_dataset, y_dataset, Miller_indx)) - - h = Miller_indx[:,0] - k = Miller_indx[:,1] - l = Miller_indx[:,2] - h_sq = np.square(h) - k_sq = np.square(k) - l_sq = np.square(l) - - hk_prod = h*k - hl_prod = h*l - kl_prod = k*l - - t = - (h_sq * matrix_ani.x[1] + k_sq * matrix_ani.x[2] + l_sq * matrix_ani.x[3] - + 2*hk_prod * matrix_ani.x[4] + 2*hl_prod * matrix_ani.x[5] + 2*kl_prod * matrix_ani.x[6]) - - data_ani_scaled = (matrix_ani.x[0]*np.exp(t))*y_dataset - - return matrix_ani, t, data_ani_scaled - - -def aniso_scale_func(p, x1, x2, H_arr): - - "Author: Virginia Apostolopoulou" - - h = H_arr[:,0] - k = H_arr[:,1] - l = H_arr[:,2] - - h_sq = np.square(h) - k_sq = np.square(k) - l_sq = np.square(l) - - hk_prod = h*k - hl_prod = h*l - kl_prod = k*l - - t = - (h_sq * p[1] + k_sq * p[2] + l_sq * p[3] + - 2*hk_prod * p[4] + 2*hl_prod * p[5] + 2*kl_prod * p[6]) - expnt = np.exp( t ) - r = x1 - p[0] * expnt * x2 - return r \ No newline at end of file + ... diff --git a/meteor/settings.py b/meteor/settings.py new file mode 100644 index 0000000..e7fbdfa --- /dev/null +++ b/meteor/settings.py @@ -0,0 +1,4 @@ +TV_LAMBDA_RANGE: tuple[float, float] = (1e-3, 1.0) +TV_STOP_TOLERANCE: float = 0.00000005 +TV_MAX_NUM_ITER: int = 50 +TV_MAP_SAMPLING: int = 3 diff --git a/meteor/tv.py b/meteor/tv.py index 01fd255..0cac0ed 100644 --- a/meteor/tv.py +++ b/meteor/tv.py @@ -1,317 +1,192 @@ +from dataclasses import dataclass +from typing import Literal, Sequence, overload + import numpy as np -from tqdm import tqdm +import reciprocalspaceship as rs +from scipy.optimize import minimize_scalar from skimage.restoration import denoise_tv_chambolle -from memory_profiler import profile - -# from scipy.stats import kurtosis - -from . import maps -from . import validate -from . import dsutils -from . 
import io - -""" -def TV_filter(map, l, grid_size, cell, space_group): - - Apply TV filtering to a Gemmi map object. Compute negentropy for denoised array. - - Parameters : - map : (GEMMI map object) - l : (float) lambda – regularization parameter to be used in filtering. - grid_size : (list) specifying grid dimensions for the map - cell, space_group : (list) and (str) - - Returns : - - Denoised map (GEMMI object) and associated negentropy (float) - - - - TV_arr = denoise_tv_chambolle( - np.array(map.grid), eps=0.00000005, weight=l, max_num_iter=50 +from .settings import ( + TV_LAMBDA_RANGE, + TV_MAP_SAMPLING, + TV_MAX_NUM_ITER, + TV_STOP_TOLERANCE, +) +from .utils import ( + compute_coefficients_from_map, + compute_map_from_coefficients, + numpy_array_to_map, + resolution_limits, +) +from .validate import negentropy + + +@dataclass +class TvDenoiseResult: + optimal_lambda: float + optimal_negentropy: float + map_sampling_used_for_tv: float + + +def _tv_denoise_array(*, map_as_array: np.ndarray, weight: float) -> np.ndarray: + """Closure convenience function to generate more readable code.""" + denoised_map = denoise_tv_chambolle( + map_as_array, + weight=weight, + eps=TV_STOP_TOLERANCE, + max_num_iter=TV_MAX_NUM_ITER, ) - entropy = validate.negentropy(TV_arr.flatten()) - TV_map = maps.make_map(TV_arr, grid_size, cell, space_group) - - return TV_map, entropy -""" - - -from joblib import Parallel, delayed - - -# @profile -def TV_filter(map, l, grid_size, cell, space_group): - """ - Apply TV filtering to a Gemmi map object. Compute negentropy for denoised array. - - Parameters : - - map : (GEMMI map object) - l : (float) lambda – regularization parameter to be used in filtering. 
- grid_size : (list) specifying grid dimensions for the map - cell, space_group : (list) and (str) - - Returns : - - Denoised map (GEMMI object) and associated negentropy (float) + return denoised_map + + +@overload +def tv_denoise_difference_map( + difference_map_coefficients: rs.DataSet, + full_output: Literal[False], + difference_map_amplitude_column: str = "DF", + difference_map_phase_column: str = "PHIC", + lambda_values_to_scan: Sequence[float] | None = None, +) -> rs.DataSet: ... + + +@overload +def tv_denoise_difference_map( + difference_map_coefficients: rs.DataSet, + full_output: Literal[True], + difference_map_amplitude_column: str = "DF", + difference_map_phase_column: str = "PHIC", + lambda_values_to_scan: Sequence[float] | None = None, +) -> tuple[rs.DataSet, TvDenoiseResult]: ... + + +def tv_denoise_difference_map( + difference_map_coefficients: rs.DataSet, + full_output: bool = False, + difference_map_amplitude_column: str = "DF", + difference_map_phase_column: str = "PHIC", + lambda_values_to_scan: Sequence[float] | np.ndarray | None = None, +) -> rs.DataSet | tuple[rs.DataSet, TvDenoiseResult]: + """Single-pass TV denoising of a difference map. + + Automatically selects the optimal level of regularization (the TV lambda parameter) by + maximizing the negentropy of the denoised map. Two modes can be used to dictate which + candidate values of lambda are assessed: + + 1. By default (`lambda_values_to_scan=None`), the golden-section search algorithm selects + a lambda value according to the bounds and convergence criteria set in meteor.settings. + 2. Alternatively, an explicit list of lambda values to assess can be provided using + `lambda_values_to_scan`. + + + Parameters + ---------- + difference_map_coefficients : rs.DataSet + The input dataset containing the difference map coefficients (amplitude and phase) + that will be used to compute the difference map. 
+ full_output : bool, optional + If `True`, the function returns both the denoised map coefficients and a `TvDenoiseResult` + object containing the optimal lambda and the associated negentropy. If `False`, only + the denoised map coefficients are returned. Default is `False`. + difference_map_amplitude_column : str, optional + The column name in `difference_map_coefficients` that contains the amplitude values for + the difference map. Default is "DF". + difference_map_phase_column : str, optional + The column name in `difference_map_coefficients` that contains the phase values for the + difference map. Default is "PHIC". + lambda_values_to_scan : Sequence[float] | np.ndarray | None, optional + A sequence of lambda values to explicitly scan for determining the optimal value. If + `None`, the function uses the golden-section search method to determine the optimal + lambda. Default is `None`. + + Returns + ------- + rs.DataSet | tuple[rs.DataSet, TvDenoiseResult] + If `full_output` is `False`, returns a `rs.DataSet`, the denoised map coefficients. + If `full_output` is `True`, returns a tuple containing: + - `rs.DataSet`: The denoised map coefficients. + - `TvDenoiseResult`: An object w/ the optimal lambda and the corresponding negentropy. + + Raises + ------ + AssertionError + If the golden-section search fails to find an optimal lambda. + + Notes + ----- + - The function is designed to maximize the negentropy of the denoised map, which is a + measure of the map's non-Gaussianity. + Higher negentropy generally corresponds to a more informative and less noisy map. + - The golden-section search is a robust method for optimizing unimodal functions, + particularly suited for scenarios where + an explicit list of candidate values is not provided. 
+ + Example + ------- + >>> coefficients = rs.read_mtz("./path/to/difference_map.mtz") # load dataset + >>> denoised_map, result = tv_denoise_difference_map(coefficients, full_output=True) + >>> print(f"Optimal Lambda: {result.optimal_lambda}, Negentropy: {result.optimal_negentropy}") """ - - # Convert map.grid to a numpy array with a specified data type for memory efficiency - grid_array = np.array( - map.grid, dtype=np.float32 - ) # Use float32 or other appropriate type - - # Determine the number of chunks for parallel processing - num_chunks = 4 - sub_arrays = np.array_split(grid_array, num_chunks) - - # Define the parallel denoising function - def parallel_denoise(sub_array, weight, max_num_iter): - return denoise_tv_chambolle(sub_array, weight=weight, max_num_iter=max_num_iter) - - # Process in parallel - TV_arrs = Parallel(n_jobs=num_chunks)( - delayed(parallel_denoise)(sub, l, 50) for sub in sub_arrays + difference_map = compute_map_from_coefficients( + map_coefficients=difference_map_coefficients, + amplitude_label=difference_map_amplitude_column, + phase_label=difference_map_phase_column, + map_sampling=TV_MAP_SAMPLING, ) + difference_map_as_array = np.array(difference_map.grid) - # Combine sub-arrays back into one array - TV_arr = np.concatenate(TV_arrs) - - # Delete the original array to free memory - del grid_array - - # Compute negentropy in a memory-efficient manner - entropy = validate.negentropy( - TV_arr.ravel() - ) # Use ravel instead of flatten if possible + def negentropy_objective(tv_lambda: float): + denoised_map = _tv_denoise_array(map_as_array=difference_map_as_array, weight=tv_lambda) + return -1.0 * negentropy(denoised_map.flatten()) - # Create the map from the denoised array - TV_map = maps.make_map(TV_arr, grid_size, cell, space_group) - - # Delete the TV_arr to free memory - del TV_arr - - return TV_map, entropy - - -# @profile -def TV_iteration( - mtz, - diffs, - phi_diffs, - Fon, - Foff, - phicalc, - map_res, - cell, - space_group, - 
flags, - l, - highres, - ws, -): - """ - Go through one iteration of TV denoising Fon-Foff map + projection onto measured Fon magnitude - to obtain new phases estimates for the difference structure factors vector. + optimal_lambda: float - Parameters : - - 1. MTZ, diffs, phi_diffs, Fon, Foff, phicalc : (rsDataset) with specified structure factor and phase labels (str) - 2. map_res : (float) spacing for map generation - 3. cell, space group : (array) and (str) - 4. flags : (boolean array) where True marks reflections to keep for test set - 5. l : (float) weighting parameter for TV denoising - 6. highres : (float) high resolution cutoff for map generation - - Returns : - - 1. new amps, new phases : (1D-array) for new difference amplitude and phase estimates after the iteration - 2. test_proj_errror : (float) the magnitude of the projection of the TV difference vector onto Fon for the test set - 3. entropy : (float) negentropy of TV denoised map - 4. phase_change : (float) mean change in phases between input (phi_diffs) and output (new_phases) arrays - - """ - - # Fon - Foff and TV denoise - mtz = dsutils.positive_Fs(mtz, phi_diffs, diffs, "phases-pos", "diffs-pos") - fit_mtz = mtz[np.invert(flags)] - - fit_map = dsutils.map_from_Fs(fit_mtz, "diffs-pos", "phases-pos", map_res) - entropy = validate.negentropy(np.array(fit_map.grid).flatten()) - fit_TV_map, _ = TV_filter(fit_map, l, fit_map.grid.shape, cell, space_group) - mtz_TV = dsutils.from_gemmi(io.map2mtz(fit_TV_map, highres)) - del fit_TV_map - mtz_TV = mtz_TV[mtz_TV.index.isin(mtz.index)] - F_plus = np.array(mtz_TV["FWT"].astype("float")) - phi_plus = np.radians(np.array(mtz_TV["PHWT"].astype("float"))) - - # Call to function that does projection - new_amps, new_phases, proj_error, z = TV_projection( - np.array(mtz[Foff]).astype("float"), - np.array(mtz[Fon]).astype("float"), - np.radians(np.array(mtz[phicalc]).astype("float")), - F_plus, - phi_plus, - ws, - ) - test_proj_error = 
proj_error[np.array(flags).astype(bool)] - phase_changes = np.abs(np.array(mtz["phases-pos"] - new_phases)) - phase_changes = dsutils.adjust_phi_interval(phase_changes) - phase_change = np.mean(phase_changes) + # scan a specific set of lambda values and find the best one + if lambda_values_to_scan is not None: + # use no denoising as the default to beat + optimal_lambda = 0.0 # initialization + highest_negentropy = negentropy(difference_map_as_array.flatten()) - return new_amps, new_phases, test_proj_error, entropy, phase_change, z + for tv_lambda in lambda_values_to_scan: + trial_negentropy = -1.0 * negentropy_objective(tv_lambda) + if trial_negentropy > highest_negentropy: + optimal_lambda = tv_lambda + highest_negentropy = trial_negentropy - -def TV_projection(Foff, Fon, phi_calc, F_plus, phi_plus, ws): - """ - Project a TV denoised difference structure factor vector onto measured Fon magnitude - to obtain new phases estimates for the difference structure factors. - - Parameters : - - 1. Foff, Fon : (1D arrays) of measured amplitudes used for the 'Fon - Foff' difference - 2. phi_calc : (1D array) of phases calculated from refined 'off' model - 3. F_plus, phi_plus : (1D arrays) from TV denoised 'Fon - Foff' electron density map - - Returns : - - 1. new amps, new phases : (1D array) for new difference amplitude and phase estimates after the iteration - 2. 
proj_error : (1D array) magnitude of the projection of the TV difference vector onto Fon - - """ - - z = F_plus * np.exp(phi_plus * 1j) + Foff * np.exp(phi_calc * 1j) - p_deltaF = (Fon / np.absolute(z)) * z - Foff * np.exp(phi_calc * 1j) - - # new_amps = np.absolute(p_deltaF).astype(np.float32) * ws - new_amps = np.absolute(p_deltaF).astype(np.float32) - new_phases = np.angle(p_deltaF, deg=True).astype(np.float32) - - proj_error = np.absolute(np.absolute(z) - Fon) - - return new_amps, new_phases, proj_error, np.angle(z, deg=True) - - -# @profile -def find_TVmap( - mtz, - Flabel, - philabel, - name, - path, - map_res, - cell, - space_group, - percent=0.10, - flags=None, - highres=None, -): - """ - Find two TV denoised maps (one that maximizes negentropy and one that minimizes free set error) from an initial mtz and associated information. - - Optional arguments are whether to generate a new set of free (test) reflections – and specify what percentage of total reflections to reserve for this - - and whether to apply a resolution cutoff. - Screen the regularization parameter (lambda) from 0 to 0.1 - - Required Parameters : - - 1. MTZ, Flabel, philabefl : (rsDataset) with specified structure factor and phase labels (str) - 2. name, path : (str) for test/fit set and MTZ output - 3. map_res : (float) spacing for map generation - 4. cell, space group : (array) and (str) - - Returns : - - 1. The two optimal TV denoised maps (GEMMI objects) and corresponding regularization parameter used (float) - 2. Errors and negentropy values from the regularization screening (numpy arrays) - 3. 
Mean changes in amplitude and phase between TV-denoised and observed datasets - - """ - - mtz_pos = dsutils.positive_Fs(mtz, philabel, Flabel, "ogPhis_pos", "ogFs_pos") - - # if Rfree flags were specified, use these - if flags is not None: - test_set, fit_set, choose_test = validate.make_test_set( - mtz_pos, percent, "ogFs_pos", name, path, flags - ) # choose_test is Boolean array to select free (test) reflections from entire set - - # Keep 3% of reflections for test set + # use golden ratio optimization to pick an optimal lambda else: - test_set, fit_set, choose_test = validate.make_test_set( - mtz_pos, percent, "ogFs_pos", name, path - ) - - # Loop through values of lambda - - print("Scanning TV weights") - lambdas = np.linspace(1e-8, 0.4, 100) - # lambdas = np.linspace(1e-8, 0.1, 10) - # lambdas = [1e-8] - errors = [] - entropies = [] - amp_changes = [] - phase_changes = [] - - for l in tqdm(lambdas): - fit_map = dsutils.map_from_Fs(mtz_pos, "fit-set", "ogPhis_pos", map_res) - fit_TV_map, entropy = TV_filter( - fit_map, l, fit_map.grid.shape, cell, space_group - ) - - if highres is not None: - Fs_fit_TV = dsutils.from_gemmi(io.map2mtz(fit_TV_map, highres)) - else: - Fs_fit_TV = dsutils.from_gemmi( - io.map2mtz(fit_TV_map, np.min(mtz_pos.compute_dHKL()["dHKL"])) - ) - - Fs_fit_TV = Fs_fit_TV[Fs_fit_TV.index.isin(mtz_pos.index)] - test_TV = Fs_fit_TV["FWT"][choose_test] - amp_change = np.array(mtz_pos["ogFs_pos"]) - np.array(Fs_fit_TV["FWT"]) - phase_change = np.abs( - np.array(mtz_pos["ogPhis_pos"]) - np.array(Fs_fit_TV["PHWT"]) + optimizer_result = minimize_scalar( + negentropy_objective, bracket=TV_LAMBDA_RANGE, method="golden" ) + assert optimizer_result.success, "Golden minimization failed to find optimal TV lambda" + optimal_lambda = optimizer_result.x + highest_negentropy = negentropy_objective(optimal_lambda) + + # denoise using the optimized parameters and convert to an rs.DataSet + final_map = _tv_denoise_array(map_as_array=difference_map_as_array, 
weight=optimal_lambda) + final_map_as_ccp4 = numpy_array_to_map( + final_map, + spacegroup=difference_map_coefficients.spacegroup, + cell=difference_map_coefficients.cell, + ) + _, dmin = resolution_limits(difference_map_coefficients) + final_map_coefficients = compute_coefficients_from_map( + ccp4_map=final_map_as_ccp4, + high_resolution_limit=dmin, + amplitude_label=difference_map_amplitude_column, + phase_label=difference_map_phase_column, + ) - phase_change = dsutils.adjust_phi_interval(phase_change) - # error = np.sum(np.array(test_set) - np.array(test_TV)) ** 2 - error = np.sum(np.array(test_set) - np.array(test_TV)) / np.sum( - np.array(test_set) + if full_output: + tv_result = TvDenoiseResult( + optimal_lambda=optimal_lambda, + optimal_negentropy=highest_negentropy, + map_sampling_used_for_tv=TV_MAP_SAMPLING, ) - errors.append(error) - entropies.append(entropy) - amp_changes.append(amp_change) - phase_changes.append(phase_change) - - # Normalize errors - # errors = np.array(errors) / len(errors) - entropies = np.array(entropies) - print("ERRORS", errors) - print("ENTROPIES", entropies) - print("TEST SET IS", test_TV) + return final_map_coefficients, tv_result + else: + return final_map_coefficients - # Find lambda that minimizes error and that maximizes negentropy - lambda_best_err = lambdas[np.argmin(errors)] - lambda_best_entr = lambdas[np.argmax(entropies)] - print("BEST ENTROPY VALUE : ", np.max(entropies)) - print("ENTROPY FOR BEST ERROR :", entropies[np.argmin(errors)]) - TVmap_best_err, _ = TV_filter( - fit_map, lambda_best_err, fit_map.grid.shape, cell, space_group - ) - TVmap_best_entr, _ = TV_filter( - fit_map, lambda_best_entr, fit_map.grid.shape, cell, space_group - ) - return ( - TVmap_best_err, - TVmap_best_entr, - lambda_best_err, - lambda_best_entr, - errors, - entropies, - amp_changes, - phase_changes, - ) # , phase_corrs +def iterative_tv_phase_retrieval(): + raise NotImplementedError() diff --git a/meteor/utils.py b/meteor/utils.py new 
file mode 100644 index 0000000..e6142de --- /dev/null +++ b/meteor/utils.py @@ -0,0 +1,140 @@ +from dataclasses import dataclass +from typing import Literal, overload + +import gemmi +import numpy as np +import reciprocalspaceship as rs + +GEMMI_HIGH_RESOLUTION_BUFFER = 1e-6 + + +@dataclass +class MapLabels: + amplitude: str + phase: str + + +def resolution_limits(dataset: rs.DataSet) -> tuple[float, float]: + d_hkl = dataset.compute_dHKL()["dHKL"] + return d_hkl.max(), d_hkl.min() + + +def cut_resolution( + dataset: rs.DataSet, + *, + dmax_limit: float | None = None, + dmin_limit: float | None = None, +) -> rs.DataSet: + d_hkl = dataset.compute_dHKL()["dHKL"] + if dmax_limit: + dataset = dataset.loc[(d_hkl <= dmax_limit)] + if dmin_limit: + dataset = dataset.loc[(d_hkl >= dmin_limit)] + return dataset + + +@overload +def canonicalize_amplitudes( + dataset: rs.DataSet, + *, + amplitude_label: str, + phase_label: str, + inplace: Literal[False], +) -> rs.DataSet: ... + + +@overload +def canonicalize_amplitudes( + dataset: rs.DataSet, + *, + amplitude_label: str, + phase_label: str, + inplace: Literal[True], +) -> None: ... 
+ + +def canonicalize_amplitudes( + dataset: rs.DataSet, + *, + amplitude_label: str, + phase_label: str, + inplace: bool = False, +) -> rs.DataSet | None: + if not inplace: + dataset = dataset.copy(deep=True) + + negative_amplitude_indices = dataset[amplitude_label] < 0.0 + dataset[amplitude_label] = np.abs(dataset[amplitude_label]) + dataset.loc[negative_amplitude_indices, phase_label] += 180.0 + + dataset.canonicalize_phases(inplace=True) + + if not inplace: + return dataset + else: + return None + + +def numpy_array_to_map( + array: np.ndarray, + *, + spacegroup: str | int | gemmi.SpaceGroup, + cell: tuple[float, float, float, float, float, float] | gemmi.UnitCell, +) -> gemmi.Ccp4Map: + ccp4_map = gemmi.Ccp4Map() + ccp4_map.grid = gemmi.FloatGrid(array.astype(np.float32)) + + if isinstance(cell, gemmi.UnitCell): + ccp4_map.grid.unit_cell = cell + else: + ccp4_map.grid.unit_cell.set(*cell) + + if not isinstance(spacegroup, gemmi.SpaceGroup): + spacegroup = gemmi.SpaceGroup(spacegroup) + ccp4_map.grid.spacegroup = spacegroup + + return ccp4_map + + +def compute_map_from_coefficients( + *, + map_coefficients: rs.DataSet, + amplitude_label: str, + phase_label: str, + map_sampling: int, +) -> gemmi.Ccp4Map: + map_coefficients_gemmi_format = map_coefficients.to_gemmi() + ccp4_map = gemmi.Ccp4Map() + ccp4_map.grid = map_coefficients_gemmi_format.transform_f_phi_to_map( + amplitude_label, phase_label, sample_rate=map_sampling + ) + ccp4_map.update_ccp4_header() + + return ccp4_map + + +def compute_coefficients_from_map( + *, + ccp4_map: gemmi.Ccp4Map, + high_resolution_limit: float, + amplitude_label: str, + phase_label: str, +) -> rs.DataSet: + # to ensure we include the final shell of reflections, add a small buffer to the resolution + + gemmi_structure_factors = gemmi.transform_map_to_f_phi(ccp4_map.grid, half_l=False) + data = gemmi_structure_factors.prepare_asu_data( + dmin=high_resolution_limit - GEMMI_HIGH_RESOLUTION_BUFFER, with_sys_abs=True + ) + + mtz = 
gemmi.Mtz(with_base=True) + mtz.spacegroup = gemmi_structure_factors.spacegroup + mtz.set_cell_for_all(gemmi_structure_factors.unit_cell) + mtz.add_dataset("FromMap") + mtz.add_column(amplitude_label, "F") + mtz.add_column(phase_label, "P") + + mtz.set_data(data) + mtz.switch_to_asu_hkl() + + return rs.DataSet.from_gemmi(mtz) diff --git a/meteor/validate.py b/meteor/validate.py index 7eef2a6..0f2140e 100644 --- a/meteor/validate.py +++ b/meteor/validate.py @@ -1,99 +1,50 @@ import numpy as np from scipy.stats import differential_entropy -from . import mask +def negentropy(samples: np.ndarray, tolerance: float = 0.1) -> float: + """Computes the negentropy of a given sample array. -def negentropy(X): - """ - Return negentropy (float) of X (numpy array) - """ + Negentropy is defined as the entropy of a Gaussian distribution with the same variance + minus the entropy of the given distribution. + It is a measure of non-Gaussianity, with higher values indicating greater deviation + from Gaussianity. - # negetropy is the difference between the entropy of samples x - # and a Gaussian with same variance - # http://gregorygundersen.com/blog/2020/09/01/gaussian-entropy/ + Args: + ---- + samples (np.ndarray): A numpy array of sample data for which to calculate the negentropy. + tolerance (float): Tolerance level determining if the negentropy is suspiciously negative. + Defaults to 0.1. - std = np.std(X) - # neg_e = np.log(std*np.sqrt(2*np.pi*np.exp(1))) - differential_entropy(X) - neg_e = 0.5 * np.log(2.0 * np.pi * std**2) + 0.5 - differential_entropy(X) - # assert neg_e >= 0.0 + 1e-8 - - return neg_e - - -def make_test_set(df, percent, Fs, out_name, path, flags=False): - """ - Write MTZ file where data from an original MTZ has been divided in a "fit" set and a "test" set. + Returns: + ------- + float: The computed negentropy of the sample data. - Additionally save test set indices as numpy object. 
+ Raises: + ------ + ValueError: If the computed negentropy is less than the negative tolerance, + indicating potential issues with the computation. - Required parameters : + References: + ---------- + http://gregorygundersen.com/blog/2020/09/01/gaussian-entropy/ - df : (rs.Dataset) to split - percent : (float) fraction of reflections to keep for test set – e.g. 0.03 - Fs : (str) labels for structure factors to split - out_name, path : (str) and (str) output file name and path specifications - - Returns : - - test_set, fit_set : (rs.Dataset) and (rs.Dataset) for the two sets - choose_test : (1D array) containing test data indices as boolean type + Example: + ------- + >>> samples = np.random.normal(size=1000) + >>> negentropy(samples) + 0.0012 # Example output, varies with input samples. """ - if flags is not False: - choose_test = df[flags] == 0 - - else: - np.random.seed(42) - choose_test = np.random.binomial(1, percent, df[Fs].shape[0]).astype(bool) - test_set = df[Fs][choose_test] # e.g. 3% - fit_set = df[Fs][np.invert(choose_test)] # 97% + std = np.std(samples.flatten()) + if std <= 0.0: + return -np.inf - df["fit-set"] = fit_set - df["fit-set"] = df["fit-set"].astype("SFAmplitude") - df["test-set"] = test_set - df["test-set"] = df["test-set"].astype("SFAmplitude") - df.infer_mtz_dtypes(inplace=True) - df.write_mtz("{path}split-{name}.mtz".format(path=path, name=out_name)) - np.save("{path}test_flags-{name}.npy".format(path=path, name=out_name), choose_test) + neg_e = 0.5 * np.log(2.0 * np.pi * std**2) + 0.5 - differential_entropy(samples.flatten()) + if not neg_e >= -tolerance: + raise ValueError( + f"negentropy is a large negative number {neg_e}, exceeds the tolerance {tolerance}" + " -- something may have gone wrong" + ) - return test_set, fit_set, choose_test - - -def get_corrdiff(on_map, off_map, center, radius, pdb, cell, spacing): - """ - Function to find the correlation coefficient difference between two maps in local and global regions. 
- - FIRST applies solvent mask to 'on' and an 'off' map. - THEN applies a mask around a specified region - - Parameters : - - on_map, off_map : (GEMMI objects) to be compared - center : (numpy array) XYZ coordinates in PDB for the region of interest - radius : (float) radius for local region of interest - pdb, cell : (str) and (list) PDB file name and cell information - spacing : (float) spacing to generate solvent mask - - Returns : - - diff : (float) difference between local and global correlation coefficients of 'on'-'off' values - CC_loc, CC_glob : (numpy array) local and global correlation coefficients of 'on'-'off' values - - """ - - off_a = np.array(off_map.grid) - on_a = np.array(on_map.grid) - on_nosolvent = np.nan_to_num(mask.solvent_mask(pdb, cell, on_a, spacing)) - off_nosolvent = np.nan_to_num(mask.solvent_mask(pdb, cell, off_a, spacing)) - reg_mask = mask.get_mapmask(on_map.grid, center, radius) - - loc_reg = np.array(reg_mask, copy=True).flatten().astype(bool) - CC_loc = np.corrcoef(on_a.flatten()[loc_reg], off_a.flatten()[loc_reg])[0, 1] - CC_glob = np.corrcoef( - on_nosolvent[np.logical_not(loc_reg)], off_nosolvent[np.logical_not(loc_reg)] - )[0, 1] - - diff = np.array(CC_glob) - np.array(CC_loc) - - return diff, CC_loc, CC_glob + return neg_e diff --git a/pyproject.toml b/pyproject.toml new file mode 100644 index 0000000..685ac4c --- /dev/null +++ b/pyproject.toml @@ -0,0 +1,53 @@ +[project] +name = "meteor" +description = "denoise crystallographic difference maps" +version = "0.2.0" +authors = [ + { name = "Alisia Fadini", email = "af840@cam.ac.uk" }, + { name = "Thomas Lane", email = "thomas.lane@desy.de" } +] +dependencies = [ + "numpy", + "scipy", + "gemmi", + "scikit-image", + "reciprocalspaceship", + "pytest", +] + +[project.optional-dependencies] +dev = [ + "pre-commit", +] + +[build-system] +requires = ["setuptools >= 61.0"] +build-backend = "setuptools.build_meta" + +[tool.mypy] +[[tool.mypy.overrides]] +module = 
"reciprocalspaceship.*,gemmi.*,scipy.stats.*,scipy.optimize.*" +ignore_missing_imports = true + +[tool.ruff] +line-length = 100 +lint.flake8-pytest-style.fixture-parentheses = false +lint.select = [ + "E", # pycodestyle (PEP 8) rules + "F", # pyflakes rules + "W", # warnings like trailing whitespace + "C90", # mccabe rules for cyclomatic complexity (flags overly complex functions) + "I", # isort rules for import sorting + "N", # flake8 naming conventions + "Q", # quote rules (e.g., enforcing consistent quote usage) + "PT", # flake8-pytest-style rules for pytest +] +exclude = [ + "build/", + "dist/", + "migrations/", + ".venv/", + ".git/", + "__pycache__/", + "*.pyc", +] diff --git a/setup.py b/setup.py deleted file mode 100755 index 88f8a27..0000000 --- a/setup.py +++ /dev/null @@ -1,30 +0,0 @@ -DESCRIPTION = "Scripts for crystallographic electron density maps containing low occupancy species" -LONG_DESCRIPTION = """ -METEOR contains commandline scripts and functions for finding low occupancy species in crystallographic maps. -This module will be part of reciprocalspaceship and its "booster" package, rs-booster. 
-""" - -try: - from setuptools import setup, find_packages - -except ImportError: - from disutils.core import setup - - -setup( - name="meteor", - description=DESCRIPTION, - long_description=LONG_DESCRIPTION, - author="Alisia Fadini", - author_email="alisia.fadini15@imperial.ac.uk", - install_requires=["reciprocalspaceship", "matplotlib", "seaborn"], - packages=find_packages(), - entry_points={ - "console_scripts": [ - "meteor.bckgr_subtract=meteor.background_subtraction.make_Nbgmax_map:main", - "meteor.diffmap=meteor.background_subtraction.make_diffmap:main", - "meteor.tv_denoise=meteor.TV_filtering.tv_denoise_map:main", - "meteor.iterative_TV=meteor.TV_filtering.iterative_tv:main", - ] - }, -) diff --git a/test/conftest.py b/test/conftest.py new file mode 100644 index 0000000..b0bd51a --- /dev/null +++ b/test/conftest.py @@ -0,0 +1,49 @@ +import gemmi +import numpy as np +import pytest +import reciprocalspaceship as rs + +from meteor.utils import MapLabels, canonicalize_amplitudes + + +@pytest.fixture +def diffmap_labels() -> MapLabels: + return MapLabels( + amplitude="DF", + phase="PHIC", + ) + + +@pytest.fixture +def random_difference_map(diffmap_labels: MapLabels) -> rs.DataSet: + resolution = 1.0 + cell = gemmi.UnitCell(10.0, 10.0, 10.0, 90.0, 90.0, 90.0) + space_group = gemmi.SpaceGroup(1) + hall = rs.utils.generate_reciprocal_asu(cell, space_group, resolution, anomalous=False) + + h, k, l = hall.T # noqa: E741 + number_of_reflections = len(h) + + ds = rs.DataSet( + { + "H": h, + "K": k, + "L": l, + diffmap_labels.amplitude: np.random.randn(number_of_reflections), + diffmap_labels.phase: np.random.uniform(-180, 180, size=number_of_reflections), + }, + spacegroup=space_group, + cell=cell, + ).infer_mtz_dtypes() + + ds.set_index(["H", "K", "L"], inplace=True) + ds["DF"] = ds["DF"].astype("SFAmplitude") + + canonicalize_amplitudes( + ds, + amplitude_label="DF", + phase_label="PHIC", + inplace=True, + ) + + return ds diff --git a/test/test_dsutils.py 
b/test/test_dsutils.py deleted file mode 100644 index 4189ce3..0000000 --- a/test/test_dsutils.py +++ /dev/null @@ -1,48 +0,0 @@ -import pytest -import numpy as np -import reciprocalspaceship as rs -from pathlib import Path -from meteor import dsutils - -@pytest.fixture -def dummy_mtz(): - data = np.array([1, 2, 3, 4, 5]) - dhkl = np.array([1.1, 1.2, 1.3, 1.4, 1.5]) - dataset = rs.DataSet({"data": data, "dHKL": dhkl}) - return dataset - -def test_res_cutoff(dummy_mtz): - df = dummy_mtz - h_res = 1.2 - l_res = 1.4 - - filtered_df = dsutils.res_cutoff(df, h_res, l_res) - - assert len(filtered_df) == 2 - assert filtered_df["data"].tolist() == [2, 3] - -def test_res_cutoff_raises_error(dummy_mtz): - df = dummy_mtz - h_res = 1.5 - l_res = 1.2 - - with pytest.raises(ValueError): - dsutils.res_cutoff(df, h_res, l_res) - -def test_resolution_shells(dummy_mtz): - data = dummy_mtz["data"] - dhkl = dummy_mtz["dHKL"] - n = 2 - - bin_centers, mean_data = dsutils.resolution_shells(data, dhkl, n) - - assert len(bin_centers) == n - assert len(mean_data.statistic) == n - -def test_resolution_shells_raises_error(dummy_mtz): - data = dummy_mtz["data"] - dhkl = dummy_mtz["dHKL"] - n = 10 - - with pytest.raises(ValueError): - dsutils.resolution_shells(data, dhkl, n) \ No newline at end of file diff --git a/test/test_io.py b/test/test_io.py deleted file mode 100644 index 0f6835b..0000000 --- a/test/test_io.py +++ /dev/null @@ -1,26 +0,0 @@ - -import pathlib - -from meteor import io - -# todo this is probably the right way to do this -#PDB_FILE = pathlib.Path(__file__) / 'data' / 'dark.pdb' -PDB_FILE = './data/dark.pdb' - - -def test_get_pdbinfo(): - - unit_cell, space_group = io.get_pdbinfo(PDB_FILE) - - assert unit_cell == [51.99, 62.91, 72.03, 90.0, 90.0, 90.0], 'unit cell incorrect' - assert space_group == 'P212121', 'space group incorrect' - - # todo write a file without CRYST1 - # check to make sure that fails - - return - - -if __name__ == '__main__': - test_get_pdbinfo() - diff 
--git a/test/unit/test_tv.py b/test/unit/test_tv.py new file mode 100644 index 0000000..51b2679 --- /dev/null +++ b/test/unit/test_tv.py @@ -0,0 +1,180 @@ +from typing import Sequence + +import gemmi +import numpy as np +import pytest +import reciprocalspaceship as rs + +from meteor import tv +from meteor.utils import MapLabels, compute_coefficients_from_map, compute_map_from_coefficients + + +def _generate_single_carbon_density( + carbon_position: tuple[float, float, float], + space_group: gemmi.SpaceGroup, + unit_cell: gemmi.UnitCell, + d_min: float, +) -> gemmi.FloatGrid: + model = gemmi.Model("single_atom") + chain = gemmi.Chain("A") + + residue = gemmi.Residue() + residue.name = "X" + residue.seqid = gemmi.SeqId("1") + + atom = gemmi.Atom() + atom.name = "C" + atom.element = gemmi.Element("C") + atom.pos = gemmi.Position(*carbon_position) + + residue.add_atom(atom) + chain.add_residue(residue) + model.add_chain(chain) + + structure = gemmi.Structure() + structure.add_model(model) + structure.cell = unit_cell + structure.spacegroup_hm = space_group.hm + + density_map = gemmi.DensityCalculatorX() + density_map.d_min = d_min + density_map.grid.setup_from(structure) + density_map.put_model_density_on_grid(structure[0]) + + return density_map.grid + + +def displaced_single_atom_difference_map_coefficients( + *, + noise_sigma: float, +) -> rs.DataSet: + unit_cell = gemmi.UnitCell(a=10.0, b=10.0, c=10.0, alpha=90, beta=90, gamma=90) + space_group = gemmi.find_spacegroup_by_name("P1") + d_min = 1.0 + + carbon_position1 = (5.0, 5.0, 5.0) + carbon_position2 = (5.1, 5.0, 5.0) + + density1 = _generate_single_carbon_density(carbon_position1, space_group, unit_cell, d_min) + density2 = _generate_single_carbon_density(carbon_position2, space_group, unit_cell, d_min) + + ccp4_map = gemmi.Ccp4Map() + grid_values = ( + np.array(density2) - np.array(density1) + noise_sigma * np.random.randn(*density2.shape) + ) + ccp4_map.grid = gemmi.FloatGrid(grid_values.astype(np.float32), 
def rms_between_coefficients(ds1: rs.DataSet, ds2: rs.DataSet, diffmap_labels: MapLabels) -> float:
    """Return the RMS difference between the real-space maps of two coefficient sets.

    Each dataset is turned into a map with ``compute_map_from_coefficients``
    (``map_sampling=3``), the grid values are divided by their own standard
    deviation, and the root-mean-square of the voxel-wise difference between
    the two normalised grids is returned.

    Parameters
    ----------
    ds1, ds2
        The two sets of map coefficients to compare.
    diffmap_labels
        Supplies the amplitude and phase column names that are passed to
        ``compute_map_from_coefficients`` for both datasets.

    Returns
    -------
    float
        Root-mean-square voxel difference between the std-normalised maps.
    """

    def _std_normalised_grid(map_coefficients: rs.DataSet) -> np.ndarray:
        # Grid values of the map computed from `map_coefficients`, scaled to unit std.
        ccp4_map = compute_map_from_coefficients(
            map_coefficients=map_coefficients,
            amplitude_label=diffmap_labels.amplitude,
            phase_label=diffmap_labels.phase,
            map_sampling=3,
        )
        grid_values = np.array(ccp4_map.grid)
        # NOTE: a constant grid (std == 0) produces non-finite values here.
        return grid_values / grid_values.std()

    # map1 now really comes from ds1 and map2 from ds2 (they used to be swapped).
    map1_array = _std_normalised_grid(ds1)
    map2_array = _std_normalised_grid(ds2)

    # Use the mean of the squared differences, not their sum: a plain L2 norm
    # grows with the number of grid points and is not an RMS.  For two grids of
    # the same size this only rescales the value, so comparisons between
    # results obtained on equally sized grids keep their ordering.
    difference = map2_array - map1_array
    return float(np.sqrt(np.mean(np.square(difference))))
def omit_nones_in_list(input_list: list) -> list:
    """Return a new list holding the entries of ``input_list`` that are not ``None``.

    Only ``None`` is dropped.  Other falsy values such as ``0``, ``0.0``,
    ``""`` or ``False`` are kept, so a legitimate numeric limit of zero is
    not silently discarded.  The order of the remaining entries is preserved
    and ``input_list`` itself is left untouched.
    """
    # Identity check on purpose: a truthiness test would also remove 0 / 0.0.
    return [entry for entry in input_list if entry is not None]
"dmin_limit"), + [ + (None, None), + (None, 2.0), + (8.0, None), + (8.0, 2.0), + ], +) +def test_cut_resolution( + random_difference_map: rs.DataSet, dmax_limit: float, dmin_limit: float +) -> None: + dmax_before_cut, dmin_before_cut = utils.resolution_limits(random_difference_map) + expected_dmax_upper_bound: float = max(omit_nones_in_list([dmax_before_cut, dmax_limit])) + expected_dmin_lower_bound: float = min(omit_nones_in_list([dmin_before_cut, dmin_limit])) + + random_intensities = utils.cut_resolution( + random_difference_map, dmax_limit=dmax_limit, dmin_limit=dmin_limit + ) + assert len(random_intensities) > 0 + + dmax, dmin = utils.resolution_limits(random_intensities) + assert dmax <= expected_dmax_upper_bound + assert dmin >= expected_dmin_lower_bound + + +@pytest.mark.parametrize("inplace", [False, True]) +def test_canonicalize_amplitudes( + inplace: bool, random_difference_map: rs.DataSet, diffmap_labels: utils.MapLabels +) -> None: + # ensure at least one amplitude is negative, one phase is outside [-180,180) + index_single_hkl = 0 + random_difference_map.loc[index_single_hkl, diffmap_labels.amplitude] = -1.0 + random_difference_map.loc[index_single_hkl, diffmap_labels.phase] = -470.0 + + if inplace: + canonicalized = random_difference_map + utils.canonicalize_amplitudes( + canonicalized, + amplitude_label=diffmap_labels.amplitude, + phase_label=diffmap_labels.phase, + inplace=inplace, + ) + else: + canonicalized = utils.canonicalize_amplitudes( + random_difference_map, + amplitude_label=diffmap_labels.amplitude, + phase_label=diffmap_labels.phase, + inplace=inplace, + ) + + assert (canonicalized[diffmap_labels.amplitude] >= 0.0).all(), "not all amplitudes positive" + assert (canonicalized[diffmap_labels.phase] >= -180.0).all(), "not all phases > -180" + assert (canonicalized[diffmap_labels.phase] <= 180.0).all(), "not all phases < +180" + + np.testing.assert_almost_equal( + np.array(np.abs(random_difference_map[diffmap_labels.amplitude])), + 
@pytest.mark.parametrize("map_sampling", [1, 2, 2.25, 3, 5])
def test_map_to_coefficients_round_trip(
    map_sampling: int, random_difference_map: rs.DataSet, diffmap_labels: utils.MapLabels
) -> None:
    """Coefficients -> map -> coefficients reproduces the input frame within ``atol=0.5``.

    The map is computed at each parametrized ``map_sampling``, transformed back
    using the dataset's own high-resolution limit, canonicalized in place, and
    compared against the original dataset with ``assert_frame_equal``.
    """
    # NOTE(review): `map_sampling` is annotated `int`, yet 2.25 is one of the
    # parametrized values -- confirm the type accepted by the utility.
    amplitude_label = diffmap_labels.amplitude
    phase_label = diffmap_labels.phase

    real_space_map = utils.compute_map_from_coefficients(
        map_coefficients=random_difference_map,
        amplitude_label=amplitude_label,
        phase_label=phase_label,
        map_sampling=map_sampling,
    )

    # Only the high-resolution limit is needed for the inverse transform.
    _dmax, dmin = utils.resolution_limits(random_difference_map)

    round_tripped = utils.compute_coefficients_from_map(
        ccp4_map=real_space_map,
        high_resolution_limit=dmin,
        amplitude_label=amplitude_label,
        phase_label=phase_label,
    )

    # Canonicalize the recovered coefficients in place before comparing.
    utils.canonicalize_amplitudes(
        round_tripped,
        amplitude_label=amplitude_label,
        phase_label=phase_label,
        inplace=True,
    )

    pd.testing.assert_frame_equal(left=random_difference_map, right=round_tripped, atol=0.5)
def test_negentropy_zero() -> None:
    """The negentropy of an all-zero sample of length 100 is expected to be ``-inf``."""
    constant_sample = np.zeros(100)
    assert validate.negentropy(constant_sample) == -np.inf