From 351728714a2ee904d8c179fc5f32a5404f018497 Mon Sep 17 00:00:00 2001 From: zaqqwerty Date: Thu, 17 Feb 2022 00:06:50 -0600 Subject: [PATCH 1/5] import --- tests/circuit_infer_test.py | 315 +++++++++++++++++++++++------------- tests/test_util.py | 28 +++- 2 files changed, 219 insertions(+), 124 deletions(-) diff --git a/tests/circuit_infer_test.py b/tests/circuit_infer_test.py index 7b443358..9b112a11 100644 --- a/tests/circuit_infer_test.py +++ b/tests/circuit_infer_test.py @@ -14,9 +14,10 @@ # ============================================================================== """Tests for the circuit_infer module.""" -import itertools from absl import logging from absl.testing import parameterized +import functools +import itertools import random import string @@ -24,7 +25,6 @@ import math import sympy import tensorflow as tf -import tensorflow_probability as tfp import tensorflow_quantum as tfq from tensorflow_quantum.python import util as tfq_util @@ -33,13 +33,8 @@ from qhbmlib import circuit_model_utils from qhbmlib import energy_model from qhbmlib import hamiltonian_model -from qhbmlib import utils from tests import test_util -# Global tolerance, set for float32. -ATOL = 1e-5 -GRAD_ATOL = 2e-4 - class QuantumInferenceTest(parameterized.TestCase, tf.test.TestCase): """Tests the QuantumInference class.""" @@ -47,27 +42,23 @@ class QuantumInferenceTest(parameterized.TestCase, tf.test.TestCase): def setUp(self): """Initializes test objects.""" super().setUp() + self.tf_random_seed = 10 + self.tfp_seed = tf.constant([5, 6], dtype=tf.int32) + + self.close_atol = 1e-3 + self.close_rtol = 1e-3 + self.not_zero_atol = 2e-3 # Build QNN representing X^p|s> - self.num_qubits = 5 - self.raw_qubits = cirq.GridQubit.rect(1, self.num_qubits) + self.num_bits = 3 + self.raw_qubits = cirq.GridQubit.rect(1, self.num_bits) p_param = sympy.Symbol("p") p_circuit = cirq.Circuit(cirq.X(q)**p_param for q in self.raw_qubits) self.p_qnn = circuit_model.DirectQuantumCircuit( p_circuit, initializer=tf.keras.initializers.RandomUniform( - minval=-5.0, maxval=5.0), + minval=-1.0, maxval=1.0, seed=self.tf_random_seed), name="p_qnn") - self.tfp_seed = tf.constant([5, 6], dtype=tf.int32) - - self.close_rtol = 1e-2 - self.not_zero_atol = 1e-3 - - self.tf_random_seed = 10 - self.tfp_seed = tf.constant([5, 6], dtype=tf.int32) - - self.close_rtol = 1e-2 - self.not_zero_atol = 1e-4 def test_init(self): """Confirms QuantumInference is initialized correctly.""" @@ -87,7 +78,6 @@ def test_init(self): @test_util.eager_mode_toggle def test_expectation(self): r"""Confirms basic correct expectation values and derivatives. - Consider a circuit where each qubit has a gate X^p. Diagonalization of X is |0 1| |-1 1||-1 0||-1/2 1/2| X = |1 0| = | 1 1|| 0 1|| 1/2 1/2| @@ -110,13 +100,11 @@ def test_expectation(self): are, for W in {X, Y, Z}. Applying the above results, we have = 0 = -(-1)^s sin(pi * p) = (-1)^s cos(pi * p) - Since these expectation values are in terms of p, we can calculate their derivatives with respect to p: d/dp = 0 @@ -127,11 +115,10 @@ def test_expectation(self): # Build inference object exp_infer = circuit_infer.QuantumInference(self.p_qnn) - # Choose some bitstrings. - num_bitstrings = int(1e6) - initial_states = tfp.distributions.Bernoulli( - probs=[0.5] * self.num_qubits, dtype=tf.int8).sample(num_bitstrings) - bitstrings, _, counts = utils.unique_bitstrings_with_counts(initial_states) + # Get all the bitstrings multiple times. + initial_states_list = 5 * list( + itertools.product([0, 1], repeat=self.num_bits)) + initial_states = tf.constant(initial_states_list, dtype=tf.int8) # Get true expectation values based on the bitstrings. expected_x_exps = [] @@ -152,7 +139,7 @@ def test_expectation(self): ] sin_pi_p = math.sin(math.pi * self.p_qnn.symbol_values[0]) cos_pi_p = math.cos(math.pi * self.p_qnn.symbol_values[0]) - for bits in bitstrings.numpy().tolist(): + for bits in initial_states_list: for exps in expected + expected_grad: exps.append([]) for s in bits: @@ -162,17 +149,6 @@ def test_expectation(self): expected_y_exps_grad[-1].append(-((-1.0)**s) * math.pi * cos_pi_p) expected_z_exps[-1].append(((-1.0)**s) * cos_pi_p) expected_z_exps_grad[-1].append(-((-1.0)**s) * math.pi * sin_pi_p) - e_counts = tf.cast(tf.expand_dims(counts, 1), tf.float32) - total_counts = tf.cast(tf.reduce_sum(counts), tf.float32) - expected_reduced = [] - expected_grad_reduced = [] - for exps in expected: - expected_reduced.append(tf.reduce_sum(exps * e_counts, 0) / total_counts) - for exps in expected_grad: - expected_grad_reduced.append( - tf.reduce_sum(exps * e_counts, 0) / total_counts) - expected_reduced = tf.stack(expected_reduced) - expected_grad_reduced = tf.stack(expected_grad_reduced) # Measure operators on every qubit. x_ops = tfq.convert_to_tensor([1 * cirq.X(q) for q in self.raw_qubits]) @@ -181,29 +157,24 @@ def test_expectation(self): all_ops = [x_ops, y_ops, z_ops] expectation_wrapper = tf.function(exp_infer.expectation) - actual_reduced = [] - actual_grad_reduced = [] + actual = [] + actual_grad = [] for op in all_ops: with tf.GradientTape() as tape: current_exp = expectation_wrapper(initial_states, op) - reduced_exp = tf.math.reduce_mean(current_exp, 0) - reduced_grad = tf.squeeze( - tape.jacobian(reduced_exp, self.p_qnn.trainable_variables)) - actual_reduced.append(reduced_exp) - actual_grad_reduced.append(reduced_grad) - actual_reduced = tf.stack(actual_reduced) - actual_grad_reduced = tf.stack(actual_grad_reduced) - - self.assertAllClose(actual_reduced, expected_reduced, atol=ATOL) - self.assertAllClose( - actual_grad_reduced, expected_grad_reduced, atol=GRAD_ATOL) + current_grad = tf.squeeze( + tape.jacobian(current_exp, self.p_qnn.trainable_variables)) + actual.append(current_exp) + actual_grad.append(current_grad) + + self.assertAllClose(actual, expected, rtol=self.close_rtol) + self.assertAllClose(actual_grad, expected_grad, rtol=self.close_rtol) @test_util.eager_mode_toggle def test_expectation_cirq(self): """Compares library expectation values to those from Cirq.""" # observable - num_bits = 4 - qubits = cirq.GridQubit.rect(1, num_bits) + qubits = cirq.GridQubit.rect(1, self.num_bits) raw_ops = [ cirq.PauliSum.from_pauli_strings( [cirq.PauliString(cirq.Z(q)) for q in qubits]) @@ -212,9 +183,9 @@ def test_expectation_cirq(self): # unitary batch_size = 1 - n_moments = 10 - act_fraction = 0.9 - num_symbols = 2 + n_moments = 3 + act_fraction = 1.0 + num_symbols = 4 symbols = set() for _ in range(num_symbols): symbols.add("".join(random.sample(string.ascii_letters, 10))) @@ -222,53 +193,86 @@ def test_expectation_cirq(self): raw_circuits, _ = tfq_util.random_symbol_circuit_resolver_batch( qubits, symbols, batch_size, n_moments=n_moments, p=act_fraction) raw_circuit = raw_circuits[0] - random_values = tf.random.uniform([len(symbols)], -1, 1, tf.float32, - self.tf_random_seed).numpy().tolist() - resolver = dict(zip(symbols, random_values)) + initial_random_values = tf.random.uniform([len(symbols)], -1, 1, tf.float32, + self.tf_random_seed) + random_values = tf.Variable(initial_random_values) + + all_bitstrings = list(itertools.product([0, 1], repeat=self.num_bits)) + bitstring_circuit = circuit_model_utils.bit_circuit(qubits) + bitstring_symbols = sorted(tfq.util.get_circuit_symbols(bitstring_circuit)) + total_circuit = bitstring_circuit + raw_circuit + + def generate_resolvers(): + """Return the current resolver.""" + random_values_list = random_values.read_value().numpy().tolist() + base_resolver = dict(zip(symbols, random_values_list)) + bitstring_resolvers = [ + dict(zip(bitstring_symbols, b)) for b in all_bitstrings + ] + return [{**r, **base_resolver} for r in bitstring_resolvers] + + def delta_expectations_func(k, var, delta): + """Calculate the expectation with kth entry of `var` perturbed.""" + num_elts = tf.size(var) + old_value = var.read_value() + var.assign(old_value + delta * tf.one_hot(k, num_elts, 1.0, 0.0)) + total_resolvers = generate_resolvers() + raw_delta_expectations = tf.constant([[ + cirq.Simulator().simulate_expectation_values(total_circuit, o, + r)[0].real + for o in raw_ops + ] + for r in total_resolvers]) + delta_expectations = tf.constant(raw_delta_expectations) + var.assign(old_value) + return delta_expectations # hamiltonian model and inference circuit = circuit_model.QuantumCircuit( tfq.convert_to_tensor([raw_circuit]), qubits, tf.constant(symbols), - [tf.Variable([resolver[s] for s in symbols])], [[]]) + [random_values], [[]]) q_infer = circuit_infer.QuantumInference(circuit) - # bitstring injectors - all_bitstrings = list(itertools.product([0, 1], repeat=num_bits)) - bitstring_circuit = circuit_model_utils.bit_circuit(qubits) - bitstring_symbols = sorted(tfq.util.get_circuit_symbols(bitstring_circuit)) - bitstring_resolvers = [ - dict(zip(bitstring_symbols, b)) for b in all_bitstrings - ] - # calculate expected values - total_circuit = bitstring_circuit + raw_circuit - total_resolvers = [{**r, **resolver} for r in bitstring_resolvers] - raw_expectations = tf.constant([[ - cirq.Simulator().simulate_expectation_values(total_circuit, o, - r)[0].real for o in raw_ops - ] for r in total_resolvers]) - expected_expectations = tf.constant(raw_expectations) + expected_expectations = delta_expectations_func(0, random_values, 0) + print(f"expected_expectations: {expected_expectations}") + # Check that expectations are a reasonable size self.assertAllGreater( tf.math.abs(expected_expectations), self.not_zero_atol) expectation_wrapper = tf.function(q_infer.expectation) - actual_expectations = expectation_wrapper(all_bitstrings, ops) + with tf.GradientTape() as tape: + actual_expectations = expectation_wrapper(all_bitstrings, ops) self.assertAllClose( actual_expectations, expected_expectations, rtol=self.close_rtol) - # Ensure circuit parameter update changes the expectation value. - old_circuit_weights = circuit.get_weights() - circuit.set_weights([tf.ones_like(w) for w in old_circuit_weights]) - altered_circuit_expectations = expectation_wrapper(all_bitstrings, ops) - self.assertNotAllClose( - altered_circuit_expectations, actual_expectations, rtol=self.close_rtol) - circuit.set_weights(old_circuit_weights) + def expectations_derivative(variables_list): + """Approximately differentiates expectations with respect to the inputs""" + derivatives = [] + for var in variables_list: + var_derivative_list = [] + num_elts = tf.size(var) # Assumes variable is 1D + for n in range(num_elts): + this_derivative = test_util.approximate_derivative_unsummed( + functools.partial(delta_expectations_func, n, var)) + var_derivative_list.append(this_derivative.numpy()) + derivatives.append(tf.constant(var_derivative_list)) + return derivatives + + expected_expectations_derivative = tf.transpose( + tf.squeeze(expectations_derivative(circuit.trainable_variables))) + actual_expectations_derivative = tf.squeeze( + tape.jacobian(actual_expectations, circuit.trainable_variables)) - # Check that values return to start. - reset_expectations = expectation_wrapper(all_bitstrings, ops) - self.assertAllClose(reset_expectations, actual_expectations, - self.close_rtol) + self.assertNotAllClose( + expected_expectations_derivative, + tf.zeros_like(expected_expectations_derivative), + atol=self.not_zero_atol) + self.assertAllClose( + expected_expectations_derivative, + actual_expectations_derivative, + atol=self.close_atol) @parameterized.parameters({ "energy_class": energy_class, @@ -279,53 +283,132 @@ def test_expectation_cirq(self): def test_expectation_modular_hamiltonian(self, energy_class, energy_args): """Confirm expectation of modular Hamiltonians works.""" # set up the modular Hamiltonian to measure - num_bits = 3 - n_moments = 5 + # EBM + energy_h = energy_class(*([list(range(self.num_bits))] + energy_args)) + energy_h.build([None, self.num_bits]) + + # QNN + qubits = cirq.GridQubit.rect(1, self.num_bits) + batch_size = 1 + n_moments = 4 act_fraction = 1.0 - qubits = cirq.GridQubit.rect(1, num_bits) - energy_h = energy_class(*([list(range(num_bits))] + energy_args)) - energy_h.build([None, num_bits]) - raw_circuit_h = cirq.testing.random_circuit(qubits, n_moments, act_fraction) + num_symbols = 4 + symbols = set() + for _ in range(num_symbols): + symbols.add("".join(random.sample(string.ascii_letters, 10))) + symbols = sorted(list(symbols)) + raw_circuits, _ = tfq_util.random_symbol_circuit_resolver_batch( + qubits, symbols, batch_size, n_moments=n_moments, p=act_fraction) + raw_circuit_h = raw_circuits[0] circuit_h = circuit_model.DirectQuantumCircuit(raw_circuit_h) circuit_h.build([]) + initial_random_values = tf.random.uniform([len(symbols)], -1, 1, tf.float32, + self.tf_random_seed) + circuit_h.set_weights([initial_random_values]) hamiltonian_measure = hamiltonian_model.Hamiltonian(energy_h, circuit_h) raw_shards = tfq.from_tensor(hamiltonian_measure.operator_shards) - # set up the circuit and inference + # set up the circuit to measure against model_raw_circuit = cirq.testing.random_circuit(qubits, n_moments, act_fraction) model_circuit = circuit_model.DirectQuantumCircuit(model_raw_circuit) model_infer = circuit_infer.QuantumInference(model_circuit) # bitstring injectors - all_bitstrings = list(itertools.product([0, 1], repeat=num_bits)) + all_bitstrings = list(itertools.product([0, 1], repeat=self.num_bits)) bitstring_circuit = circuit_model_utils.bit_circuit(qubits) bitstring_symbols = sorted(tfq.util.get_circuit_symbols(bitstring_circuit)) - bitstring_resolvers = [ - dict(zip(bitstring_symbols, b)) for b in all_bitstrings - ] - - # calculate expected values total_circuit = bitstring_circuit + model_raw_circuit + raw_circuit_h**-1 - expected_expectations = tf.stack([ - tf.stack([ - hamiltonian_measure.energy.operator_expectation([ - cirq.Simulator().simulate_expectation_values( - total_circuit, o, r)[0].real for o in raw_shards - ]) - ]) for r in bitstring_resolvers - ]) + + def generate_resolvers(): + """Return the current resolver.""" + random_values_list = circuit_h.trainable_variables[0].read_value().numpy( + ).tolist() + base_resolver = dict(zip(symbols, random_values_list)) + bitstring_resolvers = [ + dict(zip(bitstring_symbols, b)) for b in all_bitstrings + ] + return [{**r, **base_resolver} for r in bitstring_resolvers] + + def delta_expectations_func(k, var, delta): + """Calculate the expectation with kth entry of `var` perturbed.""" + num_elts = tf.size(var) + old_value = var.read_value() + var.assign(old_value + delta * tf.one_hot(k, num_elts, 1.0, 0.0)) + total_resolvers = generate_resolvers() + raw_delta_expectations = tf.stack([ + tf.stack([ + hamiltonian_measure.energy.operator_expectation([ + cirq.Simulator().simulate_expectation_values( + total_circuit, o, r)[0].real for o in raw_shards + ]) + ]) for r in total_resolvers + ]) + delta_expectations = tf.constant(raw_delta_expectations) + var.assign(old_value) + return delta_expectations + + expected_expectations = delta_expectations_func( + 0, circuit_h.trainable_variables[0], 0) + self.assertNotAllClose( + expected_expectations, + tf.zeros_like(expected_expectations), + atol=self.not_zero_atol) expectation_wrapper = tf.function(model_infer.expectation) - actual_expectations = expectation_wrapper(all_bitstrings, - hamiltonian_measure) + with tf.GradientTape() as tape: + actual_expectations = expectation_wrapper(all_bitstrings, + hamiltonian_measure) self.assertAllClose(actual_expectations, expected_expectations) + def expectations_derivative(variables_list): + """Approximately differentiates expectations with respect to the inputs""" + derivatives = [] + for var in variables_list: + var_derivative_list = [] + num_elts = tf.size(var) # Assumes variable is 1D + for n in range(num_elts): + this_derivative = test_util.approximate_derivative_unsummed( + functools.partial(delta_expectations_func, n, var)) + var_derivative_list.append(this_derivative.numpy()) + derivatives.append(tf.constant(var_derivative_list)) + return derivatives + + expected_derivatives_thetas = tf.transpose( + tf.squeeze( + expectations_derivative( + hamiltonian_measure.energy.trainable_variables))) + self.assertNotAllClose( + expected_derivatives_thetas, + tf.zeros_like(expected_derivatives_thetas), + atol=self.not_zero_atol) + expected_derivatives_phis = tf.transpose( + tf.squeeze( + expectations_derivative( + hamiltonian_measure.circuit.trainable_variables))) + self.assertNotAllClose( + expected_derivatives_phis, + tf.zeros_like(expected_derivatives_phis), + atol=self.not_zero_atol) + actual_derivatives_thetas, actual_derivatives_phis = tape.jacobian( + actual_expectations, (hamiltonian_measure.energy.trainable_variables, + hamiltonian_measure.circuit.trainable_variables)) + actual_derivatives_thetas = tf.squeeze(actual_derivatives_thetas) + actual_derivatives_phis = tf.squeeze(actual_derivatives_phis) + self.assertAllClose( + actual_derivatives_phis, + expected_derivatives_phis, + rtol=self.close_rtol) + self.assertAllClose( + actual_derivatives_thetas, + expected_derivatives_thetas, + rtol=self.close_rtol) + @test_util.eager_mode_toggle def test_sample_basic(self): """Confirms correct sampling from identity, bit flip, and GHZ QNNs.""" bitstrings = tf.constant( - list(itertools.product([0, 1], repeat=self.num_qubits)), dtype=tf.int8) + list(itertools.product([0, 1], repeat=self.num_bits)), dtype=tf.int8) counts = tf.random.uniform([tf.shape(bitstrings)[0]], 10, 100, tf.int32) ident_qnn = circuit_model.DirectQuantumCircuit( @@ -362,15 +445,15 @@ def test_sample_basic(self): q_infer = circuit_infer.QuantumInference(ghz_qnn) sample_wrapper = tf.function(q_infer.sample) test_samples = sample_wrapper( - tf.expand_dims(tf.constant([0] * self.num_qubits, dtype=tf.int8), 0), + tf.expand_dims(tf.constant([0] * self.num_bits, dtype=tf.int8), 0), tf.expand_dims(counts[0], 0))[0].to_tensor() # Both |0...0> and |1...1> should be among the measured bitstrings self.assertTrue( test_util.check_bitstring_exists( - tf.constant([0] * self.num_qubits, dtype=tf.int8), test_samples)) + tf.constant([0] * self.num_bits, dtype=tf.int8), test_samples)) self.assertTrue( test_util.check_bitstring_exists( - tf.constant([1] * self.num_qubits, dtype=tf.int8), test_samples)) + tf.constant([1] * self.num_bits, dtype=tf.int8), test_samples)) @test_util.eager_mode_toggle def test_sample_uneven(self): diff --git a/tests/test_util.py b/tests/test_util.py index 7bc0cddb..d9e1318d 100644 --- a/tests/test_util.py +++ b/tests/test_util.py @@ -93,10 +93,8 @@ def get_random_hamiltonian_and_inference(qubits, def get_random_pauli_sum(qubits): """Test fixture. - Args: qubits: A list of `cirq.GridQubit`s on which to build the pauli sum. - Returns: pauli_sum: A `cirq.PauliSum` which is a linear combination of random pauli strings on `qubits`. @@ -130,17 +128,13 @@ def generate_pure_random_density_operator(num_qubits): def generate_mixed_random_density_operator(num_qubits, num_mixtures=5): """Generates a random mixed density matrix. - Generates `num_mixtures` random quantum states, takes their outer products, and generates a random convex combination of them. - NOTE: the states in the mixture are all orthogonal. - Args: num_qubits: 2**num_qubits is the size of the density matrix. num_mixtures: the number of pure states in the mixture. Must be greater than 2**num_qubits. - Returns: final_state: The mixed density matrix. prob: The probability of each random state in the mixture. @@ -220,11 +214,9 @@ def toggled_function(*args, **kwargs): def approximate_derivative(f, delta=1e-1): """Approximates the derivative of f using five point stencil. - See wikipedia page on "five point stencil", https://en.wikipedia.org/wiki/Five-point_stencil Note: the error of this method scales with delta ** 4. - Args: f: Function to approximately differentiate. Should take one input, which is a parameter setting the perturbation to the parameter to differentiate. @@ -240,3 +232,23 @@ def approximate_derivative(f, delta=1e-1): numerator = tf.reduce_sum( tf.stack(tf.nest.map_structure(tf.reduce_sum, numerator_flat))) return numerator / (12.0 * delta) + + +def approximate_derivative_unsummed(f, delta=1e-1): + """Approximates the derivative of f using five point stencil. + See wikipedia page on "five point stencil", + https://en.wikipedia.org/wiki/Five-point_stencil + Note: the error of this method scales with delta ** 4. + Args: + f: Function to approximately differentiate. Should take one input, which + is a parameter setting the perturbation to the parameter to differentiate. + delta: size of the fundamental perturbation in the stencil. + """ + forward_twice = f(2.0 * delta) + forward_once = f(delta) + backward_once = f(-1.0 * delta) + backward_twice = f(-2.0 * delta) + numerator = tf.nest.map_structure( + lambda a, b, c, d: -1.0 * a + 8.0 * b - 8.0 * c + d, forward_twice, + forward_once, backward_once, backward_twice) + return tf.nest.map_structure(lambda x: x / (12.0 * delta), numerator) From 99970a40c7768f85d17faf0ae880dc192276972a Mon Sep 17 00:00:00 2001 From: zaqqwerty Date: Thu, 17 Feb 2022 01:53:28 -0600 Subject: [PATCH 2/5] import --- qhbmlib/qmhl_loss.py | 33 +++++ qhbmlib/quantum_data.py | 59 ++++++++ tests/qmhl_loss_test.py | 291 +++++++++++++++++++++++++++++++++++++ tests/quantum_data_test.py | 52 +++++++ 4 files changed, 435 insertions(+) create mode 100644 qhbmlib/qmhl_loss.py create mode 100644 qhbmlib/quantum_data.py create mode 100644 tests/qmhl_loss_test.py create mode 100644 tests/quantum_data_test.py diff --git a/qhbmlib/qmhl_loss.py b/qhbmlib/qmhl_loss.py new file mode 100644 index 00000000..aeaa7b20 --- /dev/null +++ b/qhbmlib/qmhl_loss.py @@ -0,0 +1,33 @@ +# Copyright 2021 The QHBM Library Authors. All Rights Reserved. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# https://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# ============================================================================== +"""Implementation of the QMHL loss function.""" + +from qhbmlib import hamiltonian_infer +from qhbmlib import quantum_data + + +def qmhl(data: quantum_data.QuantumData, qhbm: hamiltonian_infer.QHBM): + """Calculate the QMHL loss of the QHBM against the quantum data. + + See equation 21 in the appendix. + + Args: + data: The data mixed state to learn. + qhbm: QHBM being trained to approximate `data`. + + Returns: + The quantum cross-entropy between the data and the model. + """ + return data.expectation(qhbm.hamiltonian) + qhbm.e_inference.log_partition() diff --git a/qhbmlib/quantum_data.py b/qhbmlib/quantum_data.py new file mode 100644 index 00000000..595ab6a6 --- /dev/null +++ b/qhbmlib/quantum_data.py @@ -0,0 +1,59 @@ +# Copyright 2021 The QHBM Library Authors. All Rights Reserved. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# https://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# ============================================================================== +"""Interface to quantum data sources.""" + +import abc +from typing import Union + +import tensorflow as tf + +from qhbmlib import hamiltonian_infer +from qhbmlib import hamiltonian_model + + +class QuantumData(abc.ABC): + """Interface for quantum datasets.""" + + @abc.abstractmethod + def expectation(self, observable: Union[tf.Tensor, + hamiltonian_model.Hamiltonian]): + """Take the expectation value of an observable against this dataset. + + Args: + observable: Hermitian operator to measure. If `tf.Tensor`, it is of type + `tf.string` with shape [1], result of calling `tfq.convert_to_tensor` + on a list of `cirq.PauliSum`, `[op]`. Otherwise, a Hamiltonian. + + Returns: + Scalar `tf.Tensor` which is the expectation value of `observable` against + this quantum data source. + """ + raise NotImplementedError() + + +class QHBMData(QuantumData): + """QuantumData defined by a QHBM.""" + + def __init__(self, qhbm: hamiltonian_infer.QHBM): + """Initializes a QHBMData. + + Args: + qhbm: An inference engine for a QHBM. + """ + self.qhbm = qhbm + + def expectation(self, observable): + """See base class docstring.""" + return tf.squeeze(self.qhbm.expectation(observable), 0) diff --git a/tests/qmhl_loss_test.py b/tests/qmhl_loss_test.py new file mode 100644 index 00000000..8761c802 --- /dev/null +++ b/tests/qmhl_loss_test.py @@ -0,0 +1,291 @@ +# Copyright 2021 The QHBM Library Authors. All Rights Reserved. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# https://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# ============================================================================== +"""Tests for the QMHL loss and gradients.""" + +import functools +import math + +import cirq +import sympy +import tensorflow as tf +import tensorflow_probability as tfp + +from qhbmlib import energy_infer +from qhbmlib import energy_model +from qhbmlib import circuit_infer +from qhbmlib import circuit_model +from qhbmlib import hamiltonian_infer +from qhbmlib import hamiltonian_infer_utils +from qhbmlib import qmhl_loss +from qhbmlib import quantum_data +from tests import test_util + + +class QMHLTest(tf.test.TestCase): + """Tests for the QMHL loss and gradients.""" + + def setUp(self): + """Initializes test objects.""" + super().setUp() + self.num_qubits_list = [1, 2] + self.tf_random_seed = 4 + self.tf_random_seed_alt = 6 + self.tfp_seed = tf.constant([3, 6], tf.int32) + self.num_samples = int(1e6) + self.close_rtol = 2e-2 + self.zero_atol = 2e-3 + self.not_zero_atol = 2e-3 + + @test_util.eager_mode_toggle + def test_self_qmhl(self): + """Confirms known value of the QMHL loss of a model against itself.""" + num_layers = 1 + qmhl_wrapper = tf.function(qmhl_loss.qmhl) + for num_qubits in self.num_qubits_list: + qubits = cirq.GridQubit.rect(1, num_qubits) + data_h, data_infer = test_util.get_random_hamiltonian_and_inference( + qubits, num_layers, f"data_objects_{num_qubits}", self.num_samples) + model_h, model_infer = test_util.get_random_hamiltonian_and_inference( + qubits, + num_layers, + f"hamiltonian_objects_{num_qubits}", + self.num_samples, + initializer_seed=self.tf_random_seed) + # Set data equal to the model + data_h.set_weights(model_h.get_weights()) + data = quantum_data.QHBMData(data_infer) + + # Trained loss is the entropy. + expected_loss = model_infer.e_inference.entropy() + # Since this is the optimum, derivatives should all be zero. + expected_loss_derivative = [ + tf.zeros_like(v) for v in model_h.trainable_variables + ] + + with tf.GradientTape() as tape: + actual_loss = qmhl_wrapper(data, model_infer) + actual_loss_derivative = tape.gradient(actual_loss, + model_h.trainable_variables) + + self.assertAllClose(actual_loss, expected_loss, rtol=self.close_rtol) + self.assertAllClose( + actual_loss_derivative, expected_loss_derivative, atol=self.zero_atol) + + @test_util.eager_mode_toggle + def test_hamiltonian_qmhl(self): + """Tests derivatives of QMHL with respect to the model.""" + + # TODO(#171): Delta function seems generalizable. + def delta_qmhl(k, var, data, model_qhbm, delta): + """Calculates the qmhl loss with the kth entry of `var` perturbed.""" + num_elts = tf.size(var) + old_value = var.read_value() + var.assign(old_value + delta * tf.one_hot(k, num_elts, 1.0, 0.0)) + delta_loss = qmhl_loss.qmhl(data, model_qhbm) + var.assign(old_value) + return delta_loss + + qmhl_wrapper = tf.function(qmhl_loss.qmhl) + + def qmhl_derivative(variables_list, data, model_qhbm): + """Approximately differentiates QMHL wih respect to the inputs.""" + derivatives = [] + for var in variables_list: + var_derivative_list = [] + num_elts = tf.size(var) # Assumes variable is 1D + for n in range(num_elts): + this_derivative = test_util.approximate_derivative( + functools.partial(delta_qmhl, n, var, data, model_qhbm)) + var_derivative_list.append(this_derivative.numpy()) + derivatives.append(tf.constant(var_derivative_list)) + return derivatives + + for num_qubits in self.num_qubits_list: + qubits = cirq.GridQubit.rect(1, num_qubits) + num_layers = 2 + _, data_qhbm = test_util.get_random_hamiltonian_and_inference( + qubits, + num_layers, + f"data_objects_{num_qubits}", + self.num_samples, + initializer_seed=self.tf_random_seed, + ebm_seed=self.tfp_seed) + data = quantum_data.QHBMData(data_qhbm) + + model_h, model_qhbm = test_util.get_random_hamiltonian_and_inference( + qubits, + num_layers, + f"model_objects_{num_qubits}", + self.num_samples, + initializer_seed=self.tf_random_seed_alt, + ebm_seed=self.tfp_seed) + # Make sure variables are trainable + self.assertGreater(len(model_h.trainable_variables), 1) + with tf.GradientTape() as tape: + actual_loss = qmhl_wrapper(data, model_qhbm) + actual_derivative = tape.gradient(actual_loss, + model_h.trainable_variables) + + expected_derivative = qmhl_derivative(model_h.trainable_variables, data, + model_qhbm) + # Changing model parameters is working if finite difference derivatives + # are non-zero. Also confirms that model_h and data_h are different. + tf.nest.map_structure( + lambda x: self.assertAllGreater(tf.abs(x), self.not_zero_atol), + expected_derivative) + self.assertAllClose( + actual_derivative, expected_derivative, rtol=self.close_rtol) + + def test_loss_value_x_rot(self): + """Confirms correct values for a single qubit X rotation QHBM. + + We use a data state which is a Y rotation of an initially diagonal density + operator. The QHBM is a Bernoulli latent state with X rotation QNN. + + See the colab notebook at the following link for derivations: + https://colab.research.google.com/drive/14987JCMju_8AVvvVoojwe6hA7Nlw-Dhe?usp=sharing + + Since each qubit is independent, the loss is the sum over the individual + qubit losses, and the gradients are the the per-qubit gradients. + """ + ebm_const = 1.0 + q_const = math.pi + for num_qubits in self.num_qubits_list: + + # EBM + ebm_init = tf.keras.initializers.RandomUniform( + minval=ebm_const / 4, maxval=ebm_const, seed=self.tf_random_seed) + energy = energy_model.BernoulliEnergy(list(range(num_qubits)), ebm_init) + e_infer = energy_infer.BernoulliEnergyInference( + energy, self.num_samples, initial_seed=self.tfp_seed) + + # QNN + qubits = cirq.GridQubit.rect(1, num_qubits) + r_symbols = [sympy.Symbol(f"phi_{n}") for n in range(num_qubits)] + r_circuit = cirq.Circuit( + cirq.rx(r_s)(q) for r_s, q in zip(r_symbols, qubits)) + qnn_init = tf.keras.initializers.RandomUniform( + minval=q_const / 4, maxval=q_const, seed=self.tf_random_seed) + circuit = circuit_model.DirectQuantumCircuit(r_circuit, qnn_init) + q_infer = circuit_infer.QuantumInference(circuit) + qhbm_infer = hamiltonian_infer.QHBM(e_infer, q_infer) + model = qhbm_infer.hamiltonian + + # Confirm qhbm_model QHBM + test_thetas = model.energy.trainable_variables[0] + test_phis = model.circuit.trainable_variables[0] + with tf.GradientTape() as log_partition_tape: + actual_log_partition = qhbm_infer.e_inference.log_partition() + expected_log_partition = tf.reduce_sum( + tf.math.log(2 * tf.math.cosh(test_thetas))) + self.assertAllClose( + actual_log_partition, expected_log_partition, rtol=self.close_rtol) + # Confirm qhbm_model modular Hamiltonian for 1 qubit case + if num_qubits == 1: + actual_dm = hamiltonian_infer_utils.density_matrix(model) + actual_log_dm = tf.linalg.logm(actual_dm) + actual_ktp = -actual_log_dm - tf.eye( + 2, dtype=tf.complex64) * tf.cast(actual_log_partition, tf.complex64) + + a = complex((test_thetas[0] * tf.math.cos(test_phis[0])).numpy(), 0) + b = 1j * (test_thetas[0] * tf.math.sin(test_phis[0])).numpy() + c = -1j * (test_thetas[0] * tf.math.sin(test_phis[0])).numpy() + d = complex(-(test_thetas[0] * tf.math.cos(test_phis[0])).numpy(), 0) + expected_ktp = tf.constant([[a, b], [c, d]], dtype=tf.complex64) + + self.assertAllClose(actual_ktp, expected_ktp, rtol=self.close_rtol) + + # Build target data + alphas = tf.random.uniform([num_qubits], -q_const, q_const, tf.float32, + self.tf_random_seed) + y_rot = cirq.Circuit( + cirq.ry(r.numpy())(q) for r, q in zip(alphas, qubits)) + data_circuit = circuit_model.DirectQuantumCircuit(y_rot) + data_q_infer = circuit_infer.QuantumInference(data_circuit) + data_probs = tf.random.uniform([num_qubits], + dtype=tf.float32, + seed=self.tf_random_seed) + data_samples = tfp.distributions.Bernoulli( + probs=1 - data_probs, dtype=tf.int8).sample( + self.num_samples, seed=self.tfp_seed) + + # Load target data into a QuantumData class + class FixedData(quantum_data.QuantumData): + """Contains a fixed quantumd data set.""" + + def __init__(self, samples, q_infer): + """Initializes a FixedData.""" + self.samples = samples + self.q_infer = q_infer + + def expectation(self, observable): + """Averages over the fixed quantum data set.""" + raw_expectations = self.q_infer.expectation(self.samples, observable) + return tf.math.reduce_mean(raw_expectations) + + data = FixedData(data_samples, data_q_infer) + qmhl_wrapper = tf.function(qmhl_loss.qmhl) + with tf.GradientTape() as loss_tape: + actual_loss = qmhl_wrapper(data, qhbm_infer) + # TODO(zaqqwerty): add way to use a log QHBM as observable on states + expected_expectation = tf.reduce_sum(test_thetas * (2 * data_probs - 1) * + tf.math.cos(alphas) * + tf.math.cos(test_phis)) + with tf.GradientTape() as expectation_tape: + actual_expectation = data.expectation(qhbm_infer.hamiltonian) + self.assertAllClose(actual_expectation, expected_expectation, + self.close_rtol) + + expected_loss = expected_expectation + expected_log_partition + self.assertAllClose(actual_loss, expected_loss, rtol=self.close_rtol) + + expected_log_partition_grad = tf.math.tanh(test_thetas) + actual_log_partition_grad = log_partition_tape.gradient( + actual_log_partition, test_thetas) + self.assertAllClose( + actual_log_partition_grad, + expected_log_partition_grad, + rtol=self.close_rtol) + + expected_expectation_thetas_grad = ( + 2 * data_probs - 1) * tf.math.cos(alphas) * tf.math.cos(test_phis) + expected_expectation_phis_grad = -test_thetas * ( + 2 * data_probs - 1) * tf.math.cos(alphas) * tf.math.sin(test_phis) + (actual_expectation_thetas_grad, + actual_expectation_phis_grad) = expectation_tape.gradient( + actual_expectation, (test_thetas, test_phis)) + self.assertAllClose( + actual_expectation_thetas_grad, + expected_expectation_thetas_grad, + rtol=self.close_rtol) + self.assertAllClose( + actual_expectation_phis_grad, + expected_expectation_phis_grad, + rtol=self.close_rtol) + + actual_thetas_grads, actual_phis_grads = loss_tape.gradient( + actual_loss, (test_thetas, test_phis)) + expected_thetas_grads = ( + expected_expectation_thetas_grad + expected_log_partition_grad) + expected_phis_grads = expected_expectation_phis_grad + self.assertAllClose( + actual_thetas_grads, expected_thetas_grads, rtol=self.close_rtol) + self.assertAllClose( + actual_phis_grads, expected_phis_grads, rtol=self.close_rtol) + + +if __name__ == "__main__": + print("Running qmhl_loss_test.py ...") + tf.test.main() diff --git a/tests/quantum_data_test.py b/tests/quantum_data_test.py new file mode 100644 index 00000000..5e21717e --- /dev/null +++ b/tests/quantum_data_test.py @@ -0,0 +1,52 @@ +# Copyright 2021 The QHBM Library Authors. All Rights Reserved. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# https://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# ============================================================================== +"""Tests quantum data sources.""" + +import cirq +import tensorflow as tf + +from qhbmlib import quantum_data +from tests import test_util + + +class QHBMDataTest(tf.test.TestCase): + """Tests the QHBM data class.""" + + def setUp(self): + """Initializes test objects.""" + self.num_qubits_list = [1, 2, 3] + self.tfp_seed = tf.constant([7, 8], tf.int32) + self.num_samples = int(1e6) + self.close_rtol = 3e-2 + + def test_expectation(self): + """Confirms initialization.""" + for num_qubits in self.num_qubits_list: + qubits = cirq.GridQubit.rect(1, num_qubits) + num_layers = 5 + _, qhbm = test_util.get_random_hamiltonian_and_inference( + qubits, + num_layers, + f"data_objects_{num_qubits}", + self.num_samples, + ebm_seed=self.tfp_seed) + hamiltonian, _ = test_util.get_random_hamiltonian_and_inference( + qubits, num_layers, f"observable_{num_qubits}", self.num_samples) + expected_expectation = tf.squeeze(qhbm.expectation(hamiltonian)) + + data = quantum_data.QHBMData(qhbm) + actual_expectation = data.expectation(hamiltonian) + + self.assertAllClose(actual_expectation, expected_expectation) From 35913d1480511d0cd3adaab4787a437e2ed9f884 Mon Sep 17 00:00:00 2001 From: zaqqwerty Date: Thu, 17 Feb 2022 02:16:57 -0600 Subject: [PATCH 3/5] trigger build From 082fba45754907e3941650f6436d0a51727f1a34 Mon Sep 17 00:00:00 2001 From: zaqqwerty Date: Thu, 17 Feb 2022 02:47:23 -0600 Subject: [PATCH 4/5] increased the tolerance of circuit test --- tests/circuit_infer_test.py | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/tests/circuit_infer_test.py b/tests/circuit_infer_test.py index 9b112a11..7f77fea9 100644 --- a/tests/circuit_infer_test.py +++ b/tests/circuit_infer_test.py @@ -45,8 +45,8 @@ def setUp(self): self.tf_random_seed = 10 self.tfp_seed = tf.constant([5, 6], dtype=tf.int32) - self.close_atol = 1e-3 - self.close_rtol = 1e-3 + self.close_atol = 2e-3 + self.close_rtol = 2e-3 self.not_zero_atol = 2e-3 # Build QNN representing X^p|s> From cba8ccdc66d91b575e27ad89f32c5225804b15d1 Mon Sep 17 00:00:00 2001 From: zaqqwerty Date: Thu, 17 Feb 2022 02:56:09 -0600 Subject: [PATCH 5/5] address review comments --- tests/qmhl_loss_test.py | 7 ++++--- 1 file changed, 4 insertions(+), 3 deletions(-) diff --git a/tests/qmhl_loss_test.py b/tests/qmhl_loss_test.py index 8761c802..7d15e113 100644 --- a/tests/qmhl_loss_test.py +++ b/tests/qmhl_loss_test.py @@ -43,6 +43,7 @@ def setUp(self): self.tf_random_seed = 4 self.tf_random_seed_alt = 6 self.tfp_seed = tf.constant([3, 6], tf.int32) + # TODO(#190) self.num_samples = int(1e6) self.close_rtol = 2e-2 self.zero_atol = 2e-3 @@ -199,10 +200,10 @@ def test_loss_value_x_rot(self): actual_ktp = -actual_log_dm - tf.eye( 2, dtype=tf.complex64) * tf.cast(actual_log_partition, tf.complex64) - a = complex((test_thetas[0] * tf.math.cos(test_phis[0])).numpy(), 0) + a = (test_thetas[0] * tf.math.cos(test_phis[0])).numpy() + 0j b = 1j * (test_thetas[0] * tf.math.sin(test_phis[0])).numpy() c = -1j * (test_thetas[0] * tf.math.sin(test_phis[0])).numpy() - d = complex(-(test_thetas[0] * tf.math.cos(test_phis[0])).numpy(), 0) + d = -(test_thetas[0] * tf.math.cos(test_phis[0])).numpy() + 0j expected_ktp = tf.constant([[a, b], [c, d]], dtype=tf.complex64) self.assertAllClose(actual_ktp, expected_ktp, rtol=self.close_rtol) @@ -223,7 +224,7 @@ def test_loss_value_x_rot(self): # Load target data into a QuantumData class class FixedData(quantum_data.QuantumData): - """Contains a fixed quantumd data set.""" + """Contains a fixed quantum data set.""" def __init__(self, samples, q_infer): """Initializes a FixedData."""