源PPT和源码可以从这里下载:http://www.dabeaz.com/coroutines/
问题:1. 什么是协程2. 协程怎么用3. 要注意什么4. 用他们好么复制代码第一部分:生成器和协程的介绍生成器(Generator)的本质和特点生成器 是 可以生成一定序列的 函数。函数可以调用next()方法。
生成器的例子:- 例子1: follow.py可以使用生成器完成 tail -f 的功能,也就是跟踪输出的功能。
import timedef follow(thefile): thefile.seek(0,2) # Go to the end of the file while True: line = thefile.readline() if not line: time.sleep(0.1) # Sleep briefly continue yield line复制代码- 例子2: 生成器用作程序管道(类似unix pipe)
ps:unix pipe A pipeline is a sequence of processes chained together by their standard streams 标注:unix管道 一个uinx管道是由标准流链接在一起的一系列流程.复制代码pipeline.py
def grep(pattern,lines): for line in lines: if pattern in line: yield lineif __name__ == '__main__': from follow import follow # Set up a processing pipe : tail -f | grep python logfile = open("access-log") loglines = follow(logfile) pylines = grep("python",loglines) # Pull results out of the processing pipeline for line in pylines: print line,复制代码理解pipeline.py
在pipeline中,follow函数和grep函数相当于程序链,这样就能链式处理程序。
Yield作为表达【我们开始说协程了~】:grep.py
def grep(pattern): print "Looking for %s" % pattern print "give a value in the coroutines" while True: line = (yield) if pattern in line: print line# Example useif __name__ == '__main__': g = grep("python") g.next() g.send("Yeah, but no, but yeah, but no") g.send("A series of tubes") g.send("python generators rock!")复制代码yield最重要的问题在于yield的值是多少。
yield的值需要使用coroutine协程这个概念相对于仅仅生成值,函数可以动态处理传送进去的值,而最后值通过yield返回。
协程的执行:协程的执行和生成器的执行很相似。当你初始化一个协程,不会返回任何东西。协程只能响应run和send函数。协程的执行依赖run和send函数。
协程启动:所有的协程都需要调用.next( )函数。调用的next( )函数将要执行到第一个yield表达式的位置。在yield表达式的位置上,很容易去执行就可以。协程使用next()启动。
使用协程的修饰器:由【协程启动】中我们知道,启动一个协程需要记得调用next( )来开始协程,而这个启动器容易忘记使用。使用修饰器包一层,来让我们启动协程。【以后所有的协程器都会先有@coroutine
def coroutine(func): def start(*args, **kwargs): cr = func(*args, **kwargs) cr.next() return cr return start@coroutinedef grep(pattern): ...复制代码关闭一个协程:使用close()来关闭。
使用except捕获协程的关闭close():grepclose.py
@coroutinedef grep(pattern): print "Looking for %s" % pattern try: while True: line = (yield) if pattern in line: print line, except GeneratorExit: print "Going away. Goodbye"复制代码使用GeneratorExit这个异常类型
抛出一个异常:在一个协程中,可以抛出一个异常
g.throw(RuntimeError,"You're hosed")Traceback (most recent call last): File "<stdin>", line 1, in <module> File "<stdin>", line 4, in grep RuntimeError: You're hosed复制代码异常起源于yield表达式可以用常规方法去抓取
一些小tips* 尽管有点相似,但是生成器和协程是*两个完全不同的概念*。* 生成器用来产生序列。* 协程用来处理序列。* 很容易产生一些误解。因为协程有的时候用来对进程里面的用来产生迭代对象的生成器作微调。复制代码生成器不能够同时生成值和接受值* 不能往generator里面send东西。* 协程和迭代器的概念没有关系* 虽然有一种用法,确实是在一个协程里面生成一些值,但是并不和迭代器有关系。复制代码第二部分:协程,管道,数据流进程管道:如下图所示,一连串进程串起来像管道一样。
协程可以用来作为进程管道。你仅仅需要把协程连接在一起,然后通过send()操作传递数据。整个进程管道由三部分组成:
第一部分,管道源/协程源:进程管道需要一个初始的源(一个生产者)。这个初始的源驱动整个管道。管道源不是协程。
第二部分,管道终止/协程终止:管道必须有个终止点。管道终止/协程终止是进程管道的终止点。
例子:以实现tail -f 功能为例子from coroutine import coroutine# A data source. This is not a coroutine, but it sends# data into one (target)import timedef follow(thefile, target): thefile.seek(0,2) # Go to the end of the file while True: line = thefile.readline() if not line: time.sleep(0.1) # Sleep briefly continue target.send(line)# A sink. A coroutine that receives data@coroutinedef printer(): while True: line = (yield) print line,# Example useif __name__ == '__main__': f = open("access-log") follow(f,printer())复制代码分析:第一个follow函数是协程源,第二个printer函数是协程终止。协程源不是一个协程,但是需要传入一个已经初始化完毕的协程。在协程源当中,调用send()。
第三部分,管道过滤器:叫过滤器其实并不贴切,应该叫中间人Intermediate:其两端都是send()函数。
(协程的中间层)典型的中间层如下:
@coroutine def filter(target): # 这个target是传递参数的对象 while True: item = (yield) # 这里用来接收上一个send()传入的value # Transform/filter item # processing items # Send it along to the next stage target.send(item) # 像target传递参数 复制代码分析可知,中间层需要接受上一个coroutine,也需要往下一个coroutine里面传递值。
一个管道过滤器的例子从文章中找出具有“python”关键字的句子打印。grep.py:
@coroutine def grep(pattern, target): # 这个target用来接收参数 while True: line = (yield) # 这里用来接收上一个send()传入的value # Transform/filter item # processing items if pattern in line: target.send(line) # Send it along to the next stage复制代码Hook it up with follow and printer:
f = open("access-log") follow(f, grep('python', printer())) 复制代码grep 从中间传入follow,然后printer传入grep。
协程和生成器的对比
不同处:生成器使用了迭代器拉取数据,协程使用send()压入数据。
变得多分支:(上一个协程发送数据去多个下一段协程)图示:
使用协程,你可以发送数据 给 多个 协程过滤器/协程终了。但是请注意,协程源只是用来传递数据的,过多的在协程源中传递数据是令人困惑并且复杂的。
一个例子
@coroutinedef broadcast(targets): while True: item = (yield) for target in targets: target.send(item)复制代码Hook it Up!
if __name__ == '__main__': f = open("access-log") follow(f, broadcast([grep('python',printer()), grep('ply',printer()), grep('swig',printer())]) )复制代码从文章中分别打印出含有’python‘ ’ply‘ ’swig‘ 关键字的句子。使用了一个协程队列向所有printer协程 送出 接收到的数据。图示:
或者这样Hook them up:
if __name__ == '__main__': f = open("access-log") p = printer() follow(f, broadcast([grep('python',p), grep('ply',p), grep('swig',p)]) )复制代码图示:
为什么我们用协程
- 协程相较于迭代器,存在更加强大的数据路由(就像上图的数据流向)的可能。
- 协程可以将一系列简单的数据处理组件,整合到管道,分支,合并等复杂的布置当中。
- 但有些限制…【后文会说】相对于对象的优势
- 从概念上简单一点:协程就是一个函数,对象要构建整个对象。
- 从代码执行角度上来说,协程相对要快一些。
第三部分:协程,事件分发事件处理协程可以用在写各种各样处理事件流的组件。
介绍一个例子【这个例子会贯穿这个第三部分始终】要求做一个实时的公交车GPS位置监控。编写程序的主要目的是处理一份文件。传统上,使用SAX进行处理。【SAX处理可以减少内存空间的使用,但SAX事件驱动的特性会让它笨重和低效】。复制代码把SAX和协程组合在一起我们可以使用协程分发SAX事件,比如:
import xml.saxclass EventHandler(xml.sax.ContentHandler): def __init__(self,target): self.target = target def startElement(self,name,attrs): self.target.send(('start',(name,attrs._attrs))) def characters(self,text): self.target.send(('text',text)) def endElement(self,name): self.target.send(('end',name))# example useif __name__ == '__main__': from coroutine import * @coroutine def printer(): while True: event = (yield) print event xml.sax.parse("allroutes.xml", EventHandler(printer()))复制代码解析:整个事件的处理如图所示
【最终的组合】比如,把xml改成json最后从中筛选的出固定信息.buses.py
@coroutinedef buses_to_dicts(target): while True: event, value = (yield) # Look for the start of a <bus> element if event == 'start' and value[0] == 'bus': busdict = {} fragments = [] # Capture text of inner elements in a dict while True: event, value = (yield) if event == 'start': fragments = [] elif event == 'text': fragments.append(value) elif event == 'end': if value != 'bus': busdict[value] = "".join(fragments) else: target.send(busdict) break复制代码协程的一个有趣的事情是,您可以将初始数据源推送到低级别的语言,而不需要重写所有处理阶段。比如,PPT 中69-73页介绍的,可以通过协程和低级别的语言进行联动,从而达成非常好的优化效果。如Expat模块或者cxmlparse模块。ps: ElementTree具有快速的递增xml句法分析
第四部分:从数据处理到并发编程复习一下上面学的特点:协程有以下特点。
- 协程和生成器非常像。
- 我们可以用协程,去组合各种简单的小组件。
- 我们可以使用创建进程管道,数据流图的方法去处理数据。
- 你可以使用伴有复杂数据处理代码的协程。
一个相似的主题:我们往协程内传送数据,向线程内传送数据,也向进程内传送数据。那么,协程自然很容易和线程和分布式系统联系起来。
基础的并发:我们可以通过添加一个额外的层,从而封装协程进入线程或者子进程。这描绘了几个基本的概念。
目标!协程+线程【没有蛀牙。下面看一个线程的例子。cothread.py
@coroutinedef threaded(target):# 第一部分: messages = Queue() def run_target(): while True: item = messages.get() if item is GeneratorExit: target.close() return else: target.send(item) Thread(target=run_target).start()# 第二部分: try: while True: item = (yield) messages.put(item) except GeneratorExit: messages.put(GeneratorExit)复制代码例子解析:第一部分:先新建一个队列。然后定义一个永久循环的线程;这个线程可以将其中的元素拉出消息队列,然后发送到目标里面。第二部分:接受上面送来的元素,并通过队列,将他们传送进线程里面。其中用到了GeneratorExit ,使得线程可以正确的关闭。
Hook up:cothread.py
if __name__ == '__main__': import xml.sax from cosax import EventHandler from buses import * xml.sax.parse("allroutes.xml", EventHandler( buses_to_dicts( threaded( filter_on_field("route", "22", filter_on_field("direction", "North Bound", bus_locations()))))))复制代码
但是:添加线程让这个例子慢了50%
目标!协程+子进程我们知道,进程之间是不共享系统资源的,所以要进行两个子进程之间的通信,我们需要通过一个文件桥接两个协程。
import cPickle as picklefrom coroutine import *@coroutinedef sendto(f): try: while True: item = (yield) pickle.dump(item, f) f.flush() except StopIteration: f.close()def recvfrom(f, target): try: while True: item = pickle.load(f) target.send(item) except EOFError: target.close()# Example useif __name__ == '__main__': import xml.sax from cosax import EventHandler from buses import * import subprocess p = subprocess.Popen(['python', 'busproc.py'], stdin=subprocess.PIPE) xml.sax.parse("allroutes.xml", EventHandler( buses_to_dicts( sendto(p.stdin))))复制代码程序通过sendto()和recvfrom()传递文件。
和环境结合的协程:使用协程,我们可以从一个任务的执行环境中剥离出他的实现。并且,协程就是那个实现。执行环境是你选择的线程,子进程,网络等。
需要注意的警告:
- 创建大量的协同程序,线程和进程可能是创建 不可维护 应用程序的一个好方法,并且会减慢你程序的速度。需要学习哪些是良好的使用协程的习惯。
- 在协程里send()方法需要被适当的同步。
- 如果你对已经正在执行了的协程使用send()方法,那么你的程序会发生崩溃。如:多个线程发送数据进入同一个协程。
- 同样的不能创造循环的协程:
- 堆栈发送正在构建一种调用堆栈(send()函数不返回,直到目标产生)。
- 如果调用一个正在发送进程的协程,将会抛出一个错误。
- send() 函数不会挂起任何一个协程的执行。
第五部分:任务一样的协程Task的概念在并发编程中,通常将问题细分为“任务”。“任务”有下面几个经典的特点:* 拥有独立的控制流。* 拥有内在的状态。* 可以被安排规划/挂起/恢复。* 可与其他的任务通信。协程也是任务的一种。
协程是任务的一种:- 下面的部分 来告诉你协程有他自己的控制流,这里 if 的控制就是控制流。
@coroutinedef grep(pattern): print "Looking for %s" % pattern print "give a value in the coroutines" while True: line = (yield) if pattern in line: print line复制代码- 协程是一个类似任何其他Python函数的语句序列。
- 协程有他们内在的自己的状态,比如一些变量:其中的pattern和line就算是自己的状态。
@coroutinedef grep(pattern): print "Looking for %s" % pattern print "give a value in the coroutines" while True: line = (yield) if pattern in line: print line复制代码- 本地的生存时间和协程的生存时间相同。
- 很多协程构建了一个可执行的环境。
- 协程可以互相通信,比如:yield就是用来接受传递的信息,而上一个协程的send( )就是用来向下一个协程。
@coroutinedef grep(pattern): print "Looking for %s" % pattern print "give a value in the coroutines" while True: line = (yield) if pattern in line: print line复制代码- 协程可以被挂起,重启,关闭。
- yield可以挂起执行进程。
- send() 用来 重启执行进程。
- close()用来终止/关闭进程。
总之,一个协程满足以上所有任务(task)的特点,所以协程非常像任务。但是协程不用与任何一个线程或者子进程绑定。
第六部分:操作系统的中断事件。(微嵌课程学的好的同学可以直接跳到这部分的“启示”✌️)操作系统的执行(复习微嵌知识)当计算机运行时,电脑没有同时运行好几条指令的打算。而无论是处理器,应用程序都不懂多任务处理。所以,操作系统需要去完成多任务的调度。操作系统通过在多个任务中快速切换来实现多任务。
需要解决的问题(还在复习微嵌知识)CPU执行的是应用程序,而不是你的操作系统,那没有被CPU执行的操作系统是怎么控制正在运行的应用程序中断的呢。
中断(interrupts)和陷阱(Traps)操作系统只能通过两个机制去获得对应用程序的控制:中断和陷阱。* 中断:和硬件有关的balabala。* 陷阱:一个软件发出的信号。在两种状况下,CPU都会挂起正在做的,然后执行OS的代码(这个时候,OS的代码成功插入了应用程序的执行),此时,OS来切换了程序。
中断的底层实现(略…码字员微嵌只有70分 |
|