]> git.ipfire.org Git - people/shoehn/ipfire.org.git/blob - www/pages/torrent/client/launchmanycore.py
Change Color of Menuitem CeBIT.
[people/shoehn/ipfire.org.git] / www / pages / torrent / client / launchmanycore.py
1 #!/usr/bin/env python
2
3 # Written by John Hoffman
4 # see LICENSE.txt for license information
5
6 from BitTornado import PSYCO
7 if PSYCO.psyco:
8 try:
9 import psyco
10 assert psyco.__version__ >= 0x010100f0
11 psyco.full()
12 except:
13 pass
14
15 from download_bt1 import BT1Download
16 from RawServer import RawServer, UPnP_ERROR
17 from RateLimiter import RateLimiter
18 from ServerPortHandler import MultiHandler
19 from parsedir import parsedir
20 from natpunch import UPnP_test
21 from random import seed
22 from socket import error as socketerror
23 from threading import Event
24 from sys import argv, exit
25 import sys, os
26 from clock import clock
27 from __init__ import createPeerID, mapbase64, version
28 from cStringIO import StringIO
29 from traceback import print_exc
30
31 try:
32 True
33 except:
34 True = 1
35 False = 0
36
37
38 def fmttime(n):
39 try:
40 n = int(n) # n may be None or too large
41 assert n < 5184000 # 60 days
42 except:
43 return 'downloading'
44 m, s = divmod(n, 60)
45 h, m = divmod(m, 60)
46 return '%d:%02d:%02d' % (h, m, s)
47
48 class SingleDownload:
49 def __init__(self, controller, hash, response, config, myid):
50 self.controller = controller
51 self.hash = hash
52 self.response = response
53 self.config = config
54
55 self.doneflag = Event()
56 self.waiting = True
57 self.checking = False
58 self.working = False
59 self.seed = False
60 self.closed = False
61
62 self.status_msg = ''
63 self.status_err = ['']
64 self.status_errtime = 0
65 self.status_done = 0.0
66
67 self.rawserver = controller.handler.newRawServer(hash, self.doneflag)
68
69 d = BT1Download(self.display, self.finished, self.error,
70 controller.exchandler, self.doneflag, config, response,
71 hash, myid, self.rawserver, controller.listen_port)
72 self.d = d
73
74 def start(self):
75 if not self.d.saveAs(self.saveAs):
76 self._shutdown()
77 return
78 self._hashcheckfunc = self.d.initFiles()
79 if not self._hashcheckfunc:
80 self._shutdown()
81 return
82 self.controller.hashchecksched(self.hash)
83
84
85 def saveAs(self, name, length, saveas, isdir):
86 return self.controller.saveAs(self.hash, name, saveas, isdir)
87
88 def hashcheck_start(self, donefunc):
89 if self.is_dead():
90 self._shutdown()
91 return
92 self.waiting = False
93 self.checking = True
94 self._hashcheckfunc(donefunc)
95
96 def hashcheck_callback(self):
97 self.checking = False
98 if self.is_dead():
99 self._shutdown()
100 return
101 if not self.d.startEngine(ratelimiter = self.controller.ratelimiter):
102 self._shutdown()
103 return
104 self.d.startRerequester()
105 self.statsfunc = self.d.startStats()
106 self.rawserver.start_listening(self.d.getPortHandler())
107 self.working = True
108
109 def is_dead(self):
110 return self.doneflag.isSet()
111
112 def _shutdown(self):
113 self.shutdown(False)
114
115 def shutdown(self, quiet=True):
116 if self.closed:
117 return
118 self.doneflag.set()
119 self.rawserver.shutdown()
120 if self.checking or self.working:
121 self.d.shutdown()
122 self.waiting = False
123 self.checking = False
124 self.working = False
125 self.closed = True
126 self.controller.was_stopped(self.hash)
127 if not quiet:
128 self.controller.died(self.hash)
129
130
131 def display(self, activity = None, fractionDone = None):
132 # really only used by StorageWrapper now
133 if activity:
134 self.status_msg = activity
135 if fractionDone is not None:
136 self.status_done = float(fractionDone)
137
138 def finished(self):
139 self.seed = True
140
141 def error(self, msg):
142 if self.doneflag.isSet():
143 self._shutdown()
144 self.status_err.append(msg)
145 self.status_errtime = clock()
146
147
148 class LaunchMany:
149 def __init__(self, config, Output):
150 try:
151 self.config = config
152 self.Output = Output
153
154 self.torrent_dir = config['torrent_dir']
155 self.torrent_cache = {}
156 self.file_cache = {}
157 self.blocked_files = {}
158 self.scan_period = config['parse_dir_interval']
159 self.stats_period = config['display_interval']
160
161 self.torrent_list = []
162 self.downloads = {}
163 self.counter = 0
164 self.doneflag = Event()
165
166 self.hashcheck_queue = []
167 self.hashcheck_current = None
168
169 self.rawserver = RawServer(self.doneflag, config['timeout_check_interval'],
170 config['timeout'], ipv6_enable = config['ipv6_enabled'],
171 failfunc = self.failed, errorfunc = self.exchandler)
172 upnp_type = UPnP_test(config['upnp_nat_access'])
173 while True:
174 try:
175 self.listen_port = self.rawserver.find_and_bind(
176 config['minport'], config['maxport'], config['bind'],
177 ipv6_socket_style = config['ipv6_binds_v4'],
178 upnp = upnp_type, randomizer = config['random_port'])
179 break
180 except socketerror, e:
181 if upnp_type and e == UPnP_ERROR:
182 self.Output.message('WARNING: COULD NOT FORWARD VIA UPnP')
183 upnp_type = 0
184 continue
185 self.failed("Couldn't listen - " + str(e))
186 return
187
188 self.ratelimiter = RateLimiter(self.rawserver.add_task,
189 config['upload_unit_size'])
190 self.ratelimiter.set_upload_rate(config['max_upload_rate'])
191
192 self.handler = MultiHandler(self.rawserver, self.doneflag)
193 seed(createPeerID())
194 self.rawserver.add_task(self.scan, 0)
195 self.rawserver.add_task(self.stats, 0)
196
197 self.handler.listen_forever()
198
199 self.Output.message('shutting down')
200 self.hashcheck_queue = []
201 for hash in self.torrent_list:
202 self.Output.message('dropped "'+self.torrent_cache[hash]['path']+'"')
203 self.downloads[hash].shutdown()
204 self.rawserver.shutdown()
205
206 except:
207 data = StringIO()
208 print_exc(file = data)
209 Output.exception(data.getvalue())
210
211
212 def scan(self):
213 self.rawserver.add_task(self.scan, self.scan_period)
214
215 r = parsedir(self.torrent_dir, self.torrent_cache,
216 self.file_cache, self.blocked_files,
217 return_metainfo = True, errfunc = self.Output.message)
218
219 ( self.torrent_cache, self.file_cache, self.blocked_files,
220 added, removed ) = r
221
222 for hash, data in removed.items():
223 self.Output.message('dropped "'+data['path']+'"')
224 self.remove(hash)
225 for hash, data in added.items():
226 self.Output.message('added "'+data['path']+'"')
227 self.add(hash, data)
228
229 def stats(self):
230 self.rawserver.add_task(self.stats, self.stats_period)
231 data = []
232 for hash in self.torrent_list:
233 cache = self.torrent_cache[hash]
234 if self.config['display_path']:
235 name = cache['path']
236 else:
237 name = cache['name']
238 size = cache['length']
239 d = self.downloads[hash]
240 progress = '0.0%'
241 peers = 0
242 seeds = 0
243 seedsmsg = "S"
244 dist = 0.0
245 uprate = 0.0
246 dnrate = 0.0
247 upamt = 0
248 dnamt = 0
249 t = 0
250 if d.is_dead():
251 status = 'stopped'
252 elif d.waiting:
253 status = 'waiting for hash check'
254 elif d.checking:
255 status = d.status_msg
256 progress = '%.1f%%' % (d.status_done*100)
257 else:
258 stats = d.statsfunc()
259 s = stats['stats']
260 if d.seed:
261 status = 'seeding'
262 progress = '100.0%'
263 seeds = s.numOldSeeds
264 seedsmsg = "s"
265 dist = s.numCopies
266 else:
267 if s.numSeeds + s.numPeers:
268 t = stats['time']
269 if t == 0: # unlikely
270 t = 0.01
271 status = fmttime(t)
272 else:
273 t = -1
274 status = 'connecting to peers'
275 progress = '%.1f%%' % (int(stats['frac']*1000)/10.0)
276 seeds = s.numSeeds
277 dist = s.numCopies2
278 dnrate = stats['down']
279 peers = s.numPeers
280 uprate = stats['up']
281 upamt = s.upTotal
282 dnamt = s.downTotal
283
284 if d.is_dead() or d.status_errtime+300 > clock():
285 msg = d.status_err[-1]
286 else:
287 msg = ''
288
289 data.append(( name, status, progress, peers, seeds, seedsmsg, dist,
290 uprate, dnrate, upamt, dnamt, size, t, msg ))
291 stop = self.Output.display(data)
292 if stop:
293 self.doneflag.set()
294
295 def remove(self, hash):
296 self.torrent_list.remove(hash)
297 self.downloads[hash].shutdown()
298 del self.downloads[hash]
299
300 def add(self, hash, data):
301 c = self.counter
302 self.counter += 1
303 x = ''
304 for i in xrange(3):
305 x = mapbase64[c & 0x3F]+x
306 c >>= 6
307 peer_id = createPeerID(x)
308 d = SingleDownload(self, hash, data['metainfo'], self.config, peer_id)
309 self.torrent_list.append(hash)
310 self.downloads[hash] = d
311 d.start()
312
313
314 def saveAs(self, hash, name, saveas, isdir):
315 x = self.torrent_cache[hash]
316 style = self.config['saveas_style']
317 if style == 1 or style == 3:
318 if saveas:
319 saveas = os.path.join(saveas,x['file'][:-1-len(x['type'])])
320 else:
321 saveas = x['path'][:-1-len(x['type'])]
322 if style == 3:
323 if not os.path.isdir(saveas):
324 try:
325 os.mkdir(saveas)
326 except:
327 raise OSError("couldn't create directory for "+x['path']
328 +" ("+saveas+")")
329 if not isdir:
330 saveas = os.path.join(saveas, name)
331 else:
332 if saveas:
333 saveas = os.path.join(saveas, name)
334 else:
335 saveas = os.path.join(os.path.split(x['path'])[0], name)
336
337 if isdir and not os.path.isdir(saveas):
338 try:
339 os.mkdir(saveas)
340 except:
341 raise OSError("couldn't create directory for "+x['path']
342 +" ("+saveas+")")
343 return saveas
344
345
346 def hashchecksched(self, hash = None):
347 if hash:
348 self.hashcheck_queue.append(hash)
349 if not self.hashcheck_current:
350 self._hashcheck_start()
351
352 def _hashcheck_start(self):
353 self.hashcheck_current = self.hashcheck_queue.pop(0)
354 self.downloads[self.hashcheck_current].hashcheck_start(self.hashcheck_callback)
355
356 def hashcheck_callback(self):
357 self.downloads[self.hashcheck_current].hashcheck_callback()
358 if self.hashcheck_queue:
359 self._hashcheck_start()
360 else:
361 self.hashcheck_current = None
362
363 def died(self, hash):
364 if self.torrent_cache.has_key(hash):
365 self.Output.message('DIED: "'+self.torrent_cache[hash]['path']+'"')
366
367 def was_stopped(self, hash):
368 try:
369 self.hashcheck_queue.remove(hash)
370 except:
371 pass
372 if self.hashcheck_current == hash:
373 self.hashcheck_current = None
374 if self.hashcheck_queue:
375 self._hashcheck_start()
376
377 def failed(self, s):
378 self.Output.message('FAILURE: '+s)
379
380 def exchandler(self, s):
381 self.Output.exception(s)