Python multithreaded task instance

  • 2021-12-12 08:55:37
  • OfStack

Every day Meican (a meal-ordering service) sends us an Excel summary of the dining data. After we import it into the database, the administrative office's service compares it with the company's own dining records to check for duplicates.

The initial implementation was single-threaded; apart from the multithreading, it looks much like the `import_records` shown later.

Read the Excel data -> send each row to the administrative service's interface.
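The single-threaded version is not shown in the article; a minimal sketch of the idea, with plain tuples standing in for openpyxl rows and an injected `send` callable standing in for the HTTP POST to the administrative service (both are illustrative assumptions, not the original code):

```python
def import_records_single(rows, send):
    """Walk the sheet top to bottom and hand each record to `send`, one at a time.

    `rows` plays the role of the worksheet's rows; `send` plays the role of
    the roughly one-second HTTP POST to the administrative service.
    """
    imported = 0
    for row in rows:
        if row[0] is None:          # the first empty row marks the end of the data
            break
        send({'date': row[0], 'orderId': row[3]})
        imported += 1
    return imported
```

Because `send` blocks for about a second per record, total runtime grows linearly with the row count, which is exactly what motivated the multithreaded rewrite.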

For safety, the import runs in production at night. At runtime it turned out that each record took more than 1s to import; kicking off thousands of records at 10 p.m. and watching them crawl was maddening.

There was nothing to do but wait, so I went downstairs for some fresh air. The stale air indoors makes you drowsy; the cold outside wakes you up. It suddenly hit me: why not use multithreading?

In version 1 the multithreading and the business logic were tangled together in one function, which was painful to read. Over the next two days I took a little time to refactor it several times, separating out a thread pool, an iterator, and `import_records`.

Much clearer, but the iterator is still exposed: `import_records` has to call it to decide whether the current row belongs to the current thread, an idea similar to coroutines.

Exposing it has pros and cons, but it basically meets my daily needs, so it can be set aside for now. Time to enjoy some books and movies.


import threading


def task_pool(thread_num, task_fn):
    """Run `task_fn` on `thread_num` threads.

    Each thread receives (thread_id, thread_num, thread_checker); the checker
    returns True when the current call (i.e. the current item) belongs to
    this thread, distributing items round-robin across the threads.
    """
    if thread_num <= 0:
        raise ValueError('thread_num must be positive')

    threads = []

    def gen_thread_checker(thread_id, step):
        base = 1
        i = 0

        def thread_checker():
            nonlocal i
            i += 1
            # Call number i belongs to the thread whose id is (i - base) % step.
            if i < base or (i - base) % step != thread_id:
                return False
            return True

        return thread_checker

    for x in range(thread_num):
        threads.append(threading.Thread(
            target=task_fn,
            args=(x, thread_num, gen_thread_checker(x, thread_num))))

    # Start all threads.
    for t in threads:
        t.start()
    # Wait in the main thread for all child threads to exit.
    for t in threads:
        t.join()
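The core trick is the checker closure: every thread walks the same item sequence and advances its own counter identically, so call number i is claimed by exactly one thread, the one with `thread_id == (i - 1) % thread_num`. A self-contained demo of that partitioning (single-threaded, just exercising the closures; it restates the checker from the pool above so it runs on its own):

```python
def gen_thread_checker(thread_id, step):
    """Same closure as in the pool above: claims every `step`-th call."""
    base = 1
    i = 0

    def thread_checker():
        nonlocal i
        i += 1
        return i >= base and (i - base) % step == thread_id

    return thread_checker


# Three checkers, as three threads would each hold one.
checkers = [gen_thread_checker(tid, 3) for tid in range(3)]

# Each "thread" scans all 10 items; only claimed items are kept.
claimed = [[item for item in range(10) if check()] for check in checkers]
# claimed == [[0, 3, 6, 9], [1, 4, 7], [2, 5, 8]]
```

Every item is claimed by exactly one checker, so no row is imported twice and none is skipped, at the cost of every thread still iterating the full sheet.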


import argparse
import re

import requests
from openpyxl import load_workbook
from requests import RequestException

import myThread

parser = argparse.ArgumentParser(description='Import Meican dining transaction data into the store')
parser.add_argument('--filename', '-f', help='path to the .xlsx transaction data file', required=True)
parser.add_argument('--thread_num', '-t', help='number of threads', default=100, required=False)
parser.add_argument('--debug', '-d', help='debug mode', default=0, required=False)
args = parser.parse_args()

filename = args.filename
thread_num = int(args.thread_num)
debug = int(args.debug)

if debug:
    print((filename, thread_num, debug))


def add_meican_meal_record(data):
    pass


def import_records(thread_id, thread_number, thread_checker):
    # Each thread opens its own workbook; openpyxl objects are not thread-safe.
    wb = load_workbook(filename=filename)
    ws = wb.active

    for row in ws:
        # ------------------------------------------
        if row[0].value is None:
            break

        # Skip rows that belong to other threads. Every thread walks the
        # same rows, so the checkers stay in step across threads.
        if not thread_checker():
            continue
        # ------------------------------------------

        value = str(row[0].value)
        # Skip the header row, the totals row, and anything that is not a date.
        if value == 'Date' or value == 'Total' or not re.findall(r'^\d{4}-\d{1,2}-\d{1,2}$', value):
            continue

        date = value.replace('-', '')

        order_id = row[3].value
        restaurant_name = row[5].value
        meal_plan_name = row[6].value
        meal_staffid = row[10].value
        identify = row[11].value

        add_meican_meal_record({
            'orderId': order_id,
            'date': date,
            'meal_plan_name': meal_plan_name,
            'meal_staffid': meal_staffid,
            'identify': identify,
            'restaurant_name': restaurant_name
        })


myThread.task_pool(thread_num, import_records)
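The body of `add_meican_meal_record` is left as `pass` in the article. Assuming it wraps the roughly one-second HTTP POST to the administrative service, a hedged sketch of a retrying version; the injected `post` callable, the retry policy, and the return value are my assumptions, not the original code:

```python
import time


def add_meican_meal_record(data, post, retries=3, backoff=0.0):
    """Send one record via `post`, retrying transient failures.

    `post` is a hypothetical callable wrapping the HTTP POST (e.g. built on
    requests); it is assumed to raise on failure and return on success.
    Returns the attempt number that succeeded.
    """
    for attempt in range(1, retries + 1):
        try:
            post(data)
            return attempt
        except Exception:
            if attempt == retries:
                raise               # give up after the last attempt
            time.sleep(backoff)     # optional pause between attempts
```

Retrying inside the worker keeps a single flaky request from killing a whole thread's share of the rows mid-run.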
