]> git.ipfire.org Git - pbs.git/commitdiff
messages: Refactor the whole thing
authorMichael Tremer <michael.tremer@ipfire.org>
Wed, 5 Feb 2025 15:34:47 +0000 (15:34 +0000)
committerMichael Tremer <michael.tremer@ipfire.org>
Wed, 5 Feb 2025 15:34:47 +0000 (15:34 +0000)
This will now asynchronously send messages from the main thread.

Signed-off-by: Michael Tremer <michael.tremer@ipfire.org>
src/buildservice/__init__.py
src/buildservice/messages.py
src/scripts/pakfire-build-service

index ac6e89b2fb1ddbcd26fbc5136720141aef989641..17cbc74d3fc538c29d91b0a360623e11099b8296 100644 (file)
@@ -110,6 +110,9 @@ class Backend(object):
                # XXX Disabled for now
                #self.run_periodic_task(300, self.monitorings.check)
 
+               # Regularly try sending messages
+               self.run_periodic_task(600, self.messages.queue.send)
+
                # Cleanup regularly
                self.run_periodic_task(3600, self.cleanup)
 
index 15472d59b3ca7f046e481a5f363f5c4e3bc99744..b5d8249fb391aea4ed58107ba0e6253afa463cb4 100644 (file)
@@ -1,5 +1,7 @@
 #!/usr/bin/python
 
+import asyncio
+import datetime
 import email
 import email.charset
 import email.mime.text
@@ -12,7 +14,11 @@ import socket
 import tornado.locale
 import tornado.template
 
+import sqlalchemy
+from sqlalchemy import Boolean, Column, DateTime, Integer, Text
+
 from . import base
+from . import database
 
 from .constants import TEMPLATESDIR
 from .decorators import *
@@ -26,6 +32,38 @@ email.charset.add_charset("utf-8", email.charset.SHORTEST, email.charset.QP, "ut
 # Default policy for internal email handling
 policy = email.policy.HTTP
 
+class Message(database.Base, database.BackendMixin):
+       """
+               A simple class that represents a message that needs to be sent
+       """
+       __tablename__ = "messages"
+
+       # ID
+
+       id = Column(Integer, primary_key=True)
+
+       # Message - The actual payload
+
+       message = Column(Text, nullable=False)
+
+       # Queued At
+
+       queued_at = Column(DateTime(timezone=False), nullable=False,
+               server_default=sqlalchemy.func.current_timestamp())
+
+       # Sent At
+
+       sent_at = Column(DateTime(timezone=False))
+
+       # Priority
+
+       priority = Column(Integer, default=0)
+
+       # Error Message
+
+       error_message = Column(Text)
+
+
 class Messages(base.Object):
        def init(self):
                self.template_loader = tornado.template.Loader(TEMPLATESDIR, autoescape=None)
@@ -37,7 +75,7 @@ class Messages(base.Object):
                """
                return Queue(self.backend)
 
-       def send(self, message, priority=None):
+       async def send(self, message, priority=None):
                # Check if To is set
                if not "To" in message:
                        raise ValueError("Message has no To: header")
@@ -52,18 +90,21 @@ class Messages(base.Object):
 
                # Add From header if it does not exist
                if not "From" in message:
-                       default_sender_address = self.settings.get(
-                               "default_sender_address", "no-reply@ipfire.org")
+                       sender = self.backend.config.get("mail", "sender",
+                               fallback="Pakfire Build Service <no-reply@ipfire.org>")
 
-                       message.add_header("From", "Pakfire Build Service <%s>" % default_sender_address)
+                       message.add_header("From", sender)
 
                # Send any errors to the bounce address
-               default_bounce_address = self.settings.get("default_bounce_address")
-               if default_bounce_address:
-                       message.add_header("Errors-To", "<%s>" % default_bounce_address)
+               bounce_address = self.backend.config.get("mail", "bounce-address", fallback=None)
+               if bounce_address:
+                       message.add_header("Errors-To", "<%s>" % bounce_address)
 
                # Send the message
-               self.queue.enqueue(message, priority=priority)
+               message = await self.queue.enqueue(message, priority=priority)
+
+               # Launch a new task to send all queued messages
+               self.backend.run_task(self.queue.send)
 
                return message
 
@@ -177,68 +218,50 @@ class Messages(base.Object):
 
                return message
 
-       def send_template(self, template_name, priority=None, **kwargs):
+       async def send_template(self, template_name, priority=None, **kwargs):
                # Render message
                message = self.render(template_name, **kwargs)
 
                # Send the message
-               return self.send(message, priority=priority)
+               return await self.send(message, priority=priority)
 
 
 class Queue(base.Object):
        """
                Queues and delivers any emails
        """
-       def __iter__(self):
-               messages = self.db.query("""
-                       SELECT
-                               *
-                       FROM
-                               messages
-                       WHERE
-                               sent_at IS NULL
-                       ORDER BY
-                               priority DESC,
-                               queued_at ASC
-                       """,
-               )
-
-               return iter(messages)
-
-       def __len__(self):
-               res = self.db.get("""
-                       SELECT
-                               COUNT(*) AS length
-                       FROM
-                               messages
-                       WHERE
-                               sent_at IS NULL
-                       """,
+       def get_messages(self, limit=None):
+               stmt = (
+                       sqlalchemy
+                       .select(
+                               Message,
+                       )
+                       .where(
+                               Message.sent_at == None,
+                       )
+                       .order_by(
+                               Message.priority.desc(),
+                               Message.queued_at.asc(),
+                       )
+                       .limit(limit)
                )
 
-               return res.length
+               return self.db.fetch(stmt)
 
-       def enqueue(self, message, priority=None):
+       async def enqueue(self, message, priority=0):
                """
                        Enqueues a new message
                """
-               res = self.db.get("""
-                       INSERT INTO
-                               messages(
-                                       message,
-                                       priority
-                               )
-                       VALUES(
-                               %s, %s
-                       )
-                       RETURNING
-                               id
-                       """, message.as_string(policy=policy), priority or 0)
+               # Insert into the database
+               message = await self.db.insert(
+                       Message,
+                       message  = message.as_string(policy=policy),
+                       priority = priority,
+               )
 
-               log.debug("Message queued with ID %s" % res.id)
+               log.debug("Message queued with ID %s" % message.id)
 
-       @lazy_property
-       def relay(self):
+       def connect(self):
                """
                        Connection to the local mail relay
                """
@@ -252,15 +275,34 @@ class Queue(base.Object):
 
                return conn
 
-       async def send(self):
+       _send_lock = asyncio.Lock()
+
+       async def send(self, **kwargs):
                """
                        Sends all pending messages from the queue
                """
-               for message in self:
-                       with self.db.transaction():
-                               self._send(message)
+               relay = None
+
+               async with self._send_lock:
+                       while True:
+                               messages = self.get_messages(limit=1)
+
+                               # If there are no messages left, we can quit
+                               if not messages:
+                                       break
+
+                               # Connect to the relay
+                               if relay is None:
+                                       relay = self.connect()
 
-       def _send(self, message):
+                               # Send the messages one by one
+                               async for message in messages:
+                                       async with await self.db.transaction():
+                                               await self._send(relay, message, **kwargs)
+
+               log.debug("All messages sent")
+
+       async def _send(self, relay, message):
                """
                        Delivers the given message to the local mail relay
                """
@@ -276,7 +318,8 @@ class Queue(base.Object):
 
                # Try delivering the email
                try:
-                       rejected_recipients = self.relay.send_message(msg)
+                       # Run this in an extra thread because smtplib blocks
+                       rejected_recipients = await asyncio.to_thread(relay.send_message, msg)
 
                except smtplib.SMTPRecipientsRefused as e:
                        rejected_recipients = e.recipients
@@ -292,27 +335,34 @@ class Queue(base.Object):
                        error_messages.append("Recipient refused: %s - %s (%s)" % \
                                (recipient, code, reason.decode()))
 
+               # Store any errors
                if error_messages:
-                       self.db.execute("UPDATE messages SET error_message = %s \
-                               WHERE id = %s", "; ".join(error_messages), message.id)
-
                        log.error("Could not send email: %s" % message.id)
+
+                       message.error_message = "; ".join(error_messages)
+
                        for line in error_messages:
                                log.error(line)
 
                # After the email has been successfully sent, we mark it as such
-               self.db.execute("UPDATE messages SET sent_at = CURRENT_TIMESTAMP \
-                       WHERE id = %s", message.id)
+               message.sent_at = sqlalchemy.func.current_timestamp()
 
        async def cleanup(self):
+               """
+                       Deletes all successfully sent emails
+               """
                log.debug("Cleaning up message queue")
 
-               self.db.execute("""
-                       DELETE FROM
-                               messages
-                       WHERE
-                               sent_at IS NOT NULL
-                       AND
-                               sent_at <= CURRENT_TIMESTAMP - INTERVAL '30 days'
-                       """,
+               stmt = (
+                       sqlalchemy
+                       .delete(
+                               Message,
+                       )
+                       .where(
+                               Message.sent_at <= sqlalchemy.func.current_timestamp() - datetime.timedelta(days=30),
+                       )
                )
+
+               # Run the query
+               async with await self.db.transaction():
+                       await self.db.execute(stmt)
index 12e23d8b2f435ecb5f5c59092601d12e2f5607c2..f33a1825df6105e0c5736f7c000104b1eb578caf 100644 (file)
@@ -30,9 +30,6 @@ class Cli(object):
                        # Jobs
                        "jobs:installcheck"   : self._jobs_installcheck,
 
-                       # Messages
-                       "messages:queue:send" : self.backend.messages.queue.send,
-
                        # Mirrors
                        "mirrors:check"       : self._mirrors_check,