"""Spatial and temporal derivative operators on lat-lon-pressure grids.
All operators respect:
- Periodic zonal boundary conditions (globe wraps in longitude)
- One-sided differences at polar boundaries (meridional)
- Non-uniform pressure level spacing (pressure derivatives)
Two interfaces are provided:
1. NumPy functions for raw arrays: ``ddx``, ``ddy``, ``ddp``, ``ddt``
2. xarray wrappers: ``ddx_da``, ``ddy_da``, ``ddp_da``, ``ddt_da``
"""
from __future__ import annotations
import numpy as np
import xarray as xr
from .constants import R_EARTH
# ═══════════════════════════════════════════════════════════════
# NumPy array operators
# ═══════════════════════════════════════════════════════════════
[docs]
def ddx(
field: np.ndarray,
dx_arr: np.ndarray,
periodic: bool = True,
) -> np.ndarray:
"""Zonal derivative ∂f/∂x with periodic wrapping.
Uses centred finite differences in the interior. At the zonal
boundaries the field wraps around the globe when *periodic* is
True (default); otherwise one-sided differences are applied.
Args:
field: 2-D array ``(nlat, nlon)`` or 3-D ``(nlev, nlat, nlon)``.
dx_arr: Zonal grid spacing per latitude [m], shape ``(nlat,)``.
periodic: If True, wrap zonally (col 0 ↔ col -1).
Returns:
Same shape as *field*, in units of ``[field_units / m]``.
"""
if field.ndim == 3:
return np.stack(
[ddx(field[k], dx_arr, periodic) for k in range(field.shape[0])]
)
nlat, nlon = field.shape
out = np.empty_like(field)
for j in range(nlat):
# Interior: centred differences
out[j, 1:-1] = (field[j, 2:] - field[j, :-2]) / (2 * dx_arr[j])
if periodic:
out[j, 0] = (field[j, 1] - field[j, -1]) / (2 * dx_arr[j])
out[j, -1] = (field[j, 0] - field[j, -2]) / (2 * dx_arr[j])
else:
out[j, 0] = (field[j, 1] - field[j, 0]) / dx_arr[j]
out[j, -1] = (field[j, -1] - field[j, -2]) / dx_arr[j]
return out
[docs]
def ddy(field: np.ndarray, dy: float) -> np.ndarray:
"""Meridional derivative ∂f/∂y with one-sided differences at boundaries.
Uses centred differences in the interior and forward/backward
differences at the first/last latitude row.
Args:
field: 2-D ``(nlat, nlon)`` or 3-D ``(nlev, nlat, nlon)``.
dy: Meridional grid spacing [m].
Returns:
Same shape as *field*, in ``[field_units / m]``.
"""
if field.ndim == 3:
return np.stack(
[ddy(field[k], dy) for k in range(field.shape[0])]
)
out = np.empty_like(field)
out[1:-1] = (field[2:] - field[:-2]) / (2 * dy)
out[0] = (field[1] - field[0]) / dy
out[-1] = (field[-1] - field[-2]) / dy
return out
[docs]
def ddp(field: np.ndarray, plevs_pa: np.ndarray) -> np.ndarray:
"""Pressure derivative ∂f/∂p for non-uniform levels.
Uses a three-point Lagrange stencil in the interior (second-order
accurate on non-uniform grids, matching LOG20 ``d_dp.m``) and
one-sided differences at the top/bottom pressure boundaries.
Args:
field: Array with pressure as axis 0, shape ``(nlev, ...)``.
plevs_pa: Pressure levels in Pa, shape ``(nlev,)``.
Returns:
Same shape as *field*, in ``[field_units / Pa]``.
"""
nlev = field.shape[0]
out = np.zeros_like(field)
# Interior: 3-point Lagrange stencil (second-order on non-uniform grids)
for k in range(1, nlev - 1):
h_m = plevs_pa[k] - plevs_pa[k - 1] # h_minus
h_p = plevs_pa[k + 1] - plevs_pa[k] # h_plus
out[k] = (
-h_p / (h_m * (h_m + h_p)) * field[k - 1]
+ (h_p - h_m) / (h_m * h_p) * field[k]
+ h_m / (h_p * (h_m + h_p)) * field[k + 1]
)
# Boundaries: one-sided differences
dp0 = plevs_pa[1] - plevs_pa[0]
dpn = plevs_pa[-1] - plevs_pa[-2]
out[0] = (field[1] - field[0]) / dp0
out[-1] = (field[-1] - field[-2]) / dpn
return out
[docs]
def ddt(field: np.ndarray, dt_s: float) -> np.ndarray:
"""Time derivative ∂f/∂t via centred differences.
Uses centred differences in the interior (second-order accurate)
and second-order one-sided stencils at the first and last time
steps when nt >= 3, matching ``numpy.gradient`` behaviour.
Falls back to first-order forward/backward when nt == 2.
Args:
field: Array with time as axis 0, shape ``(nt, ...)``.
dt_s: Time step in seconds (assumed uniform).
Returns:
Same shape as *field*, in ``[field_units / s]``.
"""
nt = field.shape[0]
out = np.zeros_like(field)
if nt >= 3:
# Interior: centred differences — O(Δt²)
out[1:-1] = (field[2:] - field[:-2]) / (2 * dt_s)
# Boundaries: 2nd-order one-sided stencils
out[0] = (-3 * field[0] + 4 * field[1] - field[2]) / (2 * dt_s)
out[-1] = (3 * field[-1] - 4 * field[-2] + field[-3]) / (2 * dt_s)
elif nt == 2:
# Only two points — first-order is the best we can do
out[0] = (field[1] - field[0]) / dt_s
out[-1] = (field[-1] - field[-2]) / dt_s
return out
def gradient_periodic(
phi: np.ndarray,
dx_arr: np.ndarray,
dy: float,
) -> tuple[np.ndarray, np.ndarray]:
"""Horizontal gradient with periodic zonal wrapping.
Convenience wrapper combining :func:`ddx` (periodic) and :func:`ddy`.
Args:
phi: 2-D field ``(nlat, nlon)``.
dx_arr: Zonal spacing per latitude [m], shape ``(nlat,)``.
dy: Meridional spacing [m].
Returns:
Tuple ``(dphi_dx, dphi_dy)``, each ``(nlat, nlon)``.
"""
return ddx(phi, dx_arr, periodic=True), ddy(phi, dy)
# ═══════════════════════════════════════════════════════════════
# Grid-spacing helpers
# ═══════════════════════════════════════════════════════════════
def compute_dx_arr(
grid_spacing_deg: float,
center_lat: float,
y_rel: np.ndarray,
) -> np.ndarray:
"""Build a cos(lat)-corrected zonal grid-spacing array.
Args:
grid_spacing_deg: Zonal grid spacing in degrees.
center_lat: Latitude of the patch centre (degrees N).
y_rel: 1-D array of relative y-coordinates (degrees),
running from south to north within the patch.
Returns:
1-D array of zonal grid spacing in metres, shape ``(len(y_rel),)``.
"""
actual_lats = float(center_lat) + np.asarray(y_rel, dtype=float)
return np.abs(np.deg2rad(grid_spacing_deg)) * R_EARTH * np.cos(
np.deg2rad(actual_lats)
)
def compute_dy(grid_spacing_deg: float) -> float:
"""Meridional grid spacing in metres (latitude-independent).
Args:
grid_spacing_deg: Meridional grid spacing in degrees.
Returns:
Grid spacing in metres.
"""
return np.abs(np.deg2rad(grid_spacing_deg)) * R_EARTH
# ═══════════════════════════════════════════════════════════════
# Second-order derivative operators
# ═══════════════════════════════════════════════════════════════
def ddx_dx(
field: np.ndarray,
dx_arr: np.ndarray,
periodic: bool = True,
) -> np.ndarray:
r"""Second zonal derivative :math:`\partial^2 f / \partial x^2`.
Computed by applying :func:`ddx` twice.
Args:
field: 2-D ``(nlat, nlon)`` or 3-D ``(nlev, nlat, nlon)``.
dx_arr: Zonal spacing per latitude [m], shape ``(nlat,)``.
periodic: Wrap zonally.
Returns:
Same shape as *field*, in ``[field_units / m²]``.
"""
return ddx(ddx(field, dx_arr, periodic), dx_arr, periodic)
def ddy_dy(field: np.ndarray, dy: float) -> np.ndarray:
r"""Second meridional derivative :math:`\partial^2 f / \partial y^2`.
Computed by applying :func:`ddy` twice.
Args:
field: 2-D ``(nlat, nlon)`` or 3-D ``(nlev, nlat, nlon)``.
dy: Meridional spacing [m].
Returns:
Same shape as *field*, in ``[field_units / m²]``.
"""
return ddy(ddy(field, dy), dy)
def ddx_dy(
field: np.ndarray,
dx_arr: np.ndarray,
dy: float,
periodic: bool = True,
) -> np.ndarray:
r"""Mixed second derivative :math:`\partial^2 f / \partial x \partial y`.
Computed as ``ddy(ddx(field))``.
Args:
field: 2-D ``(nlat, nlon)`` or 3-D ``(nlev, nlat, nlon)``.
dx_arr: Zonal spacing per latitude [m], shape ``(nlat,)``.
dy: Meridional spacing [m].
periodic: Wrap zonally for the :func:`ddx` step.
Returns:
Same shape as *field*, in ``[field_units / m²]``.
"""
return ddy(ddx(field, dx_arr, periodic), dy)
def second_derivs(
field: np.ndarray,
lat: np.ndarray,
lon: np.ndarray,
method: str = "spectral",
) -> tuple[np.ndarray, np.ndarray, np.ndarray]:
r"""Second-order horizontal derivatives ``(f_xx, f_xy, f_yy)``.
Default backend is spectral (spherical harmonics via
:func:`pvtend.sh_ops.second_derivs_sh`), which closes cleanly at the
poles and auto-mirrors NH-only inputs with scalar parity. The
``method="fd"`` path falls back to repeated finite-difference
application of :func:`ddx` / :func:`ddy` on a uniform lat-lon grid.
The output triple is what downstream basis projections (six-basis
decomposition, gamma/tilt diagnostics) consume, and is now stored
alongside the primary diagnostic in event NPZs.
Args:
field: Scalar field, shape ``(nlat, nlon)`` or
``(nlev, nlat, nlon)``.
lat: Latitudes in degrees, ascending.
lon: Longitudes in degrees in [0, 360).
method: ``"spectral"`` (default) or ``"fd"``.
Returns:
Tuple ``(f_xx, f_xy, f_yy)`` with the same shape as *field*.
"""
if method in ("spectral", "sh"):
from .sh_ops import second_derivs_sh, _SPHARM_AVAILABLE
if not _SPHARM_AVAILABLE:
import warnings
warnings.warn(
"pyspharm not installed; falling back to method='fd' for "
"second_derivs. Install via `pip install pvtend[sh]`.",
RuntimeWarning,
stacklevel=2,
)
method = "fd"
else:
if field.ndim == 3:
fxx = np.empty_like(field, dtype=np.float64)
fxy = np.empty_like(field, dtype=np.float64)
fyy = np.empty_like(field, dtype=np.float64)
for k in range(field.shape[0]):
fxx[k], fxy[k], fyy[k] = second_derivs_sh(field[k], lat, lon)
return fxx, fxy, fyy
return second_derivs_sh(field, lat, lon)
if method != "fd":
raise ValueError(f"Unknown method {method!r}; use 'spectral' or 'fd'.")
# ── FD fallback ──
lat = np.asarray(lat, dtype=float).ravel()
lon = np.asarray(lon, dtype=float).ravel()
dlat = np.abs(lat[1] - lat[0]) if lat.size > 1 else 1.5
dlon = np.abs(lon[1] - lon[0]) if lon.size > 1 else 1.5
dy = np.deg2rad(dlat) * R_EARTH
dx_arr = np.abs(np.deg2rad(dlon)) * R_EARTH * np.cos(np.deg2rad(lat))
dx_arr = np.maximum(dx_arr, dy * 0.1)
fxx = ddx_dx(field, dx_arr)
fxy = ddx_dy(field, dx_arr, dy)
fyy = ddy_dy(field, dy)
return fxx, fxy, fyy
# ═══════════════════════════════════════════════════════════════
# Angular-coordinate derivative operators (matching LOG20)
# ═══════════════════════════════════════════════════════════════
[docs]
def d_dlambda(
field: np.ndarray,
dlambda: float,
periodic: bool = True,
) -> np.ndarray:
"""Zonal derivative in angular coordinates ∂f/∂λ.
Matches LOG20 ``d_dlambda.m``: centred differences in the interior,
3-point one-sided stencils at boundaries (non-periodic), or periodic
wrapping.
Args:
field: 2-D ``(nlat, nlon)`` or 3-D ``(nlev, nlat, nlon)``.
dlambda: Zonal grid spacing in **radians**.
periodic: Wrap zonally (default True for full-globe grids).
Returns:
Same shape, in ``[field_units / radian]``.
"""
if field.ndim == 3:
return np.stack(
[d_dlambda(field[k], dlambda, periodic) for k in range(field.shape[0])]
)
nlat, nlon = field.shape
out = np.empty_like(field)
# Interior: centred differences along lon (axis 1)
out[:, 1:-1] = (field[:, 2:] - field[:, :-2]) / (2 * dlambda)
if periodic:
out[:, 0] = (field[:, 1] - field[:, -1]) / (2 * dlambda)
out[:, -1] = (field[:, 0] - field[:, -2]) / (2 * dlambda)
else:
# 3-point one-sided (LOG20 d_dlambda.m)
out[:, 0] = (
-1.5 * field[:, 0] + 2.0 * field[:, 1] - 0.5 * field[:, 2]
) / dlambda
out[:, -1] = (
1.5 * field[:, -1] - 2.0 * field[:, -2] + 0.5 * field[:, -3]
) / dlambda
return out
[docs]
def d_dphi(
field: np.ndarray,
dphi: float,
) -> np.ndarray:
"""Meridional derivative in angular coordinates ∂f/∂φ.
Matches LOG20 ``d_dphi.m``: centred differences in the interior,
3-point one-sided stencils at the polar boundaries.
Args:
field: 2-D ``(nlat, nlon)`` or 3-D ``(nlev, nlat, nlon)``.
dphi: Meridional grid spacing in **radians**.
Returns:
Same shape, in ``[field_units / radian]``.
"""
if field.ndim == 3:
return np.stack(
[d_dphi(field[k], dphi) for k in range(field.shape[0])]
)
out = np.empty_like(field)
# Interior: centred differences along lat (axis 0)
out[1:-1, :] = (field[2:, :] - field[:-2, :]) / (2 * dphi)
# Boundaries: 3-point one-sided (LOG20 d_dphi.m)
out[0, :] = (
-1.5 * field[0, :] + 2.0 * field[1, :] - 0.5 * field[2, :]
) / dphi
out[-1, :] = (
1.5 * field[-1, :] - 2.0 * field[-2, :] + 0.5 * field[-3, :]
) / dphi
return out
[docs]
def div_spherical(
Ax: np.ndarray,
Ay: np.ndarray,
lat_rad: np.ndarray,
dphi: float,
dlambda: float,
periodic: bool = True,
) -> np.ndarray:
"""Spherical divergence of a 2-D vector field (Aλ, Aφ).
Matches LOG20 ``div.m``:
.. math::
\\nabla\\cdot\\mathbf{A} =
\\frac{1}{a\\cos\\varphi}\\frac{\\partial A_\\lambda}{\\partial\\lambda}
+ \\frac{1}{a\\cos\\varphi}
\\frac{\\partial(A_\\varphi\\cos\\varphi)}{\\partial\\varphi}
Args:
Ax: Zonal component, 2-D ``(nlat, nlon)``.
Ay: Meridional component, 2-D ``(nlat, nlon)``.
lat_rad: Latitude in radians, shape ``(nlat,)``.
dphi: Meridional spacing [rad].
dlambda: Zonal spacing [rad].
periodic: Periodic zonal boundary.
Returns:
Scalar divergence field, ``(nlat, nlon)``.
"""
cos_phi = np.cos(lat_rad)[:, None] # (nlat, 1)
inv_r_cos = 1.0 / (R_EARTH * cos_phi) # (nlat, 1)
dAx_dlam = d_dlambda(Ax, dlambda, periodic)
dAy_cos_dphi = d_dphi(Ay * cos_phi, dphi)
return inv_r_cos * dAx_dlam + inv_r_cos * dAy_cos_dphi
# ═══════════════════════════════════════════════════════════════
# xarray DataArray wrappers
# ═══════════════════════════════════════════════════════════════
def ddx_da(da: xr.DataArray, lon_name: str = "longitude") -> xr.DataArray:
"""Periodic zonal derivative via ``DataArray.roll()``.
Uses centred differences with periodic roll so that the first and
last longitude columns reference each other.
Args:
da: xarray DataArray with a longitude dimension.
lon_name: Name of the longitude dimension.
Returns:
``d(da)/dx`` in SI units ``[field / m]``.
"""
lon_vals = da[lon_name].values.astype(float)
dlon_deg = float(np.nanmean(np.diff(lon_vals)))
# Compute dx from latitude
lat_name = _find_lat_dim(da)
lat_rad = np.deg2rad(da[lat_name].values)
dx_m = np.abs(np.deg2rad(dlon_deg)) * R_EARTH * np.cos(lat_rad)
# Broadcast dx_m to match da's dims
dx_da = xr.DataArray(dx_m, dims=[lat_name], coords={lat_name: da[lat_name]})
return (
da.roll({lon_name: -1}, roll_coords=False)
- da.roll({lon_name: 1}, roll_coords=False)
) / (2.0 * dx_da)
def ddy_da(da: xr.DataArray, lat_name: str = "latitude") -> xr.DataArray:
"""Meridional derivative ``d()/dy`` in SI units.
Uses ``xr.DataArray.differentiate`` along latitude, then converts
from degrees to metres.
Args:
da: xarray DataArray with a latitude dimension.
lat_name: Name of the latitude dimension.
Returns:
``d(da)/dy`` in ``[field / m]``.
"""
dy_m = np.deg2rad(1.0) * R_EARTH # metres per degree latitude
return da.differentiate(lat_name) / dy_m
def ddp_da(da: xr.DataArray, p_name: str = "pressure_level") -> xr.DataArray:
"""Pressure derivative ``d()/dp`` where coordinate is in hPa.
Uses ``xr.DataArray.differentiate`` along the pressure coordinate,
then converts from hPa to Pa (factor 100).
Args:
da: xarray DataArray with a pressure level dimension.
p_name: Name of the pressure dimension.
Returns:
``d(da)/dp`` in ``[field / Pa]``.
"""
return da.differentiate(p_name) / 100.0 # hPa → Pa
def ddt_da(da: xr.DataArray, t_name: str = "valid_time") -> xr.DataArray:
"""Time derivative ``d()/dt`` in s⁻¹.
Uses ``xr.DataArray.differentiate`` with ``datetime_unit='s'``.
Args:
da: xarray DataArray with a time dimension.
t_name: Name of the time dimension.
Returns:
``d(da)/dt`` in ``[field / s]``.
"""
return da.differentiate(coord=t_name, datetime_unit="s")
# ═══════════════════════════════════════════════════════════════
# Helpers
# ═══════════════════════════════════════════════════════════════
def _find_lat_dim(da: xr.DataArray) -> str:
"""Auto-detect latitude dimension name.
Args:
da: xarray DataArray to inspect.
Returns:
The name of the latitude dimension.
Raises:
ValueError: If no dimension containing ``'lat'`` is found.
"""
for name in da.dims:
if "lat" in name.lower():
return name
raise ValueError(f"No latitude dimension found in {da.dims}")