from email import message_from_file
from email.header import Header, decode_header
from email.utils import parsedate_tz, mktime_tz
+from functools import reduce
import logging
import operator
import re
from django.conf import settings
from django.contrib.auth.models import User
from django.utils.log import AdminEmailHandler
+from django.utils import six
+from django.utils.six.moves import map
from patchwork.models import (
Patch, Project, Person, Comment, State, get_default_initial_patch_state)
return frag_str.decode(frag_encoding)
return frag_str.decode()
- fragments = map(decode, decode_header(header))
+ fragments = list(map(decode, decode_header(header)))
return normalise_space(u' '.join(fragments))
return reduce(operator.__concat__,
['%s: %s\n' % (k, Header(v, header_name=k,
continuation_ws='\t').encode())
- for (k, v) in mail.items()])
+ for (k, v) in list(mail.items())])
def find_pull_request(content):
def try_decode(payload, charset):
try:
- payload = unicode(payload, charset)
+ payload = six.text_type(payload, charset)
except UnicodeDecodeError:
return None
return payload
payload = part.get_payload(decode=True)
subtype = part.get_content_subtype()
- if not isinstance(payload, unicode):
+ if not isinstance(payload, six.text_type):
charset = part.get_content_charset()
# Check that we have a charset that we understand. Otherwise,
def list_logging_levels():
"""Give a summary of all available logging levels."""
- return sorted(VERBOSITY_LEVELS.keys(),
+ return sorted(list(VERBOSITY_LEVELS.keys()),
key=lambda x: VERBOSITY_LEVELS[x])
parser.add_argument('infile', nargs='?', type=argparse.FileType('r'),
from __future__ import absolute_import
-from urllib import quote
-
from django.contrib.auth.models import User
from django.utils.html import escape
from django.utils.safestring import mark_safe
+from django.utils import six
+from django.utils.six.moves.urllib.parse import quote
from patchwork.models import Person, State
pass
def parse(self, dict):
- if self.param not in dict.keys():
+ if self.param not in dict:
return
self._set_key(dict[self.param])
def _set_key(self, str):
self.archive_state = False
self.applied = True
- for (k, v) in self.param_map.iteritems():
+ for (k, v) in self.param_map.items():
if str == v:
self.archive_state = k
if self.archive_state == None:
class Filters:
def __init__(self, request):
- self._filters = map(lambda c: c(self), filterclasses)
+ self._filters = [c(self) for c in filterclasses]
self.dict = request.GET
self.project = None
def querystring(self, remove = None):
params = dict(self.params())
- for (k, v) in self.dict.iteritems():
+ for (k, v) in self.dict.items():
if k not in params:
params[k] = v
if remove is not None:
- if remove.param in params.keys():
+ if remove.param in list(params.keys()):
del params[remove.param]
- pairs = params.iteritems()
-
def sanitise(s):
- if not isinstance(s, basestring):
- s = unicode(s)
+ if not isinstance(s, six.string_types):
+ s = six.text_type(s)
return quote(s.encode('utf-8'))
return '?' + '&'.join(['%s=%s' % (sanitise(k), sanitise(v))
- for (k, v) in pairs])
+ for (k, v) in list(params.items())])
def querystring_without_filter(self, filter):
return self.querystring(filter)
def applied_filters(self):
- return filter(lambda x: x.applied, self._filters)
+ return [x for x in self._filters if x.applied]
def available_filters(self):
return self._filters
from django.conf import settings
from django.core import paginator
+from django.utils.six.moves import range
DEFAULT_PATCHES_PER_PAGE = 100
import hashlib
import re
+from django.utils.six.moves import map
+
_hunk_re = re.compile('^\@\@ -\d+(?:,(\d+))? \+\d+(?:,(\d+))? \@\@')
_filename_re = re.compile('^(---|\+\+\+) (\S+)')
return 1
return int(x)
- lc = map(fn, match.groups())
+ lc = list(map(fn, match.groups()))
state = 4
patchbuf += buf + line
if not x:
return 1
return int(x)
- line_nos = map(fn, hunk_match.groups())
+ line_nos = list(map(fn, hunk_match.groups()))
line = '@@ -%d +%d @@' % tuple(line_nos)
elif line[0] in prefixes:
def __init__(self, kwargs):
super(ListURLNode, self).__init__(None, [], {}, False)
self.params = {}
- for (k, v) in kwargs.iteritems():
+ for (k, v) in kwargs.items():
if k in list_params:
self.params[k] = v
except Exception:
pass
- for (k, v) in self.params.iteritems():
+ for (k, v) in self.params.items():
params[smart_str(k, 'ascii')] = v.resolve(context)
if not params:
return str
return str + '?' + '&'.join(
- ['%s=%s' % (k, escape(v)) for (k, v) in params.iteritems()])
+ ['%s=%s' % (k, escape(v)) for (k, v) in list(params.items())])
@register.tag
from django import template
from django.utils.html import escape
from django.utils.safestring import mark_safe
+from django.utils.six.moves import map
register = template.Library()
(r, str) = t
return (re.compile(r, re.M | re.I), str)
-_patch_span_res = map(_compile, [
+_patch_span_res = list(map(_compile, [
('^(Index:?|diff|\-\-\-|\+\+\+|\*\*\*) .*$', 'p_header'),
('^\+.*$', 'p_add'),
('^-.*$', 'p_del'),
('^!.*$', 'p_mod'),
- ])
+ ]))
_patch_chunk_re = \
re.compile('^(@@ \-\d+(?:,\d+)? \+\d+(?:,\d+)? @@)(.*)$', re.M | re.I)
-_comment_span_res = map(_compile, [
+_comment_span_res = list(map(_compile, [
('^\s*Signed-off-by: .*$', 'signed-off-by'),
('^\s*Acked-by: .*$', 'acked-by'),
('^\s*Nacked-by: .*$', 'nacked-by'),
('^\s*Reviewed-by: .*$', 'reviewed-by'),
('^\s*From: .*$', 'from'),
('^\s*>.*$', 'quote'),
- ])
+ ]))
_span = '<span class="%s">%s</span>'
from patchwork.models import Patch, Bundle, BundlePatch, Person
from patchwork.tests.utils import defaults, create_user, find_in_context
+from django.utils.six.moves import range, zip
def bundle_url(bundle):
return '/bundle/%s/%s/' % (bundle.owner.username, bundle.name)
bundle.delete()
def testBundleForwardOrder(self):
- ids = map(lambda p: p.id, self.patches)
+ ids = [p.id for p in self.patches]
self._testOrder(ids, self.patches)
def testBundleReverseOrder(self):
- ids = map(lambda p: p.id, self.patches)
+ ids = [p.id for p in self.patches]
ids.reverse()
self._testOrder(ids, self.patches)
# check if order field is still sequential:
order_numbers = [ bp.order for bp in bps ]
- expected_order = range(1, len(neworder)+1) # [1 ... len(neworder)]
+ expected_order = list(range(1, len(neworder)+1)) # [1 ... len(neworder)]
self.failUnlessEqual(order_numbers, expected_order)
def testBundleReorderAll(self):
self.assertEqual(self.patch.check_set.count(), total)
- for state in state_counts.keys():
+ for state in state_counts:
self.assertEqual(counts[state], state_counts[state])
# also check the ones we didn't explicitly state
from django.core.urlresolvers import reverse
from django.test.client import Client
from django.test import TestCase
+from django.utils.six.moves import zip
from patchwork.models import Person, Patch
from patchwork.tests.utils import defaults, create_user, find_in_context
ids = self._extract_patch_ids(response)
self.assertTrue(bool(ids))
patches = [ Patch.objects.get(id = i) for i in ids ]
- pairs = zip(patches, patches[1:])
+ pairs = list(zip(patches, patches[1:]))
[ test_fn(p1, p2) for (p1, p2) in pairs ]
def testDateOrder(self):
from django.test.client import Client
from django.test import TestCase
+from django.utils.six.moves import map, range
from patchwork.models import EmailConfirmation, Person, Bundle
Person(name = "Test Name", email = "test1@example.com"),
Person(email = "test2@example.com"),
]
- map(lambda p: p.save(), self.people)
+ list(map(lambda p: p.save(), self.people))
def testNameComplete(self):
response = self.client.get('/submitter/', {'q': 'name'})
# Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA
import unittest
-import xmlrpclib
from django.conf import settings
from django.core.urlresolvers import reverse
from django.test import LiveServerTestCase
+from django.utils.six.moves import xmlrpc_client
from patchwork.models import Person, Patch
from patchwork.tests.utils import defaults
def setUp(self):
self.url = (self.live_server_url +
reverse('patchwork.views.xmlrpc.xmlrpc'))
- self.rpc = xmlrpclib.Server(self.url)
+ self.rpc = xmlrpc_client.Server(self.url)
def testGetRedirect(self):
response = self.client.patch(self.url)
if encoding is not None:
f = codecs.open(file_path, encoding = encoding)
else:
- f = file(file_path)
+ f = open(file_path)
return f.read()
str = str[1:]
reversed = True
- if str not in self.order_map.keys():
+ if str not in self.order_map:
return
self.order = str
conf_settings.DEFAULT_FROM_EMAIL, [email])
context['email'] = mail
context['email_sent'] = True
- except Exception, ex:
+ except Exception as ex:
context['error'] = 'An error occurred during confirmation . ' + \
'Please try again later.'
context['admins'] = conf_settings.ADMINS
bundle.append_patch(patch)
bundle.save()
context.add_message('Patch added to bundle "%s"' % bundle.name)
- except Exception, ex:
+ except Exception as ex:
context.add_message("Couldn't add patch '%s' to bundle %s: %s" \
% (patch.name, bundle.name, ex.message))
from __future__ import absolute_import
import base64
-from DocXMLRPCServer import XMLRPCDocGenerator
-from SimpleXMLRPCServer import SimpleXMLRPCDispatcher
+# NOTE(stephenfin) six does not seem to support this
+try:
+ from DocXMLRPCServer import XMLRPCDocGenerator
+except ImportError:
+ from xmlrpc.server import XMLRPCDocGenerator
import sys
-import xmlrpclib
from django.core import urlresolvers
from django.contrib.auth import authenticate
from django.http import (
HttpResponse, HttpResponseRedirect, HttpResponseServerError)
from django.views.decorators.csrf import csrf_exempt
+from django.utils import six
+from django.utils.six.moves import map, xmlrpc_client
+from django.utils.six.moves.xmlrpc_server import SimpleXMLRPCDispatcher
from patchwork.models import Patch, Project, Person, State, Check
from patchwork.views import patch_to_mbox
def _dumps(obj, *args, **kwargs):
kwargs['allow_none'] = self.allow_none
kwargs['encoding'] = self.encoding
- return xmlrpclib.dumps(obj, *args, **kwargs)
+ return xmlrpc_client.dumps(obj, *args, **kwargs)
self.dumps = _dumps
return authenticate(username=username, password=password)
def _dispatch(self, request, method, params):
- if method not in self.func_map.keys():
+ if method not in list(self.func_map.keys()):
raise Exception('method "%s" is not supported' % method)
auth_required, fn = self.func_map[method]
def _marshaled_dispatch(self, request):
try:
- params, method = xmlrpclib.loads(request.body)
+ params, method = six.moves.xmlrpc_client.loads(request.body)
response = self._dispatch(request, method, params)
# wrap response in a singleton tuple
response = (response,)
response = self.dumps(response, methodresponse=1)
- except xmlrpclib.Fault, fault:
+ except six.moves.xmlrpc_client.Fault as fault:
response = self.dumps(fault)
except:
# report exception back to server
response = self.dumps(
- xmlrpclib.Fault(1, '%s:%s' % (sys.exc_type, sys.exc_value)),
+ six.moves.xmlrpc_client.Fault(1, '%s:%s' % (sys.exc_info()[0], sys.exc_info()[1])),
)
return response
'id': obj.id,
'email': obj.email,
'name': name,
- 'user': unicode(obj.user).encode('utf-8'),
+ 'user': six.text_type(obj.user).encode('utf-8'),
}
"""
return {
'id': obj.id,
- 'date': unicode(obj.date).encode('utf-8'),
+ 'date': six.text_type(obj.date).encode('utf-8'),
'filename': obj.filename(),
'msgid': obj.msgid,
'name': obj.name,
- 'project': unicode(obj.project).encode('utf-8'),
+ 'project': six.text_type(obj.project).encode('utf-8'),
'project_id': obj.project_id,
- 'state': unicode(obj.state).encode('utf-8'),
+ 'state': six.text_type(obj.state).encode('utf-8'),
'state_id': obj.state_id,
'archived': obj.archived,
- 'submitter': unicode(obj.submitter).encode('utf-8'),
+ 'submitter': six.text_type(obj.submitter).encode('utf-8'),
'submitter_id': obj.submitter_id,
- 'delegate': unicode(obj.delegate).encode('utf-8'),
+ 'delegate': six.text_type(obj.delegate).encode('utf-8'),
'delegate_id': max(obj.delegate_id, 0),
'commit_ref': max(obj.commit_ref, ''),
}
object which is OK to send to the client."""
return {
'id': obj.id,
- 'date': unicode(obj.date).encode('utf-8'),
- 'patch': unicode(obj.patch).encode('utf-8'),
+ 'date': six.text_type(obj.date).encode('utf-8'),
+ 'patch': six.text_type(obj.patch).encode('utf-8'),
'patch_id': obj.patch_id,
- 'user': unicode(obj.user).encode('utf-8'),
+ 'user': six.text_type(obj.user).encode('utf-8'),
'user_id': obj.user_id,
'state': obj.get_state_display(),
'target_url': obj.target_url,
if max_count > 0:
return map(project_to_dict, projects)[:max_count]
else:
- return map(project_to_dict, projects)
+ return list(map(project_to_dict, projects))
except Project.DoesNotExist:
return []
if max_count > 0:
return map(person_to_dict, people)[:max_count]
else:
- return map(person_to_dict, people)
+ return list(map(person_to_dict, people))
except Person.DoesNotExist:
return []
patches = Patch.objects.filter(**dfilter)
if max_count > 0:
- return map(patch_to_dict, patches[:max_count])
+ return [patch_to_dict(patch) for patch in patches[:max_count]]
else:
- return map(patch_to_dict, patches)
+ return [patch_to_dict(patch) for patch in patches]
except Patch.DoesNotExist:
return []
if not patch.is_editable(user):
raise Exception('No permissions to edit this patch')
- for (k, v) in params.iteritems():
+ for (k, v) in params.items():
if k not in ok_params:
continue
if max_count > 0:
return map(state_to_dict, states)[:max_count]
else:
- return map(state_to_dict, states)
+ return list(map(state_to_dict, states))
except State.DoesNotExist:
return []
checks = Check.objects.filter(**dfilter)
if max_count > 0:
- return map(check_to_dict, checks[:max_count])
+ return list(map(check_to_dict, checks[:max_count]))
else:
- return map(check_to_dict, checks)
+ return list(map(check_to_dict, checks))
except Check.DoesNotExist:
return []