logger = logging.getLogger()
import re
import socket
+import httplib
+import urlparse
+import urllib
+import xml.etree.ElementTree as ET
+import StringIO
import hwsim_utils
import hostapd
'MAN: "ssdp:discover"',
'ST: ' + st,
'', ''])
- ssdp_send(msg)
+ return ssdp_send(msg)
def test_ap_wps_ssdp_msearch(dev, apdev):
"""WPS AP and SSDP M-SEARCH messages"""
break
except socket.timeout:
raise Exception("No SSDP response")
+
+def ssdp_get_location(uuid):
+ res = ssdp_send_msearch("uuid:" + uuid)
+ location = None
+ for l in res.splitlines():
+ if l.lower().startswith("location:"):
+ location = l.split(':', 1)[1].strip()
+ break
+ if location is None:
+ raise Exception("No UPnP location found")
+ return location
+
+def upnp_get_urls(location):
+ conn = urllib.urlopen(location)
+ tree = ET.parse(conn)
+ root = tree.getroot()
+ urn = '{urn:schemas-upnp-org:device-1-0}'
+ service = root.find("./" + urn + "device/" + urn + "serviceList/" + urn + "service")
+ res = {}
+ res['scpd_url'] = urlparse.urljoin(location, service.find(urn + 'SCPDURL').text)
+ res['control_url'] = urlparse.urljoin(location, service.find(urn + 'controlURL').text)
+ res['event_sub_url'] = urlparse.urljoin(location, service.find(urn + 'eventSubURL').text)
+ return res
+
+def upnp_soap_action(conn, path, action, include_soap_action=True, soap_action_override=None):
+ soapns = 'http://schemas.xmlsoap.org/soap/envelope/'
+ wpsns = 'urn:schemas-wifialliance-org:service:WFAWLANConfig:1'
+ ET.register_namespace('soapenv', soapns)
+ ET.register_namespace('wfa', wpsns)
+ attrib = {}
+ attrib['{%s}encodingStyle' % soapns] = 'http://schemas.xmlsoap.org/soap/encoding/'
+ root = ET.Element("{%s}Envelope" % soapns, attrib=attrib)
+ body = ET.SubElement(root, "{%s}Body" % soapns)
+ act = ET.SubElement(body, "{%s}%s" % (wpsns, action))
+ tree = ET.ElementTree(root)
+ soap = StringIO.StringIO()
+ tree.write(soap, xml_declaration=True, encoding='utf-8')
+
+ headers = { "Content-type": 'text/xml; charset="utf-8"' }
+ if include_soap_action:
+ headers["SOAPAction"] = '"urn:schemas-wifialliance-org:service:WFAWLANConfig:1#%s"' % action
+ elif soap_action_override:
+ headers["SOAPAction"] = soap_action_override
+ conn.request("POST", path, soap.getvalue(), headers)
+ return conn.getresponse()
+
+def test_ap_wps_upnp(dev, apdev):
+ """WPS AP and UPnP operations"""
+ ap_uuid = "27ea801a-9e5c-4e73-bd82-f89cbcd10d7e"
+ add_ssdp_ap(apdev[0]['ifname'], ap_uuid)
+
+ location = ssdp_get_location(ap_uuid)
+ urls = upnp_get_urls(location)
+
+ conn = urllib.urlopen(urls['scpd_url'])
+ scpd = conn.read()
+
+ conn = urllib.urlopen(urlparse.urljoin(location, "unknown.html"))
+ if conn.getcode() != 404:
+ raise Exception("Unexpected HTTP response to GET unknown URL")
+
+ url = urlparse.urlparse(location)
+ conn = httplib.HTTPConnection(url.netloc)
+ #conn.set_debuglevel(1)
+ headers = { "Content-type": 'text/xml; charset="utf-8"',
+ "SOAPAction": '"urn:schemas-wifialliance-org:service:WFAWLANConfig:1#GetDeviceInfo"' }
+ conn.request("POST", "hello", "\r\n\r\n", headers)
+ resp = conn.getresponse()
+ if resp.status != 404:
+ raise Exception("Unexpected HTTP response: %s" % resp.status)
+
+ conn.request("UNKNOWN", "hello", "\r\n\r\n", headers)
+ resp = conn.getresponse()
+ if resp.status != 501:
+ raise Exception("Unexpected HTTP response: %s" % resp.status)
+
+ headers = { "Content-type": 'text/xml; charset="utf-8"',
+ "SOAPAction": '"urn:some-unknown-action#GetDeviceInfo"' }
+ ctrlurl = urlparse.urlparse(urls['control_url'])
+ conn.request("POST", ctrlurl.path, "\r\n\r\n", headers)
+ resp = conn.getresponse()
+ if resp.status != 401:
+ raise Exception("Unexpected HTTP response: %s" % resp.status)
+
+ logger.debug("GetDeviceInfo without SOAPAction header")
+ resp = upnp_soap_action(conn, ctrlurl.path, "GetDeviceInfo",
+ include_soap_action=False)
+ if resp.status != 401:
+ raise Exception("Unexpected HTTP response: %s" % resp.status)
+
+ logger.debug("GetDeviceInfo with invalid SOAPAction header")
+ for act in [ "foo",
+ "urn:schemas-wifialliance-org:service:WFAWLANConfig:1#GetDeviceInfo",
+ '"urn:schemas-wifialliance-org:service:WFAWLANConfig:1"',
+ '"urn:schemas-wifialliance-org:service:WFAWLANConfig:123#GetDevice']:
+ resp = upnp_soap_action(conn, ctrlurl.path, "GetDeviceInfo",
+ include_soap_action=False,
+ soap_action_override=act)
+ if resp.status != 401:
+ raise Exception("Unexpected HTTP response: %s" % resp.status)
+
+ resp = upnp_soap_action(conn, ctrlurl.path, "GetDeviceInfo")
+ if resp.status != 200:
+ raise Exception("Unexpected HTTP response: %s" % resp.status)
+ dev = resp.read()
+ if "NewDeviceInfo" not in dev:
+ raise Exception("Unexpected GetDeviceInfo response")
+
+ logger.debug("PutMessage without required parameters")
+ resp = upnp_soap_action(conn, ctrlurl.path, "PutMessage")
+ if resp.status != 600:
+ raise Exception("Unexpected HTTP response: %s" % resp.status)
+
+ logger.debug("PutWLANResponse without required parameters")
+ resp = upnp_soap_action(conn, ctrlurl.path, "PutWLANResponse")
+ if resp.status != 600:
+ raise Exception("Unexpected HTTP response: %s" % resp.status)
+
+ logger.debug("SetSelectedRegistrar from unregistered ER")
+ resp = upnp_soap_action(conn, ctrlurl.path, "SetSelectedRegistrar")
+ if resp.status != 501:
+ raise Exception("Unexpected HTTP response: %s" % resp.status)
+
+ logger.debug("Unknown action")
+ resp = upnp_soap_action(conn, ctrlurl.path, "Unknown")
+ if resp.status != 401:
+ raise Exception("Unexpected HTTP response: %s" % resp.status)
+
+def test_ap_wps_upnp_subscribe(dev, apdev):
+ """WPS AP and UPnP event subscription"""
+ ap_uuid = "27ea801a-9e5c-4e73-bd82-f89cbcd10d7e"
+ add_ssdp_ap(apdev[0]['ifname'], ap_uuid)
+
+ location = ssdp_get_location(ap_uuid)
+ urls = upnp_get_urls(location)
+ eventurl = urlparse.urlparse(urls['event_sub_url'])
+
+ url = urlparse.urlparse(location)
+ conn = httplib.HTTPConnection(url.netloc)
+ #conn.set_debuglevel(1)
+ headers = { "callback": '<http://127.0.0.1:12345/event>',
+ "timeout": "Second-1234" }
+ conn.request("SUBSCRIBE", "hello", "\r\n\r\n", headers)
+ resp = conn.getresponse()
+ if resp.status != 412:
+ raise Exception("Unexpected HTTP response: %s" % resp.status)
+
+ conn.request("SUBSCRIBE", eventurl.path, "\r\n\r\n", headers)
+ resp = conn.getresponse()
+ if resp.status != 412:
+ raise Exception("Unexpected HTTP response: %s" % resp.status)
+
+ headers = { "NT": "upnp:event",
+ "timeout": "Second-1234" }
+ conn.request("SUBSCRIBE", eventurl.path, "\r\n\r\n", headers)
+ resp = conn.getresponse()
+ if resp.status != 412:
+ raise Exception("Unexpected HTTP response: %s" % resp.status)
+
+ headers = { "callback": '<http://127.0.0.1:12345/event>',
+ "NT": "upnp:foobar",
+ "timeout": "Second-1234" }
+ conn.request("SUBSCRIBE", eventurl.path, "\r\n\r\n", headers)
+ resp = conn.getresponse()
+ if resp.status != 400:
+ raise Exception("Unexpected HTTP response: %s" % resp.status)
+
+ logger.debug("Valid subscription")
+ headers = { "callback": '<http://127.0.0.1:12345/event>',
+ "NT": "upnp:event",
+ "timeout": "Second-1234" }
+ conn.request("SUBSCRIBE", eventurl.path, "\r\n\r\n", headers)
+ resp = conn.getresponse()
+ if resp.status != 200:
+ raise Exception("Unexpected HTTP response: %s" % resp.status)
+ sid = resp.getheader("sid")
+ logger.debug("Subscription SID " + sid)
+
+ logger.debug("Invalid re-subscription")
+ headers = { "NT": "upnp:event",
+ "sid": "123456734567854",
+ "timeout": "Second-1234" }
+ conn.request("SUBSCRIBE", eventurl.path, "\r\n\r\n", headers)
+ resp = conn.getresponse()
+ if resp.status != 400:
+ raise Exception("Unexpected HTTP response: %s" % resp.status)
+
+ logger.debug("Invalid re-subscription")
+ headers = { "NT": "upnp:event",
+ "sid": "uuid:123456734567854",
+ "timeout": "Second-1234" }
+ conn.request("SUBSCRIBE", eventurl.path, "\r\n\r\n", headers)
+ resp = conn.getresponse()
+ if resp.status != 400:
+ raise Exception("Unexpected HTTP response: %s" % resp.status)
+
+ logger.debug("Invalid re-subscription")
+ headers = { "callback": '<http://127.0.0.1:12345/event>',
+ "NT": "upnp:event",
+ "sid": sid,
+ "timeout": "Second-1234" }
+ conn.request("SUBSCRIBE", eventurl.path, "\r\n\r\n", headers)
+ resp = conn.getresponse()
+ if resp.status != 400:
+ raise Exception("Unexpected HTTP response: %s" % resp.status)
+
+ logger.debug("SID mismatch in re-subscription")
+ headers = { "NT": "upnp:event",
+ "sid": "uuid:4c2bca79-1ff4-4e43-85d4-952a2b8a51fb",
+ "timeout": "Second-1234" }
+ conn.request("SUBSCRIBE", eventurl.path, "\r\n\r\n", headers)
+ resp = conn.getresponse()
+ if resp.status != 412:
+ raise Exception("Unexpected HTTP response: %s" % resp.status)
+
+ logger.debug("Valid re-subscription")
+ headers = { "NT": "upnp:event",
+ "sid": sid,
+ "timeout": "Second-1234" }
+ conn.request("SUBSCRIBE", eventurl.path, "\r\n\r\n", headers)
+ resp = conn.getresponse()
+ if resp.status != 200:
+ raise Exception("Unexpected HTTP response: %s" % resp.status)
+ sid2 = resp.getheader("sid")
+ logger.debug("Subscription SID " + sid2)
+
+ if sid != sid2:
+ raise Exception("Unexpected SID change")
+
+ logger.debug("Valid re-subscription")
+ headers = { "NT": "upnp:event",
+ "sid": "uuid: \t \t" + sid.split(':')[1],
+ "timeout": "Second-1234" }
+ conn.request("SUBSCRIBE", eventurl.path, "\r\n\r\n", headers)
+ resp = conn.getresponse()
+ if resp.status != 200:
+ raise Exception("Unexpected HTTP response: %s" % resp.status)
+
+ logger.debug("Invalid unsubscription")
+ headers = { "sid": sid }
+ conn.request("UNSUBSCRIBE", "/hello", "\r\n\r\n", headers)
+ resp = conn.getresponse()
+ if resp.status != 412:
+ raise Exception("Unexpected HTTP response: %s" % resp.status)
+ headers = { "foo": "bar" }
+ conn.request("UNSUBSCRIBE", eventurl.path, "\r\n\r\n", headers)
+ resp = conn.getresponse()
+ if resp.status != 412:
+ raise Exception("Unexpected HTTP response: %s" % resp.status)
+
+ logger.debug("Valid unsubscription")
+ headers = { "sid": sid }
+ conn.request("UNSUBSCRIBE", eventurl.path, "\r\n\r\n", headers)
+ resp = conn.getresponse()
+ if resp.status != 200:
+ raise Exception("Unexpected HTTP response: %s" % resp.status)
+
+ logger.debug("Unsubscription for not existing SID")
+ headers = { "sid": sid }
+ conn.request("UNSUBSCRIBE", eventurl.path, "\r\n\r\n", headers)
+ resp = conn.getresponse()
+ if resp.status != 412:
+ raise Exception("Unexpected HTTP response: %s" % resp.status)
+
+ logger.debug("Invalid unsubscription")
+ headers = { "sid": " \t \tfoo" }
+ conn.request("UNSUBSCRIBE", eventurl.path, "\r\n\r\n", headers)
+ resp = conn.getresponse()
+ if resp.status != 400:
+ raise Exception("Unexpected HTTP response: %s" % resp.status)
+
+ logger.debug("Invalid unsubscription")
+ headers = { "sid": "uuid:\t \tfoo" }
+ conn.request("UNSUBSCRIBE", eventurl.path, "\r\n\r\n", headers)
+ resp = conn.getresponse()
+ if resp.status != 400:
+ raise Exception("Unexpected HTTP response: %s" % resp.status)
+
+ logger.debug("Invalid unsubscription")
+ headers = { "NT": "upnp:event",
+ "sid": sid }
+ conn.request("UNSUBSCRIBE", eventurl.path, "\r\n\r\n", headers)
+ resp = conn.getresponse()
+ if resp.status != 400:
+ raise Exception("Unexpected HTTP response: %s" % resp.status)
+ headers = { "callback": '<http://127.0.0.1:12345/event>',
+ "sid": sid }
+ conn.request("UNSUBSCRIBE", eventurl.path, "\r\n\r\n", headers)
+ resp = conn.getresponse()
+ if resp.status != 400:
+ raise Exception("Unexpected HTTP response: %s" % resp.status)
+
+ logger.debug("Valid subscription with multiple callbacks")
+ headers = { "callback": '<http://127.0.0.1:12345/event> <http://127.0.0.1:12345/event>\t<http://127.0.0.1:12345/event><http://127.0.0.1:12345/event><http://127.0.0.1:12345/event><http://127.0.0.1:12345/event><http://127.0.0.1:12345/event><http://127.0.0.1:12345/event><http://127.0.0.1:12345/event><http://127.0.0.1:12345/event><http://127.0.0.1:12345/event><http://127.0.0.1:12345/event><http://127.0.0.1:12345/event><http://127.0.0.1:12345/event><http://127.0.0.1:12345/event><http://127.0.0.1:12345/event><http://127.0.0.1:12345/event><http://127.0.0.1:12345/event><http://127.0.0.1:12345/event><http://127.0.0.1:12345/event><http://127.0.0.1:12345/event><http://127.0.0.1:12345/event><http://127.0.0.1:12345/event><http://127.0.0.1:12345/event>',
+ "NT": "upnp:event",
+ "timeout": "Second-1234" }
+ conn.request("SUBSCRIBE", eventurl.path, "\r\n\r\n", headers)
+ resp = conn.getresponse()
+ if resp.status != 200:
+ raise Exception("Unexpected HTTP response: %s" % resp.status)
+ sid = resp.getheader("sid")
+ logger.debug("Subscription SID " + sid)