"""Adiabatic/diabatic omega decomposition and divergent wind recovery.
Decomposes total vertical velocity (omega) into adiabatic and diabatic
components:
ω_total = ω_adiabatic + ω_diabatic
ω_adiabatic = QG omega (from solve_qg_omega)
ω_diabatic = ω_total − ω_adiabatic
Each divergent wind component (diabatic, adiabatic, qg-diabatic) is
recovered **independently** via its own Poisson inversion:
∇²χ_X = −∂ω_X/∂p
(u_div_X, v_div_X) = ∇χ_X
where X ∈ {diabatic, adiabatic, qg_diabatic}.
Total-field approximation
~~~~~~~~~~~~~~~~~~~~~~~~~
The QG omega solve and Poisson inversion are performed on **total
fields** (ω, not ω'), exploiting |ω'| >> |ω̄| in midlatitude synoptic
systems. Because the climatological mean vertical velocity is
negligibly small compared with the anomaly, the total-field diabatic
omega closely approximates the anomaly diabatic omega:
ω_diabatic ≈ ω'_diabatic
The same linear approximation propagates through the Poisson inversion
to the horizontal divergent wind:
u_div_diabatic ≈ u'_div_diabatic
"""
from __future__ import annotations
import numpy as np
from .constants import R_EARTH
from .derivatives import ddp
from .helmholtz import gradient, solve_poisson_spherical_fft
[docs]
def solve_chi_from_omega(
omega: np.ndarray,
lat: np.ndarray,
lon: np.ndarray,
plevs_pa: np.ndarray,
method: str = "spectral",
) -> tuple[np.ndarray, np.ndarray, np.ndarray]:
"""Solve ∇²χ = -∂ω/∂p at each level.
Computes the velocity potential of the divergent wind associated
with *omega* by solving a Poisson equation on each pressure level.
By default (``method="spectral"``) the solve uses spherical
harmonics via :func:`pvtend.sh_ops.invert_laplacian_sh`, which is
pole-closed and supports auto NH→global parity mirroring (scalar
parity for χ). Pass ``method="fd"`` to fall back to the legacy
conservative-form spherical-FFT Poisson solver.
Args:
omega: Vertical velocity component [Pa/s],
shape ``(nlev, nlat, nlon)``.
lat: Latitude [degrees], ascending, shape ``(nlat,)``.
lon: Longitude [degrees], shape ``(nlon,)``.
plevs_pa: Pressure levels [Pa], ascending, shape ``(nlev,)``.
method: ``"spectral"`` (default) or ``"fd"``.
Returns:
Tuple of ``(chi, u_div, v_div)``, each with
shape ``(nlev, nlat, nlon)``.
Raises:
ValueError: If ``omega`` is not 3-D.
"""
if omega.ndim != 3:
raise ValueError(
f"omega must be 3-D (nlev, nlat, nlon), got {omega.ndim}-D"
)
nlev, _, _ = omega.shape
# ── Compute ∂ω/∂p using centred finite differences ──
domega_dp = ddp(omega, plevs_pa)
# RHS of Poisson equation: -∂ω/∂p
rhs_poisson = -domega_dp
if method in ("spectral", "sh"):
from .sh_ops import invert_laplacian_sh, gradient_sh, _SPHARM_AVAILABLE
if not _SPHARM_AVAILABLE:
import warnings
warnings.warn(
"pyspharm not installed; falling back to FD for "
"solve_chi_from_omega. Install via `pip install pvtend[sh]`.",
RuntimeWarning,
stacklevel=2,
)
return _solve_chi_from_omega_fd(rhs_poisson, lat, lon)
chi_out = np.zeros_like(omega)
u_div_out = np.zeros_like(omega)
v_div_out = np.zeros_like(omega)
for k in range(nlev):
chi_k = invert_laplacian_sh(
rhs_poisson[k], lat, lon, R_earth=R_EARTH, parity="scalar",
)
chi_out[k] = chi_k
dchi_dx, dchi_dy = gradient_sh(chi_k, lat, lon, R_earth=R_EARTH)
u_div_out[k] = dchi_dx
v_div_out[k] = dchi_dy
return chi_out, u_div_out, v_div_out
# ── Legacy FD path ──
return _solve_chi_from_omega_fd(rhs_poisson, lat, lon)
def _solve_chi_from_omega_fd(
rhs_poisson: np.ndarray,
lat: np.ndarray,
lon: np.ndarray,
) -> tuple[np.ndarray, np.ndarray, np.ndarray]:
"""Legacy spherical-FFT Poisson path (Dirichlet BCs).
Kept for regression testing against the pre-SH baseline.
"""
nlev, nlat, nlon = rhs_poisson.shape
lat_rad = np.deg2rad(lat)
dlat = np.abs(lat[1] - lat[0]) if nlat > 1 else 1.5
dlon = np.abs(lon[1] - lon[0]) if nlon > 1 else 1.5
dy = np.deg2rad(dlat) * R_EARTH
dx_arr = np.deg2rad(dlon) * R_EARTH * np.cos(lat_rad)
dx_arr = np.maximum(dx_arr, dy * 0.1) # guard near poles
dlon_rad = np.deg2rad(dlon)
chi_out = np.zeros_like(rhs_poisson)
u_div_out = np.zeros_like(rhs_poisson)
v_div_out = np.zeros_like(rhs_poisson)
rhs = rhs_poisson.copy()
cos_phi = np.cos(lat_rad)
area_weights = cos_phi / cos_phi.sum()
for k in range(nlev):
weighted_mean = np.sum(area_weights[:, None] * rhs[k]) / nlon
rhs[k] -= weighted_mean
for k in range(nlev):
chi_k = solve_poisson_spherical_fft(
rhs[k], lat, dy, dlon_rad, R_earth=R_EARTH
)
chi_out[k] = chi_k
dchi_dx, dchi_dy = gradient(chi_k, dx_arr, dy)
u_div_out[k] = dchi_dx
v_div_out[k] = dchi_dy
return chi_out, u_div_out, v_div_out
# Backward-compatible alias
solve_chi_moist = solve_chi_from_omega
[docs]
def decompose_omega(
omega_total: np.ndarray,
omega_adiabatic: np.ndarray,
lat: np.ndarray,
lon: np.ndarray,
plevs_pa: np.ndarray,
u_div: np.ndarray | None = None,
v_div: np.ndarray | None = None,
) -> dict[str, np.ndarray]:
"""Full adiabatic/diabatic omega decomposition with divergent wind recovery.
Given total omega and its QG (adiabatic) component, this function:
1. Computes the diabatic residual: ``ω_diabatic = ω_total − ω_adiabatic``.
2. Solves independently for **both** diabatic and adiabatic velocity
potentials via Poisson equation on each pressure level.
3. Recovers the divergent wind for each component as the
gradient of its respective velocity potential.
Args:
omega_total: Total vertical velocity [Pa/s],
shape ``(nlev, nlat, nlon)``.
omega_adiabatic: QG omega (adiabatic component) [Pa/s],
shape ``(nlev, nlat, nlon)``.
lat: Latitude [degrees], ascending, shape ``(nlat,)``.
lon: Longitude [degrees], shape ``(nlon,)``.
plevs_pa: Pressure levels [Pa], ascending, shape ``(nlev,)``.
u_div: Total divergent u-component [m/s] (optional),
shape ``(nlev, nlat, nlon)``. No longer used for adiabatic
residual computation but accepted for API compatibility.
v_div: Total divergent v-component [m/s] (optional),
shape ``(nlev, nlat, nlon)``. No longer used for adiabatic
residual computation but accepted for API compatibility.
Returns:
Dictionary containing:
- ``"omega_diabatic"``: Diabatic omega residual.
- ``"chi_diabatic"``: Diabatic velocity potential.
- ``"u_div_diabatic"``: Diabatic divergent u-component.
- ``"v_div_diabatic"``: Diabatic divergent v-component.
- ``"chi_adiabatic"``: Adiabatic velocity potential.
- ``"u_div_adiabatic"``: Adiabatic divergent u-component.
- ``"v_div_adiabatic"``: Adiabatic divergent v-component.
"""
omega_diabatic = omega_total - omega_adiabatic
chi_m, u_div_m, v_div_m = solve_chi_from_omega(
omega_diabatic, lat, lon, plevs_pa
)
chi_d, u_div_d, v_div_d = solve_chi_from_omega(
omega_adiabatic, lat, lon, plevs_pa
)
return {
"omega_diabatic": omega_diabatic,
"chi_diabatic": chi_m,
"u_div_diabatic": u_div_m,
"v_div_diabatic": v_div_m,
"chi_adiabatic": chi_d,
"u_div_adiabatic": u_div_d,
"v_div_adiabatic": v_div_d,
}
[docs]
def verify_div_additivity(
u_div: np.ndarray,
u_div_adiabatic: np.ndarray,
u_div_diabatic: np.ndarray,
) -> float:
"""Verify additive consistency of divergent wind decomposition.
Checks that the independently Poisson-inverted adiabatic and diabatic
divergent winds sum to the total divergent wind to machine
precision:
max |u_div_adiabatic + u_div_diabatic − u_div|
Args:
u_div: Total divergent component, any shape.
u_div_adiabatic: Adiabatic divergent component, same shape.
u_div_diabatic: Diabatic divergent component, same shape.
Returns:
Maximum absolute error of the additive split.
"""
return float(np.max(np.abs(u_div_adiabatic + u_div_diabatic - u_div)))