#!/usr/bin/python
+import asyncio
+import datetime
import email
import email.charset
import email.mime.text
import tornado.locale
import tornado.template
+import sqlalchemy
+from sqlalchemy import Boolean, Column, DateTime, Integer, Text
+
from . import base
+from . import database
from .constants import TEMPLATESDIR
from .decorators import *
# Default policy for internal email handling
policy = email.policy.HTTP
+class Message(database.Base, database.BackendMixin):
+ """
+ A simple class that represents a message that needs to be sent
+ """
+ __tablename__ = "messages"
+
+ # ID
+
+ id = Column(Integer, primary_key=True)
+
+ # Message - The actual payload
+
+ message = Column(Text, nullable=False)
+
+ # Queued At
+
+ queued_at = Column(DateTime(timezone=False), nullable=False,
+ server_default=sqlalchemy.func.current_timestamp())
+
+ # Sent At
+
+ sent_at = Column(DateTime(timezone=False))
+
+ # Priority
+
+ priority = Column(Integer, default=0)
+
+ # Error Message
+
+ error_message = Column(Text)
+
+
class Messages(base.Object):
def init(self):
self.template_loader = tornado.template.Loader(TEMPLATESDIR, autoescape=None)
"""
return Queue(self.backend)
- def send(self, message, priority=None):
+ async def send(self, message, priority=None):
# Check if To is set
if not "To" in message:
raise ValueError("Message has no To: header")
# Add From header if it does not exist
if not "From" in message:
- default_sender_address = self.settings.get(
- "default_sender_address", "no-reply@ipfire.org")
+ sender = self.backend.config.get("mail", "sender",
+ fallback="Pakfire Build Service <no-reply@ipfire.org>")
- message.add_header("From", "Pakfire Build Service <%s>" % default_sender_address)
+ message.add_header("From", sender)
# Send any errors to the bounce address
- default_bounce_address = self.settings.get("default_bounce_address")
- if default_bounce_address:
- message.add_header("Errors-To", "<%s>" % default_bounce_address)
+ bounce_address = self.backend.config.get("mail", "bounce-address", fallback=None)
+ if bounce_address:
+ message.add_header("Errors-To", "<%s>" % bounce_address)
# Send the message
- self.queue.enqueue(message, priority=priority)
+ message = await self.queue.enqueue(message, priority=priority)
+
+ # Launch a new task to send all queued messages
+ self.backend.run_task(self.queue.send)
return message
return message
- def send_template(self, template_name, priority=None, **kwargs):
+ async def send_template(self, template_name, priority=None, **kwargs):
# Render message
message = self.render(template_name, **kwargs)
# Send the message
- return self.send(message, priority=priority)
+ return await self.send(message, priority=priority)
class Queue(base.Object):
"""
Queues and delivers any emails
"""
- def __iter__(self):
- messages = self.db.query("""
- SELECT
- *
- FROM
- messages
- WHERE
- sent_at IS NULL
- ORDER BY
- priority DESC,
- queued_at ASC
- """,
- )
-
- return iter(messages)
-
- def __len__(self):
- res = self.db.get("""
- SELECT
- COUNT(*) AS length
- FROM
- messages
- WHERE
- sent_at IS NULL
- """,
+ def get_messages(self, limit=None):
+ stmt = (
+ sqlalchemy
+ .select(
+ Message,
+ )
+ .where(
+ Message.sent_at == None,
+ )
+ .order_by(
+ Message.priority.desc(),
+ Message.queued_at.asc(),
+ )
+ .limit(limit)
)
- return res.length
+ return self.db.fetch(stmt)
- def enqueue(self, message, priority=None):
+ async def enqueue(self, message, priority=0):
"""
Enqueues a new message
"""
- res = self.db.get("""
- INSERT INTO
- messages(
- message,
- priority
- )
- VALUES(
- %s, %s
- )
- RETURNING
- id
- """, message.as_string(policy=policy), priority or 0)
+ # Insert into the database
+ message = await self.db.insert(
+ Message,
+ message = message.as_string(policy=policy),
+ priority = priority,
+ )
- log.debug("Message queued with ID %s" % res.id)
+ log.debug("Message queued with ID %s" % message.id)
- @lazy_property
- def relay(self):
+ def connect(self):
"""
Connection to the local mail relay
"""
return conn
- async def send(self):
+ _send_lock = asyncio.Lock()
+
+ async def send(self, **kwargs):
"""
Sends all pending messages from the queue
"""
- for message in self:
- with self.db.transaction():
- self._send(message)
+ relay = None
+
+ async with self._send_lock:
+ while True:
+ messages = self.get_messages(limit=1)
+
+ # If there are no messages left, we can quit
+ if not messages:
+ break
+
+ # Connect to the relay
+ if relay is None:
+ relay = self.connect()
- def _send(self, message):
+ # Send the messages one by one
+ async for message in messages:
+ async with await self.db.transaction():
+ await self._send(relay, message, **kwargs)
+
+ log.debug("All messages sent")
+
+ async def _send(self, relay, message):
"""
Delivers the given message to the local mail relay
"""
# Try delivering the email
try:
- rejected_recipients = self.relay.send_message(msg)
+ # Run this in an extra thread because smtplib blocks
+ rejected_recipients = await asyncio.to_thread(relay.send_message, msg)
except smtplib.SMTPRecipientsRefused as e:
rejected_recipients = e.recipients
error_messages.append("Recipient refused: %s - %s (%s)" % \
(recipient, code, reason.decode()))
+ # Store any errors
if error_messages:
- self.db.execute("UPDATE messages SET error_message = %s \
- WHERE id = %s", "; ".join(error_messages), message.id)
-
log.error("Could not send email: %s" % message.id)
+
+ message.error_message = "; ".join(error_messages)
+
for line in error_messages:
log.error(line)
# After the email has been successfully sent, we mark it as such
- self.db.execute("UPDATE messages SET sent_at = CURRENT_TIMESTAMP \
- WHERE id = %s", message.id)
+ message.sent_at = sqlalchemy.func.current_timestamp()
async def cleanup(self):
+ """
+ Deletes all successfully sent emails
+ """
log.debug("Cleaning up message queue")
- self.db.execute("""
- DELETE FROM
- messages
- WHERE
- sent_at IS NOT NULL
- AND
- sent_at <= CURRENT_TIMESTAMP - INTERVAL '30 days'
- """,
+ stmt = (
+ sqlalchemy
+ .delete(
+ Message,
+ )
+ .where(
+ Message.sent_at <= sqlalchemy.func.current_timestamp() - datetime.timedelta(days=30),
+ )
)
+
+ # Run the query
+ async with await self.db.transaction():
+ await self.db.execute(stmt)