"""
Linear solvers for geophysical inversion.
"""
import numpy as np
import scipy
import scipy.sparse
from scipy.sparse import linalg as splinalg
# Try to import cupy for GPU acceleration
try:
import cupy as cp
from cupyx.scipy.sparse import csr_matrix
GPU_AVAILABLE = True
except ImportError:
GPU_AVAILABLE = False
# Try to import joblib for parallel processing
try:
from joblib import Parallel, delayed
PARALLEL_AVAILABLE = True
except ImportError:
PARALLEL_AVAILABLE = False
def generalized_solver(A, b, method="cgls", x=None, maxiter=200, tol=1e-8,
verbose=False, damp=0.0, use_gpu=False, parallel=False, n_jobs=-1):
"""
Generalized solver for Ax = b with optional GPU acceleration and parallelism.
    Parameters
    ----------
A : array_like or sparse matrix
The system matrix (Jacobian or forward operator).
b : array_like
Right-hand side vector.
method : str, optional
Solver method: 'lsqr', 'rrlsqr', 'cgls', or 'rrls'. Default is 'cgls'.
x : array_like, optional
Initial guess for the solution. If None, zeros are used.
maxiter : int, optional
Maximum number of iterations.
tol : float, optional
Convergence tolerance.
verbose : bool, optional
Print progress information every 10 iterations.
    damp : float, optional
        Tikhonov damping weight. For 'cgls' this solves the damped normal
        equations (A^T A + damp*I) x = A^T b.
use_gpu : bool, optional
Use GPU acceleration with CuPy (if available).
parallel : bool, optional
Use parallel CPU computations.
n_jobs : int, optional
Number of parallel jobs (if parallel is True).
    Returns
    -------
x : array_like
The computed solution vector.
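    Examples
    --------
    A minimal illustrative run on the CPU (NumPy only); with an identity
    matrix, CGLS converges in a single iteration:

    >>> A = np.eye(3)
    >>> b = np.array([1.0, 2.0, 3.0])
    >>> x = generalized_solver(A, b, method="cgls", maxiter=50, tol=1e-12)
    >>> np.allclose(np.ravel(x), b)
    True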
"""
# Choose the backend (NumPy or CuPy)
if use_gpu and GPU_AVAILABLE:
xp = cp
else:
xp = np
use_gpu = False # Ensure it's turned off if not available
# Convert A and b to appropriate arrays
if use_gpu:
if scipy.sparse.isspmatrix(A):
A = csr_matrix(A)
else:
A = cp.asarray(A)
b = cp.asarray(b)
else:
if scipy.sparse.isspmatrix(A):
A = A.tocsr()
else:
A = np.asarray(A)
b = np.asarray(b)
# Initialize solution and residual
if x is None:
x = xp.zeros(A.shape[1])
r = b.copy()
else:
x = xp.asarray(x)
r = b - A.dot(x)
    # Precompute initial quantities (gradient of the damped objective)
    s = A.T.dot(r)
    if damp > 0:
        s = s - damp * x
    gamma = xp.dot(s.T, s)
    rr = xp.dot(r.T, r)
    rr0 = rr
# Choose the solver routine based on method
if method.lower() == "lsqr":
return _lsqr(A, b, x, r, s, gamma, rr, rr0, maxiter, tol, verbose, damp, use_gpu, parallel, n_jobs, xp)
elif method.lower() == "rrlsqr":
return _rrlsqr(A, b, x, r, s, gamma, rr, rr0, maxiter, tol, verbose, damp, use_gpu, parallel, n_jobs, xp)
elif method.lower() == "cgls":
return _cgls(A, b, x, r, s, gamma, rr, rr0, maxiter, tol, verbose, damp, use_gpu, parallel, n_jobs, xp)
elif method.lower() == "rrls":
return _rrls(A, b, x, r, s, gamma, rr, rr0, maxiter, tol, verbose, damp, use_gpu, parallel, n_jobs, xp)
else:
raise ValueError(f"Unknown method: {method}. Supported methods: 'lsqr', 'rrlsqr', 'cgls', 'rrls'")
def _matrix_multiply(A, v, use_gpu, parallel, n_jobs, xp):
"""
Helper function for matrix-vector multiplication with optional GPU or parallel CPU support.
Args:
A: Matrix
v: Vector
use_gpu: Whether to use GPU
parallel: Whether to use parallel CPU
n_jobs: Number of parallel jobs
xp: NumPy or CuPy module
Returns:
Matrix-vector product
"""
if use_gpu:
v = xp.asarray(v)
return A.dot(v)
else:
if scipy.sparse.isspmatrix(A):
return A.dot(v)
else:
if parallel and PARALLEL_AVAILABLE:
# Partition matrix rows for parallel processing
n_rows = A.shape[0]
if n_jobs <= 0:
import multiprocessing
n_jobs = multiprocessing.cpu_count()
partition_size = max(1, n_rows // n_jobs)
partitions = [(i, min(i + partition_size, n_rows))
for i in range(0, n_rows, partition_size)]
# Compute matrix-vector product in parallel
results = Parallel(n_jobs=n_jobs, backend='threading')(
delayed(lambda row_range: A[row_range[0]:row_range[1]].dot(v))(partition)
for partition in partitions
)
# Combine results
return xp.concatenate(results)
else:
return A.dot(v)
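# Illustrative check for the helper (assumes joblib is installed); the
# partitioned product must match a plain dot product:
#
#     A = np.random.rand(1000, 50)
#     v = np.random.rand(50)
#     out = _matrix_multiply(A, v, use_gpu=False, parallel=True, n_jobs=4, xp=np)
#     assert np.allclose(out, A.dot(v))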
def _cgls(A, b, x, r, s, gamma, rr, rr0, maxiter, tol, verbose, damp,
use_gpu, parallel, n_jobs, xp):
"""
CGLS solver for linear least squares problems.
This implements the Conjugate Gradient Least Squares method for solving
the normal equations A^T A x = A^T b.
Args:
A: System matrix
b: Right-hand side vector
x: Initial solution vector
r: Initial residual
s: Initial A^T r
gamma: Initial s^T s
rr: Initial r^T r
rr0: Initial residual norm
maxiter: Maximum iterations
tol: Convergence tolerance
verbose: Whether to print progress
damp: Damping parameter
use_gpu: Whether to use GPU acceleration
parallel: Whether to use parallel computation
n_jobs: Number of parallel jobs
xp: NumPy or CuPy module
Returns:
Solution vector
"""
# Ensure inputs have correct shape
if x.ndim == 1:
x = x.reshape(-1, 1)
if r.ndim == 1:
r = r.reshape(-1, 1)
if s.ndim == 1:
s = s.reshape(-1, 1)
# Initialize search direction
p = s.copy()
for i in range(maxiter):
if verbose and i % 10 == 0:
pg.info("CGLS Iteration:", i, "residual:", float(rr), "relative:", float(rr / rr0))
        # Compute A*p
        q = _matrix_multiply(A, p, use_gpu, parallel, n_jobs, xp)
        # Ensure q is a column vector
        q = q.reshape(-1, 1)
        # Compute step size for the damped normal equations; note q and p
        # have different lengths for rectangular A, so the damping enters
        # the denominator rather than being added to q directly
        denom = float(xp.dot(q.T, q))
        if damp > 0:
            denom += damp * float(xp.dot(p.T, p))
        alpha = float(gamma / denom)
# Update solution and residual
x += alpha * p
r -= alpha * q
        # Compute new gradient of the damped objective: s = A^T r - damp*x
        s = _matrix_multiply(A.T, r, use_gpu, parallel, n_jobs, xp)
        # Ensure s is a column vector
        s = s.reshape(-1, 1)
        if damp > 0:
            s -= damp * x
# Compute new gamma and beta
gamma_new = float(xp.dot(s.T, s))
beta = float(gamma_new / gamma)
# Update search direction
p = s + beta * p
# Update gamma
gamma = gamma_new
# Check convergence
rr = float(xp.dot(r.T, r))
if rr / rr0 < tol:
if verbose:
pg.info(f"CGLS converged after {i+1} iterations")
break
# Return solution (convert back to CPU if on GPU)
return x.get() if use_gpu and GPU_AVAILABLE else x
def _lsqr(A, b, x, r, s, gamma, rr, rr0, maxiter, tol, verbose, damp,
use_gpu, parallel, n_jobs, xp):
"""
LSQR solver for linear least squares problems.
This implements the LSQR algorithm of Paige and Saunders for solving
the least squares problem min ||Ax - b||_2.
Args: (same as _cgls)
Returns:
Solution vector
"""
# Ensure x and r are column vectors
if x is None:
x = xp.zeros((A.shape[1], 1))
else:
x = xp.asarray(x)
if x.ndim == 1:
x = x.reshape(-1, 1)
if r.ndim == 1:
r = r.reshape(-1, 1)
# Initialize u and beta
u = r.copy()
beta = xp.sqrt(float(xp.dot(u.T, u)))
if beta > 0:
u = u / beta
# Initialize v and alpha
v = _matrix_multiply(A.T, u, use_gpu, parallel, n_jobs, xp)
if v.ndim == 1:
v = v.reshape(-1, 1)
alpha = xp.sqrt(float(xp.dot(v.T, v)))
if alpha > 0:
v = v / alpha
w = v.copy()
phi_bar = beta
rho_bar = alpha
for i in range(maxiter):
if verbose and i % 10 == 0:
pg.info("LSQR Iteration:", i, "residual:", float(rr), "relative:", float(rr / rr0))
# Bidiagonalization
u_next = _matrix_multiply(A, v, use_gpu, parallel, n_jobs, xp)
if u_next.ndim == 1:
u_next = u_next.reshape(-1, 1)
u_next = u_next - alpha * u
beta = xp.sqrt(float(xp.dot(u_next.T, u_next)))
if beta > 0:
u = u_next / beta
v_next = _matrix_multiply(A.T, u, use_gpu, parallel, n_jobs, xp)
if v_next.ndim == 1:
v_next = v_next.reshape(-1, 1)
v_next = v_next - beta * v
alpha = xp.sqrt(float(xp.dot(v_next.T, v_next)))
if alpha > 0:
v = v_next / alpha
# Apply orthogonal transformation
rho = xp.sqrt(rho_bar**2 + beta**2)
        c = rho_bar / rho
        sn = beta / rho  # Givens sine; renamed to avoid shadowing the argument `s`
        theta = sn * alpha
        rho_bar = -c * alpha
        phi = c * phi_bar
        phi_bar = sn * phi_bar
# Update x and w
t = phi / rho
x = x + t * w
w = v - (theta / rho) * w
# Check convergence
rr = phi_bar**2
if rr / rr0 < tol:
if verbose:
pg.info(f"LSQR converged after {i+1} iterations")
break
return x.get() if use_gpu and GPU_AVAILABLE else x
def _rrlsqr(A, b, x, r, s, gamma, rr, rr0, maxiter, tol, verbose, damp,
use_gpu, parallel, n_jobs, xp):
"""
Regularized LSQR solver.
This implements a regularized version of the LSQR algorithm.
Args: (same as _lsqr)
Returns:
Solution vector
"""
# Ensure x and r are column vectors
if x is None:
x = xp.zeros((A.shape[1], 1))
else:
x = xp.asarray(x)
if x.ndim == 1:
x = x.reshape(-1, 1)
if r.ndim == 1:
r = r.reshape(-1, 1)
# Initialize u and beta
u = r.copy()
beta = xp.sqrt(float(xp.dot(u.T, u)))
if beta > 0:
u = u / beta
# Initialize v and alpha with regularization
v = _matrix_multiply(A.T, u, use_gpu, parallel, n_jobs, xp)
if v.ndim == 1:
v = v.reshape(-1, 1)
if damp > 0:
v = v + damp * x
alpha = xp.sqrt(float(xp.dot(v.T, v)))
if alpha > 0:
v = v / alpha
w = v.copy()
phi_bar = beta
rho_bar = alpha
for i in range(maxiter):
if verbose and i % 10 == 0:
pg.info("RRLSQR Iteration:", i, "residual:", float(rr), "relative:", float(rr / rr0))
# Bidiagonalization with regularization
u_next = _matrix_multiply(A, v, use_gpu, parallel, n_jobs, xp)
if u_next.ndim == 1:
u_next = u_next.reshape(-1, 1)
u_next = u_next - alpha * u
beta = xp.sqrt(float(xp.dot(u_next.T, u_next)))
if beta > 0:
u = u_next / beta
v_next = _matrix_multiply(A.T, u, use_gpu, parallel, n_jobs, xp)
if v_next.ndim == 1:
v_next = v_next.reshape(-1, 1)
v_next = v_next - beta * v
if damp > 0:
v_next = v_next + damp * x
alpha = xp.sqrt(float(xp.dot(v_next.T, v_next)))
if alpha > 0:
v = v_next / alpha
# Apply orthogonal transformation
rho = xp.sqrt(rho_bar**2 + beta**2 + damp**2)
        c = rho_bar / rho
        sn = beta / rho
        theta = sn * alpha
        rho_bar = -c * alpha
        phi = c * phi_bar
        phi_bar = sn * phi_bar
# Update x and w
t = phi / rho
x = x + t * w
w = v - (theta / rho) * w
# Check convergence
rr = phi_bar**2
if rr / rr0 < tol:
if verbose:
pg.info(f"RRLSQR converged after {i+1} iterations")
break
return x.get() if use_gpu and GPU_AVAILABLE else x
def _rrls(A, b, x, r, s, gamma, rr, rr0, maxiter, tol, verbose, damp,
use_gpu, parallel, n_jobs, xp):
"""
Range-Restricted Least Squares solver.
This implements a Range-Restricted Least Squares method.
Args: (same as _cgls)
Returns:
Solution vector
"""
# Ensure x, r, and s are column vectors
if x is None:
x = xp.zeros((A.shape[1], 1))
else:
x = xp.asarray(x)
if x.ndim == 1:
x = x.reshape(-1, 1)
if r.ndim == 1:
r = r.reshape(-1, 1)
if s.ndim == 1:
s = s.reshape(-1, 1)
w = s.copy()
for i in range(maxiter):
if verbose and i % 10 == 0:
pg.info("RRLS Iteration:", i, "residual:", float(rr), "relative:", float(rr / rr0))
p = _matrix_multiply(A, w, use_gpu, parallel, n_jobs, xp)
if p.ndim == 1:
p = p.reshape(-1, 1)
        denom = float(xp.dot(p.T, p))
        if xp.isclose(denom, 0.0):
            break
        lam = float(xp.dot(p.T, r)) / denom
        x = x + lam * w
        r = r - lam * p
s = _matrix_multiply(A.T, r, use_gpu, parallel, n_jobs, xp)
if s.ndim == 1:
s = s.reshape(-1, 1)
        if damp > 0:
            s = s - damp * x  # gradient of the damped objective: A^T r - damp*x
w = s
rr = float(xp.dot(r.T, r))
if rr / rr0 < tol:
if verbose:
pg.info(f"RRLS converged after {i+1} iterations")
break
return x.get() if use_gpu and GPU_AVAILABLE else x
class LinearSolver:
"""Base class for linear system solvers."""
    def __init__(self, method="cgls", max_iterations=200, tolerance=1e-8,
use_gpu=False, parallel=False, n_jobs=-1, damping=0.0,
verbose=False):
"""
Initialize solver with computational options.
Args:
method: Solver method ('cgls', 'lsqr', 'rrlsqr', 'rrls')
max_iterations: Maximum number of iterations
tolerance: Convergence tolerance
use_gpu: Whether to use GPU acceleration
parallel: Whether to use parallel computation
n_jobs: Number of parallel jobs
damping: Damping parameter
verbose: Whether to print progress
"""
self.method = method.lower()
self.max_iterations = max_iterations
self.tolerance = tolerance
self.use_gpu = use_gpu and GPU_AVAILABLE
self.parallel = parallel and PARALLEL_AVAILABLE
self.n_jobs = n_jobs
self.damping = damping
self.verbose = verbose
# Check method
valid_methods = ['cgls', 'lsqr', 'rrlsqr', 'rrls']
if self.method not in valid_methods:
raise ValueError(f"Invalid method: {method}. Must be one of {valid_methods}")
# Check GPU availability
if use_gpu and not GPU_AVAILABLE:
print("Warning: GPU acceleration requested but CuPy not available. Using CPU.")
self.use_gpu = False
# Check parallel availability
if parallel and not PARALLEL_AVAILABLE:
print("Warning: Parallel computation requested but joblib not available. Using serial.")
self.parallel = False
    def solve(self, A, b, x0=None):
"""
Solve linear system Ax = b.
Args:
A: System matrix
b: Right-hand side vector
x0: Initial guess (None for zeros)
Returns:
Solution vector
"""
return generalized_solver(
A, b, method=self.method, x=x0,
maxiter=self.max_iterations, tol=self.tolerance,
verbose=self.verbose, damp=self.damping,
use_gpu=self.use_gpu, parallel=self.parallel, n_jobs=self.n_jobs
)
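# Illustrative usage (CPU path, NumPy arrays):
#
#     solver = LinearSolver(method="lsqr", max_iterations=100, tolerance=1e-10)
#     x = solver.solve(A, b)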
class CGLSSolver(LinearSolver):
"""CGLS (Conjugate Gradient Least Squares) solver."""
    def __init__(self, max_iterations=200, tolerance=1e-8, use_gpu=False,
parallel=False, n_jobs=-1, damping=0.0, verbose=False):
"""
Initialize CGLS solver.
Args:
max_iterations: Maximum number of iterations
tolerance: Convergence tolerance
use_gpu: Whether to use GPU acceleration
parallel: Whether to use parallel computation
n_jobs: Number of parallel jobs
damping: Damping parameter
verbose: Whether to print progress
"""
super().__init__(
method="cgls", max_iterations=max_iterations, tolerance=tolerance,
use_gpu=use_gpu, parallel=parallel, n_jobs=n_jobs,
damping=damping, verbose=verbose
)
class LSQRSolver(LinearSolver):
"""LSQR solver for least squares problems."""
    def __init__(self, max_iterations=200, tolerance=1e-8, use_gpu=False,
parallel=False, n_jobs=-1, damping=0.0, verbose=False):
"""
Initialize LSQR solver.
Args:
max_iterations: Maximum number of iterations
tolerance: Convergence tolerance
use_gpu: Whether to use GPU acceleration
parallel: Whether to use parallel computation
n_jobs: Number of parallel jobs
damping: Damping parameter
verbose: Whether to print progress
"""
super().__init__(
method="lsqr", max_iterations=max_iterations, tolerance=tolerance,
use_gpu=use_gpu, parallel=parallel, n_jobs=n_jobs,
damping=damping, verbose=verbose
)
class RRLSQRSolver(LinearSolver):
"""Regularized LSQR solver."""
    def __init__(self, max_iterations=200, tolerance=1e-8, use_gpu=False,
parallel=False, n_jobs=-1, damping=0.1, verbose=False):
"""
Initialize regularized LSQR solver.
Args:
max_iterations: Maximum number of iterations
tolerance: Convergence tolerance
use_gpu: Whether to use GPU acceleration
parallel: Whether to use parallel computation
n_jobs: Number of parallel jobs
damping: Damping parameter (regularization strength)
verbose: Whether to print progress
"""
super().__init__(
method="rrlsqr", max_iterations=max_iterations, tolerance=tolerance,
use_gpu=use_gpu, parallel=parallel, n_jobs=n_jobs,
damping=damping, verbose=verbose
)
class RRLSSolver(LinearSolver):
"""Range-Restricted Least Squares solver."""
    def __init__(self, max_iterations=200, tolerance=1e-8, use_gpu=False,
parallel=False, n_jobs=-1, damping=0.0, verbose=False):
"""
Initialize RRLS solver.
Args:
max_iterations: Maximum number of iterations
tolerance: Convergence tolerance
use_gpu: Whether to use GPU acceleration
parallel: Whether to use parallel computation
n_jobs: Number of parallel jobs
damping: Damping parameter
verbose: Whether to print progress
"""
super().__init__(
method="rrls", max_iterations=max_iterations, tolerance=tolerance,
use_gpu=use_gpu, parallel=parallel, n_jobs=n_jobs,
damping=damping, verbose=verbose
)
# Additional solver implementations
import scipy.linalg
def direct_solver(A, b, method="lu", **kwargs):
"""
Solve a linear system using direct methods.
Args:
A: System matrix
b: Right-hand side vector
method: Direct solver method ('lu', 'qr', 'svd', 'cholesky')
**kwargs: Additional parameters for specific methods
Returns:
Solution vector
"""
# Handle sparse matrices
if scipy.sparse.isspmatrix(A):
if method == "lu":
# Sparse LU decomposition
return splinalg.spsolve(A, b)
elif method == "cholesky":
# Check if A is symmetric positive definite
try:
factor = splinalg.cholesky(A.tocsc())
return factor.solve(b)
except:
print("Warning: Matrix not SPD, falling back to spsolve")
return splinalg.spsolve(A, b)
else:
# Fall back to sparse solve for other methods
return splinalg.spsolve(A, b)
else:
# Dense matrix solvers
if method == "lu":
# LU decomposition
return scipy.linalg.solve(A, b)
elif method == "qr":
# QR decomposition
q, r = scipy.linalg.qr(A)
return scipy.linalg.solve_triangular(r, q.T @ b)
elif method == "svd":
# SVD decomposition
u, s, vh = scipy.linalg.svd(A, full_matrices=False)
# Filter small singular values
tol = kwargs.get('tol', 1e-10)
s_inv = np.where(s > tol, 1/s, 0)
return vh.T @ (s_inv * (u.T @ b))
elif method == "cholesky":
# Cholesky decomposition
try:
L = scipy.linalg.cholesky(A, lower=True)
return scipy.linalg.solve_triangular(
L.T,
scipy.linalg.solve_triangular(L, b, lower=True),
lower=False
)
            except scipy.linalg.LinAlgError:
print("Warning: Matrix not SPD, falling back to LU")
return scipy.linalg.solve(A, b)
else:
raise ValueError(f"Unknown direct solver method: {method}")
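# Illustrative usage of the direct solvers (dense path):
#
#     A = np.array([[4.0, 1.0], [1.0, 3.0]])
#     b = np.array([1.0, 2.0])
#     x_lu = direct_solver(A, b, method="lu")
#     x_chol = direct_solver(A, b, method="cholesky")   # A is SPD here
#     assert np.allclose(x_lu, x_chol)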
class TikhonovRegularization:
"""Tikhonov regularization for ill-posed problems."""
    def __init__(self, regularization_matrix=None,
alpha=1.0, regularization_type='identity'):
"""
Initialize Tikhonov regularization.
Args:
regularization_matrix: Custom regularization matrix (if None, one will be generated)
alpha: Regularization parameter
regularization_type: Type of regularization ('identity', 'gradient', 'laplacian')
"""
self.alpha = alpha
self.regularization_matrix = regularization_matrix
self.regularization_type = regularization_type
    def create_regularization_matrix(self, n):
"""
Create regularization matrix based on the selected type.
Args:
n: Size of model vector
Returns:
Regularization matrix
"""
if self.regularization_type == 'identity':
# 0th order Tikhonov (identity matrix)
return scipy.sparse.eye(n)
        elif self.regularization_type == 'gradient':
            # 1st order Tikhonov (forward-difference gradient operator)
            return scipy.sparse.diags([-1, 1], offsets=[0, 1], shape=(n-1, n))
        elif self.regularization_type == 'laplacian':
            # 2nd order Tikhonov (1-D Laplacian / second-difference operator)
            return scipy.sparse.diags([1, -2, 1], offsets=[0, 1, 2], shape=(n-2, n))
else:
raise ValueError(f"Unknown regularization type: {self.regularization_type}")
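    # For n = 4 the 'gradient' operator materializes as
    #   [[-1,  1,  0,  0],
    #    [ 0, -1,  1,  0],
    #    [ 0,  0, -1,  1]]
    # and 'laplacian' as the second-difference stencil [1, -2, 1] shifted
    # along the two rows of a (2, 4) matrix.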
    def apply(self, A, b, solver=None):
"""
Apply Tikhonov regularization to the linear system.
Args:
A: System matrix
b: Right-hand side vector
solver: Solver to use (None for direct solver)
Returns:
Regularized solution
"""
m = A.shape[1]
# Create regularization matrix if not provided
if self.regularization_matrix is None:
L = self.create_regularization_matrix(m)
else:
L = self.regularization_matrix
# Augment system with regularization
A_aug = scipy.sparse.vstack([A, np.sqrt(self.alpha) * L])
b_aug = np.hstack([b, np.zeros(L.shape[0])])
# Solve regularized system
if solver is None:
# Use direct solver for small systems
if A.shape[0] * A.shape[1] < 1e6:
try:
return direct_solver(A_aug.T @ A_aug, A_aug.T @ b_aug)
                except Exception:
# Fall back to LSQR
return splinalg.lsqr(A_aug, b_aug)[0]
else:
# Use LSQR for large systems
return splinalg.lsqr(A_aug, b_aug)[0]
else:
# Use provided solver
return solver.solve(A_aug, b_aug)
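# Illustrative usage: smooth a small ill-posed system with a gradient penalty.
#
#     reg = TikhonovRegularization(alpha=0.1, regularization_type='gradient')
#     x_reg = reg.apply(A, b)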
class IterativeRefinement:
"""
Iterative refinement to improve accuracy of a solution to a linear system.
"""
    def __init__(self, max_iterations=5, tolerance=1e-10,
use_double_precision=True):
"""
Initialize iterative refinement.
Args:
max_iterations: Maximum number of refinement iterations
tolerance: Convergence tolerance
use_double_precision: Whether to use double precision for residual
"""
self.max_iterations = max_iterations
self.tolerance = tolerance
self.use_double_precision = use_double_precision
    def refine(self, A, b, x0, solver_func):
"""
Perform iterative refinement.
Args:
A: System matrix
b: Right-hand side vector
x0: Initial solution
solver_func: Function that solves A*x = b
Returns:
Improved solution
"""
x = x0.copy()
for i in range(self.max_iterations):
            # Compute residual (optionally in higher precision); compare
            # dtypes, since isinstance() on an array is always False
            if self.use_double_precision and x.dtype != np.float64:
                residual = b - A.dot(x.astype(np.float64))
                residual = residual.astype(x.dtype)
else:
residual = b - A.dot(x)
# Check convergence
if np.linalg.norm(residual) < self.tolerance:
break
# Solve for correction
correction = solver_func(A, residual)
# Update solution
x = x + correction
return x
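# Illustrative sketch: polish a single-precision LU solution.
#
#     refiner = IterativeRefinement(max_iterations=3)
#     x0 = scipy.linalg.solve(A, b).astype(np.float32)
#     x = refiner.refine(A, b, x0, lambda A, r: scipy.linalg.solve(A, r))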
def get_optimal_solver(A, b, estimate_condition=True,
time_limit=None, memory_limit=None):
"""
Automatically select the optimal solver for a given linear system.
Args:
A: System matrix
b: Right-hand side vector
estimate_condition: Whether to estimate condition number
time_limit: Maximum allowed solution time (seconds)
memory_limit: Maximum allowed memory usage (bytes)
Returns:
Tuple of (solver_object, solver_info)
"""
# Get matrix info
is_sparse = scipy.sparse.isspmatrix(A)
n, m = A.shape
# Estimate memory requirements
if is_sparse:
nnz = A.nnz
density = nnz / (n * m)
memory_estimate = nnz * 8 * 3 # Rough estimate for sparse solvers
else:
density = 1.0
memory_estimate = n * m * 8 * 2 # Rough estimate for dense solvers
# Check memory limit
if memory_limit is not None and memory_estimate > memory_limit:
# Use iterative method with low memory requirements
solver = CGLSSolver(max_iterations=min(n, 1000))
return solver, {"type": "cgls", "reason": "memory_limit"}
# Check problem size
if n * m < 1e6 and density > 0.2:
# Small, relatively dense problem
try:
# Estimate condition number (if requested)
if estimate_condition:
if is_sparse:
                    # For sparse matrices, use a cheaper 1-norm estimate:
                    # SuperLU exposes no rcond, so combine onenormest(A)
                    # with onenormest of the inverse applied via lu.solve
                    try:
                        import scipy.sparse.linalg as spla
                        lu = spla.splu(A.tocsc())
                        inv_op = spla.LinearOperator(A.shape, matvec=lu.solve)
                        condition_est = spla.onenormest(A) * spla.onenormest(inv_op)
                        well_conditioned = condition_est < 1e6
                    except Exception:
                        well_conditioned = True  # Assume well-conditioned if estimation fails
else:
# For dense matrices, use SVD-based estimator
try:
s = scipy.linalg.svdvals(A)
condition_number = s[0] / s[-1]
well_conditioned = condition_number < 1e6
                    except Exception:
well_conditioned = True # Assume well-conditioned if estimation fails
else:
well_conditioned = True
if well_conditioned:
# Use direct solver for well-conditioned problems
if is_sparse:
solver = lambda A, b: direct_solver(A, b, method="lu")
return solver, {"type": "direct_sparse", "method": "lu"}
else:
# Check if matrix is symmetric
is_symmetric = np.allclose(A, A.T)
if is_symmetric:
try:
# Check if positive definite
scipy.linalg.cholesky(A)
solver = lambda A, b: direct_solver(A, b, method="cholesky")
return solver, {"type": "direct_dense", "method": "cholesky"}
                        except Exception:
pass
solver = lambda A, b: direct_solver(A, b, method="lu")
return solver, {"type": "direct_dense", "method": "lu"}
else:
# Ill-conditioned problem, use regularized solver
                tikhonov = TikhonovRegularization(alpha=1e-6)
solver = lambda A, b: tikhonov.apply(A, b)
return solver, {"type": "tikhonov", "condition": "ill"}
except Exception as e:
# Fall back to iterative solver
print(f"Warning: Direct solver selection failed: {str(e)}")
# Large or sparse problem, use iterative solver
# Check if matrix is square
if n == m:
# Square system, try conjugate gradient variants
is_symmetric = False
if is_sparse:
# Cheap test for symmetry
is_symmetric = (A - A.T).nnz == 0
else:
is_symmetric = np.allclose(A, A.T)
if is_symmetric:
# For symmetric systems
try:
# Test for positive definiteness
if is_sparse:
                # Randomly sample a few diagonal entries (a positive diagonal
                # is necessary but not sufficient for positive definiteness)
                import random
                pos_def = all(A[i, i] > 0 for i in random.sample(range(n), min(10, n)))
else:
pos_def = np.all(np.linalg.eigvalsh(A) > 0)
if pos_def:
# Symmetric positive definite, use CG
solver = lambda A, b: splinalg.cg(A, b)[0]
return solver, {"type": "cg", "matrix": "spd"}
        except Exception:
pass
# Symmetric but not necessarily positive definite, use MINRES
solver = lambda A, b: splinalg.minres(A, b)[0]
return solver, {"type": "minres", "matrix": "symmetric"}
else:
# General square system, use GMRES
solver = lambda A, b: splinalg.gmres(A, b)[0]
return solver, {"type": "gmres", "matrix": "square"}
    # Rectangular system or fallback: use the regularized LSQR solver
solver = RRLSQRSolver(max_iterations=min(n, 1000))
return solver, {"type": "rrlsqr", "matrix": "rectangular"}
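# Illustrative usage: the heuristic returns either a plain callable or a
# LinearSolver instance, so dispatch on callability.
#
#     solver, info = get_optimal_solver(A, b)
#     x = solver(A, b) if callable(solver) else solver.solve(A, b)
#     print(info)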