import shutil
import struct
import sys
+from test_ap_hs20 import hs20_ap_params
try:
if sys.version_info[0] > 2:
with TestDbusInterworking(bus) as t:
if not t.success():
raise Exception("Expected signals not seen")
+
+def test_dbus_anqp_get(dev, apdev):
+ """D-Bus ANQP get test"""
+ (bus, wpa_obj, path, if_obj) = prepare_dbus(dev[0])
+ iface = dbus.Interface(if_obj, WPAS_DBUS_IFACE)
+
+ bssid = apdev[0]['bssid']
+ params = hs20_ap_params(ssid="test-anqp")
+ params["hessid"] = bssid
+ params['mbo'] = '1'
+ params['mbo_cell_data_conn_pref'] = '1'
+ params['hs20_oper_friendly_name'] = ["eng:Example operator",
+ "fin:Esimerkkioperaattori"]
+ hapd = hostapd.add_ap(apdev[0], params)
+
+ dev[0].scan_for_bss(bssid, freq="2412", force_scan=True)
+ iface.ANQPGet({"addr": bssid,
+ "ids": dbus.Array([257], dbus.Signature("q")),
+ "mbo_ids": dbus.Array([2], dbus.Signature("y")),
+ "hs20_ids": dbus.Array([3, 4], dbus.Signature("y"))})
+
+ ev = dev[0].wait_event(["GAS-QUERY-DONE"], timeout=10)
+ if ev is None:
+ raise Exception("GAS query timed out")
+
+ ev = dev[0].wait_event(["RX-ANQP"], timeout=1)
+ if ev is None or "ANQP Capability list" not in ev:
+ raise Exception("Did not receive Capability list")
+
+ ev = dev[0].wait_event(["RX-HS20-ANQP"], timeout=1)
+ if ev is None or "Operator Friendly Name" not in ev:
+ raise Exception("Did not receive Operator Friendly Name")
+
+ ev = dev[0].wait_event(["RX-MBO-ANQP"], timeout=1)
+ if ev is None or "cell_conn_pref" not in ev:
+ raise Exception("Did not receive MBO Cellular Data Connection Preference")
+
+ bss = dev[0].get_bss(bssid)
+ if 'anqp_capability_list' not in bss:
+ raise Exception("Capability List ANQP-element not seen")
#ifdef CONFIG_INTERWORKING
+
DBusMessage *
wpas_dbus_handler_interworking_select(DBusMessage *message,
struct wpa_supplicant *wpa_s)
return reply;
}
+
+
+DBusMessage *
+wpas_dbus_handler_anqp_get(DBusMessage *message, struct wpa_supplicant *wpa_s)
+{
+ DBusMessageIter iter, iter_dict;
+ struct wpa_dbus_dict_entry entry;
+ int ret;
+ u8 dst_addr[ETH_ALEN];
+ bool is_addr_present = false;
+ unsigned int freq = 0;
+#define MAX_ANQP_INFO_ID 100 /* Max info ID count from CLI implementation */
+ u16 id[MAX_ANQP_INFO_ID];
+ size_t num_id = 0;
+ u32 subtypes = 0;
+ u32 mbo_subtypes = 0;
+ size_t i;
+
+ dbus_message_iter_init(message, &iter);
+
+ if (!wpa_dbus_dict_open_read(&iter, &iter_dict, NULL))
+ return wpas_dbus_error_invalid_args(message, NULL);
+
+ while (wpa_dbus_dict_has_dict_entry(&iter_dict)) {
+ if (!wpa_dbus_dict_get_entry(&iter_dict, &entry))
+ return wpas_dbus_error_invalid_args(message, NULL);
+
+ if (os_strcmp(entry.key, "addr") == 0 &&
+ entry.type == DBUS_TYPE_STRING) {
+ if (hwaddr_aton(entry.str_value, dst_addr)) {
+ wpa_printf(MSG_DEBUG,
+ "%s[dbus]: Invalid address '%s'",
+ __func__, entry.str_value);
+ wpa_dbus_dict_entry_clear(&entry);
+ return wpas_dbus_error_invalid_args(
+ message, "invalid address");
+ }
+
+ is_addr_present = true;
+ } else if (os_strcmp(entry.key, "freq") == 0 &&
+ entry.type == DBUS_TYPE_UINT32) {
+ freq = entry.uint32_value;
+ } else if (os_strcmp(entry.key, "ids") == 0 &&
+ entry.type == DBUS_TYPE_ARRAY &&
+ entry.array_type == DBUS_TYPE_UINT16) {
+ for (i = 0; i < entry.array_len &&
+ num_id < MAX_ANQP_INFO_ID; i++) {
+ id[num_id] = entry.uint16array_value[i];
+ num_id++;
+ }
+ } else if (os_strcmp(entry.key, "hs20_ids") == 0 &&
+ entry.type == DBUS_TYPE_ARRAY &&
+ entry.array_type == DBUS_TYPE_BYTE) {
+ for (i = 0; i < entry.array_len; i++) {
+ int num = entry.bytearray_value[i];
+
+ if (num <= 0 || num > 31) {
+ wpa_dbus_dict_entry_clear(&entry);
+ return wpas_dbus_error_invalid_args(
+ message,
+ "invalid HS20 ANQP id");
+ }
+ subtypes |= BIT(num);
+ }
+ } else if (os_strcmp(entry.key, "mbo_ids") == 0 &&
+ entry.type == DBUS_TYPE_ARRAY &&
+ entry.array_type == DBUS_TYPE_BYTE) {
+ for (i = 0; i < entry.array_len; i++) {
+ int num = entry.bytearray_value[i];
+
+ if (num <= 0 || num > MAX_MBO_ANQP_SUBTYPE) {
+ wpa_dbus_dict_entry_clear(&entry);
+ return wpas_dbus_error_invalid_args(
+ message, "invalid MBO ANQP id");
+ }
+ mbo_subtypes |= BIT(num);
+ }
+ } else {
+ wpa_dbus_dict_entry_clear(&entry);
+ return wpas_dbus_error_invalid_args(
+ message, "unsupported parameter");
+ }
+
+ wpa_dbus_dict_entry_clear(&entry);
+ }
+
+ if (!is_addr_present) {
+ wpa_printf(MSG_DEBUG,
+ "%s[dbus]: address not provided", __func__);
+ return wpas_dbus_error_invalid_args(message,
+ "address not provided");
+ }
+
+ ret = anqp_send_req(wpa_s, dst_addr, freq, id, num_id, subtypes,
+ mbo_subtypes);
+ if (ret < 0) {
+ wpa_printf(MSG_ERROR, "%s[dbus]: failed to send ANQP request",
+ __func__);
+ return wpas_dbus_error_unknown_error(
+ message, "error sending ANQP request");
+ }
+
+ return NULL;
+}
+
#endif /* CONFIG_INTERWORKING */