]>
git.ipfire.org Git - ipfire.org.git/blob - src/backend/zeiterfassung.py
7 import tornado
.httpclient
10 from .misc
import Object
12 class ZeiterfassungClient(Object
):
13 algorithm
= "Zeiterfassung-HMAC-SHA512"
16 self
.url
= self
.settings
.get("zeiterfassung_url")
19 self
.api_key
= self
.settings
.get("zeiterfassung_api_key")
20 self
.api_secret
= self
.settings
.get("zeiterfassung_api_secret")
22 # Check if all configuration values are set
23 if not all((self
.url
, self
.api_key
, self
.api_secret
)):
24 raise RuntimeError("%s is not configured" % self
.__class
__.__name
__)
26 def _sign_request(self
, method
, path
, body
):
27 # Empty since we only support POST
30 # Put everything together
31 string_to_sign
= "\n".join((
32 method
, path
, canonical_query
,
33 )).encode("utf-8") + body
36 h
= hmac
.new(self
.api_secret
.encode("utf-8"), string_to_sign
, hashlib
.sha512
)
40 def _sign_response(self
, body
):
41 h
= hmac
.new(self
.api_secret
.encode("utf-8"), body
, hashlib
.sha512
)
45 async def send_request(self
, path
, **kwargs
):
46 url
= urllib
.parse
.urljoin(self
.url
, path
)
48 request
= tornado
.httpclient
.HTTPRequest(url
, method
="POST")
49 request
.body
= urllib
.parse
.urlencode(kwargs
)
51 # Compose the signature
52 signature
= self
._sign
_request
("POST", path
, request
.body
)
54 # Add authorization header
55 request
.headers
["Authorization"] = " ".join(
56 (self
.algorithm
, self
.api_key
, signature
)
60 logging
.debug("Sending request to %s:" % request
.url
)
61 for header
in sorted(request
.headers
):
62 logging
.debug(" %s: %s" % (header
, request
.headers
[header
]))
65 response
= await self
.backend
.http_client
.fetch(request
)
68 logging
.debug("Got response %s from %s in %.2fms:" % \
69 (response
.code
, response
.effective_url
, response
.request_time
* 1000))
70 for header
in response
.headers
:
71 logging
.debug(" %s: %s" % (header
, response
.headers
[header
]))
73 # Fetch the whole body
77 signature
= response
.headers
.get("Hash")
79 raise RuntimeError("Could not find signature on response")
81 expected_signature
= self
._sign
_response
(body
)
82 if not hmac
.compare_digest(expected_signature
, signature
):
83 raise RuntimeError("Invalid signature: %s" % signature
)
85 # Decode the JSON response
86 return json
.loads(body
)