PinTanDummyEncryptionMechanism, PinTanOneStepAuthenticationMechanism,
PinTanTwoStepAuthenticationMechanism,
)
-from .segments import HIBPA3, HIRMS2, HIUPA4
+from .segments import HIBPA3, HIRMG2, HIRMS2, HIUPA4
from .segments.accounts import HISPA1, HKSPA, HKSPA1
from .segments.auth import HKTAB, HKTAN, HKTAB4, HKTAB5, HKTAN5
from .segments.depot import HKWPD5, HKWPD6
self.selected_tan_medium = None
self.product_name = 'pyfints'
self.product_version = '0.2'
+ self.response_callbacks = []
self._standing_dialog = None
if set_data:
message.find_segments('HIUPD')
)
+ # We'll delay calling the callback for HIRMG/3060 ("please note the warnings")
+ # to see if any warning other than HIRMS(HKVVD)/3050 ("UPD/BPD not current; current version attached")
+ # or HIRMS(HKVVD)/3920 ("allowed TAN methods") are present
+ # 3050 and 3920 are handled completely internally and never passed to the callback
+ # 3060 is only passed to the callbacks, if any warning/error other than 3050/3920 is present
+ self.delayed_callback_calls = []
+
+ for seg in message.find_segments(HIRMG2):
+ for response in seg.responses:
+ self._log_response(None, response)
+
+ if response.code == '3060':
+ self.delayed_callback_calls.append( (None, response) )
+ else:
+ self._call_callbacks(None, response)
+
+ if response.code.startswith('9'):
+ raise Error("FinTS error response: {!r}".format(response))
+
+ self._process_response(None, response)
+
for seg in message.find_segments(HIRMS2):
for response in seg.responses:
- if response.code == '3920':
- self.allowed_security_functions = list(response.parameters)
- if self.selected_security_function is None or not self.selected_security_function in self.allowed_security_functions:
- # Select the first available twostep security_function that we support
- for security_function, parameter in self.get_tan_mechanisms().items():
- if security_function == '999':
- # Skip onestep TAN
- continue
- if parameter.tan_process != '2':
- # Only support process variant 2 for now
- continue
- try:
- self.set_tan_mechanism(parameter.security_function)
- break
- except NotImplementedError:
- pass
- else:
- # Fall back to onestep
- self.set_tan_mechanism('999')
+ segment = None # FIXME: Provide segment
+
+ self._log_response(None, response)
+
+ if response.code not in ('3050', '3920'):
+ if not response.code.startswith('0') and not response.code.startswith('1'):
+ for cb_data in self.delayed_callback_calls:
+ self._call_callbacks(*cb_data)
+ self.delayed_callback_calls.clear()
+
+ self._call_callbacks(segment, response)
+
+ self._process_response(segment, response)
+
+ def _process_response(self, segment, response):
+ if response.code == '3920':
+ self.allowed_security_functions = list(response.parameters)
+ if self.selected_security_function is None or not self.selected_security_function in self.allowed_security_functions:
+ # Select the first available twostep security_function that we support
+ for security_function, parameter in self.get_tan_mechanisms().items():
+ if security_function == '999':
+ # Skip onestep TAN
+ continue
+ if parameter.tan_process != '2':
+ # Only support process variant 2 for now
+ continue
+ try:
+ self.set_tan_mechanism(parameter.security_function)
+ break
+ except NotImplementedError:
+ pass
+ else:
+ # Fall back to onestep
+ self.set_tan_mechanism('999')
+
+ def _log_response(self, segment, response):
+ if response.code[0] in ('0', '1'):
+ log_target = logger.info
+ elif response.code[0] in ('3'):
+ log_target = logger.warning
+ else:
+ log_target = logger.error
+
+ log_target("Dialog response: {} - {}{}".format(
+ response.code,
+ response.text,
+ " ({!r})".format(response.parameters) if response.parameters else "")
+ )
def get_sepa_accounts(self):
"""
for resp in response.response_segments(seg, 'HITAB'):
return resp.tan_usage_option, list(resp.tan_media_list)
+ def add_response_callback(self, cb):
+ # FIXME document
+ self.response_callbacks.append(cb)
+
+ def remove_response_callback(self, cb):
+ # FIXME document
+ self.response_callbacks.remove(cb)
+
+ def _call_callbacks(self, *cb_data):
+ for cb in self.response_callbacks:
+ cb(*cb_data)
+
def pause_dialog(self):
"""Pause a standing dialog and return the saved dialog state.
import base64
+import logging
+import io
import requests
from fints.parser import FinTS3Parser
from .message import FinTSInstituteMessage, FinTSMessage
+logger = logging.getLogger(__name__)
class FinTSConnectionError(Exception):
pass
self.url = url
def send(self, msg: FinTSMessage):
- print("Sending >>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>")
+ log_out = io.StringIO()
with Password.protect():
- msg.print_nested()
- print(">>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>")
+ msg.print_nested(stream=log_out, prefix="\t")
+ logger.debug("Sending >>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>\n{}\n>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>\n".format(log_out.getvalue()))
+ log_out.truncate(0)
+
r = requests.post(
self.url, data=base64.b64encode(msg.render_bytes()),
)
+
if r.status_code < 200 or r.status_code > 299:
raise FinTSConnectionError('Bad status code {}'.format(r.status_code))
+
response = base64.b64decode(r.content.decode('iso-8859-1'))
retval = FinTSInstituteMessage(segments=response)
- #import pprint; pprint.pprint(response) FIXME Remove
- print("Received <<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<")
+
with Password.protect():
- retval.print_nested()
- print("<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<")
+ retval.print_nested(stream=log_out, prefix="\t")
+ logger.debug("Received <<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<\n{}\n<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<\n".format(log_out.getvalue()))
return retval