From: Michael Tremer Date: Fri, 14 Oct 2022 13:11:59 +0000 (+0000) Subject: message: Move queuing logic out of the base class X-Git-Url: http://git.ipfire.org/cgi-bin/gitweb.cgi?a=commitdiff_plain;h=c58cb031b86763f7f22071416eb768488a9b0b99;p=pbs.git message: Move queuing logic out of the base class Signed-off-by: Michael Tremer --- diff --git a/src/buildservice/__init__.py b/src/buildservice/__init__.py index ed096999..6ead74ac 100644 --- a/src/buildservice/__init__.py +++ b/src/buildservice/__init__.py @@ -290,6 +290,9 @@ class Backend(object): """ Called regularly to cleanup any left-over resources """ + # Messages + await self.messages.queue.cleanup() + # Sessions await self.sessions.cleanup() diff --git a/src/buildservice/messages.py b/src/buildservice/messages.py index 64fcea7f..14572d56 100644 --- a/src/buildservice/messages.py +++ b/src/buildservice/messages.py @@ -7,7 +7,8 @@ import email.mime.text import email.policy import logging import markdown -import subprocess +import smtplib +import socket import tornado.locale import tornado.template @@ -15,6 +16,7 @@ from . import base from . import users from .constants import TEMPLATESDIR +from .decorators import * # Setup logging log = logging.getLogger("pakfire.buildservice.messages") @@ -29,34 +31,14 @@ class Messages(base.Object): def init(self): self.templates = tornado.template.Loader(TEMPLATESDIR) - def __iter__(self): - messages = self.db.query("SELECT * FROM messages \ - WHERE sent_at IS NULL ORDER BY queued_at") - - return iter(messages) - - def __len__(self): - res = self.db.get("SELECT COUNT(*) AS count FROM messages \ - WHERE sent_at IS NULL") - - return res.count - - def process_queue(self): + @lazy_property + def queue(self): """ - Sends all emails in the queue + The message queue """ - for message in self: - with self.db.transaction(): - self.__sendmail(message) - - # Delete all old emails - with self.db.transaction(): - self.cleanup() - - def cleanup(self): - self.db.execute("DELETE FROM messages WHERE sent_at <= NOW() - INTERVAL '24 hours'") + return Queue(self.backend) - def send_to(self, recipient, message, sender=None, headers={}): + def send_to(self, recipient, message, sender=None, headers={}, priority=None): # Parse the message if not isinstance(message, email.message.Message): message = email.message_from_string(message) @@ -81,7 +63,7 @@ class Messages(base.Object): message.add_header(k, v) # Queue the message - self.queue(message.as_string()) + self.queue.enqueue(message.as_string(), priority=priority) def send_template(self, recipient, name, sender=None, headers={}, **kwargs): # Get user (if we have one) @@ -159,32 +141,121 @@ class Messages(base.Object): for recipient in recipients: self.send_template(recipient, *args, **kwargs) - def queue(self, message): - res = self.db.get("INSERT INTO messages(message) VALUES(%s) RETURNING id", message) - logging.info("Message queued as %s", res.id) +class Queue(base.Object): + """ + Queues and delivers any emails + """ + @property + def messages(self): + return self.db.query(""" + SELECT + * + FROM + messages + WHERE + sent_at IS NULL + ORDER BY + priority DESC, + queued_at ASC + """, + ) + + def enqueue(self, message, priority=None): + """ + Enqueues a new message + """ + res = self.db.get(""" + INSERT INTO + messages( + message, + priority + ) + VALUES( + %s, %s + ) + RETURNING + id + """, message.as_string(policy=policy), priority or 0) + + log.debug("Message queued with ID %s" % res.id) + + @lazy_property + def relay(self): + """ + Connection to the local mail relay + """ + hostname = socket.getfqdn() + + # Open SMTP connection + conn = smtplib.SMTP(hostname) + + # Start TLS connection + conn.starttls() - def __sendmail(self, message): - # Convert message from string + return conn + + async def send(self): + """ + Sends all pending messages from the queue + """ + for message in self.messages: + with self.db.transaction(): + self._send(message) + + def _send(self, message): + """ + Delivers the given message to the local mail relay + """ + # Parse the message from what is in the database msg = email.message_from_string(message.message) - # Get some headers - recipient = msg.get("To") - subject = msg.get("Subject") + log.debug("Sending a message %s to: %s" % ( + msg.get("Subject"), msg.get("To"), + )) + + error_messages = [] + rejected_recipients = {} + + # Try delivering the email + try: + rejected_recipients = self.relay.send_message(msg) + + except smtplib.SMTPRecipientsRefused as e: + rejected_recipients = e.recipients + + except smtplib.SMTPException as e: + log.error("SMTP Exception: %s" % e) + error_messages.append("%s" % e) + + # Log all emails that could not be delivered + for recipient in rejected_recipients: + code, reason = rejected_recipients[recipient] - logging.info("Sending mail to %s: %s" % (recipient, subject)) + error_messages.append("Recipient refused: %s - %s (%s)" % \ + (recipient, code, reason.decode())) - # Run sendmail and the email in - p = subprocess.Popen(["/usr/lib/sendmail", "-t"], bufsize=0, close_fds=True, - stdin=subprocess.PIPE, stdout=subprocess.PIPE) + if error_messages: + self.db.execute("UPDATE messages SET error_message = %s \ + WHERE id = %s", "; ".join(error_messages), message.id) - stdout, stderr = p.communicate(msg.as_string()) + log.error("Could not send email: %s" % message.id) + for line in error_messages: + log.error(line) - # Wait until sendmail has finished. - p.wait() + # After the email has been successfully sent, we mark it as such + self.db.execute("UPDATE messages SET sent_at = CURRENT_TIMESTAMP \ + WHERE id = %s", message.id) - if p.returncode: - raise Exception("Could not send mail: %s" % stderr) + async def cleanup(self): + log.debug("Cleaning up message queue") - # Mark message as sent - self.db.execute("UPDATE messages SET sent_at = NOW() WHERE id = %s", message.id) + self.db.execute(""" + DELETE FROM + messages + WHERE + sent_at IS NOT NULL + AND + sent_at <= CURRENT_TIMESTAMP - INTERVAL '30 days' + """, + ) diff --git a/src/crontab/pakfire-build-service b/src/crontab/pakfire-build-service index 71654a5f..e747b72f 100644 --- a/src/crontab/pakfire-build-service +++ b/src/crontab/pakfire-build-service @@ -1,7 +1,7 @@ MAILTO=pakfire@ipfire.org # Send queued emails once a minute -#* * * * * _pakfire pakfire-build-service process-message-queue &>/dev/null +* * * * * _pakfire pakfire-build-service --logging=warning messages:queue:send # Synchronize mirrors once every five minutes */5 * * * * _pakfire pakfire-build-service --logging=warning sync diff --git a/src/database.sql b/src/database.sql index 1c8bf93e..3816b63b 100644 --- a/src/database.sql +++ b/src/database.sql @@ -522,7 +522,9 @@ CREATE TABLE public.messages ( id integer NOT NULL, message text NOT NULL, queued_at timestamp without time zone DEFAULT now() NOT NULL, - sent_at timestamp without time zone + sent_at timestamp without time zone, + priority integer DEFAULT 0 NOT NULL, + error_message text ); @@ -1385,14 +1387,6 @@ ALTER TABLE ONLY public.users_emails ADD CONSTRAINT idx_2198256_primary PRIMARY KEY (id); --- --- Name: messages idx_2198274_primary; Type: CONSTRAINT; Schema: public; Owner: - --- - -ALTER TABLE ONLY public.messages - ADD CONSTRAINT idx_2198274_primary PRIMARY KEY (id); - - -- -- Name: jobs_packages jobs_packages_unique; Type: CONSTRAINT; Schema: public; Owner: - -- @@ -1417,6 +1411,14 @@ ALTER TABLE ONLY public.keys ADD CONSTRAINT keys_pkey PRIMARY KEY (id); +-- +-- Name: messages messages_pkey; Type: CONSTRAINT; Schema: public; Owner: - +-- + +ALTER TABLE ONLY public.messages + ADD CONSTRAINT messages_pkey PRIMARY KEY (id); + + -- -- Name: mirrors_checks mirrors_checks_pkey; Type: CONSTRAINT; Schema: public; Owner: - -- @@ -1681,10 +1683,10 @@ CREATE UNIQUE INDEX keys_fingerprint ON public.keys USING btree (fingerprint) WH -- --- Name: messages_order; Type: INDEX; Schema: public; Owner: - +-- Name: messages_queued; Type: INDEX; Schema: public; Owner: - -- -CREATE INDEX messages_order ON public.messages USING btree (queued_at) WHERE (sent_at IS NULL); +CREATE INDEX messages_queued ON public.messages USING btree (priority DESC, queued_at) WHERE (sent_at IS NULL); -- diff --git a/src/scripts/pakfire-build-service b/src/scripts/pakfire-build-service index df921397..51127c1e 100644 --- a/src/scripts/pakfire-build-service +++ b/src/scripts/pakfire-build-service @@ -16,23 +16,26 @@ class Cli(object): self._commands = { # Bugzilla - "bugzilla:version" : self.backend.bugzilla.version, + "bugzilla:version" : self.backend.bugzilla.version, # Cleanup - "cleanup" : self.backend.cleanup, + "cleanup" : self.backend.cleanup, # Jobs - "jobs:depcheck" : self._jobs_depcheck, + "jobs:depcheck" : self._jobs_depcheck, # Keys - "keys:generate" : self.backend.keys.generate, + "keys:generate" : self.backend.keys.generate, + + # Messages + "messages:queue:send" : self.backend.messages.queue.send, # Repositories - "repos:rotate-keys": self.backend.repos.rotate_keys, - "repos:write" : self.backend.repos.write, + "repos:rotate-keys" : self.backend.repos.rotate_keys, + "repos:write" : self.backend.repos.write, # Sync - "sync" : self.backend.sync, + "sync" : self.backend.sync, # Run mirror check #"check-mirrors" : self.backend.mirrors.check, @@ -43,9 +46,6 @@ class Cli(object): # List repository #"list-repository" : self._list_repository, - # Sends all queued messages - #"process-message-queue" : self.backend.messages.process_queue, - # Pull sources #"pull-sources" : self.backend.sources.pull,