Created
January 21, 2012 03:13
-
-
Save sarva/1651035 to your computer and use it in GitHub Desktop.
Streaming multiple files (multipart/form-data) PUT uploads to Tornado (uses a fork that supports custom body handlers)
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
"""
This example requires the custom body handling tornado fork at https://github.com/joshmarshall/tornado.
Refer to http://groups.google.com/group/python-tornado/browse_thread/thread/6413ac33dd7444b0.

Supports uploading an unlimited number/size of files in a single
PUT multipart/form-data request. Each file is processed as soon as the
stream finds its part in the form data.

==USAGE==

After starting this test tornado instance using the fork
at https://github.com/joshmarshall/tornado, this can be tested
using the curl command:

    curl -X PUT -F file=@/path/to/file#1 -F file=@/path/to/file#2 http://localhost:8888

Note that the RequestHandler is called *before* body streaming is started
so that custom file handlers can be used per RequestHandler. Once the event
callbacks are set up, the RequestHandler must start the streaming process
manually with self.request.body_stream.start()
"""
import cgi
import logging
import os
import tempfile

import tornado.escape
import tornado.httpserver
import tornado.httputil
import tornado.ioloop
import tornado.web
class MainHandler(tornado.web.RequestHandler):
    """Save each file of a streamed multipart/form-data PUT into a temp directory.

    The handler only wires callbacks onto ``self.request.body_stream`` and then
    starts the stream; the response is finished from the ``onfinish`` callback.
    """

    @tornado.web.asynchronous
    def put(self):
        """Register the per-part callbacks and start streaming the request body."""
        # directory to put uploaded files to
        self.tmpdir = tempfile.mkdtemp(prefix="multipart-uploads-")

        # set up callbacks for each part of the multipart form data
        stream = self.request.body_stream
        stream.onpart = self._open_part
        stream.ondata = lambda part, chunk: part.write(chunk)
        stream.onclose = self._close_part
        # an aborted upload must still release the open file handle
        stream.onerror = lambda part: part.close()
        stream.onfinish = self.finish

        # initialize streaming
        stream.start()

    def _open_part(self, filename, content_type):
        """Open the destination file for one uploaded part.

        ``filename`` comes from the client, so only its final path component is
        used; anything that would escape ``self.tmpdir`` (``../x``, absolute
        paths, Windows-style paths) is reduced to a plain file name. Names
        that are empty or refer to a directory get a generated name instead.
        The file is opened in binary mode because the stream delivers raw
        bytes. ``content_type`` is accepted to match the callback signature
        and is not used.
        """
        safe_name = os.path.basename(filename.replace("\\", "/"))
        if safe_name in ("", ".", ".."):
            return tempfile.NamedTemporaryFile(
                mode="wb", dir=self.tmpdir, prefix="upload-", delete=False)
        return open(os.path.join(self.tmpdir, safe_name), "wb")

    def _close_part(self, part):
        """Close a completed part's file and report it to the client."""
        part.close()
        self.write("Finished upload of %s" % part.name)
        self.flush()
class MultiPartBodyStream(tornado.httpserver.HTTPParseBody):
    """Process a multipart/form-data data stream.

    The body is consumed incrementally and reported through callbacks that the
    request handler assigns on this object before calling ``start()``:

    * ``onpart(name, content_type)`` -> part object (any non-None value)
    * ``ondata(part, data)`` for each piece of the part's body
    * ``onclose(part)`` when the part is complete
    * ``onfinish()`` when the body has been fully processed
    * ``onerror(part)`` when the stream is closed with a part still open

    ``self.request``, ``self.context``, ``self.stream``, ``self.content_type``
    and ``self.content_length`` are read here but set elsewhere (presumably by
    the ``HTTPParseBody`` base class of the tornado fork -- not visible here).
    """

    # flag to only accept form parts with file data (has a filename)
    files_only = True

    def __call__(self):
        """Extract the boundary and invoke the request handler.

        Raises ``tornado.httpserver._BadRequestException`` when the
        Content-Type carries no usable boundary, and ``AssertionError`` when
        the handler is not asynchronous.
        """
        self.current_part = None
        self.bytes_left = int(self.content_length)
        self._buffer = b""

        for field in self.content_type.split(";"):
            k, _, v = field.strip().partition("=")
            if k != "boundary":
                continue
            if len(v) >= 2 and v.startswith('"') and v.endswith('"'):
                v = v[1:-1]
            if not v:
                # an empty boundary would match every "--" in the body
                continue
            self.boundary = tornado.escape.utf8(v)
            break
        else:
            raise tornado.httpserver._BadRequestException("Invalid multipart/form-data")

        # make this body streamer accessible to the request callback
        self.request.body_stream = self

        # calling the request handler immediately which should manually start the
        # streaming process once all the event handlers are set up
        handler = self.context.request_callback(self.request)

        # requires an asynchronous handler; raised explicitly (not via `assert`)
        # so the check survives `python -O`
        if handler._auto_finish:
            raise AssertionError(
                "RequestHandler for %s must be asynchronous" % self.__class__.__name__)

    def onpart(self, name, content_type):
        """Callback: a new part starts; must return the part object."""
        raise NotImplementedError("Required onpart callback")

    def ondata(self, part, data):
        """Callback: ``data`` is the next piece of ``part``'s body."""
        raise NotImplementedError("Required ondata callback")

    def onclose(self, part):
        """Callback: ``part`` has been received completely."""
        raise NotImplementedError("Required onclose callback")

    def onfinish(self):
        """Callback: the whole body has been processed."""
        raise NotImplementedError("Required onfinish callback")

    def onerror(self, part):
        """Callback: the stream was closed while ``part`` was still open.

        Optional; the default only logs so that ``close()`` never fails on a
        handler that did not register it.
        """
        logging.warning("multipart/form-data stream closed with an unfinished part")

    def stream_data(self):
        """Request the next chunk of the body from the underlying stream."""
        # don't like needing to access the iostream private read_buffer_size variable
        # but it is needed to ensure we always consume as much as is available to avoid
        # overflowing the max read buffer size on large uploads
        read_size = max(4096, self.stream._read_buffer_size)
        self.stream.read_bytes(min(self.bytes_left, read_size), self._stream)

    # user-friendly alias to get the stream started
    start = stream_data

    def close(self):
        """Report an unfinished part, if any, through ``onerror``."""
        if self.current_part is not None:
            part, self.current_part = self.current_part, None
            self.onerror(part)

    def _emit(self, payload):
        """Forward non-empty body bytes to the currently open part, if any."""
        if payload and self.current_part is not None:
            self.ondata(self.current_part, payload)

    def _finish_part(self):
        """Close the currently open part, if any, through ``onclose``."""
        if self.current_part is not None:
            part, self.current_part = self.current_part, None
            self.onclose(part)

    def _stream(self, data):
        """Consume one chunk read from the stream.

        Everything that is already buffered is processed before more data is
        requested, so several small parts arriving in one chunk do not depend
        on further reads.
        """
        self.bytes_left -= len(data)
        buf = self._buffer + data
        delimiter = b"--" + self.boundary
        delimiter_len = len(delimiter)

        while True:
            start = buf.find(delimiter)
            if start == -1:
                # leave the end of the chunk so the boundary (and the CRLF in
                # front of it) does not get lost if it is cut off part-way
                keep = delimiter_len + 2
                payload, buf = buf[:-keep], buf[-keep:]
                self._emit(payload)
                break

            payload, buf = buf[:start], buf[start:]
            # the CRLF preceding a delimiter belongs to the delimiter, not to
            # the part's data
            if payload.endswith(b"\r\n"):
                payload = payload[:-2]
            self._emit(payload)

            # "--boundary--" terminates the body; check it before looking for
            # headers so an epilogue is never parsed as a part header
            if buf[delimiter_len:delimiter_len + 2] == b"--":
                self._buffer = b""
                self._finish_part()
                return self.onfinish()

            eoh = buf.find(b"\r\n\r\n", delimiter_len)
            if eoh == -1:
                # headers incomplete: keep the delimiter buffered and read on
                break

            # move on to the next part (or start)
            self._header(buf[delimiter_len + 2:eoh])
            buf = buf[eoh + 4:]

        self._buffer = buf
        if self.bytes_left > 0:
            # continue streaming
            return self.stream_data()

        # the declared body length is exhausted without a closing boundary:
        # abort the open part and let the handler finish instead of waiting
        # for data that will never arrive
        logging.warning("multipart/form-data body ended without a closing boundary")
        self.close()
        return self.onfinish()

    def _header(self, header):
        """Parse one part's header block and open the part if it is acceptable.

        Invalid or unwanted parts are logged and skipped; their data is
        dropped because ``current_part`` stays ``None``.
        """
        # close any open parts as they are done
        self._finish_part()

        boundary_at = header.find(self.boundary)
        if boundary_at != -1:
            # a previous part had no header terminator; resume right after the
            # boundary line that got swallowed into this header block
            logging.warning("multipart/form-data missing headers")
            header = header[boundary_at + len(self.boundary):].lstrip(b"\r\n")

        # convert to dict
        try:
            header = tornado.httputil.HTTPHeaders.parse(header.decode("utf-8"))
        except ValueError:
            # covers undecodable bytes (UnicodeDecodeError) and header lines
            # the parser rejects with ValueError
            logging.warning("Invalid multipart/form-data")
            return

        disp_header = header.get("Content-Disposition", "")
        disposition, disp_params = cgi.parse_header(disp_header)

        if disposition != "form-data":
            logging.warning("Invalid multipart/form-data")
            return
        if not disp_params.get("name"):
            logging.warning("multipart/form-data value missing name")
            return
        if self.files_only and not disp_params.get("filename"):
            # this part is invalid (no or empty filename), move on to the next one
            logging.warning("multipart/form-data part missing required file data")
            return

        # stream files non-blocking to the handler one file at a time
        part_name = disp_params["filename"] if self.files_only else disp_params["name"]
        self.current_part = self.onpart(
            part_name, header.get("Content-Type", "application/unknown"))
# URL routes: the root path is served by the streaming upload handler.
_routes = [(r"/", MainHandler)]
application = tornado.web.Application(_routes)

# Passed to listen() below; pairs a content-type pattern with the body
# streamer class (presumably matched against the request Content-Type by the
# tornado fork -- confirm against the fork's documentation).
body_handlers = [("multipart/form-data.*", MultiPartBodyStream)]


def main():
    """Listen on port 8888 and run the IO loop."""
    application.listen(8888, body_handlers=body_handlers)
    tornado.ioloop.IOLoop.instance().start()


if __name__ == "__main__":
    main()
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment