HiveNetPipeline.pipeline module¶
管道处理框架
- class HiveNetPipeline.pipeline.Pipeline(name: str, pipeline_config, is_asyn=False, asyn_notify_fun=None, running_notify_fun=None, end_running_notify_fun=None, logger=None)[源代码]¶
基类:
object管道控制框架
- __init__(name: str, pipeline_config, is_asyn=False, asyn_notify_fun=None, running_notify_fun=None, end_running_notify_fun=None, logger=None)[源代码]¶
构造函数
- 参数
name (str) – 管道名称
pipeline_config (str|dict|list) –
管道配置, 支持传入3种类型:
str - json字符(可以是字典模式或数组模式)
dict - 配置字典, 例如{“1”: {节点1配置}, “2”: {节点2配置}, …}, 注意节点顺序必须是从1开始的连续整数
list - 配置数组, 例如[{节点1配置}, {节点2配置}, …], 比字典模式简单, 会自动生成连续的节点顺序
注: 节点配置说明如下
{
“name”: “节点配置名”,
”predealer”: “预处理器名”, # 在正式执行节点前先执行预处理器, 预处理器的返回值可以控制是否跳过当前节点, 置空代表不执行预处理
”predealer_execute_para”: {}, # 预处理器执行的传入参数, 作为**kwargs传入预处理器, 置空或不设置值的情况传入{}
”processor”: “处理器名”,
”processor_execute_para”: {}, # 处理器执行的传入参数, 作为**kwargs传入执行函数, 置空或不设置值的情况传入{}
”is_sub_pipeline”: False, # 该子节点处理器是否子管道处理器
”sub_pipeline_para”: {}, # 生成子管道的参数, 由处理器具体实现来定义
”context”: {}, # 要更新的上下文字典, 执行处理器前将更新该上下文
”router”: “”, # 路由器名, 执行完将执行该路由器找下一个执行节点, 置空或不设置值的情况直接按顺序找下一个节点
”router_para”: {}, # 路由器的传入参数, 作为**kwargs传入路由器, 置空或不设置值的情况传入{}
”exception_router”: “”, 执行处理器出现异常时执行的路由器名, 置空或不设置值将抛出异常并结束管道执行
”exception_router_para”: {} # 异常路由器的传入参数, 作为**kwargs传入路由器, 置空或不设置值的情况传入{}
}
is_asyn (bool) – default=False, 是否异步返回结果
asyn_notify_fun (function) –
default=None, 异步结果通知函数, 格式如下:
fun(name, run_id, status, context, output, pipeline)
name {str} - 管道名称
run_id {str} - 运行id
status {str} - 管道状态
context {dict} - 当前上下文
output {object} - 管道输出数据
pipeline {Pipeline} - 管道对象
注: 该函数可以为同步也可以为异步函数
running_notify_fun (function) –
default=None = 节点运行通知函数, 格式如下:,
fun(name, run_id, node_id, node_name, pipeline)
name {str} - 管道名称
run_id {str} - 运行id
node_id {str} - 运行节点id
node_name {str} - 运行节点配置名
pipeline {Pipeline} - 管道对象
注: 该函数可以为同步也可以为异步函数
end_running_notify_fun (function) –
default=None = 节点运行完成通知函数, 格式如下:,
fun(name, run_id, node_id, node_name, status, status_msg, pipeline)
name {str} - 管道名称
run_id {str} - 运行id
node_id {str} - 运行节点id
node_name {str} - 运行节点配置名
status {str} 执行状态, ‘S’ - 成功, ‘E’ - 出现异常, ‘K’ - 跳过节点
status_msg {str} 状态描述, 当异常时送入异常信息
pipeline {Pipeline} - 管道对象
注: 该函数可以为同步也可以为异步函数
logger (Simple_log.Logger) – default=None, 日志对象
- asyn_node_feeback(run_id: str, node_id: str, output=None, status: str = 'S', status_msg: str = 'S', context: dict = {})[源代码]¶
异步节点执行结果反馈
- 参数
run_id (str) – 运行id
node_id (str) – 节点配置id
output (object) – default=None, 节点执行输出结果
status (str) – default=’S’, 节点运行状态, ‘S’ - 成功, ‘E’ - 出现异常, ‘P’ - 暂停
status_msg (str) – default=’success’, 运行状态描述
context (dict) – default={}, 要修改的上下文信息
- context(run_id: str = None)[源代码]¶
获取管道当前上下文
- 参数
run_id (str) –
default=None, 要获取的管道运行ID
注: 如果不传入则获取最后执行的管道ID
- 返回
当前上下文字典
- 返回类型
dict
- current_node_id(run_id: str = None) str[源代码]¶
获取管道运行的当前节点ID
- 参数
run_id (str) –
default=None, 要获取的管道运行ID
注: 如果不传入则获取最后执行的管道ID
- 返回
当前运行的节点id
- 返回类型
str
- current_node_status(run_id: str = None) str[源代码]¶
获取管道运行的当前节点状态
- 参数
run_id (str) –
default=None, 要获取的管道运行ID
注: 如果不传入则获取最后执行的管道ID
- 返回
当前节点运行状态, I-初始化, R-正在执行, E-执行失败, S-执行成功, P-子管道暂停
- 返回类型
str
- current_node_status_msg(run_id: str = None) str[源代码]¶
获取管道运行的当前节点状态信息
异常时可以获取异常报错信息
- 参数
run_id (str) –
default=None, 要获取的管道运行ID
注: 如果不传入则获取最后执行的管道ID
- 返回
执行状态信息
- 返回类型
str
- get_node_process(run_id: str, node_id: str)[源代码]¶
获取当前节点运行进度
- 参数
run_id (str) – 运行id
node_id (str) – 节点配置id
- 返回
返回 total, done, job_msg 进度信息
- 返回类型
int, int, str
- classmethod get_plugin(plugin_type: str, name: str)[源代码]¶
获取制定插件
- 参数
plugin_type (str) –
插件类型
predealer - 预处理器
processer - 处理器
router - 路由器
name (str) – 插件名称
- 返回
插件类对象, 如果找不到返回None
- 返回类型
object
- load_checkpoint(json_str: str, ignore_exists: bool = False)[源代码]¶
装载所保存的运行状态
注: 如果run_id重复则不导入
- 参数
json_str (str) – 所保存的管道运行状态json串
ignore_exists (bool) – default=False, 是否忽略已存在运行管道
- node_process_feeback(run_id: str, node_id: str, total: int = None, done: int = None, job_msg: str = None)[源代码]¶
节点进度反馈函数
供节点运行过程中更新进度信息
- 参数
run_id (str) – 运行id
node_id (str) – 节点id
total (int) – default=None, 节点运行进度总任务数, 不送代表不更新
done (int) – default=None, 节点运行进度当前完成数, 不送代表不更新
job_msg (str) – default=None, 任务信息, 不送代表不更新
- output(run_id: str = None)[源代码]¶
获取管道运行输出结果
- 参数
run_id (str) –
default=None, 要获取的管道运行ID
注: 如果不传入则获取最后执行的管道ID
- 返回
管道输出结果, 如果管道运行未完成则返回None
- 返回类型
object
- pause(run_id: str = None)[源代码]¶
暂停管道执行
- 参数
run_id (str) –
default=None, 要暂停的管道运行ID
注: 如果不传入则获取最后执行的管道ID
- remove(run_id: str = None)[源代码]¶
删除指定管道执行
- 参数
run_id (str) –
default=None, 要处理的管道运行ID
注: 如果不传入则获取最后执行的管道ID
- resume(run_id: str = None, run_to_end: bool = False)[源代码]¶
从中断点重新执行
- 参数
run_id (str) –
default=None, 要处理的管道运行ID
注: 如果不传入则获取最后执行的管道ID
run_to_end (bool) – default=False, 当设置了step_by_step模式时, 可以通过该参数指定执行到结尾
- 返回
同步情况返回 run_id, status, output, 异步情况返回的status为R
- 返回类型
str, str, object
- save_checkpoint(run_id: str = None) str[源代码]¶
将运行状态保存为json串, 用于后续恢复
- 参数
run_id (str) – default=None, 管道运行id, 如果不传代表保存所有管道当前状态
所保存的管道运行状态json串 (-) –
- start(input_data=None, context: dict = None, run_id: str = None, is_step_by_step: bool = False)[源代码]¶
执行管道(从第一个节点开始执行)
- 参数
input_data (object) – default=None, 初始输入数据值
context (dict) – default=None, 初始上下文
run_id (str) – default=None, 指定的管道运行ID
is_step_by_step (bool) – default=False, 是否逐步执行, 即执行一步就pause, 通过resume执行下一步
- 返回
同步情况返回 run_id, status, output, 异步情况返回status为R
- 返回类型
str, str, object
- 引发
RuntimeError – 当状态为R、P时抛出异常
- class HiveNetPipeline.pipeline.PipelinePredealer[源代码]¶
基类:
object管道预处理器框架类
- classmethod pre_deal(input_data, context: dict, pipeline_obj, run_id: str, **kwargs)[源代码]¶
执行预处理
- 参数
input_data (object) – 处理器输入数据值, 除第一个处理器外, 该信息为上一个处理器的输出值
context (dict) – 传递上下文, 该字典信息将在整个管道处理过程中一直向下传递, 可以在处理器中改变该上下文信息
pipeline_obj (Pipeline) –
管道对象, 作用如下:
1、更新执行进度
2、输出执行日志
3、异步执行的情况主动通知继续执行管道处理
run_id (str) – 当前管道的运行id
传入的预处理扩展参数 (-) –
- 返回
是否继续执行该节点, True - 继续执行该节点, False - 跳过该节点直接执行下一个节点
- 返回类型
bool
- class HiveNetPipeline.pipeline.PipelineProcesser[源代码]¶
基类:
object管道处理器框架类
- classmethod execute(input_data, context: dict, pipeline_obj, run_id: str, **kwargs)[源代码]¶
执行处理
(可以为同步也可以为异步方法)
- 参数
input_data (object) – 处理器输入数据值, 除第一个处理器外, 该信息为上一个处理器的输出值
context (dict) – 传递上下文, 该字典信息将在整个管道处理过程中一直向下传递, 可以在处理器中改变该上下文信息
pipeline_obj (Pipeline) –
管道对象, 作用如下:
1、更新执行进度
2、输出执行日志
3、异步执行的情况主动通知继续执行管道处理
run_id (str) – 当前管道的运行id
传入的运行扩展参数 (-) –
- 返回
处理结果输出数据值, 供下一个处理器处理, 异步执行的情况返回None
- 返回类型
object
- class HiveNetPipeline.pipeline.PipelineRouter[源代码]¶
基类:
object管道路由器框架类
- class HiveNetPipeline.pipeline.SubPipeLineProcesser[源代码]¶
基类:
object子管道处理器
- classmethod execute(input_data, context: dict, pipeline_obj, run_id: str, sub_pipeline_obj, is_step_by_step: bool = False, is_resume: bool = False, run_to_end: bool = False, **kwargs)[源代码]¶
执行处理
(可以为同步也可以为异步方法)
- 参数
input_data (object) – 处理器输入数据值, 除第一个处理器外, 该信息为上一个处理器的输出值
context (dict) – 传递上下文, 该字典信息将在整个管道处理过程中一直向下传递, 可以在处理器中改变该上下文信息
pipeline_obj (Pipeline) – 发起的管道对象
run_id (str) – 当前管道的运行id
sub_pipeline_obj (Pipeline) – 要执行的子管道对象
is_step_by_step (bool) – default=False, 是否逐步执行, 即执行一步就pause, 通过resume执行下一步
is_resume (bool) – default=False, 是否恢复执行的模式
run_to_end (bool) – default=False, 当设置了step_by_step模式时, 可以通过该参数指定执行到结尾
传入的运行扩展参数 (-) –
- 返回
同步情况返回 run_id, status, output, 异步情况返回的status为R
- 返回类型
str, str, object