# param_sweep.py
#
# This file is part of scqubits.
#
# Copyright (c) 2019 and later, Jens Koch and Peter Groszkowski
# All rights reserved.
#
# This source code is licensed under the BSD-style license found in the LICENSE file
# in the root directory of this source tree.
# ###########################################################################
import copy
import functools
import itertools
import warnings
from abc import ABC
from collections import OrderedDict
from typing import TYPE_CHECKING, Any, Callable, Dict, List, Optional, Tuple, Union
import numpy as np
from matplotlib.axes import Axes
from matplotlib.figure import Figure
from numpy import ndarray
import scqubits.core.central_dispatch as dispatch
import scqubits.core.descriptors as descriptors
import scqubits.core.sweeps as sweeps
import scqubits.io_utils.fileio_serializers as serializers
import scqubits.settings as settings
import scqubits.utils.cpu_switch as cpu_switch
import scqubits.utils.misc as utils
import scqubits.utils.plotting as plot
from scqubits.core._param_sweep import _ParameterSweep
from scqubits.core.hilbert_space import HilbertSpace
from scqubits.core.namedslots_array import (
NamedSlotsNdarray,
Parameters,
convert_to_std_npindex,
)
from scqubits.core.oscillator import Oscillator
from scqubits.core.qubit_base import QubitBaseClass
from scqubits.core.spectrum_lookup import SpectrumLookupMixin
from scqubits.core.storage import SpectrumData
if TYPE_CHECKING:
from scqubits.io_utils.fileio import IOData
if settings.IN_IPYTHON:
from tqdm.notebook import tqdm
else:
from tqdm import tqdm
QuantumSys = Union[QubitBaseClass, Oscillator]
GIndex = Union[int, float, complex, slice, Tuple[int], List[int]]
GIndexTuple = Tuple[GIndex, ...]
NpIndex = Union[int, slice, Tuple[int], List[int]]
NpIndexTuple = Tuple[NpIndex, ...]
NpIndices = Union[NpIndex, NpIndexTuple]
class ParameterSweepBase(ABC):
"""
The_ParameterSweepBase class is an abstract base class for ParameterSweep and
StoredSweep
"""
_parameters = descriptors.WatchedProperty("PARAMETERSWEEP_UPDATE")
_evals_count = descriptors.WatchedProperty("PARAMETERSWEEP_UPDATE")
_data = descriptors.WatchedProperty("PARAMETERSWEEP_UPDATE")
_hilbertspace: HilbertSpace
_out_of_sync = False
_current_param_indices: Union[NpIndices, None]
def get_subsys(self, index: int) -> QuantumSys:
return self._hilbertspace[index]
def get_subsys_index(self, subsys: QuantumSys) -> int:
return self._hilbertspace.get_subsys_index(subsys)
@property
def osc_subsys_list(self) -> List[Oscillator]:
return self._hilbertspace.osc_subsys_list
@property
def qbt_subsys_list(self) -> List[QubitBaseClass]:
return self._hilbertspace.qbt_subsys_list
@property
def subsystem_count(self) -> int:
return self._hilbertspace.subsystem_count
@utils.check_sync_status
def __getitem__(self, key):
if isinstance(key, str):
return self._data[key]
# The following enables the pre-slicing syntax:
# <Sweep>[p1, p2, ...].dressed_eigenstates()
if isinstance(key, tuple):
self._current_param_indices = convert_to_std_npindex(key, self._parameters)
elif isinstance(key, (int, slice)):
if key == slice(None) or key == slice(None, None, None):
key = (key,) * len(self._parameters)
else:
key = (key,)
self._current_param_indices = convert_to_std_npindex(key, self._parameters)
return self
def receive(self, event: str, sender: object, **kwargs) -> None:
"""Hook to CENTRAL_DISPATCH. This method is accessed by the global
CentralDispatch instance whenever an event occurs that ParameterSweep is
registered for. In reaction to update events, the lookup table is marked as
out of sync.
Parameters
----------
event:
type of event being received
sender:
identity of sender announcing the event
**kwargs
"""
if self._data:
if event == "HILBERTSPACE_UPDATE" and sender is self._hilbertspace:
self._out_of_sync = True
elif event == "PARAMETERSWEEP_UPDATE" and sender is self:
self._out_of_sync = True
@property
def bare_specdata_list(self) -> List[SpectrumData]:
"""
Wrap bare eigensystem data into a SpectrumData object. To be used with
pre-slicing, e.g. `<ParameterSweep>[0, :].bare_specdata_list`
Returns
-------
List of `SpectrumData` objects with bare eigensystem data, one per subsystem
"""
multi_index = self._current_param_indices
sweep_param_indices = self.get_sweep_indices(multi_index)
if len(sweep_param_indices) != 1:
raise ValueError(
"All but one parameter must be fixed for `bare_specdata_list`."
)
sweep_param_name = self._parameters.name_by_index[sweep_param_indices[0]]
specdata_list: List[SpectrumData] = []
for subsys_index, subsystem in enumerate(self._hilbertspace):
evals_swp = self["bare_evals"][subsys_index][multi_index]
evecs_swp = self["bare_evecs"][subsys_index][multi_index]
specdata_list.append(
SpectrumData(
energy_table=evals_swp.toarray(),
state_table=evecs_swp.toarray(),
system_params=self._hilbertspace.get_initdata(),
param_name=sweep_param_name,
param_vals=self._parameters[sweep_param_name],
)
)
self._current_param_indices = None
return specdata_list
@property
def dressed_specdata(self) -> "SpectrumData":
"""
Wrap dressed eigensystem data into a SpectrumData object. To be used with
pre-slicing, e.g. `<ParameterSweep>[0, :].dressed_specdata`
Returns
-------
`SpectrumData` object with bare eigensystem data
"""
multi_index = self._current_param_indices
sweep_param_indices = self.get_sweep_indices(multi_index)
if len(sweep_param_indices) != 1:
raise ValueError(
"All but one parameter must be fixed for `dressed_specdata`."
)
sweep_param_name = self._parameters.name_by_index[sweep_param_indices[0]]
specdata = SpectrumData(
energy_table=self["evals"][multi_index].toarray(),
state_table=self["evecs"][multi_index].toarray(),
system_params=self._hilbertspace.get_initdata(),
param_name=sweep_param_name,
param_vals=self._parameters[sweep_param_name],
)
self._current_param_indices = None
return specdata
def get_sweep_indices(self, multi_index: GIndexTuple) -> List[int]:
"""
For given generalized multi-index, return a list of the indices that are being
swept.
"""
std_multi_index = convert_to_std_npindex(multi_index, self._parameters)
sweep_indices = [
index
for index, index_obj in enumerate(std_multi_index)
if isinstance(
self._parameters.paramvals_list[index][index_obj],
(list, tuple, ndarray),
)
]
self._current_param_indices = None
return sweep_indices
@property
def system_params(self) -> Dict[str, Any]:
return self._hilbertspace.get_initdata()
def _final_states_subsys(
self, subsystem: QuantumSys, initial_tuple: Tuple[int, ...]
) -> List[Tuple[int, ...]]:
"""For given initial statet of the composite quantum system, return the final
states possible to reach by changing the energy level of the given
`subsystem`"""
subsys_index = self._hilbertspace.get_subsys_index(subsystem)
final_tuples_list = []
for level in range(subsystem.truncated_dim):
final_state = list(initial_tuple)
final_state[subsys_index] = level
final_tuples_list.append(tuple(final_state))
final_tuples_list.remove(initial_tuple)
return final_tuples_list
def _get_final_states(
self,
initial_state: Tuple[int],
subsys_list: List[QuantumSys],
final: Union[Tuple[int, ...], None],
sidebands: bool,
) -> List[Tuple[int, ...]]:
"""Construct and return the possible final states as a list, based on the
provided initial state, a list of active subsystems and flag for whether to
include sideband transitions."""
if final:
return [final]
if not sidebands:
final_state_list = []
for subsys in subsys_list:
final_state_list += self._final_states_subsys(subsys, initial_state)
return final_state_list
range_list = [range(dim) for dim in self._hilbertspace.subsystem_dims]
for subsys_index, subsys in enumerate(self._hilbertspace):
if subsys not in subsys_list:
range_list[subsys_index] = [initial_state[subsys_index]]
final_state_list = list(itertools.product(*range_list))
return final_state_list
def _complete_state(
self,
partial_state: Union[List[int], Tuple[int]],
subsys_list: List[QuantumSys],
) -> List[int]:
"""A partial state only includes entries for active subsystems. Complete this
state by inserting 0 entries for all inactive subsystems."""
state_full = [0] * len(self._hilbertspace)
for entry, subsys in zip(partial_state, subsys_list):
subsys_index = self.get_subsys_index(subsys)
state_full[subsys_index] = entry
return state_full
def transitions(
self,
subsystems: Optional[Union[QuantumSys, List[QuantumSys]]] = None,
initial: Optional[Union[int, Tuple[int, ...]]] = None,
final: Optional[Tuple[int, ...]] = None,
sidebands: bool = False,
make_positive: bool = False,
as_specdata: bool = False,
param_indices: Optional[NpIndices] = None,
) -> Union[Tuple[List[Tuple[int, ...]], List[NamedSlotsNdarray]], SpectrumData]:
"""
Use dressed eigenenergy data and lookup based on bare product state labels to
extract transition energy data. Usage is based on preslicing to select all or
a subset of parameters to be involved in the sweep, e.g.,
`<ParameterSweep>[0, :, 2].transitions()`
produces all eigenenergy differences for transitions starting in the ground
state (default when no initial state is specified) as a function of the middle
parameter while parameters 1 and 3 are fixed by the indices 0 and 2.
Parameters
----------
subsystems:
single subsystems or list of subsystems considered as "active" for the
transitions to be generated; if omitted as a parameter, all subsystems
are considered as actively participating in the transitions
initial:
initial state from which transitions originate, specified as a bare product
state of either all subsystems the subset of active subsystems
(default: ground state of the system)
final:
concrete final state for which the transition energy should be generated; if
not provided, a list of allowed final states is generated
sidebands:
if set to true, sideband transitions with multiple subsystems changing
excitation levels are included (default: False)
make_positive:
boolean option relevant if the initial state is an excited state;
downwards transition energies would regularly be negative, but are
converted to positive if this flag is set to True
as_specdata:
whether data is handed back in raw array form or wrapped into a SpectrumData
object (default: False)
param_indices:
usually to be omitted, as param_indices will be set via pre-slicing
Returns
-------
A tuple consisting of a list of all the transitions and a corresponding
list of difference energies, e.g.
((0,0,0), (0,0,1)), <energy array for transition 0,0,0 -> 0,0,1>.
If as_specdata is set to True, a SpectrumData object is returned instead,
saving transition label info in an attribute named `labels`.
"""
param_indices = param_indices or self._current_param_indices
if subsystems is None:
subsys_list = self._hilbertspace.subsys_list
elif isinstance(subsystems, (QubitBaseClass, Oscillator)):
subsys_list = [subsystems]
else:
subsys_list = subsystems
if initial is None:
initial_state = (0,) * len(self._hilbertspace)
elif isinstance(initial, int):
initial_state = (initial,)
else:
initial_state = initial
if len(initial_state) not in [len(self._hilbertspace), len(subsys_list)]:
raise ValueError(
"Initial state information provided is not compatible "
"with the specified subsystems(s) provided."
)
if len(initial_state) < len(self._hilbertspace):
initial_state = self._complete_initial_state(initial_state, subsys_list)
final_states_list = self._get_final_states(
initial_state, subsys_list, final, sidebands
)
transitions = []
transition_energies = []
initial_energies = self[param_indices].energy_by_bare_index(initial_state)
for final_state in final_states_list:
final_energies = self[param_indices].energy_by_bare_index(final_state)
diff_energies = (final_energies - initial_energies).astype(float)
if make_positive:
diff_energies = np.abs(diff_energies)
if not np.isnan(diff_energies.toarray()).all():
transitions.append((initial_state, final_state))
transition_energies.append(diff_energies)
self._current_param_indices = None
if not as_specdata:
return transitions, transition_energies
reduced_parameters = self._parameters.create_sliced(param_indices)
if len(reduced_parameters) == 1:
name = reduced_parameters.names[0]
vals = reduced_parameters[name]
return SpectrumData(
energy_table=np.asarray(transition_energies).T,
system_params=self.system_params,
param_name=name,
param_vals=vals,
labels=list(map(str, transitions)),
subtract=np.asarray(
[initial_energies] * self._evals_count, dtype=float
).T,
)
return SpectrumData(
energy_table=np.asarray(transition_energies),
system_params=self.system_params,
label=list(map(str, transitions)),
)
def plot_transitions(
self,
subsystems: Optional[Union[QuantumSys, List[QuantumSys]]] = None,
initial: Optional[Union[int, Tuple[int, ...]]] = None,
final: Optional[Union[int, Tuple[int, ...]]] = None,
sidebands: bool = False,
make_positive: bool = True,
coloring: Union[str, ndarray] = "transition",
param_indices: Optional[NpIndices] = None,
**kwargs,
) -> Tuple[Figure, Axes]:
"""
Plot transition energies as a function of one external parameter. Usage is based
on preslicing of the ParameterSweep object to select a single parameter to be
involved in the sweep. E.g.,
`<ParameterSweep>[0, :, 2].plot_transitions()`
plots all eigenenergy differences for transitions starting in the ground
state (default when no initial state is specified) as a function of the middle
parameter while parameters 1 and 3 are fixed by the indices 0 and 2.
Parameters
----------
subsystems:
single subsystems or list of subsystems considered as "active" for the
transitions to be generated; if omitted as a parameter, all subsystems
are considered as actively participating in the transitions
initial:
initial state from which transitions originate, specified as a bare product
state of either all subsystems the subset of active subsystems
(default: ground state of the system)
final:
concrete final state for which the transition energy should be generated; if
not provided, a list of allowed final states is generated
sidebands:
if set to true, sideband transitions with multiple subsystems changing
excitation levels are included (default: False)
make_positive:
boolean option relevant if the initial state is an excited state;
downwards transition energies would regularly be negative, but are
converted to positive if this flag is set to True (default: True)
coloring:
For `"transition"` (default), transitions are colored by their
dispersive nature; "`plain`", curves are colored naively
param_indices:
usually to be omitted, as param_indices will be set via pre-slicing
Returns
-------
A tuple consisting of a list of all the transitions and a corresponding
list of difference energies, e.g.
((0,0,0), (0,0,1)), <energy array for transition 0,0,0 -> 0,0,1>.
If as_specdata is set to True, a SpectrumData object is returned instead,
saving transition label info in an attribute named `labels`.
"""
param_indices = param_indices or self._current_param_indices
if len(self._parameters.create_sliced(param_indices)) > 1:
raise ValueError(
"Transition plots are only supported for a sweep over a "
"single parameter. You can reduce a multidimensional "
"sweep by pre-slicing, e.g., <ParameterSweep>[0, :, "
"0].plot_transitions(...)"
)
specdata = self.transitions(
subsystems,
initial,
final,
sidebands,
make_positive,
as_specdata=True,
param_indices=param_indices,
)
specdata_all = copy.deepcopy(self[param_indices].dressed_specdata)
specdata_all.energy_table -= specdata.subtract
if make_positive:
specdata_all.energy_table = np.abs(specdata_all.energy_table)
self._current_param_indices = None # reset from pre-slicing
if coloring == "plain":
return specdata_all.plot_evals_vs_paramvals()
fig_ax = specdata_all.plot_evals_vs_paramvals(color="gainsboro", linewidth=0.75)
labellines_status = plot._LABELLINES_ENABLED
plot._LABELLINES_ENABLED = False
fig, axes = specdata.plot_evals_vs_paramvals(
label_list=specdata.labels, fig_ax=fig_ax, **kwargs
)
plot._LABELLINES_ENABLED = labellines_status
return fig, axes
def add_sweep(
self,
sweep_function: Union[str, Callable],
sweep_name: Optional[str] = None,
**kwargs,
) -> None:
"""
Add a new sweep to the ParameterSweep object. The generated data is
subsequently accessible through <ParameterSweep>[<sweep_function>] or
<ParameterSweep>[<sweep_name>]
Parameters
----------
sweep_function:
name of a sweep function in scq.sweeps as str, or custom function (
callable) provided by the user
sweep_name:
if given, the generated data is stored in <ParameterSweep>[<sweep_name>]
rather than [<sweep_name>]
kwargs:
keyword arguments handed over to the sweep function
Returns
-------
None
"""
if callable(sweep_function):
if not hasattr(sweep_function, "__name__") and not sweep_name:
raise ValueError(
"Sweep function name cannot be accessed. Provide an "
"explicit `sweep_name` instead."
)
sweep_name = sweep_name or sweep_function.__name__
func = sweep_function
self._data[sweep_name] = sweeps.generator(self, func, **kwargs)
else:
sweep_name = sweep_name or sweep_function
func = getattr(sweeps, sweep_function)
self._data[sweep_name] = func(**kwargs)
def matrix_elements(
self,
operator_name: str,
sweep_name: str,
subsystem: "QuantumSys",
) -> None:
"""Generate data for matrix elements with respect to a given operator, as a
function of the sweep parameter(s)
Parameters
----------
operator_name:
name of the operator in question
sweep_name:
The sweep data will be accessible as <ParameterSweep>[<sweep_name>]
subsystem:
subsystems for which to compute matrix elements.
Returns
-------
None; results are saved as <ParameterSweep>[<sweep_name>]
"""
def _matrix_elements(
sweep: "ParameterSweep",
param_indices: Tuple[int, ...],
param_vals: Tuple[float, ...],
operator_name: Union[str, None] = None,
subsystem=None,
) -> np.ndarray:
subsys_index = sweep.get_subsys_index(subsystem)
bare_evecs = sweep["bare_evecs"][subsys_index][param_indices]
return subsystem.matrixelement_table(
operator=operator_name,
evecs=bare_evecs,
evals_count=subsystem.truncated_dim,
)
operator_func = functools.partial(
_matrix_elements,
sweep=self,
operator_name=operator_name,
subsystem=subsystem,
)
matrix_element_data = sweeps.generator(
self,
operator_func,
)
self._data[sweep_name] = matrix_element_data
[docs]class ParameterSweep(
ParameterSweepBase,
SpectrumLookupMixin,
dispatch.DispatchClient,
serializers.Serializable,
):
"""
`ParameterSweep` supports array-like access ("pre-slicing") and dict-like access.
With dict-like access via string-keywords `<ParameterSweep>[<str>]`,
the following data is returned:
`"evals"` and `"evecs"`
dressed eigenenergies and eigenstates as
`NamedSlotsNdarray`; eigenstates are decomposed in the bare product-state basis
of the non-interacting subsystems' eigenbases
`"bare_evals"` and `"bare_evecs"`
bare eigenenergies and eigenstates as `NamedSlotsNdarray`
`"lamb"`, `"chi"`, and `"kerr"`
dispersive energy coefficients
`"<custom sweep>"`
NamedSlotsNdarray for custom data generated with `add_sweep`.
Array-like access is responsible for "pre-slicing",
enable lookup functionality such as
`<Sweep>[p1, p2, ...].eigensys()`
Parameters
----------
hilbertspace:
HilbertSpace object describing the quantum system of interest
paramvals_by_name:
Dictionary that, for each set of parameter values, specifies a parameter name
and the set of values to be used in the sweep.
update_hilbertspace:
function that updates the associated `hilbertspace` object with a given
set of parameters
evals_count:
number of dressed eigenvalues/eigenstates to keep. (The number of bare
eigenvalues/eigenstates is determined for each subsystems by `truncated_dim`.)
[default: 20]
subsys_update_info:
To speed up calculations, the user may provide information that specifies which
subsystems are being updated for each of the given parameter sweeps. This
information is specified by a dictionary of the following form::
{
"<parameter name 1>": [<subsystem a>],
"<parameter name 2>": [<subsystem b>, <subsystem c>, ...],
...
}
This indicates that changes in `<parameter name 1>` only require updates of
`<subsystem a>` while leaving other subsystems unchanged. Similarly, sweeping
`<parameter name 2>` affects `<subsystem b>`, `<subsystem c>` etc.
bare_only:
if set to True, only bare eigendata is calculated; useful when performing a
sweep for a single quantum system, no interaction (default: False)
autorun:
Determines whether to directly run the sweep or delay it until `.run()` is
called manually. (Default: `settings.AUTORUN_SWEEP=True`)
num_cpus:
number of CPU cores requested for computing the sweep
(default value `settings.NUM_CPUS`)
"""
def __new__(cls, *args, **kwargs) -> "Union[ParameterSweep, _ParameterSweep]":
if args and isinstance(args[0], str) or "param_name" in kwargs:
# old-style ParameterSweep interface is being used
warnings.warn(
"The implementation of the `ParameterSweep` class has changed and this "
"old-style interface will cease to be supported in the future.",
FutureWarning,
)
return _ParameterSweep(*args, **kwargs)
else:
return super().__new__(cls, *args, **kwargs)
def __init__(
self,
hilbertspace: HilbertSpace,
paramvals_by_name: Dict[str, ndarray],
update_hilbertspace: Callable,
evals_count: int = 20,
subsys_update_info: Optional[Dict[str, List[QuantumSys]]] = None,
bare_only: bool = False,
autorun: bool = settings.AUTORUN_SWEEP,
num_cpus: Optional[int] = None,
) -> None:
num_cpus = num_cpus or settings.NUM_CPUS
self._parameters = Parameters(paramvals_by_name)
self._hilbertspace = hilbertspace
self._evals_count = evals_count
self._update_hilbertspace = update_hilbertspace
self._subsys_update_info = subsys_update_info
self._data: Dict[str, Optional[NamedSlotsNdarray]] = {}
self._bare_only = bare_only
self._num_cpus = num_cpus
self.tqdm_disabled = settings.PROGRESSBAR_DISABLED or (num_cpus > 1)
self._out_of_sync = False
self._current_param_indices = None
dispatch.CENTRAL_DISPATCH.register("PARAMETERSWEEP_UPDATE", self)
dispatch.CENTRAL_DISPATCH.register("HILBERTSPACE_UPDATE", self)
if autorun:
self.run()
def cause_dispatch(self) -> None:
initial_parameters = tuple(paramvals[0] for paramvals in self._parameters)
self._update_hilbertspace(*initial_parameters)
@classmethod
def deserialize(cls, iodata: "IOData") -> "StoredSweep":
pass
def serialize(self) -> "IOData":
"""
Convert the content of the current class instance into IOData format.
Returns
-------
IOData
"""
initdata = {
"paramvals_by_name": self._parameters.ordered_dict,
"hilbertspace": self._hilbertspace,
"evals_count": self._evals_count,
"_data": self._data,
}
iodata = serializers.dict_serialize(initdata)
iodata.typename = "StoredSweep"
return iodata
@property
def param_info(self) -> Dict[str, ndarray]:
"""Return a dictionary of the parameter names and values used in this sweep."""
return self._parameters.paramvals_by_name
[docs] def run(self) -> None:
"""Create all sweep data: bare spectral data, dressed spectral data, lookup
data and custom sweep data."""
# generate one dispatch before temporarily disabling CENTRAL_DISPATCH
self.cause_dispatch()
settings.DISPATCH_ENABLED = False
self._data["bare_evals"], self._data["bare_evecs"] = self._bare_spectrum_sweep()
if not self._bare_only:
self._data["evals"], self._data["evecs"] = self._dressed_spectrum_sweep()
self._data["dressed_indices"] = self.generate_lookup()
(
self._data["lamb"],
self._data["chi"],
self._data["kerr"],
) = self._dispersive_coefficients()
settings.DISPATCH_ENABLED = True
def _bare_spectrum_sweep(self) -> Tuple[NamedSlotsNdarray, NamedSlotsNdarray]:
"""
The bare energy spectra are computed according to the following scheme.
1. Perform a loop over all subsystems to separately obtain the bare energy
eigenvalues and eigenstates for each subsystems.
2. If `update_subsystem_info` is given, remove those sweeps that leave the
subsystems fixed.
3. If self._num_cpus > 1, parallelize.
Returns
-------
NamedSlotsNdarray[<paramname1>, <paramname2>, ..., "subsys"] for evals,
likewise for evecs;
here, "subsys": 0, 1, ... enumerates subsystems and
"""
bare_evals = np.empty((self.subsystem_count,), dtype=object)
bare_evecs = np.empty((self.subsystem_count,), dtype=object)
for subsys_index, subsystem in enumerate(self._hilbertspace):
bare_esys = self._subsys_bare_spectrum_sweep(subsystem)
bare_evals[subsys_index] = NamedSlotsNdarray(
np.asarray(bare_esys[..., 0].tolist()),
self._parameters.paramvals_by_name,
)
bare_evecs[subsys_index] = NamedSlotsNdarray(
np.asarray(bare_esys[..., 1].tolist()),
self._parameters.paramvals_by_name,
)
return (
NamedSlotsNdarray(bare_evals, {"subsys": np.arange(self.subsystem_count)}),
NamedSlotsNdarray(bare_evecs, {"subsys": np.arange(self.subsystem_count)}),
)
def _update_subsys_compute_esys(
self, update_func: Callable, subsystem: QuantumSys, paramval_tuple: Tuple[float]
) -> ndarray:
update_func(*paramval_tuple)
evals, evecs = subsystem.eigensys(evals_count=subsystem.truncated_dim)
esys_array = np.empty(shape=(2,), dtype=object)
esys_array[0] = evals
esys_array[1] = evecs
return esys_array
def _paramnames_no_subsys_update(self, subsystem) -> List[str]:
if self._subsys_update_info is None:
return []
updating_parameters = [
name
for name in self._subsys_update_info.keys()
if subsystem in self._subsys_update_info[name]
]
return list(set(self._parameters.names) - set(updating_parameters))
def _subsys_bare_spectrum_sweep(self, subsystem) -> ndarray:
"""
Parameters
----------
subsystem:
subsystem for which the bare spectrum sweep is to be computed
Returns
-------
multidimensional array of the format
array[p1, p2, p3, ..., pN] = np.asarray[[evals, evecs]]
"""
fixed_paramnames = self._paramnames_no_subsys_update(subsystem)
reduced_parameters = self._parameters.create_reduced(fixed_paramnames)
total_count = np.prod([len(param_vals) for param_vals in reduced_parameters])
multi_cpu = self._num_cpus > 1
target_map = cpu_switch.get_map_method(self._num_cpus)
with utils.InfoBar(
"Parallel compute bare eigensys [num_cpus={}]".format(self._num_cpus),
self._num_cpus,
) as p:
bare_eigendata = tqdm(
target_map(
functools.partial(
self._update_subsys_compute_esys,
self._update_hilbertspace,
subsystem,
),
itertools.product(*reduced_parameters.paramvals_list),
),
total=total_count,
desc="Bare spectra",
leave=False,
disable=multi_cpu,
)
bare_eigendata = np.asarray(list(bare_eigendata), dtype=object)
bare_eigendata = bare_eigendata.reshape((*reduced_parameters.counts, 2))
# Bare spectral data was only computed once for each parameter that has no
# update effect on the subsystems. Now extend the array to reflect this
# for the full parameter array by repeating
for name in fixed_paramnames:
index = self._parameters.index_by_name[name]
param_count = self._parameters.counts[index]
bare_eigendata = np.repeat(bare_eigendata, param_count, axis=index)
return bare_eigendata
def _update_and_compute_dressed_esys(
self,
hilbertspace: HilbertSpace,
evals_count: int,
update_func: Callable,
paramindex_tuple: Tuple[int],
) -> ndarray:
paramval_tuple = self._parameters[paramindex_tuple]
update_func(*paramval_tuple)
assert self._data is not None
bare_esys = {
subsys_index: [
self._data["bare_evals"][subsys_index][paramindex_tuple],
self._data["bare_evecs"][subsys_index][paramindex_tuple],
]
for subsys_index, _ in enumerate(self._hilbertspace)
}
evals, evecs = hilbertspace.eigensys(
evals_count=evals_count, bare_esys=bare_esys
)
esys_array = np.empty(shape=(2,), dtype=object)
esys_array[0] = evals
esys_array[1] = evecs
return esys_array
def _dressed_spectrum_sweep(
self,
) -> Tuple[NamedSlotsNdarray, NamedSlotsNdarray]:
"""
Returns
-------
NamedSlotsNdarray[<paramname1>, <paramname2>, ...] of eigenvalues,
likewise for eigenvectors
"""
multi_cpu = self._num_cpus > 1
target_map = cpu_switch.get_map_method(self._num_cpus)
total_count = np.prod(self._parameters.counts)
with utils.InfoBar(
"Parallel compute dressed eigensys [num_cpus={}]".format(self._num_cpus),
self._num_cpus,
) as p:
spectrum_data = list(
tqdm(
target_map(
functools.partial(
self._update_and_compute_dressed_esys,
self._hilbertspace,
self._evals_count,
self._update_hilbertspace,
),
itertools.product(*self._parameters.ranges),
),
total=total_count,
desc="Dressed spectrum",
leave=False,
disable=multi_cpu,
)
)
spectrum_data = np.asarray(spectrum_data, dtype=object)
spectrum_data = spectrum_data.reshape((*self._parameters.counts, 2))
slotparamvals_by_name = OrderedDict(self._parameters.ordered_dict.copy())
evals = np.asarray(spectrum_data[..., 0].tolist())
evecs = spectrum_data[..., 1]
return NamedSlotsNdarray(evals, slotparamvals_by_name), NamedSlotsNdarray(
evecs, slotparamvals_by_name
)
def _energies_1(self, subsys):
bare_label = np.zeros(len(self._hilbertspace))
bare_label[self.get_subsys_index(subsys)] = 1
energies_all_l = np.empty(self._parameters.counts + (subsys.truncated_dim,))
for l in range(subsys.truncated_dim):
energies_all_l[..., l] = self[:].energy_by_bare_index(tuple(l * bare_label))
return energies_all_l
def _energies_2(self, subsys1, subsys2):
bare_label1 = np.zeros(len(self._hilbertspace))
bare_label1[self.get_subsys_index(subsys1)] = 1
bare_label2 = np.zeros(len(self._hilbertspace))
bare_label2[self.get_subsys_index(subsys2)] = 1
energies_all_l1_l2 = np.empty(
self._parameters.counts
+ (subsys1.truncated_dim,)
+ (subsys2.truncated_dim,)
)
for l1 in range(subsys1.truncated_dim):
for l2 in range(subsys2.truncated_dim):
energies_all_l1_l2[..., l1, l2] = self[:].energy_by_bare_index(
tuple(l1 * bare_label1 + l2 * bare_label2)
)
return energies_all_l1_l2
def _dispersive_coefficients(
self,
) -> Tuple[NamedSlotsNdarray, NamedSlotsNdarray, NamedSlotsNdarray]:
energy_0 = self[:].energy_by_dressed_index(0).toarray()
lamb_data = np.empty(self.subsystem_count, dtype=object)
kerr_data = np.empty((self.subsystem_count, self.subsystem_count), dtype=object)
for subsys_index1, subsys1 in enumerate(self._hilbertspace):
energy_subsys1_all_l1 = self._energies_1(subsys1)
bare_energy_subsys1_all_l1 = self["bare_evals"][subsys_index1].toarray()
lamb_subsys1_all_l1 = (
energy_subsys1_all_l1
- energy_0[..., None]
- bare_energy_subsys1_all_l1
+ bare_energy_subsys1_all_l1[..., 0][..., None]
)
lamb_data[subsys_index1] = NamedSlotsNdarray(
lamb_subsys1_all_l1, self._parameters.paramvals_by_name
)
for subsys_index2, subsys2 in enumerate(self._hilbertspace):
energy_subsys2_all_l2 = self._energies_1(subsys2)
energy_subsys1_subsys2_all_l1_l2 = self._energies_2(subsys1, subsys2)
kerr_subsys1_subsys2_all_l1_l2 = (
energy_subsys1_subsys2_all_l1_l2
+ energy_0[..., None, None]
- energy_subsys1_all_l1[..., :, None]
- energy_subsys2_all_l2[..., None, :]
)
if subsys1 is subsys2:
kerr_subsys1_subsys2_all_l1_l2 /= 2.0 # self-Kerr needs factor 1/2
kerr_data[subsys_index1, subsys_index2] = NamedSlotsNdarray(
kerr_subsys1_subsys2_all_l1_l2, self._parameters.paramvals_by_name
)
sys_indices = np.arange(self.subsystem_count)
lamb_data = NamedSlotsNdarray(lamb_data, {"subsys": sys_indices})
kerr_data = NamedSlotsNdarray(
kerr_data, {"subsys1": sys_indices, "subsys2": sys_indices}
)
chi_data = kerr_data.copy()
for subsys_index1, subsys1 in enumerate(self._hilbertspace):
for subsys_index2, subsys2 in enumerate(self._hilbertspace):
if subsys1 in self.osc_subsys_list:
if subsys2 in self.qbt_subsys_list:
chi_data[subsys_index1, subsys_index2] = chi_data[
subsys_index1, subsys_index2
][..., 1, :]
kerr_data[subsys_index1, subsys_index2] = np.asarray([])
else:
chi_data[subsys_index1, subsys_index2] = np.asarray([])
elif subsys1 in self.qbt_subsys_list:
if subsys2 in self.osc_subsys_list:
chi_data[subsys_index1, subsys_index2] = chi_data[
subsys_index1, subsys_index2
][..., :, 1]
kerr_data[subsys_index1, subsys_index2] = np.asarray([])
else:
chi_data[subsys_index1, subsys_index2] = np.asarray([])
return lamb_data, chi_data, kerr_data
class StoredSweep(
ParameterSweepBase,
SpectrumLookupMixin,
dispatch.DispatchClient,
serializers.Serializable,
):
_parameters = descriptors.WatchedProperty("PARAMETERSWEEP_UPDATE")
_evals_count = descriptors.WatchedProperty("PARAMETERSWEEP_UPDATE")
_data = descriptors.WatchedProperty("PARAMETERSWEEP_UPDATE")
_hilbertspace: HilbertSpace
def __init__(self, paramvals_by_name, hilbertspace, evals_count, _data) -> None:
self._parameters = Parameters(paramvals_by_name)
self._hilbertspace = hilbertspace
self._evals_count = evals_count
self._data = _data
self._out_of_sync = False
self._current_param_indices = None
@classmethod
def deserialize(cls, iodata: "IOData") -> "StoredSweep":
"""
Take the given IOData and return an instance of the described class, initialized
with the data stored in io_data.
Parameters
----------
iodata: IOData
Returns
-------
StoredSweep
"""
return StoredSweep(**iodata.as_kwargs())
def serialize(self) -> "IOData":
pass
# StoredSweep: other methods
def get_hilbertspace(self) -> HilbertSpace:
return self._hilbertspace
def new_sweep(
self,
paramvals_by_name: Dict[str, ndarray],
update_hilbertspace: Callable,
evals_count: int = 6,
subsys_update_info: Optional[Dict[str, List[QuantumSys]]] = None,
autorun: bool = settings.AUTORUN_SWEEP,
num_cpus: Optional[int] = None,
) -> ParameterSweep:
return ParameterSweep(
self._hilbertspace,
paramvals_by_name,
update_hilbertspace,
evals_count=evals_count,
subsys_update_info=subsys_update_info,
autorun=autorun,
num_cpus=num_cpus,
)