前言 如何通过 multiprocessing 模块实现Python的RPC
实现方法 首先, 我们需要实现一个RPC的注册处理类
改类实现了方法注册和保存功能, 通过connect对象,去循环接收执行函数并返回.
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 import pickleclass RPCHandler : def __init__ (self) : self._functions = {} def register_function (self, func) : self._functions[func.__name__] = func def handle_connection (self, connection) : try : while True : func_name, args, kwargs = pickle.loads(connection.recv()) try : r = self._functions[func_name](*args, **kwargs) connection.send(pickle.dumps(r)) except Exception as e: connection.send(pickle.dumps(e)) except EOFError: pass
我们还需要一个服务端的代码, 负责监听客户端的连接.
1 2 3 4 5 6 7 8 9 10 11 12 13 from multiprocessing.connection import Listenerfrom threading import Threaddef rpc_server(handler, address, authkey): sock = Listener(address, authkey =authkey) while True : client = sock.accept() t = Thread(target =handler.handle_connection, args=(client,)) t.daemon = True t.start()
创建函数
1 2 3 4 5 6 7 8 9 10 11 from RPCHandler import RPCHandlerdef add (x, y) : return x + y handler = RPCHandler() handler.register_function(add)
启动函数
1 2 3 4 5 6 7 8 9 10 from server import rpc_serverfrom functions import handlerdef main () : rpc_server(handler, ('localhost' , 17000 ), authkey=b'HKx2A7r@h3&z@#G2r7jT#w6ziqhnPzZ#' ) if __name__ == '__main__' : main()
通过上述方法, 实现了服务端的功能, 但是我们还需要一个客户端, 并且为了调用方便, 我们需要写一个Proxy的类
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 import pickle class RPCProxy : def __init__ (self , connection) : self ._connection = connection def __getattr__ (self , name) : def do_rpc (*args, **kwargs) : self ._connection.send(pickle.dumps((name, args, kwargs))) result = pickle.loads(self ._connection.recv()) if isinstance(result, Exception): raise result return result return do_rpc from multiprocessing.connection import Client c = Client(('localhost' , 17000 ), authkey=b'HKx2A7r@h3&z@#G2r7jT#w6ziqhnPzZ#' ) proxy = RPCProxy(c) print(proxy.add(2 , 3 ))