Generator

generator
A function which returns a generator iterator. It looks like a normal function except that it contains yield expressions for producing a series of values usable in a for-loop or that can be retrieved one at a time with the next() function.

Usually refers to a generator function, but may refer to a generator iterator in some contexts. In cases where the intended meaning isn’t clear, using the full terms avoids ambiguity.

docs.python.org

生成器是一个长得像函数的Iterator,即fn() 返回一个Iterator

这个函数里面需要用yield来吐东西,吐的东西可以通过next()取出来,或者使用for-loop取出来,停止方式就是丢一个raise StopIteration,也就是return

用一个斐波那契数列为例,生成数列直到limit。用一个list来实现:

1
2
3
4
5
6
7
8
9
10
limit = 100
fib = []
a, b = 0, 1
while True:
fib.append(b)
a, b = b, a + b
if b > limit:
break
for f in fib:
print(f)

生成器方式:

1
2
3
4
5
6
7
8
def fibonacci_list(limit: int = 100):
a, b = 0, 1
while True:
yield b
a, b = b, a + b
if b > limit:
return

通过一个for 循环可以得到结果

1
2
3
4
5
6
7
8
9
10
11
12
13
14
>>> for i in fibonacci_list():
... print(i)

1
1
2
3
5
8
13
21
34
55
89

也可以直接list

1
2
>>> print(list(feb()))
[1, 1, 2, 3, 5, 8, 13, 21, 34, 55, 89]

使用next()

1
2
3
4
5
6
7
8
9
10
11
12
13
>>> f = fibonacci_list(3)
>>> next(f)
1
>>> next(f)
1
>>> next(f)
2
>>> next(f)
3
>>> next(f)
Traceback (most recent call last):
File "<stdin>", line 1, in <module>
StopIteration

显然,作为Iterator,生成器可以用像list一样进行迭代,用法和list等类型无异。list的斐波那契数列需要占用全部数字的内存,但是生成器只需要保留两个数a, b的内存。另外,生成器和函数的区别是有个yield,生成器的执行会在yield处挂起,并在下次迭代继续执行。这部分在PEP 255有详细说明。

Coroutine

生成器只是个简单的“吐东西的函数”,直到协程的支持。从PEP 342开始,生成器进化成协程,功能也不再只是简单的“吐东西”,而是可以和外面互相传东西。加入了三个函数send()throw()close()

  • send是把值传进协程
  • throw可以把一个异常传进协程,需要协程内catch
  • close可以关闭协程,即给协程传一个GeneratorExit异常

对应的,在协程内,yield也不再是“statement”而是一个“expression”。可以写在等号右边了

The yield-statement will be allowed to be used on the right-hand side of an assignment; in that case it is referred to as yield-expression. The value of this yield-expression is None unless send() was called with a non-None argument

1
2
3
4
5
6
7
def coro():
a = yield 2
print(a)

c = coro()
next(c)
c.send(1)

在生成器外面通过next(c)可以得到2,然后c.send(1)会给生成器里面的yield 21,然后a = 1

另外和生成器一样,协程会在yield处挂起直到下个next()或者send()等传东西给yield的东西

Multitasking

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
from time import sleep

def coro1():
print("coro1 created")
yield
print("coro1 doing some work1")
sleep(1)
yield
print("coro1 doing some work2")
sleep(1)
yield

def coro2():
print("coro2 created")
yield
print("coro2 doing some work1")
sleep(1)
yield
print("coro2 doing some work2")
sleep(1)
yield

def scheduler():
c1 = coro1()
c2 = coro2()
c1.send(None)
c2.send(None)
c1.send(None)
c2.send(None)
c1.send(None)
c2.send(None)

c1c2创建后通过send启动,直到yield挂起,然后再次send继续执行,在这里yieldsend不再传值,而是作为一个信号来传递控制权。而且这里使用的都是c1.send(None),这和next(c1)是一样的。

更多例子

Asynchronous

在上面的代码中,c1在执行的时候c2没有执行,而且c1会sleep,导致整个程序都在干等着,我希望全都一起执行。需要改进一下scheduler。一共有4个sleep,程序至少需要sleep 4s,总用时4.02s。

将程序改成下面这样

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
import time
from collections import deque


def coro1():
print("coro1 created")
yield "coro1 created"
print("coro1 doing some work1")
yield "sleep", 1
print("coro1 doing some work2")
yield "sleep", 1


def coro2():
print("coro2 created")
yield "coro2 created"
print("coro2 doing some work1")
yield "sleep", 1
print("coro2 doing some work2")
yield "sleep", 1


def scheduler(coros):
start = time.time()
coro_queue = deque(coros)
sleeping_coros = []
while True:
if not coro_queue and not sleeping_coros:
break

if not coro_queue:
# Find the first coroutine that ends sleep
sleeping_coros.sort(key=lambda x: x[0])
deadline, coro = sleeping_coros[0]
sleeping_coros.pop(0)

if deadline > time.time():
time.sleep(deadline - time.time())
coro_queue.append(coro)

try:
coro = coro_queue.popleft()
result = coro.send(None)
# the special case of a coro that wants to be put to sleep
if result and len(result) == 2 and result[0] == "sleep":
deadline = time.time() + result[1]
sleeping_coros.append((deadline, coro))
else:
print(f"Got: {result}")
coro_queue.append(coro)
except StopIteration:
pass
print(f"Time elapsed: {time.time()-start:.3}s")


if __name__ == "__main__":
scheduler([coro1(), coro2()])

输出:

1
2
3
4
5
6
7
8
9
coro1 created
Got: coro1 created
coro2 created
Got: coro2 created
coro1 doing some work1
coro2 doing some work1
coro2 doing some work2
coro1 doing some work2
Time elapsed: 2.02s

一共用了2s。

解释一下代码,在coro里面的sleep 不能让它真的sleep,那样会把整个程序卡住,而是传"sleep",1来告诉scheduler需要1s的时间。

scheduler里面维护两个队列,coro_queue这里面的协程可以马上执行不需要等,直接拿出来就能send()sleeping_coros里保存正在sleep的协程。
coro_queue里拿出来的协程如果告诉scheduler需要sleep,那么把这个协程放到sleeping_coros,并且记录这个协程要等到什么时候deadline。当coro_queue里面没有可以马上执行的协程了,就等到sleeping_coros里面最小的deadline,等到这个协程ready然后再把这个协程放回coro_queue

Nested Coroutines

如果某个协程调用了其他协程怎么办?

1
2
3
4
5
def coro3():
c1_result = coro1()
c2 = coro2(c1_result)

scheduler([coro3()]) # 会报错

这里报错,因为coro3不是协程了。需要改成这样

1
2
3
4
5
6
def coro3():
for step in coro1():
yield step
c1_result = step
for step in coro2(c1_result):
yield step

这样写实在太麻烦了,直到PEP 380提出了yield from

就变成这样:

1
2
3
def coro3():
c1_result = yield from coro1()
yield from coro2(c1_result)

这就可以全都丢进scheduler([coro3()])

async await

经过漫长的“进化”,终于在2015年的python3.5里,协程变成了“正经的东西”PEP 492

前面的代码可以变成下面这样。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
import asyncio

async def coro1():
print("coro1 created")
print("coro1 doing some work1")
await asyncio.sleep(1)
print("coro1 doing some work2")
await asyncio.sleep(1)
return 1


async def coro2(x):
print("coro2 created")
print("coro2 doing some work1")
await asyncio.sleep(1)
print("coro2 doing some work2")
await asyncio.sleep(1)
return 2

async def coro3():
c1_result = await coro1()
await coro2(c1_result)

asyncio.run(coro3())

yield from 变成了 await

sleep可以直接调用asyncio.sleep,这里依然不能是time.sleep,因为这一切的本质都没有改变,time.sleep会卡住整个线程。

scheduler变成了asyncio.run这不是必须的,scheduler依然可以使用,asyncio仅仅是"内置的封装"

Notes

生成器/协程的进化历史大概就这这样,scheduler进化成为了event loop,loop从所有task里面拿出一个,然后控制权交给这个协程,一直执行到await,控制权回到loop,loop再继续执行下一个task。也就是说,在同一时刻还是只有一个task运行,直到这个task遇到await,其他的都暂停。和thread的区别也就是减少线程切换开销了。如果需要多cpu运行还是需要multiprocessing

需要注意的是,如果使用了协程,那么最好全部代码都使用协程实现,因为所有的协程都在同一个thread里面,所以time.sleepthread.Lock().acquire()Queue().get()这些线程级别的阻塞都会导致整个loop阻塞,然后可能造成协程间死锁。

比如两个协程c1,c2抢同一个thread.Lockc1持有锁,现在协程loop把控制权给到c2手里,c2停在了lock.acquire(),然后整个loop都卡在这里了,c1等不到loop把控制权给它,所以c1没法release lock,这样就死锁了。

asyncio也提供了在协程里执行线程级函数的办法loop.run_in_executor,也就是把thread函数放到一个线程池里运行,并且封装成可以await的东西(Awaitable)。docs.python.org