from tornado import ioloop
from tornado import web
from tornado.util import unicode_type
+from tornado.options import options, define
try:
long
except NameError:
long = int
+define("port", default=9888, help="TCP port to listen on")
+define("root_directory", default="/tmp/s3", help="Root storage directory")
+define("bucket_depth", default=0, help="Bucket file system depth limit")
-def start(port, root_directory="/tmp/s3", bucket_depth=0):
+
+def start(port, root_directory, bucket_depth):
"""Starts the mock S3 server on the given port at the given path."""
application = S3Application(root_directory, bucket_depth)
http_server = httpserver.HTTPServer(application)
def render_xml(self, value):
assert isinstance(value, dict) and len(value) == 1
self.set_header("Content-Type", "application/xml; charset=UTF-8")
- name = value.keys()[0]
+ name = list(value.keys())[0]
parts = []
- parts.append('<' + escape.utf8(name) +
+ parts.append('<' + name +
' xmlns="http://doc.s3.amazonaws.com/2006-03-01">')
- self._render_parts(value.values()[0], parts)
- parts.append('</' + escape.utf8(name) + '>')
+ self._render_parts(value[name], parts)
+ parts.append('</' + name + '>')
self.finish('<?xml version="1.0" encoding="UTF-8"?>\n' +
''.join(parts))
elif isinstance(value, datetime.datetime):
parts.append(value.strftime("%Y-%m-%dT%H:%M:%S.000Z"))
elif isinstance(value, dict):
- for name, subvalue in value.iteritems():
+ for name, subvalue in value.items():
if not isinstance(subvalue, list):
subvalue = [subvalue]
for subsubvalue in subvalue:
- parts.append('<' + escape.utf8(name) + '>')
+ parts.append('<' + name + '>')
self._render_parts(subsubvalue, parts)
- parts.append('</' + escape.utf8(name) + '>')
+ parts.append('</' + name + '>')
else:
raise Exception("Unknown S3 value type %r", value)
os.unlink(path)
self.set_status(204)
self.finish()
+
+
+if __name__ == "__main__":
+ options.parse_command_line()
+ start(options.port, options.root_directory, options.bucket_depth)