"""Functions to compute features across 2 dimensional arrays of data."""
import warnings
from copy import deepcopy
from functools import partial
from multiprocessing import Pool, cpu_count
import numpy as np
from bycycle.features import compute_features
from bycycle.burst import detect_bursts_cycles, detect_bursts_amp
from bycycle.group.utils import progress_bar, check_kwargs_shape
from bycycle.utils.dataframes import epoch_df
###################################################################################################
###################################################################################################
[docs]def compute_features_2d(sigs, fs, f_range, compute_features_kwargs=None, axis=0,
return_samples=True, n_jobs=-1, progress=None):
"""Compute shape and burst features for a 2 dimensional array of signals.
Parameters
----------
sigs : 2d array
Voltage time series, i.e. (n_channels, n_samples) or (n_epochs, n_samples).
fs : float
Sampling rate, in Hz.
f_range : tuple of (float, float)
Frequency range for narrowband signal of interest, in Hz.
compute_features_kwargs : dict or list of dict
Keyword arguments used in :func:`~.compute_features`.
axis : {0, None}
Which axes to calculate features across:
- ``axis=0`` : Iterates over each row/signal in an array independently (i.e. for each
channel in (n_channels, n_timepoints)).
- ``axis=None`` : Flattens rows/signals prior to computing features (i.e. across flatten
epochs in (n_epochs, n_timepoints)).
return_samples : bool, optional, default: True
Whether to return a dataframe of cyclepoint sample indices.
n_jobs : int, optional, default: -1
The number of jobs to compute features in parallel.
progress : {None, 'tqdm', 'tqdm.notebook'}
Specify whether to display a progress bar. Uses 'tqdm', if installed.
Returns
-------
dfs_features : list of pandas.DataFrame
Dataframes containing shape and burst features for each cycle.
Each dataframe is computed using the :func:`~.compute_features` function.
Notes
-----
- The order of ``dfs_features`` corresponds to the order of ``sigs``. This list of dataframes
may be reorganized into a single dataframe using :func:`~.flatten_dfs`.
- When ``axis=None`` parallel computation may not be performed due to the requirement of
flattening the array into one dimension.
- If ``compute_features_kwargs`` is a dictionary, the same kwargs are applied applied across
the first axis of ``sigs``. Otherwise, a list of dictionaries equal in length to the
first axis of ``sigs`` is required to apply unique kwargs to each signal.
- ``return_samples`` is controlled from the kwargs passed in this function. If
``return_samples`` is a key in ``compute_features_kwargs``, it's value will be ignored.
Examples
--------
Compute the features of a 2d array (n_epochs=10, n_samples=5000) containing epoched data:
>>> import numpy as np
>>> from neurodsp.sim import sim_bursty_oscillation
>>> fs = 500
>>> sigs = np.array([sim_bursty_oscillation(10, fs, 10) for i in range(10)])
>>> compute_kwargs = {'burst_method': 'amp', 'threshold_kwargs':{'burst_fraction_threshold': 1}}
>>> dfs_features = compute_features_2d(sigs, fs, f_range=(8, 12), axis=None,
... compute_features_kwargs=compute_kwargs)
Compute the features of a 2d array in parallel using the same compute_features kwargs. Note each
signal features are computed separately in this case, recommended for (n_channels, n_samples):
>>> compute_kwargs = {'burst_method': 'amp', 'threshold_kwargs':{'burst_fraction_threshold': 1}}
>>> dfs_features = compute_features_2d(sigs, fs, f_range=(8, 12), n_jobs=2, axis=0,
... compute_features_kwargs=compute_kwargs)
Compute the features of a 2d array in parallel using using individualized settings per signal to
examine the effect of various amplitude consistency thresholds:
>>> sigs = np.array([sim_bursty_oscillation(10, fs, freq=10)] * 10)
>>> compute_kwargs = [{'threshold_kwargs': {'amp_consistency_threshold': thresh*.1}}
... for thresh in range(1, 11)]
>>> dfs_features = compute_features_2d(sigs, fs, f_range=(8, 12), return_samples=False,
... n_jobs=2, compute_features_kwargs=compute_kwargs, axis=0)
"""
# Check compute_features_kwargs
kwargs = deepcopy(compute_features_kwargs)
kwargs = np.array(kwargs) if isinstance(kwargs, list) else kwargs
check_kwargs_shape(sigs, kwargs, axis)
kwargs = {} if kwargs is None else kwargs
kwargs = [kwargs] if isinstance(kwargs, dict) else list(kwargs)
# Drop return_samples argument, as it is set directly in the function call
for kwarg in kwargs:
kwarg.pop('return_samples', None)
n_jobs = cpu_count() if n_jobs == -1 else n_jobs
if axis == 0:
# Compute each signal independently and in paralllel
with Pool(processes=n_jobs) as pool:
if len(kwargs) > 1:
# Map iterable sigs and kwargs together
mapping = pool.imap(partial(_proxy_2d, fs=fs, f_range=f_range,
return_samples=return_samples),
zip(sigs, kwargs))
else:
# Only map sigs, kwargs are the same for each mapping
mapping = pool.imap(partial(compute_features, fs=fs, f_range=f_range,
return_samples=return_samples,
**kwargs[0]),
sigs)
dfs_features = list(progress_bar(mapping, progress, len(sigs)))
elif axis is None:
# Compute features after flattening the 2d array (i.e. calculated across a 1d signal)
sig_flat = sigs.flatten()
center_extrema = kwargs[0].pop('center_extrema', 'peak')
df_flat = compute_features(sig_flat, fs=fs, f_range=f_range, return_samples=True,
center_extrema=center_extrema, **kwargs[0])
dfs_features = epoch_df(df_flat, len(sig_flat), len(sigs[0]))
# Apply different thresholds if specified
if len(kwargs) > 0:
for idx, compute_kwargs in enumerate(kwargs):
burst_method = compute_kwargs.pop('burst_method', 'cycles')
thresholds = compute_kwargs.pop('threshold_kwargs', {})
center_extrema_next = compute_kwargs.pop('center_extrema', None)
if idx > 0 and center_extrema_next is not None \
and center_extrema_next != center_extrema:
warnings.warn('''
The same center extrema must be used when axis is None and
compute_features_kwargs is a list. Proceeding using the first
center_extrema: {extrema}.'''.format(extrema=center_extrema))
if burst_method == 'cycles':
dfs_features[idx] = detect_bursts_cycles(dfs_features[idx], **thresholds)
elif burst_method == 'amp':
dfs_features[idx] = detect_bursts_amp(dfs_features[idx], **thresholds)
else:
raise ValueError("The axis kwarg must be either 0 or None.")
return dfs_features
[docs]def compute_features_3d(sigs, fs, f_range, compute_features_kwargs=None, axis=0,
return_samples=True, n_jobs=-1, progress=None):
"""Compute shape and burst features for a 3 dimensional array of signals.
Parameters
----------
sigs : 3d array
Voltage time series, with 3d shape, i.e. (n_channels, n_epochs, n_samples).
fs : float
Sampling rate, in Hz.
f_range : tuple of (float, float)
Frequency range for narrowband signal of interest, in Hz.
compute_features_kwargs : dict or 1d list of dict or 2d list of dict
Keyword arguments used in :func:`~.compute_features`.
axis : {0, 1, (0, 1)}
Which axes to calculate features across:
- ``axis=0`` : Iterates over 2D slices along the zeroth dimension, (i.e. for each channel in
(n_channels, n_epochs, n_timepoints)).
- ``axis=1`` : Iterates over 2D slices along the first dimension (i.e. across flatten epochs
in (n_epochs, n_channels, n_timepoints)).
- ``axis=(0, 1)`` : Iterates over 1D slices along the zeroth and first dimensions (i.e across
each signal independently in (n_participants, n_channels, n_timepoints)).
return_samples : bool, optional, default: True
Whether to return a dataframe of cyclepoint sample indices.
n_jobs : int, optional, default: -1
The number of jobs to compute features in parallel.
progress : {None, 'tqdm', 'tqdm.notebook'}
Specify whether to display a progress bar. Uses 'tqdm' if installed.
Returns
-------
dfs_features : list of pandas.DataFrame
Dataframes containing shape and burst features for each cycle.
Each dataframe is computed using the :func:`~.compute_features` function.
Notes
-----
- The order of ``dfs_features`` corresponds to the order of ``sigs``. This list of dataframes
may be reorganized into a single dataframe using :func:`~.flatten_dfs`.
- If ``compute_features_kwargs`` is a dictionary, the same kwargs are applied applied across
all signals. A 1d list, equal in length to the first dimensions of sigs, may be applied to
each set of signals along the first dimensions. A 2d list, the same shape as the first two
dimensions of ``sigs`` may also be used to applied unique parameters to each signal.
- ``return_samples`` is controlled from the kwargs passed in this function. The
``return_samples`` value in ``compute_features_kwargs`` will be ignored.
Examples
--------
Compute the features of a 3d array, in parallel, with a shape of
(n_channels=2, n_epochs=3, n_signals=5000) using the same compute_features kwargs:
>>> import numpy as np
>>> from neurodsp.sim import sim_bursty_oscillation
>>> fs = 500
>>> sigs = np.array([[sim_bursty_oscillation(10, fs, freq=10) for epoch in range(3)]
... for ch in range(2)])
>>> threshold_kwargs = {'amp_consistency_threshold': .5, 'period_consistency_threshold': .5,
... 'monotonicity_threshold': .8, 'min_n_cycles': 3}
>>> compute_feature_kwargs = {'threshold_kwargs': threshold_kwargs, 'center_extrema': 'trough'}
>>> features = compute_features_3d(sigs, fs, f_range= (8, 12),
... compute_features_kwargs=compute_feature_kwargs, axis=0,
... n_jobs=2)
Compute the features of a 3d array, in parallel, with a shape of
(n_channels=2, n_epochs=3, n_signals=5000) using channel-specific compute_features kwargs:
>>> threshold_kwargs_ch1 = {'amp_consistency_threshold': .25, 'monotonicity_threshold': .25,
... 'period_consistency_threshold': .25, 'min_n_cycles': 3}
>>> threshold_kwargs_ch2 = {'amp_consistency_threshold': .5, 'monotonicity_threshold': .5,
... 'period_consistency_threshold': .5, 'min_n_cycles': 3}
>>> compute_kwargs = [{'threshold_kwargs': threshold_kwargs_ch1, 'center_extrema': 'trough'},
... {'threshold_kwargs': threshold_kwargs_ch2, 'center_extrema': 'trough'}]
>>> features = compute_features_3d(sigs, fs, f_range= (8, 12),
... compute_features_kwargs=compute_kwargs, axis=0, n_jobs=2)
"""
n_jobs = cpu_count() if n_jobs == -1 else n_jobs
# Convert list of kwargs to array to check dimensions
kwargs = deepcopy(compute_features_kwargs)
kwargs = np.array(kwargs) if isinstance(kwargs, list) else kwargs
check_kwargs_shape(sigs, kwargs, axis)
kwargs = list(kwargs.flatten()) if isinstance(kwargs, np.ndarray) else [kwargs]
if axis in [0, 1]:
# Independently across 2d slices along either the zeroth or first axis
sigs = np.swapaxes(sigs, 0, 1) if axis == 1 else sigs
kwargs = kwargs * np.shape(sigs)[0] if len(kwargs) == 1 else kwargs
with Pool(processes=n_jobs) as pool:
mapping = pool.imap(partial(_proxy_3d, fs=fs, f_range=f_range,
return_samples=return_samples),
zip(sigs, kwargs))
dfs_features = list(progress_bar(mapping, progress, len(sigs)))
# Swap the first two axes to return original shape
dfs_features = [list(dfs) for dfs in zip(*dfs_features)] if axis == 1 else dfs_features
elif axis == (0, 1):
# Independently across the first two axes (i.e. for each signal)
sigs_2d = sigs.reshape(np.shape(sigs)[0]*np.shape(sigs)[1], np.shape(sigs)[2])
kwargs = kwargs[0] if len(kwargs) == 1 else kwargs
df_2d = compute_features_2d(sigs_2d, fs, f_range, compute_features_kwargs=kwargs,
return_samples=return_samples, n_jobs=n_jobs,
progress=progress, axis=0)
else:
raise ValueError("The axis kwarg must be either 0, 1, or (0, 1).")
if axis == (0, 1):
dfs_features = np.zeros((np.shape(sigs)[0], np.shape(sigs)[1])).tolist()
# Reshape
for dim0_idx in range(np.shape(sigs)[0]):
for dim1_idx in range(np.shape(sigs)[1]):
dfs_features[dim0_idx][dim1_idx] = df_2d[dim0_idx + dim1_idx]
return dfs_features
def _proxy_2d(args, fs=None, f_range=None, return_samples=None):
"""Proxy function to map kwargs and 2d sigs together."""
sig, kwargs = args[0], args[1:]
return compute_features(sig, fs=fs, f_range=f_range,
return_samples=return_samples, **kwargs[0])
def _proxy_3d(args, fs=None, f_range=None, return_samples=None):
"""Proxy function to map kwargs and 3d sigs together."""
sigs, kwargs = args[0], args[1]
return compute_features_2d(sigs, fs, f_range, compute_features_kwargs=kwargs, axis=None,
return_samples=return_samples)