"""
Calculation of sensor response and corner frequencies, with optional plotting.
"""
import matplotlib.pyplot as plt
import numpy as np
import pandas as pd
from matplotlib.ticker import Formatter
from obspy.core.inventory import PolesZerosResponseStage
from saul.spectral.helpers import _FREQ_TEMPLATE
from saul.waveform.units import _VALID_UNIT_OPTIONS
# [Hz] Minimum frequency for response computation (playing it safe here by going much
# lower than the lowest expected corner of 360 s)
_MIN_FREQ = 1 / 500
# [dB] The "CORNER_DB_REF dB point", e.g. "–3 dB point" — determines where to measure
# the corner frequency
_CORNER_DB_REF = -3
# [dB] Tolerance for corner frequency search; if the derived dB value at the corner
# frequency is not within this tolerance of `_CORNER_DB_REF` an error is raised
_DB_TOL = 0.01
def _compute_sensor_response(response, sampling_rate, min_freq):
"""Compute instrument (sensor only!) response using a nicely padded FFT."""
nfft = 2 ** (int(np.ceil(np.log2(sampling_rate / min_freq))) + 8) # TODO: Padding
cpx_response, freqs = response.get_evalresp_response(
t_samp=1 / sampling_rate,
nfft=nfft,
output='DEF',
end_stage=1, # Includes only stage sequence number 1
hide_sensitivity_mismatch_warning=True, # Since we're skipping some stages
)
return cpx_response, freqs
def _compute_db_relative_to_ref(cpx_response, freqs, ref_freq):
"""Compute response in dB relative to sensor sensitivity reference frequency."""
abs_response = np.abs(cpx_response)
abs_response[abs_response == 0] = np.nan # Avoid log10(0)
ref_value = abs_response[np.argmin(np.abs(freqs - ref_freq))]
db_response = 20 * np.log10(abs_response / ref_value)
return db_response
[docs]
def calculate_responses(inventory, sampling_rate=10, plot=False):
"""Calculate sensor responses and corner frequencies from an ObsPy inventory.
Args:
inventory (:class:`~obspy.core.inventory.inventory.Inventory`): ObsPy inventory
object containing station metadata, including response information. This
means the inventory should have been fetched with the ``level='response'``
option!
sampling_rate (int or float): Sampling rate for response computation in Hz. This
must be high enough such that the Nyquist frequency is above the reference
frequency ("stage_gain_frequency") of the sensors in the input inventory.
plot (bool): If True, plot the responses and corner frequencies.
Returns:
:class:`~pandas.DataFrame`: Table of sensor and corner frequency information,
with columns ``network``, ``station``, ``location``, ``channel``,
``sensor_info``, and ``corner_frequency``
"""
# Set up lists to store key info for the DataFrame
networks, stations, locations, channels = [], [], [], []
sensor_infos = []
corner_frequencies = []
# Plot, if requested
if plot:
fig, (ax1, ax2) = plt.subplots(nrows=2, sharex=True)
zorder = 10 # Keep track of zorder so we can pair lines with dots
# Iterate over the inventory
print('Calculating responses...')
for network in inventory:
if len(network.stations) == 0:
continue # No stations in this network
for station in network:
if len(station.channels) == 0:
continue # No channels for this station
# Handle multiple location codes for a single station, which implies
# multiple sensors
unique_location_codes = sorted(
set(channel.location_code for channel in station)
)
for location_code in unique_location_codes:
location = station.select(location=location_code)
# Use double dash for empty location codes
location_code_str = '--' if location_code == '' else location_code
# Get nice regex for channel codes
channel_letters = np.array(
[list(channel.code) for channel in location.channels]
)
channel_code = ''
for column in channel_letters.T:
if len(set(column)) == 1: # All the same letter
channel_code += column[0]
else:
channel_code += '?' # Different letters, use '?' wildcard
# Form trace ID
tr_id = (
f'{network.code}.{station.code}.{location_code_str}.{channel_code}'
)
# Gather responses for this location
_responses = [channel.response for channel in location.channels]
# Are all responses present for this location?
empty_responses = [
len(_response.response_stages) == 0 for _response in _responses
]
if any(empty_responses):
continue # Incomplete or fully absent responses for this location
# Check that all sensor responses are identical for this location
_sensor_stages = [
_response.response_stages[0] for _response in _responses
]
if not all(
isinstance(_sensor_stage, PolesZerosResponseStage)
for _sensor_stage in _sensor_stages
):
plt.close(fig) if plot else None
raise NotImplementedError(
'Only PolesZerosResponseStage objects are supported!'
)
for _sensor_stage in _sensor_stages[1:]:
same_poles = _sensor_stage.poles == _sensor_stages[0].poles
same_zeros = _sensor_stage.zeros == _sensor_stages[0].zeros
same_gain = _sensor_stage.stage_gain == _sensor_stages[0].stage_gain
same_frequency = (
_sensor_stage.stage_gain_frequency
== _sensor_stages[0].stage_gain_frequency
)
if not (same_poles and same_zeros and same_gain and same_frequency):
plt.close(fig) if plot else None
raise ValueError(
f'Multiple sensor responses found:\n'
f'{tr_id} | {location.start_date} – {location.end_date}'
)
# Given the above check passed the first channel encountered at this
# location is considered representative of the sensor
channel_sensor = location.channels[0]
# Store some metadata
networks.append(network.code)
stations.append(station.code)
locations.append(location_code_str)
channels.append(channel_code)
# KEY: Sensor information, which can provide clues on response & corners
# (we check the following fields, in order of preference: "model",
# "type", "description")
sensor_info = (
channel_sensor.sensor.model
or channel_sensor.sensor.type
or channel_sensor.sensor.description
)
sensor_infos.append('' if sensor_info is None else sensor_info)
# Check the sensor response stage
sensor_stage = channel_sensor.response.response_stages[0]
input_valid = sensor_stage.input_units.lower() in _VALID_UNIT_OPTIONS
output_valid = sensor_stage.output_units.upper() == 'V'
if not (input_valid and output_valid):
plt.close(fig) if plot else None
raise ValueError(f'Invalid sensor response stage: {tr_id}')
ref_freq = sensor_stage.stage_gain_frequency # [Hz] # TODO: Correct?
nyquist = sampling_rate / 2
if ref_freq > nyquist:
plt.close(fig) if plot else None
raise ValueError(
f'Provided sampling rate Nyquist frequency ({nyquist:g} Hz) '
f'is less than the sensor response frequency ({ref_freq:g} Hz): '
f'{tr_id}\n'
f'💡 Try increasing the provided `sampling_rate`!'
)
# Calculate the response
cpx_response, freqs = _compute_sensor_response(
channel_sensor.response, sampling_rate, _MIN_FREQ
)
db_response = _compute_db_relative_to_ref(cpx_response, freqs, ref_freq)
# Find frequency of corner
mask = freqs <= ref_freq # We only look below the reference frequency
db_response_lower = db_response[mask]
freqs_lower = freqs[mask]
corner_db_ref_idx = np.nanargmin(
np.abs(db_response_lower - _CORNER_DB_REF)
)
corner_db_ref_freq = freqs_lower[corner_db_ref_idx]
corner_db_ref_value = db_response_lower[corner_db_ref_idx]
if abs(_CORNER_DB_REF - corner_db_ref_value) > _DB_TOL:
plt.close(fig) if plot else None
raise ValueError(f'Corner frequency not found within tolerance!')
corner_frequencies.append(corner_db_ref_freq)
# Optional plotting
if plot:
ax1.semilogx(freqs, db_response, zorder=zorder)
ax2.semilogx(
freqs,
np.angle(cpx_response, deg=True),
zorder=zorder,
label=tr_id,
)
ax1.scatter(
corner_db_ref_freq, corner_db_ref_value, zorder=zorder + 1
)
zorder += 2 # Increment zorder so line/point pairs are stacked
print('Done')
# Make DataFrame with results
df = pd.DataFrame(
dict(
network=networks,
station=stations,
location=locations,
channel=channels,
sensor_info=sensor_infos,
corner_frequency=corner_frequencies,
)
)
# Optionally finish the plot
if plot:
if df.empty:
plt.close(fig)
else:
yticks1 = [-20, -10, -6, -3, 0] # [dB]
ax1.set_ylim(yticks1[0], yticks1[-1])
ax1.set_yticks(yticks1)
ax2.set_ylim(-180, 180)
ax2.yaxis.set_major_locator(plt.MultipleLocator(90))
ax2.yaxis.set_minor_locator(plt.MultipleLocator(30))
ax2.set_xlim(_MIN_FREQ, nyquist)
ax1.set_ylabel('Amplitude\n(dB re. val. @ ref. freq.)')
ax2.set_ylabel('Phase (°)')
ax2.set_xlabel('Frequency (Hz)')
for ax in ax1, ax2:
ax.grid(ls=':')
ax.set_axisbelow(True)
legend = fig.legend(draggable=True)
for text in legend.get_texts():
text.set_family('monospace')
# These secondary axes show period
_ax1 = ax1.twiny()
_ax2 = ax2.twiny()
for _ax in _ax1, _ax2:
_ax.set_xscale('log')
_ax.set_xlim(1 / _MIN_FREQ, 1 / nyquist)
_ax1.set_xlabel('Period (s)', labelpad=10)
_ax2.tick_params(labeltop=False)
_ax1.format_coord = lambda x, y: Formatter.fix_minus(
f'({_FREQ_TEMPLATE.format(1 / x, x)}, {y:.1f} dB)'
)
_ax2.format_coord = lambda x, y: Formatter.fix_minus(
f'({_FREQ_TEMPLATE.format(1 / x, x)}, {y:.1f}°)'
)
fig.tight_layout()
fig.show()
# Warn if DataFrame is empty
if df.empty:
print('No responses found in the inventory!')
# Return
return df