From 1d07d6e7d9cfca2d1513cf3c4511d67cb350f813 Mon Sep 17 00:00:00 2001 From: edwardlavender Date: Mon, 10 Feb 2025 11:58:14 +0100 Subject: [PATCH] Add batching (& boost speed of smoother) --- Project.toml | 1 + src/001-utilities.jl | 14 +- src/003-states.jl | 2 +- src/004-model-movement.jl | 25 +- src/008-particle-filter-initialise.jl | 2 +- src/009-particle-filter.jl | 407 ++++++++++++++------------ src/010-particle-smoother.jl | 356 ++++++++++++++++------ src/013-R-from-Julia.jl | 2 +- 8 files changed, 525 insertions(+), 284 deletions(-) diff --git a/Project.toml b/Project.toml index acf780a..09c87fb 100644 --- a/Project.toml +++ b/Project.toml @@ -12,6 +12,7 @@ Dates = "ade2ca70-3891-5945-98fb-dc099432e06a" DimensionalData = "0703355e-b756-11e9-17c0-8b28908087d0" Distributions = "31c24e10-a181-5473-b8eb-7969acd0382f" GeoArrays = "2fb1d81b-e6a0-5fc5-82e6-8e06903437ab" +JLD2 = "033835bb-8acc-5ee8-8aae-3f567f8a3819" LibGEOS = "a90b1aa1-3769-5649-ba7e-abc5a9d163eb" LogExpFunctions = "2ab3a3ac-af41-5b50-aa03-7779005ae688" OrderedCollections = "bac558e1-5e72-5ebc-8fee-abe8a469f55d" diff --git a/src/001-utilities.jl b/src/001-utilities.jl index ce913a8..f1e49c5 100644 --- a/src/001-utilities.jl +++ b/src/001-utilities.jl @@ -69,4 +69,16 @@ function check_timeline(t_sim::Vector{Dates.DateTime}, t_obs::Vector) check_timeline_entries(t_sim, t_obs) check_timeline_spacing(t_sim) nothing -end \ No newline at end of file +end + +######################### +######################### +#### Batching + +# Define a Vector of indices for each chunk +# * This function is inspired by parallel::splitIndices() in R +function split_indices(indices::Vector{Int}, n_chunk::Int) + div, rem = divrem(length(indices), n_chunk) + splits = [indices[(i-1)*div + min(i-1, rem) + 1 : i*div + min(i, rem)] for i in 1:n_chunk] + return splits +end \ No newline at end of file diff --git a/src/003-states.jl b/src/003-states.jl index 2e4e4fe..1581c3b 100644 --- a/src/003-states.jl +++ b/src/003-states.jl @@ -49,7 +49,7 @@ To use a new `State` sub-type in the simulation of animal movements (via [`simul - Define a corresponding [`ModelMove`](@ref) sub-type; - (optional) Define `Patter.map_init()` and `Patter.states_init()` methods for [`simulate_states_init()`](@ref) to simulate initial states; - Define a [`Patter.simulate_step()`](@ref) method (for [`Patter.simulate_move()`](@ref)) to update the state using a [`ModelMove`](@ref) instance (in [`simulate_path_walk()`](@ref) and [`particle_filter()`](@ref)); -- Define a [`Patter.logpdf_step()`](@ref) method (for [`Patter.logpdf_move()`](@ref)) to evaluate the probability density of movement from one state to another (in [`two_filter_smoother()`](@ref)); +- Define a [`Patter.logpdf_step()`](@ref) method (for [`Patter.logpdf_move()`](@ref)) to evaluate the probability density of movement from one state to another (in [`particle_smoother_two_filter()`](@ref)); """ abstract type State end diff --git a/src/004-model-movement.jl b/src/004-model-movement.jl index 5804146..de965d9 100644 --- a/src/004-model-movement.jl +++ b/src/004-model-movement.jl @@ -60,7 +60,7 @@ To use a new [`ModelMove`](@ref) sub-type in the simulation of animal movements - Define a corresponding [`State`](@ref) sub-type; - (optional) Define `Patter.map_init()` and `Patter.states_init()` methods for [`simulate_states_init()`](@ref) to simulate initial states; - Define a [`Patter.simulate_step()`](@ref) method (for [`Patter.simulate_move()`](@ref)) to update the state using a [`ModelMove`](@ref) instance (in [`simulate_path_walk()`](@ref) and [`particle_filter()`](@ref)); -- Define a [`Patter.logpdf_step()`](@ref) method (for [`Patter.logpdf_move()`](@ref)) to evaluate the probability density of movement from one state to another (in [`two_filter_smoother()`](@ref)); +- Define a [`Patter.logpdf_step()`](@ref) method (for [`Patter.logpdf_move()`](@ref)) to evaluate the probability density of movement from one state to another (in [`particle_smoother_two_filter()`](@ref)); """ abstract type ModelMove end @@ -268,7 +268,7 @@ end # Details -[`Patter.logpdf_step()`](@ref) is an internal generic function that evaluates the (unnormalised) log probability of an (unrestricted) movement step between two [`State`](@ref)(s) (i.e., locations). Methods are provided for the built-in [`State`](@ref) and [`ModelMove`](@ref) sub-types, but need to be provided for custom sub-types. Internally, [`Patter.logpdf_step()`](@ref) is wrapped by [`Patter.logpdf_move()`](@ref), which evaluates the log probability of movement between two [`State`](@ref)s, accounting for restrictions to movement; that is, [`Patter.logpdf_move()`](@ref) evaluates `logpdf_step(state_from, state_to, model_move, length, heading) + log(abs(determinate)) - log(Z)` where `Z` is the normalisation constant. This is required for particle smoothing (see [`two_filter_smoother()`](@ref)). +[`Patter.logpdf_step()`](@ref) is an internal generic function that evaluates the (unnormalised) log probability of an (unrestricted) movement step between two [`State`](@ref)(s) (i.e., locations). Methods are provided for the built-in [`State`](@ref) and [`ModelMove`](@ref) sub-types, but need to be provided for custom sub-types. Internally, [`Patter.logpdf_step()`](@ref) is wrapped by [`Patter.logpdf_move()`](@ref), which evaluates the log probability of movement between two [`State`](@ref)s, accounting for restrictions to movement; that is, [`Patter.logpdf_move()`](@ref) evaluates `logpdf_step(state_from, state_to, model_move, length, heading) + log(abs(determinate)) - log(Z)` where `Z` is the normalisation constant. This is required for particle smoothing (see [`particle_smoother_two_filter()`](@ref)). # Returns @@ -280,7 +280,7 @@ end * [`Patter.simulate_step()`](@ref) and [`Patter.simulate_move()`](@ref) to simulate new [`State`](@ref)s; * [`Patter.logpdf_step()`](@ref) and [`Patter.logpdf_move()`](@ref) to evaluate the log-probability of movement between two locations; * [`Patter.logpdf_move_normalisation()`](@ref) for estimation of the normalisation constant; -* [`two_filter_smoother()`](@ref) for the front-end function that uses these routines for particle smoothing; +* [`particle_smoother_two_filter()`](@ref) for the front-end function that uses these routines for particle smoothing; """ function logpdf_step end @@ -333,7 +333,7 @@ end # Details -[`Patter.logpdf_move()`](@ref) is an internal function that evaluates the log probability of a movement step between two [`State`](@ref)(s) (i.e., locations). This function wraps [`Patter.logpdf_step()`](@ref), accounting for accounting for restrictions to movement; that is, [`Patter.logpdf_move()`](@ref) evaluates `logpdf_step(state_from, state_to, model_move, t, length, heading) + log(abs(determinate)) - log(Z)` where `Z` is the normalisation constant. If `model_move` is 'horizontal (e.g., `state_from` and `state_to` are two-dimensional, `StateXY` instances), a 'validity map' (`vmap`) can be provided. This is a `GeoArray` that define the regions within which movements between two locations are always legal. In the case of an aquatic animal, this is the region of the study area that is the sea, shrunk by `state_from.mobility`. In this instance, the normalisation constant is simply `log(1.0)`. Otherwise, a Monte Carlo simulation of `n_sim` iterations is required to approximate the normalisation constant, accounting for invalid movements, which is more expensive (see [`logpdf_move_normalisation()`](@ref)). [`Patter.logpdf_move()`](@ref) is used for particle smoothing (see [`two_filter_smoother()`](@ref)). +[`Patter.logpdf_move()`](@ref) is an internal function that evaluates the log probability of a movement step between two [`State`](@ref)(s) (i.e., locations). This function wraps [`Patter.logpdf_step()`](@ref), accounting for accounting for restrictions to movement; that is, [`Patter.logpdf_move()`](@ref) evaluates `logpdf_step(state_from, state_to, model_move, t, length, heading) + log(abs(determinate)) - log(Z)` where `Z` is the normalisation constant. If `model_move` is 'horizontal (e.g., `state_from` and `state_to` are two-dimensional, `StateXY` instances), a 'validity map' (`vmap`) can be provided. This is a `GeoArray` that define the regions within which movements between two locations are always legal. In the case of an aquatic animal, this is the region of the study area that is the sea, shrunk by `state_from.mobility`. In this instance, the normalisation constant is simply `log(1.0)`. Otherwise, a Monte Carlo simulation of `n_sim` iterations is required to approximate the normalisation constant, accounting for invalid movements, which is more expensive (see [`logpdf_move_normalisation()`](@ref)). Set `n_sim = 0` and `cache = nothing` for unrestricted models (i.e., if [`particle_filter()`](@ref) was implemented with `n_move = 1`). [`Patter.logpdf_move()`](@ref) is used for particle smoothing (see [`particle_smoother_two_filter()`](@ref)). # Returns @@ -345,7 +345,7 @@ end * [`Patter.simulate_step()`](@ref) and [`Patter.simulate_move()`](@ref) to simulate new [`State`](@ref)s; * [`Patter.logpdf_step()`](@ref) and [`Patter.logpdf_move()`](@ref) to evaluate the log-probability of movement between two locations; * [`Patter.logpdf_move_normalisation()`](@ref) for estimation of the normalisation constant; -* [`two_filter_smoother()`](@ref) for the front-end function that uses these routines for particle smoothing; +* [`particle_smoother_two_filter()`](@ref) for the front-end function that uses these routines for particle smoothing; """ function logpdf_move(state_from::State, state_to::State, state_zdim::Bool, @@ -404,11 +404,11 @@ end - `model_move`: A [`ModelMove`](@ref) instance; - `t`: An integer that defines the time step; - `vmap`: (optional) A `GeoArray` that maps the region within which movements from `state` are always legal. Valid regions must equal 1. `vmap` can be provided for 'horizontal' movement models (e.g., if `state` is a `StateXY`); -- `n_sim`: An integer that defines the number of Monte Carlo simulations; +- `n_sim`: An integer that defines the number of Monte Carlo simulations. Set `n_sim = 0` if `n_move = 1` in [`particle_filter()`](@ref); # Details -This internal function computes the normalisation constant for the (log) probability of movement from one [`State`](@ref) (`state`) into another (required to account for the truncation of the movement model by land). If `model_move` is 'horizontal (e.g., `state` is a two-dimensional, `StateXY` instance), a 'validity map' (`vmap`) can be provided. This is a `GeoArray` that define the regions within which movements from that `state` are always legal. In the case of an aquatic animal, this is the region of the study area that is the sea, shrunk by `state.mobility`. In this instance, the normalisation constant is simply `log(1.0)`. Otherwise, a Monte Carlo simulation of `n_sim` iterations is used to estimate the normalisation constant. A Beta(1, 1) prior is used to correct for simulations that fail to generate valid move from `state`. This function is used by [`Patter.logpdf_move()`](@ref) to evaluate the (log) probability of movement between two states, which is required for particle smoothing (see [`two_filter_smoother()`](@ref)). +This internal function computes the normalisation constant for the (log) probability of movement from one [`State`](@ref) (`state`) into another. If `n_move = 1` in [`particle_filter()`](@ref), set `n_sim = 0` to return `log(1.0)`. Otherwise, we need to account for the truncation of the movement model by land. If `model_move` is 'horizontal' (e.g., `state` is a two-dimensional, `StateXY` instance), a 'validity map' (`vmap`) can be provided. This is a `GeoArray` that define the regions within which movements from that `state` are always legal. In the case of an aquatic animal, this is the region of the study area that is the sea, shrunk by `state.mobility`. In this instance, the normalisation constant is simply `log(1.0)`. Otherwise, a Monte Carlo simulation of `n_sim` iterations is used to estimate the normalisation constant. A Beta(1, 1) prior is used to correct for simulations that fail to generate valid move from `state`. This function is used by [`Patter.logpdf_move()`](@ref) to evaluate the (log) probability of movement between two states, which is required for particle smoothing (see [`particle_smoother_two_filter()`](@ref)). # Returns @@ -420,11 +420,18 @@ This internal function computes the normalisation constant for the (log) probabi * [`Patter.simulate_step()`](@ref) and [`Patter.simulate_move()`](@ref) to simulate new [`State`](@ref)s; * [`Patter.logpdf_step()`](@ref) and [`Patter.logpdf_move()`](@ref) to evaluate the log-probability of movement between two locations; * [`Patter.logpdf_move_normalisation()`](@ref) for estimation of the normalisation constant; -* [`two_filter_smoother()`](@ref) for the front-end function that uses these routines for particle smoothing; +* [`particle_smoother_two_filter()`](@ref) for the front-end function that uses these routines for particle smoothing; """ function logpdf_move_normalisation(state::State, state_zdim::Bool, model_move::ModelMove, t::Int, vmap::Union{GeoArray, Nothing}, n_sim::Int) + # (A) Set normalisation constant to zero for n_sim = 0 + # * This corresponds to n_move = 1 in particle_filter() + if n_sim == 0 + return 0.0 + end + + # (B) Extract normalisation constant from vmap, if possible if !isnothing(vmap) && isone(extract(vmap, state.x, state.y)) # (A) Use vmap # * vmap may be supplied for 'horizontal' (2D) movement models @@ -433,7 +440,7 @@ function logpdf_move_normalisation(state::State, state_zdim::Bool, model_move::M log_z = 0.0 else - # (B) Compute normalisation constant via simulation + # (C) Compute normalisation constant via simulation # Run simulation k = 0.0 for i in 1:n_sim diff --git a/src/008-particle-filter-initialise.jl b/src/008-particle-filter-initialise.jl index 1c25e14..d51a056 100644 --- a/src/008-particle-filter-initialise.jl +++ b/src/008-particle-filter-initialise.jl @@ -49,7 +49,7 @@ These functions support the simulation of initial states for animal movement wal If `xinit = nothing`, initial coordinates are sampled from `map`. -The region(s) within `map` from which initial coordinates are sampled can be optionally restricted by the provision of the observation datasets and the associated model sub-types (via [`Patter.map_init_iter()`](@ref)). This option does not apply to [`simulate_path_walk()`](@ref) but is used in [`particle_filter()`](@ref) where observation models are used. In this instance, [`Patter.map_init_iter()`](@ref) iterates over each model and uses the `Patter.map_init()` method to update `map`. The following methods are implemented: +The region(s) within `map` from which initial coordinates are sampled can be optionally restricted by the provision of the observation datasets and the associated model sub-types (via `Patter.map_init_iter()`). This option does not apply to [`simulate_path_walk()`](@ref) but is used in [`particle_filter()`](@ref) where observation models are used. In this instance, `Patter.map_init_iter()` iterates over each model and uses the `Patter.map_init()` method to update `map`. The following methods are implemented: - Default. The default method returns `map` unchanged. - `model_obs_type::ModelObsAcousticLogisTrunc`. This method uses acoustic observations to restrict `map` via Lavender et al.'s ([2023](https://doi.org/10.1111/2041-210X.14193)) acoustic--container algorithm. The function identifies the receiver(s) that recorded detection(s) immediately before, at and following the first time step (`timeline[start]`, where `start` is `1` if `direction = "forward"` and `length(timeline)` otherwise). The 'container' within which the individual must be located from the perspective of each receiver is defined by the time difference and the individual's mobility (that is, the maximum moveable distance the individual could move between two time steps), which must be specified in `model_move.mobility`. The intersection between all containers defines the possible locations of the individual at the first time step. - `model_obs_type::ModelObsDepthUniformSeabed`. This method uses the depth observations to restrict `map` (which should represent the bathymetry in a region). The individual must be within a region in which the observed depth at `timeline[start]` is within a depth envelope around the bathymetric depth defined by the parameters `depth_shallow_eps` and `depth_deep_eps` (see [`ModelObs`](@ref)). (If there is no observation at `timeline[start]`, `map` is returned unchanged.) diff --git a/src/009-particle-filter.jl b/src/009-particle-filter.jl index f271b85..b479c94 100644 --- a/src/009-particle-filter.jl +++ b/src/009-particle-filter.jl @@ -1,18 +1,19 @@ -using Random -using Dates -using LogExpFunctions: logsumexp using Base.Threads: @threads +using Dates using ProgressMeter: @showprogress +using JLD2 +using LogExpFunctions: logsumexp +using Random -export particle_filter, particle_filter_iter +export particle_filter """ - Particles(states::Matrix, diagnostics::DataFrame, callstats::DataFrame) + Particles(states::Union{Nothing, Matrix{<:State}}, diagnostics::DataFrame, callstats::DataFrame) # Fields -- `states`: A `Matrix` of [`State`](@ref)s: +- `states`: (optional) A `Matrix` of [`State`](@ref)s: - Each row corresponds to a particle; - Each column corresponds to the `timestep`; - `diagnostics`: A `DataFrame` of algorithm diagnostics: @@ -25,31 +26,38 @@ export particle_filter, particle_filter_iter - routine: A `String` that defines the algorithm; - n_particle: An `Int` that defines the number of particles; - n_iter: An `Int` or `NaN` that defines the number of iterations (trials); - - convergence: A `Boolian` that defines whether or not the algorithm reached the end of the `timeline`; + - convergence: A `Boolian` that defines convergence; - time: A `Float64` that defines the duration (s) of the function call; +# Details + +* `states` is `nothing` if [`particle_filter()`](@ref) or [`particle_smoother_two_filter()`](@ref) are implemented with `batch`ing. +* `convergence` is defined as follows: + - In [`particle_filter()`](@ref), `convergence` defines whether or not the filter reached the end of the `timeline`; + - In [`particle_smoother_two_filter()`](@ref), `convergence` defines whether or not correct smoothing was achieved on at least 95 % of time steps. 'Correct smoothing' is possible when there at at least some valid moves between the subset of recorded particles on the backward filter and those on the forward filter (for the previous time step). + # See also `Particles` objects are returned by: - - [`particle_filter()`](@ref) - - [`two_filter_smoother()`](@ref) +- [`particle_filter()`](@ref) +- [`particle_smoother_two_filter()`](@ref) """ struct Particles - states::Matrix{<:State} + states::Union{Nothing, Matrix{<:State}} diagnostics::DataFrame callstats::DataFrame end -# Create 'particles' structure in particle_filter() or two_filter_smoother() +# Create 'particles' structure in particle_filter() or particle_smoother_two_filter() function particulate(routine::String, timestamp::Dates.DateTime, timeline::Vector{Dates.DateTime}, - states::Matrix{<:State}, + states::Union{Nothing, Matrix{<:State}}, ess::Vector{Float64}, maxlp::Vector{Float64}, n_particle::Int, - n_iter::Union{Int, Float64}, # NaN in two_filter_smoother() + n_iter::Union{Int, Float64}, # NaN in particle_smoother_two_filter() convergence::Bool) diagnostics = DataFrame(timestep = collect(1:length(timeline)), @@ -81,7 +89,7 @@ end # Details -This is an internal function that implements systematic resampling in the particle filter (see [`particle_filter()`](@ref)) and smoothing algorithms (see [`two_filter_smoother()`](@ref)). Note that for large `n`, the function is not numerically stable. +This is an internal function that implements systematic resampling in the particle filter (see [`particle_filter()`](@ref)) and smoothing algorithms (see [`particle_smoother_two_filter()`](@ref)). Note that for large `n`, the function is not numerically stable. # Returns @@ -126,91 +134,20 @@ function resample(w::Vector{Float64}, n::Int = length(w)) end -""" -# Particle filter - -A particle filtering algorithm that samples from `f(X_t | {Y_1 ... Y_t}) for t ∈ 1:t`. - -# Arguments (keywords) - -- `timeline`: A `Vector{DateTime}` of ordered, regularly spaced time stamps that defines the time steps for the simulation; -- `xinit`: A `Vector` of [`State`](@ref) instances that defines the initial state(s) of the animal; -- `yobs`: A Dictionary of observations: - - Dictionary keys should match elements in `timeline`; - - Each element must be a `Vector` of `Tuple`s for that time step (one for each observation/sensor); - - Each `Tuple` should contain (a) the observation and (b) the model parameters (that is, a [`ModelObs`](@ref) instance); -- `model_move`: A [`ModelMove`](@ref) instance: - - The movement model describes movement from one time step to the next and therefore depends implicitly on the resolution of `timeline`; - - The movement model should align with the [`State`](@ref) instances in `xinit`. For example, a two-dimensional state (`StateXY`) requires a corresponding movement model instance (i.e., `ModelMoveXY`); -- `n_move`: An integer that defines the number of attempts used to find a legal move; - - All [`ModelMove`](@ref) sub-types contain a `map` field that defines the region(s) within which movements are allowed (see [`is_valid()`](@ref)); - - Each particle is moved up to `n_move` times, until a valid movement is simulated; - - Particles that fail to generate a valid move are killed; -- `n_record`: An integer that defines the number of particles to record at each time step: - - `n_record` particles are resampled at each time step and recorded in memory; -- `n_resample`: A number that defines the effective sample size for resampling: - - Particles are resampled when the effective sample size <= `n_resample`; -- `t_resample`: `nothing`, an `integer` or a Vector of `integer`s that define the time step(s) at which to force resampling; - - Particles are resampled at `t_resample` regardless of the effective sample size; -- `direction:` A `String` that defines the direction of the filter: - - `"forward"` runs the filter forwards in time; - - `"backward"` runs the filter backwards in time; - -# Algorithm - -## Initiation - -The algorithm is initiated using a `Vector` of `n_particle` [`State`](@ref)s (`xinit`). See [`simulate_states_init()`](@ref) to simulate initial states for the filter. - -## Movement - -For every time step in the `timeline`, the internal function [`Patter.simulate_move()`](@ref) simulates the movement of particles away from previous [`State`](@ref)s into new [`State`](@ref)s using the movement model, as specified by `model_move`. [`Patter.simulate_move()`](@ref) is an iterative wrapper for a [`Patter.simulate_step()`](@ref) method that simulates a new [`State`](@ref) instance from the previous [`State`](@ref). [`Patter.simulate_move()`](@ref) implements [`Patter.simulate_step()`](@ref) iteratively until a legal move is found (or `n_move` is reached). For custom [`State`](@ref) or [`ModelObs`](@ref) sub-types, a corresponding [`Patter.simulate_step()`](@ref) method is required. Illegal moves are those that land in `NaN` locations on the `map` or, in the case of [`State`](@ref)s that include a depth (`z`) component, are below the depth of the seabed (see [`is_valid()`](@ref)). Particles that fail to generate legal moves are eventually killed by re-sampling (see below). - -## Likelihood - -Observations are used to weight simulated particles. To simulate observations for filtering, use [`simulate_yobs()`](@ref). To assemble real-world observations for filtering, see [`assemble_yobs()`](@ref). For each valid [`State`](@ref) and time stamp in `yobs`, the log-probability of each observation, given the [`State`](@ref), is evaluated via `Patter.logpdf_obs()`. For custom [`State`](@ref) or [`ModelObs`](@ref) sub-types, a corresponding `Patter.logpdf_obs()` method is required. The maximum weight across all particles (`maxlp`) is recorded at each time step as an algorithm diagnostic. (This metric can be intepreted as the maximum log-posterior if resampling is implemented at every time step.) - -## Resampling - -Particles are periodically re-sampled, with replacement, using the low-variance systematic re-sampling algorithm (via [`Patter.resample()`](@ref)), at time steps in `t_resample` or when the effective sample size is less than or equal to `n_resample`. This has the effect of eliminating impossible particles and duplicating likely ones. - -The algorithm continues in this way, iterating over the `timeline`, simulating, weighting and (re)sampling particles. At each time step, `n_record` particles are saved in memory. If the function fails to converge, a warning is returned alongside the outputs up to that time step. Otherwise, the function will continue to the end of the time series. - -## Multi-threading - -The iteration over particles (i.e., simulated movements and likelihood evaluations) are multi-threaded. - -## Convergence and diagnostics - -Algorithm convergence is not guaranteed. The algorithm may reach a dead-end---a time step at which there are no valid locations into which the algorithm can step. This may be due to data errors, incorrect assumptions, insufficient sampling effort or poor tuning-parameter settings. - -# Returns - -- A [`Particles`](@ref) structure; - -# See also - -* [`State`](@ref), [`ModelMove`](@ref) and [`ModelObs`](@ref) for [`State`](@ref), movement model and observation model sub-types; -- [`simulate_yobs()`](@ref) and [`assemble_yobs()`](@ref) to prepare observations for the particle filter; -* [`Patter.simulate_step()`](@ref) and [`Patter.simulate_move()`](@ref) for the internal routines used to simulate new [`State`](@ref)s; -* `Patter.logpdf_obs()` methods to evaluate the log probability of observations; -* [`two_filter_smoother()`](@ref) to implement particle smoothing; - -""" -function particle_filter( +# Internal particle filter function +function _particle_filter( ; timeline::Vector{DateTime}, - xinit::Vector, + xinit::Vector{<:State}, yobs::Dict, model_move::ModelMove, n_move::Int = 100_000, n_record::Int = 1000, n_resample::Float64 = Float64(n_record), t_resample::Union{Nothing, Int, Vector{Int}} = nothing, - direction::String = "forward") + direction::String = "forward", + batch::Union{Nothing, Vector{String}} = nothing) #### Define essential parameters - # Call start - call_start = now() # Number of time steps nt = length(timeline) # Number of particles @@ -229,127 +166,233 @@ function particle_filter( error("The number of initial particles in `xinit` ($np) must be >= the number of recorded partices in `n_record` ($nr).") end + #### Handle batches + do_batch = !isnothing(batch) + nb = do_batch ? length(batch) : 1 + #### Define filter direction if direction == "forward" start = 1; finish = nt; timesteps = start:finish + if do_batch + batch = sort(batch) + end elseif direction == "backward" start = nt; finish = 1; timesteps = start:-1:finish + if do_batch + batch = sort(batch, rev = true) + end else error("`direction` must be \"forward\" or \"backward\".") end + # Batch time steps + timesteps = collect(timesteps) + timesteps_by_batch = split_indices(timesteps, nb) - #### Define particle objects + #### Define filter # Particles xpast = deepcopy(xinit) xnow = deepcopy(xinit) - xout = Matrix{eltype(xinit)}(undef, nr, nt); + xout = Matrix{eltype(xinit)}(undef, nr, length(timesteps_by_batch[1])) # (log) weights lw = zeros(np) - # Output particles - xout = Matrix{eltype(xinit)}(undef, nr, nt); - - #### Define diagnostic objects # Output ESS vector - ess = zeros(nt) + ess = fill(NaN, nt) # Output maxlp vector - maxlp = zeros(nt) + maxlp = fill(NaN, nt) #### Run filter - @showprogress desc = "Running filter..." for t in timesteps - - # println(t) - - #### Move particles & compute weights - # * We iterate once over particles b/c this is thread safe - timestamp = timeline[t] - has_obs_at_timestamp = haskey(yobs, timestamp) - @threads for i in 1:np - if isfinite(lw[i]) - # Move particles - if t != start - xnow[i], lwi = simulate_move(xpast[i], model_move, t, n_move) - lw[i] += lwi - end - # Evaluate likelihoods - if has_obs_at_timestamp && isfinite(lw[i]) - for (obs, model) in yobs[timestamp] - lw[i] += logpdf_obs(xnow[i], model, t, obs) + for b in 1:nb + + # Define output particles object + xout = Matrix{eltype(xinit)}(undef, nr, length(timesteps_by_batch[b])) + + # Define time steps and indicies for iteration + timesteps_for_batch = timesteps_by_batch[b] + indices_for_batch = collect(1:length(timesteps_for_batch)) + if direction == "backward" + indices_for_batch = reverse(indices_for_batch) + end + + # Run filter + @showprogress desc = "Running filter..." for (i, t) in zip(indices_for_batch, timesteps_for_batch) + + # println(t) + + #### Move particles & compute weights + # * We iterate once over particles b/c this is thread safe + timestamp = timeline[t] + has_obs_at_timestamp = haskey(yobs, timestamp) + @threads for j in 1:np + if isfinite(lw[j]) + # Move particles + if t != start + xnow[j], lwi = simulate_move(xpast[j], model_move, t, n_move) + lw[j] += lwi + end + # Evaluate likelihoods + if has_obs_at_timestamp && isfinite(lw[j]) + for (obs, model) in yobs[timestamp] + lw[j] += logpdf_obs(xnow[j], model, t, obs) + end end end end - end - #### Record diagnostics - maxlp[t] = maximum(lw) - - #### Validate weights - if !any(isfinite.(lw)) - # stop = ifelse(direction == "forward", t - 1, t + 1) - stop = t - pos = sort([start, stop]) - pos = pos[1]:pos[2] - julia_warning("Weights from filter ($start -> $finish) are zero at time $t): returning outputs from $(minimum(pos)):$(maximum(pos)). Note that all (log) weights at $t are -Inf.") - return particulate("filter: " * direction, call_start, - timeline[pos], xout[:, pos], ess[pos], maxlp[pos], - np, 1, false) - end - - #### Resample particles - # Normalise weights - lw_norm = lw .- logsumexp(lw) - # Evaluate ESS - ess[t] = exp(-logsumexp(2 * lw_norm)) - # Compute resampling indices - idx = resample(exp.(lw_norm), np) - # Record (subset) of resampled particles - # (A deep copy is implicitly made here via the subsetting) - xout[:, t] .= xnow[idx[1:nr]] - # Optionally resample particles for next iteration - do_resample = (do_t_resample && (t in t_resample)) || (ess[t] <= n_resample) - if do_resample - xpast .= xnow[idx] - lw .= zero(Float64) - else - xpast .= xnow + #### Record diagnostics + maxlp[t] = maximum(lw) + + #### Validate weights + if !any(isfinite.(lw)) + # stop = ifelse(direction == "forward", t - 1, t + 1) + stop = t + pos = sort([start, stop]) + pos = pos[1]:pos[2] + julia_warning("Weights from filter ($start -> $finish) are zero at time $t): returning outputs from $(minimum(pos)):$(maximum(pos)). Note that all (log) weights at $t are -Inf.") + if do_batch + if direction == "forward" + @save batch[b] xfwd = xout + else + @save batch[b] xbwd = xout + end + xout = nothing + end + return (states = xout, ess = ess, maxlp = maxlp, convergence = false) + end + + #### Resample particles + # Normalise weights + lw_norm = lw .- logsumexp(lw) + # Evaluate ESS + ess[t] = exp(-logsumexp(2 * lw_norm)) + # Compute resampling indices + idx = resample(exp.(lw_norm), np) + # Record (subset) of resampled particles + # (A deep copy is implicitly made here via the subsetting) + xout[:, i] .= xnow[idx[1:nr]] + # Optionally resample particles for next iteration + do_resample = (do_t_resample && (t in t_resample)) || (ess[t] <= n_resample) + if do_resample + xpast .= xnow[idx] + lw .= zero(Float64) + else + xpast .= xnow + end + end - end + # Write outputs to batch for file + if do_batch + if direction == "forward" + @save batch[b] xfwd = xout + else + @save batch[b] xbwd = xout + end + xout = nothing + end - return particulate("filter: " * direction, call_start, - timeline, xout, ess, maxlp, - np, 1, true) + end -end + return (states = xout, ess = ess, maxlp = maxlp, convergence = true) -""" -# Iterative particle filter +end -An interative implementation of the particle filter. +""" + particle_filter(; timeline::Vector{DateTime}, + xinit::Vector{<:State}, + yobs::Dict, + model_move::ModelMove, + n_move::Int = 100_000, + n_record::Int = 1000, + n_resample::Float64 = Float64(n_record), + t_resample::Union{Nothing, Int, Vector{Int}}, + n_iter::Int64 = 1, + direction::String = "forward", + batch::Union{Nothing, Vector{String}} = nothing) + +A particle filtering algorithm that samples from `f(s_t | y_{1:t})` for `t ∈ 1:t`. # Arguments (keywords) -- `...`: Keyword arguments passed to [`particle_filter`](@ref); +- `timeline`: A `Vector{DateTime}` of ordered, regularly spaced time stamps that defines the time steps for the simulation; +- `xinit`: A `Vector` of [`State`](@ref) instances that defines the initial state(s) of the animal; +- `yobs`: A Dictionary of observations: + - Dictionary keys should match elements in `timeline`; + - Each element must be a `Vector` of `Tuple`s for that time step (one for each observation/sensor); + - Each `Tuple` should contain (a) the observation and (b) the model parameters (that is, a [`ModelObs`](@ref) instance); +- `model_move`: A [`ModelMove`](@ref) instance: + - The movement model describes movement from one time step to the next and therefore depends implicitly on the resolution of `timeline`; + - The movement model should align with the [`State`](@ref) instances in `xinit`. For example, a two-dimensional state (`StateXY`) requires a corresponding movement model instance (i.e., `ModelMoveXY`); +- `n_move`: An integer that defines the number of attempts used to find a legal move; + - All [`ModelMove`](@ref) sub-types contain a `map` field that defines the region(s) within which movements are allowed (see [`is_valid()`](@ref)); + - Each particle is moved up to `n_move` times, until a valid movement is simulated; + - Particles that fail to generate a valid move are killed; +- `n_record`: An integer that defines the number of particles to record at each time step: + - `n_record` particles are resampled at each time step and recorded in memory; +- `n_resample`: A number that defines the effective sample size for resampling: + - Particles are resampled when the effective sample size <= `n_resample`; +- `t_resample`: `nothing`, an `integer` or a Vector of `integer`s that define the time step(s) at which to force resampling; + - Particles are resampled at `t_resample` regardless of the effective sample size; - `n_iter`: A integer that defines the maximum number of iterations (trials); +- `direction:` A `String` that defines the direction of the filter: + - `"forward"` runs the filter forwards in time; + - `"backward"` runs the filter backwards in time; +- (optional) `batch`: A Vector of `.jld2` file paths for particles (see Memory Management); -# Details +# Algorithm + +## Initiation + +The algorithm is initiated using a `Vector` of `n_particle` [`State`](@ref)s (`xinit`). See [`simulate_states_init()`](@ref) to simulate initial states for the filter. + +## Movement + +For every time step in the `timeline`, the internal function [`Patter.simulate_move()`](@ref) simulates the movement of particles away from previous [`State`](@ref)s into new [`State`](@ref)s using the movement model, as specified by `model_move`. [`Patter.simulate_move()`](@ref) is an iterative wrapper for a [`Patter.simulate_step()`](@ref) method that simulates a new [`State`](@ref) instance from the previous [`State`](@ref). [`Patter.simulate_move()`](@ref) implements [`Patter.simulate_step()`](@ref) iteratively until a legal move is found (or `n_move` is reached). For custom [`State`](@ref) or [`ModelObs`](@ref) sub-types, a corresponding [`Patter.simulate_step()`](@ref) method is required. Illegal moves are those that land in `NaN` locations on the `map` or, in the case of [`State`](@ref)s that include a depth (`z`) component, are below the depth of the seabed (see [`is_valid()`](@ref)). Particles that fail to generate legal moves are eventually killed by re-sampling (see below). + +## Likelihood + +Observations are used to weight simulated particles. To simulate observations for filtering, use [`simulate_yobs()`](@ref). To assemble real-world observations for filtering, see [`assemble_yobs()`](@ref). For each valid [`State`](@ref) and time stamp in `yobs`, the log-probability of each observation, given the [`State`](@ref), is evaluated via `Patter.logpdf_obs()`. For custom [`State`](@ref) or [`ModelObs`](@ref) sub-types, a corresponding `Patter.logpdf_obs()` method is required. The maximum weight across all particles (`maxlp`) is recorded at each time step as an algorithm diagnostic. (This metric can be intepreted as the maximum log-posterior if resampling is implemented at every time step.) + +## Resampling + +Particles are periodically re-sampled, with replacement, using the low-variance systematic re-sampling algorithm (via [`Patter.resample()`](@ref)), at time steps in `t_resample` or when the effective sample size is less than or equal to `n_resample`. This has the effect of eliminating impossible particles and duplicating likely ones. -This function wraps [`particle_filter`](@ref). The filter is implemented up to `n_iter` times, or until convergence is achieved. +The algorithm continues in this way, iterating over the `timeline`, simulating, weighting and (re)sampling particles. At each time step, `n_record` particles are saved in memory. If the function fails to converge, a warning is returned alongside the outputs up to that time step. Otherwise, the function will continue to the end of the time series. + +## Multi-threading + +The iteration over particles (i.e., simulated movements and likelihood evaluations) are multi-threaded. + +## Memory management + +By default, `n_record` particles at each time step are retained in memory. If `batch` is provided, the `timeline` is split into `length(batch)` batches. The filter still moves along the whole `timeline`, but only records the particles for the current batch in memory. At the end of each batch, the particles for that batch are written to file. This reduces total memory demand. + +`batch` file paths are sorted alphanumerically if `direction = "forward"` and in reverse order if `direction = "backward"`. For example: +* If you have a `timeline` of 10 time steps, `direction = "forward"` and `batch = ["fwd-1.jld2", "fwd-2.jld2", "fwd-3.jld2"]`, `fwd-1.jld2`, `fwd-2.jld2` and `fwd-3.jld2` contain the particle matrices for time steps `[1, 2, 3]`, `[4, 5, 6]` and [`7, 8, 9, 10`], respectively. +* If you have a `timeline` of 10 time steps, `direction = "backward"` and `batch = ["bwd-1.jld2", "bwd-2.jld2", "bwd-3.jld2"]`, `bwd-1.jld2`, `bwd-2.jld2` and `bwd-3.jld2` similarly contain the particle matrices for time steps `[1, 2, 3]`, `[4, 5, 6]` and [`7, 8, 9, 10`], respectively. + +## Convergence and diagnostics -# Returns +Algorithm convergence is not guaranteed. The algorithm may reach a dead-end---a time step at which there are no valid locations into which the algorithm can step. This may be due to data errors, incorrect assumptions, insufficient sampling effort or poor tuning-parameter settings. + +# Returns - A [`Particles`](@ref) structure; # See also -* [`particle_filter`](@ref) implements the particle filter; +* [`State`](@ref), [`ModelMove`](@ref) and [`ModelObs`](@ref) for [`State`](@ref), movement model and observation model sub-types; +- [`simulate_yobs()`](@ref) and [`assemble_yobs()`](@ref) to prepare observations for the particle filter; +* [`Patter.simulate_step()`](@ref) and [`Patter.simulate_move()`](@ref) for the internal routines used to simulate new [`State`](@ref)s; +* `Patter.logpdf_obs()` methods to evaluate the log probability of observations; +* [`particle_smoother_two_filter()`](@ref) to implement particle smoothing; """ -function particle_filter_iter( +function particle_filter( ; timeline::Vector{DateTime}, - xinit::Vector, + xinit::Vector{<:State}, yobs::Dict, model_move::ModelMove, n_move::Int = 100_000, @@ -357,7 +400,8 @@ function particle_filter_iter( n_resample::Float64 = Float64(n_record), t_resample::Union{Nothing, Int, Vector{Int}}, n_iter::Int64 = 1, - direction::String = "forward") + direction::String = "forward", + batch::Union{Nothing, Vector{String}} = nothing) # Run filter iteratively call_start = now() @@ -366,24 +410,23 @@ function particle_filter_iter( out = nothing while iter < n_iter iter = iter + 1 - out = particle_filter(timeline = timeline, - xinit = xinit, - yobs = yobs, - model_move = model_move, - n_move = n_move, - n_record = n_record, - n_resample = n_resample, - t_resample = t_resample, - direction = direction) - if out.callstats.convergence[1] + out = _particle_filter(timeline = timeline, + xinit = xinit, + yobs = yobs, + model_move = model_move, + n_move = n_move, + n_record = n_record, + n_resample = n_resample, + t_resample = t_resample, + direction = direction, + batch = batch) + if out.convergence[1] break end end - - # Define outputs - out.callstats.timestamp .= call_start - out.callstats.n_iter .= iter - out.callstats.time .= call_duration(call_start) - return out + + return particulate("filter: " * direction, call_start, + timeline, out.states, out.ess, out.maxlp, + length(xinit), iter, out.convergence) end \ No newline at end of file diff --git a/src/010-particle-smoother.jl b/src/010-particle-smoother.jl index ca17941..92f999c 100644 --- a/src/010-particle-smoother.jl +++ b/src/010-particle-smoother.jl @@ -1,31 +1,190 @@ using Base.Threads: @threads +using JLD2 using ProgressMeter: @showprogress -export two_filter_smoother +export particle_smoother_two_filter + + +# batch_indices() helper +# * This determines the UnitRange over which we iterate in _particle_smoother_two_filter() in line with the batch +# * `b` is the index of the current batch +# * `nb` is the total number of batches +# * `nt` is the total number of columns +function batch_indices(b::Int, nb::Int, nt::Int) + if nb == 1 + indices = 2:(nt - 1) + else + if b == 1 + indices = 2:nt + else + if b < nb + indices = 1:nt + else + indices = 1:(nt - 1) + end + end + end + return indices +end + + +# smooth_weights() helper to compute weights +# * Compartmentalising this function massively boosts computation time +# * `xbwd` and `xfwd` are Vectors of particles from `xbwd[:, t]` and `xfwd[:, t - 1]` +# * `t` is the _actual_ time step +# * `np` is the number of particles, which is predefined for speed only +# * `t` is the _actual_ time step +# * other arguments are as defined in particle_smoother_two_filter() +function smooth_weights(xbwd::Vector{<:State}, xfwd::Vector{<:State}, + np::Int, + zdim::Bool, + model_move::ModelMove, + t::Int, + vmap::Union{GeoArray, Nothing}, + n_sim::Int, + cache_norm_constants::Union{Dict{<:State,Float64},Nothing}) + w = zeros(np) + @threads for k in 1:np + for j in 1:np + # Evaluate probability density of movement between locations (i.e., the weight) for xbwd[k, t] to xfwd[j, t - 1] + w[k] += exp(logpdf_move(xbwd[k], xfwd[j], zdim, model_move, t, vmap, n_sim, cache_norm_constants)) + end + end + return w +end + + +# smooth_resample() helper to resample smoothed particles and record ESS +# * `xfwd` and `xbwd` are matrices +# * `t` is the _column index_ +# * `np`, `n_fwd_half` and `n_fwd_bwd` are constants (predefined for speed) +function smooth_resample(xfwd::Matrix{<:State}, xbwd::Matrix{<:State}, + w::Vector{Float64}, + t::Int, + np::Int, n_fwd_half::Int, n_bwd_half::Int) + if any(w .> 0) + # (A) If there are positive weights, normalise, compute ESS & resample + w .= w ./ sum(w) + ess_t = 1 / sum(abs2, w) + # Resample particles from xbwd + idx = resample(w, np) + xout_t = xbwd[idx, t] + else + # (B) If all weights are zero: + # * Sample 50 % of particles from forward filter & 50 % from the backward filter + # * (Particles are equally weighted thanks to resampling) + # * Leave ess[t] = NaN (throw warning in outer function) + ess_t = NaN + xout_t = vcat(xfwd[1:n_fwd_half, t], xbwd[1:n_bwd_half, t]) + end + return xout_t, ess_t +end + + +# Internal two-filter smoothing function, wrapped by particle_smoother_two_filter() +# * This function implements the core logic and permits batching in the wrapper function +# * `timesteps` is a Vector of the time steps for a specific batch e.g., (1, 2, 3 or 4, 5, 6 or 7, 8, 9 10) +# * `indices` is a UnitRange of indices over which we iterate +# - If a single batch, indices is 2:(nt-1) (particles at t = 1 are given by xbwd[:, 1]) +# - If batch one of several, indices is 2:nt +# - If batch two of several, indices = 1:nt +# - If batch N of N, indices = 1:(nt - 1) (particles at t = T are given by xfwd[1:, T]) +# * `xfwd_init` are the locations from the forward filter +# - This is needed for multi-batch implementations to compute density from xbwd[:, t] to xfwd[:, t-1] at t = 1 if we are on batch ID > 1 +# * `xfwd` and `xbwd` are the state (location) matrices (for a selected batch) +# * Other arguments are defined as for the wrapper function + +function _particle_smoother_two_filter(; timesteps::Vector{Int}, + indices::UnitRange{Int}, + xfwd_init::Union{Nothing, Vector{<:State}}, + xfwd::Matrix{<:State}, xbwd::Matrix{<:State}, + model_move::ModelMove, + vmap::Union{GeoArray, Nothing} = nothing, + n_sim::Int = 100, + cache::Bool = true) + + #### Check inputs + size(xfwd) == size(xbwd) || error("Forward and backward sample do not match!") + + #### Set up + # Identify the dimension of the movement model + zdim = hasfield(typeof(xfwd[1]), :z) + # Define smoothed particles matrix + # (rows: particles; columns: time steps) + xout = similar(xfwd) + # Define particle indices + # * If the two filters are incompatible (all weights zero), + # * ... we sample n_fwd_half and n_bwd_half particles from + # * ... the forward and backward filters respectively + np, nt = size(xout) + half = (np ÷ 2) + n_fwd_half = length(1:half) + n_bwd_half = length((half + 1):np) + # Initialise ESS vector + ess = fill(NaN, nt) + + #### Precomputations + # Precompute normalisation constants if cache = true & n_sim > 0 + # * This is possible for movement models for which the density only depends on `xbwd` fields + if cache & n_sim > 0 + cache_norm_constants = logpdf_move_normalisations(xbwd, model_move, vmap, n_sim) + else + cache_norm_constants = nothing + end + + #### Run smoothing for t = 2:(nt - 1), 1:nt or 2:(nt - 1): + @showprogress desc = "Running two-filter smoother..." for t in indices + + # println(t) + + # Compute weights + if t == 1 + # If the column index t = 1 and we are on batch 1, this step is not activated + # (indices = 2:(nt - 1) as we use particles from the backward filter at time = 1) + # For other batches, we compute the weight from the first column index to the previous position on the forward filter + # (This is recorded in xfwd_init) + w = smooth_weights(xbwd[:, 1], xfwd_init, np, zdim, model_move, timesteps[t], vmap, n_sim, cache_norm_constants) + else + w = smooth_weights(xbwd[:, t], xfwd[:, t - 1], np, zdim, model_move, timesteps[t], vmap, n_sim, cache_norm_constants) + end + + # Rsample particles & compute ESS using smoothed weights + xout[:, t], ess[t] = smooth_resample(xfwd, xbwd, w, t, np, n_fwd_half, n_bwd_half) + + end + + # Return xout and ess + return (xout = xout, ess = ess) + +end """ - two_filter_smoother(; timeline::Vector{DateTime}, - xfwd::Matrix, xbwd::Matrix, - model_move::ModelMove, - vmap::Union{GeoArray, Nothing}, - n_sim::Int, - cache::Bool) + particle_smoother_two_filter(; timeline::Vector{DateTime}, + xfwd::Union{Matrix{<:State}, Vector{String}}, + xbwd::Union{Matrix{<:State}, Vector{String}}, + model_move::ModelMove, + vmap::Union{GeoArray, Nothing} = nothing, + n_sim::Int = 100, + cache::Bool = true, + batch::Union{Nothing, Vector{String}} = nothing) -A two-filter particle smoother that samples from `f(X_t | {Y_1 ... Y_T}) for t ∈ 1:T`. +A two-filter particle smoother that samples from `f(s_t | y_{1:T})` for `t ∈ 1:T`. # Arguments (keywords) - `timeline`: A `Vector{DateTime}` of ordered, regularly spaced time stamps that defines the time steps for the simulation; -- `xfwd`: A `Matrix` of [`State`](@ref)s from the forward filter (see [`particle_filter()`](@ref)); -- `xbwd`: A `Matrix` of [`State`](@ref)s from the backward filter (see [`particle_filter()`](@ref)); +- `xfwd`, `xbwd`: Particles from the forward and backward filters (see [`particle_filter()`](@ref)), supplied as: + - A `Matrix` of [`State`](@ref)s (in memory); + - A `Vector` of file paths, if [`particle_filter()`](@ref) was implemented with `batch`; - `model_move`: A [`ModelMove`](@ref) instance; - `vmap`: (optional) A `GeoArray` that defines the 'validity map' (see [`Patter.logpdf_move()`](@ref)); - `n_sim`: An integer that defines the number of Monte Carlo simulations (see [`Patter.logpdf_move()`](@ref)); - `cache`: A `Bool` that defines whether or not to precompute and cache movement density normalisation constants (see [`Patter.logpdf_move()`](@ref)); +- (optional) `batch`: A `Vector` of `.jld2` file paths for particles (see [`particle_filter()`](@ref)); # Details -[`two_filter_smoother()`](@ref) smooths particles from the particle filter (see [`particle_filter()`](@ref)). The `timeline` from the particle filter should be supplied as well as a `Matrix` of particles from a forward run and a backward run. The two filter smoother works by iteratively resampling particles in line with the probability density of movement between particles from the backward filter at time `t` and particles from the forward filter at time `t - 1`. [`Patter.logpdf_move()`](@ref) is an internal function that evaluates the log probability of a movement step between particles. This function wraps the [`Patter.logpdf_step()`](@ref) generic. Methods are provided for built-in [`State`](@ref) and [`ModelMove`](@ref) sub-types. To use custom sub-types, a corresponding [`Patter.logpdf_step()`](@ref) method should be provided. In [`two_filter_smoother()`](@ref), the `vmap` and `n_sim` arguments support the calculate of probability densities (see [`Patter.logpdf_move()`](@ref)). For movement models for which the density only depends on fields in `xbwd` and `xfwd`, set `cache = true` to precompute and store normalisation constants for density calculations for unique `xbwd` elements. Note that since typically only a subsample of particles from [`particle_filter()`](@ref) are retained in memory, it is not guaranteed that valid moves will exist between particle pairs at all time steps. At time step(s) in which the two filters are incompatible, 50 % of particles are retained from the forward filter and 50 % from the backward filter with a warning. The effective sample size at such time steps is set to `NaN`, providing an index and counter for problematic time steps (see Returns). +[`particle_smoother_two_filter()`](@ref) smooths particles from the particle filter (see [`particle_filter()`](@ref)). The `timeline` from the particle filter should be supplied as well as a `Matrix` of particles from a forward run and a backward run (or a `Vector` of file paths to those matrices). The two filter smoother works by iteratively resampling particles in line with the probability density of movement between particles from the backward filter at time `t` and particles from the forward filter at time `t - 1`. [`Patter.logpdf_move()`](@ref) is an internal function that evaluates the log probability of a movement step between particles. This function wraps the [`Patter.logpdf_step()`](@ref) generic. Methods are provided for built-in [`State`](@ref) and [`ModelMove`](@ref) sub-types. To use custom sub-types, a corresponding [`Patter.logpdf_step()`](@ref) method should be provided. In [`particle_smoother_two_filter()`](@ref), the `vmap` and `n_sim` arguments support the calculate of probability densities (see [`Patter.logpdf_move()`](@ref)). For movement models for which the density only depends on fields in `xbwd` and `xfwd`, set `cache = true` to precompute and store normalisation constants for density calculations for unique `xbwd` elements. Note that since typically only a subsample of particles from [`particle_filter()`](@ref) are retained in memory, it is not guaranteed that valid moves will exist between particle pairs at all time steps. At time step(s) in which the two filters are incompatible, 50 % of particles are retained from the forward filter and 50 % from the backward filter with a warning. The effective sample size at such time steps is set to `NaN`, providing an index and counter for problematic time steps (see Returns). Batching is only implemented if the inputs (`xfwd`, and `xbwd`) and outputs (via `batch`) are batched (and contain the same number of batches). # Returns @@ -35,106 +194,125 @@ A two-filter particle smoother that samples from `f(X_t | {Y_1 ... Y_T}) for t - [`particle_filter()`](@ref) implements the particle filter; - [`Patter.logpdf_step()`](@ref), [`logpdf_move_normalisation()`](@ref) and [`Patter.logpdf_move()`](@ref) evaluate the log probability (density) of movement between two [`State`](@ref)s; -- [`two_filter_smoother()`](@ref) implements the two-filter particle smoother; +- [`particle_smoother_two_filter()`](@ref) implements the two-filter particle smoother; # Source Fearnhead, P., Wyncoll, D., Tawn, J., [2010](https://doi.org/10.1093/biomet/asq013). A sequential smoothing algorithm with linear computational cost. Biometrika 97, 447–464. """ -function two_filter_smoother(;timeline::Vector{DateTime}, - xfwd::Matrix, xbwd::Matrix, model_move::ModelMove, - vmap::Union{GeoArray, Nothing} = nothing, - n_sim::Int = 100, - cache::Bool = true) +function particle_smoother_two_filter(; timeline::Vector{DateTime}, + xfwd::Union{Matrix{<:State}, Vector{String}}, xbwd::Union{Matrix{<:State}, Vector{String}}, model_move::ModelMove, + vmap::Union{GeoArray, Nothing} = nothing, + n_sim::Int = 100, + cache::Bool = true, + batch::Union{Nothing, Vector{String}} = nothing) - #### Check inputs + #### Initialise call_start = now() + + #### Prepare batch settings + # Check xfwd & xbwd sizes (of matrices or files) match size(xfwd) == size(xbwd) || error("Forward and backward sample do not match!") + # We start by assuming 1 batch and set do_batch to false + nb = 1 + do_batch = false + # Handle batching + if (isa(xfwd, Vector{String})) + # If inputs are batched, we enforce the same number of output batches + !isnothing(batch) || error("Batched inputs required batched outputs.") + @assert length(xfwd) == length(xbwd) == length(batch) + # Record batch file paths + xfwd_batch = deepcopy(xfwd) + xbwd_batch = deepcopy(xbwd) + # Update number of batches and do_batch + nb = length(xfwd_batch) + do_batch = true + else + isnothing(batch) || error("`batch` is only implemented if inputs are batched.") + end - #### Set up - # Identify dimension of input state - # * This is used to check if states are valid (incl. for normalisation simulation) - zdim = hasfield(typeof(xfwd[1]), :z) - # Define smoothed particles matrix - # (rows: particles; columns: time steps) - xout = similar(xfwd) - xout[:, 1] = xbwd[:, 1] - xout[:, end] = xfwd[:, end] - # Define particle indices - # * If the two filters are incompatible (all weights zero), - # * ... we sample n_fwd_half and n_bwd_half particles from - # * ... the forward and backward filters respectively - np, nt = size(xout) - half = (np ÷ 2) - n_fwd_half = length(1:half) - n_bwd_half = length((half + 1):np) - warn_zero_weights = false - # Initialise ESS vector - ess = zeros(nt) - ess[1] = np # = 1 / sum(abs2, w) given equal weights from filter - ess[nt] = np + #### Prepare smoother objects + # Initialise state matrices/vectors + xout = nothing + xfwd_init = nothing + # Initialise ESS vector (over full timetime) + nt = length(timeline) + np = NaN # read np below + ess = fill(NaN, nt) # zeros(nt) + # Define indices for batches + # * timesteps_by_batch is a Vector of time steps (one element per batch), as defined in the filter + # - This is used to update the global ess Vector + # - E.g., 1, 2, 3, 4, 5, 6, 7, 8, 9, 10 + timesteps_by_batch = split_indices(collect(1:length(timeline)), nb) - #### Precomputations - # Precompute normalisation constants if cache = true - # * This is possible for movement models for which the density only depends on `xbwd` fields - if cache - cache_norm_constants = logpdf_move_normalisations(xbwd, model_move, vmap, n_sim) - else - cache_norm_constants = nothing - end + #### Run smoothing (over batches as required) + for b in 1:nb - #### Run smoothing - @showprogress desc = "Running two-filter smoother..." for t in 2:(nt - 1) + # Read batch if required + if do_batch + @load xfwd_batch[b] xfwd + @load xbwd_batch[b] xbwd + end - # Compute weights - # * Multi-threaded implementation is somewhat faster with Dict() cache_norm_constants formulation - w = zeros(np) - @threads for k in 1:np - for j in 1:np - # Evaluate probability density of movement between locations (i.e., the weight) - w[k] += exp(logpdf_move(xbwd[k, t], xfwd[j, t - 1], zdim, model_move, t, vmap, n_sim, cache_norm_constants)) - end + # Define indicies for batch + timesteps = timesteps_by_batch[b] + indices = batch_indices(b, nb, size(xfwd, 2)) + + # Run smoother (for batch) + # If b = 1, xfwd_init = nothing as xfwd_init is not required + bout = _particle_smoother_two_filter(timesteps = timesteps, + indices = indices, + xfwd_init = xfwd_init, + xfwd = xfwd, + xbwd = xbwd, + model_move = model_move, + vmap = vmap, + n_sim = n_sim, + cache = cache) + + # Update particles for t = 1 and t = T + if b == 1 + np = size(xbwd, 1) # Record np once here for convenience (used below) + bout.xout[:, 1] = xbwd[:, 1] # Update particles end - - # Validate weights & implement resampling - if any(w .> 0) - # (A) If there are positive weights, normalise, compute ESS & resample - w .= w ./ sum(w) - ess[t] = 1 / sum(abs2, w) - # Resample particles from xbwd & store - idx = resample(w, np) - xout[:, t] = xbwd[idx, t] - else - # (B) If all weights are zero, set a warning flag - # * For speed, the warning is thrown only once outside the for loop - # * (as it may occur at multiple time steps) - warn_zero_weights = true - # Set ESS[t] = NaN - ess[t] = NaN - # Sample 50 % of particles from forward filter - # * We simply select the first n_fwd_half particles since - # * ... particles are equally weighted thanks to resampling - xout[1:half , t] = xfwd[1:n_fwd_half, t] - # Sample 50 % of particles from backward filter - xout[(half + 1):end, t] = xbwd[1:n_bwd_half, t] + if b == nb + bout.xout[:, end] = xfwd[:, end] + end + + # Record ess + # (ESS at t = 1 and t = T is updated outside of the loop over batches) + ess[timesteps] = bout.ess + # Record particles + if do_batch + # Record xfwd_init + # The last locations for this batch are the locations at t - 1 on the next batch + xfwd_init = xfwd[:, end] + # Write particles to file and leave xout = nothing + @save batch[b] xsmo = bout.xout + else + xout = bout.xout end - end + end - #### Implement post-processing - # Warn for smoothing failures - if warn_zero_weights - nan_count = count(isnan, ess) - nan_perc = round(nan_count / length(ess) * 100, digits = 2) + #### Update ESS + # Update ESS for t = 1 and t = T + ess[1] = Float64(np) # = 1 / sum(abs2, w) given equal weights from filter + ess[nt] = Float64(np) + # Evaluate smoothing success + # * Set convergence = false if > 5 % NaN ESS + nan_count = count(isnan, ess) + nan_perc = round(nan_count / length(ess) * 100, digits=2) + convergence = nan_perc <= 5 ? true : false + if nan_count > 0 julia_warning("All smoothing weights (from xbwd[k, t] to xfwd[j, t - 1]) are zero at $nan_count time step(s) ($nan_perc %).") - end + end #### Return outputs - # Follow particle_filter() format particulate("smoother: two-filter", call_start, timeline, xout, ess, fill(NaN, length(timeline)), - np, NaN, true) + np, NaN, convergence) end \ No newline at end of file diff --git a/src/013-R-from-Julia.jl b/src/013-R-from-Julia.jl index 210864e..32b9879 100644 --- a/src/013-R-from-Julia.jl +++ b/src/013-R-from-Julia.jl @@ -10,7 +10,7 @@ using OrderedCollections # * [`Patter.r_get_dataset()`](@ref) translates a Dictionary of observations into a `Vector` of `DataFrame`s that can be passed to `R`. # * `Patter.r_get_states()` translates a `Matrix` of [`State`](@ref)s into a `DataFrame` that can be passed to `R`. In the input `Matrix`, each row is a particle and each column is a time step. -# * [`Patter.r_get_particles()`](@ref) wraps `Patter.r_get_states()` and translates particle outputs (from [`particle_filter()`](@ref) and [`two_filter_smoother()`](@ref)) into a `NamedTuple` for `R`. +# * [`Patter.r_get_particles()`](@ref) wraps `Patter.r_get_states()` and translates particle outputs (from [`particle_filter()`](@ref) and [`particle_smoother_two_filter()`](@ref)) into a `NamedTuple` for `R`. # These functions are [`State`](@ref) and model agnostic; that is, they work irrespective of the input [`State`](@ref) and model sub-types. Custom methods are not required to handle novel sub-types.