Implementation of Scrapy + adbapi in Python to Improve Database Writing Efficiency

  • 2021-12-11 07:53:03
  • OfStack

Directory

1: adbapi in twisted
1.1 Two main approaches
1.2 Usage example
2: Combining with pipelines in Scrapy

1: adbapi in twisted

pymysql's execute() and commit() submit data to the database synchronously. Because the Scrapy framework parses data asynchronously across many concurrent requests, its parsing speed is far higher than the speed at which the database can write. If every write is synchronous, the slow writes block the pipeline and drag down the overall crawling efficiency.
Using the Twisted asynchronous I/O framework, data can instead be written asynchronously: writes are dispatched to multiple threads, which improves the write throughput.
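For contrast, here is a minimal sketch of the synchronous pattern just described; the connection parameters and the single-column insert are placeholders:

import pymysql

# Synchronous write: execute() and commit() block the calling thread
# until MySQL acknowledges the transaction.
conn = pymysql.connect(host='127.0.0.1', user='root', passwd='secret',
                       db='qa_db', charset='utf8')
try:
    with conn.cursor() as cursor:
        cursor.execute("insert into qa_sample(need_id) values (%s)",
                       ('1001',))
    conn.commit()  # the caller waits here on every single item
finally:
    conn.close()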

1.1 Two main approaches

adbapi.ConnectionPool:

Creates a database connection pool object that holds multiple connection objects, each working in an independent thread. adbapi only provides an asynchronous programming framework around database access; internally it still uses a DB-API library such as pymysql or MySQLdb to access the database.

dbpool.runInteraction(do_insert, item):

Calls the do_insert function asynchronously: dbpool picks a connection object from the pool and calls do_insert in an independent thread. The item argument is passed as the second parameter of do_insert, and the first parameter is a Transaction object. Its interface is similar to a Cursor object, so its execute method can be called to run SQL statements. After do_insert finishes, the connection object automatically calls the commit method.
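Because runInteraction returns a twisted Deferred rather than a result, an exception raised inside do_insert does not surface in the caller; it travels down the Deferred's errback chain. A minimal sketch of attaching a failure handler (the handler name is illustrative; dbpool and do_insert are as in the example in 1.2 below):

query = dbpool.runInteraction(do_insert, item)

def handle_error(failure):
    # failure is a twisted.python.failure.Failure wrapping the
    # exception; log it so failed inserts are not silently dropped.
    print(failure)

query.addErrback(handle_error)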

1.2 Usage example


from twisted.enterprise import adbapi

# Initialize the database connection pool (a thread pool).
# Argument 1: the MySQL driver module to use (here pymysql)
# Argument 2: the configuration for connecting to MySQL
dbpool = adbapi.ConnectionPool('pymysql', **params)

# There is no need to call commit() after execute(); the connection
# pool performs the commit internally once do_insert returns.
def do_insert(cursor, item):
    insert_sql = """
            insert into qa_sample(
            need_id,
            need_question_uptime,
            need_title,
            need_title_describe,
            need_answer_uptime,
            need_answer)
            values (%s, %s, %s, %s, %s, %s)
            """
    params = (item['need_id'],
              item['need_question_uptime'],
              item['need_title'],
              item['need_title_describe'],
              item['need_answer_uptime'],
              item['need_answer'])
    cursor.execute(insert_sql, params)

# Argument 1: the function to execute in the asynchronous task, do_insert;
# Argument 2: the argument passed on to that function
query = dbpool.runInteraction(do_insert, item)
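The snippet above leaves params undefined; it is the dict of pymysql.connect() keyword arguments used to open each pooled connection. A hypothetical example, with placeholder values mirroring the pipeline settings in section 2:

import pymysql

params = dict(
    host='127.0.0.1',                        # MySQL server address
    db='qa_db',                              # target database
    user='root',
    passwd='secret',
    charset='utf8',
    cursorclass=pymysql.cursors.DictCursor,  # rows come back as dicts
)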

2: Combining with pipelines in Scrapy


# -*- coding: utf-8 -*-
from twisted.enterprise import adbapi
import pymysql
 
# Define your item pipelines here
#
# Don't forget to add your pipeline to the ITEM_PIPELINES setting
# See: https://docs.scrapy.org/en/latest/topics/item-pipeline.html
 
 
class QaSpiderPipeline(object):
    def process_item(self, item, spider):
        return item
 
class MysqlTwistedPipeline(object):
    def __init__(self, dbpool):
        self.dbpool = dbpool
 
    @classmethod
    def from_settings(cls, settings):
        # Called by Scrapy with the project settings: read the MySQL
        # connection parameters and build the shared connection pool.
        dbparams = dict(
            host=settings['MYSQL_HOST'],
            db=settings['MYSQL_DBNAME'],
            user=settings['MYSQL_USER'],
            passwd=settings['MYSQL_PASSWORD'],
            charset='utf8',
            cursorclass=pymysql.cursors.DictCursor,
            use_unicode=True
        )
        dbpool = adbapi.ConnectionPool('pymysql', **dbparams)
        return cls(dbpool)
 
    def process_item(self, item, spider):
        # Dispatch the insert to a pool thread; runInteraction returns
        # a Deferred immediately, so the spider is never blocked here.
        query = self.dbpool.runInteraction(self.do_insert, item)
        return item
 
    def do_insert(self, cursor, item):
        # Runs in a pool thread; the cursor is an adbapi Transaction and
        # the commit happens automatically when this function returns.
        insert_sql = """
                insert into qa_sample( 
                need_id, 
                need_question_uptime, 
                need_title, 
                need_title_describe, 
                need_answer_uptime, 
                need_answer)
                values (%s, %s, %s, %s, %s, %s)
                """
        params = (item['need_id'],
                  item['need_question_uptime'],
                  item['need_title'],
                  item['need_title_describe'],
                  item['need_answer_uptime'],
                  item['need_answer'])
        cursor.execute(insert_sql, params)
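For the pipeline to take effect it must be enabled in settings.py, and the MYSQL_* keys read by from_settings must be defined there. A sketch with placeholder values; the dotted path assumes a project module named qa_spider, so adjust it to your own project:

# settings.py
MYSQL_HOST = '127.0.0.1'
MYSQL_DBNAME = 'qa_db'
MYSQL_USER = 'root'
MYSQL_PASSWORD = 'secret'

ITEM_PIPELINES = {
    'qa_spider.pipelines.MysqlTwistedPipeline': 300,
}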
