Source code for htpolynet.analysis.postsim

"""Handles the postsim subcommand.

Author: Cameron F. Abrams <cfa22@drexel.edu>
"""
import json
import logging
import os
import shutil

import yaml

import numpy as np

from ..analysis.plot import scatter
from ..core import projectfilesystem as pfs
from ..core.configuration import Configuration
from ..core.topocoord import TopoCoord
from ..external import software as software
from ..external.gromacs import mdp_get, mdp_modify, gmx_energy_trace
from ..utils.logsetup import setup_logging

logger=logging.getLogger(__name__)

[docs] class PostSimMD: """ Generic class for handling post-cure md simulations; this one just does simple NPT MD equilibration; Classes that inherit from this class should define their own default_params and build_npt """ default_params={ 'subdir': f'{pfs.Dirs.postsim}/equilibrate', 'input_top': f'{pfs.Dirs.systems_final}/final.top', 'input_gro': f'{pfs.Dirs.systems_final}/final.gro', 'input_grx': f'{pfs.Dirs.systems_final}/final.grx', 'gromacs' : { 'gmx': 'gmx', 'mdrun': 'gmx mdrun', 'options': '-quiet -nobackup', 'mdrun_single_molecule': 'gmx mdrun mdrun' }, 'ps': 1000, 'T': 300, 'P':1, 'output_deffnm': 'equilibrate', 'traces': ['Temperature','Density','Volume'], 'scatter': ('time(ps)',['Density'],'rho_v_ns.png') } def __init__(self,indict,strict=True): self.params={} for p,v in self.default_params.items(): self.params[p]=indict.get(p,v) for p,v in indict.items(): if not p in self.default_params: if strict: logger.info(f'Ignoring directive \'{p}\' in yaml input file') else: self.params[p]=v
[docs] def do(self,mdp_pfx='npt',**gromacs_dict): """Handles executing the postsim MD simulation. Args: mdp_pfx (str): filename prefix for output files, defaults to 'npt' """ p=self.params logger.info(f'do {p}') # if a gromacs dict is passed in, assume this overrides the one read in from the file if gromacs_dict: software.set_gmx_preferences(gromacs_dict) else: software.set_gmx_preferences(p['gromacs']) logger.info(f'going to {p["subdir"]}') pfs.go_to(p['subdir']) for input_file in ['input_top','input_gro','input_grx']: srcnm=os.path.join(pfs.proj(),p[input_file]) shutil.copy(srcnm,'.') local_top=os.path.basename(p['input_top']) local_gro=os.path.basename(p['input_gro']) local_grx=os.path.basename(p['input_grx']) TC=TopoCoord(topfilename=local_top,grofilename=local_gro,grxfilename=local_grx) logger.info(f'{TC.Coordinates.A.shape[0]} atoms {TC.Topology.total_mass(units="gromacs"):.2f} amu') box=TC.Coordinates.box pfs.checkout(pfs.Dirs.mdp_file('npt')) os.rename('npt.mdp',f'{mdp_pfx}.mdp') self.build_mdp(f'{mdp_pfx}.mdp',box=box) msg=TC.grompp_and_mdrun(out=p['output_deffnm'],mdp=mdp_pfx,quiet=False,mylogger=logger.info,**gromacs_dict) df=gmx_energy_trace(p['output_deffnm'],p['traces']) use_scatter=p['scatter'] temp_x=None temp_y=None scat_file=use_scatter[2] for sznm in ['Box-X','Box-Y','Box-Z']: if sznm in p['traces']: L0=box[0][0] if sznm=='Box-X' else box[1][1] if sznm=='Box-Y' else box[2][2] temp_x=f'{sznm}-strain' df[temp_x]=df[sznm]/L0-1.0 for sznm in ['Pres-XX','Pres-YY','Pres-ZZ']: if sznm in p['traces']: df[f'{sznm}-stress']=df[sznm]*(-1) temp_y=[f'{sznm}-stress'] if temp_x and temp_y: use_scatter=(temp_x,temp_y,scat_file) df.to_csv(f'{p["output_deffnm"]}.csv',header=True,index=False) scatter(df,*use_scatter) logger.info(f'Final coordinates in {p["output_deffnm"]}.gro') logger.info(f'Traces saved in {p["output_deffnm"]}.csv')
[docs] def build_mdp(self,mdpname,**kwargs): """Builds the GROMACS mdp file required for an NPT equilibration. Args: mdpname (str): name of mdp file """ params=self.params timestep=float(mdp_get(mdpname,'dt')) duration=params['ps'] nsteps=int(duration/timestep) mod_dict={ 'ref_t':params['T'], 'ref_p':params['P'], 'gen-temp':params['T'], 'gen-vel':'yes', 'nsteps':nsteps, 'tcoupl':'v-rescale','tau_t':0.5 } mdp_modify(mdpname,mod_dict)
[docs] class PostSimAnneal(PostSimMD): """ a class to handle temperature annealing MD simulation """ default_params={ 'subdir': f'{pfs.Dirs.postsim}/anneal', 'input_top': f'{pfs.Dirs.systems_final}/final.top', 'input_gro': f'{pfs.Dirs.systems_final}/final.gro', 'input_grx': f'{pfs.Dirs.systems_final}/final.grx', 'gromacs' : { 'gmx': 'gmx', 'mdrun': 'gmx mdrun', 'options': '-quiet -nobackup', 'mdrun_single_molecule': 'gmx mdrun' }, 'output_deffnm':'anneal', 'traces': ['Temperature','Density','Volume'], 'scatter': ('time(ps)',['Density'],'rho_v_ns.png'), 'T0': 300, 'T1': 600, 'ncycles': 1, 'T0_to_T1_ps': 1000, 'T1_ps': 1000, 'T1_to_T0_ps': 1000, 'T0_ps': 1000, 'P':1 }
[docs] def build_mdp(self,mdpname,**kwargs): """Builds the GROMACS mdp file required for an annealing MD simulation. Args: mdpname (str): name of mdp file """ params=self.params timestep=float(mdp_get(mdpname,'dt')) timeints=[0.0,params['T0_to_T1_ps'],params['T1_ps'],params['T1_to_T0_ps'],params['T0_ps']] timepoints=[0.0] temppoints=[params['T0'],params['T1'],params['T1'],params['T0'],params['T0']] for i in range(1,len(timeints)): timepoints.append(timepoints[-1]+timeints[i]) duration=timepoints[-1] nsteps=int(duration/timestep)*params['ncycles'] mod_dict={ 'ref_t':params['T0'], 'ref_p':params['P'], 'gen-temp':params['T0'], 'gen-vel':'yes', 'annealing-npoints':len(timepoints), 'annealing-temp':' '.join([str(x) for x in temppoints]), 'annealing-time':' '.join([str(x) for x in timepoints]), 'annealing':'periodic' if params['ncycles']>1 else 'single', 'nsteps':nsteps, 'tcoupl':'v-rescale','tau_t':0.5 } mdp_modify(mdpname,mod_dict)
[docs] class PostSimLadder(PostSimMD): """ a class to handle a temperature-ladder MD simulation """ default_params={ 'subdir': f'{pfs.Dirs.postsim}/ladder', 'input_top': f'{pfs.Dirs.systems_final}/final.top', 'input_gro': f'{pfs.Dirs.systems_final}/final.gro', 'input_grx': f'{pfs.Dirs.systems_final}/final.grx', 'gromacs' : { 'gmx': 'gmx', 'mdrun': 'gmx mdrun', 'options': '-quiet -nobackup', 'mdrun_single_molecule': 'gmx mdrun mdrun' }, 'output_deffnm':'ladder', 'traces': ['Temperature','Density','Volume'], 'scatter': ('time(ps)',['Density'],'rho_v_ns.png'), 'Tlo':300.0, 'Thi':600.0, 'deltaT':5, 'ps_per_run':1000, 'ps_per_rise':1000, 'warmup_ps':5000, 'P':1 }
[docs] def build_mdp(self,mdpname,**kwargs): """Builds the GROMACS mdp file required for a temperature-ladder MD simulation. Args: mdpname (str): name of mdp file """ params=self.params timestep=float(mdp_get(mdpname,'dt')) Tdomain=params['Thi']-params['Tlo'] nSteps=int(Tdomain/np.abs(params['deltaT']))+1 T0=params['Tlo'] if params['deltaT']>0.0 else params['Thi'] T1=params['Thi'] if params['deltaT']>0.0 else params['Tlo'] Tladder=np.linspace(T0,T1,nSteps) timepoints=[0.0,params['warmup_ps']] temppoints=[T0,T0] for i in range(nSteps): cT=Tladder[i] timepoints.append(timepoints[-1]+params['ps_per_rise']) temppoints.append(cT) timepoints.append(timepoints[-1]+params['ps_per_run']) temppoints.append(cT) duration=timepoints[-1] nMDsteps=int(duration/timestep) mod_dict={ 'ref_t':T0, 'ref_p':params['P'], 'gen-temp':T0, 'gen-vel':'yes', 'annealing-npoints':len(timepoints), 'annealing-temp':' '.join([str(x) for x in temppoints]), 'annealing-time':' '.join([str(x) for x in timepoints]), 'annealing':'single', 'nsteps':nMDsteps, 'tcoupl':'v-rescale','tau_t':0.5 } mdp_modify(mdpname,mod_dict)
[docs] class PostSimDeform(PostSimMD): """ a class to handle a uniaxial deformation MD simulation """ default_params={ 'subdir': f'{pfs.Dirs.postsim}/deform-x', 'input_top': f'{pfs.Dirs.systems_final}/final.top', 'input_gro': f'{pfs.Dirs.postsim}/equilibrate/equilibrate.gro', 'input_grx': f'{pfs.Dirs.systems_final}/final.grx', 'gromacs' : { 'gmx': 'gmx', 'mdrun': 'gmx mdrun', 'options': '-quiet -nobackup', 'mdrun_single_molecule': 'gmx mdrun mdrun' }, 'output_deffnm':'deform-x', 'traces': ['Box-X','Pres-XX'], 'scatter': ('Box-X',['Pres-XX'],'tension_v_xlength.png'), 'direction':'x', 'T':300.0, 'P':1.0, 'ps':1000, 'edot': 0.001 # strain rate in ps^-1 }
[docs] def build_mdp(self,mdpname,**kwargs): """Builds the GROMACS mdp file required for a uniaxial deformation MD simulation. Args: mdpname (str): name of mdp file """ params=self.params timestep=float(mdp_get(mdpname,'dt')) duration=params['ps'] nsteps=int(duration/timestep) box=kwargs.get('box',np.array([[0.0,0.0,0.0],[0.0,0.0,0.0],[0.0,0.0,0.0]])) edot=params.get('edot',0.0) direction=params.get('direction','x') mod_dict={ 'ref_t':params['T'], 'ref_p':params['P'], 'gen-temp':params['T'], 'gen-vel':'yes', 'tcoupl':'v-rescale', 'nsteps': nsteps, 'rlist': 1.2, 'rcoulomb': 1.2, 'rvdw': 1.2, 'tau_t':0.5, 'tau_p':1.0, 'refcoord_scaling': 'com', 'pcoupltype': 'anisotropic' } if direction=='x': strain_vel=box[0][0]*edot mod_dict['ref_p']='0.0 1.0 1.0 0 0 0' mod_dict['compressibility']='0.0 4.5e-5 4.5e-5 0 0 0' mod_dict['deform']=f'{strain_vel:.3e} 0 0 0 0 0' params['output_deffnm'] = 'deform-x' params['traces']=['Box-X','Pres-XX'] params['scatter']=('Box-X',['Pres-XX'],'tension_v_xlength.png') elif direction=='y': strain_vel=box[1][1]*edot mod_dict['ref_p']='1.0 0.0 1.0 0 0 0' mod_dict['compressibility']='4.5e-5 0.0 4.5e-5 0 0 0' mod_dict['deform']=f'0 {strain_vel:.3e} 0 0 0 0' params['output_deffnm'] = 'deform-y' params['traces']=['Box-Y','Pres-YY'] params['scatter']=('Box-Y',['Pres-YY'],'tension_v_ylength.png') elif direction=='z': strain_vel=box[2][2]*edot mod_dict['ref_p']='1.0 1.0 0.0 0 0 0' mod_dict['compressibility']='4.5e-5 4.5e-5 0.0 0 0 0' mod_dict['deform']=f'0 0 {strain_vel:.3e} 0 0 0' params['output_deffnm'] = 'deform-z' params['traces']=['Box-Z','Pres-ZZ'] params['scatter']=('Box-Z',['Pres-ZZ'],'tension_v_zlength.png') else: logger.error(f'Bad direction for uniaxial strain {direction}') mdp_modify(mdpname,mod_dict)
[docs] class PostsimConfiguration: """ handles reading and parsing a postcure simulation input config file. Config file format - { key1: {<paramdict>}} - { key2: {<paramdict>}} ... The config file is a list of single-element dictionaries, whose single keyword indicates the type of MD simulation to be run; simulations are run in the order they appear in the config file. Currently allowed simulation types: - 'equilibrate': simple NPT equilibration; - 'anneal': simple simulated annealing; - 'ladder': temperature ladder; - 'deform: constant strain-rate uniaxial deformation; """ default_classes={'equilibrate':PostSimMD,'anneal':PostSimAnneal,'ladder':PostSimLadder,'deform':PostSimDeform} def __init__(self): self.cfgFile='' self.baselist=[] self.stagelist=[]
[docs] @classmethod def read(cls,filename,parse=True,**kwargs): """Generates a new PostsimConfiguration object by reading in the JSON or YAML file indicated by filename. Args: filename (str): name of file from which to read new PostsimConfiguration object parse (bool): if True, parse the input configuration file, defaults to True Raises: Exception: if extension of filename is not '.json' or '.yaml' or '.yml' Returns: PostsimConfiguration: a new PostsimConfiguration object """ basename,extension=os.path.splitext(filename) if extension=='.json': return cls._read_json(filename,parse,**kwargs) elif extension=='.yaml' or extension=='.yml': return cls._read_yaml(filename,parse,**kwargs) else: raise Exception(f'Unknown config file extension {extension}')
@classmethod def _read_json(cls,filename,parse=True,**kwargs): """Creates a new PostsimConfiguration object by reading from JSON input. Args: filename (str): name of JSON file parse (bool): if True, parse the JSON data, defaults to True Returns: PostsimConfiguration: a new PostsimConfiguration object """ inst=cls() inst.cfgFile=filename with open(filename,'r') as f: inst.baselist=json.load(f) assert type(inst.baselist)==list,f'Poorly formatted {filename}' if parse: inst.parse(**kwargs) return inst @classmethod def _read_yaml(cls,filename,parse=True,**kwargs): """Creates a new PostsimConfiguration object by reading from YAML input. Args: filename (str): name of YAML file parse (bool): if True, parse the YAML data, defaults to True Returns: PostsimConfiguration: a new PostsimConfiguration object """ inst=cls() inst.cfgFile=filename with open(filename,'r') as f: inst.baselist=yaml.safe_load(f) assert type(inst.baselist)==list,f'Poorly formatted {filename}' if parse: inst.parse(**kwargs) return inst
[docs] def parse(self,**kwargs): """Parses a PostsimConfiguration file to build the list of stages to run.""" for p in self.baselist: assert len(p)==1,f'Poorly formatted {self.cfgFile}; each stanza may have only one keyword' simtype=list(p.keys())[0] assert simtype in self.default_classes,f'Simulation type "{simtype}" in {self.cfgFile} not understood.' logger.info(f'passing in {p[simtype]}') self.stagelist.append(self.default_classes[simtype](p[simtype]))
[docs] def postsim(args): """Handles the postsim subcommand for managing post-cure production MD simulations. Args: args (argparse.Namespace): command-line arguments """ setup_logging(args.loglevel, no_banner=args.no_banner) ess='y' if len(args.proj)==0 else 'ies' ogromacs={} if args.ocfg: ocfg=Configuration.read(args.ocfg) ogromacs=ocfg.gromacs cfg=PostsimConfiguration.read(args.cfg) logger.debug(f'{cfg.baselist}') logger.info(f'Project director{ess}: {args.proj}') software.sw_setup() logger.debug(f'ogromacs {ogromacs}') for d in args.proj: pfs.pfs_setup(root=os.getcwd(),topdirs=pfs.Dirs.postsim_topdirs,verbose=True,projdir=d,reProject=False,userlibrary=args.lib) pfs.go_to(pfs.Dirs.postsim) for stage in cfg.stagelist: stage.do(mdp_pfx='local',**ogromacs) pfs.go_root()