import datetime
import feedparser
-import markdown2
+import html2text
+import markdown
import re
import textile
import unicodedata
from . import misc
+from . import wiki
from .decorators import *
-# Used to automatically link some things
-link_patterns = (
- # Find bug reports
- (re.compile(r"(?:#(\d+))", re.I), r"https://bugzilla.ipfire.org/show_bug.cgi?id=\1"),
-
- # Email Addresses
- (re.compile(r"([a-zA-Z0-9_.+-]+@[a-zA-Z0-9-]+\.[a-zA-Z0-9-.]+)"), r"mailto:\1"),
-
- # CVE Numbers
- (re.compile(r"(?:CVE)[\s\-](\d{4}\-\d+)"), r"https://cve.mitre.org/cgi-bin/cvename.cgi?name=\1"),
-)
-
class Blog(misc.Object):
def _get_post(self, query, *args):
res = self.db.get(query, *args)
return self._get_post("SELECT * FROM blog \
WHERE id = %s", id)
- def get_by_slug(self, slug, published=True):
- if published:
- return self._get_post("SELECT * FROM blog \
- WHERE slug = %s AND published_at <= NOW()", slug)
-
+ def get_by_slug(self, slug):
return self._get_post("SELECT * FROM blog \
WHERE slug = %s", slug)
def get_newest(self, limit=None):
- return self._get_posts("SELECT * FROM blog \
+ posts = self._get_posts("SELECT * FROM blog \
WHERE published_at IS NOT NULL \
AND published_at <= NOW() \
ORDER BY published_at DESC LIMIT %s", limit)
+ return list(posts)
+
def get_by_tag(self, tag, limit=None):
return self._get_posts("SELECT * FROM blog \
WHERE published_at IS NOT NULL \
AND %s = ANY(tags) \
ORDER BY published_at DESC LIMIT %s", tag, limit)
- def get_by_author(self, author, limit=None):
- return self._get_posts("SELECT * FROM blog \
- WHERE (author = %s OR author_uid = %s) \
- AND published_at IS NOT NULL \
- AND published_at <= NOW() \
- ORDER BY published_at DESC LIMIT %s",
- author.name, author.uid, limit)
-
def get_by_year(self, year):
return self._get_posts("SELECT * FROM blog \
WHERE EXTRACT(year FROM published_at) = %s \
AND published_at <= NOW() \
ORDER BY published_at DESC", year)
- def get_drafts(self, author=None, limit=None):
- if author:
- return self._get_posts("SELECT * FROM blog \
- WHERE author_uid = %s \
- AND (published_at IS NULL OR published_at > NOW()) \
- ORDER BY COALESCE(updated_at, created_at) DESC LIMIT %s",
- author.uid, limit)
-
+ def get_drafts(self, author, limit=None):
return self._get_posts("SELECT * FROM blog \
- WHERE (published_at IS NULL OR published_at > NOW()) \
- ORDER BY COALESCE(updated_at, created_at) DESC LIMIT %s", limit)
+ WHERE author_uid = %s \
+ AND (published_at IS NULL OR published_at > NOW()) \
+ ORDER BY COALESCE(updated_at, created_at) DESC LIMIT %s",
+ author.uid, limit)
def search(self, query, limit=None):
- query = self._parse_search_query(query)
-
- return self._get_posts("SELECT blog.* FROM blog \
+ posts = self._get_posts("SELECT blog.* FROM blog \
LEFT JOIN blog_search_index search_index ON blog.id = search_index.post_id \
- WHERE search_index.document @@ to_tsquery('english', %s) \
- ORDER BY ts_rank(search_index.document, to_tsquery('english', %s)) DESC \
+ WHERE search_index.document @@ websearch_to_tsquery('english', %s) \
+ ORDER BY ts_rank(search_index.document, websearch_to_tsquery('english', %s)) DESC \
LIMIT %s", query, query, limit)
- def _parse_search_query(self, query):
- q = []
- for word in query.split():
- # Is this lexeme negated?
- negated = word.startswith("!")
+ return list(posts)
- # Remove any special characters
- word = re.sub(r"\W+", "", word, flags=re.UNICODE)
- if not word:
- continue
+ def has_had_recent_activity(self, **kwargs):
+ t = datetime.timedelta(**kwargs)
- # Restore negation
- if negated:
- word = "!%s" % word
+ res = self.db.get("SELECT COUNT(*) AS count FROM blog \
+ WHERE published_at IS NOT NULL AND published_at BETWEEN NOW() - %s AND NOW()", t)
- q.append(word)
+ if res and res.count > 0:
+ return True
- return " & ".join(q)
+ return False
def create_post(self, title, text, author, tags=[], lang="markdown"):
"""
def _render_text(self, text, lang="markdown"):
if lang == "markdown":
- return markdown2.markdown(text, link_patterns=link_patterns,
- extras=["footnotes", "link-patterns", "wiki-tables"])
+ return markdown.markdown(text,
+ extensions=[
+ wiki.PrettyLinksExtension(),
+ "codehilite",
+ "fenced_code",
+ "footnotes",
+ "nl2br",
+ "sane_lists",
+ "tables",
+ "toc",
+ ])
elif lang == "textile":
return textile.textile(text)
- return text
+ else:
+ return text
def refresh(self):
"""
for row in res:
yield row.year
- def update_feeds(self):
+ async def announce(self):
+ posts = self._get_posts("SELECT * FROM blog \
+ WHERE (published_at IS NOT NULL AND published_at <= NOW()) \
+ AND announced_at IS NULL")
+
+ for post in posts:
+ await post.announce()
+
+ async def update_feeds(self):
"""
Updates all enabled feeds
"""
"""
return self.data.html or self.backend.blog._render_text(self.text, lang=self.lang)
+ @lazy_property
+ def plaintext(self):
+ h = html2text.HTML2Text()
+ h.ignore_links = True
+
+ return h.handle(self.html)
+
+ # Excerpt
+
+ @property
+ def excerpt(self):
+ paragraphs = self.plaintext.split("\n\n")
+
+ excerpt = []
+
+ for paragraph in paragraphs:
+ excerpt.append(paragraph)
+
+ # Add another paragraph if we encountered a headline
+ if paragraph.startswith("#"):
+ continue
+
+ # End if this paragraph was long enough
+ if len(paragraph) >= 40:
+ break
+
+ return "\n\n".join(excerpt)
+
# Tags
@property
return self.backend.releases._get_release("SELECT * FROM releases \
WHERE published IS NOT NULL AND published <= NOW() AND blog_id = %s", self.id)
- def is_editable(self, editor):
+ def is_editable(self, user):
+ # Anonymous users cannot do anything
+ if not user:
+ return False
+
+ # Admins can edit anything
+ if user.is_admin():
+ return True
+
+ # User must have permission for the blog
+ if not user.is_blog_author():
+ return False
+
# Authors can edit their own posts
- return self.author == editor
+ return self.author == user
def update(self, title, text, tags=[]):
"""
# Update search indices
self.backend.blog.refresh()
+
+ async def announce(self):
+ # Get people who should receive this message
+ group = self.backend.groups.get_by_gid("promotional-consent")
+ if not group:
+ return
+
+ with self.db.transaction():
+ # Generate an email for everybody in this group
+ for account in group:
+ self.backend.messages.send_template("blog/messages/announcement",
+ account=account, post=self)
+
+ # Mark this post as announced
+ self.db.execute("UPDATE blog SET announced_at = CURRENT_TIMESTAMP \
+ WHERE id = %s", self.id)