Source code for model_output.modflow_output

"""
Module for processing MODFLOW model outputs.
"""
import os
import numpy as np
from typing import Tuple, Optional, Union, List, Dict

from .base import HydroModelOutput


[docs]def binaryread(file, vartype, shape=(1,), charlen=16): """ Uses numpy to read from binary file. This was found to be faster than the struct approach and is used as the default. Args: file: Open file object in binary read mode vartype: Variable type to read shape: Shape of the data to read (default: (1,)) charlen: Length of character strings (default: 16) Returns: The read data """ # Read a string variable of length charlen if vartype == str: result = file.read(charlen * 1) else: # Find the number of values nval = np.prod(shape) result = np.fromfile(file, vartype, nval) if nval == 1: result = result # [0] else: result = np.reshape(result, shape) return result
[docs]class MODFLOWWaterContent(HydroModelOutput): """Class for processing water content data from MODFLOW simulations."""
[docs] def __init__(self, model_directory: str, idomain: np.ndarray): """ Initialize MODFLOWWaterContent processor. Args: model_directory: Path to simulation workspace idomain: Domain array indicating active cells """ super().__init__(model_directory) self.idomain = idomain self.nrows, self.ncols = idomain.shape # Build reverse lookup dictionary (only for first layer as in original code) self.iuzno_dict_rev = {} iuzno = 0 for i in range(self.nrows): for j in range(self.ncols): if idomain[i, j] != 0: self.iuzno_dict_rev[iuzno] = (i, j) iuzno += 1 # Store number of UZ flow cells self.nuzfcells = len(self.iuzno_dict_rev)
[docs] def load_timestep(self, timestep_idx: int, nlay: int = 3) -> np.ndarray: """ Load water content for a specific timestep. Args: timestep_idx: Index of the timestep to load nlay: Number of layers in the model Returns: Water content array with shape (nlay, nrows, ncols) """ return self.load_time_range(timestep_idx, timestep_idx + 1, nlay)[0]
[docs] def load_time_range(self, start_idx: int = 0, end_idx: Optional[int] = None, nlay: int = 3) -> np.ndarray: """ Load water content for a range of timesteps. Args: start_idx: Starting timestep index (default: 0) end_idx: Ending timestep index (exclusive, default: None loads all) nlay: Number of layers in the model (default: 3) Returns: Water content array with shape (timesteps, nlay, nrows, ncols) """ # Calculate total UZ flow cells nuzfcells = self.nuzfcells * nlay # Open water content file fpth = os.path.join(self.model_directory, "WaterContent") file = open(fpth, "rb") WC_tot = [] # Skip to starting timestep for _ in range(start_idx): try: # Read header vartype = [ ("kstp", "<i4"), ("kper", "<i4"), ("pertim", "<f8"), ("totim", "<f8"), ("text", "S16"), ("maxbound", "<i4"), ("1", "<i4"), ("11", "<i4"), ] binaryread(file, vartype) # Skip data for this timestep vartype = [("data", "<f8")] for _ in range(nuzfcells): binaryread(file, vartype) except Exception: print(f"Error skipping to timestep {start_idx}") file.close() return np.array(WC_tot) # Read timesteps timestep = 0 while True: # Break if we've read the requested number of timesteps if end_idx is not None and timestep >= (end_idx - start_idx): break try: # Read header information vartype = [ ("kstp", "<i4"), ("kper", "<i4"), ("pertim", "<f8"), ("totim", "<f8"), ("text", "S16"), ("maxbound", "<i4"), ("1", "<i4"), ("11", "<i4"), ] header = binaryread(file, vartype) # Initialize water content array for this timestep WC_arr = np.zeros((nlay, self.nrows, self.ncols)) * np.nan # Read water content data vartype = [("data", "<f8")] # Read data for each layer and cell for k in range(nlay): for n in range(self.nuzfcells): i, j = self.iuzno_dict_rev[n] WC_arr[k, i, j] = np.array(binaryread(file, vartype).tolist()) WC_tot.append(WC_arr) timestep += 1 except Exception as e: print(f"Reached end of file or error at timestep {timestep}: {str(e)}") break file.close() return np.array(WC_tot)
[docs] def get_timestep_info(self) -> List[Tuple[int, int, float, float]]: """ Get information about each timestep in the WaterContent file. Returns: List of tuples (kstp, kper, pertim, totim) for each timestep """ # Open water content file fpth = os.path.join(self.model_directory, "WaterContent") file = open(fpth, "rb") timestep_info = [] nuzfcells = self.nuzfcells * 3 # Assuming 3 layers by default while True: try: # Read header information vartype = [ ("kstp", "<i4"), ("kper", "<i4"), ("pertim", "<f8"), ("totim", "<f8"), ("text", "S16"), ("maxbound", "<i4"), ("1", "<i4"), ("11", "<i4"), ] header = binaryread(file, vartype) # Extract timestep info kstp = header[0][0] kper = header[0][1] pertim = header[0][2] totim = header[0][3] timestep_info.append((kstp, kper, pertim, totim)) # Skip data for this timestep vartype = [("data", "<f8")] for _ in range(nuzfcells): binaryread(file, vartype) except Exception: break file.close() return timestep_info
[docs]class MODFLOWPorosity(HydroModelOutput): """Class for processing porosity data from MODFLOW simulations."""
[docs] def __init__(self, model_directory: str, model_name: str): """ Initialize MODFLOWPorosity processor. Args: model_directory: Path to simulation workspace model_name: Name of the MODFLOW model """ super().__init__(model_directory) self.model_directory = model_directory self.model_name = model_name self.nlay = 1 self.nrow = 1 self.ncol = 1 try: import flopy self.flopy_available = True except ImportError: self.flopy_available = False raise ImportError("flopy is required to load MODFLOW porosity data. Please install flopy.")
[docs] def load_porosity(self) -> np.ndarray: """ Load porosity data from MODFLOW model (supports both MODFLOW 6 and earlier versions). Returns: 3D array of porosity values (nlay, nrow, ncol) """ if not self.flopy_available: raise ImportError("flopy is required to load MODFLOW porosity data.") try: import flopy # Check if this is a MODFLOW 6 model mf6_indicator_files = ["mfsim.nam", f"{self.model_name}.sim"] is_mf6 = any(os.path.exists(os.path.join(self.model_directory, f)) for f in mf6_indicator_files) if is_mf6: # MODFLOW 6 approach try: # Load the MODFLOW 6 simulation sim = flopy.mf6.MFSimulation.load( sim_name=self.model_name, sim_ws=self.model_directory, exe_name="mf6", ) # Get the groundwater flow model gwf = sim.get_model(self.model_name) # Try to get dimensions from DIS package dis = gwf.get_package("DIS") self.nlay = dis.nlay.data self.nrow = dis.nrow.data self.ncol = dis.ncol.data # Try to get porosity from the STO (Storage) package sto = gwf.get_package("STO") return sto.sy.array except Exception as e: print(f"Error loading MODFLOW 6 model: {str(e)}") else: # Legacy MODFLOW models (2005 or earlier) try: # Load the model model = flopy.modflow.Modflow.load( f"{self.model_name}.nam", model_ws=self.model_directory, load_only=["UPW", "LPF", "DIS"], # Load packages with porosity and dimensions check=False ) # Get dimensions self.nlay = model.nlay self.nrow = model.nrow self.ncol = model.ncol # Try to get porosity from UPW package first if hasattr(model, 'upw') and model.upw is not None: if hasattr(model.upw, 'sy'): return model.upw.sy.array # Then try LPF package if hasattr(model, 'lpf') and model.lpf is not None: if hasattr(model.lpf, 'sy'): return model.lpf.sy.array # If specific yield not found, try specific storage if hasattr(model, 'upw') and model.upw is not None: if hasattr(model.upw, 'ss'): print("WARNING: Using specific storage as substitute for porosity") return model.upw.ss.array if hasattr(model, 'lpf') and model.lpf is not None: if hasattr(model.lpf, 'ss'): print("WARNING: Using specific storage as substitute for porosity") return model.lpf.ss.array except Exception as e: print(f"Error loading legacy MODFLOW model: {str(e)}") # If nothing found, use default value print("WARNING: No porosity data found in model. Using default value of 0.3") return np.ones((self.nlay, self.nrow, self.ncol)) * 0.3 except Exception as e: raise ValueError(f"Error loading porosity data: {str(e)}")
# Implement required abstract methods
[docs] def load_timestep(self, timestep_idx: int, **kwargs) -> np.ndarray: """ Load porosity for a specific timestep. Note: For MODFLOW, porosity is typically constant over time, so this returns the same array regardless of timestep. Args: timestep_idx: Index of the timestep (unused) Returns: 3D array of porosity values """ return self.load_porosity()
[docs] def load_time_range(self, start_idx: int = 0, end_idx: Optional[int] = None, **kwargs) -> np.ndarray: """ Load porosity for a range of timesteps. Since porosity is typically constant, this returns a stack of identical arrays. Args: start_idx: Starting timestep index (unused) end_idx: Ending timestep index (unused) Returns: 4D array of porosity values (nt, nlay, nrow, ncol) where all timesteps are identical """ porosity = self.load_porosity() # Determine number of timesteps nt = 1 if end_idx is None else (end_idx - start_idx) # Stack porosity array for each timestep return np.tile(porosity[np.newaxis, :, :, :], (nt, 1, 1, 1))
[docs] def get_timestep_info(self) -> List[Tuple]: """ Get information about each timestep in the model. Returns a minimal placeholder since porosity doesn't vary with time. Returns: List with single dummy timestep info """ return [(0, 0, 0.0)] # (stress_period, timestep, time)