Python Tornado queue example: a concurrent web crawler (code share)

  • 2020-06-23 01:05:37
  • OfStack

Queue

Tornado's tornado.queues module implements an asynchronous producer/consumer queue for coroutine-based applications. It is similar to the queue module that the Python standard library provides for multithreaded environments.

A coroutine that executes yield queue.get() is paused until there is an item in the queue. If the queue has a maximum size, a coroutine that executes yield queue.put() is paused until there is room in the queue.
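
To make these two behaviors concrete, here is a minimal sketch (my own addition, not part of the original article, assuming Tornado is installed): yield q.get() pauses until an item arrives, and because the queue is created with maxsize=2, yield q.put() pauses whenever the queue is already full.

from tornado import gen, ioloop, queues


@gen.coroutine
def blocking_demo():
    q = queues.Queue(maxsize=2)  # at most 2 items may sit in the queue at once

    @gen.coroutine
    def consumer():
        while True:
            item = yield q.get()  # pauses until an item is available
            print('consumed %r' % item)
            q.task_done()

    consumer()  # start the consumer in the background
    for i in range(5):
        yield q.put(i)  # pauses whenever the queue is already full
        print('produced %r' % i)
    yield q.join()  # wait until the consumer has finished every item


if __name__ == '__main__':
    ioloop.IOLoop.current().run_sync(blocking_demo)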

Internally, the queue maintains a count of unfinished tasks: each call to put() increments the count, and each call to task_done() decrements it.
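
The sketch below (again my own addition, not from the original article) shows the counting directly: three put() calls raise the count to three, three task_done() calls bring it back down, and yield q.join() only resolves once the count has reached zero.

from tornado import gen, ioloop, queues


@gen.coroutine
def count_demo():
    q = queues.Queue()
    for i in range(3):
        yield q.put(i)  # the unfinished-task count is now i + 1

    while q.qsize():
        item = yield q.get()
        print('handled %r' % item)
        q.task_done()  # the count drops by one per handled item

    yield q.join()  # resolves immediately because the count is back to zero
    print('all tasks done')


if __name__ == '__main__':
    ioloop.IOLoop.current().run_sync(count_demo)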

Here is an example of a simple web crawler:

Initially, the queue contains only the base URL. When a worker pulls a URL out of the queue, it parses the URLs contained in the corresponding page and puts them into the queue, then calls task_done to decrement the count by one.

Eventually, a worker pulls out a URL whose page contains only URLs that have already been processed, and the queue is empty. Its call to task_done reduces the count to zero.

At that point, the join operation pending in the main coroutine unblocks, and the main coroutine finishes.

This crawler uses HTMLParser to parse the HTML pages.


import time
from datetime import timedelta

try:
 from HTMLParser import HTMLParser
 from urlparse import urljoin, urldefrag
except ImportError:
 from html.parser import HTMLParser
 from urllib.parse import urljoin, urldefrag

from tornado import httpclient, gen, ioloop, queues

base_url = 'http://www.tornadoweb.org/en/stable/'
concurrency = 10


@gen.coroutine
def get_links_from_url(url):
    """Download the page at `url` and parse it for links.

    Returned links have had the fragment after `#` removed, and have been made
    absolute so, e.g. the URL 'gen.html#tornado.gen.coroutine' becomes
    'http://www.tornadoweb.org/en/stable/gen.html'.
    """
    try:
        response = yield httpclient.AsyncHTTPClient().fetch(url)
        print('fetched %s' % url)

        html = response.body if isinstance(response.body, str) \
            else response.body.decode()
        urls = [urljoin(url, remove_fragment(new_url))
                for new_url in get_links(html)]
    except Exception as e:
        print('Exception: %s %s' % (e, url))
        raise gen.Return([])

    raise gen.Return(urls)

# Extract the real URL from a URL that contains a fragment.
def remove_fragment(url):
    pure_url, frag = urldefrag(url)
    return pure_url


def get_links(html):
    class URLSeeker(HTMLParser):
        def __init__(self):
            HTMLParser.__init__(self)
            self.urls = []

        # Extract the href attribute from every <a> tag.
        def handle_starttag(self, tag, attrs):
            href = dict(attrs).get('href')
            if href and tag == 'a':
                self.urls.append(href)

    url_seeker = URLSeeker()
    url_seeker.feed(html)
    return url_seeker.urls


@gen.coroutine
def main():
    q = queues.Queue()
    start = time.time()
    fetching, fetched = set(), set()

    @gen.coroutine
    def fetch_url():
        current_url = yield q.get()
        try:
            if current_url in fetching:
                return

            print('fetching %s' % current_url)
            fetching.add(current_url)
            urls = yield get_links_from_url(current_url)
            fetched.add(current_url)

            for new_url in urls:
                # Only follow links beneath the base URL
                if new_url.startswith(base_url):
                    yield q.put(new_url)

        finally:
            q.task_done()

    @gen.coroutine
    def worker():
        while True:
            yield fetch_url()

    q.put(base_url)

    # Start workers, then wait for the work queue to be empty.
    for _ in range(concurrency):
        worker()
    yield q.join(timeout=timedelta(seconds=300))
    assert fetching == fetched
    print('Done in %d seconds, fetched %s URLs.' % (
        time.time() - start, len(fetched)))


if __name__ == '__main__':
    import logging
    logging.basicConfig()
    io_loop = ioloop.IOLoop.current()
    io_loop.run_sync(main)

Conclusion

The introduction and example above come from the Tornado official user guide; I have done a simple translation of them and reproduced the code. Time was a little short, so I have not installed Tornado or tested this code, and there is no demonstration of the results. Please forgive me.

This is the end of the introduction to the Python Tornado queue example, a concurrent web crawler (code share). I hope it helps you. Interested friends can continue to refer to other related topics on this site, and if there is any deficiency, you are welcome to point it out in the comments. Thank you for your support!

