# Source: git.ipfire.org Git - people/shoehn/ipfire.org.git/blob - www/pages/torrent/client/SocketHandler.py
# Written by Bram Cohen
# see LICENSE.txt for license information
5 from errno
import EWOULDBLOCK
, ECONNREFUSED
, EHOSTUNREACH
7 from select
import poll
, error
, POLLIN
, POLLOUT
, POLLERR
, POLLHUP
10 from selectpoll
import poll
, error
, POLLIN
, POLLOUT
, POLLERR
, POLLHUP
12 from time
import sleep
13 from clock
import clock
15 from random
import shuffle
, randrange
16 from natpunch
import UPnP_open_port
, UPnP_close_port
17 # from BT1.StreamCheck import StreamCheck
# Poll event mask combining readability (POLLIN) and writability (POLLOUT).
# NOTE: this name shadows the builtin all(); it is kept unchanged because
# other code in this module passes it to poll.register() under this name.
all = POLLIN | POLLOUT

# Message carried by the socket.error raised when UPnP_open_port() fails.
UPnP_ERROR = "unable to forward port via UPnP"
30 def __init__(self
, socket_handler
, sock
, handler
, ip
= None):
31 self
.socket_handler
= socket_handler
33 self
.handler
= handler
35 self
.last_hit
= clock()
36 self
.fileno
= sock
.fileno()
37 self
.connected
= False
39 # self.check = StreamCheck()
41 self
.ip
= self
.socket
.getpeername()[0]
48 def get_ip(self
, real
=False):
51 self
.ip
= self
.socket
.getpeername()[0]
58 for x in xrange(5,0,-1):
60 f = inspect.currentframe(x).f_code
61 print (f.co_filename,f.co_firstlineno,f.co_name)
68 self
.connected
= False
72 del self
.socket_handler
.single_sockets
[self
.fileno
]
73 self
.socket_handler
.poll
.unregister(sock
)
def shutdown(self, val):
    """Shut down the wrapped socket.

    `val` is forwarded unchanged to the underlying socket's shutdown()
    call; nothing is returned.
    """
    self.socket.shutdown(val)
80 return not self
.buffer
84 assert self
.socket
is not None
86 if len(self
.buffer) == 1:
95 amount
= self
.socket
.send(buf
)
100 if amount
!= len(buf
):
101 self
.buffer[0] = buf
[amount
:]
104 except socket
.error
, e
:
106 dead
= e
[0] != EWOULDBLOCK
110 if self
.skipped
>= 3:
113 self
.socket_handler
.dead_from_write
.append(self
)
116 self
.socket_handler
.poll
.register(self
.socket
, all
)
118 self
.socket_handler
.poll
.register(self
.socket
, POLLIN
)
def set_handler(self, handler):
    """Replace the handler object stored on this instance.

    The new handler is stored as `self.handler`; nothing is returned.
    """
    self.handler = handler
124 def __init__(self
, timeout
, ipv6_enable
, readsize
= 100000):
125 self
.timeout
= timeout
126 self
.ipv6_enable
= ipv6_enable
127 self
.readsize
= readsize
# {socket fileno: SingleSocket}
130 self
.single_sockets
= {}
131 self
.dead_from_write
= []
132 self
.max_connects
= 1000
133 self
.port_forwarded
= None
136 def scan_for_timeouts(self
):
137 t
= clock() - self
.timeout
139 for s
in self
.single_sockets
.values():
143 if k
.socket
is not None:
144 self
._close
_socket
(k
)
146 def bind(self
, port
, bind
= '', reuse
= False, ipv6_socket_style
= 1, upnp
= 0):
# if bind != "" treat it as a comma separated list and bind to all
# addresses (can be ips or hostnames) else bind to default ipv6 and
156 socktype
= socket
.AF_UNSPEC
158 socktype
= socket
.AF_INET
159 bind
= bind
.split(',')
161 if sys
.version_info
< (2,2):
162 addrinfos
.append((socket
.AF_INET
, None, None, None, (addr
, port
)))
164 addrinfos
.extend(socket
.getaddrinfo(addr
, port
,
165 socktype
, socket
.SOCK_STREAM
))
168 addrinfos
.append([socket
.AF_INET6
, None, None, None, ('', port
)])
169 if not addrinfos
or ipv6_socket_style
!= 0:
170 addrinfos
.append([socket
.AF_INET
, None, None, None, ('', port
)])
171 for addrinfo
in addrinfos
:
173 server
= socket
.socket(addrinfo
[0], socket
.SOCK_STREAM
)
175 server
.setsockopt(socket
.SOL_SOCKET
, socket
.SO_REUSEADDR
, 1)
176 server
.setblocking(0)
177 server
.bind(addrinfo
[4])
178 self
.servers
[server
.fileno()] = server
180 self
.interfaces
.append(server
.getsockname()[0])
182 self
.poll
.register(server
, POLLIN
)
183 except socket
.error
, e
:
184 for server
in self
.servers
.values():
189 if self
.ipv6_enable
and ipv6_socket_style
== 0 and self
.servers
:
190 raise socket
.error('blocked port (may require ipv6_binds_v4 to be set)')
191 raise socket
.error(str(e
))
193 raise socket
.error('unable to open server port')
195 if not UPnP_open_port(port
):
196 for server
in self
.servers
.values():
202 self
.interfaces
= None
203 raise socket
.error(UPnP_ERROR
)
204 self
.port_forwarded
= port
207 def find_and_bind(self
, minport
, maxport
, bind
= '', reuse
= False,
208 ipv6_socket_style
= 1, upnp
= 0, randomizer
= False):
209 e
= 'maxport less than minport - no ports to check'
210 if maxport
-minport
< 50 or not randomizer
:
211 portrange
= range(minport
, maxport
+1)
214 portrange
= portrange
[:20] # check a maximum of 20 ports
217 while len(portrange
) < 20:
218 listen_port
= randrange(minport
, maxport
+1)
219 if not listen_port
in portrange
:
220 portrange
.append(listen_port
)
221 for listen_port
in portrange
:
223 self
.bind(listen_port
, bind
,
224 ipv6_socket_style
= ipv6_socket_style
, upnp
= upnp
)
226 except socket
.error
, e
:
228 raise socket
.error(str(e
))
def set_handler(self, handler):
    """Set the default handler stored on this instance.

    The handler is stored as `self.handler`, which the connection-starting
    and accept paths in this file read when picking a handler; nothing is
    returned.
    """
    self.handler = handler
235 def start_connection_raw(self
, dns
, socktype
= socket
.AF_INET
, handler
= None):
237 handler
= self
.handler
238 sock
= socket
.socket(socktype
, socket
.SOCK_STREAM
)
245 raise socket
.error(str(e
))
246 self
.poll
.register(sock
, POLLIN
)
247 s
= SingleSocket(self
, sock
, handler
, dns
[0])
248 self
.single_sockets
[sock
.fileno()] = s
252 def start_connection(self
, dns
, handler
= None, randomize
= False):
254 handler
= self
.handler
255 if sys
.version_info
< (2,2):
256 s
= self
.start_connection_raw(dns
,socket
.AF_INET
,handler
)
259 socktype
= socket
.AF_UNSPEC
261 socktype
= socket
.AF_INET
263 addrinfos
= socket
.getaddrinfo(dns
[0], int(dns
[1]),
264 socktype
, socket
.SOCK_STREAM
)
265 except socket
.error
, e
:
268 raise socket
.error(str(e
))
271 for addrinfo
in addrinfos
:
273 s
= self
.start_connection_raw(addrinfo
[4],addrinfo
[0],handler
)
278 raise socket
.error('unable to connect')
285 def handle_events(self
, events
):
286 for sock
, event
in events
:
287 s
= self
.servers
.get(sock
)
289 if event
& (POLLHUP | POLLERR
) != 0:
290 self
.poll
.unregister(s
)
292 del self
.servers
[sock
]
293 print "lost server socket"
294 elif len(self
.single_sockets
) < self
.max_connects
:
296 newsock
, addr
= s
.accept()
297 newsock
.setblocking(0)
298 nss
= SingleSocket(self
, newsock
, self
.handler
)
299 self
.single_sockets
[newsock
.fileno()] = nss
300 self
.poll
.register(newsock
, POLLIN
)
301 self
.handler
.external_connection_made(nss
)
305 s
= self
.single_sockets
.get(sock
)
309 if (event
& (POLLHUP | POLLERR
)):
310 self
._close
_socket
(s
)
315 data
= s
.socket
.recv(100000)
317 self
._close
_socket
(s
)
319 s
.handler
.data_came_in(s
, data
)
320 except socket
.error
, e
:
322 if code
!= EWOULDBLOCK
:
323 self
._close
_socket
(s
)
325 if (event
& POLLOUT
) and s
.socket
and not s
.is_flushed():
328 s
.handler
.connection_flushed(s
)
330 def close_dead(self
):
331 while self
.dead_from_write
:
332 old
= self
.dead_from_write
333 self
.dead_from_write
= []
336 self
._close
_socket
(s
)
338 def _close_socket(self
, s
):
340 s
.handler
.connection_lost(s
)
342 def do_poll(self
, t
):
343 r
= self
.poll
.poll(t
*timemult
)
345 connects
= len(self
.single_sockets
)
346 to_close
= int(connects
*0.05)+1 # close 5% of sockets
347 self
.max_connects
= connects
-to_close
348 closelist
= self
.single_sockets
.values()
350 closelist
= closelist
[:to_close
]
351 for sock
in closelist
:
352 self
._close
_socket
(sock
)
357 return { 'interfaces': self
.interfaces
,
359 'upnp': self
.port_forwarded
is not None }
363 for ss
in self
.single_sockets
.values():
368 for server
in self
.servers
.values():
373 if self
.port_forwarded
is not None:
374 UPnP_close_port(self
.port_forwarded
)