12.3节翻译完成

This commit is contained in:
Tony Yang
2015-08-28 08:49:57 +08:00
parent 7a48aa81b7
commit e429beb5ed

View File

@@ -1,62 +1,58 @@
============================
12.3 线程间通信
12.3 线程间通信
============================
----------
问题
----------
You have multiple threads in your program and you want to safely communicate or
exchange data between them.
|
你的程序中有多个线程,你需要在这些线程之间安全地交换信息或数据
----------
解决方案
----------
Perhaps the safest way to send data from one thread to another is to use a Queue from
the queue library. To do this, you create a Queue instance that is shared by the threads.
Threads then use put() or get() operations to add or remove items from the queue.
For example:
从一个线程向另一个线程发送数据最安全的方式可能就是使用 ``queue`` 库中的队列了。创建一个被多个线程共享的 ``Queue`` 对象,这些线程通过使用 ``put()````get()`` 操作来向队列中添加或者删除元素。
例如:
from queue import Queue
from threading import Thread
.. code-block:: python
from queue import Queue
from threading import Thread
# A thread that produces data
def producer(out_q):
# A thread that produces data
def producer(out_q):
while True:
# Produce some data
...
out_q.put(data)
# A thread that consumes data
def consumer(in_q):
# A thread that consumes data
def consumer(in_q):
while True:
# Get some data
data = in_q.get()
# Process the data
...
# Create the shared queue and launch both threads
q = Queue()
t1 = Thread(target=consumer, args=(q,))
t2 = Thread(target=producer, args=(q,))
t1.start()
t2.start()
# Create the shared queue and launch both threads
q = Queue()
t1 = Thread(target=consumer, args=(q,))
t2 = Thread(target=producer, args=(q,))
t1.start()
t2.start()
Queue instances already have all of the required locking, so they can be safely shared by
as many threads as you wish.
When using queues, it can be somewhat tricky to coordinate the shutdown of the pro
ducer and consumer. A common solution to this problem is to rely on a special sentinel
value, which when placed in the queue, causes consumers to terminate. For example:
``Queue`` 对象已经包含了必要的锁,所以你可以通过它在多个线程间多安全地共享数据。
当使用队列时,协调生产者和消费者的关闭问题可能会有一些麻烦。一个通用的解决方法是在队列中放置一个特殊的只,当消费者读到这个值的时候,终止执行。例如:
from queue import Queue
from threading import Thread
.. code-block:: python
# Object that signals shutdown
_sentinel = object()
from queue import Queue
from threading import Thread
# A thread that produces data
def producer(out_q):
# Object that signals shutdown
_sentinel = object()
# A thread that produces data
def producer(out_q):
while running:
# Produce some data
...
@@ -65,8 +61,8 @@ def producer(out_q):
# Put the sentinel on the queue to indicate completion
out_q.put(_sentinel)
# A thread that consumes data
def consumer(in_q):
# A thread that consumes data
def consumer(in_q):
while True:
# Get some data
data = in_q.get()
@@ -79,20 +75,15 @@ def consumer(in_q):
# Process the data
...
A subtle feature of this example is that the consumer, upon receiving the special sentinel
value, immediately places it back onto the queue. This propagates the sentinel to other
consumers threads that might be listening on the same queue—thus shutting them all
down one after the other.
Although queues are the most common thread communication mechanism, you can
build your own data structures as long as you add the required locking and synchroni
zation. The most common way to do this is to wrap your data structures with a condition
variable. For example, here is how you might build a thread-safe priority queue, as
discussed in Recipe 1.5.
本例中有一个特殊的地方:消费者在读到这个特殊值之后立即又把它放回到队列中,将之传递下去。这样,所有监听这个队列的消费者线程就可以全部关闭了。
尽管队列是最常见的线程间通信机制,但是仍然可以自己通过创建自己的数据结构并添加所需的锁和同步机制来实现线程间通信。最常见的方法是使用 ``Condition`` 变量来包装你的数据结构。下边这个例子演示了如何创建一个线程安全的优先级队列如同1.5节中介绍的那样。
import heapq
import threading
.. code-block:: python
class PriorityQueue:
import heapq
import threading
class PriorityQueue:
def __init__(self):
self._queue = []
self._count = 0
@@ -109,23 +100,22 @@ class PriorityQueue:
self._cv.wait()
return heapq.heappop(self._queue)[-1]
Thread communication with a queue is a one-way and nondeterministic process. In
general, there is no way to know when the receiving thread has actually received a
message and worked on it. However, Queue objects do provide some basic completion
features, as illustrated by the task_done() and join() methods in this example:
使用队列来进行线程间通信是一个单向、不确定的过程。通常情况下,你没有办法知道接收数据的线程是什么时候接收到的数据并开始工作的。不过队列对象提供一些基本完成的特性,比如下边这个例子中的 ``task_done()````join()``
from queue import Queue
from threading import Thread
.. code-block:: python
# A thread that produces data
def producer(out_q):
from queue import Queue
from threading import Thread
# A thread that produces data
def producer(out_q):
while running:
# Produce some data
...
out_q.put(data)
# A thread that consumes data
def consumer(in_q):
# A thread that consumes data
def consumer(in_q):
while True:
# Get some data
data = in_q.get()
@@ -135,25 +125,25 @@ def consumer(in_q):
# Indicate completion
in_q.task_done()
# Create the shared queue and launch both threads
q = Queue()
t1 = Thread(target=consumer, args=(q,))
t2 = Thread(target=producer, args=(q,))
t1.start()
t2.start()
# Create the shared queue and launch both threads
q = Queue()
t1 = Thread(target=consumer, args=(q,))
t2 = Thread(target=producer, args=(q,))
t1.start()
t2.start()
# Wait for all produced items to be consumed
q.join()
# Wait for all produced items to be consumed
q.join()
If a thread needs to know immediately when a consumer thread has processed a par
ticular item of data, you should pair the sent data with an Event object that allows the
producer to monitor its progress. For example:
如果一个线程需要在一个“消费者”线程处理完特定的数据项时立即得到通知,你可以把要发送的数据和一个 ``Event`` 放到一起使用这样“生产者”就可以通过这个Event对象来监测处理的过程了。示例如下
from queue import Queue
from threading import Thread, Event
.. code-block:: python
# A thread that produces data
def producer(out_q):
from queue import Queue
from threading import Thread, Event
# A thread that produces data
def producer(out_q):
while running:
# Produce some data
...
@@ -164,8 +154,8 @@ def producer(out_q):
# Wait for the consumer to process the item
evt.wait()
# A thread that consumes data
def consumer(in_q):
# A thread that consumes data
def consumer(in_q):
while True:
# Get some data
data, evt = in_q.get()
@@ -174,93 +164,73 @@ def consumer(in_q):
# Indicate completion
evt.set()
|
----------
讨论
----------
Writing threaded programs based on simple queuing is often a good way to maintain
sanity. If you can break everything down to simple thread-safe queuing, youll find that
you dont need to litter your program with locks and other low-level synchronization.
Also, communicating with queues often leads to designs that can be scaled up to other
kinds of message-based communication patterns later on. For instance, you might be
基于简单队列编写多线程程序在多数情况下是一个比较明智的选择。从线程安全队列的底层实现来看,你无需在你的代码中使用锁和其他底层的同步机制,这些只会把你的程序弄得乱七八糟。此外,使用队列这种基于消息的通信机制可以被扩展到更大的应用范畴,比如,你可以把你的程序放入多个进程甚至是分布式系统而无需改变底层的队列结构。
使用线程队列有一个要注意的问题是,向队列中添加数据项时并不会复制此数据项,线程间通信实际上是在线程间传递对象引用。如果你担心对象的共享状态,那你最好只传递不可修改的数据结构(如:整型、字符串或者元组)或者一个对象的深拷贝。例如:
able to split your program into multiple processes, or even a distributed system, without
changing much of its underlying queuing architecture.
One caution with thread queues is that putting an item in a queue doesnt make a copy
of the item. Thus, communication actually involves passing an object reference between
threads. If you are concerned about shared state, it may make sense to only pass im
mutable data structures (e.g., integers, strings, or tuples) or to make deep copies of the
queued items. For example:
from queue import Queue
from threading import Thread
import copy
.. code-block:: python
# A thread that produces data
def producer(out_q):
from queue import Queue
from threading import Thread
import copy
# A thread that produces data
def producer(out_q):
while True:
# Produce some data
...
out_q.put(copy.deepcopy(data))
# A thread that consumes data
def consumer(in_q):
# A thread that consumes data
def consumer(in_q):
while True:
# Get some data
data = in_q.get()
# Process the data
...
``Queue`` 对象提供一些在当前上下文很有用的附加特性。比如在创建 Queue 对象时提供可选的 ``size`` 参数来限制可以添加到队列中的元素数量。对于“生产者”与“消费者”速度有差异的情况,为队列中的元素数量添加上限是有意义的。比如,一个“生产者”产生项目的速度比“消费者” “消费”的速度快,那么使用固定大小的队列就可以在队列已满的时候阻塞队列,以免未预期的连锁效应扩散整个程序造成死锁或者程序运行失常。在通信的线程之间进行“流量控制”是一个看起来容易实现起来困难的问题。如果你发现自己曾经试图通过摆弄队列大小来解决一个问题,这也许就标志着你的程序可能存在脆弱设计或者固有的可伸缩问题。
``get()````put()`` 方法都支持非阻塞方式和设定超时,例如:
Queue objects provide a few additional features that may prove to be useful in certain
contexts. If you create a Queue with an optional size, such as Queue(N), it places a limit
on the number of items that can be enqueued before the put() blocks the producer.
Adding an upper bound to a queue might make sense if there is mismatch in speed
between a producer and consumer. For instance, if a producer is generating items at a
much faster rate than they can be consumed. On the other hand, making a queue block
when its full can also have an unintended cascading effect throughout your program,
possibly causing it to deadlock or run poorly. In general, the problem of “flow control”
between communicating threads is a much harder problem than it seems. If you ever
find yourself trying to fix a problem by fiddling with queue sizes, it could be an indicator
of a fragile design or some other inherent scaling problem.
Both the get() and put() methods support nonblocking and timeouts. For example:
.. code-block:: python
import queue
q = queue.Queue()
import queue
q = queue.Queue()
try:
try:
data = q.get(block=False)
except queue.Empty:
except queue.Empty:
...
try:
try:
q.put(item, block=False)
except queue.Full:
except queue.Full:
...
try:
try:
data = q.get(timeout=5.0)
except queue.Empty:
except queue.Empty:
...
Both of these options can be used to avoid the problem of just blocking indefinitely on
a particular queuing operation. For example, a nonblocking put() could be used with
a fixed-sized queue to implement different kinds of handling code for when a queue is
full. For example, issuing a log message and discarding:
这些操作都可以用来避免当执行某些特定队列操作时发生无限阻塞的情况,比如,一个非阻塞的 ``put()`` 方法和一个固定大小的队列一起使用,这样当队列已满时就可以执行不同的代码。比如输出一条日志信息并丢弃。
def producer(q):
.. code-block:: python
def producer(q):
...
try:
q.put(item, block=False)
except queue.Full:
log.warning('queued item %r discarded!', item)
A timeout is useful if youre trying to make consumer threads periodically give up on
operations such as q.get() so that they can check things such as a termination flag, as
described in Recipe 12.1.
如果你试图让消费者线程在执行像 ``q.get()`` 这样的操作时,超时自动终止以便检查终止标志,你应该使用 ``q.get()`` 的可选参数 ``timeout`` ,如下:
_running = True
.. code-block:: python
def consumer(q):
_running = True
def consumer(q):
while _running:
try:
item = q.get(timeout=5.0)
@@ -269,10 +239,4 @@ def consumer(q):
except queue.Empty:
pass
Lastly, there are utility methods q.qsize(), q.full(), q.empty() that can tell you the
current size and status of the queue. However, be aware that all of these are unreliable
in a multithreaded environment. For example, a call to q.empty() might tell you that
the queue is empty, but in the time that has elapsed since making the call, another thread
could have added an item to the queue. Frankly, its best to write your code not to rely
on such functions.
最后,有 ``q.qsize()`` ``q.full()`` ``q.empty()`` 等实用方法可以获取一个队列的当前大小和状态。但要注意,这些方法都不是线程安全的。可能你对一个队列使用 ``empty()`` 判断出这个队列为空,但同时另外一个线程可能已经向这个队列中插入一个数据项。所以,你最好不要在你的代码中使用这些方法。