]> git.ipfire.org Git - thirdparty/python-fints.git/commitdiff
Add full FinTS parser. Significantly changes the internal API, as segments are now...
authorHenryk Plötz <henryk@ploetzli.ch>
Mon, 30 Jul 2018 15:46:30 +0000 (17:46 +0200)
committerRaphael Michel <mail@raphaelmichel.de>
Mon, 3 Dec 2018 18:34:16 +0000 (19:34 +0100)
Fully supports all escaping and binary content. Decodes 'normal' data from ISO-8859-1 to Python strings, leaves binary data as binary.

Still broken: get_holdings()/HIWPD

fints/client.py
fints/connection.py
fints/message.py

index 2f4fdcd64ad23bdc3404f50511ec19a73727b2f3..6aa40d28be838332458d3fc32ac48f080d94b0e6 100644 (file)
@@ -17,7 +17,7 @@ from .segments.depot import HKWPD
 from .segments.saldo import HKSAL
 from .segments.statement import HKKAZ
 from .segments.transfer import HKCCS, HKCCM
-from .utils import mt940_to_array, MT535_Miniparser, split_for_data_groups, split_for_data_elements, Password
+from .utils import mt940_to_array, MT535_Miniparser, Password
 
 logger = logging.getLogger(__name__)
 
@@ -56,11 +56,9 @@ class FinTS3Client:
         logger.debug('Got HKSPA response: {}'.format(resp))
         dialog.end()
 
-        accounts = resp._find_segment('HISPA')
-        accountlist = accounts.split('+')[1:]
+        seg = resp._find_segment('HISPA')
         self.accounts = []
-        for acc in accountlist:
-            arr = acc.split(':')
+        for arr in seg[1:]:
             self.accounts.append(SEPAAccount(
                 iban=arr[1], bic=arr[2], accountnumber=arr[3], subaccount=arr[4], blz=arr[6]
             ))
@@ -111,14 +109,11 @@ class FinTS3Client:
 
         logger.info('Fetching done.')
 
-        re_data = re.compile(r'[^@]*@([0-9]+)@(.+)', flags=re.MULTILINE | re.DOTALL)
         statement = []
         for resp in responses:
             seg = resp._find_segment('HIKAZ')
-            if seg:
-                m = re_data.match(seg)
-                if m:
-                    statement += mt940_to_array(m.group(2))
+            ## FIXME What is the encoding of MT940 messages?
+            statement += mt940_to_array(seg[1].decode('iso-8859-1'))
 
         logger.debug('Statement: {}'.format(statement))
 
@@ -130,11 +125,11 @@ class FinTS3Client:
 
         if hversion in (4, 5, 6):
             acc = ':'.join([
-                account.accountnumber, account.subaccount, str(280), account.blz
+                account.accountnumber, account.subaccount or '', str(280), account.blz
             ])
         elif hversion == 7:
             acc = ':'.join([
-                account.iban, account.bic, account.accountnumber, account.subaccount, str(280), account.blz
+                account.iban, account.bic, account.accountnumber, account.subaccount or '', str(280), account.blz
             ])
         else:
             raise ValueError('Unsupported HKKAZ version {}'.format(hversion))
@@ -177,7 +172,7 @@ class FinTS3Client:
 
         # find segment and split up to balance part
         seg = resp._find_segment('HISAL')
-        arr = split_for_data_elements(split_for_data_groups(seg)[4])
+        arr = seg[4]
 
         # get balance date
         date = datetime.datetime.strptime(arr[3], "%Y%m%d").date()
@@ -190,11 +185,11 @@ class FinTS3Client:
 
         if hversion in (1, 2, 3, 4, 5, 6):
             acc = ':'.join([
-                account.accountnumber, account.subaccount, str(280), account.blz
+                account.accountnumber, account.subaccount or '', str(280), account.blz
             ])
         elif hversion == 7:
             acc = ':'.join([
-                account.iban, account.bic, account.accountnumber, account.subaccount, str(280), account.blz
+                account.iban, account.bic, account.accountnumber, account.subaccount or '', str(280), account.blz
             ])
         else:
             raise ValueError('Unsupported HKSAL version {}'.format(hversion))
@@ -232,6 +227,8 @@ class FinTS3Client:
         # end dialog
         dialog.end()
 
+
+        ## FIXME BROKEN
         # find segment and split up to balance part
         seg = resp._find_segment('HIWPD')
         if seg:
@@ -249,11 +246,11 @@ class FinTS3Client:
 
         if hversion in (1, 2, 3, 4, 5, 6):
             acc = ':'.join([
-                account.accountnumber, account.subaccount, str(280), account.blz
+                account.accountnumber, account.subaccount or '', str(280), account.blz
             ])
         elif hversion == 7:
             acc = ':'.join([
-                account.iban, account.bic, account.accountnumber, account.subaccount, str(280), account.blz
+                account.iban, account.bic, account.accountnumber, account.subaccount or '', str(280), account.blz
             ])
         else:
             raise ValueError('Unsupported HKSAL version {}'.format(hversion))
@@ -428,20 +425,18 @@ class FinTS3Client:
 
     def _tan_requiring_response(self, dialog, resp):
         seg = resp._find_segment('HITAN')
-        s = split_for_data_groups(seg)
-        spl = split_for_data_elements(s[0])
-        if spl[2] == '3':
+        if seg[0][2] == '3':
             model = TANChallenge3
-        elif spl[2] == '4':
+        elif seg[0][2] == '4':
             model = TANChallenge4
-        elif spl[2] == '5':
+        elif seg[0][2] == '5':
             model = TANChallenge5
-        elif spl[2] == '6':
+        elif seg[0][2] == '6':
             model = TANChallenge6
         else:
             raise NotImplementedError(
                 "HITAN segment version {} is currently not implemented".format(
-                    spl[2]
+                    seg[0][2]
                 )
             )
         return model(dialog, *s[1:1 + len(model.args)])
@@ -481,9 +476,8 @@ class FinTS3Client:
         dialog.end()
 
         seg = resp._find_segment('HITAB')
-        deg = split_for_data_groups(seg)
 
-        return deg[2]
+        return seg[2]
 
 
 class FinTS3PinTanClient(FinTS3Client):
index 2a6c21294665bacbd0b9a2c074cbdcea22f3a258..dfd1ccf694868e3f59f0ff9a6079b2274b354f91 100644 (file)
@@ -19,4 +19,4 @@ class FinTSHTTPSConnection:
         )
         if r.status_code < 200 or r.status_code > 299:
             raise FinTSConnectionError('Bad status code {}'.format(r.status_code))
-        return base64.b64decode(r.content.decode('iso-8859-1')).decode('iso-8859-1')
+        return base64.b64decode(r.content.decode('iso-8859-1'))
index f7120a273e446cbd242354e6bf076982f587de0e..1c570cd2c7f675acc8806e94be4d9dd0e8652ebe 100644 (file)
+from enum import Enum
 import random
 import re
 
 from fints.models import TANMethod1, TANMethod2, TANMethod3, TANMethod4, TANMethod5, TANMethod6
-from fints.utils import split_for_data_groups, split_for_data_elements, fints_unescape
 from .segments.message import HNHBK, HNHBS, HNSHA, HNSHK, HNVSD, HNVSK
 
+TOKEN_RE = re.compile(rb"""
+                        ^(?:  (?: \? (?P<ECHAR>.) )
+                            | (?P<CHAR>[^?:+@']+)
+                            | (?P<TOK>[+:'])
+                            | (?: @ (?P<BINLEN>[0-9]+) @ )
+                         )""", re.X | re.S)
+
+class Token(Enum):
+    EOF = 'eof'
+    CHAR = 'char'
+    BINARY = 'bin'
+    PLUS = '+'
+    COLON = ':'
+    APOSTROPHE = "'"
+
+class ParserState:
+    def __init__(self, data: bytes, start=0, end=None, encoding='iso-8859-1'):
+        self._token = None
+        self._value = None
+        self._encoding = encoding
+        self._tokenizer = iter(self._tokenize(data, start, end or len(data), encoding))
+
+    def peek(self):
+        if not self._token:
+            self._token, self._value = next(self._tokenizer)
+        return self._token
+
+    def consume(self, token=None):
+        self.peek()
+        if token and token != self._token:
+            raise ValueError
+        self._token = None
+        return self._value
+
+    @staticmethod
+    def _tokenize(data, start, end, encoding):
+        pos = start
+        unclaimed = []
+        last_was = None
+        
+        while pos < end:
+            match = TOKEN_RE.match(data[pos:end])
+            if match:
+                pos += match.end()
+                d = match.groupdict()
+                if d['ECHAR'] is not None:
+                    unclaimed.append(d['ECHAR'])
+                elif d['CHAR'] is not None:
+                    unclaimed.append(d['CHAR'])
+                else:
+                    if unclaimed:
+                        if last_was in (Token.BINARY, Token.CHAR):
+                            raise ValueError
+                        yield Token.CHAR, b''.join(unclaimed).decode(encoding)
+                        unclaimed.clear()
+                        last_was = Token.CHAR
+
+                    if d['TOK'] is not None:
+                        token = Token(d['TOK'].decode('us-ascii'))
+                        yield token, d['TOK']
+                        last_was = token
+                    elif d['BINLEN'] is not None:
+                        blen = int(d['BINLEN'].decode('us-ascii'), 10)
+                        if last_was in (Token.BINARY, Token.CHAR):
+                            raise ValueError
+                        yield Token.BINARY, data[pos:pos+blen]
+                        pos += blen
+                        last_was = Token.BINARY
+                    else:
+                        raise ValueError
+            else:
+                raise ValueError
+
+        if unclaimed:
+            if last_was in (Token.BINARY, Token.CHAR):
+                raise ValueError
+            yield Token.CHAR, b''.join(unclaimed).decode(encoding)
+            unclaimed.clear()
+            last_was = Token.CHAR
+
+        yield Token.EOF, b''
+
+
+class FinTSMessageBase:
+    def __init__(self, *segments):
+        self.segments = []
+        for segment in segments:
+            self.add_segment(segment)
+
+    def add_segment(self, segment):
+        self.segments.append(segment)
+
+    @classmethod
+    def parse(cls, data: bytes, start=0, end=None):
+        return cls(*cls.parse_segments(data, start, end))
+
+    @classmethod
+    def parse_segments(cls, data: bytes, start=0, end=None):
+        segments = []
+
+        parser = ParserState(data, start, end)
+
+        while parser.peek() != Token.EOF:
+            segment = []
+            while parser.peek() not in (Token.APOSTROPHE, Token.EOF):
+                data = None
+                deg = []
+                while parser.peek() in (Token.BINARY, Token.CHAR, Token.COLON):
+                    if parser.peek() in (Token.BINARY, Token.CHAR):
+                        data = parser.consume()
+
+                    elif parser.peek() == Token.COLON:
+                        deg.append(data)
+                        data = None
+                        parser.consume(Token.COLON)
+
+                if data and deg:
+                    deg.append(data)
+                    data = deg
+
+                segment.append(data)
+                if parser.peek() == Token.PLUS:
+                    parser.consume(Token.PLUS)
+
+            parser.consume(Token.APOSTROPHE)
+            segments.append(segment)
+
+        parser.consume(Token.EOF)
+
+        return segments
 
 class FinTSMessage:
     def __init__(self, blz, username, pin, systemid, dialogid, msgno, encrypted_segments, tan_mechs=None, tan=None):
@@ -63,24 +194,16 @@ class FinTSMessage:
         return str(self.build_header()) + ''.join([str(s) for s in self.segments])
 
 
-class FinTSResponse:
-    RE_UNWRAP = re.compile('HNVSD:\d+:\d+\+@\d+@(.+)\'\'')
-    RE_SEGMENTS = re.compile("'(?=[A-Z]{4,}:\d|')")
-    RE_SYSTEMID = re.compile("HISYN:\d+:\d+:\d+\+(.+)")
-
+class FinTSResponse(FinTSMessageBase):
     def __init__(self, data):
-        self.response = self._unwrap(data)
-        self.segments = self.RE_SEGMENTS.split(data)
+        self.segments = self.parse_segments(data)
+        self.payload = self.segments
+        for seg in self.segments:
+            if seg[0][0] == 'HNVSD':
+                self.payload = self.parse_segments(seg[1])
 
     def __str__(self):
-        return self.response
-
-    def _unwrap(self, data):
-        m = self.RE_UNWRAP.match(data)
-        if m:
-            return m.group(1)
-        else:
-            return data
+        return str(self.payload)
 
     def is_success(self):
         summary = self.get_summary_by_segment('HIRMG')
@@ -89,32 +212,24 @@ class FinTSResponse:
                 return False
         return True
 
-    def _get_segment_index(self, idx, seg):
-        seg = split_for_data_groups(seg)
-        if len(seg) > idx - 1:
-            return seg[idx - 1]
-        return None
-
     def get_dialog_id(self):
         seg = self._find_segment('HNHBK')
         if not seg:
             raise ValueError('Invalid response, no HNHBK segment')
 
-        return self._get_segment_index(4, seg)
+        return seg[3]
 
     def get_bank_name(self):
         seg = self._find_segment('HIBPA')
         if seg:
-            parts = split_for_data_groups(seg)
-            if len(parts) > 3:
-                return parts[3]
+            if len(seg) > 3:
+                return seg[3]
 
     def get_systemid(self):
         seg = self._find_segment('HISYN')
-        m = self.RE_SYSTEMID.match(seg)
-        if not m:
+        if not seg:
             raise ValueError('Could not find systemid')
-        return m.group(1)
+        return seg[1]
 
     def get_summary_by_segment(self, name=None):
         if name and name not in ('HIRMS', 'HIRMG'):
@@ -127,9 +242,7 @@ class FinTSResponse:
         res = {}
         for name in names:
             seg = self._find_segment(name)
-            parts = split_for_data_groups(seg)[1:]
-            for de in parts:
-                de = split_for_data_elements(de)
+            for de in seg[1:]:
                 res[de[0]] = de[2]
         return res
 
@@ -143,40 +256,37 @@ class FinTSResponse:
         segs = self._find_segments('HIRMS')
         tan_methods = []
         for seg in segs:
-            deg = split_for_data_groups(seg)
-            for de in deg:
-                if de[0:4] == '3920':
-                    d = split_for_data_elements(de)
-                    for i in range(3, len(d)):
-                        tan_methods.append(d[i])
+            for deg in seg:
+                if deg[0] == '3920':
+                    tan_methods.extend( deg[3:] )
 
         # Get parameters for tan methods
-        seg = self._find_segments('HITANS')
+        segs = self._find_segments('HITANS')
         methods = []
-        for s in seg:
-            spl = split_for_data_elements(s)
-            if spl[2] == '1':
+        for seg in segs:
+            if seg[0][2] == '1':
                 model = TANMethod1
-            elif spl[2] == '2':
+            elif seg[0][2] == '2':
                 model = TANMethod2
-            elif spl[2] == '3':
+            elif seg[0][2] == '3':
                 model = TANMethod3
-            elif spl[2] == '4':
+            elif seg[0][2] == '4':
                 model = TANMethod4
-            elif spl[2] == '5':
+            elif seg[0][2] == '5':
                 model = TANMethod5
-            elif spl[2] == '6':
+            elif seg[0][2] == '6':
                 model = TANMethod6
             else:
                 raise NotImplementedError(
                     "HITANS segment version {} is currently not implemented".format(
-                        spl[2]
+                        seg[0][2]
                     )
                 )
 
             step = len(model.args)
-            for i in range(len(spl) // step):
-                part = spl[6 + i * step:6 + (i + 1) * step]
+            tan_params = seg[3][3:]
+            for i in range(len(tan_params) // step):
+                part = spl[i * step:(i + 1) * step]
                 method = model(*part)
                 if method.security_feature in tan_methods:
                     methods.append(method)
@@ -186,8 +296,8 @@ class FinTSResponse:
     def _find_segment_for_reference(self, name, ref):
         segs = self._find_segments(name)
         for seg in segs:
-            segsplit = split_for_data_elements(split_for_data_groups(seg)[0])
-            if segsplit[3] == str(ref.segmentno):
+            if len(seg[0]) < 4: continue
+            if seg[0][3] == str(ref.segmentno):
                 return seg
 
     def get_touchdowns(self, msg: FinTSMessage):
@@ -195,21 +305,16 @@ class FinTSResponse:
         for msgseg in msg.encrypted_segments:
             seg = self._find_segment_for_reference('HIRMS', msgseg)
             if seg:
-                parts = split_for_data_groups(seg)[1:]
-                for p in parts:
-                    psplit = split_for_data_elements(p)
-                    if psplit[0] == "3040":
-                        td = psplit[3]
-                        touchdown[msgseg.type] = fints_unescape(td)
+                for p in seg[1:]:
+                    if p[0] == "3040":
+                        touchdown[msgseg.type] = p[3]
         return touchdown
 
     def _get_segment_max_version(self, name):
         v = 3
         segs = self._find_segments(name)
         for s in segs:
-            parts = split_for_data_groups(s)
-            segheader = split_for_data_elements(parts[0])
-            curver = int(segheader[2])
+            curver = int(s[0][2])
             if curver > v:
                 v = curver
         return v
@@ -217,12 +322,14 @@ class FinTSResponse:
     def _find_segment(self, name):
         return self._find_segments(name, True)
 
-    def _find_segments(self, name, one=False):
-        found = [] if not one else ''
-        for s in self.segments:
-            spl = s.split(':', 1)
-            if spl[0] == name:
+    def _find_segments(self, name, one=False, in_payload=False):
+        found = []
+        for s in (self.payload if in_payload else self.segments):
+            if s[0][0] == name:
                 if one:
                     return s
                 found.append(s)
+        # FIXME Simple hack: Seach in inner message if no success in outer message
+        if not found and not in_payload:
+            return self._find_segments(name, one, in_payload=True)
         return found