>>> from fints.parser import FinTS3Parser
>>> s = FinTS3Parser().parse_message(message)
>>> s
- SegmentSequence([fints.segments.HNHBK3(header=fints.formals.SegmentHeader('HNHBK', 1, 3), message_size='000000000428', hbci_version=300, dialogue_id='430711670077=043999659571CN9D=', message_number=2, reference_message=fints.formals.ReferenceMessage(dialogue_id='430711670077=043999659571CN9D=', message_number=2)), fints.segments.HNVSK3(header=fints.formals.SegmentHeader('HNVSK', 998, 3), security_profile=fints.formals.SecurityProfile(security_method='PIN', security_method_version=1), security_function='998', security_role='1', security_identification_details=fints.formals.SecurityIdentificationDetails(name_party='2', cid=None, identifier_party='oIm3BlHv6mQBAADYgbPpp+kWrAQA'), security_datetime=fints.formals.SecurityDateTime(datetime_type='1'), encryption_algorithm=fints.formals.EncryptionAlgorithm(usage_encryption='2', operation_mode='2', encryption_algorithm='13', algorithm_parameter_value=b'00000000', algorithm_parameter_name='5', algorithm_parameter_iv_name='1'), key_name=fints.formals.KeyName(bank_identifier=fints.formals.BankIdentifier(country_identifier='280', bank_code='15050500'), user_id='hermes', key_type='S', key_number=0, key_version=0), compression_function='0'), fints.segments.HNVSD1(header=fints.formals.SegmentHeader('HNVSD', 999, 1), data=SegmentSequence([fints.segments.HNSHK4(header=fints.formals.SegmentHeader('HNSHK', 2, 4), security_profile=fints.formals.SecurityProfile(security_method='PIN', security_method_version=1), security_function='999', security_reference='9166926', security_application_area='1', security_role='1', security_identification_details=fints.formals.SecurityIdentificationDetails(name_party='2', cid=None, identifier_party='oIm3BlHv6mQBAADYgbPpp+kWrAQA'), security_reference_number=1, security_datetime=fints.formals.SecurityDateTime(datetime_type='1'), hash_algorithm=fints.formals.HashAlgorithm(usage_hash='1', hash_algorithm='999', algorithm_parameter_name='1'), signature_algorithm=fints.formals.SignatureAlgorithm(usage_signature='6', signature_algorithm='10', operation_mode='16'), key_name=fints.formals.KeyName(bank_identifier=fints.formals.BankIdentifier(country_identifier='280', bank_code='15050500'), user_id='hermes', key_type='S', key_number=0, key_version=0)), fints.segments.HIRMG2(header=fints.formals.SegmentHeader('HIRMG', 3, 2), response=[fints.formals.Response(response_code='0010', reference_element=None, response_text='Nachricht entgegengenommen.'), fints.formals.Response(response_code='0100', reference_element=None, response_text='Dialog beendet.')]), fints.segments.HNSHA2(header=fints.formals.SegmentHeader('HNSHA', 4, 2), security_reference='9166926')])), fints.segments.HNHBS1(header=fints.formals.SegmentHeader('HNHBS', 5, 1), message_number=2)])
+ SegmentSequence([fints.segments.HNHBK3(header=fints.formals.SegmentHeader('HNHBK', 1, 3), message_size='000000000428', hbci_version=300, dialogue_id='430711670077=043999659571CN9D=', message_number=2, reference_message=fints.formals.ReferenceMessage(dialogue_id='430711670077=043999659571CN9D=', message_number=2)), fints.segments.HNVSK3(header=fints.formals.SegmentHeader('HNVSK', 998, 3), security_profile=fints.formals.SecurityProfile(security_method='PIN', security_method_version=1), security_function='998', security_role='1', security_identification_details=fints.formals.SecurityIdentificationDetails(name_party='2', cid=None, identifier_party='oIm3BlHv6mQBAADYgbPpp+kWrAQA'), security_datetime=fints.formals.SecurityDateTime(datetime_type='1'), encryption_algorithm=fints.formals.EncryptionAlgorithm(usage_encryption='2', operation_mode='2', encryption_algorithm='13', algorithm_parameter_value=b'00000000', algorithm_parameter_name='5', algorithm_parameter_iv_name='1'), key_name=fints.formals.KeyName(bank_identifier=fints.formals.BankIdentifier(country_identifier='280', bank_code='15050500'), user_id='hermes', key_type='S', key_number=0, key_version=0), compression_function='0'), fints.segments.HNVSD1(header=fints.formals.SegmentHeader('HNVSD', 999, 1), data=SegmentSequence([fints.segments.HNSHK4(header=fints.formals.SegmentHeader('HNSHK', 2, 4), security_profile=fints.formals.SecurityProfile(security_method='PIN', security_method_version=1), security_function='999', security_reference='9166926', security_application_area='1', security_role='1', security_identification_details=fints.formals.SecurityIdentificationDetails(name_party='2', cid=None, identifier_party='oIm3BlHv6mQBAADYgbPpp+kWrAQA'), security_reference_number=1, security_datetime=fints.formals.SecurityDateTime(datetime_type='1'), hash_algorithm=fints.formals.HashAlgorithm(usage_hash='1', hash_algorithm='999', algorithm_parameter_name='1'), signature_algorithm=fints.formals.SignatureAlgorithm(usage_signature='6', signature_algorithm='10', operation_mode='16'), key_name=fints.formals.KeyName(bank_identifier=fints.formals.BankIdentifier(country_identifier='280', bank_code='15050500'), user_id='hermes', key_type='S', key_number=0, key_version=0)), fints.segments.HIRMG2(header=fints.formals.SegmentHeader('HIRMG', 3, 2), responses=[fints.formals.Response(code='0010', reference_element=None, text='Nachricht entgegengenommen.'), fints.formals.Response(code='0100', reference_element=None, text='Dialog beendet.')]), fints.segments.HNSHA2(header=fints.formals.SegmentHeader('HNSHA', 4, 2), security_reference='9166926')])), fints.segments.HNHBS1(header=fints.formals.SegmentHeader('HNHBS', 5, 1), message_number=2)])
>>> from fints.parser import FinTS3Serializer
>>> FinTS3Serializer().serialize_message(s)
b"HNHBK:1:3+000000000428+300+430711670077=043999659571CN9D=+2+430711670077=043999659571CN9D=:2'HNVSK:998:3+PIN:1+998+1+2::oIm3BlHv6mQBAADYgbPpp?+kWrAQA+1+2:2:13:@8@00000000:5:1+280:15050500:hermes:S:0:0+0'HNVSD:999:1+@195@HNSHK:2:4+PIN:1+999+9166926+1+1+2::oIm3BlHv6mQBAADYgbPpp?+kWrAQA+1+1+1:999:1+6:10:16+280:15050500:hermes:S:0:0'HIRMG:3:2+0010::Nachricht entgegengenommen.+0100::Dialog beendet.'HNSHA:4:2+9166926''HNHBS:5:1+2'"
>>> s.message_number = 12345
ValueError: Value '12345' cannot be rendered: max_length=4 exceeded
-The only exception is: Every field can be set to ``None`` in order to clear the field and make it unset, recursively. No checking is performed whether all fields that are required (or conditionally required) by the specification are set.
+The only exception is: Every field can be set to ``None`` in order to clear the field and make it unset, recursively. No checking is performed whether all fields that are required (or conditionally required) by the specification are set. For convenience, an unset constructed field will still be filled with an instance of the field's value type, so that subfield accessing will always work, without encountering ``None`` values on the way.
.. code-block:: python
>>> from fints.segments import HIRMG2
>>> s = HIRMG2()
>>> s
- fints.segments.HIRMG2(header=fints.formals.SegmentHeader('HIRMG', None, 2), response=[fints.formals.Response(response_code=None, reference_element=None, response_text=None)])
- >>> s.response[0].response_code = '0010'
- >>> s.response[1].response_code = '0100'
+ fints.segments.HIRMG2(header=fints.formals.SegmentHeader('HIRMG', None, 2), responses=[fints.formals.Response(code=None, reference_element=None, text=None)])
+ >>> s.responses[0].code = '0010'
+ >>> s.responses[1].code = '0100'
>>> s.print_nested()
fints.segments.HIRMG2(
header = fints.formals.SegmentHeader('HIRMG', None, 2),
- response = [
+ responses = [
fints.formals.Response(
- response_code = '0010',
+ code = '0010',
reference_element = None,
- response_text = None,
+ text = None,
),
fints.formals.Response(
- response_code = '0100',
+ code = '0100',
reference_element = None,
- response_text = None,
+ text = None,
),
],
)
- >>> HIRMG2(response=[fints.formals.Response('2342')]).print_nested()
+ >>> HIRMG2(responses=[fints.formals.Response('2342')]).print_nested()
fints.segments.HIRMG2(
header = fints.formals.SegmentHeader('HIRMG', None, 2),
- response = [
+ responses = [
fints.formals.Response(
- response_code = '2342',
+ code = '2342',
reference_element = None,
- response_text = None,
+ text = None,
),
],
)
import requests
-from .message import FinTSMessage
+from .message import FinTSMessage, FinTSResponse
from fints.parser import FinTS3Parser
from fints.utils import Password
)
if r.status_code < 200 or r.status_code > 299:
raise FinTSConnectionError('Bad status code {}'.format(r.status_code))
- retval = base64.b64decode(r.content.decode('iso-8859-1'))
+ response = base64.b64decode(r.content.decode('iso-8859-1'))
+ retval = FinTSResponse(response)
print("Received <<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<")
with Password.protect():
- FinTS3Parser().parse_message(retval).print_nested()
+ retval.print_nested()
print("<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<")
return retval
\ No newline at end of file
If a list/tuple is specified, segments returning any matching type will be returned.
:param version: Either an int specifying a segment version, or a list or tuple of ints.
If a list/tuple is specified, segments returning any matching version will be returned.
- :param callback: A callable that will be given the segment as its sole argument and must return a booleans indicating whether to return this segment.
+ :param callback: A callable that will be given the segment as its sole argument and must return a boolean indicating whether to return this segment.
:param recurse: If True (the default), recurse into SegmentSequenceField values, otherwise only look at segments in this SegmentSequence.
The match results of all given parameters will be AND-combined.
tan = DataElementField(type='an', max_length=99, required=False)
class Response(DataElementGroup):
- response_code = DataElementField(type='dig', length=4)
+ code = DataElementField(type='dig', length=4)
reference_element = DataElementField(type='an', max_length=7)
- response_text = DataElementField(type='an', max_length=80)
+ text = DataElementField(type='an', max_length=80)
parameters = DataElementField(type='an', max_length=35, max_count=10, required=False)
class AccountInformation(DataElementGroup):
user_id_field_text = DataElementField(type='an', max_length=30, required=False)
customer_id_field_text = DataElementField(type='an', max_length=30, required=False)
transaction_tans_required = DataElementGroupField(type=TransactionTanRequired, max_count=999, required=False)
+
+class SupportedLanguages2(DataElementGroup):
+ languages = DataElementField(type='code', max_length=3, min_count=1, max_count=9)
+
+class SupportedHBCIVersions2(DataElementGroup):
+ versions = DataElementField(type='code', max_length=3, min_count=1, max_count=9)
import random
import re
-from fints.models import TANMethod1, TANMethod2, TANMethod3, TANMethod4, TANMethod5, TANMethod6
from .segments.message import HNHBK, HNHBS, HNSHA, HNSHK, HNVSD, HNVSK
from .parser import FinTS3Parser
-
+from .formals import SegmentSequence
+from .segments import ParameterSegment
class FinTSMessage:
def __init__(self, blz, username, pin, systemid, dialogid, msgno, encrypted_segments, tan_mechs=None, tan=None):
return str(self.build_header()) + ''.join([str(s) for s in self.segments])
-class FinTSResponse:
- def __init__(self, data):
- self.segments = FinTS3Parser.explode_segments(data)
- self.payload = self.segments
- for seg in self.segments:
- if seg[0][0] == 'HNVSD':
- self.payload = FinTS3Parser.explode_segments(seg[1])
-
- def __str__(self):
- return str(self.payload)
-
+class FinTSResponse(SegmentSequence):
def is_success(self):
- summary = self.get_summary_by_segment('HIRMG')
- for code, msg in summary.items():
- if code[0] == "9":
- return False
+ for seg in self.find_segments('HIRMG'):
+ for response in seg.responses:
+ if response.code.startswith('9'):
+ return False
return True
def get_dialog_id(self):
- seg = self._find_segment('HNHBK')
+ seg = self.find_segment_first('HNHBK')
if not seg:
raise ValueError('Invalid response, no HNHBK segment')
- return seg[3]
+ return seg.dialogue_id
def get_bank_name(self):
- seg = self._find_segment('HIBPA')
+ seg = self.find_segment_first('HIBPA')
if seg:
- if len(seg) > 3:
- return seg[3]
+ return seg.bank_name
def get_systemid(self):
- seg = self._find_segment('HISYN')
+ seg = self.find_segment_first('HISYN')
if not seg:
raise ValueError('Could not find systemid')
- return seg[1]
-
- def get_summary_by_segment(self, name=None):
- if name and name not in ('HIRMS', 'HIRMG'):
- raise ValueError('Unsupported segment for message summary')
- if name:
- names = [name]
- else:
- names = ('HIRMS', 'HIRMG')
-
- res = {}
- for name in names:
- seg = self._find_segment(name)
- for de in seg[1:]:
- res[de[0]] = de[2]
- return res
+ return seg.customer_system_id
def get_hkkaz_max_version(self):
- return self._get_segment_max_version('HIKAZS')
+ return max((seg.header.version for seg in self.find_segments('HIKAZS')), default=3)
def get_hksal_max_version(self):
- return self._get_segment_max_version('HISALS')
+ return max((seg.header.version for seg in self.find_segments('HISALS')), default=3)
def get_supported_tan_mechanisms(self):
- segs = self._find_segments('HIRMS')
tan_methods = []
- for seg in segs:
- for deg in seg:
- if deg[0] == '3920':
- tan_methods.extend( deg[3:] )
+ for seg in self.find_segments('HIRMS'):
+ for response in seg.responses:
+ if response.code == '3920':
+ tan_methods.extend( response.parameters )
# Get parameters for tan methods
- segs = self._find_segments('HITANS')
methods = []
- for seg in segs:
- if seg[0][2] == '1':
- model = TANMethod1
- elif seg[0][2] == '2':
- model = TANMethod2
- elif seg[0][2] == '3':
- model = TANMethod3
- elif seg[0][2] == '4':
- model = TANMethod4
- elif seg[0][2] == '5':
- model = TANMethod5
- elif seg[0][2] == '6':
- model = TANMethod6
- else:
+ for seg in self.find_segments('HITANS'):
+ if not isinstance(seg, ParameterSegment):
raise NotImplementedError(
"HITANS segment version {} is currently not implemented".format(
- seg[0][2]
+ seg.header.version
)
)
- step = len(model.args)
- tan_params = seg[3][3:]
- for i in range(len(tan_params) // step):
- part = spl[i * step:(i + 1) * step]
- method = model(*part)
- if method.security_feature in tan_methods:
- methods.append(method)
+ if seg.parameters.twostep_parameters.security_function in tan_methods:
+ methods.append(method)
return methods
def _find_segment_for_reference(self, name, ref):
- segs = self._find_segments(name)
- for seg in segs:
- if len(seg[0]) < 4: continue
- if seg[0][3] == str(ref.segmentno):
+ for seg in self.find_segments(name):
+ if seg.header.reference == int(str(ref.segmentno)):
return seg
def get_touchdowns(self, msg: FinTSMessage):
if p[0] == "3040":
touchdown[msgseg.type] = p[3]
return touchdown
-
- def _get_segment_max_version(self, name):
- v = 3
- segs = self._find_segments(name)
- for s in segs:
- curver = int(s[0][2])
- if curver > v:
- v = curver
- return v
-
- def _find_segment(self, name):
- return self._find_segments(name, True)
-
- def _find_segments(self, name, one=False, in_payload=False):
- found = []
- for s in (self.payload if in_payload else self.segments):
- if s[0][0] == name:
- if one:
- return s
- found.append(s)
- # FIXME Simple hack: Seach in inner message if no success in outer message
- if not found and not in_payload:
- return self._find_segments(name, one, in_payload=True)
- return found
for val in getattr(segment, name):
seg.append( self.serialize_deg(val) )
else:
- seg.append( self.serialize_deg(getattr(segment, name)) )
+ seg.append( self.serialize_deg(getattr(segment, name), allow_skip=True) )
if segment._additional_data:
seg.extend(segment._additional_data)
return seg
- def serialize_deg(self, deg):
+ def serialize_deg(self, deg, allow_skip=False):
result = []
- skipping_end = False
+ filler = []
for name,field in deg._fields.items():
repeat = field.count != 1
val = getattr(deg, name)
empty = False
- if not field.required:
+ if field.count == 1 and not field.required:
if isinstance(val, Container):
if val.is_unset():
empty = True
elif val is None:
empty = True
- if skipping_end and not empty:
- raise ValueError("Inconsistency during serialization: Field {}.{} not empty, but a field before it was".format(deg.__class__.__name__, name))
-
if empty:
- skipping_end = True
+ if allow_skip:
+ filler.append(None)
+ else:
+ result.append(None)
continue
+ else:
+ if filler:
+ result.extend(filler)
+ filler.clear()
if not constructed:
if repeat:
import re
-from fints.formals import Container, ContainerMeta, SegmentHeader, DataElementGroupField, DataElementField, ReferenceMessage, SegmentSequenceField, SecurityProfile, SecurityIdentificationDetails, SecurityDateTime, EncryptionAlgorithm, KeyName, Certificate, HashAlgorithm, SignatureAlgorithm, UserDefinedSignature, Response, AccountInformation, AccountLimit, AllowedTransaction, ParameterTwostepTAN1, ParameterTwostepTAN3, ParameterPinTan
+from fints.formals import Container, ContainerMeta, SegmentHeader, DataElementGroupField, DataElementField, ReferenceMessage, SegmentSequenceField, SecurityProfile, SecurityIdentificationDetails, SecurityDateTime, EncryptionAlgorithm, KeyName, Certificate, HashAlgorithm, SignatureAlgorithm, UserDefinedSignature, Response, AccountInformation, AccountLimit, AllowedTransaction, ParameterTwostepTAN1, ParameterTwostepTAN3, ParameterPinTan, SupportedLanguages2, SupportedHBCIVersions2, BankIdentifier
from fints.utils import classproperty, SubclassesMixin
user_defined_signature = DataElementGroupField(type=UserDefinedSignature, required=False)
class HIRMG2(FinTS3Segment):
- response = DataElementGroupField(type=Response, min_count=1, max_count=99)
+ responses = DataElementGroupField(type=Response, min_count=1, max_count=99)
class HIRMS2(FinTS3Segment):
- response = DataElementGroupField(type=Response, min_count=1, max_count=99)
+ responses = DataElementGroupField(type=Response, min_count=1, max_count=99)
class HIUPA4(FinTS3Segment):
user_identifier = DataElementField(type='id')
parameters = DataElementGroupField(type=ParameterPinTan, _d="Parameter PIN/TAN-spezifische Informationen")
+class HIBPA3(FinTS3Segment):
+ bpd_version = DataElementField(type='num', max_length=3)
+ bank_identifier = DataElementGroupField(type=BankIdentifier)
+ bank_name = DataElementField(type='an', max_length=60)
+ number_tasks = DataElementField(type='num', max_length=3)
+ supported_languages = DataElementGroupField(type=SupportedLanguages2)
+ supported_hbci_version = DataElementGroupField(type=SupportedHBCIVersions2)
+ max_message_length = DataElementField(type='num', max_length=4, required=False)
+ min_timeout = DataElementField(type='num', max_length=4, required=False)
+ max_timeout = DataElementField(type='num', max_length=4, required=False)