Source code for hymd.input_parser

"""Parses and handles the configuration information provided for the simulation
"""
import copy
import tomli
import datetime
import logging
import warnings
import numpy as np
from mpi4py import MPI
from dataclasses import dataclass, field
from typing import List, Union, ClassVar
from .force import Bond, Angle, Dihedral, Chi, Dielectric_type, propensity_potential_coeffs
from .barostat import Target_pressure
from .logger import Logger


[docs]@dataclass class Config: """Configuration object Handles and verifies the simulation configuration specified in the configuration file. Attributes ---------- gas_constant : float Constant value of the gas constant, R (equivalently the Boltzmann constant) in the units used internally in HyMD. coulomb_constant : float Constant value of the Coulomb constant which converts electric field values to forces and electric potential values to energies in the units used internally in HyMD. n_steps: int Number of time steps in the simulation. time_step: float *Outer* time step used in the simulation. If the rRESPA intergrator is used, the *inner* time step (the time step used in the integration of intramolecular bending, stretching, and torsional forces) is :code:`time_step / respa_inner`. box_size : list[float] or (D,) numpy.ndarray Simulation box size of simulation in :code:`D` dimensions in units of nanometers. mesh_size : list[int] or int or numpy.ndarray Number of grid points used for the discrete density grid. sigma : float Filter width representing the effective coarse-graining level of the particles in the simulation. kappa : float Compressibility parameter used in the relaxed incompressibility term in the interaction energy functional. n_print : int, optional Frequency of trajectory/energy output to the H5MD trajectory/energy output file (in units of number of time steps). tau : float, optional The time scale of the CSVR thermostat coupling. start_temperature : float, optional Generate starting temperature by assigning all particle velocities randomly according to the Maxwell-Boltzmann distribution at :code:`start_temperature` Kelvin prior to starting the simulation. target_temperature : float, optional Couple the system to a heat bath at :code:`target_temperature` Kelvin. mass : float, optional Mass of the particles in the simulation. hamiltonian : str, optional Specifies the interaction energy functional :math:`W[\\tilde\\phi]` for use with the particle-field interactions. Options: :code:`SquaredPhi`, :code:`DefaultNohChi`, or :code:`DefaultWithChi`. domain_decomposition : int, optional Specifies the interval (in time steps) of domain decomposition exchange, involving all MPI ranks sending and receiving particles according to the particles’ positions in the integration box and the MPI ranks’ assigned domain. integrator : str, optional Specifies the time integrator used in the simulation. Options: :code:`velocity-verlet` or :code:`respa`. respa_inner : int, optional The number of inner time steps in the rRESPA integrator. This denotes the number of intramolecular force calculations (stretching, bending, torsional) are performed between each impulse applied from the field forces. file_name : str, optional File path of the parsed configuration file. name : str, optional Name for the simulation. tags : list[str], optional Tags for the simulation. bonds : list[Bond], optional Specifies harmonic stretching potentials between particles in the same molecule. angle_bonds : list[Angle], optional Specifies harmonic angular bending potentials between particles in the same molecule. dihedrals : list[Dihedral], optional Specifies four-particle torsional potentials by cosine series. chi : list[Chi], optional Specifies :math:`\\chi`-interaction parameters between particle species. n_particles : int, optional Specifies the total number of particles in the input. Optional keyword for validation, ensuring the input HDF5 topology has the correct number of particles and molecules. max_molecule_size : int, optional Maximum size of any single molecule in the system. Used to speed up distribution of particles onto MPI ranks in a parallel fashion. n_flush : int, optional Frequency of HDF5 write buffer flush, forcing trajectory/energy to be written to disk (in units of number of :code:`n_print`). thermostat_work : float Work performed by the thermostat on the system. thermostat_coupling_groups : list[str], optional Specifies individual groups coupling independently to the CSVR thermostat. E.g. in a system containing :code:`"A"`, :code:`"B"`, and :code:`"C"` type particles, :code:`thermostat_coupling_groups = [["A", "B"], ["C"],]` would thermalise types :code:`"A"` and :code:`"B"` together and couple :code:`"C"` type particles to a different thermostat (all individual thermostats are at the same temperature, i.e. target_temperature Kelvin). initial_energy : float Value of the total energy prior to the start of the simulation. cancel_com_momentum : int, optional If :code:`True`, the total linear momentum of the center of mass is removed before starting the simulation. If an integer is specifed, the total linear momentum of the center of mass is removed every :code:`remove_com_momentum` time steps. If :code:`False`, the linear momentum is never removed. coulombtype : str, optional Specifies the type of electrostatic Coulomb interactions in the system. The strength of the electrostatic forces is modulated by the relative dielectric constant of the simulation medium, specified with the :code:`dielectric_const` keyword. Charges for individual particles are specified in the structure/topology HDF5 input file, not in the configuration file. If no charges (or peptide backbone dipoles) are present, the electrostatic forces will not be calculated even if this keyword is set to :code:`"PIC_Spectral"`. dielectric_const : float, optional Specifies the relative dielectric constant of the simulation medium which regulates the strength of the electrostatic interactions. When using helical propensity dihedrals, this keyword must be specified—even if electrostatics are not included with the :code:`coulombtype` keyword. dielectric_type: list[float], optional Specifies the relative dielectric constant of the simulation medium which regulates the strength of the electrostatic interactions. The list assigns relative dielectric values to each bead type, and an anisotropic dielectric function is obtained from a weighted average. See also -------- hymd.input_parser.Bond : Two-particle bond type dataclass. hymd.input_parser.Angle : Three-particle bond type dataclass. hymd.input_parser.Dihedral : Four-particle bond type dataclass. """ gas_constant: ClassVar[float] = 0.0083144621 # kJ mol-1 K-1 coulomb_constant: ClassVar[float] = 138.935458 # kJ nm mol-1 e-2 n_steps: int time_step: float mesh_size: Union[Union[List[int], np.ndarray], int] sigma: float kappa: float dtype: np.dtype = None box_size: Union[List[float], np.ndarray] = None n_print: int = None tau: float = None start_temperature: Union[float, bool] = None target_temperature: Union[float, bool] = None mass: float = None hamiltonian: str = None domain_decomposition: Union[int, bool] = None integrator: str = None respa_inner: int = 1 file_name: str = "<config file path unknown>" name: str = None tags: List[str] = field(default_factory=list) bonds: List[Bond] = field(default_factory=list) angle_bonds: List[Angle] = field(default_factory=list) dihedrals: List[Dihedral] = field(default_factory=list) chi: List[Chi] = field(default_factory=list) n_particles: int = None max_molecule_size: int = None n_flush: int = None thermostat_work: float = 0.0 thermostat_coupling_groups: List[List[str]] = field(default_factory=list) initial_energy: float = None cancel_com_momentum: Union[int, bool] = False coulombtype: str = None convergence_type: str = None pol_mixing: float = None dielectric_const: float = None conv_crit: float = None dielectric_type: List[Dielectric_type] = field(default_factory=list) self_energy: float = None type_charges: Union[List[float], np.ndarray] = None # For NPT runs rho0: float = None a: float = None pressure: bool = False barostat: str = None barostat_type: str = None tau_p: float = None target_pressure: List[Target_pressure] = field(default_factory=list) n_b: int = None m: List[float] = field(default_factory=list) def __str__(self): target_pressure_str = "\ttarget_pressure:\n" + "".join( "\t\tP_L: " + f"{self.target_pressure.P_L}\n" + "\t\tP_N: " + f"{self.target_pressure.P_N}\n" ) bonds_str = "\tbonds:\n" + "".join( [ (f"\t\t{k.atom_1} {k.atom_2}: " f"{k.equilibrium}, {k.strength}\n") for k in self.bonds ] ) angle_str = "\tangle_bonds:\n" + "".join( [ ( f"\t\t{k.atom_1} {k.atom_2} {k.atom_3}: " + f"{k.equilibrium}, {k.strength}\n" ) for k in self.angle_bonds ] ) dihedrals_str = "\tdihedrals:\n" + "".join( [ ( f"\t\t{k.atom_1} {k.atom_2} {k.atom_3} {k.atom_4}: " # This might need to be fixed/made prettier, probably # there's an easier way + ( "\n\t\t" + " " * len(f"{k.atom_1} {k.atom_2} {k.atom_3} {k.atom_4}: ") ).join( map( str, [ [round(num, 3) for num in c_in] if isinstance(c_in, list) else c_in for c_in in k.coeffs ], ) ) + ( "\n\t\t" + " " * len(f"{k.atom_1} {k.atom_2} {k.atom_3} {k.atom_4}: ") ) + f"dih_type = {k.dih_type}\n" ) for k in self.dihedrals ] ) chi_str = "\tchi:\n" + "".join( [ (f"\t\t{k.atom_1} {k.atom_2}: " + f"{k.interaction_energy}\n") for k in self.chi ] ) """ If dielectric wanted as dictionary dielectric_type_str = "\tdielectric_type:\n" + "".join( [ (f"\t\t{k.atom_1}: " + f"{k.dielectric_value}\n") for k in self.dielectric_type ] ) """ thermostat_coupling_groups_str = "" if any(self.thermostat_coupling_groups): thermostat_coupling_groups_str = ( "\tthermostat_coupling_groups:\n" + "".join( [ "\t\t" + ", ".join([f"{n}" for n in ng]) + "\n" for ng in self.thermostat_coupling_groups ] ) ) ret_str = f'\n\n\tConfig: {self.file_name}\n\t{50 * "-"}\n' for k, v in self.__dict__.items(): if k not in ( "target_pressure", "bonds", "angle_bonds", "dihedrals", "chi", "thermostat_coupling_groups", ): ret_str += f"\t{k}: {v}\n" ret_str += ( target_pressure_str + bonds_str + angle_str + dihedrals_str + chi_str + thermostat_coupling_groups_str ) return ret_str
[docs]def read_config_toml(file_path): with open(file_path, "r") as in_file: file_content = in_file.read() return file_content
[docs]def parse_config_toml(toml_content, file_path=None, comm=MPI.COMM_WORLD): config_dict = {} # read toml to a dictionary toml_content = tomli.loads(toml_content) # Defaults = None for n in ( "box_size", "n_print", "tau", "start_temperature", "target_temperature", "mass", "hamiltonian", "domain_decomposition", "integrator", "name", "n_particles", "max_molecule_size", "coulombtype", "convergence_type", "dielectric_const", "pol_mixing", "conv_crit", "dielectric_type", "n_b", "box_size", "n_flush", "self_energy", "dtype", ): config_dict[n] = None # Defaults = [] for n in ( "bonds", "angle_bonds", "dihedrals", "chi", "tags", "m", "dielectric_type", "target_pressure", "type_charges", ): config_dict[n] = [] # Defaults: bool config_dict["pressure"] = False # Flatten the .toml dictionary, ignoring the top level [tag] directives (if # any). for k, v in toml_content.items(): if isinstance(v, dict): if k == "nn": # Don't parse diff-hymd optimization options continue for nested_k, nested_v in v.items(): config_dict[nested_k] = nested_v else: config_dict[k] = v for k, v in config_dict.items(): if k == "bonds": config_dict["bonds"] = [None] * len(v) for i, b in enumerate(v): config_dict["bonds"][i] = Bond( atom_1=b[0], atom_2=b[1], equilibrium=b[2], strength=b[3], ) if k == "angle_bonds": config_dict["angle_bonds"] = [None] * len(v) for i, b in enumerate(v): config_dict["angle_bonds"][i] = Angle( atom_1=b[0], atom_2=b[1], atom_3=b[2], equilibrium=b[3], strength=b[4], ) if k == "dihedrals": config_dict["dihedrals"] = [None] * len(v) for i, b in enumerate(v): try: dih_type = int(b[2][0]) except IndexError: Logger.rank0.log( logging.WARNING, "Dihedral type not provided, defaulting to 0." ) dih_type = 0 # Probably it's better to move this in check_dihedrals? wrong_len = len(b[1]) not in (1, 2) wrong_type_1 = len(b[1]) == 1 and not isinstance(b[1][0], float) wrong_type_2 = len(b[1]) == 2 and not isinstance(b[1][0], list) if wrong_len or wrong_type_1 or wrong_type_2: err_str = ( "The coefficients specified for the dihedral type " "(0) do not match the correct structure. Either " "use [lambda] or [[cn_prop], [dn_prop]], or select" " the correct dihedral type." ) Logger.rank0.log(logging.ERROR, err_str) if comm.Get_rank() == 0: raise RuntimeError(err_str) # FIXME: this is messy af, I don't like it if dih_type == 0 and isinstance(b[1][0], (float, int)): coeff = propensity_potential_coeffs(b[1][0]) elif dih_type == 1 and len(b[1]) == 3: coeff = np.array( propensity_potential_coeffs(b[1][0][0]).tolist() + b[1][1:] ) elif dih_type == 2: coeff = np.array(b[1]) else: coeff = np.insert(np.array(b[1]), 2, np.zeros((2, 5)), axis=0) config_dict["dihedrals"][i] = Dihedral( atom_1=b[0][0], atom_2=b[0][1], atom_3=b[0][2], atom_4=b[0][3], coeffs=coeff, dih_type=dih_type, ) if k == "chi": config_dict["chi"] = [None] * len(v) for i, c in enumerate(v): c_ = sorted([c[0], c[1]]) config_dict["chi"][i] = Chi( atom_1=c_[0], atom_2=c_[1], interaction_energy=c[2] ) """ if k == "dielectric_type": config_dict["dielectric_type"] = [None] * len(v) for i, c in enumerate(v): c_ = sorted([c[0][0]]) config_dict["dielectric_type"][i] = Dielectric_type( atom_1=c_[0], dielectric_value=c[1][0] ) """ if k == "target_pressure": if len(v) == 2: config_dict["target_pressure"] = Target_pressure( P_L=v[0], P_N=v[1], # check condition # V array (still read as array?) ) elif len(v) == 1: config_dict["target_pressure"] = Target_pressure(P_L=v[0], P_N=None) else: config_dict["target_pressure"] = Target_pressure(P_L=None, P_N=None) if file_path is not None: config_dict["file_name"] = file_path for n in ( "n_steps", "time_step", "mesh_size", "sigma", "kappa" ): if n not in config_dict: err_str = ( f"No {n} specified in config file {file_path}. Unable to start" f" simulation." ) Logger.rank0.log(logging.ERROR, err_str) if comm.Get_rank() == 0: raise ValueError(err_str) return Config(**config_dict)
[docs]def check_n_particles(config, indices, comm=MPI.COMM_WORLD): n_particles = comm.allreduce(len(indices), MPI.SUM) if config.n_particles is None: config = copy.deepcopy(config) config.n_particles = n_particles info_str = ( f"No n_particles found in toml file {config.file_name}, defaulting" f" to indices.shape ({n_particles})" ) Logger.rank0.log(logging.INFO, info_str) return config if n_particles != config.n_particles: warn_str = ( f"n_particles in {config.file_name} ({config.n_particles}) does " "not match the shape of the indices array in the .HDF5 file " f"({n_particles}). Defaulting to using indices.shape " f"({n_particles})" ) Logger.rank0.log(logging.WARNING, warn_str) if comm.Get_rank() == 0: warnings.warn(warn_str) config = copy.deepcopy(config) config.n_particles = n_particles return config
[docs]def check_max_molecule_size(config, comm=MPI.COMM_WORLD): if config.max_molecule_size is None: info_str = ( f"No max_molecule_size found in toml file {config.file_name}, " f"defaulting to 201" ) Logger.rank0.log(logging.INFO, info_str) config = copy.deepcopy(config) config.max_molecule_size = 201 return config if config.max_molecule_size < 1: warn_str = ( f"max_molecule_size in {config.file_name} must be a positive " f"integer, not {config.max_molecule_size}, defaulting to 201" ) Logger.rank0.log(logging.WARNING, warn_str) if comm.Get_rank() == 0: warnings.warn(warn_str) config = copy.deepcopy(config) config.max_molecule_size = 201 return config return config
def _find_unique_names(config, names, comm=MPI.COMM_WORLD): unique_names = np.unique(names) receive_buffer = comm.gather(unique_names, root=0) gathered_unique_names = None if comm.Get_rank() == 0: gathered_unique_names = np.unique(np.concatenate(receive_buffer)) unique_names = comm.bcast(gathered_unique_names, root=0) unique_names = sorted([n.decode("UTF-8") for n in unique_names]) config.unique_names = unique_names config.n_types = len(unique_names) return config def _setup_type_to_name_map(config, names, types, comm=MPI.COMM_WORLD): if not hasattr(config, "unique_names"): config = _find_unique_names(config, names) name_to_type_ = {} for n, t in zip(names, types): n = n.decode("utf-8") if n not in name_to_type_: name_to_type_[n] = t receive_buffer = comm.gather(name_to_type_, root=0) gathered_dict = None if comm.Get_rank() == 0: gathered_dict = {} for d in receive_buffer: for k, v in d.items(): if k not in gathered_dict: gathered_dict[k] = v else: assert v == gathered_dict[k] name_to_type_map = comm.bcast(gathered_dict, root=0) config.name_to_type_map = name_to_type_map config.type_to_name_map = {v: k for k, v in name_to_type_map.items()} return config
[docs]def check_bonds(config, names, comm=MPI.COMM_WORLD): if not hasattr(config, "unique_names"): config = _find_unique_names(config, names) unique_names = config.unique_names for b in config.bonds: if b.atom_1 not in unique_names or b.atom_2 not in unique_names: missing_str = "" if b.atom_1 not in unique_names: if b.atom_2 not in unique_names: if b.atom_1 == b.atom_2: missing_str = f"no {b.atom_1} atoms" else: missing_str = f"neither {b.atom_1}, {b.atom_2} atoms" else: missing_str = f"no {b.atom_1} atoms" else: missing_str = f"no {b.atom_2} atoms" warn_str = ( f"Bond type {b.atom_1}--{b.atom_2} specified in " f"{config.file_name} but {missing_str} are present in the " f"specified system (names array)" ) Logger.rank0.log(logging.WARNING, warn_str) if comm.Get_rank() == 0: warnings.warn(warn_str) return config
[docs]def check_angles(config, names, comm=MPI.COMM_WORLD): if not hasattr(config, "unique_names"): config = _find_unique_names(config, names) unique_names = config.unique_names for a in config.angle_bonds: if ( a.atom_1 not in unique_names or a.atom_2 not in unique_names or a.atom_3 not in unique_names ): missing = [ a.atom_1 not in unique_names, a.atom_2 not in unique_names, a.atom_3 not in unique_names, ] missing_names = [ atom for i, atom in enumerate([a.atom_1, a.atom_2, a.atom_3]) if missing[i] ] missing_str = ", ".join(np.unique(missing_names)) warn_str = ( f"Angle bond type {a.atom_1}--{a.atom_2}--{a.atom_3} " f"specified in {config.file_name} but no {missing_str} atoms " f"are present in the specified system (names array)" ) Logger.rank0.log(logging.WARNING, warn_str) if comm.Get_rank() == 0: warnings.warn(warn_str) return config
[docs]def check_dihedrals(config, names, comm=MPI.COMM_WORLD): if not hasattr(config, "unique_names"): config = _find_unique_names(config, names) unique_names = config.unique_names for d in config.dihedrals: if ( d.atom_1 not in unique_names or d.atom_2 not in unique_names or d.atom_3 not in unique_names or d.atom_4 not in unique_names ): missing = [ d.atom_1 not in unique_names, d.atom_2 not in unique_names, d.atom_3 not in unique_names, d.atom_4 not in unique_names, ] missing_names = [ atom for i, atom in enumerate([d.atom_1, d.atom_2, d.atom_3, d.atom_4]) if missing[i] ] missing_str = ", ".join(np.unique(missing_names)) warn_str = ( f"Dihedral type {d.atom_1}--{d.atom_2}--{d.atom_3}--{d.atom_4}" f" specified in {config.file_name} but no {missing_str} atoms " f"are present in the specified system (names array)" ) Logger.rank0.log(logging.WARNING, warn_str) if comm.Get_rank() == 0: warnings.warn(warn_str) return config
[docs]def check_chi(config, names, comm=MPI.COMM_WORLD): if not hasattr(config, "unique_names"): config = _find_unique_names(config, names) unique_names = config.unique_names for c in config.chi: if c.atom_1 not in unique_names or c.atom_2 not in unique_names: missing_str = "" if c.atom_1 not in unique_names: if c.atom_2 not in unique_names: if c.atom_1 == c.atom_2: missing_str = f"no {c.atom_1} atoms" else: missing_str = f"neither {c.atom_1}, {c.atom_2} atoms" else: missing_str = f"no {c.atom_1} atoms" else: missing_str = f"no {c.atom_2} atoms" warn_str = ( f"Chi interaction {c.atom_1}--{c.atom_2} specified in " f"{config.file_name} but {missing_str} are present in the " f"specified system (names array)" ) Logger.rank0.log(logging.WARNING, warn_str) if comm.Get_rank() == 0: warnings.warn(warn_str) for i, n in enumerate(unique_names): for m in unique_names[i + 1 :]: found = False for c in config.chi: if (c.atom_1 == n and c.atom_2 == m) or ( c.atom_1 == m and c.atom_2 == n ): found = True if not found: config.chi.append(Chi(atom_1=n, atom_2=m, interaction_energy=0.0)) warn_str = ( f"Atom types {n} and {m} found in the " f"system, but no chi interaction {n}--{m} " f"specified in {config.file_name}. Defaulting to " f"chi[{n}, {m}] = 0" ) Logger.rank0.log(logging.WARNING, warn_str) if comm.Get_rank() == 0: warnings.warn(warn_str) return config
[docs]def check_box_size(config, input_box, comm=MPI.COMM_WORLD): if config.box_size is not None: config.box_size = np.array(config.box_size, dtype=np.float32) if input_box.all() and not np.allclose(config.box_size, input_box, atol=0.009): err_str = ( f"Box size specified in {config.file_name}: " f"{config.box_size} does not match input box:" f"{input_box}" ) Logger.rank0.log(logging.ERROR, err_str) if comm.Get_rank() == 0: raise ValueError(err_str) else: if input_box.all(): config.box_size = input_box else: err_str = f"No box information found" Logger.rank0.log(logging.ERROR, err_str) if comm.Get_rank() == 0: raise ValueError(err_str) for b in config.box_size: if b <= 0.0: err_str = ( f"Invalid box size specified in {config.file_name}: " f"{config.box_size}" ) Logger.rank0.log(logging.ERROR, err_str) if comm.Get_rank() == 0: raise ValueError(err_str) return config
[docs]def check_integrator(config, comm=MPI.COMM_WORLD): if config.integrator.lower() not in ("velocity-verlet", "respa"): err_str = ( f"Invalid integrator specified in {config.file_name}: " f'{config.integrator}. Available options "velocity-verlet" or ' f'"respa".' ) Logger.rank0.log(logging.ERROR, err_str) if comm.Get_rank() == 0: raise ValueError(err_str) if config.integrator.lower() == "respa": if isinstance(config.respa_inner, float): warn_str = ( f"Number of inner rRESPA time steps in " f"{config.file_name}: {config.respa_inner} specified " f"as float, using {int(config.respa_inner)}" ) Logger.rank0.log(logging.WARNING, warn_str) if comm.Get_rank() == 0: warnings.warn(warn_str) config.respa_inner = int(config.respa_inner) elif not isinstance(config.respa_inner, int): err_str = ( f"Invalid number of inner rRESPA time steps in " f"{config.file_name}: {config.respa_inner}. Must be " f"positive integer" ) Logger.rank0.log(logging.ERROR, err_str) raise TypeError(err_str) if config.respa_inner <= 0: err_str = ( f"Invalid number of inner rRESPA time steps in " f"{config.file_name}: {config.respa_inner}. Must be " f"positive integer" ) Logger.rank0.log(logging.ERROR, err_str) raise ValueError(err_str) if config.integrator.lower() == "velocity-verlet" and config.respa_inner != 1: warn_str = ( f"Integrator type Velocity-Verlet specified in {config.file_name} " f"and inner rRESPA time steps set to {config.respa_inner}. " f"Using respa_inner = 1" ) Logger.rank0.log(logging.WARNING, warn_str) if comm.Get_rank() == 0: warnings.warn(warn_str) config.respa_inner = 1 return config
[docs]def check_hamiltonian(config, comm=MPI.COMM_WORLD): valid_hamiltonians = ["defaultnochi", "defaultwithchi", "squaredphi"] if config.hamiltonian is None: if len(config.chi) == 0: warn_str = ( f"No hamiltonian form and no chi interactions specified in " f"{config.file_name}, defaulting to DefaultNoChi hamiltonian" ) Logger.rank0.log(logging.WARNING, warn_str) if comm.Get_rank() == 0: warnings.warn(warn_str) config.hamiltonian = "DefaultNoChi" else: warn_str = ( f"No hamiltonian form specified in {config.file_name}, but " f"chi interactions are specified, defaulting to " f"DefaultWithChi hamiltonian" ) Logger.rank0.log(logging.WARNING, warn_str) if comm.Get_rank() == 0: warnings.warn(warn_str) config.hamiltonian = "DefaultWithChi" elif config.hamiltonian.lower() not in valid_hamiltonians: err_str = ( f"The specified Hamiltonian {config.hamiltonian} was not " f"recognized as a valid Hamiltonian." ) Logger.rank0.log(logging.ERROR, err_str) raise NotImplementedError(err_str) return config
[docs]def check_n_flush(config, comm=MPI.COMM_WORLD): if config.n_print: if config.n_flush is None: config.n_flush = 10000 // config.n_print return config
[docs]def check_n_print(config, comm=MPI.COMM_WORLD): if ( not isinstance(config.n_print, int) and not isinstance(config.n_print, float) and config.n_print is not None ): err_str = f"invalid value for n_print ({config.n_print})" Logger.rank0.log(logging.ERROR, err_str) raise RuntimeError(err_str) if config.n_print is None or config.n_print <= 0: config.n_print = False elif not isinstance(config.n_print, int): if isinstance(config.n_print, float): warn_str = ( f"n_print is a float ({config.n_print}), not int, using " f"{int(round(config.n_print))}" ) Logger.rank0.log(logging.WARNING, warn_str) if comm.Get_rank() == 0: warnings.warn(warn_str) config.n_print = int(round(config.n_print)) else: err_str = f"invalid value for n_print ({config.n_print})" Logger.rank0.log(logging.ERROR, err_str) raise RuntimeError(err_str) return config
[docs]def check_tau(config, comm=MPI.COMM_WORLD): if config.tau is None and config.target_temperature is not None: warn_str = "target temp specified but no tau, defaulting 0.7" config.tau = 0.7 Logger.rank0.log(logging.WARNING, warn_str) if comm.Get_rank() == 0: warnings.warn(warn_str) return config
[docs]def check_start_and_target_temperature(config, comm=MPI.COMM_WORLD): """Validate provided starting and target thermostat temperatures Assesses the provided temperature target and ensures it is a non-negative floating point number or :code:`False`. Ensures the starting temperature is a non-negative floating point number or :code:`False`. If the value for either is :code:`None`, the returned configuration object has the values defaulted to :code:`False` in each case. Parameters ---------- config : Config Configuration object. Returns ------- validated_config : Config Configuration object with validated :code:`target_temperature` and :code:`start_temperature`. """ for t in ("start_temperature", "target_temperature"): if getattr(config, t) is not None: try: if getattr(config, t) < 0: warn_str = ( f"{t} set to negative value ({getattr(config, t)}), " f"defaulting to False" ) setattr(config, t, False) Logger.rank0.log(logging.WARNING, warn_str) if comm.Get_rank() == 0: warnings.warn(warn_str) except TypeError as e: err_str = ( f"Could not interpret {t} = {repr(getattr(config, t))} as " f"a number." ) raise TypeError(err_str) from e if config.start_temperature is None: config.start_temperature = False if config.target_temperature is None: config.target_temperature = False return config
[docs]def check_mass(config, comm=MPI.COMM_WORLD): if config.mass is None: info_str = "no mass specified, defaulting to 72.0" config.mass = 72.0 Logger.rank0.log(logging.INFO, info_str) elif not isinstance(config.mass, int) and not isinstance(config.mass, float): err_str = ( f"specified mass is invalid type {config.mass}, " f"({type(config.mass)})" ) Logger.rank0.log(logging.ERROR, err_str) raise TypeError(err_str) elif config.mass < 0: err_str = f"invalid mass specified, {config.mass}" Logger.rank0.log(logging.ERROR, err_str) raise ValueError(err_str) else: config.mass = float(config.mass) return config
[docs]def check_domain_decomposition(config, comm=MPI.COMM_WORLD): dd = config.domain_decomposition if dd is None or dd is False: config.domain_decomposition = False return config if isinstance(dd, int): if dd < 0: warn_str = "negative domain_decomposition specified, using False" config.domain_decomposition = False Logger.rank0.log(logging.WARNING, warn_str) if comm.Get_rank() == 0: warnings.warn(warn_str) elif isinstance(dd, float): if dd < 0.0: warn_str = "negative domain_decomposition specified, using False" config.domain_decomposition = False else: warn_str = ( f"domain_decomposition ({config.domain_decomposition}) is not " f"an integer, using {int(round(dd))}" ) config.domain_decomposition = int(round(dd)) Logger.rank0.log(logging.WARNING, warn_str) if comm.Get_rank() == 0: warnings.warn(warn_str) else: err_str = ( f"invalid value for domain_decomposition " f"({config.domain_decomposition}) use an integer" ) Logger.rank0.log(logging.ERROR, err_str) raise ValueError(err_str) return config
[docs]def check_name(config, comm=MPI.COMM_WORLD): if config.name is None: root_current_time = "" if comm.Get_rank() == 0: root_current_time = datetime.datetime.now().strftime("%m/%d/%Y, %H:%M:%S") current_time = comm.bcast(root_current_time, root=0) if config.name is None: config.name = "sim" + current_time else: config.name = str(config.name) return config
[docs]def check_NPT_conditions(config, comm=MPI.COMM_WORLD): """ Check validity of barostat_type, barostat, a, rho0, target_pressure, tau_p """ if config.barostat is None: if ( config.tau_p is not None or (config.target_pressure.P_L and config.target_pressure.P_N) is not None ): err_str = ( "barostat not specified but config.tau_p " "or config.target_pressure specified, cannot start simulation {config.barostat}" ) Logger.rank0.log(logging.ERROR, err_str) if comm.Get_rank() == 0: raise TypeError(err_str) if config.a: warn_str = "a specified but no barostat," "setting a to average density" Logger.rank0.log(logging.WARNING, warn_str) if comm.Get_rank() == 0: warnings.warn(warn_str) if config.rho0: warn_str = ( "rho0 specified but no barostat," "setting rho0 to average density" ) Logger.rank0.log(logging.WARNING, warn_str) if comm.Get_rank() == 0: warnings.warn(warn_str) if config.barostat is not None: if not config.barostat_type: config.barostat_type = "berendsen" warn_str = "barostat_type not specified," "setting to berendsen" Logger.rank0.log(logging.WARNING, warn_str) if comm.Get_rank() == 0: warnings.warn(warn_str) if config.barostat != "isotropic" and config.barostat != "semiisotropic": err_str = "barostat option not recognised. Valid options: isotropic, semiisotropic" Logger.rank0.log(logging.ERROR, err_str) if comm.Get_rank() == 0: raise TypeError(err_str) if config.target_pressure.P_L is None: config.target_pressure.P_L = 1.0 # bar config.target_pressure.P_N = None if config.barostat == "semiisotropic": config.target_pressure.P_N = 1.0 # bar warn_str = ( "barostat specified but no target_pressure, defaulting to 1.0 bar" ) Logger.rank0.log(logging.WARNING, warn_str) if comm.Get_rank() == 0: warnings.warn(warn_str) if config.tau_p is None: if config.tau <= 0.1: config.tau_p = config.tau * 10.0 else: config.tau_p = 1.0 warn_str = "barostat specified but no tau_p, defaulting to " + str( config.tau_p ) Logger.rank0.log(logging.WARNING, warn_str) if comm.Get_rank() == 0: warnings.warn(warn_str) if not config.a: err_str = "a not specified; cannot start simulation {config.a}" Logger.rank0.log(logging.ERROR, err_str) if comm.Get_rank() == 0: raise TypeError(err_str) if not config.rho0: warn_str = "rho0 not specified;" "setting rho0 to average density" Logger.rank0.log(logging.WARNING, warn_str) if comm.Get_rank() == 0: warnings.warn(warn_str) return config
[docs]def check_thermostat_coupling_groups(config, comm=MPI.COMM_WORLD): if any(config.thermostat_coupling_groups): found = [0 for _ in config.unique_names] unique_names = [n for n in config.unique_names] for i, group in enumerate(config.thermostat_coupling_groups): for species in group: try: ind = unique_names.index(species) found[ind] += 1 except ValueError as e: err_str = ( f"Particle species {species} specified in thermostat " f"coupling group {i}, but no {species} particles were " "found in the system." ) raise ValueError(err_str) from e if any([True if f > 1 else False for f in found]): for i, f in enumerate(found): if f > 1: species = unique_names[i] err_str = ( f"Particle species {species} specified in multiple " "thermostat coupling groups; " f"{config.thermostat_coupling_groups}." ) raise ValueError(err_str) if not all([True if f == 1 else False for f in found]): for i, f in enumerate(found): if f != 1: species = unique_names[i] err_str = ( f"Particle species {species} not specified in any " f"thermostat coupling group, but {species} particles " "were found in the system" ) raise ValueError(err_str) return config
[docs]def check_m(config, comm=MPI.COMM_WORLD): if config.m == []: config.m = [1.0 for t in range(config.n_types)] return config
[docs]def check_n_b(config, comm=MPI.COMM_WORLD): if config.n_b is None: warn_str = f"config.n_b not specified." "Defaulting to 1" config.n_b = 1 Logger.rank0.log(logging.WARNING, warn_str) if comm.Get_rank() == 0: warnings.warn(warn_str) return config
[docs]def check_cancel_com_momentum(config, comm=MPI.COMM_WORLD): if isinstance(config.cancel_com_momentum, int): if config.cancel_com_momentum == 0 or config.cancel_com_momentum < 0: config.cancel_com_momentum = False elif not isinstance(config.cancel_com_momentum, bool): err_str = ( f"Could not interpret {config.cancel_com_momentum} as an integer " f"or boolean." ) raise ValueError(err_str) return config
[docs]def sort_dielectric_by_type_id(config, charges, types): """ Creates a list of length N CG-particles, sorted after the charges from the input HDF5 file. Used in file_io.py """ dielectric_val = np.zeros(config.n_types) for j in range(config.n_types): name = config.dielectric_type[j][0][0] type_id = config.name_to_type_map[name] dielectric_val[type_id] = config.dielectric_type[j][1][0] config.dielectric_type = dielectric_val.copy() N = len(charges) len_list = np.zeros(config.n_types) dielectric_by_types = np.zeros(N) for i in range(N): dielectric_by_types[i] = dielectric_val[types[i]] # print("types ", types) # print("diel val", dielectric_by_types) # print("charges", charges) # print(config.name_to_type_map) return dielectric_by_types # by types with each particle id
[docs]def check_charges_types_list(config, types, charges, comm=MPI.COMM_WORLD): """ Creates a list of charge values of length types. Charges are sorted according to type ID. Used in field.py. # TODO: this is messy, we should fix it """ if charges is None: config.type_charges = [0.0] * config.n_types return config check_val = -100.0 # some random value that will never be encountered charges_list = np.full(config.n_types, check_val) # gatherv cant handle None rank = comm.Get_rank() # print(charges_list) for t_ in range(config.n_types): if t_ in types: charges_list[t_] = charges[types == t_][0] nprocs = int(comm.Get_size()) recv_charges = None if rank == 0: recv_charges = np.full( config.n_types * nprocs, check_val ) # gatherv cant handle None comm.Gather(charges_list, recv_charges, root=0) ## make a charges list if rank == 0: config_charges = np.zeros(config.n_types) test_config = np.full(config.n_types, check_val) for j in range(nprocs): for i in range(config.n_types): if recv_charges[i + j * config.n_types] != check_val: config_charges[i] = recv_charges[i + j * config.n_types] if np.any([test_config, config_charges]): continue else: break # print(config_charges) # print(config.name_to_type_map) # print(charges[types == 0][0],charges[types == 1][0],charges[types == 2][0],charges[types == 3][0], # charges[types == 4][0]) else: config_charges = np.zeros(config.n_types) comm.Bcast(config_charges, root=0) # print("config_charges", config_charges , "rank", rank) # print(recv_charges) # rank = comm.Get_rank() # print(all_charges) config.type_charges = config_charges return config
[docs]def check_dielectric(config, comm=MPI.COMM_WORLD): """ Error handling for electrostatics. Unit testing of toml/tomli input. """ err_str_const = "Dielectric constant not given." if config.coulombtype == "PIC_Spectral": assert config.dielectric_const != None, err_str_const err_str = "Dielectric list is empty." if config.coulombtype == "PIC_Spectral_GPE": assert len(config.dielectric_type) != 0, err_str # default values if config.pol_mixing is None: config.pol_mixing = 0.6 if config.conv_crit is None: config.conv_crit = 1e-6 return config
[docs]def check_charges(config, charges, comm=MPI.COMM_WORLD): """Check if charges across ranks sum to zero. Parameters ---------- charges : (N,) numpy.ndarray Array of floats with charges for :code:`N` particles. comm : mpi4py.Comm, optional MPI communicator, defaults to :code:`mpi4py.COMM_WORLD`. """ total_charge = comm.allreduce(np.sum(charges), MPI.SUM) if not np.isclose(total_charge, 0.0): warn_str = ( f"Charges in the input file do not sum to zero. " f"Total charge is {total_charge}." ) Logger.rank0.log(logging.WARNING, warn_str) if comm.Get_rank() == 0: warnings.warn(warn_str) # if charges are ok, compute self energy if config.coulombtype == "PIC_Spectral": from .field import compute_self_energy_q config.self_energy = compute_self_energy_q(config, charges, comm=comm) return config
[docs]def check_config( config, indices, names, types, charges, input_box, comm=MPI.COMM_WORLD ): """Performs various checks on the specfied config to ensure consistency Parameters ---------- config : Config Configuration object. indices : (N,) numpy.ndarray Array of integer indices for :code:`N` particles. names : (N,) numpy.ndarray Array of string names for :code:`N` particles. types : (N,) numpy.ndarray Array of integer type indices for :code:`N` particles. charges : (N,) numpy.ndarray Array of floats charges for :code:`N` particles. comm : mpi4py.Comm, optional MPI communicator, defaults to :code:`mpi4py.COMM_WORLD`. Returns ------- config : Config Validated configuration object. """ config = _find_unique_names(config, names, comm=comm) if types is not None: config = _setup_type_to_name_map(config, names, types, comm=comm) config = check_box_size(config, input_box, comm=comm) config = check_integrator(config, comm=comm) config = check_max_molecule_size(config, comm=comm) config = check_tau(config, comm=comm) config = check_start_and_target_temperature(config, comm=comm) config = check_mass(config, comm=comm) config = check_domain_decomposition(config, comm=comm) config = check_name(config, comm=comm) config = check_n_particles(config, indices, comm=comm) config = check_chi(config, names, comm=comm) config = check_bonds(config, names, comm=comm) config = check_angles(config, names, comm=comm) config = check_dihedrals(config, names, comm=comm) config = check_hamiltonian(config, comm=comm) config = check_NPT_conditions(config, comm=comm) config = check_n_b(config, comm=comm) config = check_m(config, comm=comm) config = check_thermostat_coupling_groups(config, comm=comm) config = check_cancel_com_momentum(config, comm=comm) config = check_dielectric(config, comm=comm) config = check_n_print(config, comm=comm) config = check_n_flush(config, comm=comm) config = check_charges_types_list(config, types, charges, comm=comm) if charges is not None: config = check_charges(config, charges, comm=comm) return config