HiveNetGRpc总览¶
HiveNetGRpc是HiveNetAssemble提供的gRpc的封装, 可以简单实现兼容 HiveNetWebUtils.server.ServerBaseFW 和 HiveNetWebUtils.client.ClientBaseFw 的服务器和客户端功能。
安装方法¶
源码方式安装¶
HiveNetGRpc库安装
1、下载整体源码到需要安装的服务器上;
2、通过命令行进入源码目录
3、执行编译命令:python setup.py build
4、执行安装命令:python setup.py install
PIPY安装:pip install HiveNetGRpc
安装包打包(2种方式)
1、python安装包方式:python setup.py sdist
安装:python setup.py install
2、python setup.py bdist_wheel
安装:pip install HiveNetGRpc-0.1.0-py3-none-any.whl
安装注意事项
安装后需要执行一次proto_generate,重新生成系统默认的proto消息结构(应对系统环境不同或grpcio版本不同的情况)
库模块大纲¶
server¶
server模块提供ServerBaseFW服务框的gPpc实现类GRpcServer和AIOGRpcServer(异步IO模式),可基于该实现类快速构建gRpc服务。
注意:AIOGRpcServer是新的特性,隐含了一些异步的bug(表面看不影响程序处理,只是会打印一些错误),如果对异步IO没有要求建议直接使用GRpcServer。
client¶
client模块提供ClientBaseFw客户端连接框架的gRpc实现类AIOGRpcClient(异步IO模式)和GRpcClient,以及支持AIOConnectionPool连接池应用的PoolConnectionFW连接池对象实现类GRpcPoolConnection。
tool¶
tool模块提供GRpcTool工具类,支持一些常用的gRpc信息提取及处理。
proto¶
proto包提供HiveNetGRpc默认支持的消息定义,包括:
msg_json (JsonService) - 支持json信息收发的消息定义
msg_bytes (BytesService) - 支持bytes信息收发的消息定义
msg_formater¶
msg_formater模块提供HiveNetGRpc默认支持消息格式化处理类,包括:
RemoteCallFormater - 远程调用函数的消息格式化类,该类基于默认的JsonService报文格式进行处理,可以便捷地实现客户端通过gRpc方式对远程服务端函数的调用。
HiveNetGRpc使用说明¶
原生使用模式¶
该模式大部分处理需使用gRpc原生对象,可以根据自己的需要进行更灵活的扩展。
1、自定义gRpc的消息格式(protobuf)¶
您可以直接使用HiveNetGRpc默认支持的msg_json和msg_bytes,同样也可以自定义自己所需要的消息格式,操作步骤说明如下:
(1)创建 .proto 文件,例如 msg_test.proto,内容和格式可参考HiveNetGRpc.proto.msg_json.proto,按需要定义RpcRequest、RpcResponse的结构,注意除消息结构以及服务名以外,其他内容中的命名请保持与HiveNetGRpc.proto.msg_json.proto一致,例如:
syntax = "proto3";
package HiveNetGRPC;
// 服务名可按需要修改为不同的服务名(注意同一个包中的服务名不要重复)
// rpc名请保证跟示例保持一致, 为标识名称和Service的组合
service TestService {
rpc GRpcCallSimple (RpcRequest) returns (RpcResponse){}; // 简单调用
rpc GRpcCallClientSideStream (stream RpcRequest) returns (RpcResponse){}; // 客户端流式
rpc GRpcCallServerSideStream (RpcRequest) returns (stream RpcResponse){}; // 服务端流式
rpc GRpcCallBidirectionalStream (stream RpcRequest) returns (stream RpcResponse){}; // 双向数据流模式
rpc GRpcCallHealthCheck (HealthRequest) returns (HealthResponse){}; // 健康检查
}
// 请求消息结构
message RpcRequest {
// 可以自定义请求消息结构
...
}
// 响应消息结构
message RpcResponse {
// 可以自定义响应消息结构
...
}
// 自定义健康检查的服务
message HealthRequest {
string service = 1; // 健康监控请求
}
message HealthResponse {
enum ServingStatus {
UNKNOWN = 0;
SERVING = 1;
NOT_SERVING = 2;
SERVICE_UNKNOWN = 3;
}
ServingStatus status = 1;
}
(2)编译.proto文件
命令行进入.proto所在的目录,执行以下命令进行编译:
python -m grpc_tools.protoc -I. --python_out=. --grpc_python_out=. test_json.proto
编译后将生成test_json_pb2.py、msg_json_pb2_grpc.py两个文件。
注意:编译前需使用pip安装grpcio-tools和googleapis-common-protos
(3)修改test_json_pb2_grpc.py文件,增加上库路径指定 test_json_pb2 对象引入
import sys
import os
# 根据当前文件路径将包路径纳入,在非安装的情况下可以引用到
sys.path.append(os.path.abspath(os.path.join(
os.path.dirname(__file__), os.path.pardir, os.path.pardir)))
import xxx.msg_json_pb2 as msg__json__pb2
2、自定义服务管理类(Servicer)¶
服务管理类Servicer实现的功能包括管理请求处理函数路由和为gRpc提供标准接入接口,HiveNetGRpc已实现了通用的服务管类GRpcServicer(与GRpcServer配套使用)和AIOGRpcServicer(与AIOGRpcServer配套使用),如果需要自定义服务管理的逻辑(例如改变处理函数的入参和出参格式,以及增加获取到请求的自定义处理逻辑等),则可参考GRpcServicer或AIOGRpcServicer来实现自己的服务管理类(公开函数的接口需要保持一致)。
注意:GRpcServicer、AIOGRpcServicer必须与服务器类GRpcServicer、AIOGRpcServer配套使用,如果将AIOGRpcServicer用于GRpcServicer将无法正常处理。
3、开发服务端请求处理函数¶
开发服务端请求的处理函数,函数位置可以在模块中,也可以是类的静态函数或实例成员函数,同时函数也可以是异步IO函数。例如:
def deal_func(request: dict) -> msg_test_pb2.RpcResponse:
"""服务端处理函数"""
# 自定义的处理逻辑
...
# 返回响应对象 RpcResponse
...
注意,处理函数的入参和出参是需要根据不同的服务管理类(Servicer)进行定义,如果使用默认的GRpcServicer,函数定义说明如下:
(1)函数入参固定为 func(request):
request为请求字典,说明如下:
{
'request': xxx_pb2.RpcRequest/xxx_pb2.RpcRequest迭代器, # 请求报文对象, 如果是流模式则为请求报文对象的迭代器
'context': context, # 请求服务端上下文, grpc.ServicerContext
'call_mode': call_mode # 调用模式,具体见 HiveNetGRpc.enum.EnumCallMode
}
注意: 如果call_mode为ClientSideStream或BidirectionalStream,客户端通过流方式发送数据,则请求对象为迭代器,可以通过 __anext__()来逐个进行获取,参考代码如下:
while True:
try:
# 逐个获取客户端发送过来的请求
_item: xxx_pb2.RpcRequest = AsyncTools.sync_run_coroutine(
request_iter.__anext__()
)
# 处理逻辑
...
except StopAsyncIteration:
break
(2)函数的返回值为xxx_pb2.RpcResponse或xxx_pb2.RpcResponse的迭代对象,如果call_mode为ServerSideStream和BidirectionalStream,服务端应通过流方式返回数据,需要返回异步IO的迭代对象,参考处理函数的返回方式如下:
async def deal_fun(request):
# 请求处理逻辑
...
# 流方式返回处理结果
for xx in xxx:
...
_ret: xxx_pb2.RpcResponse = ...
yield _ret
4、gRpc服务端代码¶
服务端的参考代码如下:
# 创建服务管理类的映射字典, key为服务名, value维护服务管理类实例对象, 可以支持送入多个服务管理类
# 如果不需要自定义服务管理类可以不送该参数, 服务端会默认使用JsonService(msg_json格式)和GRpcServicer作为默认的服务管理类
_servicer_mapping = {
'JsonService': GRpcServicer(...),
'BytesService': GRpcServicer(...)
}
# 初始化gRpc服务对象
_server = GRpcServer(
'my_server', server_config={
'run_config': {
'host': '127.0.0.1', 'port': 50051, 'workers': 2,
'enable_health_check': True,
}
},
servicer_mapping=_servicer_mapping
)
# 添加请求处理函数
AsyncTools.sync_run_coroutine(_server.add_service(
'my_deal_func_uri', deal_func, call_mode=EnumCallMode.Simple, servicer_name='JsonService'
))
# 启动gRpc服务
AsyncTools.sync_run_coroutine(_server.start(is_asyn=False))
5、gRpc客户端代码¶
gRpc客户端可以根据需要使用AIOGRpcClient(异步IO模式)或GRpcClient(同步模式),参考代码如下:
with AIOGRpcClient({
'host': '127.0.0.1', 'port': 50051, 'ping_on_connect': True, 'ping_with_health_check': True,
'use_sync_client': False, 'timeout': 3
}) as _client:
_request = xxx_pb2.RpcRequest(
xx1='value1', xx2='value2', ...
)
_result = AsyncTools.sync_run_coroutine(client.call(
'my_deal_func_uri', _request, call_mode=EnumCallMode.Simple
))
通过RemoteCallFormater实现函数远程调用¶
HiveNetGRpc.msg_formater.RemoteCallFormater实现基于json格式传输数据的函数远程调用的服务端和客户端处理支持,可以让您能更便捷地进行gRpc远程应用,使用方法大致说明如下:
1、服务端通过RemoteCallFormater.format_service对请求处理函数进行修饰
format_service支持对请求对象和响应对象的格式转换处理,简化函数处理的逻辑,示例如下:
@RemoteCallFormater.format_service(with_request=False)
async def service_simple_call_para(a, b, *args, c=10, d={'d1': 'd1value'}, **kwargs):
"""
测试简单调用,直接返回传入的参数(数组形式)
"""
return [a, b, args, c, d, kwargs]
@RemoteCallFormater.format_service(with_request=True)
async def service_client_stream(request, a, b, c=10):
"""
测试客户端流(数字加总)
"""
d = 0
for _item in request['request']:
d += _item
return [a, b, c, d]
@RemoteCallFormater.format_service(with_request=True)
async def service_server_stream_async(request, a, b, *args, c=10, d={'d1': 'd1value'}, **kwargs):
"""
测试服务端流(异步函数)
"""
print([a, b, args, c, d, kwargs])
for i in [1, 2, 3, 4]:
yield i
使用的注意事项如下:
(1)with_request指定入参是否包含gRpc服务原始的请求字典对象,如果为True则在第一个参数送入该请求字典,此时需要处理函数定义对应的入参;
(2)该修饰符可以同时支持流模式和普通模式,如果为客户端流,with_request固定为True(设置了False也没有用),处理函数需要从request[’request’]中获取对应的迭代对象进行循环处理;
(3)返回值可以支持直接返回python对象(注意需要能转换为json的对象);如果为服务端流,则需要通过yield方式返回迭代对象。
2、服务端处理,初始化无需指定servicer_mapping
参考代码如下:
# 初始化gRpc服务对象
_server = GRpcServer(
'my_server', server_config={
'run_config': {
'host': '127.0.0.1', 'port': 50051, 'workers': 2,
'enable_health_check': True,
}
}
)
# 添加请求处理函数,建议service_uri直接为函数名
AsyncTools.sync_run_coroutine(_server.add_service(
'service_simple_call_para', service_simple_call_para, call_mode=EnumCallMode.Simple
))
AsyncTools.sync_run_coroutine(_server.add_service(
'service_client_stream', service_client_stream, call_mode=EnumCallMode.ClientSideStream
))
AsyncTools.sync_run_coroutine(_server.add_service(
'service_server_stream_async', service_server_stream_async, call_mode=EnumCallMode.ServerSideStream
))
# 启动gRpc服务
AsyncTools.sync_run_coroutine(_server.start(is_asyn=False))
3、客户端处理,通过RemoteCallFormater工具处理请求和返回值
with AIOGRpcClient({
'host': '127.0.0.1', 'port': 50051, 'ping_on_connect': True, 'ping_with_health_check': True,
'use_sync_client': False, 'timeout': 3
}) as _client:
# 将远程函数的入参转换为标准请求函数
_request = RemoteCallFormater.paras_to_grpc_request(
['a_val', 'b_val', 'fixed_add1', 'fixed_add2'],
{
'c': 14, 'e': 'e_val'
}
)
# 远程调用
_result = AsyncTools.sync_run_coroutine(_client.call(
'service_simple_call_para', _request
))
# 将返回值转换为标准的CResult对象,如果成功,_result.resp为对应的返回值
_result = RemoteCallFormater.format_call_result(_result)
# 客户端流的调用方式
_request = RemoteCallFormater.paras_to_grpc_request_iter(
[1, 2, 3, 4],
['a_val', 'b_val'],
{
'c': 14
}
)
_result = AsyncTools.sync_run_coroutine(client.call(
'service_client_stream', _request, call_mode=EnumCallMode.ClientSideStream
))
_result = RemoteCallFormater.format_call_result(_result)
客户端使用连接池管理¶
客户端支持使用 HiveNetCore.connection_pool.AIOConnectionPool 进行连接池的管理,参考代码如下:
_creator = AIOGRpcClient # 连接创建模块,也可以设置为GRpcClient
_connect_config = {
'host': '127.0.0.1', 'port': 50051, 'ping_on_connect': True, 'ping_with_health_check': True,
'use_sync_client': False, 'timeout': 3
}
# 建立连接池
_pool = AIOConnectionPool(
_creator, GRpcPoolConnection, args=[_connect_config],
connect_method_name=None, max_size=3, min_size=0, connect_on_init=True,
get_timeout=1,
free_idle_time=5, ping_on_get=True, ping_on_back=True, ping_on_idle=True,
ping_interval=5
)
# 获取一个连接对象
client = AsyncTools.sync_run_coroutine(_pool.connection())
# 通过连接对象执行call等处理
...
# 退回连接对象到连接池
AsyncTools.sync_run_coroutine(client.close())
使用SSL/TSL进行安全验证¶
HiveNetGRpc支持通过SSL/TSL进行验证,包括服务端验证和客户端反向验证,参考如下:
1、证书的生成
可以通过openssl来进行相关证书的生成,参考命令如下:
# 需先生成相应证书文件(域名为localhost)
# --执行前先进入HiveNetGRpc/test_data/路径
# --创建CA根证书(自签名证书)
# --生成rsa私钥文件,使用des3加密文件(密码111111)
# openssl genrsa -passout pass:111111 -des3 -out ca.key 4096
# --通过私钥生成签名证书
# openssl req -passin pass:111111 -new -x509 -days 365 -key ca.key -out ca.crt -subj "/CN=localhost"
#
# --创建服务器证书
# --生成rsa私钥文件
# openssl genrsa -passout pass:111111 -des3 -out server.key 4096
# --通过私钥生成签名证书签名请求文件
# openssl req -passin pass:111111 -new -key server.key -out server.csr -subj "/CN=localhost"
# --由CA根证书签发根据请求文件签发证书
# openssl x509 -req -passin pass:111111 -days 365 -in server.csr -CA ca.crt -CAkey ca.key -set_serial 01 -out server.crt
# --私钥文件由加密转为非加密
# openssl rsa -passin pass:111111 -in server.key -out server.key
#
# --创建客户端证书
# openssl genrsa -passout pass:111111 -des3 -out client.key 4096
# openssl req -passin pass:111111 -new -key client.key -out client.csr -subj "/CN=localhost"
# openssl x509 -passin pass:111111 -req -days 365 -in client.csr -CA ca.crt -CAkey ca.key -set_serial 01 -out client.crt
# openssl rsa -passin pass:111111 -in client.key -out client.key
#
# --对私钥进行pkcs8编码
# openssl pkcs8 -topk8 -nocrypt -in client.key -out client.pem
# openssl pkcs8 -topk8 -nocrypt -in server.key -out server.pem
2、服务器端单向验证
服务器端可以验证客户端的证书是否有效,参考代码如下:
# 服务端指定证书的参考代码如下:
_server = GRpcServer(
'server_server_ssl', server_config={
'run_config': {
'host': '127.0.0.1', 'port': 50052, 'workers': 2,
'enable_health_check': True,
'use_ssl': True,
'ssl': [{
'cert': os.path.join(_ca_path, 'server.crt'),
'key': os.path.join(_ca_path, 'server.pem')
}]
}
}
)
...
# 客户端建立的参数参考如下(注意host必须为域名,使用ip会验证不通过):
with AIOGRpcClient({
'host': 'localhost', 'port': 50052, 'ping_on_connect': True, 'ping_with_health_check': True,
'use_sync_client': False, 'timeout': 3,
'use_ssl': True, 'root_certificates': os.path.join(self.ca_path, 'server.crt')
}) as _client:
...
3、双向验证(服务器验证客户端,客户端验证服务器)
参考代码如下:
# 服务端指定证书的参考代码如下:
_server = GRpcServer(
'server_double_ssl', server_config={
'run_config': {
'host': '127.0.0.1', 'port': 50053, 'workers': 2,
'enable_health_check': True,
'use_ssl': True,
'ssl': [{
'cert': os.path.join(_ca_path, 'server.crt'),
'key': os.path.join(_ca_path, 'server.pem')
}],
'root_certificates': os.path.join(_ca_path, 'client.crt') # 客户端反向验证的证书指定
}
}
)
...
# 客户端建立的参数参考如下(注意host必须为域名,使用ip会验证不通过):
with AIOGRpcClient({
'host': 'localhost', 'port': 50053, 'ping_on_connect': True, 'ping_with_health_check': True,
'use_sync_client': False, 'timeout': 3,
'use_ssl': True,
'root_certificates': os.path.join(self.ca_path, 'server.crt'), # 客户端反向验证的证书指定
'ssl': {
'cert': os.path.join(self.ca_path, 'client.crt'),
'key': os.path.join(self.ca_path, 'client.pem')
}
}) as _client:
...