Source code for mdcraft.io.reader

from __future__ import annotations
import bz2
from collections import defaultdict
import concurrent.futures
from functools import cached_property, lru_cache
from pathlib import Path
from typing import Any, TextIO
import warnings

import numpy as np
import pandas as pd
from pint.errors import DefinitionSyntaxError, DimensionalityError
from scipy.io import netcdf_file

from . import INTERNAL_UNITS
from .base import BaseTopologyReader, BaseTrajectoryReader
from .. import FOUND, Q_, U_, ureg
from ..lib.cell import (
    convert_cell_representation,
    reduce_box_vectors,
    scale_coordinates,
)
from ..lib.unit import strip_unit

if FOUND["netCDF4"]:
    import netCDF4 as nc
if FOUND["openmm"]:
    from openmm import unit


[docs] class LAMMPSDataReader(BaseTopologyReader): # TODO """ """ _ATOM_ATTRIBUTES = { "atom-ID": "ids", "molecule-ID": "molecule_ids", "atom-type": "types", ("q", "charge"): "charges", "mass": "masses", ("x", "y", "z"): "positions", ("bodyflag", "ellipsoidflag", "lineflag", "triangleflag"): "flags", "diameter": "sizes", ("rho", "density"): "densities", ("mux", "muy", "muz"): "dipole_moments", } _ATOM_STYLES = { "amoeba": ["atom-ID", "molecule-ID", "atom-type", "q", "x", "y", "z"], "angle": ["atom-ID", "molecule-ID", "atom-type", "x", "y", "z"], "atomic": ["atom-ID", "atom-type", "x", "y", "z"], "body": ["atom-ID", "atom-type", "bodyflag", "mass", "x", "y", "z"], "bond": ["atom-ID", "molecule-ID", "atom-type", "x", "y", "z"], "bpm/sphere": [ "atom-ID", "molecule-ID", "atom-type", "diameter", "density", "x", "y", "z", ], "charge": ["atom-ID", "atom-type", "q", "x", "y", "z"], "dielectric": [ "atom-ID", "atom-type", "q", "x", "y", "z", "mux", "muy", "muz", "area", "ed", "em", "epsilon", "curvature", ], "dipole": [ "atom-ID", "atom-type", "q", "x", "y", "z", "mux", "muy", "muz", ], "dpd": ["atom-ID", "atom-type", "theta", "x", "y", "z"], "edpd": [ "atom-ID", "atom-type", "edpd_temp", "edpd_cv", "x", "y", "z", ], "electron": [ "atom-ID", "atom-type", "q", "espin", "eradius", "x", "y", "z", ], "ellipsoid": [ "atom-ID", "atom-type", "ellipsoidflag", "density", "x", "y", "z", ], "full": ["atom-ID", "molecule-ID", "atom-type", "q", "x", "y", "z"], "line": [ "atom-ID", "molecule-ID", "atom-type", "lineflag", "density", "x", "y", "z", ], "mdpd": ["atom-ID", "atom-type", "rho", "x", "y", "z"], "molecular": ["atom-ID", "molecule-ID", "atom-type", "x", "y", "z"], "oxdna": ["atom-ID", "atom-type", "x", "y", "z"], "peri": ["atom-ID", "atom-type", "volume", "density", "x", "y", "z"], "rheo": ["atom-ID", "atom-type", "status", "rho", "x", "y", "z"], "rheo/thermal": [ "atom-ID", "atom-type", "status", "rho", "energy", "x", "y", "z", ], "smd": [ "atom-ID", "atom-type", "molecule-ID", "volume", "mass", "kradius", "cradius", "x0", "y0", "z0", "x", "y", "z", ], "sph": ["atom-ID", "atom-type", "rho", "esph", "cv", "x", "y", "z"], "sphere": [ "atom-ID", "atom-type", "diameter", "density", "x", "y", "z", ], "spin": [ "atom-ID", "atom-type", "x", "y", "z", "spx", "spy", "spz", "sp", ], "tdpd": ["atom-ID", "atom-type", "x", "y", "z", "cc*"], "template": [ "atom-ID", "atom-type", "molecule-ID", "template-index", "template-atom", "x", "y", "z", ], "tri": [ "atom-ID", "molecule-ID", "atom-type", "triangleflag", "density", "x", "y", "z", ], "wavepacket": [ "atom-ID", "atom-type", "charge", "espin", "eradius", "etag", "cs_re", "cs_im", "x", "y", "z", ], } _EXTENSIONS = {".data"} _FORMAT = "LAMMPSDATA" _HEADER_KEYWORDS = { "atoms", "bonds", "angles", "dihedrals", "impropers", "atom types", "bond types", "angle types", "dihedral types", "improper types", "ellipsoids", "lines", "triangles", "bodies", "xlo xhi", "ylo yhi", "zlo zhi", "xy xz yz", "avec", "bvec", "cvec", "abc origin", } _PARALLELIZABLE = True _SECTION_KEYWORDS = { "Atoms": "atoms", "Velocities": "atoms", "Masses": "atom_types", "Ellipsoids": "ellipsoids", "Lines": "lines", "Triangles": "triangles", "Bodies": "bodies", "Bonds": "bonds", "Angles": "angles", "Dihedrals": "dihedrals", "Impropers": "impropers", "Atom Type Labels": "atom_types", "Bond Type Labels": "bond_types", "Angle Type Labels": "angle_types", "Dihedral Type Labels": "dihedral_types", "Improper Type Labels": "improper_types", "Pair Coeffs": "atom_types", "PairIJ Coeffs": None, "Bond Coeffs": "bond_types", "Angle Coeffs": "angle_types", "Dihedral Coeffs": "dihedral_types", "Improper Coeffs": "improper_types", "BondBond Coeffs": "angle_types", "BondAngle Coeffs": "angle_types", "MiddleBondTorsion Coeffs": "dihedral_types", "EndBondTorsion Coeffs": "dihedral_types", "AngleTorsion Coeffs": "dihedral_types", "AngleAngleTorsion Coeffs": "dihedral_types", "BondBond13 Coeffs": "dihedral_types", "AngleAngle Coeffs": "improper_types", } def __init__( self, filename: str | Path, /, atom_style: str | list[str] | None = None, *, reduced: bool = False, n_workers: int | None = 1, ) -> None: super().__init__(filename, n_workers=n_workers) # Create and store handle to file self.open() # Skip first line self._file.readline() # Read header and get topology summary and simulation box size information # NOTE: https://docs.lammps.org/read_data.html self._n_ = defaultdict(int) self._dimensions = {} while line := self._file.readline(): if line := line.rstrip(): if line in self._SECTION_KEYWORDS: break *value, header = line.split( maxsplit=1 + ("lo" in line) + 2 * any(s in line for s in ("xy", "vec", "abc")) ) if header not in self._HEADER_KEYWORDS: raise RuntimeError( f"Invalid LAMMPS data file '{self._filename.name}'. " f"Unknown header keyword '{header}'." ) if len(value) == 1: self._n_[header.replace(" ", "_")] = int(value[0]) else: self._dimensions[header] = np.array(value, np.float64) if self._n_["atoms"] == 0: raise RuntimeError( f"Invalid LAMMPS data file '{self._filename.name}'. No atoms found." ) if "xlo xhi" in self._dimensions: xlo, xhi = self._dimensions["xlo xhi"] try: ylo, yhi = self._dimensions["ylo yhi"] zlo, zhi = self._dimensions["zlo zhi"] except KeyError: raise RuntimeError( f"Invalid LAMMPS data file '{self._filename.name}'. " "Incomplete simulation box size information." ) box_vectors = np.diag((xhi - xlo, yhi - ylo, zhi - zlo)) if "xy xz yz" in self._dimensions: *box_vectors[1:, 0], box_vectors[2, 1] = self._dimensions[ "xy xz yz" ] elif "avec" in self._dimensions: box_vectors = np.stack( ( self._dimensions["avec"], self._dimensions["bvec"], self._dimensions["cvec"], ) ) self._dimensions = convert_cell_representation( box_vectors, "parameters" ) # Find all sections file_size = self._filename.stat().st_size start = self._file.tell() - len(line) - 1 if self._n_workers > 1: chunk_size = np.ceil((file_size - start) / self._n_workers).astype( int ) self._offsets = {} with concurrent.futures.ProcessPoolExecutor( max_workers=self._n_workers ) as executor: for future in concurrent.futures.as_completed( executor.submit( self._find_sections, self._filename, i * chunk_size, min((i + 1) * chunk_size, file_size), ) for i in range(self._n_workers) ): offsets = future.result() if offsets is not None: self._offsets |= offsets else: self._offsets = self._find_sections(self._file, start, file_size) # Figure out and store atom style self._file.seek(self._offsets["Atoms"]) self._file.readline() self._file.readline() n_attributes = len(self._file.readline().split()) if atom_style is None: if n_attributes == 5: self._atom_style = "atomic" elif n_attributes == 7: self._atom_style = "full" else: raise RuntimeError( "Could not determine atom style for " f"'{self._filename.name}'. Use the `atom_style` " "parameter to specify it." ) warnings.warn( f"Atom style for '{self._filename.name}' assumed to " f"be '{self._atom_style}'." ) self._atom_style = self._ATOM_STYLES[self._atom_style] else: atom_attributes = set( attribute for attributes in self._ATOM_STYLES.values() for attribute in attributes ) self._atom_style = [] if isinstance(atom_style, str): atom_style = atom_style.split() for style in atom_style: if style in self._ATOM_STYLES: for attribute in self._ATOM_STYLES[style]: if attribute not in self._atom_style: self._atom_style.append(attribute) elif ( style in atom_attributes or style.startswith("cc") ) and style not in self._atom_style: self._atom_style.append(style) if len(self._atom_style) != n_attributes: raise RuntimeError( f"Invalid atom style for '{self._filename.name}'. " f"Expected {n_attributes} attributes, but got " f"{len(self._atom_style)}: {self._atom_style}." ) # Store settings self._reduced = reduced def __repr__(self) -> str: return ( f"{self.__class__.__name__}" f"('{self._filename.name}', reduced={self._reduced}, " f"n_workers={self._n_workers})" ) def _find_sections( self, file: str | Path | TextIO, start: int, end: int ) -> dict[str, int]: """ Finds all sections in the LAMMPS data file. Parameters ---------- file : `str`, `pathlib.Path`, or `io.TextIO` Filename, path, or handle to the data file. start : `int` Byte offset to start reading from. end : `int` Byte offset to stop reading at. Returns ------- sections : `dict` Sections found in the data file and their starting byte offsets. """ manual = isinstance(file, (str, Path)) if manual: file = open(file, "r") byte_counter = file.seek(start) sections = {} while byte_counter < end: line = file.readline() byte_counter += len(line) if (l := line.rstrip()) in self._SECTION_KEYWORDS: sections[l] = byte_counter - len(line) return sections def _parse_section( self, file: TextIO, section: str ) -> dict[str, np.ndarray[np.uint32 | np.float64]]: file.seek(self._offsets[section]) n_lines = ( (self.n_atom_types + 1) * self.n_atom_types // 2 if section == "PairIJ Coeffs" else self._n_[self._SECTION_KEYWORDS[section]] ) file.readline() # <header keyword> file.readline() # blank line section_data = {} if "Labels" in section: section_data["types"] = np.empty(n_lines, np.uint32) section_data["labels"] = np.empty(n_lines, object) for i in range(n_lines): section_data["types"][i], section_data["labels"][i] = ( file.readline().split() ) else: # data = np.loadtxt(file, max_rows=n_lines) data = pd.read_csv( file, sep="\\s+", header=None, nrows=n_lines ).to_numpy() if section == "Atoms": for i, attribute in enumerate(self._atom_style): if attribute.endswith("*"): section_data[attribute[:-1]] = data[ :, i : ( -n_columns_left if ( n_columns_left := len(self._atom_style) - i - 1 ) else None ), ] else: section_data[attribute] = data[:, i] elif section == "Bodies": debug = True elif section == "Ellipsoids": section_data["ID"] = data[:, 0].astype(int) section_data["shape"] = data[:, 1:4] section_data["quaternion"] = data[:, 4:] elif section == "Lines": section_data["ID"] = data[:, 0].astype(int) section_data["point1"] = data[:, 1:3] section_data["point2"] = data[:, 3:] elif section == "Masses": section_data["atom-type"] = data[:, 0].astype(int) section_data["mass"] = data[:, 1] elif section == "Triangles": section_data["ID"] = data[:, 0].astype(int) section_data["corner1"] = data[:, 1:4] section_data["corner2"] = data[:, 4:7] section_data["corner3"] = data[:, 7:] elif section == "Velocities": debug = True elif "Coeffs" in section: if "IJ" in section: section_data["atom-type1"] = data[:, 0].astype(int) section_data["atom-type2"] = data[:, 1].astype(int) section_data["coeffs"] = data[:, 2:] else: section_data["atom-type"] = data[:, 0].astype(int) section_data["coeffs"] = data[:, 1:] else: # Bonds, Angles, Dihedrals, Impropers section_data["ID"] = data[:, 0].astype(int) section_data["type"] = data[:, 1] section_data["atom-types"] = data[:, 2:].astype(int) return section_data def _parse_topology( self, file: TextIO, _convert_units: bool = True, ) -> dict[str, dict[str, np.ndarray[np.uint32 | np.float64]]]: sections = {} for section in self._offsets: sections[section] = self._parse_section(file, section) topology = {} return topology @property def dimensions(self) -> np.ndarray[np.float64] | None: """ Simulation box dimensions (or lattice parameters). If `None`, the system size could not be determined from the topology. **Reference units**: :math:`\\mathrm{nm}` for lengths and degrees (:math:`^\\circ`) for angles. """ return self._dimensions @property def n_atoms(self) -> int: """ Number of atoms. """ return self._n_["atoms"] @property def n_bonds(self) -> int: """ Number of bonds. """ return self._n_["bonds"] @property def n_angles(self) -> int: """ Number of angles. """ return self._n_["angles"] @property def n_dihedrals(self) -> int: """ Number of proper dihedrals. """ return self._n_["dihedrals"] @property def n_improper_dihedrals(self) -> int: """ Number of improper dihedrals. """ return self._n_["impropers"] @property def n_atom_types(self) -> int: """ Number of atom types. """ return self._n_["atom_types"] @property def n_bond_types(self) -> int: """ Number of bond types. """ return self._n_["bond_types"] @property def n_angle_types(self) -> int: """ Number of angle types. """ return self._n_["angle_types"] @property def n_dihedral_types(self) -> int: """ Number of proper dihedral types. """ return self._n_["dihedral_types"] @property def n_improper_dihedral_types(self) -> int: """ Number of improper dihedral types. """ return self._n_["improper_types"] @property def n_ellipsoids(self) -> int: """ Number of ellipsoids. """ return self._n_["ellipsoids"] @property def n_lines(self) -> int: """ Number of lines. """ return self._n_["lines"] @property def n_triangles(self) -> int: """ Number of triangles. """ return self._n_["triangles"] @property def n_bodies(self) -> int: """ Number of bodies. """ return self._n_["bodies"] @property def n_residues(self) -> int: """ Number of residues. """ raise NotImplementedError @property def n_chains(self) -> int: """ Number of chains. """ raise NotImplementedError
[docs] def open(self) -> None: """ Opens the LAMMPS data file and stores a handle to it. """ self._file = open(self._filename, "r")
[docs] def close(self) -> None: """ Closes the LAMMPS data file and deletes the handle. """ if hasattr(self, "_file"): self._file.close() del self._file
[docs] class CompositeReader(BaseTrajectoryReader): # TODO _EXTENSIONS = {}
[docs] class LAMMPSDumpReader(BaseTrajectoryReader): """ LAMMPS dump file reader. This class is not a full-featured LAMMPS dump file reader, but can process most, if not all, dump files written using a single :code:`dump` command with :code:`atom`, :code:`custom`, :code:`grid`, or :code:`local` styles. Notably, it supports * both compressed (:code:`.bz2`) and text dump files, * frame headers with units and/or time information (using :code:`dump_modify units yes` and/or :code:`dump_modify time yes`, respectively), * frames with different numbers of atoms/entities, * orthogonal, restricted triclinic, and general triclinic simulation boxes, * any combination of (un)scaled and (un)wrapped coordinates, and * all standard and custom attributes. Furthermore, it can read in parallel, which can be much faster for parsing large dump files and performing analyses on multiple frames in those dump files. .. important:: If the :code:`dump_modify` command is used to alter the output in the dump file, the :code:`colname` and :code:`header` keywords must *not* be used. If the dump file is written in the :code:`local` style, the :code:`label ATOMS` option must also *not* be used. .. note:: Frames in a dump file are ordered by their timestep numbers and are expected to proceed forward in time. If the :code:`reset_timestep` command is used in the LAMMPS input script used to generate the dump file, this trajectory reader may behave unexpectedly. .. seealso:: For more information on the LAMMPS dump file format, see the `dump command <https://docs.lammps.org/dump.html>`_ page of the LAMMPS documentation. Parameters ---------- filename : `str` or `pathlib.Path`, positional-only Filename or path to the dump file. coordinate_formats : `str` or `list` of `str`, optional Coordinate formats. If a `str` is provided, it is used for all axes. If the trajectory does not contain particle positions for a specific axis, use `None` for that axis. .. container:: **Valid values**: * :code:`""` for unscaled and wrapped coordinates, * :code:`"s"` for scaled and wrapped coordinates, * :code:`"u"` for unscaled and unwrapped coordinates, or * :code:`"su"` for scaled and unwrapped coordinates. dt : `float`, optional Simulation time step size. Only used when the dump file was not written with :code:`dump_modify time yes`. If not specified and the dump file does not contain time information, the step size is assumed to be the LAMMPS default for the style of units used. extras : `bool`, `str` or `list` of `str`, keyword-only, optional Extra per-atom information to read if the dump file was written in the :code:`atom` or :code:`custom` styles. If `True`, all extra attributes found in the trajectory are read. **Valid values:** +-------------------------------------+-----------------------------------------------+------------------------+ | Attribute | LAMMPS dump attribute(s) | Per-atom data type | +=====================================+===============================================+========================+ | :code:`"dipole_moments"` | (:code:`mux`, :code:`muy`, :code:`muz`) | `numpy.ndarray[float]` | +-------------------------------------+-----------------------------------------------+------------------------+ | :code:`"dipole_moments_magnitudes"` | :code:`mu` | `float` | +-------------------------------------+-----------------------------------------------+------------------------+ | :code:`"angular_velocities"` | (:code:`omegax`, :code:`omegay`, | `numpy.ndarray[float]` | | | :code:`omegaz`) | | +-------------------------------------+-----------------------------------------------+------------------------+ | :code:`"angular_momenta"` | (:code:`angmomx`, :code:`angmomy`, | `numpy.ndarray[float]` | | | :code:`angmomz`) | | +-------------------------------------+-----------------------------------------------+------------------------+ | :code:`"torques"` | (:code:`tqx`, :code:`tqy`, :code:`tqz`) | `numpy.ndarray[float]` | +-------------------------------------+-----------------------------------------------+------------------------+ | :code:`"c_{compute_id}"` | (:code:`c_{compute_id}[i]`, ...) | `numpy.ndarray[float]` | +-------------------------------------+-----------------------------------------------+------------------------+ | :code:`"d_{name}"` | (:code:`d_{name}[i]`, ...) | `numpy.ndarray[float]` | +-------------------------------------+-----------------------------------------------+------------------------+ | :code:`"d2_{name}[i]"` | (:code:`d2_{name}[i][j]`, ...) | `numpy.ndarray[float]` | +-------------------------------------+-----------------------------------------------+------------------------+ | :code:`"f_{fix_id}"` | (:code:`f_{fix_id}[i]`, ...) | `numpy.ndarray[float]` | +-------------------------------------+-----------------------------------------------+------------------------+ | :code:`"i_{name}"` | (:code:`i_{name}[i]`, ...) | `numpy.ndarray[int]` | +-------------------------------------+-----------------------------------------------+------------------------+ | :code:`"i2_{name}[i]"` | (:code:`i2_{name}[i][j]`, ...) | `numpy.ndarray[int]` | +-------------------------------------+-----------------------------------------------+------------------------+ | :code:`"v_{name}"` | (:code:`v_{name}[i]`, ...) | `numpy.ndarray[float]` | +-------------------------------------+-----------------------------------------------+------------------------+ units_style : `str`, optional Style of units used in the dump file. If not specified, the style of units is determined from the file (when :code:`dump_modify units yes` was used) or assumed to be :code:`units lj` (the LAMMPS default) otherwise. **Valid values:** :code:`"lj"`, :code:`"real"`, :code:`"metal"`, :code:`"si"`, :code:`"cgs"`, :code:`"electron"`, :code:`"micro"`, and :code:`"nano"`. n_workers : `int`, keyword-only, default: :code:`1` Number of threads to use when reading the file. If :code:`None`, the number of available logical threads is used. .. important:: `n_workers` should be set to a value that is less than half the number of frames in the dump file if `dt` is not specified and is to be determined automatically. Examples -------- The simplest way to load a LAMMPS dump file :code:`dump.lammpstrj` containing atom trajectories is: >>> reader = LAMMPSDumpReader("dump.lammpstrj") Trajectory properties, like the coordinate formats, time step size, and style of units, are automatically determined from the file (if possible). Alternatively, the :class:`LAMMPSDumpReader` constructor accepts keyword arguments—`coordinate_formats`, `dt`, and `units_style`, respectively—to directly specify these properties. Additionally, the constructor also accepts the `extras` keyword argument to specify extra attributes to read from the dump file, and the `n_workers` keyword arguments to control parallel parsing of the dump file. For example, to parse the same dump file in parallel with 4 CPU threads and specify that it is in reduced Lennard-Jones units, the coordinates are scaled and unwrapped in the :math:`x`- and :math:`y`-directions but unscaled and wrapped in the :math:`z`-direction, the time step size is :math:`\\Delta t^*=0.005`, and all extra attributes should be read: >>> reader = LAMMPSDumpReader( ... "dump.lammpstrj", ... ["su", "su", ""], ... dt=0.005, ... extras=True, ... units_style="lj", ... n_workers=4 ... ) Then, the `reader` instance can be used to get basic trajectory properties: >>> reader.dt 0.005 >>> reader.timestep 5 >>> reader.times array([ 0., 5., 10., 15., 20.]) >>> reader.n_frames 5 >>> reader.n_atoms 1000 or get raw data, like dimensions and positions, from frames: >>> reader.read_frames([0, 1]) [{'time': 0, 'timestep': 0, 'dimensions': array([10., 10., 10., 90., 90., 90.]), ...}, {'time': 5, 'timestep': 1, 'dimensions': array([10., 10., 10., 90., 90., 90.]), ...}] """ _ATTRIBUTES = { "id", "mol", "proc", "procp1", "type", "typelabel", "element", "mass", "x", "y", "z", "xs", "ys", "zs", "xu", "yu", "zu", "xsu", "ysu", "xsu", "ix", "iy", "iz", "vx", "vy", "vz", "fx", "fy", "fz", "q", "index", } _COORDINATE_FORMATS = ("u", "su", "", "s") _CUSTOM_ATTRIBUTE_PREFIXES = ("c_", "d_", "d2_", "f_", "i_", "i2_", "v_") _DEFAULT_TIME_STEP_SIZES = { "lj": 0.005, "real": 1.0, "metal": 0.001, "si": 1e-8, "cgs": 1e-8, "electron": 0.001, "micro": 2.0, "nano": 0.00045, } _EXTENSIONS = {".bz2", ".dump", ".lammpsdump", ".lammpstrj"} _EXTRA_ATTRIBUTES = { "dipole_moments": ("mux", "muy", "muz"), "dipole_moment_magnitudes": ("mu",), "angular_velocities": ("omegax", "omegay", "omegaz"), "angular_momenta": ("angmomx", "angmomy", "angmomz"), "torques": ("tqx", "tqy", "tqz"), } _FORMAT = "LAMMPSDUMP" _PARALLELIZABLE = True _UNIT_STYLES = { "lj": { "charge": ureg.dimensionless, "energy": ureg.dimensionless, "length": ureg.dimensionless, "mass": ureg.dimensionless, "temperature": ureg.dimensionless, "time": ureg.dimensionless, }, "real": { "charge": ureg.elementary_charge, "energy": ureg.kilocalorie / ureg.mole, "length": ureg.angstrom, "mass": ureg.gram / ureg.mole, "temperature": ureg.kelvin, "time": ureg.femtosecond, }, "metal": { "charge": ureg.elementary_charge, "energy": ureg.electron_volt, "length": ureg.angstrom, "mass": ureg.gram / ureg.mole, "temperature": ureg.kelvin, "time": ureg.picosecond, }, "si": { "charge": ureg.coulomb, "energy": ureg.joule, "length": ureg.meter, "mass": ureg.kilogram, "temperature": ureg.kelvin, "time": ureg.second, }, "cgs": { "charge": ureg.franklin, "energy": ureg.erg, "length": ureg.centimeter, "mass": ureg.gram, "temperature": ureg.kelvin, "time": ureg.second, }, "electron": { "charge": ureg.elementary_charge, "energy": ureg.hartree, "length": ureg.bohr, "mass": ureg.unified_atomic_mass_unit, "temperature": ureg.kelvin, "time": ureg.femtosecond, }, "micro": { "charge": ureg.picocoulomb, "energy": ureg.picogram * ureg.micrometer**2 / ureg.microsecond**2, "length": ureg.micrometer, "mass": ureg.picogram, "temperature": ureg.kelvin, "time": ureg.microsecond, }, "nano": { "charge": ureg.elementary_charge, "energy": ureg.attogram * ureg.nanometer**2 / ureg.nanosecond**2, "length": ureg.nanometer, "mass": ureg.attogram, "temperature": ureg.kelvin, "time": ureg.nanosecond, }, } _VECTOR_ATTRIBUTES = { "positions", "velocities", "forces", "dipole_moments", "angular_velocities", "angular_momenta", "torques", } def __init__( self, filename: str | Path, /, coordinate_formats: str | list[str] | None = None, *, dt: float | unit.Quantity | Q_ | None = None, extras: bool | str | list[str] | None = None, units_style: str | None = None, n_workers: int | None = 1, **kwargs, ) -> None: super().__init__(filename, n_workers=n_workers) # Create and store handle to file self.open() # Loosely check if file is a LAMMPS dump file line = self._file.readline().rstrip() if not line.startswith("ITEM:"): raise ValueError( f"'{self._filename.name}' is not a valid LAMMPS dump file." ) # Figure out per-frame format and style of units self._has_units_header = line == "ITEM: UNITS" if self._has_units_header: _units_style = self._file.readline().rstrip() if units_style is not None and units_style != _units_style: warnings.warn( f"LAMMPS dump file '{self._filename.name}' uses " f"`units {_units_style}`, but `{units_style=}` " "was specified." ) self._units_style = _units_style line = self._file.readline().rstrip() elif units_style is None: self._units_style = "lj" else: if units_style not in self._UNIT_STYLES: raise ValueError( f"Invalid units style '{units_style}'. Valid values: " "'" + "', '".join(self._UNIT_STYLES) + "'." ) self._units_style = units_style self._units = self._UNIT_STYLES[self._units_style] self._has_time_header = line == "ITEM: TIME" if self._has_time_header: self._file.readline() self._file.readline() self._file.readline() # <timestep number> # Figure out dump style and number of header lines self._dump_style = None line = self._file.readline().rstrip() if line.startswith("ITEM: BOX BOUNDS"): self._dump_style = "grid" attributes_header_prefix = "ITEM: GRID CELLS " extras = True n_skip_lines = 6 else: if line == "ITEM: NUMBER OF ATOMS": self._dump_style = "custom" self._entity_name = "atoms" else: self._dump_style = "local" self._entity_name = line.split()[-1].lower() extras = True attributes_header_prefix = f"ITEM: {self._entity_name.upper()} " n_skip_lines = 5 for _ in range(n_skip_lines): self._file.readline() if self._dump_style == "grid": self._n_entities = np.prod( np.array(self._file.readline().split(), int) ) # Get and store attributes available in file self._attributes = { attr: col for col, attr in enumerate( self._file.readline() .removeprefix(attributes_header_prefix) .split() ) } # Find and note columns for standard attributes self._attribute_columns = defaultdict(list) if ( col := self._attributes.get("id", self._attributes.get("index")) ) is not None: self._attribute_columns["ids"] = col if (col := self._attributes.get("mol")) is not None: self._attribute_columns["molecule_ids"] = col if (col := self._attributes.get("type")) is not None: self._attribute_columns["types"] = col if (col := self._attributes.get("typelabel")) is not None: self._attribute_columns["labels"] = col if (col := self._attributes.get("element")) is not None: self._attribute_columns["elements"] = col if (col := self._attributes.get("mass")) is not None: self._attribute_columns["masses"] = col if (col := self._attributes.get("q")) is not None: self._attribute_columns["charges"] = col # Figure out available information in atom trajectories if self._dump_style == "custom": # Validate and store coordinate columns and formats if coordinate_formats is None: self._coordinate_formats = [] for axis in "xyz": for fmt in self._COORDINATE_FORMATS: if col := self._attributes.get(f"{axis}{fmt}"): self._attribute_columns["positions"].append(col) self._coordinate_formats.append(fmt) break else: self._attribute_columns["positions"].append(None) self._coordinate_formats.append(None) elif isinstance(coordinate_formats, str): if coordinate_formats not in self._COORDINATE_FORMATS: raise ValueError( f"Invalid format '{coordinate_formats}' in " "`coordinate_formats`. Valid values: '" "', '".join(self._COORDINATE_FORMATS) + "'." ) self._coordinate_formats = [] for axis in "xyz": if col := self._attributes.get( f"{axis}{coordinate_formats}" ): self._attribute_columns["positions"].append(col) self._coordinate_formats.append(coordinate_formats) else: warnings.warn( f"Coordinate format '{coordinate_formats}' " f"was specified, but '{self._filename.name}' " "does not contain the " f"`{axis}{coordinate_formats}` attribute." ) self._attribute_columns["positions"].append(None) self._coordinate_formats.append(None) if not any(self._coordinate_formats): raise ValueError( f"Coordinate format '{coordinate_formats}' was " f"specified, but '{self._filename.name}' does not " f"contain any of the `x{coordinate_formats}`, " f"`y{coordinate_formats}`, or " f"`z{coordinate_formats}` attributes." ) else: if len(coordinate_formats) == 3: for axis, fmt in zip("xyz", coordinate_formats): if fmt is None: self._attribute_columns["positions"].append(None) continue if fmt not in self._COORDINATE_FORMATS: raise ValueError( f"Invalid format '{fmt}' in " "`coordinate_formats`. Valid values: '" "', '".join(self._COORDINATE_FORMATS) + "'." ) col = self._attributes.get(f"{axis}{fmt}") if col is None: raise ValueError( f"{axis}-coordinate format '{fmt}' was specified, " f"but '{self._filename.name}' does not contain " f"the `{axis}{fmt}` attribute." ) self._attribute_columns["positions"].append(col) self._coordinate_formats = coordinate_formats else: raise ValueError( "`coordinate_formats` must have length 3 when " f"it is an array, not {len(coordinate_formats)}." ) # Check if image flags and velocity and force information are # available image_flag_columns = [] force_columns = [] velocity_columns = [] for axis in "xyz": image_flag_columns.append(self._attributes.get(f"i{axis}")) force_columns.append(self._attributes.get(f"f{axis}")) velocity_columns.append(self._attributes.get(f"v{axis}")) if any(image_flag_columns): self._attribute_columns["image_flags"] = image_flag_columns if any(force_columns): self._attribute_columns["forces"] = force_columns if any(velocity_columns): self._attribute_columns["velocities"] = velocity_columns # Validate and note extra attributes to read self._extra_attribute_indices = defaultdict(list) if extras is None: extra_attribute_columns = {} elif isinstance(extras, bool): if extras: extra_attribute_columns = defaultdict(dict) for attr, col in self._attributes.items(): if attr not in self._ATTRIBUTES: if attr.startswith(self._CUSTOM_ATTRIBUTE_PREFIXES): if (split_index := attr.rfind("[")) != -1: name = attr[:split_index] index = int(attr[split_index + 1 : -1]) self._extra_attribute_indices[name].append( index ) extra_attribute_columns[name][index] = col else: self._extra_attribute_indices[attr] = None extra_attribute_columns[attr] = col else: for name, attrs in self._EXTRA_ATTRIBUTES.items(): if attr in attrs: extra_attribute_columns[name][ attrs.index(attr) ] = col for name, cols in extra_attribute_columns.items(): if name in self._EXTRA_ATTRIBUTES: self._extra_attribute_indices[ name ] = extra_attribute_columns[name] = [ i for _, i in sorted(cols.items()) ] elif self._extra_attribute_indices[name] is not None: self._extra_attribute_indices[name].sort() extra_attribute_columns[name] = [ cols[i] for i in self._extra_attribute_indices[name] ] else: extra_attribute_columns = {} else: if isinstance(extras, str): extras = [extras] extra_attribute_columns = defaultdict(dict) for attr in extras: if attr in self._EXTRA_ATTRIBUTES: columns = tuple( self._attributes.get(attr) for attr in self._EXTRA_ATTRIBUTES[attr] ) if not any(columns): raise ValueError( f"Extra attribute '{attr}' was requested, but " f"'{self._filename.name}' does not contain any of the " f"attributes in {self._EXTRA_ATTRIBUTES[attr]}." ) extra_attribute_columns[attr] = columns elif attr.startswith(self._CUSTOM_ATTRIBUTE_PREFIXES): mapping = { int(name[len(attr) + 1 : -1]): col for name, col in self._attributes.items() if name.startswith(attr) } self._extra_attribute_indices[attr] = sorted(mapping.keys()) extra_attribute_columns[attr] = [ mapping[i] for i in self._extra_attribute_indices[attr] ] else: raise ValueError(f"Invalid attribute '{attr}' in `extras`.") self._attribute_columns |= extra_attribute_columns # Get time step, number of atoms, and byte offsets for frames in file # NOTE: Possible values for `*dt`, `*time_step`, and # `*n_entities` are # - `True` if the value has not been set yet, # - `False` if the value has not yet been found in the # trajectory (subset), # - an `int` if the value is constant, or # - `None` if the value is not constant. file_size = self._filename.stat().st_size if self._n_workers > 1: chunk_size = np.ceil(file_size / self._n_workers).astype(int) self._dt = self._time_step = True self._offsets = [] self._times = [] self._timesteps = [] if self._dump_style != "grid": self._n_entities = True with concurrent.futures.ProcessPoolExecutor( max_workers=self._n_workers ) as executor: for future in concurrent.futures.as_completed( executor.submit( self._find_frames, self._filename, i * chunk_size, min((i + 1) * chunk_size, file_size), ) for i in range(self._n_workers) ): _dt, time_step, n_entities, offsets, times, timesteps = ( future.result() ) if _dt is not False and self._dt != _dt: if self._dt is True: self._dt = _dt elif _dt is not None: self._dt = None if time_step is not False and self._time_step != time_step: if self._time_step is True: self._time_step = time_step elif time_step is not None: self._time_step = None if self._dump_style != "grid": if ( n_entities is not False and self._n_entities != n_entities ): if self._n_entities is True: self._n_entities = n_entities elif n_entities is not None: self._n_entities = None self._offsets.extend(offsets) if times is not None: self._times.extend(times) self._timesteps.extend(timesteps) order = np.argsort(self._offsets) self._offsets = np.array(self._offsets)[order] if self._times: self._times = np.array(self._times)[order] self._timesteps = np.array(self._timesteps)[order] else: ( self._dt, self._time_step, n_entities, self._offsets, self._times, self._timesteps, ) = self._find_frames(self._file, 0, file_size) self._offsets = np.array(self._offsets) if self._times is not None: self._times = np.array(self._times) self._timesteps = np.array(self._timesteps) if self._dump_style != "grid": self._n_entities = n_entities # Give properties default values if not set if dt: if self._dt != dt and not isinstance(self._dt, bool): warnings.warn( f"`{dt=}` was specified, but the time step size " "was determined to be " f"{'variable' if self._dt is None else self._dt}." ) self._dt = strip_unit(dt, self._units["time"])[0] elif isinstance(self._dt, bool): self._dt = self._DEFAULT_TIME_STEP_SIZES[self._units_style] if self._times is None or isinstance(self._times, bool): self._times = self._dt * self._timesteps if isinstance(self._time_step, bool): self._time_step = ( None if len(self._times) == 1 or len(time_steps := set(np.round(np.diff(self._times), 15))) > 1 else time_steps.pop() ) if isinstance(self._n_entities, bool): self._n_entities = None # Store other settings self._reduced = self._units_style == "lj" def __repr__(self) -> str: return ( f"{self.__class__.__name__}('{self._filename.name}', " f"coordinate_formats={getattr(self, '_coordinate_formats', None)}, " f"dt={self.dt}, extras={list(self._extra_attribute_indices.keys())}, " f"units_style='{self._units_style}', n_workers={self._n_workers})" ) def _find_frames( self, file: str | Path | TextIO, start: int, end: int ) -> tuple[ bool | float | None, bool | float | None, bool | int | None, list[int], list[float] | None, list[int], ]: """ Finds the time step size, time step between frames, number of entities, byte offsets, times, and timesteps for frames in (a subset of) the LAMMPS dump file. Parameters ---------- file : `str`, `pathlib.Path`, or `io.TextIO` Filename, path, or handle to the dump file. start : `int` Byte offset to start reading from. end : `int` Byte offset to stop reading at. Returns ------- dt : `bool` or `float` Time step size between timesteps. Is `False` if :code:`dump_modify time yes` was not used or `None` if the step size is not constant. **Reference unit**: :math:`\\mathrm{ps}`. time_step : `bool` or `float` Time step between frames. Is `False` if :code:`dump_modify time yes` was not used or `None` if the time step is not constant. **Reference unit**: :math:`\\mathrm{ps}`. n_entities : `bool` or `int` Number of entities in each frame. Is `False` if the number of entities is not found in the trajectory (subset) or `None` if the number of entities is not constant. offsets : `list` Byte offsets for frames in the file. times : `list` Simulation times of the frames in the file. Is `None` if :code:`dump_modify time yes` was not used. **Reference unit**: :math:`\\mathrm{ps}`. timesteps : `list` Simulation timesteps for frames in the file. """ manual = isinstance(file, (str, Path)) if manual: filename = file file = open(file, "r") else: filename = file.name byte_counter = file.seek(start) is_style_grid = self._dump_style == "grid" frame_marker = ( "ITEM: TIME" if self._has_time_header else "ITEM: TIMESTEP" ) n_preheader_lines = 5 + 4 * is_style_grid dt = time_step = n_entities = False offsets = [] times = [] timesteps = [] # Find first frame while byte_counter < end: line = file.readline() byte_counter += len(line) if line.rstrip() == frame_marker: offsets.append(byte_counter - len(line)) if self._has_time_header: _time = file.readline() byte_counter += len(_time) # <time> _time = float(_time) times.append(_time) byte_counter += len(file.readline()) # ITEM: TIMESTEP _timestep = file.readline() byte_counter += len(_timestep) # <timestep> _timestep = int(_timestep) timesteps.append(_timestep) if is_style_grid: n_entities = _n_entities = self._n_entities else: byte_counter += len( file.readline() ) # ITEM: NUMBER OF [...] _n_entities = file.readline() byte_counter += len(_n_entities) # <n_entities> n_entities = _n_entities = int(_n_entities) n_lines_per_frame = n_preheader_lines + _n_entities break # Find remaining frames while byte_counter < end: for _ in range(n_lines_per_frame): byte_counter += len(file.readline()) if byte_counter >= end: break else: # End of file not reached offsets.append(byte_counter) byte_counter += len(file.readline()) if self._has_time_header: time = file.readline() byte_counter += len(time) # <time> time = float(time) times.append(time) byte_counter += len(file.readline()) # ITEM: TIMESTEP timestep = file.readline() byte_counter += len(timestep) # <timestep> timestep = int(timestep) timesteps.append(timestep) if self._has_time_header: _time_step = time - _time if not np.isclose(time_step, _time_step): if time_step is False: time_step = _time_step elif time_step is not None: time_step = None if not np.isclose( dt, _dt := _time_step / (timestep - _timestep) ): if dt is False: dt = _dt elif dt is not None: dt = None _time, _timestep = time, timestep if not is_style_grid: byte_counter += len( file.readline() ) # ITEM: NUMBER OF [...] _n_entities = file.readline() byte_counter += len(_n_entities) # <n_entities> _n_entities = int(_n_entities) if n_entities != _n_entities and n_entities is not None: n_entities = None n_lines_per_frame = n_preheader_lines + _n_entities if manual: file.close() if len(offsets) != len(timesteps): raise RuntimeError( "Number of frames and timesteps found in the LAMMPS " f"dump file '{filename}' do not match." ) if self._has_time_header and len(offsets) != len(times): raise RuntimeError( "Number of frames and times found in the LAMMPS " f"dump file '{filename}' do not match." ) return ( dt, time_step, n_entities, offsets, times if self._has_time_header else None, timesteps, ) def _parse_frame( self, file: TextIO, frame_index: int, convert_units: bool ) -> dict[str, Any]: """ Reads data from a single frame in the specified LAMMPS dump file. Parameters ---------- file : `io.TextIO` Handle to the dump file. frame_index : `int` Index of frame to read. convert_units : `bool` Specifies whether to convert the data from LAMMPS units to consistent MDCraft units. Returns ------- frame_data : `dict` Data from the frame. Possible keys include: * :code:`time` (if available) * :code:`timestep` * :code:`ids` (if available) * :code:`molecule_ids` (if available) * :code:`types` (if available) * :code:`labels` (if available) * :code:`elements` (if available) * :code:`masses` (if available) * :code:`charges` (if available) * :code:`n_atoms`, :code:`n_grids`, or :code:`n_entities` * :code:`dimensions` * :code:`positions` (if available) * :code:`image_flags` (if available) * :code:`velocities` (if available) * :code:`forces` (if available) * extra attributes, like :code:`dipole_moments`, :code:`dipole_moment_magnitudes`, :code:`angular_velocities`, :code:`angular_momenta`, and :code:`torques` (if available) * custom variables prefixed with :code:`c_`, :code:`d_`, :code:`d2_`, :code:`f_`, :code:`i_`, :code:`i2_`, or :code:`v_` (if available) """ # Seek to frame file.seek(self._offsets[frame_index]) # Initialize data dictionary frame_data = {} # Read time, if available if self._has_time_header: file.readline() # ITEM: TIME frame_data["time"] = float(file.readline()) # Read timestep file.readline() # ITEM: TIMESTEP frame_data["timestep"] = int(file.readline()) if not self._has_time_header and self._dt is not None: frame_data["time"] = frame_data["timestep"] * self._dt # Read number of atoms or entities if self._dump_style == "grid": n_entities = self.n_grids else: file.readline() # ITEM: NUMBER OF [...] frame_data[f"n_{self._entity_name}"] = n_entities = int( file.readline() ) # Read system dimensions if is_general_triclinic := "abc origin" in ( box_header := file.readline() ): # ITEM: BOX BOUNDS box_vectors = np.vstack( [ [float(val) for val in file.readline().split()[:3]] for _ in range(3) ] ) frame_data["dimensions"] = convert_cell_representation( box_vectors, "parameters" ) elif "xy xz yz" in box_header: # restricted triclinic box xlo, xhi, xy = (float(val) for val in file.readline().split()) ylo, yhi, xz = (float(val) for val in file.readline().split()) zlo, zhi, yz = (float(val) for val in file.readline().split()) # https://docs.lammps.org/Howto_triclinic.html # #output-of-restricted-and-general-triclinic-boxes-in-a-dump-file xlo -= min(0, xy, xz, xy + xz) xhi -= max(0, xy, xz, xy + xz) ylo -= min(0, yz) yhi -= max(0, yz) box_vectors = np.zeros((3, 3)) box_vectors[0, 0] = xhi - xlo box_vectors[1, :2] = xy, yhi - ylo box_vectors[2] = xz, yz, zhi - zlo frame_data["dimensions"] = convert_cell_representation( box_vectors, "parameters" ) else: # orthorhombic box xlo, xhi = (float(val) for val in file.readline().split()) ylo, yhi = (float(val) for val in file.readline().split()) zlo, zhi = (float(val) for val in file.readline().split()) frame_data["dimensions"] = np.array( (xhi - xlo, yhi - ylo, zhi - zlo, 90.0, 90.0, 90.0) ) box_vectors = convert_cell_representation( frame_data["dimensions"], "vectors" ) for _ in range(1 + 4 * (self._dump_style == "grid")): file.readline() # Read frame data # data = np.loadtxt(file, max_rows=n_entities) data = pd.read_csv( file, sep="\\s+", header=None, nrows=n_entities ).to_numpy() for name, columns in self._attribute_columns.items(): if isinstance(columns, int): frame_data[name] = data[:, columns] else: frame_data[name] = data[:, [col or 0 for col in columns]] frame_data[name][ :, [i for i, col in enumerate(columns) if col is None] ] = 0 if name in {"ids", "molecule_ids", "types"} or name.startswith( ("i_", "i2_") ): frame_data[name] = frame_data[name].astype(int) # Recover Cartesian coordinates from scaled coordinates and system dimensions if self._dump_style == "custom": scaled_flags = ["s" in fmt for fmt in self._coordinate_formats] if any(scaled_flags): if np.allclose(frame_data["dimensions"][3:], 90): frame_data["positions"][:, scaled_flags] *= frame_data[ "dimensions" ][:3][scaled_flags] else: # Rotate coordinates for triclinic boxes scale_coordinates( frame_data["positions"], box_vectors, scaled_flags ) frame_data["positions"] @= ( reduce_box_vectors(box_vectors) if is_general_triclinic else box_vectors ).T # Convert from LAMMPS units to consistent MDCraft units if convert_units: frame_data["time"] = ( frame_data["time"] * self._units["time"] ).m_as(INTERNAL_UNITS["time"]) frame_data["dimensions"][:3] = ( frame_data["dimensions"][:3] * self._units["length"] ).m_as(INTERNAL_UNITS["length"]) if self._dump_style == "custom": frame_data["positions"] = ( frame_data["positions"] * self._units["length"] ).m_as(INTERNAL_UNITS["length"]) if "forces" in frame_data: frame_data["forces"] = ( frame_data["forces"] * self._units["energy"] / self._units["length"] ) if "[substance]" not in frame_data["forces"].dimensionality: frame_data["forces"] /= ureg.avogadro_constant frame_data["forces"] = frame_data["forces"].m_as( INTERNAL_UNITS["energy"] / INTERNAL_UNITS["length"] ) if "velocities" in frame_data: frame_data["velocities"] = ( frame_data["velocities"] * self._units["length"] / self._units["time"] ).m_as(INTERNAL_UNITS["length"] / INTERNAL_UNITS["time"]) return frame_data @property def dt(self) -> float | None: """ Time step size between timesteps in the trajectory. If `None`, the step size is not constant across frames. **Reference unit**: :math:`\\mathrm{ps}`. """ return self._dt @property def time_step(self) -> float | None: """ Time step between frames in the trajectory. If `None`, the time step is not constant across frames. **Reference unit**: :math:`\\mathrm{ps}`. """ return self._time_step @property def times(self) -> np.ndarray[np.float64]: """ Simulation times found in the trajectory. May not be accurate if the time step size (`dt`) was not specified and could not be determined from the dump file. **Reference unit**: :math:`\\mathrm{ps}`. """ return self._times @property def timesteps(self) -> np.ndarray[np.uint32]: """ Simulation timesteps found in the trajectory. """ return self._timesteps @property def n_atoms(self) -> int | None: """ Number of atoms in each frame. Only available if the dump file was written in the :code:`atom` or :code:`custom` styles. If `None`, the number of atoms is not constant across frames. """ return self._n_entities if self._dump_style == "custom" else None @property def n_entities(self) -> int | None: """ Number of atoms (:code:`atom` or :code:`custom` dump styles), grids (:code:`grid` dump style), or local entities (:code:`local` dump style) in each frame. If `None`, the number of entities is not constant across frames. """ return self._n_entities @property def n_frames(self) -> int: """ Number of frames in the trajectory. """ return len(self._offsets) @property def n_grids(self) -> tuple[int, int, int] | None: """ Number of grid cells in each dimension. Only available if the dump file was written in the :code:`grid` style. """ return self._n_entities if self._dump_style == "grid" else None
[docs] def open(self) -> None: """ Opens the LAMMPS dump file and stores a handle to it. """ with open(self._filename, "rb") as f: self._file = ( bz2.open(self._filename, "rt") if f.read(3) == b"BZh" else open(self._filename, "r") )
[docs] def close(self) -> None: """ Closes the LAMMPS dump file and deletes the handle. """ if hasattr(self, "_file"): self._file.close() del self._file
[docs] class NetCDFReader(BaseTrajectoryReader): # TODO """ AMBER NetCDF trajectory/restart file reader. .. seealso:: For more information on the AMBER NetCDF file format, see the `AMBER NetCDF Trajectory/Restart Convention <https://ambermd.org/netcdf/nctraj.xhtml>`_. Parameters ---------- filename : `str` or `pathlib.Path`, positional-only Filename or path to the NetCDF file. package : `str`, optional, keyword-only, default: :code:`"netcdf4"` Specifies which package to use for reading the NetCDF file. **Valid values**: :code:`"netcdf4"` or :code:`"scipy"`. dt : `float`, optional Simulation time step size. **Reference unit**: :math:`\\mathrm{ps}`. Examples -------- """ _EXTENSIONS = {".nc", ".ncdf"} _FORMAT = "NETCDF" _PARALLELIZABLE = False _VARIABLES = { "spatial", "cell_spatial", "cell_angular", "time", "coordinates", "cell_lengths", "cell_angles", "velocities", "forces", } _REMD_VARIABLES = { "temp0", "remd_dimtype", "remd_indices", "remd_repidx", "remd_crdidx", "remd_values", } _LAMMPS_COORDINATE_VARIABLES = { "scaled_coordinates", "unwrapped_coordinates", "xsu", "ysu", "zsu", "ix", "iy", "iz", } def __init__( self, filename: str | Path, /, *, package: str = "netcdf4", dt: float | unit.Quantity | Q_ | None = None, **kwargs, ) -> None: super().__init__(filename) # Store which package to use for reading if (pkg := package.lower()) in {"scipy", "netcdf4"}: self._package = pkg else: raise ValueError( f"'{package}' is not a supported package for reading NetCDF files." ) # Create and store handle to file self.open() # Store trajectory properties if self._package == "scipy": self._n_atoms = self._file.dimensions["atom"] self._n_frames = self._file._recs self._restart = b"AMBERRESTART" in self._file.Conventions self._lammps = self._file.program == b"LAMMPS" else: self._n_atoms = self._file.dimensions["atom"].size self._n_frames = self._file.dimensions["frame"].size self._restart = "AMBERRESTART" in self._file.Conventions self._lammps = self._file.program == "LAMMPS" self._dt = strip_unit(dt, "ps")[0] # Define cached version of self.get_dimensions() self._get_dimensions = lru_cache()(self.get_dimensions) # Define instance variables self._reduced = False self._units = { "charge": ureg.elementary_charge, "energy": ureg.kilocalorie / ureg.mole, "length": ureg.angstrom, "mass": ureg.gram / ureg.mole, "temperature": ureg.kelvin, "time": ureg.picosecond, } self._custom_units = {q: False for q in self._units} def __repr__(self) -> str: return ( f"{self.__class__.__name__}('{self._filename.name}', " f"package='{self._package}', dt={self.dt})" ) def _open(self) -> "nc.Dataset" | netcdf_file: """ Opens the NetCDF file using the user-specified package. Returns ------- file : `netCDF4.Dataset` or `scipy.io.netcdf_file` Handle to the NetCDF file. """ if self._package == "scipy": file = netcdf_file(self._filename, "r") else: file = nc.Dataset(self._filename, mode="r") file.set_auto_mask(False) return file def _parse_frame( self, file: "nc.Dataset" | netcdf_file, frame_index: int, convert_units: bool, ) -> dict[str, Any]: """ Reads data from a single frame in the specified NetCDF file. Parameters ---------- file : `netCDF4.Dataset` or `scipy.io.netcdf_file` Handle to the NetCDF file. frame_index : `int` Index of frame to read. convert_units : `bool` Specifies whether to convert the data from AMBER NetCDF units to consistent MDCraft units. Returns ------- frame_data : `dict` Data from the frame. """ return { "n_atoms": self.n_atoms, "time": self.get_times(frame_index, convert_units, _file=file), "dimensions": self._get_dimensions( frame_index, convert_units, _file=file ), "positions": self.get_positions( frame_index, convert_units, _file=file ), "velocities": self.get_velocities( frame_index, convert_units, _file=file ), "forces": self.get_forces(frame_index, convert_units, _file=file), # **self.get_remd_variables(frame_index, convert_units, _file=file), **self.get_extra_variables(frame_index, _file=file), } @property def dt(self) -> float | None: """ Time step size between timesteps in the trajectory. A value is returned only when a step size was specified using `dt` in the constructor since NetCDF trajectories do not contain this information. **Unit**: :math:`\\mathrm{ps}`. """ return self._dt
[docs] @cached_property def time_step(self) -> float | None: """ Time step between frames in the trajectory. If `None`, the time step is not constant across frames. **Unit**: :math:`\\mathrm{ps}`. """ time_steps = np.diff(self.times) return ts if np.allclose(ts := time_steps.mean(), time_steps) else None
[docs] @cached_property def times(self) -> np.ndarray[np.float64]: """ Simulation times. **Unit**: :math:`\\mathrm{ps}`. """ return self.get_times()
[docs] @cached_property def timesteps(self) -> np.ndarray[np.uint32] | None: """ Simulation timesteps found in the trajectory. An array is returned only when a step size was specified using `dt` in the constructor since NetCDF trajectories do not contain this information. """ if self.dt is not None: return np.round(self.times / self.dt).astype(int)
@property def n_atoms(self) -> int: """ Number of atoms in each frame. """ return self._n_atoms @property def n_frames(self) -> int: """ Number of frames in the trajectory. """ return self._n_frames
[docs] def open(self) -> None: """ Opens the NetCDF file and stores a handle to it. """ self._file = self._open()
[docs] def close(self) -> None: """ Closes the NetCDF file and deletes the handle. """ if hasattr(self, "_file"): self._file.close() del self._file
[docs] def get_dimensions( self, frame_indices: int | list[int] | slice | None = None, convert_units: bool = True, *, _file: "nc.Dataset" | netcdf_file | None = None, ) -> tuple[np.ndarray[np.float64] | np.ndarray[np.float64]]: """ Gets the dimensions (lattice parameters) of the simulation box. Parameters ---------- frame_indices : `int`, `list`, or `slice`, optional Frame indices. If :code:`None`, the dimensions across all frames are returned. convert_units : `bool`, default :code:`True` Specifies whether to convert the data from AMBER NetCDF units to consistent MDCraft units. Returns ------- dimensions : `numpy.ndarray` Simulation box dimensions. **Shape**: :math:`(6,)` or :math:`(N_\\mathrm{frames},6)`. **Reference units**: :math:`\\mathrm{Å}` for the lengths and :math:`^\\circ` for the angles. """ if _file is None: _file = getattr(self, "_file", None) if _file is None: _file = self._open() manual = True else: manual = False if frame_indices is None: frame_indices = slice(None) if "cell_lengths" in _file.variables: var = _file.variables["cell_lengths"] cell_lengths = var[frame_indices] unit = getattr(var, "units", None) if "cell_angles" in _file.variables: cell_angles = _file.variables["cell_angles"][frame_indices] else: warnings.warn( "'cell_lengths' was found in the NetCDF file, but not " "'cell_angles'. It will be assumed that the simulation box " "is orthorhombic." ) cell_angles = np.full_like(cell_lengths, 90.0) dimensions = np.hstack((cell_lengths, cell_angles)) else: return None # Overwrite units, if necessary if unit in {None, "lj"}: if not self._reduced: warnings.warn( "No or 'lj' units were found for system dimensions. It " "will be assumed that the trajectory uses reduced units." ) self._reduced = True self._units["length"] = ureg.dimensionless elif self._units["length"] != (unit := U_(unit)): if self._reduced or self._custom_units["length"]: raise RuntimeError( f"System dimensions have a unit of '{unit}', which " f"does not match '{self._units['length']}' of the " "other lengths in the trajectory." ) self._units["length"] = unit self._custom_units["length"] = True if convert_units and not self._reduced: dimensions[:3] = (dimensions[:3] * self._units["length"]).m_as( INTERNAL_UNITS["length"] ) if manual: _file.close() return dimensions
[docs] def get_forces( self, frame_indices: int | list[int] | slice | None = None, convert_units: bool = True, *, _file: "nc.Dataset" | netcdf_file | None = None, ) -> np.ndarray[np.float64]: """ Gets the forces acting on the atoms. Parameters ---------- frame_indices : `int`, `list`, or `slice`, optional Frame indices. If :code:`None`, the dimensions across all frames are returned. convert_units : `bool`, default :code:`True` Specifies whether to convert the data from AMBER NetCDF units to consistent MDCraft units. Returns ------- forces : `numpy.ndarray` or `openmm.unit.Quantity` Forces acting on the atoms. If the NetCDF file does not contain this information, :code:`None` is returned. **Shape**: :math:`(N_\\mathrm{atoms},3)` or :math:`(N_\\mathrm{frames},N_\\mathrm{atoms},3)`. **Reference unit**: :math:`\\mathrm{kJ/(mol\\cdot Å)}`. """ if _file is None: _file = getattr(self, "_file", None) if _file is None: _file = self._open() manual = True else: manual = False if frame_indices is None: frame_indices = slice(None) if "forces" in _file.variables: var = _file.variables["forces"] forces = var[frame_indices] units = getattr(var, "units", None) else: return None # Overwrite units, if necessary if units in {None, "lj"}: if not self._reduced: warnings.warn( "No or 'lj' units were found for forces exerted on " "atoms. It will be assumed that the trajectory uses " "reduced units." ) self._reduced = True self._units["energy"] = self._units["length"] = ureg.dimensionless else: try: units = U_(units) except DefinitionSyntaxError: # Fix extra parenthesis and wrong capitalization in LAMMPS units units = U_(units[:-1].lower()) if not ( units.is_compatible_with("J/m") or units.is_compatible_with("J/(mol*m)") ): raise RuntimeError( f"Invalid unit '{units}' found for forces exerted on atoms." ) length_unit = U_( next( u for u in units._units if U_(u).dimensionality == "[length]" ) ) if self._units["length"] != length_unit: if self._reduced or self._custom_units["length"]: raise RuntimeError( "Forces exerted on atoms have a length unit of " f"'{length_unit}', which does not match " f"'{self._units['length']}' of the other " "lengths in the trajectory." ) self._units["length"] = length_unit self._custom_units["length"] = True energy_unit = units * length_unit if self._units["energy"] != energy_unit: if self._reduced or self._custom_units["energy"]: raise RuntimeError( "Forces exerted on atoms have an energy unit " f"of '{energy_unit}', which does not match " f"'{self._units['energy']}' of the other " "energies in the trajectory." ) self._units["energy"] = energy_unit self._custom_units["energy"] = True if convert_units and not self._reduced: forces = ( forces * self._units["energy"] / self._units["length"] ).m_as(INTERNAL_UNITS["energy"] / INTERNAL_UNITS["length"]) if manual: _file.close() return forces
[docs] def get_positions( self, frame_indices: int | list[int] | slice | None = None, convert_units: bool = True, *, _file: "nc.Dataset" | netcdf_file | None = None, ) -> np.ndarray[np.float64]: """ Gets the atom positions. Parameters ---------- frame_indices : `int`, `list`, or `slice`, optional Frame indices. If :code:`None`, the positions across all frames are returned. convert_units : `bool`, default :code:`True` Specifies whether to convert the data from AMBER NetCDF units to consistent MDCraft units. Returns ------- positions : `numpy.ndarray` Atom positions. **Shape**: :math:`(N_\\mathrm{atoms},3)` or :math:`(N_\\mathrm{frames},N_\\mathrm{atoms},3)`. **Reference unit**: :math:`\\mathrm{Å}`. """ if _file is None: _file = getattr(self, "_file", None) if _file is None: _file = self._open() manual = True else: manual = False if frame_indices is None: frame_indices = slice(None) # NOTE: The following logic is complex due to support for NetCDF # files written by LAMMPS, which stores scaled and/or # wrapped coordinates to the nonstandard # "scaled_coordinates", "wrapped_coordinates", and # "xsu"/"ysu"/"zsu" keys. scaled_flags = np.full(3, True, np.bool) if "coordinates" in _file.variables: var = _file.variables["coordinates"] positions = var[frame_indices] unit = getattr(var, "units", None) # Check for empty coordinate axes empty_flags = np.any(positions == var.get_fill_value(), axis=0) scaled_flags[~empty_flags] = False elif self._lammps: positions = np.empty( ( () if isinstance(frame_indices, int) else (len(self.times[frame_indices]),) ) + (self.n_atoms, 3), np.float64 if self._restart else np.float32, ) empty_flags = scaled_flags.copy() unit = None else: return None # Special logic for when "coordinates" key is not found or # empty coordinate axes are found reduced_units = {None, "lj"} if self._lammps: if empty_flags.any(): # Check for unwrapped coordinates if "unwrapped_coordinates" in _file.variables: var = _file.variables["unwrapped_coordinates"] unwrapped_positions = var[frame_indices] unwrapped_filled = ~np.any( unwrapped_positions == var.get_fill_value(), axis=0 ) positions[..., unwrapped_filled] = unwrapped_positions[ ..., unwrapped_filled ] empty_flags[unwrapped_filled] = False scaled_flags[unwrapped_filled] = False if unit != (_unit := getattr(var, "units", None)): if unit is None: unit = _unit elif not ( unit in reduced_units and _unit in reduced_units ): raise RuntimeError( f"Unwrapped coordinates have a unit of '{_unit}', " f"which does not match '{unit}' of the other " "coordinates in the trajectory." ) # Check for scaled and unwrapped coordinates if empty_flags.any(): for axis in np.where(empty_flags)[0]: if (key := f"{chr(120 + axis)}su") in _file.variables: var = _file.variables[key] positions[..., axis] = var[frame_indices] empty_flags[axis] = False # NOTE: The following code block is disabled because # LAMMPS currently does not provide units for # scaled and unwrapped coordinates. # if unit != (_unit := getattr(var, "units", None)): # if unit is None: # unit = _unit # elif not ( # unit in reduced_units and _unit in reduced_units # ): # raise RuntimeError( # "Scaled and unwrapped coordinates " # f"have a unit of '{_unit}', which does not " # f"match '{unit}' of the other coordinates " # "in the trajectory." # ) # Check for scaled coordinates if ( empty_flags.any() and "scaled_coordinates" in _file.variables ): var = _file.variables["scaled_coordinates"] scaled_positions = var[frame_indices] scaled_filled = ~np.any( scaled_positions == var.get_fill_value(), axis=0 ) positions[..., scaled_filled] = scaled_positions[ ..., scaled_filled ] empty_flags[scaled_filled] = False # NOTE: The following code block is disabled because LAMMPS # currently does not provide units for scaled coordinates. # if unit != (_unit := getattr(var, "units", None)): # if unit is None: # unit = _unit # elif not (unit in reduced_units and _unit in reduced_units): # raise RuntimeError( # f"Scaled coordinates have a unit of '{_unit}', " # f"which does not match '{unit}' of the other " # "coordinates in the trajectory." # ) # Fill remaining empty coordinate axes with zeros positions[..., empty_flags] = 0 # Recover Cartesian coordinates from scaled coordinates and # system dimensions if scaled_flags.any(): dimensions = self._get_dimensions( frame_indices, convert_units, _file=_file ) if np.allclose(dimensions[3:], 90): positions[..., scaled_flags] *= dimensions[:3][scaled_flags] else: box_vectors = convert_cell_representation( dimensions, "vectors" ) scale_coordinates( positions[..., scaled_flags], box_vectors, scaled_flags, ) positions @= box_vectors # TODO: Check for image flags # TODO: Apply the same fix to LAMMPSDumpReader debug = True # Overwrite units, if necessary if unit in reduced_units: if not self._reduced: warnings.warn( "No or 'lj' units were found for atom positions. " "It will be assumed that the trajectory uses " "reduced units." ) self._reduced = True self._units["length"] = ureg.dimensionless elif self._units["length"] != (unit := U_(unit)): if self._reduced or self._custom_units["length"]: raise RuntimeError( f"Atom positions have a unit of '{unit}', " f"which does not match '{self._units['length']}' " "of the other lengths in the trajectory." ) self._units["length"] = unit self._custom_units["length"] = True if convert_units and not self._reduced: positions = (positions * self._units["length"]).m_as( INTERNAL_UNITS["length"] ) if manual: _file.close() return positions
[docs] def get_times( self, frame_indices: int | list[int] | slice | None = None, convert_units: bool = True, *, _file: "nc.Dataset" | netcdf_file | None = None, ) -> int | np.ndarray[np.float64]: """ Gets the simulation times. Parameters ---------- frames : `int`, `list`, or `slice`, optional Frame indices. If :code:`None`, the times across all frames are returned. convert_units : `bool`, default :code:`True` Specifies whether to convert the data from AMBER NetCDF units to consistent MDCraft units. Returns ------- times : `int` or `numpy.ndarray` Simulation times. **Shape**: Scalar or :math:`(N_\\mathrm{frames},)`. **Reference unit**: :math:`\\mathrm{ps}`. """ if _file is None: _file = getattr(self, "_file", None) if _file is None: _file = self._open() manual = True else: manual = False if frame_indices is None: frame_indices = slice(None) if "time" in _file.variables: var = _file.variables["time"] times = var[frame_indices] units = getattr(var, "units", None) else: return None # Overwrite units, if necessary if units in {None, "lj"}: if not self._reduced: warnings.warn( "No or 'lj' units were found for simulation times. " "It will be assumed that the trajectory uses reduced " "units." ) self._reduced = True self._units["time"] = ureg.dimensionless elif self._units["time"] != (units := U_(units)): if self._reduced or self._custom_units["time"]: raise RuntimeError( f"Simulation times have a unit of '{units}', which " f"does not match '{self._units['time']}' of the " "other times in the trajectory." ) self._units["time"] = units self._custom_units["time"] = True if convert_units and not self._reduced: times = (times * self._units["time"]).m_as(INTERNAL_UNITS["time"]) if manual: _file.close() return times
[docs] def get_velocities( self, frame_indices: int | list[int] | slice | None = None, convert_units: bool = True, *, _file: "nc.Dataset" | netcdf_file | None = None, ) -> np.ndarray[np.float64]: """ Gets the atom velocities. Parameters ---------- frame_indices : `int`, `list`, or `slice`, optional Frame indices. If :code:`None`, the velocities across all frames are returned. convert_units : `bool`, default :code:`True` Specifies whether to convert the data from AMBER NetCDF units to consistent MDCraft units. Returns ------- velocities : `numpy.ndarray` or `openmm.unit.Quantity` Atom velocities. If the NetCDF file does not contain this information, :code:`None` is returned. **Shape**: :math:`(N_\\mathrm{atoms},3)` or :math:`(N_\\mathrm{frames},N_\\mathrm{atoms},3)`. **Reference unit**: :math:`\\mathrm{Å/ps}`. """ if _file is None: _file = getattr(self, "_file", None) if _file is None: _file = self._open() manual = True else: manual = False if frame_indices is None: frame_indices = slice(None) if "velocities" in _file.variables: var = _file.variables["velocities"] velocities = var[frame_indices] units = getattr(var, "units", None) else: return None # Overwrite units, if necessary if units in {None, "lj"}: if not self._reduced: warnings.warn( "No or 'lj' units were found for atom velocities. It " "will be assumed that the trajectory uses reduced units." ) self._reduced = True self._units["length"] = self._units["time"] = ureg.dimensionless else: units = U_(units) if not units.is_compatible_with("m/s"): raise RuntimeError( f"Invalid unit '{units}' found for atom velocities." ) length_unit = U_( next( u for u in units._units if U_(u).dimensionality == "[length]" ) ) if self._units["length"] != length_unit: if self._reduced or self._custom_units["length"]: raise RuntimeError( "Atom velocities have a length unit of " f"'{length_unit}', which does not match " f"'{self._units['length']}' of the other " "lengths in the trajectory." ) self._units["length"] = length_unit self._custom_units["length"] = True time_unit = length_unit / units if self._units["time"] != time_unit: if self._reduced or self._custom_units["time"]: raise RuntimeError( "Atom velocities have a time unit of " f"'{time_unit}', which does not match " f"'{self._units['time']}' of the other " "times in the trajectory." ) self._units["time"] = time_unit self._custom_units["time"] = True if convert_units and not self._reduced: velocities = ( velocities * self._units["length"] / self._units["time"] ).m_as(INTERNAL_UNITS["length"] / INTERNAL_UNITS["time"]) if manual: _file.close() return velocities
def get_remd_variables( self, frame_indices: int | list[int] | slice | None = None, *, _file: "nc.Dataset" | netcdf_file | None = None, ) -> dict[str, np.ndarray[np.float64]]: raise NotImplementedError
[docs] def get_extra_variables( self, frame_indices: int | list[int] | slice | None = None, *, _file: "nc.Dataset" | netcdf_file | None = None, ) -> dict[str, np.ndarray[np.float64]]: """ Gets extra attvariablesributes found in the NetCDF file. Parameters ---------- frame_indices : `int`, `list`, or `slice`, optional Frame indices. If :code:`None`, the dimensions across all frames are returned. Returns ------- variables : `dict` Extra variables found in the NetCDF file. """ if _file is None: _file = getattr(self, "_file", None) if _file is None: _file = self._open() manual = True else: manual = False if frame_indices is None: frame_indices = slice(None) variables = {} # Handle other extra attributes generally for var in ( _file.variables.keys() - self._VARIABLES - self._REMD_VARIABLES - self._LAMMPS_COORDINATE_VARIABLES ): debug = True if manual: _file.close() return variables
if FOUND["MDAnalysis"]: from MDAnalysis.coordinates.base import ReaderBase class _LAMMPSDumpReader(LAMMPSDumpReader, ReaderBase): """ LAMMPS dump file reader compatible with MDAnalysis. .. seealso:: For more information on the features of this reader, see the documentation for :class:`~mdcraft.io.LAMMPSDumpReader`. Parameters ---------- filename : `str` or `pathlib.Path`, positional-only Filename or path to the dump file. coordinate_formats : `str` or `list` of `str`, optional Coordinate formats. If a `str` is provided, it is used for all axes. If the trajectory does not contain particle positions for a specific axis, use `None` for that axis. .. container:: **Valid values**: * :code:`""` for unscaled and wrapped coordinates, * :code:`"s"` for scaled and wrapped coordinates, * :code:`"u"` for unscaled and unwrapped coordinates, or * :code:`"su"` for scaled and unwrapped coordinates. extras : `bool`, `str` or `list` of `str`, keyword-only, optional Extra per-atom information to read. If `True`, all extra attributes found in the trajectory are read. **Valid values:** +-------------------------------------+-------------------------------------------+------------------------+ | Attribute | LAMMPS dump attribute(s) | Per-atom data type | +=====================================+===========================================+========================+ | :code:`"dipole_moments"` | (:code:`mux`, :code:`muy`, :code:`muz`) | `numpy.ndarray[float]` | +-------------------------------------+-------------------------------------------+------------------------+ | :code:`"dipole_moments_magnitudes"` | :code:`mu` | `float` | +-------------------------------------+-------------------------------------------+------------------------+ | :code:`"angular_velocities"` | (:code:`omegax`, :code:`omegay`, | `numpy.ndarray[float]` | | | :code:`omegaz`) | | +-------------------------------------+-------------------------------------------+------------------------+ | :code:`"angular_momenta"` | (:code:`angmomx`, :code:`angmomy`, | `numpy.ndarray[float]` | | | :code:`angmomz`) | | +-------------------------------------+-------------------------------------------+------------------------+ | :code:`"torques"` | (:code:`tqx`, :code:`tqy`, :code:`tqz`) | `numpy.ndarray[float]` | +-------------------------------------+-------------------------------------------+------------------------+ | :code:`"c_{compute_id}"` | (:code:`c_{compute_id}[i]`, ...) | `numpy.ndarray[float]` | +-------------------------------------+-------------------------------------------+------------------------+ | :code:`"d_{name}"` | (:code:`d_{name}[i]`, ...) | `numpy.ndarray[float]` | +-------------------------------------+-------------------------------------------+------------------------+ | :code:`"d2_{name}[i]"` | (:code:`d2_{name}[i][j]`, ...) | `numpy.ndarray[float]` | +-------------------------------------+-------------------------------------------+------------------------+ | :code:`"f_{fix_id}"` | (:code:`f_{fix_id}[i]`, ...) | `numpy.ndarray[float]` | +-------------------------------------+-------------------------------------------+------------------------+ | :code:`"i_{name}"` | (:code:`i_{name}[i]`, ...) | `numpy.ndarray[int]` | +-------------------------------------+-------------------------------------------+------------------------+ | :code:`"i2_{name}[i]"` | (:code:`i2_{name}[i][j]`, ...) | `numpy.ndarray[int]` | +-------------------------------------+-------------------------------------------+------------------------+ | :code:`"v_{name}"` | (:code:`v_{name}[i]`, ...) | `numpy.ndarray[float]` | +-------------------------------------+-------------------------------------------+------------------------+ reduced : `bool`, keyword-only, optional Specifies whether the data is in reduced units. n_workers : `int`, keyword-only Number of threads to use when reading the file. If :code:`None`, the number of available logical threads is used. **kwargs : `dict`, optional Additional keyword arguments to pass to the :class:`MDAnalysis.coordinates.base.ReaderBase` constructor. """ format = "LAMMPSDUMP" def __init__( self, filename: str | Path, /, coordinate_formats: str | list[str] | None = None, *, extras: bool | str | list[str] | None = None, n_workers: int | None = 1, **kwargs, ) -> None: LAMMPSDumpReader.__init__( self, filename, coordinate_formats=coordinate_formats, extras=extras, n_workers=n_workers, ) if self._dump_style != "custom": raise ValueError( "MDAnalysis only supports LAMMPS dump files " "written in the `atom` or `custom` styles." ) ReaderBase.__init__(self, filename, **kwargs) self._reopen() self._read_next_timestep() def __repr__(self) -> str: return ( f"<LAMMPSDumpReader {self._filename.name} with " f"{self.n_frames} frames of {self.n_atoms} atoms>" ) def _reopen(self) -> None: """ Reopens the LAMMPS dump file and stores a handle to it. """ self.close() self.open() self.ts = self._Timestep(self.n_atoms, **self._ts_kwargs) self.ts.frame = -1 def _read_frame(self, frame_index: int) -> "ReaderBase._Timestep": """ Reads a specific frame from the LAMMPS dump file. Parameters ---------- frame_index : `int`, optional Index of frame to read. Returns ------- timestep : `ReaderBase._Timestep` Timestep object with all information from a frame in the trajectory. """ self._file.seek(self._offsets[frame_index]) self.ts.frame = frame_index - 1 return self._read_next_timestep() def _read_next_timestep(self) -> "ReaderBase._Timestep": """ Reads the next timestep from the LAMMPS dump file. Returns ------- timestep : `ReaderBase._Timestep` Timestep object with all information from a frame in the trajectory. """ # Set up timestep ts = self.ts ts.frame += 1 self._check_frame(ts.frame) data = self.read_frames(ts.frame, _convert_units=False) ts.data["step"] = data["timestep"] ts.data["time"] = data["timestep"] * ts.dt ts.dimensions = data["dimensions"] ts.positions = data["positions"] ts.has_forces = "forces" in data if ts.has_forces: ts.forces = data["forces"] ts.has_velocities = "velocities" in data if ts.has_velocities: ts.velocities = data["velocities"] for attr in self._extra_attribute_indices: ts.data[attr] = data[attr] if "ids" in data: order = np.argsort(data["ids"]) ts.positions = ts.positions[order] if ts.has_forces: ts.forces = ts.forces[order] if ts.has_velocities: ts.velocities = ts.velocities[order] for attr in self._extra_attribute_indices: ts.data[attr] = ts.data[attr][order] return ts