# Source code for ecgdatakit.processing.leads

"""ECG lead derivation utilities.

Derives missing leads from Einthoven's triangle and Goldberger's equations.
Pure numpy — no scipy required.
"""

from __future__ import annotations

import numpy as np

from ecgdatakit.models import Lead, LeadLike
from ecgdatakit.processing._core import ensure_lead, new_lead


def _check_compatible(a: Lead, b: Lead) -> None:
    if a.sampling_rate != b.sampling_rate:
        raise ValueError(
            f"Sample rates must match: {a.label}={a.sampling_rate} Hz, "
            f"{b.label}={b.sampling_rate} Hz"
        )
    if len(a.samples) != len(b.samples):
        raise ValueError(
            f"Sample counts must match: {a.label}={len(a.samples)}, "
            f"{b.label}={len(b.samples)}"
        )


def derive_lead_iii(
    lead_i: LeadLike,
    lead_ii: LeadLike,
    *,
    fs: int | None = None,
) -> Lead:
    """Compute Lead III as the difference II - I (Einthoven's law).

    Parameters
    ----------
    lead_i : Lead | NDArray[np.float64]
        Lead I signal.
    lead_ii : Lead | NDArray[np.float64]
        Lead II signal.
    fs : int | None
        Sample rate in Hz. Required when passing numpy arrays.

    Returns
    -------
    Lead
        Lead labelled ``"III"``, created with ``new_lead`` from the Lead I
        object and holding the float64 difference of the two signals.

    Raises
    ------
    ValueError
        If the two leads differ in sampling rate or sample count.
    """
    first = ensure_lead(lead_i, fs=fs, label="I")
    second = ensure_lead(lead_ii, fs=fs, label="II")
    _check_compatible(first, second)

    difference = second.samples - first.samples
    return new_lead(first, samples=difference.astype(np.float64), label="III")


def derive_augmented(
    lead_i: LeadLike,
    lead_ii: LeadLike,
    *,
    fs: int | None = None,
) -> list[Lead]:
    """Compute the augmented limb leads aVR, aVL and aVF from Leads I and II.

    The signals are computed as::

        aVR = -(I + II) / 2
        aVL = I - II / 2
        aVF = II - I / 2

    Parameters
    ----------
    lead_i : Lead | NDArray[np.float64]
        Lead I signal.
    lead_ii : Lead | NDArray[np.float64]
        Lead II signal.
    fs : int | None
        Sample rate in Hz. Required when passing numpy arrays.

    Returns
    -------
    list[Lead]
        ``[aVR, aVL, aVF]`` in that order, each created with ``new_lead``
        from the Lead I object and holding float64 samples.

    Raises
    ------
    ValueError
        If the two leads differ in sampling rate or sample count.
    """
    reference = ensure_lead(lead_i, fs=fs, label="I")
    other = ensure_lead(lead_ii, fs=fs, label="II")
    _check_compatible(reference, other)

    sig_i = reference.samples
    sig_ii = other.samples

    # Label/sample pairs, listed in the order they are returned.
    derived = (
        ("aVR", (-(sig_i + sig_ii) / 2.0).astype(np.float64)),
        ("aVL", (sig_i - sig_ii / 2.0).astype(np.float64)),
        ("aVF", (sig_ii - sig_i / 2.0).astype(np.float64)),
    )
    return [
        new_lead(reference, samples=values, label=name)
        for name, values in derived
    ]


def derive_standard_12(
    lead_i: LeadLike,
    lead_ii: LeadLike,
    v1: LeadLike,
    v2: LeadLike,
    v3: LeadLike,
    v4: LeadLike,
    v5: LeadLike,
    v6: LeadLike,
    *,
    fs: int | None = None,
) -> list[Lead]:
    """Build a full 12-lead set from I, II and V1-V6.

    Leads III, aVR, aVL and aVF are derived from I and II; the remaining
    leads are the inputs after normalisation through ``ensure_lead``.

    Parameters
    ----------
    lead_i, lead_ii : Lead | NDArray[np.float64]
        Limb leads I and II.
    v1..v6 : Lead | NDArray[np.float64]
        Precordial leads.
    fs : int | None
        Sample rate in Hz. Required when passing numpy arrays.

    Returns
    -------
    list[Lead]
        12 leads in standard order: I, II, III, aVR, aVL, aVF, V1–V6.

    Raises
    ------
    ValueError
        If Leads I and II differ in sampling rate or sample count.

    Notes
    -----
    Only Leads I and II are checked against each other (inside the
    derivation functions). The precordial leads are not compared with the
    limb leads here.
    """
    limb_i = ensure_lead(lead_i, fs=fs, label="I")
    limb_ii = ensure_lead(lead_ii, fs=fs, label="II")

    # Normalise the chest leads in V1..V6 order, keeping each paired with
    # its label.
    chest_inputs = (
        ("V1", v1),
        ("V2", v2),
        ("V3", v3),
        ("V4", v4),
        ("V5", v5),
        ("V6", v6),
    )
    chest = [
        ensure_lead(signal, fs=fs, label=name) for name, signal in chest_inputs
    ]

    limb_iii = derive_lead_iii(limb_i, limb_ii)
    augmented = derive_augmented(limb_i, limb_ii)  # [aVR, aVL, aVF]
    return [limb_i, limb_ii, limb_iii, *augmented, *chest]


def find_lead(leads: list[Lead], label: str) -> Lead | None:
    """Return the first lead whose label matches *label*, ignoring case.

    Parameters
    ----------
    leads : list[Lead]
        List of leads to search.
    label : str
        Lead label to find (e.g., "II", "avl", "V1").

    Returns
    -------
    Lead | None
        The first matching lead in list order, or ``None`` when no label
        matches.
    """
    wanted = label.lower()
    matches = (
        candidate for candidate in leads if candidate.label.lower() == wanted
    )
    return next(matches, None)