Skip to content

Instantly share code, notes, and snippets.

@simeoncarstens
Last active January 27, 2020 14:21
Show Gist options
  • Save simeoncarstens/3d3cc9bb340edeecf21bb58f419da860 to your computer and use it in GitHub Desktop.
Save simeoncarstens/3d3cc9bb340edeecf21bb58f419da860 to your computer and use it in GitHub Desktop.
Importance Sampling, Sequential Monte Carlo, Particle Filtering and stuff
Display the source blob
Display the rendered blob
Raw
Loading
Sorry, something went wrong. Reload?
Sorry, we cannot display this file.
Sorry, this file is invalid so it cannot be displayed.
import numpy as np
import matplotlib.pyplot as plt
from scipy.special import logsumexp
choice = np.random.choice
class Pdf(object):
def __call__(self, x):
"""log-probability"""
pass
def sample(self, n):
pass
class ImportanceSampler(object):
def __init__(self, posterior):
self.posterior = posterior
def sample(self, imp_dist, n_samples):
imp_samples = imp_dist.sample(n_samples)
imp_weights = self.calculate_weights(imp_dist, imp_samples)
# doing stuff in log-space to avoid underflows
normalized_imp_weights = imp_weights - logsumexp(imp_weights)
return imp_samples, normalized_imp_weights
def calculate_weights(self, imp_dist, imp_samples):
return self.posterior(imp_samples) - imp_dist(imp_samples)
class SequentialImportanceSampler(ImportanceSampler):
def __init__(self, posterior):
super(SequentialImportanceSampler, self).__init__(posterior)
self.weights = None
def sample(self, n_samples):
(imp_samples, weights) = super(
SequentialImportanceSampler, self).sample(self.posterior.prior,
n_samples)
# store current weights to make them available to recursion
self.weights = weights
return imp_samples, weights
def calculate_weights(self, _, imp_samples):
if self.weights is None:
self.weights = np.log(np.ones(len(imp_samples)) / len(imp_samples))
likelihoods = self.posterior.likelihood_function(imp_samples)
unnormalized_weights = self.weights + likelihoods
self.weights = unnormalized_weights - logsumexp(unnormalized_weights)
return self.weights
class Gaussian(Pdf):
def __init__(self, mu=0, sigma=1):
self.mu = mu
self.sigma = sigma
def __call__(self, x):
return -0.5 * (x - self.mu) ** 2 / self.sigma / self.sigma
def sample(self, n):
return np.random.normal(self.mu, self.sigma, n)
class Uniform(Pdf):
def __init__(self, low, high):
self.low = low
self.high = high
def __call__(self, x):
return np.repeat(-np.log(self.high - self.low), len(x))
def sample(self, n):
return np.random.uniform(self.low, self.high, n)
class HMMPosterior(Pdf):
def __init__(self, trans_matrix, obs_probs):
self.observations = []
self.obs_probs = obs_probs
self.prior = trans_matrix
def likelihood_function(self, hidden_states):
return np.log(self.obs_probs[hidden_states, self.observations[-1]])
def add_observation(self, x):
self.observations.append(x)
class TransitionMatrix(object):
def __init__(self, transition_matrix):
self.transition_matrix = transition_matrix
self.hidden_states = np.arange(len(transition_matrix))
def sample(self, old_states):
return np.array([choice(self.hidden_states,
p=self.transition_matrix[hidden_state])
for hidden_state in old_states])
class SequentialPrior(Pdf):
def __init__(self, distribution, first_state):
self.distribution = distribution
self.samples = first_state
def sample(self, n_samples):
self.samples = self.distribution.sample(self.samples)
return self.samples
if True:
if not False:
# importance sampling
n_samples = 200000
target = Gaussian()
imp = Uniform(-10, 10)
sampler = ImportanceSampler(target)
biased_samples, logws = sampler.sample(imp, n_samples)
# we have samples from the importance distribution,
# but not from the target distribution. So we have
# to reweight. To get samples from the target,
# just draw from the list of importance samples with
# probabilities given by the weights
samples = choice(
biased_samples, 2*n_samples, p=np.exp(logws))
fig, ax = plt.subplots()
ax.hist(samples, bins=30)
plt.show()
if True:
# sequential importance sampling
n_particles = 100
# define multinomial HMM (discrete hidden states,
# discrete observation states)
hidden_states = [0, 1]
observation_states = [0, 1]
start_prob = np.array([0.5, 0.5])
trans_matrix = np.array([[0.9, 0.1],
[0.1, 0.9]])
obs_probs = np.array([[0.9, 0.1],
[0.1, 0.9]])
# draw first state of Markov chain from the start distribution
start_state = choice(hidden_states, n_particles, p=start_prob)
pdist = TransitionMatrix(trans_matrix)
prior = SequentialPrior(pdist, start_state)
posterior = HMMPosterior(prior, obs_probs)
sampler = SequentialImportanceSampler(posterior)
# generate data
n_observations = 30
# we have to start with some state...
true_states = [0]
observations = []
for _ in range(n_observations):
new_state = choice(hidden_states, p=trans_matrix[true_states[-1]])
new_observation = choice(
observation_states, p=obs_probs[new_state])
observations.append(new_observation)
true_states.append(new_state)
# will store sucessive posterior samples; is enlarged in each iteration
states_samples = None
# stores log-weights for all samples
all_log_ws = []
for observation in observations:
# we're on-line: a new observation is arriving!
posterior.add_observation(observation)
# draw samples from importance distribution and
# store them along with their corresponding importance
# weights
(samples, log_ws) = sampler.sample(n_particles)
if states_samples is None:
states_samples = samples.reshape(1, n_particles)
else:
# append n_particles samples to states_samples
states_samples = np.vstack((states_samples, samples))
all_log_ws.append(log_ws)
# ... and back to actual probabilities
all_weights = np.exp(all_log_ws)
# get samples from target by reweighting importance samples
target_samples = np.array([choice(samples, n_samples, p=weight)
for weight, samples in zip(all_weights,
states_samples)])
# make fancy plots
times = np.arange(n_observations)
fig, (ax1, ax2) = plt.subplots(1, 2)
# plot data and time trace of one of the posterior probabilities
ax1.scatter(times, observations)
ax1.plot(times, target_samples.sum(1) / n_samples,
ls='--', color='orange', label=r'$p(x=1|y_{1:t})$')
ax1.set_yticks((0, 1))
ax1.set_xlabel("time")
ax1.legend()
# for each time step, plot cumulative distribution of weights
# whose lines get darker with increasing time step
for alpha, weights in zip(np.linspace(1, 0, n_observations),
all_weights):
ax2.plot(np.sort(weights),
np.linspace(0, 1, n_particles, endpoint=False),
color="black", alpha=alpha)
# double log scale because the plot sucks otherwise
ax2.set_xscale("log")
ax2.set_yscale("log")
ax2.set_ylabel("empirical cumulative\ndistribution function")
ax2.set_xlabel("weight value")
plt.show()
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment