From: A. Jesse Jiryu Davis
Date: Wed, 15 Apr 2015 01:24:42 +0000 (-0400)
Subject: Add a web spider example to demonstrate Queue.
X-Git-Tag: v4.2.0b1~29^2~2
X-Git-Url: http://git.ipfire.org/?a=commitdiff_plain;h=41724baf6588178ddf101da632c91a59b3755df5;p=thirdparty%2Ftornado.git

Add a web spider example to demonstrate Queue.
---

diff --git a/demos/webspider/webspider.py b/demos/webspider/webspider.py
new file mode 100644
index 000000000..97cdd62dd
--- /dev/null
+++ b/demos/webspider/webspider.py
@@ -0,0 +1,113 @@
+"""A trivial web spider that crawls all the pages in http://tornadoweb.org.
+
+``main()`` downloads the page at `base_url` and any pages it links to,
+recursively. It ignores pages that are not beneath `base_url` hierarchically.
+
+This function demonstrates `queues.Queue`, especially its methods
+`~queues.Queue.join` and `~queues.Queue.task_done`.
+The queue begins containing only
+`base_url`, and each discovered URL is added to it. We wait for
+`~queues.Queue.join` to complete before exiting. This ensures that
+the function as a whole ends when all URLs have been downloaded.
+"""
+
+# start-file
+import HTMLParser
+import time
+import urlparse
+from datetime import timedelta
+
+from tornado import httpclient, gen, ioloop, queues
+
+base_url = 'http://www.tornadoweb.org/en/stable/'
+concurrency = 10
+
+
+@gen.coroutine
+def get_links_from_url(url):
+    """Download the page at `url` and parse it for links.
+
+    Returned links have had the fragment after `#` removed, and have been made
+    absolute so, e.g. the URL 'gen.html#tornado.gen.coroutine' becomes
+    'http://www.tornadoweb.org/en/stable/gen.html'.
+    """
+    try:
+        response = yield httpclient.AsyncHTTPClient().fetch(url)
+        print('fetched %s' % url)
+        urls = [urlparse.urljoin(url, remove_fragment(new_url))
+                for new_url in get_links(response.body)]
+    except Exception as e:
+        print('Exception: %s %s' % (e, url))
+        raise gen.Return([])
+
+    raise gen.Return(urls)
+
+
+def remove_fragment(url):
+    scheme, netloc, url, params, query, fragment = urlparse.urlparse(url)
+    return urlparse.urlunparse((scheme, netloc, url, params, query, ''))
+
+
+def get_links(html):
+    class URLSeeker(HTMLParser.HTMLParser):
+        def __init__(self):
+            HTMLParser.HTMLParser.__init__(self)
+            self.urls = []
+
+        def handle_starttag(self, tag, attrs):
+            href = dict(attrs).get('href')
+            if href and tag == 'a':
+                self.urls.append(href)
+
+    url_seeker = URLSeeker()
+    url_seeker.feed(html)
+    return url_seeker.urls
+
+
+@gen.coroutine
+def main():
+    q = queues.Queue()
+    start = time.time()
+    fetching, fetched = set(), set()
+
+    @gen.coroutine
+    def fetch_url():
+        current_url = yield q.get()
+        try:
+            if current_url in fetching:
+                return
+
+            print('fetching %s' % current_url)
+            fetching.add(current_url)
+            urls = yield get_links_from_url(current_url)
+            fetched.add(current_url)
+
+            for new_url in urls:
+                # Only follow links beneath the base URL
+                if new_url.startswith(base_url):
+                    yield q.put(new_url)
+
+        finally:
+            q.task_done()
+
+    @gen.coroutine
+    def worker():
+        while True:
+            yield fetch_url()
+
+    q.put(base_url)
+
+    # Start workers, then wait for the work queue to be empty.
+    for _ in range(concurrency):
+        worker()
+    yield q.join(timeout=timedelta(seconds=300))
+    assert fetching == fetched
+    print('Done in %d seconds, fetched %s URLs.' % (
+        time.time() - start, len(fetched)))
+
+
+if __name__ == '__main__':
+    import logging
+    logging.basicConfig()
+    io_loop = ioloop.IOLoop.current()
+    io_loop.run_sync(main)
diff --git a/docs/guide.rst b/docs/guide.rst
index 96aab0257..b9090b25f 100644
--- a/docs/guide.rst
+++ b/docs/guide.rst
@@ -6,6 +6,7 @@ User's guide
     guide/intro
     guide/async
     guide/coroutines
+    guide/queues
     guide/structure
     guide/templates
     guide/security
diff --git a/docs/guide/queues.rst b/docs/guide/queues.rst
new file mode 100644
index 000000000..a4bf97c5e
--- /dev/null
+++ b/docs/guide/queues.rst
@@ -0,0 +1,25 @@
+:class:`~tornado.queues.Queue` example - a concurrent web spider
+================================================================
+
+.. currentmodule:: tornado.queues
+
+Tornado's `tornado.queues` module implements an asynchronous producer /
+consumer pattern for coroutines, analogous to the pattern implemented for
+threads by the Python standard library's `queue` module.
+
+A coroutine that yields `Queue.get` pauses until there is an item in the
+queue. If the queue has a maximum size set, a coroutine that yields
+`Queue.put` pauses until there is room for another item.
+
+A `~Queue` maintains a count of unfinished tasks, which begins at zero.
+`~Queue.put` increments the count; `~Queue.task_done` decrements it.
+
+In the web spider example here, when a worker fetches a page it parses the
+links and puts new ones in the queue, then calls `~Queue.task_done` to
+decrement the counter once. Eventually, a worker fetches a page whose URLs
+have all been seen before, and there is also no work left in the queue. Thus
+that worker's call to `~Queue.task_done` decrements the counter to zero. The
+main coroutine, which is waiting for `~Queue.join`, is unpaused and finishes.
+
+.. literalinclude:: ../../demos/webspider/webspider.py
+   :start-after: # start-file
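
As a quick illustration of the Queue behaviour the new guide page describes, here is a minimal producer/consumer sketch. It is not part of the patch above; the names `producer` and `consumer`, the `maxsize` of 2, and the `gen.sleep` delay are illustrative assumptions, but `get`, `put`, `task_done`, and `join` are the same `tornado.queues` calls the webspider demo relies on.

from tornado import gen, ioloop, queues


@gen.coroutine
def main():
    # With a maxsize set, put() pauses while the queue is full.
    q = queues.Queue(maxsize=2)

    @gen.coroutine
    def consumer():
        while True:
            item = yield q.get()   # pauses until an item is available
            try:
                print('Doing work on %s' % item)
                yield gen.sleep(0.01)
            finally:
                q.task_done()      # decrement the count of unfinished tasks

    @gen.coroutine
    def producer():
        for item in range(5):
            yield q.put(item)      # increment the count; pause if the queue is full
            print('Put %s' % item)

    consumer()          # start the consumer in the background
    yield producer()    # wait until every item has been put
    yield q.join()      # wait until task_done() has been called for every item
    print('Done')


if __name__ == '__main__':
    ioloop.IOLoop.current().run_sync(main)

As in the webspider's worker(), the consumer coroutine is started without being yielded, so it runs in the background while main() waits on join.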