Source code for hamil_clever_sim.hamil_runner

"""
This module provides the core simulation and data handling processes for the app.
We expose a few helper classes to act as the "Model" of the data that the view layer can use.

"""

from __future__ import annotations

import asyncio
import time
import typing as ty
from asyncio import Task

import numpy as np
import qiskit as q
from qiskit import quantum_info as qi
from qiskit.circuit import Instruction
from qiskit.quantum_info import Operator, SparsePauliOp, Statevector
from qiskit.visualization.circuit.circuit_visualization import _text_circuit_drawer
from qiskit.visualization.circuit.text import TextDrawing
from qiskit_aer import AerSimulator
from scipy.linalg import expm
from typing_extensions import Callable, Optional, OrderedDict, Tuple, cast

from hamil_clever_sim.evolve_pauli import evolve_pauli
from hamil_clever_sim.inputs import PauliStringValidator, SimulationKindSet


[docs] def create_u(op: str, coef: Optional[float] = None): """Implements operator U=exp(iHt) for a given pauli term, with an optional coefficient. :param op: A valid pauli string, single term only. :param coef: A coefficient in the form of a float, that will multiply the time value (from relation $e^{ic_kP_kt}$) :return: A closure that of the form U(t), generating a circuit for a given time value. :rtype: (time: float, label: Optional[str]) -> QuantumCircuit """ weight = coef if coef is not None else 1.0 def u_n(t, label=None): weighted_t = t * weight paul = evolve_pauli(qi.Pauli(op), weighted_t, label=label) if label is None: label = f"$U({t} * {weight})$" # paul.name = f"{label}={paul.name}" paul.name = "unitary_evolve" return paul return u_n
[docs] def build_iterative_circuit(t: float, *u_states, N: int = 4): r"""Implements a trotterised circuit from a vararg of U(t) circuits. This method assumes that given trotterised circuits of X+Y and X+Y+Z are valid, then X+Y+Z+X+...+Z are also valid. :param N: Trotterisation terms / simulation accuracy. Higher values are better, but will balloon the gate count. :param t: How many time steps to simulate for. This term does not introduce more gates, just what the rotation value will be. :return: Returns a circuit that implements $\ket{\psi_t}$ :rtype: :class:`QuantumCircuit` """ # start = time.perf_counter_ns() u_1 = u_states[0] print(u_1(t).num_qubits) circ = q.QuantumCircuit(u_1(t).num_qubits) circ.barrier(label="n=0") # NOTE: don't use the for_loop interface, it seems # to only be of use for when we are performing actions based on # some sort of conditional around the loop within the circuit. # otherwise it spawns a bunch of parameters and causes issues when # trying to decompose the circuit. for n in range(1, N + 1): states = list(zip(u_states, range(1, len(u_states) + 1))) for u, i in states: circ.append(u(t / N, label=f"U_{i}({t} / {N})"), range(0, circ.num_qubits)) circ.barrier(label=f"n={n}") # end = time.perf_counter_ns() # print(f"Construction of iterated circuit took {end - start} ns") circ.save_statevector() # type:ignore return circ
[docs] def build_operator_circuit( t, *u_states, N: int = 4, qubits: Optional[int] = None ) -> Operator: r"""This method implements trotterisation a hamiltonian while also not needing to create a copy of the circuit N times. Instead, the first step of the simulation is taken, and then converted into a unitary operator of form $UU^\dagger=I$. This operator is then taken to the power of N, turning the problem into an optimised linalg computation. This operator can then be fed directly into a prepared :class:`Statevector`, using the :func:`Statevector.evolve` method. :param N: Trotterisation terms / simulation accuracy. Higher values are better, but will balloon the gate count. :param t: How many time steps to simulate for. This term does not introduce more gates, just what the rotation value will be. :param qubits: The largest qubit value, in case not all pauli terms are of equal dimensions. :return: An operator that can implement $\ket{\phi_t}$ for a statevector. """ start = time.perf_counter_ns() # we assume all u_gates are of equal dims if qubits arg is not passed u_1 = u_states[0] circ = q.QuantumCircuit(qubits if qubits is not None else u_1(t).num_qubits) for u, i in zip(u_states, range(1, len(u_states) + 1)): circ.append(u(t / N, label=f"U_{i}({t} / {N})"), range(0, circ.num_qubits)) np_op = Operator(circ).power(N) end = time.perf_counter_ns() print(f"Construction of operator circuit took {end-start} ns") return np_op
[docs] def statevec_backend() -> AerSimulator: # apparently the statevector sim is being depreciated, thanks qiskit! # backend = statevector_simulator.StatevectorSimulator() backend = AerSimulator(method="statevector") return backend
backend = statevec_backend()
[docs] class SimulationCircuitMetadata: factor: int gates: ty.OrderedDict[str, int] # circuit_repr: ty.Optional[Callable[[], TextDrawing]] circuit_repr: Optional[TextDrawing] def __init__( self, gates: ty.OrderedDict[Instruction, int], factor: int, drawn: Optional[Callable[[], TextDrawing]] = None, ) -> None: self.factor = factor self.gates = OrderedDict( [(str(instruction), val) for instruction, val in gates.items()] ) self.circuit_repr = drawn() if drawn is not None else None def get_counts(self) -> ty.Iterable[str]: return iter( [f"{gate}: {val}×{self.factor}" for gate, val in self.gates.items()] )
[docs] class SimulationTimingData: start_as_timestamp = time.localtime() start: int = time.perf_counter_ns() finish_build: int | None = None # start_sim:int | None = None end_sim: int | None = None update_callback: ty.Callable[[ty.Any], None] | None def __init__(self, callback=None) -> None: self.start = time.perf_counter_ns() self.start_as_timestamp = time.localtime() if callback is not None: self.update_callback = callback def register_callback(self, callback): self.update_callback = callback def register_update(self): if self.update_callback is not None: self.update_callback(self) def build_finished(self): self.finish_build = time.perf_counter_ns() self.register_update() def __repr__(self): return ( f"SimulationTimingData(\nstart={self.start}," + f"\nfinish_build={self.finish_build},\n" + f"\nend_sim={self.end_sim})" ) def sim_ended(self): self.end_sim = time.perf_counter_ns() self.register_update()
[docs] class SimulationRunnerResult: """ This class encapsulates the process of running specific kind of simulation, and processing the output into a standard form that can be displayed via the :class:`~hamil_clever_sim.components.statevector_display.StatevectorDisplay` widget. Generating a class per-type allows us to handle each siulation async/on worker threads, such that we can get the output of the much faster methods to display, while the slower methods work in the background. :param meta: The :class:`~hamil_clever_sim.hamil_runner.SimulationRunner` that spawned this runner. :param timing_data: A :class:`~hamil_clever_sim.hamil_runner.SimulationTimingData` that is updated for each stage of the simulation. :param type: A single member bitflag from :class:`~hamil_clever_sim.inputs.SimulationKindSet` that represents the type of simulation this runner will use. :param data: The final statevector output given after the simulation completes. The data will be in the form of a key value pair, with the key being a ket notation of a state (i.e 001, 010, 110) and the value will be a two tuple, with the first element being the complex amplitude, and the second as the probability out of 100% that this state will be measured. """ Data = dict[str, Tuple[complex, float]] type: SimulationKindSet meta: SimulationRunner data: Data job: Task[Statevector] def __init__(self, meta: SimulationRunner): self.meta = meta # starting the timer at initiation. # technically bad behaviour because # the sim/building may not have started just yet # but it makes it a lot easier to directly bind # data to the view layer. self.timing_data = SimulationTimingData() def set_type(self, type: SimulationKindSet): self.type = type def process(self, data: Statevector) -> Data: out = data.to_dict(decimals=self.meta.OUTPUT_PRECISION) probabilities = data.probabilities_dict(decimals=self.meta.OUTPUT_PRECISION) for key, prob in probabilities.items(): out[key] = (out[key], prob) self.data = out return self.data async def run(self, callback=None): self.timing_data.register_callback(callback) if self.type == SimulationKindSet.QC_METHOD: self.get_qc_circuit_metadata() return await self.run_qc_simulation() elif self.type == SimulationKindSet.OP_METHOD: return await self.run_op_simulation() else: return await self.run_direct_calc() async def run_qc_simulation(self): runner = self.meta paulis_circ = [ create_u(term, float(coef) if coef is not None else 1) for term, coef in runner.weighted_paulis ] circ = build_iterative_circuit(runner.time, *paulis_circ, N=runner.n) self.timing_data.build_finished() job = backend.run(circ.decompose(reps=2)) async def poll_job(interval=0.15): try: while job.running(): await asyncio.sleep(interval) return job.result() finally: if not job.in_final_state(): job.cancel() result = await poll_job() self.timing_data.sim_ended() st = result.get_statevector() assert isinstance(st, Statevector) return self.process(st) async def run_op_simulation(self): runner = self.meta paulis_circ = [ create_u(term, float(coef) if coef is not None else 1) for term, coef in runner.weighted_paulis ] largest = max([pc(runner.time).num_qubits for pc in paulis_circ]) op = build_operator_circuit( runner.time, *paulis_circ, N=runner.n, qubits=largest ) self.timing_data.build_finished() init_state = Statevector.from_label("0" * largest) res = init_state.evolve(op) self.timing_data.sim_ended() return self.process(res) async def run_direct_calc(self): runner = self.meta paulis_circ = [ (term, float(coef) if coef is not None else 1) for term, coef in runner.weighted_paulis ] largest = max([len(term) for term, _ in paulis_circ]) init_state = Statevector.from_label("0" * largest) pauli_collect = cast(tuple[tuple[str], tuple[float]], tuple(zip(*paulis_circ))) pauli_sparse = SparsePauliOp(list(pauli_collect[0]), np.array(pauli_collect[1])) self.timing_data.build_finished() exp = expm(-1j * pauli_sparse.to_matrix() * self.meta.time) self.timing_data.sim_ended() out = Statevector(np.matmul(exp, init_state)) return self.process(out) def get_qc_circuit_metadata(self, with_draw=False): runner = self.meta paulis_circ = [ create_u(term, float(coef) if coef is not None else 1) for term, coef in runner.weighted_paulis ] circ = build_iterative_circuit(runner.time, *paulis_circ, N=1) decomped = circ.decompose( ["rzx", "rxz", "rzz", "ryy", "rxx", "unitary_evolve"], reps=2 ) counted = decomped.count_ops() # qiskit is mega dumb, as per usual # defer attempting to "draw" until we are # able to put it in a fake stdout context # yes, even calling these methods directly # and setting the encoding doesn't work. # it seems to really want the stdout handle, # but we are in a tui context and need to fake it. def defer_drawing(): drawing = _text_circuit_drawer( decomped, initial_state=False, vertical_compression="high", plot_barriers=False, fold=-1, with_layout=False, encoding="utf8", ) return drawing return SimulationCircuitMetadata( counted, self.meta.n, drawn=defer_drawing, )
[docs] class SimulationRunner: OUTPUT_PRECISION = 4 def __init__( self, pauli: str, time: float, n: int, kind: SimulationKindSet ) -> None: self.paulis = pauli.split("+") self.weighted_paulis = [] for split_pauli in self.paulis: match = PauliStringValidator.pauli_string_regex.fullmatch(split_pauli) assert match is not None coef = match.group("coef") term = match.group("term") self.weighted_paulis.append((term, coef)) print(repr(self.weighted_paulis)) self.time = time self.n = n self.kind = kind def run_job_for_type(self, kind: SimulationKindSet) -> SimulationRunnerResult: assert len(kind) == 1 resulter = SimulationRunnerResult(self) if bool(kind & SimulationKindSet.QC_METHOD): resulter.set_type(SimulationKindSet.QC_METHOD) elif bool(kind & SimulationKindSet.OP_METHOD): resulter.set_type(SimulationKindSet.OP_METHOD) elif bool(kind & SimulationKindSet.EXP_METHOD): resulter.set_type(SimulationKindSet.EXP_METHOD) else: raise ValueError(f"{kind} does not contain any correct values") return resulter
if __name__ == "__main__": p1 = "XZ" p2 = "YX" N = 10 t = 10 init_state = Statevector.from_label("0" * len(p1)) u1 = create_u(p1, 3) u2 = create_u(p2, 5) start_1 = time.perf_counter_ns() sim = build_iterative_circuit(t, u1, u2, N=N) job = backend.run(sim.decompose(reps=2)) result = job.result().get_statevector() end_1 = time.perf_counter_ns() start_2 = time.perf_counter_ns() op = build_operator_circuit(t, u1, u2, N=N) res = init_state.evolve(op) end_2 = time.perf_counter_ns() print(f"Simulating via iterative method took {(end_1 - start_1) / 1000} ms") print(f"Simulating via operator method took {(end_2 - start_2) / 1000} ms") import pprint assert isinstance(result, Statevector) print("[iterated method] \n", pprint.pformat(result.to_dict())) print( f"[operator method (time ratio {(end_1 - start_1)/(end_2-start_2) }% faster)] = \n", pprint.pformat(res.to_dict()), ) print("[statevector probabilities]", pprint.pformat(result.probabilities_dict()))