HiveNetPipeline.embed_router 源代码

# !/usr/bin/env python3
# -*- coding: UTF-8 -*-
# Copyright 2019 黎慧剑
#
# This Source Code Form is subject to the terms of the Mozilla Public
# License, v. 2.0. If a copy of the MPL was not distributed with this
# file, You can obtain one at http://mozilla.org/MPL/2.0/.

"""
管道通用路由插件模块
@module extend_router
@file extend_router.py
"""

import os
import sys
# 根据当前文件路径将包路径纳入, 在非安装的情况下可以引用到
sys.path.append(os.path.abspath(os.path.join(os.path.dirname(__file__), os.path.pardir)))
from HiveNetPipeline import Tools, PipelineRouter
from HiveNetCore.utils.condition_tool import ConditionTool


__MOUDLE__ = 'extend_router'  # 模块名
__DESCRIPT__ = u'管道通用路由插件模块'  # 模块描述
__VERSION__ = '0.1.0'  # 版本
__AUTHOR__ = u'黎慧剑'  # 作者
__PUBLISH__ = '2020.10.16'  # 发布日期


[文档]class GoToNode(PipelineRouter): """ 跳转到指定节点 """
[文档] @classmethod def router_name(cls) -> str: """ 路由器名称, 唯一标识路由器 @returns {str} - 当前路由器名称 """ return 'GoToNode'
[文档] @classmethod def get_next(cls, output, context: dict, pipeline_obj, run_id: str, **kwargs): """ 获取路由下一节点 @param {object} output - 上一个节点的输出结果 @param {dict} context - 上下文字典 @param {Pipeline} pipeline_obj - 管道对象 @param {str} run_id - 当前管道的运行id @param {kwargs} - 传入的扩展参数 default_to_next {bool} - 当没有给出上下文参数及扩展参数的时候是否默跳转到下一个节点, 默认为False 可在上下文设置或直接管道配置传入的参数: goto_node_id {str} - 可选, 要跳转到的节点id, 与goto_node_name传入一个即可 goto_node_name {str} - 可选, 要跳转到的节点名, 与goto_node_id传入一个即可 @returns {str} - 下一节点的配置id, 如果是最后的节点, 返回None """ # 优先以context的参数进行处理 _goto_node_id = str(context.pop('goto_node_id', '')) _goto_node_name = context.pop('goto_node_name', '') # 尝试从管道参数中获取跳转参数 if _goto_node_id == '' and _goto_node_name == '': _goto_node_id = str(kwargs.get('goto_node_id', '')) _goto_node_name = kwargs.get('goto_node_name', '') # 处理跳转 if _goto_node_id != '': _next_id = _goto_node_id elif _goto_node_name != '': _next_id = Tools.get_node_id_by_name(_goto_node_name, pipeline_obj) if _next_id is None: raise RuntimeError( 'GoToNode Router Error: goto_node_name[%s] not found!' % _goto_node_name) else: # 没有跳转参数 if kwargs.get('default_to_next', False): # 跳转到下一个节点 _next_id = None _temp_id = str(int(pipeline_obj.current_node_id(run_id=run_id)) + 1) if _temp_id in pipeline_obj.pipeline.keys(): _next_id = _temp_id else: raise RuntimeError( 'GoToNode Router Error: goto para not found in context or pipeline para!') # 返回路由节点 return _next_id
[文档]class IfGotoNode(PipelineRouter): """ 根据指定条件跳转到对应节点 """
[文档] @classmethod def router_name(cls) -> str: """ 路由器名称, 唯一标识路由器 @returns {str} - 当前路由器名称 """ return 'IfGotoNode'
[文档] @classmethod def get_next(cls, output, context: dict, pipeline_obj, run_id: str, **kwargs): """ 获取路由下一节点 @param {object} output - 上一个节点的输出结果 @param {dict} context - 上下文字典 @param {Pipeline} pipeline_obj - 管道对象 @param {str} run_id - 当前管道的运行id @param {kwargs} - 传入的扩展参数 default_to_next {bool} - 当没有给出上下文参数及扩展参数的时候是否默跳转到下一个节点, 默认为False 可在上下文设置或直接管道配置传入的参数: if_goto_node_conditions {dict} - IF判断条件, 格式如下: {'操作符': [('条件类型', {条件参数}), ...]} 操作符支持: '$and' - 数组中的条件以and方式联合判断, '$or' - 数组中的条件以or方式联合判断, '$not' - 数组中的条件以and联合并对结果取反 一个字典那的多个操作符以and联合, 例如: {'$and': [条件集合1], '$or': [条件集合2], '$not': [条件集合3]} 代表 (条件集合1结果) and (条件集合2结果) and (条件集合3结果) 支持条件的嵌套, 例如 {'$and': [(条件1), {'$or': [(条件2), (条件3)]}]} 代表 (条件1结果) and (条件2 or 条件3) 目前支持的条件类型包括: pyexp - python脚本表达式, 条件参数为{'exp': '表达式脚本'}, 脚本中可以直接使用context和output变量 例如: {'$and': [('pyexp', {'exp': 'context["a"] == 10'}),]} exists - 值(或key)是否在指定对象中, 条件参数为{'value': '判断值(或key)', 'obj_type': '检查对象获取类型', 'obj': 检查对象} obj_type可设置为以下值: instance - obj为直接给出的对象(默认) strexp - obj为字符格式的对象(json), 例如: '{"a": "aval", "b": "bval"}', '[1, 2, 3]' pyexp - obj为python表达式指定的变量, 脚本中可以直接使用context和output变量 例如: {'$and': [('exists', {'value': 'key', 'context["my_var"]'}),]} if_goto_node_id {str} - 可选, 条件为True时要跳转到的节点id, 与if_goto_node_id_name传入一个即可 if_goto_node_name {str} - 可选, 条件为True时要跳转到的节点name, 与if_goto_node_id传入一个即可 if_goto_node_else_id {str} - 可选, 条件为False时要跳转到的节点id, 与if_goto_node_else_name传入一个即可 if_goto_node_else_name {str} - 可选, 条件为True时要跳转到的节点name, 与if_goto_node_else_id传入一个即可 @returns {str} - 下一节点的配置id, 如果是最后的节点, 返回None """ # 获取条件并进行判断 _if_goto_node_conditions = context.pop( 'if_goto_node_conditions', kwargs.get('if_goto_node_conditions', None) ) _condition_result = True if _if_goto_node_conditions is not None: _condition_result = ConditionTool.run_conditions( _if_goto_node_conditions, globals={}, locals={'context': context, 'output': output} ) # 处理跳转的上下文参数 _if_goto_node_id = str(context.pop('if_goto_node_id', '')) _if_goto_node_name = context.pop('if_goto_node_name', '') _if_goto_node_else_id = str(context.pop('if_goto_node_else_id', '')) _if_goto_node_else_name = context.pop('if_goto_node_else_name', '') # 处理跳转 _goto_node_id = '' _goto_node_name = '' if _condition_result: if _if_goto_node_id == '' and _if_goto_node_name == '': _if_goto_node_id = str(kwargs.get('if_goto_node_id', '')) _if_goto_node_name = kwargs.get('if_goto_node_name', '') _goto_node_id = _if_goto_node_id _goto_node_name = _if_goto_node_name else: if _if_goto_node_else_id == '' and _if_goto_node_else_name == '': _if_goto_node_else_id = str(kwargs.get('if_goto_node_else_id', '')) _if_goto_node_else_name = kwargs.get('if_goto_node_else_name', '') _goto_node_id = _if_goto_node_else_id _goto_node_name = _if_goto_node_else_name # 处理跳转 if _goto_node_id != '': _next_id = _goto_node_id elif _goto_node_name != '': _next_id = Tools.get_node_id_by_name(_goto_node_name, pipeline_obj) if _next_id is None: raise RuntimeError( 'IfGoToNode Router Error: goto_node_name[%s] not found!' % _goto_node_name) else: # 没有跳转参数 if kwargs.get('default_to_next', False): # 跳转到下一个节点 _next_id = None _temp_id = str(int(pipeline_obj.current_node_id(run_id=run_id)) + 1) if _temp_id in pipeline_obj.pipeline.keys(): _next_id = _temp_id else: raise RuntimeError( 'IfGoToNode Router Error: goto para not found in context or pipeline para!') # 返回路由节点 return _next_id
if __name__ == '__main__': # 当程序自己独立运行时执行的操作 # 打印版本信息 print(('模块名:%s - %s\n' '作者:%s\n' '发布日期:%s\n' '版本:%s' % (__MOUDLE__, __DESCRIPT__, __AUTHOR__, __PUBLISH__, __VERSION__)))