Comparison of a forward function written in Theano with the NumPy version.
from numpy import zeros, dot, exp, tanh, array, allclose
from numpy.random import randn
from copy import deepcopy
from time import time
from theano import tensor as T
from theano import function, shared, config

"""Flags:
export THEANO_FLAGS=mode=FAST_RUN,device=gpu,floatX=float32
"""

FLOAT_PRECISION = config.floatX
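

# Plain numpy version: the forward pass uses LSTM-style gating (input gate,
# cell input, output gate). The recurrent slice of the source vector is
# explicitly zeroed each step, so every time step sees only the current input.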
class Network:
    def __init__(self, ni, ns, initial=0.1, maxlen=2500):
        na = 1 + ni + ns
        self.dims = ni, ns, na
        self.init_variables(initial, maxlen)

    def init_variables(self, initial, maxlen=2500):
        n = maxlen
        ni, ns, na = self.dims
        self.WGI = array(randn(ns, na) * initial, dtype=FLOAT_PRECISION)
        self.WGO = array(randn(ns, na) * initial, dtype=FLOAT_PRECISION)
        self.WCI = array(randn(ns, na) * initial, dtype=FLOAT_PRECISION)
        self.source = array(zeros([n, na]), dtype=FLOAT_PRECISION)
        self.cix = array(zeros([n, ns]), dtype=FLOAT_PRECISION)
        self.ci = array(zeros([n, ns]), dtype=FLOAT_PRECISION)
        self.gix = array(zeros([n, ns]), dtype=FLOAT_PRECISION)
        self.gi = array(zeros([n, ns]), dtype=FLOAT_PRECISION)
        self.gox = array(zeros([n, ns]), dtype=FLOAT_PRECISION)
        self.go = array(zeros([n, ns]), dtype=FLOAT_PRECISION)
        self.state = array(zeros([n, ns]), dtype=FLOAT_PRECISION)
        self.output = array(zeros([n, ns]), dtype=FLOAT_PRECISION)
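
    # Forward pass over a sequence xs (length <= maxlen); intermediate results
    # are written into the preallocated per-step buffers above.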
    def forward(self, xs):
        def ffunc(x):
            return 1.0 / (1.0 + exp(-x))
        ni, ns, na = self.dims
        n = len(xs)
        for t in range(n):
            self.source[t, 0] = 1
            self.source[t, 1:1 + ni] = xs[t]
            self.source[t, 1 + ni:] = zeros(ns)
            dot(self.WGI, self.source[t], out=self.gix[t])
            dot(self.WGO, self.source[t], out=self.gox[t])
            dot(self.WCI, self.source[t], out=self.cix[t])
            self.gi[t] = ffunc(self.gix[t])
            self.ci[t] = tanh(self.cix[t])
            self.state[t] = self.ci[t] * self.gi[t]
            self.go[t] = ffunc(self.gox[t])
            self.output[t] = tanh(self.state[t]) * self.go[t]
        return self.output[:n]
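

# Theano version of the same forward pass: the weights and all per-step
# buffers live in shared variables, and one compiled function performs a
# single time step, writing row t of every buffer.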
class Network_Theano(Network):
    def __init__(self, original_net, ni, ns, maxlen=2500):
        na = 1 + ni + ns
        # copy the weights so that both networks produce the same results
        self.copy_weights(original_net)
        self.uploadweightsTheano()
        self.initforwardTheano(ns, maxlen, na)

    def copy_weights(self, original_net):
        self.WGI = deepcopy(original_net.WGI)
        self.WGO = deepcopy(original_net.WGO)
        self.WCI = deepcopy(original_net.WCI)

    def uploadweightsTheano(self):
        self.TWGI_shared = shared(self.WGI)
        self.TWGO_shared = shared(self.WGO)
        self.TWCI_shared = shared(self.WCI)

    def initforwardTheano(self, ns, n, na):
        def Tffunc(x):
            # elementwise logistic sigmoid
            Tone = array([1.0], dtype=FLOAT_PRECISION)
            return Tone / (Tone + T.exp(-x))
        # per-step buffers, one row per time step
        self.Toutput = shared(zeros([n, ns], dtype=FLOAT_PRECISION))
        self.Tstate_shared = shared(zeros([n, ns], dtype=FLOAT_PRECISION))
        self.Tgo_shared = shared(zeros([n, ns], dtype=FLOAT_PRECISION))
        self.Tsource_shared = shared(zeros([n, na], dtype=FLOAT_PRECISION))
        self.Tgi_shared = shared(zeros([n, ns], dtype=FLOAT_PRECISION))
        self.Tci_shared = shared(zeros([n, ns], dtype=FLOAT_PRECISION))
        self.Txs_shared = shared(zeros([n, ns], dtype=FLOAT_PRECISION))
        # symbolic graph for a single time step Tt
        Tt = T.iscalar('Tt')
        Txs = self.Txs_shared[Tt]
        Tzeros = T.zeros([ns], dtype=FLOAT_PRECISION)
        Tone = array([1.0], dtype=FLOAT_PRECISION)
        # source vector: [bias, x_t, zeroed recurrent part]
        Tsource = T.concatenate([Tone, Txs, Tzeros])
        Tgix = T.dot(self.TWGI_shared, Tsource)
        Tgox = T.dot(self.TWGO_shared, Tsource)
        Tcix = T.dot(self.TWCI_shared, Tsource)
        Tgi = Tffunc(Tgix)
        Tci = T.tanh(Tcix)
        Tstate = Tci * Tgi
        Tgo = Tffunc(Tgox)
        TToutput = (self.Toutput, T.set_subtensor(self.Toutput[Tt], T.tanh(Tstate) * Tgo))
        TTstate = (self.Tstate_shared, T.set_subtensor(self.Tstate_shared[Tt], Tstate))
        TTsource = (self.Tsource_shared, T.set_subtensor(self.Tsource_shared[Tt], Tsource))
        TTgo = (self.Tgo_shared, T.set_subtensor(self.Tgo_shared[Tt], Tgo))
        TTgi = (self.Tgi_shared, T.set_subtensor(self.Tgi_shared[Tt], Tgi))
        TTci = (self.Tci_shared, T.set_subtensor(self.Tci_shared[Tt], Tci))
        # one call to Tforward updates row Tt of every buffer
        updates = [TTgo, TToutput, TTstate, TTsource, TTgi, TTci]
        self.Tforward = function([Tt], updates=updates)
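
    # Note: the input sequence must already be loaded into Txs_shared (via
    # set_value) before calling forward; xs is only used here for its length.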
    def forward(self, xs):
        n = len(xs)
        for t in range(n):
            self.Tforward(t)
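

# Benchmark: run both implementations on the same random input sequence and
# compare their outputs.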
# init
ninput = 48
nstates = 100
seqlength = 1000
network_orig = Network(ninput, nstates)
network_theano = Network_Theano(network_orig, ninput, nstates)
data = array(randn(seqlength, ninput), dtype=FLOAT_PRECISION)

# numpy
starttime = time()
output = network_orig.forward(data)
print "numpy takes {}s".format(time() - starttime)

# theano
network_theano.Txs_shared.set_value(data)
starttime = time()
network_theano.forward(data)
print "theano takes {}s".format(time() - starttime)
output_theano = network_theano.Toutput.get_value()[:seqlength]

# check the results
if not allclose(output, output_theano, rtol=1e-04, atol=1e-07):
    import pdb; pdb.set_trace()
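
# Extra diagnostic sketch (uses only the arrays computed above): print the
# largest absolute difference between the two implementations.
print "max abs difference: {}".format(abs(output - output_theano).max())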