python concurrent programming: multiprocess multithreaded asynchronous and co programming

  • 2020-05-12 02:52:13
  • OfStack

Recently, I learned about python concurrency, so I made a summary of multi-process, multi-threaded, asynchronous, and coroutine.
1. A multithreaded

Multithreading allows multiple controls within a process so that multiple functions are active at the same time, allowing the operations of multiple functions to run simultaneously. Even a single CPU computer can have the effect of running multiple threads simultaneously by constantly switching between instructions from one thread to another.

Multithreading is equivalent to one concurrent (concunrrency) system. Concurrent systems 1 normally perform multiple tasks simultaneously. If multiple tasks can share resources, especially when writing a variable at the same time, it is necessary to solve the synchronization problem, such as multi-threaded train ticketing system: two instructions, one instruction to check whether the ticket is sold out, another instruction, multiple Windows to sell tickets at the same time, may appear to sell tickets that do not exist.

In the case of concurrency, the order in which instructions are executed is determined by the kernel. Within the same thread, instructions are executed in sequence, but it is difficult for instructions between threads to clear which one will be executed first. Therefore, the problem of multithreading synchronization should be considered. Synchronization (synchronization) means that only one thread is allowed to access a resource at a given time.

1. thread module

2. threading module
threading.Thread create 1 thread.

A mutex is used to determine whether there are excess tickets or sold tickets, so that one thread does not just decide that there are no remaining tickets, while another thread sells tickets.


#! /usr/bin/python
#-* coding: utf-8 -*
# __author__ ="tyomcat"
import threading
import time
import os

def booth(tid):
  global i
  global lock
  while True:
    lock.acquire()
    if i!=0:
      i=i-1
      print " window :",tid,", The remaining votes :",i
      time.sleep(1)
    else:
      print "Thread_id",tid,"No more tickets"
      os._exit(0)
    lock.release()
    time.sleep(1)

i = 100
lock=threading.Lock()

for k in range(10):

  new_thread = threading.Thread(target=booth,args=(k,))
  new_thread.start()

2. Coroutine (also known as tasklet, fiber)

Coroutine, unlike preemptive scheduling of threads, is collaborative scheduling. The coroutine is also single-threaded, but it allows non-human code that would otherwise have been written in an asynchronous + callback mode to be written in a seemingly synchronous manner.

1. Coroutines can be implemented by generators (generator) in python.

The first step is to have a solid understanding of generators and yield.

A call to a normal python function usually starts at line 1 of the function and ends with an return statement, exception, or function execution (also considered an implicit return of None).

When the function returns control to the caller, it is all over. Sometimes you can "save your work" by creating a function that produces a sequence, which is the generator (the function that USES the yield keyword).

It is possible to "produce a sequence" because the function does not return as it normally would. The implication of return is that the function is returning control of the executed code to the place where the function was called. The implication of "yield" is that the transfer of control is temporary and voluntary, and that our function will take back control in the future.

Take a look at 1 producer/consumer example:


#! /usr/bin/python
#-* coding: utf-8 -*
# __author__ ="tyomcat"
import time
import sys
#  producers 
def produce(l):
  i=0
  while 1:
    if i < 10:
      l.append(i)
      yield i
      i=i+1
      time.sleep(1)
    else:
      return   
#  consumers 
def consume(l):
  p = produce(l)
  while 1:
    try:
      p.next()
      while len(l) > 0:
        print l.pop()
    except StopIteration:
      sys.exit(0)
if __name__ == "__main__":
  l = []
  consume(l)

When we call p.next () in custom, the program returns to produce yield i to continue execution, so that l append has elements in l, and then we print l.pop (), until p.next () raises an StopIteration exception.

2, Stackless Python

3. greenlet module

The greenlet-based implementation is second only to Stackless Python in performance, roughly twice as slow as Stackless Python, and nearly an order of magnitude faster than other schemes. In fact, greenlet is not a real concurrency mechanism. Instead, it switches between the execution blocks of different functions within the same thread, implementing "you run 1 will, I run 1 will", and must specify when and where to switch when switching.

4. eventlet module

3. Multiple processes
1. Child process (subprocess package)

In python, through the subprocess package, fork1 subprocess, and run external programs.

The os module is the first to be considered when calling the system's commands. Use os.system () and os.popen (). However, these two commands are too simple to perform complex operations, such as providing input to a running command or reading the output of a command, determining the running state of the command, managing the parallelism of multiple commands, and so on. At this point, the Popen command in subprocess can effectively complete the operation we need


>>>import subprocess
>>>command_line=raw_input()
ping -c 10 www.baidu.com
>>>args=shlex.split(command_line)
>>>p=subprocess.Popen(args)

subprocess.PIPE is used to connect the input and output of multiple sub-processes in one place to form a pipeline (pipe):


import subprocess
child1 = subprocess.Popen(["ls","-l"], stdout=subprocess.PIPE)
child2 = subprocess.Popen(["wc"], stdin=child1.stdout,stdout=subprocess.PIPE)
out = child2.communicate()
print(out)

The communicate() method reads the data from stdout and stderr and enters it into stdin.

2. Multi-process (multiprocessing package)

(1) multiprocessing package is the multi-process management package in Python. Similar to threading.Thread, it can create a process using the multiprocessing.Process object.

Process pools (Process Pool) can create multiple processes.

apply_async(func,args) takes 1 process from the process pool to execute func, args being the parameter of func. It will return an AsyncResult object on which you can call the get() method to get the result.

The close() process pool no longer creates new processes

All processes in the join() wait process pool. You must call the close() method on Pool to join.


#! /usr/bin/env python
# -*- coding:utf-8  -*-
# __author__ == "tyomcat"
# " My computer does 4 a cpu"

from multiprocessing import Pool
import os, time

def long_time_task(name):
  print 'Run task %s (%s)...' % (name, os.getpid())
  start = time.time()
  time.sleep(3)
  end = time.time()
  print 'Task %s runs %0.2f seconds.' % (name, (end - start))

if __name__=='__main__':
  print 'Parent process %s.' % os.getpid()
  p = Pool()
  for i in range(4):
    p.apply_async(long_time_task, args=(i,))
  print 'Waiting for all subprocesses done...'
  p.close()
  p.join()
  print 'All subprocesses done.'

(2) multiple processes share resources

Use Shared memory and Manager objects: use one process as a server and build Manager to actually store resources.

Other processes can access Manager by parameter passing or by address, establish a connection, and manipulate resources on the server.


#! /usr/bin/env python
# -*- coding:utf-8  -*-
# __author__ == "tyomcat"

from multiprocessing import Queue,Pool
import multiprocessing,time,random

def write(q):

  for value in ['A','B','C','D']:
    print "Put %s to Queue!" % value
    q.put(value)
    time.sleep(random.random())


def read(q,lock):
  while True:
    lock.acquire()
    if not q.empty():
      value=q.get(True)
      print "Get %s from Queue" % value
      time.sleep(random.random())
    else:
      break
    lock.release()

if __name__ == "__main__":
  manager=multiprocessing.Manager()
  q=manager.Queue()
  p=Pool()
  lock=manager.Lock()
  pw=p.apply_async(write,args=(q,))
  pr=p.apply_async(read,args=(q,lock))
  p.close()
  p.join()
  print
  print " All data is written and read "

4. The asynchronous

When blocking occurs, the performance will be greatly reduced and the potential of CPU cannot be fully utilized. As a result, hardware investment will be wasted. More importantly, the software modules will become rigid, tightly coupled and unable to be cut, which is not conducive to future expansion and change.

Whether it is a process or a thread, each block or switch involves a system call (system call). CPU runs the operating system's scheduler first, and then the scheduler decides which process (thread) to run. Multiple threads need to lock each other when accessing mutually exclusive code,

The popular asynchronous server is event-driven (e.g. nginx).

In the asynchronous event-driven model, the blocking operation is transformed into an asynchronous operation, and the main thread is responsible for initiating the asynchronous operation and processing the results of the asynchronous operation. Because all blocking operations are turned into asynchronous operations, the main thread theoretically spends most of its time dealing with actual computing tasks, eliminating the multithreaded scheduling time, so the performance of this model is generally better.


Related articles: