"""Manages execution of the CURE algorithm.
Author: Cameron F. Abrams <cfa22@drexel.edu>
"""
import logging
import os
from enum import Enum
from functools import partial
from itertools import product
from multiprocessing import Pool
import yaml
import numpy as np
import pandas as pd
from ..analysis.plot import trace
from ..core import projectfilesystem as pfs
from ..core.molecule import MoleculeDict
from ..core.topocoord import TopoCoord, BTRC
from ..cure.reaction import ReactionList
from ..cure.reaction import reaction_stage
from ..external.gromacs import gromacs_distance, mdp_modify
from ..utils import checkpoint as cp
from ..utils.stringthings import my_logger
logger = logging.getLogger(__name__)
[docs]
class cure_step(Enum):
"""Enumerated CURE step
"""
cure_bondsearch=0
cure_drag=1
cure_update=2
cure_relax=3
cure_equilibrate=4
cap_bondsearch=5
cap_update=6 # dragging is not needed/allowed in capping since these are likely intramolecular bonds...
cap_relax=7
cap_equilibrate=8
finished=9
unknown=99
def __str__(self):
return self.name
[docs]
def basename(self):
name=str(self)
if 'cap_' in name:
return name[len('cap_'):]
if 'cure_' in name:
return name[len('cure_'):]
[docs]
class CureState:
def __init__(self):
"""Generates a new, initialized CureState object.
"""
self.iter=0
self.max_nxlinkbonds=0
self.cum_nxlinkbonds=0
self.desired_nxlinkbonds=0
self.max_search_radius=0.0
self.max_radidx=0
self.step=cure_step.unknown
self.curr_nxlinkbonds=0
self.current_stage={}
self.current_stage['drag']=0
self.current_stage['relax']=0
self.current_radidx=0
self.current_radius=0.0
self.bonds_file=''
def _to_yaml(self,filename='../cure_state.yaml'):
"""Writes CureState object to YAML file.
Args:
filename (str): name of output YAML file, defaults to '../cure_state.yaml'
"""
with open(filename,'w') as f:
f.write(yaml.dump(self))
[docs]
@classmethod
def from_yaml(cls,filename='cure_state.yaml'):
"""Returns a new CureState object generated by reading in the YAML file.
Args:
filename (str): input file name, defaults to 'cure_state.yaml'
Returns:
CureState: new CureState object returned by yaml.load
"""
with open(filename,'r') as f:
yaml_string=f.read()
# FullLoader was tightened in recent PyYAML versions to reject Python
# object tags; use yaml.Loader to round-trip yaml.dump(self) output.
return yaml.load(yaml_string,Loader=yaml.Loader)
[docs]
def reset(self):
"""Resets this CureState object to begin a new CURE iteration.
"""
self.iter=1
self.step=cure_step.cure_bondsearch
self.curr_nxlinkbonds=0
self.current_stage['drag']=0
self.current_stage['relax']=0
self.current_radidx=0
self.current_radius=0.0
self.bonds_file=''
[docs]
class CureController:
default_equilibration_sequence = [ { 'ensemble': 'min' },
{ 'ensemble': 'nvt', 'temperature': 600, 'nsteps': 1000 },
{ 'ensemble': 'npt', 'temperature': 600, 'pressure': 1, 'nsteps': 2000 }
]
curedict_defaults={
'output': {
'bonds_file': 'bonds.csv'
},
'controls': {
'max_conversion_per_iteration': 1.0,
'search_radius': 0.5,
'radial_increment': 0.05,
'late_threshold': 0.85,
'max_iterations': 100,
'desired_conversion': 0.5,
'min_allowable_bondcycle_length':-1, # not set
'min_bonds_per_iteration': 10, # grow radius until >= this many bonds (or max radius)
'ncpu' : os.cpu_count()
},
'drag': {
'limit': 0.0,
'kb': 300000.0, # kJ/mol/nm
'trigger_distance': 0.0,
'nstages': 0,
'increment': 0.0,
'cutoff_pad': 0.2,
'equilibration': default_equilibration_sequence
},
'relax': {
'nstages': 6,
'increment': 0.0,
'cutoff_pad': 0.2,
'equilibration': default_equilibration_sequence
},
'equilibrate': {
'temperature': 300,
'pressure': 1,
'nsteps': 50000,
'ensemble': 'npt'
},
'gromacs': {
'rdefault': 0.9
}
}
def __init__(self,curedict={}):
"""Generates a new CureController object populated from directives in curedict.
Args:
curedict (dict): dictionary of cure directives, defaults to {}
"""
self.state=CureState()
self.bonds_df:pd.DataFrame=None
self.bonds_are='nonexistent!'
self.search_failed=False
self.dicts={}
for k,v in self.curedict_defaults.items():
self.dicts[k]=curedict.get(k,v) # loads entire default if dict is just missing
if type(v)==dict: # assign subitem defaults if their keys are missing
for kk,vv in v.items():
if not kk in self.dicts[k]:
self.dicts[k][kk]=vv
self.chain_manager=None
self.dragging_enabled=False
d=self.dicts['drag']
if (d['nstages']>0 or d['increment']>0.0) and d['limit']>0.0:
self.dragging_enabled=True
[docs]
def setup(self,max_nxlinkbonds=0,desired_nxlinkbonds=0,max_search_radius=0.0):
"""Sets up this CureController object.
Args:
max_nxlinkbonds (int): maximum allowable number of bonds, defaults to 0
desired_nxlinkbonds (int): desired number of bonds, defaults to 0
max_search_radius (float): maximum search radius (nm), defaults to 0.0
"""
st=self.state
st.max_nxlinkbonds=max_nxlinkbonds
st.desired_nxlinkbonds=desired_nxlinkbonds
st.max_search_radius=max_search_radius
d=self.dicts['controls']
st.max_radidx=int((st.max_search_radius-d['search_radius'])/d['radial_increment'])
[docs]
@cp.enableCheckpoint
def do_iter(self,TC:TopoCoord,RL:ReactionList,MD:MoleculeDict,gromacs_dict={}):
"""Performs one CURE iteration.
Args:
TC (TopoCoord): TopoCoord object; all topological and coordinate information is in here
RL (ReactionList): list of all reaction types
MD (MoleculeDict): dictionary of all molecules, keyed by name
gromacs_dict (dict): dictionary of custom gromacs parameters, defaults to {}
Returns:
dict: dictionary of resulting output file names
"""
my_logger(f'Iteration {self.state.iter} begins',logger.info,fill='~')
TC.grab_files() # copy files locally
self.state.step=cure_step.cure_bondsearch
self._do_bondsearch(TC,RL,MD)
self._do_preupdate_dragging(TC,gromacs_dict)
self._do_topology_update(TC,MD)
self._do_relax(TC,gromacs_dict)
self._do_equilibrate(TC,gromacs_dict)
self.state.cum_nxlinkbonds+=self.bonds_df.shape[0]
logger.info(f'Iteration {self.state.iter} current conversion {self._curr_conversion():.3f} or {self.state.cum_nxlinkbonds} bonds')
return {c:os.path.basename(x) for c,x in TC.files.items() if c!='mol2'}
[docs]
@cp.enableCheckpoint
def do_capping(self,TC:TopoCoord,RL:ReactionList,MD:MoleculeDict,gromacs_dict={}):
"""Manages generation of all capping bonds.
Args:
TC (TopoCoord): TopoCoord object containing all topology and coordinate information
RL (ReactionList): list of all reaction types
MD (MoleculeDict): dictionary of all molecule information
gromacs_dict (dict): dictionary of custom gromacs parameters, defaults to {}
Returns:
dict: dictionary of resulting output files, keyed by extension ('gro', 'top', 'grx')
"""
self._do_cap_bondsearch(TC,RL,MD)
self._do_topology_update(TC,MD)
self._do_relax(TC)
self._do_equilibrate(TC,gromacs_dict)
return {c:os.path.basename(x) for c,x in TC.files.items() if c!='mol2'}
[docs]
def is_cured(self):
"""Returns True if system is cured.
Returns:
bool: True if system is cured; False otherwise
"""
st=self.state
logger.debug(f'search_failed? {self.search_failed}')
logger.debug(f'cumxlinks {st.cum_nxlinkbonds} maxxlinks {st.max_nxlinkbonds}: {(st.cum_nxlinkbonds>=st.max_nxlinkbonds)}')
finished=self.search_failed or (st.cum_nxlinkbonds>=st.desired_nxlinkbonds)
if finished:
st.step=cure_step.cap_bondsearch
return finished
def _curr_conversion(self):
"""Returns current conversion expressed as a fraction of the maximum number of possible bonds.
Returns:
float: conversion
"""
if not self.state.max_nxlinkbonds: return 0
return float(self.state.cum_nxlinkbonds)/self.state.max_nxlinkbonds
[docs]
def reset(self):
"""Resets this CureController object by resetting its CureState, its bonds dataframe, and its search_failed member.
"""
self.state.reset()
self.bonds_df=pd.DataFrame()
self.bonds_are='nonexistent!'
self.search_failed=False
[docs]
def next_iter(self):
"""Resets this CureController for the next CURE iteration.
Returns:
bool: True if new iteration index exceeds the maximum number of iterations allowed by runtime
"""
i=self.state.iter+1
self.reset()
self.state.iter=i
return self.state.iter>=self.dicts['controls']['max_iterations']
def _do_bondsearch(self,TC:TopoCoord,RL:ReactionList,MD:MoleculeDict,reentry=False):
"""Manages the search for new bonds.
Args:
TC (TopoCoord): TopoCoord object containing all topology and coordinate information
RL (ReactionList): list of all reaction types
MD (MoleculeDict): dictionary of all molecular templates, keyed by name
reentry (bool): True if we are restarting a bondsearch in this directory, defaults to False
"""
if self.state.step!=cure_step.cure_bondsearch: return
opfx=self._pfx()
d=self.dicts['controls']
TC.min_bondcycle_length=d['min_allowable_bondcycle_length']
self.state.current_radius=d['search_radius']+self.state.current_radidx*d['radial_increment']
logger.info(f'Bond search using radius {self.state.current_radius} nm initiated')
curr_conversion=self._curr_conversion()
apply_probabilities=curr_conversion<d['late_threshold']
bond_limit=int(d['max_conversion_per_iteration']*self.state.max_nxlinkbonds)
bond_target=int((d['desired_conversion']-curr_conversion)*self.state.max_nxlinkbonds)
bond_limit=min([bond_limit,bond_target])
logger.debug(f'Iteration limited to at most {bond_limit} new bonds')
# Minimum bonds to find before we stop growing the search radius.
# Clamped against bond_target (don't demand more than physically
# possible) and bond_limit (don't demand more than we'd accept).
# If we hit max radius with fewer than this many but at least one,
# we still proceed -- the post-loop branch handles that.
min_floor=max(1,min(int(d.get('min_bonds_per_iteration',1)),bond_target,bond_limit))
logger.debug(f'Iteration will grow radius until >= {min_floor} bonds (or max radius)')
nbdf=pd.DataFrame()
nbonds=0
while nbonds<min_floor and self.state.current_radidx<self.state.max_radidx:
nbdf=self._searchbonds(TC,RL,MD,stage=reaction_stage.cure,abs_max=bond_limit,apply_probabilities=apply_probabilities,reentry=reentry)
nbonds=nbdf.shape[0]
logger.debug(f'Search at radius {self.state.current_radius:.3f} nm found {nbonds} bond(s) (floor {min_floor})')
if nbonds<min_floor and self.state.current_radidx<self.state.max_radidx:
self.state.current_radidx+=1
self.state.current_radius+=d['radial_increment']
logger.info(f'Radius increased to {self.state.current_radius} nm ({nbonds}/{min_floor} eligible bonds so far)')
if nbonds>0:
ess='' if nbonds==1 else 's'
logger.info(f'Iteration {self.state.iter} will generate {nbdf.shape[0]} new bond{ess}')
pairs=pd.DataFrame() # empty placeholder
''' compute all lengths '''
TC.add_length_attribute(nbdf,attr_name='initial_distance')
''' register all new bonds '''
self._register_bonds(nbdf,pairs,f'{opfx}-bonds.csv',bonds_are='identified')
''' declare intention to go to dragging or topology-update '''
self.state.step=cure_step.cure_drag if self.dragging_enabled else cure_step.cure_update
else:
self.search_failed=True
''' declare intention to proceed to bond capping '''
self.state.step=cure_step.cap_bondsearch
self.state._to_yaml()
logger.debug(f'next: {self.state.step}')
def _do_preupdate_dragging(self,TC:TopoCoord,gromacs_dict={}):
"""Manages the preupdate dragging CURE step.
Args:
TC (TopoCoord): the global system topology and coordinates
gromacs_dict (dict): dictionary of optional gromacs directives, defaults to {}
"""
if self.state.step!=cure_step.cure_drag: return
nbdf=self.bonds_df
assert nbdf.shape[0]>0
d=self.dicts['drag']
nogos=[not self.dragging_enabled,nbdf['initial_distance'].max()<d['trigger_distance']]
logger.debug(f'{nogos} {any(nogos)}')
if any(nogos):
logger.debug(f'No dragging is necessary')
else:
self._distance_attenuation(TC,mode='drag',gromacs_dict=gromacs_dict)
self.state.step=cure_step.cure_update
self.state._to_yaml()
logger.debug(f'next: {self.state.step}')
def _do_relax(self,TC:TopoCoord,gromacs_dict={}):
"""Manages relaxation steps within the CURE algorithm.
Args:
TC (TopoCoord): global topology and coordinates
gromacs_dict (dict): dictionary of optional gromacs directives, defaults to {}
"""
if self.state.step!=cure_step.cure_relax and self.state.step!=cure_step.cap_relax: return
nbdf=self.bonds_df
if nbdf.shape[0]==0: # no bonds identified
if self.state.step==cure_step.cure_relax:
self.state.step=cure_step.cap_bondsearch # drop out of CURE loop
else:
self.state.step=cure_step.finished
return
self._distance_attenuation(TC,mode='relax',gromacs_dict=gromacs_dict)
if self.state.step==cure_step.cure_relax:
self.state.step=cure_step.cure_equilibrate
else:
self.state.step=cure_step.cap_equilibrate
self.state._to_yaml()
logger.debug(f'next: {self.state.step}')
def _do_topology_update(self,TC:TopoCoord,MD:MoleculeDict):
"""Manages the topology update in CURE.
Args:
TC (TopoCoord): global system topology and coordinates
MD (MoleculeDict): dictionary of all molecular templates
"""
if self.state.step!=cure_step.cure_update and self.state.step!=cure_step.cap_update: return
opfx=self._pfx()
logger.debug(f'Topology update')
bonds_df,pairs_df=TC.update_topology_and_coordinates(self.bonds_df,template_dict=MD,write_mapper_to=f'{opfx}-idx-mapper.csv',chain_manager=self.chain_manager)
TC.add_length_attribute(bonds_df,attr_name='initial_distance')
TC.add_length_attribute(pairs_df,attr_name='initial_distance')
self._register_bonds(bonds_df,pairs_df,f'{opfx}-bonds.csv',bonds_are='unrelaxed')
TC.write_gro(f'{opfx}.gro')
TC.write_top(f'{opfx}.top')
TC.write_tpx(f'{opfx}.tpx')
TC.write_grx_attributes(f'{opfx}.grx')
if self.state.step==cure_step.cure_update:
self.state.step=cure_step.cure_relax
else:
self.state.step=cure_step.cap_relax
self.state._to_yaml()
def _do_equilibrate(self,TC:TopoCoord,gromacs_dict={}):
"""Manages pre- and post-cure equilibration runs.
Args:
TC (TopoCoord): global system topology and coordinates
gromacs_dict (dict): dictionary of optional gromacs directives, defaults to {}
"""
if self.state.step!=cure_step.cure_equilibrate and self.state.step!=cure_step.cap_equilibrate: return
d=self.dicts['equilibrate']
opfx=self._pfx()
edr_list=TC.equilibrate(deffnm=f'{opfx}',edict=d,gromacs_dict=gromacs_dict) #,plot_pfx=f'iter-{self.state.iter}-{str(self.state.step)}')
ens=d['ensemble']
if ens=='npt':
plot_pfx=f'iter-{self.state.iter}-{str(self.state.step)}'
trace('Density',edr_list,outfile=os.path.join(pfs.proj(),f'plots/{plot_pfx}-density.png'))
if self.state.step==cure_step.cure_equilibrate:
# go to next iteration -- this whole method is skipped if nbonds==0 in relax
self.state.step=cure_step.cure_bondsearch # if not self.search_failed else state.cap_bondsearch
elif self.state.step==cure_step.cap_equilibrate:
self.state.step=cure_step.finished
self.state._to_yaml()
def _do_cap_bondsearch(self,TC:TopoCoord,RL:ReactionList,MD:MoleculeDict):
"""Manages the post-cure capping reaction bond search.
Args:
TC (TopoCoord): global system topology and coordinates
RL (ReactionList): list of all reactions
MD (MoleculeDict): dictionary of all molecular templates
"""
if self.state.step!=cure_step.cap_bondsearch: return
opfx=self._pfx()
# multi: nbdf=self.make_cadidates(...,stage='cap')
nbdf=self._searchbonds(TC,RL,MD,stage=reaction_stage.cap)
nbonds=nbdf.shape[0]
ess='' if nbonds==1 else 's'
logger.info(f'Capping will generate {nbdf.shape[0]} new bond{ess}')
if nbonds>0:
cwd=pfs.go_to(pfs.Dirs.systems_capping)
pairs=pd.DataFrame() # empty placeholder
TC.add_length_attribute(nbdf,attr_name='initial_distance')
self._register_bonds(nbdf,pairs,f'{opfx}-bonds.csv',bonds_are='identified')
self.state.step=cure_step.cap_update
else:
self.state.step=cure_step.finished
self.state._to_yaml()
def _write_bonds_df(self,bondsfile='bonds.csv'):
"""Writes all bonds in the bonds dataframe to a CSV file.
Args:
bondsfile (str): name of csv file to write, defaults to 'bonds.csv'
"""
self.bonds_df.to_csv(bondsfile,sep=' ',mode='w',index=False,header=True,doublequote=False)
def _read_bonds_df(self,bonds_file_override=''):
"""Reads bonds into bonds dataframe from a CSV file.
Args:
bonds_file_override (str): name of file to read from (overrides default), defaults to ''
"""
infile=self.dicts['output']['bonds_file'] if not bonds_file_override else bonds_file_override
assert os.path.exists(infile),f'Error: {infile} not found'
self.bonds_df=pd.read_csv(infile,sep=r'\s+',header=0)
self.dicts['output']['bonds_file']=os.path.abspath(infile)
def _register_bonds(self,bonds,pairs,bonds_file,bonds_are='unrelaxed'):
"""Registers the bonds and 1-4 pairs from the input parameters.
Args:
bonds (pd.DataFrame): dataframe of bonds
pairs (pd.DataFrame): dataframe of 1-4 pairs
bonds_file (str): file to which bonds are written
bonds_are (str): a little flag to remind us the state of these bonds, defaults to 'unrelaxed'
"""
self.bonds_df=bonds
self.pairs_df=pairs
self.bonds_are=bonds_are
self.state.bonds_file=bonds_file
self._write_bonds_df(bonds_file)
self.dicts['output']['bonds_file']=os.path.abspath(bonds_file)
def _pfx(self):
"""Generates a little string that can be used as a state-specific filename prefix.
Returns:
str: the little string
"""
return f'{self.state.step.value}-{self.state.step}'
def _distance_attenuation(self,TC:TopoCoord,mode='drag',gromacs_dict={}):
"""Manages the progressive attenuation of bond parameters for new bonds.
Args:
TC (TopoCoord): global system topology and coordinates
mode (str): string indicating whether this is prebonding or postbonding attenuation; defaults to 'drag' (prebonding); other option is 'relax' (postbonding)
gromacs_dict (dict): dictionary of optional gromacs directives, defaults to {}
"""
assert mode in ['drag','relax']
statename=self.state.step.basename()
opfx=self._pfx()
nbdf=self.bonds_df.copy()
pdf=self.pairs_df.copy()
d=self.dicts[mode]
nbdf['current_lengths']=nbdf['initial_distance'].copy()
maxL,minL,meanL=nbdf['current_lengths'].max(),nbdf['current_lengths'].min(),nbdf['current_lengths'].mean()
logger.debug(f'Lengths avg/min/max: {meanL:.3f}/{minL:.3f}/{maxL:.3f}')
ess='' if nbdf.shape[0]==1 else 's'
logger.info(f'Step "{str(self.state.step)}" initiated on {nbdf.shape[0]} distance{ess} (max {maxL:.3f} nm)')
roptions=[self.dicts['gromacs']['rdefault'],maxL]
if mode=='drag':
TC.add_restraints(nbdf,typ=6)
logger.info(' Stage Max-distance (nm)')
else:
pdf['current_lengths']=pdf['initial_distance'].copy()
pmaxL,pminL,pmeanL=pdf['current_lengths'].max(),pdf['current_lengths'].min(),pdf['current_lengths'].mean()
logger.debug(f'1-4 distances lengths avg/min/max: {pmeanL:.3f}/{pminL:.3f}/{pmaxL:.3f}')
roptions.append(pmaxL)
logger.info(' Stage Max-distance (nm) Max-1-4-distance (nm)')
rcommon=max(roptions)
for stg_dict in d['equilibration']:
ensemble=stg_dict['ensemble']
impfx=f'{statename}-{ensemble}' # e.g., drag-min, drag-nvt, drag-npt
pfs.checkout(pfs.Dirs.mdp_file(impfx))
mdp_mods_dict={'rvdw':rcommon,'rcoulomb':rcommon,'rlist':rcommon}
if ensemble!='min':
mdp_mods_dict.update({'gen-temp':stg_dict['temperature'],'ref_t':stg_dict['temperature'],'gen-vel':'yes','nsteps':stg_dict['nsteps']})
if ensemble=='npt':
mdp_mods_dict['ref_p']=stg_dict['pressure']
mdp_modify(f'{impfx}.mdp',mdp_mods_dict,add_if_missing=(ensemble!='min'))
this_nstages=int(maxL/d['increment'])
this_firststage=self.state.current_stage[mode]
logger.debug(f'{self.state.step} {this_nstages} {this_firststage}')
saveT=TC.Topology.copy_bond_parameters(self.bonds_df)
for i in range(this_firststage,this_nstages):
self.state.current_stage[mode]=i
if mode=='drag':
TC.Topology.attenuate_bond_parameters(self.bonds_df,i,this_nstages,minimum_distance=d['limit'],init_colname='initial_distance')
else:
TC.Topology.attenuate_bond_parameters(self.bonds_df,i,this_nstages,init_colname='initial_distance')
stagepfx=f'{opfx}-stage-{i+1}'
TC.write_top(f'{stagepfx}.top')
for stg_dict in d['equilibration']:
ensemble=stg_dict['ensemble']
impfx=f'{statename}-{ensemble}' # e.g., drag-min, drag-nvt, drag-npt
TC.grompp_and_mdrun(out=f'{stagepfx}-{ensemble}',mdp=f'{impfx}',**gromacs_dict)
# logger.debug(f'{TC.files["gro"]}')
TC.Topology.restore_bond_parameters(saveT)
TC.add_length_attribute(nbdf,attr_name='current_lengths')
maxL,minL,meanL=nbdf['current_lengths'].max(),nbdf['current_lengths'].min(),nbdf['current_lengths'].mean()
logger.debug(f'Distances avg/min/max: {meanL:.3f}/{minL:.3f}/{maxL:.3f}')
if mode=='drag':
logger.info(f'{i+1:>10d} {maxL:>17.3f}')
else:
TC.add_length_attribute(pdf,attr_name='current_lengths')
pmaxL,pminL,pmeanL=pdf['current_lengths'].max(),pdf['current_lengths'].min(),pdf['current_lengths'].mean()
logger.debug(f'1-4 distances lengths avg/min/max: {pmeanL:.3f}/{pminL:.3f}/{pmaxL:.3f}')
logger.info(f'{i+1:>10d} {maxL:>17.3f} {pmaxL:>21.3f}')
self.state._to_yaml()
if mode=='drag':
TC.Topology.remove_restraints(self.bonds_df)
TC.write_top(f'{opfx}-complete.top')
self._register_bonds(nbdf,pdf,f'{opfx}-{mode}-bonds.csv',bonds_are=('relaxed' if mode=='relax' else 'dragged'))
self.state.current_stage[mode]=0
self.state._to_yaml()
# TODO: move to searchbonds.by; split into make_candidates() and apply_filters()
def _searchbonds(self,TC:TopoCoord,RL:ReactionList,MD:MoleculeDict,stage=reaction_stage.cure,abs_max=0,apply_probabilities=False,reentry=False):
"""Manages the search for bonds.
Args:
TC (TopoCoord): global system topology and coordinates
RL (ReactionList): list of all reactions
MD (MoleculeDict): dictionary of all molecular templates
stage (reaction_stage): which reaction stage, defaults to reaction_stage.cure, other choice is reaction_stage.cap
abs_max (int): upper limit to number of new bonds allowed, defaults to 0, signifying no limit
apply_probabilities (bool): if true, bond probabilities are applied, defaults to False
reentry (bool): if true, existing linkcell data stored on disk is used, defaults to False
Returns:
pd.DataFrame: the dataframe of proposed bonds
"""
adf=TC.gro_DataFrame('atoms')
gro=TC.files['gro']
ncpu=self.dicts['controls']['ncpu']
if stage==reaction_stage.cure:
TC.linkcell_initialize(self.state.current_radius,force_repopulate=reentry)
raset=adf[adf['z']>0] # this view will be used for downselecting to potential A-B partners
bdf=pd.DataFrame()
Rlist=[x for x in RL if (x.stage==stage and x.probability>0.0)]
logger.debug(f'reactioncount {len(Rlist)} atomscount {raset.shape[0]}')
for R in Rlist:
logger.debug(f'Reaction {R.name} with {len(R.bonds)} bond(s)')
prob=R.probability
for bond in R.bonds:
A=R.atoms[bond['atoms'][0]]
B=R.atoms[bond['atoms'][1]]
order=bond['order']
aname=A['atom']
areactantname_template=R.reactants[A['reactant']]
aresid_template=A['resid']
aresname=MD[areactantname_template].get_resname(aresid_template)
az=A['z']
bname=B['atom']
breactantname_template=R.reactants[B['reactant']]
bresid_template=B['resid']
bresname=MD[breactantname_template].get_resname(bresid_template)
bz=B['z']
if stage==reaction_stage.cap:
assert areactantname_template==breactantname_template,f'Error: capping reaction {R.name} lists a bond whose atoms are in different reactants'
assert aresname==bresname,f'Error: capping reaction {R.name} lists a bond whose atoms are in different residues'
Aset=raset[(raset['atomName']==aname)&(raset['resName']==aresname)&(raset['z']==az)&(raset['reactantName']==areactantname_template)]
Bset=raset[(raset['atomName']==bname)&(raset['resName']==bresname)&(raset['z']==bz)&(raset['reactantName']==breactantname_template)]
logger.debug(f'Aset {Aset.shape[0]} atoms')
logger.debug(f'Bset {Bset.shape[0]} atoms')
alist=list(zip(Aset['globalIdx'].to_list(),Aset['resNum'].to_list(),Aset['molecule'].to_list()))
blist=list(zip(Bset['globalIdx'].to_list(),Bset['resNum'].to_list(),Bset['molecule'].to_list()))
all_possible_pairs=list(product(alist,blist))
idf=pd.DataFrame({'ai': [int(x[0][0]) for x in all_possible_pairs],
'ri': [int(x[0][1]) for x in all_possible_pairs],
'mi': [int(x[0][2]) for x in all_possible_pairs],
'aj': [int(x[1][0]) for x in all_possible_pairs],
'rj': [int(x[1][1]) for x in all_possible_pairs],
'mj': [int(x[1][2]) for x in all_possible_pairs],
'prob': [prob for _ in all_possible_pairs],
'reactantName': [R.product for _ in all_possible_pairs],
'order': [order for _ in all_possible_pairs]})
if stage==reaction_stage.cure:
# exclude atom pairs that have same resid or molid
idf=idf[(idf['ri']!=idf['rj'])&(idf['mi']!=idf['mj'])].copy()
logger.debug(f'Examining {idf.shape[0]} bond-candidates of order {order}')
if idf.shape[0]>0:
ess='' if idf.shape[0]!=1 else 's'
bondtestoutcomes={k:0 for k in BTRC}
p=Pool(processes=ncpu)
idf_split=[idf.iloc[s] for s in np.array_split(np.arange(len(idf)),ncpu)]
packets=[(i,idf_split[i]) for i in range(ncpu)]
logger.debug(f'Decomposed dataframe lengths: {", ".join([str(x.shape[0]) for x in idf_split])}')
results=p.map(partial(gromacs_distance,gro=gro,new_column_name='r'),packets)
p.close()
p.join()
# reassemble final dataframe:
# logger.debug(f'Checking dataframe lengths: {", ".join([str(x.shape[0]) for x in results])}')
idf=pd.concat(results,ignore_index=True)
# gromacs_distance(idf,gro,new_column_name='r') # use "gmx distance" to very quickly get all lengths
logger.debug(f'{idf.shape[0]} bond-candidate length{ess} avg/min/max: {idf["r"].mean():0.3f}/{idf["r"].min():0.3f}/{idf["r"].max():0.3f}')
idf=idf[idf['r']<self.state.current_radius].copy().reset_index(drop=True)
ess='' if idf.shape[0]!=1 else 's'
logger.debug(f'{idf.shape[0]} bond-candidate{ess} with lengths below {self.state.current_radius} nm')
p=Pool(processes=ncpu)
idf_split=[idf.iloc[s] for s in np.array_split(np.arange(len(idf)),ncpu)]
# logger.debug(f'Decomposed dataframe lengths: {", ".join([str(x.shape[0]) for x in idf_split])}')
results=p.map(partial(TC.bondtest_df),idf_split)
p.close()
p.join()
# reassemble final dataframe:
# logger.debug(f'Checking dataframe lengths: {", ".join([str(x.shape[0]) for x in results])}')
idf=pd.concat(results,ignore_index=True)
if not idf.empty:
logger.debug(f'Bond-candidate test outcomes:')
for k in bondtestoutcomes:
bondtestoutcomes[k]=idf[idf['result']==k].shape[0]
logger.debug(f' {str(k)}: {bondtestoutcomes[k]}')
idf=idf[idf['result']==BTRC.passed].copy()
elif stage==reaction_stage.cap:
idf=idf[idf['ri']==idf['rj']].copy()
logger.debug(f'Examining {idf.shape[0]} bond-candidates of order {order}')
if not idf.empty:
bdf=pd.concat((bdf,idf),ignore_index=True)
# logger.debug('Filtered (by single-bond tests) bonds:')
# for ln in bdf.to_string().split('\n'):
# logger.debug(ln)
if stage==reaction_stage.cure and bdf.shape[0]>0:
bdf=bdf.sort_values('r',axis=0,ignore_index=True).reset_index(drop=True)
bdf['allowed']=[True for x in range(bdf.shape[0])]
unique_atomidx=set(bdf.ai.to_list()).union(set(bdf.aj.to_list()))
unique_resids=set(bdf.ri.to_list()).union(set(bdf.rj.to_list()))
for i,r in bdf.iterrows():
if r.ai in unique_atomidx:
unique_atomidx.remove(r.ai)
else:
# logger.debug(f'Disallowing bond {i} due to repeated atom index {r.ai}')
bdf.loc[i,'allowed']=False
if r.aj in unique_atomidx:
unique_atomidx.remove(r.aj)
else:
# logger.debug(f'Disallowing bond {i} due to repeated atom index {r.aj}')
bdf.loc[i,'allowed']=False
if r.ri in unique_resids:
unique_resids.remove(r.ri)
else:
# logger.debug(f'Disallowing bond {i} due to repeated residue index {r.ri}')
bdf.loc[i,'allowed']=False
if r.rj in unique_resids:
unique_resids.remove(r.rj)
else:
# logger.debug(f'Disallowing bond {i} due to repeated residue index {r.rj}')
bdf.loc[i,'allowed']=False
logger.debug(f'{bdf[bdf["allowed"]==False].shape[0]} out of {bdf.shape[0]} bonds disallowed due to repeated atom indexes or residue indexes')
bdf=bdf[bdf['allowed']==True].copy().reset_index(drop=True)
# logger.debug('Allowed bonds:')
# for ln in bdf.to_string().split('\n'):
# logger.debug(ln)
bdf=TC.bondcycle_collective(bdf,chain_manager=self.chain_manager)
logger.debug(f'{bdf[bdf["remove-to-uncyclize"]==True].shape[0]} out of {bdf.shape[0]} bonds removed to break nascent cycles')
bdf=bdf[bdf['remove-to-uncyclize']==False].copy().reset_index(drop=True)
# logger.debug('Non-cyclizing bonds:')
# for ln in bdf.to_string().split('\n'):
# logger.debug(ln)
''' roll the dice '''
bdf['lucky']=[True for _ in range(bdf.shape[0])]
if apply_probabilities:
for i,r in bdf.iterrows():
x=np.random.random()
if x>r.prob:
bdf.loc[i,'lucky']=False
logger.debug(f'{bdf[bdf["lucky"]==True].shape[0]} bonds survive probability application')
bdf=bdf[bdf['lucky']==True].copy().reset_index(drop=True)
# logger.debug('Lucky bonds:')
# for ln in bdf.to_string().split('\n'):
# logger.debug(ln)
''' apply the stated limit '''
if abs_max>-1:
if abs_max<bdf.shape[0]:
bdf=bdf.loc[:abs_max].copy().reset_index(drop=True)
logger.debug(f'Limiting to {bdf.shape[0]} allowed bonds')
logger.debug('Final bonds:')
for ln in bdf.to_string().split('\n'):
logger.debug(ln)
if stage==reaction_stage.cure:
TC.linkcell_cleanup()
return bdf