from email.header import make_header
from email.utils import mktime_tz
from email.utils import parsedate_tz
+from email.errors import HeaderParseError
from fnmatch import fnmatch
import logging
import re
then encode the result with make_header()
"""
+ try:
+ value = decode_header(header_contents)
+ except HeaderParseError:
+ # something has gone really wrong with header parsing.
+ # (e.g. base64 decoding) We probably can't recover, so:
+ return None
+
# We have some Py2/Py3 issues here.
#
# Firstly, the email parser (before we get here)
# We solve this by catching any Unicode errors, and then manually
# handling any interesting headers.
- value = decode_header(header_contents)
try:
header = make_header(value,
header_name=header_name,
sane_header = sanitise_header(header)
+ if sane_header is None:
+ return None
+
# on Py2, we want to do unicode(), on Py3, str().
# That gets us the decoded, un-wrapped header.
if six.PY2:
for header in list_id_headers:
if header in mail:
+ h = clean_header(mail.get(header))
+ if not h:
+ continue
for listid_re in listid_res:
- match = listid_re.match(clean_header(mail.get(header)))
+ match = listid_re.match(h)
if match:
break
Returns:
The matching ``Series`` instance, if any
"""
- for ref in [clean_header(mail.get('Message-Id'))] + find_references(mail):
+ refs = find_references(mail)
+ h = clean_header(mail.get('Message-Id'))
+ if h:
+ refs = [h] + refs
+ for ref in refs:
try:
return SeriesReference.objects.get(
msgid=ref, series__project=project).series
def find_author(mail):
from_header = clean_header(mail.get('From'))
+
+ if not from_header:
+ raise ValueError("Invalid 'From' header")
+
name, email = (None, None)
# tuple of (regex, fn)
def find_date(mail):
- t = parsedate_tz(clean_header(mail.get('Date', '')))
+ h = clean_header(mail.get('Date', ''))
+ if not h:
+ return datetime.datetime.utcnow()
+ t = parsedate_tz(h)
if not t:
return datetime.datetime.utcnow()
return datetime.datetime.utcfromtimestamp(mktime_tz(t))
for key, value in mail.items()]
strings = [('%s: %s' % (key, header.encode()))
- for (key, header) in headers]
+ for (key, header) in headers if header is not None]
return '\n'.join(strings)
if 'In-Reply-To' in mail:
for in_reply_to in mail.get_all('In-Reply-To'):
- refs.append(clean_header(in_reply_to).strip())
+ r = clean_header(in_reply_to)
+ if r:
+ refs.append(r)
if 'References' in mail:
for references_header in mail.get_all('References'):
- references = clean_header(references_header).split()
+ h = clean_header(references_header)
+ if not h:
+ continue
+ references = h.split()
references.reverse()
for ref in references:
ref = ref.strip()
prefix_re = re.compile(r'^\[([^\]]*)\]\s*(.*)$')
subject = clean_header(subject)
+ if not subject:
+ raise ValueError("Invalid 'Subject' header")
+
if drop_prefixes is None:
drop_prefixes = []
else:
match = prefix_re.match(subject)
subject = normalise_space(subject)
-
- subject = subject.strip()
if prefixes:
subject = '[%s] %s' % (','.join(prefixes), subject)
"""Determine if a mail is a reply."""
comment_re = re.compile(r'^(re)[:\s]\s*', re.I)
- return comment_re.match(clean_header(subject))
+ h = clean_header(subject)
+ if not h:
+ return False
+
+ return comment_re.match(h)
def clean_content(content):
def find_state(mail):
"""Return the state with the given name or the default."""
- state_name = clean_header(mail.get('X-Patchwork-State', '')).strip()
+ state_name = clean_header(mail.get('X-Patchwork-State', ''))
if state_name:
try:
return State.objects.get(name__iexact=state_name)
def find_delegate_by_header(mail):
"""Return the delegate with the given email or None."""
- delegate_email = clean_header(mail.get('X-Patchwork-Delegate', '')).strip()
+ delegate_email = clean_header(mail.get('X-Patchwork-Delegate', ''))
if delegate_email:
try:
return User.objects.get(email__iexact=delegate_email)
except User.DoesNotExist:
pass
+
return None
if 'Message-Id' not in mail:
raise ValueError("Missing 'Message-Id' header")
- hint = clean_header(mail.get('X-Patchwork-Hint', '')).lower()
- if hint == 'ignore':
+ hint = clean_header(mail.get('X-Patchwork-Hint', ''))
+ if hint and hint.lower() == 'ignore':
logger.debug("Ignoring email due to 'ignore' hint")
return
# parse metadata
- msgid = clean_header(mail.get('Message-Id')).strip()
+ msgid = clean_header(mail.get('Message-Id'))
+ if not msgid:
+ raise ValueError("Broken 'Message-Id' header")
+
author = find_author(mail)
subject = mail.get('Subject')
name, prefixes = clean_subject(subject, [project.linkname])