Source code for acc.migration

# -*- coding: utf-8 -*-
from obspy.taup import TauPyModel
import pandas as pd
from scipy.interpolate import interp1d
import numpy as np
from obspy.geodetics import gps2dist_azimuth
from acc.stack import pws_stack
from obspy import Stream

import os
import glob
from tqdm import tqdm
from functools import partial
from multiprocessing import Pool
import multiprocessing

from acc.io import _load_json
import logging
from obspy import read

# Standalone version.
# This function is intended to be adapted into a parallel version.
def mig_one_station(stream, model="ak135", earth_radius=6378137.0, depth_range=(0, 300, 1)):
    try:
        # prefer the model that was used to calculate the traveltimes, if stored in the stats
        model = stream[0].stats.model
    except AttributeError:
        model = model
    taup_model = TauPyModel(model=model)

    for tr in stream:
        evdp = tr.stats.event_depth
        evla = tr.stats.event_latitude
        evlo = tr.stats.event_longitude
        stla = tr.stats.station_latitude
        stlo = tr.stats.station_longitude
        phase = tr.stats.phase

        # tr.filter(type="bandpass", freqmin=0.05, freqmax=0.5, zerophase=True)

        arrivals = taup_model.get_ray_paths_geo(source_depth_in_km=evdp,
                                                source_latitude_in_deg=evla,
                                                source_longitude_in_deg=evlo,
                                                receiver_latitude_in_deg=stla,
                                                receiver_longitude_in_deg=stlo,
                                                phase_list=(phase,), resample=True)
        arr = arrivals[0]

        # raypath coordinates. The dataframe contains six columns: ray_p, traveltime,
        # dist in rad, depth, lat and lon.
        df = pd.DataFrame(arr.path)

        # one-way reflection time
        time = tr.times() / 2
        data = tr.data

        # ray path information: traveltime, depth and coordinates.
        # reverse time: the maximum traveltime minus the traveltime at each depth or point.
        df["time_reverse"] = df["time"].iloc[-1] - df["time"]

        # keep the part of the ray path whose reverse time is at most 1.2 times the maximum
        # one-way reflection time from acc; the margin eases the interpolation below
        df2 = df[df["time_reverse"] < time[-1] * 1.2]

        # convert the raypath information from dataframe to numpy arrays
        ttime = df["time_reverse"].to_numpy()
        depth = df["depth"].to_numpy()
        lat = df["lat"].to_numpy()
        lon = df["lon"].to_numpy()

        # simple migration by back projection
        fdepth = interp1d(ttime, depth)
        flat = interp1d(ttime, lat)
        flon = interp1d(ttime, lon)

        # interpolate onto the one-way reflection times to accomplish the back projection
        depths = fdepth(time)
        lats = flat(time)
        lons = flon(time)

        # calculate the distances from the pierce points to the station at different depths,
        # i.e. at varied radius
        dists = np.zeros_like(depths)
        for i, d in enumerate(depths):
            dists[i], az, baz = gps2dist_azimuth(lat1=tr.stats.station_latitude,
                                                 lon1=tr.stats.station_longitude,
                                                 lat2=lats[i], lon2=lons[i],
                                                 a=earth_radius - d * 1000.0)  # depth (km) converted to m
            # if the pierce point lies to the south of the station, the distance is negative
            if lats[i] <= tr.stats.station_latitude:
                dists[i] = -dists[i]
        # convert the unit from m to km
        dists /= 1000

        # the following lines can be commented out. They were first written to save data with
        # irregular depth sampling intervals.
        mig1sta = pd.DataFrame(columns=["time", "depth", "dist", "lat", "lon", "data"])
        mig1sta["depth"] = depths
        mig1sta["lat"] = lats
        mig1sta["lon"] = lons
        mig1sta["time"] = time
        tr.normalize()
        mig1sta["data"] = tr.data
        mig1sta["dist"] = dists

        # second interpolation onto a regular depth grid, since the depth sampling is irregular
        # after back projection. The depth range and interval are given by ``depth_range``
        # (default 0-300 km with an interval of 1 km).
        delta = depth_range[2]
        d = np.arange(depth_range[0], depth_range[1] + delta, delta)
        tr.stats.delta = delta
        ftime = interp1d(depths, time)
        fdist = interp1d(depths, dists)
        flat = interp1d(depths, lats)
        flon = interp1d(depths, lons)
        fdata = interp1d(depths, tr.data)

        time = ftime(d)
        dists = fdist(d)
        lats = flat(d)
        lons = flon(d)
        tr.data = fdata(d)
        depths = np.copy(d)

        mig1sta = pd.DataFrame(columns=["time", "depth", "dist", "lat", "lon", "data"])
        mig1sta["depth"] = depths
        mig1sta["lat"] = lats
        mig1sta["lon"] = lons
        mig1sta["time"] = time
        tr.normalize()
        mig1sta["data"] = tr.data
        mig1sta["dist"] = dists

        header = {"path": df2, "mig": mig1sta}
        tr.stats.update(header)

    return stream
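
# A minimal usage sketch (illustrative; the path and station code below are hypothetical):
# mig_one_station expects the trace headers to already carry the event/station metadata it
# reads (event_depth, event_latitude, event_longitude, station_latitude, station_longitude,
# phase), as written by the earlier acc processing steps.
#
#     from obspy import read
#     st = read("output/1_results/XX.STA01/*pkl")
#     st_mig = mig_one_station(st, model="ak135", depth_range=(0, 300, 1))
#     # each trace now carries a "mig" DataFrame (time, depth, dist, lat, lon, data) in its stats
#     print(st_mig[0].stats.mig.head())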

def _mig_1(path, model="ak135"):
    # read the autocorrelograms of a single station saved under `path`
    st = read(path + "/*pkl")
    st_mig = mig_one_station(stream=st, model=model, earth_radius=6378137.0)
    return st_mig

def migration_1station(jsonfile):
    kwargs = _load_json(jsonfile)
    io = kwargs["io"]

    njobs = kwargs["njobs"]
    if njobs > multiprocessing.cpu_count():
        njobs = multiprocessing.cpu_count()

    # data path containing the auto-correlograms, one sub-directory per station
    path = io["outpath"] + "/1_results"
    temp = glob.glob(path + "/*")
    stations = []
    for t in temp:
        if os.path.isdir(t):
            stations.append(t)

    model = kwargs["migration"]["model"]
    if model is None:
        model = kwargs["tt_model"]

    do_work = partial(_mig_1, model=model)

    st_mig_stations = []
    if njobs == 1:
        logging.info('do work sequentially (%d cores)', njobs)
        for sta in tqdm(stations, total=len(stations)):
            st = do_work(sta)
            st_mig_stations.append(st)
    else:
        logging.debug('do work in parallel (%d cores)', njobs)
        pool = multiprocessing.Pool(njobs)
        for st in tqdm(pool.imap_unordered(do_work, stations), total=len(stations)):
            st_mig_stations.append(st)
        pool.close()
        pool.join()

    # save the data to disk
    outpath = io["outpath"] + "/migration_1station"
    try:
        os.makedirs(outpath)
    except OSError:
        pass

    for st in st_mig_stations:
        # station id
        tr = st[0]
        if tr.stats.location == "":
            station_id = ".".join([tr.stats.network, tr.stats.station])
        else:
            station_id = ".".join([tr.stats.network, tr.stats.station, tr.stats.location])
        fn = outpath + "/" + station_id + ".pkl"
        st.write(fn, format="PICKLE")
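
# For reference, a sketch of the JSON parameter fields that migration_1station reads; the layout
# and values shown are assumptions, and the real parameter file may contain further keys used
# elsewhere in acc:
#
#     {
#         "io": {"outpath": "output"},
#         "njobs": 4,
#         "tt_model": "ak135",
#         "migration": {"model": null}
#     }
#
# With "migration" -> "model" set to null (None after loading), the traveltime model falls back
# to "tt_model".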

def migration_one_station_stack(stream, method="PWS", power=2, time_range=[8, 16], coeff=0.5):
    """
    Stacking of migrated traces for one station. Currently not used.

    :param stream: stream of migrated traces of a single station
    :param method: "PWS" for phase-weighted stacking and "linear" for linear stacking
    :param power: used by "PWS" only; power=0 corresponds to linear stacking
    :param time_range: if None, do plain stacking (PWS or linear). If a list of two elements,
        traces that resemble the initial stack within this window are selected for the final
        stacked trace.
    :param coeff: traces with a correlation coefficient higher than this value are selected.
    :return: trace after stacking

    .. Note::

        The extra selection step is meant to improve the signal-to-noise ratio. Check the stats
        header `corrstack`: if it equals zero, the plain stack was returned; otherwise the
        correlation-based stacking was applied and the value gives the number of stacked traces.
    """
    # initial stacking; similar traces are then selected for the final stacking
    tr_stk = pws_stack(stream, power=power, normalize=True)
    h = {"corrstack": 0}
    tr_stk.stats.update(h)

    if time_range is None:
        return tr_stk

    delta = tr_stk.stats.delta
    i1 = int(time_range[0] / delta)
    i2 = int(time_range[1] / delta)
    data1 = np.copy(tr_stk.data[i1:i2])

    traces = []
    for tr in stream:
        data2 = np.copy(tr.data[i1:i2])
        c = np.corrcoef(data1, data2)
        if c[1][0] >= coeff:
            traces.append(tr)

    if len(traces) < 1:
        return tr_stk

    # final stacking of the traces with high coherence
    st2 = Stream(traces=traces)
    tr_stk = pws_stack(st2, power=power, normalize=True)
    h = {"corrstack": len(traces)}
    tr_stk.stats.update(h)
    return tr_stk
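
# A usage sketch for this (currently unused) stacking helper, assuming the migrated station file
# was written by migration_1station; the path is hypothetical:
#
#     from obspy import read
#     st = read("output/migration_1station/XX.STA01.pkl")
#     tr = migration_one_station_stack(st, power=2, time_range=[8, 16], coeff=0.5)
#     # corrstack == 0 means the plain PWS stack was returned; otherwise it gives the number
#     # of traces selected by the correlation criterion
#     print(tr.stats.corrstack)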