]> git.ipfire.org Git - ipfire.org.git/blob - src/web/auth.py
wiki: Only match usernames when a word starts with @
[ipfire.org.git] / src / web / auth.py
1 #!/usr/bin/python
2
3 import logging
4 import tornado.web
5 import urllib.parse
6
7 from . import base
8 from . import ui_modules
9
class AuthenticationMixin(object):
    """
    Session helpers for request handlers.

    The class this is mixed into has to provide ``backend``, ``request``,
    ``get_cookie()``, ``set_cookie()`` and ``clear_cookie()``.
    """

    def login(self, account):
        """
        Creates a session for ``account`` and sends the session cookie.

        Raises tornado.web.HTTPError (500) if the backend did not return
        a session ID.
        """
        # User has logged in, create a session
        session_id, session_expires = self.backend.accounts.create_session(
            account, self.request.host)

        # Check if a new session was created
        if not session_id:
            raise tornado.web.HTTPError(500, "Could not create session")

        # Send session cookie to the client
        self.set_cookie("session_id", session_id,
            domain=self.request.host, expires=session_expires, secure=True)

    def logout(self):
        """
        Destroys the session named by the "session_id" cookie (if any)
        and removes the cookie once the backend reported success.
        """
        session_id = self.get_cookie("session_id")
        if not session_id:
            return

        success = self.backend.accounts.destroy_session(session_id, self.request.host)
        if success:
            # The cookie has been set with an explicit domain in login(),
            # and a cookie can only be cleared when the same domain is
            # passed again - otherwise the client may keep the old cookie.
            self.clear_cookie("session_id", domain=self.request.host)
32
33
class LoginHandler(base.AnalyticsMixin, AuthenticationMixin, base.BaseHandler):
    """
    Renders the login form and authenticates users who submit it.
    """

    @staticmethod
    def _local_redirect_target(url, default="/"):
        """
        Returns ``url`` if it is a path on this host, otherwise ``default``.

        The "next" argument is supplied by the client. Redirecting to it
        unchecked would allow links to our login page to bounce users to
        arbitrary third-party sites after a successful login.
        """
        # Must be an absolute path, but not a protocol-relative URL ("//host")
        if not url or not url.startswith("/") or url.startswith("//"):
            return default

        # Browsers treat backslashes like slashes and drop control characters,
        # so "/\host" or "/<TAB>/host" could still leave this site
        if "\\" in url or any(ord(c) < 0x20 or ord(c) == 0x7f for c in url):
            return default

        return url

    def get(self):
        """
        Shows the empty login form.
        """
        next = self.get_argument("next", None)

        self.render("auth/login.html", next=next,
            incorrect=False, username=None)

    @base.ratelimit(minutes=15, requests=10)
    def post(self):
        """
        Checks the submitted credentials.

        On failure, the form is rendered again with status 401. On success,
        a session is created and the user is redirected to the (host-local)
        "next" target, which defaults to "/".
        """
        username = self.get_argument("username")
        password = self.get_argument("password")

        # Never trust the redirect target sent by the client
        next_url = self._local_redirect_target(self.get_argument("next", "/"))

        # Find user
        account = self.backend.accounts.auth(username, password)
        if not account:
            logging.error("Unknown user or invalid password: %s", username)

            # Set status to 401
            self.set_status(401)

            # Render login page again
            return self.render("auth/login.html",
                incorrect=True, username=username, next=next_url,
            )

        # Create session
        with self.db.transaction():
            self.login(account)

        # Redirect the user
        return self.redirect(next_url)
66
67
class LogoutHandler(AuthenticationMixin, base.BaseHandler):
    """
    Ends the current session and sends the client to the start page.
    """

    def get(self):
        # Destroy the session within a database transaction
        transaction = self.db.transaction()
        with transaction:
            self.logout()

        # Whatever happened, go back to the start page
        self.redirect("/")
75
76
class JoinHandler(base.AnalyticsMixin, base.BaseHandler):
    """
    Shows the sign-up form and registers new accounts.
    """

    def get(self):
        # Only visitors who are not logged in get to see the form
        if not self.current_user:
            self.render("auth/join.html")
            return

        # Everybody else is sent away
        self.redirect("/")

    @base.ratelimit(minutes=15, requests=5)
    async def post(self):
        """
        Registers the account described by the submitted form.

        A ValueError raised while joining is turned into a
        tornado.web.HTTPError (400) carrying the same message.
        """
        # Collect all required form fields
        uid   = self.get_argument("uid")
        email = self.get_argument("email")
        names = {
            "first_name" : self.get_argument("first_name"),
            "last_name"  : self.get_argument("last_name"),
        }

        # Register account
        try:
            with self.db.transaction():
                self.backend.accounts.join(
                    uid,
                    email,
                    country_code=self.current_country_code,
                    **names
                )
        except ValueError as e:
            raise tornado.web.HTTPError(400, "%s" % e) from e

        self.render("auth/join-success.html")
104
105
class ActivateHandler(AuthenticationMixin, base.BaseHandler):
    """
    Activates an account and lets the user choose the first password.
    """

    def get(self, uid, activation_code):
        # Show the form that asks for the new password
        self.render("auth/activate.html")

    def post(self, uid, activation_code):
        """
        Activates the account ``uid`` using ``activation_code``, sets the
        submitted password and logs the user in.

        Raises tornado.web.HTTPError (400) if the two passwords differ or
        if the backend did not return an account.
        """
        password = self.get_argument("password1")
        confirmation = self.get_argument("password2")

        # Both fields must carry the same password
        if password != confirmation:
            raise tornado.web.HTTPError(400, "Passwords do not match")

        with self.db.transaction():
            account = self.backend.accounts.activate(uid, activation_code)

            if account:
                # Set the new password
                account.passwd(password)

                # Create session
                self.login(account)
            else:
                raise tornado.web.HTTPError(400, "Account not found: %s" % uid)

        # Show the success page
        self.render("auth/activated.html", account=account)
130
131
class PasswordResetInitiationHandler(base.BaseHandler):
    """
    Lets users request a password reset for an account.
    """

    def get(self):
        # The form may be pre-filled with a username
        self.render(
            "auth/password-reset-initiation.html",
            username=self.get_argument("username", None),
        )

    @base.ratelimit(minutes=15, requests=10)
    def post(self):
        """
        Requests a password reset if the submitted username is known.

        The same page is rendered whether or not an account was found.
        """
        username = self.get_argument("username")

        # Fetch account and submit password reset
        account = self.backend.accounts.get_by_uid(username)

        if not account:
            pass
        else:
            with self.db.transaction():
                account.request_password_reset()

        self.render("auth/password-reset-successful.html")
149
150
class PasswordResetHandler(AuthenticationMixin, base.BaseHandler):
    """
    Completes a password reset that has been requested earlier.
    """

    def _get_account(self, uid):
        """
        Returns the account for ``uid`` or raises
        tornado.web.HTTPError (404) if there is none.
        """
        account = self.backend.accounts.get_by_uid(uid)
        if account:
            return account

        raise tornado.web.HTTPError(404, "Could not find account: %s" % uid)

    def get(self, uid, reset_code):
        # Show the form for the new password
        self.render("auth/password-reset.html", account=self._get_account(uid))

    def post(self, uid, reset_code):
        """
        Resets the password of ``uid`` using ``reset_code`` and logs the
        user in.

        Raises tornado.web.HTTPError (400) if the two passwords differ.
        """
        account = self._get_account(uid)

        password = self.get_argument("password1")
        confirmation = self.get_argument("password2")

        # Both fields must carry the same password
        if password != confirmation:
            raise tornado.web.HTTPError(400, "Passwords do not match")

        # Try to perform password reset
        with self.db.transaction():
            account.reset_password(reset_code, password)

            # Login the user straight away after reset was successful
            self.login(account)

        # Redirect back to /
        self.redirect("/")
179
180
class WellKnownChangePasswordHandler(base.BaseHandler):
    @tornado.web.authenticated
    def get(self):
        """
        Implements https://web.dev/articles/change-password-url

        Sends the logged-in user to their own password page.
        """
        uid = self.current_user.uid

        self.redirect("/users/%s/passwd" % uid)
188
189
class SSODiscourse(base.BaseHandler):
    """
    Answers single sign-on requests from Discourse for the logged-in user.
    """

    @staticmethod
    def _flag(value):
        """
        Encodes a truth value as the string "true" or "false".
        """
        if value:
            return "true"

        return "false"

    @base.ratelimit(minutes=24*60, requests=100)
    @tornado.web.authenticated
    def get(self):
        """
        Decodes the "sso"/"sig" arguments, builds a signed payload that
        describes the current user and redirects to the "return_sso_url"
        taken from the decoded request.

        Raises tornado.web.HTTPError (400) if decoding raised ValueError.
        """
        # Fetch Discourse's parameters
        sso = self.get_argument("sso")
        sig = self.get_argument("sig")

        # Decode payload; an invalid signature is a bad request
        try:
            params = self.accounts.decode_discourse_payload(sso, sig)
        except ValueError:
            raise tornado.web.HTTPError(400)

        user = self.current_user

        # Collect everything we want to tell Discourse about this user
        args = {
            "nonce"       : params.get("nonce"),
            "external_id" : user.uid,

            # Pass email address
            "email"              : user.email,
            "require_activation" : "false",

            # More details about the user
            "username" : user.uid,
            "name"     : "%s" % user,
            "bio"      : user.description or "",

            # Avatar
            "avatar_url"          : user.avatar_url(absolute=True),
            "avatar_force_update" : "true",

            # Send a welcome message
            "suppress_welcome_message" : "false",

            # Group memberships
            "admin"     : self._flag(user.is_admin()),
            "moderator" : self._flag(user.is_moderator()),
        }

        # Format payload and sign it
        payload = self.accounts.encode_discourse_payload(**args)
        signature = self.accounts.sign_discourse_payload(payload)

        query = {
            "sso" : payload,
            "sig" : signature,
        }
        qs = urllib.parse.urlencode(query)

        # Redirect user
        return_url = params.get("return_sso_url")
        self.redirect("%s?%s" % (return_url, qs))
243
244
class PasswordModule(ui_modules.UIModule):
    """
    UI module that renders the password template together with its
    JavaScript.
    """

    def render(self, account=None):
        template = "auth/modules/password.html"

        return self.render_string(template, account=account)

    def javascript_files(self):
        # Script file that is required by this module
        return "js/zxcvbn.js"

    def embedded_javascript(self):
        # Inline script that comes from a template of its own
        template = "auth/modules/password.js"

        return self.render_string(template)
254
255
class APICheckUID(base.APIHandler):
    """
    Tells the client whether a username could be registered.
    """

    def _check(self, uid):
        """
        Returns "empty", "invalid", "taken" or "ok" for ``uid``.
        """
        accounts = self.backend.accounts

        if not uid:
            return "empty"

        # Check if the username is syntactically valid
        if not accounts.uid_is_valid(uid):
            return "invalid"

        # Check if the username is already taken
        if accounts.uid_exists(uid):
            return "taken"

        # Username seems to be okay
        return "ok"

    @base.ratelimit(minutes=1, requests=100)
    def get(self):
        uid = self.get_argument("uid")

        self.finish({ "result" : self._check(uid) })
275
276
class APICheckEmail(base.APIHandler):
    """
    Tells the client whether an email address could be used to register.
    """

    def _check(self, email):
        """
        Returns "empty", "invalid", "blacklisted", "taken" or "ok"
        for ``email``.
        """
        accounts = self.backend.accounts

        if not email:
            return "empty"

        # Check if the email address is valid
        if not accounts.mail_is_valid(email):
            return "invalid"

        # Check if this email address is blacklisted
        if accounts.mail_is_blacklisted(email):
            return "blacklisted"

        # Check if this email address is already used
        if accounts.get_by_mail(email):
            return "taken"

        # Email address seems to be okay
        return "ok"

    @base.ratelimit(minutes=1, requests=100)
    def get(self):
        email = self.get_argument("email")

        self.finish({ "result" : self._check(email) })