Source code for seisbench.data.neic

import json
import shutil
import tarfile
from collections import defaultdict

import numpy as np

import seisbench
import seisbench.util

from .base import WaveformBenchmarkDataset

# Conversion from earth radius
DEG2KM = 2 * np.pi * 6371 / 360


[docs] class NEIC(WaveformBenchmarkDataset): """ NEIC dataset from Yeck and Patton """ def __init__(self, **kwargs): citation = ( "Yeck, W.L., and Patton, J., 2020, Waveform Data and Metadata used to " "National Earthquake Information Center Deep-Learning Models: " "U.S. Geological Survey data release, https://doi.org/10.5066/P9OHF4WL." ) super().__init__(citation=citation, repository_lookup=True, **kwargs) def _download_dataset(self, writer, cleanup=True, blocksize=2**14): """ Downloads and converts the dataset from the original publication :param writer: WaveformWriter :param cleanup: If true, delete the original and temporary data files after conversion. Defaults to true. :param blocksize: Number of waveform samples to read from disk at once :return: """ seisbench.logger.warning( "Converting this catalog from source will require ~250 GB disk storage. " "The resulting catalog has ~75GB. " "Please ensure that the storage is available on your disk." ) path = self.path path_original = path / "original" path_original.mkdir(parents=True, exist_ok=True) path_meta = path_original / "meta.json" # Download metadata in json format to extract download links for the data seisbench.util.download_http( "https://www.sciencebase.gov/catalog/item/5ed528ff82ce2832f047eee6?format=json", path_meta, progress_bar=False, ) # Load metadata with open(path_meta, "r") as fmeta: meta = json.load(fmeta) # Download data files for f in meta["files"]: # Uses callback_if_uncached only to be able to utilize the cache mechanism. # Concurrent accesses are anyhow already controlled by the callback_if_uncached # call wrapping _download_dataset. # It's therefore considered save to set force=True. def callback_download_original(path): seisbench.util.download_http( f["url"], path, desc=f"Downloading file {f['name']}", ) seisbench.util.callback_if_uncached( path_original / f["name"], callback_download_original, force=True ) # Note: The following lines could also each be wrapped into a callback_if_uncached # However, concatenating and unpacking do not take too long and this way the code is easier. # Concatenate partitioned files seisbench.logger.warning( "Concatenating partitioned tar.gz archives. This might take a few minutes." ) partitioned_files = ["PWF_Test.tar.gz", "PWF_Train.tar.gz", "SWF_Train.tar.gz"] for partitioned_file in partitioned_files: members = sorted( [ x for x in path_original.iterdir() if x.name.startswith(partitioned_file) and not x.name == partitioned_file ] ) with open(path_original / partitioned_file, "wb") as fout: for file in members: with open(file, "rb") as fin: data = fin.read(1000000) # Read 1MB parts while len(data) > 0: fout.write(data) data = fin.read(1000000) # Read 1MB parts # Unpack files seisbench.logger.warning( "Unpacking tar.gz archives. This might take a few minutes." ) path_unpacked = path / "unpacked" path_unpacked.mkdir(parents=True, exist_ok=True) tar_files = [x for x in path_original.iterdir() if x.name.endswith(".tar.gz")] for tar_path in tar_files: with tarfile.open(tar_path, "r:gz") as file: seisbench.util.safe_extract_tar(file, path_unpacked) groups = [("P", "Train"), ("S", "Train"), ("P", "Test"), ("S", "Test")] total_samples = ( np.load(path_unpacked / "PAzi_Train.npy").shape[0] + np.load(path_unpacked / "SAzi_Train.npy").shape[0] + np.load(path_unpacked / "PAzi_Test.npy").shape[0] + np.load(path_unpacked / "SAzi_Test.npy").shape[0] ) # Select 10 percent of the training events for development # As the train test split is random, but event wise, a similar strategy is employed here train_ids = np.concatenate( [ np.load(path_unpacked / "PEID_Train.npy"), np.load(path_unpacked / "SEID_Train.npy"), ] ) train_ids = np.unique(train_ids) dev_ids = set(train_ids[::10]) writer.set_total(total_samples) # TODO: Verify that these are unrestituted counts writer.data_format = { "dimension_order": "CW", "component_order": "ZNE", "measurement": "velocity", "sampling_rate": 40, "unit": "counts", "instrument_response": "not restituted", } eq_counts = defaultdict(lambda: 0) for wavetype, split in groups: # Flush cache after train/dev is complete if wavetype == "P" and split == "Test": writer.flush_hdf5() azimuth = np.load(path_unpacked / f"{wavetype}Azi_{split}.npy") distance = np.load(path_unpacked / f"{wavetype}Dist_{split}.npy") event_id = np.load(path_unpacked / f"{wavetype}EID_{split}.npy") magnitude = np.load(path_unpacked / f"{wavetype}Mag_{split}.npy") p = 0 while p < azimuth.shape[0]: # Recreate memmap each epoch to avoid memory "leak" # For details see # https://stackoverflow.com/questions/45132940/numpy-memmap-memory-usage-want-to-iterate-once waveforms = np.load( path_unpacked / f"{wavetype}WF_{split}.npy", mmap_mode="r" ) block_azimuth = azimuth[p : p + blocksize] block_distance = distance[p : p + blocksize] block_event_id = event_id[p : p + blocksize] block_magnitude = magnitude[p : p + blocksize] block_waveforms = waveforms[ p : p + blocksize ].copy() # Copy causes the load into memory for azi, dist, eid, mag, wf in zip( block_azimuth, block_distance, block_event_id, block_magnitude, block_waveforms, ): trace_station_id = eq_counts[eid] eq_counts[eid] += 1 trace_name = f"{eid}_st{trace_station_id}" trace_split = split.lower() if eid in dev_ids: trace_split = "dev" metadata = { "trace_name": trace_name, "trace_category": "earthquake", f"trace_{wavetype.lower()}_arrival_sample": 1200, f"trace_{wavetype.lower()}_status": "manual", "source_magnitude": mag, "source_id": eid, "path_ep_distance_km": dist * DEG2KM, "path_back_azimuth_deg": azi, "split": trace_split, } writer.add_trace(metadata, wf) p += blocksize if cleanup: seisbench.logger.warning( "Cleaning up source and temporary files. This might take a few minutes." ) shutil.rmtree(path_unpacked) shutil.rmtree(path_original)
[docs] class MLAAPDE(WaveformBenchmarkDataset): """ MLAAPDE dataset from Cole et al. (2023) Note that the SeisBench version is not identical to the precompiled version distributed directly through USGS but uses a different data selection. In addition, custom versions of MLAAPDE can be compiled with the software provided by the original authors. These datasets can be exported in SeisBench format. """ def __init__(self, **kwargs): citation = ( "Cole, H. M., Yeck, W. L., & Benz, H. M. (2023). " "MLAAPDE: A Machine Learning Dataset for Determining " "Global Earthquake Source Parameters. " "Seismological Research Letters, 94(5), 2489-2499. " "https://doi.org/10.1785/0220230021" "\n\n" "Cole H. M. and W. L. Yeck, 2022, " "Global Earthquake Machine Learning Dataset: " "Machine Learning Asset Aggregation of the PDE (MLAAPDE): " "U.S. Geological Survey data release, doi:10.5066/P96FABIB" ) license = "MLAAPDE code under CC0 1.0 Universal, data licenses dependent on the underlying networks" super().__init__( citation=citation, license=license, repository_lookup=True, **kwargs ) def _download_dataset(self, writer, **kwargs): pass
[docs] @staticmethod def available_chunks(*args, **kwargs): return [ "_201307", "_201308", "_201309", "_201310", "_201311", "_201312", "_201401", "_201402", "_201403", "_201404", "_201405", "_201406", "_201407", "_201408", "_201409", "_201410", "_201411", "_201412", "_201501", "_201502", "_201503", "_201504", "_201505", "_201506", "_201507", "_201508", "_201509", "_201510", "_201511", "_201512", "_201601", "_201602", "_201603", "_201604", "_201605", "_201606", "_201607", "_201608", "_201609", "_201610", "_201611", "_201612", "_201701", "_201702", "_201703", "_201704", "_201705", "_201706", "_201707", "_201708", "_201709", "_201710", "_201711", "_201712", "_201901", "_201902", "_201903", "_201904", "_201905", "_201906", "_201907", "_201908", "_201909", "_201910", "_201911", "_201912", "_202001", "_202002", "_202003", "_202004", "_202005", "_202006", "_202007", "_202008", "_202009", "_202010", "_202011", "_202012", "_202101", "_202102", "_202103", "_202104", "_202105", "_202106", "_202107", "_202108", "_202109", "_202110", "_202111", "_202112", "_201801", "_201802", "_201803", "_201804", "_201805", "_201806", "_201807", "_201808", "_201809", "_201811", "_201810", "_201812", "_202201", "_202202", "_202203", "_202204", ]
def _write_chunk_file(self): """ Write out the chunk file :return: None """ chunks_path = self.path / "chunks" if chunks_path.is_file(): return chunks = self.available_chunks() chunks_str = "\n".join(chunks) + "\n" self.path.mkdir(exist_ok=True, parents=True) with open(chunks_path, "w") as f: f.write(chunks_str)