# stdlib imports
import sqlite3
import xml.etree.ElementTree as ET
import copy
from collections import OrderedDict
import re
import logging
import json
# third party imports
import numpy as np
# local imports
TABLES = OrderedDict(
(
(
"station",
OrderedDict(
(
("id", "text primary key"), # id is net.sta
("refid", "int"), # foreign "key" to reference table
("network", "text"),
("code", "text"),
("name", "text"),
("lat", "float"),
("lon", "float"),
("elev", "float"),
("vs30", "float"),
("stddev", "float"),
("instrumented", "int"),
("source", "text"),
)
),
),
("imt", OrderedDict((("id", "integer primary key"), ("imt_type", "text")))),
(
"amp",
OrderedDict(
(
("id", "integer primary key"),
("station_id", "text"),
("imt_id", "int"),
("original_channel", "text"),
("orientation", "text"),
("amp", "float"),
("stddev", "float"),
("nresp", "int"),
("flag", "text"),
)
),
),
(
"reference",
OrderedDict(
(
("id", "integer primary key"),
("shortref", "text"),
("longref", "text"),
("description", "text"),
)
),
),
)
)
#
# These are the netid's that indicate MMI data
#
CIIM_TUPLE = ("dyfi", "mmi", "intensity", "ciim")
[docs]class StationList(object):
"""
A class to facilitate reading ShakeMap formatted XML fies of peak
amplitudes and MMI, and
produce tables of station data. Seismic stations are considered to
be 'instrumented'; MMI data is not instrumented and is indicated
in the ShakeMap XML with a ``netid`` attribute of "DYFI," "MMI,"
"INTENSITY," or "CIIM."
.. note::
Typically the user will call the class method :meth:`fromXML`
to create a :class:`StationList` object the first time
a set of station files are processed. (Or, as an alternative,
the user can call :meth:`loadFromXML` and :meth:`fillTables`
sequentially.)
This will create a database at the location specified by the
``dbfile`` parameter to :meth:`fromXML`. Subsequent programs
can use the default constructor to simply load ``dbfile``.
"""
def __init__(self, db):
"""
The default constructor reads a pre-built SQLite database of
station data.
Args:
dbfile (str):
A SQLite database file containing pre-processed
station data.
Returns:
A :class:`StationList` object.
"""
self.db = db
self.cursor = self.db.cursor()
def __del__(self):
"""
Closes out the database when the object is destroyed.
"""
self.db.commit()
self.cursor.close()
self.db.close()
[docs] @classmethod
def loadFromSQL(cls, sql, dbfile=":memory:"):
"""
Create a new object from saved SQL code (see :meth:`dumpToSQL`).
Args:
sql (str):
SQL code to create and populate the database
dbfile (str):
The path to a file in which the database will reside.
The default is ':memory:' for an in-memory database.
Returns:
:class:`Stationlist` object.
"""
db = sqlite3.connect(dbfile, timeout=15)
self = cls(db)
self.cursor.executescript(sql)
return self
[docs] def dumpToSQL(self):
"""
Dump the database as a string of SQL code (see :meth:`loadFromSQL`).
Args:
None
Returns:
A string of SQL sufficient to restore and repopulate the
database.
"""
return "\n".join(list(self.db.iterdump()))
[docs] @classmethod
def loadFromFiles(cls, filelist, min_nresp=3, dbfile=":memory:"):
"""
Create a StationList object by reading one or more ShakeMap XML or
JSON input files.
Args:
filelist (sequence of str):
Sequence of ShakeMap XML and/or JSON input files to read.
min_nresp (int):
The minimum number of DYFI observations required to form and valid
observation. Default is 3.
dbfile (str):
Path to a file into which to write the SQLite database.
The default is ':memory:' for an in-memory database.
Returns:
:class:`StationList` object
"""
# Create the database and tables
db = sqlite3.connect(dbfile, timeout=15)
self = cls(db)
self._createTables()
self.addData(filelist, min_nresp)
return self
[docs] def addData(self, filelist, min_nresp):
"""
Add data from XML or JSON files to the existing StationList.
Args:
filelist:
A list of ShakeMap XML or JSON input files.
min_nresp (int):
The minimum number of DYFI observations required to form and valid
observation.
Returns:
nothing: Nothing.
"""
jsonfiles = [x for x in filelist if x.endswith(".json")]
xmlfiles = [x for x in filelist if x.endswith(".xml")]
if len(jsonfiles):
self._loadFromJSON(jsonfiles, min_nresp)
if len(xmlfiles):
self._loadFromXML(xmlfiles, min_nresp)
return self
def _loadFromXML(self, xmlfiles, min_nresp):
"""
Create a StationList object by reading one or more ShakeMap XML input
files.
Args:
xmlfiles (sequence of str):
Sequence of ShakeMap XML input files to read.
min_nresp (int):
The minimum number of DYFI responses for an observation to be
included in the station output.
Returns:
nothing: Nothing.
"""
# Parse the xml into a dictionary
stationdict = {}
imtset = set()
for xmlfile in xmlfiles:
stationdict, ims = self._filter_station(xmlfile, stationdict, min_nresp)
imtset |= ims
# fill the database and create the object from it
self._loadFromDict(stationdict, imtset)
self._fixOrientations()
return
def _loadFromJSON(self, jsonfiles, min_nresp):
"""
Create a StationList object by reading one or more ShakeMap JSON
data files.
Args:
jsonfiles (sequence of str):
Sequence of ShakeMap JSON data files to read.
min_nresp (int):
The minimum number of DYFI responses for an observation to be
included in the station output.
Returns:
nothing: Nothing.
"""
#
# Get the station codes for all the stations in the db
#
query = "SELECT id FROM station"
self.cursor.execute(query)
sta_set = set([z[0] for z in self.cursor.fetchall()])
orig_imt_set = self.getIMTtypes()
imt_set = orig_imt_set.copy()
amp_set = set()
station_rows = []
amp_rows = []
for jfile in jsonfiles:
jfp = open(jfile, "r")
stas = json.load(jfp)
jfp.close()
if "type" not in stas:
logging.warn(f"{jfile} appears to contain no stations, skipping")
continue
if stas["type"] != "FeatureCollection":
logging.warn(f"{jfile} is not a ShakeMap JSON stationlist, skipping")
continue
# if present, insert reference stuff into the database
if "references" in stas:
for shortref, refdict in stas["references"].items():
longref = refdict["long_reference"]
description = refdict["description"]
query = (
f"INSERT INTO reference "
"(shortref, longref, description) VALUES "
f'("{shortref}","{longref}","{description}")'
)
self.cursor.execute(query)
self.db.commit()
for feature in stas["features"]:
sta_id = feature["id"]
if sta_id in sta_set:
continue
else:
sta_set.add(sta_id)
lon = feature["geometry"]["coordinates"][0]
lat = feature["geometry"]["coordinates"][1]
netid = feature["properties"]["network"]
code = sta_id.replace(netid + ".", "")
network = feature["properties"]["network"]
name = feature["properties"].get("name", None)
elev = feature["properties"].get("elev", None)
vs30 = feature["properties"].get("vs30", None)
source = feature["properties"].get("source", None)
refid = feature["properties"].get("refid", None)
stddev = 0
# is this an intensity observation or an instrument?
instrumented = int(netid.lower() not in CIIM_TUPLE)
station_rows.append(
(
sta_id,
network,
code,
name,
lat,
lon,
elev,
vs30,
stddev,
instrumented,
source,
refid,
)
)
if not instrumented:
try:
amplitude = float(
feature["properties"].get("intensity", np.nan)
)
except (ValueError, TypeError):
amplitude = np.nan
try:
stddev = float(
feature["properties"].get("intensity_stddev", np.nan)
)
except (ValueError, TypeError):
stddev = np.nan
try:
nresp = int(feature["properties"].get("nresp", -1))
except (ValueError, TypeError):
nresp = -1
if nresp >= 0 and nresp < min_nresp:
continue
flag = feature["properties"]["intensity_flag"]
if not flag or flag == "":
flag = "0"
amp_rows.append(
[sta_id, "MMI", "mmi", "h", amplitude, stddev, flag, nresp]
)
imt_set.add("MMI")
continue
#
# Collect the channel names for this station to see if we
# can resolve the orientation of any of the channels ending
# with "1" or "2".
#
chan_names = []
for comp in feature["properties"]["channels"]:
chan_names.append(comp["name"])
# Some legacy data stupidly names the channel the same as the
# station name. If there is only one channel, and its name
# is the station name, we assume it's horizontal and move on.
if len(chan_names) == 1 and chan_names[0] == name:
orients = ["H"]
else:
orients = _getOrientationSet(chan_names)
#
# Now insert the amps into the database
#
for ic, comp in enumerate(feature["properties"]["channels"]):
original_channel = comp["name"]
orientation = orients[ic]
for amp in comp["amplitudes"]:
imt_type = amp["name"].upper()
imt_set.add(imt_type)
amp_id = sta_id + "." + imt_type + "." + original_channel
if amp_id in amp_set:
continue
amp_set.add(amp_id)
amplitude = amp["value"]
if "ln_sigma" in amp:
stddev = amp["ln_sigma"]
elif "sigma" in amp:
stddev = amp["sigma"]
else:
stddev = 0
flag = amp["flag"]
units = amp["units"]
if (
amplitude == "null"
or np.isnan(float(amplitude))
or amplitude <= 0
):
amplitude = "NULL"
flag = "G"
elif imt_type == "MMI":
pass
elif imt_type == "PGV":
if units == "cm/s":
amplitude = np.log(amplitude)
elif units == "ln(cm/s)":
pass
else:
raise ValueError(f"Unknown units {units} in input")
else:
if units == "%g":
amplitude = np.log(amplitude / 100.0)
elif units == "ln(g)":
pass
else:
raise ValueError(f"Unknown units {units} in input")
amp_rows.append(
[
sta_id,
imt_type,
original_channel,
orientation,
amplitude,
stddev,
flag,
-1,
]
)
new_imts = imt_set - orig_imt_set
if any(new_imts):
self.cursor.executemany(
"INSERT INTO imt (imt_type) VALUES (?)", zip(new_imts)
)
self.db.commit()
query = "SELECT imt_type, id FROM imt"
self.cursor.execute(query)
imt_hash = dict(self.cursor.fetchall())
for row in amp_rows:
row[1] = imt_hash[row[1]]
self.cursor.executemany(
"INSERT INTO amp (station_id, imt_id, original_channel, "
"orientation, amp, stddev, flag, nresp) VALUES "
"(?, ?, ?, ?, ?, ?, ?, ?)",
amp_rows,
)
self.db.commit()
self.cursor.executemany(
"INSERT INTO station (id, network, code, name, lat, lon, "
"elev, vs30, stddev, instrumented, source, refid) VALUES "
"(?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)",
station_rows,
)
self.db.commit()
return
[docs] def getGeoJson(self):
jdict = {"type": "FeatureCollection", "features": []}
query = "SELECT id, shortref, longref, description FROM reference"
self.cursor.execute(query)
rows = self.cursor.fetchall()
if len(rows):
refdict = {}
for row in rows:
refid = row[0]
shortref = row[1]
longref = row[2]
description = row[3]
refdict[refid] = {
"refid": refid,
"long_reference": longref,
"short_reference": shortref,
"description": description,
}
jdict["references"] = refdict
self.cursor.execute(
"SELECT id, network, code, name, lat, lon, elev, vs30, "
"instrumented, source, refid from station"
)
sta_rows = self.cursor.fetchall()
for sta in sta_rows:
feature = {
"type": "Feature",
"id": sta[0],
"properties": {
"code": str(sta[2]),
"name": sta[3],
"refid": sta[10],
"instrumentType": "UNK" if sta[8] == 1 else "OBSERVED",
"source": sta[9],
"network": sta[1],
"commType": "UNK",
"location": "",
"intensity": None,
"intensity_flag": "",
"intensity_stddev": None,
"pga": None,
"pgv": None,
"distance": None,
"elev": sta[6],
"vs30": sta[7],
"channels": [],
},
"geometry": {"type": "Point", "coordinates": [sta[5], sta[4]]},
}
self.cursor.execute(
"SELECT a.amp, i.imt_type, a.original_channel, "
"a.flag, a.stddev, a.orientation, a.nresp "
"FROM amp a, imt i "
'WHERE a.station_id = "%s" '
"AND a.imt_id = i.id" % (str(sta[0]))
)
amp_rows = self.cursor.fetchall()
if sta[8] == 0:
if len(amp_rows) != 1:
logging.warn("Couldn't find intensity for MMI station.")
continue
feature["properties"]["intensity"] = amp_rows[0][0]
feature["properties"]["intensity_stddev"] = amp_rows[0][4]
feature["properties"]["intensity_flag"] = amp_rows[0][3]
feature["properties"]["nresp"] = amp_rows[0][6]
feature["properties"]["channels"] = []
jdict["features"].append(feature)
continue
channels = {}
for amp in amp_rows:
sd_string = "ln_sigma"
if amp[2] not in channels:
channels[amp[2]] = {"name": amp[2], "amplitudes": []}
if amp[0] == "NULL":
value = "null"
sigma = "null"
else:
value = amp[0]
sigma = float(f"{amp[4]:.4f}")
if amp[1] == "PGV":
if value != "null":
value = float(f"{np.exp(value):.4f}")
units = "cm/s"
elif amp[1] == "MMI":
if value != "null":
value = float(f"{value:.1f}")
units = "intensity"
sd_string = "sigma"
else:
if value != "null":
value = float(f"{np.exp(value) * 100:.4f}")
units = "%g"
aflag = str(amp[3])
if "I" in aflag and "IncompleteRecord" not in aflag:
aflag = aflag.replace("I", "IncompleteRecord")
if "G" in aflag and "Glitch" not in aflag:
aflag = aflag.replace("G", "Glitch")
if "T" in aflag and "Outlier" not in aflag:
aflag = aflag.replace("T", "Outlier")
if "M" in aflag and "ManuallyFlagged" not in aflag:
aflag = aflag.replace("M", "ManuallyFlagged")
if amp[5] == "U":
if aflag == "0":
aflag = "UnknownOrientation"
else:
aflag = str(amp[3]) + ",UnknownOrientation"
this_amp = {
"name": amp[1].lower(),
"value": value,
"units": units,
"flag": aflag,
sd_string: sigma,
}
channels[amp[2]]["amplitudes"].append(this_amp)
for channel in channels.values():
feature["properties"]["channels"].append(channel)
jdict["features"].append(feature)
return jdict
def _loadFromDict(self, stationdict, imtset):
"""
Internal method to turn the station dictionary created from the
ShakeMap XML input files into a SQLite database.
Args:
stationdictlist (list of stationdicts):
A list of station dictionaries returned by _filter_station().
dbfile (string):
The path to which the SQLite database will be written.
Returns:
:class:`StationList` object
"""
#
# Get the current IMTs and their IDs and add any new ones
#
imts_in_db = self.getIMTtypes()
new_imts = imtset - imts_in_db
if any(new_imts):
self.cursor.executemany(
"INSERT INTO imt (imt_type) VALUES (?)", zip(new_imts)
)
self.db.commit()
# Now get the updated list of IMTs and their IDs
query = "SELECT imt_type, id FROM imt"
self.cursor.execute(query)
imt_hash = dict(self.cursor.fetchall())
#
# Get the station codes for all the stations in the db
#
query = "SELECT id FROM station"
self.cursor.execute(query)
sta_set = set([z[0] for z in self.cursor.fetchall()])
#
# Insert any new stations into the station table
#
station_rows = []
for sta_id, station_tpl in stationdict.items():
if sta_id in sta_set:
continue
else:
sta_set.add(sta_id)
station_attributes, comp_dict = station_tpl
lat = station_attributes["lat"]
lon = station_attributes["lon"]
code = station_attributes["code"]
# the attributes dictionary may not have the same
# netid that we created. Use instead the first part of
# the station id
netid = sta_id[0 : sta_id.find(".")]
network = netid
name = station_attributes.get("name", None)
elev = station_attributes.get("elev", None)
vs30 = station_attributes.get("vs30", None)
stddev = station_attributes.get("stddev", 0)
source = station_attributes.get("source", None)
refid = station_attributes.get("refid", None)
instrumented = int(network.lower() not in CIIM_TUPLE)
station_rows.append(
(
sta_id,
network,
code,
name,
lat,
lon,
elev,
vs30,
stddev,
instrumented,
source,
refid,
)
)
self.cursor.executemany(
"INSERT INTO station (id, network, code, name, lat, lon, "
"elev, vs30, stddev, instrumented, source, refid) VALUES "
"(?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)",
station_rows,
)
self.db.commit()
#
# Now add the amps, first get the current set so we don't add
# any duplicates; a unique amp will be (station_id, imt_id,
# original_channel)
#
query = "SELECT station_id, imt_id, original_channel FROM amp"
self.cursor.execute(query)
amp_rows = self.cursor.fetchall()
# Create a unique identifier for each amp so we don't repeat any
amp_set = set([str(v[0]) + "." + str(v[1]) + "." + str(v[2]) for v in amp_rows])
#
# Insert the amps for each component
#
amp_rows = []
for sta_id, station_tpl in stationdict.items():
station_attributes, comp_dict = station_tpl
instrumented = int(station_attributes["netid"].lower() not in CIIM_TUPLE)
for original_channel, cdict in comp_dict.items():
pgm_dict = cdict["amps"]
if len(comp_dict) == 1 and original_channel == station_attributes.get(
"name", ""
):
orientation = "H"
else:
orientation = cdict["attrs"].get("orientation", None)
orientation = _getOrientation(original_channel, orientation)
for imt_type, imt_dict in pgm_dict.items():
if (instrumented == 0) and (imt_type != "MMI"):
continue
imtid = imt_hash[imt_type]
amp_id = (
str(sta_id) + "." + str(imtid) + "." + str(original_channel)
)
if amp_id in amp_set:
continue
else:
amp_set.add(amp_id)
amp = imt_dict["value"]
units = imt_dict["units"]
stddev = imt_dict["stddev"]
flag = imt_dict["flag"]
nresp = imt_dict.get("nresp", -1)
if np.isnan(amp):
amp = "NULL"
flag = "G"
elif imt_type == "MMI":
if amp <= 0:
amp = "NULL"
flag = "G"
else:
pass
elif imt_type == "PGV":
if units == "cm/s":
if amp <= 0:
amp = "NULL"
flag = "G"
else:
amp = np.log(amp)
elif units == "ln(cm/s)":
pass
else:
raise ValueError(f"Unknown units {units} in input")
else:
if units == "%g":
if amp <= 0:
amp = "NULL"
flag = "G"
else:
amp = np.log(amp / 100.0)
elif units == "ln(g)":
pass
else:
raise ValueError(f"Unknown units {units} in input")
amp_rows.append(
(
sta_id,
imtid,
original_channel,
orientation,
amp,
stddev,
flag,
nresp,
)
)
self.cursor.executemany(
"INSERT INTO amp (station_id, imt_id, original_channel, "
"orientation, amp, stddev, flag, nresp) VALUES "
"(?, ?, ?, ?, ?, ?, ?, ?)",
amp_rows,
)
self.db.commit()
return
[docs] def getIMTtypes(self):
"""
Return a set of IMT types found in the database
Args:
None
Returns:
A set of IMT types
"""
self.cursor.execute("SELECT imt_type FROM imt")
return set([z[0] for z in self.cursor.fetchall()])
[docs] def getStationDictionary(self, instrumented=True, min_nresp=1):
"""
Return a dictionary of the instrumented or non-instrumented
stations. The keys describe the parameter, the values are Numpy
arrays of the parameter in question.
For the standard set of ShakeMap IMTs (mmi, pga, pgv, psa03, psa10,
psa30), the keys in the dictionary would be:
'id', 'network', 'code', 'name', 'lat', 'lon', 'elev', 'vs30',
'stddev', 'instrumented', 'PGA', 'PGA_sd', 'PGV', 'PGA_sd',
'SA(0.3)', 'SA(0.3)_sd, 'SA(1.0)', 'SA(1.0)_sd', 'SA(3.0)',
'SA(3.0)_sd'
For the non-instrumented dictionary, the keys would be:
'id', 'network', 'code', 'name', 'lat', 'lon', 'elev', 'vs30',
'stddev', 'instrumented', 'MMI', 'MMI_sd', 'nresp'
The **id** column is **network** and **code** concatenated with a
period (".") between them.
All ground motion units are natural log units. Distances are in km.
Args:
instrumented (bool):
Set to True if the dictionary is to contain the instrumented
stations, or to False if the dictionary is to contain the
non-instrumented (MMI) stations.
min_nresp (int):
The minimum number of DYFI responses required to make a valid
observation.
Returns:
dict, set: A dictionary of Numpy arrays, and a set specifying
the IMTs found in the dictionary.
Raises:
TypeError: if "instrumented" argument is not type bool.
"""
if not isinstance(instrumented, bool):
raise TypeError(
"getStationDictionary: the instrumented argument "
"must be of type bool"
)
columns = list(TABLES["station"].keys())
dstr = ", ".join(columns)
self.cursor.execute(
"SELECT %s FROM station where instrumented = %d" % (dstr, instrumented)
)
station_rows = self.cursor.fetchall()
nstation_rows = len(station_rows)
if not nstation_rows:
return None, set()
station_columns = list(zip(*station_rows))
df = OrderedDict()
for ic, col in enumerate(columns):
df[col] = np.array(station_columns[ic])
myimts = set()
for imt in self.getIMTtypes():
if (instrumented and "MMI" in imt) or (
not instrumented and "MMI" not in imt
):
continue
df[imt] = np.full(nstation_rows, np.nan)
df[imt + "_sd"] = np.full(nstation_rows, 0.0)
if instrumented is False:
df[imt + "_nresp"] = np.full(nstation_rows, -1, dtype=np.int32)
myimts.update([imt])
id_dict = dict(zip(df["id"], range(nstation_rows)))
#
# Get all of the unflagged amps with the proper orientation
#
self.cursor.execute(
"SELECT a.amp, i.imt_type, a.station_id, a.stddev, a.nresp FROM "
'amp a, station s, imt i WHERE a.flag = "0" '
"AND s.id = a.station_id "
"AND a.imt_id = i.id "
'AND s.instrumented = ? AND a.orientation NOT IN ("Z", "U") '
"AND a.amp IS NOT NULL "
"AND (a.nresp < 0 OR a.nresp >= ?)",
(
instrumented,
min_nresp,
),
)
amp_rows = self.cursor.fetchall()
#
# Go through all the amps and put them into the data frame
#
for this_row in amp_rows:
#
# Set the cell to the peak amp
#
rowidx = id_dict[this_row[2]]
cval = df[this_row[1]][rowidx]
amp = this_row[0]
stddev = this_row[3]
nresp = this_row[4]
if np.isnan(cval) or (cval < amp):
df[this_row[1]][rowidx] = amp
df[this_row[1] + "_sd"][rowidx] = stddev
if instrumented is False:
df[this_row[1] + "_nresp"][rowidx] = nresp
return df, myimts
def _fixOrientations(self):
"""
Look for stations with channels that have orientations "1", "2",
and "Z", and set the "1" and "2" channels to have horizontal
orientation.
"""
sql = (
"SELECT DISTINCT station_id, original_channel "
"FROM amp "
"WHERE orientation IN ('U', 'Z') "
"ORDER BY station_id"
)
self.cursor.execute(sql)
sta_rows = self.cursor.fetchall()
current_station_id = ""
channel_list = []
for row in sta_rows:
station_id, channel = row
if current_station_id == "":
current_station_id = station_id
channel_list = [channel]
elif station_id == current_station_id:
channel_list.append(channel)
if len(channel_list) == 3:
ochars = [x[-1] for x in channel_list]
if "1" in ochars and "2" in ochars and "Z" in ochars:
hinds = [i for i, x in enumerate(ochars) if x == "1" or x == "2"]
sql = (
"UPDATE amp "
'SET orientation = "H"'
"WHERE station_id = ? "
"AND original_channel IN (?, ?)"
)
self.cursor.execute(
sql,
(
current_station_id,
channel_list[hinds[0]],
channel_list[hinds[1]],
),
)
self.db.commit()
# 3 channels, but not 1, 2, Z: bummer
channel_list = []
# Channels weren't 3 so we can't fix; bummer
if current_station_id != station_id:
current_station_id = station_id
channel_list = [channel]
return
@staticmethod
def _getGroundMotions(comp, imt_translate):
"""
Get a dictionary of peak ground motions (values and flags).
Output keys are one of: [pga,pgv,psa03,psa10,psa30]
Even if flags are not specified in the input, they will
be guaranteed to at least have a flag of '0'.
"""
pgmdict = {}
imtset = set()
for pgm in comp:
if pgm.tag == "#text":
continue
key = pgm.tag
if key not in imt_translate:
if key in ("acc", "pga"):
new_key = "PGA"
elif key in ("vel", "pgv"):
new_key = "PGV"
elif "mmi" in key:
new_key = "MMI"
elif "psa" in key:
pp = get_imt_period(key)
new_key = "SA(" + str(pp) + ")"
else:
raise ValueError(f"Unknown amp type in input: {key}")
imt_translate[key] = new_key
else:
new_key = imt_translate[key]
key = new_key
if "value" in pgm.attrib:
try:
value = float(pgm.attrib["value"])
except (ValueError, TypeError):
logging.warn(
"Unknown value in XML: %s for amp: %s"
% (pgm.attrib["value"], pgm.tag)
)
continue
else:
logging.warn(f"No value for amp {pgm.tag}")
continue
if "flag" in pgm.attrib and pgm.attrib["flag"] != "":
flag = pgm.attrib["flag"]
else:
flag = "0"
if "ln_stddev" in pgm.attrib:
stddev = float(pgm.attrib["ln_stddev"])
else:
stddev = 0
if "units" in pgm.attrib:
units = pgm.attrib["units"]
else:
if key == "PGV":
units = "cm/s"
elif key == "MMI":
units = "intensity"
else:
units = "%g"
pgmdict[key] = {
"value": value,
"flag": flag,
"stddev": stddev,
"units": units,
}
imtset.add(key)
return pgmdict, imtset, imt_translate
def _filter_station(self, xmlfile, stationdict, min_nresp):
"""
Filter individual xmlfile into a stationdict data structure.
Args:
xmlfile (string):
Path to ShakeMap XML input file (or file-like object)
containing station data.
stationdict (dict):
the dictionary of stations that the stations in this file should
be added to.
min_nresp (int):
The minimum number of responses that a DYFI observation needs
to be included in the processing.
Returns:
stationdict data structure
imts: a set of IMTs that were found in the file
"""
#
# Strip off any namespace garbage that is prepended
# to the tags
#
it = ET.iterparse(xmlfile)
for _, el in it:
if "}" in el.tag:
el.tag = el.tag.split("}", 1)[1]
#
# Parse the cleaned up xml tree
#
imt_translate = {}
imtset = set()
root = it.root
for sl in root.iter("stationlist"):
if "reference" in sl.attrib:
longref = sl.attrib["reference"]
shortref = ""
description = ""
query = (
f"INSERT INTO reference "
"(shortref, longref, description) VALUES "
f'("{shortref}","{longref}","{description}")'
)
self.cursor.execute(query)
self.db.commit()
query = f"SELECT id FROM reference WHERE longref is " f'("{longref}")'
self.cursor.execute(query)
row = self.cursor.fetchall()
refid = row[0][0]
else:
refid = None
for station in sl:
if station.tag != "station":
continue
# look at the station attributes to figure out if this is a
# DYFI-type station or a station with instruments measuring
# PGA, PGV, etc.
attributes = station.attrib.copy()
if "netid" in attributes:
netid = attributes["netid"]
if not len(netid.strip()):
netid = "unknown"
else:
netid = "unknown"
attributes["netid"] = netid
attributes["refid"] = refid
instrumented = int(netid.lower() not in CIIM_TUPLE)
if "code" not in attributes:
logging.warn("Station does not have station code: skipping")
continue
code = attributes["code"]
if code.startswith(netid + "."):
sta_id = code
code = code.replace(netid + ".", "")
attributes["code"] = code
else:
sta_id = netid + "." + code
if sta_id in stationdict:
compdict = stationdict[sta_id][1]
else:
compdict = {}
for comp in station:
if "name" not in comp.attrib:
logging.warn(
f"Unnamed component for station {sta_id}; skipping"
)
continue
compname = comp.attrib["name"]
if "Intensity Questionnaire" in str(compname):
compdict["mmi"] = {"amps": {}, "attrs": {}}
continue
tpgmdict, ims, imt_translate = self._getGroundMotions(
comp, imt_translate
)
if compname in compdict:
pgmdict = compdict[compname]["amps"]
else:
pgmdict = {}
pgmdict.update(tpgmdict)
# copy the VALUES, not REFERENCES, of the component list
# into our growing dictionary
compdict[compname] = {
"attrs": copy.copy(comp.attrib),
"amps": copy.copy(pgmdict),
}
imtset |= ims
if ("intensity" in attributes) and (instrumented == 0):
if "mmi" not in compdict:
compdict["mmi"] = {"amps": {}, "attrs": {}}
if "intensity_flag" in attributes:
flag = attributes["intensity_flag"]
else:
flag = "0"
if "intensity_stddev" in attributes:
stddev = float(attributes["intensity_stddev"])
else:
stddev = 0
if "nresp" in attributes:
nresp = int(attributes["nresp"])
else:
nresp = -1
if nresp >= 0 and nresp < min_nresp:
continue
compdict["mmi"]["amps"]["MMI"] = {
"value": float(attributes["intensity"]),
"stddev": stddev,
"nresp": nresp,
"flag": flag,
"units": "intensity",
}
imtset.add("MMI")
stationdict[sta_id] = (attributes, copy.copy(compdict))
return stationdict, imtset
def _createTables(self):
"""
Build the database tables.
"""
for table in TABLES.keys():
sql = f"CREATE TABLE {table} ("
nuggets = []
for column, ctype in TABLES[table].items():
nuggets.append(f"{column} {ctype}")
sql += ",".join(nuggets) + ")"
self.cursor.execute(sql)
self.db.commit()
return
[docs]def get_imt_period(imt):
"""
Get the period from a string like psa3p0, psa3.0, or psa30 (the first
being favored). Return the floating point period.
Args:
imt (str): a string starting with "psa" and ending with something
that can reasonably be converted to a floating point number.
Returns:
float: The period of the psa input.
TODO: Could do a lot more error checking here, but I guess we're
assuming that the people who send us data aren't idiots.
"""
# Updated 'psa2p5' style
p = re.search(r"(?<=psa).*", imt)
if "p" in p.group(0):
return float(p.group(0).replace("p", "."))
# Weird, but we'll allow it psa2.5 style
if "." in p.group(0):
return float(p.group(0))
# Old school psa25 style
p = re.search(r"(?<=psa)\d+", imt)
return float(p.group(0)[:-1] + "." + p.group(0)[-1])
def _getOrientation(orig_channel, orient):
"""
Return a character representing the orientation of a channel.
Args:
orig_channel (string):
String representing the seed channel (e.g. 'HNZ'). The
final character is assumed to be the (uppercase) orientation.
orient (str or None):
Gives the orientation of the channel, overriding channel
codes that end in numbers. Must be one of 'h' (horizontal)
or 'v' (vertical), or None if the orientation has not been
explicitly specified in the "comp" element.
Returns:
Character representing the channel orientation. One of 'N',
'E', 'Z', 'H' (for horizontal), or 'U' (for unknown).
"""
if not len(orig_channel.strip()):
orientation = "U" # default when we don't know anything about channel
return orientation
if orig_channel == "mmi" or orig_channel == "DERIVED":
orientation = "H" # mmi is arbitrarily horizontal
elif orig_channel[-1] in ("N", "E", "Z"):
orientation = orig_channel[-1]
elif orig_channel == "UNK": # Channel is "UNK"; assume horizontal
orientation = "H"
elif orig_channel == "H1" or orig_channel == "H2":
orientation = "H"
elif orig_channel[-1].isdigit():
if orient == "h":
orientation = "H"
elif orient == "v":
orientation = "Z"
else:
orientation = "U"
else:
orientation = "U" # this is unknown
return orientation
def _getOrientationSet(chan_names):
"""
Return a characters representing the orientation of a set of
channels from a single station.
Args:
chan_names (list):
List of strings representing the seed channels (e.g. 'HNZ').
The final character is assumed to be the (uppercase)
orientation.
Returns:
Character representing the channel orientation. One of 'N',
'E', 'Z', 'H' (for horizontal), or 'U' (for unknown).
"""
if len(chan_names) == 3:
term_chars = [chan_names[0][-1], chan_names[1][-1], chan_names[2][-1]]
if "1" in term_chars and "Z" in term_chars:
orientations = [(lambda x: "V" if x == "Z" else "H")(x) for x in term_chars]
return orientations
orientations = []
for name in chan_names:
orientations.append(_getOrientation(name, None))
return orientations