.. changes for IMAP4_SSL by Tino Lange <Tino.Lange@isg.de>, March 2002
.. changes for IMAP4_stream by Piers Lauder <piers@communitysolutions.com.au>,
November 2002
+.. changes for IMAP4 IDLE by Forest <forestix@nom.one>, August 2024
**Source code:** :source:`Lib/imaplib.py`
you want to avoid having an argument string quoted (eg: the *flags* argument to
``STORE``) then enclose the string in parentheses (eg: ``r'(\Deleted)'``).
-Each command returns a tuple: ``(type, [data, ...])`` where *type* is usually
+Most commands return a tuple: ``(type, [data, ...])`` where *type* is usually
``'OK'`` or ``'NO'``, and *data* is either the text from the command response,
or mandated results from the command. Each *data* is either a ``bytes``, or a
tuple. If a tuple, then the first part is the header of the response, and the
of the IMAP4 QUOTA extension defined in rfc2087.
+.. method:: IMAP4.idle(duration=None)
+
+ Return an :class:`!Idler`: an iterable context manager implementing the
+ IMAP4 ``IDLE`` command as defined in :rfc:`2177`.
+
+ The returned object sends the ``IDLE`` command when activated by the
+ :keyword:`with` statement, produces IMAP untagged responses via the
+ :term:`iterator` protocol, and sends ``DONE`` upon context exit.
+
+ All untagged responses that arrive after sending the ``IDLE`` command
+ (including any that arrive before the server acknowledges the command) will
+ be available via iteration. Any leftover responses (those not iterated in
+ the :keyword:`with` context) can be retrieved in the usual way after
+ ``IDLE`` ends, using :meth:`IMAP4.response`.
+
+ Responses are represented as ``(type, [data, ...])`` tuples, as described
+ in :ref:`IMAP4 Objects <imap4-objects>`.
+
+ The *duration* argument sets a maximum duration (in seconds) to keep idling,
+ after which any ongoing iteration will stop. It can be an :class:`int` or
+ :class:`float`, or ``None`` for no time limit.
+ Callers wishing to avoid inactivity timeouts on servers that impose them
+ should keep this at most 29 minutes (1740 seconds).
+ Requires a socket connection; *duration* must be ``None`` on
+ :class:`IMAP4_stream` connections.
+
+ .. code-block:: pycon
+
+ >>> with M.idle(duration=29 * 60) as idler:
+ ... for typ, data in idler:
+ ... print(typ, data)
+ ...
+ EXISTS [b'1']
+ RECENT [b'1']
+
+
+ .. method:: Idler.burst(interval=0.1)
+
+ Yield a burst of responses no more than *interval* seconds apart
+ (expressed as an :class:`int` or :class:`float`).
+
+ This :term:`generator` is an alternative to iterating one response at a
+ time, intended to aid in efficient batch processing. It retrieves the
+ next response along with any immediately available subsequent responses.
+ (For example, a rapid series of ``EXPUNGE`` responses after a bulk
+ delete.)
+
+ Requires a socket connection; does not work on :class:`IMAP4_stream`
+ connections.
+
+ .. code-block:: pycon
+
+ >>> with M.idle() as idler:
+ ... # get a response and any others following by < 0.1 seconds
+ ... batch = list(idler.burst())
+ ... print(f'processing {len(batch)} responses...')
+ ... print(batch)
+ ...
+ processing 3 responses...
+ [('EXPUNGE', [b'2']), ('EXPUNGE', [b'1']), ('RECENT', [b'0'])]
+
+ .. tip::
+
+ The ``IDLE`` context's maximum duration, as passed to
+ :meth:`IMAP4.idle`, is respected when waiting for the first response
+ in a burst. Therefore, an expired :class:`!Idler` will cause this
+ generator to return immediately without producing anything. Callers
+ should consider this if using it in a loop.
+
+
+ .. note::
+
+ The iterator returned by :meth:`IMAP4.idle` is usable only within a
+ :keyword:`with` statement. Before or after that context, unsolicited
+ responses are collected internally whenever a command finishes, and can
+ be retrieved with :meth:`IMAP4.response`.
+
+ .. note::
+
+ The :class:`!Idler` class name and structure are internal interfaces,
+ subject to change. Calling code can rely on its context management,
+ iteration, and public method to remain stable, but should not subclass,
+ instantiate, compare, or otherwise directly reference the class.
+
+ .. versionadded:: next
+
+
.. method:: IMAP4.list([directory[, pattern]])
List mailbox names in *directory* matching *pattern*. *directory* defaults to
# GET/SETQUOTA contributed by Andreas Zeidler <az@kreativkombinat.de> June 2002.
# PROXYAUTH contributed by Rick Holbert <holbert.13@osu.edu> November 2002.
# GET/SETANNOTATION contributed by Tomas Lindroos <skitta@abo.fi> June 2005.
+# IDLE contributed by Forest <forestix@nom.one> August 2024.
-__version__ = "2.58"
+__version__ = "2.59"
import binascii, errno, random, re, socket, subprocess, sys, time, calendar
from datetime import datetime, timezone, timedelta
# search command can be quite large, so we now use 1M.
_MAXLINE = 1000000
-# Data larger than this will be read in chunks, to prevent extreme
-# overallocation.
-_SAFE_BUF_SIZE = 1 << 20
# Commands
'GETANNOTATION':('AUTH', 'SELECTED'),
'GETQUOTA': ('AUTH', 'SELECTED'),
'GETQUOTAROOT': ('AUTH', 'SELECTED'),
+ 'IDLE': ('AUTH', 'SELECTED'),
'MYRIGHTS': ('AUTH', 'SELECTED'),
'LIST': ('AUTH', 'SELECTED'),
'LOGIN': ('NONAUTH',),
class error(Exception): pass # Logical errors - debug required
class abort(error): pass # Service errors - close and retry
class readonly(abort): pass # Mailbox status changed to READ-ONLY
+ class _responsetimeout(TimeoutError): pass # No response during IDLE
def __init__(self, host='', port=IMAP4_PORT, timeout=None):
self.debug = Debug
self.tagged_commands = {} # Tagged commands awaiting response
self.untagged_responses = {} # {typ: [data, ...], ...}
self.continuation_response = '' # Last continuation response
+ self._idle_responses = [] # Response queue for idle iteration
+ self._idle_capture = False # Whether to queue responses for idle
self.is_readonly = False # READ-ONLY desired state
self.tagnum = 0
self._tls_established = False
self._mode_ascii()
+ self._readbuf = []
# Open socket to server.
self.host = host
self.port = port
self.sock = self._create_socket(timeout)
- self.file = self.sock.makefile('rb')
+ self._file = self.sock.makefile('rb')
+
+
+ @property
+ def file(self):
+ # The old 'file' attribute is no longer used now that we do our own
+ # read() and readline() buffering, with which it conflicts.
+ # As an undocumented interface, it should never have been accessed by
+ # external code, and therefore does not warrant deprecation.
+ # Nevertheless, we provide this property for now, to avoid suddenly
+ # breaking any code in the wild that might have been using it in a
+ # harmless way.
+ import warnings
+ warnings.warn(
+ 'IMAP4.file is unsupported, can cause errors, and may be removed.',
+ RuntimeWarning,
+ stacklevel=2)
+ return self._file
def read(self, size):
"""Read 'size' bytes from remote."""
- cursize = min(size, _SAFE_BUF_SIZE)
- data = self.file.read(cursize)
- while cursize < size and len(data) == cursize:
- delta = min(cursize, size - cursize)
- data += self.file.read(delta)
- cursize += delta
- return data
+ # We need buffered read() to continue working after socket timeouts,
+ # since we use them during IDLE. Unfortunately, the standard library's
+ # SocketIO implementation makes this impossible, by setting a permanent
+ # error condition instead of letting the caller decide how to handle a
+ # timeout. We therefore implement our own buffered read().
+ # https://github.com/python/cpython/issues/51571
+ #
+ # Reading in chunks instead of delegating to a single
+ # BufferedReader.read() call also means we avoid its preallocation
+ # of an unreasonably large memory block if a malicious server claims
+ # it will send a huge literal without actually sending one.
+ # https://github.com/python/cpython/issues/119511
+
+ parts = []
+
+ while size > 0:
+
+ if len(parts) < len(self._readbuf):
+ buf = self._readbuf[len(parts)]
+ else:
+ try:
+ buf = self.sock.recv(DEFAULT_BUFFER_SIZE)
+ except ConnectionError:
+ break
+ if not buf:
+ break
+ self._readbuf.append(buf)
+
+ if len(buf) >= size:
+ parts.append(buf[:size])
+ self._readbuf = [buf[size:]] + self._readbuf[len(parts):]
+ break
+ parts.append(buf)
+ size -= len(buf)
+
+ return b''.join(parts)
def readline(self):
"""Read line from remote."""
- line = self.file.readline(_MAXLINE + 1)
+ # The comment in read() explains why we implement our own readline().
+
+ LF = b'\n'
+ parts = []
+ length = 0
+
+ while length < _MAXLINE:
+
+ if len(parts) < len(self._readbuf):
+ buf = self._readbuf[len(parts)]
+ else:
+ try:
+ buf = self.sock.recv(DEFAULT_BUFFER_SIZE)
+ except ConnectionError:
+ break
+ if not buf:
+ break
+ self._readbuf.append(buf)
+
+ pos = buf.find(LF)
+ if pos != -1:
+ pos += 1
+ parts.append(buf[:pos])
+ self._readbuf = [buf[pos:]] + self._readbuf[len(parts):]
+ break
+ parts.append(buf)
+ length += len(buf)
+
+ line = b''.join(parts)
if len(line) > _MAXLINE:
raise self.error("got more than %d bytes" % _MAXLINE)
return line
def shutdown(self):
"""Close I/O established in "open"."""
- self.file.close()
+ self._file.close()
try:
self.sock.shutdown(socket.SHUT_RDWR)
except OSError as exc:
return typ, [quotaroot, quota]
+ def idle(self, duration=None):
+ """Return an iterable IDLE context manager producing untagged responses.
+ If the argument is not None, limit iteration to 'duration' seconds.
+
+ with M.idle(duration=29 * 60) as idler:
+ for typ, data in idler:
+ print(typ, data)
+
+ Note: 'duration' requires a socket connection (not IMAP4_stream).
+ """
+ return Idler(self, duration)
+
+
def list(self, directory='""', pattern='*'):
"""List mailbox names in directory matching pattern.
if typ == 'OK':
self.sock = ssl_context.wrap_socket(self.sock,
server_hostname=self.host)
- self.file = self.sock.makefile('rb')
+ self._file = self.sock.makefile('rb')
self._tls_established = True
self._get_capabilities()
else:
def _append_untagged(self, typ, dat):
if dat is None:
dat = b''
+
+ # During idle, queue untagged responses for delivery via iteration
+ if self._idle_capture:
+ # Responses containing literal strings are passed to us one data
+ # fragment at a time, while others arrive in a single call.
+ if (not self._idle_responses or
+ isinstance(self._idle_responses[-1][1][-1], bytes)):
+ # We are not continuing a fragmented response; start a new one
+ self._idle_responses.append((typ, [dat]))
+ else:
+ # We are continuing a fragmented response; append the fragment
+ response = self._idle_responses[-1]
+ assert response[0] == typ
+ response[1].append(dat)
+ if __debug__ and self.debug >= 5:
+ self._mesg(f'idle: queue untagged {typ} {dat!r}')
+ return
+
ur = self.untagged_responses
if __debug__:
if self.debug >= 5:
self.capabilities = tuple(dat.split())
- def _get_response(self):
+ def _get_response(self, start_timeout=False):
# Read response and store.
#
# Returns None for continuation responses,
# otherwise first response line received.
-
- resp = self._get_line()
+ #
+ # If start_timeout is given, temporarily uses it as a socket
+ # timeout while waiting for the start of a response, raising
+ # _responsetimeout if one doesn't arrive. (Used by Idler.)
+
+ if start_timeout is not False and self.sock:
+ assert start_timeout is None or start_timeout > 0
+ saved_timeout = self.sock.gettimeout()
+ self.sock.settimeout(start_timeout)
+ try:
+ resp = self._get_line()
+ except TimeoutError as err:
+ raise self._responsetimeout from err
+ finally:
+ self.sock.settimeout(saved_timeout)
+ else:
+ resp = self._get_line()
# Command completion response?
n -= 1
+class Idler:
+ """Iterable IDLE context manager: start IDLE & produce untagged responses.
+
+ An object of this type is returned by the IMAP4.idle() method.
+
+ Note: The name and structure of this class are subject to change.
+ """
+
+ def __init__(self, imap, duration=None):
+ if 'IDLE' not in imap.capabilities:
+ raise imap.error("Server does not support IMAP4 IDLE")
+ if duration is not None and not imap.sock:
+ # IMAP4_stream pipes don't support timeouts
+ raise imap.error('duration requires a socket connection')
+ self._duration = duration
+ self._deadline = None
+ self._imap = imap
+ self._tag = None
+ self._saved_state = None
+
+ def __enter__(self):
+ imap = self._imap
+ assert not imap._idle_responses
+ assert not imap._idle_capture
+
+ if __debug__ and imap.debug >= 4:
+ imap._mesg(f'idle start duration={self._duration}')
+
+ # Start capturing untagged responses before sending IDLE,
+ # so we can deliver via iteration any that arrive while
+ # the IDLE command continuation request is still pending.
+ imap._idle_capture = True
+
+ try:
+ self._tag = imap._command('IDLE')
+ # As with any command, the server is allowed to send us unrelated,
+ # untagged responses before acting on IDLE. These lines will be
+ # returned by _get_response(). When the server is ready, it will
+ # send an IDLE continuation request, indicated by _get_response()
+ # returning None. We therefore process responses in a loop until
+ # this occurs.
+ while resp := imap._get_response():
+ if imap.tagged_commands[self._tag]:
+ typ, data = imap.tagged_commands.pop(self._tag)
+ if typ == 'NO':
+ raise imap.error(f'idle denied: {data}')
+ raise imap.abort(f'unexpected status response: {resp}')
+
+ if __debug__ and imap.debug >= 4:
+ prompt = imap.continuation_response
+ imap._mesg(f'idle continuation prompt: {prompt}')
+ except BaseException:
+ imap._idle_capture = False
+ raise
+
+ if self._duration is not None:
+ self._deadline = time.monotonic() + self._duration
+
+ self._saved_state = imap.state
+ imap.state = 'IDLING'
+
+ return self
+
+ def __exit__(self, exc_type, exc_val, exc_tb):
+ imap = self._imap
+
+ if __debug__ and imap.debug >= 4:
+ imap._mesg('idle done')
+ imap.state = self._saved_state
+
+ # Stop intercepting untagged responses before sending DONE,
+ # since we can no longer deliver them via iteration.
+ imap._idle_capture = False
+
+ # If we captured untagged responses while the IDLE command
+ # continuation request was still pending, but the user did not
+ # iterate over them before exiting IDLE, we must put them
+ # someplace where the user can retrieve them. The only
+ # sensible place for this is the untagged_responses dict,
+ # despite its unfortunate inability to preserve the relative
+ # order of different response types.
+ if leftovers := len(imap._idle_responses):
+ if __debug__ and imap.debug >= 4:
+ imap._mesg(f'idle quit with {leftovers} leftover responses')
+ while imap._idle_responses:
+ typ, data = imap._idle_responses.pop(0)
+ # Append one fragment at a time, just as _get_response() does
+ for datum in data:
+ imap._append_untagged(typ, datum)
+
+ try:
+ imap.send(b'DONE' + CRLF)
+ status, [msg] = imap._command_complete('IDLE', self._tag)
+ if __debug__ and imap.debug >= 4:
+ imap._mesg(f'idle status: {status} {msg!r}')
+ except OSError:
+ if not exc_type:
+ raise
+
+ return False # Do not suppress context body exceptions
+
+ def __iter__(self):
+ return self
+
+ def _pop(self, timeout, default=('', None)):
+ # Get the next response, or a default value on timeout.
+ # The timeout arg can be an int or float, or None for no timeout.
+ # Timeouts require a socket connection (not IMAP4_stream).
+ # This method ignores self._duration.
+
+ # Historical Note:
+ # The timeout was originally implemented using select() after
+ # checking for the presence of already-buffered data.
+ # That allowed timeouts on pipe connetions like IMAP4_stream.
+ # However, it seemed possible that SSL data arriving without any
+ # IMAP data afterward could cause select() to indicate available
+ # application data when there was none, leading to a read() call
+ # that would block with no timeout. It was unclear under what
+ # conditions this would happen in practice. Our implementation was
+ # changed to use socket timeouts instead of select(), just to be
+ # safe.
+
+ imap = self._imap
+ if imap.state != 'IDLING':
+ raise imap.error('_pop() only works during IDLE')
+
+ if imap._idle_responses:
+ # Response is ready to return to the user
+ resp = imap._idle_responses.pop(0)
+ if __debug__ and imap.debug >= 4:
+ imap._mesg(f'idle _pop({timeout}) de-queued {resp[0]}')
+ return resp
+
+ if __debug__ and imap.debug >= 4:
+ imap._mesg(f'idle _pop({timeout}) reading')
+
+ if timeout is not None:
+ if timeout <= 0:
+ return default
+ timeout = float(timeout) # Required by socket.settimeout()
+
+ try:
+ imap._get_response(timeout) # Reads line, calls _append_untagged()
+ except IMAP4._responsetimeout:
+ if __debug__ and imap.debug >= 4:
+ imap._mesg(f'idle _pop({timeout}) done')
+ return default
+
+ resp = imap._idle_responses.pop(0)
+
+ if __debug__ and imap.debug >= 4:
+ imap._mesg(f'idle _pop({timeout}) read {resp[0]}')
+ return resp
+
+ def __next__(self):
+ imap = self._imap
+
+ if self._duration is None:
+ timeout = None
+ else:
+ timeout = self._deadline - time.monotonic()
+ typ, data = self._pop(timeout)
+
+ if not typ:
+ if __debug__ and imap.debug >= 4:
+ imap._mesg('idle iterator exhausted')
+ raise StopIteration
+
+ return typ, data
+
+ def burst(self, interval=0.1):
+ """Yield a burst of responses no more than 'interval' seconds apart.
+
+ with M.idle() as idler:
+ # get a response and any others following by < 0.1 seconds
+ batch = list(idler.burst())
+ print(f'processing {len(batch)} responses...')
+ print(batch)
+
+ Note: This generator requires a socket connection (not IMAP4_stream).
+ """
+ if not self._imap.sock:
+ raise self._imap.error('burst() requires a socket connection')
+
+ try:
+ yield next(self)
+ except StopIteration:
+ return
+
+ while response := self._pop(interval, None):
+ yield response
+
+
if HAVE_SSL:
class IMAP4_SSL(IMAP4):
self.host = None # For compatibility with parent class
self.port = None
self.sock = None
- self.file = None
+ self._file = None
self.process = subprocess.Popen(self.command,
bufsize=DEFAULT_BUFFER_SIZE,
stdin=subprocess.PIPE, stdout=subprocess.PIPE,
self._send_tagged(tag, 'BAD', 'No mailbox selected')
+class IdleCmdDenyHandler(SimpleIMAPHandler):
+ capabilities = 'IDLE'
+ def cmd_IDLE(self, tag, args):
+ self._send_tagged(tag, 'NO', 'IDLE is not allowed at this time')
+
+
+class IdleCmdHandler(SimpleIMAPHandler):
+ capabilities = 'IDLE'
+ def cmd_IDLE(self, tag, args):
+ # pre-idle-continuation response
+ self._send_line(b'* 0 EXISTS')
+ self._send_textline('+ idling')
+ # simple response
+ self._send_line(b'* 2 EXISTS')
+ # complex response: fragmented data due to literal string
+ self._send_line(b'* 1 FETCH (BODY[HEADER.FIELDS (DATE)] {41}')
+ self._send(b'Date: Fri, 06 Dec 2024 06:00:00 +0000\r\n\r\n')
+ self._send_line(b')')
+ # simple response following a fragmented one
+ self._send_line(b'* 3 EXISTS')
+ # response arriving later
+ time.sleep(1)
+ self._send_line(b'* 1 RECENT')
+ r = yield
+ if r == b'DONE\r\n':
+ self._send_line(b'* 9 RECENT')
+ self._send_tagged(tag, 'OK', 'Idle completed')
+ else:
+ self._send_tagged(tag, 'BAD', 'Expected DONE')
+
+
+class IdleCmdDelayedPacketHandler(SimpleIMAPHandler):
+ capabilities = 'IDLE'
+ def cmd_IDLE(self, tag, args):
+ self._send_textline('+ idling')
+ # response line spanning multiple packets, the last one delayed
+ self._send(b'* 1 EX')
+ time.sleep(0.2)
+ self._send(b'IS')
+ time.sleep(1)
+ self._send(b'TS\r\n')
+ r = yield
+ if r == b'DONE\r\n':
+ self._send_tagged(tag, 'OK', 'Idle completed')
+ else:
+ self._send_tagged(tag, 'BAD', 'Expected DONE')
+
+
class NewIMAPTestsMixin():
client = None
# command tests
+ def test_idle_capability(self):
+ client, _ = self._setup(SimpleIMAPHandler)
+ with self.assertRaisesRegex(imaplib.IMAP4.error,
+ 'does not support IMAP4 IDLE'):
+ with client.idle():
+ pass
+
+ def test_idle_denied(self):
+ client, _ = self._setup(IdleCmdDenyHandler)
+ client.login('user', 'pass')
+ with self.assertRaises(imaplib.IMAP4.error):
+ with client.idle() as idler:
+ pass
+
+ def test_idle_iter(self):
+ client, _ = self._setup(IdleCmdHandler)
+ client.login('user', 'pass')
+ with client.idle() as idler:
+ # iteration should include response between 'IDLE' & '+ idling'
+ response = next(idler)
+ self.assertEqual(response, ('EXISTS', [b'0']))
+ # iteration should produce responses
+ response = next(idler)
+ self.assertEqual(response, ('EXISTS', [b'2']))
+ # fragmented response (with literal string) should arrive whole
+ expected_fetch_data = [
+ (b'1 (BODY[HEADER.FIELDS (DATE)] {41}',
+ b'Date: Fri, 06 Dec 2024 06:00:00 +0000\r\n\r\n'),
+ b')']
+ typ, data = next(idler)
+ self.assertEqual(typ, 'FETCH')
+ self.assertEqual(data, expected_fetch_data)
+ # response after a fragmented one should arrive separately
+ response = next(idler)
+ self.assertEqual(response, ('EXISTS', [b'3']))
+ # iteration should have consumed untagged responses
+ _, data = client.response('EXISTS')
+ self.assertEqual(data, [None])
+ # responses not iterated should be available after idle
+ _, data = client.response('RECENT')
+ self.assertEqual(data[0], b'1')
+ # responses received after 'DONE' should be available after idle
+ self.assertEqual(data[1], b'9')
+
+ def test_idle_burst(self):
+ client, _ = self._setup(IdleCmdHandler)
+ client.login('user', 'pass')
+ # burst() should yield immediately available responses
+ with client.idle() as idler:
+ batch = list(idler.burst())
+ self.assertEqual(len(batch), 4)
+ # burst() should not have consumed later responses
+ _, data = client.response('RECENT')
+ self.assertEqual(data, [b'1', b'9'])
+
+ def test_idle_delayed_packet(self):
+ client, _ = self._setup(IdleCmdDelayedPacketHandler)
+ client.login('user', 'pass')
+ # If our readline() implementation fails to preserve line fragments
+ # when idle timeouts trigger, a response spanning delayed packets
+ # can be corrupted, leaving the protocol stream in a bad state.
+ try:
+ with client.idle(0.5) as idler:
+ self.assertRaises(StopIteration, next, idler)
+ except client.abort as err:
+ self.fail('multi-packet response was corrupted by idle timeout')
+
def test_login(self):
client, _ = self._setup(SimpleIMAPHandler)
typ, data = client.login('user', 'pass')
self.assertEqual(data[0], b'Returned to authenticated state. (Success)')
self.assertEqual(client.state, 'AUTH')
+ # property tests
+
+ def test_file_property_should_not_be_accessed(self):
+ client, _ = self._setup(SimpleIMAPHandler)
+ # the 'file' property replaced a private attribute that is now unsafe
+ with self.assertWarns(RuntimeWarning):
+ client.file
+
class NewIMAPTests(NewIMAPTestsMixin, unittest.TestCase):
imap_class = imaplib.IMAP4