-
Notifications
You must be signed in to change notification settings - Fork 12
Commit
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.
- Loading branch information
1 parent
a958047
commit 79bd6d0
Showing
2 changed files
with
108 additions
and
0 deletions.
There are no files selected for viewing
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,12 @@ | ||
|
||
# -*- coding: utf8 -*- | ||
# SPDX-License-Identifier: CECILL-2.1 | ||
""" | ||
Processing utilities for SourceSpec. | ||
:copyright: | ||
2013-2024 Claudio Satriano <[email protected]> | ||
:license: | ||
CeCILL Free Software License Agreement v2.1 | ||
(http://www.cecill.info/licences.en.html) | ||
""" |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,96 @@ | ||
# -*- coding: utf-8 -*- | ||
# SPDX-License-Identifier: CECILL-2.1 | ||
""" | ||
Signal processing utilities. | ||
:copyright: | ||
2012-2024 Claudio Satriano <[email protected]> | ||
:license: | ||
CeCILL Free Software License Agreement v2.1 | ||
(http://www.cecill.info/licences.en.html) | ||
""" | ||
import logging | ||
import numpy as np | ||
from obspy.signal.invsim import cosine_taper as _cos_taper | ||
logger = logging.getLogger(__name__.rsplit('.', maxsplit=1)[-1]) | ||
|
||
|
||
def cosine_taper(signal, width, left_taper=False): | ||
"""Apply a cosine taper to the signal.""" | ||
# TODO: this taper looks more like a hanning... | ||
npts = len(signal) | ||
p = 2 * width | ||
tap = _cos_taper(npts, p) | ||
if left_taper: | ||
tap[npts // 2:] = 1. | ||
signal *= tap | ||
|
||
|
||
# modified from: http://stackoverflow.com/q/5515720 | ||
def smooth(signal, window_len=11, window='hanning'): | ||
"""Smooth the signal using a window with requested size.""" | ||
if signal.ndim != 1: | ||
raise ValueError('smooth only accepts 1 dimension arrays.') | ||
if signal.size < window_len: | ||
raise ValueError('Input vector needs to be bigger than window size.') | ||
if window_len < 3: | ||
return signal | ||
if window not in ['flat', 'hanning', 'hamming', 'bartlett', 'blackman']: | ||
raise ValueError("Window is one of 'flat', 'hanning', 'hamming', " | ||
"'bartlett', 'blackman'") | ||
s = np.r_[ | ||
2 * signal[0] - signal[window_len - 1::-1], | ||
signal, | ||
2 * signal[-1] - signal[-1:-window_len:-1] | ||
] | ||
if window == 'flat': # moving average | ||
w = np.ones(window_len, 'd') | ||
else: | ||
w = eval(f'np.{window}(window_len)') # pylint: disable=eval-used | ||
y = np.convolve(w / w.sum(), s, mode='same') | ||
yy = y[window_len:-window_len + 1] | ||
# check if there are NaN values | ||
nanindexes = np.where(np.isnan(yy)) | ||
yy[nanindexes] = signal[nanindexes] | ||
return yy | ||
|
||
|
||
def remove_instr_response(trace, pre_filt=(0.5, 0.6, 40., 45.)): | ||
""" | ||
Remove instrument response from a trace. | ||
Trace is converted to the sensor units (m for a displacement sensor, | ||
m/s for a short period or broadband velocity sensor, m/s**2 for a strong | ||
motion sensor). | ||
:param trace: Trace to be corrected. | ||
:type trace: :class:`~obspy.core.trace.Trace` | ||
:param pre_filt: Pre-filter frequencies (``None`` means no pre-filtering). | ||
:type pre_filt: tuple of four floats | ||
""" | ||
trace_info = trace.stats.info | ||
inventory = trace.stats.inventory | ||
if not inventory: | ||
# empty inventory | ||
raise RuntimeError(f'{trace_info}: no instrument response for trace') | ||
# remove the mean... | ||
trace.detrend(type='constant') | ||
# ...and the linear trend | ||
trace.detrend(type='linear') | ||
# Define output units based on nominal units in inventory | ||
# Note: ObsPy >= 1.3.0 supports the 'DEF' output unit, which will make | ||
# this step unnecessary | ||
if trace.stats.units.lower() == 'm': | ||
output = 'DISP' | ||
if trace.stats.units.lower() == 'm/s': | ||
output = 'VEL' | ||
if trace.stats.units.lower() == 'm/s**2': | ||
output = 'ACC' | ||
# Finally remove instrument response, | ||
# trace is converted to the sensor units | ||
trace.remove_response( | ||
inventory=inventory, output=output, pre_filt=pre_filt) | ||
if any(np.isnan(trace.data)): | ||
raise RuntimeError( | ||
f'{trace_info}: NaN values in trace after ' | ||
'instrument response removal') |