momo's Blog.

Python进程间通讯,实现RPC调用

字数统计: 359阅读时长: 1 min
2021/10/15 Share

前言

如何通过 multiprocessing 模块实现Python的RPC

实现方法

首先, 我们需要实现一个RPC的注册处理类

改类实现了方法注册和保存功能, 通过connect对象,去循环接收执行函数并返回.

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
# RPCHandler.py
import pickle


class RPCHandler:
def __init__(self):
self._functions = {}

def register_function(self, func):
self._functions[func.__name__] = func

def handle_connection(self, connection):
try:
while True:
# 接收一条消息
func_name, args, kwargs = pickle.loads(connection.recv())
# 运行函数并返回
try:
r = self._functions[func_name](*args, **kwargs)
connection.send(pickle.dumps(r))
except Exception as e:
connection.send(pickle.dumps(e))
except EOFError:
pass

我们还需要一个服务端的代码, 负责监听客户端的连接.

1
2
3
4
5
6
7
8
9
10
11
12
13
# server.py

from multiprocessing.connection import Listener
from threading import Thread


def rpc_server(handler, address, authkey):
sock = Listener(address, authkey=authkey)
while True:
client = sock.accept()
t = Thread(target=handler.handle_connection, args=(client,))
t.daemon = True
t.start()

创建函数

1
2
3
4
5
6
7
8
9
10
11
# functions.py

from RPCHandler import RPCHandler

def add(x, y):
return x + y


# 注册函数
handler = RPCHandler()
handler.register_function(add)

启动函数

1
2
3
4
5
6
7
8
9
10
from server import rpc_server
from functions import handler


def main():
rpc_server(handler, ('localhost', 17000), authkey=b'HKx2A7r@h3&z@#G2r7jT#w6ziqhnPzZ#')


if __name__ == '__main__':
main()

通过上述方法, 实现了服务端的功能, 但是我们还需要一个客户端, 并且为了调用方便, 我们需要写一个Proxy的类

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
import pickle


class RPCProxy:
def __init__(self, connection):
self._connection = connection

def __getattr__(self, name):
def do_rpc(*args, **kwargs):
self._connection.send(pickle.dumps((name, args, kwargs)))
result = pickle.loads(self._connection.recv())
if isinstance(result, Exception):
raise result
return result
return do_rpc


from multiprocessing.connection import Client
c = Client(('localhost', 17000), authkey=b'HKx2A7r@h3&z@#G2r7jT#w6ziqhnPzZ#')
proxy = RPCProxy(c)

print(proxy.add(2, 3))
CATALOG
  1. 1. 前言
  2. 2. 实现方法