]> git.ipfire.org Git - thirdparty/python-fints.git/commitdiff
Move all of the PIN/TAN specific handling into FinTS3PinTanClient
authorHenryk Plötz <henryk@ploetzli.ch>
Sat, 25 Aug 2018 14:45:17 +0000 (16:45 +0200)
committerRaphael Michel <mail@raphaelmichel.de>
Mon, 3 Dec 2018 18:34:29 +0000 (19:34 +0100)
Move callbacks so that callback/logging is not called for internal processing

fints/client.py
fints/dialog.py

index 6c368a88f7d915ec42ee530925bd2fa9262e4a14..28e60d9e85519afca96fc129508d8c43d6072a43 100644 (file)
@@ -15,7 +15,7 @@ from .formals import (
     SynchronisationMode, TwoStepParametersCommon,
     TANMediaType2, TANMediaClass4,
 )
-from .message import FinTSMessageOLD
+from .message import FinTSInstituteMessage, FinTSMessageOLD
 from .models import (
     SEPAAccount, TANChallenge, TANChallenge3,
     TANChallenge4, TANChallenge5, TANChallenge6,
@@ -62,9 +62,6 @@ class FinTS3Client:
         self.upd_version = 0
         self.upa = None
         self.upd = SegmentSequence()
-        self.allowed_security_functions = []
-        self.selected_security_function = None
-        self.selected_tan_medium = None
         self.product_name = 'pyfints'
         self.product_version = '0.2'
         self.response_callbacks = []
@@ -82,9 +79,56 @@ class FinTS3Client:
     def _ensure_system_id(self):
         raise NotImplemented()
 
+    def _process_response(self, segment, response):
+        pass
+
+    def _process_response_segments(self, message: FinTSInstituteMessage, internal_send=True):
+        bpa = message.find_segment_first(HIBPA3)
+        if bpa:
+            self.bpa = bpa
+            self.bpd_version = bpa.bpd_version
+            self.bpd = SegmentSequence(
+                message.find_segments(
+                    callback = lambda m: len(m.header.type) == 6 and m.header.type[1] == 'I' and m.header.type[5] == 'S'
+                )
+            )
+
+        upa = message.find_segment_first(HIUPA4)
+        if upa:
+            self.upa = upa
+            self.upd_version = upa.upd_version
+            self.upd = SegmentSequence(
+                message.find_segments('HIUPD')
+            )
+
+        for seg in message.find_segments(HIRMG2):
+            for response in seg.responses:
+                if not internal_send:
+                    self._log_response(None, response)
+
+                    self._call_callbacks(None, response)
+
+                self._process_response(None, response)
+
+        for seg in message.find_segments(HIRMS2):
+            for response in seg.responses:
+                segment = None # FIXME: Provide segment
+
+                if not internal_send:
+                    self._log_response(segment, response)
+
+                    self._call_callbacks(segment, response)
+
+                self._process_response(segment, response)
+
+    def process_dialog_response(self, message: FinTSInstituteMessage):
+        self._process_response_segments(message, internal_send=True)
+
     def _send(self, dialog, *segments):
         "Internal send, may be overriden in subclasses"
-        return dialog.send(*segments)
+        response = dialog.send(*segments)
+        self._process_response_segments(response, internal_send=False)
+        return response
 
     def __enter__(self):
         if self._standing_dialog:
@@ -116,29 +160,23 @@ class FinTS3Client:
         self.system_id = data.get('system_id', self.system_id)
 
         if all(x in data for x in ('bpd_bin', 'bpa_bin', 'bpd_version')):
-            if data['bpd_version'] >= self.bpd_version:
+            if data['bpd_version'] >= self.bpd_version and data['bpa_bin']:
                 self.bpd = SegmentSequence(data['bpd_bin'])
                 self.bpa = SegmentSequence(data['bpa_bin']).segments[0]
                 self.bpd_version = data['bpd_version']
 
-        self.selected_security_function = data.get('selected_security_function', self.selected_security_function)
-        self.allowed_security_functions = data.get('allowed_security_functions', self.allowed_security_functions)
-
         if all(x in data for x in ('upd_bin', 'upa_bin', 'upd_version')):
-            if data['upd_version'] >= self.upd_version:
+            if data['upd_version'] >= self.upd_version and data['upa_bin']:
                 self.upd = SegmentSequence(data['upd_bin'])
                 self.upa = SegmentSequence(data['upa_bin']).segments[0]
                 self.upd_version = data['upd_version']
 
-    def get_data(self, including_private=False):
-        # FIXME Test, document
+    def _get_data_v1(self, including_private=False):
         data = {
             "system_id": self.system_id,
             "bpd_bin": self.bpd.render_bytes(),
             "bpa_bin": FinTS3Serializer().serialize_message(self.bpa) if self.bpa else None,
             "bpd_version": self.bpd_version,
-            "selected_security_function": self.selected_security_function,
-            "selected_tan_medium": self.selected_tan_medium,
         }
 
         if including_private:
@@ -146,92 +184,19 @@ class FinTS3Client:
                 "upd_bin": self.upd.render_bytes(),
                 "upa_bin": FinTS3Serializer().serialize_message(self.upa) if self.upa else None,
                 "upd_version": self.upd_version,
-                "allowed_security_functions": self.allowed_security_functions,
             })
 
+        return data
+
+    def get_data(self, including_private=False):
+        # FIXME Test, document
+        data = self._get_data_v1(including_private=including_private)
         return compress_datablob(DATA_BLOB_MAGIC, 1, data)
 
     def set_data(self, blob):
         # FIXME Test, document
         decompress_datablob(DATA_BLOB_MAGIC, self, blob)
 
-    def process_institute_response(self, message):
-        bpa = message.find_segment_first(HIBPA3)
-        if bpa:
-            self.bpa = bpa
-            self.bpd_version = bpa.bpd_version
-            self.bpd = SegmentSequence(
-                message.find_segments(
-                    callback = lambda m: len(m.header.type) == 6 and m.header.type[1] == 'I' and m.header.type[5] == 'S'
-                )
-            )
-
-        upa = message.find_segment_first(HIUPA4)
-        if upa:
-            self.upa = upa
-            self.upd_version = upa.upd_version
-            self.upd = SegmentSequence(
-                message.find_segments('HIUPD')
-            )
-
-        # We'll delay calling the callback for HIRMG/3060 ("please note the warnings")
-        #  to see if any warning other than HIRMS(HKVVD)/3050 ("UPD/BPD not current; current version attached")
-        #  or HIRMS(HKVVD)/3920 ("allowed TAN methods") are present
-        # 3050 and 3920 are handled completely internally and never passed to the callback
-        # 3060 is only passed to the callbacks, if any warning/error other than 3050/3920 is present
-        self.delayed_callback_calls = []
-
-        for seg in message.find_segments(HIRMG2):
-            for response in seg.responses:
-                self._log_response(None, response)
-
-                if response.code == '3060':
-                    self.delayed_callback_calls.append( (None, response) )
-                else:
-                    self._call_callbacks(None, response)
-
-                if response.code.startswith('9'):
-                    raise Exception("FinTS error response: {!r}".format(response))
-
-                self._process_response(None, response)
-
-        for seg in message.find_segments(HIRMS2):
-            for response in seg.responses:
-                segment = None # FIXME: Provide segment
-
-                self._log_response(None, response)
-
-                if response.code not in ('3050', '3920'):
-                    if not response.code.startswith('0') and not response.code.startswith('1'):
-                        for cb_data in self.delayed_callback_calls:
-                            self._call_callbacks(*cb_data)
-                        self.delayed_callback_calls.clear()
-
-                    self._call_callbacks(segment, response)
-
-                self._process_response(segment, response)   
-
-    def _process_response(self, segment, response):
-        if response.code == '3920':
-            self.allowed_security_functions = list(response.parameters)
-            if self.selected_security_function is None or not self.selected_security_function in self.allowed_security_functions:
-                # Select the first available twostep security_function that we support
-                for security_function, parameter in self.get_tan_mechanisms().items():
-                    if security_function == '999':
-                        # Skip onestep TAN
-                        continue
-                    if parameter.tan_process != '2':
-                        # Only support process variant 2 for now
-                        continue
-                    try:
-                        self.set_tan_mechanism(parameter.security_function)
-                        break
-                    except NotImplementedError:
-                        pass
-                else:
-                    # Fall back to onestep
-                    self.set_tan_mechanism('999')
-
     def _log_response(self, segment, response):
         if response.code[0] in ('0', '1'):
             log_target = logger.info
@@ -254,7 +219,7 @@ class FinTS3Client:
         """
 
         with self._get_dialog() as dialog:
-            response = dialog.send(HKSPA1())
+            response = self._send(dialog, HKSPA1())
             
         self.accounts = []
         for seg in response.find_segments(HISPA1):
@@ -277,7 +242,7 @@ class FinTS3Client:
         while touchdown or touchdown_counter == 1:
             seg = segment_factory(touchdown)
 
-            rm = dialog.send(seg)
+            rm = self._send(dialog, seg)
 
             for resp in rm.response_segments(seg, *args, **kwargs):
                 responses.append(resp)
@@ -366,7 +331,7 @@ class FinTS3Client:
                 all_accounts=False,
             )
 
-            response = dialog.send(seg)
+            response = self._send(dialog, seg)
 
             for resp in response.response_segments(seg, 'HISAL'):
                 return resp.balance_booked.as_mt940_Balance()
@@ -386,7 +351,7 @@ class FinTS3Client:
                 account=hkwpd._fields['account'].type.from_sepa_account(account),
             )
 
-            response = dialog.send(seg)
+            response = self._send(dialog, seg)
 
             for resp in response.response_segments(seg, 'HIWPD'):
                 ## FIXME BROKEN
@@ -532,57 +497,13 @@ class FinTS3Client:
                 book_as_single
             )))
 
-        resp = dialog.send(self._get_start_sepa_debit_message(
+        resp = self._send(dialog, self._get_start_sepa_debit_message(
             dialog, account, pain_message, tan_method, tan_description, multiple, control_sum, currency,
             book_as_single
         ))
         logger.debug('Got response: {}'.format(resp))
         return self._tan_requiring_response(dialog, resp)
 
-    def get_tan_mechanisms(self):
-        """
-        Get the available TAN mechanisms.
-
-        :return: Dictionary of security_function: TwoStepParameters[1-5] objects.
-        """
-
-        retval = OrderedDict()
-
-        for version in range(1, 6):
-            for seg in self.bpd.find_segments('HITANS', version):
-                for parameter in seg.parameter.twostep_parameters:
-                    if parameter.security_function in self.allowed_security_functions:
-                        retval[parameter.security_function] = parameter
-
-        return retval
-
-    def get_current_tan_mechanism(self):
-        return self.selected_security_function
-
-    def set_tan_mechanism(self, security_function):
-        self.selected_security_function = security_function
-
-    def set_tan_medium(self, tan_medium):
-        self.selected_tan_medium = tan_medium
-
-    def get_tan_media(self, media_type = TANMediaType2.ALL, media_class = TANMediaClass4.ALL):
-        """Get information about TAN lists/generators.
-
-        Returns tuple of fints.formals.TANUsageOption and a list of fints.formals.TANMedia4 or fints.formals.TANMedia5 objects."""
-
-        with self._get_dialog() as dialog:
-            hktab = self._find_highest_supported_command(HKTAB4, HKTAB5)
-
-            seg = hktab(
-                tan_media_type = media_type,
-                tan_media_class = str(media_class),
-            )
-
-            response = dialog.send(seg)
-
-            for resp in response.response_segments(seg, 'HITAB'):
-                return resp.tan_usage_option, list(resp.tan_media_list)
-
     def add_response_callback(self, cb):
         # FIXME document
         self.response_callbacks.append(cb)
@@ -667,6 +588,9 @@ class FinTS3PinTanClient(FinTS3Client):
         self.pin = Password(pin)
         self._pending_tan = None
         self.connection = FinTSHTTPSConnection(server)
+        self.allowed_security_functions = []
+        self.selected_security_function = None
+        self.selected_tan_medium = None
         super().__init__(bank_identifier=bank_identifier, user_id=user_id, customer_id=customer_id, *args, **kwargs)
 
     def _new_dialog(self, lazy_init=False):
@@ -701,6 +625,26 @@ class FinTS3PinTanClient(FinTS3Client):
             raise ValueError('Could not find system_id')
         self.system_id = seg.system_id
 
+    def _set_data_v1(self, data):
+        super()._set_data_v1(data)
+        self.selected_tan_medium = data.get('selected_tan_medium', self.selected_tan_medium)
+        self.selected_security_function = data.get('selected_security_function', self.selected_security_function)
+        self.allowed_security_functions = data.get('allowed_security_functions', self.allowed_security_functions)
+
+    def _get_data_v1(self, including_private=False):
+        data = super()._get_data_v1(including_private=including_private)
+        data.update({
+            "selected_security_function": self.selected_security_function,
+            "selected_tan_medium": self.selected_tan_medium,
+        })
+
+        if including_private:
+            data.update({
+                "allowed_security_functions": self.allowed_security_functions,
+            })
+
+        return data
+
     def _get_tan_segment(self, orig_seg, tan_process, tan_seg=None):
         tan_mechanism = self.get_tan_mechanisms()[self.get_current_tan_mechanism()]
 
@@ -785,3 +729,73 @@ class FinTS3PinTanClient(FinTS3Client):
             # FIXME Try to return a better return code
 
         return response
+
+    def _process_response(self, segment, response):
+        if response.code == '3920':
+            self.allowed_security_functions = list(response.parameters)
+            if self.selected_security_function is None or not self.selected_security_function in self.allowed_security_functions:
+                # Select the first available twostep security_function that we support
+                for security_function, parameter in self.get_tan_mechanisms().items():
+                    if security_function == '999':
+                        # Skip onestep TAN
+                        continue
+                    if parameter.tan_process != '2':
+                        # Only support process variant 2 for now
+                        continue
+                    try:
+                        self.set_tan_mechanism(parameter.security_function)
+                        break
+                    except NotImplementedError:
+                        pass
+                else:
+                    # Fall back to onestep
+                    self.set_tan_mechanism('999')
+
+    def get_tan_mechanisms(self):
+        """
+        Get the available TAN mechanisms.
+
+        :return: Dictionary of security_function: TwoStepParameters[1-5] objects.
+        """
+
+        retval = OrderedDict()
+
+        for version in range(1, 6):
+            for seg in self.bpd.find_segments('HITANS', version):
+                for parameter in seg.parameter.twostep_parameters:
+                    if parameter.security_function in self.allowed_security_functions:
+                        retval[parameter.security_function] = parameter
+
+        return retval
+
+    def get_current_tan_mechanism(self):
+        return self.selected_security_function
+
+    def set_tan_mechanism(self, security_function):
+        if self._standing_dialog:
+            raise Exception("Cannot change TAN mechanism with a standing dialog")
+        self.selected_security_function = security_function
+
+    def set_tan_medium(self, tan_medium):
+        if self._standing_dialog:
+            raise Exception("Cannot change TAN medium with a standing dialog")
+        self.selected_tan_medium = tan_medium
+
+    def get_tan_media(self, media_type = TANMediaType2.ALL, media_class = TANMediaClass4.ALL):
+        """Get information about TAN lists/generators.
+
+        Returns tuple of fints.formals.TANUsageOption and a list of fints.formals.TANMedia4 or fints.formals.TANMedia5 objects."""
+
+        with self._get_dialog() as dialog:
+            hktab = self._find_highest_supported_command(HKTAB4, HKTAB5)
+
+            seg = hktab(
+                tan_media_type = media_type,
+                tan_media_class = str(media_class),
+            )
+
+            response = self._send(dialog, seg)
+
+            for resp in response.response_segments(seg, 'HITAB'):
+                return resp.tan_usage_option, list(resp.tan_media_list)
+
index b528bb44e522aaea7d9aab61e2f887f1520b3070..ff303ee7e9409d89ae5f4d0830475ff639ac79d4 100644 (file)
@@ -75,6 +75,7 @@ class FinTSDialog:
             try:
                 self.open = True
                 retval = self.send(*segments)
+                self.client.process_dialog_response(retval)
                 self.need_init = False
                 return retval
             except:
@@ -88,7 +89,8 @@ class FinTSDialog:
             raise Error("Cannot end() on a paused dialog")
 
         if self.open:
-            self.send(HKEND1(self.dialogue_id))
+            response = self.send(HKEND1(self.dialogue_id))
+            self.client.process_dialog_response(response)
             self.open = False
 
     def send(self, *segments):
@@ -130,8 +132,6 @@ class FinTSDialog:
                 raise ValueError('Could not find dialogue_id')
             self.dialogue_id = seg.dialogue_id
 
-        self.client.process_institute_response(response)
-
         return response
 
     def new_customer_message(self):