Python中asyncio库的基本使用

Python中asyncio库的基本使用

asyncio是一个使用async / await语法编写并发代码的库。asyncio用作多个Python异步框架的基础,这些框架提供高性能的网络和Web服务器,数据库连接库,分布式任务队列等。asyncio通常非常适合IO绑定和高级 结构化网络代码。

asyncio提供了一组高级 API: 同时运行Python协同程序并完全控制它们的执行; 执行网络IO和IPC ; 控制子过程 ; 通过队列分配任务; 同步并发代码; 此外,还有一些用于库和框架开发人员的低级 API : 创建和管理事件循环,提供异步API networking,运行subprocesses,处理等;OS signals 使用传输实现有效的协议 ; 使用async / await语法桥接基于回调的库和代码。 Conroutines 使用async / await语法声明的协同程序是编写asyncio应用程序的首选方法。例如,以下代码片段(需要Python 3.7+)打印“hello”,等待1秒,然后打印“world”:

import asyncio
 
async def main():
     print('hello')
     await asyncio.sleep(1)
     print('world')
 
asyncio.run(main())
 
上面代码等同于下面(不推荐使用基于生成器的协同程序的支持,并计划在Python 3.10中删除。)
import asyncio
 
@asyncio.coroutine
def main():
     print('hello')
     yield  from asyncio.sleep(1)
     print('world')
 
asyncio.run(main())
 
asyncio实际等同于下面的工作(参数为An asyncio.Future, a coroutine or an awaitable is required)
import asyncio
 
@asyncio.coroutine
def main():
     print('hello')
     yield  from asyncio.sleep(1)
     print('world')
 
# asyncio.run(main())
loop = asyncio.events.new_event_loop()
asyncio.events.set_event_loop(loop)
loop.run_until_complete(main())
 
asyncio.run功能介绍 实际运行协程asyncio提供了三种主要机制: 1、The asyncio.run()函数来运行顶层入口点“main()”函数(见上面的例子) 2、Awaiting on a coroutine 以下代码片段将在等待1秒后打印“hello”,然后在等待另外 2秒后打印“world” :
import asyncio
import time
 
async def say_after(delay, what):
    await asyncio.sleep(delay)
    print(what)
 
async def main():
    print(f"started at {time.strftime('%X')}")
 
    await say_after(1, 'hello')
    await say_after(2, 'world')
 
    print(f"finished at {time.strftime('%X')}")
 
asyncio.run(main())
# started at 11:54:48
# hello
# world
# finished at 11:54:51
 
3、asyncio.create_task()与asyncio同时运行协同程序的功能Tasks;让我们修改上面的例子并同时运行两个say_after协同程序 :
import asyncio
import time
 
async def say_after(delay, what):
    await asyncio.sleep(delay)
    print(f"{what} at {time.strftime('%X')}")
 
async def main():
    task1 = asyncio.create_task(
        say_after(1, 'hello'))
    task2 = asyncio.create_task(
        say_after(2, 'world'))
    print(f"started at {time.strftime('%X')}")
    # Wait until both tasks are completed (should take
    # around 2 seconds.)
 
    await task1
    await task2
    print(f"finished at {time.strftime('%X')}")
 
asyncio.run(main())
 
# started at 14:27:22
# hello at 14:27:23
# world at 14:27:24
# finished at 14:27:24
 
稍微改变一下形式,可以理解的更多

import asyncio
import time
 
async def say_after(delay, what):
    await asyncio.sleep(delay)
    print(f"{what} at {time.strftime('%X')}")
 
async def main():
    task1 = asyncio.create_task(
        say_after(1, 'hello'))
    task2 = asyncio.create_task(
        say_after(2, 'world'))
    print(f"started at {time.strftime('%X')}")
    # Wait until both tasks are completed (should take
    # around 2 seconds.)
    await asyncio.sleep(3)
 
    # await task1
    # await task2
    print(f"finished at {time.strftime('%X')}")
 
asyncio.run(main())
 
# started at 14:29:41
# hello at 14:29:42
# world at 14:29:43
# finished at 14:29:44
 
   Awaitables 我们说如果一个对象可以在表达式中使用,那么它就是一个等待对象await。许多asyncio API旨在接受等待。 有三种主要类型的等待对象:coroutines, Tasks, and Futures. Coroutines Python coroutines are awaitables and therefore can be awaited from other coroutines:
import asyncio
 
async def nested():
    return 42
 
async def main():
    # Nothing happens if we just call "nested()".
    # A coroutine object is created but not awaited,
    # so it *won't run at all*.
    nested()
 
    # Let's do it differently now and await it:
    print(await nested())  # will print "42".
 
asyncio.run(main())
 
# 42
 
重要 在本文档中,术语“coroutine”可用于两个密切相关的概念: 一个协程功能:一个功能;async def 一个协程对象:通过调用协同程序函数返回的对象 。 Tasks 任务用于调度协同程序并发。 当一个协程被包装到一个Task中时,会像asyncio.create_task()一样 conroutine自动安排很快运行:
import asyncio
 
async def nested():
    return 42
 
async def main():
    # Schedule nested() to run soon concurrently
    # with "main()".
    task = asyncio.create_task(nested())
 
    # "task" can now be used to cancel "nested()", or
    # can simply be awaited to wait until it is complete:
    await task
 
asyncio.run(main())
 
Futures A Future是一个特殊的低级别等待对象,它表示异步操作的最终结果。 当等待 Future对象时,它意味着协程将等到Future在其他地方解析。 需要asyncio中的未来对象以允许基于回调的代码与async / await一起使用。 通常,不需要在应用程序级代码中创建Future对象。 可以等待有时通过库和一些asyncio API公开的未来对象:
async def main():
    await function_that_returns_a_future_object()
 
    # this is also valid:
    await asyncio.gather(
        function_that_returns_a_future_object(),
        some_python_coroutine()
    )
 
返回Future对象的低级函数的一个很好的例子是loop.run_in_executor()。 Asyncio方法 1、运行asyncio程序 asyncio.run(coro,*,debug = False )   此函数运行传递的协同程序,负责管理asyncio事件循环并最终确定异步生成器。   当另一个asyncio事件循环在同一个线程中运行时,无法调用此函数。   如果是debugTrue,则事件循环将以调试模式运行。   此函数始终创建一个新的事件循环并在结束时将其关闭。它应该用作asyncio程序的主要入口点,理想情况下应该只调用一次。   版本3.7中的新功能:重要:此功能已临时添加到Python 3.7中的asyncio中。 2、创建任务 asyncio.create_task(coro)   将coro coroutine包装成a Task 并安排执行。返回Task对象。   任务在返回的循环中执行,如果当前线程中没有运行循环get_running_loop(), RuntimeError则引发该任务。   Python 3.7中添加了此功能。在Python 3.7之前,asyncio.ensure_future()可以使用低级函数:
async def coro():
    ...
 
# In Python 3.7+
task = asyncio.create_task(coro())
...
 
# This works in all Python versions but is less readable
task = asyncio.ensure_future(coro())

 
 
3、sleeping coroutine asyncio.sleep(delay, result=None, *, loop=None)   阻止 delay seconds.。   如果提供了result ,则在协程完成时将其返回给调用者。   leep() 始终挂起当前任务,允许其他任务运行。   该loop 参数已被弃用,并定于去除在Python 3.10。   协程示例每秒显示当前日期5秒:
import asyncio
import datetime
 
async def display_date():
    loop = asyncio.get_running_loop()
    end_time = loop.time() + 5.0
    while True:
        print(datetime.datetime.now())
        if (loop.time() + 1.0) >= end_time:
            break
        await asyncio.sleep(1)
 
asyncio.run(display_date())

 
   4、同时运行任务 awaitable asyncio.gather(*aws, loop=None, return_exceptions=False)   同时在aws 序列中运行 awaitable objects   如果在aws中任何awaitable 是协程,它将自动安排为任务   如果所有等待成功完成,则结果是返回值的汇总列表。结果值的顺序对应于aws中的等待顺序   如果return_exceptions是False(默认),则第一个引发的异常会立即传播到等待的任务gather()。   如果return_exceptions是True,异常的处理方式一样成功的结果,并在结果列表汇总。   如果gather()被取消,所有提交的awaitables(尚未完成)也被取消。   如果aws序列中的Task or Future被取消,则将其视为已引发CancelledError- 在这种情况下不会取消gather() 呼叫。这是为了防止取消一个提交的Tasks/Futures 以导致其他任务/期货被取消。
import asyncio
 
async def factorial(name, number):
    f = 1
    for i in range(2, number + 1):
        print(f"Task {name}: Compute factorial({i})...")
        await asyncio.sleep(1)
        f *= i
    print(f"Task {name}: factorial({number}) = {f}")
 
async def main():
    # Schedule three calls *concurrently*:
    await asyncio.gather(
        factorial("A", 2),
        factorial("B", 3),
        factorial("C", 4),
    )
 
asyncio.run(main())
 
# Expected output:
#
#     Task A: Compute factorial(2)...
#     Task B: Compute factorial(2)...
#     Task C: Compute factorial(2)...
#     Task A: factorial(2) = 2
#     Task B: Compute factorial(3)...
#     Task C: Compute factorial(3)...
#     Task B: factorial(3) = 6
#     Task C: Compute factorial(4)...
#     Task C: factorial(4) = 24
 
获取返回结果,异常情况

import asyncio
 
async def factorial(name, number):
 
    print(name)
    if name == 'A':
        return name
    elif name == 'B':
        raise SyntaxError(name)
    await asyncio.sleep(number)
 
 
async def main():
    # Schedule three calls *concurrently*:
    result = await asyncio.gather(
        factorial("A", 2),
        factorial("B", 3),
        factorial("C", 4),
        return_exceptions=True
    )
    print(result)
 
asyncio.run(main())
 
# A
# B
# C
# ['A', SyntaxError('B'), None]
 
版本3.7中已更改:如果取消了聚集本身,则无论return_exceptions如何,都会传播取消。 5、Shielding From Cancellation awaitable asyncio.shield(aw, *, loop=None)   Protect an awaitable object from being cancelled.   If aw is a coroutine it is automatically scheduled as a Task.   The statement:
res = await shield(something())
 
  等同于
res = await something()
 
  除非取消包含它的协程,否则something()不会取消运行的任务。从观点来看something(),取消没有发生。虽然它的来电者仍然被取消,所以“等待”表达仍然提出了一个CancelledError。   如果something()通过其他方式取消(即从内部取消)也会取消shield()。   如果希望完全忽略取消(不推荐),则该shield()函数应与try / except子句结合使用,如下所示:
try:
    res = await shield(something())
except CancelledError:
    res = None
 
6、超时 coroutine asyncio.wait_for(aw, timeout, *, loop=None)   Wait for the aw awaitable to complete with a timeout.   If aw is a coroutine it is automatically scheduled as a Task.   timeout可以是None或等待的float或int秒数。如果超时是None,将等到完成   如果发生超时,它将取消任务并加注 asyncio.TimeoutError。   要避免该任务cancellation,请将其包装shield()。   该函数将一直等到将来实际取消,因此总等待时间可能会超过超时。   如果等待被取消,则未来的aw也会被取消。   该循环参数已被弃用,并定于去除在Python 3.10。 async def eternity(): # Sleep for one hour await asyncio.sleep(3600) print(‘yay!’) async def main(): # Wait for at most 1 second try: await asyncio.wait_for(eternity(), timeout=1.0) except asyncio.TimeoutError: print(‘timeout!’) asyncio.run(main()) # Expected output: # # timeout! 改变在3.7版本:当AW被取消,由于超时,wait_for等待AW被取消。以前,它asyncio.TimeoutError立即提出 。 7、超时原语 coroutine asyncio.wait(aws, *, loop=None, timeout=None, return_when=ALL_COMPLETED)   同时运行aws中的等待对象 并阻塞,直到return_when指定的条件为止。   如果在aws中任何等待的是协程,它将自动安排为任务。wait()直接传递协同程序对象 已被弃用,因为它会导致 混乱的行为。   返回两组任务/期货:。(done, pending)   用法: 1 done, pending = await asyncio.wait(aws)   该循环参数已被弃用,并定于去除在Python 3.10。   timeout(浮点数或整数),如果指定,可用于控制返回前等待的最大秒数。   请注意,此功能不会引发asyncio.TimeoutError。超时发生时未完成的期货或任务仅在第二组中返回。   return_when表示此函数何时返回。它必须是以下常量之一: 不变 描述 FIRST_COMPLETED 当任何未来完成或取消时,该函数将返回。 FIRST_EXCEPTION 当任何未来通过引发异常完成时,函数将返回。如果没有未来引发异常则等同于 ALL_COMPLETED。 ALL_COMPLETED 所有期货结束或取消时,该功能将返回。   不像wait_for(),wait()当发生超时不会取消期货。      注意 wait()将协同程序自动调度为任务,然后在 集合中返回隐式创建的任务对象。因此,以下代码将无法按预期方式工作:(done, pending)
async def foo():
    return 42
 
coro = foo()
done, pending = await asyncio.wait({coro})
 
if coro in done:
    # This branch will never be run!
  以下是如何修复上述代码段: async def foo(): return 42 task = asyncio.create_task(foo()) done, pending = await asyncio.wait({task}) if task in done: # Everything will work as expected now.   wait()不推荐直接传递协程对象。 8、 Scheduling From Other Threads asyncio.run_coroutine_threadsafe(coro, loop)   将协程提交给给定的事件循环。线程安全的。   返回a concurrent.futures.Future以等待另一个OS线程的结果。   此函数旨在从与运行事件循环的OS线程不同的OS线程调用。例: # Create a coroutine coro = asyncio.sleep(1, result=3) # Submit the coroutine to a given loop future = asyncio.run_coroutine_threadsafe(coro, loop) # Wait for the result with an optional timeout argument assert future.result(timeout) == 3   如果在协程中引发异常,则会通知返回的Future。它还可以用于取消事件循环中的任务: try: result = future.result(timeout) except asyncio.TimeoutError: print(‘The coroutine took too long, cancelling the task…’) future.cancel() except Exception as exc: print(f’The coroutine raised an exception: {exc!r}’) else: print(f’The coroutine returned: {result!r}’)   请参阅 文档的并发和多线程部分。   与其他asyncio函数不同,此函数需要 显式传递循环参数。   3.5.1版中的新功能。 9、自省 asyncio.current_task(loop = None )   返回当前正在运行的Task实例,或者None没有正在运行的任务。   如果loop是None get_running_loop()用来获取loop。   版本3.7中的新功能。 asyncio.all_tasks(loop = None )   返回Task循环运行的一组尚未完成的对象。   如果loop是None,get_running_loop()则用于获取当前循环。   版本3.7中的新功能。 任务对象 class asyncio.Task(coro,*,loop = None ) A Future-like object that runs a Python coroutine. Not thread-safe. 任务用于在事件循环中运行协同程序。如果一个协程在Future上等待,则Task暂停执行协程并等待Future的完成。当Future 完成后,包装协程的执行将恢复。 事件循环使用协作调度:事件循环一次运行一个任务。当一个Task等待完成Future时,事件循环运行其他任务,回调或执行IO操作。 使用高级asyncio.create_task()功能创建任务,或低级别loop.create_task()或 ensure_future()功能。不鼓励手动实例化任务。 要取消正在运行的任务,请使用该cancel()方法。调用它将导致Task将CancelledError异常抛出到包装的协同程序中。如果在取消期间协程正在等待Future对象,则Future对象将被取消。 cancelled()可用于检查任务是否被取消。True如果包装的协程没有抑制CancelledError异常并且实际上被取消,则该方法返回。 asyncio.Task继承自Future其所有API,除了Future.set_result()和 Future.set_exception()。 任务支持该contextvars模块。创建任务时,它会复制当前上下文,然后在复制的上下文中运行其协程。 版本3.7中已更改:添加了对contextvars模块的支持。 cancel()   请求取消任务。   这会安排CancelledError在事件循环的下一个循环中将异常抛入包装的协程。   协程则有机会通过抑制异常与清理,甚至拒绝请求try… … … … … … 块。因此,不同于,不保证任务将被取消,尽管完全抑制取消并不常见,并且积极地不鼓励。exceptCancelledErrorfinallyFuture.cancel()Task.cancel()   以下示例说明了协同程序如何拦截取消请求: async def cancel_me(): print(‘cancel_me(): before sleep’) try: # Wait for 1 hour await asyncio.sleep(3600) except asyncio.CancelledError: print(‘cancel_me(): cancel sleep’) raise finally: print(‘cancel_me(): after sleep’) async def main(): # Create a “cancel_me” Task task = asyncio.create_task(cancel_me()) # Wait for 1 second await asyncio.sleep(1) task.cancel() try: await task except asyncio.CancelledError: print(“main(): cancel_me is cancelled now”) asyncio.run(main()) # Expected output: # # cancel_me(): before sleep # cancel_me(): cancel sleep # cancel_me(): after sleep # main(): cancel_me is cancelled now cancelled()   True如果任务被取消,则返回。   请求取消时取消任务, cancel()并且包装的协同程序将CancelledError异常传播 到其中。 done()   True如果任务完成则返回。   一个任务完成时,包裹协程要么返回的值,引发异常,或者任务被取消。 result()   返回任务的结果。   如果任务完成,则返回包装协程的结果(或者如果协程引发异常,则重新引发该异常。)   如果已取消任务,则此方法会引发CancelledError异常。   如果Task的结果尚不可用,则此方法会引发InvalidStateError异常。 exception()   返回Task的例外。   如果包装的协同程序引发异常,则返回异常。如果包装的协程正常返回,则此方法返回None。   如果已取消任务,则此方法会引发 CancelledError异常。   如果尚未完成任务,则此方法会引发 InvalidStateError异常。 add_done_callback(回调,*,上下文=无)   添加要在任务完成时运行的回调。   此方法仅应在基于低级回调的代码中使用。    有关Future.add_done_callback() 详细信息,请参阅文档。 remove_done_callback(回调)   从回调列表中删除回调。   此方法仅应在基于低级回调的代码中使用。   有关Future.remove_done_callback() 详细信息,请参阅文档。 get_stack(*,limit = None )   返回此任务的堆栈帧列表。   如果未完成包装的协同程序,则会返回挂起它的堆栈。如果协程已成功完成或被取消,则返回一个空列表。如果协程被异常终止,则返回回溯帧列表。   帧始终从最旧到最新排序。   对于挂起的协程,只返回一个堆栈帧。   可选的limit参数设置要返回的最大帧数; 默认情况下,返回所有可用的帧。返回列表的排序取决于是返回堆栈还是返回:返回堆栈的最新帧,但返回最旧的回溯帧。(这与回溯模块的行为相匹配。) print_stack(*,limit = None,file = None )   打印此任务的堆栈或回溯。   这会为检索到的帧生成类似于回溯模块的输出get_stack()。   该极限参数传递给get_stack()直接。   的文件参数是其中输出被写入的I / O流; 默认输出写入sys.stderr。 classmethod all_tasks(loop = None )   返回一组事件循环的所有任务。   默认情况下,返回当前事件循环的所有任务。如果是loopNone,则该get_event_loop()函数用于获取当前循环。   不推荐使用此方法,将在Python 3.9中删除。请改用此asyncio.all_tasks()功能。 classmethod current_task(loop = None )   返回当前正在运行的任务或None。   如果是loopNone,则该get_event_loop()函数用于获取当前循环。   不推荐使用此方法,将在Python 3.9中删除。请改用此asyncio.current_task()功能。 其他 1、async for 运用 import asyncio class AsyncIter: def __init__(self, items): self.items = items async def __aiter__(self): for item in self.items: await asyncio.sleep(1) yield item async def print_iter(things): async for item in things: print(item) if __name__ == ‘__main__’: loop = asyncio.get_event_loop() things = AsyncIter([1, 2, 3]) loop.run_until_complete(print_iter(things)) loop.close()
分享到 :

发表评论

登录... 后才能评论