1 # Written by John Hoffman
2 # derived from NATPortMapping.py by Yejun Yang
3 # and from example code by Myers Carpenter
4 # see LICENSE.txt for license information
7 from traceback
import print_exc
8 from subnetparse
import IP_List
9 from clock
import clock
10 from __init__
import createPeerID
19 EXPIRE_CACHE
= 30 # seconds
20 ID
= "BT-"+createPeerID()[-4:]
23 import pythoncom
, win32com
.client
30 class _UPnP1
: # derived from Myers Carpenter's code
31 # seems to use the machine's local UPnP
32 # system for its operation. Runs fairly fast
36 self
.last_got_map
= -10e10
39 if self
.last_got_map
+ EXPIRE_CACHE
< clock():
41 dispatcher
= win32com
.client
.Dispatch("HNetCfg.NATUPnP")
42 self
.map = dispatcher
.StaticPortMappingCollection
43 self
.last_got_map
= clock()
50 assert self
._get
_map
() # make sure a map was found
57 def open(self
, ip
, p
):
60 map.Add(p
,'TCP',p
,ip
,True,ID
)
62 print 'port opened: '+ip
+':'+str(p
)
66 print "COULDN'T OPEN "+str(p
)
78 print 'port closed: '+str(p
)
81 print 'ERROR CLOSING '+str(p
)
87 def clean(self
, retry
= False):
93 for i
in xrange(len(map)):
96 port
= mapping
.ExternalPort
97 prot
= str(mapping
.Protocol
).lower()
98 desc
= str(mapping
.Description
).lower()
101 if port
and prot
== 'tcp' and desc
[:3] == 'bt-':
102 ports_in_use
.append(port
)
104 for port
in ports_in_use
:
106 map.Remove(port
,'TCP')
109 if not success
and not retry
:
110 self
.clean(retry
= True)
115 class _UPnP2
: # derived from Yejun Yang's code
116 # apparently does a direct search for UPnP hardware
117 # may work in some cases where _UPnP1 won't, but is slow
118 # still need to implement "clean" method
122 self
.last_got_services
= -10e10
124 def _get_services(self
):
125 if not self
.services
or self
.last_got_services
+ EXPIRE_CACHE
< clock():
128 f
=win32com
.client
.Dispatch("UPnP.UPnPDeviceFinder")
129 for t
in ( "urn:schemas-upnp-org:service:WANIPConnection:1",
130 "urn:schemas-upnp-org:service:WANPPPConnection:1" ):
132 conns
= f
.FindByType(t
,0)
133 for c
in xrange(len(conns
)):
135 svcs
= conns
[c
].Services
136 for s
in xrange(len(svcs
)):
138 self
.services
.append(svcs
[s
])
147 self
.last_got_services
= clock()
152 assert self
._get
_services
() # make sure some services can be found
159 def open(self
, ip
, p
):
160 svcs
= self
._get
_services
()
164 s
.InvokeAction('AddPortMapping',['',p
,'TCP',p
,ip
,True,ID
,0],'')
168 if DEBUG
and not success
:
169 print "COULDN'T OPEN "+str(p
)
175 svcs
= self
._get
_services
()
179 s
.InvokeAction('DeletePortMapping', ['',p
,'TCP'], '')
183 if DEBUG
and not success
:
184 print "COULDN'T OPEN "+str(p
)
189 class _UPnP
: # master holding class
191 self
.upnp1
= _UPnP1()
192 self
.upnp2
= _UPnP2()
193 self
.upnplist
= (None, self
.upnp1
, self
.upnp2
)
196 self
.last_got_ip
= -10e10
199 if self
.last_got_ip
+ EXPIRE_CACHE
< clock():
200 local_ips
= IP_List()
201 local_ips
.set_intranet_addresses()
203 for info
in socket
.getaddrinfo(socket
.gethostname(),0,socket
.AF_INET
):
204 # exception if socket library isn't recent
205 self
.local_ip
= info
[4][0]
206 if local_ips
.includes(self
.local_ip
):
207 self
.last_got_ip
= clock()
209 print 'Local IP found: '+self
.local_ip
212 raise ValueError('couldn\'t find intranet IP')
216 print 'Error finding local IP'
220 def test(self
, upnp_type
):
222 print 'testing UPnP type '+str(upnp_type
)
223 if not upnp_type
or not _supported
or self
.get_ip() is None:
225 print 'not supported'
227 pythoncom
.CoInitialize() # leave initialized
228 self
.upnp
= self
.upnplist
[upnp_type
] # cache this
238 assert self
.upnp
, "must run UPnP_test() with the desired UPnP access type first"
239 return self
.upnp
.open(self
.get_ip(), p
)
242 assert self
.upnp
, "must run UPnP_test() with the desired UPnP access type first"
243 return self
.upnp
.close(p
)
246 return self
.upnp1
.clean()
250 UPnP_test
= _upnp_
.test
251 UPnP_open_port
= _upnp_
.open
252 UPnP_close_port
= _upnp_
.close
253 UPnP_reset
= _upnp_
.clean