"""Representation of Inertial Measurement Unit data
"""
import numpy as np
import pandas as pd
import allantools as allan
import ahrs.filters as filters
from scipy import constants, signal, integrate
from sklearn import preprocessing
from skdiveMove.tdrsource import _load_dataset
from .allan import allan_coefs
from .vector import rotate_vector
_TIME_NAME = "timestamp"
_DEPTH_NAME = "depth"
_ACCEL_NAME = "acceleration"
_OMEGA_NAME = "angular_velocity"
_MAGNT_NAME = "magnetic_density"
[docs]
class IMUBase:
"""Define IMU data source
Use :class:`xarray.Dataset` to ensure pseudo-standard metadata.
Attributes
----------
imu_file : str
String indicating the file where the data comes from.
imu : xarray.Dataset
Dataset with input data.
imu_var_names : list
Names of the data variables with accelerometer, angular velocity,
and magnetic density measurements.
has_depth : bool
Whether input data include depth measurements.
depth_name : str
Name of the data variable with depth measurements.
time_name : str
Name of the time dimension in the dataset.
quats : numpy.ndarray
Array of quaternions representing the orientation relative to the
frame of the IMU object data. Note that the scalar component is
last, following `scipy`'s convention.
Examples
--------
This example illustrates some of the issues encountered while reading
data files in a real-world scenario. `scikit-diveMove` includes a
NetCDF file with IMU signals collected using a Samsung Galaxy S5 mobile
phone. Set up instance from NetCDF example data:
>>> import importlib.resources as rsrc
>>> import xarray as xr
>>> import skdiveMove.imutools as imutools
>>> icdf = (rsrc.files("skdiveMove") / "tests" / "data" /
... "samsung_galaxy_s5.nc")
The angular velocity and magnetic density arrays have two sets of
measurements: output and measured, which, along with the sensor axis
designation, constitutes a multi-index. These multi-indices can be
rebuilt prior to instantiating IMUBase, as they provide significant
advantages for indexing later:
>>> s5ds = (xr.load_dataset(icdf)
... .set_index(gyroscope=["gyroscope_type", "gyroscope_axis"],
... magnetometer=["magnetometer_type",
... "magnetometer_axis"]))
>>> imu = imutools.IMUBase(s5ds.sel(gyroscope="output",
... magnetometer="output"),
... imu_filename=icdf)
See :doc:`/demos/demo_allan` demo for an extended example of typical
usage of the methods in this class.
"""
def __init__(self, dataset,
acceleration_name=_ACCEL_NAME,
angular_velocity_name=_OMEGA_NAME,
magnetic_density_name=_MAGNT_NAME,
time_name=_TIME_NAME,
has_depth=False, depth_name=_DEPTH_NAME,
imu_filename=None):
"""Set up attributes for IMU objects
Parameters
----------
dataset : xarray.Dataset
Dataset containing IMU sensor DataArrays, and optionally other
DataArrays.
acceleration_name : str, optional
Name of the acceleration :class:`~xarray.DataArray` in the
`dataset`.
angular_velocity_name : str, optional
Name of the angular velocity :class:`~xarray.DataArray` in the
`dataset`.
magnetic_density_name : str, optional
Name of the magnetic density :class:`~xarray.DataArray` in the
`dataset`.
time_name : str, optional
Name of the time dimension in the `dataset`.
has_depth : bool, optional
Whether input data include depth measurements.
depth_name : str, optional
Name of the depth :class:`~xarray.DataArray` in the `dataset`.
imu_filename : str, optional
Name of the file from which `dataset` originated.
"""
self.time_name = time_name
self.imu = dataset
self.imu_var_names = [acceleration_name,
angular_velocity_name,
magnetic_density_name]
if has_depth:
self.has_depth = True
self.depth_name = depth_name
else:
self.has_depth = False
self.depth_name = None
self.imu_file = imu_filename
self.quats = None
[docs]
@classmethod
def read_netcdf(cls, imu_file,
acceleration_name=_ACCEL_NAME,
angular_velocity_name=_OMEGA_NAME,
magnetic_density_name=_MAGNT_NAME,
time_name=_TIME_NAME,
has_depth=False, depth_name=_DEPTH_NAME,
**kwargs):
"""Instantiate object by loading Dataset from NetCDF file
Provided all `DataArrays` in the NetCDF file have the same
dimensions (N, 3), this is an efficient way to instantiate.
Parameters
----------
imu_file : str
As first argument for :func:`xarray.load_dataset`.
acceleration_name : str, optional
Name of the acceleration :class:`~xarray.DataArray` in the
:class:`~xarray.Dataset`.
angular_velocity_name : str, optional
Name of the angular velocity :class:`~xarray.DataArray` in the
:class:`~xarray.Dataset`.
magnetic_density_name : str, optional
Name of the magnetic density :class:`~xarray.DataArray` in the
:class:`~xarray.Dataset`.
has_depth : bool, optional
Whether input data include depth measurements.
depth_name : str, optional
Name of the depth :class:`~xarray.DataArray` in the
:class:`~xarray.Dataset`.
**kwargs
Optional keyword arguments passed to
:func:`xarray.load_dataset`.
Returns
-------
obj : IMUBase
Class matches the caller.
"""
dataset = _load_dataset(imu_file, **kwargs)
return cls(dataset, acceleration_name=acceleration_name,
angular_velocity_name=angular_velocity_name,
magnetic_density_name=magnetic_density_name,
time_name=time_name, has_depth=has_depth,
depth_name=depth_name, imu_filename=imu_file)
def __str__(self):
x = self.imu
objcls = ("IMU -- Class {} object\n"
.format(self.__class__.__name__))
src = "{0:<20} {1}\n".format("Source File", self.imu_file)
imu_desc = "IMU: {}".format(x.__str__())
return objcls + src + imu_desc
def _allan_deviation(self, sensor, taus):
"""Compute Allan deviation for all axes of a given sensor
Currently uses the modified Allan deviation in package
`allantools`.
Parameters
----------
sensor : str
Attribute name of the sensor of interest
taus : {float, str}
Tau value, in seconds, for which to compute statistic. Can be
one of "octave" or "decade" for automatic generation of the
value.
Returns
-------
pandas.DataFrame
Allan deviation and error for each sensor axis. DataFrame
index is the averaging time `tau` for each estimate.
"""
sensor_obj = getattr(self, sensor)
sampling_rate = float(sensor_obj.attrs["sampling_rate"])
sensor_std = preprocessing.scale(sensor_obj, with_std=False)
allan_l = []
for axis in sensor_std.T:
taus, adevs, errs, ns = allan.mdev(axis, rate=sampling_rate,
data_type="freq",
taus=taus)
# taus is common to all sensor axes
adevs_df = pd.DataFrame(np.column_stack((adevs, errs)),
columns=["allan_dev", "error"],
index=taus)
allan_l.append(adevs_df)
keys = [sensor + "_" + i for i in list("xyz")]
devs = pd.concat(allan_l, axis=1, keys=keys)
return devs
[docs]
def allan_coefs(self, sensor, taus):
"""Estimate Allan deviation coefficients for each error type
This procedure implements the autonomous regression method for
Allan variance described in [1]_.
Given averaging intervals `taus` and corresponding Allan deviation
`adevs`, compute the Allan deviation coefficient for each error
type:
- Quantization
- (Angle, Velocity) Random Walk
- Bias Instability
- Rate Random Walk
- Rate Ramp
Parameters
----------
sensor : str
Attribute name of the sensor of interest
taus : {float, str}
Tau value, in seconds, for which to compute statistic. Can be
one of "octave" or "decade" for automatic generation of the
value.
Returns
-------
coefs_all : pandas.DataFrame
Allan deviation coefficient and corresponding averaging time
for each sensor axis and error type.
adevs : pandas.DataFrame
:class:`~pandas.MultiIndex` DataFrame with Allan deviation,
corresponding averaging time, and fitted ARMAV model estimates
of the coefficients for each sensor axis and error type.
Notes
-----
Currently uses a modified Allan deviation formula.
.. [1] Jurado, J, Schubert Kabban, CM, Raquet, J (2019). A
regression-based methodology to improve estimation of
inertial sensor errors using Allan variance data. Navigation
66:251-263.
"""
adevs_errs = self._allan_deviation(sensor, taus)
taus = adevs_errs.index.to_numpy()
adevs = adevs_errs.xs("allan_dev", level=1, axis=1).to_numpy()
coefs_l = []
fitted_l = []
for adevs_i in adevs.T:
coefs_i, adevs_fitted = allan_coefs(taus, adevs_i)
# Parse output for dataframe
coefs_l.append(pd.Series(coefs_i))
fitted_l.append(adevs_fitted)
keys = [sensor + "_" + i for i in list("xyz")]
coefs_all = pd.concat(coefs_l, keys=keys, axis=1)
fitted_all = pd.DataFrame(np.column_stack(fitted_l), columns=keys,
index=taus)
fitted_all.columns = (pd.MultiIndex
.from_tuples([(c, "fitted")
for c in fitted_all]))
adevs = (pd.concat([adevs_errs, fitted_all], axis=1)
.sort_index(axis=1))
return (coefs_all, adevs)
[docs]
def compute_orientation(self, method="Madgwick", **kwargs):
"""Compute the orientation of IMU tri-axial signals
The method must be one of the following estimators implemented in
Python module :mod:`ahrs.filters`:
- ``AngularRate``: Attitude from angular rate
- ``AQUA``: Algebraic quaternion algorithm
- ``Complementary``: Complementary filter
- ``Davenport``: Davenport's q-method
- ``EKF``: Extended Kalman filter
- ``FAAM``: Fast accelerometer-magnetometer combination
- ``FLAE``: Fast linear attitude estimator
- ``Fourati``: Fourati's nonlinear attitude estimation
- ``FQA``: Factored quaternion algorithm
- ``Madgwick``: Madgwick orientation filter
- ``Mahony``: Mahony orientation filter
- ``OLEQ``: Optimal linear estimator quaternion
- ``QUEST``
- ``ROLEQ``: Recursive optimal linear estimator of quaternion
- ``SAAM``: Super-fast attitude from accelerometer and magnetometer
- ``Tilt``: Attitude from gravity
The estimated quaternions are stored in the ``quats`` attribute.
Parameters
----------
method : str, optional
Name of the filtering method to use.
**kwargs
Optional keyword arguments passed to filtering method.
"""
orienter_cls = getattr(filters, method)
orienter = orienter_cls(acc=self.acceleration,
gyr=self.angular_velocity,
mag=self.magnetic_density,
Dt=self.sampling_interval,
**kwargs)
self.quats = orienter.Q
[docs]
def dead_reckon(self, g=constants.g, Wn=1.0, k=1.0):
"""Calculate position assuming orientation is already known
Integrate dynamic acceleration in the body frame to calculate
velocity and position. If the IMU instance has a depth signal, it
is used in the integration instead of acceleration in the vertical
dimension.
Parameters
----------
g : float, optional
Assume gravity (:math:`m / s^2`) is equal to this value.
Default to standard gravity.
Wn : float, optional
Cutoff frequency for second-order Butterworth lowpass filter.
k : float, optional
Scalar to apply to scale lowpass-filtered dynamic acceleration.
This scaling has the effect of making position estimates
realistic for dead-reckoning tracking purposes.
Returns
-------
vel, pos : numpy.ndarray
Velocity and position 2D arrays.
"""
# Acceleration, velocity, and position from q and the measured
# acceleration, get the \frac{d^2x}{dt^2}. Retrieved sampling
# frequency assumes common frequency
fs = self.acceleration.attrs["sampling_rate"]
# Shift quaternions to scalar last to match convention
quats = np.roll(self.quats, -1, axis=1)
g_v = rotate_vector(np.array([0, 0, g]), quats, inverse=True)
acc_sensor = self.acceleration - g_v
acc_space = rotate_vector(acc_sensor, quats, inverse=False)
# Low-pass Butterworth filter design
b, a = signal.butter(2, Wn, btype="lowpass", output="ba", fs=fs)
acc_space_f = signal.filtfilt(b, a, acc_space, axis=0)
# Position and Velocity through integration, assuming 0-velocity at t=0
vel = integrate.cumulative_trapezoid(acc_space_f / k, dx=1.0 / fs,
initial=0, axis=0)
# Use depth derivative (on FLU) for the vertical dimension
if self.has_depth:
pos_z = self.depth
zdiff = np.append([0], np.diff(pos_z))
vel[:, -1] = -zdiff
pos = np.nan * np.ones_like(acc_space)
pos[:, -1] = pos_z
pos[:, :2] = (integrate
.cumulative_trapezoid(vel[:, :2], dx=1.0 / fs,
axis=0, initial=0))
else:
pos = integrate.cumulative_trapezoid(vel, dx=1.0 / fs,
axis=0, initial=0)
return vel, pos
def _get_acceleration(self):
# Acceleration name is the first
return self.imu[self.imu_var_names[0]]
acceleration = property(_get_acceleration)
"""Return acceleration array
Returns
-------
xarray.DataArray
"""
def _get_angular_velocity(self):
# Angular velocity name is the second
return self.imu[self.imu_var_names[1]]
angular_velocity = property(_get_angular_velocity)
"""Return angular velocity array
Returns
-------
xarray.DataArray
"""
def _get_magnetic_density(self):
# Magnetic density name is the last one
return self.imu[self.imu_var_names[-1]]
magnetic_density = property(_get_magnetic_density)
"""Return magnetic_density array
Returns
-------
xarray.DataArray
"""
def _get_depth(self):
return getattr(self.imu, self.depth_name)
depth = property(_get_depth)
"""Return depth array
Returns
-------
xarray.DataArray
"""
def _get_sampling_interval(self):
# Retrieve sampling rate from one DataArray
sampling_rate = self.acceleration.attrs["sampling_rate"]
sampling_rate_units = (self.acceleration
.attrs["sampling_rate_units"])
if sampling_rate_units.lower() == "hz":
itvl = 1.0 / sampling_rate
else:
itvl = sampling_rate
return itvl
sampling_interval = property(_get_sampling_interval)
"""Return sampling interval
Assuming all `DataArray`s have the same interval, the sampling interval
is retrieved from the acceleration `DataArray`.
Returns
-------
xarray.DataArray
Warnings
--------
The sampling rate is retrieved from the attribute named `sampling_rate`
in the NetCDF file, which is assumed to be in Hz units.
"""