# -*- coding: utf-8 -*-
import os
import glob
import commentjson
from acc.io import _load_json, _get_event_id_tr, _get_station_id, _get_noise_id
from acc.core import spectral_whitening, time_norm, rotation, remove_response
from acc.util import iter_time, IterMultipleComponents
from acc.core import _find_start_end_time
from obspy.signal.filter import envelope
from obspy import read, Stream, Trace
from scipy.signal import correlate
import numpy as np
from tqdm import tqdm
from functools import partial
from multiprocessing import Pool
import multiprocessing
import logging
logging.basicConfig(level=logging.NOTSET, format='%(asctime)s %(filename)s[line:%(lineno)d] %(message)s',
datefmt='%Y-%m-%d %H:%M:%S',
filename="acc.processing.log", filemode="a"
)
[docs]def processing_event_Z(jsonfile):
"""
processing vertical component of event data
:param jsonfile:
:return:
"""
# load imported data from files
kwargs = _load_json(jsonfile)
njobs = kwargs["njobs"]
datapath = kwargs["io"]["outpath"] + "/0_raw"
logging.info("This may take a lot of time if dataset is huge.")
# get filenames in Unix style for further processing
files = glob.glob(datapath + "/*/*.pkl")
logging.info("%d files found.", len(files))
# processing includes downsampling, detrend, demean,
# instrument response removal, spectral whitening, temporal normalization
# autocorrelation and filter, then output results.
do_work = partial(_proc_event_Z, **kwargs)
numbers = []
if njobs == 1:
logging.info('do work sequential (%d cores)', njobs)
for file in tqdm(files, total=len(files)):
num = do_work(file)
numbers.append(num)
else:
logging.info('do work parallel (%d cores)', njobs)
pool = multiprocessing.Pool(njobs)
for num in tqdm(pool.imap_unordered(do_work, files), total=len(files)):
numbers.append(num)
pool.close()
pool.join()
logging.info("%d/%d files processed.", sum(numbers), len(files))
[docs]def processing_event_full(jsonfile):
# load imported data from files
kwargs = _load_json(jsonfile)
njobs = kwargs["njobs"]
datapath = kwargs["io"]["outpath"] + "/0_raw"
logging.info("This may take a lot of time and consume internal memory if dataset is huge.")
# get filenames in Unix style for further processing
files = glob.glob(datapath + "/*/*.pkl")
logging.info("%d files found.", len(files))
# read files
st = Stream()
for file in files:
st += read(file)
sampling_rate = kwargs["preprocessing"]["sampling_rate"]
# processing includes downsampling, detrend, demean, a large dataset
st = _simple_proc(st, sampling_rate=sampling_rate, njobs=njobs)
# instrument response removal, spectral whitening, temporal normalization
# autocorrelation and filter, then output results.
_proc_event_full(st, **kwargs)
[docs]def _proc_event_full(st, **kwargs):
"""
processings including
:param st:
:param kwargs:
:return:
"""
# instrument response removal, spectral whitening, temporal normalization
# autocorrelation and filter, then output results.
def iter3c(stream):
# for an event, there is always "onset"
return IterMultipleComponents(stream, key="onset", number_components=(2, 3))
# resp removal, rotation, spectral whitening, temporal normalization
tasks = iter3c(st)
# loop over streams in each stream containing the 3-component traces
do_work = partial(_proc_event_rst, **kwargs)
njobs = kwargs["njobs"]
numbers = []
logging.info("deep processing for full event correlogram.")
print("deep processing for full event correlogram.")
if njobs == 1:
logging.info('do work sequential (%d cores)', njobs)
for task in tqdm(tasks, total=len(tasks)):
num = do_work(task)
numbers.append(num)
else:
logging.info('do work parallel (%d cores)', njobs)
pool = multiprocessing.Pool(njobs)
for num in tqdm(pool.imap_unordered(do_work, tasks), total=len(tasks)):
numbers.append(num)
pool.close()
pool.join()
logging.info("%d/%d files processed.", sum(numbers), len(tasks))
[docs]def _proc_event_rst(st, **kwargs):
"""
processing including rotation, spectral whitening and temporal normalization.
and cross-correlation.
:param st:
:param kwargs:
:return: 1 if processing successfully.
.. Note::
rst is short as Rotation Spectral whitening and Temporal normalization.
"""
# calculation SNR
# trace = Trace()
for tr in st:
if tr.stats.channel[-1] in ["Z", "L"]:
trace = tr.copy()
snr = _cal_event_snr(tr=trace)
snr_threshold = kwargs["data_selection_event"]["snr_threshold"]
if snr < snr_threshold:
return 0
for tr in st:
tr.stats.update({"snr": snr})
# remove instrumental response
if kwargs["rm_resp_on"]:
option_resp = kwargs["preprocessing"]["resp"]
st2 = Stream()
for tr in st:
tr = remove_response(trace=tr, **option_resp)
st2.append(tr)
st = st2.copy()
# st: stream containing 3-comp traces of an event recorded by a station
# Rotation, Spectral whitening, and Temporal normalization
rotation_method = kwargs["preprocessing"]["rotation_method"]
st = _rot(st, method=rotation_method)
# spectral whitening
if kwargs["whiten_on"]:
option_whiten = kwargs["preprocessing"]["whiten"]
st2 = Stream()
for tr in st:
tr = spectral_whitening(tr=tr, **option_whiten)
st2.append(tr)
st = st2.copy()
# temporal normalization
if kwargs["time_norm_on"]:
option_time_norm = kwargs["preprocessing"]["time_norm"]
st2 = Stream()
for tr in st:
tr = time_norm(tr, **option_time_norm)
st2.append(tr)
st = st2.copy()
# correlation
# trace_l = Trace()
# trace_q = Trace()
# trace_t = Trace()
for tr in st:
if tr.stats.channel[-1] in ["Z", "L"]:
trace_l = tr.copy()
elif tr.stats.channel[-1] in ["R", "Q"]:
trace_q = tr.copy()
elif tr.stats.channel[-1] in ["T"]:
trace_t = tr.copy()
else:
logging.warn("unknown trace channel.", tr.stats.channel)
# auto and cross correlation
# auto
options = kwargs["correlate"]
cc_ll = _crosscorrelation(tr1=trace_l, tr2=trace_l, **options)
# cross
options = kwargs["cross_correlate"]
cc_ql = _crosscorrelation(tr1=trace_q, tr2=trace_l, **options)
cc_tl = _crosscorrelation(tr1=trace_t, tr2=trace_l, **options)
trace_l = cc_ll.copy()
trace_q = cc_ql.copy()
trace_t = cc_tl.copy()
# rename channel name
if rotation_method == "NE->RT":
chn = ["ZZ", "RZ", "TZ"]
elif rotation_method == "ZNE->LQT":
chn = ["LL", "QL", 'TL']
trace_l.stats.channel = chn[0]
trace_q.stats.channel = chn[1]
trace_t.stats.channel = chn[2]
# write into disk
for tr in [trace_l, trace_q, trace_t]:
trace_id = tr.id
station_id = _get_station_id(tr)
event_id = _get_event_id_tr(tr)
filename = trace_id + "_" + event_id + ".pkl"
outpath = kwargs["io"]["outpath"] + "/1_results"
filepath = "/".join([outpath, station_id])
# if not os.path.exists(filepath):
try:
os.makedirs(filepath)
except:
pass
# extracting data, to have the same length of output data
tlen = kwargs["correlate"]["window"]
tlen = abs(tlen[1] - tlen[0])
tr.trim(starttime=tr.stats.starttime, endtime=tr.stats.starttime + tlen, fill_value=0, pad=True)
filen = filepath + "/" + filename
force = kwargs["io"]["force"]
if not force and os.path.exists(filen):
pass
else:
tr.write(filen, format="PICKLE")
return 1
[docs]def _rot(st, method="NE->RT"):
"""
rotation.
:param st:
:param method: "NE->RT", "ZNE->LQT".
:return:
.. Note::
the code can handle 'Z12', '123', 'ZNE'. It requires trace header of component_azimuth and component_inclination.
if method == "NE->RT", then the two components rotation will be applied.
But while the components are "123" or "Z12", they are rotated to "ZNE" first.
"""
st3c = st.copy()
# the rotating traces should have the same starttime and endtime
tmin, tmax = _find_start_end_time(stream=st3c)
if tmin > tmax:
return 0
st3c.trim(starttime=tmin, endtime=tmax)
# handling the borehole components 1, 2, Z or 1, 2, 3.
comp = []
for tr in st3c:
comp.append(tr.stats.channel[-1])
comp.sort()
cc = "".join(comp)
if cc == "12Z":
cc = "Z12"
# rotate2zne
if cc in ["123", "Z12"]:
zne = rotate2zne(st3c[0].data, st3c[0].stats.component_azimuth, st3c[0].stats.component_inclination,
st3c[1].data, st3c[1].stats.component_azimuth, st3c[1].stats.component_inclination,
st3c[2].data, st3c[2].stats.component_azimuth, st3c[2].stats.component_inclination
)
for tr, new_data, component in zip(st3c, zne, "ZNE"):
tr.data = new_data
tr.stats.channel = tr.stats.channel[:-1] + component
# rotate to various coordinates
# import matplotlib.pyplot as plt
# print(st3c)
# tr = st3c.select(channel="BHZ")[0]
# fig, ax = plt.subplots(2, 1, sharex=True)
# ax[0].plot(tr.times(), tr.data)
st3c.rotate(method=method)
# tr = st3c.select(channel="BHL")[0]
# ax[0].plot(tr.times(), tr.data)
# ax[0].set_xlim([670, 700])
# plt.tight_layout()
# plt.show()
# os._exit(0)
return st3c
[docs]def _proc(tr, sampling_rate=10):
"""
Basic processing including downsampling, detrend, and demean.
:param tr: raw trace
:param sampling_rate:
:return tr: trace after processing
"""
# deep copy
tr2 = tr.copy()
tr2.interpolate(sampling_rate)
tr2.detrend(type="linear")
tr2.detrend(type="demean")
return tr2
[docs]def _simple_proc(st, sampling_rate=10, njobs=1):
"""
A parallel version of `_proc`, i.e., Basic processing including downsampling, detrend, and demean.
:param st: an obspy stream
:param sampling_rate: expected sampling rate
:param njobs: number of jobs or CPU to use
:return st: stream after processing
"""
# downsampling, detrend, demean
do_work = partial(_proc, sampling_rate=sampling_rate)
# trace_list = []
# for tr in st:
# trace_list.append(tr)
#
st2 = Stream()
logging.info("simple processing for full event correlogram.")
print("simple processing for full event correlogram.")
if njobs == 1:
logging.info('do work sequential (%d cores)', njobs)
for tr in tqdm(st, total=len(st)):
tr2 = do_work(tr)
st2.append(tr2)
else:
logging.info('do work parallel (%d cores)', njobs)
pool = multiprocessing.Pool(njobs)
for tr2 in tqdm(pool.imap_unordered(do_work, st), total=len(st)):
st2.append(tr2)
pool.close()
pool.join()
return st2
[docs]def _proc_event_Z(file, **kwargs):
"""
Processing a single component Z including downsampling, detrend, demean,
remove instrumental response, selection of data by SNR>threshold,
spectral whitening, temporal normalization and auto-correlation.
For event type data.
:param file:
:param kwargs:
:return:
"""
try:
# read file and return an obspy trace
tr = read(file)[0]
# process z-component only.
if tr.stats.channel[-1] is not "Z":
logging.warning("component is not Z: %s", file)
return 0
# first step is downsamping data to reduce computational burden
sampling_rate = kwargs["preprocessing"]["sampling_rate"]
tr.interpolate(sampling_rate)
tr.detrend(type="linear")
tr.detrend(type="demean")
options = kwargs["data_selection_event"]
snr_threshold = options["snr_threshold"]
snr = _select_data_event(tr, **options)
if snr < snr_threshold:
return 0
tr.stats.update({"snr": snr})
logging.info("%s SNR is %f", file, snr)
# remove instrumental response
if kwargs["rm_resp_on"]:
option_resp = kwargs["preprocessing"]["resp"]
tr = remove_response(trace=tr, **option_resp)
# spectral whitening
if kwargs["whiten_on"]:
option_whiten = kwargs["preprocessing"]["whiten"]
tr = spectral_whitening(tr=tr, **option_whiten)
# temporal normalization
if kwargs["time_norm_on"]:
option_time_norm = kwargs["preprocessing"]["time_norm"]
tr = time_norm(tr, **option_time_norm)
# autocorrelation
options = kwargs["correlate"]
tr = _autocorrelation(tr, **options)
# write data to disk
tr.stats.channel = "ZZ" # change trace channel to 'ZZ' after autocorrelation
trace_id = tr.id
station_id = _get_station_id(tr)
event_id = _get_event_id_tr(tr)
filename = trace_id + "_" + event_id + ".pkl"
outpath = kwargs["io"]["outpath"] + "/1_results"
filepath = "/".join([outpath, station_id])
# if not os.path.exists(filepath):
try:
os.makedirs(filepath)
except:
pass
# extracting data, to have the same length of output data
tlen = kwargs["correlate"]["window"]
tlen = abs(tlen[1] - tlen[0])
tr.trim(starttime=tr.stats.starttime, endtime=tr.stats.starttime + tlen, fill_value=0, pad=True)
filen = filepath + "/" + filename
force = kwargs["io"]["force"]
if not force and os.path.exists(filen):
pass
else:
tr.write(filen, format="PICKLE")
return 1
except:
return 0
[docs]def _autocorrelation(tr, window=[-20, 70], filter=[0.5, 4], corners=2, zerophase=True):
"""
Autocorrelation for event type data.
:param tr:
:param window:
:param filter: A tuple or a list containing the lower and upper limit of filter.
:param corners:
:param zerophase:
:return:
.. Note::
filter after autocorrelation.
"""
tr2 = tr.copy()
t1 = tr2.stats.onset + window[0]
t2 = tr2.stats.onset + window[1]
tr2.trim(starttime=t1, endtime=t2)
npts = tr2.stats.npts
data = tr2.data
cc = correlate(in1=data, in2=data, mode="full")
tr2.data = np.copy(cc)
if filter is not None:
tr2.filter(type="bandpass", freqmin=filter[0], freqmax=filter[1],
corners=corners, zerophase=zerophase)
tr2.data = tr2.data[npts:]
return tr2
[docs]def _crosscorrelation(tr1, tr2, window=[-20, 70], filter=[0.5, 4], corners=2, zerophase=True):
"""
cross-correlation for event type data.
:param tr1:
:param tr2:
:param window:
:param filter:
:param corners:
:param zerophase:
:return:
"""
tra = tr1.copy()
trb = tr2.copy()
reverse = False
if tra.stats.phase == "S":
if tra.stats.channel == trb.stats.channel:
# in this case, it is autocorrelation, which means SS
window = window
else:
# in this case it is cross correlation over ZR/LQ or ZT/LT corresponding with Sp RF.
# using different time window since converted P arrives earlier than the primary S.
# if a window [-20, 70] is given, then a window [-70, 20] will be used in this case.
w1, w2 = window
window = [-w2, -w1]
reverse = True
t1 = tr2.stats.onset + window[0]
t2 = tr2.stats.onset + window[1]
tra.trim(starttime=t1, endtime=t2)
trb.trim(starttime=t1, endtime=t2)
npts = tra.stats.npts
data = tra.data
datb = trb.data
cc = correlate(in1=data, in2=datb, mode="full")
tr = tra.copy()
tr.data = np.copy(cc)
tr.filter(type="bandpass", freqmin=filter[0], freqmax=filter[1],
corners=corners, zerophase=zerophase)
if reverse:
tr.data = np.copy(tr.data[::-1])
tr.data = tr.data[npts:]
return tr
[docs]def _select_data_event(tr, dist_range=[30, 90], magnitude=[5.0, 9.0],
snr_threshold=2.0, signal=[-10, 10], noise=[-100, -50],
waterlevel=1.0e-8):
# select according to SNR
distance = tr.stats.distance
if distance < dist_range[0] or distance > dist_range[1]:
return 0
try:
mag = tr.stats.magnitude
if mag < magnitude[0] or mag > magnitude[1]:
return 0
except:
pass
# calculate SNR
snr = _cal_event_snr(tr, signal=signal, noise=noise, waterlevel=waterlevel)
return snr
[docs]def _cal_event_snr(tr, signal=[-10, 10], noise=[-100, -50], waterlevel=1.0e-8):
"""
Calculation of SNR for event data.
:param tr:
:param signal:
:param noise:
:param waterlevel:
:return:
"""
tr_sig = tr.copy()
tr_noi = tr.copy()
t1 = tr.stats.onset + signal[0]
t2 = tr.stats.onset + signal[1]
if t1 < tr.stats.starttime or t2 > tr.stats.endtime:
logging.warning("t1 < tr.stats.starttime")
return 0
tr_sig.trim(starttime=t1, endtime=t2)
t1 = tr.stats.onset + noise[0]
t2 = tr.stats.onset + noise[1]
if t1 < tr.stats.starttime or t2 > tr.stats.endtime:
logging.warning("t1 < tr.stats.starttime")
return 0
tr_noi.trim(starttime=t1, endtime=t2)
sig = envelope(tr_sig.data)
noi = envelope(tr_noi.data)
snr = np.max(sig) / max(np.max(noi), waterlevel)
return snr
[docs]def processing_noise_Z(jsonfile):
"""
Parallel processing of noise autocorrelation.
:param jsonfile:
:return:
"""
# load imported data from files
kwargs = _load_json(jsonfile)
njobs = kwargs["njobs"]
datapath = kwargs["io"]["outpath"] + "/0_raw"
logging.info("This may take a lot of time if dataset is huge.")
# get filenames in Unix style for further processing
files = glob.glob(datapath + "/*/*.pkl")
logging.info("%d files found.", len(files))
# processing includes downsampling, detrend, demean,
# instrument response removal, spectral whitening, temporal normalization
# autocorrelation and filter, then output results.
do_work = partial(_proc_noise_Z, **kwargs)
numbers = []
if njobs == 1:
logging.info('do work sequential (%d cores)', njobs)
for file in tqdm(files, total=len(files)):
num = do_work(file)
numbers.append(num)
else:
logging.info('do work parallel (%d cores)', njobs)
pool = multiprocessing.Pool(njobs)
for num in tqdm(pool.imap_unordered(do_work, files), total=len(files)):
numbers.append(num)
pool.close()
pool.join()
logging.info("%d/%d files processed.", sum(numbers), len(files))
[docs]def _proc_noise_Z(file, **kwargs):
"""
Internal functions of calculate z-comp autocorrelation for noise data.
:param file:
:param kwargs:
:return:
"""
# read file and return an obspy trace
tr = read(file)[0]
# process z-component only.
if tr.stats.channel[-1] is not "Z":
return 0
# first step is downsamping data to reduce computational burden
sampling_rate = kwargs["preprocessing"]["sampling_rate"]
tr.interpolate(sampling_rate)
tr.detrend(type="linear")
tr.detrend(type="demean")
# remove instrumental response
if kwargs["rm_resp_on"]:
option_resp = kwargs["preprocessing"]["resp"]
tr = remove_response(trace=tr, **option_resp)
# spectral whitening
if kwargs["whiten_on"]:
option_whiten = kwargs["preprocessing"]["whiten"]
tr = spectral_whitening(tr=tr, **option_whiten)
# temporal normalization
if kwargs["time_norm_on"]:
option_time_norm = kwargs["preprocessing"]["time_norm"]
tr = time_norm(tr, **option_time_norm)
# sliding autocorrelation
options = kwargs["correlate_noise"]
st = _sliding_autocorrelation(tr=tr, **options)
# extracting data
tlen = kwargs["correlate"]["window"]
tlen = abs(tlen[1] - tlen[0])
for tr in st:
tr.trim(starttime=tr.stats.starttime, endtime=tr.stats.starttime + tlen, fill_value=0, pad=True)
# write data to disk
for tr in st:
tr.stats.channel = "ZZ" # change trace channel to 'ZZ' after autocorrelation
trace_id = tr.id
station_id = _get_station_id(tr)
event_id = _get_noise_id(tr)
filename = trace_id + "_" + event_id + ".pkl"
outpath = kwargs["io"]["outpath"] + "/1_results"
filepath = "/".join([outpath, station_id])
# if not os.path.exists(filepath):
try:
os.makedirs(filepath)
except:
pass
filen = filepath + "/" + filename
force = kwargs["io"]["force"]
if not force and os.path.exists(filen):
pass
else:
tr.write(filen, format="PICKLE")
return 1
[docs]def _sliding_autocorrelation(tr, length=3600, overlap=1800,
filter=[0.5, 4], corners=2, zerophase=True):
"""
Sliding autocorrelation for noise data.
:param tr:
:param length:
:param overlap:
:param filter:
:param corners:
:param zerophase:
:return:
"""
trace = tr.copy()
time_series = iter_time(tr=trace, length=length, overlap=overlap)
# print(time_series)
if len(time_series) < 1:
return 0
st = Stream()
for t1, t2 in time_series:
tr2 = trace.copy()
tr2.trim(starttime=t1, endtime=t2)
# get gaps
gap_st = Stream([tr2])
gaps = gap_st.get_gaps()
if len(gaps) > 0:
continue
npts = tr2.stats.npts
data = tr2.data
cc = correlate(in1=data, in2=data, mode="full")
tr2.data = np.copy(cc)
tr2.filter(type="bandpass", freqmin=filter[0], freqmax=filter[1],
corners=corners, zerophase=zerophase)
tr2.data = tr2.data[npts:]
st.append(tr2)
if len(st) < 1:
return 0
else:
return st