import math
import numpy as np
import obspy
from obspy import ObsPyException
from obspy.clients.fdsn import Client as FDSNClient
from obspy.clients.fdsn.client import FDSNException, FDSNNoDataException
from obspy.io.mseed import ObsPyMSEEDError
import seisbench
[docs]
def trace_has_spikes(data, factor=25, quantile=0.975):
"""
Checks for bit flip errors in the data using a simple quantile rule
:param data: Data array
:type data: np.ndarray
:param factor: Maximum allowed factor between peak and quantile
:type factor: float
:param quantile: Quantile to check. Must be between 0 and 1.
:type quantile: float
"""
q = np.quantile(np.abs(data), quantile, axis=1, keepdims=True)
return np.any(data > q * factor)
[docs]
def stream_to_array(stream, component_order):
"""
Converts stream of single station waveforms into a numpy array according to a given component order.
If trace start and end times disagree between component traces, remaining parts are filled with zeros.
Also returns completeness, i.e., the fraction of samples in the output that actually contain data.
Assumes all traces to have the same sampling rate.
:param stream: Stream to convert
:type stream: obspy.Stream
:param component_order: Component order
:type component_order: str
:return: starttime, data, completeness
:rtype: UTCDateTime, np.ndarray, float
"""
starttime = min(trace.stats.starttime for trace in stream)
endtime = max(trace.stats.endtime for trace in stream)
sampling_rate = stream[0].stats.sampling_rate
samples = int((endtime - starttime) * sampling_rate) + 1
completeness = 0.0
data = np.zeros((len(component_order), samples), dtype="float64")
for c_idx, c in enumerate(component_order):
c_stream = stream.select(channel=f"??{c}")
if len(c_stream) > 1:
# If multiple traces are found, issue a warning and write them into the data ordered by their length
seisbench.logger.warning(
f"Found multiple traces for {c_stream[0].id} starting at {stream[0].stats.starttime}. "
f"Completeness will be wrong in case of overlapping traces."
)
c_stream = sorted(c_stream, key=lambda x: x.stats.npts)
c_completeness = 0.0
for trace in c_stream:
start_sample = int((trace.stats.starttime - starttime) * sampling_rate)
tr_length = min(len(trace.data), samples - start_sample)
data[c_idx, start_sample : start_sample + tr_length] = trace.data[
:tr_length
]
c_completeness += tr_length
completeness += min(1.0, c_completeness / samples)
data -= np.mean(data, axis=1, keepdims=True)
completeness /= len(component_order)
return starttime, data, completeness
[docs]
def rotate_stream_to_zne(stream, inventory):
"""
Tries to rotate the stream to ZNE inplace. There are several possible failures, which are silently ignored.
:param stream: Stream to rotate
:type stream: obspy.Stream
:param inventory: Inventory object
:type inventory: obspy.Inventory
"""
try:
stream.rotate("->ZNE", inventory=inventory)
except ValueError:
pass
except NotImplementedError:
pass
except AttributeError:
pass
except ObsPyException:
pass
except Exception:
# Required, because obspy throws a plain Exception for missing channel metadata
pass
[docs]
def fdsn_get_bulk_safe(client: FDSNClient, bulk: list[tuple]) -> obspy.Stream:
"""
A wrapper around obspy's get_waveforms_bulk that does error handling
and tries to download as much data as possible.
:param client: An obspy FDSN client
:param bulk: A bulk request as for get_waveforms_bulk
"""
return _fdsn_get_bulk_safe(client, bulk, failed=False)
def _fdsn_get_bulk_safe(
client: FDSNClient, bulk: list[tuple], failed: bool = False
) -> obspy.Stream:
"""
Internal wrapper to hide the failed parameter from the user.
"""
if len(bulk) == 0 or (failed and len(bulk) == 1):
return obspy.Stream()
new_failed = False
if len(bulk) == 1:
new_failed = True
try:
return client.get_waveforms_bulk(bulk)
except FDSNNoDataException:
return obspy.Stream()
except (FDSNException, ObsPyMSEEDError, Exception):
# Send off requests for each half to isolate the error
m = len(bulk) // 2
return _fdsn_get_bulk_safe(client, bulk[:m], new_failed) + _fdsn_get_bulk_safe(
client, bulk[m:], new_failed
)
def _round_py2(x: float) -> float:
if x > 0:
return float(math.floor((x) + 0.5))
else:
return float(math.ceil((x) - 0.5))
[docs]
def stream_slice(
stream: obspy.Stream,
starttime: obspy.UTCDateTime,
endtime: obspy.UTCDateTime,
) -> obspy.Stream:
"""Slice an ObsPy Stream object between starttime and endtime.
This is a lightweight implementation of `obspy.Stream.slice` that ensures with
default paramters `Stream.slice(..., keep_empty_traces=False, nearest_sample=True)`.
The performance gain of ~7x is achieved by avoiding deepcopies of traces and
combining `Trace._ltrim` and `Trace._rtrim` into a single operation.
Difference for edge cases compared to `Stream.slice` is that times are rounded
with Python3 rounding (round half to even) instead of ObsPy's custom rounding
(round half away from zero). This can lead to differences of 1 sample in edge
cases.
:param stream: The input ObsPy Stream.
:type stream: obspy.Stream
:param starttime: The start time for slicing.
:type starttime: obspy.UTCDateTime
:param endtime: The end time for slicing.
:type endtime: obspy.UTCDateTime
:return: The sliced ObsPy Stream.
:rtype: obspy.Stream
"""
if len(stream) == 0:
raise ValueError("Stream is empty and cannot be sliced.")
if starttime >= endtime:
raise ValueError("starttime must be earlier than endtime.")
first = stream[0].stats
# select start/end time fitting to a sample point on the first trace
delta = _round_py2((starttime - first.starttime) * first.sampling_rate)
starttime = first.starttime + delta * first.delta
delta = _round_py2((endtime - first.endtime) * first.sampling_rate)
endtime = first.endtime + delta * first.delta # delta is negative!
out = obspy.Stream()
for tr in stream:
stats = tr.stats
sampling_rate = stats.sampling_rate
samples_start = int(_round_py2((starttime - stats.starttime) * sampling_rate))
samples_start = max(samples_start, 0)
if samples_start >= stats.npts - 1:
continue
slice_starttime = stats.starttime + samples_start * stats.delta
samples_end = (
int(_round_py2((endtime - slice_starttime) * sampling_rate))
+ samples_start
+ 1
)
samples_end = min(samples_end, stats.npts)
n_samples = samples_end - samples_start
if n_samples <= 0:
continue
sliced_trace = obspy.Trace(
tr.data[samples_start:samples_end],
{
"network": stats.network,
"station": stats.station,
"location": stats.location,
"channel": stats.channel,
"starttime": slice_starttime,
"sampling_rate": sampling_rate,
"t_offset": stats.get("t_offset", 0.0),
},
)
out.append(sliced_trace)
return out