""" Main grand canonical basin hopping class """
import os
import sys
import subprocess
import shutil
import pickle
from time import strftime, localtime
import numpy as np
import yaml
from ase import Atoms
from ase import units
from ase.io import read
from ase.io.trajectory import Trajectory
from ase.io.formats import UnknownFileTypeError
from ase.optimize.optimize import Dynamics
from ase.optimize import BFGS
from ase.neighborlist import NeighborList, natural_cutoffs
from gg.reference import get_ref_coeff
from gg.utils import (
NoReasonableStructureFound,
check_contact,
get_area,
sort_atoms,
extract_lowest_energy_from_oszicar,
extract_lowest_energy_from_outlog,
)
from gg.utils_graph import atoms_to_graph, is_unique_graph
from gg.logo import logo
from gg.predefined_sites import FlexibleSites
from gg.modifiers import Remove
__author__ = "Kaustubh Sawant, Geng Sun"
def get_current_time():
"""
Returns:
str: Current time
"""
time_label = strftime("%d-%b-%Y %H:%M:%S", localtime())
return time_label
[docs]
class Gcbh(Dynamics):
"""Basin hopping algorithm.
After Wales and Doye, J. Phys. Chem. A, vol 101 (1997) 5111-5116
and
David J. Wales and Harold A. Scheraga, Science, Vol. 285, 1368 (1999)
"""
def __init__(
self,
atoms: Atoms,
logfile: str = "gcbh.log",
trajectory: str = "gcbh.traj",
config_file: str = None,
restart: bool = False,
optimizer=BFGS,
):
"""
Args:
atoms (ase.Atoms): The Atoms object to operate on.
logfile (str, optional): If *logfile* is a string, a file will be opened.
Defaults to "gcbh.log".
trajectory (str, optional): Pickle file used to store the trajectory of atomic movement.
Defaults to "gcbh.traj".
config_file (str[.yaml file], optional): Input file to gcbh. Defaults to None.
restart (bool, optional):
optimizer: ase optimizer for geometric relaxation.
Defaults to ase.optimize.BFGS
"""
if not isinstance(atoms, Atoms):
raise RuntimeError("The input atoms is not as Atoms object")
if atoms.calc is None:
raise RuntimeError("The atoms instance has no calculator")
else:
self.calc = atoms.calc
# Intitalize by setting up the parent Dynamics Class
super().__init__(atoms=atoms, logfile=logfile, trajectory=None)
self.logfile.write(logo())
self.logfile.flush()
self.atoms.center()
# Read Config File if it exists
self.c = {
"temp": 1500,
"max_temp": None,
"min_temp": None,
"stop_steps": 40,
"stop_opt": 500,
"vasp_opt": False,
"chemical_potential": None,
"max_history": 25,
"max_bond": 2,
"max_bond_ratio": 0,
"check_graphs": True,
"area": False,
"fmax": 0.05,
"opt_traj": False,
"vib_correction": False,
"initialize": True,
"detect_gas": None,
"graph_method": "fullgraph",
"check_contact_error": 0.5,
}
self.optimizer = optimizer
if config_file:
self.set_config(config_file)
self.logtxt(f'Current Temp : {self.c["temp"]}')
# Setup Temperature
if self.c["max_temp"] is None:
self.c["max_temp"] = 1.0 / ((1.0 / self.c["temp"]) / 1.5)
else:
self.c["max_temp"] = max([self.c["max_temp"], self.c["temp"]])
self.logtxt(f'Setting Max Temp to {self.c["max_temp"]}')
if self.c["min_temp"] is None:
self.c["min_temp"] = 1.0 / ((1.0 / self.c["temp"]) * 1.5)
else:
self.c["min_temp"] = min([self.c["min_temp"], self.c["temp"]])
self.logtxt(f'Setting Min Temp to {self.c["min_temp"]}')
# Some file names and folders are hardcoded
self.status_file = "current_status.pkl"
self.opt_folder = "opt_folder"
if os.path.exists("local_minima.traj"):
self.lm_trajectory = Trajectory("local_minima.traj", "a", atoms)
else:
self.lm_trajectory = Trajectory("local_minima.traj", "w", atoms)
if os.path.exists(trajectory):
self.traj = Trajectory(trajectory, "a", atoms)
else:
self.traj = Trajectory(trajectory, "w", atoms)
self.structure_modifiers = {} # Setup empty class to add structure modifiers
self.vib_correction = {} # Setup empty class to add specific corrections
self.c["acc_hist"] = []
self.remove_gas_inst = []
# used for adjusting the temperature of Metropolis algorithm
# a series of 0 and 1, 0 stands for not accepted, 1 stands for accepted
# Print the chemical potential for different elements
if self.c["chemical_potential"]:
self.mu = self.c["chemical_potential"]
for k, v in self.mu.items():
self.logtxt(f"Chemical potential of {k} is {v}")
else:
raise RuntimeError("No chemical potential was provided")
self.c["energy"] = None
self.c["fe"] = None
self.c["fe_min"] = None
self.c["no_impr_step"] = 0
self.c["nsteps"] = 0
self.c["mod_weights"] = {}
# negative value indicates no ongoing structure optimization
self.c["opt_on"] = -1
# Graphing
self.c["graphs"] = []
if restart:
if os.path.exists(self.status_file):
self.update_from_file(self.status_file)
if any(
arg is None
for arg in [self.c["energy"], self.c["fe"], self.c["fe_min"]]
):
self.logtxt(
"Cannot restart since energy,fe or fe_min value is missing !"
)
self.initialize()
else:
if self.c["opt_on"] > 0:
subdir = os.path.join(
os.getcwd(), self.opt_folder, f'opt_{self.c["opt_on"]}'
)
if os.path.isdir(subdir):
print(f'Restarting from {self.c["opt_on"]}')
self.logtxt(f'Restarting from {self.c["opt_on"]}')
else:
self.logtxt(f'Starting a clean restart from {self.c["nsteps"]+1}')
self.logtxt(f'Best F: {self.c["fe_min"]:.2f}')
else:
self.logtxt("Cannot restart since current_status.pkl file is missing !")
self.initialize()
try:
self.atoms = read("local_minima.traj")
self.atoms.calc = self.calc
self.logtxt(f'Current best structure is {self.atoms}')
except UnknownFileTypeError as e:
print(f"Cannot read local_minima.traj due to {e}")
else:
self.initialize()
def set_config(self, config_file: str):
"""_
Args:
config_file (dict): Dictionary of key inputs
"""
with open(config_file, "r", encoding="utf-8") as f:
input_config = yaml.safe_load(f)
self.c.update(input_config)
def todict(self) -> dict:
description = {
"type": "optimization",
"optimizer": self.__class__.__name__,
}
return description
def dump(self, filename: str):
"""dump dictionary of variables to store"""
for key, value in self.structure_modifiers.items():
del value.atoms
self.c["mod_weights"][key] = value.weight
with open(filename, "wb") as file:
pickle.dump(self.c, file, protocol=pickle.HIGHEST_PROTOCOL)
return
def update_from_file(self, filename: str):
"""Update variable dictionary from file"""
with open(filename, "rb") as file:
c_old = pickle.load(file)
self.c.update(c_old)
if self.c["mod_weights"]:
for key, value in self.c["mod_weights"].items():
self.logtxt(f"{key} weight = {value:.2f}")
return
def logtxt(
self,
msg="",
):
"""Dump txt into logfile
Args:
msg (str, optional): The message to dump. Defaults to "".
"""
real_message = f"{msg.strip()} \n"
self.logfile.write(real_message)
self.logfile.flush()
return
def add_modifier(self, instance, name: str):
"""
Args:
instance (Modifier): Instance of a ParentModifier or a child
name (str): Name for the instance/modifier
"""
if name in self.structure_modifiers:
raise RuntimeError(f"Structure modifier {name} exists already!\n")
self.structure_modifiers[name] = instance
if name in self.c["mod_weights"]:
self.structure_modifiers[name].weight = self.c["mod_weights"][name]
return
def select_modifier(self) -> str:
"""
Returns:
str: random modifier name
"""
modifier_names = list(self.structure_modifiers.keys())
modifier_weights = np.asarray(
[self.structure_modifiers[key].weight for key in modifier_names]
)
modifier_weights = modifier_weights / modifier_weights.sum()
return np.random.choice(modifier_names, p=modifier_weights)
def add_vib_correction(self, instance, name: str):
"""
Args:
instance (Modifier): Instance of a ParentModifier or a child
name (str): Name for the instance/modifier
"""
if name in self.vib_correction:
raise RuntimeError(f"Correction: {name} exists already!\n")
previous_correction = 0
if self.c["vib_correction"]:
previous_correction = self.get_vib_correction(self.atoms)
self.vib_correction[name] = instance
# Update fe
if self.c["vib_correction"]:
self.logtxt(f"Updating F according to {name}")
total_correction = self.get_vib_correction(self.atoms)
correction = total_correction - previous_correction
correction = self._normalize_by_area(correction, self.atoms)
self.c["fe"] += correction
self.logtxt(f'Updated F after {name}: {self.c["fe"]:.6f}')
self.c["fe_min"] = self.c["fe"]
return
def update_modifier_weights(self, name: str, action: str):
"""
Args:
name (str): _description_
action (str): _description_
"""
if name not in self.structure_modifiers:
raise RuntimeError(f"operator name {name} not recognized")
action = action.split()[0][0]
if action not in ["i", "d", "r"]:
raise RuntimeError("action must be 'increase', 'decrease' or 'rest'")
elif action == "r":
og_weight = self.structure_modifiers[name].og_weight
self.structure_modifiers[name].weight = og_weight
self.logtxt(
f"Modifier {name} weight is reset to original weight : {og_weight:.2f}"
)
elif action == "i":
og_weight = self.structure_modifiers[name].og_weight
self.structure_modifiers[name].weight = min(
[
og_weight * 2,
self.structure_modifiers[name].weight * 1.05,
]
)
elif action == "d":
og_weight = self.structure_modifiers[name].og_weight
self.structure_modifiers[name].weight = max(
[
og_weight / 2.0,
self.structure_modifiers[name].weight / 1.05,
]
)
for key, value in self.structure_modifiers.items():
self.logtxt(f"{key} weight = {value.weight:.2f}")
return
def get_ref_potential(self, atoms: Atoms):
"""
Args:
atoms (ase.Atoms):
Returns:
float: total ref value to substract
"""
if self.mu:
formula = atoms.get_chemical_formula()
ref_sum = 0
to_print = f"{formula} ="
ref_coeff = get_ref_coeff(self.mu, formula)
for key, value in self.mu.items():
ref_sum += ref_coeff[key] * value
to_print += f" {ref_coeff[key]:+.2f} {key} "
self.logtxt(to_print)
return ref_sum
else:
return 0
def get_vib_correction(self, atoms: Atoms):
"""
Args:
atoms (ase.Atoms):
Returns:
float: total correction to add
"""
if self.c["vib_correction"]:
if len(self.vib_correction) > 0:
corr_sum = 0
a = atoms.copy()
correction_terms = []
for key, value in self.vib_correction.items():
try:
ind_to_remove = value.get_ind_to_remove_list(a)
except NoReasonableStructureFound:
continue
else:
flattened_list = set(
[item for sublist in ind_to_remove for item in sublist]
)
n = len(ind_to_remove)
corr_value = value.weight
corr_sum += n * corr_value
self.logtxt(f"Adding correction {key}:{n}*{corr_value}")
correction_terms.append(f"{key}:{n}*{corr_value:.6f}")
del a[[int(i) for i in flattened_list]]
if correction_terms:
self.logtxt(
f'Vibration correction results: {"; ".join(correction_terms)} => total {corr_sum:.6f} eV'
)
else:
self.logtxt("Vibration correction results: no correction terms found")
return corr_sum
else:
self.logtxt("Vibration correction results: no correction modifiers added")
return 0
else:
return 0
def adjust_temp(self, int_accept: int):
"""Start adjusting temperature beyond max_history
Args:
int_accept (int): 0 or 1
"""
# adjust the temperatures
self.c["acc_hist"].append(int_accept)
if len(self.c["acc_hist"]) > self.c["max_history"]:
self.c["acc_hist"].pop(0)
_balance = sum(self.c["acc_hist"]) / float(self.c["max_history"])
if _balance > 2.0 * (1 - _balance):
self.c["temp"] = self.c["temp"] / 1.03
elif _balance < 0.5 * (1 - _balance):
self.c["temp"] = self.c["temp"] * 1.03
if self.c["temp"] < self.c["min_temp"]:
self.c["temp"] = self.c["min_temp"]
elif self.c["temp"] > self.c["max_temp"]:
self.c["temp"] = self.c["max_temp"]
self.logtxt(f'Current Temp: {self.c["temp"]}')
return
def move(self, name: str) -> Atoms:
"""Move atoms by a random modifier."""
atoms = self.atoms
atoms.wrap()
self.logtxt(f"Modifier '{name}' formula {atoms.get_chemical_formula()}")
atoms = self.structure_modifiers[name].get_modified_atoms(atoms)
atoms.wrap()
atoms.center()
atoms = sort_atoms(atoms)
return atoms
def initialize(self):
"""Initialize Atoms"""
if not self.c["initialize"]:
self.logtxt(
"Warning!!! Skipping initialization, I hope you know what you are doing"
)
self.c["fe"] = float("inf")
self.c["fe_min"] = self.c["fe"]
self.c["energy"] = 0
self.c["nsteps"] += 1
self.append_graph(self.atoms, unique_method=self.c["graph_method"])
self.dump(self.status_file)
self.traj.write(self.atoms, energy=0)
self.lm_trajectory.write(self.atoms, energy=0, accept=1)
return
self.c["opt_on"] = 0
self.atoms, en = self.optimize(self.atoms)
if self.c["opt_on"] == -1 or en < -100000:
raise RuntimeError("The initial optimization failed, check logs")
self.dump(self.status_file)
self.c["energy"] = en
ref = self.get_ref_potential(self.atoms)
self.c["fe"] = self.c["energy"] - ref
if self.c["vib_correction"]:
self.c["fe"] += self.get_vib_correction(self.atoms)
self.c["fe"] = self._normalize_by_area(self.c["fe"], self.atoms)
self.c["fe_min"] = self.c["fe"]
self.c["opt_on"] = -1
self.dump(self.status_file)
self.c["nsteps"] += 1
formula = self.atoms.get_chemical_formula()
en = self.c["energy"]
self.append_graph(self.atoms, unique_method=self.c["graph_method"])
self.traj.write(self.atoms, energy=en)
self.lm_trajectory.write(self.atoms, energy=en, accept=1)
self.logtxt(
f'Atoms: {formula} E(initial): {en:.2f} F(initial) {self.c["fe"]:.2f}'
)
def _normalize_by_area(self, value: float, atoms: Atoms) -> float:
"""Normalize thermodynamic quantity by area if requested."""
if self.c["area"]:
return value / get_area(atoms)
return value
[docs]
def run(self, steps: int = 4000, maximum_trial: int = 30):
"""
Args:
steps (int): Number of steps to run
Defaults to 4000
maximum_trial (int): Number of failed modification steps before terminating.
Defaults to 30
"""
self.logtxt("Intitial Weights:")
for key, value in self.structure_modifiers.items():
self.logtxt(f"{key} weight = {value.weight:.2f}")
for step in range(steps):
if self.c["no_impr_step"] >= self.c["stop_steps"]:
self.logtxt(
f'The best solution has not improved after {self.c["no_impr_step"]} steps'
)
break
self.logtxt(
f"{step}-------------------------------------------------------"
)
self.logtxt(
f'{get_current_time()}: Starting Basin-Hopping Step {self.c["nsteps"]}'
)
for trials in range(maximum_trial):
modifier_name = self.select_modifier()
try:
newatoms = self.move(modifier_name)
except (
NoReasonableStructureFound
) as emsg: # emsg stands for error message
self.logtxt(
f"{modifier_name} did not find a good structure because {emsg} {type(emsg)}"
)
else:
if self.append_graph(
newatoms, unique_method=self.c["graph_method"]
):
self.c["opt_on"] = self.c["nsteps"]
self.logtxt(
f"One structure found with modifier {modifier_name}"
)
self.dump(self.status_file)
converged_atoms, en = self.optimize(newatoms)
self.traj.write(converged_atoms)
self.append_graph(
converged_atoms, unique_method=self.c["graph_method"]
)
if self.c["opt_on"] == -1 or en < -100000:
self.c["opt_on"] = -1
self.c["nsteps"] += 1
continue
self.logtxt(f"Optimization Done with E = {en:.2f}")
self.accepting_new_structures(
converged_atoms, modifier_name, en
)
self.c["opt_on"] = -1 # switch off the optimization status
self.dump(self.status_file)
self.c["nsteps"] += 1
break
else:
self.logtxt("Atoms object visited previously")
continue
else:
self.logtxt(
f"Program does not find a good structure after {trials+1} tests"
)
raise RuntimeError(
f"Program does not find a good structure after {trials+1} tests"
)
def accepting_new_structures(self, newatoms: Atoms, modifier_name: str, en):
"""This function takes care of all the accepting algorithms. I.E metropolis algorithms
newatoms is the newly optimized structure
"""
assert newatoms is not None # Energy_new
fn = en - self.get_ref_potential(newatoms) # Free_energy_new
if self.c["vib_correction"]:
fn += self.get_vib_correction(newatoms)
fn = self._normalize_by_area(fn, newatoms)
if fn < self.c["fe"]:
accept = True
modifier_weight_action = "increase"
# Check Probability for acceptance
elif np.random.uniform() < np.exp(
-(fn - self.c["fe"]) / self.c["temp"] / units.kB
):
accept = True
modifier_weight_action = "decrease"
else:
accept = False
modifier_weight_action = "decrease"
n = len(newatoms)
if self.remove_gas_inst:
newatoms = self.identify_and_delete_gas(newatoms)
if n > len(newatoms):
accept = False
modifier_weight_action = "decrease"
self.update_modifier_weights(name=modifier_name, action=modifier_weight_action)
if accept:
int_accept = 1
self.logtxt(
f'Accepted, F(old)={self.c["fe"]:.2f} F(new)={fn:.2f} at step {self.c["nsteps"]}'
)
self.atoms = newatoms
self.c["energy"] = en
self.c["fe"] = fn
self.lm_trajectory.write(newatoms, energy=en, accept=1)
else:
int_accept = 0
self.logtxt(
f'Rejected, F(old)={self.c["fe"]:.2f} F(new)={fn:.2f} at step {self.c["nsteps"]}'
)
self.adjust_temp(int_accept)
# update the best result for this basin-hopping
if self.c["fe"] < self.c["fe_min"]:
self.c["fe_min"] = self.c["fe"]
self.c["no_impr_step"] = 0
else:
self.c["no_impr_step"] += 1
self.dump(self.status_file)
self.logtxt("-------------------------------------------------------")
def _reject_touching_atoms(self, atoms: Atoms) -> bool:
"""Return True if atoms are touching and log the rejection."""
if check_contact(atoms,error=self.c["check_contact_error"]):
self.logtxt(
f'{get_current_time()}: Atoms touching after optimization at step {self.c["nsteps"]}. Rejecting structure.'
)
self.c["opt_on"] = -1
return True
return False
def optimize(self, atoms: Atoms):
"""Optimize atoms"""
optimizer = self.optimizer
if atoms.calc is None:
atoms.calc = self.calc
self.logtxt(
f'{get_current_time()}: Begin structure optimization routine at step {self.c["nsteps"]}'
)
opt_dir = self.opt_folder
topdir = os.getcwd()
subdir = os.path.join(topdir, opt_dir, f'opt_{self.c["nsteps"]}')
optimization_completed = False
if not os.path.isdir(subdir):
os.makedirs(subdir)
os.chdir(subdir)
try:
if self.c["vasp_opt"]:
en = atoms.get_potential_energy()
if self._reject_touching_atoms(atoms):
en = -1e6
optimization_completed = True
else:
atoms.write("POSCAR", format="vasp")
dyn = optimizer(atoms, logfile="opt.log")
if self.c["opt_traj"]:
traj = Trajectory("opt.traj", "w", atoms)
dyn.attach(traj.write, interval=self.c["opt_traj"])
dyn.run(fmax=self.c["fmax"], steps=self.c["stop_opt"])
if dyn.nsteps == self.c["stop_opt"]:
self.logtxt(
f'Opt is incomplete in {self.c["stop_opt"]} steps, ignore gcbh {self.c["nsteps"]} step'
)
self.c["opt_on"] = -1
else:
atoms.write("CONTCAR", format="vasp")
optimization_completed = True
en = atoms.get_potential_energy()
if self._reject_touching_atoms(atoms):
en = -1e6
except Exception as exc: # pragma: no cover - safety net for optimizer failures
self.logtxt(
f'Optimization failed at step {self.c["nsteps"]} because {exc}'
)
self.c["opt_on"] = -1
en = -1e6
finally:
os.chdir(topdir)
if optimization_completed:
try:
self.logtxt(
f'{get_current_time()}: Structure optimization completed for {self.c["nsteps"]}'
)
except Exception: # pragma: no cover - logging must not override optimization outcome
pass
return atoms, en
def append_graph(self, atoms, unique_method="fullgraph"):
"""Append the graph to list if its unique
Args:
atoms (_type_): _description_
"""
if self.c["check_graphs"]:
nl = NeighborList(
natural_cutoffs(atoms), self_interaction=False, bothways=True
)
nl.update(atoms)
new_g = atoms_to_graph(
atoms,
nl,
max_bond=self.c["max_bond"],
max_bond_ratio=self.c["max_bond_ratio"],
)
if self.c["graphs"]:
if is_unique_graph(new_g, self.c["graphs"], comp_type=unique_method):
self.logtxt(
f"Add graph step:{self.c['nsteps']} and graph loc:{len(self.c['graphs'])}"
)
self.c["graphs"].append(new_g)
return True
else:
return False
else:
self.c["graphs"].append(new_g)
self.logtxt("Appending first graph")
else:
return True
def add_delete_gas(self, gas_species=None, max_bond=2):
"""
Args:
gas_species (list, optional): List of gas species or dict configs to remove.
Defaults to None.
Dict config supports:
- species/formula: gas identifier string (e.g. "CO2")
- allow_external_symbols: list of symbols allowed to bond to the gas
subgraph (e.g. ["Pt"] to only allow surface bonds)
- max_external_neighbors: maximum number of external neighbors allowed
Example:
gcbh.add_delete_gas([
{"species": "CO2", "allow_external_symbols": ["Pt"], "max_external_neighbors": 2},
"CO",
])
"""
if not gas_species:
gas_species = self.c["delete_gas"]
ss = FlexibleSites(constraints=True, max_bond=max_bond)
for gas in gas_species:
gas_config = {}
if isinstance(gas, dict):
gas_config = gas
gas = gas_config.get("species") or gas_config.get("formula")
allow_external_symbols = gas_config.get("allow_external_symbols")
max_external_neighbors = gas_config.get("max_external_neighbors")
r = Remove(
ss,
to_del=gas,
max_bond=self.c["max_bond"],
max_bond_ratio=self.c["max_bond_ratio"],
allow_external_symbols=allow_external_symbols,
max_external_neighbors=max_external_neighbors,
)
self.remove_gas_inst.append(r)
return
def identify_and_delete_gas(self, atoms):
"""
Args:
atoms: identify gas species and delete them
"""
for inst in self.remove_gas_inst:
try:
ind_to_remove = inst.get_ind_to_remove_list(atoms)
except NoReasonableStructureFound:
continue
else:
flattened_list = set(
[item for sublist in ind_to_remove for item in sublist]
)
if flattened_list:
self.logtxt(f"Found {inst.to_del} at {flattened_list}")
del atoms[[int(i) for i in flattened_list]]
return atoms
[docs]
class GcbhFlexOpt(Gcbh):
"""
optimizer_file (str): Path to file that will run in opt_n folder
copied_files (str): Files to move into opt_n folder to help in optimization
"""
def __init__(
self,
atoms: Atoms,
logfile: str = "gcbh.log",
trajectory: str = "gcbh.traj",
config_file: str = None,
restart: bool = False,
optimizer_file: str = "optimize.sh",
copied_files: list = None,
):
"""
Args:
atoms (ase.Atoms): The Atoms object to operate on.
logfile (str, optional): If *logfile* is a string, a file will be opened.
Defaults to "gcbh.log".
trajectory (str, optional): Pickle file used to store the trajectory of atomic movement.
Defaults to "gcbh.traj".
config_file (str[.yaml file], optional): Input file to gcbh. Defaults to None.
restart (bool, optional):
optimizer (str):
"""
self.opt_file = optimizer_file
self.copied_files = copied_files if copied_files is not None else ["opt.py"]
super().__init__(atoms, logfile, trajectory, config_file, restart)
def optimize(self, atoms: Atoms):
"""Optimize atoms"""
self.logtxt(
f'{get_current_time()}: Begin structure optimization routine at step {self.c["nsteps"]}'
)
script = self.opt_file
copied_files = self.copied_files
opt_dir = self.opt_folder
topdir = os.getcwd()
subdir = os.path.join(topdir, opt_dir, f'opt_{self.c["nsteps"]}')
optimization_completed = False
if not os.path.isdir(subdir):
os.makedirs(subdir)
if script not in copied_files:
copied_files.append(script)
for file in copied_files:
assert os.path.isfile(file)
shutil.copy(os.path.join(topdir, file), os.path.join(subdir, file))
atoms.write(os.path.join(subdir, "input.traj"))
try:
os.chdir(subdir)
opt_job = subprocess.Popen(["bash", script], cwd=subdir)
opt_job.wait()
if opt_job.returncode < 0:
sys.stderr.write(
f"optimization does not terminate properly at {subdir}"
)
sys.exit(1)
except Exception as esc:
raise RuntimeError(
f"some error encountered at folder {subdir} during optimizations"
) from esc
else:
fn = os.path.join(subdir, "opt.traj")
assert os.path.isfile(fn)
atoms = read(fn)
optimization_completed = True
finally:
os.chdir(topdir)
if optimization_completed:
try:
self.logtxt(
f'{get_current_time()}: Structure optimization completed for {self.c["nsteps"]}'
)
except Exception: # pragma: no cover - logging must not override optimization outcome
pass
en = atoms.get_potential_energy()
if self._reject_touching_atoms(atoms):
en = 1e6
return atoms, en
def generate_all(self, traj=None):
"""
Generate all possible structures for all modifiers by looping over each
modifier, generating modified atoms, filtering unique structures, and
saving each unique structure as a POSCAR in a new subfolder with the
required files copied.
"""
self.logtxt(
"Starting GcbhAll run: generating all unique structures for all modifiers."
)
if not traj:
traj = [self.atoms]
if not isinstance(traj, list):
mod_structures = [traj]
index = 0
for a in traj:
for mod_name, mod_instance in self.structure_modifiers.items():
atoms = a.copy()
self.logtxt(f"Processing modifier: {mod_name}")
try:
# Generate modified structures.
mod_instance.unique = True
mod_instance.print_movie = True
mod_structures = mod_instance.get_modified_atoms(atoms)
self.logtxt(f"Found {len(mod_structures)} structures")
except (
NoReasonableStructureFound
) as emsg: # emsg stands for error message
self.logtxt(
f"{mod_name} did not find a good structure because {emsg} {type(emsg)}"
)
continue
if not isinstance(mod_structures, list):
mod_structures = [mod_structures]
# Process each generated structure.
for struct in mod_structures:
# Check uniqueness by using the already available append_graph method.
if self.c["check_graphs"]:
# append_graph returns True if the structure is unique.
is_unique = self.append_graph(
struct, unique_method=mod_instance.unique_method
)
else:
is_unique = True
if is_unique:
# Increment the step counter to generate a unique folder name.
subfolder = os.path.join(
os.getcwd(),
self.opt_folder,
f"opt_{self.c['nsteps']}",
f"opt_{index}",
)
if not os.path.isdir(subfolder):
os.makedirs(subfolder)
# Write the POSCAR file in VASP format.
poscar_file = os.path.join(subfolder, "POSCAR")
struct.write(poscar_file, format="vasp")
# Copy any additional required files into the new subfolder.
for file in self.copied_files:
if os.path.isfile(file):
shutil.copy(file, subfolder)
else:
self.logtxt(f"File {file} not found for copying.")
self.logtxt(
f"Unique structure from modifier '{mod_name}' saved in {subfolder}"
)
else:
self.logtxt(
f"Duplicate structure from modifier '{mod_name}' encountered; skipping."
)
index += 1
self.c["nsteps"] += 1
def update_lowest_energy(
self, folder=None, unique_method="fullgraph", file_type=["OSZICAR", "CONTCAR"]
):
"""
Loops over all subdirectories in self.opt_folder.
"""
best_fe = self.c["fe"]
best_atoms = self.atoms
if not folder:
folder = self.opt_folder
opt_folder_path = os.path.join(os.getcwd(), folder)
if not os.path.isdir(opt_folder_path):
self.logtxt(f"Optimization folder '{folder}' does not exist.")
return
# Walk through all directories recursively
for root, _, files in os.walk(opt_folder_path):
if file_type[0] in files and file_type[1] in files:
contcar_path = os.path.join(root, file_type[1])
en_path = os.path.join(root, file_type[0])
if file_type[0] == "OSZICAR":
en = extract_lowest_energy_from_oszicar(en_path)
elif file_type[0] == "out.log":
en = extract_lowest_energy_from_outlog(en_path)
else:
en = None
raise RuntimeError(
"The energy should be output in out.log or OSZICAR"
)
if en is not None:
atoms = read(contcar_path, format="vasp")
fn = en - self.get_ref_potential(atoms)
if self.c["vib_correction"]:
fn += self.get_vib_correction(atoms)
if fn < best_fe:
best_fe = fn
best_atoms = atoms
self.logtxt(f"Accepted; F(new)={fn:.2f} at {root}")
self.append_graph(atoms, unique_method=unique_method)
self.atoms = best_atoms
self.c["fe"] = best_fe