'''Trains a simple convnet on the MNIST dataset with adversarial training.

During the adversarial train step's feed-forward pass, the dropout masks can be
reused by setting REUSE_DROPOUT to True.

For more info, see https://stackoverflow.com/questions/53395329/should-dropout-masks-be-reused-during-adversarial-training

Requirements:
- tensorflow==1.12.0
- cleverhans bleeding edge: pip install git+https://github.com/tensorflow/cleverhans.git#egg=cleverhans
- pandas==0.22.0
- tqdm==4.28.1
'''
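
# How the mask reuse works in this script: each train step first runs the
# legitimate batch with fresh dropout masks. set_reuse_on_model() then switches
# every DropoutReuseMask layer (defined below) into reuse mode, so the cached
# masks are re-applied both while FGSM generates the adversarial batch and
# during the adversarial forward/backward pass, before reuse is switched off
# again for the next step.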

import numpy as np
import pandas as pd
import tensorflow as tf
from tensorflow.python.keras import backend as K
from tensorflow.python.keras.datasets import mnist
from tensorflow.python.keras.models import Sequential, Model
from tensorflow.python.keras.layers import Dense, Flatten, Dropout
from tensorflow.python.keras.layers import Conv2D
from tensorflow.python.keras.utils import to_categorical
from tensorflow.python.ops.losses.losses_impl import softmax_cross_entropy
from tensorflow.train import get_or_create_global_step
from cleverhans.attacks_tfe import FastGradientMethod
from cleverhans.model import Model as CleverhansBaseModel
from tqdm import tqdm

tf.enable_eager_execution()
np.random.seed(0)
tf.set_random_seed(0)

ADVERSARIAL_TRAINING = True
ADVERSARIAL_ALPHA = 0.5
FGSM_PARAMS = {'eps': 0.3, 'clip_min': 0., 'clip_max': 1.}
REUSE_DROPOUT = True
BATCH_SIZE = 128
EPOCHS = 20
N_CLASSES = 10
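
# The adversarial training objective used in train_step_adv below is the
# weighted sum
#     loss_total = ADVERSARIAL_ALPHA * loss(x) + (1 - ADVERSARIAL_ALPHA) * loss(x_adv)
# where x_adv is generated with FGSM using FGSM_PARAMS. Each term is scaled
# inside its own GradientTape, so summing the two per-tape gradient lists
# (see sum_gradients) reproduces the gradient of loss_total.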


def get_model(input_shape):
    # Except for the Dropout layer, this model is the same as the one used in
    # cleverhans/cleverhans_tutorials/mnist_tutorial_keras_tf.py
    model = Sequential()
    model.add(Conv2D(64, kernel_size=(8, 8), strides=(2, 2), padding='same',
                     activation='relu',
                     input_shape=input_shape))
    model.add(Conv2D(128, kernel_size=(6, 6), strides=(2, 2), padding='valid',
                     activation='relu'))
    model.add(Conv2D(128, kernel_size=(5, 5), strides=(1, 1), padding='valid',
                     activation='relu'))
    model.add(Flatten())
    model.add(DropoutReuseMask(0.5))
    model.add(Dense(N_CLASSES, name='logits'))
    return model


def main():
    dataset_train, dataset_test = get_datasets()
    input_shape = dataset_train.output_shapes[0][1:]
    model = get_model(input_shape)
    ch_model = CleverhansModel(model)
    fgsm = FastGradientMethod(ch_model)

    # Train loop
    optimizer = tf.train.AdamOptimizer()
    progress_train = pd.DataFrame()
    progress_test = pd.DataFrame()
    for i_epoch in range(EPOCHS):
        # Training
        for x_batch, y_batch in tqdm(dataset_train):
            if ADVERSARIAL_TRAINING:
                loss, acc, loss_adv, acc_adv = train_step_adv(x_batch, y_batch,
                                                              model, optimizer,
                                                              fgsm)
                step_progress = {
                    'train_loss': loss,
                    'train_loss_adv': loss_adv,
                    'train_acc': acc,
                    'train_acc_adv': acc_adv
                }
            else:
                loss, acc = train_step(x_batch, y_batch, model, optimizer)
                step_progress = {
                    'train_loss': loss,
                    'train_acc': acc
                }
            step_progress['epoch'] = i_epoch
            step_progress['step'] = get_or_create_global_step().numpy()
            progress_train = progress_train.append(step_progress,
                                                   ignore_index=True)
        print('Epoch done')
        print(progress_train.loc[progress_train['epoch'] == i_epoch].mean())

        # Evaluation
        for x_batch, y_batch in tqdm(dataset_test):
            loss, acc, loss_adv, acc_adv = test_step(x_batch, y_batch, model,
                                                     fgsm)
            step_progress = {
                'epoch': i_epoch,
                'test_loss': loss,
                'test_loss_adv': loss_adv,
                'test_acc': acc,
                'test_acc_adv': acc_adv
            }
            progress_test = progress_test.append(step_progress,
                                                 ignore_index=True)
        print('Evaluation done')
        print(progress_test.loc[progress_test['epoch'] == i_epoch].mean())

    print(progress_test.groupby('epoch')[['test_acc', 'test_acc_adv']].mean())


def train_step(x_batch, y_batch, model, optimizer):
    K.set_learning_phase(1)

    # Compute the gradient for the legitimate batch only
    with tf.GradientTape() as tape:
        logits = model(x_batch, training=True)
        loss = softmax_cross_entropy(y_batch, logits)
    grads = tape.gradient(loss, model.variables)
    optimizer.apply_gradients(zip(grads, model.variables),
                              global_step=get_or_create_global_step())

    # Compute the accuracy
    acc = np.mean(np.equal(np.argmax(y_batch, axis=-1),
                           np.argmax(logits, axis=-1)))
    return loss.numpy(), acc


def train_step_adv(x_batch, y_batch, model, optimizer, fgsm):
    K.set_learning_phase(1)

    # Compute the gradient for the legitimate batch. The loss is scaled by its
    # weight inside the tape so the per-tape gradients can simply be summed.
    with tf.GradientTape() as tape:
        logits = model(x_batch, training=True)
        loss = softmax_cross_entropy(y_batch, logits)
        loss *= ADVERSARIAL_ALPHA
    grads_legit = tape.gradient(loss, model.variables)

    # Adversarial training: if REUSE_DROPOUT is True, the dropout masks drawn
    # during the legitimate pass above are re-applied while FGSM generates the
    # adversarial batch and during the adversarial forward/backward pass.
    DropoutReuseMask.set_reuse_on_model(model, REUSE_DROPOUT)
    x_adv = fgsm.generate(x_batch, **FGSM_PARAMS)
    with tf.GradientTape() as tape:
        logits_adv = model(x_adv, training=True)
        loss_adv = softmax_cross_entropy(y_batch, logits_adv)
        loss_adv *= (1 - ADVERSARIAL_ALPHA)
    grads_adv = tape.gradient(loss_adv, model.variables)
    DropoutReuseMask.set_reuse_on_model(model, False)

    # Train on the summed gradients of both (already weighted) losses
    grads = sum_gradients(grads_legit, grads_adv)
    optimizer.apply_gradients(zip(grads, model.variables),
                              global_step=get_or_create_global_step())

    # Compute the accuracies
    acc = np.mean(np.equal(np.argmax(y_batch, axis=-1),
                           np.argmax(logits, axis=-1)))
    acc_adv = np.mean(np.equal(np.argmax(y_batch, axis=-1),
                               np.argmax(logits_adv, axis=-1)))
    return loss.numpy(), acc, loss_adv.numpy(), acc_adv


def test_step(x_batch, y_batch, model, fgsm):
    K.set_learning_phase(0)

    # Feed the input
    logits = model(x_batch, training=False)
    loss = softmax_cross_entropy(y_batch, logits)

    # Adversarial test
    x_adv = fgsm.generate(x_batch, **FGSM_PARAMS)
    logits_adv = model(x_adv, training=False)
    loss_adv = softmax_cross_entropy(y_batch, logits_adv)

    # Compute the accuracies
    acc = np.mean(np.equal(np.argmax(y_batch, axis=-1),
                           np.argmax(logits, axis=-1)))
    acc_adv = np.mean(np.equal(np.argmax(y_batch, axis=-1),
                               np.argmax(logits_adv, axis=-1)))
    return loss.numpy(), acc, loss_adv.numpy(), acc_adv


def get_datasets():
    # Fetch and format the MNIST data
    (x_train, y_train), (x_test, y_test) = mnist.load_data()

    # Convert class vectors to binary class matrices
    y_train = to_categorical(y_train, N_CLASSES)
    y_test = to_categorical(y_test, N_CLASSES)

    dataset_train = tf.data.Dataset.from_tensor_slices(
        (tf.cast(x_train[..., tf.newaxis] / 255, tf.float32),
         tf.cast(y_train, tf.float32)))
    dataset_train = dataset_train.shuffle(1000).batch(BATCH_SIZE)
    dataset_test = tf.data.Dataset.from_tensor_slices(
        (tf.cast(x_test[..., tf.newaxis] / 255, tf.float32),
         tf.cast(y_test, tf.float32)))
    dataset_test = dataset_test.shuffle(1000).batch(BATCH_SIZE)
    return dataset_train, dataset_test


def sum_gradients(grads_a, grads_b):
    # Element-wise sum of two gradient lists, treating None entries as zero
    grads = []
    for g_a, g_b in zip(grads_a, grads_b):
        if g_a is not None:
            if g_b is not None:
                grads.append(g_a + g_b)
            else:
                grads.append(g_a)
        else:
            grads.append(g_b)
    return grads


class DropoutReuseMask(Dropout):
    def __init__(self, *args, **kwargs):
        self.mask = None
        self.reuse = False
        super(DropoutReuseMask, self).__init__(*args, **kwargs)

    def call(self, inputs, *args, **kwargs):
        if not self.reuse:
            # Create a new mask by applying dropout to a tensor of ones. In
            # training mode the mask contains 0 for dropped units and
            # 1 / keep_prob for kept units, so multiplying it with the inputs
            # reproduces standard (inverted) dropout while keeping the mask
            # around for reuse. In inference mode the parent Dropout is a
            # no-op, so the mask is all ones.
            mask_input = tf.ones_like(inputs)
            self.mask = super(DropoutReuseMask, self).call(mask_input,
                                                           *args,
                                                           **kwargs)
        return inputs * self.mask

    @staticmethod
    def set_reuse_on_model(model: Model, reuse):
        # Toggle mask reuse on every DropoutReuseMask layer of the model
        for layer in model.layers:
            if isinstance(layer, DropoutReuseMask):
                layer.reuse = reuse
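

# Optional sanity check, not part of the original script and not called by
# main(): a minimal sketch showing that a DropoutReuseMask layer re-applies the
# cached mask to a second input once `reuse` is set. The helper name
# `demo_mask_reuse` is purely illustrative.
def demo_mask_reuse():
    layer = DropoutReuseMask(0.5)
    x = tf.ones((1, 8))
    first = layer(x, training=True)       # draws a fresh mask
    layer.reuse = True
    second = layer(2 * x, training=True)  # re-applies the stored mask
    # If the mask was reused, the dropped (zero) positions must coincide
    assert np.array_equal(first.numpy() == 0, second.numpy() == 0)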


class CleverhansModel(CleverhansBaseModel):
    # Thin wrapper that lets cleverhans attacks query the Keras model. The
    # attacks only need the logits, so fprop returns them under O_LOGITS.
    def __init__(self, model):
        self.model = model
        super(CleverhansModel, self).__init__(nb_classes=N_CLASSES)

    def fprop(self, x, **kwargs):
        return {
            CleverhansBaseModel.O_LOGITS: self.model(x, **kwargs)
        }


if __name__ == '__main__':
    main()