from .segments.saldo import HKSAL
from .segments.statement import HKKAZ
from .segments.transfer import HKCCS, HKCCM
-from .utils import mt940_to_array, MT535_Miniparser, split_for_data_groups, split_for_data_elements, Password
+from .utils import mt940_to_array, MT535_Miniparser, Password
logger = logging.getLogger(__name__)
logger.debug('Got HKSPA response: {}'.format(resp))
dialog.end()
- accounts = resp._find_segment('HISPA')
- accountlist = accounts.split('+')[1:]
+ seg = resp._find_segment('HISPA')
self.accounts = []
- for acc in accountlist:
- arr = acc.split(':')
+ for arr in seg[1:]:
self.accounts.append(SEPAAccount(
iban=arr[1], bic=arr[2], accountnumber=arr[3], subaccount=arr[4], blz=arr[6]
))
logger.info('Fetching done.')
- re_data = re.compile(r'[^@]*@([0-9]+)@(.+)', flags=re.MULTILINE | re.DOTALL)
statement = []
for resp in responses:
seg = resp._find_segment('HIKAZ')
- if seg:
- m = re_data.match(seg)
- if m:
- statement += mt940_to_array(m.group(2))
+ ## FIXME What is the encoding of MT940 messages?
+ statement += mt940_to_array(seg[1].decode('iso-8859-1'))
logger.debug('Statement: {}'.format(statement))
if hversion in (4, 5, 6):
acc = ':'.join([
- account.accountnumber, account.subaccount, str(280), account.blz
+ account.accountnumber, account.subaccount or '', str(280), account.blz
])
elif hversion == 7:
acc = ':'.join([
- account.iban, account.bic, account.accountnumber, account.subaccount, str(280), account.blz
+ account.iban, account.bic, account.accountnumber, account.subaccount or '', str(280), account.blz
])
else:
raise ValueError('Unsupported HKKAZ version {}'.format(hversion))
# find segment and split up to balance part
seg = resp._find_segment('HISAL')
- arr = split_for_data_elements(split_for_data_groups(seg)[4])
+ arr = seg[4]
# get balance date
date = datetime.datetime.strptime(arr[3], "%Y%m%d").date()
if hversion in (1, 2, 3, 4, 5, 6):
acc = ':'.join([
- account.accountnumber, account.subaccount, str(280), account.blz
+ account.accountnumber, account.subaccount or '', str(280), account.blz
])
elif hversion == 7:
acc = ':'.join([
- account.iban, account.bic, account.accountnumber, account.subaccount, str(280), account.blz
+ account.iban, account.bic, account.accountnumber, account.subaccount or '', str(280), account.blz
])
else:
raise ValueError('Unsupported HKSAL version {}'.format(hversion))
# end dialog
dialog.end()
+
+ ## FIXME BROKEN
# find segment and split up to balance part
seg = resp._find_segment('HIWPD')
if seg:
if hversion in (1, 2, 3, 4, 5, 6):
acc = ':'.join([
- account.accountnumber, account.subaccount, str(280), account.blz
+ account.accountnumber, account.subaccount or '', str(280), account.blz
])
elif hversion == 7:
acc = ':'.join([
- account.iban, account.bic, account.accountnumber, account.subaccount, str(280), account.blz
+ account.iban, account.bic, account.accountnumber, account.subaccount or '', str(280), account.blz
])
else:
raise ValueError('Unsupported HKSAL version {}'.format(hversion))
def _tan_requiring_response(self, dialog, resp):
seg = resp._find_segment('HITAN')
- s = split_for_data_groups(seg)
- spl = split_for_data_elements(s[0])
- if spl[2] == '3':
+ if seg[0][2] == '3':
model = TANChallenge3
- elif spl[2] == '4':
+ elif seg[0][2] == '4':
model = TANChallenge4
- elif spl[2] == '5':
+ elif seg[0][2] == '5':
model = TANChallenge5
- elif spl[2] == '6':
+ elif seg[0][2] == '6':
model = TANChallenge6
else:
raise NotImplementedError(
"HITAN segment version {} is currently not implemented".format(
- spl[2]
+ seg[0][2]
)
)
return model(dialog, *s[1:1 + len(model.args)])
dialog.end()
seg = resp._find_segment('HITAB')
- deg = split_for_data_groups(seg)
- return deg[2]
+ return seg[2]
class FinTS3PinTanClient(FinTS3Client):
+from enum import Enum
import random
import re
from fints.models import TANMethod1, TANMethod2, TANMethod3, TANMethod4, TANMethod5, TANMethod6
-from fints.utils import split_for_data_groups, split_for_data_elements, fints_unescape
from .segments.message import HNHBK, HNHBS, HNSHA, HNSHK, HNVSD, HNVSK
+TOKEN_RE = re.compile(rb"""
+ ^(?: (?: \? (?P<ECHAR>.) )
+ | (?P<CHAR>[^?:+@']+)
+ | (?P<TOK>[+:'])
+ | (?: @ (?P<BINLEN>[0-9]+) @ )
+ )""", re.X | re.S)
+
+class Token(Enum):
+ EOF = 'eof'
+ CHAR = 'char'
+ BINARY = 'bin'
+ PLUS = '+'
+ COLON = ':'
+ APOSTROPHE = "'"
+
+class ParserState:
+ def __init__(self, data: bytes, start=0, end=None, encoding='iso-8859-1'):
+ self._token = None
+ self._value = None
+ self._encoding = encoding
+ self._tokenizer = iter(self._tokenize(data, start, end or len(data), encoding))
+
+ def peek(self):
+ if not self._token:
+ self._token, self._value = next(self._tokenizer)
+ return self._token
+
+ def consume(self, token=None):
+ self.peek()
+ if token and token != self._token:
+ raise ValueError
+ self._token = None
+ return self._value
+
+ @staticmethod
+ def _tokenize(data, start, end, encoding):
+ pos = start
+ unclaimed = []
+ last_was = None
+
+ while pos < end:
+ match = TOKEN_RE.match(data[pos:end])
+ if match:
+ pos += match.end()
+ d = match.groupdict()
+ if d['ECHAR'] is not None:
+ unclaimed.append(d['ECHAR'])
+ elif d['CHAR'] is not None:
+ unclaimed.append(d['CHAR'])
+ else:
+ if unclaimed:
+ if last_was in (Token.BINARY, Token.CHAR):
+ raise ValueError
+ yield Token.CHAR, b''.join(unclaimed).decode(encoding)
+ unclaimed.clear()
+ last_was = Token.CHAR
+
+ if d['TOK'] is not None:
+ token = Token(d['TOK'].decode('us-ascii'))
+ yield token, d['TOK']
+ last_was = token
+ elif d['BINLEN'] is not None:
+ blen = int(d['BINLEN'].decode('us-ascii'), 10)
+ if last_was in (Token.BINARY, Token.CHAR):
+ raise ValueError
+ yield Token.BINARY, data[pos:pos+blen]
+ pos += blen
+ last_was = Token.BINARY
+ else:
+ raise ValueError
+ else:
+ raise ValueError
+
+ if unclaimed:
+ if last_was in (Token.BINARY, Token.CHAR):
+ raise ValueError
+ yield Token.CHAR, b''.join(unclaimed).decode(encoding)
+ unclaimed.clear()
+ last_was = Token.CHAR
+
+ yield Token.EOF, b''
+
+
+class FinTSMessageBase:
+ def __init__(self, *segments):
+ self.segments = []
+ for segment in segments:
+ self.add_segment(segment)
+
+ def add_segment(self, segment):
+ self.segments.append(segment)
+
+ @classmethod
+ def parse(cls, data: bytes, start=0, end=None):
+ return cls(*cls.parse_segments(data, start, end))
+
+ @classmethod
+ def parse_segments(cls, data: bytes, start=0, end=None):
+ segments = []
+
+ parser = ParserState(data, start, end)
+
+ while parser.peek() != Token.EOF:
+ segment = []
+ while parser.peek() not in (Token.APOSTROPHE, Token.EOF):
+ data = None
+ deg = []
+ while parser.peek() in (Token.BINARY, Token.CHAR, Token.COLON):
+ if parser.peek() in (Token.BINARY, Token.CHAR):
+ data = parser.consume()
+
+ elif parser.peek() == Token.COLON:
+ deg.append(data)
+ data = None
+ parser.consume(Token.COLON)
+
+ if data and deg:
+ deg.append(data)
+ data = deg
+
+ segment.append(data)
+ if parser.peek() == Token.PLUS:
+ parser.consume(Token.PLUS)
+
+ parser.consume(Token.APOSTROPHE)
+ segments.append(segment)
+
+ parser.consume(Token.EOF)
+
+ return segments
+
class FinTSMessage:
def __init__(self, blz, username, pin, systemid, dialogid, msgno, encrypted_segments, tan_mechs=None, tan=None):
return str(self.build_header()) + ''.join([str(s) for s in self.segments])
-class FinTSResponse:
- RE_UNWRAP = re.compile('HNVSD:\d+:\d+\+@\d+@(.+)\'\'')
- RE_SEGMENTS = re.compile("'(?=[A-Z]{4,}:\d|')")
- RE_SYSTEMID = re.compile("HISYN:\d+:\d+:\d+\+(.+)")
-
+class FinTSResponse(FinTSMessageBase):
def __init__(self, data):
- self.response = self._unwrap(data)
- self.segments = self.RE_SEGMENTS.split(data)
+ self.segments = self.parse_segments(data)
+ self.payload = self.segments
+ for seg in self.segments:
+ if seg[0][0] == 'HNVSD':
+ self.payload = self.parse_segments(seg[1])
def __str__(self):
- return self.response
-
- def _unwrap(self, data):
- m = self.RE_UNWRAP.match(data)
- if m:
- return m.group(1)
- else:
- return data
+ return str(self.payload)
def is_success(self):
summary = self.get_summary_by_segment('HIRMG')
return False
return True
- def _get_segment_index(self, idx, seg):
- seg = split_for_data_groups(seg)
- if len(seg) > idx - 1:
- return seg[idx - 1]
- return None
-
def get_dialog_id(self):
seg = self._find_segment('HNHBK')
if not seg:
raise ValueError('Invalid response, no HNHBK segment')
- return self._get_segment_index(4, seg)
+ return seg[3]
def get_bank_name(self):
seg = self._find_segment('HIBPA')
if seg:
- parts = split_for_data_groups(seg)
- if len(parts) > 3:
- return parts[3]
+ if len(seg) > 3:
+ return seg[3]
def get_systemid(self):
seg = self._find_segment('HISYN')
- m = self.RE_SYSTEMID.match(seg)
- if not m:
+ if not seg:
raise ValueError('Could not find systemid')
- return m.group(1)
+ return seg[1]
def get_summary_by_segment(self, name=None):
if name and name not in ('HIRMS', 'HIRMG'):
res = {}
for name in names:
seg = self._find_segment(name)
- parts = split_for_data_groups(seg)[1:]
- for de in parts:
- de = split_for_data_elements(de)
+ for de in seg[1:]:
res[de[0]] = de[2]
return res
segs = self._find_segments('HIRMS')
tan_methods = []
for seg in segs:
- deg = split_for_data_groups(seg)
- for de in deg:
- if de[0:4] == '3920':
- d = split_for_data_elements(de)
- for i in range(3, len(d)):
- tan_methods.append(d[i])
+ for deg in seg:
+ if deg[0] == '3920':
+ tan_methods.extend( deg[3:] )
# Get parameters for tan methods
- seg = self._find_segments('HITANS')
+ segs = self._find_segments('HITANS')
methods = []
- for s in seg:
- spl = split_for_data_elements(s)
- if spl[2] == '1':
+ for seg in segs:
+ if seg[0][2] == '1':
model = TANMethod1
- elif spl[2] == '2':
+ elif seg[0][2] == '2':
model = TANMethod2
- elif spl[2] == '3':
+ elif seg[0][2] == '3':
model = TANMethod3
- elif spl[2] == '4':
+ elif seg[0][2] == '4':
model = TANMethod4
- elif spl[2] == '5':
+ elif seg[0][2] == '5':
model = TANMethod5
- elif spl[2] == '6':
+ elif seg[0][2] == '6':
model = TANMethod6
else:
raise NotImplementedError(
"HITANS segment version {} is currently not implemented".format(
- spl[2]
+ seg[0][2]
)
)
step = len(model.args)
- for i in range(len(spl) // step):
- part = spl[6 + i * step:6 + (i + 1) * step]
+ tan_params = seg[3][3:]
+ for i in range(len(tan_params) // step):
+ part = spl[i * step:(i + 1) * step]
method = model(*part)
if method.security_feature in tan_methods:
methods.append(method)
def _find_segment_for_reference(self, name, ref):
segs = self._find_segments(name)
for seg in segs:
- segsplit = split_for_data_elements(split_for_data_groups(seg)[0])
- if segsplit[3] == str(ref.segmentno):
+ if len(seg[0]) < 4: continue
+ if seg[0][3] == str(ref.segmentno):
return seg
def get_touchdowns(self, msg: FinTSMessage):
for msgseg in msg.encrypted_segments:
seg = self._find_segment_for_reference('HIRMS', msgseg)
if seg:
- parts = split_for_data_groups(seg)[1:]
- for p in parts:
- psplit = split_for_data_elements(p)
- if psplit[0] == "3040":
- td = psplit[3]
- touchdown[msgseg.type] = fints_unescape(td)
+ for p in seg[1:]:
+ if p[0] == "3040":
+ touchdown[msgseg.type] = p[3]
return touchdown
def _get_segment_max_version(self, name):
v = 3
segs = self._find_segments(name)
for s in segs:
- parts = split_for_data_groups(s)
- segheader = split_for_data_elements(parts[0])
- curver = int(segheader[2])
+ curver = int(s[0][2])
if curver > v:
v = curver
return v
def _find_segment(self, name):
return self._find_segments(name, True)
- def _find_segments(self, name, one=False):
- found = [] if not one else ''
- for s in self.segments:
- spl = s.split(':', 1)
- if spl[0] == name:
+ def _find_segments(self, name, one=False, in_payload=False):
+ found = []
+ for s in (self.payload if in_payload else self.segments):
+ if s[0][0] == name:
if one:
return s
found.append(s)
+ # FIXME Simple hack: Seach in inner message if no success in outer message
+ if not found and not in_payload:
+ return self._find_segments(name, one, in_payload=True)
return found