Skip to content

Commit

Permalink
Merge pull request #944 from qiboteam/keysight-qcs
Browse files Browse the repository at this point in the history
  • Loading branch information
alecandido authored Feb 28, 2025
2 parents 9a2b1c6 + d0d92f4 commit fb8a9b3
Show file tree
Hide file tree
Showing 8 changed files with 538 additions and 0 deletions.
7 changes: 7 additions & 0 deletions src/qibolab/_core/instruments/keysight/__init__.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,7 @@
from . import components, qcs
from .components import *
from .qcs import *

__all__ = []
__all__ += qcs.__all__
__all__ += components.__all__
5 changes: 5 additions & 0 deletions src/qibolab/_core/instruments/keysight/components/__init__.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
from . import configs
from .configs import *

__all__ = []
__all__ += configs.__all__
16 changes: 16 additions & 0 deletions src/qibolab/_core/instruments/keysight/components/configs.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,16 @@
from typing import Annotated, Literal, Optional

from pydantic import Field

from qibolab._core.components import AcquisitionConfig
from qibolab._core.serialize import NdArray

__all__ = ["QcsAcquisitionConfig"]


class QcsAcquisitionConfig(AcquisitionConfig):
"""Acquisition config for Keysight QCS."""

kind: Literal["qcs-acquisition"] = "qcs-acquisition"

state_iq_values: Annotated[Optional[NdArray], Field(repr=False)] = None
169 changes: 169 additions & 0 deletions src/qibolab/_core/instruments/keysight/pulse.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,169 @@
"""Utils for pulse handling."""

from collections import defaultdict
from collections.abc import Iterable
from typing import Union

from keysight import qcs

from qibolab._core.pulses import Drag, Envelope, Gaussian, PulseId, Rectangular
from qibolab._core.pulses.pulse import PulseLike

NS_TO_S = 1e-9


def generate_qcs_envelope(shape: Envelope) -> qcs.Envelope:
"""Converts a Qibolab pulse envelope to a QCS Envelope object."""
if isinstance(shape, Rectangular):
return qcs.ConstantEnvelope()

elif isinstance(shape, (Gaussian, Drag)):
return qcs.GaussianEnvelope(shape.rel_sigma)

else:
# TODO: Rework this code to support other Qibolab pulse envelopes
# raw_envelope = shape.i(num_samples) + 1j * shape.q(num_samples)
# return qcs.ArbitraryEnvelope(
# times=np.linspace(0, 1, num_samples), amplitudes=raw_envelope
# )
raise Exception("Envelope not supported")


def process_acquisition_channel_pulses(
program: qcs.Program,
pulses: Iterable[PulseLike],
frequency: Union[float, qcs.Scalar],
virtual_channel: qcs.Channels,
probe_virtual_channel: qcs.Channels,
sweeper_pulse_map: defaultdict[PulseId, dict[str, qcs.Scalar]],
classifier: qcs.Classifier = None,
):
"""Processes Qibolab pulses on the acquisition channel into QCS hardware
instructions and adds it to the current program.
Arguments:
program (qcs.Program): Program object for the current sequence.
pulses (Iterable[PulseLike]): Array of pulse objects to be processed.
frequency (Union[float, qcs.Scalar]): Frequency of the channel.
virtual_channel (qcs.Channels): QCS virtual digitizer channel.
probe_virtual_channel (qcs.Channels): QCS virtual AWG channel connected to the digitzer.
sweeper_pulse_map (defaultdict[PulseId, dict[str, qcs.Scalar]]): Map of pulse ID to map of parameter
to be swept and corresponding QCS variable.
"""

for pulse in pulses:
sweep_param_map = sweeper_pulse_map.get(pulse.id, {})

if pulse.kind == "delay":
qcs_pulse = qcs.Delay(
sweep_param_map.get("duration", pulse.duration * NS_TO_S)
)
program.add_waveform(qcs_pulse, virtual_channel)
program.add_waveform(qcs_pulse, probe_virtual_channel)

elif pulse.kind == "acquisition":
duration = sweep_param_map.get("duration", pulse.duration * NS_TO_S)
program.add_acquisition(duration, virtual_channel)

elif pulse.kind == "readout":
sweep_param_map = sweeper_pulse_map.get(pulse.probe.id, {})
qcs_pulse = qcs.RFWaveform(
duration=sweep_param_map.get(
"duration", pulse.probe.duration * NS_TO_S
),
envelope=generate_qcs_envelope(pulse.probe.envelope),
amplitude=sweep_param_map.get("amplitude", pulse.probe.amplitude),
rf_frequency=frequency,
instantaneous_phase=sweep_param_map.get(
"relative_phase", pulse.probe.relative_phase
),
)
integration_filter = qcs.IntegrationFilter(qcs_pulse)
program.add_waveform(qcs_pulse, probe_virtual_channel)
program.add_acquisition(integration_filter, virtual_channel, classifier)


def process_iq_channel_pulses(
program: qcs.Program,
pulses: Iterable[PulseLike],
frequency: Union[float, qcs.Scalar],
virtual_channel: qcs.Channels,
sweeper_pulse_map: defaultdict[PulseId, dict[str, qcs.Scalar]],
):
"""Processes Qibolab pulses on the IQ channel into QCS hardware
instructions and adds it to the current program.
Arguments:
program (qcs.Program): Program object for the current sequence.
pulses (Iterable[PulseLike]): Array of pulse objects to be processed.
frequency (Union[float, qcs.Scalar]): Frequency of the channel.
virtual_channel (qcs.Channels): QCS virtual RF AWG channel.
sweeper_pulse_map (defaultdict[PulseId, dict[str, qcs.Scalar]]): Map of pulse ID to map of parameter
to be swept and corresponding QCS variable.
"""
qcs_pulses = []
for pulse in pulses:
sweep_param_map = sweeper_pulse_map.get(pulse.id, {})

if pulse.kind == "delay":
qcs_pulse = qcs.Delay(
sweep_param_map.get("duration", pulse.duration * NS_TO_S)
)
elif pulse.kind == "virtualz":
qcs_pulse = qcs.PhaseIncrement(
phase=sweep_param_map.get("relative_phase", pulse.phase)
)
elif pulse.kind == "pulse":
qcs_pulse = qcs.RFWaveform(
duration=sweep_param_map.get("duration", pulse.duration * NS_TO_S),
envelope=generate_qcs_envelope(pulse.envelope),
amplitude=sweep_param_map.get("amplitude", pulse.amplitude),
rf_frequency=frequency,
instantaneous_phase=sweep_param_map.get(
"relative_phase", pulse.relative_phase
),
)
if pulse.envelope.kind == "drag":
qcs_pulse = qcs_pulse.drag(coeff=pulse.envelope.beta)
else:
raise ValueError("Unrecognized pulse type", pulse.kind)

qcs_pulses.append(qcs_pulse)

program.add_waveform(qcs_pulses, virtual_channel)


def process_dc_channel_pulses(
program: qcs.Program,
pulses: Iterable[PulseLike],
virtual_channel: qcs.Channels,
sweeper_pulse_map: defaultdict[PulseId, dict[str, qcs.Scalar]],
):
"""Processes Qibolab pulses on the DC channel into QCS hardware
instructions and adds it to the current program.
Arguments:
program (qcs.Program): Program object for the current sequence.
pulses (Iterable[PulseLike]): Array of pulse objects to be processed.
virtual_channel (qcs.Channels): QCS virtual baseband AWG channel.
sweeper_pulse_map (defaultdict[PulseId, dict[str, qcs.Scalar]]): Map of pulse ID to map of parameter
to be swept and corresponding QCS variable.
"""
qcs_pulses = []
for pulse in pulses:
sweep_param_map = sweeper_pulse_map.get(pulse.id, {})
if pulse.kind == "delay":
qcs_pulse = qcs.Delay(
sweep_param_map.get("duration", pulse.duration * NS_TO_S)
)
elif pulse.kind == "pulse":
qcs_pulse = qcs.DCWaveform(
duration=sweep_param_map.get("duration", pulse.duration * NS_TO_S),
envelope=generate_qcs_envelope(pulse.envelope),
amplitude=sweep_param_map.get("amplitude", pulse.amplitude),
)
else:
raise ValueError("Unrecognized pulse type", pulse.kind)
qcs_pulses.append(qcs_pulse)

program.add_waveform(qcs_pulses, virtual_channel)
172 changes: 172 additions & 0 deletions src/qibolab/_core/instruments/keysight/qcs.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,172 @@
"""Qibolab driver for Keysight QCS instrument set."""

from collections import defaultdict
from functools import reduce
from typing import ClassVar

import numpy as np
from keysight import qcs

from qibolab._core.components import AcquisitionChannel, Config, DcChannel, IqChannel
from qibolab._core.execution_parameters import AveragingMode, ExecutionParameters
from qibolab._core.identifier import ChannelId, Result
from qibolab._core.instruments.abstract import Controller
from qibolab._core.pulses import PulseId
from qibolab._core.sequence import InputOps, PulseSequence
from qibolab._core.sweeper import ParallelSweepers

from .pulse import (
process_acquisition_channel_pulses,
process_dc_channel_pulses,
process_iq_channel_pulses,
)
from .results import fetch_result, parse_result
from .sweep import process_sweepers

NS_TO_S = 1e-9

__all__ = ["KeysightQCS"]


def sweeper_reducer(
program: qcs.Program, sweepers: tuple[list[qcs.Array], list[qcs.Scalar]]
):
"""Helper method to unpack the QCS sweep parameters when processing sweepers."""
return program.sweep(*sweepers)


class KeysightQCS(Controller):
"""Driver for interacting with QCS controller server."""

bounds: str = "qcs/bounds"

qcs_channel_map: qcs.ChannelMapper
"""Map of QCS virtual channels to QCS physical channels."""
virtual_channel_map: dict[ChannelId, qcs.Channels]
"""Map of Qibolab channel IDs to QCS virtual channels."""
sampling_rate: ClassVar[float] = (
qcs.SAMPLE_RATES[qcs.InstrumentEnum.M5300AWG] * NS_TO_S
)

def connect(self):
self.backend = qcs.HclBackend(self.qcs_channel_map, hw_demod=True)
self.backend.is_system_ready()

def create_program(
self,
sequence: PulseSequence,
configs: dict[str, Config],
sweepers: list[ParallelSweepers],
num_shots: int,
) -> tuple[qcs.Program, list[tuple[int, int]]]:
# SWEEPER MANAGEMENT
probe_channel_ids = {
chan.probe
for chan in self.channels.values()
if isinstance(chan, AcquisitionChannel)
}

(
hardware_sweepers,
software_sweepers,
sweeper_channel_map,
sweeper_pulse_map,
) = process_sweepers(sweepers, probe_channel_ids)
# Here we are telling the program to run hardware sweepers first, then software sweepers
# It is essential that we match the original sweeper order to the modified sweeper order
# to reconcile the results at the end
program = reduce(
sweeper_reducer,
software_sweepers,
reduce(sweeper_reducer, hardware_sweepers, qcs.Program()).n_shots(
num_shots
),
)

# WAVEFORM COMPILATION
# Iterate over channels and convert qubit pulses to QCS waveforms
for channel_id in sequence.channels:
channel = self.channels[channel_id]
virtual_channel = self.virtual_channel_map[channel_id]

if isinstance(channel, AcquisitionChannel):
probe_channel_id = channel.probe
classifier_reference = configs[channel_id].state_iq_values
process_acquisition_channel_pulses(
program=program,
pulses=sequence.channel(channel_id),
frequency=sweeper_channel_map.get(
probe_channel_id, configs[probe_channel_id].frequency
),
virtual_channel=virtual_channel,
probe_virtual_channel=self.virtual_channel_map[probe_channel_id],
sweeper_pulse_map=sweeper_pulse_map,
classifier=(
None
if classifier_reference is None
else qcs.MinimumDistanceClassifier(classifier_reference)
),
)

elif isinstance(channel, IqChannel):
process_iq_channel_pulses(
program=program,
pulses=sequence.channel(channel_id),
frequency=sweeper_channel_map.get(
channel_id, configs[channel_id].frequency
),
virtual_channel=virtual_channel,
sweeper_pulse_map=sweeper_pulse_map,
)

elif isinstance(channel, DcChannel):
process_dc_channel_pulses(
program=program,
pulses=sequence.channel(channel_id),
virtual_channel=virtual_channel,
sweeper_pulse_map=sweeper_pulse_map,
)

return program

def play(
self,
configs: dict[str, Config],
sequences: list[PulseSequence],
options: ExecutionParameters,
sweepers: list[ParallelSweepers],
) -> dict[int, Result]:
if options.relaxation_time is not None:
self.backend._init_time = int(options.relaxation_time)

ret: dict[PulseId, np.ndarray] = {}
for sequence in sequences:
results = self.backend.apply(
self.create_program(
sequence.align_to_delays(), configs, sweepers, options.nshots
)
).results
acquisition_map: defaultdict[qcs.Channels, list[InputOps]] = defaultdict(
list
)

for channel_id, input_op in sequence.acquisitions:
channel = self.virtual_channel_map[channel_id]
acquisition_map[channel].append(input_op)

averaging = options.averaging_mode is not AveragingMode.SINGLESHOT
for channel, input_ops in acquisition_map.items():
raw = fetch_result(
results=results,
channel=channel,
acquisition_type=options.acquisition_type,
averaging=averaging,
)

for result, input_op in zip(raw.values(), input_ops):
ret[input_op.id] = parse_result(result, options)

return ret

def disconnect(self):
pass
Loading

0 comments on commit fb8a9b3

Please sign in to comment.