Skip to content

Latest commit

 

History

History
1592 lines (1043 loc) · 54.7 KB

阻塞异步与协程.md

File metadata and controls

1592 lines (1043 loc) · 54.7 KB

从顺序执行到并行执行

前几天去外地参加朋友婚礼,作为一个外地人我是不认路的,但我被安排了去接其他客人,于是我不得不依赖导航.我需要打开导航,听着它的指挥开车.

这就是一个典型的并行执行的过程,我要同时开车,并且同时监听着手机上的导航的指挥.

人们往往是同时做几件,比如边看电视边吃饭,边听音乐边工作,边打电话边开车(千万不要这么做).并且很多时候我们不得不同时做几件事,而一件事是另一件事的依赖.

人可以并行的执行任务(事实上人脑就是并行处理事件的)但电脑'不行',单核电脑比较耿直只会按固定好的顺序执行任务.前文也已经介绍过了如何组织单线程过程.

但好在电脑的运转速度远比人的反应速度快,因此我们可以耍点小花招让多个任务看起来是一起执行的.

拿之前看导航开车的例子来说,实际上我开车这个事件可以看作一个循环,每个循环中我有两个动作

  • 我的耳朵在监听着手机(使用声音的音色语调等特征识别),当它有指示的时候我会按照指示执行
  • 没有指示就根据路况开一段

当然了这个事件看起来作为并发的例子更加合适,但道理是一样的.

注意:本文使用jupyter notebook在ipykernel 5.0+下编写.由于ipykernel有autoawait机制可以直接await协程,在.py文件中await关键字只能在协程函数中使用,并且启动项目需要使用标准库中的方法asyncio.run(cor)启动入口协程.

一个典型的基于协程的函数可以这样写:

import asyncio

async def main():
    print('Hello ...')
    await asyncio.sleep(1)
    print('... World!')

asyncio.run(main())

阻塞与非阻塞

阻塞和非阻塞关注的是程序在等待调用结果(消息,返回值)时的状态.

  • 阻塞调用是指调用结果返回之前,当前线程会被挂起.调用线程只有在得到结果之后才会返回.

  • 非阻塞调用指在不能立刻得到结果之前,该调用不会阻塞当前线程.

如果开车的时候我监听导航是阻塞的,那就意味着我的关注点转移到了导航上,必须要有它的指导我才会有动作,这么开车早就出事故了.

推广到我们的程序,也就是说我们的流程需要可以被保存状态,将线程的控制权转移到其他流程中.同时也要可以下次再被转移回来接着上次的继续运行.

同步与异步

同步与异步同步和异步关注的是消息通信机制(synchronous communication/ asynchronous communication).

  • 所谓同步就是在发出一个*调用*时,在没有得到结果之前,该*调用*就不返回.但是一旦调用返回就得到返回值了.换句话说,就是由*调用者*主动等待这个*调用*的结果.

  • 而异步则是相反,*调用*在发出之后这个调用就直接返回了,所以没有返回结果.换句话说当一个异步过程调用发出后,调用者不会立刻得到结果.而是在*调用*发出后,*被调用者*通过状态,通知来通知调用者,或通过回调函数处理这个调用.

开车的时候导航就是异步的,当打开导航后就会有个反馈--地图上我们的位置会被标记出来.而实际的导航信息都是由导航自己语音通知我们的.

有了上面的概念,我们就可以来看看python中官方的单线程并行解决方案了

协程及其语法

协程是一种单线程通过调度器或者事件循环从而实现的并行的解决方案.它是由用户控制的"并行",因此只要代码一样(没有使用random)协程的运行顺序就是一样的.实际上完全可以等价的用回调函数实现.

协程是实现高并发的方案中开销最小的方案.在io密集型任务中往往是最高效的方案.python3.5以后协程语法已经基本定型.在随后的3.6,3.7中接口逐步完善,协程语法变得更加优雅好用.

python的协程模型可以分为如下几个部分:

  • coroutine function:协程函数,创建协程的工厂.用async关键字定义的函数,调用它就会创建一个协程对象

  • coroutine 协程对象:协程对象无法自己执行,需要将其注册到事件循环中转变为一个Task对象才会被执行.协程对象一定满足Awaitable协议

  • Future 期程对象:一个低级别对象用来链接底层回调式代码和高层异步/等待式代码,其概念类似JS中的Promise.通常在应用时不会用到,但在写模块时会用到,期程一定满足Awaitable协议

  • Task 任务对象:被注册到事件循环的协程对象就是一个任务对象.任务对象一定满足Awaitable协议,TaskFuture的子类,因此任务对象也是一个期程对象

  • event_loop 调度器/事件循环(:用于调度协程运行的顺序,调度器用于调度协程而事件循环则是一种特殊的调度器--程序开启一个无限的循环,程序员会把一些函数注册到事件循环上.当满足事件发生的时候,调用相应的协程函数.我们应该尽量的使用asyncio的高级api,隐式的执行协程.

协程语法可以说是函数的一个超集,它的特征是使用async def来定义,并且可以在其内部使用await关键字等待另一个协程完成.协程对象的抽象基类为collections.abc.Coroutine,实现send(value),throw(type, exc, tb),close()__await__()接口.

可以看出协程与生成器接口相似,就是多了个__await__()少了迭代器相关的__next__()和__iter__()事实上,在3.7版本之前,协程都是使用生成器来实现的.

协程对象内部需要实现Awaitable协议,也就是要实现__await__接口,这个接口必须返回一个迭代器,带有这一接口的对象我们称之为Future-like对象,有它的就可以被程序用await关键字挂起等待,Future-like类的抽象基类为collections.abc.Awaitable

await语法

await就是用来挂起等待任务结束的关键字它只能在协程中使用.它的后面必须是一个Awaitable的对象

有效用法:

表达式 被解析为
if await fut: pass if (await fut): pass
if await fut + 1: pass if (await fut) + 1: pass
pair = await fut, 'spam' pair = (await fut), 'spam'
with await fut, open(): pass with (await fut), open(): pass
await foo()['spam'].baz()() await ( foo()['spam'].baz()() )
return await coro() return ( await coro() )
res = await coro() ** 2 res = (await coro()) ** 2
func(a1=await coro(), a2=0) func(a1=(await coro()), a2=0)
await foo() + await bar() (await foo()) + (await bar())
-await foo() -(await foo())

无效用法:

表达式 应该写为
await await coro() await (await coro())
await -coro() await (-coro())

一般来说await会挂起直到它下面的一串Future-like对象都运行结束才会继续向下.

异步迭代器和async for

异步迭代器可以在它的iter实现里挂起,调用异步代码,也可以在它的__next__方法里挂起,调用异步代码.要支持异步迭代,需要:

  • 对象必须实现一个__aiter__接口,返回一个异步迭代器对象,这个异步迭代器对象在每次迭代时会返回一个Future-like对象
  • 一个异步迭代器必须实现一个__anext__方法,在每次迭代时返回一个Future-like对象
  • 要停止迭代,__anext__必须抛出一个StopAsyncIteration异常

python的buildin方法中有aiter()anext()可以直接调用异步迭代器的对应接口实现.

例子:

import asyncio
class Ticker:
    """Yield numbers from 0 to `to` every `delay` seconds."""

    def __init__(self, delay, to):
        self.delay = delay
        self.i = 0
        self.to = to

    def __aiter__(self):
        return self

    async def __anext__(self):
        i = self.i
        if i >= self.to:
            raise StopAsyncIteration
        self.i += 1
        if i:
            await asyncio.sleep(self.delay)
        return i

async def main():
    async for i in Ticker(1,5):
        print(i)
await main()
0
1
2
3
4

异步列表解析

列表解析中可以使用await来等待Future-like对象的结果,如:

result = [await fun() for fun in funcs if await condition()]

在列表中允许使用async for来做迭代,它的形式如下:

[i async for i in Ticker(1,5) if i % 2]

import asyncio
class Ticker:
    """Yield numbers from 0 to `to` every `delay` seconds."""

    def __init__(self, delay, to):
        self.delay = delay
        self.i = 0
        self.to = to

    def __aiter__(self):
        return self

    async def __anext__(self):
        i = self.i
        if i >= self.to:
            raise StopAsyncIteration
        self.i += 1
        if i:
            await asyncio.sleep(self.delay)
        return i

async def main():
    result = [i async for i in Ticker(1,5) if i % 2]
    print(result)
    
await main()
[1, 3]

异步迭代器工具

github上有一个异步迭代器工具aitertools,它的主要作用就是转换同步迭代器和对一步迭代器进行组合,主要的接口有:

  • aiter(iter) 将一个同步的可迭代对象转化为异步可迭代对象
  • alist(aiter) 将一个异步可迭代对象转化为list
  • atuple(aiter) 将一个异步可迭代对象转化为tuple
  • count(start=0, step=1) 生成一个从start开始每次步进step的异步计数器
  • cycle(aiter) 将一个异步可迭代对象转化为一个以他为基础的循环
  • (obj, times=None) 将一个对象转化为一个以他为基础的重复异步可迭代对象
  • accumulate(iterable, func=operator.add) 对一个异步可迭代对象进行卷积
  • chain(*iterables) 将几个可迭代对象串联
  • compress(data, selectors)并行处理两个可迭代的对象;如果selectors中的元素是真值,产出data中对应的元素
  • dropwhile(predicate, iterable)处理iterable,跳过predicate的计算结果为真值的元素,然后产出剩下的各个元素(不再进一步检查)
  • filterfalse(predicate, iterable)与filter函数的作用类似,不过predicate的逻辑是相反的--predicate返回假值时产出对应的元素
  • groupby(iterable, key=None)产出由两个元素组成的元素,形式为(key,group)其中key是分组标准,group是生成器,用于产出分组里的元素
  • islice(iterable, *args)产出it的切片,作用类似于s[:stop]或s[start:stop:step],不过it可以是任何可迭代的对象,而且这个函数实现的是惰性操作
  • starmap(func, iterable)把it中的各个元素传给func,产出结果;输入的可迭代对象应该产出可迭代的元素iit,然后以func(*iit)这种形式调用func
  • takewhile(predicate, iterable)predicate返回真值时产出对应的元素,然后立即停止,不再继续检查
  • tee(iterable, n=2)产出一个由n个生成器组成的元组,每个生成器用于单独产出输入的可迭代对象中的元素
  • zip_longest(*iterables, fillvalue=None)并行从输入的各个可迭代对象中获取元素,产出由N个元素组成的元组,等到最长的可迭代对象到头后才停止,空缺的值使用fillvalue填充
  • product(*iterables, repeat=1)把前两个元素传给func,然后把计算结果和第三个元素传给func,以此类推,返回最后的结果;如果提供了initial,把它当作第一个元素传入

异步上下文管理器和async with

异步上下文管理器类似普通的上下文管理器,可以让程序在进入上下文和离开上下文之间挂起状态,调用异步代码.

异步上下文管理器需要实现两个接口

  • __aenter__处理进入上下文时的操作,如果有返回值,则可以使用as标定上下文中的变量名
  • __aexit__处理离开上下文时的操作,和__exit__的参数一样,它的参数必须是self,exc_type, exc, tb,分别代表对象自身对象,exception_type , exception_value , 和 traceback,如果正常退出,exc_type, exc, tb将会是 None.

__aenter____aexit__,它们必须返回一个Future-like对象

和普通的with语句一样,可以在单个async with语句里指定多个上下文管理器.

异步上下文管理器的一个示例:

import asyncio
class Ticker:
    """Yield numbers from 0 to `to` every `delay` seconds."""

    def __init__(self, delay, to):
        self.delay = delay
        self.i = 0
        self.to = to

    def __aiter__(self):
        return self

    async def __anext__(self):
        i = self.i
        if i >= self.to:
            raise StopAsyncIteration
        self.i += 1
        if i:
            await asyncio.sleep(self.delay)
        return i
class AsyncContextTicker:
    def __init__(self,delay, to):
        self.data = Ticker(delay, to)
        
    async def __aenter__(self):
        print('entering context')
        await asyncio.sleep(1)
        return self.data
        
    async def __aexit__(self, exc_type, exc, tb):
        await asyncio.sleep(1)
        print('exit context')
        

async def main():
    async with AsyncContextTicker(1,5) as ticker:
        async for i in ticker:
            print(i)
        
await main()
entering context
0
1
2
3
4
exit context

contextlib快速创建异步上下文管理器

就像同步接口中的上下文管理器with一样,在python3.7中contextlib.asynccontextmanager装饰器现在可以快速创建异步上下文管理器了.

其使用方式也一致,使用yield区分上下文的创建和回收两段.上面的例子可以修改为:

import contextlib

@contextlib.asynccontextmanager
async def AsyncContextTicker2(delay, to):
    data = Ticker(delay, to)
    print('entering context')
    await asyncio.sleep(1)
    try:
        yield data
    finally:
        await asyncio.sleep(1)
        print('exit context')
        

async def main():
    async with AsyncContextTicker2(1,5) as ticker:
        async for i in ticker:
            print(i)
        
await main()
entering context
0
1
2
3
4
exit context

异步生成器

yield关键字的函数是生成器,带yield关键字的协程就是异步生成器,从效果上看异步生成器效果和异步迭代器效果差不多,它需要实现协议:

  • PyAsyncGenASend : __anext__asend()接口 ,对应一般生成器中的__next__send(),用于在异步生成器间交互信息
  • PyAsyncGenAThrow : athrow() and aclose()接口,对应一般生成器的throw()close(),用于关闭异步生成器或者抛出错误
  • StopAsyncIteration用于标注结束
import asyncio
async def ticker(delay, to):
    """Yield numbers from 0 to *to* every *delay* seconds."""
    for i in range(0,to):
        yield i
        await asyncio.sleep(delay)
        
async def main():
    async for i in ticker(1,5):
        print(i)
        
await main()
0
1
2
3
4

关于yield from

因为异步步生成器本质上是异步迭代器的子类,我们可以利用这一点使用async for语句代替yield from的语义.

import asyncio
async def g1(x):
    for i in range(x):
        yield i

async def g2():
    async for v in g1(5):
        yield v
        
async def main():
    async for i in g2():
        print(i)
        

await main()
0
1
2
3
4

协程的状态

协程可以有4种状态,可以是用python的反射模块inspect.getcoroutinestate(coroutine)来查看

  • CORO_CREATED: 等待被使用
  • CORO_RUNNING: 目前执行中
  • CORO_SUSPENDED: 目前在await处暂停等待信号中
  • CORO_CLOSED: 执行结束

实用例子

协程有三种不同的代码编写风格:

  • 拉取式

    典型的异步生成器和异步迭代器使用场景

  • 推送式

    通过将数据推送给协程让协程一步一步的计算返回数据

  • 任务式

    根据状态来排定运行顺序

推送式

我们用一个计算移动平均值的异步生成器来看看协程是如何工作的.

async def averager():
    total = 0.0
    count = 0
    average = None
    while True: 
        term = yield average
        total += term
        count += 1
        average = total/count
        
async def grouper():
    aver = averager()
    await aver.__anext__()
    for i in range(11):
        j = await aver.asend(i)
        print(j)
await grouper()
0.0
0.5
1.0
1.5
2.0
2.5
3.0
3.5
4.0
4.5
5.0

任务式

一个简单的离散事件仿真类--出租车队运营仿真

import random
from collections import namedtuple
import asyncio
import argparse
import time
Event = namedtuple('Event',[ 'time', 'proc', 'action'])#定义事件
DEFAULT_NUMBER_OF_TAXIS = 3#出租车数量
DEFAULT_END_TIME = 180#运行时间默认180
SEARCH_DURATION = 5 #找乘客时间默认为5
TRIP_DURATION = 20 #载客时间默认为20
DEPARTURE_INTERVAL = 5#出库间隔默认5
async def taxi_process(ident, trips, start_time=0):
    """每次改变状态时创建事件,把控制权让给仿真器"""
    # 定义一个异步生成器,用于描述process
    time = yield Event(start_time, ident, 'leave garage') 
    for i in range(trips):
        time = yield Event(time, ident, 'pick up passenger')
        time = yield Event(time, ident, 'drop off passenger') 
    yield Event(time, ident, 'going home')
def compute_duration(previous_action):
    """Compute action duration using exponential distribution"""
    if previous_action in ['leave garage', 'drop off passenger']:
        # new state is prowling
        interval = SEARCH_DURATION
    elif previous_action == 'pick up passenger':
        # new state is trip
        interval = TRIP_DURATION
    elif previous_action == 'going home':
        interval = 1
    else:
        raise ValueError('Unknown previous_action: %s' % previous_action)
    return int(random.expovariate(1/interval)) + 1
class Simulator:
    def __init__(self, procs_map):
        self.events = asyncio.PriorityQueue()
        self.procs = dict(procs_map)

    async def run(self, end_time): 
        """排定并显示事件,直到时间结束"""
        for _, proc in sorted(self.procs.items()):
            first_event = await proc.__anext__()
            await self.events.put(first_event)
        sim_time = 0
        while sim_time < end_time:
            if self.events.empty():
                print('*** end of events ***')
                break
            
            current_event = await self.events.get()
            sim_time, proc_id, previous_action = current_event
            print('taxi:', proc_id, proc_id * ' ', current_event)
            active_proc = self.procs[proc_id]
            next_time = sim_time + compute_duration(previous_action)
            try:
                next_event = await active_proc.asend(next_time)
            except StopAsyncIteration:
                del self.procs[proc_id]
            else:
                await self.events.put(next_event)
        else:
            msg = f'*** end of simulation time: {self.events.qsize()} events pending ***'
            print(msg)
taxis = {i: taxi_process(i, (i + 1) * 2, i * DEPARTURE_INTERVAL) for i in range(DEFAULT_NUMBER_OF_TAXIS)}
sim = Simulator(taxis)
await sim.run(DEFAULT_END_TIME)
taxi: 0  Event(time=0, proc=0, action='leave garage')
taxi: 0  Event(time=1, proc=0, action='pick up passenger')
taxi: 1   Event(time=5, proc=1, action='leave garage')
taxi: 1   Event(time=9, proc=1, action='pick up passenger')
taxi: 2    Event(time=10, proc=2, action='leave garage')
taxi: 2    Event(time=13, proc=2, action='pick up passenger')
taxi: 2    Event(time=25, proc=2, action='drop off passenger')
taxi: 2    Event(time=30, proc=2, action='pick up passenger')
taxi: 2    Event(time=32, proc=2, action='drop off passenger')
taxi: 0  Event(time=34, proc=0, action='drop off passenger')
taxi: 2    Event(time=35, proc=2, action='pick up passenger')
taxi: 0  Event(time=45, proc=0, action='pick up passenger')
taxi: 0  Event(time=47, proc=0, action='drop off passenger')
taxi: 0  Event(time=48, proc=0, action='going home')
taxi: 2    Event(time=48, proc=2, action='drop off passenger')
taxi: 2    Event(time=54, proc=2, action='pick up passenger')
taxi: 1   Event(time=81, proc=1, action='drop off passenger')
taxi: 2    Event(time=83, proc=2, action='drop off passenger')
taxi: 2    Event(time=85, proc=2, action='pick up passenger')
taxi: 2    Event(time=96, proc=2, action='drop off passenger')
taxi: 1   Event(time=107, proc=1, action='pick up passenger')
taxi: 2    Event(time=107, proc=2, action='pick up passenger')
taxi: 2    Event(time=114, proc=2, action='drop off passenger')
taxi: 2    Event(time=130, proc=2, action='going home')
taxi: 1   Event(time=154, proc=1, action='drop off passenger')
taxi: 1   Event(time=155, proc=1, action='pick up passenger')
taxi: 1   Event(time=164, proc=1, action='drop off passenger')
taxi: 1   Event(time=174, proc=1, action='pick up passenger')
taxi: 1   Event(time=185, proc=1, action='drop off passenger')
*** end of simulation time: 1 events pending ***

期程

Task是Future的子类所以也可以说Task也是期程.

asyncio模块的将协程注册到时间需要先将其包装为期程,也就是Future或者Task.

Task类用来管理协同程序运行的状态,是Future的子类,Future的接口如下:

  • cancel()

取消期程对象并安排回调.如果期程对象已经完成或取消,返回False.否则,将期程对象的状态更改为取消,调度回调并返回True.

  • cancelled()

如果期程对象被取消,返回True

  • done()

如果期程对象完成,返回True.完成意味着结果/异常可用,或者期程对象被取消

  • result()

返回期程对象代表的结果.如果期程对象取消,则会引发CancelledError.如果期程对象的结果尚不可用,则会引发InvalidStateError.如果期程对象已经完成并且设置了异常,则会引发异常.

  • exception()

返回在期程对象设置的异常.异常(如果没有设置异常,则为None)仅在期程对象完成时才会返回.如果期程对象取消,则会引发CancelledError.如果期程对象尚未完成,则会引发InvalidStateError.

  • add_done_callback(fn)

添加一个回调,以便在期程对象完成时运行.使用单个参数(未来对象)调用回调.如果在调用此函数时已经完成了未来,则使用call_soon()调度回调.

通常需要结合functools.partial使用

fut.add_done_callback(functools.partial(print, "Future:", flush=True))

会在回调时执行

print("Future:", fut, flush=True)

  • remove_done_callback(fn)

从'完成调用'列表中删除回调的所有实例.返回删除的回调数.

  • set_result(result)

标记期程对象的状态为done并设定其结果.如果在调用此方法时期程对象已经完成,则引发InvalidStateError

  • set_exception(exception)

标记期程对象的状态为done并设定一个异常.如果在调用此方法时期程对象已经完成,则引发InvalidStateError

Task作为Future的子类,额外的方法有:

  • classmethod all_tasks(loop=None)

返回一组事件循环的所有任务对象.默认情况下,返回当前事件循环的所有任务.

  • classmethod current_task(loop=None)

返回事件循环正在执行的任务对象,默认为当前的事件循环.在任务的上下文中调用时返回None.

  • cancel()

请求此任务自行取消.这将安排一个CancelledError通过事件循环在下一个循环中被引入到包装的协同程序中,然后,协调程序有机会使用try / except / finally清理甚至拒绝该请求.与Future.cancel()不同,这不保证任务将被取消.

异常可能会被捕获并被执行,延迟取消任务或者完全阻止取消.该任务也可能返回值或引发不同的异常.在调用此方法之后,cancelled()将不会返回True(除非该任务已被取消).当包装的协同程序以CancelledError异常终止(即使未调用cancel()时,任务将被标记为已取消.

  • get_stack(*, limit=None)

返回此任务的协程的堆栈帧列表

  • print_stack(*, limit=None, file=None) 打印此任务的协程的堆栈或追溯.对于由get_stack()检索到的帧,它会产生与追溯模块类似的输出.limit参数传递给get_stack().文件参数是写入输出的I/O流;默认情况下,输出将写入sys.stderr.

创建期程

创建Task可以使用asyncio的高级api:

  • task = asyncio.create_task(coro,*,name=None)
  • task = asyncio.ensure_future(coro)

也可以使用事件循环的低级api:

  • task = loop.create_task(coro)

  • loop.set_task_factory(factory)设置一个由AbstractEventLoop.create_task()使用的工厂函数.

    • 如果工厂函数为None,则将设置默认任务工厂
    • 如果工厂函数是可调用的,它应该有一个函数签名匹配(loop,coro),其中循环将是对活动事件循环的引用,coro将是一个协程对象.工厂函数必须返回一个asyncio.Future兼容的对象。
  • loop.get_task_factory()

    获取任务工厂函数,如果默认工厂函数正在使用,则为None

创建期程则必须使用事件循环loop,接口为:

  • loop.create_future()

管理任务

asyncio提供了高级接口可以直接获取当前运行的事件循环中所有的任务asyncio.all_tasks()和当前在执行的任务asyncio.current_task()

import asyncio
async def sleep_task():
    for i in range(5):
        print(i)
        await asyncio.sleep(1)
    print("done")
    return True
for i in range(3):
    asyncio.ensure_future(sleep_task())
0
0
0
1
1
1
tasks = asyncio.all_tasks()
2
2
2
tasks
{<Task pending coro=<sleep_task() running at <ipython-input-1-d8abb918cdb8>:5> wait_for=<Future pending cb=[<TaskWakeupMethWrapper object at 0x108b50a08>()]>>,
 <Task pending coro=<sleep_task() running at <ipython-input-1-d8abb918cdb8>:5> wait_for=<Future pending cb=[<TaskWakeupMethWrapper object at 0x108b50a98>()]>>,
 <Task pending coro=<sleep_task() running at <ipython-input-1-d8abb918cdb8>:5> wait_for=<Future pending cb=[<TaskWakeupMethWrapper object at 0x108b50d68>()]>>}



3
3
3
4
4
4
done
done
done
curtask = asyncio.current_task()

多任务并行

我们可以使用asyncio.gather(*aws, loop=None, return_exceptions=False)来并行的执行多个协程,如果aws中的某个可等待对象为协程,它将自动作为一个任务加入日程.

如果所有可等待对象都成功完成,结果将是一个由所有返回值聚合而成的列表.结果值的顺序与aws中可等待对象的顺序一致.

如果return_exceptionsFalse(默认),所引发的首个异常会立即传播给等待gather()的任务.aws序列中的其他可等待对象不会被取消,并将继续运行.

如果return_exceptions为 True,异常会和成功的结果一样处理,并聚合至结果列表.

如果gather()被取消,所有被提交(尚未完成)的可等待对象也会被取消.

如果aws序列中的任一Task或Future对象被取消,它将被当作引发了CancelledError一样处理--在此情况下gather()调用不会被取消.这是为了防止一个已提交的Task/Future被取消导致其他Tasks/Future也被取消.

import asyncio

async def factorial(name, number):
    f = 1
    for i in range(2, number+1):
        print("Task %s: Compute factorial(%s)..." % (name, i))
        await asyncio.sleep(1)
        f *= i
    print("Task %s: factorial(%s) = %s" % (name, number, f))

await asyncio.gather(
    factorial("A", 2),
    factorial("B", 3),
    factorial("C", 4)
)
Task A: Compute factorial(2)...
Task B: Compute factorial(2)...
Task C: Compute factorial(2)...
Task A: factorial(2) = 2
Task B: Compute factorial(3)...
Task C: Compute factorial(3)...
Task B: factorial(3) = 6
Task C: Compute factorial(4)...
Task C: factorial(4) = 24





[None, None, None]

等待任务和过期

asyncio提供了等待任务完成和超时过期的接口asyncio.as_completed(List[aws], *, loop=None, timeout=None)->Iterator[Future],这也是异步并行最常用的接口.返回的每个Future对象代表来自剩余可等待对象集合的最早结果.如果在所有Future对象完成前发生超时则将引发asyncio.TimeoutError

import asyncio
import datetime

async def factorial(name, number):
    f = 1
    for i in range(2, number+1):
        print("Task %s: Compute factorial(%s)..." % (name, i))
        await asyncio.sleep(1)
        f *= i
    print("Task %s: factorial(%s) = %s" % (name, number, f))
    return f
async def main():
    for f in asyncio.as_completed([factorial("A", 2),factorial("B", 3),factorial("C", 4)],timeout=3):
        result = await f


await main()
Task A: Compute factorial(2)...
Task B: Compute factorial(2)...
Task C: Compute factorial(2)...
Task A: factorial(2) = 2
Task B: Compute factorial(3)...
Task C: Compute factorial(3)...
Task B: factorial(3) = 6
Task C: Compute factorial(4)...



---------------------------------------------------------------------------

TimeoutError                              Traceback (most recent call last)

<ipython-input-7-d7fc4a2b23b0> in async-def-wrapper()


<ipython-input-7-d7fc4a2b23b0> in main()
     15 
     16 
---> 17 await main()


~/Lib/conda/anaconda3/lib/python3.7/asyncio/tasks.py in _wait_for_one()
    530         if f is None:
    531             # Dummy value from _on_timeout().
--> 532             raise futures.TimeoutError
    533         return f.result()  # May raise f.exception().
    534 


TimeoutError: 


Task C: factorial(4) = 24

如果只是希望等待一个任务完成或过期,可以使用asyncio.wait_for(awaitable, timeout).它会等待一段时间让协程执行,如果时间到了但还没有完成就会对任务执行取消操作,之后抛出asyncio.TimeoutError

import asyncio

async def factorial(name, number):
    f = 1
    for i in range(2, number+1):
        print("Task %s: Compute factorial(%s)..." % (name, i))
        await asyncio.sleep(1)
        f *= i
    print("Task %s: factorial(%s) = %s" % (name, number, f))

await asyncio.wait_for(asyncio.gather(*[
    factorial("A", 2),
    factorial("B", 3),
    factorial("C", 4)]),timeout=3
)
Task A: Compute factorial(2)...
Task B: Compute factorial(2)...
Task C: Compute factorial(2)...
Task A: factorial(2) = 2
Task B: Compute factorial(3)...
Task C: Compute factorial(3)...
Task B: factorial(3) = 6
Task C: Compute factorial(4)...



---------------------------------------------------------------------------

TimeoutError                              Traceback (most recent call last)

<ipython-input-10-e4416cd3fda0> in async-def-wrapper()
     15 )


~/Lib/conda/anaconda3/lib/python3.7/asyncio/tasks.py in wait_for(fut, timeout, loop)
    421             # See https://bugs.python.org/issue32751
    422             await _cancel_and_wait(fut, loop=loop)
--> 423             raise futures.TimeoutError()
    424     finally:
    425         timeout_handle.cancel()


TimeoutError: 

取消任务和防止任务被取消

一个正常的任务取消如下例:

import asyncio
async def cancel_me():
    print('cancel_me(): before sleep')

    try:
        # Wait for 1 hour
        await asyncio.sleep(10)
    except asyncio.CancelledError:
        print('cancel_me(): cancel sleep')
        raise
    finally:
        print('cancel_me(): after sleep')

async def main():
    # Create a "cancel_me" Task
    task = asyncio.create_task(cancel_me())

    # Wait for 1 second
    await asyncio.sleep(1)

    task.cancel()
    try:
        await task
    except asyncio.CancelledError:
        print("main(): cancel_me is cancelled now")
await main()
cancel_me(): before sleep
cancel_me(): cancel sleep
cancel_me(): after sleep
main(): cancel_me is cancelled now

在外部使用task.cancel()取消任务会在协程内部触发一个asyncio.CancelledError.我们也会有时候希望任务无法被取消,这时候可以使用asyncio.shield(aws),它只是将内部的任务包装了一下,取消这个包装不会取消内部的任务本身

async def main():
    # Create a "cancel_me" Task
    task = asyncio.create_task(cancel_me())
    task_shield = asyncio.shield(task)
    #task = asyncio.create_task(task1)
    
    # Wait for 1 second
    await asyncio.sleep(1)

    task_shield.cancel()
    try:
        await task_shield
    except asyncio.CancelledError:
        print("main(): cancel_me_shield is cancelled now")
    try:
        await task
    except asyncio.CancelledError:
        print("main(): cancel_me is cancelled now")
await main()
cancel_me(): before sleep
main(): cancel_me_shield is cancelled now
cancel_me(): after sleep

监控常驻协程

我们的协程往往是一些不会结束的常驻协程,这时候我们就需要监控他们防止异常退出.

通常我们使用task.add_done_callback(callback)接口来监控和自动处理.注意callback会有一个参数,这个参数就是这个task自身.

%%writefile src/asyncio_done_callback.py
import asyncio
import random


async def never_end():
    while True:
        await asyncio.sleep(1)
        if random.random() >0.8:
            raise AssertionError("意外退出")
async def main():
    task = asyncio.create_task(never_end())
    task.add_done_callback(lambda x: print(f"task done with exception {x.exception()}"))
    while True:
        await asyncio.sleep(1)

asyncio.run(main())
Overwriting src/asyncio_done_callback.py

事件循环与低级api

事件循环是一个无限的的循环,用来监控触发事件.一般我们用loop = asyncio.new_event_loop()来创建一个事件循环的实例,然后将其使用asyncio.set_event_loop(loop)来将循环实例定义为当前的事件循环.如果程序并不需要考虑使用多个循环的话我们也可以直接使用asyncio.get_event_loop()或者asyncio.get_running_loop()来获取当前事件循环的实例

事实上python原生的事件循环并不高效,uvloop是一个高效的事件循环,它使用cython编写,并使用libuv,就是node.js用的那个高性能事件驱动的程序库.我们在生产环境可以使用它来运行协程.(windows下无法使用)

python的协程运转需要显式的指定循环.asyncio则提供了如'中央处理设备'一般的功能,它支持如下操作:

  • 产生,设置和管理事件循环
  • 异步时间管理
  • 将回调函数注册到事件循环
  • 管理协程的执行,包括取消,延迟,调用等
  • 将耗时函数调用委托给一个线程池
  • 协程错误处理
  • 创建可用于多种类型的通信的服务端和客户端的Transports
  • 启动进程以及相关的和外部通信程序的Transports

后两个操作在网络部分再讨论,本篇只讨论前面的功能

产生,设置和管理事件循环

上面已经介绍了如何产生事件循环,以下是关于设置管理事件循环的接口,这些接口的实例为loop:

  • loop.run_forever()

    运行直到stop()被调用.如果在调用run_forever()之前调用stop(),则以超时为0轮询I/O选择器一次,运行所有响应I/O事件(以及已经安排的回调)的回调,然后退出

    如果在运行run_forever()时调用stop(),则会运行当前批次的回调,然后退出.请注意,在这种情况下,回调计划的回调将不会运行;他们会在下一次run_forever()被调用时运行.

  • loop.run_until_complete(future)

    跑到期程完成.如果参数是一个coroutine对象,那么它被wrap_future()包装起来成为一个期程.返回期程的结果或者抛出异常.

  • loop.is_running()

    返回时间循环的状态

  • loop.stop()

    停止事件循环

  • is_closed()

    如果事件循环被关闭,则返回True.

  • close()

    关闭事件循环.循环不能再次运行,待处理的回调将丢失.这将清除队列并关闭执行程序且不等待执行程序完成.这一过程不可逆转,要再次使用必须重新创建一个时间循环并设置为当前事件循环

  • coroutine shutdown_asyncgens()[3.6]

    安排所有当前打开的异步生成器对象,以aclose()调用.调用此方法后,事件循环将在每次迭代新的异步生成器时发出警告.应该用于可靠地完成所有调度的异步生成器.

异步时间管理

  • asyncio.sleep(nbr)

    这是一个异步的延迟工具,必须在协程中使用await调用

  • loop.time()

    根据事件循环的内部时钟,将当前时间作为浮点值返回,返回的是时间戳

from datetime import datetime
import time
from asyncio import sleep
async def now():
    print(datetime.now())
    await sleep(1)
    print(datetime.now())
    await sleep(1)
    print(asyncio.get_event_loop().time())
await now()
2019-04-29 19:26:56.070580
2019-04-29 19:26:57.076129
334.906957015

将回调函数注册到事件循环

它的工作机制类似于先进先出队列,所以如果一些回调需要一段时间来处理任务,其它的回调就会相应的延迟,直到先前的回调结束

回调函数处理的接口同样是loop,他们有:

  • loop.call_soon(callback, *args,context=None) 基本的回调注册,行为如前面介绍类似先进先出队列

  • loop.call_later(delay, callback, *args,context=None) 在一定延迟后执行回调

  • loop.call_at(when, callback, *args,context=None) 使用int或者float代表时间戳,在该时间执行回调函数

这三个接口都可以通过参数context设置执行时的上下文变量,如果不填则使用当前的上下文.这个我们在惯用法部分详细介绍

import asyncio

def hello_world():
    print('Hello World')
    
loop = asyncio.get_event_loop()
# Schedule a call to hello_world()
loop.call_soon(hello_world)
loop.call_soon(hello_world)
loop.call_soon(hello_world)
loop.call_soon(hello_world)
# Blocking call interrupted by loop.stop()
<Handle hello_world() at <ipython-input-1-8d9ff2a5ba59>:3>



Hello World
Hello World
Hello World
Hello World
  • call_soon_threadsafe(callback, *args)

call_soon(callback, *args)的线程安全版本

很多时候,我们的事件循环用于注册协程,而有的协程需要动态的添加到事件循环中.一个简单的方式就是使用多线程.当前线程创建一个事件循环,然后在新建一个线程,在新线程中启动事件循环.当前线程不会被block.

from threading import Thread
import time
now = lambda: time.time()
def start_loop(loop):
    asyncio.set_event_loop(loop)
    loop.run_forever()

def more_work(x):
    print('More work {}'.format(x))
    time.sleep(x)
    print('Finished more work {}'.format(x))

start = now()
new_loop = asyncio.new_event_loop()
t = Thread(target=start_loop, args=(new_loop,))
t.start()
print('TIME: {}'.format(time.time() - start))

new_loop.call_soon_threadsafe(more_work, 6)
new_loop.call_soon_threadsafe(more_work, 3)
TIME: 0.0015170574188232422
More work 6





<Handle more_work(3) at <ipython-input-2-c5cef5a5e15a>:8>



Finished more work 6
More work 3
Finished more work 3

启动上述代码之后,当前线程不会被block,新线程中会按照顺序执行call_soon_threadsafe方法注册的more_work方法,后者因为time.sleep操作是同步阻塞的,因此运行完毕more_work需要大致6 + 3

管理协程的执行,包括取消,延迟,调用等

事件循环实际上上面只能注册期程,而asyncio的很多接口可以直接使用协程,其原因是这些接口会自动将协程包装为Task.

  • loop.run_until_complete()是最简单的将协程注册进事件循环中并运行的方法.
import asyncio
import datetime

async def display_date(loop):
    end_time = loop.time() + 5.0
    while True:
        print(datetime.datetime.now())
        if (loop.time() + 1.0) >= end_time:
            break
        await asyncio.sleep(1)


# Blocking call which returns when the display_date() coroutine is done
await display_date(loop)
2019-04-29 19:45:35.128550
2019-04-29 19:45:36.129906
2019-04-29 19:45:37.133177
2019-04-29 19:45:38.135350
2019-04-29 19:45:39.135987
  • asyncio.run_coroutine_threadsafe(coro, loop) 线程安全的执行协程,可以看做是loop.run_until_complete()的线程安全版本.
import asyncio
import time
from threading import Thread
now = lambda: time.time()
def start_loop(loop):
    asyncio.set_event_loop(loop)
    loop.run_forever()

async def do_some_work(x):
    print('Waiting {}'.format(x))
    await asyncio.sleep(x)
    print('Done after {}s'.format(x))

def more_work(x):
    print('More work {}'.format(x))
    time.sleep(x)
    print('Finished more work {}'.format(x))

start = now()
new_loop = asyncio.new_event_loop()
t = Thread(target=start_loop, args=(new_loop,))
t.start()
print('TIME: {}'.format(time.time() - start))

asyncio.run_coroutine_threadsafe(do_some_work(6), new_loop)
asyncio.run_coroutine_threadsafe(do_some_work(4), new_loop)
TIME: 0.0022988319396972656





<Future at 0x11007e748 state=pending>



Waiting 6
Waiting 4
Done after 4s
Done after 6s

上述的例子,主线程中创建一个new_loop,然后在另外的子线程中开启一个无限事件循环.主线程通过run_coroutine_threadsafe新注册协程对象.这样就能在子线程中进行事件循环的并发操作,同时主线程又不会被block.一共执行的时间大概在6s左右.

  • ensure_futureasyncio封装好的创建Task的函数,它还支持一些参数,甚至指定loop.

将耗时函数调用委托给线程进程执行

协程毕竟是假并发,当遇到会阻塞loop的需求时我们还是需要将任务交给线程或进程处理.每个loop都会有一个concurrent.futures.ThreadPoolExecutor对象作为默认执行器,我们可以通过loop.set_default_executor(executor)对其进行修改.

所谓执行器executor是指concurrent.futures模块下的ThreadPoolExecutor或者ProcessPoolExecutor的实例,在目前python标准api几乎只支持同步方法的情况下,ThreadPoolExecutor可以作为临时方案使用解io密集型问题,而对于计算密集型任务,更加适合使用ProcessPoolExecutor.但亲测ProcessPoolExecutor受操作系统平台限制很多时候必须在入口中预先设置使用起来不方便,且对于不太重的计算密集型任务来说性能损失过大,因此建议如果有绕开GIL的C实现可以考虑依然使用ThreadPoolExecutor,现在python正在对GIL进行移除工作,相信再过个2年即便是计算密集型任务也将可以使用ThreadPoolExecutor来处理.

如果我们希望将任务丢给线程处理可以调用asyncio.to_thread(func, /, *args, **kwargs)->Awaitable接口.它会将任务以及当前的上下文变量环境发送到默认执行器执行,并返回一个可等待对象,当任务顺利完成时起返回值可以在可等待对象中得到.

如果我们希望使用默认执行器之外的执行器,比如需要使用进程池执行器,那么就需要使用更底层的apiloop.run_in_executor(executor, func, *args)来实现了

安排在指定的执行器中调用func.通常我们用functools.partial来处理要执行的函数将其通过闭包封装以做到无副作用.

import asyncio
import time
async def factorial(name, number):
    f = 1
    for i in range(2, number+1):
        print("Task %s: Compute factorial(%s)..." % (name, i))
        await asyncio.to_thread(time.sleep,1)
        f *= i
    print("Task %s: factorial(%s) = %s" % (name, number, f))

await asyncio.gather(*[
    factorial("A", 2),
    factorial("B", 3),
    factorial("C", 4)]
)
Task A: Compute factorial(2)...
Task B: Compute factorial(2)...
Task C: Compute factorial(2)...
Task A: factorial(2) = 2
Task B: Compute factorial(3)...
Task C: Compute factorial(3)...
Task B: factorial(3) = 6
Task C: Compute factorial(4)...
Task C: factorial(4) = 24





[None, None, None]
import asyncio
import requests
import concurrent.futures


def blocking_io():
    # File operations (such as logging) can block the
    # event loop: run them in a thread pool.
    
    return requests.get("http://www.baidu.com").text

def cpu_bound():
    # CPU-bound operations will block the event loop:
    # in general it is preferable to run them in a
    # process pool.
    return sum(i * i for i in range(10 ** 7))

async def main():
    # loop = asyncio.get_running_loop()

    ## Options:

    # 1. Run in the default loop's executor:
    # result = await loop.run_in_executor(
    # None, blocking_io)
    result = await asyncio.to_thread(blocking_io)
    print('default thread pool', result)

    # 2. Run in a custom thread pool:
    loop = asyncio.get_running_loop()
    with concurrent.futures.ThreadPoolExecutor() as pool:
        result = await loop.run_in_executor(
            pool, blocking_io)
        print('custom thread pool', result)
await main()
default thread pool <!DOCTYPE html>
<!--STATUS OK--><html> <head><meta http-equiv=content-type content=text/html;charset=utf-8><meta http-equiv=X-UA-Compatible content=IE=Edge><meta content=always name=referrer><link rel=stylesheet type=text/css href=http://s1.bdstatic.com/r/www/cache/bdorz/baidu.min.css><title>�度���你就��</title></head> <body link=#0000cc> <div id=wrapper> <div id=head> <div class=head_wrapper> <div class=s_form> <div class=s_form_wrapper> <div id=lg> <img hidefocus=true src=//www.baidu.com/img/bd_logo1.png width=270 height=129> </div> <form id=form name=f action=//www.baidu.com/s class=fm> <input type=hidden name=bdorz_come value=1> <input type=hidden name=ie value=utf-8> <input type=hidden name=f value=8> <input type=hidden name=rsv_bp value=1> <input type=hidden name=rsv_idx value=1> <input type=hidden name=tn value=baidu><span class="bg s_ipt_wr"><input id=kw name=wd class=s_ipt value maxlength=255 autocomplete=off autofocus></span><span class="bg s_btn_wr"><input type=submit id=su value=�度�� class="bg s_btn"></span> </form> </div> </div> <div id=u1> <a href=http://news.baidu.com name=tj_trnews class=mnav>��</a> <a href=http://www.hao123.com name=tj_trhao123 class=mnav>hao123</a> <a href=http://map.baidu.com name=tj_trmap class=mnav>��</a> <a href=http://v.baidu.com name=tj_trvideo class=mnav>��</a> <a href=http://tieba.baidu.com name=tj_trtieba class=mnav>贴�</a> <noscript> <a href=http://www.baidu.com/bdorz/login.gif?login&amp;tpl=mn&amp;u=http%3A%2F%2Fwww.baidu.com%2f%3fbdorz_come%3d1 name=tj_login class=lb>��</a> </noscript> <script>document.write('<a href="http://www.baidu.com/bdorz/login.gif?login&tpl=mn&u='+ encodeURIComponent(window.location.href+ (window.location.search === "" ? "?" : "&")+ "bdorz_come=1")+ '" name="tj_login" class="lb">��</a>');</script> <a href=//www.baidu.com/more/ name=tj_briicon class=bri style="display: block;">��产�</a> </div> </div> </div> <div id=ftCon> <div id=ftConw> <p id=lh> <a href=http://home.baidu.com>���度</a> <a href=http://ir.baidu.com>About Baidu</a> </p> <p id=cp>&copy;2017&nbsp;Baidu&nbsp;<a href=http://www.baidu.com/duty/>使��度��读</a>&nbsp; <a href=http://jianyi.baidu.com/ class=cp-feedback>����</a>&nbsp;京ICP�030173�&nbsp; <img src=//www.baidu.com/img/gs.gif> </p> </div> </div> </div> </body> </html>

custom thread pool <!DOCTYPE html>
<!--STATUS OK--><html> <head><meta http-equiv=content-type content=text/html;charset=utf-8><meta http-equiv=X-UA-Compatible content=IE=Edge><meta content=always name=referrer><link rel=stylesheet type=text/css href=http://s1.bdstatic.com/r/www/cache/bdorz/baidu.min.css><title>�度���你就��</title></head> <body link=#0000cc> <div id=wrapper> <div id=head> <div class=head_wrapper> <div class=s_form> <div class=s_form_wrapper> <div id=lg> <img hidefocus=true src=//www.baidu.com/img/bd_logo1.png width=270 height=129> </div> <form id=form name=f action=//www.baidu.com/s class=fm> <input type=hidden name=bdorz_come value=1> <input type=hidden name=ie value=utf-8> <input type=hidden name=f value=8> <input type=hidden name=rsv_bp value=1> <input type=hidden name=rsv_idx value=1> <input type=hidden name=tn value=baidu><span class="bg s_ipt_wr"><input id=kw name=wd class=s_ipt value maxlength=255 autocomplete=off autofocus></span><span class="bg s_btn_wr"><input type=submit id=su value=�度�� class="bg s_btn"></span> </form> </div> </div> <div id=u1> <a href=http://news.baidu.com name=tj_trnews class=mnav>��</a> <a href=http://www.hao123.com name=tj_trhao123 class=mnav>hao123</a> <a href=http://map.baidu.com name=tj_trmap class=mnav>��</a> <a href=http://v.baidu.com name=tj_trvideo class=mnav>��</a> <a href=http://tieba.baidu.com name=tj_trtieba class=mnav>贴�</a> <noscript> <a href=http://www.baidu.com/bdorz/login.gif?login&amp;tpl=mn&amp;u=http%3A%2F%2Fwww.baidu.com%2f%3fbdorz_come%3d1 name=tj_login class=lb>��</a> </noscript> <script>document.write('<a href="http://www.baidu.com/bdorz/login.gif?login&tpl=mn&u='+ encodeURIComponent(window.location.href+ (window.location.search === "" ? "?" : "&")+ "bdorz_come=1")+ '" name="tj_login" class="lb">��</a>');</script> <a href=//www.baidu.com/more/ name=tj_briicon class=bri style="display: block;">��产�</a> </div> </div> </div> <div id=ftCon> <div id=ftConw> <p id=lh> <a href=http://home.baidu.com>���度</a> <a href=http://ir.baidu.com>About Baidu</a> </p> <p id=cp>&copy;2017&nbsp;Baidu&nbsp;<a href=http://www.baidu.com/duty/>使��度��读</a>&nbsp; <a href=http://jianyi.baidu.com/ class=cp-feedback>����</a>&nbsp;京ICP�030173�&nbsp; <img src=//www.baidu.com/img/gs.gif> </p> </div> </div> </div> </body> </html>

事件循环中的错误处理

我们可以设置事件循环中的错误处理回调函数,这样一旦事件循环出错就会执行这个回调函数.我们也可以手动使用call_exception_handler()接口执行设置的异常处理

  • set_exception_handler(handler)

将处理程序设置为新的事件循环异常处理程序.如果处理程序为None,则将设置默认的异常处理程序.如果处理程序是可调用对象,它应该具有匹配的签名(循环,上下文),其中循环将是对活动事件循环的引用,上下文将是一个dict对象(有关上下文的详细信息,请参阅call_exception_handler()文档)

  • get_exception_handler()

    返回异常处理程序,如果使用默认处理程序,则返回None.

  • default_exception_handler(context)

    默认异常处理程序.当异常发生时调用,并且没有设置异常处理程序,并且可以由想要推迟到默认行为的自定义异常处理程序调用.context参数与call_exception_handler()中的含义相同.

  • call_exception_handler(context)

    调用当前的事件循环异常处理程序.上下文是一个包含以下键的dict对象(新键可以稍后介绍):

    • ‘message’: Error message;
    • ‘exception’ (optional): Exception object;
    • ‘future’ (optional): asyncio.Future instance;
    • ‘handle’ (optional): asyncio.Handle instance;
    • ‘protocol’ (optional): Protocol instance;
    • ‘transport’ (optional): Transport instance;
    • ‘socket’ (optional): socket.socket instance.
%%writefile src/loop_exception.py
import asyncio


async def main():
    count = 0
    loop = asyncio.get_event_loop()
    while True:
        count += 1
        print("ok")
        await asyncio.sleep(1)
        if count == 3:
            context = {
                'message': '错误消息',
                'exception': AttributeError('错误消息')
            }
            loop.call_exception_handler(context)


def AttributeErrorHaddler(loop, context):
    loop.call_exception_handler(context)


loop = asyncio.get_event_loop()
loop.set_exception_handler(lambda loop, context: print(context))
loop.run_until_complete(main())
Overwriting src/loop_exception.py

使用Unix信号管理事件循环

loop.add_signal_handler(signum, callback, *args)可以为标准库signal中定义的unix信号设置回调函数,一旦捕捉到对应的信号,就会执行注册了的回调函数,当然了也可以使用loop.remove_signal_handler(sig)移除对信号的监控.signal模块定义的unix信号可以在本章结语部分找到.下面的例子我们监听由signal.alarm()发起的定时闹钟信号signal.SIGALRM,并在收到信号后执行回调函数.

%%writefile src/asyncio_signal_test.py

import signal
import asyncio
signal.alarm(4)
loop = asyncio.get_event_loop()

loop.add_signal_handler(signal.SIGALRM,lambda : print("闹铃响了"))
async def main():
    while True:
        await asyncio.sleep(1)

loop.run_until_complete(main())
Overwriting src/asyncio_signal_test.py

例子: 生产者消费者模型

以下是一个生产者消费者模式的例子

import asyncio
import random


async def produce(queue, n):
    for x in range(1, n + 1):
        # produce an item
        print('producing {}/{}'.format(x, n))
        # simulate i/o operation using sleep
        await asyncio.sleep(random.random())
        item = str(x)
        # put the item in the queue
        await queue.put(item)

    # indicate the producer is done
    await queue.put(None)


async def consume(queue):
    while True:
        # wait for an item from the producer
        item = await queue.get()
        if item is None:
            # the producer emits None to indicate that it is done
            break

        # process the item
        print('consuming item {}...'.format(item))
        # simulate i/o operation using sleep
        await asyncio.sleep(random.random())


queue = asyncio.Queue()
producer_coro = produce(queue, 10)
consumer_coro = consume(queue)
await asyncio.gather(producer_coro, consumer_coro)
producing 1/10
producing 2/10
consuming item 1...
producing 3/10
consuming item 2...
producing 4/10
consuming item 3...
producing 5/10
consuming item 4...
producing 6/10
producing 7/10
producing 8/10
producing 9/10
consuming item 5...
consuming item 6...
producing 10/10
consuming item 7...
consuming item 8...
consuming item 9...
consuming item 10...





[None, None]