Source code for pymopac.input
from os.path import isfile
from rdkit.Chem.AllChem import warnings
from .helpers import xyz_identifier, BlockToXyz, checkOverlap
from .output import MopacOutput
from . import __mopac_path__
import os
from time import time_ns, sleep
import subprocess
try:
MOPAC_PATH = __mopac_path__
if MOPAC_PATH is None:
MOPAC_PATH = "mopac"
print("MOPAC not found in path, falling back to trying 'mopac'. expect issues.")
except Exception as e:
print(e)
MOPAC_PATH = "mopac"
class BaseInput:
"""
Base class that sets up the basic mechanisms of launching a job and getting
outputs.
"""
def __init__(self, **kwargs):
pass
def run(self):
pass
[docs]
class MopacInput(BaseInput):
"""
Class that sets up the input file for a MOPAC calculation.
geometry:
+ pure xyz block
+ SMILES (str)
+ rdkit Mol
model: str
all supported MOPAC keywords, e.g. AM1, PM6, ...
custom_header: str
custom string that is attached to other header keywords
comment: str
second line in the input file
path:
if False, a dir is created under /tmp/, else under the given str
AddHs: bool
calls AddHs on the mol object
preopt: bool
uses MMFF to optimize the mol structure
verbose: bool
if True, prints additional class internal statements
stream: bool
if True, streams the outfile to stdout
plot: bool
if True, plots progress via matplotlib
Methods:
+ run()
runs MOPAC as a subprocess
returns MopacOutput class
+ getInpFile()
returns the MOPAC input as a string
"""
def __init__(self, geometry,
model: str = "PM7",
custom_header: str = "",
comment: str = "#",
path=False,
addHs: bool = True,
preopt: bool = True,
verbose: bool = False,
stream: bool = False,
plot: bool = False,
aux: bool = True):
self.geometry = geometry
self.model = model
self.custom_header = custom_header
self.comment = comment
self.path = path
self.addHs = addHs
self.preopt = preopt
self.verbose = verbose
self.stream = stream
self.plot = plot
self.aux = aux
self.xyz = self.geoToXyz()
if not path:
ts = time_ns()
self.path = f"/tmp/pymopac_{ts}"
if not os.path.isdir(self.path):
os.mkdir(self.path)
if self.verbose:
print(f"created path at {self.path}")
else:
if not os.path.isdir(self.path):
os.mkdir(str(self.path))
if self.verbose:
print(f"created path at {self.path}")
[docs]
def geoToXyz(self):
"""
Helper function that tries to infer a geometry from any input and
returns a xyz block
"""
# check for xyz block
xyz_status, xyz_block = xyz_identifier(self.geometry)
if xyz_status:
return BlockToXyz(xyz_block)
from rdkit import Chem
from rdkit.Chem import AllChem
if isinstance(self. geometry, str):
try:
mol = Chem.MolFromSmiles(self.geometry)
except Exception as e:
raise RuntimeError("Failed to parse input as SMILES", e)
else:
mol = self.geometry
if self.addHs:
mol = AllChem.AddHs(mol)
if self.preopt:
AllChem.EmbedMolecule(mol)
AllChem.MMFFOptimizeMolecule(mol)
# check if there are some overlapping atoms, reoptimize as needed
overlapping_pairs = checkOverlap(mol)
if overlapping_pairs is not []:
# print(overlapping_pairs)
conf = mol.GetConformer(0)
for i, j in overlapping_pairs:
import numpy as np
wiggle = np.random.normal(0, 0.3, 3)
conf.SetAtomPosition(i, conf.GetAtomPosition(i) + wiggle)
conf.SetAtomPosition(j, conf.GetAtomPosition(j) - wiggle)
# print(check_overlap(mol))
# print(Chem.MolToXYZBlock(mol))
AllChem.MMFFOptimizeMolecule(mol)
return BlockToXyz(Chem.MolToXYZBlock(mol))
[docs]
def getInpFile(self):
header = self.model + self.custom_header
if self.aux:
header += " AUX"
return "\n".join([header, str(self.comment), "", str(self.xyz)])
[docs]
def run(self):
"""
runs MOPAC as a subprocess
returns MopacOutput class
"""
self.inpath = f"{self.path}/pymopac.mop"
self.outpath = f"{self.path}/pymopac.out"
if self.aux:
self.auxpath = f"{self.path}/pymopac.aux"
with open(self.inpath, "w") as file:
file.write(self.getInpFile())
if self.verbose:
print(f"input file written to {self.inpath}")
if self.stream or self.plot:
process = self.verboseRun()
else:
process = self.silentRun()
return MopacOutput(outfile=self.getOutResult(),
stderr=process.stderr, stdout=process.stdout,
aux=self.getAuxResult())
[docs]
def stream_run(self):
"""
returns both the mopac process and the stream yielding lines
"""
with open(self.outpath, 'a'):
os.utime(self.outpath, None)
while not os.path.isfile(self.outpath):
sleep(0.01)
# start tail process first and create stream generator
tail_process = subprocess.Popen(["tail", "-f", self.outpath],
stdout=subprocess.PIPE,
universal_newlines=True)
def pure_stream(mopac_proc, tail_proc):
try:
for line in tail_proc.stdout:
yield line.strip()
if mopac_proc.poll() is not None:
break
if "MOPAC DONE" in line:
break
finally:
tail_proc.terminate()
tail_proc.wait()
mopac_proc.wait()
# Start MOPAC process after tail is ready
mopac_process = subprocess.Popen(["mopac", self.inpath])
# Create stream with both processes
stream = pure_stream(mopac_process, tail_process)
return mopac_process, stream
[docs]
def verboseRun(self):
"""
Sets up the basics for a stream run and contains switches for being
verbose and streaming. If plot is True, it creates a live-updating
matplotlib figure of gradient values.
"""
process, lines = self.stream_run()
line_counter = 0
if self.plot:
import matplotlib.pyplot as plt
from matplotlib.animation import FuncAnimation
self.grads = []
self.heats = []
self.fig, (self.ax, self.ax_heat) = plt.subplots(2, 1)
self.line, = self.ax.plot([], [])
self.line_heat, = self.ax_heat.plot([], [])
self.ax.set_xlabel('Cycle')
self.ax.set_ylabel('Gradient')
self.ax_heat.set_xlabel('Cycle')
self.ax_heat.set_ylabel('Heat')
self.ax.set_title('Gradient vs. Cycle')
self.ax_heat.set_title('Heat vs. Cycle')
def update_plot(frame):
self.line.set_data(range(len(self.grads)), self.grads)
self.line_heat.set_data(range(len(self.heats)), self.heats)
self.ax.relim()
self.ax.autoscale_view()
self.ax_heat.relim()
self.ax_heat.autoscale_view()
return self.line, self.line_heat
self.ani = FuncAnimation(self.fig, update_plot,
frames=None, interval=100, blit=True,
cache_frame_data=False, save_count=100)
plt.tight_layout()
plt.show(block=False)
# Process all lines until the generator is exhausted
for line in lines:
if self.stream:
print(line)
if self.plot and "GRAD.:" in line:
spl = line.split()
i = spl.index("GRAD.:")
i_heat = spl.index("HEAT:")
self.grads.append(float(spl[i+1]))
self.heats.append(float(spl[i_heat+1]))
self.fig.canvas.flush_events()
line_counter += 1
# Generator has finished, wait for process to complete
process.wait()
if line_counter < 2:
print("No lines captured, calculations presumably done too fast")
if self.plot:
import matplotlib.pyplot as plt
plt.show() # Keep the plot open after the function finishes
return process
[docs]
def silentRun(self):
"""
just runs MOPAC, no feedback or streaming
"""
process = subprocess.run(
[MOPAC_PATH, self.inpath],
capture_output=True)
if process.returncode == 0:
pass
else:
raise Exception(process.stderr)
return process
[docs]
def getOutResult(self):
with open(self.outpath, "r") as f:
out = f.read()
result_splitter = "-------------------------------------------------------------------------------\n"
length = len(result_splitter)
result_splitter += " " + \
"\n ".join(self.getInpFile().split("\n")[:2])
try:
i = out.index(result_splitter) + length
except:
warnings.warn("no output splitter found, proceed with caution")
i = 0
result = out[i:]
return result
[docs]
def getAuxResult(self):
if hasattr(self, "auxpath"):
if os.path.isfile(self.auxpath):
with open(self.auxpath, "r") as f:
return f.read()
else:
return None