--- /dev/null
+#!/usr/bin/python3
+#
+# Example nfcpy to wpa_supplicant wrapper for DPP NFC operations
+# Copyright (c) 2012-2013, Jouni Malinen <j@w1.fi>
+# Copyright (c) 2019-2020, The Linux Foundation
+#
+# This software may be distributed under the terms of the BSD license.
+# See README for more details.
+
+import os
+import sys
+import time
+import threading
+import argparse
+
+import nfc
+import ndef
+
+import logging
+
+scriptsdir = os.path.dirname(os.path.realpath("dpp-nfc.py"))
+sys.path.append(os.path.join(scriptsdir, '..', '..', 'wpaspy'))
+import wpaspy
+
+wpas_ctrl = '/var/run/wpa_supplicant'
+ifname = None
+init_on_touch = False
+in_raw_mode = False
+prev_tcgetattr = 0
+no_input = False
+srv = None
+continue_loop = True
+terminate_now = False
+summary_file = None
+success_file = None
+
+def summary(txt):
+ print(txt)
+ if summary_file:
+ with open(summary_file, 'a') as f:
+ f.write(txt + "\n")
+
+def success_report(txt):
+ summary(txt)
+ if success_file:
+ with open(success_file, 'a') as f:
+ f.write(txt + "\n")
+
+def wpas_connect():
+ ifaces = []
+ if os.path.isdir(wpas_ctrl):
+ try:
+ ifaces = [os.path.join(wpas_ctrl, i) for i in os.listdir(wpas_ctrl)]
+ except OSError as error:
+ print("Could not find wpa_supplicant: ", error)
+ return None
+
+ if len(ifaces) < 1:
+ print("No wpa_supplicant control interface found")
+ return None
+
+ for ctrl in ifaces:
+ if ifname:
+ if ifname not in ctrl:
+ continue
+ try:
+ print("Trying to use control interface " + ctrl)
+ wpas = wpaspy.Ctrl(ctrl)
+ return wpas
+ except Exception as e:
+ pass
+ return None
+
+def dpp_nfc_uri_process(uri):
+ wpas = wpas_connect()
+ if wpas is None:
+ return False
+ peer_id = wpas.request("DPP_NFC_URI " + uri)
+ if "FAIL" in peer_id:
+ print("Could not parse DPP URI from NFC URI record")
+ return False
+ peer_id = int(peer_id)
+ print("peer_id=%d" % peer_id)
+ cmd = "DPP_AUTH_INIT peer=%d" % peer_id
+ res = wpas.request(cmd)
+ if "OK" not in res:
+ print("Failed to initiate DPP Authentication")
+ return False
+ print("DPP Authentication initiated")
+ return True
+
+def dpp_hs_tag_read(record):
+ wpas = wpas_connect()
+ if wpas is None:
+ return False
+ print(record)
+ if len(record.data) < 5:
+ print("Too short DPP HS")
+ return False
+ if record.data[0] != 0:
+ print("Unexpected URI Identifier Code")
+ return False
+ uribuf = record.data[1:]
+ try:
+ uri = uribuf.decode()
+ except:
+ print("Invalid URI payload")
+ return False
+ print("URI: " + uri)
+ if not uri.startswith("DPP:"):
+ print("Not a DPP URI")
+ return False
+ return dpp_nfc_uri_process(uri)
+
+def get_status(wpas, extra=None):
+ if extra:
+ extra = "-" + extra
+ else:
+ extra = ""
+ res = wpas.request("STATUS" + extra)
+ lines = res.splitlines()
+ vals = dict()
+ for l in lines:
+ try:
+ [name, value] = l.split('=', 1)
+ except ValueError:
+ logger.info("Ignore unexpected status line: " + l)
+ continue
+ vals[name] = value
+ return vals
+
+def get_status_field(wpas, field, extra=None):
+ vals = get_status(wpas, extra)
+ if field in vals:
+ return vals[field]
+ return None
+
+def own_addr(wpas):
+ return get_status_field(wpas, "address")
+
+def dpp_bootstrap_gen(wpas, type="qrcode", chan=None, mac=None, info=None,
+ curve=None, key=None):
+ cmd = "DPP_BOOTSTRAP_GEN type=" + type
+ if chan:
+ cmd += " chan=" + chan
+ if mac:
+ if mac is True:
+ mac = own_addr(wpas)
+ cmd += " mac=" + mac.replace(':', '')
+ if info:
+ cmd += " info=" + info
+ if curve:
+ cmd += " curve=" + curve
+ if key:
+ cmd += " key=" + key
+ res = wpas.request(cmd)
+ if "FAIL" in res:
+ raise Exception("Failed to generate bootstrapping info")
+ return int(res)
+
+def wpas_get_nfc_uri(start_listen=True):
+ wpas = wpas_connect()
+ if wpas is None:
+ return None
+ global own_id
+ own_id = dpp_bootstrap_gen(wpas, type="nfc-uri", chan="81/1", mac=True)
+ res = wpas.request("DPP_BOOTSTRAP_GET_URI %d" % own_id).rstrip()
+ if "FAIL" in res:
+ return None
+ if start_listen:
+ wpas.request("DPP_LISTEN 2412 netrole=configurator")
+ return res
+
+def wpas_report_handover_req(uri):
+ wpas = wpas_connect()
+ if wpas is None:
+ return None
+ global own_id
+ cmd = "DPP_NFC_HANDOVER_REQ own=%d uri=%s" % (own_id, uri)
+ return wpas.request(cmd)
+
+def wpas_report_handover_sel(uri):
+ wpas = wpas_connect()
+ if wpas is None:
+ return None
+ global own_id
+ cmd = "DPP_NFC_HANDOVER_SEL own=%d uri=%s" % (own_id, uri)
+ return wpas.request(cmd)
+
+def dpp_handover_client(llc):
+ uri = wpas_get_nfc_uri(start_listen=False)
+ uri = ndef.UriRecord(uri)
+ print("NFC URI record for DPP: " + str(uri))
+ carrier = ndef.Record('application/vnd.wfa.dpp', 'A', uri.data)
+ hr = ndef.HandoverRequestRecord(version="1.4", crn=os.urandom(2))
+ hr.add_alternative_carrier('active', carrier.name)
+ message = [hr, carrier]
+ print("NFC Handover Request message for DPP: " + str(message))
+
+ client = nfc.handover.HandoverClient(llc)
+ try:
+ summary("Trying to initiate NFC connection handover")
+ client.connect()
+ summary("Connected for handover")
+ except nfc.llcp.ConnectRefused:
+ summary("Handover connection refused")
+ client.close()
+ return
+ except Exception as e:
+ summary("Other exception: " + str(e))
+ client.close()
+ return
+
+ summary("Sending handover request")
+
+ if not client.send_records(message):
+ summary("Failed to send handover request")
+ client.close()
+ return
+
+ summary("Receiving handover response")
+ message = client.recv_records(timeout=3.0)
+ if message is None:
+ summary("No response received")
+ client.close()
+ return
+ print("Received message: " + str(message))
+ if len(message) < 1 or \
+ not isinstance(message[0], ndef.HandoverSelectRecord):
+ summary("Response was not Hs - received: " + message.type)
+ client.close()
+ return
+
+ print("Received message")
+ print("alternative carriers: " + str(message[0].alternative_carriers))
+
+ for carrier in message:
+ if isinstance(carrier, ndef.HandoverSelectRecord):
+ continue
+ print("Remote carrier type: " + carrier.type)
+ if carrier.type == "application/vnd.wfa.dpp":
+ if len(carrier.data) == 0 or carrier.data[0] != 0:
+ print("URI Identifier Code 'None' not seen")
+ continue
+ print("DPP carrier type match - send to wpa_supplicant")
+ uri = carrier.data[1:].decode("utf-8")
+ print("DPP URI: " + uri)
+ res = wpas_report_handover_sel(uri)
+ if res is None or "FAIL" in res:
+ summary("DPP handover report rejected")
+ break
+
+ success_report("DPP handover reported successfully (initiator)")
+ print("peer_id=" + res)
+ peer_id = int(res)
+ # TODO: Single Configurator instance
+ wpas = wpas_connect()
+ if wpas is None:
+ break
+ res = wpas.request("DPP_CONFIGURATOR_ADD")
+ if "FAIL" in res:
+ print("Failed to initiate Configurator")
+ break
+ conf_id = int(res)
+ global own_id
+ print("Initiate DPP authentication")
+ cmd = "DPP_AUTH_INIT peer=%d own=%d conf=sta-dpp configurator=%d" % (peer_id, own_id, conf_id)
+ res = wpas.request(cmd)
+ if "FAIL" in res:
+ print("Failed to initiate DPP authentication")
+ break
+
+ print("Remove peer")
+ client.close()
+ print("Done with handover")
+ global only_one
+ if only_one:
+ print("only_one -> stop loop")
+ global continue_loop
+ continue_loop = False
+
+ global no_wait
+ if no_wait:
+ print("Trying to exit..")
+ global terminate_now
+ terminate_now = True
+
+ print("Returning from dpp_handover_client")
+
+class HandoverServer(nfc.handover.HandoverServer):
+ def __init__(self, llc):
+ super(HandoverServer, self).__init__(llc)
+ self.sent_carrier = None
+ self.ho_server_processing = False
+ self.success = False
+
+ def process_handover_request_message(self, records):
+ self.ho_server_processing = True
+ clear_raw_mode()
+ print("\nHandoverServer - request received: " + str(records))
+
+ carrier = None
+ hs = ndef.HandoverSelectRecord('1.4')
+ sel = [hs]
+
+ found = False
+
+ for carrier in records:
+ if isinstance(carrier, ndef.HandoverRequestRecord):
+ continue
+ print("Remote carrier type: " + carrier.type)
+ if carrier.type == "application/vnd.wfa.dpp":
+ print("DPP carrier type match - add DPP carrier record")
+ if len(carrier.data) == 0 or carrier.data[0] != 0:
+ print("URI Identifier Code 'None' not seen")
+ continue
+ uri = carrier.data[1:].decode("utf-8")
+ print("Received DPP URI: " + uri)
+
+ data = wpas_get_nfc_uri(start_listen=False)
+ print("Own URI (pre-processing): %s" % data)
+
+ res = wpas_report_handover_req(uri)
+ if res is None or "FAIL" in res:
+ print("DPP handover request processing failed")
+ continue
+
+ found = True
+ self.received_carrier = carrier
+
+ wpas = wpas_connect()
+ if wpas is None:
+ continue
+ global own_id
+ data = wpas.request("DPP_BOOTSTRAP_GET_URI %d" % own_id).rstrip()
+ if "FAIL" in data:
+ continue
+ print("Own URI (post-processing): %s" % data)
+ uri = ndef.UriRecord(data)
+ print("Own bootstrapping NFC URI record: " + str(uri))
+
+ info = wpas.request("DPP_BOOTSTRAP_INFO %d" % own_id)
+ freq = None
+ for line in info.splitlines():
+ if line.startswith("use_freq="):
+ freq = int(line.split('=')[1])
+ if freq is None:
+ print("No channel negotiated over NFC - use channel 1")
+ freq = 2412
+ res = wpas.request("DPP_LISTEN %d" % freq)
+ if "OK" not in res:
+ print("Failed to start DPP listen")
+ break
+
+ carrier = ndef.Record('application/vnd.wfa.dpp', 'A', uri.data)
+ print("Own DPP carrier record: " + str(carrier))
+ hs.add_alternative_carrier('active', carrier.name)
+ sel = [hs, carrier]
+ break
+
+ summary("Sending handover select: " + str(sel))
+ self.success = True
+ return sel
+
+def clear_raw_mode():
+ import sys, tty, termios
+ global prev_tcgetattr, in_raw_mode
+ if not in_raw_mode:
+ return
+ fd = sys.stdin.fileno()
+ termios.tcsetattr(fd, termios.TCSADRAIN, prev_tcgetattr)
+ in_raw_mode = False
+
+def getch():
+ import sys, tty, termios, select
+ global prev_tcgetattr, in_raw_mode
+ fd = sys.stdin.fileno()
+ prev_tcgetattr = termios.tcgetattr(fd)
+ ch = None
+ try:
+ tty.setraw(fd)
+ in_raw_mode = True
+ [i, o, e] = select.select([fd], [], [], 0.05)
+ if i:
+ ch = sys.stdin.read(1)
+ finally:
+ termios.tcsetattr(fd, termios.TCSADRAIN, prev_tcgetattr)
+ in_raw_mode = False
+ return ch
+
+def dpp_tag_read(tag):
+ success = False
+ for record in tag.ndef.records:
+ print(record)
+ print("record type " + record.type)
+ if record.type == "application/vnd.wfa.dpp":
+ summary("DPP HS tag - send to wpa_supplicant")
+ success = dpp_hs_tag_read(record)
+ break
+ if isinstance(record, ndef.UriRecord):
+ print("URI record: uri=" + record.uri)
+ print("URI record: iri=" + record.iri)
+ if record.iri.startswith("DPP:"):
+ print("DPP URI")
+ if not dpp_nfc_uri_process(record.iri):
+ break
+ success = True
+ else:
+ print("Ignore unknown URI")
+ break
+
+ if success:
+ success_report("Tag read succeeded")
+
+ return success
+
+def rdwr_connected_write_tag(tag):
+ summary("Tag found - writing - " + str(tag))
+ if not tag.ndef.is_writeable:
+ print("Not a writable tag")
+ return
+ global dpp_tag_data
+ if tag.ndef.capacity < len(dpp_tag_data):
+ print("Not enough room for the message")
+ return
+ tag.ndef.records = dpp_tag_data
+ success_report("Tag write succeeded")
+ print("Done - remove tag")
+ global only_one
+ if only_one:
+ global continue_loop
+ continue_loop = False
+ global dpp_sel_wait_remove
+ return dpp_sel_wait_remove
+
+def write_nfc_uri(clf, wait_remove=True):
+ print("Write NFC URI record")
+ data = wpas_get_nfc_uri()
+ if data is None:
+ summary("Could not get NFC URI from wpa_supplicant")
+ return
+
+ global dpp_sel_wait_remove
+ dpp_sel_wait_remove = wait_remove
+ print("URI: %s" % data)
+ uri = ndef.UriRecord(data)
+ print(uri)
+
+ print("Touch an NFC tag")
+ global dpp_tag_data
+ dpp_tag_data = [uri]
+ clf.connect(rdwr={'on-connect': rdwr_connected_write_tag})
+
+def write_nfc_hs(clf, wait_remove=True):
+ print("Write NFC Handover Select record on a tag")
+ data = wpas_get_nfc_uri()
+ if data is None:
+ summary("Could not get NFC URI from wpa_supplicant")
+ return
+
+ global dpp_sel_wait_remove
+ dpp_sel_wait_remove = wait_remove
+ print("URI: %s" % data)
+ uri = ndef.UriRecord(data)
+ print(uri)
+ carrier = ndef.Record('application/vnd.wfa.dpp', 'A', uri.data)
+ hs = ndef.HandoverSelectRecord('1.4')
+ hs.add_alternative_carrier('active', carrier.name)
+ print(hs)
+ print(carrier)
+
+ print("Touch an NFC tag")
+ global dpp_tag_data
+ dpp_tag_data = [hs, carrier]
+ print(dpp_tag_data)
+ clf.connect(rdwr={'on-connect': rdwr_connected_write_tag})
+
+def rdwr_connected(tag):
+ global only_one, no_wait
+ summary("Tag connected: " + str(tag))
+
+ if tag.ndef:
+ print("NDEF tag: " + tag.type)
+ print(tag.ndef.records)
+ success = dpp_tag_read(tag)
+ if only_one and success:
+ global continue_loop
+ continue_loop = False
+ else:
+ summary("Not an NDEF tag - remove tag")
+ return True
+
+ return not no_wait
+
+def llcp_worker(llc):
+ global init_on_touch
+ if init_on_touch:
+ print("Starting handover client")
+ dpp_handover_client(llc)
+ print("Exiting llcp_worker thread (init_in_touch)")
+ return
+
+ global no_input
+ if no_input:
+ print("Wait for handover to complete")
+ else:
+ print("Wait for handover to complete - press 'i' to initiate")
+ global srv
+ global wait_connection
+ while not wait_connection and srv.sent_carrier is None:
+ if srv.ho_server_processing:
+ time.sleep(0.025)
+ elif no_input:
+ time.sleep(0.5)
+ else:
+ res = getch()
+ if res != 'i':
+ continue
+ clear_raw_mode()
+ print("Starting handover client")
+ dpp_handover_client(llc)
+ print("Exiting llcp_worker thread (manual init)")
+ return
+
+ clear_raw_mode()
+ print("\rExiting llcp_worker thread")
+
+def llcp_startup(llc):
+ print("Start LLCP server")
+ global srv
+ srv = HandoverServer(llc)
+ return llc
+
+def llcp_connected(llc):
+ print("P2P LLCP connected")
+ global wait_connection
+ wait_connection = False
+ global init_on_touch
+ if not init_on_touch:
+ global srv
+ srv.start()
+ if init_on_touch or not no_input:
+ threading.Thread(target=llcp_worker, args=(llc,)).start()
+ return True
+
+def llcp_release(llc):
+ print("LLCP release")
+ return True
+
+def terminate_loop():
+ global terminate_now
+ return terminate_now
+
+def main():
+ clf = nfc.ContactlessFrontend()
+
+ parser = argparse.ArgumentParser(description='nfcpy to wpa_supplicant integration for DPP NFC operations')
+ parser.add_argument('-d', const=logging.DEBUG, default=logging.INFO,
+ action='store_const', dest='loglevel',
+ help='verbose debug output')
+ parser.add_argument('-q', const=logging.WARNING, action='store_const',
+ dest='loglevel', help='be quiet')
+ parser.add_argument('--only-one', '-1', action='store_true',
+ help='run only one operation and exit')
+ parser.add_argument('--init-on-touch', '-I', action='store_true',
+ help='initiate handover on touch')
+ parser.add_argument('--no-wait', action='store_true',
+ help='do not wait for tag to be removed before exiting')
+ parser.add_argument('--ifname', '-i',
+ help='network interface name')
+ parser.add_argument('--no-input', '-a', action='store_true',
+ help='do not use stdout input to initiate handover')
+ parser.add_argument('--tag-read-only', '-t', action='store_true',
+ help='tag read only (do not allow connection handover)')
+ parser.add_argument('--handover-only', action='store_true',
+ help='connection handover only (do not allow tag read)')
+ parser.add_argument('--summary',
+ help='summary file for writing status updates')
+ parser.add_argument('--success',
+ help='success file for writing success update')
+ parser.add_argument('--device', default='usb', help='NFC device to open')
+ parser.add_argument('command', choices=['write-nfc-uri',
+ 'write-nfc-hs'],
+ nargs='?')
+ args = parser.parse_args()
+ print(args)
+
+ global only_one
+ only_one = args.only_one
+
+ global no_wait
+ no_wait = args.no_wait
+
+ logging.basicConfig(level=args.loglevel)
+
+ global init_on_touch
+ init_on_touch = args.init_on_touch
+
+ if args.ifname:
+ global ifname
+ ifname = args.ifname
+ print("Selected ifname " + ifname)
+
+ if args.summary:
+ global summary_file
+ summary_file = args.summary
+
+ if args.success:
+ global success_file
+ success_file = args.success
+
+ if args.no_input:
+ global no_input
+ no_input = True
+
+ clf = nfc.ContactlessFrontend()
+ global wait_connection
+
+ try:
+ if not clf.open(args.device):
+ print("Could not open connection with an NFC device")
+ raise SystemExit
+
+ if args.command == "write-nfc-uri":
+ write_nfc_uri(clf, wait_remove=not args.no_wait)
+ raise SystemExit
+
+ if args.command == "write-nfc-hs":
+ write_nfc_hs(clf, wait_remove=not args.no_wait)
+ raise SystemExit
+
+ global continue_loop
+ while continue_loop:
+ clear_raw_mode()
+ print("\rWaiting for a tag or peer to be touched")
+ wait_connection = True
+ try:
+ if args.tag_read_only:
+ if not clf.connect(rdwr={'on-connect': rdwr_connected}):
+ break
+ elif args.handover_only:
+ if not clf.connect(llcp={'on-startup': llcp_startup,
+ 'on-connect': llcp_connected,
+ 'on-release': llcp_release},
+ terminate=terminate_loop):
+ break
+ else:
+ if not clf.connect(rdwr={'on-connect': rdwr_connected},
+ llcp={'on-startup': llcp_startup,
+ 'on-connect': llcp_connected,
+ 'on-release': llcp_release},
+ terminate=terminate_loop):
+ break
+ except Exception as e:
+ print("clf.connect failed: " + str(e))
+ break
+
+ global srv
+ if only_one and srv and srv.success:
+ raise SystemExit
+
+ except KeyboardInterrupt:
+ raise SystemExit
+ finally:
+ clf.close()
+
+ raise SystemExit
+
+if __name__ == '__main__':
+ main()