Source code for skpar.core.evaluate

"""Evaluator engine of SKPAR."""
import os
import shutil
import numpy as np
from skpar.core.utils import get_logger, normalise
from skpar.core.tasks import initialise_tasks
from skpar.core.database import Database

LOGGER = get_logger(__name__)

DEFAULT_GLOBAL_COST_FUNC = "rms"

DEFAULT_CONFIG = {
    "workroot": None,
    "templatedir": None,
    "keepworkdirs": True,
}


[docs]def abserr(ref, model): """Return the per-element difference model and reference.""" aref = np.asarray(ref) amod = np.asarray(model) assert aref.shape == amod.shape, (aref.shape, amod.shape) return amod - aref
[docs]def relerr(ref, model): """Return the per-element relative difference between model and reference. To handle cases where `ref` vanish, and possibly `model` vanish at the same time, we: * translate directly a vanishing absolute error into vanishing relative error (where both `ref` and `model` vanish. * take the model as a denominator, thus yielding 1, where `ref` vanishes but `model` is non zero """ aref = np.asarray(ref) amod = np.asarray(model) # get deviations err = abserr(aref, amod) # fix the denominator denom = aref.copy() denom[aref == 0.0] = amod[aref == 0.0] # assert 0 absolute error even for 0 denominator rel_err = np.zeros(err.shape) rel_err[err != 0] = err[err != 0] / denom[err != 0] return rel_err
[docs]def cost_rms(ref, model, weights, errf=abserr): """Return the weighted-RMS deviation""" assert np.asarray(ref).shape == np.asarray(model).shape assert np.asarray(ref).shape == np.asarray(weights).shape err2 = errf(ref, model) ** 2 rms = np.sqrt(np.sum(weights * err2)) return rms
[docs]def eval_objectives(objectives, database): """Evaluate fitness/cost""" fitness = np.array([objv(database) for objv in objectives]) return fitness
# ---------------------------------------------------------------------- # Function mappers # ---------------------------------------------------------------------- # use small letters for the function names; make sure # the input parser coerces any capitalisation in advance # cost functions COSTF = { "rms": cost_rms, } # error types ERRF = { "abs": abserr, "rel": relerr, "abserr": abserr, "relerr": relerr, } # ----------------------------------------------------------------------
[docs]class Evaluator: """**Evaluator** The evaluator is the only thing visible to the optimiser. It has several things to do: 1. Accept a list of parameter values (or key-value pairs), and an iteration number (or a tuple: (e.g. generation,particle_index)). 2. Update tasks (and underlying models) with new parameter values. 3. Execute the tasks to obtain model data with the new parameters. 4. Evaluate fitness for individual objectives. 5. Evaluate global fitness (cost) and return the value. It may be good to also return the max error, to be used as a stopping criterion. """ def __init__( self, objectives, tasklist, taskdict, parameternames, config=None, costf=COSTF[DEFAULT_GLOBAL_COST_FUNC], utopia=None, verbose=False, ): self.objectives = objectives self.weights = normalise([oo.weight for oo in objectives]) self.tasklist = tasklist # list of name,options pairs self.taskdict = taskdict # name:function mapping self.parnames = parameternames self.config = config if config is not None else DEFAULT_CONFIG self.costf = costf if utopia is None: self.utopia = np.zeros(len(objectives)) else: assert len(utopia) == len(objectives), (len(utopia), len(objectives)) self.utopia = utopia # configure logger self.logger = LOGGER if verbose: self._msg = self.logger.info else: self._msg = self.logger.debug # report objectives; these do not change over time for item in objectives: self._msg(item)
[docs] def evaluate(self, parametervalues, iteration=None): """Evaluate the global fitness of a given point in parameter space. This is the only object accessible to the optimiser, therefore only two arguments can be passed. Args: parametervalues (list): current point in design/parameter space iteration: (int or tupple): current iteration or current generation and individual index within generation Return: fitness (float): global fitness of the current design point """ # Create individual working directory for each evaluation origdir = os.getcwd() workroot = self.config["workroot"] if workroot is None: workdir = origdir else: workdir = get_workdir(iteration, workroot) create_workdir(workdir, self.config["templatedir"]) # Initialise model database self.logger.info("Initialising ModelDataBase.") # database may be something different than a dictionary, but # whatever object it is, should have: # update_modeldb(modelname, datadict) -- update modeldb # get_modeldb(modelname) -- return a ref to modeldb # The point is that for every individual evaluation, we must # have individual model DB, so evaluations can be done in parallel. database = Database() # database = {} currently works too # Wrap the environment in a single dict env = { "workroot": workdir, "logger": self.logger, "parameternames": self.parnames, "parametervalues": parametervalues, "iteration": iteration, "taskdict": self.taskdict, "objectives": self.objectives, } # Initialise and then execute the tasks tasks = initialise_tasks(self.tasklist, self.taskdict, report=False) self.logger.info("Iteration %s", iteration) self.logger.info("===========================") if self.parnames: parstr = [ "{:s}({:.4g})".format(name, val) for name, val in zip(self.parnames, parametervalues) ] self.logger.info("Parameters: {:s}".format(" ".join(parstr))) # do we really need to pass workdir and to os.chdir??? # move the for loop to a function. # execute_tasks(tasks, env, database, workdir, logger) for i, task in enumerate(tasks): os.chdir(workdir) try: task(env, database) except: self.logger.critical("Task %i FAILED:\n%s", i, task) raise # Evaluate individual fitness for each objective objvfitness = eval_objectives(self.objectives, database) # Evaluate global fitness cost = self.costf(self.utopia, objvfitness, self.weights) self._msg("{:<15s}: {}\n".format("Overall cost", cost)) # Remove iteration-specific working dir if not needed: if (not self.config["keepworkdirs"]) and (workroot is not None): destroy_workdir(workdir) os.chdir(origdir) return np.atleast_1d(cost)
def __call__(self, parametervalues, iteration=None): return self.evaluate(parametervalues, iteration) def __repr__(self): srepr = [] srepr.append("Evaluator:") srepr.append("\n-- Tasks:") srepr.append("--------------------") # for item in self.tasks: # srepr.append(item.__repr__()) srepr.append("\n-- Objectives:") srepr.append("--------------------") for item in self.objectives: srepr.append(item.__repr__()) srepr.append("\n-- Cost Func.:") srepr.append("--------------------") srepr.append(self.costf.__name__) return "\n".join(srepr)
[docs]def get_workdir(iteration, workroot): """Find what is the root of the work-tree at a given iteration""" if workroot is None: workdir = None else: if iteration is None: myworkdir = "noiter" else: try: myworkdir = "-".join([str(it) for it in iteration]) except TypeError: myworkdir = str(iteration) workdir = os.path.abspath(os.path.join(workroot, myworkdir)) return workdir
[docs]def create_workdir(workdir, templatedir): """Create a new and clean work directory tree from template""" if workdir is None: return if os.path.exists(workdir): shutil.rmtree(workdir) if templatedir is not None: shutil.copytree(templatedir, workdir, symlinks=True) else: os.mkdir(workdir)
[docs]def destroy_workdir(workdir): """Remove the entire work directory tree""" if workdir is not None: shutil.rmtree(workdir)