Source code for pennylane.labs.resource_estimation.resource_tracking
# Copyright 2025 Xanadu Quantum Technologies Inc.
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
# http://www.apache.org/licenses/LICENSE-2.0
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
r"""Core resource tracking logic."""
import copy
from collections import defaultdict
from collections.abc import Callable, Iterable
from functools import singledispatch, wraps
from pennylane.labs.resource_estimation.qubit_manager import AllocWires, FreeWires, QubitManager
from pennylane.labs.resource_estimation.resource_mapping import map_to_resource_op
from pennylane.labs.resource_estimation.resource_operator import (
CompressedResourceOp,
GateCount,
ResourceOperator,
)
from pennylane.labs.resource_estimation.resources_base import Resources
from pennylane.operation import Operation
from pennylane.queuing import AnnotatedQueue, QueuingManager
from pennylane.wires import Wires
# pylint: disable=dangerous-default-value,protected-access,too-many-arguments
# user-friendly gateset for visual checks and initial compilation
StandardGateSet = {
"X",
"Y",
"Z",
"Hadamard",
"SWAP",
"CNOT",
"S",
"T",
"Adjoint(S)",
"Adjoint(T)",
"Toffoli",
"RX",
"RY",
"RZ",
"PhaseShift",
}
# realistic gateset for useful compilation of circuits
DefaultGateSet = {
"X",
"Y",
"Z",
"Hadamard",
"CNOT",
"S",
"T",
"Toffoli",
}
# parameters for further configuration of the decompositions
resource_config = {
"error_rx": 1e-9,
"error_ry": 1e-9,
"error_rz": 1e-9,
"precision_select_pauli_rot": 1e-9,
"precision_qubit_unitary": 1e-9,
"precision_qrom_state_prep": 1e-9,
"precision_alias_sampling": 1e-9,
}
[docs]
def estimate_resources(
obj: ResourceOperator | Callable | Resources | list,
gate_set: set = DefaultGateSet,
config: dict = resource_config,
work_wires: int | dict = 0,
tight_budget: bool = False,
single_qubit_rotation_error: float | None = None,
) -> Resources | Callable:
r"""Estimate the quantum resources required from a circuit or operation in terms of the gates
provided in the gateset.
Args:
obj (Union[ResourceOperator, Callable, Resources, List]): The quantum circuit or operation
to obtain resources from.
gate_set (Set, optional): A set of names (strings) of the fundamental operations to track
counts for throughout the quantum workflow.
config (Dict, optional): A dictionary of additional parameters which sets default values
when they are not specified on the operator.
single_qubit_rotation_error (Union[float, None]): The acceptable error when decomposing
single qubit rotations to `T`-gates using a Clifford + T approximation. This value takes
preference over the values set in the :code:`config`.
Returns:
Resources: the quantum resources required to execute the circuit
Raises:
TypeError: could not obtain resources for obj of type :code:`type(obj)`
**Example**
We can track the resources of a quantum workflow by passing the quantum function defining our
workflow directly into this function.
.. code-block:: python
import pennylane.labs.resource_estimation as plre
def my_circuit():
for w in range(2):
plre.ResourceHadamard(wires=w)
plre.ResourceCNOT(wires=[0,1])
plre.ResourceRX(wires=0)
plre.ResourceRY(wires=1)
plre.ResourceQFT(num_wires=3, wires=[0, 1, 2])
return
Note that we are passing a python function NOT a :class:`~.QNode`. The resources for this
workflow are then obtained by:
>>> res = plre.estimate_resources(
... my_circuit,
... gate_set = plre.DefaultGateSet,
... single_qubit_rotation_error = 1e-4,
... )()
...
>>> print(res)
--- Resources: ---
Total qubits: 3
Total gates : 279
Qubit breakdown:
clean qubits: 0, dirty qubits: 0, algorithmic qubits: 3
Gate breakdown:
{'Hadamard': 5, 'CNOT': 10, 'T': 264}
"""
if single_qubit_rotation_error is not None:
config = _update_config_single_qubit_rot_error(config, single_qubit_rotation_error)
return _estimate_resources(obj, gate_set, config, work_wires, tight_budget)
@singledispatch
def _estimate_resources(
obj: ResourceOperator | Callable | Resources | list,
gate_set: set = DefaultGateSet,
config: dict = resource_config,
work_wires: int | dict = 0,
tight_budget: bool = False,
) -> Resources | Callable:
r"""Raise error if there is no implementation registered for the object type."""
raise TypeError(
f"Could not obtain resources for obj of type {type(obj)}. obj must be one of Resources, Callable or ResourceOperator"
)
@_estimate_resources.register
def resources_from_qfunc(
obj: Callable,
gate_set: set = DefaultGateSet,
config: dict = resource_config,
work_wires=0,
tight_budget=False,
) -> Callable:
"""Get resources from a quantum function which queues operations"""
@wraps(obj)
def wrapper(*args, **kwargs):
with AnnotatedQueue() as q:
obj(*args, **kwargs)
qm = QubitManager(work_wires, tight_budget)
# Get algorithm wires:
num_algo_qubits = 0
circuit_wires = []
for op in q.queue:
if op._queue_category in ["_ops", "_resource_op"]:
if op.wires:
circuit_wires.append(op.wires)
else:
num_algo_qubits = max(num_algo_qubits, op.num_wires)
num_algo_qubits += len(Wires.all_wires(circuit_wires))
qm.algo_qubits = num_algo_qubits # set the algorithmic qubits in the qubit manager
# Obtain resources in the gate_set
compressed_res_ops_lst = _ops_to_compressed_reps(q.queue)
gate_counts = defaultdict(int)
for cmp_rep_op in compressed_res_ops_lst:
_counts_from_compressed_res_op(
cmp_rep_op, gate_counts, qbit_mngr=qm, gate_set=gate_set, config=config
)
return Resources(qubit_manager=qm, gate_types=gate_counts)
return wrapper
@_estimate_resources.register
def resources_from_resource(
obj: Resources,
gate_set: set = DefaultGateSet,
config: dict = resource_config,
work_wires=None,
tight_budget=None,
) -> Resources:
"""Further process resources from a resources object."""
existing_qm = obj.qubit_manager
if work_wires is not None:
if isinstance(work_wires, dict):
clean_wires = work_wires["clean"]
dirty_wires = work_wires["dirty"]
else:
clean_wires = work_wires
dirty_wires = 0
existing_qm._clean_qubit_counts = max(clean_wires, existing_qm._clean_qubit_counts)
existing_qm._dirty_qubit_counts = max(dirty_wires, existing_qm._dirty_qubit_counts)
if tight_budget is not None:
existing_qm.tight_budget = tight_budget
gate_counts = defaultdict(int)
for cmpr_rep_op, count in obj.gate_types.items():
_counts_from_compressed_res_op(
cmpr_rep_op,
gate_counts,
qbit_mngr=existing_qm,
gate_set=gate_set,
scalar=count,
config=config,
)
# Update:
return Resources(qubit_manager=existing_qm, gate_types=gate_counts)
@_estimate_resources.register
def resources_from_resource_ops(
obj: ResourceOperator,
gate_set: set = DefaultGateSet,
config: dict = resource_config,
work_wires=None,
tight_budget=None,
) -> Resources:
"""Extract resources from a resource operator."""
if isinstance(obj, Operation):
obj = map_to_resource_op(obj)
return resources_from_resource(
1 * obj,
gate_set,
config,
work_wires,
tight_budget,
)
@_estimate_resources.register
def resources_from_pl_ops(
obj: Operation,
gate_set: set = DefaultGateSet,
config: dict = resource_config,
work_wires=None,
tight_budget=None,
) -> Resources:
"""Extract resources from a pl operator."""
obj = map_to_resource_op(obj)
return resources_from_resource(
1 * obj,
gate_set,
config,
work_wires,
tight_budget,
)
def _counts_from_compressed_res_op(
cp_rep: CompressedResourceOp,
gate_counts_dict,
qbit_mngr,
gate_set: set,
scalar: int = 1,
config: dict = resource_config,
) -> None:
"""Modifies the `gate_counts_dict` argument by adding the (scaled) resources of the operation provided.
Args:
cp_rep (CompressedResourceOp): operation in compressed representation to extract resources from
gate_counts_dict (Dict): base dictionary to modify with the resource counts
gate_set (Set): the set of operations to track resources with respect to
scalar (int, optional): optional scalar to multiply the counts. Defaults to 1.
config (Dict, optional): additional parameters to specify the resources from an operator. Defaults to resource_config.
"""
## If op in gate_set add to resources
if cp_rep.name in gate_set:
gate_counts_dict[cp_rep] += scalar
return
## Else decompose cp_rep using its resource decomp [cp_rep --> list[GateCounts]] and extract resources
resource_decomp = cp_rep.op_type.resource_decomp(config=config, **cp_rep.params)
qubit_alloc_sum = _sum_allocated_wires(resource_decomp)
for action in resource_decomp:
if isinstance(action, GateCount):
_counts_from_compressed_res_op(
action.gate,
gate_counts_dict,
qbit_mngr=qbit_mngr,
scalar=scalar * action.count,
gate_set=gate_set,
config=config,
)
continue
if isinstance(action, AllocWires):
if qubit_alloc_sum != 0 and scalar > 1:
qbit_mngr.grab_clean_qubits(action.num_wires * scalar)
else:
qbit_mngr.grab_clean_qubits(action.num_wires)
if isinstance(action, FreeWires):
if qubit_alloc_sum != 0 and scalar > 1:
qbit_mngr.free_qubits(action.num_wires * scalar)
else:
qbit_mngr.free_qubits(action.num_wires)
return
def _sum_allocated_wires(decomp):
"""Sum together the allocated and released wires in a decomposition."""
s = 0
for action in decomp:
if isinstance(action, AllocWires):
s += action.num_wires
if isinstance(action, FreeWires):
s -= action.num_wires
return s
def _update_config_single_qubit_rot_error(config, error):
r"""Create a new config dictionary with the new single qubit
error threshold.
Args:
config (Dict): the configuration dictionary to override
error (float): the new error threshold to be set
"""
new_config = copy.copy(config)
new_config["error_rx"] = error
new_config["error_ry"] = error
new_config["error_rz"] = error
return new_config
@QueuingManager.stop_recording()
def _ops_to_compressed_reps(
ops: Iterable[Operation | ResourceOperator],
) -> list[CompressedResourceOp]:
"""Convert the sequence of operations to a list of compressed resource ops.
Args:
ops (Iterable[Union[Operation, ResourceOperator]]): set of operations to convert
Returns:
List[CompressedResourceOp]: set of converted compressed resource ops
"""
cmp_rep_ops = []
for op in ops: # We are skipping measurement processes here.
if op._queue_category == "_resource_op":
cmp_rep_ops.append(op.resource_rep_from_op())
elif op._queue_category == "_ops": # map: op --> res_op, then: res_op --> cmprsd_res_op
cmp_rep_ops.append(map_to_resource_op(op).resource_rep_from_op())
return cmp_rep_ops
_modules/pennylane/labs/resource_estimation/resource_tracking
Download Python script
Download Notebook
View on GitHub