HiveNetPipeline.pipeline module

管道处理框架

class HiveNetPipeline.pipeline.Pipeline(name: str, pipeline_config, is_asyn=False, asyn_notify_fun=None, running_notify_fun=None, end_running_notify_fun=None, logger=None)[源代码]

基类:object

管道控制框架

__init__(name: str, pipeline_config, is_asyn=False, asyn_notify_fun=None, running_notify_fun=None, end_running_notify_fun=None, logger=None)[源代码]

构造函数

参数
  • name (str) – 管道名称

  • pipeline_config (str|dict) –

    管道配置json字符串(也支持传入字典), 注意节点顺序必须是从1开始的连续整数

    {

    “1”: {

    “name”: “节点配置名”,

    ”predealer”: “预处理器名”, # 在正式执行节点前先执行预处理器, 预处理器的返回值可以控制是否跳过当前节点, 置空代表不执行预处理

    ”predealer_execute_para”: {}, # 预处理器执行的传入参数, 作为**kwargs传入预处理器, 置空或不设置值的情况传入{}

    ”processor”: “处理器名”,

    ”processor_execute_para”: {}, # 处理器执行的传入参数, 作为**kwargs传入执行函数, 置空或不设置值的情况传入{}

    ”is_sub_pipeline”: False, # 该子节点处理器是否子管道处理器

    ”sub_pipeline_para”: {}, # 生成子管道的参数, 由处理器具体实现来定义

    ”context”: {}, # 要更新的上下文字典, 执行处理器前将更新该上下文

    ”router”: “”, # 路由器名, 执行完将执行该路由器找下一个执行节点, 置空或不设置值的情况直接按顺序找下一个节点

    ”router_para”: {}, # 路由器的传入参数, 作为**kwargs传入路由器, 置空或不设置值的情况传入{}

    ”exception_router”: “”, 执行处理器出现异常时执行的路由器名, 置空或不设置值将抛出异常并结束管道执行

    ”exception_router_para”: {} # 异常路由器的传入参数, 作为**kwargs传入路由器, 置空或不设置值的情况传入{}

    },

    ”2”: {

    },

    }

  • is_asyn (bool) – default=False, 是否异步返回结果

  • asyn_notify_fun (function) –

    default=None, 异步结果通知函数, 格式如下:

    fun(name, run_id, status, context, output, pipeline)

    name {str} - 管道名称

    run_id {str} - 运行id

    status {str} - 管道状态

    context {dict} - 当前上下文

    output {object} - 管道输出数据

    pipeline {Pipeline} - 管道对象

    注: 该函数可以为同步也可以为异步函数

  • running_notify_fun (function) –

    default=None = 节点运行通知函数, 格式如下:,

    fun(name, run_id, node_id, node_name, pipeline)

    name {str} - 管道名称

    run_id {str} - 运行id

    node_id {str} - 运行节点id

    node_name {str} - 运行节点配置名

    pipeline {Pipeline} - 管道对象

    注: 该函数可以为同步也可以为异步函数

  • end_running_notify_fun (function) –

    default=None = 节点运行完成通知函数, 格式如下:,

    fun(name, run_id, node_id, node_name, status, status_msg, pipeline)

    name {str} - 管道名称

    run_id {str} - 运行id

    node_id {str} - 运行节点id

    node_name {str} - 运行节点配置名

    status {str} 执行状态, ‘S’ - 成功, ‘E’ - 出现异常, ‘K’ - 跳过节点

    status_msg {str} 状态描述, 当异常时送入异常信息

    pipeline {Pipeline} - 管道对象

    注: 该函数可以为同步也可以为异步函数

  • logger (Simple_log.Logger) – default=None, 日志对象

classmethod add_plugin(class_obj)[源代码]

添加插件

参数

class_obj (object) – 插件类

asyn_node_feeback(run_id: str, node_id: str, output=None, status: str = 'S', status_msg: str = 'S', context: dict = {})[源代码]

异步节点执行结果反馈

参数
  • run_id (str) – 运行id

  • node_id (str) – 节点配置id

  • output (object) – default=None, 节点执行输出结果

  • status (str) – default=’S’, 节点运行状态, ‘S’ - 成功, ‘E’ - 出现异常, ‘P’ - 暂停

  • status_msg (str) – default=’success’, 运行状态描述

  • context (dict) – default={}, 要修改的上下文信息

context(run_id: Optional[str] = None)[源代码]

获取管道当前上下文

参数

run_id (str) –

default=None, 要获取的管道运行ID

注: 如果不传入则获取最后执行的管道ID

返回

当前上下文字典

返回类型

dict

current_node_id(run_id: Optional[str] = None) str[源代码]

获取管道运行的当前节点ID

参数

run_id (str) –

default=None, 要获取的管道运行ID

注: 如果不传入则获取最后执行的管道ID

返回

当前运行的节点id

返回类型

str

current_node_status(run_id: Optional[str] = None) str[源代码]

获取管道运行的当前节点状态

参数

run_id (str) –

default=None, 要获取的管道运行ID

注: 如果不传入则获取最后执行的管道ID

返回

当前节点运行状态, I-初始化, R-正在执行, E-执行失败, S-执行成功, P-子管道暂停

返回类型

str

current_node_status_msg(run_id: Optional[str] = None) str[源代码]

获取管道运行的当前节点状态信息

异常时可以获取异常报错信息

参数

run_id (str) –

default=None, 要获取的管道运行ID

注: 如果不传入则获取最后执行的管道ID

返回

执行状态信息

返回类型

str

get_node_process(run_id: str, node_id: str)[源代码]

获取当前节点运行进度

参数
  • run_id (str) – 运行id

  • node_id (str) – 节点配置id

返回

返回 total, done, job_msg 进度信息

返回类型

int, int, str

classmethod get_plugin(plugin_type: str, name: str)[源代码]

获取制定插件

参数
  • plugin_type (str) –

    插件类型

    predealer - 预处理器

    processer - 处理器

    router - 路由器

  • name (str) – 插件名称

返回

插件类对象, 如果找不到返回None

返回类型

object

load_checkpoint(json_str: str, ignore_exists: bool = False)[源代码]

装载所保存的运行状态

注: 如果run_id重复则不导入

参数
  • json_str (str) – 所保存的管道运行状态json串

  • ignore_exists (bool) – default=False, 是否忽略已存在运行管道

classmethod load_plugins_by_file(file: str)[源代码]

装载指定文件的管道插件

参数

file (str) – 模块文件路径

classmethod load_plugins_by_path(path: str)[源代码]

装载指定目录下的管道插件(处理器和路由器)

参数

path (str) – 要装载的目录

classmethod load_plugins_embed()[源代码]

装载集成的管道插件

log_debug(msg: str, *args, **kwargs)[源代码]

输出debug日志

参数

msg (str) – 要输出的日志

log_error(msg: str, *args, **kwargs)[源代码]

输出error日志

参数

msg (str) – 要输出的日志

log_info(msg: str, *args, **kwargs)[源代码]

输出info日志

参数

msg (str) – 要输出的日志

log_warning(msg: str, *args, **kwargs)[源代码]

输出warning日志

参数

msg (str) – 要输出的日志

node_process_feeback(run_id: str, node_id: str, total: Optional[int] = None, done: Optional[int] = None, job_msg: Optional[str] = None)[源代码]

节点进度反馈函数

供节点运行过程中更新进度信息

参数
  • run_id (str) – 运行id

  • node_id (str) – 节点id

  • total (int) – default=None, 节点运行进度总任务数, 不送代表不更新

  • done (int) – default=None, 节点运行进度当前完成数, 不送代表不更新

  • job_msg (str) – default=None, 任务信息, 不送代表不更新

output(run_id: Optional[str] = None)[源代码]

获取管道运行输出结果

参数

run_id (str) –

default=None, 要获取的管道运行ID

注: 如果不传入则获取最后执行的管道ID

返回

管道输出结果, 如果管道运行未完成则返回None

返回类型

object

pause(run_id: Optional[str] = None)[源代码]

暂停管道执行

参数

run_id (str) –

default=None, 要暂停的管道运行ID

注: 如果不传入则获取最后执行的管道ID

remove(run_id: Optional[str] = None)[源代码]

删除指定管道执行

参数

run_id (str) –

default=None, 要处理的管道运行ID

注: 如果不传入则获取最后执行的管道ID

resume(run_id: Optional[str] = None, run_to_end: bool = False)[源代码]

从中断点重新执行

参数
  • run_id (str) –

    default=None, 要处理的管道运行ID

    注: 如果不传入则获取最后执行的管道ID

  • run_to_end (bool) – default=False, 当设置了step_by_step模式时, 可以通过该参数指定执行到结尾

返回

同步情况返回 run_id, status, output, 异步情况返回的status为R

返回类型

str, str, object

save_checkpoint(run_id: Optional[str] = None) str[源代码]

将运行状态保存为json串, 用于后续恢复

参数
  • run_id (str) – default=None, 管道运行id, 如果不传代表保存所有管道当前状态

  • 所保存的管道运行状态json串 (-) –

start(input_data=None, context: Optional[dict] = None, run_id: Optional[str] = None, is_step_by_step: bool = False)[源代码]

执行管道(从第一个节点开始执行)

参数
  • input_data (object) – default=None, 初始输入数据值

  • context (dict) – default=None, 初始上下文

  • run_id (str) – default=None, 指定的管道运行ID

  • is_step_by_step (bool) – default=False, 是否逐步执行, 即执行一步就pause, 通过resume执行下一步

返回

同步情况返回 run_id, status, output, 异步情况返回status为R

返回类型

str, str, object

引发

RuntimeError – 当状态为R、P时抛出异常

status(run_id: Optional[str] = None)[源代码]

获取管道运行状态

参数

run_id (str) –

default=None, 要获取的管道运行ID

注: 如果不传入则获取最后执行的管道ID

返回

当前状态, I-初始化, P-暂停, R-运行中, S-成功结束, E-异常结束

返回类型

str

trace_list(run_id: Optional[str] = None)[源代码]

获取管道当前执行追踪列表

参数

run_id (str) –

default=None, 要获取的管道运行ID

注: 如果不传入则获取最后执行的管道ID

返回

当前执行追踪列表

返回类型

list

class HiveNetPipeline.pipeline.PipelinePredealer[源代码]

基类:object

管道预处理器框架类

classmethod initialize()[源代码]

初始化处理类, 仅在装载的时候执行一次初始化动作

classmethod pre_deal(input_data, context: dict, pipeline_obj, run_id: str, **kwargs)[源代码]

执行预处理

参数
  • input_data (object) – 处理器输入数据值, 除第一个处理器外, 该信息为上一个处理器的输出值

  • context (dict) – 传递上下文, 该字典信息将在整个管道处理过程中一直向下传递, 可以在处理器中改变该上下文信息

  • pipeline_obj (Pipeline) –

    管道对象, 作用如下:

    1、更新执行进度

    2、输出执行日志

    3、异步执行的情况主动通知继续执行管道处理

  • run_id (str) – 当前管道的运行id

  • 传入的预处理扩展参数 (-) –

返回

是否继续执行该节点, True - 继续执行该节点, False - 跳过该节点直接执行下一个节点

返回类型

bool

classmethod predealer_name() str[源代码]

预处理器名称, 唯一标识处理器

返回

当前处理器名称

返回类型

str

class HiveNetPipeline.pipeline.PipelineProcesser[源代码]

基类:object

管道处理器框架类

classmethod execute(input_data, context: dict, pipeline_obj, run_id: str, **kwargs)[源代码]

执行处理

(可以为同步也可以为异步方法)

参数
  • input_data (object) – 处理器输入数据值, 除第一个处理器外, 该信息为上一个处理器的输出值

  • context (dict) – 传递上下文, 该字典信息将在整个管道处理过程中一直向下传递, 可以在处理器中改变该上下文信息

  • pipeline_obj (Pipeline) –

    管道对象, 作用如下:

    1、更新执行进度

    2、输出执行日志

    3、异步执行的情况主动通知继续执行管道处理

  • run_id (str) – 当前管道的运行id

  • 传入的运行扩展参数 (-) –

返回

处理结果输出数据值, 供下一个处理器处理, 异步执行的情况返回None

返回类型

object

classmethod initialize()[源代码]

初始化处理类, 仅在装载的时候执行一次初始化动作

classmethod is_asyn() bool[源代码]

是否异步处理

返回

标识处理器是否异步处理, 返回Fasle代表管道要等待处理器执行完成

返回类型

bool

classmethod processer_name() str[源代码]

处理器名称, 唯一标识处理器

返回

当前处理器名称

返回类型

str

class HiveNetPipeline.pipeline.PipelineRouter[源代码]

基类:object

管道路由器框架类

classmethod get_next(output, context: dict, pipeline_obj, run_id: str, **kwargs)[源代码]

获取路由下一节点

参数
  • output (object) – 上一个节点的输出结果

  • context (dict) – 上下文字典

  • pipeline_obj (Pipeline) – 管道对象

  • run_id (str) – 当前管道的运行id

  • 传入的扩展参数 (-) –

返回

下一节点的配置id, 如果是最后的节点, 返回None

返回类型

str

classmethod initialize()[源代码]

初始化处理类, 仅在装载的时候执行一次初始化动作

classmethod router_name() str[源代码]

路由器名称, 唯一标识路由器

返回

当前路由器名称

返回类型

str

class HiveNetPipeline.pipeline.SubPipeLineProcesser[源代码]

基类:object

子管道处理器

classmethod execute(input_data, context: dict, pipeline_obj, run_id: str, sub_pipeline_obj, is_step_by_step: bool = False, is_resume: bool = False, run_to_end: bool = False, **kwargs)[源代码]

执行处理

(可以为同步也可以为异步方法)

参数
  • input_data (object) – 处理器输入数据值, 除第一个处理器外, 该信息为上一个处理器的输出值

  • context (dict) – 传递上下文, 该字典信息将在整个管道处理过程中一直向下传递, 可以在处理器中改变该上下文信息

  • pipeline_obj (Pipeline) – 发起的管道对象

  • run_id (str) – 当前管道的运行id

  • sub_pipeline_obj (Pipeline) – 要执行的子管道对象

  • is_step_by_step (bool) – default=False, 是否逐步执行, 即执行一步就pause, 通过resume执行下一步

  • is_resume (bool) – default=False, 是否恢复执行的模式

  • run_to_end (bool) – default=False, 当设置了step_by_step模式时, 可以通过该参数指定执行到结尾

  • 传入的运行扩展参数 (-) –

返回

同步情况返回 run_id, status, output, 异步情况返回的status为R

返回类型

str, str, object

classmethod get_sub_pipeline(input_data, context: dict, pipeline_obj, run_id: str, sub_pipeline_para: dict, **kwargs)[源代码]

获取子管道对象的函数

参数
  • input_data (object) – 处理器输入数据值, 除第一个处理器外, 该信息为上一个处理器的输出值

  • context (dict) – 传递上下文, 该字典信息将在整个管道处理过程中一直向下传递, 可以在处理器中改变该上下文信息

  • pipeline_obj (Pipeline) – 发起的管道对象

  • run_id (str) – 当前管道的运行id

  • sub_pipeline_para (dict) – 获取子管道对象的参数字典

  • 传入的运行扩展参数 (-) –

返回

返回获取到的子管道对象(注意该子管道对象的使用模式必须与is_asyn一致)

返回类型

Pipeline

classmethod initialize()[源代码]

初始化处理类, 仅在装载的时候执行一次初始化动作

classmethod is_asyn() bool[源代码]

是否异步处理

返回

标识处理器是否异步处理, 返回Fasle代表管道要等待处理器执行完成

返回类型

bool

classmethod processer_name() str[源代码]

处理器名称, 唯一标识处理器

返回

当前处理器名称

返回类型

str

class HiveNetPipeline.pipeline.Tools[源代码]

基类:object

管道开发的工具函数

classmethod get_node_id_by_name(node_name: str, pipeline_obj)[源代码]

通过节点配置名获取节点id

参数
  • node_name (str) – 节点配置名

  • pipeline_obj (Pipeline) – 管道对象