Source code for pvtend.isentropic

"""Isentropic interpolation aligned with MetPy's algorithm.

Interpolates 3-D isobaric fields onto isentropic (constant-θ) surfaces
using the method of Ziv & Alpert (1994), as implemented in MetPy v1.7
(`metpy.calc.isentropic_interpolation`).

Algorithm — Pressure solve (Newton-Raphson on Poisson's equation)
-----------------------------------------------------------------
1. Sort pressure levels in **descending** order (surface → top).
2. Compute potential temperature at every grid point:

       θ = T · (P₀ / p)^κ

3. For each target θ*, find the **bounding pressure levels** (above/below).
4. Assume temperature varies **linearly with ln(p)** between the
   bounding levels:

       T(ln p) = a · ln(p) + b

   where  a = (T_above − T_below) / (ln p_above − ln p_below),
          b = T_above − a · ln(p_above).

5. Solve for p* on the θ* surface using **Newton-Raphson** (fixed-point
   iteration on ln(p)):

       θ* = T(ln p) · (P₀ / p)^κ
          = (a · ln p + b) · P₀^κ · exp(−κ · ln p)

       f(ln p)  = θ* − (a · ln p + b) · P₀^κ · exp(−κ · ln p)
       f'(ln p) = P₀^κ · exp(−κ · ln p) · (κ T − a)

       ln p_{n+1} = ln p_n − f / f'

Algorithm — Field interpolation (vectorized linear-in-θ)
--------------------------------------------------------
6. Once p* is known, additional fields φ are **linearly interpolated** vs θ.
   Vectorized implementation (no Python column loops):

   a) Sort each grid column by ascending θ (``np.argsort`` on axis 0).
   b) For each target θ*, find the bracketing index via ``np.searchsorted``
      on the sorted θ columns.
   c) Compute linear weight  w = (θ* − θ_lo) / (θ_hi − θ_lo).
   d) Result = φ_lo + w · (φ_hi − φ_lo).
   e) Mask out-of-range (θ* outside column min/max) → NaN.

Reference
---------
Ziv, B., & Alpert, P. (1994). Isentropic cross-sections across the
Middle East and their relationship to synoptic patterns.
*Beitr. Phys. Atmosph.*, 67, 221–230.

MetPy implementation:
https://github.com/Unidata/MetPy/blob/v1.7.1/src/metpy/calc/thermo.py#L3023-L3190
"""

from __future__ import annotations

import numpy as np
from scipy.optimize import fixed_point

from pvtend.constants import KAPPA, R_DRY, CP_DRY

# Reference surface pressure [hPa] — matches MetPy's P0 = 1000 hPa
_P0_HPA: float = 1000.0


# ── helpers ──────────────────────────────────────────────────────────
def _potential_temperature(pressure_hpa: np.ndarray,
                           temperature_k: np.ndarray) -> np.ndarray:
    """θ = T · (P₀/p)^κ  (all in hPa / K)."""
    return temperature_k * (_P0_HPA / pressure_hpa) ** KAPPA


def _find_bounding_indices(
    theta_sorted: np.ndarray,
    target_levels: np.ndarray,
    vertical_axis: int = 0,
    from_below: bool = True,
) -> tuple[np.ndarray, np.ndarray, np.ndarray]:
    """Find indices that bracket each target level along the vertical axis.

    Parameters
    ----------
    theta_sorted : (..., nlev, ...) — potential temperature sorted so that
        values increase along *vertical_axis*.
    target_levels : (n_theta,) — 1-D sorted target θ values.
    vertical_axis : int
    from_below : bool — search direction.

    Returns
    -------
    above, below : index arrays (n_theta, *spatial_shape)
    good : bool mask of same shape — True where both bounds are valid.
    """
    nlev = theta_sorted.shape[vertical_axis]
    n_theta = target_levels.size

    # Move vertical to axis-0 for easier indexing
    theta_t = np.moveaxis(theta_sorted, vertical_axis, 0)  # (nlev, ...)
    spatial_shape = theta_t.shape[1:]

    above = np.empty((n_theta, *spatial_shape), dtype=int)
    below = np.empty_like(above)
    good = np.zeros((n_theta, *spatial_shape), dtype=bool)

    for ti, th_target in enumerate(target_levels):
        mask_ge = theta_t >= th_target  # (nlev, ...)
        if from_below:
            idx = np.argmax(mask_ge, axis=0)
        else:
            idx = nlev - 1 - np.argmax(mask_ge[::-1], axis=0)

        any_ge = np.any(mask_ge, axis=0)
        idx_below = idx - 1

        valid = any_ge & (idx_below >= 0)
        above[ti] = idx
        below[ti] = np.clip(idx_below, 0, nlev - 1)
        good[ti] = valid

    return above, below, good


def _isen_iter(iter_log_p, theta_target_nd, ka, a, b, pok):
    """One Newton-Raphson step — identical to MetPy's ``_isen_iter``."""
    exner = pok * np.exp(-ka * iter_log_p)
    t = a * iter_log_p + b
    f = theta_target_nd - t * exner
    fp = exner * (ka * t - a)
    return iter_log_p - (f / fp)


# ── public API ───────────────────────────────────────────────────────
[docs] def isentropic_interpolation_pressure( theta_levels: np.ndarray, pressure_hpa: np.ndarray, temperature_k: np.ndarray, *, vertical_axis: int = 0, max_iters: int = 50, eps: float = 1e-6, bottom_up_search: bool = True, ) -> np.ndarray: """Compute pressure on isentropic surfaces (MetPy algorithm). Parameters ---------- theta_levels : (n_theta,) Target potential-temperature levels [K]. pressure_hpa : (nlev,) or same shape as *temperature_k* Pressure [hPa]. temperature_k : (nlev, ...) — temperature [K] on isobaric levels. vertical_axis : int Axis of *temperature_k* that is the vertical. max_iters, eps : Newton-Raphson controls. bottom_up_search : bool Search direction for bounding indices. Returns ------- isen_prs : (n_theta, ...) — pressure [hPa] on each θ surface. NaN where the θ level is outside the data range. """ temperature_k = np.asarray(temperature_k, dtype=np.float64) pressure_hpa = np.asarray(pressure_hpa, dtype=np.float64) theta_levels = np.asarray(theta_levels, dtype=np.float64) ndim = temperature_k.ndim slices = [np.newaxis] * ndim slices[vertical_axis] = slice(None) slices = tuple(slices) if pressure_hpa.ndim == 1: pressure_hpa = pressure_hpa[slices] pressure_hpa = np.broadcast_to(pressure_hpa, temperature_k.shape).copy() # ── Sort descending pressure (surface first) ── sort_idx = np.argsort(pressure_hpa, axis=vertical_axis) sort_idx = np.flip(sort_idx, axis=vertical_axis) sorter = _broadcast_indices(sort_idx, temperature_k.shape, vertical_axis) levs = pressure_hpa[sorter] tmpk = temperature_k[sorter] # ── Sort target θ levels ascending ── theta_levels = np.sort(theta_levels) n_theta = theta_levels.size # ── Potential temperature on sorted isobaric grid ── pres_theta = _potential_temperature(levs, tmpk) # ── Build broadcast shape for isentropic levels ── shape = list(temperature_k.shape) shape[vertical_axis] = n_theta isentlevs_nd = np.broadcast_to( theta_levels.reshape( [1] * vertical_axis + [n_theta] + [1] * (ndim - vertical_axis - 1) ), shape, ) ka = KAPPA log_p = np.log(levs) pok = _P0_HPA ** ka # ── Bounding indices ── above, below, good = _find_bounding_indices( pres_theta, theta_levels, vertical_axis, from_below=bottom_up_search) # Move vertical axis to 0 for easier advanced indexing log_p_0 = np.moveaxis(log_p, vertical_axis, 0) tmpk_0 = np.moveaxis(tmpk, vertical_axis, 0) # ── Compute coefficients a, b (vectorized) ── log_p_above = _gather(log_p_0, above) log_p_below = _gather(log_p_0, below) t_above = _gather(tmpk_0, above) t_below = _gather(tmpk_0, below) dlog = log_p_above - log_p_below dlog[dlog == 0] = np.nan a = (t_above - t_below) / dlog b = t_above - a * log_p_above # ── First guess: midpoint in log-p space ── isentprs = 0.5 * (log_p_above + log_p_below) # Ignore NaNs in a (they propagate from NaN temperature) good = good & np.isfinite(a) # ── Newton-Raphson via scipy.optimize.fixed_point ── log_p_solved = fixed_point( _isen_iter, isentprs[good], args=(isentlevs_nd.reshape(isentprs.shape)[good], ka, a[good], b[good], pok), xtol=eps, maxiter=max_iters, ) isentprs[good] = np.exp(log_p_solved) isentprs[~good] = np.nan # Mask points beyond the max pressure in the data isentprs[~(good & (isentprs <= np.nanmax(levs) * 1.001))] = np.nan return isentprs # (n_theta, *spatial_shape)
[docs] def isentropic_interpolation( theta_levels: np.ndarray, pressure_hpa: np.ndarray, temperature_k: np.ndarray, *fields: np.ndarray, vertical_axis: int = 0, max_iters: int = 50, eps: float = 1e-6, bottom_up_search: bool = True, ) -> list[np.ndarray]: """Interpolate fields from isobaric to isentropic coordinates. This is a pure-NumPy/SciPy re-implementation of MetPy's ``isentropic_interpolation`` that does **not** require pint units, making it suitable for use inside pvtend's compositing pipeline. Parameters ---------- theta_levels : (n_theta,) — target θ [K]. pressure_hpa : (nlev,) or (nlev, ny, nx) — pressure [hPa]. temperature_k : (nlev, ny, nx) or (nlev, ...) — temperature [K]. *fields : additional arrays of same shape to interpolate. vertical_axis : int — the pressure/vertical axis (default 0). max_iters, eps, bottom_up_search : Newton-Raphson controls. Returns ------- [isen_pressure, *interpolated_fields] — each (n_theta, ...). Pressure in [hPa]; fields in their original units. """ temperature_k = np.asarray(temperature_k, dtype=np.float64) pressure_hpa = np.asarray(pressure_hpa, dtype=np.float64) theta_levels = np.asarray(theta_levels, dtype=np.float64).ravel() theta_levels = np.sort(theta_levels) ndim = temperature_k.ndim slices = [np.newaxis] * ndim slices[vertical_axis] = slice(None) slices = tuple(slices) if pressure_hpa.ndim == 1: pressure_hpa = pressure_hpa[slices] pressure_hpa = np.broadcast_to(pressure_hpa, temperature_k.shape).copy() # ── Sort descending pressure (surface first) ── sort_idx = np.argsort(pressure_hpa, axis=vertical_axis) sort_idx = np.flip(sort_idx, axis=vertical_axis) sorter = _broadcast_indices(sort_idx, temperature_k.shape, vertical_axis) levs = pressure_hpa[sorter] tmpk = temperature_k[sorter] # ── Compute θ on sorted grid ── pres_theta = _potential_temperature(levs, tmpk) # ── Get pressure on isentropic surfaces ── isen_prs = isentropic_interpolation_pressure( theta_levels, levs, tmpk, vertical_axis=vertical_axis, max_iters=max_iters, eps=eps, bottom_up_search=bottom_up_search, ) ret = [isen_prs] # ── Interpolate additional fields linearly vs θ ── if fields: for fld in fields: fld = np.asarray(fld, dtype=np.float64) fld_sorted = fld[sorter] interp_fld = _interp_on_theta( theta_levels, pres_theta, fld_sorted, vertical_axis) ret.append(interp_fld) return ret
# ── Internal utilities ─────────────────────────────────────────────── def _broadcast_indices( indices: np.ndarray, shape: tuple, axis: int, ) -> tuple: """Build advanced-indexing tuple from argsort output (MetPy-style).""" ndim = len(shape) idx = [] for d in range(ndim): if d == axis: idx.append(indices) else: s = [1] * ndim s[d] = shape[d] ar = np.arange(shape[d]).reshape(s) idx.append(np.broadcast_to(ar, shape)) return tuple(idx) def _gather(arr_axis0: np.ndarray, idx_nd: np.ndarray) -> np.ndarray: """Gather from arr_axis0[k, ...] using idx_nd of shape (n_theta, ...). Fully vectorized — no Python loops. Parameters ---------- arr_axis0 : (nlev, *spatial_shape) idx_nd : (n_theta, *spatial_shape) — level indices to gather Returns ------- (n_theta, *spatial_shape) """ spatial_shape = arr_axis0.shape[1:] # Build spatial index arrays that broadcast with idx_nd spatial_indices = np.indices(spatial_shape) # (ndim_spatial, *spatial_shape) # Expand each to (1, *spatial_shape) then broadcast with idx_nd's leading dim expanded = tuple( np.broadcast_to(si[np.newaxis], idx_nd.shape) for si in spatial_indices ) return arr_axis0[(idx_nd, *expanded)] def _interp_on_theta( theta_levels: np.ndarray, pres_theta: np.ndarray, field: np.ndarray, vertical_axis: int = 0, ) -> np.ndarray: """Linearly interpolate *field* onto isentropic levels using θ as coordinate. **Vectorized** — no Python column loops. Sort each column by ascending θ, then use fancy indexing to find the bracketing pair and compute the linear weight. Parameters ---------- theta_levels : (n_theta,) — sorted ascending target θ values. pres_theta : (nlev, ...) — θ on isobaric grid. field : (nlev, ...) — variable to interpolate. vertical_axis : int Returns ------- (n_theta, ...) interpolated array (NaN where out-of-range). """ # Move vertical to axis-0 pt = np.moveaxis(pres_theta, vertical_axis, 0) # (nlev, ...) fld = np.moveaxis(field, vertical_axis, 0) # (nlev, ...) nlev = pt.shape[0] spatial_shape = pt.shape[1:] n_theta = theta_levels.size # ── Sort each column by ascending θ ── sort_idx = np.argsort(pt, axis=0) # (nlev, ...) theta_sorted = np.take_along_axis(pt, sort_idx, axis=0) field_sorted = np.take_along_axis(fld, sort_idx, axis=0) # ── Mask invalid (NaN) values: set to inf / -inf so they don't bracket ── nan_mask = ~(np.isfinite(theta_sorted) & np.isfinite(field_sorted)) theta_clean = theta_sorted.copy() theta_clean[nan_mask] = np.inf # push NaNs to "top" # ── Compute min/max valid θ per column ── # Replace NaN with inf/−inf so nanmin/nanmax work even if all-NaN th_for_min = np.where(nan_mask, np.inf, theta_sorted) th_for_max = np.where(nan_mask, -np.inf, theta_sorted) col_min = np.min(th_for_min, axis=0) # (...) col_max = np.max(th_for_max, axis=0) # (...) out = np.full((n_theta, *spatial_shape), np.nan, dtype=np.float64) # Spatial index arrays for fancy indexing into (nlev, *spatial_shape) spatial_indices = np.indices(spatial_shape) # (ndim_spatial, *spatial_shape) for ti, th_target in enumerate(theta_levels): # Number of levels below target (sorted ascending) n_below = np.sum(theta_clean <= th_target, axis=0) # (...) idx_below = np.clip(n_below - 1, 0, nlev - 2) idx_above = idx_below + 1 # Build fancy-index tuples idx_lo = (idx_below, *spatial_indices) idx_hi = (idx_above, *spatial_indices) th_lo = theta_sorted[idx_lo] th_hi = theta_sorted[idx_hi] f_lo = field_sorted[idx_lo] f_hi = field_sorted[idx_hi] # Linear interpolation weight dth = th_hi - th_lo dth = np.where(dth == 0, np.nan, dth) w = (th_target - th_lo) / dth result = f_lo + w * (f_hi - f_lo) # Mask out-of-range and invalid in_range = (th_target >= col_min) & (th_target <= col_max) valid = in_range & np.isfinite(th_lo) & np.isfinite(th_hi) out[ti] = np.where(valid, result, np.nan) return out # ── Convenience wrappers for composite NPZ events ───────────────────
[docs] def interp_event_fields_to_theta( theta_3d: np.ndarray, pressure_hpa_1d: np.ndarray, field_3d: np.ndarray, theta_levels: np.ndarray, ) -> np.ndarray: """Interpolate one 3-D isobaric field to multiple θ surfaces (vectorized). This is the convenience wrapper for the composite-event use case (shape: ``(nlev, ny, nx)``). Parameters ---------- theta_3d : (nlev, ny, nx) — potential temperature [K] pressure_hpa_1d : (nlev,) — isobaric levels [hPa] (unused in θ-path but kept for API compatibility with the full MetPy path). field_3d : (nlev, ny, nx) — field to interpolate theta_levels : (n_theta,) — target θ [K] Returns ------- (n_theta, ny, nx) — field on isentropic surfaces. """ theta_levels = np.sort( np.asarray(theta_levels, dtype=np.float64).ravel() ) return _interp_on_theta(theta_levels, theta_3d, field_3d, vertical_axis=0)
[docs] def interp_event_field_to_single_theta( theta_3d: np.ndarray, field_3d: np.ndarray, theta_target: float, ) -> np.ndarray: """Interpolate one 3-D field onto a single θ surface → (ny, nx). Vectorized linear-in-θ interpolation. Equivalent to one slice of :func:`interp_event_fields_to_theta`. """ result = _interp_on_theta( np.array([float(theta_target)]), theta_3d, field_3d, vertical_axis=0, ) return result[0] # squeeze the θ dimension