Source code for pymetawear.modules.base
#!/usr/bin/env python
# -*- coding: utf-8 -*-
"""
Base module
-----------
Created by hbldh <henrik.blidh@nedomkull.com> on 2016-04-14
Modified by lkasso <hello@mbientlab.com>
"""
from __future__ import division
from __future__ import print_function
from __future__ import absolute_import
import logging
from copy import deepcopy
from ctypes import c_int, c_uint, c_float, cast, POINTER, c_ubyte, byref
from functools import wraps
from threading import Event
import tqdm
from pymetawear import libmetawear
from pymetawear.exceptions import PyMetaWearException, PyMetaWearDownloadTimeout
from mbientlab.metawear.cbindings import FnVoid_VoidP_DataP, \
DataTypeId, CartesianFloat, BatteryState, Tcs34725ColorAdc, EulerAngles, \
Quaternion, CorrectedCartesianFloat, FnVoid_VoidP_VoidP, \
LogDownloadHandler, FnVoid_VoidP_UInt_UInt, \
FnVoid_VoidP_UByte_Long_UByteP_UByte
log = logging.getLogger(__name__)
[docs]class Modules(object):
"""Class for storing PyMetaWear module identifiers."""
MBL_MW_MODULE_NA = -1
MBL_MW_MODULE_SWITCH = 1
MBL_MW_MODULE_LED = 2
MBL_MW_MODULE_ACCELEROMETER = 3
MBL_MW_MODULE_TEMPERATURE = 4
MBL_MW_MODULE_GPIO = 5
MBL_MW_MODULE_NEO_PIXEL = 6
MBL_MW_MODULE_IBEACON = 7
MBL_MW_MODULE_HAPTIC = 8
MBL_MW_MODULE_DATA_PROCESSOR = 9
MBL_MW_MODULE_EVENT = 0xa
MBL_MW_MODULE_LOGGING = 0xb
MBL_MW_MODULE_TIMER = 0xc
MBL_MW_MODULE_I2C = 0xd
MBL_MW_MODULE_MACRO = 0xf
MBL_MW_MODULE_GSR = 0x10
MBL_MW_MODULE_SETTINGS = 0x11
MBL_MW_MODULE_BAROMETER = 0x12
MBL_MW_MODULE_GYRO = 0x13
MBL_MW_MODULE_AMBIENT_LIGHT = 0x14
MBL_MW_MODULE_MAGNETOMETER = 0x15
MBL_MW_MODULE_HUMIDITY = 0x16
MBL_MW_MODULE_COLOR_DETECTOR = 0x17
MBL_MW_MODULE_PROXIMITY = 0x18
MBL_MW_MODULE_SENSOR_FUSION = 0x19
MBL_MW_MODULE_DEBUG = 0xfe
[docs]class PyMetaWearModule(object):
"""Base class for PyMetaWear module implementations.
:param ctypes.c_long board: The MetaWear board pointer value.
:param bool debug: If ``True``, module prints out debug information.
"""
def __init__(self, board):
self.board = board
self.is_present = True
self.callback = None
def __str__(self):
return "PyMetaWearModule"
def __repr__(self):
return super(PyMetaWearModule, self).__repr__()
@property
def module_name(self):
"""Get module name.
:return: The name of this module.
:rtype: str
"""
return ''
@property
def sensor_name(self):
"""Get sensor name, if applicable.
:return: The name of this module.
:rtype: str or None
"""
return None
@property
def data_signal(self):
"""Returns the data signal pointer value for the switch module.
:returns: The pointer value. (Long if on x64 architecture.)
:rtype: :py:class:`ctypes.c_long` or :py:class:`ctypes.c_int`
"""
raise PyMetaWearException(
"No data signal exists for {0} module.".format(self))
def set_settings(self, **kwargs):
raise PyMetaWearException(
"No settings exists for {0} module.".format(self))
def get_current_settings(self):
return {}
def get_possible_settings(self):
return {}
[docs] def notifications(self, callback=None):
"""Toggle notifications/subscriptions to data signals
on the MetaWear board.
:param callable callback: The function to call when
data signal notification arrives. If ``None``, an
unsubscription to notifications is sent.
"""
data_signal = self.data_signal
if callback is not None:
log.debug("Subscribing to {0} changes. (Sig#: {1})".format(
self.module_name, data_signal))
if self.callback is not None:
raise PyMetaWearException(
"Subscription to {0} signal already in place!")
# Handle the context argument added in MetaWear C++ API 0.12.
callback = context_callback(callback)
self.callback = (callback, FnVoid_VoidP_DataP(callback))
libmetawear.mbl_mw_datasignal_subscribe(
data_signal, None, self.callback[1])
else:
log.debug("Unsubscribing to {0} changes. (Sig#: {1})".format(
self.module_name, data_signal))
if self.callback is None:
return
libmetawear.mbl_mw_datasignal_unsubscribe(data_signal)
self.callback = None
[docs]class PyMetaWearLoggingModule(PyMetaWearModule):
"""Special class with additions for modules with logging support."""
def __init__(self, board):
super(PyMetaWearLoggingModule, self).__init__(board)
self._logger_ready_event = None
self._data_received = None
self._download_done = False
self._logger_running = False
self._logger_address = None
self._progress_bar = None
self._logged_data = []
def _logger_ready(self, address):
if address:
self._logger_address = address
log.debug("Logger address: {0}".format(self._logger_address))
else:
# Do nothing here. Let main thread handle lack of address.
pass
self._logger_ready_event.set()
def _progress_update(self, entries_left, total_entries):
if self._progress_bar is None:
self._progress_bar = tqdm.tqdm(total=total_entries)
n_read_entries = total_entries - entries_left
self._progress_bar.update(n_read_entries - self._progress_bar.n)
self._data_received.set()
if entries_left == 0:
self._progress_bar.update(
total_entries - self._progress_bar.n)
self._progress_bar.close()
self._progress_bar = None
self._download_done = True
def _unknown_entry(self, id, epoch, data, length):
"""Handle unknown data entries in the log.
I have no idea what this data is. Needs further investigation.
:param id (int):
:param epoch (int):
:param data:
:param length (int):
"""
self._data_received.set()
log.debug('Unknown Entry: ID: {0}, epoch: {1}, '
'data: {2}, Length: {3}'.format(
id, epoch, bytearray(data)[:length], length))
def _unhandled_entry(self, data):
self._data_received.set()
log.debug('Unhandled Entry: ' + str(data))
def _default_download_callback(self, data):
self._logged_data.append(data)
def start(self):
raise NotImplementedError("Must be implemented by module.")
def stop(self):
raise NotImplementedError("Must be implemented by module.")
def toggle_sampling(self, enabled=True):
raise NotImplementedError("Must be implemented by module.")
[docs] def start_logging(self):
"""Setup and start logging of data signals on the MetaWear board"""
data_signal = self.data_signal
if getattr(self, 'high_frequency_stream', False):
raise PyMetaWearException(
"Cannot log on high frequency stream signal.")
self._logger_ready_event = Event()
logger_ready = FnVoid_VoidP_VoidP(context_callback(self._logger_ready))
libmetawear.mbl_mw_datasignal_log(self.data_signal, None, logger_ready)
self._logger_ready_event.wait()
if self._logger_address is None:
raise PyMetaWearException(
'Failed to start logging for {0} module!'.format(
self.module_name))
log.debug("Start Logger (Logger#: {0}, Signal#: {1})".format(
self._logger_address, data_signal))
self._logger_running = True
self._download_done = False
libmetawear.mbl_mw_logging_start(self.board, 0)
self.toggle_sampling(True)
self.start()
[docs] def stop_logging(self):
"""Stop logging of data signals on the MetaWear board"""
self.stop()
self.toggle_sampling(False)
log.debug("Stop Logger (Logger#: {0})".format(self._logger_address))
libmetawear.mbl_mw_logging_stop(self.board)
self._logger_running = False
[docs] def download_log(
self,
timeout=3.0,
data_callback=None,
progress_update_function=None,
unknown_entry_function=None,
unhandled_entry_function=None
):
"""Download logged data from the MetaWear board
:param timeout: Time to wait for download to resume if connection is lost.
:param data_callback: Function called to process each downloaded sample.
:param progress_update_function: Function called each
`progress_update_each_n_sample` to give feedback on download progress.
:param unknown_entry_function: Function called when unknown logging
entries are encountered.
:param unhandled_entry_function: Function called when unhandled entries
are encountered.
:return: The logged data, in case download was successful.
"""
if self._logger_running:
# Stop logging if it is active.
self.stop_logging()
if data_callback is None:
data_callback = data_handler(self._default_download_callback)
if progress_update_function is None:
progress_update_function = self._progress_update
if unknown_entry_function is None:
unknown_entry_function = self._unknown_entry
if unhandled_entry_function is None:
unhandled_entry_function = self._unhandled_entry
data_point_handler = FnVoid_VoidP_DataP(context_callback(data_callback))
progress_update = FnVoid_VoidP_UInt_UInt(context_callback(progress_update_function))
unknown_entry = FnVoid_VoidP_UByte_Long_UByteP_UByte(context_callback(unknown_entry_function))
unhandled_entry = FnVoid_VoidP_DataP(data_handler(context_callback(unhandled_entry_function)))
log_download_handler = LogDownloadHandler(
received_progress_update=progress_update,
received_unknown_entry=unknown_entry,
received_unhandled_entry=unhandled_entry
)
log.debug("Subscribe to Logger. (Logger#: {0})".format(
self._logger_address))
libmetawear.mbl_mw_logger_subscribe(
self._logger_address, None, data_point_handler)
self._data_received = Event()
log.debug("Waiting for completed download. (Logger#: {0})".format(
self._logger_address))
libmetawear.mbl_mw_logging_download(
self.board, 1000,
byref(log_download_handler))
while not self._download_done:
status = self._data_received.wait(timeout)
self._data_received.clear()
if not self._download_done and not status:
if self._progress_bar is not None:
self._progress_bar.close()
self._progress_bar = None
raise PyMetaWearDownloadTimeout(
"Bluetooth connection lost! Please reconnect and retry download...")
log.debug("Download done. (Logger#: {0})".format(
self._logger_address))
log.debug("Remove logger. (Logger#: {0})".format(
self._logger_address))
libmetawear.mbl_mw_logger_remove(self._logger_address)
logged_data = self._logged_data
self._logged_data = []
self._data_received = None
return logged_data
def _error_handler(data):
raise RuntimeError('Unrecognized data type id: ' +
str(data.contents.type_id))
def _byte_array_handler(data):
ptr = cast(data.contents.value, POINTER(c_ubyte * data.contents.length))
return [ptr.contents[i].value for i in range(0, data.contents.length)]
DATA_HANDLERS = {
DataTypeId.UINT32: lambda x: cast(
x.contents.value, POINTER(c_uint)).contents.value,
DataTypeId.INT32: lambda x: cast(
x.contents.value, POINTER(c_int)).contents.value,
DataTypeId.FLOAT: lambda x: cast(
x.contents.value, POINTER(c_float)).contents.value,
DataTypeId.CARTESIAN_FLOAT: lambda x: cast(
x.contents.value, POINTER(CartesianFloat)).contents,
DataTypeId.BATTERY_STATE: lambda x: cast(
x.contents.value, POINTER(BatteryState)).contents,
DataTypeId.BYTE_ARRAY: _byte_array_handler,
DataTypeId.TCS34725_ADC: lambda x: cast(
x.contents.value, POINTER(Tcs34725ColorAdc)).contents,
DataTypeId.EULER_ANGLE: lambda x: cast(
x.contents.value, POINTER(EulerAngles)).contents,
DataTypeId.QUATERNION: lambda x: cast(
x.contents.value, POINTER(Quaternion)).contents,
DataTypeId.CORRECTED_CARTESIAN_FLOAT: lambda x: cast(
x.contents.value, POINTER(CorrectedCartesianFloat)).contents
}
[docs]def context_callback(func):
"""Decorator to handle the context argument added in MetaWear C++ API 0.12
This method is used internally so that the end-user should not have to
know about and handle the ``context`` argument.
:param func: The function to add context argument to.
:return: The wrapped function.
"""
@wraps(func)
def wrapper(context, *args):
func(*args)
return wrapper
def data_handler(func):
@wraps(func)
def wrapper(data):
func({
'epoch': int(data.contents.epoch),
'value': deepcopy(DATA_HANDLERS.get(
data.contents.type_id, _error_handler)(data)),
})
return wrapper