"""Calibrate IMU measurements for temperature effects and errors
"""
import logging
import re
import numpy as np
import pandas as pd
import statsmodels.formula.api as smf
import matplotlib.pyplot as plt
import matplotlib.dates as mdates
import scipy.signal as signal
import xarray as xr
from skdiveMove.tdrsource import _load_dataset
from .imu import (IMUBase,
_ACCEL_NAME, _OMEGA_NAME, _MAGNT_NAME, _DEPTH_NAME)
_TRIAXIAL_VARS = [_ACCEL_NAME, _OMEGA_NAME, _MAGNT_NAME]
_MONOAXIAL_VARS = [_DEPTH_NAME, "light_levels"]
_AXIS_NAMES = list("xyz")
logger = logging.getLogger(__name__)
# Add the null handler if importing as library; whatever using this library
# should set up logging.basicConfig() as needed
logger.addHandler(logging.NullHandler())
[docs]
class IMUcalibrate(IMUBase):
r"""Calibration framework for IMU measurements
Measurements from most IMU sensors are influenced by temperature, among
other artifacts. The IMUcalibrate class implements the following
procedure to remove the effects of temperature from IMU signals:
- For each axis, fit a piecewise or simple linear regression of
measured (lowpass-filtered) data against temperature.
- Compute predicted signal from the model.
- Select a reference temperature :math:`T_{\alpha}` to standardize all
measurements at.
- The standardized measurement (:math:`x_\sigma`) at :math:`T_{\alpha}`
is calculated as:
.. math::
:label: 5
x_\sigma = x - (\hat{x} - \hat{x}_{\alpha})
where :math:`\hat{x}` is the value predicted from the model at the
measured temperature, and :math:`\hat{x}_{\alpha}` is the predicted
value at :math:`T_{\alpha}`.
The models fit to signals from a *motionless* (i.e. experimental) IMU
device in the first step can subsequently be used to remove or minimize
temperature effects from an IMU device measuring motions of interest,
provided the temperature is within the range observed in experiments.
In addition to attributes in :class:`IMUBase`, `IMUcalibrate` adds the
attributes listed below.
Attributes
----------
periods : list
List of slices with the beginning and ending timestamps defining
periods in `x_calib` where valid calibration data are available.
Periods are assumed to be ordered chronologically.
models_l : list
List of dictionaries as long as there are periods, with each
element corresponding to a sensor, in turn containing another
dictionary with each element corresponding to each sensor axis.
axis_order : list
List of characters specifying which axis ``x``, ``y``, or ``z`` was
pointing in the same direction as gravity in each period in
`periods`.
Examples
--------
Construct IMUcalibrate from NetCDF file with samples of IMU signals and
a list with begining and ending timestamps for experimental periods:
>>> import importlib.resources as rsrc
>>> import skdiveMove.imutools as imutools
>>> icdf = (rsrc.files("skdiveMove") / "tests" / "data" /
... "cats_temperature_calib.nc")
>>> pers = [slice("2021-09-20T09:00:00", "2021-09-21T10:33:00"),
... slice("2021-09-21T10:40:00", "2021-09-22T11:55:00"),
... slice("2021-09-22T12:14:00", "2021-09-23T11:19:00")]
>>> imucal = (imutools.IMUcalibrate
... .read_netcdf(icdf, periods=pers,
... axis_order=list("zxy"),
... time_name="timestamp_utc"))
>>> print(imucal) # doctest: +ELLIPSIS
IMU -- Class IMUcalibrate object
Source File None
IMU: <xarray.Dataset> ...
Dimensions: (timestamp_utc: 268081, axis: 3)
Coordinates:
* axis (axis) <U1 12B 'x' 'y' 'z'
* timestamp_utc (timestamp_utc) datetime64[ns] ...
Data variables:
acceleration (timestamp_utc, axis) float64 ...
angular_velocity (timestamp_utc, axis) float64 ...
magnetic_density (timestamp_utc, axis) float64 ...
depth (timestamp_utc) float64 ...
temperature (timestamp_utc) float64 ...
Attributes:...
history: Resampled from 20 Hz to 1 Hz
Periods:
0:['2021-09-20T09:00:00', '2021-09-21T10:33:00']
1:['2021-09-21T10:40:00', '2021-09-22T11:55:00']
2:['2021-09-22T12:14:00', '2021-09-23T11:19:00']
Plot signals from a given period:
>>> fig, axs, axs_temp = imucal.plot_experiment(0, var="acceleration")
Build temperature models for a given variable and chosen
:math:`T_{\alpha}`, without low-pass filtering the input signals:
>>> fs = 1.0
>>> acc_cal = imucal.build_tmodels("acceleration", T_alpha=8,
... use_axis_order=True,
... win_len=int(2 * 60 * fs) - 1)
Plot model of IMU variable against temperature:
>>> fig, axs = imucal.plot_var_model("acceleration",
... use_axis_order=True)
Notes
-----
This class redefines :meth:`IMUBase.read_netcdf`.
"""
def __init__(self, x_calib, periods, axis_order=list("xyz"),
**kwargs):
"""Set up attributes required for calibration
Parameters
----------
x_calib : xarray.Dataset
Dataset with temperature and tri-axial data from *motionless*
IMU experiments. Data are assumed to be in FLU coordinate
frame.
periods : list
List of slices with the beginning and ending timestamps
defining periods in `x_calib` where valid calibration data are
available. Periods are assumed to be ordered chronologically.
axis_order : list
List of characters specifying which axis ``x``, ``y``, or ``z``
was pointing in the same direction as gravity in each period in
`periods`.
**kwargs
Optional keyword arguments passed to the `IMUBase.__init__` for
instantiation.
"""
super(IMUcalibrate, self).__init__(x_calib, **kwargs)
self.periods = periods
models_l = []
for period in periods:
models_1d = {i: dict() for i in _MONOAXIAL_VARS}
models_2d = dict.fromkeys(_TRIAXIAL_VARS)
for k in models_2d:
models_2d[k] = dict.fromkeys(axis_order)
models_l.append(dict(**models_1d, **models_2d))
self.models_l = models_l
self.axis_order = axis_order
# Private attribute collecting DataArrays with standardized data
self._stdda_l = []
[docs]
@classmethod
def read_netcdf(cls, imu_nc, load_dataset_kwargs=dict(), **kwargs):
"""Create IMUcalibrate from NetCDF file and list of slices
This method redefines :meth:`IMUBase.read_netcdf`.
Parameters
----------
imu_nc : str
Path to NetCDF file.
load_dataset_kwargs : dict, optional
Dictionary of optional keyword arguments passed to
:func:`xarray.load_dataset`.
**kwargs
Optional keyword arguments passed to
:meth:`IMUcalibrate.__init__` method, except `has_depth` or
`imu_filename`. The input `Dataset` is assumed to have a depth
`DataArray`.
Returns
-------
out :
"""
imu = _load_dataset(imu_nc, **load_dataset_kwargs)
ocls = cls(imu, **kwargs)
return ocls
def __str__(self):
super_str = super(IMUcalibrate, self).__str__()
pers_ends = []
for per in self.periods:
pers_ends.append([per.start, per.stop])
msg = ("\n".join("{}:{}".format(i, per)
for i, per in enumerate(pers_ends)))
return super_str + "\nPeriods:\n{}".format(msg)
[docs]
def savgol_filter(self, var, period_idx, win_len, polyorder=1):
"""Apply Savitzky-Golay filter on tri-axial IMU signals
Parameters
----------
var : str
Name of the variable in `x` with tri-axial signals.
period_idx : int
Index of period to plot (zero-based).
win_len : int
Window length for the low pass filter.
polyorder : int, optional
Polynomial order to use.
Returns
-------
xarray.DataArray
Array with filtered signals, with the same coordinates,
dimensions, and updated attributes.
"""
darray = self.subset_imu(period_idx)[var]
var_df = darray.to_dataframe().unstack()
var_sg = signal.savgol_filter(var_df, window_length=win_len,
polyorder=polyorder, axis=0)
new_history = (("{}: Savitzky-Golay filter: win_len={}, "
"polyorder={}\n")
.format(pd.to_datetime("today")
.strftime("%Y-%m-%d"), win_len, polyorder))
darray_new = xr.DataArray(var_sg, coords=darray.coords,
dims=darray.dims, name=darray.name,
attrs=darray.attrs)
darray_new.attrs["history"] = (darray_new.attrs["history"] +
new_history)
return darray_new
[docs]
def build_tmodels(self, var, T_alpha=None, T_brk=None,
use_axis_order=False, filter_sig=True, **kwargs):
r"""Build temperature models for experimental tri-axial IMU sensor
signals
Perform thermal compensation on *motionless* tri-axial IMU sensor
data. A simple approach is used for the compensation:
- For each axis, build a piecewise or simple linear regression of
measured data against temperature. If a breakpoint is known,
as per manufacturer specifications or experimentation, use
piecewise regression.
- Compute predicted signal from the model.
- Select a reference temperature :math:`T_{\alpha}` to
standardize all measurements at.
- The standardized measurement at :math:`T_{\alpha}` is
calculated as :math:`x - (\hat{x} - x_{T_{\alpha}})`, where
:math:`\hat{x}` is the value predicted from the model at the
measured temperature, and :math:`x_{T_{\alpha}}` is the
predicted value at :math:`T_{\alpha}`.
Parameters
----------
var : str
Name of the variable in `x` with tri-axial data.
T_alpha : float, optional
Reference temperature at which all measurements will be
adjusted to. Defaults to the mean temperature for each period,
rounded to the nearest integer.
T_brk : float, optional
Temperature change point separating data to be fit differently.
A piecewise regression model is fit. Default is a simple
linear model is fit.
use_axis_order : bool, optional
Whether to use axis order from the instance. If True, only one
sensor axis per period is considered to have valid calibration
data for the correction. Otherwise, all three axes for each
period are used in the correction.
filter_sig : bool, optional
Whether to filter in the measured signal for thermal
correction. Default is to apply a Savitzky-Golay filter to the
signal for characterizing the temperature relationship, and to
calculate the standardized signal.
**kwargs
Optional keyword arguments passed to
:func:`~scipy.signal.savgol_filter` (e.g. `win_len` and
`polyorder`).
Returns
-------
list
List of tuples as long as there are periods, with tuple elements:
- Dictionary with regression model objects for each sensor
axis.
- DataFrame with hierarchical column index with sensor axis
label at the first level. The following columns are in the
second level:
- temperature
- var_name
- var_name_pred
- var_name_temp_refC
- var_name_adj
Notes
-----
A new DataArray with signal standardized at :math:`T_{\alpha}` is
added to the instance Dataset. These signals correspond to the
lowpass-filtered form of the input used to build the models.
See Also
--------
apply_model
"""
# Iterate through periods
per_l = [] # output list as long as periods
for idx in range(len(self.periods)):
per = self.subset_imu(idx)
# Subset the requested variable, smoothing if necessary
if filter_sig:
per_var = self.savgol_filter(var, idx, **kwargs)
else:
per_var = per[var]
per_temp = per["temperature"]
var_df = xr.merge([per_var, per_temp]).to_dataframe()
if T_alpha is None:
t_alpha = np.rint(per_temp.mean().to_numpy().item())
logger.info("Period {} T_alpha set to {:.2f}"
.format(idx, t_alpha))
else:
t_alpha = T_alpha
odata_l = []
models_d = self.models_l[idx]
if use_axis_order:
axis_names = [self.axis_order[idx]]
elif len(per_var.dims) > 1:
axis_names = per_var[per_var.dims[-1]].to_numpy()
else:
axis_names = [per_var.dims[0]]
std_colname = "{}_std".format(var)
pred_colname = "{}_pred".format(var)
for i, axis in enumerate(axis_names): # do all axes
if isinstance(var_df.index, pd.MultiIndex):
data_axis = var_df.xs(axis, level="axis").copy()
else:
data_axis = var_df.copy()
if T_brk is not None:
temp0 = (data_axis["temperature"]
.where(data_axis["temperature"] < T_brk, 0))
data_axis.loc[:, "temp0"] = temp0
temp1 = (data_axis["temperature"]
.where(data_axis["temperature"] > T_brk, 0))
data_axis.loc[:, "temp1"] = temp1
fmla = "{} ~ temperature + temp0 + temp1".format(var)
else:
fmla = "{} ~ temperature".format(var)
model_fit = smf.ols(formula=fmla, data=data_axis).fit()
models_d[var][axis] = model_fit
data_axis.loc[:, pred_colname] = model_fit.fittedvalues
# Data at reference temperature
ref_colname = "{}_{}C".format(var, t_alpha)
if T_brk is not None:
if t_alpha < T_brk:
pred = model_fit.predict(exog=dict(
temperature=t_alpha,
temp0=t_alpha, temp1=0)).to_numpy().item()
data_axis[ref_colname] = pred
else:
pred = model_fit.predict(exog=dict(
temperature=t_alpha,
temp0=0, temp1=t_alpha)).to_numpy().item()
data_axis[ref_colname] = pred
data_axis.drop(["temp0", "temp1"], axis=1, inplace=True)
else:
pred = model_fit.predict(exog=dict(
temperature=t_alpha)).to_numpy().item()
data_axis.loc[:, ref_colname] = pred
logger.info("Predicted {} ({}, rounded) at {:.2f}: {:.3f}"
.format(var, axis, t_alpha, pred))
data_axis[std_colname] = (data_axis[var] -
(data_axis[pred_colname] -
data_axis[ref_colname]))
odata_l.append(data_axis)
# Update instance models_l attribute
self.models_l[idx][var][axis] = model_fit
if var in _MONOAXIAL_VARS:
odata = pd.concat(odata_l)
std_data = xr.DataArray(odata.loc[:, std_colname],
name=std_colname)
else:
odata = pd.concat(odata_l, axis=1,
keys=axis_names[:i + 1],
names=["axis", "variable"])
std_data = xr.DataArray(odata.xs(std_colname,
axis=1, level=1),
name=std_colname)
per_l.append((models_d, odata))
std_data.attrs = per_var.attrs
new_description = ("{} standardized at {}C"
.format(std_data.attrs["description"],
t_alpha))
std_data.attrs["description"] = new_description
new_history = ("{}: temperature_model: temperature models\n"
.format(pd.to_datetime("today")
.strftime("%Y-%m-%d")))
std_data.attrs["history"] = (std_data.attrs["history"] +
new_history)
# Update instance _std_da_l attribute with DataArray having an
# additional dimension for the period index
std_data = std_data.expand_dims(period=[idx])
self._stdda_l.append(std_data)
return per_l
[docs]
def plot_experiment(self, period_idx, var, units_label=None, **kwargs):
"""Plot experimental IMU
Parameters
----------
period_idx : int
Index of period to plot (zero-based).
var : str
Name of the variable in with tri-axial data.
units_label : str, optional
Label for the units of the chosen variable. Defaults to the
"units_label" attribute available in the DataArray.
**kwargs
Optional keyword arguments passed to
:func:`~matplotlib.pyplot.subplots` (e.g. `figsize`).
Returns
-------
fig : matplotlib.Figure
axs : array_like
Array of :class:`~matplotlib.axes.Axes` instances in `fig` with
IMU signal plots.
axs_temp : array_like
Array of :class:`~matplotlib.axes.Axes` instances in `fig` with
temperature plots.
See Also
--------
plot_var_model
plot_standardized
"""
per_da = self.subset_imu(period_idx)
per_var = per_da[var]
per_temp = per_da["temperature"]
def _plot(var, temp, ax):
"""Plot variable and temperature"""
ax_temp = ax.twinx()
var.plot.line(ax=ax, label="measured", color="k",
linewidth=0.5)
temp.plot.line(ax=ax_temp, label="temperature", color="r",
linewidth=0.5, alpha=0.5)
ax.set_title("")
ax.set_xlabel("")
# Adjust ylim to exclude outliers
ax.set_ylim(var.quantile(1e-5).to_numpy().item(),
var.quantile(1 - 1e-5).to_numpy().item())
# Axis locators and formatters
dlocator = mdates.AutoDateLocator(minticks=3, maxticks=7)
dformatter = mdates.ConciseDateFormatter(dlocator)
ax.xaxis.set_major_locator(dlocator)
ax.xaxis.set_major_formatter(dformatter)
ax.xaxis.set_tick_params(rotation=0)
return ax_temp
if units_label is None:
units_label = per_var.attrs["units_label"]
ylabel_pre = "{} [{}]".format(per_var.attrs["full_name"],
units_label)
temp_label = "{} [{}]".format(per_temp.attrs["full_name"],
per_temp.attrs["units_label"])
ndims = len(per_var.dims)
axs_temp = []
if ndims == 1:
fig, axs = plt.subplots(**kwargs)
ax_temp = _plot(per_var, per_temp, axs)
axs.set_xlabel("")
axs.set_title("")
axs.set_ylabel(ylabel_pre)
ax_temp.set_ylabel(temp_label)
axs_temp.append(ax_temp)
else:
fig, axs = plt.subplots(3, 1, sharex=True, **kwargs)
ax_x, ax_y, ax_z = axs
axis_names = per_var[per_var.dims[-1]].to_numpy()
for i, axis in enumerate(axis_names):
ymeasured = per_var.sel(axis=axis)
ax_temp = _plot(ymeasured, per_temp, axs[i])
axs[i].set_title("")
axs[i].set_xlabel("")
axs[i].set_ylabel("{} {}".format(ylabel_pre, axis))
if i == 1:
ax_temp.set_ylabel(temp_label)
else:
ax_temp.set_ylabel("")
axs_temp.append(ax_temp)
ax_z.set_xlabel("")
return fig, axs, axs_temp
[docs]
def plot_var_model(self, var, use_axis_order=True, units_label=None,
axs=None, **kwargs):
"""Plot IMU variable against temperature and fitted model
A multi-panel plot of the selected variable against temperature
from all periods.
Parameters
----------
var : str
IMU variable to plot.
use_axis_order : bool
Whether to use axis order from the instance. If True, only one
sensor axis per period is considered to have valid calibration
data for the correction. Otherwise, all three axes for each
period are used in the correction. Ignored for uniaxial
variables.
units_label : str
Label for the units of the chosen variable. Defaults to the
"units_label" attribute available in the DataArray.
axs : array_like, optional
Array of Axes instances to plot in.
**kwargs
Optional keyword arguments passed to
:func:`~matplotlib.pyplot.subplots` (e.g. `figsize`).
Returns
-------
fig : matplotlib.Figure
axs : array_like
Array of :class:`~matplotlib.axes.Axes` instances in `fig` with
IMU signal plots.
See Also
--------
plot_experiment
plot_standardized
"""
def _plot_signal(x, y, idx, model_fit, ax):
ax.plot(x, y, ".", markersize=2, alpha=0.03,
label="Period {}".format(idx))
# Adjust ylim to exclude outliers
ax.set_ylim(np.quantile(y, 1e-3), np.quantile(y, 1 - 1e-3))
# Linear model
xpred = np.linspace(x.min(), x.max())
ypreds = (model_fit
.get_prediction(exog=dict(Intercept=1,
temperature=xpred))
.summary_frame())
ypred_0 = ypreds["mean"]
ypred_l = ypreds["obs_ci_lower"]
ypred_u = ypreds["obs_ci_upper"]
ax.plot(xpred, ypred_0, color="k", alpha=0.5)
ax.plot(xpred, ypred_l, color="k", linestyle="dashed",
linewidth=1, alpha=0.5)
ax.plot(xpred, ypred_u, color="k", linestyle="dashed",
linewidth=1, alpha=0.5)
per0 = self.subset_imu(0)
if units_label is None:
units_label = per0[var].attrs["units_label"]
xlabel = "{} [{}]".format(per0["temperature"].attrs["full_name"],
per0["temperature"].attrs["units_label"])
ylabel_pre = "{} [{}]".format(per0[var].attrs["full_name"],
units_label)
nperiods = len(self.periods)
if axs is not None:
fig = plt.gcf()
if var in _MONOAXIAL_VARS:
if axs is None:
fig, axs = plt.subplots(1, nperiods, **kwargs)
for per_i in range(nperiods):
peri = self.subset_imu(per_i)
per_var = peri[var]
per_temp = peri["temperature"]
xdata = per_temp.to_numpy()
ydata = per_var.to_numpy()
# Linear model
model_fit = self.get_model(var, period=per_i,
axis=per_var.dims[0])
ax_i = axs[per_i]
_plot_signal(x=xdata, y=ydata, idx=per_i,
model_fit=model_fit, ax=ax_i)
ax_i.set_xlabel(xlabel)
axs[0].set_ylabel(ylabel_pre)
elif use_axis_order:
if axs is None:
fig, axs = plt.subplots(3, 1, **kwargs)
axs[-1].set_xlabel(xlabel)
for i, axis in enumerate(_AXIS_NAMES):
idx = self.axis_order.index(axis)
peri = self.subset_imu(idx)
xdata = peri["temperature"].to_numpy()
ydata = peri[var].sel(axis=axis).to_numpy()
# Linear model
model_fit = self.get_model(var, period=idx, axis=axis)
ax_i = axs[i]
_plot_signal(xdata, y=ydata, idx=idx,
model_fit=model_fit, ax=ax_i)
ax_i.set_ylabel("{} {}".format(ylabel_pre, axis))
ax_i.legend(loc=9, bbox_to_anchor=(0.5, 1),
frameon=False, borderaxespad=0)
else:
if axs is None:
fig, axs = plt.subplots(3, nperiods, **kwargs)
for vert_i in range(nperiods):
peri = self.subset_imu(vert_i)
xdata = peri["temperature"].to_numpy()
axs_xyz = axs[:, vert_i]
for i, axis in enumerate(_AXIS_NAMES):
ydata = (peri[var].sel(axis=axis).to_numpy())
# Linear model
model_fit = self.get_model(var, period=vert_i,
axis=axis)
ax_i = axs_xyz[i]
_plot_signal(xdata, y=ydata, idx=vert_i,
model_fit=model_fit, ax=ax_i)
ax_i.set_ylabel("{} {}".format(ylabel_pre, axis))
axs_xyz[0].set_title("Period {}".format(vert_i))
axs_xyz[-1].set_xlabel(xlabel)
return fig, axs
[docs]
def plot_standardized(self, var, use_axis_order=True, units_label=None,
ref_val=None, axs=None, **kwargs):
r"""Plot IMU measured and standardized variable along with temperature
A multi-panel time series plot of the selected variable, measured
and standardized, for all periods.
Parameters
----------
var : str
IMU variable to plot.
use_axis_order : bool, optional
Whether to use axis order from the instance. If True, only one
sensor axis per period is considered to have valid calibration
data for the correction. Otherwise, all three axes for each
period are used in the correction.
units_label : str, optional
Label for the units of the chosen variable. Defaults to the
"units_label" attribute available in the DataArray.
ref_val : float
Reference value for the chosen variable (e.g. gravity, for
acceleration). If provided, a horizontal line is included in
the plot for reference.
axs : array_like, optional
Array of Axes instances to plot in.
**kwargs
Optional keyword arguments passed to
:func:`~matplotlib.pyplot.subplots` (e.g. `figsize`).
Returns
-------
fig : matplotlib.Figure
axs : array_like
Array of :class:`~matplotlib.axes.Axes` instances in `fig` with
IMU signal plots.
axs_temp : array_like
Array of :class:`~matplotlib.axes.Axes` instances in `fig` with
temperature plots.
See Also
--------
plot_experiment
plot_var_model
"""
def _plot_signal(ymeasured, ystd, temp, ax, neg_ref=False):
ax_temp = ax.twinx()
(ymeasured.plot.line(ax=ax, label="measured", color="k",
linewidth=0.5))
(ystd.plot.line(ax=ax, label="standardized", color="b",
linewidth=0.5, alpha=0.5))
temp.plot.line(ax=ax_temp, label="temperature", color="r",
linewidth=0.5, alpha=0.5)
txt_desc = ystd.attrs["description"]
t_alpha_match = re.search(r'[-+]?\d+\.\d+', txt_desc)
ax_temp.axhline(float(txt_desc[t_alpha_match.start():
t_alpha_match.end()]),
linestyle="dashed", linewidth=1,
color="r", label=r"$T_\alpha$")
q0 = ymeasured.quantile(1e-5).to_numpy().item()
q1 = ymeasured.quantile(1 - 11e-5).to_numpy().item()
if ref_val is not None:
# Assumption of FLU with axes pointing against field
if neg_ref:
ref_i = -ref_val
else:
ref_i = ref_val
ax.axhline(ref_i, linestyle="dashdot", color="m",
linewidth=1, label="reference")
ylim0 = np.minimum(q0, ref_i)
ylim1 = np.maximum(q1, ref_i)
else:
ylim0 = q0
ylim1 = q1
ax.set_title("")
ax.set_xlabel("")
# Adjust ylim to exclude outliers
ax.set_ylim(ylim0, ylim1)
# Axis locators and formatters
dlocator = mdates.AutoDateLocator(minticks=3, maxticks=7)
dformatter = mdates.ConciseDateFormatter(dlocator)
ax.xaxis.set_major_locator(dlocator)
ax.xaxis.set_major_formatter(dformatter)
ax.xaxis.set_tick_params(rotation=0)
return ax_temp
per0 = self.subset_imu(0)
if units_label is None:
units_label = per0[var].attrs["units_label"]
ylabel_pre = "{} [{}]".format(per0[var].attrs["full_name"],
units_label)
var_std = var + "_std"
nperiods = len(self.periods)
if axs is not None:
fig = plt.gcf()
std_ds = xr.merge(self._stdda_l)
if var in _MONOAXIAL_VARS:
if axs is None:
fig, axs = plt.subplots(1, nperiods, **kwargs)
axs_temp = np.empty_like(axs)
for per_i in range(nperiods):
peri = self.subset_imu(per_i)
per_var = peri[var]
per_std = std_ds.loc[dict(period=per_i)][var_std]
per_temp = peri["temperature"]
ax_i = axs[per_i]
ax_temp = _plot_signal(per_var, ystd=per_std,
temp=per_temp, ax=ax_i)
ax_i.set_ylabel(ylabel_pre)
axs_temp[per_i] = ax_temp
# legend at center top panel
axs[1].legend(loc=9, bbox_to_anchor=(0.5, 1.15), ncol=3,
frameon=False, borderaxespad=0)
# Temperature legend at the bottom
axs_temp[1].legend(loc=9, bbox_to_anchor=(0.5, -0.23), ncol=2,
frameon=False, borderaxespad=0)
elif use_axis_order:
if axs is None:
fig, axs = plt.subplots(3, 1, sharex=False, **kwargs)
axs_temp = np.empty_like(axs)
for i, axis in enumerate(_AXIS_NAMES):
idx = self.axis_order.index(axis)
peri = self.subset_imu(idx)
per_var = peri[var].sel(axis=axis, drop=True)
per_std = (std_ds.loc[dict(period=idx)][var_std]
.sel(axis=axis, drop=True))
per_temp = peri["temperature"]
ax_i = axs[i]
if axis == "x":
neg_ref = True
else:
neg_ref = False
ax_temp = _plot_signal(per_var, ystd=per_std,
temp=per_temp, ax=ax_i,
neg_ref=neg_ref)
ax_i.set_xlabel("Period {}".format(idx))
ax_i.set_ylabel("{} {}".format(ylabel_pre, axis))
axs_temp[i] = ax_temp
# legend at top panel
axs[0].legend(loc=9, bbox_to_anchor=(0.5, 1.15), ncol=3,
frameon=False, borderaxespad=0)
# Temperature legend at the bottom
axs_temp[i].legend(loc=9, bbox_to_anchor=(0.5, -0.23), ncol=2,
frameon=False, borderaxespad=0)
else:
if axs is None:
fig, axs = plt.subplots(3, nperiods, **kwargs)
axs_temp = np.empty_like(axs)
for vert_i in range(nperiods):
axs_xyz = axs[:, vert_i]
for i, axis in enumerate(_AXIS_NAMES):
peri = self.subset_imu(vert_i)
per_var = peri[var].sel(axis=axis, drop=True)
per_std = (std_ds.loc[dict(period=vert_i)][var_std]
.sel(axis=axis, drop=True))
per_temp = peri["temperature"]
ax_i = axs_xyz[i]
ax_temp = _plot_signal(per_var, ystd=per_std,
temp=per_temp, ax=ax_i)
axs_temp[i, vert_i] = ax_temp
if vert_i == 0:
ax_i.set_ylabel("{} {}".format(ylabel_pre, axis))
else:
ax_i.set_ylabel("")
axs_xyz[0].set_title("Period {}".format(vert_i))
# legend at bottom panel
leg0 = axs[-1, 1].legend(loc=9, bbox_to_anchor=(0.5, -0.23),
ncol=3, frameon=False, borderaxespad=0)
# Temperature legend at bottom panel
leg1 = axs_temp[-1, 1].legend(loc=9, bbox_to_anchor=(0.5, -0.37),
ncol=2, frameon=False,
borderaxespad=0)
axs[-1, 1].add_artist(leg0)
axs_temp[-1, 1].add_artist(leg1)
return fig, axs, axs_temp
[docs]
def get_model(self, var, period, axis=None):
"""Retrieve linear model for a given IMU sensor axis signal
Parameters
----------
var : str
Name of the variable to calculate offset for.
period: int
Period containing calibration model to use.
axis : str, optional
Name of the sensor axis the signal comes from, if `var` is
tri-axial; ignored otherwise.
Returns
-------
RegressionResultsWrapper
"""
if var in _MONOAXIAL_VARS:
model_d = self.models_l[period][var]
model_fit = [*model_d.values()][0]
else:
model_fit = self.models_l[period][var][axis]
return model_fit
[docs]
def get_offset(self, var, period, T_alpha, ref_val, axis=None):
"""Calculate signal ofset at given temperature from calibration model
Parameters
----------
var : str
Name of the variable to calculate offset for.
period: int
Period (zero-based) containing calibration model to use.
T_alpha : float
Temperature at which to compute offset.
ref_val : float
Reference value for the chosen variable (e.g. gravity, for
acceleration).
axis : str, optional
Name of the sensor axis the signal comes from, if `var` is
tri-axial; ignored otherwise.
Returns
-------
float
Notes
-----
For obtaining offset and gain of magnetometer signals, the
ellipsoid method from the the ``ellipsoid`` module yields far more
accurate results, as it allows for the simultaneous
three-dimensional estimation of the offset.
"""
if var in _MONOAXIAL_VARS:
model_fit = self.get_model(var, period=period)
else:
model_fit = self.get_model(var, period=period, axis=axis)
ypred = (model_fit.predict(exog=dict(temperature=T_alpha))
.to_numpy().item())
logger.info("Predicted {} ({}, rounded) at T_alpha: {:.3f}"
.format(var, axis, ypred))
offset = ypred - ref_val
return offset
[docs]
def apply_model(self, var, dataset, T_alpha=None, ref_vals=None,
use_axis_order=True, model_idx=None):
r"""Apply fitted temperature compensation model to Dataset
The selected models for tri-axial sensor data are applied to input
Dataset, standardizing signals at :math:`T_{\alpha}`, optionally
subtracting the offset at :math:`T_{\alpha}`.
Parameters
----------
var : str
Name of the variable with tri-axial data.
dataset : xarray.Dataset
Dataset with temperature and tri-axial data from motionless IMU.
T_alpha : float, optional
Reference temperature at which all measurements will be
adjusted to. Default is the mean temperature in the input
dataset.
ref_vals : list, optional
Sequence of three floats with target values to compare against
the signal from each sensor axis. If provided, the offset of
each signal at :math:`T_{\alpha}` is computed and subtracted from
the temperature-standardized signal. The order should be the
same as in the `axis_order` attribute if `use_axis_order` is
True, or ``x``, ``y``, ``z`` otherwise.
use_axis_order : bool, optional
Whether to use axis order from the instance. If True, retrieve
model to apply using instance's `axis_order` attribute.
Otherwise, use the models defined by `model_idx` argument.
Ignored if `var` is monoaxial.
model_idx : list or int, optional
Sequence of three integers identifying the period (zero-based)
from which to retrieve the models for ``x``, ``y``, and ``z``
sensor axes, in that order. If `var` is monoaxial, an integer
specifying the period for the model to use. Ignored if
`use_axis_order` is True.
Returns
-------
xarray.Dataset
"""
temp_obs = dataset["temperature"]
darray = dataset[var]
if T_alpha is None:
T_alpha = temp_obs.mean().item()
logger.info("T_alpha set to {:.2f}".format(T_alpha))
def _standardize_array(darray, model_fit, period_idx, axis=None):
x_hat = (model_fit
.get_prediction(exog=dict(Intercept=1,
temperature=temp_obs))
.predicted_mean)
x_alpha = (model_fit
.get_prediction(exog=dict(Intercept=1,
temperature=T_alpha))
.predicted_mean)
x_sigma = darray - (x_hat - x_alpha)
if ref_vals is not None:
off = self.get_offset(var, axis=axis, period=period_idx,
T_alpha=T_alpha,
ref_val=ref_vals[period_idx])
x_sigma -= off
return x_sigma
darray_l = []
if var in _MONOAXIAL_VARS:
model_fit = self.get_model(var, period=model_idx)
x_sigma = _standardize_array(darray, model_fit=model_fit,
period_idx=model_idx)
darray_l.append(x_sigma)
elif use_axis_order:
for i, axis in enumerate(_AXIS_NAMES):
idx = self.axis_order.index(axis)
model_fit = self.get_model(var, period=idx, axis=axis)
x_i = darray.sel(axis=axis)
x_sigma = _standardize_array(x_i, model_fit=model_fit,
period_idx=idx, axis=axis)
darray_l.append(x_sigma)
else:
for i, axis in enumerate(_AXIS_NAMES):
model_fit = self.get_model(var, period=model_idx[i],
axis=axis)
x_i = darray.sel(axis=axis)
x_sigma = _standardize_array(x_i, model_fit=model_fit,
period_idx=model_idx[i],
axis=axis)
darray_l.append(x_sigma)
if len(darray_l) > 1:
darray_new = xr.concat(darray_l, dim="axis").transpose()
else:
darray_new = darray_l[0]
darray_new.attrs = darray.attrs
new_history = ("{}: Applied temperature model at: T={}\n"
.format(pd.to_datetime("today")
.strftime("%Y-%m-%d"), T_alpha))
darray_new.attrs["history"] = (darray_new.attrs["history"] +
new_history)
return darray_new
[docs]
def subset_imu(self, period_idx):
"""Subset IMU dataset given a period index
The dataset is subset using the slice corresponding to the period
index.
Parameters
----------
period_idx : int
Index of the experiment period to subset.
Returns
-------
xarray.Dataset
"""
time_name = self.time_name
return self.imu.loc[{time_name: self.periods[period_idx]}]