Using Python multiprocessing for multi-process parallel computing

  • 2021-09-16 07:29:55
  • OfStack

multiprocessing is the standard-library package for multi-process parallel computing in Python. Its API is similar to that of threading (multithreading), but it runs tasks in separate processes, which the operating system can schedule on different CPU cores, so it is not limited by the GIL (Global Interpreter Lock).

Next, we introduce the Pool and Process classes in multiprocessing.

Pool

A process pool (Pool) is the more convenient way to run tasks in parallel. We specify the number of worker processes, and the Pool automatically distributes the submitted tasks among them. Pool offers several functions for parallel execution.

apply and apply_async

apply runs a single task and blocks until its result is ready, so tasks submitted with apply run one after another. apply_async is the asynchronous version of apply: it returns immediately with a result object, and calling get() on that object returns the task's return value. To run tasks in parallel, use apply_async.


import multiprocessing
import time
from random import randint, seed
def f(num):
  seed()  # re-seed inside each task
  rand_num = randint(0, 10)  # random pause length for this task, in seconds
  time.sleep(rand_num)
  return (num, rand_num)
if __name__ == '__main__':
  cores = multiprocessing.cpu_count()
  pool = multiprocessing.Pool(processes=cores)
  pool_list = []
  result_list = []
  start_time = time.time()
  for xx in range(10):
    pool_list.append(pool.apply_async(f, (xx, )))  # do not call get() here: it would block the main process
  # Why not call result.get() directly inside the for loop? Because get() blocks until its task has finished,
  # so the next task would not be submitted until the previous one was done, and the tasks would run serially.
  # Collect the return values only after every task has been submitted.
  result_list = [xx.get() for xx in pool_list]
  # Finally, shut down the process pool:
  pool.close()
  pool.join()
  print(result_list)
  print('Time spent in parallel: %.2f' % (time.time() - start_time))
  print('Serial time spent: %.2f' % (sum([xx[1] for xx in result_list])))
# [(0, 8), (1, 2), (2, 4), (3, 9), (4, 0), (5, 1), (6, 8), (7, 3), (8, 4), (9, 6)]
# Time spent in parallel: 14.11
# Serial time spent: 45.00
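The difference between the two calls can be seen by timing them side by side. The following is a minimal sketch rather than part of the original example; the function slow_square and the 0.2 s delay are made up for illustration.

```python
import time
from multiprocessing import Pool


def slow_square(x):
    """Hypothetical task: pause briefly, then return x squared."""
    time.sleep(0.2)
    return x * x


if __name__ == "__main__":
    with Pool(processes=4) as pool:
        # apply() blocks until each task finishes, so the tasks run one after another.
        start = time.time()
        blocking = [pool.apply(slow_square, (i,)) for i in range(4)]
        print("apply:       %.2f s" % (time.time() - start), blocking)

        # apply_async() returns an AsyncResult at once; get() is called
        # only after all four tasks have been submitted.
        start = time.time()
        pending = [pool.apply_async(slow_square, (i,)) for i in range(4)]
        results = [r.get() for r in pending]
        print("apply_async: %.2f s" % (time.time() - start), results)
```

On a machine with several cores, the second timing should be noticeably shorter than the first, since the four pauses overlap.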

map and map_async

map_async is the asynchronous version of map.

Unlike apply_async, which takes a tuple of arguments for each task, map and map_async call the function with a single argument: one element of the input list per call.


import time
from multiprocessing import Pool
def run(fn):
  # fn: one element of the input list
  time.sleep(1)
  return fn * fn
if __name__ == "__main__":
  testFL = [1, 2, 3, 4, 5, 6]
  print('Serial:')  # sequential execution in a single process
  s = time.time()
  for fn in testFL:
    run(fn)
  e1 = time.time()
  print("Sequential execution time:", int(e1 - s))
  print('Parallel:')  # create several processes and run the tasks in parallel
  pool = Pool(4)  # create a process pool with 4 worker processes
  # testFL: the list of data to process; run: the function applied to each element of testFL
  rl = pool.map(run, testFL)
  pool.close()  # close the pool so that it accepts no new tasks
  pool.join()  # the main process blocks until the worker processes exit
  e2 = time.time()
  print("Parallel execution time:", int(e2 - e1))
  print(rl)
# Serial:
# Sequential execution time: 6
# Parallel:
# Parallel execution time: 2
# [1, 4, 9, 16, 25, 36]
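When the worker function needs more than one argument, two common workarounds are Pool.starmap (Python 3.3 and later) and functools.partial. The sketch below assumes a made-up two-argument function, power, and also shows map_async with get().

```python
from functools import partial
from multiprocessing import Pool


def power(base, exponent):
    """Hypothetical two-argument task."""
    return base ** exponent


if __name__ == "__main__":
    with Pool(processes=2) as pool:
        # Option 1: starmap unpacks each tuple into positional arguments.
        print(pool.starmap(power, [(2, 3), (3, 2), (4, 2)]))  # [8, 9, 16]

        # Option 2: fix one argument with partial, then map over the other.
        square = partial(power, exponent=2)
        print(pool.map(square, [1, 2, 3]))  # [1, 4, 9]

        # map_async returns at once; get() waits for the whole result list.
        async_result = pool.map_async(square, [4, 5])
        print(async_result.get())  # [16, 25]
```

Both map and starmap return the results in the order of the inputs, regardless of which worker finishes first.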

Process

A Process object creates one new process. For CPU-bound work, each running process keeps one CPU core busy, so the number of processes should generally not exceed the number of cores.

If too many processes are started, especially for CPU-intensive tasks, they compete for the cores and the efficiency of parallelism drops.


# 16.6.1.1. The Process class
from multiprocessing import Process, cpu_count
import os
import time
start_time = time.time()
def info(title):
#   print(title)
  if hasattr(os, 'getppid'):  # getppid may be missing on some platforms
    print('parent process:', os.getppid())
  print('process id:', os.getpid())
  time.sleep(3)
def f(name):
  info('function f')
  print('hello', name)
if __name__ == '__main__':
#   info('main line')
  p_list = []  # holds the newly created Process objects
  cpu_num = cpu_count()
  for xx in range(cpu_num):
    p_list.append(Process(target=f, args=('xx_%s' % xx,)))
  for xx in p_list:
    xx.start()
  for xx in p_list:
    xx.join()
  print('spend time: %.2f' % (time.time() - start_time))
# parent process: 11741
# parent process: 11741
# parent process: 11741
# process id: 12249
# process id: 12250
# parent process: 11741
# process id: 12251
# process id: 12252
# hello xx_1
# hello xx_0
# hello xx_2
# hello xx_3
# spend time: 3.04

Interprocess communication

multiprocessing supports two types of communication channel between processes: queues (Queue) and pipes (Pipe).

Queue

Queues follow the principle of first in, first out and can be used between processes.


# 16.6.1.2. Exchanging objects between processes
# Queues
from multiprocessing import Process, Queue
def f(q):
  q.put([42, None, 'hello'])
if __name__ == '__main__':
  q = Queue()
  p = Process(target=f, args=(q,))
  p.start()
  print(q.get())  # prints "[42, None, 'hello']"
  p.join()

Pipe


from multiprocessing import Process, Pipe
def f(conn):
  conn.send([42, None, 'hello'])
  conn.close()
if __name__ == '__main__':
  parent_conn, child_conn = Pipe()
  p = Process(target=f, args=(child_conn,))
  p.start()
  print(parent_conn.recv())  # prints "[42, None, 'hello']"
  p.join()

Comparison between queue and pipe

Pipe() can only have two endpoints.

Queue() can have multiple producers and consumers.

When to use them

If you need more than two points to communicate, use a Queue().

If you need absolute performance, a Pipe() is much faster because Queue() is built on top of Pipe().

Reference:

https://stackoverflow.com/questions/8463008/python-multiprocessing-pipe-vs-queue
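To illustrate the multi-producer case, here is a minimal sketch in which three producer processes write to one Queue and the main process acts as the only consumer. The helper names make_items and producer are invented for this example.

```python
from multiprocessing import Process, Queue


def make_items(producer_id, count):
    """Hypothetical helper: the items one producer will send."""
    return [(producer_id, n) for n in range(count)]


def producer(q, producer_id, count):
    for item in make_items(producer_id, count):
        q.put(item)


if __name__ == "__main__":
    q = Queue()
    producers = [Process(target=producer, args=(q, pid, 3)) for pid in range(3)]
    for p in producers:
        p.start()
    # Read exactly as many items as were produced, and do it before join(),
    # so that no producer is left waiting to flush its data into the queue.
    received = [q.get() for _ in range(3 * 3)]
    for p in producers:
        p.join()
    # Arrival order depends on scheduling, so sort before printing.
    print(sorted(received))
```

The same pattern with a Pipe would need one connection pair per producer, which is why a Queue is the simpler choice once more than two endpoints are involved.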

Shared resources

Multiple processes should avoid sharing resources where possible. In multithreading, resources are easy to share, for example through global variables or by passing parameters.

With multiple processes this approach does not work, because each process has its own independent memory space.

Instead, resources can be shared through shared memory or a Manager.

Doing so makes the program more complex and, because access has to be synchronized, less efficient.

Shared memory

Shared memory objects (Value, Array) are passed to a Process as arguments. They cannot be passed as task arguments to a process pool (Pool).


# 16.6.1.4. Sharing state between processes
# Shared memory
from multiprocessing import Process, Value, Array
def f(n, a):
  n.value = 3.1415927
  for i in range(len(a)):
    a[i] = -a[i]
if __name__ == '__main__':
  num = Value('d', 0.0)
  arr = Array('i', range(10))
  p = Process(target=f, args=(num, arr))
  p.start()
  p.join()
  print(num.value)
  print(arr[:])
# 3.1415927
# [0, -1, -2, -3, -4, -5, -6, -7, -8, -9]
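The need for synchronization mentioned earlier shows up as soon as several processes update the same value. The sketch below, with a made-up worker add_many, guards each increment with the lock that a Value carries by default.

```python
from multiprocessing import Process, Value


def add_many(counter, times):
    """Hypothetical worker: increment a shared counter `times` times."""
    for _ in range(times):
        # "+= 1" is a read followed by a write, so it is not atomic;
        # the Value's own lock makes the pair safe across processes.
        with counter.get_lock():
            counter.value += 1


if __name__ == "__main__":
    counter = Value('i', 0)
    workers = [Process(target=add_many, args=(counter, 1000)) for _ in range(4)]
    for w in workers:
        w.start()
    for w in workers:
        w.join()
    print(counter.value)  # 4000
```

Without the lock, some increments can be lost when two processes read the same old value, and the final count may fall short of 4000.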

Manager class

A Manager can be used with both Process and the process pool (Pool).


from multiprocessing import Manager, Process
def f(d, l, ii):
  d[ii] = ii
  l.append(ii)
if __name__ == '__main__':
  manager = Manager()
  d = manager.dict()
  l = manager.list(range(10))
  p_list = [] 
  for xx in range(4):
    p_list.append(Process(target=f, args=(d, l, xx)))
  for xx in p_list:
    xx.start()
  for xx in p_list:
    xx.join()
  print(d)
  print(l)
# {0: 0, 1: 1, 2: 2, 3: 3}
# [0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 0, 1, 2, 3]
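Since the example above only uses Process, here is a minimal sketch of the Pool case. The function record_square is invented for illustration; the managed dict is passed to the pool workers as an ordinary task argument.

```python
from multiprocessing import Manager, Pool


def record_square(shared, n):
    """Hypothetical task: store n squared in a shared mapping."""
    shared[n] = n * n
    return n


if __name__ == "__main__":
    with Manager() as manager:
        shared = manager.dict()
        with Pool(processes=2) as pool:
            # The dict proxy can be pickled, so it travels to the workers as an argument.
            pool.starmap(record_square, [(shared, n) for n in range(4)])
        # Insertion order depends on scheduling, so sort before printing.
        print(sorted(shared.items()))  # [(0, 0), (1, 1), (2, 4), (3, 9)]
```

Every access to the proxy is a round trip to the manager's server process, so this is convenient for small amounts of shared state rather than for bulk data.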

Supplement: timing a Python program, and writing and reading data with multiple processes


import time
time_start=time.time()
time_end=time.time()
print('time cost',time_end-time_start , 's')

The result is in seconds and can be converted to other units for output.

Note that when writing tests, the function name must start with test; otherwise the test will not be run.

Problems with multiple processes:

1) Writing data with multiple processes:


def test_save_features_to_db(self):
    df1 = pd.read_csv('/home/sc/PycharmProjects/risk-model/xg_test/statis_data/shixin_company.csv')
    com_list = df1['company_name'].values.tolist()
    # com_list = com_list[400015:400019]
    # print 'test_save_features_to_db'
    # print(com_list)
    p_list = []  # processes in the current batch
    i = 1
    p_size = len(com_list)
    for company_name in com_list:
      # Create one process per company
      p = Process(target=self.__save_data_iter_method, args=[company_name])
      # p.daemon = True
      p_list.append(p)
      # Run the processes in batches
      if i % 20 == 0 or i == p_size:  # start a batch after every 20 processes; the final batch holds the remainder
        for p in p_list:
          p.start()
        for p in p_list:
          p.join()  # wait for every process in the batch to finish
        p_list = []  # empty the batch list
      i += 1

Summary: when multiple processes only write data, as here, no lock and no return value are needed.

The core statement is p = Process(target=self.__save_data_iter_method, args=[company_name]): target is the function that performs one full iteration of the work in its own process, and args is the input to that iteration.

Note that args=[company_name] is the correct form. The original forms, args=company_name and args=(company_name), raise an error saying that only one argument is expected but 34 were given. Both pass a bare string (parentheses without a trailing comma do not create a tuple), and Process unpacks args into positional arguments, so each character of the string becomes a separate argument.
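The args pitfall can be reproduced in a few lines. This is a sketch with invented names: save stands in for the real worker, "acme" is a placeholder company name, and arg_count is a hypothetical helper that mirrors how Process turns args into a tuple.

```python
from multiprocessing import Process


def save(company_name):
    """Hypothetical one-argument worker."""
    print("saving", company_name)


def arg_count(args):
    """Number of positional arguments the target would receive for this args value."""
    return len(tuple(args))


if __name__ == "__main__":
    name = "acme"  # placeholder company name
    print(arg_count(name))     # 4: a bare string is split into characters
    print(arg_count((name)))   # 4: parentheses alone do not make a tuple
    print(arg_count((name,)))  # 1: one-element tuple
    print(arg_count([name]))   # 1: one-element list
    # Both correct forms deliver the whole string as a single argument.
    for args in ([name], (name,)):
        p = Process(target=save, args=args)
        p.start()
        p.join()
```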

The outer loop is driven by the input: it runs once per input item. Make sure you understand p.start() and p.join(): start() launches a process, and join() waits for it to finish.


def __save_data_iter_method(self, com):
    # time_start = time.time()
    # print(com)
    f_d_t = ShiXinFeaturesDealSvc()
    res = f_d_t.get_time_features(company_name=com)
    # Label: whether the company is listed as dishonest (shixin)
    shixin_label = res.shixin_label
    key1 = res.shixin_time
    if key1:
      public_at = res.shixin_time
      company_name = res.time_map_features[key1].company_name
      # print(company_name)
      established_years = res.time_map_features[key1].established_years
      industry_dx_rate = res.time_map_features[key1].industry_dx_rate
      regcap_change_cnt = res.time_map_features[key1].regcap_change_cnt
      share_change_cnt = res.time_map_features[key1].share_change_cnt
      industry_dx_cnt = res.time_map_features[key1].industry_dx_cnt
      address_change_cnt = res.time_map_features[key1].address_change_cnt
      fr_change_cnt = res.time_map_features[key1].fr_change_cnt
      judgedoc_cnt = res.time_map_features[key1].judgedoc_cnt
      bidding_cnt = res.time_map_features[key1].bidding_cnt
      trade_mark_cnt = res.time_map_features[key1].trade_mark_cnt
      network_share_cancel_cnt = res.time_map_features[key1].network_share_cancel_cnt
      cancel_cnt = res.time_map_features[key1].cancel_cnt
      industry_all_cnt = res.time_map_features[key1].industry_all_cnt
      network_share_zhixing_cnt = res.time_map_features[key1].network_share_zhixing_cnt
      network_share_judge_doc_cnt = res.time_map_features[key1].network_share_judge_doc_cnt
      net_judgedoc_defendant_cnt = res.time_map_features[key1].net_judgedoc_defendant_cnt
      judge_doc_cnt = res.time_map_features[key1].judge_doc_cnt
      f_d_do = ShixinFeaturesDto(company_name=company_name, established_years=established_years,
                    industry_dx_rate=industry_dx_rate, regcap_change_cnt=regcap_change_cnt,
                    share_change_cnt=share_change_cnt, industry_all_cnt=industry_all_cnt,
                    industry_dx_cnt=industry_dx_cnt, address_change_cnt=address_change_cnt,
                    fr_change_cnt=fr_change_cnt, judgedoc_cnt=judgedoc_cnt,
                    bidding_cnt=bidding_cnt, trade_mark_cnt=trade_mark_cnt,
                    network_share_cancel_cnt=network_share_cancel_cnt, cancel_cnt=cancel_cnt,
                    network_share_zhixing_cnt=network_share_zhixing_cnt,
                    network_share_judge_doc_cnt=network_share_judge_doc_cnt,
                    net_judgedoc_defendant_cnt=net_judgedoc_defendant_cnt,
                    judge_doc_cnt=judge_doc_cnt, public_at=public_at, shixin_label=shixin_label)
      # time_end = time.time()
      # print('totally cost', time_end - time_start)
      self.cfdbsvc.save_or_update_features(f_d_do)
def save_or_update_features(self, shixin_features_dto):
    """
     Insert or update:
     insert one row of data; insert it if it does not exist, update it if it does
    """
    self._pg_util = PgUtil()
    p_id = None
    if isinstance(shixin_features_dto, ShixinFeaturesDto):
      p_id = str(uuid.uuid1())
      self._pg_util.execute_sql(
        self.s_b.insert_or_update_row(
          self.model.COMPANY_NAME,
          {
            self.model.ID: p_id,
            #  Company name 
            self.model.COMPANY_NAME: shixin_features_dto.company_name,
            #  Time of dishonesty 
            self.model.PUBLIC_AT: shixin_features_dto.public_at,
            self.model.SHIXIN_LABEL : shixin_features_dto.shixin_label,
            self.model.ESTABLISHED_YEARS: shixin_features_dto.established_years, 
            self.model.INDUSTRY_DX_RATE: shixin_features_dto.industry_dx_rate, 
            self.model.REGCAP_CHANGE_CNT: shixin_features_dto.regcap_change_cnt, 
            self.model.SHARE_CHANGE_CNT: shixin_features_dto.share_change_cnt, 
            self.model.INDUSTRY_ALL_CNT: shixin_features_dto.industry_all_cnt, 
            self.model.INDUSTRY_DX_CNT: shixin_features_dto.industry_dx_cnt, 
            self.model.ADDRESS_CHANGE_CNT: shixin_features_dto.address_change_cnt, 
            self.model.NETWORK_SHARE_CANCEL_CNT: shixin_features_dto.network_share_cancel_cnt,
            self.model.CANCEL_CNT: shixin_features_dto.cancel_cnt, 
            self.model.NETWORK_SHARE_ZHIXING_CNT: shixin_features_dto.network_share_zhixing_cnt,
            self.model.FR_CHANGE_CNT: shixin_features_dto.fr_change_cnt, 
            self.model.JUDGEDOC_CNT: shixin_features_dto.judgedoc_cnt, 
            self.model.NETWORK_SHARE_JUDGE_DOC_CNT: shixin_features_dto.network_share_judge_doc_cnt,
            self.model.BIDDING_CNT: shixin_features_dto.bidding_cnt, 
            self.model.TRADE_MARK_CNT: shixin_features_dto.trade_mark_cnt, 
            self.model.JUDGE_DOC_CNT: shixin_features_dto.judge_doc_cnt 
          },
          [self.model.ADDRESS_CHANGE_CNT,self.model.BIDDING_CNT,self.model.CANCEL_CNT,
           self.model.ESTABLISHED_YEARS,self.model.FR_CHANGE_CNT,self.model.INDUSTRY_ALL_CNT,
           self.model.INDUSTRY_DX_RATE,self.model.INDUSTRY_DX_CNT,self.model.JUDGE_DOC_CNT,
           self.model.JUDGEDOC_CNT,self.model.NETWORK_SHARE_CANCEL_CNT,self.model.NETWORK_SHARE_JUDGE_DOC_CNT,
           self.model.NETWORK_SHARE_ZHIXING_CNT,self.model.REGCAP_CHANGE_CNT,self.model.TRADE_MARK_CNT,
           self.model.SHARE_CHANGE_CNT,self.model.SHIXIN_LABEL,self.model.PUBLIC_AT]
        )
      )
    return p_id

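self._pg_util = PgUtil() is re-created inside the function; otherwise SSL errors and SSL decryption errors are reported. The cause still needs to be investigated. A likely explanation is that a connection opened before the child processes are started ends up shared between processes, which database connections generally do not support.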
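One way to keep such a resource private to each process is to create it after the process has started. The sketch below is a variation on the approach above, not the code used in this project: FakeConnection is a hypothetical stand-in for PgUtil, and a Pool initializer creates one instance per worker.

```python
import os
from multiprocessing import Pool

_conn = None  # one connection per worker process, set by init_worker


class FakeConnection:
    """Hypothetical stand-in for a database connection such as PgUtil."""

    def __init__(self):
        self.owner_pid = os.getpid()  # remember which process created it

    def execute(self, statement):
        return (self.owner_pid, statement)


def init_worker():
    """Runs once in every worker process, after that process has started."""
    global _conn
    _conn = FakeConnection()


def save(name):
    owner_pid, statement = _conn.execute("save " + name)
    # True when the connection was created by the process that is using it.
    return owner_pid == os.getpid(), statement


if __name__ == "__main__":
    with Pool(processes=2, initializer=init_worker) as pool:
        print(pool.map(save, ["a", "b", "c"]))
        # [(True, 'save a'), (True, 'save b'), (True, 'save c')]
```

Compared with re-creating the connection on every call, this opens one connection per worker, which may matter when connecting is expensive.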



Main program call:


