Source code for seisbench.models.depthphase

import pickle
from collections import defaultdict
from pathlib import Path
from typing import Any, Optional

import numpy as np
import obspy
import scipy.stats
import torch
from obspy import UTCDateTime
from obspy.clients.fdsn import Client
from obspy.geodetics import locations2degrees
from obspy.taup import TauPyModel
from tqdm import tqdm

import seisbench
import seisbench.util as sbu

from .base import WaveformModel
from .phasenet import PhaseNet
from .team import PhaseTEAM


[docs] class DepthPhaseModel: """ Helper class implementing all tools for determining depth from depth phases :param time_before: Time included before the P pick in seconds :param depth_levels: Array of depth levels to search for :param tt_args: Arguments for the :py:class:`TTLookup` :param qc_std: Maximum standard deviation to pass quality control. If None, no quality control is applied. :param qc_depth: Quality control is only applied to predictions shallower than this depth. If None, quality control is applied to all depth levels. """ def __init__( self, time_before: float = 12.5, depth_levels: Optional[np.ndarray] = None, tt_args: Optional[dict[str, Any]] = None, qc_std: Optional[float] = None, qc_depth: Optional[float] = None, ) -> None: self.time_before = time_before if tt_args is None: tt_args = {} self._ttlookup = TTLookup(**tt_args) self.qc_std = qc_std self.qc_depth = qc_depth if depth_levels is None: self.depth_levels = np.linspace(0, 650, 651) else: self.depth_levels = depth_levels def _prepare_classify_args( self, p_picks: dict[str, UTCDateTime], distances: Optional[dict[str, float]], inventory: Optional[obspy.Inventory], epicenter: Optional[tuple[float, float]], ) -> tuple[dict[str, UTCDateTime], dict[str, float]]: distances = self._validate_distances(distances, epicenter, inventory) if distances is None: distances = self._calculate_distances(inventory, epicenter) p_picks = self._validate_p_picks(p_picks, distances) return p_picks, distances @staticmethod def _validate_distances(distances, epicenter, inventory): if distances is None and (inventory is None or epicenter is None): raise ValueError( "Either distances of inventory and epicenter need to be defined." ) elif distances is not None and not (inventory is None or epicenter is None): seisbench.logger.warning( "Distances and station/event positions are provided. " "Will ignore station/event positions." ) if distances is not None: distances = { DepthPhaseModel._homogenize_station_name(k): v for k, v in distances.items() } return distances @staticmethod def _validate_p_picks(p_picks, distances): p_picks = { DepthPhaseModel._homogenize_station_name(k): v for k, v in p_picks.items() } del_keys = [] for key in p_picks: if key not in distances: seisbench.logger.warning( f"No distance for '{key}'. Trace will be ignored" ) del_keys.append(key) elif not (15 < distances[key] < 100): seisbench.logger.debug( f"Station '{key}' at distance outside 15 to 100 degrees will be ignored." ) del_keys.append(key) for key in del_keys: del p_picks[key] return p_picks @staticmethod def _calculate_distances( inventory: obspy.Inventory, epicenter: tuple[float, float] ) -> dict[str, float]: distances = {} for net in inventory: for sta in net: if len(sta) == 0: # No channel details given, assume empty location code code = f"{net.code}.{sta.code}." distances[code] = locations2degrees( sta.latitude, sta.longitude, *epicenter ) continue for channel in sta: code = f"{net.code}.{sta.code}.{channel.location_code}" if code in distances: continue distances[code] = locations2degrees( channel.latitude, channel.longitude, *epicenter ) return distances @staticmethod def _homogenize_station_name(trace_id: str) -> str: """ Truncates station name to `NET.STA.LOC` """ if trace_id.count(".") == 3: # Channel given return trace_id[: trace_id.rfind(".")] elif trace_id.count(".") == 2: # Correct format return trace_id elif trace_id.count(".") < 2: # Add missing parts return trace_id + "." * (2 - trace_id.count(".")) else: raise ValueError(f"Could not parse trace id '{trace_id}'") def _line_search_depth( self, annotations: obspy.Stream, distances: dict[str, float], caller_name: str ) -> sbu.ClassifyOutput: annotations = annotations.slice( starttime=UTCDateTime(0) ) # Make sure sample 0 is at the P arrival probabilities = [] for station, station_annotations in self._group_traces(annotations).items(): probabilities.append( self._backproject_single_station( station_annotations, distances[station] ) ) if len(probabilities) == 0: sbu.ClassifyOutput( caller_name, depth=np.nan, depth_levels=self.depth_levels, probabilities=np.nan * self.depth_levels, avg_probabilities=np.nan * self.depth_levels, ) probabilities = np.stack(probabilities, axis=0) avg_probabilities = scipy.stats.mstats.gmean( probabilities, nan_policy="omit", axis=0 ) depth = self.depth_levels[np.argmax(avg_probabilities)] depth = self._qc_prediction(avg_probabilities, depth) return sbu.ClassifyOutput( caller_name, depth=depth, depth_levels=self.depth_levels, probabilities=probabilities, avg_probabilities=avg_probabilities, ) def _qc_prediction(self, prob: np.ndarray, depth: float) -> float: normed_prob = self._norm_label(prob, eps=1e-12) mean = np.sum(normed_prob * self.depth_levels) var = np.sum(normed_prob * ((self.depth_levels - mean) ** 2)) std = np.sqrt(var) if self.qc_std is not None: if std > self.qc_std and (self.qc_depth is None or depth < self.qc_depth): seisbench.logger.warning( f"Standard deviation ({std:.1f} km) above quality control " f"limit ({self.qc_std:.1f} km). Returning NaN. You can increase " f"qc_std to get a depth estimate nonetheless, but the result is " f"likely unreliable." ) return np.nan return depth def _backproject_single_station( self, station_annotations: obspy.Stream, dist: float, q_min: float = 0.5, truncate: int = 100, ): """ Backproject single station :param q_min: Quantile to use as lower cutoff for stability :param truncate: Number of samples truncated at the end for stability """ prob = np.ones_like(self.depth_levels) has_phases = np.zeros( len(self.depth_levels), dtype=bool ) # Log where at least one phase value was available for i, depth in enumerate(self.depth_levels): arrivals = self._ttlookup.get_traveltimes(dist, depth) for phase in ["pP", "sP"]: j = self._ttlookup.phases.index(phase) trace = station_annotations.select(channel=f"*_{phase}")[0] y_trace = trace.data[:-truncate] y_trace = self._smooth_curve(y_trace) y_trace = self._norm_label(y_trace) if not np.isnan(arrivals[j]): sample = int(arrivals[j] * trace.stats.sampling_rate) if sample < y_trace.shape[0]: prob[i] *= max(y_trace[sample], np.quantile(y_trace, q_min)) has_phases[i] = True else: prob[i] *= np.quantile(y_trace, q_min) else: prob[i] *= np.quantile(y_trace, q_min) prob[~has_phases] = np.nan # Set all values without any phase to nan return prob @staticmethod def _norm_label(y: np.ndarray, eps: float = 1e-5) -> np.ndarray: """ Norm label trace to sum to 1. """ y = y + eps / y.shape[-1] return y / np.sum(y, axis=-1, keepdims=True) @staticmethod def _smooth_curve(y: np.ndarray, smoothing: float = 10) -> np.ndarray: """ Smooth curve with Gaussian kernel """ if smoothing == 0: return y kernel = np.arange(-7 * smoothing, 7 * smoothing + 1, 1) kernel = np.exp(-0.5 * (kernel / (2 * smoothing)) ** 2) kernel /= np.sum(kernel) if y.ndim == 1: return np.convolve(y, kernel, "same") else: y_new = np.zeros_like(y) for i in range(y.shape[0]): y_new[i] = np.convolve(y[i], kernel, "same") return y_new @staticmethod def _group_traces(annotations: obspy.Stream) -> dict[str, obspy.Stream]: grouping = defaultdict(obspy.Stream) for trace in annotations: key = f"{trace.stats.network}.{trace.stats.station}.{trace.stats.location}" grouping[key].append(trace) return grouping def _rebase_streams_for_picks( self, stream: obspy.Stream, p_picks: dict[str, UTCDateTime], in_samples: int ) -> obspy.Stream: """ Cuts appropriate segments from the stream. Rebases each trace to have the pick at 0. """ selected_stream = obspy.Stream() for trace_id, pick in p_picks.items(): for trace in stream.select(*trace_id.split(".")): trace = trace.slice(starttime=pick - self.time_before).copy() trace.data = trace.data[:in_samples] if ( abs(trace.stats.starttime - pick + self.time_before) > trace.stats.delta ) or trace.stats.npts != in_samples: # Trace does not start at correct time or is too short continue trace.stats.starttime -= pick # trace.stats.starttime = UTCDateTime(0) - self.time_before selected_stream.append(trace) return selected_stream
class TTLookup: def __init__( self, dists: np.ndarray = np.linspace(1, 100, 50), # In degrees depths: np.ndarray = np.linspace(5, 660, 50), # In kilometers model: str = "iasp91", phases: tuple[str] = ("P", "pP", "sP"), ): assert self._linspaced(dists), "TTLookup requires linearly spaced distances" assert self._linspaced(depths), "TTLookup requires linearly spaced depths" self.model = model self.dists = dists self.depths = depths self.phases = phases self._grid = None cache = seisbench.cache_aux_root / "ttlookup" / f"{model}.pkl" write_cache = False if cache.is_file(): with open(cache, "rb") as f: dists, depths, phases, grid = pickle.load(f) if not ( dists.shape == self.dists.shape and depths.shape == self.depths.shape and np.allclose(dists, self.dists) and np.allclose(depths, self.depths) and phases == self.phases ): seisbench.logger.warning("Traveltime cache invalid. Recalculating.") write_cache = True else: self._grid = grid else: write_cache = True if write_cache: self._precalculate() cache.parent.mkdir(parents=True, exist_ok=True) with open(cache, "wb") as f: pickle.dump((self.dists, self.depths, self.phases, self._grid), f) def _precalculate(self): seisbench.logger.warning( "Precalculating travel times. This will take a moment. " "Results will be cached for future applications." ) from obspy.taup import TauPyModel model = TauPyModel(model=self.model) self._grid = ( np.zeros((len(self.depths), len(self.dists), len(self.phases))) * np.nan ) for i, depth in enumerate(tqdm(self.depths, desc="Precalculating traveltimes")): for j, dist in enumerate(self.dists): arrivals = model.get_travel_times( source_depth_in_km=depth, distance_in_degree=dist, phase_list=self.phases + ("Pdiff",), ) if len(arrivals) == 0: continue t0 = arrivals[0].time if arrivals[0].phase.name not in ["P", "Pdiff"]: t0 = np.nan for k, phase in enumerate(self.phases): for arrival in arrivals: if arrival.phase.name == phase: self._grid[i, j, k] = arrival.time - t0 break def get_traveltimes(self, dist: float, depth: float): frac_dist = (dist - self.dists[0]) / (self.dists[1] - self.dists[0]) frac_depth = (depth - self.depths[0]) / (self.depths[1] - self.depths[0]) idx_dist = int(frac_dist) idx_depth = int(frac_depth) alpha_dist = frac_dist - idx_dist alpha_depth = frac_depth - idx_depth tt = ( self._grid[idx_depth, idx_dist] * (1 - alpha_depth) * (1 - alpha_dist) + self._grid[idx_depth + 1, idx_dist] * alpha_depth * (1 - alpha_dist) + self._grid[idx_depth, idx_dist + 1] * (1 - alpha_depth) * alpha_dist + self._grid[idx_depth + 1, idx_dist + 1] * alpha_depth * alpha_dist ) return tt @staticmethod def _linspaced(x: np.ndarray): return np.allclose(x[1] - x[0], x[1:] - x[:-1])
[docs] class DepthFinder: """ This class is a high-level interface to the depth phase models. It determines event depth at teleseismic distances based on a preliminary location. In contrast to the depth phase models, it is not provided with waveforms, but automatically downloads data through FDSN. Furthermore, it automatically determines first P arrivals using predicted travel times and a deep learning picker. The processing consists of several steps: - determine available station at the time of the event - predict P arrivals - download waveforms through FDSN - repick P arrivals with a deep learning model - determine depth with deep learning based depth model If waveforms and P wave picks are already available, it is highly recommended to directly use the underlying depth phase model instead of this helper. .. code-block:: python :caption: Example application networks = {"GFZ": ["GE"], "IRIS": ["II", "IU"]} # FDSN providers and networks depth_model = sbm.DepthPhaseTEAM.from_pretrained("original") # A depth phase model phase_model = sbm.PhaseNet.from_pretrained("geofon") # A teleseismic picking model depth_finder = DepthFinder(networks, depth_model, phase_model) :param networks: Dictionary of FDSN providers and seismic network codes to query :param depth_model: The depth phase model to use :param phase_model: The phase picking model to use for pick refinement :param p_window: Seconds around the predicted P arrival to search for actual arrival :param p_threshold: Minimum detection confidence for the primary P phase to include a record """ def __init__( self, networks: dict[str, list[str]], depth_model: DepthPhaseModel, phase_model: WaveformModel, p_window: float = 10, p_threshold: float = 0.15, ): self.networks = networks self.depth_model = depth_model self.phase_model = phase_model self.p_window = p_window self.p_threshold = p_threshold self.cache: Optional[Path] = ( None # If set, cache waveforms at this path and try loading them here ) self.tt_model = TauPyModel(model="iasp91") self._clients = {provider: Client(provider) for provider in networks.keys()} self._network_to_provider = { net: provider for provider, nets in networks.items() for net in nets } self._setup_inventories() def _setup_inventories(self): self._inventories = {} for provider, networks in self.networks.items(): for network in networks: seisbench.logger.debug( f"Querying inventory for {network} from {provider}" ) self._inventories[network] = self._clients[provider].get_stations( network=network, channel="BH?", level="CHANNEL" ) def _get_stations(self, time: UTCDateTime) -> obspy.Inventory: stations = obspy.Inventory() for network, inv in self._inventories.items(): stations += inv.select(time=time) return stations
[docs] def get_depth( self, lat: float, lon: float, depth: float, origin_time: UTCDateTime, ) -> sbu.ClassifyOutput: """ Get the depth of an event based on its preliminary latitude, longitude, depth and origin time. A depth estimate needs to be input, as it is required to predict preliminary P arrivals. This is not a circular reasoning, as depth and origin_time trade off against each other. :param lat: Latitude of the event :param lon: Longitude of the event :param depth: Preliminary depth of the event :param origin_time: Preliminary origin time of the event """ stations = self._get_stations(origin_time) distances = self.depth_model._calculate_distances(stations, (lat, lon)) distances = {key: val for key, val in distances.items() if 15 < val < 100} p_picks_tt = self._get_picks_tt(origin_time, depth, distances) stream = self._get_cache(lat, lon, depth, origin_time) if stream is None: stream = self._get_waveforms(p_picks_tt) self._set_cache(stream, lat, lon, depth, origin_time) p_picks = self._repick_dl(p_picks_tt, stream) seisbench.logger.debug("Calculating depth") classify_outputs = self.depth_model.classify(stream, p_picks, distances) return sbu.ClassifyOutput( self.__class__.__name__, depth=classify_outputs.depth, depth_levels=classify_outputs.depth_levels, probabilities=classify_outputs.probabilities, avg_probabilities=classify_outputs.avg_probabilities, p_picks=p_picks, p_picks_tt=p_picks_tt, distances=distances, stream=stream, )
def _get_picks_tt( self, origin_time: UTCDateTime, depth: float, distances: dict[str, float] ): seisbench.logger.debug("Calculating traveltimes") p_picks = {} for station, dist in distances.items(): # Assume all station are at 0 km elevation. Error is small enough to be fixed by repicker. tt = self._get_traveltime(dist, depth) if not np.isnan(tt): p_picks[station] = origin_time + tt return p_picks def _get_traveltime(self, dist_deg: float, source_depth_km: float) -> float: arrivals = self.tt_model.get_travel_times( source_depth_in_km=source_depth_km, distance_in_degree=dist_deg, phase_list=["p", "P"], ) if len(arrivals) > 0: return arrivals[0].time else: return np.nan def _get_cache( self, lat: float, lon: float, depth: float, origin_time: UTCDateTime ) -> Optional[obspy.Stream]: if self.cache is None: return ev_cache = self.cache / self._get_event_key(lat, lon, depth, origin_time) if ev_cache.is_file(): if ev_cache.stat().st_size == 0: # Empty token file return obspy.Stream() else: return obspy.read(ev_cache) def _set_cache( self, stream: obspy.Stream, lat: float, lon: float, depth: float, origin_time: UTCDateTime, ) -> None: if self.cache is None: return ev_cache = self.cache / self._get_event_key(lat, lon, depth, origin_time) if not ev_cache.is_file(): if len(stream) > 0: stream.write(ev_cache) else: with open(ev_cache, "w"): pass # Create empty token file def _get_event_key( self, lat: float, lon: float, depth: float, origin_time: UTCDateTime ) -> str: return f"{origin_time}__{lat:.3f}__{lon:.3f}__{depth:.2f}.mseed" def _get_waveforms( self, p_picks: dict[str, UTCDateTime], time_before: float = 100, time_after: float = 300, ) -> obspy.Stream: bulks = {provider: [] for provider in self.networks.keys()} for station, pick in p_picks.items(): net, sta, loc = station.split(".") provider = self._network_to_provider[net] bulks[provider].append( (net, sta, loc, "BH?", pick - time_before, pick + time_after) ) stream = obspy.Stream() for provider, bulk in bulks.items(): seisbench.logger.debug(f"Querying {provider}") stream += sbu.fdsn_get_bulk_safe(self._clients[provider], bulk) return stream def _repick_dl( self, p_picks: dict[str, UTCDateTime], stream: obspy.Stream ) -> dict[str, UTCDateTime]: seisbench.logger.debug("Repicking") ann = self.phase_model.annotate(stream).select(channel="*_P") refined_p_picks = {} for station, pick in p_picks.items(): net, sta, loc = station.split(".") station_ann = ann.select(network=net, station=sta, location=loc) station_ann = station_ann.slice(pick - self.p_window, pick + self.p_window) if len(station_ann) != 1: continue station_ann = station_ann[0] if np.max(station_ann.data) < self.p_threshold: continue refined_p_picks[station] = ( station_ann.stats.starttime + station_ann.times()[np.argmax(station_ann.data)] ) return refined_p_picks
[docs] class DepthPhaseNet(PhaseNet, DepthPhaseModel): """ .. document_args:: seisbench.models DepthPhaseNet """ def __init__( self, phases: str = ("P", "pP", "sP"), sampling_rate: float = 20.0, depth_phase_args: Optional[dict] = None, norm="peak", **kwargs, ) -> None: if depth_phase_args is None: depth_phase_args = {} PhaseNet.__init__( self, phases=phases, sampling_rate=sampling_rate, norm=norm, **kwargs, ) DepthPhaseModel.__init__(self, **depth_phase_args) self._citation = ( "Münchmeyer, J., Saul, J. & Tilmann, F. (2023) " "Learning the deep and the shallow: Deep learning " "based depth phase picking and earthquake depth estimation." "Seismological Research Letters (in revision)." )
[docs] def forward(self, x: torch.tensor, logits=False) -> torch.tensor: y = super().forward(x, logits=True) if logits: return y else: return torch.sigmoid(y)
[docs] def annotate( self, stream: obspy.Stream, p_picks: dict[str, UTCDateTime], **kwargs, ): """ Get depth phase probabilities curves for one event. Note that the annotations are aligned to have the P arrival at UTCDateTime(0), i.e., 1970-01-01 00:00:00. The probability curves are not normalized, there absolute value is therefore meaningless. .. warning:: This class does not expose an 'annotate_async` function directly. :param stream: Obspy stream to annotate :param p_picks: Dictionary of P pick times. Station codes will be truncated to `NET.STA.LOC`. :param kwargs: All kwargs are passed to the annotate function of the superclass. """ p_picks = { DepthPhaseModel._homogenize_station_name(k): v for k, v in p_picks.items() } argdict = self.default_args.copy() argdict.update(kwargs) # Ensure all traces are at the right sampling rate and filtering causes no boundary artifacts self.annotate_stream_pre(stream, argdict) selected_stream = self._rebase_streams_for_picks( stream, p_picks, self.in_samples ) return super().annotate(selected_stream, **kwargs)
[docs] def classify( self, stream: obspy.Stream, p_picks: dict[str, UTCDateTime], distances: Optional[dict[str, float]] = None, inventory: Optional[obspy.Inventory] = None, epicenter: Optional[tuple[float, float]] = None, **kwargs, ) -> sbu.ClassifyOutput: """ Calculate depth of an event using depth phase picking and a line search over the depth axis. Can only handle one event at a time. For the line search, the epicentral distances of the stations to the event is required. These can either be provided directly or through an inventory and the event epicenter. .. warning:: This class does not expose an 'classify_async` function directly. :param stream: Obspy stream to classify :param p_picks: Dictionary of P pick times. Station codes will be truncated to `NET.STA.LOC`. :param distances: Dictionary of epicentral distances for the stations in degrees :param inventory: Inventory for the stations :param epicenter: (latitude, longitude) of the event epicenter """ p_picks, distances = self._prepare_classify_args( p_picks, distances, inventory, epicenter ) annotations = self.annotate(stream, p_picks, **kwargs) return self._line_search_depth(annotations, distances, self.name)
[docs] class DepthPhaseTEAM(PhaseTEAM, DepthPhaseModel): """ .. document_args:: seisbench.models DepthPhaseTEAM """ def __init__( self, phases: str = ("P", "pP", "sP"), classes: int = 3, sampling_rate: float = 20.0, depth_phase_args: Optional[dict] = None, norm="peak", **kwargs, ) -> None: if depth_phase_args is None: depth_phase_args = {} PhaseTEAM.__init__( self, phases=phases, classes=classes, sampling_rate=sampling_rate, norm=norm, **kwargs, ) DepthPhaseModel.__init__(self, **depth_phase_args) self._citation = ( "Münchmeyer, J., Saul, J. & Tilmann, F. (2023) " "Learning the deep and the shallow: Deep learning " "based depth phase picking and earthquake depth estimation." "Seismological Research Letters (in revision)." )
[docs] def annotate( self, stream: obspy.Stream, p_picks: dict[str, UTCDateTime], **kwargs, ): """ Get depth phase probabilities curves for one event. Note that the annotations are aligned to have the P arrival at UTCDateTime(0), i.e., 1970-01-01 00:00:00. The probability curves are not normalized, there absolute value is therefore meaningless. :param stream: Obspy stream to annotate :param p_picks: Dictionary of P pick times. Station codes will be truncated to `NET.STA.LOC`. :param kwargs: All kwargs are passed to the annotate function of the superclass. """ p_picks = { DepthPhaseModel._homogenize_station_name(k): v for k, v in p_picks.items() } argdict = self.default_args.copy() argdict.update(kwargs) # Ensure all traces are at the right sampling rate and filtering causes no boundary artifacts self.annotate_stream_pre(stream, argdict) selected_stream = self._rebase_streams_for_picks( stream, p_picks, self.in_samples ) return super().annotate(selected_stream, **kwargs)
[docs] def classify( self, stream: obspy.Stream, p_picks: dict[str, UTCDateTime], distances: Optional[dict[str, float]] = None, inventory: Optional[obspy.Inventory] = None, epicenter: Optional[tuple[float, float]] = None, **kwargs, ) -> sbu.ClassifyOutput: """ Calculate depth of an event using depth phase picking and a line search over the depth axis. Can only handle one event at a time. For the line search, the epicentral distances of the stations to the event is required. These can either be provided directly or through an inventory and the event epicenter. :param stream: Obspy stream to classify :param p_picks: Dictionary of P pick times. Station codes will be truncated to `NET.STA.LOC`. :param distances: Dictionary of epicentral distances for the stations in degrees :param inventory: Inventory for the stations :param epicenter: (latitude, longitude) of the event epicenter """ p_picks, distances = self._prepare_classify_args( p_picks, distances, inventory, epicenter ) annotations = self.annotate(stream, p_picks, **kwargs) return self._line_search_depth(annotations, distances, self.name)