]> git.ipfire.org Git - people/shoehn/ipfire.org.git/blob - www/pages/torrent/client/natpunch.py
Change Color of Menuitem CeBIT.
[people/shoehn/ipfire.org.git] / www / pages / torrent / client / natpunch.py
1 # Written by John Hoffman
2 # derived from NATPortMapping.py by Yejun Yang
3 # and from example code by Myers Carpenter
4 # see LICENSE.txt for license information
5
6 import socket
7 from traceback import print_exc
8 from subnetparse import IP_List
9 from clock import clock
10 from __init__ import createPeerID
11 try:
12 True
13 except:
14 True = 1
15 False = 0
16
17 DEBUG = False
18
19 EXPIRE_CACHE = 30 # seconds
20 ID = "BT-"+createPeerID()[-4:]
21
22 try:
23 import pythoncom, win32com.client
24 _supported = 1
25 except ImportError:
26 _supported = 0
27
28
29
30 class _UPnP1: # derived from Myers Carpenter's code
31 # seems to use the machine's local UPnP
32 # system for its operation. Runs fairly fast
33
34 def __init__(self):
35 self.map = None
36 self.last_got_map = -10e10
37
38 def _get_map(self):
39 if self.last_got_map + EXPIRE_CACHE < clock():
40 try:
41 dispatcher = win32com.client.Dispatch("HNetCfg.NATUPnP")
42 self.map = dispatcher.StaticPortMappingCollection
43 self.last_got_map = clock()
44 except:
45 self.map = None
46 return self.map
47
48 def test(self):
49 try:
50 assert self._get_map() # make sure a map was found
51 success = True
52 except:
53 success = False
54 return success
55
56
57 def open(self, ip, p):
58 map = self._get_map()
59 try:
60 map.Add(p,'TCP',p,ip,True,ID)
61 if DEBUG:
62 print 'port opened: '+ip+':'+str(p)
63 success = True
64 except:
65 if DEBUG:
66 print "COULDN'T OPEN "+str(p)
67 print_exc()
68 success = False
69 return success
70
71
72 def close(self, p):
73 map = self._get_map()
74 try:
75 map.Remove(p,'TCP')
76 success = True
77 if DEBUG:
78 print 'port closed: '+str(p)
79 except:
80 if DEBUG:
81 print 'ERROR CLOSING '+str(p)
82 print_exc()
83 success = False
84 return success
85
86
87 def clean(self, retry = False):
88 if not _supported:
89 return
90 try:
91 map = self._get_map()
92 ports_in_use = []
93 for i in xrange(len(map)):
94 try:
95 mapping = map[i]
96 port = mapping.ExternalPort
97 prot = str(mapping.Protocol).lower()
98 desc = str(mapping.Description).lower()
99 except:
100 port = None
101 if port and prot == 'tcp' and desc[:3] == 'bt-':
102 ports_in_use.append(port)
103 success = True
104 for port in ports_in_use:
105 try:
106 map.Remove(port,'TCP')
107 except:
108 success = False
109 if not success and not retry:
110 self.clean(retry = True)
111 except:
112 pass
113
114
115 class _UPnP2: # derived from Yejun Yang's code
116 # apparently does a direct search for UPnP hardware
117 # may work in some cases where _UPnP1 won't, but is slow
118 # still need to implement "clean" method
119
120 def __init__(self):
121 self.services = None
122 self.last_got_services = -10e10
123
124 def _get_services(self):
125 if not self.services or self.last_got_services + EXPIRE_CACHE < clock():
126 self.services = []
127 try:
128 f=win32com.client.Dispatch("UPnP.UPnPDeviceFinder")
129 for t in ( "urn:schemas-upnp-org:service:WANIPConnection:1",
130 "urn:schemas-upnp-org:service:WANPPPConnection:1" ):
131 try:
132 conns = f.FindByType(t,0)
133 for c in xrange(len(conns)):
134 try:
135 svcs = conns[c].Services
136 for s in xrange(len(svcs)):
137 try:
138 self.services.append(svcs[s])
139 except:
140 pass
141 except:
142 pass
143 except:
144 pass
145 except:
146 pass
147 self.last_got_services = clock()
148 return self.services
149
150 def test(self):
151 try:
152 assert self._get_services() # make sure some services can be found
153 success = True
154 except:
155 success = False
156 return success
157
158
159 def open(self, ip, p):
160 svcs = self._get_services()
161 success = False
162 for s in svcs:
163 try:
164 s.InvokeAction('AddPortMapping',['',p,'TCP',p,ip,True,ID,0],'')
165 success = True
166 except:
167 pass
168 if DEBUG and not success:
169 print "COULDN'T OPEN "+str(p)
170 print_exc()
171 return success
172
173
174 def close(self, p):
175 svcs = self._get_services()
176 success = False
177 for s in svcs:
178 try:
179 s.InvokeAction('DeletePortMapping', ['',p,'TCP'], '')
180 success = True
181 except:
182 pass
183 if DEBUG and not success:
184 print "COULDN'T OPEN "+str(p)
185 print_exc()
186 return success
187
188
189 class _UPnP: # master holding class
190 def __init__(self):
191 self.upnp1 = _UPnP1()
192 self.upnp2 = _UPnP2()
193 self.upnplist = (None, self.upnp1, self.upnp2)
194 self.upnp = None
195 self.local_ip = None
196 self.last_got_ip = -10e10
197
198 def get_ip(self):
199 if self.last_got_ip + EXPIRE_CACHE < clock():
200 local_ips = IP_List()
201 local_ips.set_intranet_addresses()
202 try:
203 for info in socket.getaddrinfo(socket.gethostname(),0,socket.AF_INET):
204 # exception if socket library isn't recent
205 self.local_ip = info[4][0]
206 if local_ips.includes(self.local_ip):
207 self.last_got_ip = clock()
208 if DEBUG:
209 print 'Local IP found: '+self.local_ip
210 break
211 else:
212 raise ValueError('couldn\'t find intranet IP')
213 except:
214 self.local_ip = None
215 if DEBUG:
216 print 'Error finding local IP'
217 print_exc()
218 return self.local_ip
219
220 def test(self, upnp_type):
221 if DEBUG:
222 print 'testing UPnP type '+str(upnp_type)
223 if not upnp_type or not _supported or self.get_ip() is None:
224 if DEBUG:
225 print 'not supported'
226 return 0
227 pythoncom.CoInitialize() # leave initialized
228 self.upnp = self.upnplist[upnp_type] # cache this
229 if self.upnp.test():
230 if DEBUG:
231 print 'ok'
232 return upnp_type
233 if DEBUG:
234 print 'tested bad'
235 return 0
236
237 def open(self, p):
238 assert self.upnp, "must run UPnP_test() with the desired UPnP access type first"
239 return self.upnp.open(self.get_ip(), p)
240
241 def close(self, p):
242 assert self.upnp, "must run UPnP_test() with the desired UPnP access type first"
243 return self.upnp.close(p)
244
245 def clean(self):
246 return self.upnp1.clean()
247
248 _upnp_ = _UPnP()
249
250 UPnP_test = _upnp_.test
251 UPnP_open_port = _upnp_.open
252 UPnP_close_port = _upnp_.close
253 UPnP_reset = _upnp_.clean
254