From: Henryk Plötz Date: Sat, 25 Aug 2018 14:45:17 +0000 (+0200) Subject: Move all of the PIN/TAN specific handling into FinTS3PinTanClient X-Git-Tag: v2.0.0~1^2~82 X-Git-Url: http://git.ipfire.org/cgi-bin/gitweb.cgi?a=commitdiff_plain;h=198658a033c6657156a5614f277aa57f0511041f;p=thirdparty%2Fpython-fints.git Move all of the PIN/TAN specific handling into FinTS3PinTanClient Move callbacks so that callback/logging is not called for internal processing --- diff --git a/fints/client.py b/fints/client.py index 6c368a8..28e60d9 100644 --- a/fints/client.py +++ b/fints/client.py @@ -15,7 +15,7 @@ from .formals import ( SynchronisationMode, TwoStepParametersCommon, TANMediaType2, TANMediaClass4, ) -from .message import FinTSMessageOLD +from .message import FinTSInstituteMessage, FinTSMessageOLD from .models import ( SEPAAccount, TANChallenge, TANChallenge3, TANChallenge4, TANChallenge5, TANChallenge6, @@ -62,9 +62,6 @@ class FinTS3Client: self.upd_version = 0 self.upa = None self.upd = SegmentSequence() - self.allowed_security_functions = [] - self.selected_security_function = None - self.selected_tan_medium = None self.product_name = 'pyfints' self.product_version = '0.2' self.response_callbacks = [] @@ -82,9 +79,56 @@ class FinTS3Client: def _ensure_system_id(self): raise NotImplemented() + def _process_response(self, segment, response): + pass + + def _process_response_segments(self, message: FinTSInstituteMessage, internal_send=True): + bpa = message.find_segment_first(HIBPA3) + if bpa: + self.bpa = bpa + self.bpd_version = bpa.bpd_version + self.bpd = SegmentSequence( + message.find_segments( + callback = lambda m: len(m.header.type) == 6 and m.header.type[1] == 'I' and m.header.type[5] == 'S' + ) + ) + + upa = message.find_segment_first(HIUPA4) + if upa: + self.upa = upa + self.upd_version = upa.upd_version + self.upd = SegmentSequence( + message.find_segments('HIUPD') + ) + + for seg in message.find_segments(HIRMG2): + for response in seg.responses: + if not internal_send: + self._log_response(None, response) + + self._call_callbacks(None, response) + + self._process_response(None, response) + + for seg in message.find_segments(HIRMS2): + for response in seg.responses: + segment = None # FIXME: Provide segment + + if not internal_send: + self._log_response(segment, response) + + self._call_callbacks(segment, response) + + self._process_response(segment, response) + + def process_dialog_response(self, message: FinTSInstituteMessage): + self._process_response_segments(message, internal_send=True) + def _send(self, dialog, *segments): "Internal send, may be overriden in subclasses" - return dialog.send(*segments) + response = dialog.send(*segments) + self._process_response_segments(response, internal_send=False) + return response def __enter__(self): if self._standing_dialog: @@ -116,29 +160,23 @@ class FinTS3Client: self.system_id = data.get('system_id', self.system_id) if all(x in data for x in ('bpd_bin', 'bpa_bin', 'bpd_version')): - if data['bpd_version'] >= self.bpd_version: + if data['bpd_version'] >= self.bpd_version and data['bpa_bin']: self.bpd = SegmentSequence(data['bpd_bin']) self.bpa = SegmentSequence(data['bpa_bin']).segments[0] self.bpd_version = data['bpd_version'] - self.selected_security_function = data.get('selected_security_function', self.selected_security_function) - self.allowed_security_functions = data.get('allowed_security_functions', self.allowed_security_functions) - if all(x in data for x in ('upd_bin', 'upa_bin', 'upd_version')): - if data['upd_version'] >= self.upd_version: + if data['upd_version'] >= self.upd_version and data['upa_bin']: self.upd = SegmentSequence(data['upd_bin']) self.upa = SegmentSequence(data['upa_bin']).segments[0] self.upd_version = data['upd_version'] - def get_data(self, including_private=False): - # FIXME Test, document + def _get_data_v1(self, including_private=False): data = { "system_id": self.system_id, "bpd_bin": self.bpd.render_bytes(), "bpa_bin": FinTS3Serializer().serialize_message(self.bpa) if self.bpa else None, "bpd_version": self.bpd_version, - "selected_security_function": self.selected_security_function, - "selected_tan_medium": self.selected_tan_medium, } if including_private: @@ -146,92 +184,19 @@ class FinTS3Client: "upd_bin": self.upd.render_bytes(), "upa_bin": FinTS3Serializer().serialize_message(self.upa) if self.upa else None, "upd_version": self.upd_version, - "allowed_security_functions": self.allowed_security_functions, }) + return data + + def get_data(self, including_private=False): + # FIXME Test, document + data = self._get_data_v1(including_private=including_private) return compress_datablob(DATA_BLOB_MAGIC, 1, data) def set_data(self, blob): # FIXME Test, document decompress_datablob(DATA_BLOB_MAGIC, self, blob) - def process_institute_response(self, message): - bpa = message.find_segment_first(HIBPA3) - if bpa: - self.bpa = bpa - self.bpd_version = bpa.bpd_version - self.bpd = SegmentSequence( - message.find_segments( - callback = lambda m: len(m.header.type) == 6 and m.header.type[1] == 'I' and m.header.type[5] == 'S' - ) - ) - - upa = message.find_segment_first(HIUPA4) - if upa: - self.upa = upa - self.upd_version = upa.upd_version - self.upd = SegmentSequence( - message.find_segments('HIUPD') - ) - - # We'll delay calling the callback for HIRMG/3060 ("please note the warnings") - # to see if any warning other than HIRMS(HKVVD)/3050 ("UPD/BPD not current; current version attached") - # or HIRMS(HKVVD)/3920 ("allowed TAN methods") are present - # 3050 and 3920 are handled completely internally and never passed to the callback - # 3060 is only passed to the callbacks, if any warning/error other than 3050/3920 is present - self.delayed_callback_calls = [] - - for seg in message.find_segments(HIRMG2): - for response in seg.responses: - self._log_response(None, response) - - if response.code == '3060': - self.delayed_callback_calls.append( (None, response) ) - else: - self._call_callbacks(None, response) - - if response.code.startswith('9'): - raise Exception("FinTS error response: {!r}".format(response)) - - self._process_response(None, response) - - for seg in message.find_segments(HIRMS2): - for response in seg.responses: - segment = None # FIXME: Provide segment - - self._log_response(None, response) - - if response.code not in ('3050', '3920'): - if not response.code.startswith('0') and not response.code.startswith('1'): - for cb_data in self.delayed_callback_calls: - self._call_callbacks(*cb_data) - self.delayed_callback_calls.clear() - - self._call_callbacks(segment, response) - - self._process_response(segment, response) - - def _process_response(self, segment, response): - if response.code == '3920': - self.allowed_security_functions = list(response.parameters) - if self.selected_security_function is None or not self.selected_security_function in self.allowed_security_functions: - # Select the first available twostep security_function that we support - for security_function, parameter in self.get_tan_mechanisms().items(): - if security_function == '999': - # Skip onestep TAN - continue - if parameter.tan_process != '2': - # Only support process variant 2 for now - continue - try: - self.set_tan_mechanism(parameter.security_function) - break - except NotImplementedError: - pass - else: - # Fall back to onestep - self.set_tan_mechanism('999') - def _log_response(self, segment, response): if response.code[0] in ('0', '1'): log_target = logger.info @@ -254,7 +219,7 @@ class FinTS3Client: """ with self._get_dialog() as dialog: - response = dialog.send(HKSPA1()) + response = self._send(dialog, HKSPA1()) self.accounts = [] for seg in response.find_segments(HISPA1): @@ -277,7 +242,7 @@ class FinTS3Client: while touchdown or touchdown_counter == 1: seg = segment_factory(touchdown) - rm = dialog.send(seg) + rm = self._send(dialog, seg) for resp in rm.response_segments(seg, *args, **kwargs): responses.append(resp) @@ -366,7 +331,7 @@ class FinTS3Client: all_accounts=False, ) - response = dialog.send(seg) + response = self._send(dialog, seg) for resp in response.response_segments(seg, 'HISAL'): return resp.balance_booked.as_mt940_Balance() @@ -386,7 +351,7 @@ class FinTS3Client: account=hkwpd._fields['account'].type.from_sepa_account(account), ) - response = dialog.send(seg) + response = self._send(dialog, seg) for resp in response.response_segments(seg, 'HIWPD'): ## FIXME BROKEN @@ -532,57 +497,13 @@ class FinTS3Client: book_as_single ))) - resp = dialog.send(self._get_start_sepa_debit_message( + resp = self._send(dialog, self._get_start_sepa_debit_message( dialog, account, pain_message, tan_method, tan_description, multiple, control_sum, currency, book_as_single )) logger.debug('Got response: {}'.format(resp)) return self._tan_requiring_response(dialog, resp) - def get_tan_mechanisms(self): - """ - Get the available TAN mechanisms. - - :return: Dictionary of security_function: TwoStepParameters[1-5] objects. - """ - - retval = OrderedDict() - - for version in range(1, 6): - for seg in self.bpd.find_segments('HITANS', version): - for parameter in seg.parameter.twostep_parameters: - if parameter.security_function in self.allowed_security_functions: - retval[parameter.security_function] = parameter - - return retval - - def get_current_tan_mechanism(self): - return self.selected_security_function - - def set_tan_mechanism(self, security_function): - self.selected_security_function = security_function - - def set_tan_medium(self, tan_medium): - self.selected_tan_medium = tan_medium - - def get_tan_media(self, media_type = TANMediaType2.ALL, media_class = TANMediaClass4.ALL): - """Get information about TAN lists/generators. - - Returns tuple of fints.formals.TANUsageOption and a list of fints.formals.TANMedia4 or fints.formals.TANMedia5 objects.""" - - with self._get_dialog() as dialog: - hktab = self._find_highest_supported_command(HKTAB4, HKTAB5) - - seg = hktab( - tan_media_type = media_type, - tan_media_class = str(media_class), - ) - - response = dialog.send(seg) - - for resp in response.response_segments(seg, 'HITAB'): - return resp.tan_usage_option, list(resp.tan_media_list) - def add_response_callback(self, cb): # FIXME document self.response_callbacks.append(cb) @@ -667,6 +588,9 @@ class FinTS3PinTanClient(FinTS3Client): self.pin = Password(pin) self._pending_tan = None self.connection = FinTSHTTPSConnection(server) + self.allowed_security_functions = [] + self.selected_security_function = None + self.selected_tan_medium = None super().__init__(bank_identifier=bank_identifier, user_id=user_id, customer_id=customer_id, *args, **kwargs) def _new_dialog(self, lazy_init=False): @@ -701,6 +625,26 @@ class FinTS3PinTanClient(FinTS3Client): raise ValueError('Could not find system_id') self.system_id = seg.system_id + def _set_data_v1(self, data): + super()._set_data_v1(data) + self.selected_tan_medium = data.get('selected_tan_medium', self.selected_tan_medium) + self.selected_security_function = data.get('selected_security_function', self.selected_security_function) + self.allowed_security_functions = data.get('allowed_security_functions', self.allowed_security_functions) + + def _get_data_v1(self, including_private=False): + data = super()._get_data_v1(including_private=including_private) + data.update({ + "selected_security_function": self.selected_security_function, + "selected_tan_medium": self.selected_tan_medium, + }) + + if including_private: + data.update({ + "allowed_security_functions": self.allowed_security_functions, + }) + + return data + def _get_tan_segment(self, orig_seg, tan_process, tan_seg=None): tan_mechanism = self.get_tan_mechanisms()[self.get_current_tan_mechanism()] @@ -785,3 +729,73 @@ class FinTS3PinTanClient(FinTS3Client): # FIXME Try to return a better return code return response + + def _process_response(self, segment, response): + if response.code == '3920': + self.allowed_security_functions = list(response.parameters) + if self.selected_security_function is None or not self.selected_security_function in self.allowed_security_functions: + # Select the first available twostep security_function that we support + for security_function, parameter in self.get_tan_mechanisms().items(): + if security_function == '999': + # Skip onestep TAN + continue + if parameter.tan_process != '2': + # Only support process variant 2 for now + continue + try: + self.set_tan_mechanism(parameter.security_function) + break + except NotImplementedError: + pass + else: + # Fall back to onestep + self.set_tan_mechanism('999') + + def get_tan_mechanisms(self): + """ + Get the available TAN mechanisms. + + :return: Dictionary of security_function: TwoStepParameters[1-5] objects. + """ + + retval = OrderedDict() + + for version in range(1, 6): + for seg in self.bpd.find_segments('HITANS', version): + for parameter in seg.parameter.twostep_parameters: + if parameter.security_function in self.allowed_security_functions: + retval[parameter.security_function] = parameter + + return retval + + def get_current_tan_mechanism(self): + return self.selected_security_function + + def set_tan_mechanism(self, security_function): + if self._standing_dialog: + raise Exception("Cannot change TAN mechanism with a standing dialog") + self.selected_security_function = security_function + + def set_tan_medium(self, tan_medium): + if self._standing_dialog: + raise Exception("Cannot change TAN medium with a standing dialog") + self.selected_tan_medium = tan_medium + + def get_tan_media(self, media_type = TANMediaType2.ALL, media_class = TANMediaClass4.ALL): + """Get information about TAN lists/generators. + + Returns tuple of fints.formals.TANUsageOption and a list of fints.formals.TANMedia4 or fints.formals.TANMedia5 objects.""" + + with self._get_dialog() as dialog: + hktab = self._find_highest_supported_command(HKTAB4, HKTAB5) + + seg = hktab( + tan_media_type = media_type, + tan_media_class = str(media_class), + ) + + response = self._send(dialog, seg) + + for resp in response.response_segments(seg, 'HITAB'): + return resp.tan_usage_option, list(resp.tan_media_list) + diff --git a/fints/dialog.py b/fints/dialog.py index b528bb4..ff303ee 100644 --- a/fints/dialog.py +++ b/fints/dialog.py @@ -75,6 +75,7 @@ class FinTSDialog: try: self.open = True retval = self.send(*segments) + self.client.process_dialog_response(retval) self.need_init = False return retval except: @@ -88,7 +89,8 @@ class FinTSDialog: raise Error("Cannot end() on a paused dialog") if self.open: - self.send(HKEND1(self.dialogue_id)) + response = self.send(HKEND1(self.dialogue_id)) + self.client.process_dialog_response(response) self.open = False def send(self, *segments): @@ -130,8 +132,6 @@ class FinTSDialog: raise ValueError('Could not find dialogue_id') self.dialogue_id = seg.dialogue_id - self.client.process_institute_response(response) - return response def new_customer_message(self):