Importance Sampling, Sequential Monte Carlo, Particle Filtering and stuff
import numpy as np
import matplotlib.pyplot as plt
from scipy.special import logsumexp

choice = np.random.choice


class Pdf(object):
    def __call__(self, x):
        """log-probability"""
        pass

    def sample(self, n):
        pass


class ImportanceSampler(object):
    def __init__(self, posterior):
        self.posterior = posterior

    def sample(self, imp_dist, n_samples):
        imp_samples = imp_dist.sample(n_samples)
        imp_weights = self.calculate_weights(imp_dist, imp_samples)
        # doing stuff in log-space to avoid underflows
        normalized_imp_weights = imp_weights - logsumexp(imp_weights)
        return imp_samples, normalized_imp_weights

    def calculate_weights(self, imp_dist, imp_samples):
        return self.posterior(imp_samples) - imp_dist(imp_samples)
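
# Note on calculate_weights above: for a target density p and an importance
# (proposal) density q, the unnormalized importance weight of a sample x is
# w(x) = p(x) / q(x). The code works with log p(x) - log q(x) and normalizes
# via logsumexp, which stays stable even when the individual densities underflow.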


class SequentialImportanceSampler(ImportanceSampler):
    def __init__(self, posterior):
        super(SequentialImportanceSampler, self).__init__(posterior)
        self.weights = None

    def sample(self, n_samples):
        (imp_samples, weights) = super(
            SequentialImportanceSampler, self).sample(self.posterior.prior,
                                                      n_samples)
        # store current weights to make them available to recursion
        self.weights = weights
        return imp_samples, weights

    def calculate_weights(self, _, imp_samples):
        if self.weights is None:
            self.weights = np.log(np.ones(len(imp_samples)) / len(imp_samples))
        likelihoods = self.posterior.likelihood_function(imp_samples)
        unnormalized_weights = self.weights + likelihoods
        self.weights = unnormalized_weights - logsumexp(unnormalized_weights)
        return self.weights
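
# Note on the recursive update in calculate_weights: because new particles are
# proposed from the transition prior p(x_t | x_{t-1}) (see SequentialPrior
# below), the importance weights factorize over time as
# w_t ∝ w_{t-1} * p(y_t | x_t): the previous weights are multiplied by the
# likelihood of the newest observation and renormalized. This is the weight
# update of the bootstrap particle filter, here without a resampling step.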


class Gaussian(Pdf):
    def __init__(self, mu=0, sigma=1):
        self.mu = mu
        self.sigma = sigma

    def __call__(self, x):
        # log-density up to an additive constant; the constant cancels
        # once the importance weights are normalized
        return -0.5 * (x - self.mu) ** 2 / self.sigma / self.sigma

    def sample(self, n):
        return np.random.normal(self.mu, self.sigma, n)


class Uniform(Pdf):
    def __init__(self, low, high):
        self.low = low
        self.high = high

    def __call__(self, x):
        return np.repeat(-np.log(self.high - self.low), len(x))

    def sample(self, n):
        return np.random.uniform(self.low, self.high, n)


class HMMPosterior(Pdf):
    def __init__(self, prior, obs_probs):
        self.observations = []
        self.obs_probs = obs_probs
        # the sequential prior over hidden-state trajectories
        self.prior = prior

    def likelihood_function(self, hidden_states):
        return np.log(self.obs_probs[hidden_states, self.observations[-1]])

    def add_observation(self, x):
        self.observations.append(x)
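
# Note: HMMPosterior stands in for the filtering distribution p(x_t | y_{1:t}).
# Its __call__ is never used by SequentialImportanceSampler, which only needs
# likelihood_function (the probability of the most recent observation given
# each particle's hidden state) and the prior attribute for proposing particles.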


class TransitionMatrix(object):
    def __init__(self, transition_matrix):
        self.transition_matrix = transition_matrix
        self.hidden_states = np.arange(len(transition_matrix))

    def sample(self, old_states):
        return np.array([choice(self.hidden_states,
                                p=self.transition_matrix[hidden_state])
                         for hidden_state in old_states])


class SequentialPrior(Pdf):
    def __init__(self, distribution, first_state):
        self.distribution = distribution
        self.samples = first_state

    def sample(self, n_samples):
        self.samples = self.distribution.sample(self.samples)
        return self.samples
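
# Note: SequentialPrior.sample ignores its n_samples argument; the number of
# particles is fixed by the initial state array. Each call advances the stored
# particle set one step through the transition matrix, so the importance
# distribution at time t is the transition prior p(x_t | x_{t-1}).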


if True:
    # importance sampling
    n_samples = 200000
    target = Gaussian()
    imp = Uniform(-10, 10)
    sampler = ImportanceSampler(target)
    biased_samples, logws = sampler.sample(imp, n_samples)
    # we have samples from the importance distribution,
    # but not from the target distribution. So we have
    # to reweight. To get samples from the target,
    # just draw from the list of importance samples with
    # probabilities given by the weights
    samples = choice(
        biased_samples, 2 * n_samples, p=np.exp(logws))
    fig, ax = plt.subplots()
    ax.hist(samples, bins=30)
    plt.show()
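
    # The histogram should closely match a standard normal density: although
    # every candidate point was drawn uniformly on [-10, 10], redrawing them in
    # proportion to their importance weights yields draws from the Gaussian target.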

if True:
    # sequential importance sampling
    n_particles = 100
    # define multinomial HMM (discrete hidden states,
    # discrete observation states)
    hidden_states = [0, 1]
    observation_states = [0, 1]
    start_prob = np.array([0.5, 0.5])
    trans_matrix = np.array([[0.9, 0.1],
                             [0.1, 0.9]])
    obs_probs = np.array([[0.9, 0.1],
                          [0.1, 0.9]])
    # draw first state of Markov chain from the start distribution
    start_state = choice(hidden_states, n_particles, p=start_prob)
    pdist = TransitionMatrix(trans_matrix)
    prior = SequentialPrior(pdist, start_state)
    posterior = HMMPosterior(prior, obs_probs)
    sampler = SequentialImportanceSampler(posterior)
    # generate data
    n_observations = 30
    # we have to start with some state...
    true_states = [0]
    observations = []
    for _ in range(n_observations):
        new_state = choice(hidden_states, p=trans_matrix[true_states[-1]])
        new_observation = choice(
            observation_states, p=obs_probs[new_state])
        observations.append(new_observation)
        true_states.append(new_state)
    # will store successive posterior samples; is enlarged in each iteration
    states_samples = None
    # stores log-weights for all samples
    all_log_ws = []
    for observation in observations:
        # we're on-line: a new observation is arriving!
        posterior.add_observation(observation)
        # draw samples from importance distribution and
        # store them along with their corresponding importance
        # weights
        (samples, log_ws) = sampler.sample(n_particles)
        if states_samples is None:
            states_samples = samples.reshape(1, n_particles)
        else:
            # append n_particles samples to states_samples
            states_samples = np.vstack((states_samples, samples))
        all_log_ws.append(log_ws)
    # ... and back to actual probabilities
    all_weights = np.exp(all_log_ws)
    # get samples from target by reweighting importance samples
    # (note: n_samples is reused from the importance sampling demo above)
    target_samples = np.array([choice(samples, n_samples, p=weight)
                               for weight, samples in zip(all_weights,
                                                          states_samples)])
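
    # Each row of target_samples now approximates the filtering distribution
    # p(x_t | y_{1:t}) for one time step: particles were redrawn in proportion
    # to their normalized weights, just as in the importance sampling demo above.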
    # make fancy plots
    times = np.arange(n_observations)
    fig, (ax1, ax2) = plt.subplots(1, 2)
    # plot data and time trace of one of the posterior probabilities
    ax1.scatter(times, observations)
    ax1.plot(times, target_samples.sum(1) / n_samples,
             ls='--', color='orange', label=r'$p(x=1|y_{1:t})$')
    ax1.set_yticks((0, 1))
    ax1.set_xlabel("time")
    ax1.legend()
    # for each time step, plot cumulative distribution of weights;
    # lines get lighter (more transparent) with increasing time step
    for alpha, weights in zip(np.linspace(1, 0, n_observations),
                              all_weights):
        ax2.plot(np.sort(weights),
                 np.linspace(0, 1, n_particles, endpoint=False),
                 color="black", alpha=alpha)
    # double log scale because the plot sucks otherwise
    ax2.set_xscale("log")
    ax2.set_yscale("log")
    ax2.set_ylabel("empirical cumulative\ndistribution function")
    ax2.set_xlabel("weight value")
    plt.show()
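

# The ECDF plot above typically shows the normalized weights collapsing onto a
# few particles as more observations arrive (weight degeneracy). A common remedy,
# and the step that turns sequential importance sampling into a basic particle
# filter, is to occasionally resample the particles according to their weights
# and reset the weights to uniform. The function below is only a minimal sketch
# of such a multinomial resampling step; it is not part of the original code,
# and the resample() helper and the wiring suggested in the comment below are
# illustrative assumptions rather than a tested extension.
def resample(particles, log_weights):
    """Multinomial resampling: redraw particles in proportion to their weights
    and return them together with uniform log-weights."""
    n = len(particles)
    indices = choice(np.arange(n), n, p=np.exp(log_weights))
    return particles[indices], np.log(np.full(n, 1.0 / n))

# A hypothetical way to use it inside the filtering loop above would be to
# resample the prior's particle set and reset the sampler's weights after each
# observation, e.g.:
#     prior.samples, sampler.weights = resample(prior.samples, sampler.weights)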