How does Python quickly implement distributed tasks

  • 2020-06-07 04:48:55
  • OfStack

After a close reading of the official Python documentation, I found that the multiprocessing module that ships with Python already provides prefabricated interfaces for communication between multiple hosts, making it easy to build the classic producer-consumer pattern of a distributed task architecture.

Previously, to implement the producer-consumer pattern in Python, one often reached for an external queue system such as RabbitMQ, and on top of that had to design a set of serializable task objects so they could be enqueued. Without queue support, some developers even had to start from a socket server and deal with TCP/IP directly.

multiprocessing.managers.BaseManager provides exactly such a quick interface for developers.
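Before looking at the multi-host setup below, here is a minimal single-process sketch of the round trip BaseManager enables: a queue.Queue is served over TCP, and a second manager connects as a client and writes into it through a proxy. The names DemoManager, task_queue, and the loopback address are illustrative choices, not part of the article's setup.

```python
import queue
import threading
from multiprocessing.managers import BaseManager

# A local queue that will be exposed over the network.
task_queue = queue.Queue()

class DemoManager(BaseManager):
    pass

# Register a callable returning the queue under the name 'get_queue'.
DemoManager.register('get_queue', callable=lambda: task_queue)

# Serve on loopback; port 0 lets the OS pick a free port.
server = DemoManager(address=('127.0.0.1', 0), authkey=b'demo')
srv = server.get_server()
threading.Thread(target=srv.serve_forever, daemon=True).start()

# A second manager acts as the client and connects over TCP.
client = DemoManager(address=srv.address, authkey=b'demo')
client.connect()
remote_queue = client.get_queue()   # a proxy to task_queue
remote_queue.put('hello')           # travels through the TCP connection

msg = task_queue.get(timeout=5)     # the put arrives in the local queue
print(msg)
```

The same pattern scales to multiple hosts simply by binding to a routable address, which is what the three scripts below do.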

The scenario we assume is a system of one producer (producer.py) plus eight consumers (worker.py), with a central node (server.py) responsible for coordination, as follows:

server.py


from multiprocessing.managers import BaseManager
import queue

task_queue = queue.Queue()  # the queue used for message passing

class QueueManager(BaseManager):
    pass

# Expose the queue to the network under the name 'get_queue'
QueueManager.register('get_queue', callable=lambda: task_queue)

if __name__ == '__main__':
    # Listen on port 50000 of 10.239.85.193
    m = QueueManager(address=('10.239.85.193', 50000), authkey=b'abr')
    s = m.get_server()
    s.serve_forever()

worker.py


from multiprocessing.managers import BaseManager
from multiprocessing import Pool


class QueueManager(BaseManager):
    pass

QueueManager.register('get_queue')

def feb(i):  # the classic 'breeding' problem: a Fibonacci-style recursion, used as CPU-bound work
    if i < 2: return 1
    if i < 5: return feb(i-1) + feb(i-2)
    return feb(i-1) + feb(i-2) - feb(i-5)

def worker(i):
    # Connect to the server
    m = QueueManager(address=('10.239.85.193', 50000), authkey=b'abr')
    m.connect()
    while True:
        # Obtain the shared queue and block until a task arrives
        queue = m.get_queue()
        c = queue.get()
        print(feb(c))

if __name__ == '__main__':
    p = Pool(8)  # start 8 worker processes; each loops forever, so map never returns
    p.map(worker, range(8))
producer.py


from multiprocessing.managers import BaseManager


class QueueManager(BaseManager):
    pass

QueueManager.register('get_queue')


if __name__ == '__main__':
    m = QueueManager(address=('10.239.85.193', 50000), authkey=b'abr')
    m.connect()
    i = 0
    while True:
        queue = m.get_queue()
        queue.put(48)  # enqueue the same task value endlessly
        i += 1

Data placed in the Queue() object is pickled transparently and passed between hosts over TCP port 50000. Note, however, that because the connection is authenticated with authkey and the payload is pickled, every node should run the same version of Python.
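Because tasks travel as pickles, anything the pickle module can serialize may go through the managed queue, not just the integer used above. A hypothetical richer payload, a function name plus its argument, would round-trip like this (pickle.dumps stands in for what the proxy sends over TCP):

```python
import pickle

# Hypothetical task payload: a function name plus an argument.
# Both ends must pickle/unpickle it identically, which is why
# every node should run the same Python version.
task = ('feb', 30)
wire = pickle.dumps(task)        # what the queue proxy sends over TCP
name, arg = pickle.loads(wire)   # what the worker receives
print(name, arg)
```

A worker could then dispatch on the name instead of hard-coding a single function.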
