12.12小节完成
This commit is contained in:
@@ -5,392 +5,396 @@
|
||||
----------
|
||||
问题
|
||||
----------
|
||||
You want to implement concurrency using generators (coroutines) as an alternative to
|
||||
system threads. This is sometimes known as user-level threading or green threading.
|
||||
你想使用生成器(协程)替代系统线程来实现并发。这个有时又被称为用户级线程或绿色线程。
|
||||
|
||||
|
|
||||
|
||||
----------
|
||||
解决方案
|
||||
----------
|
||||
To implement your own concurrency using generators, you first need a fundamental
|
||||
insight concerning generator functions and the yield statement. Specifically, the fun‐
|
||||
damental behavior of yield is that it causes a generator to suspend its execution. By
|
||||
suspending execution, it is possible to write a scheduler that treats generators as a kind
|
||||
of “task” and alternates their execution using a kind of cooperative task switching.
|
||||
To illustrate this idea, consider the following two generator functions using a simple
|
||||
yield:
|
||||
要使用生成器实现自己的并发,你首先要对生成器函数和 ``yield`` 语句有深刻理解。
|
||||
``yield`` 语句会让一个生成器挂起它的执行,这样就可以编写一个调度器,
|
||||
将生成器当做某种“任务”并使用任务协作切换来替换它们的执行。
|
||||
要演示这种思想,考虑下面两个使用简单的 ``yield`` 语句的生成器函数:
|
||||
|
||||
# Two simple generator functions
|
||||
def countdown(n):
|
||||
while n > 0:
|
||||
print('T-minus', n)
|
||||
yield
|
||||
n -= 1
|
||||
print('Blastoff!')
|
||||
.. code-block:: python
|
||||
|
||||
def countup(n):
|
||||
x = 0
|
||||
while x < n:
|
||||
print('Counting up', x)
|
||||
yield
|
||||
x += 1
|
||||
# Two simple generator functions
|
||||
def countdown(n):
|
||||
while n > 0:
|
||||
print('T-minus', n)
|
||||
yield
|
||||
n -= 1
|
||||
print('Blastoff!')
|
||||
|
||||
These functions probably look a bit funny using yield all by itself. However, consider
|
||||
the following code that implements a simple task scheduler:
|
||||
def countup(n):
|
||||
x = 0
|
||||
while x < n:
|
||||
print('Counting up', x)
|
||||
yield
|
||||
x += 1
|
||||
|
||||
from collections import deque
|
||||
这些函数在内部使用yield语句,下面是一个实现了简单任务调度器的代码:
|
||||
|
||||
class TaskScheduler:
|
||||
def __init__(self):
|
||||
self._task_queue = deque()
|
||||
.. code-block:: python
|
||||
|
||||
def new_task(self, task):
|
||||
'''
|
||||
Admit a newly started task to the scheduler
|
||||
from collections import deque
|
||||
|
||||
'''
|
||||
self._task_queue.append(task)
|
||||
class TaskScheduler:
|
||||
def __init__(self):
|
||||
self._task_queue = deque()
|
||||
|
||||
def run(self):
|
||||
'''
|
||||
Run until there are no more tasks
|
||||
'''
|
||||
while self._task_queue:
|
||||
task = self._task_queue.popleft()
|
||||
try:
|
||||
# Run until the next yield statement
|
||||
next(task)
|
||||
self._task_queue.append(task)
|
||||
except StopIteration:
|
||||
# Generator is no longer executing
|
||||
pass
|
||||
def new_task(self, task):
|
||||
'''
|
||||
Admit a newly started task to the scheduler
|
||||
|
||||
# Example use
|
||||
sched = TaskScheduler()
|
||||
sched.new_task(countdown(10))
|
||||
sched.new_task(countdown(5))
|
||||
sched.new_task(countup(15))
|
||||
sched.run()
|
||||
'''
|
||||
self._task_queue.append(task)
|
||||
|
||||
In this code, the TaskScheduler class runs a collection of generators in a round-robin
|
||||
manner—each one running until they reach a yield statement. For the sample, the
|
||||
output will be as follows:
|
||||
def run(self):
|
||||
'''
|
||||
Run until there are no more tasks
|
||||
'''
|
||||
while self._task_queue:
|
||||
task = self._task_queue.popleft()
|
||||
try:
|
||||
# Run until the next yield statement
|
||||
next(task)
|
||||
self._task_queue.append(task)
|
||||
except StopIteration:
|
||||
# Generator is no longer executing
|
||||
pass
|
||||
|
||||
T-minus 10
|
||||
T-minus 5
|
||||
Counting up 0
|
||||
T-minus 9
|
||||
T-minus 4
|
||||
Counting up 1
|
||||
T-minus 8
|
||||
T-minus 3
|
||||
Counting up 2
|
||||
T-minus 7
|
||||
T-minus 2
|
||||
...
|
||||
|
||||
At this point, you’ve essentially implemented the tiny core of an “operating system” if
|
||||
you will. Generator functions are the tasks and the yield statement is how tasks signal
|
||||
that they want to suspend. The scheduler simply cycles over the tasks until none are left
|
||||
executing.
|
||||
In practice, you probably wouldn’t use generators to implement concurrency for some‐
|
||||
thing as simple as shown. Instead, you might use generators to replace the use of threads
|
||||
when implementing actors (see Recipe 12.10) or network servers.
|
||||
|
||||
The following code illustrates the use of generators to implement a thread-free version
|
||||
of actors:
|
||||
|
||||
from collections import deque
|
||||
|
||||
class ActorScheduler:
|
||||
def __init__(self):
|
||||
self._actors = { } # Mapping of names to actors
|
||||
self._msg_queue = deque() # Message queue
|
||||
|
||||
def new_actor(self, name, actor):
|
||||
'''
|
||||
Admit a newly started actor to the scheduler and give it a name
|
||||
'''
|
||||
self._msg_queue.append((actor,None))
|
||||
self._actors[name] = actor
|
||||
|
||||
def send(self, name, msg):
|
||||
'''
|
||||
Send a message to a named actor
|
||||
'''
|
||||
actor = self._actors.get(name)
|
||||
if actor:
|
||||
self._msg_queue.append((actor,msg))
|
||||
|
||||
def run(self):
|
||||
'''
|
||||
Run as long as there are pending messages.
|
||||
'''
|
||||
while self._msg_queue:
|
||||
actor, msg = self._msg_queue.popleft()
|
||||
try:
|
||||
actor.send(msg)
|
||||
except StopIteration:
|
||||
pass
|
||||
|
||||
# Example use
|
||||
if __name__ == '__main__':
|
||||
def printer():
|
||||
while True:
|
||||
msg = yield
|
||||
print('Got:', msg)
|
||||
|
||||
def counter(sched):
|
||||
while True:
|
||||
# Receive the current count
|
||||
n = yield
|
||||
if n == 0:
|
||||
break
|
||||
# Send to the printer task
|
||||
sched.send('printer', n)
|
||||
# Send the next count to the counter task (recursive)
|
||||
|
||||
sched.send('counter', n-1)
|
||||
|
||||
sched = ActorScheduler()
|
||||
# Create the initial actors
|
||||
sched.new_actor('printer', printer())
|
||||
sched.new_actor('counter', counter(sched))
|
||||
|
||||
# Send an initial message to the counter to initiate
|
||||
sched.send('counter', 10000)
|
||||
# Example use
|
||||
sched = TaskScheduler()
|
||||
sched.new_task(countdown(10))
|
||||
sched.new_task(countdown(5))
|
||||
sched.new_task(countup(15))
|
||||
sched.run()
|
||||
|
||||
The execution of this code might take a bit of study, but the key is the queue of pending
|
||||
messages. Essentially, the scheduler runs as long as there are messages to deliver. A
|
||||
remarkable feature is that the counter generator sends messages to itself and ends up
|
||||
in a recursive cycle not bound by Python’s recursion limit.
|
||||
Here is an advanced example showing the use of generators to implement a concurrent
|
||||
network application:
|
||||
``TaskScheduler`` 类在一个循环中运行生成器集合——每个都运行到碰到yield语句为止。
|
||||
运行这个例子,输出如下:
|
||||
|
||||
from collections import deque
|
||||
from select import select
|
||||
::
|
||||
|
||||
# This class represents a generic yield event in the scheduler
|
||||
class YieldEvent:
|
||||
def handle_yield(self, sched, task):
|
||||
pass
|
||||
def handle_resume(self, sched, task):
|
||||
pass
|
||||
T-minus 10
|
||||
T-minus 5
|
||||
Counting up 0
|
||||
T-minus 9
|
||||
T-minus 4
|
||||
Counting up 1
|
||||
T-minus 8
|
||||
T-minus 3
|
||||
Counting up 2
|
||||
T-minus 7
|
||||
T-minus 2
|
||||
...
|
||||
|
||||
# Task Scheduler
|
||||
class Scheduler:
|
||||
def __init__(self):
|
||||
self._numtasks = 0 # Total num of tasks
|
||||
self._ready = deque() # Tasks ready to run
|
||||
self._read_waiting = {} # Tasks waiting to read
|
||||
self._write_waiting = {} # Tasks waiting to write
|
||||
到此为止,我们实际上已经实现了一个“操作系统”的最小核心部分。
|
||||
生成器函数就是认为,而yield语句是任务挂起的信号。
|
||||
调度器循环检查任务列表直到没有任务要执行为止。
|
||||
|
||||
# Poll for I/O events and restart waiting tasks
|
||||
def _iopoll(self):
|
||||
rset,wset,eset = select(self._read_waiting,
|
||||
self._write_waiting,[])
|
||||
for r in rset:
|
||||
evt, task = self._read_waiting.pop(r)
|
||||
evt.handle_resume(self, task)
|
||||
for w in wset:
|
||||
evt, task = self._write_waiting.pop(w)
|
||||
evt.handle_resume(self, task)
|
||||
实际上,你可能想要使用生成器来实现简单的并发。
|
||||
那么,在实现actor或网络服务器的时候你可以使用生成器来替代线程的使用。
|
||||
|
||||
def new(self,task):
|
||||
'''
|
||||
Add a newly started task to the scheduler
|
||||
'''
|
||||
下面的代码演示了使用生成器来实现一个不依赖线程的actor:
|
||||
|
||||
self._ready.append((task, None))
|
||||
self._numtasks += 1
|
||||
.. code-block:: python
|
||||
|
||||
def add_ready(self, task, msg=None):
|
||||
'''
|
||||
Append an already started task to the ready queue.
|
||||
msg is what to send into the task when it resumes.
|
||||
'''
|
||||
self._ready.append((task, msg))
|
||||
from collections import deque
|
||||
|
||||
# Add a task to the reading set
|
||||
def _read_wait(self, fileno, evt, task):
|
||||
self._read_waiting[fileno] = (evt, task)
|
||||
class ActorScheduler:
|
||||
def __init__(self):
|
||||
self._actors = { } # Mapping of names to actors
|
||||
self._msg_queue = deque() # Message queue
|
||||
|
||||
# Add a task to the write set
|
||||
def _write_wait(self, fileno, evt, task):
|
||||
self._write_waiting[fileno] = (evt, task)
|
||||
def new_actor(self, name, actor):
|
||||
'''
|
||||
Admit a newly started actor to the scheduler and give it a name
|
||||
'''
|
||||
self._msg_queue.append((actor,None))
|
||||
self._actors[name] = actor
|
||||
|
||||
def run(self):
|
||||
'''
|
||||
Run the task scheduler until there are no tasks
|
||||
'''
|
||||
while self._numtasks:
|
||||
if not self._ready:
|
||||
self._iopoll()
|
||||
task, msg = self._ready.popleft()
|
||||
try:
|
||||
# Run the coroutine to the next yield
|
||||
r = task.send(msg)
|
||||
if isinstance(r, YieldEvent):
|
||||
r.handle_yield(self, task)
|
||||
else:
|
||||
raise RuntimeError('unrecognized yield event')
|
||||
except StopIteration:
|
||||
self._numtasks -= 1
|
||||
def send(self, name, msg):
|
||||
'''
|
||||
Send a message to a named actor
|
||||
'''
|
||||
actor = self._actors.get(name)
|
||||
if actor:
|
||||
self._msg_queue.append((actor,msg))
|
||||
|
||||
# Example implementation of coroutine-based socket I/O
|
||||
class ReadSocket(YieldEvent):
|
||||
def __init__(self, sock, nbytes):
|
||||
self.sock = sock
|
||||
self.nbytes = nbytes
|
||||
def handle_yield(self, sched, task):
|
||||
sched._read_wait(self.sock.fileno(), self, task)
|
||||
def handle_resume(self, sched, task):
|
||||
data = self.sock.recv(self.nbytes)
|
||||
sched.add_ready(task, data)
|
||||
def run(self):
|
||||
'''
|
||||
Run as long as there are pending messages.
|
||||
'''
|
||||
while self._msg_queue:
|
||||
actor, msg = self._msg_queue.popleft()
|
||||
try:
|
||||
actor.send(msg)
|
||||
except StopIteration:
|
||||
pass
|
||||
|
||||
class WriteSocket(YieldEvent):
|
||||
def __init__(self, sock, data):
|
||||
self.sock = sock
|
||||
self.data = data
|
||||
def handle_yield(self, sched, task):
|
||||
|
||||
sched._write_wait(self.sock.fileno(), self, task)
|
||||
def handle_resume(self, sched, task):
|
||||
nsent = self.sock.send(self.data)
|
||||
sched.add_ready(task, nsent)
|
||||
|
||||
class AcceptSocket(YieldEvent):
|
||||
def __init__(self, sock):
|
||||
self.sock = sock
|
||||
def handle_yield(self, sched, task):
|
||||
sched._read_wait(self.sock.fileno(), self, task)
|
||||
def handle_resume(self, sched, task):
|
||||
r = self.sock.accept()
|
||||
sched.add_ready(task, r)
|
||||
|
||||
# Wrapper around a socket object for use with yield
|
||||
class Socket(object):
|
||||
def __init__(self, sock):
|
||||
self._sock = sock
|
||||
def recv(self, maxbytes):
|
||||
return ReadSocket(self._sock, maxbytes)
|
||||
def send(self, data):
|
||||
return WriteSocket(self._sock, data)
|
||||
def accept(self):
|
||||
return AcceptSocket(self._sock)
|
||||
def __getattr__(self, name):
|
||||
return getattr(self._sock, name)
|
||||
|
||||
if __name__ == '__main__':
|
||||
from socket import socket, AF_INET, SOCK_STREAM
|
||||
import time
|
||||
|
||||
# Example of a function involving generators. This should
|
||||
# be called using line = yield from readline(sock)
|
||||
def readline(sock):
|
||||
chars = []
|
||||
while True:
|
||||
c = yield sock.recv(1)
|
||||
if not c:
|
||||
break
|
||||
chars.append(c)
|
||||
if c == b'\n':
|
||||
break
|
||||
return b''.join(chars)
|
||||
|
||||
# Echo server using generators
|
||||
class EchoServer:
|
||||
def __init__(self,addr,sched):
|
||||
self.sched = sched
|
||||
sched.new(self.server_loop(addr))
|
||||
|
||||
def server_loop(self,addr):
|
||||
s = Socket(socket(AF_INET,SOCK_STREAM))
|
||||
|
||||
s.bind(addr)
|
||||
s.listen(5)
|
||||
# Example use
|
||||
if __name__ == '__main__':
|
||||
def printer():
|
||||
while True:
|
||||
c,a = yield s.accept()
|
||||
print('Got connection from ', a)
|
||||
self.sched.new(self.client_handler(Socket(c)))
|
||||
msg = yield
|
||||
print('Got:', msg)
|
||||
|
||||
def client_handler(self,client):
|
||||
def counter(sched):
|
||||
while True:
|
||||
line = yield from readline(client)
|
||||
if not line:
|
||||
# Receive the current count
|
||||
n = yield
|
||||
if n == 0:
|
||||
break
|
||||
line = b'GOT:' + line
|
||||
while line:
|
||||
nsent = yield client.send(line)
|
||||
line = line[nsent:]
|
||||
client.close()
|
||||
print('Client closed')
|
||||
# Send to the printer task
|
||||
sched.send('printer', n)
|
||||
# Send the next count to the counter task (recursive)
|
||||
|
||||
sched = Scheduler()
|
||||
EchoServer(('',16000),sched)
|
||||
sched.run()
|
||||
sched.send('counter', n-1)
|
||||
|
||||
This code will undoubtedly require a certain amount of careful study. However, it is
|
||||
essentially implementing a small operating system. There is a queue of tasks ready to
|
||||
run and there are waiting areas for tasks sleeping for I/O. Much of the scheduler involves
|
||||
moving tasks between the ready queue and the I/O waiting area.
|
||||
sched = ActorScheduler()
|
||||
# Create the initial actors
|
||||
sched.new_actor('printer', printer())
|
||||
sched.new_actor('counter', counter(sched))
|
||||
|
||||
# Send an initial message to the counter to initiate
|
||||
sched.send('counter', 10000)
|
||||
sched.run()
|
||||
|
||||
完全弄懂这段代码需要更深入的学习,但是关键点在于收集消息的队列。
|
||||
本质上,调度器在有需要发送的消息时会一直运行着。
|
||||
计数生成器会给自己发送消息并在一个递归循环中结束。
|
||||
|
||||
下面是一个更加高级的例子,演示了使用生成器来实现一个并发网络应用程序:
|
||||
|
||||
.. code-block:: python
|
||||
|
||||
from collections import deque
|
||||
from select import select
|
||||
|
||||
# This class represents a generic yield event in the scheduler
|
||||
class YieldEvent:
|
||||
def handle_yield(self, sched, task):
|
||||
pass
|
||||
def handle_resume(self, sched, task):
|
||||
pass
|
||||
|
||||
# Task Scheduler
|
||||
class Scheduler:
|
||||
def __init__(self):
|
||||
self._numtasks = 0 # Total num of tasks
|
||||
self._ready = deque() # Tasks ready to run
|
||||
self._read_waiting = {} # Tasks waiting to read
|
||||
self._write_waiting = {} # Tasks waiting to write
|
||||
|
||||
# Poll for I/O events and restart waiting tasks
|
||||
def _iopoll(self):
|
||||
rset,wset,eset = select(self._read_waiting,
|
||||
self._write_waiting,[])
|
||||
for r in rset:
|
||||
evt, task = self._read_waiting.pop(r)
|
||||
evt.handle_resume(self, task)
|
||||
for w in wset:
|
||||
evt, task = self._write_waiting.pop(w)
|
||||
evt.handle_resume(self, task)
|
||||
|
||||
def new(self,task):
|
||||
'''
|
||||
Add a newly started task to the scheduler
|
||||
'''
|
||||
|
||||
self._ready.append((task, None))
|
||||
self._numtasks += 1
|
||||
|
||||
def add_ready(self, task, msg=None):
|
||||
'''
|
||||
Append an already started task to the ready queue.
|
||||
msg is what to send into the task when it resumes.
|
||||
'''
|
||||
self._ready.append((task, msg))
|
||||
|
||||
# Add a task to the reading set
|
||||
def _read_wait(self, fileno, evt, task):
|
||||
self._read_waiting[fileno] = (evt, task)
|
||||
|
||||
# Add a task to the write set
|
||||
def _write_wait(self, fileno, evt, task):
|
||||
self._write_waiting[fileno] = (evt, task)
|
||||
|
||||
def run(self):
|
||||
'''
|
||||
Run the task scheduler until there are no tasks
|
||||
'''
|
||||
while self._numtasks:
|
||||
if not self._ready:
|
||||
self._iopoll()
|
||||
task, msg = self._ready.popleft()
|
||||
try:
|
||||
# Run the coroutine to the next yield
|
||||
r = task.send(msg)
|
||||
if isinstance(r, YieldEvent):
|
||||
r.handle_yield(self, task)
|
||||
else:
|
||||
raise RuntimeError('unrecognized yield event')
|
||||
except StopIteration:
|
||||
self._numtasks -= 1
|
||||
|
||||
# Example implementation of coroutine-based socket I/O
|
||||
class ReadSocket(YieldEvent):
|
||||
def __init__(self, sock, nbytes):
|
||||
self.sock = sock
|
||||
self.nbytes = nbytes
|
||||
def handle_yield(self, sched, task):
|
||||
sched._read_wait(self.sock.fileno(), self, task)
|
||||
def handle_resume(self, sched, task):
|
||||
data = self.sock.recv(self.nbytes)
|
||||
sched.add_ready(task, data)
|
||||
|
||||
class WriteSocket(YieldEvent):
|
||||
def __init__(self, sock, data):
|
||||
self.sock = sock
|
||||
self.data = data
|
||||
def handle_yield(self, sched, task):
|
||||
|
||||
sched._write_wait(self.sock.fileno(), self, task)
|
||||
def handle_resume(self, sched, task):
|
||||
nsent = self.sock.send(self.data)
|
||||
sched.add_ready(task, nsent)
|
||||
|
||||
class AcceptSocket(YieldEvent):
|
||||
def __init__(self, sock):
|
||||
self.sock = sock
|
||||
def handle_yield(self, sched, task):
|
||||
sched._read_wait(self.sock.fileno(), self, task)
|
||||
def handle_resume(self, sched, task):
|
||||
r = self.sock.accept()
|
||||
sched.add_ready(task, r)
|
||||
|
||||
# Wrapper around a socket object for use with yield
|
||||
class Socket(object):
|
||||
def __init__(self, sock):
|
||||
self._sock = sock
|
||||
def recv(self, maxbytes):
|
||||
return ReadSocket(self._sock, maxbytes)
|
||||
def send(self, data):
|
||||
return WriteSocket(self._sock, data)
|
||||
def accept(self):
|
||||
return AcceptSocket(self._sock)
|
||||
def __getattr__(self, name):
|
||||
return getattr(self._sock, name)
|
||||
|
||||
if __name__ == '__main__':
|
||||
from socket import socket, AF_INET, SOCK_STREAM
|
||||
import time
|
||||
|
||||
# Example of a function involving generators. This should
|
||||
# be called using line = yield from readline(sock)
|
||||
def readline(sock):
|
||||
chars = []
|
||||
while True:
|
||||
c = yield sock.recv(1)
|
||||
if not c:
|
||||
break
|
||||
chars.append(c)
|
||||
if c == b'\n':
|
||||
break
|
||||
return b''.join(chars)
|
||||
|
||||
# Echo server using generators
|
||||
class EchoServer:
|
||||
def __init__(self,addr,sched):
|
||||
self.sched = sched
|
||||
sched.new(self.server_loop(addr))
|
||||
|
||||
def server_loop(self,addr):
|
||||
s = Socket(socket(AF_INET,SOCK_STREAM))
|
||||
|
||||
s.bind(addr)
|
||||
s.listen(5)
|
||||
while True:
|
||||
c,a = yield s.accept()
|
||||
print('Got connection from ', a)
|
||||
self.sched.new(self.client_handler(Socket(c)))
|
||||
|
||||
def client_handler(self,client):
|
||||
while True:
|
||||
line = yield from readline(client)
|
||||
if not line:
|
||||
break
|
||||
line = b'GOT:' + line
|
||||
while line:
|
||||
nsent = yield client.send(line)
|
||||
line = line[nsent:]
|
||||
client.close()
|
||||
print('Client closed')
|
||||
|
||||
sched = Scheduler()
|
||||
EchoServer(('',16000),sched)
|
||||
sched.run()
|
||||
|
||||
这段代码有点复杂。不过,它实现了一个小型的操作系统。
|
||||
有一个就绪的任务队列,并且还有因I/O休眠的任务等待区域。
|
||||
还有很多调度器负责在就绪队列和I/O等待区域之间移动任务。
|
||||
|
||||
|
|
||||
|
||||
----------
|
||||
讨论
|
||||
----------
|
||||
When building generator-based concurrency frameworks, it is most common to work
|
||||
with the more general form of yield:
|
||||
在构建基于生成器的并发框架时,通常会使用更常见的yield形式:
|
||||
|
||||
def some_generator():
|
||||
...
|
||||
result = yield data
|
||||
...
|
||||
.. code-block:: python
|
||||
|
||||
Functions that use yield in this manner are more generally referred to as “coroutines.”
|
||||
Within a scheduler, the yield statement gets handled in a loop as follows:
|
||||
def some_generator():
|
||||
...
|
||||
result = yield data
|
||||
...
|
||||
|
||||
f = some_generator()
|
||||
使用这种形式的yield语句的函数通常被称为“协程”。
|
||||
通过调度器,yield语句在一个循环中被处理,如下:
|
||||
|
||||
# Initial result. Is None to start since nothing has been computed
|
||||
result = None
|
||||
while True:
|
||||
try:
|
||||
data = f.send(result)
|
||||
result = ... do some calculation ...
|
||||
except StopIteration:
|
||||
break
|
||||
.. code-block:: python
|
||||
|
||||
f = some_generator()
|
||||
|
||||
# Initial result. Is None to start since nothing has been computed
|
||||
result = None
|
||||
while True:
|
||||
try:
|
||||
data = f.send(result)
|
||||
result = ... do some calculation ...
|
||||
except StopIteration:
|
||||
break
|
||||
|
||||
这里的逻辑稍微有点复杂。不过,被传给 ``send()`` 的值定义了在yield语句醒来时的返回值。
|
||||
因此,如果一个yield准备在对之前yield数据的回应中返回结果时,会在下一次 ``send()`` 操作返回。
|
||||
如果一个生成器函数刚开始运行,发送一个None值会让它排在第一个yield语句前面。
|
||||
|
||||
除了发送值外,还可以在一个生成器上面执行一个 ``close()`` 方法。
|
||||
它会导致在执行yield语句时抛出一个 ``GeneratorExit`` 异常,从而终止执行。
|
||||
如果进一步设计,一个生成器可以捕获这个异常并执行清理操作。
|
||||
同样还可以使用生成器的 ``throw()`` 方法在yield语句执行时生成一个任意的执行指令。
|
||||
一个任务调度器可利用它来在运行的生成器中处理错误。
|
||||
|
||||
最后一个例子中使用的 ``yield from`` 语句被用来实现协程,可以被其它生成器作为子程序或过程来调用。
|
||||
本质上就是将控制权透明的传输给新的函数。
|
||||
不像普通的生成器,一个使用 ``yield from`` 被调用的函数可以返回一个作为 ``yield from`` 语句结果的值。
|
||||
关于 ``yield from`` 的更多信息可以在 `PEP 380 <http://www.python.org/dev/peps/pep-0380>`_ 中找到。
|
||||
|
||||
最后,如果使用生成器编程,要提醒你的是它还是有很多缺点的。
|
||||
特别是,你得不到任何线程可以提供的好处。例如,如果你执行CPU依赖或I/O阻塞程序,
|
||||
它会将整个任务挂起知道操作完成。为了解决这个问题,
|
||||
你只能选择将操作委派给另外一个可以独立运行的线程或进程。
|
||||
另外一个限制是大部分Python库并不能很好的兼容基于生成器的线程。
|
||||
如果你选择这个方案,你会发现你需要自己改写很多标准库函数。
|
||||
作为本节提到的协程和相关技术的一个基础背景,可以查看 `PEP 342 <http://www.python.org/dev/peps/pep-0342>`_
|
||||
和 `“协程和并发的一门有趣课程” <http://www.dabeaz.com/coroutines>`_
|
||||
|
||||
PEP 3156 同样有一个关于使用协程的异步I/O模型。
|
||||
特别的,你不可能自己去实现一个底层的协程调度器。
|
||||
不过,关于协程的思想是很多流行库的基础,
|
||||
包括 `gevent <http://www.gevent.org/>`_,
|
||||
`greenlet <http://pypi.python.org/pypi/greenlet>`_,
|
||||
`Stackless Python <http://www.stackless.com/>`_ 以及其他类似工程。
|
||||
|
||||
The logic concerning the result is a bit convoluted. However, the value passed to send()
|
||||
defines what gets returned when the yield statement wakes back up. So, if a yield is
|
||||
going to return a result in response to data that was previously yielded, it gets returned
|
||||
on the next send() operation. If a generator function has just started, sending in a value
|
||||
of None simply makes it advance to the first yield statement.
|
||||
In addition to sending in values, it is also possible to execute a close() method on a
|
||||
generator. This causes a silent GeneratorExit exception to be raised at the yield state‐
|
||||
ment, which stops execution. If desired, a generator can catch this exception and per‐
|
||||
form cleanup actions. It’s also possible to use the throw() method of a generator to raise
|
||||
an arbitrary execution at the yield statement. A task scheduler might use this to com‐
|
||||
municate errors into running generators.
|
||||
The yield from statement used in the last example is used to implement coroutines
|
||||
that serve as subroutines or procedures to be called from other generators. Essentially,
|
||||
control transparently transfers to the new function. Unlike normal generators, a func‐
|
||||
tion that is called using yield from can return a value that becomes the result of the
|
||||
yield from statement. More information about yield from can be found in PEP 380.
|
||||
Finally, if programming with generators, it is important to stress that there are some
|
||||
major limitations. In particular, you get none of the benefits that threads provide. For
|
||||
instance, if you execute any code that is CPU bound or which blocks for I/O, it will
|
||||
suspend the entire task scheduler until the completion of that operation. To work around
|
||||
this, your only real option is to delegate the operation to a separate thread or process
|
||||
where it can run independently. Another limitation is that most Python libraries have
|
||||
not been written to work well with generator-based threading. If you take this approach,
|
||||
you may find that you need to write replacements for many standard library functions.
|
||||
As basic background on coroutines and the techniques utilized in this recipe, see PEP
|
||||
342 and “A Curious Course on Coroutines and Concurrency”.
|
||||
PEP 3156 also has a modern take on asynchronous I/O involving coroutines. In practice,
|
||||
it is extremelyunlikely that you will write a low-level coroutine scheduler yourself.
|
||||
However, ideas surrounding coroutines are the basis for many popular libraries, in‐
|
||||
cluding gevent, greenlet, Stackless Python, and similar projects.
|
||||
|
||||
Reference in New Issue
Block a user