Parallel computing with Python's multiprocessing module
- 2021-09-16 07:29:55
- OfStack
Python's multiprocessing package, part of the standard library, supports process-based parallel computing. It provides an API similar to threading (multithreading), but assigns tasks to separate processes on different CPU cores, avoiding the limitation of the GIL (Global Interpreter Lock).
Next, we introduce the Pool and Process classes in multiprocessing.
Pool
The Pool process pool is the most convenient way to run tasks in parallel. We specify the number of worker processes, and Pool automatically schedules the tasks onto them. Pool provides several functions for parallel execution.
apply and apply_async
apply executes tasks one at a time and blocks until each finishes, so it offers no real parallelism; apply_async is the asynchronous version of apply. For parallel computation, use apply_async.
import multiprocessing
import time
from random import randint, seed

def f(num):
    seed()
    rand_num = randint(0, 10)  # a random pause time for each task
    time.sleep(rand_num)
    return (num, rand_num)

if __name__ == '__main__':
    start_time = time.time()
    cores = multiprocessing.cpu_count()
    pool = multiprocessing.Pool(processes=cores)
    pool_list = []
    for xx in range(10):
        # do not call get() here -- it would block the loop
        pool_list.append(pool.apply_async(f, (xx,)))
    # Why not call result.get() directly inside the for loop? Because get()
    # blocks until that task finishes, so the next task would not be submitted
    # until the previous one completed. It is best to collect return values
    # after the process pool has been reclaimed, to avoid blocking subsequent
    # statements. Finally, we reclaim the pool with close() and join():
    pool.close()
    pool.join()
    result_list = [xx.get() for xx in pool_list]
    print(result_list)
    print('Time spent in parallel %.2f' % (time.time() - start_time))
    print('Serial time spent %.2f' % sum(xx[1] for xx in result_list))
# [(0, 8), (1, 2), (2, 4), (3, 9), (4, 0), (5, 1), (6, 8), (7, 3), (8, 4), (9, 6)]
# Time spent in parallel 14.11
# Serial time spent 45.00
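As a variation on the pattern above, apply_async also accepts an optional callback, which lets the parent process collect results as tasks finish instead of calling get() on each AsyncResult. A minimal sketch (square is an illustrative worker, not from the example above):

```python
import multiprocessing

def square(x):
    return x * x

if __name__ == '__main__':
    results = []
    pool = multiprocessing.Pool(processes=2)
    for xx in range(5):
        # the callback runs in the parent process as each task completes,
        # so results fills up without any blocking get() calls
        pool.apply_async(square, (xx,), callback=results.append)
    pool.close()
    pool.join()
    print(sorted(results))  # completion order varies, so sort before printing
```
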
map and map_async
map_async is the asynchronous version of map.
Unlike apply_async, map and map_async pass exactly one argument to the worker function: one element of the iterable per task.
import time
from multiprocessing import Pool

def run(fn):
    # fn: one element of the data list
    time.sleep(1)
    return fn * fn

if __name__ == "__main__":
    testFL = [1, 2, 3, 4, 5, 6]
    print('Serial:')  # sequential execution (single process)
    s = time.time()
    for fn in testFL:
        run(fn)
    e1 = time.time()
    print("Sequential execution time:", int(e1 - s))
    print('Parallel:')  # create multiple processes and run them in parallel
    pool = Pool(4)  # create a pool with 4 worker processes
    # testFL: the list of data to process; run: the function applied to each element
    rl = pool.map(run, testFL)
    pool.close()  # close the pool; no new tasks are accepted
    pool.join()   # the main process blocks until the workers exit
    e2 = time.time()
    print("Parallel execution time:", int(e2 - e1))
    print(rl)
# Serial:
# Sequential execution time: 6
# Parallel:
# Parallel execution time: 2
# [1, 4, 9, 16, 25, 36]
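If the worker needs more than one argument, Python 3's Pool.starmap unpacks each tuple of the iterable into the function's arguments. A minimal sketch (run2 is an illustrative name):

```python
from multiprocessing import Pool

def run2(a, b):
    # a worker that takes two arguments
    return a * b

if __name__ == "__main__":
    pairs = [(1, 2), (3, 4), (5, 6)]
    with Pool(2) as pool:
        # starmap(run2, pairs) calls run2(1, 2), run2(3, 4), run2(5, 6)
        print(pool.starmap(run2, pairs))  # [2, 12, 30]
```
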
Process
When using Process directly, note that each Process object creates one process, and each process occupies roughly one CPU core, so the number of processes started should not exceed the number of cores.
If too many processes are started, especially for CPU-bound tasks, parallel efficiency drops.
# 16.6.1.1. The Process class
from multiprocessing import Process, cpu_count
import os
import time

def info(title):
    # print(title)
    if hasattr(os, 'getppid'):  # only available on Unix
        print('parent process:', os.getppid())
    print('process id:', os.getpid())
    time.sleep(3)

def f(name):
    info('function f')
    print('hello', name)

if __name__ == '__main__':
    start_time = time.time()
    # info('main line')
    p_list = []  # holds the Process objects
    cpu_num = cpu_count()
    for xx in range(cpu_num):
        p_list.append(Process(target=f, args=('xx_%s' % xx,)))
    for xx in p_list:
        xx.start()
    for xx in p_list:
        xx.join()
    print('spend time: %.2f' % (time.time() - start_time))
# parent process: 11741
# parent process: 11741
# parent process: 11741
# process id: 12249
# process id: 12250
# parent process: 11741
# process id: 12251
# process id: 12252
# hello xx_1
# hello xx_0
# hello xx_2
# hello xx_3
# spend time: 3.04
Interprocess communication
Both Process and Pool support two kinds of inter-process communication: Queue and Pipe.
Queue
Queues follow the first-in, first-out principle and can be shared between processes.
# 16.6.1.2. Exchanging objects between processes
# Queues
from multiprocessing import Process, Queue

def f(q):
    q.put([42, None, 'hello'])

if __name__ == '__main__':
    q = Queue()
    p = Process(target=f, args=(q,))
    p.start()
    print(q.get())  # prints "[42, None, 'hello']"
    p.join()
Pipe
from multiprocessing import Process, Pipe

def f(conn):
    conn.send([42, None, 'hello'])
    conn.close()

if __name__ == '__main__':
    parent_conn, child_conn = Pipe()
    p = Process(target=f, args=(child_conn,))
    p.start()
    print(parent_conn.recv())  # prints "[42, None, 'hello']"
    p.join()
Comparison between queue and pipe
Pipe() can only have two endpoints.
Queue() can have multiple producers and consumers.
When to use them
If you need more than two points to communicate, use a Queue().
If you need absolute performance, a Pipe() is much faster because Queue() is built on top of Pipe().
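The "multiple producers and consumers" point can be sketched with several processes writing to one Queue (producer is an illustrative name):

```python
from multiprocessing import Process, Queue

def producer(q, ii):
    # each producer process puts one item on the shared queue
    q.put(ii * 10)

if __name__ == '__main__':
    q = Queue()
    procs = [Process(target=producer, args=(q, ii)) for ii in range(3)]
    for p in procs:
        p.start()
    items = [q.get() for _ in procs]  # arrival order is not guaranteed
    for p in procs:
        p.join()
    print(sorted(items))  # [0, 10, 20]
```
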
Reference:
https://stackoverflow.com/questions/8463008/python-multiprocessing-pipe-vs-queue
Shared resources
Multiple processes should avoid sharing resources. In multithreading we can share state easily, for example through global variables or passed parameters.
Across processes, that approach does not work, because each process has its own independent memory space.
Instead, we can share resources through shared memory or a Manager.
Doing so increases the complexity of the program and, because synchronization is required, reduces its efficiency.
Shared memory
Shared memory works only with the Process class; it cannot be used with a process pool (Pool).
# 16.6.1.4. Sharing state between processes
# Shared memory
from multiprocessing import Process, Value, Array

def f(n, a):
    n.value = 3.1415927
    for i in range(len(a)):
        a[i] = -a[i]

if __name__ == '__main__':
    num = Value('d', 0.0)
    arr = Array('i', range(10))
    p = Process(target=f, args=(num, arr))
    p.start()
    p.join()
    print(num.value)
    print(arr[:])
# 3.1415927
# [0, -1, -2, -3, -4, -5, -6, -7, -8, -9]
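The synchronization cost mentioned above is real: concurrent updates to a shared Value need its built-in lock, or increments can be lost. A sketch (add_many is an illustrative name):

```python
from multiprocessing import Process, Value

def add_many(counter, n):
    for _ in range(n):
        # acquire the Value's built-in lock so read-modify-write is atomic
        with counter.get_lock():
            counter.value += 1

if __name__ == '__main__':
    counter = Value('i', 0)
    procs = [Process(target=add_many, args=(counter, 1000)) for _ in range(4)]
    for p in procs:
        p.start()
    for p in procs:
        p.join()
    print(counter.value)  # 4000 with the lock; without it, updates can be lost
```
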
Manager Class
The Manager class can be used with both Process and the process pool Pool.
from multiprocessing import Manager, Process

def f(d, l, ii):
    d[ii] = ii
    l.append(ii)

if __name__ == '__main__':
    manager = Manager()
    d = manager.dict()
    l = manager.list(range(10))
    p_list = []
    for xx in range(4):
        p_list.append(Process(target=f, args=(d, l, xx)))
    for xx in p_list:
        xx.start()
    for xx in p_list:
        xx.join()
    print(d)
    print(l)
# {0: 0, 1: 1, 2: 2, 3: 3}
# [0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 0, 1, 2, 3]
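Because Manager proxies can be passed to pool workers (unlike raw shared memory), the same idea also works with Pool. A minimal sketch (record is an illustrative name; map passes a single argument, so the proxy and key are bundled into a tuple):

```python
from multiprocessing import Manager, Pool

def record(args):
    d, ii = args  # unpack the managed dict proxy and the key
    d[ii] = ii * ii

if __name__ == '__main__':
    manager = Manager()
    d = manager.dict()
    with Pool(2) as pool:
        pool.map(record, [(d, ii) for ii in range(4)])
    print(sorted(d.items()))  # [(0, 0), (1, 1), (2, 4), (3, 9)]
```
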
Supplement: timing multi-process runs / writing data with multiple processes / reading data with multiple processes
import time

time_start = time.time()
# ... the work being timed ...
time_end = time.time()
print('time cost', time_end - time_start, 's')
The unit is seconds; the value can also be converted into other units for output.
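For example, a small sketch of converting the elapsed seconds into minutes and seconds with divmod:

```python
import time

time_start = time.time()
time.sleep(0.2)  # stand-in for the real work
elapsed = time.time() - time_start
minutes, seconds = divmod(elapsed, 60)
print('time cost %d min %.2f s' % (minutes, seconds))
```
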
Note that when writing tests (e.g. with pytest), function names must start with test, otherwise they will not be collected and run.
Problems encountered with multiple processes:
1) Writing data with multiple processes:
def test_save_features_to_db(self):
    df1 = pd.read_csv('/home/sc/PycharmProjects/risk-model/xg_test/statis_data/shixin_company.csv')
    com_list = df1['company_name'].values.tolist()
    # com_list = com_list[400015:400019]
    # print('test_save_features_to_db')
    # print(com_list)
    p_list = []  # process list
    i = 1
    p_size = len(com_list)
    for company_name in com_list:
        # create a process
        p = Process(target=self.__save_data_iter_method, args=[company_name])
        # p.daemon = True
        p_list.append(p)
        # run the processes batch by batch
        if i % 20 == 0 or i == p_size:  # handle 20 at a time; the last batch handles the remainder
            for p in p_list:
                p.start()
            for p in p_list:
                p.join()  # wait for the processes to finish
            p_list = []  # empty the process list
        i += 1
Summary: when multiple processes only write data, no lock is required and no return value is needed.
The core line is p = Process(target=self.__save_data_iter_method, args=[company_name]), where target points to one full iteration of the task and args holds that iteration's input.
Note that args=[company_name] (a list, or a tuple with a trailing comma) is the correct form. The original forms args=company_name and args=(company_name) (not a tuple without the comma) raise an error like "takes exactly 1 argument (34 given)", because the string is unpacked into one argument per character.
The outer loop of the multi-process run is determined by the input: there are as many iterations as there are inputs. Make sure you understand p.start() and p.join().
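The batching pattern above, stripped of the database details, can be sketched as follows (work and run_in_batches are illustrative names, not from the original code):

```python
from multiprocessing import Process

def work(item):
    pass  # stand-in for the real per-item task

def run_in_batches(items, batch_size=20):
    batch = []
    for i, item in enumerate(items, start=1):
        # args must be a tuple or list, e.g. (item,); args=item would
        # unpack a string into one argument per character
        batch.append(Process(target=work, args=(item,)))
        if i % batch_size == 0 or i == len(items):
            for p in batch:
                p.start()
            for p in batch:
                p.join()  # wait for this batch before launching the next
            batch = []

if __name__ == '__main__':
    run_in_batches(['a', 'b', 'c'], batch_size=2)
```

This bounds the number of live processes at batch_size, at the cost of waiting for each batch's slowest task.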
def __save_data_iter_method(self, com):
    # time_start = time.time()
    # print(com)
    f_d_t = ShiXinFeaturesDealSvc()
    res = f_d_t.get_time_features(company_name=com)
    # dishonesty (shixin) label
    shixin_label = res.shixin_label
    key1 = res.shixin_time
    if key1:
        public_at = res.shixin_time
        company_name = res.time_map_features[key1].company_name
        # print(company_name)
        established_years = res.time_map_features[key1].established_years
        industry_dx_rate = res.time_map_features[key1].industry_dx_rate
        regcap_change_cnt = res.time_map_features[key1].regcap_change_cnt
        share_change_cnt = res.time_map_features[key1].share_change_cnt
        industry_dx_cnt = res.time_map_features[key1].industry_dx_cnt
        address_change_cnt = res.time_map_features[key1].address_change_cnt
        fr_change_cnt = res.time_map_features[key1].fr_change_cnt
        judgedoc_cnt = res.time_map_features[key1].judgedoc_cnt
        bidding_cnt = res.time_map_features[key1].bidding_cnt
        trade_mark_cnt = res.time_map_features[key1].trade_mark_cnt
        network_share_cancel_cnt = res.time_map_features[key1].network_share_cancel_cnt
        cancel_cnt = res.time_map_features[key1].cancel_cnt
        industry_all_cnt = res.time_map_features[key1].industry_all_cnt
        network_share_zhixing_cnt = res.time_map_features[key1].network_share_zhixing_cnt
        network_share_judge_doc_cnt = res.time_map_features[key1].network_share_judge_doc_cnt
        net_judgedoc_defendant_cnt = res.time_map_features[key1].net_judgedoc_defendant_cnt
        judge_doc_cnt = res.time_map_features[key1].judge_doc_cnt
        f_d_do = ShixinFeaturesDto(company_name=company_name, established_years=established_years,
                                   industry_dx_rate=industry_dx_rate, regcap_change_cnt=regcap_change_cnt,
                                   share_change_cnt=share_change_cnt, industry_all_cnt=industry_all_cnt,
                                   industry_dx_cnt=industry_dx_cnt, address_change_cnt=address_change_cnt,
                                   fr_change_cnt=fr_change_cnt, judgedoc_cnt=judgedoc_cnt,
                                   bidding_cnt=bidding_cnt, trade_mark_cnt=trade_mark_cnt,
                                   network_share_cancel_cnt=network_share_cancel_cnt, cancel_cnt=cancel_cnt,
                                   network_share_zhixing_cnt=network_share_zhixing_cnt,
                                   network_share_judge_doc_cnt=network_share_judge_doc_cnt,
                                   net_judgedoc_defendant_cnt=net_judgedoc_defendant_cnt,
                                   judge_doc_cnt=judge_doc_cnt, public_at=public_at, shixin_label=shixin_label)
        # time_end = time.time()
        # print('totally cost', time_end - time_start)
        self.cfdbsvc.save_or_update_features(f_d_do)
def save_or_update_features(self, shixin_features_dto):
    """
    Add or update:
    insert one row of data; insert if it does not exist, update if it does
    """
    self._pg_util = PgUtil()
    p_id = None
    if isinstance(shixin_features_dto, ShixinFeaturesDto):
        p_id = str(uuid.uuid1())
        self._pg_util.execute_sql(
            self.s_b.insert_or_update_row(
                self.model.COMPANY_NAME,
                {
                    self.model.ID: p_id,
                    # company name
                    self.model.COMPANY_NAME: shixin_features_dto.company_name,
                    # time of dishonesty
                    self.model.PUBLIC_AT: shixin_features_dto.public_at,
                    self.model.SHIXIN_LABEL: shixin_features_dto.shixin_label,
                    self.model.ESTABLISHED_YEARS: shixin_features_dto.established_years,
                    self.model.INDUSTRY_DX_RATE: shixin_features_dto.industry_dx_rate,
                    self.model.REGCAP_CHANGE_CNT: shixin_features_dto.regcap_change_cnt,
                    self.model.SHARE_CHANGE_CNT: shixin_features_dto.share_change_cnt,
                    self.model.INDUSTRY_ALL_CNT: shixin_features_dto.industry_all_cnt,
                    self.model.INDUSTRY_DX_CNT: shixin_features_dto.industry_dx_cnt,
                    self.model.ADDRESS_CHANGE_CNT: shixin_features_dto.address_change_cnt,
                    self.model.NETWORK_SHARE_CANCEL_CNT: shixin_features_dto.network_share_cancel_cnt,
                    self.model.CANCEL_CNT: shixin_features_dto.cancel_cnt,
                    self.model.NETWORK_SHARE_ZHIXING_CNT: shixin_features_dto.network_share_zhixing_cnt,
                    self.model.FR_CHANGE_CNT: shixin_features_dto.fr_change_cnt,
                    self.model.JUDGEDOC_CNT: shixin_features_dto.judgedoc_cnt,
                    self.model.NETWORK_SHARE_JUDGE_DOC_CNT: shixin_features_dto.network_share_judge_doc_cnt,
                    self.model.BIDDING_CNT: shixin_features_dto.bidding_cnt,
                    self.model.TRADE_MARK_CNT: shixin_features_dto.trade_mark_cnt,
                    self.model.JUDGE_DOC_CNT: shixin_features_dto.judge_doc_cnt
                },
                [self.model.ADDRESS_CHANGE_CNT, self.model.BIDDING_CNT, self.model.CANCEL_CNT,
                 self.model.ESTABLISHED_YEARS, self.model.FR_CHANGE_CNT, self.model.INDUSTRY_ALL_CNT,
                 self.model.INDUSTRY_DX_RATE, self.model.INDUSTRY_DX_CNT, self.model.JUDGE_DOC_CNT,
                 self.model.JUDGEDOC_CNT, self.model.NETWORK_SHARE_CANCEL_CNT, self.model.NETWORK_SHARE_JUDGE_DOC_CNT,
                 self.model.NETWORK_SHARE_ZHIXING_CNT, self.model.REGCAP_CHANGE_CNT, self.model.TRADE_MARK_CNT,
                 self.model.SHARE_CHANGE_CNT, self.model.SHIXIN_LABEL, self.model.PUBLIC_AT]
            )
        )
    return p_id
Note that self._pg_util = PgUtil() is re-initialized inside the function; otherwise SSL errors and SSL decryption errors are raised. The underlying reason still needs to be investigated.
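That per-process initialization requirement is a common pattern: database connections generally cannot be shared across a fork. With a Pool it can be expressed via an initializer function. A sketch under that assumption (the names are illustrative; object() stands in for PgUtil() or any DB client):

```python
from multiprocessing import Pool

_conn = None  # per-process connection, created after the fork

def init_worker():
    # runs once in each worker process; create the client here rather than
    # inheriting one from the parent, which is what triggers the SSL errors
    global _conn
    _conn = object()  # stand-in for PgUtil() or another DB client

def handle(item):
    assert _conn is not None  # every worker has its own connection
    return item

if __name__ == '__main__':
    with Pool(2, initializer=init_worker) as pool:
        print(pool.map(handle, [1, 2, 3]))  # [1, 2, 3]
```
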