"""
OpenMM utility functions
========================
.. moduleauthor:: Benjamin Ye <GitHub: @bbye98>
This module contains OpenMM-related utility functions.
"""
from datetime import datetime
import itertools
import logging
from typing import Union
import numpy as np
import openmm
from openmm import unit
def _create_context(
system: openmm.System, integrator: openmm.Integrator,
positions: np.ndarray[float], platform: openmm.Platform,
properties: dict) -> openmm.Context:
r"""
Creates an OpenMM Context by cloning the Integrator passed to this
function. Useful for benchmarking different simulation systems.
Parameters
----------
system : `openmm.System`
OpenMM molecular system.
integrator : `openmm.Integrator`
OpenMM integrator or thermostat.
positions : `np.ndarray` or `unit.Quantity`
Initial positions of the :math:`N` particles in the system.
**Shape**: :math:`(N,\,3)`.
**Reference unit**: :math:`\mathrm{nm}`.
platform : `openmm.Platform`
OpenMM platform.
properties : `dict`
Dictionary of platform-specific properties.
Returns
-------
context : `openmm.Context`
OpenMM simulation context.
"""
integrator = openmm.XmlSerializer.clone(integrator)
context = openmm.Context(system, integrator, platform, properties)
context.setPositions(positions)
return context
def _benchmark_integrator(context: openmm.Context, steps: int) -> float:
"""
Benchmarks the performance of an OpenMM Integrator.
Parameters
----------
context : `openmm.Context`
OpenMM simulation context.
Returns
-------
time : `float`
Elapsed time, in seconds.
"""
start = datetime.now()
context.getIntegrator().step(steps)
return (datetime.now() - start).total_seconds()
[docs]
def optimize_pme(
system: openmm.System, integrator: openmm.Integrator,
positions: Union[np.ndarray[float], unit.Quantity],
platform: openmm.Platform, properties: dict,
min_cutoff: Union[float, unit.Quantity],
max_cutoff: Union[float, unit.Quantity], *,
pmeforce: openmm.NonbondedForce = None, cpu_pme: bool = True,
target: float = 10, target_std: float = None, window: int = 3,
fastest: int = 5, rerun: int = 2, verbose: bool = True
) -> tuple[unit.Quantity, bool]:
"""
Runs a series of simulations using different parameters to determine
the optimal configuration for evaluating electrostatic interactions
with the particle mesh Ewald (PME) method on a GPU (CUDA or OpenCL).
The cutoff distance for the Coulomb potential can be freely varied
with no accuracy penalty since OpenMM automatically selects internal
parameters to satisfy the specified error tolerance. However, it may
affect the accuracy of other nonbonded interactions, so care must be
taken to ensure the optimal Coulomb potential cutoff is compatible
with the other pair potentials in the system.
In certain cases, OpenMM can perform better with the reciprocal
space calculations being done on the CPU while the direct space
calculations are being evaluated on the GPU.
Parameters
----------
system : `openmm.System`
OpenMM molecular system.
integrator : `openmm.Integrator`
OpenMM integrator or thermostat.
positions : `np.ndarray` or `unit.Quantity`
Initial positions of the :math:`N` particles in the system.
**Shape**: :math:`(N,\\,3)`.
**Reference unit**: :math:`\\mathrm{nm}`.
platform : `openmm.Platform`
OpenMM platform.
properties : `dict`
Dictionary of platform-specific properties.
min_cutoff : `float` or `unit.Quantity`
Minimum cutoff distance to test.
**Reference unit**: :math:`\\mathrm{nm}`.
max_cutoff : `float` or `unit.Quantity`
Maximum cutoff distance to test.
**Reference unit**: :math:`\\mathrm{nm}`.
pmeforce : `openmm.NonbondedForce` or `openmm.AmoebaMultipoleForce`, \
keyword-only, optional
Pair potential with electrostatic interactions that are
evaluated using PME.
cpu_pme : `bool`, keyword-only, default: :code:`True`
Determines whether CPU PME should be benchmarked.
target : `float`, keyword-only, default: :code:`10`
Target simulation time for each test run, in seconds.
target_std : `float`, keyword-only, optional
Allowed variability for `target`, in seconds. If set to a value
that is too small, the target simulation time may never be
satisfied. If not specified, it is set to 10% of `target`.
window : `int`, keyword-only, default: :code:`3`
Number of previous runs to look at before deciding whether to
stop testing larger cutoffs.
fastest : `int`, keyword-only, default: :code:`5`
Number of fastest configurations to retest before deciding on
the optimal cutoff distance and hardware (architecture).
rerun : `int`, keyword-only, default :code:`2`
Number of reruns to complete for the fastest preliminary runs.
verbose : `bool`, keyword-only, default: :code:`True`
Determines whether detailed progress is shown.
Returns
-------
cutoff : `unit.Quantity`
Optimal cutoff distance.
**Reference unit**: :math:`\\mathrm{nm}`.
cpu_pme : `bool`
Specifies whether to use the CPU to perform reciprocal space
calculations.
"""
# Set up logger
logging.basicConfig(format="{asctime} | {levelname:^8s} | {message}",
style="{",
level=logging.INFO if verbose else logging.WARNING)
# Get information about the pair potential of interest
if pmeforce is None:
for force in system.getForces():
if isinstance(force, (openmm.NonbondedForce,
openmm.AmoebaMultipoleForce)):
pmeforce = force
break
if pmeforce.getNonbondedMethod() != openmm.NonbondedForce.PME:
raise ValueError("The provided (or guessed) pair potential is "
"not being evaluated using the particle mesh "
"Ewald (PME) method.")
cpu_pme &= isinstance(pmeforce, openmm.NonbondedForce) \
and platform.supportsKernels(["CalcPmeReciprocalForce"])
tol = pmeforce.getEwaldErrorTolerance()
# Determine the number of timesteps to run for each cutoff distance
logging.info("Determining a reasonable number of timesteps for PME optimizer...")
pmeforce.setCutoffDistance(np.sqrt(min_cutoff * max_cutoff))
properties["UseCpuPme"] = "false"
context = _create_context(system, integrator, positions, platform,
properties)
if target_std is None:
target_std = 0.1 * target
time_width = max(9, np.ceil(np.log10(target)).astype(int) + 7)
lb, ub = target - target_std, target + target_std
steps, time = 20, 0
while True:
time = _benchmark_integrator(context, steps)
logging.info(f" GPU: {steps:14,} ts "
f"===> {time:{time_width}.5f} s elapsed")
if lb < time < ub:
break
steps = int(target * steps / time)
if cpu_pme:
properties["UseCpuPme"] = "true"
context = _create_context(system, integrator, positions, platform,
properties)
steps_cpu, time = 20, 0
while True:
time = _benchmark_integrator(context, steps_cpu)
logging.info(f" CPU: {steps_cpu:14,} ts "
f"===> {time:{time_width}.5f} s elapsed")
if lb < time < ub:
break
steps_cpu = int(target * steps_cpu / time)
steps = min(steps, steps_cpu)
steps = np.round(steps, 2 - np.ceil(np.log10(steps)).astype(int))
logging.info(f"Starting PME optimizer (using {steps:,} timesteps)...")
# Build list of cutoff distances to test
if isinstance(min_cutoff, unit.Quantity):
min_cutoff = min_cutoff.value_in_unit(unit.nanometer)
if isinstance(max_cutoff, unit.Quantity):
max_cutoff = max_cutoff.value_in_unit(unit.nanometer)
cutoffs = {"gpu": {min_cutoff}}
if cpu_pme:
cutoffs["cpu"] = {min_cutoff}
for dim in [v[i].value_in_unit(unit.nanometer)
for i, v in enumerate(system.getDefaultPeriodicBoxVectors())]:
# Iterate through possible grid sizes
for n_mesh in itertools.count(start=5):
# Check if grid size is legal for FFT
check = n_mesh
for factor in (2, 3, 5, 7):
while check > 1 and check % factor == 0:
check /= factor
if check not in (1, 11, 13):
continue
# Compute smallest cutoff that will give the current grid
# size
alpha = 1.5 * n_mesh * tol ** 0.2 / dim
cutoff = np.round(np.sqrt(-np.log(2 * tol) / alpha), 3)
if cutoff < min_cutoff:
break
if cutoff < max_cutoff:
if cpu_pme:
cutoffs["cpu"].add(cutoff)
if check == 1:
cutoffs["gpu"].add(cutoff)
# Get preliminary times for the different architectures and cutoffs
cutoff_width = max(7, np.ceil(
np.log10(max(max(v) for v in cutoffs.values()))
).astype(int) + 6)
times = {}
for arch, cut in cutoffs.items():
cutoffs[arch] = np.array(sorted(cut))
times[arch] = np.empty(cutoffs[arch].shape)
times[arch][:] = np.nan
for i, cutoff in enumerate(cutoffs[arch]):
pmeforce.setCutoffDistance(cutoff)
properties["UseCpuPme"] = str(arch == "cpu").lower()
context = _create_context(system, integrator, positions, platform,
properties)
times[arch][i] = _benchmark_integrator(context, steps)
logging.info(f" {arch.upper()}: {cutoff:{cutoff_width}.4f} nm cutoff "
f"===> {times[arch][i]:{time_width}.5f} s elapsed")
# Stop iteration if simulation is continuously getting slower
if i > window and np.all(times[arch][i - window:i] >
times[arch][i - window - 1:i - 1]):
break
best = sorted([t, c, a] for a in times.keys()
for c, t in zip(cutoffs[a], times[a]))[:fastest]
# Rerun the fastest configurations to ensure correct results
for i, (time, cutoff, arch) in enumerate(best):
pmeforce.setCutoffDistance(cutoff)
properties["UseCpuPme"] = str(arch == "cpu").lower()
context = _create_context(system, integrator, positions, platform,
properties)
# Replace preliminary time with median time from reruns
best[i][0] = sorted(
(time, *[_benchmark_integrator(context, steps) for _ in range(rerun)])
)[1]
best.sort()
# Display fastest configurations and timings
time_width = 8 + 2 * np.ceil(max(0, time_width - 8) // 2).astype(int)
cutoff_width = 11 + 2 * np.ceil(max(0, cutoff_width - 11) // 2).astype(int)
logging.info(f"PME optimization completed.\n"
f" Rank | {'Time (s)':^{time_width}} "
f"| {'Cutoff (nm)':^{cutoff_width}} | CPU PME\n"
f" ------|{'-' * (time_width + 2)}"
f"|{'-' * (cutoff_width + 2)}|---------\n " +
"\n ".join(f" {i + 1:>4} | {time:{time_width}.5f} |"
f" {cutoff:{cutoff_width}.4f} | {arch == 'cpu'}"
for i, (time, cutoff, arch) in enumerate(best)))
# Return optimized configuration
return best[0][1] * unit.nanometer, arch == "cpu"