From: Michael Tremer Date: Wed, 5 Feb 2025 15:34:47 +0000 (+0000) Subject: messages: Refactor the whole thing X-Git-Url: http://git.ipfire.org/?a=commitdiff_plain;h=edfa71f2f6e15bf6e931e372cfdd5fb99d10fa69;p=pbs.git messages: Refactor the whole thing This will now asynchronously send messages from the main thread. Signed-off-by: Michael Tremer --- diff --git a/src/buildservice/__init__.py b/src/buildservice/__init__.py index ac6e89b2..17cbc74d 100644 --- a/src/buildservice/__init__.py +++ b/src/buildservice/__init__.py @@ -110,6 +110,9 @@ class Backend(object): # XXX Disabled for now #self.run_periodic_task(300, self.monitorings.check) + # Regularly try sending messages + self.run_periodic_task(600, self.messages.queue.send) + # Cleanup regularly self.run_periodic_task(3600, self.cleanup) diff --git a/src/buildservice/messages.py b/src/buildservice/messages.py index 15472d59..b5d8249f 100644 --- a/src/buildservice/messages.py +++ b/src/buildservice/messages.py @@ -1,5 +1,7 @@ #!/usr/bin/python +import asyncio +import datetime import email import email.charset import email.mime.text @@ -12,7 +14,11 @@ import socket import tornado.locale import tornado.template +import sqlalchemy +from sqlalchemy import Boolean, Column, DateTime, Integer, Text + from . import base +from . import database from .constants import TEMPLATESDIR from .decorators import * @@ -26,6 +32,38 @@ email.charset.add_charset("utf-8", email.charset.SHORTEST, email.charset.QP, "ut # Default policy for internal email handling policy = email.policy.HTTP +class Message(database.Base, database.BackendMixin): + """ + A simple class that represents a message that needs to be sent + """ + __tablename__ = "messages" + + # ID + + id = Column(Integer, primary_key=True) + + # Message - The actual payload + + message = Column(Text, nullable=False) + + # Queued At + + queued_at = Column(DateTime(timezone=False), nullable=False, + server_default=sqlalchemy.func.current_timestamp()) + + # Sent At + + sent_at = Column(DateTime(timezone=False)) + + # Priority + + priority = Column(Integer, default=0) + + # Error Message + + error_message = Column(Text) + + class Messages(base.Object): def init(self): self.template_loader = tornado.template.Loader(TEMPLATESDIR, autoescape=None) @@ -37,7 +75,7 @@ class Messages(base.Object): """ return Queue(self.backend) - def send(self, message, priority=None): + async def send(self, message, priority=None): # Check if To is set if not "To" in message: raise ValueError("Message has no To: header") @@ -52,18 +90,21 @@ class Messages(base.Object): # Add From header if it does not exist if not "From" in message: - default_sender_address = self.settings.get( - "default_sender_address", "no-reply@ipfire.org") + sender = self.backend.config.get("mail", "sender", + fallback="Pakfire Build Service ") - message.add_header("From", "Pakfire Build Service <%s>" % default_sender_address) + message.add_header("From", sender) # Send any errors to the bounce address - default_bounce_address = self.settings.get("default_bounce_address") - if default_bounce_address: - message.add_header("Errors-To", "<%s>" % default_bounce_address) + bounce_address = self.backend.config.get("mail", "bounce-address", fallback=None) + if bounce_address: + message.add_header("Errors-To", "<%s>" % bounce_address) # Send the message - self.queue.enqueue(message, priority=priority) + message = await self.queue.enqueue(message, priority=priority) + + # Launch a new task to send all queued messages + self.backend.run_task(self.queue.send) return message @@ -177,68 +218,50 @@ class Messages(base.Object): return message - def send_template(self, template_name, priority=None, **kwargs): + async def send_template(self, template_name, priority=None, **kwargs): # Render message message = self.render(template_name, **kwargs) # Send the message - return self.send(message, priority=priority) + return await self.send(message, priority=priority) class Queue(base.Object): """ Queues and delivers any emails """ - def __iter__(self): - messages = self.db.query(""" - SELECT - * - FROM - messages - WHERE - sent_at IS NULL - ORDER BY - priority DESC, - queued_at ASC - """, - ) - - return iter(messages) - - def __len__(self): - res = self.db.get(""" - SELECT - COUNT(*) AS length - FROM - messages - WHERE - sent_at IS NULL - """, + def get_messages(self, limit=None): + stmt = ( + sqlalchemy + .select( + Message, + ) + .where( + Message.sent_at == None, + ) + .order_by( + Message.priority.desc(), + Message.queued_at.asc(), + ) + .limit(limit) ) - return res.length + return self.db.fetch(stmt) - def enqueue(self, message, priority=None): + async def enqueue(self, message, priority=0): """ Enqueues a new message """ - res = self.db.get(""" - INSERT INTO - messages( - message, - priority - ) - VALUES( - %s, %s - ) - RETURNING - id - """, message.as_string(policy=policy), priority or 0) + # Insert into the database + message = await self.db.insert( + Message, + message = message.as_string(policy=policy), + priority = priority, + ) - log.debug("Message queued with ID %s" % res.id) + log.debug("Message queued with ID %s" % message.id) - @lazy_property - def relay(self): + def connect(self): """ Connection to the local mail relay """ @@ -252,15 +275,34 @@ class Queue(base.Object): return conn - async def send(self): + _send_lock = asyncio.Lock() + + async def send(self, **kwargs): """ Sends all pending messages from the queue """ - for message in self: - with self.db.transaction(): - self._send(message) + relay = None + + async with self._send_lock: + while True: + messages = self.get_messages(limit=1) + + # If there are no messages left, we can quit + if not messages: + break + + # Connect to the relay + if relay is None: + relay = self.connect() - def _send(self, message): + # Send the messages one by one + async for message in messages: + async with await self.db.transaction(): + await self._send(relay, message, **kwargs) + + log.debug("All messages sent") + + async def _send(self, relay, message): """ Delivers the given message to the local mail relay """ @@ -276,7 +318,8 @@ class Queue(base.Object): # Try delivering the email try: - rejected_recipients = self.relay.send_message(msg) + # Run this in an extra thread because smtplib blocks + rejected_recipients = await asyncio.to_thread(relay.send_message, msg) except smtplib.SMTPRecipientsRefused as e: rejected_recipients = e.recipients @@ -292,27 +335,34 @@ class Queue(base.Object): error_messages.append("Recipient refused: %s - %s (%s)" % \ (recipient, code, reason.decode())) + # Store any errors if error_messages: - self.db.execute("UPDATE messages SET error_message = %s \ - WHERE id = %s", "; ".join(error_messages), message.id) - log.error("Could not send email: %s" % message.id) + + message.error_message = "; ".join(error_messages) + for line in error_messages: log.error(line) # After the email has been successfully sent, we mark it as such - self.db.execute("UPDATE messages SET sent_at = CURRENT_TIMESTAMP \ - WHERE id = %s", message.id) + message.sent_at = sqlalchemy.func.current_timestamp() async def cleanup(self): + """ + Deletes all successfully sent emails + """ log.debug("Cleaning up message queue") - self.db.execute(""" - DELETE FROM - messages - WHERE - sent_at IS NOT NULL - AND - sent_at <= CURRENT_TIMESTAMP - INTERVAL '30 days' - """, + stmt = ( + sqlalchemy + .delete( + Message, + ) + .where( + Message.sent_at <= sqlalchemy.func.current_timestamp() - datetime.timedelta(days=30), + ) ) + + # Run the query + async with await self.db.transaction(): + await self.db.execute(stmt) diff --git a/src/scripts/pakfire-build-service b/src/scripts/pakfire-build-service index 12e23d8b..f33a1825 100644 --- a/src/scripts/pakfire-build-service +++ b/src/scripts/pakfire-build-service @@ -30,9 +30,6 @@ class Cli(object): # Jobs "jobs:installcheck" : self._jobs_installcheck, - # Messages - "messages:queue:send" : self.backend.messages.queue.send, - # Mirrors "mirrors:check" : self._mirrors_check,