Source code for Geophy_modular.seismic_processor

"""
Seismic data processing module for structure identification.
"""
import numpy as np
import pygimli as pg
from pygimli.physics import traveltime as tt
from scipy.interpolate import interp1d
from scipy.signal import savgol_filter
from typing import Tuple, List, Optional, Union, Dict, Any


[docs]def process_seismic_tomography(ttData, mesh=None, **kwargs): """ Process seismic tomography data and perform inversion. Args: ttData: Travel time data container mesh: Mesh for inversion (optional, created if None) **kwargs: Additional parameters including: - lam: Regularization parameter (default: 50) - zWeight: Vertical regularization weight (default: 0.2) - vTop: Top velocity constraint (default: 500) - vBottom: Bottom velocity constraint (default: 5000) - quality: Mesh quality if creating new mesh (default: 31) - paraDepth: Maximum depth for parametric domain (default: 30) - verbose: Verbosity level (default: 1) Returns: TravelTimeManager object with inversion results """ # Set default parameters params = { 'lam': 50, 'zWeight': 0.2, 'vTop': 500, 'vBottom': 5000, 'quality': 31, 'paraDepth': 30.0, 'verbose': 1, 'limits': [100., 6000.] } # Update with user-provided parameters params.update(kwargs) # Create travel time manager TT = pg.physics.traveltime.TravelTimeManager() # Set or create mesh if mesh is not None: TT.setMesh(mesh) else: # Create mesh from data if not provided # For a more sophisticated mesh creation, we could use createParaMesh pass # Run inversion TT.invert(ttData, lam=params['lam'], zWeight=params['zWeight'], vTop=params['vTop'], vBottom=params['vBottom'], verbose=params['verbose'], limits=params['limits']) return TT
[docs]def seismic_velocity_classifier(velocity_data, mesh, threshold=1200): """ Classify mesh cells based on velocity threshold. Args: velocity_data: Velocity values for each cell mesh: PyGIMLi mesh threshold: Velocity threshold for classification (default: 1200) Returns: Array of cell markers (1: below threshold, 2: above threshold) """ # Initialize classification array thresholded = np.ones_like(velocity_data, dtype=int) # Get cell centers cell_centers = mesh.cellCenters() x_coords = cell_centers[:,0] # X-coordinates of cell centers z_coords = cell_centers[:,1] # Z-coordinates (depth) of cell centers # Get unique x-coordinates (horizontal distances) unique_x = np.unique(x_coords) # For each vertical column (each unique x-coordinate) for x in unique_x: # Get indices of cells in this column, sorted by depth (z-coordinate) column_indices = np.where(x_coords == x)[0] column_indices = column_indices[np.argsort(z_coords[column_indices])] # Check if any cell in this column exceeds the threshold threshold_crossed = False # Process cells from top to bottom for idx in column_indices: if velocity_data[idx] >= threshold or threshold_crossed: thresholded[idx] = 2 threshold_crossed = True return thresholded
[docs]def extract_velocity_structure(mesh, velocity_data, threshold=1200, interval=4.0): """ Extract structure interface from velocity model at the specified threshold. Args: mesh: PyGIMLi mesh velocity_data: Velocity values for each cell threshold: Velocity threshold defining interface (default: 1200) interval: Horizontal sampling interval (default: 4.0) Returns: x_coords: Horizontal coordinates of interface points z_coords: Vertical coordinates of interface points interface_data: Dictionary with interface information """ # Get cell centers cell_centers = mesh.cellCenters() x_coords = cell_centers[:,0] z_coords = cell_centers[:,1] # Get x-range for complete boundary x_min, x_max = np.min(x_coords), np.max(x_coords) # Create bins across the entire x-range x_bins = np.arange(x_min, x_max + interval, interval) # Arrays to store interface points interface_x = [] interface_z = [] # For each bin, find the velocity interface for i in range(len(x_bins)-1): # Get all cells in this x-range bin_indices = np.where((x_coords >= x_bins[i]) & (x_coords < x_bins[i+1]))[0] if len(bin_indices) > 0: # Get velocity values and depths for this bin bin_velocities = velocity_data[bin_indices] bin_depths = z_coords[bin_indices] # Sort by depth sort_indices = np.argsort(bin_depths) bin_velocities = bin_velocities[sort_indices] bin_depths = bin_depths[sort_indices] # Find where velocity crosses the threshold for j in range(1, len(bin_velocities)): if (bin_velocities[j-1] < threshold and bin_velocities[j] >= threshold) or \ (bin_velocities[j-1] >= threshold and bin_velocities[j] < threshold): # Linear interpolation for exact interface depth v1 = bin_velocities[j-1] v2 = bin_velocities[j] z1 = bin_depths[j-1] z2 = bin_depths[j] # Calculate the interpolated z-value where velocity = threshold ratio = (threshold - v1) / (v2 - v1) interface_depth = z1 + ratio * (z2 - z1) interface_x.append((x_bins[i] + x_bins[i+1]) / 2) interface_z.append(interface_depth) break # Ensure we have interface points for the entire range # If first point is missing, extrapolate from the first available points if len(interface_x) > 0 and interface_x[0] > x_min + interval: interface_x.insert(0, x_min) # Use the slope of the first two points to extrapolate if len(interface_x) > 2: slope = (interface_z[1] - interface_z[0]) / (interface_x[1] - interface_x[0]) interface_z.insert(0, interface_z[0] - slope * (interface_x[1] - x_min)) else: interface_z.insert(0, interface_z[0]) # If last point is missing, extrapolate from the last available points if len(interface_x) > 0 and interface_x[-1] < x_max - interval: interface_x.append(x_max) # Use the slope of the last two points to extrapolate if len(interface_x) > 2: slope = (interface_z[-1] - interface_z[-2]) / (interface_x[-1] - interface_x[-2]) interface_z.append(interface_z[-1] + slope * (x_max - interface_x[-1])) else: interface_z.append(interface_z[-1]) # Create a dense interpolation grid for smoothing x_dense = np.linspace(x_min, x_max, 500) # 500 points for smooth curve # Apply cubic interpolation for smoother interface if len(interface_x) > 3: try: interp_func = interp1d(interface_x, interface_z, kind='cubic', bounds_error=False, fill_value="extrapolate") z_dense = interp_func(x_dense) # Apply additional smoothing z_dense = savgol_filter(z_dense, window_length=31, polyorder=3) except: # Fall back to linear interpolation if cubic fails interp_func = interp1d(interface_x, interface_z, kind='linear', bounds_error=False, fill_value="extrapolate") z_dense = interp_func(x_dense) else: # Not enough points for cubic interpolation interp_func = interp1d(interface_x, interface_z, kind='linear', bounds_error=False, fill_value="extrapolate") z_dense = interp_func(x_dense) # Prepare interface data dictionary interface_data = { 'threshold': threshold, 'raw_x': interface_x, 'raw_z': interface_z, 'smooth_x': x_dense, 'smooth_z': z_dense, 'min_x': x_min, 'max_x': x_max } return x_dense, z_dense, interface_data
[docs]def save_velocity_structure(filename, x_coords, z_coords, interface_data=None): """ Save velocity structure data to file. Args: filename: Output filename x_coords: X coordinates of interface z_coords: Z coordinates of interface interface_data: Additional data to save (optional) """ # Create dictionary with data save_data = { 'x_coords': x_coords, 'z_coords': z_coords } # Add additional data if provided if interface_data is not None: save_data.update(interface_data) # Save as numpy file np.savez(filename, **save_data) # Also save as CSV for easier viewing csv_filename = filename.replace('.npz', '.csv') with open(csv_filename, 'w') as f: f.write('x,z\n') for x, z in zip(x_coords, z_coords): f.write(f"{x},{z}\n")