class ResponseStatus(Enum):
- "Error status of the response"
+ """Error status of the response"""
UNKNOWN = 0
- SUCCESS = 1 #: Response indicates Success
- WARNING = 2 #: Response indicates a Warning
- ERROR = 3 #: Response indicates an Error
+ SUCCESS = 1 #: Response indicates Success
+ WARNING = 2 #: Response indicates a Warning
+ ERROR = 3 #: Response indicates an Error
_RESPONSE_STATUS_MAPPING = {
self.bpd_version = bpa.bpd_version
self.bpd = SegmentSequence(
message.find_segments(
- callback = lambda m: len(m.header.type) == 6 and m.header.type[1] == 'I' and m.header.type[5] == 'S'
+ callback=lambda m: len(m.header.type) == 6 and m.header.type[1] == 'I' and m.header.type[5] == 'S'
)
)
for seg in message.find_segments(HIRMS2):
for response in seg.responses:
- segment = None # FIXME: Provide segment
+ segment = None # FIXME: Provide segment
if not internal_send:
self._log_response(segment, response)
return data
- def get_data(self, including_private:bool=False) -> bytes:
+ def get_data(self, including_private: bool=False) -> bytes:
"""Return state of this FinTSClient instance as an opaque datablob.
Information about the connection is implicitly retrieved from the bank and
statement = []
for seg in responses:
- ## FIXME What is the encoding of MT940 messages?
+ # FIXME What is the encoding of MT940 messages?
statement += mt940_to_array(seg.statement_booked.decode('iso-8859-1'))
logger.debug('Statement: {}'.format(statement))
# The first line is empty - drop it.
del mt535_lines[0]
mt535 = MT535_Miniparser()
- holdings.extend( mt535.parse(mt535_lines) )
+ holdings.extend(mt535.parse(mt535_lines))
if not holdings:
logger.debug('No HIWPD response segment found - maybe account has no holdings?')
response = self.client.connection.send(message)
- ##assert response.segments[0].message_number == self.next_message_number[response.DIRECTION]
+ # assert response.segments[0].message_number == self.next_message_number[response.DIRECTION]
# FIXME Better handling of HKEND in exception case
self.messages[response.DIRECTION][response.segments[0].message_number] = response
self.next_message_number[response.DIRECTION] += 1
external_dialog = self
external_client = self.client
+
class SmartPickler(pickle.Pickler):
def persistent_id(self, obj):
if obj is external_dialog:
def _set_data_v1(self, data):
external_dialog = self
external_client = self.client
+
class SmartUnpickler(pickle.Unpickler):
def persistent_load(self, pid):
if pid == 'dialog':
class FinTSUnsupportedOperation(FinTSError):
- pass
\ No newline at end of file
+ pass
class TextField(FieldRenderFormatStringMixin, DataElementField):
type = 'txt'
_DOC_TYPE = str
- _FORMAT_STRING = "{}" ## FIXME Restrict CRLF
+ _FORMAT_STRING = "{}" # FIXME Restrict CRLF
def _parse_value(self, value): return str(value)
type = 'float'
_DOC_TYPE = float
_FORMAT_STRING = "{:.12f}" # Warning: Python's float is not exact!
- ## FIXME: Needs test
+ # FIXME: Needs test
def _parse_value(self, value):
if isinstance(value, float):
retval = super()._inline_doc_comment(value)
if self._enum:
addendum = value.__doc__
- if addendum and not addendum is value.__class__.__doc__:
+ if addendum and addendum is not value.__class__.__doc__:
if not retval:
retval = " # "
else:
class DateField(FixedLengthMixin, NumericField):
- type = 'dat' # FIXME Need test
+ type = 'dat' # FIXME Need test
_DOC_TYPE = datetime.date
_FIXED_LENGTH = [8]
class TimeField(FixedLengthMixin, DigitsField):
- type = 'tim' # FIXME Need test
+ type = 'tim' # FIXME Need test
_DOC_TYPE = datetime.time
_FIXED_LENGTH = [6]
@property
def VERSION(self):
"""TAN mechanism version"""
- return int( re.match(r'^(\D+)(\d+)$', self.__class__.__name__).group(2) )
+ return int(re.match(r'^(\D+)(\d+)$', self.__class__.__name__).group(2))
security_function = DataElementField(type='code', max_length=3, _d="Sicherheitsfunktion kodiert")
tan_process = DataElementField(type='code', length=1, _d="TAN-Prozess")
MOBILE = 'M' #: Mobiltelefon mit mobileTAN
SECODER = 'S' #: Secoder
+
class TANMediaClass4(RepresentableEnum):
"""TAN-Medium-Klasse, version 4
class StatusSEPATask1(RepresentableEnum):
- ## FIXME Enum names in english
+ # FIXME Enum names in english
PENDING = '1' #: In Terminierung
DECLINED = '2' #: Abgelehnt von erster Inkassostelle
IN_PROGRESS = '3' #: in Bearbeitung
"""
def parse_message(self, data: bytes) -> SegmentSequence:
- "Takes a FinTS 3.0 message as byte array, and returns a parsed segment sequence"
+ """Takes a FinTS 3.0 message as byte array, and returns a parsed segment sequence"""
if isinstance(data, bytes):
data = self.explode_segments(data)
"""
def serialize_message(self, message: SegmentSequence) -> bytes:
- "Serialize a message (as SegmentSequence, list of FinTS3Segment, or FinTS3Segment) into a byte array"
+ """Serialize a message (as SegmentSequence, list of FinTS3Segment, or FinTS3Segment) into a byte array"""
if isinstance(message, FinTS3Segment):
message = SegmentSequence([message])
if isinstance(message, (list, tuple, Iterable)):
result = []
for segment in message.segments:
- result.append( self.serialize_segment(segment) )
+ result.append(self.serialize_segment(segment))
return self.implode_segments(result)
seg = []
filler = []
- for name,field in segment._fields.items():
+ for name, field in segment._fields.items():
repeat = field.count != 1
constructed = isinstance(field, DataElementGroupField)
if not constructed:
if repeat:
- seg.extend( field.render(val) for val in getattr(segment, name) )
+ seg.extend(field.render(val) for val in getattr(segment, name))
else:
- seg.append( field.render(getattr(segment, name)) )
+ seg.append(field.render(getattr(segment, name)))
else:
if repeat:
for val in getattr(segment, name):
- seg.append( self.serialize_deg(val) )
+ seg.append(self.serialize_deg(val))
else:
- seg.append( self.serialize_deg(getattr(segment, name), allow_skip=True) )
+ seg.append(self.serialize_deg(getattr(segment, name), allow_skip=True))
if segment._additional_data:
seg.extend(segment._additional_data)
if not constructed:
if repeat:
- result.extend( field.render(val) for val in getattr(deg, name) )
+ result.extend(field.render(val) for val in getattr(deg, name))
else:
- result.append( field.render(getattr(deg, name)) )
+ result.append(field.render(getattr(deg, name)))
else:
if repeat:
for val in getattr(deg, name):
- result.extend( self.serialize_deg(val) )
+ result.extend(self.serialize_deg(val))
else:
- result.extend( self.serialize_deg(getattr(deg, name)) )
+ result.extend(self.serialize_deg(getattr(deg, name)))
return result
level2 = []
for deg in segment:
if isinstance(deg, (list, tuple)):
- highest_index = max(((i+1) for (i,e) in enumerate(deg) if e != b'' and e is not None), default=0)
+ highest_index = max(((i+1) for (i, e) in enumerate(deg) if e != b'' and e is not None), default=0)
level2.append(
b":".join(FinTS3Serializer.escape_value(de) for de in deg[:highest_index])
)
class PinTanAuthenticationMechanism(AuthenticationMechanism):
def __init__(self, pin):
- self.pin=pin
- self.pending_signature=None
- self.security_function=None
+ self.pin = pin
+ self.pending_signature = None
+ self.security_function = None
def sign_prepare(self, message: FinTSMessage):
_now = datetime.datetime.now()
rand = random.SystemRandom()
self.pending_signature = HNSHK4(
- security_profile = SecurityProfile(SecurityMethod.PIN, 1),
- security_function = self.security_function,
- security_reference = rand.randint(1000000, 9999999),
- security_application_area = SecurityApplicationArea.SHM,
- security_role = SecurityRole.ISS,
- security_identification_details = SecurityIdentificationDetails(
- IdentifiedRole.MS,
- identifier=message.dialog.client.system_id,
- ),
- security_reference_number = 1, ## FIXME
- security_datetime = SecurityDateTime(
- DateTimeType.STS,
- _now.date(),
- _now.time(),
- ),
- hash_algorithm = HashAlgorithm(
- usage_hash = '1',
- hash_algorithm = '999',
- algorithm_parameter_name = '1',
- ),
- signature_algorithm = SignatureAlgorithm(
- usage_signature = '6',
- signature_algorithm = '10',
- operation_mode = '16',
- ),
- key_name = KeyName(
- message.dialog.client.bank_identifier,
- message.dialog.client.user_id,
- KeyType.S,
- 0,
- 0,
- ),
+ security_profile=SecurityProfile(SecurityMethod.PIN, 1),
+ security_function=self.security_function,
+ security_reference=rand.randint(1000000, 9999999),
+ security_application_area=SecurityApplicationArea.SHM,
+ security_role=SecurityRole.ISS,
+ security_identification_details=SecurityIdentificationDetails(
+ IdentifiedRole.MS,
+ identifier=message.dialog.client.system_id,
+ ),
+ security_reference_number=1, # FIXME
+ security_datetime=SecurityDateTime(
+ DateTimeType.STS,
+ _now.date(),
+ _now.time(),
+ ),
+ hash_algorithm=HashAlgorithm(
+ usage_hash='1',
+ hash_algorithm='999',
+ algorithm_parameter_name='1',
+ ),
+ signature_algorithm=SignatureAlgorithm(
+ usage_signature='6',
+ signature_algorithm='10',
+ operation_mode='16',
+ ),
+ key_name=KeyName(
+ message.dialog.client.bank_identifier,
+ message.dialog.client.user_id,
+ KeyType.S,
+ 0,
+ 0,
+ ),
)
if self.pending_signature not in message.segments:
raise FinTSError("Cannot sign a message that was not prepared")
-
+
signature = HNSHA2(
- security_reference = self.pending_signature.security_reference,
- user_defined_signature = UserDefinedSignature(
+ security_reference=self.pending_signature.security_reference,
+ user_defined_signature=UserDefinedSignature(
pin=self.pin,
tan=self._get_tan(),
),
def VERSION(cls):
match = TYPE_VERSION_RE.match(cls.__name__)
if match:
- return int( match.group(2) )
+ return int(match.group(2))
def __init__(self, *args, **kwargs):
if 'header' not in kwargs:
args = (kwargs.pop('header'), ) + args
- return super().__init__(*args, **kwargs)
+ super().__init__(*args, **kwargs)
@classmethod
def find_subclass(cls, segment):
parameter = DataElementGroupField(type=ScheduledDebitParameter1, _d="Parameter terminierte SEPA-Sammellastschrift einreichen")
-
class HKDSE2(FinTS3Segment):
"""Terminierte SEPA-Einzellastschrift einreichen, version 2
task_cancelable = DataElementField(type='jn', required=False, _d="Auftrag löschbar")
task_changeable = DataElementField(type='jn', required=False, _d="Auftrag änderbar")
+
class HIDBSS1(ParameterSegment):
"""Bestand terminierter SEPA-Einzellastschriften Parameter, version 1
compression_function = CodeField(CompressionFunction, max_length=3, _d="Komprimierungsfunktion")
certificate = DataElementGroupField(type=Certificate, required=False, _d="Zertifikat")
+
class HNVSD1(FinTS3Segment):
"""Verschlüsselte Daten, version 1
continue
elif val is None:
continue
- retval = i+1
+ retval = i + 1
return retval
def __len__(self):
stream = stream or sys.stdout
stream.write(
- ( (prefix + level*indent) if first_level_indent else "")
+ ((prefix + level * indent) if first_level_indent else "")
+ "[{}\n".format(first_line_suffix)
)
min_true_length = self._get_minimal_true_length()
docstring = self._parent._inline_doc_comment(val)
else:
docstring = ""
- if not hasattr( getattr(val, 'print_nested', None), '__call__'):
+ if not hasattr(getattr(val, 'print_nested', None), '__call__'):
stream.write(
- (prefix + (level+1)*indent) + "{!r},{}\n".format(val, docstring)
+ (prefix + (level + 1) * indent) + "{!r},{}\n".format(val, docstring)
)
else:
- val.print_nested(stream=stream, level=level+2, indent=indent, prefix=prefix, trailer=",", print_doc=print_doc, first_line_suffix=docstring)
+ val.print_nested(stream=stream, level=level + 2, indent=indent, prefix=prefix, trailer=",", print_doc=print_doc, first_line_suffix=docstring)
if skipped_items:
- stream.write( (prefix + (level+1)*indent) + "# {} empty items skipped\n".format(skipped_items) )
- stream.write( (prefix + level*indent) + "]{}\n".format(trailer) )
+ stream.write((prefix + (level + 1) * indent) + "# {} empty items skipped\n".format(skipped_items))
+ stream.write((prefix + level * indent) + "]{}\n".format(trailer))
class SegmentSequence:
"""A sequence of FinTS3Segment objects"""
- def __init__(self, segments = None):
+ def __init__(self, segments=None):
if isinstance(segments, bytes):
from .parser import FinTS3Parser
parser = FinTS3Parser()
import sys
stream = stream or sys.stdout
stream.write(
- ( (prefix + level*indent) if first_level_indent else "")
+ ((prefix + level * indent) if first_level_indent else "")
+ "{}.{}([".format(self.__class__.__module__, self.__class__.__name__)
+ first_line_suffix
+ "\n"
docstring = " # {}".format(docstring)
else:
docstring = ""
- segment.print_nested(stream=stream, level=level+1, indent=indent, prefix=prefix, first_level_indent=True, trailer=",", print_doc=print_doc, first_line_suffix=docstring)
- stream.write( (prefix + level*indent) + "]){}\n".format(trailer) )
+ segment.print_nested(stream=stream, level=level + 1, indent=indent, prefix=prefix, first_level_indent=True, trailer=",", print_doc=print_doc,
+ first_line_suffix=docstring)
+ stream.write((prefix + level * indent) + "]){}\n".format(trailer))
def find_segments(self, query=None, version=None, callback=None, recurse=True):
"""Yields an iterable of all matching segments.
callback = lambda s: True
for s in self.segments:
- if ((not query) or any( (isinstance(s, t) if isinstance(t, type) else s.header.type == t) for t in query)) and \
- ((not version) or any(s.header.version == v for v in version)) and \
- callback(s):
+ if ((not query) or any((isinstance(s, t) if isinstance(t, type) else s.header.type == t) for t in query)) and \
+ ((not version) or any(s.header.version == v for v in version)) and \
+ callback(s):
yield s
if recurse:
retval._fields = OrderedDict()
for supercls in reversed(bases):
if hasattr(supercls, '_fields'):
- retval._fields.update((k,v) for (k,v) in supercls._fields.items())
- retval._fields.update((k,v) for (k,v) in classdict.items() if isinstance(v, Field))
+ retval._fields.update((k, v) for (k, v) in supercls._fields.items())
+ retval._fields.update((k, v) for (k, v) in classdict.items() if isinstance(v, Field))
return retval
+
class Container(metaclass=ContainerMeta):
def __init__(self, *args, **kwargs):
init_values = OrderedDict()
for init_value, field_name in zip(args, self._fields):
init_values[field_name] = init_value
args = ()
-
+
for field_name in self._fields:
if field_name in kwargs:
if field_name in init_values:
raise TypeError("__init__() got multiple values for argument {}".format(field_name))
init_values[field_name] = kwargs.pop(field_name)
-
+
super().__init__(*args, **kwargs)
self._values = {}
self._additional_data = additional_data
- for k,v in init_values.items():
+ for k, v in init_values.items():
setattr(self, k, v)
@classmethod
stream = stream or sys.stdout
stream.write(
- ( (prefix + level*indent) if first_level_indent else "")
+ ((prefix + level * indent) if first_level_indent else "")
+ "{}.{}(".format(self.__class__.__module__, self.__class__.__name__)
+ first_line_suffix
+ "\n"
docstring = self._fields[name]._inline_doc_comment(val)
else:
docstring = ""
- if not hasattr( getattr(val, 'print_nested', None), '__call__'):
+ if not hasattr(getattr(val, 'print_nested', None), '__call__'):
stream.write(
- (prefix + (level+1)*indent) + "{} = {!r},{}\n".format(name, val, docstring)
+ (prefix + (level + 1) * indent) + "{} = {!r},{}\n".format(name, val, docstring)
)
else:
stream.write(
- (prefix + (level+1)*indent) + "{} = ".format(name)
+ (prefix + (level + 1) * indent) + "{} = ".format(name)
)
- val.print_nested(stream=stream, level=level+2, indent=indent, prefix=prefix, first_level_indent=False, trailer=",", print_doc=print_doc, first_line_suffix=docstring)
- stream.write( (prefix + level*indent) + "){}\n".format(trailer) )
+ val.print_nested(stream=stream, level=level + 2, indent=indent, prefix=prefix, first_level_indent=False, trailer=",", print_doc=print_doc,
+ first_line_suffix=docstring)
+ stream.write((prefix + level * indent) + "){}\n".format(trailer))
# We will turn off robust mode generally for tests
fints.parser.robust_mode = False
+
@pytest.fixture(scope="session")
def fints_server():
- dialog_prefix = base64.b64encode( uuid.uuid4().bytes, altchars=b'_/' ).decode('us-ascii')
- system_prefix = base64.b64encode( uuid.uuid4().bytes, altchars=b'_/' ).decode('us-ascii')
+ dialog_prefix = base64.b64encode(uuid.uuid4().bytes, altchars=b'_/').decode('us-ascii')
+ system_prefix = base64.b64encode(uuid.uuid4().bytes, altchars=b'_/').decode('us-ascii')
dialogs = {}
systems = {}
result.append("HIKAZ::7:{}+@{}@".format(hkkaz.group(1).decode('us-ascii'), len(tx)).encode('us-ascii') + tx + b"'")
-
hkccs = re.search(rb"'HKCCS:(\d+):1.*@\d+@(.*)/Document>'", message)
if hkccs:
segno = hkccs.group(1).decode('us-ascii')
return b"".join(result)
-
def process_message(self, message):
incoming_dialog_id = re.match(rb'HNHBK:1:3\+\d+\+300\+([^+]+)', message)
self.end_headers()
self.wfile.write(content_data)
-
server = http.server.HTTPServer(('127.0.0.1', 0), FinTSHandler)
thread = threading.Thread(target=server.serve_forever, name="fints_server", daemon=True)
thread.start()
def test_serialize_2():
from fints.formals import SegmentSequence
- import fints.formals, fints.segments
+ import fints.segments
s = SegmentSequence([fints.segments.message.HNHBK3(header=fints.formals.SegmentHeader('HNHBK', 1, 3), message_size='000000000428', hbci_version=300,
dialogue_id='430711670077=043999659571CN9D=', message_number=2,
reference_message=fints.formals.ReferenceMessage(dialogue_id='430711670077=043999659571CN9D=',