+import datetime
import logging
import re
-import datetime
-
from decimal import Decimal
+from mt940.models import Balance
+from sepaxml import SepaTransfer
+
from .connection import FinTSHTTPSConnection
from .dialog import FinTSDialog
from .message import FinTSMessage
-from .models import SEPAAccount, TANMethod5, TANMethod6
-from .segments.auth import HKTAN, HKTAB
+from .message import FinTSResponse
+from .models import SEPAAccount, TANMethod
from .segments.accounts import HKSPA
-from .segments.statement import HKKAZ
-from .segments.saldo import HKSAL
+from .segments.auth import HKTAN, HKTAB
from .segments.depot import HKWPD
+from .segments.saldo import HKSAL
+from .segments.statement import HKKAZ
from .segments.transfer import HKCCS
-from .message import FinTSResponse
from .utils import mt940_to_array, MT535_Miniparser, split_for_data_groups, split_for_data_elements, Password
-from mt940.models import Balance
-from sepaxml import SepaTransfer
logger = logging.getLogger(__name__)
def get_tan_methods(self):
dialog = self._new_dialog()
+ dialog.sync()
dialog.init()
+ return dialog.tan_mechs
- # Get tan methods
- res = FinTSResponse(str(dialog.bpd))
- seg = res._find_segment('HIRMS')
- deg = split_for_data_groups(seg)
- tan_methods = []
- for de in deg:
- if de[0:4] == '3920':
- d = split_for_data_elements(de)
- for i in range(3, len(d)):
- tan_methods.append(d[i])
-
- # Get parameters for tan methods
- seg = res._find_segments('HITANS')
- methods = []
- for s in seg:
- spl = split_for_data_elements(s)
- if spl[2] == '5':
- model = TANMethod5
- elif spl[2] == '6':
- model = TANMethod6
- else:
- raise NotImplementedError(
- "HITANS segment version {} is currently not implemented".format(
- spl[2]
- )
- )
-
- step = len(model._fields)
- for i in range(len(spl) // step):
- part = spl[6 + i * step:6 + (i + 1) * step]
- method = model(*part)
- if method.security_feature in tan_methods:
- methods.append(method)
-
- return methods
+ def _create_get_tan_description_message(self, dialog: FinTSDialog):
+ return self._new_message(dialog, [
+ HKTAB(3)
+ ])
def get_tan_description(self):
dialog = self._new_dialog()
dialog.sync()
dialog.init()
- def _get_msg():
- return self._new_message(dialog, [
- HKTAB(3)
- ])
-
with self.pin.protect():
- logger.debug('Sending HKTAB: {}'.format(_get_msg()))
+ logger.debug('Sending HKTAB: {}'.format(self._create_get_tan_description_message(dialog)))
- resp = dialog.send(_get_msg())
+ resp = dialog.send(self._create_get_tan_description_message(dialog))
logger.debug('Got HKTAB response: {}'.format(resp))
dialog.end()
self.hksalversion = 6
self.hkkazversion = 6
self.tan_mechs = []
- self.bpd = None # Bank Parameter Data
def _get_msg_sync(self):
seg_identification = HKIDN(3, self.blz, self.username, 0)
logger.debug('Dialog ID: {}'.format(self.dialogid))
logger.debug('HKKAZ max version: {}'.format(self.hkkazversion))
logger.debug('HKSAL max version: {}'.format(self.hksalversion))
- logger.debug('TAN mechanisms: {}'.format(', '.join(self.tan_mechs)))
+ logger.debug('TAN mechanisms: {}'.format(', '.join(str(t) for t in self.tan_mechs)))
self.end()
def init(self):
with self.pin.protect():
logger.debug('Sending INIT: {}'.format(self._get_msg_init()))
- self.bpd = self.send(self._get_msg_init())
- logger.debug('Got INIT response: {}'.format(self.bpd))
+ res = self.send(self._get_msg_init())
+ logger.debug('Got INIT response: {}'.format(res))
- self.dialogid = self.bpd.get_dialog_id()
+ self.dialogid = res.get_dialog_id()
logger.info('Received dialog ID: {}'.format(self.dialogid))
return self.dialogid
import random
import re
+from fints.models import TANMethod5, TANMethod6
from fints.utils import split_for_data_groups, split_for_data_elements, fints_unescape
from .segments.message import HNHBK, HNHBS, HNSHA, HNSHK, HNVSD, HNVSK
self.segments = []
self.encrypted_segments = []
- if tan_mechs and '999' not in tan_mechs:
+ if tan_mechs and '999' not in [t.security_feature for t in tan_mechs]:
self.profile_version = 2
- self.security_function = tan_mechs[0]
+ self.security_function = tan_mechs[0].security_feature
else:
self.profile_version = 1
self.security_function = '999'
def get_supported_tan_mechanisms(self):
segs = self._find_segments('HIRMS')
- for s in segs:
- seg = split_for_data_groups(s)[1:]
- for s in seg:
- id, msg = s.split('::', 1)
- if id == "3920":
- m = self.RE_TANMECH.search(msg)
- if m:
- return [m.group(0)]
- return False
+ tan_methods = []
+ for seg in segs:
+ deg = split_for_data_groups(seg)
+ for de in deg:
+ if de[0:4] == '3920':
+ d = split_for_data_elements(de)
+ for i in range(3, len(d)):
+ tan_methods.append(d[i])
+
+ # Get parameters for tan methods
+ seg = self._find_segments('HITANS')
+ methods = []
+ for s in seg:
+ spl = split_for_data_elements(s)
+ if spl[2] == '5':
+ model = TANMethod5
+ elif spl[2] == '6':
+ model = TANMethod6
+ else:
+ raise NotImplementedError(
+ "HITANS segment version {} is currently not implemented".format(
+ spl[2]
+ )
+ )
+
+ step = len(model.args)
+ for i in range(len(spl) // step):
+ part = spl[6 + i * step:6 + (i + 1) * step]
+ method = model(*part)
+ if method.security_feature in tan_methods:
+ methods.append(method)
+
+ return methods
def _find_segment_for_reference(self, name, ref):
segs = self._find_segments(name)
'ISIN name market_value value_symbol valuation_date pieces total_value acquisitionprice')
-# Source: PIN/TAN docs – Verfahrensparameter Zwei-Schritt-Verfahren, Elementversion #5
-TANMethod5 = namedtuple('TANMethod5', 'security_feature tan_process tech_id zka_id zka_version name max_length_input '
- 'allowed_format text_returnvalue max_length_returnvalue '
- 'number_of_supported_lists multiple_tans_allowed '
- 'tan_time_dialog_association tan_list_number_required '
- 'cancel_allowed sms_charge_account_required principal_account_required '
- 'challenge_class_required challenge_value_required '
- 'initialization_mode description_required supported_media_number')
-
-# Source: PIN/TAN docs – Verfahrensparameter Zwei-Schritt-Verfahren, Elementversion #6
-TANMethod6 = namedtuple('TANMethod6', 'security_feature tan_process tech_id zka_id zka_version name max_length_input '
- 'allowed_format text_returnvalue max_length_returnvalue '
- 'multiple_tans_allowed tan_time_dialog_association '
- 'cancel_allowed sms_charge_account_required principal_account_required '
- 'challenge_class_required challenge_structured '
- 'initialization_mode description_required hhd_uc_required '
- 'supported_media_number')
+class TANMethod:
+ args = ['security_feature']
+
+ def __init__(self, *args, **kwargs):
+ for i, a in enumerate(args):
+ setattr(self, self.args[i], a)
+ for k, v in kwargs.items():
+ if k in self.args:
+ setattr(self, k, v)
+
+ def __repr__(self):
+ return '{}({})'.format(
+ self.__class__.__name__,
+ ', '.join(['{}={}'.format(k, repr(getattr(self, k))) for k in self.args])
+ )
+
+
+class TANMethod5(TANMethod):
+ # Source: PIN/TAN docs – Verfahrensparameter Zwei-Schritt-Verfahren, Elementversion #5
+ args = ['security_feature', 'tan_process', 'tech_id', 'zka_id', 'zka_version', 'name', 'max_length_input',
+ 'allowed_format', 'text_returnvalue', 'max_length_returnvalue', 'number_of_supported_lists',
+ 'multiple_tans_allowed', 'tan_time_dialog_association', 'tan_list_number_required', 'cancel_allowed',
+ 'sms_charge_account_required', 'principal_account_required', 'challenge_class_required',
+ 'challenge_value_required', 'initialization_mode', 'description_required', 'supported_media_number']
+
+
+class TANMethod6(TANMethod):
+ # Source: PIN/TAN docs – Verfahrensparameter Zwei-Schritt-Verfahren, Elementversion #6
+ args = ['security_feature', 'tan_process', 'tech_id', 'zka_id', 'zka_version', 'name', 'max_length_input',
+ 'allowed_format', 'text_returnvalue', 'max_length_returnvalue', 'multiple_tans_allowed',
+ 'tan_time_dialog_association', 'cancel_allowed', 'sms_charge_account_required',
+ 'principal_account_required',
+ 'challenge_class_required', 'challenge_structured', 'initialization_mode', 'description_required',
+ 'hhd_uc_required', 'supported_media_number']