HiveNetCore.stream module

简单流数据获取及处理库

class HiveNetCore.stream.BaseStream(back_forward=False, keep_wait_data=False, stop_by_excepiton=False, logger=None, dealer_exception_fun=None, stream_closed_fun=None)[源代码]

基类:ABC

基础流数据处理定义基类, 定义流数据处理的基本框架函数

参数
  • back_forward (bool) – default=False, 是否允许反向移动,即跳转回前面已获取过的数据

  • keep_wait_data (bool) – default=False, 无数据时是否继续等待新数据进入,即到数据结尾后,关闭处理,还是继续等待扫描新数据

  • stop_by_excepiton (bool) – default=False, 当出现异常时是否中止流处理

  • logger (object) – default=None, 出现错误时进行error输出的日志类(需实现error方法),None代表不输出日志

  • dealer_exception_fun (function) –

    default=None, 流处理异常时执行的通知函数,函数有6个入参:

    stream_tag : string 流标识

    stream_obj : object 流对象

    deal_obj : object 正在处理的流对象

    position : object 正在处理的流对象的位置

    dealer_handle : fun 出现异常时所执行的处理函数对象

    error_obj : object 异常对象,sys.exc_info()

    trace_str : string 异常的堆栈信息

  • stream_closed_fun (function) –

    default=None, 流处理结束时执行的通知函数,函数有2个入参:

    stream_tag : string 流标识

    stream_obj : object 流对象

    position : object 正在处理的流对象的位置

    closed_status : EnumStreamClosedStatus 关闭状态

__init__(back_forward=False, keep_wait_data=False, stop_by_excepiton=False, logger=None, dealer_exception_fun=None, stream_closed_fun=None)[源代码]

构造函数

参数
  • back_forward (bool) – default=False, 是否允许反向移动,即跳转回前面已获取过的数据

  • keep_wait_data (bool) – default=False, 无数据时是否继续等待新数据进入,即到数据结尾后,关闭处理,还是继续等待扫描新数据

  • stop_by_excepiton (bool) – default=False, 当出现异常时是否中止流处理

  • logger (object) – default=None, 出现错误时进行error输出的日志类(需实现error方法),None代表不输出日志

  • dealer_exception_fun (function) –

    default=None, 流处理异常时执行的通知函数,函数有6个入参:

    stream_tag : string 流标识

    stream_obj : object 流对象

    deal_obj : object 正在处理的流对象

    position : object 正在处理的流对象的位置

    dealer_handle : fun 出现异常时所执行的处理函数对象

    error_obj : object 异常对象,sys.exc_info()

    trace_str : string 异常的堆栈信息

  • stream_closed_fun (function) –

    default=None, 流处理结束时执行的通知函数,函数有2个入参:

    stream_tag : string 流标识

    stream_obj : object 流对象

    position : object 正在处理的流对象的位置

    closed_status : EnumStreamClosedStatus 关闭状态

add_dealer(*args)[源代码]

添加流数据处理函数句柄

参数

args (*args) – 要添加的处理函数句柄清单,可以随意增加多个

property back_forward

获取是否允许反向移动的标记

@property {bool}

clear_dealer()[源代码]

清空流数据处理函数句柄集

del_dealer(*args)[源代码]

删除流数据处理函数句柄

参数

args (*args) – 要删除的处理函数句柄清单,可以随意删除多个

property keep_wait_data

获取无数据时是否继续等待新数据进入

@property {bool}

move_forward(step=1, stream_tag='default')[源代码]

流从当前位置向前移动指定步数

参数
  • step (int) – default=1, 要移动的步数

  • stream_tag (string) – default=’default’, 需要处理的流处理标签

引发
  • KeyError – 当传入的流标识不存在时抛出该异常

  • AttributeError – 流不支持移动时,抛出该异常

  • EOFError – 当移动的位置超过流本身数据位置,抛出EOFError异常

move_next(step=1, stream_tag='default')[源代码]

流从当前位置向后移动指定步数

参数
  • step (int) – default=1, 要移动的步数

  • stream_tag (string) – default=’default’, 需要处理的流处理标签

引发
  • KeyError – 当传入的流标识不存在时抛出该异常

  • AttributeError – 流不支持移动时,抛出该异常

  • EOFError – 当移动的位置超过流本身数据位置,抛出EOFError异常

pause_stream(stream_tag='default')[源代码]

暂停指定标签的流处理

参数

stream_tag (string) – default=’default’, 需要暂停的流处理标签

引发

KeyError – 当传入的流标识不存在时抛出该异常

resume_stream(stream_tag='default')[源代码]

恢复指定标签的流处理

参数

stream_tag (string) – default=’default’, 需要恢复的流处理标签

引发

KeyError – 当传入的流标识不存在时抛出该异常

seek(position, stream_tag='default')[源代码]

移动到流的指定位置

参数
  • position (int) – 要移动到的位置(注意位置从0开始)

  • stream_tag (string) – default=’default’, 需要处理的流处理标签

引发
  • KeyError – 当传入的流标识不存在时抛出该异常

  • AttributeError – 当流处理不支持向前移动,但操作需要向前移动,则抛出该异常

  • EOFError – 当移动的位置超过流本身数据位置,抛出EOFError异常

start_stream(stream_tag='default', is_sync=True, is_pause=False, seek_position=None, move_next_step=None, move_forward_step=None, **kwargs)[源代码]

启动指定的流数据处理

参数
  • stream_tag (string) – default=’default’, 所启动的流处理标签,用于后续调用stop_stream的时候使用

  • is_sync (bool) – default=True, True-同步完成,待流结束后才退出函数; False-异步处理,新启动线程执行流处理,函数直接返回

  • is_pause (bool) – default=False, 启动时是否暂停流处理(便于调用其他函数进行移动位置,仅在is_sync为False时有效)

  • seek_position (int) – default=None, 执行流处理前先移动到指定的位置(与move_next_step、move_forward_step不能共存)

  • move_next_step (int) – default=None, 执行流处理前先向后移动指定步数(seek_position、move_forward_step不能共存)

  • move_forward_step (int) – default=None, 执行流处理前先向前移动指定步数(与move_next_step、seek_position不能共存)

  • kwargs (**kwargs) – 启动流处理的动态key-value方式参数

引发

KeyError – stream_tag已经存在时,抛出该异常

stop_stream(stream_tag='default', is_wait=True)[源代码]

关闭指定标签的流处理

参数
  • stream_tag (string) – default=’default’, 需要关闭的流处理标签

  • is_wait (bool) – default=True, 是否等待流关闭后再返回

引发
  • AttributeError – 当keep_wait_data为False时,会自动关闭流,调用本方法应直接抛出异常

  • KeyError – 当传入的流标识不存在时抛出该异常

stop_stream_force(is_wait=True)[源代码]

强制关闭当前所有正在处理的流

参数

is_wait (bool) – default=True, 是否等待所有流关闭后再返回

classmethod stream_decorator(stop_by_excepiton=False, logger=None, dealer_exception_fun=None, stream_closed_fun=None, stream_tag='stream_dealer', is_sync=True, seek_position=None, move_next_step=None, move_forward_step=None)[源代码]

流处理修饰函数, 通过该函数来简单实现流定义及处理

参数
  • stop_by_excepiton (bool) – default=False, 当出现异常时是否中止流处理

  • logger (object) – default=None, 出现错误时进行error输出的日志类(需实现error方法),None代表不输出日志

  • dealer_exception_fun (function) –

    default=None, 流处理异常时执行的通知函数,函数有6个入参:

    stream_tag : string 流标识

    stream_obj : object 流对象

    deal_obj : object

    position : object 正在处理的流对象的位置

    dealer_handle : fun 出现异常时所执行的处理函数对象

    error_obj : object 异常对象,sys.exc_info()

    trace_str : string 异常的堆栈信息

  • stream_closed_fun (function) –

    default=None, 流处理结束时执行的通知函数,函数有2个入参:

    stream_tag : string 流标识

    stream_obj : object 流对象

    position : object 正在处理的流对象的位置

    closed_status : EnumStreamClosedStatus 关闭状态

  • stream_tag (string) – default=’stream_dealer’, 所启动的流处理标签,用于后续调用stop_stream的时候使用

  • is_sync (bool) – default=True, True-同步完成,待流结束后才退出函数; False-异步处理,新启动线程执行流处理,函数直接返回

  • seek_position (int) – default=None, 执行流处理前先移动到指定的位置(与move_next_step、move_forward_step不能共存)

  • move_next_step (int) – default=None, 执行流处理前先向后移动指定步数(seek_position、move_forward_step不能共存)

  • move_forward_step (int) – default=None, 执行流处理前先向前移动指定步数(与move_next_step、seek_position不能共存)

示例

@BaseStream.stream_decorator(stop_by_excepiton=True)

def dealer_fun(deal_obj, position, **kwargs):

# 进行流对象处理,deal_obj为传入的流对象,kwargs为函数自身的传入参数

pass

# 然后在实际要执行流处理的地方,启动流处理

dealer_fun(None, 0, key1=1, key2=2)

class HiveNetCore.stream.EnumStreamClosedStatus(value, names=<not given>, *values, module=None, qualname=None, type=None, start=1, boundary=None)[源代码]

基类:Enum

流中止的状态枚举值

CallStop = 'CallStop'
ExceptionExit = 'ExceptionExit'
ForceStop = 'ForceStop'
RunOver = 'RunOver'
class HiveNetCore.stream.StringStream(stop_by_excepiton=False, logger=None, dealer_exception_fun=None, stream_closed_fun=None)[源代码]

基类:BaseStream

字符串流, 继承BaseStream,实现字符串的流处理

参数
  • stop_by_excepiton (bool) – default=False, 当出现异常时是否中止流处理

  • logger (object) – default=None, 出现错误时进行error输出的日志类(需实现error方法),None代表不输出日志

  • dealer_exception_fun (function) –

    default=None, 流处理异常时执行的通知函数,函数有6个入参:

    stream_tag : string 流标识

    stream_obj : object 流对象

    deal_obj : object 正在处理的流对象

    position : object 正在处理的流对象的位置

    dealer_handle : fun 出现异常时所执行的处理函数对象

    error_obj : object 异常对象,sys.exc_info()

    trace_str : string 异常的堆栈信息

  • stream_closed_fun (function) –

    default=None, 流处理结束时执行的通知函数,函数有2个入参:

    stream_tag : string 流标识

    stream_obj : object 流对象

    position : object 正在处理的流对象的位置

    closed_status : EnumStreamClosedStatus 关闭状态

示例

1、使用实例对象的方法(str_obj为要处理的字符串)

_stream = StringStream(stop_by_excepiton=False, logger=None, dealer_exception_fun=None, stream_closed_fun=None)

_stream.add_dealer(dealer_fun1, dealer_fun2, ….)

_stream.start_stream(stream_tag=’default’, is_sync=True, is_pause=False,

seek_position=None, move_next_step=None, move_forward_step=None, str_obj=’my test string’)

2、使用修饰符的方法(str_obj为要处理的字符串)

@StringStream.stream_decorator(stop_by_excepiton=False, logger=None, dealer_exception_fun=None, stream_closed_fun=None,

stream_tag=’stream_dealer’, is_sync=True, seek_position=None,

move_next_step=None, move_forward_step=None)

def string_stream_dealer_fun(deal_obj=None, position=’’, str_obj=’’, self_para1=’’, self_para2=’’):

do stream deal

# 启动流处理

string_stream_dealer_fun(None, 0, str_obj=’test’, self_para1=’’, self_para2=’’)

__init__(stop_by_excepiton=False, logger=None, dealer_exception_fun=None, stream_closed_fun=None)[源代码]

重载构造函数,去掉无需设置的参数

参数
  • stop_by_excepiton (bool) – default=False, 当出现异常时是否中止流处理

  • logger (object) – default=None, 出现错误时进行error输出的日志类(需实现error方法),None代表不输出日志

  • dealer_exception_fun (function) –

    default=None, 流处理异常时执行的通知函数,函数有6个入参:

    stream_tag : string 流标识

    stream_obj : object 流对象

    deal_obj : object 正在处理的流对象

    position : object 正在处理的流对象的位置

    dealer_handle : fun 出现异常时所执行的处理函数对象

    error_obj : object 异常对象,sys.exc_info()

    trace_str : string 异常的堆栈信息

  • stream_closed_fun (function) –

    default=None, 流处理结束时执行的通知函数,函数有2个入参:

    stream_tag : string 流标识

    stream_obj : object 流对象

    position : object 正在处理的流对象的位置

    closed_status : EnumStreamClosedStatus 关闭状态