From: Henryk Plötz Date: Sun, 12 Aug 2018 12:25:46 +0000 (+0200) Subject: Add a 'robust mode': A parse error during interpretation of a segment will be turned... X-Git-Tag: v2.0.0~1^2~112 X-Git-Url: http://git.ipfire.org/cgi-bin/gitweb.cgi?a=commitdiff_plain;h=43183dc605abbff6b9dd45d841aa58da2f27e694;p=thirdparty%2Fpython-fints.git Add a 'robust mode': A parse error during interpretation of a segment will be turned into a Warning, and a generic object will be created. Should guarantee that no exceptions are triggered during communication, even if there's a bug in the segment definitions. Downside: find_segments('FOO') might return instances that are not of subclass FOOx. Compensation: Make all calls to find_segment type safe by requiring a specific subclass. --- diff --git a/fints/client.py b/fints/client.py index 2718b93..b9f5e48 100644 --- a/fints/client.py +++ b/fints/client.py @@ -20,6 +20,8 @@ from .segments.transfer import HKCCS, HKCCM from .formals import TwoStepParametersCommon from .utils import mt940_to_array, MT535_Miniparser, Password +from fints.segments import HISPA1 + logger = logging.getLogger(__name__) @@ -58,7 +60,7 @@ class FinTS3Client: dialog.end() self.accounts = [] - for seg in resp.find_segments('HISPA'): + for seg in resp.find_segments(HISPA1): self.accounts.extend(seg.accounts) return self.accounts diff --git a/fints/message.py b/fints/message.py index a1a38c6..f17e888 100644 --- a/fints/message.py +++ b/fints/message.py @@ -5,7 +5,7 @@ import re from .segments.message import HNHBK, HNHBS, HNSHA, HNSHK, HNVSD, HNVSK from .parser import FinTS3Parser from .formals import SegmentSequence -from .segments import ParameterSegment +from .segments import ParameterSegment, HIRMG2, HNHBK3, HIBPA3, HISYN4, HIRMS2, HITANSBase class FinTSMessage: def __init__(self, blz, username, pin, systemid, dialogid, msgno, encrypted_segments, tan_mechs=None, tan=None): @@ -66,26 +66,26 @@ class FinTSMessage: class FinTSResponse(SegmentSequence): def is_success(self): - for seg in self.find_segments('HIRMG'): + for seg in self.find_segments(HIRMG2): for response in seg.responses: if response.code.startswith('9'): return False return True def get_dialog_id(self): - seg = self.find_segment_first('HNHBK') + seg = self.find_segment_first(HNHBK3) if not seg: raise ValueError('Invalid response, no HNHBK segment') return seg.dialogue_id def get_bank_name(self): - seg = self.find_segment_first('HIBPA') + seg = self.find_segment_first(HIBPA3) if seg: return seg.bank_name def get_systemid(self): - seg = self.find_segment_first('HISYN') + seg = self.find_segment_first(HISYN4) if not seg: raise ValueError('Could not find systemid') return seg.customer_system_id @@ -95,21 +95,14 @@ class FinTSResponse(SegmentSequence): def get_supported_tan_mechanisms(self): tan_methods = [] - for seg in self.find_segments('HIRMS'): + for seg in self.find_segments(HIRMS2): for response in seg.responses: if response.code == '3920': tan_methods.extend( response.parameters ) # Get parameters for tan methods methods = [] - for seg in self.find_segments('HITANS'): - if not isinstance(seg, ParameterSegment): - raise NotImplementedError( - "HITANS segment version {} is currently not implemented".format( - seg.header.version - ) - ) - + for seg in self.find_segments(HITANSBase): for params in seg.parameter.twostep_parameters: if params.security_function in tan_methods: methods.append(params) @@ -124,7 +117,7 @@ class FinTSResponse(SegmentSequence): def get_touchdowns(self, msg: FinTSMessage): touchdown = {} for msgseg in msg.encrypted_segments: - seg = self._find_segment_for_reference('HIRMS', msgseg) + seg = self._find_segment_for_reference(HIRMS2, msgseg) if seg: for p in seg[1:]: if p[0] == "3040": diff --git a/fints/parser.py b/fints/parser.py index 9d6ae16..2f708a3 100644 --- a/fints/parser.py +++ b/fints/parser.py @@ -1,7 +1,7 @@ from enum import Enum from collections import Iterable from contextlib import suppress -import re +import re, warnings from .segments import FinTS3Segment from .formals import Container, ValueList, DataElementField, DataElementGroupField, SegmentSequence @@ -29,6 +29,19 @@ from .formals import Container, ValueList, DataElementField, DataElementGroupFie # An item on level 3 is always a Data Element, but which Data Element it is depends # on which fields have been consumed in the sequence before it. + +#: Operate the parser in "robust mode". In this mode, errors during segment parsing +#: will be turned into a FinTSParserWarning and a generic FinTS3Segment (not a subclass) +#: will be constructed. This allows for all syntactically correct FinTS messages to be +#: consumed, even in the presence of errors in this library. +robust_mode = True + +class FinTSParserWarning(UserWarning): + pass + +class FinTSParserError(ValueError): + pass + TOKEN_RE = re.compile(rb""" ^(?: (?: \? (?P.) ) | (?P[^?:+@']+) @@ -128,6 +141,17 @@ class FinTS3Parser: def parse_segment(self, segment): clazz = FinTS3Segment.find_subclass(segment) + + try: + return self._parse_segment_as_class(clazz, segment) + except FinTSParserError as e: + if robust_mode: + warnings.warn("Ignoring parser error and returning generic object: {}. Turn off robust_mode to see Exception.".format(str(e)), FinTSParserWarning) + return self._parse_segment_as_class(FinTS3Segment, segment) + else: + raise + + def _parse_segment_as_class(self, clazz, segment): seg = clazz() data = iter(segment) @@ -140,7 +164,7 @@ class FinTS3Parser: val = next(data) except StopIteration: if field.required: - raise ValueError("Required field {}.{} was not present".format(seg.__class__.__name__, name)) + raise FinTSParserError("Required field {}.{} was not present".format(seg.__class__.__name__, name)) break try: @@ -150,7 +174,7 @@ class FinTS3Parser: deg = self.parse_deg_noniter(field.type, val, field.required) setattr(seg, name, deg) except ValueError as e: - raise ValueError("Wrong input when setting {}.{}".format(seg.__class__.__name__, name)) from e + raise FinTSParserError("Wrong input when setting {}.{}".format(seg.__class__.__name__, name)) from e else: i = 0 while True: @@ -166,7 +190,7 @@ class FinTS3Parser: deg = self.parse_deg_noniter(field.type, val, field.required) getattr(seg, name)[i] = deg except ValueError as e: - raise ValueError("Wrong input when setting {}.{}".format(seg.__class__.__name__, name)) from e + raise FinTSParserError("Wrong input when setting {}.{}".format(seg.__class__.__name__, name)) from e i = i + 1 @@ -189,7 +213,7 @@ class FinTS3Parser: remainder = list(data_i) if remainder: - raise ValueError("Unparsed data {!r} after parsing {!r}".format(remainder, clazz)) + raise FinTSParserError("Unparsed data {!r} after parsing {!r}".format(remainder, clazz)) return retval @@ -209,13 +233,13 @@ class FinTS3Parser: setattr(retval, name, next(data_i)) except StopIteration: if required and field.required: - raise ValueError("Required field {}.{} was not present".format(retval.__class__.__name__, name)) + raise FinTSParserError("Required field {}.{} was not present".format(retval.__class__.__name__, name)) break else: deg = self.parse_deg(field.type, data_i, required and field.required) setattr(retval, name, deg) except ValueError as e: - raise ValueError("Wrong input when setting {}.{}".format(retval.__class__.__name__, name)) from e + raise FinTSParserError("Wrong input when setting {}.{}".format(retval.__class__.__name__, name)) from e else: i = 0 while True: @@ -232,7 +256,7 @@ class FinTS3Parser: getattr(retval, name)[i] = deg except ValueError as e: - raise ValueError("Wrong input when setting {}.{}".format(retval.__class__.__name__, name)) from e + raise FinTSParserError("Wrong input when setting {}.{}".format(retval.__class__.__name__, name)) from e i = i + 1 diff --git a/fints/segments/__init__.py b/fints/segments/__init__.py index 6202df9..3e60534 100644 --- a/fints/segments/__init__.py +++ b/fints/segments/__init__.py @@ -151,22 +151,25 @@ class ParameterSegment(FinTS3Segment): min_number_signatures = DataElementField(type='num', length=1, _d="Anzahl Signaturen mindestens") security_class = DataElementField(type='num', length=1, _d="Sicherheitsklasse") -class HITANS1(ParameterSegment): +class HITANSBase(ParameterSegment): + pass + +class HITANS1(HITANSBase): parameter = DataElementGroupField(type=ParameterTwostepTAN1) -class HITANS2(ParameterSegment): +class HITANS2(HITANSBase): parameter = DataElementGroupField(type=ParameterTwostepTAN2) -class HITANS3(ParameterSegment): +class HITANS3(HITANSBase): parameter = DataElementGroupField(type=ParameterTwostepTAN3) -class HITANS4(ParameterSegment): +class HITANS4(HITANSBase): parameter = DataElementGroupField(type=ParameterTwostepTAN4) -class HITANS5(ParameterSegment): +class HITANS5(HITANSBase): parameter = DataElementGroupField(type=ParameterTwostepTAN5) -class HITANS6(ParameterSegment): +class HITANS6(HITANSBase): parameter = DataElementGroupField(type=ParameterTwostepTAN6) class HIPINS1(ParameterSegment): diff --git a/tests/conftest.py b/tests/conftest.py index 4c09a62..9feb800 100644 --- a/tests/conftest.py +++ b/tests/conftest.py @@ -1,6 +1,10 @@ import pytest, glob, os.path +import fints.parser TEST_MESSAGES = { os.path.basename(f).rsplit('.')[0]: open(f, 'rb').read() for f in glob.glob(os.path.join(os.path.dirname(__file__), "messages", "*.bin")) } + +# We will turn off robust mode generally for tests +fints.parser.robust_mode = False diff --git a/tests/test_message_parser.py b/tests/test_message_parser.py index 935ccde..c4bfc23 100644 --- a/tests/test_message_parser.py +++ b/tests/test_message_parser.py @@ -1,6 +1,7 @@ from fints.message import FinTSResponse -from fints.parser import FinTS3Parser +from fints.parser import FinTS3Parser, FinTSParserError, FinTSParserWarning from fints.formals import SegmentSequence +from fints.segments import FinTS3Segment import pytest from conftest import TEST_MESSAGES @@ -106,6 +107,14 @@ def test_invalid(): FinTS3Parser.explode_segments(message5) message6 = rb"""HNHBS:5:1'""" - with pytest.raises(ValueError, match='^Required field'): + with pytest.raises(FinTSParserError, match='^Required field'): m = FinTS3Parser().parse_message(message6) +def test_robust_mode(mock): + mock.patch('fints.parser.robust_mode', True) + + message1 = rb"""HNHBS:5:1'""" + with pytest.warns(FinTSParserWarning, match='^Ignoring parser error.*: Required field'): + m = FinTS3Parser().parse_message(message1) + assert m.segments[0].__class__ == FinTS3Segment +