...using the 'autopep8' tool.
Signed-off-by: Stephen Finucane <stephen.finucane@intel.com>
DEFAULT_URL = "http://patchwork/xmlrpc/"
CONFIG_FILE = os.path.expanduser('~/.pwclientrc')
+
class Filter(object):
+
"""Filter for selecting patches."""
+
def __init__(self):
# These fields refer to specific objects, so they are special
# because we have to resolve them to IDs before passing the
if self.state != "":
id = state_id_by_name(rpc, self.state)
if id == 0:
- sys.stderr.write("Note: No State found matching %s*, " \
+ sys.stderr.write("Note: No State found matching %s*, "
"ignoring filter\n" % self.state)
else:
self.d['state_id'] = id
if self.project != None:
id = project_id_by_name(rpc, self.project)
if id == 0:
- sys.stderr.write("Note: No Project found matching %s, " \
+ sys.stderr.write("Note: No Project found matching %s, "
"ignoring filter\n" % self.project)
else:
self.d['project_id'] = id
"""Return human-readable description of the filter."""
return str(self.d)
+
class BasicHTTPAuthTransport(xmlrpclib.SafeTransport):
- def __init__(self, username = None, password = None, use_https = False):
+ def __init__(self, username=None, password=None, use_https=False):
self.username = username
self.password = password
self.use_https = use_https
fn = xmlrpclib.Transport.make_connection
return fn(self, host)
+
def project_id_by_name(rpc, linkname):
"""Given a project short name, look up the Project ID."""
if len(linkname) == 0:
return project['id']
return 0
+
def state_id_by_name(rpc, name):
"""Given a partial state name, look up the state ID."""
if len(name) == 0:
return state['id']
return 0
+
def person_ids_by_name(rpc, name):
"""Given a partial name or email address, return a list of the
person IDs that match."""
people = rpc.person_list(name, 0)
return [x['id'] for x in people]
+
def list_patches(patches, format_str=None):
"""Dump a list of patches to stdout."""
if format_str:
print("%-7s %-12s %s" % ("ID", "State", "Name"))
print("%-7s %-12s %s" % ("--", "-----", "----"))
for patch in patches:
- print("%-7d %-12s %s" % (patch['id'], patch['state'], patch['name']))
+ print("%-7d %-12s %s" %
+ (patch['id'], patch['state'], patch['name']))
+
def action_list(rpc, filter, submitter_str, delegate_str, format_str=None):
filter.resolve_ids(rpc)
if submitter_str != None:
ids = person_ids_by_name(rpc, submitter_str)
if len(ids) == 0:
- sys.stderr.write("Note: Nobody found matching *%s*\n" % \
+ sys.stderr.write("Note: Nobody found matching *%s*\n" %
submitter_str)
else:
for id in ids:
if delegate_str != None:
ids = person_ids_by_name(rpc, delegate_str)
if len(ids) == 0:
- sys.stderr.write("Note: Nobody found matching *%s*\n" % \
+ sys.stderr.write("Note: Nobody found matching *%s*\n" %
delegate_str)
else:
for id in ids:
patches = rpc.patch_list(filter.d)
list_patches(patches, format_str)
+
def action_projects(rpc):
projects = rpc.project_list("", 0)
print("%-5s %-24s %s" % ("ID", "Name", "Description"))
print("%-5s %-24s %s" % ("--", "----", "-----------"))
for project in projects:
- print("%-5d %-24s %s" % (project['id'], \
- project['linkname'], \
- project['name']))
+ print("%-5d %-24s %s" % (project['id'],
+ project['linkname'],
+ project['name']))
+
def action_checks(rpc):
checks = rpc.check_list("", 0)
for check in checks:
print("%d (for '%s')" % (check['id'], check['patch']))
+
def action_states(rpc):
states = rpc.state_list("", 0)
print("%-5s %s" % ("ID", "Name"))
for state in states:
print("%-5d %s" % (state['id'], state['name']))
+
def action_info(rpc, patch_id):
patch = rpc.patch_get(patch_id)
s = "Information for patch id %d" % (patch_id)
for key, value in sorted(patch.items()):
print("- %- 14s: %s" % (key, unicode(value).encode("utf-8")))
+
def action_get(rpc, patch_id):
patch = rpc.patch_get(patch_id)
s = rpc.patch_get_mbox(patch_id)
sys.stderr.write("Failed to write to %s\n" % fname)
sys.exit(1)
+
def action_apply(rpc, patch_id, apply_cmd=None):
patch = rpc.patch_get(patch_id)
if patch == {}:
- sys.stderr.write("Error getting information on patch ID %d\n" % \
+ sys.stderr.write("Error getting information on patch ID %d\n" %
patch_id)
sys.exit(1)
if apply_cmd is None:
- print('Applying patch #%d to current directory' % patch_id)
- apply_cmd = ['patch', '-p1']
+ print('Applying patch #%d to current directory' % patch_id)
+ apply_cmd = ['patch', '-p1']
else:
- print('Applying patch #%d using %s' %
- (patch_id, repr(' '.join(apply_cmd))))
+ print('Applying patch #%d using %s' %
+ (patch_id, repr(' '.join(apply_cmd))))
print('Description: %s' % patch['name'])
s = rpc.patch_get_mbox(patch_id)
if len(s) > 0:
- proc = subprocess.Popen(apply_cmd, stdin = subprocess.PIPE)
+ proc = subprocess.Popen(apply_cmd, stdin=subprocess.PIPE)
proc.communicate(unicode(s).encode('utf-8'))
return proc.returncode
else:
sys.stderr.write("Error: No patch content found\n")
sys.exit(1)
-def action_update_patch(rpc, patch_id, state = None, archived = None, commit = None):
+
+def action_update_patch(rpc, patch_id, state=None, archived=None, commit=None):
patch = rpc.patch_get(patch_id)
if patch == {}:
- sys.stderr.write("Error getting information on patch ID %d\n" % \
+ sys.stderr.write("Error getting information on patch ID %d\n" %
patch_id)
sys.exit(1)
if not success:
sys.stderr.write("Patch not updated\n")
+
def patch_id_from_hash(rpc, project, hash):
try:
patch = rpc.patch_get_by_project_hash(project, hash)
auth_actions = ['update']
+
def main():
hash_parser = argparse.ArgumentParser(add_help=False)
hash_parser.add_argument(
help='''Filter by patch state (e.g., 'New', 'Accepted', etc.)'''
)
filter_parser.add_argument(
- '-a', choices=['yes','no'],
+ '-a', choices=['yes', 'no'],
help='''Filter by patch archived state'''
)
filter_parser.add_argument(
filter_parser.add_argument(
'-f', metavar='FORMAT',
help='''Print output in the given format. You can use tags matching '''
- '''fields, e.g. %%{id}, %%{state}, or %%{msgid}.'''
+ '''fields, e.g. %%{id}, %%{state}, or %%{msgid}.'''
)
filter_parser.add_argument(
'patch_name', metavar='STR', nargs='?',
)
update_parser.set_defaults(subcmd='update')
list_parser = subparsers.add_parser("list",
- #aliases=['search'],
- parents=[filter_parser],
- help='''List patches, using the optional filters specified
+ # aliases=['search'],
+ parents=[filter_parser],
+ help='''List patches, using the optional filters specified
below and an optional substring to search for patches
by name'''
- )
+ )
list_parser.set_defaults(subcmd='list')
search_parser = subparsers.add_parser("search",
- parents=[filter_parser],
- help='''Alias for "list"'''
- )
+ parents=[filter_parser],
+ help='''Alias for "list"'''
+ )
# Poor man's argparse aliases:
# We register the "search" parser but effectively use "list" for the
# help-text.
if args.get('hash') and len(args.get('id')):
# mimic mutual exclusive group
locals()[action + '_parser'].error(
- "[-h HASH] and [ID [ID ...]] are mutually exlusive")
+ "[-h HASH] and [ID [ID ...]] are mutually exlusive")
# set defaults
filt = Filter()
if args.get('c'):
# update multiple IDs with a single commit-hash does not make sense
if action == 'update' and patch_ids and len(patch_ids) > 1:
- update_parser.error("Declining update with COMMIT-REF on multiple IDs")
+ update_parser.error(
+ "Declining update with COMMIT-REF on multiple IDs")
commit_str = args.get('c')
if state_str is None and archived_str is None and action == 'update':
- update_parser.error('Must specify one or more update options (-a or -s)')
+ update_parser.error(
+ 'Must specify one or more update options (-a or -s)')
if args.get('n') != None:
try:
if not config.has_section('options') and os.path.exists(CONFIG_FILE):
sys.stderr.write('~/.pwclientrc is in the old format. Migrating it...')
- old_project = config.get('base','project')
+ old_project = config.get('base', 'project')
new_config = ConfigParser.ConfigParser()
new_config.add_section('options')
- new_config.set('options','default',old_project)
+ new_config.set('options', 'default', old_project)
new_config.add_section(old_project)
- new_config.set(old_project,'url',config.get('base','url'))
+ new_config.set(old_project, 'url', config.get('base', 'url'))
if config.has_option('auth', 'username'):
- new_config.set(old_project,'username',config.get('auth','username'))
+ new_config.set(
+ old_project, 'username', config.get('auth', 'username'))
if config.has_option('auth', 'password'):
- new_config.set(old_project,'password',config.get('auth','password'))
+ new_config.set(
+ old_project, 'password', config.get('auth', 'password'))
old_config_file = CONFIG_FILE + '.orig'
- shutil.copy2(CONFIG_FILE,old_config_file)
+ shutil.copy2(CONFIG_FILE, old_config_file)
with open(CONFIG_FILE, 'wb') as fd:
new_config.write(fd)
sys.stderr.write(' Done.\n')
- sys.stderr.write('Your old ~/.pwclientrc was saved to %s\n' % old_config_file)
- sys.stderr.write('and was converted to the new format. You may want to\n')
+ sys.stderr.write(
+ 'Your old ~/.pwclientrc was saved to %s\n' % old_config_file)
+ sys.stderr.write(
+ 'and was converted to the new format. You may want to\n')
sys.stderr.write('inspect it before continuing.\n')
sys.exit(1)
try:
project_str = config.get('options', 'default')
except:
- action_parser.error("No default project configured in ~/.pwclientrc")
+ action_parser.error(
+ "No default project configured in ~/.pwclientrc")
if not config.has_section(project_str):
- sys.stderr.write('No section for project %s in ~/.pwclientrc\n' % project_str)
+ sys.stderr.write(
+ 'No section for project %s in ~/.pwclientrc\n' % project_str)
sys.exit(1)
if not config.has_option(project_str, 'url'):
- sys.stderr.write('No URL for project %s in ~/.pwclientrc\n' % project_str)
+ sys.stderr.write(
+ 'No URL for project %s in ~/.pwclientrc\n' % project_str)
sys.exit(1)
if not do_signoff and config.has_option('options', 'signoff'):
do_signoff = config.getboolean('options', 'signoff')
use_https = url.startswith('https')
- transport = BasicHTTPAuthTransport( \
- config.get(project_str, 'username'),
- config.get(project_str, 'password'),
- use_https)
+ transport = BasicHTTPAuthTransport(
+ config.get(project_str, 'username'),
+ config.get(project_str, 'password'),
+ use_https)
else:
sys.stderr.write(("The %s action requires authentication, "
- "but no username or password\nis configured\n") % action)
+ "but no username or password\nis configured\n") % action)
sys.exit(1)
if project_str:
filt.add("msgid", msgid_str)
try:
- rpc = xmlrpclib.Server(url, transport = transport)
+ rpc = xmlrpclib.Server(url, transport=transport)
except:
sys.stderr.write("Unable to connect to %s\n" % url)
sys.exit(1)
"""Error out if no patch IDs were specified"""
if patch_ids == None or len(patch_ids) < 1:
sys.stderr.write("Error: Missing Argument! " +
- "Either [-h HASH] or [ID [ID ...]] are required\n")
+ "Either [-h HASH] or [ID [ID ...]] are required\n")
if h:
h.print_help()
sys.exit(1)
elif action == 'update':
for patch_id in non_empty(h, patch_ids):
- action_update_patch(rpc, patch_id, state = state_str,
- archived = archived_str, commit = commit_str
- )
+ action_update_patch(rpc, patch_id, state=state_str,
+ archived=archived_str, commit=commit_str
+ )
else:
sys.stderr.write("Unknown action '%s'\n" % action)
import subprocess
import sys
+
def commits(options, revlist):
cmd = ['git', 'rev-list', revlist]
- proc = subprocess.Popen(cmd, stdout = subprocess.PIPE, cwd = options.repodir)
+ proc = subprocess.Popen(cmd, stdout=subprocess.PIPE, cwd=options.repodir)
revs = []
return revs
+
def commit(options, rev):
cmd = ['git', 'diff', '%(rev)s^..%(rev)s' % {'rev': rev}]
- proc = subprocess.Popen(cmd, stdout = subprocess.PIPE, cwd = options.repodir)
+ proc = subprocess.Popen(cmd, stdout=subprocess.PIPE, cwd=options.repodir)
buf = proc.communicate()[0]
def main(args):
- parser = OptionParser(usage = '%prog [options] revspec')
- parser.add_option("-p", "--project", dest = "project", action = 'store',
- help="use project PROJECT", metavar="PROJECT")
- parser.add_option("-d", "--dir", dest = "repodir", action = 'store',
- help="use git repo in DIR", metavar="DIR")
+ parser = OptionParser(usage='%prog [options] revspec')
+ parser.add_option("-p", "--project", dest="project", action='store',
+ help="use project PROJECT", metavar="PROJECT")
+ parser.add_option("-d", "--dir", dest="repodir", action='store',
+ help="use git repo in DIR", metavar="DIR")
(options, args) = parser.parse_args(args[1:])
if __name__ == '__main__':
sys.exit(main(sys.argv))
-
class Filter(object):
+
def __init__(self, filters):
self.filters = filters
self.applied = False
def form(self):
if self.forced:
return mark_safe('<input type="hidden" value="%s">%s' % (self.param,
- self.condition()))
+ self.condition()))
return self.condition()
return self._form()
class SubmitterFilter(Filter):
param = 'submitter'
+
def __init__(self, filters):
super(SubmitterFilter, self).__init__(filters)
self.name = 'Submitter'
return
if submitter_id:
- self.person = Person.objects.get(id = int(str))
+ self.person = Person.objects.get(id=int(str))
self.applied = True
return
-
- people = Person.objects.filter(name__icontains = str)
+ people = Person.objects.filter(name__icontains=str)
if not people:
return
user = self.person.user
if user:
return {'submitter__in':
- Person.objects.filter(user = user).values('pk').query}
+ Person.objects.filter(user=user).values('pk').query}
return {'submitter': self.person}
if self.person_match:
return ''
def _form(self):
- return mark_safe(('<input type="text" name="submitter" ' + \
+ return mark_safe(('<input type="text" name="submitter" ' +
'id="submitter_input" class="form-control">'))
def key(self):
return self.person.id
return self.person_match
+
class StateFilter(Filter):
param = 'state'
any_key = '*'
if self.state is not None:
return {'state': self.state}
else:
- return {'state__in': \
- State.objects.filter(action_required = True) \
- .values('pk').query}
+ return {'state__in':
+ State.objects.filter(action_required=True)
+ .values('pk').query}
def condition(self):
if self.state:
str += '<option value="%d" %s>%s</option>' % \
(state.id, selected, state.name)
str += '</select>'
- return mark_safe(str);
+ return mark_safe(str)
def form_function(self):
return 'function(form) { return form.x.value }'
qs += '&'
return qs + '%s=%s' % (self.param, self.any_key)
+
class SearchFilter(Filter):
param = 'q'
+
def __init__(self, filters):
super(SearchFilter, self).__init__(filters)
self.name = 'Search'
value = ''
if self.search:
value = escape(self.search)
- return mark_safe('<input name="%s" class="form-control" value="%s">' %\
- (self.param, value))
+ return mark_safe('<input name="%s" class="form-control" value="%s">' %
+ (self.param, value))
def form_function(self):
return mark_safe('function(form) { return form.x.value }')
+
class ArchiveFilter(Filter):
param = 'archive'
+
def __init__(self, filters):
super(ArchiveFilter, self).__init__(filters)
self.name = 'Archived'
selected = ''
if self.archive_state == b:
selected = 'checked="true"'
- s += ('<label class="checkbox-inline">' \
- ' <input type="radio" name="%(param)s" ' + \
- '%(selected)s value="%(value)s">%(label)s' + \
- '</label>') % \
- {'label': label,
- 'param': self.param,
- 'selected': selected,
- 'value': self.param_map[b]
- }
+ s += ('<label class="checkbox-inline">'
+ ' <input type="radio" name="%(param)s" ' +
+ '%(selected)s value="%(value)s">%(label)s' +
+ '</label>') % \
+ {'label': label,
+ 'param': self.param,
+ 'selected': selected,
+ 'value': self.param_map[b]
+ }
return mark_safe(s)
def url_without_me(self):
applied = False
try:
- self.delegate = User.objects.get(id = str)
+ self.delegate = User.objects.get(id=str)
self.applied = True
except:
pass
return self.no_delegate_str
def _form(self):
- delegates = User.objects.filter(profile__maintainer_projects =
- self.filters.project)
+ delegates = User.objects.filter(
+ profile__maintainer_projects=self.filters.project)
str = '<select name="delegate" class="form-control">'
selected = 'selected'
str += '<option %s value="%s">%s</option>' % \
- (selected, self.no_delegate_key, self.no_delegate_str)
+ (selected, self.no_delegate_key, self.no_delegate_str)
for d in delegates:
selected = ''
selected = ' selected'
str += '<option %s value="%s">%s</option>' % (selected,
- d.id, d.profile.name())
+ d.id, d.profile.name())
str += '</select>'
return mark_safe(str)
self.applied = False
self.forced = True
-filterclasses = [SubmitterFilter, \
+filterclasses = [SubmitterFilter,
StateFilter,
SearchFilter,
ArchiveFilter,
DelegateFilter]
+
class Filters:
def __init__(self, request):
return queryset.filter(**kwargs)
def params(self):
- return [ (f.param, f.key()) for f in self._filters \
- if f.key() is not None ]
+ return [(f.param, f.key()) for f in self._filters
+ if f.key() is not None]
- def querystring(self, remove = None):
+ def querystring(self, remove=None):
params = dict(self.params())
for (k, v) in self.dict.items():
return quote(s.encode('utf-8'))
return '?' + '&'.join(['%s=%s' % (sanitise(k), sanitise(v))
- for (k, v) in list(params.items())])
+ for (k, v) in list(params.items())])
def querystring_without_filter(self, filter):
return self.querystring(filter)
class RegistrationForm(forms.Form):
- first_name = forms.CharField(max_length = 30, required = False)
- last_name = forms.CharField(max_length = 30, required = False)
- username = forms.RegexField(regex = r'^\w+$', max_length=30,
+ first_name = forms.CharField(max_length=30, required=False)
+ last_name = forms.CharField(max_length=30, required=False)
+ username = forms.RegexField(regex=r'^\w+$', max_length=30,
label=u'Username')
email = forms.EmailField(max_length=100, label=u'Email address')
password = forms.CharField(widget=forms.PasswordInput(),
- label='Password')
+ label='Password')
def clean_username(self):
value = self.cleaned_data['username']
try:
- user = User.objects.get(username__iexact = value)
+ user = User.objects.get(username__iexact=value)
except User.DoesNotExist:
return self.cleaned_data['username']
- raise forms.ValidationError('This username is already taken. ' + \
+ raise forms.ValidationError('This username is already taken. ' +
'Please choose another.')
def clean_email(self):
value = self.cleaned_data['email']
try:
- user = User.objects.get(email__iexact = value)
+ user = User.objects.get(email__iexact=value)
except User.DoesNotExist:
return self.cleaned_data['email']
- raise forms.ValidationError('This email address is already in use ' + \
+ raise forms.ValidationError('This email address is already in use ' +
'for the account "%s".\n' % user.username)
def clean(self):
return self.cleaned_data
+
class LoginForm(forms.Form):
- username = forms.CharField(max_length = 30)
- password = forms.CharField(widget = forms.PasswordInput)
+ username = forms.CharField(max_length=30)
+ password = forms.CharField(widget=forms.PasswordInput)
+
class BundleForm(forms.ModelForm):
- name = forms.RegexField(regex = r'^[^/]+$', max_length=50, label=u'Name',
- error_messages = {'invalid': 'Bundle names can\'t contain slashes'})
+ name = forms.RegexField(regex=r'^[^/]+$', max_length=50, label=u'Name',
+ error_messages={'invalid': 'Bundle names can\'t contain slashes'})
class Meta:
model = Bundle
fields = ['name', 'public']
+
class CreateBundleForm(BundleForm):
+
def __init__(self, *args, **kwargs):
super(CreateBundleForm, self).__init__(*args, **kwargs)
def clean_name(self):
name = self.cleaned_data['name']
- count = Bundle.objects.filter(owner = self.instance.owner, \
- name = name).count()
+ count = Bundle.objects.filter(owner=self.instance.owner,
+ name=name).count()
if count > 0:
- raise forms.ValidationError('A bundle called %s already exists' \
- % name)
+ raise forms.ValidationError('A bundle called %s already exists'
+ % name)
return name
+
class DeleteBundleForm(forms.Form):
name = 'deletebundleform'
- form_name = forms.CharField(initial = name, widget = forms.HiddenInput)
- bundle_id = forms.IntegerField(widget = forms.HiddenInput)
+ form_name = forms.CharField(initial=name, widget=forms.HiddenInput)
+ bundle_id = forms.IntegerField(widget=forms.HiddenInput)
+
class DelegateField(forms.ModelChoiceField):
+
def __init__(self, project, *args, **kwargs):
- queryset = User.objects.filter(profile__in = \
- UserProfile.objects \
- .filter(maintainer_projects = project) \
- .values('pk').query)
+ queryset = User.objects.filter(profile__in=UserProfile.objects
+ .filter(maintainer_projects=project)
+ .values('pk').query)
super(DelegateField, self).__init__(queryset, *args, **kwargs)
class PatchForm(forms.ModelForm):
- def __init__(self, instance = None, project = None, *args, **kwargs):
+
+ def __init__(self, instance=None, project=None, *args, **kwargs):
if (not project) and instance:
project = instance.project
if not project:
raise Exception("meep")
- super(PatchForm, self).__init__(instance = instance, *args, **kwargs)
- self.fields['delegate'] = DelegateField(project, required = False)
+ super(PatchForm, self).__init__(instance=instance, *args, **kwargs)
+ self.fields['delegate'] = DelegateField(project, required=False)
class Meta:
model = Patch
fields = ['state', 'archived', 'delegate']
+
class UserProfileForm(forms.ModelForm):
+
class Meta:
model = UserProfile
fields = ['primary_project', 'patches_per_page']
+
class OptionalDelegateField(DelegateField):
no_change_choice = ('*', 'no change')
to_field_name = None
- def __init__(self, no_change_choice = None, *args, **kwargs):
+ def __init__(self, no_change_choice=None, *args, **kwargs):
self.filter = None
if (no_change_choice):
self.no_change_choice = no_change_choice
super(OptionalDelegateField, self). \
- __init__(initial = self.no_change_choice[0], *args, **kwargs)
+ __init__(initial=self.no_change_choice[0], *args, **kwargs)
def _get_choices(self):
choices = list(
- super(OptionalDelegateField, self)._get_choices())
+ super(OptionalDelegateField, self)._get_choices())
choices.append(self.no_change_choice)
return choices
return value
return super(OptionalDelegateField, self).clean(value)
+
class OptionalModelChoiceField(forms.ModelChoiceField):
no_change_choice = ('*', 'no change')
to_field_name = None
- def __init__(self, no_change_choice = None, *args, **kwargs):
+ def __init__(self, no_change_choice=None, *args, **kwargs):
self.filter = None
if (no_change_choice):
self.no_change_choice = no_change_choice
super(OptionalModelChoiceField, self). \
- __init__(initial = self.no_change_choice[0], *args, **kwargs)
+ __init__(initial=self.no_change_choice[0], *args, **kwargs)
def _get_choices(self):
choices = list(
- super(OptionalModelChoiceField, self)._get_choices())
+ super(OptionalModelChoiceField, self)._get_choices())
choices.append(self.no_change_choice)
return choices
return value
return super(OptionalModelChoiceField, self).clean(value)
+
class MultipleBooleanField(forms.ChoiceField):
no_change_choice = ('*', 'no change')
+
def __init__(self, *args, **kwargs):
super(MultipleBooleanField, self).__init__(*args, **kwargs)
self.choices = [self.no_change_choice] + \
- [(True, 'Archived'), (False, 'Unarchived')]
+ [(True, 'Archived'), (False, 'Unarchived')]
def is_no_change(self, value):
return value == self.no_change_choice[0]
else:
raise ValueError('Unknown value: %s' % value)
+
class MultiplePatchForm(forms.Form):
action = 'update'
- state = OptionalModelChoiceField(queryset = State.objects.all())
+ state = OptionalModelChoiceField(queryset=State.objects.all())
archived = MultipleBooleanField()
def __init__(self, project, *args, **kwargs):
super(MultiplePatchForm, self).__init__(*args, **kwargs)
- self.fields['delegate'] = OptionalDelegateField(project = project,
- required = False)
+ self.fields['delegate'] = OptionalDelegateField(project=project,
+ required=False)
- def save(self, instance, commit = True):
+ def save(self, instance, commit=True):
opts = instance.__class__._meta
if self.errors:
raise ValueError("The %s could not be changed because the data "
- "didn't validate." % opts.object_name)
+ "didn't validate." % opts.object_name)
data = self.cleaned_data
# Update the instance
for f in opts.fields:
instance.save()
return instance
+
class EmailForm(forms.Form):
- email = forms.EmailField(max_length = 200)
+ email = forms.EmailField(max_length=200)
UserPersonLinkForm = EmailForm
OptinoutRequestForm = EmailForm
class Check(models.Model):
+
"""Check for a patch.
Checks store the results of any tests executed (or executing) for a
# parts from:
# http://blog.localkinegrinds.com/2007/09/06/digg-style-pagination-in-django/
+
class Paginator(paginator.Paginator):
+
def __init__(self, request, objects):
patches_per_page = settings.DEFAULT_PATCHES_PER_PAGE
pages = self.num_pages
if pages <= LEADING_PAGE_RANGE_DISPLAYED:
- self.adjacent_set = [n for n in range(1, pages + 1) \
- if n > 0 and n <= pages]
+ self.adjacent_set = [n for n in range(1, pages + 1)
+ if n > 0 and n <= pages]
elif page_no <= LEADING_PAGE_RANGE:
- self.adjacent_set = [n for n in \
- range(1, LEADING_PAGE_RANGE_DISPLAYED + 1) \
- if n > 0 and n <= pages]
- self.leading_set = [n + pages for n in \
- range(0, -NUM_PAGES_OUTSIDE_RANGE, -1)]
+ self.adjacent_set = [n for n in
+ range(1, LEADING_PAGE_RANGE_DISPLAYED + 1)
+ if n > 0 and n <= pages]
+ self.leading_set = [n + pages for n in
+ range(0, -NUM_PAGES_OUTSIDE_RANGE, -1)]
elif page_no > pages - TRAILING_PAGE_RANGE:
- self.adjacent_set = [n for n in \
- range(pages - TRAILING_PAGE_RANGE_DISPLAYED + 1, \
- pages + 1) if n > 0 and n <= pages]
- self.trailing_set = [n + 1 for n in range(0, \
- NUM_PAGES_OUTSIDE_RANGE)]
+ self.adjacent_set = [n for n in
+ range(pages - TRAILING_PAGE_RANGE_DISPLAYED + 1,
+ pages + 1) if n > 0 and n <= pages]
+ self.trailing_set = [n + 1 for n in range(0,
+ NUM_PAGES_OUTSIDE_RANGE)]
else:
- self.adjacent_set = [n for n in range(page_no - ADJACENT_PAGES, \
- page_no + ADJACENT_PAGES + 1) if n > 0 and n <= pages]
- self.leading_set = [n + pages for n in \
- range(0, -NUM_PAGES_OUTSIDE_RANGE, -1)]
- self.trailing_set = [n + 1 for n in \
- range(0, NUM_PAGES_OUTSIDE_RANGE)]
-
+ self.adjacent_set = [n for n in range(page_no - ADJACENT_PAGES,
+ page_no + ADJACENT_PAGES + 1) if n > 0 and n <= pages]
+ self.leading_set = [n + pages for n in
+ range(0, -NUM_PAGES_OUTSIDE_RANGE, -1)]
+ self.trailing_set = [n + 1 for n in
+ range(0, NUM_PAGES_OUTSIDE_RANGE)]
self.leading_set.reverse()
self.long_page = \
- len(self.current_page.object_list) >= LONG_PAGE_THRESHOLD
+ len(self.current_page.object_list) >= LONG_PAGE_THRESHOLD
_hunk_re = re.compile('^\@\@ -\d+(?:,(\d+))? \+\d+(?:,(\d+))? \@\@')
_filename_re = re.compile('^(---|\+\+\+) (\S+)')
+
def parse_patch(text):
patchbuf = ''
commentbuf = ''
lc = (0, 0)
hunk = 0
-
for line in text.split('\n'):
line += '\n'
return (patchbuf, commentbuf)
+
def hash_patch(str):
# normalise spaces
str = str.replace('\r', '')
return hash
+
def extract_tags(content, tags):
counts = Counter()
return counts
+
def main(args):
from optparse import OptionParser
parser = OptionParser()
- parser.add_option('-p', '--patch', action = 'store_true',
- dest = 'print_patch', help = 'print parsed patch')
- parser.add_option('-c', '--comment', action = 'store_true',
- dest = 'print_comment', help = 'print parsed comment')
- parser.add_option('-#', '--hash', action = 'store_true',
- dest = 'print_hash', help = 'print patch hash')
+ parser.add_option('-p', '--patch', action='store_true',
+ dest='print_patch', help='print parsed patch')
+ parser.add_option('-c', '--comment', action='store_true',
+ dest='print_comment', help='print parsed comment')
+ parser.add_option('-#', '--hash', action='store_true',
+ dest='print_hash', help='print patch hash')
(options, args) = parser.parse_args()
# Replace this with your own details
ADMINS = (
-# ('Jeremy Kerr', 'jk@ozlabs.org'),
+ # ('Jeremy Kerr', 'jk@ozlabs.org'),
)
DEFAULT_FROM_EMAIL = 'Patchwork <patchwork@patchwork.example.com>'
#
STATIC_ROOT = '/srv/patchwork/htdocs/static'
-
register = template.Library()
+
@register.filter(name='patch_tags')
def patch_tags(patch):
counts = []
register = template.Library()
+
@register.filter
def personify(person, project):
else:
linktext = escape(person.email)
- url = reverse('patchwork.views.patch.list', kwargs = {'project_id' : project.linkname})
+ url = reverse(
+ 'patchwork.views.patch.list', kwargs={'project_id': project.linkname})
str = '<a href="%s?%s=%s">%s</a>' % \
- (url, SubmitterFilter.param, escape(person.id), linktext)
+ (url, SubmitterFilter.param, escape(person.id), linktext)
return mark_safe(str)
-
-
register = template.Library()
+
def _compile(t):
(r, str) = t
return (re.compile(r, re.M | re.I), str)
_patch_span_res = list(map(_compile, [
- ('^(Index:?|diff|\-\-\-|\+\+\+|\*\*\*) .*$', 'p_header'),
- ('^\+.*$', 'p_add'),
- ('^-.*$', 'p_del'),
- ('^!.*$', 'p_mod'),
- ]))
+ ('^(Index:?|diff|\-\-\-|\+\+\+|\*\*\*) .*$', 'p_header'),
+ ('^\+.*$', 'p_add'),
+ ('^-.*$', 'p_del'),
+ ('^!.*$', 'p_mod'),
+]))
_patch_chunk_re = \
- re.compile('^(@@ \-\d+(?:,\d+)? \+\d+(?:,\d+)? @@)(.*)$', re.M | re.I)
+ re.compile('^(@@ \-\d+(?:,\d+)? \+\d+(?:,\d+)? @@)(.*)$', re.M | re.I)
_comment_span_res = list(map(_compile, [
- ('^\s*Signed-off-by: .*$', 'signed-off-by'),
- ('^\s*Acked-by: .*$', 'acked-by'),
- ('^\s*Nacked-by: .*$', 'nacked-by'),
- ('^\s*Tested-by: .*$', 'tested-by'),
- ('^\s*Reviewed-by: .*$', 'reviewed-by'),
- ('^\s*From: .*$', 'from'),
- ('^\s*>.*$', 'quote'),
- ]))
+ ('^\s*Signed-off-by: .*$', 'signed-off-by'),
+ ('^\s*Acked-by: .*$', 'acked-by'),
+ ('^\s*Nacked-by: .*$', 'nacked-by'),
+ ('^\s*Tested-by: .*$', 'tested-by'),
+ ('^\s*Reviewed-by: .*$', 'reviewed-by'),
+ ('^\s*From: .*$', 'from'),
+ ('^\s*>.*$', 'quote'),
+]))
_span = '<span class="%s">%s</span>'
+
@register.filter
def patchsyntax(patch):
content = escape(patch.content)
- for (r,cls) in _patch_span_res:
+ for (r, cls) in _patch_span_res:
content = r.sub(lambda x: _span % (cls, x.group(0)), content)
- content = _patch_chunk_re.sub( \
- lambda x: \
- _span % ('p_chunk', x.group(1)) + ' ' + \
- _span % ('p_context', x.group(2)), \
- content)
+ content = _patch_chunk_re.sub(
+ lambda x:
+ _span % ('p_chunk', x.group(1)) + ' ' +
+ _span % ('p_context', x.group(2)),
+ content)
return mark_safe(content)
+
@register.filter
def commentsyntax(comment):
content = escape(comment.content)
- for (r,cls) in _comment_span_res:
+ for (r, cls) in _comment_span_res:
content = r.sub(lambda x: _span % (cls, x.group(0)), content)
return mark_safe(content)
else:
from django.contrib.staticfiles.testing import StaticLiveServerTestCase
from selenium.common.exceptions import (
- NoSuchElementException, StaleElementReferenceException,
- TimeoutException)
+ NoSuchElementException, StaleElementReferenceException,
+ TimeoutException)
from selenium import webdriver
from selenium.webdriver.support.ui import WebDriverWait
class Wait(WebDriverWait):
+
"""Subclass of WebDriverWait.
Includes a predetermined timeout and poll frequency. Also deals with a
from patchwork.tests.utils import defaults, create_user, find_in_context
from django.utils.six.moves import range, zip
+
def bundle_url(bundle):
return '/bundle/%s/%s/' % (bundle.owner.username, bundle.name)
+
class BundleListTest(TestCase):
+
def setUp(self):
self.user = create_user()
- self.client.login(username = self.user.username,
- password = self.user.username)
+ self.client.login(username=self.user.username,
+ password=self.user.username)
def testNoBundles(self):
response = self.client.get('/user/bundles/')
self.assertEqual(response.status_code, 200)
self.assertEqual(
- len(find_in_context(response.context, 'bundles')), 0)
+ len(find_in_context(response.context, 'bundles')), 0)
def testSingleBundle(self):
defaults.project.save()
- bundle = Bundle(owner = self.user, project = defaults.project)
+ bundle = Bundle(owner=self.user, project=defaults.project)
bundle.save()
response = self.client.get('/user/bundles/')
self.assertEqual(response.status_code, 200)
self.assertEqual(
- len(find_in_context(response.context, 'bundles')), 1)
+ len(find_in_context(response.context, 'bundles')), 1)
def tearDown(self):
self.user.delete()
+
class BundleTestBase(TestCase):
fixtures = ['default_states']
+
def setUp(self, patch_count=3):
- patch_names = ['testpatch%d' % (i) for i in range(1, patch_count+1)]
+ patch_names = ['testpatch%d' % (i) for i in range(1, patch_count + 1)]
self.user = create_user()
- self.client.login(username = self.user.username,
- password = self.user.username)
+ self.client.login(username=self.user.username,
+ password=self.user.username)
defaults.project.save()
- self.bundle = Bundle(owner = self.user, project = defaults.project,
- name = 'testbundle')
+ self.bundle = Bundle(owner=self.user, project=defaults.project,
+ name='testbundle')
self.bundle.save()
self.patches = []
for patch_name in patch_names:
- patch = Patch(project = defaults.project,
- msgid = patch_name, name = patch_name,
- submitter = Person.objects.get(user = self.user),
- content = '')
+ patch = Patch(project=defaults.project,
+ msgid=patch_name, name=patch_name,
+ submitter=Person.objects.get(user=self.user),
+ content='')
patch.save()
self.patches.append(patch)
self.bundle.delete()
self.user.delete()
+
class BundleViewTest(BundleTestBase):
def testEmptyBundle(self):
# reorder and recheck
i = 0
for patch in self.patches.__reversed__():
- bundlepatch = BundlePatch.objects.get(bundle = self.bundle,
- patch = patch)
+ bundlepatch = BundlePatch.objects.get(bundle=self.bundle,
+ patch=patch)
bundlepatch.order = i
bundlepatch.save()
i += 1
self.assertTrue(next_pos < pos)
pos = next_pos
+
class BundleUpdateTest(BundleTestBase):
def setUp(self):
response = self.client.post(bundle_url(self.bundle), data)
self.assertEqual(response.status_code, 200)
- bundle = Bundle.objects.get(pk = self.bundle.pk)
+ bundle = Bundle.objects.get(pk=self.bundle.pk)
self.assertEqual(bundle.name, self.bundle.name)
self.assertEqual(bundle.public, self.bundle.public)
'public': self.publicString(self.bundle.public)
}
response = self.client.post(bundle_url(self.bundle), data)
- bundle = Bundle.objects.get(pk = self.bundle.pk)
+ bundle = Bundle.objects.get(pk=self.bundle.pk)
self.assertRedirects(response, bundle_url(bundle))
self.assertEqual(bundle.name, newname)
self.assertEqual(bundle.public, self.bundle.public)
}
response = self.client.post(bundle_url(self.bundle), data)
self.assertEqual(response.status_code, 200)
- bundle = Bundle.objects.get(pk = self.bundle.pk)
+ bundle = Bundle.objects.get(pk=self.bundle.pk)
self.assertEqual(bundle.name, self.bundle.name)
self.assertEqual(bundle.public, not self.bundle.public)
# check other forms for errors
self.checkPatchformErrors(response)
+
class BundleMaintainerUpdateTest(BundleUpdateTest):
def setUp(self):
profile.maintainer_projects.add(defaults.project)
profile.save()
+
class BundlePublicViewTest(BundleTestBase):
def setUp(self):
response = self.client.get(self.url)
self.assertEqual(response.status_code, 404)
+
class BundlePublicViewMboxTest(BundlePublicViewTest):
+
def setUp(self):
super(BundlePublicViewMboxTest, self).setUp()
self.url = bundle_url(self.bundle) + "mbox/"
+
class BundlePublicModifyTest(BundleTestBase):
+
"""Ensure that non-owners can't modify bundles"""
def setUp(self):
def testBundleFormPresence(self):
"""Check for presence of the modify form on the bundle"""
- self.client.login(username = self.other_user.username,
- password = self.other_user.username)
+ self.client.login(username=self.other_user.username,
+ password=self.other_user.username)
response = self.client.get(bundle_url(self.bundle))
self.assertNotContains(response, 'name="form" value="bundle"')
self.assertNotContains(response, 'Change order')
self.bundle.save()
# first, check that we can modify with the owner
- self.client.login(username = self.user.username,
- password = self.user.username)
+ self.client.login(username=self.user.username,
+ password=self.user.username)
response = self.client.post(bundle_url(self.bundle), data)
- self.bundle = Bundle.objects.get(pk = self.bundle.pk)
+ self.bundle = Bundle.objects.get(pk=self.bundle.pk)
self.assertEqual(self.bundle.name, newname)
# reset bundle name
self.bundle.save()
# log in with a different user, and check that we can no longer modify
- self.client.login(username = self.other_user.username,
- password = self.other_user.username)
+ self.client.login(username=self.other_user.username,
+ password=self.other_user.username)
response = self.client.post(bundle_url(self.bundle), data)
- self.bundle = Bundle.objects.get(pk = self.bundle.pk)
+ self.bundle = Bundle.objects.get(pk=self.bundle.pk)
self.assertNotEqual(self.bundle.name, newname)
+
class BundleCreateFromListTest(BundleTestBase):
+
def testCreateEmptyBundle(self):
newbundlename = 'testbundle-new'
params = {'form': 'patchlistform',
'project': defaults.project.id}
response = self.client.post(
- '/project/%s/list/' % defaults.project.linkname,
- params)
+ '/project/%s/list/' % defaults.project.linkname,
+ params)
self.assertContains(response, 'Bundle %s created' % newbundlename)
'patch_id:%d' % patch.id: 'checked'}
response = self.client.post(
- '/project/%s/list/' % defaults.project.linkname,
- params)
+ '/project/%s/list/' % defaults.project.linkname,
+ params)
self.assertContains(response, 'Bundle %s created' % newbundlename)
self.assertContains(response, 'added to bundle %s' % newbundlename,
- count = 1)
+ count=1)
- bundle = Bundle.objects.get(name = newbundlename)
+ bundle = Bundle.objects.get(name=newbundlename)
self.assertEqual(bundle.patches.count(), 1)
self.assertEqual(bundle.patches.all()[0], patch)
'patch_id:%d' % patch.id: 'checked'}
response = self.client.post(
- '/project/%s/list/' % defaults.project.linkname,
- params)
+ '/project/%s/list/' % defaults.project.linkname,
+ params)
self.assertContains(response, 'No bundle name was specified',
- status_code = 200)
+ status_code=200)
# test that no new bundles are present
self.assertEqual(n_bundles, Bundle.objects.count())
'patch_id:%d' % patch.id: 'checked'}
response = self.client.post(
- '/project/%s/list/' % defaults.project.linkname,
- params)
+ '/project/%s/list/' % defaults.project.linkname,
+ params)
n_bundles = Bundle.objects.count()
self.assertContains(response, 'Bundle %s created' % newbundlename)
self.assertContains(response, 'added to bundle %s' % newbundlename,
- count = 1)
+ count=1)
- bundle = Bundle.objects.get(name = newbundlename)
+ bundle = Bundle.objects.get(name=newbundlename)
self.assertEqual(bundle.patches.count(), 1)
self.assertEqual(bundle.patches.all()[0], patch)
response = self.client.post(
- '/project/%s/list/' % defaults.project.linkname,
- params)
+ '/project/%s/list/' % defaults.project.linkname,
+ params)
self.assertNotContains(response, 'Bundle %s created' % newbundlename)
self.assertContains(response, 'You already have a bundle called')
self.assertEqual(Bundle.objects.count(), n_bundles)
self.assertEqual(bundle.patches.count(), 1)
+
class BundleCreateFromPatchTest(BundleTestBase):
+
def testCreateNonEmptyBundle(self):
newbundlename = 'testbundle-new'
patch = self.patches[0]
response = self.client.post('/patch/%d/' % patch.id, params)
self.assertContains(response,
- 'Bundle %s created' % newbundlename)
+ 'Bundle %s created' % newbundlename)
- bundle = Bundle.objects.get(name = newbundlename)
+ bundle = Bundle.objects.get(name=newbundlename)
self.assertEqual(bundle.patches.count(), 1)
self.assertEqual(bundle.patches.all()[0], patch)
response = self.client.post('/patch/%d/' % patch.id, params)
self.assertContains(response,
- 'A bundle called %s already exists' % newbundlename)
+ 'A bundle called %s already exists' % newbundlename)
count = Bundle.objects.count()
self.assertEqual(Bundle.objects.count(), 1)
+
class BundleAddFromListTest(BundleTestBase):
+
def testAddToEmptyBundle(self):
patch = self.patches[0]
params = {'form': 'patchlistform',
'patch_id:%d' % patch.id: 'checked'}
response = self.client.post(
- '/project/%s/list/' % defaults.project.linkname,
- params)
+ '/project/%s/list/' % defaults.project.linkname,
+ params)
self.assertContains(response, 'added to bundle %s' % self.bundle.name,
- count = 1)
+ count=1)
self.assertEqual(self.bundle.patches.count(), 1)
self.assertEqual(self.bundle.patches.all()[0], patch)
'patch_id:%d' % patch.id: 'checked'}
response = self.client.post(
- '/project/%s/list/' % defaults.project.linkname,
- params)
+ '/project/%s/list/' % defaults.project.linkname,
+ params)
self.assertContains(response, 'added to bundle %s' % self.bundle.name,
- count = 1)
+ count=1)
self.assertEqual(self.bundle.patches.count(), 2)
self.assertIn(self.patches[0], self.bundle.patches.all())
self.assertIn(self.patches[1], self.bundle.patches.all())
# check order
- bps = [ BundlePatch.objects.get(bundle = self.bundle,
- patch = self.patches[i]) \
- for i in [0, 1] ]
+ bps = [BundlePatch.objects.get(bundle=self.bundle,
+ patch=self.patches[i])
+ for i in [0, 1]]
self.assertTrue(bps[0].order < bps[1].order)
def testAddDuplicate(self):
'patch_id:%d' % patch.id: 'checked'}
response = self.client.post(
- '/project/%s/list/' % defaults.project.linkname,
- params)
+ '/project/%s/list/' % defaults.project.linkname,
+ params)
- self.assertContains(response, 'Patch '%s' already in bundle' \
- % patch.name, count = 1, status_code = 200)
+ self.assertContains(response, 'Patch '%s' already in bundle'
+ % patch.name, count=1, status_code=200)
self.assertEqual(count, self.bundle.patches.count())
'patch_id:%d' % self.patches[1].id: 'checked'}
response = self.client.post(
- '/project/%s/list/' % defaults.project.linkname,
- params)
-
- self.assertContains(response, 'Patch '%s' already in bundle' \
- % patch.name, count = 1, status_code = 200)
- self.assertContains(response, 'Patch '%s' added to bundle' \
- % self.patches[1].name, count = 1,
- status_code = 200)
+ '/project/%s/list/' % defaults.project.linkname,
+ params)
+
+ self.assertContains(response, 'Patch '%s' already in bundle'
+ % patch.name, count=1, status_code=200)
+ self.assertContains(response, 'Patch '%s' added to bundle'
+ % self.patches[1].name, count=1,
+ status_code=200)
self.assertEqual(count + 1, self.bundle.patches.count())
+
class BundleAddFromPatchTest(BundleTestBase):
+
def testAddToEmptyBundle(self):
patch = self.patches[0]
params = {'action': 'addtobundle',
response = self.client.post('/patch/%d/' % patch.id, params)
self.assertContains(response,
- 'added to bundle "%s"' % self.bundle.name,
- count = 1)
+ 'added to bundle "%s"' % self.bundle.name,
+ count=1)
self.assertEqual(self.bundle.patches.count(), 1)
self.assertEqual(self.bundle.patches.all()[0], patch)
response = self.client.post('/patch/%d/' % patch.id, params)
self.assertContains(response,
- 'added to bundle "%s"' % self.bundle.name,
- count = 1)
+ 'added to bundle "%s"' % self.bundle.name,
+ count=1)
self.assertEqual(self.bundle.patches.count(), 2)
self.assertIn(self.patches[0], self.bundle.patches.all())
self.assertIn(self.patches[1], self.bundle.patches.all())
# check order
- bps = [ BundlePatch.objects.get(bundle = self.bundle,
- patch = self.patches[i]) \
- for i in [0, 1] ]
+ bps = [BundlePatch.objects.get(bundle=self.bundle,
+ patch=self.patches[i])
+ for i in [0, 1]]
self.assertTrue(bps[0].order < bps[1].order)
+
class BundleInitialOrderTest(BundleTestBase):
+
"""When creating bundles from a patch list, ensure that the patches in the
bundle are ordered by date"""
# put patches in an arbitrary order
idxs = [2, 4, 3, 1, 0]
- self.patches = [ self.patches[i] for i in idxs ]
+ self.patches = [self.patches[i] for i in idxs]
# set dates to be sequential
last_patch = self.patches[0]
'bundle_name': newbundlename,
'action': 'Create',
'project': defaults.project.id,
- }
+ }
data = urlencode(params) + \
- ''.join([ '&patch_id:%d=checked' % i for i in ids ])
+ ''.join(['&patch_id:%d=checked' % i for i in ids])
response = self.client.post(
- '/project/%s/list/' % defaults.project.linkname,
- data = data,
- content_type = 'application/x-www-form-urlencoded',
- )
+ '/project/%s/list/' % defaults.project.linkname,
+ data=data,
+ content_type='application/x-www-form-urlencoded',
+ )
self.assertContains(response, 'Bundle %s created' % newbundlename)
self.assertContains(response, 'added to bundle %s' % newbundlename,
- count = 5)
+ count=5)
- bundle = Bundle.objects.get(name = newbundlename)
+ bundle = Bundle.objects.get(name=newbundlename)
# BundlePatches should be sorted by .order by default
- bps = BundlePatch.objects.filter(bundle = bundle)
+ bps = BundlePatch.objects.filter(bundle=bundle)
for (bp, p) in zip(bps, expected_order):
self.assertEqual(bp.patch.pk, p.pk)
ids.reverse()
self._testOrder(ids, self.patches)
+
class BundleReorderTest(BundleTestBase):
+
def setUp(self):
super(BundleReorderTest, self).setUp(5)
for i in range(5):
self.bundle.append_patch(self.patches[i])
def checkReordering(self, neworder, start, end):
- neworder_ids = [ self.patches[i].id for i in neworder ]
+ neworder_ids = [self.patches[i].id for i in neworder]
- firstpatch = BundlePatch.objects.get(bundle = self.bundle,
- patch = self.patches[start]).patch
+ firstpatch = BundlePatch.objects.get(bundle=self.bundle,
+ patch=self.patches[start]).patch
slice_ids = neworder_ids[start:end]
params = {'form': 'reorderform',
self.assertEqual(response.status_code, 200)
- bps = BundlePatch.objects.filter(bundle = self.bundle) \
- .order_by('order')
+ bps = BundlePatch.objects.filter(bundle=self.bundle) \
+ .order_by('order')
# check if patch IDs are in the expected order:
- bundle_ids = [ bp.patch.id for bp in bps ]
+ bundle_ids = [bp.patch.id for bp in bps]
self.assertEqual(neworder_ids, bundle_ids)
# check if order field is still sequential:
- order_numbers = [ bp.order for bp in bps ]
- expected_order = list(range(1, len(neworder)+1)) # [1 ... len(neworder)]
+ order_numbers = [bp.order for bp in bps]
+ # [1 ... len(neworder)]
+ expected_order = list(range(1, len(neworder) + 1))
self.assertEqual(order_numbers, expected_order)
def testBundleReorderAll(self):
# reorder all patches:
- self.checkReordering([2,1,4,0,3], 0, 5)
+ self.checkReordering([2, 1, 4, 0, 3], 0, 5)
def testBundleReorderEnd(self):
# reorder only the last three patches
- self.checkReordering([0,1,3,2,4], 2, 5)
+ self.checkReordering([0, 1, 3, 2, 4], 2, 5)
def testBundleReorderBegin(self):
# reorder only the first three patches
- self.checkReordering([2,0,1,3,4], 0, 3)
+ self.checkReordering([2, 0, 1, 3, 4], 0, 3)
def testBundleReorderMiddle(self):
# reorder only 2nd, 3rd, and 4th patches
- self.checkReordering([0,2,3,1,4], 1, 4)
+ self.checkReordering([0, 2, 3, 1, 4], 1, 4)
+
class BundleRedirTest(BundleTestBase):
# old URL: private bundles used to be under /user/bundle/<id>
def testMboxRedir(self):
url = '/user/bundle/%d/mbox/' % self.bundle.id
response = self.client.get(url)
- self.assertRedirects(response,'/bundle/%s/%s/mbox/' %
- (self.bundle.owner.username,
- self.bundle.name))
+ self.assertRedirects(response, '/bundle/%s/%s/mbox/' %
+ (self.bundle.owner.username,
+ self.bundle.name))
self.create_check()
self.create_check(context='new/test1')
self.assertCheckEqual(self.patch, Check.STATE_SUCCESS)
-
def _confirmation_url(conf):
- return reverse('patchwork.views.confirm', kwargs = {'key': conf.key})
+ return reverse('patchwork.views.confirm', kwargs={'key': conf.key})
+
class TestUser(object):
username = 'testuser'
def __init__(self):
self.password = User.objects.make_random_password()
self.user = User.objects.create_user(self.username,
- self.email, self.password)
+ self.email, self.password)
+
class InvalidConfirmationTest(TestCase):
+
def setUp(self):
EmailConfirmation.objects.all().delete()
Person.objects.all().delete()
self.user = TestUser()
- self.conf = EmailConfirmation(type = 'userperson',
- email = self.user.secondary_email,
- user = self.user.user)
+ self.conf = EmailConfirmation(type='userperson',
+ email=self.user.secondary_email,
+ user=self.user.user)
self.conf.save()
def testInactiveConfirmation(self):
self.assertTemplateUsed(response, 'patchwork/confirm-error.html')
self.assertEqual(response.context['error'], 'expired')
self.assertEqual(response.context['conf'], self.conf)
-
defaults.project.save()
defaults.patch_author_person.save()
self.patch_content = read_patch(self.patch_filename,
- encoding = self.patch_encoding)
- self.patch = Patch(project = defaults.project,
- msgid = 'x', name = defaults.patch_name,
- submitter = defaults.patch_author_person,
- content = self.patch_content)
+ encoding=self.patch_encoding)
+ self.patch = Patch(project=defaults.project,
+ msgid='x', name=defaults.patch_name,
+ submitter=defaults.patch_author_person,
+ content=self.patch_content)
self.patch.save()
self.client = Client()
def testMboxView(self):
response = self.client.get('/patch/%d/mbox/' % self.patch.id)
self.assertEqual(response.status_code, 200)
- self.assertTrue(self.patch.content in \
- response.content.decode(self.patch_encoding))
+ self.assertTrue(self.patch.content in
+ response.content.decode(self.patch_encoding))
def testRawView(self):
response = self.client.get('/patch/%d/raw/' % self.patch.id)
self.assertEqual(response.status_code, 200)
self.assertEqual(response.content.decode(self.patch_encoding),
- self.patch.content)
+ self.patch.content)
def tearDown(self):
self.patch.delete()
defaults.patch_author_person.delete()
defaults.project.delete()
+
class UTF8HeaderPatchViewTest(UTF8PatchViewTest):
fixtures = ['default_states']
patch_filename = '0002-utf-8.patch'
def setUp(self):
defaults.project.save()
- self.patch_author = Person(name = self.patch_author_name,
- email = defaults.patch_author_person.email)
+ self.patch_author = Person(name=self.patch_author_name,
+ email=defaults.patch_author_person.email)
self.patch_author.save()
self.patch_content = read_patch(self.patch_filename,
- encoding = self.patch_encoding)
- self.patch = Patch(project = defaults.project,
- msgid = 'x', name = defaults.patch_name,
- submitter = self.patch_author,
- content = self.patch_content)
+ encoding=self.patch_encoding)
+ self.patch = Patch(project=defaults.project,
+ msgid='x', name=defaults.patch_name,
+ submitter=self.patch_author,
+ content=self.patch_content)
self.patch.save()
self.client = Client()
user.save()
conf = EmailConfirmation(type='registration', user=user,
- email=user.email)
+ email=user.email)
conf.date = date
conf.save()
def testOldRegistrationExpiry(self):
date = ((datetime.datetime.now() - EmailConfirmation.validity) -
- datetime.timedelta(hours = 1))
+ datetime.timedelta(hours=1))
(user, conf) = self.register(date)
do_expiry()
- self.assertFalse(User.objects.filter(pk = user.pk).exists())
- self.assertFalse(EmailConfirmation.objects.filter(pk = conf.pk)
- .exists())
-
+ self.assertFalse(User.objects.filter(pk=user.pk).exists())
+ self.assertFalse(EmailConfirmation.objects.filter(pk=conf.pk)
+ .exists())
def testRecentRegistrationExpiry(self):
date = ((datetime.datetime.now() - EmailConfirmation.validity) +
- datetime.timedelta(hours = 1))
+ datetime.timedelta(hours=1))
(user, conf) = self.register(date)
do_expiry()
- self.assertTrue(User.objects.filter(pk = user.pk).exists())
- self.assertTrue(EmailConfirmation.objects.filter(pk = conf.pk)
- .exists())
+ self.assertTrue(User.objects.filter(pk=user.pk).exists())
+ self.assertTrue(EmailConfirmation.objects.filter(pk=conf.pk)
+ .exists())
def testInactiveRegistrationExpiry(self):
(user, conf) = self.register(datetime.datetime.now())
do_expiry()
- self.assertTrue(User.objects.filter(pk = user.pk).exists())
- self.assertFalse(EmailConfirmation.objects.filter(pk = conf.pk)
- .exists())
+ self.assertTrue(User.objects.filter(pk=user.pk).exists())
+ self.assertFalse(EmailConfirmation.objects.filter(pk=conf.pk)
+ .exists())
def testPatchSubmitterExpiry(self):
defaults.project.save()
defaults.patch_author_person.save()
# someone submits a patch...
- patch = Patch(project = defaults.project,
- msgid = 'test@example.com', name = 'test patch',
- submitter = defaults.patch_author_person,
- content = defaults.patch)
+ patch = Patch(project=defaults.project,
+ msgid='test@example.com', name='test patch',
+ submitter=defaults.patch_author_person,
+ content=defaults.patch)
patch.save()
# ... then starts registration...
date = ((datetime.datetime.now() - EmailConfirmation.validity) -
- datetime.timedelta(hours = 1))
+ datetime.timedelta(hours=1))
userid = 'test-user'
user = User.objects.create_user(userid,
- defaults.patch_author_person.email, userid)
+ defaults.patch_author_person.email, userid)
user.is_active = False
user.date_joined = user.last_login = date
user.save()
self.assertEqual(user.email, patch.submitter.email)
conf = EmailConfirmation(type='registration', user=user,
- email=user.email)
+ email=user.email)
conf.date = date
conf.save()
do_expiry()
# we should see no matching user
- self.assertFalse(User.objects.filter(email = patch.submitter.email)
- .exists())
+ self.assertFalse(User.objects.filter(email=patch.submitter.email)
+ .exists())
# but the patch and person should still be present
self.assertTrue(Person.objects.filter(
- pk = defaults.patch_author_person.pk).exists())
- self.assertTrue(Patch.objects.filter(pk = patch.pk).exists())
+ pk=defaults.patch_author_person.pk).exists())
+ self.assertTrue(Patch.objects.filter(pk=patch.pk).exists())
# and there should be no user associated with the person
- self.assertEqual(Person.objects.get(pk =
- defaults.patch_author_person.pk).user, None)
+ self.assertEqual(
+ Person.objects.get(pk=defaults.patch_author_person.pk).user, None)
class FilterQueryStringTest(TestCase):
+
def testFilterQSEscaping(self):
"""test that filter fragments in a query string are properly escaped,
and stray ampersands don't get reflected back in the filter
project = defaults.project
defaults.project.save()
url = reverse('patchwork.views.patch.list',
- kwargs={'project_id': project.linkname})
+ kwargs={'project_id': project.linkname})
response = self.client.get(url)
self.assertContains(response, 'No patches to display')
self.assertNotContains(response, 'tbody')
+
class PatchOrderTest(TestCase):
fixtures = ['default_states']
d = datetime.datetime
patchmeta = [
- ('AlCMyjOsx', 'AlxMyjOsx@nRbqkQV.wBw', d(2014,3,16,13, 4,50, 155643)),
- ('MMZnrcDjT', 'MMmnrcDjT@qGaIfOl.tbk', d(2014,1,25,13, 4,50, 162814)),
- ('WGirwRXgK', 'WGSrwRXgK@TriIETY.GhE', d(2014,2,14,13, 4,50, 169305)),
- ('isjNIuiAc', 'issNIuiAc@OsEirYx.EJh', d(2014,3,15,13, 4,50, 176264)),
- ('XkAQpYGws', 'XkFQpYGws@hzntTcm.JSE', d(2014,1,18,13, 4,50, 182493)),
- ('uJuCPWMvi', 'uJACPWMvi@AVRBOBl.ecy', d(2014,3,12,13, 4,50, 189554)),
- ('TyQmWtcbg', 'TylmWtcbg@DzrNeNH.JuB', d(2014,2, 3,13, 4,50, 195685)),
- ('FpvAhWRdX', 'FpKAhWRdX@agxnCAI.wFO', d(2014,3,15,13, 4,50, 201398)),
- ('bmoYvnyWa', 'bmdYvnyWa@aeoPnlX.juy', d(2014,3, 4,13, 4,50, 206800)),
- ('CiReUQsAq', 'CiieUQsAq@DnOYRuf.TTI', d(2014,3,28,13, 4,50, 212169)),
+ ('AlCMyjOsx', 'AlxMyjOsx@nRbqkQV.wBw',
+ d(2014, 3, 16, 13, 4, 50, 155643)),
+ ('MMZnrcDjT', 'MMmnrcDjT@qGaIfOl.tbk',
+ d(2014, 1, 25, 13, 4, 50, 162814)),
+ ('WGirwRXgK', 'WGSrwRXgK@TriIETY.GhE',
+ d(2014, 2, 14, 13, 4, 50, 169305)),
+ ('isjNIuiAc', 'issNIuiAc@OsEirYx.EJh',
+ d(2014, 3, 15, 13, 4, 50, 176264)),
+ ('XkAQpYGws', 'XkFQpYGws@hzntTcm.JSE',
+ d(2014, 1, 18, 13, 4, 50, 182493)),
+ ('uJuCPWMvi', 'uJACPWMvi@AVRBOBl.ecy',
+ d(2014, 3, 12, 13, 4, 50, 189554)),
+ ('TyQmWtcbg', 'TylmWtcbg@DzrNeNH.JuB',
+ d(2014, 2, 3, 13, 4, 50, 195685)),
+ ('FpvAhWRdX', 'FpKAhWRdX@agxnCAI.wFO',
+ d(2014, 3, 15, 13, 4, 50, 201398)),
+ ('bmoYvnyWa', 'bmdYvnyWa@aeoPnlX.juy',
+ d(2014, 3, 4, 13, 4, 50, 206800)),
+ ('CiReUQsAq', 'CiieUQsAq@DnOYRuf.TTI',
+ d(2014, 3, 28, 13, 4, 50, 212169)),
]
def setUp(self):
for (name, email, date) in self.patchmeta:
patch_name = 'testpatch' + name
- person = Person(name = name, email = email)
+ person = Person(name=name, email=email)
person.save()
- patch = Patch(project = defaults.project, msgid = patch_name,
- submitter = person, content = '', date = date)
+ patch = Patch(project=defaults.project, msgid=patch_name,
+ submitter=person, content='', date=date)
patch.save()
def _extract_patch_ids(self, response):
id_re = re.compile('<tr id="patch_row:(\d+)"')
- ids = [ int(m.group(1)) for m in id_re.finditer(response.content.decode()) ]
+ ids = [int(m.group(1))
+ for m in id_re.finditer(response.content.decode())]
return ids
def _test_sequence(self, response, test_fn):
ids = self._extract_patch_ids(response)
self.assertTrue(bool(ids))
- patches = [ Patch.objects.get(id = i) for i in ids ]
+ patches = [Patch.objects.get(id=i) for i in ids]
pairs = list(zip(patches, patches[1:]))
- [ test_fn(p1, p2) for (p1, p2) in pairs ]
+ [test_fn(p1, p2) for (p1, p2) in pairs]
def testDateOrder(self):
url = reverse('patchwork.views.patch.list',
- kwargs={'project_id': defaults.project.linkname})
+ kwargs={'project_id': defaults.project.linkname})
response = self.client.get(url + '?order=date')
+
def test_fn(p1, p2):
self.assertLessEqual(p1.date, p2.date)
self._test_sequence(response, test_fn)
def testDateReverseOrder(self):
url = reverse('patchwork.views.patch.list',
- kwargs={'project_id': defaults.project.linkname})
+ kwargs={'project_id': defaults.project.linkname})
response = self.client.get(url + '?order=-date')
+
def test_fn(p1, p2):
self.assertGreaterEqual(p1.date, p2.date)
self._test_sequence(response, test_fn)
def testSubmitterOrder(self):
url = reverse('patchwork.views.patch.list',
- kwargs={'project_id': defaults.project.linkname})
+ kwargs={'project_id': defaults.project.linkname})
response = self.client.get(url + '?order=submitter')
+
def test_fn(p1, p2):
self.assertLessEqual(p1.submitter.name.lower(),
p2.submitter.name.lower())
def testSubmitterReverseOrder(self):
url = reverse('patchwork.views.patch.list',
- kwargs={'project_id': defaults.project.linkname})
+ kwargs={'project_id': defaults.project.linkname})
response = self.client.get(url + '?order=-submitter')
+
def test_fn(p1, p2):
self.assertGreaterEqual(p1.submitter.name.lower(),
p2.submitter.name.lower())
self._test_sequence(response, test_fn)
-
self.assertEqual(response.status_code, 200)
self.assertTemplateUsed(response, 'patchwork/mail-form.html')
self.assertFormError(response, 'form', 'email',
- 'This field is required.')
+ 'This field is required.')
def testMailSettingsPOSTInvalid(self):
response = self.client.post(self.url, {'email': 'foo'})
def testMailSettingsPOSTOptedOut(self):
email = u'foo@example.com'
- EmailOptout(email = email).save()
+ EmailOptout(email=email).save()
response = self.client.post(self.url, {'email': email})
self.assertEqual(response.status_code, 200)
self.assertTemplateUsed(response, 'patchwork/mail-settings.html')
optin_url = reverse('patchwork.views.mail.optin')
self.assertContains(response, ('action="%s"' % optin_url))
+
class OptoutRequestTest(TestCase):
def setUp(self):
def testOptOutRequestGET(self):
response = self.client.get(self.url)
- self.assertRedirects(response, reverse('patchwork.views.mail.settings'))
+ self.assertRedirects(
+ response, reverse('patchwork.views.mail.settings'))
def testOptoutRequestValidPOST(self):
email = u'foo@example.com'
# check for a confirmation object
self.assertEqual(EmailConfirmation.objects.count(), 1)
- conf = EmailConfirmation.objects.get(email = email)
+ conf = EmailConfirmation.objects.get(email=email)
# check confirmation page
self.assertEqual(response.status_code, 200)
self.assertContains(response, email)
# check email
- url = reverse('patchwork.views.confirm', kwargs = {'key': conf.key})
+ url = reverse('patchwork.views.confirm', kwargs={'key': conf.key})
self.assertEqual(len(mail.outbox), 1)
msg = mail.outbox[0]
self.assertEqual(msg.to, [email])
response = self.client.post(self.url, {'email': ''})
self.assertEqual(response.status_code, 200)
self.assertFormError(response, 'form', 'email',
- 'This field is required.')
+ 'This field is required.')
self.assertTrue(response.context['error'])
self.assertNotIn('email_sent', response.context)
self.assertEqual(len(mail.outbox), 0)
self.assertNotIn('email_sent', response.context)
self.assertEqual(len(mail.outbox), 0)
+
class OptoutTest(TestCase):
def setUp(self):
self.url = reverse('patchwork.views.mail.optout')
self.email = u'foo@example.com'
- self.conf = EmailConfirmation(type = 'optout', email = self.email)
+ self.conf = EmailConfirmation(type='optout', email=self.email)
self.conf.save()
def testOptoutValidHash(self):
url = reverse('patchwork.views.confirm',
- kwargs = {'key': self.conf.key})
+ kwargs={'key': self.conf.key})
response = self.client.get(url)
self.assertEqual(response.status_code, 200)
# check that the confirmation is now inactive
self.assertFalse(EmailConfirmation.objects.get(
- pk = self.conf.pk).active)
+ pk=self.conf.pk).active)
class OptoutPreexistingTest(OptoutTest):
+
"""Test that a duplicated opt-out behaves the same as the initial one"""
+
def setUp(self):
super(OptoutPreexistingTest, self).setUp()
- EmailOptout(email = self.email).save()
+ EmailOptout(email=self.email).save()
+
class OptinRequestTest(TestCase):
def setUp(self):
self.url = reverse('patchwork.views.mail.optin')
self.email = u'foo@example.com'
- EmailOptout(email = self.email).save()
+ EmailOptout(email=self.email).save()
def testOptInRequestGET(self):
response = self.client.get(self.url)
- self.assertRedirects(response, reverse('patchwork.views.mail.settings'))
+ self.assertRedirects(
+ response, reverse('patchwork.views.mail.settings'))
def testOptInRequestValidPOST(self):
response = self.client.post(self.url, {'email': self.email})
# check for a confirmation object
self.assertEqual(EmailConfirmation.objects.count(), 1)
- conf = EmailConfirmation.objects.get(email = self.email)
+ conf = EmailConfirmation.objects.get(email=self.email)
# check confirmation page
self.assertEqual(response.status_code, 200)
self.assertContains(response, self.email)
# check email
- url = reverse('patchwork.views.confirm', kwargs = {'key': conf.key})
+ url = reverse('patchwork.views.confirm', kwargs={'key': conf.key})
self.assertEqual(len(mail.outbox), 1)
msg = mail.outbox[0]
self.assertEqual(msg.to, [self.email])
response = self.client.post(self.url, {'email': ''})
self.assertEqual(response.status_code, 200)
self.assertFormError(response, 'form', 'email',
- 'This field is required.')
+ 'This field is required.')
self.assertTrue(response.context['error'])
self.assertNotIn('email_sent', response.context)
self.assertEqual(len(mail.outbox), 0)
self.assertNotIn('email_sent', response.context)
self.assertEqual(len(mail.outbox), 0)
+
class OptinTest(TestCase):
def setUp(self):
self.email = u'foo@example.com'
- self.optout = EmailOptout(email = self.email)
+ self.optout = EmailOptout(email=self.email)
self.optout.save()
- self.conf = EmailConfirmation(type = 'optin', email = self.email)
+ self.conf = EmailConfirmation(type='optin', email=self.email)
self.conf.save()
def testOptinValidHash(self):
url = reverse('patchwork.views.confirm',
- kwargs = {'key': self.conf.key})
+ kwargs={'key': self.conf.key})
response = self.client.get(url)
self.assertEqual(response.status_code, 200)
# check that the confirmation is now inactive
self.assertFalse(EmailConfirmation.objects.get(
- pk = self.conf.pk).active)
+ pk=self.conf.pk).active)
+
class OptinWithoutOptoutTest(TestCase):
+
"""Test an opt-in with no existing opt-out"""
def setUp(self):
self.assertTrue(bool(response.context['error']))
self.assertContains(response, 'not on the patchwork opt-out list')
+
class UserProfileOptoutFormTest(TestCase):
+
"""Test that the correct optin/optout forms appear on the user profile
page, for logged-in users"""
self.secondary_email = 'test2@example.com'
self.user = create_user()
- self.client.login(username = self.user.username,
- password = self.user.username)
+ self.client.login(username=self.user.username,
+ password=self.user.username)
def _form_re(self, url, email):
return re.compile(self.form_re_template % {'url': url, 'email': email},
self.assertTrue(form_re.search(response.content.decode()) is not None)
def testMainEmailOptinForm(self):
- EmailOptout(email = self.user.email).save()
+ EmailOptout(email=self.user.email).save()
form_re = self._form_re(self.optin_url, self.user.email)
response = self.client.get(self.url)
self.assertEqual(response.status_code, 200)
self.assertTrue(form_re.search(response.content.decode()) is not None)
def testSecondaryEmailOptoutForm(self):
- p = Person(email = self.secondary_email, user = self.user)
+ p = Person(email=self.secondary_email, user=self.user)
p.save()
form_re = self._form_re(self.optout_url, p.email)
response = self.client.get(self.url)
self.assertTrue(form_re.search(response.content.decode()) is not None)
def testSecondaryEmailOptinForm(self):
- p = Person(email = self.secondary_email, user = self.user)
+ p = Person(email=self.secondary_email, user=self.user)
p.save()
- EmailOptout(email = p.email).save()
+ EmailOptout(email=p.email).save()
form_re = self._form_re(self.optin_url, p.email)
response = self.client.get(self.url)
fixtures = ['default_states']
""" Test that the mbox view appends the Acked-by from a patch comment """
+
def setUp(self):
defaults.project.save()
self.person = defaults.patch_author_person
self.person.save()
- self.patch = Patch(project = defaults.project,
- msgid = 'p1', name = 'testpatch',
- submitter = self.person, content = '')
+ self.patch = Patch(project=defaults.project,
+ msgid='p1', name='testpatch',
+ submitter=self.person, content='')
self.patch.save()
- comment = Comment(patch = self.patch, msgid = 'p1',
- submitter = self.person,
- content = 'comment 1 text\nAcked-by: 1\n')
+ comment = Comment(patch=self.patch, msgid='p1',
+ submitter=self.person,
+ content='comment 1 text\nAcked-by: 1\n')
comment.save()
- comment = Comment(patch = self.patch, msgid = 'p2',
- submitter = self.person,
- content = 'comment 2 text\nAcked-by: 2\n')
+ comment = Comment(patch=self.patch, msgid='p2',
+ submitter=self.person,
+ content='comment 2 text\nAcked-by: 2\n')
comment.save()
def testPatchResponse(self):
response = self.client.get('/patch/%d/mbox/' % self.patch.id)
self.assertContains(response,
- 'Acked-by: 1\nAcked-by: 2\n')
+ 'Acked-by: 1\nAcked-by: 2\n')
+
class MboxPatchSplitResponseTest(TestCase):
fixtures = ['default_states']
""" Test that the mbox view appends the Acked-by from a patch comment,
and places it before an '---' update line. """
+
def setUp(self):
defaults.project.save()
self.person = defaults.patch_author_person
self.person.save()
- self.patch = Patch(project = defaults.project,
- msgid = 'p1', name = 'testpatch',
- submitter = self.person, content = '')
+ self.patch = Patch(project=defaults.project,
+ msgid='p1', name='testpatch',
+ submitter=self.person, content='')
self.patch.save()
- comment = Comment(patch = self.patch, msgid = 'p1',
- submitter = self.person,
- content = 'comment 1 text\nAcked-by: 1\n---\nupdate\n')
+ comment = Comment(patch=self.patch, msgid='p1',
+ submitter=self.person,
+ content='comment 1 text\nAcked-by: 1\n---\nupdate\n')
comment.save()
- comment = Comment(patch = self.patch, msgid = 'p2',
- submitter = self.person,
- content = 'comment 2 text\nAcked-by: 2\n')
+ comment = Comment(patch=self.patch, msgid='p2',
+ submitter=self.person,
+ content='comment 2 text\nAcked-by: 2\n')
comment.save()
def testPatchResponse(self):
response = self.client.get('/patch/%d/mbox/' % self.patch.id)
self.assertContains(response,
- 'Acked-by: 1\nAcked-by: 2\n')
+ 'Acked-by: 1\nAcked-by: 2\n')
+
class MboxPassThroughHeaderTest(TestCase):
fixtures = ['default_states']
self.to_header = 'To: To Person <to@example.com>'
self.date_header = 'Date: Fri, 7 Jun 2013 15:42:54 +1000'
- self.patch = Patch(project = defaults.project,
- msgid = 'p1', name = 'testpatch',
- submitter = self.person, content = '')
+ self.patch = Patch(project=defaults.project,
+ msgid='p1', name='testpatch',
+ submitter=self.person, content='')
def testCCHeader(self):
self.patch.headers = self.cc_header + '\n'
response = self.client.get('/patch/%d/mbox/' % self.patch.id)
self.assertContains(response, self.date_header)
+
class MboxBrokenFromHeaderTest(TestCase):
fixtures = ['default_states']
self.person.name = u'©ool guŷ'
self.person.save()
- self.patch = Patch(project = defaults.project,
- msgid = 'p1', name = 'testpatch',
- submitter = self.person, content = '')
+ self.patch = Patch(project=defaults.project,
+ msgid='p1', name='testpatch',
+ submitter=self.person, content='')
def testFromHeader(self):
self.patch.save()
response = self.client.get('/patch/%d/mbox/' % self.patch.id)
self.assertContains(response, from_email)
+
class MboxDateHeaderTest(TestCase):
fixtures = ['default_states']
self.person = defaults.patch_author_person
self.person.save()
- self.patch = Patch(project = defaults.project,
- msgid = 'p1', name = 'testpatch',
- submitter = self.person, content = '')
+ self.patch = Patch(project=defaults.project,
+ msgid='p1', name='testpatch',
+ submitter=self.person, content='')
self.patch.save()
def testDateHeader(self):
mail_date = dateutil.parser.parse(mail['Date'])
# patch dates are all in UTC
patch_date = self.patch.date.replace(tzinfo=dateutil.tz.tzutc(),
- microsecond=0)
+ microsecond=0)
self.assertEqual(mail_date, patch_date)
def testSuppliedDateHeader(self):
hour_offset = 3
tz = dateutil.tz.tzoffset(None, hour_offset * 60 * 60)
- date = datetime.datetime.utcnow() - datetime.timedelta(days = 1)
+ date = datetime.datetime.utcnow() - datetime.timedelta(days=1)
date = date.replace(tzinfo=tz, microsecond=0)
self.patch.headers = 'Date: %s\n' % date.strftime("%a, %d %b %Y %T %z")
mail_date = dateutil.parser.parse(mail['Date'])
self.assertEqual(mail_date, date)
+
class MboxCommentPostcriptUnchangedTest(TestCase):
fixtures = ['default_states']
""" Test that the mbox view doesn't change the postscript part of a mail.
There where always a missing blank right after the postscript
delimiter '---' and an additional newline right before. """
+
def setUp(self):
defaults.project.save()
self.person = defaults.patch_author_person
self.person.save()
- self.patch = Patch(project = defaults.project,
- msgid = 'p1', name = 'testpatch',
- submitter = self.person, content = '')
+ self.patch = Patch(project=defaults.project,
+ msgid='p1', name='testpatch',
+ submitter=self.person, content='')
self.patch.save()
self.txt = 'some comment\n---\n some/file | 1 +\n'
- comment = Comment(patch = self.patch, msgid = 'p1',
- submitter = self.person,
- content = self.txt)
+ comment = Comment(patch=self.patch, msgid='p1',
+ submitter=self.person,
+ content=self.txt)
comment.save()
def testCommentUnchanged(self):
self.project.save()
self.submitter = defaults.patch_author_person
self.submitter.save()
- self.patch = Patch(project = self.project, msgid = 'testpatch',
- name = 'testpatch', content = '',
- submitter = self.submitter)
+ self.patch = Patch(project=self.project, msgid='testpatch',
+ name='testpatch', content='',
+ submitter=self.submitter)
def tearDown(self):
self.patch.delete()
"""Ensure we get a notification for interesting patch changes"""
self.patch.save()
oldstate = self.patch.state
- state = State.objects.exclude(pk = oldstate.pk)[0]
+ state = State.objects.exclude(pk=oldstate.pk)[0]
self.patch.state = state
self.patch.save()
"""Ensure we cancel notifications that are no longer valid"""
self.patch.save()
oldstate = self.patch.state
- state = State.objects.exclude(pk = oldstate.pk)[0]
+ state = State.objects.exclude(pk=oldstate.pk)[0]
self.patch.state = state
self.patch.save()
but keep the original patch details"""
self.patch.save()
oldstate = self.patch.state
- newstates = State.objects.exclude(pk = oldstate.pk)[:2]
+ newstates = State.objects.exclude(pk=oldstate.pk)[:2]
self.patch.state = newstates[0]
self.patch.save()
notification = PatchChangeNotification.objects.all()[0]
self.assertEqual(notification.orig_state, oldstate)
orig_timestamp = notification.last_modified
-
+
self.patch.state = newstates[1]
self.patch.save()
self.assertEqual(PatchChangeNotification.objects.count(), 1)
self.patch.save()
oldstate = self.patch.state
- state = State.objects.exclude(pk = oldstate.pk)[0]
+ state = State.objects.exclude(pk=oldstate.pk)[0]
self.patch.state = state
self.patch.save()
self.assertEqual(PatchChangeNotification.objects.count(), 0)
+
class PatchNotificationEmailTest(TestCase):
fixtures = ['default_states']
self.project.save()
self.submitter = defaults.patch_author_person
self.submitter.save()
- self.patch = Patch(project = self.project, msgid = 'testpatch',
- name = 'testpatch', content = '',
- submitter = self.submitter)
+ self.patch = Patch(project=self.project, msgid='testpatch',
+ name='testpatch', content='',
+ submitter=self.submitter)
self.patch.save()
def tearDown(self):
def _expireNotifications(self, **kwargs):
timestamp = datetime.datetime.now() - \
- datetime.timedelta(minutes =
- settings.NOTIFICATION_DELAY_MINUTES + 1)
+ datetime.timedelta(minutes=settings.NOTIFICATION_DELAY_MINUTES + 1)
qs = PatchChangeNotification.objects.all()
if kwargs:
qs = qs.filter(**kwargs)
- qs.update(last_modified = timestamp)
+ qs.update(last_modified=timestamp)
def testNoNotifications(self):
self.assertEqual(send_notifications(), [])
def testNoReadyNotifications(self):
""" We shouldn't see immediate notifications"""
- PatchChangeNotification(patch = self.patch,
- orig_state = self.patch.state).save()
+ PatchChangeNotification(patch=self.patch,
+ orig_state=self.patch.state).save()
errors = send_notifications()
self.assertEqual(errors, [])
self.assertEqual(len(mail.outbox), 0)
def testNotifications(self):
- PatchChangeNotification(patch = self.patch,
- orig_state = self.patch.state).save()
+ PatchChangeNotification(patch=self.patch,
+ orig_state=self.patch.state).save()
self._expireNotifications()
errors = send_notifications()
def testNotificationEscaping(self):
self.patch.name = 'Patch name with " character'
self.patch.save()
- PatchChangeNotification(patch = self.patch,
- orig_state = self.patch.state).save()
+ PatchChangeNotification(patch=self.patch,
+ orig_state=self.patch.state).save()
self._expireNotifications()
errors = send_notifications()
def testNotificationOptout(self):
"""ensure opt-out addresses don't get notifications"""
- PatchChangeNotification(patch = self.patch,
- orig_state = self.patch.state).save()
+ PatchChangeNotification(patch=self.patch,
+ orig_state=self.patch.state).save()
self._expireNotifications()
- EmailOptout(email = self.submitter.email).save()
+ EmailOptout(email=self.submitter.email).save()
errors = send_notifications()
self.assertEqual(errors, [])
def testNotificationMerge(self):
patches = [self.patch,
- Patch(project = self.project, msgid = 'testpatch-2',
- name = 'testpatch 2', content = '',
- submitter = self.submitter)]
+ Patch(project=self.project, msgid='testpatch-2',
+ name='testpatch 2', content='',
+ submitter=self.submitter)]
for patch in patches:
patch.save()
- PatchChangeNotification(patch = patch,
- orig_state = patch.state).save()
+ PatchChangeNotification(patch=patch,
+ orig_state=patch.state).save()
self.assertEqual(PatchChangeNotification.objects.count(), len(patches))
self._expireNotifications()
at least one within the notification delay, that other notifications
are held"""
patches = [self.patch,
- Patch(project = self.project, msgid = 'testpatch-2',
- name = 'testpatch 2', content = '',
- submitter = self.submitter)]
+ Patch(project=self.project, msgid='testpatch-2',
+ name='testpatch 2', content='',
+ submitter=self.submitter)]
for patch in patches:
patch.save()
- PatchChangeNotification(patch = patch,
- orig_state = patch.state).save()
+ PatchChangeNotification(patch=patch,
+ orig_state=patch.state).save()
self.assertEqual(PatchChangeNotification.objects.count(), len(patches))
self._expireNotifications()
# update one notification, to bring it out of the notification delay
- patches[0].state = State.objects.exclude(pk = patches[0].state.pk)[0]
+ patches[0].state = State.objects.exclude(pk=patches[0].state.pk)[0]
patches[0].save()
# the updated notification should prevent the other from being sent
def setUp(self):
self.orig_patch = read_patch(self.patch_filename)
- email = create_email(self.test_comment, multipart = True)
- attachment = MIMEText(self.orig_patch, _subtype = self.content_subtype)
+ email = create_email(self.test_comment, multipart=True)
+ attachment = MIMEText(self.orig_patch, _subtype=self.content_subtype)
email.attach(attachment)
(self.patch, self.comment) = find_content(self.project, email)
+
class AttachmentXDiffPatchTest(AttachmentPatchTest):
content_subtype = 'x-diff'
+
class UTF8InlinePatchTest(InlinePatchTest):
patch_filename = '0002-utf-8.patch'
patch_encoding = 'utf-8'
def setUp(self):
self.orig_patch = read_patch(self.patch_filename, self.patch_encoding)
email = create_email(self.test_comment + '\n' + self.orig_patch,
- content_encoding = self.patch_encoding)
+ content_encoding=self.patch_encoding)
(self.patch, self.comment) = find_content(self.project, email)
+
class NoCharsetInlinePatchTest(InlinePatchTest):
+
""" Test mails with no content-type or content-encoding header """
patch_filename = '0001-add-line.patch'
del email['Content-Transfer-Encoding']
(self.patch, self.comment) = find_content(self.project, email)
+
class SignatureCommentTest(InlinePatchTest):
patch_filename = '0001-add-line.patch'
test_comment = 'Test comment\nmore comment'
def setUp(self):
self.orig_patch = read_patch(self.patch_filename)
- email = create_email( \
- self.test_comment + '\n' + \
- '-- \nsig\n' + self.orig_patch)
+ email = create_email(
+ self.test_comment + '\n' +
+ '-- \nsig\n' + self.orig_patch)
(self.patch, self.comment) = find_content(self.project, email)
def setUp(self):
self.orig_patch = read_patch(self.patch_filename)
- email = create_email( \
- self.test_comment + '\n' + \
- '_______________________________________________\n' + \
- 'Linuxppc-dev mailing list\n' + \
- self.orig_patch)
+ email = create_email(
+ self.test_comment + '\n' +
+ '_______________________________________________\n' +
+ 'Linuxppc-dev mailing list\n' +
+ self.orig_patch)
(self.patch, self.comment) = find_content(self.project, email)
class UpdateCommentTest(InlinePatchTest):
+
""" Test for '---\nUpdate: v2' style comments to patches. """
patch_filename = '0001-add-line.patch'
test_comment = 'Test comment\nmore comment\n---\nUpdate: test update'
+
class UpdateSigCommentTest(SignatureCommentTest):
+
""" Test for '---\nUpdate: v2' style comments to patches, with a sig """
patch_filename = '0001-add-line.patch'
test_comment = 'Test comment\nmore comment\n---\nUpdate: test update'
+
class SenderEncodingTest(TestCase):
sender_name = u'example user'
sender_email = 'user@example.com'
self.assertEqual(self.person.email, self.sender_email)
def testDBQueryName(self):
- db_person = Person.objects.get(name = self.sender_name)
+ db_person = Person.objects.get(name=self.sender_name)
self.assertEqual(self.person, db_person)
def testDBQueryEmail(self):
- db_person = Person.objects.get(email = self.sender_email)
+ db_person = Person.objects.get(email=self.sender_email)
self.assertEqual(self.person, db_person)
sender_name = u'\xe9xample user'
from_header = '=?utf-8?q?=C3=A9xample=20user?= <user@example.com>'
+
class SenderUTF8QPSplitEncodingTest(SenderEncodingTest):
sender_name = u'\xe9xample user'
from_header = '=?utf-8?q?=C3=A9xample?= user <user@example.com>'
+
class SenderUTF8B64EncodingTest(SenderUTF8QPEncodingTest):
from_header = '=?utf-8?B?w6l4YW1wbGUgdXNlcg==?= <user@example.com>'
+
class SubjectEncodingTest(PatchTest):
sender = 'example user <user@example.com>'
subject = 'test subject'
(patch, comment) = find_content(self.project, self.email)
self.assertEqual(patch.name, self.subject)
+
class SubjectUTF8QPEncodingTest(SubjectEncodingTest):
subject = u'test s\xfcbject'
subject_header = '=?utf-8?q?test=20s=c3=bcbject?='
+
class SubjectUTF8QPMultipleEncodingTest(SubjectEncodingTest):
subject = u'test s\xfcbject'
subject_header = 'test =?utf-8?q?s=c3=bcbject?='
+
class SenderCorrelationTest(TestCase):
existing_sender = 'Existing Sender <existing@example.com>'
non_existing_sender = 'Non-existing Sender <nonexisting@example.com>'
def tearDown(self):
self.person.delete()
+
class MultipleProjectPatchTest(TestCase):
+
""" Test that patches sent to multiple patchwork projects are
handled correctly """
msgid = '<1@example.com>'
def setUp(self):
- self.p1 = Project(linkname = 'test-project-1', name = 'Project 1',
- listid = '1.example.com', listemail='1@example.com')
- self.p2 = Project(linkname = 'test-project-2', name = 'Project 2',
- listid = '2.example.com', listemail='2@example.com')
+ self.p1 = Project(linkname='test-project-1', name='Project 1',
+ listid='1.example.com', listemail='1@example.com')
+ self.p2 = Project(linkname='test-project-2', name='Project 2',
+ listid='2.example.com', listemail='2@example.com')
self.p1.save()
self.p2.save()
parse_mail(email)
def testParsedProjects(self):
- self.assertEqual(Patch.objects.filter(project = self.p1).count(), 1)
- self.assertEqual(Patch.objects.filter(project = self.p2).count(), 1)
+ self.assertEqual(Patch.objects.filter(project=self.p1).count(), 1)
+ self.assertEqual(Patch.objects.filter(project=self.p2).count(), 1)
def tearDown(self):
self.p1.delete()
class MultipleProjectPatchCommentTest(MultipleProjectPatchTest):
+
""" Test that followups to multiple-project patches end up on the
correct patch """
def testParsedComment(self):
for project in [self.p1, self.p2]:
- patch = Patch.objects.filter(project = project)[0]
+ patch = Patch.objects.filter(project=project)[0]
# we should see two comments now - the original mail with the patch,
# and the one we parsed in setUp()
- self.assertEqual(Comment.objects.filter(patch = patch).count(), 2)
+ self.assertEqual(Comment.objects.filter(patch=patch).count(), 2)
+
class ListIdHeaderTest(TestCase):
+
""" Test that we parse List-Id headers from mails correctly """
+
def setUp(self):
- self.project = Project(linkname = 'test-project-1', name = 'Project 1',
- listid = '1.example.com', listemail='1@example.com')
+ self.project = Project(linkname='test-project-1', name='Project 1',
+ listid='1.example.com', listemail='1@example.com')
self.project.save()
def testNoListId(self):
def tearDown(self):
self.project.delete()
+
class MBoxPatchTest(PatchTest):
+
def setUp(self):
- self.mail = read_mail(self.mail_file, project = self.project)
+ self.mail = read_mail(self.mail_file, project=self.project)
+
class GitPullTest(MBoxPatchTest):
mail_file = '0001-git-pull-request.mbox'
self.assertTrue(patch.content is None)
self.assertTrue(comment is not None)
+
class GitPullWrappedTest(GitPullTest):
mail_file = '0002-git-pull-request-wrapped.mbox'
+
class GitPullWithDiffTest(MBoxPatchTest):
mail_file = '0003-git-pull-request-with-diff.mbox'
(patch, comment) = find_content(self.project, self.mail)
self.assertTrue(patch is not None)
self.assertEqual('git://git.kernel.org/pub/scm/linux/kernel/git/tip/' +
- 'linux-2.6-tip.git x86-fixes-for-linus', patch.pull_url)
+ 'linux-2.6-tip.git x86-fixes-for-linus', patch.pull_url)
self.assertTrue(
- patch.content.startswith('diff --git a/arch/x86/include/asm/smp.h'),
+ patch.content.startswith(
+ 'diff --git a/arch/x86/include/asm/smp.h'),
patch.content)
self.assertTrue(comment is not None)
+
class GitPullGitSSHUrlTest(GitPullTest):
mail_file = '0004-git-pull-request-git+ssh.mbox'
+
class GitPullSSHUrlTest(GitPullTest):
mail_file = '0005-git-pull-request-ssh.mbox'
+
class GitPullHTTPUrlTest(GitPullTest):
mail_file = '0006-git-pull-request-http.mbox'
+
class GitRenameOnlyTest(MBoxPatchTest):
mail_file = '0008-git-rename.mbox'
self.assertEqual(patch.content.count("\nrename from "), 2)
self.assertEqual(patch.content.count("\nrename to "), 2)
+
class GitRenameWithDiffTest(MBoxPatchTest):
mail_file = '0009-git-rename-with-diff.mbox'
self.assertEqual(patch.content.count("\nrename to "), 2)
self.assertEqual(patch.content.count('\n-a\n+b'), 1)
+
class CVSFormatPatchTest(MBoxPatchTest):
mail_file = '0007-cvs-format-diff.mbox'
self.assertTrue(comment is not None)
self.assertTrue(patch.content.startswith('Index'))
+
class CharsetFallbackPatchTest(MBoxPatchTest):
+
""" Test mail with and invalid charset name, and check that we can parse
with one of the fallback encodings"""
self.assertTrue(patch is not None)
self.assertTrue(comment is not None)
+
class NoNewlineAtEndOfFilePatchTest(MBoxPatchTest):
mail_file = '0011-no-newline-at-end-of-file.mbox'
(patch, comment) = find_content(self.project, self.mail)
self.assertTrue(patch is not None)
self.assertTrue(comment is not None)
- self.assertTrue(patch.content.startswith('diff --git a/tools/testing/selftests/powerpc/Makefile'))
+ self.assertTrue(patch.content.startswith(
+ 'diff --git a/tools/testing/selftests/powerpc/Makefile'))
# Confirm the trailing no newline marker doesn't end up in the comment
- self.assertFalse(comment.content.rstrip().endswith('\ No newline at end of file'))
+ self.assertFalse(
+ comment.content.rstrip().endswith('\ No newline at end of file'))
# Confirm it's instead at the bottom of the patch
- self.assertTrue(patch.content.rstrip().endswith('\ No newline at end of file'))
+ self.assertTrue(
+ patch.content.rstrip().endswith('\ No newline at end of file'))
# Confirm we got both markers
self.assertEqual(2, patch.content.count('\ No newline at end of file'))
+
class DelegateRequestTest(TestCase):
fixtures = ['default_states']
patch_filename = '0001-add-line.patch'
def setUp(self):
self.patch = read_patch(self.patch_filename)
self.user = create_user()
- self.p1 = Project(linkname = 'test-project-1', name = 'Project 1',
- listid = '1.example.com', listemail='1@example.com')
+ self.p1 = Project(linkname='test-project-1', name='Project 1',
+ listid='1.example.com', listemail='1@example.com')
self.p1.save()
def get_email(self):
self.p1.delete()
self.user.delete()
+
class InitialPatchStateTest(TestCase):
fixtures = ['default_states']
patch_filename = '0001-add-line.patch'
def setUp(self):
self.patch = read_patch(self.patch_filename)
self.user = create_user()
- self.p1 = Project(linkname = 'test-project-1', name = 'Project 1',
- listid = '1.example.com', listemail='1@example.com')
+ self.p1 = Project(linkname='test-project-1', name='Project 1',
+ listid='1.example.com', listemail='1@example.com')
self.p1.save()
self.default_state = get_default_initial_patch_state()
self.nondefault_state = State.objects.get(name="Accepted")
self.p1.delete()
self.user.delete()
+
class ParseInitialTagsTest(PatchTest):
patch_filename = '0001-add-line.patch'
test_comment = ('test comment\n\n' +
- 'Tested-by: Test User <test@example.com>\n' +
- 'Reviewed-by: Test User <test@example.com>\n')
+ 'Tested-by: Test User <test@example.com>\n' +
+ 'Reviewed-by: Test User <test@example.com>\n')
fixtures = ['default_tags', 'default_states']
def setUp(self):
project.save()
self.orig_patch = read_patch(self.patch_filename)
email = create_email(self.test_comment + '\n' + self.orig_patch,
- project = project)
+ project=project)
email['Message-Id'] = '<1@example.com>'
parse_mail(email)
self.assertEqual(Patch.objects.count(), 1)
patch = Patch.objects.all()[0]
self.assertEqual(patch.patchtag_set.filter(
- tag__name='Acked-by').count(), 0)
+ tag__name='Acked-by').count(), 0)
self.assertEqual(patch.patchtag_set.get(
- tag__name='Reviewed-by').count, 1)
+ tag__name='Reviewed-by').count, 1)
self.assertEqual(patch.patchtag_set.get(
- tag__name='Tested-by').count, 1)
+ tag__name='Tested-by').count, 1)
+
class PrefixTest(TestCase):
self.assertEqual(split_prefixes('PATCH,RFC'), ['PATCH', 'RFC'])
self.assertEqual(split_prefixes('PATCH 1/2'), ['PATCH', '1/2'])
+
class SubjectTest(TestCase):
def testCleanSubject(self):
self.assertEqual(clean_subject('[PATCH,RFC] meep'), '[RFC] meep')
self.assertEqual(clean_subject('[PATCH,1/2] meep'), '[1/2] meep')
self.assertEqual(clean_subject('[PATCH RFC 1/2] meep'),
- '[RFC,1/2] meep')
+ '[RFC,1/2] meep')
self.assertEqual(clean_subject('[PATCH] [RFC] meep'),
- '[RFC] meep')
+ '[RFC] meep')
self.assertEqual(clean_subject('[PATCH] [RFC,1/2] meep'),
- '[RFC,1/2] meep')
+ '[RFC,1/2] meep')
self.assertEqual(clean_subject('[PATCH] [RFC] [1/2] meep'),
- '[RFC,1/2] meep')
+ '[RFC,1/2] meep')
self.assertEqual(clean_subject('[PATCH] rewrite [a-z] regexes'),
- 'rewrite [a-z] regexes')
+ 'rewrite [a-z] regexes')
self.assertEqual(clean_subject('[PATCH] [RFC] rewrite [a-z] regexes'),
- '[RFC] rewrite [a-z] regexes')
+ '[RFC] rewrite [a-z] regexes')
self.assertEqual(clean_subject('[foo] [bar] meep', ['foo']),
- '[bar] meep')
+ '[bar] meep')
self.assertEqual(clean_subject('[FOO] [bar] meep', ['foo']),
- '[bar] meep')
+ '[bar] meep')
from patchwork.models import EmailConfirmation, Person, Bundle
+
class SubmitterCompletionTest(TestCase):
+
def setUp(self):
self.people = [
- Person(name = "Test Name", email = "test1@example.com"),
- Person(email = "test2@example.com"),
+ Person(name="Test Name", email="test1@example.com"),
+ Person(email="test2@example.com"),
]
list(map(lambda p: p.save(), self.people))
self.assertEqual(data[0]['email'], 'test2@example.com')
def testCompleteLimit(self):
- for i in range(3,10):
- person = Person(email = 'test%d@example.com' % i)
+ for i in range(3, 10):
+ person = Person(email='test%d@example.com' % i)
person.save()
response = self.client.get('/submitter/', {'q': 'test', 'l': 5})
self.assertEqual(response.status_code, 200)
def _confirmation_url(conf):
- return reverse('patchwork.views.confirm', kwargs = {'key': conf.key})
+ return reverse('patchwork.views.confirm', kwargs={'key': conf.key})
+
class TestUser(object):
firstname = 'Test'
email = 'test@example.com'
password = 'foobar'
+
class RegistrationTest(TestCase):
+
def setUp(self):
self.user = TestUser()
self.client = Client()
response = self.client.post('/register/', data)
self.assertEqual(response.status_code, 200)
self.assertFormError(response, 'form', 'username',
- 'This username is already taken. Please choose another.')
+ 'This username is already taken. Please choose another.')
def testExistingEmail(self):
user = create_user()
response = self.client.post('/register/', data)
self.assertEqual(response.status_code, 200)
self.assertFormError(response, 'form', 'email',
- 'This email address is already in use ' + \
- 'for the account "%s".\n' % user.username)
+ 'This email address is already in use ' +
+ 'for the account "%s".\n' % user.username)
def testValidRegistration(self):
response = self.client.post('/register/', self.default_data)
self.assertContains(response, 'confirmation email has been sent')
# check for presence of an inactive user object
- users = User.objects.filter(username = self.user.username)
+ users = User.objects.filter(username=self.user.username)
self.assertEqual(users.count(), 1)
user = users[0]
self.assertEqual(user.username, self.user.username)
self.assertEqual(user.is_active, False)
# check for confirmation object
- confs = EmailConfirmation.objects.filter(user = user,
- type = 'registration')
+ confs = EmailConfirmation.objects.filter(user=user,
+ type='registration')
self.assertEqual(len(confs), 1)
conf = confs[0]
self.assertEqual(conf.email, self.user.email)
response = self.client.get(_confirmation_url(conf))
self.assertEqual(response.status_code, 200)
+
class RegistrationConfirmationTest(TestCase):
def setUp(self):
response = self.client.get(_confirmation_url(conf))
self.assertEqual(response.status_code, 200)
- self.assertTemplateUsed(response, 'patchwork/registration-confirm.html')
+ self.assertTemplateUsed(
+ response, 'patchwork/registration-confirm.html')
- conf = EmailConfirmation.objects.get(pk = conf.pk)
+ conf = EmailConfirmation.objects.get(pk=conf.pk)
self.assertTrue(conf.user.is_active)
self.assertFalse(conf.active)
response = self.client.get(_confirmation_url(conf))
self.assertEqual(response.status_code, 200)
- qs = Person.objects.filter(email = self.user.email)
+ qs = Person.objects.filter(email=self.user.email)
self.assertTrue(qs.exists())
- person = Person.objects.get(email = self.user.email)
+ person = Person.objects.get(email=self.user.email)
self.assertEqual(person.name,
- self.user.firstname + ' ' + self.user.lastname)
+ self.user.firstname + ' ' + self.user.lastname)
def testRegistrationExistingPersonSetup(self):
""" Check that the person object created after registration has the
correct details """
- fullname = self.user.firstname + ' ' + self.user.lastname
- person = Person(name = fullname, email = self.user.email)
+ fullname = self.user.firstname + ' ' + self.user.lastname
+ person = Person(name=fullname, email=self.user.email)
person.save()
# register
response = self.client.get(_confirmation_url(conf))
self.assertEqual(response.status_code, 200)
- person = Person.objects.get(email = self.user.email)
+ person = Person.objects.get(email=self.user.email)
self.assertEqual(person.name, fullname)
""" Check that an unconfirmed registration can't modify an existing
Person object"""
- fullname = self.user.firstname + ' ' + self.user.lastname
- person = Person(name = fullname, email = self.user.email)
+ fullname = self.user.firstname + ' ' + self.user.lastname
+ person = Person(name=fullname, email=self.user.email)
person.save()
# register
response = self.client.post('/register/', data)
self.assertEqual(response.status_code, 200)
- self.assertEqual(Person.objects.get(pk = person.pk).name, fullname)
+ self.assertEqual(Person.objects.get(pk=person.pk).name, fullname)
def assertTagsEqual(self, str, acks, reviews, tests):
counts = extract_tags(str, Tag.objects.all())
self.assertEqual((acks, reviews, tests),
- (counts[Tag.objects.get(name='Acked-by')],
- counts[Tag.objects.get(name='Reviewed-by')],
- counts[Tag.objects.get(name='Tested-by')]))
+ (counts[Tag.objects.get(name='Acked-by')],
+ counts[Tag.objects.get(name='Reviewed-by')],
+ counts[Tag.objects.get(name='Tested-by')]))
def testEmpty(self):
self.assertTagsEqual("", 0, 0, 0)
def testMultipleTypes(self):
str = "Acked-by: %s\nAcked-by: %s\nReviewed-by: %s\n" % (
- (self.name_email,) * 3)
+ (self.name_email,) * 3)
self.assertTagsEqual(str, 2, 1, 0)
def testLower(self):
def testAckInReply(self):
self.assertTagsEqual("> Acked-by: %s\n" % self.name_email, 0, 0, 0)
+
class PatchTagsTest(TransactionTestCase):
ACK = 1
REVIEW = 2
self.assertEqual(counts, (acks, reviews, tests))
- def create_tag(self, tagtype = None):
+ def create_tag(self, tagtype=None):
tags = {
self.ACK: 'Acked',
self.REVIEW: 'Reviewed',
return ''
return '%s-by: %s\n' % (tags[tagtype], self.tagger)
- def create_tag_comment(self, patch, tagtype = None):
+ def create_tag_comment(self, patch, tagtype=None):
comment = Comment(patch=patch, msgid=str(datetime.datetime.now()),
- submitter=defaults.patch_author_person,
- content=self.create_tag(tagtype))
+ submitter=defaults.patch_author_person,
+ content=self.create_tag(tagtype))
comment.save()
return comment
def setUp(self):
settings.DEBUG = True
project = Project(linkname='test-project', name='Test Project',
- use_tags=True)
+ use_tags=True)
project.save()
defaults.patch_author_person.save()
self.patch = Patch(project=project,
c1.save()
self.assertTagsEqual(self.patch, 1, 1, 0)
+
class PatchTagManagerTest(PatchTagsTest):
def assertTagsEqual(self, patch, acks, reviews, tests):
# projects table.
with self.assertNumQueries(2):
patch = Patch.objects.with_tag_counts(project=patch.project) \
- .get(pk = patch.pk)
+ .get(pk=patch.pk)
counts = (
getattr(patch, tagattrs['Acked-by']),
)
self.assertEqual(counts, (acks, reviews, tests))
-
def setUp(self):
defaults.project.save()
self.user = create_maintainer(defaults.project)
- self.client.login(username = self.user.username,
- password = self.user.username)
+ self.client.login(username=self.user.username,
+ password=self.user.username)
self.properties_form_id = 'patchform-properties'
self.url = reverse(
- 'patchwork.views.patch.list', args = [defaults.project.linkname])
+ 'patchwork.views.patch.list', args=[defaults.project.linkname])
self.base_data = {
'action': 'Update', 'project': str(defaults.project.id),
'form': 'patchlistform', 'archived': '*', 'delegate': '*',
'state': '*'}
self.patches = []
for name in ['patch one', 'patch two', 'patch three']:
- patch = Patch(project = defaults.project, msgid = name,
- name = name, content = '',
- submitter = Person.objects.get(user = self.user))
+ patch = Patch(project=defaults.project, msgid=name,
+ name=name, content='',
+ submitter=Person.objects.get(user=self.user))
patch.save()
self.patches.append(patch)
self._selectAllPatches(data)
response = self.client.post(self.url, data)
self.assertContains(response, 'No patches to display',
- status_code = 200)
- for patch in [Patch.objects.get(pk = p.pk) for p in self.patches]:
+ status_code=200)
+ for patch in [Patch.objects.get(pk=p.pk) for p in self.patches]:
self.assertTrue(patch.archived)
def testUnArchivingPatches(self):
self._selectAllPatches(data)
response = self.client.post(self.url, data)
self.assertContains(response, self.properties_form_id,
- status_code = 200)
- for patch in [Patch.objects.get(pk = p.pk) for p in self.patches]:
+ status_code=200)
+ for patch in [Patch.objects.get(pk=p.pk) for p in self.patches]:
self.assertFalse(patch.archived)
def _testStateChange(self, state):
self._selectAllPatches(data)
response = self.client.post(self.url, data)
self.assertContains(response, self.properties_form_id,
- status_code = 200)
+ status_code=200)
return response
def testStateChangeValid(self):
states = [patch.state.pk for patch in self.patches]
- state = State.objects.exclude(pk__in = states)[0]
+ state = State.objects.exclude(pk__in=states)[0]
self._testStateChange(state.pk)
for p in self.patches:
- self.assertEqual(Patch.objects.get(pk = p.pk).state, state)
+ self.assertEqual(Patch.objects.get(pk=p.pk).state, state)
def testStateChangeInvalid(self):
- state = max(State.objects.all().values_list('id', flat = True)) + 1
+ state = max(State.objects.all().values_list('id', flat=True)) + 1
orig_states = [patch.state for patch in self.patches]
response = self._testStateChange(state)
- self.assertEqual( \
- [Patch.objects.get(pk = p.pk).state for p in self.patches],
- orig_states)
+ self.assertEqual(
+ [Patch.objects.get(pk=p.pk).state for p in self.patches],
+ orig_states)
self.assertFormError(response, 'patchform', 'state',
- 'Select a valid choice. That choice is not one ' + \
- 'of the available choices.')
+ 'Select a valid choice. That choice is not one ' +
+ 'of the available choices.')
def _testDelegateChange(self, delegate_str):
data = self.base_data.copy()
delegate = create_maintainer(defaults.project)
response = self._testDelegateChange(str(delegate.pk))
for p in self.patches:
- self.assertEqual(Patch.objects.get(pk = p.pk).delegate, delegate)
+ self.assertEqual(Patch.objects.get(pk=p.pk).delegate, delegate)
def testDelegateClear(self):
response = self._testDelegateChange('')
for p in self.patches:
- self.assertEqual(Patch.objects.get(pk = p.pk).delegate, None)
+ self.assertEqual(Patch.objects.get(pk=p.pk).delegate, None)
def _confirmation_url(conf):
- return reverse('patchwork.views.confirm', kwargs = {'key': conf.key})
+ return reverse('patchwork.views.confirm', kwargs={'key': conf.key})
+
class TestUser(object):
class UserPersonRequestTest(TestCase):
+
def setUp(self):
self.user = TestUser()
- self.client.login(username = self.user.username,
- password = self.user.password)
+ self.client.login(username=self.user.username,
+ password=self.user.password)
EmailConfirmation.objects.all().delete()
def testUserPersonRequestForm(self):
self.assertEqual(response.status_code, 200)
self.assertTrue(response.context['linkform'])
self.assertFormError(response, 'linkform', 'email',
- 'This field is required.')
+ 'This field is required.')
def testUserPersonRequestInvalid(self):
response = self.client.post('/user/link/', {'email': 'foo'})
self.assertEqual(response.status_code, 200)
self.assertTrue(response.context['linkform'])
self.assertFormError(response, 'linkform', 'email',
- error_strings['email'])
+ error_strings['email'])
def testUserPersonRequestValid(self):
response = self.client.post('/user/link/',
- {'email': self.user.secondary_email})
+ {'email': self.user.secondary_email})
self.assertEqual(response.status_code, 200)
self.assertTrue(response.context['confirmation'])
self.assertEqual(response.status_code, 200)
self.assertTemplateUsed(response, 'patchwork/user-link-confirm.html')
+
class UserPersonConfirmTest(TestCase):
+
def setUp(self):
EmailConfirmation.objects.all().delete()
Person.objects.all().delete()
self.user = TestUser()
- self.client.login(username = self.user.username,
- password = self.user.password)
- self.conf = EmailConfirmation(type = 'userperson',
- email = self.user.secondary_email,
- user = self.user.user)
+ self.client.login(username=self.user.username,
+ password=self.user.password)
+ self.conf = EmailConfirmation(type='userperson',
+ email=self.user.secondary_email,
+ user=self.user.user)
self.conf.save()
def testUserPersonConfirm(self):
# check that the Person object has been created and linked
self.assertEqual(Person.objects.count(), 1)
- person = Person.objects.get(email = self.user.secondary_email)
+ person = Person.objects.get(email=self.user.secondary_email)
self.assertEqual(person.email, self.user.secondary_email)
self.assertEqual(person.user, self.user.user)
# check that the confirmation has been marked as inactive. We
# need to reload the confirmation to check this.
- conf = EmailConfirmation.objects.get(pk = self.conf.pk)
+ conf = EmailConfirmation.objects.get(pk=self.conf.pk)
self.assertEqual(conf.active, False)
+
class UserLoginRedirectTest(TestCase):
def testUserLoginRedirect(self):
response = self.client.get(url)
self.assertRedirects(response, settings.LOGIN_URL + '?next=' + url)
+
class UserProfileTest(TestCase):
def setUp(self):
self.user = TestUser()
- self.client.login(username = self.user.username,
- password = self.user.password)
+ self.client.login(username=self.user.username,
+ password=self.user.password)
def testUserProfile(self):
response = self.client.get('/user/')
project = defaults.project
project.save()
- bundle = Bundle(project = project, name = 'test-1',
- owner = self.user.user)
+ bundle = Bundle(project=project, name='test-1',
+ owner=self.user.user)
bundle.save()
response = self.client.get('/user/')
def testPasswordChangeForm(self):
self.user = TestUser()
- self.client.login(username = self.user.username,
- password = self.user.password)
+ self.client.login(username=self.user.username,
+ password=self.user.password)
response = self.client.get(self.form_url)
self.assertContains(response, 'Change my password')
def testPasswordChange(self):
self.user = TestUser()
- self.client.login(username = self.user.username,
- password = self.user.password)
+ self.client.login(username=self.user.username,
+ password=self.user.password)
old_password = self.user.password
new_password = User.objects.make_random_password()
response = self.client.post(self.form_url, data)
self.assertRedirects(response, self.done_url)
- user = User.objects.get(id = self.user.user.id)
+ user = User.objects.get(id=self.user.user.id)
self.assertFalse(user.check_password(old_password))
self.assertTrue(user.check_password(new_password))
response = self.client.get(self.done_url)
self.assertContains(response,
- "Your password has been changed sucessfully")
+ "Your password has been changed sucessfully")
+
class UserUnlinkTest(TestCase):
+
def setUp(self):
self.form_url = '/user/unlink/{pid}/'
self.done_url = '/user/'
class LoginTestCase(SeleniumTestCase):
+
def setUp(self):
super(LoginTestCase, self).setUp()
self.user = TestUser()
@unittest.skipUnless(settings.ENABLE_XMLRPC,
- "requires xmlrpc interface (use the ENABLE_XMLRPC setting)")
+ "requires xmlrpc interface (use the ENABLE_XMLRPC setting)")
class XMLRPCTest(LiveServerTestCase):
fixtures = ['default_states']
def testGetRedirect(self):
response = self.client.patch(self.url)
self.assertRedirects(response,
- reverse('patchwork.views.help',
- kwargs = {'path': 'pwclient/'}))
+ reverse('patchwork.views.help',
+ kwargs={'path': 'pwclient/'}))
def testList(self):
defaults.project.save()
defaults.patch_author_person.save()
- patch = Patch(project = defaults.project,
- submitter = defaults.patch_author_person,
- msgid = defaults.patch_name,
- content = defaults.patch)
+ patch = Patch(project=defaults.project,
+ submitter=defaults.patch_author_person,
+ msgid=defaults.patch_name,
+ content=defaults.patch)
patch.save()
patches = self.rpc.patch_list()
# helper functions for tests
-_test_mail_dir = os.path.join(os.path.dirname(__file__), 'mail')
+_test_mail_dir = os.path.join(os.path.dirname(__file__), 'mail')
_test_patch_dir = os.path.join(os.path.dirname(__file__), 'patches')
+
class defaults(object):
- project = Project(linkname = 'test-project', name = 'Test Project',
- listid = 'test.example.com')
+ project = Project(linkname='test-project', name='Test Project',
+ listid='test.example.com')
patch_author = 'Patch Author <patch-author@example.com>'
- patch_author_person = Person(name = 'Patch Author',
- email = 'patch-author@example.com')
+ patch_author_person = Person(name='Patch Author',
+ email='patch-author@example.com')
comment_author = 'Comment Author <comment-author@example.com>'
}
_user_idx = 1
+
+
def create_user():
global _user_idx
userid = 'test%d' % _user_idx
user = User.objects.create_user(userid, email, userid)
user.save()
- person = Person(email = email, name = userid, user = user)
+ person = Person(email=email, name=userid, user=user)
person.save()
return user
+
def create_maintainer(project):
user = create_user()
profile = user.profile
profile.save()
return user
+
def find_in_context(context, key):
if isinstance(context, list):
for c in context:
return context[key]
return None
-def read_patch(filename, encoding = None):
+
+def read_patch(filename, encoding=None):
file_path = os.path.join(_test_patch_dir, filename)
if encoding is not None:
- f = codecs.open(file_path, encoding = encoding)
+ f = codecs.open(file_path, encoding=encoding)
else:
f = open(file_path)
return f.read()
-def read_mail(filename, project = None):
+
+def read_mail(filename, project=None):
file_path = os.path.join(_test_mail_dir, filename)
mail = message_from_file(open(file_path))
if 'Message-Id' not in mail:
mail['List-Id'] = project.listid
return mail
-def create_email(content, subject = None, sender = None, multipart = False,
- project = None, content_encoding = None):
+
+def create_email(content, subject=None, sender=None, multipart=False,
+ project=None, content_encoding=None):
if subject is None:
subject = defaults.subject
if sender is None:
if multipart:
msg = MIMEMultipart()
- body = MIMEText(content, _subtype = 'plain',
- _charset = content_encoding)
+ body = MIMEText(content, _subtype='plain',
+ _charset=content_encoding)
msg.attach(body)
else:
- msg = MIMEText(content, _charset = content_encoding)
+ msg = MIMEText(content, _charset=content_encoding)
msg['Message-Id'] = make_msgid()
msg['Subject'] = subject
admin.autodiscover()
-urlpatterns = patterns('',
+urlpatterns = patterns(
+ '',
url(r'^admin/', include(admin.site.urls)),
(r'^$', 'patchwork.views.projects'),
# password change
url(r'^user/password-change/$', auth_views.password_change,
- name='password_change'),
+ name='password_change'),
url(r'^user/password-change/done/$', auth_views.password_change_done,
- name='password_change_done'),
+ name='password_change_done'),
url(r'^user/password-reset/$', auth_views.password_reset,
- name='password_reset'),
+ name='password_reset'),
url(r'^user/password-reset/mail-sent/$', auth_views.password_reset_done,
- name='password_reset_done'),
+ name='password_reset_done'),
url(r'^user/password-reset/(?P<uidb64>[0-9A-Za-z_\-]+)/'
- r'(?P<token>[0-9A-Za-z]{1,13}-[0-9A-Za-z]{1,20})/$',
- auth_views.password_reset_confirm,
- name='password_reset_confirm'),
+ r'(?P<token>[0-9A-Za-z]{1,13}-[0-9A-Za-z]{1,20})/$',
+ auth_views.password_reset_confirm,
+ name='password_reset_confirm'),
url(r'^user/password-reset/complete/$',
- auth_views.password_reset_complete,
- name='password_reset_complete'),
+ auth_views.password_reset_complete,
+ name='password_reset_complete'),
# login/logout
url(r'^user/login/$', auth_views.login,
{'template_name': 'patchwork/login.html'},
- name = 'auth_login'),
+ name='auth_login'),
url(r'^user/logout/$', auth_views.logout,
{'template_name': 'patchwork/logout.html'},
- name = 'auth_logout'),
+ name='auth_logout'),
# registration
(r'^register/', 'patchwork.views.user.register'),
# public view for bundles
(r'^bundle/(?P<username>[^/]*)/(?P<bundlename>[^/]*)/$',
- 'patchwork.views.bundle.bundle'),
+ 'patchwork.views.bundle.bundle'),
(r'^bundle/(?P<username>[^/]*)/(?P<bundlename>[^/]*)/mbox/$',
- 'patchwork.views.bundle.mbox'),
+ 'patchwork.views.bundle.mbox'),
(r'^confirm/(?P<key>[0-9a-f]+)/$', 'patchwork.views.confirm'),
)
if settings.ENABLE_XMLRPC:
- urlpatterns += patterns('',
+ urlpatterns += patterns(
+ '',
(r'xmlrpc/$', 'patchwork.views.xmlrpc.xmlrpc'),
(r'^pwclient/$', 'patchwork.views.pwclient'),
(r'^project/(?P<project_id>[^/]+)/pwclientrc/$',
- 'patchwork.views.pwclientrc'),
+ 'patchwork.views.pwclientrc'),
)
# redirect from old urls
if settings.COMPAT_REDIR:
- urlpatterns += patterns('',
+ urlpatterns += patterns(
+ '',
(r'^user/bundle/(?P<bundle_id>[^/]+)/$',
'patchwork.views.bundle.bundle_redir'),
(r'^user/bundle/(?P<bundle_id>[^/]+)/mbox/$',
'patchwork.views.bundle.mbox_redir'),
)
-
PatchChangeNotification, EmailOptout,
EmailConfirmation)
-def get_patch_ids(d, prefix = 'patch_id'):
+
+def get_patch_ids(d, prefix='patch_id'):
ids = []
for (k, v) in d.items():
return ids
+
class Order(object):
order_map = {
'date': 'date',
}
default_order = ('date', True)
- def __init__(self, str = None, editable = False):
+ def __init__(self, str=None, editable=False):
self.reversed = False
self.editable = editable
(self.order, self.reversed) = self.default_order
return qs.order_by(*orders)
bundle_actions = ['create', 'add', 'remove']
+
+
def set_bundle(user, project, action, data, patches, context):
# set up the bundle
bundle = None
if not bundle_name:
return ['No bundle name was specified']
- if Bundle.objects.filter(owner = user, name = bundle_name).count() > 0:
+ if Bundle.objects.filter(owner=user, name=bundle_name).count() > 0:
return ['You already have a bundle called "%s"' % bundle_name]
- bundle = Bundle(owner = user, project = project,
- name = bundle_name)
+ bundle = Bundle(owner=user, project=project,
+ name=bundle_name)
bundle.save()
context.add_message("Bundle %s created" % bundle.name)
- elif action =='add':
- bundle = get_object_or_404(Bundle, id = data['bundle_id'])
+ elif action == 'add':
+ bundle = get_object_or_404(Bundle, id=data['bundle_id'])
- elif action =='remove':
- bundle = get_object_or_404(Bundle, id = data['removed_bundle_id'])
+ elif action == 'remove':
+ bundle = get_object_or_404(Bundle, id=data['removed_bundle_id'])
if not bundle:
return ['no such bundle']
for patch in patches:
if action == 'create' or action == 'add':
- bundlepatch_count = BundlePatch.objects.filter(bundle = bundle,
- patch = patch).count()
+ bundlepatch_count = BundlePatch.objects.filter(bundle=bundle,
+ patch=patch).count()
if bundlepatch_count == 0:
bundle.append_patch(patch)
- context.add_message("Patch '%s' added to bundle %s" % \
- (patch.name, bundle.name))
+ context.add_message("Patch '%s' added to bundle %s" %
+ (patch.name, bundle.name))
else:
- context.add_message("Patch '%s' already in bundle %s" % \
- (patch.name, bundle.name))
+ context.add_message("Patch '%s' already in bundle %s" %
+ (patch.name, bundle.name))
elif action == 'remove':
try:
- bp = BundlePatch.objects.get(bundle = bundle, patch = patch)
+ bp = BundlePatch.objects.get(bundle=bundle, patch=patch)
bp.delete()
- context.add_message("Patch '%s' removed from bundle %s\n" % \
- (patch.name, bundle.name))
+ context.add_message("Patch '%s' removed from bundle %s\n" %
+ (patch.name, bundle.name))
except Exception:
pass
return []
+
def send_notifications():
date_limit = datetime.datetime.now() - \
- datetime.timedelta(minutes =
- settings.NOTIFICATION_DELAY_MINUTES)
+ datetime.timedelta(minutes=settings.NOTIFICATION_DELAY_MINUTES)
# This gets funky: we want to filter out any notifications that should
# be grouped with other notifications that aren't ready to go out yet. To
# Person -> Patch -> max(PCN.last_modified)), filtering out any maxima
# that are with the date_limit.
qs = PatchChangeNotification.objects \
- .annotate(m = Max('patch__submitter__patch__patchchangenotification'
+ .annotate(m=Max('patch__submitter__patch__patchchangenotification'
'__last_modified')) \
- .filter(m__lt = date_limit)
+ .filter(m__lt=date_limit)
groups = itertools.groupby(qs.order_by('patch__submitter'),
lambda n: n.patch.submitter)
for (recipient, notifications) in groups:
notifications = list(notifications)
- projects = set([ n.patch.project.linkname for n in notifications ])
+ projects = set([n.patch.project.linkname for n in notifications])
def delete_notifications():
- pks = [ n.pk for n in notifications ]
- PatchChangeNotification.objects.filter(pk__in = pks).delete()
+ pks = [n.pk for n in notifications]
+ PatchChangeNotification.objects.filter(pk__in=pks).delete()
if EmailOptout.is_optout(recipient.email):
delete_notifications()
}
subject = render_to_string(
- 'patchwork/patch-change-notification-subject.text',
- context).strip()
+ 'patchwork/patch-change-notification-subject.text',
+ context).strip()
content = render_to_string('patchwork/patch-change-notification.mail',
- context)
+ context)
- message = EmailMessage(subject = subject, body = content,
- from_email = settings.NOTIFICATION_FROM_EMAIL,
- to = [recipient.email],
- headers = {'Precedence': 'bulk'})
+ message = EmailMessage(subject=subject, body=content,
+ from_email=settings.NOTIFICATION_FROM_EMAIL,
+ to=[recipient.email],
+ headers={'Precedence': 'bulk'})
try:
message.send()
return errors
+
def do_expiry():
# expire any pending confirmations
- q = (Q(date__lt = datetime.datetime.now() - EmailConfirmation.validity) |
- Q(active = False))
+ q = (Q(date__lt=datetime.datetime.now() - EmailConfirmation.validity) |
+ Q(active=False))
EmailConfirmation.objects.filter(q).delete()
# expire inactive users with no pending confirmation
pending_confs = EmailConfirmation.objects.values('user')
users = User.objects.filter(
- is_active = False,
- last_login = F('date_joined')
- ).exclude(
- id__in = pending_confs
- )
+ is_active=False,
+ last_login=F('date_joined')
+ ).exclude(
+ id__in=pending_confs
+ )
# delete users
users.delete()
-
-
-
from patchwork.forms import MultiplePatchForm
from patchwork.models import Comment
+
def generic_list(request, project, view,
- view_args = {}, filter_settings = [], patches = None,
- editable_order = False):
+ view_args={}, filter_settings=[], patches=None,
+ editable_order=False):
context = PatchworkRequestContext(request,
- list_view = view,
- list_view_params = view_args)
+ list_view=view,
+ list_view_params=view_args)
context.project = project
data = {}
if data and data.get('form', '') == 'patchlistform':
data_tmp = data
- properties_form = MultiplePatchForm(project, data = data_tmp)
+ properties_form = MultiplePatchForm(project, data=data_tmp)
if request.method == 'POST' and data.get('form') == 'patchlistform':
action = data.get('action', '').lower()
if data.get('bundle_name', False):
action = 'create'
- ps = Patch.objects.filter(id__in = get_patch_ids(data))
+ ps = Patch.objects.filter(id__in=get_patch_ids(data))
if action in bundle_actions:
errors = set_bundle(user, project, action, data, ps, context)
paginator = Paginator(request, patches)
context.update({
- 'page': paginator.current_page,
- 'patchform': properties_form,
- 'project': project,
- 'order': order,
- })
+ 'page': paginator.current_page,
+ 'patchform': properties_form,
+ 'project': project,
+ 'order': order,
+ })
return context
for patch in patches:
if not patch.is_editable(user):
errors.append("You don't have permissions to edit patch '%s'"
- % patch.name)
+ % patch.name)
continue
changed_patches += 1
return errors
+
class PatchMbox(MIMENonMultipart):
patch_charset = 'utf-8'
+
def __init__(self, _text):
MIMENonMultipart.__init__(self, 'text', 'plain',
- **{'charset': self.patch_charset})
+ **{'charset': self.patch_charset})
self.set_payload(_text.encode(self.patch_charset))
encode_7or8bit(self)
+
def patch_to_mbox(patch):
postscript_re = re.compile('\n-{2,3} ?\n')
comment = None
try:
- comment = Comment.objects.get(patch = patch, msgid = patch.msgid)
+ comment = Comment.objects.get(patch=patch, msgid=patch.msgid)
except Exception:
pass
else:
postscript = ''
- for comment in Comment.objects.filter(patch = patch) \
- .exclude(msgid = patch.msgid):
+ for comment in Comment.objects.filter(patch=patch) \
+ .exclude(msgid=patch.msgid):
body += comment.patch_responses()
if postscript:
body += '\n' + patch.content
delta = patch.date - datetime.datetime.utcfromtimestamp(0)
- utc_timestamp = delta.seconds + delta.days*24*3600
+ utc_timestamp = delta.seconds + delta.days * 24 * 3600
mail = PatchMbox(body)
mail['Subject'] = patch.name
mail['From'] = email.utils.formataddr((
- str(Header(patch.submitter.name, mail.patch_charset)),
- patch.submitter.email))
+ str(Header(patch.submitter.name, mail.patch_charset)),
+ patch.submitter.email))
mail['X-Patchwork-Id'] = str(patch.id)
mail['Message-Id'] = patch.msgid
mail.set_unixfrom('From patchwork ' + patch.date.ctime())
-
copied_headers = ['To', 'Cc', 'Date']
orig_headers = HeaderParser().parsestr(str(patch.headers))
for header in copied_headers:
from patchwork.models import Patch, Project, Person, EmailConfirmation
from patchwork.requestcontext import PatchworkRequestContext
+
def projects(request):
context = PatchworkRequestContext(request)
projects = Project.objects.all()
if projects.count() == 1:
return HttpResponseRedirect(
- urlresolvers.reverse('patchwork.views.patch.list',
- kwargs = {'project_id': projects[0].linkname}))
+ urlresolvers.reverse('patchwork.views.patch.list',
+ kwargs={'project_id': projects[0].linkname}))
context['projects'] = projects
return render_to_response('patchwork/projects.html', context)
+
def pwclientrc(request, project_id):
- project = get_object_or_404(Project, linkname = project_id)
+ project = get_object_or_404(Project, linkname=project_id)
context = PatchworkRequestContext(request)
context.project = project
if settings.FORCE_HTTPS_LINKS or request.is_secure():
context['scheme'] = 'https'
else:
context['scheme'] = 'http'
- response = HttpResponse(content_type = "text/plain")
+ response = HttpResponse(content_type="text/plain")
response['Content-Disposition'] = 'attachment; filename=.pwclientrc'
response.write(render_to_string('patchwork/pwclientrc', context))
return response
+
def pwclient(request):
context = PatchworkRequestContext(request)
- response = HttpResponse(content_type = "text/x-python")
+ response = HttpResponse(content_type="text/x-python")
response['Content-Disposition'] = 'attachment; filename=pwclient'
response.write(render_to_string('patchwork/pwclient', context))
return response
+
def confirm(request, key):
- import patchwork.views.user, patchwork.views.mail
+ import patchwork.views.user
+ import patchwork.views.mail
views = {
'userperson': patchwork.views.user.link_confirm,
'registration': patchwork.views.user.register_confirm,
'optin': patchwork.views.mail.optin_confirm,
}
- conf = get_object_or_404(EmailConfirmation, key = key)
+ conf = get_object_or_404(EmailConfirmation, key=key)
if conf.type not in views:
raise Http404
return render_to_response('patchwork/confirm-error.html', context)
+
def submitter_complete(request):
search = request.GET.get('q', '')
limit = request.GET.get('l', None)
if len(search) <= 3:
return HttpResponse(content_type="application/json")
- queryset = Person.objects.filter(Q(name__icontains = search) |
- Q(email__icontains = search))
+ queryset = Person.objects.filter(Q(name__icontains=search) |
+ Q(email__icontains=search))
if limit is not None:
try:
limit = int(limit)
limit = None
if limit is not None and limit > 0:
- queryset = queryset[:limit]
+ queryset = queryset[:limit]
data = []
for submitter in queryset:
help_pages = {'': 'index.html',
'about/': 'about.html',
- }
+ }
if settings.ENABLE_XMLRPC:
help_pages['pwclient/'] = 'pwclient.html'
+
def help(request, path):
context = PatchworkRequestContext(request)
if path in help_pages:
return render_to_response('patchwork/help/' + help_pages[path], context)
raise Http404
-
from patchwork.utils import get_patch_ids
from patchwork.views import generic_list, patch_to_mbox
+
@login_required
def setbundle(request):
context = PatchworkRequestContext(request)
pass
elif action == 'create':
project = get_object_or_404(Project,
- id = request.POST.get('project'))
- bundle = Bundle(owner = request.user, project = project,
- name = request.POST['name'])
+ id=request.POST.get('project'))
+ bundle = Bundle(owner=request.user, project=project,
+ name=request.POST['name'])
bundle.save()
patch_id = request.POST.get('patch_id', None)
if patch_id:
- patch = get_object_or_404(Patch, id = patch_id)
+ patch = get_object_or_404(Patch, id=patch_id)
try:
bundle.append_patch(patch)
except Exception:
bundle.save()
elif action == 'add':
bundle = get_object_or_404(Bundle,
- owner = request.user, id = request.POST['id'])
+ owner=request.user, id=request.POST['id'])
bundle.save()
patch_id = request.get('patch_id', None)
for id in patch_ids:
try:
- patch = Patch.objects.get(id = id)
+ patch = Patch.objects.get(id=id)
bundle.append_patch(patch)
except:
pass
bundle.save()
elif action == 'delete':
try:
- bundle = Bundle.objects.get(owner = request.user,
- id = request.POST['id'])
+ bundle = Bundle.objects.get(owner=request.user,
+ id=request.POST['id'])
bundle.delete()
except Exception:
pass
bundle = None
else:
- bundle = get_object_or_404(Bundle, owner = request.user,
- id = request.POST['bundle_id'])
+ bundle = get_object_or_404(Bundle, owner=request.user,
+ id=request.POST['bundle_id'])
if 'error' in context:
pass
if bundle:
return HttpResponseRedirect(
- django.core.urlresolvers.reverse(
- 'patchwork.views.bundle.bundle',
- kwargs = {'bundle_id': bundle.id}
- )
- )
+ django.core.urlresolvers.reverse(
+ 'patchwork.views.bundle.bundle',
+ kwargs={'bundle_id': bundle.id}
+ )
+ )
else:
return HttpResponseRedirect(
- django.core.urlresolvers.reverse(
- 'patchwork.views.bundle.list')
- )
+ django.core.urlresolvers.reverse(
+ 'patchwork.views.bundle.list')
+ )
+
@login_required
def bundles(request):
form = DeleteBundleForm(request.POST)
if form.is_valid():
bundle = get_object_or_404(Bundle,
- id = form.cleaned_data['bundle_id'])
+ id=form.cleaned_data['bundle_id'])
bundle.delete()
- bundles = Bundle.objects.filter(owner = request.user)
+ bundles = Bundle.objects.filter(owner=request.user)
for bundle in bundles:
- bundle.delete_form = DeleteBundleForm(auto_id = False,
- initial = {'bundle_id': bundle.id})
+ bundle.delete_form = DeleteBundleForm(auto_id=False,
+ initial={'bundle_id': bundle.id})
context['bundles'] = bundles
return render_to_response('patchwork/bundles.html', context)
+
def bundle(request, username, bundlename):
- bundle = get_object_or_404(Bundle, owner__username = username,
- name = bundlename)
+ bundle = get_object_or_404(Bundle, owner__username=username,
+ name=bundlename)
filter_settings = [(DelegateFilter, DelegateFilter.AnyDelegate)]
is_owner = request.user == bundle.owner
if action == 'delete':
bundle.delete()
return HttpResponseRedirect(
- django.core.urlresolvers.reverse(
- 'patchwork.views.user.profile')
- )
+ django.core.urlresolvers.reverse(
+ 'patchwork.views.user.profile')
+ )
elif action == 'update':
- form = BundleForm(request.POST, instance = bundle)
+ form = BundleForm(request.POST, instance=bundle)
if form.is_valid():
form.save()
# if we've changed the bundle name, redirect to new URL
- bundle = Bundle.objects.get(pk = bundle.pk)
+ bundle = Bundle.objects.get(pk=bundle.pk)
if bundle.name != bundlename:
return HttpResponseRedirect(bundle.get_absolute_url())
else:
- form = BundleForm(instance = bundle)
+ form = BundleForm(instance=bundle)
else:
- form = BundleForm(instance = bundle)
+ form = BundleForm(instance=bundle)
if request.method == 'POST' and \
- request.POST.get('form') == 'reorderform':
- order = get_object_or_404(BundlePatch, bundle = bundle,
- patch__id = request.POST.get('order_start')).order
+ request.POST.get('form') == 'reorderform':
+ order = get_object_or_404(BundlePatch, bundle=bundle,
+ patch__id=request.POST.get('order_start')).order
for patch_id in request.POST.getlist('neworder'):
bundlepatch = get_object_or_404(BundlePatch,
- bundle = bundle, patch__id = patch_id)
+ bundle=bundle, patch__id=patch_id)
bundlepatch.order = order
bundlepatch.save()
order += 1
form = None
context = generic_list(request, bundle.project,
- 'patchwork.views.bundle.bundle',
- view_args = {'username': bundle.owner.username,
- 'bundlename': bundle.name},
- filter_settings = filter_settings,
- patches = bundle.ordered_patches(),
- editable_order = is_owner)
+ 'patchwork.views.bundle.bundle',
+ view_args={'username': bundle.owner.username,
+ 'bundlename': bundle.name},
+ filter_settings=filter_settings,
+ patches=bundle.ordered_patches(),
+ editable_order=is_owner)
context['bundle'] = bundle
context['bundleform'] = form
return render_to_response('patchwork/bundle.html', context)
+
def mbox(request, username, bundlename):
- bundle = get_object_or_404(Bundle, owner__username = username,
- name = bundlename)
+ bundle = get_object_or_404(Bundle, owner__username=username,
+ name=bundlename)
if not (request.user == bundle.owner or bundle.public):
return HttpResponseNotFound()
mbox = '\n'.join([patch_to_mbox(p).as_string(True)
- for p in bundle.ordered_patches()])
+ for p in bundle.ordered_patches()])
response = HttpResponse(content_type='text/plain')
response['Content-Disposition'] = \
- 'attachment; filename=bundle-%d-%s.mbox' % (bundle.id, bundle.name)
+ 'attachment; filename=bundle-%d-%s.mbox' % (bundle.id, bundle.name)
response.write(mbox)
return response
+
@login_required
def bundle_redir(request, bundle_id):
- bundle = get_object_or_404(Bundle, id = bundle_id, owner = request.user)
+ bundle = get_object_or_404(Bundle, id=bundle_id, owner=request.user)
return HttpResponseRedirect(bundle.get_absolute_url())
+
@login_required
def mbox_redir(request, bundle_id):
- bundle = get_object_or_404(Bundle, id = bundle_id, owner = request.user)
+ bundle = get_object_or_404(Bundle, id=bundle_id, owner=request.user)
return HttpResponseRedirect(django.core.urlresolvers.reverse(
- 'patchwork.views.bundle.mbox', kwargs = {
+ 'patchwork.views.bundle.mbox', kwargs={
'username': request.user.username,
'bundlename': bundle.name,
}))
-
-
-
from patchwork.models import EmailOptout, EmailConfirmation
from patchwork.requestcontext import PatchworkRequestContext
+
def settings(request):
context = PatchworkRequestContext(request)
if request.method == 'POST':
- form = EmailForm(data = request.POST)
+ form = EmailForm(data=request.POST)
if form.is_valid():
email = form.cleaned_data['email']
- is_optout = EmailOptout.objects.filter(email = email).count() > 0
+ is_optout = EmailOptout.objects.filter(email=email).count() > 0
context.update({
'email': email,
'is_optout': is_optout,
context['form'] = form
return render_to_response('patchwork/mail-form.html', context)
+
def optout_confirm(request, conf):
context = PatchworkRequestContext(request)
email = conf.email.strip().lower()
# silently ignore duplicated optouts
- if EmailOptout.objects.filter(email = email).count() == 0:
- optout = EmailOptout(email = email)
+ if EmailOptout.objects.filter(email=email).count() == 0:
+ optout = EmailOptout(email=email)
optout.save()
conf.deactivate()
return render_to_response('patchwork/optout.html', context)
+
def optin_confirm(request, conf):
context = PatchworkRequestContext(request)
email = conf.email.strip().lower()
- EmailOptout.objects.filter(email = email).delete()
+ EmailOptout.objects.filter(email=email).delete()
conf.deactivate()
context['email'] = conf.email
return render_to_response('patchwork/optin.html', context)
+
def optinout(request, action, description):
context = PatchworkRequestContext(request)
if request.method != 'POST':
return HttpResponseRedirect(reverse(settings))
- form = OptinoutRequestForm(data = request.POST)
+ form = OptinoutRequestForm(data=request.POST)
if not form.is_valid():
context['error'] = ('There was an error in the %s form. ' +
- 'Please review the form and re-submit.') % \
- description
+ 'Please review the form and re-submit.') % \
+ description
context['form'] = form
return render_to_response(html_template, context)
email = form.cleaned_data['email']
if action == 'optin' and \
- EmailOptout.objects.filter(email = email).count() == 0:
+ EmailOptout.objects.filter(email=email).count() == 0:
context['error'] = ('The email address %s is not on the ' +
'patchwork opt-out list, so you don\'t ' +
'need to opt back in') % email
context['form'] = form
return render_to_response(html_template, context)
- conf = EmailConfirmation(type = action, email = email)
+ conf = EmailConfirmation(type=action, email=email)
conf.save()
context['confirmation'] = conf
mail = render_to_string(mail_template, context)
try:
send_mail('Patchwork %s confirmation' % description, mail,
- conf_settings.DEFAULT_FROM_EMAIL, [email])
+ conf_settings.DEFAULT_FROM_EMAIL, [email])
context['email'] = mail
context['email_sent'] = True
except Exception as ex:
return render_to_response(html_template, context)
+
def optout(request):
return optinout(request, 'optout', 'opt-out')
+
def optin(request):
return optinout(request, 'optin', 'opt-in')
from patchwork.requestcontext import PatchworkRequestContext
from patchwork.views import generic_list, patch_to_mbox
+
def patch(request, patch_id):
context = PatchworkRequestContext(request)
patch = get_object_or_404(Patch, id=patch_id)
createbundleform = None
if editable:
- form = PatchForm(instance = patch)
+ form = PatchForm(instance=patch)
if request.user.is_authenticated():
createbundleform = CreateBundleForm()
action = action.lower()
if action == 'createbundle':
- bundle = Bundle(owner = request.user, project = patch.project)
- createbundleform = CreateBundleForm(instance = bundle,
- data = request.POST)
+ bundle = Bundle(owner=request.user, project=patch.project)
+ createbundleform = CreateBundleForm(instance=bundle,
+ data=request.POST)
if createbundleform.is_valid():
createbundleform.save()
bundle.append_patch(patch)
context.add_message('Bundle %s created' % bundle.name)
elif action == 'addtobundle':
- bundle = get_object_or_404(Bundle, id = \
- request.POST.get('bundle_id'))
+ bundle = get_object_or_404(
+ Bundle, id=request.POST.get('bundle_id'))
try:
bundle.append_patch(patch)
bundle.save()
context.add_message('Patch added to bundle "%s"' % bundle.name)
except Exception as ex:
- context.add_message("Couldn't add patch '%s' to bundle %s: %s" \
- % (patch.name, bundle.name, ex.message))
+ context.add_message("Couldn't add patch '%s' to bundle %s: %s"
+ % (patch.name, bundle.name, ex.message))
# all other actions require edit privs
elif not editable:
return HttpResponseForbidden()
elif action is None:
- form = PatchForm(data = request.POST, instance = patch)
+ form = PatchForm(data=request.POST, instance=patch)
if form.is_valid():
form.save()
context.add_message('Patch updated')
return render_to_response('patchwork/patch.html', context)
+
def content(request, patch_id):
patch = get_object_or_404(Patch, id=patch_id)
response = HttpResponse(content_type="text/x-patch")
patch.filename().replace(';', '').replace('\n', '')
return response
+
def mbox(request, patch_id):
patch = get_object_or_404(Patch, id=patch_id)
response = HttpResponse(content_type="text/plain")
def list(request, project_id):
project = get_object_or_404(Project, linkname=project_id)
context = generic_list(request, project, 'patchwork.views.patch.list',
- view_args = {'project_id': project.linkname})
+ view_args={'project_id': project.linkname})
return render_to_response('patchwork/list.html', context)
from patchwork.models import Patch, Project
from patchwork.requestcontext import PatchworkRequestContext
+
def project(request, project_id):
context = PatchworkRequestContext(request)
- project = get_object_or_404(Project, linkname = project_id)
+ project = get_object_or_404(Project, linkname=project_id)
context.project = project
- context['maintainers'] = User.objects.filter( \
- profile__maintainer_projects = project)
- context['n_patches'] = Patch.objects.filter(project = project,
- archived = False).count()
- context['n_archived_patches'] = Patch.objects.filter(project = project,
- archived = True).count()
+ context['maintainers'] = User.objects.filter(
+ profile__maintainer_projects=project)
+ context['n_patches'] = Patch.objects.filter(project=project,
+ archived=False).count()
+ context['n_archived_patches'] = Patch.objects.filter(project=project,
+ archived=True).count()
return render_to_response('patchwork/project.html', context)
from patchwork.requestcontext import PatchworkRequestContext
from patchwork.views import generic_list
+
def register(request):
context = PatchworkRequestContext(request)
if request.method == 'POST':
user = auth.models.User.objects.create_user(data['username'],
data['email'],
data['password'])
- user.is_active = False;
+ user.is_active = False
user.first_name = data.get('first_name', '')
user.last_name = data.get('last_name', '')
user.save()
# create confirmation
- conf = EmailConfirmation(type = 'registration', user = user,
- email = user.email)
+ conf = EmailConfirmation(type='registration', user=user,
+ email=user.email)
conf.save()
# send email
'confirmation': conf}
subject = render_to_string('patchwork/activation_email_subject.txt',
- mail_ctx).replace('\n', ' ').strip()
-
+ mail_ctx).replace('\n', ' ').strip()
+
message = render_to_string('patchwork/activation_email.txt',
- mail_ctx)
-
+ mail_ctx)
+
send_mail(subject, message, settings.DEFAULT_FROM_EMAIL,
- [conf.email])
+ [conf.email])
# setting 'confirmation' in the template indicates success
context['confirmation'] = conf
else:
form = RegistrationForm()
-
+
return render_to_response('patchwork/registration_form.html',
- { 'form': form },
+ {'form': form},
context_instance=context)
+
def register_confirm(request, conf):
conf.user.is_active = True
conf.user.save()
conf.deactivate()
try:
- person = Person.objects.get(email__iexact = conf.user.email)
+ person = Person.objects.get(email__iexact=conf.user.email)
except Person.DoesNotExist:
- person = Person(email = conf.user.email,
- name = conf.user.profile.name())
+ person = Person(email=conf.user.email,
+ name=conf.user.profile.name())
person.user = conf.user
person.save()
return render_to_response('patchwork/registration-confirm.html')
+
@login_required
def profile(request):
context = PatchworkRequestContext(request)
if request.method == 'POST':
- form = UserProfileForm(instance = request.user.profile,
- data = request.POST)
+ form = UserProfileForm(instance=request.user.profile,
+ data=request.POST)
if form.is_valid():
form.save()
else:
- form = UserProfileForm(instance = request.user.profile)
+ form = UserProfileForm(instance=request.user.profile)
context.project = request.user.profile.primary_project
- context['bundles'] = Bundle.objects.filter(owner = request.user)
+ context['bundles'] = Bundle.objects.filter(owner=request.user)
context['profileform'] = form
optout_query = '%s.%s IN (SELECT %s FROM %s)' % (
- Person._meta.db_table,
- Person._meta.get_field('email').column,
- EmailOptout._meta.get_field('email').column,
- EmailOptout._meta.db_table)
- people = Person.objects.filter(user = request.user) \
- .extra(select = {'is_optout': optout_query})
+ Person._meta.db_table,
+ Person._meta.get_field('email').column,
+ EmailOptout._meta.get_field('email').column,
+ EmailOptout._meta.db_table)
+ people = Person.objects.filter(user=request.user) \
+ .extra(select={'is_optout': optout_query})
context['linked_emails'] = people
context['linkform'] = UserPersonLinkForm()
return render_to_response('patchwork/profile.html', context)
+
@login_required
def link(request):
context = PatchworkRequestContext(request)
if request.method == 'POST':
form = UserPersonLinkForm(request.POST)
if form.is_valid():
- conf = EmailConfirmation(type = 'userperson',
- user = request.user,
- email = form.cleaned_data['email'])
+ conf = EmailConfirmation(type='userperson',
+ user=request.user,
+ email=form.cleaned_data['email'])
conf.save()
context['confirmation'] = conf
try:
send_mail('Patchwork email address confirmation',
- render_to_string('patchwork/user-link.mail',
- context),
- settings.DEFAULT_FROM_EMAIL,
- [form.cleaned_data['email']])
+ render_to_string('patchwork/user-link.mail',
+ context),
+ settings.DEFAULT_FROM_EMAIL,
+ [form.cleaned_data['email']])
except Exception:
context['confirmation'] = None
context['error'] = 'An error occurred during confirmation. ' + \
return render_to_response('patchwork/user-link.html', context)
+
@login_required
def link_confirm(request, conf):
context = PatchworkRequestContext(request)
try:
- person = Person.objects.get(email__iexact = conf.email)
+ person = Person.objects.get(email__iexact=conf.email)
except Person.DoesNotExist:
- person = Person(email = conf.email)
+ person = Person(email=conf.email)
person.link_to_user(conf.user)
person.save()
return render_to_response('patchwork/user-link-confirm.html', context)
+
@login_required
def unlink(request, person_id):
- person = get_object_or_404(Person, id = person_id)
+ person = get_object_or_404(Person, id=person_id)
if request.method == 'POST':
if person.email != request.user.email:
todo_lists = []
for project in Project.objects.all():
- patches = request.user.profile.todo_patches(project = project)
+ patches = request.user.profile.todo_patches(project=project)
if not patches.count():
continue
context.project = request.user.profile.primary_project
return render_to_response('patchwork/todo-lists.html', context)
+
@login_required
def todo_list(request, project_id):
- project = get_object_or_404(Project, linkname = project_id)
- patches = request.user.profile.todo_patches(project = project)
+ project = get_object_or_404(Project, linkname=project_id)
+ patches = request.user.profile.todo_patches(project=project)
filter_settings = [(DelegateFilter,
- {'delegate': request.user})]
+ {'delegate': request.user})]
context = generic_list(request, project,
- 'patchwork.views.user.todo_list',
- view_args = {'project_id': project.linkname},
- filter_settings = filter_settings,
- patches = patches)
+ 'patchwork.views.user.todo_list',
+ view_args={'project_id': project.linkname},
+ filter_settings=filter_settings,
+ patches=patches)
context['action_required_states'] = \
- State.objects.filter(action_required = True).all()
+ State.objects.filter(action_required=True).all()
return render_to_response('patchwork/todo-list.html', context)
except:
# report exception back to server
response = self.dumps(
- six.moves.xmlrpc_client.Fault(1, '%s:%s' % (sys.exc_info()[0], sys.exc_info()[1])),
+ six.moves.xmlrpc_client.Fault(
+ 1, '%s:%s' % (sys.exc_info()[0], sys.exc_info()[1])),
)
return response
'context': obj.context,
}
+
def patch_check_to_dict(obj):
"""Return a combined patch check."""
state_names = dict(Check.STATE_CHOICES)