Method of Producing Consumer Model Based on PythonAsyncio Module

  • 2021-09-11 20:49:45
  • OfStack

Keyword Description for asyncio

event_loop Event Loop: The program opens an infinite loop, registers some functions to the event loop, and calls the corresponding coprogram function when the event occurs coroutine Synergetic: Synergetic object refers to a function defined by async keyword. Its call will not execute the function immediately, but will return a Synergetic object. Synergetic object needs to be registered in the event loop and called by the event loop. task task: A co-program object is a native function that can be suspended, and the task is a step-by-step encapsulation of co-program, which contains various states of the task future: Represents the results of tasks performed or not performed in the future. There is no essential difference between it and task async/await keywords: async defines a co-routine, await is used to suspend blocking asynchronous call interfaces, and asyncio. coroutine/yield from is used in python3.4

In the design mode, the production consumer model occupies a very important position, and this model also has many interesting corresponding scenes in the real world, such as the steamed stuffed bun maker and the steamed stuffed bun eater. When the speed of the two does not match, it is necessary to have a model to match (couple), so that the steamed stuffed bun will be consumed in turn.


import asyncio

class ConsumerProducerModel:
  def __init__(self, producer, consumer, queue=asyncio.Queue(), plate_size=6): # the plate holds 6pcs bread
    self.queue = queue
    self.producer = producer
    self.consumer = consumer
    self.plate_size = plate_size

  async def produce_bread(self):
    for i in range(self.plate_size):
      bread = f"bread {i}"
      await asyncio.sleep(0.5) # bread makes faster, 0.5s/pc
      await self.queue.put(bread)
      print(f'{self.producer} makes {bread}')

  async def consume_bread(self):
    while True:
      bread = await self.queue.get()
      await asyncio.sleep(1) # eat slower, 1s/pc
      print(f'{self.consumer} eats {bread}')
      self.queue.task_done()

async def main():
  queue = asyncio.Queue()
  cp1 = ConsumerProducerModel("John", "Grace", queue) # group 1
  cp2 = ConsumerProducerModel("Mike", "Lucy", queue) # group 2

  producer_1 = cp1.produce_bread()
  producer_2 = cp2.produce_bread()

  consumer_1 = asyncio.ensure_future(cp1.consume_bread())
  consumer_2 = asyncio.ensure_future(cp2.consume_bread())

  await asyncio.gather(*[producer_1, producer_2])
  await queue.join()
  consumer_1.cancel()
  consumer_2.cancel()

if __name__ == '__main__':
  loop = asyncio.get_event_loop()
  loop.run_until_complete(main())
  loop.close()

The production consumer model can be implemented using multithreading and queuing, and the co-threading is chosen here not only because of good performance, but also because the whole logic is clear:

1. First define the initialization, have a queue, have producers, have consumers, and have the size of a plate for bread;

2. Producer: produce the corresponding things (bread) according to the size of the plate, and put the things into the plate (queue);

3. Consumer: Take something from the plate, each time it is a task, and every time the task is completed, it is marked as task_done (calling function). At this level, 1 straight cycle;

4. Main logic: instantiate the production consumer model object, create producer synergy, create tasks (ensure_future), collect synergy results, wait for all threads to end (join), and manually cancel two consumer synergies;

5. Run: First create the event loop, then enter the main logic until it is complete, and close the loop.


Related articles: