Source code for bycycle.cyclepoints.phase

"""Interpolated phase, using cyclepoints."""

import numpy as np

###################################################################################################
###################################################################################################

[docs]def extrema_interpolated_phase(sig, peaks, troughs, rises=None, decays=None): """Use extrema and (optionally) zero-crossings to estimate instantaneous phase. Parameters ---------- sig : 1d array Time series. peaks : 1d array Samples of oscillatory peaks. troughs : 1d array Samples of oscillatory troughs. rises : 1d array, optional Samples of oscillatory rising zero-crossings. decays : 1d array, optional Samples of oscillatory decaying zero-crossings. Returns ------- pha : 1d array Instantaneous phase time series. Notes ----- - Phase is encoded as: - phase 0 for peaks - phase pi/2 for decay zero-crossing - phase pi/-pi for troughs - phase -pi/2 for rise zero-crossing - Extrema and zero-crossing estimation can be poor if, for example, the signal is noisy. In such cases, the same index may be assigned to both a peak and a decaying zero-crossing. To address this, we first assign phase values by zero-crossings, and then may overwrite them with extrema phases. - Using burst detection helps avoid analyzing oscillatory properties of non-oscillatory sections of the signal. Examples -------- Estimate phase from peaks and troughs: >>> from neurodsp.sim import sim_bursty_oscillation >>> from bycycle.cyclepoints import find_extrema >>> fs = 500 >>> sig = sim_bursty_oscillation(10, fs, freq=10) >>> peaks, troughs = find_extrema(sig, fs, f_range=(8, 12)) >>> pha = extrema_interpolated_phase(sig, peaks, troughs) """ # Initialize phase arrays, one for trough pi and trough -pi sig_len = len(sig) times = np.arange(sig_len) pha_tpi = np.zeros(sig_len) * np.nan pha_tnpi = np.zeros(sig_len) * np.nan # If specified, assign phases to zero-crossings if rises is not None: pha_tpi[rises] = -np.pi / 2 pha_tnpi[rises] = -np.pi / 2 if decays is not None: pha_tpi[decays] = np.pi / 2 pha_tnpi[decays] = np.pi / 2 # Define phases pha_tpi[peaks] = 0 pha_tpi[troughs] = np.pi pha_tnpi[peaks] = 0 pha_tnpi[troughs] = -np.pi # Interpolate to find all phases pha_tpi = np.interp(times, times[~np.isnan(pha_tpi)], pha_tpi[~np.isnan(pha_tpi)]) pha_tnpi = np.interp(times, times[~np.isnan(pha_tnpi)], pha_tnpi[~np.isnan(pha_tnpi)]) pha = _merge_phases(pha_tpi, pha_tnpi) return pha
def _merge_phases(pha_tpi, pha_tnpi): """Helper functions for extrema_interpolated_phase.""" # Create phase differences to determine where phase is decaying/rising in the -pi array diffs = np.diff(pha_tnpi) # Pad the phase difference array with a NaN to maintain a length equal to the timeseries diffs = np.append(diffs, np.nan) # Create new phase series, using trough pi for decaying periods & trough -pi for rising periods pha = np.array([pha_tpi[idx] if diffs[idx] < 0 else pha for idx, pha in enumerate(pha_tnpi)]) # Assign the periods before the first empirical phase timepoint to NaN diffs = np.diff(pha) first_empirical_idx = next(idx for idx, xi in enumerate(diffs) if xi > 0) pha[:first_empirical_idx] = np.nan # Assign the periods after the last empirical phase timepoint to NaN diffs = np.diff(pha) last_empirical_idx = next(idx for idx, xi in enumerate(diffs[::-1]) if xi > 0) pha[-last_empirical_idx + 1:] = np.nan return pha