from .segments.statement import HKKAZ5, HKKAZ6, HKKAZ7
from .segments.transfer import HKCCM1, HKCCS1
from .types import SegmentSequence
-from .utils import MT535_Miniparser, Password, mt940_to_array, compress_datablob, decompress_datablob
+from .utils import MT535_Miniparser, Password, mt940_to_array, compress_datablob, decompress_datablob, SubclassesMixin
from .parser import FinTS3Serializer
logger = logging.getLogger(__name__)
SYSTEM_ID_UNASSIGNED = '0'
DATA_BLOB_MAGIC = b'python-fints_DATABLOB'
+DATA_BLOB_MAGIC_RETRY = b'python-fints_RETRY_DATABLOB'
-class NeedRetryResponse:
- pass
+class NeedRetryResponse(SubclassesMixin):
+ @classmethod
+ def from_data(cls, blob):
+ version, data = decompress_datablob(DATA_BLOB_MAGIC_RETRY, blob)
+
+ if version == 1:
+ for clazz in cls._all_subclasses():
+ if clazz.__name__ == data["_class_name"]:
+ return clazz._from_data_v1(data)
+
+ raise Exception("Invalid data blob data or version")
class FinTS3Client:
def __init__(self, bank_identifier, user_id, customer_id=None, set_data=None):
def set_data(self, blob):
# FIXME Test, document
- decompress_datablob(DATA_BLOB_MAGIC, self, blob)
+ decompress_datablob(DATA_BLOB_MAGIC, blob, self)
def _log_response(self, segment, response):
if response.code[0] in ('0', '1'):
self.command_seg = command_seg
self.hitan = hitan
+ @classmethod
+ def _from_data_v1(cls, data):
+ if data["version"] == 1:
+ segs = SegmentSequence(data['segments_bin']).segments
+ return cls(segs[0], segs[1])
+
+ raise Exception("Wrong blob data version")
+
+ def get_data(self):
+ data = {
+ "_class_name": self.__class__.__name__,
+ "version": 1,
+ "segments_bin": SegmentSequence([self.command_seg, self.hitan]).render_bytes(),
+ }
+ return compress_datablob(DATA_BLOB_MAGIC_RETRY, 1, data)
+
+
+# Note: Implementing HKTAN#6 implies support for Strong Customer Authentication (SCA)
+# which may require TANs for many more operations including dialog initialization.
+# We do not currently support that.
+IMPLEMENTED_HKTAN_VERSIONS = {
+ 3: HKTAN3,
+ 5: HKTAN5,
+}
+
class FinTS3PinTanClient(FinTS3Client):
def __init__(self, bank_identifier, user_id, pin, server, customer_id=None, *args, **kwargs):
tan_mechanism = self.get_tan_mechanisms()[self.get_current_tan_mechanism()]
hitans = self.bpd.find_segment_first('HITANS', tan_mechanism.VERSION)
- hktan = {
- 3: HKTAN3,
- 5: HKTAN5,
- }.get(tan_mechanism.VERSION)
+ hktan = IMPLEMENTED_HKTAN_VERSIONS.get(tan_mechanism.VERSION)
seg = hktan(tan_process=tan_process)
"""
Get the available TAN mechanisms.
- :return: Dictionary of security_function: TwoStepParameters[1-5] objects.
+ Note: Only checks for HITANS versions listed in IMPLEMENTED_HKTAN_VERSIONS.
+
+ :return: Dictionary of security_function: TwoStepParameters objects.
"""
retval = OrderedDict()
- for version in range(1, 6):
+ for version in sorted(IMPLEMENTED_HKTAN_VERSIONS.keys()):
for seg in self.bpd.find_segments('HITANS', version):
for parameter in seg.parameter.twostep_parameters:
if parameter.security_function in self.allowed_security_functions:
return b';'.join([magic, b'1', str(version).encode('us-ascii'), compressed])
-def decompress_datablob(magic: bytes, obj: object, blob: bytes):
+def decompress_datablob(magic: bytes, blob: bytes, obj: object = None):
if not blob.startswith(magic):
raise ValueError("Incorrect data blob")
s = blob.split(b';', 3)
if encoding_version != 1:
raise ValueError("Unsupported encoding version {}".format(encoding_version))
- setfunc = getattr(obj, "_set_data_v{}".format(blob_version), None)
- if not setfunc:
- raise ValueError("Unknown data blob version")
-
decompressed = zlib.decompress(s[3])
data = json.loads(decompressed.decode('utf-8'))
for k, v in data.items():
if v:
data[k] = base64.b64decode(v.encode('us-ascii'))
- setfunc(data)
+ if obj:
+ setfunc = getattr(obj, "_set_data_v{}".format(blob_version), None)
+ if not setfunc:
+ raise ValueError("Unknown data blob version")
+
+ setfunc(data)
+ else:
+ return blob_version, data
class SubclassesMixin: