python实现协程
认识协程
什么是迭代器和生成器
yield
协程生成器的基本行为
协程使用实例
预激(装饰器实现)
终止协程和异常处理
generator.close()和.throw(exc_type[, exc_value[, traceback]])使用实例
协程进阶
协程返回值.send(value)
yield from
认识协程
协程,又称微线程,Coroutine。
最大的优势就是协程极高的执行效率。因为子程序切换不是线程切换,而是由程序自身控制,因此,没有线程切换的开销,和多线程比,线程数量越多,协程的性能优势就越明显。
第二大优势就是不需要多线程的锁机制,因为只有一个线程,也不存在同时写变量冲突,在协程中控制共享资源不加锁,只需要判断状态就好了,所以执行效率比多线程高很多。
因为协程是一个线程执行,那怎么利用多核CPU呢?最简单的方法是多进程+协程,既充分利用多核,又充分发挥协程的高效率,可获得极高的性能。
Python对协程的支持还非常有限,用在generator中的yield可以一定程度上实现协程。
例如:
传统的生产者-消费者模型是一个线程写消息,一个线程取消息,通过锁机制控制队列和等待,但一不小心就可能死锁。
如果改用协程,生产者生产消息后,直接通过yield跳转到消费者开始执行,待消费者执行完毕后,切换回生产者继续生产,效率极高。
什么是迭代器和生成器
迭代器是一种对象。
迭代器抽象的是一个「数据流」,是只允许迭代一次的对象。对迭代器不断调用 next() 方法,则可以依次获取下一个元素;当迭代器中没有元素时,调用 next() 方法会抛出 StopIteration 异常。迭代器的__iter__()方法返回迭代器自身;因此迭代器也是可迭代的。
生成器函数是一种特殊的函数;生成器则是特殊的迭代器
如果一个函数包含 yield 表达式,那么它是一个生成器函数;调用它会返回一个特殊的迭代器,称为生成器。
yield
参看:https://blog.csdn.net/LuciusJAY/article/details/94739400 中的yield部分
协程生成器的基本行为
python协程有四个状态,用inspect.getgeneratorstate(…)函数的返回值来确定协程状态:
1.GEN_CREATED # 等待开始执行
2.GEN_RUNNING # 解释器正在执行(只有在多线程应用中才能看到这个状态)
3.GEN_SUSPENDED # 在yield表达式处暂停
4.GEN_CLOSED # 执行结束
生成器接口包含有:
1、.next() 预激
2、.send(value)默认值为None 向生成器发送数据成为yield表达式的值
3、.throw(xxerror) 抛出异常,在生成器中处理
4、.close() 终止生成器
协程使用实例
参考来源:https://www.jianshu.com/p/004553ac771a
#! -*- coding: utf-8 -*-
import inspect
# 协程使用生成器函数定义:定义体中有yield关键字。
def simple_coroutine():
print('-> coroutine started')
# yield 在表达式中使用;如果协程只需要从客户那里接收数据,yield关键字右边不需要加表达式(yield默认返回None)
x = yield
print('-> coroutine received:', x)
my_coro = simple_coroutine()
my_coro # 和创建生成器的方式一样,调用函数得到生成器对象。
# 协程处于 GEN_CREATED (等待开始状态)
print(inspect.getgeneratorstate(my_coro))
my_coro.send(None)
# 首先要调用next()函数,因为生成器还没有启动,没有在yield语句处暂停,所以开始无法发送数据
# 发送 None 可以达到相同的效果 my_coro.send(None)
next(my_coro)
# 此时协程处于 GEN_SUSPENDED (在yield表达式处暂停)
print(inspect.getgeneratorstate(my_coro))
# 调用这个方法后,协程定义体中的yield表达式会计算出42;现在协程会恢复,一直运行到下一个yield表达式,或者终止。
my_coro.send(42)
print(inspect.getgeneratorstate(my_coro))
输出:
GEN_CREATED
-> coroutine started
GEN_SUSPENDED
-> coroutine received: 42
# 这里,控制权流动到协程定义体的尾部,导致生成器像往常一样抛出StopIteration异常
Traceback (most recent call last):
File "/Users/gs/coroutine.py", line 18, in <module>
my_coro.send(42)
StopIteration
send方法的参数会成为暂停yield表达式的值,所以,仅当协程处于暂停状态是才能调用send方法。
如果协程还未激活(GEN_CREATED 状态)要调用next(my_coro) 激活协程,也可以调用my_coro.send(None)
测试:
>>> my_coro = simple_coroutine()
>>> my_coro.send(123)
Traceback (most recent call last):
File "/Users/gs/coroutine.py", line 14, in <module>
my_coro.send(123)
TypeError: can't send non-None value to a just-started generator
因此需要最先调用next(my_coro) 这一步通常称为”预激“(prime)协程,即让协程向前执行到第一个yield表达式,准备好作为活跃的协程使用。
特别说明:
def square():
for yx in range(4):
yield y ** 2
square_gen = square()
for x in square_gen:
print(x)
对于for循环中没有显式的generator.next()仍可以使用yield改造得到的迭代器,是因为python处理for循环时候,首先会调用内建函数iter(something),他实际上会调用something.__iter()__, 返回something对应的迭代器,而后for循环会调用内建函数next(),作用在迭代器上,获取迭代器的下一个元素,并赋值给x,在完成以上操作之后,python才开始执行循环体。
预激(装饰器实现)
已知协程如果不预激,不能使用send()传入非None数据(None传入也无响应)。所以,调用my_coro.send(x)之前,一定要调用next(my_coro)。因此为了避免调用协程的迭代器而没有启动,可以通过装饰器实现自动预激活。
from functools import wraps
def coroutinue(func):
'''
装饰器: 向前执行到第一个`yield`表达式,预激`func`
:param func: func name
:return: primer
'''
@wraps(func)
def primer(*args, **kwargs):
# 把装饰器生成器函数替换成这里的primer函数;调用primer函数时,返回预激后的生成器。
gen = func(*args, **kwargs)
# 调用被被装饰函数,获取生成器对象
next(gen) # 预激生成器
return gen # 返回生成器
return primer
# 使用方法如下
@coroutinue
def simple_coro(a):
a = yield
simple_coro(12) # 已经预激
终止协程和异常处理
非常重要:
协程中,为处理的异常会向上冒泡,传递给next函数或send方法的调用方,未处理的异常会导致协程终止。
因此通过异常我们可以控制协程的进行和终止,如下
#! -*- coding: utf-8 -*-
from functools import wraps
def coroutinue(func):
'''
装饰器: 向前执行到第一个`yield`表达式,预激`func`
:param func: func name
:return: primer
'''
@wraps(func)
def primer(*args, **kwargs):
# 把装饰器生成器函数替换成这里的primer函数;调用primer函数时,返回预激后的生成器。
gen = func(*args, **kwargs)
# 调用被被装饰函数,获取生成器对象
next(gen) # 预激生成器
return gen # 返回生成器
return primer
@coroutinue
def averager():
# 使用协程求平均值
total = 0.0
count = 0
average = None
while True:
term = yield average
total += term
count += 1
average = total/count
coro_avg = averager()
print(coro_avg.send(40))
print(coro_avg.send(50))
print(coro_avg.send('123')) # 由于发送的不是数字,导致内部有异常抛出。
输出:
40.0
45.0
Traceback (most recent call last):
File "/Users/gs/coro_exception.py", line 37, in <module>
print(coro_avg.send('123'))
File "/Users/gs/coro_exception.py", line 30, in averager
total += term
TypeError: unsupported operand type(s) for +=: 'float' and 'str'
解释说明:
出错的原因是发送给协程的’123’值不能加到total变量上。
出错后,如果再次调用 coro_avg.send(x) 方法 会抛出 StopIteration 异常。
由上边的例子我们可以知道,如果想让协程退出,可以发送给它一个特定的值。比如None和Ellipsis。(推荐使用Ellipsis,因为我们不太使用这个值)
从Python2.5 开始,我们可以在生成器上调用两个方法,显式的把异常发给协程。
这两个方法是throw和close。
generator.throw(exc_type[, exc_value[, traceback]])
这个方法使生成器在暂停的yield表达式处抛出指定的异常。如果生成器处理了抛出的异常,代码会向前执行到下一个yield表达式,而产出的值会成为调用throw方法得到的返回值。如果没有处理,则向上冒泡,直接抛出。
generator.close()
1
生成器在暂停的yield表达式处抛出GeneratorExit异常。
如果生成器没有处理这个异常或者抛出了StopIteration异常,调用方不会报错。如果收到GeneratorExit异常,生成器一定不能产出值,否则解释器会抛出RuntimeError异常。
generator.close()和.throw(exc_type[, exc_value[, traceback]])使用实例
import inspect
class DemoException(Exception):
pass
@coroutinue
def exc_handling():
print('-> coroutine started')
while True:
try:
x = yield
except DemoException:
print('*** DemoException handled. Conginuing...')
else:
# 如果没有异常显示接收到的值
print('--> coroutine received: {!r}'.format(x))
raise RuntimeError('This line should never run.') # 这一行永远不会执行
exc_coro = exc_handling()
exc_coro.send(11)
exc_coro.send(12)
exc_coro.send(13)
exc_coro.close()
print(inspect.getgeneratorstate(exc_coro))
输出
-> coroutine started
--> coroutine received: 11
--> coroutine received: 12
--> coroutine received: 13
GEN_CLOSED # 协程终止
raise RuntimeError('This line should never run.') 永远不会执行,
因为只有未处理的异常才会终止循环,而一旦出现未处理的异常,协程会立即终止。
在使用generator.throw时候如果异常被处理,如下面的代码段中异常被传入DemoException,协程不会中止,直到遇到未处理的异常或是close(也是异常)才会终止。
exc_coro = exc_handling()
exc_coro.send(11)
exc_coro.send(12)
exc_coro.send(13)
exc_coro.throw(DemoException) # 协程不会中止,但是如果传入的是未处理的异常,协程会终止
print(inspect.getgeneratorstate(exc_coro))
exc_coro.close()
print(inspect.getgeneratorstate(exc_coro))
## output
-> coroutine started
--> coroutine received: 11
--> coroutine received: 12
--> coroutine received: 13
*** DemoException handled. Conginuing...
GEN_SUSPENDED
GEN_CLOSED
封装各类情况
@coroutinue
def exc_handling():
print('-> coroutine started')
try:
while True:
try:
x = yield
except DemoException:
print('*** DemoException handled. Conginuing...')
else:
# 如果没有异常显示接收到的值
print('--> coroutine received: {!r}'.format(x))
finally:
print('-> coroutine ending')
协程进阶
文章来源:https://www.jianshu.com/p/bb5604dafce2
协程返回值.send(value)
实例:返回最终均值的结果,每次激活协程时不会产出移动平均值,而是最后一次返回。
#! -*- coding: utf-8 -*-
from collections import namedtuple
Result = namedtuple('Result', 'count average')
def averager():
total = 0.0
count = 0
average = None
while True:
term = yield
if term is None:
break # 为了返回值,协程必须正常终止;这里是退出条件
total += term
count += 1
average = total/count
# 返回一个namedtuple,包含count和average两个字段。在python3.3前,如果生成器返回值,会报错
return Result(count, average)
输出
>>> coro_avg = averager()
>>> next(coro_avg)
>>> coro_avg.send(20) # 并没有返回值
>>> coro_avg.send(30)
>>> coro_avg.send(40)
>>> coro_avg.send(None) # 发送None终止循环,导致协程结束。生成器对象会抛出StopIteration异常。异常对象的value属性保存着返回值。
Traceback (most recent call last):
...
StopIteration: Result(count=3, average=30)
return 表达式的值会传给调用方,赋值给StopIteration 异常的一个属性。
这样做虽然看着别扭,但为了保留生成器对象耗尽时抛出StopIteration异常的行为,也可以理解。
1
2
如果想要无Traceback的情况获得返回值可以如下操作:
>>> coro_avg = averager()
>>> next(coro_avg)
>>> coro_avg.send(20) # 并没有返回值
>>> coro_avg.send(30)
>>> coro_avg.send(40)
>>> try:
... coro_avg.send(None)
... except StopIteration as exc:
... result = exc.value
...
>>> result
Result(count=3, average=30)
可这样就显得太过奇怪,于是yield from应运而生。
yield from
yield from 结果会在内部自动捕获StopIteration 异常。这种处理方式与 for 循环处理StopIteration异常的方式一样。
对于yield from 结构来说,解释器不仅会捕获StopIteration异常,还会把value属性的值变成yield from 表达式的值,yield和yield from不能再函数体外部使用。
yield from 是 Python3.3 后新加的语言结构。和其他语言的await关键字类似,它表示:*在生成器 gen 中使用 yield from subgen()时,subgen 会获得控制权,把产出的值传个gen的调用方,即调用方可以直接控制subgen。于此同时,gen会阻塞,等待subgen终止。
如下:
def gen():
yield from 'AB'
yield from range(1, 3) # 此处range()可以理解为subgen()
list(gen())
['A', 'B', '1', '2']
上述代码等同于:
def gen():
for c in 'AB':
yield c
for i in range(1, 3):
yield i
list(gen())
['A', 'B', '1', '2']
下面的例子来源网络:https://www.jianshu.com/p/bb5604dafce2 未本地测试
# Example of flattening a nested sequence using subgenerators
from collections import Iterable
def flatten(items, ignore_types=(str, bytes)):
for x in items:
if isinstance(x, Iterable) and not isinstance(x, ignore_types):
yield from flatten(x) # 这里递归调用,如果x是可迭代对象,继续分解
else:
yield x
items = [1, 2, [3, 4, [5, 6], 7], 8]
# Produces 1 2 3 4 5 6 7 8
for x in flatten(items):
print(x)
items = ['Dave', 'Paula', ['Thomas', 'Lewis']]
for x in flatten(items):
print(x)
yield from基本概念:
委派生成器
包含yield from <iterable>表达式的生成器函数
子生成器
从yield from <iterable>部分获取的生成器。
调用方
调用委派生成器的客户端(调用方)代码
yield from应用实例:
#! -*- coding: utf-8 -*-
from collections import namedtuple
Result = namedtuple('Result', 'count average')
# 子生成器
# 这个例子和上边示例中的 averager 协程一样,只不过这里是作为子生成器使用
def averager():
total = 0.0
count = 0
average = None
while True:
# main 函数发送数据到这里
term = yield
if term is None: # 终止条件
break
total += term
count += 1
average = total/count
return Result(count, average) # 返回的Result 会成为grouper函数中yield from表达式的值
# 委派生成器
def grouper(results, key):
# 这个循环每次都会新建一个averager 实例,每个实例都是作为协程使用的生成器对象
while True:
# grouper 发送的每个值都会经由yield from 处理,通过管道传给averager 实例。
# grouper会在yield from表达式处暂停,等待averager实例处理客户端发来的值。
# averager实例运行完毕后,返回的值绑定到results[key] 上。
# while 循环会不断创建averager实例,处理更多的值。
results[key] = yield from averager()
# 调用方
def main(data):
results = {}
for key, values in data.items():
# group 是调用grouper函数得到的生成器对象,传给grouper 函数的第一个参数是results,用于收集结果;第二个是某个键
group = grouper(results, key)
next(group)
for value in values:
# 把各个value传给grouper 传入的值最终到达averager函数中;
# grouper并不知道传入的是什么,同时grouper实例在yield from处暂停
group.send(value)
# 把None传入groupper,传入的值最终到达averager函数中,导致当前实例终止。然后继续创建下一个实例。
# 如果没有group.send(None),那么averager子生成器永远不会终止,委派生成器也永远不会在此激活,也就不会为result[key]赋值
group.send(None)
report(results)
# 输出报告
def report(results):
for key, result in sorted(results.items()):
group, unit = key.split(';')
print('{:2} {:5} averaging {:.2f}{}'.format(result.count, group, result.average, unit))
data = {
'girls;kg':[40, 41, 42, 43, 44, 54],
'girls;m': [1.5, 1.6, 1.8, 1.5, 1.45, 1.6],
'boys;kg':[50, 51, 62, 53, 54, 54],
'boys;m': [1.6, 1.8, 1.8, 1.7, 1.55, 1.6],
}
if __name__ == '__main__':
main(data)
输出结果如下:
6 boys averaging 54.00kg
6 boys averaging 1.68m
6 girls averaging 44.00kg
6 girls averaging 1.58m
这段代码展示的是yield from 结构最简单的用法。委派生成器相当于管道,所以可以把任意数量的委派生成器连接在一起—一个委派生成器使用yield from 调用一个子生成器,而那个子生成器本身也是委派生成器,使用yield from调用另一个生成器。最终以一个只是用yield表达式的生成器(或者任意可迭代对象)结束。
|
|