Source code for pvtend.derivatives

"""Spatial and temporal derivative operators on lat-lon-pressure grids.

All operators respect:
- Periodic zonal boundary conditions (globe wraps in longitude)
- One-sided differences at polar boundaries (meridional)
- Non-uniform pressure level spacing (pressure derivatives)

Two interfaces are provided:
1. NumPy functions for raw arrays: ``ddx``, ``ddy``, ``ddp``, ``ddt``
2. xarray wrappers: ``ddx_da``, ``ddy_da``, ``ddp_da``, ``ddt_da``
"""

from __future__ import annotations

import numpy as np
import xarray as xr

from .constants import R_EARTH


# ═══════════════════════════════════════════════════════════════
#  NumPy array operators
# ═══════════════════════════════════════════════════════════════


[docs] def ddx( field: np.ndarray, dx_arr: np.ndarray, periodic: bool = True, ) -> np.ndarray: """Zonal derivative ∂f/∂x with periodic wrapping. Uses centred finite differences in the interior. At the zonal boundaries the field wraps around the globe when *periodic* is True (default); otherwise one-sided differences are applied. Args: field: 2-D array ``(nlat, nlon)`` or 3-D ``(nlev, nlat, nlon)``. dx_arr: Zonal grid spacing per latitude [m], shape ``(nlat,)``. periodic: If True, wrap zonally (col 0 ↔ col -1). Returns: Same shape as *field*, in units of ``[field_units / m]``. """ if field.ndim == 3: return np.stack( [ddx(field[k], dx_arr, periodic) for k in range(field.shape[0])] ) nlat, nlon = field.shape out = np.empty_like(field) for j in range(nlat): # Interior: centred differences out[j, 1:-1] = (field[j, 2:] - field[j, :-2]) / (2 * dx_arr[j]) if periodic: out[j, 0] = (field[j, 1] - field[j, -1]) / (2 * dx_arr[j]) out[j, -1] = (field[j, 0] - field[j, -2]) / (2 * dx_arr[j]) else: out[j, 0] = (field[j, 1] - field[j, 0]) / dx_arr[j] out[j, -1] = (field[j, -1] - field[j, -2]) / dx_arr[j] return out
[docs] def ddy(field: np.ndarray, dy: float) -> np.ndarray: """Meridional derivative ∂f/∂y with one-sided differences at boundaries. Uses centred differences in the interior and forward/backward differences at the first/last latitude row. Args: field: 2-D ``(nlat, nlon)`` or 3-D ``(nlev, nlat, nlon)``. dy: Meridional grid spacing [m]. Returns: Same shape as *field*, in ``[field_units / m]``. """ if field.ndim == 3: return np.stack( [ddy(field[k], dy) for k in range(field.shape[0])] ) out = np.empty_like(field) out[1:-1] = (field[2:] - field[:-2]) / (2 * dy) out[0] = (field[1] - field[0]) / dy out[-1] = (field[-1] - field[-2]) / dy return out
[docs] def ddp(field: np.ndarray, plevs_pa: np.ndarray) -> np.ndarray: """Pressure derivative ∂f/∂p for non-uniform levels. Uses a three-point Lagrange stencil in the interior (second-order accurate on non-uniform grids, matching LOG20 ``d_dp.m``) and one-sided differences at the top/bottom pressure boundaries. Args: field: Array with pressure as axis 0, shape ``(nlev, ...)``. plevs_pa: Pressure levels in Pa, shape ``(nlev,)``. Returns: Same shape as *field*, in ``[field_units / Pa]``. """ nlev = field.shape[0] out = np.zeros_like(field) # Interior: 3-point Lagrange stencil (second-order on non-uniform grids) for k in range(1, nlev - 1): h_m = plevs_pa[k] - plevs_pa[k - 1] # h_minus h_p = plevs_pa[k + 1] - plevs_pa[k] # h_plus out[k] = ( -h_p / (h_m * (h_m + h_p)) * field[k - 1] + (h_p - h_m) / (h_m * h_p) * field[k] + h_m / (h_p * (h_m + h_p)) * field[k + 1] ) # Boundaries: one-sided differences dp0 = plevs_pa[1] - plevs_pa[0] dpn = plevs_pa[-1] - plevs_pa[-2] out[0] = (field[1] - field[0]) / dp0 out[-1] = (field[-1] - field[-2]) / dpn return out
[docs] def ddt(field: np.ndarray, dt_s: float) -> np.ndarray: """Time derivative ∂f/∂t via centred differences. Uses centred differences in the interior (second-order accurate) and second-order one-sided stencils at the first and last time steps when nt >= 3, matching ``numpy.gradient`` behaviour. Falls back to first-order forward/backward when nt == 2. Args: field: Array with time as axis 0, shape ``(nt, ...)``. dt_s: Time step in seconds (assumed uniform). Returns: Same shape as *field*, in ``[field_units / s]``. """ nt = field.shape[0] out = np.zeros_like(field) if nt >= 3: # Interior: centred differences — O(Δt²) out[1:-1] = (field[2:] - field[:-2]) / (2 * dt_s) # Boundaries: 2nd-order one-sided stencils out[0] = (-3 * field[0] + 4 * field[1] - field[2]) / (2 * dt_s) out[-1] = (3 * field[-1] - 4 * field[-2] + field[-3]) / (2 * dt_s) elif nt == 2: # Only two points — first-order is the best we can do out[0] = (field[1] - field[0]) / dt_s out[-1] = (field[-1] - field[-2]) / dt_s return out
def gradient_periodic( phi: np.ndarray, dx_arr: np.ndarray, dy: float, ) -> tuple[np.ndarray, np.ndarray]: """Horizontal gradient with periodic zonal wrapping. Convenience wrapper combining :func:`ddx` (periodic) and :func:`ddy`. Args: phi: 2-D field ``(nlat, nlon)``. dx_arr: Zonal spacing per latitude [m], shape ``(nlat,)``. dy: Meridional spacing [m]. Returns: Tuple ``(dphi_dx, dphi_dy)``, each ``(nlat, nlon)``. """ return ddx(phi, dx_arr, periodic=True), ddy(phi, dy) # ═══════════════════════════════════════════════════════════════ # Grid-spacing helpers # ═══════════════════════════════════════════════════════════════ def compute_dx_arr( grid_spacing_deg: float, center_lat: float, y_rel: np.ndarray, ) -> np.ndarray: """Build a cos(lat)-corrected zonal grid-spacing array. Args: grid_spacing_deg: Zonal grid spacing in degrees. center_lat: Latitude of the patch centre (degrees N). y_rel: 1-D array of relative y-coordinates (degrees), running from south to north within the patch. Returns: 1-D array of zonal grid spacing in metres, shape ``(len(y_rel),)``. """ actual_lats = float(center_lat) + np.asarray(y_rel, dtype=float) return np.abs(np.deg2rad(grid_spacing_deg)) * R_EARTH * np.cos( np.deg2rad(actual_lats) ) def compute_dy(grid_spacing_deg: float) -> float: """Meridional grid spacing in metres (latitude-independent). Args: grid_spacing_deg: Meridional grid spacing in degrees. Returns: Grid spacing in metres. """ return np.abs(np.deg2rad(grid_spacing_deg)) * R_EARTH # ═══════════════════════════════════════════════════════════════ # Second-order derivative operators # ═══════════════════════════════════════════════════════════════ def ddx_dx( field: np.ndarray, dx_arr: np.ndarray, periodic: bool = True, ) -> np.ndarray: r"""Second zonal derivative :math:`\partial^2 f / \partial x^2`. Computed by applying :func:`ddx` twice. Args: field: 2-D ``(nlat, nlon)`` or 3-D ``(nlev, nlat, nlon)``. dx_arr: Zonal spacing per latitude [m], shape ``(nlat,)``. periodic: Wrap zonally. Returns: Same shape as *field*, in ``[field_units / m²]``. """ return ddx(ddx(field, dx_arr, periodic), dx_arr, periodic) def ddy_dy(field: np.ndarray, dy: float) -> np.ndarray: r"""Second meridional derivative :math:`\partial^2 f / \partial y^2`. Computed by applying :func:`ddy` twice. Args: field: 2-D ``(nlat, nlon)`` or 3-D ``(nlev, nlat, nlon)``. dy: Meridional spacing [m]. Returns: Same shape as *field*, in ``[field_units / m²]``. """ return ddy(ddy(field, dy), dy) def ddx_dy( field: np.ndarray, dx_arr: np.ndarray, dy: float, periodic: bool = True, ) -> np.ndarray: r"""Mixed second derivative :math:`\partial^2 f / \partial x \partial y`. Computed as ``ddy(ddx(field))``. Args: field: 2-D ``(nlat, nlon)`` or 3-D ``(nlev, nlat, nlon)``. dx_arr: Zonal spacing per latitude [m], shape ``(nlat,)``. dy: Meridional spacing [m]. periodic: Wrap zonally for the :func:`ddx` step. Returns: Same shape as *field*, in ``[field_units / m²]``. """ return ddy(ddx(field, dx_arr, periodic), dy) def second_derivs( field: np.ndarray, lat: np.ndarray, lon: np.ndarray, method: str = "spectral", ) -> tuple[np.ndarray, np.ndarray, np.ndarray]: r"""Second-order horizontal derivatives ``(f_xx, f_xy, f_yy)``. Default backend is spectral (spherical harmonics via :func:`pvtend.sh_ops.second_derivs_sh`), which closes cleanly at the poles and auto-mirrors NH-only inputs with scalar parity. The ``method="fd"`` path falls back to repeated finite-difference application of :func:`ddx` / :func:`ddy` on a uniform lat-lon grid. The output triple is what downstream basis projections (six-basis decomposition, gamma/tilt diagnostics) consume, and is now stored alongside the primary diagnostic in event NPZs. Args: field: Scalar field, shape ``(nlat, nlon)`` or ``(nlev, nlat, nlon)``. lat: Latitudes in degrees, ascending. lon: Longitudes in degrees in [0, 360). method: ``"spectral"`` (default) or ``"fd"``. Returns: Tuple ``(f_xx, f_xy, f_yy)`` with the same shape as *field*. """ if method in ("spectral", "sh"): from .sh_ops import second_derivs_sh, _SPHARM_AVAILABLE if not _SPHARM_AVAILABLE: import warnings warnings.warn( "pyspharm not installed; falling back to method='fd' for " "second_derivs. Install via `pip install pvtend[sh]`.", RuntimeWarning, stacklevel=2, ) method = "fd" else: if field.ndim == 3: fxx = np.empty_like(field, dtype=np.float64) fxy = np.empty_like(field, dtype=np.float64) fyy = np.empty_like(field, dtype=np.float64) for k in range(field.shape[0]): fxx[k], fxy[k], fyy[k] = second_derivs_sh(field[k], lat, lon) return fxx, fxy, fyy return second_derivs_sh(field, lat, lon) if method != "fd": raise ValueError(f"Unknown method {method!r}; use 'spectral' or 'fd'.") # ── FD fallback ── lat = np.asarray(lat, dtype=float).ravel() lon = np.asarray(lon, dtype=float).ravel() dlat = np.abs(lat[1] - lat[0]) if lat.size > 1 else 1.5 dlon = np.abs(lon[1] - lon[0]) if lon.size > 1 else 1.5 dy = np.deg2rad(dlat) * R_EARTH dx_arr = np.abs(np.deg2rad(dlon)) * R_EARTH * np.cos(np.deg2rad(lat)) dx_arr = np.maximum(dx_arr, dy * 0.1) fxx = ddx_dx(field, dx_arr) fxy = ddx_dy(field, dx_arr, dy) fyy = ddy_dy(field, dy) return fxx, fxy, fyy # ═══════════════════════════════════════════════════════════════ # Angular-coordinate derivative operators (matching LOG20) # ═══════════════════════════════════════════════════════════════
[docs] def d_dlambda( field: np.ndarray, dlambda: float, periodic: bool = True, ) -> np.ndarray: """Zonal derivative in angular coordinates ∂f/∂λ. Matches LOG20 ``d_dlambda.m``: centred differences in the interior, 3-point one-sided stencils at boundaries (non-periodic), or periodic wrapping. Args: field: 2-D ``(nlat, nlon)`` or 3-D ``(nlev, nlat, nlon)``. dlambda: Zonal grid spacing in **radians**. periodic: Wrap zonally (default True for full-globe grids). Returns: Same shape, in ``[field_units / radian]``. """ if field.ndim == 3: return np.stack( [d_dlambda(field[k], dlambda, periodic) for k in range(field.shape[0])] ) nlat, nlon = field.shape out = np.empty_like(field) # Interior: centred differences along lon (axis 1) out[:, 1:-1] = (field[:, 2:] - field[:, :-2]) / (2 * dlambda) if periodic: out[:, 0] = (field[:, 1] - field[:, -1]) / (2 * dlambda) out[:, -1] = (field[:, 0] - field[:, -2]) / (2 * dlambda) else: # 3-point one-sided (LOG20 d_dlambda.m) out[:, 0] = ( -1.5 * field[:, 0] + 2.0 * field[:, 1] - 0.5 * field[:, 2] ) / dlambda out[:, -1] = ( 1.5 * field[:, -1] - 2.0 * field[:, -2] + 0.5 * field[:, -3] ) / dlambda return out
[docs] def d_dphi( field: np.ndarray, dphi: float, ) -> np.ndarray: """Meridional derivative in angular coordinates ∂f/∂φ. Matches LOG20 ``d_dphi.m``: centred differences in the interior, 3-point one-sided stencils at the polar boundaries. Args: field: 2-D ``(nlat, nlon)`` or 3-D ``(nlev, nlat, nlon)``. dphi: Meridional grid spacing in **radians**. Returns: Same shape, in ``[field_units / radian]``. """ if field.ndim == 3: return np.stack( [d_dphi(field[k], dphi) for k in range(field.shape[0])] ) out = np.empty_like(field) # Interior: centred differences along lat (axis 0) out[1:-1, :] = (field[2:, :] - field[:-2, :]) / (2 * dphi) # Boundaries: 3-point one-sided (LOG20 d_dphi.m) out[0, :] = ( -1.5 * field[0, :] + 2.0 * field[1, :] - 0.5 * field[2, :] ) / dphi out[-1, :] = ( 1.5 * field[-1, :] - 2.0 * field[-2, :] + 0.5 * field[-3, :] ) / dphi return out
[docs] def div_spherical( Ax: np.ndarray, Ay: np.ndarray, lat_rad: np.ndarray, dphi: float, dlambda: float, periodic: bool = True, ) -> np.ndarray: """Spherical divergence of a 2-D vector field (Aλ, Aφ). Matches LOG20 ``div.m``: .. math:: \\nabla\\cdot\\mathbf{A} = \\frac{1}{a\\cos\\varphi}\\frac{\\partial A_\\lambda}{\\partial\\lambda} + \\frac{1}{a\\cos\\varphi} \\frac{\\partial(A_\\varphi\\cos\\varphi)}{\\partial\\varphi} Args: Ax: Zonal component, 2-D ``(nlat, nlon)``. Ay: Meridional component, 2-D ``(nlat, nlon)``. lat_rad: Latitude in radians, shape ``(nlat,)``. dphi: Meridional spacing [rad]. dlambda: Zonal spacing [rad]. periodic: Periodic zonal boundary. Returns: Scalar divergence field, ``(nlat, nlon)``. """ cos_phi = np.cos(lat_rad)[:, None] # (nlat, 1) inv_r_cos = 1.0 / (R_EARTH * cos_phi) # (nlat, 1) dAx_dlam = d_dlambda(Ax, dlambda, periodic) dAy_cos_dphi = d_dphi(Ay * cos_phi, dphi) return inv_r_cos * dAx_dlam + inv_r_cos * dAy_cos_dphi
# ═══════════════════════════════════════════════════════════════ # xarray DataArray wrappers # ═══════════════════════════════════════════════════════════════ def ddx_da(da: xr.DataArray, lon_name: str = "longitude") -> xr.DataArray: """Periodic zonal derivative via ``DataArray.roll()``. Uses centred differences with periodic roll so that the first and last longitude columns reference each other. Args: da: xarray DataArray with a longitude dimension. lon_name: Name of the longitude dimension. Returns: ``d(da)/dx`` in SI units ``[field / m]``. """ lon_vals = da[lon_name].values.astype(float) dlon_deg = float(np.nanmean(np.diff(lon_vals))) # Compute dx from latitude lat_name = _find_lat_dim(da) lat_rad = np.deg2rad(da[lat_name].values) dx_m = np.abs(np.deg2rad(dlon_deg)) * R_EARTH * np.cos(lat_rad) # Broadcast dx_m to match da's dims dx_da = xr.DataArray(dx_m, dims=[lat_name], coords={lat_name: da[lat_name]}) return ( da.roll({lon_name: -1}, roll_coords=False) - da.roll({lon_name: 1}, roll_coords=False) ) / (2.0 * dx_da) def ddy_da(da: xr.DataArray, lat_name: str = "latitude") -> xr.DataArray: """Meridional derivative ``d()/dy`` in SI units. Uses ``xr.DataArray.differentiate`` along latitude, then converts from degrees to metres. Args: da: xarray DataArray with a latitude dimension. lat_name: Name of the latitude dimension. Returns: ``d(da)/dy`` in ``[field / m]``. """ dy_m = np.deg2rad(1.0) * R_EARTH # metres per degree latitude return da.differentiate(lat_name) / dy_m def ddp_da(da: xr.DataArray, p_name: str = "pressure_level") -> xr.DataArray: """Pressure derivative ``d()/dp`` where coordinate is in hPa. Uses ``xr.DataArray.differentiate`` along the pressure coordinate, then converts from hPa to Pa (factor 100). Args: da: xarray DataArray with a pressure level dimension. p_name: Name of the pressure dimension. Returns: ``d(da)/dp`` in ``[field / Pa]``. """ return da.differentiate(p_name) / 100.0 # hPa → Pa def ddt_da(da: xr.DataArray, t_name: str = "valid_time") -> xr.DataArray: """Time derivative ``d()/dt`` in s⁻¹. Uses ``xr.DataArray.differentiate`` with ``datetime_unit='s'``. Args: da: xarray DataArray with a time dimension. t_name: Name of the time dimension. Returns: ``d(da)/dt`` in ``[field / s]``. """ return da.differentiate(coord=t_name, datetime_unit="s") # ═══════════════════════════════════════════════════════════════ # Helpers # ═══════════════════════════════════════════════════════════════ def _find_lat_dim(da: xr.DataArray) -> str: """Auto-detect latitude dimension name. Args: da: xarray DataArray to inspect. Returns: The name of the latitude dimension. Raises: ValueError: If no dimension containing ``'lat'`` is found. """ for name in da.dims: if "lat" in name.lower(): return name raise ValueError(f"No latitude dimension found in {da.dims}")