Source code for mdcraft.analysis.transport

"""
Transport properties
====================
.. moduleauthor:: Benjamin Ye <GitHub: @bbye98>

This module contains classes to evaluate the transport properties of
fluid systems.
"""

import itertools
from typing import Union
import warnings

import MDAnalysis as mda
from MDAnalysis.lib.log import ProgressBar
import numpy as np
from scipy import optimize

from .base import Hash, SerialAnalysisBase
from .. import FOUND_OPENMM, Q_, ureg
from ..algorithm import correlation
from ..algorithm.molecule import center_of_mass
from ..algorithm.topology import unwrap, wrap
from ..algorithm.unit import is_unitless, strip_unit
from ..fit.polynomial import poly1

if FOUND_OPENMM:
    from openmm import unit

[docs] def msd_fft( r_i: np.ndarray[float], r_j: np.ndarray[float] = None, /, axis: int = None, *, average: bool = True) -> np.ndarray[float]: r""" Evaluates the mean squared displacements (MSD) or the analogous cross displacements (CD) of positions :math:`\mathbf{r}_i(t)` and :math:`\mathbf{r}_j(t)` using fast Fourier transforms (FFT). .. seealso:: This function is an alias for :func:`mdcraft.algorithm.correlation.msd_fft`. """ return correlation.msd_fft(r_i, r_j, axis, average=average)
[docs] def msd_shift( r_i: np.ndarray[float], r_j: np.ndarray[float] = None, /, axis: int = None, *, average: bool = True) -> np.ndarray[float]: r""" Evaluates the mean squared displacements (MSD) or the analogous cross displacements (CD) of positions :math:`\mathbf{r}_i(t)` and :math:`\mathbf{r}_j(t)` using the Einstein relation. .. seealso:: This function is an alias for :func:`mdcraft.algorithm.correlation.msd_shift`. """ return correlation.msd_shift(r_i, r_j, axis, average=average)
[docs] def calculate_transport_coefficients( times: np.ndarray[float], msd_cross: np.ndarray[float], msd_self: np.ndarray[float], Ns: np.ndarray[int], dimensions: np.ndarray[float], kBT: float, start: int = 1, stop: int = None, scale: str = "log", *, start_self: int = None, stop_self: int = None, scale_self: str = None, enforce_linear: bool = True, verbose: bool = False ) -> tuple[np.ndarray[float], np.ndarray[float], np.ndarray[float]]: r""" Fits the mean squared displacements (MSDs) or the analogous cross displacements (CDs) to evaluate the self-diffusion coefficients :math:`D_i` and the Onsager transport coefficients :math:`L_{ij}` and :math:`L_{ii}^\mathrm{self}`. Parameters ---------- times : `numpy.ndarray` Changes in time :math:`t-t_0`. **Shape**: :math:`(N_t,)`. **Reference unit**: :math:`\mathrm{ps}`. msd_cross : `numpy.ndarray` MSDs and CDs that include the dimensionality scaling factor. **Shape**: :math:`(C(N_\mathrm{groups}+1,\,2),\,N_t)` or :math:`(C(N_\mathrm{groups}+1,\,2),\,N_\mathrm{blocks},\,N_t)`. **Reference unit**: :math:`\mathrm{Å}^2`. msd_self : `numpy.ndarray` Self MSDs that include the dimensionality scaling factor. **Shape**: :math:`(N_\mathrm{groups},\,N_t)` or :math:`(N_\mathrm{groups},\,N_\mathrm{blocks},\,N_t)`. **Reference unit**: :math:`\mathrm{Å}^2`. Ns : `numpy.ndarray` Number of atoms or centers of mass :math:`N_i` in each group. dimensions : `numpy.ndarray` System dimensions. **Shape**: :math:`(3,)`. **Reference unit**: :math:`\mathrm{Å}`. kBT : `float` Thermal energy scale. **Reference unit**: :math:`\mathrm{kJ/mol}`. start : `int`, default: :code:`1` Starting frame with respect to the interval used in :meth:`Onsager.run` for fitting the MSDs (or their analogs). stop : `int`, optional Ending frame with respect to the interval used in :meth:`Onsager.run` for fitting the MSDs (or their analogs). scale : `str`, :code:`{"log", "linear"}` Data scaling for fitting the MSDs (or their analogs) against time. .. container:: **Valid values**: * :code:`"linear"`: Linear :math:`x`- and :math:`y`-axes. * :code:`"log"`: Logarithmic :math:`x`- and :math:`y`-axes. start_self : `int`, keyword-only, optional Starting frame with respect to the interval used in :meth:`Onsager.run` for fitting the self MSDs. If not provided, `start_self` shares a value with `start`. stop_self : `int`, keyword-only, optional Ending frame with respect to the interval used in :meth:`Onsager.run` for fitting the self MSDs. If not provided, `stop_self` shares a value with `stop`. scale_self : `str`, keyword-only, optional Data scaling for fitting the self MSDs against time. If not provided, `scale_self` shares a value with `scale`. .. container:: **Valid values**: * :code:`"linear"`: Linear :math:`x`- and :math:`y`-axes. * :code:`"log"`: Logarithmic :math:`x`- and :math:`y`-axes. enforce_linear : `bool`, keyword-only, default: :code:`True` Enforce linear fits for data on a logarithmic scale by setting the slope to :math:`1`, or .. math:: \log(\mathrm{MSD})=\log(t)+b verbose : `bool`, keyword-only, default: :code:`False` Determines whether detailed progress is shown. Returns ------- L_ij : `numpy.ndarray` Onsager transport coefficients :math:`L_{ij}`. **Shape**: :math:`(N_\mathrm{blocks},\,N_\mathrm{groups},\, N_\mathrm{groups})`. **Reference unit**: :math:`\mathrm{mol/(kJ\cdotÅ\cdot ps)}`. L_ii_self : `numpy.ndarray` Self-diffusion contribution to the single-species Onsager transport coefficients :math:`L_{ii}^\mathrm{self}`. **Shape**: :math:`(N_\mathrm{blocks},\,N_\mathrm{groups})`. **Reference unit**: :math:`\mathrm{mol/(kJ\cdotÅ\cdot ps)}`. D_i : `numpy.ndarray` Self-diffusion coefficients :math:`D_i`. **Shape**: :math:`(N_\mathrm{blocks},\,N_\mathrm{groups})`. **Reference unit**: :math:`\mathrm{Å/ps}`. """ # Set default settings for fitting self MSDs if start_self is None: start_self = start if stop_self is None: stop_self = stop if scale_self is None: scale_self = scale # Preallocate arrays to hold Onsager transport coefficients and # self-diffusion coefficients ndim = msd_self.ndim if ndim not in {2, 3}: emsg = ("The arrays containing the cross- and self-MSDs have " "invalid shapes.") raise ValueError(emsg) elif ndim == 2: n_groups = msd_self.shape[0] n_blocks = 1 elif ndim == 3: n_groups, n_blocks = msd_self.shape[:2] L_ij = np.zeros((n_blocks, n_groups, n_groups)) D_i = np.zeros((n_blocks, n_groups)) # Get indices of upper-triangular entries in L_ij and D_i # arrays r_ud, c_ud = np.triu_indices(n_groups) # Calculate MSD "normalization" factor (kBT * V) denom = kBT * dimensions[~np.isclose(dimensions, 0)].prod() # Iterate through all blocks for b in ProgressBar(range(n_blocks), verbose=verbose): # Iterate through all unique group pairings for i, msd in enumerate(msd_cross[:, b] / denom): y = msd[start:stop] valid = np.isfinite(y) & (y > 0) y = y[valid] x = times[start:stop][valid] if len(x) > 1: # Calculate L_ij if scale == "linear": L_ij[b, r_ud[i], c_ud[i]] = np.polyfit(x, y, 1)[0] elif scale == "log": if enforce_linear: L_ij[b, r_ud[i], c_ud[i]] = np.exp( optimize.curve_fit(lambda x, b: poly1(x, 1, b), np.log(x), np.log(y))[0] ) else: fit = np.polyfit(np.log(x), np.log(y), 1) if abs(1 - fit[0]) >= 0.01: wmsg = ( f"The slope for the log(msd_cross[{i}]) " f"vs. log(times) fit is {fit[0]:.6f}." ) warnings.warn(wmsg) L_ij[b, r_ud[i], c_ud[i]] = np.exp(fit[1]) else: L_ij[b, r_ud[i], c_ud[i]] = np.nan # Mirror the L_ij matrix over the diagonal L_ij[b] = L_ij[b] + L_ij[b].T - np.diag(np.diag(L_ij[b])) # Iterate through all self groups for i, msd in enumerate(msd_self[:, b]): y = msd[start_self:stop_self] valid = np.isfinite(y) & (y > 0) y = y[valid] x = times[start_self:stop_self][valid] if len(x) > 1: # Calculate D_i if scale_self == "linear": D_i[b, i] = np.polyfit(x, y, 1)[0] elif scale_self == "log": if enforce_linear: D_i[b, i] = np.exp( optimize.curve_fit(lambda x, b: poly1(x, 1, b), np.log(x), np.log(y))[0] ) else: fit = np.polyfit(np.log(x), np.log(y), 1) if abs(1 - fit[0]) >= 0.01: wmsg = ( f"The slope for the log(msd_self[{i}]) " f"vs. log(times) fit is {fit[0]:.6f}." ) warnings.warn(wmsg) D_i[b, i] = np.exp(fit[1]) else: D_i[b, i] = np.nan return L_ij, Ns * D_i / denom, D_i
[docs] def calculate_conductivity( L_ij: np.ndarray[float], charges: np.ndarray[float], *, reduced: bool = False) -> np.ndarray[float]: r""" Calculates the ionic conductivity :math:`\kappa` using the Onsager transport coefficients :math:`L_{ij}`. Parameters ---------- L_ij : `numpy.ndarray` Onsager transport coefficients :math:`L_{ij}`. **Shape**: :math:`(N_\mathrm{blocks},\,N_\mathrm{groups},\, N_\mathrm{groups})`. **Reference unit**: :math:`\mathrm{mol/(kJ\cdotÅ\cdot ps)}`. charges : array-like Charges :math:`q_i` shared by all atoms or centers of mass in each species :math:`i`. **Shape**: :math:`(N_\mathrm{groups},)`. **Reference unit**: :math:`\mathrm{e}`. reduced : `bool`, keyword-only, default: :code:`False` Specifies whether the data is in reduced units. Returns ------- kappa : `numpy.ndarray` Conductivity :math:`\kappa`. **Shape**: :math:`(N_\mathrm{blocks},\,)`. **Reference unit**: :math:`\mathrm{C^2/(kJ\cdotÅ\cdot ps)`. **To SI unit**: :math:`1\times10^{19}\,\mathrm{S/m}`. """ kappas = np.einsum("bij,ij->b", L_ij, charges * charges[:, None]) if not reduced: kappas = (kappas * ureg.avogadro_constant * ureg.elementary_charge ** 2 * ureg.mole / ureg.coulomb ** 2).to("").magnitude return kappas
[docs] def calculate_electrophoretic_mobilities( L_ij: np.ndarray[float], charges: np.ndarray[float], rhos: np.ndarray[float], *, reduced: bool = False ) -> np.ndarray[float]: r""" Calculates the electrophoretic mobility :math:`\mu_i` of each species using the Onsager transport coefficients :math:`L_{ij}`. Parameters ---------- L_ij : `numpy.ndarray` Onsager transport coefficients :math:`L_{ij}`. **Shape**: :math:`(N_\mathrm{blocks},\,N_\mathrm{groups},\, N_\mathrm{groups})`. **Reference unit**: :math:`\mathrm{mol/(kJ\cdotÅ\cdot ps)}`. charges : array-like Charges :math:`q_i` shared by all atoms or centers of mass in each species :math:`i`. **Shape**: :math:`(N_\mathrm{groups},)`. **Reference unit**: :math:`\mathrm{e}`. rhos : array-like Number densities :math:`n_i` of the atoms or centers of mass in each species :math:`i`. **Shape**: :math:`(N_\mathrm{groups},)`. **Reference unit**: :math:`\mathrm{Å}^{-3}`. reduced : `bool`, keyword-only, default: :code:`False` Specifies whether the data is in reduced units. Returns ------- mus : `numpy.ndarray` Electrophoretic mobilities :math:`\mu_i` of the atoms or centers of mass in each species :math:`i`. **Shape**: :math:`(N_\mathrm{blocks},\,N_\mathrm{groups})`. **Reference unit**: :math:`\mathrm{Å^2\cdot C/(kJ\cdot ps)}`. **To SI unit**: :math:`1\times 10^{-11}\,\mathrm{m^2/(V\cdot s)`. """ mus = (L_ij * charges / rhos[:, None]).sum(axis=-1) if not reduced: mus = (mus * ureg.avogadro_constant * ureg.elementary_charge * ureg.mole / ureg.coulomb).to_reduced_units().magnitude return mus
[docs] def calculate_transference_numbers( L_ij: np.ndarray[float], charges: np.ndarray[float]) -> np.ndarray[float]: r""" Calculates the transference number :math:`t_i` of each species using the Onsager transport coefficients :math:`L_{ij}`. Parameters ---------- L_ij : `numpy.ndarray` Onsager transport coefficients :math:`L_{ij}`. **Shape**: :math:`(N_\mathrm{blocks},\,N_\mathrm{groups},\, N_\mathrm{groups})`. **Reference unit**: :math:`\mathrm{mol/(kJ\cdotÅ\cdot ps)}`. charges : array-like Charges :math:`q_i` shared by all atoms or centers of mass in each species :math:`i`. **Shape**: :math:`(N_\mathrm{groups},)`. **Reference unit**: :math:`\mathrm{e}`. Returns ------- ts : `numpy.ndarray` Transference numbers :math:`t_i` of the atoms or centers of mass in each species :math:`i`. **Shape**: :math:`(N_\mathrm{blocks},\,N_\mathrm{groups})`. """ s = charges * (L_ij * charges).sum(axis=-1) return s / s.sum(axis=-1)
[docs] class Onsager(SerialAnalysisBase): """ A serial implementation to calculate the Onsager transport coefficients and its related properties. .. note:: The simulation must have been run with a constant timestep :math:`\\Delta t` and the frames to be analyzed must be evenly spaced and proceed forward in time for this analysis module to function correctly. The Onsager transport framework [1]_ can be used to analyze transport properties in bulk constant-volume fluids and electrolytic systems. The Onsager transport equation .. math:: \\mathbf{J}_i=-\\sum_j L_{ij}\\nabla\\bar{\\mu}_j relates the flux :math:`\\mathbf{J}_i` of species :math:`i` to the Onsager transport coefficients :math:`L_{ij}` and the electrochemical potential :math:`\\bar{\\mu}_j` of species :math:`j`. There is an Onsager transport coefficient for each pair of species that, unlike the Nernst–Einstein equation, captures the strong cross-correlations in electrolytes. The Onsager transport coefficients can be calculated from the particle positions over time using the Einstein relation .. math:: L_{ij}=\\frac{1}{6k_\\mathrm{B}TV}\\lim_{t\\rightarrow\\infty} \\frac{d}{dt}\\left\\langle\\sum_\\alpha\\left[ \\mathbf{r}_{i,\\alpha}(t)-\\mathbf{r}_{i,\\alpha}(0)\\right] \\cdot\\sum_\\beta\\left[\\mathbf{r}_{j,\\beta}(t) -\\mathbf{r}_{j,\\beta}(0)\\right]\\right\\rangle where :math:`k_\\mathrm{B}` is the Boltzmann constant, :math:`T` is the system temperature, :math:`V` is the system volume, :math:`t` is time, and :math:`\\mathbf{r}_\\alpha` and :math:`\\mathbf{r}_\\beta` are the positions of particles :math:`\\alpha` and :math:`\\beta` belonging to species :math:`i` and :math:`j`, respectively. The angular brackets denote the ensemble average. It is evident that :math:`L_{ij}=L_{ji}`; hence, the equation above is an Onsager reciprocal relation. The diagonal terms :math:`L_{ii}` in the matrix of Onsager transport coefficients captures the self-diffusion and self-correlations for a single species :math:`i` and has two contributions: .. math:: L_{ii}=L_{ii}^\\mathrm{self}+L_{ii}^\\mathrm{distinct} The self term .. math:: L_{ii}^\\mathrm{self}=\\frac{1}{6k_\\mathrm{B}TV} \\lim_{t\\rightarrow\\infty}\\frac{d}{dt} \\sum_\\alpha\\left\\langle\\left[ \\mathbf{r}_{i,\\alpha}(t)-\\mathbf{r}_{i,\\alpha}(0) \\right]^2\\right\\rangle is given by the autocorrelation function of the flux of a single particle :math:`\\alpha` when :math:`\\alpha=\\beta`, while the distinct term .. math:: L_{ii}^\\mathrm{distinct}=\\frac{1}{6k_\\mathrm{B}TV} \\lim_{t\\rightarrow\\infty}\\frac{d}{dt} \\sum_\\alpha\\sum_{\\beta\\neq\\alpha}\\left\\langle\\left[ \\mathbf{r}_{i,\\alpha}(t)-\\mathbf{r}_{i,\\alpha}(0)\\right]\\cdot \\left[\\mathbf{r}_{i,\\beta}(t)-\\mathbf{r}_{i,\\beta}(0)\\right] \\right\\rangle is given by the cross-correlation between two distinct particles when :math:`\\alpha\\neq\\beta`. Naturally, the self term is related to the self-diffusion coefficient :math:`D_i` via .. math:: L_{ii}^\\mathrm{self}=\\frac{D_i\\rho_i}{k_\\mathrm{B}T} where :math:`\\rho_i` is the number density of species :math:`i`. Finally, the Onsager transport coefficients can be used to obtain experimentally relevant transport properties: * Ionic conductivity: .. math:: \\kappa=F^2\\sum_i\\sum_j z_iz_jL_{ij} :math:`F` is the Faraday constant. * Electrophoretic mobility: .. math:: \\mu_i=\\frac{F}{\\rho_i}\\sum_j z_jL_{ij} * Transference number: .. math:: t_i=\\frac{\\sum_j z_iz_jL_{ij}}{\\sum_k\\sum_l z_kz_lL_{kl}} Parameters ---------- groups : `MDAnalysis.AtomGroup` or array-like Group of atoms for which the mean squared displacements (or analogs) are calculated. groupings : `str` or array-like, default: :code:`"atoms"` Determines whether the centers of mass are used in lieu of individual atom positions. If `groupings` is a `str`, the same value is used for all `groups`. .. note:: If the desired grouping is not :code:`"atoms"`, * the trajectory file should have segments (or chains) containing residues (or molecules) and residues containing atoms, and * residues and segments should be locally unwrapped at the simulation box edges, if not already, using :class:`MDAnalysis.transformations.wrap.unwrap`, :meth:`MDAnalysis.core.groups.AtomGroup.unwrap`, or :func:`MDAnalysis.lib.mdamath.make_whole`. .. container:: **Valid values**: * :code:`"atoms"`: Atom positions (generally or for coarse-grained simulations). * :code:`"residues"`: Residues' centers of mass (for atomistic simulations). * :code:`"segments"`: Segments' centers of mass (for atomistic polymer simulations). temperature : `float`, `openmm.unit.Quantity`, or `pint.Quantity`, \ default: :code:`300` System temperature :math:`T`. .. note:: If :code:`reduced=True`, `temperature` should be equal to the energy scale. When the Lennard-Jones potential is used, it generally means that :math:`T^*=1`, or `temperature=1`. **Reference unit**: :math:`\\mathrm{K}`. charges : array-like, `openmm.unit.Quantity`, or `pint.Quantity`, \ keyword-only, optional Charges :math:`q_i` for the entities in the :math:`N_\\mathrm{groups}` atom groups in `groups`. If not provided, they will be retrieved from the main :class:`MDAnalysis.core.universe.Universe` object only if it contains charge information. .. note:: Depending on the grouping for a specific atom group, all entities (atoms, residues, or segments) should carry the same charge. **Shape**: :math:`(N_\\mathrm{groups},)`. **Reference unit**: :math:`\\mathrm{e}`. dimensions : array-like, `openmm.unit.Quantity`, or \ `pint.Quantity`, keyword-only, optional System dimensions :math:`(L_x,\\,L_y,\\,L_z)`. If the :class:`MDAnalysis.core.universe.Universe` object that the atom groups in `groups` belong to does not contain dimensionality information, provide it here. .. tip:: You can zero out dimensions by setting them to :code:`0` if your simulation is pseudo-1D or pseudo-2D. For example, if you have a one-layer thick slab in the :math:`xy`-plane, you can set :code:`dimensions=np.array((L_x, L_y, 0))` to evaluate the transport properties in 2D. Note that this affects the reference units of the Onsager transport coefficients and its related properties, but not the self-diffusivity. **Shape**: :math:`(3,)`. **Reference unit**: :math:`\\mathrm{Å}`. dt : `float`, `openmm.unit.Quantity`, or `pint.Quantity`, \ keyword-only, optional Time between frames :math:`\\Delta t`. While this is normally determined from the trajectory, the trajectory may not have the correct information if the data is in reduced units. For example, if your reduced timestep is :math:`0.01` and you output trajectory data every :math:`10,000` timesteps, then :math:`\\Delta t=100`. **Reference unit**: :math:`\\mathrm{ps}`. n_blocks : `int`, keyword-only, default: :code:`1` Number of blocks :math:`N_\\mathrm{blocks}` to split the trajectory into. center : `bool`, keyword-only, default: :code:`False` Determines whether the system center of mass is subtracted from the positions used to compute the transport properties. center_atom : `bool`, keyword-only, default: :code:`False` Determines whether the system center of mass is computed using the atom positions, regardless of `groupings`. Has no effect if :code:`center=False`. center_wrap : `bool`, keyword-only, default: :code:`False` Determines whether the system center of mass is computed using the wrapped particle positions. Has no effect if :code:`center=False`. fft : `bool`, keyword-only, default: :code:`True` Determines whether fast Fourier transforms (FFT) are used to evaluate the mean squared displacements (or analogs). reduced : `bool`, keyword-only, default: :code:`False` Specifies whether the data is in reduced units. Affects `results.times`, etc. unwrap : `bool`, keyword-only, default: :code:`False` Determines if atom positions are unwrapped. Ensure that :code:`unwrap=False` when the trajectory already contains unwrapped particle positions, as this parameter is used in conjunction with `center_wrap` to determine the appropriate system center of mass. verbose : `bool`, keyword-only, default: :code:`True` Determines whether detailed progress is shown. **kwargs Additional keyword arguments to pass to :class:`MDAnalysis.analysis.base.AnalysisBase`. Attributes ---------- universe : `MDAnalysis.Universe` :class:`MDAnalysis.core.universe.Universe` object containing all information describing the system. results.units : `dict` Reference units for the results. For example, to get the reference units for :code:`results.times`, call :code:`results.units["times"]`. results.pairs : `tuple` All unique pairs of indices of the groups of atoms in `groups`. The ordering aligns with the column indices in `results.msd`. results.times : `numpy.ndarray` Changes in time :math:`t-t_0`. **Shape**: :math:`(N_t,)`. **Reference unit**: :math:`\\mathrm{ps}`. results.msd_cross : `numpy.ndarray` MSDs (or analogs) that includes the dimensionality scaling factor. See Notes to understand what this value actually is. **Shape**: :math:`(C(N_\\mathrm{groups}+1,\\,2),\\,N_t)` or :math:`(C(N_\\mathrm{groups}+1,\\,2),\\,N_\\mathrm{blocks},\\, N_t)`. **Reference unit**: :math:`\\mathrm{Å}^2`. results.msd_self : `numpy.ndarray` Self MSDs that include the dimensionality scaling factor. See Notes to understand what this value actually is. **Shape**: :math:`(N_\\mathrm{groups},\\,N_t)` or :math:`(N_\\mathrm{groups},\\,N_\\mathrm{blocks},\\,N_t)`. **Reference unit**: :math:`\\mathrm{Å}^2`. results.L_ij : `numpy.ndarray` Onsager transport coefficients :math:`L_{ij}`. Only available after running :meth:`calculate_transport_coefficients`. **Shape**: :math:`(N_\\mathrm{blocks},\\,N_\\mathrm{groups},\\, N_\\mathrm{groups})`. **Reference unit**: :math:`\\mathrm{mol/(kJ}\\cdot\\mathrm{Å}\\cdot\\mathrm{ps)}`. results.L_ii_self : `numpy.ndarray` Self-diffusion contribution to the single-species Onsager transport coefficients :math:`L_{ii}^\\mathrm{self}`. Note that :math:`L_{ii}^\\mathrm{self}` is related to :math:`D_i` via .. math:: L_{ii}^\\mathrm{self}=\\dfrac{N}{k_\\mathrm{B}TV}D_i Only available after running :meth:`calculate_transport_coefficients`. **Shape**: :math:`(N_\\mathrm{blocks},\\,N_\\mathrm{groups})`. **Reference unit**: :math:`\\mathrm{mol/(kJ\\cdotÅ\\cdot ps)}`. results.D_i : `numpy.ndarray` Self-diffusion coefficients :math:`D_i`. Only available after running :meth:`calculate_transport_coefficients`. **Shape**: :math:`(N_\\mathrm{blocks},\\,N_\\mathrm{groups})`. **Reference unit**: :math:`\\mathrm{Å^2/ps}`. results.conductivity : `numpy.ndarray` Conductivity :math:`\\kappa`. Only available after running :meth:`calculate_conductivity`. **Shape**: :math:`(N_\\mathrm{blocks},\\,N_\\mathrm{groups})`. **Reference unit**: :math:`\\mathrm{C^2/(kJ\\cdotÅ\\cdot ps)`. **To SI unit**: :math:`1\\times10^{19}\\,\\mathrm{S}/\\mathrm{m}`. results.electrophoretic_mobilities : `numpy.ndarray` Electrophoretic mobilities :math:`\\mu_i`. Only available after running :meth:`calculate_electrophoretic_mobilities`. **Shape**: :math:`(N_\\mathrm{blocks},\\,N_\\mathrm{groups})`. **Reference unit**: :math:`\\mathrm{Å^2\\cdotC/(kJ\\cdot ps)}`. **To SI unit**: :math:`1\\times10^{-11}\\,\\mathrm{m}^2/ (\\mathrm{V}\\cdot\\mathrm{s})`. results.transference_numbers : `numpy.ndarray` Transference numbers :math:`t_i`. Only available after running :meth:`calculate_transference_numbers`. **Shape**: :math:`(N_\\mathrm{blocks},\\,N_\\mathrm{groups})`. Notes ----- * The values in `results.msd_cross` are actually "summed squared displacements"—that is, `results.msd_cross` contains the sum of all particles' squared displacements instead of their average. However, it is extremely unlikely the raw values in `results.msd_cross` will be used other than to calculate the Onsager transport coefficients. On the other hand, `results.msd_self` is averaged over all particles. Therefore, it can be plotted against `results.times` to directly get the self-diffusion coefficients. References ---------- .. [1] Fong, K. D.; Self, J.; McCloskey, B. D.; Persson, K. A. Onsager Transport Coefficients and Transference Numbers in Polyelectrolyte Solutions and Polymerized Ionic Liquids. *Macromolecules* **2020**, **53** (21), 9503–9512. https://doi.org/10.1021/acs.macromol.0c02001. """ def __init__( self, groups: Union[mda.AtomGroup, tuple[mda.AtomGroup]], groupings: Union[str, tuple[str]] = "atoms", temperature: Union[float, "unit.Quantity", Q_] = 300, *, charges: Union[np.ndarray[float], "unit.Quantity", Q_] = None, dimensions: Union[np.ndarray[float], "unit.Quantity", Q_] = None, dt: Union[float, "unit.Quantity", Q_] = None, n_blocks: int = 1, center: bool = False, center_atom: bool = False, center_wrap: bool = False, fft: bool = True, reduced: bool = False, unwrap: bool = False, verbose: bool = True, **kwargs) -> None: self._groups = [groups] if isinstance(groups, mda.AtomGroup) else groups self.universe = self._groups[0].universe super().__init__(self.universe.trajectory, verbose=verbose, **kwargs) GROUPINGS = {"atoms", "residues", "segments"} self._n_groups = len(self._groups) if isinstance(groupings, str): if groupings not in GROUPINGS: emsg = (f"Invalid grouping '{groupings}'. Valid values: " "'" + "', '".join(GROUPINGS) + "'.") raise ValueError(emsg) self._groupings = self._n_groups * [groupings] else: if self._n_groups != len(groupings): emsg = ("The shape of 'groupings' is incompatible with " "that of 'groups'.") raise ValueError(emsg) for gr in groupings: if gr not in GROUPINGS: emsg = (f"Invalid grouping '{gr}'. Valid values: " "'" + "', '".join(GROUPINGS) + "'.") raise ValueError(emsg) self._groupings = groupings if reduced and not is_unitless(temperature): emsg = "'temperature' cannot have units when reduced=True." raise TypeError(emsg) self._kBT = strip_unit(temperature, "K")[0] if not reduced: self._kBT = ( self._kBT * ureg.avogadro_constant * ureg.boltzmann_constant * ureg.kelvin ).m_as(ureg.kilojoule / ureg.mole) if dimensions is not None: if len(dimensions) != 3: raise ValueError("'dimensions' must have length 3.") if reduced and not is_unitless(dimensions): emsg = "'dimensions' cannot have units when 'reduced=True'." raise TypeError(emsg) self._dimensions = np.asarray(strip_unit(dimensions, "Å")[0]) elif self.universe.dimensions is not None: self._dimensions = self.universe.dimensions[:3].copy() else: raise ValueError("No system dimensions found or provided.") if dt is not None: if reduced and not is_unitless(dt): raise TypeError("'dt' cannot have units when reduced=True.") self._dt = strip_unit(dt, "ps")[0] else: self._dt = self._trajectory.dt if charges is not None: if len(charges) != self._n_groups: emsg = ("The shape of 'charges' is incompatible with " "that of 'groups'.") raise ValueError(emsg) if reduced and not is_unitless(charges): emsg = "'charges' cannot have units when 'reduced=True'." raise TypeError(emsg) self._charges = np.asarray(strip_unit(charges, "e")[0]) elif hasattr(self.universe.atoms, "charges"): self._charges = np.empty(self._n_groups) for i, (ag, gr) in enumerate(zip(self._groups, self._groupings)): qs = getattr(ag, gr).charges if not np.allclose(qs, q := qs[0]): self._charges = None wmsg = (f"Not all {gr} in `groups[{i}]` share the " "same charge. Charge information will not " "be stored.") warnings.warn(wmsg) break self._charges[i] = q else: self._charges = None self._Ns = np.fromiter( (getattr(ag, f"n_{gr}") for (ag, gr) in zip(self._groups, self._groupings)), dtype=int, count=self._n_groups ) self._N = self._Ns.sum() self._slices = [] _ = 0 for N in self._Ns: self._slices.append(slice(_, _ + N)) _ += N if np.all(~np.isclose(self._dimensions, 0)): self._rhos = np.fromiter( (getattr(g, f"n_{gr}") for g, gr in zip(self._groups, self._groupings)), count=self._n_groups, dtype=float ) / self._dimensions.prod() self._n_blocks = n_blocks self._center = center self._center_atom = center_atom self._center_wrap = center_wrap self._fft = fft self._reduced = reduced self._unwrap = unwrap self._verbose = verbose def _prepare(self) -> None: # Ensure frames are evenly spaced and proceed forward in time if hasattr(self._sliced_trajectory, "frames"): dfs = np.diff(self._sliced_trajectory.frames) if (df := dfs[0]) <= 0 or not np.allclose(dfs, df): emsg = ("The selected frames must be evenly spaced and " "proceed forward in time.") raise ValueError(emsg) elif (df := self.step) <= 0: raise ValueError("The analysis must proceed forward in time.") # Determine number of frames used when the trajectory is split # into blocks self._n_frames_block = self.n_frames // self._n_blocks self._n_frames = self._n_blocks * self._n_frames_block if (extra_frames := self.n_frames - self._n_frames) > 0: wmsg = (f"The trajectory is not divisible into {self._n_blocks:,} " f"blocks, so the last {extra_frames:,} frame(s) will be " "discarded. To maximize performance, set appropriate " "starting and ending frames in run() so that the number " "of frames to be analyzed is divisible by the number of " "blocks.") warnings.warn(wmsg) # Find all unique AtomGroup combinations self.results.pairs = tuple( itertools.combinations_with_replacement(range(self._n_groups), 2) ) # Preallocate arrays to store MSDs and CDs self.results.msd_cross = np.empty( (len(self.results.pairs), self._n_blocks, self._n_frames_block), dtype=float ) self.results.msd_self = np.empty( (self._n_groups, self._n_blocks, self._n_frames_block), dtype=float ) # Preallocate arrays to store positions and number of boundary # crossings self._positions = np.empty((self.n_frames, self._N, 3)) if self._unwrap: self._sliced_trajectory[0] self._positions_old = self.universe.atoms.unwrap() self._images = np.zeros((self.universe.atoms.n_atoms, 3), dtype=int) self._thresholds = self._dimensions / 2 # Store time changes self.results.times = df * self._dt * np.arange(self._n_frames_block) # Store reference units self.results.units = Hash({"times": ureg.picosecond}) self.results.units["msd_cross"] = \ self.results.units["msd_self"] = ureg.angstrom ** 2 def _single_frame(self) -> None: # Get and store unwrapped positions positions = self.universe.atoms.positions.copy() if self._unwrap: unwrap( positions, self._positions_old, self._dimensions, thresholds=self._thresholds, images=self._images ) for ag, gr, s in zip(self._groups, self._groupings, self._slices): self._positions[self._frame_index, s] = ( positions[ag.indices] if gr == "atoms" else center_of_mass( ag, gr, images=self._images[ag.indices] if self._unwrap else None ) ) # Subtract system center of mass from entity positions if self._center: if self._center_atom: if self._center_wrap: wrap(positions, self._dimensions) scom = center_of_mass(positions=positions, masses=self.universe.atoms.masses) else: if self._center_wrap: positions = wrap(self._positions[self._frame_index], self._dimensions, in_place=False) else: positions = self._positions[self._frame_index] scom = center_of_mass( positions=positions, masses=np.concatenate( [getattr(g, gr).masses for g, gr in zip(self._groups, self._groupings)] ) ) self._positions[self._frame_index] -= scom def _conclude(self) -> None: # Truncate the positions array if there are extra frames if self.n_frames != self._n_frames: self._positions = self._positions[:self._n_frames] # Compute the MSDs (or their analogs) for each unique AtomGroup # pair msd = msd_fft if self._fft else msd_shift delete_dimensions = np.isclose(self._dimensions, 0) for i, (i1, i2) in enumerate(ProgressBar(self.results.pairs, verbose=self._verbose)): if i1 == i2: if self._Ns[i1]: positions = self._positions[:, self._slices[i1]].reshape( (self._n_blocks, -1, self._Ns[i1], 3) ) positions[:, :, :, delete_dimensions] = 0 self.results.msd_cross[i] = msd(positions.sum(axis=2), axis=1) self.results.msd_self[i1] = ( msd(positions, axis=1, average=False).sum(axis=-1) / self._Ns[i1] ) else: self.results.msd_cross[i] \ = self.results.msd_self[i1] = np.nan elif self._Ns[i1] and self._Ns[i2]: positions1 = self._positions[:, self._slices[i1]].reshape( (self._n_blocks, -1, self._Ns[i1], 3) ).sum(axis=2) positions2 = self._positions[:, self._slices[i2]].reshape( (self._n_blocks, -1, self._Ns[i2], 3) ).sum(axis=2) positions1[:, :, delete_dimensions] \ = positions2[:, :, delete_dimensions] = 0 self.results.msd_cross[i] = msd(positions1, positions2, axis=1) else: self.results.msd_cross[i] = np.nan # Account for dimensionality by dividing by 2 * D D = 2 * (~delete_dimensions).sum() self.results.msd_cross /= D self.results.msd_self /= D
[docs] def calculate_transport_coefficients( self, start: int = 1, stop: int = None, scale: str = "log", *, start_self: int = None, stop_self: int = None, scale_self: str = None, enforce_linear: bool = True) -> None: r""" Fits the mean squared displacements (MSDs) (or analogs) to evaluate the Onsager transport coefficients :math:`L_{ij}` and :math:`L_{ii}^\mathrm{self}`. Parameters ---------- start : `int`, default: :code:`1` Starting frame with respect to the interval used in :meth:`Onsager.run` for fitting the MSDs (or their analogs). stop : `int`, optional Ending frame with respect to the interval used in :meth:`Onsager.run` for fitting the MSDs (or their analogs). scale : `str`, :code:`{"log", "linear"}` Data scaling for fitting the MSDs (or their analogs) against time. .. container:: **Valid values**: * :code:`"linear"`: Linear :math:`x`- and :math:`y`-axes. * :code:`"log"`: Logarithmic :math:`x`- and :math:`y`-axes. start_self : `int`, keyword-only, optional Starting frame with respect to the interval used in :meth:`Onsager.run` for fitting the self MSDs. If not provided, `start_self` shares a value with `start`. stop_self : `int`, keyword-only, optional Ending frame with respect to the interval used in :meth:`Onsager.run` for fitting the self MSDs. If not provided, `stop_self` shares a value with `stop`. scale_self : `str`, keyword-only, optional Data scaling for fitting the self MSDs against time. If not provided, `scale_self` shares a value with `scale`. .. container:: **Valid values**: * :code:`"linear"`: Linear :math:`x`- and :math:`y`-axes. * :code:`"log"`: Logarithmic :math:`x`- and :math:`y`-axes. enforce_linear : `bool`, keyword-only, default: :code:`True` Enforce linear fits for data on a logarithmic scale by setting the slope to :math:`1`, or .. math:: \log(\mathrm{MSD})=\log(t)+b """ if not hasattr(self.results, "msd_cross"): emsg = ("Call Onsager.run() before " "Onsager.calculate_transport_coefficients().") raise RuntimeError(emsg) self.results.L_ij, self.results.L_ii_self, self.results.D_i \ = calculate_transport_coefficients( self.results.times, self.results.msd_cross, self.results.msd_self, self._Ns, self._dimensions, self._kBT, start, stop, scale, start_self=start_self, stop_self=stop_self, scale_self=scale_self, enforce_linear=enforce_linear, verbose=self._verbose ) # Store reference units self.results.units["D_i"] = ureg.angstrom ** 2 / ureg.picosecond self.results.units["L_ii_self"] = self.results.units["L_ij"] \ = 1 / (ureg.kilojoule * ureg.angstrom * ureg.picosecond / ureg.mole)
[docs] def calculate_conductivity( self, *, charges: Union[np.ndarray[float], "unit.Quantity", Q_] = None ) -> None: """ Calculates the ionic conductivity :math:`\\kappa` using the Onsager transport coefficients :math:`L_{ij}`. Parameters ---------- charges : array-like, `openmm.unit.Quantity`, or \ `pint.Quantity`, keyword-only, optional Charge numbers :math:`z_i` of the groupings in the :math:`N_\\mathrm{g}` groups. This argument is optional only if `charges` has previously been passed to a calculation method belonging to this class. **Shape**: :math:`(N_\\mathrm{g},)`. **Reference unit**: :math:`\\mathrm{e}`. """ if not hasattr(self.results, "L_ij"): emsg = ("Call Onsager.calculate_transport_coefficients() before " "Onsager.calculate_conductivity().") raise RuntimeError(emsg) if charges is not None: if len(charges) != self._n_groups: emsg = ("The shape of 'charges' is incompatible with " "that of 'groups'.") raise ValueError(emsg) if self._reduced and not is_unitless(charges): emsg = "'charges' cannot have units when 'reduced=True'." raise TypeError(emsg) self._charges = np.asarray(strip_unit(charges, "e")[0]) if self._charges is None: raise ValueError("No charge number information available.") self.results.conductivity = calculate_conductivity( self.results.L_ij, self._charges, reduced=self._reduced ) self.results.units["conductivity"] = \ ureg.coulomb ** 2 / (ureg.kilojoule * ureg.angstrom * ureg.picosecond)
[docs] def calculate_electrophoretic_mobilities( self, *, charges: Union[np.ndarray[float], "unit.Quantity", Q_] = None, rhos: Union[np.ndarray[float], "unit.Quantity", Q_] = None ) -> None: """ Calculates the electrophoretic mobility :math:`\\mu_i` of each species using the Onsager transport coefficients :math:`L_{ij}`. Parameters ---------- charges : array-like, `openmm.unit.Quantity`, or \ `pint.Quantity`, keyword-only, optional Charge numbers :math:`z_i` of the groupings in the :math:`N_\\mathrm{g}` groups. This argument is optional only if charge information is present in the topology or `charges` has previously been passed to a calculation method belonging to this class. **Shape**: :math:`(N_\\mathrm{g},)`. **Reference unit**: :math:`\\mathrm{e}`. rhos : array-like, `openmm.unit.Quantity`, or \ `pint.Quantity`, keyword-only, optional Number densities :math:`n_i` of the groupings in the :math:`N_\\mathrm{g}` groups. **Shape**: :math:`(N_\\mathrm{g},)`. **Reference unit**: :math:`\\mathrm{Å}^{-3}`. """ if not hasattr(self.results, "L_ij"): emsg = ("Call Onsager.calculate_transport_coefficients() before " "Onsager.calculate_electrophoretic_mobilities().") raise RuntimeError(emsg) if charges is not None: if len(charges) != self._n_groups: emsg = ("The shape of 'charges' is incompatible with " "that of 'groups'.") raise ValueError(emsg) if self._reduced and not is_unitless(charges): emsg = "'charges' cannot have units when 'reduced=True'." raise TypeError(emsg) self._charges = np.asarray(strip_unit(charges, "e")[0]) if self._charges is None: raise ValueError("No charge number information available.") if rhos is not None: if len(rhos) != self._n_groups: emsg = ("The shape of 'rhos' is incompatible with " "that of 'groups'.") raise ValueError(emsg) if self._reduced and not is_unitless(rhos): emsg = "'rhos' cannot have units when 'reduced=True'." raise TypeError(emsg) self.rhos = np.asarray(strip_unit(rhos, "Å^-3")[0]) if self._rhos is None: raise ValueError("No number density information available.") self.results.electrophoretic_mobilities \ = calculate_electrophoretic_mobilities( self.results.L_ij, self._charges, self._rhos, reduced=self._reduced ) self.results.units["electrophoretic_mobilities"] = \ ureg.angstrom ** 2 * ureg.coulomb / (ureg.kilojoule * ureg.picosecond)
[docs] def calculate_transference_numbers( self, *, charges: Union[np.ndarray[float], "unit.Quantity", Q_] = None ) -> None: """ Calculates the transference number of each species using the Onsager transport coefficients :math:`L_{ij}`. Parameters ---------- charges : array-like, `openmm.unit.Quantity`, or \ `pint.Quantity`, keyword-only, optional Charge numbers :math:`z_i` of the groupings in the :math:`N_\\mathrm{g}` groups. This argument is optional only if charge information is present in the topology or `charges` has previously been passed to a calculation method belonging to this class. **Shape**: :math:`(N_\\mathrm{g},)`. **Reference unit**: :math:`\\mathrm{e}`. """ if not hasattr(self.results, "L_ij"): emsg = ("Call Onsager.calculate_transport_coefficients() before " "Onsager.calculate_transference_numbers().") raise RuntimeError(emsg) if charges is not None: if len(charges) != self._n_groups: emsg = ("The shape of 'charges' is incompatible with " "that of 'groups'.") raise ValueError(emsg) if self._reduced and not is_unitless(charges): emsg = "'charges' cannot have units when 'reduced=True'." raise TypeError(emsg) self._charges = np.asarray(strip_unit(charges, "e")[0]) if self._charges is None: raise ValueError("No charge number information available.") self.results.transference_numbers = calculate_transference_numbers( self.results.L_ij, self._charges )