"""Math for MD
Description:
This module provides a unified interface for molecular dynamics and
structural analysis routines within the reForge package. It wraps a variety
of operations including FFT-based cross-correlation, covariance matrix
computation, perturbation matrix calculations (for DFI/DCI metrics), and
elastic network model (ENM) Hessian evaluations. Both CPU and GPU implementations
are supported, with fallbacks to CPU methods if CUDA is not available.
Usage Example:
>>> import numpy as np
>>> from mdm import fft_ccf, calc_and_save_covmats, inverse_matrix
>>>
>>> # Compute FFT-based cross-correlation function in serial mode
>>> ccf = fft_ccf(signal1, signal2, mode='serial')
>>>
>>> # Calculate and save covariance matrices from trajectory positions
>>> calc_and_save_covmats(positions, outdir='./covmats', n=5)
>>>
>>> # Compute the inverse of a matrix using the unified inversion wrapper
>>> inv_mat = inverse_matrix(matrix, device='cpu_sparse')
Requirements:
- Python 3.x
- MDAnalysis
- NumPy
- CuPy (if GPU routines are used)
- Pandas
- reForge utilities (logger, etc.)
- reForge rfgmath modules (rcmath, rpymath)
Author: DY
Date: YYYY-MM-DD
"""
import os
import numpy as np
import cupy as cp
from reforge.utils import logger # removed unused imports: sys, MDAnalysis, pandas, timeit, memprofit, cuda_detected
from reforge.rfgmath import rcmath, rpymath
[docs]
def fft_ccf(*args, mode="serial", **kwargs):
"""
Unified wrapper for FFT-based correlation functions.
This function dispatches to one of the internal FFT correlation routines
based on the specified mode:
- 'serial' for sfft_ccf,
- 'parallel' for pfft_ccf, or
- 'gpu' for gfft_ccf.
Parameters
----------
*args :
Positional arguments for the chosen correlation function.
mode : str, optional
Mode to use ('serial', 'parallel', or 'gpu'). Default is 'serial'.
**kwargs :
Additional keyword arguments for the internal routines.
Returns
-------
np.ndarray
The computed correlation function.
Raises
------
ValueError
If an unsupported mode is specified.
"""
if mode == "serial":
return rpymath.sfft_ccf(*args, **kwargs)
if mode == "parallel":
return rpymath.pfft_ccf(*args, **kwargs)
if mode == "gpu":
result = rpymath.gfft_ccf(*args, **kwargs)
# Use .get() only if available (e.g. for CuPy arrays)
return result.get() if hasattr(result, "get") else result
raise ValueError("Mode must be 'serial', 'parallel' or 'gpu'.")
[docs]
def ccf(*args, **kwargs):
"""Similar to the previous. Unified wrapper for calculating cross-correlations."""
corr = rpymath.ccf(*args, **kwargs)
return corr
[docs]
def covariance_matrix(positions, dtype=np.float64):
"""Compute the covariance matrix from trajectory positions.
Parameters
----------
positions : np.ndarray
Array of position coordinates.
dtype : data-type, optional
Desired data type (default is np.float64).
Returns
-------
np.ndarray
The computed covariance matrix.
"""
return rpymath.covariance_matrix(positions, dtype=dtype)
[docs]
def calc_and_save_covmats(positions, outdir, n=1, outtag="covmat", dtype=np.float32):
"""Calculate and save covariance matrices by splitting a trajectory into segments.
Parameters
----------
positions : np.ndarray
Array of positions with shape (n_coords, n_frames).
outdir : str
Directory where the covariance matrices will be saved.
n : int, optional
Number of segments to split the trajectory into (default is 1).
outtag : str, optional
Base tag for output file names (default is 'covmat').
dtype : data-type, optional
Desired data type for covariance computation (default is np.float32).
Returns
-------
None
"""
trajs = np.array_split(positions, n, axis=-1)
for idx, traj in enumerate(trajs, start=1):
logger.info("Processing covariance matrix %d", idx)
covmat = covariance_matrix(traj, dtype=dtype)
outfile = os.path.join(outdir, f"{outtag}_{idx}.npy")
np.save(outfile, covmat)
logger.info("Saved covariance matrix to %s", outfile)
[docs]
def calc_and_save_rmsf(positions, outdir, n=1, outtag="rmsf", dtype=np.float64):
"""Calculate and save RMSF by splitting a trajectory into segments.
Parameters
----------
positions : np.ndarray
Array of positions with shape (n_coords, n_frames).
outdir : str
Directory where the RMSF data will be saved.
n : int, optional
Number of segments to split the trajectory into (default is 1).
outtag : str, optional
Base tag for output file names (default is 'rmsf').
dtype : data-type, optional
Desired data type for covariance computation (default is np.float32).
Returns
-------
None
"""
trajs = np.array_split(positions, n, axis=-1)
for idx, traj in enumerate(trajs, start=1):
logger.info("Processing segment %d", idx)
rmsf_comps = np.std(traj, axis=-1, dtype=dtype)
rmsf_comps_reshaped = np.reshape(rmsf_comps, (len(rmsf_comps)//3, 3))
rmsf_sq = np.sum(rmsf_comps_reshaped**2, axis=-1)
rmsf = np.sqrt(rmsf_sq)
outfile = os.path.join(outdir, f"{outtag}_{idx}.npy")
np.save(outfile, rmsf)
logger.info("Saved RMSF to %s", outfile)
logger.info("Done!")
##############################################################
## DFI / DCI Calculations
##############################################################
[docs]
def perturbation_matrix(covmat, dtype=np.float64):
"""Compute the perturbation matrix from a covariance matrix.
Parameters
----------
covmat : np.ndarray
The covariance matrix.
dtype : data-type, optional
Desired data type (default is np.float64).
Returns
-------
np.ndarray
The computed perturbation matrix.
"""
covmat = covmat.astype(np.float64)
pertmat = rcmath.perturbation_matrix(covmat)
return pertmat
[docs]
def perturbation_matrix_iso(covmat, dtype=np.float64):
"""Compute the perturbation matrix from a covariance matrix"""
covmat = covmat.astype(np.float64)
pertmat = rcmath.perturbation_matrix_iso(covmat)
return pertmat
[docs]
def td_perturbation_matrix(covmat, dtype=np.float64):
"""Compute the block-wise (td) perturbation matrix from a covariance matrix.
Parameters
----------
covmat : np.ndarray
The covariance matrix.
dtype : data-type, optional
Desired data type (default is np.float64).
Returns
-------
np.ndarray
The computed block-wise perturbation matrix.
"""
covmat = covmat.astype(np.float64)
pertmat = rcmath.td_perturbation_matrix(covmat)
return pertmat
[docs]
def dfi(pert_mat):
"""Calculate the Dynamic Flexibility Index (DFI) from a perturbation matrix.
Parameters
----------
pert_mat : np.ndarray
The perturbation matrix.
Returns
-------
np.ndarray
The DFI values.
"""
dfi_val = np.average(pert_mat, axis=-1)
return dfi_val
[docs]
def dci(pert_mat, asym=False):
"""Calculate the Dynamic Coupling Index (DCI) from a perturbation matrix.
Parameters
----------
pert_mat : np.ndarray
The perturbation matrix.
asym : bool, optional
If True, return an asymmetric version (default is False).
Returns
-------
np.ndarray
The DCI matrix.
"""
dci_val = pert_mat / np.average(pert_mat, axis=-1, keepdims=True)
if asym:
dci_val = dci_val - dci_val.T
return dci_val
[docs]
def group_molecule_dci(pert_mat, groups=None, asym=False, transpose=False):
"""Calculate the DCI between groups of atoms and the remainder of the molecule.
Parameters
----------
pert_mat : np.ndarray
The perturbation matrix.
groups : list of lists, optional
A list of groups, each containing indices of atoms.
Defaults to a list containing an empty list.
asym : bool, optional
If True, use the asymmetric DCI (default is False).
Returns
-------
list of np.ndarray
A list of DCI values for each group.
"""
if groups is None:
groups = [[]]
dcis = []
dci_tot = pert_mat / np.sum(pert_mat, axis=-1, keepdims=True)
if asym:
dci_tot = dci_tot - dci_tot.T
if transpose:
dci_tot = dci_tot.T
for ids in groups:
top = np.sum(dci_tot[:, ids], axis=-1) * pert_mat.shape[0]
bot = len(ids)
dci_val = top / bot
dcis.append(dci_val)
return dcis
[docs]
def group_group_dci(pert_mat, groups=None, asym=False):
"""Calculate the inter-group DCI matrix.
Parameters
----------
pert_mat : np.ndarray
The perturbation matrix.
groups : list of lists, optional
A list of groups, each containing indices of atoms.
Defaults to a list containing an empty list.
asym : bool, optional
If True, compute the asymmetric DCI (default is False).
Returns
-------
list of lists
A 2D list containing the DCI values between each pair of groups.
"""
if groups is None:
groups = [[]]
dcis = []
dci_tot = pert_mat / np.sum(pert_mat, axis=-1, keepdims=True)
if asym:
dci_tot = dci_tot - dci_tot.T
for ids1 in groups:
temp = []
for ids2 in groups:
idx1, idx2 = np.meshgrid(ids1, ids2, indexing="ij")
top = np.sum(dci_tot[idx1, idx2]) * pert_mat.shape[0]
bot = len(ids1) * len(ids2)
dci_val = top / bot
temp.append(dci_val)
dcis.append(temp)
return dcis
##############################################################
## Elastic Network Model (ENM)
##############################################################
[docs]
def hessian(vecs, cutoff, spring_constant=1e3, dd=0):
"""Compute the Hessian matrix using an elastic network model.
Parameters
----------
vecs : np.ndarray
Coordinate matrix of shape (n, 3) where each row corresponds to a residue.
cutoff : float
Distance cutoff threshold.
spring_constant : float
Base spring constant.
dd : int
Exponent modifier for the inverse distance.
Returns
-------
np.ndarray
The computed Hessian matrix.
"""
# pylint: disable=c-extension-no-member
return rcmath.hessian(vecs, cutoff, spring_constant, dd)
[docs]
def inverse_matrix(matrix, device="cpu_sparse", k_singular=6, n_modes=100, dtype=None, **kwargs):
"""Unified wrapper for computing the inverse of a matrix via eigen-decomposition.
Depending on the 'device' parameter, the function selects an appropriate routine.
Parameters
----------
matrix : np.ndarray
The input matrix.
device : str, optional
Inversion method ('cpu_sparse', 'cpu_dense', 'gpu_sparse', or 'gpu_dense').
Default is 'cpu_sparse'.
k_singular : int, optional
Number of smallest eigenvalues to set to zero (default is 6).
n_modes : int, optional
Number of eigenmodes to compute/consider.
dtype : data-type, optional
Desired data type for computations (default: matrix.dtype).
**kwargs :
Additional keyword arguments for the eigensolver.
Returns
-------
np.ndarray or cp.ndarray
The computed inverse matrix.
"""
if dtype is None:
dtype = matrix.dtype
if device.lower().startswith("gpu"):
try:
if not cp.cuda.is_available():
raise RuntimeError("CUDA not available.")
if device.lower() == "gpu_sparse":
if device.lower() == "gpu_sparse":
return rpymath.inverse_sparse_matrix_gpu(
matrix, k_singular=k_singular, n_modes=n_modes, dtype=dtype, **kwargs
)
if device.lower() == "gpu_dense":
return rpymath.inverse_matrix_gpu(
matrix, k_singular=k_singular, n_modes=n_modes, dtype=dtype, **kwargs
)
logger.info("Unknown GPU method; falling back to CPU sparse inversion.")
return rpymath.inverse_sparse_matrix_cpu(
matrix, k_singular=k_singular, n_modes=n_modes, dtype=dtype, **kwargs
)
except Exception as e: # pylint: disable=broad-exception-caught
logger.info("GPU inversion failed with error '%s'. Falling back to CPU sparse inversion.", e)
return rpymath.inverse_sparse_matrix_cpu(
matrix, k_singular=k_singular, n_modes=n_modes, dtype=dtype, **kwargs
)
if device.lower() == "cpu_dense":
return rpymath.inverse_matrix_cpu(
matrix, k_singular=k_singular, n_modes=n_modes, dtype=dtype, **kwargs
)
return rpymath.inverse_sparse_matrix_cpu(
matrix, k_singular=k_singular, n_modes=n_modes, dtype=dtype, **kwargs
)
##############################################################
## Miscellaneous Functions
##############################################################
[docs]
def percentile(x):
"""Compute the percentile ranking for each element in an array.
Parameters
----------
x : np.ndarray
Input array.
Returns
-------
np.ndarray
An array containing the percentile (from 0 to 1) of each element in x.
"""
sorted_idx = np.argsort(x)
px = np.zeros(len(x))
for n in range(len(x)):
px[n] = np.where(sorted_idx == n)[0][0] / len(x)
return px
if __name__ == "__main__":
pass