import base64
import re
+import ssl
import socket
import urllib
import urllib2
# Connection to the core of the program.
self.core = core
+ # Address cache.
+ self.__addresses = {}
+
+ # Find out on which distribution we are running.
+ self.distro = self._get_distro_identifier()
+ logger.debug(_("Running on distribution: %s") % self.distro)
+
@property
def proxy(self):
proxy = self.core.settings.get("proxy")
return proxy
+ def get_local_ip_address(self, proto):
+ ip_address = self._get_local_ip_address(proto)
+
+ # Check if the IP address is usable and only return it then
+ if self._is_usable_ip_address(proto, ip_address):
+ return ip_address
+
+ def _get_local_ip_address(self, proto):
+ # Legacy code for IPFire 2.
+ if self.distro == "ipfire-2" and proto == "ipv4":
+ try:
+ with open("/var/ipfire/red/local-ipaddress") as f:
+ return f.readline()
+
+ except IOError, e:
+ # File not found
+ if e.errno == 2:
+ return
+
+ raise
+
+ # XXX TODO
+ raise NotImplementedError
+
def _guess_external_ip_address(self, url, timeout=10):
"""
Sends a request to an external web server
return match.group(1)
- def guess_external_ipv6_address(self):
- """
- Sends a request to the internet to determine
- the public IPv6 address.
- """
- return self._guess_external_ip_address("http://checkip6.dns.lightningwirelabs.com")
+ def guess_external_ip_address(self, family, **kwargs):
+ if family == "ipv6":
+ url = "http://checkip6.dns.lightningwirelabs.com"
+ elif family == "ipv4":
+ url = "http://checkip4.dns.lightningwirelabs.com"
+ else:
+ raise ValueError("unknown address family")
- def guess_external_ipv4_address(self):
- """
- Sends a request to the internet to determine
- the public IPv4 address.
- """
- return self._guess_external_ip_address("http://checkip4.dns.lightningwirelabs.com")
+ return self._guess_external_ip_address(url, **kwargs)
def send_request(self, url, method="GET", data=None, username=None, password=None, timeout=30):
assert method in ("GET", "POST")
resp = urllib2.urlopen(req, timeout=timeout)
# Log response header.
- logger.debug(_("Response header:"))
+ logger.debug(_("Response header (Status Code %s):") % resp.code)
for k, v in resp.info().items():
logger.debug(" %s: %s" % (k, v))
return resp
except urllib2.HTTPError, e:
+ # Log response header.
+ logger.debug(_("Response header (Status Code %s):") % e.code)
+ for k, v in e.hdrs.items():
+ logger.debug(" %s: %s" % (k, v))
+
+ # 400 - Bad request
+ if e.code == 400:
+ raise DDNSRequestError(e.reason)
+
+ # 401 - Authorization Required
+ # 403 - Forbidden
+ elif e.code in (401, 403):
+ raise DDNSAuthenticationError(e.reason)
+
+ # 404 - Not found
+ # Either the provider has changed the API, or
+ # there is an error on the server
+ elif e.code == 404:
+ raise DDNSNotFound(e.reason)
+
+ # 429 - Too Many Requests
+ elif e.code == 429:
+ raise DDNSTooManyRequests(e.reason)
+
+ # 500 - Internal Server Error
+ elif e.code == 500:
+ raise DDNSInternalServerError(e.reason)
+
# 503 - Service Unavailable
- if e.code == 503:
- raise DDNSServiceUnavailableError
+ elif e.code == 503:
+ raise DDNSServiceUnavailableError(e.reason)
# Raise all other unhandled exceptions.
raise
except urllib2.URLError, e:
if e.reason:
+ # Handle SSL errors
+ if isinstance(e.reason, ssl.SSLError):
+ e = e.reason
+
+ if e.reason == "CERTIFICATE_VERIFY_FAILED":
+ raise DDNSCertificateError
+
+ # Raise all other SSL errors
+ raise DDNSSSLError(e.reason)
+
+ # Name or service not known
+ if e.reason.errno == -2:
+ raise DDNSResolveError
+
# Network Unreachable (e.g. no IPv6 access)
if e.reason.errno == 101:
raise DDNSNetworkUnreachableError
+
+ # Connection Refused
elif e.reason.errno == 111:
raise DDNSConnectionRefusedError
+ # No route to host
+ elif e.reason.errno == 113:
+ raise DDNSNoRouteToHostError(req.host)
+
# Raise all other unhandled exceptions.
raise
return authstring
def get_address(self, proto):
+ """
+ Returns the current IP address for
+ the given IP protocol.
+ """
+ try:
+ return self.__addresses[proto]
+
+ # IP is currently unknown and needs to be retrieved.
+ except KeyError:
+ self.__addresses[proto] = address = \
+ self._get_address(proto)
+
+ return address
+
+ def _get_address(self, proto):
assert proto in ("ipv6", "ipv4")
+ # IPFire 2 does not support IPv6.
+ if self.distro == "ipfire-2" and proto == "ipv6":
+ return
+
# Check if the external IP address should be guessed from
# a remote server.
guess_ip = self.core.settings.get("guess_external_ip", "true")
+ guess_ip = guess_ip in ("true", "yes", "1")
- # If the external IP address should be used, we just do
- # that.
- if guess_ip in ("true", "yes", "1"):
- if proto == "ipv6":
- return self.guess_external_ipv6_address()
+ # Get the local IP address.
+ local_ip_address = None
- elif proto == "ipv4":
- return self.guess_external_ipv4_address()
+ if not guess_ip:
+ try:
+ local_ip_address = self.get_local_ip_address(proto)
+ except NotImplementedError:
+ logger.warning(_("Falling back to check the IP address with help of a public server"))
- # XXX TODO
- assert False
+ # If no local IP address could be determined, we will fall back to the guess
+ # it with help of an external server...
+ if not local_ip_address:
+ local_ip_address = self.guess_external_ip_address(proto)
+
+ return local_ip_address
+
+ def _is_usable_ip_address(self, proto, address):
+ """
+ Returns True is the local IP address is usable
+ for dynamic DNS (i.e. is not a RFC1918 address or similar).
+ """
+ if proto == "ipv4":
+ # This is not the most perfect solution to match
+ # these addresses, but instead of pulling in an entire
+ # library to handle the IP addresses better, we match
+ # with regular expressions instead.
+ matches = (
+ # RFC1918 address space
+ r"^10\.\d+\.\d+\.\d+$",
+ r"^192\.168\.\d+\.\d+$",
+ r"^172\.(1[6-9]|2[0-9]|31)\.\d+\.\d+$",
+
+ # Dual Stack Lite address space
+ r"^100\.(6[4-9]|[7-9][0-9]|1[01][0-9]|12[0-7])\.\d+\.\d+$",
+ )
+
+ for match in matches:
+ m = re.match(match, address)
+ if m is None:
+ continue
+
+ # Found a match. IP address is not usable.
+ return False
+
+ # In all other cases, return OK.
+ return True
def resolve(self, hostname, proto=None):
addresses = []
if e.errno == -2:
return []
+ # Temporary failure in name resolution
+ elif e.errno == -3:
+ raise DDNSResolveError(hostname)
+
# No record for requested family available (e.g. no AAAA)
elif e.errno == -5:
return []
addresses.append(address)
return addresses
+
+ def _get_distro_identifier(self):
+ """
+ Returns a unique identifier for the distribution
+ we are running on.
+ """
+ os_release = self.__parse_os_release()
+ if os_release:
+ return os_release
+
+ system_release = self.__parse_system_release()
+ if system_release:
+ return system_release
+
+ # If nothing else could be found, we return
+ # just "unknown".
+ return "unknown"
+
+ def __parse_os_release(self):
+ """
+ Tries to parse /etc/os-release and
+ returns a unique distribution identifier
+ if the file exists.
+ """
+ try:
+ f = open("/etc/os-release", "r")
+ except IOError, e:
+ # File not found
+ if e.errno == 2:
+ return
+
+ raise
+
+ os_release = {}
+ with f:
+ for line in f.readlines():
+ m = re.match(r"^([A-Z\_]+)=(.*)$", line)
+ if m is None:
+ continue
+
+ os_release[m.group(1)] = m.group(2)
+
+ try:
+ return "%(ID)s-%(VERSION_ID)s" % os_release
+ except KeyError:
+ return
+
+ def __parse_system_release(self):
+ """
+ Tries to parse /etc/system-release and
+ returns a unique distribution identifier
+ if the file exists.
+ """
+ try:
+ f = open("/etc/system-release", "r")
+ except IOError, e:
+ # File not found
+ if e.errno == 2:
+ return
+
+ raise
+
+ with f:
+ # Read first line
+ line = f.readline()
+
+ # Check for IPFire systems
+ m = re.match(r"^IPFire (\d).(\d+)", line)
+ if m:
+ return "ipfire-%s" % m.group(1)