Implementing the Producer-Consumer Model with Python's asyncio Module
- 2021-09-11 20:49:45
- OfStack
Keyword Description for asyncio
Among design patterns, the producer-consumer model holds a very important place, and it has many familiar counterparts in the real world, such as a steamed-bun maker and a steamed-bun eater. When the two work at different speeds, something is needed to couple them so that the buns are consumed in order.
import asyncio


class ConsumerProducerModel:
    # queue defaults to None so that each instance gets its own asyncio.Queue
    # instead of sharing one created when the class is defined
    def __init__(self, producer, consumer, queue=None, plate_size=6):  # the plate holds 6 pieces of bread
        self.queue = queue if queue is not None else asyncio.Queue()
        self.producer = producer
        self.consumer = consumer
        self.plate_size = plate_size

    async def produce_bread(self):
        for i in range(self.plate_size):
            bread = f"bread {i}"
            await asyncio.sleep(0.5)  # bread is made faster: 0.5 s per piece
            await self.queue.put(bread)
            print(f'{self.producer} makes {bread}')

    async def consume_bread(self):
        while True:
            bread = await self.queue.get()
            await asyncio.sleep(1)  # bread is eaten slower: 1 s per piece
            print(f'{self.consumer} eats {bread}')
            self.queue.task_done()


async def main():
    queue = asyncio.Queue()
    cp1 = ConsumerProducerModel("John", "Grace", queue)  # group 1
    cp2 = ConsumerProducerModel("Mike", "Lucy", queue)  # group 2
    producer_1 = cp1.produce_bread()
    producer_2 = cp2.produce_bread()
    consumer_1 = asyncio.ensure_future(cp1.consume_bread())
    consumer_2 = asyncio.ensure_future(cp2.consume_bread())
    await asyncio.gather(producer_1, producer_2)
    await queue.join()  # wait until every piece of bread has been eaten
    consumer_1.cancel()
    consumer_2.cancel()


if __name__ == '__main__':
    loop = asyncio.new_event_loop()  # create the event loop explicitly
    try:
        loop.run_until_complete(main())
    finally:
        loop.close()
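The shutdown sequence at the end of main() relies on the queue's count of unfinished items: each put() raises it, each task_done() lowers it, and join() returns once it reaches zero. The following is a minimal sketch of that mechanism in isolation. The names worker and demo are hypothetical and not part of the original example, and the sleeps are omitted to keep it short.

```python
import asyncio


async def worker(queue: asyncio.Queue, done: list) -> None:
    # Hypothetical consumer: loops forever, like consume_bread above.
    while True:
        item = await queue.get()
        done.append(item)
        queue.task_done()  # exactly one task_done() per successful get()


async def demo() -> list:
    queue = asyncio.Queue()
    done = []
    task = asyncio.ensure_future(worker(queue, done))
    for i in range(3):
        await queue.put(f"bread {i}")
    await queue.join()  # returns once all three items are marked done
    task.cancel()  # otherwise the worker would wait on get() forever
    # Let the cancellation take effect; the CancelledError is collected, not raised.
    await asyncio.gather(task, return_exceptions=True)
    return done


result = asyncio.run(demo())
print(result)
```

Because the consumer never exits on its own, join() followed by cancel() is what ends it cleanly here; without the cancel() the task would stay suspended on get().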
The producer-consumer model can also be implemented with multithreading and a queue. Coroutines are chosen here not only for their good performance but also because the overall logic is clear:
1. Initialization: the model holds a queue, a producer, a consumer, and the plate size, which here is the number of pieces of bread to produce;
2. Producer: produces bread according to the plate size and puts each piece on the plate (the queue);
3. Consumer: takes a piece from the plate. Each piece is one task, and each completed task is marked by calling task_done(). The consumer keeps looping in this way indefinitely;
4. Main logic: instantiate the model objects, create the producer coroutines, create the consumer tasks (ensure_future), gather the producer coroutines, wait until every item in the queue has been processed (join), and then manually cancel the two consumer tasks;
5. Run: first create the event loop, then run the main logic until it completes, and finally close the loop.
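For comparison, the multithreaded alternative mentioned above could look roughly like the sketch below. It uses the standard library's queue.Queue and threading.Thread in place of coroutines. The function names produce, consume, and run_threaded are hypothetical, and the sleep times are scaled-down stand-ins for the 0.5 s and 1 s used in the coroutine version.

```python
import queue
import threading
import time


def produce(q: queue.Queue, count: int) -> None:
    for i in range(count):
        time.sleep(0.01)  # stand-in for the 0.5 s baking time
        q.put(f"bread {i}")


def consume(name: str, q: queue.Queue, eaten: list) -> None:
    while True:
        bread = q.get()  # blocks until a piece is available
        time.sleep(0.02)  # stand-in for the 1 s eating time
        eaten.append((name, bread))
        q.task_done()


def run_threaded(count: int = 6) -> list:
    q = queue.Queue()
    eaten = []
    # daemon=True lets the program exit while consumers still block on get()
    consumers = [
        threading.Thread(target=consume, args=(name, q, eaten), daemon=True)
        for name in ("Grace", "Lucy")
    ]
    producers = [
        threading.Thread(target=produce, args=(q, count)) for _ in range(2)
    ]
    for t in consumers + producers:
        t.start()
    for t in producers:
        t.join()  # wait for both producers to finish
    q.join()  # wait until every piece has been eaten
    return eaten


eaten = run_threaded()
print(len(eaten))
```

The structure mirrors the coroutine version step by step. The main differences are that blocking calls replace await, and that daemon threads stand in for the explicit cancel() of the consumer tasks.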