"""Various utility methods for plotting and postprocessing.
Author: Cameron F. Abrams <cfa22@drexel.edu>
"""
import logging
import os
import networkx as nx
import numpy as np
import pandas as pd
from scipy.constants import physical_constants
from scipy.optimize import curve_fit
from ..core.coordinates import Coordinates
from ..core.topocoord import TopoCoord
from ..external.gromacs import gmx_energy_trace
logger=logging.getLogger(__name__)
[docs]
def density_from_gro(gro,mollib='./lib/molecules/parameterized',units='SI'):
"""Computes density from a Gromacs gro file.
Args:
gro (str): name of gro file
mollib (str): location of parameterized molecular templates, defaults to './lib/molecules/parameterized'
units (str): string indicating unit system, defaults to 'SI'
Returns:
float: density
"""
C=Coordinates.read_gro(gro)
resnames=list(set(C.A['resName']))
templates={x:TopoCoord.from_top_gro(os.path.join(mollib,f'{x}.top'),os.path.join(mollib,f'{x}.gro')) for x in resnames}
mass=0.0
counts={}
for nm,T in templates.items():
if not nm in counts:
counts[nm]={}
for an in T.Topology.D['atoms']['atom']:
counts[nm][an]=0
these=C.A[C.A['resName']==nm]['atomName']
for n in these:
counts[nm][n]+=1
for nm,adict in counts.items():
T=templates[nm].Topology.D['atoms']
for aname,c in adict.items():
mass+=c*T[T['atom']==aname]['mass'].values[0]
volume=np.prod(C.box.diagonal())
fac=1.0
if units=='SI':
mfac=physical_constants['atomic mass constant'][0]
lfac=1.e-9
vfac=lfac**3
fac=mfac/vfac
return fac*mass/volume
""" These globals are used in traversing a project directory to extract data from edr files """
_system_dirs=['densification','precure',r'iter-{iter:d}','capping','postcure']
_postsim_dirs=['anneal','equilibrate']
_measurement_dirs=['Tg','E']
_md_ensembles={'nvt':['Temperature','Potential'],'npt':['Temperature','Potential','Density'],'default':['Temperature','Potential']}
_indir_pfx={}
_indir_pfx['densification']=[r'densified-{ens:s}',r'densified-repeat-{repeat:d}-{ens:s}']
_indir_pfx['precure']=[r'preequilibration-{ens:s}',r'annealed',r'postequilibration-{ens:s}']
_indir_pfx['iter-n']=[r'1-cure_drag-stage-{stage:d}-{ens:s}',r'3-cure_relax-stage-{stage:d}-{ens:s}',r'4-cure_equilibrate-{ens:s}']
_indir_pfx['capping']=[r'7-cap_relax-stage-{stage:d}-{ens:s}',r'8-cap_equilibrate-{ens:s}']
_indir_pfx['postcure']=[r'preequilibration-{ens:s}',r'annealed',r'postequilibration-{ens:s}']
def _concat_from_edr(df,edr,names,add=[],add_if_missing=[('Density',0.0)]):
"""Concatenates rows onto dataframe df by reading data from an edr file.
Args:
df (pandas.DataFrame): a dataframe with data read in from edr files
edr (str): name of a new edr file to read from
names (list of str): names of energy-like quantities to be read in; each must have a column in df already
add (list): names and values to add to dataset, defaults to []
add_if_missing (list): names and values to add if not found in edr file, defaults to [('Density',0.0)]
Returns:
tuple: the dataframe and the scalar value of last time before this data increment is read in
"""
xshift=0.0
if not df.empty: xshift=df.iloc[-1]['time(ps)']
if len(names)==0: return df,xshift
# print(f'{add_if_missing}')
this_df=gmx_energy_trace(edr,names,xshift=xshift)
for m in add:
nm,val=m
this_df[nm]=np.ones(this_df.shape[0],dtype=type(val))*val
for m in add_if_missing:
nm,val=m
if not nm in this_df:
this_df[nm]=np.ones(this_df.shape[0],dtype=type(val))*val
df=pd.concat((df,this_df),ignore_index=True)
return df,xshift
[docs]
def postsim_density_evolution(proj_dir,append_dirname=False):
"""Returns a single dataframe that is a concatenation of the csv files in the 'postsim' subdirectories.
Args:
proj_dir (str): name of complete project directory
Returns:
pandas.DataFrame: the dataframe
"""
if not os.path.exists(proj_dir): return
sysd=os.path.join(proj_dir,'postsim')
fn=[f'{sysd}/anneal/anneal.csv',f'{sysd}/equilibrate/equilibrate.csv']
df=pd.DataFrame()
lt=0.0
for f in fn:
t=pd.read_csv(f,header=0,index_col=None)
logger.debug(f'shifting by {lt}')
t['time(ps)']+=lt
lt=t['time(ps)'].to_list()[-1]
df=pd.concat((df,t))
if append_dirname:
df.rename(columns={x:f'{x}-{proj_dir}' for x in df.columns})
return df
[docs]
def density_evolution(proj_dir):
"""Returns a single dataframe containing density, temperature, number of bonds vs time by reading all edrs in the correct order from a complete project directory.
Args:
proj_dir (str): name of complete project directory
Returns:
pandas.DataFrame: the dataframe
"""
if not os.path.exists(proj_dir): return
sysd=os.path.join(proj_dir,'systems')
df=pd.DataFrame()
xshift=0.0
transition_times=[xshift]
interval_labels=[]
markers=[]
nbonds=0
for subd in _system_dirs:
# print(f'subd {subd}')
if subd==r'iter-{iter:d}' or subd=='capping' or subd=='densification':
# loooking for iterations
# print(iter_mark0)
iter=1
iter_subd_rel=subd if subd in ['capping','densification'] else subd.format(iter=iter)
iter_subd=os.path.join(sysd,iter_subd_rel)
while os.path.exists(iter_subd):
logger.info(f'Reading edrs in {iter_subd_rel}...')
if r'iter' in subd:
markers.append(df.iloc[-1]['time(ps)'])
dirkey='iter-n' if subd==r'iter-{iter:d}' else subd
if subd==r'iter-{iter:d}':
if os.path.exists(os.path.join(iter_subd,'2-cure_update-bonds.csv')):
bdf=pd.read_csv(os.path.join(iter_subd,'2-cure_update-bonds.csv'),sep=r'\s+',header=0,index_col=None)
nbonds+=bdf.shape[0]
else:
if os.path.exists(os.path.join(iter_subd,'6-cap_update-bonds.csv')):
bdf=pd.read_csv(os.path.join(iter_subd,'6-cap_update-bonds.csv'),sep=r'\s+',header=0,index_col=None)
nbonds+=bdf.shape[0]
# print(f'iter_subd {iter_subd} dirkey {dirkey}')
for pfx in _indir_pfx[dirkey]:
# print(f'pfx {pfx}')
if r'stage' in pfx or r'repeat' in pfx:
stg=1
stg_present=any([os.path.exists(os.path.join(iter_subd,pfx.format(stage=stg,repeat=stg,ens=x))+r'.edr') for x in _md_ensembles])
# print(f'{iter_subd} {stg} {pfx} stg_present {stg_present}')
while stg_present:
for ens,qtys in _md_ensembles.items():
edr_pfx=os.path.join(iter_subd,pfx.format(stage=stg,repeat=stg,ens=ens))
gro=edr_pfx+r'.gro'
# print(edr_pfx,os.path.exists(edr_pfx+r'.edr'))
if os.path.exists(edr_pfx+r'.edr'):
density=0.0
if ens!='npt':
if os.path.exists(gro):
density=density_from_gro(gro,os.path.join(proj_dir,'molecules/parameterized'))
# print(f'density from gro {density}')
df,xshift=_concat_from_edr(df,edr_pfx,qtys,add=[('nbonds',nbonds)],add_if_missing=[('Density',density)])
transition_times.append(xshift)
interval_labels.append([edr_pfx])
stg+=1
stg_present=any([os.path.exists(os.path.join(iter_subd,pfx.format(stage=stg,repeat=stg,ens=x))+r'.edr') for x in _md_ensembles])
# print(f'->{iter_subd} {stg} {pfx} stg_present {stg_present}')
elif r'equil' in pfx or r'densif' in pfx:
eq_present=any([os.path.exists(os.path.join(iter_subd,pfx.format(ens=x))+r'.edr') for x in _md_ensembles])
# print(f'{iter_subd} eq_present? {eq_present}')
if eq_present:
for ens,qtys in _md_ensembles.items():
edr_pfx=os.path.join(iter_subd,pfx.format(ens=ens))
gro=edr_pfx+r'.gro'
# print(edr_pfx,os.path.exists(edr_pfx+r'.edr'),ens,qtys)
if os.path.exists(edr_pfx+r'.edr'):
density=0.0
if ens!='npt':
if os.path.exists(gro):
density=density_from_gro(gro,os.path.join(proj_dir,'molecules/parameterized'))
df,xshift=_concat_from_edr(df,edr_pfx,qtys,add=[('nbonds',nbonds)],add_if_missing=[('Density',density)])
transition_times.append(xshift)
interval_labels.append([edr_pfx])
iter+=1
iter_subd_rel=subd.format(iter=iter)
iter_subd=os.path.join(sysd,iter_subd_rel) if r'iter' in subd else 'I_BET_THIS_FILE_DNE'
# if r'iter' in subd: markers.append(df.iloc[-1]['time (ps)'])
else:
this_subd=os.path.join(sysd,subd)
logger.info(f'Reading edrs in {subd}...')
for pfx in _indir_pfx[subd]:
# print(f'this_subd {this_subd} pfx {pfx}')
if 'ens' in pfx:
for ens,qtys in _md_ensembles.items():
edr_pfx=os.path.join(this_subd,pfx.format(ens=ens))
gro=edr_pfx+r'.gro'
# print(edr_pfx,os.path.exists(edr_pfx+r'.edr'))
# print(gro,os.path.exists(gro))
if os.path.exists(edr_pfx+r'.edr'):
density=0.0
if ens=='nvt' and os.path.exists(gro):
density=density_from_gro(gro,os.path.join(proj_dir,'molecules/parameterized'))
df,xshift=_concat_from_edr(df,edr_pfx,qtys,add=[('nbonds',nbonds)],add_if_missing=[('Density',density)])
transition_times.append(xshift)
interval_labels.append([edr_pfx])
else:
edr_pfx=os.path.join(this_subd,pfx.format(ens=ens))
gro=edr_pfx+r'.gro'
# print(edr_pfx,os.path.exists(edr_pfx+r'.edr'))
if os.path.exists(edr_pfx+r'.edr'):
density=0.0
if os.path.exists(gro):
density=density_from_gro(gro,os.path.join(proj_dir,'molecules/parameterized'))
df,xshift=_concat_from_edr(df,edr_pfx,_md_ensembles['default'],add=[('nbonds',nbonds)],add_if_missing=[('Density',density)])
transition_times.append(xshift)
interval_labels.append([edr_pfx])
return df,transition_times,markers,interval_labels
[docs]
def graph_from_bondsfile(bondsfile):
"""Generates a networkx Graph in which each node is a molecule (from 'mi' and 'mj' records in the bondsfile) and edges indicate two molecules are joined by a covalent bond.
Args:
bondsfile (str): name of bondsfile
Returns:
networkx.Graph: the graph
"""
G=nx.Graph()
df=pd.read_csv(bondsfile,header=0,index_col=None,sep=r'\s+')
for i,r in df.iterrows():
G.add_edge(r['mi'],r['mj'])
return G
[docs]
def mwbxl(G:nx.Graph,crosslinker='GMA',monomer='STY'):
"""Computes the histogram of monomer counts 'n' between crosslinking sites using a molecular connectivity graph; used mainly for vinyl-based polymerizations.
Args:
G (nx.Graph): molecular connectivity graph
crosslinker (str): name of crosslinker molecule, defaults to 'GMA'
monomer (str): name of monomer, defaults to 'STY'
Returns:
pd.DataFrame: a dataframe of 'n' and 'count'
"""
xG=G.copy()
# traverse edges and label hetero/homo participants
for n in xG.nodes():
xG.nodes[n]['neighbor_count']=0
for u,v in xG.edges():
n,m=xG.nodes[u],xG.nodes[v]
n['neighbor_count']+=1
m['neighbor_count']+=1
if n['molecule_name']!=m['molecule_name']:
n['monomer_type']='hetero'
m['monomer_type']='hetero'
else:
n['monomer_type']='homo'
m['monomer_type']='homo'
wipe_us=[]
for n in xG.nodes():
if xG.nodes[n]['molecule_name']==crosslinker:
wipe_us.append(n)
xG.remove_nodes_from(wipe_us)
chaintype=[]
ch=nx.connected_components(xG)
mwhist={}
nb=0
for c in ch:
nhetero=0
for n in c:
node=xG.nodes[n]
if node.get('monomer_type','none')=='hetero':
nhetero+=1
if nhetero==0:
chaintype.append('isolated')
elif nhetero==1:
if len(c)==1:
node=list(xG.nodes.values())[0]
if node['neighbor_count']==1:
chaintype.append('dangling')
elif node['neighbor_count']==2:
chaintype.append('bridging')
if len(c) not in mwhist:
mwhist[len(c)]=0
mwhist[len(c)]+=1
else:
chaintype.append('dangling')
elif nhetero==2:
chaintype.append('bridging')
if len(c) not in mwhist:
mwhist[len(c)]=0
mwhist[len(c)]+=1
nb+=1
maxlen=max(list(mwhist.keys()))
mw=[]
counts=[]
for n in range(maxlen):
cnt=mwhist.get(n,0)
mw.append(n)
counts.append(cnt)
df=pd.DataFrame({'n':mw,'counts':counts})
return df
[docs]
def clusters(G:nx.Graph):
"""Performs a clustering analysis and returns a histogram of cluster sizes (in numbers of molecules) as a pandas DataFrame.
Args:
G (nx.Graph): molecular connectivity graph
Returns:
pd.DataFrame: cluster size histogram
"""
counts={}
for c in sorted(nx.connected_components(G),key=len,reverse=True):
size=len(c)
if not size in counts:
counts[size]=0
counts[size]+=1
sizes=[]
numbers=[]
for s,c in counts.items():
sizes.append(s)
numbers.append(c)
df=pd.DataFrame({'sizes':sizes,'counts':numbers})
df.sort_values(by='sizes',ascending=False,inplace=True)
return df
[docs]
def compute_tg(T,v,n_points=[10,20]):
"""Performs a Tg determination from (volume or density) vs temperature data by fitting lines to the low-T region and another to the high-T region, taking Tg as the temperature at which they intersect.
Args:
T (numpy.array): temperature values
v (numpy.array): volume or density data
n_points (list): number of points on the low and high side to fit lines to, defaults to [10,20]
"""
def func(x,a,b):
return x*a+b
hot_par=[]
cold_par=[]
x=np.array(T)[:n_points[0]]
y=np.array(v)[:n_points[0]]
# logger.info(f'Tg: {len(x)} data points on cold side; begins at {x[0]}')
cold_par,pcov=curve_fit(func,x,y)
sse=(y-func(x,*cold_par))**2
sst=(y-x.mean())**2
r2=1-sse.sum()/sst.sum()
# logger.info(f'cold fit: {cold_par} {r2}')
x=np.array(T)[-n_points[1]:]
y=np.array(v)[-n_points[1]:]
# logger.info(f'Tg: {len(x)} data points on hot side; ends at {x[-1]}')
hot_par,pcov=curve_fit(func,x,y)
sse=(y-func(x,*hot_par))**2
sst=(y-x.mean())**2
r2=1-sse.sum()/sst.sum()
# logger.info(f'hot fit: {hot_par} {r2}')
Tg=-1
if len(hot_par)>0 and len(cold_par)>0:
Tg=-(hot_par[1]-cold_par[1])/(hot_par[0]-cold_par[0])
return Tg,cold_par,hot_par
[docs]
def compute_E(strain,stress,fit_domain=[10,100]):
"""Computes the Young's modulus by performing a linear fit to an elastic regime in stress-vs-strain data.
Args:
strain (numpy.array): strain values
stress (numpy.array): stress values
fit_domain (list): domain over which fit is made, defaults to [10,100]
Returns:
tuple(float,float): E and R2 from fit
"""
x=np.array(strain[fit_domain[0]:fit_domain[1]])
y=np.array(stress[fit_domain[0]:fit_domain[1]])
# logger.info(f'x: {x[0]} -> {x[-1]}')
# logger.info(f'y: {y[0]} -> {y[-1]}')
def func(x,a):
return a*x
popt,pcov=curve_fit(func,x,y,p0=(1000.0))
sse=(y-func(x,*popt))**2
sst=(y-y.mean())**2
r2=1-sse.sum()/sst.sum()
return popt[0],r2