]> git.ipfire.org Git - thirdparty/hostap.git/blob - wpa_supplicant/examples/dpp-nfc.py
9aaff9247ebb585bdd67c7b3df3c10b82d1e090e
[thirdparty/hostap.git] / wpa_supplicant / examples / dpp-nfc.py
1 #!/usr/bin/python3
2 #
3 # Example nfcpy to wpa_supplicant wrapper for DPP NFC operations
4 # Copyright (c) 2012-2013, Jouni Malinen <j@w1.fi>
5 # Copyright (c) 2019-2020, The Linux Foundation
6 #
7 # This software may be distributed under the terms of the BSD license.
8 # See README for more details.
9
10 import os
11 import sys
12 import time
13 import threading
14 import argparse
15
16 import nfc
17 import ndef
18
19 import logging
20
# Locate the bundled wpaspy module relative to this script file itself.
# Resolving the bare name "dpp-nfc.py" would be relative to the current
# working directory and breaks when the script is started from elsewhere.
scriptsdir = os.path.dirname(os.path.realpath(__file__))
sys.path.append(os.path.join(scriptsdir, '..', '..', 'wpaspy'))
23 import wpaspy
24
# Directory holding the wpa_supplicant control interface sockets and the
# optional interface name used to select one of them.
wpas_ctrl = '/var/run/wpa_supplicant'
ifname = None

# Terminal state used by getch() / clear_raw_mode().
in_raw_mode = False
prev_tcgetattr = 0

# Behavior switches; main() overrides these from the command line.
init_on_touch = False
no_input = False

# State shared between the NFC callbacks and the main loop.
srv = None
continue_loop = True
terminate_now = False

# Optional report files written by summary() and success_report().
summary_file = None
success_file = None
36
def summary(txt):
    """Print a status line and append it to the summary file if one is set."""
    print(txt)
    if not summary_file:
        return
    with open(summary_file, 'a') as out:
        out.write(txt + "\n")
42
def success_report(txt):
    """Report txt through summary() and append it to the success file if set."""
    summary(txt)
    if not success_file:
        return
    with open(success_file, 'a') as out:
        out.write(txt + "\n")
48
def wpas_connect():
    """Open a control connection to wpa_supplicant.

    Every entry of the wpas_ctrl directory is treated as a candidate control
    interface. When the global ifname is set, only candidates whose path
    contains that name are tried.

    Returns:
        A wpaspy.Ctrl instance for the first candidate that could be opened,
        or None if no control interface could be used.
    """
    ifaces = []
    if os.path.isdir(wpas_ctrl):
        try:
            ifaces = [os.path.join(wpas_ctrl, i) for i in os.listdir(wpas_ctrl)]
        except OSError as error:
            print("Could not find wpa_supplicant: ", error)
            return None

    if not ifaces:
        print("No wpa_supplicant control interface found")
        return None

    for ctrl in ifaces:
        if ifname and ifname not in ctrl:
            continue
        print("Trying to use control interface " + ctrl)
        try:
            return wpaspy.Ctrl(ctrl)
        except Exception as e:
            # Best effort: keep trying the remaining candidates, but report
            # the failure instead of hiding it.
            print("Could not use control interface " + ctrl + ": " + str(e))
    return None
73
def dpp_nfc_uri_process(uri):
    """Pass a DPP URI read over NFC to wpa_supplicant and start authentication.

    Returns:
        True if DPP Authentication was initiated, False otherwise.
    """
    ctrl = wpas_connect()
    if ctrl is None:
        return False

    reply = ctrl.request("DPP_NFC_URI " + uri)
    if "FAIL" in reply:
        print("Could not parse DPP URI from NFC URI record")
        return False

    # On success the reply is the numeric identifier of the parsed peer.
    peer = int(reply)
    print("peer_id=%d" % peer)

    if "OK" not in ctrl.request("DPP_AUTH_INIT peer=%d" % peer):
        print("Failed to initiate DPP Authentication")
        return False

    print("DPP Authentication initiated")
    return True
91
def dpp_hs_tag_read(record):
    """Process a DPP carrier record read from a Handover Select tag.

    The record payload is expected to start with a zero URI Identifier Code
    octet followed by the DPP URI.

    Returns:
        True if the URI was accepted and DPP Authentication was initiated,
        False otherwise.
    """
    # Fail early if wpa_supplicant is not reachable; the actual processing
    # in dpp_nfc_uri_process() opens its own connection.
    wpas = wpas_connect()
    if wpas is None:
        return False
    print(record)
    if len(record.data) < 5:
        print("Too short DPP HS")
        return False
    if record.data[0] != 0:
        print("Unexpected URI Identifier Code")
        return False
    uribuf = record.data[1:]
    try:
        uri = uribuf.decode()
    except UnicodeDecodeError:
        # Only a decoding failure means an invalid payload; a bare except
        # would also swallow KeyboardInterrupt and SystemExit.
        print("Invalid URI payload")
        return False
    print("URI: " + uri)
    if not uri.startswith("DPP:"):
        print("Not a DPP URI")
        return False
    return dpp_nfc_uri_process(uri)
114
def get_status(wpas, extra=None):
    """Fetch and parse wpa_supplicant STATUS output.

    Args:
        wpas: Control interface object providing request(cmd).
        extra: Optional suffix; when set, "STATUS-<extra>" is requested.

    Returns:
        Dictionary mapping each "name=value" line to its value. Lines
        without '=' are logged at INFO level and skipped.
    """
    suffix = "-" + extra if extra else ""
    res = wpas.request("STATUS" + suffix)
    vals = dict()
    for l in res.splitlines():
        try:
            [name, value] = l.split('=', 1)
        except ValueError:
            # No module-level logger exists in this file, so fetch one here
            # instead of referencing an undefined name.
            logging.getLogger(__name__).info(
                "Ignore unexpected status line: " + l)
            continue
        vals[name] = value
    return vals
131
def get_status_field(wpas, field, extra=None):
    """Return a single field of the STATUS output, or None if it is absent."""
    return get_status(wpas, extra).get(field)
137
def own_addr(wpas):
    """Return the "address" field of the STATUS output (None if missing)."""
    address = get_status_field(wpas, "address")
    return address
140
def dpp_bootstrap_gen(wpas, type="qrcode", chan=None, mac=None, info=None,
                      curve=None, key=None):
    """Issue DPP_BOOTSTRAP_GEN and return the integer from the response.

    Args:
        wpas: Control interface object providing request(cmd).
        type: Value for the "type=" argument.
        chan: Optional value for "chan=".
        mac: Optional address for "mac=" (colons are removed). Passing True
            uses the address returned by own_addr().
        info: Optional value for "info=".
        curve: Optional value for "curve=".
        key: Optional value for "key=".

    Raises:
        Exception: If the response contains "FAIL".
    """
    parts = ["DPP_BOOTSTRAP_GEN type=" + type]
    if chan:
        parts.append("chan=" + chan)
    if mac:
        addr = own_addr(wpas) if mac is True else mac
        parts.append("mac=" + addr.replace(':', ''))
    for label, value in (("info", info), ("curve", curve), ("key", key)):
        if value:
            parts.append(label + "=" + value)

    reply = wpas.request(" ".join(parts))
    if "FAIL" in reply:
        raise Exception("Failed to generate bootstrapping info")
    return int(reply)
160
def wpas_get_nfc_uri(start_listen=True):
    """Generate NFC URI bootstrapping info and return its URI.

    The generated bootstrapping identifier is stored in the global own_id.

    Args:
        start_listen: When true, also issue
            "DPP_LISTEN 2412 netrole=configurator".

    Returns:
        The URI string, or None if wpa_supplicant is unreachable or the URI
        could not be fetched.
    """
    global own_id
    ctrl = wpas_connect()
    if ctrl is None:
        return None

    own_id = dpp_bootstrap_gen(ctrl, type="nfc-uri", chan="81/1", mac=True)
    uri = ctrl.request("DPP_BOOTSTRAP_GET_URI %d" % own_id).rstrip()
    if "FAIL" in uri:
        return None

    if start_listen:
        ctrl.request("DPP_LISTEN 2412 netrole=configurator")
    return uri
173
def wpas_report_handover_req(uri):
    """Send DPP_NFC_HANDOVER_REQ with the given URI to wpa_supplicant.

    Returns:
        The response string, or None if wpa_supplicant is unreachable.
    """
    global own_id
    ctrl = wpas_connect()
    if ctrl is None:
        return None
    return ctrl.request("DPP_NFC_HANDOVER_REQ own=%d uri=%s" % (own_id, uri))
181
def wpas_report_handover_sel(uri):
    """Send DPP_NFC_HANDOVER_SEL with the given URI to wpa_supplicant.

    Returns:
        The response string, or None if wpa_supplicant is unreachable.
    """
    global own_id
    ctrl = wpas_connect()
    if ctrl is None:
        return None
    return ctrl.request("DPP_NFC_HANDOVER_SEL own=%d uri=%s" % (own_id, uri))
189
def dpp_handover_client(llc):
    """Run the initiator side of an NFC connection handover for DPP.

    Builds a Handover Request carrying the local DPP URI, sends it to the
    peer, reports a DPP carrier from the Handover Select response to
    wpa_supplicant and then initiates DPP authentication.

    On completion the global continue_loop is cleared when only_one is set
    and terminate_now is set when no_wait is set. These flags are not
    updated on the early-return error paths.
    """
    global own_id, only_one, continue_loop, no_wait, terminate_now

    uri = wpas_get_nfc_uri(start_listen=False)
    if uri is None:
        # Without a local URI there is nothing to offer in the request.
        summary("Could not get NFC URI from wpa_supplicant")
        return
    uri = ndef.UriRecord(uri)
    print("NFC URI record for DPP: " + str(uri))
    carrier = ndef.Record('application/vnd.wfa.dpp', 'A', uri.data)
    hr = ndef.HandoverRequestRecord(version="1.4", crn=os.urandom(2))
    hr.add_alternative_carrier('active', carrier.name)
    message = [hr, carrier]
    print("NFC Handover Request message for DPP: " + str(message))

    client = nfc.handover.HandoverClient(llc)
    try:
        summary("Trying to initiate NFC connection handover")
        client.connect()
        summary("Connected for handover")
    except nfc.llcp.ConnectRefused:
        summary("Handover connection refused")
        client.close()
        return
    except Exception as e:
        summary("Other exception: " + str(e))
        client.close()
        return

    summary("Sending handover request")

    if not client.send_records(message):
        summary("Failed to send handover request")
        client.close()
        return

    summary("Receiving handover response")
    message = client.recv_records(timeout=3.0)
    if message is None:
        summary("No response received")
        client.close()
        return
    print("Received message: " + str(message))
    if len(message) < 1 or \
       not isinstance(message[0], ndef.HandoverSelectRecord):
        # message is a list of records, so describe its first record (if
        # any) rather than reading a non-existent attribute of the list.
        received = message[0].type if message else "no records"
        summary("Response was not Hs - received: " + received)
        client.close()
        return

    print("Received message")
    print("alternative carriers: " + str(message[0].alternative_carriers))

    for carrier in message:
        if isinstance(carrier, ndef.HandoverSelectRecord):
            continue
        print("Remote carrier type: " + carrier.type)
        if carrier.type == "application/vnd.wfa.dpp":
            if len(carrier.data) == 0 or carrier.data[0] != 0:
                print("URI Identifier Code 'None' not seen")
                continue
            print("DPP carrier type match - send to wpa_supplicant")
            uri = carrier.data[1:].decode("utf-8")
            print("DPP URI: " + uri)
            res = wpas_report_handover_sel(uri)
            if res is None or "FAIL" in res:
                summary("DPP handover report rejected")
                break

            success_report("DPP handover reported successfully (initiator)")
            print("peer_id=" + res)
            peer_id = int(res)
            # TODO: Single Configurator instance
            wpas = wpas_connect()
            if wpas is None:
                break
            res = wpas.request("DPP_CONFIGURATOR_ADD")
            if "FAIL" in res:
                print("Failed to initiate Configurator")
                break
            conf_id = int(res)
            print("Initiate DPP authentication")
            cmd = "DPP_AUTH_INIT peer=%d own=%d conf=sta-dpp configurator=%d" % (peer_id, own_id, conf_id)
            res = wpas.request(cmd)
            if "FAIL" in res:
                print("Failed to initiate DPP authentication")
            break

    print("Remove peer")
    client.close()
    print("Done with handover")
    if only_one:
        print("only_one -> stop loop")
        continue_loop = False

    if no_wait:
        print("Trying to exit..")
        terminate_now = True

    print("Returning from dpp_handover_client")
289
class HandoverServer(nfc.handover.HandoverServer):
    """Handover server that answers DPP Handover Request messages."""

    def __init__(self, llc):
        super().__init__(llc)
        self.sent_carrier = None
        self.ho_server_processing = False
        self.success = False

    def process_handover_request_message(self, records):
        """Build the Handover Select message for a received request.

        The first usable DPP carrier in records is reported to
        wpa_supplicant and answered with the local DPP URI as a carrier.

        Returns:
            List of records to send: the Handover Select record, followed by
            the own DPP carrier record when one could be prepared.
        """
        global own_id
        self.ho_server_processing = True
        clear_raw_mode()
        print("\nHandoverServer - request received: " + str(records))

        hs = ndef.HandoverSelectRecord('1.4')
        sel = [hs]

        for rec in records:
            if isinstance(rec, ndef.HandoverRequestRecord):
                continue
            print("Remote carrier type: " + rec.type)
            if rec.type != "application/vnd.wfa.dpp":
                continue

            print("DPP carrier type match - add DPP carrier record")
            if len(rec.data) == 0 or rec.data[0] != 0:
                print("URI Identifier Code 'None' not seen")
                continue
            peer_uri = rec.data[1:].decode("utf-8")
            print("Received DPP URI: " + peer_uri)

            own_uri = wpas_get_nfc_uri(start_listen=False)
            print("Own URI (pre-processing): %s" % own_uri)

            report = wpas_report_handover_req(peer_uri)
            if report is None or "FAIL" in report:
                print("DPP handover request processing failed")
                continue

            self.received_carrier = rec

            wpas = wpas_connect()
            if wpas is None:
                continue
            # Fetch the URI again after the handover request was reported.
            own_uri = wpas.request("DPP_BOOTSTRAP_GET_URI %d" % own_id).rstrip()
            if "FAIL" in own_uri:
                continue
            print("Own URI (post-processing): %s" % own_uri)
            uri_record = ndef.UriRecord(own_uri)
            print("Own bootstrapping NFC URI record: " + str(uri_record))

            info = wpas.request("DPP_BOOTSTRAP_INFO %d" % own_id)
            freqs = [int(line.split('=')[1]) for line in info.splitlines()
                     if line.startswith("use_freq=")]
            if freqs:
                # The last use_freq line wins.
                freq = freqs[-1]
            else:
                print("No channel negotiated over NFC - use channel 1")
                freq = 2412
            if "OK" not in wpas.request("DPP_LISTEN %d" % freq):
                print("Failed to start DPP listen")
                break

            own_carrier = ndef.Record('application/vnd.wfa.dpp', 'A',
                                      uri_record.data)
            print("Own DPP carrier record: " + str(own_carrier))
            hs.add_alternative_carrier('active', own_carrier.name)
            sel = [hs, own_carrier]
            break

        summary("Sending handover select: " + str(sel))
        self.success = True
        return sel
364
def clear_raw_mode():
    """Restore the saved terminal attributes if stdin is marked as raw."""
    import sys
    import termios
    global prev_tcgetattr, in_raw_mode
    if in_raw_mode:
        termios.tcsetattr(sys.stdin.fileno(), termios.TCSADRAIN,
                          prev_tcgetattr)
        in_raw_mode = False
373
def getch():
    """Poll stdin for one character while the terminal is in raw mode.

    Waits up to 0.05 seconds for input. The previous terminal attributes are
    saved in the global prev_tcgetattr and always restored before returning.

    Returns:
        The character read, or None if no input was available.
    """
    import select
    import sys
    import termios
    import tty
    global prev_tcgetattr, in_raw_mode
    stdin_fd = sys.stdin.fileno()
    prev_tcgetattr = termios.tcgetattr(stdin_fd)
    key = None
    try:
        tty.setraw(stdin_fd)
        in_raw_mode = True
        readable, _, _ = select.select([stdin_fd], [], [], 0.05)
        if readable:
            key = sys.stdin.read(1)
    finally:
        termios.tcsetattr(stdin_fd, termios.TCSADRAIN, prev_tcgetattr)
        in_raw_mode = False
    return key
390
def dpp_tag_read(tag):
    """Process the NDEF records of a tag, looking for DPP information.

    Processing stops at the first DPP carrier record or URI record.

    Returns:
        True if a DPP URI was handed to wpa_supplicant successfully.
    """
    ok = False
    for rec in tag.ndef.records:
        print(rec)
        print("record type " + rec.type)

        if rec.type == "application/vnd.wfa.dpp":
            summary("DPP HS tag - send to wpa_supplicant")
            ok = dpp_hs_tag_read(rec)
            break

        if not isinstance(rec, ndef.UriRecord):
            continue

        print("URI record: uri=" + rec.uri)
        print("URI record: iri=" + rec.iri)
        if not rec.iri.startswith("DPP:"):
            print("Ignore unknown URI")
        else:
            print("DPP URI")
            if dpp_nfc_uri_process(rec.iri):
                ok = True
        # Only the first URI record is considered.
        break

    if ok:
        success_report("Tag read succeeded")

    return ok
416
def rdwr_connected_write_tag(tag):
    """on-connect callback that writes the prepared records to a tag.

    The records come from the global dpp_tag_data. After a successful write
    the global continue_loop is cleared when only_one is set.

    Returns:
        The global dpp_sel_wait_remove after a successful write, or None if
        the tag could not be written.
    """
    global dpp_tag_data, only_one, continue_loop, dpp_sel_wait_remove
    summary("Tag found - writing - " + str(tag))
    if tag.ndef is None:
        # Tags without NDEF data expose no ndef object to write through.
        print("Not an NDEF tag")
        return
    if not tag.ndef.is_writeable:
        print("Not a writable tag")
        return
    # The capacity is in bytes, so compare it with the encoded message size
    # rather than with the number of records in the list.
    encoded_len = len(b''.join(ndef.message_encoder(dpp_tag_data)))
    if tag.ndef.capacity < encoded_len:
        print("Not enough room for the message")
        return
    tag.ndef.records = dpp_tag_data
    success_report("Tag write succeeded")
    print("Done - remove tag")
    if only_one:
        continue_loop = False
    return dpp_sel_wait_remove
435
def write_nfc_uri(clf, wait_remove=True):
    """Write the local DPP URI as a single URI record to the next tag.

    Args:
        clf: Contactless frontend used to wait for the tag.
        wait_remove: Stored in the global dpp_sel_wait_remove, which the
            write callback returns after a successful write.
    """
    global dpp_sel_wait_remove, dpp_tag_data
    print("Write NFC URI record")
    own_uri = wpas_get_nfc_uri()
    if own_uri is None:
        summary("Could not get NFC URI from wpa_supplicant")
        return

    dpp_sel_wait_remove = wait_remove
    print("URI: %s" % own_uri)
    uri_record = ndef.UriRecord(own_uri)
    print(uri_record)

    print("Touch an NFC tag")
    dpp_tag_data = [uri_record]
    callbacks = {'on-connect': rdwr_connected_write_tag}
    clf.connect(rdwr=callbacks)
453
def write_nfc_hs(clf, wait_remove=True):
    """Write a Handover Select message with a DPP carrier to the next tag.

    Args:
        clf: Contactless frontend used to wait for the tag.
        wait_remove: Stored in the global dpp_sel_wait_remove, which the
            write callback returns after a successful write.
    """
    global dpp_sel_wait_remove, dpp_tag_data
    print("Write NFC Handover Select record on a tag")
    own_uri = wpas_get_nfc_uri()
    if own_uri is None:
        summary("Could not get NFC URI from wpa_supplicant")
        return

    dpp_sel_wait_remove = wait_remove
    print("URI: %s" % own_uri)
    uri_record = ndef.UriRecord(own_uri)
    print(uri_record)

    # Wrap the URI payload in a DPP carrier record referenced from the
    # Handover Select record.
    dpp_carrier = ndef.Record('application/vnd.wfa.dpp', 'A', uri_record.data)
    select_record = ndef.HandoverSelectRecord('1.4')
    select_record.add_alternative_carrier('active', dpp_carrier.name)
    print(select_record)
    print(dpp_carrier)

    print("Touch an NFC tag")
    dpp_tag_data = [select_record, dpp_carrier]
    print(dpp_tag_data)
    callbacks = {'on-connect': rdwr_connected_write_tag}
    clf.connect(rdwr=callbacks)
477
def rdwr_connected(tag):
    """on-connect callback for tag reading.

    Clears the global continue_loop when only_one is set and the tag was
    read successfully.

    Returns:
        True for a tag without NDEF data, otherwise the negation of the
        global no_wait.
    """
    global only_one, no_wait, continue_loop
    summary("Tag connected: " + str(tag))

    if not tag.ndef:
        summary("Not an NDEF tag - remove tag")
        return True

    print("NDEF tag: " + tag.type)
    print(tag.ndef.records)
    read_ok = dpp_tag_read(tag)
    if only_one and read_ok:
        continue_loop = False

    return not no_wait
494
def llcp_worker(llc):
    """Thread body that decides when to start the handover client.

    With init_on_touch set the client is started immediately. Otherwise the
    loop runs until wait_connection is set or srv.sent_carrier is assigned,
    starting the client if the user presses 'i' (unless no_input is set).
    """
    global init_on_touch, no_input, srv, wait_connection

    if init_on_touch:
        print("Starting handover client")
        dpp_handover_client(llc)
        print("Exiting llcp_worker thread (init_in_touch)")
        return

    if not no_input:
        print("Wait for handover to complete - press 'i' to initiate")
    else:
        print("Wait for handover to complete")

    while True:
        if wait_connection or srv.sent_carrier is not None:
            break

        if srv.ho_server_processing:
            # Stay off the terminal while the server handles a request.
            time.sleep(0.025)
            continue
        if no_input:
            time.sleep(0.5)
            continue

        if getch() == 'i':
            clear_raw_mode()
            print("Starting handover client")
            dpp_handover_client(llc)
            print("Exiting llcp_worker thread (manual init)")
            return

    clear_raw_mode()
    print("\rExiting llcp_worker thread")
527
def llcp_startup(llc):
    """on-startup callback: create the handover server and return llc."""
    global srv
    print("Start LLCP server")
    srv = HandoverServer(llc)
    return llc
533
def llcp_connected(llc):
    """on-connect callback for an LLCP link.

    Clears the global wait_connection, starts the handover server unless
    init_on_touch is set, and starts the llcp_worker thread when
    init_on_touch is set or keyboard input is allowed. Always returns True.
    """
    global wait_connection, init_on_touch, srv
    print("P2P LLCP connected")
    wait_connection = False
    if not init_on_touch:
        srv.start()
    if init_on_touch or not no_input:
        worker = threading.Thread(target=llcp_worker, args=(llc,))
        worker.start()
    return True
545
def llcp_release(llc):
    """on-release callback: log the release and return True."""
    print("LLCP release")
    released = True
    return released
549
def terminate_loop():
    """Return the global terminate_now flag (passed to clf.connect() as terminate)."""
    return terminate_now
553
def main():
    """Parse the command line and run the requested DPP NFC operation.

    With the write-nfc-uri or write-nfc-hs command a single tag is written.
    Otherwise the function loops waiting for a tag and/or a peer device
    until the global continue_loop is cleared or clf.connect() returns a
    false value or raises.

    Raises:
        SystemExit: Always; the NFC device is closed before exiting.
    """
    global only_one, no_wait, init_on_touch, ifname
    global summary_file, success_file, no_input
    global wait_connection, continue_loop, srv

    parser = argparse.ArgumentParser(description='nfcpy to wpa_supplicant integration for DPP NFC operations')
    parser.add_argument('-d', const=logging.DEBUG, default=logging.INFO,
                        action='store_const', dest='loglevel',
                        help='verbose debug output')
    parser.add_argument('-q', const=logging.WARNING, action='store_const',
                        dest='loglevel', help='be quiet')
    parser.add_argument('--only-one', '-1', action='store_true',
                        help='run only one operation and exit')
    parser.add_argument('--init-on-touch', '-I', action='store_true',
                        help='initiate handover on touch')
    parser.add_argument('--no-wait', action='store_true',
                        help='do not wait for tag to be removed before exiting')
    parser.add_argument('--ifname', '-i',
                        help='network interface name')
    parser.add_argument('--no-input', '-a', action='store_true',
                        help='do not use stdin input to initiate handover')
    parser.add_argument('--tag-read-only', '-t', action='store_true',
                        help='tag read only (do not allow connection handover)')
    parser.add_argument('--handover-only', action='store_true',
                        help='connection handover only (do not allow tag read)')
    parser.add_argument('--summary',
                        help='summary file for writing status updates')
    parser.add_argument('--success',
                        help='success file for writing success update')
    parser.add_argument('--device', default='usb', help='NFC device to open')
    parser.add_argument('command', choices=['write-nfc-uri',
                                            'write-nfc-hs'],
                        nargs='?')
    args = parser.parse_args()
    print(args)

    only_one = args.only_one
    no_wait = args.no_wait

    logging.basicConfig(level=args.loglevel)

    init_on_touch = args.init_on_touch

    if args.ifname:
        ifname = args.ifname
        print("Selected ifname " + ifname)

    if args.summary:
        summary_file = args.summary

    if args.success:
        success_file = args.success

    if args.no_input:
        no_input = True

    # Create the frontend once, after argument parsing, so that usage errors
    # and --help do not construct an unused instance.
    clf = nfc.ContactlessFrontend()

    llcp_callbacks = {'on-startup': llcp_startup,
                      'on-connect': llcp_connected,
                      'on-release': llcp_release}

    try:
        if not clf.open(args.device):
            print("Could not open connection with an NFC device")
            raise SystemExit

        if args.command == "write-nfc-uri":
            write_nfc_uri(clf, wait_remove=not args.no_wait)
            raise SystemExit

        if args.command == "write-nfc-hs":
            write_nfc_hs(clf, wait_remove=not args.no_wait)
            raise SystemExit

        while continue_loop:
            clear_raw_mode()
            print("\rWaiting for a tag or peer to be touched")
            wait_connection = True
            try:
                if args.tag_read_only:
                    if not clf.connect(rdwr={'on-connect': rdwr_connected}):
                        break
                elif args.handover_only:
                    if not clf.connect(llcp=llcp_callbacks,
                                       terminate=terminate_loop):
                        break
                else:
                    if not clf.connect(rdwr={'on-connect': rdwr_connected},
                                       llcp=llcp_callbacks,
                                       terminate=terminate_loop):
                        break
            except Exception as e:
                print("clf.connect failed: " + str(e))
                break

            if only_one and srv and srv.success:
                raise SystemExit

    except KeyboardInterrupt:
        raise SystemExit
    finally:
        clf.close()

    raise SystemExit
668
# Run the command line tool only when executed as a script.
if __name__ == "__main__":
    main()