Source code for ecgdatakit.processing.transforms

"""ECG signal transforms and beat segmentation."""

from __future__ import annotations

import numpy as np
from numpy.typing import NDArray

from ecgdatakit.models import Lead, LeadLike
from ecgdatakit.processing._core import ensure_lead, new_lead, require_scipy
from ecgdatakit.processing.peaks import detect_r_peaks


[docs] def power_spectrum( lead: LeadLike, method: str = "welch", nperseg: int | None = None, *, fs: int | None = None, ) -> tuple[NDArray[np.float64], NDArray[np.float64]]: """Compute the power spectral density of an ECG lead. Parameters ---------- lead : Lead | NDArray[np.float64] Input ECG lead or raw signal array. method : str ``"welch"`` (default) for Welch's method. nperseg : int | None Segment length for Welch's method. Defaults to ``min(256, len(samples))``. fs : int | None Sample rate in Hz. Required when *lead* is a numpy array. Returns ------- tuple[NDArray, NDArray] ``(frequencies, power)`` arrays. """ lead = ensure_lead(lead, fs=fs) sig = require_scipy("signal") if nperseg is None: nperseg = min(256, len(lead.samples)) freqs, psd = sig.welch( lead.samples, fs=lead.sampling_rate, nperseg=nperseg ) return freqs.astype(np.float64), psd.astype(np.float64)
[docs] def fft(lead: LeadLike, *, fs: int | None = None) -> tuple[NDArray[np.float64], NDArray[np.float64]]: """Compute the single-sided FFT magnitude spectrum. Parameters ---------- lead : Lead | NDArray[np.float64] Input ECG lead or raw signal array. fs : int | None Sample rate in Hz. Required when *lead* is a numpy array. Returns ------- tuple[NDArray, NDArray] ``(frequencies, magnitudes)`` arrays (positive frequencies only). """ lead = ensure_lead(lead, fs=fs) n = len(lead.samples) yf = np.fft.rfft(lead.samples) xf = np.fft.rfftfreq(n, d=1.0 / lead.sampling_rate) magnitudes = (2.0 / n) * np.abs(yf) return xf.astype(np.float64), magnitudes.astype(np.float64)
[docs] def segment_beats( lead: LeadLike, peaks: NDArray[np.intp] | None = None, before: float = 0.2, after: float = 0.4, *, fs: int | None = None, ) -> list[Lead]: """Segment individual heartbeats around R-peaks. Parameters ---------- lead : Lead | NDArray[np.float64] Input ECG lead or raw signal array. peaks : NDArray | None R-peak indices. Detected automatically if ``None``. before : float Seconds before R-peak to include (default 0.2). after : float Seconds after R-peak to include (default 0.4). fs : int | None Sample rate in Hz. Required when *lead* is a numpy array. Returns ------- list[Lead] One Lead per beat, labelled ``"{label}_beat_{i}"``. """ lead = ensure_lead(lead, fs=fs) if peaks is None: peaks = detect_r_peaks(lead) pre = int(round(before * lead.sampling_rate)) post = int(round(after * lead.sampling_rate)) n = len(lead.samples) beats: list[Lead] = [] for idx, p in enumerate(peaks): lo = p - pre hi = p + post if lo < 0 or hi > n: continue segment = lead.samples[lo:hi].copy().astype(np.float64) beats.append( new_lead(lead, samples=segment, label=f"{lead.label}_beat_{idx}") ) return beats
[docs] def average_beat( lead: LeadLike, peaks: NDArray[np.intp] | None = None, before: float = 0.2, after: float = 0.4, *, fs: int | None = None, ) -> Lead: """Compute the ensemble-averaged heartbeat (template). Parameters ---------- lead : Lead | NDArray[np.float64] Input ECG lead or raw signal array. peaks : NDArray | None R-peak indices. Detected automatically if ``None``. before : float Seconds before R-peak (default 0.2). after : float Seconds after R-peak (default 0.4). fs : int | None Sample rate in Hz. Required when *lead* is a numpy array. Returns ------- Lead Averaged beat labelled ``"{label}_avg"``. """ lead = ensure_lead(lead, fs=fs) beats = segment_beats(lead, peaks, before, after) if not beats: pre = int(round(before * lead.sampling_rate)) post = int(round(after * lead.sampling_rate)) return new_lead( lead, samples=np.zeros(pre + post, dtype=np.float64), label=f"{lead.label}_avg", ) stacked = np.stack([b.samples for b in beats], axis=0) avg = stacked.mean(axis=0).astype(np.float64) return new_lead(lead, samples=avg, label=f"{lead.label}_avg")