11.11小节完成
This commit is contained in:
@@ -5,76 +5,72 @@
|
||||
----------
|
||||
问题
|
||||
----------
|
||||
You have multiple Python interpreter processes running and you want to pass an open
|
||||
file descriptor from one interpreter to the other. For instance, perhaps there is a server
|
||||
process that is responsible for receiving connections, but the actual servicing of clients
|
||||
is to be handled by a different interpreter.
|
||||
你有多个Python解释器进程在同时运行,你想将某个打开的文件描述符从一个解释器传递给另外一个。
|
||||
比如,假设有个服务器进程相应连接请求,但是实际的相应逻辑是在另一个解释器中执行的。
|
||||
|
||||
|
|
||||
|
||||
----------
|
||||
解决方案
|
||||
----------
|
||||
To pass a file descriptor between processes, you first need to connect the processes
|
||||
together. On Unix machines, you might use a Unix domain socket, whereas on Win‐
|
||||
dows, you could use a named pipe. However, rather than deal with such low-level
|
||||
mechanics, it is often easier to use the multiprocessing module to set up such a
|
||||
connection.
|
||||
为了在多个进程中传递文件描述符,你首先需要将它们连接到一起。在Unix机器上,你可能需要使用Unix域套接字,
|
||||
而在windows上面你需要使用命名管道。不过你无需真的需要去操作这些底层,
|
||||
通常使用 ``multiprocessing`` 模块来创建这样的连接会更容易一些。
|
||||
|
||||
Once a connection is established, you can use the send_handle() and recv_handle()
|
||||
functions in multiprocessing.reduction to send file descriptors between processes.
|
||||
The following example illustrates the basics:
|
||||
一旦一个连接被创建,你可以使用 ``multiprocessing.reduction`` 中的
|
||||
``send_handle()`` 和 ``recv_handle()`` 函数在不同的处理器直接传递文件描述符。
|
||||
下面的例子演示了最基本的用法:
|
||||
|
||||
import multiprocessing
|
||||
from multiprocessing.reduction import recv_handle, send_handle
|
||||
import socket
|
||||
.. code-block:: python
|
||||
|
||||
def worker(in_p, out_p):
|
||||
out_p.close()
|
||||
while True:
|
||||
fd = recv_handle(in_p)
|
||||
print('CHILD: GOT FD', fd)
|
||||
with socket.socket(socket.AF_INET, socket.SOCK_STREAM, fileno=fd) as s:
|
||||
while True:
|
||||
msg = s.recv(1024)
|
||||
if not msg:
|
||||
break
|
||||
print('CHILD: RECV {!r}'.format(msg))
|
||||
s.send(msg)
|
||||
import multiprocessing
|
||||
from multiprocessing.reduction import recv_handle, send_handle
|
||||
import socket
|
||||
|
||||
def server(address, in_p, out_p, worker_pid):
|
||||
in_p.close()
|
||||
s = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
|
||||
s.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, True)
|
||||
s.bind(address)
|
||||
s.listen(1)
|
||||
while True:
|
||||
client, addr = s.accept()
|
||||
print('SERVER: Got connection from', addr)
|
||||
send_handle(out_p, client.fileno(), worker_pid)
|
||||
client.close()
|
||||
def worker(in_p, out_p):
|
||||
out_p.close()
|
||||
while True:
|
||||
fd = recv_handle(in_p)
|
||||
print('CHILD: GOT FD', fd)
|
||||
with socket.socket(socket.AF_INET, socket.SOCK_STREAM, fileno=fd) as s:
|
||||
while True:
|
||||
msg = s.recv(1024)
|
||||
if not msg:
|
||||
break
|
||||
print('CHILD: RECV {!r}'.format(msg))
|
||||
s.send(msg)
|
||||
|
||||
if __name__ == '__main__':
|
||||
c1, c2 = multiprocessing.Pipe()
|
||||
worker_p = multiprocessing.Process(target=worker, args=(c1,c2))
|
||||
worker_p.start()
|
||||
def server(address, in_p, out_p, worker_pid):
|
||||
in_p.close()
|
||||
s = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
|
||||
s.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, True)
|
||||
s.bind(address)
|
||||
s.listen(1)
|
||||
while True:
|
||||
client, addr = s.accept()
|
||||
print('SERVER: Got connection from', addr)
|
||||
send_handle(out_p, client.fileno(), worker_pid)
|
||||
client.close()
|
||||
|
||||
server_p = multiprocessing.Process(target=server,
|
||||
args=(('', 15000), c1, c2, worker_p.pid))
|
||||
server_p.start()
|
||||
if __name__ == '__main__':
|
||||
c1, c2 = multiprocessing.Pipe()
|
||||
worker_p = multiprocessing.Process(target=worker, args=(c1,c2))
|
||||
worker_p.start()
|
||||
|
||||
c1.close()
|
||||
c2.close()
|
||||
server_p = multiprocessing.Process(target=server,
|
||||
args=(('', 15000), c1, c2, worker_p.pid))
|
||||
server_p.start()
|
||||
|
||||
In this example, two processes are spawned and connected by a multiprocessing Pipe
|
||||
object. The server process opens a socket and waits for client connections. The worker
|
||||
process merely waits to receive a file descriptor on the pipe using recv_handle(). When
|
||||
the server receives a connection, it sends the resulting socket file descriptor to the worker
|
||||
c1.close()
|
||||
c2.close()
|
||||
|
||||
using send_handle(). The worker takes over the socket and echoes data back to the
|
||||
client until the connection is closed.
|
||||
If you connect to the running server using Telnet or a similar tool, here is an example
|
||||
of what you might see:
|
||||
在这个例子中,两个进程被创建并通过一个 ``multiprocessing`` 管道连接起来。
|
||||
服务器进程打开一个socket并等待客户端连接请求。
|
||||
工作进程仅仅使用 ``recv_handle()`` 在管道上面等待接收一个文件描述符。
|
||||
当服务器接收到一个连接,它将产生的socket文件描述符通过 ``send_handle()`` 传递给工作进程。
|
||||
工作进程接收到socket后向客户端回应数据,然后此次连接关闭。
|
||||
|
||||
如果你使用Telnet或类似工具连接到服务器,下面是一个演示例子:
|
||||
|
||||
bash % python3 passfd.py
|
||||
SERVER: Got connection from ('127.0.0.1', 55543)
|
||||
@@ -82,179 +78,179 @@ of what you might see:
|
||||
CHILD: RECV b'Hello\r\n'
|
||||
CHILD: RECV b'World\r\n'
|
||||
|
||||
The most important part of this example is the fact that the client socket accepted in the
|
||||
server is actually serviced by a completely different process. The server merely hands it
|
||||
off, closes it, and waits for the next connection.
|
||||
此例最重要的部分是服务器接收到的客户端socket实际上被另外一个不同的进程处理。
|
||||
服务器仅仅只是将其转手并关闭此连接,然后等待下一个连接。
|
||||
|
||||
|
|
||||
|
||||
----------
|
||||
讨论
|
||||
----------
|
||||
Passing file descriptors between processes is something that many programmers don’t
|
||||
even realize is possible. However, it can sometimes be a useful tool in building scalable
|
||||
systems. For example, on a multicore machine, you could have multiple instances of the
|
||||
Python interpreter and use file descriptor passing to more evenly balance the number
|
||||
of clients being handled by each interpreter.
|
||||
The send_handle() and recv_handle() functions shown in the solution really only
|
||||
work with multiprocessing connections. Instead of using a pipe, you can connect in‐
|
||||
terpreters as shown in Recipe 11.7, and it will work as long as you use UNIX domain
|
||||
sockets or Windows pipes. For example, you could implement the server and worker
|
||||
as completely separate programs to be started separately. Here is the implementation of
|
||||
the server:
|
||||
对于大部分程序员来讲在不同进程之间传递文件描述符好像没什么必要。
|
||||
但是,有时候它是构建一个可扩展系统的很有用的工具。例如,在一个多核机器上面,
|
||||
你可以有多个Python解释器实例,将文件描述符传递给其它解释器来实现负载均衡。
|
||||
|
||||
# servermp.py
|
||||
from multiprocessing.connection import Listener
|
||||
from multiprocessing.reduction import send_handle
|
||||
import socket
|
||||
``send_handle()`` 和 ``recv_handle()`` 函数只能够用于 ``multiprocessing`` 连接。
|
||||
使用它们来代替管道的使用(参考11.7节),只要你使用的是Unix域套接字或Windows管道。
|
||||
例如,你可以让服务器和工作者各自以单独的程序来启动。下面是服务器的实现例子:
|
||||
|
||||
def server(work_address, port):
|
||||
# Wait for the worker to connect
|
||||
work_serv = Listener(work_address, authkey=b'peekaboo')
|
||||
worker = work_serv.accept()
|
||||
worker_pid = worker.recv()
|
||||
.. code-block:: python
|
||||
|
||||
# Now run a TCP/IP server and send clients to worker
|
||||
s = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
|
||||
s.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, True)
|
||||
s.bind(('', port))
|
||||
s.listen(1)
|
||||
while True:
|
||||
client, addr = s.accept()
|
||||
print('SERVER: Got connection from', addr)
|
||||
# servermp.py
|
||||
from multiprocessing.connection import Listener
|
||||
from multiprocessing.reduction import send_handle
|
||||
import socket
|
||||
|
||||
send_handle(worker, client.fileno(), worker_pid)
|
||||
client.close()
|
||||
def server(work_address, port):
|
||||
# Wait for the worker to connect
|
||||
work_serv = Listener(work_address, authkey=b'peekaboo')
|
||||
worker = work_serv.accept()
|
||||
worker_pid = worker.recv()
|
||||
|
||||
if __name__ == '__main__':
|
||||
import sys
|
||||
if len(sys.argv) != 3:
|
||||
print('Usage: server.py server_address port', file=sys.stderr)
|
||||
raise SystemExit(1)
|
||||
# Now run a TCP/IP server and send clients to worker
|
||||
s = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
|
||||
s.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, True)
|
||||
s.bind(('', port))
|
||||
s.listen(1)
|
||||
while True:
|
||||
client, addr = s.accept()
|
||||
print('SERVER: Got connection from', addr)
|
||||
|
||||
server(sys.argv[1], int(sys.argv[2]))
|
||||
send_handle(worker, client.fileno(), worker_pid)
|
||||
client.close()
|
||||
|
||||
To run this server, you would run a command such as python3 servermp.py /tmp/
|
||||
servconn 15000. Here is the corresponding client code:
|
||||
if __name__ == '__main__':
|
||||
import sys
|
||||
if len(sys.argv) != 3:
|
||||
print('Usage: server.py server_address port', file=sys.stderr)
|
||||
raise SystemExit(1)
|
||||
|
||||
# workermp.py
|
||||
server(sys.argv[1], int(sys.argv[2]))
|
||||
|
||||
from multiprocessing.connection import Client
|
||||
from multiprocessing.reduction import recv_handle
|
||||
import os
|
||||
from socket import socket, AF_INET, SOCK_STREAM
|
||||
运行这个服务器,只需要执行 `python3 servermp.py /tmp/servconn 15000` ,下面是相应的工作者代码:
|
||||
|
||||
def worker(server_address):
|
||||
serv = Client(server_address, authkey=b'peekaboo')
|
||||
serv.send(os.getpid())
|
||||
while True:
|
||||
fd = recv_handle(serv)
|
||||
print('WORKER: GOT FD', fd)
|
||||
with socket(AF_INET, SOCK_STREAM, fileno=fd) as client:
|
||||
while True:
|
||||
msg = client.recv(1024)
|
||||
if not msg:
|
||||
break
|
||||
print('WORKER: RECV {!r}'.format(msg))
|
||||
client.send(msg)
|
||||
.. code-block:: python
|
||||
|
||||
if __name__ == '__main__':
|
||||
import sys
|
||||
if len(sys.argv) != 2:
|
||||
print('Usage: worker.py server_address', file=sys.stderr)
|
||||
raise SystemExit(1)
|
||||
# workermp.py
|
||||
|
||||
worker(sys.argv[1])
|
||||
from multiprocessing.connection import Client
|
||||
from multiprocessing.reduction import recv_handle
|
||||
import os
|
||||
from socket import socket, AF_INET, SOCK_STREAM
|
||||
|
||||
To run the worker, you would type python3 workermp.py /tmp/servconn. The result‐
|
||||
ing operation should be exactly the same as the example that used Pipe().
|
||||
Under the covers, file descriptor passing involves creating a UNIX domain socket and
|
||||
the sendmsg() method of sockets. Since this technique is not widely known, here is a
|
||||
different implementation of the server that shows how to pass descriptors using sockets:
|
||||
def worker(server_address):
|
||||
serv = Client(server_address, authkey=b'peekaboo')
|
||||
serv.send(os.getpid())
|
||||
while True:
|
||||
fd = recv_handle(serv)
|
||||
print('WORKER: GOT FD', fd)
|
||||
with socket(AF_INET, SOCK_STREAM, fileno=fd) as client:
|
||||
while True:
|
||||
msg = client.recv(1024)
|
||||
if not msg:
|
||||
break
|
||||
print('WORKER: RECV {!r}'.format(msg))
|
||||
client.send(msg)
|
||||
|
||||
# server.py
|
||||
import socket
|
||||
if __name__ == '__main__':
|
||||
import sys
|
||||
if len(sys.argv) != 2:
|
||||
print('Usage: worker.py server_address', file=sys.stderr)
|
||||
raise SystemExit(1)
|
||||
|
||||
import struct
|
||||
worker(sys.argv[1])
|
||||
|
||||
def send_fd(sock, fd):
|
||||
'''
|
||||
Send a single file descriptor.
|
||||
'''
|
||||
sock.sendmsg([b'x'],
|
||||
[(socket.SOL_SOCKET, socket.SCM_RIGHTS, struct.pack('i', fd))])
|
||||
ack = sock.recv(2)
|
||||
assert ack == b'OK'
|
||||
要运行工作者,执行执行命令 `python3 workermp.py /tmp/servconn` .
|
||||
效果跟使用Pipe()例子是完全一样的。
|
||||
文件描述符的传递会涉及到UNIX域套接字的创建和套接字的 ``sendmsg()`` 方法。
|
||||
不过这种技术并不常见,下面是使用套接字来传递描述符的另外一种实现:
|
||||
|
||||
def server(work_address, port):
|
||||
# Wait for the worker to connect
|
||||
work_serv = socket.socket(socket.AF_UNIX, socket.SOCK_STREAM)
|
||||
work_serv.bind(work_address)
|
||||
work_serv.listen(1)
|
||||
worker, addr = work_serv.accept()
|
||||
.. code-block:: python
|
||||
|
||||
# Now run a TCP/IP server and send clients to worker
|
||||
s = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
|
||||
s.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, True)
|
||||
s.bind(('',port))
|
||||
s.listen(1)
|
||||
while True:
|
||||
client, addr = s.accept()
|
||||
print('SERVER: Got connection from', addr)
|
||||
send_fd(worker, client.fileno())
|
||||
client.close()
|
||||
# server.py
|
||||
import socket
|
||||
|
||||
if __name__ == '__main__':
|
||||
import sys
|
||||
if len(sys.argv) != 3:
|
||||
print('Usage: server.py server_address port', file=sys.stderr)
|
||||
raise SystemExit(1)
|
||||
import struct
|
||||
|
||||
server(sys.argv[1], int(sys.argv[2]))
|
||||
def send_fd(sock, fd):
|
||||
'''
|
||||
Send a single file descriptor.
|
||||
'''
|
||||
sock.sendmsg([b'x'],
|
||||
[(socket.SOL_SOCKET, socket.SCM_RIGHTS, struct.pack('i', fd))])
|
||||
ack = sock.recv(2)
|
||||
assert ack == b'OK'
|
||||
|
||||
Here is an implementation of the worker using sockets:
|
||||
def server(work_address, port):
|
||||
# Wait for the worker to connect
|
||||
work_serv = socket.socket(socket.AF_UNIX, socket.SOCK_STREAM)
|
||||
work_serv.bind(work_address)
|
||||
work_serv.listen(1)
|
||||
worker, addr = work_serv.accept()
|
||||
|
||||
# worker.py
|
||||
import socket
|
||||
import struct
|
||||
# Now run a TCP/IP server and send clients to worker
|
||||
s = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
|
||||
s.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, True)
|
||||
s.bind(('',port))
|
||||
s.listen(1)
|
||||
while True:
|
||||
client, addr = s.accept()
|
||||
print('SERVER: Got connection from', addr)
|
||||
send_fd(worker, client.fileno())
|
||||
client.close()
|
||||
|
||||
def recv_fd(sock):
|
||||
'''
|
||||
Receive a single file descriptor
|
||||
'''
|
||||
msg, ancdata, flags, addr = sock.recvmsg(1,
|
||||
socket.CMSG_LEN(struct.calcsize('i')))
|
||||
if __name__ == '__main__':
|
||||
import sys
|
||||
if len(sys.argv) != 3:
|
||||
print('Usage: server.py server_address port', file=sys.stderr)
|
||||
raise SystemExit(1)
|
||||
|
||||
cmsg_level, cmsg_type, cmsg_data = ancdata[0]
|
||||
assert cmsg_level == socket.SOL_SOCKET and cmsg_type == socket.SCM_RIGHTS
|
||||
sock.sendall(b'OK')
|
||||
server(sys.argv[1], int(sys.argv[2]))
|
||||
|
||||
return struct.unpack('i', cmsg_data)[0]
|
||||
下面是使用套接字的工作者实现:
|
||||
|
||||
def worker(server_address):
|
||||
serv = socket.socket(socket.AF_UNIX, socket.SOCK_STREAM)
|
||||
serv.connect(server_address)
|
||||
while True:
|
||||
fd = recv_fd(serv)
|
||||
print('WORKER: GOT FD', fd)
|
||||
with socket.socket(socket.AF_INET, socket.SOCK_STREAM, fileno=fd) as client:
|
||||
while True:
|
||||
msg = client.recv(1024)
|
||||
if not msg:
|
||||
break
|
||||
print('WORKER: RECV {!r}'.format(msg))
|
||||
client.send(msg)
|
||||
.. code-block:: python
|
||||
|
||||
if __name__ == '__main__':
|
||||
import sys
|
||||
if len(sys.argv) != 2:
|
||||
print('Usage: worker.py server_address', file=sys.stderr)
|
||||
raise SystemExit(1)
|
||||
# worker.py
|
||||
import socket
|
||||
import struct
|
||||
|
||||
worker(sys.argv[1])
|
||||
def recv_fd(sock):
|
||||
'''
|
||||
Receive a single file descriptor
|
||||
'''
|
||||
msg, ancdata, flags, addr = sock.recvmsg(1,
|
||||
socket.CMSG_LEN(struct.calcsize('i')))
|
||||
|
||||
cmsg_level, cmsg_type, cmsg_data = ancdata[0]
|
||||
assert cmsg_level == socket.SOL_SOCKET and cmsg_type == socket.SCM_RIGHTS
|
||||
sock.sendall(b'OK')
|
||||
|
||||
return struct.unpack('i', cmsg_data)[0]
|
||||
|
||||
def worker(server_address):
|
||||
serv = socket.socket(socket.AF_UNIX, socket.SOCK_STREAM)
|
||||
serv.connect(server_address)
|
||||
while True:
|
||||
fd = recv_fd(serv)
|
||||
print('WORKER: GOT FD', fd)
|
||||
with socket.socket(socket.AF_INET, socket.SOCK_STREAM, fileno=fd) as client:
|
||||
while True:
|
||||
msg = client.recv(1024)
|
||||
if not msg:
|
||||
break
|
||||
print('WORKER: RECV {!r}'.format(msg))
|
||||
client.send(msg)
|
||||
|
||||
if __name__ == '__main__':
|
||||
import sys
|
||||
if len(sys.argv) != 2:
|
||||
print('Usage: worker.py server_address', file=sys.stderr)
|
||||
raise SystemExit(1)
|
||||
|
||||
worker(sys.argv[1])
|
||||
|
||||
如果你想在你的程序中传递文件描述符,建议你参阅其他一些更加高级的文档,
|
||||
比如 ``Unix Network Programming by W. Richard Stevens (Prentice Hall, 1990)`` .
|
||||
在Windows上传递文件描述符跟Unix是不一样的,建议你研究下 ``multiprocessing.reduction`` 中的源代码看看其工作原理。
|
||||
|
||||
If you are going to use file-descriptor passing in your program, it is advisable to read
|
||||
more about it in an advanced text, such as Unix Network Programming by W. Richard
|
||||
Stevens (Prentice Hall, 1990). Passing file descriptors on Windows uses a different
|
||||
technique than Unix (not shown). For that platform, it is advisable to study the source
|
||||
code to multiprocessing.reduction in close detail to see how it works.
|
||||
|
||||
Reference in New Issue
Block a user