# along with Patchwork; if not, write to the Free Software
# Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA
+import argparse
import codecs
import datetime
from email import message_from_file
return normalise_space(u' '.join(fragments))
-def find_project(mail):
+def find_project_by_id(list_id):
+ """Find a `project` object with given `list_id`."""
+ project = None
+ try:
+ project = Project.objects.get(listid=list_id)
+ except Project.DoesNotExist:
+ pass
+ return project
+
+
+def find_project_by_header(mail):
project = None
listid_res = [re.compile(r'.*<([^>]+)>.*', re.S),
re.compile(r'^([\S]+)$', re.S)]
listid = match.group(1)
- try:
- project = Project.objects.get(listid=listid)
+ project = find_project_by_id(listid)
+ if project:
break
- except Project.DoesNotExist:
- pass
return project
-
def find_author(mail):
from_header = clean_header(mail.get('From'))
return None
-def parse_mail(mail):
- """Parse a mail and add to the database."""
+def parse_mail(mail, list_id=None):
+ """Parse a mail and add to the database.
+
+ Args:
+ mail (`mbox.Mail`): Mail to parse and add.
+ list_id (str): Mailing list ID
+
+ Returns:
+ None
+ """
# some basic sanity checks
if 'From' not in mail:
return 0
if hint == 'ignore':
return 0
- project = find_project(mail)
+ if list_id:
+ project = find_project_by_id(list_id)
+ else:
+ project = find_project_by_header(mail)
if project is None:
print("no project found")
def main(args):
django.setup()
logger = setup_error_handler()
- mail = message_from_file(sys.stdin)
+ parser = argparse.ArgumentParser()
+
+ parser.add_argument('infile', nargs='?', type=argparse.FileType('r'),
+ default=sys.stdin, help='input mbox file (a filename '
+ 'or stdin)')
+
+ group = parser.add_argument_group('Mail parsing configuration')
+ group.add_argument('--list-id', help='mailing list ID. If not supplied '
+ 'this will be extracted from the mail headers.')
+
+ args = vars(parser.parse_args())
+
+ mail = message_from_file(args['infile'])
try:
- return parse_mail(mail)
+ return parse_mail(mail, args['list_id'])
except:
if logger:
logger.exception('Error when parsing incoming email', extra={
'mail': mail.as_string(),
})
raise
-
+ return parse_mail(mail, args['list_id'])
if __name__ == '__main__':
sys.exit(main(sys.argv))
default_subject = defaults.subject
project = defaults.project
-from patchwork.bin.parsemail import find_content, find_author, find_project, \
- parse_mail, split_prefixes, clean_subject
+from patchwork.bin.parsemail import (find_content, find_author,
+ find_project_by_header, parse_mail,
+ split_prefixes, clean_subject)
class InlinePatchTest(PatchTest):
patch_filename = '0001-add-line.patch'
def testNoListId(self):
email = MIMEText('')
- project = find_project(email)
+ project = find_project_by_header(email)
self.assertEquals(project, None)
def testBlankListId(self):
email = MIMEText('')
email['List-Id'] = ''
- project = find_project(email)
+ project = find_project_by_header(email)
self.assertEquals(project, None)
def testWhitespaceListId(self):
email = MIMEText('')
email['List-Id'] = ' '
- project = find_project(email)
+ project = find_project_by_header(email)
self.assertEquals(project, None)
def testSubstringListId(self):
email = MIMEText('')
email['List-Id'] = 'example.com'
- project = find_project(email)
+ project = find_project_by_header(email)
self.assertEquals(project, None)
def testShortListId(self):
is only the list ID itself (without enclosing angle-brackets). """
email = MIMEText('')
email['List-Id'] = self.project.listid
- project = find_project(email)
+ project = find_project_by_header(email)
self.assertEquals(project, self.project)
def testLongListId(self):
email = MIMEText('')
email['List-Id'] = 'Test text <%s>' % self.project.listid
- project = find_project(email)
+ project = find_project_by_header(email)
self.assertEquals(project, self.project)
def tearDown(self):