# #
###############################################################################
+import base64
import re
+import socket
import urllib
import urllib2
from __version__ import CLIENT_VERSION
+from .errors import *
from i18n import _
# Initialize the logger.
return proxy
- def guess_external_ipv6_address(self):
+ def _guess_external_ip_address(self, url, timeout=10):
"""
Sends a request to an external web server
to determine the current default IP address.
"""
- response = self.send_request("http://checkip6.dns.lightningwirelabs.com")
+ try:
+ response = self.send_request(url, timeout=timeout)
+
+ # If the server could not be reached, we will return nothing.
+ except DDNSNetworkError:
+ return
if not response.code == 200:
return
return match.group(1)
- def guess_external_ipv4_address(self):
+ def guess_external_ipv6_address(self):
"""
Sends a request to the internet to determine
- the public IP address.
-
- XXX does not work for IPv6.
+ the public IPv6 address.
"""
- response = self.send_request("http://checkip4.dns.lightningwirelabs.com")
+ return self._guess_external_ip_address("http://checkip6.dns.lightningwirelabs.com")
- if response.code == 200:
- match = re.search(r"Your IP address is: (\d+.\d+.\d+.\d+)", response.read())
- if match is None:
- return
-
- return match.group(1)
+ def guess_external_ipv4_address(self):
+ """
+ Sends a request to the internet to determine
+ the public IPv4 address.
+ """
+ return self._guess_external_ip_address("http://checkip4.dns.lightningwirelabs.com")
- def send_request(self, url, method="GET", data=None, timeout=30):
+ def send_request(self, url, method="GET", data=None, username=None, password=None, timeout=30):
assert method in ("GET", "POST")
# Add all arguments in the data dict to the URL and escape them properly.
query_args = self._format_query_args(data)
data = None
- url = "%s?%s" % (url, query_args)
+ if "?" in url:
+ url = "%s&%s" % (url, query_args)
+ else:
+ url = "%s?%s" % (url, query_args)
logger.debug("Sending request (%s): %s" % (method, url))
if data:
req = urllib2.Request(url, data=data)
+ if username and password:
+ basic_auth_header = self._make_basic_auth_header(username, password)
+ print repr(basic_auth_header)
+ req.add_header("Authorization", "Basic %s" % basic_auth_header)
+
# Set the user agent.
req.add_header("User-Agent", self.USER_AGENT)
logger.debug(" %s: %s" % (k, v))
try:
- resp = urllib2.urlopen(req)
+ resp = urllib2.urlopen(req, timeout=timeout)
# Log response header.
logger.debug(_("Response header:"))
# Return the entire response object.
return resp
+ except urllib2.HTTPError, e:
+ # 503 - Service Unavailable
+ if e.code == 503:
+ raise DDNSServiceUnavailableError
+
+ # Raise all other unhandled exceptions.
+ raise
+
except urllib2.URLError, e:
+ if e.reason:
+ # Network Unreachable (e.g. no IPv6 access)
+ if e.reason.errno == 101:
+ raise DDNSNetworkUnreachableError
+ elif e.reason.errno == 111:
+ raise DDNSConnectionRefusedError
+
+ # Raise all other unhandled exceptions.
raise
+ except socket.timeout, e:
+ logger.debug(_("Connection timeout"))
+
+ raise DDNSConnectionTimeoutError
+
def _format_query_args(self, data):
args = []
return "&".join(args)
+ def _make_basic_auth_header(self, username, password):
+ authstring = "%s:%s" % (username, password)
+
+ # Encode authorization data in base64.
+ authstring = base64.encodestring(authstring)
+
+ # Remove any newline characters.
+ authstring = authstring.replace("\n", "")
+
+ return authstring
+
def get_address(self, proto):
assert proto in ("ipv6", "ipv4")
# XXX TODO
assert False
+
+ def resolve(self, hostname, proto=None):
+ addresses = []
+
+ if proto is None:
+ family = 0
+ elif proto == "ipv6":
+ family = socket.AF_INET6
+ elif proto == "ipv4":
+ family = socket.AF_INET
+ else:
+ raise ValueError("Protocol not supported: %s" % proto)
+
+ # Resolve the host address.
+ try:
+ response = socket.getaddrinfo(hostname, None, family)
+ except socket.gaierror, e:
+ # Name or service not known
+ if e.errno == -2:
+ return []
+
+ raise
+
+ # Handle responses.
+ for family, socktype, proto, canonname, sockaddr in response:
+ # IPv6
+ if family == socket.AF_INET6:
+ address, port, flow_info, scope_id = sockaddr
+
+ # Only use the global scope.
+ if not scope_id == 0:
+ continue
+
+ # IPv4
+ elif family == socket.AF_INET:
+ address, port = sockaddr
+
+ # Ignore everything else...
+ else:
+ continue
+
+ # Add to repsonse list if not already in there.
+ if not address in addresses:
+ addresses.append(address)
+
+ return addresses