PinTanDummyEncryptionMechanism, PinTanOneStepAuthenticationMechanism,
PinTanTwoStepAuthenticationMechanism,
)
-from .segments import HIBPA3, HIRMG2, HIRMS2, HIUPA4
+from .segments import HIBPA3, HIRMG2, HIRMS2, HIUPA4, HIPINS1
from .segments.accounts import HISPA1, HKSPA, HKSPA1
-from .segments.auth import HKTAB, HKTAN, HKTAB4, HKTAB5, HKTAN5
+from .segments.auth import HKTAB, HKTAN, HKTAB4, HKTAB5, HKTAN3, HKTAN5
from .segments.depot import HKWPD5, HKWPD6
from .segments.dialog import HISYN4, HKSYN3
from .segments.saldo import HKSAL5, HKSAL6, HKSAL7
SYSTEM_ID_UNASSIGNED = '0'
DATA_BLOB_MAGIC = b'python-fints_DATABLOB'
+class NeedRetryResponse:
+ pass
+
class FinTS3Client:
def __init__(self, bank_identifier, user_id, customer_id=None, set_data=None):
self.accounts = []
def _ensure_system_id(self):
raise NotImplemented()
+ def _send(self, dialog, *segments):
+ "Internal send, may be overriden in subclasses"
+ return dialog.send(*segments)
+
def __enter__(self):
if self._standing_dialog:
- raise Error("Cannot double __enter__() {}".format(self))
+ raise Exception("Cannot double __enter__() {}".format(self))
self._standing_dialog = self._get_dialog()
self._standing_dialog.__enter__()
if self._standing_dialog:
self._standing_dialog.__exit__(exc_type, exc_value, traceback)
else:
- raise Error("Cannot double __exit__() {}".format(self))
+ raise Exception("Cannot double __exit__() {}".format(self))
self._standing_dialog = None
def _get_dialog(self, lazy_init=False):
if lazy_init and self._standing_dialog:
- raise Error("Cannot _get_dialog(lazy_init=True) with _standing_dialog")
+ raise Exception("Cannot _get_dialog(lazy_init=True) with _standing_dialog")
if self._standing_dialog:
return self._standing_dialog
self._call_callbacks(None, response)
if response.code.startswith('9'):
- raise Error("FinTS error response: {!r}".format(response))
+ raise Exception("FinTS error response: {!r}".format(response))
self._process_response(None, response)
logger.debug('No HIWPD response segment found - maybe account has no holdings?')
return []
- def _create_send_tan_message(self, dialog: FinTSDialogOLD, challenge: TANChallenge, tan):
- return self._new_message(dialog, [
- HKTAN(3, '2', challenge.reference, '', challenge.version)
- ], tan)
-
- def send_tan(self, challenge: TANChallenge, tan: str):
- """
- Sends a TAN to confirm a pending operation.
-
- :param challenge: TANChallenge to respond to
- :param tan: TAN value
- :return: Currently no response
- """
- if challenge.tan_process != '4':
- raise NotImplementedError("TAN process {} currently not implemented".format(challenge.tan_process))
- with self.pin.protect():
- logger.debug('Sending HKTAN: {}'.format(self._create_send_tan_message(
- challenge.dialog, challenge, tan
- )))
-
- resp = challenge.dialog.send(self._create_send_tan_message(
- challenge.dialog, challenge, tan
- ))
- logger.debug('Got HKTAN response: {}'.format(resp))
-
- challenge.dialog.end()
-
def start_simple_sepa_transfer(self, account: SEPAAccount, iban: str, bic: str,
recipient_name: str, amount: Decimal, account_name: str, reason: str,
endtoend_id='NOTPROVIDED'):
xml = sepa.export()
return self.start_sepa_transfer(account, xml)
- def _get_start_sepa_transfer_message(self, dialog, account: SEPAAccount, pain_message: str, tan_method,
- tan_description, multiple, control_sum, currency, book_as_single):
- if multiple:
- if not control_sum:
- raise ValueError("Control sum required.")
- segreq = HKCCM(3, account, pain_message, control_sum, currency, book_as_single)
- else:
- segreq = HKCCS(3, account, pain_message)
- segtan = HKTAN(4, '4', '', tan_description, tan_method.version)
- return self._new_message(dialog, [
- segreq,
- segtan
- ])
-
- def _get_tan_segment(self, orig_seg, tan_process):
- tan_mechanism = self.get_tan_mechanisms()[self.get_current_tan_mechanism()]
-
- hitans = self.bpd.find_segment_first('HITANS', tan_mechanism.VERSION)
- hktan = {
- 5: HKTAN5,
- }.get(tan_mechanism.VERSION)
-
- seg = hktan(tan_process=tan_process)
-
- if tan_process == '1':
- seg.segment_type = orig_seg.header.type
- account_ = getattr(orig_seg, 'account', None)
- if isinstance(account, KTI1):
- seg.account = account
- raise NotImplementedError("TAN-Process 1 not implemented")
-
- if tan_process in ('1', '3', '4') and \
- tan_mechanism.supported_media_number > 1 and \
- tan_mechanism.description_required == DescriptionRequired.MUST:
- seg.tan_medium_name = self.selected_tan_medium
-
- return seg
-
-
def start_sepa_transfer(self, account: SEPAAccount, pain_message: bytes, multiple=False,
control_sum=None, currency='EUR', book_as_single=False,
pain_descriptor='urn:iso:std:iso:20022:tech:xsd:pain.001.001.03'):
if book_as_single:
seg.request_single_booking = True
- tan_seg = self._get_tan_segment(seg, '4')
- response = dialog.send(seg, tan_seg)
+ response = self._send(dialog, seg)
+ if isinstance(response, NeedRetryResponse):
+ return response
- #return self._tan_requiring_response(dialog, resp)
+ # FIXME Properly find return code
+ return True
def _get_start_sepa_debit_message(self, dialog, account: SEPAAccount, pain_message: str, tan_method,
tan_description, multiple, control_sum, currency, book_as_single):
logger.debug('Got response: {}'.format(resp))
return self._tan_requiring_response(dialog, resp)
- def _tan_requiring_response(self, dialog, resp):
- seg = resp._find_segment('HITAN')
- if seg[0][2] == '3':
- model = TANChallenge3
- elif seg[0][2] == '4':
- model = TANChallenge4
- elif seg[0][2] == '5':
- model = TANChallenge5
- elif seg[0][2] == '6':
- model = TANChallenge6
- else:
- raise NotImplementedError(
- "HITAN segment version {} is currently not implemented".format(
- seg[0][2]
- )
- )
- return model(dialog, *s[1:1 + len(model.args)])
-
def get_tan_mechanisms(self):
"""
Get the available TAN mechanisms.
According to 'FinTS Financial Transaction Services, Schnittstellenspezifikation, Formals',
version 3.0, section C.3.1.3, you should fill this with useful information about the
end-user product, *NOT* the FinTS library."""
-
+
self.product_name = product_name
self.product_version = product_version
# Exiting the context here ends the dialog, unless frozen with pause_dialog() again.
"""
if not self._standing_dialog:
- raise Error("Cannot pause dialog, no standing dialog exists")
+ raise Exception("Cannot pause dialog, no standing dialog exists")
return self._standing_dialog.pause()
@contextmanager
def resume_dialog(self, dialog_data):
# FIXME document, test, NOTE NO UNTRUSTED SOURCES
if self._standing_dialog:
- raise Error("Cannot resume dialog, existing standing dialog")
+ raise Exception("Cannot resume dialog, existing standing dialog")
self._standing_dialog = FinTSDialog.create_resume(self, dialog_data)
with self._standing_dialog:
yield self
self._standing_dialog = None
+class NeedTANResponse(NeedRetryResponse):
+ def __init__(self, command_seg, hitan):
+ self.command_seg = command_seg
+ self.hitan = hitan
class FinTS3PinTanClient(FinTS3Client):
def __init__(self, bank_identifier, user_id, pin, server, customer_id=None, *args, **kwargs):
self.pin = Password(pin)
+ self._pending_tan = None
self.connection = FinTSHTTPSConnection(server)
super().__init__(bank_identifier=bank_identifier, user_id=user_id, customer_id=customer_id, *args, **kwargs)
auth_mechanisms=[auth],
)
- def _new_message(self, dialog: FinTSDialogOLD, segments, tan=None):
- return FinTSMessageOLD(self.blz, self.username, self.pin, dialog.systemid, dialog.dialogid, dialog.msgno,
- segments, dialog.tan_mechs, tan)
-
def _ensure_system_id(self):
if self.system_id != SYSTEM_ID_UNASSIGNED:
return
if not seg:
raise ValueError('Could not find system_id')
self.system_id = seg.system_id
+
+ def _get_tan_segment(self, orig_seg, tan_process, tan_seg=None):
+ tan_mechanism = self.get_tan_mechanisms()[self.get_current_tan_mechanism()]
+
+ hitans = self.bpd.find_segment_first('HITANS', tan_mechanism.VERSION)
+ hktan = {
+ 3: HKTAN3,
+ 5: HKTAN5,
+ }.get(tan_mechanism.VERSION)
+
+ seg = hktan(tan_process=tan_process)
+
+ if tan_process == '1':
+ seg.segment_type = orig_seg.header.type
+ account_ = getattr(orig_seg, 'account', None)
+ if isinstance(account, KTI1):
+ seg.account = account
+ raise NotImplementedError("TAN-Process 1 not implemented")
+
+ if tan_process in ('1', '3', '4') and \
+ tan_mechanism.supported_media_number > 1 and \
+ tan_mechanism.description_required == DescriptionRequired.MUST:
+ seg.tan_medium_name = self.selected_tan_medium
+
+ if tan_process in ('2', '3'):
+ seg.task_reference = tan_seg.task_reference
+
+ if tan_process in ('1', '2'):
+ seg.further_tan_follows = False
+
+ return seg
+
+ def _need_twostep_tan_for_segment(self, seg):
+ if not self.selected_security_function or self.selected_security_function == '999':
+ return False
+ else:
+ hipins = self.bpd.find_segment_first(HIPINS1)
+ if not hipins:
+ return False
+ else:
+ for requirement in hipins.parameter.transaction_tans_required:
+ if seg.header.type == requirement.transaction:
+ return requirement.tan_required
+
+ return False
+
+ def _send(self, dialog, *segments):
+ need_twostep = any(self._need_twostep_tan_for_segment(seg) for seg in segments)
+
+ with dialog:
+ if need_twostep:
+ assert len(segments) == 1
+ seg = segments[0]
+ tan_seg = self._get_tan_segment(seg, '4')
+
+ response = super()._send(dialog, seg, tan_seg)
+
+ for resp in response.responses(tan_seg):
+ if resp.code == '0030':
+ response = NeedTANResponse(seg, response.find_segment_first('HITAN'))
+
+ else:
+ response = super()._send(dialog, *segments)
+
+ return response
+
+
+ def send_tan(self, challenge: NeedTANResponse, tan: str):
+ """
+ Sends a TAN to confirm a pending operation.
+
+ :param challenge: NeedTANResponse to respond to
+ :param tan: TAN value
+ :return: Currently no response
+ """
+
+ with self._get_dialog() as dialog:
+ tan_seg = self._get_tan_segment(challenge.command_seg, '2', challenge.hitan)
+ self._pending_tan = tan
+
+ response = self._send(dialog, tan_seg)
+
+ # FIXME Try to return a better return code
+
+ return response
super().__init__(segno, data)
+class HKTAN3(FinTS3Segment):
+ """Zwei-Schritt-TAN-Einreichung, version 3
+
+ Source: FinTS Financial Transaction Services, Schnittstellenspezifikation, Sicherheitsverfahren PIN/TAN"""
+ tan_process = DataElementField(type='code', length=1, _d="TAN-Prozess")
+ task_hash_value = DataElementField(type='bin', max_length=256, required=False, _d="Auftrags-Hashwert")
+ task_reference = DataElementField(type='an', max_length=35, required=False, _d="Auftragsreferenz")
+ tan_list_number = DataElementField(type='an', max_length=20, required=False, _d="TAN-Listennummer")
+ further_tan_follows = DataElementField(type='jn', length=1, required=False, _d="Weitere TAN folgt")
+ cancel_task = DataElementField(type='jn', length=1, required=False, _d="Auftrag stornieren")
+ challenge_class = DataElementField(type='num', max_length=2, required=False, _d="Challenge-Klasse")
+ parameter_challenge_class = DataElementGroupField(type=ParameterChallengeClass, required=False, _d="Parameter Challenge-Klasse")
+ tan_medium_name = DataElementField(type='an', max_length=32, required=False, _d="Bezeichnung des TAN-Mediums")
+
+
class HKTAN5(FinTS3Segment):
"""Zwei-Schritt-TAN-Einreichung, version 5
response_hhd_uc = DataElementGroupField(type=ResponseHHDUC, required=False, _d="Antwort HHD_UC")
+class HITAN3(FinTS3Segment):
+ """Zwei-Schritt-TAN-Einreichung Rückmeldung, version 3
+
+ Source: FinTS Financial Transaction Services, Schnittstellenspezifikation, Sicherheitsverfahren PIN/TAN"""
+ tan_process = DataElementField(type='code', length=1, _d="TAN-Prozess")
+ task_hash_value = DataElementField(type='bin', max_length=256, required=False, _d="Auftrags-Hashwert")
+ task_reference = DataElementField(type='an', max_length=35, required=False, _d="Auftragsreferenz")
+ challenge = DataElementField(type='an', max_length=2048, required=False, _d="Challenge")
+ challenge_valid_until = DataElementGroupField(type=ChallengeValidUntil, required=False, _d="Gültigkeitsdatum und -uhrzeit für Challenge")
+ tan_list_number = DataElementField(type='an', max_length=20, required=False, _d="TAN-Listennummer")
+ ben = DataElementField(type='an', max_length=99, required=False, _d="BEN")
+ tan_medium_name = DataElementField(type='an', max_length=32, required=False, _d="Bezeichnung des TAN-Mediums")
+
class HITAN5(FinTS3Segment):
"""Zwei-Schritt-TAN-Einreichung Rückmeldung, version 5