黑马程序员技术交流社区

标题: 【西安校区】python协程简单实现 [打印本页]

作者: 逆风TO    时间: 2019-8-28 10:48
标题: 【西安校区】python协程简单实现
python实现协程
认识协程
什么是迭代器和生成器
yield
协程生成器的基本行为
协程使用实例
预激(装饰器实现)
终止协程和异常处理
generator.close()和.throw(exc_type[, exc_value[, traceback]])使用实例
协程进阶
协程返回值.send(value)
yield from
认识协程
协程,又称微线程,Coroutine。
最大的优势就是协程极高的执行效率。因为子程序切换不是线程切换,而是由程序自身控制,因此,没有线程切换的开销,和多线程比,线程数量越多,协程的性能优势就越明显。
第二大优势就是不需要多线程的锁机制,因为只有一个线程,也不存在同时写变量冲突,在协程中控制共享资源不加锁,只需要判断状态就好了,所以执行效率比多线程高很多。
因为协程是一个线程执行,那怎么利用多核CPU呢?最简单的方法是多进程+协程,既充分利用多核,又充分发挥协程的高效率,可获得极高的性能。
Python对协程的支持还非常有限,用在generator中的yield可以一定程度上实现协程。
例如:
传统的生产者-消费者模型是一个线程写消息,一个线程取消息,通过锁机制控制队列和等待,但一不小心就可能死锁。
如果改用协程,生产者生产消息后,直接通过yield跳转到消费者开始执行,待消费者执行完毕后,切换回生产者继续生产,效率极高。

什么是迭代器和生成器
迭代器是一种对象。
迭代器抽象的是一个「数据流」,是只允许迭代一次的对象。对迭代器不断调用 next() 方法,则可以依次获取下一个元素;当迭代器中没有元素时,调用 next() 方法会抛出 StopIteration 异常。迭代器的__iter__()方法返回迭代器自身;因此迭代器也是可迭代的。
生成器函数是一种特殊的函数;生成器则是特殊的迭代器
如果一个函数包含 yield 表达式,那么它是一个生成器函数;调用它会返回一个特殊的迭代器,称为生成器。

yield
参看:https://blog.csdn.net/LuciusJAY/article/details/94739400 中的yield部分

协程生成器的基本行为
python协程有四个状态,用inspect.getgeneratorstate(…)函数的返回值来确定协程状态:
1.GEN_CREATED # 等待开始执行
2.GEN_RUNNING # 解释器正在执行(只有在多线程应用中才能看到这个状态)
3.GEN_SUSPENDED # 在yield表达式处暂停
4.GEN_CLOSED # 执行结束
生成器接口包含有:
1、.next() 预激
2、.send(value)默认值为None 向生成器发送数据成为yield表达式的值
3、.throw(xxerror) 抛出异常,在生成器中处理
4、.close() 终止生成器

协程使用实例
参考来源:https://www.jianshu.com/p/004553ac771a

#! -*- coding: utf-8 -*-
import inspect

# 协程使用生成器函数定义:定义体中有yield关键字。
def simple_coroutine():
    print('-> coroutine started')
    # yield 在表达式中使用;如果协程只需要从客户那里接收数据,yield关键字右边不需要加表达式(yield默认返回None)
    x = yield
    print('-> coroutine received:', x)


my_coro = simple_coroutine()
my_coro # 和创建生成器的方式一样,调用函数得到生成器对象。
# 协程处于 GEN_CREATED (等待开始状态)
print(inspect.getgeneratorstate(my_coro))

my_coro.send(None)
# 首先要调用next()函数,因为生成器还没有启动,没有在yield语句处暂停,所以开始无法发送数据
# 发送 None 可以达到相同的效果 my_coro.send(None)
next(my_coro)
# 此时协程处于 GEN_SUSPENDED (在yield表达式处暂停)
print(inspect.getgeneratorstate(my_coro))

# 调用这个方法后,协程定义体中的yield表达式会计算出42;现在协程会恢复,一直运行到下一个yield表达式,或者终止。
my_coro.send(42)
print(inspect.getgeneratorstate(my_coro))
输出:

GEN_CREATED
-> coroutine started
GEN_SUSPENDED
-> coroutine received: 42

# 这里,控制权流动到协程定义体的尾部,导致生成器像往常一样抛出StopIteration异常
Traceback (most recent call last):
  File "/Users/gs/coroutine.py", line 18, in <module>
    my_coro.send(42)
StopIteration
send方法的参数会成为暂停yield表达式的值,所以,仅当协程处于暂停状态是才能调用send方法。
如果协程还未激活(GEN_CREATED 状态)要调用next(my_coro) 激活协程,也可以调用my_coro.send(None)

测试:

>>> my_coro = simple_coroutine()
>>> my_coro.send(123)

Traceback (most recent call last):
  File "/Users/gs/coroutine.py", line 14, in <module>
    my_coro.send(123)
TypeError: can't send non-None value to a just-started generator

因此需要最先调用next(my_coro) 这一步通常称为”预激“(prime)协程,即让协程向前执行到第一个yield表达式,准备好作为活跃的协程使用。
特别说明:

def square():
    for yx in range(4):
        yield y ** 2
square_gen = square()
for x in square_gen:
    print(x)

对于for循环中没有显式的generator.next()仍可以使用yield改造得到的迭代器,是因为python处理for循环时候,首先会调用内建函数iter(something),他实际上会调用something.__iter()__, 返回something对应的迭代器,而后for循环会调用内建函数next(),作用在迭代器上,获取迭代器的下一个元素,并赋值给x,在完成以上操作之后,python才开始执行循环体。

预激(装饰器实现)
已知协程如果不预激,不能使用send()传入非None数据(None传入也无响应)。所以,调用my_coro.send(x)之前,一定要调用next(my_coro)。因此为了避免调用协程的迭代器而没有启动,可以通过装饰器实现自动预激活。

from functools import wraps

def coroutinue(func):
    '''
    装饰器: 向前执行到第一个`yield`表达式,预激`func`
    :param func: func name
    :return: primer
    '''

    @wraps(func)
    def primer(*args, **kwargs):
        # 把装饰器生成器函数替换成这里的primer函数;调用primer函数时,返回预激后的生成器。
        gen = func(*args, **kwargs)
        # 调用被被装饰函数,获取生成器对象
        next(gen)  # 预激生成器
        return gen  # 返回生成器
    return primer


# 使用方法如下
@coroutinue
def simple_coro(a):
    a = yield

simple_coro(12)  # 已经预激

终止协程和异常处理
非常重要:
协程中,为处理的异常会向上冒泡,传递给next函数或send方法的调用方,未处理的异常会导致协程终止。
因此通过异常我们可以控制协程的进行和终止,如下

#! -*- coding: utf-8 -*-

from functools import wraps

def coroutinue(func):
    '''
    装饰器: 向前执行到第一个`yield`表达式,预激`func`
    :param func: func name
    :return: primer
    '''

    @wraps(func)
    def primer(*args, **kwargs):
        # 把装饰器生成器函数替换成这里的primer函数;调用primer函数时,返回预激后的生成器。
        gen = func(*args, **kwargs)
        # 调用被被装饰函数,获取生成器对象
        next(gen)  # 预激生成器
        return gen  # 返回生成器
    return primer


@coroutinue
def averager():
    # 使用协程求平均值
    total = 0.0
    count = 0
    average = None
    while True:
        term = yield average
        total += term
        count += 1
        average = total/count

coro_avg = averager()
print(coro_avg.send(40))
print(coro_avg.send(50))
print(coro_avg.send('123')) # 由于发送的不是数字,导致内部有异常抛出。
输出:

40.0
45.0
Traceback (most recent call last):
  File "/Users/gs/coro_exception.py", line 37, in <module>
    print(coro_avg.send('123'))
  File "/Users/gs/coro_exception.py", line 30, in averager
    total += term
TypeError: unsupported operand type(s) for +=: 'float' and 'str'

解释说明:
出错的原因是发送给协程的’123’值不能加到total变量上。
出错后,如果再次调用 coro_avg.send(x) 方法 会抛出 StopIteration 异常。
由上边的例子我们可以知道,如果想让协程退出,可以发送给它一个特定的值。比如None和Ellipsis。(推荐使用Ellipsis,因为我们不太使用这个值)
从Python2.5 开始,我们可以在生成器上调用两个方法,显式的把异常发给协程。
这两个方法是throw和close。

generator.throw(exc_type[, exc_value[, traceback]])

这个方法使生成器在暂停的yield表达式处抛出指定的异常。如果生成器处理了抛出的异常,代码会向前执行到下一个yield表达式,而产出的值会成为调用throw方法得到的返回值。如果没有处理,则向上冒泡,直接抛出。

generator.close()
1
生成器在暂停的yield表达式处抛出GeneratorExit异常。
如果生成器没有处理这个异常或者抛出了StopIteration异常,调用方不会报错。如果收到GeneratorExit异常,生成器一定不能产出值,否则解释器会抛出RuntimeError异常。

generator.close()和.throw(exc_type[, exc_value[, traceback]])使用实例
import inspect


class DemoException(Exception):
    pass


@coroutinue
def exc_handling():
    print('-> coroutine started')
    while True:
        try:
            x = yield
        except DemoException:
            print('*** DemoException handled. Conginuing...')
        else:
            # 如果没有异常显示接收到的值
            print('--> coroutine received: {!r}'.format(x))
    raise RuntimeError('This line should never run.')  # 这一行永远不会执行


exc_coro = exc_handling()

exc_coro.send(11)
exc_coro.send(12)
exc_coro.send(13)
exc_coro.close()
print(inspect.getgeneratorstate(exc_coro))
输出

-> coroutine started
--> coroutine received: 11
--> coroutine received: 12
--> coroutine received: 13
GEN_CLOSED    # 协程终止

raise RuntimeError('This line should never run.') 永远不会执行,
因为只有未处理的异常才会终止循环,而一旦出现未处理的异常,协程会立即终止。
在使用generator.throw时候如果异常被处理,如下面的代码段中异常被传入DemoException,协程不会中止,直到遇到未处理的异常或是close(也是异常)才会终止。

exc_coro = exc_handling()

exc_coro.send(11)
exc_coro.send(12)
exc_coro.send(13)
exc_coro.throw(DemoException) # 协程不会中止,但是如果传入的是未处理的异常,协程会终止
print(inspect.getgeneratorstate(exc_coro))
exc_coro.close()
print(inspect.getgeneratorstate(exc_coro))

## output

-> coroutine started
--> coroutine received: 11
--> coroutine received: 12
--> coroutine received: 13
*** DemoException handled. Conginuing...
GEN_SUSPENDED
GEN_CLOSED

封装各类情况

@coroutinue
def exc_handling():
    print('-> coroutine started')
    try:
        while True:
            try:
                x = yield
            except DemoException:
                print('*** DemoException handled. Conginuing...')
            else:
                # 如果没有异常显示接收到的值
                print('--> coroutine received: {!r}'.format(x))
    finally:
        print('-> coroutine ending')

协程进阶
文章来源:https://www.jianshu.com/p/bb5604dafce2

协程返回值.send(value)
实例:返回最终均值的结果,每次激活协程时不会产出移动平均值,而是最后一次返回。

#! -*- coding: utf-8 -*-

from collections import namedtuple

Result = namedtuple('Result', 'count average')

def averager():
    total = 0.0
    count = 0
    average = None
    while True:
        term = yield
        if term is None:
            break  # 为了返回值,协程必须正常终止;这里是退出条件
        total += term
        count += 1
        average = total/count
    # 返回一个namedtuple,包含count和average两个字段。在python3.3前,如果生成器返回值,会报错
    return Result(count, average)

输出

>>> coro_avg = averager()
>>> next(coro_avg)
>>> coro_avg.send(20) # 并没有返回值
>>> coro_avg.send(30)
>>> coro_avg.send(40)
>>> coro_avg.send(None) # 发送None终止循环,导致协程结束。生成器对象会抛出StopIteration异常。异常对象的value属性保存着返回值。
Traceback (most recent call last):
   ...
StopIteration: Result(count=3, average=30)

return 表达式的值会传给调用方,赋值给StopIteration 异常的一个属性。
这样做虽然看着别扭,但为了保留生成器对象耗尽时抛出StopIteration异常的行为,也可以理解。
1
2
如果想要无Traceback的情况获得返回值可以如下操作:

>>> coro_avg = averager()
>>> next(coro_avg)
>>> coro_avg.send(20) # 并没有返回值
>>> coro_avg.send(30)
>>> coro_avg.send(40)
>>> try:
...     coro_avg.send(None)
... except StopIteration as exc:
...     result = exc.value
...
>>> result
Result(count=3, average=30)

可这样就显得太过奇怪,于是yield from应运而生。

yield from
yield from 结果会在内部自动捕获StopIteration 异常。这种处理方式与 for 循环处理StopIteration异常的方式一样。
对于yield from 结构来说,解释器不仅会捕获StopIteration异常,还会把value属性的值变成yield from 表达式的值,yield和yield from不能再函数体外部使用。
yield from 是 Python3.3 后新加的语言结构。和其他语言的await关键字类似,它表示:*在生成器 gen 中使用 yield from subgen()时,subgen 会获得控制权,把产出的值传个gen的调用方,即调用方可以直接控制subgen。于此同时,gen会阻塞,等待subgen终止。
如下:

def gen():
    yield from 'AB'
    yield from range(1, 3) #  此处range()可以理解为subgen()

list(gen())
['A', 'B', '1', '2']

上述代码等同于:

def gen():
    for c in 'AB':
        yield c
    for i in range(1, 3):
        yield i

list(gen())
['A', 'B', '1', '2']

下面的例子来源网络:https://www.jianshu.com/p/bb5604dafce2 未本地测试

# Example of flattening a nested sequence using subgenerators

from collections import Iterable

def flatten(items, ignore_types=(str, bytes)):
    for x in items:
        if isinstance(x, Iterable) and not isinstance(x, ignore_types):
            yield from flatten(x) # 这里递归调用,如果x是可迭代对象,继续分解
        else:
            yield x

items = [1, 2, [3, 4, [5, 6], 7], 8]

# Produces 1 2 3 4 5 6 7 8
for x in flatten(items):
    print(x)

items = ['Dave', 'Paula', ['Thomas', 'Lewis']]
for x in flatten(items):
    print(x)
yield from基本概念:

委派生成器
包含yield from <iterable>表达式的生成器函数
子生成器
从yield from <iterable>部分获取的生成器。
调用方
调用委派生成器的客户端(调用方)代码
yield from应用实例:
#! -*- coding: utf-8 -*-

from collections import namedtuple


Result = namedtuple('Result', 'count average')


# 子生成器
# 这个例子和上边示例中的 averager 协程一样,只不过这里是作为子生成器使用
def averager():
    total = 0.0
    count = 0
    average = None
    while True:
        # main 函数发送数据到这里
        term = yield
        if term is None: # 终止条件
            break
        total += term
        count += 1
        average = total/count
            return Result(count, average) # 返回的Result 会成为grouper函数中yield from表达式的值


# 委派生成器
def grouper(results, key):
     # 这个循环每次都会新建一个averager 实例,每个实例都是作为协程使用的生成器对象
    while True:
        # grouper 发送的每个值都会经由yield from 处理,通过管道传给averager 实例。
        # grouper会在yield from表达式处暂停,等待averager实例处理客户端发来的值。
        # averager实例运行完毕后,返回的值绑定到results[key] 上。
        # while 循环会不断创建averager实例,处理更多的值。
        results[key] = yield from averager()


# 调用方
def main(data):
    results = {}
    for key, values in data.items():
        # group 是调用grouper函数得到的生成器对象,传给grouper 函数的第一个参数是results,用于收集结果;第二个是某个键
        group = grouper(results, key)
        next(group)
        for value in values:
            # 把各个value传给grouper 传入的值最终到达averager函数中;
            # grouper并不知道传入的是什么,同时grouper实例在yield from处暂停
            group.send(value)
        # 把None传入groupper,传入的值最终到达averager函数中,导致当前实例终止。然后继续创建下一个实例。
        # 如果没有group.send(None),那么averager子生成器永远不会终止,委派生成器也永远不会在此激活,也就不会为result[key]赋值
        group.send(None)
    report(results)


# 输出报告
def report(results):
    for key, result in sorted(results.items()):
        group, unit = key.split(';')
        print('{:2} {:5} averaging {:.2f}{}'.format(result.count, group, result.average, unit))


data = {
    'girls;kg':[40, 41, 42, 43, 44, 54],
    'girls;m': [1.5, 1.6, 1.8, 1.5, 1.45, 1.6],
    'boys;kg':[50, 51, 62, 53, 54, 54],
    'boys;m': [1.6, 1.8, 1.8, 1.7, 1.55, 1.6],
}

if __name__ == '__main__':
    main(data)
输出结果如下:

6 boys  averaging 54.00kg
6 boys  averaging 1.68m
6 girls averaging 44.00kg
6 girls averaging 1.58m

这段代码展示的是yield from 结构最简单的用法。委派生成器相当于管道,所以可以把任意数量的委派生成器连接在一起—一个委派生成器使用yield from 调用一个子生成器,而那个子生成器本身也是委派生成器,使用yield from调用另一个生成器。最终以一个只是用yield表达式的生成器(或者任意可迭代对象)结束。







欢迎光临 黑马程序员技术交流社区 (http://bbs.itheima.com/) 黑马程序员IT技术论坛 X3.2