BankIdentifier, Language2, SynchronisationMode, SystemIDStatus, CUSTOMER_ID_ANONYMOUS,
)
from .message import (
- FinTSCustomerMessage, FinTSMessage, FinTSMessageOLD, MessageDirection,
+ FinTSCustomerMessage, FinTSMessage, MessageDirection,
)
from .segments.auth import HKIDN, HKIDN2, HKSYN, HKVVB, HKVVB3
from .segments.dialog import HKEND, HKEND1
for k, v in data_unpickled.items():
setattr(self, k, v)
-
-
-class FinTSDialogOLD:
- def __init__(self, blz, username, pin, systemid, connection):
- self.blz = blz
- self.username = username
- self.pin = pin
- self.systemid = systemid
- self.connection = connection
- self.msgno = 1
- self.dialogid = 0
- self.hksalversion = 6
- self.hkkazversion = 6
- self.tan_mechs = []
-
- def _get_msg_sync(self):
- seg_identification = HKIDN(3, self.blz, self.username, 0)
- seg_prepare = HKVVB(4)
- seg_sync = HKSYN(5)
-
- return FinTSMessageOLD(self.blz, self.username, self.pin, self.systemid, self.dialogid, self.msgno, [
- seg_identification,
- seg_prepare,
- seg_sync
- ])
-
- def _get_msg_init(self):
- seg_identification = HKIDN(3, self.blz, self.username, self.systemid)
- seg_prepare = HKVVB(4)
-
- return FinTSMessageOLD(self.blz, self.username, self.pin, self.systemid, self.dialogid, self.msgno, [
- seg_identification,
- seg_prepare,
- ], self.tan_mechs)
-
- def _get_msg_end(self):
- return FinTSMessageOLD(self.blz, self.username, self.pin, self.systemid, self.dialogid, self.msgno, [
- HKEND(3, self.dialogid)
- ])
-
- def sync(self):
- logger.info('Initialize SYNC')
-
- with self.pin.protect():
- logger.debug('Sending SYNC: {}'.format(self._get_msg_sync()))
-
- resp = self.send(self._get_msg_sync())
- logger.debug('Got SYNC response: {}'.format(resp))
- self.systemid = resp.get_systemid()
- self.dialogid = resp.get_dialog_id()
- self.bankname = resp.get_bank_name()
- self.hksalversion = resp.get_segment_max_version('HIKAZS')
- self.hkkazversion = resp.get_segment_max_version('HISALS')
- self.hktanversion = resp.get_segment_max_version('HKTAN')
- self.tan_mechs = resp.get_supported_tan_mechanisms()
-
- logger.debug('Bank name: {}'.format(self.bankname))
- logger.debug('System ID: {}'.format(self.systemid))
- logger.debug('Dialog ID: {}'.format(self.dialogid))
- logger.debug('HKKAZ max version: {}'.format(self.hkkazversion))
- logger.debug('HKSAL max version: {}'.format(self.hksalversion))
- logger.debug('HKTAN max version: {}'.format(self.hktanversion))
- logger.debug('TAN mechanisms: {}'.format(', '.join(str(t) for t in self.tan_mechs)))
- self.end()
-
- def init(self):
- logger.info('Initialize Dialog')
-
- with self.pin.protect():
- logger.debug('Sending INIT: {}'.format(self._get_msg_init()))
-
- res = self.send(self._get_msg_init())
- logger.debug('Got INIT response: {}'.format(res))
-
- self.dialogid = res.get_dialog_id()
- logger.info('Received dialog ID: {}'.format(self.dialogid))
-
- return self.dialogid
-
- def end(self):
- logger.info('Initialize END')
-
- with self.pin.protect():
- logger.debug('Sending END: {}'.format(self._get_msg_end()))
-
- resp = self.send(self._get_msg_end())
- logger.debug('Got END response: {}'.format(resp))
- logger.info('Resetting dialog ID and message number count')
- self.dialogid = 0
- self.msgno = 1
- return resp
-
- def send(self, msg):
- logger.info('Sending Message')
- msg.msgno = self.msgno
- msg.dialogid = self.dialogid
-
- try:
- resp = self.connection.send(msg)
- if not resp.is_success():
- raise FinTSDialogError(
- resp.get_summary_by_segment()
- )
- self.msgno += 1
- return resp
- except:
- # TODO: Error handling
- raise
from .formals import SegmentSequence
from .segments import HIBPA3, HIRMG2, HIRMS2, FinTS3Segment, HITANSBase
-from .segments.message import HNHBK3, HNHBS1, HNSHA, HNSHK, HNVSD, HNVSK
+from .segments.message import HNHBK3, HNHBS1
class MessageDirection(Enum):
class FinTSInstituteMessage(FinTSMessage):
DIRECTION = MessageDirection.FROM_INSTITUTE
-
-class FinTSMessageOLD:
- def __init__(self, blz, username, pin, systemid, dialogid, msgno, encrypted_segments, tan_mechs=None, tan=None):
- self.blz = blz
- self.username = username
- self.pin = pin
- self.tan = tan
- self.systemid = systemid
- self.dialogid = dialogid
- self.msgno = msgno
- self.segments = []
- self.encrypted_segments = []
-
- if tan_mechs and '999' not in [t.security_function for t in tan_mechs]:
- self.profile_version = 2
- self.security_function = tan_mechs[0].security_function
- else:
- self.profile_version = 1
- self.security_function = '999'
-
- sig_head = self.build_signature_head()
- enc_head = self.build_encryption_head()
- self.segments.append(enc_head)
-
- self.enc_envelop = HNVSD(999, '')
- self.segments.append(self.enc_envelop)
-
- self.append_enc_segment(sig_head)
- for s in encrypted_segments:
- self.append_enc_segment(s)
-
- cur_count = len(encrypted_segments) + 3
-
- sig_end = HNSHA(cur_count, self.secref, self.pin, self.tan)
- self.append_enc_segment(sig_end)
- self.segments.append(HNHBS(cur_count + 1, msgno))
-
- def append_enc_segment(self, seg):
- self.encrypted_segments.append(seg)
- self.enc_envelop.set_data(self.enc_envelop.encoded_data + str(seg))
-
- def build_signature_head(self):
- rand = random.SystemRandom()
- self.secref = rand.randint(1000000, 9999999)
- return HNSHK(2, self.secref, self.blz, self.username, self.systemid, self.profile_version,
- self.security_function)
-
- def build_encryption_head(self):
- return HNVSK(998, self.blz, self.username, self.systemid, self.profile_version)
-
- def build_header(self):
- l = sum([len(str(s)) for s in self.segments])
- return HNHBK(l, self.dialogid, self.msgno)
-
- def __str__(self):
- return str(self.build_header()) + ''.join([str(s) for s in self.segments])
-
-
-class FinTSResponse(SegmentSequence):
- def is_success(self):
- for seg in self.find_segments(HIRMG2):
- for response in seg.responses:
- if response.code.startswith('9'):
- return False
- return True
-
- def get_dialog_id(self):
- seg = self.find_segment_first(HNHBK3)
- if not seg:
- raise ValueError('Invalid response, no HNHBK segment')
-
- return seg.dialogue_id
-
- def get_bank_name(self):
- seg = self.find_segment_first(HIBPA3)
- if seg:
- return seg.bank_name
-
- def get_systemid(self):
- seg = self.find_segment_first(HISYN4)
- if not seg:
- raise ValueError('Could not find systemid')
- return seg.customer_system_id
-
- def get_segment_max_version(self, type):
- return max((seg.header.version for seg in self.find_segments(type)), default=3)
-
- def get_supported_tan_mechanisms(self):
- tan_methods = []
- for seg in self.find_segments(HIRMS2):
- for response in seg.responses:
- if response.code == '3920':
- tan_methods.extend( response.parameters )
-
- # Get parameters for tan methods
- methods = []
- for seg in self.find_segments(HITANSBase):
- for params in seg.parameter.twostep_parameters:
- if params.security_function in tan_methods:
- methods.append(params)
-
- return methods
-
- def _find_segment_for_reference(self, name, ref):
- for seg in self.find_segments(name):
- if seg.header.reference == int(str(ref.segmentno)):
- return seg
-
- def get_touchdowns(self, msg: FinTSMessageOLD):
- touchdown = {}
- for msgseg in msg.encrypted_segments:
- seg = self._find_segment_for_reference(HIRMS2, msgseg)
- if seg:
- for p in seg[1:]:
- if p[0] == "3040":
- touchdown[msgseg.type] = p[3]
- return touchdown
-from . import FinTS3Segment, FinTS3SegmentOLD
+from . import FinTS3Segment
from ..fields import DataElementGroupField
from ..formals import KTZ1, Account3
-
-class HKSPA(FinTS3SegmentOLD):
- """
- HKSPA (SEPA-Kontoverbindung anfordern)
- Section C.10.1.3
- """
- type = 'HKSPA'
- version = 1
-
- def __init__(self, segno, accno, subaccfeature, blz):
- data = [
- ':'.join([
- accno, subaccfeature,
- self.country_code, blz
- ]) if accno is not None else ''
- ]
- super().__init__(segno, data)
-
class HKSPA1(FinTS3Segment):
"""SEPA-Kontoverbindung anfordern, version 1
)
from fints.utils import fints_escape
-from . import FinTS3Segment, FinTS3SegmentOLD
-
-
-class HKIDN(FinTS3SegmentOLD):
- """
- HKIDN (Identifikation)
- Section C.3.1.2
- """
- type = 'HKIDN'
- version = 2
-
- def __init__(self, segmentno, blz, username, systemid=0, customerid=1):
- data = [
- '{}:{}'.format(self.country_code, blz),
- fints_escape(username),
- systemid,
- customerid
- ]
- super().__init__(segmentno, data)
+from . import FinTS3Segment
+
class HKIDN2(FinTS3Segment):
"""Identifikation, version 2
system_id_status = CodeField(enum=SystemIDStatus, length=1, _d="Kundensystem-Status")
-class HKVVB(FinTS3SegmentOLD):
- """
- HKVVB (Verarbeitungsvorbereitung)
- Section C.3.1.3
- """
- type = 'HKVVB'
- version = 3
-
- LANG_DE = 1
- LANG_EN = 2
- LANG_FR = 3
-
- PRODUCT_NAME = 'pyfints'
- PRODUCT_VERSION = '0.1'
-
- def __init__(self, segmentno, lang=LANG_DE):
- data = [
- 0, 0, lang, fints_escape(self.PRODUCT_NAME), fints_escape(self.PRODUCT_VERSION)
- ]
- super().__init__(segmentno, data)
-
class HKVVB3(FinTS3Segment):
"""Verarbeitungsvorbereitung, version 3
product_name = DataElementField(type='an', max_length=25, _d="Produktbezeichnung")
product_version = DataElementField(type='an', max_length=5, _d="Produktversion")
-class HKSYN(FinTS3SegmentOLD):
- """
- HKSYN (Synchronisation)
- Section C.8.1.2
- """
- type = 'HKSYN'
- version = 3
-
- SYNC_MODE_NEW_CUSTOMER_ID = 0
- SYNC_MODE_LAST_MSG_NUMBER = 1
- SYNC_MODE_SIGNATURE_ID = 2
-
- def __init__(self, segmentno, mode=SYNC_MODE_NEW_CUSTOMER_ID):
- data = [
- mode
- ]
- super().__init__(segmentno, data)
-
-
-class HKTAN(FinTS3SegmentOLD):
- """
- HKTAN (TAN-Verfahren festlegen)
- Section B.5.1
- """
- type = 'HKTAN'
-
- def __init__(self, segno, process, aref, medium, version):
- self.version = version
-
- if process not in ('2', '4'):
- raise NotImplementedError("HKTAN process {} currently not implemented.".format(process))
- if version not in (3, 4, 5, 6):
- raise NotImplementedError("HKTAN version {} currently not implemented.".format(version))
-
- if process == '4':
- if medium:
- if version == 3:
- data = [process, '', '', '', '', '', '', '', medium]
- elif version == 4:
- data = [process, '', '', '', '', '', '', '', '', medium]
- elif version == 5:
- data = [process, '', '', '', '', '', '', '', '', '', '', medium]
- elif version == 6:
- data = [process, '', '', '', '', '', '', '', '', '', medium]
- else:
- data = [process]
- elif process == '2':
- if version == 6:
- data = [process, '', '', '', aref, 'N']
- elif version == 5:
- data = [process, '', '', '', aref, '', 'N']
- elif version in (3, 4):
- data = [process, '', aref, '', 'N']
- super().__init__(segno, data)
-
class HKTAN3(FinTS3Segment):
"""Zwei-Schritt-TAN-Einreichung, version 3
challenge_valid_until = DataElementGroupField(type=ChallengeValidUntil, required=False, _d="Gültigkeitsdatum und -uhrzeit für Challenge")
tan_medium_name = DataElementField(type='an', max_length=32, required=False, _d="Bezeichnung des TAN-Mediums")
-
-class HKTAB(FinTS3SegmentOLD):
- """
- HKTAB (Verfügbarre TAN-Medien ermitteln)
- Section C.2.1.2
- """
- type = 'HKTAB'
-
- def __init__(self, segno):
- self.version = 5
- data = [
- '0', 'A'
- ]
- super().__init__(segno, data)
-
class HKTAB4(FinTS3Segment):
"""TAN-Generator/Liste anzeigen Bestand, version 4
from fints.fields import DataElementField, DataElementGroupField
from fints.formals import Account2, Account3
-from . import FinTS3Segment, FinTS3SegmentOLD
-
-
-class HKWPD(FinTS3SegmentOLD):
- """
- HKWPD (Depotaufstellung anfordern)
- Section C.4.3.1
- Example: HKWPD:3:7+23456::280:10020030+USD+2'
- """
- type = 'HKWPD'
-
- def __init__(self, segno, version, account):
- self.version = version
- data = [
- account
- # The spec allows the specification of currency and
- # quality of valuations, as shown in the docstring above.
- # However, both 1822direkt and CortalConsors reject the
- # call if these two are present, claiming an invalid input.
- # 'EUR' # Währung der Depotaufstellung"
- # 2 # Kursqualität
- ]
- super().__init__(segno, data)
+from . import FinTS3Segment
+
class HKWPD5(FinTS3Segment):
"""Depotaufstellung anfordern, version 5
-from . import FinTS3Segment, FinTS3SegmentOLD
+from . import FinTS3Segment
from ..fields import CodeField, DataElementField
from ..formals import SynchronisationMode
security_reference_signature_key = DataElementField(type='num', max_length=16, required=False, _d="Sicherheitsreferenznummer für Signierschlüssel")
security_reference_digital_signature = DataElementField(type='num', max_length=16, required=False, _d="Sicherheitsreferenznummer für Digitale Signatur")
-class HKEND(FinTS3SegmentOLD):
- """
- HKEND (Dialogende)
- Section C.4.1.2
- """
- type = 'HKEND'
- version = 1
-
- def __init__(self, segno, dialogid):
- data = [
- dialogid,
- ]
- super().__init__(segno, data)
-
class HKEND1(FinTS3Segment):
"""Dialogende, version 1
)
from fints.utils import fints_escape
-from . import FinTS3Segment, FinTS3SegmentOLD
+from . import FinTS3Segment
class HNHBK3(FinTS3Segment):
"Nachrichtenabschluss"
message_number = DataElementField(type='num', max_length=4, _d="Nachrichtennummer")
-
-class HNHBK(FinTS3SegmentOLD):
- """
- HNHBK (Nachrichtenkopf)
- Section B.5.2
- """
- type = 'HNHBK'
- version = 3
-
- HEADER_LENGTH = 29
-
- def __init__(self, msglen, dialogid, msgno):
-
- if len(str(msglen)) != 12:
- msglen = str(int(msglen) + self.HEADER_LENGTH + len(str(dialogid)) + len(str(msgno))).zfill(12)
-
- data = [
- msglen,
- 300,
- dialogid,
- msgno
- ]
- super().__init__(1, data)
-
-
-class HNSHK(FinTS3SegmentOLD):
- """
- HNSHK (Signaturkopf)
- Section B.5.1
- """
- type = 'HNSHK'
- version = 4
-
- SECURITY_FUNC = 999
- SECURITY_BOUNDARY = 1 # SHM
- SECURITY_SUPPLIER_ROLE = 1 # ISS
-
- def __init__(self, segno, secref, blz, username, systemid, profile_version, security_function=SECURITY_FUNC):
- data = [
- ':'.join(['PIN', str(profile_version)]),
- security_function,
- secref,
- self.SECURITY_BOUNDARY,
- self.SECURITY_SUPPLIER_ROLE,
- ':'.join(['1', '', fints_escape(str(systemid))]),
- 1,
- ':'.join(['1', time.strftime('%Y%m%d'), time.strftime('%H%M%S')]),
- ':'.join(['1', '999', '1']), # Negotiate hash algorithm
- ':'.join(['6', '10', '16']), # RSA mode
- ':'.join([str(self.country_code), blz, username, 'S', '0', '0']),
- ]
- super().__init__(segno, data)
-
-
-class HNVSK(FinTS3SegmentOLD):
- """
- HNVSK (Verschlüsslungskopf)
- Section B.5.3
- """
- type = 'HNVSK'
- version = 3
-
- COMPRESSION_NONE = 0
- SECURITY_SUPPLIER_ROLE = 1 # ISS
-
- def __init__(self, segno, blz, username, systemid, profile_version):
- data = [
- ':'.join(['PIN', str(profile_version)]),
- 998,
- self.SECURITY_SUPPLIER_ROLE,
- ':'.join(['1', '', fints_escape(str(systemid))]),
- ':'.join(['1', time.strftime('%Y%m%d'), time.strftime('%H%M%S')]),
- ':'.join(['2', '2', '13', '@8@00000000', '5', '1']), # Crypto algorithm
- ':'.join([str(self.country_code), blz, username, 'S', '0', '0']),
- self.COMPRESSION_NONE
- ]
- super().__init__(segno, data)
-
-
-class HNVSD(FinTS3SegmentOLD):
- """
- HNVSD (Verschlüsselte Daten)
- Section B.5.4
- """
- type = 'HNVSD'
- version = 1
-
- def __init__(self, segno, encoded_data):
- self.encoded_data = encoded_data
- data = [
- '@{}@{}'.format(len(encoded_data), encoded_data)
- ]
- super().__init__(segno, data)
-
- def set_data(self, encoded_data):
- self.encoded_data = encoded_data
- self.data = [
- '@{}@{}'.format(len(encoded_data), encoded_data)
- ]
-
-
-class HNSHA(FinTS3SegmentOLD):
- """
- HNSHA (Signaturabschluss)
- Section B.5.2
- """
- type = 'HNSHA'
- version = 2
-
- SECURITY_FUNC = 999
- SECURITY_BOUNDARY = 1 # SHM
- SECURITY_SUPPLIER_ROLE = 1 # ISS
- PINTAN_VERSION = 1 # 1-step
-
- def __init__(self, segno, secref, pin, tan=None):
- pintan = fints_escape(pin)
- if tan:
- pintan += ':' + fints_escape(tan)
- data = [
- secref,
- '',
- pintan,
- ]
- super().__init__(segno, data)
-
-
-class HNHBS(FinTS3SegmentOLD):
- """
- HNHBS (Nachrichtenabschluss)
- Section B.5.3
- """
- type = 'HNHBS'
- version = 1
-
- def __init__(self, segno, msgno):
- data = [
- str(msgno)
- ]
- super().__init__(segno, data)
-
class HNVSK3(FinTS3Segment):
"""Verschlüsselungskopf, version 3
KTI1, Account2, Account3, Amount1, Balance1, Balance2, Timestamp1,
)
-from . import FinTS3Segment, FinTS3SegmentOLD
+from . import FinTS3Segment
-class HKSAL(FinTS3SegmentOLD):
- """
- HKSAL (Konto Saldo anfordern)
- Section C.2.1.2
- """
- type = 'HKSAL'
-
- def __init__(self, segno, version, account):
- self.version = version
- data = [
- account,
- 'N'
- ]
- super().__init__(segno, data)
-
class HKSAL5(FinTS3Segment):
"""Saldenabfrage, version 5
KTI1, Account3, Account2
)
-from . import FinTS3Segment, FinTS3SegmentOLD
-
-
-class HKKAZ(FinTS3SegmentOLD):
- """
- HKKAZ (Kontoumsätze)
-
- Refs: http://www.hbci-zka.de/dokumente/spezifikation_deutsch/fintsv3/FinTS_3.0_Messages_Geschaeftsvorfaelle_2015-08-07_final_version.pdf
- Section C.2.1.1.1.2
- """
- type = 'HKKAZ'
-
- def __init__(self, segno, version, account, date_start, date_end, touchdown):
- self.version = version
-
- data = [
- account,
- 'N',
- date_start.strftime('%Y%m%d'),
- date_end.strftime('%Y%m%d'),
- '',
- fints_escape(touchdown) if touchdown is not None else ''
- ]
- super().__init__(segno, data)
+from . import FinTS3Segment
+
+
class HKKAZ5(FinTS3Segment):
"""Kontoumsätze anfordern/Zeitraum, version 5
KTI1, Amount1, BatchTransferParameter1
)
-from . import FinTS3Segment, ParameterSegment, FinTS3SegmentOLD
+from . import FinTS3Segment, ParameterSegment
from ..models import SEPAAccount
-class HKCCS(FinTS3SegmentOLD):
- """
- HKCCS (SEPA Überweisung übertragen)
- Section C.2.1.2
- """
- type = 'HKCCS'
-
- def __init__(self, segno, account: SEPAAccount, pain_msg):
- self.version = 1
- sepa_descriptor = 'urn?:iso?:std?:iso?:20022?:tech?:xsd?:pain.001.001.03'
- msg = ':'.join([
- account.iban,
- account.bic
- ])
- data = [
- msg,
- sepa_descriptor,
- '@{}@{}'.format(len(pain_msg), pain_msg)
- ]
- super().__init__(segno, data)
-
class HKCCS1(FinTS3Segment):
"""SEPA Einzelüberweisung, version 1
Source: FinTS Financial Transaction Services, Schnittstellenspezifikation, Messages -- Multibankfähige Geschäftsvorfälle """
parameter = DataElementGroupField(type=BatchTransferParameter1, _d="Parameter SEPA-Sammelüberweisung")
-
-
-class HKCCM(FinTS3SegmentOLD):
- """
- HKCCM (SEPA-Sammelüberweisung einreichen)
- Section C.10.3.1.1
- """
- type = 'HKCCM'
-
- def __init__(self, segno, account: SEPAAccount, pain_msg, control_sum, currency, book_as_single):
- self.version = 1
- sepa_descriptor = 'urn?:iso?:std?:iso?:20022?:tech?:xsd?:pain.001.001.03'
- msg = ':'.join([
- account.iban,
- account.bic
- ])
- data = [
- msg,
- str(control_sum).replace('.', ',') + ':' + currency,
- 'J' if book_as_single else '',
- sepa_descriptor,
- '@{}@{}'.format(len(pain_msg), pain_msg)
- ]
- super().__init__(segno, data)
import pytest
from conftest import TEST_MESSAGES
from fints.formals import SegmentSequence
-from fints.message import FinTSResponse
from fints.parser import FinTS3Parser, FinTSParserError, FinTSParserWarning
from fints.segments import FinTS3Segment