]> git.ipfire.org Git - ipfire.org.git/blob - src/backend/zeiterfassung.py
donation: Fix typo
[ipfire.org.git] / src / backend / zeiterfassung.py
1 #!/usr/bin/python3
2
3 import hashlib
4 import hmac
5 import json
6 import logging
7 import tornado.httpclient
8 import urllib.parse
9
10 from .misc import Object
11
12 class ZeiterfassungClient(Object):
13 algorithm = "Zeiterfassung-HMAC-SHA512"
14
15 def init(self):
16 self.url = self.settings.get("zeiterfassung_url")
17
18 # API credentials
19 self.api_key = self.settings.get("zeiterfassung_api_key")
20 self.api_secret = self.settings.get("zeiterfassung_api_secret")
21
22 # Check if all configuration values are set
23 if not all((self.url, self.api_key, self.api_secret)):
24 raise RuntimeError("%s is not configured" % self.__class__.__name__)
25
26 def _sign_request(self, method, path, body):
27 # Empty since we only support POST
28 canonical_query = ""
29
30 # Put everything together
31 string_to_sign = "\n".join((
32 method, path, canonical_query,
33 )).encode("utf-8") + body
34
35 # Compute HMAC
36 h = hmac.new(self.api_secret.encode("utf-8"), string_to_sign, hashlib.sha512)
37
38 return h.hexdigest()
39
40 def _sign_response(self, body):
41 h = hmac.new(self.api_secret.encode("utf-8"), body, hashlib.sha512)
42
43 return h.hexdigest()
44
45 async def send_request(self, path, **kwargs):
46 url = urllib.parse.urljoin(self.url, path)
47
48 request = tornado.httpclient.HTTPRequest(url, method="POST")
49 request.body = urllib.parse.urlencode(kwargs)
50
51 # Compose the signature
52 signature = self._sign_request("POST", path, request.body)
53
54 # Add authorization header
55 request.headers["Authorization"] = " ".join(
56 (self.algorithm, self.api_key, signature)
57 )
58
59 # Log request
60 logging.debug("Sending request to %s:" % request.url)
61 for header in sorted(request.headers):
62 logging.debug(" %s: %s" % (header, request.headers[header]))
63
64 # Send the request
65 response = await self.backend.http_client.fetch(request)
66
67 # Log response
68 logging.debug("Got response %s from %s in %.2fms:" % \
69 (response.code, response.effective_url, response.request_time * 1000))
70 for header in response.headers:
71 logging.debug(" %s: %s" % (header, response.headers[header]))
72
73 # Fetch the whole body
74 body = response.body
75
76 # Fetch the signature
77 signature = response.headers.get("Hash")
78 if not signature:
79 raise RuntimeError("Could not find signature on response")
80
81 expected_signature = self._sign_response(body)
82 if not hmac.compare_digest(expected_signature, signature):
83 raise RuntimeError("Invalid signature: %s" % signature)
84
85 # Decode the JSON response
86 return json.loads(body)