Source code for htpolynet.core.topology

"""Class for managing gromacs .top file data.

Author: Cameron F. Abrams <cfa22@drexel.edu>
"""
import json
import logging
import os

from copy import deepcopy
from itertools import product

import networkx as nx
import numpy as np
import pandas as pd

from networkx.readwrite import json_graph
from scipy.constants import physical_constants

from ..geometry.bondlist import Bondlist
from ..geometry.ring import Ring, RingList

logger = logging.getLogger(__name__)

_PAD_ = -99.99

[docs] def typeorder(a): """Correctly orders the tuple of atom types for particular interaction types. Args: a (tuple): atom indices/types from a [ bond ], [ pair ], [ angle ], or [ dihedral ] record Returns: tuple: same atom indices/types correctly ordered to allow for easy searching/sorting """ assert type(a) == tuple, 'error: typeorder() requires a tuple argument' if len(a) == 2: # bond return a if a[0] < a[1] else a[::-1] elif len(a) == 3: # angle return a if a[0] < a[2] else a[::-1] elif len(a) == 4: # dihedral if a[1] == a[2]: return a if a[0] < a[3] else a[::-1] return a if a[1] < a[2] else a[::-1]
idxorder = typeorder # same syntax to order global atom indices in an interaction index
[docs] def repeat_check(t, msg=''): """Checks for repeated index tuples. Args: t (list): list of index tuples msg (str): optional message, defaults to '' """ for i in range(len(t)): for j in range(i + 1, len(t)): assert t[i] != t[j], f'Error: repeated index in {len(t)}-tuple {t}: t({i})={t[i]}\n{msg}'
[docs] def df_typeorder(df, typs): """Type-orders the atom type attributes in each row of dataframe df. Args: df (pandas.DataFrame): a Topology type-directive dataframe; [ atomtypes ], [ bondtypes ], etc. typs (list): list of type-attribute names; typically ['i','j',...] """ for i in df.index: df.loc[i,typs] = typeorder(tuple(df.loc[i, typs]))
_GromacsIntegers_ = ('nr', 'atnum', 'resnr', 'ai', 'aj', 'ak', 'al', '#mols', 'nrexcl', 'funct', 'func', 'nbfunc', 'comb-rule') _GromacsFloats_ = ('charge', 'mass', 'chargeB', 'massB', *tuple([f'c{i}' for i in range(5)]), 'b0', 'kb', 'th0', 'cth', 'rub', 'kub', 'phase', 'kd', 'pn', 'fudgeLJ', 'fudgeQQ')
[docs] def typedata(h, s): if h in _GromacsIntegers_: return int(s) if h in _GromacsFloats_: return float(s) return s
_GromacsExtensiveDirectives_ = ['atoms', 'pairs', 'bonds', 'angles', 'dihedrals'] _NonGromacsExtensiveDirectives_ = ['mol2_bonds'] _GromacsTopologyDirectiveOrder_ = ['defaults', 'atomtypes', 'bondtypes', 'angletypes', 'dihedraltypes', 'moleculetype', 'atoms', 'pairs', 'bonds', 'angles', 'dihedrals', 'system', 'molecules'] _GromacsTopologyDirectiveHeaders_ = { 'atoms': ['nr', 'type', 'resnr', 'residue', 'atom', 'cgnr', 'charge', 'mass','typeB', 'chargeB', 'massB'], 'pairs': ['ai', 'aj', 'funct', 'c0', 'c1'], 'bonds': ['ai', 'aj', 'funct', 'c0', 'c1'], 'angles': ['ai', 'aj', 'ak', 'funct', 'c0', 'c1'], 'dihedrals': ['ai', 'aj', 'ak', 'al', 'funct', 'c0', 'c1', 'c2', 'c3', 'c4', 'c5'], 'atomtypes': ['name', 'atnum', 'mass', 'charge', 'ptype', 'sigma', 'epsilon'], 'moleculetype': ['name', 'nrexcl'], 'bondtypes': ['i', 'j', 'func', 'b0','kb'], 'angletypes': ['i', 'j', 'k', 'func', 'th0', 'cth', 'rub', 'kub'], 'dihedraltypes': ['i', 'j', 'k', 'l', 'func', 'phase', 'kd', 'pn'], 'system': ['Name'], 'molecules': ['Compound', '#mols'], 'defaults': ['nbfunc', 'comb-rule', 'gen-pairs', 'fudgeLJ', 'fudgeQQ'] } _GromacsTopologyIntegerColumns_ = { # columns that must be written without a trailing '.0'; the dataframes can carry these as float when pandas demotes during NaN handling, so we re-cast to nullable Int64 just before serialization 'atoms': ['nr', 'resnr', 'cgnr'], 'pairs': ['ai', 'aj', 'funct'], 'bonds': ['ai', 'aj', 'funct'], 'angles': ['ai', 'aj', 'ak', 'funct'], 'dihedrals': ['ai', 'aj', 'ak', 'al', 'funct'], 'atomtypes': ['atnum'], 'moleculetype': ['nrexcl'], 'bondtypes': ['func'], 'angletypes': ['func'], 'dihedraltypes': ['func', 'pn'], 'molecules': ['#mols'], 'defaults': ['nbfunc', 'comb-rule'], } _GromacsTopologyHashables_ = { # attributes/columns that should always have values, no NaNs; these are how each item is sorted 'atoms': ['nr'], 'pairs': ['ai', 'aj'], 'bonds': ['ai', 'aj'], 'angles': ['ai', 'aj', 'ak'], 'dihedrals': ['ai', 'aj', 'ak', 'al'], 'atomtypes': ['name'], 'bondtypes': ['i', 'j'], 'angletypes': ['i', 'j', 'k'], 'dihedraltypes': ['i', 'j', 'k', 'l'] } _GromacsTopologyDirectiveDefaults_ = { 'system': ['A_generic_system'], 'molecules': ['None', 1], 'moleculetype': ['None', 3], 'defaults': [1, 2, 'yes', 0.5, 0.83333333] }
[docs] def select_topology_type_option(options, typename='dihedraltypes', rule='stiffest'): """Selects from a list of topological interaction options of type typename using the provided rule. Args: options (list): list of parameterization options for a particular interaction typename (str): string designation of interaction type, defaults to 'dihedraltypes' rule (str): string describing the selection rule, defaults to 'stiffest' Returns: dict: the selected parameterization option (an element of options) """ hashables = _GromacsTopologyHashables_[typename] headers = _GromacsTopologyDirectiveHeaders_[typename].copy() for i in hashables: headers.remove(i) if rule == 'stiffest' or rule == 'softest': if typename == 'bondtypes': parindex = headers.index('kb') elif typename == 'angletypes': parindex = headers.index('cth') elif typename == 'dihedraltypes': parindex = headers.index('kd') sorted_options = list(sorted(options, key=lambda x: x[parindex])) if rule == 'stiffest': return sorted_options[-1] elif rule == 'softest': return sorted_options[0] return []
[docs] class Topology: """ Class for handling gromacs topology data """ def __init__(self, system_name=''): """Constructor for Topology class. Args: system_name (str): optional name of system, defaults to '' """ ''' D: a dictionay keyed on Gromacs topology directives with values that are lists of one or more pandas dataframes corresponding to sections ''' self.D: dict[str, pd.DataFrame] = {} for k, v in _GromacsTopologyDirectiveDefaults_.items(): hdr = _GromacsTopologyDirectiveHeaders_[k] dfdict = {kk: [a] for kk, a in zip(hdr, v)} self.D[k] = pd.DataFrame(dfdict) self.D['system'] = pd.DataFrame({'name': [system_name]}) ''' bondlist: a class that owns a dictionary keyed on atom global index with values that are lists of global atom indices bound to the key ''' self.bondlist = Bondlist() self.residue_network = nx.Graph() self.rings = RingList([]) self.empty = True
[docs] @classmethod def read_top(cls, filename, pad=_PAD_): """Reads a GROMACS-style topology file and returns a Topology instance. Each directive section is stored as a pandas DataFrame. Duplicate 'dihedrals' and 'dihedraltypes' sections are merged. Args: filename (str): name of GROMACS top file to read pad (float): padding value for missing fields, defaults to _PAD_ Raises: KeyError: if an unrecognized topology directive is encountered Returns: Topology: a new Topology instance """ assert os.path.exists(filename), f'Error: {filename} not found.' inst = cls() inst.filename = filename dirname = os.path.dirname(filename) inst.includes = [] with open(filename, 'r') as f: data = f.read().split('[') stanzas = [('['+x).split('\n') for x in data][1:] for s in stanzas: directive = s[0].split()[1].strip() contentlines = [l for l in s[1:] if not (len(l.strip()) == 0 or l.strip().startswith(';'))] if not directive in _GromacsTopologyDirectiveHeaders_: raise KeyError(f'unrecognized topology directive "{directive}"') header = _GromacsTopologyDirectiveHeaders_[directive] series = {k: [] for k in header} for line in contentlines: if line.startswith('#include'): tokens = [x.strip() for x in line.split()] inst.includes.append(tokens[1].replace('"', '')) continue # ignore anything after a ';' line = line.split(';')[0].strip() if directive != 'system': # no need to split line tokens = [x.strip() for x in line.split()] else: tokens = [line] padded = [typedata(k, _) for _, k in zip(tokens, header)] # pad row so that it is same length as series for _ in range(len(tokens), len(header)): padded.append(pad) # logger.debug(f'padded row: {padded}') assert len(padded) == len(header), f'Error: Padding solution does not work! {directive} {len(tokens)}:{len(padded)}!={len(header)} {",".join(tokens)} {",".join(header)}' for k, v in zip(header, padded): series[k].append(v) tdf = pd.DataFrame(series) if directive == 'dihedraltypes': if directive in inst.D: # logger.info(f'Found second set of {len(tdf)} [ dihedraltypes ] in {inst.filename}; merging into set of {len(inst.D["dihedraltypes"])} types already read in...') # we have already read-in a dihedraltypes section # so let's append this one inst.D['dihedraltypes'] = pd.concat([inst.D['dihedraltypes'], tdf], ignore_index=True).drop_duplicates() # logger.info(f' -> now there are {len(inst.D["dihedraltypes"])} dihedral types.') else: inst.D[directive] = tdf elif directive == 'dihedrals': # if there is already a dihedrals section, assume if directive in inst.D: inst.D['dihedrals'] = pd.concat([inst.D['dihedrals'], tdf], ignore_index=True) else: inst.D[directive] = tdf else: inst.D[directive] = tdf # we must assume the 'atoms' are sorted by global index; however, all other # sections need not be sorted. For convenience, we will keep them sorted by # atom indices or atom type name, where appropriate. inst.null_check(msg=f'read from {filename}') if 'atomtypes' in inst.D: inst.D['atomtypes'] = inst.D['atomtypes'].sort_values(by='name').reset_index(drop=True) if 'bonds' in inst.D: inst.D['bonds'] = inst.D['bonds'].sort_values(by=['ai','aj']).reset_index(drop=True) inst.bondlist = Bondlist.fromDataFrame(inst.D['bonds']) if 'bondtypes' in inst.D: df_typeorder(inst.D['bondtypes'], typs=['i', 'j']) inst.D['bondtypes'] = inst.D['bondtypes'].sort_values(by=['i', 'j']).reset_index(drop=True) if 'pairs' in inst.D: inst.D['pairs'] = inst.D['pairs'].sort_values(by=['ai', 'aj']).reset_index(drop=True) if 'pairtypes' in inst.D: df_typeorder(inst.D['pairtypes'], typs=['i', 'j']) inst.D['pairtypes'] = inst.D['pairtypes'].sort_values(by=['i', 'j']).reset_index(drop=True) if 'angles' in inst.D: inst.D['angles'] = inst.D['angles'].sort_values(by=['aj', 'ai', 'ak']).reset_index(drop=True) if 'angletypes' in inst.D: df_typeorder(inst.D['angletypes'], typs=['i', 'j', 'k']) inst.D['angletypes'] = inst.D['angletypes'].sort_values(by=['j', 'i', 'k']).reset_index(drop=True) if 'dihedrals' in inst.D: inst.D['dihedrals'] = inst.D['dihedrals'].sort_values(by=['aj', 'ak', 'ai', 'al']).reset_index(drop=True) if 'dihedraltypes' in inst.D: df_typeorder(inst.D['dihedraltypes'], typs=['i', 'j', 'k', 'l']) inst.D['dihedraltypes'] = inst.D['dihedraltypes'].sort_values(by=['j', 'k', 'i', 'l']).reset_index(drop=True) for f in inst.includes: inst.merge(Topology.read_top(os.path.join(dirname, f))) inst.empty = False return inst
[docs] def bond_source_check(self): """Checks that the 'bonds' and 'mol2_bonds' dataframes contain the same bonds.""" if 'bonds' in self.D and 'mol2_bonds' in self.D: # logger.debug(f'Consistency check between gromacs-top bonds and mol2-bonds requested.') grobonds = self.D['bonds'].sort_values(by=['ai', 'aj']) bmi = grobonds.set_index(['ai', 'aj']).index mol2bonds = self.D['mol2_bonds'].sort_values(by=['ai', 'aj']) mbmi = mol2bonds.set_index(['ai', 'aj']).index check = all([x == y for x, y in zip(bmi, mbmi)]) # logger.debug(f'Result: {check}') if not check: logger.error(f'Gromacs/Mol2 bond inconsistency detected') logger.debug(f'GROMACS:') logger.debug(grobonds[['ai','aj']].head().to_string()) logger.debug(f'MOL2:') logger.debug(mol2bonds[['ai','aj']].head().to_string()) for x, y in zip(bmi, mbmi): logger.debug(f'{x} {y} {x == y}')
[docs] def shiftatomsidx(self, idxshift, directive, rows=[], idxlabels=[]): """Shifts all atom indexes in a topology directive dataframe. Args: idxshift (int): integer index shift directive (str): name of GROMACS topology directive ('atoms','bonds','pairs','angles','dihedrals') rows (list): row boundaries [start, end], defaults to [] idxlabels (list): names of columns that contain atom indexes, defaults to [] """ if directive in self.D: cols = self.D[directive].columns.get_indexer(idxlabels) self.D[directive].iloc[rows[0]:rows[1], cols] += idxshift
[docs] def detect_rings(self): """Detects unique rings in the topology.""" g = self.bondlist.graph() self.rings = RingList([]) for c in nx.chordless_cycles(g): self.rings.append(Ring(c))
[docs] def read_tpx(self, filename): if not os.path.exists(filename): # .tpx only holds the ring list, which we can rebuild from the bond # graph. This path matters for "Using cached parameterization" on a # molecule whose library had .gro/.top/.mol2 but no .tpx checked in. logger.debug(f'{filename} not found; detecting rings from topology graph.') self.detect_rings() return with open(filename, 'r') as f: data = f.read().split('[') stanzas = [('[' + x).split('\n') for x in data][1:] for s in stanzas: directive = s[0].split()[1].strip() contentlines = [l for l in s[1:] if not (len(l.strip()) == 0 or l.strip().startswith(';'))] if directive == 'rings': # each line is a space-delimited list of global atom indices that identifies one ring self.rings = RingList([]) for line in contentlines: self.rings.append(Ring([int(x) for x in line.split()])) else: logger.debug(f'tpx directive {directive} in {filename} is not recognized.')
[docs] def write_tpx(self, filename): with open(filename, 'w') as f: f.write('[ rings ]\n') for r in self.rings: f.write(' '.join([str(x) for x in r.idx]) + '\n')
[docs] def rep_ex(self, count=0): """Replicates extensive topology components (atoms, pairs, bonds, angles, dihedrals). Args: count (int): number of replicas to generate, defaults to 0 Raises: Exception: if self is missing an atoms dataframe """ if count > 0: counts = {k: 0 for k in _GromacsExtensiveDirectives_} counts.update({k: 0 for k in _NonGromacsExtensiveDirectives_}) for t in _GromacsExtensiveDirectives_: if t in self.D: counts[t] = len(self.D[t]) for t in _NonGromacsExtensiveDirectives_: if t in self.D: counts[t] = len(self.D[t]) try: idxshift = counts['atoms'] except: raise Exception(f'Error: expected an "atoms" dataframe') # Per-copy resnr span: shift each replica by this many residues so # multi-residue molecules don't collide on the same resid. resnr_per_copy = int(self.D['atoms']['resnr'].max()) for t in _GromacsExtensiveDirectives_: if t in self.D: self.D[t] = pd.concat([self.D[t]] * count, ignore_index=True) new_rings = RingList([]) for c in range(1, count): self.shiftatomsidx(idxshift * c, 'atoms', rows=[(c * counts['atoms']), ((c + 1) * counts['atoms'])], idxlabels=['nr']) self.shiftatomsidx(resnr_per_copy * c, 'atoms', rows=[(c * counts['atoms']), ((c + 1) * counts['atoms'])], idxlabels=['resnr']) self.shiftatomsidx(idxshift * c, 'bonds', rows=[(c * counts['bonds']), ((c + 1) * counts['bonds'])], idxlabels=['ai', 'aj']) self.shiftatomsidx(idxshift * c, 'mol2_bonds', rows=[(c * counts['mol2_bonds']), ((c + 1) * counts['mol2_bonds'])], idxlabels=['ai', 'aj']) self.shiftatomsidx(idxshift * c, 'pairs', rows=[(c * counts['pairs']), ((c + 1) * counts['pairs'])], idxlabels=['ai', 'aj']) self.shiftatomsidx(idxshift * c, 'angles', rows=[(c * counts['angles']), ((c + 1) * counts['angles'])], idxlabels=['ai', 'aj', 'ak']) self.shiftatomsidx(idxshift * c, 'dihedrals', rows=[(c * counts['dihedrals']), ((c + 1) * counts['dihedrals'])], idxlabels=['ai', 'aj', 'ak', 'al']) new_ringblock = RingList([]) for r in self.rings: new_ringblock.append(r.copy().shift(idxshift * c)) new_rings.extend(new_ringblock) self.rings.extend(new_rings) if 'bonds' in self.D: self.bondlist = Bondlist.fromDataFrame(self.D['bonds'])
[docs] @classmethod def from_ex(cls,other): """Makes a new Topology instance by copying only the extensive dataframes from an existing topology. Args: other (Topology): the other topology Returns: Topology: a new Topology built from the extensive dataframes of other """ ''' ''' inst = cls() for t in _GromacsExtensiveDirectives_: if t in other.D: inst.D[t] = other.D[t].copy() if 'bonds' in inst.D: inst.bondlist = Bondlist.fromDataFrame(inst.D['bonds']) inst.rings = deepcopy(other.rings) return inst
[docs] def write_top(self, filename): """Writes topology to a GROMACS-format top file. Args: filename (str): name of top file to write """ self.null_check(msg=f'writing {filename}') assert 'defaults' in self.D, 'Error: no [ defaults ] in topology?' with open(filename, 'w') as f: f.write('; Gromacs-format topology written by HTPolyNet\n') for k in _GromacsTopologyDirectiveOrder_: if k not in self.D: continue # Replace pad sentinels with NA so to_csv writes nothing for missing fields; # operate on a copy so self.D[k] is never mutated. df = self.D[k].replace(_PAD_, pd.NA) f.write(f'[ {k} ]\n; ') if k in _GromacsTopologyHashables_: df = df.sort_values(by=_GromacsTopologyHashables_[k]) # Cast integer-valued columns to nullable Int64 so they serialize # as e.g. "2" rather than "2.0" (parmed's gromacs reader insists # on parsing pn/func/atom-index columns as int). for col in _GromacsTopologyIntegerColumns_.get(k, []): if col in df.columns: df[col] = df[col].astype('Int64') df.to_csv(f, sep=' ', index=False, header=True, doublequote=False) f.write('\n') f.write('; end\n')
[docs] def null_check(self, msg=''): """Checks for NaNs in dataframe locations that should never have NaNs. Args: msg (str): context message, defaults to '' Raises: Exception: if a NaN is found in a required field """ for k in _GromacsTopologyDirectiveOrder_: if k in self.D and k in _GromacsTopologyHashables_: for a in _GromacsTopologyHashables_[k]: has_nulls = any(self.D[k][a].isnull()) if has_nulls: logger.debug(f'{msg} null in {k} {a}\n{self.D[k].to_string()}') raise ValueError(f'{msg} NaN in {k}[{a}]\n{self.D[k].to_string()}')
[docs] def total_charge(self): """Computes and returns total system charge. Returns: float: total system charge """ if 'atoms' in self.D: return self.D['atoms']['charge'].sum() return 0.0
[docs] def adjust_charges(self, atoms=[], desired_charge=0.0, overcharge_threshhold=0.1, msg=''): """Adjusts atom partial charges so that total system charge equals the desired value. Args: atoms (list): global atom indices to adjust, defaults to [] desired_charge (float): target system charge, defaults to 0.0 overcharge_threshhold (float): threshold overcharge that triggers a warning message, defaults to 0.1 msg (str): message to write if pre-adjusted system charge is too high, defaults to '' Returns: Topology: self """ apparent_charge = self.total_charge() overcharge = apparent_charge - desired_charge logger.debug(f'Adjusting charges of {len(atoms)} atoms due to overcharge of {overcharge:.6f}') if np.abs(overcharge) > overcharge_threshhold: logger.debug(f'{msg}') cpa = -overcharge / len(atoms) logger.debug(f'Adjustment is {cpa:.4e} per atom') for i in atoms: self.D['atoms'].loc[self.D['atoms']['nr'] == i, 'charge'] += cpa logger.debug(f'New total charge after adjustment: {self.total_charge():.6f}') return self
[docs] def total_mass(self, units='gromacs'): """Returns total mass of all atoms in the Topology. Args: units (str): unit system; 'SI' returns kg, 'gromacs' returns amu, defaults to 'gromacs' Returns: float: total mass in amu (gromacs) or kg (SI) """ fac = 1.0 if units == 'SI': fac = physical_constants['atomic mass constant'][0] if 'atoms' in self.D: M_amu = self.D['atoms']['mass'].sum() logger.debug(f'mass {M_amu} fac {fac}') return M_amu * fac return 0.0
[docs] def atomcount(self): """Returns the total number of atoms. Returns: int: number of atoms """ if 'atoms' in self.D: return self.D['atoms'].shape[0] return 0
[docs] def add_restraints(self, pairdf, typ=6, kb=300000.): """Adds type-6 (non-topological) bonds to help drag atoms toward each other. Args: pairdf (pandas.DataFrame): dataframe of pairs ['ai','aj'] typ (int): bond type, defaults to 6 kb (float): bond spring constant (kJ/mol/nm^2), defaults to 300000 """ bmi = self.D['bonds'].set_index(['ai','aj']).sort_index().index for i, b in pairdf.iterrows(): ai, aj = idxorder((b['ai'],b['aj'])) b0 = b['initial_distance'] if not (ai, aj) in bmi: h = _GromacsTopologyDirectiveHeaders_['bonds'] data = [ai, aj, typ, b0, kb] # this new bond will have override parameters bonddict = {k: [v] for k, v in zip(h, data)} bdtoadd = pd.DataFrame(bonddict) self.D['bonds'] = pd.concat((self.D['bonds'], bdtoadd), ignore_index=True)
[docs] def remove_restraints(self, pairdf): """Removes all non-topological restraint bonds represented in pairdf. Deleting these bonds does not influence angles or dihedrals. Args: pairdf (pandas.DataFrame): dataframe of pairs ['ai','aj'] """ d = self.D['bonds'] to_drop = [] for i, b in pairdf.iterrows(): ai, aj = idxorder((b['ai'], b['aj'])) to_drop.append(d[(d.ai == ai) & (d.aj == aj)].index.values[0]) self.D['bonds'] = self.D['bonds'].drop(to_drop)
[docs] def add_bonds(self, pairs=[]): """Adds bonds indicated in list pairs to the topology. Args: pairs (list): list of (ai, aj, order) tuples, defaults to [] Raises: Exception: if an existing bond is in the list of pairs """ # logger.debug('begins') at = self.D['atoms'] ij = self.D['bondtypes'].set_index(['i', 'j']) #mb=self.D['mol2_bonds'] bmi = self.D['bonds'].set_index(['ai', 'aj']).sort_index().index pmi = self.D['pairs'].set_index(['ai', 'aj']).sort_index().index newbonds = [] for b in pairs: # logger.debug(f'{b}') # assert type(b[0])==int bondtuple = (int(b[0]), int(b[1])) order = int(b[2]) ai, aj = idxorder(bondtuple) assert type(ai) == int assert type(aj) == int ''' if this bond is not in the topology, then add it ''' if not (ai, aj) in bmi: newbonds.append((ai, aj)) # logger.debug(f'asking types of {ai} and {aj}; at.shape {at.shape}') it = at.iloc[ai-1].type jt = at.iloc[aj-1].type idx = typeorder((it, jt)) if idx in ij.index: bt = ij.loc[idx, 'func'] # why don't i need need values[0] kb = ij.loc[idx, 'kb'] b0 = ij.loc[idx, 'b0'] else: logger.debug(f'no bondtype {idx} found; using placeholder parameters') bt = 1 b0 = 0.15 kb = 999999 # raise Exception(f'no bondtype {idx} found.') ''' add a new bond! ''' h = _GromacsTopologyDirectiveHeaders_['bonds'] data = [ai, aj, bt, b0, kb] # this new bond will have override parameters assert len(h) == len(data), 'Error: not enough data for new bond?' bonddict = {k: [v] for k, v in zip(h, data)} bdtoadd = pd.DataFrame(bonddict) self.D['bonds'] = pd.concat((self.D['bonds'], bdtoadd), ignore_index=True) # logger.info(f'add_bond:\n{bdtoadd.to_string()}') logger.debug(f'just added {bonddict}') if 'mol2_bonds' in self.D: data = [len(self.D['mol2_bonds']), ai, aj, '1'] # assume single bond; order is str dtype bonddict = {k: [v] for k, v in zip(['bondIdx', 'ai', 'aj', 'order'], data)} self.D['mol2_bonds'] = pd.concat((self.D['mol2_bonds'], pd.DataFrame(bonddict)), ignore_index=True) # remove this pair from pairs if it's in there (it won't be) if idx in pmi: logger.debug(f'Warning: new bond {ai}-{aj} was evidently in the [ pairs ]!') d = self.D['pairs'] indexes_to_drop = d[(d.ai.isin(idx)) & (d.aj.isin(idx))].index logger.debug(f'Dropping [ pair ]:\n{d[indexes_to_drop].to_string()}') indexes_to_keep = set(range(d.shape[0])) - set(indexes_to_drop) self.D['pairs'] = d.take(list(indexes_to_keep)).reset_index(drop=True) else: ''' if it is, do nothing; it will be templated; if mol2_bonds are present (usually because a Topology is part of a molecule being parameterized), update the order of the bond. ''' if 'mol2_bonds' in self.D: mb = self.D['mol2_bonds'] bi = (mb['ai'] == ai) & (mb['aj'] == aj) mb.loc[bi, 'order'] = str(order) ''' update the bondlist ''' for b in newbonds: self.bondlist.append(b) logger.debug(f'Added {len(newbonds)} new bonds')
[docs] def delete_atoms(self, idx=[], reindex=True, return_idx_of=[], **kwargs): """Deletes atoms from topology. Args: idx (list): atom indexes to delete, defaults to [] reindex (bool): reindex atoms after deleting, defaults to True return_idx_of (list): old indices whose new indices should be returned, defaults to [] Returns: dict: old-index-to-new-index mapper """ idx_set = set(idx) paranoid_about_pairs = kwargs.get('paranoid_about_pairs', False) self.null_check(msg='beginning of delete atoms') d = self.D['atoms'] new_idx = [] drop_mask = d['nr'].isin(idx_set) total_missing_charge = d.loc[drop_mask, 'charge'].sum() logger.debug(f'Deleting {drop_mask.sum()} [ atoms ]; charge to make up: {total_missing_charge:.4f}') self.D['atoms'] = d[~drop_mask].reset_index(drop=True) mapper = {} if reindex: d = self.D['atoms'] oldGI = d['nr'].copy() d['nr'] = d.index + 1 mapper = {k: v for k, v in zip(oldGI, d['nr'])} assert not any(x in mapper for x in idx_set), 'Error: Some deleted atoms in mapper.' if any(np.isnan(list(mapper.keys()))): logger.error('null in mapper keys') if any(np.isnan(list(mapper.values()))): logger.error('null in mapper values') if len(return_idx_of) > 0: new_idx = [mapper[o] for o in return_idx_of] for pt in ['bonds', 'mol2_bonds', 'pairs']: if pt not in self.D: continue d = self.D[pt] drop_mask = d['ai'].isin(idx_set) | d['aj'].isin(idx_set) logger.debug(f'Deleting {drop_mask.sum()} [ {pt} ]') self.D[pt] = d[~drop_mask].reset_index(drop=True) if reindex: d = self.D[pt] assert not any(x in idx_set for x in d.ai), f'Error: deleted atom survived in {pt} ai' assert not any(x in idx_set for x in d.aj), f'Error: deleted atom survived in {pt} aj' assert all(x in mapper for x in d.ai), f'Error: surviving {pt} atom ai old idx not in mapper' assert all(x in mapper for x in d.aj), f'Error: surviving {pt} atom aj old idx not in mapper' # Defer remapping pairs until after dihedrals are pruned. if pt != 'pairs': d.ai = d.ai.map(mapper) d.aj = d.aj.map(mapper) if pt == 'bonds': self.bondlist = Bondlist.fromDataFrame(d) if pt == 'mol2_bonds': self.D[pt]['bondIdx'] = list(range(1, self.D[pt].shape[0] + 1)) d = self.D['angles'] drop_mask = d['ai'].isin(idx_set) | d['aj'].isin(idx_set) | d['ak'].isin(idx_set) logger.debug(f'Deleting {drop_mask.sum()} [ angles ]') self.D['angles'] = d[~drop_mask].reset_index(drop=True) assert d.ai.dtype == int, f'post-delete lost angle ai dtype {d.ai.dtype}' assert d.aj.dtype == int, f'post-delete lost angle aj dtype {d.aj.dtype}' assert d.ak.dtype == int, f'post-delete lost angle ak dtype {d.ak.dtype}' self.null_check(msg='inside delete atoms before angles reindex') if reindex: d = self.D['angles'] assert not any(x in idx_set for x in d.ai), 'Error: deleted atom survived in angle ai' assert not any(x in idx_set for x in d.aj), 'Error: deleted atom survived in angle aj' zombie_tags = [x in idx_set for x in d.ak] if any(zombie_tags): logger.debug(f'Zombie ak angles:\n{self.D["angles"][zombie_tags].to_string()}') assert not any(zombie_tags), 'Error: deleted atom survived in angle ak' assert all(x in mapper for x in d.ai), 'Error: surviving angle atom ai old idx not in mapper' assert all(x in mapper for x in d.aj), 'Error: surviving angle atom aj old idx not in mapper' assert all(x in mapper for x in d.ak), 'Error: surviving angle atom ak old idx not in mapper' d.ai = d.ai.map(mapper) d.aj = d.aj.map(mapper) d.ak = d.ak.map(mapper) self.null_check(msg='inside delete atoms after angles reindex') d = self.D['dihedrals'] drop_mask = d['ai'].isin(idx_set) | d['aj'].isin(idx_set) | d['ak'].isin(idx_set) | d['al'].isin(idx_set) logger.debug(f'Deleting {drop_mask.sum()} [ dihedrals ]') # Optionally remove pairs whose defining dihedral (ai…al) is being deleted. if paranoid_about_pairs: dp = self.D['pairs'] ddp = d[drop_mask] pair_drop = pd.Index([]) for pi, pl in zip(ddp.ai, ddp.al): pair_drop = pair_drop.union( dp[((dp.ai == pi) & (dp.aj == pl)) | ((dp.ai == pl) & (dp.aj == pi))].index ) if len(pair_drop) > 0: logger.debug(f' -> and deleting {len(pair_drop)} [ pairs ] from those dihedrals') self.D['pairs'] = dp.drop(index=pair_drop).reset_index(drop=True) self.D['dihedrals'] = d[~drop_mask].reset_index(drop=True) if reindex: d = self.D['dihedrals'] d.ai = d.ai.map(mapper) d.aj = d.aj.map(mapper) d.ak = d.ak.map(mapper) d.al = d.al.map(mapper) d = self.D['pairs'] d.ai = d.ai.map(mapper) d.aj = d.aj.map(mapper) # Drop any duplicate 1-4 pairs that survive after remapping. pdrops = d.duplicated(subset=['ai', 'aj'], keep='first') if pdrops.any(): logger.debug(f'Deleting {pdrops.sum()} duplicate 1-4 pair descriptors -- this is likely due to a bug somewhere') self.D['pairs'] = d[~pdrops].reset_index(drop=True) self.null_check(msg='end of delete atoms') logger.debug('finished.') if len(return_idx_of) > 0: return new_idx return mapper
[docs] def prune_stale_14_pairs(self): """Drops entries from [ pairs ] whose atoms are not actually 1-4 in the current bond graph. A template-derived 1-4 pair can become stale during cure when a subsequently formed bond shortens the topological path between its two atoms to 1-3 (or 1-2). Such an entry incorrectly re-introduces a scaled nonbonded interaction on top of the bond/angle exclusion. Returns: int: number of pair entries dropped. """ if 'pairs' not in self.D or self.D['pairs'].empty: return 0 bl = self.bondlist keep_mask = [] for ai, aj in zip(self.D['pairs']['ai'], self.D['pairs']['aj']): one_hop = set(bl.partners_of(int(ai))) if int(aj) in one_hop: keep_mask.append(False) continue two_hop = set() for p in one_hop: two_hop.update(bl.partners_of(int(p))) two_hop.discard(int(ai)) two_hop -= one_hop if int(aj) in two_hop: keep_mask.append(False) continue three_hop = set() for p in two_hop: three_hop.update(bl.partners_of(int(p))) three_hop.discard(int(ai)) three_hop -= one_hop three_hop -= two_hop keep_mask.append(int(aj) in three_hop) dropped = len(keep_mask) - sum(keep_mask) if dropped: self.D['pairs'] = self.D['pairs'][keep_mask].reset_index(drop=True) return dropped
def _myconcat(self, other, directive='', idxlabel=[], idxshift=0, drop_duplicates=False): if not directive in other.D: return if directive in self.D: # shift atom indices for i in idxlabel: other.D[directive][i] += idxshift if drop_duplicates: self.D[directive] = pd.concat((self.D[directive], other.D[directive]), ignore_index=True).drop_duplicates() else: self.D[directive] = pd.concat((self.D[directive], other.D[directive]), ignore_index=True) else: self.D[directive] = other.D[directive]
[docs] def merge(self, other): """Merges another topology into self. Args: other (Topology): topology to merge in """ # logger.debug('Topology.merge begins') # look for duplicated types between self and other. If any are found, delete those types from other and copy their parameters into the explicit interactions they correspond to. self.merge_ex(other) self.merge_types(other)
# logger.debug('Topology.merge ends') # def handle_duplicate_types(self,other,copy_directive='other_to_self',typename='',funcidx=4): # if not typename in self.D or not typename in other.D: # return # stdf=self.D[typename] # otdf=other.D[typename] # hashables=_GromacsTopologyHashables_[typename] # headers=_GromacsTopologyDirectiveHeaders_[typename].copy() # logger.debug(f'{hashables} {headers}') # for i in hashables: # headers.remove(i) # common=[] # fi=headers.index('func') # copy_idx_pairs=[] # for idx,r in otdf.iterrows(): # typidx=typeorder(tuple([r[i] for i in hashables])) # idata=[r[i] for i in headers] # for jdx,q in stdf.iterrows(): # typjdx=typeorder(tuple([q[i] for i in hashables])) # jdata=[q[i] for i in headers] # if typidx==typjdx and idata==jdata: # common.append(typidx) # for jdx,q in stdf.iterrows(): # typjdx=typeorder(tuple([q[i] for i in hashables])) # jdata=[q[i] for i in headers] # if typidx==typjdx and idata!=jdata and not typidx in common and idata[fi]==funcidx and jdata[fi]==funcidx: # logger.debug(f'duplicate {typename} {typidx}') # logger.debug(f'o {idx} {idata}') # logger.debug(f's {jdx} {jdata}') # if not (idx,jdx) in copy_idx_pairs: # copy_idx_pairs.append((idx,jdx)) # if copy_directive=='other_to_self' and len(copy_idx_pairs)>0: # for o,s in copy_idx_pairs: # self.D[typename].iloc[s]=other.D[typename].iloc[o] # elif copy_directive=='self_to_other' and len(copy_idx_pairs)>0: # for o,s in copy_idx_pairs: # other.D[typename].iloc[o]=self.D[typename].iloc[s] # # logger.debug(f'{typidx}') # # pass
[docs] def report_type(self, typidx_q, typename='', funcidx=-1): if not typename in self.D: return stdf = self.D[typename] hashables = _GromacsTopologyHashables_[typename] headers = _GromacsTopologyDirectiveHeaders_[typename].copy() for i in hashables: headers.remove(i) fi = headers.index('func') for idx, r in stdf.iterrows(): typidx=typeorder(tuple([r[i] for i in hashables])) idata=[r[i] for i in headers] if typidx==typidx_q and idata[fi]==funcidx: return [r[i] for i in headers] return []
''' 'bonds': ['ai', 'aj', 'funct', 'c0', 'c1'], 'bondtypes': [ 'i', 'j', 'func', 'b0', 'kb'], 'angles': ['ai', 'aj', 'ak', 'funct', 'c0', 'c1'], 'angletypes': [ 'i', 'j', 'k', 'func', 'th0', 'cth', 'rub', 'kub'], 'dihedrals': ['ai', 'aj', 'ak', 'al', 'funct', 'c0', 'c1', 'c2', 'c3', 'c4', 'c5'], 'dihedraltypes':[ 'i', 'j', 'k', 'l', 'func', 'phase', 'kd', 'pn'], '''
[docs] def reset_override_from_type(self, interactionname, typename, inst_idx): if not typename in self.D: return typ_hashables = _GromacsTopologyHashables_[typename] stdf = self.D[typename].set_index(typ_hashables) sidf = self.D[interactionname] typ_headers = _GromacsTopologyDirectiveHeaders_[typename].copy() ins_hashables = _GromacsTopologyHashables_[interactionname] ins_headers = _GromacsTopologyDirectiveHeaders_[interactionname].copy() typidx = [self.D['atoms'].iloc[x - 1]['type'] for x in inst_idx] typidx = typeorder(tuple(typidx)) for i in typ_hashables: typ_headers.remove(i) for i in ins_hashables: ins_headers.remove(i) iidx = idxorder(tuple(inst_idx)) idx = -1 for i, r in sidf.iterrows(): jdx = tuple([r[a] for a in ins_hashables]) if iidx == jdx: idx = i break assert idx != -1 num_data = min([len(typ_headers), len(ins_headers)]) typ_headers = typ_headers[:num_data] ins_headers = ins_headers[:num_data] typrec = stdf.loc[typidx][typ_headers] cols = self.D[interactionname].columns.get_indexer(ins_headers) logger.debug(f'Resetting override in {interactionname} for {inst_idx} from') for ln in self.D[interactionname].iloc[idx,cols].to_string().split('\n'): logger.debug(ln) logger.debug('to') self.D[interactionname].iloc[idx,cols] = typrec for ln in self.D[interactionname].iloc[idx,cols].to_string().split('\n'): logger.debug(ln)
[docs] def reset_type(self, typename, typidx_t, values): if not typename in self.D: return stdf = self.D[typename] hashables = _GromacsTopologyHashables_[typename] headers = _GromacsTopologyDirectiveHeaders_[typename].copy() for i in hashables: headers.remove(i) cols = self.D[typename].columns.get_indexer(headers) idxs = [] for idx, r in stdf.iterrows(): typidx = typeorder(tuple([r[i] for i in hashables])) if typidx == typidx_t: idxs.append(idx) assert len(headers) == len(values) logger.debug(f'Resetting {len(idxs)} entries {headers} to {values}') for idx in idxs: self.D[typename].iloc[idx,cols] = values for ln in self.D[typename].iloc[idx][headers].to_string().split('\n'): logger.debug(ln)
[docs] def report_duplicate_types(self,other,typename='',funcidx=4): if not typename in self.D or not typename in other.D: return stdf = self.D[typename] otdf = other.D[typename] hashables = _GromacsTopologyHashables_[typename] headers = _GromacsTopologyDirectiveHeaders_[typename].copy() # logger.debug(f'{hashables} {headers}') for i in hashables: headers.remove(i) common = [] fi = headers.index('func') true_duplicate_types = [] for idx, r in otdf.iterrows(): typidx = typeorder(tuple([r[i] for i in hashables])) idata = [r[i] for i in headers] for jdx, q in stdf.iterrows(): typjdx = typeorder(tuple([q[i] for i in hashables])) jdata = [q[i] for i in headers] if typidx == typjdx and idata == jdata: common.append(typidx) for idx, r in otdf.iterrows(): typidx = typeorder(tuple([r[i] for i in hashables])) idata = [r[i] for i in headers] for jdx, q in stdf.iterrows(): typjdx = typeorder(tuple([q[i] for i in hashables])) jdata = [q[i] for i in headers] if typidx == typjdx and idata != jdata and not typidx in common and idata[fi] == funcidx and jdata[fi] == funcidx: # logger.debug(f'duplicate {typename} {typidx}') true_duplicate_types.append(typidx) return true_duplicate_types
[docs] def dup_check(self, die=True): """Checks for duplicate type-like topology records. Args: die (bool): if True, raise an exception when a duplicate is found, defaults to True Raises: Exception: if a duplicate type with different parameters is detected and die is True """ L = ['atomtypes', 'bondtypes', 'angletypes'] Not = ' not' if not die else '' logger.debug(f'Checking for duplicate {L}; will{Not} die if found.') for t in L: i = _GromacsTopologyHashables_[t] ''' checking for types with duplicate atom-type indices ''' dups = self.D[t].duplicated(subset=i, keep=False) if any(dups): logger.error(f'Duplicate {t} with different parameters detected\n' + self.D[t][dups].to_string()) if die: raise Exception('duplicate topology types with different parameters detected')
[docs] def merge_types(self, other): """Merges type-like topology dataframes from other to self. Args: other (Topology): topology containing attribute D, a dictionary of dataframes """ # self.handle_duplicate_types(other,typename='dihedraltypes',funcidx=4,drop_directive='drop_from_self') L = ['atomtypes', 'bondtypes', 'angletypes', 'dihedraltypes'] for t in L: self._myconcat(other, directive=t, drop_duplicates=True)
[docs] def merge_ex(self, other): logger.debug(f' extensive merging...') ''' merge EXTENSIVE quantities ''' idxshift = 0 if 'atoms' not in self.D else len(self.D['atoms']) rdxshift = 0 if 'atoms' not in self.D else self.D['atoms'].iloc[-1]['resnr'] if 'atoms' in other.D: other.D['atoms']['resnr'] += rdxshift self._myconcat(other, directive='atoms', idxlabel=['nr'], idxshift=idxshift) self._myconcat(other, directive='bonds', idxlabel=['ai','aj'], idxshift=idxshift) if 'bonds' in other.D: self.bondlist.update(other.D['bonds']) if 'mol2_bonds' in self.D and 'mol2_bonds' in other.D: other.D['mol2_bonds'].bondIdx += self.D['mol2_bonds'].shape[0] self._myconcat(other, directive='mol2_bonds', idxlabel=['ai','aj'], idxshift=idxshift) self._myconcat(other, directive='pairs', idxlabel=['ai','aj'], idxshift=idxshift) self._myconcat(other, directive='angles', idxlabel=['ai','aj','ak'], idxshift=idxshift) self._myconcat(other, directive='dihedrals', idxlabel=['ai','aj','ak','al'], idxshift=idxshift) logger.debug(f'merging {len(other.rings)} rings into base list of {len(self.rings)} with idxshift {idxshift}') other.rings.shift(idxshift) self.rings.extend(other.rings)
[docs] def get_atom_attribute(self, idx, attribute): """Returns value of attribute of atom idx. Args: idx (int): global atom index attribute (str): atom attribute name Returns: atom attribute value """ return self.D['atoms'].loc[self.D['atoms']['nr'] == idx, attribute].values[0]
[docs] def get_atomtype(self, idx): """Gets atom type of atom with global index idx. Args: idx (int): atom global index Returns: str: atom type """ return self.D['atoms'].loc[self.D['atoms']['nr'] == idx, 'type'].values[0]
[docs] def build_interresidue_graph(self, G, ri): if ri in G: return adf = self.D['atoms'] ri_at_set = set(adf.loc[adf['resnr'] == ri, 'nr']) # Collect all bonded neighbours of ri's atoms that belong to other residues. neighbour_atoms = set() for a in ri_at_set: neighbour_atoms.update(self.bondlist.partners_of(a)) neighbour_atoms -= ri_at_set ri_partners = adf.loc[adf['nr'].isin(neighbour_atoms), 'resnr'].unique().tolist() G.add_node(ri) for rj in ri_partners: self.build_interresidue_graph(G, rj)
[docs] def local_resid_cluster(self,ri): G = nx.DiGraph() # logger.debug(str(self.bondlist)) self.build_interresidue_graph(G, ri) return [x for x in G]
[docs] def make_resid_graph(self, json_file=None): adf = self.D['atoms'] N = adf.shape[0] self.residue_network = nx.DiGraph() residues = adf['residue'].unique() for rn in residues: rs = adf[adf['residue'] == rn]['resnr'].unique() self.residue_network.add_nodes_from(rs, resName=rn) resnrs = adf['resnr'].unique() for i in resnrs: # atom global indexes in this resnr ats = adf[adf['resnr'] == i]['nr'].to_list() connectors = [] for a in ats: an = self.bondlist.partners_of(a) natsrn = adf[adf['nr'].isin(an)]['resnr'].unique() if len(natsrn) > 1: for n in natsrn: if n != i: # this is an inter-residue connection connectors.append((a, n)) for c in connectors: a, n = c bondtype = "cross" if not self.residue_network.has_edge(i, n): self.residue_network.add_edge(i, n, bondtype=bondtype) if json_file: the_data = json_graph.node_link_data(self.residue_network) logger.debug(f'writing graph node_link_data to {json_file}') with open(json_file, 'w') as f: json.dump(the_data, f, default=str)
[docs] def copy_bond_parameters(self,bonds): """Generates and returns a copy of a bonds dataframe that contains all bonds listed in bonds. Args: bonds (pandas.DataFrame): dataframe of bonds managed by runtime, 'ai','aj','reactantName' Returns: pandas.DataFrame: bonds dataframe extracted from system with all parameters """ bdf = self.D['bonds'] assert not any(bdf['c0'].isna()) pairs = [idxorder((b.ai, b.aj)) for b in bonds.itertuples()] idx = pd.MultiIndex.from_tuples(pairs, names=['ai', 'aj']) rows = [] for ai, aj in pairs: rows.append(bdf[(bdf['ai'] == ai) & (bdf['aj'] == aj)].copy()) saveme = pd.concat(rows, ignore_index=True) if rows else pd.DataFrame(columns=bdf.columns) for c in bdf.columns: if c not in saveme: saveme[c] = _PAD_ return saveme
[docs] def attenuate_bond_parameters(self, bondsdf, stage, max_stages, minimum_distance=0.0, init_colname='initial_distance'): """Alters the kb and b0 parameters for new crosslink bonds according to the values prior to relaxation, their equilibrium values, and the ratio stage/max_stages. Let stage/max_stages be x, and 1/max_stages <= x <= 1. The spring constant for each bond is multiplied by x and the distance is 1/x of the way from its maximum value to its equilibrium value. Args: bondsdf (pandas.DataFrame): dataframe of bonds managed by runtime, 'ai','aj','reactantName' stage (int): index of stage in the series of post-bond-formation relaxation max_stages (int): total number of relaxation stages for this iteration minimum_distance (float): minimum bond length allowed, overriding type-specific b0 if greater than 0 """ bdf = self.D['bonds'] factor = (stage + 1) / max_stages logger.debug(f'Attenuating {bondsdf.shape[0]} bond{"s" if bondsdf.shape[0] > 1 else ""} in stage {stage + 1}/{max_stages}') jdx = list(bondsdf.columns).index(init_colname) for b in bondsdf.itertuples(index=False): ai, aj = idxorder((b.ai, b.aj)) rij = b[jdx] b0, kb = self.get_bond_parameters(ai, aj) if minimum_distance > 0.0: b0 = minimum_distance new_b0 = rij - factor * (rij - b0) new_kb = kb * factor # logger.debug(f'bond attenuation target for {ai}-{aj}:\nb0 {b0:.5f} kb {kb:.2f}; using b0 {new_b0:.5f} kb {new_kb:.2f}') bdf.loc[(bdf['ai'] == ai) & (bdf['aj'] == aj), 'c0'] = new_b0 bdf.loc[(bdf['ai'] == ai) & (bdf['aj'] == aj), 'c1'] = new_kb
[docs] def get_bond_parameters(self, ai, aj): """Gets b0 and kb for bond between atoms with global indexes ai and aj. Args: ai (int): global atom index aj (int): global atom index Returns: tuple: b0, kb — equilibrium bond length and spring constant """ ai, aj = idxorder((ai, aj)) adf = self.D['atoms'] bdf = self.D['bonds'] tdf = self.D['bondtypes'] b0 = bdf.loc[(bdf['ai'] == ai) & (bdf['aj'] == aj), 'c0'].values[0] kb = bdf.loc[(bdf['ai'] == ai) & (bdf['aj'] == aj), 'c1'].values[0] if pd.isna(b0) or pd.isna(kb): ''' no overrides for this bond, so take from types ''' it = adf.loc[adf['nr'] == ai, 'type'].values[0] jt = adf.loc[adf['nr'] == aj, 'type'].values[0] it, jt = typeorder((it, jt)) b0 = tdf.loc[(tdf['i'] == it) & (tdf['j'] == jt), 'b0'].values[0] kb = tdf.loc[(tdf['i'] == it) & (tdf['j'] == jt), 'kb'].values[0] return b0, kb
[docs] def restore_bond_parameters(self, df): """Copies data from all bonds in dataframe df to global dataframe. Args: df (pandas.DataFrame): dataframe of bonds ['ai','aj','c0','c1'] """ bdf = self.D['bonds'] for r in df.itertuples(): ai, aj = r.ai, r.aj c0, c1 = r.c0, r.c1 bdf.loc[(bdf['ai'] == ai) & (bdf['aj'] == aj), 'c0'] = c0 bdf.loc[(bdf['ai'] == ai) & (bdf['aj'] == aj), 'c1'] = c1
[docs] def attenuate_pair_parameters(self, pairsdf, stage, max_stages, draglimit_nm=0.3): """Alters the kb and b0 parameters for new pre-crosslink pairs according to the values prior to dragging, the desired lower limit of interatomic distance, and the ratio stage/max_stages. Args: pairsdf (pandas.DataFrame): pairs dataframe (['ai'],['aj'],['initial_distance']) stage (int): index of stage in the series of pre-bond-formation dragging max_stages (int): total number of drag stages for this iteration draglimit_nm (float): lower limit of interatomic distance requested from drag """ pdf = self.D['pairs'] ess = 's' if pairsdf.shape[0] > 1 else '' factor = (stage + 1) / max_stages logger.debug(f'Attenuating {pairsdf.shape[0]} pair{ess} in stage {stage+1}/{max_stages}') for b in pairsdf.itertuples(): ai, aj = idxorder((b.ai, b.aj)) b0 = b.initial_distance kb = pdf.loc[(pdf['ai'] == ai) & (pdf['aj'] == aj), 'c1'].values[0] pdf.loc[(pdf['ai'] == ai) & (pdf['aj'] == aj), 'c0'] = draglimit_nm - factor * (b0 - draglimit_nm) pdf.loc[(pdf['ai'] == ai) & (pdf['aj'] == aj), 'c1'] = kb * factor