============================================================
11.11 Passing a Socket File Descriptor Between Processes
============================================================

----------
Problem
----------
You have multiple Python interpreter processes running at the same time, and you
want to pass an open file descriptor from one interpreter to another. For example,
suppose a server process is responsible for accepting connection requests, but the
actual servicing of clients is carried out in a different interpreter.

----------
Solution
----------
To pass file descriptors between processes, you first need to connect the processes
together. On Unix machines you would use Unix domain sockets, whereas on Windows you
would use named pipes. However, rather than working with these low-level mechanisms
directly, it is usually easier to use the ``multiprocessing`` module to set up such
a connection.

Once a connection is established, you can use the ``send_handle()`` and
``recv_handle()`` functions in ``multiprocessing.reduction`` to pass file descriptors
between processes. The following example illustrates the basics:

.. code-block:: python

    import multiprocessing
    from multiprocessing.reduction import recv_handle, send_handle
    import socket

    def worker(in_p, out_p):
        # The worker only receives, so close the unused end of the pipe
        out_p.close()
        while True:
            fd = recv_handle(in_p)
            print('CHILD: GOT FD', fd)
            # Wrap the received descriptor in a socket object and echo data
            with socket.socket(socket.AF_INET, socket.SOCK_STREAM, fileno=fd) as s:
                while True:
                    msg = s.recv(1024)
                    if not msg:
                        break
                    print('CHILD: RECV {!r}'.format(msg))
                    s.sendall(msg)

    def server(address, in_p, out_p, worker_pid):
        # The server only sends, so close the unused end of the pipe
        in_p.close()
        s = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
        s.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, True)
        s.bind(address)
        s.listen(1)
        while True:
            client, addr = s.accept()
            print('SERVER: Got connection from', addr)
            # Hand the client socket to the worker, then drop the local copy
            send_handle(out_p, client.fileno(), worker_pid)
            client.close()

    if __name__ == '__main__':
        c1, c2 = multiprocessing.Pipe()
        worker_p = multiprocessing.Process(target=worker, args=(c1, c2))
        worker_p.start()

        server_p = multiprocessing.Process(target=server,
                      args=(('', 15000), c1, c2, worker_p.pid))
        server_p.start()

        c1.close()
        c2.close()

In this example, two processes are created and connected by a ``multiprocessing``
pipe. The server process opens a socket and waits for client connections.
The worker process simply waits to receive a file descriptor on the pipe using
``recv_handle()``.
When the server receives a connection, it sends the resulting socket file descriptor
to the worker using ``send_handle()``. The worker takes over the socket and echoes
data back to the client until the connection is closed.

If you connect to the running server using Telnet or a similar tool, here is an
example of what you might see::

    bash % python3 passfd.py
    SERVER: Got connection from ('127.0.0.1', 55543)
    CHILD: GOT FD 7
    CHILD: RECV b'Hello\r\n'
    CHILD: RECV b'World\r\n'

The most important part of this example is that the client socket accepted by the
server is actually serviced by a completely different process. The server merely
hands it off, closes its own copy, and waits for the next connection.

----------
Discussion
----------
To many programmers, passing file descriptors between processes may not seem
necessary at all. However, it is sometimes a useful tool for building scalable
systems. For example, on a multicore machine you could run multiple instances of
the Python interpreter and balance load across them by handing file descriptors
to other interpreters.

The ``send_handle()`` and ``recv_handle()`` functions only work with
``multiprocessing`` connections. Instead of using a pipe, you can connect the
interpreters as described in Recipe 11.7, and this works as long as you use Unix
domain sockets or Windows pipes. For example, you could start the server and the
worker as completely separate programs. Here is the implementation of the server:

.. code-block:: python

    # servermp.py
    from multiprocessing.connection import Listener
    from multiprocessing.reduction import send_handle
    import socket

    def server(work_address, port):
        # Wait for the worker to connect
        work_serv = Listener(work_address, authkey=b'peekaboo')
        worker = work_serv.accept()
        worker_pid = worker.recv()

        # Now run a TCP/IP server and send clients to worker
        s = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
        s.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, True)
        s.bind(('', port))
        s.listen(1)
        while True:
            client, addr = s.accept()
            print('SERVER: Got connection from', addr)
            send_handle(worker, client.fileno(), worker_pid)
            client.close()

    if __name__ == '__main__':
        import sys
        if len(sys.argv) != 3:
            print('Usage: servermp.py server_address port', file=sys.stderr)
            raise SystemExit(1)

        server(sys.argv[1], int(sys.argv[2]))

To run this server, execute ``python3 servermp.py /tmp/servconn 15000``. Here is
the corresponding worker code:

.. code-block:: python

    # workermp.py
    from multiprocessing.connection import Client
    from multiprocessing.reduction import recv_handle
    import os
    from socket import socket, AF_INET, SOCK_STREAM

    def worker(server_address):
        serv = Client(server_address, authkey=b'peekaboo')
        # Tell the server our PID, which send_handle() requires
        serv.send(os.getpid())
        while True:
            fd = recv_handle(serv)
            print('WORKER: GOT FD', fd)
            with socket(AF_INET, SOCK_STREAM, fileno=fd) as client:
                while True:
                    msg = client.recv(1024)
                    if not msg:
                        break
                    print('WORKER: RECV {!r}'.format(msg))
                    client.sendall(msg)

    if __name__ == '__main__':
        import sys
        if len(sys.argv) != 2:
            print('Usage: workermp.py server_address', file=sys.stderr)
            raise SystemExit(1)

        worker(sys.argv[1])

To run the worker, execute ``python3 workermp.py /tmp/servconn``. The resulting
behavior is exactly the same as in the example that used ``Pipe()``.

Under the covers, file descriptor passing involves creating a Unix domain socket
and using the ``sendmsg()`` method of sockets. Since this technique is not widely
known, here is a different implementation of the server that passes descriptors
using sockets directly:

.. code-block:: python

    # server.py
    import socket
    import struct

    def send_fd(sock, fd):
        '''
        Send a single file descriptor.
        '''
        # The descriptor travels as SCM_RIGHTS ancillary data next to one data byte
        sock.sendmsg([b'x'],
                     [(socket.SOL_SOCKET, socket.SCM_RIGHTS, struct.pack('i', fd))])
        # Wait for the receiver to acknowledge before the caller closes fd
        ack = sock.recv(2)
        assert ack == b'OK'

    def server(work_address, port):
        # Wait for the worker to connect
        work_serv = socket.socket(socket.AF_UNIX, socket.SOCK_STREAM)
        work_serv.bind(work_address)
        work_serv.listen(1)
        worker, addr = work_serv.accept()

        # Now run a TCP/IP server and send clients to worker
        s = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
        s.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, True)
        s.bind(('', port))
        s.listen(1)
        while True:
            client, addr = s.accept()
            print('SERVER: Got connection from', addr)
            send_fd(worker, client.fileno())
            client.close()

    if __name__ == '__main__':
        import sys
        if len(sys.argv) != 3:
            print('Usage: server.py server_address port', file=sys.stderr)
            raise SystemExit(1)

        server(sys.argv[1], int(sys.argv[2]))

Here is an implementation of the worker using sockets:

.. code-block:: python

    # worker.py
    import socket
    import struct

    def recv_fd(sock):
        '''
        Receive a single file descriptor
        '''
        # Read one data byte plus room for a single int of ancillary data
        msg, ancdata, flags, addr = sock.recvmsg(1,
                                         socket.CMSG_LEN(struct.calcsize('i')))

        cmsg_level, cmsg_type, cmsg_data = ancdata[0]
        assert cmsg_level == socket.SOL_SOCKET and cmsg_type == socket.SCM_RIGHTS
        sock.sendall(b'OK')

        return struct.unpack('i', cmsg_data)[0]

    def worker(server_address):
        serv = socket.socket(socket.AF_UNIX, socket.SOCK_STREAM)
        serv.connect(server_address)
        while True:
            fd = recv_fd(serv)
            print('WORKER: GOT FD', fd)
            with socket.socket(socket.AF_INET, socket.SOCK_STREAM, fileno=fd) as client:
                while True:
                    msg = client.recv(1024)
                    if not msg:
                        break
                    print('WORKER: RECV {!r}'.format(msg))
                    client.sendall(msg)

    if __name__ == '__main__':
        import sys
        if len(sys.argv) != 2:
            print('Usage: worker.py server_address', file=sys.stderr)
            raise SystemExit(1)

        worker(sys.argv[1])

If you are going to use file descriptor passing in your own programs, it is
advisable to read more about it in an advanced text, such as
*Unix Network Programming* by W. Richard Stevens (Prentice Hall, 1990).
Passing file descriptors on Windows uses a different technique than on Unix.
For that platform, you are advised to study the source code of
``multiprocessing.reduction`` in detail to see how it works.
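
As a compact illustration of the ``sendmsg()``-based technique discussed in this
recipe, the following sketch passes a descriptor across a Unix domain socket pair
inside a single process. It assumes Python 3.9 or later on a Unix platform, where
``socket.send_fds()`` and ``socket.recv_fds()`` wrap the ``SCM_RIGHTS``
control-message handling. The helper name ``pass_fd_roundtrip()`` is hypothetical
and is not part of the recipe.

```python
import os
import socket

def pass_fd_roundtrip(payload):
    '''
    Hypothetical helper: send the read end of a pipe over a Unix domain
    socket pair, then read the payload through the received descriptor.
    '''
    left, right = socket.socketpair(socket.AF_UNIX, socket.SOCK_STREAM)
    read_fd, write_fd = os.pipe()
    try:
        # send_fds() issues sendmsg() with an SCM_RIGHTS control message
        socket.send_fds(left, [b'x'], [read_fd])

        # recv_fds() returns (data, fds, msg_flags, address)
        data, fds, msg_flags, addr = socket.recv_fds(right, 1, 1)
        received_fd = fds[0]

        # Write into the pipe and close the write end so the reader sees EOF
        os.write(write_fd, payload)
        os.close(write_fd)
        write_fd = None

        # The received descriptor is a new number referring to the same pipe
        with os.fdopen(received_fd, 'rb') as f:
            return data, f.read()
    finally:
        if write_fd is not None:
            os.close(write_fd)
        os.close(read_fd)
        left.close()
        right.close()

if __name__ == '__main__':
    data, echoed = pass_fd_roundtrip(b'hello')
    print('Marker byte:', data)
    print('Read via received descriptor:', echoed)
```

Because both ends live in one process here, the sketch omits the acknowledgement
step. The two-program version in this recipe additionally waits for an ``OK``
reply before the server closes its copy of the client socket.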