self._timeout = None
with stack_context.StackContext(self.cleanup):
parsed = urlparse.urlsplit(self.request.url)
- if ":" in parsed.netloc:
- host, _, port = parsed.netloc.partition(":")
- port = int(port)
- else:
- host = parsed.netloc
+ host = parsed.hostname
+ if parsed.port is None:
port = 443 if parsed.scheme == "https" else 80
+ else:
+ port = parsed.port
if self.client.hostname_mapping is not None:
host = self.client.hostname_mapping.get(host, host)
if (self.request.validate_cert and
isinstance(self.stream, SSLIOStream)):
match_hostname(self.stream.socket.getpeercert(),
- parsed.netloc.partition(":")[0])
+ parsed.hostname)
if (self.request.method not in self._SUPPORTED_METHODS and
not self.request.allow_nonstandard_methods):
raise KeyError("unknown method %s" % self.request.method)
raise NotImplementedError('%s not supported' % key)
if "Host" not in self.request.headers:
self.request.headers["Host"] = parsed.netloc
- if self.request.auth_username:
- auth = "%s:%s" % (self.request.auth_username,
- self.request.auth_password)
+ username, password = None, None
+ if parsed.username is not None:
+ username, password = parsed.username, parsed.password
+ elif self.request.auth_username is not None:
+ username = self.request.auth_username
+ password = self.request.auth_password
+ if username is not None:
+ auth = "%s:%s" % (username, password)
self.request.headers["Authorization"] = ("Basic %s" %
auth.encode("base64"))
if self.request.user_agent:
from __future__ import with_statement
+import base64
import collections
import gzip
import logging
def test_default_certificates_exist(self):
open(_DEFAULT_CA_CERTS)
+ def test_credentials_in_url(self):
+ url = self.get_url("/auth").replace("http://", "http://me:secret@")
+ self.http_client.fetch(url, self.stop)
+ response = self.wait()
+ self.assertEqual("Basic " + base64.b64encode("me:secret"),
+ response.body)
+