Source code for mdcraft.lib.correlation

from __future__ import annotations
from typing import TYPE_CHECKING
import warnings

import numpy as np
from scipy import fft as pfft

if TYPE_CHECKING:  # pragma: no cover
    from .. import float_t, complex_t


[docs] def correlation( x: np.ndarray[float_t | complex_t], y: np.ndarray[float_t | complex_t] | None = None, /, axis: int | None = None, *, average: bool = False, fft: bool = True, symmetrize: bool = False, vector: bool = False, ) -> np.ndarray[float_t | complex_t]: """ Evaluates the autocorrelation function (ACF) :math:`\\mathrm{R_\\mathbf{XX}}(\\tau)` or cross-correlation function (CCF) :math:`\\mathrm{R_\\mathbf{XY}}(\\tau)` of time series :math:`\\mathbf{X}(t)` and :math:`\\mathbf{Y}(t)`. Using a naive sliding window technique, the ACF for a data set :math:`\\mathbf{X}(t)` can be computed using .. math:: \\mathrm{R}_{\\mathbf{XX}}(\\tau) =\\langle\\mathbf{X}(t_0+\\tau)\\cdot\\mathbf{X}^*(t_0)\\rangle =\\dfrac{1}{N_\\tau}\\sum_{j=1}^{N_\\tau} \\mathbf{X}(t_j+\\tau)\\cdot\\mathbf{X}^*(t_j) where :math:`\\tau` is the time lag, :math:`t_j` is an arbitrary reference time, :math:`N_\\tau` is the number of possible reference times, and the asterisk (:math:`^*`) denotes the complex conjugate. Similarly, the CCF for data sets :math:`\\mathbf{X}(t)` and :math:`\\mathbf{Y}(t)` can be computed using .. math:: \\mathrm{R}_{\\mathbf{XY}}(\\tau) =\\langle\\mathbf{X}(t_0+\\tau)\\cdot\\mathbf{Y}^*(t_0)\\rangle =\\dfrac{1}{N_\\tau}\\sum_{j=1}^{N_\\tau} \\mathbf{X}(t_j+\\tau)\\cdot\\mathbf{Y}^*(t_j) To minimize statistical noise, the ACF/CCF is calculated for and averaged over all possible reference times :math:`t_j`. As such, this approach has a time complexity of :math:`\\mathcal{O}(N^2)`, making it infeasible for large data sets. Alternatively, the ACF/CCF can be efficiently computed using the fast convolution algorithm (FCA) [1]_ [2]_, which leverages the Wiener–Khinchin theorem. FCA uses fast Fourier transforms (FFT) and has a time complexity of :math:`\\mathcal{O}(N\\log{N})`. Using FCA, the ACF for a data set :math:`\\mathbf{X}(t)` is computed using .. math:: \\begin{gather*} \\hat{\\mathbf{X}}(\\omega)=\\mathcal{F}[\\mathbf{X}(t)]\\\\ \\mathrm{R}_{\\mathbf{XX}}(\\tau)=\\mathcal{F}^{-1} [\\hat{\\mathbf{X}}(\\omega)\\hat{\\mathbf{X}}^*(\\omega)] \\end{gather*} and the CCF for data sets :math:`\\mathbf{X}(t)` and :math:`\\mathbf{Y}(t)` is computed using .. math:: \\mathrm{R}_{\\mathbf{XY}}(\\tau)=\\mathcal{F}^{-1}[\\mathcal{F} [\\mathbf{X}(t)]\\cdot\\mathcal{F}[\\mathbf{Y}(t)]] Parameters ---------- x : `numpy.ndarray`, positional-only Time evolution of scalar or :math:`d`-dimensional data :math:`\\mathbf{X}(t)` for :math:`N` entities over :math:`N_\\mathrm{b}` blocks of :math:`N_t` times each. **Shape**: :math:`(N_t[,d])`, :math:`(N_t,N[,d])`, :math:`(N_\\mathrm{b},N_t[,d])`, or :math:`(N_\\mathrm{b},N_t,N[,d])`. y : `numpy.ndarray`, positional-only, optional Time evolution of scalar or :math:`d`-dimensional data :math:`\\mathbf{Y}(t)` for another :math:`N` entities over :math:`N_\\mathrm{b}` blocks of :math:`N_t` times each. If provided, the CCF for `x` and `y` is evaluated. Otherwise, the ACF for `x` is evaluated. **Shape**: Same as `x`. axis : `int`, optional Axis along which time evolves. If not specified, the axis is determined automatically using the shape of `x`. average : `bool`, keyword-only, default: :code:`True` Specifies whether to average the ACF/CCF over all entities. Only available if `x` and `y` contain information for multiple entities. fft : `bool`, keyword-only, default: :code:`True` Specifies whether to use fast Fourier transforms (FFT) to evaluate the ACF/CCF. symmetrize : `bool`, keyword-only, default: :code:`False` Specifies whether to double the ACF or to combine the negative and positive time lags for the CCF. vector : `bool`, keyword-only, default: :code:`False` Specifies whether `x` and `y` contain vectors. If :code:`True`, the ACF/CCF is summed over the last axis. Returns ------- corr : `numpy.ndarray` ACF :math:`\\mathrm{R}_{\\mathbf{XX}}(\\tau)` or CCF :math:`\\mathrm{R}_{\\mathbf{XY}}(\\tau)`. .. container:: **Shape**: For ACF, the shape is that of `x` but with the following modifications: * If :code:`average=True`, the axis corresponding to the :math:`N` entities is no longer present. * If :code:`vector=True`, the last axis is no longer present. For CCF, the shape is that of `x` but with the following modifications: * If :code:`average=True`, the axis corresponding to the :math:`N` entities is no longer present. * If :code:`symmetrize=False`, the axis corresponding to the :math:`N_t` times now has a length of :math:`2N_t-1` to accomodate negative and positive time lags. * If :code:`vector=True`, the last axis is no longer present. References ---------- .. [1] Kneller, G. R.; Keiner, V.; Kneller, M.; Schiller, M. NMOLDYN: A Program Package for a Neutron Scattering Oriented Analysis of Molecular Dynamics Simulations. *Computer Physics Communications* **1995**, *91* (1–3), 191–214. https://doi.org/10.1016/0010-4655(95)00048-K. .. [2] Calandrini, V.; Pellegrini, E.; Calligari, P.; Hinsen, K.; Kneller, G. R. NMoldyn - Interfacing Spectroscopic Experiments, Molecular Dynamics Simulations and Models for Time Correlation Functions. *JDN* **2011**, *12*, 201–232. https://doi.org/10.1051/sfn/201112010. """ # Validate input arguments x = np.asarray(x) if x.size == 0: raise ValueError("The arrays cannot be empty.") n_dim = x.ndim if not 1 <= n_dim <= 4: raise ValueError( "The arrays must be one-, two-, three-, or four-dimensional." ) if vector and n_dim == 1: raise ValueError( "The arrays cannot be one-dimensional if `vector=True`." ) if y is not None: y = np.asarray(y) if x.shape != y.shape: raise ValueError("The arrays must have the same shape.") # Check or set axis along which to compute the ACF/CCF if axis is None: if n_dim == 4: axis = 1 else: axis = 0 if n_dim > 1: warnings.warn( "The axis along which to compute the ACF/CCF was" "not specified and is ambiguous for " "multidimensional arrays. By default, the ACF/CCF " "will be evaluated along the first axis (`axis=0`)." ) elif axis not in {0, 1}: raise ValueError( "The ACF/CCF can only be computed along the first " "(`axis=0`) or second axis (`axis=1`)." ) # Compute the ACF/CCF n_t = x.shape[axis] slices = (slice(None),) * axis if fft: # Determine whether faster real-valued FFT algorithms can be used real = np.isrealobj(x) and (y is None or np.isrealobj(y)) if real: f_fft = pfft.rfft f_ifft = pfft.irfft else: f_fft = pfft.fft f_ifft = pfft.ifft # Initialize array with axis slice(s) needed for normalization later axis_slices = [slice(n_t)] # Compute the power spectral density by first zero-padding the # arrays for linear convolution and then inverting it to get the # ACF/CCF n_fft = 2 * pfft.next_fast_len(n_t, real=real) if y is None: ft = f_fft(x, n=n_fft, axis=axis) corr = f_ifft(ft * ft.conj(), axis=axis) corr = (symmetrize + 1) * (corr[:, :n_t] if axis else corr[:n_t]) else: ft_x = f_fft(x, n=n_fft, axis=axis) ft_y = f_fft(y, n=n_fft, axis=axis) ft = ft_x.conj() * ft_y if symmetrize: corr = f_ifft(ft + ft_x * ft_y.conj(), axis=axis)[*slices, :n_t] else: corr = f_ifft(ft, axis=axis) axis_slices.append(slice(1 - n_t, None)) # Sum over the last axis if the arrays contain vectors if vector: corr = corr.sum(axis=-1) else: # Initialize array with axis slice(s) needed for normalization later axis_slices = [slice(-n_t, None)] # Use opposite sliding windows to get relevant data for each time lag if y is None: if n_dim == 1: corr = np.fromiter( (np.dot(x[i:], x[: -i if i else None]) for i in range(n_t)), x.dtype, n_t, ) else: ss_prefix = "b" * axis ss_lhs = f"{ss_prefix}t...{'d' * vector}" corr = np.stack( [ np.einsum( f"{ss_lhs},{ss_lhs}->{ss_prefix}...", x[*slices, i:], x[*slices, : -i if i else None], ) for i in range(n_t) ], axis=axis, ) else: start = np.r_[np.zeros(n_t - 1, np.uint32), 0:n_t] stop = np.r_[1 : n_t + 1, np.full(n_t - 1, n_t, np.uint32)] if n_dim == 1: corr = np.fromiter( ( np.dot(x[i:j], y[k:m]) for i, j, k, m in zip( start[::-1], stop[::-1], start, stop ) ), x.dtype, 2 * n_t - 1, ) else: ss_prefix = "b" * axis ss_lhs = f"{ss_prefix}t...{'d' * vector}" corr = np.stack( [ np.einsum( f"{ss_lhs},{ss_lhs}->{ss_prefix}...", x[*slices, i:j], y[*slices, k:m], ) for i, j, k, m in zip( start[::-1], stop[::-1], start, stop ) ], axis=axis, ) # Double the ACF or combine the negative and positive time lags for # the CCF, if desired if symmetrize: if y is None: corr *= 2 else: corr = corr[*slices, n_t - 1 :] + corr[*slices, n_t - 1 :: -1] else: axis_slices.append(slice(n_t - 1)) # Determine the axes over which to expand the dimensions of the # reversed time array for correct matrix division axes = list(range(n_dim - vector)) axes.remove(axis) # Normalize the ACF/CCF corr[*slices, axis_slices[0]] /= np.expand_dims(np.arange(n_t, 0, -1), axes) if corr.shape[axis] != n_t: corr[*slices, axis_slices[1]] /= np.expand_dims(np.arange(1, n_t), axes) if fft: corr = pfft.fftshift(corr, axis)[ *slices, (start := n_fft // 2 - n_t + 1) : start + 2 * n_t - 1 ] # Average over all entities, if desired if average and axis != (axis_avg := n_dim - vector - 1): return corr.mean(axis=axis_avg) return corr
[docs] def msd( r_i: np.ndarray[float_t], r_j: np.ndarray[float_t] | None = None, /, axis: int | None = None, *, average: bool = True, fft: bool = True, ) -> np.ndarray[float_t]: """ Evaluates the mean squared displacement (MSD) or the cross mean squared displacement (CMSD) of positions :math:`\\mathbf{r}_i(t)` and :math:`\\mathbf{r}_j(t)`. Using the Einstein relation, the MSD for a set of positions :math:`\\mathbf{r}_i(t)` can be computed using .. math:: \\mathrm{MSD}_i(\\tau)=\\langle[\\mathbf{r}_i(t_0+\\tau) -\\mathbf{r}_i(t_0)]^2\\rangle =\\dfrac{1}{N_\\tau}\\sum_{k=1}^{N_\\tau} [\\mathbf{r}_i(t_k+\\tau)-\\mathbf{r}_i(t_k)]^2 where :math:`\\tau` is the time lag, :math:`t_j` is an arbitrary reference time, and :math:`N_\\tau` is the number of possible reference times. Similarly, the CMSD for two sets of positions :math:`\\mathbf{r}_i(t)` and :math:`\\mathbf{r}_j(t)` can be computed using .. math:: \\mathrm{CMSD}_{ij}(\\tau)&=\\langle [\\mathbf{r}_i(t_0+\\tau)-\\mathbf{r}_i(t_0)]\\cdot [\\mathbf{r}_j(t_0+\\tau)-\\mathbf{r}_j(t_0)]\\rangle\\\\ &=\\dfrac{1}{N_\\tau}\\sum_{k=1}^{N_\\tau} [\\mathbf{r}_i(t_k+\\tau)-\\mathbf{r}_i(t_k)]\\cdot [\\mathbf{r}_j(t_k+\\tau)-\\mathbf{r}_j(t_k)] To minimize statistical noise, the MSD/CMSD is calculated for and averaged over all possible reference times :math:`t_k`. Alternatively, the MSD/CMSD can be efficiently computed using the fast convolution algorithm (FCA) [1]_ [2]_, which leverages the Wiener–Khinchin theorem. FCA uses fast Fourier transforms (FFT) and has a time complexity of :math:`\\mathcal{O}(N\\log{N})`. Using FCA, the MSD for a set of positions :math:`\\mathbf{r}_i(t)` is computed using .. math:: \\mathrm{MSD}_{i,m}&=\\frac{1}{N_t-m} \\sum_{k=0}^{N_t-m-1} [\\mathbf{r}_{i,k+m}-\\mathbf{r}_{i,k}]^2\\\\ &=\\frac{1}{N_t-m}\\sum_{k=0}^{N_t-m-1} \\left[\\mathbf{r}_{i,k+m}^2+\\mathbf{r}_{i,k}^2\\right] -\\frac{2}{N_t-m}\\sum_{k=0}^{N_t-m-1} \\mathbf{r}_{i,k}\\cdot\\mathbf{r}_{i,k+m}\\\\ &=\\mathrm{S}_{ii,m}-2\\mathrm{R}_{ii,m} where :math:`i` is the species index, :math:`m` is the index corresponding to time lag :math:`\\tau`, :math:`\\mathrm{R}_{ii,m}` is the autocorrelation of :math:`\\mathbf{r}_i(t)`, and :math:`S_m` is evaluated using the recursive relation .. math:: \\begin{gather*} D_{ii,m}=\\mathbf{r}_{i,m}^2\\\\ Q_{ii,-1}=2\\sum_{k=0}^{N_t-1}D_{ii,k}\\\\ Q_{ii,m}=Q_{ii,m-1}-D_{ii,m-1}-D_{ii,N_t-m}\\\\ S_{ii,m}=\\frac{Q_{ii,m}}{N_t-m} \\end{gather*} Similarly, the CMSD for two sets of positions :math:`\\mathbf{r}_i(t)` and :math:`\\mathbf{r}_j(t)` is computed using .. math:: \\mathrm{CMSD}_{ij,m}=S_{ij,m}-2\\mathrm{R}_{ij,m} where :math:`\\mathrm{R}_{ij,m}` is the cross-correlation of :math:`\\mathbf{r}_i(t)` and :math:`\\mathbf{r}_j(t)`, and :math:`S_{ij,m}` is evaluated using the recursive relation .. math:: \\begin{gather*} D_{ij,m}=\\mathbf{r}_{i,m}\\cdot\\mathbf{r}_{j,m}\\\\ Q_{ij,-1}=2\\sum_{k=0}^{N_t-1}D_{ij,k}\\\\ Q_{ij,m}=Q_{ij,m-1}-D_{ij,m-1}-D_{ij,N_t-m}\\\\ S_{ij,m}=\\frac{Q_{ij,m}}{N_t-m} \\end{gather*} .. note:: `r_i` and `r_j` should be summed over all entities before being passed to this function if it is being used to evaluate the cross terms in the Onsager transport coefficients [3]_ .. math:: L_{ij}=\\frac{1}{6k_\\mathrm{B}T}\\lim_{t\\rightarrow\\infty} \\frac{d}{d\\tau}\\left\\langle\\sum_{\\alpha=1}^{N_i} [\\mathbf{r}_{i,\\alpha}(t_0+\\tau) -\\mathbf{r}_{i,\\alpha}(t_0)]\\cdot \\sum_{\\beta=1}^{N_j}[\\mathbf{r}_{j,\\beta}(t_0+\\tau) -\\mathbf{r}_{j,\\beta}(t_0)]\\right\\rangle Parameters ---------- r_i : `numpy.ndarray`, positional-only Time evolution of individual or summed :math:`d`-dimensional positions :math:`\\mathbf{r}_i(t)` for :math:`N` entities over :math:`N_\\mathrm{b}` blocks of :math:`N_t` times each. **Shape**: :math:`(N_t,d)`, :math:`(N_t,N,d)`, :math:`(N_\\mathrm{b},N_t,d)`, or :math:`(N_\\mathrm{b},N_t,N,d)`. **Reference unit**: :math:`\\mathrm{nm}`. r_j : `numpy.ndarray`, positional-only, optional Time evolution of individual or summed :math:`d`-dimensional positions :math:`\\mathbf{r}_j(t)` for another :math:`N` entities over :math:`N_\\mathrm{b}` blocks of :math:`N_t` times each. **Shape**: Same as `r_i`. **Reference unit**: :math:`\\mathrm{nm}`. axis : `int`, optional Axis along which time evolves. If not specified, the axis is determined automatically using the shape of `r_i`. average : `bool`, keyword-only, default: :code:`True` Specifies whether to average the MSD/CMSD over all entities. Only available if `r_i` and `r_j` contain information for multiple entities. fft : `bool`, keyword-only, default: :code:`True` Specifies whether to use fast Fourier transforms (FFT) to evaluate the MSD/CMSD. Returns ------- disp : `numpy.ndarray` :math:`\\mathrm{MSD}_i(\\tau)` or :math:`\\mathrm{CMSD}_{ij}(\\tau)`. **Shape**: Shape of `r_i`, except the last axis is no longer present. If :code:`average=True`, the axis indexing the :math:`N` entities is also no longer present. **Reference unit**: :math:`\\mathrm{nm}^2`. References ---------- .. [1] Kneller, G. R.; Keiner, V.; Kneller, M.; Schiller, M. NMOLDYN: A Program Package for a Neutron Scattering Oriented Analysis of Molecular Dynamics Simulations. *Computer Physics Communications* **1995**, *91* (1–3), 191–214. https://doi.org/10.1016/0010-4655(95)00048-K. .. [2] Calandrini, V.; Pellegrini, E.; Calligari, P.; Hinsen, K.; Kneller, G. R. NMoldyn - Interfacing Spectroscopic Experiments, Molecular Dynamics Simulations and Models for Time Correlation Functions. *JDN* **2011**, *12*, 201–232. https://doi.org/10.1051/sfn/201112010. .. [3] Fong, K. D.; Self, J.; McCloskey, B. D.; Persson, K. A. Onsager Transport Coefficients and Transference Numbers in Polyelectrolyte Solutions and Polymerized Ionic Liquids. *Macromolecules* **2020**, *53* (21), 9503–9512. https://doi.org/10.1021/acs.macromol.0c02001. """ # Ensure input arrays have valid shapes r_i = np.asarray(r_i) if r_i.size == 0: raise ValueError("The position arrays cannot be empty.") ndim = r_i.ndim if not 2 <= ndim <= 4: raise ValueError( "The position arrays must be two-, three-, or four-dimensional." ) if r_j is not None: r_j = np.asarray(r_j) if r_i.shape != r_j.shape: raise ValueError("The position arrays must have the same shape.") # Check or set axis along which to compute the MSD/CMSD if axis is None: if ndim == 4: axis = 1 else: axis = 0 if ndim == 3: warnings.warn( "The axis along which to compute the MSD/CMSD " "was not specified and is ambiguous for " "three-dimensional position arrays. By default, " "the MSD/CMSD will be evaluated along the first " "axis (`axis=0`)." ) elif axis not in {0, 1}: raise ValueError( "The MSD/CMSD can only be computed along the first " "(`axis=0`) or second axis (`axis=1`)." ) # Compute the MSD/CMSD n_t = r_i.shape[axis] slices = (slice(None),) * axis if fft: # Evaluate necessary intermediate quantities R_ij = correlation( r_i, r_j, axis, average=False, symmetrize=True, vector=True ) D_ij = (r_i * (r_i if r_j is None else r_j)).sum(axis=-1) if ndim - axis == 3: # Evaluate the MSD/CMSD for each entity if not average: D_k = (np.vstack if ndim == 3 else np.hstack)( ( D_ij, np.zeros( r_i.shape[:axis] + (1,) + r_i.shape[axis + 1 : -1] ), ) ) return ( 2 * D_k.sum(axis=axis, keepdims=axis) * np.ones((*(1,) * axis, n_t, 1)) - np.cumsum( D_k[*slices, np.arange(-1, n_t - 1)] + D_k[*slices, n_t:0:-1], axis=axis, ) ) / np.arange(n_t, 0, -1)[:, None] - R_ij # Average the intermediate quantities over all entities R_ij = R_ij.mean(axis=ndim - 2) D_ij = D_ij.mean(axis=ndim - 2) # Evaluate the ensemble-averaged MSD/CMSD return ( 2 * D_ij.sum(axis=axis, keepdims=axis) * np.ones((1, n_t)) - np.concatenate( ( np.zeros_like(D_ij[*slices, :1]), np.cumsum( D_ij[*slices, : n_t - 1] + D_ij[*slices, n_t - 1 : 0 : -1], axis=axis, ), ), axis=axis, ) ) / np.arange(n_t, 0, -1) - R_ij else: if r_j is None: disp = np.stack( [ ( (r_i[*slices, : -i if i else None] - r_i[*slices, i:]) ** 2 ) .sum(axis=-1) .mean(axis=axis) for i in range(n_t) ], axis=axis, ) else: ss_prefix = "b" * axis disp = np.stack( [ np.einsum( f"{ss_prefix}t...d,{ss_prefix}t...d->{ss_prefix}t...", r_i[*slices, : -i if i else None] - r_i[*slices, i:], r_j[*slices, : -i if i else None] - r_j[*slices, i:], ).mean(axis=axis) for i in range(n_t) ], axis=axis, ) # Average over all entities, if desired if ndim - axis == 3 and average: return disp.mean(axis=ndim - 2) return disp