]>
git.ipfire.org Git - ipfire.org.git/blob - src/backend/wiki.py
bc779fdc907655ea227fa635540b3141313f23fe
12 from .decorators
import *
15 "google" : ("https://www.google.com/search?q=%(url)s", None, "fab fa-google"),
16 "rfc" : ("https://tools.ietf.org/html/rfc%(name)s", "RFC %s", None),
17 "wp" : ("https://en.wikipedia.org/wiki/%(name)s", None, "fab fa-wikipedia-w"),
20 class Wiki(misc
.Object
):
def _get_pages(self, query, *args):
    """Run *query* against the database and yield one Page per row."""
    for row in self.db.query(query, *args):
        yield Page(self.backend, row.id, data=row)
def _get_page(self, query, *args):
    """Run *query* and wrap the single matching row in a Page.

    Returns None when no row matched instead of raising
    AttributeError on ``res.id``.
    """
    res = self.db.get(query, *args)

    # db.get() returns None when the query matched nothing -
    # guard before dereferencing any row attributes
    if res:
        return Page(self.backend, res.id, data=res)
33 def get_page_title(self
, page
, default
=None):
34 # Try to retrieve title from cache
35 title
= self
.memcache
.get("wiki:title:%s" % page
)
39 # If the title has not been in the cache, we will
41 doc
= self
.get_page(page
)
45 title
= os
.path
.basename(page
)
47 # Save in cache for forever
48 self
.memcache
.set("wiki:title:%s" % page
, title
)
def get_page(self, page, revision=None):
    """Fetch *page* by name, optionally pinned to a specific *revision*."""
    page = Page.sanitise_page_name(page)

    # A specific revision was requested
    if revision:
        return self._get_page("SELECT * FROM wiki WHERE page = %s \
            AND timestamp = %s", page, revision)

    # Otherwise return the most recent revision
    return self._get_page("SELECT * FROM wiki WHERE page = %s \
        ORDER BY timestamp DESC LIMIT 1", page)
63 def get_recent_changes(self
, account
, limit
=None):
64 pages
= self
._get
_pages
("SELECT * FROM wiki \
65 ORDER BY timestamp DESC")
68 if not page
.check_acl(account
):
def create_page(self, page, author, content, changes=None, address=None):
    """Store a new revision of *page* and notify its watchers."""
    page = Page.sanitise_page_name(page)

    # Write page to the database
    page = self._get_page("INSERT INTO wiki(page, author_uid, markdown, changes, address) \
        VALUES(%s, %s, %s, %s, %s) RETURNING *", page, author.uid,
        content or None, changes, address)

    # Update the title cache
    self.memcache.set("wiki:title:%s" % page.page, page.title)

    # Send email to all watchers
    page._send_watcher_emails(excludes=[author])

    return page
def delete_page(self, page, author, **kwargs):
    """Mark *page* as deleted by writing an empty revision."""
    # Do nothing if the page does not exist
    if not self.get_page(page):
        return

    # Just creates a blank last version of the page
    self.create_page(page, author=author, content=None, **kwargs)
def make_breadcrumbs(self, url):
    """Return (link, title) tuples for every ancestor of *url*."""
    # Split and strip all empty elements (double slashes)
    parts = [e for e in url.split("/") if e]

    ret = []
    for i in range(1, len(parts)):
        part = "/".join(parts[:i])
        ret.append(("/%s" % part, self.get_page_title(part, os.path.basename(part))))

    return ret
110 def search(self
, query
, account
=None, limit
=None):
111 query
= util
.parse_search_query(query
)
113 res
= self
._get
_pages
("SELECT wiki.* FROM wiki_search_index search_index \
114 LEFT JOIN wiki ON search_index.wiki_id = wiki.id \
115 WHERE search_index.document @@ to_tsquery('english', %s) \
116 ORDER BY ts_rank(search_index.document, to_tsquery('english', %s)) DESC",
121 # Skip any pages the user doesn't have permission for
122 if not page
.check_acl(account
):
125 # Return any other pages
128 # Break when we have found enough pages
129 if limit
and len(pages
) >= limit
:
136 Needs to be called after a page has been changed
138 self
.db
.execute("REFRESH MATERIALIZED VIEW wiki_search_index")
140 def get_watchlist(self
, account
):
141 pages
= self
._get
_pages
(
142 "WITH pages AS (SELECT * FROM wiki_current \
143 LEFT JOIN wiki ON wiki_current.id = wiki.id) \
144 SELECT * FROM wiki_watchlist watchlist \
145 LEFT JOIN pages ON watchlist.page = pages.page \
146 WHERE watchlist.uid = %s",
154 def check_acl(self
, page
, account
):
155 res
= self
.db
.query("SELECT * FROM wiki_acls \
156 WHERE %s ILIKE (path || '%%') ORDER BY LENGTH(path) DESC LIMIT 1", page
)
159 # Access not permitted when user is not logged in
163 # If user is in a matching group, we grant permission
164 for group
in row
.groups
:
165 if group
in account
.groups
:
168 # Otherwise access is not permitted
171 # If no ACLs are found, we permit access
def _get_files(self, query, *args):
    """Run *query* against the database and yield one File per row."""
    for row in self.db.query(query, *args):
        yield File(self.backend, row.id, data=row)
def _get_file(self, query, *args):
    """Run *query* and wrap the single matching row in a File.

    Returns None when no row matched instead of raising
    AttributeError on ``res.id``.
    """
    res = self.db.get(query, *args)

    # db.get() returns None when the query matched nothing -
    # guard before dereferencing any row attributes
    if res:
        return File(self.backend, res.id, data=res)
188 def get_files(self
, path
):
189 files
= self
._get
_files
("SELECT * FROM wiki_files \
190 WHERE path = %s AND deleted_at IS NULL ORDER BY filename", path
)
def get_file_by_path(self, path):
    """Look up the (not deleted) file stored at *path*."""
    # Split into directory part and file name
    path, filename = os.path.dirname(path), os.path.basename(path)

    return self._get_file("SELECT * FROM wiki_files \
        WHERE path = %s AND filename = %s AND deleted_at IS NULL", path, filename)
def upload(self, path, filename, data, mimetype, author, address):
    """Store an uploaded file and return the created File object."""
    # Upload the blob first
    blob = self.db.get("INSERT INTO wiki_blobs(data) VALUES(%s) RETURNING id", data)

    # Create entry for file
    return self._get_file("INSERT INTO wiki_files(path, filename, author_uid, address, \
        mimetype, blob_id, size) VALUES(%s, %s, %s, %s, %s, %s, %s) RETURNING *", path,
        filename, author.uid, address, mimetype, blob.id, len(data))
def find_image(self, path, filename):
    """Search *path* and its parent directory for an image *filename*."""
    for p in (path, os.path.dirname(path)):
        file = self.get_file_by_path(os.path.join(p, filename))

        # Only files that actually are images qualify
        if file and file.is_image():
            return file
def render(self, path, text):
    """Render wiki markup *text* in the context of page *path*."""
    renderer = WikiRenderer(self.backend, path)

    return renderer.render(text)
222 class Page(misc
.Object
):
223 def init(self
, id, data
=None):
228 return "<%s %s %s>" % (self
.__class
__.__name
__, self
.page
, self
.timestamp
)
def __eq__(self, other):
    """Two Page objects are equal when they wrap the same database row."""
    if isinstance(other, self.__class__):
        return self.id == other.id

    # Let the other operand attempt the reflected comparison
    # (returning None here would silently compare as falsy)
    return NotImplemented
def __lt__(self, other):
    """Order pages by name, and revisions of the same page by time."""
    if isinstance(other, self.__class__):
        if self.page == other.page:
            return self.timestamp < other.timestamp

        return self.page < other.page

    # Let the other operand attempt the reflected comparison
    return NotImplemented
def sanitise_page_name(page):
    """Normalise a page name into canonical ``/a/b/c`` form.

    Handles empty input (maps to the wiki root), strips a trailing
    slash, guarantees a leading slash and collapses double slashes.
    """
    # An empty name refers to the wiki root
    if not page:
        return "/"

    # Make sure that the page name does NOT end with a /
    if page.endswith("/"):
        page = page[:-1]

    # Make sure the page name starts with a /
    if not page.startswith("/"):
        page = "/%s" % page

    # Remove any double slashes
    page = page.replace("//", "/")

    return page
265 return "https://wiki.ipfire.org%s" % self
.url
269 return self
.data
.page
273 return self
._title
or os
.path
.basename(self
.page
[1:])
277 if not self
.markdown
:
280 # Find first H1 headline in markdown
281 markdown
= self
.markdown
.splitlines()
283 m
= re
.match(r
"^# (.*)( #)?$", markdown
[0])
289 if self
.data
.author_uid
:
290 return self
.backend
.accounts
.get_by_uid(self
.data
.author_uid
)
294 return self
.data
.markdown
or ""
298 return self
.backend
.wiki
.render(self
.page
, self
.markdown
)
302 return self
.data
.timestamp
def was_deleted(self):
    """True when this revision stored no markdown at all (a deletion)."""
    return self.markdown is None
def breadcrumbs(self):
    """Breadcrumb trail (link, title) tuples for this page's ancestors."""
    return self.backend.wiki.make_breadcrumbs(self.page)
def get_latest_revision(self):
    """Return the newest revision of this page, or None if none exist."""
    # Revisions are ordered newest first - the first one wins
    for revision in self.get_revisions():
        return revision
def get_revisions(self):
    """Yield every stored revision of this page, newest first."""
    return self.backend.wiki._get_pages("SELECT * FROM wiki \
        WHERE page = %s ORDER BY timestamp DESC", self.page)
def previous_revision(self):
    """The revision immediately preceding this one, or None."""
    return self.backend.wiki._get_page("SELECT * FROM wiki \
        WHERE page = %s AND timestamp < %s ORDER BY timestamp DESC \
        LIMIT 1", self.page, self.timestamp)
330 return self
.data
.changes
def check_acl(self, account):
    """Delegate the ACL check for this page to the wiki backend."""
    return self.backend.wiki.check_acl(self.page, account)
341 parts
= self
.page
.split("/")
344 sidebar
= self
.backend
.wiki
.get_page("%s/sidebar" % os
.path
.join(*parts
))
354 if self
.previous_revision
:
355 diff
= difflib
.unified_diff(
356 self
.previous_revision
.markdown
.splitlines(),
357 self
.markdown
.splitlines(),
360 return "\n".join(diff
)
364 res
= self
.db
.query("SELECT uid FROM wiki_watchlist \
365 WHERE page = %s", self
.page
)
368 # Search for account by UID and skip if none was found
369 account
= self
.backend
.accounts
.get_by_uid(row
.uid
)
def is_watched_by(self, account):
    """Return True when *account* has this page on their watchlist."""
    res = self.db.get("SELECT 1 FROM wiki_watchlist \
        WHERE page = %s AND uid = %s", self.page, account.uid)

    return bool(res)
def add_watcher(self, account):
    """Put this page on *account*'s watchlist (no-op when already there)."""
    # Avoid duplicate watchlist entries
    if self.is_watched_by(account):
        return

    self.db.execute("INSERT INTO wiki_watchlist(page, uid) \
        VALUES(%s, %s)", self.page, account.uid)
def remove_watcher(self, account):
    """Take this page off *account*'s watchlist."""
    self.db.execute("DELETE FROM wiki_watchlist \
        WHERE page = %s AND uid = %s", self.page, account.uid)
def _send_watcher_emails(self, excludes=None):
    """Email everyone watching this page about the latest change.

    excludes: accounts that must not be notified (e.g. the author).
    Replaces the mutable ``excludes=[]`` default argument with None.
    """
    if excludes is None:
        excludes = []

    # Nothing to do if there was no previous revision
    if not self.previous_revision:
        return

    for watcher in self.watchers:
        # Skip everyone who is excluded
        if watcher in excludes:
            logging.debug("Excluding %s" % watcher)
            continue

        # Skip watchers who may not read this page
        if not self.backend.wiki.check_acl(self.page, watcher):
            logging.debug("Watcher %s does not have permissions" % watcher)
            continue

        logging.debug("Sending watcher email to %s" % watcher)

        self.backend.messages.send_template("wiki/messages/page-changed",
            recipients=[watcher], page=self, priority=-10)
419 class File(misc
.Object
):
420 def init(self
, id, data
):
426 return os
.path
.join(self
.path
, self
.filename
)
430 return self
.data
.path
434 return self
.data
.filename
438 return self
.data
.mimetype
442 return self
.data
.size
446 if self
.data
.author_uid
:
447 return self
.backend
.accounts
.get_by_uid(self
.data
.author_uid
)
def created_at(self):
    """Timestamp of when this file was uploaded."""
    return self.data.created_at
454 return self
.mimetype
in ("application/pdf", "application/x-pdf")
457 return self
.mimetype
.startswith("image/")
461 res
= self
.db
.get("SELECT data FROM wiki_blobs \
462 WHERE id = %s", self
.data
.blob_id
)
465 return bytes(res
.data
)
def get_thumbnail(self, size):
    """Return thumbnail data for this image, scaled to *size* pixels."""
    # Key varies with path, name, upload time and requested size
    cache_key = "-".join((self.path, util.normalize(self.filename),
        self.created_at.isoformat(), "%spx" % size))

    # Try to fetch the data from the cache
    thumbnail = self.memcache.get(cache_key)
    if thumbnail:
        return thumbnail

    # Generate the thumbnail
    thumbnail = util.generate_thumbnail(self.blob, size)

    # Put it into the cache for forever
    self.memcache.set(cache_key, thumbnail)

    return thumbnail
484 class WikiRenderer(misc
.Object
):
486 wiki_link
= re
.compile(r
"\[\[([\w\d\/\-\.]+)(?:\|(.+?))?\]\]")
489 external_link
= re
.compile(r
"\[\[((?:ftp|git|https?|rsync|sftp|ssh|webcal)\:\/\/.+?)(?:\|(.+?))?\]\]")
491 # Interwiki links e.g. [[wp>IPFire]]
492 interwiki_link
= re
.compile(r
"\[\[(\w+)>(.+?)(?:\|(.+?))?\]\]")
495 email_link
= re
.compile(r
"\[\[([a-zA-Z0-9_.+-]+@[a-zA-Z0-9-]+\.[a-zA-Z0-9-.]+)(?:\|(.+?))?\]\]")
497 def init(self
, path
):
def _render_wiki_link(self, m):
    """Turn a ``[[path|alias]]`` match into an internal HTML link."""
    path, alias = m.groups()

    # Nothing to do for absolute links
    if path.startswith("/"):
        pass

    # Relative links (one-level down)
    elif path.startswith("./"):
        path = os.path.join(self.path, path)

    # All other relative links are siblings of the current page
    else:
        p = os.path.dirname(self.path)
        path = os.path.join(p, path)

    # Normalise the path (resolves . and ..)
    path = os.path.normpath(path)

    return """<a href="%s">%s</a>""" % (
        path, alias or self.backend.wiki.get_page_title(path),
    )
524 def _render_external_link(self
, m
):
525 url
, alias
= m
.groups()
527 return """<a class="link-external" href="%s">%s</a>""" % (url
, alias
or url
)
529 def _render_interwiki_link(self
, m
):
536 url
, repl
, icon
= INTERWIKIS
[wiki
]
538 logging
.warning("Invalid interwiki: %s" % wiki
)
547 "url" : urllib
.parse
.quote(name
),
550 # Get alias (if present)
553 if not alias
and repl
:
556 # Put everything together
560 s
.append("<span class=\"%s\"></span>" % icon
)
562 s
.append("""<a class="link-external" href="%s">%s</a>""" % (url
, alias
or name
))
566 def _render_email_link(self
, m
):
567 address
, alias
= m
.groups()
569 return """<a class="link-external" href="mailto:%s">%s</a>""" \
570 % (address
, alias
or address
)
572 def render(self
, text
):
573 logging
.debug("Rendering %s" % self
.path
)
577 for match
in re
.finditer(r
"!\[(.*?)\]\((.*?)\)", text
):
578 alt_text
, url
= match
.groups()
580 # Skip any absolute and external URLs
581 if url
.startswith("/") or url
.startswith("https://") or url
.startswith("http://"):
584 # Try to split query string
585 url
, delimiter
, qs
= url
.partition("?")
587 # Parse query arguments
588 args
= urllib
.parse
.parse_qs(qs
)
591 file = self
.backend
.wiki
.find_image(self
.path
, url
)
595 # Scale down the image if not already done
600 url
= "%s?%s" % (file.url
, urllib
.parse
.urlencode(args
))
602 replacements
.append((match
.span(), file, alt_text
, url
))
604 # Apply all replacements
605 for (start
, end
), file, alt_text
, url
in reversed(replacements
):
606 text
= text
[:start
] + "[![%s](%s)](%s?action=detail)" % (alt_text
, url
, file.url
) + text
[end
:]
609 text
= self
.wiki_link
.sub(self
._render
_wiki
_link
, text
)
611 # Handle interwiki links
612 text
= self
.interwiki_link
.sub(self
._render
_interwiki
_link
, text
)
614 # Handle external links
615 text
= self
.external_link
.sub(self
._render
_external
_link
, text
)
618 text
= self
.email_link
.sub(self
._render
_email
_link
, text
)
620 # Borrow this from the blog
621 return self
.backend
.blog
._render
_text
(text
, lang
="markdown")