#!/usr/bin/python3
import difflib
import logging
import os.path
import re
import tornado.gen
import urllib.parse
from . import misc
from . import util
from .decorators import *
# Interwiki link targets, keyed by prefix (e.g. [[wp>IPFire]]).
# Each value is a 3-tuple of:
#   url  - template expanded with %(name)s (raw page name) and
#          %(url)s (URL-encoded page name)
#   repl - optional template for the default link text (None = use the page name)
#   icon - optional Font Awesome CSS classes, or None for no icon
INTERWIKIS = {
    "google" : ("https://www.google.com/search?q=%(url)s", None, "fab fa-google"),
    "rfc" : ("https://tools.ietf.org/html/rfc%(name)s", "RFC %s", None),
    "wp" : ("https://en.wikipedia.org/wiki/%(name)s", None, "fab fa-wikipedia-w"),
}
class Wiki(misc.Object):
    """Backend for the wiki: page revisions, full-text search, ACLs and files.

    Pages are stored append-only in the "wiki" table; the newest row per
    page name is the current revision.  Uploaded files live in
    "wiki_files" with their raw data in "wiki_blobs".
    """

    def _get_pages(self, query, *args):
        # Yield a Page object for every row the query returns
        res = self.db.query(query, *args)

        for row in res:
            yield Page(self.backend, row.id, data=row)

    def _get_page(self, query, *args):
        # Wrap the single row the query returns (if any) in a Page
        res = self.db.get(query, *args)

        if res:
            return Page(self.backend, res.id, data=res)

    def get_page_title(self, page, default=None):
        """Returns the title of a page (cached forever in memcache).

        Falls back to "default" - or the basename of the page path when
        no default is given - if the page does not exist.
        """
        # Try to retrieve title from cache
        title = self.memcache.get("wiki:title:%s" % page)
        if title:
            return title

        # If the title has not been in the cache, we will
        # have to look it up
        doc = self.get_page(page)
        if doc:
            title = doc.title
        else:
            # Fix: the "default" argument used to be ignored entirely
            title = default if default is not None else os.path.basename(page)

        # Save in cache for forever
        self.memcache.set("wiki:title:%s" % page, title)

        return title

    def get_page(self, page, revision=None):
        """Returns the given revision of a page, or its latest revision."""
        page = Page.sanitise_page_name(page)
        assert page

        if revision:
            return self._get_page("SELECT * FROM wiki WHERE page = %s \
                AND timestamp = %s", page, revision)
        else:
            return self._get_page("SELECT * FROM wiki WHERE page = %s \
                ORDER BY timestamp DESC LIMIT 1", page)

    def get_recent_changes(self, account, limit=None):
        """Yields pages changed in the last four weeks that "account" may read.

        At most "limit" pages are yielded when a limit is given.
        Fix: a limit of None (the default) used to raise TypeError on
        the "limit -= 1" decrement as soon as a second page was fetched.
        """
        pages = self._get_pages("SELECT * FROM wiki \
            WHERE timestamp >= NOW() - INTERVAL '4 weeks' \
            ORDER BY timestamp DESC")

        for page in pages:
            # Skip pages the account must not see
            if not page.check_acl(account):
                continue

            yield page

            # Stop once enough pages have been yielded
            if limit is not None:
                limit -= 1
                if not limit:
                    break

    def create_page(self, page, author, content, changes=None, address=None):
        """Creates a new revision of "page" and returns it."""
        page = Page.sanitise_page_name(page)

        # Write page to the database (empty content is stored as NULL)
        page = self._get_page("INSERT INTO wiki(page, author_uid, markdown, changes, address) \
            VALUES(%s, %s, %s, %s, %s) RETURNING *", page, author.uid, content or None, changes, address)

        # Update cache
        self.memcache.set("wiki:title:%s" % page.page, page.title)

        # Send email to all watchers
        page._send_watcher_emails(excludes=[author])

        return page

    def delete_page(self, page, author, **kwargs):
        """Deletes a page by storing a blank revision on top of it."""
        # Do nothing if the page does not exist
        if not self.get_page(page):
            return

        # Just creates a blank last version of the page
        self.create_page(page, author=author, content=None, **kwargs)

    def make_breadcrumbs(self, url):
        """Returns (link, title) tuples for every parent of "url".

        The final path component itself is deliberately excluded.
        """
        # Split and strip all empty elements (double slashes)
        parts = list(e for e in url.split("/") if e)

        ret = []
        for part in ("/".join(parts[:i]) for i in range(1, len(parts))):
            ret.append(("/%s" % part, self.get_page_title(part, os.path.basename(part))))

        return ret

    def search(self, query, account=None, limit=None):
        """Full-text search; returns up to "limit" pages readable by "account"."""
        query = util.parse_search_query(query)

        res = self._get_pages("SELECT wiki.* FROM wiki_search_index search_index \
            LEFT JOIN wiki ON search_index.wiki_id = wiki.id \
            WHERE search_index.document @@ to_tsquery('english', %s) \
            ORDER BY ts_rank(search_index.document, to_tsquery('english', %s)) DESC",
            query, query)

        pages = []
        for page in res:
            # Skip any pages the user doesn't have permission for
            if not page.check_acl(account):
                continue

            # Return any other pages
            pages.append(page)

            # Break when we have found enough pages
            if limit and len(pages) >= limit:
                break

        return pages

    def refresh(self):
        """
        Needs to be called after a page has been changed
        """
        self.db.execute("REFRESH MATERIALIZED VIEW wiki_search_index")

    # ACL

    def check_acl(self, page, account):
        """Returns True when "account" may read "page".

        Only the longest matching ACL path is consulted (LIMIT 1); pages
        without any matching ACL are readable by everyone.
        """
        res = self.db.query("SELECT * FROM wiki_acls \
            WHERE %s ILIKE (path || '%%') ORDER BY LENGTH(path) DESC LIMIT 1", page)

        for row in res:
            # Access not permitted when user is not logged in
            if not account:
                return False

            # If user is in a matching group, we grant permission
            for group in row.groups:
                if group in account.groups:
                    return True

            # Otherwise access is not permitted
            return False

        # If no ACLs are found, we permit access
        return True

    # Files

    def _get_files(self, query, *args):
        # Yield a File object for every row the query returns
        res = self.db.query(query, *args)

        for row in res:
            yield File(self.backend, row.id, data=row)

    def _get_file(self, query, *args):
        # Wrap the single row the query returns (if any) in a File
        res = self.db.get(query, *args)

        if res:
            return File(self.backend, res.id, data=res)

    def get_files(self, path):
        """Returns all non-deleted files in "path", sorted by filename."""
        files = self._get_files("SELECT * FROM wiki_files \
            WHERE path = %s AND deleted_at IS NULL ORDER BY filename", path)

        return list(files)

    def get_file_by_path(self, path):
        """Returns the non-deleted file at "path" (directory + filename)."""
        path, filename = os.path.dirname(path), os.path.basename(path)

        return self._get_file("SELECT * FROM wiki_files \
            WHERE path = %s AND filename = %s AND deleted_at IS NULL", path, filename)

    def upload(self, path, filename, data, mimetype, author, address):
        """Stores "data" as a new file and returns the created File."""
        # Upload the blob first
        blob = self.db.get("INSERT INTO wiki_blobs(data) VALUES(%s) RETURNING id", data)

        # Create entry for file
        return self._get_file("INSERT INTO wiki_files(path, filename, author_uid, address, \
            mimetype, blob_id, size) VALUES(%s, %s, %s, %s, %s, %s, %s) RETURNING *", path,
            filename, author.uid, address, mimetype, blob.id, len(data))

    def find_image(self, path, filename):
        """Looks for an image file in "path" or its parent directory."""
        for p in (path, os.path.dirname(path)):
            file = self.get_file_by_path(os.path.join(p, filename))

            if file and file.is_image():
                return file
class Page(misc.Object):
    """A single revision of a wiki page (one row of the "wiki" table)."""

    # Interwiki links e.g. [[wp>IPFire]]
    interwiki_link = re.compile(r"\[\[(\w+)>(.+?)(?:\|(.+?))?\]\]")

    def init(self, id, data=None):
        self.id = id
        self.data = data

    def __repr__(self):
        return "<%s %s %s>" % (self.__class__.__name__, self.page, self.timestamp)

    def __eq__(self, other):
        if isinstance(other, self.__class__):
            return self.id == other.id

        # Fix: return NotImplemented (instead of falling through to None)
        # so Python can try the reflected comparison for foreign types
        return NotImplemented

    def __lt__(self, other):
        if isinstance(other, self.__class__):
            # Revisions of the same page sort by time, otherwise by name
            if self.page == other.page:
                return self.timestamp < other.timestamp

            return self.page < other.page

        # Fix: signal "cannot compare" instead of silently returning None
        return NotImplemented

    @staticmethod
    def sanitise_page_name(page):
        """Normalises a page name to the canonical "/a/b" form."""
        if not page:
            return "/"

        # Make sure that the page name does NOT end with a /
        if page.endswith("/"):
            page = page[:-1]

        # Make sure the page name starts with a /
        if not page.startswith("/"):
            page = "/%s" % page

        # Remove any double slashes
        page = page.replace("//", "/")

        return page

    @property
    def url(self):
        return self.page

    @property
    def full_url(self):
        return "https://wiki.ipfire.org%s" % self.url

    @property
    def page(self):
        # The canonical page path (starts with /)
        return self.data.page

    @property
    def title(self):
        return self._title or os.path.basename(self.page[1:])

    @property
    def _title(self):
        # Extracts a title from the first line of the markdown, if that
        # line is a H1 headline; returns None otherwise
        if not self.markdown:
            return

        # Find first H1 headline in markdown
        markdown = self.markdown.splitlines()
        m = re.match(r"^# (.*)( #)?$", markdown[0])
        if m:
            return m.group(1)

    @lazy_property
    def author(self):
        if self.data.author_uid:
            return self.backend.accounts.get_by_uid(self.data.author_uid)

    def _render_interwiki_link(self, m):
        """Expands one interwiki match ([[wiki>name|alias]]) into an HTML link."""
        wiki = m.group(1)
        if not wiki:
            return

        # Retrieve URL
        try:
            url, repl, icon = INTERWIKIS[wiki]
        except KeyError:
            logging.warning("Invalid interwiki: %s" % wiki)
            return

        # Name of the page
        name = m.group(2)

        # Expand URL
        url = url % {
            "name" : name,
            "url" : urllib.parse.quote(name),
        }

        # Get alias (if present)
        alias = m.group(3)

        if not alias and repl:
            alias = repl % name

        # Put everything together
        s = []

        if icon:
            # Fix: "" % icon raised TypeError; render the icon as an empty
            # <span> carrying the Font Awesome classes
            s.append("<span class=\"%s\"></span>" % icon)

        # Fix: "%s" % (url, alias or name) raised TypeError (two arguments
        # for one placeholder) - the anchor markup was evidently lost
        s.append("<a href=\"%s\">%s</a>" % (url, alias or name))

        return " ".join(s)

    def _render(self, text):
        """Rewrites images, interwiki links and wiki links in "text" and
        renders the result as Markdown."""
        logging.debug("Rendering %s" % self)

        # Link images
        replacements = []
        for match in re.finditer(r"!\[(.*?)\]\((.*?)\)", text):
            alt_text, url = match.groups()

            # Skip any absolute and external URLs
            if url.startswith("/") or url.startswith("https://") or url.startswith("http://"):
                continue

            # Try to split query string
            url, delimiter, qs = url.partition("?")

            # Parse query arguments
            args = urllib.parse.parse_qs(qs)

            # Find image
            file = self.backend.wiki.find_image(self.page, url)
            if not file:
                continue

            # Scale down the image if not already done
            if not "s" in args:
                args["s"] = "768"

            # Format URL
            url = "%s?%s" % (file.url, urllib.parse.urlencode(args))

            replacements.append((match.span(), file, alt_text, url))

        # Apply all replacements (in reverse so earlier spans stay valid)
        for (start, end), file, alt_text, url in reversed(replacements):
            text = text[:start] + "[![%s](%s)](%s?action=detail)" % (alt_text, url, file.url) + text[end:]

        # Handle interwiki links
        text = self.interwiki_link.sub(self._render_interwiki_link, text)

        # Add wiki links
        patterns = (
            (r"\[\[([\w\d\/\-\.]+)(?:\|(.+?))\]\]", r"\1", r"\2", None, True),
            (r"\[\[([\w\d\/\-\.]+)\]\]", r"\1", r"\1", self.backend.wiki.get_page_title, True),

            # External links
            (r"\[\[((?:ftp|git|https?|rsync|sftp|ssh|webcal)\:\/\/.+?)(?:\|(.+?))\]\]",
                r"\1", r"\2", None, False),

            # Mail
            (r"\[\[([a-zA-Z0-9_.+-]+@[a-zA-Z0-9-]+\.[a-zA-Z0-9-.]+)\]\]",
                r"\1", r"\1", None, False),
        )

        for pattern, link, title, repl, internal in patterns:
            replacements = []

            for match in re.finditer(pattern, text):
                l = match.expand(link)
                t = match.expand(title)

                if internal:
                    # Allow relative links
                    if not l.startswith("/"):
                        l = os.path.join(self.page, l)

                    # Normalise links
                    l = os.path.normpath(l)

                if callable(repl):
                    t = repl(l) or t

                replacements.append((match.span(), t or l, l))

            # Apply all replacements
            for (start, end), t, l in reversed(replacements):
                text = text[:start] + "[%s](%s)" % (t, l) + text[end:]

        # Borrow this from the blog
        return self.backend.blog._render_text(text, lang="markdown")

    @property
    def markdown(self):
        # Coerces NULL content (deleted pages) to an empty string
        return self.data.markdown or ""

    @property
    def html(self):
        return self._render(self.markdown)

    @property
    def timestamp(self):
        return self.data.timestamp

    def was_deleted(self):
        # Fix: self.markdown coerces NULL to "", so "is None" was always
        # False; a page counts as deleted when its latest revision has
        # no content (delete_page() stores NULL content)
        return not self.markdown

    @lazy_property
    def breadcrumbs(self):
        return self.backend.wiki.make_breadcrumbs(self.page)

    def get_latest_revision(self):
        """Returns the most recent revision of this page (or None)."""
        revisions = self.get_revisions()

        # Return first object
        for rev in revisions:
            return rev

    def get_revisions(self):
        """Yields all revisions of this page, newest first."""
        return self.backend.wiki._get_pages("SELECT * FROM wiki \
            WHERE page = %s ORDER BY timestamp DESC", self.page)

    @lazy_property
    def previous_revision(self):
        return self.backend.wiki._get_page("SELECT * FROM wiki \
            WHERE page = %s AND timestamp < %s ORDER BY timestamp DESC \
            LIMIT 1", self.page, self.timestamp)

    @property
    def changes(self):
        return self.data.changes

    # ACL

    def check_acl(self, account):
        return self.backend.wiki.check_acl(self.page, account)

    # Sidebar

    @lazy_property
    def sidebar(self):
        # Walk up the page hierarchy until a sidebar page is found
        parts = self.page.split("/")

        while parts:
            sidebar = self.backend.wiki.get_page("%s/sidebar" % os.path.join(*parts))
            if sidebar:
                return sidebar

            parts.pop()

    # Watchers

    @lazy_property
    def diff(self):
        """Returns a unified diff against the previous revision (or None)."""
        if self.previous_revision:
            diff = difflib.unified_diff(
                self.previous_revision.markdown.splitlines(),
                self.markdown.splitlines(),
            )

            return "\n".join(diff)

    @property
    def watchers(self):
        res = self.db.query("SELECT uid FROM wiki_watchlist \
            WHERE page = %s", self.page)

        for row in res:
            # Search for account by UID and skip if none was found
            account = self.backend.accounts.get_by_uid(row.uid)
            if not account:
                continue

            # Return the account
            yield account

    def is_watched_by(self, account):
        res = self.db.get("SELECT 1 FROM wiki_watchlist \
            WHERE page = %s AND uid = %s", self.page, account.uid)

        return bool(res)

    def add_watcher(self, account):
        # Adding the same watcher twice would violate uniqueness
        if self.is_watched_by(account):
            return

        self.db.execute("INSERT INTO wiki_watchlist(page, uid) \
            VALUES(%s, %s)", self.page, account.uid)

    def remove_watcher(self, account):
        self.db.execute("DELETE FROM wiki_watchlist \
            WHERE page = %s AND uid = %s", self.page, account.uid)

    def _send_watcher_emails(self, excludes=None):
        """Sends a "page changed" email to every watcher not in "excludes"."""
        # Fix: avoid the mutable default argument
        excludes = excludes or []

        # Nothing to do if there was no previous revision
        if not self.previous_revision:
            return

        for watcher in self.watchers:
            # Skip everyone who is excluded
            if watcher in excludes:
                logging.debug("Excluding %s" % watcher)
                continue

            logging.debug("Sending watcher email to %s" % watcher)

            # Compose message
            self.backend.messages.send_template("wiki/messages/page-changed",
                recipients=[watcher], page=self, priority=-10)
class File(misc.Object):
    """One uploaded wiki file: metadata row plus lazily-fetched blob data."""

    def init(self, id, data):
        self.id = id
        self.data = data

    @property
    def url(self):
        # Directory and filename together form the download URL
        return os.path.join(self.path, self.filename)

    @property
    def path(self):
        return self.data.path

    @property
    def filename(self):
        return self.data.filename

    @property
    def mimetype(self):
        return self.data.mimetype

    @property
    def size(self):
        return self.data.size

    @lazy_property
    def author(self):
        uid = self.data.author_uid
        if uid:
            return self.backend.accounts.get_by_uid(uid)

    @property
    def created_at(self):
        return self.data.created_at

    def is_pdf(self):
        # Both MIME types occur in the wild for PDF documents
        return self.mimetype in ("application/pdf", "application/x-pdf")

    def is_image(self):
        return self.mimetype.startswith("image/")

    @lazy_property
    def blob(self):
        # The raw file content is fetched on first access only
        row = self.db.get("SELECT data FROM wiki_blobs \
            WHERE id = %s", self.data.blob_id)

        if row:
            return bytes(row.data)

    def get_thumbnail(self, size):
        """Returns a thumbnail of this image at "size" px, memcached forever."""
        key = "-".join((self.path, util.normalize(self.filename), self.created_at.isoformat(), "%spx" % size))

        # Serve the thumbnail straight from the cache when possible
        cached = self.memcache.get(key)
        if cached:
            return cached

        # Otherwise generate it now and cache it indefinitely
        thumbnail = util.generate_thumbnail(self.blob, size)
        self.memcache.set(key, thumbnail)

        return thumbnail