]> git.ipfire.org Git - thirdparty/python-fints.git/commitdiff
VoP support (#199)
authorrhn <gihu.rhn@porcupinefactory.org>
Thu, 27 Nov 2025 18:33:22 +0000 (19:33 +0100)
committerGitHub <noreply@github.com>
Thu, 27 Nov 2025 18:33:22 +0000 (19:33 +0100)
* WIP: Add VoP segments

* VoP generally works

* works also with full match

* document a bit

* Improved implementation

* Fix failing tests

---------

Co-authored-by: Raphael Michel <michel@rami.io>
docs/registration.rst [new file with mode: 0644]
docs/transfers.rst
docs/trouble.rst
fints/client.py
fints/fields.py
fints/segments/auth.py
fints/types.py

diff --git a/docs/registration.rst b/docs/registration.rst
new file mode 100644 (file)
index 0000000..7757bd6
--- /dev/null
@@ -0,0 +1,11 @@
+Registration necessary
+======================
+
+As of September 14th, 2019, all FinTS programs need to be registered with the ZKA or
+banks will block access. You need to fill out a PDF form and will be assigned a
+product ID that you can pass above.
+
+Click here to read more about the `registration process`_.
+
+
+.. _registration process: https://www.hbci-zka.de/register/prod_register.htm
index 787a9cac4f5ef829152c1062271fe81eddcfe9da..343cf8305c76d14b2bd8f32df4f8ebb1365cca32 100644 (file)
@@ -12,8 +12,16 @@ You can create a simple SEPA transfer using this convenient client method:
    :members: simple_sepa_transfer
    :noindex:
 
+The return value may be a `NeedVOPResponse` in which case you need to call `approve_vop_response` to proceed.
+
+At any point, you might receive a `NeedTANResponse`.
 You should then enter a TAN, read our chapter :ref:`tans` to find out more.
 
+.. autoclass:: fints.client.FinTS3PinTanClient
+   :members: approve_vop_response
+   :noindex:
+
+
 Advanced mode
 -------------
 
@@ -55,20 +63,37 @@ Full example
             endtoend_id='NOTPROVIDED',
         )
 
-        while isinstance(res, NeedTANResponse):
-            print("A TAN is required", res.challenge)
-
-            if getattr(res, 'challenge_hhduc', None):
-                try:
-                    terminal_flicker_unix(res.challenge_hhduc)
-                except KeyboardInterrupt:
-                    pass
-
-            if result.decoupled:
-                tan = input('Please press enter after confirming the transaction in your app:')
-            else:
-                tan = input('Please enter TAN:')
-            res = client.send_tan(res, tan)
+        while isinstance(res, NeedTANResponse | NeedVOPResponse):
+            if isinstance(res, NeedTANResponse):
+                print("A TAN is required", res.challenge)
+
+                if getattr(res, 'challenge_hhduc', None):
+                    try:
+                        terminal_flicker_unix(res.challenge_hhduc)
+                    except KeyboardInterrupt:
+                        pass
+
+                if result.decoupled:
+                    tan = input('Please press enter after confirming the transaction in your app:')
+                else:
+                    tan = input('Please enter TAN:')
+                res = client.send_tan(res, tan)
+            elif isinstance(res, NeedVOPResponse):
+                if res.vop_result.vop_single_result.result == "RVMC":
+                    print("Payee name is a close match")
+                    print("Name retrieved by bank:", res.vop_result.vop_single_result.close_match_name)
+                    if res.vop_result.vop_single_result.other_identification:
+                        print("Other info retrieved by bank:", res.vop_result.vop_single_result.other_identification)
+                elif res.vop_result.vop_single_result.result == "RVNM":
+                    print("Payee name does not match match")
+                elif res.vop_result.vop_single_result.result == "RVNA":
+                    print("Payee name could not be verified")
+                    print("Reason:", res.vop_result.vop_single_result.na_reason)
+                elif res.vop_result.vop_single_result.result == "PDNG":
+                    print("Payee name could not be verified (pending state, can't be handled by this library)")
+                print("Do you want to continue? Your bank will not be liable if the money ends up in the wrong place.")
+                input('Please press enter to confirm or Ctrl+C to cancel')
+                res = client.approve_vop_response(res)
 
         print(res.status)
         print(res.responses)
index 0a4a9c7d9d04f7353b28a0ef1ba47fef468b080d..95ab464f6d4718b72891361bf65c84c59d466721 100644 (file)
@@ -65,6 +65,24 @@ the problem.
         return f.send_tan(response, tan)
 
 
+    def ask_for_vop(response: NeedVOPResponse):
+        if response.vop_result.vop_single_result.result == "RVMC":
+            print("Payee name is a close match")
+            print("Name retrieved by bank:", response.vop_result.vop_single_result.close_match_name)
+            if response.vop_result.vop_single_result.other_identification:
+                print("Other info retrieved by bank:", response.vop_result.vop_single_result.other_identification)
+        elif response.vop_result.vop_single_result.result == "RVNM":
+            print("Payee name does not match match")
+        elif response.vop_result.vop_single_result.result == "RVNA":
+            print("Payee name could not be verified")
+            print("Reason:", response.vop_result.vop_single_result.na_reason)
+        elif response.vop_result.vop_single_result.result == "PDNG":
+            print("Payee name could not be verified (pending state, can't be handled by this library)")
+        print("Do you want to continue? Your bank will not be liable if the money ends up in the wrong place.")
+        input('Please press enter to confirm or Ctrl+C to cancel')
+        return f.approve_vop_response(response)
+
+
     # Open the actual dialog
     with f:
         # Since PSD2, a TAN might be needed for dialog initialization. Let's check if there is one required
@@ -172,8 +190,11 @@ the problem.
                         endtoend_id='NOTPROVIDED',
                     )
 
-                    while isinstance(res, NeedTANResponse):
-                        res = ask_for_tan(res)
+                    while isinstance(res, NeedTANResponse | NeedVOPResponse):
+                        if isinstance(res, NeedTANResponse):
+                            res = ask_for_tan(res)
+                        elif isinstance(res, NeedVOPResponse):
+                            res = ask_for_vop(res)
                 elif choice == 11:
                     print("Select statement")
                     statements = f.get_statements(account)
index 8dd513ffd4575578f236989bcbd28f8d6d0535e8..c10846185e5764878cea87e4f2c285a983ea6e14 100644 (file)
@@ -27,7 +27,7 @@ from .security import (
     PinTanTwoStepAuthenticationMechanism,
 )
 from .segments.accounts import HISPA1, HKSPA1
-from .segments.auth import HIPINS1, HKTAB4, HKTAB5, HKTAN2, HKTAN3, HKTAN5, HKTAN6, HKTAN7
+from .segments.auth import HIPINS1, HKTAB4, HKTAB5, HKTAN2, HKTAN3, HKTAN5, HKTAN6, HKTAN7, HIVPPS1, HIVPP1, PSRD1, HKVPA1
 from .segments.bank import HIBPA3, HIUPA4, HKKOM4
 from .segments.debit import (
     HKDBS1, HKDBS2, HKDMB1, HKDMC1, HKDME1, HKDME2,
@@ -828,7 +828,7 @@ class FinTS3Client:
         :param reason: Transfer reason
         :param instant_payment: Whether to use instant payment (defaults to ``False``)
         :param endtoend_id: End-to-end-Id (defaults to ``NOTPROVIDED``)
-        :return: Returns either a NeedRetryResponse or TransactionResponse
+        :return: Returns either a NeedRetryResponse or NeedVOPResponse or TransactionResponse
         """
         config = {
             "name": account_name,
@@ -908,7 +908,7 @@ class FinTS3Client:
                 if book_as_single:
                     seg.request_single_booking = True
 
-            return self._send_with_possible_retry(dialog, seg, self._continue_sepa_transfer)
+            return self._send_pay_with_possible_retry(dialog, seg, self._continue_sepa_transfer)
 
     def _continue_sepa_transfer(self, command_seg, response):
         retval = TransactionResponse(response)
@@ -1074,6 +1074,41 @@ class FinTS3Client:
         self._standing_dialog = None
 
 
+class NeedVOPResponse(NeedRetryResponse):
+
+    def __init__(self, vop_result, command_seg, resume_method=None):
+        self.vop_result = vop_result
+        self.command_seg = command_seg
+        if hasattr(resume_method, '__func__'):
+            self.resume_method = resume_method.__func__.__name__
+        else:
+            self.resume_method = resume_method
+
+    def __repr__(self):
+        return '<o.__class__.__name__(vop_result={o.vop_result!r})>'.format(o=self)
+
+    @classmethod
+    def _from_data_v1(cls, data):
+        if data["version"] == 1:
+            segs = SegmentSequence(data['segments_bin']).segments
+            return cls(segs[0], segs[1], resume_method=data['resume_method'])
+
+        raise Exception("Wrong blob data version")
+
+    def get_data(self) -> bytes:
+        """Return a compressed datablob representing this object.
+
+        To restore the object, use :func:`fints.client.NeedRetryResponse.from_data`.
+        """
+        data = {
+            "_class_name": self.__class__.__name__,
+            "version": 1,
+            "segments_bin": SegmentSequence([self.vop_result, self.command_seg]).render_bytes(),
+            "resume_method": self.resume_method,
+        }
+        return compress_datablob(DATA_BLOB_MAGIC_RETRY, 1, data)
+
+
 class NeedTANResponse(NeedRetryResponse):
     challenge_raw = None  #: Raw challenge as received by the bank
     challenge = None  #: Textual challenge to be displayed to the user
@@ -1081,12 +1116,14 @@ class NeedTANResponse(NeedRetryResponse):
     challenge_hhduc = None  #: HHD_UC challenge to be transmitted to the TAN generator
     challenge_matrix = None  #: Matrix code challenge: tuple(mime_type, data)
     decoupled = None  #: Use decoupled process
+    vop_result = None  #: VoP result
 
-    def __init__(self, command_seg, tan_request, resume_method=None, tan_request_structured=False, decoupled=False):
+    def __init__(self, command_seg, tan_request, resume_method=None, tan_request_structured=False, decoupled=False, vop_result=None):
         self.command_seg = command_seg
         self.tan_request = tan_request
         self.tan_request_structured = tan_request_structured
         self.decoupled = decoupled
+        self.vop_result = vop_result
         if hasattr(resume_method, '__func__'):
             self.resume_method = resume_method.__func__.__name__
         else:
@@ -1315,6 +1352,23 @@ class FinTS3PinTanClient(FinTS3Client):
 
         return seg
 
+    def _find_vop_format_for_segment(self, seg):
+        vpps = self.bpd.find_segment_first('HIVPPS')
+        if not vpps:
+            return
+
+        needed = str(seg.header.type) in list(vpps.parameter.payment_order_segment)
+        
+        if not needed:
+            return
+
+        bank_supported = str(vpps.parameter.supported_report_formats)
+
+        if "sepade.pain.002.001.10.xsd" != bank_supported:
+            logger.warning("No common supported SEPA version. Defaulting to what bank supports and hoping for the best: %s.", bank_supported)
+        
+        return bank_supported
+
     def _need_twostep_tan_for_segment(self, seg):
         if not self.selected_security_function or self.selected_security_function == '999':
             return False
@@ -1351,6 +1405,65 @@ class FinTS3PinTanClient(FinTS3Client):
                 response = dialog.send(command_seg)
 
             return resume_func(command_seg, response)
+        
+    def _send_pay_with_possible_retry(self, dialog, command_seg, resume_func):
+        """
+        This adds VoP under the assumption that TAN will be sent,
+        There appears to be no VoP flow without sending any authentication.
+        
+        There are really 2 VoP flows: with a full match and otherwise.
+        The second flow returns a NeedVOPResponse as intended by the specification flowcharts.
+        In this case cases, the application should ask the user for confirmation based on HIVPP data in resp.vop_result.
+        
+        The kind of response is in resp.vop_result.single_vop_result.result:
+        - 'RCVC' - full match
+        - 'RVMC' - partial match, extra info in single_vop_result.close_match_name and .other_identification.
+        - 'RVNM' - no match, no extra info seen
+        - 'RVNA' - check not available, reason in single_vop_result.na_reason
+        - 'PDNG' - pending, seems related to something not implemented right now.
+        """
+        vop_seg = []
+        vop_standard = self._find_vop_format_for_segment(command_seg)
+        if vop_standard:
+            from .segments.auth import HKVPP1
+            vop_seg = [HKVPP1(supported_reports=PSRD1(psrd=[vop_standard]))]
+
+        with dialog:
+            if self._need_twostep_tan_for_segment(command_seg):
+                tan_seg = self._get_tan_segment(command_seg, '4')
+                segments = vop_seg + [command_seg, tan_seg]
+
+                response = dialog.send(*segments)
+
+                if vop_standard:
+                    hivpp = response.find_segment_first(HIVPP1, throw=True)
+
+                    vop_result = hivpp.vop_single_result
+                    if vop_result.result in ('RVNA', 'RVNM', 'RVMC'):  # Not Applicable, No Match, Close Match
+                        return NeedVOPResponse(
+                            vop_result=hivpp,
+                            command_seg=command_seg,
+                            resume_method=resume_func,
+                        )
+                else:
+                    hivpp = None
+
+                for resp in response.responses(tan_seg):
+                    if resp.code in ('0030', '3955'):
+                        return NeedTANResponse(
+                            command_seg,
+                            response.find_segment_first('HITAN'),
+                            resume_func,
+                            self.is_challenge_structured(),
+                            resp.code == '3955',
+                            hivpp,
+                        )
+                    if resp.code.startswith('9'):
+                        raise Exception("Error response: {!r}".format(response))
+            else:
+                response = dialog.send(command_seg)
+
+            return resume_func(command_seg, response)
 
     def is_challenge_structured(self):
         param = self.get_tan_mechanisms()[self.get_current_tan_mechanism()]
@@ -1358,6 +1471,33 @@ class FinTS3PinTanClient(FinTS3Client):
             return param.challenge_structured
         return False
 
+    def approve_vop_response(self, challenge: NeedVOPResponse):
+        """
+        Approves an operation that had a non-match VoP (verification of payee) response.
+
+        :param challenge: NeedVOPResponse to respond to
+        :return: New response after sending VOP response
+        """
+        with self._get_dialog() as dialog:
+            vop_seg = [HKVPA1(vop_id=challenge.vop_result.vop_id)]
+            tan_seg = self._get_tan_segment(challenge.command_seg, '4')
+            segments = vop_seg + [challenge.command_seg, tan_seg]
+            response = dialog.send(*segments)
+
+            for resp in response.responses(tan_seg):
+                if resp.code in ('0030', '3955'):
+                    return NeedTANResponse(
+                        challenge.command_seg,
+                        response.find_segment_first('HITAN'),
+                        challenge.resume_method,
+                        self.is_challenge_structured(),
+                        resp.code == '3955',
+                        challenge.vop_result,
+                    )
+
+            resume_func = getattr(self, challenge.resume_method)
+            return resume_func(challenge.command_seg, response)
+
     def send_tan(self, challenge: NeedTANResponse, tan: str):
         """
         Sends a TAN to confirm a pending operation.
@@ -1370,7 +1510,6 @@ class FinTS3PinTanClient(FinTS3Client):
         :param tan: TAN value
         :return: New response after sending TAN
         """
-
         with self._get_dialog() as dialog:
             if challenge.decoupled:
                 tan_seg = self._get_tan_segment(challenge.command_seg, 'S', challenge.tan_request)
@@ -1378,7 +1517,11 @@ class FinTS3PinTanClient(FinTS3Client):
                 tan_seg = self._get_tan_segment(challenge.command_seg, '2', challenge.tan_request)
                 self._pending_tan = tan
 
-            response = dialog.send(tan_seg)
+            vop_seg = []
+            if challenge.vop_result and challenge.vop_result.vop_single_result.result == 'RCVC':
+                vop_seg = [HKVPA1(vop_id=challenge.vop_result.vop_id)]
+            segments = vop_seg + [tan_seg]
+            response = dialog.send(*segments)
 
             if challenge.decoupled:
                 # TAN process = S
index 39dcc33f45945a357dabb45d5c3b898fbb6c201f..7f8d6e301c56a490ee63e11fee2cff7ccdba2308 100644 (file)
@@ -291,6 +291,21 @@ class TimeField(FixedLengthMixin, DigitsField):
         return super()._render_value(val)
 
 
+class TimestampField(DataElementField):
+    # Defined in the VoP standard, but missing in the Formals document. We just treat it as
+    # opaque bytes.
+    type = 'tsp'
+    _DOC_TYPE = bytes
+
+    def _render_value(self, value):
+        retval = bytes(value)
+        self._check_value_length(retval)
+        return retval
+
+    def _parse_value(self, value):
+        return bytes(value)
+
+
 class PasswordField(AlphanumericField):
     type = ''
     _DOC_TYPE = Password
index 57f665f2181868a8784f30d4fbb102ef72e53f25..9dcdd2eb12039e320259a6c0ffef333476959306 100644 (file)
@@ -1,5 +1,6 @@
 from fints.fields import CodeField, DataElementField, DataElementGroupField
 from fints.formals import (
+    DataElementGroup,
     KTI1, BankIdentifier, ChallengeValidUntil, Language2,
     ParameterChallengeClass, ParameterPinTan, ParameterTwostepTAN1,
     ParameterTwostepTAN2, ParameterTwostepTAN3, ParameterTwostepTAN4,
@@ -97,6 +98,75 @@ class HKTAN6(FinTS3Segment):
     response_hhd_uc = DataElementGroupField(type=ResponseHHDUC, required=False, _d="Antwort HHD_UC")
 
 
+class PSRD1(DataElementGroup):
+    """Unterstütze Payment Status Reports
+
+    Source: FinTS Financial Transaction Services, Schnittstellenspezifikation, Messages -- Multibankfähige Geschäftsvorfälle, version 3.0 Rel 2022, FV """
+    psrd = DataElementField(type='an', max_length=256, required=True, _d="Payment Status Report Descriptor", max_count=99)
+    # urn:iso:std:iso:20022:tech:xsd:pain.002.001.14
+
+
+class HKVPP1(FinTS3Segment):
+    """Namensabgleich Prüfauftrag, version 1
+
+    Source: FinTS Financial Transaction Services, Schnittstellenspezifikation, Verification of Payee"""
+    supported_reports = DataElementGroupField(type=PSRD1, required=True, _d="Unterstützte Payment Status Reports")
+    polling_id = DataElementField(type='bin', required=False, _d="Polling-ID")
+    max_queries = DataElementField(type='num', max_length=4, required=False, _d="Maximale Anzahl Einträge")
+    aufsetzpunkt = DataElementField(type='an', max_length=35, required=False, _d="Aufsetzpunkt")
+
+
+class EVPE(DataElementGroup):
+    """Ergebnis VOP-Prüfung Einzeltransaktion
+
+    Source: FinTS Financial Transaction Services, Schnittstellenspezifikation, Verification of Payee"""
+    recipient_IBAN = DataElementField(type='an', max_length=34, required=False, _d="IBAN Empfänger")
+    info_IBAN = DataElementField(type='an', max_length=140, required=False, _d="IBAN-Zusatzinformationen")
+    close_match_name = DataElementField(type='an', max_length=140, required=False, _d="Abweichender Empfängername")
+    other_identification = DataElementField(type='an', max_length=256, required=False, _d="Anderes Identifikationmerkmal")
+    # RVMC, RCVC, RVNM,  RVNA, PDNG
+    result = DataElementField(type='code', length=4, required=False, _d="VOP-Prüfergebnis")
+    na_reason = DataElementField(type='an', max_length=256, required=False, _d="Grund RVNA")
+
+
+class HIVPP1(FinTS3Segment):
+    """Namensabgleich Namensabgleich Prüfergebnis, version 1
+    Source: FinTS Financial Transaction Services, Schnittstellenspezifikation, Verification of Payee"""
+    vop_id = DataElementField(type='bin', required=False, _d="VOP-ID")
+    vop_id_valid_until = DataElementGroupField(type=ChallengeValidUntil, required=False, _d="VOP-ID gültig bis")
+    polling_id = DataElementField(type='bin', required=False, _d="Polling-ID")
+    payment_status_report_descriptor = DataElementField(type='an', max_length=256, required=False, _d="Payment Status Report Descriptor")
+    payment_status_report = DataElementField(type='bin', required=False, _d="Payment Status Report")
+    # Only for a single transaction. Mutually exclusive with payment status report.
+    vop_single_result = DataElementGroupField(type=EVPE, required=False, _d="Ergebnis VOP-Prüfung Einzeltransaktion")
+    manual_authorization_notice = DataElementField(type='an', max_length=65535, required=False, _d="Aufklärungstext Autorisierung trotz Abweichung")
+    wait_for_seconds = DataElementField(type='num', length=1, required=False, _d="Wartezeit vor nächster Abfrage")
+
+
+class ParameterVoP(DataElementGroup):
+    max_trans = DataElementField(type='num', max_length=7, required=False, _d="Maximale Anzahl CreditTransferTransactionInformation OptIn")
+    notice_is_structured = DataElementField(type='jn', required=False, _d="Aufklärungstext strukturiert")
+    # complete: V, piecemeal: S
+    report_complete = DataElementField(type='code', length=1, required=False, _d="Art der Lieferung Payment Status Report")
+    batch_payment_allowed = DataElementField(type='jn', required=False, _d="Sammelzahlungen mit einem Auftrag erlaubt")
+    multiple_allowed = DataElementField(type='jn', required=False, _d="Eingabe Anzahl Einträge erlaubt")
+    supported_report_formats = DataElementField(type='an', max_length=1024, required=False, _d="Unterstützte Payment Status Report Daten-formate")
+    payment_order_segment = DataElementField(type='an', min_count=1, max_length=6, required=False, _d="VOP-pflichtiger Zahlungsverkehrsauftrag")
+
+
+class HIVPPS1(ParameterSegment):
+    """Namensabgleich Prüfauftrag Parameter, version 1
+
+    Source: FinTS Financial Transaction Services, Schnittstellenspezifikation, Verification of Payee"""
+    parameter = DataElementGroupField(type=ParameterVoP, _d="Parameter Namensabgleich Prüfauftrag") 
+
+
+class HKVPA1(FinTS3Segment):
+    """Namensabgleich Namensabgleich Ausführungsauftrag, version 1
+    Source: FinTS Financial Transaction Services, Schnittstellenspezifikation, Verification of Payee"""
+    vop_id = DataElementField(type='bin', required=False, _d="VOP-ID")
+
+
 class HKTAN7(FinTS3Segment):
     """Zwei-Schritt-TAN-Einreichung, version 7
 
@@ -191,7 +261,6 @@ class HKTAB4(FinTS3Segment):
     tan_media_type = CodeField(enum=TANMediaType2, _d="TAN-Medium-Art")
     tan_media_class = CodeField(enum=TANMediaClass3, _d="TAN-Medium-Klasse")
 
-
 class HITAB4(FinTS3Segment):
     """TAN-Generator/Liste anzeigen Bestand Rückmeldung, version 4
 
index aa75a545ef841f165d169bd72a15583fdf0ac227..2518597bba1fa8da5349d405d389e0acf3cc1c9e 100644 (file)
@@ -214,7 +214,7 @@ class SegmentSequence:
         if isinstance(segments, bytes):
             from .parser import FinTS3Parser
             parser = FinTS3Parser()
-            data = parser.explode_segments(segments)
+            data = list(parser.explode_segments(segments))
             segments = [parser.parse_segment(segment) for segment in data]
         self.segments = list(segments) if segments else []