+++ /dev/null
-from .connection import FinTSHTTPSConnection
-from .dialog import FinTSDialog
-
-
-class FinTS3Client:
- version = 300
-
-
-class FinTS3PinTanClient(FinTS3Client):
- def __init__(self, blz, username, pin, server):
- self.username = username
- self.blz = blz
- self.pin = pin
- self.connection = FinTSHTTPSConnection(server)
- self.systemid = 0
- super().__init__()
-
- def _new_dialog(self):
- dialog = FinTSDialog(self.blz, self.username, self.pin, self.systemid, self.connection)
- return dialog
-
- def get_sepa_accounts(self):
- dialog = self._new_dialog()
- dialog.sync()
-
+++ /dev/null
-import logging
-
-from .segments.auth import HKIDN, HKSYN, HKVVB
-from .message import FinTSMessage
-
-logger = logging.getLogger(__name__)
-
-
-class FinTSDialog:
- def __init__(self, blz, username, pin, systemid, connection):
- self.blz = blz
- self.username = username
- self.pin = pin
- self.systemid = systemid
- self.connection = connection
- self.msgno = 1
- self.dialogid = 0
-
- def sync(self):
- logger.info('Initialize SYNC')
-
- seg_identification = HKIDN(3, self.blz, self.username, 0)
- seg_prepare = HKVVB(4)
- seg_sync = HKSYN(5)
-
- msg_sync = FinTSMessage(self.blz, self.username, self.pin, self.systemid, self.dialogid, self.msgno, [
- seg_identification,
- seg_prepare,
- seg_sync
- ])
-
- logger.debug('Sending SYNC: {}'.format(msg_sync))
- resp = self.send(msg_sync)
- logger.debug('Got SYNC response: {}'.format(resp))
-
- def send(self, msg):
- logger.info('Sending Message')
- msg.msgno = self.msgno
- msg.dialogid = self.dialogid
-
- try:
- resp = self.connection.send(msg)
- self.msgno += 1
- return resp
- except:
- # TODO: Error handlign
- raise
+++ /dev/null
-import random
-
-from fints.protocol.fints3.segments.message import HNHBK, HNSHK, HNVSK, HNVSD, HNSHA, HNHBS
-
-
-class FinTSMessage:
- def __init__(self, blz, username, pin, systemid, dialogid, msgno, encrypted_segments):
- self.blz = blz
- self.username = username
- self.pin = pin
- self.systemid = systemid
- self.dialogid = dialogid
- self.msgno = msgno
- self.segments = []
- self.encrypted_segments = []
-
- sig_head = self.build_signature_head()
- enc_head = self.build_encryption_head()
- self.segments.append(enc_head)
-
- self.enc_envelop = HNVSD(999, '')
- self.segments.append(self.enc_envelop)
-
- self.append_enc_segment(sig_head)
- for s in encrypted_segments:
- self.append_enc_segment(s)
-
- cur_count = len(encrypted_segments) + 3
-
- sig_end = HNSHA(cur_count, self.secref, self.pin)
- self.append_enc_segment(sig_end)
- self.segments.append(HNHBS(cur_count + 1, msgno))
-
- def append_enc_segment(self, seg):
- self.encrypted_segments.append(seg)
- self.enc_envelop.set_data(self.enc_envelop.encoded_data + str(seg))
-
- def build_signature_head(self):
- rand = random.SystemRandom()
- self.secref = rand.randint(1000000, 9999999)
- return HNSHK(2, self.secref, self.blz, self.username, self.systemid)
-
- def build_encryption_head(self):
- return HNVSK(998, self.blz, self.username, self.systemid)
-
- def build_header(self):
- l = sum([len(str(s)) for s in self.segments])
- return HNHBK(l, self.dialogid, self.msgno)
-
- def __str__(self):
- return str(self.build_header()) + ''.join([str(s) for s in self.segments])
--- /dev/null
+import logging
+
+from .segments.accounts import HKSPA
+from .connection import FinTSHTTPSConnection
+from .dialog import FinTSDialog
+from .message import FinTSMessage
+from .models import SEPAAccount
+
+logger = logging.getLogger(__name__)
+
+
+class FinTS3Client:
+ version = 300
+
+ def __init__(self):
+ self.accounts = []
+
+ def _new_dialog(self):
+ raise NotImplemented()
+
+ def _new_message(self, dialog: FinTSDialog, segments):
+ raise NotImplemented()
+
+ def get_sepa_accounts(self):
+ dialog = self._new_dialog()
+ dialog.sync()
+ dialog.init()
+
+ msg_spa = self._new_message(dialog, [
+ HKSPA(3, None, None, None)
+ ])
+ logger.debug('Sending HKSPA: {}'.format(msg_spa))
+ resp = dialog.send(msg_spa)
+ logger.debug('Got HKSPA response: {}'.format(resp))
+ dialog.end()
+
+ accounts = resp._find_segment('HISPA')
+ accountlist = accounts.split('+')[1:]
+ self.accounts = []
+ for acc in accountlist:
+ arr = acc.split(':')
+ self.accounts.append(SEPAAccount(
+ iban=arr[1], bic=arr[2], accountnumber=arr[3], subaccount=arr[4], blz=arr[6]
+ ))
+
+ return self.accounts
+
+
+class FinTS3PinTanClient(FinTS3Client):
+ def __init__(self, blz, username, pin, server):
+ self.username = username
+ self.blz = blz
+ self.pin = pin
+ self.connection = FinTSHTTPSConnection(server)
+ self.systemid = 0
+ super().__init__()
+
+ def _new_dialog(self):
+ dialog = FinTSDialog(self.blz, self.username, self.pin, self.systemid, self.connection)
+ return dialog
+
+ def _new_message(self, dialog: FinTSDialog, segments):
+ return FinTSMessage(self.blz, self.username, self.pin, dialog.systemid, dialog.dialogid, dialog.msgno, segments)
+
import requests
-from fints.protocol.fints3.message import FinTSMessage
+from .message import FinTSMessage
class FinTSConnectionError(Exception):
--- /dev/null
+import logging
+
+from fints3.segments.auth import HKIDN, HKSYN, HKVVB
+from .message import FinTSMessage, FinTSResponse
+from .segments.dialog import HKEND
+
+logger = logging.getLogger(__name__)
+
+
+class FinTSDialogError(Exception):
+ pass
+
+
+class FinTSDialog:
+ def __init__(self, blz, username, pin, systemid, connection):
+ self.blz = blz
+ self.username = username
+ self.pin = pin
+ self.systemid = systemid
+ self.connection = connection
+ self.msgno = 1
+ self.dialogid = 0
+ self.hksalversion = 6
+ self.hkkazversion = 6
+ self.tan_mechs = []
+
+ def sync(self):
+ logger.info('Initialize SYNC')
+
+ seg_identification = HKIDN(3, self.blz, self.username, 0)
+ seg_prepare = HKVVB(4)
+ seg_sync = HKSYN(5)
+
+ msg_sync = FinTSMessage(self.blz, self.username, self.pin, self.systemid, self.dialogid, self.msgno, [
+ seg_identification,
+ seg_prepare,
+ seg_sync
+ ])
+
+ logger.debug('Sending SYNC: {}'.format(msg_sync))
+ resp = self.send(msg_sync)
+ logger.debug('Got SYNC response: {}'.format(resp))
+ self.systemid = resp.get_systemid()
+ self.dialogid = resp.get_dialog_id()
+ self.bankname = resp.get_bank_name()
+ self.hksalversion = resp.get_hksal_max_version()
+ self.hkkazversion = resp.get_hkkaz_max_version()
+ self.tan_mechs = resp.get_supported_tan_mechanisms()
+
+ logger.debug('Bank name: {}'.format(self.bankname))
+ logger.debug('System ID: {}'.format(self.systemid))
+ logger.debug('Dialog ID: {}'.format(self.dialogid))
+ logger.debug('HKKAZ max version: {}'.format(self.hkkazversion))
+ logger.debug('HKSAL max version: {}'.format(self.hksalversion))
+ logger.debug('TAN mechanisms: {}'.format(', '.join(self.tan_mechs)))
+ self.end()
+
+ def init(self):
+ logger.info('Initialize Dialog')
+
+ seg_identification = HKIDN(3, self.blz, self.username, 0)
+ seg_prepare = HKVVB(4)
+
+ msg_init = FinTSMessage(self.blz, self.username, self.pin, self.systemid, self.dialogid, self.msgno, [
+ seg_identification,
+ seg_prepare,
+ ])
+ logger.debug('Sending INIT: {}'.format(msg_init))
+ resp = self.send(msg_init)
+ logger.debug('Got INIT response: {}'.format(resp))
+
+ self.dialogid = resp.get_dialog_id()
+ logger.info('Received dialog ID: {}'.format(self.dialogid))
+
+ return self.dialogid
+
+ def end(self):
+ logger.info('Initialize END')
+
+ msg_end = FinTSMessage(self.blz, self.username, self.pin, self.systemid, self.dialogid, self.msgno, [
+ HKEND(3, self.dialogid)
+ ])
+ logger.debug('Sending END: {}'.format(msg_end))
+ resp = self.send(msg_end)
+ logger.debug('Got END response: {}'.format(resp))
+ logger.info('Resetting dialog ID and message number count')
+ self.dialogid = 0
+ self.msgno = 1
+ return resp
+
+ def send(self, msg):
+ logger.info('Sending Message')
+ msg.msgno = self.msgno
+ msg.dialogid = self.dialogid
+
+ try:
+ resp = FinTSResponse(self.connection.send(msg))
+ if not resp.is_success():
+ raise FinTSDialogError(resp.get_summary_by_segment('HIRMG'))
+ self.msgno += 1
+ return resp
+ except:
+ # TODO: Error handling
+ raise
--- /dev/null
+import random
+import re
+
+from fints3.segments.message import HNHBK, HNSHK, HNVSK, HNVSD, HNSHA, HNHBS
+
+
+class FinTSMessage:
+ def __init__(self, blz, username, pin, systemid, dialogid, msgno, encrypted_segments):
+ self.blz = blz
+ self.username = username
+ self.pin = pin
+ self.systemid = systemid
+ self.dialogid = dialogid
+ self.msgno = msgno
+ self.segments = []
+ self.encrypted_segments = []
+
+ sig_head = self.build_signature_head()
+ enc_head = self.build_encryption_head()
+ self.segments.append(enc_head)
+
+ self.enc_envelop = HNVSD(999, '')
+ self.segments.append(self.enc_envelop)
+
+ self.append_enc_segment(sig_head)
+ for s in encrypted_segments:
+ self.append_enc_segment(s)
+
+ cur_count = len(encrypted_segments) + 3
+
+ sig_end = HNSHA(cur_count, self.secref, self.pin)
+ self.append_enc_segment(sig_end)
+ self.segments.append(HNHBS(cur_count + 1, msgno))
+
+ def append_enc_segment(self, seg):
+ self.encrypted_segments.append(seg)
+ self.enc_envelop.set_data(self.enc_envelop.encoded_data + str(seg))
+
+ def build_signature_head(self):
+ rand = random.SystemRandom()
+ self.secref = rand.randint(1000000, 9999999)
+ return HNSHK(2, self.secref, self.blz, self.username, self.systemid)
+
+ def build_encryption_head(self):
+ return HNVSK(998, self.blz, self.username, self.systemid)
+
+ def build_header(self):
+ l = sum([len(str(s)) for s in self.segments])
+ return HNHBK(l, self.dialogid, self.msgno)
+
+ def __str__(self):
+ return str(self.build_header()) + ''.join([str(s) for s in self.segments])
+
+
+class FinTSResponse:
+ RE_UNWRAP = re.compile('HNVSD:\d+:\d+\+@\d+@(.+)\'\'')
+ RE_SEGMENTS = re.compile("'(?=[A-Z]{4,}:\d|')")
+ RE_SYSTEMID = re.compile("HISYN:\d+:\d+:\d+\+(.+)")
+ RE_TANMECH = re.compile('\d{3}')
+
+ def __init__(self, data):
+ self.response = self._unwrap(data)
+ self.segments = self.RE_SEGMENTS.split(data)
+
+ def __str__(self):
+ return self.response
+
+ def _unwrap(self, data):
+ m = self.RE_UNWRAP.match(data)
+ if m:
+ return m.group(1)
+ else:
+ return data
+
+ def is_success(self):
+ summary = self.get_summary_by_segment('HIRMG')
+ for code, msg in summary.items():
+ if code[0] == "9":
+ return False
+ return True
+
+ def _get_segment_index(self, idx, seg):
+ seg = seg.split('+')
+ if len(seg) > idx - 1:
+ return seg[idx - 1]
+ return None
+
+ def get_dialog_id(self):
+ seg = self._find_segment('HNHBK')
+ if not seg:
+ raise ValueError('Invalid response, no HNHBK segment')
+
+ return self._get_segment_index(4, seg)
+
+ def get_bank_name(self):
+ seg = self._find_segment('HIBPA')
+ if seg:
+ seg = seg.split('+')
+ if len(seg) > 3:
+ return seg[3]
+
+ def get_systemid(self):
+ seg = self._find_segment('HISYN')
+ m = self.RE_SYSTEMID.match(seg)
+ if not m:
+ raise ValueError('Could not find systemid')
+ return m.group(1)
+
+ def get_summary_by_segment(self, name):
+ if name not in ('HIRMS', 'HIRMG'):
+ raise ValueError('Unsupported segment for message summary')
+
+ res = {}
+ seg = self._find_segment(name)
+ seg = seg.split('+')[1:]
+ for de in seg:
+ de = de.split(':')
+ res[de[0]] = de[2]
+ return res
+
+ def get_hkkaz_max_version(self):
+ return self._get_segment_max_version('HIKAZS')
+
+ def get_hksal_max_version(self):
+ return self._get_segment_max_version('HISALS')
+
+ def get_supported_tan_mechanisms(self):
+ segs = self._find_segments('HIRMS')
+ for s in segs:
+ seg = s.split('+')[1:]
+ for s in seg:
+ id, msg = s.split('::', 1)
+ if id == "3920":
+ m = self.RE_TANMECH.search(msg)
+ if m:
+ return m.group(0)
+ return False
+
+ def _get_segment_max_version(self, name):
+ v = 3
+ segs = self._find_segments(name)
+ for s in segs:
+ parts = s.split('+')
+ segheader = parts[0].split(':')
+ curver = int(segheader[2])
+ if curver > v:
+ v = curver
+ return v
+
+ def _find_segment(self, name):
+ return self._find_segments(name, True)
+
+ def _find_segments(self, name, one=False):
+ found = []
+ for s in self.segments:
+ spl = s.split(':', 1)
+ if spl[0] == name:
+ if one:
+ return s
+ found.append(s)
+ return found
--- /dev/null
+from collections import namedtuple
+
+SEPAAccount = namedtuple('SEPAAccount', 'iban bic accountnumber subaccount blz')
--- /dev/null
+from . import FinTS3Segment
+
+
+class HKSPA(FinTS3Segment):
+ """
+ HKSPA (SEPA-Kontoverbindung anfordern)
+ Section C.10.1.3
+ """
+ type = 'HKSPA'
+ version = 1
+
+ def __init__(self, segno, accno, subaccfeature, blz):
+ data = [
+ ':'.join([
+ accno, subaccfeature,
+ self.country_code, blz
+ ]) if accno is not None else ''
+ ]
+ super().__init__(segno, data)
--- /dev/null
+from . import FinTS3Segment
+
+
+class HKEND(FinTS3Segment):
+ """
+ HKEND (Dialogende)
+ Section C.4.1.2
+ """
+ type = 'HKEND'
+ version = 1
+
+ def __init__(self, segno, dialogid):
+ data = [
+ dialogid,
+ ]
+ super().__init__(segno, data)