]> git.ipfire.org Git - ipfire.org.git/blob - src/backend/wiki.py
wiki: Embed images
[ipfire.org.git] / src / backend / wiki.py
1 #!/usr/bin/python3
2
3 import PIL
4 import io
5 import logging
6 import os.path
7 import re
8 import urllib.parse
9
10 from . import misc
11 from . import util
12 from .decorators import *
13
class Wiki(misc.Object):
    """
        Access layer for wiki pages and the files attached to them.

        All data is read from and written to the database through self.db.
    """

    def _get_pages(self, query, *args):
        """
            Runs the query and yields one Page for every row returned
        """
        for row in self.db.query(query, *args):
            yield Page(self.backend, row.id, data=row)

    def _get_page(self, query, *args):
        """
            Runs the query and returns a single Page (or None if there was no row)
        """
        row = self.db.get(query, *args)
        if not row:
            return None

        return Page(self.backend, row.id, data=row)

    def get_page_title(self, page, default=None):
        """
            Returns the title of the given page, falling back to default
            or the last component of the page name if the page does not exist
        """
        document = self.get_page(page)
        if not document:
            return default or os.path.basename(page)

        return document.title

    def get_page(self, page, revision=None):
        """
            Fetches the page at the given revision (a timestamp),
            or its most recent revision if none was given
        """
        page = Page.sanitise_page_name(page)
        assert page

        # Without a revision we want the newest entry
        if not revision:
            return self._get_page(
                "SELECT * FROM wiki WHERE page = %s "
                "ORDER BY timestamp DESC LIMIT 1", page)

        return self._get_page(
            "SELECT * FROM wiki WHERE page = %s "
            "AND timestamp = %s", page, revision)

    def get_recent_changes(self, limit=None):
        """
            Yields all revisions of the last four weeks, newest first
        """
        return self._get_pages(
            "SELECT * FROM wiki "
            "WHERE timestamp >= NOW() - INTERVAL '4 weeks' "
            "ORDER BY timestamp DESC LIMIT %s", limit)

    def create_page(self, page, author, content, changes=None, address=None):
        """
            Stores a new revision of a page and returns it
        """
        name = Page.sanitise_page_name(page)

        # Empty content is stored as NULL
        markdown = content or None

        return self._get_page(
            "INSERT INTO wiki(page, author_uid, markdown, changes, address) "
            "VALUES(%s, %s, %s, %s, %s) RETURNING *",
            name, author.uid, markdown, changes, address)

    def delete_page(self, page, author, **kwargs):
        """
            Marks a page as deleted by appending a blank revision.

            Nothing happens if the page does not exist.
        """
        if self.get_page(page):
            self.create_page(page, author=author, content=None, **kwargs)

    def make_breadcrumbs(self, url):
        """
            Returns a list of (url, title) tuples for every parent of url
        """
        # Drop all empty elements (caused by double slashes)
        segments = [segment for segment in url.split("/") if segment]

        # Every proper prefix of the path is one breadcrumb
        prefixes = ["/".join(segments[:end]) for end in range(1, len(segments))]

        return [
            ("/%s" % prefix, self.get_page_title(prefix, os.path.basename(prefix)))
            for prefix in prefixes
        ]

    def search(self, query, limit=None):
        """
            Performs a full-text search and returns a list of matching pages
        """
        tsquery = util.parse_search_query(query)

        matches = self._get_pages(
            "SELECT wiki.* FROM wiki_search_index search_index "
            "LEFT JOIN wiki ON search_index.wiki_id = wiki.id "
            "WHERE search_index.document @@ to_tsquery('english', %s) "
            "ORDER BY ts_rank(search_index.document, to_tsquery('english', %s)) DESC "
            "LIMIT %s", tsquery, tsquery, limit)

        return list(matches)

    def refresh(self):
        """
            Needs to be called after a page has been changed
        """
        self.db.execute("REFRESH MATERIALIZED VIEW wiki_search_index")

    # Files

    def _get_files(self, query, *args):
        """
            Runs the query and yields one File for every row returned
        """
        for row in self.db.query(query, *args):
            yield File(self.backend, row.id, data=row)

    def _get_file(self, query, *args):
        """
            Runs the query and returns a single File (or None if there was no row)
        """
        row = self.db.get(query, *args)
        if not row:
            return None

        return File(self.backend, row.id, data=row)

    def get_files(self, path):
        """
            Returns a list of all files in path that have not been deleted
        """
        return list(self._get_files(
            "SELECT * FROM wiki_files "
            "WHERE path = %s AND deleted_at IS NULL ORDER BY filename", path))

    def get_file_by_path(self, path):
        """
            Returns the file stored at the given path (directory + filename)
        """
        directory = os.path.dirname(path)
        filename = os.path.basename(path)

        return self._get_file(
            "SELECT * FROM wiki_files "
            "WHERE path = %s AND filename = %s AND deleted_at IS NULL",
            directory, filename)

    def upload(self, path, filename, data, mimetype, author, address):
        """
            Stores data as a new blob and returns the File that refers to it
        """
        # Upload the blob first
        blob = self.db.get("INSERT INTO wiki_blobs(data) VALUES(%s) RETURNING id", data)

        # Create entry for file
        return self._get_file(
            "INSERT INTO wiki_files(path, filename, author_uid, address, "
            "mimetype, blob_id, size) VALUES(%s, %s, %s, %s, %s, %s, %s) RETURNING *",
            path, filename, author.uid, address, mimetype, blob.id, len(data))

    def find_image(self, path, filename):
        """
            Searches for an image called filename in path and then in
            the parent of path. Returns None if nothing was found.
        """
        for directory in (path, os.path.dirname(path)):
            candidate = self.get_file_by_path(os.path.join(directory, filename))

            if candidate and candidate.is_image():
                return candidate
132
133
class Page(misc.Object):
    """
        One revision of a wiki page.

        data is the database row of this revision; all properties
        read from it.
    """

    def init(self, id, data=None):
        self.id = id
        self.data = data

    def __lt__(self, other):
        """
            Orders pages by name and revisions of the same page by timestamp
        """
        # Let Python handle comparisons with foreign types
        if not isinstance(other, self.__class__):
            return NotImplemented

        if self.page == other.page:
            return self.timestamp < other.timestamp

        return self.page < other.page

    @staticmethod
    def sanitise_page_name(page):
        """
            Normalises a page name so that it starts with exactly one slash,
            does not end with a slash and has no empty path components.

            An empty name becomes "/".
        """
        if not page:
            return "/"

        # Dropping all empty components removes leading, trailing and
        # repeated slashes in one go (e.g. "foo//" or "///")
        return "/" + "/".join(part for part in page.split("/") if part)

    @property
    def url(self):
        return self.page

    @property
    def page(self):
        return self.data.page

    @property
    def title(self):
        """
            The headline of the page or the page name without its leading slash
        """
        return self._title or self.page[1:]

    @property
    def _title(self):
        """
            The text of the H1 headline if the first line of the markdown is one
        """
        if not self.markdown:
            return

        # Find first H1 headline in markdown
        markdown = self.markdown.splitlines()

        # The headline text is matched non-greedily so that an optional
        # closing " #" is not included in the title
        m = re.match(r"^# (.*?)( #)?$", markdown[0])
        if m:
            return m.group(1)

    @lazy_property
    def author(self):
        if self.data.author_uid:
            return self.backend.accounts.get_by_uid(self.data.author_uid)

    def _render(self, text):
        """
            Converts the markdown in text to HTML after embedded images
            and [[wiki links]] have been resolved
        """
        logging.debug("Rendering %s", self)

        text = self._embed_images(text)
        text = self._expand_wiki_links(text)

        # Borrow this from the blog
        return self.backend.blog._render_text(text, lang="markdown")

    def _embed_images(self, text):
        """
            Rewrites relative image references to the URL of the uploaded file
        """
        replacements = []

        # Match non-greedily so that several images on the same line are
        # found individually (URLs containing ")" are not supported)
        for match in re.finditer(r"!\[(.*?)\]\((.*?)\)", text):
            alt_text, url = match.groups()

            # Skip any absolute and external URLs
            if url.startswith(("/", "https://", "http://")):
                continue

            # Try to split query string
            url, delimiter, qs = url.partition("?")

            # Parse query arguments
            args = urllib.parse.parse_qs(qs)

            # Find image
            file = self.backend.wiki.find_image(self.page, url)
            if not file:
                continue

            # Scale down the image if not already done
            if "s" not in args:
                args["s"] = "768"

            # Format URL (parse_qs returns lists of values which
            # need doseq to be encoded as individual arguments)
            url = "%s?%s" % (file.url, urllib.parse.urlencode(args, doseq=True))

            replacements.append((match.span(), alt_text, url))

        # Apply all replacements back to front so that the spans remain valid
        for (start, end), alt_text, url in reversed(replacements):
            text = text[:start] + "![%s](%s)" % (alt_text, url) + text[end:]

        return text

    def _expand_wiki_links(self, text):
        """
            Converts [[page]] and [[page|title]] into markdown links
        """
        # Each entry is: pattern, link template, title template,
        # optional callback to look up a title, argument for the callback
        patterns = (
            (r"\[\[([\w\d\/\-]+)(?:\|([\w\d\s]+))\]\]", r"/\1", r"\2", None, None),
            (r"\[\[([\w\d\/\-]+)\]\]", r"/\1", r"\1", self.backend.wiki.get_page_title, r"\1"),
        )

        for pattern, link, title, repl, args in patterns:
            replacements = []

            for match in re.finditer(pattern, text):
                l = match.expand(link)
                t = match.expand(title)

                if callable(repl):
                    t = repl(match.expand(args)) or t

                replacements.append((match.span(), t or l, l))

            # Apply all replacements back to front so that the spans remain valid
            for (start, end), t, l in reversed(replacements):
                text = text[:start] + "[%s](%s)" % (t, l) + text[end:]

        return text

    @property
    def markdown(self):
        return self.data.markdown

    @property
    def html(self):
        """
            The stored HTML of this revision or the freshly rendered markdown
        """
        return self.data.html or self._render(self.markdown)

    @property
    def timestamp(self):
        return self.data.timestamp

    def was_deleted(self):
        """
            Deleted pages are stored as a revision without any markdown
        """
        return self.markdown is None

    @lazy_property
    def breadcrumbs(self):
        return self.backend.wiki.make_breadcrumbs(self.page)

    def get_latest_revision(self):
        """
            Returns the newest revision of this page (or None)
        """
        return next(iter(self.get_revisions()), None)

    def get_revisions(self):
        """
            Yields all revisions of this page, newest first
        """
        return self.backend.wiki._get_pages("SELECT * FROM wiki "
            "WHERE page = %s ORDER BY timestamp DESC", self.page)

    @property
    def changes(self):
        return self.data.changes

    # Sidebar

    @lazy_property
    def sidebar(self):
        """
            The closest page called "sidebar", searched from this page
            upwards to the root. None if there is no such page.
        """
        parts = self.page.split("/")

        while parts:
            sidebar = self.backend.wiki.get_page("%s/sidebar" % os.path.join(*parts))
            if sidebar:
                return sidebar

            parts.pop()
304
305
class File(misc.Object):
    """
        A file that has been uploaded to the wiki.

        data is the database row of the file; the content itself
        is loaded from wiki_blobs on demand.
    """

    def init(self, id, data):
        self.id = id
        self.data = data

    @property
    def url(self):
        return os.path.join(self.path, self.filename)

    @property
    def path(self):
        return self.data.path

    @property
    def filename(self):
        return self.data.filename

    @property
    def mimetype(self):
        return self.data.mimetype

    @property
    def size(self):
        return self.data.size

    def is_image(self):
        return self.mimetype.startswith("image/")

    @lazy_property
    def blob(self):
        """
            The content of the file as bytes (or None if the blob is missing)
        """
        res = self.db.get("SELECT data FROM wiki_blobs "
            "WHERE id = %s", self.data.blob_id)

        if res:
            return bytes(res.data)

    def get_thumbnail(self, size):
        """
            Returns the image scaled down to fit into size x size pixels,
            encoded in its original format
        """
        # "import PIL" on its own does not load the Image submodule
        import PIL.Image

        image = PIL.Image.open(io.BytesIO(self.blob))

        # Resize the image to the desired resolution
        # (LANCZOS is the filter formerly known as ANTIALIAS)
        image.thumbnail((size, size), PIL.Image.LANCZOS)

        with io.BytesIO() as f:
            # If writing out the image does not work with optimization,
            # we try to write it out without any optimization.
            try:
                image.save(f, image.format, optimize=True, quality=98)
            except Exception:
                # Throw away anything the failed attempt has written
                f.seek(0)
                f.truncate()

                image.save(f, image.format, quality=98)

            return f.getvalue()