]>
Commit | Line | Data |
---|---|---|
879aa787 MT |
1 | #!/usr/bin/env python |
2 | ||
3 | # Written by John Hoffman | |
4 | # see LICENSE.txt for license information | |
5 | ||
6 | from BitTornado import PSYCO | |
# Optional acceleration: when the PSYCO settings module enables it, try to
# switch on psyco for the whole program.  This is strictly best-effort; a
# missing or unsuitable psyco must never stop the program from starting.
if PSYCO.psyco:
    try:
        import psyco
        # Explicit comparison instead of `assert`: assertions are removed
        # when Python runs with -O, which would silently drop the version gate.
        if psyco.__version__ >= 0x010100f0:
            psyco.full()
    except Exception:
        # psyco not installed or failed to initialise - run without it.
        pass
14 | ||
15 | from download_bt1 import BT1Download | |
16 | from RawServer import RawServer, UPnP_ERROR | |
17 | from RateLimiter import RateLimiter | |
18 | from ServerPortHandler import MultiHandler | |
19 | from parsedir import parsedir | |
20 | from natpunch import UPnP_test | |
21 | from random import seed | |
22 | from socket import error as socketerror | |
23 | from threading import Event | |
24 | from sys import argv, exit | |
25 | import sys, os | |
26 | from clock import clock | |
27 | from __init__ import createPeerID, mapbase64, version | |
28 | from cStringIO import StringIO | |
29 | from traceback import print_exc | |
30 | ||
# Compatibility shim for interpreters that predate the True/False builtins.
# The names are installed through globals() rather than with a literal
# `True = 1` statement, because that statement is rejected at compile time
# by interpreters in which True and False are keywords.
try:
    True
except NameError:
    globals()['True'] = 1
    globals()['False'] = 0
36 | ||
37 | ||
def fmttime(n):
    """Format a duration in seconds as 'H:MM:SS' for the status display.

    n -- number of seconds; may be None or any value int() cannot convert.

    Returns the formatted string, or the placeholder 'downloading' when n
    cannot be converted to an integer or is 60 days (5184000 s) or more.
    """
    try:
        n = int(n)  # n may be None or too large
    except (TypeError, ValueError, OverflowError):
        return 'downloading'
    # Explicit range check instead of `assert`, which is stripped under -O.
    if n >= 5184000:  # 60 days
        return 'downloading'
    m, s = divmod(n, 60)
    h, m = divmod(m, 60)
    return '%d:%02d:%02d' % (h, m, s)
47 | ||
class SingleDownload:
    """Lifecycle and status holder for one torrent run by a controller.

    Wraps a BT1Download and tracks which phase it is in through the
    ``waiting`` / ``checking`` / ``working`` / ``seed`` / ``closed`` flags.
    The controller is expected to provide ``handler.newRawServer``,
    ``exchandler``, ``listen_port``, ``ratelimiter``, ``saveAs``,
    ``hashchecksched``, ``was_stopped`` and ``died`` (all used below).
    """

    def __init__(self, controller, hash, response, config, myid):
        """Create the per-torrent raw server and the BT1Download object.

        controller -- owning object (see class docstring for what it provides)
        hash       -- key identifying this torrent to the controller
        response   -- passed through to BT1Download unchanged
        config     -- configuration mapping, passed through to BT1Download
        myid       -- peer id, passed through to BT1Download
        """
        self.controller = controller
        self.hash = hash
        self.response = response
        self.config = config

        # Lifecycle state.  doneflag doubles as the "dead" indicator.
        self.doneflag = Event()
        self.waiting = True
        self.checking = False
        self.working = False
        self.seed = False
        self.closed = False

        # Status shown by the controller's display loop.
        self.status_msg = ''
        self.status_err = ['']      # error history; the last entry is shown
        self.status_errtime = 0     # clock() value of the most recent error
        self.status_done = 0.0      # fraction completed reported via display()

        # Filled in by start() and hashcheck_callback(); defined here so the
        # attribute set does not depend on how far the download has progressed.
        self._hashcheckfunc = None
        self.statsfunc = None

        self.rawserver = controller.handler.newRawServer(hash, self.doneflag)

        self.d = BT1Download(self.display, self.finished, self.error,
                             controller.exchandler, self.doneflag, config,
                             response, hash, myid, self.rawserver,
                             controller.listen_port)

    def start(self):
        """Pick the save location, initialise files and queue the hash check.

        Shuts the download down (reporting it as died) if either the save
        location or the file initialisation step returns a false value.
        """
        if not self.d.saveAs(self.saveAs):
            self._shutdown()
            return
        self._hashcheckfunc = self.d.initFiles()
        if not self._hashcheckfunc:
            self._shutdown()
            return
        self.controller.hashchecksched(self.hash)

    def saveAs(self, name, length, saveas, isdir):
        """Callback handed to BT1Download.saveAs; delegates to the controller.

        ``length`` is accepted for the callback signature but not used.
        """
        return self.controller.saveAs(self.hash, name, saveas, isdir)

    def hashcheck_start(self, donefunc):
        """Begin the hash check, calling ``donefunc`` through the check func.

        If the download was already stopped it is shut down instead.
        """
        if self.is_dead():
            self._shutdown()
            return
        self.waiting = False
        self.checking = True
        self._hashcheckfunc(donefunc)

    def hashcheck_callback(self):
        """Hash check finished: start the engine, rerequester and stats.

        Shuts the download down if it was stopped meanwhile or if
        ``startEngine`` returns a false value.
        """
        self.checking = False
        if self.is_dead():
            self._shutdown()
            return
        if not self.d.startEngine(ratelimiter = self.controller.ratelimiter):
            self._shutdown()
            return
        self.d.startRerequester()
        self.statsfunc = self.d.startStats()
        self.rawserver.start_listening(self.d.getPortHandler())
        self.working = True

    def is_dead(self):
        """Return whether the done flag has been set."""
        return self.doneflag.isSet()

    def _shutdown(self):
        """Shut down non-quietly, i.e. also notify the controller via died()."""
        self.shutdown(False)

    def shutdown(self, quiet=True):
        """Stop the download once; later calls are no-ops.

        quiet -- when false, the controller's ``died`` hook is called after
                 its ``was_stopped`` hook.
        """
        if self.closed:
            return
        self.doneflag.set()
        self.rawserver.shutdown()
        # The BT1Download is only shut down if checking or working was reached.
        if self.checking or self.working:
            self.d.shutdown()
        self.waiting = False
        self.checking = False
        self.working = False
        self.closed = True
        self.controller.was_stopped(self.hash)
        if not quiet:
            self.controller.died(self.hash)

    def display(self, activity = None, fractionDone = None):
        """Record a status message and/or completed fraction.

        A false ``activity`` or a ``fractionDone`` of None leaves the
        corresponding stored value unchanged.
        """
        # really only used by StorageWrapper now
        if activity:
            self.status_msg = activity
        if fractionDone is not None:
            self.status_done = float(fractionDone)

    def finished(self):
        """Callback for download completion: mark this torrent as seeding."""
        self.seed = True

    def error(self, msg):
        """Record an error message and the time it occurred.

        If the done flag is already set the download is shut down first.
        """
        if self.doneflag.isSet():
            self._shutdown()
        self.status_err.append(msg)
        self.status_errtime = clock()
146 | ||
147 | ||
class LaunchMany:
    """Run every torrent found in a directory on one shared server loop.

    The instance acts as the controller for its SingleDownload objects: it
    owns the shared raw server, rate limiter and listening port, serialises
    hash checks through a queue, and reports to the ``Output`` object.
    """

    def __init__(self, config, Output):
        """Set up the shared server, run it until stopped, then clean up.

        The constructor blocks in ``listen_forever`` and only returns after
        the loop ends (or after a failure to bind a listening port).

        config -- configuration mapping; the keys read are visible below
        Output -- object providing ``message(str)``, ``exception(str)`` and
                  ``display(rows)``; a true return from ``display`` stops
                  the program (see stats())

        Any exception raised here is formatted as a traceback and passed to
        ``Output.exception`` instead of propagating.
        """
        try:
            self.config = config
            self.Output = Output

            self.torrent_dir = config['torrent_dir']
            self.torrent_cache = {}
            self.file_cache = {}
            self.blocked_files = {}
            self.scan_period = config['parse_dir_interval']
            self.stats_period = config['display_interval']

            self.torrent_list = []      # hashes, in the order they were added
            self.downloads = {}         # hash -> SingleDownload
            self.counter = 0            # feeds the per-download peer id
            self.doneflag = Event()

            self.hashcheck_queue = []       # hashes waiting for a hash check
            self.hashcheck_current = None   # hash being checked, if any

            self.rawserver = RawServer(self.doneflag,
                              config['timeout_check_interval'],
                              config['timeout'],
                              ipv6_enable = config['ipv6_enabled'],
                              failfunc = self.failed,
                              errorfunc = self.exchandler)
            upnp_type = UPnP_test(config['upnp_nat_access'])
            while True:
                try:
                    self.listen_port = self.rawserver.find_and_bind(
                                    config['minport'], config['maxport'],
                                    config['bind'],
                                    ipv6_socket_style = config['ipv6_binds_v4'],
                                    upnp = upnp_type,
                                    randomizer = config['random_port'])
                    break
                except socketerror:
                    # sys.exc_info() keeps this clause valid on interpreters
                    # that accept only one of "except X, e" / "except X as e".
                    e = sys.exc_info()[1]
                    # NOTE(review): the exception is compared with UPnP_ERROR
                    # directly and by message text; if UPnP_ERROR is a message
                    # string the direct comparison alone could never match.
                    # Confirm against the definition of UPnP_ERROR.
                    if upnp_type and (e == UPnP_ERROR
                                      or str(e) == UPnP_ERROR):
                        self.Output.message('WARNING: COULD NOT FORWARD VIA UPnP')
                        upnp_type = 0   # retry once more without UPnP
                        continue
                    self.failed("Couldn't listen - " + str(e))
                    return

            self.ratelimiter = RateLimiter(self.rawserver.add_task,
                                           config['upload_unit_size'])
            self.ratelimiter.set_upload_rate(config['max_upload_rate'])

            self.handler = MultiHandler(self.rawserver, self.doneflag)
            seed(createPeerID())
            self.rawserver.add_task(self.scan, 0)
            self.rawserver.add_task(self.stats, 0)

            self.handler.listen_forever()

            self.Output.message('shutting down')
            # Emptied first so stopping a download cannot start another check.
            self.hashcheck_queue = []
            for hash in self.torrent_list:
                self.Output.message('dropped "'+self.torrent_cache[hash]['path']+'"')
                self.downloads[hash].shutdown()
            self.rawserver.shutdown()

        except:
            # Top-level boundary: report everything through Output.
            data = StringIO()
            print_exc(file = data)
            Output.exception(data.getvalue())

    def scan(self):
        """Rescan the torrent directory and apply additions and removals.

        Reschedules itself on the raw server every ``scan_period``.
        """
        self.rawserver.add_task(self.scan, self.scan_period)

        r = parsedir(self.torrent_dir, self.torrent_cache,
                     self.file_cache, self.blocked_files,
                     return_metainfo = True, errfunc = self.Output.message)

        ( self.torrent_cache, self.file_cache, self.blocked_files,
            added, removed ) = r

        for hash, data in removed.items():
            self.Output.message('dropped "'+data['path']+'"')
            self.remove(hash)
        for hash, data in added.items():
            self.Output.message('added "'+data['path']+'"')
            self.add(hash, data)

    def stats(self):
        """Collect one status row per torrent and hand them to the display.

        Reschedules itself every ``stats_period``.  Each row is the tuple
        (name, status, progress, peers, seeds, seedsmsg, dist, uprate,
        dnrate, upamt, dnamt, size, t, msg).  A true return value from
        ``Output.display`` sets the done flag.
        """
        self.rawserver.add_task(self.stats, self.stats_period)
        data = []
        for hash in self.torrent_list:
            cache = self.torrent_cache[hash]
            if self.config['display_path']:
                name = cache['path']
            else:
                name = cache['name']
            size = cache['length']
            d = self.downloads[hash]
            # Defaults used for torrents that are not yet running.
            progress = '0.0%'
            peers = 0
            seeds = 0
            seedsmsg = "S"
            dist = 0.0
            uprate = 0.0
            dnrate = 0.0
            upamt = 0
            dnamt = 0
            t = 0
            if d.is_dead():
                status = 'stopped'
            elif d.waiting:
                status = 'waiting for hash check'
            elif d.checking:
                status = d.status_msg
                progress = '%.1f%%' % (d.status_done*100)
            else:
                stats = d.statsfunc()
                s = stats['stats']
                if d.seed:
                    status = 'seeding'
                    progress = '100.0%'
                    seeds = s.numOldSeeds
                    seedsmsg = "s"
                    dist = s.numCopies
                else:
                    if s.numSeeds + s.numPeers:
                        t = stats['time']
                        if t == 0:  # unlikely
                            t = 0.01
                        status = fmttime(t)
                    else:
                        t = -1
                        status = 'connecting to peers'
                    # Truncate (not round) to one decimal place.
                    progress = '%.1f%%' % (int(stats['frac']*1000)/10.0)
                    seeds = s.numSeeds
                    dist = s.numCopies2
                    dnrate = stats['down']
                peers = s.numPeers
                uprate = stats['up']
                upamt = s.upTotal
                dnamt = s.downTotal

            # Show the last error for stopped torrents, or for 300 clock()
            # units after it was recorded.
            if d.is_dead() or d.status_errtime+300 > clock():
                msg = d.status_err[-1]
            else:
                msg = ''

            data.append(( name, status, progress, peers, seeds, seedsmsg, dist,
                          uprate, dnrate, upamt, dnamt, size, t, msg ))
        stop = self.Output.display(data)
        if stop:
            self.doneflag.set()

    def remove(self, hash):
        """Stop the download for ``hash`` and forget it."""
        self.torrent_list.remove(hash)
        self.downloads[hash].shutdown()
        del self.downloads[hash]

    def add(self, hash, data):
        """Create and start a SingleDownload for a newly found torrent.

        The running counter is encoded as three ``mapbase64`` characters
        (six bits each, most significant first) and passed to createPeerID.
        """
        c = self.counter
        self.counter += 1
        x = ''
        for _ in range(3):
            x = mapbase64[c & 0x3F]+x
            c >>= 6
        peer_id = createPeerID(x)
        d = SingleDownload(self, hash, data['metainfo'], self.config, peer_id)
        self.torrent_list.append(hash)
        self.downloads[hash] = d
        d.start()

    def _make_dir(self, saveas, torrent_path):
        # Create directory `saveas` unless it exists; report failure as OSError.
        if os.path.isdir(saveas):
            return
        try:
            os.mkdir(saveas)
        except Exception:
            raise OSError("couldn't create directory for "+torrent_path
                          +" ("+saveas+")")

    def saveAs(self, hash, name, saveas, isdir):
        """Work out where the torrent's data is stored, per 'saveas_style'.

        hash   -- key into the torrent cache
        name   -- name suggested for the data
        saveas -- base directory, or a false value to store next to the
                  torrent file
        isdir  -- true when the data is a directory (it is then created)

        Styles 1 and 3 derive the location from the torrent file name with
        its '.<type>' suffix removed; style 3 makes that a directory and,
        for single files, places ``name`` inside it.  Any other style joins
        ``name`` to the base directory.

        Returns the chosen path.  Raises OSError if a directory that is
        needed cannot be created.
        """
        x = self.torrent_cache[hash]
        style = self.config['saveas_style']
        if style == 1 or style == 3:
            suffix_len = 1 + len(x['type'])     # the dot plus the extension
            if saveas:
                saveas = os.path.join(saveas, x['file'][:-suffix_len])
            else:
                saveas = x['path'][:-suffix_len]
            if style == 3:
                self._make_dir(saveas, x['path'])
                if not isdir:
                    saveas = os.path.join(saveas, name)
        else:
            if saveas:
                saveas = os.path.join(saveas, name)
            else:
                saveas = os.path.join(os.path.split(x['path'])[0], name)

        if isdir:
            self._make_dir(saveas, x['path'])
        return saveas

    def hashchecksched(self, hash = None):
        """Queue ``hash`` (if given) and start a check when none is running.

        Calling this with nothing queued is a no-op rather than an
        IndexError from popping an empty queue.
        """
        if hash:
            self.hashcheck_queue.append(hash)
        if not self.hashcheck_current and self.hashcheck_queue:
            self._hashcheck_start()

    def _hashcheck_start(self):
        """Take the next queued hash and start its download's hash check."""
        self.hashcheck_current = self.hashcheck_queue.pop(0)
        self.downloads[self.hashcheck_current].hashcheck_start(self.hashcheck_callback)

    def hashcheck_callback(self):
        """Current check is done: notify its download, then start the next."""
        self.downloads[self.hashcheck_current].hashcheck_callback()
        if self.hashcheck_queue:
            self._hashcheck_start()
        else:
            self.hashcheck_current = None

    def died(self, hash):
        """Report a download that shut down non-quietly, if still cached."""
        if hash in self.torrent_cache:
            self.Output.message('DIED: "'+self.torrent_cache[hash]['path']+'"')

    def was_stopped(self, hash):
        """Drop a stopped download from the hash-check queue.

        If it was the one being checked, the next queued check is started.
        """
        try:
            self.hashcheck_queue.remove(hash)
        except ValueError:
            # Not queued - nothing to remove.
            pass
        if self.hashcheck_current == hash:
            self.hashcheck_current = None
            if self.hashcheck_queue:
                self._hashcheck_start()

    def failed(self, s):
        """Report a failure message through the Output object."""
        self.Output.message('FAILURE: '+s)

    def exchandler(self, s):
        """Forward exception text to the Output object."""
        self.Output.exception(s)