if ev is None:
raise Exception("Operation timed out")
-def expect_gas_result(dev, result):
+def expect_gas_result(dev, result, status=None):
ev = dev.wait_event(["GAS-QUERY-DONE"], timeout=10)
if ev is None:
raise Exception("GAS query timed out")
if "result=" + result not in ev:
raise Exception("Unexpected GAS query result")
+ if status and "status_code=" + str(status) + ' ' not in ev:
+ raise Exception("Unexpected GAS status code")
def anqp_get(dev, bssid, id):
dev.request("ANQP_GET " + bssid + " " + str(id))
def anqp_adv_proto():
return struct.pack('BBBB', 108, 2, 127, 0)
-def anqp_initial_resp(dialog_token, status_code):
+def anqp_initial_resp(dialog_token, status_code, comeback_delay=0):
return struct.pack('<BBBHH', ACTION_CATEG_PUBLIC, GAS_INITIAL_RESPONSE,
- dialog_token, status_code, 0) + anqp_adv_proto()
-
-def anqp_comeback_resp(dialog_token):
+ dialog_token, status_code, comeback_delay) + anqp_adv_proto()
+
+def anqp_comeback_resp(dialog_token, status_code=0, id=0, more=False, comeback_delay=0, bogus_adv_proto=False):
+ if more:
+ id |= 0x80
+ if bogus_adv_proto:
+ adv = struct.pack('BBBB', 108, 2, 127, 1)
+ else:
+ adv = anqp_adv_proto()
return struct.pack('<BBBHBH', ACTION_CATEG_PUBLIC, GAS_COMEBACK_RESPONSE,
- dialog_token, 0, 0, 0) + anqp_adv_proto()
+ dialog_token, status_code, id, comeback_delay) + adv
def gas_rx(hapd):
count = 0
gas['action'] = action
pos = pos[3:]
- if len(pos) < 1:
+ if len(pos) < 1 and action != GAS_COMEBACK_REQUEST:
return None
gas['dialog_token'] = dialog_token
resp['bssid'] = req['bssid']
return resp
+def send_gas_resp(hapd, resp):
+ hapd.mgmt_tx(resp)
+ ev = hapd.wait_event(["MGMT-TX-STATUS"], timeout=5)
+ if ev is None:
+ raise Exception("Missing TX status for GAS response")
+ if "ok=1" not in ev:
+ raise Exception("GAS response not acknowledged")
+
def test_gas_invalid_response_type(dev, apdev):
"""GAS invalid response type"""
hapd = start_ap(apdev[0])
resp = action_response(query)
# GAS Comeback Response instead of GAS Initial Response
resp['payload'] = anqp_comeback_resp(gas['dialog_token']) + struct.pack('<H', 0)
- hapd.mgmt_tx(resp)
- ev = hapd.wait_event(["MGMT-TX-STATUS"], timeout=5)
- if ev is None:
- raise Exception("Missing TX status for GAS response")
- if "ok=1" not in ev:
- raise Exception("GAS response not acknowledged")
+ send_gas_resp(hapd, resp)
# station drops the invalid frame, so this needs to result in GAS timeout
expect_gas_result(dev[0], "TIMEOUT")
resp = action_response(query)
resp['payload'] = anqp_initial_resp(gas['dialog_token'], 61) + struct.pack('<H', 0)
- hapd.mgmt_tx(resp)
- ev = hapd.wait_event(["MGMT-TX-STATUS"], timeout=5)
- if ev is None:
- raise Exception("Missing TX status for GAS response")
- if "ok=1" not in ev:
- raise Exception("GAS response not acknowledged")
+ send_gas_resp(hapd, resp)
expect_gas_result(dev[0], "FAILURE")
# part of the response is not valid. This is reported as successfulyl
# completed GAS exchange.
expect_gas_result(dev[0], "SUCCESS")
+
+def init_gas(hapd, bssid, dev):
+ anqp_get(dev, bssid, 263)
+ query = gas_rx(hapd)
+ gas = parse_gas(query['payload'])
+ dialog_token = gas['dialog_token']
+
+ resp = action_response(query)
+ resp['payload'] = anqp_initial_resp(dialog_token, 0, comeback_delay=1) + struct.pack('<H', 0)
+ send_gas_resp(hapd, resp)
+
+ query = gas_rx(hapd)
+ gas = parse_gas(query['payload'])
+ if gas['action'] != GAS_COMEBACK_REQUEST:
+ raise Exception("Unexpected request action")
+ if gas['dialog_token'] != dialog_token:
+ raise Exception("Unexpected dialog token change")
+ return query, dialog_token
+
+def test_gas_malformed_comeback_resp(dev, apdev):
+ """GAS malformed comeback response frames"""
+ hapd = start_ap(apdev[0])
+ bssid = apdev[0]['bssid']
+
+ dev[0].scan(freq="2412")
+ hapd.set("ext_mgmt_frame_handling", "1")
+
+ logger.debug("Non-zero status code in comeback response")
+ query, dialog_token = init_gas(hapd, bssid, dev[0])
+ resp = action_response(query)
+ resp['payload'] = anqp_comeback_resp(dialog_token, status_code=2) + struct.pack('<H', 0)
+ send_gas_resp(hapd, resp)
+ expect_gas_result(dev[0], "FAILURE", status=2)
+
+ logger.debug("Different advertisement protocol in comeback response")
+ query, dialog_token = init_gas(hapd, bssid, dev[0])
+ resp = action_response(query)
+ resp['payload'] = anqp_comeback_resp(dialog_token, bogus_adv_proto=True) + struct.pack('<H', 0)
+ send_gas_resp(hapd, resp)
+ expect_gas_result(dev[0], "PEER_ERROR")
+
+ logger.debug("Non-zero frag id and comeback delay in comeback response")
+ query, dialog_token = init_gas(hapd, bssid, dev[0])
+ resp = action_response(query)
+ resp['payload'] = anqp_comeback_resp(dialog_token, id=1, comeback_delay=1) + struct.pack('<H', 0)
+ send_gas_resp(hapd, resp)
+ expect_gas_result(dev[0], "PEER_ERROR")
+
+ logger.debug("Unexpected frag id in comeback response")
+ query, dialog_token = init_gas(hapd, bssid, dev[0])
+ resp = action_response(query)
+ resp['payload'] = anqp_comeback_resp(dialog_token, id=1) + struct.pack('<H', 0)
+ send_gas_resp(hapd, resp)
+ expect_gas_result(dev[0], "PEER_ERROR")
+
+ logger.debug("Empty fragment and replay in comeback response")
+ query, dialog_token = init_gas(hapd, bssid, dev[0])
+ resp = action_response(query)
+ resp['payload'] = anqp_comeback_resp(dialog_token, more=True) + struct.pack('<H', 0)
+ send_gas_resp(hapd, resp)
+ query = gas_rx(hapd)
+ gas = parse_gas(query['payload'])
+ if gas['action'] != GAS_COMEBACK_REQUEST:
+ raise Exception("Unexpected request action")
+ if gas['dialog_token'] != dialog_token:
+ raise Exception("Unexpected dialog token change")
+ resp = action_response(query)
+ resp['payload'] = anqp_comeback_resp(dialog_token) + struct.pack('<H', 0)
+ send_gas_resp(hapd, resp)
+ resp['payload'] = anqp_comeback_resp(dialog_token, id=1) + struct.pack('<H', 0)
+ send_gas_resp(hapd, resp)
+ expect_gas_result(dev[0], "SUCCESS")
+
+ logger.debug("Unexpected initial response when waiting for comeback response")
+ query, dialog_token = init_gas(hapd, bssid, dev[0])
+ resp = action_response(query)
+ resp['payload'] = anqp_initial_resp(dialog_token, 0) + struct.pack('<H', 0)
+ send_gas_resp(hapd, resp)
+ ev = hapd.wait_event(["MGMT-RX"], timeout=1)
+ if ev is not None:
+ raise Exception("Unexpected management frame")
+ expect_gas_result(dev[0], "TIMEOUT")
+
+ logger.debug("Too short comeback response")
+ query, dialog_token = init_gas(hapd, bssid, dev[0])
+ resp = action_response(query)
+ resp['payload'] = struct.pack('<BBBH', ACTION_CATEG_PUBLIC,
+ GAS_COMEBACK_RESPONSE, dialog_token, 0)
+ send_gas_resp(hapd, resp)
+ ev = hapd.wait_event(["MGMT-RX"], timeout=1)
+ if ev is not None:
+ raise Exception("Unexpected management frame")
+ expect_gas_result(dev[0], "TIMEOUT")
+
+ logger.debug("Too short comeback response(2)")
+ query, dialog_token = init_gas(hapd, bssid, dev[0])
+ resp = action_response(query)
+ resp['payload'] = struct.pack('<BBBHBB', ACTION_CATEG_PUBLIC,
+ GAS_COMEBACK_RESPONSE, dialog_token, 0, 0x80,
+ 0)
+ send_gas_resp(hapd, resp)
+ ev = hapd.wait_event(["MGMT-RX"], timeout=1)
+ if ev is not None:
+ raise Exception("Unexpected management frame")
+ expect_gas_result(dev[0], "TIMEOUT")
+
+ logger.debug("Maximum comeback response fragment claiming more fragments")
+ query, dialog_token = init_gas(hapd, bssid, dev[0])
+ resp = action_response(query)
+ resp['payload'] = anqp_comeback_resp(dialog_token, more=True) + struct.pack('<H', 0)
+ send_gas_resp(hapd, resp)
+ for i in range(1, 129):
+ query = gas_rx(hapd)
+ gas = parse_gas(query['payload'])
+ if gas['action'] != GAS_COMEBACK_REQUEST:
+ raise Exception("Unexpected request action")
+ if gas['dialog_token'] != dialog_token:
+ raise Exception("Unexpected dialog token change")
+ resp = action_response(query)
+ resp['payload'] = anqp_comeback_resp(dialog_token, id=i, more=True) + struct.pack('<H', 0)
+ send_gas_resp(hapd, resp)
+ expect_gas_result(dev[0], "PEER_ERROR")
+
+def test_gas_comeback_resp_additional_delay(dev, apdev):
+ """GAS comeback response requesting additional delay"""
+ hapd = start_ap(apdev[0])
+ bssid = apdev[0]['bssid']
+
+ dev[0].scan(freq="2412")
+ hapd.set("ext_mgmt_frame_handling", "1")
+
+ query, dialog_token = init_gas(hapd, bssid, dev[0])
+ for i in range(0, 2):
+ resp = action_response(query)
+ resp['payload'] = anqp_comeback_resp(dialog_token, status_code=95, comeback_delay=50) + struct.pack('<H', 0)
+ send_gas_resp(hapd, resp)
+ query = gas_rx(hapd)
+ gas = parse_gas(query['payload'])
+ if gas['action'] != GAS_COMEBACK_REQUEST:
+ raise Exception("Unexpected request action")
+ if gas['dialog_token'] != dialog_token:
+ raise Exception("Unexpected dialog token change")
+ resp = action_response(query)
+ resp['payload'] = anqp_comeback_resp(dialog_token, status_code=0) + struct.pack('<H', 0)
+ send_gas_resp(hapd, resp)
+ expect_gas_result(dev[0], "SUCCESS")