class TypedField(Field, SubclassesMixin):
flat_length = 1
+ flat_length_max = 1
def __new__(cls, *args, **kwargs):
target_cls = None
raise TypeError("Cannot compute flat length of field {}.{} with variable count".format(self.__class__.__name__, name))
result = result + field.count * field.flat_length
return result
+
+ @property
+ def flat_length_max(self):
+ result = 0
+ for name, field in self.type._fields.items():
+ # Note: We're *not* recursing into flat_length_max, because we don't want variable count fields at deeper levels
+ if field.count is not None:
+ result = result + field.count * field.flat_length
+ elif field.max_count is not None:
+ result = result + field.max_count * field.flat_length
+ else:
+ raise TypeError("Cannot compute max flat length of field {}.{} without count and max_count".format(self.__class__.__name__, name))
+ return result
class DataElementGroupField(ContainerField):
seg = clazz()
data = iter(segment)
- for name, field in seg._fields.items():
- try:
- val = next(data)
- except StopIteration:
- if field.required:
- raise ValueError("Required field {}.{} was not present".format(clazz.__name__, name))
+ for number, (name, field) in enumerate(seg._fields.items()):
+ vals = self.parse_repeat(field, data, number == len(seg._fields)-1)
+ if field.count == 1:
+ if len(vals):
+ setattr(seg, name, vals[0])
+ else:
+ if field.required:
+ raise ValueError("Required field {}.{} was not present".format(seg.__class__.__name__, name))
else:
- deg = self.parse_n_deg(field, val)
- setattr(seg, name, deg)
+ setattr(seg, name, vals)
+
seg._additional_data = list(data)
return seg
- def parse_n_deg(self, field, data):
- if not isinstance(data, Iterable) or isinstance(data, (str, bytes)):
- data = [data]
-
- data_i = iter(data)
- field_index = 0
- field_length = field.flat_length
-
+ def parse_repeat(self, field, data_i, is_last):
retval = []
- eod = False
- while not eod:
- vals = []
+ if field.count == 1:
try:
- for x in range(field_length):
- vals.append(next(data_i))
+ val = next(data_i)
except StopIteration:
- eod = True
-
- if field.count == 1:
- if isinstance(field, DataElementField):
- if not len(vals):
- return
- return vals[0]
- elif isinstance(field, DataElementGroupField):
- return self.parse_deg(field.type, vals)
+ pass
+ else:
+ retval.append( self.parse_n_deg(field, val) )
+ else:
+ for i in range(field.count if field.count is not None else field.max_count):
+ try:
+ val = next(data_i)
+ except StopIteration:
+ break
else:
- raise Error("Internal error")
- break
+ retval.append(self.parse_n_deg(field, val, is_last))
- if field_index >= (field.count if field.count is not None else len(data) // field_length):
- break
+ return retval
- if isinstance(field, DataElementField):
- retval.append(vals[0] if len(vals) else None)
- elif isinstance(field, DataElementGroupField):
- retval.append(self.parse_deg(field.type, vals))
- else:
- raise Error("Internal error")
- return retval
+ def parse_n_deg(self, field, data, is_last=False):
+ if not isinstance(data, Iterable) or isinstance(data, (str, bytes)):
+ data = [data]
+
+ data_i = iter(data)
+ if is_last:
+ field_length = field.flat_length_max
+ else:
+ field_length = field.flat_length
+
+ eod = False
+
+ vals = []
+ try:
+ for x in range(field_length):
+ vals.append(next(data_i))
+ except StopIteration:
+ pass
+
+ if isinstance(field, DataElementField):
+ if not len(vals):
+ return
+ return vals[0]
+ elif isinstance(field, DataElementGroupField):
+ return self.parse_deg(field.type, vals)
+ else:
+ raise Error("Internal error")
def parse_deg(self, clazz, vals):
retval = clazz()
a = NumericField(max_count=3)
m2 = FinTS3Parser().parse_message(b"ITST:1:2+1+2+3'")
- assert m1.segments[0].a[2] == 3
+ assert m2.segments[0].a[2] == 3
- with pytest.raises(IndexError):
- FinTS3Parser().parse_message(b"ITST:1:2+1+2+3+4'")
+ m3 = FinTS3Parser().parse_message(b"ITST:1:2+1+2+3+4'")
+ assert m3.segments[0]._additional_data == ['4']
- m = FinTS3Parser().parse_message(b"ITST:1:2+1+2'")
- assert len(m2.segments[0].a) == 2
- assert m2.segments[0].a[1] == 2
+ m4 = FinTS3Parser().parse_message(b"ITST:1:2+1+2'")
+ assert len(m4.segments[0].a) == 2
+ assert m4.segments[0].a[1] == 2
def test_parse_HIRMG2():