Multithreaded web page crawling in Python: a detailed example

  • 2020-06-03 07:07:34
  • OfStack

This article describes how to implement multithreaded web page crawling in Python. It is shared here for reference; the details are as follows:

Recently I have been working on things related to web crawlers. I read through larbin, an open source crawler written in C++, and studied its design and the implementation of some of its key techniques.

1. larbin deduplicates URLs with a highly efficient Bloom filter algorithm;
2. DNS resolution uses adns, an open source asynchronous component;
3. The URL queue is kept partly in memory and partly written to files;
4. larbin does a lot of work with files;
5. larbin has a connection pool: it creates sockets, sends an HTTP GET request to the target site, reads the content, and then parses the headers and so on;
6. I/O over a large number of descriptors is multiplexed with poll, which is very efficient;
7. larbin is highly configurable;
8. Many of the data structures the author uses are written from scratch, with almost no use of things like the STL.
......
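
To make the first point concrete, here is a minimal Bloom filter sketch for URL deduplication. It is not larbin's implementation: the class name, the bit-array size, the number of hash functions and the use of SHA-256 with double hashing are all assumptions chosen for illustration, and it is written for Python 3 rather than the Python 2.6 used later in this article.

```python
import hashlib


class BloomFilter:
    """Fixed-size Bloom filter: no false negatives, occasional false positives."""

    def __init__(self, num_bits=1 << 20, num_hashes=5):
        self.num_bits = num_bits
        self.num_hashes = num_hashes
        self.bits = bytearray((num_bits + 7) // 8)  # one bit per slot

    def _positions(self, item):
        # Derive k bit positions from two slices of one SHA-256 digest
        # (double hashing: h1 + i * h2).
        digest = hashlib.sha256(item.encode("utf-8")).digest()
        h1 = int.from_bytes(digest[:8], "big")
        h2 = int.from_bytes(digest[8:16], "big") | 1  # forced odd, never zero
        for i in range(self.num_hashes):
            yield (h1 + i * h2) % self.num_bits

    def add(self, item):
        for pos in self._positions(item):
            self.bits[pos // 8] |= 1 << (pos % 8)

    def __contains__(self, item):
        return all(self.bits[pos // 8] & (1 << (pos % 8))
                   for pos in self._positions(item))


def unseen(urls, seen):
    """Yield each URL the filter has not seen yet, and remember it."""
    for url in urls:
        if url not in seen:
            seen.add(url)
            yield url
```

The trade-off is that a Bloom filter may occasionally report a new URL as already seen, so a crawler using it can skip a small fraction of pages in exchange for a small, constant memory footprint.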

There is a lot more. When I have time I will write a proper article to summarize it.

Over the past two days I wrote a multithreaded page downloader in Python. For an I/O-bound application, multithreading is clearly a good solution, and the thread pool I wrote recently can be put to use as well. Crawling pages in Python is actually very simple: the urllib2 module is convenient, and two or three lines of code are basically enough. But although ready-made modules solve the problem conveniently, relying on them does little to build up your own technical skills, because the key algorithms are implemented by someone else and many details stay hidden from you. As engineers we should not only use modules and APIs written by others; we learn more by implementing things ourselves.

I decided to start from sockets, building the GET request and parsing the headers myself. DNS can also be handled separately, for example with a DNS cache. Writing it all myself gives me more control and makes the program easier to extend. For timeouts I use a global timeout of 5 seconds. For redirects (301 or 302) I follow at most 3, because earlier tests showed that many sites redirect to themselves, which would loop forever, so I set an upper limit. The principle is simple, so it is easiest to just read the code.
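
The two ideas in this paragraph, a hand-built GET request and a cap on redirects, can be sketched in isolation as follows. This is an illustrative Python 3 sketch, not the code from this article: `build_get_request`, `fetch_following_redirects`, the `fetch_once` callback and the `example-crawler` user agent are hypothetical names, the loop is iterative where the article's code recurses, and treating the cap as "one request plus three redirects" is one reading of the limit.

```python
MAX_REDIRECTS = 3  # the cap described in the text


def build_get_request(hostname, path, user_agent="example-crawler"):
    """Return the bytes of a minimal HTTP/1.0 GET request."""
    lines = [
        "GET %s HTTP/1.0" % path,
        "Host: %s" % hostname,
        "User-Agent: %s" % user_agent,
        "",  # the empty line that ends the header section
        "",
    ]
    return "\r\n".join(lines).encode("ascii")


def fetch_following_redirects(fetch_once, hostname, path):
    """Call fetch_once(hostname, path) until it stops redirecting.

    fetch_once must return (status_code, location_or_None).
    Returns the final status code, or None if the redirect cap is hit.
    """
    for _ in range(MAX_REDIRECTS + 1):  # first request plus 3 redirects
        code, location = fetch_once(hostname, path)
        if code not in (301, 302) or not location:
            return code
        if location.startswith("http://"):
            # absolute redirect: split into host and path
            rest = location[len("http://"):]
            hostname, _, tail = rest.partition("/")
            path = "/" + tail
        else:
            path = location  # relative redirect: stay on the same host
    return None
```

Passing the network call in as `fetch_once` keeps the redirect logic testable without a connection; a site that keeps redirecting to itself is abandoned after four requests.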

After writing it, I compared its performance with urllib2. My own version turned out to be faster, and urllib2 had a slightly higher error rate; I don't know why. Some people online say urllib2 has minor problems in multithreaded contexts, but I am not clear on the specifics.

Here is the code first:

fetchPage.py uses the GET method of the HTTP protocol to download pages and store them as files:


'''
Created on 2012-3-13
Get Page using GET method
Default using HTTP Protocol , http port 80
@author: xiaojay
'''
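import socket
import datetime
import threading
# statistics is the crawler's own shared-state module (timeout, counters,
# result codes and the DNS cache); it is not listed in this article.
import statistics
socket.setdefaulttimeout(statistics.timeout)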
class Error404(Exception):
  '''Can not find the page.'''
  pass
class ErrorOther(Exception):
  '''Some other exception'''
  def __init__(self,code):
    #print 'Code :',code
    pass
class ErrorTryTooManyTimes(Exception):
  '''try too many times'''
  pass
def downPage(hostname ,filename , trytimes=0):
  #To avoid too many tries .Try times can not be more than max_try_times
  if trytimes >= statistics.max_try_times :
    return statistics.RESULTTRYTOOMANY,hostname+filename
  try:
    s = socket.socket(socket.AF_INET,socket.SOCK_STREAM)
    #DNS cache
    if statistics.DNSCache.has_key(hostname):
      addr = statistics.DNSCache[hostname]
    else:
      addr = socket.gethostbyname(hostname)
      statistics.DNSCache[hostname] = addr
    #connect to http server ,default port 80
    s.connect((addr,80))
    msg = 'GET '+filename+' HTTP/1.0\r\n'
    msg += 'Host: '+hostname+'\r\n'
    msg += 'User-Agent:xiaojay\r\n\r\n'
    code = ''
    f = None
    s.sendall(msg)
    first = True
    while True:
      msg = s.recv(40960)
      if not len(msg):
        if f!=None:
          f.flush()
          f.close()
        break
      # Head information must be in the first recv buffer
      if first:
        first = False
        headpos = msg.index("\r\n\r\n")
        code,other = dealwithHead(msg[:headpos])
        if code=='200':
          #statistics.fetched_url += 1
          # open in binary mode and write the body bytes that followed the head
          f = open('pages/'+str(abs(hash(hostname+filename))),'wb')
          f.write(msg[headpos+4:])
        elif code=='301' or code=='302':
          #if code is 301 or 302 , follow the redirect location and return
          #that attempt's result instead of discarding it
          s.close()
          if other.startswith("http") :
            hname, fname = parse(other)
            return downPage(hname,fname,trytimes+1)#try again
          else :
            return downPage(hostname,other,trytimes+1)
        elif code=='404':
          raise Error404
        else :
          raise ErrorOther(code)
      else:
        if f!=None :f.write(msg)
    s.shutdown(socket.SHUT_RDWR)
    s.close()
    return statistics.RESULTFETCHED,hostname+filename
  except Error404 :
    return statistics.RESULTCANNOTFIND,hostname+filename
  except ErrorOther:
    return statistics.RESULTOTHER,hostname+filename
  except socket.timeout:
    return statistics.RESULTTIMEOUT,hostname+filename
  except Exception, e:
    return statistics.RESULTOTHER,hostname+filename
def dealwithHead(head):
  '''deal with HTTP HEAD'''
  lines = head.splitlines()
  fstline = lines[0]
  code =fstline.split()[1]
  if code == '404' : return (code,None)
  if code == '200' : return (code,None)
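  if code == '301' or code == '302' :
    for line in lines[1:]:
      p = line.find(':')
      if p < 0 : continue #skip malformed header lines
      key = line[:p]
      #header names are case-insensitive
      if key.strip().lower()=='location' :
        return (code,line[p+1:].strip())
  return (code,None)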
def parse(url):
  '''Parse a url to hostname+filename'''
  try:
    u = url.strip().strip('\n').strip('\r').strip('\t')
    if u.startswith('http://') :
      u = u[7:]
    elif u.startswith('https://'):
      u = u[8:]
    if u.find(':80')>0 :
      p = u.index(':80')
      p2 = p + 3
    else:
      if u.find('/')>0:
        p = u.index('/')
        p2 = p
      else:
        p = len(u)
        p2 = -1
    hostname = u[:p]
    if p2>0 :
      filename = u[p2:]
    else : filename = '/'
    return hostname, filename
  except Exception ,e:
    print "Parse wrong : " , url
    print e
def PrintDNSCache():
  '''print DNS dict'''
  n = 1
  for hostname in statistics.DNSCache.keys():
    print n,'\t',hostname, '\t',statistics.DNSCache[hostname]
    n+=1
def dealwithResult(res,url):
  '''Deal with the result of downPage'''
  statistics.total_url+=1
  if res==statistics.RESULTFETCHED :
    statistics.fetched_url+=1
    print statistics.total_url , '\t fetched :', url
  if res==statistics.RESULTCANNOTFIND :
    statistics.failed_url+=1
    print "Error 404 at : ", url
  if res==statistics.RESULTOTHER :
    statistics.other_url +=1
    print "Error Undefined at : ", url
  if res==statistics.RESULTTIMEOUT :
    statistics.timeout_url +=1
    print "Timeout ",url
  if res==statistics.RESULTTRYTOOMANY:
    statistics.trytoomany_url+=1
    print "Try too many times at", url
if __name__=='__main__':
  print 'Get Page using GET method'
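
The code above assumes that the whole header section arrives in the first `recv` call. That usually holds, but TCP does not guarantee it. A minimal sketch of a more defensive approach, written for Python 3 and not part of the original program, keeps reading until the blank line that ends the headers; `read_head`, `status_code` and the size limits are hypothetical names and values.

```python
def read_head(recv, bufsize=4096, max_head=65536):
    """Read from recv() until the blank line that ends the HTTP headers.

    recv is any callable like socket.recv: it takes a buffer size and
    returns bytes, with b"" meaning the peer closed the connection.
    Returns (head, body_start): the raw header bytes, and whatever body
    bytes arrived in the same reads.
    """
    buf = b""
    while b"\r\n\r\n" not in buf:
        if len(buf) > max_head:
            raise ValueError("header section too large")
        chunk = recv(bufsize)
        if not chunk:
            raise ValueError("connection closed before headers ended")
        buf += chunk
    head, _, body_start = buf.partition(b"\r\n\r\n")
    return head, body_start


def status_code(head):
    """Extract the status code from the first line, e.g. b'HTTP/1.0 200 OK'."""
    return head.split(b"\r\n", 1)[0].split()[1].decode("ascii")
```

With a real socket `s`, the call would be `read_head(s.recv)`; the returned `body_start` must be written to the output file before the remaining `recv` loop continues.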

Below, I use the thread pool from the previous article to crawl in parallel with multiple threads, and compare the performance of my own page downloader with that of urllib2.


'''
Created on 2012-3-16
@author: xiaojay
'''
import fetchPage
import threadpool
import datetime
import statistics
import urllib2
'''one thread'''
def usingOneThread(limit):
  urlset = open("input.txt","r")
  start = datetime.datetime.now()
  for u in urlset:
    if limit <= 0 : break
    limit-=1
    hostname , filename = fetchPage.parse(u)
    # downPage returns a (result, url) pair
    res , url = fetchPage.downPage(hostname,filename,0)
    fetchPage.dealwithResult(res,url)
  end = datetime.datetime.now()
  print "Start at :\t" , start
  print "End at :\t" , end
  print "Total Cost :\t" , end - start
  print 'Total fetched :', statistics.fetched_url
'''threadpool and GET method'''
def callbackfunc(request,result):
  fetchPage.dealwithResult(result[0],result[1])
def usingThreadpool(limit,num_thread):
  urlset = open("input.txt","r")
  start = datetime.datetime.now()
  main = threadpool.ThreadPool(num_thread)
  for url in urlset :
    try :
      hostname , filename = fetchPage.parse(url)
      req = threadpool.WorkRequest(fetchPage.downPage,args=[hostname,filename],kwds={},callback=callbackfunc)
      main.putRequest(req)
    except Exception ,e:
      print e
  while True:
    try:
      main.poll()
      if statistics.total_url >= limit : break
    except threadpool.NoResultsPending:
      print "no pending results"
      break
    except Exception ,e:
      print e
  end = datetime.datetime.now()
  print "Start at :\t" , start
  print "End at :\t" , end
  print "Total Cost :\t" , end - start
  print 'Total url :',statistics.total_url
  print 'Total fetched :', statistics.fetched_url
  print 'Lost url :', statistics.total_url - statistics.fetched_url
  print 'Error 404 :' ,statistics.failed_url
  print 'Error timeout :',statistics.timeout_url
  print 'Error Try too many times ' ,statistics.trytoomany_url
  print 'Error Other faults ',statistics.other_url
  main.stop()
'''threadpool and urllib2 '''
def downPageUsingUrlib2(url):
  try:
    req = urllib2.Request(url)
    fd = urllib2.urlopen(req)
    f = open("pages3/"+str(abs(hash(url))),'w')
    f.write(fd.read())
    f.flush()
    f.close()
    return url ,'success'
  except Exception:
    return url , None
def writeFile(request,result):
  statistics.total_url += 1
  if result[1]!=None :
    statistics.fetched_url += 1
    print statistics.total_url,'\tfetched :', result[0],
  else:
    statistics.failed_url += 1
    print statistics.total_url,'\tLost :',result[0],
def usingThreadpoolUrllib2(limit,num_thread):
  urlset = open("input.txt","r")
  start = datetime.datetime.now()
  main = threadpool.ThreadPool(num_thread)
  for url in urlset :
    try :
      req = threadpool.WorkRequest(downPageUsingUrlib2,args=[url],kwds={},callback=writeFile)
      main.putRequest(req)
    except Exception ,e:
      print e
  while True:
    try:
      main.poll()
      if statistics.total_url >= limit : break
    except threadpool.NoResultsPending:
      print "no pending results"
      break
    except Exception ,e:
      print e
  end = datetime.datetime.now()
  print "Start at :\t" , start
  print "End at :\t" , end
  print "Total Cost :\t" , end - start
  print 'Total url :',statistics.total_url
  print 'Total fetched :', statistics.fetched_url
  print 'Lost url :', statistics.total_url - statistics.fetched_url
  main.stop()
if __name__ =='__main__':
  '''too slow'''
  #usingOneThread(100)
  '''use Get method'''
  #usingThreadpool(3000,50)
  '''use urllib2'''
  usingThreadpoolUrllib2(3000,50)
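
For readers on Python 3, where neither urllib2 nor the thread pool from the previous article is available, the same fan-out pattern can be sketched with the standard library's `concurrent.futures`. This swaps in `ThreadPoolExecutor` for the author's thread pool and is only an illustration: `crawl`, the `fetch` callback and the keys of the `stats` dictionary are hypothetical names.

```python
from concurrent.futures import ThreadPoolExecutor, as_completed


def crawl(urls, fetch, num_threads=50):
    """Run fetch(url) on a pool of threads and tally the results.

    fetch must return True on success and False on failure; exceptions
    are counted as failures. Tallying happens in the calling thread, so
    the counters need no lock.
    """
    stats = {"total": 0, "fetched": 0, "lost": 0}
    with ThreadPoolExecutor(max_workers=num_threads) as pool:
        futures = [pool.submit(fetch, url) for url in urls]
        for future in as_completed(futures):
            stats["total"] += 1
            try:
                ok = future.result()
            except Exception:
                ok = False
            stats["fetched" if ok else "lost"] += 1
    return stats
```

As in the scripts above, results are collected in the main thread rather than inside the workers, which is why plain counters are enough.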

Experimental analysis:

Experimental data: 3000 URLs captured by larbin and then processed by a Mercator queue model (which I implemented in C++ and will cover in a later blog post). The URL set is random and representative. A thread pool of 50 threads is used.
Experimental environment: Ubuntu 10.04, a good network connection, Python 2.6
Storage: small files, one file per page
PS: My school charges for Internet access by traffic, so working on web crawlers really eats into my data allowance! In a few days I will probably run a larger-scale download experiment with a few hundred thousand URLs.

Experimental results:

Using urllib2, usingThreadpoolUrllib2(3000,50):

Start at : 2012-03-16 22:18:20.956054
End at : 2012-03-16 22:22:15.203018
Total Cost : 0:03:54.246964
Total url : 3001
Total fetched : 2442
Lost url : 559

Physical storage size of the downloaded pages: 84088 KB

Using my own getPageUsingGet (the socket-based downloader), usingThreadpool(3000,50):

Start at : 2012-03-16 22:23:40.206730
End at : 2012-03-16 22:26:26.843563
Total Cost : 0:02:46.636833
Total url : 3002
Total fetched : 2484
Lost url : 518
Error 404 : 94
Error timeout : 312
Error Try too many times 0
Error Other faults 112

Physical storage size of the downloaded pages: 87168 KB
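
To compare the two runs on equal terms, the figures above can be turned into success rates and throughput. The short Python 3 script below only does arithmetic on the numbers reported in this article; the dictionary layout and the `summarize` helper are made up for the example.

```python
from datetime import timedelta

# Figures copied from the two result listings above.
runs = {
    "urllib2": {"cost": timedelta(minutes=3, seconds=54.246964),
                "total": 3001, "fetched": 2442},
    "own GET": {"cost": timedelta(minutes=2, seconds=46.636833),
                "total": 3002, "fetched": 2484},
}


def summarize(run):
    """Return elapsed seconds, share of URLs fetched, and pages per second."""
    seconds = run["cost"].total_seconds()
    return {
        "seconds": seconds,
        "success_rate": run["fetched"] / run["total"],
        "pages_per_second": run["fetched"] / seconds,
    }


summary = {name: summarize(run) for name, run in runs.items()}
for name, s in summary.items():
    print("%-8s %6.1f s  %5.1f%% fetched  %6.2f pages/s"
          % (name, s["seconds"], 100 * s["success_rate"], s["pages_per_second"]))
```

On these numbers the socket-based version finishes in roughly 29% less wall-clock time (about 167 s against 234 s), fetches about 14.9 pages per second against 10.4, and retrieves a slightly larger share of the URLs (about 82.7% against 81.4%). Its failure breakdown is also consistent: 94 + 312 + 0 + 112 = 518 lost URLs.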

Summary: the page downloader I wrote myself is quite efficient, and it loses fewer pages. On closer consideration, though, there is still a lot that could be optimized. For example, the files are too scattered: creating and releasing so many small files certainly incurs considerable overhead. The files are also named by hash, which adds a fair amount of computation. In addition, DNS resolution does not have to use the resolver that comes with Python: the default resolution is a synchronous operation and DNS resolution is generally slow, so it could be done asynchronously in multiple threads, and with appropriate DNS caching this can improve efficiency considerably. Beyond that, a real crawl involves a huge number of URLs that cannot all be loaded into memory at once; they should be distributed according to some strategy or algorithm. In short, there is still a great deal to do and to optimize in page fetching.
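
As one illustration of the DNS idea mentioned above, the sketch below wraps a blocking resolver in a thread-safe cache and resolves a batch of hostnames in parallel before crawling starts. It is a Python 3 sketch under assumptions, not the author's planned design: the `DNSCache` class, its `lookup` and `prefetch` methods, and the choice to cache failed lookups as `None` are all hypothetical.

```python
import socket
import threading
from concurrent.futures import ThreadPoolExecutor


class DNSCache:
    """Thread-safe hostname -> address cache around a blocking resolver."""

    def __init__(self, resolver=socket.gethostbyname):
        self._resolver = resolver
        self._cache = {}
        self._lock = threading.Lock()

    def lookup(self, hostname):
        with self._lock:
            if hostname in self._cache:
                return self._cache[hostname]
        # Resolve outside the lock so a slow lookup does not block other
        # threads. Two threads may resolve the same name at once; the
        # duplicate work is harmless.
        try:
            addr = self._resolver(hostname)
        except OSError:
            addr = None  # remember failures too, to avoid retrying them
        with self._lock:
            self._cache[hostname] = addr
        return addr

    def prefetch(self, hostnames, num_threads=10):
        """Resolve many hostnames in parallel before crawling starts."""
        with ThreadPoolExecutor(max_workers=num_threads) as pool:
            list(pool.map(self.lookup, set(hostnames)))
```

The resolver is injected through the constructor so the cache can be exercised without network access; by default it falls back to `socket.gethostbyname`, the same call the downloader above uses.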
