]>
git.ipfire.org Git - people/shoehn/ipfire.org.git/blob - www/pages/torrent/client/launchmanycore.py
3 # Written by John Hoffman
4 # see LICENSE.txt for license information
6 from BitTornado
import PSYCO
10 assert psyco
.__version
__ >= 0x010100f0
15 from download_bt1
import BT1Download
16 from RawServer
import RawServer
, UPnP_ERROR
17 from RateLimiter
import RateLimiter
18 from ServerPortHandler
import MultiHandler
19 from parsedir
import parsedir
20 from natpunch
import UPnP_test
21 from random
import seed
22 from socket
import error
as socketerror
23 from threading
import Event
24 from sys
import argv
, exit
26 from clock
import clock
27 from __init__
import createPeerID
, mapbase64
, version
28 from cStringIO
import StringIO
29 from traceback
import print_exc
40 n
= int(n
) # n may be None or too large
41 assert n
< 5184000 # 60 days
46 return '%d:%02d:%02d' % (h
, m
, s
)
49 def __init__(self
, controller
, hash, response
, config
, myid
):
50 self
.controller
= controller
52 self
.response
= response
55 self
.doneflag
= Event()
63 self
.status_err
= ['']
64 self
.status_errtime
= 0
65 self
.status_done
= 0.0
67 self
.rawserver
= controller
.handler
.newRawServer(hash, self
.doneflag
)
69 d
= BT1Download(self
.display
, self
.finished
, self
.error
,
70 controller
.exchandler
, self
.doneflag
, config
, response
,
71 hash, myid
, self
.rawserver
, controller
.listen_port
)
75 if not self
.d
.saveAs(self
.saveAs
):
78 self
._hashcheckfunc
= self
.d
.initFiles()
79 if not self
._hashcheckfunc
:
82 self
.controller
.hashchecksched(self
.hash)
def saveAs(self, name, length, saveas, isdir):
    """Delegate the choice of save location to the controller.

    The request is forwarded to ``self.controller.saveAs`` together with
    this download's ``self.hash``.

    name   -- name passed through unchanged to the controller.
    length -- accepted as part of the callback signature but not
              forwarded; the controller's ``saveAs`` does not take it.
    saveas -- save target passed through unchanged to the controller.
    isdir  -- flag passed through unchanged to the controller.

    Returns whatever ``self.controller.saveAs`` returns.
    """
    return self.controller.saveAs(self.hash, name, saveas, isdir)
88 def hashcheck_start(self
, donefunc
):
94 self
._hashcheckfunc
(donefunc
)
96 def hashcheck_callback(self
):
101 if not self
.d
.startEngine(ratelimiter
= self
.controller
.ratelimiter
):
104 self
.d
.startRerequester()
105 self
.statsfunc
= self
.d
.startStats()
106 self
.rawserver
.start_listening(self
.d
.getPortHandler())
110 return self
.doneflag
.isSet()
115 def shutdown(self
, quiet
=True):
119 self
.rawserver
.shutdown()
120 if self
.checking
or self
.working
:
123 self
.checking
= False
126 self
.controller
.was_stopped(self
.hash)
128 self
.controller
.died(self
.hash)
131 def display(self
, activity
= None, fractionDone
= None):
132 # really only used by StorageWrapper now
134 self
.status_msg
= activity
135 if fractionDone
is not None:
136 self
.status_done
= float(fractionDone
)
141 def error(self
, msg
):
142 if self
.doneflag
.isSet():
144 self
.status_err
.append(msg
)
145 self
.status_errtime
= clock()
149 def __init__(self
, config
, Output
):
154 self
.torrent_dir
= config
['torrent_dir']
155 self
.torrent_cache
= {}
157 self
.blocked_files
= {}
158 self
.scan_period
= config
['parse_dir_interval']
159 self
.stats_period
= config
['display_interval']
161 self
.torrent_list
= []
164 self
.doneflag
= Event()
166 self
.hashcheck_queue
= []
167 self
.hashcheck_current
= None
169 self
.rawserver
= RawServer(self
.doneflag
, config
['timeout_check_interval'],
170 config
['timeout'], ipv6_enable
= config
['ipv6_enabled'],
171 failfunc
= self
.failed
, errorfunc
= self
.exchandler
)
172 upnp_type
= UPnP_test(config
['upnp_nat_access'])
175 self
.listen_port
= self
.rawserver
.find_and_bind(
176 config
['minport'], config
['maxport'], config
['bind'],
177 ipv6_socket_style
= config
['ipv6_binds_v4'],
178 upnp
= upnp_type
, randomizer
= config
['random_port'])
180 except socketerror
, e
:
181 if upnp_type
and e
== UPnP_ERROR
:
182 self
.Output
.message('WARNING: COULD NOT FORWARD VIA UPnP')
185 self
.failed("Couldn't listen - " + str(e
))
188 self
.ratelimiter
= RateLimiter(self
.rawserver
.add_task
,
189 config
['upload_unit_size'])
190 self
.ratelimiter
.set_upload_rate(config
['max_upload_rate'])
192 self
.handler
= MultiHandler(self
.rawserver
, self
.doneflag
)
194 self
.rawserver
.add_task(self
.scan
, 0)
195 self
.rawserver
.add_task(self
.stats
, 0)
197 self
.handler
.listen_forever()
199 self
.Output
.message('shutting down')
200 self
.hashcheck_queue
= []
201 for hash in self
.torrent_list
:
202 self
.Output
.message('dropped "'+self
.torrent_cache
[hash]['path']+'"')
203 self
.downloads
[hash].shutdown()
204 self
.rawserver
.shutdown()
208 print_exc(file = data
)
209 Output
.exception(data
.getvalue())
213 self
.rawserver
.add_task(self
.scan
, self
.scan_period
)
215 r
= parsedir(self
.torrent_dir
, self
.torrent_cache
,
216 self
.file_cache
, self
.blocked_files
,
217 return_metainfo
= True, errfunc
= self
.Output
.message
)
219 ( self
.torrent_cache
, self
.file_cache
, self
.blocked_files
,
222 for hash, data
in removed
.items():
223 self
.Output
.message('dropped "'+data
['path']+'"')
225 for hash, data
in added
.items():
226 self
.Output
.message('added "'+data
['path']+'"')
230 self
.rawserver
.add_task(self
.stats
, self
.stats_period
)
232 for hash in self
.torrent_list
:
233 cache
= self
.torrent_cache
[hash]
234 if self
.config
['display_path']:
238 size
= cache
['length']
239 d
= self
.downloads
[hash]
253 status
= 'waiting for hash check'
255 status
= d
.status_msg
256 progress
= '%.1f%%' % (d
.status_done
*100)
258 stats
= d
.statsfunc()
263 seeds
= s
.numOldSeeds
267 if s
.numSeeds
+ s
.numPeers
:
269 if t
== 0: # unlikely
274 status
= 'connecting to peers'
275 progress
= '%.1f%%' % (int(stats
['frac']*1000)/10.0)
278 dnrate
= stats
['down']
284 if d
.is_dead() or d
.status_errtime
+300 > clock():
285 msg
= d
.status_err
[-1]
289 data
.append(( name
, status
, progress
, peers
, seeds
, seedsmsg
, dist
,
290 uprate
, dnrate
, upamt
, dnamt
, size
, t
, msg
))
291 stop
= self
.Output
.display(data
)
def remove(self, hash):
    """Stop tracking the download registered under ``hash``.

    The hash is removed from ``self.torrent_list``, the matching entry
    in ``self.downloads`` is shut down, and that entry is then deleted.

    Raises ValueError if ``hash`` is not in ``self.torrent_list`` and
    KeyError if it is not in ``self.downloads``.
    """
    self.torrent_list.remove(hash)
    # Shut the download down while it is still registered, and only then
    # drop the reference to it.
    self.downloads[hash].shutdown()
    del self.downloads[hash]
300 def add(self
, hash, data
):
305 x
= mapbase64
[c
& 0x3F]+x
307 peer_id
= createPeerID(x
)
308 d
= SingleDownload(self
, hash, data
['metainfo'], self
.config
, peer_id
)
309 self
.torrent_list
.append(hash)
310 self
.downloads
[hash] = d
314 def saveAs(self
, hash, name
, saveas
, isdir
):
315 x
= self
.torrent_cache
[hash]
316 style
= self
.config
['saveas_style']
317 if style
== 1 or style
== 3:
319 saveas
= os
.path
.join(saveas
,x
['file'][:-1-len(x
['type'])])
321 saveas
= x
['path'][:-1-len(x
['type'])]
323 if not os
.path
.isdir(saveas
):
327 raise OSError("couldn't create directory for "+x
['path']
330 saveas
= os
.path
.join(saveas
, name
)
333 saveas
= os
.path
.join(saveas
, name
)
335 saveas
= os
.path
.join(os
.path
.split(x
['path'])[0], name
)
337 if isdir
and not os
.path
.isdir(saveas
):
341 raise OSError("couldn't create directory for "+x
['path']
346 def hashchecksched(self
, hash = None):
348 self
.hashcheck_queue
.append(hash)
349 if not self
.hashcheck_current
:
350 self
._hashcheck
_start
()
def _hashcheck_start(self):
    """Begin the hash check for the next queued download.

    Pops the first entry of ``self.hashcheck_queue``, records it as
    ``self.hashcheck_current`` and calls that download's
    ``hashcheck_start`` with ``self.hashcheck_callback`` as the
    completion callback.

    Raises IndexError if the queue is empty; callers in this file check
    the queue (or have just appended to it) before calling.
    """
    self.hashcheck_current = self.hashcheck_queue.pop(0)
    current = self.downloads[self.hashcheck_current]
    current.hashcheck_start(self.hashcheck_callback)
356 def hashcheck_callback(self
):
357 self
.downloads
[self
.hashcheck_current
].hashcheck_callback()
358 if self
.hashcheck_queue
:
359 self
._hashcheck
_start
()
361 self
.hashcheck_current
= None
def died(self, hash):
    """Report that the download registered under ``hash`` has died.

    When ``hash`` is present in ``self.torrent_cache`` a
    ``DIED: "<path>"`` line is sent to ``self.Output.message``, using the
    cached entry's ``'path'`` value. Unknown hashes are ignored.
    """
    # The ``in`` operator replaces dict.has_key(), which no longer exists
    # on Python 3 and is equivalent on Python 2.
    if hash in self.torrent_cache:
        self.Output.message('DIED: "' + self.torrent_cache[hash]['path'] + '"')
367 def was_stopped(self
, hash):
369 self
.hashcheck_queue
.remove(hash)
372 if self
.hashcheck_current
== hash:
373 self
.hashcheck_current
= None
374 if self
.hashcheck_queue
:
375 self
._hashcheck
_start
()
378 self
.Output
.message('FAILURE: '+s
)
def exchandler(self, s):
    """Forward exception text ``s`` to ``self.Output.exception``.

    This method is passed as ``errorfunc`` to the RawServer constructor
    in this file; it returns nothing.
    """
    self.Output.exception(s)