import email.policy
import logging
import markdown
-import subprocess
+import smtplib
+import socket
import tornado.locale
import tornado.template
from . import users
from .constants import TEMPLATESDIR
+from .decorators import *
# Setup logging
log = logging.getLogger("pakfire.buildservice.messages")
def init(self):
self.templates = tornado.template.Loader(TEMPLATESDIR)
- def __iter__(self):
- messages = self.db.query("SELECT * FROM messages \
- WHERE sent_at IS NULL ORDER BY queued_at")
-
- return iter(messages)
-
- def __len__(self):
- res = self.db.get("SELECT COUNT(*) AS count FROM messages \
- WHERE sent_at IS NULL")
-
- return res.count
-
- def process_queue(self):
+ @lazy_property
+ def queue(self):
"""
- Sends all emails in the queue
+ The message queue
"""
- for message in self:
- with self.db.transaction():
- self.__sendmail(message)
-
- # Delete all old emails
- with self.db.transaction():
- self.cleanup()
-
- def cleanup(self):
- self.db.execute("DELETE FROM messages WHERE sent_at <= NOW() - INTERVAL '24 hours'")
+ return Queue(self.backend)
- def send_to(self, recipient, message, sender=None, headers={}):
+ def send_to(self, recipient, message, sender=None, headers={}, priority=None):
# Parse the message
if not isinstance(message, email.message.Message):
message = email.message_from_string(message)
message.add_header(k, v)
# Queue the message
- self.queue(message.as_string())
+ self.queue.enqueue(message.as_string(), priority=priority)
def send_template(self, recipient, name, sender=None, headers={}, **kwargs):
# Get user (if we have one)
for recipient in recipients:
self.send_template(recipient, *args, **kwargs)
- def queue(self, message):
- res = self.db.get("INSERT INTO messages(message) VALUES(%s) RETURNING id", message)
- logging.info("Message queued as %s", res.id)
+class Queue(base.Object):
+ """
+ Queues and delivers any emails
+ """
+ @property
+ def messages(self):
+ return self.db.query("""
+ SELECT
+ *
+ FROM
+ messages
+ WHERE
+ sent_at IS NULL
+ ORDER BY
+ priority DESC,
+ queued_at ASC
+ """,
+ )
+
+ def enqueue(self, message, priority=None):
+ """
+ Enqueues a new message
+ """
+ res = self.db.get("""
+ INSERT INTO
+ messages(
+ message,
+ priority
+ )
+ VALUES(
+ %s, %s
+ )
+ RETURNING
+ id
+ """, message.as_string(policy=policy), priority or 0)
+
+ log.debug("Message queued with ID %s" % res.id)
+
+ @lazy_property
+ def relay(self):
+ """
+ Connection to the local mail relay
+ """
+ hostname = socket.getfqdn()
+
+ # Open SMTP connection
+ conn = smtplib.SMTP(hostname)
+
+ # Start TLS connection
+ conn.starttls()
- def __sendmail(self, message):
- # Convert message from string
+ return conn
+
+ async def send(self):
+ """
+ Sends all pending messages from the queue
+ """
+ for message in self.messages:
+ with self.db.transaction():
+ self._send(message)
+
+ def _send(self, message):
+ """
+ Delivers the given message to the local mail relay
+ """
+ # Parse the message from what is in the database
msg = email.message_from_string(message.message)
- # Get some headers
- recipient = msg.get("To")
- subject = msg.get("Subject")
+ log.debug("Sending a message %s to: %s" % (
+ msg.get("Subject"), msg.get("To"),
+ ))
+
+ error_messages = []
+ rejected_recipients = {}
+
+ # Try delivering the email
+ try:
+ rejected_recipients = self.relay.send_message(msg)
+
+ except smtplib.SMTPRecipientsRefused as e:
+ rejected_recipients = e.recipients
+
+ except smtplib.SMTPException as e:
+ log.error("SMTP Exception: %s" % e)
+ error_messages.append("%s" % e)
+
+ # Log all emails that could not be delivered
+ for recipient in rejected_recipients:
+ code, reason = rejected_recipients[recipient]
- logging.info("Sending mail to %s: %s" % (recipient, subject))
+ error_messages.append("Recipient refused: %s - %s (%s)" % \
+ (recipient, code, reason.decode()))
- # Run sendmail and the email in
- p = subprocess.Popen(["/usr/lib/sendmail", "-t"], bufsize=0, close_fds=True,
- stdin=subprocess.PIPE, stdout=subprocess.PIPE)
+ if error_messages:
+ self.db.execute("UPDATE messages SET error_message = %s \
+ WHERE id = %s", "; ".join(error_messages), message.id)
- stdout, stderr = p.communicate(msg.as_string())
+ log.error("Could not send email: %s" % message.id)
+ for line in error_messages:
+ log.error(line)
- # Wait until sendmail has finished.
- p.wait()
+ # After the email has been successfully sent, we mark it as such
+ self.db.execute("UPDATE messages SET sent_at = CURRENT_TIMESTAMP \
+ WHERE id = %s", message.id)
- if p.returncode:
- raise Exception("Could not send mail: %s" % stderr)
+ async def cleanup(self):
+ log.debug("Cleaning up message queue")
- # Mark message as sent
- self.db.execute("UPDATE messages SET sent_at = NOW() WHERE id = %s", message.id)
+ self.db.execute("""
+ DELETE FROM
+ messages
+ WHERE
+ sent_at IS NOT NULL
+ AND
+ sent_at <= CURRENT_TIMESTAMP - INTERVAL '30 days'
+ """,
+ )
id integer NOT NULL,
message text NOT NULL,
queued_at timestamp without time zone DEFAULT now() NOT NULL,
- sent_at timestamp without time zone
+ sent_at timestamp without time zone,
+ priority integer DEFAULT 0 NOT NULL,
+ error_message text
);
ADD CONSTRAINT idx_2198256_primary PRIMARY KEY (id);
---
--- Name: messages idx_2198274_primary; Type: CONSTRAINT; Schema: public; Owner: -
---
-
-ALTER TABLE ONLY public.messages
- ADD CONSTRAINT idx_2198274_primary PRIMARY KEY (id);
-
-
--
-- Name: jobs_packages jobs_packages_unique; Type: CONSTRAINT; Schema: public; Owner: -
--
ADD CONSTRAINT keys_pkey PRIMARY KEY (id);
+--
+-- Name: messages messages_pkey; Type: CONSTRAINT; Schema: public; Owner: -
+--
+
+ALTER TABLE ONLY public.messages
+ ADD CONSTRAINT messages_pkey PRIMARY KEY (id);
+
+
--
-- Name: mirrors_checks mirrors_checks_pkey; Type: CONSTRAINT; Schema: public; Owner: -
--
--
--- Name: messages_order; Type: INDEX; Schema: public; Owner: -
+-- Name: messages_queued; Type: INDEX; Schema: public; Owner: -
--
-CREATE INDEX messages_order ON public.messages USING btree (queued_at) WHERE (sent_at IS NULL);
+CREATE INDEX messages_queued ON public.messages USING btree (priority DESC, queued_at) WHERE (sent_at IS NULL);
--
self._commands = {
# Bugzilla
- "bugzilla:version" : self.backend.bugzilla.version,
+ "bugzilla:version" : self.backend.bugzilla.version,
# Cleanup
- "cleanup" : self.backend.cleanup,
+ "cleanup" : self.backend.cleanup,
# Jobs
- "jobs:depcheck" : self._jobs_depcheck,
+ "jobs:depcheck" : self._jobs_depcheck,
# Keys
- "keys:generate" : self.backend.keys.generate,
+ "keys:generate" : self.backend.keys.generate,
+
+ # Messages
+ "messages:queue:send" : self.backend.messages.queue.send,
# Repositories
- "repos:rotate-keys": self.backend.repos.rotate_keys,
- "repos:write" : self.backend.repos.write,
+ "repos:rotate-keys" : self.backend.repos.rotate_keys,
+ "repos:write" : self.backend.repos.write,
# Sync
- "sync" : self.backend.sync,
+ "sync" : self.backend.sync,
# Run mirror check
#"check-mirrors" : self.backend.mirrors.check,
# List repository
#"list-repository" : self._list_repository,
- # Sends all queued messages
- #"process-message-queue" : self.backend.messages.process_queue,
-
# Pull sources
#"pull-sources" : self.backend.sources.pull,