#!/usr/bin/env python
# -*- coding: utf-8 -*-
"""
Sensor Fusion module
--------------------
Created by mgeorgi <marcus.georgi@kinemic.de> on 2017-02-01
"""
from __future__ import division
from __future__ import print_function
from __future__ import absolute_import
import logging
from threading import Event
from pymetawear import libmetawear
from pymetawear.exceptions import PyMetaWearException
from mbientlab.metawear.cbindings import SensorFusionAccRange, \
SensorFusionData, SensorFusionGyroRange, SensorFusionMode, \
SensorOrientation, FnVoid_VoidP_VoidP, FnVoid_VoidP_DataP, TimeMode
from pymetawear.modules.base import PyMetaWearLoggingModule, Modules, data_handler, context_callback
log = logging.getLogger(__name__)
PROCESSOR_SET_WAIT_TIME = 5
def require_fusion_module(f):
def wrapper(*args, **kwargs):
if getattr(args[0], 'available', False) is False:
raise PyMetaWearException("There is no Sensor Fusion "
"module on your MetaWear board!")
return f(*args, **kwargs)
return wrapper
[docs]class SensorFusionModule(PyMetaWearLoggingModule):
"""MetaWear accelerometer module implementation.
:param ctypes.c_long board: The MetaWear board pointer value.
:param int module_id: The module id of the sensorfusion
component, obtained from ``libmetawear``.
:param bool debug: If ``True``, module prints out debug information.
"""
def __init__(self, board, module_id):
super(SensorFusionModule, self).__init__(board)
self.module_id = module_id
if self.module_id == Modules.MBL_MW_MODULE_NA:
# No sensor fusion present!
self.available = False
else:
self.available = True
self.board = board
self.current_active_signal = None
self._streams_to_enable = {
SensorFusionData.CORRECTED_ACC: False,
SensorFusionData.CORRECTED_GYRO: False,
SensorFusionData.CORRECTED_MAG: False,
SensorFusionData.QUATERNION: False,
SensorFusionData.EULER_ANGLE: False,
SensorFusionData.GRAVITY_VECTOR: False,
SensorFusionData.LINEAR_ACC: False,
}
self._data_source_signals = {
SensorFusionData.CORRECTED_ACC: None,
SensorFusionData.CORRECTED_GYRO: None,
SensorFusionData.CORRECTED_MAG: None,
SensorFusionData.QUATERNION: None,
SensorFusionData.EULER_ANGLE: None,
SensorFusionData.GRAVITY_VECTOR: None,
SensorFusionData.LINEAR_ACC: None,
}
self._callbacks = {}
def __str__(self):
return "{0}".format(self.module_name)
def __repr__(self):
return str(self)
@require_fusion_module
def set_sample_delay(self, data_source, delay=None, differential=False):
"""
Change the delay between samples using the onboard time processor
module to change the effective sampling rate of a specific data source.
:param data_source: A data source from sensor.SensorFusion
:param delay: The delay in ms between samples,
or None to reset to default
:param differential: Set Time Preprocessor mode to differential,
instead of the default, absolute
"""
if self._data_source_signals[data_source] is None:
log.debug("Getting data signal for data source {0}".format(
data_source
))
self._data_source_signals[data_source] = \
libmetawear.mbl_mw_sensor_fusion_get_data_signal(
self.board, data_source
)
if delay is not None:
mode = TimeMode.DIFFERENTIAL if differential else \
TimeMode.ABSOLUTE
log.debug("Creating time dataprocessor for signal {0}".format(
self._data_source_signals[data_source]
))
_done = Event()
def _processor_set(context, processor):
"""
Set global variables as the libmetawear callback can't handle the self
parameter of instance methods.
:param context: Pointer to additional data for the callback function
:param processor: The processor that was created
"""
self._data_source_signals[data_source] = processor
_done.set()
processor_set_func = FnVoid_VoidP_VoidP(_processor_set)
libmetawear.mbl_mw_dataprocessor_time_create(
self._data_source_signals[data_source],
mode,
delay,
None,
processor_set_func)
_done.wait(timeout=PROCESSOR_SET_WAIT_TIME)
if self._data_source_signals[data_source] is None:
raise PyMetaWearException("Can't set data processor!")
else:
data_signal = libmetawear.mbl_mw_sensor_fusion_get_data_signal(
self.board, data_source)
if self._data_source_signals[data_source] != data_signal:
libmetawear.mbl_mw_dataprocessor_remove(
self._data_source_signals[data_source]
)
self._data_source_signals[data_source] = data_signal
def _delay_set(self, processor):
self._current_processor = processor
self._waiting_for_processor = False
@require_fusion_module
def set_mode(self, mode):
libmetawear.mbl_mw_sensor_fusion_set_mode(self.board,
mode)
libmetawear.mbl_mw_sensor_fusion_write_config(self.board)
@require_fusion_module
def set_acc_range(self, acc_range):
libmetawear.mbl_mw_sensor_fusion_set_acc_range(self.board,
acc_range)
libmetawear.mbl_mw_sensor_fusion_write_config(self.board)
@require_fusion_module
def set_gyro_range(self, gyro_range):
libmetawear.mbl_mw_sensor_fusion_set_gyro_range(self.board,
gyro_range)
libmetawear.mbl_mw_sensor_fusion_write_config(self.board)
@require_fusion_module
def get_data_signal(self, data_source):
if self._data_source_signals[data_source] is None:
self._data_source_signals[data_source] = \
libmetawear.mbl_mw_sensor_fusion_get_data_signal(
self.board, data_source
)
return self._data_source_signals[data_source]
@property
def data_signal(self):
return self.current_active_signal
@property
def module_name(self):
return "Sensor Fusion"
def get_current_settings(self):
raise NotImplementedError()
def get_possible_settings(self):
raise NotImplementedError()
def set_settings(self):
raise NotImplementedError()
@require_fusion_module
def notifications(self,
corrected_acc_callback=None,
corrected_gyro_callback=None,
corrected_mag_callback=None,
quaternion_callback=None,
euler_angle_callback=None,
gravity_callback=None,
linear_acc_callback=None):
"""Subscribe or unsubscribe to sensor fusion notifications.
Convenience method for handling sensor fusion usage.
Example:
.. code-block:: python
def handle_notification(data):
# Handle dictionary with [epoch, value] keys.
epoch = data["epoch"]
xyz = data["value"]
print(str(data))
mwclient.sensorfusion.notifications(
corrected_acc_callback=handle_notification)
:param callable corrected_acc_callback: Acceleration notification
callback function.
If `None`, unsubscription to acceleration notifications is
registered.
:param callable corrected_gyro_callback: Gyroscope notification
callback function.
If `None`, unsubscription to gyroscope notifications is registered.
:param callable corrected_mag_callback: Magnetometer notification
callback function.
If `None`, unsubscription to magnetometer notifications is
registered.
:param callable quaternion_callback: Quaternion notification callback
function.
If `None`, unsubscription to quaternion notifications is registered.
:param callable euler_angle_callback: Euler angle notification callback
function.
If `None`, unsubscription to euler angle notifications is
registered.
:param callable gravity_callback: Gravity vector notification callback
function.
If `None`, unsubscription to gravity notifications is registered.
:param callable linear_acc_callback: Linear acceleration notification
callback function.
If `None`, unsubscription to linear acceleration notifications is
registered.
"""
callback_data_source_map = {
SensorFusionData.CORRECTED_ACC: corrected_acc_callback,
SensorFusionData.CORRECTED_GYRO: corrected_gyro_callback,
SensorFusionData.CORRECTED_MAG: corrected_mag_callback,
SensorFusionData.QUATERNION: quaternion_callback,
SensorFusionData.EULER_ANGLE: euler_angle_callback,
SensorFusionData.GRAVITY_VECTOR: gravity_callback,
SensorFusionData.LINEAR_ACC: linear_acc_callback
}
for data_source in callback_data_source_map:
if callback_data_source_map[data_source] is not None:
self._streams_to_enable[data_source] = True
else:
self._streams_to_enable[data_source] = False
enable = False in [x is None for x in callback_data_source_map.values()]
log.debug("Enable: %s" % enable)
if not enable:
self.stop()
self.toggle_sampling(False)
for data_source in callback_data_source_map:
self.current_active_signal = self.get_data_signal(data_source)
callback = callback_data_source_map[data_source]
if callback is not None:
self.check_and_change_callback(
self.get_data_signal(data_source),
data_handler(callback)
)
else:
self.check_and_change_callback(
self.get_data_signal(data_source),
None
)
if enable:
self.toggle_sampling(True)
self.start()
def check_and_change_callback(self, data_signal, callback):
if callback is not None:
log.debug("Subscribing to {0} changes. (Sig#: {1})".format(
self.module_name, data_signal))
if data_signal in self._callbacks:
log.debug('Replacing callback for datasignal {0}...'.format(
data_signal))
libmetawear.mbl_mw_datasignal_unsubscribe(data_signal)
self._callbacks.pop(data_signal)
callback = context_callback(callback)
self._callbacks[data_signal] = (callback, FnVoid_VoidP_DataP(callback))
libmetawear.mbl_mw_datasignal_subscribe(
data_signal, None, self._callbacks[data_signal][1])
else:
if data_signal not in self._callbacks:
return
log.debug("Unsubscribing to {0} changes. (Sig#: {1})".format(
self.module_name, data_signal))
libmetawear.mbl_mw_datasignal_unsubscribe(data_signal)
self._callbacks.pop(data_signal)
@require_fusion_module
def start(self):
"""Switches the sensorfusion to active mode."""
libmetawear.mbl_mw_sensor_fusion_start(self.board)
@require_fusion_module
def stop(self):
"""Switches the sensorfusion to standby mode."""
libmetawear.mbl_mw_sensor_fusion_stop(self.board)
@require_fusion_module
def toggle_sampling(self, enabled=True):
"""Enables or disables sensor fusion sampling.
:param bool enabled: Desired state of the sensor fusion module.
"""
if enabled:
for data_source in self._streams_to_enable:
if self._streams_to_enable[data_source]:
libmetawear.mbl_mw_sensor_fusion_enable_data(
self.board, data_source)
else:
libmetawear.mbl_mw_sensor_fusion_clear_enabled_mask(
self.board)