]>
git.ipfire.org Git - ipfire.org.git/blob - src/backend/wiki.py
11 from .decorators
import *
13 class Wiki(misc
.Object
):
14 def _get_pages(self
, query
, *args
):
15 res
= self
.db
.query(query
, *args
)
18 yield Page(self
.backend
, row
.id, data
=row
)
20 def _get_page(self
, query
, *args
):
21 res
= self
.db
.get(query
, *args
)
24 return Page(self
.backend
, res
.id, data
=res
)
26 def get_page_title(self
, page
, default
=None):
27 doc
= self
.get_page(page
)
31 return default
or os
.path
.basename(page
)
33 def get_page(self
, page
, revision
=None):
34 page
= Page
.sanitise_page_name(page
)
38 return self
._get
_page
("SELECT * FROM wiki WHERE page = %s \
39 AND timestamp = %s", page
, revision
)
41 return self
._get
_page
("SELECT * FROM wiki WHERE page = %s \
42 ORDER BY timestamp DESC LIMIT 1", page
)
def get_recent_changes(self, limit=None):
    """
    Look up the wiki revisions written during the last four weeks.

    Rows are requested newest first. ``limit`` is handed to the query
    as its LIMIT parameter. The result of self._get_pages() is returned
    unchanged.
    """
    # Adjacent string literals are joined at compile time. Unlike a
    # backslash continuation inside the literal, this does not drag the
    # source indentation into the SQL text.
    query = (
        "SELECT * FROM wiki "
        "WHERE timestamp >= NOW() - INTERVAL '4 weeks' "
        "ORDER BY timestamp DESC LIMIT %s"
    )

    return self._get_pages(query, limit)
def create_page(self, page, author, content, changes=None, address=None):
    """
    Insert a new row into the wiki table and return it.

    The page name is passed through Page.sanitise_page_name() first.
    ``author`` must provide a ``uid`` attribute, which is stored as
    author_uid. The result of self._get_page() for the inserted row
    (RETURNING *) is returned unchanged.
    """
    page = Page.sanitise_page_name(page)

    # Adjacent literals keep the source indentation out of the SQL text,
    # which a backslash continuation inside the literal would embed.
    query = (
        "INSERT INTO wiki(page, author_uid, markdown, changes, address) "
        "VALUES(%s, %s, %s, %s, %s) RETURNING *"
    )

    # Any falsy content (e.g. an empty string) is sent as None
    markdown = content or None

    return self._get_page(query, page, author.uid, markdown, changes, address)
55 def delete_page(self
, page
, author
, **kwargs
):
56 # Do nothing if the page does not exist
57 if not self
.get_page(page
):
60 # Just creates a blank last version of the page
61 self
.create_page(page
, author
=author
, content
=None, **kwargs
)
63 def make_breadcrumbs(self
, url
):
64 # Split and strip all empty elements (double slashes)
65 parts
= list(e
for e
in url
.split("/") if e
)
68 for part
in ("/".join(parts
[:i
]) for i
in range(1, len(parts
))):
69 ret
.append(("/%s" % part
, self
.get_page_title(part
, os
.path
.basename(part
))))
73 def search(self
, query
, limit
=None):
74 query
= util
.parse_search_query(query
)
76 res
= self
._get
_pages
("SELECT wiki.* FROM wiki_search_index search_index \
77 LEFT JOIN wiki ON search_index.wiki_id = wiki.id \
78 WHERE search_index.document @@ to_tsquery('english', %s) \
79 ORDER BY ts_rank(search_index.document, to_tsquery('english', %s)) DESC \
80 LIMIT %s", query
, query
, limit
)
86 Needs to be called after a page has been changed
88 self
.db
.execute("REFRESH MATERIALIZED VIEW wiki_search_index")
92 def _get_files(self
, query
, *args
):
93 res
= self
.db
.query(query
, *args
)
96 yield File(self
.backend
, row
.id, data
=row
)
98 def _get_file(self
, query
, *args
):
99 res
= self
.db
.get(query
, *args
)
102 return File(self
.backend
, res
.id, data
=res
)
104 def get_files(self
, path
):
105 files
= self
._get
_files
("SELECT * FROM wiki_files \
106 WHERE path = %s AND deleted_at IS NULL ORDER BY filename", path
)
def get_file_by_path(self, path):
    """
    Look up a file in wiki_files by its full path.

    ``path`` is split into its directory part and its final component,
    which are matched against the path and filename columns. Rows with
    deleted_at set are excluded. The result of self._get_file() is
    returned unchanged.
    """
    # os.path.split() returns exactly (dirname(path), basename(path))
    dirname, filename = os.path.split(path)

    # Adjacent literals keep the source indentation out of the SQL text
    query = (
        "SELECT * FROM wiki_files "
        "WHERE path = %s AND filename = %s AND deleted_at IS NULL"
    )

    return self._get_file(query, dirname, filename)
def upload(self, path, filename, data, mimetype, author, address):
    """
    Store an uploaded file.

    The payload is inserted into wiki_blobs first. A wiki_files row is
    then created that references the new blob by id and records
    len(data) as its size. ``author`` must provide a ``uid`` attribute.
    The result of self._get_file() for the new wiki_files row
    (RETURNING *) is returned unchanged.
    """
    # Upload the blob first
    blob = self.db.get("INSERT INTO wiki_blobs(data) VALUES(%s) RETURNING id", data)

    # Create entry for file. Adjacent literals keep the source
    # indentation out of the SQL text.
    query = (
        "INSERT INTO wiki_files(path, filename, author_uid, address, "
        "mimetype, blob_id, size) VALUES(%s, %s, %s, %s, %s, %s, %s) RETURNING *"
    )

    return self._get_file(
        query,
        path,
        filename,
        author.uid,
        address,
        mimetype,
        blob.id,
        len(data),
    )
126 class Page(misc
.Object
):
127 def init(self
, id, data
=None):
def __lt__(self, other):
    """
    Order pages by page name; revisions of the same page are ordered
    by timestamp.

    Returns NotImplemented for operands that are not instances of this
    class, so Python can try the reflected comparison or raise
    TypeError instead of receiving a bare None.
    """
    if not isinstance(other, self.__class__):
        return NotImplemented

    # Tuple comparison: the timestamps are only consulted when both
    # page names are equal.
    return (self.page, self.timestamp) < (other.page, other.timestamp)
139 def sanitise_page_name(page
):
143 # Make sure that the page name does NOT end with a /
144 if page
.endswith("/"):
147 # Make sure the page name starts with a /
148 if not page
.startswith("/"):
151 # Remove any double slashes
152 page
= page
.replace("//", "/")
162 return self
.data
.page
166 return self
._title
or self
.page
[1:]
170 if not self
.markdown
:
173 # Find first H1 headline in markdown
174 markdown
= self
.markdown
.splitlines()
176 m
= re
.match(r
"^# (.*)( #)?$", markdown
[0])
182 if self
.data
.author_uid
:
183 return self
.backend
.accounts
.get_by_uid(self
.data
.author_uid
)
185 def _render(self
, text
):
186 logging
.debug("Rendering %s" % self
)
189 (r
"\[\[([\w\d\/]+)(?:\|([\w\d\s]+))\]\]", r
"/\1", r
"\2", None, None),
190 (r
"\[\[([\w\d\/\-]+)\]\]", r
"/\1", r
"\1", self
.backend
.wiki
.get_page_title
, r
"\1"),
193 for pattern
, link
, title
, repl
, args
in patterns
:
196 for match
in re
.finditer(pattern
, text
):
197 l
= match
.expand(link
)
198 t
= match
.expand(title
)
201 t
= repl(match
.expand(args
)) or t
203 replacements
.append((match
.span(), t
or l
, l
))
205 # Apply all replacements
206 for (start
, end
), t
, l
in reversed(replacements
):
207 text
= text
[:start
] + "[%s](%s)" % (t
, l
) + text
[end
:]
209 # Borrow this from the blog
210 return self
.backend
.blog
._render
_text
(text
, lang
="markdown")
214 return self
.data
.markdown
218 return self
.data
.html
or self
._render
(self
.markdown
)
222 return self
.data
.timestamp
def was_deleted(self):
    """
    Tell whether this revision carries no markdown at all, i.e.
    self.markdown is None.
    """
    content = self.markdown

    return content is None
def breadcrumbs(self):
    """
    Return what the wiki backend's make_breadcrumbs() produces for
    the name of this page.
    """
    wiki = self.backend.wiki

    return wiki.make_breadcrumbs(self.page)
231 def get_latest_revision(self
):
232 revisions
= self
.get_revisions()
234 # Return first object
235 for rev
in revisions
:
def get_revisions(self):
    """
    Look up all stored revisions of this page, newest first.

    The result of the wiki backend's _get_pages() is returned
    unchanged.
    """
    # Adjacent literals keep the source indentation out of the SQL text,
    # which a backslash continuation inside the literal would embed.
    query = (
        "SELECT * FROM wiki "
        "WHERE page = %s ORDER BY timestamp DESC"
    )

    return self.backend.wiki._get_pages(query, self.page)
244 return self
.data
.changes
250 parts
= self
.page
.split("/")
253 sidebar
= self
.backend
.wiki
.get_page("%s/sidebar" % os
.path
.join(*parts
))
260 class File(misc
.Object
):
261 def init(self
, id, data
):
267 return os
.path
.join(self
.path
, self
.filename
)
271 return self
.data
.path
275 return self
.data
.filename
279 return self
.data
.mimetype
283 return self
.data
.size
286 return self
.mimetype
.startswith("image/")
290 res
= self
.db
.get("SELECT data FROM wiki_blobs \
291 WHERE id = %s", self
.data
.blob_id
)
294 return bytes(res
.data
)
296 def get_thumbnail(self
, size
):
297 image
= PIL
.Image
.open(io
.BytesIO(self
.blob
))
299 # Resize the image to the desired resolution
300 image
.thumbnail((size
, size
), PIL
.Image
.ANTIALIAS
)
302 with io
.BytesIO() as f
:
303 # If writing out the image does not work with optimization,
304 # we try to write it out without any optimization.
306 image
.save(f
, image
.format
, optimize
=True, quality
=98)
308 image
.save(f
, image
.format
, quality
=98)