from .segments.auth import HKTAB4, HKTAB5, HKTAN3, HKTAN5
from .segments.depot import HKWPD5, HKWPD6
from .segments.dialog import HISYN4, HKSYN3
-from .segments.debit import HKDSE1, HKDSE2, HKDME1, HKDME2, HKDSC1, HKDMC1, HKDBS1, HKDBS2, HKDMB1
+from .segments.debit import HKDSE1, HKDSE2, HKDME1, HKDME2, HKDSC1, HKDMC1, HKDBS1, HKDBS2, HKDMB1, DebitResponseBase
from .segments.saldo import HKSAL5, HKSAL6, HKSAL7
from .segments.statement import HKKAZ5, HKKAZ6, HKKAZ7, DKKKU2
from .segments.transfer import HKCCM1, HKCCS1
raise Exception("Invalid data blob data or version")
+class ResponseStatus(Enum):
+ UNKNOWN = 0
+ SUCCESS = 1
+ WARNING = 2
+ ERROR = 3
+
+RESPONSE_STATUS_MAPPING = {
+ '0': ResponseStatus.SUCCESS,
+ '3': ResponseStatus.WARNING,
+ '9': ResponseStatus.ERROR,
+}
+
+class TransactionResponse:
+ status = ResponseStatus
+ responses = list
+ data = dict
+
+ def __init__(self, response_message):
+ self.status = ResponseStatus.UNKNOWN
+ self.responses = []
+ self.data = {}
+
+ for hirms in response_message.find_segments(HIRMS2):
+ for resp in hirms.responses:
+ self.set_status_if_higher(RESPONSE_STATUS_MAPPING.get(resp.code[0], ResponseStatus.UNKNOWN))
+
+ def set_status_if_higher(self, status):
+ if status.value > self.status.value:
+ self.status = status
+
+ def __repr__(self):
+ return "<{o.__class__.__name__}(status={o.status!r}, responses={o.responses!r}, data={o.data!r})>".format(o=self)
+
class FinTS3Client:
def __init__(self, bank_identifier, user_id, customer_id=None, set_data=None):
self.accounts = []
if self._standing_dialog:
raise Exception("Cannot double __enter__() {}".format(self))
self._standing_dialog = self._get_dialog()
- self._standing_dialog.lazy_init = True # FIXME Inelegant
self._standing_dialog.__enter__()
def __exit__(self, exc_type, exc_value, traceback):
retval = {
'bank': {},
'accounts': [],
+ 'auth': {},
}
if self.bpa:
retval['bank']['name'] = self.bpa.bank_name
hispas = self.bpd.find_segment_first('HISPAS')
if hispas:
retval['bank']['supported_sepa_formats'] = list(hispas.parameter.supported_sepa_formats)
+ else:
+ retval['bank']['supported_sepa_formats'] = []
if self.upd.segments:
for upd in self.upd.find_segments('HIUPD'):
acc = {}
return responses
- def start_sepa_transfer(self, account: SEPAAccount, iban: str, bic: str,
+ def simple_sepa_transfer(self, account: SEPAAccount, iban: str, bic: str,
recipient_name: str, amount: Decimal, account_name: str, reason: str,
endtoend_id='NOTPROVIDED'):
"""
:param account_name: Sender account name
:param reason: Transfer reason
:param endtoend_id: End-to-end-Id (defaults to ``NOTPROVIDED``)
- :return: Returns either a NeedRetryResponse or status (True, False)
+ :return: Returns either a NeedRetryResponse or TransactionResponse
"""
config = {
"name": account_name,
:param currency: Transfer currency
:param book_as_single: Kindly ask the bank to put multiple transactions as separate lines on the bank statement (defaults to ``False``)
:param pain_descriptor: URN of the PAIN message schema used.
- :return: Returns either a NeedRetryResponse or status (True, False)
+ :return: Returns either a NeedRetryResponse or TransactionResponse
"""
with self._get_dialog() as dialog:
return self._send_with_possible_retry(dialog, seg, self._continue_sepa_transfer)
def _continue_sepa_transfer(self, command_seg, response):
- # FIXME Properly find return code
- return True
+ retval = TransactionResponse(response)
+
+ for seg in response.find_segments(HIRMS2):
+ for resp in seg.responses:
+ retval.set_status_if_higher(RESPONSE_STATUS_MAPPING.get(resp.code[0], ResponseStatus.UNKNOWN))
+ retval.responses.append(resp)
+
+ return retval
def sepa_debit(self, account: SEPAAccount, pain_message: str, multiple=False, cor1=False,
control_sum=None, currency='EUR', book_as_single=False,
:param currency: Debit currency
:param book_as_single: Kindly ask the bank to put multiple transactions as separate lines on the bank statement (defaults to ``False``)
:param pain_descriptor: URN of the PAIN message schema used.
- :return: Returns either a NeedRetryResponse or status (True, False) ## FIXME Task ID?
+ :return: Returns either a NeedRetryResponse or TransactionResponse (with data['task_id'] set, if available)
"""
with self._get_dialog() as dialog:
return self._send_with_possible_retry(dialog, seg, self._continue_sepa_debit)
def _continue_sepa_debit(self, command_seg, response):
- # FIXME Properly return something
- return True
+ retval = TransactionResponse(response)
+
+ for seg in response.find_segments(HIRMS2):
+ for resp in seg.responses:
+ retval.set_status_if_higher(RESPONSE_STATUS_MAPPING.get(resp.code[0], ResponseStatus.UNKNOWN))
+ retval.responses.append(resp)
+
+ for seg in response.find_segments(DebitResponseBase):
+ if seg.task_id:
+ retval.data['task_id'] = seg.task_id
+
+ if not 'task_id' in retval.data:
+ for seg in response.find_segments('HITAN'):
+ if hasattr(seg, 'task_reference') and seg.task_reference:
+ retval.data['task_id'] = seg.task_reference
+
+ return retval
def add_response_callback(self, cb):
# FIXME document
self._standing_dialog = None
class NeedTANResponse(NeedRetryResponse):
- def __init__(self, command_seg, hitan, resume_method=None):
+ def __init__(self, command_seg, tan_request, resume_method=None, challenge_structured=False):
self.command_seg = command_seg
- self.hitan = hitan
+ self.tan_request = tan_request
+ self.tan_request_structured = challenge_structured
if hasattr(resume_method, '__func__'):
self.resume_method = resume_method.__func__.__name__
else:
self.resume_method = resume_method
+ def __repr__(self):
+ return '<o.__class__.__name__(command_seg={o.command_seg!r}, tan_request={o.tan_request!r})>'.format(o=self)
+
@classmethod
def _from_data_v1(cls, data):
if data["version"] == 1:
segs = SegmentSequence(data['segments_bin']).segments
- return cls(segs[0], segs[1], data['resume_method'])
+ return cls(segs[0], segs[1], data['resume_method'], data['challenge_structured'])
raise Exception("Wrong blob data version")
data = {
"_class_name": self.__class__.__name__,
"version": 1,
- "segments_bin": SegmentSequence([self.command_seg, self.hitan]).render_bytes(),
+ "segments_bin": SegmentSequence([self.command_seg, self.tan_request]).render_bytes(),
"resume_method": self.resume_method,
+ "challenge_structured": self.challenge_structured,
}
return compress_datablob(DATA_BLOB_MAGIC_RETRY, 1, data)
for resp in response.responses(tan_seg):
if resp.code == '0030':
- return NeedTANResponse(command_seg, response.find_segment_first('HITAN'), resume_func)
+ return NeedTANResponse(command_seg, response.find_segment_first('HITAN'), resume_func, self.is_challenge_structured())
if resp.code.startswith('9'):
raise Exception("Error response: {!r}".format(response))
else:
return resume_func(command_seg, response)
+ def is_challenge_structured(self):
+ param = self.get_tan_mechanisms()[self.get_current_tan_mechanism()]
+ if hasattr(param, 'challenge_structured'):
+ return param.challenge_structured
+ return False
+
def send_tan(self, challenge: NeedTANResponse, tan: str):
"""
Sends a TAN to confirm a pending operation.
"""
with self._get_dialog() as dialog:
- tan_seg = self._get_tan_segment(challenge.command_seg, '2', challenge.hitan)
+ tan_seg = self._get_tan_segment(challenge.command_seg, '2', challenge.tan_request)
self._pending_tan = tan
response = dialog.send(tan_seg)