Source code for pvtend.tendency

"""PV tendency term computation for weather events.

Orchestrates the full computation pipeline for a single event:

1. Load ERA5 data for the time window
2. Subtract climatology to get anomalies
3. Compute all spatial/temporal derivatives
4. FFT Helmholtz decomposition on the full NH hemisphere
5. QG omega solver → omega_dry
6. Moist/dry decomposition → omega_moist, chi_moist
7. Extract event-centred patches
8. Compute PV cross-terms and vertical weighted averages
9. Write per-timestep NPZ files

The :class:`TendencyComputer` class is parameterized by event type
(blocking / PRP), eliminating the 95 % code duplication between the
original scripts.  Ported from
``tempest_extreme_4_basis/core/step2_compute_tendency_terms_blocking.py``
and ``step2_compute_tendency_terms_prp.py``.
"""

from __future__ import annotations

import gc
import os
import tempfile
from dataclasses import dataclass, field
from functools import partial
from pathlib import Path
from typing import Sequence

import numpy as np
import pandas as pd
import xarray as xr
from scipy.ndimage import gaussian_filter

from .constants import (
    CP_DRY,
    DEFAULT_LEVELS,
    G0,
    H_SCALE,
    KAPPA,
    OMEGA_E,
    R_DRY,
    R_EARTH,
    WAVG_LEVELS,
    LAT_HALF,
    LON_HALF,
    GEO_SMOOTH_SIGMA,
    F_MIN_LAT,
    LAT_QG_LO,
    LAT_QG_HI,
    LAT_QG_POLAR,
    SP19_DRY_FRACTION,
    CLIM_VARIABLES,
    MONTH_ABBREVS,
)
from .derivatives import ddx, ddy, ddp, ddt
from .helmholtz import helmholtz_decomposition, solve_poisson_spherical_fft, gradient
from .climatology import load_helmholtz_climatology
from .omega import (
    solve_qg_omega_sip,
    _compute_diabatic_rhs_log20,
    _compute_diabatic_rhs_emanuel,
)

# Alias for brevity
LEVELS = DEFAULT_LEVELS

# ── Full list of variables stored in each NPZ ──────────────────────────
VARS_3D: list[str] = [
    "z", "pv", "u", "v", "w", "t", "q", "t_dt",
    "pv_dt", "pv_total_dx", "pv_total_dy", "pv_total_dp",
    "u_bar", "v_bar", "w_bar", "pv_bar",
    "u_anom", "v_anom", "w_anom", "pv_anom",
    "pv_bar_dx", "pv_bar_dy", "pv_bar_dp", "pv_bar_dt",
    "pv_anom_dx", "pv_anom_dy", "pv_anom_dp", "pv_anom_dt",
    "theta", "theta_dt", "theta_dot", "Q",
    # Total-wind Helmholtz (v2.0)
    "u_rot", "u_div", "v_rot", "v_div",
    # Climatological Helmholtz (v2.0)
    "u_rot_bar", "u_div_bar", "v_rot_bar", "v_div_bar",
    # Anomaly Helmholtz (total − clim)
    "u_rot_anom", "u_div_anom", "u_har_anom",
    "v_rot_anom", "v_div_anom", "v_har_anom",
    "u_div_diabatic", "v_div_diabatic",
    "u_div_adiabatic", "v_div_adiabatic",
    "u_div_qg_diabatic", "v_div_qg_diabatic",
    "w_adiabatic", "w_diabatic", "w_qg_diabatic",
    # Second-order PV derivatives (full field)
    "pv_total_dx_dx", "pv_total_dy_dy", "pv_total_dx_dy",
]

# Variables extracted from the DS for each patch
_EXTRACT_VARS = [
    "z", "pv", "u", "v", "w", "t", "q", "t_dt",
    "pv_dt", "pv_total_dx", "pv_total_dy", "pv_total_dp",
    "u_bar", "v_bar", "w_bar", "pv_bar",
    "u_anom", "v_anom", "w_anom", "pv_anom",
    "pv_bar_dx", "pv_bar_dy", "pv_bar_dp", "pv_bar_dt",
    "pv_anom_dx", "pv_anom_dy", "pv_anom_dp", "pv_anom_dt",
    "theta", "theta_dt", "theta_dot", "Q",
    # Total-wind Helmholtz (v2.0)
    "u_rot", "u_div", "v_rot", "v_div",
    # Climatological Helmholtz (v2.0)
    "u_rot_bar", "u_div_bar", "v_rot_bar", "v_div_bar",
    # Anomaly Helmholtz (total − clim)
    "u_rot_anom", "u_div_anom", "u_har_anom",
    "v_rot_anom", "v_div_anom", "v_har_anom",
    "z_bar", "t_bar",
    # Second-order PV derivatives
    "pv_total_dx_dx", "pv_total_dy_dy", "pv_total_dx_dy",
]


def _log(msg: str) -> None:
    """Print with flush."""
    print(msg, flush=True)


# ============================================================================
#  Config
# ============================================================================
[docs] @dataclass class TendencyConfig: """Configuration for PV tendency computation. Attributes: event_type: ``'blocking'`` or ``'prp'``. data_dir: Path to ERA5 monthly NetCDF files. clim_path: Path to climatology file or directory. clim_helmholtz_dir: Directory with pre-computed Helmholtz climatology files (from ``pvtend-pipeline clim-helmholtz``). output_dir: Root output directory for NPZ files. csv_path: Path to TempestExtremes event CSV. track_file: Path to tracking data file (for lagrangian mode). levels: Pressure levels [hPa]. wavg_levels: Subset of *levels* used for vertical weighted averaging. rel_hours: Relative hour offsets from the event reference time. year_start: First event year. year_end: Last event year. lat_half: Half-width of the extraction patch in degrees latitude. lon_half: Half-width of the extraction patch in degrees longitude. partial_at_pole: Allow truncated patches near the poles. qg_omega_method: ``'log20'`` (SIP) or ``'sp19'`` (empirical scaling). center_mode: ``'eulerian'`` or ``'lagrangian'``. skip_existing: Skip events with existing NPZ output. engine: NetCDF engine passed to xarray. n_workers: Number of parallel workers for multiprocessing. """ event_type: str = "blocking" data_dir: Path = Path("/net/flood/data2/users/x_yan/era") clim_path: Path = Path( "/net/flood/data2/users/x_yan/era/era5_hourly_clim_1990-2019.nc" ) clim_helmholtz_dir: Path = Path( "/net/flood/data2/users/x_yan/era/clim" ) output_dir: Path = Path( "/net/flood/data2/users/x_yan/composite_blocking_tempest" ) csv_path: Path = Path("") track_file: Path = Path("") levels: list[int] = field(default_factory=lambda: list(LEVELS)) wavg_levels: list[int] = field(default_factory=lambda: list(WAVG_LEVELS)) rel_hours: list[int] = field(default_factory=lambda: list(range(-49, 25))) year_start: int = 1990 year_end: int = 2020 lat_half: float = LAT_HALF lon_half: float = LON_HALF partial_at_pole: bool = True qg_omega_method: str = "log20" center_mode: str = "eulerian" skip_existing: bool = True engine: str = "netcdf4" n_workers: int = 1
# ============================================================================ # Climatology loading # ============================================================================ def _find_per_var_month_files(parent: Path, stem: str) -> list[Path]: """Discover per-variable-per-month climatology files.""" files = [] for m_abbr in MONTH_ABBREVS: for var in CLIM_VARIABLES: f_raw = parent / f"{stem}_{m_abbr}_{var}.nc" if f_raw.exists(): files.append(f_raw) return files
[docs] def load_climatology( clim_path: Path, engine: str = "netcdf4", ) -> xr.Dataset: """Load climatology, auto-detecting the file layout. Fallback order: 1. Single merged file 2. Per-var-per-month files 3. Per-variable files """ clim_path = Path(clim_path) if clim_path.is_file(): _log(f"Loading climatology from single file: {clim_path}") return xr.open_dataset(clim_path, chunks=None, engine=engine, lock=False) parent = clim_path.parent stem = clim_path.stem.replace("_allvars", "") pvm_files = _find_per_var_month_files(parent, stem) if pvm_files: _log(f"Loading climatology from {len(pvm_files)} " f"per-var-per-month files") return xr.open_mfdataset( [str(f) for f in pvm_files], chunks=None, engine=engine, lock=False, combine="by_coords", join="outer", ) per_var = sorted( f for f in parent.glob(f"{stem}_*.nc") if "_smoothed" not in f.stem and "_allvars" not in f.stem ) if per_var: _log(f"Loading climatology from {len(per_var)} per-variable files") return xr.open_mfdataset(per_var, chunks=None, engine=engine, lock=False) raise FileNotFoundError( f"Climatology missing: {clim_path} " f"(and no per-variable files matching '{stem}_*' in {parent})" )
# ============================================================================ # ERA5 data loading helpers # ============================================================================ def _ensure_valid_time(ds: xr.Dataset) -> xr.Dataset: if "valid_time" in ds.coords: return ds if "time" in ds.coords: return ds.rename({"time": "valid_time"}) raise KeyError("Neither 'valid_time' nor 'time' coordinate present.") def _drop_cds_artefacts(ds: xr.Dataset) -> xr.Dataset: to_drop = [v for v in ("number", "expver") if v in ds.coords or v in ds.data_vars] if to_drop: ds = ds.drop_vars(to_drop, errors="ignore") return ds def _plev_name(ds: xr.Dataset) -> str: for nm in ("pressure_level", "level"): if nm in ds.dims or nm in ds.coords: return nm raise KeyError("No pressure level dimension.")
[docs] def month_keys_for_window( base_ts: pd.Timestamp, hmin: int = -49, hmax: int = 24, ) -> list[tuple[int, int]]: """Determine which (year, month) files are needed for the time window.""" t0 = (pd.to_datetime(base_ts) + pd.Timedelta(hours=hmin)).to_period("M") t1 = (pd.to_datetime(base_ts) + pd.Timedelta(hours=hmax)).to_period("M") months = pd.period_range(t0, t1, freq="M") return [(int(p.year), int(p.month)) for p in months]
[docs] def open_months_ds( data_dir: Path, var_list: list[str], month_keys: list[tuple[int, int]], engine: str = "netcdf4", ) -> xr.Dataset: """Open multiple months of ERA5 data as a single dataset.""" data_dir = Path(data_dir) parts = [] for v in var_list: fns = [ str(data_dir / f"era5_{v}_{y}_{m:02d}.nc") for y, m in month_keys if (data_dir / f"era5_{v}_{y}_{m:02d}.nc").exists() ] if not fns: raise FileNotFoundError(f"No files for {v} in months {month_keys}") dsv = xr.open_mfdataset( fns, combine="by_coords", parallel=False, chunks=None, engine=engine, lock=False, ) dsv = _drop_cds_artefacts(dsv) dsv = _ensure_valid_time(dsv) if "level" in dsv.dims and "pressure_level" not in dsv.dims: dsv = dsv.rename({"level": "pressure_level"}) dsv = dsv[[v]] parts.append(dsv) ds = xr.merge(parts, compat="no_conflicts", join="inner") ds = ds.assign_coords( longitude=((ds.longitude + 180) % 360) - 180, ).sortby("longitude") return ds
# ============================================================================ # Tracking data for Lagrangian mode # ============================================================================ _TRACK_DF: pd.DataFrame | None = None def _load_track_data(track_file: Path) -> pd.DataFrame: global _TRACK_DF if _TRACK_DF is None: _log(f"Loading tracking data from {track_file}...") df = pd.read_csv( track_file, sep=r"\s+", names=["track_id", "step", "time", "centlat", "centlon", "area"], skiprows=1, ) df["time"] = df["time"].str.strip('"') df["time"] = pd.to_datetime(df["time"]) _TRACK_DF = df return _TRACK_DF
[docs] def get_tracked_center( track_file: Path, track_id: int, target_time: pd.Timestamp, ) -> tuple[float | None, float | None]: """Look up Lagrangian centre from tracking data.""" df = _load_track_data(track_file) mask = (df["track_id"] == track_id) & (df["time"] == target_time) matches = df[mask] if len(matches) == 0: return None, None row = matches.iloc[0] lat = float(row["centlat"]) lon = float(row["centlon"]) if lon > 180: lon -= 360 return lat, lon
# ============================================================================ # Geostrophic wind & gradient helpers # ============================================================================ def _gaussian_smooth_2d( field_2d: np.ndarray, sigma: float = GEO_SMOOTH_SIGMA, ) -> np.ndarray: """NaN-tolerant Gaussian smoothing via normalised convolution.""" mask = np.isnan(field_2d) filled = field_2d.copy() filled[mask] = 0.0 weights = np.ones_like(field_2d) weights[mask] = 0.0 s_field = gaussian_filter(filled, sigma=sigma, mode="wrap") s_weight = gaussian_filter(weights, sigma=sigma, mode="wrap") s_weight[s_weight < 1e-10] = np.nan return s_field / s_weight def _grad_np_periodic_x( phi: np.ndarray, dx_arr: np.ndarray, dy: float, ) -> tuple[np.ndarray, np.ndarray]: """Gradient with periodic zonal boundary, one-sided meridional at poles.""" nlat, nlon = phi.shape dphi_dx = np.empty_like(phi) dphi_dy = np.empty_like(phi) for j in range(nlat): dphi_dx[j, 1:-1] = (phi[j, 2:] - phi[j, :-2]) / (2 * dx_arr[j]) dphi_dx[j, 0] = (phi[j, 1] - phi[j, -1]) / (2 * dx_arr[j]) dphi_dx[j, -1] = (phi[j, 0] - phi[j, -2]) / (2 * dx_arr[j]) dphi_dy[1:-1] = (phi[2:] - phi[:-2]) / (2 * dy) dphi_dy[0] = (phi[1] - phi[0]) / dy dphi_dy[-1] = (phi[-1] - phi[-2]) / dy return dphi_dx, dphi_dy def _compute_geostrophic_wind( phi_3d: np.ndarray, lat: np.ndarray, lon: np.ndarray, sigma_smooth: int = 0, ) -> tuple[np.ndarray, np.ndarray]: """Geostrophic wind (u_g, v_g) from geopotential Φ on ascending lat.""" nlev, nlat, nlon = phi_3d.shape lat_rad = np.deg2rad(lat) f_arr = 2 * OMEGA_E * np.sin(lat_rad) f_min = 2 * OMEGA_E * np.sin(np.deg2rad(F_MIN_LAT)) f_arr = np.where(np.abs(f_arr) < f_min, np.sign(f_arr + 1e-30) * f_min, f_arr) dlat = np.abs(lat[1] - lat[0]) if nlat > 1 else 1.5 dlon = np.abs(lon[1] - lon[0]) if nlon > 1 else 1.5 dy = np.deg2rad(dlat) * R_EARTH dx_arr = np.deg2rad(dlon) * R_EARTH * np.cos(lat_rad) dx_arr = np.maximum(dx_arr, dy * 0.01) u_g = np.zeros_like(phi_3d) v_g = np.zeros_like(phi_3d) for k in range(nlev): phi_k = phi_3d[k] if sigma_smooth > 0: phi_k = _gaussian_smooth_2d(phi_k, sigma=sigma_smooth) dphi_dx, dphi_dy = _grad_np_periodic_x(phi_k, dx_arr, dy) for j in range(nlat): u_g[k, j, :] = -dphi_dy[j, :] / f_arr[j] v_g[k, j, :] = dphi_dx[j, :] / f_arr[j] return u_g, v_g # ============================================================================ # Velocity potential solver (full NH, spherical Laplacian) # ============================================================================ def _solve_chi_nh( omega_nh: np.ndarray, lat_nh: np.ndarray, lon_nh: np.ndarray, plevs_pa: np.ndarray, method: str = "spectral", ) -> tuple[np.ndarray, np.ndarray]: """Solve ∇²χ = -∂ω/∂p on full NH, return u/v divergent wind. Default uses the spherical-harmonic Poisson inverse via :func:`pvtend.sh_ops.invert_laplacian_sh` with NH→global parity mirroring (scalar parity for χ), giving pole-closed χ at 90°N. Pass ``method="fd"`` to fall back to the legacy spherical-FFT Poisson solver. """ nlev, _, _ = omega_nh.shape rhs = -ddp(omega_nh, plevs_pa) if method in ("spectral", "sh"): from .sh_ops import invert_laplacian_sh, gradient_sh, _SPHARM_AVAILABLE if not _SPHARM_AVAILABLE: import warnings warnings.warn( "pyspharm not installed; falling back to FD for " "u_div_from_omega. Install via `pip install pvtend[sh]`.", RuntimeWarning, stacklevel=2, ) method = "fd" else: u_div_nh = np.zeros_like(omega_nh) v_div_nh = np.zeros_like(omega_nh) for k in range(nlev): chi_k = invert_laplacian_sh( rhs[k], lat_nh, lon_nh, R_earth=R_EARTH, parity="scalar", ) dchi_dx, dchi_dy = gradient_sh(chi_k, lat_nh, lon_nh, R_earth=R_EARTH) u_div_nh[k] = dchi_dx v_div_nh[k] = dchi_dy return u_div_nh, v_div_nh # ── Legacy FD path (preserved for regression) ── lat_rad = np.deg2rad(lat_nh) dlat = float(np.abs(np.diff(lat_nh).mean())) dlon = float(np.abs(np.diff(lon_nh).mean())) dy = np.deg2rad(dlat) * R_EARTH dx_arr = np.deg2rad(dlon) * R_EARTH * np.cos(lat_rad) dx_arr = np.maximum(dx_arr, dy * 0.1) dlon_rad = np.deg2rad(dlon) cos_phi = np.cos(lat_rad) area_weights = cos_phi / cos_phi.sum() nlon = omega_nh.shape[2] for k in range(nlev): weighted_mean = np.sum(area_weights[:, None] * rhs[k]) / nlon rhs[k] -= weighted_mean u_div_nh = np.zeros_like(omega_nh) v_div_nh = np.zeros_like(omega_nh) for k in range(nlev): chi_k = solve_poisson_spherical_fft( rhs[k], lat_nh, dy, dlon_rad, R_earth=R_EARTH ) dchi_dx, dchi_dy = gradient(chi_k, dx_arr, dy) u_div_nh[k] = dchi_dx v_div_nh[k] = dchi_dy return u_div_nh, v_div_nh # Backward-compatible alias _solve_chi_moist_nh = _solve_chi_nh # ============================================================================ # Patch-level QG omega + moist/dry decomposition # ============================================================================ def _qg_diabatic_adiabatic_on_patch( cube3d: dict[str, np.ndarray], lat_vec: np.ndarray, lon_vec: np.ndarray, plevs_hpa: np.ndarray, center_lat: float, qg_method: str = "log20", nh_data: dict | None = None, ) -> None: """QG omega + 4-way adiabatic/diabatic decomposition on the event patch. When *qg_method* is ``"log20"`` (default), performs three full SIP solves on the NH domain to separate vertical velocity into four components: ω_adiabatic = QG omega (terms A+B only, no diabatic forcing) ω_qg_diabatic = QG omega (A+B+C_log20) − ω_adiabatic ω_lhr_moist = QG omega (A+B+C_em) − ω_adiabatic ω_diabatic = ω_total − ω_adiabatic [total diabatic residual] C_log20 uses the full LOG20 J = J₁+J₂ with spherical Laplacian. C_em uses the Emanuel LHR formulation J_em = c_p θ̇_LHR T/θ. When *qg_method* is ``"sp19"`` (Steinfeld & Pfahl 2019), uses the empirical 1/3–2/3 scaling (no elliptic solve): ω_adiabatic = (1/3) ω_total ω_diabatic = ω_qg_diabatic = (2/3) ω_total For each omega component the divergent wind is recovered via Poisson inversion: ∇²χ = −∂ω/∂p → (u_div, v_div) = ∇χ Modifies *cube3d* in-place, adding: w_adiabatic, w_diabatic, w_qg_diabatic, w_lhr_moist, u_div_diabatic, v_div_diabatic, u_div_adiabatic, v_div_adiabatic, u_div_qg_diabatic, v_div_qg_diabatic, u_div_lhr_moist, v_div_lhr_moist """ nlevs, nlat, nlon = cube3d["z"].shape valid = ~np.isnan(lat_vec) n_valid = int(valid.sum()) if n_valid < 3: zeros = np.zeros((nlevs, nlat, nlon), dtype=np.float32) for k in ("w_adiabatic", "w_diabatic", "w_qg_diabatic", "w_lhr_moist", "u_div_diabatic", "v_div_diabatic", "u_div_adiabatic", "v_div_adiabatic", "u_div_qg_diabatic", "v_div_qg_diabatic", "u_div_lhr_moist", "v_div_lhr_moist"): cube3d[k] = zeros.copy() return lat_v = lat_vec[valid] psort = np.argsort(plevs_hpa) plevs_pa = plevs_hpa[psort] * 100.0 def pick(arr3d): return np.nan_to_num(arr3d[psort][:, valid, :], nan=0.0) def unpack(arr_sv): out = np.zeros((nlevs, nlat, nlon), dtype=np.float32) for ki, si in enumerate(psort): out[si, valid, :] = arr_sv[ki] return out z_sv = pick(cube3d["z"]) t_sv = pick(cube3d["t"]) w_sv = pick(cube3d["w"]) ug, vg = _compute_geostrophic_wind(z_sv, lat_v, lon_vec) # ---- SP19: empirical 1/3 dry, 2/3 moist (no elliptic solve) ---- if qg_method == "sp19": from .constants import SP19_DRY_FRACTION cube3d["w_adiabatic"] = SP19_DRY_FRACTION * cube3d["w"] cube3d["w_diabatic"] = cube3d["w"] - cube3d["w_adiabatic"] cube3d["w_qg_diabatic"] = cube3d["w_diabatic"].copy() # Poisson inversions on full NH for divergent wind recovery if nh_data is None: raise ValueError( "nh_data is required for divergent-wind Poisson inversion" ) lat_nh = nh_data["lat"] lon_nh = nh_data["lon"] if lat_nh[0] > lat_nh[-1]: lat_nh_asc = lat_nh[::-1] flip_nh = True else: lat_nh_asc = lat_nh flip_nh = False def _prep_sp19(arr3d): out = arr3d[psort] if flip_nh: out = out[:, ::-1, :] return np.nan_to_num(out, nan=0.0) w_nh = _prep_sp19(nh_data["w"]) w_adiabatic_nh = SP19_DRY_FRACTION * w_nh w_diabatic_nh = w_nh - w_adiabatic_nh udm_nh, vdm_nh = _solve_chi_nh( w_diabatic_nh, lat_nh_asc, lon_nh, plevs_pa) udd_nh, vdd_nh = _solve_chi_nh( w_adiabatic_nh, lat_nh_asc, lon_nh, plevs_pa) lat_idx = np.array([np.argmin(np.abs(lat_nh_asc - la)) for la in lat_v]) def _circ_nearest_sp19(lv): d = np.abs((lon_nh - lv + 180) % 360 - 180) return int(np.argmin(d)) lon_idx = np.array([_circ_nearest_sp19(lo) for lo in lon_vec]) ix = np.ix_(np.arange(w_adiabatic_nh.shape[0]), lat_idx, lon_idx) cube3d["u_div_diabatic"] = unpack(udm_nh[ix]) cube3d["v_div_diabatic"] = unpack(vdm_nh[ix]) cube3d["u_div_qg_diabatic"] = cube3d["u_div_diabatic"].copy() cube3d["v_div_qg_diabatic"] = cube3d["v_div_diabatic"].copy() cube3d["u_div_adiabatic"] = unpack(udd_nh[ix]) cube3d["v_div_adiabatic"] = unpack(vdd_nh[ix]) return # ---- LOG20 (default): Full SIP solve on NH domain ---- # nh_data is required — all solves run on the full NH domain if nh_data is None: raise ValueError( "nh_data is required for _qg_diabatic_adiabatic_on_patch; " "local-patch fallback has been removed" ) # --- Full NH solves (spherical Poisson, periodic zonal BCs) --- lat_nh = nh_data["lat"] lon_nh = nh_data["lon"] if lat_nh[0] > lat_nh[-1]: lat_nh_asc = lat_nh[::-1] flip_nh = True else: lat_nh_asc = lat_nh flip_nh = False def _prep(arr3d): out = arr3d[psort] if flip_nh: out = out[:, ::-1, :] return np.nan_to_num(out, nan=0.0) z_nh = _prep(nh_data["z"]) t_nh = _prep(nh_data["t"]) w_nh = _prep(nh_data["w"]) u_nh = _prep(nh_data["u"]) v_nh = _prep(nh_data["v"]) ug_nh, vg_nh = _compute_geostrophic_wind(z_nh, lat_nh_asc, lon_nh) # --- Compute local 3-D static stability for LOG20 J₂ --- nlev_nh = t_nh.shape[0] kappa_s = R_DRY / CP_DRY sigma_3d_nh = np.zeros_like(t_nh) for k in range(1, nlev_nh - 1): dp_s = plevs_pa[k + 1] - plevs_pa[k - 1] th_kp1 = t_nh[k + 1] * (1e5 / plevs_pa[k + 1]) ** kappa_s th_km1 = t_nh[k - 1] * (1e5 / plevs_pa[k - 1]) ** kappa_s dlnt = np.log(th_kp1) - np.log(th_km1) sigma_3d_nh[k] = -(R_DRY * t_nh[k] / plevs_pa[k]) * (dlnt / dp_s) sigma_3d_nh[0] = sigma_3d_nh[1] sigma_3d_nh[-1] = sigma_3d_nh[-2] sigma_3d_nh = np.maximum(sigma_3d_nh, 1e-7) # ── Solve 1: QG omega terms A+B → ω_dry ── od_nh, _ = solve_qg_omega_sip( ug_nh, vg_nh, t_nh, lat_nh_asc, lon_nh, plevs_pa, center_lat=center_lat, omega_b=w_nh, phi_3d=z_nh, bc_top=0.0, bc_bot=0.0) # ── Solve 2: Direct C-only QG (LOG20 full J) → ω_qg_diabatic ── # Exploits operator linearity: solve(C) = solve(A+B+C) - solve(A+B) # verified in research_questions/09_qg_moist_linearity to machine precision. # Zero wind → A=B=0; zero lateral BCs (omega_b=None). dTdt_raw = nh_data.get("t_dt") u_zero = np.zeros_like(u_nh) v_zero = np.zeros_like(v_nh) if dTdt_raw is not None: dTdt_nh = _prep(dTdt_raw) C_log20 = _compute_diabatic_rhs_log20( t_nh, dTdt_nh, u_nh, v_nh, w_nh, sigma_3d_nh, plevs_pa, lat_nh_asc, lon_nh) w_qg_diabatic_nh, _ = solve_qg_omega_sip( u_zero, v_zero, t_nh, lat_nh_asc, lon_nh, plevs_pa, center_lat=center_lat, omega_b=None, rhs_c=C_log20, phi_3d=z_nh, bc_top=0.0, bc_bot=0.0) else: w_qg_diabatic_nh = np.zeros_like(od_nh) # ── Solve 3: Direct C-only Emanuel (LHR) → ω_lhr_moist ── tdot_raw = nh_data.get("theta_dot") theta_raw = nh_data.get("theta") if tdot_raw is not None and theta_raw is not None: tdot_nh = _prep(tdot_raw) theta_nh = _prep(theta_raw) C_em = _compute_diabatic_rhs_emanuel( tdot_nh, t_nh, theta_nh, plevs_pa, lat_nh_asc, lon_nh) w_em_diabatic_nh, _ = solve_qg_omega_sip( u_zero, v_zero, t_nh, lat_nh_asc, lon_nh, plevs_pa, center_lat=center_lat, omega_b=None, rhs_c=C_em, phi_3d=z_nh, bc_top=0.0, bc_bot=0.0) else: w_em_diabatic_nh = np.zeros_like(od_nh) # Diabatic omega: ERA5 ω − ω_adiabatic (observational residual) w_diabatic_nh = w_nh - od_nh # w_qg_diabatic_nh and w_em_diabatic_nh already from direct C-only solves # Independent Poisson inversions on full NH (spherical Laplacian) udm_nh, vdm_nh = _solve_chi_nh( w_diabatic_nh, lat_nh_asc, lon_nh, plevs_pa) udd_nh, vdd_nh = _solve_chi_nh( od_nh, lat_nh_asc, lon_nh, plevs_pa) udqm_nh, vdqm_nh = _solve_chi_nh( w_qg_diabatic_nh, lat_nh_asc, lon_nh, plevs_pa) udem_nh, vdem_nh = _solve_chi_nh( w_em_diabatic_nh, lat_nh_asc, lon_nh, plevs_pa) # Extract patch from full NH solutions lat_idx = np.array([np.argmin(np.abs(lat_nh_asc - la)) for la in lat_v]) def _circ_nearest(lv): d = np.abs((lon_nh - lv + 180) % 360 - 180) return int(np.argmin(d)) lon_idx = np.array([_circ_nearest(lo) for lo in lon_vec]) ix = np.ix_(np.arange(od_nh.shape[0]), lat_idx, lon_idx) od = od_nh[ix] wqm_sv = w_qg_diabatic_nh[ix] wem_sv = w_em_diabatic_nh[ix] udm_sv = udm_nh[ix] vdm_sv = vdm_nh[ix] udd_sv = udd_nh[ix] vdd_sv = vdd_nh[ix] udqm_sv = udqm_nh[ix] vdqm_sv = vdqm_nh[ix] udem_sv = udem_nh[ix] vdem_sv = vdem_nh[ix] # --- Unpack & store --- cube3d["w_adiabatic"] = unpack(od) cube3d["w_diabatic"] = cube3d["w"] - cube3d["w_adiabatic"] cube3d["w_qg_diabatic"] = unpack(wqm_sv) cube3d["w_lhr_moist"] = unpack(wem_sv) cube3d["u_div_diabatic"] = unpack(udm_sv) cube3d["v_div_diabatic"] = unpack(vdm_sv) cube3d["u_div_qg_diabatic"] = unpack(udqm_sv) cube3d["v_div_qg_diabatic"] = unpack(vdqm_sv) cube3d["u_div_lhr_moist"] = unpack(udem_sv) cube3d["v_div_lhr_moist"] = unpack(vdem_sv) cube3d["u_div_adiabatic"] = unpack(udd_sv) cube3d["v_div_adiabatic"] = unpack(vdd_sv) # ============================================================================ # with_derivs_for_window (the main "big array" builder) # ============================================================================
[docs] def with_derivs_for_window( base_ts: pd.Timestamp, cfg: TendencyConfig, clim_ds: xr.Dataset, ) -> xr.Dataset: """Open data for base_ts window, compute all bars/anoms/derivatives, Helmholtz decomposition on full NH. Args: base_ts: Event reference timestamp. cfg: Tendency configuration. clim_ds: Pre-loaded climatology dataset. Returns: xr.Dataset with all original + derived fields on the ERA5 grid. """ month_keys = month_keys_for_window( base_ts, hmin=cfg.rel_hours[0], hmax=cfg.rel_hours[-1]) ds = open_months_ds(cfg.data_dir, ["u", "v", "w", "pv", "z", "t", "q"], month_keys, engine=cfg.engine) # --- lat metrics & Coriolis --- ds["latitude_rad"] = np.deg2rad(ds.latitude) lat_rad_vals = ds["latitude_rad"].values.copy() lat_rad_vals[np.abs(lat_rad_vals - np.pi / 2) < 0.01] = np.pi / 2 - 0.01 ds["latitude_rad"] = xr.DataArray(lat_rad_vals, dims=["latitude"]) ds["f"] = 2 * OMEGA_E * np.sin(ds["latitude_rad"]) # --- climatology --- CLIM = clim_ds mo = ds.valid_time.dt.month dy = ds.valid_time.dt.day hr = ds.valid_time.dt.hour ds["pv_bar"] = CLIM["pv"].sel(month=mo, day=dy, hour=hr) ds["u_bar"] = CLIM["u"].sel(month=mo, day=dy, hour=hr) ds["v_bar"] = CLIM["v"].sel(month=mo, day=dy, hour=hr) ds["w_bar"] = CLIM["w"].sel(month=mo, day=dy, hour=hr) ds["z_bar"] = CLIM["z"].sel(month=mo, day=dy, hour=hr) ds["t_bar"] = CLIM["t"].sel(month=mo, day=dy, hour=hr) ds["pv_anom"] = ds["pv"] - ds["pv_bar"] ds["u_anom"] = ds["u"] - ds["u_bar"] ds["v_anom"] = ds["v"] - ds["v_bar"] ds["w_anom"] = ds["w"] - ds["w_bar"] # --- grid spacings --- dy_m = 2 * np.pi * R_EARTH / 360 plev = _plev_name(ds) # --- Derivative helpers --- def _ddx_periodic_da(da, lon_name="longitude"): lon_vals = da[lon_name].values.astype(float) dlon_deg = float(np.nanmean(np.diff(lon_vals))) dx_m = np.deg2rad(abs(dlon_deg)) * R_EARTH * np.cos(ds["latitude_rad"]) return (da.roll({lon_name: -1}, roll_coords=False) - da.roll({lon_name: 1}, roll_coords=False)) / (2.0 * dx_m) def _ddy_da(da, lat_name="latitude"): return da.differentiate(lat_name) / dy_m def _ddp_da(da, p_name=plev): return da.differentiate(p_name) / 100.0 def _ddt_da(da, t_name="valid_time"): return da.differentiate(coord=t_name, datetime_unit="s") # --- PV derivatives --- ds["pv_anom_dx"] = _ddx_periodic_da(ds.pv_anom) ds["pv_anom_dy"] = _ddy_da(ds.pv_anom) ds["pv_anom_dp"] = _ddp_da(ds.pv_anom) ds["pv_bar_dx"] = _ddx_periodic_da(ds.pv_bar) ds["pv_bar_dy"] = _ddy_da(ds.pv_bar) ds["pv_bar_dp"] = _ddp_da(ds.pv_bar) ds["pv_total_dx"] = _ddx_periodic_da(ds.pv) ds["pv_total_dy"] = _ddy_da(ds.pv) ds["pv_total_dp"] = _ddp_da(ds.pv) ds["pv_anom_dt"] = _ddt_da(ds.pv_anom) ds["pv_bar_dt"] = _ddt_da(ds.pv_bar) ds["pv_dt"] = _ddt_da(ds.pv) # --- Second-order PV derivatives (full field, for 6-basis decomposition) --- ds["pv_total_dx_dx"] = _ddx_periodic_da(ds.pv_total_dx) ds["pv_total_dy_dy"] = _ddy_da(ds.pv_total_dy) ds["pv_total_dx_dy"] = _ddy_da(ds.pv_total_dx) # --- θ and Q terms (Emanuel 1987 / Tamarin & Kaspi 2016 LHR) --- kappa = 0.286 L_V = 2.501e6 # latent heat of vapourisation [J/kg] R_V = 461.5 # gas constant for water vapour [J/(kg·K)] gamma_d = G0 / CP_DRY # dry adiabatic lapse rate [K/m] ds["theta"] = ds["t"] * (1000.0 / ds[plev]) ** kappa ds["theta_dt"] = _ddt_da(ds["theta"]) # local tendency (diagnostic) ds["theta_dp"] = _ddp_da(ds.theta) # Eulerian temperature tendency (proxy for diabatic heating J/Cp) ds["t_dt"] = _ddt_da(ds["t"]) # saturation vapour pressure (Bolton 1980) and specific humidity p_pa = ds[plev] * 100.0 # hPa → Pa es = 611.2 * np.exp(17.67 * (ds["t"] - 273.15) / (ds["t"] - 29.65)) qs = 0.622 * es / (p_pa - 0.378 * es) # equivalent potential temperature (uses actual q from ERA5) ds["theta_e"] = ds["theta"] * np.exp(L_V * ds["q"] / (CP_DRY * ds["t"])) ds["theta_e_dp"] = _ddp_da(ds["theta_e"]) # moist adiabatic lapse rate gamma_m = gamma_d * ((1.0 + L_V * qs / (R_DRY * ds["t"])) / (1.0 + L_V**2 * qs / (CP_DRY * R_V * ds["t"]**2))) # LHR diabatic heating rate (only where ω < 0, i.e. ascending) theta_dot_raw = ds["w"] * ( ds["theta_dp"] - (gamma_m / gamma_d) * (ds["theta"] / ds["theta_e"]) * ds["theta_e_dp"] ) ds["theta_dot"] = theta_dot_raw.where(ds["w"] < 0, 0.0) ds["theta_dot_dp"] = _ddp_da(ds["theta_dot"]) # relative vorticity ζ = ∂v/∂x − ∂u/∂y ds["v_dx"] = _ddx_periodic_da(ds.v) ds["u_dy"] = _ddy_da(ds.u) ds["zeta"] = ds["v_dx"] - ds["u_dy"] # Q = −g(f + ζ) ∂θ̇_LHR/∂p (vertical stretching only) ds["Q"] = -G0 * (ds["f"] + ds["zeta"]) * ds["theta_dot_dp"] ds = ds.assign_coords( longitude=((ds.longitude + 180) % 360) - 180, ).sortby("longitude") # ================================================================ # FFT Helmholtz on the TOTAL NH wind field (v2.0) # ================================================================ lat_nh = ds.latitude.values lon_nh = ds.longitude.values if lat_nh[0] > lat_nh[-1]: lat_asc = lat_nh[::-1] flip_lat = True else: lat_asc = lat_nh flip_lat = False ntimes = ds.sizes["valid_time"] nlevs = ds.sizes[plev] nlat_nh = ds.sizes["latitude"] nlon_nh = ds.sizes["longitude"] shape_4d = (ntimes, nlevs, nlat_nh, nlon_nh) u_rot_all = np.zeros(shape_4d, dtype=np.float32) u_div_all = np.zeros(shape_4d, dtype=np.float32) u_har_all = np.zeros(shape_4d, dtype=np.float32) v_rot_all = np.zeros(shape_4d, dtype=np.float32) v_div_all = np.zeros(shape_4d, dtype=np.float32) v_har_all = np.zeros(shape_4d, dtype=np.float32) # Helmholtz on total (u, v) — not (u_anom, v_anom) u_total_vals = ds["u"].values v_total_vals = ds["v"].values for ti in range(ntimes): for li in range(nlevs): u2d = u_total_vals[ti, li] v2d = v_total_vals[ti, li] if flip_lat: u2d = u2d[::-1] v2d = v2d[::-1] # method="spectral" → SH inversion via pyspharm/windspharm with # NH→global parity mirroring (u even, v odd), giving pole-closed # ψ/χ at 90 °N. Falls back to the conservative-form spherical-FFT # solver if pyspharm is not installed (warning emitted). helm = helmholtz_decomposition( u2d, v2d, lat_asc, lon_nh, R_earth=R_EARTH, method="spectral") if flip_lat: for key in ("u_rot", "u_div", "u_har", "v_rot", "v_div", "v_har"): helm[key] = helm[key][::-1] u_rot_all[ti, li] = helm["u_rot"] u_div_all[ti, li] = helm["u_div"] u_har_all[ti, li] = helm["u_har"] v_rot_all[ti, li] = helm["v_rot"] v_div_all[ti, li] = helm["v_div"] v_har_all[ti, li] = helm["v_har"] _log(f" FFT-NH Helmholtz (total wind) done: {ntimes} times × {nlevs} levels") # ── Store total Helmholtz fields ── dims4d = ("valid_time", plev, "latitude", "longitude") coords4d = ds["u"].coords for name, arr in [ ("u_rot", u_rot_all), ("u_div", u_div_all), ("v_rot", v_rot_all), ("v_div", v_div_all), ]: ds[name] = xr.DataArray(arr, dims=dims4d, coords=coords4d) # ================================================================ # Load climatological Helmholtz & compute anomaly by subtraction # ================================================================ # Gather unique months needed in this time window months_needed = sorted(set(ds.valid_time.dt.month.values.tolist())) clim_helm_cache: dict[int, dict[str, np.ndarray]] = {} for m in months_needed: clim_helm_cache[m] = load_helmholtz_climatology( cfg.clim_helmholtz_dir, m) # Build 4-D bar arrays by matching each timestep to its month/hour/level u_rot_bar_4d = np.zeros(shape_4d, dtype=np.float32) u_div_bar_4d = np.zeros(shape_4d, dtype=np.float32) v_rot_bar_4d = np.zeros(shape_4d, dtype=np.float32) v_div_bar_4d = np.zeros(shape_4d, dtype=np.float32) times = pd.to_datetime(ds.valid_time.values) for ti, t in enumerate(times): m = t.month hr = t.hour day = t.day # 1-based calendar day ch = clim_helm_cache[m] # Climatology files now have shape (nday, 24, nlev, nlat, nlon) # Index by (day-1, hour) for daily-hourly resolution di = day - 1 # 0-based day index if di < ch["u_rot_bar"].shape[0] and hr < ch["u_rot_bar"].shape[1]: u_rot_bar_4d[ti] = ch["u_rot_bar"][di, hr] u_div_bar_4d[ti] = ch["u_div_bar"][di, hr] v_rot_bar_4d[ti] = ch["v_rot_bar"][di, hr] v_div_bar_4d[ti] = ch["v_div_bar"][di, hr] for name, arr in [ ("u_rot_bar", u_rot_bar_4d), ("u_div_bar", u_div_bar_4d), ("v_rot_bar", v_rot_bar_4d), ("v_div_bar", v_div_bar_4d), ]: ds[name] = xr.DataArray(arr, dims=dims4d, coords=coords4d) # ── Anomaly Helmholtz by subtraction: u'_rot = u_rot − ū_rot ── ds["u_rot_anom"] = ds["u_rot"] - ds["u_rot_bar"] ds["u_div_anom"] = ds["u_div"] - ds["u_div_bar"] ds["u_har_anom"] = xr.DataArray(u_har_all, dims=dims4d, coords=coords4d) ds["v_rot_anom"] = ds["v_rot"] - ds["v_rot_bar"] ds["v_div_anom"] = ds["v_div"] - ds["v_div_bar"] ds["v_har_anom"] = xr.DataArray(v_har_all, dims=dims4d, coords=coords4d) _log(" Climatological Helmholtz loaded; anomaly Helmholtz computed by subtraction") return ds
# ============================================================================ # Grid / patching utilities # ============================================================================ class _GridInfo: """Container for grid metadata (cached per-worker).""" def __init__(self, lat, lon, lat_half, lon_half): dlat = float(abs(np.diff(lat).mean())) dlon = float(abs(np.diff(lon).mean())) self.lat = lat self.lon = lon self.LAT_PAD = int(round(lat_half / dlat)) self.LON_PAD = int(round(lon_half / dlon)) rlat = np.linspace(-lat_half, lat_half, 2 * self.LAT_PAD + 1) rlon = np.linspace(-lon_half, lon_half, 2 * self.LON_PAD + 1) self.Y_rel, self.X_rel = np.meshgrid(rlat, rlon, indexing="ij") self.lat_desc = bool(np.all(np.diff(lat) < 0)) def _nearest_idx(lat0, lon0, grid): ilat = int(np.abs(grid.lat - lat0).argmin()) ilon = int(np.abs(grid.lon - lon0).argmin()) ok = (ilat >= grid.LAT_PAD) and (ilat + grid.LAT_PAD < len(grid.lat)) return ilat, ilon, ok def _wrapped_lon_index(ilon, *, LON_PAD, nlon): start = ilon - LON_PAD return (np.arange(0, 2 * LON_PAD + 1) + start) % nlon def _patch_lon1d(ds, ilon, grid): nlon = ds.sizes["longitude"] idx = _wrapped_lon_index(ilon, LON_PAD=grid.LON_PAD, nlon=nlon) lon_seg = ds.longitude.values[idx] return np.rad2deg(np.unwrap(np.deg2rad(lon_seg))) def _patch_lat1d_full(ds, ilat, grid, eff_north, eff_south): nlat = ds.sizes["latitude"] full = 2 * grid.LAT_PAD + 1 out = np.full((full,), np.nan, dtype=float) if grid.lat_desc: i0 = max(0, ilat - eff_north) i1 = min(nlat, ilat + eff_south + 1) else: i0 = max(0, ilat - eff_south) i1 = min(nlat, ilat + eff_north + 1) seg = ds.latitude.isel(latitude=slice(i0, i1)).values if grid.lat_desc: seg = seg[::-1] y_eff = seg.shape[0] y0 = grid.LAT_PAD - eff_south out[y0:y0 + y_eff] = seg return out def _extract_cube_with_pads3d(ds, varnames, ts, ilat, ilon, levels, grid, eff_north, eff_south): plev = _plev_name(ds) nlon = ds.sizes["longitude"] lon_idx = xr.DataArray( _wrapped_lon_index(ilon, LON_PAD=grid.LON_PAD, nlon=nlon), dims=("x",)) ts_sel = ds.sel(valid_time=ts) nlat = ds.sizes["latitude"] if grid.lat_desc: i0 = max(0, ilat - eff_north) i1 = min(nlat, ilat + eff_south + 1) else: i0 = max(0, ilat - eff_south) i1 = min(nlat, ilat + eff_north + 1) parts = [] for v in varnames: da = (ts_sel[v] .sel({plev: levels}) .isel(latitude=slice(i0, i1)) .isel(longitude=lon_idx)) if grid.lat_desc: da = da.isel(latitude=slice(None, None, -1)) parts.append(da) stacked = xr.concat(parts, dim="__var__").compute() Y_full = 2 * grid.LAT_PAD + 1 Y_eff = stacked.sizes["latitude"] X = stacked.sizes["x"] L = stacked.sizes[plev] y0 = grid.LAT_PAD - eff_south out = {} for i, v in enumerate(varnames): arr = stacked.isel(__var__=i).values buf = np.full((L, Y_full, X), np.nan, dtype=arr.dtype) buf[:, y0:y0 + Y_eff, :] = arr out[v] = buf return out # ============================================================================ # TendencyComputer class # ============================================================================
[docs] class TendencyComputer: """Computes PV tendency terms for weather events. Parameterized by :class:`TendencyConfig` — works for both blocking and PRP event types without code duplication. Example:: cfg = TendencyConfig( event_type="blocking", csv_path=Path("events_blocking.csv"), ) tc = TendencyComputer(cfg) n = tc.process_event("onset", track_id=42, lat0=55.0, lon0=-30.0, base_ts=pd.Timestamp("2010-01-15")) print(f"Wrote {n} NPZ files.") """
[docs] def __init__(self, config: TendencyConfig) -> None: self.cfg = config self._clim: xr.Dataset | None = None self._grid: _GridInfo | None = None
def _get_clim(self) -> xr.Dataset: if self._clim is None: self._clim = load_climatology(self.cfg.clim_path, self.cfg.engine) return self._clim def _init_grid(self, ds: xr.Dataset) -> _GridInfo: if self._grid is None: lat = ds.latitude.values lon = ds.longitude.values self._grid = _GridInfo(lat, lon, self.cfg.lat_half, self.cfg.lon_half) return self._grid # ── public API ───────────────────────────────────────────────── def process_event( self, evt_name: str, track_id: int, lat0: float, lon0: float, base_ts: pd.Timestamp, ) -> int: """Process a single event and write NPZ files. Loads data **once** via :func:`with_derivs_for_window`, then iterates over ``cfg.rel_hours``, extracting a patch and computing QG omega + moist/dry + cross-terms + wavg per timestep. Returns the number of NPZ files written. """ written = 0 _log(f"\n--- Processing event: {track_id} " f"at ({lat0}, {lon0}) ---") clim_ds = self._get_clim() # ── Event-level skip: all NPZs already exist? ── if self.cfg.skip_existing: all_exist = all( self._out_path(evt_name, dh, track_id, base_ts + pd.Timedelta(hours=dh)).exists() for dh in self.cfg.rel_hours ) if all_exist: _log(f"-> Event {track_id}: all {len(self.cfg.rel_hours)} " f"NPZ(s) exist, skipping.") return 0 ds = with_derivs_for_window(base_ts, self.cfg, clim_ds) grid = self._init_grid(ds) ilat, ilon, ok = _nearest_idx(lat0, lon0, grid) _log(f"Nearest grid index: ilat={ilat}, ilon={ilon}, ok={ok}") nlat = len(grid.lat) if grid.lat_desc: eff_north = min(grid.LAT_PAD, ilat) eff_south = min(grid.LAT_PAD, nlat - 1 - ilat) else: eff_south = min(grid.LAT_PAD, ilat) eff_north = min(grid.LAT_PAD, nlat - 1 - ilat) if not self.cfg.partial_at_pole and ( eff_north < grid.LAT_PAD or eff_south < grid.LAT_PAD ): _log("-> Event skipped: Too close to boundary.") return 0 if eff_north <= 0 and eff_south <= 0: _log("-> Event skipped: zero latitude rows.") return 0 dt_index = pd.to_datetime(ds.valid_time.values) plev = _plev_name(ds) plevs_hpa = ds[plev].values levels = self.cfg.levels wavg_idx = [levels.index(l) for l in self.cfg.wavg_levels if l in levels] for dh in self.cfg.rel_hours: ts = base_ts + pd.Timedelta(hours=dh) if ts not in dt_index: continue out_fp = self._out_path(evt_name, dh, track_id, ts) if self.cfg.skip_existing and out_fp.exists(): written += 1 continue # Lagrangian centre if self.cfg.center_mode == "lagrangian": tracked_lat, tracked_lon = get_tracked_center( self.cfg.track_file, track_id, ts) if tracked_lat is None: current_lat, current_lon = lat0, lon0 else: current_lat, current_lon = tracked_lat, tracked_lon else: current_lat, current_lon = lat0, lon0 ilat_c, ilon_c, ok_c = _nearest_idx( current_lat, current_lon, grid) if not ok_c and not self.cfg.partial_at_pole: continue if grid.lat_desc: en_c = min(grid.LAT_PAD, ilat_c) es_c = min(grid.LAT_PAD, nlat - 1 - ilat_c) else: es_c = min(grid.LAT_PAD, ilat_c) en_c = min(grid.LAT_PAD, nlat - 1 - ilat_c) if en_c <= 0 and es_c <= 0: continue lat_vec_full = _patch_lat1d_full(ds, ilat_c, grid, en_c, es_c) lon_unwrapped = _patch_lon1d(ds, ilon_c, grid) cube3d = _extract_cube_with_pads3d( ds, _EXTRACT_VARS, ts, ilat_c, ilon_c, levels, grid, en_c, es_c) # --- Patch-level QG omega + moist/dry --- nh_data = None if self.cfg.qg_omega_method == "log20": snap = ds.sel(valid_time=ts) nh_data = { "z": snap["z"].values, "t": snap["t"].values, "w": snap["w"].values, "t_dt": snap["t_dt"].values, "u": snap["u"].values, "v": snap["v"].values, "theta_dot": snap["theta_dot"].values, "theta": snap["theta"].values, "lat": ds.latitude.values, "lon": ds.longitude.values, } _qg_diabatic_adiabatic_on_patch( cube3d, lat_vec_full, lon_unwrapped, plevs_hpa, center_lat=current_lat, qg_method=self.cfg.qg_omega_method, nh_data=nh_data) # NaN safety on adiabatic/diabatic decomposition outputs for _key in ("w_adiabatic", "w_diabatic", "w_qg_diabatic", "w_lhr_moist", "u_div_diabatic", "v_div_diabatic", "u_div_adiabatic", "v_div_adiabatic", "u_div_qg_diabatic", "v_div_qg_diabatic", "u_div_lhr_moist", "v_div_lhr_moist", "q", "t_dt"): if _key in cube3d: cube3d[_key] = np.nan_to_num( cube3d[_key], nan=0.0, posinf=0.0, neginf=0.0) z_m_3d = cube3d["z"] / G0 def vwm(arrL, *, z_m_3d=z_m_3d, wavg_idx=wavg_idx): arr_w = arrL[wavg_idx] z_w = z_m_3d[wavg_idx] wt = np.exp(-z_w / H_SCALE) num = np.nansum(arr_w * wt, axis=0) den = np.nansum(wt, axis=0) out = np.full_like(num, np.nan) mask = den > 0 out[mask] = num[mask] / den[mask] return out vw = partial(vwm, z_m_3d=z_m_3d, wavg_idx=wavg_idx) # ──────────────────────────────────────────── # 3-D cross terms (53-term v2.0 catalog) # ──────────────────────────────────────────── # ── 12 base (bar/anom × bar/anom) ── uanom_pvbar_dx_3d = cube3d["u_anom"] * cube3d["pv_bar_dx"] uanom_pvanom_dx_3d = cube3d["u_anom"] * cube3d["pv_anom_dx"] ubar_pvanom_dx_3d = cube3d["u_bar"] * cube3d["pv_anom_dx"] ubar_pvbar_dx_3d = cube3d["u_bar"] * cube3d["pv_bar_dx"] vanom_pvbar_dy_3d = cube3d["v_anom"] * cube3d["pv_bar_dy"] vanom_pvanom_dy_3d = cube3d["v_anom"] * cube3d["pv_anom_dy"] vbar_pvanom_dy_3d = cube3d["v_bar"] * cube3d["pv_anom_dy"] vbar_pvbar_dy_3d = cube3d["v_bar"] * cube3d["pv_bar_dy"] wanom_pvbar_dp_3d = cube3d["w_anom"] * cube3d["pv_bar_dp"] wanom_pvanom_dp_3d = cube3d["w_anom"] * cube3d["pv_anom_dp"] wbar_pvanom_dp_3d = cube3d["w_bar"] * cube3d["pv_anom_dp"] wbar_pvbar_dp_3d = cube3d["w_bar"] * cube3d["pv_bar_dp"] # ── 16 Helmholtz primary (anom + bar rot/div) ── urot_anom_pvbar_dx_3d = cube3d["u_rot_anom"] * cube3d["pv_bar_dx"] urot_anom_pvanom_dx_3d = cube3d["u_rot_anom"] * cube3d["pv_anom_dx"] udiv_anom_pvbar_dx_3d = cube3d["u_div_anom"] * cube3d["pv_bar_dx"] udiv_anom_pvanom_dx_3d = cube3d["u_div_anom"] * cube3d["pv_anom_dx"] urot_bar_pvbar_dx_3d = cube3d["u_rot_bar"] * cube3d["pv_bar_dx"] urot_bar_pvanom_dx_3d = cube3d["u_rot_bar"] * cube3d["pv_anom_dx"] udiv_bar_pvbar_dx_3d = cube3d["u_div_bar"] * cube3d["pv_bar_dx"] udiv_bar_pvanom_dx_3d = cube3d["u_div_bar"] * cube3d["pv_anom_dx"] vrot_anom_pvbar_dy_3d = cube3d["v_rot_anom"] * cube3d["pv_bar_dy"] vrot_anom_pvanom_dy_3d = cube3d["v_rot_anom"] * cube3d["pv_anom_dy"] vdiv_anom_pvbar_dy_3d = cube3d["v_div_anom"] * cube3d["pv_bar_dy"] vdiv_anom_pvanom_dy_3d = cube3d["v_div_anom"] * cube3d["pv_anom_dy"] vrot_bar_pvbar_dy_3d = cube3d["v_rot_bar"] * cube3d["pv_bar_dy"] vrot_bar_pvanom_dy_3d = cube3d["v_rot_bar"] * cube3d["pv_anom_dy"] vdiv_bar_pvbar_dy_3d = cube3d["v_div_bar"] * cube3d["pv_bar_dy"] vdiv_bar_pvanom_dy_3d = cube3d["v_div_bar"] * cube3d["pv_anom_dy"] # ── 16 divergent adiabatic/diabatic horizontal ── udm_pvbar_dx_3d = cube3d["u_div_diabatic"] * cube3d["pv_bar_dx"] udm_pvanom_dx_3d = cube3d["u_div_diabatic"] * cube3d["pv_anom_dx"] udd_pvbar_dx_3d = cube3d["u_div_adiabatic"] * cube3d["pv_bar_dx"] udd_pvanom_dx_3d = cube3d["u_div_adiabatic"] * cube3d["pv_anom_dx"] vdm_pvbar_dy_3d = cube3d["v_div_diabatic"] * cube3d["pv_bar_dy"] vdm_pvanom_dy_3d = cube3d["v_div_diabatic"] * cube3d["pv_anom_dy"] vdd_pvbar_dy_3d = cube3d["v_div_adiabatic"] * cube3d["pv_bar_dy"] vdd_pvanom_dy_3d = cube3d["v_div_adiabatic"] * cube3d["pv_anom_dy"] udqm_pvbar_dx_3d = cube3d["u_div_qg_diabatic"] * cube3d["pv_bar_dx"] udqm_pvanom_dx_3d = cube3d["u_div_qg_diabatic"] * cube3d["pv_anom_dx"] vdqm_pvbar_dy_3d = cube3d["v_div_qg_diabatic"] * cube3d["pv_bar_dy"] vdqm_pvanom_dy_3d = cube3d["v_div_qg_diabatic"] * cube3d["pv_anom_dy"] udem_pvbar_dx_3d = cube3d["u_div_lhr_moist"] * cube3d["pv_bar_dx"] udem_pvanom_dx_3d = cube3d["u_div_lhr_moist"] * cube3d["pv_anom_dx"] vdem_pvbar_dy_3d = cube3d["v_div_lhr_moist"] * cube3d["pv_bar_dy"] vdem_pvanom_dy_3d = cube3d["v_div_lhr_moist"] * cube3d["pv_anom_dy"] # ── 8 alt vertical (adiabatic/diabatic/qg/lhr omega) ── w_dry_pvbar_dp_3d = cube3d["w_adiabatic"] * cube3d["pv_bar_dp"] w_dry_pvanom_dp_3d = cube3d["w_adiabatic"] * cube3d["pv_anom_dp"] w_moist_pvbar_dp_3d = cube3d["w_diabatic"] * cube3d["pv_bar_dp"] w_moist_pvanom_dp_3d = cube3d["w_diabatic"] * cube3d["pv_anom_dp"] w_qgm_pvbar_dp_3d = cube3d["w_qg_diabatic"] * cube3d["pv_bar_dp"] w_qgm_pvanom_dp_3d = cube3d["w_qg_diabatic"] * cube3d["pv_anom_dp"] w_em_pvbar_dp_3d = cube3d["w_lhr_moist"] * cube3d["pv_bar_dp"] w_em_pvanom_dp_3d = cube3d["w_lhr_moist"] * cube3d["pv_anom_dp"] # ── 1 diabatic (Q_LHR) — already in cube3d["Q"] ── # ──────────────────────────────────────────── # Write NPZ (atomic via tempfile) # ──────────────────────────────────────────── out_fp.parent.mkdir(parents=True, exist_ok=True) _log(f" Writing {out_fp} ...") # 300-hPa level index for QG-omega blowup watch (Phase 7 v2) _lvl300 = int(np.argmin(np.abs(np.asarray(levels) - 300))) with tempfile.NamedTemporaryFile( dir=out_fp.parent, prefix=out_fp.stem + ".", suffix=".npz", delete=False, ) as tf: tmp_name = tf.name np.savez_compressed( tf, # ── Metadata ── Y_rel=grid.Y_rel, X_rel=grid.X_rel, levels=np.array(levels, dtype=np.int32), wavg_levels=np.array(self.cfg.wavg_levels, dtype=np.int32), H_SCALE=float(H_SCALE), G0=float(G0), lat_vec=lat_vec_full.astype(float), lon_vec_unwrapped=lon_unwrapped.astype(float), track_id=int(track_id), lat0=float(lat0), lon0=float(lon0), center_lat=float(current_lat), center_lon=float(current_lon), center_mode=self.cfg.center_mode, ts=str(ts), dh=int(dh), # ── 2-D wavg fields ── pv_dt=vw(cube3d["pv_dt"]), pv=vw(cube3d["pv"]), z=vw(z_m_3d), u=vw(cube3d["u"]), v=vw(cube3d["v"]), w=vw(cube3d["w"]), pv_dx=vw(cube3d["pv_total_dx"]), pv_dy=vw(cube3d["pv_total_dy"]), pv_dp=vw(cube3d["pv_total_dp"]), pv_dx_dx=vw(cube3d["pv_total_dx_dx"]), pv_dy_dy=vw(cube3d["pv_total_dy_dy"]), pv_dx_dy=vw(cube3d["pv_total_dx_dy"]), u_bar=vw(cube3d["u_bar"]), v_bar=vw(cube3d["v_bar"]), w_bar=vw(cube3d["w_bar"]), pv_bar=vw(cube3d["pv_bar"]), u_anom=vw(cube3d["u_anom"]), v_anom=vw(cube3d["v_anom"]), w_anom=vw(cube3d["w_anom"]), pv_anom=vw(cube3d["pv_anom"]), pv_bar_dx=vw(cube3d["pv_bar_dx"]), pv_bar_dy=vw(cube3d["pv_bar_dy"]), pv_bar_dp=vw(cube3d["pv_bar_dp"]), pv_bar_dt=vw(cube3d["pv_bar_dt"]), pv_anom_dx=vw(cube3d["pv_anom_dx"]), pv_anom_dy=vw(cube3d["pv_anom_dy"]), pv_anom_dp=vw(cube3d["pv_anom_dp"]), pv_anom_dt=vw(cube3d["pv_anom_dt"]), t=vw(cube3d["t"]), theta=vw(cube3d["theta"]), theta_dt=vw(cube3d["theta_dt"]), theta_dot=vw(cube3d["theta_dot"]), Q=vw(cube3d["Q"]), # Total Helmholtz (v2.0) u_rot=vw(cube3d["u_rot"]), u_div=vw(cube3d["u_div"]), v_rot=vw(cube3d["v_rot"]), v_div=vw(cube3d["v_div"]), # Climatological Helmholtz (v2.0) u_rot_bar=vw(cube3d["u_rot_bar"]), u_div_bar=vw(cube3d["u_div_bar"]), v_rot_bar=vw(cube3d["v_rot_bar"]), v_div_bar=vw(cube3d["v_div_bar"]), # Anomaly Helmholtz u_rot_anom=vw(cube3d["u_rot_anom"]), u_div_anom=vw(cube3d["u_div_anom"]), u_har_anom=vw(cube3d["u_har_anom"]), v_rot_anom=vw(cube3d["v_rot_anom"]), v_div_anom=vw(cube3d["v_div_anom"]), v_har_anom=vw(cube3d["v_har_anom"]), u_div_diabatic=vw(cube3d["u_div_diabatic"]), v_div_diabatic=vw(cube3d["v_div_diabatic"]), u_div_adiabatic=vw(cube3d["u_div_adiabatic"]), v_div_adiabatic=vw(cube3d["v_div_adiabatic"]), w_adiabatic=vw(cube3d["w_adiabatic"]), w_diabatic=vw(cube3d["w_diabatic"]), w_qg_diabatic=vw(cube3d["w_qg_diabatic"]), u_div_qg_diabatic=vw(cube3d["u_div_qg_diabatic"]), v_div_qg_diabatic=vw(cube3d["v_div_qg_diabatic"]), w_lhr_moist=vw(cube3d["w_lhr_moist"]), # ── QG-omega solver blowup watch (Phase 7 v2) ── # Patch max |ω| at 300 hPa for each solver-derived ω. # Empirical raw-ERA5 envelope at 300 hPa over # 1990-2020 hourly: max=22.4 Pa/s, 99.9th=19.9 Pa/s. # We flag QG/Emanuel solver output > 25 Pa/s as # solver pathology (raw ω essentially never exceeds # this; 25 Pa/s = raw_max + ~10 % headroom). max_abs_w_adiabatic_300=np.float32( np.nanmax(np.abs(cube3d["w_adiabatic"][_lvl300]))), max_abs_w_diabatic_300=np.float32( np.nanmax(np.abs(cube3d["w_diabatic"][_lvl300]))), max_abs_w_qg_diabatic_300=np.float32( np.nanmax(np.abs(cube3d["w_qg_diabatic"][_lvl300]))), max_abs_w_lhr_moist_300=np.float32( np.nanmax(np.abs(cube3d["w_lhr_moist"][_lvl300]))), u_div_lhr_moist=vw(cube3d["u_div_lhr_moist"]), v_div_lhr_moist=vw(cube3d["v_div_lhr_moist"]), q=vw(cube3d["q"]), t_dt=vw(cube3d["t_dt"]), # ── 2-D cross terms (53-term v2.0 catalog) ── # 12 base u_anom_pv_bar_dx=vw(uanom_pvbar_dx_3d), u_anom_pv_anom_dx=vw(uanom_pvanom_dx_3d), u_bar_pv_anom_dx=vw(ubar_pvanom_dx_3d), u_bar_pv_bar_dx=vw(ubar_pvbar_dx_3d), v_anom_pv_bar_dy=vw(vanom_pvbar_dy_3d), v_anom_pv_anom_dy=vw(vanom_pvanom_dy_3d), v_bar_pv_anom_dy=vw(vbar_pvanom_dy_3d), v_bar_pv_bar_dy=vw(vbar_pvbar_dy_3d), w_anom_pv_bar_dp=vw(wanom_pvbar_dp_3d), w_anom_pv_anom_dp=vw(wanom_pvanom_dp_3d), w_bar_pv_anom_dp=vw(wbar_pvanom_dp_3d), w_bar_pv_bar_dp=vw(wbar_pvbar_dp_3d), # 16 Helmholtz (anom + bar rot/div) u_rot_anom_pv_bar_dx=vw(urot_anom_pvbar_dx_3d), u_rot_anom_pv_anom_dx=vw(urot_anom_pvanom_dx_3d), u_div_anom_pv_bar_dx=vw(udiv_anom_pvbar_dx_3d), u_div_anom_pv_anom_dx=vw(udiv_anom_pvanom_dx_3d), u_rot_bar_pv_bar_dx=vw(urot_bar_pvbar_dx_3d), u_rot_bar_pv_anom_dx=vw(urot_bar_pvanom_dx_3d), u_div_bar_pv_bar_dx=vw(udiv_bar_pvbar_dx_3d), u_div_bar_pv_anom_dx=vw(udiv_bar_pvanom_dx_3d), v_rot_anom_pv_bar_dy=vw(vrot_anom_pvbar_dy_3d), v_rot_anom_pv_anom_dy=vw(vrot_anom_pvanom_dy_3d), v_div_anom_pv_bar_dy=vw(vdiv_anom_pvbar_dy_3d), v_div_anom_pv_anom_dy=vw(vdiv_anom_pvanom_dy_3d), v_rot_bar_pv_bar_dy=vw(vrot_bar_pvbar_dy_3d), v_rot_bar_pv_anom_dy=vw(vrot_bar_pvanom_dy_3d), v_div_bar_pv_bar_dy=vw(vdiv_bar_pvbar_dy_3d), v_div_bar_pv_anom_dy=vw(vdiv_bar_pvanom_dy_3d), # 16 divergent adiabatic/diabatic horizontal u_div_diabatic_pv_bar_dx=vw(udm_pvbar_dx_3d), u_div_diabatic_pv_anom_dx=vw(udm_pvanom_dx_3d), u_div_adiabatic_pv_bar_dx=vw(udd_pvbar_dx_3d), u_div_adiabatic_pv_anom_dx=vw(udd_pvanom_dx_3d), v_div_diabatic_pv_bar_dy=vw(vdm_pvbar_dy_3d), v_div_diabatic_pv_anom_dy=vw(vdm_pvanom_dy_3d), v_div_adiabatic_pv_bar_dy=vw(vdd_pvbar_dy_3d), v_div_adiabatic_pv_anom_dy=vw(vdd_pvanom_dy_3d), u_div_qg_diabatic_pv_bar_dx=vw(udqm_pvbar_dx_3d), u_div_qg_diabatic_pv_anom_dx=vw(udqm_pvanom_dx_3d), v_div_qg_diabatic_pv_bar_dy=vw(vdqm_pvbar_dy_3d), v_div_qg_diabatic_pv_anom_dy=vw(vdqm_pvanom_dy_3d), u_div_lhr_moist_pv_bar_dx=vw(udem_pvbar_dx_3d), u_div_lhr_moist_pv_anom_dx=vw(udem_pvanom_dx_3d), v_div_lhr_moist_pv_bar_dy=vw(vdem_pvbar_dy_3d), v_div_lhr_moist_pv_anom_dy=vw(vdem_pvanom_dy_3d), # 8 alt vertical w_adiabatic_pv_bar_dp=vw(w_dry_pvbar_dp_3d), w_adiabatic_pv_anom_dp=vw(w_dry_pvanom_dp_3d), w_diabatic_pv_bar_dp=vw(w_moist_pvbar_dp_3d), w_diabatic_pv_anom_dp=vw(w_moist_pvanom_dp_3d), w_qg_diabatic_pv_bar_dp=vw(w_qgm_pvbar_dp_3d), w_qg_diabatic_pv_anom_dp=vw(w_qgm_pvanom_dp_3d), w_lhr_moist_pv_bar_dp=vw(w_em_pvbar_dp_3d), w_lhr_moist_pv_anom_dp=vw(w_em_pvanom_dp_3d), # ── 3-D per-level cubes ── z_3d=z_m_3d, pv_3d=cube3d["pv"], u_3d=cube3d["u"], v_3d=cube3d["v"], w_3d=cube3d["w"], t_3d=cube3d["t"], pv_dt_3d=cube3d["pv_dt"], pv_dx_3d=cube3d["pv_total_dx"], pv_dy_3d=cube3d["pv_total_dy"], pv_dp_3d=cube3d["pv_total_dp"], pv_dx_dx_3d=cube3d["pv_total_dx_dx"], pv_dy_dy_3d=cube3d["pv_total_dy_dy"], pv_dx_dy_3d=cube3d["pv_total_dx_dy"], u_bar_3d=cube3d["u_bar"], v_bar_3d=cube3d["v_bar"], w_bar_3d=cube3d["w_bar"], pv_bar_3d=cube3d["pv_bar"], u_anom_3d=cube3d["u_anom"], v_anom_3d=cube3d["v_anom"], w_anom_3d=cube3d["w_anom"], pv_anom_3d=cube3d["pv_anom"], pv_bar_dx_3d=cube3d["pv_bar_dx"], pv_bar_dy_3d=cube3d["pv_bar_dy"], pv_bar_dp_3d=cube3d["pv_bar_dp"], pv_bar_dt_3d=cube3d["pv_bar_dt"], pv_anom_dx_3d=cube3d["pv_anom_dx"], pv_anom_dy_3d=cube3d["pv_anom_dy"], pv_anom_dp_3d=cube3d["pv_anom_dp"], pv_anom_dt_3d=cube3d["pv_anom_dt"], theta_3d=cube3d["theta"], theta_dt_3d=cube3d["theta_dt"], theta_dot_3d=cube3d["theta_dot"], Q_3d=cube3d["Q"], # Total Helmholtz 3-D u_rot_3d=cube3d["u_rot"], u_div_3d_helm=cube3d["u_div"], v_rot_3d=cube3d["v_rot"], v_div_3d_helm=cube3d["v_div"], # Clim Helmholtz 3-D u_rot_bar_3d=cube3d["u_rot_bar"], u_div_bar_3d=cube3d["u_div_bar"], v_rot_bar_3d=cube3d["v_rot_bar"], v_div_bar_3d=cube3d["v_div_bar"], # Anomaly Helmholtz 3-D u_rot_anom_3d=cube3d["u_rot_anom"], u_div_anom_3d=cube3d["u_div_anom"], u_har_anom_3d=cube3d["u_har_anom"], v_rot_anom_3d=cube3d["v_rot_anom"], v_div_anom_3d=cube3d["v_div_anom"], v_har_anom_3d=cube3d["v_har_anom"], u_div_diabatic_3d=cube3d["u_div_diabatic"], v_div_diabatic_3d=cube3d["v_div_diabatic"], u_div_adiabatic_3d=cube3d["u_div_adiabatic"], v_div_adiabatic_3d=cube3d["v_div_adiabatic"], w_adiabatic_3d=cube3d["w_adiabatic"], w_diabatic_3d=cube3d["w_diabatic"], w_qg_diabatic_3d=cube3d["w_qg_diabatic"], u_div_qg_diabatic_3d=cube3d["u_div_qg_diabatic"], v_div_qg_diabatic_3d=cube3d["v_div_qg_diabatic"], w_lhr_moist_3d=cube3d["w_lhr_moist"], u_div_lhr_moist_3d=cube3d["u_div_lhr_moist"], v_div_lhr_moist_3d=cube3d["v_div_lhr_moist"], q_3d=cube3d["q"], t_dt_3d=cube3d["t_dt"], # Cross-terms 3-D u_anom_pv_bar_dx_3d=uanom_pvbar_dx_3d, u_anom_pv_anom_dx_3d=uanom_pvanom_dx_3d, u_bar_pv_anom_dx_3d=ubar_pvanom_dx_3d, u_bar_pv_bar_dx_3d=ubar_pvbar_dx_3d, v_anom_pv_bar_dy_3d=vanom_pvbar_dy_3d, v_anom_pv_anom_dy_3d=vanom_pvanom_dy_3d, v_bar_pv_anom_dy_3d=vbar_pvanom_dy_3d, v_bar_pv_bar_dy_3d=vbar_pvbar_dy_3d, w_anom_pv_bar_dp_3d=wanom_pvbar_dp_3d, w_anom_pv_anom_dp_3d=wanom_pvanom_dp_3d, w_bar_pv_anom_dp_3d=wbar_pvanom_dp_3d, w_bar_pv_bar_dp_3d=wbar_pvbar_dp_3d, u_rot_anom_pv_bar_dx_3d=urot_anom_pvbar_dx_3d, u_rot_anom_pv_anom_dx_3d=urot_anom_pvanom_dx_3d, u_div_anom_pv_bar_dx_3d=udiv_anom_pvbar_dx_3d, u_div_anom_pv_anom_dx_3d=udiv_anom_pvanom_dx_3d, u_rot_bar_pv_bar_dx_3d=urot_bar_pvbar_dx_3d, u_rot_bar_pv_anom_dx_3d=urot_bar_pvanom_dx_3d, u_div_bar_pv_bar_dx_3d=udiv_bar_pvbar_dx_3d, u_div_bar_pv_anom_dx_3d=udiv_bar_pvanom_dx_3d, v_rot_anom_pv_bar_dy_3d=vrot_anom_pvbar_dy_3d, v_rot_anom_pv_anom_dy_3d=vrot_anom_pvanom_dy_3d, v_div_anom_pv_bar_dy_3d=vdiv_anom_pvbar_dy_3d, v_div_anom_pv_anom_dy_3d=vdiv_anom_pvanom_dy_3d, v_rot_bar_pv_bar_dy_3d=vrot_bar_pvbar_dy_3d, v_rot_bar_pv_anom_dy_3d=vrot_bar_pvanom_dy_3d, v_div_bar_pv_bar_dy_3d=vdiv_bar_pvbar_dy_3d, v_div_bar_pv_anom_dy_3d=vdiv_bar_pvanom_dy_3d, u_div_diabatic_pv_bar_dx_3d=udm_pvbar_dx_3d, u_div_diabatic_pv_anom_dx_3d=udm_pvanom_dx_3d, u_div_adiabatic_pv_bar_dx_3d=udd_pvbar_dx_3d, u_div_adiabatic_pv_anom_dx_3d=udd_pvanom_dx_3d, v_div_diabatic_pv_bar_dy_3d=vdm_pvbar_dy_3d, v_div_diabatic_pv_anom_dy_3d=vdm_pvanom_dy_3d, v_div_adiabatic_pv_bar_dy_3d=vdd_pvbar_dy_3d, v_div_adiabatic_pv_anom_dy_3d=vdd_pvanom_dy_3d, w_adiabatic_pv_bar_dp_3d=w_dry_pvbar_dp_3d, w_adiabatic_pv_anom_dp_3d=w_dry_pvanom_dp_3d, w_diabatic_pv_bar_dp_3d=w_moist_pvbar_dp_3d, w_diabatic_pv_anom_dp_3d=w_moist_pvanom_dp_3d, u_div_qg_diabatic_pv_bar_dx_3d=udqm_pvbar_dx_3d, u_div_qg_diabatic_pv_anom_dx_3d=udqm_pvanom_dx_3d, v_div_qg_diabatic_pv_bar_dy_3d=vdqm_pvbar_dy_3d, v_div_qg_diabatic_pv_anom_dy_3d=vdqm_pvanom_dy_3d, w_qg_diabatic_pv_bar_dp_3d=w_qgm_pvbar_dp_3d, w_qg_diabatic_pv_anom_dp_3d=w_qgm_pvanom_dp_3d, u_div_lhr_moist_pv_bar_dx_3d=udem_pvbar_dx_3d, u_div_lhr_moist_pv_anom_dx_3d=udem_pvanom_dx_3d, v_div_lhr_moist_pv_bar_dy_3d=vdem_pvbar_dy_3d, v_div_lhr_moist_pv_anom_dy_3d=vdem_pvanom_dy_3d, w_lhr_moist_pv_bar_dp_3d=w_em_pvbar_dp_3d, w_lhr_moist_pv_anom_dp_3d=w_em_pvanom_dp_3d, ) os.replace(tmp_name, str(out_fp)) written += 1 gc.collect() _log(f"-> Event {track_id}: wrote {written} NPZ(s).") return written # ── output path ──────────────────────────────────────────────── def _out_path( self, evt: str, dh: int, track_id: int, ts: pd.Timestamp, ) -> Path: """Compute the output NPZ file path.""" return ( Path(self.cfg.output_dir) / evt / f"dh={dh:+d}" / f"track_{track_id}_{ts.strftime('%Y%m%d%H')}_dh{dh:+d}.npz" )