# Source code for krotov.optimize

```
import copy
import inspect
import logging
import time
from functools import partial
import numpy as np
from qutip import Qobj
from qutip.parallel import serial_map
from .conversions import (
control_onto_interval,
discretize,
extract_controls,
extract_controls_mapping,
plug_in_pulse_values,
pulse_onto_tlist,
pulse_options_dict_to_list,
)
from .info_hooks import chain
from .mu import derivative_wrt_pulse
from .propagators import Propagator, expm
from .result import Result
from .second_order import _overlap
from .shapes import one_shape, zero_shape
__all__ = ['optimize_pulses']
[docs]def optimize_pulses(
objectives,
pulse_options,
tlist,
*,
propagator,
chi_constructor,
mu=None,
sigma=None,
iter_start=0,
iter_stop=5000,
check_convergence=None,
info_hook=None,
modify_params_after_iter=None,
storage='array',
parallel_map=None,
store_all_pulses=False,
continue_from=None,
skip_initial_forward_propagation=False,
norm=None,
overlap=None
):
r"""Use Krotov's method to optimize towards the given `objectives`.
Optimize all time-dependent controls found in the Hamiltonians or
Liouvillians of the given `objectives`.
Args:
objectives (list[Objective]): List of objectives
pulse_options (dict): Mapping of time-dependent controls found in the
Hamiltonians of the objectives to a dictionary of options for that
control. There must be options given for *every* control. As numpy
arrays are unhashable and thus cannot be used as dict keys, the
options for a control that is an array must be set using the key
``id(control)`` (see the example below). The options of any
particular control *must* contain the following keys:
* ``'lambda_a'``: the Krotov step size (float value). This governs
the overall magnitude of the pulse update. Large values result in
small updates. Small values may lead to sharp spikes and
numerical instability.
* ``'update_shape'`` : Function S(t) in the range [0, 1] that
scales the pulse update for the pulse value at t. This can be
used to ensure boundary conditions (S(0) = S(T) = 0), and enforce
smooth switch-on and switch-off. This can be a callable that
takes a single argument `t`; or the values 1 or 0 for a constant
update-shape. The value 0 disables the optimization of that
particular control.
In addition, the following keys *may* occur:
* ``'args'``: If the control is a callable with arguments
``(t, args)`` (as required by QuTiP), a dict of argument values
to pass as `args`. If ``'args'`` is not specified via the
`pulse_options`, controls will be discretized using the default
``args=None``.
For example, for `objectives` that contain a Hamiltonian of the
form ``[H0, [H1, u], [H2, g]]``, where ``H0``, ``H1``, and ``H2``
are :class:`~qutip.Qobj` instances, ``u`` is a numpy array
.. doctest::
>>> u = numpy.zeros(1000)
and ``g`` is a control function
.. doctest::
>>> def g(t, args):
... E0 = args.get('E0', 0.0)
... return E0
then a possible value for `pulse_options` would look like this:
.. doctest::
>>> from krotov.shapes import flattop
>>> from functools import partial
>>> pulse_options = {
... id(u): {'lambda_a': 1.0, 'update_shape': 1},
... g: dict(
... lambda_a=1.0,
... update_shape=partial(
... flattop, t_start=0, t_stop=10, t_rise=1.5
... ),
... args=dict(E0=1.0)
... )
... }
The use of :class:`dict` and the ``{...}`` syntax are completely
equivalent, but :class:`dict` is better for nested indentation.
tlist (numpy.ndarray): Array of time grid values, cf.
:func:`~qutip.mesolve.mesolve`
propagator (callable or list[callable]): Function that propagates the
state backward or forwards in time by a single time step, between
two points in `tlist`. Alternatively, a list of functions, one for
each objective. If the propagator is stateful, it should be an
instance of :class:`krotov.propagators.Propagator`. See
:mod:`krotov.propagators` for details.
chi_constructor (callable): Function that calculates the boundary
condition for the backward propagation. This is where the
final-time functional (indirectly) enters the optimization.
See :mod:`krotov.functionals` for details.
mu (None or callable): Function that calculates the derivative
$\frac{\partial H}{\partial\epsilon}$ for an equation of motion
$\dot{\phi}(t) = -i H[\phi(t)]$ of an abstract operator $H$ and an
abstract state $\phi$. If None, defaults to
:func:`krotov.mu.derivative_wrt_pulse`, which covers the standard
Schrödinger and master equations. See :mod:`krotov.mu` for a
full explanation of the role of `mu` in the optimization, and the
required function signature.
sigma (None or krotov.second_order.Sigma): Function (instance of a
:class:`.Sigma` subclass) that calculates the
second-order contribution. If None, the first-order Krotov method
is used.
iter_start (int): The formal iteration number at which to start the
optimization
iter_stop (int): The iteration number after which to end the
optimization, whether or not convergence has been reached
check_convergence (None or callable): Function that determines whether
the optimization has converged. If None, the optimization will only
end when `iter_stop` is reached. See :mod:`krotov.convergence` for
details.
info_hook (None or callable): Function that is called after each
iteration of the optimization, for the purpose of analysis. Any
value returned by `info_hook` (e.g. an evaluated functional
:math:`J_T`) will be stored, for each iteration, in the `info_vals`
attribute of the returned :class:`.Result`. The `info_hook` must
have the same signature as
:func:`krotov.info_hooks.print_debug_information`. It should not
modify its arguments in any way, except for `shared_data`.
modify_params_after_iter (None or callable): Function that is called
after each iteration, which may modify its arguments for certain
advanced use cases, such as dynamically adjusting `lambda_vals`, or
applying spectral filters to the `optimized_pulses`. It has the
same interface as `info_hook` but should not return anything. The
`modify_params_after_iter` function is called immediately before
`info_hook`, and can transfer arbitrary data to any subsequent
`info_hook` via the `shared_data` argument.
storage (callable): Storage constructor for the storage of
propagated states. Must accept an integer parameter `N` and return
an empty array-like container of length `N`. The default value
'array' is equivalent to
``functools.partial(numpy.empty, dtype=object)``.
parallel_map (callable or tuple or None): Parallel function evaluator.
If given as a callable, the argument must have the same
specification as :func:`qutip.parallel.serial_map`.
A value of None is the same as passing
:func:`qutip.parallel.serial_map`. If given as a tuple, that tuple
must contain three callables, each of which has the same
specification as :func:`qutip.parallel.serial_map`. These three
callables are used to parallelize (1) the initial
forward-propagation, (2) the backward-propagation under the guess
pulses, and (3) the forward-propagation by a single time step under
the optimized pulses. See :mod:`krotov.parallelization` for
details.
store_all_pulses (bool): Whether or not to store the optimized pulses
from *all* iterations in :class:`.Result`.
continue_from (None or Result): If given, continue an optimization from
a previous :class:`.Result`. The result must have identical
`objectives`.
skip_initial_forward_propagation (bool): If given as `True` together
with `continue_from`, skip the initial forward propagation ("zeroth
iteration"), and take the forward-propagated states from
:attr:`.Result.states` instead.
norm (callable or None): A single-argument function to calculate the
norm of states. If None, delegate to the :meth:`~qutip.Qobj.norm`
method of the states.
overlap (callable or None): A two-argument function to calculate the
complex overlap of two states. If None, delegate to
:meth:`qutip.Qobj.overlap` for Hilbert space states and to the
Hilbert-Schmidt norm $\tr[\rho_1^\dagger \rho2]$ for density
matrices or operators.
Returns:
Result: The result of the optimization.
Raises:
ValueError: If any controls are not real-valued, or if any update
shape is not a real-valued function in the range [0, 1]; if using
`continue_from` with a :class:`.Result` with differing
`objectives`; if there are any required keys missing in
`pulse_options`.
"""
logger = logging.getLogger('krotov')
# Initialization
logger.info("Initializing optimization with Krotov's method")
if mu is None:
mu = derivative_wrt_pulse
second_order = sigma is not None
if norm is None:
norm = lambda state: state.norm()
if overlap is None:
overlap = _overlap
if modify_params_after_iter is not None:
# From a technical perspective, the `modify_params_after_iter` is
# really just another info_hook, the only difference is the
# convention that info_hooks shouldn't modify the parameters.
if info_hook is None:
info_hook = modify_params_after_iter
else:
info_hook = chain(modify_params_after_iter, info_hook)
if isinstance(propagator, list):
propagators = propagator
assert len(propagators) == len(objectives)
else:
propagators = [copy.deepcopy(propagator) for _ in objectives]
# copy.deepcopy will only do something on Propagator objects. For
# functions (even with closures), it just returns the same function.
_check_propagators_interface(propagators, logger)
adjoint_objectives = [obj.adjoint() for obj in objectives]
if storage == 'array':
storage = partial(np.empty, dtype=object)
if parallel_map is None:
parallel_map = serial_map
if not isinstance(parallel_map, (tuple, list)):
parallel_map = (parallel_map, parallel_map, parallel_map)
(
guess_controls, # "controls": sampled on the time grid
guess_pulses, # "pulses": sampled on the time grid intervals
pulses_mapping, # keep track of where to plug in pulse values
lambda_vals, # Krotov step width λₐ, for each control
shape_arrays, # update shape S(t), per control, sampled on intervals
) = _initialize_krotov_controls(objectives, pulse_options, tlist)
if continue_from is not None:
guess_controls, guess_pulses = _restore_from_previous_result(
continue_from, objectives, tlist, store_all_pulses
)
g_a_integrals = np.zeros(len(guess_pulses))
# ∫gₐ(t)dt is a very useful measure of whether λₐ is too small (large
# ∫gₐ(t)dt, relative to the pulse amplitude), and whether we're approaching
# convergence ("speeding up" for increasing values, "slowing down" for
# decreasing values)
if continue_from is None:
result = Result()
result.start_local_time = time.localtime()
else:
result = copy.deepcopy(continue_from)
# Initial forward-propagation
tic = time.time()
if skip_initial_forward_propagation:
forward_states = _skip_initial_forward_propagation(
objectives, continue_from, sigma, logger
)
else:
forward_states = parallel_map[0](
_forward_propagation,
list(range(len(objectives))),
(
objectives,
guess_pulses,
pulses_mapping,
tlist,
propagators,
storage,
),
)
toc = time.time()
fw_states_T = [states[-1] for states in forward_states]
tau_vals = np.array(
[
overlap(obj.target, state_T)
for (state_T, obj) in zip(fw_states_T, objectives)
]
)
if second_order:
forward_states0 = forward_states # ∀t: Δϕ=0, for iteration 0
else:
# the forward-propagated states only need to be stored for the second
# order update
forward_states0 = forward_states = None
info = None
optimized_pulses = copy.deepcopy(guess_pulses)
info_hook_static_args = dict(
# these do no change between iterations (although
# `modify_params_after_iter` may modify any of these, to
# the extent that they're mutable)
objectives=objectives,
adjoint_objectives=adjoint_objectives,
lambda_vals=lambda_vals,
shape_arrays=shape_arrays,
tlist=tlist,
propagator=propagator,
chi_constructor=chi_constructor,
mu=mu,
sigma=sigma,
iter_start=iter_start,
iter_stop=iter_stop,
)
if info_hook is not None:
info = info_hook(
backward_states=None,
forward_states=forward_states,
forward_states0=forward_states0,
guess_pulses=guess_pulses,
optimized_pulses=optimized_pulses,
g_a_integrals=g_a_integrals,
fw_states_T=fw_states_T,
tau_vals=tau_vals,
start_time=tic,
stop_time=toc,
iteration=0,
info_vals=[],
shared_data={},
**info_hook_static_args
)
# Initialize Result object
result.tlist = tlist
result.objectives = objectives
result.guess_controls = guess_controls
result.optimized_controls = optimized_pulses
result.controls_mapping = pulses_mapping
if continue_from is None:
# we only store information about the "0" iteration if we're starting a
# new optimization
if info is not None:
result.info_vals.append(info)
result.iters.append(0)
result.iter_seconds.append(int(toc - tic))
if not np.all(tau_vals == None): # noqa
result.tau_vals.append(tau_vals)
if store_all_pulses:
result.all_pulses.append(guess_pulses)
else:
iter_start = continue_from.iters[-1]
logger.info(
"Continuing from previous result, with iteration %d",
iter_start + 1,
)
result.states = fw_states_T
# Main optimization loop
for krotov_iteration in range(iter_start + 1, iter_stop + 1):
logger.info("Started Krotov iteration %d", krotov_iteration)
tic = time.time()
# Boundary condition for the backward propagation
# -- this is where the functional enters the optimization.
# `fw_states_T` are the states forward-propagated under the guess pulse
# of the current iteration, which is the optimized pulse of the
# previous iteration. This is how we get the `fw_states_T` here: they
# are left over from the forward-propagation in the previous iteration.
chi_states = chi_constructor(
fw_states_T=fw_states_T, objectives=objectives, tau_vals=tau_vals
)
chi_norms = [norm(chi) for chi in chi_states]
# normalizing χ improves numerical stability; the norm then has to be
# taken into account when calculating Δϵ
chi_states = [chi / nrm for (chi, nrm) in zip(chi_states, chi_norms)]
# Backward propagation
backward_states = parallel_map[1](
_backward_propagation,
list(range(len(objectives))),
(
chi_states,
adjoint_objectives,
guess_pulses,
pulses_mapping,
tlist,
propagators,
storage,
),
)
# Forward propagation and pulse update
logger.info("Started forward propagation/pulse update")
if second_order:
forward_states = [
storage(len(tlist)) for _ in range(len(objectives))
]
g_a_integrals[:] = 0.0
if second_order:
# In the update for the pulses in the first time interval, we use
# the states at t=0. Hence, Δϕ(t=0) = 0
delta_phis = [
Qobj(np.zeros(shape=chi_states[k].shape))
for k in range(len(objectives))
]
if second_order:
for i_obj in range(len(objectives)):
forward_states[i_obj][0] = objectives[i_obj].initial_state
delta_eps = [
np.zeros(len(tlist) - 1, dtype=np.complex128) for _ in guess_pulses
]
optimized_pulses = copy.deepcopy(guess_pulses)
fw_states = [obj.initial_state for obj in objectives]
for time_index in range(len(tlist) - 1): # iterate over time intervals
dt = tlist[time_index + 1] - tlist[time_index]
if second_order:
σ = sigma(tlist[time_index] + 0.5 * dt)
# pulse update
for (i_pulse, guess_pulse) in enumerate(guess_pulses):
for (i_obj, objective) in enumerate(objectives):
χ = backward_states[i_obj][time_index]
μ = mu(
objectives,
i_obj,
guess_pulses,
pulses_mapping,
i_pulse,
time_index,
)
Ψ = fw_states[i_obj]
update = overlap(χ, μ(Ψ)) # ⟨χ|μ|Ψ⟩ ∈ ℂ
update *= chi_norms[i_obj]
if second_order:
update += 0.5 * σ * overlap(delta_phis[i_obj], μ(Ψ))
delta_eps[i_pulse][time_index] += update
λₐ = lambda_vals[i_pulse]
S_t = shape_arrays[i_pulse][time_index]
Δϵ = (S_t / λₐ) * delta_eps[i_pulse][time_index].imag # ∈ ℝ
g_a_integrals[i_pulse] += abs(Δϵ) ** 2 * dt # dt may vary!
optimized_pulses[i_pulse][time_index] += Δϵ
# forward propagation
fw_states = parallel_map[2](
_forward_propagation_step,
list(range(len(objectives))),
(
fw_states,
objectives,
optimized_pulses,
pulses_mapping,
tlist,
time_index,
propagators,
),
)
if second_order:
# Δϕ(t + dt), to be used for the update in the next interval
delta_phis = [
fw_states[k] - forward_states0[k][time_index + 1]
for k in range(len(objectives))
]
# storage
for i_obj in range(len(objectives)):
forward_states[i_obj][time_index + 1] = fw_states[i_obj]
logger.info("Finished forward propagation/pulse update")
fw_states_T = fw_states
tau_vals = np.array(
[
overlap(obj.target, fw_state_T)
for (fw_state_T, obj) in zip(fw_states_T, objectives)
]
)
toc = time.time()
# Display information about iteration
if info_hook is not None:
info = info_hook(
backward_states=backward_states,
forward_states=forward_states,
forward_states0=forward_states0,
fw_states_T=fw_states_T,
guess_pulses=guess_pulses,
optimized_pulses=optimized_pulses,
g_a_integrals=g_a_integrals,
tau_vals=tau_vals,
start_time=tic,
stop_time=toc,
info_vals=result.info_vals,
shared_data={},
iteration=krotov_iteration,
**info_hook_static_args
)
# Update optimization `result` with info from finished iteration
result.iters.append(krotov_iteration)
result.iter_seconds.append(int(toc - tic))
if info is not None:
result.info_vals.append(info)
if not np.all(tau_vals == None): # noqa
result.tau_vals.append(tau_vals)
result.optimized_controls = optimized_pulses
# pulses (time intervals) will be converted to controls (time grid
# points) farther below in "Finalize"
if store_all_pulses:
# we need to make a copy, so that the conversion in "Finalize"
# doesn't affect `all_pulses` as well.
result.all_pulses.append(copy.deepcopy(optimized_pulses))
result.states = fw_states_T
logger.info("Finished Krotov iteration %d", krotov_iteration)
# Convergence check
msg = None
if check_convergence is not None:
msg = check_convergence(result)
if krotov_iteration >= info_hook_static_args['iter_stop']:
# modify_params_after_iter may change iter_stop!
iter_stop = info_hook_static_args['iter_stop']
result.message = "Reached %d iterations" % iter_stop
break
if bool(msg) is True: # this is not an anti-pattern!
result.message = "Reached convergence"
if isinstance(msg, str):
result.message += ": " + msg
break
else:
# prepare for next iteration
guess_pulses = optimized_pulses
if second_order:
sigma.refresh(
forward_states=forward_states,
forward_states0=forward_states0,
chi_states=chi_states,
chi_norms=chi_norms,
optimized_pulses=optimized_pulses,
guess_pulses=guess_pulses,
objectives=objectives,
result=result,
)
forward_states0 = forward_states
else: # optimization finished without `check_convergence` break
result.message = "Reached %d iterations" % max(iter_start, iter_stop)
# Finalize
result.end_local_time = time.localtime()
for i, pulse in enumerate(optimized_pulses):
result.optimized_controls[i] = pulse_onto_tlist(pulse)
return result
def _shape_val_to_callable(val):
if val == 1:
return one_shape
elif val == 0:
return zero_shape
else:
if callable(val):
return val
else:
raise ValueError("update_shape must be a callable")
def _enforce_shape_array_range(shape_array):
"""Enforce values ∈ [0, 1] in shape array, with some room for
rounding errors that will be clipped away.
"""
assert not np.iscomplexobj(shape_array) # `discretize` should catch this
# the rounding errors can be introduced by control_onto_interval, and
# result in values slightly below 0 or above 1. We allow a generous margin
# of ±0.01; if something nonsensical is passed as a shape, we can be pretty
# sure that it will deviate by a significantly larger error.
if np.min(shape_array) < -0.01 or np.max(shape_array) > 1.01:
raise ValueError(
"Update shapes ('update_shape' in pulse options-dict) must have "
"values in the range [0, 1], not [%s, %s]"
% (np.min(shape_array), np.max(shape_array))
)
return np.clip(shape_array, a_min=0.0, a_max=1.0)
def _check_propagators_interface(propagators, logger):
"""Warn if any of the propagators do not have the expected interface.
In order to pass muster, each propagator must either have the same
interface as :func:`krotov.propagators.expm`, or
:meth:`krotov.propagators.Propagator.__call__`
"""
for propagator in propagators:
spec = inspect.getfullargspec(propagator)
spec_func = inspect.getfullargspec(expm)
spec_obj = inspect.getfullargspec(Propagator.__call__)
if spec != spec_func and spec != spec_obj:
logger.warning(
"The propagator %s does not have the expected interface.",
propagator,
)
def _initialize_krotov_controls(objectives, pulse_options, tlist):
"""Extract discretized guess controls and pulses from `objectives`, and
return them with the associated mapping and option data"""
guess_controls = extract_controls(objectives)
pulses_mapping = extract_controls_mapping(objectives, guess_controls)
options_list = pulse_options_dict_to_list(pulse_options, guess_controls)
try:
guess_controls = [
discretize(
control,
tlist,
args=(pulse_options[control].get('args', None),),
)
for control in guess_controls
]
except (TypeError, np.ComplexWarning) as exc_info:
raise ValueError(
"Cannot discretize controls: %s. Note that "
"all controls must be real-valued. Complex controls must be "
"split into an independent real and imaginary part in the "
"objectives before passing them to the optimization" % exc_info
)
guess_pulses = [ # defined on the tlist intervals
control_onto_interval(control) for control in guess_controls
]
try:
lambda_vals = np.array(
[float(options['lambda_a']) for options in options_list]
)
except KeyError:
raise ValueError(
"Each value in pulse_options must be a dict that contains "
"the key 'lambda_a'."
)
shape_arrays = []
for options in options_list:
try:
S = discretize(
_shape_val_to_callable(options['update_shape']), tlist, args=()
)
except KeyError:
raise ValueError(
"Each value in pulse_options must be a dict that contains "
"the key 'update_shape'."
)
except (TypeError, np.ComplexWarning) as exc_info:
raise ValueError(
"Update shapes ('update_shape' in pulse options-dict) must be "
"real-valued: %s" % exc_info
)
shape_arrays.append(
_enforce_shape_array_range(control_onto_interval(S))
)
return (
guess_controls,
guess_pulses,
pulses_mapping,
lambda_vals,
shape_arrays,
)
def _restore_from_previous_result(result, objectives, tlist, store_all_pulses):
"""Load `guess_controls` and `guess_pulses` from the given Result object.
Raises:
ValueError: if `result` is incompatible with the given `objectives` and
`tlist`.
"""
if not isinstance(result, Result):
raise ValueError("Continuation is only possible from a Result object")
if len(objectives) != len(result.objectives):
raise ValueError(
"When continuing from a previous Result, the number of "
"objectives must be the same"
)
for (a, b) in zip(objectives, result.objectives):
if a != b:
raise ValueError(
"When continuing from a previous Result, the objectives must "
"remain unchanged"
)
if store_all_pulses:
if len(result.all_pulses) == 0:
raise ValueError(
"The store_all_pulses parameter cannot be changed when "
"continuing from a previous Result. Pass it as False."
)
else:
if len(result.all_pulses) > 0:
raise ValueError(
"The store_all_pulses parameter cannot be changed when "
"continuing from a previous Result. Pass it as True."
)
try:
if np.max(np.abs(np.array(tlist) - np.array(result.tlist))) > 1e-5:
# we're ok with pretty significant rounding errors: if someone is
# doing something stupid (like changing physical units), the error
# should be large
raise ValueError("tlist does not match")
except ValueError:
# if the size of the tlist has changed, the "if" statement will already
# raise a ValueError
raise ValueError(
"When continuing from a previous Result, the controls must be "
"defined on the same time grid"
)
nt = len(tlist)
guess_controls = []
for control in result.optimized_controls:
if len(control) == nt - 1:
# the Result was dumped before the optimization was complete
# (see krotov.konvergence.dump_result), so that the
# `optimized_controls` are actually pulses (defined on the
# intervals)
guess_controls.append(pulse_onto_tlist(control))
elif len(control) == nt:
# the Result was dumped after the optimization was complete, so
# that the `optmized_controls` are in fact controls (defined on the
# points of tlist)
guess_controls.append(control)
else:
# this should never happen
raise ValueError(
"Invalid Result: optimized_controls and tlist are incongruent"
)
guess_pulses = [ # defined on the tlist intervals
control_onto_interval(control) for control in guess_controls
]
return guess_controls, guess_pulses
def _skip_initial_forward_propagation(objectives, result, sigma, logger):
"""Extract forward_states from existing result"""
if sigma is not None:
raise ValueError(
"skip_initial_forward_propagation is incompatible with "
"second order Krotov (sigma is not None)"
)
if result is not None:
# forwards_states is normally a list of storage objects that store
# the *entire* forward propagation for each objective. The stored
# states are only needed for second order Krotov. When skipping the
# forward propagation, we'll use a fake "storage object" that
# contains only the final state. That'll be sufficient for setting
# fw_states_T and tau_vals below
forward_states = [[state] for state in result.states]
else:
logger.warning(
"You should not use `skip_initial_forward_propagation` unless "
"you are also passing `continue_from`"
)
# If the chi_constructor does not depend on the fw_states_T (like
# chis_re), and you're using first order Krotov, you don't actually
# have to do the initial forward propagation. It's not really worth
# it to skip that propagation, though, as we usually have to do
# hundreds of iterations. So, this is an undocumented feature.
forward_states = [[None] for _ in objectives]
return forward_states
def _forward_propagation(
i_objective,
objectives,
pulses,
pulses_mapping,
tlist,
propagators,
storage,
store_all=True,
):
"""Forward propagation of the initial state of a single objective over the
entire `tlist`"""
logger = logging.getLogger('krotov')
logger.info(
"Started initial forward propagation of objective %d", i_objective
)
obj = objectives[i_objective]
state = obj.initial_state
if store_all:
storage_array = storage(len(tlist))
storage_array[0] = state
mapping = pulses_mapping[i_objective]
for time_index in range(len(tlist) - 1): # index over intervals
H = plug_in_pulse_values(obj.H, pulses, mapping[0], time_index)
c_ops = [
plug_in_pulse_values(c_op, pulses, mapping[ic + 1], time_index)
for (ic, c_op) in enumerate(obj.c_ops)
]
dt = tlist[time_index + 1] - tlist[time_index]
state = propagators[i_objective](
H, state, dt, c_ops, initialize=(time_index == 0)
)
if store_all:
storage_array[time_index + 1] = state
logger.info(
"Finished initial forward propagation of objective %d", i_objective
)
if store_all:
return storage_array
else:
return state
def _backward_propagation(
i_state,
chi_states,
adjoint_objectives,
pulses,
pulses_mapping,
tlist,
propagators,
storage,
):
"""Backward propagation of chi_states[i_state] over the entire `tlist`"""
logger = logging.getLogger('krotov')
logger.info("Started backward propagation of state %d", i_state)
state = chi_states[i_state]
obj = adjoint_objectives[i_state]
storage_array = storage(len(tlist))
storage_array[-1] = state
mapping = pulses_mapping[i_state]
for time_index in range(len(tlist) - 2, -1, -1): # index bw over intervals
H = plug_in_pulse_values(
obj.H, pulses, mapping[0], time_index, conjugate=True
)
c_ops = [
plug_in_pulse_values(c_op, pulses, mapping[ic + 1], time_index)
for (ic, c_op) in enumerate(obj.c_ops)
]
dt = tlist[time_index + 1] - tlist[time_index]
state = propagators[i_state](
H,
state,
dt,
c_ops,
backwards=True,
initialize=(time_index == len(tlist) - 2),
)
storage_array[time_index] = state
logger.info("Finished backward propagation of state %d", i_state)
return storage_array
def _forward_propagation_step(
i_state,
states,
objectives,
pulses,
pulses_mapping,
tlist,
time_index,
propagators,
):
"""Forward-propagate states[i_state] by a single time step"""
state = states[i_state]
obj = objectives[i_state]
mapping = pulses_mapping[i_state]
H = plug_in_pulse_values(obj.H, pulses, mapping[0], time_index)
c_ops = [
plug_in_pulse_values(c_op, pulses, mapping[ic + 1], time_index)
for (ic, c_op) in enumerate(obj.c_ops)
]
dt = tlist[time_index + 1] - tlist[time_index]
return propagators[i_state](
H, state, dt, c_ops, initialize=(time_index == 0)
)
```