HiveNetBuildTool.plugins.processer_get_sys_infos 源代码

#!/usr/bin/env python3
# -*- coding: UTF-8 -*-
# Copyright 2022 黎慧剑
#
# This Source Code Form is subject to the terms of the Mozilla Public
# License, v. 2.0. If a copy of the MPL was not distributed with this
# file, You can obtain one at http://mozilla.org/MPL/2.0/.
"""
构建工具管道插件-获取系统信息并放入上下文

@module processer_get_sys_infos
@file processer_get_sys_infos.py
"""
import os
from HiveNetCore.utils.import_tool import DynamicLibManager
from HiveNetPipeline import PipelineProcesser
from HiveNetBuildTool.build import BuildPipeline


[文档]class ProcesserBuildGetSysInfos(PipelineProcesser): """ 获取系统信息(获取并放置到管道上下文的配置) 建议节点配置标识(current_key): getSysInfos 配置说明: Item Key: 获取信息唯一标识(自定义) infoType: str, 信息类型标识(extend_para.yaml中ProcesserGetSysInfos下的信息类型) 注: 如果不设置代表使用Item Key作为信息类型标识 getKey: str, 自定义写入上下文的信息获取key, 如果不设置则使用extend_para.yaml中对应信息类型的默认值 args: list, 调用获取信息函数的固定位置入参, 根据实际extend_para.yaml中对应信息类型的获取函数要求传参 kwargs: dict, 调用获取信息函数的key-value入参, 根据实际extend_para.yaml中对应信息类型的获取函数要求传参 """
[文档] @classmethod def processer_name(cls) -> str: """ 处理器名称,唯一标识处理器 @returns {str} - 当前处理器名称 """ return 'ProcesserBuildGetSysInfos'
[文档] @classmethod def execute(cls, input_data, context: dict, pipeline_obj, run_id: str): """ 执行处理 @param {object} input_data - 处理器输入数据值,除第一个处理器外,该信息为上一个处理器的输出值 @param {dict} context - 传递上下文,该字典信息将在整个管道处理过程中一直向下传递,可以在处理器中改变该上下文信息 @param {Pipeline} pipeline_obj - 管道对象 @returns {object} - 处理结果输出数据值, 供下一个处理器处理, 异步执行的情况返回None """ # 获取当前要处理的标识 _current_key = context.get('current_key', 'getSysInfos') _context_set_key = context.get('sys_infos_set_key', 'sysInfos') # 要设置到上下文的系统信息key _config = context['build_config'].get(_current_key, None) # 获取不到配置, 不处理 if _config is None: return input_data # 获取扩展参数配置 _extend_para = BuildPipeline.get_processer_extend_para(cls.processer_name(), default={}) # 初始化插件管理模块 _lib_manager = DynamicLibManager(os.getcwd()) # 遍历参数查询可支持的系统信息 _infos = {} for _get_id, _get_para in _config.items(): if _get_para is None: _get_para = {} # 获取信息查询函数 _info_type = _get_para.get('infoType', _get_id) _info_para = _extend_para.get(_info_type, None) if _info_para is None: raise ModuleNotFoundError('Info type [%s] not found in extend para!' % _info_type) _func = _info_para.get('func', None) if _func is None: _func = _lib_manager.load_by_config(_info_para['libConfig']) if not callable(_func): raise AttributeError('Info type [%s] is not callable!' % _info_type) _info_para['func'] = _func # 执行函数 _get_key = _get_para.get('getKey', _info_para.get('getKey', _info_type)) _infos[_get_key] = _func( *_get_para.get('args', []), **_get_para.get('kwargs', {}) ) # 将获取到的参数放入上下文 context[_context_set_key] = _infos # 返回输出结果 return input_data