"""
Implements the Rowshandel (2013) directivity model. Note that the report
provides many options for computing the directivity factor and we have
also used some coefficients that are unpublished updates by Rowshandel.
Some of the implementation options are controlled by arguments (e.g.,
mtype).
Fd is the directivity function. One of the options is whether or not to
use the 'centering' term (the argument is 'centered' and defaults to
True). If it is True then
Fd = C1 * (XiPrime - Xic) * LD * DT * WP
otherwise
Fd = (C1 * Xiprime + C2) * LD * DT * WP
where
- C1 and C2 are regression coefficients.
- XiPrime is a factor that accounts for rupture propagation and slip
direction.
- Xic is the centering term.
- LD is the rupture length de-normalization factor.
- DT is the distance-taper.
- WP is the narrow-band multiplier.
Note that Fd is intended to be used in GMPEs as:
ln(IM_dir) = ln(IM) + Fd
where
- Fd is the directivity effect.
- IM is the intensity measure predicted by the GMPE.
- IM_dir is the directivity-adjusted IM.
To do
- Add checks on function arguments (e.g., mtype) for valid values.
- Interpolate periods.
References:
Rowshandel, B. (2013). Rowshandel’s NGA-West2 directivity model,
Chapter 3 of PEER Report No. 2013/09, P. Spudich (Editor), Pacific
Earthquake Engineering Research Center, Berkeley, CA.
`[link] <http://peer.berkeley.edu/publications/peer_reports/reports_2013/webPEER-2013-09-Spudich.pdf>`__
""" # noqa
import numpy as np
import copy
import openquake.hazardlib.geo as geo
from openquake.hazardlib.geo.utils import OrthographicProjection
from impactutils.rupture import utils
from impactutils.rupture.distance import get_distance
from impactutils.vectorutils.ecef import latlon2ecef
from impactutils.vectorutils.ecef import ecef2latlon
from impactutils.vectorutils.vector import Vector
[docs]class Rowshandel2013(object):
__c1 = [0.35, 0.75, 0.95, 1.28, 1.60]
__c2 = [-0.035, -0.10, -0.15, -0.26, -0.30]
__periods = [1.0, 3.0, 5.0, 7.5, 10.0]
def __init__(
self,
origin,
rup,
lat,
lon,
dep,
dx,
T,
a_weight=0.5,
mtype=1,
simpleDT=False,
centered=True,
):
"""
Constructor for rowshandel2013.
Args:
origin: Origin instance.
rup: Rupture instance.
lat (ndarray): Numpy array of site latitudes.
lon (ndarray): Numpy array of site longitudes.
dep (ndarray): Numpy array of site depths (km); positive down.
dx (float): Target mesh spacing for subruptures in km. The mesh
snaps to the edges of the quadrilaterals so the actual mesh
spacing will not equal this exactly; spacing in x and y will
not be equal.
T (list): List floats (or a float) for periods to compute fd;
Currently, only acceptable values are 1, 2, 3, 4, 5, 7.5.
a_weight (float): Weighting factor for how p-dot-q and s-dot-q are
averaged; 0 for only p-dot-q (propagation factor) and 1 for
only s-dot-q (slip factor).
mtype (int): Either 1 or 2; 1 for adding only positive components
of dot products, 2 for adding all components of dot products.
simpleDT (bool): Should the simpler DT equation be used? Usually
False.
centered (bool): Should the centered directivity parameter be used?
"""
self._origin = origin
self._rup = rup
self._hyp = origin.getHypo()
self._lat = lat
self._lon = lon
self._dep = dep
self._rake = origin.rake
self._dx = dx
self._M = origin.mag
if isinstance(T, float):
self._T = [T]
else:
self._T = T
if (a_weight > 1.0) or (a_weight < 0):
raise ValueError("a_weight must be between 0 and 1.")
self._a_weight = a_weight
if (mtype != 1) and (mtype != 2):
raise ValueError("mtype can onlty be 1 or 2.")
self._mtype = mtype
self._simpleDT = simpleDT
self._centered = centered
# Period independent parameters
self.__computeWrup()
self.__computeLD()
self.__computeXiPrime()
self.__getCenteringTerm()
self.__computeFd()
[docs] @classmethod
def fromSites(
cls,
origin,
rup,
sites,
dx,
T,
a_weight=0.5,
mtype=1,
simpleDT=False,
centered=True,
):
"""Construct a rowshandel2013 instance from a sites instance.
Class method for constructing a rowshandel2013 instance from
a sites instance.
Args:
origin: Origin instance.
rup: Rupture instance.
sites: Sites object.
dx: Float for target mesh spacing for subruptures in km. The mesh
snaps to the edges of the quadrilaterals so the actual mesh
spacing will not equal this exactly; spacing in x and y will
not be equal.
T: Period; Currently, only acceptable values are 1, 2, 3, 4, 5,
7.5.
a_weight: Weighting factor for how p-dot-q and s-dot-q are
averaged; 0 for only p-dot-q (propagation factor) and 1 for
only s-dot-q (slip factor).
mtype: Integer, either 1 or 2; 1 for adding only positive
components of dot products, 2 for adding all components of dot
products.
simpleDT: Boolean; should the simpler DT equation be used? Usually
False.
centered: Boolean; should the centered directivity parameter be
used
Returns:
Rowshandel2013 directivity class.
"""
sm_dict = sites.getVs30Grid().getGeoDict()
west = sm_dict.xmin
east = sm_dict.xmax
south = sm_dict.ymin
north = sm_dict.ymax
nx = sm_dict.nx
ny = sm_dict.ny
lats = np.linspace(north, south, ny)
lons = np.linspace(west, east, nx)
lon, lat = np.meshgrid(lons, lats)
dep = np.zeros_like(lon)
return cls(
origin, rup, lat, lon, dep, dx, T, a_weight, mtype, simpleDT, centered
)
[docs] def getFd(self):
"""
:returns:
Numpy array of Fd; this is the directivity amplification factor
(log units).
"""
return copy.deepcopy(self._fd)
[docs] def getXiPrime(self):
"""
Returns:
Numpy array of Xi'; this is the variable that accounts for
geometric factors in Fd.
"""
return copy.deepcopy(self._xi_prime)
[docs] def getDT(self):
"""
Returns:
List of numpy arrays of the distance taper factor. Length is the
number of periods.
"""
return copy.deepcopy(self._DTlist)
[docs] def getWP(self):
"""
Returns:
List of numpy array of the narrow-band multiplier. Length is the
number of periods.
"""
return copy.deepcopy(self._WPlist)
[docs] def getLD(self):
"""
Returns:
Numpy array of the rupture length de-normalization factor.
"""
return copy.deepcopy(self._LD)
def __computeFd(self):
"""
Fd is the term that modifies the GMPE output as an additive term
(in log space).
ln(Y) = f(M, R, ...) + Fd
"""
xcipc = self._xi_prime - self._xi_c
# fd is a list with same length as T
self._fd = [None] * len(self._T)
self._DTlist = [None] * len(self._T)
self._WPlist = [None] * len(self._T)
for i in range(len(self._T)):
period = self._T[i]
c1sel = self.__c1[self.__periods == period]
c2sel = self.__c2[self.__periods == period]
# Period dependent parameters
self.__computeWP(period)
self.__computeDT(period)
self._DTlist[i] = copy.deepcopy(self._DT)
self._WPlist[i] = copy.deepcopy(self._WP)
if self._centered:
self._fd[i] = c1sel * self._WP * self._DT * xcipc
else:
self._fd[i] = (
(c1sel * self._xi_prime + c2sel) * self._LD * self._DT * self._WP
)
def __computeWrup(self):
"""
Compute the the portion (in km) of the width of the rupture which
ruptures up-dip from the hypocenter to the top of the rupture.
Wrup is the portion (in km) of the width of the rupture which
ruptures up-dip from the hypocenter to the top of the rupture.
* This is ambiguous for ruptures with varible top of rupture (not
allowed in NGA). For now, lets just compute this for the
quad where the hypocenter is located.
* Alternative is to compute max Wrup for the different quads.
"""
nquad = len(self._rup.getQuadrilaterals())
# ---------------------------------------------------------------------
# First find which quad the hypocenter is on
# ---------------------------------------------------------------------
x, y, z = latlon2ecef(self._hyp.latitude, self._hyp.longitude, self._hyp.depth)
hyp_ecef = np.array([[x, y, z]])
qdist = np.zeros(nquad)
for i in range(0, nquad):
qdist[i] = utils._quad_distance(self._rup.getQuadrilaterals()[i], hyp_ecef)
ind = int(np.where(qdist == np.min(qdist))[0][0])
# *** check that this doesn't break with more than one quad
q = self._rup.getQuadrilaterals()[ind]
# ---------------------------------------------------------------------
# Compute Wrup on that quad
# ---------------------------------------------------------------------
pp0 = Vector.fromPoint(
geo.point.Point(q[0].longitude, q[0].latitude, q[0].depth)
)
hyp_ecef = Vector.fromPoint(
geo.point.Point(self._hyp.longitude, self._hyp.latitude, self._hyp.depth)
)
hp0 = hyp_ecef - pp0
ddv = utils.get_quad_down_dip_vector(q)
self._Wrup = Vector.dot(ddv, hp0) / 1000
def __computeXiPrime(self):
"""
Computes the xi' value.
"""
hypo_ecef = Vector.fromPoint(
geo.point.Point(self._hyp.longitude, self._hyp.latitude, self._hyp.depth)
)
slat = self._lat
slon = self._lon
# Convert site to ECEF:
site_ecef_x = np.ones_like(slat)
site_ecef_y = np.ones_like(slat)
site_ecef_z = np.ones_like(slat)
# Make a 3x(#number of sites) matrix of site locations
# (rows are x, y, z) in ECEF
site_ecef_x, site_ecef_y, site_ecef_z = latlon2ecef(
slat, slon, np.zeros(slon.shape)
)
site_mat = np.array(
[
np.reshape(site_ecef_x, (-1,)),
np.reshape(site_ecef_y, (-1,)),
np.reshape(site_ecef_z, (-1,)),
]
)
xi_prime_unscaled = np.zeros_like(slat)
# Normalize by total number of subruptures. For mtype == 1, the number
# of subruptures will vary with site and be different for xi_s and
# xi_p, so keep two variables and sum them for each quad.
nsubs = np.zeros(np.product(slat.shape))
nsubp = np.zeros(np.product(slat.shape))
xi_prime_s = np.zeros(np.product(slat.shape))
xi_prime_p = np.zeros(np.product(slat.shape))
for k in range(len(self._rup.getQuadrilaterals())):
# Select a quad
q = self._rup.getQuadrilaterals()[k]
# Quad mesh (ECEF coords)
mesh = utils.get_quad_mesh(q, self._dx)
# Rupture plane normal vector (ECEF coords)
rpnv = utils.get_quad_normal(q)
rpnvcol = np.array([[rpnv.x], [rpnv.y], [rpnv.z]])
cp_mat = np.array(
[
np.reshape(mesh["cpx"], (-1,)),
np.reshape(mesh["cpy"], (-1,)),
np.reshape(mesh["cpz"], (-1,)),
]
)
# Compute matrix of p vectors
hypcol = np.array([[hypo_ecef.x], [hypo_ecef.y], [hypo_ecef.z]])
pmat = cp_mat - hypcol
# Project pmat onto quad
ndotp = np.sum(pmat * rpnvcol, axis=0)
pmat = pmat - ndotp * rpnvcol
mag = np.sqrt(np.sum(pmat * pmat, axis=0))
pmatnorm = pmat / mag # like r1
# According to Rowshandel:
# "The choice of the +/- sign in the above equations
# depends on the (along-the-strike and across-the-dip)
# location of the rupturing sub-fault relative to the
# location of the hypocenter."
# and:
# "for the along the strike component of the slip unit
# vector, the choice of the sign should result in the
# slip unit vector (s) being exactly the same as the
# rupture unit vector (p) for a pure strike-slip case"
# Strike slip and dip slip components of unit slip vector
# (ECEF coords)
ds_mat, ss_mat = _get_quad_slip_ds_ss(q, self._rake, cp_mat, pmatnorm)
slpmat = ds_mat + ss_mat
mag = np.sqrt(np.sum(slpmat * slpmat, axis=0))
slpmatnorm = slpmat / mag
# Loop over sites
for i in range(site_mat.shape[1]):
sitecol = np.array(
[[site_mat[0, i]], [site_mat[1, i]], [site_mat[2, i]]]
)
qmat = sitecol - cp_mat # 3x(ni*nj), like r2
mag = np.sqrt(np.sum(qmat * qmat, axis=0))
qmatnorm = qmat / mag
# Propagation dot product
pdotqraw = np.sum(pmatnorm * qmatnorm, axis=0)
# Slip vector dot product
sdotqraw = np.sum(slpmatnorm * qmatnorm, axis=0)
if self._mtype == 1:
# Only sum over (+) directivity effect subruptures
# xi_p_prime
pdotq = pdotqraw.clip(min=0)
nsubp[i] = nsubp[i] + np.sum(pdotq > 0)
# xi_s_prime
sdotq = sdotqraw.clip(min=0)
nsubs[i] = nsubs[i] + np.sum(sdotq > 0)
elif self._mtype == 2:
# Sum over contributing subruptures
# xi_p_prime
pdotq = pdotqraw
nsubp[i] = nsubp[i] + cp_mat.shape[1]
# xi_s_prime
sdotq = sdotqraw
nsubs[i] = nsubs[i] + cp_mat.shape[1]
# Normalize by n sub ruptures later
xi_prime_s[i] = xi_prime_s[i] + np.sum(sdotq)
xi_prime_p[i] = xi_prime_p[i] + np.sum(pdotq)
# Apply a water level to nsubp and nsubs to avoid division by
# zero. This should only occur when the numerator is also zero
# and so the resulting value should be zero.
nsubs = np.maximum(nsubs, 1)
nsubp = np.maximum(nsubp, 1)
# We are outside the 'k' loop over nquads.
# o Normalize xi_prime_s and xi_prime_p
# o Reshape them
# o Add them together with the 'a' weights
xi_prime_tmp = (self._a_weight) * (xi_prime_s / nsubs) + (
1 - self._a_weight
) * (xi_prime_p / nsubp)
xi_prime_unscaled = xi_prime_unscaled + np.reshape(xi_prime_tmp, slat.shape)
# Scale so that xi_prime has range (0, 1)
if self._mtype == 1:
xi_prime = xi_prime_unscaled
elif self._mtype == 2:
xi_prime = 0.5 * (xi_prime_unscaled + 1)
self._xi_prime = xi_prime
def __computeLD(self):
"""
Computes LD -- the rupture length de-normalization term.
"""
# Use an orthographic projection
latmin = self._lat.min()
latmax = self._lat.max()
lonmin = self._lon.min()
lonmax = self._lon.max()
proj = OrthographicProjection(lonmin, lonmax, latmax, latmin)
# Get epi projection
epi_x, epi_y = proj(self._hyp.longitude, self._hyp.latitude)
# Get the lines for the top edge of the rupture
qds = self._rup.getQuadrilaterals()
nq = len(qds)
top_lat = np.array([[]])
top_lon = np.array([[]])
top_dep = np.array([[]])
for j in range(0, nq):
top_lat = np.append(top_lat, qds[j][0].latitude)
top_lat = np.append(top_lat, qds[j][1].latitude)
top_lon = np.append(top_lon, qds[j][0].longitude)
top_lon = np.append(top_lon, qds[j][1].longitude)
top_dep = np.append(top_dep, qds[j][0].depth)
top_dep = np.append(top_dep, qds[j][1].depth)
top_x, top_y = proj(top_lon, top_lat)
Lrup_max = 400
slat = self._lat
slon = self._lon
LD = np.zeros_like(slat)
Ls = np.zeros_like(slat)
ni, nj = LD.shape
for i in range(ni):
for j in range(nj):
# -------------------------------------------------------------
# Compute Ls
# -------------------------------------------------------------
# Convert to local orthographic
site_x, site_y = proj(slon[i, j], slat[i, j])
# Shift so center is at epicenter
site_x2 = site_x - epi_x
site_y2 = site_y - epi_y
top_x2 = top_x - epi_x
top_y2 = top_y - epi_y
# Angle to rotate to put site on x-axis
alpha = np.arctan2(site_y2, site_x2)
axis = [0, 0, 1]
rmat = _rotation_matrix(axis, -alpha)
llr = [None] * len(top_lat)
# Apply rotation to each point on trace
for k in range(len(top_lat)):
llr[k] = np.dot(rmat, [top_x2[k], top_y2[k], 0])
site3 = np.dot(rmat, [site_x2, site_y2, 0])
Li = np.min([np.max([a[0] for a in llr]), site3[0]])
# -------------------------------------------------------------
# Compute LD and save results into matrices
# -------------------------------------------------------------
Lrup = np.sqrt(Li * Li + self._Wrup * self._Wrup)
LD[i, j] = np.log(Lrup) / np.log(Lrup_max)
Ls[i, j] = Li
self._LD = LD
self._Ls = Ls
def __computeDT(self, period):
"""
Computes DT -- the distance taper term.
"""
slat = self._lat
slon = self._lon
site_z = np.zeros_like(slat)
ddict = get_distance("rrup", slat, slon, site_z, self._rup)
Rrup = np.reshape(ddict["rrup"], (-1,))
nsite = len(Rrup)
if self._simpleDT: # eqn 3.10
R1 = 35
R2 = 70
DT = np.ones(nsite)
ix = tuple([(Rrup > R1) & (Rrup < R2)])
DT[ix] = 2 - Rrup[ix] / R1
DT[Rrup >= R2] = 0
else: # eqn 3.9
if period >= 1:
R1 = 20 + 10 * np.log(period)
R2 = 2 * (20 + 10 * np.log(period))
else:
R1 = 20
R2 = 40
DT = np.ones(nsite)
# As written in report:
# DT[(Rrup > R1) & (Rrup < R2)] = \
# 2 - Rrup[(Rrup > R1) & (Rrup < R2)]/(20 + 10 * np.log(period))
# Modification:
DT[(Rrup > R1) & (Rrup < R2)] = 2 - Rrup[(Rrup > R1) & (Rrup < R2)] / R1
# Note: it is not clear if the above modification is 'correct'
# but it gives results that make more sense
DT[Rrup >= R2] = 0
DT = np.reshape(DT, slat.shape)
self._DT = DT
def __getCenteringTerm(self):
L = self._rup.getLength()
CSSLP = -0.32 + 0.15 * np.log(L)
CTRST = -0.20 + 0.09 * np.log(L)
CNRML = -0.20 + 0.08 * np.log(L)
if self._rake > 0:
act = (CSSLP - CTRST) / 2
cct = (CSSLP + CTRST) / 2
self._xi_c = act * np.cos(2 * np.radians(self._rake)) + cct
else:
act = (CSSLP - CNRML) / 2
cct = (CSSLP + CNRML) / 2
self._xi_c = act * np.cos(2 * np.radians(self._rake)) + cct
def __computeWP(self, period):
"""
Computes WP -- the narrow-band multiplier.
"""
# As written in report:
# sig = 0.6
# Tp = np.exp(1.27*self._M - 7.28)
# Update (Rowshandel, personal communication)
sig = 0.55
Tp = np.exp(1.04 * self._M - 6.0)
self._WP = np.exp(
-(np.log10(period / Tp) * np.log10(period / Tp)) / (2 * sig * sig)
)
[docs] @classmethod
def getPeriods(cls):
"""
Returns a list of the periods for which the model is defined.
"""
return cls.__periods
def _get_quad_slip_ds_ss(q, rake, cp, p):
"""
Compute the DIP SLIP and STRIKE SLIP components of the unit slip vector in
ECEF coords for a quad and rake angle.
:param q:
A quadrilateral.
:param rake:
Direction of motion of the hanging wall relative to the
foot wall, as measured by the angle (deg) from the strike vector.
:param cp:
A 3x(n sub rupture) array giving center points of each sub rupture
in ECEF coords.
:param p:
A 3x(n sub rupture) array giving the unit vectors of the propagation
vector on each sub rupture in ECEF coords.
:returns:
List of dip slip and strike slip components (each is a matrix)
of the unit slip vector in ECEF space.
"""
# Get quad vertices, strike, dip
P0, P1 = q[0:2]
strike = P0.azimuth(P1)
dip = utils.get_quad_dip(q)
# Slip unit vectors in 'local' (i.e., z-up, x-east) coordinates
d1_local = utils.get_local_unit_slip_vector_DS(strike, dip, rake)
s1_local = utils.get_local_unit_slip_vector_SS(strike, dip, rake)
# Convert to a column array
d1_col = np.array([[d1_local.x], [d1_local.y], [d1_local.z]])
s1_col = np.array([[s1_local.x], [s1_local.y], [s1_local.z]])
# Define 'local' coordinate system
qlats = [a.latitude for a in q]
qlons = [a.longitude for a in q]
proj = OrthographicProjection(
np.min(qlons), np.max(qlons), np.min(qlats), np.max(qlats)
)
# Convert p and cp to geographic coords
p0lat, p0lon, p0z = ecef2latlon(
cp[
0,
],
cp[
1,
],
cp[
2,
],
)
p1lat, p1lon, p1z = ecef2latlon(
cp[
0,
]
+ p[
0,
],
cp[
1,
]
+ p[
1,
],
cp[
2,
]
+ p[
2,
],
)
# Convert p to 'local' coords
p0x, p0y = proj(p0lon, p0lat)
p1x, p1y = proj(p1lon, p1lat)
px = p1x - p0x
py = p1y - p0y
pz = p1z - p0z
# Apply sign changes in 'local' coords
s1mat = np.array(
[
[np.abs(s1_col[0]) * np.sign(px)],
[np.abs(s1_col[1]) * np.sign(py)],
[np.abs(s1_col[2]) * np.sign(pz)],
]
)
# [np.abs(s1_col[2])*np.ones_like(pz)]])
dipsign = -np.sign(np.sin(np.radians(rake)))
d1mat = np.array(
[
[d1_col[0] * np.ones_like(px) * dipsign],
[d1_col[1] * np.ones_like(py) * dipsign],
[d1_col[2] * np.ones_like(pz) * dipsign],
]
)
# Need to track 'origin'
s0 = np.array([[0], [0], [0]])
# Convert from 'local' to geographic coords
s1_ll = proj(
s1mat[
0,
],
s1mat[
1,
],
reverse=True,
)
d1_ll = proj(
d1mat[
0,
],
d1mat[
1,
],
reverse=True,
)
s0_ll = proj(s0[0], s0[1], reverse=True)
# And then back to ECEF:
s1_ecef = latlon2ecef(
s1_ll[1],
s1_ll[0],
s1mat[
2,
],
)
d1_ecef = latlon2ecef(
d1_ll[1],
d1_ll[0],
d1mat[
2,
],
)
s0_ecef = latlon2ecef(s0_ll[1], s0_ll[0], s0[2])
s00 = s0_ecef[0].reshape(-1)
s01 = s0_ecef[1].reshape(-1)
s02 = s0_ecef[2].reshape(-1)
d_mat = np.array(
[
d1_ecef[0].reshape(-1) - s00,
d1_ecef[1].reshape(-1) - s01,
d1_ecef[2].reshape(-1) - s02,
]
)
s_mat = np.array(
[
s1_ecef[0].reshape(-1) - s00,
s1_ecef[1].reshape(-1) - s01,
s1_ecef[2].reshape(-1) - s02,
]
)
return d_mat, s_mat
def _rotation_matrix(axis, theta):
"""
Return the rotation matrix associated with counterclockwise rotation about
the given axis by theta radians.
Source: Response by 'unutbu' in this thread:
http://stackoverflow.com/questions/6802577/python-rotation-of-3d-vector
"""
axis = np.asarray(axis)
theta = np.asarray(theta)
axis = axis / np.sqrt(np.dot(axis, axis))
a = np.cos(theta / 2)
b, c, d = -axis * np.sin(theta / 2)
aa, bb, cc, dd = a * a, b * b, c * c, d * d
bc, ad, ac, ab, bd, cd = b * c, a * d, a * c, a * b, b * d, c * d
return np.array(
[
[aa + bb - cc - dd, 2 * (bc + ad), 2 * (bd - ac)],
[2 * (bc - ad), aa + cc - bb - dd, 2 * (cd + ab)],
[2 * (bd + ac), 2 * (cd - ab), aa + dd - bb - cc],
]
)