diff --git a/src/qibocal/protocols/__init__.py b/src/qibocal/protocols/__init__.py index 9ead06f72..0b43c6f70 100644 --- a/src/qibocal/protocols/__init__.py +++ b/src/qibocal/protocols/__init__.py @@ -27,7 +27,7 @@ from .flux_dependence.qubit_flux_tracking import qubit_flux_tracking from .flux_dependence.resonator_crosstalk import resonator_crosstalk from .flux_dependence.resonator_flux_dependence import resonator_flux -from .qua import rb_ondevice +from .qua import rb_ondevice, rb_qua_two_qubit from .qubit_power_spectroscopy import qubit_power_spectroscopy from .qubit_spectroscopy import qubit_spectroscopy from .qubit_spectroscopy_ef import qubit_spectroscopy_ef @@ -152,4 +152,5 @@ "optimize_two_qubit_gate", "ramsey_zz", "rb_ondevice", + "rb_qua_two_qubit", ] diff --git a/src/qibocal/protocols/qua/__init__.py b/src/qibocal/protocols/qua/__init__.py index deeb29b40..d4b4df2e0 100644 --- a/src/qibocal/protocols/qua/__init__.py +++ b/src/qibocal/protocols/qua/__init__.py @@ -1 +1,2 @@ from .rb_ondevice import rb_ondevice +from .rb_two_qubit import rb_qua_two_qubit diff --git a/src/qibocal/protocols/qua/rb_two_qubit.py b/src/qibocal/protocols/qua/rb_two_qubit.py new file mode 100644 index 000000000..d56c1837b --- /dev/null +++ b/src/qibocal/protocols/qua/rb_two_qubit.py @@ -0,0 +1,215 @@ +from dataclasses import dataclass +from typing import Optional + +import mpld3 +import numpy as np +from qibolab.platform import Platform +from qibolab.qubits import QubitId, QubitPairId +from qm.qua import align, assign, declare, dual_demod, fixed, measure, save, wait +from qualang_tools.bakery.bakery import Baking + +from qibocal.auto.operation import Parameters, Results, Routine + +from .configuration import generate_config +from .two_qubit_rb import QuaTwoQubitRbData, TwoQubitRb + + +@dataclass +class QuaTwoQubitRbParameters(Parameters): + circuit_depths: list[int] + """How many consecutive Clifford gates within one executed circuit + + (https://qiskit.org/documentation/apidoc/circuit.html) + """ + num_circuits_per_depth: int + """How many random circuits within one depth.""" + num_shots_per_circuit: int + """Repetitions of the same circuit (averaging).""" + debug: Optional[str] = None + """Dump QUA script and config in a file with this name.""" + + +def _acquisition( + params: QuaTwoQubitRbParameters, platform: Platform, targets: list[QubitPairId] +) -> QuaTwoQubitRbData: + assert len(targets) == 1 + qubit1, qubit2 = targets[0] + try: + qubit1 = int(qubit1) + qubit2 = int(qubit2) + except ValueError: + pass + + cz_sequence, phases = platform.pairs[(qubit1, qubit2)].native_gates.CZ.sequence() + if cz_sequence[0].qubit != qubit1: + if cz_sequence[0].qubit == qubit2: + qubit1, qubit2 = qubit2, qubit1 + else: + raise ValueError("Flux pulse on different qubit not supported.") + + ############################## + ## General helper functions ## + ############################## + def prep(): + # thermal preparation in clock cycles + wait(params.relaxation_time // 4) + align() + + def multiplexed_readout(I, I_st, Q, Q_st, qubits): + """Perform multiplexed readout on two resonators""" + for ind, qb in enumerate(qubits): + measure( + "measure", + f"readout{qb}", + None, + dual_demod.full("cos", "out1", "sin", "out2", I[ind]), + dual_demod.full("minus_sin", "out1", "cos", "out2", Q[ind]), + ) + if I_st is not None: + save(I[ind], I_st[ind]) + if Q_st is not None: + save(Q[ind], Q_st[ind]) + + def discriminate(target, I, Q, state): + threshold = platform.qubits[target].threshold + iq_angle = platform.qubits[target].iq_angle + cos = np.cos(iq_angle) + sin = np.sin(iq_angle) + assign(state, I * cos - Q * sin > threshold) + + def meas(): + I1 = declare(fixed) + I2 = declare(fixed) + Q1 = declare(fixed) + Q2 = declare(fixed) + state1 = declare(bool) + state2 = declare(bool) + # readout macro for multiplexed readout + multiplexed_readout([I1, I2], None, [Q1, Q2], None, qubits=[qubit1, qubit2]) + discriminate(qubit1, I1, Q1, state1) + discriminate(qubit2, I2, Q2, state2) + return state1, state2 + + ############################## + ## Two-qubit RB functions ## + ############################## + # single qubit generic gate constructor Z^{z}Z^{a}X^{x}Z^{-a} + # that can reach any point on the Bloch sphere (starting from arbitrary points) + def bake_phased_xz(baker: Baking, q, x, z, a): + if q == 1: + element = f"drive{qubit1}" + else: + element = f"drive{qubit2}" + + baker.frame_rotation_2pi(a / 2, element) + baker.play("x180", element, amp=x) + baker.frame_rotation_2pi(-(a + z) / 2, element) + + # single qubit phase corrections in units of 2pi applied after the CZ gate + qubit1_frame_update = phases[qubit1] / (2 * np.pi) + qubit2_frame_update = phases[qubit2] / (2 * np.pi) + + # defines the CZ gate that realizes the mapping |00> -> |00>, |01> -> |01>, |10> -> |10>, |11> -> -|11> + def bake_cz(baker: Baking, q1, q2): + q1_xy_element = f"drive{qubit1}" + q2_xy_element = f"drive{qubit2}" + q1_z_element = f"flux{qubit1}" + + baker.play("cz", q1_z_element) + baker.align() + baker.frame_rotation_2pi(qubit1_frame_update, q1_xy_element) + baker.frame_rotation_2pi(qubit2_frame_update, q2_xy_element) + baker.align() + + ############################## + ## Two-qubit RB execution ## + ############################## + controller = platform._controller + qmm = controller.manager + + # create RB experiment from configuration and defined functions + config = generate_config(platform, platform.qubits.keys(), targets=[qubit1, qubit2]) + + # for debugging when there is an error + # from qm import generate_qua_script + # from qm.qua import program + # with open("rb2q_qua_config.py", "w") as file: + # with program() as prog: + # align() + # file.write(generate_qua_script(prog, config)) + + rb = TwoQubitRb( + config=config, + single_qubit_gate_generator=bake_phased_xz, + two_qubit_gate_generators={ + "CZ": bake_cz + }, # can also provide e.g. "CNOT": bake_cnot + prep_func=prep, + measure_func=meas, + interleaving_gate=None, + # interleaving_gate=[cirq.CZ(cirq.LineQubit(0), cirq.LineQubit(1))], + verify_generation=False, + ) + + data = rb.run( + qmm, + circuit_depths=params.circuit_depths, + num_circuits_per_depth=params.num_circuits_per_depth, + num_shots_per_circuit=params.num_shots_per_circuit, + offsets=[ + (qb.flux.name, qb.sweetspot) + for qb in platform.qubits.values() + if qb.flux is not None + ], + debug=params.debug, + ) + + # verify/save the random sequences created during the experiment + # rb.save_sequences_to_file( + # "sequences.txt" + # ) # saves the gates used in each random sequence + # rb.save_command_mapping_to_file( + # "commands.txt" + # ) # saves mapping from "command id" to sequence + # rb.print_sequence() + # rb.print_command_mapping() + # rb.verify_sequences() # simulates random sequences to ensure they recover to ground state. takes a while... + return data + + +@dataclass +class QuaTwoQubitRbResults(Results): + pass + + +def _fit(data: QuaTwoQubitRbData) -> QuaTwoQubitRbResults: + return QuaTwoQubitRbResults() + + +def _plot(data: QuaTwoQubitRbData, target: QubitId, fit: QuaTwoQubitRbResults): + fitting_report = "" + + figures = [ + data.plot_hist(n_cols=6, figsize=(12, len(data.circuit_depths) / 2)), + data.plot_with_fidelity(figsize=(12, 6)), + ] + + figures = [mpld3.fig_to_html(fig) for fig in figures] + return figures, fitting_report + + +# # get the interleaved gate fidelity +# from two_qubit_rb.RBResult import get_interleaved_gate_fidelity +# interleaved_gate_fidelity = get_interleaved_gate_fidelity( +# num_qubits=2, +# reference_alpha=0.12345, # replace with value from prior, non-interleaved experiment +# # interleaved_alpha=res.fit_exponential()[1], # alpha from the interleaved experiment +# ) +# print(f"Interleaved Gate Fidelity: {interleaved_gate_fidelity*100:.3f}") + + +def _update(results: QuaTwoQubitRbResults, platform: Platform, target: QubitId): + pass + + +rb_qua_two_qubit = Routine(_acquisition, _fit, _plot, _update) diff --git a/src/qibocal/protocols/qua/two_qubit_rb/RBBaker.py b/src/qibocal/protocols/qua/two_qubit_rb/RBBaker.py new file mode 100644 index 000000000..b9d2fdd07 --- /dev/null +++ b/src/qibocal/protocols/qua/two_qubit_rb/RBBaker.py @@ -0,0 +1,179 @@ +import copy +import json +from typing import Callable, Dict, List, Optional + +import cirq +from cirq import GateOperation +from qm.qua import align, case_, declare, for_, switch_ +from qualang_tools.bakery.bakery import Baking, baking +from tqdm import tqdm + +from .gates import GateGenerator, gate_db +from .verification.command_registry import CommandRegistry + + +class RBBaker: + def __init__( + self, + config, + single_qubit_gate_generator: Callable, + two_qubit_gate_generators: Dict[str, Callable], + interleaving_gate: Optional[List[cirq.GateOperation]] = None, + command_registry: Optional[CommandRegistry] = None, + ): + self._command_registry = command_registry + self._config = copy.deepcopy(config) + self._single_qubit_gate_generator = single_qubit_gate_generator + self._two_qubit_gate_generators = two_qubit_gate_generators + self._interleaving_gate = interleaving_gate + self._symplectic_generator = GateGenerator( + set(two_qubit_gate_generators.keys()) + ) + self._all_elements = self._collect_all_elements() + self._cmd_to_op = {} + self._op_to_baking = {} + + @property + def all_elements(self): + return self._all_elements + + @staticmethod + def _get_qubits(op): + return [q.x for q in op.qubits] + + @staticmethod + def _get_phased_xz_args(op): + return op.gate.x_exponent, op.gate.z_exponent, op.gate.axis_phase_exponent + + def _validate_two_qubit_gate_available(self, name): + if name not in self._two_qubit_gate_generators: + raise RuntimeError(f"Two qubit gate '{name}' implementation not provided.") + + def _gen_gate(self, baker: Baking, gate_op: GateOperation): + if type(gate_op.gate) == cirq.PhasedXZGate: + self._single_qubit_gate_generator( + baker, self._get_qubits(gate_op)[0], *self._get_phased_xz_args(gate_op) + ) + elif type(gate_op.gate) == cirq.ISwapPowGate and gate_op.gate.exponent == 0.5: + self._validate_two_qubit_gate_available("sqr_iSWAP") + self._two_qubit_gate_generators["sqr_iSWAP"]( + baker, *self._get_qubits(gate_op) + ) + elif type(gate_op.gate) == cirq.CNotPowGate and gate_op.gate.exponent == 1: + self._validate_two_qubit_gate_available("CNOT") + self._two_qubit_gate_generators["CNOT"](baker, *self._get_qubits(gate_op)) + elif type(gate_op.gate) == cirq.CZPowGate and gate_op.gate.exponent == 1: + self._validate_two_qubit_gate_available("CZ") + self._two_qubit_gate_generators["CZ"](baker, *self._get_qubits(gate_op)) + else: + raise RuntimeError("unsupported gate") + + def _collect_all_elements(self): + config = copy.deepcopy(self._config) + qes = set() + for cmd_id, command in enumerate(gate_db.commands): + if self._command_registry is not None: + self._command_registry.set_current_command_id(cmd_id) + with baking(config) as b: + self._update_baking_from_cmd_id(b, cmd_id) + qes.update(b.get_qe_set()) + b.update_config = False + if self._interleaving_gate is not None: + if self._command_registry is not None: + self._command_registry.set_current_command_id(cmd_id + 1) + with baking(config) as b: + self._update_baking_from_gates(b, self._interleaving_gate) + qes.update(b.get_qe_set()) + b.update_config = False + if self._command_registry is not None: + self._command_registry.finish() + return qes + + def _update_baking_from_gates(self, b: Baking, gate_ops, elements=None): + prev_gate_qubits = [] + for gate_op in gate_ops: + gate_qubits = self._get_qubits(gate_op) + if len(gate_qubits) != len(prev_gate_qubits) and elements is not None: + b.align(*elements) + prev_gate_qubits = gate_qubits + self._gen_gate(b, gate_op) + if elements is not None: + b.align(*elements) + + def gates_from_cmd_id(self, cmd_id): + if 0 <= cmd_id < len(gate_db.commands): + gate_ops = self._symplectic_generator.generate(cmd_id) + elif self._interleaving_gate is not None and cmd_id == len( + gate_db.commands + ): # Interleaving gate + gate_ops = self._interleaving_gate + else: + raise RuntimeError("command out of range") + return gate_ops + + def _update_baking_from_cmd_id(self, b: Baking, cmd_id, elements=None): + gate_ops = self.gates_from_cmd_id(cmd_id) + return self._update_baking_from_gates(b, gate_ops, elements) + + @staticmethod + def _unique_baker_identifier_for_qe(b: Baking, qe: str): + identifier = {"samples": b._samples_dict[qe], "info": b._qe_dict[qe]} + return json.dumps(identifier) + + def _bake_all_ops(self, config: dict): + waveform_id_per_qe = {qe: 0 for qe in self._all_elements} + waveform_to_baking = {qe: {} for qe in self._all_elements} + cmd_to_op = {qe: {} for qe in self._all_elements} + op_to_baking = {qe: [] for qe in self._all_elements} + num_of_commands = len(gate_db.commands) + ( + 0 if self._interleaving_gate is None else 1 + ) + for cmd_id in tqdm( + range(num_of_commands), + desc="Baking pulses which comprise Cliffords", + unit="command", + ): + with baking(config) as b: + self._update_baking_from_cmd_id(b, cmd_id, self._all_elements) + any_qe_used = False + for qe in self._all_elements: + key = self._unique_baker_identifier_for_qe(b, qe) + if key not in waveform_to_baking[qe]: + waveform_to_baking[qe][key] = waveform_id_per_qe[qe], b + op_to_baking[qe].append(b) + waveform_id_per_qe[qe] += 1 + any_qe_used = True + cmd_to_op[qe][cmd_id] = waveform_to_baking[qe][key][0] + b.update_config = any_qe_used + return cmd_to_op, op_to_baking + + def bake(self) -> dict: + config = copy.deepcopy(self._config) + self._cmd_to_op, self._op_to_baking = self._bake_all_ops(config) + return config + + def decode(self, cmd_id, element): + return self._cmd_to_op[element][cmd_id] + + @staticmethod + def _run_baking_for_qe(b: Baking, qe: str): + orig_get_qe_set = b.get_qe_set + b.get_qe_set = lambda: {qe} + b.run() + b.get_qe_set = orig_get_qe_set + + def run(self, op_list_per_qe: dict, length, unsafe=True): + if set(op_list_per_qe.keys()) != self._all_elements: + raise RuntimeError( + f"must specify ops for all elements: {', '.join(self._all_elements)} " + ) + + align() + for qe, op_list in op_list_per_qe.items(): + cmd_i = declare(int) + with for_(cmd_i, 0, cmd_i < length, cmd_i + 1): + with switch_(op_list[cmd_i], unsafe=unsafe): + for op_id, b in enumerate(self._op_to_baking[qe]): + with case_(op_id): + self._run_baking_for_qe(b, qe) + align() diff --git a/src/qibocal/protocols/qua/two_qubit_rb/RBResult.py b/src/qibocal/protocols/qua/two_qubit_rb/RBResult.py new file mode 100644 index 000000000..e1dca31f5 --- /dev/null +++ b/src/qibocal/protocols/qua/two_qubit_rb/RBResult.py @@ -0,0 +1,177 @@ +from dataclasses import dataclass + +import numpy as np +import numpy.typing as npt +import xarray as xr +from matplotlib import pyplot as plt +from scipy.optimize import curve_fit + +from qibocal.auto.operation import Data + + +@dataclass +class QuaTwoQubitRbData(Data): + """ + Class for analyzing and visualizing the results of a Randomized Benchmarking (RB) experiment. + + Attributes: + circuit_depths (list[int]): List of circuit depths used in the RB experiment. + num_repeats (int): Number of repeated sequences at each circuit depth. + num_averages (int): Number of averages for each sequence. + state (np.ndarray): Measured states from the RB experiment. + """ + + circuit_depths: list[int] + num_repeats: int + num_averages: int + data: dict[int, npt.NDArray] + + def __post_init__(self): + """ + Initializes the xarray Dataset to store the RB experiment data. + """ + self.xdata = xr.Dataset( + data_vars={"state": (["circuit_depth", "repeat", "average"], self.data[0])}, + coords={ + "circuit_depth": self.circuit_depths, + "repeat": range(self.num_repeats), + "average": range(self.num_averages), + }, + ) + + def plot_hist(self, n_cols=3, figsize=None): + """ + Plots histograms of the N-qubit state distribution at each circuit depth. + + Args: + n_cols (int): Number of columns in the plot grid. Adjusted if fewer circuit depths are provided. + """ + if len(self.circuit_depths) < n_cols: + n_cols = len(self.circuit_depths) + n_rows = max(int(np.ceil(len(self.circuit_depths) / n_cols)), 1) + fig = plt.figure(figsize=figsize) + for i, circuit_depth in enumerate(self.circuit_depths, start=1): + ax = plt.subplot(n_rows, n_cols, i) + self.xdata.state.sel(circuit_depth=circuit_depth).plot.hist( + ax=ax, xticks=range(4) + ) + plt.tight_layout() + return fig + + def plot(self): + """ + Plots the raw recovery probability decay curve as a function of circuit depth. + The curve is plotted using the averaged probability and without any fitting. + """ + recovery_probability = (self.xdata.state == 0).sum(("repeat", "average")) / ( + self.num_repeats * self.num_averages + ) + recovery_probability.rename("Recovery Probability").plot.line() + + def plot_with_fidelity(self, figsize=None): + """ + Plots the RB fidelity as a function of circuit depth, including a fit to an exponential decay model. + The fitted curve is overlaid with the raw data points. + """ + A, alpha, B = self.fit_exponential() + fidelity = self.get_fidelity(alpha) + + fig = plt.figure(figsize=figsize) + plt.plot(self.circuit_depths, self.get_decay_curve(), "o", label="Data") + plt.plot( + self.circuit_depths, + rb_decay_curve(np.array(self.circuit_depths), A, alpha, B), + "-", + label=f"Fidelity={fidelity*100:.3f}%\nalpha={alpha:.4f}", + ) + plt.xlabel("Circuit Depth") + plt.ylabel("Fidelity") + plt.title("2Q Randomized Benchmarking Fidelity") + plt.legend() + return fig + + def fit_exponential(self): + """ + Fits the decay curve of the RB data to an exponential model. + + Returns: + tuple: Fitted parameters (A, alpha, B) where: + - A is the amplitude. + - alpha is the decay constant. + - B is the offset. + """ + decay_curve = self.get_decay_curve() + + popt, _ = curve_fit( + rb_decay_curve, + self.circuit_depths, + decay_curve, + # p0=[0.75, -0.1, 0.25], + maxfev=10000, + ) + A, alpha, B = popt + + return A, alpha, B + + def get_fidelity(self, alpha): + """ + Calculates the average fidelity per Clifford based on the decay constant. + + Args: + alpha (float): Decay constant from the exponential fit. + + Returns: + float: Estimated average fidelity per Clifford. + """ + n_qubits = 2 # Assuming 2 qubits as per the context + d = 2**n_qubits + r = 1 - alpha - (1 - alpha) / d + fidelity = 1 - r + + return fidelity + + def get_decay_curve(self): + """ + Calculates the decay curve from the RB data. + + Returns: + np.ndarray: Decay curve representing the fidelity as a function of circuit depth. + """ + return (self.xdata.state == 0).sum(("repeat", "average")) / ( + self.num_repeats * self.num_averages + ) + + +def rb_decay_curve(x, A, alpha, B): + """ + Exponential decay model for RB fidelity. + + Args: + x (array-like): Circuit depths. + A (float): Amplitude of the decay. + alpha (float): Decay constant. + B (float): Offset of the curve. + + Returns: + np.ndarray: Calculated decay curve. + """ + return A * alpha**x + B + + +def get_interleaved_gate_fidelity( + num_qubits: int, reference_alpha: float, interleaved_alpha: float +): + """ + Calculates the interleaved gate fidelity using the formula from https://arxiv.org/pdf/1210.7011. + + Args: + num_qubits (int): Number of qubits involved. + reference_alpha (float): Decay constant from the reference RB experiment. + interleaved_alpha (float): Decay constant from the interleaved RB experiment. + + Returns: + float: Estimated interleaved gate fidelity. + """ + return 1 - ( + (2**num_qubits - 1) * (1 - interleaved_alpha / reference_alpha) / 2**num_qubits + ) diff --git a/src/qibocal/protocols/qua/two_qubit_rb/TwoQubitRB.py b/src/qibocal/protocols/qua/two_qubit_rb/TwoQubitRB.py new file mode 100644 index 000000000..d074025b6 --- /dev/null +++ b/src/qibocal/protocols/qua/two_qubit_rb/TwoQubitRB.py @@ -0,0 +1,341 @@ +from pathlib import Path +from typing import Callable, Dict, List, Literal, Optional, Tuple, Union + +import cirq +import numpy as np +from qm import QuantumMachinesManager, generate_qua_script +from qm.jobs.running_qm_job import RunningQmJob +from qm.qua import * # nopycln: import +from qualang_tools.bakery.bakery import Baking + +from .gates import GateGenerator, gate_db, tableau_from_cirq +from .RBBaker import RBBaker +from .RBResult import QuaTwoQubitRbData +from .simple_tableau import SimpleTableau +from .util import pbar, run_in_thread +from .verification.command_registry import ( + CommandRegistry, + decorate_single_qubit_generator_with_command_recording, + decorate_two_qubit_gate_generator_with_command_recording, +) +from .verification.sequence_tracker import SequenceTracker + + +class TwoQubitRb: + _buffer_length = 4096 + + def __init__( + self, + config: dict, + single_qubit_gate_generator: Callable[[Baking, int, float, float, float], None], + two_qubit_gate_generators: Dict[ + Literal["sqr_iSWAP", "CNOT", "CZ"], Callable[[Baking, int, int], None] + ], + prep_func: Callable[[], None], + measure_func: Callable[[], Tuple], + verify_generation: bool = False, + interleaving_gate: Optional[List[cirq.GateOperation]] = None, + ): + """ + A class for running two qubit randomized benchmarking experiments. + + This class is used to generate the experiment configuration and run the experiment. + The experiment is run by calling the run method. + + Gate generation is performed using the `Baking`[https://github.com/qua-platform/py-qua-tools/blob/main/qualang_tools/bakery/README.md] class. + This class adds to QUA the ability to generate arbitrary waveforms ("baked waveforms") using syntax similar to QUA. + + Args: + config: A QUA configuration containing the configuration for the experiment. + + single_qubit_gate_generator: A callable used to generate a single qubit gate using a signature similar to `phasedXZ`[https://quantumai.google/reference/python/cirq/PhasedXZGate]. + This is done using the baking object (see above). + Note that this allows us to execute every type of single qubit gate. + Callable arguments: + baking: The baking object. + qubit: The qubit number. + x: The x rotation exponent. + z: The z rotation exponent. + a: the axis phase exponent. + + two_qubit_gate_generators: A dictionary mapping one or more two qubit gate names to callables used to generate those gates. + This is done using the baking object (see above). + Callable arguments: + baking: The baking object. + qubit1: The first qubit number. + qubit2: The second qubit number. + This callable should generate a two qubit gate. + + prep_func: A callable used to reset the qubits to the |00> state. This function does not use the baking object, and is a proper QUA code macro. + Callable arguments: None + + measure_func: A callable used to measure the qubits. This function does not use the baking object, and is a proper QUA code macro. + Callable[[], Tuple[_Expression, _Expression]]: A tuple containing the measured values of the two qubits as Qua expressions. + The expression must evaluate to a boolean value. False means |0>, True means |1>. The MSB is the first qubit. + + verify_generation: A boolean indicating whether to verify the generated sequences. Not be used in production, as it is very slow. + + interleaving_gate: Interleaved gate represented as list of cirq GateOperation + """ + for i, qe in config["elements"].items(): + if "operations" not in qe: + qe["operations"] = {} + + self._command_registry = CommandRegistry() + self._sequence_tracker = SequenceTracker( + command_registry=self._command_registry + ) + + single_qubit_gate_generator = ( + decorate_single_qubit_generator_with_command_recording( + single_qubit_gate_generator, self._command_registry + ) + ) + two_qubit_gate_generators = ( + decorate_two_qubit_gate_generator_with_command_recording( + two_qubit_gate_generators, self._command_registry + ) + ) + self._rb_baker = RBBaker( + config, + single_qubit_gate_generator, + two_qubit_gate_generators, + interleaving_gate, + self._command_registry, + ) + + self._interleaving_gate = interleaving_gate + self._interleaving_tableau = ( + tableau_from_cirq(interleaving_gate) + if interleaving_gate is not None + else None + ) + self._config = self._rb_baker.bake() + self._symplectic_generator = GateGenerator( + set(two_qubit_gate_generators.keys()) + ) + self._prep_func = prep_func + self._measure_func = measure_func + self._verify_generation = verify_generation + + def convert_sequence_to_cirq(self, sequence: List[int]) -> List[cirq.GateOperation]: + gates = [] + for cmd_id in sequence: + gates.extend(self._rb_baker.gates_from_cmd_id(cmd_id)) + return gates + + def _verify_rb_sequence(self, gate_ids, final_tableau: SimpleTableau): + if final_tableau != SimpleTableau(np.eye(4), [0, 0, 0, 0]): + raise RuntimeError("Verification of RB sequence failed") + gates = [] + for gate_id in gate_ids: + if gate_id == gate_db.get_interleaving_gate(): + gates.extend(self._interleaving_gate) + else: + gates.extend(self._symplectic_generator.generate(gate_id)) + + unitary = cirq.Circuit(gates).unitary() + fixed_phase_unitary = np.conj(np.trace(unitary) / 4) * unitary + if np.linalg.norm(fixed_phase_unitary - np.eye(4)) > 1e-12: + raise RuntimeError("Verification of RB sequence failed") + + def _gen_rb_sequence(self, depth): + gate_ids = [] + tableau = SimpleTableau(np.eye(4), [0, 0, 0, 0]) + for i in range(depth): + symplectic = gate_db.rand_symplectic() + pauli = gate_db.rand_pauli() + gate_ids.append(symplectic) + gate_ids.append(pauli) + + tableau = tableau.then(gate_db.get_tableau(symplectic)).then( + gate_db.get_tableau(pauli) + ) + + if self._interleaving_tableau is not None: + gate_ids.append(gate_db.get_interleaving_gate()) + tableau = tableau.then(self._interleaving_tableau) + + inv_tableau = tableau.inverse() + inv_id = gate_db.find_symplectic_gate_id_by_tableau_g(inv_tableau) + after_inv_tableau = tableau.then(gate_db.get_tableau(inv_id)) + + pauli = gate_db.find_pauli_gate_id_by_tableau_alpha(after_inv_tableau) + + gate_ids.append(inv_id) + gate_ids.append(pauli) + + if self._verify_generation: + final_tableau = after_inv_tableau.then(gate_db.get_tableau(pauli)) + self._verify_rb_sequence(gate_ids, final_tableau) + + return gate_ids + + def _gen_qua_program( + self, + sequence_depths: list[int], + num_repeats: int, + num_averages: int, + offsets: list[tuple[str, float]], + ): + with program() as prog: + sequence_depth = declare(int) + repeat = declare(int) + n_avg = declare(int) + state = declare(int) + length = declare(int) + progress = declare(int) + progress_os = declare_stream() + state_os = declare_stream() + gates_len_is = declare_input_stream(int, name="__gates_len_is__", size=1) + gates_is = { + qe: declare_input_stream(int, name=f"{qe}_is", size=self._buffer_length) + for qe in self._rb_baker.all_elements + } + + # force offset setting due to bug in QOP320 + for qb, offset in offsets: + set_dc_offset(qb, "single", offset) + + assign(progress, 0) + with for_each_(sequence_depth, sequence_depths): + with for_(repeat, 0, repeat < num_repeats, repeat + 1): + assign(progress, progress + 1) + save(progress, progress_os) + advance_input_stream(gates_len_is) + for gate_is in gates_is.values(): + advance_input_stream(gate_is) + assign(length, gates_len_is[0]) + with for_(n_avg, 0, n_avg < num_averages, n_avg + 1): + self._prep_func() + self._rb_baker.run(gates_is, length) + out1, out2 = self._measure_func() + assign(state, (Cast.to_int(out2) << 1) + Cast.to_int(out1)) + save(state, state_os) + + with stream_processing(): + state_os.buffer(len(sequence_depths), num_repeats, num_averages).save( + "state" + ) + progress_os.save("progress") + return prog + + def _decode_sequence_for_element(self, element: str, seq: list): + seq = [self._rb_baker.decode(i, element) for i in seq] + if len(seq) > self._buffer_length: + RuntimeError("Buffer is too small") + return seq + [0] * (self._buffer_length - len(seq)) + + @run_in_thread + def _insert_all_input_stream( + self, + job: RunningQmJob, + sequence_depths: List[int], + num_repeats: int, + callback: Optional[Callable[[List[int]], None]] = None, + ): + for sequence_depth in sequence_depths: + for repeat in range(num_repeats): + sequence = self._gen_rb_sequence(sequence_depth) + if self._sequence_tracker is not None: + self._sequence_tracker.make_sequence(sequence) + job.insert_input_stream("__gates_len_is__", len(sequence)) + for qe in self._rb_baker.all_elements: + job.insert_input_stream( + f"{qe}_is", self._decode_sequence_for_element(qe, sequence) + ) + + if callback is not None: + callback(sequence) + + def run( + self, + qmm: QuantumMachinesManager, + circuit_depths: List[int], + num_circuits_per_depth: int, + num_shots_per_circuit: int, + offsets: Optional[list[tuple[str, float]]] = None, + debug: Optional[str] = None, + **kwargs, + ): + """ + Runs the randomized benchmarking experiment. The experiment is sweep over Clifford circuits with varying depths. + For every depth, we generate a number of random circuits and run them. The number of different circuits is determined by + the num_circuits_per_depth parameter. The number of shots per individual circuit is determined by the num_averages parameter. + + Args: + qmm (QuantumMachinesManager): The Quantum Machines Manager object which is used to run the experiment. + circuit_depths (List[int]): A list of the number of Cliffords per circuit (not including inverse). + num_circuits_per_depth (int): The number of different circuit randomizations per depth. + num_shots_per_circuit (int): The number of shots per particular circuit. + debug (str): If not ``None`` it dumps the QUA script and config in a file with the given name. + """ + if offsets is None: + offsets = [] + prog = self._gen_qua_program( + circuit_depths, num_circuits_per_depth, num_shots_per_circuit, offsets + ) + + if debug is not None: + with open(f"{debug}.py", "w") as file: + file.write(generate_qua_script(prog, self._config)) + + qm = qmm.open_qm(self._config) + job = qm.execute(prog) + + gen_sequence_callback = ( + kwargs["gen_sequence_callback"] + if "gen_sequence_callback" in kwargs + else None + ) + self._insert_all_input_stream( + job, circuit_depths, num_circuits_per_depth, gen_sequence_callback + ) + + full_progress = len(circuit_depths) * num_circuits_per_depth + pbar(job.result_handles, full_progress, "progress") + job.result_handles.wait_for_all_values() + + return QuaTwoQubitRbData( + circuit_depths=circuit_depths, + num_repeats=num_circuits_per_depth, + num_averages=num_shots_per_circuit, + data={0: job.result_handles.get("state").fetch_all()}, + ) + + def print_command_mapping(self): + """ + Prints the mapping of Command ID index, which is understood by the + input stream, into single-qubit and two-qubit gates. + """ + self._command_registry.print_commands() + + def print_sequences(self): + """ + Prints a break-down of all gates/commands which were played in + each random sequence. + """ + self._sequence_tracker.print_sequences() + + def save_command_mapping_to_file(self, path: Union[str, Path]): + """ + Saves a text file containing the mapping of Command ID index, which + is understood by the input stream, into single-qubit and two-qubit gates. + """ + self._command_registry.save_to_file(path) + + def save_sequences_to_file(self, path: Union[str, Path]): + """ + Save a text file of the break-down of all gates/commands which + were played in each random sequence + """ + self._sequence_tracker.save_to_file(path) + + def verify_sequences(self): + """ + Simulates the application of all random sequences on the |00> state + to ensure that they recover the qubit to |00> correctly. + + Note: You should only call this function *after* self.run(). + """ + self._sequence_tracker.verify_sequences() diff --git a/src/qibocal/protocols/qua/two_qubit_rb/__init__.py b/src/qibocal/protocols/qua/two_qubit_rb/__init__.py new file mode 100644 index 000000000..44b1f4354 --- /dev/null +++ b/src/qibocal/protocols/qua/two_qubit_rb/__init__.py @@ -0,0 +1,5 @@ +from .gates import gate_db +from .RBBaker import RBBaker +from .RBResult import QuaTwoQubitRbData +from .simple_tableau import SimpleTableau +from .TwoQubitRB import TwoQubitRb diff --git a/src/qibocal/protocols/qua/two_qubit_rb/gates.py b/src/qibocal/protocols/qua/two_qubit_rb/gates.py new file mode 100644 index 000000000..0c131a28a --- /dev/null +++ b/src/qibocal/protocols/qua/two_qubit_rb/gates.py @@ -0,0 +1,444 @@ +import dataclasses +import os +import pathlib +import pickle +import random +from typing import List, Set + +import cirq +import numpy as np + +from .simple_tableau import SimpleTableau + +q1, q2 = cirq.LineQubit.range(1, 3) + +C1_reduced = [ + cirq.PhasedXZGate(axis_phase_exponent=0, x_exponent=0, z_exponent=0), + cirq.PhasedXZGate(axis_phase_exponent=0, x_exponent=-0.5, z_exponent=0), + cirq.PhasedXZGate(axis_phase_exponent=0.5, x_exponent=-0.5, z_exponent=1), + cirq.PhasedXZGate(axis_phase_exponent=0.5, x_exponent=-0.5, z_exponent=-0.5), + cirq.PhasedXZGate(axis_phase_exponent=0, x_exponent=0.5, z_exponent=0.5), + cirq.PhasedXZGate(axis_phase_exponent=0, x_exponent=0, z_exponent=0.5), +] + +S1 = [ + cirq.PhasedXZGate(axis_phase_exponent=0, x_exponent=0, z_exponent=0), + cirq.PhasedXZGate(axis_phase_exponent=0, x_exponent=0.5, z_exponent=0.5), + cirq.PhasedXZGate(axis_phase_exponent=0.5, x_exponent=-0.5, z_exponent=-0.5), +] + +pauli = [ + cirq.PhasedXZGate(axis_phase_exponent=0, x_exponent=0, z_exponent=0), # I + cirq.PhasedXZGate(axis_phase_exponent=0, x_exponent=1.0, z_exponent=0), # X + cirq.PhasedXZGate(axis_phase_exponent=0.5, x_exponent=1.0, z_exponent=0), # Y + cirq.PhasedXZGate(axis_phase_exponent=0, x_exponent=0, z_exponent=1.0), # Z +] + +pauli_phase = [ + [0, 0], # I + [0, 1], # X + [1, 1], # Y + [1, 0], # Z +] + +native_2_qubit_gates = { + "sqr_iSWAP": { + "CNOT": [ + cirq.PhasedXZGate(axis_phase_exponent=0.5, x_exponent=0.5, z_exponent=-1)( + q1 + ), + cirq.PhasedXZGate(axis_phase_exponent=0, x_exponent=0, z_exponent=1)(q2), + cirq.ISWAP(q1, q2) ** 0.5, + cirq.PhasedXZGate(axis_phase_exponent=0, x_exponent=1, z_exponent=0)(q1), + cirq.PhasedXZGate(axis_phase_exponent=0, x_exponent=0, z_exponent=0)(q2), + cirq.ISWAP(q1, q2) ** 0.5, + cirq.PhasedXZGate(axis_phase_exponent=0.5, x_exponent=0.5, z_exponent=0.5)( + q1 + ), + cirq.PhasedXZGate(axis_phase_exponent=-1, x_exponent=0.5, z_exponent=1)(q2), + ], # compilation of CNOT in terms of phased XZ and sqiSWAP + "iSWAP": [ + cirq.ISWAP(q1, q2) ** 0.5, + cirq.ISWAP(q1, q2) ** 0.5, + ], # compilation of iSWAP in terms of phased XZ and sqiSWAP + "SWAP": [ + cirq.PhasedXZGate(axis_phase_exponent=0, x_exponent=0.5, z_exponent=0.5)( + q1 + ), + cirq.PhasedXZGate(axis_phase_exponent=0.5, x_exponent=0.5, z_exponent=0)( + q2 + ), + cirq.ISWAP(q1, q2) ** 0.5, + cirq.PhasedXZGate(axis_phase_exponent=-1, x_exponent=0.5, z_exponent=1)(q1), + cirq.PhasedXZGate(axis_phase_exponent=-1, x_exponent=0.5, z_exponent=1)(q2), + cirq.ISWAP(q1, q2) ** 0.5, + cirq.PhasedXZGate(axis_phase_exponent=0.5, x_exponent=0.5, z_exponent=0)( + q1 + ), + cirq.PhasedXZGate(axis_phase_exponent=0.5, x_exponent=0.5, z_exponent=0)( + q2 + ), + cirq.ISWAP(q1, q2) ** 0.5, + cirq.PhasedXZGate(axis_phase_exponent=0, x_exponent=0, z_exponent=-0.5)(q1), + cirq.PhasedXZGate(axis_phase_exponent=0, x_exponent=0, z_exponent=1)(q2), + ], # compilation of SWAP in terms of phased XZ and sqiSWAP + }, + # TODO: add more implementations + "CNOT": { + "CNOT": [cirq.CNOT(q1, q2)], + "iSWAP": [ + cirq.PhasedXZGate( + axis_phase_exponent=-1.0, x_exponent=0.5, z_exponent=-0.5 + )( + q1 + ), # S + H -> PhasedXZ + cirq.PhasedXZGate(axis_phase_exponent=0.0, x_exponent=0.0, z_exponent=0.5)( + q2 + ), # S + cirq.CNOT(q1, q2), + cirq.CNOT(q2, q1), + cirq.PhasedXZGate(axis_phase_exponent=0.0, x_exponent=0.0, z_exponent=0.0)( + q1 + ), # I + cirq.PhasedXZGate( + axis_phase_exponent=-0.5, x_exponent=0.5, z_exponent=-1.0 + )( + q2 + ), # H -> PhasedXZ + ], + "SWAP": [cirq.CNOT(q2, q1), cirq.CNOT(q1, q2), cirq.CNOT(q2, q1)], + }, + "CZ": { + "CNOT": [ + cirq.PhasedXZGate( + axis_phase_exponent=-0.5, x_exponent=0.5, z_exponent=-1.0 + )(q2), + cirq.PhasedXZGate(axis_phase_exponent=0.0, x_exponent=0.0, z_exponent=0.0)( + q1 + ), + cirq.CZ(q1, q2), + cirq.PhasedXZGate( + axis_phase_exponent=-0.5, x_exponent=0.5, z_exponent=-1.0 + )(q2), + cirq.PhasedXZGate(axis_phase_exponent=0.0, x_exponent=0.0, z_exponent=0.0)( + q1 + ), + ], + "iSWAP": [ + cirq.PhasedXZGate( + axis_phase_exponent=-1.0, x_exponent=0.5, z_exponent=-0.5 + )(q1), + cirq.PhasedXZGate( + axis_phase_exponent=-1.0, x_exponent=0.5, z_exponent=-0.5 + )(q2), + cirq.CZ(q1, q2), + cirq.PhasedXZGate( + axis_phase_exponent=-0.5, x_exponent=0.5, z_exponent=-1.0 + )(q1), + cirq.PhasedXZGate( + axis_phase_exponent=-0.5, x_exponent=0.5, z_exponent=-1.0 + )(q2), + cirq.CZ(q1, q2), + cirq.PhasedXZGate( + axis_phase_exponent=-0.5, x_exponent=0.5, z_exponent=-1.0 + )(q1), + cirq.PhasedXZGate( + axis_phase_exponent=-0.5, x_exponent=0.5, z_exponent=-1.0 + )(q2), + ], + "SWAP": [ + cirq.PhasedXZGate( + axis_phase_exponent=-0.5, x_exponent=0.5, z_exponent=-1.0 + )(q2), + cirq.PhasedXZGate(axis_phase_exponent=0.0, x_exponent=0.0, z_exponent=0.0)( + q1 + ), + cirq.CZ(q1, q2), + cirq.PhasedXZGate( + axis_phase_exponent=-0.5, x_exponent=0.5, z_exponent=-1.0 + )(q1), + cirq.PhasedXZGate( + axis_phase_exponent=-0.5, x_exponent=0.5, z_exponent=-1.0 + )(q2), + cirq.CZ(q1, q2), + cirq.PhasedXZGate( + axis_phase_exponent=-0.5, x_exponent=0.5, z_exponent=-1.0 + )(q1), + cirq.PhasedXZGate( + axis_phase_exponent=-0.5, x_exponent=0.5, z_exponent=-1.0 + )(q2), + cirq.CZ(q1, q2), + cirq.PhasedXZGate( + axis_phase_exponent=-0.5, x_exponent=0.5, z_exponent=-1.0 + )(q2), + cirq.PhasedXZGate(axis_phase_exponent=0.0, x_exponent=0.0, z_exponent=0.0)( + q1 + ), + ], + }, +} + + +##### Conversion from unitary / cirq to tableau + +I = np.matrix([[1, 0], [0, 1]]) # identity +X = np.matrix([[0, 1], [1, 0]]) # pi x +Y = np.matrix([[0, -1j], [1j, 0]]) # pi y +Z = np.matrix([[1, 0], [0, -1]]) # pi z + +paulis = [I, X, Y, Z] +twoQBPaulis = [np.kron(i, j) for i in paulis for j in paulis] + +symplecticTable = [ + [0, 0, 0, 0], # II + [0, 0, 1, 0], # IX + [0, 0, 1, 1], # IY + [0, 0, 0, 1], # IZ + [1, 0, 0, 0], # XI + [1, 0, 1, 0], # XX + [1, 0, 1, 1], # XY + [1, 0, 0, 1], # XZ + [1, 1, 0, 0], # YI + [1, 1, 1, 0], # YX + [1, 1, 1, 1], # YY + [1, 1, 0, 1], # YZ + [0, 1, 0, 0], # ZI + [0, 1, 1, 0], # ZX + [0, 1, 1, 1], # ZY + [0, 1, 0, 1], +] # ZZ + + +def get_pauli_prod(m): + # input: tensor product of two Pauli matrices + # output: tableau column of m + + for i, p in enumerate(twoQBPaulis): + prod = m @ p + if np.trace(prod) > 3.9: + return symplecticTable[i], 0 + if np.trace(prod) < -3.9: + return symplecticTable[i], 1 + + +def tableau_from_unitary(m): + # Turns a two-qubit unitary into the full tableau representation + # input: two-qubit unitary m + # outputs: simple tableau + s = np.zeros([4, 4]) + p = np.zeros(4) + + s[:, 0], p[0] = get_pauli_prod(m @ np.kron(X, I) @ m.H) + s[:, 1], p[1] = get_pauli_prod(m @ np.kron(Z, I) @ m.H) + s[:, 2], p[2] = get_pauli_prod(m @ np.kron(I, X) @ m.H) + s[:, 3], p[3] = get_pauli_prod(m @ np.kron(I, Z) @ m.H) + + return SimpleTableau(s, p) + + +def tableau_from_cirq(gates: List[cirq.GateOperation]) -> SimpleTableau: + return tableau_from_unitary(np.matrix(cirq.Circuit(gates).unitary())) + + +######################################################### + + +def combine_to_phased_x_z( + first_gate: cirq.GateOperation, second_gate: cirq.GateOperation +) -> cirq.GateOperation: + unitary = cirq.Circuit([first_gate, second_gate]).unitary() + if unitary.shape != (2, 2): + raise RuntimeError("Cannot combine multi qubit gate to PhasedXZ") + return cirq.PhasedXZGate.from_matrix(unitary)(first_gate.qubits[0]) + + +@dataclasses.dataclass +class GateCommand: + type: str + q1: tuple + q2: tuple + + def get_qubit_ops(self, q): + if q == 0: + return self.q1 + elif q == 1: + return self.q2 + else: + raise RuntimeError("q should be 0 or 1") + + +class _GateDatabase: + def __init__(self): + self._commands, self._tableaus, self._symplectic_range, self._pauli_range = ( + self._gen_commands_and_tableaus() + ) + + @staticmethod + def _gen_commands_and_tableaus(): + compilation_path = ( + pathlib.Path(os.path.dirname(os.path.abspath(__file__))) + / "symplectic_compilation_XZ.pkl" + ) + with open(compilation_path, "rb") as f: + compilation = pickle.load(f) + symplectics = compilation["symplectics"] + phases = compilation["phases"] + commands = compilation["commands"] + + rb_commands = [] + for command in commands: + if command[0] == "C1's": + rb_commands.append(GateCommand("C1", (command[1],), (command[2],))) + elif command[0] == "CNOT's": + rb_commands.append( + GateCommand( + "CNOT", (command[1], command[3]), (command[2], command[4]) + ) + ) + elif command[0] == "iSWAP's": + rb_commands.append( + GateCommand( + "iSWAP", (command[1], command[3]), (command[2], command[4]) + ) + ) + elif command[0] == "SWAP's": + rb_commands.append(GateCommand("SWAP", (command[1],), (command[2],))) + + # Generate Paulis: + for i1 in range(len(pauli)): + for i2 in range(len(pauli)): + rb_commands.append(GateCommand("PAULI", (i1,), (i2,))) + + tableaus = [] + for i in range(len(symplectics)): + tableaus.append(SimpleTableau(symplectics[i], phases[i])) + + # Generate Paulis: + for i1 in range(len(pauli)): + for i2 in range(len(pauli)): + tableaus.append( + SimpleTableau(np.eye(4), pauli_phase[i1] + pauli_phase[i2]) + ) + + symplectic_range = (0, len(commands)) + pauli_range = (len(commands), len(rb_commands)) + return rb_commands, tableaus, symplectic_range, pauli_range + + @property + def commands(self): + return self._commands + + @property + def tableaus(self): + return self._tableaus + + def get_command(self, gate_id) -> GateCommand: + return self._commands[gate_id] + + def get_tableau(self, gate_id) -> SimpleTableau: + return self._tableaus[gate_id] + + def rand_symplectic(self): + return random.randrange(*self._symplectic_range) + + def rand_pauli(self): + return random.randrange(*self._pauli_range) + + def get_interleaving_gate(self): + return self._pauli_range[1] + + def find_symplectic_gate_id_by_tableau_g(self, tableau: SimpleTableau): + tableaus = self._tableaus[self._symplectic_range[0] : self._symplectic_range[1]] + return next(i for i, x in enumerate(tableaus) if np.array_equal(x.g, tableau.g)) + + def find_pauli_gate_id_by_tableau_alpha(self, tableau: SimpleTableau): + tableaus = self._tableaus[self._pauli_range[0] : self._pauli_range[1]] + return self._pauli_range[0] + next( + i for i, x in enumerate(tableaus) if np.array_equal(x.alpha, tableau.alpha) + ) + + +gate_db = _GateDatabase() + + +class GateGenerator: + two_qubit_imp_priority = { # TODO: verify this priority table + "CNOT": ["CNOT", "CZ", "iSWAP", "sqr_iSWAP"], + "iSWAP": ["iSWAP", "sqr_iSWAP", "CNOT", "CZ"], + "SWAP": ["CNOT", "CZ", "iSWAP", "sqr_iSWAP"], + } + + def __init__(self, native_two_qubit_gates: Set[str]): + self._two_qubit_dict = self._generate_two_qubit_dict(native_two_qubit_gates) + + @staticmethod + def _generate_two_qubit_dict(native_two_qubit_gates: Set[str]) -> dict: + two_qubit_dict = {} + for k, v in GateGenerator.two_qubit_imp_priority.items(): + available_imp = [x for x in v if x in native_two_qubit_gates] + if ( + len(available_imp) == 0 + or available_imp[0] not in native_2_qubit_gates.keys() + ): + raise RuntimeError( + f"Cannot implement gate '{k}' with provided native two qubit gates" + ) + two_qubit_dict[k] = available_imp[0] + return two_qubit_dict + + @staticmethod + def _reduce_gate(gate: List[cirq.GateOperation]): + qubit_ops = {1: None, 2: None} + output = [] + + def append_qubit_ops(): + for q in [1, 2]: + if qubit_ops[q] is not None: + output.append(qubit_ops[q]) + qubit_ops[q] = None + + for op in gate: + if len(op.qubits) == 1: + prev_op = qubit_ops[op.qubits[0].x] + qubit_ops[op.qubits[0].x] = ( + op if prev_op is None else combine_to_phased_x_z(prev_op, op) + ) + else: + append_qubit_ops() + output.append(op) + append_qubit_ops() + return output + + def generate(self, cmd_id): + gate = [] + command = gate_db.get_command(cmd_id) + two_qubit_imp = ( + self._two_qubit_dict[command.type] + if command.type in self._two_qubit_dict + else None + ) + if command.type == "C1": + gate.append(C1_reduced[command.q1[0]](q1)) + gate.append(C1_reduced[command.q2[0]](q2)) + elif command.type == "CNOT": + gate.append(C1_reduced[command.q1[0]](q1)) + gate.append(C1_reduced[command.q2[0]](q2)) + gate.extend(native_2_qubit_gates[two_qubit_imp]["CNOT"]) + gate.append(S1[command.q1[1]](q1)) + gate.append(S1[command.q2[1]](q2)) + elif command.type == "iSWAP": + gate.append(C1_reduced[command.q1[0]](q1)) + gate.append(C1_reduced[command.q2[0]](q2)) + gate.extend(native_2_qubit_gates[two_qubit_imp]["iSWAP"]) + gate.append(S1[command.q1[1]](q1)) + gate.append(S1[command.q2[1]](q2)) + elif command.type == "SWAP": + gate.append(C1_reduced[command.q1[0]](q1)) + gate.append(C1_reduced[command.q2[0]](q2)) + gate.extend(native_2_qubit_gates[two_qubit_imp]["SWAP"]) + elif command.type == "PAULI": + gate.append(pauli[command.q1[0]](q1)) + gate.append(pauli[command.q2[0]](q2)) + else: + raise RuntimeError(f"unknown command {command.type}") + return self._reduce_gate(gate) diff --git a/src/qibocal/protocols/qua/two_qubit_rb/simple_tableau.py b/src/qibocal/protocols/qua/two_qubit_rb/simple_tableau.py new file mode 100644 index 000000000..79a6f30b8 --- /dev/null +++ b/src/qibocal/protocols/qua/two_qubit_rb/simple_tableau.py @@ -0,0 +1,256 @@ +from typing import Tuple, Union + +import numpy as np + + +def _beta(v, u): + lut = [[0, 0, 0, 0], [0, 0, 3, 1], [0, 1, 0, 3], [0, 3, 1, 0]] + lut_map = {(0, 0): 0, (1, 0): 1, (0, 1): 2, (1, 1): 3} + n = len(u) // 2 + beta = 0 + for i in range(n): + beta += lut[lut_map[tuple(v[2 * i : 2 * i + 2])]][ + lut_map[tuple(u[2 * i : 2 * i + 2])] + ] + return beta + + +def _compose_alpha(g1, alpha1, g2, alpha2): + n = len(alpha1) // 2 + alpha21 = [] + two_alpha21 = np.zeros(2 * n, dtype=np.uint8) + for i in range(2 * n): + b_i = _calc_b_i(g1, g2, i) + two_alpha21[i] += (2 * alpha1[i] + 2 * np.dot(g1[:, i], alpha2) + b_i) % 4 + assert two_alpha21[i] % 2 == 0 + alpha21.append(two_alpha21[i] // 2) + return np.array(alpha21).astype(np.uint8) + + +def _calc_b_i(g1, g2, i): + # add i for every Y in the column of g1 + b_i = np.dot(g1[::2, i], g1[1::2, i]) % 4 + n = g1.shape[0] // 2 + + # reduce using beta + current = np.zeros(2 * n, dtype=np.uint8) + for j in range(2 * n): + b_i = (b_i + _beta(current, g1[j, i] * g2[:, j])) % 4 + current = (current + g1[j, i] * g2[:, j]) % 2 + return b_i + + +def _calc_inverse_alpha(g1, alpha1): + n = len(alpha1) // 2 + lam = _lambda(n) + inv_g1 = lam @ g1.T @ lam % 2 + b = np.array([_calc_b_i(g1, inv_g1, i) for i in range(2 * n)]) + two_alpha2 = -(inv_g1.T @ (2 * alpha1 + b)) % 4 + assert np.all(two_alpha2 % 2 == 0) + return two_alpha2 // 2 + + +class SimpleTableau: + """ + A class for representing and acting of Clifford Tableaus using only elementary logical and + arithmetic operations. Implements the `CliffordTableau` API. + """ + + def __init__(self, g, alpha): + g = np.array(g, dtype=np.uint8) + alpha = np.array(alpha, dtype=np.uint8) + if not len(alpha) % 2 == 0: + raise ValueError( + f"alpha needs to have an even length but length is {len(alpha)}" + ) + g_shape = g.shape + if not (len(g_shape) == 2 and g_shape[0] == g_shape[1] and g_shape[0] % 2 == 0): + raise ValueError( + f"g has shape {g_shape}, which is not an even square matrix" + ) + if not len(alpha) % 2 == 0: + raise ValueError(f"alpha has len {len(alpha)}, which is not even") + self._n = len(alpha) // 2 + if not _is_symplectic(g, self._n): + raise ValueError("g is not a symplectic matrix") + self._np_repr = np.vstack((g, alpha)).astype(np.uint8) + + def __call__(self, *args, **kwargs): + raise NotImplementedError() + + @property + def g(self): + return self._np_repr[:-1, :] + + @property + def alpha(self): + return self._np_repr[-1] + + @property + def n(self): + return self._n + + def __str__(self): + # todo: work with more than single digit qubit number + st = " " * 2 + "|" + " ".join(f"x{i} z{i}" for i in range(self._n)) + "\n" + st += "-" * 2 + "+" + "-" * 6 * self._n + "\n" + for i in range(2 * self._n): + st += f"z{i // 2}|" if i % 2 else f"x{i // 2}|" + st += " ".join(str(entry) for entry in self._np_repr[i]) + "\n" + st += "s |" + " ".join(str(entry) for entry in self._np_repr[i + 1]) + "\n" + return st + + def __repr__(self): + return str(self) + + def __eq__(self, other): + return np.array_equal(self._np_repr, other._np_repr) + + def __hash__(self): + return hash(self._np_repr.tobytes()) + + def then(self, other: "SimpleTableau") -> "SimpleTableau": + if self.n != other.n: + raise ValueError( + f"number of qubits of self={self.n} and of other={other.n} is incompatible" + ) + g12 = other.g @ self.g % 2 + alpha12 = _compose_alpha(self.g, self.alpha, other.g, other.alpha) + return SimpleTableau(g12, alpha12) + + def inverse(self) -> "SimpleTableau": + lam = _lambda(self.n) + return SimpleTableau( + (lam @ self.g.T @ lam) % 2, _calc_inverse_alpha(self.g, self.alpha) + ) + + def is_identity(self): + return np.array_equal(self.g, np.eye(2 * self.n)) and np.array_equal( + self.alpha, np.zeros(2 * self.n) + ) + + +_single_qubit_gate_conversions = { + "I": (np.identity(2), np.zeros(2)), + "H": (np.array([[0, 1], [1, 0]]), np.zeros(2)), + "X": (np.identity(2), np.array([0, 1])), + "Z": (np.identity(2), np.array([1, 0])), + "Y": (np.identity(2), np.array([1, 1])), + "S": (np.array([[1, 0], [1, 1]]), np.zeros(2)), + "SX": (np.array([[1, 1], [0, 1]]), np.array([0, 1])), + "SY": (np.array([[0, 1], [1, 0]]), np.array([1, 0])), + "-SY": (np.array([[0, 1], [1, 0]]), np.array([0, 1])), + "-SX": (np.array([[1, 1], [0, 1]]), np.array([0, 0])), +} + +_two_qubit_gate_conversions = { + "CNOT": ( + np.array([[1, 0, 1, 0], [0, 1, 0, 0], [0, 0, 1, 0], [0, 1, 0, 1]]).T, + np.zeros(4), + ), + "ISWAP": ( + np.array([[0, 1, 1, 1], [1, 1, 0, 1], [0, 0, 0, 1], [0, 0, 1, 0]]).T, + np.zeros(4), + ), # not 100% sure this is correct + "SWAP": ( + np.array([[0, 0, 1, 0], [0, 0, 0, 1], [1, 0, 0, 0], [0, 1, 0, 0]]).T, + np.zeros(4), + ), + "CZ": ( + np.array([[1, 0, 0, 1], [0, 1, 0, 0], [0, 1, 1, 0], [0, 0, 0, 1]]).T, + np.zeros(4), + ), +} + +known_names = set(_single_qubit_gate_conversions.keys()).union( + set(_two_qubit_gate_conversions.keys()) +) + + +def generate_from_name( + name: str, target: Union[int, Tuple[int, int]], n=None +) -> SimpleTableau: + """ + Generate a `SimpleTableau` object from a name. + Args: + name: gate name, must be in `known_names` + target: For a single qubit gate, the qubit on which the gate operates, from 0 to n-1. For two qubit gates, + a tuple of the form (control, target) on which the gate operates + n: number of total qubits on which gate operates. If omitted, will be determined by the highest target index. + + Returns: the `SimpleTableau` object + + """ + if n is None: + n = np.max(target) + 1 + g = np.identity(2 * n) + alpha = np.zeros(2 * n) + if name in _single_qubit_gate_conversions: + if not isinstance(target, int) or target >= n: + raise ValueError( + f"invalid target {target} for single qubit gate {name} on {n} qubits" + ) + _embed_single_qubit_gate(alpha, g, name, target) + return SimpleTableau(g, alpha) + + elif name in _two_qubit_gate_conversions: + if not isinstance(target, tuple) or max(target) >= n: + raise ValueError( + f"invalid target {target} for two qubit gate {name} on {n} qubits" + ) + _embed_two_qubit_gate(alpha, g, name, target) + return SimpleTableau(g, alpha) + else: + raise ValueError(f"unknown gate {name}") + + +def _embed_two_qubit_gate(alpha, g, name, target): + g2q, alpha2q = _two_qubit_gate_conversions[name] + g[2 * target[0] : 2 * target[0] + 2, 2 * target[0] : 2 * target[0] + 2] = g2q[ + :2, :2 + ] + g[2 * target[0] : 2 * target[0] + 2, 2 * target[1] : 2 * target[1] + 2] = g2q[ + :2, 2:4 + ] + g[2 * target[1] : 2 * target[1] + 2, 2 * target[0] : 2 * target[0] + 2] = g2q[ + 2:4, :2 + ] + g[2 * target[1] : 2 * target[1] + 2, 2 * target[1] : 2 * target[1] + 2] = g2q[ + 2:4, 2:4 + ] + alpha[2 * target[0] : 2 * target[0] + 2] = alpha2q[:2] + alpha[2 * target[1] : 2 * target[1] + 2] = alpha2q[2:4] + + +def _embed_single_qubit_gate(alpha, g, name, target): + g[2 * target : 2 * target + 2, 2 * target : 2 * target + 2] = ( + _single_qubit_gate_conversions[name][0] + ) + alpha[2 * target : 2 * target + 2] = _single_qubit_gate_conversions[name][1] + + +def _lambda(n): + return np.diag([1] + [0, 1] * (n - 1), 1) + np.diag([1] + [0, 1] * (n - 1), -1) + + +def _is_symplectic(mat, n): + lhs = mat @ _lambda(n) @ mat.T % 2 + return np.all(lhs == _lambda(n)) + + +_int_bit_map = {0: [0, 0], 1: [1, 0], 2: [1, 1], 3: [0, 1]} + + +def stim_to_simple(tableau) -> SimpleTableau: + n = len(tableau) + g = np.zeros((2 * n, 2 * n), dtype=np.uint8) + alpha = np.zeros(2 * n, dtype=np.uint8) + for j in range(0, 2 * n, 2): + pauli_string_x = tableau.x_output(j // 2) + pauli_string_z = tableau.z_output(j // 2) + alpha[j] = pauli_string_x.sign.real < 0 + alpha[j + 1] = pauli_string_z.sign.real < 0 + for i in range(0, 2 * n, 2): + g[i : i + 2, j] = _int_bit_map[pauli_string_x[i // 2]] + g[i : i + 2, j + 1] = _int_bit_map[pauli_string_z[i // 2]] + return SimpleTableau(g, alpha) diff --git a/src/qibocal/protocols/qua/two_qubit_rb/symplectic_compilation_XZ.pkl b/src/qibocal/protocols/qua/two_qubit_rb/symplectic_compilation_XZ.pkl new file mode 100644 index 000000000..960626046 Binary files /dev/null and b/src/qibocal/protocols/qua/two_qubit_rb/symplectic_compilation_XZ.pkl differ diff --git a/src/qibocal/protocols/qua/two_qubit_rb/util.py b/src/qibocal/protocols/qua/two_qubit_rb/util.py new file mode 100644 index 000000000..5d0b1f848 --- /dev/null +++ b/src/qibocal/protocols/qua/two_qubit_rb/util.py @@ -0,0 +1,38 @@ +import threading +from datetime import datetime +from time import sleep + +from tqdm import tqdm + + +def run_in_thread(fn): + def run(*k, **kw): + t = threading.Thread(target=fn, args=k, kwargs=kw) + t.start() + + return run + + +def pbar(res_handles, n_avg, n_label, timeout=10, return_times=False): + n_now = 0 + n = None + m = 0 + while n is None: + if m * 0.1 > timeout: + print("reached timeout") + break + sleep(0.1) + n = res_handles.get(n_label).fetch_all() + + times_vec = [] + with tqdm(total=n_avg, desc=n_label) as pbar_obj: + while n < n_avg: + n = res_handles.get(n_label).fetch_all() + 1 + sleep(0.1) + if n is not None and n > n_now: + pbar_obj.update(n - n_now) + n_now = n + if return_times: + times_vec.append(datetime.now()) + if return_times: + return times_vec diff --git a/src/qibocal/protocols/qua/two_qubit_rb/verification/__init__.py b/src/qibocal/protocols/qua/two_qubit_rb/verification/__init__.py new file mode 100644 index 000000000..9ed072754 --- /dev/null +++ b/src/qibocal/protocols/qua/two_qubit_rb/verification/__init__.py @@ -0,0 +1,3 @@ +from . import gates +from .command_registry import CommandRegistry +from .sequence_tracker import SequenceTracker diff --git a/src/qibocal/protocols/qua/two_qubit_rb/verification/command_registry.py b/src/qibocal/protocols/qua/two_qubit_rb/verification/command_registry.py new file mode 100644 index 000000000..67b4771c2 --- /dev/null +++ b/src/qibocal/protocols/qua/two_qubit_rb/verification/command_registry.py @@ -0,0 +1,130 @@ +from pathlib import Path +from typing import Callable, Literal, Union + +from qualang_tools.bakery.bakery import Baking + +from .gates import CNOT, CZ, Gate, PhasedXZ + +Command = list[Gate] + + +class CommandRegistry: + """ + Dataclass to track which single- or two-qubit gates are being baked in order + to construct a two-qubit randomized benchmarking command. + """ + + def __init__(self): + self._current_command_id = 0 + self._commands: dict[int, Command] = {} + self._is_finished = False + + def register_phase_xz(self, q, x, z, a): + gate = PhasedXZ(q=q, z=z, x=x, a=a) + self._register_gate(gate) + + def register_cz(self): + gate = CZ() + self._register_gate(gate) + + def register_cnot(self, q): + gate = CNOT(q=q) + self._register_gate(gate) + + def _register_gate(self, gate: Gate): + if self.is_finished(): + return + if self._current_command_id in self._commands: + command_list = self._commands[self._current_command_id] + else: + command_list = [] + self._commands[self._current_command_id] = command_list + command_list.append(gate) + + def get_command_by_id(self, command_id: int): + return self._commands[command_id] + + def set_current_command_id(self, command_id: int): + self._current_command_id = command_id + + def _serialize_commands(self): + result = "" + for i in self._commands.keys(): + result += f"Command {i}:\n" + for j, gate in enumerate(self._commands[i]): + result += f"\t{j}: {gate}\n" + result += "\n" + return result + + def print_commands(self): + print(self._serialize_commands()) + + def save_to_file(self, path: Union[str, Path]): + with open(path, "w+") as f: + f.write(self._serialize_commands()) + + def finish(self): + """disable the incidental recording of any more commands.""" + self._is_finished = True + + def is_finished(self): + return self._is_finished + + +PhasedXZGeneratorFunc = Callable[[Baking, int, float, float, float], None] +SingleQubitGateGeneratorFunc = Union[PhasedXZGeneratorFunc] + + +def decorate_single_qubit_generator_with_command_recording( + single_qubit_gate_generator: PhasedXZGeneratorFunc, + command_registry: CommandRegistry, +) -> PhasedXZGeneratorFunc: + """ + Decorates the `single_qubit_gate_generator` function so that it records + every function call an input parameters it receives and registers them + with the provided `command_registry`. + """ + + def decorated_generator(baker: Baking, q: int, x: float, z: float, a: float): + command_registry.register_phase_xz(q=q, x=x, z=z, a=a) + return single_qubit_gate_generator(baker, q, x, z, a) + + return decorated_generator + + +CZGeneratorFunc = Callable[[Baking, int, int], None] +CNOTGeneratorFunc = Callable[[Baking, int, int], None] +TwoQubitGateGeneratorFunc = Union[CZGeneratorFunc, CNOTGeneratorFunc] +TwoQubitGateGenerator = dict[Literal["CZ", "CNOT"], TwoQubitGateGeneratorFunc] + + +def decorate_two_qubit_gate_generator_with_command_recording( + two_qubit_gate_generator: TwoQubitGateGenerator, command_registry: CommandRegistry +) -> TwoQubitGateGenerator: + """ + Decorates the `single_qubit_gate_generator` function so that it + records every call to itself and registers the corresponding input parameters. + """ + decorated_two_qubit_gate_generator = {} + for gate_name, gate_generator in two_qubit_gate_generator.items(): + if gate_name == "CZ": + + def decorated_generator(*args): + command_registry.register_cz() + return gate_generator(*args) + + elif gate_name == "CNOT": + + def decorated_generator(*args): + control_qubit_index = args[1] + command_registry.register_cnot(q=control_qubit_index) + return gate_generator(*args) + + else: + raise NotImplementedError( + f"Command recording not implemented for {gate_name}. Please contact customer success." + ) + + decorated_two_qubit_gate_generator[gate_name] = decorated_generator + + return decorated_two_qubit_gate_generator diff --git a/src/qibocal/protocols/qua/two_qubit_rb/verification/gates.py b/src/qibocal/protocols/qua/two_qubit_rb/verification/gates.py new file mode 100644 index 000000000..809d4e1ac --- /dev/null +++ b/src/qibocal/protocols/qua/two_qubit_rb/verification/gates.py @@ -0,0 +1,63 @@ +from dataclasses import dataclass + +import cirq +import numpy as np + + +@dataclass +class Gate: + def matrix(self) -> np.ndarray: + raise NotImplementedError() + + +@dataclass +class PhasedXZ(Gate): + q: int # control qubit index + x: float # amplitude scaling float + z: float + a: float + + def __str__(self): + return f"PXZ({self.q}, amp={self.x}, z={self.z}, a={self.a})" + + def matrix(self): + phased_xz = cirq.PhasedXZGate( + axis_phase_exponent=self.a, x_exponent=self.x, z_exponent=self.z + ) + + phased_xz = cirq.unitary(phased_xz) + I = cirq.unitary(cirq.I) + + if self.q == 1: + return np.kron(phased_xz, I) + elif self.q == 2: + return np.kron(I, phased_xz) + else: + raise NotImplementedError() + + +@dataclass +class CZ(Gate): + def __str__(self): + return f"CZ" + + def matrix(self): + return cirq.unitary(cirq.CZ) + + +@dataclass +class CNOT(Gate): + q: int # control qubit index + + def __str__(self): + return f"CNOT({self.q}, {2 if self.q == 1 else 1})" + + def matrix(self): + if self.q == 1: + cnot = np.array([[1, 0, 0, 0], [0, 1, 0, 0], [0, 0, 0, 1], [0, 0, 1, 0]]) + elif self.q == 2: + cnot = np.array([[1, 0, 0, 0], [0, 0, 0, 1], [0, 0, 1, 0], [0, 1, 0, 0]]) + else: + raise NotImplementedError() + + return cnot diff --git a/src/qibocal/protocols/qua/two_qubit_rb/verification/sequence_tracker.py b/src/qibocal/protocols/qua/two_qubit_rb/verification/sequence_tracker.py new file mode 100644 index 000000000..9cfb1a699 --- /dev/null +++ b/src/qibocal/protocols/qua/two_qubit_rb/verification/sequence_tracker.py @@ -0,0 +1,76 @@ +from typing import Union + +import numpy as np + +from .command_registry import * + + +class SequenceTracker: + """ + Tracks a randomly-generated sequence by recording both a list of qubit gates + corresponding to the sequence, and the raw command IDs which are used as + input to the input stream to map into baked pulses. + """ + + def __init__(self, command_registry: CommandRegistry): + self.command_registry: CommandRegistry = command_registry + self._sequences_as_gates: list[Command] = [] + self._sequences_as_command_ids: list[list[int]] = [] + + def make_sequence(self, command_ids: list[int]): + """ + Adds a new sequence to the tracker, recorded as both a list + of gates and command ids + """ + self._record_sequence_as_list_of_command_ids(command_ids) + self._record_sequence_as_list_of_gates(command_ids) + + def _record_sequence_as_list_of_command_ids(self, command_ids: list[int]): + self._sequences_as_command_ids.append([]) + for command_id in command_ids: + self._sequences_as_command_ids[-1].append(command_id) + + def _record_sequence_as_list_of_gates(self, command_ids: list[int]): + self._sequences_as_gates.append([]) + for command_id in command_ids: + command = self.command_registry.get_command_by_id(command_id) + for gate in command: + self._sequences_as_gates[-1].append(gate) + + def _serialize_sequences(self): + result = "" + for i, sequence in enumerate(self._sequences_as_gates): + result += f"Sequence {i}:\n" + result += f"\tCommand IDs: {self._sequences_as_command_ids[i]}\n" + result += f"\tGates:\n" + for j, operation in enumerate(sequence): + result += f"\t\t{j}: {operation}\n" + result += "\n" + return result + + def verify_sequences(self): + """ + Checks that the application of all gates in a sequence to the |00> + state correctly recovers to the |00> state at the end. + """ + for i, sequence in enumerate(self._sequences_as_gates): + ground_state = np.kron(np.array([1, 0]), np.array([1, 0])) + ground_state_rho = np.outer(ground_state, ground_state.conj()) + rho = ground_state_rho + for gate in sequence: + rho = gate.matrix() @ rho @ gate.matrix().conj().T + + assert np.allclose( + rho, ground_state_rho + ), f"expected to recover to at {ground_state_rho}, got {rho}" + + print( + f"Verification passed for all {len(self._sequences_as_gates)} sequence(s)." + ) + + def print_sequences(self): + print(self._serialize_sequences()) + + def save_to_file(self, path: Union[str, Path]): + with open(path, "w+") as f: + f.write(self._serialize_sequences())