import email.charset
import email.message
import email.errors
+import sys
from email import quoprimime
class ContentManager:
def _encode_text(string, charset, cte, policy):
+ # If max_line_length is 0 or None, there is no limit.
+ maxlen = policy.max_line_length or sys.maxsize
lines = string.encode(charset).splitlines()
linesep = policy.linesep.encode('ascii')
def embedded_body(lines): return linesep.join(lines) + linesep
def normal_body(lines): return b'\n'.join(lines) + b'\n'
if cte is None:
# Use heuristics to decide on the "best" encoding.
- if max((len(x) for x in lines), default=0) <= policy.max_line_length:
+ if max(map(len, lines), default=0) <= maxlen:
try:
return '7bit', normal_body(lines).decode('ascii')
except UnicodeDecodeError:
if policy.cte_type == '8bit':
return '8bit', normal_body(lines).decode('ascii', 'surrogateescape')
sniff = embedded_body(lines[:10])
- sniff_qp = quoprimime.body_encode(sniff.decode('latin-1'),
- policy.max_line_length)
+ sniff_qp = quoprimime.body_encode(sniff.decode('latin-1'), maxlen)
sniff_base64 = binascii.b2a_base64(sniff)
# This is a little unfair to qp; it includes lineseps, base64 doesn't.
if len(sniff_qp) > len(sniff_base64):
data = normal_body(lines).decode('ascii', 'surrogateescape')
elif cte == 'quoted-printable':
data = quoprimime.body_encode(normal_body(lines).decode('latin-1'),
- policy.max_line_length)
+ maxlen)
elif cte == 'base64':
- data = _encode_base64(embedded_body(lines), policy.max_line_length)
+ data = _encode_base64(embedded_body(lines), maxlen)
else:
raise ValueError("Unknown content transfer encoding {}".format(cte))
return cte, data
parsed_msg = message_from_bytes(m.as_bytes(), policy=policy.default)
self.assertEqual(parsed_msg['Message-ID'], m['Message-ID'])
+ def test_no_wrapping_max_line_length(self):
+ # Test that falsey 'max_line_length' are converted to sys.maxsize.
+ for n in [0, None]:
+ with self.subTest(max_line_length=n):
+ self.do_test_no_wrapping_max_line_length(n)
+
+ def do_test_no_wrapping_max_line_length(self, falsey):
+ self.assertFalse(falsey)
+ pol = policy.default.clone(max_line_length=falsey)
+ subj = "S" * 100
+ body = "B" * 100
+ msg = EmailMessage(policy=pol)
+ msg["From"] = "a@ex.com"
+ msg["To"] = "b@ex.com"
+ msg["Subject"] = subj
+ msg.set_content(body)
+
+ raw = msg.as_bytes()
+ self.assertNotIn(b"=\n", raw,
+ "Found fold indicator; wrapping not disabled")
+
+ parsed = message_from_bytes(raw, policy=policy.default)
+ self.assertEqual(parsed["Subject"], subj)
+ parsed_body = parsed.get_body().get_content().rstrip('\n')
+ self.assertEqual(parsed_body, body)
+
def test_invalid_header_names(self):
invalid_headers = [
('Invalid Header', 'contains space'),