Source code for PyHydroGeophysX.visualization.animation

"""Time-lapse animation utilities for geophysical models."""

import io
import os
from typing import Callable, Dict, List, Optional, Sequence, Tuple, Union

import matplotlib.pyplot as plt
import numpy as np


[docs]def create_timelapse_gif( mesh, models: Sequence[np.ndarray], filename: str, *, titles: Optional[Sequence[str]] = None, cmap=None, cmin: Optional[float] = None, cmax: Optional[float] = None, log_scale: bool = False, label: str = "", xlabel: str = "Distance (m)", ylabel: str = "Elevation (m)", coverage: Optional[Union[np.ndarray, Sequence[np.ndarray]]] = None, figsize: Tuple[float, float] = (8, 2.5), dpi: int = 150, duration: int = 100, first_frame_duration: int = 500, loop: int = 0, ) -> str: """Create a GIF animation of time-lapse model snapshots. Each frame renders the model on the given mesh using ``pg.show`` and captures it as a PIL image. Requires ``Pillow``. Parameters ---------- mesh : pygimli.Mesh Mesh shared by all models. models : sequence of array-like Model values for each frame. filename : str Output GIF path. titles : sequence of str, optional Title per frame. cmap : str or Colormap, optional Colormap. Defaults to BlueDarkRed18_18_r if available. cmin, cmax : float, optional Fixed color limits. log_scale : bool Logarithmic color scale. label : str Colorbar label. coverage : array or list of arrays, optional Coverage mask(s). figsize : tuple Figure size per frame. dpi : int Resolution. duration : int Milliseconds per frame. first_frame_duration : int Duration of the first frame in ms (longer for visual pause). loop : int Number of loops (0 = infinite). Returns ------- str Path to the saved GIF file. """ import pygimli as pg from PIL import Image from .plotting import _get_cmap cmap = _get_cmap(cmap) # Prepare coverage if coverage is not None: cov_arr = np.asarray(coverage) single_cov = cov_arr.ndim == 1 else: cov_arr = None single_cov = False frames = [] for i, model in enumerate(models): fig, ax = plt.subplots(figsize=figsize) arr = np.asarray(model, dtype=float).ravel() kw = dict( cMap=cmap, logScale=log_scale, label=label, xlabel=xlabel, ylabel=ylabel, orientation="vertical", pad=0.3, ) if cmin is not None: kw["cMin"] = cmin if cmax is not None: kw["cMax"] = cmax if cov_arr is not None: c = cov_arr if single_cov else cov_arr[i] kw["coverage"] = np.asarray(c).ravel() > -1 pg.show(mesh, arr, ax=ax, **kw) if titles is not None and i < len(titles): ax.set_title(titles[i]) buf = io.BytesIO() fig.savefig(buf, format="png", dpi=dpi, bbox_inches="tight") plt.close(fig) buf.seek(0) frames.append(Image.open(buf).copy()) durations = [first_frame_duration] + [duration] * (len(frames) - 1) frames[0].save( filename, format="GIF", append_images=frames[1:], save_all=True, duration=durations, loop=loop, optimize=True, ) return filename
[docs]def create_timelapse_mp4( mesh, models: Sequence[np.ndarray], filename: str, *, titles: Optional[Sequence[str]] = None, cmap=None, cmin: Optional[float] = None, cmax: Optional[float] = None, log_scale: bool = False, label: str = "", xlabel: str = "Distance (m)", ylabel: str = "Elevation (m)", coverage: Optional[Union[np.ndarray, Sequence[np.ndarray]]] = None, figsize: Tuple[float, float] = (8, 2.5), dpi: int = 150, fps: int = 10, ) -> str: """Create an MP4 video of time-lapse model snapshots using matplotlib. Requires ``ffmpeg`` to be available on the system path. Parameters ---------- mesh : pygimli.Mesh models : sequence of array-like filename : str Output ``.mp4`` path. fps : int Frames per second. (other parameters same as :func:`create_timelapse_gif`) Returns ------- str Path to the saved MP4 file. """ import pygimli as pg from matplotlib.animation import FuncAnimation, FFMpegWriter from .plotting import _get_cmap cmap = _get_cmap(cmap) if coverage is not None: cov_arr = np.asarray(coverage) single_cov = cov_arr.ndim == 1 else: cov_arr = None single_cov = False fig, ax = plt.subplots(figsize=figsize) def _draw(i): ax.clear() arr = np.asarray(models[i], dtype=float).ravel() kw = dict( cMap=cmap, logScale=log_scale, label=label, xlabel=xlabel, ylabel=ylabel, orientation="vertical", pad=0.3, ) if cmin is not None: kw["cMin"] = cmin if cmax is not None: kw["cMax"] = cmax if cov_arr is not None: c = cov_arr if single_cov else cov_arr[i] kw["coverage"] = np.asarray(c).ravel() > -1 pg.show(mesh, arr, ax=ax, **kw) if titles is not None and i < len(titles): ax.set_title(titles[i]) anim = FuncAnimation(fig, _draw, frames=len(models), interval=1000 // fps) writer = FFMpegWriter(fps=fps) anim.save(filename, writer=writer, dpi=dpi) plt.close(fig) return filename
[docs]def create_difference_gif( mesh, models: Sequence[np.ndarray], reference: np.ndarray, filename: str, *, mode: str = "difference", titles: Optional[Sequence[str]] = None, cmap: str = "RdBu_r", symmetric: bool = True, label: str = "", figsize: Tuple[float, float] = (8, 2.5), dpi: int = 150, duration: int = 100, coverage: Optional[Union[np.ndarray, Sequence[np.ndarray]]] = None, ) -> str: """Create a GIF animation showing model changes relative to a reference. Parameters ---------- mesh : pygimli.Mesh models : sequence of array-like Model snapshots for each frame. reference : array-like Baseline model to subtract / divide. mode : ``'difference'`` | ``'ratio'`` | ``'percent_change'`` symmetric : bool Center colorbar at zero / one. (other parameters same as :func:`create_timelapse_gif`) Returns ------- str Path to the saved GIF file. """ ref = np.asarray(reference, dtype=float).ravel() diff_models = [] for m in models: arr = np.asarray(m, dtype=float).ravel() if mode == "difference": diff_models.append(arr - ref) elif mode == "ratio": diff_models.append(np.where(np.abs(ref) > 1e-30, arr / ref, np.nan)) elif mode == "percent_change": diff_models.append(np.where(np.abs(ref) > 1e-30, 100.0 * (arr - ref) / np.abs(ref), np.nan)) else: raise ValueError(f"Unknown mode '{mode}'.") # Compute global limits if symmetric: all_vals = np.concatenate([d.ravel() for d in diff_models]) vmax = np.nanmax(np.abs(all_vals)) if mode == "ratio": cmin_val = 1.0 / max(vmax, 1.01) cmax_val = max(vmax, 1.01) else: cmin_val = -vmax cmax_val = vmax else: cmin_val = None cmax_val = None return create_timelapse_gif( mesh, diff_models, filename, titles=titles, cmap=cmap, cmin=cmin_val, cmax=cmax_val, label=label or mode.replace("_", " ").title(), figsize=figsize, dpi=dpi, duration=duration, coverage=coverage, )
[docs]def create_combined_timelapse_gif( mesh, filename: str, *, wc_models: Optional[Sequence[np.ndarray]] = None, res_models: Optional[Sequence[np.ndarray]] = None, app_res_data: Optional[Sequence] = None, precipitation: Optional[np.ndarray] = None, n_frames: Optional[int] = None, wc_cmap=None, res_cmap=None, app_res_cmap=None, wc_cmin: float = 0.0, wc_cmax: float = 0.32, res_cmin: float = 100, res_cmax: float = 2000, app_res_cmin: float = 100, app_res_cmax: float = 1000, app_res_plot_type: str = "scatter", app_res_scatter_size: float = 10, mesh_ylim: Optional[Tuple[float, float]] = None, figsize: Tuple[float, float] = (14, 10), dpi: int = 100, duration: int = 100, first_frame_duration: int = 500, loop: int = 0, day_labels: Optional[Sequence[str]] = None, ) -> str: """Create a combined GIF with water content, resistivity, apparent resistivity pseudosection, and precipitation panels. The water content and resistivity model panels are placed side-by-side on the same row. Parameters ---------- mesh : pygimli.Mesh Mesh shared by WC and resistivity models. filename : str Output GIF path. wc_models : sequence of array-like, optional Water content model per frame. res_models : sequence of array-like, optional Resistivity model per frame. app_res_data : sequence, optional PyGimli DataContainers (or file paths) of apparent resistivity per frame. Converted to SimPEG internally for pseudosection plotting. precipitation : array-like, optional 1-D precipitation array (length = ``n_frames``). n_frames : int, optional Number of frames. Inferred from whichever data list is provided. wc_cmap, res_cmap, app_res_cmap : Colormaps for each panel. wc_cmin, wc_cmax : float Water content color limits. res_cmin, res_cmax : float Resistivity color limits. app_res_cmin, app_res_cmax : float Apparent resistivity color limits. app_res_plot_type : str ``'scatter'``, ``'pcolor'``, or ``'contourf'``. mesh_ylim : tuple of (ymin, ymax), optional Y-axis limits for the mesh model panels (WC and resistivity). E.g. ``(1600, 1720)`` to crop the vertical range. figsize : tuple Figure size per frame. dpi : int Resolution. duration : int Milliseconds per frame. first_frame_duration : int Duration of first frame in ms. loop : int 0 = infinite loop. day_labels : sequence of str, optional Label for each frame (e.g. ``'Day 0'``, ``'Day 1'``, …). Returns ------- str Path to the saved GIF file. """ import pygimli as pg from PIL import Image from .plotting import _get_cmap # Determine frame count for seq in [wc_models, res_models, app_res_data]: if seq is not None: if n_frames is None: n_frames = len(seq) break if n_frames is None: raise ValueError("Must provide at least one of wc_models, res_models, or app_res_data.") # Colormaps if wc_cmap is None: try: from palettable.lightbartlein.diverging import BlueDarkRed18_18_r wc_cmap = BlueDarkRed18_18_r.mpl_colormap except ImportError: wc_cmap = plt.get_cmap("RdBu_r") elif isinstance(wc_cmap, str): wc_cmap = plt.get_cmap(wc_cmap) if res_cmap is None: try: from palettable.lightbartlein.diverging import BlueDarkRed18_18 res_cmap = BlueDarkRed18_18.mpl_colormap except ImportError: res_cmap = plt.get_cmap("RdBu") elif isinstance(res_cmap, str): res_cmap = plt.get_cmap(res_cmap) if app_res_cmap is None: try: from palettable.lightbartlein.diverging import BlueDarkRed18_18 app_res_cmap = BlueDarkRed18_18.mpl_colormap except ImportError: app_res_cmap = plt.get_cmap("RdBu") elif isinstance(app_res_cmap, str): app_res_cmap = plt.get_cmap(app_res_cmap) # Pre-convert SimPEG data if needed simpeg_data_list = None if app_res_data is not None: from .plotting import _convert_pygimli_to_simpeg from SimPEG.electromagnetics.static.utils.static_utils import ( plot_pseudosection, ) simpeg_data_list = [] for d in app_res_data: try: from SimPEG import data as simpeg_data_mod if isinstance(d, simpeg_data_mod.Data): simpeg_data_list.append(d) else: dc_dat, _ = _convert_pygimli_to_simpeg(d) simpeg_data_list.append(dc_dat) except Exception: dc_dat, _ = _convert_pygimli_to_simpeg(d) simpeg_data_list.append(dc_dat) # Determine layout rows: # Row 0: precipitation (if present) # Row 1: WC (left) + Resistivity (right) side-by-side # Row 2: apparent resistivity pseudosection (if present) has_precip = precipitation is not None has_mesh_models = wc_models is not None or res_models is not None has_app_res = simpeg_data_list is not None # Build gridspec layout import matplotlib.gridspec as gridspec rows = [] height_ratios = [] if has_precip: rows.append("precip") height_ratios.append(1.0) if has_mesh_models: rows.append("mesh_models") height_ratios.append(2.5) if has_app_res: rows.append("app_res") height_ratios.append(2.0) if not rows: raise ValueError("No data panels provided.") frames = [] for i in range(n_frames): if i % 10 == 0: print(f"Creating combined frame {i}/{n_frames}") fig = plt.figure(figsize=figsize) gs = gridspec.GridSpec(len(rows), 1, figure=fig, height_ratios=height_ratios, hspace=0.35) row_idx = 0 # --- Precipitation bar --- if has_precip: ax_p = fig.add_subplot(gs[row_idx]) row_idx += 1 precip_arr = np.asarray(precipitation) ax_p.bar(np.arange(len(precip_arr)), precip_arr, color="k", width=1.0) ax_p.axvline(x=i, color="red", linewidth=2, label="Current") ax_p.set_xlim([0, len(precip_arr) - 1]) ax_p.set_ylabel("Precip (mm)") ax_p.legend(loc="upper right", fontsize=8) if day_labels is not None and i < len(day_labels): ax_p.set_title(day_labels[i], fontsize=12, fontweight="bold") else: ax_p.set_title(f"Day {i}", fontsize=12, fontweight="bold") # --- WC + Resistivity side-by-side --- if has_mesh_models: gs_inner = gridspec.GridSpecFromSubplotSpec( 1, 2, subplot_spec=gs[row_idx], wspace=0.35) row_idx += 1 # Water content (left) if wc_models is not None: ax_wc = fig.add_subplot(gs_inner[0, 0]) arr = np.asarray(wc_models[i], dtype=float).ravel() pg.show(mesh, arr, ax=ax_wc, cMap=wc_cmap, logScale=False, cMin=wc_cmin, cMax=wc_cmax, label="Water Content (-)", xlabel="Distance (m)", ylabel="Elevation (m)", orientation="vertical", pad=0.3) ax_wc.set_title("Water Content Model", fontsize=10) if mesh_ylim is not None: ax_wc.set_ylim(mesh_ylim) # Resistivity (right) if res_models is not None: ax_res = fig.add_subplot(gs_inner[0, 1]) arr = np.asarray(res_models[i], dtype=float).ravel() pg.show(mesh, arr, ax=ax_res, cMap=res_cmap, logScale=True, cMin=res_cmin, cMax=res_cmax, label="Resistivity (Ω·m)", xlabel="Distance (m)", ylabel="Elevation (m)", orientation="vertical", pad=0.3) ax_res.set_title("Resistivity Model", fontsize=10) if mesh_ylim is not None: ax_res.set_ylim(mesh_ylim) # --- Apparent resistivity pseudosection --- if has_app_res: ax_app = fig.add_subplot(gs[row_idx]) row_idx += 1 scatter_opts = {"cmap": app_res_cmap, "marker": "s", "s": app_res_scatter_size} pcolor_opts = {"cmap": app_res_cmap} contourf_opts = {"cmap": app_res_cmap} try: ax_app, _ = plot_pseudosection( simpeg_data_list[i], plot_type=app_res_plot_type, ax=ax_app, scale="linear", cbar_label="App. Resistivity (Ω·m)", mask_topography=True, pcolor_opts=pcolor_opts, contourf_opts=contourf_opts, scatter_opts=scatter_opts, data_locations=False, cbar_opts={"location": "right", "shrink": 0.8, "aspect": 30}, clim=[app_res_cmin, app_res_cmax], ) ax_app.set_title("Apparent Resistivity Pseudosection", fontsize=10) except Exception as e: ax_app.text(0.5, 0.5, f"Error: {e}", transform=ax_app.transAxes, ha="center", va="center", fontsize=8) buf = io.BytesIO() fig.savefig(buf, format="png", dpi=dpi, bbox_inches="tight") plt.close(fig) buf.seek(0) frames.append(Image.open(buf).copy()) durations = [first_frame_duration] + [duration] * (len(frames) - 1) frames[0].save( filename, format="GIF", append_images=frames[1:], save_all=True, duration=durations, loop=loop, optimize=True, ) print(f"Combined GIF saved to {filename}") return filename
def _panel_height_ratios(panels: List[str]) -> List[float]: """Return GridSpec height ratios for the given panel list.""" ratios = [] for p in panels: if p == "precip": ratios.append(1.0) else: ratios.append(2.0) return ratios