
Asyncio concept and usage

2022-07-07 16:15:00 Tan sheep

Original article: asyncio concept and usage

Note: this article targets Python 3.4 and later, because asyncio was introduced in 3.4, and versions 3.5, 3.6 and 3.7 remain compatible with it, with only slight changes in syntax. For example, version 3.4 used the @asyncio.coroutine decorator and the yield from statement, while 3.5 and later replace them with the async and await keywords; the syntax differs slightly, but the principle is the same. This article explains some of the core concepts behind Python's asyncio, briefly analyzes asyncio's design, and, through examples, provides general templates for asyncio-based asynchronous programming in Python.

I. Some of the most important concepts

Most other concurrency models are written in a linear style and rely on the language runtime or the operating system's underlying threads and processes to switch context at the appropriate moments, whereas an asyncio-based application requires its own code to handle context switches explicitly.

asyncio provides a framework centered on an event loop: the program starts a loop and registers functions on it, and when the corresponding event occurs, the matching coroutine function is called.

1. Coroutines (coroutine): essentially functions

A "coroutine" is just a function, one that (in the pre-3.5 style) has two basic ingredients:

  • First, it must be decorated with @asyncio.coroutine;
  • Second, it must use yield from to return a generator, or use yield from to delegate to another coroutine object.

Of course, these two conditions are not mandatory; without them it is still a function, just an ordinary one.

How do you check whether something is a coroutine? Use asyncio.iscoroutine(obj) and asyncio.iscoroutinefunction(func); if they return True, it is.

import asyncio

async def nap():          # renamed from time() to avoid shadowing the time module
    await asyncio.sleep(3)

t = nap()
print(asyncio.iscoroutine(t))            # True
print(asyncio.iscoroutinefunction(nap))  # True
t.close()  # close the never-awaited coroutine to silence the RuntimeWarning

Both checks print True.

2. The event loop (event_loop)

The event loop is an effective way to handle concurrency. Wikipedia describes it as "a programming construct that waits for and dispatches events or messages in a program". We register events on the loop instead of writing our own polling code; put simply: "when A happens, do B". The loop does the polling for us, so the programmer does not have to manage adding and removing tasks or dispatching events by hand; the loop learns what has happened through callbacks. It is the "central processing unit" that asyncio provides, and it supports the following operations:

  • registering, executing and cancelling delayed calls (timeouts);
  • creating the Transports used for client and server communication of various kinds;
  • launching subprocesses and the Transports used to communicate with external programs;
  • delegating time-consuming function calls to a thread pool;
  • in addition, its single-threaded (single-process) architecture avoids the lock problems that arise when multithreaded (multiprocess) code mutates shared state.

An application that interacts with the event loop registers the code to be run and lets the loop make the necessary calls into application code when resources are available. For example, when no more data can be read from a socket, the server hands control back to the event loop.

3. Awaitable objects: objects that can be suspended and waited on

There are three types of awaitable objects: coroutines, Tasks and Futures.

  • coroutine: essentially a function, built on the earlier generator-based yield and yield from;
  • Task: as the name suggests, something to be completed; in practice a further wrapper around a coroutine function;
  • Future: a "lower-level" concept representing the eventual result of an asynchronous operation. Asynchronous operations are generally time-consuming, so the result is not available immediately but only at some point in the "future"; hence the name.

As for how the three relate: a coroutine can be automatically wrapped into a Task, and Task is a subclass of Future.
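A tiny sketch of this relationship (the work() coroutine and variable names are my own, not from the article):

```python
import asyncio

async def work():
    await asyncio.sleep(0)
    return 42

async def main():
    # ensure_future wraps the bare coroutine into a Task...
    task = asyncio.ensure_future(work())
    # ...and every Task is also a Future.
    flags = (isinstance(task, asyncio.Task), isinstance(task, asyncio.Future))
    result = await task
    return flags, result

flags, result = asyncio.run(main())
print(flags, result)
```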

4. What is a Task?

A Task is a subclass of Future that knows how to wrap and manage the execution of a coroutine. When the resources a task needs become available, the event loop schedules the task to run and produce a result, which other coroutines can then consume.

As mentioned earlier, Tasks are used for concurrent scheduling; they are a further wrapper around coroutine functions. Why is this wrapper needed? Because a bare coroutine function is just a function; wrapped as a task, it can carry all kinds of state, and tracking the state of asynchronous operations is the most important part of asynchronous programming.
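To illustrate the point about state, here is a minimal sketch (the work() coroutine is invented) showing the state a Task carries that a bare coroutine lacks:

```python
import asyncio

async def work():
    await asyncio.sleep(0.01)
    return "ok"

async def main():
    task = asyncio.ensure_future(work())
    before = task.done()   # False: scheduled, but not yet finished
    await task
    after = task.done()    # True: finished, result stored on the task
    return before, after, task.result()

before, after, result = asyncio.run(main())
print(before, after, result)
```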

5. What is a Future?

A future is a data structure representing the result of work that has not yet finished. The event loop can watch a Future object for completion, allowing one part of an application to wait for another part to finish some work.

A Future is a lower-level awaitable object representing the eventual result of an asynchronous operation. When a Future object is awaited, the coroutine waits until the Future's computation completes.

Future is the parent class of Task; in general, you do not need to worry about the detailed differences between them.

A good example of a low-level function that returns a Future object is loop.run_in_executor().
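For illustration, a small sketch of loop.run_in_executor() (the blocking_io name is made up) that awaits the Future it returns:

```python
import asyncio
import time

def blocking_io():
    # An ordinary blocking function, offloaded to the default thread pool.
    time.sleep(0.1)
    return "from thread"

async def main():
    loop = asyncio.get_running_loop()
    fut = loop.run_in_executor(None, blocking_io)  # returns a Future
    return await fut

result = asyncio.run(main())
print(result)
```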

II. The basic architecture of asyncio

asyncio is divided into a high-level API and a low-level API, and we can use both. The Coroutines and Tasks discussed earlier belong to the high-level API, while Event Loop and Future belong to the low-level API. Of course asyncio covers far more than this; the so-called high-level API mainly refers to the asyncio.xxx() module-level functions.

Below is an overview of the high-level and low-level APIs.

A. Common high-level API methods

1. Running a coroutine: asyncio.run

asyncio.run(coro, *, debug=False)  # run a coroutine; new in Python 3.7

A coroutine function cannot be called and run directly like an ordinary function; it must be added to an event loop, which then runs it. Running a coroutine function by itself produces no result. A simple example:

import time
import asyncio

async def say_after_time(delay, what):
    await asyncio.sleep(delay)
    print(what)

async def main():
    print(f"start time: {time.time()}")
    await say_after_time(1, "hello")
    await say_after_time(2, "world")
    print(f"end time: {time.time()}")

loop = asyncio.get_event_loop()    # create an event loop object
# loop = asyncio.new_event_loop()  # equivalent: create a new event loop
loop.run_until_complete(main())    # run the coroutine through the event loop
loop.close()

If we call a coroutine function directly, like an ordinary function, it only returns a coroutine object, as shown below:

>>> main()
<coroutine object main at 0x000001ED74F89040>

Ways to obtain the event loop object:

The following calls obtain, set, or create an event loop object, loop:

  • loop = asyncio.get_running_loop() returns the event loop running in the current thread, and raises an error if none is running; new in Python 3.7.
  • loop = asyncio.get_event_loop() gets an event loop; if the current thread has none, it creates a new one.
  • asyncio.set_event_loop(loop) sets the given loop as the current thread's event loop.
  • loop = asyncio.new_event_loop() creates a new event loop.

There are two ways to run a coroutine function through an event loop:

  • Method one: create an event loop object with asyncio.get_event_loop() and run the coroutine function through it.
  • Method two: run the coroutine function directly with asyncio.run(function_name). Two caveats: first, run() is new in Python 3.7 and does not exist in earlier versions; second, run() always creates a new event loop and closes it when it finishes, so if the current thread already has a running event loop you cannot use this function, because a single thread cannot have two running event loops; for the same reason run() cannot be invoked again while it is already running. In short, multiple running event loops in one thread are not allowed.

asyncio.run() is new in Python 3.7 and is the recommended way to run tasks going forward, because it is a high-level API. We will contrast it later with loop.run_until_complete(), which is a comparatively low-level API.
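A minimal side-by-side sketch of the two styles (the main() coroutine here is a stand-in):

```python
import asyncio

async def main():
    await asyncio.sleep(0.01)
    return "done"

# Python 3.7+: run() creates a fresh loop, runs main(), then closes the loop.
result_run = asyncio.run(main())

# The pre-3.7 style: manage the loop explicitly.
loop = asyncio.new_event_loop()
try:
    result_legacy = loop.run_until_complete(main())
finally:
    loop.close()

print(result_run, result_legacy)
```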

Note: what exactly is the event loop, and how should we picture it?

Like this: a single thread keeps moving among the coroutines; whenever it meets a yield from or an await it suspends that coroutine and moves on to another, one after another, until every coroutine on the event loop has run to completion. In fact loop is an instance of BaseEventLoop, and we can look at its definition to see which methods it offers.

2. Creating tasks: asyncio.create_task

(1) Creating a task (two methods):

  • Method 1: task = asyncio.create_task(coro())  # new in Python 3.7
  • Method 2: task = asyncio.ensure_future(coro())

You can also use:

loop.create_future()
loop.create_task(coro)

Note:

loop.create_task only accepts a coroutine, whereas asyncio.ensure_future accepts not only a coroutine but also a Future object or any awaitable object:

  1. If the argument is a coroutine, the underlying call is still loop.create_task, and a Task is returned.
  2. If it is a Future object, it is returned directly.
  3. If it is an awaitable object, its __await__ method is awaited, ensure_future is applied to the result, and finally a Task or Future is returned.

So ensure_future does exactly what its name says: it makes sure you end up with a Future object. Task is a subclass of Future, and as mentioned above, developers generally do not need to create Futures themselves.

In fact, asyncio.wait and asyncio.gather both use asyncio.ensure_future internally. For the vast majority of scenarios, which need concurrent execution, asyncio.create_task is enough.
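A small sketch (fetch() and its delays are invented) showing that create_task schedules coroutines concurrently rather than one after another:

```python
import asyncio
import time

async def fetch(delay, value):
    await asyncio.sleep(delay)
    return value

async def main():
    start = time.monotonic()
    # Both tasks are scheduled immediately, so their sleeps overlap.
    t1 = asyncio.create_task(fetch(0.3, "a"))
    t2 = asyncio.create_task(fetch(0.3, "b"))
    results = [await t1, await t2]
    elapsed = time.monotonic() - start
    return results, elapsed

results, elapsed = asyncio.run(main())
print(results, elapsed)
```

Run sequentially the two awaits would take about 0.6 s; with create_task the total stays near 0.3 s.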

(2) Getting tasks:

  • Method 1: task = asyncio.current_task(loop=None)
    Returns the task currently running in the given loop, or None if no task is running; if loop is None, the running loop is used by default.

  • Method 2: asyncio.all_tasks(loop=None)
    Returns the unfinished tasks in a loop. Note that a coroutine passed to ensure_future() does not start immediately; the created task only begins to run once an await statement somewhere hands control back to the event loop.
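A quick sketch of both helpers used inside a running coroutine:

```python
import asyncio

async def main():
    me = asyncio.current_task()    # the Task currently running main()
    pending = asyncio.all_tasks()  # unfinished tasks in the running loop
    return me is not None, me in pending

has_current, listed = asyncio.run(main())
print(has_current, listed)
```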

3. Sleeping: asyncio.sleep

await asyncio.sleep(delay, result=None, *, loop=None)

This function puts the current task (coroutine) to sleep for the given number of seconds while allowing other tasks to run. That is its difference from time.sleep(), which puts the whole current thread to sleep.

If the result argument is provided, it is returned to the caller when the coroutine wakes up.

The loop parameter will be removed in Python 3.10.
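A one-line sketch of the result parameter:

```python
import asyncio

async def main():
    # When result is given, asyncio.sleep returns it after the delay.
    return await asyncio.sleep(0.01, result="wake up")

value = asyncio.run(main())
print(value)
```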

4. Waiting for several coroutines: asyncio.wait

await asyncio.wait(aws, *, loop=None, timeout=None, return_when=ALL_COMPLETED)

Runs the awaitable objects in the iterable aws concurrently and blocks until the return_when condition is satisfied.

The first argument, aws, is a collection, often written as a set, for example:
{func1(), func2(), func3()}
It represents a series of coroutine functions or tasks; bare coroutines are automatically wrapped as tasks. In fact, a list works as well.

Note: the function's return value is a pair of Task/Future sets: (done, pending)

  • done is a set of the tasks that have completed;
  • pending is a set of the tasks that have not completed.

The common usage is: done, pending = await asyncio.wait(aws)

Parameters:

  • timeout (a float or int): the same meaning as elsewhere. Note that it does not raise asyncio.TimeoutError; if some tasks are still unfinished when the timeout expires, those unfinished tasks and futures are returned in the second set, pending.
  • return_when: as the name suggests, this controls when wait returns. It can only take the following values:
    • FIRST_COMPLETED: return as soon as any task or future completes or is cancelled;
    • FIRST_EXCEPTION: return as soon as any task or future raises an exception; if none raises, this is equivalent to ALL_COMPLETED below;
    • ALL_COMPLETED: return when all tasks or futures have completed or been cancelled.

Example 1:

import asyncio
import time

a = time.time()

async def sleep1():  # about 1 second
    print("sleep1 begin")
    await asyncio.sleep(1)
    print("sleep1 end")

async def sleep2():  # about 2 seconds
    print("sleep2 begin")
    await asyncio.sleep(2)
    print("sleep2 end")

async def sleep3():  # about 3 seconds
    print("sleep3 begin")
    await asyncio.sleep(3)
    print("sleep3 end")

async def main():   # entry function
    done, pending = await asyncio.wait({sleep1(), sleep2(), sleep3()},
                                       return_when=asyncio.FIRST_COMPLETED)
    for i in done:
        print(i)
    for j in pending:
        print(j)

asyncio.run(main())  # run the entry function
b = time.time()
print('---------------------------------------')
print(b - a)

The output is:

sleep3 begin
sleep1 begin
sleep2 begin
sleep1 end
<Task finished name='Task-3' coro=<sleep1() done, defined at /data/py_code/asyncio_example/aio_wait.py:6> result=None>
<Task pending name='Task-4' coro=<sleep2() running at /data/py_code/asyncio_example/aio_wait.py:13> wait_for=<Future pending cb=[<TaskWakeupMethWrapper object at 0x7f6133a81c10>()]>>
<Task pending name='Task-2' coro=<sleep3() running at /data/py_code/asyncio_example/aio_wait.py:18> wait_for=<Future pending cb=[<TaskWakeupMethWrapper object at 0x7f6133a81bb0>()]>>
---------------------------------------
1.0069866180419922

As you can see, sleep1() has finished, while sleep2() and sleep3() have not.

Example 2 :

import asyncio

async def num(n):
    try:
        await asyncio.sleep(n * 0.1)
        return n
    except asyncio.CancelledError:
        print(f"number {n} was cancelled")
        raise

async def main():
    tasks = [num(i) for i in range(10)]
    complete, pending = await asyncio.wait(tasks, timeout=0.5)
    for i in complete:
        print("current number", i.result())
    if pending:
        print("cancelling unfinished tasks")
        for p in pending:
            p.cancel()

if __name__ == '__main__':
    loop = asyncio.get_event_loop()
    try:
        loop.run_until_complete(main())
    finally:
        loop.close()

The output is:

current number 2
current number 1
current number 0
current number 3
current number 4
cancelling unfinished tasks
number 5 was cancelled
number 6 was cancelled
number 8 was cancelled
number 9 was cancelled
number 7 was cancelled

Notice that the results are not printed in numerical order: internally, wait() stores the Task instances in a set, and sets are unordered, which is why our results do not come back in order. wait's return value is a tuple of two sets, the completed and the uncompleted tasks; wait's second parameter is a timeout value.

When this timeout is reached, unfinished tasks stay in the pending state. If you do not call the cancel method to cancel them, then when the program exits with tasks still unfinished, you will see errors like the following:

Task was destroyed but it is pending!
task: <Task pending name='Task-6' coro=<num() running at py_code/asyncio_example/aio_wait2.py:5> wait_for=<Future pending cb=[<TaskWakeupMethWrapper object at 0x7f07ec732e20>()]>>
Task was destroyed but it is pending!
task: <Task pending name='Task-9' coro=<num() running at py_code/asyncio_example/aio_wait2.py:5> wait_for=<Future pending cb=[<TaskWakeupMethWrapper object at 0x7f07ec732eb0>()]>>
Task was destroyed but it is pending!
task: <Task pending name='Task-10' coro=<num() running at py_code/asyncio_example/aio_wait2.py:5> wait_for=<Future pending cb=[<TaskWakeupMethWrapper object at 0x7f07ec732ee0>()]>>
Task was destroyed but it is pending!
task: <Task pending name='Task-11' coro=<num() running at py_code/asyncio_example/aio_wait2.py:5> wait_for=<Future pending cb=[<TaskWakeupMethWrapper object at 0x7f07ec732f10>()]>>

At this point you can cancel the tasks by calling their cancel method, which is what this part of the code does:

    if pending:
        print("cancelling unfinished tasks")
        for p in pending:
            p.cancel()

Unlike wait_for(), wait() does not cancel the awaited objects when a timeout occurs.

5. Running multiple tasks concurrently: asyncio.gather

await asyncio.gather(*coros_or_futures, loop=None, return_exceptions=False)

It, too, is awaitable.

*coros_or_futures is an unpacked sequence of awaitables; any coroutine among them is automatically converted to a Task. When all the tasks have finished, the results are returned as a list whose values follow the order in which the awaitables were passed in (not their completion order).

return_exceptions=False, the default: the first exception raised by any task propagates immediately to the code awaiting gather(), while the other tasks continue to run.
return_exceptions=True: tasks that raise exceptions are treated the same as tasks that succeed, and the exception objects are collected into the final result list along with the successful results.

If gather() itself is cancelled, then all the tasks bound in it are cancelled as well.
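A sketch of return_exceptions=True (the ok()/boom() names are invented): the ValueError lands in the result list instead of propagating out of gather():

```python
import asyncio

async def ok(n):
    await asyncio.sleep(0.01)
    return n

async def boom():
    await asyncio.sleep(0.01)
    raise ValueError("boom")

async def main():
    # The exception object takes the failing task's slot in the list.
    return await asyncio.gather(ok(1), boom(), ok(3), return_exceptions=True)

results = asyncio.run(main())
print(results)
```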

Using gather:

gather is similar to wait, with these differences:

  • gather does not return the task objects, so you cannot use its return value to cancel individual unfinished tasks the way you can with wait's pending set;
  • its return value is a plain list of results;
  • the results follow the order of the arguments passed in.

Let's rewrite the earlier wait example using gather:

import asyncio

async def num(n):
    try:
        await asyncio.sleep(n * 0.1)
        return n
    except asyncio.CancelledError:
        print(f"number {n} was cancelled")
        raise

async def main():
    tasks = [num(i) for i in range(10)]
    complete = await asyncio.gather(*tasks)
    for i in complete:
        print("current number", i)

if __name__ == '__main__':
    loop = asyncio.get_event_loop()
    try:
        loop.run_until_complete(main())
    finally:
        loop.close()

Output:

current number 0
current number 1
current number 2
current number 3
current number 4
current number 5
current number 6
current number 7
current number 8
current number 9

gather is often used for staged operations: finish step one, then start step two. Output from one such two-step run (the code itself is not shown here):

phase two complete
elapsed so far 2.003401517868042
phase one complete
elapsed so far 5.003954172134399
5
2
total time used 5.004146337509155

From these results we can conclude:

  • step1 and step2 run concurrently;
  • gather waits for the slowest awaitable before returning its result, so the total elapsed time is determined by the longest-running task.
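The article does not show the code behind that output; a scaled-down sketch (0.2 s and 0.5 s instead of 2 s and 5 s, step names invented) with the same shape:

```python
import asyncio
import time

async def step1():
    await asyncio.sleep(0.5)   # the slower stage
    return 5

async def step2():
    await asyncio.sleep(0.2)   # the faster stage
    return 2

async def main():
    # Both stages run concurrently; gather waits for the slower one.
    return await asyncio.gather(step1(), step2())

start = time.monotonic()
results = asyncio.run(main())
total = time.monotonic() - start
print(results, total)
```

The total elapsed time tracks the 0.5 s stage, not the 0.7 s sum, and the results come back in argument order.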

6. Protecting a task from cancellation: asyncio.shield

await asyncio.shield(aw, *, loop=None)

Also awaitable. As the name suggests, shield protects an awaitable object from being cancelled. It is generally not recommended, and when you do use it, it is best wrapped in a try block:

try:
    res = await asyncio.shield(something())
except asyncio.CancelledError:
    res = None
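A runnable sketch (inner() is a made-up name) in which the outer wait_for times out, but shield keeps the inner task alive:

```python
import asyncio

async def inner():
    await asyncio.sleep(0.2)
    return "finished"

async def main():
    task = asyncio.ensure_future(inner())
    try:
        # The 0.05 s timeout cancels the shield wrapper, not the task itself.
        await asyncio.wait_for(asyncio.shield(task), timeout=0.05)
    except asyncio.TimeoutError:
        pass
    survived = not task.cancelled()
    result = await task   # the inner task still completes normally
    return survived, result

survived, result = asyncio.run(main())
print(survived, result)
```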

7. Setting a timeout: asyncio.wait_for

await asyncio.wait_for(aw, timeout, *, loop=None)

Waits for the awaitable aw to complete, timing out after timeout seconds.

  • If aw is a coroutine, it is automatically scheduled as a task.
  • timeout can be None, or a float or int number of seconds to wait; if it is None, wait_for waits until aw completes.
  • If a timeout occurs, the task is cancelled and asyncio.TimeoutError is raised.
  • To avoid the task being cancelled, wrap it in shield().
  • The function waits until the target object is actually cancelled, so the total wait time may exceed the timeout.
  • If the wait itself is cancelled, the awaitable aw is cancelled too.
  • The loop parameter is deprecated and scheduled for removal in Python 3.10.

See the example below :

import asyncio

async def eternity():
    print('starting right away')
    await asyncio.sleep(3600)  # the task sleeps for 1 hour, i.e. 3600 seconds
    print('finally my turn')

async def main():
    try:
        print('waiting 3 seconds for you')
        await asyncio.wait_for(eternity(), timeout=3)  # allow 3 seconds for the task
    except asyncio.TimeoutError:
        print('timeout!')

asyncio.run(main())

The output is:

waiting 3 seconds for you
starting right away
timeout!

main() is called first as the entry function. After it prints 'waiting 3 seconds for you', main suspends and eternity runs, printing 'starting right away'; then eternity suspends for 3600 seconds, which is greater than 3, so TimeoutError is triggered. Let's modify it:

import asyncio

async def eternity():
    print('starting right away')
    await asyncio.sleep(2)  # the task sleeps 2 seconds, 2 < 3
    print('finally my turn')

async def main():
    try:
        print('waiting 3 seconds for you')
        await asyncio.wait_for(eternity(), timeout=3)  # allow 3 seconds for the task
    except asyncio.TimeoutError:
        print('timeout!')

asyncio.run(main())

The output is:

waiting 3 seconds for you
starting right away
finally my turn

Summary: when an asynchronous operation takes longer than the timeout set with wait_for, an exception is triggered. So when you write a program and want to set a timeout on an asynchronous operation, choose it appropriately: if the operation itself takes a long time and your timeout is too short, the operation will not get to finish and the exception will be thrown.

8. The asyncio.as_completed() function

as_completed is a generator: it manages a given list of tasks and yields their results, producing one result as each coroutine finishes. Like wait, as_completed guarantees no particular order; but unlike wait, you do not have to wait for everything to finish before acting on each completed result.

asyncio.as_completed(aws, *, loop=None, timeout=None)

The first parameter, aws: as above, a collection {} whose elements are coroutines, tasks or futures.
The timeout parameter: the same meaning as discussed above.

import asyncio
import time

a = time.time()

async def sleep5():
    print("sleep5 begin")
    await asyncio.sleep(5)  # about 5 seconds
    print("sleep5 end")
    return 'haha 5'

async def sleep3():
    print("sleep3 begin")
    await asyncio.sleep(3)  # about 3 seconds
    print("sleep3 end")
    return 'haha 3'

async def sleep4():
    print("sleep4 begin")
    await asyncio.sleep(4)  # about 4 seconds
    print("sleep4 end")
    return 'haha 4'

async def main():
    s = asyncio.as_completed({sleep5(), sleep3(), sleep4()})
    for f in s:
        result = await f
        print(result)

asyncio.run(main())
b = time.time()
print('---------------------------------------')
print(b - a)

The output is:

sleep4 begin
sleep5 begin
sleep3 begin
sleep3 end
haha 3
sleep4 end
haha 4
sleep5 end
haha 5
---------------------------------------
5.003020286560059

Conclusion: asyncio.as_completed() returns an iterable whose elements are each a future-like awaitable. Some readers ask: isn't that the same as what went in? In fact the returned futures are the input futures regrouped by completion order: whichever awaitable (coroutine, task or future) finishes first is yielded first. In the code above the argument is aws = {sleep5(), sleep3(), sleep4()}, where sleep5 takes about 5 seconds, sleep3 about 3 and sleep4 about 4. The results come back in the order sleep3(), sleep4(), sleep5(): sleep3() takes the least time, so it comes first, while sleep5 takes the longest, so it comes last. We then iterate over the yielded awaitables.

Example 2 :

import asyncio
import time
 
async def foo(n):
    print('Waiting: ', n)
    await asyncio.sleep(n)
    return n
 
async def main():
    coroutine1 = foo(1)
    coroutine2 = foo(2)
    coroutine3 = foo(4)
 
    tasks = [
        asyncio.ensure_future(coroutine1),
        asyncio.ensure_future(coroutine2),
        asyncio.ensure_future(coroutine3)
    ]
    for task in asyncio.as_completed(tasks):
        result = await task
        print('Task ret: {}'.format(result))
 
now = lambda : time.time()
start = now()
 
loop = asyncio.get_event_loop()
done = loop.run_until_complete(main())
print(now() - start)

Output

Waiting:  1
Waiting:  2
Waiting:  4
Task ret: 1
Task ret: 2
Task ret: 4
4.004435062408447

Notice that the results are produced one by one, in completion order.

9. Calling ordinary functions: call_soon, call_later, call_at

From within the event loop you can schedule calls to ordinary (synchronous) functions. The relevant methods are call_soon, call_later and call_at.

call_soon

Literally: call the callback as soon as possible.

loop.call_soon(callback, *args, context=None)

Calls the callback on the next iteration of the event loop. Callbacks support positional arguments but not keyword arguments; if you want keyword arguments, wrap the callback with functools.partial(). The optional context keyword lets you specify a custom contextvars.Context for the callback to run in; when none is provided, the current context is used. Python 3.7 added context support to asyncio; in some scenarios contexts let you pass variables implicitly, for example a database connection session, instead of threading them explicitly through every method call.

Let's look at a concrete example.

import asyncio
import functools

def callback(args, *, kwargs="default"):
    print(f"ordinary function as callback, got arguments: {args}, {kwargs}")

async def main(loop):
    print("registering callbacks")
    loop.call_soon(callback, 1)
    wrapped = functools.partial(callback, kwargs="not default")
    loop.call_soon(wrapped, 2)
    await asyncio.sleep(0.2)

if __name__ == '__main__':
    loop = asyncio.get_event_loop()
    try:
        loop.run_until_complete(main(loop))
    finally:
        loop.close()

Output:

registering callbacks
ordinary function as callback, got arguments: 1, default
ordinary function as callback, got arguments: 2, not default

The output shows that we successfully called an ordinary function from within a coroutine, printing 1 and 2 in order.

Sometimes we do not want to call a function immediately; in that case call_later lets us delay the call.

call_later

loop.call_later(delay, callback, *args, context=None)

First, what it means: the event loop calls the callback after delay seconds. A small example combining it with call_soon above:

import asyncio

def callback(n):
    print(f"callback {n} invoked")

async def main(loop):
    print("registering callbacks")
    loop.call_later(0.2, callback, 1)
    loop.call_later(0.1, callback, 2)
    loop.call_soon(callback, 3)
    await asyncio.sleep(0.4)

if __name__ == '__main__':
    loop = asyncio.get_event_loop()
    try:
        loop.run_until_complete(main(loop))
    finally:
        loop.close()

Output:

registering callbacks
callback 3 invoked
callback 2 invoked
callback 1 invoked

From this output we can conclude:

  1. call_soon runs before call_later, no matter where it appears in the code;
  2. among call_later calls, the smaller the first argument, the earlier the callback runs.

call_at

loop.call_at(when, callback, *args, context=None)

call_at's first parameter is a point on a monotonic clock, which differs slightly from the system time we usually talk about: it is the event loop's internal time, obtainable via loop.time(), and you schedule relative to that. The remaining parameters are the same as in the two methods above. In fact, call_later calls call_at internally.


import asyncio

def call_back(n, loop):
    print(f"callback {n} at loop time {loop.time()}")

async def main(loop):
    now = loop.time()
    print("current internal time", now)
    print("loop time", now)
    print("registering callbacks")
    loop.call_at(now + 0.1, call_back, 1, loop)
    loop.call_at(now + 0.2, call_back, 2, loop)
    loop.call_soon(call_back, 3, loop)
    await asyncio.sleep(1)

if __name__ == '__main__':
    loop = asyncio.get_event_loop()
    try:
        print("entering the event loop")
        loop.run_until_complete(main(loop))
    finally:
        print("closing the loop")
        loop.close()

Output:

entering the event loop
current internal time 21494.175900164
loop time 21494.175900164
registering callbacks
callback 3 at loop time 21494.1760417
callback 1 at loop time 21494.276630947
callback 2 at loop time 21494.377943985
closing the loop

This is because call_later is implemented internally through call_at.

10. Working with queues: asyncio.Queue

import asyncio
import random

async def produce(queue, n):
    for x in range(n):
        print('producing {}/{}'.format(x, n))
        await asyncio.sleep(random.random())
        item = str(x)
        await queue.put(item)

async def consume(queue):
    while True:
        item = await queue.get()
        print('consuming {}...'.format(item))
        await asyncio.sleep(random.random())
        # notify the queue that the item has been processed
        queue.task_done()

async def main(n):
    queue = asyncio.Queue(maxsize=4)
    # consume does not actually start running here
    consumer = asyncio.ensure_future(consume(queue))
    # once produce starts putting items, consume begins to run
    await produce(queue, n)
    # wait until the consumer has processed all the items
    await queue.join()
    # the consumer is still waiting for items, so cancel it
    consumer.cancel()

if __name__ == '__main__':
    loop = asyncio.get_event_loop()
    loop.run_until_complete(main(10))

The program puts 10 items into the Queue via the produce coroutine and drains them with the consume coroutine.
Here it is worth emphasizing the Queue.put_nowait method, whose official description reads:

Put an item into the queue without blocking.
If no free slot is immediately available, raise QueueFull.

In other words, it puts an item into the queue without blocking; if no free slot is available (i.e. no storage space left), it raises QueueFull. Modify the program above:

import asyncio
import random
 
async def produce(queue,n):
    for x in range(n):
        print('producing {}/{}'.format(x,n))
        await asyncio.sleep(random.random())
        item = str(x)
        # Replace  await queue.put(item)
        queue.put_nowait(item)
        # Print current queue Inside item Storage quantity 
        print('qsize:',queue.qsize())
    
async def main(n):
    queue = asyncio.Queue(maxsize=3)
    await produce(queue, n)
    
if __name__ == '__main__':
    loop = asyncio.get_event_loop()
    loop.run_until_complete(main(7))
    loop.close()

Replacing await queue.put(item) with put_nowait, setting the Queue capacity to 3, and producing 7 items gives the following result:

producing 0/7
qsize: 1
producing 1/7
qsize: 2
producing 2/7
qsize: 3
producing 3/7
Traceback (most recent call last):
  File "/data/code/py_code/asyncio_example/aio_queue1.py", line 20, in <module>
    loop.run_until_complete(main(7))
  File "/data/python3/lib/python3.8/asyncio/base_events.py", line 616, in run_until_complete
    return future.result()
  File "/data/code/py_code/asyncio_example/aio_queue1.py", line 16, in main
    await produce(queue, n)
  File "/data/code/py_code/asyncio_example/aio_queue1.py", line 10, in produce
    queue.put_nowait(item)
  File "/data/python3/lib/python3.8/asyncio/queues.py", line 148, in put_nowait
    raise QueueFull
asyncio.queues.QueueFull

As expected, when producing the 4th item the queue (capacity 3) is already full, so QueueFull is raised.

Note that the program above only runs produce and never starts consume. If consume ran concurrently, it would keep draining finished items: like an outlet pipe emptying a pool while the inlet pipe fills it, the queue might never become full (and so never raise QueueFull).
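For completeness, Queue also has the non-blocking counterpart get_nowait, which returns an item immediately or raises QueueEmpty when the queue has nothing to give — a small sketch (my own example):

```python
import asyncio

async def drain(queue):
    # pull items without blocking until the queue runs dry
    items = []
    try:
        while True:
            items.append(queue.get_nowait())
    except asyncio.QueueEmpty:
        pass
    return items

async def main():
    queue = asyncio.Queue(maxsize=2)
    queue.put_nowait('a')
    queue.put_nowait('b')
    return await drain(queue)

items = asyncio.run(main())
print(items)  # FIFO order is preserved
```
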

11. Coroutine lock asyncio.Lock

When several coroutines call the same asynchronous function, they may end up operating on the same object at the same time.

For example, if multiple coroutines call the same request-issuing async function with the same url, the url would be requested repeatedly — something not allowed in real production.

asyncio's built-in Lock can guarantee the url is not fetched repeatedly. Note that this Lock is implemented at the application level; unlike a thread lock, it does not reach down into the operating system. A sketch of the idea (the aiohttp calls below are illustrative; aiohttp's real API goes through a ClientSession):

import aiohttp
import asyncio

URL = 'http://example.com'  # placeholder
cache = {}
lock = asyncio.Lock()

async def get_stuff(url):
    async with lock:  # equivalent to: await lock.acquire() ... lock.release()
        if url in cache:
            return cache[url]
        async with aiohttp.ClientSession() as session:
            async with session.get(url) as resp:
                stuff = await resp.text()
        cache[url] = stuff
        return stuff

async def parse_stuff():
    stuff = await get_stuff(URL)
    # do some parsing

async def use_stuff():
    stuff = await get_stuff(URL)
    # use stuff to do something interesting

tasks = [parse_stuff(), use_stuff()]
loop = asyncio.get_event_loop()
loop.run_until_complete(asyncio.wait(tasks))
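Because the snippet above needs a network and the aiohttp package, here is a self-contained, runnable sketch of the same caching-under-a-lock pattern; the hypothetical fetch coroutine stands in for the real request:

```python
import asyncio

cache = {}
call_count = 0

async def fetch(url):
    # hypothetical stand-in for a real HTTP request
    global call_count
    call_count += 1
    await asyncio.sleep(0.05)
    return f"body of {url}"

async def get_stuff(url, lock):
    async with lock:  # serializes access: the second caller waits, then hits the cache
        if url not in cache:
            cache[url] = await fetch(url)
        return cache[url]

async def main():
    lock = asyncio.Lock()  # create the lock inside the running loop
    return await asyncio.gather(get_stuff("http://example.com", lock),
                                get_stuff("http://example.com", lock))

results = asyncio.run(main())
print(results, call_count)  # the "request" ran only once
```
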

12. Asynchronous thread pool or process pool loop.run_in_executor

Thread pool:

The code so far has been fully asynchronous — blocking calls such as time.sleep were replaced with asyncio.sleep. But what if some logic is unavoidably synchronous and blocking, and we still want concurrency under asyncio? The answer is run_in_executor.
The first argument of loop.run_in_executor(None, func) is a concurrent.futures.Executor instance; passing None selects the default executor:

import asyncio
import random
import time
from concurrent.futures import ThreadPoolExecutor
 
def random_sleep(num):
    print('sleep start:', num, 's')
    time.sleep(num)
    print('sleep end:', num, 's')
    
async def main():
    executor = ThreadPoolExecutor(5)
    tasks = []
    for _ in range(5):
        sleep_time = random.randint(1,5)
        task = loop.run_in_executor(executor, random_sleep, sleep_time)
        tasks.append(task)
    await asyncio.wait(tasks)
    
if __name__ == '__main__':
    start_time = time.time()
    loop = asyncio.get_event_loop()
    
    loop.run_until_complete(main())
    print('end time: {}'.format(time.time()-start_time))

The output is as follows:

sleep start: 1 s
sleep start: 4 s
sleep start: 2 s
sleep start: 5 s
sleep start: 2 s
sleep end: 1 s
sleep end: 2 s
sleep end: 2 s
sleep end: 4 s
sleep end: 5 s
end time: 5.010895490646362
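run_in_executor returns an awaitable future, so the blocking function's return value also flows back into the coroutine — a small sketch (my own example):

```python
import asyncio
import time
from concurrent.futures import ThreadPoolExecutor

def blocking_square(n):
    time.sleep(0.1)  # stands in for a blocking call
    return n * n

async def main():
    loop = asyncio.get_running_loop()
    with ThreadPoolExecutor(3) as pool:
        # each call runs in a worker thread; awaiting gathers the return values
        futures = [loop.run_in_executor(pool, blocking_square, n) for n in range(3)]
        return await asyncio.gather(*futures)

squares = asyncio.run(main())
print(squares)
```
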

Process pool:

import asyncio
import random
import time
from concurrent.futures import ProcessPoolExecutor
 
def random_sleep(num):
    print('sleep start:', num, 's')
    time.sleep(num)
    print('sleep end:', num, 's')
    
async def main():
    executor = ProcessPoolExecutor(5)
    tasks = []
    for _ in range(5):
        sleep_time = random.randint(1,5)
        task = loop.run_in_executor(executor, random_sleep, sleep_time)
        tasks.append(task)
    await asyncio.wait(tasks)
    
if __name__ == '__main__':
    start_time = time.time()
    loop = asyncio.get_event_loop()
    
    loop.run_until_complete(main())
    print('end time: {}'.format(time.time()-start_time))

13. Running a coroutine on another thread: run_coroutine_threadsafe (thread-safe)

The sample code is as follows :

import time
import asyncio
from threading import Thread
from functools import partial
 
async def a():
    time.sleep(1)
    return 'A'
 
def start_loop(loop):
    asyncio.set_event_loop(loop)
    loop.run_forever()
 
def shutdown(loop):
    loop.stop()
    
if __name__ == '__main__':
    new_loop = asyncio.new_event_loop()
    t = Thread(target=start_loop, args=(new_loop,))
    t.start()
    future = asyncio.run_coroutine_threadsafe(a(), new_loop)
    print(future)
    print(f'Result: {future.result(timeout=2)}')
    new_loop.call_soon_threadsafe(partial(shutdown, new_loop))

The output is as follows:

<Future at 0x7f6da03bcd60 state=pending>
Result: A

There are several details worth noting:

  1. The coroutine must be submitted from a thread other than the one running the event loop, so we create a fresh loop with asyncio.new_event_loop().
  2. The new event loop must already be running before the coroutine executes, so we start it in a separate thread via start_loop.
  3. asyncio.run_coroutine_threadsafe can then execute coroutine a; it returns a Future object.
  4. The output shows the future starts out pending, because coroutine a sleeps for 1 second before returning its result.
  5. future.result(timeout=2) retrieves the result; the timeout must exceed the coroutine's running time, otherwise a TimeoutError is raised.
  6. The newly created event loop runs in its own thread, and loop.run_forever would keep the program from exiting, so at the end we use call_soon_threadsafe to schedule the shutdown callback, which stops the loop.

A word on call_soon_threadsafe: as the name suggests, it is a thread-safe version of call_soon — it schedules a callback from another thread. Incidentally, asyncio.run_coroutine_threadsafe uses it under the hood.

Two. The Task class explained

Let's first look at the Task class (from the official English documentation):

class asyncio.Task(coro, *, loop=None)
A Future-like object that runs a Python coroutine. Not thread-safe.
Tasks are used to run coroutines in event loops. If a coroutine awaits on a Future, the Task suspends the execution of the coroutine and waits for the completion of the Future. When the Future is done, the execution of the wrapped coroutine resumes.
Event loops use cooperative scheduling: an event loop runs one Task at a time. While a Task awaits for the completion of a Future, the event loop runs other Tasks, callbacks, or performs IO operations.
Use the high-level asyncio.create_task() function to create Tasks, or the low-level loop.create_task() or ensure_future() functions. Manual instantiation of Tasks is discouraged.
To cancel a running Task use the cancel() method. Calling it will cause the Task to throw a CancelledError exception into the wrapped coroutine. If a coroutine is awaiting on a Future object during cancellation, the Future object will be cancelled.
cancelled() can be used to check if the Task was cancelled. The method returns True if the wrapped coroutine did not suppress the CancelledError exception and was actually cancelled.
asyncio.Task inherits from Future all of its APIs except Future.set_result() and Future.set_exception().
Tasks support the contextvars module. When a Task is created it copies the current context and later runs its coroutine in the copied context.

The description above contains several important points, summarized here:

  1. A Task is a Future-like object that runs a Python coroutine, and it is not thread-safe; it inherits all of Future's APIs except Future.set_result() and Future.set_exception().
  2. Create tasks with the high-level asyncio.create_task(), or with the low-level loop.create_task() or asyncio.ensure_future(); manual instantiation of Task is discouraged.
  3. Unlike a bare coroutine function, a Task is stateful: it can be cancelled with Task.cancel(), which raises CancelledError inside the wrapped coroutine, and cancelled() checks whether it was actually cancelled.

Below are some commonly used methods of the Task class.

cancel()

Request the Task to be cancelled.

As introduced earlier, calling cancel() raises CancelledError inside the wrapped coroutine, so the code to be cancelled is best wrapped in a try-except block — that makes it easy to catch the exception and print diagnostics. Note that Task.cancel() cannot guarantee the task is actually cancelled (the coroutine may suppress the exception), whereas Future.cancel() does guarantee cancellation. Consider the following example:

import asyncio
 
async def cancel_me():
   print('cancel_me(): before sleep')
   try:
       await asyncio.sleep(3600) # Simulate a time-consuming task 
   except asyncio.CancelledError:
       print('cancel_me(): cancel sleep')
       raise
   finally:
        print('cancel_me(): after sleep')
 
async def main():
   # Creating a task from the coroutine schedules it to run on the
   # event loop (Python 3.7+), so cancel_me starts executing here
   task = asyncio.create_task(cancel_me())
   # wait one second
   await asyncio.sleep(1)
   print('main(): done waiting')
   # request cancellation of the task
   task.cancel()
   try:
       await task  # the cancelled task raises CancelledError here
   except asyncio.CancelledError:
       print("main(): cancel_me is cancelled now")

asyncio.run(main())

The output is as follows:

cancel_me(): before sleep
main(): done waiting
cancel_me(): cancel sleep
cancel_me(): after sleep
main(): cancel_me is cancelled now

Walking through the execution:

  1. run starts the entry coroutine main. Its first statement creates a task from cancel_me(), which starts running and prints the first line.
  2. Inside cancel_me's try block, the await suspends it and control returns to main, which then suspends at its own await. main sleeps only 1 second while cancel_me sleeps 3600, so main resumes first and prints the second line.
  3. main then requests cancellation with task.cancel() and awaits the task. The pending 3600-second sleep is interrupted by CancelledError: cancel_me prints the third line and re-raises the exception.
  4. cancel_me's finally clause prints the fourth line; the coroutine finishes by propagating the exception back to main, whose except block prints the fifth line.

done()

Returns True when the wrapped coroutine has finished — that is, it returned a value, raised an exception, or the task was cancelled.

result()

Returns the task's execution result.
If the task finished normally, the result of the wrapped coroutine is returned.
If the task was cancelled, calling this method raises CancelledError.
If the task's result is not yet available, calling this method raises InvalidStateError.
If the task was terminated by an exception, calling this method re-raises that exception.

exception()

Returns the exception raised by the task, whatever it was; if the task finished normally without raising, returns None.
If the task was cancelled, calling this method raises CancelledError.
If the task is not yet done, calling this method raises InvalidStateError.

Some less commonly used methods are listed below:

add_done_callback(callback, *, context=None)

remove_done_callback(callback)

get_stack(*, limit=None)

print_stack(*, limit=None, file=None)

all_tasks(loop=None) — a class method

current_task(loop=None) — a class method
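The state rules for result() and exception() above can be checked directly with a short sketch (my own example, using a coroutine that raises ValueError):

```python
import asyncio

events = []

async def boom():
    raise ValueError("boom")

async def main():
    task = asyncio.ensure_future(boom())
    try:
        task.result()                 # the task has not even started yet
    except asyncio.InvalidStateError:
        events.append("InvalidStateError before completion")
    try:
        await task
    except ValueError:
        pass
    # exception() returns the stored exception without raising it
    events.append(type(task.exception()).__name__)
    try:
        task.result()                 # re-raises the stored exception
    except ValueError:
        events.append("result() re-raised ValueError")

asyncio.run(main())
print(events)
```
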

Three. Getting the result of an asynchronous function

For asynchronous programming, the most important thing after calling an async function is getting hold of its return value. There are two ways: the first is to read it directly via Task.result(); the second is to bind a callback function — a function invoked once the task finishes — and read the return value there.

1. Reading it directly via result()

import asyncio
import time
 
async def hello1(a,b):
   print("Hello world 01 begin")
   await asyncio.sleep(3)  # simulate a 3-second task
   print("Hello again 01 end")
   return a+b
 
coroutine=hello1(10,5)
loop = asyncio.get_event_loop()                # step 1: create an event loop
task = asyncio.ensure_future(coroutine)        # step 2: wrap the coroutine into a task
loop.run_until_complete(task)                  # step 3: run it on the event loop; the argument is a future-like object
print('-------------------------------------')
print(task.result())

The output is as follows:

Hello world 01 begin
Hello again 01 end
-------------------------------------
15

2. Reading it via a callback function

import asyncio
import time
 
async def hello1(a,b):
   print("Hello world 01 begin")
   await asyncio.sleep(3)  # simulate a 3-second task
   print("Hello again 01 end")
   return a+b
 
def callback(future):   # the callback function
   print(future.result())
 
loop = asyncio.get_event_loop()                # step 1: create an event loop
task = asyncio.ensure_future(hello1(10, 5))    # step 2: wrap the coroutine into a task
task.add_done_callback(callback)               # bind a callback to the task
loop.run_until_complete(task)                  # step 3: run it on the event loop
loop.close()                                   # step 4: close the event loop

The output is as follows:

Hello world 01 begin
Hello again 01 end
15

Note: the so-called callback function is simply a function invoked when the coroutine finishes executing; through its future parameter it retrieves the coroutine's result. The task we created and the future object received by the callback are actually the same object, because Task is a subclass of Future.
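If the callback needs extra arguments besides the future, functools.partial can bind them — a small sketch (my own example; asyncio always passes the finished task as the last argument):

```python
import asyncio
from functools import partial

received = []

async def add(a, b):
    await asyncio.sleep(0.1)
    return a + b

def callback(tag, future):
    # partial binds tag; asyncio passes the finished task as the final argument
    received.append((tag, future.result()))

async def main():
    task = asyncio.ensure_future(add(10, 5))
    task.add_done_callback(partial(callback, "sum"))
    await task

asyncio.run(main())
print(received)
```
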

Four. The basic patterns of asyncio asynchronous programming

Defining a coroutine

A coroutine is defined with the async def statement.

async def do_some_work(x): 
    pass

do_some_work is a coroutine. More precisely, do_some_work is a coroutine function, which asyncio.iscoroutinefunction can verify:

import asyncio

async def do_some_work(x): 
    pass

print(asyncio.iscoroutinefunction(do_some_work))

The output is as follows:

True

The coroutine does nothing yet; let it sleep for a few seconds to simulate actual work:

async def do_some_work(x): 
    print("waiting " + str(x))
    await asyncio.sleep(x)

Before explaining await, it helps to spell out what a coroutine can do. A coroutine can:

  • wait for a future to finish
  • wait for another coroutine (to produce a result, or raise an exception)
  • produce a result for the coroutine waiting on it
  • raise an exception to the coroutine waiting on it

asyncio.sleep is itself a coroutine, so await asyncio.sleep(x) is simply waiting on another coroutine, as its definition shows:

async def sleep(delay, result=None, *, loop=None):
    """Coroutine that completes after a given time (in seconds)."""

Running a coroutine

Calling a coroutine function does not start it; it only returns a coroutine object, which asyncio.iscoroutine can verify:

print(asyncio.iscoroutine(do_some_work(3)))

The output is as follows:

True

It also emits a warning:

/data/code/temp.py:7: RuntimeWarning: coroutine 'do_some_work' was never awaited
  print(asyncio.iscoroutine(do_some_work(3)))
RuntimeWarning: Enable tracemalloc to get the object allocation traceback

To make this coroutine object actually run, there are two ways:

  • await it inside another running coroutine
  • schedule its execution with the ensure_future function

Simply put, a coroutine can only run while a loop is running. So let's get the current thread's default loop, then hand the coroutine object to loop.run_until_complete; the coroutine will then run on the loop.

loop = asyncio.get_event_loop()
loop.run_until_complete(do_some_work(3))

run_until_complete is a blocking call: it only returns once the coroutine has finished — the name makes that plain. Its parameter is a future, yet here we pass a coroutine object. This works because run_until_complete checks its argument internally and wraps a coroutine object into a future via the ensure_future function. So we could write it more explicitly:

loop.run_until_complete(asyncio.ensure_future(do_some_work(3)))

Complete code :

import asyncio
 
async def do_some_work(x): 
    print("waiting " + str(x))
    await asyncio.sleep(x)
 
loop = asyncio.get_event_loop()
loop.run_until_complete(asyncio.ensure_future(do_some_work(3)))

The output is as follows:

waiting 3
<the program ends three seconds later>

Example templates

1. Versions before Python 3.7

a. Returning a value from a coroutine

import asyncio

async def foo():
    print("running the coroutine")
    return "return value"

if __name__ == '__main__':
    loop = asyncio.get_event_loop()
    try:
        print("starting the coroutine")
        coro = foo()
        print("entering the event loop")
        result = loop.run_until_complete(coro)
        print(f"run_until_complete returns: {result}")
    finally:
        print("closing the event loop")
        loop.close()

The output is as follows:

starting the coroutine
entering the event loop
running the coroutine
run_until_complete returns: return value
closing the event loop

run_until_complete returns the coroutine's return value; if the coroutine returns nothing then, like an ordinary function, it returns None by default.

b. A coroutine calling another coroutine

One coroutine can start another, so work can be split up by content and encapsulated into different coroutines. Using the await keyword inside a coroutine, we can chain coroutines together into a task flow, as in the following example.

import asyncio
 
async def main():
    print("main coroutine")
    print("waiting for the result1 coroutine")
    res1 = await result1()
    print("waiting for the result2 coroutine")
    res2 = await result2(res1)
    return (res1, res2)

async def result1():
    print("this is the result1 coroutine")
    return "result1"

async def result2(arg):
    print("this is the result2 coroutine")
    return f"result2 received an argument, {arg}"

if __name__ == '__main__':
    loop = asyncio.get_event_loop()
    try:
        result = loop.run_until_complete(main())
        print(f"return value: {result}")
    finally:
        print("closing the event loop")
        loop.close()

The output is as follows:

main coroutine
waiting for the result1 coroutine
this is the result1 coroutine
waiting for the result2 coroutine
this is the result2 coroutine
return value: ('result1', 'result2 received an argument, result1')
closing the event loop

c. No parameters, no return value

import asyncio
 
async def sleep1():
   print("Hello world 01 begin")
   await asyncio.sleep(1)  # simulate a 1-second task
   print("Hello again 01 end")

async def sleep2():
   print("Hello world 02 begin")
   await asyncio.sleep(2)   # simulate a 2-second task
   print("Hello again 02 end")

async def sleep3():
   print("Hello world 03 begin")
   await asyncio.sleep(3)   # simulate a 3-second task
   print("Hello again 03 end")
 
loop = asyncio.get_event_loop()                # step 1: create an event loop
tasks = [sleep1(), sleep2(), sleep3()]         # step 2: collect the coroutines into a task list
# loop.run_until_complete(asyncio.gather(*tasks))
loop.run_until_complete(asyncio.wait(tasks))   # step 3: run them on the event loop
loop.close()                                   # step 4: close the event loop

The output is as follows:

Hello world 01 begin
Hello world 03 begin
Hello world 02 begin
Hello again 01 end
Hello again 02 end
Hello again 03 end

d. With parameters and return values

import asyncio
 
async def sleep1(a, b):
   print("Hello world 01 begin")
   await asyncio.sleep(1)  # simulate a 1-second task
   print("Hello again 01 end")
   return a + b

async def sleep2(a, b):
   print("Hello world 02 begin")
   await asyncio.sleep(2)   # simulate a 2-second task
   print("Hello again 02 end")
   return a - b

async def sleep3(a, b):
   print("Hello world 03 begin")
   await asyncio.sleep(4)   # simulate a 4-second task
   print("Hello again 03 end")
   return a * b

loop = asyncio.get_event_loop()                     # step 1: create an event loop
task1 = asyncio.ensure_future(sleep1(10, 5))
task2 = asyncio.ensure_future(sleep2(10, 5))
task3 = asyncio.ensure_future(sleep3(10, 5))
tasks = [task1, task2, task3]                       # step 2: wrap the coroutines into a task list
loop.run_until_complete(asyncio.wait(tasks))        # step 3: run them on the event loop
# loop.run_until_complete(asyncio.gather(*tasks))
print(task1.result())                               # once all tasks finish, read each return value
print(task2.result())
print(task3.result())
loop.close()                                        # step 4: close the event loop

The output is as follows:

Hello world 01 begin
Hello world 02 begin
Hello world 03 begin
Hello again 01 end
Hello again 02 end
Hello again 03 end
15
5
50

e. With return values, output in the order of the task list

import asyncio
 
async def sleep1():
    print("Hello world 01 begin")
    await asyncio.sleep(1)
    print("Hello world 01 end")
    return 'sleep1'
    
async def sleep2():
    print("Hello world 02 begin")
    await asyncio.sleep(2)
    print("Hello world 02 end")
    return 'sleep2'
    
async def sleep3():
    print("Hello world 03 begin")
    await asyncio.sleep(3)
    print("Hello world 03 end")
    return 'sleep3'
    
async def run():
    task1 = asyncio.ensure_future(sleep1())
    task2 = asyncio.ensure_future(sleep2())
    task3 = asyncio.ensure_future(sleep3())
    tasks = [task3,task1,task2]
    results = await asyncio.gather(*tasks)
    for result in results:
        print(result)
 
loop = asyncio.get_event_loop()
loop.run_until_complete(run())
loop.close()

The output is as follows:

Hello world 01 begin
Hello world 02 begin
Hello world 03 begin
Hello world 01 end
Hello world 02 end
Hello world 03 end
sleep3
sleep1
sleep2

Summary: four steps (for versions before Python 3.7)

Step 1: obtain an event loop

  • loop = asyncio.get_running_loop() # returns the event loop running in the current thread, and raises an error if no loop is running; new in Python 3.7
  • loop = asyncio.get_event_loop() # gets an event loop; if the current thread has none, a new one is created
  • asyncio.set_event_loop(loop) # sets the given loop as the current thread's event loop
  • loop = asyncio.new_event_loop() # creates a new event loop

Step 2: wrap one or more coroutines into Task objects

# high-level API

task = asyncio.create_task(coro(args))   # new in 3.7
task = asyncio.ensure_future(coro(args))

# low-level API

loop.create_task(coro)
loop.create_future()   # creates a bare Future, not a Task

Note: asyncio.create_task() must be called from within a running event loop, so wrapping tasks before the loop starts (e.g. in order to read Task.result() afterwards) fails with it, whereas asyncio.ensure_future() works in that situation.

Step 3: run them on the event loop

  • loop.run_until_complete(asyncio.wait(tasks)) # combine multiple tasks with asyncio.wait()

  • loop.run_until_complete(asyncio.gather(*tasks)) # combine multiple tasks with asyncio.gather()

  • loop.run_until_complete(task_1) # a single task needs no combining

  • loop.run_forever() # runs until stop() is called; still available, but rarely needed nowadays and no longer the recommended style

gather and wait can both register multiple tasks at once to achieve concurrency, but their designs are quite different. The main differences are as follows:

(1) Their parameter forms differ

gather takes *coroutines_or_futures, i.e. either of these forms:

tasks = asyncio.gather(*[task1, task2, task3]) or
tasks = asyncio.gather(task1, task2, task3)

loop.run_until_complete(tasks)

wait takes a list or a set, as follows:

tasks = asyncio.wait([task1, task2, task3])

loop.run_until_complete(tasks)

(2) Their return values differ

gather is defined as follows; it returns the result of each task:

results = await asyncio.gather(*tasks)

wait is defined as follows; it returns done, the set of tasks already finished, and pending, the set of tasks not yet finished:

done, pending = yield from asyncio.wait(fs)

In short, the similarities and differences:

  • Same: functionally, asyncio.wait and asyncio.gather do the same job — collect the results of all the Tasks.
  • Different: asyncio.wait returns two values, done and pending: done holds the finished Tasks, pending the Tasks not yet finished (for example due to a timeout), and you must call result() on each Task yourself to get its value; asyncio.gather returns the results of all finished Tasks directly, with no further calls or other operations needed.
results = await asyncio.gather(*tasks)
for result in results:
    print(result)

Equivalent to :

done,pending = await asyncio.wait(tasks)
for done_task in done:
    print(done_task.result())
So the first difference between asyncio.gather and asyncio.wait:
  1. asyncio.gather treats the wrapped Tasks as a black box and simply hands you their results.
  2. asyncio.wait returns the wrapped Tasks themselves (both finished and pending); if you care about a result, you fetch it from the corresponding Task with result().

Why say 「the first difference」? As the name suggests, asyncio.wait means 「waiting」, and the second element of its return value is the pending set — yet in the example above pending is empty. So under what circumstances is pending non-empty? That is the second difference: asyncio.wait lets you choose when to return.

asyncio.wait accepts a return_when parameter. By default it waits for all tasks to complete (return_when='ALL_COMPLETED'), but it also supports FIRST_COMPLETED (return when the first coroutine finishes) and FIRST_EXCEPTION (return at the first exception):

import asyncio
 
async def a():
    print('Suspending a')
    await asyncio.sleep(3)
    print('Resuming a')
    return 'A'
 
async def b():
    print('Suspending b')
    await asyncio.sleep(1)
    print('Resuming b')
    return 'B'
 
async def main():
    done, pending = await asyncio.wait([a(), b()], return_when=asyncio.FIRST_COMPLETED)
    print(done)
    print(pending)
    
if __name__ == '__main__':
    loop = asyncio.get_event_loop()
    loop.run_until_complete(main())

The output is as follows:

Suspending b
Suspending a
Resuming b
{<Task finished name='Task-2' coro=<b() done, defined at /data/py_code/asyncio_example/aio_wait_gather.py:10> result='B'>}
{<Task pending name='Task-3' coro=<a() running at /data/py_code/asyncio_example/aio_wait_gather.py:5> wait_for=<Future pending cb=[<TaskWakeupMethWrapper object at 0x7f6313d72ac0>()]>>}

This time only coroutine b finished; coroutine a is still in the pending state.

In most cases asyncio.gather is enough; if you have special needs, choose asyncio.wait instead. Two examples:

  1. You need the wrapped Tasks themselves, in order to cancel them or add done-callbacks.
  2. The business logic needs FIRST_COMPLETED/FIRST_EXCEPTION semantics, i.e. an early return.

Step 4: close the event loop

loop.close()

None of the examples above calls loop.close, and nothing seems to go wrong. So should you call loop.close at all?
Simply put, as long as the loop has not been closed, it can be run again:

loop.run_until_complete(do_some_work(loop, 1))
loop.run_until_complete(do_some_work(loop, 3))
loop.close()

But once it is closed, it cannot be run again:

loop.run_until_complete(do_some_work(loop, 1))
loop.close()
loop.run_until_complete(do_some_work(loop, 3))  # raises RuntimeError: Event loop is closed

Calling loop.close is recommended: it cleans the loop object up completely and prevents accidental reuse.
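The reuse-versus-closed behavior can be shown in a quick sketch (my own example): a loop can run coroutines repeatedly until it is closed, after which run_until_complete raises RuntimeError.

```python
import asyncio

async def work(x):
    await asyncio.sleep(0.01)
    return x

loop = asyncio.new_event_loop()
first = loop.run_until_complete(work(1))   # fine
second = loop.run_until_complete(work(2))  # still fine: the loop is open
loop.close()
try:
    loop.run_until_complete(work(3))       # the loop is closed now
    reused = True
except RuntimeError:
    reused = False
print(first, second, reused)
```
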

2. Python 3.7 and later

Python 3.7 introduced some new asyncio features and APIs.

(1) Example 1: no parameters, no return value

import asyncio
 
async def sleep1():
    print("Hello world 01 begin")
    await asyncio.sleep(1)  # simulate a time-consuming task (1 second)
    print("Hello again 01 end")
 
async def sleep2():
    print("Hello world 02 begin")
    await asyncio.sleep(2)  # simulate a time-consuming task (2 seconds)
    print("Hello again 02 end")
 
async def sleep3():
    print("Hello world 03 begin")
    await asyncio.sleep(3)  # simulate a time-consuming task (3 seconds)
    print("Hello again 03 end")
 
async def main():
    results = await asyncio.gather(sleep1(), sleep2(), sleep3())
    for result in results:
        print(result)  # the coroutines return nothing, so each result is None
 
asyncio.run(main())

The output is:

Hello world 01 begin
Hello world 02 begin
Hello world 03 begin
Hello again 01 end
Hello again 02 end
Hello again 03 end
None
None
None

(2) Example 2: with return values

import asyncio
import time
 
a = time.time()
 
async def sleep1():
    print("Hello world 01 begin")
    await asyncio.sleep(1)
    print("Hello world 01 end")
    return 'sleep1'
    
async def sleep2():
    print("Hello world 02 begin")
    await asyncio.sleep(2)
    print("Hello world 02 end")
    return 'sleep2'
    
async def sleep3():
    print("Hello world 03 begin")
    await asyncio.sleep(3)
    print("Hello world 03 end")
    return 'sleep3'
    
async def main():
    task1 = asyncio.ensure_future(sleep1())
    task2 = asyncio.ensure_future(sleep2())
    task3 = asyncio.ensure_future(sleep3())
    tasks = [task3,task1,task2]
    results = await asyncio.gather(*tasks)
    for result in results:
        print(result)
 
asyncio.run(main())

The output is:

Hello world 01 begin
Hello world 02 begin
Hello world 03 begin
Hello world 01 end
Hello world 02 end
Hello world 03 end
sleep3
sleep1
sleep2

Summary: two steps (for Python 3.7)

Step 1: define an entry coroutine main

It is also an asynchronous function, i.e. defined with async, and inside main you await one or more coroutines. As before, they can be combined with gather or wait; for coroutine functions with return values, the results are usually collected inside main.

Step 2: run the entry function main

This uses a function newly added in Python 3.7; it takes a single line:

asyncio.run(main())

Note:
There is no need to explicitly create an event loop, because asyncio.run automatically creates a new one when it starts. And inside main, the wrapped coroutine functions no longer need to be driven through the event loop; you call them like ordinary functions, only with the await keyword added.
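One more detail worth knowing: asyncio.run also returns the entry coroutine's return value, so results flow out of main without any explicit loop handling. A minimal sketch (the helper `compute` is made up for illustration; requires Python 3.7+):

```python
import asyncio

async def compute(x, y):
    await asyncio.sleep(0.01)  # simulate an asynchronous computation
    return x + y

async def main():
    # run several coroutines concurrently and collect their results
    return await asyncio.gather(compute(1, 2), compute(3, 4))

# asyncio.run creates a fresh event loop, runs main() to completion,
# returns main()'s return value, and closes the loop afterwards
results = asyncio.run(main())
print(results)  # [3, 7]
```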

References:
https://blog.csdn.net/qq_27825451/article/details/86218230
https://blog.csdn.net/qiuqiuit/article/details/86773310
https://zhuanlan.zhihu.com/p/59671241
https://mp.weixin.qq.com/s/bJKbHph1T0Z95rCioV4iPA
https://mozillazg.com/2017/08/python-asyncio-note-task-usage.html

Copyright notice: this article was created by [Tan sheep]; please include a link to the original when reposting.
https://yzsam.com/2022/188/202207071352068232.html