parallel模块说明¶
模块名:HiveNetCore.parallel
parallel是并行任务(多线程、多进程)的处理框架和实现,包括并行任务处理,以及并行池(类似线程池或进程池)管理的封装,同时该框架可以支持扩展为分布式的并行任务处理(按框架定义实现对应的实现类即可)。
使用该框架的最主要优点为:
1、多线程、多进程接口统一,可直接实现多进程或多线程的直接切换(无需修改代码);
2、简化多线程和多进程的代码实现,且实现了函数执行回调函数的支持;此外遵循HiveNet 日志规范(hivenet_log_standards);
3、框架可支持实现分布式计算并行任务的自定义扩展。
ParallelLockFw(并行锁框架)¶
定义了并行任务所需的锁对象框架,该框架与Python自带的threading.RLock定义兼容,目前已直接利用Python原生类实现了该框架的线程锁实例(ThreadParallelLock)、进程锁实例(ProcessParallelLock),分别可以针对多线程、多进程的锁控制。后续也可根据需要基于框架(ParallelLockFw)实现分布计算锁,来支持分布式计算的锁控制需求。
使用ParallelLockFw具体实例的简单示例如下:
ParallelFw(并行任务框架)¶
定义了并行任务处理的框架(例如多线程、多进程的任务发起),目前已实现了多线程任务实例(ThreadParallel)、多进程任务实例(ProcessParallel)。后续也可以根据需要基于框架(ParallelFw)实现分布式任务处理实例,来支持分布式计算的并行任务处理需求。
支持的方法包括:
start - 启动并行池任务处理
stop - 停止并行池任务处理
pause - 暂停并行池任务处理
resume - 恢复并行池任务处理
并行任务框架的应用的关键在于对象初始化,支持的参数如下:
@param {fuction} deal_fun - 并发处理主函数,按顺序入参,可以有返回值
@param {tuple} run_args=None - 并发处理主函数的入参列表
@param {dict} run_kwargs=None - 并发处理主函数的动态入参列表
@param {bool} auto_start=False - 是否创建时自动发起并发任务
@param {string} pid='' - 并发对象的id
@param {string} pname='' - 并发对象的标识名
@param {ParallelLockFw} lock=None - 并发锁对象,控制多个并发对象根据该锁控制单一处理
@param {fuction} callback_fun=None - 回调函数,在执行完函数后执行,定义如下
fun(id, name, call_result, deal_fun_ret)
id - 并发对象的id
name - 并发对象的标识名
call_result - 线程执行的结果,CResult对象,如果执行无异常返回'00000';
如果发生异常,返回'21399',并登记异常信息
deal_fun_ret - deal_fun函数执行的返回值
@param {bool} set_daemon=False - 是否设置守护,如果设置守护,则在主进程中使用join方法等待所有并发完成,
否则主进程结束后并发执行的结果不可预知;如果不设置守护,主进程结束后并发任务仍会执行
@param {Logger} logger=None - 日志对象,如果为None代表不需要输出日志,传入对象需满足:
1、标准logging的logger对象
2、自定义的日志类对象,但应实现info、warning、error等标准方法
@param {bool} is_use_global_logger=True - 当logger=None时,是否使用全局logger对象
注:通过RunTool.set_global_logger进行设置
@param {int} log_level=logging.INFO - 打印日志的级别
@param {bool} use_distributed_logger=False - 是否使用分布式logger,如果是,则每个分布任务自行创建独立logger记录日志
注:对于多进程及分布式并发任务,应采取该日志模式
@param {string} distributed_logger_module_name='' - 分布式日志类模块名
@param {string} distributed_logger_class_name='' - 分布式日志类类名
@param {tuple} distributed_logger_args=None - 分布式日志类创建参数
@param {dict} distributed_logger_kwargs=None - 分布式日志类创建参数
@param {dict} distributed_logger_replace_para=dict() - 分布式日志动态参数替换,在每创建一个并发任务通过该参数修改日志参数
key - 如果是int类型,代表替换distributed_logger_args的第几个参数;如果是string,代表替换distributed_logger_kwargs的指定参数
value - 替换参数字典,key为要替换正则表达式字符,value为要替换的动态值
要替换的动态值,可选值有以下几种:
'pid' - 并发任务id
'pname' - 并发任务名
'pocess_id' - 进程ID
'thread_id' - 线程ID - 暂不支持
@param {bool} is_logger_to_deal_fun=False - 是否传递并发任务logger到deal_fun中
注意:传递通过kwargs,参数名为logger
该框架的几个特性说明如下:
1、并发任务处理函数(deal_fun)支持返回值,返回值(或执行过程的异常信息)可在处理函数执行完成后,通过回调函数(callback_fun)通知回调用方;
2、可在定义中直接传入并发任务锁(lock),框架会保证同一锁发起的并发任务基于该所控制并发;
3、并发任务的日志记录支持单一日志对象(多进程情况不支持)及分布式日志对象(每个任务自行初始化一个新的日志对象),可通过distributed_logger相关参数实现各种情况的日志输出支持;此外也可支持将日志对象传递到执行函数中(is_logger_to_deal_fun参数),执行函数可以复用该日志对象进行日志的输出;
使用ParallelFw具体实例的简单示例如下:
# 定义主进程使用的logger
_logger = HiveNetCore.logging_hivenet.Logger(conf_file_name=None, logger_name=HiveNetCore.logging_hivenet.EnumLoggerName.Console,
config_type=HiveNetCore.logging_hivenet.EnumLoggerConfigType.JSON_STR)
# 定义并发任务的logger创建参数
_logger_kwargs = {
'conf_file_name': None,
'logger_name': HiveNetCore.logging_hivenet.EnumLoggerName.Console,
'config_type': HiveNetCore.logging_hivenet.EnumLoggerConfigType.JSON_STR
}
# 定义并发任务处理函数
def demo_deal_fun(a, b, c, p1='', p2=''):
print('demo_deal_fun print: a=%s, b=%s, c=%s, p1=%s, p2=%s' % (
str(a), str(b), str(c), str(p1), str(p2)
))
time.sleep(0.1)
_ret = ('demo_deal_fun:r1-%s' % (str(a)), 'demo_deal_fun:r2')
print('demo_deal_fun print: return=%s' % (str(_ret)))
return _ret
# 定义并发任务回调函数
def demo_callback(id, name, call_result, deal_fun_ret):
print('demo_callback print: id=%s, name=%s, call_result=%s, deal_fun_ret=%s' % (
str(id), str(name), str(call_result), str(deal_fun_ret)
))
return
# 初始化并发任务对象
_t1 = ProcessParallel(
deal_fun=demo_deal_fun, run_args=('demo2-a-1', 'demo2-b-1', 'demo2-c-1'),
run_kwargs={'p1': 'demo2-p1-1', 'p2': 'demo2-p2-1'}, pid='demo2-t1', pname='demo2-t1-name',
callback_fun=demo_callback, set_daemon=True, logger=_logger,
use_distributed_logger=True, distributed_logger_module_name='HiveNetCore.logging_hivenet',
distributed_logger_class_name='Logger',
distributed_logger_args=None, distributed_logger_kwargs=_logger_kwargs
)
# 执行并发任务
_t1.start()
# 等待任务完成
_t1.join()
# 如果过程中想强行中止并发任务
_t1.force_stop()
ParallelPool(并行池管理类)¶
实现了统一的并行池(线程池、进程池的管理),实现池中任务的创建、销毁、启动、暂停等管理,同时直接支持队列方式的消费者任务处理。该类可以直接支持后续扩展其他并行任务的管理(如分布式并行任务,只要并行任务基于以上的框架实现)。
并行池的应用的关键在于对象初始化,支持的参数如下:
@param {fuction} deal_fun - 并发任务处理主函数,按顺序入参,可以有返回值
注:该函数内部需自行实现获取数据并处理的流程,但约定如果无处理数据,函数应返回None(用于并发池判断是否释放任务)
@param {ParallelFw} parallel_class=None - 并行任务类定义对象,获取方法如下:
getattr(ImportTool.import_module('HiveNetCore.parallel'), 'ThreadParallel')
@param {tuple} run_args=None - 并发任务处理主函数的入参列表
@param {dict} run_kwargs=None - 并发任务处理主函数的动态入参列表
@param {string} pname='' - 并发任务处理主函数的标识名
@param {ParallelLockFw} lock=None - 并发锁对象,控制多个并发对象根据该锁控制单一处理
@param {fuction} callback_fun=None - 回调函数,在执行完函数后执行,定义如下
fun(id, name, call_result, deal_fun_ret)
id - 并发对象的id
name - 并发对象的标识名
call_result - 线程执行的结果,CResult对象,如果执行无异常返回'00000';
如果发生异常,返回'21399',并登记异常信息
deal_fun_ret - deal_fun函数执行的返回值
@param {Logger} logger=None - 日志对象,如果为None代表不需要输出日志,传入对象需满足:
1、标准logging的logger对象
2、自定义的日志类对象,但应实现info、warning、error等标准方法
@param {bool} is_use_global_logger=True - 当logger=None时,是否使用全局logger对象
注:通过RunTool.set_global_logger进行设置
@param {int} log_level=logging.INFO - 打印日志的级别
@param {bool} use_distributed_logger=False - 是否使用分布式logger,如果是,则每个分布任务自行创建独立logger记录日志
注:对于多进程及分布式并发任务,应采取该日志模式
@param {string} distributed_logger_module_name='' - 分布式日志类模块名
@param {string} distributed_logger_class_name='' - 分布式日志类类名
@param {tuple} distributed_logger_args=None - 分布式日志类创建参数
@param {dict} distributed_logger_kwargs=None - 分布式日志类创建参数
@param {dict} distributed_logger_replace_para=dict() - 分布式日志动态参数替换,在每创建一个并发任务通过该参数修改日志参数
key - 如果是int类型,代表替换distributed_logger_args的第几个参数;如果是string,代表替换distributed_logger_kwargs的指定参数
value - 替换参数字典,key为要替换正则表达式字符,value为要替换的动态值
要替换的动态值,可选值有以下几种:
'pid' - 并发任务id
'pname' - 并发任务名
'pocess_id' - 进程ID
'thread_id' - 线程ID
@param {bool} is_logger_to_deal_fun=False - 是否传递并发任务logger到deal_fun中
注意:传递通过kwargs,参数名为logger
@param {bool} auto_start=False - 是否自动启动并发池
@param {bool} auto_stop=False - 是否自动关闭并发池(当任务都已全部完成处理)
@param {QueueFw} task_queue=None - 并发池需要处理的任务队列
注:如果有指定队列,get_task_num_fun参数无效,则自动根据队列长度检查待处理任务
@param {function} get_task_num_fun=None - 获取待处理任务数量的函数
注:如果task_queue和get_task_num_fun均为None,则直接创建最大数量的线程数,且不释放空闲任务
@param {list} get_task_num_fun_args=None - 获取待处理任务数量的函数,的入参列表
@param {int} maxsize=10 - 并发池最大并发对象数
@param {int} minsize=0 - 并发池最小并发对象数
@param {number} worker_release_time=10 - 空闲工作并发对象释放时间,单位为秒, 0代表不控制空闲释放
@param {number} worker_overtime=0 - 正在执行的任务超时时间(秒),0代表不控制超时
注:对于超时执行任务,将视为失效任务,可以选择直接忽略或强制中止
@param {bool} force_kill_overtime_worker=False - 是否强制中止失效任务
@param {bool} replace_overtime_worker=False - 是否创建新任务替代超时任务
注:仅当force_kill_overtime_worker=False时才会进行替代
@param {number} daemon_thread_time=0.01 - 守护线程的间隔时间
@param {ParallelShareDictFw} sharedict_class=None - 进程间共享字典对象的类对象,获取方法如下:
getattr(ImportTool.import_module('HiveNetCore.parallel'), 'ThreadParallelShareDict')
@param {ParallelLockFw} parallel_lock_class=None - 进程间锁对象的类对象,获取方法如下:
getattr(ImportTool.import_module('HiveNetCore.parallel'), 'ThreadParallelLock')
该框架的几个特性说明如下:
1、支持多种并行任务类型,可通过参数指定具体的并行任务类型(parallel_class,注意要取到的是该类型的定义对象,通过getattr获取,后续基于该对象动态创建并行任务,例如多线程还是多进程);
2、支持指定队列发起并行任务(task_queue,队列对象应为Python原生队列对象,或基于HiveNetCore.queue_hivenet.QueueFw扩展的队列对象);如果需要执行固定数量的任务,可以将任务放入队列后,指定自动结束参数(auto_stop),再启动并发池,并发池将在任务完成后自行结束;
3、支持自动释放空闲并发任务(worker_release_time),减少并发任务空跑导致的性能消耗;
4、支持处理超时任务(worker_overtime),可强制中止超时任务(force_kill_overtime_worker),或启动新任务替代某一超时任务(replace_overtime_worker),避免超时任务堵塞了任务队列的处理;
使用ParallelPool的简单示例如下:
# 定义并行池的处理函数,注意示例中q为支持进程共享访问的队列
def _deal_fun_demo5(q, a, p1=''):
try:
obj = q.get(block=False)
print('_deal_fun_demo5: a=%s, p1=%s, qobj=%s' % (str(a), str(p1), str(obj)))
time.sleep(1)
return True
except Exception as e:
print('_deal_fun_demo5 error: a=%s, p1=%s, error=%s' % (str(a), str(p1), str(type(e))))
time.sleep(1)
return None
def demo():
# 定义要处理的队列
_task_queue = multiprocessing.Queue()
_i = 0
while _i < 100:
_task_queue.put(_i)
_i = _i + 1
# 定义进程池,注意大部分参数与ParallelFw一致
_pool = ParallelPool(
deal_fun=_deal_fun_demo5,
parallel_class=getattr(ImportTool.import_module('HiveNetCore.parallel'), 'ProcessParallel'),
run_args=(_task_queue, 'demo5-a'), run_kwargs={'p1': 'demo5-p1'},
pname='demo5', callback_fun=demo_callback,
logger=_logger,
use_distributed_logger=True, distributed_logger_module_name='HiveNetCore.logging_hivenet',
distributed_logger_class_name='Logger',
distributed_logger_args=None, distributed_logger_kwargs=_logger_kwargs,
auto_start=False, auto_stop=True, task_queue=_task_queue, get_task_num_fun=None, get_task_num_fun_args=None,
maxsize=10, minsize=0, worker_release_time=10, worker_overtime=0,
force_kill_overtime_worker=False, replace_overtime_worker=False, daemon_thread_time=0.01,
sharedict_class=getattr(ImportTool.import_module('HiveNetCore.parallel'), 'ProcessParallelShareDict'),
parallel_lock_class=getattr(ImportTool.import_module('HiveNetCore.parallel'), 'ProcessParallelLock')
)
# 启动并行池,该示例定义了自动结束
_pool.start()
while not _pool.is_stop:
time.sleep(1)