]> git.ipfire.org Git - pbs.git/commitdiff
message: Move queuing logic out of the base class
authorMichael Tremer <michael.tremer@ipfire.org>
Fri, 14 Oct 2022 13:11:59 +0000 (13:11 +0000)
committerMichael Tremer <michael.tremer@ipfire.org>
Fri, 14 Oct 2022 13:12:52 +0000 (13:12 +0000)
Signed-off-by: Michael Tremer <michael.tremer@ipfire.org>
src/buildservice/__init__.py
src/buildservice/messages.py
src/crontab/pakfire-build-service
src/database.sql
src/scripts/pakfire-build-service

index ed09699911bada04978166af1d3171c21338d6c0..6ead74ac4075b363efdbc50803c496d8791d2f1f 100644 (file)
@@ -290,6 +290,9 @@ class Backend(object):
                """
                        Called regularly to cleanup any left-over resources
                """
+               # Messages
+               await self.messages.queue.cleanup()
+
                # Sessions
                await self.sessions.cleanup()
 
index 64fcea7ffea0b7e4191ef454697e0acccab4eeb9..14572d56e7fe4a4aaeb10370bae7eea09a686da4 100644 (file)
@@ -7,7 +7,8 @@ import email.mime.text
 import email.policy
 import logging
 import markdown
-import subprocess
+import smtplib
+import socket
 import tornado.locale
 import tornado.template
 
@@ -15,6 +16,7 @@ from . import base
 from . import users
 
 from .constants import TEMPLATESDIR
+from .decorators import *
 
 # Setup logging
 log = logging.getLogger("pakfire.buildservice.messages")
@@ -29,34 +31,14 @@ class Messages(base.Object):
        def init(self):
                self.templates = tornado.template.Loader(TEMPLATESDIR)
 
-       def __iter__(self):
-               messages = self.db.query("SELECT * FROM messages \
-                       WHERE sent_at IS NULL ORDER BY queued_at")
-
-               return iter(messages)
-
-       def __len__(self):
-               res = self.db.get("SELECT COUNT(*) AS count FROM messages \
-                       WHERE sent_at IS NULL")
-
-               return res.count
-
-       def process_queue(self):
+       @lazy_property
+       def queue(self):
                """
-                       Sends all emails in the queue
+                       The message queue
                """
-               for message in self:
-                       with self.db.transaction():
-                               self.__sendmail(message)
-
-               # Delete all old emails
-               with self.db.transaction():
-                       self.cleanup()
-
-       def cleanup(self):
-               self.db.execute("DELETE FROM messages WHERE sent_at <= NOW() - INTERVAL '24 hours'")
+               return Queue(self.backend)
 
-       def send_to(self, recipient, message, sender=None, headers={}):
+       def send_to(self, recipient, message, sender=None, headers={}, priority=None):
                # Parse the message
                if not isinstance(message, email.message.Message):
                        message = email.message_from_string(message)
@@ -81,7 +63,7 @@ class Messages(base.Object):
                        message.add_header(k, v)
 
                # Queue the message
-               self.queue(message.as_string())
+               self.queue.enqueue(message.as_string(), priority=priority)
 
        def send_template(self, recipient, name, sender=None, headers={}, **kwargs):
                # Get user (if we have one)
@@ -159,32 +141,121 @@ class Messages(base.Object):
                for recipient in recipients:
                        self.send_template(recipient, *args, **kwargs)
 
-       def queue(self, message):
-               res = self.db.get("INSERT INTO messages(message) VALUES(%s) RETURNING id", message)
 
-               logging.info("Message queued as %s", res.id)
+class Queue(base.Object):
+       """
+               Queues and delivers any emails
+       """
+       @property
+       def messages(self):
+               return self.db.query("""
+                       SELECT
+                               *
+                       FROM
+                               messages
+                       WHERE
+                               sent_at IS NULL
+                       ORDER BY
+                               priority DESC,
+                               queued_at ASC
+                       """,
+               )
+
+       def enqueue(self, message, priority=None):
+               """
+                       Enqueues a new message
+               """
+               res = self.db.get("""
+                       INSERT INTO
+                               messages(
+                                       message,
+                                       priority
+                               )
+                       VALUES(
+                               %s, %s
+                       )
+                       RETURNING
+                               id
+                       """, message.as_string(policy=policy), priority or 0)
+
+               log.debug("Message queued with ID %s" % res.id)
+
+       @lazy_property
+       def relay(self):
+               """
+                       Connection to the local mail relay
+               """
+               hostname = socket.getfqdn()
+
+               # Open SMTP connection
+               conn = smtplib.SMTP(hostname)
+
+               # Start TLS connection
+               conn.starttls()
 
-       def __sendmail(self, message):
-               # Convert message from string
+               return conn
+
+       async def send(self):
+               """
+                       Sends all pending messages from the queue
+               """
+               for message in self.messages:
+                       with self.db.transaction():
+                               self._send(message)
+
+       def _send(self, message):
+               """
+                       Delivers the given message to the local mail relay
+               """
+               # Parse the message from what is in the database
                msg = email.message_from_string(message.message)
 
-               # Get some headers
-               recipient = msg.get("To")
-               subject   = msg.get("Subject")
+               log.debug("Sending a message %s to: %s" % (
+                       msg.get("Subject"), msg.get("To"),
+               ))
+
+               error_messages = []
+               rejected_recipients = {}
+
+               # Try delivering the email
+               try:
+                       rejected_recipients = self.relay.send_message(msg)
+
+               except smtplib.SMTPRecipientsRefused as e:
+                       rejected_recipients = e.recipients
+
+               except smtplib.SMTPException as e:
+                       log.error("SMTP Exception: %s" % e)
+                       error_messages.append("%s" % e)
+
+               # Log all emails that could not be delivered
+               for recipient in rejected_recipients:
+                       code, reason = rejected_recipients[recipient]
 
-               logging.info("Sending mail to %s: %s" % (recipient, subject))
+                       error_messages.append("Recipient refused: %s - %s (%s)" % \
+                               (recipient, code, reason.decode()))
 
-               # Run sendmail and the email in
-               p = subprocess.Popen(["/usr/lib/sendmail", "-t"], bufsize=0, close_fds=True,
-                       stdin=subprocess.PIPE, stdout=subprocess.PIPE)
+               if error_messages:
+                       self.db.execute("UPDATE messages SET error_message = %s \
+                               WHERE id = %s", "; ".join(error_messages), message.id)
 
-               stdout, stderr = p.communicate(msg.as_string())
+                       log.error("Could not send email: %s" % message.id)
+                       for line in error_messages:
+                               log.error(line)
 
-               # Wait until sendmail has finished.
-               p.wait()
+               # After the email has been successfully sent, we mark it as such
+               self.db.execute("UPDATE messages SET sent_at = CURRENT_TIMESTAMP \
+                       WHERE id = %s", message.id)
 
-               if p.returncode:
-                       raise Exception("Could not send mail: %s" % stderr)
+       async def cleanup(self):
+               log.debug("Cleaning up message queue")
 
-               # Mark message as sent
-               self.db.execute("UPDATE messages SET sent_at = NOW() WHERE id = %s", message.id)
+               self.db.execute("""
+                       DELETE FROM
+                               messages
+                       WHERE
+                               sent_at IS NOT NULL
+                       AND
+                               sent_at <= CURRENT_TIMESTAMP - INTERVAL '30 days'
+                       """,
+               )
index 71654a5f2e3159520aeafdd00b1320b88e9b604c..e747b72fd4c20d4fb02b5642cf9368409fbbd69c 100644 (file)
@@ -1,7 +1,7 @@
 MAILTO=pakfire@ipfire.org
 
 # Send queued emails once a minute
-#* * * * *             _pakfire        pakfire-build-service process-message-queue &>/dev/null
+* * * * *              _pakfire        pakfire-build-service --logging=warning messages:queue:send
 
 # Synchronize mirrors once every five minutes
 */5 * * * *            _pakfire        pakfire-build-service --logging=warning sync
index 1c8bf93ef6a2b5ec6c21a123976ce7954e78c27a..3816b63be41a889abce4b2175747fca272ecaad7 100644 (file)
@@ -522,7 +522,9 @@ CREATE TABLE public.messages (
     id integer NOT NULL,
     message text NOT NULL,
     queued_at timestamp without time zone DEFAULT now() NOT NULL,
-    sent_at timestamp without time zone
+    sent_at timestamp without time zone,
+    priority integer DEFAULT 0 NOT NULL,
+    error_message text
 );
 
 
@@ -1385,14 +1387,6 @@ ALTER TABLE ONLY public.users_emails
     ADD CONSTRAINT idx_2198256_primary PRIMARY KEY (id);
 
 
---
--- Name: messages idx_2198274_primary; Type: CONSTRAINT; Schema: public; Owner: -
---
-
-ALTER TABLE ONLY public.messages
-    ADD CONSTRAINT idx_2198274_primary PRIMARY KEY (id);
-
-
 --
 -- Name: jobs_packages jobs_packages_unique; Type: CONSTRAINT; Schema: public; Owner: -
 --
@@ -1417,6 +1411,14 @@ ALTER TABLE ONLY public.keys
     ADD CONSTRAINT keys_pkey PRIMARY KEY (id);
 
 
+--
+-- Name: messages messages_pkey; Type: CONSTRAINT; Schema: public; Owner: -
+--
+
+ALTER TABLE ONLY public.messages
+    ADD CONSTRAINT messages_pkey PRIMARY KEY (id);
+
+
 --
 -- Name: mirrors_checks mirrors_checks_pkey; Type: CONSTRAINT; Schema: public; Owner: -
 --
@@ -1681,10 +1683,10 @@ CREATE UNIQUE INDEX keys_fingerprint ON public.keys USING btree (fingerprint) WH
 
 
 --
--- Name: messages_order; Type: INDEX; Schema: public; Owner: -
+-- Name: messages_queued; Type: INDEX; Schema: public; Owner: -
 --
 
-CREATE INDEX messages_order ON public.messages USING btree (queued_at) WHERE (sent_at IS NULL);
+CREATE INDEX messages_queued ON public.messages USING btree (priority DESC, queued_at) WHERE (sent_at IS NULL);
 
 
 --
index df921397d978bdc7ae785b857a62313a71d6393f..51127c1e716299a9ca1907adce4fcd66acbcbf34 100644 (file)
@@ -16,23 +16,26 @@ class Cli(object):
 
                self._commands = {
                        # Bugzilla
-                       "bugzilla:version" : self.backend.bugzilla.version,
+                       "bugzilla:version"    : self.backend.bugzilla.version,
 
                        # Cleanup
-                       "cleanup"          : self.backend.cleanup,
+                       "cleanup"             : self.backend.cleanup,
 
                        # Jobs
-                       "jobs:depcheck"    : self._jobs_depcheck,
+                       "jobs:depcheck"       : self._jobs_depcheck,
 
                        # Keys
-                       "keys:generate"    : self.backend.keys.generate,
+                       "keys:generate"       : self.backend.keys.generate,
+
+                       # Messages
+                       "messages:queue:send" : self.backend.messages.queue.send,
 
                        # Repositories
-                       "repos:rotate-keys": self.backend.repos.rotate_keys,
-                       "repos:write"      : self.backend.repos.write,
+                       "repos:rotate-keys"   : self.backend.repos.rotate_keys,
+                       "repos:write"         : self.backend.repos.write,
 
                        # Sync
-                       "sync"             : self.backend.sync,
+                       "sync"                : self.backend.sync,
 
                        # Run mirror check
                        #"check-mirrors" : self.backend.mirrors.check,
@@ -43,9 +46,6 @@ class Cli(object):
                        # List repository
                        #"list-repository" : self._list_repository,
 
-                       # Sends all queued messages
-                       #"process-message-queue" : self.backend.messages.process_queue,
-
                        # Pull sources
                        #"pull-sources" : self.backend.sources.pull,