"""
Forward modeling utilities for Electrical Resistivity Tomography (ERT).
"""
import numpy as np
import pygimli as pg
from pygimli.physics import ert
from typing import Tuple, Optional, Union
[docs]class ERTForwardModeling:
"""Class for forward modeling of Electrical Resistivity Tomography (ERT) data."""
[docs] def __init__(self, mesh: pg.Mesh, data: Optional[pg.DataContainer] = None):
"""
Initialize ERT forward modeling.
Args:
mesh: PyGIMLI mesh for forward modeling
data: ERT data container
"""
self.mesh = mesh
self.data = data
self.fwd_operator = ert.ERTModelling()
if data is not None:
self.fwd_operator.setData(data)
self.fwd_operator.setMesh(mesh)
[docs] def set_data(self, data: pg.DataContainer) -> None:
"""
Set ERT data for forward modeling.
Args:
data: ERT data container
"""
self.data = data
self.fwd_operator.setData(data)
[docs] def set_mesh(self, mesh: pg.Mesh) -> None:
"""
Set mesh for forward modeling.
Args:
mesh: PyGIMLI mesh
"""
self.mesh = mesh
self.fwd_operator.setMesh(mesh)
[docs] def forward(self, resistivity_model: np.ndarray, log_transform: bool = True) -> np.ndarray:
"""
Compute forward response for a given resistivity model.
Args:
resistivity_model: Resistivity model values
log_transform: Whether resistivity_model is log-transformed
Returns:
Forward response (apparent resistivity)
"""
# Convert to PyGIMLI RVector if needed
if isinstance(resistivity_model, np.ndarray):
model = pg.Vector(resistivity_model.ravel())
else:
model = resistivity_model
# Apply exponentiation if log-transformed input
if log_transform:
model = pg.Vector(np.exp(model))
# Calculate response
response = self.fwd_operator.response(model)
# Log-transform response if requested
if log_transform:
return np.log(response.array())
return response.array()
[docs] def forward_and_jacobian(self, resistivity_model: np.ndarray, log_transform: bool = True) -> Tuple[np.ndarray, np.ndarray]:
"""
Compute forward response and Jacobian matrix.
Args:
resistivity_model: Resistivity model values
log_transform: Whether resistivity_model is log-transformed
Returns:
Tuple of (forward response, Jacobian matrix)
"""
# Convert to PyGIMLI RVector if needed
if isinstance(resistivity_model, np.ndarray):
model = pg.Vector(resistivity_model.ravel())
else:
model = resistivity_model
# Apply exponentiation if log-transformed input
if log_transform:
model = pg.Vector(np.exp(model))
# Calculate response
response = self.fwd_operator.response(model)
# Create Jacobian matrix
self.fwd_operator.createJacobian(model)
jacobian = self.fwd_operator.jacobian()
J = pg.utils.gmat2numpy(jacobian)
# Process Jacobian according to log transformations
if log_transform:
# For log-transformed model and response
# J_log = J * exp(m) / d = d(log(d))/d(log(m))
J = np.exp(resistivity_model.ravel()) * J
response_array = response.array()
J = J / response_array.reshape(response_array.shape[0], 1)
return np.log(response.array()), J
return response.array(), J
[docs] def get_coverage(self, resistivity_model: np.ndarray, log_transform: bool = True) -> np.ndarray:
"""
Compute coverage (resolution) for a given resistivity model.
Args:
resistivity_model: Resistivity model values
log_transform: Whether resistivity_model is log-transformed
Returns:
Coverage values for each cell
"""
# Convert to PyGIMLI RVector if needed
if isinstance(resistivity_model, np.ndarray):
model = pg.Vector(resistivity_model.ravel())
else:
model = resistivity_model
# Apply exponentiation if log-transformed input
if log_transform:
model = pg.Vector(np.exp(model))
# Calculate response and Jacobian
response = self.fwd_operator.response(model)
self.fwd_operator.createJacobian(model)
# Calculate coverage
covTrans = pg.core.coverageDCtrans(
self.fwd_operator.jacobian(),
1.0 / response,
1.0 / model
)
# Weight by cell sizes
paramSizes = np.zeros(len(model))
mesh = self.fwd_operator.paraDomain
for c in mesh.cells():
paramSizes[c.marker()] += c.size()
FinalJ = np.log10(covTrans / paramSizes)
return FinalJ
[docs] def create_synthetic_data(cls, xpos: np.ndarray,
ypos: Optional[np.ndarray] = None,
mesh: Optional[pg.Mesh] = None,
res_models: Optional[np.ndarray] = None,
schemeName: str = 'wa',
noise_level: float = 0.05,
absolute_error: float = 0.0,
relative_error: float = 0.05,
save_path: Optional[str] = None,
show_data: bool = False,
seed: Optional[int] = None,
xbound: float = 100,
ybound: float = 100) -> Tuple[pg.DataContainer, pg.Mesh]:
"""
Create synthetic ERT data using forward modeling.
This method simulates an ERT survey by placing electrodes, creating a measurement
scheme, performing forward modeling to generate synthetic data, and adding noise.
Args:
xpos: X-coordinates of electrodes
ypos: Y-coordinates of electrodes (if None, uses flat surface)
mesh: Mesh for forward modeling
res_models: Resistivity model values
schemeName: Name of measurement scheme ('wa', 'dd', etc.)
noise_level: Level of Gaussian noise to add
absolute_error: Absolute error for data estimation
relative_error: Relative error for data estimation
save_path: Path to save synthetic data (if None, does not save)
show_data: Whether to display data after creation
seed: Random seed for noise generation
xbound: X boundary extension for mesh
ybound: Y boundary extension for mesh
Returns:
Tuple of (synthetic ERT data container, simulation mesh)
"""
# Set random seed if provided
if seed is not None:
np.random.seed(seed)
pg.rrng.randpin.seed(seed)
# Create electrode positions
if ypos is None:
# Create flat surface if no y-coordinates provided
ypos = np.zeros_like(xpos)
pos = np.hstack((xpos.reshape(-1, 1), ypos.reshape(-1, 1)))
# Create ERT survey scheme
scheme = ert.createData(elecs=pos, schemeName=schemeName)
# Prepare mesh for forward modeling
if mesh is not None:
# Set all cells to same marker
mesh.setCellMarkers(np.ones(mesh.cellCount()) * 2)
# Append triangle boundary for forward modeling
grid = pg.meshtools.appendTriangleBoundary(mesh, marker=1,
xbound=xbound, ybound=ybound)
else:
# Create a simple mesh if none provided
grid = pg.createGrid(
x=np.linspace(np.min(xpos) - 10, np.max(xpos) + 10, 50),
y=np.linspace(np.min(ypos) - 20, 0, 20)
)
grid = pg.meshtools.appendTriangleBoundary(grid, marker=1,
xbound=xbound, ybound=ybound)
# Create homogeneous resistivity model if none provided
if res_models is None:
res_models = np.ones(grid.cellCount()) * 100
# Create synthetic data
synth_data = scheme.copy()
# Initialize a forward operator
fwd_operator = cls(mesh=grid, data=scheme)
# Forward response
fob = ert.ERTModelling()
fob.setData(scheme)
fob.setMesh(grid)
dr = fob.response(res_models)
# Add noise
dr *= 1. + pg.randn(dr.size()) * noise_level
# Set data and error values
synth_data['rhoa'] = dr
# Estimate error
ert_manager = ert.ERTManager(synth_data)
synth_data['err'] = ert_manager.estimateError(
synth_data, absoluteUError=absolute_error, relativeError=relative_error
)
# Display data if requested
if show_data:
ert.showData(synth_data, logscale=True)
# Save data if a path is provided
if save_path is not None:
synth_data.save(save_path)
return synth_data, grid
[docs]def ertforward(fob, mesh, rhomodel, xr):
"""
Forward model for ERT.
Args:
fob (pygimli.ERTModelling): ERT forward operator.
mesh (pg.Mesh): Mesh for the forward model.
rhomodel (pg.RVector): Resistivity model vector.
xr (np.ndarray): Log-transformed model parameter (resistivity).
Returns:
dr (np.ndarray): Log-transformed forward response.
rhomodel (pg.RVector): Updated resistivity model.
"""
xr1 = np.log(rhomodel.array())
xr1[mesh.cellMarkers() == 2] = np.exp(xr)
rhomodel = pg.matrix.RVector(xr1)
dr = fob.response(rhomodel)
return np.log(dr.array()), rhomodel
[docs]def ertforward2(fob, xr, mesh):
"""
Simplified ERT forward model.
Args:
fob (pygimli.ERTModelling): ERT forward operator.
xr (np.ndarray): Log-transformed model parameter.
mesh (pg.Mesh): Mesh for the forward model.
Returns:
dr (np.ndarray): Log-transformed forward response.
"""
xr1 = xr.copy()
xr1 = np.exp(xr)
rhomodel = xr1
dr = fob.response(rhomodel)
dr = np.log(dr)
return dr
[docs]def ertforandjac(fob, rhomodel, xr):
"""
Forward model and Jacobian for ERT.
Args:
fob (pygimli.ERTModelling): ERT forward operator.
rhomodel (pg.RVector): Resistivity model.
xr (np.ndarray): Log-transformed model parameter.
Returns:
dr (np.ndarray): Log-transformed forward response.
J (np.ndarray): Jacobian matrix.
"""
dr = fob.response(rhomodel)
fob.createJacobian(rhomodel)
J = fob.jacobian()
J = pg.utils.gmat2numpy(J)
J = np.exp(xr)*J
dr = dr.array()
J = J/dr.reshape(dr.shape[0],1)
dr = np.log(dr)
return dr, J
[docs]def ertforandjac2(fob, xr, mesh):
"""
Alternative ERT forward model and Jacobian using log-resistivity values.
Args:
fob (pygimli.ERTModelling): ERT forward operator.
xr (np.ndarray): Log-transformed model parameter.
mesh (pg.Mesh): Mesh for the forward model.
Returns:
dr (np.ndarray): Log-transformed forward response.
J (np.ndarray): Jacobian matrix.
"""
xr1 = xr.copy()
xr1 = np.exp(xr)
rhomodel = xr1
dr = fob.response(rhomodel)
fob.createJacobian(rhomodel)
J = fob.jacobian()
J = pg.utils.gmat2numpy(J)
J = np.exp(xr.T)*J
dr = dr.array()
J = J/dr.reshape(dr.shape[0],1)
dr = np.log(dr)
return dr, J