diff --git a/examples/ex_inertial_gps.py b/examples/ex_inertial_gps.py index 0599be0f..f1d961cd 100644 --- a/examples/ex_inertial_gps.py +++ b/examples/ex_inertial_gps.py @@ -3,16 +3,16 @@ import matplotlib.pyplot as plt import numpy as np -import pynav.lib.datasets as datasets +from pynav.lib.datasets import SimulatedInertialGPSDataset from pynav.filters import ExtendedKalmanFilter, run_filter -from pynav.utils import GaussianResult, GaussianResultList, plot_error, randvec +from pynav.utils import GaussianResultList, plot_error, randvec np.set_printoptions(precision=3, suppress=True, linewidth=200) np.random.seed(0) # ############################################################################## # Create simulated IMU/GPS data -data = datasets.SimulatedInertialGPS() +data = SimulatedInertialGPSDataset() gt_states = data.get_ground_truth() input_data = data.get_input_data() meas_data = data.get_meas_data() @@ -31,14 +31,7 @@ estimate_list = run_filter(ekf, x0, P0, input_data, meas_data) # Postprocess the results and plot -results = GaussianResultList( - [ - GaussianResult(estimate_list[i], gt_states[i]) - for i in range(len(estimate_list)) - ] -) - -import seaborn as sns +results = GaussianResultList.from_estimates(estimate_list, gt_states) # ############################################################################## # Plot results @@ -51,7 +44,6 @@ plot_poses(gt_states, ax, line_color="tab:red", step=500, label="Groundtruth") ax.legend() -sns.set_theme() fig, axs = plot_error(results) axs[0, 0].set_title("Attitude") axs[0, 1].set_title("Velocity") diff --git a/examples/ex_inertial_nav.py b/examples/ex_inertial_nav.py index e0dda66f..592670f8 100644 --- a/examples/ex_inertial_nav.py +++ b/examples/ex_inertial_nav.py @@ -184,12 +184,7 @@ def input_profile(stamp: float, x: IMUState) -> np.ndarray: estimate_list = run_filter(ekf, x0, P0, input_list, invariants) # Postprocess the results and plot -results = GaussianResultList( - [ - GaussianResult(estimate_list[i], states_true[i]) - for i in range(len(estimate_list)) - ] -) +results = GaussianResultList.from_estimates(estimate_list, states_true) from pynav.utils import plot_poses import seaborn as sns diff --git a/examples/ex_iterated_ekf_se3.py b/examples/ex_iterated_ekf_se3.py index 02551267..dab9f30a 100644 --- a/examples/ex_iterated_ekf_se3.py +++ b/examples/ex_iterated_ekf_se3.py @@ -1,5 +1,5 @@ -import pynav.lib.datasets as datasets -from pynav.filters import ExtendedKalmanFilter, IteratedKalmanFilter +from pynav.lib.datasets import SimulatedPoseRangingDataset +from pynav.filters import IteratedKalmanFilter from pynav.utils import GaussianResult, GaussianResultList, plot_error, randvec from pynav.types import StateWithCovariance import time @@ -8,7 +8,7 @@ # ############################################################################## # Create simulated pose ranging data -data = datasets.SimulatedPoseRanging() +data = SimulatedPoseRangingDataset() gt_states = data.get_ground_truth() input_data = data.get_input_data() meas_data = data.get_meas_data() diff --git a/examples/ex_monte_carlo.py b/examples/ex_monte_carlo.py index 7aed4914..8d47f8ba 100644 --- a/examples/ex_monte_carlo.py +++ b/examples/ex_monte_carlo.py @@ -6,24 +6,22 @@ expected NEES are all automatically calculated for you. """ -from pynav.lib.states import SE3State -from pynav.lib.models import BodyFrameVelocity, RangePoseToAnchor -from pynav.datagen import DataGenerator -from pynav.filters import ExtendedKalmanFilter -from pynav.utils import ( +from pynav.lib import SE3State, BodyFrameVelocity, RangePoseToAnchor +from pynav import ( + DataGenerator, + ExtendedKalmanFilter, GaussianResult, GaussianResultList, monte_carlo, plot_error, + plot_nees, randvec, + run_filter ) -from pynav.types import StateWithCovariance -import time -from pylie import SE3 import numpy as np from typing import List -x0_true = SE3State(SE3.Exp([0, 0, 0, 0, 0, 0]), stamp=0.0) +x0_true = SE3State([0, 0, 0, 0, 0, 0], stamp=0.0) P0 = np.diag([0.1**2, 0.1**2, 0.1**2, 0.3**3, 0.3**2, 0.3**2]) Q = np.diag([0.01**2, 0.01**2, 0.01**2, 0.1, 0.1, 0.1]) process_model = BodyFrameVelocity(Q) @@ -47,6 +45,7 @@ def input_profile(t, x): ] dg = DataGenerator(process_model, input_profile, Q, 100, range_models, 10) +ekf = ExtendedKalmanFilter(process_model) def ekf_trial(trial_number: int) -> List[GaussianResult]: """ @@ -61,29 +60,8 @@ def ekf_trial(trial_number: int) -> List[GaussianResult]: state_true, input_data, meas_data = dg.generate(x0_true, 0, 10, noise=True) x0_check = x0_true.plus(randvec(P0)) - x = StateWithCovariance(x0_check, P0) - ekf = ExtendedKalmanFilter(process_model) - - meas_idx = 0 - y = meas_data[meas_idx] - results_list = [] - for k in range(len(input_data) - 1): - results_list.append(GaussianResult(x, state_true[k])) - - u = input_data[k] - - # Fuse any measurements that have occurred. - while y.stamp < input_data[k + 1].stamp and meas_idx < len(meas_data): - - x = ekf.correct(x, y, u) - meas_idx += 1 - if meas_idx < len(meas_data): - y = meas_data[meas_idx] - - dt = input_data[k + 1].stamp - x.stamp - x = ekf.predict(x, u, dt) - - return GaussianResultList(results_list) + estimates = run_filter(ekf, x0_check, P0, input_data, meas_data, True) + return GaussianResultList.from_estimates(estimates, state_true) # %% Run the monte carlo experiment @@ -94,32 +72,7 @@ def ekf_trial(trial_number: int) -> List[GaussianResult]: # %% Plot import matplotlib.pyplot as plt -import seaborn as sns - -sns.set_theme() - -fig, ax = plt.subplots(1, 1) -ax.plot(results.stamp, results.average_nees) -ax.plot(results.stamp, results.expected_nees, color="r", label="Expected NEES") -ax.plot( - results.stamp, - results.nees_lower_bound(0.99), - color="k", - linestyle="--", - label="99 percent c.i.", -) -ax.plot( - results.stamp, - results.nees_upper_bound(0.99), - color="k", - linestyle="--", -) -ax.set_title("{0}-trial average NEES".format(results.num_trials)) -ax.set_ylim(0, None) -ax.set_xlabel("Time (s)") -ax.set_ylabel("NEES") -ax.legend() - +fig, ax = plot_nees(results) if N < 15: diff --git a/examples/ex_ukf_se2.py b/examples/ex_ukf_se2.py index d7275607..ad3e695a 100644 --- a/examples/ex_ukf_se2.py +++ b/examples/ex_ukf_se2.py @@ -1,17 +1,13 @@ -from pynav.lib import SE3State, BodyFrameVelocity -from pynav.filters import SigmaPointKalmanFilter -from pynav.utils import GaussianResult, GaussianResultList, plot_error, randvec -from pynav.types import StateWithCovariance -from pynav.lib.datasets import SimulatedPoseRangingDataset +from pynav.lib import SE3State, BodyFrameVelocity, SimulatedPoseRangingDataset +from pynav import SigmaPointKalmanFilter, run_filter, GaussianResultList, plot_error, randvec import time -from pylie import SE3 import numpy as np -from typing import List + np.random.seed(0) # ############################################################################## # Problem Setup -x0 = SE3State(SE3.Exp([0, 0, 0, 0, 0, 0]), stamp=0.0, direction="right") +x0 = SE3State([0, 0, 0, 0, 0, 0], stamp=0.0, direction="right") P0 = np.diag([0.1**2, 0.1**2, 0.1**2, 1, 1, 1]) Q = np.diag([0.01**2, 0.01**2, 0.01**2, 0.1, 0.1, 0.1]) noise_active = True @@ -26,43 +22,19 @@ x0 = x0.plus(randvec(P0)) # %% ########################################################################### # Run Filter -x = StateWithCovariance(x0, P0) -ukf = SigmaPointKalmanFilter(process_model, method = 'cubature', iterate_mean=False) +ukf = SigmaPointKalmanFilter(process_model, method = 'unscented', iterate_mean=False) -meas_idx = 0 start_time = time.time() -y = meas_data[meas_idx] -results_list = [] -for k in range(len(input_data) - 1): - results_list.append(GaussianResult(x, state_true[k])) - - u = input_data[k] - - # Fuse any measurements that have occurred. - while y.stamp < input_data[k + 1].stamp and meas_idx < len(meas_data): - - x = ukf.correct(x, y, u) - meas_idx += 1 - if meas_idx < len(meas_data): - y = meas_data[meas_idx] - - dt = input_data[k + 1].stamp - x.stamp - x = ukf.predict(x, u, dt) - - - +estimates = run_filter(ukf, x0, P0, input_data, meas_data) print("Average filter computation frequency (Hz):") print(1 / ((time.time() - start_time) / len(input_data))) -results = GaussianResultList(results_list) +results = GaussianResultList.from_estimates(estimates, state_true) # ############################################################################## # Plot -import seaborn as sns import matplotlib.pyplot as plt - -sns.set_theme() fig, axs = plot_error(results) axs[-1][0].set_xlabel("Time (s)") axs[-1][1].set_xlabel("Time (s)") diff --git a/examples/ex_ukf_vector.py b/examples/ex_ukf_vector.py index 0b5f58fb..33e61117 100644 --- a/examples/ex_ukf_vector.py +++ b/examples/ex_ukf_vector.py @@ -1,9 +1,5 @@ -from pynav.filters import ExtendedKalmanFilter, SigmaPointKalmanFilter -from pynav.lib.states import VectorState -from pynav.datagen import DataGenerator -from pynav.types import StateWithCovariance -from pynav.utils import GaussianResult, GaussianResultList, randvec -from pynav.lib.models import SingleIntegrator, RangePointToAnchor +from pynav.lib.models import SingleIntegrator, RangePointToAnchor, VectorState +from pynav import run_filter, randvec, GaussianResultList, SigmaPointKalmanFilter, DataGenerator, plot_error import numpy as np from typing import List import time @@ -51,47 +47,20 @@ if noise_active: x0 = x0.plus(randvec(P0)) -x = StateWithCovariance(x0, P0) ukf = SigmaPointKalmanFilter(process_model, method= 'cubature', iterate_mean=False) -# ukf = IteratedKalmanFilter(process_model) # or try the Iukf! -meas_idx = 0 start_time = time.time() -y = meas_data[meas_idx] -results_list = [] -for k in range(len(input_data) - 1): - - u = input_data[k] - - # Fuse any measurements that have occurred. - while y.stamp < input_data[k + 1].stamp and meas_idx < len(meas_data): - - x = ukf.correct(x, y, u) - - # Load the next measurement - meas_idx += 1 - if meas_idx < len(meas_data): - y = meas_data[meas_idx] - - - dt = input_data[k + 1].stamp - x.state.stamp - x = ukf.predict(x, u, dt) - - results_list.append(GaussianResult(x, gt_data[k+1])) - +estimates = run_filter(ukf, x0, P0, input_data, meas_data) print("Average filter computation frequency (Hz):") print(1 / ((time.time() - start_time) / len(input_data))) +results = GaussianResultList.from_estimates(estimates, gt_data) # ############################################################################## # Post processing -results = GaussianResultList(results_list) import matplotlib.pyplot as plt -import seaborn as sns - -sns.set_theme() fig, ax = plt.subplots(1, 1) ax.plot(results.value[:, 0], results.value[:, 1], label="Estimate") ax.plot( @@ -102,16 +71,6 @@ ax.set_ylabel("y (m)") ax.legend() -fig, axs = plt.subplots(2, 1) -axs: List[plt.Axes] = axs -for i in range(len(axs)): - axs[i].fill_between( - results.stamp, - results.three_sigma[:, i], - -results.three_sigma[:, i], - alpha=0.5, - ) - axs[i].plot(results.stamp, results.error[:, i]) -axs[0].set_title("Estimation error") -axs[1].set_xlabel("Time (s)") +fig, axs = plot_error(results) +fig.suptitle("Estimation Error") plt.show() diff --git a/examples/ex_varying_noise.py b/examples/ex_varying_noise.py index 6fff6c5d..fd9e88c6 100644 --- a/examples/ex_varying_noise.py +++ b/examples/ex_varying_noise.py @@ -1,12 +1,19 @@ # %% -from pynav.filters import ExtendedKalmanFilter -from pynav.lib.states import VectorState -from pynav.datagen import DataGenerator -from pynav.types import StateWithCovariance -from pynav.utils import GaussianResult, GaussianResultList, monte_carlo, plot_error, randvec +from pynav.lib import VectorState, DoubleIntegrator, RangePointToAnchor +from pynav import ( + run_filter, + monte_carlo, + GaussianResult, + GaussianResultList, + plot_error, + plot_nees, + randvec, + DataGenerator, + ExtendedKalmanFilter -from pynav.lib.models import DoubleIntegrator, OneDimensionalPositionVelocityRange + +) import numpy as np from typing import List from matplotlib import pyplot as plt @@ -20,14 +27,14 @@ """ -x0_true = VectorState(np.array([1, 0])) +x0_true = VectorState(np.array([1, 0]), stamp=0.0) P0 = np.diag([0.5, 0.5]) R = 0.01**2 Q = 0.1 * np.identity(1) -N = 100 # Number MC trials +N = 10 # Number MC trials range_models = [ - OneDimensionalPositionVelocityRange(R), + RangePointToAnchor([0.0], R), ] @@ -70,6 +77,7 @@ def covariance(self, x, u, dt) -> np.ndarray: ) +ekf = ExtendedKalmanFilter(process_model) def ekf_trial(trial_number:int) -> List[GaussianResult]: """ @@ -81,46 +89,17 @@ def ekf_trial(trial_number:int) -> List[GaussianResult]: state_true, input_data, meas_data = dg.generate(x0_true, 0, t_max, noise=True) x0_check = x0_true.plus(randvec(P0)) - x = StateWithCovariance(x0_check, P0) - - - meas_idx = 0 - y = meas_data[meas_idx] - results_list = [] - ekf = ExtendedKalmanFilter(process_model) - for k in range(len(input_data) - 1): - - results_list.append(GaussianResult(x, state_true[k])) - u = input_data[k] - - # Fuse any measurements that have occurred. - while y.stamp < input_data[k + 1].stamp and meas_idx < len(meas_data): - x = ekf.correct(x, y, u) - meas_idx += 1 - if meas_idx < len(meas_data): - y = meas_data[meas_idx] - - dt = input_data[k + 1].stamp - x.stamp - x = ekf.predict(x, u, dt=dt) + estimates = run_filter(ekf, x0_check, P0, input_data, meas_data, True) - return GaussianResultList(results_list) + return GaussianResultList.from_estimates(estimates, state_true) # %% Run the monte carlo experiment results = monte_carlo(ekf_trial, N) # %% Plot -fig, ax = plt.subplots(1,1) -ax.plot(results.stamp, results.average_nees) -ax.plot(results.stamp, results.expected_nees, color = 'r', label = "Expected NEES") -ax.plot(results.stamp, results.nees_lower_bound(0.99), color='k', linestyle="--", label="99 percent c.i.") -ax.plot(results.stamp, results.nees_upper_bound(0.99), color='k', linestyle="--",) -ax.set_title("{0}-trial average NEES".format(results.num_trials)) -ax.set_ylim(0,None) -ax.set_xlabel("Time (s)") -ax.set_ylabel("NEES") -ax.legend() +fig, ax = plot_nees(results) if N < 15: diff --git a/pynav/filters.py b/pynav/filters.py index 6846f9c5..c599aa50 100644 --- a/pynav/filters.py +++ b/pynav/filters.py @@ -807,6 +807,7 @@ def run_filter( P0: np.ndarray, input_data: List[Input], meas_data: List[Measurement], + disable_progress_bar: bool = False, ) -> List[StateWithCovariance]: """ Executes a predict-correct-style filter given lists of input and measurement @@ -849,7 +850,7 @@ def run_filter( y = meas_data[meas_idx] results_list = [] - for k in tqdm(range(len(input_data) - 1)): + for k in tqdm(range(len(input_data) - 1), disable=disable_progress_bar): u = input_data[k] # Fuse any measurements that have occurred. if len(meas_data) > 0: diff --git a/pynav/utils.py b/pynav/utils.py index 2c09b8f4..eb81e092 100644 --- a/pynav/utils.py +++ b/pynav/utils.py @@ -315,10 +315,12 @@ def __init__(self, trial_results: List[GaussianResultList]): self.average_nees: np.ndarray = np.average( np.array([t.nees for t in trial_results]), axis=0 ) + self.nees = self.average_nees #:numpy.ndarray with shape (N,): average EES throughout trajectory self.average_ees: np.ndarray = np.average( np.array([t.ees for t in trial_results]), axis=0 ) + self.ees = self.average_ees #:numpy.ndarray with shape (N,dof): root-mean-squared error of each component self.rmse: np.ndarray = np.sqrt( np.average( @@ -579,6 +581,8 @@ def plot_error( ) axs[i].plot(results.stamp, results.error[:, i], label=label, **kwargs) + + fig: plt.Figure = fig # For type hinting return fig, axs_og def plot_nees(