"""Handle file input/output in parllel HDF5 fashion
"""
import numpy as np
import h5py
import os
import logging
import getpass
from mpi4py import MPI
from .logger import Logger, get_version
[docs]class OutDataset:
"""HDF5 dataset handler for file output"""
def __init__(
self,
dest_directory,
config,
double_out=False,
disable_mpio=False,
comm=MPI.COMM_WORLD,
):
"""Constructor
Parameters
----------
dest_directory : str
Ouput directory for saving data.
config : Config
Configuration object.
double_out : bool, optional
If :code:`True`, the output HDF5 objects are written in eight byte
floating point representation. Otherwise, four byte single
precision is used.
disable_mpio : bool, optional
If :code:`True`, disables parallel MPI-enabled HDF5 file output.
This is a compatibility option used if HDF5 is not compiled with
MPI support. This makes everything much harder by splitting all
output in one file per MPI rank, and having an MPI-enabled HDF5
library is **highly recommended**.
comm : mpi4py.Comm
MPI communicator to use for rank commuication.
"""
self.disable_mpio = disable_mpio
self.config = config
if double_out:
self.float_dtype = "float64"
else:
self.float_dtype = "float32"
if disable_mpio:
self.file = h5py.File(
os.path.join(
dest_directory, f"sim.hdf5-{comm.rank:6d}-of-{comm.size:6d}"
),
"w",
)
else:
self.file = h5py.File(
os.path.join(dest_directory, "sim.H5"), "w", driver="mpio", comm=comm
)
[docs] def is_open(self, comm=MPI.COMM_WORLD):
"""Check if HDF5 output file is open
Parameters
----------
comm : mpi4py.Comm
MPI communicator to use for rank communication.
"""
comm.Barrier()
return self.file.__bool__()
[docs] def close_file(self, comm=MPI.COMM_WORLD):
"""Closes the HDF5 output file
Parameters
----------
comm : mpi4py.Comm
MPI communicator to use for rank communication.
"""
comm.Barrier()
self.file.close()
[docs] def flush(self):
"""Flushes output buffers, forcing file writes"""
self.file.flush()
[docs]def setup_time_dependent_element(
name, parent_group, n_frames, shape, dtype, units=None
):
"""Helper function for setting up time-dependent HDF5 group datasets
All output groups must adhere to the H5MD standard, meaning a structure of
| ┗━ **group** particle group (e.g. :code:`all`) or :code:`observables` group
| ┗━ **group** time-dependent data
| ┣━ **dataset** :code:`step` :code:`shape=(n_frames,)`
| ┣━ **dataset** :code:`time` :code:`shape=(n_frames,)`
| ┗━ **dataset** :code:`value` :code:`shape=(n_frames, *)`
is necessary.
References
----------
H5MD specification :
https://www.nongnu.org/h5md/h5md.html
""" # noqa: E501
group = parent_group.create_group(name)
step = group.create_dataset("step", (n_frames,), "int32")
time = group.create_dataset("time", (n_frames,), "float32")
value = group.create_dataset("value", (n_frames, *shape), dtype)
if units is not None:
value.attrs["unit"] = units
time.attrs["unit"] = "ps"
return group, step, time, value
[docs]def store_static(
h5md,
rank_range,
names,
types,
indices,
config,
bonds_2_atom1,
bonds_2_atom2,
molecules=None,
velocity_out=False,
force_out=False,
charges=False,
dielectrics=False,
plumed_out=False,
comm=MPI.COMM_WORLD,
):
"""Outputs all static time-independent quantities to the HDF5 output file
Parameters
----------
h5md : OutDataset
HDF5 dataset handler.
rank_range : list[int]
Start and end indices for global arrays for each MPI rank.
names : (N,) numpy.ndarray
Array of names for :code:`N` particles.
types : (N,) numpy.ndarray
Array of type indices for :code:`N` particles.
indices : (N,) numpy.ndarray
Array of indices for :code:`N` particles.
config : Config
Configuration object.
bonds_2_atom1 : (B,) numpy.ndarray
Array of indices of the first particle for :code:`B` total
two-particle bonds.
bonds_2_atom2 : (B,) numpy.ndarray
Array of indices of the second particle for :code:`B` total
two-particle bonds.
molecules : (N,) numpy.ndarray, optional
Array of integer molecule affiliation for each of :code:`N` particles.
Global (across all MPI ranks) or local (local indices on this MPI rank
only) may be used, both, without affecting the result.
velocity_out : bool, optional
If :code:`True`, velocities are written to output HDF5 file.
force_out : bool, optional
If :code:`True`, forces are written to output HDF5 file.
charges : (N,) numpy.ndarray
Array of particle charge values for :code:`N` particles.
dielectrics : (N,) numpy.ndarray
Array of particle relative dielectric values for :code:`N` particles.
plumed_out : bool, optional
If :code:`True`, PLUMED bias is written to output HDF5 file.
comm : mpi4py.Comm
MPI communicator to use for rank commuication.
See also
--------
prepare_bonds :
Constructs two-, three-, and four-particle bonds from toplogy input
file and bond configuration information.
distribute_input :
Distributes input arrays onto MPI ranks, attempting load balancing.
"""
dtype = h5md.float_dtype
h5md_group = h5md.file.create_group("/h5md")
h5md.h5md_group = h5md_group
h5md.observables = h5md.file.create_group("/observables")
h5md.connectivity = h5md.file.create_group("/connectivity")
h5md.parameters = h5md.file.create_group("/parameters")
h5md_group.attrs["version"] = np.array([1, 1], dtype=int)
author_group = h5md_group.create_group("author")
author_group.attrs["name"] = np.string_(getpass.getuser())
creator_group = h5md_group.create_group("creator")
creator_group.attrs["name"] = np.string_("Hylleraas MD")
# Get HyMD version
creator_group.attrs["version"] = np.string_(get_version())
h5md.particles_group = h5md.file.create_group("/particles")
h5md.all_particles = h5md.particles_group.create_group("all")
mass = h5md.all_particles.create_dataset("mass", (config.n_particles,), dtype)
mass[...] = config.mass
if charges is not False:
charge = h5md.all_particles.create_dataset(
"charge", (config.n_particles,), dtype="float32"
)
charge[indices] = charges
if dielectrics is not False:
dielectric = h5md.all_particles.create_dataset(
"dielectric", (config.n_particles,), dtype="float32"
)
dielectric[indices] = dielectrics
box = h5md.all_particles.create_group("box")
box.attrs["dimension"] = 3
box.attrs["boundary"] = np.array(
[np.string_(s) for s in 3 * ["periodic"]], dtype="S8"
)
n_frames = config.n_steps // config.n_print
if np.mod(config.n_steps - 1, config.n_print) != 0:
n_frames += 1
if np.mod(config.n_steps, config.n_print) == 1:
n_frames += 1
if n_frames == config.n_steps:
n_frames += 1
species = h5md.all_particles.create_dataset(
"species",
(config.n_particles,),
dtype="i",
)
(
_,
h5md.positions_step,
h5md.positions_time,
h5md.positions,
) = setup_time_dependent_element(
"position",
h5md.all_particles,
n_frames,
(config.n_particles, 3),
dtype,
units="nm",
)
if velocity_out:
(
_,
h5md.velocities_step,
h5md.velocities_time,
h5md.velocities,
) = setup_time_dependent_element(
"velocity",
h5md.all_particles,
n_frames,
(config.n_particles, 3),
dtype,
units="nm ps-1",
)
if force_out:
(
_,
h5md.forces_step,
h5md.forces_time,
h5md.forces,
) = setup_time_dependent_element(
"force",
h5md.all_particles,
n_frames,
(config.n_particles, 3),
dtype,
units="kJ mol-1 nm-1",
)
(
_,
h5md.total_energy_step,
h5md.total_energy_time,
h5md.total_energy,
) = setup_time_dependent_element(
"total_energy",
h5md.observables,
n_frames,
(1,),
dtype,
units="kJ mol-1", # noqa: E501
)
(
_,
h5md.kinetc_energy_step,
h5md.kinetc_energy_time,
h5md.kinetc_energy,
) = setup_time_dependent_element(
"kinetic_energy",
h5md.observables,
n_frames,
(1,),
dtype,
units="kJ mol-1", # noqa: E501
)
(
_,
h5md.potential_energy_step,
h5md.potential_energy_time,
h5md.potential_energy,
) = setup_time_dependent_element(
"potential_energy",
h5md.observables,
n_frames,
(1,),
dtype,
units="kJ mol-1", # noqa: E501
)
(
_,
h5md.bond_energy_step,
h5md.bond_energy_time,
h5md.bond_energy,
) = setup_time_dependent_element(
"bond_energy",
h5md.observables,
n_frames,
(1,),
dtype,
units="kJ mol-1", # noqa: E501
)
(
_,
h5md.angle_energy_step,
h5md.angle_energy_time,
h5md.angle_energy,
) = setup_time_dependent_element(
"angle_energy",
h5md.observables,
n_frames,
(1,),
dtype,
units="kJ mol-1", # noqa: E501
)
(
_,
h5md.dihedral_energy_step,
h5md.dihedral_energy_time,
h5md.dihedral_energy,
) = setup_time_dependent_element(
"dihedral_energy",
h5md.observables,
n_frames,
(1,),
dtype,
units="kJ mol-1", # noqa: E501
)
(
_,
h5md.field_energy_step,
h5md.field_energy_time,
h5md.field_energy,
) = setup_time_dependent_element(
"field_energy",
h5md.observables,
n_frames,
(1,),
dtype,
units="kJ mol-1", # noqa: E501
)
if charges is not False:
(
_,
h5md.field_q_energy_step,
h5md.field_q_energy_time,
h5md.field_q_energy,
) = setup_time_dependent_element(
"field_q_energy",
h5md.observables,
n_frames,
(1,),
dtype,
units="kJ mol-1", # noqa: E501
) # <-------- xinmeng
if plumed_out is not False:
(
_,
h5md.plumed_bias_step,
h5md.plumed_bias_time,
h5md.plumed_bias,
) = setup_time_dependent_element(
"plumed_bias", h5md.observables, n_frames, (1,), dtype, units="kJ mol-1" # noqa: E501
)
(
_,
h5md.total_momentum_step,
h5md.total_momentum_time,
h5md.total_momentum,
) = setup_time_dependent_element( # noqa: E501
"total_momentum",
h5md.observables,
n_frames,
(3,),
dtype,
units="nm g ps-1 mol-1",
)
(
_,
h5md.angular_momentum_step,
h5md.angular_momentum_time,
h5md.angular_momentum,
) = setup_time_dependent_element( # noqa: E501
"angular_momentum",
h5md.observables,
n_frames,
(3,),
dtype,
units="nm+2 g ps-1 mol-1",
)
(
_,
h5md.torque_step,
h5md.torque_time,
h5md.torque,
) = setup_time_dependent_element( # noqa: E501
"torque",
h5md.observables,
n_frames,
(3,),
dtype,
units="kJ nm+2 mol-1",
)
(
_,
h5md.temperature_step,
h5md.temperature_time,
h5md.temperature,
) = setup_time_dependent_element(
"temperature", h5md.observables, n_frames, (3,), dtype, units="K"
)
(
_,
h5md.thermostat_work_step,
h5md.thermostat_work_time,
h5md.thermostat_work,
) = setup_time_dependent_element(
"thermostat_work",
h5md.observables,
n_frames,
(1,),
"float32",
units="kJ mol-1", # noqa: E501
)
(
_,
h5md.pressure_step,
h5md.pressure_time,
h5md.pressure,
) = setup_time_dependent_element(
"pressure", h5md.observables, n_frames, (18,), "float32", units="Bar"
)
(_, h5md.box_step, h5md.box_time, h5md.box_value,) = setup_time_dependent_element(
"edges", box, n_frames, (3, 3), "float32", units="nm"
)
ind_sort = np.argsort(indices)
for i in ind_sort:
species[indices[i]] = config.name_to_type_map[names[i].decode("utf-8")]
h5md.parameters.attrs["config.toml"] = np.string_(str(config))
vmd_group = h5md.parameters.create_group("vmd_structure")
index_of_species = vmd_group.create_dataset(
"indexOfSpecies", (config.n_types,), "i"
)
index_of_species[:] = np.array(list(range(config.n_types)))
# VMD-h5mdplugin maximum name/type name length is 16 characters (for
# whatever reason [VMD internals?]).
name_dataset = vmd_group.create_dataset("name", (config.n_types,), "S16")
type_dataset = vmd_group.create_dataset("type", (config.n_types,), "S16")
if molecules is not None:
resid_dataset = vmd_group.create_dataset(
"resid",
(config.n_particles,),
"i",
)
# Change this
for i, n in config.type_to_name_map.items():
name_dataset[i] = np.string_(n[:16])
if n == "W":
type_dataset[i] = np.string_("solvent")
else:
type_dataset[i] = np.string_("membrane")
total_bonds = comm.allreduce(len(bonds_2_atom1), MPI.SUM)
n_bonds_local = len(bonds_2_atom1)
receive_buffer = comm.gather(n_bonds_local, root=0)
n_bonds_global = None
if comm.Get_rank() == 0:
n_bonds_global = receive_buffer
n_bonds_global = np.array(comm.bcast(n_bonds_global, root=0))
rank_bond_start = np.sum(n_bonds_global[: comm.Get_rank()])
bonds_from = vmd_group.create_dataset("bond_from", (total_bonds,), "i")
bonds_to = vmd_group.create_dataset("bond_to", (total_bonds,), "i")
for i in range(n_bonds_local):
a = bonds_2_atom1[i]
b = bonds_2_atom2[i]
bonds_from[rank_bond_start + i] = indices[a] + 1
bonds_to[rank_bond_start + i] = indices[b] + 1
if molecules is not None:
resid_dataset[indices[ind_sort]] = molecules
# store data old vs
"""
h5md, step, frame, indices, positions, velocities, forces, box_size,
temperature, kinetic_energy, bond2_energy, bond3_energy, bond4_energy,
field_energy, field_q_energy, time_step, config, velocity_out=False,
force_out=False, charge_out=False, dump_per_particle=False,
"""
[docs]def store_data(
h5md,
step,
frame,
indices,
positions,
velocities,
forces,
box_size,
temperature,
pressure,
kinetic_energy,
bond2_energy,
bond3_energy,
bond4_energy,
field_energy,
field_q_energy,
plumed_bias,
time_step,
config,
velocity_out=False,
force_out=False,
charge_out=False,
plumed_out=False,
dump_per_particle=False,
comm=MPI.COMM_WORLD,
):
"""Writes time-step data to HDF5 output file
Handles all quantities which change during simulation, as opposed to
static quanitities (see :code:`store_static`).
Parameters
----------
h5md : OutDataset
HDF5 dataset handler.
step : int
Step number.
frame : int
Output frame number (:code:`step // n_print`).
indices : (N,) numpy.ndarray
Array of indices for :code:`N` particles.
positions : (N,) numpy.ndarray
Array of positions for :code:`N` particles in :code:`D` dimensions.
velocities : (N,) numpy.ndarray
Array of velocities for :code:`N` particles in :code:`D` dimensions.
forces : (N,) numpy.ndarray
Array of forces for :code:`N` particles in :code:`D` dimensions.
box_size : (D,) numpy.ndarray
Array containing the simulation box size.
temperature : float
Calculated instantaneous temperature.
kinetic_energy : float
Calculated instantaneous kinetic energy.
bond2_energy : float
Calculated instantaneous harmonic two-particle bond energy.
bond3_energy : float
Calculated instantaneous harmonic angular three-particle bond energy.
bond4_energy : float
Calculated instantaneous dihedral four-particle torsion energy.
field_energy : float
Calculated instantaneous particle-field energy.
field_q_energy : float
Calculated instantaneous electrostatic energy.
plumed_bias : float
PLUMED instantaneous bias energy.
time_step : float
Value of the time step.
config : Config
Configuration object.
velocity_out : bool, optional
If :code:`True`, velocities are written to output HDF5 file.
force_out : bool, optional
If :code:`True`, forces are written to output HDF5 file.
charge_out : bool, optional
If :code:`True`, electrostatic energies are written to the output
HDF5 file.
plumed_out : bool, optional
If :code:`True`, PLUMED bias is written to the output HDF5 file.
dump_per_particle : bool, optional
If :code:`True`, all quantities are written **per particle**.
comm : mpi4py.Comm
MPI communicator to use for rank commuication.
See also
--------
store_static :
Outputs all static time-independent quantities to the HDF5 output file
"""
for dset in (
h5md.positions_step,
h5md.total_energy_step,
h5md.potential_energy_step,
h5md.kinetc_energy_step,
h5md.bond_energy_step,
h5md.angle_energy_step,
h5md.dihedral_energy_step,
h5md.field_energy_step,
h5md.total_momentum_step,
h5md.angular_momentum_step,
h5md.torque_step,
h5md.temperature_step,
h5md.pressure_step,
h5md.box_step,
h5md.thermostat_work_step,
):
dset[frame] = step
for dset in (
h5md.positions_time,
h5md.total_energy_time,
h5md.potential_energy_time,
h5md.kinetc_energy_time,
h5md.bond_energy_time,
h5md.angle_energy_time,
h5md.dihedral_energy_time,
h5md.field_energy_time,
h5md.total_momentum_time,
h5md.angular_momentum_time,
h5md.torque_time,
h5md.temperature_time,
h5md.pressure_time,
h5md.box_time,
h5md.thermostat_work_time,
):
dset[frame] = step * time_step
if velocity_out:
h5md.velocities_step[frame] = step
h5md.velocities_time[frame] = step * time_step
if force_out:
h5md.forces_step[frame] = step
h5md.forces_time[frame] = step * time_step
if charge_out:
h5md.field_q_energy_step[frame] = step
h5md.field_q_energy_time[frame] = step * time_step
if plumed_out:
h5md.plumed_bias_step[frame] = step
h5md.plumed_bias_time[frame] = step * time_step
ind_sort = np.argsort(indices)
h5md.positions[frame, indices[ind_sort]] = positions[ind_sort]
if velocity_out:
h5md.velocities[frame, indices[ind_sort]] = velocities[ind_sort]
if force_out:
h5md.forces[frame, indices[ind_sort]] = forces[ind_sort]
if charge_out:
h5md.field_q_energy[frame] = field_q_energy
if plumed_out:
h5md.plumed_bias[frame] = plumed_bias
potential_energy = (
bond2_energy + bond3_energy + bond4_energy + field_energy + field_q_energy
)
total_momentum = config.mass * comm.allreduce(np.sum(velocities, axis=0), MPI.SUM)
angular_momentum = config.mass * comm.allreduce(
np.sum(np.cross(positions, velocities), axis=0), MPI.SUM
)
torque = config.mass * comm.allreduce(
np.sum(np.cross(positions, forces), axis=0), MPI.SUM
)
h5md.total_energy[frame] = kinetic_energy + potential_energy
h5md.potential_energy[frame] = potential_energy
h5md.kinetc_energy[frame] = kinetic_energy
h5md.bond_energy[frame] = bond2_energy
h5md.angle_energy[frame] = bond3_energy
h5md.dihedral_energy[frame] = bond4_energy
h5md.field_energy[frame] = field_energy
h5md.total_momentum[frame, :] = total_momentum
h5md.angular_momentum[frame, :] = angular_momentum
h5md.torque[frame, :] = torque
h5md.temperature[frame] = temperature
h5md.pressure[frame] = pressure
for d in range(3):
h5md.box_value[frame, d, d] = box_size[d]
h5md.thermostat_work[frame] = config.thermostat_work
fmt_ = [
"step",
"time",
"temp",
"tot E",
"kin E",
"pot E",
"field E",
"elec E",
"bond E",
"ang E",
"dih E",
"bias E",
"Px",
"Py",
"Pz",
"ΔH" if config.target_temperature else "ΔE",
]
fmt_ = np.array(fmt_)
# create mask to show only energies != 0
en_array = np.array([
field_energy,
field_q_energy,
bond2_energy,
bond3_energy,
bond4_energy,
plumed_bias,
])
mask = np.full_like(fmt_, True, dtype=bool)
mask[range(6,12)] = en_array != 0.
header_ = fmt_[mask].shape[0] * "{:>13}"
if config.initial_energy is None:
fmt_[-1] = ""
divide_by = 1.0
if dump_per_particle:
for i in range(3, 9):
fmt_[i] = fmt_[i][:-2] + "E/N"
fmt_[-1] += "/N"
divide_by = config.n_particles
total_energy = kinetic_energy + potential_energy
if config.initial_energy is not None:
if config.target_temperature:
H_tilde = total_energy - config.initial_energy - config.thermostat_work
else:
H_tilde = total_energy - config.initial_energy
else:
H_tilde = 0.0
header = header_.format(*fmt_[mask])
data_fmt = f'{"{:13}"}{(fmt_[mask].shape[0]-1) * "{:13.5g}" }'
all_data = [
step,
time_step * step,
temperature,
total_energy / divide_by,
kinetic_energy / divide_by,
potential_energy / divide_by,
field_energy / divide_by,
field_q_energy / divide_by,
bond2_energy / divide_by,
bond3_energy / divide_by,
bond4_energy / divide_by,
plumed_bias / divide_by,
total_momentum[0] / divide_by,
total_momentum[1] / divide_by,
total_momentum[2] / divide_by,
H_tilde / divide_by,
]
data = data_fmt.format(*[val for i, val in enumerate(all_data) if mask[i]])
Logger.rank0.log(logging.INFO, ("\n" + header + "\n" + data))