--- /dev/null
+Registration necessary
+======================
+
+As of September 14th, 2019, all FinTS programs need to be registered with the ZKA or
+banks will block access. You need to fill out a PDF form and will be assigned a
+product ID that you can pass above.
+
+Click here to read more about the `registration process`_.
+
+
+.. _registration process: https://www.hbci-zka.de/register/prod_register.htm
:members: simple_sepa_transfer
:noindex:
+The return value may be a `NeedVOPResponse` in which case you need to call `approve_vop_response` to proceed.
+
+At any point, you might receive a `NeedTANResponse`.
You should then enter a TAN, read our chapter :ref:`tans` to find out more.
+.. autoclass:: fints.client.FinTS3PinTanClient
+ :members: approve_vop_response
+ :noindex:
+
+
Advanced mode
-------------
endtoend_id='NOTPROVIDED',
)
- while isinstance(res, NeedTANResponse):
- print("A TAN is required", res.challenge)
-
- if getattr(res, 'challenge_hhduc', None):
- try:
- terminal_flicker_unix(res.challenge_hhduc)
- except KeyboardInterrupt:
- pass
-
- if result.decoupled:
- tan = input('Please press enter after confirming the transaction in your app:')
- else:
- tan = input('Please enter TAN:')
- res = client.send_tan(res, tan)
+ while isinstance(res, NeedTANResponse | NeedVOPResponse):
+ if isinstance(res, NeedTANResponse):
+ print("A TAN is required", res.challenge)
+
+ if getattr(res, 'challenge_hhduc', None):
+ try:
+ terminal_flicker_unix(res.challenge_hhduc)
+ except KeyboardInterrupt:
+ pass
+
+ if result.decoupled:
+ tan = input('Please press enter after confirming the transaction in your app:')
+ else:
+ tan = input('Please enter TAN:')
+ res = client.send_tan(res, tan)
+ elif isinstance(res, NeedVOPResponse):
+ if res.vop_result.vop_single_result.result == "RVMC":
+ print("Payee name is a close match")
+ print("Name retrieved by bank:", res.vop_result.vop_single_result.close_match_name)
+ if res.vop_result.vop_single_result.other_identification:
+ print("Other info retrieved by bank:", res.vop_result.vop_single_result.other_identification)
+ elif res.vop_result.vop_single_result.result == "RVNM":
+ print("Payee name does not match match")
+ elif res.vop_result.vop_single_result.result == "RVNA":
+ print("Payee name could not be verified")
+ print("Reason:", res.vop_result.vop_single_result.na_reason)
+ elif res.vop_result.vop_single_result.result == "PDNG":
+ print("Payee name could not be verified (pending state, can't be handled by this library)")
+ print("Do you want to continue? Your bank will not be liable if the money ends up in the wrong place.")
+ input('Please press enter to confirm or Ctrl+C to cancel')
+ res = client.approve_vop_response(res)
print(res.status)
print(res.responses)
return f.send_tan(response, tan)
+ def ask_for_vop(response: NeedVOPResponse):
+ if response.vop_result.vop_single_result.result == "RVMC":
+ print("Payee name is a close match")
+ print("Name retrieved by bank:", response.vop_result.vop_single_result.close_match_name)
+ if response.vop_result.vop_single_result.other_identification:
+ print("Other info retrieved by bank:", response.vop_result.vop_single_result.other_identification)
+ elif response.vop_result.vop_single_result.result == "RVNM":
+ print("Payee name does not match match")
+ elif response.vop_result.vop_single_result.result == "RVNA":
+ print("Payee name could not be verified")
+ print("Reason:", response.vop_result.vop_single_result.na_reason)
+ elif response.vop_result.vop_single_result.result == "PDNG":
+ print("Payee name could not be verified (pending state, can't be handled by this library)")
+ print("Do you want to continue? Your bank will not be liable if the money ends up in the wrong place.")
+ input('Please press enter to confirm or Ctrl+C to cancel')
+ return f.approve_vop_response(response)
+
+
# Open the actual dialog
with f:
# Since PSD2, a TAN might be needed for dialog initialization. Let's check if there is one required
endtoend_id='NOTPROVIDED',
)
- while isinstance(res, NeedTANResponse):
- res = ask_for_tan(res)
+ while isinstance(res, NeedTANResponse | NeedVOPResponse):
+ if isinstance(res, NeedTANResponse):
+ res = ask_for_tan(res)
+ elif isinstance(res, NeedVOPResponse):
+ res = ask_for_vop(res)
elif choice == 11:
print("Select statement")
statements = f.get_statements(account)
PinTanTwoStepAuthenticationMechanism,
)
from .segments.accounts import HISPA1, HKSPA1
-from .segments.auth import HIPINS1, HKTAB4, HKTAB5, HKTAN2, HKTAN3, HKTAN5, HKTAN6, HKTAN7
+from .segments.auth import HIPINS1, HKTAB4, HKTAB5, HKTAN2, HKTAN3, HKTAN5, HKTAN6, HKTAN7, HIVPPS1, HIVPP1, PSRD1, HKVPA1
from .segments.bank import HIBPA3, HIUPA4, HKKOM4
from .segments.debit import (
HKDBS1, HKDBS2, HKDMB1, HKDMC1, HKDME1, HKDME2,
:param reason: Transfer reason
:param instant_payment: Whether to use instant payment (defaults to ``False``)
:param endtoend_id: End-to-end-Id (defaults to ``NOTPROVIDED``)
- :return: Returns either a NeedRetryResponse or TransactionResponse
+ :return: Returns either a NeedRetryResponse or NeedVOPResponse or TransactionResponse
"""
config = {
"name": account_name,
if book_as_single:
seg.request_single_booking = True
- return self._send_with_possible_retry(dialog, seg, self._continue_sepa_transfer)
+ return self._send_pay_with_possible_retry(dialog, seg, self._continue_sepa_transfer)
def _continue_sepa_transfer(self, command_seg, response):
retval = TransactionResponse(response)
self._standing_dialog = None
+class NeedVOPResponse(NeedRetryResponse):
+
+ def __init__(self, vop_result, command_seg, resume_method=None):
+ self.vop_result = vop_result
+ self.command_seg = command_seg
+ if hasattr(resume_method, '__func__'):
+ self.resume_method = resume_method.__func__.__name__
+ else:
+ self.resume_method = resume_method
+
+ def __repr__(self):
+ return '<o.__class__.__name__(vop_result={o.vop_result!r})>'.format(o=self)
+
+ @classmethod
+ def _from_data_v1(cls, data):
+ if data["version"] == 1:
+ segs = SegmentSequence(data['segments_bin']).segments
+ return cls(segs[0], segs[1], resume_method=data['resume_method'])
+
+ raise Exception("Wrong blob data version")
+
+ def get_data(self) -> bytes:
+ """Return a compressed datablob representing this object.
+
+ To restore the object, use :func:`fints.client.NeedRetryResponse.from_data`.
+ """
+ data = {
+ "_class_name": self.__class__.__name__,
+ "version": 1,
+ "segments_bin": SegmentSequence([self.vop_result, self.command_seg]).render_bytes(),
+ "resume_method": self.resume_method,
+ }
+ return compress_datablob(DATA_BLOB_MAGIC_RETRY, 1, data)
+
+
class NeedTANResponse(NeedRetryResponse):
challenge_raw = None #: Raw challenge as received by the bank
challenge = None #: Textual challenge to be displayed to the user
challenge_hhduc = None #: HHD_UC challenge to be transmitted to the TAN generator
challenge_matrix = None #: Matrix code challenge: tuple(mime_type, data)
decoupled = None #: Use decoupled process
+ vop_result = None #: VoP result
- def __init__(self, command_seg, tan_request, resume_method=None, tan_request_structured=False, decoupled=False):
+ def __init__(self, command_seg, tan_request, resume_method=None, tan_request_structured=False, decoupled=False, vop_result=None):
self.command_seg = command_seg
self.tan_request = tan_request
self.tan_request_structured = tan_request_structured
self.decoupled = decoupled
+ self.vop_result = vop_result
if hasattr(resume_method, '__func__'):
self.resume_method = resume_method.__func__.__name__
else:
return seg
+ def _find_vop_format_for_segment(self, seg):
+ vpps = self.bpd.find_segment_first('HIVPPS')
+ if not vpps:
+ return
+
+ needed = str(seg.header.type) in list(vpps.parameter.payment_order_segment)
+
+ if not needed:
+ return
+
+ bank_supported = str(vpps.parameter.supported_report_formats)
+
+ if "sepade.pain.002.001.10.xsd" != bank_supported:
+ logger.warning("No common supported SEPA version. Defaulting to what bank supports and hoping for the best: %s.", bank_supported)
+
+ return bank_supported
+
def _need_twostep_tan_for_segment(self, seg):
if not self.selected_security_function or self.selected_security_function == '999':
return False
response = dialog.send(command_seg)
return resume_func(command_seg, response)
+
+ def _send_pay_with_possible_retry(self, dialog, command_seg, resume_func):
+ """
+ This adds VoP under the assumption that TAN will be sent,
+ There appears to be no VoP flow without sending any authentication.
+
+ There are really 2 VoP flows: with a full match and otherwise.
+ The second flow returns a NeedVOPResponse as intended by the specification flowcharts.
+ In this case cases, the application should ask the user for confirmation based on HIVPP data in resp.vop_result.
+
+ The kind of response is in resp.vop_result.single_vop_result.result:
+ - 'RCVC' - full match
+ - 'RVMC' - partial match, extra info in single_vop_result.close_match_name and .other_identification.
+ - 'RVNM' - no match, no extra info seen
+ - 'RVNA' - check not available, reason in single_vop_result.na_reason
+ - 'PDNG' - pending, seems related to something not implemented right now.
+ """
+ vop_seg = []
+ vop_standard = self._find_vop_format_for_segment(command_seg)
+ if vop_standard:
+ from .segments.auth import HKVPP1
+ vop_seg = [HKVPP1(supported_reports=PSRD1(psrd=[vop_standard]))]
+
+ with dialog:
+ if self._need_twostep_tan_for_segment(command_seg):
+ tan_seg = self._get_tan_segment(command_seg, '4')
+ segments = vop_seg + [command_seg, tan_seg]
+
+ response = dialog.send(*segments)
+
+ if vop_standard:
+ hivpp = response.find_segment_first(HIVPP1, throw=True)
+
+ vop_result = hivpp.vop_single_result
+ if vop_result.result in ('RVNA', 'RVNM', 'RVMC'): # Not Applicable, No Match, Close Match
+ return NeedVOPResponse(
+ vop_result=hivpp,
+ command_seg=command_seg,
+ resume_method=resume_func,
+ )
+ else:
+ hivpp = None
+
+ for resp in response.responses(tan_seg):
+ if resp.code in ('0030', '3955'):
+ return NeedTANResponse(
+ command_seg,
+ response.find_segment_first('HITAN'),
+ resume_func,
+ self.is_challenge_structured(),
+ resp.code == '3955',
+ hivpp,
+ )
+ if resp.code.startswith('9'):
+ raise Exception("Error response: {!r}".format(response))
+ else:
+ response = dialog.send(command_seg)
+
+ return resume_func(command_seg, response)
def is_challenge_structured(self):
param = self.get_tan_mechanisms()[self.get_current_tan_mechanism()]
return param.challenge_structured
return False
+ def approve_vop_response(self, challenge: NeedVOPResponse):
+ """
+ Approves an operation that had a non-match VoP (verification of payee) response.
+
+ :param challenge: NeedVOPResponse to respond to
+ :return: New response after sending VOP response
+ """
+ with self._get_dialog() as dialog:
+ vop_seg = [HKVPA1(vop_id=challenge.vop_result.vop_id)]
+ tan_seg = self._get_tan_segment(challenge.command_seg, '4')
+ segments = vop_seg + [challenge.command_seg, tan_seg]
+ response = dialog.send(*segments)
+
+ for resp in response.responses(tan_seg):
+ if resp.code in ('0030', '3955'):
+ return NeedTANResponse(
+ challenge.command_seg,
+ response.find_segment_first('HITAN'),
+ challenge.resume_method,
+ self.is_challenge_structured(),
+ resp.code == '3955',
+ challenge.vop_result,
+ )
+
+ resume_func = getattr(self, challenge.resume_method)
+ return resume_func(challenge.command_seg, response)
+
def send_tan(self, challenge: NeedTANResponse, tan: str):
"""
Sends a TAN to confirm a pending operation.
:param tan: TAN value
:return: New response after sending TAN
"""
-
with self._get_dialog() as dialog:
if challenge.decoupled:
tan_seg = self._get_tan_segment(challenge.command_seg, 'S', challenge.tan_request)
tan_seg = self._get_tan_segment(challenge.command_seg, '2', challenge.tan_request)
self._pending_tan = tan
- response = dialog.send(tan_seg)
+ vop_seg = []
+ if challenge.vop_result and challenge.vop_result.vop_single_result.result == 'RCVC':
+ vop_seg = [HKVPA1(vop_id=challenge.vop_result.vop_id)]
+ segments = vop_seg + [tan_seg]
+ response = dialog.send(*segments)
if challenge.decoupled:
# TAN process = S
return super()._render_value(val)
+class TimestampField(DataElementField):
+ # Defined in the VoP standard, but missing in the Formals document. We just treat it as
+ # opaque bytes.
+ type = 'tsp'
+ _DOC_TYPE = bytes
+
+ def _render_value(self, value):
+ retval = bytes(value)
+ self._check_value_length(retval)
+ return retval
+
+ def _parse_value(self, value):
+ return bytes(value)
+
+
class PasswordField(AlphanumericField):
type = ''
_DOC_TYPE = Password
from fints.fields import CodeField, DataElementField, DataElementGroupField
from fints.formals import (
+ DataElementGroup,
KTI1, BankIdentifier, ChallengeValidUntil, Language2,
ParameterChallengeClass, ParameterPinTan, ParameterTwostepTAN1,
ParameterTwostepTAN2, ParameterTwostepTAN3, ParameterTwostepTAN4,
response_hhd_uc = DataElementGroupField(type=ResponseHHDUC, required=False, _d="Antwort HHD_UC")
+class PSRD1(DataElementGroup):
+ """Unterstütze Payment Status Reports
+
+ Source: FinTS Financial Transaction Services, Schnittstellenspezifikation, Messages -- Multibankfähige Geschäftsvorfälle, version 3.0 Rel 2022, FV """
+ psrd = DataElementField(type='an', max_length=256, required=True, _d="Payment Status Report Descriptor", max_count=99)
+ # urn:iso:std:iso:20022:tech:xsd:pain.002.001.14
+
+
+class HKVPP1(FinTS3Segment):
+ """Namensabgleich Prüfauftrag, version 1
+
+ Source: FinTS Financial Transaction Services, Schnittstellenspezifikation, Verification of Payee"""
+ supported_reports = DataElementGroupField(type=PSRD1, required=True, _d="Unterstützte Payment Status Reports")
+ polling_id = DataElementField(type='bin', required=False, _d="Polling-ID")
+ max_queries = DataElementField(type='num', max_length=4, required=False, _d="Maximale Anzahl Einträge")
+ aufsetzpunkt = DataElementField(type='an', max_length=35, required=False, _d="Aufsetzpunkt")
+
+
+class EVPE(DataElementGroup):
+ """Ergebnis VOP-Prüfung Einzeltransaktion
+
+ Source: FinTS Financial Transaction Services, Schnittstellenspezifikation, Verification of Payee"""
+ recipient_IBAN = DataElementField(type='an', max_length=34, required=False, _d="IBAN Empfänger")
+ info_IBAN = DataElementField(type='an', max_length=140, required=False, _d="IBAN-Zusatzinformationen")
+ close_match_name = DataElementField(type='an', max_length=140, required=False, _d="Abweichender Empfängername")
+ other_identification = DataElementField(type='an', max_length=256, required=False, _d="Anderes Identifikationmerkmal")
+ # RVMC, RCVC, RVNM, RVNA, PDNG
+ result = DataElementField(type='code', length=4, required=False, _d="VOP-Prüfergebnis")
+ na_reason = DataElementField(type='an', max_length=256, required=False, _d="Grund RVNA")
+
+
+class HIVPP1(FinTS3Segment):
+ """Namensabgleich Namensabgleich Prüfergebnis, version 1
+ Source: FinTS Financial Transaction Services, Schnittstellenspezifikation, Verification of Payee"""
+ vop_id = DataElementField(type='bin', required=False, _d="VOP-ID")
+ vop_id_valid_until = DataElementGroupField(type=ChallengeValidUntil, required=False, _d="VOP-ID gültig bis")
+ polling_id = DataElementField(type='bin', required=False, _d="Polling-ID")
+ payment_status_report_descriptor = DataElementField(type='an', max_length=256, required=False, _d="Payment Status Report Descriptor")
+ payment_status_report = DataElementField(type='bin', required=False, _d="Payment Status Report")
+ # Only for a single transaction. Mutually exclusive with payment status report.
+ vop_single_result = DataElementGroupField(type=EVPE, required=False, _d="Ergebnis VOP-Prüfung Einzeltransaktion")
+ manual_authorization_notice = DataElementField(type='an', max_length=65535, required=False, _d="Aufklärungstext Autorisierung trotz Abweichung")
+ wait_for_seconds = DataElementField(type='num', length=1, required=False, _d="Wartezeit vor nächster Abfrage")
+
+
+class ParameterVoP(DataElementGroup):
+ max_trans = DataElementField(type='num', max_length=7, required=False, _d="Maximale Anzahl CreditTransferTransactionInformation OptIn")
+ notice_is_structured = DataElementField(type='jn', required=False, _d="Aufklärungstext strukturiert")
+ # complete: V, piecemeal: S
+ report_complete = DataElementField(type='code', length=1, required=False, _d="Art der Lieferung Payment Status Report")
+ batch_payment_allowed = DataElementField(type='jn', required=False, _d="Sammelzahlungen mit einem Auftrag erlaubt")
+ multiple_allowed = DataElementField(type='jn', required=False, _d="Eingabe Anzahl Einträge erlaubt")
+ supported_report_formats = DataElementField(type='an', max_length=1024, required=False, _d="Unterstützte Payment Status Report Daten-formate")
+ payment_order_segment = DataElementField(type='an', min_count=1, max_length=6, required=False, _d="VOP-pflichtiger Zahlungsverkehrsauftrag")
+
+
+class HIVPPS1(ParameterSegment):
+ """Namensabgleich Prüfauftrag Parameter, version 1
+
+ Source: FinTS Financial Transaction Services, Schnittstellenspezifikation, Verification of Payee"""
+ parameter = DataElementGroupField(type=ParameterVoP, _d="Parameter Namensabgleich Prüfauftrag")
+
+
+class HKVPA1(FinTS3Segment):
+ """Namensabgleich Namensabgleich Ausführungsauftrag, version 1
+ Source: FinTS Financial Transaction Services, Schnittstellenspezifikation, Verification of Payee"""
+ vop_id = DataElementField(type='bin', required=False, _d="VOP-ID")
+
+
class HKTAN7(FinTS3Segment):
"""Zwei-Schritt-TAN-Einreichung, version 7
tan_media_type = CodeField(enum=TANMediaType2, _d="TAN-Medium-Art")
tan_media_class = CodeField(enum=TANMediaClass3, _d="TAN-Medium-Klasse")
-
class HITAB4(FinTS3Segment):
"""TAN-Generator/Liste anzeigen Bestand Rückmeldung, version 4
if isinstance(segments, bytes):
from .parser import FinTS3Parser
parser = FinTS3Parser()
- data = parser.explode_segments(segments)
+ data = list(parser.explode_segments(segments))
segments = [parser.parse_segment(segment) for segment in data]
self.segments = list(segments) if segments else []