HiveNetCore.stream module¶
简单流数据获取及处理库
- class HiveNetCore.stream.BaseStream(back_forward=False, keep_wait_data=False, stop_by_excepiton=False, logger=None, dealer_exception_fun=None, stream_closed_fun=None)[源代码]¶
基类:
abc.ABC基础流数据处理定义基类, 定义流数据处理的基本框架函数
- 参数
back_forward (bool) – default=False, 是否允许反向移动,即跳转回前面已获取过的数据
keep_wait_data (bool) – default=False, 无数据时是否继续等待新数据进入,即到数据结尾后,关闭处理,还是继续等待扫描新数据
stop_by_excepiton (bool) – default=False, 当出现异常时是否中止流处理
logger (object) – default=None, 出现错误时进行error输出的日志类(需实现error方法),None代表不输出日志
dealer_exception_fun (function) –
default=None, 流处理异常时执行的通知函数,函数有6个入参:
stream_tag : string 流标识
stream_obj : object 流对象
deal_obj : object 正在处理的流对象
position : object 正在处理的流对象的位置
dealer_handle : fun 出现异常时所执行的处理函数对象
error_obj : object 异常对象,sys.exc_info()
trace_str : string 异常的堆栈信息
stream_closed_fun (function) –
default=None, 流处理结束时执行的通知函数,函数有2个入参:
stream_tag : string 流标识
stream_obj : object 流对象
position : object 正在处理的流对象的位置
closed_status : EnumStreamClosedStatus 关闭状态
- __init__(back_forward=False, keep_wait_data=False, stop_by_excepiton=False, logger=None, dealer_exception_fun=None, stream_closed_fun=None)[源代码]¶
构造函数
- 参数
back_forward (bool) – default=False, 是否允许反向移动,即跳转回前面已获取过的数据
keep_wait_data (bool) – default=False, 无数据时是否继续等待新数据进入,即到数据结尾后,关闭处理,还是继续等待扫描新数据
stop_by_excepiton (bool) – default=False, 当出现异常时是否中止流处理
logger (object) – default=None, 出现错误时进行error输出的日志类(需实现error方法),None代表不输出日志
dealer_exception_fun (function) –
default=None, 流处理异常时执行的通知函数,函数有6个入参:
stream_tag : string 流标识
stream_obj : object 流对象
deal_obj : object 正在处理的流对象
position : object 正在处理的流对象的位置
dealer_handle : fun 出现异常时所执行的处理函数对象
error_obj : object 异常对象,sys.exc_info()
trace_str : string 异常的堆栈信息
stream_closed_fun (function) –
default=None, 流处理结束时执行的通知函数,函数有2个入参:
stream_tag : string 流标识
stream_obj : object 流对象
position : object 正在处理的流对象的位置
closed_status : EnumStreamClosedStatus 关闭状态
- property back_forward¶
获取是否允许反向移动的标记
@property {bool}
- property keep_wait_data¶
获取无数据时是否继续等待新数据进入
@property {bool}
- move_forward(step=1, stream_tag='default')[源代码]¶
流从当前位置向前移动指定步数
- 参数
step (int) – default=1, 要移动的步数
stream_tag (string) – default=’default’, 需要处理的流处理标签
- 引发
KeyError – 当传入的流标识不存在时抛出该异常
AttributeError – 流不支持移动时,抛出该异常
EOFError – 当移动的位置超过流本身数据位置,抛出EOFError异常
- move_next(step=1, stream_tag='default')[源代码]¶
流从当前位置向后移动指定步数
- 参数
step (int) – default=1, 要移动的步数
stream_tag (string) – default=’default’, 需要处理的流处理标签
- 引发
KeyError – 当传入的流标识不存在时抛出该异常
AttributeError – 流不支持移动时,抛出该异常
EOFError – 当移动的位置超过流本身数据位置,抛出EOFError异常
- pause_stream(stream_tag='default')[源代码]¶
暂停指定标签的流处理
- 参数
stream_tag (string) – default=’default’, 需要暂停的流处理标签
- 引发
KeyError – 当传入的流标识不存在时抛出该异常
- resume_stream(stream_tag='default')[源代码]¶
恢复指定标签的流处理
- 参数
stream_tag (string) – default=’default’, 需要恢复的流处理标签
- 引发
KeyError – 当传入的流标识不存在时抛出该异常
- seek(position, stream_tag='default')[源代码]¶
移动到流的指定位置
- 参数
position (int) – 要移动到的位置(注意位置从0开始)
stream_tag (string) – default=’default’, 需要处理的流处理标签
- 引发
KeyError – 当传入的流标识不存在时抛出该异常
AttributeError – 当流处理不支持向前移动,但操作需要向前移动,则抛出该异常
EOFError – 当移动的位置超过流本身数据位置,抛出EOFError异常
- start_stream(stream_tag='default', is_sync=True, is_pause=False, seek_position=None, move_next_step=None, move_forward_step=None, **kwargs)[源代码]¶
启动指定的流数据处理
- 参数
stream_tag (string) – default=’default’, 所启动的流处理标签,用于后续调用stop_stream的时候使用
is_sync (bool) – default=True, True-同步完成,待流结束后才退出函数; False-异步处理,新启动线程执行流处理,函数直接返回
is_pause (bool) – default=False, 启动时是否暂停流处理(便于调用其他函数进行移动位置,仅在is_sync为False时有效)
seek_position (int) – default=None, 执行流处理前先移动到指定的位置(与move_next_step、move_forward_step不能共存)
move_next_step (int) – default=None, 执行流处理前先向后移动指定步数(seek_position、move_forward_step不能共存)
move_forward_step (int) – default=None, 执行流处理前先向前移动指定步数(与move_next_step、seek_position不能共存)
kwargs (**kwargs) – 启动流处理的动态key-value方式参数
- 引发
KeyError – stream_tag已经存在时,抛出该异常
- stop_stream(stream_tag='default', is_wait=True)[源代码]¶
关闭指定标签的流处理
- 参数
stream_tag (string) – default=’default’, 需要关闭的流处理标签
is_wait (bool) – default=True, 是否等待流关闭后再返回
- 引发
AttributeError – 当keep_wait_data为False时,会自动关闭流,调用本方法应直接抛出异常
KeyError – 当传入的流标识不存在时抛出该异常
- stop_stream_force(is_wait=True)[源代码]¶
强制关闭当前所有正在处理的流
- 参数
is_wait (bool) – default=True, 是否等待所有流关闭后再返回
- classmethod stream_decorator(stop_by_excepiton=False, logger=None, dealer_exception_fun=None, stream_closed_fun=None, stream_tag='stream_dealer', is_sync=True, seek_position=None, move_next_step=None, move_forward_step=None)[源代码]¶
流处理修饰函数, 通过该函数来简单实现流定义及处理
- 参数
stop_by_excepiton (bool) – default=False, 当出现异常时是否中止流处理
logger (object) – default=None, 出现错误时进行error输出的日志类(需实现error方法),None代表不输出日志
dealer_exception_fun (function) –
default=None, 流处理异常时执行的通知函数,函数有6个入参:
stream_tag : string 流标识
stream_obj : object 流对象
deal_obj : object
position : object 正在处理的流对象的位置
dealer_handle : fun 出现异常时所执行的处理函数对象
error_obj : object 异常对象,sys.exc_info()
trace_str : string 异常的堆栈信息
stream_closed_fun (function) –
default=None, 流处理结束时执行的通知函数,函数有2个入参:
stream_tag : string 流标识
stream_obj : object 流对象
position : object 正在处理的流对象的位置
closed_status : EnumStreamClosedStatus 关闭状态
stream_tag (string) – default=’stream_dealer’, 所启动的流处理标签,用于后续调用stop_stream的时候使用
is_sync (bool) – default=True, True-同步完成,待流结束后才退出函数; False-异步处理,新启动线程执行流处理,函数直接返回
seek_position (int) – default=None, 执行流处理前先移动到指定的位置(与move_next_step、move_forward_step不能共存)
move_next_step (int) – default=None, 执行流处理前先向后移动指定步数(seek_position、move_forward_step不能共存)
move_forward_step (int) – default=None, 执行流处理前先向前移动指定步数(与move_next_step、seek_position不能共存)
示例
@BaseStream.stream_decorator(stop_by_excepiton=True)
def dealer_fun(deal_obj, position, **kwargs):
# 进行流对象处理,deal_obj为传入的流对象,kwargs为函数自身的传入参数
pass
# 然后在实际要执行流处理的地方,启动流处理
dealer_fun(None, 0, key1=1, key2=2)
- class HiveNetCore.stream.EnumStreamClosedStatus(value)[源代码]¶
基类:
enum.Enum流中止的状态枚举值
- CallStop = 'CallStop'¶
- ExceptionExit = 'ExceptionExit'¶
- ForceStop = 'ForceStop'¶
- RunOver = 'RunOver'¶
- class HiveNetCore.stream.StringStream(stop_by_excepiton=False, logger=None, dealer_exception_fun=None, stream_closed_fun=None)[源代码]¶
基类:
HiveNetCore.stream.BaseStream字符串流, 继承BaseStream,实现字符串的流处理
- 参数
stop_by_excepiton (bool) – default=False, 当出现异常时是否中止流处理
logger (object) – default=None, 出现错误时进行error输出的日志类(需实现error方法),None代表不输出日志
dealer_exception_fun (function) –
default=None, 流处理异常时执行的通知函数,函数有6个入参:
stream_tag : string 流标识
stream_obj : object 流对象
deal_obj : object 正在处理的流对象
position : object 正在处理的流对象的位置
dealer_handle : fun 出现异常时所执行的处理函数对象
error_obj : object 异常对象,sys.exc_info()
trace_str : string 异常的堆栈信息
stream_closed_fun (function) –
default=None, 流处理结束时执行的通知函数,函数有2个入参:
stream_tag : string 流标识
stream_obj : object 流对象
position : object 正在处理的流对象的位置
closed_status : EnumStreamClosedStatus 关闭状态
示例
1、使用实例对象的方法(str_obj为要处理的字符串)
_stream = StringStream(stop_by_excepiton=False, logger=None, dealer_exception_fun=None, stream_closed_fun=None)
_stream.add_dealer(dealer_fun1, dealer_fun2, ….)
_stream.start_stream(stream_tag=’default’, is_sync=True, is_pause=False,
seek_position=None, move_next_step=None, move_forward_step=None, str_obj=’my test string’)
2、使用修饰符的方法(str_obj为要处理的字符串)
@StringStream.stream_decorator(stop_by_excepiton=False, logger=None, dealer_exception_fun=None, stream_closed_fun=None,
stream_tag=’stream_dealer’, is_sync=True, seek_position=None,
move_next_step=None, move_forward_step=None)
def string_stream_dealer_fun(deal_obj=None, position=’’, str_obj=’’, self_para1=’’, self_para2=’’):
do stream deal
# 启动流处理
string_stream_dealer_fun(None, 0, str_obj=’test’, self_para1=’’, self_para2=’’)
- __init__(stop_by_excepiton=False, logger=None, dealer_exception_fun=None, stream_closed_fun=None)[源代码]¶
重载构造函数,去掉无需设置的参数
- 参数
stop_by_excepiton (bool) – default=False, 当出现异常时是否中止流处理
logger (object) – default=None, 出现错误时进行error输出的日志类(需实现error方法),None代表不输出日志
dealer_exception_fun (function) –
default=None, 流处理异常时执行的通知函数,函数有6个入参:
stream_tag : string 流标识
stream_obj : object 流对象
deal_obj : object 正在处理的流对象
position : object 正在处理的流对象的位置
dealer_handle : fun 出现异常时所执行的处理函数对象
error_obj : object 异常对象,sys.exc_info()
trace_str : string 异常的堆栈信息
stream_closed_fun (function) –
default=None, 流处理结束时执行的通知函数,函数有2个入参:
stream_tag : string 流标识
stream_obj : object 流对象
position : object 正在处理的流对象的位置
closed_status : EnumStreamClosedStatus 关闭状态