From: Henryk Plötz Date: Mon, 6 Aug 2018 18:09:49 +0000 (+0200) Subject: New-style object oriented interface X-Git-Tag: v2.0.0~1^2~160 X-Git-Url: http://git.ipfire.org/cgi-bin/gitweb.cgi?a=commitdiff_plain;h=5444d53bb5cb725509bcb227bcb3c1b749e782c4;p=thirdparty%2Fpython-fints.git New-style object oriented interface --- diff --git a/fints/formals.py b/fints/formals.py new file mode 100644 index 0000000..7bb7cd7 --- /dev/null +++ b/fints/formals.py @@ -0,0 +1,402 @@ +import re +import warnings +from contextlib import suppress +from inspect import getmro +from copy import deepcopy +from collections import OrderedDict + +from fints.utils import classproperty, SubclassesMixin + +class ValueList: + def __init__(self, parent): + self._parent = parent + self._data = [] + + def __getitem__(self, i): + if i >= len(self._data): + self.__setitem__(i, None) + if i < 0: + raise IndexError("Cannot access negative index") + return self._data[i] + + def __setitem__(self, i, value): + if i < 0: + raise IndexError("Cannot access negative index") + + if self._parent.count is not None: + if i >= self._parent.count: + raise IndexError("Cannot access index {} beyond count {}".format(i, self._parent.count)) + elif self._parent.max_count is not None: + if i >= self._parent.max_count: + raise IndexError("Cannot access index {} beyound max_count {}".format(i, self._parent.max_count)) + + for x in range(len(self._data), i): + self.__setitem__(x, None) + + if value is None: + value = self._parent._default_value() + else: + value = self._parent._parse_value(value) + self._parent._check_value(value) + + if i == len(self._data): + self._data.append(value) + else: + self._data[i] = value + + def __delitem__(self, i): + self.__setitem__(i, None) + + def __len__(self): + if self._parent.count is not None: + return self._parent.count + else: + retval = len(self._data) + if self._parent.min_count is not None: + if self._parent.min_count > retval: + retval = self._parent.min_count + return retval + + def __iter__(self): + for i in range(len(self)): + yield self[i] + +class Field: + def __init__(self, length=None, min_length=None, max_length=None, count=None, min_count=None, max_count=None, required=True): + if length is not None and (min_length is not None or max_length is not None): + raise ValueError("May not specify both 'length' AND 'min_length'/'max_length'") + if count is not None and (min_count is not None or max_count is not None): + raise ValueError("May not specify both 'count' AND 'min_count'/'max_count'") + + self.length = length + self.min_length = min_length + self.max_length = max_length + self.count = count + self.min_count = min_count + self.max_count = max_count + self.required = required + + if not self.count and not self.min_count and not self.max_count: + self.count = 1 + + def _default_value(self): + return None + + def __get__(self, instance, owner): + if self not in instance._values: + self.__set__(instance, None) + + return instance._values[self] + + def __set__(self, instance, value): + if value is None: + if self.count == 1: + instance._values[self] = self._default_value() + else: + instance._values[self] = ValueList(parent=self) + else: + if self.count == 1: + value_ = self._parse_value(value) + self._check_value(value_) + else: + value_ = ValueList(parent=self) + for i, v in enumerate(value): + value_[i] = v + + instance._values[self] = value_ + + def __delete__(self, instance): + self.__set__(instance, None) + + def _parse_value(self, value): + raise NotImplementedError('Needs to be implemented in subclass') + + def _render_value(self, value): + raise NotImplementedError('Needs to be implemented in subclass') + + def _check_value(self, value): + with suppress(NotImplementedError): + self._render_value(value) + +class TypedField(Field, SubclassesMixin): + flat_length = 1 + + def __new__(cls, *args, **kwargs): + target_cls = None + fallback_cls = None + if 'type' in kwargs: + for subcls in cls._all_subclasses(): + if getattr(subcls, 'type', '') is None: + fallback_cls = subcls + if getattr(subcls, 'type', None) == kwargs['type']: + target_cls = subcls + break + if target_cls is None and fallback_cls is not None and issubclass(fallback_cls, cls): + target_cls = fallback_cls + retval = object.__new__(target_cls or cls) + return retval + + def __init__(self, type=None, *args, **kwargs): + super().__init__(*args, **kwargs) + + self.type = type or getattr(self.__class__, 'type', None) + + +class DataElementField(TypedField): + pass + +class ContainerField(TypedField): + def _check_value(self, value): + if self.type: + if not isinstance(value, self.type): + raise TypeError("Value {!r} is not of type {!r}".format(value, self.type)) + super()._check_value(value) + + def _default_value(self): + return self.type() + + @property + def flat_length(self): + result = 0 + for name, field in self.type._fields.items(): + if field.count is None: + raise TypeError("Cannot compute flat length of field {}.{} with variable count".format(self.__class__.__name__, name)) + result = result + field.count * field.flat_length + return result + + +class DataElementGroupField(ContainerField): + pass + +class GenericField(DataElementField): + type = None + def _parse_value(self, value): + warnings.warn("Generic field used for type {!r} value {!r}".format(self.type, value)) + return value + +class GenericGroupField(DataElementGroupField): + type = None + + def _default_value(self): + if self.type is None: + return Container() + else: + return self.type() + + def _parse_value(self, value): + if self.type is None: + warnings.warn("Generic field used for type {!r} value {!r}".format(self.type, value)) + return value + +class TextField(DataElementField): + type = 'txt' + + def _parse_value(self, value): return str(value) + +class AlphanumericField(TextField): + type = 'an' + +class DTAUSField(DataElementField): + type = 'dta' + +class NumericField(DataElementField): + type = 'num' + + def _parse_value(self, value): + _value = str(value) + if len(_value) > 1 and _value[0] == '0': + raise TypeError("Leading zeroes not allowed for value of type 'num': {!r}".format(value)) + return int(_value, 10) + +class DigitsField(DataElementField): + type = 'dig' + + def _parse_value(self, value): + _value = str(value) + if not re.match(r'^\d*$', _value): + raise TypeError("Only digits allowed for value of type 'dig': {!r}".format(value)) + return _value + +class FloatField(DataElementField): + type = 'float' + +class BinaryField(DataElementField): + type = 'bin' + + def _parse_value(self, value): return bytes(value) + + +class SegmentSequence: + def __init__(self, segments = None): + if isinstance(segments, bytes): + from .parser import FinTS3Parser + parser = FinTS3Parser() + data = parser.explode_segments(segments) + segments = [parser.parse_segment(segment) for segment in data] + self.segments = segments or [] + + def __repr__(self): + return "{}({!r})".format(self.__class__.__name__, self.segments) + + def print_nested(self, stream=None, level=0, indent=" ", prefix="", first_level_indent=True, trailer=""): + import sys + stream = stream or sys.stdout + stream.write( + ( (prefix + level*indent) if first_level_indent else "") + + "{}([".format(self.__class__.__name__) + "\n" + ) + for segment in self.segments: + segment.print_nested(stream=stream, level=level+1, indent=indent, prefix=prefix, first_level_indent=True, trailer=",") + stream.write( (prefix + level*indent) + "]){}\n".format(trailer) ) + +class SegmentSequenceField(DataElementField): + type = 'sf' + + def _parse_value(self, value): + if isinstance(value, SegmentSequence): + return value + else: + return SegmentSequence(value) + + +class ContainerMeta(type): + @classmethod + def __prepare__(metacls, name, bases): + return OrderedDict() + + def __new__(cls, name, bases, classdict): + retval = super().__new__(cls, name, bases, classdict) + retval._fields = OrderedDict() + for supercls in reversed(bases): + retval._fields.update((k,v) for (k,v) in supercls.__dict__.items() if isinstance(v, Field)) + retval._fields.update((k,v) for (k,v) in classdict.items() if isinstance(v, Field)) + return retval + +class Container(metaclass=ContainerMeta): + def __init__(self, *args, **kwargs): + init_values = OrderedDict() + + additional_data = kwargs.pop("_additional_data", []) + + for init_value, field_name in zip(args, self._fields): + init_values[field_name] = init_value + args = () + + for field_name in self._fields: + if field_name in kwargs: + if field_name in init_values: + raise TypeError("__init__() got multiple values for argument {}".format(field_name)) + init_values[field_name] = kwargs.pop(field_name) + + super().__init__(*args, **kwargs) + self._values = {} + self._additional_data = additional_data + + for k,v in init_values.items(): + setattr(self, k, v) + + @classmethod + def naive_parse(cls, data): + retval = cls() + for ((name, field), value) in zip(retval._fields.items(), data): + setattr(retval, name, value) + return retval + + def __repr__(self): + return "{}{}({})".format( + "{}.".format(self.__class__.__module__), + self.__class__.__name__, + ", ".join( + "{}={!r}".format(name, getattr(self, name)) + for name in list(self._fields.keys())+( ['_additional_data'] if self._additional_data else [] ) + ) + ) + + def print_nested(self, stream=None, level=0, indent=" ", prefix="", first_level_indent=True, trailer=""): + import sys + stream = stream or sys.stdout + + stream.write( + ( (prefix + level*indent) if first_level_indent else "") + + "{}.{}(".format(self.__class__.__module__, self.__class__.__name__) + "\n" + ) + for name, field in self._fields.items(): + val = getattr(self, name) + if not hasattr( getattr(val, 'print_nested', None), '__call__'): + stream.write( + (prefix + (level+1)*indent) + "{} = {!r},\n".format(name, val) + ) + else: + stream.write( + (prefix + (level+1)*indent) + "{} = ".format(name) + ) + val.print_nested(stream=stream, level=level+2, indent=indent, prefix=prefix, first_level_indent=False, trailer=",") + if self._additional_data: + stream.write( + (prefix + (level+1)*indent) + "_additional_data=\n" + + (prefix + (level+2)*indent) + "{!r},\n".format(self._additional_data) + ) + stream.write( (prefix + level*indent) + "){}\n".format(trailer) ) + + +class DataElementGroup(Container): + pass + +class SegmentHeader(DataElementGroup): + type = AlphanumericField(max_length=6) + number = NumericField(max_length=3) + version = NumericField(max_length=3) + reference = NumericField(max_length=3, required=False) + +class ReferenceMessage(DataElementGroup): + dialogue_id = DataElementField(type='id') + message_number = NumericField(max_length=4) + +class SecurityProfile(DataElementGroup): + security_method = DataElementField(type='code', length=3) + security_method_version = DataElementField(type='num') + +class SecurityIdentificationDetails(DataElementGroup): + name_party = DataElementField(type='code', max_length=3) + cid = DataElementField(type='bin', max_length=256) + identifier_party = DataElementField(type='id') + +class SecurityDateTime(DataElementGroup): + datetime_type = DataElementField(type='code', max_length=3) + date = DataElementField(type='dat') + time = DataElementField(type='tim') + +class EncryptionAlgorithm(DataElementGroup): + usage_encryption = DataElementField(type='code', max_length=3) + operation_mode = DataElementField(type='code', max_length=3) + encryption_algorithm = DataElementField(type='code', max_length=3) + algorithm_parameter_value = DataElementField(type='bin', max_length=512) + algorithm_parameter_name = DataElementField(type='code', max_length=3) + algorithm_parameter_iv_name = DataElementField(type='code', max_length=3) + algorithm_parameter_iv_value = DataElementField(type='bin', max_length=512) + +class HashAlgorithm(DataElementGroup): + usage_hash = DataElementField(type='code', max_length=3) + hash_algorithm = DataElementField(type='code', max_length=3) + algorithm_parameter_name = DataElementField(type='code', max_length=3) + algorithm_parameter_value = DataElementField(type='bin', max_length=512) + +class SignatureAlgorithm(DataElementGroup): + usage_signature = DataElementField(type='code', max_length=3) + signature_algorithm = DataElementField(type='code', max_length=3) + operation_mode = DataElementField(type='code', max_length=3) + +class BankIdentifier(DataElementGroup): + country_identifier = DataElementField(type='ctr') + bank_code = DataElementField(type='an', max_length=30) + +class KeyName(DataElementGroup): + bank_identifier = DataElementGroupField(type=BankIdentifier) + user_id = DataElementField(type='id') + key_type = DataElementField(type='code', length=1) + key_number = DataElementField(type='num', max_length=3) + key_version = DataElementField(type='num', max_length=3) + +class Certificate(DataElementGroup): + certificate_type = DataElementField(type='code') + certificate_content = DataElementField(type='bin', max_length=4096) diff --git a/fints/message.py b/fints/message.py index 1c570cd..7ca1037 100644 --- a/fints/message.py +++ b/fints/message.py @@ -4,138 +4,8 @@ import re from fints.models import TANMethod1, TANMethod2, TANMethod3, TANMethod4, TANMethod5, TANMethod6 from .segments.message import HNHBK, HNHBS, HNSHA, HNSHK, HNVSD, HNVSK +from .parser import FinTS3Parser -TOKEN_RE = re.compile(rb""" - ^(?: (?: \? (?P.) ) - | (?P[^?:+@']+) - | (?P[+:']) - | (?: @ (?P[0-9]+) @ ) - )""", re.X | re.S) - -class Token(Enum): - EOF = 'eof' - CHAR = 'char' - BINARY = 'bin' - PLUS = '+' - COLON = ':' - APOSTROPHE = "'" - -class ParserState: - def __init__(self, data: bytes, start=0, end=None, encoding='iso-8859-1'): - self._token = None - self._value = None - self._encoding = encoding - self._tokenizer = iter(self._tokenize(data, start, end or len(data), encoding)) - - def peek(self): - if not self._token: - self._token, self._value = next(self._tokenizer) - return self._token - - def consume(self, token=None): - self.peek() - if token and token != self._token: - raise ValueError - self._token = None - return self._value - - @staticmethod - def _tokenize(data, start, end, encoding): - pos = start - unclaimed = [] - last_was = None - - while pos < end: - match = TOKEN_RE.match(data[pos:end]) - if match: - pos += match.end() - d = match.groupdict() - if d['ECHAR'] is not None: - unclaimed.append(d['ECHAR']) - elif d['CHAR'] is not None: - unclaimed.append(d['CHAR']) - else: - if unclaimed: - if last_was in (Token.BINARY, Token.CHAR): - raise ValueError - yield Token.CHAR, b''.join(unclaimed).decode(encoding) - unclaimed.clear() - last_was = Token.CHAR - - if d['TOK'] is not None: - token = Token(d['TOK'].decode('us-ascii')) - yield token, d['TOK'] - last_was = token - elif d['BINLEN'] is not None: - blen = int(d['BINLEN'].decode('us-ascii'), 10) - if last_was in (Token.BINARY, Token.CHAR): - raise ValueError - yield Token.BINARY, data[pos:pos+blen] - pos += blen - last_was = Token.BINARY - else: - raise ValueError - else: - raise ValueError - - if unclaimed: - if last_was in (Token.BINARY, Token.CHAR): - raise ValueError - yield Token.CHAR, b''.join(unclaimed).decode(encoding) - unclaimed.clear() - last_was = Token.CHAR - - yield Token.EOF, b'' - - -class FinTSMessageBase: - def __init__(self, *segments): - self.segments = [] - for segment in segments: - self.add_segment(segment) - - def add_segment(self, segment): - self.segments.append(segment) - - @classmethod - def parse(cls, data: bytes, start=0, end=None): - return cls(*cls.parse_segments(data, start, end)) - - @classmethod - def parse_segments(cls, data: bytes, start=0, end=None): - segments = [] - - parser = ParserState(data, start, end) - - while parser.peek() != Token.EOF: - segment = [] - while parser.peek() not in (Token.APOSTROPHE, Token.EOF): - data = None - deg = [] - while parser.peek() in (Token.BINARY, Token.CHAR, Token.COLON): - if parser.peek() in (Token.BINARY, Token.CHAR): - data = parser.consume() - - elif parser.peek() == Token.COLON: - deg.append(data) - data = None - parser.consume(Token.COLON) - - if data and deg: - deg.append(data) - data = deg - - segment.append(data) - if parser.peek() == Token.PLUS: - parser.consume(Token.PLUS) - - parser.consume(Token.APOSTROPHE) - segments.append(segment) - - parser.consume(Token.EOF) - - return segments - class FinTSMessage: def __init__(self, blz, username, pin, systemid, dialogid, msgno, encrypted_segments, tan_mechs=None, tan=None): @@ -194,13 +64,13 @@ class FinTSMessage: return str(self.build_header()) + ''.join([str(s) for s in self.segments]) -class FinTSResponse(FinTSMessageBase): +class FinTSResponse: def __init__(self, data): - self.segments = self.parse_segments(data) + self.segments = FinTS3Parser.explode_segments(data) self.payload = self.segments for seg in self.segments: if seg[0][0] == 'HNVSD': - self.payload = self.parse_segments(seg[1]) + self.payload = FinTS3Parser.explode_segments(seg[1]) def __str__(self): return str(self.payload) diff --git a/fints/parser.py b/fints/parser.py new file mode 100644 index 0000000..6886f67 --- /dev/null +++ b/fints/parser.py @@ -0,0 +1,218 @@ +from enum import Enum +from collections import Iterable +from contextlib import suppress +import re +from .segments import FinTS3Segment +from .formals import DataElementField, DataElementGroupField, SegmentSequence + +# +# FinTS 3.0 structure: +# Message := ( Segment "'" )+ +# Segment := ( DEG "+" )+ +# DEG := ( ( DE | DEG ) ":")+ +# +# First DEG in segment is segment header +# Many DEG (Data Element Group) on Segment level are just DE (Data Element), +# Recursion DEG -> DEG must be limited, since no other separator characters +# are available. In general, a second order DEG must have fixed length but +# may have a variable repeat count if it is at the end of the segment +# + +TOKEN_RE = re.compile(rb""" + ^(?: (?: \? (?P.) ) + | (?P[^?:+@']+) + | (?P[+:']) + | (?: @ (?P[0-9]+) @ ) + )""", re.X | re.S) + +class Token(Enum): + EOF = 'eof' + CHAR = 'char' + BINARY = 'bin' + PLUS = '+' + COLON = ':' + APOSTROPHE = "'" + +class ParserState: + def __init__(self, data: bytes, start=0, end=None, encoding='iso-8859-1'): + self._token = None + self._value = None + self._encoding = encoding + self._tokenizer = iter(self._tokenize(data, start, end or len(data), encoding)) + + def peek(self): + if not self._token: + self._token, self._value = next(self._tokenizer) + return self._token + + def consume(self, token=None): + self.peek() + if token and token != self._token: + raise ValueError + self._token = None + return self._value + + @staticmethod + def _tokenize(data, start, end, encoding): + pos = start + unclaimed = [] + last_was = None + + while pos < end: + match = TOKEN_RE.match(data[pos:end]) + if match: + pos += match.end() + d = match.groupdict() + if d['ECHAR'] is not None: + unclaimed.append(d['ECHAR']) + elif d['CHAR'] is not None: + unclaimed.append(d['CHAR']) + else: + if unclaimed: + if last_was in (Token.BINARY, Token.CHAR): + raise ValueError + yield Token.CHAR, b''.join(unclaimed).decode(encoding) + unclaimed.clear() + last_was = Token.CHAR + + if d['TOK'] is not None: + token = Token(d['TOK'].decode('us-ascii')) + yield token, d['TOK'] + last_was = token + elif d['BINLEN'] is not None: + blen = int(d['BINLEN'].decode('us-ascii'), 10) + if last_was in (Token.BINARY, Token.CHAR): + raise ValueError + yield Token.BINARY, data[pos:pos+blen] + pos += blen + last_was = Token.BINARY + else: + raise ValueError + else: + raise ValueError + + if unclaimed: + if last_was in (Token.BINARY, Token.CHAR): + raise ValueError + yield Token.CHAR, b''.join(unclaimed).decode(encoding) + unclaimed.clear() + last_was = Token.CHAR + + yield Token.EOF, b'' + +class FinTS3Parser: + def parse_message(self, data): + if isinstance(data, bytes): + data = self.explode_segments(data) + + message = SegmentSequence() + for segment in data: + seg = self.parse_segment(segment) + message.segments.append(seg) + return message + + def parse_segment(self, segment): + clazz = FinTS3Segment.find_subclass(segment) + seg = clazz() + + data = iter(segment) + for name, field in seg._fields.items(): + try: + val = next(data) + except StopIteration: + pass + else: + deg = self.parse_n_deg(field, val) + setattr(seg, name, deg) + seg._additional_data = list(data) + + return seg + + def parse_n_deg(self, field, data): + if not isinstance(data, Iterable) or isinstance(data, (str, bytes)): + data = [data] + + data_i = iter(data) + field_index = 0 + field_length = field.flat_length + + retval = [] + eod = False + + while not eod: + vals = [] + try: + for x in range(field_length): + vals.append(next(data_i)) + except StopIteration: + eod = True + + if field.count == 1: + if isinstance(field, DataElementField): + if not len(vals): + return + return vals[0] + elif isinstance(field, DataElementGroupField): + return self.parse_deg(field.type, vals) + else: + raise Error("Internal error") + break + + if field_index >= (field.count if field.count is not None else len(data) // field_length): + break + + if isinstance(field, DataElementField): + retval.append(vals[0] if len(vals) else None) + elif isinstance(field, DataElementGroupField): + retval.append(self.parse_deg(field.type, vals)) + else: + raise Error("Internal error") + + return retval + + def parse_deg(self, clazz, vals): + retval = clazz() + + data_i = iter(vals) + for name, field in retval._fields.items(): + deg = self.parse_n_deg(field, data_i) + setattr(retval, name, deg) + + return retval + + + @staticmethod + def explode_segments(data: bytes, start=0, end=None): + segments = [] + + parser = ParserState(data, start, end) + + while parser.peek() != Token.EOF: + segment = [] + while parser.peek() not in (Token.APOSTROPHE, Token.EOF): + data = None + deg = [] + while parser.peek() in (Token.BINARY, Token.CHAR, Token.COLON): + if parser.peek() in (Token.BINARY, Token.CHAR): + data = parser.consume() + + elif parser.peek() == Token.COLON: + deg.append(data) + data = None + parser.consume(Token.COLON) + + if data and deg: + deg.append(data) + data = deg + + segment.append(data) + if parser.peek() == Token.PLUS: + parser.consume(Token.PLUS) + + parser.consume(Token.APOSTROPHE) + segments.append(segment) + + parser.consume(Token.EOF) + + return segments + diff --git a/fints/segments/__init__.py b/fints/segments/__init__.py index 8692601..014f5f8 100644 --- a/fints/segments/__init__.py +++ b/fints/segments/__init__.py @@ -1,14 +1,87 @@ -class FinTS3Segment: - type = '???' - country_code = 280 - version = 2 - - def __init__(self, segmentno, data): - self.segmentno = segmentno - self.data = data - - def __str__(self): - res = '{}:{}:{}'.format(self.type, self.segmentno, self.version) - for d in self.data: - res += '+' + str(d) - return res + "'" +import re + +from fints.formals import Container, SegmentHeader, DataElementGroupField, DataElementField, ReferenceMessage, SegmentSequenceField, SecurityProfile, SecurityIdentificationDetails, SecurityDateTime, EncryptionAlgorithm, KeyName, Certificate, HashAlgorithm, SignatureAlgorithm + +from fints.utils import classproperty, SubclassesMixin + +TYPE_VERSION_RE = re.compile(r'^([A-Z]+)(\d+)$') + +class FinTS3Segment(Container, SubclassesMixin): + header = DataElementGroupField(type=SegmentHeader) + + @classproperty + def TYPE(cls): + match = TYPE_VERSION_RE.match(cls.__name__) + if match: + return match.group(1) + + @classproperty + def VERSION(cls): + match = TYPE_VERSION_RE.match(cls.__name__) + if match: + return int( match.group(2) ) + + def __init__(self, *args, **kwargs): + if 'header' not in kwargs: + kwargs['header'] = None + + args = (kwargs.pop('header'), ) + args + + return super().__init__(*args, **kwargs) + + @classmethod + def find_subclass(cls, segment): + h = SegmentHeader.naive_parse(segment[0]) + target_cls = None + + for possible_cls in cls._all_subclasses(): + if getattr(possible_cls, 'TYPE', None) == h.type and getattr(possible_cls, 'VERSION', None) == h.version: + target_cls = possible_cls + + if not target_cls: + target_cls = cls + + return target_cls + +class HNHBK3(FinTS3Segment): + message_size = DataElementField(type='dig', length=12) + hbci_version = DataElementField(type='num', max_length=3) + dialogue_id = DataElementField(type='id') + message_number = DataElementField(type='num', max_length=4) + reference_message = DataElementGroupField(type=ReferenceMessage, required=False) + +class HNHBS1(FinTS3Segment): + message_number = DataElementField(type='num', max_length=4) + + +class HNVSD1(FinTS3Segment): + data = SegmentSequenceField() + +class HNVSK3(FinTS3Segment): + security_profile = DataElementGroupField(type=SecurityProfile) + security_function = DataElementField(type='code', max_length=3) + security_role = DataElementField(type='code', max_length=3) + security_identification_details = DataElementGroupField(type=SecurityIdentificationDetails) + security_datetime = DataElementGroupField(type=SecurityDateTime) + encryption_algorithm = DataElementGroupField(type=EncryptionAlgorithm) + key_name = DataElementGroupField(type=KeyName) + compression_function = DataElementField(type='code', max_length=3) + certificate = DataElementGroupField(type=Certificate) + +class HNSHK4(FinTS3Segment): + security_profile = DataElementGroupField(type=SecurityProfile) + security_function = DataElementField(type='code', max_length=3) + security_reference = DataElementField(type='an', max_length=14) + security_application_area = DataElementField(type='code', max_length=3) + security_role = DataElementField(type='code', max_length=3) + security_identification_details = DataElementGroupField(type=SecurityIdentificationDetails) + security_reference_number = DataElementField(type='num', max_length=16) + security_datetime = DataElementGroupField(type=SecurityDateTime) + hash_algorithm = DataElementGroupField(type=HashAlgorithm) + signature_algorithm = DataElementGroupField(type=SignatureAlgorithm) + key_name = DataElementGroupField(type=KeyName) + certificate = DataElementGroupField(type=Certificate) + + + + diff --git a/fints/utils.py b/fints/utils.py index c6fce95..50fdfc7 100644 --- a/fints/utils.py +++ b/fints/utils.py @@ -40,6 +40,23 @@ def split_for_data_elements(deg): return re.split(':(?