Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Two qubit alignment #992

Merged
merged 10 commits into from
Aug 22, 2024
107 changes: 68 additions & 39 deletions src/qibolab/compilers/compiler.py
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@
rz_rule,
z_rule,
)
from qibolab.identifier import ChannelId
from qibolab.platform import Platform
from qibolab.pulses import Delay
from qibolab.qubits import QubitId
Expand All @@ -26,8 +27,9 @@

@dataclass
class Compiler:
"""Compiler that transforms a :class:`qibo.models.Circuit` to a
:class:`qibolab.pulses.PulseSequence`.
"""Compile native circuits into pulse sequences.

It transforms a :class:`qibo.models.Circuit` to a :class:`qibolab.pulses.PulseSequence`.

The transformation is done using a dictionary of rules which map each Qibo gate to a
pulse sequence and some virtual Z-phases.
Expand Down Expand Up @@ -65,7 +67,7 @@ def default(cls):
)

def register(self, gate_cls: type[gates.Gate]) -> Callable[[Rule], Rule]:
"""Decorator for registering a function as a rule in the compiler.
"""Register a function as a rule in the compiler.

Using this decorator is optional. Alternatively the user can set the rules directly
via ``__setitem__``.
Expand All @@ -81,8 +83,9 @@ def inner(func: Rule) -> Rule:
return inner

def get_sequence(self, gate: gates.Gate, platform: Platform) -> PulseSequence:
"""Get pulse sequence implementing the given gate using the registered
rules.
"""Get pulse sequence implementing the given gate.

The sequence is obtained using the registered rules.

Args:
gate (:class:`qibo.gates.Gate`): Qibo gate to convert to pulses.
Expand Down Expand Up @@ -118,11 +121,66 @@ def get_sequence(self, gate: gates.Gate, platform: Platform) -> PulseSequence:

raise NotImplementedError(f"{type(gate)} is not a native gate.")

# FIXME: pulse.qubit and pulse.channel do not exist anymore
def _compile_gate(
self,
gate: gates.Gate,
platform: Platform,
channel_clock: defaultdict[ChannelId, float],
) -> PulseSequence:
def qubit_clock(el: QubitId):
return max(channel_clock[ch.name] for ch in platform.qubits[el].channels)

def coupler_clock(el: QubitId):
return max(channel_clock[ch.name] for ch in platform.couplers[el].channels)

gate_seq = self.get_sequence(gate, platform)
# qubits receiving pulses
qubits = {
q
for q in [platform.qubit_channels.get(ch) for ch in gate_seq.channels]
if q is not None
}
# couplers receiving pulses
couplers = {
c
for c in [platform.coupler_channels.get(ch) for ch in gate_seq.channels]
if c is not None
}

# add delays to pad all involved channels to begin at the same time
start = max(
[qubit_clock(q) for q in qubits] + [coupler_clock(c) for c in couplers],
default=0.0,
)
initial = PulseSequence()
for ch in gate_seq.channels:
delay = start - channel_clock[ch]
if delay > 0:
initial.append((ch, Delay(duration=delay)))
channel_clock[ch] = start + gate_seq.channel_duration(ch)

# pad all qubits to have at least one channel busy for the duration of the gate
# (drive arbitrarily chosen, as always present)
end = start + gate_seq.duration
final = PulseSequence()
for q in gate.qubits:
qubit = platform.get_qubit(q)
# all actual qubits have a non-null drive channel, and couplers are not
# explicitedly listed in gates
assert qubit.drive is not None
delay = end - channel_clock[qubit.drive.name]
if delay > 0:
final.append((qubit.drive.name, Delay(duration=delay)))
channel_clock[qubit.drive.name] += delay
# couplers do not require individual padding, because they do are only
# involved in gates where both of the other qubits are involved

return initial + gate_seq + final

def compile(
self, circuit: Circuit, platform: Platform
) -> tuple[PulseSequence, dict[gates.M, PulseSequence]]:
"""Transforms a circuit to pulse sequence.
"""Transform a circuit to pulse sequence.

Args:
circuit (qibo.models.Circuit): Qibo circuit that respects the platform's
Expand All @@ -134,48 +192,19 @@ def compile(
sequence (qibolab.pulses.PulseSequence): Pulse sequence that implements the circuit.
measurement_map (dict): Map from each measurement gate to the sequence of readout pulse implementing it.
"""
ch_to_qb = platform.channels_map

sequence = PulseSequence()
# FIXME: This will not work with qubits that have string names
# TODO: Implement a mapping between circuit qubit ids and platform ``Qubit``s

measurement_map = {}
channel_clock = defaultdict(float)

def qubit_clock(el: QubitId):
elements = platform.qubits if el in platform.qubits else platform.couplers
return max(channel_clock[ch.name] for ch in elements[el].channels)

# process circuit gates
for moment in circuit.queue.moments:
for gate in set(filter(lambda x: x is not None, moment)):
delay_sequence = PulseSequence()
gate_sequence = self.get_sequence(gate, platform)
increment = defaultdict(int)
start = max(
(
qubit_clock(el)
for el in {ch_to_qb[ch] for ch in gate_sequence.channels}
),
default=0.0,
)
for ch in gate_sequence.channels:
delay = start - channel_clock[ch]
if delay > 0:
delay_sequence.append((ch, Delay(duration=delay)))
channel_clock[ch] += delay
increment[ch] = gate_sequence.channel_duration(ch)
# add the increment only after computing them, since multiple channels
# are related to each other because belonging to the same qubit
for ch, inc in increment.items():
channel_clock[ch] += inc
sequence.concatenate(delay_sequence)
sequence.concatenate(gate_sequence)
for gate in {x for x in moment if x is not None}:
sequence += self._compile_gate(gate, platform, channel_clock)

# register readout sequences to ``measurement_map`` so that we can
# properly map acquisition results to measurement gates
if isinstance(gate, gates.M):
measurement_map[gate] = gate_sequence
measurement_map[gate] = self.get_sequence(gate, platform)

return sequence.trim(), measurement_map
18 changes: 12 additions & 6 deletions src/qibolab/platform/platform.py
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@

from qibolab.components import Config
from qibolab.execution_parameters import ExecutionParameters
from qibolab.identifier import ChannelId
from qibolab.instruments.abstract import Controller, Instrument, InstrumentId
from qibolab.parameters import NativeGates, Parameters, Settings, update_configs
from qibolab.pulses import Delay
Expand Down Expand Up @@ -84,7 +85,7 @@ def estimate_duration(
)


def _channels_map(elements: QubitMap):
def _channels_map(elements: QubitMap) -> dict[ChannelId, QubitId]:
"""Map channel names to element (qubit or coupler)."""
return {ch.name: id for id, el in elements.items() for ch in el.channels}

Expand Down Expand Up @@ -164,14 +165,19 @@ def components(self) -> set[str]:
return set(self.parameters.configs.keys())

@property
def channels(self) -> list[str]:
def channels(self) -> list[ChannelId]:
"""Channels in the platform."""
return list(self.channels_map)
return list(self.qubit_channels) + list(self.coupler_channels)

@property
def channels_map(self) -> dict[str, QubitId]:
"""Channel to element map."""
return _channels_map(self.qubits) | _channels_map(self.couplers)
def qubit_channels(self) -> dict[ChannelId, QubitId]:
"""Channel to qubit map."""
return _channels_map(self.qubits)

@property
def coupler_channels(self):
"""Channel to coupler map."""
return _channels_map(self.couplers)

def config(self, name: str) -> Config:
"""Returns configuration of given component."""
Expand Down
4 changes: 3 additions & 1 deletion src/qibolab/qubits.py
Original file line number Diff line number Diff line change
@@ -1,8 +1,10 @@
from collections.abc import Iterable
from typing import Annotated, Optional

from pydantic import BeforeValidator, ConfigDict, PlainSerializer

from .components import AcquireChannel, DcChannel, IqChannel
from .components.channels import Channel
from .identifier import ChannelType, QubitId
from .serialize import Model

Expand Down Expand Up @@ -35,7 +37,7 @@ class Qubit(Model):
flux: Optional[DcChannel] = None

@property
def channels(self):
def channels(self) -> Iterable[Channel]:
for ct in ChannelType:
channel = getattr(self, ct.value)
if channel is not None:
Expand Down
98 changes: 97 additions & 1 deletion tests/test_compilers_default.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,9 +6,12 @@

from qibolab import create_platform
from qibolab.compilers import Compiler
from qibolab.identifier import ChannelId
from qibolab.identifier import ChannelId, ChannelType
from qibolab.native import FixedSequenceFactory, TwoQubitNatives
from qibolab.platform import Platform
from qibolab.pulses import Delay
from qibolab.pulses.envelope import Rectangular
from qibolab.pulses.pulse import Pulse
from qibolab.sequence import PulseSequence


Expand Down Expand Up @@ -203,3 +206,96 @@ def test_align_multiqubit(platform: Platform):
probe_delay = next(iter(sequence.channel(ChannelId.load(f"qubit_{q}/probe"))))
assert isinstance(probe_delay, Delay)
assert flux_duration == probe_delay.duration


@pytest.mark.parametrize("joint", [True, False])
def test_inactive_qubits(platform: Platform, joint: bool):
main, coupled = 0, 1
circuit = Circuit(2)
circuit.add(gates.CZ(main, coupled))
# another gate on drive is needed, to prevent trimming the delay, if alone
circuit.add(gates.GPI2(coupled, phi=0.15))
if joint:
circuit.add(gates.M(main, coupled))
else:
circuit.add(gates.M(main))
circuit.add(gates.M(coupled))

natives = platform.natives.two_qubit[(main, coupled)] = TwoQubitNatives(
CZ=FixedSequenceFactory([])
)
assert natives.CZ is not None
natives.CZ.clear()
sequence = compile_circuit(circuit, platform)

def no_measurement(seq: PulseSequence):
return [
el
for el in seq
if el[0].channel_type not in (ChannelType.PROBE, ChannelType.ACQUISITION)
]

assert len(no_measurement(sequence)) == 1

mflux = f"qubit_{main}/flux"
cdrive = f"qubit_{coupled}/drive"
duration = 200
natives.CZ.extend(
PulseSequence.load(
[
(
mflux,
Pulse(duration=duration, amplitude=0.42, envelope=Rectangular()),
)
]
)
)
padded_seq = compile_circuit(circuit, platform)
assert len(no_measurement(padded_seq)) == 3
cdrive_delay = next(iter(padded_seq.channel(ChannelId.load(cdrive))))
assert isinstance(cdrive_delay, Delay)
assert (
cdrive_delay.duration
== next(iter(padded_seq.channel(ChannelId.load(mflux)))).duration
)


def test_joint_split_equivalence(platform: Platform):
"""Test joint-split equivalence after 2q gate.

Joint measurements are only equivalent to split in specific
circumstances. When the two qubits involved are just coming out of a
mutual interaction is one of those cases.

Cf.
https://github.com/qiboteam/qibolab/pull/992#issuecomment-2302708439
"""
circuit = Circuit(3)
circuit.add(gates.CZ(1, 2))
circuit.add(gates.GPI2(2, phi=0.15))
circuit.add(gates.CZ(0, 2))

joint = Circuit(3)
joint.add(gates.M(0, 2))

joint_seq = compile_circuit(circuit + joint, platform)

split = Circuit(3)
split.add(gates.M(0))
split.add(gates.M(2))

split_seq = compile_circuit(circuit + split, platform)

# the inter-channel sorting is unreliable, and mostly irrelevant (unless align
# instructions are involved, which is not the case)
assert not any(
isinstance(p, gates.Align) for seq in (joint_seq, split_seq) for _, p in seq
) # TODO: gates.Align is just a placeholder, replace with the pulse-like when available
for ch in (
"qubit_0/acquisition",
"qubit_2/acquisition",
"qubit_0/probe",
"qubit_2/probe",
):
chid = ChannelId.load(ch)
assert list(joint_seq.channel(chid)) == list(split_seq.channel(chid))