Source code for mdcraft.openmm.file

"""
Custom OpenMM topology and trajectory file readers and writers
==============================================================
.. moduleauthor:: Benjamin Ye <GitHub: @bbye98>

This module provides custom topology and trajectory file readers and
writers for OpenMM.
"""

import platform
from typing import Any, Union
import warnings

import netCDF4 as nc
import numpy as np
import openmm
from openmm import app, unit

from .. import VERSION

[docs] class NetCDFFile(): """ Interface for reading and writing AMBER NetCDF trajectory and restart files. Parameters ---------- file : `str` or `netcdf4.Dataset` NetCDF file. If `file` is a filename and does not have the :code:`.nc` or :code:`.ncdf` extension, :code:`.nc` will automatically be appended. mode : `str` NetCDF file access mode. restart : `bool`, default: :code:`False` Specifies whether the NetCDF file is a trajectory or restart file. **kwargs Keyword arguments to be passed to :code:`netCDF4.Dataset`. """ def __init__( self, file: Union[str, nc.Dataset], mode: str, restart: bool = False, **kwargs): if isinstance(file, str): if not file.endswith((".nc", ".ncdf")): file += ".nc" self._nc = nc.Dataset(file, mode=mode, format="NETCDF3_64BIT_OFFSET", **kwargs) else: self._nc = file self._nc.set_always_mask(False) if mode == "r": self._frame = self._nc.variables["time"].shape[0] self._restart = self._nc.Conventions == "AMBERRESTART" else: self._frame = 0 self._restart = restart
[docs] def get_dimensions( self, frames: Union[int, list[int], slice] = None, units: bool = True ) -> Union[tuple[np.ndarray[float], np.ndarray[float]], tuple[unit.Quantity, unit.Quantity]]: """ Get the simulation box dimensions. Parameters ---------- frames : `int`, `list`, or `slice`, optional Frame indices. If :code:`None`, the dimensions across all frames are returned. units : `bool`, default: :code:`True` Determines whether the dimensions are returned with units. Returns ------- cell_lengths : `numpy.ndarray`, optional Simulation box dimensions. **Reference unit**: :math:`\\mathrm{Å}`. cell_angles : `numpy.ndarray`, optional Angles that define the shape of the simulation box. **Reference unit**: :math:`^\\circ`. """ cell_lengths = (self._nc.variables["cell_lengths"][:] if frames is None else self._nc.variables["cell_lengths"][frames]) cell_angles = (self._nc.variables["cell_angles"][:] if frames is None else self._nc.variables["cell_angles"][frames]) if units: cell_lengths *= unit.angstrom cell_angles *= unit.degree return cell_lengths, cell_angles
[docs] def get_num_frames(self) -> int: """ Get the number of frames. Returns ------- num_frames : `int` Number of frames. """ return self._nc.dimensions["frame"].size
[docs] def get_num_atoms(self) -> int: """ Get the number of atoms. Returns ------- num_atoms : `int` Number of atoms. """ return self._nc.dimensions["atom"].size
[docs] def get_times( self, frames: Union[int, list[int], slice] = None, units: bool = True) -> Union[np.ndarray[float], unit.Quantity]: """ Get simulation times. Parameters ---------- frames : `int`, `list`, or `slice`, optional Frame indices. If :code:`None`, the times across all frames are returned. units : `bool`, default: :code:`True` Determines whether the times are returned with units. Returns ------- times : `numpy.ndarray` or `openmm.unit.Quantity` Simulation times. **Reference unit**: :math:`\\mathrm{ps}`. """ times = (self._nc.variables["time"][:] if frames is None else self._nc.variables["time"][frames]) if units: times *= unit.picosecond return times
[docs] def get_positions( self, frames: Union[int, list[int], slice] = None, units: bool = True) -> Union[np.ndarray[float], unit.Quantity]: """ Get the atom positions. Parameters ---------- frames : `int`, `list`, or `slice`, optional Frame indices. If :code:`None`, the positions across all frames are returned. units : `bool`, default: :code:`True` Determines whether the positions are returned with units. Returns ------- positions : `numpy.ndarray` or `openmm.unit.Quantity` Atom positions. **Reference unit**: :math:`\\mathrm{Å}`. """ positions = (self._nc.variables["coordinates"][:] if frames is None else self._nc.variables["coordinates"][frames]) if units: positions *= unit.angstrom return positions
[docs] def get_velocities( self, frames: Union[int, list[int], slice] = None, units: bool = True) -> Union[np.ndarray[float], unit.Quantity]: """ Get atom velocities. Parameters ---------- frames : `int`, `list`, or `slice`, optional Frame indices. If :code:`None`, the velocities across all frames are returned. units : `bool`, default: :code:`True` Determines whether the velocities are returned with units. Returns ------- velocities : `numpy.ndarray` or `openmm.unit.Quantity` Atom velocities. If the NetCDF file does not contain this information, :code:`None` is returned. **Reference unit**: :math:`\\mathrm{Å/ps}`. """ if "velocities" not in self._nc.variables: wmsg = ("The NetCDF file does not contain information about " "the atom velocities.") warnings.warn(wmsg) return None velocities = (self._nc.variables["velocities"][:] if frames is None else self._nc.variables["velocities"][frames]) if units: velocities *= unit.angstrom / unit.picosecond return velocities
[docs] def get_forces( self, frames: Union[int, list[int], slice] = None, units: bool = True) -> Union[np.ndarray[float], unit.Quantity]: """ Get the forces acting on the atoms. Parameters ---------- frames : `int`, `list`, or `slice`, optional Frame indices. If :code:`None`, the forces across all frames are returned. units : `bool`, default: :code:`True` Determines whether the forces are returned with units. Returns ------- forces : `numpy.ndarray` or `openmm.unit.Quantity` Forces acting on the atoms. If the NetCDF file does not contain this information, :code:`None` is returned. **Reference unit**: :math:`\\mathrm{Å/ps}`. """ if "forces" not in self._nc.variables: wmsg = ("The NetCDF file does not contain information about " "the forces acting on the atoms.") warnings.warn(wmsg) return None forces = (self._nc.variables["forces"][:] if frames is None else self._nc.variables["forces"][frames]) if units: forces *= unit.kilocalorie_per_mole / unit.angstrom return forces
[docs] def write_header( self: Any, N: int, cell: bool, velocities: bool, forces: bool, restart: bool = False, *, remd: str = None, temp0: float = None, remd_dimtype: np.ndarray[int] = None, remd_indices: np.ndarray[int] = None, remd_repidx: int = -1, remd_crdidx: int = -1, remd_values: np.ndarray[float] = None ) -> "NetCDFFile": """ Initialize a NetCDF file according to `AMBER NetCDF Trajectory/Restart Convention Version 1.0, Revision C <https://ambermd.org/netcdf/nctraj.xhtml>`_. .. note:: This function can be used as either a static or instance method. Parameters ---------- self : `str`, `netcdf4.Dataset`, or `mdcraft.openmm.file.NetCDFFile` If :meth:`write_header` is called as a static method, you must provide a filename or a NetCDF file object. Otherwise, the NetCDF file embedded in the current instance is used. N : `int` Number of atoms. cell : `bool` Specifies whether simulation box length and angle information is available. velocities : `bool` Specifies whether atom velocities should be written. forces : `bool` Specifies whether forces exerted on atoms should be written. restart : `bool`, default: :code:`False` Specifies whether the NetCDF file is a trajectory or restart file. remd : `str`, keyword-only, optional Specifies whether information about a replica exchange molecular dynamics (REMD) simulation is written. .. container:: **Valid values**: * :code:`"temp"` for regular REMD. * :code:`"multi"` for multi-dimensional REMD. temp0 : `float`, keyword-only, optional Temperature that the thermostat is set to maintain for a REMD restart file only. **Reference unit**: :math:`\\mathrm{K}`. remd_dimtype : array-like, keyword-only, optional Array specifying the exchange type(s) for the REMD dimension(s). Required for a multi-dimensional REMD restart file. remd_indices : array-like, keyword-only, optional Array specifying the position in all dimensions that each frame is in. Required for a multi-dimensional REMD restart file. remd_repidx : `int`, keyword-only, optional Overall index of the frame in replica space. remd_crdidx : `int`, keyword-only, optional Overall index of the frame in coordinate space. remd_values : array-like, keyword-only, optional Replica value the specified replica dimension has for that given frame. Required for a multi-dimensional REMD restart file. Returns ------- netcdf_file : `mdcraft.openmm.file.NetCDFFile` NetCDF file object. Only returned when this function is used as a static method. """ # Create NetCDF object if it doesn't already exist if not isinstance(self, NetCDFFile): self = NetCDFFile(self, "w", restart=restart) self._nc.Conventions = "AMBER" if self._restart: self._nc.Conventions += "RESTART" self._nc.ConventionVersion = "1.0" self._nc.program = "MDCraft" self._nc.programVersion = VERSION self._nc.title = (f"OpenMM {openmm.Platform.getOpenMMVersion()} / " f"{platform.node()}") if self._restart: self._nc.createDimension("frame", 1) else: self._nc.createDimension("frame", None) if remd == "multi": # pragma: no cover self._nc.createDimension("remd_dimension", len(remd_dimtype)) self._nc.createDimension("spatial", 3) self._nc.createDimension("atom", N) if self._restart: self._nc.createVariable("coordinates", "d", ("atom", "spatial")) else: self._nc.createVariable("coordinates", "f", ("frame", "atom", "spatial")) self._nc.variables["coordinates"].units = "angstrom" self._nc.createVariable("time", "d", ("frame",)) self._nc.variables["time"].units = "picosecond" if cell: self._nc.createDimension("cell_spatial", 3) self._nc.createDimension("cell_angular", 3) self._nc.createDimension("label", 5) self._nc.createVariable("spatial", "c", ("spatial",)) self._nc.variables["spatial"][:] = list("xyz") self._nc.createVariable("cell_spatial", "c", ("cell_spatial",)) self._nc.variables["cell_spatial"][:] = list("abc") self._nc.createVariable("cell_angular", "c", ("cell_angular", "label")) self._nc.variables["cell_angular"][:] = [list("alpha"), list("beta "), list("gamma")] if self._restart: self._nc.createVariable("cell_lengths", "d", ("cell_spatial",)) self._nc.createVariable("cell_angles", "d", ("cell_angular",)) else: self._nc.createVariable("cell_lengths", "f", ("frame", "cell_spatial")) self._nc.createVariable("cell_angles", "f", ("frame", "cell_angular")) self._nc.variables["cell_lengths"].units = "angstrom" self._nc.variables["cell_angles"].units = "degree" if velocities: if self._restart: self._nc.createVariable("velocities", "d", ("atom", "spatial")) else: self._nc.createVariable("velocities", "f", ("frame", "atom", "spatial")) self._nc.variables["velocities"].units = "angstrom/picosecond" self._nc.variables["velocities"].scale_factor = 20.455 if forces: if self._restart: self._nc.createVariable("forces", "d", ("atom", "spatial")) else: self._nc.createVariable("forces", "f", ("frame", "atom", "spatial")) self._nc.variables["forces"].units = "kilocalorie/mole/angstrom" if remd is not None: # pragma: no cover if remd == "temp": self._nc.createVariable("temp0", "d", ("frame",)) if self._restart: if temp0 is None: emsg = ("Temperature must be provided for a REMD " "restart file.") raise ValueError(emsg) self._nc.variables["temp0"][0] = temp0 self._nc.variables["temp0"].units = "kelvin" elif remd == "multi": self._nc.createVariable("remd_dimtype", "i", ("remd_dimension",)) self._nc.createVariable("remd_repidx", "i", ("frame",)) self._nc.createVariable("remd_crdidx", "i", ("frame",)) if self._restart: if remd_dimtype is None: emsg = ("Dimension types must be provided for a " "multi-dimensional REMD restart file.") raise ValueError(emsg) self._nc.variables["remd_dimtype"] = remd_dimtype self._nc.createVariable("remd_indices", "i", ("remd_dimension",)) if remd_indices is None: emsg = ("Dimension indices must be provided for a " "multi-dimensional REMD restart file.") raise ValueError(emsg) self._nc.variables["remd_indices"] = remd_indices self._nc.variables["remd_repidx"][0] = remd_repidx self._nc.variables["remd_crdidx"][0] = remd_crdidx self._nc.createVariable("remd_values", "d", ("remd_dimension",)) if remd_values is None: emsg = ("Replica values must be provided for a " "multi-dimensional REMD restart file.") raise ValueError(emsg) self._nc.variables["remd_values"][:] = remd_values else: self._nc.createVariable("remd_indices", "i", ("frame", "remd_dimension")) self._nc.createVariable("remd_values", "d", ("frame", "remd_dimension")) return self
[docs] def write_file(self: Any, state: openmm.State) -> "NetCDFFile": """ Write a single simulation state to a restart NetCDF file. .. note:: This function can be used as either a static or instance method. Parameters ---------- self : `str`, `netcdf4.Dataset`, or `mdcraft.openmm.file.NetCDFFile` If :meth:`write_file` is called as a static method, you must provide a filename or a NetCDF file object. Otherwise, the NetCDF file embedded in the current instance is used. state : `openmm.State` OpenMM simulation state from which to retrieve cell dimensions and atom positions, velocities, and forces. Returns ------- netcdf_file : `mdcraft.openmm.file.NetCDFFile` NetCDF file object. Only returned when this function is used as a static method. """ # Collect all available data in the state data = {} pbv = state.getPeriodicBoxVectors() if pbv is not None: (a, b, c, alpha, beta, gamma) = \ app.internal.unitcell.computeLengthsAndAngles(pbv) data["cell_lengths"] = 10 * np.array((a, b, c)) data["cell_angles"] = 180 * np.array((alpha, beta, gamma)) / np.pi data["coordinates"] = (state.getPositions(asNumpy=True) .value_in_unit(unit.angstrom)) try: data["velocities"] \ = state.getVelocities(asNumpy=True).value_in_unit( unit.angstrom / unit.picosecond ) except openmm.OpenMMException: # pragma: no cover pass try: data["forces"] \ = state.getForces(asNumpy=True).value_in_unit( unit.kilocalorie_per_mole / unit.angstrom ) except openmm.OpenMMException: # pragma: no cover pass # Create NetCDF file or object if it doesn't already exist if not isinstance(self, NetCDFFile): self = NetCDFFile(self, "w", restart=True) if not hasattr(self._nc, "Conventions"): self.write_header(data["coordinates"].shape[0], "cell_lengths" in data or "cell_angles" in data, "velocities" in data, "forces" in data) self._nc.set_always_mask(False) elif self._nc.Conventions != "AMBERRESTART": raise ValueError("The NetCDF file must be a restart file.") # Write data to NetCDF file for k, v in data.items(): self._nc.variables[k][:] = v self._nc.sync() return self
[docs] def write_model( self: Any, time: Union[float, np.ndarray[float]], coordinates: np.ndarray[float], velocities: np.ndarray[float] = None, forces: np.ndarray[float] = None, cell_lengths: np.ndarray[float] = None, cell_angles: np.ndarray[float] = None, *, restart: bool = False ) -> "NetCDFFile": """ Write simulation state(s) to a NetCDF file. .. note:: This function can be used as either a static or instance method. Parameters ---------- self : `str`, `netcdf4.Dataset`, or `mdcraft.openmm.file.NetCDFFile` If :meth:`write_model` is called as a static method, you must provide a filename or a NetCDF file object. Otherwise, the NetCDF file embedded in the current instance is used. time : `float` or `numpy.ndarray` Time(s). The dimensionality determines whether a single or multiple frames are written. **Reference unit**: :math:`\\mathrm{ps}`. coordinates : `numpy.ndarray` Coordinates of :math:`N` atoms over :math:`N_t` frames. The dimensionality depends on whether a single or multiple frames are to be written and must be compatible with that for `time`. **Shape**: :math:`(N,\\,3)` or :math:`(N_t,\\,N,\\,3)`. **Reference unit**: :math:`\\mathrm{Å}`. velocities : `numpy.ndarray`, optional Velocities of :math:`N` atoms over :math:`N_t` frames. The dimensionality depends on whether a single or multiple frames are to be written and must be compatible with that for `time`. **Shape**: :math:`(N,\\,3)` or :math:`(N_t,\\,N,\\,3)`. **Reference unit**: :math:`\\mathrm{Å/ps}`. forces : `numpy.ndarray`, optional Forces exerted on :math:`N` atoms over :math:`N_t` frames. The dimensionality depends on whether a single or multiple frames are to be written and must be compatible with that for `time`. **Shape**: :math:`(N,\\,3)` or :math:`(N_t,\\,N,\\,3)`. **Reference unit**: :math:`\\mathrm{Å/ps}`. cell_lengths : `numpy.ndarray`, optional Simulation box dimensions. **Shape**: :math:`(3,)`. **Reference unit**: :math:`\\mathrm{Å}`. cell_angles : `numpy.ndarray`, optional Angles that define the shape of the simulation box. **Shape**: :math:`(3,)`. **Reference unit**: :math:`^\\circ`. restart : `bool`, keyword-only, default: :code:`False` Prevents the frame index from being incremented if writing a NetCDF restart file. Returns ------- netcdf_file : `mdcraft.openmm.file.NetCDFFile` NetCDF file object. Only returned when this function is used as a static method. """ # Create NetCDF file or object if it doesn't already exist if not isinstance(self, NetCDFFile): self = NetCDFFile(self, "w", restart=restart) if not hasattr(self._nc, "Conventions"): self.write_header(coordinates.shape[0], cell_lengths is not None or cell_angles is not None, velocities is not None, forces is not None) self._nc.set_always_mask(False) # Write model to NetCDF file n_frames = len(time) if isinstance(time, (tuple, list, np.ndarray)) else 1 frames = slice(self._frame, self._frame + n_frames) self._nc.variables["time"][frames] = time self._nc.variables["coordinates"][frames] = coordinates if velocities is not None: self._nc.variables["velocities"][frames] = velocities if forces is not None: self._nc.variables["forces"][frames] = forces if cell_lengths is not None: self._nc.variables["cell_lengths"][frames] = cell_lengths if cell_angles is not None: self._nc.variables["cell_angles"][frames] = cell_angles self._nc.sync() if not restart: self._frame += n_frames return self