SynchronisationMode, TwoStepParametersCommon,
TANMediaType2, TANMediaClass4,
)
-from .message import FinTSMessageOLD
+from .message import FinTSInstituteMessage, FinTSMessageOLD
from .models import (
SEPAAccount, TANChallenge, TANChallenge3,
TANChallenge4, TANChallenge5, TANChallenge6,
self.upd_version = 0
self.upa = None
self.upd = SegmentSequence()
- self.allowed_security_functions = []
- self.selected_security_function = None
- self.selected_tan_medium = None
self.product_name = 'pyfints'
self.product_version = '0.2'
self.response_callbacks = []
def _ensure_system_id(self):
raise NotImplemented()
+ def _process_response(self, segment, response):
+ pass
+
+ def _process_response_segments(self, message: FinTSInstituteMessage, internal_send=True):
+ bpa = message.find_segment_first(HIBPA3)
+ if bpa:
+ self.bpa = bpa
+ self.bpd_version = bpa.bpd_version
+ self.bpd = SegmentSequence(
+ message.find_segments(
+ callback = lambda m: len(m.header.type) == 6 and m.header.type[1] == 'I' and m.header.type[5] == 'S'
+ )
+ )
+
+ upa = message.find_segment_first(HIUPA4)
+ if upa:
+ self.upa = upa
+ self.upd_version = upa.upd_version
+ self.upd = SegmentSequence(
+ message.find_segments('HIUPD')
+ )
+
+ for seg in message.find_segments(HIRMG2):
+ for response in seg.responses:
+ if not internal_send:
+ self._log_response(None, response)
+
+ self._call_callbacks(None, response)
+
+ self._process_response(None, response)
+
+ for seg in message.find_segments(HIRMS2):
+ for response in seg.responses:
+ segment = None # FIXME: Provide segment
+
+ if not internal_send:
+ self._log_response(segment, response)
+
+ self._call_callbacks(segment, response)
+
+ self._process_response(segment, response)
+
+ def process_dialog_response(self, message: FinTSInstituteMessage):
+ self._process_response_segments(message, internal_send=True)
+
def _send(self, dialog, *segments):
"Internal send, may be overriden in subclasses"
- return dialog.send(*segments)
+ response = dialog.send(*segments)
+ self._process_response_segments(response, internal_send=False)
+ return response
def __enter__(self):
if self._standing_dialog:
self.system_id = data.get('system_id', self.system_id)
if all(x in data for x in ('bpd_bin', 'bpa_bin', 'bpd_version')):
- if data['bpd_version'] >= self.bpd_version:
+ if data['bpd_version'] >= self.bpd_version and data['bpa_bin']:
self.bpd = SegmentSequence(data['bpd_bin'])
self.bpa = SegmentSequence(data['bpa_bin']).segments[0]
self.bpd_version = data['bpd_version']
- self.selected_security_function = data.get('selected_security_function', self.selected_security_function)
- self.allowed_security_functions = data.get('allowed_security_functions', self.allowed_security_functions)
-
if all(x in data for x in ('upd_bin', 'upa_bin', 'upd_version')):
- if data['upd_version'] >= self.upd_version:
+ if data['upd_version'] >= self.upd_version and data['upa_bin']:
self.upd = SegmentSequence(data['upd_bin'])
self.upa = SegmentSequence(data['upa_bin']).segments[0]
self.upd_version = data['upd_version']
- def get_data(self, including_private=False):
- # FIXME Test, document
+ def _get_data_v1(self, including_private=False):
data = {
"system_id": self.system_id,
"bpd_bin": self.bpd.render_bytes(),
"bpa_bin": FinTS3Serializer().serialize_message(self.bpa) if self.bpa else None,
"bpd_version": self.bpd_version,
- "selected_security_function": self.selected_security_function,
- "selected_tan_medium": self.selected_tan_medium,
}
if including_private:
"upd_bin": self.upd.render_bytes(),
"upa_bin": FinTS3Serializer().serialize_message(self.upa) if self.upa else None,
"upd_version": self.upd_version,
- "allowed_security_functions": self.allowed_security_functions,
})
+ return data
+
+ def get_data(self, including_private=False):
+ # FIXME Test, document
+ data = self._get_data_v1(including_private=including_private)
return compress_datablob(DATA_BLOB_MAGIC, 1, data)
def set_data(self, blob):
# FIXME Test, document
decompress_datablob(DATA_BLOB_MAGIC, self, blob)
- def process_institute_response(self, message):
- bpa = message.find_segment_first(HIBPA3)
- if bpa:
- self.bpa = bpa
- self.bpd_version = bpa.bpd_version
- self.bpd = SegmentSequence(
- message.find_segments(
- callback = lambda m: len(m.header.type) == 6 and m.header.type[1] == 'I' and m.header.type[5] == 'S'
- )
- )
-
- upa = message.find_segment_first(HIUPA4)
- if upa:
- self.upa = upa
- self.upd_version = upa.upd_version
- self.upd = SegmentSequence(
- message.find_segments('HIUPD')
- )
-
- # We'll delay calling the callback for HIRMG/3060 ("please note the warnings")
- # to see if any warning other than HIRMS(HKVVD)/3050 ("UPD/BPD not current; current version attached")
- # or HIRMS(HKVVD)/3920 ("allowed TAN methods") are present
- # 3050 and 3920 are handled completely internally and never passed to the callback
- # 3060 is only passed to the callbacks, if any warning/error other than 3050/3920 is present
- self.delayed_callback_calls = []
-
- for seg in message.find_segments(HIRMG2):
- for response in seg.responses:
- self._log_response(None, response)
-
- if response.code == '3060':
- self.delayed_callback_calls.append( (None, response) )
- else:
- self._call_callbacks(None, response)
-
- if response.code.startswith('9'):
- raise Exception("FinTS error response: {!r}".format(response))
-
- self._process_response(None, response)
-
- for seg in message.find_segments(HIRMS2):
- for response in seg.responses:
- segment = None # FIXME: Provide segment
-
- self._log_response(None, response)
-
- if response.code not in ('3050', '3920'):
- if not response.code.startswith('0') and not response.code.startswith('1'):
- for cb_data in self.delayed_callback_calls:
- self._call_callbacks(*cb_data)
- self.delayed_callback_calls.clear()
-
- self._call_callbacks(segment, response)
-
- self._process_response(segment, response)
-
- def _process_response(self, segment, response):
- if response.code == '3920':
- self.allowed_security_functions = list(response.parameters)
- if self.selected_security_function is None or not self.selected_security_function in self.allowed_security_functions:
- # Select the first available twostep security_function that we support
- for security_function, parameter in self.get_tan_mechanisms().items():
- if security_function == '999':
- # Skip onestep TAN
- continue
- if parameter.tan_process != '2':
- # Only support process variant 2 for now
- continue
- try:
- self.set_tan_mechanism(parameter.security_function)
- break
- except NotImplementedError:
- pass
- else:
- # Fall back to onestep
- self.set_tan_mechanism('999')
-
def _log_response(self, segment, response):
if response.code[0] in ('0', '1'):
log_target = logger.info
"""
with self._get_dialog() as dialog:
- response = dialog.send(HKSPA1())
+ response = self._send(dialog, HKSPA1())
self.accounts = []
for seg in response.find_segments(HISPA1):
while touchdown or touchdown_counter == 1:
seg = segment_factory(touchdown)
- rm = dialog.send(seg)
+ rm = self._send(dialog, seg)
for resp in rm.response_segments(seg, *args, **kwargs):
responses.append(resp)
all_accounts=False,
)
- response = dialog.send(seg)
+ response = self._send(dialog, seg)
for resp in response.response_segments(seg, 'HISAL'):
return resp.balance_booked.as_mt940_Balance()
account=hkwpd._fields['account'].type.from_sepa_account(account),
)
- response = dialog.send(seg)
+ response = self._send(dialog, seg)
for resp in response.response_segments(seg, 'HIWPD'):
## FIXME BROKEN
book_as_single
)))
- resp = dialog.send(self._get_start_sepa_debit_message(
+ resp = self._send(dialog, self._get_start_sepa_debit_message(
dialog, account, pain_message, tan_method, tan_description, multiple, control_sum, currency,
book_as_single
))
logger.debug('Got response: {}'.format(resp))
return self._tan_requiring_response(dialog, resp)
- def get_tan_mechanisms(self):
- """
- Get the available TAN mechanisms.
-
- :return: Dictionary of security_function: TwoStepParameters[1-5] objects.
- """
-
- retval = OrderedDict()
-
- for version in range(1, 6):
- for seg in self.bpd.find_segments('HITANS', version):
- for parameter in seg.parameter.twostep_parameters:
- if parameter.security_function in self.allowed_security_functions:
- retval[parameter.security_function] = parameter
-
- return retval
-
- def get_current_tan_mechanism(self):
- return self.selected_security_function
-
- def set_tan_mechanism(self, security_function):
- self.selected_security_function = security_function
-
- def set_tan_medium(self, tan_medium):
- self.selected_tan_medium = tan_medium
-
- def get_tan_media(self, media_type = TANMediaType2.ALL, media_class = TANMediaClass4.ALL):
- """Get information about TAN lists/generators.
-
- Returns tuple of fints.formals.TANUsageOption and a list of fints.formals.TANMedia4 or fints.formals.TANMedia5 objects."""
-
- with self._get_dialog() as dialog:
- hktab = self._find_highest_supported_command(HKTAB4, HKTAB5)
-
- seg = hktab(
- tan_media_type = media_type,
- tan_media_class = str(media_class),
- )
-
- response = dialog.send(seg)
-
- for resp in response.response_segments(seg, 'HITAB'):
- return resp.tan_usage_option, list(resp.tan_media_list)
-
def add_response_callback(self, cb):
# FIXME document
self.response_callbacks.append(cb)
self.pin = Password(pin)
self._pending_tan = None
self.connection = FinTSHTTPSConnection(server)
+ self.allowed_security_functions = []
+ self.selected_security_function = None
+ self.selected_tan_medium = None
super().__init__(bank_identifier=bank_identifier, user_id=user_id, customer_id=customer_id, *args, **kwargs)
def _new_dialog(self, lazy_init=False):
raise ValueError('Could not find system_id')
self.system_id = seg.system_id
+ def _set_data_v1(self, data):
+ super()._set_data_v1(data)
+ self.selected_tan_medium = data.get('selected_tan_medium', self.selected_tan_medium)
+ self.selected_security_function = data.get('selected_security_function', self.selected_security_function)
+ self.allowed_security_functions = data.get('allowed_security_functions', self.allowed_security_functions)
+
+ def _get_data_v1(self, including_private=False):
+ data = super()._get_data_v1(including_private=including_private)
+ data.update({
+ "selected_security_function": self.selected_security_function,
+ "selected_tan_medium": self.selected_tan_medium,
+ })
+
+ if including_private:
+ data.update({
+ "allowed_security_functions": self.allowed_security_functions,
+ })
+
+ return data
+
def _get_tan_segment(self, orig_seg, tan_process, tan_seg=None):
tan_mechanism = self.get_tan_mechanisms()[self.get_current_tan_mechanism()]
# FIXME Try to return a better return code
return response
+
+ def _process_response(self, segment, response):
+ if response.code == '3920':
+ self.allowed_security_functions = list(response.parameters)
+ if self.selected_security_function is None or not self.selected_security_function in self.allowed_security_functions:
+ # Select the first available twostep security_function that we support
+ for security_function, parameter in self.get_tan_mechanisms().items():
+ if security_function == '999':
+ # Skip onestep TAN
+ continue
+ if parameter.tan_process != '2':
+ # Only support process variant 2 for now
+ continue
+ try:
+ self.set_tan_mechanism(parameter.security_function)
+ break
+ except NotImplementedError:
+ pass
+ else:
+ # Fall back to onestep
+ self.set_tan_mechanism('999')
+
+ def get_tan_mechanisms(self):
+ """
+ Get the available TAN mechanisms.
+
+ :return: Dictionary of security_function: TwoStepParameters[1-5] objects.
+ """
+
+ retval = OrderedDict()
+
+ for version in range(1, 6):
+ for seg in self.bpd.find_segments('HITANS', version):
+ for parameter in seg.parameter.twostep_parameters:
+ if parameter.security_function in self.allowed_security_functions:
+ retval[parameter.security_function] = parameter
+
+ return retval
+
+ def get_current_tan_mechanism(self):
+ return self.selected_security_function
+
+ def set_tan_mechanism(self, security_function):
+ if self._standing_dialog:
+ raise Exception("Cannot change TAN mechanism with a standing dialog")
+ self.selected_security_function = security_function
+
+ def set_tan_medium(self, tan_medium):
+ if self._standing_dialog:
+ raise Exception("Cannot change TAN medium with a standing dialog")
+ self.selected_tan_medium = tan_medium
+
+ def get_tan_media(self, media_type = TANMediaType2.ALL, media_class = TANMediaClass4.ALL):
+ """Get information about TAN lists/generators.
+
+ Returns tuple of fints.formals.TANUsageOption and a list of fints.formals.TANMedia4 or fints.formals.TANMedia5 objects."""
+
+ with self._get_dialog() as dialog:
+ hktab = self._find_highest_supported_command(HKTAB4, HKTAB5)
+
+ seg = hktab(
+ tan_media_type = media_type,
+ tan_media_class = str(media_class),
+ )
+
+ response = self._send(dialog, seg)
+
+ for resp in response.response_segments(seg, 'HITAB'):
+ return resp.tan_usage_option, list(resp.tan_media_list)
+