Source code for scqubits.core.param_sweep

# param_sweep.py
#
# This file is part of scqubits: a Python package for superconducting qubits,
# Quantum 5, 583 (2021). https://quantum-journal.org/papers/q-2021-11-17-583/
#
#    Copyright (c) 2019 and later, Jens Koch and Peter Groszkowski
#    All rights reserved.
#
# This source code is licensed under the BSD-style license found in the LICENSE file
# in the root directory of this source tree.
# ###########################################################################

import copy
import dill
import functools
import inspect
import itertools
import warnings

from abc import ABC
from collections import OrderedDict
from typing import (
    TYPE_CHECKING,
    Any,
    Callable,
    Dict,
    List,
    Optional,
    Tuple,
    Union,
    overload,
)

import numpy as np
from matplotlib.axes import Axes
from matplotlib.figure import Figure
from numpy import ndarray
from qutip import Qobj
from scipy.sparse import csc_matrix
from typing_extensions import Literal

import scqubits as scq
import scqubits.core.central_dispatch as dispatch
import scqubits.core.descriptors as descriptors
import scqubits.core.sweeps as sweeps
import scqubits.io_utils.fileio_serializers as serializers
import scqubits.utils.cpu_switch as cpu_switch
import scqubits.utils.misc as utils
import scqubits.utils.plotting as plot
from scqubits import settings as settings
from scqubits.core.hilbert_space import HilbertSpace
from scqubits.core.namedslots_array import (
    NamedSlotsNdarray,
    Parameters,
    convert_to_std_npindex,
)
from scqubits.core.oscillator import Oscillator
from scqubits.core.qubit_base import QuantumSystem, QubitBaseClass
from scqubits.core.spec_lookup import SpectrumLookupMixin
from scqubits.core.storage import SpectrumData

if TYPE_CHECKING:
    from scqubits.io_utils.fileio import IOData

if settings.IN_IPYTHON:
    from tqdm.notebook import tqdm
else:
    from tqdm import tqdm

from scqubits.utils.typedefs import GIndexTuple, NpIndices

BareLabel = Tuple[int, ...]
DressedLabel = int
StateLabel = Union[DressedLabel, BareLabel]


_faulty_interactionterm_warning_issued = False  # flag to ensure single-time warning


[docs]class ParameterSlice: """ Stores information about a 1d slice of a (possibly) multi-dimensional parameter sweep. Parameters ---------- param_name: name of the single parameter which is being swept param_val: single selected value of the parameter (as used, e.g., in the Explorer) fixed_params: dictionary giving the names of the fixed parameters and their corresponding values params_ordered: list of all parameter names, giving their ordering """ def __init__( self, param_name: str, param_val: float, fixed_params: Dict[str, float], params_ordered: List[str], ): self.param_name = param_name self.param_val = param_val self.fixed_dict = fixed_params self.all_dict = {param_name: param_val, **fixed_params} self.fixed = tuple( slice(name, value) for name, value in self.fixed_dict.items() ) self.all = tuple(slice(name, value) for name, value in self.all_dict.items()) self.all_values = [self.all_dict[name] for name in params_ordered]
[docs]class ParameterSweepBase(ABC, SpectrumLookupMixin): """ The_ParameterSweepBase class is an abstract base class for ParameterSweep and StoredSweep """ _lookup_exists = False _parameters = descriptors.WatchedProperty(Parameters, "PARAMETERSWEEP_UPDATE") _evals_count = descriptors.WatchedProperty(int, "PARAMETERSWEEP_UPDATE") _data = descriptors.WatchedProperty(Dict[str, ndarray], "PARAMETERSWEEP_UPDATE") _hilbertspace: HilbertSpace _out_of_sync = False _out_of_sync_warning_issued = False _current_param_indices: NpIndices @property def hilbertspace(self) -> HilbertSpace: return self._hilbertspace @property def parameters(self) -> Parameters: """Return the Parameter object (access parameter values/indexing)""" return self._parameters @property def param_info(self) -> Dict[str, ndarray]: """Return a dictionary of the parameter names and values used in this sweep.""" return self._parameters.paramvals_by_name def get_subsys(self, index: int) -> QuantumSystem: return self.hilbertspace[index] def subsys_by_id_str(self, id_str: str) -> QuantumSystem: return self.hilbertspace.subsys_by_id_str(id_str) def subsys_evals_count(self, subsys_index: int) -> int: return self["bare_evals"]["subsys":subsys_index].shape[-1]
[docs] def dressed_evals_count(self) -> int: """Returns number of dressed eigenvalues included in sweep.""" return self._evals_count
def get_subsys_index(self, subsys: QuantumSystem) -> int: return self.hilbertspace.get_subsys_index(subsys) @property def osc_subsys_list(self) -> List[Oscillator]: return self.hilbertspace.osc_subsys_list @property def qbt_subsys_list(self) -> List[QubitBaseClass]: return self.hilbertspace.qbt_subsys_list @property def subsystem_count(self) -> int: return self.hilbertspace.subsystem_count @utils.check_sync_status def __getitem__(self, key): if isinstance(key, str): return self._data[key] # The following enables the pre-slicing syntax: # <Sweep>[p1, p2, ...].dressed_eigenstates() if isinstance(key, tuple): self._current_param_indices = convert_to_std_npindex(key, self._parameters) elif isinstance(key, slice): if key == slice(None) or key == slice(None, None, None): key = (key,) * len(self._parameters) else: key = (key,) self._current_param_indices = convert_to_std_npindex(key, self._parameters) elif isinstance(key, np.integer): key = (key,) self._current_param_indices = convert_to_std_npindex(key, self._parameters) return self
[docs] def receive(self, event: str, sender: object, **kwargs) -> None: """Hook to CENTRAL_DISPATCH. This method is accessed by the global CentralDispatch instance whenever an event occurs that ParameterSweep is registered for. In reaction to update events, the lookup table is marked as out of sync. Parameters ---------- event: type of event being received sender: identity of sender announcing the event **kwargs """ if self._data: if event == "HILBERTSPACE_UPDATE" and sender is self.hilbertspace: self._out_of_sync = True elif event == "PARAMETERSWEEP_UPDATE" and sender is self: self._out_of_sync = True
[docs] def set_update_func(self, update_hilbertspace: Callable) -> Callable: """Account for the two possible signatures of the `update_hilbertspace` function. Inspect whether a `self` argument is given. If not, return a function that accepts `self` as a dummy argument.""" arguments = inspect.signature(update_hilbertspace) if len(arguments.parameters) == len(self._parameters) + 1: # update_hilbertspace function already includes self argument return update_hilbertspace # function is missing self argument; create function with self dummy variable def full_update_func(sweep: "ParameterSweep", *args): return update_hilbertspace(*args) return full_update_func
@property def bare_specdata_list(self) -> List[SpectrumData]: """ Wrap bare eigensystem data into a SpectrumData object. To be used with pre-slicing, e.g. `<ParameterSweep>[0, :].bare_specdata_list` Returns ------- List of `SpectrumData` objects with bare eigensystem data, one per subsystem """ multi_index = self._current_param_indices sweep_param_indices = self.get_sweep_indices(multi_index) # type:ignore if len(sweep_param_indices) != 1: raise ValueError( "All but one parameter must be fixed for `bare_specdata_list`." ) sweep_param_name = self._parameters.name_by_index[sweep_param_indices[0]] specdata_list: List[SpectrumData] = [] for subsys_index, subsystem in enumerate(self.hilbertspace): evals_swp = self["bare_evals"][subsys_index][multi_index] evecs_swp = self["bare_evecs"][subsys_index][multi_index] specdata_list.append( SpectrumData( energy_table=evals_swp.toarray(), state_table=evecs_swp.toarray(), system_params=self.hilbertspace.get_initdata(), param_name=sweep_param_name, param_vals=self._parameters[sweep_param_name], ) ) self._preslicing_reset() return specdata_list @property def dressed_specdata(self) -> "SpectrumData": """ Wrap dressed eigensystem data into a SpectrumData object. To be used with pre-slicing, e.g. `<ParameterSweep>[0, :].dressed_specdata` Returns ------- `SpectrumData` object with bare eigensystem data """ multi_index = self._current_param_indices sweep_param_indices = self.get_sweep_indices(multi_index) # type:ignore if len(sweep_param_indices) != 1: raise ValueError( "All but one parameter must be fixed for `dressed_specdata`." ) sweep_param_name = self._parameters.name_by_index[sweep_param_indices[0]] specdata = SpectrumData( energy_table=self["evals"][multi_index].toarray(), state_table=self["evecs"][multi_index].toarray(), system_params=self.hilbertspace.get_initdata(), param_name=sweep_param_name, param_vals=self._parameters[sweep_param_name], ) self._preslicing_reset() return specdata
[docs] def get_sweep_indices(self, multi_index: GIndexTuple) -> List[int]: """ For given generalized multi-index, return a list of the indices that are being swept. """ std_multi_index = convert_to_std_npindex(multi_index, self._parameters) sweep_indices = [ index for index, index_obj in enumerate(std_multi_index) if isinstance( self._parameters.paramvals_list[index][index_obj], (list, tuple, ndarray), ) ] self._preslicing_reset() return sweep_indices
@property def system_params(self) -> Dict[str, Any]: return self.hilbertspace.get_initdata() def _preslicing_reset(self) -> None: self._current_param_indices = slice(None, None, None) def _slice_is_1d_sweep(self, param_indices: Optional[NpIndices]) -> bool: param_indices = param_indices or self._current_param_indices reduced_parameters = self._parameters.create_sliced(param_indices) return len(reduced_parameters) == 1 def _final_states_for_subsys_transition( self, subsystem: QuantumSystem, initial_tuple: BareLabel ) -> List[BareLabel]: """For given initial state of the composite quantum system, return the final states possible to reach by changing the energy level of the given `subsystem`""" subsys_index = self.hilbertspace.get_subsys_index(subsystem) final_tuples_list = [] for level in range(subsystem.truncated_dim): final_state = list(initial_tuple) final_state[subsys_index] = level final_tuples_list.append(tuple(final_state)) final_tuples_list.remove(initial_tuple) return final_tuples_list def _get_final_states_list( self, initial_state: Union[BareLabel, DressedLabel], subsys_list: List[QuantumSystem], sidebands: bool, ) -> List[BareLabel]: """Construct and return the possible final states as a list, based on the provided initial state, a list of active subsystems and flag for whether to include sideband transitions.""" if isinstance(initial_state, DressedLabel): return self._bare_product_states_labels if not sidebands: final_state_list = [] for subsys in subsys_list: final_state_list += self._final_states_for_subsys_transition( subsys, initial_state ) return final_state_list range_list = [range(dim) for dim in self.hilbertspace.subsystem_dims] for subsys_index, subsys in enumerate(self.hilbertspace): if subsys not in subsys_list: range_list[subsys_index] = [initial_state[subsys_index]] # type:ignore final_state_list = list(itertools.product(*range_list)) return final_state_list def _complete_state( self, partial_state: BareLabel, subsys_list: List[QuantumSystem], ) -> BareLabel: """A partial state only includes entries for active subsystems. Complete this state by inserting 0 entries for all inactive subsystems.""" state_full = [0] * len(self.hilbertspace) for entry, subsys in zip(partial_state, subsys_list): subsys_index = self.get_subsys_index(subsys) state_full[subsys_index] = entry return tuple(state_full) def _process_subsystems_option( self, subsystems: Optional[Union[QuantumSystem, List[QuantumSystem]]] ) -> List[QuantumSystem]: if subsystems is None: return self.hilbertspace.subsystem_list if isinstance(subsystems, list): return subsystems if isinstance(subsystems, QuantumSystem): return [subsystems] raise TypeError("Argument `subsystems` has invalid type.") def _process_initial_option( self, initial: Union[None, StateLabel], subsys_list: List[QuantumSystem], ) -> Tuple[bool, Callable, StateLabel]: if isinstance(initial, DressedLabel): initial_dressed = True return initial_dressed, self.energy_by_dressed_index, initial if initial is None: initial_dressed = False initial = (0,) * len(self.hilbertspace) return initial_dressed, self.energy_by_bare_index, initial initial_dressed = False if len(initial) not in [len(self.hilbertspace), len(subsys_list)]: raise ValueError( "State information provided is not compatible " "with the set of subsystems(s)." ) if len(initial) < len(self.hilbertspace): initial = tuple(self._complete_state(initial, subsys_list)) return initial_dressed, self.energy_by_bare_index, initial def _process_final_option( self, final: Union[None, StateLabel], initial: StateLabel, subsys_list: List[QuantumSystem], sidebands: bool, ) -> Tuple[bool, Callable, Union[List[DressedLabel], List[BareLabel]]]: if final is None: final_dressed = False final_states_list = self._get_final_states_list( initial, subsys_list, sidebands ) return final_dressed, self.energy_by_bare_index, final_states_list if final == -1: final_dressed = True return ( final_dressed, self.energy_by_dressed_index, list(range(0, self.dressed_evals_count())), ) if isinstance(final, DressedLabel): final_dressed = True return final_dressed, self.energy_by_dressed_index, [final] if isinstance(final, tuple): final_dressed = False return final_dressed, self.energy_by_bare_index, [final] raise TypeError("Invalid type for final state.") def _validate_bare_initial( self, initial: BareLabel, initial_energies: NamedSlotsNdarray, param_indices: NpIndices, ) -> None: if np.isnan(initial_energies.toarray().astype(np.float_)).any(): warnings.warn( "The initial state undergoes significant hybridization. " "Identification with a bare product state was not (fully) " "successful. Consider running ParameterSweep with " "`ignore_low_overlap=True` or specify `initial` as a dressed-state " "index (integer) instead of a bare product state.\n", UserWarning, ) elif sum(initial) == 0 and not np.all( initial_energies == self["evals"][param_indices][..., 0] ): warnings.warn( "The state (0,0, ...,0) may not be dispersively connected " "to the true ground state. Specifying `initial=0` (dressed-state " "index) may be preferable.\n", UserWarning, ) def _generate_transition_labels( self, initial_dressed: bool, final_dressed: bool, transitions: List[Tuple[StateLabel, StateLabel]], ) -> List[str]: identity_map = lambda x: x initial_label_func = ( identity_map if initial_dressed else utils.tuple_to_short_str ) final_label_func = identity_map if final_dressed else utils.tuple_to_short_str return [ r"{}$\to${}".format(initial_label_func(initial), final_label_func(final)) for initial, final in transitions ] @overload def transitions( self, as_specdata: Literal[True] = True, subsystems: Optional[Union[QuantumSystem, List[QuantumSystem]]] = None, initial: Optional[StateLabel] = None, final: Optional[StateLabel] = None, sidebands: bool = False, photon_number: int = 1, make_positive: bool = False, param_indices: Optional[NpIndices] = None, ) -> SpectrumData: ... @overload def transitions( self, as_specdata: Literal[False], subsystems: Optional[Union[QuantumSystem, List[QuantumSystem]]] = None, initial: Optional[StateLabel] = None, final: Optional[StateLabel] = None, sidebands: bool = False, photon_number: int = 1, make_positive: bool = False, param_indices: Optional[NpIndices] = None, ) -> Tuple[List[Tuple[StateLabel, StateLabel]], List[NamedSlotsNdarray]]: ...
[docs] def transitions( self, as_specdata: bool = False, subsystems: Optional[Union[QuantumSystem, List[QuantumSystem]]] = None, initial: Optional[StateLabel] = None, final: Optional[StateLabel] = None, sidebands: bool = False, photon_number: int = 1, make_positive: bool = False, param_indices: Optional[NpIndices] = None, ) -> Union[ Tuple[List[Tuple[StateLabel, StateLabel]], List[NamedSlotsNdarray]], SpectrumData, ]: """ Use dressed eigenenergy data and lookup based on bare product state labels to extract transition energy data. Usage is based on preslicing to select all or a subset of parameters to be involved in the sweep, e.g., `<ParameterSweep>[0, :, 2].transitions()` produces all eigenenergy differences for transitions starting in the ground state (default when no initial state is specified) as a function of the middle parameter while parameters 1 and 3 are fixed by the indices 0 and 2. Parameters ---------- subsystems: single subsystems or list of subsystems considered as "active" for the transitions to be generated; if omitted as a parameter, all subsystems are considered as actively participating in the transitions initial: initial state from which transitions originate, specified either (1) as a bare product state (tuple of excitation numbers of all subsystems or of the active ones given in `subsystems`); or (2) as a dressed-state index in the form of an integer >= 0. (default: (0,0,...,0) which is usually closest to the ground state) final: concrete final state for which the transition energy should be generated, given either as a bare product state (tuple of excitation numbers), or as a dressed state (non-negative integer). If `final` is omitted a list of final states is generated for dispersive transitions within each (active) subsystem. Sidebands can be switched on with the subsequent keyword option. `final=-1` can be chosen for a final state list to all other dressed states (helpful when the dispersive limit breaks down). sidebands: if set to true, sideband transitions with multiple subsystems changing excitation levels are included (default: False) photon_number: number of photons involved in transition; transition frequencies are divided by this number (default: photon_number=1, i.e., single-photon transitions) make_positive: boolean option relevant if the initial state is an excited state; downwards transition energies would regularly be negative, but are converted to positive if this flag is set to True as_specdata: whether data is handed back in raw array form or wrapped into a SpectrumData object (default: False) param_indices: usually to be omitted, as param_indices will be set via pre-slicing Returns ------- A tuple consisting of a list of all the transitions and a corresponding list of difference energies, e.g. ((0,0,0), (0,0,1)), <energy array for transition 0,0,0 -> 0,0,1>. If `as_specdata` is set to True, a SpectrumData object is returned instead, saving transition label info in an attribute named `labels`. """ subsys_list = self._process_subsystems_option(subsystems) ( initial_dressed, initial_energy_lookup_func, initial_state, ) = self._process_initial_option(initial, subsys_list) ( final_dressed, final_energy_lookup_func, final_states_list, ) = self._process_final_option(final, initial_state, subsys_list, sidebands) transitions: List[Tuple[StateLabel, StateLabel]] = [] transition_energies: List[NamedSlotsNdarray] = [] param_indices = param_indices or self._current_param_indices _ = self[param_indices] # trigger pre-slicing initial_energies = initial_energy_lookup_func(initial_state) if not initial_dressed: self._validate_bare_initial(initial_state, initial_energies, param_indices) for final_state in final_states_list: final_energies = final_energy_lookup_func(final_state) diff_energies = (final_energies - initial_energies).astype(float) diff_energies /= photon_number if make_positive: diff_energies = np.abs(diff_energies) if not np.isnan(diff_energies).all(): # omit transitions with all nans transitions.append((initial_state, final_state)) transition_energies.append(diff_energies) self.reset_preslicing() if not as_specdata: return transitions, transition_energies label_list = self._generate_transition_labels( initial_dressed, final_dressed, transitions ) if self._slice_is_1d_sweep(param_indices): reduced_parameters = self._parameters.create_sliced(param_indices) name = reduced_parameters.names[0] vals = reduced_parameters[name] return SpectrumData( energy_table=np.asarray(transition_energies).T, system_params=self.system_params, param_name=name, param_vals=vals, labels=label_list, subtract=np.asarray( [initial_energies] * self._evals_count, dtype=float ).T, ) return SpectrumData( energy_table=np.asarray(transition_energies), system_params=self.system_params, labels=label_list, )
[docs] def plot_transitions( self, subsystems: Optional[Union[QuantumSystem, List[QuantumSystem]]] = None, initial: Optional[StateLabel] = None, final: Optional[StateLabel] = None, sidebands: bool = False, photon_number: int = 1, make_positive: bool = True, coloring: Union[str, ndarray] = "transition", param_indices: Optional[NpIndices] = None, **kwargs, ) -> Tuple[Figure, Axes]: """ Plot transition energies as a function of one external parameter. Usage is based on preslicing of the ParameterSweep object to select a single parameter to be involved in the sweep. E.g., `<ParameterSweep>[0, :, 2].plot_transitions()` plots all eigenenergy differences for transitions starting in the ground state (default when no initial state is specified) as a function of the middle parameter while parameters 1 and 3 are fixed by the indices 0 and 2. Parameters ---------- subsystems: single subsystems or list of subsystems considered as "active" for the transitions to be generated; if omitted as a parameter, all subsystems are considered as actively participating in the transitions initial: initial state from which transitions originate: the initial state can either be specified as a tuple referring to a bare product state, or as an integer representing the dressed state index. For bare product states, the required tuple has as many entries as the underlying `HilbertSpace` object has subsystems. (If `subsystems` is given, then the tuple may be reduced to entries for just these subsystems; other subsystems are given a "0" entry automatically.) The dressed state corresponding to the given bare product state is determined by considerations of overlaps. Note: for an initial dressed state, the `sidebands` option is ignored. final: concrete final state for which the transition energy should be generated, given either as a bare product state (tuple of excitation numbers), or as a dressed state (non-negative integer). If `final` is omitted a list of final states is generated for dispersive transitions within each (active) subsystem. Sidebands can be switched on with the subsequent keyword option. `final=-1` can be chosen for a final state list to all other dressed states (helpful when the dispersive limit breaks down). sidebands: if set to true, sideband transitions with multiple subsystems changing excitation levels are included (default: False). This option is ignored if `initial` is given as an integer dressed state index. photon_number: number of photons involved in transition; transition frequencies are divided by this number (default: photon_number=1, i.e., single-photon transitions) make_positive: boolean option relevant if the initial state is an excited state; downwards transition energies would regularly be negative, but are converted to positive if this flag is set to True (default: True) coloring: For `"transition"` (default), transitions are colored by their dispersive nature; for "`plain`" no selective highlighting is attempted. param_indices: usually to be omitted, as param_indices will be set via pre-slicing Returns ------- Plot Figure and Axes objects """ param_indices = param_indices or self._current_param_indices if not self._slice_is_1d_sweep(param_indices): raise ValueError( "Transition plots are only supported for a sweep over a " "single parameter. You can reduce a multi-dimensional " "sweep by pre-slicing, e.g., <ParameterSweep>[0, :, " "0].plot_transitions(...)" ) specdata_for_highlighting = self.transitions( subsystems=subsystems, initial=initial, final=final, sidebands=sidebands, photon_number=photon_number, make_positive=make_positive, as_specdata=True, param_indices=param_indices, ) specdata_all = copy.deepcopy(self[param_indices].dressed_specdata) specdata_all.energy_table -= specdata_for_highlighting.subtract # type:ignore specdata_all.energy_table /= photon_number if make_positive: specdata_all.energy_table = np.abs(specdata_all.energy_table) self.reset_preslicing() if coloring == "plain": return specdata_all.plot_evals_vs_paramvals() if "fig_ax" in kwargs: fig_ax = kwargs.pop("fig_ax") else: fig_ax = None fig_ax = specdata_all.plot_evals_vs_paramvals( color="gainsboro", linewidth=0.75, fig_ax=fig_ax ) labellines_status = plot._LABELLINES_ENABLED plot._LABELLINES_ENABLED = False fig, axes = specdata_for_highlighting.plot_evals_vs_paramvals( label_list=specdata_for_highlighting.labels, fig_ax=fig_ax, **kwargs, # type:ignore ) plot._LABELLINES_ENABLED = labellines_status return fig, axes
def keys(self): return self._data.keys()
[docs] def add_sweep( self, sweep_function: Union[str, Callable], sweep_name: Optional[str] = None, **kwargs, ) -> None: """ Add a new sweep to the ParameterSweep object. The generated data is subsequently accessible through <ParameterSweep>[<sweep_function>] or <ParameterSweep>[<sweep_name>] Parameters ---------- sweep_function: name of a sweep function in scq.sweeps as str, or custom function ( callable) provided by the user sweep_name: if given, the generated data is stored in <ParameterSweep>[<sweep_name>] rather than [<sweep_name>] kwargs: keyword arguments handed over to the sweep function Returns ------- None """ if callable(sweep_function): if not hasattr(sweep_function, "__name__") and not sweep_name: raise ValueError( "Sweep function name cannot be accessed. Provide an " "explicit `sweep_name` instead." ) sweep_name = sweep_name or sweep_function.__name__ func = sweep_function self._data[sweep_name] = generator(self, func, **kwargs) else: sweep_name = sweep_name or sweep_function func = getattr(sweeps, sweep_function) self._data[sweep_name] = func(**kwargs)
[docs] def add_matelem_sweep( self, operator: Union[str, Qobj], sweep_name: str, subsystem: QuantumSystem = None, ) -> None: """Generate data for matrix elements with respect to a given operator, as a function of the sweep parameter(s) Parameters ---------- operator: name of the operator in question (str), or full operator in Qobj form sweep_name: The sweep data will be accessible as <ParameterSweep>[<sweep_name>] subsystem: subsystems for which to compute matrix elements, required if operator is given in str form Returns ------- None; results are saved as <ParameterSweep>[<sweep_name>] """ if isinstance(operator, str): operator_func = functools.partial( sweeps.bare_matrixelement, operator_name=operator, subsystem=subsystem, ) elif isinstance(operator, Qobj): operator_func = functools.partial( sweeps.dressed_matrixelement, operator=operator, ) else: raise TypeError( "Unrecognized type of operator for matrix elements; " "expected: str or Qobj." ) matrix_element_data = generator( self, operator_func, ) self._data[sweep_name] = matrix_element_data
[docs]class ParameterSweep( # type:ignore ParameterSweepBase, dispatch.DispatchClient, serializers.Serializable ): """ Create multi-dimensional parameter sweeps for a quantum system described by a `HilbertSpace` object. Parameters ---------- hilbertspace: `HilbertSpace` object describing the quantum system of interest paramvals_by_name: Dictionary which specifies a parameter name for each set of parameter values, and the set of values to be used in the sweep. update_hilbertspace: function that updates the associated `hilbertspace` object with a given set of parameters; signature is either `update_hilbertspace(paramval1, paramval2, ...)` or `update_hilbertspace(self, paramval1, paramval2, ...)` where `self` makes the `ParameterSweep` instance available, and thereby dict-like access to subsystems and interaction terms evals_count: number of dressed eigenvalues/eigenstates to keep. (The number of bare eigenvalues/eigenstates is determined for each subsystem by `truncated_dim`.) (default: 20) subsys_update_info: To speed up calculations, the user may provide information that specifies which subsystems are being updated for each of the given parameter sweeps. This information is specified by a dictionary of the following form:: { "<parameter name 1>": [<subsystem a>], "<parameter name 2>": [<subsystem b>, <subsystem c>, ...], ... } This indicates that changes in `<parameter name 1>` only require updates of `<subsystem a>` while leaving other subsystems unchanged. Similarly, sweeping `<parameter name 2>` affects `<subsystem b>`, `<subsystem c>` etc. bare_only: if set to True, only bare eigendata is calculated; useful when performing a sweep for a single quantum system, no interaction (default: False) ignore_low_overlap: if set to False (default), bare product states and dressed eigenstates are identified if `|<psi_bare|psi_dressed>|^2 > 0.5`; if True, then identification will always take place based on which bare product state has the maximum overlap autorun: Determines whether to directly run the sweep or delay it until `.run()` is called manually. (Default: `settings.AUTORUN_SWEEP=True`) deepcopy: if set to True, the parameter sweep is run with an exact copy of the Hilbert space; this ensures that all parameters after the sweep are identical to parameters before the sweep. Note: changing global HilbertSpace or QuantumSystem attributes will have no effect with this option; all updates must be made via `<ParameterSweep>.hilbertspace[<id_str>] = ...` If set to False (default), updates to global instances have the expected effect. The HilbertSpace object and all its constituent parts are left in the state reached by the very final parameter update. num_cpus: number of CPU cores requested for computing the sweep (default value `settings.NUM_CPUS`) Notes ----- `ParameterSweep` supports array-like access ("pre-slicing") and dict-like access. With dict-like access via string-keywords `<ParameterSweep>[<str>]`, the following data is returned: `"evals"` and `"evecs"` dressed eigenenergies and eigenstates as `NamedSlotsNdarray`; eigenstates are decomposed in the bare product-state basis of the non-interacting subsystems' eigenbases `"bare_evals"` and `"bare_evecs"` bare eigenenergies and eigenstates as `NamedSlotsNdarray` `"lamb"`, `"chi"`, and `"kerr"` dispersive energy coefficients `"<custom sweep>"` NamedSlotsNdarray for custom data generated with `add_sweep`. Array-like access is responsible for "pre-slicing", enable lookup functionality such as `<Sweep>[p1, p2, ...].eigensys()` """ def __init__( self, hilbertspace: HilbertSpace, paramvals_by_name: Dict[str, ndarray], update_hilbertspace: Callable, evals_count: int = 20, subsys_update_info: Optional[Dict[str, List[QuantumSystem]]] = None, bare_only: bool = False, ignore_low_overlap: bool = False, autorun: bool = settings.AUTORUN_SWEEP, deepcopy: bool = False, num_cpus: Optional[int] = None, ) -> None: num_cpus = num_cpus or settings.NUM_CPUS self._parameters = Parameters(paramvals_by_name) self._hilbertspace = hilbertspace self._evals_count = evals_count self._update_hilbertspace = self.set_update_func(update_hilbertspace) self._subsys_update_info = subsys_update_info self._data: Dict[str, Any] = {} self._bare_only = bare_only self._ignore_low_overlap = ignore_low_overlap self._deepcopy = deepcopy self._num_cpus = num_cpus self._out_of_sync = False self.reset_preslicing() dispatch.CENTRAL_DISPATCH.register("PARAMETERSWEEP_UPDATE", self) dispatch.CENTRAL_DISPATCH.register("HILBERTSPACE_UPDATE", self) global _faulty_interactionterm_warning_issued if ( self.faulty_interactionterm_suspected() and not _faulty_interactionterm_warning_issued ): warnings.warn( "The interactions specified for this HilbertSpace object involve coupling operators stored as fixed " "matrices. This may be unintended, as the operators of quantum systems (specifically, their " "representation with respect to some basis) may change as a function of sweep parameters. \nFor that " "reason, it is recommended to use coupling operators specified as callable functions.\n", UserWarning, ) _faulty_interactionterm_warning_issued = True if autorun: self.run() @property def tqdm_disabled(self) -> bool: return settings.PROGRESSBAR_DISABLED or (self._num_cpus > 1)
[docs] def faulty_interactionterm_suspected(self) -> bool: """Check if any interaction terms are specified as fixed matrices""" for interactionterm in self._hilbertspace.interaction_list: if isinstance(interactionterm, (ndarray, Qobj, csc_matrix)): return True for idx_operator in interactionterm.operator_list: if isinstance(idx_operator[1], (ndarray, Qobj, csc_matrix)): return True return False
def cause_dispatch(self) -> None: initial_parameters = tuple(paramvals[0] for paramvals in self._parameters) self._update_hilbertspace(self, *initial_parameters)
[docs] @classmethod def deserialize(cls, iodata: "IOData") -> "StoredSweep": pass
[docs] def serialize(self) -> "IOData": """ Convert the content of the current class instance into IOData format. Returns ------- IOData """ initdata = { "paramvals_by_name": self._parameters.ordered_dict, "hilbertspace": self._hilbertspace, "evals_count": self._evals_count, "_data": self._data, } iodata = serializers.dict_serialize(initdata) iodata.typename = "StoredSweep" return iodata
[docs] def run(self) -> None: """Create all sweep data: bare spectral data, dressed spectral data, lookup data and custom sweep data.""" # generate one dispatch before temporarily disabling CENTRAL_DISPATCH self._out_of_sync = False self._out_of_sync_warning_issued = False self._lookup_exists = True if self._deepcopy: instance_str = dill.dumps(self.hilbertspace) stored_hilbertspace = dill.loads(instance_str) self._hilbertspace = dill.loads(instance_str) else: self.cause_dispatch() settings.DISPATCH_ENABLED = False ( self._data["bare_evals"], self._data["bare_evecs"], self._data["circuit_esys"], ) = self._bare_spectrum_sweep() if not self._bare_only: self._data["evals"], self._data["evecs"] = self._dressed_spectrum_sweep() self._data["dressed_indices"] = self.generate_lookup() ( self._data["lamb"], self._data["chi"], self._data["kerr"], ) = self._dispersive_coefficients() if self._deepcopy: self._hilbertspace = stored_hilbertspace # restore original state settings.DISPATCH_ENABLED = True
def _bare_spectrum_sweep(self) -> Tuple[NamedSlotsNdarray, NamedSlotsNdarray]: """ The bare energy spectra are computed according to the following scheme. 1. Perform a loop over all subsystems to separately obtain the bare energy eigenvalues and eigenstates for each subsystems. 2. If `update_subsystem_info` is given, remove those sweeps that leave the subsystems fixed. 3. If self._num_cpus > 1, parallelize. Returns ------- NamedSlotsNdarray[<paramname1>, <paramname2>, ..., "subsys"] for evals, likewise for evecs; here, "subsys": 0, 1, ... enumerates subsystems and """ bare_evals = np.empty((self.subsystem_count,), dtype=object) bare_evecs = np.empty((self.subsystem_count,), dtype=object) # creating data arrays for subsystems, to store the esys for all subsystems when HD is used circuit_esys = np.empty((self.subsystem_count,), dtype=object) for subsys_index, subsystem in enumerate(self.hilbertspace): bare_esys = self._subsys_bare_spectrum_sweep(subsystem) if ( hasattr(subsystem, "hierarchical_diagonalization") and subsystem.hierarchical_diagonalization ): evals = np.empty_like(bare_esys[..., 0]) evecs = np.empty_like(bare_esys[..., 0]) for array_index, esys in np.ndenumerate(bare_esys[..., 0]): evals[array_index] = esys[0] evecs[array_index] = esys[1] else: evals = bare_esys[..., 0] evecs = bare_esys[..., 1] bare_evals[subsys_index] = NamedSlotsNdarray( np.asarray(evals.tolist()), self._parameters.paramvals_by_name, ) bare_evecs[subsys_index] = NamedSlotsNdarray( np.asarray(evecs.tolist()), self._parameters.paramvals_by_name, ) circuit_esys[subsys_index] = ( bare_esys # when param =(p0, p1, p2, ...), subsys i esys is circuit_esys[i][p0, p1, p3, ...] ) return ( NamedSlotsNdarray(bare_evals, {"subsys": np.arange(self.subsystem_count)}), NamedSlotsNdarray(bare_evecs, {"subsys": np.arange(self.subsystem_count)}), NamedSlotsNdarray( circuit_esys, {"subsys": np.arange(self.subsystem_count)} ), ) def _update_subsys_compute_esys( self, update_func: Callable, subsystem: QuantumSystem, paramval_tuple: Tuple[float], ) -> ndarray: update_func(self, *paramval_tuple) # use the Circuit method to return esys for all the subsystems when HD is used if isinstance(subsystem, (scq.Circuit, scq.core.circuit.Subsystem)): return subsystem.generate_bare_eigensys() evals, evecs = subsystem.eigensys(evals_count=subsystem.truncated_dim) esys_array = np.empty(shape=(2,), dtype=object) esys_array[0] = evals esys_array[1] = evecs return esys_array def _paramnames_no_subsys_update(self, subsystem) -> List[str]: if self._subsys_update_info is None: return [] updating_parameters = [ name for name in self._subsys_update_info.keys() if subsystem in self._subsys_update_info[name] ] return list(set(self._parameters.names) - set(updating_parameters)) def _subsys_bare_spectrum_sweep(self, subsystem) -> ndarray: """ Parameters ---------- subsystem: subsystem for which the bare spectrum sweep is to be computed Returns ------- multidimensional array of the format array[p1, p2, p3, ..., pN] = np.asarray[[evals, evecs]] """ fixed_paramnames = self._paramnames_no_subsys_update(subsystem) reduced_parameters = self._parameters.create_reduced(fixed_paramnames) total_count = np.prod([len(param_vals) for param_vals in reduced_parameters]) target_map = cpu_switch.get_map_method(self._num_cpus) with utils.InfoBar( "Parallel compute bare eigensys for subsystem {} [num_cpus={}]".format( subsystem.id_str, self._num_cpus ), self._num_cpus, ) as p: bare_eigendata = tqdm( target_map( functools.partial( self._update_subsys_compute_esys, self._update_hilbertspace, subsystem, ), itertools.product(*reduced_parameters.paramvals_list), ), total=total_count, desc="Bare spectra", leave=False, disable=self.tqdm_disabled, ) bare_eigendata = np.asarray(list(bare_eigendata), dtype=object) bare_eigendata = bare_eigendata.reshape((*reduced_parameters.counts, 2)) # Bare spectral data was only computed once for each parameter that has no # update effect on the subsystems. Now extend the array to reflect this # for the full parameter array by repeating for name in fixed_paramnames: index = self._parameters.index_by_name[name] param_count = self._parameters.counts[index] bare_eigendata = np.repeat(bare_eigendata, param_count, axis=index) return bare_eigendata def _update_and_compute_dressed_esys( self, hilbertspace: HilbertSpace, evals_count: int, update_func: Callable, paramindex_tuple: Tuple[int], ) -> ndarray: paramval_tuple = self._parameters[paramindex_tuple] update_func(self, *paramval_tuple) assert self._data is not None bare_esys: Dict[int, List[ndarray]] = { subsys_index: [ self._data["bare_evals"][subsys_index][paramindex_tuple], self._data["bare_evecs"][subsys_index][paramindex_tuple], ] for subsys_index, _ in enumerate(self.hilbertspace) } # update the lookuptables for subsystems using hierarchical diagonalization for subsys_index, subsys in enumerate(hilbertspace.subsystem_list): if ( hasattr(subsys, "hierarchical_diagonalization") and subsys.hierarchical_diagonalization ): subsys.set_bare_eigensys( self._data["circuit_esys"][subsys_index][paramindex_tuple] ) if hasattr( hilbertspace.subsystem_list[0], "parent" ): # update necessary interactions and attributes hilbertspace.subsystem_list[0].parent.update(calculate_bare_esys=False) evals, evecs = hilbertspace.eigensys( evals_count=evals_count, bare_esys=bare_esys # type:ignore ) esys_array = np.empty(shape=(2,), dtype=object) esys_array[0] = evals esys_array[1] = evecs return esys_array def _dressed_spectrum_sweep( self, ) -> Tuple[NamedSlotsNdarray, NamedSlotsNdarray]: """ Returns ------- NamedSlotsNdarray[<paramname1>, <paramname2>, ...] of eigenvalues, likewise for eigenvectors """ target_map = cpu_switch.get_map_method(self._num_cpus) total_count = np.prod(self._parameters.counts) with utils.InfoBar( "Parallel compute dressed eigensys [num_cpus={}]".format(self._num_cpus), self._num_cpus, ) as p: spectrum_data = list( tqdm( target_map( functools.partial( self._update_and_compute_dressed_esys, self._hilbertspace, self._evals_count, self._update_hilbertspace, ), itertools.product(*self._parameters.ranges), ), total=total_count, desc="Dressed spectrum", leave=False, disable=self.tqdm_disabled, ) ) spectrum_data_ndarray = np.asarray(spectrum_data, dtype=object) spectrum_data_ndarray = spectrum_data_ndarray.reshape( (*self._parameters.counts, 2) ) slotparamvals_by_name = OrderedDict(self._parameters.ordered_dict.copy()) evals = np.asarray(spectrum_data_ndarray[..., 0].tolist()) evecs = spectrum_data_ndarray[..., 1] return ( NamedSlotsNdarray(evals, slotparamvals_by_name), NamedSlotsNdarray(evecs, slotparamvals_by_name), ) def _energies_1(self, subsys): bare_label = np.zeros(len(self.hilbertspace)) bare_label[self.get_subsys_index(subsys)] = 1 energies_all_l = np.empty(self._parameters.counts + (subsys.truncated_dim,)) for l in range(subsys.truncated_dim): energies_all_l[..., l] = self[:].energy_by_bare_index(tuple(l * bare_label)) return energies_all_l def _energies_2(self, subsys1, subsys2): bare_label1 = np.zeros(len(self.hilbertspace)) bare_label1[self.get_subsys_index(subsys1)] = 1 bare_label2 = np.zeros(len(self.hilbertspace)) bare_label2[self.get_subsys_index(subsys2)] = 1 energies_all_l1_l2 = np.empty( self._parameters.counts + (subsys1.truncated_dim,) + (subsys2.truncated_dim,) ) for l1 in range(subsys1.truncated_dim): for l2 in range(subsys2.truncated_dim): energies_all_l1_l2[..., l1, l2] = self[:].energy_by_bare_index( tuple(l1 * bare_label1 + l2 * bare_label2) ) return energies_all_l1_l2 def _dispersive_coefficients( self, ) -> Tuple[NamedSlotsNdarray, NamedSlotsNdarray, NamedSlotsNdarray]: energy_0 = self[:].energy_by_dressed_index(0).toarray() lamb_data = np.empty(self.subsystem_count, dtype=object) kerr_data = np.empty((self.subsystem_count, self.subsystem_count), dtype=object) chi_data = np.empty((self.subsystem_count, self.subsystem_count), dtype=object) # Lamb shifts for subsys_index1, subsys1 in enumerate(self.hilbertspace): energy_subsys1_all_l1 = self._energies_1(subsys1) bare_energy_subsys1_all_l1 = self["bare_evals"][subsys_index1].toarray() lamb_subsys1_all_l1 = ( energy_subsys1_all_l1 - energy_0[..., None] - bare_energy_subsys1_all_l1 + bare_energy_subsys1_all_l1[..., 0][..., None] ) lamb_data[subsys_index1] = NamedSlotsNdarray( lamb_subsys1_all_l1, self._parameters.paramvals_by_name ) # Kerr and ac Stark for subsys_index1, subsys1 in enumerate(self.hilbertspace): energy_subsys1_all_l1 = self._energies_1(subsys1) for subsys_index2, subsys2 in enumerate(self.hilbertspace): energy_subsys2_all_l2 = self._energies_1(subsys2) energy_subsys1_subsys2_all_l1_l2 = self._energies_2(subsys1, subsys2) kerr_subsys1_subsys2_all_l1_l2 = ( energy_subsys1_subsys2_all_l1_l2 + energy_0[..., None, None] - energy_subsys1_all_l1[..., :, None] - energy_subsys2_all_l2[..., None, :] ) # self-Kerr and cross-Kerr: oscillator modes if subsys1 in self.osc_subsys_list and subsys2 in self.osc_subsys_list: if subsys1 is subsys2: # oscillator self-Kerr kerr_subsys1_subsys2_all_l1_l2 /= 2.0 # osc self-Kerr: 1/2 kerr_data[subsys_index1, subsys_index2] = NamedSlotsNdarray( kerr_subsys1_subsys2_all_l1_l2[..., 1, 1], self._parameters.paramvals_by_name, ) chi_data[subsys_index1, subsys_index2] = np.asarray([]) # self-Kerr and cross-Kerr: qubit modes elif ( subsys1 in self.qbt_subsys_list and subsys2 in self.qbt_subsys_list ): kerr_data[subsys_index1, subsys_index2] = NamedSlotsNdarray( kerr_subsys1_subsys2_all_l1_l2, self._parameters.paramvals_by_name, ) chi_data[subsys_index1, subsys_index2] = np.asarray([]) # ac Stark shifts else: if subsys1 in self.qbt_subsys_list: chi_data[subsys_index1, subsys_index2] = NamedSlotsNdarray( kerr_subsys1_subsys2_all_l1_l2[..., 1, :], self._parameters.paramvals_by_name, ) else: chi_data[subsys_index1, subsys_index2] = NamedSlotsNdarray( kerr_subsys1_subsys2_all_l1_l2[..., :, 1], self._parameters.paramvals_by_name, ) kerr_data[subsys_index1, subsys_index2] = np.asarray([]) sys_indices = np.arange(self.subsystem_count) lamb_data = NamedSlotsNdarray(lamb_data, {"subsys": sys_indices}) kerr_data = NamedSlotsNdarray( kerr_data, {"subsys1": sys_indices, "subsys2": sys_indices} ) chi_data = NamedSlotsNdarray( chi_data, {"subsys1": sys_indices, "subsys2": sys_indices} ) return lamb_data, chi_data, kerr_data
[docs]class StoredSweep( ParameterSweepBase, dispatch.DispatchClient, serializers.Serializable ): _parameters = descriptors.WatchedProperty(Parameters, "PARAMETERSWEEP_UPDATE") _evals_count = descriptors.WatchedProperty(int, "PARAMETERSWEEP_UPDATE") _data = descriptors.WatchedProperty(Dict[str, ndarray], "PARAMETERSWEEP_UPDATE") _hilbertspace: HilbertSpace def __init__( self, paramvals_by_name: Dict[str, ndarray], hilbertspace: HilbertSpace, evals_count: int, _data, ) -> None: self._lookup_exists = True self._parameters = Parameters(paramvals_by_name) self._hilbertspace = hilbertspace self._evals_count = evals_count self._data = _data self._out_of_sync = False self._current_param_indices: NpIndices = slice(None, None, None)
[docs] @classmethod def deserialize(cls, iodata: "IOData") -> "StoredSweep": """ Take the given IOData and return an instance of the described class, initialized with the data stored in io_data. Parameters ---------- iodata: IOData Returns ------- StoredSweep """ return StoredSweep(**iodata.as_kwargs())
[docs] def serialize(self) -> "IOData": pass
def get_hilbertspace(self) -> HilbertSpace: return self.hilbertspace def new_sweep( self, paramvals_by_name: Dict[str, ndarray], update_hilbertspace: Callable, evals_count: int = 6, subsys_update_info: Optional[Dict[str, List[QuantumSystem]]] = None, autorun: bool = settings.AUTORUN_SWEEP, num_cpus: Optional[int] = None, ) -> ParameterSweep: return ParameterSweep( self.hilbertspace, paramvals_by_name, update_hilbertspace, evals_count=evals_count, subsys_update_info=subsys_update_info, autorun=autorun, num_cpus=num_cpus, )
[docs]def generator(sweep: "ParameterSweepBase", func: Callable, **kwargs) -> np.ndarray: """Method for computing custom data as a function of the external parameter, calculated via the function `func`. Parameters ---------- sweep: ParameterSweep object containing HilbertSpace and spectral information func: signature: `func(parametersweep, paramindex_tuple, paramvals_tuple, **kwargs)`, specifies how to calculate the data for a single choice of parameter(s) **kwargs: keyword arguments to be included in func Returns ------- array of custom data """ # obtain reduced parameters from pre-slicing info reduced_parameters = sweep._parameters.create_sliced( sweep._current_param_indices, remove_fixed=False ) total_count = np.prod(reduced_parameters.counts) def func_effective(paramindex_tuple: Tuple[int], params, **kw) -> Any: paramvals_tuple = params[paramindex_tuple] return func( sweep, paramindex_tuple=paramindex_tuple, paramvals_tuple=paramvals_tuple, **kw, ) if hasattr(func, "__name__"): func_name = func.__name__ else: func_name = "" data_array = list( tqdm( map( functools.partial( func_effective, params=reduced_parameters, **kwargs, ), itertools.product(*reduced_parameters.ranges), ), total=total_count, desc=f"sweeping {func_name}", leave=False, disable=settings.PROGRESSBAR_DISABLED, ) ) element_shape: Tuple[int, ...] = tuple() if isinstance(data_array[0], np.ndarray): element_shape = data_array[0].shape data_ndarray = np.asarray(data_array) return NamedSlotsNdarray( data_ndarray.reshape(reduced_parameters.counts + element_shape), reduced_parameters.paramvals_by_name, )