Source code for acc.core

# -*- coding: utf-8 -*-
# Copyright 2019- Weijia Sun, MIT license
"""Preprocessing and correlation"""
import glob
# from functools import partial
# import itertools
import logging
# import multiprocessing

import numpy as np

from scipy.signal import freqz, iirfilter
from scipy.fftpack import fft, ifft, fftshift, ifftshift, next_fast_len

from acc.util import smooth as smooth_func
from acc.util import IterMultipleComponents

from obspy import Stream
from obspy import read_inventory
from obspy.signal.rotate import rotate2zne

log = logging.getLogger('acc.core.log')


def spectral_whitening(tr, smooth=None, filter=None, waterlevel=1e-8,
                       corners=2, zerophase=True):
    """
    Apply spectral whitening to data.

    Data is divided by its smoothed amplitude spectrum (no smoothing by
    default).

    :param tr: trace to manipulate
    :param smooth: length of smoothing window in Hz
        (default None -> no smoothing)
    :param filter: filter spectrum with a bandpass after whitening
        (tuple with min and max frequency) (default None -> no filter)
    :param waterlevel: waterlevel relative to mean of spectrum
    :param corners: corners passed to the bandpass filter
    :param zerophase: zerophase passed to the bandpass filter
    :return: trace with whitened data
    """
    sr = tr.stats.sampling_rate
    data = np.copy(tr.data)
    # data = _fill_array(data, fill_value=0)
    # mask = np.ma.getmask(data)

    # transform to the frequency domain
    nfft = next_fast_len(len(data))
    spec = fft(data, nfft)

    # amplitude spectrum
    spec_ampl = np.abs(spec)

    # normalization
    spec_ampl /= np.max(spec_ampl)
    spec_ampl_raw = np.copy(spec_ampl)

    # smooth
    if smooth:
        smooth = int(smooth * nfft / sr)
        spec_ampl = ifftshift(smooth_func(fftshift(spec_ampl), smooth))
        spec_ampl_smth = np.copy(spec_ampl)

    # safeguard against division by 0
    spec_ampl[spec_ampl < waterlevel] = waterlevel

    # make the spectrum have an equivalent amplitude before/after smoothing
    if smooth:
        scale = np.max(spec_ampl_raw) / np.max(spec_ampl_smth)
        spec /= spec_ampl * scale
    else:
        spec /= spec_ampl

    # FFT back to the time domain
    ret = np.real(ifft(spec, nfft)[:len(data)])
    tr.data = ret

    # filter
    if filter is not None:
        tr.filter(type="bandpass", freqmin=filter[0], freqmax=filter[1],
                  corners=corners, zerophase=zerophase)

    return tr
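
# Hedged usage sketch for spectral_whitening (illustration only, not part of
# the processing chain): the trace comes from obspy's bundled example data and
# the parameter values below are arbitrary assumptions.
#
#   from obspy import read
#   tr = read()[0]                          # any obspy Trace
#   tr = spectral_whitening(tr, smooth=0.5, filter=(0.1, 2.0))
#   # tr.data now holds the whitened, band-passed samples
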
def _fill_array(data, mask=None, fill_value=None):
    """
    Mask numpy array and/or fill array value without demasking.

    Additionally set fill_value to value.
    If data is not a MaskedArray and mask is None, data is returned silently.

    :param mask: apply mask to array
    :param fill_value: fill value
    """
    if mask is not None and mask is not False:
        data = np.ma.MaskedArray(data, mask=mask, copy=False)
    if np.ma.is_masked(data) and fill_value is not None:
        data._data[data.mask] = fill_value
        np.ma.set_fill_value(data, fill_value)
    # elif not np.ma.is_masked(data):
    #     data = np.ma.filled(data)
    return data
def time_norm(tr, method, time_length=5, filter=(0.01, 1), corners=2,
              zerophase=True, waterlevel=1.0e-8):
    """
    Calculate normalized data, see e.g. Bensen et al. (2007, GJI).

    :param tr: Trace to manipulate
    :param str method:
        1bit: reduce data to +1 if >0 and -1 if <0\n
        run_abs_mean: running absolute mean normalization
    :param float time_length: time length to be smoothed, default 5 sec
    :param tuple filter: bandpass frequency band, default (0.01, 1) Hz
    :param int corners: filter corners, default 2
    :param bool zerophase: default True
    :param float waterlevel: waterlevel to safeguard division
    :return: trace with normalized data
    """
    data = tr.data
    data = _fill_array(data, fill_value=0)
    mask = np.ma.getmask(data)

    if method == '1bit':
        np.sign(data, out=data)
    elif method == 'run_abs_mean':
        data = _run_abs_mean(tr, time_length=time_length, filter=filter,
                             corners=corners, zerophase=zerophase,
                             waterlevel=waterlevel)
    else:
        msg = 'The method passed to time_norm is not known: %s.' % method
        raise ValueError(msg)

    tr.data = _fill_array(data, mask=mask, fill_value=0)

    return tr
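
# Hedged usage sketch for time_norm (illustration only): both method strings
# are the ones accepted above; the trace is obspy's bundled example data.
#
#   from obspy import read
#   tr = read()[0]
#   tr_onebit = time_norm(tr.copy(), method='1bit')
#   tr_ram = time_norm(tr.copy(), method='run_abs_mean', time_length=5,
#                      filter=(0.01, 1))
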
def _run_abs_mean(tr, time_length=5, filter=(0.01, 1), corners=4,
                  zerophase=True, waterlevel=1e-8):
    """
    Running absolute mean normalization.

    :param tr: trace to be normalized
    :param float time_length: time length to be smoothed, default 5 sec
    :param tuple filter: bandpass frequency band, default (0.01, 1) Hz
    :param int corners: filter corners, default 4
    :param bool zerophase: default True
    :param float waterlevel: waterlevel to safeguard division
    :return: data after temporal normalization
    """
    tr2 = tr.copy()
    tr2.filter(type="bandpass", freqmin=filter[0], freqmax=filter[1],
               corners=corners, zerophase=zerophase)
    data = np.abs(tr2.data)
    nl = int(time_length / tr2.stats.delta)
    smth_data = smooth_func(x=data, window_len=nl, window='flat',
                            method='zeros')

    # safeguard against division by 0
    smth_data[smth_data < waterlevel] = waterlevel

    data = np.copy(tr.data)
    data /= smth_data

    return data
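
# Informally, the running-absolute-mean normalization above computes, with
# N = time_length / delta samples (window centering as implemented in
# smooth_func),
#   w[i] = mean(|x_filt[i - N/2 : i + N/2]|)
#   x_norm[i] = x[i] / max(w[i], waterlevel)
# so energetic transients such as earthquakes are down-weighted relative to
# the ambient noise.
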
def rotation(stream, method="NE->RT", acc_type="event"):
    """
    Rotation.

    :param stream: obspy stream to be rotated
    :param method: "NE->RT" or "ZNE->LQT"
    :param acc_type: "event" or "noise"
    :return stream: obspy stream after rotation

    .. note::

        The keywords `component_azimuth` and `component_inclination` must be
        given in the stats. Currently, only acc_type="event" is well debugged.
        The input 3-component data should cover roughly the same period or
        time window.
    """
    rot_stream = Stream()

    # function to extract a 3-component stream.
    # if the data are event data, use the onset as the keyword to find the
    # 3-component data
    if acc_type == "event":
        key = "onset"
    elif acc_type == "noise":
        key = "starttime"

    def iter3c(stream):
        return IterMultipleComponents(stream, key=key,
                                      number_components=(2, 3))

    for st3c in iter3c(stream):
        tmin, tmax = _find_start_end_time(stream=st3c)
        if tmin > tmax:
            continue
        st3c.trim(starttime=tmin, endtime=tmax)

        # handling the borehole components 1, 2, Z or 1, 2, 3.
        comp = []
        for tr in st3c:
            comp.append(tr.stats.channel[-1])
        comp.sort()
        cc = "".join(comp)
        if cc == "12Z":
            cc = "Z12"

        # rotate2zne
        if cc in ["123", "Z12"]:
            zne = rotate2zne(
                st3c[0].data, st3c[0].stats.component_azimuth,
                st3c[0].stats.component_inclination,
                st3c[1].data, st3c[1].stats.component_azimuth,
                st3c[1].stats.component_inclination,
                st3c[2].data, st3c[2].stats.component_azimuth,
                st3c[2].stats.component_inclination)
            for tr, new_data, component in zip(st3c, zne, "ZNE"):
                tr.data = new_data
                tr.stats.channel = tr.stats.channel[:-1] + component

        # rotate to the requested coordinate system
        st3c.rotate(method=method)
        rot_stream += st3c

    return rot_stream
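
# Hedged usage sketch for rotation (illustration only): `st` is assumed to be
# a stream whose traces carry component_azimuth and component_inclination in
# their stats (plus back_azimuth/onset for the "event" case, as required by
# obspy's Stream.rotate).
#
#   st_rt = rotation(st, method="NE->RT", acc_type="event")
#   st_lqt = rotation(st, method="ZNE->LQT", acc_type="event")
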
def _find_start_end_time(stream):
    """
    Find the overlapping start and end time of a specific stream.

    :param stream: obspy stream
    :return starttime, endtime: latest start time and earliest end time
    """
    starttime = []
    endtime = []
    for tr in stream:
        starttime.append(tr.stats.starttime)
        endtime.append(tr.stats.endtime)
    return max(starttime), min(endtime)
def remove_response(trace, resp_path, pre_filt=(0.01, 0.02, 8, 9),
                    output="VEL", format="XML"):
    """
    Remove instrumental response.

    :param trace: obspy trace
    :param resp_path: directory containing the response files
    :param pre_filt: pre-filter corners passed to obspy
    :param output: "DISP", "VEL" or "ACC"
    :param format: "XML", "DATALESS" or "RESP"
    :return: trace with the response removed

    .. note::

        Currently only the XML, DATALESS and RESP formats are supported.
    """
    station_id = ".".join([trace.stats.network, trace.stats.station])
    trace_id = trace.id
    tr = trace.copy()

    if format.upper() in ["XML", "DATALESS", "RESP"]:
        if format.upper() == "RESP":
            resp_file = glob.glob(resp_path + "/*%s*" % trace_id)
        else:
            resp_file = glob.glob(resp_path + "/*%s*" % station_id)
        if len(resp_file) < 1:
            logging.error("Cannot find response file: %s", station_id)
        inv = read_inventory(resp_file[0])
        tr.remove_response(inventory=inv, pre_filt=pre_filt, output=output)
    elif format.upper() == "SACPZ":
        # from obspy.io.sac.sacpz import attach_paz
        # attach_paz(tr, paz_file="SACPZ.AU.WRKA.--.BHZ")
        # https://docs.obspy.org/packages/obspy.io.sac.sacpz.attach_paz.html#obspy.io.sac.sacpz.attach_paz
        logging.warning("The SACPZ format is not supported currently.")
    else:
        logging.warning("Other response format not supported: %s", format)

    return tr
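
# Hedged usage sketch for remove_response (illustration only): "responses" is
# an assumed directory holding StationXML files whose names contain
# "<network>.<station>".
#
#   tr_vel = remove_response(tr, resp_path="responses", output="VEL",
#                            format="XML")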