]> git.ipfire.org Git - ipfire.org.git/blobdiff - www/pages/torrent/client/natpunch.py
Initial checkin.
[ipfire.org.git] / www / pages / torrent / client / natpunch.py
diff --git a/www/pages/torrent/client/natpunch.py b/www/pages/torrent/client/natpunch.py
deleted file mode 100644 (file)
index 896827b..0000000
+++ /dev/null
@@ -1,254 +0,0 @@
-# Written by John Hoffman
-# derived from NATPortMapping.py by Yejun Yang
-# and from example code by Myers Carpenter
-# see LICENSE.txt for license information
-
-import socket
-from traceback import print_exc
-from subnetparse import IP_List
-from clock import clock
-from __init__ import createPeerID
-try:
-    True
-except:
-    True = 1
-    False = 0
-
-DEBUG = False
-
-EXPIRE_CACHE = 30 # seconds
-ID = "BT-"+createPeerID()[-4:]
-
-try:
-    import pythoncom, win32com.client
-    _supported = 1
-except ImportError:
-    _supported = 0
-
-
-
-class _UPnP1:   # derived from Myers Carpenter's code
-                # seems to use the machine's local UPnP
-                # system for its operation.  Runs fairly fast
-
-    def __init__(self):
-        self.map = None
-        self.last_got_map = -10e10
-
-    def _get_map(self):
-        if self.last_got_map + EXPIRE_CACHE < clock():
-            try:
-                dispatcher = win32com.client.Dispatch("HNetCfg.NATUPnP")
-                self.map = dispatcher.StaticPortMappingCollection
-                self.last_got_map = clock()
-            except:
-                self.map = None
-        return self.map
-
-    def test(self):
-        try:
-            assert self._get_map()     # make sure a map was found
-            success = True
-        except:
-            success = False
-        return success
-
-
-    def open(self, ip, p):
-        map = self._get_map()
-        try:
-            map.Add(p,'TCP',p,ip,True,ID)
-            if DEBUG:
-                print 'port opened: '+ip+':'+str(p)
-            success = True
-        except:
-            if DEBUG:
-                print "COULDN'T OPEN "+str(p)
-                print_exc()
-            success = False
-        return success
-
-
-    def close(self, p):
-        map = self._get_map()
-        try:
-            map.Remove(p,'TCP')
-            success = True
-            if DEBUG:
-                print 'port closed: '+str(p)
-        except:
-            if DEBUG:
-                print 'ERROR CLOSING '+str(p)
-                print_exc()
-            success = False
-        return success
-
-
-    def clean(self, retry = False):
-        if not _supported:
-            return
-        try:
-            map = self._get_map()
-            ports_in_use = []
-            for i in xrange(len(map)):
-                try:
-                    mapping = map[i]
-                    port = mapping.ExternalPort
-                    prot = str(mapping.Protocol).lower()
-                    desc = str(mapping.Description).lower()
-                except:
-                    port = None
-                if port and prot == 'tcp' and desc[:3] == 'bt-':
-                    ports_in_use.append(port)
-            success = True
-            for port in ports_in_use:
-                try:
-                    map.Remove(port,'TCP')
-                except:
-                    success = False
-            if not success and not retry:
-                self.clean(retry = True)
-        except:
-            pass
-
-
-class _UPnP2:   # derived from Yejun Yang's code
-                # apparently does a direct search for UPnP hardware
-                # may work in some cases where _UPnP1 won't, but is slow
-                # still need to implement "clean" method
-
-    def __init__(self):
-        self.services = None
-        self.last_got_services = -10e10
-                           
-    def _get_services(self):
-        if not self.services or self.last_got_services + EXPIRE_CACHE < clock():
-            self.services = []
-            try:
-                f=win32com.client.Dispatch("UPnP.UPnPDeviceFinder")
-                for t in ( "urn:schemas-upnp-org:service:WANIPConnection:1",
-                           "urn:schemas-upnp-org:service:WANPPPConnection:1" ):
-                    try:
-                        conns = f.FindByType(t,0)
-                        for c in xrange(len(conns)):
-                            try:
-                                svcs = conns[c].Services
-                                for s in xrange(len(svcs)):
-                                    try:
-                                        self.services.append(svcs[s])
-                                    except:
-                                        pass
-                            except:
-                                pass
-                    except:
-                        pass
-            except:
-                pass
-            self.last_got_services = clock()
-        return self.services
-
-    def test(self):
-        try:
-            assert self._get_services()    # make sure some services can be found
-            success = True
-        except:
-            success = False
-        return success
-
-
-    def open(self, ip, p):
-        svcs = self._get_services()
-        success = False
-        for s in svcs:
-            try:
-                s.InvokeAction('AddPortMapping',['',p,'TCP',p,ip,True,ID,0],'')
-                success = True
-            except:
-                pass
-        if DEBUG and not success:
-            print "COULDN'T OPEN "+str(p)
-            print_exc()
-        return success
-
-
-    def close(self, p):
-        svcs = self._get_services()
-        success = False
-        for s in svcs:
-            try:
-                s.InvokeAction('DeletePortMapping', ['',p,'TCP'], '')
-                success = True
-            except:
-                pass
-        if DEBUG and not success:
-            print "COULDN'T OPEN "+str(p)
-            print_exc()
-        return success
-
-
-class _UPnP:    # master holding class
-    def __init__(self):
-        self.upnp1 = _UPnP1()
-        self.upnp2 = _UPnP2()
-        self.upnplist = (None, self.upnp1, self.upnp2)
-        self.upnp = None
-        self.local_ip = None
-        self.last_got_ip = -10e10
-        
-    def get_ip(self):
-        if self.last_got_ip + EXPIRE_CACHE < clock():
-            local_ips = IP_List()
-            local_ips.set_intranet_addresses()
-            try:
-                for info in socket.getaddrinfo(socket.gethostname(),0,socket.AF_INET):
-                            # exception if socket library isn't recent
-                    self.local_ip = info[4][0]
-                    if local_ips.includes(self.local_ip):
-                        self.last_got_ip = clock()
-                        if DEBUG:
-                            print 'Local IP found: '+self.local_ip
-                        break
-                else:
-                    raise ValueError('couldn\'t find intranet IP')
-            except:
-                self.local_ip = None
-                if DEBUG:
-                    print 'Error finding local IP'
-                    print_exc()
-        return self.local_ip
-
-    def test(self, upnp_type):
-        if DEBUG:
-            print 'testing UPnP type '+str(upnp_type)
-        if not upnp_type or not _supported or self.get_ip() is None:
-            if DEBUG:
-                print 'not supported'
-            return 0
-        pythoncom.CoInitialize()                # leave initialized
-        self.upnp = self.upnplist[upnp_type]    # cache this
-        if self.upnp.test():
-            if DEBUG:
-                print 'ok'
-            return upnp_type
-        if DEBUG:
-            print 'tested bad'
-        return 0
-
-    def open(self, p):
-        assert self.upnp, "must run UPnP_test() with the desired UPnP access type first"
-        return self.upnp.open(self.get_ip(), p)
-
-    def close(self, p):
-        assert self.upnp, "must run UPnP_test() with the desired UPnP access type first"
-        return self.upnp.close(p)
-
-    def clean(self):
-        return self.upnp1.clean()
-
-_upnp_ = _UPnP()
-
-UPnP_test = _upnp_.test
-UPnP_open_port = _upnp_.open
-UPnP_close_port = _upnp_.close
-UPnP_reset = _upnp_.clean
-