]>
git.ipfire.org Git - ipfire.org.git/blob - src/backend/wiki.py
12 from .decorators
import *
15 "google" : ("https://www.google.com/search?q=%(url)s", None, "fab fa-google"),
16 "rfc" : ("https://tools.ietf.org/html/rfc%(name)s", "RFC %s", None),
17 "wp" : ("https://en.wikipedia.org/wiki/%(name)s", None, "fab fa-wikipedia-w"),
20 class Wiki(misc
.Object
):
21 def _get_pages(self
, query
, *args
):
22 res
= self
.db
.query(query
, *args
)
25 yield Page(self
.backend
, row
.id, data
=row
)
27 def _get_page(self
, query
, *args
):
28 res
= self
.db
.get(query
, *args
)
31 return Page(self
.backend
, res
.id, data
=res
)
33 def make_path(self
, page
, path
):
34 # Nothing to do for absolute links
35 if path
.startswith("/"):
38 # Relative links (one-level down)
39 elif path
.startswith("./"):
40 path
= os
.path
.join(page
, path
)
42 # All other relative links
44 p
= os
.path
.dirname(page
)
45 path
= os
.path
.join(p
, path
)
48 return os
.path
.normpath(path
)
50 def get_page_title(self
, page
, default
=None):
51 # Try to retrieve title from cache
52 title
= self
.memcache
.get("wiki:title:%s" % page
)
56 # If the title has not been in the cache, we will
58 doc
= self
.get_page(page
)
62 title
= os
.path
.basename(page
)
64 # Save in cache for forever
65 self
.memcache
.set("wiki:title:%s" % page
, title
)
69 def get_page(self
, page
, revision
=None):
70 page
= Page
.sanitise_page_name(page
)
74 return self
._get
_page
("SELECT * FROM wiki WHERE page = %s \
75 AND timestamp = %s", page
, revision
)
77 return self
._get
_page
("SELECT * FROM wiki WHERE page = %s \
78 ORDER BY timestamp DESC LIMIT 1", page
)
80 def get_recent_changes(self
, account
, limit
=None):
81 pages
= self
._get
_pages
("SELECT * FROM wiki \
82 ORDER BY timestamp DESC")
85 if not page
.check_acl(account
):
94 def create_page(self
, page
, author
, content
, changes
=None, address
=None):
95 page
= Page
.sanitise_page_name(page
)
97 # Write page to the database
98 page
= self
._get
_page
("INSERT INTO wiki(page, author_uid, markdown, changes, address) \
99 VALUES(%s, %s, %s, %s, %s) RETURNING *", page
, author
.uid
, content
or None, changes
, address
)
102 self
.memcache
.set("wiki:title:%s" % page
.page
, page
.title
)
104 # Send email to all watchers
105 page
._send
_watcher
_emails
(excludes
=[author
])
109 def delete_page(self
, page
, author
, **kwargs
):
110 # Do nothing if the page does not exist
111 if not self
.get_page(page
):
114 # Just creates a blank last version of the page
115 self
.create_page(page
, author
=author
, content
=None, **kwargs
)
117 def make_breadcrumbs(self
, url
):
118 # Split and strip all empty elements (double slashes)
119 parts
= list(e
for e
in url
.split("/") if e
)
122 for part
in ("/".join(parts
[:i
]) for i
in range(1, len(parts
))):
123 ret
.append(("/%s" % part
, self
.get_page_title(part
, os
.path
.basename(part
))))
127 def search(self
, query
, account
=None, limit
=None):
128 query
= util
.parse_search_query(query
)
130 res
= self
._get
_pages
("SELECT wiki.* FROM wiki_search_index search_index \
131 LEFT JOIN wiki ON search_index.wiki_id = wiki.id \
132 WHERE search_index.document @@ to_tsquery('english', %s) \
133 ORDER BY ts_rank(search_index.document, to_tsquery('english', %s)) DESC",
138 # Skip any pages the user doesn't have permission for
139 if not page
.check_acl(account
):
142 # Return any other pages
145 # Break when we have found enough pages
146 if limit
and len(pages
) >= limit
:
153 Needs to be called after a page has been changed
155 self
.db
.execute("REFRESH MATERIALIZED VIEW wiki_search_index")
157 def get_watchlist(self
, account
):
158 pages
= self
._get
_pages
(
159 "WITH pages AS (SELECT * FROM wiki_current \
160 LEFT JOIN wiki ON wiki_current.id = wiki.id) \
161 SELECT * FROM wiki_watchlist watchlist \
162 LEFT JOIN pages ON watchlist.page = pages.page \
163 WHERE watchlist.uid = %s",
171 def check_acl(self
, page
, account
):
172 res
= self
.db
.query("SELECT * FROM wiki_acls \
173 WHERE %s ILIKE (path || '%%') ORDER BY LENGTH(path) DESC LIMIT 1", page
)
176 # Access not permitted when user is not logged in
180 # If user is in a matching group, we grant permission
181 for group
in row
.groups
:
182 if group
in account
.groups
:
185 # Otherwise access is not permitted
188 # If no ACLs are found, we permit access
193 def _get_files(self
, query
, *args
):
194 res
= self
.db
.query(query
, *args
)
197 yield File(self
.backend
, row
.id, data
=row
)
199 def _get_file(self
, query
, *args
):
200 res
= self
.db
.get(query
, *args
)
203 return File(self
.backend
, res
.id, data
=res
)
205 def get_files(self
, path
):
206 files
= self
._get
_files
("SELECT * FROM wiki_files \
207 WHERE path = %s AND deleted_at IS NULL ORDER BY filename", path
)
def get_file_by_path(self, path):
    """Look up a non-deleted file by its full path.

    The path is split into its directory part and its final component;
    both are passed to the database lookup against ``wiki_files``.
    Returns whatever ``self._get_file`` returns for that query.
    """
    # Separate "/some/dir/name.ext" into ("/some/dir", "name.ext")
    directory, filename = os.path.split(path)

    return self._get_file("SELECT * FROM wiki_files \
        WHERE path = %s AND filename = %s AND deleted_at IS NULL", directory, filename)
def upload(self, path, filename, data, mimetype, author, address):
    """Store an uploaded file and return its file object.

    The payload is inserted into ``wiki_blobs`` first; a row referencing
    that blob is then inserted into ``wiki_files`` together with the
    uploader's UID, the address, the MIME type and the payload length.
    Returns whatever ``self._get_file`` returns for the second insert.
    """
    # The blob has to exist first because the file row references its ID
    blob = self.db.get("INSERT INTO wiki_blobs(data) VALUES(%s) RETURNING id", data)

    # Values in the same order as the column list of the INSERT below
    values = (
        path,
        filename,
        author.uid,
        address,
        mimetype,
        blob.id,
        len(data),
    )

    # Create entry for file
    return self._get_file("INSERT INTO wiki_files(path, filename, author_uid, address, \
        mimetype, blob_id, size) VALUES(%s, %s, %s, %s, %s, %s, %s) RETURNING *", *values)
def render(self, path, text):
    """Render wiki text for the page at *path*.

    Creates a WikiRenderer bound to this backend and the given path and
    returns the result of its ``render()`` call for *text*.
    """
    return WikiRenderer(self.backend, path).render(text)
232 class Page(misc
.Object
):
233 def init(self
, id, data
=None):
238 return "<%s %s %s>" % (self
.__class
__.__name
__, self
.page
, self
.timestamp
)
240 def __eq__(self
, other
):
241 if isinstance(other
, self
.__class
__):
242 return self
.id == other
.id
244 def __lt__(self
, other
):
245 if isinstance(other
, self
.__class
__):
246 if self
.page
== other
.page
:
247 return self
.timestamp
< other
.timestamp
249 return self
.page
< other
.page
252 def sanitise_page_name(page
):
256 # Make sure that the page name does NOT end with a /
257 if page
.endswith("/"):
260 # Make sure the page name starts with a /
261 if not page
.startswith("/"):
264 # Remove any double slashes
265 page
= page
.replace("//", "/")
275 return "https://wiki.ipfire.org%s" % self
.url
279 return self
.data
.page
283 return self
._title
or os
.path
.basename(self
.page
[1:])
287 if not self
.markdown
:
290 # Find first H1 headline in markdown
291 markdown
= self
.markdown
.splitlines()
293 m
= re
.match(r
"^# (.*)( #)?$", markdown
[0])
299 if self
.data
.author_uid
:
300 return self
.backend
.accounts
.get_by_uid(self
.data
.author_uid
)
304 return self
.data
.markdown
or ""
308 return self
.backend
.wiki
.render(self
.page
, self
.markdown
)
312 return self
.data
.timestamp
def was_deleted(self):
    """Return True if this revision has no content, i.e. the page was deleted.

    Deleting a page writes a revision without any markdown, and empty
    content is stored as NULL as well. Since the markdown of a page may be
    exposed as an empty string rather than None, an identity check against
    None would never match; test for emptiness instead.
    """
    return not self.markdown
def breadcrumbs(self):
    """Return the breadcrumbs the wiki backend builds for this page's path."""
    wiki = self.backend.wiki

    return wiki.make_breadcrumbs(self.page)
321 def get_latest_revision(self
):
322 revisions
= self
.get_revisions()
324 # Return first object
325 for rev
in revisions
:
def get_revisions(self):
    """Return all stored revisions of this page, newest first.

    The result is whatever the wiki backend's ``_get_pages`` returns for
    the query, which selects every row of this page ordered by descending
    timestamp.
    """
    wiki = self.backend.wiki

    return wiki._get_pages("SELECT * FROM wiki \
        WHERE page = %s ORDER BY timestamp DESC", self.page)
def previous_revision(self):
    """Return the revision of this page that directly precedes this one.

    Selects the most recent row of the same page whose timestamp is older
    than this revision's timestamp. The result is whatever the wiki
    backend's ``_get_page`` returns for that query.
    """
    wiki = self.backend.wiki

    # Newest row that is still older than this revision
    return wiki._get_page("SELECT * FROM wiki \
        WHERE page = %s AND timestamp < %s ORDER BY timestamp DESC \
        LIMIT 1", self.page, self.timestamp)
340 return self
.data
.changes
def check_acl(self, account):
    """Ask the wiki backend whether *account* may access this page.

    Delegates to the backend's ``check_acl`` with this page's path and
    returns its result unchanged.
    """
    wiki = self.backend.wiki

    return wiki.check_acl(self.page, account)
351 parts
= self
.page
.split("/")
354 sidebar
= self
.backend
.wiki
.get_page("%s/sidebar" % os
.path
.join(*parts
))
364 if self
.previous_revision
:
365 diff
= difflib
.unified_diff(
366 self
.previous_revision
.markdown
.splitlines(),
367 self
.markdown
.splitlines(),
370 return "\n".join(diff
)
374 res
= self
.db
.query("SELECT uid FROM wiki_watchlist \
375 WHERE page = %s", self
.page
)
378 # Search for account by UID and skip if none was found
379 account
= self
.backend
.accounts
.get_by_uid(row
.uid
)
386 def is_watched_by(self
, account
):
387 res
= self
.db
.get("SELECT 1 FROM wiki_watchlist \
388 WHERE page = %s AND uid = %s", self
.page
, account
.uid
)
395 def add_watcher(self
, account
):
396 if self
.is_watched_by(account
):
399 self
.db
.execute("INSERT INTO wiki_watchlist(page, uid) \
400 VALUES(%s, %s)", self
.page
, account
.uid
)
def remove_watcher(self, account):
    """Remove *account* from the watchlist of this page.

    Deletes the ``wiki_watchlist`` rows matching this page and the
    account's UID. Nothing is returned.
    """
    params = (self.page, account.uid)

    self.db.execute("DELETE FROM wiki_watchlist \
        WHERE page = %s AND uid = %s", *params)
406 def _send_watcher_emails(self
, excludes
=[]):
407 # Nothing to do if there was no previous revision
408 if not self
.previous_revision
:
411 for watcher
in self
.watchers
:
412 # Skip everyone who is excluded
413 if watcher
in excludes
:
414 logging
.debug("Excluding %s" % watcher
)
418 if not self
.backend
.wiki
.check_acl(self
.page
, watcher
):
419 logging
.debug("Watcher %s does not have permissions" % watcher
)
422 logging
.debug("Sending watcher email to %s" % watcher
)
425 self
.backend
.messages
.send_template("wiki/messages/page-changed",
426 recipients
=[watcher
], page
=self
, priority
=-10)
429 class File(misc
.Object
):
430 def init(self
, id, data
):
436 return os
.path
.join(self
.path
, self
.filename
)
440 return self
.data
.path
444 return self
.data
.filename
448 return self
.data
.mimetype
452 return self
.data
.size
456 if self
.data
.author_uid
:
457 return self
.backend
.accounts
.get_by_uid(self
.data
.author_uid
)
def created_at(self):
    """Return the creation time recorded in this file's data row."""
    row = self.data

    return row.created_at
464 return self
.mimetype
in ("application/pdf", "application/x-pdf")
467 return self
.mimetype
.startswith("image/")
471 res
= self
.db
.get("SELECT data FROM wiki_blobs \
472 WHERE id = %s", self
.data
.blob_id
)
475 return bytes(res
.data
)
477 def get_thumbnail(self
, size
):
478 cache_key
= "-".join((self
.path
, util
.normalize(self
.filename
), self
.created_at
.isoformat(), "%spx" % size
))
480 # Try to fetch the data from the cache
481 thumbnail
= self
.memcache
.get(cache_key
)
485 # Generate the thumbnail
486 thumbnail
= util
.generate_thumbnail(self
.blob
, size
)
488 # Put it into the cache for forever
489 self
.memcache
.set(cache_key
, thumbnail
)
494 class WikiRenderer(misc
.Object
):
496 wiki_link
= re
.compile(r
"\[\[([\w\d\/\-\.]+)(?:\|(.+?))?\]\]")
499 external_link
= re
.compile(r
"\[\[((?:ftp|git|https?|rsync|sftp|ssh|webcal)\:\/\/.+?)(?:\|(.+?))?\]\]")
501 # Interwiki links e.g. [[wp>IPFire]]
502 interwiki_link
= re
.compile(r
"\[\[(\w+)>(.+?)(?:\|(.+?))?\]\]")
505 email_link
= re
.compile(r
"\[\[([a-zA-Z0-9_.+-]+@[a-zA-Z0-9-]+\.[a-zA-Z0-9-.]+)(?:\|(.+?))?\]\]")
508 images
= re
.compile(r
"<img src=\"(.*?
)\" alt
=\"(.*?
)\" (?
:title
=\"(.*?
)\" )?
/>")
510 def init(self, path):
513 def _render_wiki_link(self, m):
514 path, alias = m.groups()
516 path = self.backend.wiki.make_path(self.path, path)
518 return """<a href="%s">%s</a>""" % (
520 alias or self.backend.wiki.get_page_title(path),
523 def _render_external_link(self, m):
524 url, alias = m.groups()
526 return """<a class="link
-external
" href="%s">%s</a>""" % (url, alias or url)
528 def _render_interwiki_link(self, m):
535 url, repl, icon = INTERWIKIS[wiki]
537 logging.warning("Invalid interwiki
: %s" % wiki)
546 "url
" : urllib.parse.quote(name),
549 # Get alias (if present)
552 if not alias and repl:
555 # Put everything together
559 s.append("<span
class=\"%s\"></span
>" % icon)
561 s.append("""<a class="link
-external
" href="%s">%s</a>""" % (url, alias or name))
565 def _render_email_link(self, m):
566 address, alias = m.groups()
568 return """<a class="link
-external
" href="mailto
:%s">%s</a>""" \
569 % (address, alias or address)
571 def _render_image(self, m):
572 url, alt_text, caption = m.groups()
574 # Skip any absolute and external URLs
575 if url.startswith("/") or url.startswith("https
://") or url.startswith("http
://"):
576 return """<figure class="figure
"><img src="%s" class="figure
-img img
-fluid rounded
" alt="%s">
577 <figcaption class="figure
-caption
">%s</figcaption></figure>
578 """ % (url, alt_text, caption or "")
580 # Try to split query string
581 url, delimiter, qs = url.partition("?
")
583 # Parse query arguments
584 args = urllib.parse.parse_qs(qs)
586 # Build absolute path
587 url = self.backend.wiki.make_path(self.path, url)
590 file = self.backend.wiki.get_file_by_path(url)
591 if not file or not file.is_image():
592 return "<!-- Could
not find image
%s in %s -->" % (url, self.path)
594 # Scale down the image if not already done
598 return """<figure class="figure
"><img src="%s?
%s" class="figure
-img img
-fluid rounded
" alt="%s">
599 <figcaption class="figure
-caption
">%s</figcaption></figure>
600 """ % (url, urllib.parse.urlencode(args), caption, caption or "")
602 def render(self, text):
603 logging.debug("Rendering
%s" % self.path)
606 text = self.wiki_link.sub(self._render_wiki_link, text)
608 # Handle interwiki links
609 text = self.interwiki_link.sub(self._render_interwiki_link, text)
611 # Handle external links
612 text = self.external_link.sub(self._render_external_link, text)
615 text = self.email_link.sub(self._render_email_link, text)
617 # Borrow this from the blog
618 text = self.backend.blog._render_text(text, lang="markdown
")
620 # Postprocess images to <figure>
621 text = self.images.sub(self._render_image, text)