# HiveNetGRpc总览 HiveNetGRpc是HiveNetAssemble提供的gRpc的封装, 可以简单实现兼容 HiveNetWebUtils.server.ServerBaseFW 和 HiveNetWebUtils.client.ClientBaseFw 的服务器和客户端功能。 ## 安装方法 ### 源码方式安装 - HiveNetGRpc库安装 1、下载整体源码到需要安装的服务器上; 2、通过命令行进入源码目录 3、执行编译命令:python setup.py build 4、执行安装命令:python setup.py install PIPY安装:pip install HiveNetGRpc - 安装包打包(2种方式) 1、python安装包方式:python setup.py sdist 安装:python setup.py install 2、python setup.py bdist_wheel 安装:pip install HiveNetGRpc-0.1.0-py3-none-any.whl - 安装注意事项 安装后需要执行一次proto_generate,重新生成系统默认的proto消息结构(应对系统环境不同或grpcio版本不同的情况) ## 库模块大纲 ### server server模块提供ServerBaseFW服务框的gPpc实现类GRpcServer和AIOGRpcServer(异步IO模式),可基于该实现类快速构建gRpc服务。 注意:AIOGRpcServer是新的特性,隐含了一些异步的bug(表面看不影响程序处理,只是会打印一些错误),如果对异步IO没有要求建议直接使用GRpcServer。 ### client client模块提供ClientBaseFw客户端连接框架的gRpc实现类AIOGRpcClient(异步IO模式)和GRpcClient,以及支持AIOConnectionPool连接池应用的PoolConnectionFW连接池对象实现类GRpcPoolConnection。 ### tool tool模块提供GRpcTool工具类,支持一些常用的gRpc信息提取及处理。 ### proto proto包提供HiveNetGRpc默认支持的消息定义,包括: - msg_json (JsonService) - 支持json信息收发的消息定义 - msg_bytes (BytesService) - 支持bytes信息收发的消息定义 ### msg_formater msg_formater模块提供HiveNetGRpc默认支持消息格式化处理类,包括: - RemoteCallFormater - 远程调用函数的消息格式化类,该类基于默认的JsonService报文格式进行处理,可以便捷地实现客户端通过gRpc方式对远程服务端函数的调用。 ## HiveNetGRpc使用说明 ### 原生使用模式 该模式大部分处理需使用gRpc原生对象,可以根据自己的需要进行更灵活的扩展。 #### 1、自定义gRpc的消息格式(protobuf) 您可以直接使用HiveNetGRpc默认支持的msg_json和msg_bytes,同样也可以自定义自己所需要的消息格式,操作步骤说明如下: (1)创建 .proto 文件,例如 msg_test.proto,内容和格式可参考HiveNetGRpc.proto.msg_json.proto,按需要定义RpcRequest、RpcResponse的结构,注意除消息结构以及服务名以外,其他内容中的命名请保持与HiveNetGRpc.proto.msg_json.proto一致,例如: ``` syntax = "proto3"; package HiveNetGRPC; // 服务名可按需要修改为不同的服务名(注意同一个包中的服务名不要重复) // rpc名请保证跟示例保持一致, 为标识名称和Service的组合 service TestService { rpc GRpcCallSimple (RpcRequest) returns (RpcResponse){}; // 简单调用 rpc GRpcCallClientSideStream (stream RpcRequest) returns (RpcResponse){}; // 客户端流式 rpc GRpcCallServerSideStream (RpcRequest) returns (stream RpcResponse){}; // 服务端流式 rpc GRpcCallBidirectionalStream (stream RpcRequest) returns (stream RpcResponse){}; // 双向数据流模式 rpc GRpcCallHealthCheck (HealthRequest) returns (HealthResponse){}; // 健康检查 } // 请求消息结构 message RpcRequest { // 可以自定义请求消息结构 ... } // 响应消息结构 message RpcResponse { // 可以自定义响应消息结构 ... } // 自定义健康检查的服务 message HealthRequest { string service = 1; // 健康监控请求 } message HealthResponse { enum ServingStatus { UNKNOWN = 0; SERVING = 1; NOT_SERVING = 2; SERVICE_UNKNOWN = 3; } ServingStatus status = 1; } ``` (2)编译.proto文件 命令行进入.proto所在的目录,执行以下命令进行编译: ``` python -m grpc_tools.protoc -I. --python_out=. --grpc_python_out=. test_json.proto ``` 编译后将生成test_json_pb2.py、msg_json_pb2_grpc.py两个文件。 **注意:编译前需使用pip安装grpcio-tools和googleapis-common-protos** (3)修改test_json_pb2_grpc.py文件,增加上库路径指定 test_json_pb2 对象引入 ``` import sys import os # 根据当前文件路径将包路径纳入,在非安装的情况下可以引用到 sys.path.append(os.path.abspath(os.path.join( os.path.dirname(__file__), os.path.pardir, os.path.pardir))) import xxx.msg_json_pb2 as msg__json__pb2 ``` #### 2、自定义服务管理类(Servicer) 服务管理类Servicer实现的功能包括管理请求处理函数路由和为gRpc提供标准接入接口,HiveNetGRpc已实现了通用的服务管类GRpcServicer(与GRpcServer配套使用)和AIOGRpcServicer(与AIOGRpcServer配套使用),如果需要自定义服务管理的逻辑(例如改变处理函数的入参和出参格式,以及增加获取到请求的自定义处理逻辑等),则可参考GRpcServicer或AIOGRpcServicer来实现自己的服务管理类(公开函数的接口需要保持一致)。 注意:GRpcServicer、AIOGRpcServicer必须与服务器类GRpcServicer、AIOGRpcServer配套使用,如果将AIOGRpcServicer用于GRpcServicer将无法正常处理。 #### 3、开发服务端请求处理函数 开发服务端请求的处理函数,函数位置可以在模块中,也可以是类的静态函数或实例成员函数,同时函数也可以是异步IO函数。例如: ``` def deal_func(request: dict) -> msg_test_pb2.RpcResponse: """服务端处理函数""" # 自定义的处理逻辑 ... # 返回响应对象 RpcResponse ... ``` 注意,处理函数的入参和出参是需要根据不同的服务管理类(Servicer)进行定义,如果使用默认的GRpcServicer,函数定义说明如下: ``` (1)函数入参固定为 func(request): request为请求字典,说明如下: { 'request': xxx_pb2.RpcRequest/xxx_pb2.RpcRequest迭代器, # 请求报文对象, 如果是流模式则为请求报文对象的迭代器 'context': context, # 请求服务端上下文, grpc.ServicerContext 'call_mode': call_mode # 调用模式,具体见 HiveNetGRpc.enum.EnumCallMode } 注意: 如果call_mode为ClientSideStream或BidirectionalStream,客户端通过流方式发送数据,则请求对象为迭代器,可以通过 __anext__()来逐个进行获取,参考代码如下: while True: try: # 逐个获取客户端发送过来的请求 _item: xxx_pb2.RpcRequest = AsyncTools.sync_run_coroutine( request_iter.__anext__() ) # 处理逻辑 ... except StopAsyncIteration: break (2)函数的返回值为xxx_pb2.RpcResponse或xxx_pb2.RpcResponse的迭代对象,如果call_mode为ServerSideStream和BidirectionalStream,服务端应通过流方式返回数据,需要返回异步IO的迭代对象,参考处理函数的返回方式如下: async def deal_fun(request): # 请求处理逻辑 ... # 流方式返回处理结果 for xx in xxx: ... _ret: xxx_pb2.RpcResponse = ... yield _ret ``` #### 4、gRpc服务端代码 服务端的参考代码如下: ``` # 创建服务管理类的映射字典, key为服务名, value维护服务管理类实例对象, 可以支持送入多个服务管理类 # 如果不需要自定义服务管理类可以不送该参数, 服务端会默认使用JsonService(msg_json格式)和GRpcServicer作为默认的服务管理类 _servicer_mapping = { 'JsonService': GRpcServicer(...), 'BytesService': GRpcServicer(...) } # 初始化gRpc服务对象 _server = GRpcServer( 'my_server', server_config={ 'run_config': { 'host': '127.0.0.1', 'port': 50051, 'workers': 2, 'enable_health_check': True, } }, servicer_mapping=_servicer_mapping ) # 添加请求处理函数 AsyncTools.sync_run_coroutine(_server.add_service( 'my_deal_func_uri', deal_func, call_mode=EnumCallMode.Simple, servicer_name='JsonService' )) # 启动gRpc服务 AsyncTools.sync_run_coroutine(_server.start(is_asyn=False)) ``` #### 5、gRpc客户端代码 gRpc客户端可以根据需要使用AIOGRpcClient(异步IO模式)或GRpcClient(同步模式),参考代码如下: ``` with AIOGRpcClient({ 'host': '127.0.0.1', 'port': 50051, 'ping_on_connect': True, 'ping_with_health_check': True, 'use_sync_client': False, 'timeout': 3 }) as _client: _request = xxx_pb2.RpcRequest( xx1='value1', xx2='value2', ... ) _result = AsyncTools.sync_run_coroutine(client.call( 'my_deal_func_uri', _request, call_mode=EnumCallMode.Simple )) ``` ### 通过RemoteCallFormater实现函数远程调用 HiveNetGRpc.msg_formater.RemoteCallFormater实现基于json格式传输数据的函数远程调用的服务端和客户端处理支持,可以让您能更便捷地进行gRpc远程应用,使用方法大致说明如下: **1、服务端通过RemoteCallFormater.format_service对请求处理函数进行修饰** format_service支持对请求对象和响应对象的格式转换处理,简化函数处理的逻辑,示例如下: ``` @RemoteCallFormater.format_service(with_request=False) async def service_simple_call_para(a, b, *args, c=10, d={'d1': 'd1value'}, **kwargs): """ 测试简单调用,直接返回传入的参数(数组形式) """ return [a, b, args, c, d, kwargs] @RemoteCallFormater.format_service(with_request=True) async def service_client_stream(request, a, b, c=10): """ 测试客户端流(数字加总) """ d = 0 for _item in request['request']: d += _item return [a, b, c, d] @RemoteCallFormater.format_service(with_request=True) async def service_server_stream_async(request, a, b, *args, c=10, d={'d1': 'd1value'}, **kwargs): """ 测试服务端流(异步函数) """ print([a, b, args, c, d, kwargs]) for i in [1, 2, 3, 4]: yield i ``` 使用的注意事项如下: (1)with_request指定入参是否包含gRpc服务原始的请求字典对象,如果为True则在第一个参数送入该请求字典,此时需要处理函数定义对应的入参; (2)该修饰符可以同时支持流模式和普通模式,如果为客户端流,with_request固定为True(设置了False也没有用),处理函数需要从request['request']中获取对应的迭代对象进行循环处理; (3)返回值可以支持直接返回python对象(注意需要能转换为json的对象);如果为服务端流,则需要通过yield方式返回迭代对象。 **2、服务端处理,初始化无需指定servicer_mapping** 参考代码如下: ``` # 初始化gRpc服务对象 _server = GRpcServer( 'my_server', server_config={ 'run_config': { 'host': '127.0.0.1', 'port': 50051, 'workers': 2, 'enable_health_check': True, } } ) # 添加请求处理函数,建议service_uri直接为函数名 AsyncTools.sync_run_coroutine(_server.add_service( 'service_simple_call_para', service_simple_call_para, call_mode=EnumCallMode.Simple )) AsyncTools.sync_run_coroutine(_server.add_service( 'service_client_stream', service_client_stream, call_mode=EnumCallMode.ClientSideStream )) AsyncTools.sync_run_coroutine(_server.add_service( 'service_server_stream_async', service_server_stream_async, call_mode=EnumCallMode.ServerSideStream )) # 启动gRpc服务 AsyncTools.sync_run_coroutine(_server.start(is_asyn=False)) ``` 3、客户端处理,通过RemoteCallFormater工具处理请求和返回值 ``` with AIOGRpcClient({ 'host': '127.0.0.1', 'port': 50051, 'ping_on_connect': True, 'ping_with_health_check': True, 'use_sync_client': False, 'timeout': 3 }) as _client: # 将远程函数的入参转换为标准请求函数 _request = RemoteCallFormater.paras_to_grpc_request( ['a_val', 'b_val', 'fixed_add1', 'fixed_add2'], { 'c': 14, 'e': 'e_val' } ) # 远程调用 _result = AsyncTools.sync_run_coroutine(_client.call( 'service_simple_call_para', _request )) # 将返回值转换为标准的CResult对象,如果成功,_result.resp为对应的返回值 _result = RemoteCallFormater.format_call_result(_result) # 客户端流的调用方式 _request = RemoteCallFormater.paras_to_grpc_request_iter( [1, 2, 3, 4], ['a_val', 'b_val'], { 'c': 14 } ) _result = AsyncTools.sync_run_coroutine(client.call( 'service_client_stream', _request, call_mode=EnumCallMode.ClientSideStream )) _result = RemoteCallFormater.format_call_result(_result) ``` ### 客户端使用连接池管理 客户端支持使用 HiveNetCore.connection_pool.AIOConnectionPool 进行连接池的管理,参考代码如下: ``` _creator = AIOGRpcClient # 连接创建模块,也可以设置为GRpcClient _connect_config = { 'host': '127.0.0.1', 'port': 50051, 'ping_on_connect': True, 'ping_with_health_check': True, 'use_sync_client': False, 'timeout': 3 } # 建立连接池 _pool = AIOConnectionPool( _creator, GRpcPoolConnection, args=[_connect_config], connect_method_name=None, max_size=3, min_size=0, connect_on_init=True, get_timeout=1, free_idle_time=5, ping_on_get=True, ping_on_back=True, ping_on_idle=True, ping_interval=5 ) # 获取一个连接对象 client = AsyncTools.sync_run_coroutine(_pool.connection()) # 通过连接对象执行call等处理 ... # 退回连接对象到连接池 AsyncTools.sync_run_coroutine(client.close()) ``` ### 使用SSL/TSL进行安全验证 HiveNetGRpc支持通过SSL/TSL进行验证,包括服务端验证和客户端反向验证,参考如下: **1、证书的生成** 可以通过openssl来进行相关证书的生成,参考命令如下: ``` # 需先生成相应证书文件(域名为localhost) # --执行前先进入HiveNetGRpc/test_data/路径 # --创建CA根证书(自签名证书) # --生成rsa私钥文件,使用des3加密文件(密码111111) # openssl genrsa -passout pass:111111 -des3 -out ca.key 4096 # --通过私钥生成签名证书 # openssl req -passin pass:111111 -new -x509 -days 365 -key ca.key -out ca.crt -subj "/CN=localhost" # # --创建服务器证书 # --生成rsa私钥文件 # openssl genrsa -passout pass:111111 -des3 -out server.key 4096 # --通过私钥生成签名证书签名请求文件 # openssl req -passin pass:111111 -new -key server.key -out server.csr -subj "/CN=localhost" # --由CA根证书签发根据请求文件签发证书 # openssl x509 -req -passin pass:111111 -days 365 -in server.csr -CA ca.crt -CAkey ca.key -set_serial 01 -out server.crt # --私钥文件由加密转为非加密 # openssl rsa -passin pass:111111 -in server.key -out server.key # # --创建客户端证书 # openssl genrsa -passout pass:111111 -des3 -out client.key 4096 # openssl req -passin pass:111111 -new -key client.key -out client.csr -subj "/CN=localhost" # openssl x509 -passin pass:111111 -req -days 365 -in client.csr -CA ca.crt -CAkey ca.key -set_serial 01 -out client.crt # openssl rsa -passin pass:111111 -in client.key -out client.key # # --对私钥进行pkcs8编码 # openssl pkcs8 -topk8 -nocrypt -in client.key -out client.pem # openssl pkcs8 -topk8 -nocrypt -in server.key -out server.pem ``` **2、服务器端单向验证** 服务器端可以验证客户端的证书是否有效,参考代码如下: ``` # 服务端指定证书的参考代码如下: _server = GRpcServer( 'server_server_ssl', server_config={ 'run_config': { 'host': '127.0.0.1', 'port': 50052, 'workers': 2, 'enable_health_check': True, 'use_ssl': True, 'ssl': [{ 'cert': os.path.join(_ca_path, 'server.crt'), 'key': os.path.join(_ca_path, 'server.pem') }] } } ) ... # 客户端建立的参数参考如下(注意host必须为域名,使用ip会验证不通过): with AIOGRpcClient({ 'host': 'localhost', 'port': 50052, 'ping_on_connect': True, 'ping_with_health_check': True, 'use_sync_client': False, 'timeout': 3, 'use_ssl': True, 'root_certificates': os.path.join(self.ca_path, 'server.crt') }) as _client: ... ``` **3、双向验证(服务器验证客户端,客户端验证服务器)** 参考代码如下: ``` # 服务端指定证书的参考代码如下: _server = GRpcServer( 'server_double_ssl', server_config={ 'run_config': { 'host': '127.0.0.1', 'port': 50053, 'workers': 2, 'enable_health_check': True, 'use_ssl': True, 'ssl': [{ 'cert': os.path.join(_ca_path, 'server.crt'), 'key': os.path.join(_ca_path, 'server.pem') }], 'root_certificates': os.path.join(_ca_path, 'client.crt') # 客户端反向验证的证书指定 } } ) ... # 客户端建立的参数参考如下(注意host必须为域名,使用ip会验证不通过): with AIOGRpcClient({ 'host': 'localhost', 'port': 50053, 'ping_on_connect': True, 'ping_with_health_check': True, 'use_sync_client': False, 'timeout': 3, 'use_ssl': True, 'root_certificates': os.path.join(self.ca_path, 'server.crt'), # 客户端反向验证的证书指定 'ssl': { 'cert': os.path.join(self.ca_path, 'client.crt'), 'key': os.path.join(self.ca_path, 'client.pem') } }) as _client: ... ```