-
Notifications
You must be signed in to change notification settings - Fork 6
/
Copy pathOpenMM-MD.py
executable file
·1757 lines (1596 loc) · 90.4 KB
/
OpenMM-MD.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
500
501
502
503
504
505
506
507
508
509
510
511
512
513
514
515
516
517
518
519
520
521
522
523
524
525
526
527
528
529
530
531
532
533
534
535
536
537
538
539
540
541
542
543
544
545
546
547
548
549
550
551
552
553
554
555
556
557
558
559
560
561
562
563
564
565
566
567
568
569
570
571
572
573
574
575
576
577
578
579
580
581
582
583
584
585
586
587
588
589
590
591
592
593
594
595
596
597
598
599
600
601
602
603
604
605
606
607
608
609
610
611
612
613
614
615
616
617
618
619
620
621
622
623
624
625
626
627
628
629
630
631
632
633
634
635
636
637
638
639
640
641
642
643
644
645
646
647
648
649
650
651
652
653
654
655
656
657
658
659
660
661
662
663
664
665
666
667
668
669
670
671
672
673
674
675
676
677
678
679
680
681
682
683
684
685
686
687
688
689
690
691
692
693
694
695
696
697
698
699
700
701
702
703
704
705
706
707
708
709
710
711
712
713
714
715
716
717
718
719
720
721
722
723
724
725
726
727
728
729
730
731
732
733
734
735
736
737
738
739
740
741
742
743
744
745
746
747
748
749
750
751
752
753
754
755
756
757
758
759
760
761
762
763
764
765
766
767
768
769
770
771
772
773
774
775
776
777
778
779
780
781
782
783
784
785
786
787
788
789
790
791
792
793
794
795
796
797
798
799
800
801
802
803
804
805
806
807
808
809
810
811
812
813
814
815
816
817
818
819
820
821
822
823
824
825
826
827
828
829
830
831
832
833
834
835
836
837
838
839
840
841
842
843
844
845
846
847
848
849
850
851
852
853
854
855
856
857
858
859
860
861
862
863
864
865
866
867
868
869
870
871
872
873
874
875
876
877
878
879
880
881
882
883
884
885
886
887
888
889
890
891
892
893
894
895
896
897
898
899
900
901
902
903
904
905
906
907
908
909
910
911
912
913
914
915
916
917
918
919
920
921
922
923
924
925
926
927
928
929
930
931
932
933
934
935
936
937
938
939
940
941
942
943
944
945
946
947
948
949
950
951
952
953
954
955
956
957
958
959
960
961
962
963
964
965
966
967
968
969
970
971
972
973
974
975
976
977
978
979
980
981
982
983
984
985
986
987
988
989
990
991
992
993
994
995
996
997
998
999
1000
#!/usr/bin/env python
"""
@package run
Run a MD simulation in OpenMM. NumPy is required.
Copyright And License
@author Lee-Ping Wang <[email protected]>
All code in this file is released under the GNU General Public License.
This program is free software: you can redistribute it and/or modify it under
the terms of the GNU General Public License as published by the Free Software
Foundation, either version 3 of the License, or (at your option) any later
version.
This program is distributed in the hope that it will be useful, but without any
warranty; without even the implied warranty of merchantability or fitness for a
particular purpose. See the GNU General Public License for more details.
You should have received a copy of the GNU General Public License along with
this program. If not, see <http://www.gnu.org/licenses/>.
"""
#==================#
#| Global Imports |#
#==================#
from __future__ import print_function, division
import time
from datetime import datetime, timedelta
t0 = time.time()
import argparse
from xml.etree import ElementTree as ET
import os
import sys
import pickle
import shutil
import numpy as np
from re import sub
from collections import namedtuple, defaultdict, OrderedDict
try:
from openmm.unit import *
from openmm import *
from openmm.app import *
except ImportError:
from simtk.unit import *
from simtk.openmm import *
from simtk.openmm.app import *
import warnings
# Suppress warnings from PDB reading.
warnings.simplefilter("ignore")
import logging
logging.basicConfig()
sys.setrecursionlimit(10000)
#================================#
# Set up the logger #
#================================#
logger = logging.getLogger(__name__)
logger.setLevel('INFO')
sh = logging.StreamHandler(sys.stdout)
formatter = logging.Formatter(fmt='%(asctime)s - %(message)s', datefmt="%H:%M:%S")
sh.setFormatter(formatter)
logger.addHandler(sh)
logger.propagate = False
#================================#
# Subroutines #
#================================#
def GetTime(sec):
sec = timedelta(seconds=sec)
d = datetime(1,1,1) + sec
if d.year > 1:
return("%dY%02dM%02dd%02dh%02dm%02ds" % (d.year-1, d.month-1, d.day-1, d.hour, d.minute, d.second))
elif d.month > 1:
return("%dM%02dd%02dh%02dm%02ds" % (d.month-1, d.day-1, d.hour, d.minute, d.second))
elif d.day > 1:
return("%dd%02dh%02dm%02ds" % (d.day-1, d.hour, d.minute, d.second))
elif d.hour > 0:
return("%dh%02dm%02ds" % (d.hour, d.minute, d.second))
elif d.minute > 0:
return("%dm%02ds" % (d.minute, d.second))
elif d.second > 0:
return("%ds" % (d.second))
def statisticalInefficiency(A_n, B_n=None, fast=False, mintime=3):
"""
Compute the (cross) statistical inefficiency of (two) timeseries.
Notes
The same timeseries can be used for both A_n and B_n to get the autocorrelation statistical inefficiency.
The fast method described in Ref [1] is used to compute g.
References
[1] J. D. Chodera, W. C. Swope, J. W. Pitera, C. Seok, and K. A. Dill. Use of the weighted
histogram analysis method for the analysis of simulated and parallel tempering simulations.
JCTC 3(1):26-41, 2007.
Examples
Compute statistical inefficiency of timeseries data with known correlation time.
>>> import timeseries
>>> A_n = timeseries.generateCorrelatedTimeseries(N=100000, tau=5.0)
>>> g = statisticalInefficiency(A_n, fast=True)
@param[in] A_n (required, numpy array) - A_n[n] is nth value of
timeseries A. Length is deduced from vector.
@param[in] B_n (optional, numpy array) - B_n[n] is nth value of
timeseries B. Length is deduced from vector. If supplied, the
cross-correlation of timeseries A and B will be estimated instead of
the autocorrelation of timeseries A.
@param[in] fast (optional, boolean) - if True, will use faster (but
less accurate) method to estimate correlation time, described in
Ref. [1] (default: False)
@param[in] mintime (optional, int) - minimum amount of correlation
function to compute (default: 3) The algorithm terminates after
computing the correlation time out to mintime when the correlation
function furst goes negative. Note that this time may need to be
increased if there is a strong initial negative peak in the
correlation function.
@return g The estimated statistical inefficiency (equal to 1 + 2
tau, where tau is the correlation time). We enforce g >= 1.0.
"""
# Create numpy copies of input arguments.
A_n = np.array(A_n)
if B_n is not None:
B_n = np.array(B_n)
else:
B_n = np.array(A_n)
# Get the length of the timeseries.
N = A_n.size
# Be sure A_n and B_n have the same dimensions.
if(A_n.shape != B_n.shape):
raise ParameterError('A_n and B_n must have same dimensions.')
# Initialize statistical inefficiency estimate with uncorrelated value.
g = 1.0
# Compute mean of each timeseries.
mu_A = A_n.mean()
mu_B = B_n.mean()
# Make temporary copies of fluctuation from mean.
dA_n = A_n.astype(np.float64) - mu_A
dB_n = B_n.astype(np.float64) - mu_B
# Compute estimator of covariance of (A,B) using estimator that will ensure C(0) = 1.
sigma2_AB = (dA_n * dB_n).mean() # standard estimator to ensure C(0) = 1
# Trap the case where this covariance is zero, and we cannot proceed.
if(sigma2_AB == 0):
logger.info('Sample covariance sigma_AB^2 = 0 -- cannot compute statistical inefficiency')
return 1.0
# Accumulate the integrated correlation time by computing the normalized correlation time at
# increasing values of t. Stop accumulating if the correlation function goes negative, since
# this is unlikely to occur unless the correlation function has decayed to the point where it
# is dominated by noise and indistinguishable from zero.
t = 1
increment = 1
while (t < N-1):
# compute normalized fluctuation correlation function at time t
C = sum( dA_n[0:(N-t)]*dB_n[t:N] + dB_n[0:(N-t)]*dA_n[t:N] ) / (2.0 * float(N-t) * sigma2_AB)
# Terminate if the correlation function has crossed zero and we've computed the correlation
# function at least out to 'mintime'.
if (C <= 0.0) and (t > mintime):
break
# Accumulate contribution to the statistical inefficiency.
g += 2.0 * C * (1.0 - float(t)/float(N)) * float(increment)
# Increment t and the amount by which we increment t.
t += increment
# Increase the interval if "fast mode" is on.
if fast: increment += 1
# g must be at least unity
if (g < 1.0): g = 1.0
# Return the computed statistical inefficiency.
return g
def compute_volume(box_vectors):
""" Compute the total volume of an OpenMM system. """
[a,b,c] = box_vectors
A = np.array([a/a.unit, b/a.unit, c/a.unit])
# Compute volume of parallelepiped.
volume = np.linalg.det(A) * a.unit**3
return volume
def compute_mass(system):
""" Compute the total mass of an OpenMM system. """
mass = 0.0 * amu
for i in range(system.getNumParticles()):
mass += system.getParticleMass(i)
return mass
def compute_total_charge(system):
""" Compute the total charge of an OpenMM system with the NonbondedForce or AmoebaMultipoleForce """
total_charge = 0.0 * elementary_charge
for f in system.getForces():
if f.__class__.__name__ == 'AmoebaMultipoleForce':
for i in range(system.getNumParticles()):
total_charge += f.getMultipoleParameters(i)[0]
if f.__class__.__name__ == 'NonbondedForce':
for i in range(system.getNumParticles()):
total_charge += f.getParticleParameters(i)[0]
return total_charge
def balance_charge_atoms(system, balance_charge_atoms):
amf = None
for f in system.getForces():
if f.__class__.__name__ == 'AmoebaMultipoleForce':
amf = f
if amf is None:
raise RuntimeError("AmoebaMultipoleForce is not found in the system")
noa = system.getNumParticles()
# get charges
atom_MultipoleParameters = [amf.getMultipoleParameters(i) for i in range(noa)]
total_charge = sum([atom_MultipoleParameters[i][0] for i in range(noa)])
logger.info("Original total charge in the system is %s"%total_charge)
atom_indices = uncommadash(balance_charge_atoms)
average_shift = -total_charge / len(atom_indices)
for i in atom_indices:
atom_MultipoleParameters[i][0] += average_shift
amf.setMultipoleParameters(i, *atom_MultipoleParameters[i])
logger.info("Charge of each atom in %s is shifted by %s to make the total charge 0." % (balance_charge_atoms, average_shift))
def printcool(text,sym="#",bold=False,color=2,ansi=None,bottom='-',minwidth=50):
"""Cool-looking printout for slick formatting of output.
@param[in] text The string that the printout is based upon. This function
will print out the string, ANSI-colored and enclosed in the symbol
for example:\n
<tt> ################# </tt>\n
<tt> ### I am cool ### </tt>\n
<tt> ################# </tt>
@param[in] sym The surrounding symbol\n
@param[in] bold Whether to use bold print
@param[in] color The ANSI color:\n
1 red\n
2 green\n
3 yellow\n
4 blue\n
5 magenta\n
6 cyan\n
7 white
@param[in] bottom The symbol for the bottom bar
@param[in] minwidth The minimum width for the box, if the text is very short
then we insert the appropriate number of padding spaces
@return bar The bottom bar is returned for the user to print later, e.g. to mark off a 'section'
"""
if logger.getEffectiveLevel() < 20: return
def newlen(l):
return len(sub("\x1b\[[0-9;]*m","",l))
text = text.split('\n')
width = max(minwidth,max([newlen(line) for line in text]))
bar = ''.join([sym for i in range(width + 8)])
print('\n'+bar)
for line in text:
padleft = ' ' * int((width - newlen(line)) / 2)
padright = ' '* (width - newlen(line) - len(padleft))
if ansi != None:
ansi = str(ansi)
print("%s| \x1b[%sm%s" % (sym, ansi, padleft),line,"%s\x1b[0m |%s" % (padright, sym))
elif color != None:
print("%s| \x1b[%s9%im%s" % (sym, bold and "1;" or "", color, padleft),line,"%s\x1b[0m |%s" % (padright, sym))
else:
warn_press_key("Inappropriate use of printcool")
print(bar)
return sub(sym,bottom,bar)
def printcool_dictionary(Dict,title="General options",bold=False,color=2,keywidth=25,topwidth=50):
"""See documentation for printcool; this is a nice way to print out keys/values in a dictionary.
The keys in the dictionary are sorted before printing out.
@param[in] dict The dictionary to be printed
@param[in] title The title of the printout
"""
if logger.getEffectiveLevel() < 20: return
if Dict == None: return
bar = printcool(title,bold=bold,color=color,minwidth=topwidth)
def magic_string(str):
# This cryptic command returns a string with the number of characters specified as a variable. :P
# Useful for printing nice-looking dictionaries, i guess.
#print "\'%%-%is\' %% '%s'" % (keywidth,str.replace("'","\\'").replace('"','\\"'))
return eval("\'%%-%is\' %% '%s'" % (keywidth,str.replace("'","\\'").replace('"','\\"')))
if isinstance(Dict, OrderedDict):
print('\n'.join(["%s %s " % (magic_string(str(key)),str(Dict[key])) for key in Dict if Dict[key] != None]))
else:
print('\n'.join(["%s %s " % (magic_string(str(key)),str(Dict[key])) for key in sorted([i for i in Dict]) if Dict[key] != None]))
print(bar)
def EnergyDecomposition(Sim, verbose=False):
# Before using EnergyDecomposition, make sure each Force is set to a different group.
EnergyTerms = OrderedDict()
Potential = Sim.context.getState(getEnergy=True).getPotentialEnergy() / kilojoules_per_mole
Kinetic = Sim.context.getState(getEnergy=True).getKineticEnergy() / kilojoules_per_mole
for i in range(Sim.system.getNumForces()):
EnergyTerms[Sim.system.getForce(i).__class__.__name__] = Sim.context.getState(getEnergy=True,groups=2**i).getPotentialEnergy() / kilojoules_per_mole
EnergyTerms['Potential'] = Potential
EnergyTerms['Kinetic'] = Kinetic
EnergyTerms['Total'] = Potential+Kinetic
return EnergyTerms
def MTSVVVRIntegrator(temperature, collision_rate, timestep, system, ninnersteps=4):
"""
Create a multiple timestep velocity verlet with velocity randomization (VVVR) integrator.
ARGUMENTS
temperature (numpy.unit.Quantity compatible with kelvin) - the temperature
collision_rate (numpy.unit.Quantity compatible with 1/picoseconds) - the collision rate
timestep (numpy.unit.Quantity compatible with femtoseconds) - the integration timestep
system (simtk.openmm.System) - system whose forces will be partitioned
ninnersteps (int) - number of inner timesteps (default: 4)
RETURNS
integrator (openmm.CustomIntegrator) - a VVVR integrator
NOTES
This integrator is equivalent to a Langevin integrator in the velocity Verlet discretization with a
timestep correction to ensure that the field-free diffusion constant is timestep invariant. The inner
velocity Verlet discretization is transformed into a multiple timestep algorithm.
REFERENCES
VVVR Langevin integrator:
* http://arxiv.org/abs/1301.3800
* http://arxiv.org/abs/1107.2967 (to appear in PRX 2013)
TODO
Move initialization of 'sigma' to setting the per-particle variables.
"""
# Multiple timestep Langevin integrator.
for i in system.getForces():
if i.__class__.__name__ in ["NonbondedForce", "CustomNonbondedForce", "AmoebaVdwForce", "AmoebaMultipoleForce"]:
# Slow force.
print(i.__class__.__name__, "is a Slow Force")
i.setForceGroup(1)
else:
print(i.__class__.__name__, "is a Fast Force")
# Fast force.
i.setForceGroup(0)
kB = BOLTZMANN_CONSTANT_kB * AVOGADRO_CONSTANT_NA
kT = kB * temperature
integrator = openmm.CustomIntegrator(timestep)
integrator.addGlobalVariable("dt_fast", timestep/float(ninnersteps)) # fast inner timestep
integrator.addGlobalVariable("kT", kT) # thermal energy
integrator.addGlobalVariable("a", numpy.exp(-collision_rate*timestep)) # velocity mixing parameter
integrator.addGlobalVariable("b", numpy.sqrt((2/(collision_rate*timestep)) * numpy.tanh(collision_rate*timestep/2))) # timestep correction parameter
integrator.addPerDofVariable("sigma", 0)
integrator.addPerDofVariable("x1", 0) # position before application of constraints
#
# Pre-computation.
# This only needs to be done once, but it needs to be done for each degree of freedom.
# Could move this to initialization?
#
integrator.addComputePerDof("sigma", "sqrt(kT/m)")
#
# Velocity perturbation.
#
integrator.addComputePerDof("v", "sqrt(a)*v + sqrt(1-a)*sigma*gaussian")
integrator.addConstrainVelocities();
#
# Symplectic inner multiple timestep.
#
integrator.addUpdateContextState();
integrator.addComputePerDof("v", "v + 0.5*b*dt*f1/m")
for innerstep in range(ninnersteps):
# Fast inner symplectic timestep.
integrator.addComputePerDof("v", "v + 0.5*b*dt_fast*f0/m")
integrator.addComputePerDof("x", "x + v*b*dt_fast")
integrator.addComputePerDof("x1", "x")
integrator.addConstrainPositions();
integrator.addComputePerDof("v", "v + 0.5*b*dt_fast*f0/m + (x-x1)/dt_fast")
integrator.addComputePerDof("v", "v + 0.5*b*dt*f1/m") # TODO: Additional velocity constraint correction?
integrator.addConstrainVelocities();
#
# Velocity randomization
#
integrator.addComputePerDof("v", "sqrt(a)*v + sqrt(1-a)*sigma*gaussian")
integrator.addConstrainVelocities();
return integrator
def MTSLangevinIntegrator(temperature, collision_rate, timestep, system, ninnersteps=4):
"""
New in OpenMM 7.5 and up.
MTSLangevinIntegrator implements the BAOAB-RESPA multiple time step algorithm for constant temperature dynamics.
ARGUMENTS
temperature (numpy.unit.Quantity compatible with kelvin) - the temperature
collision_rate (numpy.unit.Quantity compatible with 1/picoseconds) - the collision rate
timestep (numpy.unit.Quantity compatible with femtoseconds) - the integration timestep
system (simtk.openmm.System) - system whose forces will be partitioned
ninnersteps (int) - number of inner timesteps (default: 4)
RETURNS
integrator (openmm.MTSLangevinIntegrator) - a multiple timestep Langevin integrator
"""
# Multiple timestep Langevin integrator.
for i in system.getForces():
if i.__class__.__name__ in ["NonbondedForce", "CustomNonbondedForce", "AmoebaVdwForce", "AmoebaMultipoleForce"]:
# Slow force.
print(i.__class__.__name__, "is a Slow Force")
i.setForceGroup(1)
else:
print(i.__class__.__name__, "is a Fast Force")
# Fast force.
i.setForceGroup(0)
import openmm
integrator = openmm.MTSLangevinIntegrator(temperature, collision_rate, timestep, [(0,ninnersteps), (1,1)])
return integrator
def MTSIntegrator(timestep, system, ninnersteps=4):
"""
MTSIntegrator implements the rRESPA multiple time step integration algorithm.
ARGUMENTS
timestep (numpy.unit.Quantity compatible with femtoseconds) - the integration timestep
system (simtk.openmm.System) - system whose forces will be partitioned
ninnersteps (int) - number of inner timesteps (default: 4)
RETURNS
integrator (openmm.MTSIntegrator) - a multiple timestep integrator
"""
# Multiple timestep Langevin integrator.
for i in system.getForces():
if i.__class__.__name__ in ["NonbondedForce", "CustomNonbondedForce", "AmoebaVdwForce", "AmoebaMultipoleForce"]:
# Slow force.
print(i.__class__.__name__, "is a Slow Force")
i.setForceGroup(1)
else:
print(i.__class__.__name__, "is a Fast Force")
# Fast force.
i.setForceGroup(0)
import openmm
integrator = openmm.MTSIntegrator(timestep, [(0,ninnersteps), (1,1)])
return integrator
def bak(fnm):
oldfnm = fnm
if os.path.exists(oldfnm):
base, ext = os.path.splitext(fnm)
i = 0
while os.path.exists(fnm):
fnm = "%s_%i%s" % (base,i,ext)
i += 1
logger.info("Backing up %s -> %s" % (oldfnm, fnm))
shutil.move(oldfnm,fnm)
def uncommadash(s):
# Takes a string like '27-31,88-91,100,136-139'
# and turns it into a list like [26, 27, 28, 29, 30, 87, 88, 89, 90, 99, 135, 136, 137, 138]
L = []
try:
for w in s.split(','):
ws = w.split('-')
a = int(ws[0])-1
if len(ws) == 1:
b = int(ws[0])
elif len(ws) == 2:
b = int(ws[1])
else:
logger.warning("Dash-separated list cannot exceed length 2\n")
raise
if a < 0 or b <= 0 or b <= a:
if a < 0 or b <= 0:
logger.warning("Items in list cannot be zero or negative: %d %d\n" % (a, b))
else:
logger.warning("Second number cannot be smaller than first: %d %d\n" % (a, b))
raise
newL = range(a,b)
if any([i in L for i in newL]):
logger.warning("Duplicate entries found in list\n")
raise
L += newL
if sorted(L) != L:
logger.warning("List is out of order\n")
raise
except:
logger.error('Invalid string for converting to list of numbers: %s\n' % s)
raise RuntimeError
return L
def string_to_type(val, typ):
assert type(val) == str
v = val.lower()
if v == "none":
return None
if typ == str:
return val
elif typ == bool:
if v == "false":
return False
elif v == "true":
return True
else:
try:
return bool(float(v))
except:
raise TypeError("%s can not be converted to bool" % val)
elif typ == int:
try:
return int(val)
except:
raise TypeError("%s can not be converted to int" % val)
elif typ == float:
try:
return float(val)
except:
raise TypeError("%s can not be converted to float" % val)
elif typ in (list, tuple):
assert val[0] in '[(' and val[-1] in '])', "%s to %s should be in format [,] or (,)"%(val, str(typ))
ls = val[1:-1].split(',')
# we don't know what type is desired in the list, assuming int or float
try:
return typ(map(int, ls))
except:
pass
try:
return typ(map(float, ls))
except:
return ls
else:
raise NotImplementedError("Converting %s to type %s not implemented yet." % (val, str(typ)))
#================================#
# The input file parser #
#================================#
class SimulationOptions(object):
""" Class for parsing the input file. """
def set_active(self,key,default,typ,doc,allowed=None,depend=True,clash=False,msg=None):
""" Set one option. The arguments are:
key : The name of the option.
default : The default value.
typ : The type of the value.
doc : The documentation string.
allowed : An optional list of allowed values.
depend : A condition that must be True for the option to be activated.
clash : A condition that must be False for the option to be activated.
msg : A warning that is printed out if the option is not activated.
"""
doc = sub("\.$","",doc.strip())+"."
self.Documentation[key] = "%-8s " % ("(" + sub("'>","",sub("<type '","",str(typ)))+")") + doc
if key in self.UserOptions:
val = self.UserOptions[key]
val = string_to_type(val, typ)
else:
val = default
if val != None:
assert type(val) == typ, "Default value %s is not in desired type %s!" % (default, str(typ))
if type(allowed) is list:
self.Documentation[key] += " Allowed values are %s" % str(allowed)
if val not in allowed:
print(list(val))
raise Exception("Tried to set option \x1b[1;91m%s\x1b[0m to \x1b[94m%s\x1b[0m but it's not allowed (choose from \x1b[92m%s\x1b[0m)" % (key, str(val), str(allowed)))
# Try my own function to convert string to deired type
# val = string_to_type(val, typ)
if val != None and type(val) is not typ:
raise Exception("Tried to set option \x1b[1;91m%s\x1b[0m to \x1b[94m%s\x1b[0m but it's not the right type (%s required)" % (key, str(val), str(typ)))
if depend and not clash:
if key in self.InactiveOptions:
del self.InactiveOptions[key]
self.ActiveOptions[key] = val
else:
if key in self.ActiveOptions:
del self.ActiveOptions[key]
self.InactiveOptions[key] = val
self.InactiveWarnings[key] = msg
def force_active(self,key,val=None,msg=None):
""" Force an option to be active and set it to the provided value,
regardless of the user input. There are no safeguards, so use carefully.
key : The name of the option.
val : The value that the option is being set to.
msg : A warning that is printed out if the option is not activated.
"""
if msg == None:
msg == "Option forced to active for no given reason."
if key not in self.ActiveOptions:
if val == None:
val = self.InactiveOptions[key]
del self.InactiveOptions[key]
self.ActiveOptions[key] = val
self.ForcedOptions[key] = val
self.ForcedWarnings[key] = msg
elif val != None and self.ActiveOptions[key] != val:
self.ActiveOptions[key] = val
self.ForcedOptions[key] = val
self.ForcedWarnings[key] = msg
elif val == None:
self.ForcedOptions[key] = self.ActiveOptions[key]
self.ForcedWarnings[key] = msg + " (Warning: Forced active but it was already active.)"
def deactivate(self,key,msg=None):
""" Deactivate one option. The arguments are:
key : The name of the option.
msg : A warning that is printed out if the option is not activated.
"""
if key in self.ActiveOptions:
self.InactiveOptions[key] = self.ActiveOptions[key]
del self.ActiveOptions[key]
self.InactiveWarnings[key] = msg
def __getattr__(self,key):
if key in self.ActiveOptions:
return self.ActiveOptions[key]
elif key in self.InactiveOptions:
return None
else:
return getattr(super(SimulationOptions,self),key)
def record(self):
out = []
cmd = ' '.join(sys.argv)
out.append("")
out.append("Your command was: %s" % cmd)
out.append("To reproduce / customize your simulation, paste the following text into an input file")
out.append("and rerun the script with the -I argument (e.g. '-I openmm.in')")
out.append("")
out.append("#===========================================#")
out.append("#| Input file for OpenMM MD script |#")
out.append("#| Lines beginning with '#' are comments |#")
out.append("#===========================================#")
TopBar = False
UserSupplied = []
for key in self.ActiveOptions:
if key in self.UserOptions and key not in self.ForcedOptions:
UserSupplied.append("%-22s %20s # %s" % (key, str(self.ActiveOptions[key]), self.Documentation[key]))
if len(UserSupplied) > 0:
if TopBar:
out.append("#===========================================#")
else:
TopBar = True
out.append("#| User-supplied options: |#")
out.append("#===========================================#")
out += UserSupplied
Forced = []
for key in self.ActiveOptions:
if key in self.ForcedOptions:
Forced.append("%-22s %20s # %s" % (key, str(self.ActiveOptions[key]), self.Documentation[key]))
Forced.append("%-22s %20s # Reason : %s" % ("","",self.ForcedWarnings[key]))
if len(Forced) > 0:
if TopBar:
out.append("#===========================================#")
else:
TopBar = True
out.append("#| Options enforced by the script: |#")
out.append("#===========================================#")
out += Forced
ActiveDefault = []
for key in self.ActiveOptions:
if key not in self.UserOptions and key not in self.ForcedOptions:
ActiveDefault.append("%-22s %20s # %s" % (key, str(self.ActiveOptions[key]), self.Documentation[key]))
if len(ActiveDefault) > 0:
if TopBar:
out.append("#===========================================#")
else:
TopBar = True
out.append("#| Active options at default values: |#")
out.append("#===========================================#")
out += ActiveDefault
# out.append("")
out.append("#===========================================#")
out.append("#| End of Input File |#")
out.append("#===========================================#")
Deactivated = []
for key in self.InactiveOptions:
Deactivated.append("%-22s %20s # %s" % (key, str(self.InactiveOptions[key]), self.Documentation[key]))
Deactivated.append("%-22s %20s # Reason : %s" % ("","",self.InactiveWarnings[key]))
if len(Deactivated) > 0:
out.append("")
out.append("#===========================================#")
out.append("#| Deactivated or conflicting options: |#")
out.append("#===========================================#")
out += Deactivated
Unrecognized = []
for key in self.UserOptions:
if key not in self.ActiveOptions and key not in self.InactiveOptions:
Unrecognized.append("%-22s %20s" % (key, self.UserOptions[key]))
if len(Unrecognized) > 0:
# out.append("")
out.append("#===========================================#")
out.append("#| Unrecognized options: |#")
out.append("#===========================================#")
out += Unrecognized
return out
def __init__(self, input_file, pdbfnm):
super(SimulationOptions,self).__init__()
basename = os.path.splitext(pdbfnm)[0]
self.Documentation = OrderedDict()
self.UserOptions = OrderedDict()
self.ActiveOptions = OrderedDict()
self.ForcedOptions = OrderedDict()
self.ForcedWarnings = OrderedDict()
self.InactiveOptions = OrderedDict()
self.InactiveWarnings = OrderedDict()
# First build a dictionary of user supplied options.
if input_file != None:
for line in open(input_file).readlines():
line = sub('#.*$','',line.strip())
s = line.split()
if len(s) > 0:
# Options are case insensitive
key = s[0].lower()
val = line.replace(s[0],'',1).strip()
self.UserOptions[key] = val
# Now go through the logic of determining which options are activated.
self.set_active('integrator','verlet',str,"Molecular dynamics integrator",allowed=["verlet","langevin","langevinmiddle","velocity-verlet","mts","mtsvvvr","mtslangevin"])
self.set_active('minimize',False,bool,"Specify whether to minimize the energy before running dynamics.")
self.set_active('timestep',1.0,float,"Time step in femtoseconds.")
self.set_active('hydrogen_mass',1.0,float,"Hydrogen mass in amu (set to 4 to enable 4 fs time step).")
self.set_active('innerstep',0.5,float,"Inner time step in femtoseconds for MTS integrator.",depend=(self.integrator in ["mts", "mtsvvvr", "mtslangevin"]))
self.set_active('restart_filename','restart.p',str,"Restart information will be read from / written to this file (will be backed up).")
self.set_active('read_restart',True,bool,"Restart simulation from the restart file.",
depend=(os.path.exists(self.restart_filename)), msg="Cannot restart; file specified by restart_filename does not exist.")
self.set_active('restart_interval',1000,int,"Specify a timestep interval for writing the restart file.")
self.set_active('equilibrate',0,int,"Number of steps reserved for equilibration.")
self.set_active('production',1000,int,"Number of steps in production run.")
self.set_active('report_interval',100,int,"Number of steps between every progress report.")
self.set_active('temperature',0.0,float,"Simulation temperature for Langevin integrator or Andersen thermostat.")
if self.temperature <= 0.0 and self.integrator in ["langevin", "langevinmiddle", "mtsvvvr", "mtslangevin"]:
raise Exception("You need to set a finite temperature if using a Langevin-type integrator!")
self.set_active('gentemp',self.temperature,float,"Specify temperature for generating velocities")
self.set_active('collision_rate',0.1,float,"Collision frequency for Langevin integrator or Andersen thermostat in ps^-1.",
depend=(self.integrator in ["langevin", "langevinmiddle", "mtsvvvr", "mtslangevin"] or self.temperature != 0.0),
msg="We're not running a constant temperature simulation")
self.set_active('pressure',0.0,float,"Simulation pressure; set a positive number to activate.",
clash=(self.temperature <= 0.0),
msg="For constant pressure simulations, the temperature must be finite")
self.set_active('anisotropic',None,str,"Specify 'x' and/or 'y' and/or 'z' for anisotropic box scaling in NPT simulations",
depend=("pressure" in self.ActiveOptions and self.pressure > 0.0), msg = "We're not running a constant pressure simulation", allowed=[None,'x','y','z','xy','xz','yz','xyz'])
self.set_active('nbarostat',25,int,"Step interval for MC barostat volume adjustments.",
depend=("pressure" in self.ActiveOptions and self.pressure > 0.0), msg = "We're not running a constant pressure simulation")
self.set_active('nonbonded_method','PME',str,"Set the method for nonbonded interactions.", allowed=["NoCutoff","CutoffNonPeriodic","CutoffPeriodic","Ewald","PME"])
self.nonbonded_method_obj = {"NoCutoff":NoCutoff,"CutoffNonPeriodic":CutoffNonPeriodic,"CutoffPeriodic":CutoffPeriodic,"Ewald":Ewald,"PME":PME}[self.nonbonded_method]
self.set_active('nonbonded_cutoff',0.9,float,"Nonbonded cutoff distance in nanometers.")
self.set_active('vdw_switch',True,bool,"Use a multiplicative switching function to ensure twice-differentiable vdW energies near the cutoff distance.")
self.set_active('switch_distance',0.8,float,"Set the distance where the switching function starts; must be less than the nonbonded cutoff.")
self.set_active('dispersion_correction',True,bool,"Isotropic long-range dispersion correction for periodic systems.")
self.set_active('ewald_error_tolerance',0.0,float,"Error tolerance for Ewald and PME methods. Don't go below 5e-5 for PME unless running in double precision.",
depend=(self.nonbonded_method_obj in [Ewald, PME]), msg="Nonbonded method must be set to Ewald or PME.")
platformNames = [Platform.getPlatform(i).getName() for i in range(Platform.getNumPlatforms())]
default_platform = 'CUDA' if 'CUDA' in platformNames else 'Reference'
self.set_active('platform',default_platform,str,"The simulation platform.", allowed=platformNames)
self.set_active('precision','mixed',str,"The precision of the CUDA platform.", allowed=["single","mixed","double"],
depend=(self.platform == "CUDA"), msg="The simulation platform needs to be set to CUDA")
self.set_active('device',None,int,"Specify the device (GPU) number; will default to the fastest available.", depend=(self.platform in ["CUDA", "OpenCL"]),
msg="The simulation platform needs to be set to CUDA or OpenCL")
self.set_active('initial_report',False,bool,"Perform one Report prior to running any dynamics.")
self.set_active('constraints',None,str,"Specify constraints.", allowed=[None,"HBonds","AllBonds","HAngles"])
self.constraint_obj = {None: None, "None":None,"HBonds":HBonds,"HAngles":HAngles,"AllBonds":AllBonds}[self.constraints]
self.set_active('rigidwater',False,bool,"Add constraints to make water molecules rigid.")
self.set_active('constraint_tolerance',1e-5,float,"Set the constraint error tolerance in the integrator (default value recommended by Peter Eastman).")
self.set_active('serialize',None,str,"Provide a file name for writing the serialized System object.")
self.set_active('vdw_cutoff',0.9,float,"Separate vdW cutoff distance in nanometers for AMOEBA simulation.")
self.set_active('polarization_direct',False,bool,"Use direct polarization in AMOEBA force field")
self.set_active('polar_eps',1e-5,float,"Polarizable dipole convergence parameter in AMOEBA force field.",
depend=(not self.polarization_direct),
msg="Polarization_direct is turned on")
self.set_active('aewald',5.4459052,float,"Set the Ewald alpha parameter for periodic AMOEBA simulations.")
self.set_active('pmegrid',None,list,"Set the PME grid for AMOEBA simulations.", depend=(self.nonbonded_method == "PME"),
msg="The nonbonded method must be set to PME.")
self.set_active('pdb_report_interval',0,int,"Specify a timestep interval for PDB reporter.")
self.set_active('pdb_report_filename',"output_%s.pdb" % basename,str,"Specify an file name for writing output PDB file.",
depend=(self.pdb_report_interval > 0), msg="pdb_report_interval needs to be set to a whole number.")
self.set_active('dcd_report_interval',0,int,"Specify a timestep interval for DCD reporter.")
self.set_active('dcd_report_filename',"output_%s.dcd" % basename,str,"Specify an file name for writing output DCD file.",
depend=(self.dcd_report_interval > 0), msg="dcd_report_interval needs to be set to a whole number.")
self.set_active('eda_report_interval',0,int,"Specify a timestep interval for Energy reporter.", clash=(self.integrator in ["mts", "mtsvvvr", "mtslangevin"]), msg="EDA reporter incompatible with MTS integrator.")
self.set_active('eda_report_filename',"output_%s.eda" % basename,str,"Specify an file name for writing output Energy file.",
depend=(self.eda_report_interval is not None and self.eda_report_interval > 0), msg="eda_report_interval needs to be set to a whole number.")
self.set_active('maxvf_report_interval',0,int,"Whether to print the maximum velocity/force report.")
if self.pmegrid != None:
assert len(self.pmegrid) == 3, "The pme argument must be a length-3 list of integers"
self.set_active('tinkerpath',None,str,"Specify a path for TINKER executables for running AMOEBA validation.")
if self.tinkerpath != None:
assert os.path.exists(os.path.join(self.tinkerpath,'dynamic')), "tinkerpath must point to a directory that contains TINKER executables."
self.set_active('pos_res_k',1e5,float,"Set the force constant for the positional restraints.")
self.set_active('pos_res_atoms',None,str,"Set the indices of the atoms that will be restrained to their original position.", depend=(self.pos_res_k > 0),msg="Restrain force constants k must > 0")
self.set_active('cent_res_k_per_atom',1e5,float,"Set the force constant for the centroid restraints.")
self.set_active('cent_res_atoms',None,str,"Set the indices of the atoms whose center will be restrained to their original position.", depend=(self.cent_res_k_per_atom > 0),msg="Restrain force constants k must > 0")
self.set_active('cent_res_atoms2',None,str,"Set the indices of the atoms whose center will be restrained to their original position.", depend=(self.cent_res_k_per_atom > 0),msg="Restrain force constants k must > 0")
self.set_active('cent_res_xy_atoms',None,str,"Set the indices of the atoms whose center on the x and y direction will be restrained to their original position.", depend=(self.cent_res_k_per_atom > 0),msg="Restrain force constants k must > 0")
self.set_active('remove_cm_motion',True,bool,"Add a center-of-mass motion remover to the system.")
self.set_active('group_up_pressure',1.0,float,"Pressue on the group of atoms along z direction. Unit: bar")
self.set_active('group_up_atoms',None,str,"Set the indices of the atoms in Group to be pushed upwards.", depend=(self.group_up_pressure>0), msg="Group Up Pressure must > 0")
self.set_active('group_down_pressure',1.0,float,"Pressue on the group of atoms along -z direction. Unit: bar")
self.set_active('group_down_atoms',None,str,"Set the indices of the atoms in Group to be pushed downwards.", depend=(self.group_down_pressure>0), msg="Group Down Pressure must > 0")
self.set_active('block_z_pos',6.0,float,"Set the z position of the block. (nm)")
self.set_active('block_z_atoms',None,str,"Set the indices of the atoms that will be blocked from crossing the block_z_pos.")
self.set_active('balance_charge_atoms',None,str,"Set the indices of atoms that will be used to balance the total charge of the system.")
#================================#
# The command line parser #
#================================#
# Taken from MSMBulder - it allows for easy addition of arguments and allows "-h" for help.
def add_argument(group, *args, **kwargs):
if 'default' in kwargs:
d = 'Default: {d}'.format(d=kwargs['default'])
if 'help' in kwargs:
kwargs['help'] += ' {d}'.format(d=d)
else:
kwargs['help'] = d
group.add_argument(*args, **kwargs)
print()
print( " #===========================================#")
print( " #| OpenMM general purpose simulation |#")
print( " #| (Hosted @ github.com/leeping/OpenMM-MD) |#")
print( " #| Use the -h argument for detailed help |#")
print( " #===========================================#")
print()
print()
print('OpenMM Version:', Platform.getOpenMMVersion())
print('Git Revision:', version.git_revision)
print()
parser = argparse.ArgumentParser()
add_argument(parser, 'pdb', nargs=1, metavar='input.pdb', help='Specify one PDB or AMBER inpcrd file \x1b[1;91m(Required)\x1b[0m', type=str)
add_argument(parser, 'xml', nargs='+', metavar='forcefield.xml', help='Specify multiple force field XML files, one System XML file, or one AMBER prmtop file \x1b[1;91m(Required)\x1b[0m', type=str)
add_argument(parser, '-I', '--inputfile', help='Specify an input file with options in simple two-column format. This script will autogenerate one for you', default=None, type=str)
cmdline = parser.parse_args()
pdbfnm = cmdline.pdb[0]
xmlfnm = cmdline.xml
args = SimulationOptions(cmdline.inputfile, pdbfnm)
#================================#
# Define custom reporters here #
#================================#
class ProgressReport(object):
def __init__(self, file, reportInterval, simulation, total, first=0):
self._reportInterval = reportInterval
self._openedFile = isinstance(file, str)
self._initial = True
if self._openedFile:
self._out = open(file, 'w')
else:
self._out = file
self._interval = args.report_interval * args.timestep * femtosecond
self._units = OrderedDict()
self._units['energy'] = kilojoule_per_mole
self._units['kinetic'] = kilojoule_per_mole
self._units['potential'] = kilojoule_per_mole
self._units['temperature'] = kelvin
if simulation.topology.getUnitCellDimensions() != None :
self._units['density'] = kilogram / meter**3
self._units['volume'] = nanometer**3
self._data = defaultdict(list)
self._total = total
# The time step at the creation of this report.
self._first = first
self.run_time = 0.0*picosecond
self.rt00 = first * args.timestep * femtosecond
self.t0 = time.time()
def describeNextReport(self, simulation):
steps = self._reportInterval - simulation.currentStep%self._reportInterval
return (steps, False, False, False, True)
def analyze(self, simulation):
PrintDict = OrderedDict()
for datatype in self._units:
data = np.array(self._data[datatype])
mean = np.mean(data)
dmean = data - mean
stdev = np.std(dmean)
g = statisticalInefficiency(dmean)
stderr = np.sqrt(g) * stdev / np.sqrt(len(data))
acorr = 0.5*(g-1)*self._interval/nanosecond
# Perform a linear fit.
x = np.linspace(0, 1, len(data))
z = np.polyfit(x, data, 1)
p = np.polyval(z, x)
# Compute the drift.
drift = p[-1] - p[0]
# Compute the driftless standard deviation.
stdev1 = np.std(data-p)
PrintDict[datatype+" (%s)" % self._units[datatype]] = "%13.5f %13.5e %13.5f %13.5f %13.5f %13.5e" % (mean, stdev, stderr, acorr, drift, stdev1)
printcool_dictionary(PrintDict,"Summary statistics - total simulation time %.3f ns:\n%-26s %13s %13s %13s %13s %13s %13s\n%-26s %13s %13s %13s %13s %13s %13s" % (self.run_time/nanosecond,
"", "", "", "", "", "", "Stdev",
"Quantity", "Mean", "Stdev", "Stderr", "Acorr(ns)", "Drift", "(NoDrift)"),keywidth=30)
def report(self, simulation, state):
# Compute total mass in grams.
mass = compute_mass(simulation.system).in_units_of(gram / mole) / AVOGADRO_CONSTANT_NA
# The center-of-mass motion remover subtracts 3 more DoFs
ndof = 3*simulation.system.getNumParticles() - simulation.system.getNumConstraints() - 3
kinetic = state.getKineticEnergy()
potential = state.getPotentialEnergy() / self._units['potential']
kB = BOLTZMANN_CONSTANT_kB * AVOGADRO_CONSTANT_NA
temperature = 2.0 * kinetic / kB / ndof / self._units['temperature']
kinetic /= self._units['kinetic']
energy = kinetic + potential
pct = 100 * float(simulation.currentStep - self._first) / self._total
#self.run_time = float(simulation.currentStep - self._first) * args.timestep * femtosecond
self.run_time = float(simulation.currentStep) * args.timestep * femtosecond
if pct != 0.0:
timeleft = (time.time()-self.t0)*(100.0 - pct)/pct
else:
timeleft = 0.0
# Simulation speed is calculated over a single progress report window
if self.t00 is None:
nsday = 0.0
else:
days = (time.time()-self.t00)/86400
nsday = (self.run_time - self.rt00) / nanoseconds / days
self.t00 = time.time()
self.rt00 = self.run_time
if simulation.topology.getUnitCellDimensions() != None :
box_vectors = state.getPeriodicBoxVectors()
volume = compute_volume(box_vectors) / self._units['volume']
density = (mass / compute_volume(box_vectors)) / self._units['density']
if self._initial:
logger.info("%8s %17s %15s %13s %13s %13s %13s %13s %13s %13s" % ('Progress', 'E.T.A', 'Speed (ns/day)', 'Time(ns)', 'Temp(K)', 'Kin(kJ)', 'Pot(kJ)', 'Ene(kJ)', 'Vol(nm3)', 'Rho(kg/m3)'))
logger.info("%7.3f%% %17s %15.3f %13.6f %13.5f %13.5f %13.5f %13.5f %13.5f %13.5f" % (pct, GetTime(timeleft), nsday, self.run_time / nanoseconds, temperature, kinetic, potential, energy, volume, density))
self._data['volume'].append(volume)
self._data['density'].append(density)
else:
if self._initial:
logger.info("%8s %17s %15s %13s %13s %13s %13s %13s" % ('Progress', 'E.T.A', 'Speed (ns/day)', 'Time(ns)', 'Temp(K)', 'Kin(kJ)', 'Pot(kJ)', 'Ene(kJ)'))
logger.info("%7.3f%% %17s %15.3f %13.5f %13.5f %13.5f %13.5f %13.5f" % (pct, GetTime(timeleft), nsday, self.run_time / nanoseconds, temperature, kinetic, potential, energy))
self._data['energy'].append(energy)
self._data['kinetic'].append(kinetic)
self._data['potential'].append(potential)
self._data['temperature'].append(temperature)
self._initial = False
def __del__(self):
if self._openedFile:
self._out.close()
class MaxVFReporter(object):
def __init__(self, file, reportInterval):
self._reportInterval = reportInterval
self._openedFile = isinstance(file, str)
self._initial = True
if self._openedFile:
self._out = open(file, 'w')
else:
self._out = file
def describeNextReport(self, simulation):
steps = self._reportInterval - simulation.currentStep%self._reportInterval
return (steps, False, False, False, True)
def report(self, simulation, state):
my_state = simulation.context.getState(getEnergy=True,getPositions=True,getVelocities=True,getForces=True)
frc = my_state.getForces(asNumpy=True) / (kilocalorie_per_mole/angstrom)
frc_nrm = np.linalg.norm(frc, axis=1)
vel = my_state.getVelocities(asNumpy=True) / (meter/second)
vel_nrm = np.linalg.norm(vel, axis=1)
logger.info("Velocity report (m/s): Mean = %10.3f Stdev = %10.3f" % (np.mean(vel_nrm), np.std(vel_nrm)))
vel_rank = np.argsort(vel_nrm)[::-1]
logger.info("Top 10 velocities: " + ' '.join(["%10i" % vel_rank[i] for i in range(10)]))
logger.info(" " + ' '.join(["%10.3f" % vel_nrm[vel_rank[i]] for i in range(10)]))