Skip to content

Commit

Permalink
Merge pull request #7 from mj-will/udpdate-hm-pr
Browse files Browse the repository at this point in the history
Add support for HoM
  • Loading branch information
WuShichao authored May 14, 2024
2 parents 39c3c7b + c35104c commit d4702cf
Show file tree
Hide file tree
Showing 3 changed files with 220 additions and 42 deletions.
187 changes: 157 additions & 30 deletions BBHX_PhenomD.py → BBHX_Phenom.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,37 +5,64 @@
from bbhx.utils.constants import MTSUN_SI, YRSID_SI, L_SI
from bbhx.waveformbuild import BBHWaveformFD
from pycbc.coordinates import TIME_OFFSET_20_DEGREES, lisa_to_ssb, ssb_to_lisa
from warnings import warn


@functools.lru_cache(maxsize=128)
def get_waveform_genner(log_mf_min, run_phenomd=True):
# See below where this function is called for description of how we handle
# log_mf_min.
mf_min = math.exp(log_mf_min/25.)
wave_gen = BBHWaveformFD(amp_phase_kwargs=dict(run_phenomd=run_phenomd, mf_min=mf_min))
wave_gen = BBHWaveformFD(
amp_phase_kwargs=dict(run_phenomd=run_phenomd, mf_min=mf_min),
)
return wave_gen


@functools.lru_cache(maxsize=10)
def cached_arange(start, stop, spacing):
return np.arange(start, stop, spacing)

def chirptime(m1, m2, f_lower):

@functools.lru_cache(maxsize=128)
def cached_freq_logspace(f_lower, num_interp):
return np.logspace(math.log10(f_lower), 0, num=num_interp)


def chirptime(m1, m2, f_lower, m_mode=None):
"""Compute the chirptime.
Defaults for the (2,2) mode.
"""
from pycbc.waveform.spa_tmplt import findchirp_chirptime

# Find the (2,2) mode duration
duration = findchirp_chirptime(m1=m1, m2=m2, fLower=f_lower, porder=7)
# If a mode is specified, convert to that mode
if m_mode is not None:
# See https://arxiv.org/abs/2005.08830, eqs. 2-3
factor = (2 / (m_mode)) ** (-8 / 3)
duration *= factor
return duration


def imr_duration(**params):
# More accurate duration (within LISA frequency band) of the waveform,
# including merge, ringdown, and aligned spin effects.
# This is used in the time-domain signal injection in PyCBC.
import warnings
from pycbc.waveform.waveform import imrphenomd_length_in_time

nparams = {'mass1':params['mass1'], 'mass2':params['mass2'],
'spin1z':params['spin1z'], 'spin2z':params['spin2z'],
'f_lower':params['f_lower']}
time_length = np.float64(imrphenomd_length_in_time(**nparams))

if params['approximant'] == 'BBHX_IMRPhenomD':
from pycbc.waveform.waveform import imrphenomd_length_in_time
time_length = np.float64(imrphenomd_length_in_time(**nparams))
elif params['approximant'] == 'BBHX_IMRPhenomHM':
from pycbc.waveform.waveform import imrphenomhm_length_in_time
time_length = np.float64(imrphenomhm_length_in_time(**nparams))

if time_length < 2678400:
warnings.warn("Waveform duration is too short! Setting it to 1 month (2678400 s).")
time_length = 2678400
Expand All @@ -44,19 +71,80 @@ def imr_duration(**params):
time_length = params['t_obs_start']
return time_length * 1.1

def interpolated_tf(m1, m2):
# Using findchirp_chirptime in PyCBC to calculate

def interpolated_tf(m1, m2, m_mode=None, num_interp=100, f_lower=1e-4):
"""Interpolate the time frequency-track.
Defaults to the dominant (2,2) mode and uses :code:`chirptime` to compute
the track.
"""
# Using findchirp_chirptime in PyCBC to calculate
# the time-frequency track of dominant mode to get
# the corresponding `f_min` for `t_obs_start`.
freq_array = np.logspace(-4, 0, num=10)
t_array = np.zeros(len(freq_array))
for i in range(len(freq_array)):
t_array[i] = chirptime(m1=m1, m2=m2, f_lower=freq_array[i])
freq_array = cached_freq_logspace(f_lower, num_interp)
t_array = chirptime(m1=m1, m2=m2, f_lower=freq_array, m_mode=m_mode)
tf_track = interp1d(t_array, freq_array)
return tf_track

def bbhx_fd(ifos=None, run_phenomd=True, tdi=None,
ref_frame='LISA', sample_points=None, **params):

def waveform_setup(**kwargs):
if kwargs['approximant'] == "BBHX_PhenomD":
if kwargs.get('mode_array') is not None and len(kwargs['mode_array']) != 1:
raise RuntimeError("BBHX_PhenomD only supports the (2,2) mode!")
kwargs['mode_array'] = [(2, 2)]
return _bbhx_fd(run_phenomd=True, **kwargs)
elif kwargs['approximant'] == "BBHX_PhenomHM":
if kwargs.get('mode_array') is None:
kwargs['mode_array'] = [(2, 2), (2, 1), (3, 3), (3, 2), (4, 4), (4, 3)]
return _bbhx_fd(run_phenomd=False, **kwargs)
else:
raise ValueError(f"Invalid approximant: {kwargs['approximant']}")


def _bbhx_fd(
ifos=None,
run_phenomd=True,
ref_frame='LISA',
tdi=None,
sample_points=None,
length=1024,
direct=False,
num_interp=100,
interp_f_lower=1e-4,
**params
):

"""Function to generate frequency-domain waveforms using BBHx.
Parameters
----------
ifos : list
List of interferometers
run_phenomd : bool
Flag passed to :code:`bbhx.waveformbuild.BBHWaveformFD` that determines
if PhenomD or PhenomHM is used.
tdi : {'1.5', '2.0}
Version of TDI to use.
ref_frame : {'LISA', 'SSB'}
Reference frame.
samples_points : numpy.ndarray, optional
Array of frequencies for computing the waveform
length : int
Length parameter passed to BBHx. Must be specified if
:code:`direct=False`. See BBHx documentation for more details.
direct : bool
See BBHx documentation.
num_interp : int
Number of interpolation points used for computing chirp time.
interp_f_lower : float
Lower frequency cutoff used for interpolation when computing the
chirp time.
Returns
-------
dict
A dictionary containing the the waveforms for each interferometer.
"""

if ifos is None:
raise Exception("Must define data streams to compute")
Expand All @@ -79,9 +167,13 @@ def bbhx_fd(ifos=None, run_phenomd=True, tdi=None,
t_offset = np.float64(params['t_offset']) # in seconds
else:
raise Exception("Must set `t_offset`, if you don't have a preferred value, \
please set it to be the default value %f, which will put LISA behind \
the Earth by ~20 degrees." % TIME_OFFSET_20_DEGREES)
please set it to be the default value %f, which will put LISA behind \
the Earth by ~20 degrees." % TIME_OFFSET_20_DEGREES)
t_obs_start = np.float64(params['t_obs_start']) # in seconds
mode_array = list(params["mode_array"])
num_interp = int(num_interp)
length = int(length) if length is not None else None
interp_f_lower = float(interp_f_lower)

if ref_frame == 'LISA':
t_ref_lisa = np.float64(params['tc']) + t_offset
Expand Down Expand Up @@ -114,32 +206,53 @@ def bbhx_fd(ifos=None, run_phenomd=True, tdi=None,
err_msg = f"Don't recognise reference frame {ref_frame}. "
err_msg = f"Known frames are 'LISA' and 'SSB'."

# We follow the convention used in LAL and set the frequency based on the
# highest m mode. This means that lower m modes will start at later times.
max_m_mode = max([mode[1] for mode in mode_array])
if ('f_lower' not in params) or (params['f_lower'] < 0):
# the default value of 'f_lower' in PyCBC is -1.
tf_track = interpolated_tf(m1, m2)
t_max = chirptime(m1=m1, m2=m2, f_lower=1e-4)
t_max = chirptime(
m1=m1, m2=m2, f_lower=interp_f_lower, m_mode=max_m_mode
)
if t_obs_start > t_max:
# Avoid "above the interpolation range" issue.
f_min = 1e-4
f_min = interp_f_lower
else:
tf_track = interpolated_tf(
m1,
m2,
m_mode=max_m_mode,
num_interp=num_interp,
f_lower=interp_f_lower,
)
f_min = tf_track(t_obs_start) # in Hz
else:
f_min = np.float64(params['f_lower']) # in Hz
tf_track = interpolated_tf(m1, m2)
t_max = chirptime(m1=m1, m2=m2, f_lower=1e-4)
t_max = chirptime(
m1=m1, m2=m2, f_lower=interp_f_lower, m_mode=max_m_mode
)
if t_obs_start > t_max:
f_min_tobs = 1e-4
f_min_tobs = interp_f_lower
else:
tf_track = interpolated_tf(
m1,
m2,
m_mode=max_m_mode,
num_interp=num_interp,
f_lower=interp_f_lower,
)
f_min_tobs = tf_track(t_obs_start) # in Hz
if f_min < f_min_tobs:
err_msg = f"Input 'f_lower' is lower than the value calculated from 't_obs_start'."
warn(err_msg, RuntimeWarning)

# We want to cache the waveform generator, but as it takes a mass dependent
# start frequency as input this is hard.
# To solve this we *round* the *logarithm* of this mass-dependent start
# frequency. The factor of 25 ensures reasonable spacing while doing this.
# So we round down to the nearest 1/25 of the logarithm of the frequency
log_mf_min = int(math.log(f_min*MTSUN_SI*(m1+m2)) * 25)

wave_gen = get_waveform_genner(log_mf_min, run_phenomd=run_phenomd)

if sample_points is None:
Expand All @@ -160,21 +273,35 @@ def bbhx_fd(ifos=None, run_phenomd=True, tdi=None,
else:
freqs = sample_points

modes = [(2,2)] # More modes if not phenomd
direct = False # See the BBHX documentation
# If creating injection of many modes, or just single, compress = True
# will do the same thing.
compress = True # If True, combine harmonics into single channel waveforms. (Default: True)
fill = True # See the BBHX documentation
squeeze = True # See the BBHX documentation
length = 1024 # An internal generation parameter, not an output parameter
shift_t_limits = False # Times are relative to merger
t_obs_end = 0.0 # Generates ringdown as well!

wave = wave_gen(m1, m2, a1, a2,
dist, phi_ref, f_ref, inc, lam,
beta, psi, t_ref, freqs=freqs,
modes=modes, direct=direct, fill=fill, squeeze=squeeze,
length=length, t_obs_start=t_obs_start/YRSID_SI,
t_obs_end=t_obs_end,
shift_t_limits=shift_t_limits)[0]
# NOTE: This does not allow for the separation of multiple modes into
# their own streams. All modes requested are combined into one stream.
wave = wave_gen(
m1, m2, a1, a2,
dist, phi_ref, f_ref, inc, lam,
beta, psi, t_ref,
freqs=freqs,
modes=mode_array,
direct=direct,
fill=fill,
squeeze=squeeze,
t_obs_start=t_obs_start / YRSID_SI,
t_obs_end=t_obs_end,
compress=compress,
length=length,
shift_t_limits=shift_t_limits,
)
# For some reason, the shape is different depending on if direct is True
# or False.
if not direct:
wave = wave[0]

wanted = {}

Expand Down
19 changes: 15 additions & 4 deletions setup.py
Original file line number Diff line number Diff line change
Expand Up @@ -18,10 +18,21 @@
download_url = 'https://github.com/gwastro/BBHX-waveform-model/v%s' % VERSION,
keywords = ['pycbc', 'signal processing', 'gravitational waves', 'lisa'],
install_requires = ['pycbc'],
py_modules = ['BBHX_PhenomD'],
entry_points = {"pycbc.waveform.fd_det":"BBHX_PhenomD=BBHX_PhenomD:bbhx_fd",
"pycbc.waveform.fd_det_sequence":"BBHX_PhenomD=BBHX_PhenomD:bbhx_fd",
"pycbc.waveform.length":"BBHX_PhenomD=BBHX_PhenomD:imr_duration"},
py_modules = ['BBHX_Phenom'],
entry_points = {
"pycbc.waveform.fd_det":[
"BBHX_PhenomD=BBHX_Phenom:waveform_setup",
"BBHX_PhenomHM=BBHX_Phenom:waveform_setup",
],
"pycbc.waveform.fd_det_sequence": [
"BBHX_PhenomD=BBHX_Phenom:waveform_setup",
"BBHX_PhenomHM=BBHX_Phenom:waveform_setup",
],
"pycbc.waveform.length": [
"BBHX_PhenomD=BBHX_Phenom:imr_duration",
"BBHX_PhenomHM=BBHX_Phenom:imr_duration",
]
},

classifiers=[
'Programming Language :: Python',
Expand Down
56 changes: 48 additions & 8 deletions tests.py
Original file line number Diff line number Diff line change
@@ -1,15 +1,30 @@
import numpy as np
from pycbc.waveform import get_fd_det_waveform
from pycbc.waveform import get_fd_det_waveform, get_fd_det_waveform_sequence
import pytest


@pytest.mark.parametrize("ref_frame", ["SSB", "LISA"])
@pytest.mark.parametrize("tdi", ["1.5", "2.0"])
def test_get_fd_det_waveform(tdi, ref_frame):
@pytest.fixture(params=["BBHX_PhenomD", "BBHX_PhenomHM"])
def approximant(request):
return request.param


@pytest.fixture(params=["LISA", "SSB"])
def ref_frame(request):
return request.param


@pytest.fixture(params=["1.5", "2.0"])
def tdi(request):
return request.param


@pytest.fixture()
def params():
params = {}
params["tdi"] = tdi
params["ref_frame"] = ref_frame
params["approximant"] = "BBHX_PhenomD"
params["approximant"] = "BBHX_PhenomHM"
params["tdi"] = 1.5
params["ref_frame"] = "LISA"
params["ifos"] = ["LISA_A", "LISA_E", "LISA_T"]
params["coa_phase"] = 0.0
params["mass1"] = 1e6
params["mass2"] = 8e5
Expand All @@ -28,7 +43,32 @@ def test_get_fd_det_waveform(tdi, ref_frame):
params["eclipticlongitude"] = 0.5
params["eclipticlatitude"] = 0.23
params["polarization"] = 0.1
wf = get_fd_det_waveform(ifos=["LISA_A", "LISA_E", "LISA_T"], **params)
return params


def test_get_fd_det_waveform(params, ref_frame, approximant, tdi):
params["tdi"] = tdi
params["ref_frame"] = ref_frame
params["approximant"] = approximant
wf = get_fd_det_waveform(**params)
# Check all 3 ifos are returned
assert len(wf) == 3


def test_get_fd_det_waveform_sequence(params, approximant, tdi):
params["ifos"] = "LISA_A"
params["tdi"] = tdi
params["approximant"] = approximant
freqs = np.array([1-4, 1e-3, 1e-2])
wf = get_fd_det_waveform_sequence(sample_points=freqs, **params)
# Check all 3 ifos are returned
assert len(wf) == 1
assert len(wf["LISA_A"]) == len(freqs)


@pytest.mark.parametrize("mode_array", [None, [(3, 3)], [(2, 2), (3, 3)]])
def test_phenomhm_mode_array(params, mode_array):
params["approximant"] = "BBHX_PhenomHM"
params["mode_array"] = mode_array
wf = get_fd_det_waveform(**params)
assert len(wf) == 3

0 comments on commit d4702cf

Please sign in to comment.