A message is a sequence of segments. The :class:`~fints.formals.SegmentSequence` object allows searching for segments by type and version, by default recursing into nested sequences.
-.. autoclass:: fints.formals.SegmentSequence
+.. autoclass:: fints.types.SegmentSequence
:members:
:undoc-members: print_nested
.. automodule:: fints.segments
:members:
:inherited-members:
+ :undoc-members:
+ :show-inheritance:
:exclude-members: print_nested
:member-order: bysource
+Defining new Segment classes
+----------------------------
+
Field types
-___________
+~~~~~~~~~~~
+
+.. automodule:: fints.fields
+ :members:
+ :undoc-members:
+ :exclude-members: print_nested
+ :member-order: bysource
+
+
+Constructed and helper types
+~~~~~~~~~~~~~~~~~~~~~~~~~~~~
.. automodule:: fints.formals
:members:
:undoc-members:
:exclude-members: print_nested
:member-order: bysource
+
+
--- /dev/null
+import re
+import warnings
+from contextlib import suppress
+
+import fints.types
+from fints.utils import Password, SubclassesMixin
+
+
+class Field:
+ def __init__(self, length=None, min_length=None, max_length=None, count=None, min_count=None, max_count=None, required=True, _d=None):
+ if length is not None and (min_length is not None or max_length is not None):
+ raise ValueError("May not specify both 'length' AND 'min_length'/'max_length'")
+ if count is not None and (min_count is not None or max_count is not None):
+ raise ValueError("May not specify both 'count' AND 'min_count'/'max_count'")
+
+ self.length = length
+ self.min_length = min_length
+ self.max_length = max_length
+ self.count = count
+ self.min_count = min_count
+ self.max_count = max_count
+ self.required = required
+
+ if not self.count and not self.min_count and not self.max_count:
+ self.count = 1
+
+ self.__doc__ = _d
+
+ def _default_value(self):
+ return None
+
+ def __get__(self, instance, owner):
+ if self not in instance._values:
+ self.__set__(instance, None)
+
+ return instance._values[self]
+
+ def __set__(self, instance, value):
+ if value is None:
+ if self.count == 1:
+ instance._values[self] = self._default_value()
+ else:
+ instance._values[self] = fints.types.ValueList(parent=self)
+ else:
+ if self.count == 1:
+ value_ = self._parse_value(value)
+ self._check_value(value_)
+ else:
+ value_ = fints.types.ValueList(parent=self)
+ for i, v in enumerate(value):
+ value_[i] = v
+
+ instance._values[self] = value_
+
+ def __delete__(self, instance):
+ self.__set__(instance, None)
+
+ def _parse_value(self, value):
+ raise NotImplementedError('Needs to be implemented in subclass')
+
+ def _render_value(self, value):
+ raise NotImplementedError('Needs to be implemented in subclass')
+
+ def _check_value(self, value):
+ with suppress(NotImplementedError):
+ self._render_value(value)
+
+ def _check_value_length(self, value):
+ if self.max_length is not None and len(value) > self.max_length:
+ raise ValueError("Value {!r} cannot be rendered: max_length={} exceeded".format(value, self.max_length))
+
+ if self.min_length is not None and len(value) < self.min_length:
+ raise ValueError("Value {!r} cannot be rendered: min_length={} not reached".format(value, self.min_length))
+
+ if self.length is not None and len(value) != self.length:
+ raise ValueError("Value {!r} cannot be rendered: length={} not satisfied".format(value, self.length))
+
+ def render(self, value):
+ if value is None:
+ return None
+
+ return self._render_value(value)
+
+class TypedField(Field, SubclassesMixin):
+ flat_length = 1
+
+ def __new__(cls, *args, **kwargs):
+ target_cls = None
+ fallback_cls = None
+ for subcls in cls._all_subclasses():
+ if getattr(subcls, 'type', '') is None:
+ fallback_cls = subcls
+ if getattr(subcls, 'type', None) == kwargs.get('type', None):
+ target_cls = subcls
+ break
+ if target_cls is None and fallback_cls is not None and issubclass(fallback_cls, cls):
+ target_cls = fallback_cls
+ retval = object.__new__(target_cls or cls)
+ return retval
+
+ def __init__(self, type=None, *args, **kwargs):
+ super().__init__(*args, **kwargs)
+
+ self.type = type or getattr(self.__class__, 'type', None)
+
+
+class DocTypeMixin:
+ _DOC_TYPE = None
+
+ def __init__(self, *args, **kwargs):
+ super().__init__(*args, **kwargs)
+
+ type_ = self._DOC_TYPE
+ if type_ is None:
+ if isinstance(getattr(self, 'type', None), type):
+ type_ = getattr(self, 'type')
+
+ if type_ is not None:
+ if not self.__doc__:
+ self.__doc__ = ""
+
+ name = type_.__name__
+ if type_.__module__ != 'builtins':
+ name = "{}.{}".format(type_.__module__, name)
+
+ self.__doc__ = self.__doc__ + "\n\n:type: :class:`{}`".format(name)
+
+class DataElementField(DocTypeMixin, TypedField):
+ pass
+
+class FieldRenderFormatStringMixin:
+ _FORMAT_STRING = None
+
+ def _render_value(self, value):
+ retval = self._FORMAT_STRING.format(value)
+ self._check_value_length(retval)
+
+ return retval
+
+class ContainerField(TypedField):
+ def _check_value(self, value):
+ if self.type:
+ if not isinstance(value, self.type):
+ raise TypeError("Value {!r} is not of type {!r}".format(value, self.type))
+ super()._check_value(value)
+
+ def _default_value(self):
+ return self.type()
+
+ @property
+ def flat_length(self):
+ result = 0
+ for name, field in self.type._fields.items():
+ if field.count is None:
+ raise TypeError("Cannot compute flat length of field {}.{} with variable count".format(self.__class__.__name__, name))
+ result = result + field.count * field.flat_length
+ return result
+
+
+class DataElementGroupField(DocTypeMixin, ContainerField):
+ pass
+
+class GenericField(FieldRenderFormatStringMixin, DataElementField):
+ type = None
+ _FORMAT_STRING = "{}"
+
+ def _parse_value(self, value):
+ warnings.warn("Generic field used for type {!r} value {!r}".format(self.type, value))
+ return value
+
+class GenericGroupField(DataElementGroupField):
+ type = None
+
+ def _default_value(self):
+ if self.type is None:
+ return fints.types.Container()
+ else:
+ return self.type()
+
+ def _parse_value(self, value):
+ if self.type is None:
+ warnings.warn("Generic field used for type {!r} value {!r}".format(self.type, value))
+ return value
+
+class TextField(FieldRenderFormatStringMixin, DataElementField):
+ type = 'txt'
+ _DOC_TYPE = str
+ _FORMAT_STRING = "{}" ## FIXME Restrict CRLF
+
+ def _parse_value(self, value): return str(value)
+
+class AlphanumericField(TextField):
+ type = 'an'
+
+class DTAUSField(DataElementField):
+ type = 'dta'
+
+class NumericField(FieldRenderFormatStringMixin, DataElementField):
+ type = 'num'
+ _DOC_TYPE = int
+ _FORMAT_STRING = "{:d}"
+
+ def _parse_value(self, value):
+ _value = str(value)
+ if len(_value) > 1 and _value[0] == '0':
+ raise TypeError("Leading zeroes not allowed for value of type 'num': {!r}".format(value))
+ return int(_value, 10)
+
+class DigitsField(FieldRenderFormatStringMixin, DataElementField):
+ type = 'dig'
+ _DOC_TYPE = str
+ _FORMAT_STRING = "{}"
+
+ def _parse_value(self, value):
+ _value = str(value)
+ if not re.match(r'^\d*$', _value):
+ raise TypeError("Only digits allowed for value of type 'dig': {!r}".format(value))
+ return _value
+
+class FloatField(FieldRenderFormatStringMixin, DataElementField):
+ type = 'float'
+ ## FIXME: Not implemented, no one uses this?
+
+class BinaryField(DataElementField):
+ type = 'bin'
+ _DOC_TYPE = bytes
+
+ def _render_value(self, value):
+ retval = bytes(value)
+ self._check_value_length(retval)
+
+ return retval
+
+ def _parse_value(self, value): return bytes(value)
+
+class FixedLengthMixin:
+ _FIXED_LENGTH = [None, None, None]
+ _DOC_TYPE = str
+
+ def __init__(self, *args, **kwargs):
+ for i, a in enumerate(('length', 'min_length', 'max_length')):
+ kwargs[a] = self._FIXED_LENGTH[i] if len(self._FIXED_LENGTH) > i else None
+
+ super().__init__(*args, **kwargs)
+
+class IDField(FixedLengthMixin, AlphanumericField):
+ type = 'id'
+ _DOC_TYPE = str
+ _FIXED_LENGTH = [None, None, 30]
+
+class BooleanField(FixedLengthMixin, AlphanumericField):
+ type = 'jn'
+ _DOC_TYPE = bool
+ _FIXED_LENGTH = [1]
+
+ def _render_value(self, value):
+ return "J" if value else "N"
+
+ def _parse_value(self, value):
+ if value is None:
+ return None
+ if value == "J":
+ return True
+ elif value == "N":
+ return False
+ else:
+ raise ValueError("Invalid value {!r} for BooleanField".format(value))
+
+class CodeField(AlphanumericField):
+ type = 'code'
+ _DOC_TYPE = str
+
+ ## FIXME: Not further implemented, might want to use Enums
+
+class CountryField(FixedLengthMixin, DigitsField):
+ type = 'ctr'
+ _FIXED_LENGTH = [3]
+
+class CurrencyField(FixedLengthMixin, AlphanumericField):
+ type = 'cur'
+ _FIXED_LENGTH = [3]
+
+class DateField(FixedLengthMixin, NumericField):
+ type = 'dat'
+ _FIXED_LENGTH = [8]
+
+class TimeField(FixedLengthMixin, DigitsField):
+ type = 'tim'
+ _FIXED_LENGTH = [6]
+
+class PasswordField(AlphanumericField):
+ type = ''
+ _DOC_TYPE = Password
+
+ def _parse_value(self, value):
+ return Password(value)
+
+ def _render_value(self, value):
+ return str(value)
+
+class SegmentSequenceField(DataElementField):
+ type = 'sf'
+
+ def _parse_value(self, value):
+ if isinstance(value, fints.types.SegmentSequence):
+ return value
+ else:
+ return fints.types.SegmentSequence(value)
+
+ def _render_value(self, value):
+ return value.render_bytes()
import re
-import warnings
-from collections import Iterable, OrderedDict
-from contextlib import suppress
-
-from fints.utils import Password, SubclassesMixin
-
-
-class ValueList:
- def __init__(self, parent):
- self._parent = parent
- self._data = []
-
- def __getitem__(self, i):
- if i >= len(self._data):
- self.__setitem__(i, None)
- if i < 0:
- raise IndexError("Cannot access negative index")
- return self._data[i]
-
- def __setitem__(self, i, value):
- if i < 0:
- raise IndexError("Cannot access negative index")
-
- if self._parent.count is not None:
- if i >= self._parent.count:
- raise IndexError("Cannot access index {} beyond count {}".format(i, self._parent.count))
- elif self._parent.max_count is not None:
- if i >= self._parent.max_count:
- raise IndexError("Cannot access index {} beyound max_count {}".format(i, self._parent.max_count))
-
- for x in range(len(self._data), i):
- self.__setitem__(x, None)
-
- if value is None:
- value = self._parent._default_value()
- else:
- value = self._parent._parse_value(value)
- self._parent._check_value(value)
-
- if i == len(self._data):
- self._data.append(value)
- else:
- self._data[i] = value
-
- def __delitem__(self, i):
- self.__setitem__(i, None)
-
- def __len__(self):
- if self._parent.count is not None:
- return self._parent.count
- else:
- retval = 0
- for i, val in enumerate(self._data):
- if isinstance(val, Container):
- if val.is_unset():
- continue
- elif val is None:
- continue
- retval = i+1
- if self._parent.min_count is not None:
- if self._parent.min_count > retval:
- retval = self._parent.min_count
- return retval
-
- def __iter__(self):
- for i in range(len(self)):
- yield self[i]
- def __repr__(self):
- return "{!r}".format(list(self))
-
- def print_nested(self, stream=None, level=0, indent=" ", prefix="", first_level_indent=True, trailer=""):
- import sys
- stream = stream or sys.stdout
-
- stream.write(
- ( (prefix + level*indent) if first_level_indent else "")
- + "[\n"
- )
- for val in self:
- if not hasattr( getattr(val, 'print_nested', None), '__call__'):
- stream.write(
- (prefix + (level+1)*indent) + "{!r},\n".format(val)
- )
- else:
- val.print_nested(stream=stream, level=level+2, indent=indent, prefix=prefix, trailer=",")
- stream.write( (prefix + level*indent) + "]{}\n".format(trailer) )
-
-class Field:
- def __init__(self, length=None, min_length=None, max_length=None, count=None, min_count=None, max_count=None, required=True, _d=None):
- if length is not None and (min_length is not None or max_length is not None):
- raise ValueError("May not specify both 'length' AND 'min_length'/'max_length'")
- if count is not None and (min_count is not None or max_count is not None):
- raise ValueError("May not specify both 'count' AND 'min_count'/'max_count'")
-
- self.length = length
- self.min_length = min_length
- self.max_length = max_length
- self.count = count
- self.min_count = min_count
- self.max_count = max_count
- self.required = required
-
- if not self.count and not self.min_count and not self.max_count:
- self.count = 1
-
- self.__doc__ = _d
-
- def _default_value(self):
- return None
-
- def __get__(self, instance, owner):
- if self not in instance._values:
- self.__set__(instance, None)
-
- return instance._values[self]
-
- def __set__(self, instance, value):
- if value is None:
- if self.count == 1:
- instance._values[self] = self._default_value()
- else:
- instance._values[self] = ValueList(parent=self)
- else:
- if self.count == 1:
- value_ = self._parse_value(value)
- self._check_value(value_)
- else:
- value_ = ValueList(parent=self)
- for i, v in enumerate(value):
- value_[i] = v
-
- instance._values[self] = value_
-
- def __delete__(self, instance):
- self.__set__(instance, None)
-
- def _parse_value(self, value):
- raise NotImplementedError('Needs to be implemented in subclass')
-
- def _render_value(self, value):
- raise NotImplementedError('Needs to be implemented in subclass')
-
- def _check_value(self, value):
- with suppress(NotImplementedError):
- self._render_value(value)
-
- def _check_value_length(self, value):
- if self.max_length is not None and len(value) > self.max_length:
- raise ValueError("Value {!r} cannot be rendered: max_length={} exceeded".format(value, self.max_length))
-
- if self.min_length is not None and len(value) < self.min_length:
- raise ValueError("Value {!r} cannot be rendered: min_length={} not reached".format(value, self.min_length))
-
- if self.length is not None and len(value) != self.length:
- raise ValueError("Value {!r} cannot be rendered: length={} not satisfied".format(value, self.length))
-
- def render(self, value):
- if value is None:
- return None
-
- return self._render_value(value)
-
-class TypedField(Field, SubclassesMixin):
- flat_length = 1
-
- def __new__(cls, *args, **kwargs):
- target_cls = None
- fallback_cls = None
- for subcls in cls._all_subclasses():
- if getattr(subcls, 'type', '') is None:
- fallback_cls = subcls
- if getattr(subcls, 'type', None) == kwargs.get('type', None):
- target_cls = subcls
- break
- if target_cls is None and fallback_cls is not None and issubclass(fallback_cls, cls):
- target_cls = fallback_cls
- retval = object.__new__(target_cls or cls)
- return retval
-
- def __init__(self, type=None, *args, **kwargs):
- super().__init__(*args, **kwargs)
-
- self.type = type or getattr(self.__class__, 'type', None)
-
-
-class DocTypeMixin:
- _DOC_TYPE = None
-
- def __init__(self, *args, **kwargs):
- super().__init__(*args, **kwargs)
-
- type_ = self._DOC_TYPE
- if type_ is None:
- if isinstance(getattr(self, 'type', None), type):
- type_ = getattr(self, 'type')
-
- if type_ is not None:
- if not self.__doc__:
- self.__doc__ = ""
-
- name = type_.__name__
- if type_.__module__ != 'builtins':
- name = "{}.{}".format(type_.__module__, name)
-
- self.__doc__ = self.__doc__ + "\n\n:type: :class:`{}`".format(name)
-
-class DataElementField(DocTypeMixin, TypedField):
- pass
-
-class FieldRenderFormatStringMixin:
- _FORMAT_STRING = None
-
- def _render_value(self, value):
- retval = self._FORMAT_STRING.format(value)
- self._check_value_length(retval)
-
- return retval
-
-class ContainerField(TypedField):
- def _check_value(self, value):
- if self.type:
- if not isinstance(value, self.type):
- raise TypeError("Value {!r} is not of type {!r}".format(value, self.type))
- super()._check_value(value)
-
- def _default_value(self):
- return self.type()
-
- @property
- def flat_length(self):
- result = 0
- for name, field in self.type._fields.items():
- if field.count is None:
- raise TypeError("Cannot compute flat length of field {}.{} with variable count".format(self.__class__.__name__, name))
- result = result + field.count * field.flat_length
- return result
-
-
-class DataElementGroupField(DocTypeMixin, ContainerField):
- pass
-
-class GenericField(FieldRenderFormatStringMixin, DataElementField):
- type = None
- _FORMAT_STRING = "{}"
-
- def _parse_value(self, value):
- warnings.warn("Generic field used for type {!r} value {!r}".format(self.type, value))
- return value
-
-class GenericGroupField(DataElementGroupField):
- type = None
-
- def _default_value(self):
- if self.type is None:
- return Container()
- else:
- return self.type()
-
- def _parse_value(self, value):
- if self.type is None:
- warnings.warn("Generic field used for type {!r} value {!r}".format(self.type, value))
- return value
-
-class TextField(FieldRenderFormatStringMixin, DataElementField):
- type = 'txt'
- _DOC_TYPE = str
- _FORMAT_STRING = "{}" ## FIXME Restrict CRLF
-
- def _parse_value(self, value): return str(value)
-
-class AlphanumericField(TextField):
- type = 'an'
-
-class DTAUSField(DataElementField):
- type = 'dta'
-
-class NumericField(FieldRenderFormatStringMixin, DataElementField):
- type = 'num'
- _DOC_TYPE = int
- _FORMAT_STRING = "{:d}"
-
- def _parse_value(self, value):
- _value = str(value)
- if len(_value) > 1 and _value[0] == '0':
- raise TypeError("Leading zeroes not allowed for value of type 'num': {!r}".format(value))
- return int(_value, 10)
-
-class DigitsField(FieldRenderFormatStringMixin, DataElementField):
- type = 'dig'
- _DOC_TYPE = str
- _FORMAT_STRING = "{}"
-
- def _parse_value(self, value):
- _value = str(value)
- if not re.match(r'^\d*$', _value):
- raise TypeError("Only digits allowed for value of type 'dig': {!r}".format(value))
- return _value
-
-class FloatField(FieldRenderFormatStringMixin, DataElementField):
- type = 'float'
- ## FIXME: Not implemented, no one uses this?
-
-class BinaryField(DataElementField):
- type = 'bin'
- _DOC_TYPE = bytes
-
- def _render_value(self, value):
- retval = bytes(value)
- self._check_value_length(retval)
-
- return retval
-
- def _parse_value(self, value): return bytes(value)
-
-class FixedLengthMixin:
- _FIXED_LENGTH = [None, None, None]
- _DOC_TYPE = str
-
- def __init__(self, *args, **kwargs):
- for i, a in enumerate(('length', 'min_length', 'max_length')):
- kwargs[a] = self._FIXED_LENGTH[i] if len(self._FIXED_LENGTH) > i else None
-
- super().__init__(*args, **kwargs)
-
-class IDField(FixedLengthMixin, AlphanumericField):
- type = 'id'
- _DOC_TYPE = str
- _FIXED_LENGTH = [None, None, 30]
-
-class BooleanField(FixedLengthMixin, AlphanumericField):
- type = 'jn'
- _DOC_TYPE = bool
- _FIXED_LENGTH = [1]
-
- def _render_value(self, value):
- return "J" if value else "N"
-
- def _parse_value(self, value):
- if value is None:
- return None
- if value == "J":
- return True
- elif value == "N":
- return False
- else:
- raise ValueError("Invalid value {!r} for BooleanField".format(value))
-
-class CodeField(AlphanumericField):
- type = 'code'
- _DOC_TYPE = str
-
- ## FIXME: Not further implemented, might want to use Enums
-
-class CountryField(FixedLengthMixin, DigitsField):
- type = 'ctr'
- _FIXED_LENGTH = [3]
-
-class CurrencyField(FixedLengthMixin, AlphanumericField):
- type = 'cur'
- _FIXED_LENGTH = [3]
-
-class DateField(FixedLengthMixin, NumericField):
- type = 'dat'
- _FIXED_LENGTH = [8]
-
-class TimeField(FixedLengthMixin, DigitsField):
- type = 'tim'
- _FIXED_LENGTH = [6]
-
-class PasswordField(AlphanumericField):
- type = ''
- _DOC_TYPE = Password
-
- def _parse_value(self, value):
- return Password(value)
+from fints.types import * # The order is important!
+from fints.fields import *
- def _render_value(self, value):
- return str(value)
-
-class SegmentSequence:
- """A sequence of FinTS3Segment objects"""
-
- def __init__(self, segments = None):
- if isinstance(segments, bytes):
- from .parser import FinTS3Parser
- parser = FinTS3Parser()
- data = parser.explode_segments(segments)
- segments = [parser.parse_segment(segment) for segment in data]
- self.segments = segments or []
-
- def render_bytes(self) -> bytes:
- from .parser import FinTS3Serializer
- return FinTS3Serializer().serialize_message(self)
-
- def __repr__(self):
- return "{}({!r})".format(self.__class__.__name__, self.segments)
-
- def print_nested(self, stream=None, level=0, indent=" ", prefix="", first_level_indent=True, trailer=""):
- import sys
- stream = stream or sys.stdout
- stream.write(
- ( (prefix + level*indent) if first_level_indent else "")
- + "{}([".format(self.__class__.__name__) + "\n"
- )
- for segment in self.segments:
- segment.print_nested(stream=stream, level=level+1, indent=indent, prefix=prefix, first_level_indent=True, trailer=",")
- stream.write( (prefix + level*indent) + "]){}\n".format(trailer) )
-
- def find_segments(self, query=None, version=None, callback=None, recurse=True):
- """Yields an iterable of all matching segments.
-
- :param query: Either a str or class specifying a segment type (such as 'HNHBK', or :class:`~fints.segments.HNHBK3`), or a list or tuple of strings or classes.
- If a list/tuple is specified, segments returning any matching type will be returned.
- :param version: Either an int specifying a segment version, or a list or tuple of ints.
- If a list/tuple is specified, segments returning any matching version will be returned.
- :param callback: A callable that will be given the segment as its sole argument and must return a boolean indicating whether to return this segment.
- :param recurse: If True (the default), recurse into SegmentSequenceField values, otherwise only look at segments in this SegmentSequence.
-
- The match results of all given parameters will be AND-combined.
- """
-
- if query is None:
- query = []
- elif isinstance(query, str) or not isinstance(query, (list, tuple, Iterable)):
- query = [query]
-
- if version is None:
- version = []
- elif not isinstance(version, (list, tuple, Iterable)):
- version = [version]
-
- if callback is None:
- callback = lambda s: True
-
- for s in self.segments:
- if ((not query) or any( (isinstance(s, t) if isinstance(t, type) else s.header.type == t) for t in query)) and \
- ((not version) or any(s.header.version == v for v in version)) and \
- callback(s):
- yield s
-
- if recurse:
- for name, field in s._fields.items():
- if isinstance(field, SegmentSequenceField):
- val = getattr(s, name)
- if val:
- yield from val.find_segments(query=query, version=version, callback=callback, recurse=recurse)
-
- def find_segment_first(self, *args, **kwargs):
- """Finds the first matching segment.
-
- Same parameters as find_segments(), but only returns the first match, or None if no match is found."""
-
- for m in self.find_segments(*args, **kwargs):
- return m
-
- return None
-
-class SegmentSequenceField(DataElementField):
- type = 'sf'
-
- def _parse_value(self, value):
- if isinstance(value, SegmentSequence):
- return value
- else:
- return SegmentSequence(value)
-
- def _render_value(self, value):
- return value.render_bytes()
-
-
-class ContainerMeta(type):
- @classmethod
- def __prepare__(metacls, name, bases):
- return OrderedDict()
-
- def __new__(cls, name, bases, classdict):
- retval = super().__new__(cls, name, bases, classdict)
- retval._fields = OrderedDict()
- for supercls in reversed(bases):
- if hasattr(supercls, '_fields'):
- retval._fields.update((k,v) for (k,v) in supercls._fields.items())
- retval._fields.update((k,v) for (k,v) in classdict.items() if isinstance(v, Field))
- return retval
-
-class Container(metaclass=ContainerMeta):
- def __init__(self, *args, **kwargs):
- init_values = OrderedDict()
-
- additional_data = kwargs.pop("_additional_data", [])
-
- for init_value, field_name in zip(args, self._fields):
- init_values[field_name] = init_value
- args = ()
-
- for field_name in self._fields:
- if field_name in kwargs:
- if field_name in init_values:
- raise TypeError("__init__() got multiple values for argument {}".format(field_name))
- init_values[field_name] = kwargs.pop(field_name)
-
- super().__init__(*args, **kwargs)
- self._values = {}
- self._additional_data = additional_data
-
- for k,v in init_values.items():
- setattr(self, k, v)
-
- @classmethod
- def naive_parse(cls, data):
- retval = cls()
- for ((name, field), value) in zip(retval._fields.items(), data):
- setattr(retval, name, value)
- return retval
-
- def is_unset(self):
- for name in self._fields.keys():
- val = getattr(self, name)
- if isinstance(val, Container):
- if not val.is_unset():
- return False
- elif val is not None:
- return False
- return True
-
- @property
- def _repr_items(self):
- for name, field in self._fields.items():
- val = getattr(self, name)
- if not field.required:
- if isinstance(val, Container):
- if val.is_unset():
- continue
- elif isinstance(val, ValueList):
- if len(val) == 0:
- continue
- elif val is None:
- continue
- yield (name, val)
-
- if self._additional_data:
- yield ("_additional_data", self._additional_data)
-
- def __repr__(self):
- return "{}.{}({})".format(
- self.__class__.__module__,
- self.__class__.__name__,
- ", ".join(
- "{}={!r}".format(name, val) for (name, val) in self._repr_items
- )
- )
-
- def print_nested(self, stream=None, level=0, indent=" ", prefix="", first_level_indent=True, trailer=""):
- """Structured nested print of the object to the given stream.
-
- The print-out is eval()able to reconstruct the object."""
- import sys
- stream = stream or sys.stdout
-
- stream.write(
- ( (prefix + level*indent) if first_level_indent else "")
- + "{}.{}(".format(self.__class__.__module__, self.__class__.__name__) + "\n"
- )
- for name, value in self._repr_items:
- val = getattr(self, name)
- if not hasattr( getattr(val, 'print_nested', None), '__call__'):
- stream.write(
- (prefix + (level+1)*indent) + "{} = {!r},\n".format(name, val)
- )
- else:
- stream.write(
- (prefix + (level+1)*indent) + "{} = ".format(name)
- )
- val.print_nested(stream=stream, level=level+2, indent=indent, prefix=prefix, first_level_indent=False, trailer=",")
- stream.write( (prefix + level*indent) + "){}\n".format(trailer) )
class ShortReprMixin:
def __repr__(self):
--- /dev/null
+from collections import Iterable, OrderedDict
+
+import fints.fields
+
+
+class ValueList:
+ def __init__(self, parent):
+ self._parent = parent
+ self._data = []
+
+ def __getitem__(self, i):
+ if i >= len(self._data):
+ self.__setitem__(i, None)
+ if i < 0:
+ raise IndexError("Cannot access negative index")
+ return self._data[i]
+
+ def __setitem__(self, i, value):
+ if i < 0:
+ raise IndexError("Cannot access negative index")
+
+ if self._parent.count is not None:
+ if i >= self._parent.count:
+ raise IndexError("Cannot access index {} beyond count {}".format(i, self._parent.count))
+ elif self._parent.max_count is not None:
+ if i >= self._parent.max_count:
+ raise IndexError("Cannot access index {} beyound max_count {}".format(i, self._parent.max_count))
+
+ for x in range(len(self._data), i):
+ self.__setitem__(x, None)
+
+ if value is None:
+ value = self._parent._default_value()
+ else:
+ value = self._parent._parse_value(value)
+ self._parent._check_value(value)
+
+ if i == len(self._data):
+ self._data.append(value)
+ else:
+ self._data[i] = value
+
+ def __delitem__(self, i):
+ self.__setitem__(i, None)
+
+ def __len__(self):
+ if self._parent.count is not None:
+ return self._parent.count
+ else:
+ retval = 0
+ for i, val in enumerate(self._data):
+ if isinstance(val, Container):
+ if val.is_unset():
+ continue
+ elif val is None:
+ continue
+ retval = i+1
+ if self._parent.min_count is not None:
+ if self._parent.min_count > retval:
+ retval = self._parent.min_count
+ return retval
+
+ def __iter__(self):
+ for i in range(len(self)):
+ yield self[i]
+
+ def __repr__(self):
+ return "{!r}".format(list(self))
+
+ def print_nested(self, stream=None, level=0, indent=" ", prefix="", first_level_indent=True, trailer=""):
+ import sys
+ stream = stream or sys.stdout
+
+ stream.write(
+ ( (prefix + level*indent) if first_level_indent else "")
+ + "[\n"
+ )
+ for val in self:
+ if not hasattr( getattr(val, 'print_nested', None), '__call__'):
+ stream.write(
+ (prefix + (level+1)*indent) + "{!r},\n".format(val)
+ )
+ else:
+ val.print_nested(stream=stream, level=level+2, indent=indent, prefix=prefix, trailer=",")
+ stream.write( (prefix + level*indent) + "]{}\n".format(trailer) )
+
+class SegmentSequence:
+ """A sequence of FinTS3Segment objects"""
+
+ def __init__(self, segments = None):
+ if isinstance(segments, bytes):
+ from .parser import FinTS3Parser
+ parser = FinTS3Parser()
+ data = parser.explode_segments(segments)
+ segments = [parser.parse_segment(segment) for segment in data]
+ self.segments = segments or []
+
+ def render_bytes(self) -> bytes:
+ from .parser import FinTS3Serializer
+ return FinTS3Serializer().serialize_message(self)
+
+ def __repr__(self):
+ return "{}.{}({!r})".format(self.__class__.__module__, self.__class__.__name__, self.segments)
+
+ def print_nested(self, stream=None, level=0, indent=" ", prefix="", first_level_indent=True, trailer=""):
+ import sys
+ stream = stream or sys.stdout
+ stream.write(
+ ( (prefix + level*indent) if first_level_indent else "")
+ + "{}.{}([".format(self.__class__.__module__, self.__class__.__name__) + "\n"
+ )
+ for segment in self.segments:
+ segment.print_nested(stream=stream, level=level+1, indent=indent, prefix=prefix, first_level_indent=True, trailer=",")
+ stream.write( (prefix + level*indent) + "]){}\n".format(trailer) )
+
+ def find_segments(self, query=None, version=None, callback=None, recurse=True):
+ """Yields an iterable of all matching segments.
+
+ :param query: Either a str or class specifying a segment type (such as 'HNHBK', or :class:`~fints.segments.HNHBK3`), or a list or tuple of strings or classes.
+ If a list/tuple is specified, segments returning any matching type will be returned.
+ :param version: Either an int specifying a segment version, or a list or tuple of ints.
+ If a list/tuple is specified, segments returning any matching version will be returned.
+ :param callback: A callable that will be given the segment as its sole argument and must return a boolean indicating whether to return this segment.
+ :param recurse: If True (the default), recurse into SegmentSequenceField values, otherwise only look at segments in this SegmentSequence.
+
+ The match results of all given parameters will be AND-combined.
+ """
+
+ if query is None:
+ query = []
+ elif isinstance(query, str) or not isinstance(query, (list, tuple, Iterable)):
+ query = [query]
+
+ if version is None:
+ version = []
+ elif not isinstance(version, (list, tuple, Iterable)):
+ version = [version]
+
+ if callback is None:
+ callback = lambda s: True
+
+ for s in self.segments:
+ if ((not query) or any( (isinstance(s, t) if isinstance(t, type) else s.header.type == t) for t in query)) and \
+ ((not version) or any(s.header.version == v for v in version)) and \
+ callback(s):
+ yield s
+
+ if recurse:
+ for name, field in s._fields.items():
+ if isinstance(field, fints.fields.SegmentSequenceField):
+ val = getattr(s, name)
+ if val:
+ yield from val.find_segments(query=query, version=version, callback=callback, recurse=recurse)
+
+ def find_segment_first(self, *args, **kwargs):
+ """Finds the first matching segment.
+
+ Same parameters as find_segments(), but only returns the first match, or None if no match is found."""
+
+ for m in self.find_segments(*args, **kwargs):
+ return m
+
+ return None
+
+class ContainerMeta(type):
+ @classmethod
+ def __prepare__(metacls, name, bases):
+ return OrderedDict()
+
+ def __new__(cls, name, bases, classdict):
+ retval = super().__new__(cls, name, bases, classdict)
+ retval._fields = OrderedDict()
+ for supercls in reversed(bases):
+ if hasattr(supercls, '_fields'):
+ retval._fields.update((k,v) for (k,v) in supercls._fields.items())
+ retval._fields.update((k,v) for (k,v) in classdict.items() if isinstance(v, fints.fields.Field))
+ return retval
+
+class Container(metaclass=ContainerMeta):
+ def __init__(self, *args, **kwargs):
+ init_values = OrderedDict()
+
+ additional_data = kwargs.pop("_additional_data", [])
+
+ for init_value, field_name in zip(args, self._fields):
+ init_values[field_name] = init_value
+ args = ()
+
+ for field_name in self._fields:
+ if field_name in kwargs:
+ if field_name in init_values:
+ raise TypeError("__init__() got multiple values for argument {}".format(field_name))
+ init_values[field_name] = kwargs.pop(field_name)
+
+ super().__init__(*args, **kwargs)
+ self._values = {}
+ self._additional_data = additional_data
+
+ for k,v in init_values.items():
+ setattr(self, k, v)
+
+ @classmethod
+ def naive_parse(cls, data):
+ retval = cls()
+ for ((name, field), value) in zip(retval._fields.items(), data):
+ setattr(retval, name, value)
+ return retval
+
+ def is_unset(self):
+ for name in self._fields.keys():
+ val = getattr(self, name)
+ if isinstance(val, Container):
+ if not val.is_unset():
+ return False
+ elif val is not None:
+ return False
+ return True
+
+ @property
+ def _repr_items(self):
+ for name, field in self._fields.items():
+ val = getattr(self, name)
+ if not field.required:
+ if isinstance(val, Container):
+ if val.is_unset():
+ continue
+ elif isinstance(val, ValueList):
+ if len(val) == 0:
+ continue
+ elif val is None:
+ continue
+ yield (name, val)
+
+ if self._additional_data:
+ yield ("_additional_data", self._additional_data)
+
+ def __repr__(self):
+ return "{}.{}({})".format(
+ self.__class__.__module__,
+ self.__class__.__name__,
+ ", ".join(
+ "{}={!r}".format(name, val) for (name, val) in self._repr_items
+ )
+ )
+
+ def print_nested(self, stream=None, level=0, indent=" ", prefix="", first_level_indent=True, trailer=""):
+ """Structured nested print of the object to the given stream.
+
+ The print-out is eval()able to reconstruct the object."""
+ import sys
+ stream = stream or sys.stdout
+
+ stream.write(
+ ( (prefix + level*indent) if first_level_indent else "")
+ + "{}.{}(".format(self.__class__.__module__, self.__class__.__name__) + "\n"
+ )
+ for name, value in self._repr_items:
+ val = getattr(self, name)
+ if not hasattr( getattr(val, 'print_nested', None), '__call__'):
+ stream.write(
+ (prefix + (level+1)*indent) + "{} = {!r},\n".format(name, val)
+ )
+ else:
+ stream.write(
+ (prefix + (level+1)*indent) + "{} = ".format(name)
+ )
+ val.print_nested(stream=stream, level=level+2, indent=indent, prefix=prefix, first_level_indent=False, trailer=",")
+ stream.write( (prefix + level*indent) + "){}\n".format(trailer) )
def test_sequence_repr():
s = SegmentSequence()
- assert repr(s) == 'SegmentSequence([])'
+ assert repr(s) == 'fints.types.SegmentSequence([])'
def test_valuelist_repr():
class A(Container):