]> git.ipfire.org Git - thirdparty/python-fints.git/commitdiff
Fix logic error in counted elements parser, can now parse HIRMG2
authorHenryk Plötz <henryk@ploetzli.ch>
Wed, 8 Aug 2018 10:58:30 +0000 (12:58 +0200)
committerRaphael Michel <mail@raphaelmichel.de>
Mon, 3 Dec 2018 18:34:17 +0000 (19:34 +0100)
fints/formals.py
fints/parser.py
tests/test_message_parser.py

index 79f990f2b5e3c820060325f574337e23e21fcc0d..e50213d11ca3a7ddecdd8ffc046ef24a857c3929 100644 (file)
@@ -134,6 +134,7 @@ class Field:
 
 class TypedField(Field, SubclassesMixin):
     flat_length = 1
+    flat_length_max = 1
 
     def __new__(cls, *args, **kwargs):
         target_cls = None
@@ -186,6 +187,19 @@ class ContainerField(TypedField):
                 raise TypeError("Cannot compute flat length of field {}.{} with variable count".format(self.__class__.__name__, name))
             result = result + field.count * field.flat_length
         return result
+
+    @property
+    def flat_length_max(self):
+        result = 0
+        for name, field in self.type._fields.items():
+            # Note: We're *not* recursing into flat_length_max, because we don't want variable count fields at deeper levels
+            if field.count is not None:
+                result = result + field.count * field.flat_length
+            elif field.max_count is not None:
+                result = result + field.max_count * field.flat_length
+            else:
+                raise TypeError("Cannot compute max flat length of field {}.{} without count and max_count".format(self.__class__.__name__, name))
+        return result
     
 
 class DataElementGroupField(ContainerField):
index 34c6c2494a3428651b649e9f09713c108347f810..817ad20b2ebf9bcc43a768626ca3ff59fcbc1658 100644 (file)
@@ -116,60 +116,70 @@ class FinTS3Parser:
         seg = clazz()
 
         data = iter(segment)
-        for name, field in seg._fields.items():
-            try:
-                val = next(data)
-            except StopIteration:
-                if field.required:
-                    raise ValueError("Required field {}.{} was not present".format(clazz.__name__, name))
+        for number, (name, field) in enumerate(seg._fields.items()):
+            vals = self.parse_repeat(field, data, number == len(seg._fields)-1)
+            if field.count == 1:
+                if len(vals):
+                    setattr(seg, name, vals[0])
+                else:
+                    if field.required:
+                        raise ValueError("Required field {}.{} was not present".format(seg.__class__.__name__, name))
             else:
-                deg = self.parse_n_deg(field, val)
-                setattr(seg, name, deg)
+                setattr(seg, name, vals)
+
         seg._additional_data = list(data)
 
         return seg
 
-    def parse_n_deg(self, field, data):
-        if not isinstance(data, Iterable) or isinstance(data, (str, bytes)):
-            data = [data]
-
-        data_i = iter(data)
-        field_index = 0
-        field_length = field.flat_length
-
+    def parse_repeat(self, field, data_i, is_last):
         retval = []
-        eod = False
 
-        while not eod:
-            vals = []
+        if field.count == 1:
             try:
-                for x in range(field_length):
-                    vals.append(next(data_i))
+                val = next(data_i)
             except StopIteration:
-                eod = True
-
-            if field.count == 1:
-                if isinstance(field, DataElementField):
-                    if not len(vals):
-                        return
-                    return vals[0]
-                elif isinstance(field, DataElementGroupField):
-                    return self.parse_deg(field.type, vals)
+                pass
+            else:
+                retval.append( self.parse_n_deg(field, val) )
+        else:
+            for i in range(field.count if field.count is not None else field.max_count):
+                try:
+                    val = next(data_i)
+                except StopIteration:
+                    break
                 else:
-                    raise Error("Internal error")
-                break
+                    retval.append(self.parse_n_deg(field, val, is_last))
 
-            if field_index >= (field.count if field.count is not None else len(data) // field_length):
-                break
+        return retval
 
-            if isinstance(field, DataElementField):
-                retval.append(vals[0] if len(vals) else None)
-            elif isinstance(field, DataElementGroupField):
-                retval.append(self.parse_deg(field.type, vals))
-            else:
-                raise Error("Internal error")
 
-        return retval
+    def parse_n_deg(self, field, data, is_last=False):
+        if not isinstance(data, Iterable) or isinstance(data, (str, bytes)):
+            data = [data]
+
+        data_i = iter(data)
+        if is_last:
+            field_length = field.flat_length_max
+        else:
+            field_length = field.flat_length
+
+        eod = False
+
+        vals = []
+        try:
+            for x in range(field_length):
+                vals.append(next(data_i))
+        except StopIteration:
+            pass
+
+        if isinstance(field, DataElementField):
+            if not len(vals):
+                return
+            return vals[0]
+        elif isinstance(field, DataElementGroupField):
+            return self.parse_deg(field.type, vals)
+        else:
+            raise Error("Internal error")
 
     def parse_deg(self, clazz, vals):
         retval = clazz()
index 76561f73bf4abb5092a92c5bf3210dd6a28ce482..8bd82d36bc4fcb83a8fb9dca197cce896be10d3b 100644 (file)
@@ -48,14 +48,14 @@ def test_parse_counted():
         a = NumericField(max_count=3)
 
     m2 = FinTS3Parser().parse_message(b"ITST:1:2+1+2+3'")
-    assert m1.segments[0].a[2] == 3
+    assert m2.segments[0].a[2] == 3
 
-    with pytest.raises(IndexError):
-        FinTS3Parser().parse_message(b"ITST:1:2+1+2+3+4'")
+    m3 = FinTS3Parser().parse_message(b"ITST:1:2+1+2+3+4'")
+    assert m3.segments[0]._additional_data == ['4']
 
-    m = FinTS3Parser().parse_message(b"ITST:1:2+1+2'")
-    assert len(m2.segments[0].a) == 2
-    assert m2.segments[0].a[1] == 2
+    m4 = FinTS3Parser().parse_message(b"ITST:1:2+1+2'")
+    assert len(m4.segments[0].a) == 2
+    assert m4.segments[0].a[1] == 2
 
 
 def test_parse_HIRMG2():