Source code for ase.optimize.optimize

"""Structure optimization. """
import time
import warnings
from collections.abc import Callable
from functools import cached_property
from math import sqrt
from os.path import isfile
from typing import IO, Any, Dict, List, Optional, Tuple, Union

from ase import Atoms
from ase.calculators.calculator import PropertyNotImplementedError
from ase.filters import UnitCellFilter
from ase.parallel import world
from ase.utils import IOContext
from ase.utils.abc import Optimizable

DEFAULT_MAX_STEPS = 100_000_000


class RestartError(RuntimeError):
    pass


class OptimizableAtoms(Optimizable):
    def __init__(self, atoms):
        self.atoms = atoms

    def get_positions(self):
        return self.atoms.get_positions()

    def set_positions(self, positions):
        self.atoms.set_positions(positions)

    def get_forces(self):
        return self.atoms.get_forces()

    @cached_property
    def _use_force_consistent_energy(self):
        # This boolean is in principle invalidated if the
        # calculator changes.  This can lead to weird things
        # in multi-step optimizations.
        try:
            self.atoms.get_potential_energy(force_consistent=True)
        except PropertyNotImplementedError:
            # warnings.warn(
            #     'Could not get force consistent energy (\'free_energy\').  '
            #     'Please make sure calculator provides \'free_energy\', even '
            #     'if equal to the ordinary energy.  '
            #     'This will raise an error in future versions of ASE.',
            #     FutureWarning)
            return False
        else:
            return True

    def get_potential_energy(self):
        force_consistent = self._use_force_consistent_energy
        return self.atoms.get_potential_energy(
            force_consistent=force_consistent)

    def iterimages(self):
        # XXX document purpose of iterimages
        return self.atoms.iterimages()

    def __len__(self):
        # TODO: return 3 * len(self.atoms), because we want the length
        # of this to be the number of DOFs
        return len(self.atoms)


class Dynamics(IOContext):
    """Base-class for all MD and structure optimization classes."""

    def __init__(
        self,
        atoms: Atoms,
        logfile: Optional[Union[IO, str]] = None,
        trajectory: Optional[str] = None,
        append_trajectory: bool = False,
        master: Optional[bool] = None,
        comm=world,
        *,
        loginterval: int = 1,
    ):
        """Dynamics object.

        Parameters
        ----------
        atoms : Atoms object
            The Atoms object to operate on.

        logfile : file object or str
            If *logfile* is a string, a file with that name will be opened.
            Use '-' for stdout.

        trajectory : Trajectory object or str
            Attach trajectory object.  If *trajectory* is a string a
            Trajectory will be constructed.  Use *None* for no
            trajectory.

        append_trajectory : bool
            Defaults to False, which causes the trajectory file to be
            overwriten each time the dynamics is restarted from scratch.
            If True, the new structures are appended to the trajectory
            file instead.

        master : bool
            Defaults to None, which causes only rank 0 to save files. If set to
            true, this rank will save files.

        comm : Communicator object
            Communicator to handle parallel file reading and writing.

        loginterval : int, default: 1
            Only write a log line for every *loginterval* time steps.
        """
        self.atoms = atoms
        self.optimizable = atoms.__ase_optimizable__()
        self.logfile = self.openfile(file=logfile, comm=comm, mode='a')
        self.observers: List[Tuple[Callable, int, Tuple, Dict[str, Any]]] = []
        self.nsteps = 0
        self.max_steps = 0  # to be updated in run or irun
        self.comm = comm

        if trajectory is not None:
            if isinstance(trajectory, str):
                from ase.io.trajectory import Trajectory
                mode = "a" if append_trajectory else "w"
                trajectory = self.closelater(Trajectory(
                    trajectory, mode=mode, master=master, comm=comm
                ))
            self.attach(
                trajectory,
                interval=loginterval,
                atoms=self.optimizable,
            )

        self.trajectory = trajectory

    def todict(self) -> Dict[str, Any]:
        raise NotImplementedError

    def get_number_of_steps(self):
        return self.nsteps

    def insert_observer(
        self, function, position=0, interval=1, *args, **kwargs
    ):
        """Insert an observer.

        This can be used for pre-processing before logging and dumping.

        Examples
        --------
        >>> from ase.build import bulk
        >>> from ase.calculators.emt import EMT
        >>> from ase.optimize import BFGS
        ...
        ...
        >>> def update_info(atoms, opt):
        ...     atoms.info["nsteps"] = opt.nsteps
        ...
        ...
        >>> atoms = bulk("Cu", cubic=True) * 2
        >>> atoms.rattle()
        >>> atoms.calc = EMT()
        >>> with BFGS(atoms, logfile=None, trajectory="opt.traj") as opt:
        ...     opt.insert_observer(update_info, atoms=atoms, opt=opt)
        ...     opt.run(fmax=0.05, steps=10)
        True
        """
        if not isinstance(function, Callable):
            function = function.write
        self.observers.insert(position, (function, interval, args, kwargs))

    def attach(self, function, interval=1, *args, **kwargs):
        """Attach callback function.

        If *interval > 0*, at every *interval* steps, call *function* with
        arguments *args* and keyword arguments *kwargs*.

        If *interval <= 0*, after step *interval*, call *function* with
        arguments *args* and keyword arguments *kwargs*.  This is
        currently zero indexed."""

        if hasattr(function, "set_description"):
            d = self.todict()
            d.update(interval=interval)
            function.set_description(d)
        if not isinstance(function, Callable):
            function = function.write
        self.observers.append((function, interval, args, kwargs))

    def call_observers(self):
        for function, interval, args, kwargs in self.observers:
            call = False
            # Call every interval iterations
            if interval > 0:
                if (self.nsteps % interval) == 0:
                    call = True
            # Call only on iteration interval
            elif interval <= 0:
                if self.nsteps == abs(interval):
                    call = True
            if call:
                function(*args, **kwargs)

    def irun(self, steps=DEFAULT_MAX_STEPS):
        """Run dynamics algorithm as generator.

        Parameters
        ----------
        steps : int, default=DEFAULT_MAX_STEPS
            Number of dynamics steps to be run.

        Yields
        ------
        converged : bool
            True if the forces on atoms are converged.

        Examples
        --------
        This method allows, e.g., to run two optimizers or MD thermostats at
        the same time.
        >>> opt1 = BFGS(atoms)
        >>> opt2 = BFGS(StrainFilter(atoms)).irun()
        >>> for _ in opt2:
        ...     opt1.run()
        """

        # update the maximum number of steps
        self.max_steps = self.nsteps + steps

        # compute the initial step
        self.optimizable.get_forces()

        # log the initial step
        if self.nsteps == 0:
            self.log()

            # we write a trajectory file if it is None
            if self.trajectory is None:
                self.call_observers()
            # We do not write on restart w/ an existing trajectory file
            # present. This duplicates the same entry twice
            elif len(self.trajectory) == 0:
                self.call_observers()

        # check convergence
        is_converged = self.converged()
        yield is_converged

        # run the algorithm until converged or max_steps reached
        while not is_converged and self.nsteps < self.max_steps:
            # compute the next step
            self.step()
            self.nsteps += 1

            # log the step
            self.log()
            self.call_observers()

            # check convergence
            is_converged = self.converged()
            yield is_converged

    def run(self, steps=DEFAULT_MAX_STEPS):
        """Run dynamics algorithm.

        This method will return when the forces on all individual
        atoms are less than *fmax* or when the number of steps exceeds
        *steps*.

        Parameters
        ----------
        steps : int, default=DEFAULT_MAX_STEPS
            Number of dynamics steps to be run.

        Returns
        -------
        converged : bool
            True if the forces on atoms are converged.
        """

        for converged in Dynamics.irun(self, steps=steps):
            pass
        return converged

    def converged(self):
        """" a dummy function as placeholder for a real criterion, e.g. in
        Optimizer """
        return False

    def log(self, *args):
        """ a dummy function as placeholder for a real logger, e.g. in
        Optimizer """
        return True

    def step(self):
        """this needs to be implemented by subclasses"""
        raise RuntimeError("step not implemented.")


[docs] class Optimizer(Dynamics): """Base-class for all structure optimization classes.""" # default maxstep for all optimizers defaults = {'maxstep': 0.2} _deprecated = object() def __init__( self, atoms: Atoms, restart: Optional[str] = None, logfile: Optional[Union[IO, str]] = None, trajectory: Optional[str] = None, append_trajectory: bool = False, **kwargs, ): """ Parameters ---------- atoms: :class:`~ase.Atoms` The Atoms object to relax. restart: str Filename for restart file. Default value is *None*. logfile: file object or str If *logfile* is a string, a file with that name will be opened. Use '-' for stdout. trajectory: Trajectory object or str Attach trajectory object. If *trajectory* is a string a Trajectory will be constructed. Use *None* for no trajectory. append_trajectory: bool Appended to the trajectory file instead of overwriting it. kwargs : dict, optional Extra arguments passed to :class:`~ase.optimize.optimize.Dynamics`. """ super().__init__( atoms=atoms, logfile=logfile, trajectory=trajectory, append_trajectory=append_trajectory, **kwargs, ) self.restart = restart self.fmax = None if restart is None or not isfile(restart): self.initialize() else: self.read() self.comm.barrier() def read(self): raise NotImplementedError def todict(self): description = { "type": "optimization", "optimizer": self.__class__.__name__, } # add custom attributes from subclasses for attr in ('maxstep', 'alpha', 'max_steps', 'restart', 'fmax'): if hasattr(self, attr): description.update({attr: getattr(self, attr)}) return description def initialize(self): pass def irun(self, fmax=0.05, steps=DEFAULT_MAX_STEPS): """Run optimizer as generator. Parameters ---------- fmax : float Convergence criterion of the forces on atoms. steps : int, default=DEFAULT_MAX_STEPS Number of optimizer steps to be run. Yields ------ converged : bool True if the forces on atoms are converged. """ self.fmax = fmax return Dynamics.irun(self, steps=steps) def run(self, fmax=0.05, steps=DEFAULT_MAX_STEPS): """Run optimizer. Parameters ---------- fmax : float Convergence criterion of the forces on atoms. steps : int, default=DEFAULT_MAX_STEPS Number of optimizer steps to be run. Returns ------- converged : bool True if the forces on atoms are converged. """ self.fmax = fmax return Dynamics.run(self, steps=steps) def converged(self, forces=None): """Did the optimization converge?""" if forces is None: forces = self.optimizable.get_forces() return self.optimizable.converged(forces, self.fmax) def log(self, forces=None): if forces is None: forces = self.optimizable.get_forces() fmax = sqrt((forces ** 2).sum(axis=1).max()) e = self.optimizable.get_potential_energy() T = time.localtime() if self.logfile is not None: name = self.__class__.__name__ if self.nsteps == 0: args = (" " * len(name), "Step", "Time", "Energy", "fmax") msg = "%s %4s %8s %15s %12s\n" % args self.logfile.write(msg) args = (name, self.nsteps, T[3], T[4], T[5], e, fmax) msg = "%s: %3d %02d:%02d:%02d %15.6f %15.6f\n" % args self.logfile.write(msg) self.logfile.flush() def dump(self, data): from ase.io.jsonio import write_json if self.comm.rank == 0 and self.restart is not None: with open(self.restart, 'w') as fd: write_json(fd, data) def load(self): from ase.io.jsonio import read_json with open(self.restart) as fd: try: from ase.optimize import BFGS if not isinstance(self, BFGS) and isinstance( self.atoms, UnitCellFilter ): warnings.warn( "WARNING: restart function is untested and may result " "in unintended behavior. Namely orig_cell is not " "loaded in the UnitCellFilter. Please test on your own" " to ensure consistent results." ) return read_json(fd, always_array=False) except Exception as ex: msg = ('Could not decode restart file as JSON. ' 'You may need to delete the restart file ' f'{self.restart}') raise RestartError(msg) from ex