asyncio Code Details in Python

  • 2021-06-28 09:34:08
  • OfStack

Introduction to asyncio

Those who are familiar with C# may know that it is easy to implement asynchronous programming there with async and await. So how is it done in Python? Python also supports asynchronous programming, generally through the asyncio library. Here is what asyncio is:

asyncio is a library for writing concurrent code using the async/await syntax. asyncio is used as the foundation for many high-performance Python asynchronous frameworks, including network and web servers, database connection libraries, distributed task queues, and so on. asyncio is often the best choice for building IO-bound and high-level structured network code.

Essential concepts in asyncio

As you can see, with the asyncio library we can also use async and await in Python code. There are four basic concepts in asyncio:

Eventloop

The event loop is the core of every asyncio application, its central controller. An Eventloop instance provides methods for registering, cancelling, and executing tasks and callbacks. Simply put, we register asynchronous functions on the event loop, and the loop executes them, only one at a time. If the currently executing function is waiting for I/O, the event loop suspends it and runs other functions. When that function's I/O completes, it becomes ready again and resumes the next time the loop reaches it.
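
As a rough illustration of that "one at a time, switch on wait" behaviour, here is a minimal sketch. The names worker and log are made up for this example, and asyncio.sleep(0) merely stands in for a real I/O wait:

```python
import asyncio


async def worker(name, steps, log):
    # Hypothetical helper: records each step, then yields to the event loop.
    for i in range(steps):
        log.append(f"{name}{i}")
        # sleep(0) suspends this coroutine so the loop can run another one.
        await asyncio.sleep(0)


async def main():
    log = []
    # Both workers are registered on the same loop and take turns.
    await asyncio.gather(worker("a", 2, log), worker("b", 2, log))
    return log


if __name__ == "__main__":
    print(asyncio.run(main()))
```

The recorded order alternates between the two workers, because each one hands control back to the loop at every await.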

Coroutine

A coroutine is essentially a function, defined with async def, whose execution can be suspended at an await and resumed later.


import asyncio
import time


async def a():
    print('Suspending a')
    await asyncio.sleep(3)
    print('Resuming a')


async def b():
    print('Suspending b')
    await asyncio.sleep(1)
    print('Resuming b')


async def main():
    start = time.perf_counter()
    # Run both coroutines concurrently and wait for both to finish
    await asyncio.gather(a(), b())
    print(f'{main.__name__} Cost: {time.perf_counter() - start}')


if __name__ == '__main__':
    asyncio.run(main())

Executing the above code, you can see output like this:

Suspending a
Suspending b
Resuming b
Resuming a
main Cost: 3.0023356619999997
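
One detail the example relies on: calling an async def function does not run its body, it only creates a coroutine object that the event loop then drives. A small sketch of this, where add and demo are hypothetical names used only for illustration:

```python
import asyncio
import inspect


async def add(x, y):
    await asyncio.sleep(0)
    return x + y


def demo():
    coro = add(1, 2)                      # nothing has executed yet
    is_coro = inspect.iscoroutine(coro)   # it is just a coroutine object
    result = asyncio.run(coro)            # the event loop runs it to completion
    return is_coro, result


if __name__ == "__main__":
    print(demo())
```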

For a detailed description of coroutines, you can refer to my previous article on Python coroutines. The coroutines there were written in the older style, which required decorators and is now obsolete.

Future

A Future is a "future" object, similar to a Promise in JavaScript: when the asynchronous operation ends, its final result is set on the Future object. Futures are the low-level objects through which coroutines are wrapped for scheduling (via Task, a Future subclass).


>>> import asyncio
>>> def fun():
...  print("inner fun")
...  return 111
... 
>>> loop = asyncio.get_event_loop()
>>> future = loop.run_in_executor(None, fun) # await is not used here
inner fun
>>> future # As you can see, the future's state is pending
<Future pending cb=[_chain_future.<locals>._call_check_cancel() at /usr/local/Cellar/python/3.7.3/Frameworks/Python.framework/Versions/3.7/lib/python3.7/asyncio/futures.py:348]>
>>> future.done() #  Not yet completed 
False
>>> [m for m in dir(future) if not m.startswith('_')]
['add_done_callback', 'cancel', 'cancelled', 'done', 'exception', 'get_loop', 'remove_done_callback', 'result', 'set_exception', 'set_result']
>>> future.result() # Calling result() directly at this point fails
Traceback (most recent call last):
 File "<input>", line 1, in <module>
asyncio.base_futures.InvalidStateError: Result is not set.
>>> async def runfun():
...  result=await future
...  print(result)
...  
>>> loop.run_until_complete(runfun()) # loop.run_until_complete(future) would also work; runfun is only here to demonstrate await
111
>>> future
<Future finished result=111>
>>> future.done()
True
>>> future.result()
111
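
The same life cycle (pending, then result set, then done) can be sketched with a Future created by hand. This is only an illustration: application code rarely creates futures directly, and the 0.1 second delay and the value 111 are arbitrary choices:

```python
import asyncio


async def main():
    loop = asyncio.get_running_loop()
    future = loop.create_future()      # a bare, pending Future
    states = [future.done()]           # not done yet
    # Ask the loop to set the result a little later, as an I/O callback would.
    loop.call_later(0.1, future.set_result, 111)
    result = await future              # suspends until set_result is called
    states.append(future.done())
    return states, result


if __name__ == "__main__":
    print(asyncio.run(main()))
```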

Task

In addition to coroutines, the event loop also supports registering two other types of objects, Future and Task. A Future object provides many task-related methods (such as adding completion callbacks, cancelling, and setting the result), but developers generally do not need to work with the low-level Future directly. Instead, concurrency is achieved by scheduling coroutines through Task, a subclass of Future. So what is a Task? It is described as follows:

A Future-like object that runs a Python coroutine. Not thread-safe. Tasks are used to run coroutines in event loops. If a coroutine awaits on a Future, the Task suspends the execution of the coroutine and waits for the Future to complete. When the Future is done, execution of the wrapped coroutine resumes. Event loops use cooperative scheduling: an event loop runs one Task at a time. While a Task waits for a Future to complete, the event loop runs other Tasks, callbacks, or performs IO operations.

Here's how to use it:


>>> async def a():
...  print('Suspending a')
...  await asyncio.sleep(3)
...  print('Resuming a')
...  
>>> task = asyncio.ensure_future(a())
>>> loop.run_until_complete(task)
Suspending a
Resuming a
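
Because a Task is a Future subclass, it also carries the Future methods mentioned above, such as cancel. A minimal sketch of cancelling a running task, assuming Python 3.7+ for asyncio.create_task (slow is a hypothetical coroutine made up for this example):

```python
import asyncio


async def slow():
    await asyncio.sleep(10)
    return "finished"


async def main():
    task = asyncio.create_task(slow())
    await asyncio.sleep(0)      # give the task a chance to start
    task.cancel()               # request cancellation
    try:
        await task
    except asyncio.CancelledError:
        # Awaiting a cancelled task raises CancelledError
        pass
    return task.cancelled()


if __name__ == "__main__":
    print(asyncio.run(main()))
```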

Some common usage differences in asyncio

asyncio.gather and asyncio.wait

We used asyncio.gather in the code above, but there is another function, asyncio.wait, that also lets multiple coroutines execute concurrently. What is the difference? It is described below.


>>> import asyncio
>>> async def a():
...  print('Suspending a')
...  await asyncio.sleep(3)
...  print('Resuming a')
...  return 'A'
... 
>>> async def b():
...  print('Suspending b')
...  await asyncio.sleep(1)
...  print('Resuming b')
...  return 'B'
... 
>>> async def fun1():
...  return_value_a, return_value_b = await asyncio.gather(a(), b())
...  print(return_value_a,return_value_b)
...  
>>> asyncio.run(fun1())
Suspending a
Suspending b
Resuming b
Resuming a
A B
>>> async def fun2():
...  done,pending=await asyncio.wait([a(),b()])
...  print(done)
...  print(pending)
...  task=list(done)[0]
...  print(task)
...  print(task.result())
...  
>>> asyncio.run(fun2())
Suspending b
Suspending a
Resuming b
Resuming a
{<Task finished coro=<a() done, defined at <input>:1> result='A'>, <Task finished coro=<b() done, defined at <input>:8> result='B'>}
set()
<Task finished coro=<a() done, defined at <input>:1> result='A'>
A

From the code above, we can see the difference between the two:

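asyncio.gather collects the results of the coroutines and returns them in the order in which the coroutines were passed in. asyncio.wait returns two values: the first is the set of completed tasks and the second is the set of tasks still pending. Note that passing bare coroutines to asyncio.wait, as in fun2, was deprecated in Python 3.8 and is an error from Python 3.11; on newer versions, wrap them with asyncio.create_task first.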
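
For newer Python versions, a version of the asyncio.wait call that passes Task objects instead of bare coroutines might look like the sketch below. The work coroutine and its delays are assumptions made for this example:

```python
import asyncio


async def work(name, delay):
    await asyncio.sleep(delay)
    return name


async def main():
    # Wrap the coroutines in tasks before handing them to asyncio.wait
    tasks = [
        asyncio.create_task(work("A", 0.2)),
        asyncio.create_task(work("B", 0.1)),
    ]
    done, pending = await asyncio.wait(tasks)
    # done is an unordered set, so sort the results for a stable view
    return sorted(t.result() for t in done), len(pending)


if __name__ == "__main__":
    print(asyncio.run(main()))
```

Since done is a set, it carries no ordering; that is one practical reason to prefer asyncio.gather when results must line up with inputs.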

asyncio.wait accepts a return_when parameter. By default it waits for all tasks to complete (return_when='ALL_COMPLETED'); it also supports FIRST_COMPLETED (return as soon as the first task completes) and FIRST_EXCEPTION (return as soon as any task raises an exception, or when all complete if none does):


>>> async def fun2():
...  done,pending=await asyncio.wait([a(),b()],return_when=asyncio.tasks.FIRST_COMPLETED)
...  print(done)
...  print(pending)
...  task=list(done)[0]
...  print(task)
...  print(task.result())
...  
>>> asyncio.run(fun2())
Suspending a
Suspending b
Resuming b
{<Task finished coro=<b() done, defined at <input>:8> result='B'>}
{<Task pending coro=<a() running at <input>:3> wait_for=<Future pending cb=[<TaskWakeupMethWrapper object at 0x10757bf18>()]>>}
<Task finished coro=<b() done, defined at <input>:8> result='B'>
B
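
FIRST_EXCEPTION can be sketched in the same way. In this illustration fails and slow are hypothetical coroutines, and the leftover pending task is cancelled explicitly so nothing keeps running in the background:

```python
import asyncio


async def fails():
    await asyncio.sleep(0.05)
    raise ValueError("boom")


async def slow():
    await asyncio.sleep(1)
    return "slow"


async def main():
    failing = asyncio.create_task(fails())
    slow_task = asyncio.create_task(slow())
    done, pending = await asyncio.wait(
        {failing, slow_task}, return_when=asyncio.FIRST_EXCEPTION
    )
    error = failing.exception()          # retrieve the stored exception
    for task in pending:                 # clean up whatever is still running
        task.cancel()
    await asyncio.gather(*pending, return_exceptions=True)
    return done == {failing}, pending == {slow_task}, type(error).__name__


if __name__ == "__main__":
    print(asyncio.run(main()))
```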

In general, asyncio.gather is sufficient.

asyncio.create_task and loop.create_task and asyncio.ensure_future

All three can create a Task. Starting with Python 3.7, the high-level asyncio.create_task is recommended. Under the hood, asyncio.create_task calls loop.create_task on the running event loop. The argument accepted by loop.create_task must be a coroutine, whereas asyncio.ensure_future accepts not only a coroutine but also a Future object or any other awaitable object:

  • If the argument is a coroutine, loop.create_task is used under the hood and a Task object is returned.
  • If it is a Future object, it is returned directly.
  • If it is any other awaitable object, its __await__ method is awaited, ensure_future is applied once more, and a Task or Future is finally returned.

So the main purpose of ensure_future is to make sure you end up with a Future object. In general, using asyncio.create_task directly is fine.
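
The first two cases can be checked with a short sketch (coro is a made-up coroutine; the values 1 and 2 are arbitrary):

```python
import asyncio


async def coro():
    return 1


async def main():
    loop = asyncio.get_running_loop()
    # A coroutine is wrapped in a new Task
    task = asyncio.ensure_future(coro())
    # A Future is handed back unchanged
    future = loop.create_future()
    same = asyncio.ensure_future(future)
    future.set_result(2)
    return isinstance(task, asyncio.Task), same is future, await task, await same


if __name__ == "__main__":
    print(asyncio.run(main()))
```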

Register callbacks and execute synchronous code

You can use add_done_callback to register a callback that runs when the task is done:


import asyncio
import functools


def callback(future):
    print(f'Result: {future.result()}')


def callback2(future, n):
    print(f'Result: {future.result()}, N: {n}')


async def funa():
    await asyncio.sleep(1)
    return "funa"


async def main():
    task = asyncio.create_task(funa())
    task.add_done_callback(callback)
    await task
    # functools.partial lets you pass extra arguments to the callback
    task = asyncio.create_task(funa())
    task.add_done_callback(functools.partial(callback2, n=1))
    await task


if __name__ == '__main__':
    asyncio.run(main())
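
Done callbacks also fire when a task fails or is cancelled, so calling result() unconditionally inside one can raise. A hedged sketch of a more defensive callback follows; report, good, and bad are hypothetical names used only here:

```python
import asyncio
import functools


def report(task, log):
    # Check how the task ended before touching its result
    if task.cancelled():
        log.append("cancelled")
    elif task.exception() is not None:
        log.append(f"error: {task.exception()}")
    else:
        log.append(f"ok: {task.result()}")


async def good():
    return "fine"


async def bad():
    raise RuntimeError("boom")


async def main():
    log = []
    for factory in (good, bad):
        task = asyncio.create_task(factory())
        task.add_done_callback(functools.partial(report, log=log))
        # return_exceptions=True keeps the failure from propagating here
        await asyncio.gather(task, return_exceptions=True)
    return log


if __name__ == "__main__":
    print(asyncio.run(main()))
```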

Execute Synchronous Code

If you have synchronous logic and want to run it concurrently with asyncio, what do you need to do? Look below:


import asyncio
import time


def a1():
    # Blocking, synchronous function
    time.sleep(1)
    return "A"


async def b1():
    await asyncio.sleep(1)
    return "B"


async def main():
    loop = asyncio.get_running_loop()
    # a1 runs in the default executor while b1 runs on the event loop
    await asyncio.gather(loop.run_in_executor(None, a1), b1())


if __name__ == '__main__':
    start = time.perf_counter()
    asyncio.run(main())
    print(f'main method Cost: {time.perf_counter() - start}')
#  Output:  main method Cost: 1.0050589740000002

You can use run_in_executor to run synchronous function logic as an awaitable. Its first argument is a concurrent.futures.Executor instance; passing None selects the default executor.
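
Passing an explicit executor instead of None could look like the following sketch. The blocking function, the pool size of 3, and the 0.1 second sleep are assumptions chosen for illustration:

```python
import asyncio
import time
from concurrent.futures import ThreadPoolExecutor


def blocking(n):
    # Stand-in for any blocking call (file I/O, a synchronous client, ...)
    time.sleep(0.1)
    return n * 2


async def main():
    loop = asyncio.get_running_loop()
    with ThreadPoolExecutor(max_workers=3) as pool:
        # Each call runs in a pool thread and is exposed as an awaitable Future
        futures = [loop.run_in_executor(pool, blocking, n) for n in range(3)]
        return await asyncio.gather(*futures)


if __name__ == "__main__":
    print(asyncio.run(main()))
```

An explicit pool gives control over the number of worker threads, while None simply reuses the loop's default executor.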
