from __future__ import annotations
from abc import abstractmethod
from collections.abc import Iterable
from pathlib import Path
from types import TracebackType
from typing import Any, Self, TextIO
import weakref
import numpy as np
import psutil
from .. import U_
# NOTE(review): stray "[docs]" marker removed (looks like a Sphinx viewcode artifact).
class BaseReader:
"""
Base class for topology and trajectory readers.
Subclasses must implement the :meth:`open` and :meth:`close` methods
to handle the opening and closing of the file.
Parameters
----------
filename : `str` or `pathlib.Path`, positional-only
Filename or path to the topology or trajectory file.
n_workers : `int`, keyword-only
Number of threads to use when reading the file. If :code:`None`,
the number of available logical threads is used.
"""
_PARALLELIZABLE: bool
def __init__(self, filename: str | Path, /, *, n_workers: int | None) -> None:
# Resolve full path to file
self._filename = Path(filename).resolve(True)
# Store settings
self._n_workers = n_workers or psutil.cpu_count()
# Create finalizer
self._finalizer = weakref.finalize(self, self.close)
def __enter__(self) -> Self:
return self
def __exit__(
self,
exc_type: type | None,
exc_value: BaseException | None,
traceback: TracebackType | None,
) -> None:
self._finalizer()
def __getstate__(self) -> dict[str, Any]:
# Close file before pickling for parallel reading
self.close()
return self.__dict__
[docs]
@abstractmethod
def open(self) -> None:
"""
Opens the topology or trajectory file and stores a handle to it.
"""
pass
[docs]
@abstractmethod
def close(self) -> None:
"""
Closes the topology or trajectory file and deletes the handle.
"""
pass
# NOTE(review): stray "[docs]" marker removed (looks like a Sphinx viewcode artifact).
class BaseTopologyReader(BaseReader): # TODO
    """
    Base class for topology readers.

    Subclasses must set values for

    * the :attr:`_FORMAT` and :attr:`_EXTENSIONS` attributes, which
      specify the format and standard extension(s) of the topology
      file, respectively,
    * the :attr:`_PARALLELIZABLE` attribute, which specifies whether the
      reader can process a file in parallel,
    * the :attr:`_units` attribute, which specifies the base units
      (charge, energy, length, mass, temperature, and time) used by the
      simulation software that generated the topology file,
    * the :attr:`_reduced` attribute, which specifies whether the data
      is in reduced units,
    * the :meth:`dimensions` property, which specifies the simulation
      box dimensions (or lattice parameters), and
    * the :meth:`n_atoms`, :meth:`n_bonds`, :meth:`n_angles`,
      :meth:`n_dihedrals`, :meth:`n_improper_dihedrals`,
      :meth:`n_residues`, :meth:`n_segments`, :meth:`n_chains`, and
      :meth:`n_molecules` properties, which specify the number of atoms,
      bonds, angles, dihedrals, improper dihedrals, residues, segments,
      chains, and molecules, respectively,

    and implement

    * the :meth:`__repr__` method to provide a string representation of
      the reader that can be used to recreate it,
    * the :meth:`_parse_topology` method to parse the topology from a
      file handle, and
    * the :meth:`open` and :meth:`close` methods to handle the opening
      and closing of the file.

    Parameters
    ----------
    filename : `str` or `pathlib.Path`, positional-only
        Filename or path to the topology file.

    n_workers : `int`, keyword-only, default: :code:`1`
        Number of threads to use when reading the file. If :code:`None`,
        the number of available logical threads is used.
    """

    _EXTENSIONS: set[str]
    _FORMAT: str
    _units: dict[str, U_]
    _reduced: bool

    def __init__(
        self,
        filename: str | Path,
        /,
        *,
        n_workers: int | None = 1,
    ) -> None:
        super().__init__(filename, n_workers=n_workers)

    @abstractmethod
    def __repr__(self) -> str:
        pass

    def __str__(self) -> str:
        # Short summary: class name, bare filename, and atom count
        return (
            f"{self.__class__.__name__}: '{self._filename.name}', "
            f"{self.n_atoms:,} atoms"
        )

    @staticmethod
    def _get_supported_formats() -> dict[str, type[BaseTopologyReader]]:
        """
        Supported topology formats.

        Returns
        -------
        formats : `dict`
            Mapping from each reader's :attr:`_FORMAT` to the reader
            class. Only direct subclasses of this class are listed,
            and each must define :attr:`_FORMAT`.
        """
        return {r._FORMAT: r for r in BaseTopologyReader.__subclasses__()}

    @abstractmethod
    def _parse_topology(self, file: TextIO) -> dict[str, Any]:
        """
        Parses the topology from a file handle.

        Parameters
        ----------
        file : `typing.TextIO`
            Handle to the topology file.

        Returns
        -------
        topology : `dict`
            Parsed topology data. The expected keys are not specified
            yet (see the TODO on this class).
        """
        pass

    @property
    @abstractmethod
    def dimensions(self) -> np.ndarray[np.float64] | None:
        """
        Simulation box dimensions (or lattice parameters). If `None`,
        the system size could not be determined from the topology.

        **Reference units**: :math:`\\mathrm{nm}` for lengths and
        degrees (:math:`^\\circ`) for angles.
        """
        pass

    @property
    @abstractmethod
    def n_atoms(self) -> int:
        """
        Number of atoms.
        """
        pass

    @property
    @abstractmethod
    def n_bonds(self) -> int:
        """
        Number of bonds.
        """
        pass

    @property
    @abstractmethod
    def n_angles(self) -> int:
        """
        Number of angles.
        """
        pass

    @property
    @abstractmethod
    def n_dihedrals(self) -> int:
        """
        Number of dihedrals.
        """
        pass

    @property
    @abstractmethod
    def n_improper_dihedrals(self) -> int:
        """
        Number of improper dihedrals.
        """
        # Declared so the set of abstract properties matches the list
        # of required properties in the class docstring.
        pass

    @property
    @abstractmethod
    def n_residues(self) -> int:
        """
        Number of residues.
        """
        pass

    @property
    @abstractmethod
    def n_segments(self) -> int:
        """
        Number of segments.
        """
        pass

    @property
    @abstractmethod
    def n_chains(self) -> int:
        """
        Number of chains.
        """
        pass

    @property
    @abstractmethod
    def n_molecules(self) -> int:
        """
        Number of molecules.
        """
        pass

    def read_topology(
        self,
        /,
        *,
        _convert_units: bool = True,
    ) -> dict[str, Any] | list[dict[str, Any]]:
        """
        Reads the topology from the file.

        Not implemented yet: the method body is a placeholder and
        currently returns `None` regardless of the arguments.

        Parameters
        ----------
        _convert_units : `bool`, keyword-only, default: :code:`True`
            Currently unused.
        """
        pass # TODO
# TODO: Finish specification before implementing readers.
# TODO: Support smaller subdivisions like molecules (residues) and segments (chains)?
# NOTE(review): stray "[docs]" marker removed (looks like a Sphinx viewcode artifact).
class BaseTrajectoryReader(BaseReader):
    """
    Base class for trajectory readers.

    Subclasses must set values for

    * the :attr:`_FORMAT` and :attr:`_EXTENSIONS` attributes, which
      specify the format and standard extension(s) of the trajectory
      file, respectively,
    * the :attr:`_PARALLELIZABLE` attribute, which specifies whether the
      reader can process a file in parallel,
    * the :attr:`_units` attribute, which specifies the base units
      (charge, energy, length, mass, temperature, and time) used by the
      simulation software that generated the trajectory file,
    * the :attr:`_reduced` attribute, which specifies whether the data
      is in reduced units,
    * the :meth:`dt` and :meth:`time_step` properties, which specify the
      time step size between timesteps and the time step between frames,
      respectively,
    * the :meth:`times` and :meth:`timesteps` properties, which specify
      the simulation times and timesteps found in the trajectory, and
    * the :meth:`n_atoms` and :meth:`n_frames` properties, which specify
      the number of atoms in each frame and the number of frames in the
      trajectory, respectively,

    and implement

    * the :meth:`__repr__` method to provide a string representation of
      the reader that can be used to recreate it,
    * the :meth:`_parse_frame` method to read and parse data from a
      single frame in the trajectory file, and
    * the :meth:`open` and :meth:`close` methods to handle the opening
      and closing of the file. :meth:`open` must store the handle in
      the :attr:`_file` attribute, which :meth:`read_frames` reads.

    Parameters
    ----------
    filename : `str` or `pathlib.Path`, positional-only
        Filename or path to the trajectory file.

    n_workers : `int`, keyword-only, default: :code:`1`
        Number of threads to use when reading the file. If :code:`None`,
        the number of available logical threads is used.
    """

    _EXTENSIONS: set[str]
    _FORMAT: str
    _units: dict[str, U_]
    _reduced: bool

    def __init__(
        self,
        filename: str | Path,
        /,
        *,
        n_workers: int | None = 1,
    ) -> None:
        super().__init__(filename, n_workers=n_workers)

    @abstractmethod
    def __repr__(self) -> str:
        pass

    def __str__(self) -> str:
        # Short summary: class name, bare filename, and frame count
        string = (
            f"{self.__class__.__name__}: '{self._filename.name}', "
            f"{self.n_frames:,} frame(s)"
        )

        # The atom count is only reported when it is constant
        n_atoms = self.n_atoms
        if n_atoms is not None:
            string += f", {n_atoms:,} atom(s)"
        return string

    @staticmethod
    def _get_supported_formats() -> dict[str, type[BaseTrajectoryReader]]:
        """
        Supported trajectory formats.

        Returns
        -------
        formats : `dict`
            Mapping from each reader's :attr:`_FORMAT` to the reader
            class. Only direct subclasses of this class are listed,
            and each must define :attr:`_FORMAT`.
        """
        return {r._FORMAT: r for r in BaseTrajectoryReader.__subclasses__()}

    def _check_frame(self, frame_index: int) -> None:
        """
        Checks if a frame index is valid.

        Negative indices down to :code:`-n_frames` are accepted.

        Parameters
        ----------
        frame_index : `int`
            Index of frame to check.

        Raises
        ------
        EOFError
            If the index lies outside
            :code:`[-n_frames, n_frames)`.
        """
        if not -self.n_frames <= frame_index < self.n_frames:
            raise EOFError(
                f"Frame with index {frame_index} was requested from "
                f"'{self._filename.name}' with only {self.n_frames} frames."
            )

    @abstractmethod
    def _parse_frame(
        self, file: Any, frame_index: int, convert_units: bool
    ) -> dict[str, Any]:
        """
        Reads data from a single frame in the specified trajectory
        file.

        Parameters
        ----------
        file : any
            Handle to the trajectory file.

        frame_index : `int`
            Index of frame to read. The index has been validated with
            :meth:`_check_frame` but is passed through unchanged, so it
            may be negative.

        convert_units : `bool`
            Specifies whether to convert the data from the units of
            the trajectory file to consistent MDCraft units.

        Returns
        -------
        frame_data : `dict`
            Data from the frame. Keys for common values should match the
            parameter names in the :class:`~mdcraft.core.TrajectoryFrame`
            class constructor.
        """
        pass

    @property
    @abstractmethod
    def dt(self) -> float | None:
        """
        Time step size between timesteps in the trajectory. If `None`,
        the time step size is not constant across frames or could not be
        determined from the trajectory.

        **Reference unit**: :math:`\\mathrm{ps}`.
        """
        pass

    @property
    @abstractmethod
    def time_step(self) -> float | None:
        """
        Time step between frames in the trajectory. If `None`, the time
        step is not constant across frames.

        **Reference unit**: :math:`\\mathrm{ps}`.
        """
        pass

    @property
    @abstractmethod
    def times(self) -> np.ndarray[np.float64]:
        """
        Simulation times found in the trajectory.

        **Reference unit**: :math:`\\mathrm{ps}`.
        """
        pass

    @property
    @abstractmethod
    def timesteps(self) -> np.ndarray[np.uint32] | None:
        """
        Simulation timesteps found in the trajectory. If `None`, the
        timesteps could not be determined from the trajectory.
        """
        pass

    @property
    @abstractmethod
    def n_atoms(self) -> int | None:
        """
        Number of atoms in each frame. If `None`, the number of atoms
        is not constant across frames.
        """
        pass

    @property
    @abstractmethod
    def n_frames(self) -> int:
        """
        Number of frames in the trajectory.
        """
        pass

    def read_frames(
        self,
        frame_indices: int | slice | Iterable[int],
        /,
        *,
        _convert_units: bool = True,
    ) -> dict[str, Any] | list[dict[str, Any]]:
        """
        Reads data from one or more frames from the trajectory file.

        Parameters
        ----------
        frame_indices : `int`, `slice`, or array-like, positional-only
            Indices of frames to read. One-shot iterables (such as
            generators) are supported.

        _convert_units : `bool`, keyword-only, default: :code:`True`
            Whether to request unit conversion from
            :meth:`_parse_frame`. Conversion is never requested when
            the reader's :attr:`_reduced` flag is set.

        Returns
        -------
        data : `dict` or `list`
            Data from the frame(s): a single `dict` when an integer
            index is given, otherwise a `list` with one `dict` per
            requested frame.

        Raises
        ------
        EOFError
            If any requested index is out of range.
        """
        single = isinstance(frame_indices, (int, np.integer))

        # Validate indices of frames
        if single:
            self._check_frame(frame_indices)
        else:
            if isinstance(frame_indices, slice):
                frame_indices = range(*frame_indices.indices(self.n_frames))
            else:
                # Materialize once: the indices are traversed twice
                # (validation, then parsing), which would silently
                # yield no frames for a one-shot iterator.
                frame_indices = list(frame_indices)
            for fi in frame_indices:
                self._check_frame(fi)

        convert = _convert_units and not self._reduced

        # Open a private handle for parallel reading, if necessary;
        # otherwise reuse the handle stored by open()
        private_handle = self._n_workers > 1
        if private_handle:
            file = open(self._filename, "r")
        else:
            self.open()
            file = self._file

        # Read data from frame(s)
        try:
            if single:
                return self._parse_frame(file, frame_indices, convert)
            return [self._parse_frame(file, fi, convert) for fi in frame_indices]
        finally:
            # Close the private handle even if parsing failed
            if private_handle:
                file.close()
# NOTE(review): stray "[docs]" marker removed (looks like a Sphinx viewcode artifact).
class BaseWriter:
"""
Base class for topology and trajectory writers.
Subclasses must implement the :meth:`open` and :meth:`close` methods
to handle the opening and closing of the file.
Parameters
----------
filename : `str` or `pathlib.Path`, positional-only
Filename or path to the topology or trajectory file.
"""
def __init__(self, filename: str | Path) -> None:
# Resolve full path to file
self._filename = Path(filename).resolve()
# Create finalizer
self._finalizer = weakref.finalize(self, self.close)
def __enter__(self) -> Self:
return self
def __exit__(
self,
exc_type: type | None,
exc_value: BaseException | None,
traceback: TracebackType | None,
) -> None:
self._finalizer()
[docs]
@abstractmethod
def open(self) -> None:
"""
Opens the topology or trajectory file and stores a handle to it.
"""
pass
[docs]
@abstractmethod
def close(self) -> None:
"""
Closes the topology or trajectory file and deletes the handle.
"""
pass
# NOTE(review): stray "[docs]" marker removed (looks like a Sphinx viewcode artifact).
class BaseTopologyWriter(BaseWriter): # TODO
    """
    Base class for topology writers.

    Placeholder: the class body currently defines nothing beyond what
    it inherits from :class:`BaseWriter`.
    """
# NOTE(review): stray "[docs]" marker removed (looks like a Sphinx viewcode artifact).
class BaseTrajectoryWriter(BaseWriter):
    """
    Base class for trajectory writers.

    Subclasses must set values for the :attr:`_FORMAT`,
    :attr:`_EXTENSIONS`, and :attr:`_units` attributes and implement
    the :meth:`__repr__`, :meth:`_write_frame`, :meth:`_write_frames`,
    :meth:`open`, and :meth:`close` methods.

    Parameters
    ----------
    filename : `str` or `pathlib.Path`
        Filename or path to the trajectory file.

    **kwargs
        Accepted for subclass constructors; ignored by this class.
    """

    _EXTENSIONS: set[str]
    _FORMAT: str
    _units: dict[str, U_]
    # _reduced: bool

    def __init__(self, filename: str | Path, **kwargs) -> None:
        super().__init__(filename)

    @abstractmethod
    def __repr__(self) -> str:
        pass

    def __str__(self) -> str:
        # Must return a string: returning None makes str(writer) raise
        # TypeError. Mirrors the prefix used by the reader classes.
        # TODO: Include more details once the writer API is finalized.
        return f"{self.__class__.__name__}: '{self._filename.name}'"

    @staticmethod
    def _get_supported_formats() -> dict[str, type[BaseTrajectoryWriter]]:
        """
        Supported trajectory formats.

        Returns
        -------
        formats : `dict`
            Mapping from each writer's :attr:`_FORMAT` to the writer
            class. Only direct subclasses of this class are listed,
            and each must define :attr:`_FORMAT`.
        """
        # Enumerate writer subclasses (not reader subclasses)
        return {r._FORMAT: r for r in BaseTrajectoryWriter.__subclasses__()}

    @abstractmethod
    def _write_frame(self, *args, convert_units: bool, **kwargs) -> None:
        """
        Writes a single frame to the trajectory file.

        Parameters
        ----------
        *args, **kwargs
            Frame data; the exact arguments are defined by subclasses.

        convert_units : `bool`, keyword-only
            Presumably specifies whether the data must be converted to
            the units of the output format before writing; confirm
            against the concrete writers.
        """
        pass

    @abstractmethod
    def _write_frames(self, *args, convert_units: bool, **kwargs) -> None:
        """
        Writes multiple frames to the trajectory file.

        Parameters
        ----------
        *args, **kwargs
            Frame data; the exact arguments are defined by subclasses.

        convert_units : `bool`, keyword-only
            See :meth:`_write_frame`.
        """
        pass # TODO: Call _write_frame.