HiveNetGRpc.msg_formater 源代码
#!/usr/bin/env python3
# -*- coding: UTF-8 -*-
# Copyright 2022 黎慧剑
#
# This Source Code Form is subject to the terms of the Mozilla Public
# License, v. 2.0. If a copy of the MPL was not distributed with this
# file, You can obtain one at http://mozilla.org/MPL/2.0/.
"""
消息格式化模块
@module msg_formater
@file msg_formater.py
"""
import json
import os
from re import I
import sys
import logging
import traceback
from logging import Logger
from functools import wraps
from inspect import isasyncgen, isawaitable, isgenerator
import grpc._channel
import grpc.aio._call
from HiveNetCore.generic import CResult
from HiveNetCore.utils.run_tool import AsyncTools
from HiveNetCore.utils.string_tool import StringTool
# 根据当前文件路径将包路径纳入, 在非安装的情况下可以引用到
sys.path.append(os.path.abspath(os.path.join(
os.path.dirname(__file__), os.path.pardir)))
from HiveNetGRpc.enum import EnumCallMode
from HiveNetGRpc.proto import msg_json_pb2
[文档]class RemoteCallFormater(object):
"""
远程调用函数的消息格式化类
(基于默认的JsonService报文格式进行处理)
"""
#############################
# 修饰函数
#############################
[文档] @classmethod
def format_service(cls, with_request: bool = True, native_request: bool = False, logger: Logger = None):
"""
服务端处理函数的请求格式转换修饰函数
注: 支持修饰同步及异步函数, 但修饰后需按异步函数执行
@param {bool} with_request=True - 是否将请求对象送入处理函数的参数中执行
注1: 如果为True则代表会将原始请求对象送入处理函数(作为第一个参数送入)
注2: 如果是流模式, 该参数固定为True, 并且会将原请求request字典中的'request'对象替换为第二个迭代值开始的json字典的迭代对象
@param {bool} native_request=False - 送入的请求对象是否原生请求对象(RpcRequest)
注: 如果请求数据包含extend_bytes的信息, 则应设置为True以获取到对应的值, 否则只能获取到return_json的值
@param {Logger} logger=None - 日志对象
"""
def decorator(f):
@wraps(f)
async def decorated_function(*args, **kwargs):
_logger = cls._get_logger(logger)
_resp_obj = None # 响应对象
try:
# 正常情况只传入一个request参数字典
if len(args) > 1:
# 是类函数
_request = args[1]
else:
_request = args[0]
# 处理函数入参
if _request['call_mode'] in (EnumCallMode.Simple, EnumCallMode.ServerSideStream):
# 传入的是单个请求对象
_args, _kwargs = cls.service_request_to_paras(
_request, with_request=with_request, native_request=native_request
)
else:
# 传入的是迭代器
_args, _kwargs = cls.service_request_to_paras_iter(
_request, native_request=native_request
)
# 处理类函数的第一个参数
if len(args) > 1:
_args.insert(0, args[0])
except:
# 处理传入参数的异常
_error = str(sys.exc_info()[0])
_err_msg = 'argument consistency error'
_traceback_str = traceback.format_exc()
_logger.warning(
'deal with para_json error, [%s]%s: %s' % (
_error, _err_msg, _traceback_str
)
)
_resp_obj = cls.service_exception_to_grpc_resp(
'21003', _error, _err_msg
)
# 执行函数对象
if _resp_obj is None:
try:
_resp_obj = f(*_args, **_kwargs)
if isawaitable(_resp_obj):
_resp_obj = await _resp_obj
except:
# 执行函数出现异常
_error = str(sys.exc_info()[0])
_err_msg = 'remote function execute raise exception'
_traceback_str = traceback.format_exc()
_logger.error(
'call service function error, [%s]%s: %s' % (
_error, _err_msg, _traceback_str
)
)
_resp_obj = cls.service_exception_to_grpc_resp(
'31008', _error, _err_msg
)
# 返回结果处理
try:
return cls.service_resp_to_grpc_resp(_resp_obj, _request['call_mode'])
except:
# 处理返回结果出现异常
_error = str(sys.exc_info()[0])
_err_msg = 'deal with response error'
_traceback_str = traceback.format_exc()
_logger.warning(
'deal with service function response error, [%s]%s: %s' % (
_error, _err_msg, _traceback_str
)
)
_resp_obj = cls.service_exception_to_grpc_resp(
'31001', _error, _err_msg
)
return cls.service_resp_to_grpc_resp(_resp_obj, _request['call_mode'])
return decorated_function
return decorator
#############################
# 服务端工具函数
#############################
[文档] @classmethod
def service_request_to_paras(cls, request: dict, with_request: bool = True,
native_request: bool = False) -> tuple:
"""
服务端请求对象转换为函数入参
@param {dict} request - 服务的请求入参字典
{
'request': request, # 请求报文对象(如果是流模式也需获取对应的请求报文对象)
'context': context, # 请求服务端上下文, grpc.ServicerContext
'call_mode': call_mode # 调用模式
}
@param {bool} with_request=True - 是否将请求对象送入处理函数的参数中执行
@param {bool} native_request=False - 送入的请求对象是否原生请求对象(RpcRequest)
注: 如果请求数据包含extend_bytes的信息, 则应设置为True以获取到对应的值, 否则只能获取到return_json的值
@returns {tuple} - 返回函数入参的二元组(args, kwargs)
"""
_args = []
_kwargs = {}
# 按标准处理参数
_para_json = StringTool.json_loads_hive_net(
request['request'].para_json
)
if with_request:
# 请求对象送入处理函数的参数
if native_request:
_args.append(request)
else:
_args.append({
'request': _para_json,
'context': request['context'],
'call_mode': request['call_mode']
})
_args.extend(_para_json.get('args', []))
_kwargs.update(_para_json.get('kwargs', {}))
# 返回结果
return _args, _kwargs
[文档] @classmethod
def service_request_to_paras_iter(cls, request: dict, native_request: bool = False) -> tuple:
"""
服务端迭代类型的请求对象转换为函数入参及迭代对象
注: 第一个迭代值固定为函数入参, 后面的迭代值则转换为json值的迭代
@param {dict} request - 服务的请求入参字典
{
'request': request_iterator, # 流模式的请求对象迭代器
'context': context, # 请求服务端上下文, grpc.ServicerContext
'call_mode': call_mode # 调用模式
}
@param {bool} native_request=False - 送入的请求对象是否原生请求对象(RpcRequest)
注: 如果请求数据包含extend_bytes的信息, 则应设置为True以获取到对应的值, 否则只能获取到return_json的值
@returns {tuple} - 返回函数入参的二元组(args, kwargs)
"""
# 先分离第一个对象和第二个开始的值迭代器
_request, _value_iter = cls._get_service_request_json_iter(
request['request'], native_request=native_request
)
_temp_request = {
'request': _request,
'context': request['context'],
'call_mode': request['call_mode']
}
# 获取函数入参
_args, _kwargs = cls.service_request_to_paras(
_temp_request, with_request=True, native_request=native_request
)
# 替换请求对象并送入处理函数
_temp_request['request'] = _value_iter
_args[0] = _temp_request
return _args, _kwargs
[文档] @classmethod
def service_exception_to_grpc_resp(cls, code: str, error: str, err_msg: str,
msg_para: tuple = ()) -> msg_json_pb2.RpcResponse:
"""
服务端异常信息转换为grpc的响应对象
@param {str} code - 错误码
@param {str} error - 抛出异常时,异常对象的类型
@param {str} err_msg - 错误信息
@param {tuple} msg_para - 错误信息对应的参数, JSON格式, 数组()
@returns {msg_json_pb2.RpcResponse} - 返回的grpc响应对象
"""
return msg_json_pb2.RpcResponse(
return_json='', call_code=code, call_msg=err_msg, call_error=error,
call_msg_para=json.dumps(msg_para, ensure_ascii=False)
)
[文档] @classmethod
def service_resp_to_grpc_resp(cls, obj, call_mode: EnumCallMode, extend_bytes: bytes = None):
"""
将服务端函数返回对象转换为服务返回的grpc对象或迭代器
@param {Any} obj - 要处理的对象
@param {EnumCallMode} call_mode - 请求模式
@param {bytes} extend_bytes = None - 要返回的扩展字节数组
注: 如果传入的对象已经是msg_json_pb2.RpcResponse或迭代器则不会处理
@returns {msg_json_pb2.RpcResponse|iterator} - 返回grpc对象或迭代器
"""
if call_mode in (EnumCallMode.Simple, EnumCallMode.ClientSideStream):
# 返回的是对象
return cls._service_resp_obj_to_grpc_resp(obj, extend_bytes=extend_bytes)
else:
# 需要返回迭代器
return cls._service_resp_obj_to_grpc_async_iter(obj)
#############################
# 客户端工具函数
#############################
[文档] @classmethod
def paras_to_grpc_request(cls, args: list = None, kwargs: dict = None) -> msg_json_pb2.RpcRequest:
"""
将函数入参转换为grpc请求对象
@param {list} args=None - 函数固定参数
@param {dict} kwargs=None - 函数kv参数
@returns {msg_json_pb2.RpcRequest} - 请求对象
"""
_args = [] if args is None else args
_kwargs = {} if kwargs is None else kwargs
return msg_json_pb2.RpcRequest(
para_json=StringTool.json_dumps_hive_net(
{
'args': _args, 'kwargs': _kwargs
}, ensure_ascii=False
)
)
[文档] @classmethod
def paras_to_grpc_request_iter(cls, iter_obj, args: list = None, kwargs: dict = None):
"""
将迭代对象和函数入参转换为grpc请求对象迭代对象(流模式)
@param {generator|async_generator} iter_obj - 要发送的请求数据迭代对象(支持同步或异步)
@param {list} args=None - 函数固定参数
@param {dict} kwargs=None - 函数kv参数
@returns {generator} - 请求迭代对象
"""
# 第一个迭代对象是函数调用参数
yield cls.paras_to_grpc_request(args, kwargs)
# 从后面开始放送数据
for _data in AsyncTools.sync_for_async_iter(iter_obj):
if isinstance(_data, msg_json_pb2.RpcRequest):
# 已经是标准请求对象, 无需转换
yield _data
else:
yield msg_json_pb2.RpcRequest(
para_json=StringTool.json_dumps_hive_net(_data, ensure_ascii=False)
)
[文档] @classmethod
def format_call_result(cls, call_result: CResult) -> CResult:
"""
将GRpcClient调用返回的结果转换为标准CResult对象
@param {CResult} call_result - grpc客户端的call函数返回结果
@returns {CResult} - 标准CResult对象
cresult.resp - 远程调用的返回值
cresult.extend_bytes - 扩展返回的字节数组
"""
if not call_result.is_success():
# 本身调用失败
call_result.resp = None
return call_result
else:
if isgenerator(call_result.resp) or isasyncgen(call_result.resp) or getattr(call_result.resp, '__next__', None) is not None:
# 是迭代对象, 需要将call_result.resp转换为标准的CResult的迭代对象
return cls._client_grpc_iter_to_cresult_iter(call_result.resp)
else:
# 正常对象处理
return cls._client_grpc_resp_to_cresult(call_result.resp)
#############################
# 内部函数
#############################
@classmethod
def _get_logger(cls, logger: Logger) -> Logger:
"""
获取日志对象
@param {Logger} logger - 送入的日志对象
注: 如果送None代表自动创建一个日志对象
@returns {Logger} - 返回的日志对象
"""
if logger is None:
logging.basicConfig()
_logger = logging.getLogger(__name__)
_logger.level = logging.INFO
return _logger
else:
return logger
@classmethod
def _get_service_request_json_iter(cls, request_iter, native_request: bool = False) -> tuple:
"""
获取服务端请求的json迭代对象
@param {Any} request_iter - 流模式的请求对象迭代器
@param {list} out_first_obj=[] - 要返回的第一个request对象(放入数组)
@param {bool} native_request=False - 送入的请求对象是否原生请求对象(RpcRequest)
注: 如果请求数据包含extend_bytes的信息, 则应设置为True以获取到对应的值, 否则只能获取到return_json的值
@returns {tuple} - 返回第一个迭代值和剩余迭代器的数组(first, iter)
"""
# 获取第一个迭代对象
if hasattr(request_iter, '__anext__'):
_first = AsyncTools.sync_run_coroutine(
request_iter.__anext__()
)
else:
# 非异步模式, 需要用for才能获取到第一个对象
for _item in request_iter:
_first = _item
break
return _first, cls._service_request_iter_to_json_iter(
request_iter, native_request=native_request
)
@classmethod
def _service_request_iter_to_json_iter(cls, request_iter, native_request: bool = False):
"""
将服务端请求数据迭代对象转换为json迭代对象
@param {iter} request_iter - 请求迭代对象(第二个开始)
@param {bool} native_request=False - 送入的请求对象是否原生请求对象(RpcRequest)
注: 如果请求数据包含extend_bytes的信息, 则应设置为True以获取到对应的值, 否则只能获取到return_json的值
@returns {iter} - 如果native_request为False返回转换的json字典迭代对象, 否则返回原生的RpcRequest迭代对象
"""
if hasattr(request_iter, '__anext__'):
# 异步模式
while True:
try:
_item = AsyncTools.sync_run_coroutine(
request_iter.__anext__()
)
if native_request:
yield _item
else:
yield StringTool.json_loads_hive_net(_item.para_json)
except StopAsyncIteration:
break
else:
# 同步模式
for _item in request_iter:
if native_request:
yield _item
else:
yield StringTool.json_loads_hive_net(_item.para_json)
@classmethod
def _service_resp_obj_to_grpc_resp(cls, obj, extend_bytes: bytes = None) -> msg_json_pb2.RpcResponse:
"""
将服务端函数返回的单个对象转换为msg_json_pb2.RpcResponse对象(服务端使用)
@param {Any} obj - 要转换的对象
@param {bytes} extend_bytes = None - 要返回的扩展字节数组
注: 如果传入的对象已经是msg_json_pb2.RpcResponse则不会处理
@returns {msg_json_pb2.RpcResponse} - grpc的响应对象
"""
if isinstance(obj, msg_json_pb2.RpcResponse):
# 无需转换
return obj
return msg_json_pb2.RpcResponse(
return_json=StringTool.json_dumps_hive_net(obj, ensure_ascii=False),
extend_bytes=extend_bytes,
call_code='00000',
call_msg='success',
call_error='',
call_msg_para='[]'
)
@classmethod
async def _service_resp_obj_to_grpc_async_iter(cls, obj):
"""
将服务端返回对象转换为异步迭代grpc响应对象(服务端使用)
@param {Any} obj - 要处理的对象
"""
if isgenerator(obj):
# 普通迭代器
for _iter_item in obj:
yield cls._service_resp_obj_to_grpc_resp(_iter_item)
elif isasyncgen(obj):
# 异步迭代器
async for _iter_item in obj:
yield cls._service_resp_obj_to_grpc_resp(_iter_item)
else:
# obj是单个对象
yield cls._service_resp_obj_to_grpc_resp(obj)
@classmethod
def _client_grpc_resp_to_cresult(cls, grpc_resp: msg_json_pb2.RpcResponse) -> CResult:
"""
客户端将单个grpc响应对象转换为CResult对象
@param {msg_json_pb2.RpcResponse} grpc_resp - grpc的响应对象
@returns {CResult} - 标准CResult对象
cresult.resp - 远程调用的返回值
cresult.extend_bytes - 扩展返回的字节数组
"""
_result = CResult(
code=grpc_resp.call_code, msg=grpc_resp.call_msg, error=grpc_resp.call_error,
i18n_msg_paras=[] if grpc_resp.call_msg_para == '' else StringTool.json_loads_hive_net(
grpc_resp.call_msg_para
)
)
_result.resp = None if grpc_resp.return_json == '' else StringTool.json_loads_hive_net(
grpc_resp.return_json
)
_result.extend_bytes = grpc_resp.extend_bytes
return _result
@classmethod
def _client_grpc_iter_to_cresult_iter(cls, obj):
"""
将grpc流模式返回的迭代响应对象转换为CResult迭代对象
@param {iter} obj - 迭代对象
"""
try:
for _resp in AsyncTools.sync_for_async_iter(obj):
yield cls._client_grpc_resp_to_cresult(_resp)
except (grpc._channel._Rendezvous, grpc._channel._InactiveRpcError):
# 执行远程调用出现异常
_code = '20408'
_grpc_err = sys.exc_info()[1]
if _grpc_err._state.code.value[0] == 4:
# 调用超时
_code = '30403'
_result = CResult(
code=_code,
error=str(type(_grpc_err)),
trace_str=traceback.format_exc(),
i18n_msg_paras=(_grpc_err._state.code.name, _grpc_err._state.details)
)
_result.resp = None
yield _result
except (grpc.aio._call.AioRpcError):
# 执行远程调用出现异常(异步的情况)
_code = '20408'
_aio_grpc_error = sys.exc_info()[1]
_state_code = _aio_grpc_error.code()
if _state_code.value[0] == 4:
# 调用超时
_code = '30403'
_result = CResult(
code=_code,
error=str(type(_aio_grpc_error)),
trace_str=traceback.format_exc(),
i18n_msg_paras=(_state_code.name, _aio_grpc_error.details())
)
_result.resp = None
yield _result
except:
_error = str(sys.exc_info()[0])
_result = CResult(
code='21007',
error=_error,
trace_str=traceback.format_exc(),
i18n_msg_paras=(_error)
)
_result.resp = None
yield _result