"""ECG signal filtering utilities.
All filters use SOS (second-order sections) representation with zero-phase
``sosfiltfilt`` to preserve ECG morphology (no phase distortion).
"""
from __future__ import annotations
import numpy as np
from numpy.typing import NDArray
from ecgdatakit.models import Lead, LeadLike
from ecgdatakit.processing._core import ensure_lead, new_lead, require_scipy
def _validate_nyquist(cutoff: float, fs: int, label: str = "cutoff") -> None:
nyquist = fs / 2.0
if cutoff >= nyquist:
raise ValueError(
f"{label} ({cutoff} Hz) must be less than Nyquist frequency ({nyquist} Hz)"
)
if cutoff <= 0:
raise ValueError(f"{label} must be positive, got {cutoff}")
[docs]
def lowpass(lead: LeadLike, cutoff: float, order: int = 4, *, fs: int | None = None) -> Lead:
"""Apply a Butterworth low-pass filter.
Parameters
----------
lead : Lead | NDArray[np.float64]
Input ECG lead or raw signal array.
cutoff : float
Cutoff frequency in Hz.
order : int
Filter order (default 4).
fs : int | None
Sample rate in Hz. Required when *lead* is a numpy array.
"""
lead = ensure_lead(lead, fs=fs)
sig = require_scipy("signal")
_validate_nyquist(cutoff, lead.sampling_rate, "cutoff")
sos = sig.butter(order, cutoff, btype="low", fs=lead.sampling_rate, output="sos")
filtered = sig.sosfiltfilt(sos, lead.samples).astype(np.float64)
return new_lead(lead, samples=filtered)
[docs]
def highpass(lead: LeadLike, cutoff: float, order: int = 4, *, fs: int | None = None) -> Lead:
"""Apply a Butterworth high-pass filter.
Parameters
----------
lead : Lead | NDArray[np.float64]
Input ECG lead or raw signal array.
cutoff : float
Cutoff frequency in Hz.
order : int
Filter order (default 4).
fs : int | None
Sample rate in Hz. Required when *lead* is a numpy array.
"""
lead = ensure_lead(lead, fs=fs)
sig = require_scipy("signal")
_validate_nyquist(cutoff, lead.sampling_rate, "cutoff")
sos = sig.butter(order, cutoff, btype="high", fs=lead.sampling_rate, output="sos")
filtered = sig.sosfiltfilt(sos, lead.samples).astype(np.float64)
return new_lead(lead, samples=filtered)
[docs]
def bandpass(lead: LeadLike, low: float, high: float, order: int = 4, *, fs: int | None = None) -> Lead:
"""Apply a Butterworth band-pass filter.
Parameters
----------
lead : Lead | NDArray[np.float64]
Input ECG lead or raw signal array.
low : float
Lower cutoff frequency in Hz.
high : float
Upper cutoff frequency in Hz.
order : int
Filter order (default 4).
fs : int | None
Sample rate in Hz. Required when *lead* is a numpy array.
"""
lead = ensure_lead(lead, fs=fs)
sig = require_scipy("signal")
_validate_nyquist(high, lead.sampling_rate, "high")
if low <= 0:
raise ValueError(f"low must be positive, got {low}")
if low >= high:
raise ValueError(f"low ({low}) must be less than high ({high})")
sos = sig.butter(order, [low, high], btype="band", fs=lead.sampling_rate, output="sos")
filtered = sig.sosfiltfilt(sos, lead.samples).astype(np.float64)
return new_lead(lead, samples=filtered)
[docs]
def notch(lead: LeadLike, freq: float = 50.0, quality: float = 30.0, *, fs: int | None = None) -> Lead:
"""Apply an IIR notch (band-stop) filter.
Parameters
----------
lead : Lead | NDArray[np.float64]
Input ECG lead or raw signal array.
freq : float
Center frequency to remove (default 50 Hz for mains hum).
quality : float
Quality factor — higher means narrower notch (default 30).
fs : int | None
Sample rate in Hz. Required when *lead* is a numpy array.
"""
lead = ensure_lead(lead, fs=fs)
sig = require_scipy("signal")
_validate_nyquist(freq, lead.sampling_rate, "freq")
b, a = sig.iirnotch(freq, quality, fs=lead.sampling_rate)
sos = sig.tf2sos(b, a)
filtered = sig.sosfiltfilt(sos, lead.samples).astype(np.float64)
return new_lead(lead, samples=filtered)
[docs]
def remove_baseline(lead: LeadLike, cutoff: float = 0.5, order: int = 2, *, fs: int | None = None) -> Lead:
"""Remove baseline wander using a high-pass filter.
Parameters
----------
lead : Lead | NDArray[np.float64]
Input ECG lead or raw signal array.
cutoff : float
Cutoff frequency in Hz (default 0.5 Hz).
order : int
Filter order (default 2).
fs : int | None
Sample rate in Hz. Required when *lead* is a numpy array.
"""
lead = ensure_lead(lead, fs=fs)
return highpass(lead, cutoff=cutoff, order=order)
[docs]
def diagnostic_filter(lead: LeadLike, notch_freq: float = 50.0, *, fs: int | None = None) -> Lead:
"""Apply AHA diagnostic-grade filtering: 0.05–150 Hz bandpass + notch.
Suitable for diagnostic ECG interpretation where full morphology
(including ST segment) must be preserved.
Parameters
----------
lead : Lead | NDArray[np.float64]
Input ECG lead or raw signal array.
notch_freq : float
Power-line frequency to notch out (50 or 60 Hz).
fs : int | None
Sample rate in Hz. Required when *lead* is a numpy array.
"""
lead = ensure_lead(lead, fs=fs)
result = bandpass(lead, low=0.05, high=150.0, order=4)
result = notch(result, freq=notch_freq)
return result
[docs]
def monitoring_filter(lead: LeadLike, notch_freq: float = 50.0, *, fs: int | None = None) -> Lead:
"""Apply monitoring-grade filtering: 0.67–40 Hz bandpass + notch.
Suitable for arrhythmia monitoring where baseline stability
is more important than preserving fine morphology.
Parameters
----------
lead : Lead | NDArray[np.float64]
Input ECG lead or raw signal array.
notch_freq : float
Power-line frequency to notch out (50 or 60 Hz).
fs : int | None
Sample rate in Hz. Required when *lead* is a numpy array.
"""
lead = ensure_lead(lead, fs=fs)
result = bandpass(lead, low=0.67, high=40.0, order=4)
result = notch(result, freq=notch_freq)
return result