Section 12.7 completed

This commit is contained in:
yidao620c
2015-12-26 16:25:07 +08:00
parent df22dfb224
commit e7460ca291

----------
Problem
----------
You want to create a pool of worker threads for serving clients or performing other
kinds of work.
|
----------
Solution
----------
The ``concurrent.futures`` library has a ``ThreadPoolExecutor`` class that can be used
for this purpose. Here is an example of a simple TCP server that uses a thread pool to
serve clients:
.. code-block:: python

    from socket import AF_INET, SOCK_STREAM, socket
    from concurrent.futures import ThreadPoolExecutor

    def echo_client(sock, client_addr):
        '''
        Handle a client connection
        '''
        print('Got connection from', client_addr)
        while True:
            msg = sock.recv(65536)
            if not msg:
                break
            sock.sendall(msg)
        print('Client closed connection')
        sock.close()

    def echo_server(addr):
        pool = ThreadPoolExecutor(128)
        sock = socket(AF_INET, SOCK_STREAM)
        sock.bind(addr)
        sock.listen(5)
        while True:
            client_sock, client_addr = sock.accept()
            pool.submit(echo_client, client_sock, client_addr)

    echo_server(('', 15000))

If you want to manually create your own thread pool, it's usually easy enough to do it
using a ``Queue``. Here is a slightly different, but manual, implementation of the same
code:
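The server above never shuts its pool down explicitly. ``ThreadPoolExecutor`` also works
as a context manager, which waits for outstanding work and releases the threads on exit.
A minimal self-contained sketch (the ``square`` task is illustrative, not part of the
recipe):

```python
from concurrent.futures import ThreadPoolExecutor

def square(x):
    return x * x

# Using the executor as a context manager waits for all submitted
# work to finish and shuts the pool down cleanly on exit
with ThreadPoolExecutor(4) as pool:
    futures = [pool.submit(square, n) for n in range(5)]
    results = [f.result() for f in futures]

print(results)   # [0, 1, 4, 9, 16]
```

This form is convenient for batch jobs; a long-running server like the one above keeps
the pool alive for its whole lifetime instead.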
.. code-block:: python

    from socket import socket, AF_INET, SOCK_STREAM
    from threading import Thread
    from queue import Queue

    def echo_client(q):
        '''
        Handle client connections pulled off the queue
        '''
        # Loop so each worker thread serves many clients in turn,
        # not just the first one it receives
        while True:
            sock, client_addr = q.get()
            print('Got connection from', client_addr)
            while True:
                msg = sock.recv(65536)
                if not msg:
                    break
                sock.sendall(msg)
            print('Client closed connection')
            sock.close()

    def echo_server(addr, nworkers):
        # Launch the client workers
        q = Queue()
        for n in range(nworkers):
            t = Thread(target=echo_client, args=(q,))
            t.daemon = True
            t.start()

        # Run the server
        sock = socket(AF_INET, SOCK_STREAM)
        sock.bind(addr)
        sock.listen(5)
        while True:
            client_sock, client_addr = sock.accept()
            q.put((client_sock, client_addr))

    echo_server(('', 15000), 128)

One advantage of using ``ThreadPoolExecutor`` over a manual implementation is that it
makes it easier for the submitter to receive results from the called function. For
example, you could write code like this:
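The same queue-based worker pattern can be exercised without sockets. A small sketch,
using an illustrative ``square`` task and a ``None`` sentinel to tell workers to exit
(both are assumptions for the demo, not part of the recipe):

```python
from queue import Queue
from threading import Thread

def worker(in_q, out_q):
    # Pull tasks until a None sentinel arrives, then exit
    while True:
        item = in_q.get()
        if item is None:
            break
        out_q.put(item * item)

in_q, out_q = Queue(), Queue()
workers = [Thread(target=worker, args=(in_q, out_q)) for _ in range(4)]
for t in workers:
    t.start()

for n in range(10):
    in_q.put(n)
for _ in workers:
    in_q.put(None)          # one sentinel per worker
for t in workers:
    t.join()

results = sorted(out_q.get() for _ in range(10))
print(results)   # [0, 1, 4, 9, 16, 25, 36, 49, 64, 81]
```

The sentinel-per-worker trick is the usual way to shut down such a pool cleanly, which
the daemon-thread server above sidesteps by never shutting down at all.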
.. code-block:: python

    from concurrent.futures import ThreadPoolExecutor
    import urllib.request

    def fetch_url(url):
        u = urllib.request.urlopen(url)
        data = u.read()
        return data

    pool = ThreadPoolExecutor(10)
    # Submit work to the pool
    a = pool.submit(fetch_url, 'http://www.python.org')
    b = pool.submit(fetch_url, 'http://www.pypy.org')

    # Get the results back
    x = a.result()
    y = b.result()

The result objects in the example handle all of the blocking and coordination needed to
get data back from the worker thread. Specifically, the operation ``a.result()`` blocks
until the corresponding function has been executed by the pool and has returned a value.
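If you don't need the results in submission order, ``concurrent.futures.as_completed()``
yields each future as soon as it finishes. A sketch using a local ``slow_double`` task
(an illustrative stand-in for ``fetch_url``, so it runs without network access):

```python
from concurrent.futures import ThreadPoolExecutor, as_completed
import time

def slow_double(x):
    # Simulate I/O latency proportional to the input
    time.sleep(0.01 * x)
    return 2 * x

pool = ThreadPoolExecutor(4)
futures = [pool.submit(slow_double, n) for n in (3, 1, 2)]

# as_completed() yields each future as it finishes,
# regardless of the order in which work was submitted
done = [f.result() for f in as_completed(futures)]
pool.shutdown()

print(sorted(done))   # [2, 4, 6]
```

This avoids blocking on a slow task while faster ones already have answers waiting.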
|
----------
Discussion
----------
Generally, you should avoid writing programs that allow unlimited growth in the number
of threads. For example, take a look at the following server:
.. code-block:: python

    from threading import Thread
    from socket import socket, AF_INET, SOCK_STREAM

    def echo_client(sock, client_addr):
        '''
        Handle a client connection
        '''
        print('Got connection from', client_addr)
        while True:
            msg = sock.recv(65536)
            if not msg:
                break
            sock.sendall(msg)
        print('Client closed connection')
        sock.close()

    def echo_server(addr):
        # Run the server
        sock = socket(AF_INET, SOCK_STREAM)
        sock.bind(addr)
        sock.listen(5)
        while True:
            client_sock, client_addr = sock.accept()
            # A new thread per connection -- no upper bound on thread count
            t = Thread(target=echo_client, args=(client_sock, client_addr))
            t.daemon = True
            t.start()

    echo_server(('', 15000))
Although this works, it doesn't prevent some asynchronous hipster from launching an
attack on the server that makes it create so many threads that your program runs out
of resources and crashes (thus further demonstrating the "evils" of using threads). By
using a pre-initialized thread pool, you can carefully put an upper limit on the amount
of supported concurrency.

You might be concerned with the effect of creating a large number of threads. However,
modern systems should have no trouble creating pools of a few thousand threads.
Moreover, having a thousand threads just sitting around waiting for work isn't going to
have much, if any, impact on the performance of other code (a sleeping thread does just
that: nothing at all). Of course, if all of those threads wake up at the same time and
start hammering on the CPU, that's a different story, especially in light of the Global
Interpreter Lock (GIL). Generally, you only want to use thread pools for I/O-bound
processing.

One possible concern with creating large thread pools might be memory use. For
example, if you create 2,000 threads on OS X, the system shows the Python process using
up more than 9 GB of virtual memory. However, this is actually somewhat misleading.
When creating a thread, the operating system reserves a region of virtual memory to
hold the thread's execution stack (often as large as 8 MB). Only a small fragment of
this memory is actually mapped to real memory, though. Thus, if you look a bit closer,
you might find the Python process is using far less real memory (e.g., for 2,000
threads, only 70 MB of real memory is used, not 9 GB). If the size of the virtual memory
is a concern, you can dial it down using the ``threading.stack_size()`` function. For
example:
.. code-block:: python

    import threading
    threading.stack_size(65536)

If you add this call and repeat the experiment of creating 2,000 threads, you'll find
that the Python process is now only using about 210 MB of virtual memory, although the
amount of real memory in use remains about the same. Note that the thread stack size
must be at least 32,768 bytes, and is usually restricted to be a multiple of the system
memory page size (4096, 8192, etc.).
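These constraints can be checked directly: ``threading.stack_size()`` with no argument
reports the current setting in bytes (0 means the platform default), and undersized
values are rejected. A small sketch (it assumes the platform supports changing the
stack size, as Linux and OS X do):

```python
import threading

# With no argument, stack_size() reports the current setting;
# 0 means "use the platform default"
default = threading.stack_size()

# Values under 32,768 bytes are rejected with ValueError
try:
    threading.stack_size(1000)
    rejected = False
except ValueError:
    rejected = True

# A valid size takes effect for threads created afterwards
threading.stack_size(65536)
new_size = threading.stack_size()

threading.stack_size(default)   # restore the previous setting
```

Changing the size affects only threads created after the call, so set it before
building the pool.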