]> git.ipfire.org Git - thirdparty/hostap.git/commitdiff
tests: Extend DSCP testing coverage
authorJouni Malinen <jouni@codeaurora.org>
Thu, 30 Sep 2021 13:55:01 +0000 (16:55 +0300)
committerJouni Malinen <j@w1.fi>
Thu, 30 Sep 2021 14:53:43 +0000 (17:53 +0300)
Signed-off-by: Jouni Malinen <jouni@codeaurora.org>
tests/hwsim/test_dscp.py

index cc09dc4753efcb59760bb6563e2b92b9631b66eb..e017938bc355d2420008156c2e5590a1b8368958 100644 (file)
@@ -20,15 +20,20 @@ def register_dscp_req(hapd):
     if "OK" not in hapd.request("REGISTER_FRAME %04x %s" % (type, match)):
         raise Exception("Could not register frame reception for Vendor specific protected type")
 
-def send_dscp_req(hapd, da, oui_subtype, dialog_token, req_control, qos_ie):
+def send_dscp_req(hapd, da, oui_subtype, dialog_token, req_control, qos_ie,
+                  truncate=False):
     type = 0
     subtype = 13
     category = 126
     oui_type = 0x506f9a1a
-    req = struct.pack('>BLBBB', category, oui_type, oui_subtype,
-                      dialog_token, req_control)
-    if qos_ie:
-        req += qos_ie
+    if truncate:
+        req = struct.pack('>BLBB', category, oui_type, oui_subtype,
+                          dialog_token)
+    else:
+        req = struct.pack('>BLBBB', category, oui_type, oui_subtype,
+                          dialog_token, req_control)
+        if qos_ie:
+            req += qos_ie
 
     msg = {}
     msg['fc'] = 0x00d0
@@ -44,8 +49,8 @@ def send_dscp_req(hapd, da, oui_subtype, dialog_token, req_control, qos_ie):
     if ev is None or "stype=13 ok=1" not in ev:
         raise Exception("No DSCP Policy Request sent")
 
-def prepare_qos_ie(policy_id, req_type, dscp, start_port, end_port,
-                   frame_classifier, frame_class_len, domain_name):
+def prepare_qos_ie(policy_id, req_type, dscp, start_port=0, end_port=0,
+                   frame_classifier=None, frame_class_len=0, domain_name=None):
     qos_elem_oui_type = 0x229a6f50
     qos_elem_id = 221
 
@@ -93,17 +98,17 @@ def validate_dscp_req_event(dev, event):
     if ev is None:
         raise Exception("No DSCP request reported")
     if ev != event:
-        print("Invalid DSCP event received")
+        raise Exception("Invalid DSCP event received (%s; expected: %s)" % (ev, event))
 
 def handle_dscp_query(hapd, query):
     msg = hapd.mgmt_rx()
     if msg['payload'] != query:
-        print("Invalid DSCP Query received at AP")
+        raise Exception("Invalid DSCP Query received at AP")
 
 def handle_dscp_response(hapd, response):
     msg = hapd.mgmt_rx()
     if msg['payload'] != response:
-        print("Invalid DSCP Response received at AP")
+        raise Exception("Invalid DSCP Response received at AP")
 
 def ap_sta_connectivity(dev, apdev, params):
     p = hostapd.wpa2_params(passphrase="12345678")
@@ -116,6 +121,7 @@ def ap_sta_connectivity(dev, apdev, params):
     dev[0].request("SET enable_dscp_policy_capa 1")
     dev[0].connect("dscp", psk="12345678", ieee80211w="1",
                    key_mgmt="WPA-PSK WPA-PSK-SHA256", scan_freq="2412")
+    hapd.wait_sta()
 
     hapd.dump_monitor()
     hapd.set("ext_mgmt_frame_handling", "1")
@@ -149,6 +155,11 @@ def test_dscp_query(dev, apdev):
     handle_dscp_query(hapd, query)
 
     # Negative tests
+
+    cmd = "DSCP_QUERY domain_name=" + 250*'a' + ".example.com"
+    if "FAIL" not in dev[0].request(cmd):
+        raise Exception("Invalid DSCP_QUERY accepted")
+
     dev[0].disconnect_and_stop_scan()
     # AP without DSCP Capabilities
     params = {"ssid": "dscp",
@@ -279,6 +290,28 @@ def test_dscp_request(dev, apdev):
     event = "<3>CTRL-EVENT-DSCP-POLICY request_end"
     validate_dscp_req_event(dev[0], event)
 
+    # Too short DSCP Policy Request frame
+    dialog_token += 1
+    send_dscp_req(hapd, da, 1, dialog_token, 0, None, truncate=True)
+
+    # Request Type: Remove
+    dialog_token += 1
+    qos_ie = prepare_qos_ie(1, 1, 36)
+    send_dscp_req(hapd, da, 1, dialog_token, 0, qos_ie)
+    validate_dscp_req_event(dev[0], "<3>CTRL-EVENT-DSCP-POLICY request_start")
+    validate_dscp_req_event(dev[0],
+                            "<3>CTRL-EVENT-DSCP-POLICY remove policy_id=1")
+    validate_dscp_req_event(dev[0], "<3>CTRL-EVENT-DSCP-POLICY request_end")
+
+    # Request Type: Reserved
+    dialog_token += 1
+    qos_ie = prepare_qos_ie(1, 2, 36)
+    send_dscp_req(hapd, da, 1, dialog_token, 0, qos_ie)
+    validate_dscp_req_event(dev[0], "<3>CTRL-EVENT-DSCP-POLICY request_start")
+    validate_dscp_req_event(dev[0],
+                            "<3>CTRL-EVENT-DSCP-POLICY reject policy_id=1")
+    validate_dscp_req_event(dev[0], "<3>CTRL-EVENT-DSCP-POLICY request_end")
+
 def test_dscp_response(dev, apdev):
     """DSCP Policy Response"""
 
@@ -333,3 +366,42 @@ def test_dscp_response(dev, apdev):
     cmd = "DSCP_RESP solicited policy_id=1 status=0 policy_id=5 status=0"
     if "FAIL" not in dev[0].request(cmd):
         raise Exception("Able to send invalid DSCP response")
+
+def test_dscp_unsolicited_req_at_assoc(dev, apdev):
+    """DSCP Policy and unsolicited request at association"""
+    params = {"ssid": "dscp",
+              "ext_capa": 6*"00" + "40",
+              "assocresp_elements": "dd06506f9a230103",
+              "vendor_elements": "dd06506f9a230103"}
+    hapd = ap_sta_connectivity(dev, apdev, params)
+    da = dev[0].own_addr()
+
+    dialog_token = 1
+    qos_ie = prepare_qos_ie(1, 0, 36, 12345, 23456)
+    send_dscp_req(hapd, da, 1, dialog_token, 0, qos_ie)
+    validate_dscp_req_event(dev[0], "<3>CTRL-EVENT-DSCP-POLICY request_start")
+    validate_dscp_req_event(dev[0], "<3>CTRL-EVENT-DSCP-POLICY add policy_id=1 dscp=36 ip_version=0 start_port=12345 end_port=23456")
+    validate_dscp_req_event(dev[0], "<3>CTRL-EVENT-DSCP-POLICY request_end")
+
+    cmd = "DSCP_QUERY wildcard"
+    if "OK" not in dev[0].request(cmd):
+        raise Exception("Sending DSCP Query failed")
+
+def test_dscp_missing_unsolicited_req_at_assoc(dev, apdev):
+    """DSCP Policy and missing unsolicited request at association"""
+    params = {"ssid": "dscp",
+              "ext_capa": 6*"00" + "40",
+              "assocresp_elements": "dd06506f9a230103",
+              "vendor_elements": "dd06506f9a230103"}
+    hapd = ap_sta_connectivity(dev, apdev, params)
+    da = dev[0].own_addr()
+
+    cmd = "DSCP_QUERY wildcard"
+    if "FAIL" not in dev[0].request(cmd):
+        raise Exception("DSCP_QUERY accepted during wait for unsolicited requesdt")
+    time.sleep(5)
+    validate_dscp_req_event(dev[0], "<3>CTRL-EVENT-DSCP-POLICY request_wait end")
+
+    cmd = "DSCP_QUERY wildcard"
+    if "OK" not in dev[0].request(cmd):
+        raise Exception("Sending DSCP Query failed")