check_dpp_capab(dev[0])
if "FAIL" not in dev[0].request("DPP_CONFIGURATOR_ADD curve=unknown"):
raise Exception("Unexpected success of invalid DPP_CONFIGURATOR_ADD")
+
+def rx_process_frame(dev):
+ msg = dev.mgmt_rx()
+ if "OK" not in dev.request("MGMT_RX_PROCESS freq={} datarate={} ssi_signal={} frame={}".format(msg['freq'], msg['datarate'], msg['ssi_signal'], msg['frame'].encode('hex'))):
+ raise Exception("MGMT_RX_PROCESS failed")
+
+def wait_auth_success(responder, initiator):
+ ev = responder.wait_event(["DPP-AUTH-SUCCESS"], timeout=5)
+ if ev is None:
+ raise Exception("DPP authentication did not succeed (Responder)")
+ ev = initiator.wait_event(["DPP-AUTH-SUCCESS"], timeout=5)
+ if ev is None:
+ raise Exception("DPP authentication did not succeed (Initiator)")
+
+def wait_conf_completion(configurator, enrollee):
+ ev = configurator.wait_event(["DPP-CONF-SENT"], timeout=5)
+ if ev is None:
+ raise Exception("DPP configuration not completed (Configurator)")
+ ev = enrollee.wait_event(["DPP-CONF-RECEIVED", "DPP-CONF-FAILED"],
+ timeout=5)
+ if ev is None:
+ raise Exception("DPP configuration not completed (Enrollee)")
+
+def start_dpp(dev):
+ addr = dev[0].own_addr().replace(':', '')
+ cmd = "DPP_BOOTSTRAP_GEN type=qrcode chan=81/1 mac=" + addr
+ res = dev[0].request(cmd)
+ if "FAIL" in res:
+ raise Exception("Failed to generate bootstrapping info")
+ id0 = int(res)
+ uri0 = dev[0].request("DPP_BOOTSTRAP_GET_URI %d" % id0)
+
+ res = dev[1].request("DPP_QR_CODE " + uri0)
+ if "FAIL" in res:
+ raise Exception("Failed to parse QR Code URI")
+ id1 = int(res)
+
+ conf = '{"wi-fi_tech":"infra", "discovery":{"ssid":"test"},"cred":{"akm":"psk","pass":"secret passphrase"}}' + 3000*' '
+ dev[0].set("dpp_config_obj_override", conf)
+
+ dev[0].set("ext_mgmt_frame_handling", "1")
+ cmd = "DPP_LISTEN 2412"
+ if "OK" not in dev[0].request(cmd):
+ raise Exception("Failed to start listen operation")
+ cmd = "DPP_AUTH_INIT peer=%d role=enrollee" % id1
+ if "OK" not in dev[1].request(cmd):
+ raise Exception("Failed to initiate DPP Authentication")
+
+def test_dpp_gas_timeout_handling(dev, apdev):
+ """DPP and GAS timeout handling"""
+ check_dpp_capab(dev[0])
+ check_dpp_capab(dev[1])
+ start_dpp(dev)
+
+ # DPP Authentication Request
+ rx_process_frame(dev[0])
+
+ # DPP Authentication Confirmation
+ rx_process_frame(dev[0])
+
+ wait_auth_success(dev[0], dev[1])
+
+ # DPP Configuration Request (GAS Initial Request frame)
+ rx_process_frame(dev[0])
+
+ # DPP Configuration Request (GAS Comeback Request frame)
+ rx_process_frame(dev[0])
+
+ # Wait for GAS timeout
+ ev = dev[1].wait_event(["DPP-CONF-FAILED"], timeout=5)
+ if ev is None:
+ raise Exception("DPP configuration not completed (Enrollee)")
+
+def test_dpp_gas_comeback_after_failure(dev, apdev):
+ """DPP and GAS comeback after failure"""
+ check_dpp_capab(dev[0])
+ check_dpp_capab(dev[1])
+ start_dpp(dev)
+
+ # DPP Authentication Request
+ rx_process_frame(dev[0])
+
+ # DPP Authentication Confirmation
+ rx_process_frame(dev[0])
+
+ wait_auth_success(dev[0], dev[1])
+
+ # DPP Configuration Request (GAS Initial Request frame)
+ rx_process_frame(dev[0])
+
+ # DPP Configuration Request (GAS Comeback Request frame)
+ msg = dev[0].mgmt_rx()
+ frame = msg['frame'].encode('hex')
+ with alloc_fail(dev[0], 1, "gas_build_comeback_resp;gas_server_handle_rx_comeback_req"):
+ if "OK" not in dev[0].request("MGMT_RX_PROCESS freq={} datarate={} ssi_signal={} frame={}".format(msg['freq'], msg['datarate'], msg['ssi_signal'], frame)):
+ raise Exception("MGMT_RX_PROCESS failed")
+ wait_fail_trigger(dev[0], "GET_ALLOC_FAIL")
+ # Try the same frame again - this is expected to fail since the response has
+ # already been freed.
+ if "OK" not in dev[0].request("MGMT_RX_PROCESS freq={} datarate={} ssi_signal={} frame={}".format(msg['freq'], msg['datarate'], msg['ssi_signal'], frame)):
+ raise Exception("MGMT_RX_PROCESS failed")
+
+ # DPP Configuration Request (GAS Comeback Request frame retry)
+ msg = dev[0].mgmt_rx()
+
+def test_dpp_gas(dev, apdev):
+ """DPP and GAS protocol testing"""
+ check_dpp_capab(dev[0])
+ check_dpp_capab(dev[1])
+ start_dpp(dev)
+
+ # DPP Authentication Request
+ rx_process_frame(dev[0])
+
+ # DPP Authentication Confirmation
+ rx_process_frame(dev[0])
+
+ wait_auth_success(dev[0], dev[1])
+
+ # DPP Configuration Request (GAS Initial Request frame)
+ msg = dev[0].mgmt_rx()
+
+ # Protected Dual of GAS Initial Request frame (dropped by GAS server)
+ frame = msg['frame'].encode('hex')
+ frame = frame[0:48] + "09" + frame[50:]
+ if "OK" not in dev[0].request("MGMT_RX_PROCESS freq={} datarate={} ssi_signal={} frame={}".format(msg['freq'], msg['datarate'], msg['ssi_signal'], frame)):
+ raise Exception("MGMT_RX_PROCESS failed")
+
+ with alloc_fail(dev[0], 1, "gas_server_send_resp"):
+ frame = msg['frame'].encode('hex')
+ if "OK" not in dev[0].request("MGMT_RX_PROCESS freq={} datarate={} ssi_signal={} frame={}".format(msg['freq'], msg['datarate'], msg['ssi_signal'], frame)):
+ raise Exception("MGMT_RX_PROCESS failed")
+ wait_fail_trigger(dev[0], "GET_ALLOC_FAIL")
+
+ with alloc_fail(dev[0], 1, "gas_build_initial_resp;gas_server_send_resp"):
+ frame = msg['frame'].encode('hex')
+ if "OK" not in dev[0].request("MGMT_RX_PROCESS freq={} datarate={} ssi_signal={} frame={}".format(msg['freq'], msg['datarate'], msg['ssi_signal'], frame)):
+ raise Exception("MGMT_RX_PROCESS failed")
+ wait_fail_trigger(dev[0], "GET_ALLOC_FAIL")
+
+ # Add extra data after Query Request field to trigger
+ # "GAS: Ignored extra data after Query Request field"
+ frame = msg['frame'].encode('hex') + "00"
+ if "OK" not in dev[0].request("MGMT_RX_PROCESS freq={} datarate={} ssi_signal={} frame={}".format(msg['freq'], msg['datarate'], msg['ssi_signal'], frame)):
+ raise Exception("MGMT_RX_PROCESS failed")
+
+ # DPP Configuration Request (GAS Comeback Request frame)
+ rx_process_frame(dev[0])
+
+ # DPP Configuration Request (GAS Comeback Request frame)
+ rx_process_frame(dev[0])
+
+ # DPP Configuration Request (GAS Comeback Request frame)
+ rx_process_frame(dev[0])
+
+ wait_conf_completion(dev[0], dev[1])