Source code for pvtend.io.era5

"""ERA5 monthly NetCDF file loading.

Supports the file naming convention era5_{var}_{YYYY}_{MM}.nc
and handles CDS artefacts (expver, number dimensions).
"""

from __future__ import annotations
from pathlib import Path
from typing import Sequence

import xarray as xr


[docs] def load_era5_month( data_dir: str | Path, year: int, month: int, variables: Sequence[str], *, engine: str = "netcdf4", chunks: dict | None = None, ) -> xr.Dataset: """Load ERA5 monthly data for specified variables. Parameters: data_dir: Directory with era5_{var}_{YYYY}_{MM}.nc files. year: Year. month: Month (1-12). variables: Variable names to load. engine: NetCDF engine. chunks: Dask chunks (None for eager loading). Returns: Merged xarray Dataset. """ data_dir = Path(data_dir) parts = [] for var in variables: fp = data_dir / f"era5_{var}_{year}_{month:02d}.nc" if not fp.exists(): raise FileNotFoundError(f"Missing: {fp}") ds = xr.open_dataset(fp, engine=engine, chunks=chunks) ds = _drop_cds_artefacts(ds) ds = _ensure_valid_time(ds) if "level" in ds.dims and "pressure_level" not in ds.dims: ds = ds.rename({"level": "pressure_level"}) ds = ds[[var]] parts.append(ds) merged = xr.merge(parts, compat="no_conflicts", join="inner") merged = merged.assign_coords( longitude=((merged.longitude + 180) % 360) - 180, ).sortby("longitude") return merged
[docs] def open_months_dataset( data_dir: str | Path, var_list: Sequence[str], month_keys: Sequence[tuple[int, int]], *, engine: str = "netcdf4", chunks: dict | None = None, ) -> xr.Dataset: """Open multiple months of ERA5 data as a single dataset. Parameters: data_dir: Directory with ERA5 monthly files. var_list: Variable names. month_keys: List of (year, month) tuples. engine: NetCDF engine. chunks: Dask chunks. Returns: Merged xarray Dataset spanning all requested months. """ data_dir = Path(data_dir) parts = [] for var in var_list: files = [] for y, m in month_keys: fp = data_dir / f"era5_{var}_{y}_{m:02d}.nc" if fp.exists(): files.append(str(fp)) if not files: raise FileNotFoundError(f"No files for {var} in {month_keys}") dsv = xr.open_mfdataset( files, combine="by_coords", parallel=False, chunks=chunks, engine=engine, ) dsv = _drop_cds_artefacts(dsv) dsv = _ensure_valid_time(dsv) if "level" in dsv.dims and "pressure_level" not in dsv.dims: dsv = dsv.rename({"level": "pressure_level"}) dsv = dsv[[var]] parts.append(dsv) ds = xr.merge(parts, compat="no_conflicts", join="inner") ds = ds.assign_coords( longitude=((ds.longitude + 180) % 360) - 180, ).sortby("longitude") return ds
def _drop_cds_artefacts(ds: xr.Dataset) -> xr.Dataset: """Remove CDS artefact variables (number, expver).""" to_drop = [v for v in ("number", "expver") if v in ds.coords or v in ds.data_vars] if to_drop: ds = ds.drop_vars(to_drop, errors="ignore") return ds def _ensure_valid_time(ds: xr.Dataset) -> xr.Dataset: """Normalize time dimension name to 'valid_time'.""" if "valid_time" in ds.coords: return ds if "time" in ds.coords: return ds.rename({"time": "valid_time"}) raise KeyError("Neither 'valid_time' nor 'time' coordinate present.")