HiveNetCore.stream 源代码

#!/usr/bin/env python3
# -*- coding: UTF-8 -*-
#
# Copyright 2018 黎慧剑
#
# This Source Code Form is subject to the terms of the Mozilla Public
# License, v. 2.0. If a copy of the MPL was not distributed with this
# file, You can obtain one at http://mozilla.org/MPL/2.0/.

"""
简单流数据获取及处理库

@module stream
@file stream.py

"""

import os
import sys
import traceback
import threading
try:
    from gevent import sleep
except ImportError:
    from time import sleep
from enum import Enum
from abc import ABC, abstractmethod  # 利用abc模块实现抽象类
# 根据当前文件路径将包路径纳入, 在非安装的情况下可以引用到
sys.path.append(os.path.abspath(os.path.join(os.path.dirname(__file__), os.path.pardir)))
from HiveNetCore.generic import NullObj


__MOUDLE__ = 'stream'  # 模块名
__DESCRIPT__ = u'简单流数据获取及处理库'  # 模块描述
__VERSION__ = '0.1.0'  # 版本
__AUTHOR__ = u'黎慧剑'  # 作者
__PUBLISH__ = '2018.09.01'  # 发布日期


[文档]class EnumStreamClosedStatus(Enum): """ 流中止的状态枚举值 @enum {string} """ RunOver = 'RunOver' # 流运行到结尾关闭 CallStop = 'CallStop' # 外部调用停止方法关闭 ForceStop = 'ForceStop' # 外部调用强制停止方法关闭 ExceptionExit = 'ExceptionExit' # 出现异常关闭
[文档]class BaseStream(ABC): """ 基础流数据处理定义基类, 定义流数据处理的基本框架函数 @param {bool} back_forward=False - 是否允许反向移动,即跳转回前面已获取过的数据 @param {bool} keep_wait_data=False - 无数据时是否继续等待新数据进入,即到数据结尾后,关闭处理,还是继续等待扫描新数据 @param {bool} stop_by_excepiton=False - 当出现异常时是否中止流处理 @param {object} logger=None - 出现错误时进行error输出的日志类(需实现error方法),None代表不输出日志 @param {function} dealer_exception_fun=None - 流处理异常时执行的通知函数,函数有6个入参: stream_tag : string 流标识 stream_obj : object 流对象 deal_obj : object 正在处理的流对象 position : object 正在处理的流对象的位置 dealer_handle : fun 出现异常时所执行的处理函数对象 error_obj : object 异常对象,sys.exc_info() trace_str : string 异常的堆栈信息 @param {function} stream_closed_fun=None - 流处理结束时执行的通知函数,函数有2个入参: stream_tag : string 流标识 stream_obj : object 流对象 position : object 正在处理的流对象的位置 closed_status : EnumStreamClosedStatus 关闭状态 """ ############################# # 内部变量 ############################# _back_forward = False # 是否允许反向移动 _keep_wait_data = False # 无数据时是否继续等待新数据进入,不关闭流 _stop_by_excepiton = False # 当出现异常时是否中止流处理 _logger = None # 日志处理类 _dealer_exception_fun = None # 流处理异常时执行的通知函数 _stream_closed_fun = None # 流处理结束的通知函数 _dealer_handles = None # 处理流数据的处理函数句柄字典,key为函数句柄,value统一为None _stream_list = None # 正在处理的流对象列表,key为stream_tag,value为stream_obj _stream_list_tag = None # 正在处理的流对象对应的处理标记,key为stream_tag,value为(_stop_tag, _pause_tag): _stream_list_lock = None # 流处理对象列表更新锁 _force_stop_tag = False # 强制关闭所有流处理的标记 ############################# # 属性 ############################# @property def back_forward(self): """ 获取是否允许反向移动的标记 @property {bool} """ return self._back_forward @property def keep_wait_data(self): """ 获取无数据时是否继续等待新数据进入 @property {bool} """ return self._keep_wait_data ############################# # 构造函数 #############################
[文档] def __init__(self, back_forward=False, keep_wait_data=False, stop_by_excepiton=False, logger=None, dealer_exception_fun=None, stream_closed_fun=None): """ 构造函数 @param {bool} back_forward=False - 是否允许反向移动,即跳转回前面已获取过的数据 @param {bool} keep_wait_data=False - 无数据时是否继续等待新数据进入,即到数据结尾后,关闭处理,还是继续等待扫描新数据 @param {bool} stop_by_excepiton=False - 当出现异常时是否中止流处理 @param {object} logger=None - 出现错误时进行error输出的日志类(需实现error方法),None代表不输出日志 @param {function} dealer_exception_fun=None - 流处理异常时执行的通知函数,函数有6个入参: stream_tag : string 流标识 stream_obj : object 流对象 deal_obj : object 正在处理的流对象 position : object 正在处理的流对象的位置 dealer_handle : fun 出现异常时所执行的处理函数对象 error_obj : object 异常对象,sys.exc_info() trace_str : string 异常的堆栈信息 @param {function} stream_closed_fun=None - 流处理结束时执行的通知函数,函数有2个入参: stream_tag : string 流标识 stream_obj : object 流对象 position : object 正在处理的流对象的位置 closed_status : EnumStreamClosedStatus 关闭状态 """ self._dealer_handles = dict() # 处理流数据的处理函数句柄字典,key为函数句柄,value统一为None self._stream_list = dict() # 正在处理的流对象列表,key为stream_tag,value为stream_obj # 正在处理的流对象对应的处理标记,key为stream_tag,value为(_stop_tag, _pause_tag): self._stream_list_tag = dict() self._stream_list_lock = threading.RLock() # 流处理对象列表更新锁 self._back_forward = back_forward self._keep_wait_data = keep_wait_data self._stop_by_excepiton = stop_by_excepiton self._logger = logger self._dealer_exception_fun = dealer_exception_fun self._stream_closed_fun = stream_closed_fun
############################# # 内部函数 ############################# def _stream_deal_fun(self, tid=0, stream_tag=''): """ 流顺序处理函数, 按顺序进行流对象的获取和处理, 每获取一个对象,调用注册的处理函数 @param {int} tid=0 - 线程ID @param {string} stream_tag='' - 流处理标签 @throws {KeyError} - 当传入错误的stream_tag,抛出该异常 """ _closed_status = EnumStreamClosedStatus.RunOver self._stream_list_lock.acquire() try: if stream_tag not in self._stream_list.keys(): # 传入错误的标识 raise KeyError(u'Unknow tid!') _stream_obj = self._stream_list[stream_tag] finally: self._stream_list_lock.release() try: _pos = self._current_position(_stream_obj) while True: try: # 判断是否暂停或退出 if self._force_stop_tag: # 强制退出 _closed_status = EnumStreamClosedStatus.ForceStop return if self._stream_list_tag[stream_tag][0]: # 当前流的停止标记 _closed_status = EnumStreamClosedStatus.CallStop return if self._stream_list_tag[stream_tag][1]: # 当前流的暂停标记 sleep(0.01) continue # 循环进行流处理 _pos = self._current_position(_stream_obj) _get_obj = self._next(_stream_obj) for _handle in self._dealer_handles: # 根据配置循环进行流处理 try: _handle(_get_obj, _pos) except: # 先输出日志 _error_obj = sys.exc_info() _trace_str = traceback.format_exc() if self._logger is not None: _log_str = 'stream deal exception(%s):\n%s' % ( str(_handle), _trace_str ) # 通知函数 if self._dealer_exception_fun is not None: try: self._dealer_exception_fun(stream_tag=stream_tag, stream_obj=_stream_obj, deal_obj=_get_obj, position=_pos, dealer_handle=_handle, error_obj=_error_obj, trace_str=_trace_str) except: if self._logger is not None: _log_str = 'call dealer_exception_fun exception(%s):\n%s' % ( str(_handle), traceback.format_exc() ) self._logger.error(_log_str) # 判断是否要退出 if self._stop_by_excepiton: _closed_status = EnumStreamClosedStatus.ExceptionExit return # 准备执行下一个 sleep(0.01) except StopIteration: if self._keep_wait_data: # 没有获取到数据,但继续循环尝试获取 sleep(0.01) continue else: # 已经到结尾了,结束流处理 return finally: # 关闭流处理 try: if self._stream_closed_fun is not None: self._stream_closed_fun(stream_tag=stream_tag, stream_obj=_stream_obj, position=_pos, closed_status=_closed_status) except: if self._logger is not None: _log_str = 'call stream_closed_fun exception:\n%s' % traceback.format_exc() self._logger.error(_log_str) try: self._close_stream(stream_obj=_stream_obj) except Exception: if self._logger is not None: _log_str = 'call close_stream exception:\n%s' % traceback.format_exc() self._logger.error(_log_str) # 情况流列表 self._stream_list_lock.acquire() del self._stream_list[stream_tag] del self._stream_list_tag[stream_tag] self._stream_list_lock.release() @classmethod def _stream_deal_fun_decorator(cls, tid=0, stream_obj=None, stop_by_excepiton=False, logger=None, dealer_exception_fun=None, stream_closed_fun=None, stream_tag='stream_dealer', dealer_fun=None, **kwargs_dealer_fun): """ 函数修饰符方式流处理的处理函数 @param {int} tid=0 - 线程ID @param {object} stream_obj=None - 要处理的流对象 @param {bool} stop_by_excepiton=False - 当出现异常时是否中止流处理 @param {object} logger=None - 出现错误时进行error输出的日志类(需实现error方法),None代表不输出日志 @param {function} dealer_exception_fun=None - 流处理异常时执行的通知函数,函数有6个入参: stream_tag : string 流标识 stream_obj : object 流对象 deal_obj : object 正在处理的流对象 position : object 正在处理的流对象的位置 dealer_handle : fun 出现异常时所执行的处理函数对象 error_obj : object 异常对象,sys.exc_info() trace_str : string 异常的堆栈信息 @param {function} stream_closed_fun=None - 流处理结束时执行的通知函数,函数有2个入参: stream_tag : string 流标识 stream_obj : object 流对象 position : object 正在处理的流对象的位置 closed_status : EnumStreamClosedStatus 关闭状态 @param {string} stream_tag='stream_dealer' - 所启动的流处理标签,用于后续调用stop_stream的时候使用 @param {dict} dealer_fun=None - 原函数对象执行传入的动态key-value参数 """ try: _closed_status = EnumStreamClosedStatus.RunOver _pos = cls._current_position(stream_obj) # 组织动态函数,目的是传入对应的参数 # _exec_fun_str = 'dealer_fun(_get_obj, _pos' # for _key in kwargs_dealer_fun.keys(): # _exec_fun_str = '%s, %s=%s' % ( # _exec_fun_str, # _key, # 'kwargs_dealer_fun[\'' + _key + '\']' # ) # _exec_fun_str = _exec_fun_str + ')' while True: try: # 循环进行流处理 _pos = cls._current_position(stream_obj) _get_obj = cls._next(stream_obj=stream_obj) try: dealer_fun(_get_obj, _pos, **kwargs_dealer_fun) # exec(_exec_fun_str) except Exception: # 先输出日志 _error_obj = sys.exc_info() _trace_str = traceback.format_exc() if logger is not None: _log_str = 'stream decorator deal exception(%s):\n%s' % ( str(dealer_fun), _trace_str ) # 通知函数 if dealer_exception_fun is not None: try: dealer_exception_fun(stream_tag=stream_tag, stream_obj=stream_obj, deal_obj=_get_obj, position=_pos, dealer_handle=dealer_fun, error_obj=_error_obj, trace_str=_trace_str) except Exception: if logger is not None: _log_str = 'call dealer_exception_fun exception(%s):\n%s' % ( str(dealer_fun), traceback.format_exc() ) logger.error(_log_str) # 判断是否要退出 if stop_by_excepiton: _closed_status = EnumStreamClosedStatus.ExceptionExit return # 准备执行下一个 sleep(0.01) except StopIteration: # 已经到结尾了,结束流处理 return finally: # 关闭流处理 try: if stream_closed_fun is not None: stream_closed_fun(stream_tag=stream_tag, stream_obj=stream_obj, position=_pos, closed_status=_closed_status) except Exception: if logger is not None: _log_str = 'call stream_closed_fun exception:\n%s' % traceback.format_exc() logger.error(_log_str) try: cls._close_stream(stream_obj=stream_obj) except Exception: if logger is not None: _log_str = 'call close_stream exception:\n%s' % traceback.format_exc() logger.error(_log_str) ############################# # 公共处理函数 #############################
[文档] def add_dealer(self, *args): """ 添加流数据处理函数句柄 @param {*args} args - 要添加的处理函数句柄清单,可以随意增加多个 """ for _item in args: self._dealer_handles[_item] = None
[文档] def del_dealer(self, *args): """ 删除流数据处理函数句柄 @param {*args} args - 要删除的处理函数句柄清单,可以随意删除多个 """ for _item in args: if _item in self._dealer_handles.keys(): del self._dealer_handles[_item]
[文档] def clear_dealer(self): """ 清空流数据处理函数句柄集 """ self._dealer_handles.clear()
############################# # 需继承类实现的内部处理函数 ############################# @staticmethod @abstractmethod def _init_stream(**kwargs): """ 抽象方法,根据传入参数初始化流对象(实现类自定义,也可以是标识),基类将保留该对象并供后续流处理函数调用 @param {**kwargs} kwargs 启动流处理的动态key-value方式参数 @returns {object} - 传入后续处理的流对象 """ pass @staticmethod @abstractmethod def _next(stream_obj): """ 抽象方法,从流中获取下一个对象,并将流指针指向下一个位置 @param {object} stream_obj - _init_stream生成的流对象 @throws {StopIteration} - 如果到了流结尾,抛出该异常 """ pass @staticmethod @abstractmethod def _close_stream(stream_obj): """ 抽象方法,关闭流对象(与_init_stream对应),在中止流处理时调用 @param {object} stream_obj - _init_stream生成的流对象 """ pass @staticmethod @abstractmethod def _seek(stream_obj, position): """ 抽象方法,移动到流的指定位置 @param {object} stream_obj - _init_stream生成的流对象 @param {object} position - 要移动到的位置(具体类型由实现类确定) @throws {AttributeError} - 当流处理不支持向前移动,但操作需要向前移动,则抛出该异常 @throws {EOFError} - 当移动的位置超过流本身数据位置,抛出EOFError异常 """ pass @staticmethod @abstractmethod def _move_next(stream_obj, step=1): """ 抽象方法,流从当前位置向后移动指定步数 @param {object} stream_obj - _init_stream生成的流对象 @param {int} step=1 - 要移动的步数 @throws {AttributeError} - 流不支持移动时,抛出该异常 @throws {EOFError} - 当移动的位置超过流本身数据位置,抛出EOFError异常 """ pass @staticmethod @abstractmethod def _move_forward(stream_obj, step=1): """ 抽象方法,流从当前位置向前移动指定步数 @param {object} stream_obj - _init_stream生成的流对象 @param {int} step=1 - 要移动的步数 @throws {AttributeError} - 流不支持移动时,抛出该异常 @throws {EOFError} - 当移动的位置超过流本身数据位置,抛出EOFError异常 """ pass @staticmethod @abstractmethod def _current_position(stream_obj): """ 抽象方法,获取当前流的位置信息 @param {object} stream_obj - _init_stream生成的流对象 @returns {object} - 返回流对象的当前位置(具体类型由实现类确定) """ pass ############################# # 对外的通用流处理函数 #############################
[文档] def start_stream(self, stream_tag='default', is_sync=True, is_pause=False, seek_position=None, move_next_step=None, move_forward_step=None, **kwargs): """ 启动指定的流数据处理 @param {string} stream_tag='default' - 所启动的流处理标签,用于后续调用stop_stream的时候使用 @param {bool} is_sync=True - True-同步完成,待流结束后才退出函数; False-异步处理,新启动线程执行流处理,函数直接返回 @param {bool} is_pause=False - 启动时是否暂停流处理(便于调用其他函数进行移动位置,仅在is_sync为False时有效) @param {int} seek_position=None - 执行流处理前先移动到指定的位置(与move_next_step、move_forward_step不能共存) @param {int} move_next_step=None - 执行流处理前先向后移动指定步数(seek_position、move_forward_step不能共存) @param {int} move_forward_step=None - 执行流处理前先向前移动指定步数(与move_next_step、seek_position不能共存) @param {**kwargs} kwargs - 启动流处理的动态key-value方式参数 @throws {KeyError} - stream_tag已经存在时,抛出该异常 """ self._stream_list_lock.acquire() try: if stream_tag in self._stream_list.keys(): # 流处理标识不能重复 raise KeyError(u'处理标识已存在') # 打开流对象 _stream_obj = self._init_stream(**kwargs) self._stream_list[stream_tag] = _stream_obj self._stream_list_tag[stream_tag] = (False, is_pause) finally: self._stream_list_lock.release() # 处理流位置 if seek_position is not None: self._seek(stream_obj=_stream_obj, position=seek_position) elif move_next_step is not None: self._move_next(stream_obj=_stream_obj, step=move_next_step) elif move_forward_step is not None: self._move_forward(stream_obj=_stream_obj, step=move_forward_step) if is_sync: # 同步模式,直接处理流 self._stream_deal_fun(stream_tag=stream_tag) else: # 异步模式,通过线程方式处理 _dealer_thread = threading.Thread( target=self._stream_deal_fun, args=(1, stream_tag), name='Thread-Deal-Fun' ) _dealer_thread.setDaemon(True) _dealer_thread.start()
[文档] def stop_stream(self, stream_tag='default', is_wait=True): """ 关闭指定标签的流处理 @param {string} stream_tag='default' - 需要关闭的流处理标签 @param {bool} is_wait=True - 是否等待流关闭后再返回 @throws {AttributeError} - 当keep_wait_data为False时,会自动关闭流,调用本方法应直接抛出异常 @throws {KeyError} - 当传入的流标识不存在时抛出该异常 """ self._stream_list_lock.acquire() try: if not self._keep_wait_data: # 自动关闭流,参数无效 raise AttributeError(u'流参数为自动关闭,不允许手工关闭') if stream_tag not in self._stream_list.keys(): # 流标识不存在 raise KeyError(u'处理标识不存在') # 设置停止标签 self._stream_list_tag[stream_tag] = (True, self._stream_list_tag[stream_tag][1]) finally: self._stream_list_lock.release() # 是否等待关闭后才返回 if is_wait: while True: if stream_tag not in self._stream_list.keys(): break sleep(0.01)
[文档] def pause_stream(self, stream_tag='default'): """ 暂停指定标签的流处理 @param {string} stream_tag='default' - 需要暂停的流处理标签 @throws {KeyError} - 当传入的流标识不存在时抛出该异常 """ self._stream_list_lock.acquire() try: if stream_tag not in self._stream_list.keys(): # 流标识不存在 raise KeyError(u'Unknow stream_tag!') # 设置暂停标签 self._stream_list_tag[stream_tag] = (self._stream_list_tag[stream_tag][0], True) finally: self._stream_list_lock.release()
[文档] def resume_stream(self, stream_tag='default'): """ 恢复指定标签的流处理 @param {string} stream_tag='default' - 需要恢复的流处理标签 @throws {KeyError} - 当传入的流标识不存在时抛出该异常 """ self._stream_list_lock.acquire() try: if stream_tag not in self._stream_list.keys(): # 流标识不存在 raise KeyError(u'Unknow stream_tag!') # 设置暂停标签 self._stream_list_tag[stream_tag] = (self._stream_list_tag[stream_tag][0], False) finally: self._stream_list_lock.release()
[文档] def stop_stream_force(self, is_wait=True): """ 强制关闭当前所有正在处理的流 @param {bool} is_wait=True - 是否等待所有流关闭后再返回 """ self._force_stop_tag = True if is_wait: # 检查是否都已停止 while True: if len(self._stream_list_tag.keys()) == 0: break sleep(0.01)
[文档] def seek(self, position, stream_tag='default'): """ 移动到流的指定位置 @param {int} position - 要移动到的位置(注意位置从0开始) @param {string} stream_tag='default' - 需要处理的流处理标签 @throws {KeyError} - 当传入的流标识不存在时抛出该异常 @throws {AttributeError} - 当流处理不支持向前移动,但操作需要向前移动,则抛出该异常 @throws {EOFError} - 当移动的位置超过流本身数据位置,抛出EOFError异常 """ if not self._move_forward: # 不支持向前移动 raise AttributeError(u'This stream unsupport move forward!') self._stream_list_lock.acquire() try: if stream_tag not in self._stream_list.keys(): # 流处理标识不能重复 raise KeyError(u'Unknow stream_tag!') # 获取流对象 _stream_obj = self._stream_list[stream_tag] finally: self._stream_list_lock.release() # 执行移动 self._seek(stream_obj=_stream_obj, position=position)
[文档] def move_next(self, step=1, stream_tag='default'): """ 流从当前位置向后移动指定步数 @param {int} step=1 - 要移动的步数 @param {string} stream_tag='default' - 需要处理的流处理标签 @throws {KeyError} - 当传入的流标识不存在时抛出该异常 @throws {AttributeError} - 流不支持移动时,抛出该异常 @throws {EOFError} - 当移动的位置超过流本身数据位置,抛出EOFError异常 """ self._stream_list_lock.acquire() try: if stream_tag not in self._stream_list.keys(): # 流处理标识不能重复 raise KeyError(u'Unknow stream_tag!') # 获取流对象 _stream_obj = self._stream_list[stream_tag] finally: self._stream_list_lock.release() # 执行移动 self._move_next(stream_obj=_stream_obj, step=step)
[文档] def move_forward(self, step=1, stream_tag='default'): """ 流从当前位置向前移动指定步数 @param {int} step=1 - 要移动的步数 @param {string} stream_tag='default' - 需要处理的流处理标签 @throws {KeyError} - 当传入的流标识不存在时抛出该异常 @throws {AttributeError} - 流不支持移动时,抛出该异常 @throws {EOFError} - 当移动的位置超过流本身数据位置,抛出EOFError异常 """ self._stream_list_lock.acquire() try: if stream_tag not in self._stream_list.keys(): # 流处理标识不能重复 raise KeyError(u'stream_tag has exists!') # 获取流对象 _stream_obj = self._stream_list[stream_tag] finally: self._stream_list_lock.release() # 执行移动 self._move_forward(stream_obj=_stream_obj, step=step)
[文档] @classmethod def stream_decorator(cls, stop_by_excepiton=False, logger=None, dealer_exception_fun=None, stream_closed_fun=None, stream_tag='stream_dealer', is_sync=True, seek_position=None, move_next_step=None, move_forward_step=None): """ 流处理修饰函数, 通过该函数来简单实现流定义及处理 @param {bool} stop_by_excepiton=False - 当出现异常时是否中止流处理 @param {object} logger=None - 出现错误时进行error输出的日志类(需实现error方法),None代表不输出日志 @param {function} dealer_exception_fun=None - 流处理异常时执行的通知函数,函数有6个入参: stream_tag : string 流标识 stream_obj : object 流对象 deal_obj : object position : object 正在处理的流对象的位置 dealer_handle : fun 出现异常时所执行的处理函数对象 error_obj : object 异常对象,sys.exc_info() trace_str : string 异常的堆栈信息 @param {function} stream_closed_fun=None - 流处理结束时执行的通知函数,函数有2个入参: stream_tag : string 流标识 stream_obj : object 流对象 position : object 正在处理的流对象的位置 closed_status : EnumStreamClosedStatus 关闭状态 @param {string} stream_tag='stream_dealer' - 所启动的流处理标签,用于后续调用stop_stream的时候使用 @param {bool} is_sync=True - True-同步完成,待流结束后才退出函数; False-异步处理,新启动线程执行流处理,函数直接返回 @param {int} seek_position=None - 执行流处理前先移动到指定的位置(与move_next_step、move_forward_step不能共存) @param {int} move_next_step=None - 执行流处理前先向后移动指定步数(seek_position、move_forward_step不能共存) @param {int} move_forward_step=None - 执行流处理前先向前移动指定步数(与move_next_step、seek_position不能共存) @example @BaseStream.stream_decorator(stop_by_excepiton=True) def dealer_fun(deal_obj, position, **kwargs): # 进行流对象处理,deal_obj为传入的流对象,kwargs为函数自身的传入参数 pass # 然后在实际要执行流处理的地方,启动流处理 dealer_fun(None, 0, key1=1, key2=2) """ def dealer(func): def dealer_args(deal_obj, position, **kwargs_dealer): # 打开流对象 _stream_obj = cls._init_stream(**kwargs_dealer) # 处理流位置 if seek_position is not None: cls._seek(stream_obj=_stream_obj, position=seek_position) elif move_next_step is not None: cls._move_next(stream_obj=_stream_obj, step=move_next_step) elif move_forward_step is not None: cls._move_forward(stream_obj=_stream_obj, step=move_forward_step) if is_sync: # 同步模式,直接处理流 cls._stream_deal_fun_decorator(tid=0, stream_obj=_stream_obj, stop_by_excepiton=stop_by_excepiton, logger=logger, dealer_exception_fun=dealer_exception_fun, stream_closed_fun=stream_closed_fun, stream_tag=stream_tag, dealer_fun=func, **kwargs_dealer) else: # 异步模式,通过线程方式处理 _dealer_thread = threading.Thread( target=cls._stream_deal_fun_decorator, args=(1, _stream_obj, stop_by_excepiton, logger, dealer_exception_fun, stream_closed_fun, stream_tag, func), kwargs=kwargs_dealer, name='Thread-Decorator-Deal-Fun' ) _dealer_thread.setDaemon(True) _dealer_thread.start() return dealer_args return dealer
[文档]class StringStream(BaseStream): """ 字符串流, 继承BaseStream,实现字符串的流处理 @param {bool} stop_by_excepiton=False - 当出现异常时是否中止流处理 @param {object} logger=None - 出现错误时进行error输出的日志类(需实现error方法),None代表不输出日志 @param {function} dealer_exception_fun=None - 流处理异常时执行的通知函数,函数有6个入参: stream_tag : string 流标识 stream_obj : object 流对象 deal_obj : object 正在处理的流对象 position : object 正在处理的流对象的位置 dealer_handle : fun 出现异常时所执行的处理函数对象 error_obj : object 异常对象,sys.exc_info() trace_str : string 异常的堆栈信息 @param {function} stream_closed_fun=None - 流处理结束时执行的通知函数,函数有2个入参: stream_tag : string 流标识 stream_obj : object 流对象 position : object 正在处理的流对象的位置 closed_status : EnumStreamClosedStatus 关闭状态 @example 1、使用实例对象的方法(str_obj为要处理的字符串) _stream = StringStream(stop_by_excepiton=False, logger=None, dealer_exception_fun=None, stream_closed_fun=None) _stream.add_dealer(dealer_fun1, dealer_fun2, ....) _stream.start_stream(stream_tag='default', is_sync=True, is_pause=False, seek_position=None, move_next_step=None, move_forward_step=None, str_obj='my test string') 2、使用修饰符的方法(str_obj为要处理的字符串) @StringStream.stream_decorator(stop_by_excepiton=False, logger=None, dealer_exception_fun=None, stream_closed_fun=None, stream_tag='stream_dealer', is_sync=True, seek_position=None, move_next_step=None, move_forward_step=None) def string_stream_dealer_fun(deal_obj=None, position='', str_obj='', self_para1='', self_para2=''): do stream deal # 启动流处理 string_stream_dealer_fun(None, 0, str_obj='test', self_para1='', self_para2='') """ ############################# # 重载构造函数 #############################
[文档] def __init__(self, stop_by_excepiton=False, logger=None, dealer_exception_fun=None, stream_closed_fun=None): """ 重载构造函数,去掉无需设置的参数 @param {bool} stop_by_excepiton=False - 当出现异常时是否中止流处理 @param {object} logger=None - 出现错误时进行error输出的日志类(需实现error方法),None代表不输出日志 @param {function} dealer_exception_fun=None - 流处理异常时执行的通知函数,函数有6个入参: stream_tag : string 流标识 stream_obj : object 流对象 deal_obj : object 正在处理的流对象 position : object 正在处理的流对象的位置 dealer_handle : fun 出现异常时所执行的处理函数对象 error_obj : object 异常对象,sys.exc_info() trace_str : string 异常的堆栈信息 @param {function} stream_closed_fun=None - 流处理结束时执行的通知函数,函数有2个入参: stream_tag : string 流标识 stream_obj : object 流对象 position : object 正在处理的流对象的位置 closed_status : EnumStreamClosedStatus 关闭状态 """ BaseStream.__init__(self, back_forward=True, keep_wait_data=False, stop_by_excepiton=stop_by_excepiton, logger=logger, dealer_exception_fun=dealer_exception_fun, stream_closed_fun=stream_closed_fun)
############################# # 需继承类实现的内部处理函数 ############################# @staticmethod def _init_stream(**kwargs): """ 根据传入参数初始化流对象(实现类自定义,也可以是标识),基类将保留该对象并供后续流处理函数调用 @param {string} str_obj - 需进行流处理的字符串对象 @returns {object} - 返回具有两个属性的object对象: obj : string 流处理对象 pos : int 流当前位置 """ _stream_obj = NullObj() _stream_obj.obj = kwargs['str_obj'] _stream_obj.pos = 0 return _stream_obj @staticmethod def _next(stream_obj): """ 从流中获取下一个对象,并将流指针指向下一个位置 @param {object} stream_obj - _init_stream生成的流对象 @returns {string} - 获取到的下一个位置的字符 @throws {StopIteration} - 如果到了流结尾,抛出该异常 """ if stream_obj.pos + 1 > len(stream_obj.obj): # 已经到结尾了 raise StopIteration # 更新位置 stream_obj.pos = stream_obj.pos + 1 return stream_obj.obj[stream_obj.pos - 1: stream_obj.pos] @staticmethod def _close_stream(stream_obj): """ 关闭流对象(与_init_stream对应),在中止流处理时调用 @param {object} stream_obj - _init_stream生成的流对象 """ del stream_obj @staticmethod def _seek(stream_obj, position): """ 移动到流的指定位置 @param {object} stream_obj - _init_stream生成的流对象 @param {int} position - 要移动到的位置(注意位置从0开始) @throws {AttributeError} - 当流处理不支持向前移动,但操作需要向前移动,则抛出该异常 @throws {EOFError} - 当移动的位置超过流本身数据位置,抛出EOFError异常 """ if position < 0 or position >= len(stream_obj.obj): # 已经超过结尾 raise EOFError(u'Position not legal!') # 设置位置 stream_obj.pos = position @staticmethod def _move_next(stream_obj, step=1): """ 流从当前位置向后移动指定步数 @param {object} stream_obj - _init_stream生成的流对象 @param {int} step=1 - 要移动的步数 @throws {AttributeError} - 当流处理不支持向前移动,但操作需要向前移动,则抛出该异常 @throws {EOFError} - 当移动的位置超过流本身数据位置,抛出EOFError异常 """ _pos = stream_obj.pos + step if _pos < 0 or _pos >= len(stream_obj.obj): # 已经超过结尾 raise EOFError(u'Position not legal!') # 设置位置 stream_obj.pos = _pos @staticmethod def _move_forward(stream_obj, step=1): """ 流从当前位置向前移动指定步数 @param {object} stream_obj - _init_stream生成的流对象 @param {int} step=1 - 要移动的步数 @throws {AttributeError} - 当流处理不支持向前移动,但操作需要向前移动,则抛出该异常 @throws {EOFError} - 当移动的位置超过流本身数据位置,抛出EOFError异常 """ _pos = stream_obj.pos - step if _pos < 0 or _pos >= len(stream_obj.obj): # 已经超过结尾 raise EOFError(u'Position not legal!') # 设置位置 stream_obj.pos = _pos @staticmethod def _current_position(stream_obj): """ 获取当前流的位置信息 @param {object} stream_obj - _init_stream生成的流对象 @returns {int} - 返回流对象的当前位置 """ return stream_obj.pos
if __name__ == '__main__': # 当程序自己独立运行时执行的操作 # 打印版本信息 print(('模块名: %s - %s\n' '作者: %s\n' '发布日期: %s\n' '版本: %s' % (__MOUDLE__, __DESCRIPT__, __AUTHOR__, __PUBLISH__, __VERSION__)))