"""
Contains helper functions and constants for tasks related to spectral estimation and
filtering.
"""
import inspect
from pathlib import Path
import matplotlib.pyplot as plt
import numpy as np
from matplotlib.ticker import Formatter
from obspy.core.util.base import _get_function_from_entry_point
from obspy.signal.filter import lowpass_cheby_2
from scipy.signal import freqz_zpk, iirfilter, tf2zpk
# Reference values for PSD dB units
REFERENCE_SEISMIC = 1 # [m, m/s, or m/s**2]
REFERENCE_INFRASOUND = 20e-6 # [Pa]
# Minimum number of cycles a wave must make within a given time window in order to be
# considered "resolvable"
CYCLES_PER_WINDOW = 4
# This file downloaded from the supplementary material of Macpherson et al. (2022)
AK_INFRA_NOISE = Path(__file__).with_name('alaska_ambient_infrasound_noise_models.txt')
# Common cursor data formatting template for frequencies and periods
_FREQ_TEMPLATE = '{:.3g} Hz / {:.2f} s'
[docs]
def get_ak_infra_noise():
"""Returns the Alaska ambient infrasound noise models from Macpherson et al. (2022).
Macpherson, K. A., Coffey, J. R., Witsil, A. J., Fee, D., Holtkamp, S., Dalton,
S., McFarlin, H., & West, M. (2022). Ambient infrasound noise, station
performance, and their relation to land cover across Alaska. *Seismological
Research Letters*, *93*\ (4), 2239–2258. https://doi.org/10.1785/0220210365
Usage example:
.. code-block:: python
from saul import get_ak_infra_noise
p, hnm, mnm, lnm = get_ak_infra_noise()
Returns:
:class:`tuple`: Period [s], high noise model [dB rel. 1 Pa\ :sup:`2` Hz\ :sup:`–1`],
median noise model [dB rel. 1 Pa\ :sup:`2` Hz\ :sup:`–1`], low noise model [dB
rel. 1 Pa\ :sup:`2` Hz\ :sup:`–1`]
"""
f, hnm, mnm, lnm = np.loadtxt(AK_INFRA_NOISE, delimiter=',', skiprows=12).T
# We convert frequency to period to match the ObsPy functions
return 1 / f, hnm, mnm, lnm
[docs]
def obspy_filter_response(
filter_type, sampling_rate, freqs=4096, plot=False, **options
):
"""Calculate the frequency response of an ObsPy filter.
Based on ObsPy 1.4.1.
Args:
filter_type (str): Type of filter to use. Note that not all of ObsPy's filter
types are supported; see the match statement in the code
sampling_rate (int or float): Sampling rate of target data
freqs (int or array_like): Passed on as ``worN`` argument to
:func:`scipy.signal.freqz_zpk` — if an array, the response will be computed
at these frequencies
plot (bool): Whether to plot the frequency response
**options: Necessary keyword arguments that will be passed on to the respective
filter function (e.g., ``freqmin=1``, ``freqmax=5`` for
``filter_type='bandpass'``)
Returns:
:class:`tuple`: Array of frequencies at which the response was computed [Hz],
frequency response [dB]
"""
df = sampling_rate # Rename so that code below resembles ObsPy more closely
match filter_type:
case 'bandpass':
defaults = _get_defaults_for_filter_func(filter_type)
corners = options.get('corners', defaults['corners'])
zerophase = options.get('zerophase', defaults['zerophase'])
fe = 0.5 * df
low = options['freqmin'] / fe
high = options['freqmax'] / fe
effective_corners = corners * 2 if zerophase else corners
z, p, k = iirfilter(
effective_corners,
[low, high],
btype='band',
ftype='butter',
output='zpk',
)
case 'highpass' | 'lowpass':
defaults = _get_defaults_for_filter_func(filter_type)
corners = options.get('corners', defaults['corners'])
zerophase = options.get('zerophase', defaults['zerophase'])
fe = 0.5 * df
f = options['freq'] / fe
effective_corners = corners * 2 if zerophase else corners
z, p, k = iirfilter(
effective_corners, f, btype=filter_type, ftype='butter', output='zpk'
)
case 'lowpass_cheby_2':
defaults = _get_defaults_for_filter_func(filter_type)
maxorder = options.get('maxorder', defaults['maxorder'])
b, a = lowpass_cheby_2(
None, options['freq'], df, maxorder=maxorder, ba=True
)
z, p, k = tf2zpk(b, a)
case _:
raise NotImplementedError(f'Filter type "{filter_type}" is not supported.')
f, h = freqz_zpk(z, p, k, worN=freqs, fs=df)
# If the user didn't supply specific frequencies, we remove the DC component
if isinstance(freqs, (int, float)):
f = f[1:]
h = h[1:]
h_db = 20 * np.log10(abs(h))
if plot:
fig, ax = plt.subplots(figsize=(4, 4))
ax.semilogx(f, h_db)
match filter_type:
case 'bandpass':
x1, x2 = options['freqmin'], options['freqmax']
ax.scatter([x1, x2], [-3, -3])
case 'highpass':
x1, x2 = options['freq'], f.max()
ax.scatter(x1, -3)
case 'lowpass' | 'lowpass_cheby_2':
x1, x2 = f.min(), options['freq']
if filter_type != 'lowpass_cheby_2':
ax.scatter(x2, -3)
ax.axvspan(x1, x2, color='tab:gray', alpha=0.5, lw=0, zorder=-5)
ax.grid(which='both', ls=':', color='tab:gray')
ax.set_axisbelow(True)
left = f.min() if x1 == f.min() else x1 / 2
right = f.max() if x2 == f.max() else x2 * 2
ax.set_xlim(left, right)
if filter_type == 'lowpass_cheby_2':
ax.set_ylim(-100, 5)
ax.set_yticks([0, -20, -40, -60, -80, -96])
else:
ax.set_ylim(-20, 1)
ax.set_yticks([0, -1, -3, -6, -10, -20])
ax.set_xlabel('Frequency (Hz)')
ax.set_ylabel('Gain (dB)')
ax.format_coord = lambda x, y: Formatter.fix_minus(
f'({_FREQ_TEMPLATE.format(x, 1 / x)}, {y:.1f} dB)'
)
fig.tight_layout()
fig.show()
return f, h_db
def _get_defaults_for_filter_func(filter_type):
"""Generate dictionary of default parameters for an ObsPy filter function."""
func = _get_function_from_entry_point('filter', filter_type)
# https://stackoverflow.com/a/12627202
defaults = {
k: v.default
for k, v in inspect.signature(func).parameters.items()
if v.default is not inspect.Parameter.empty
}
return defaults
def _format_power_label(db_ref_val, waveform_units):
"""Format the axis / colorbar label for spectral power quantities."""
match waveform_units:
case 'pa':
assert db_ref_val == REFERENCE_INFRASOUND
# Convert Pa to µPa
return rf'Power (dB rel. [{db_ref_val * 1e6:g} μPa]$\mathdefault{{^2}}$ Hz$\mathdefault{{^{{-1}}}}$)'
# All of these use formatting that relies on 1^2 = 1, so we assert that here!
case 'm':
assert db_ref_val == REFERENCE_SEISMIC == 1
return rf'Power (dB rel. {db_ref_val:g} m$\mathdefault{{^2}}$ Hz$\mathdefault{{^{{-1}}}}$)'
case 'm/s':
assert db_ref_val == REFERENCE_SEISMIC == 1
return rf'Power (dB rel. {db_ref_val:g} [m s$\mathdefault{{^{{-1}}}}$]$\mathdefault{{^2}}$ Hz$\mathdefault{{^{{-1}}}}$)'
case 'm/s**2':
assert db_ref_val == REFERENCE_SEISMIC == 1
return rf'Power (dB rel. {db_ref_val:g} [m s$\mathdefault{{^{{-2}}}}$]$\mathdefault{{^2}}$ Hz$\mathdefault{{^{{-1}}}}$)'
case None:
assert db_ref_val is None # Reference value is meaningless w/o units!
return r'Power (dB rel. max. Hz$\mathdefault{{^{{-1}}}}$)'
case _:
raise ValueError(f'Invalid units: {waveform_units}')
def _get_db_reference_value(data_kind):
"""Get the reference value for spectral power quantities in dB."""
match data_kind:
case 'infrasound':
return REFERENCE_INFRASOUND
case 'seismic':
return REFERENCE_SEISMIC
case _:
raise ValueError(f'Unknown data kind: {data_kind}')