HiveNetCore.utils.string_tool 源代码

#!/usr/bin/env python3
# -*- coding: UTF-8 -*-
#
# Copyright 2018 黎慧剑
#
# This Source Code Form is subject to the terms of the Mozilla Public
# License, v. 2.0. If a copy of the MPL was not distributed with this
# file, You can obtain one at http://mozilla.org/MPL/2.0/.

"""
字符串处理模块

@module string_tool
@file string_tool.py

"""
import os
import sys
import json
import re
from typing import Any
from random import Random
import dicttoxml
from lxml import etree
# 根据当前文件路径将包路径纳入, 在非安装的情况下可以引用到
sys.path.append(os.path.abspath(os.path.join(
    os.path.dirname(__file__), os.path.pardir, os.path.pardir)))
from HiveNetCore.generic import CResult, NullObj


__MOUDLE__ = 'string_tool'  # 模块名
__DESCRIPT__ = u'字符串处理模块'  # 模块描述
__VERSION__ = '0.1.0'  # 版本
__AUTHOR__ = u'黎慧剑'  # 作者
__PUBLISH__ = '2018.08.29'  # 发布日期


[文档]class JsonHiveNetEncoder(json.JSONEncoder): """ 支持对bytes对象转换为json字符串的Json编码器 """
[文档] def default(self, o: Any) -> Any: if isinstance(o, bytes): # 字节数组的转换支持 return { "__extend_type__": "bytes", "value": o.hex() } elif isinstance(o, CResult): # 对CResult的支持 return { "__extend_type__": "CResult", "value": NullObj.get_object_attr_dict(o, ignored_key=['_i18n_obj', ]) } else: # 其他情况使用默认处理 return super().default(o)
[文档]class JsonHiveNetDecoder(json.JSONDecoder): """ 支持将bytes编码的json字符串转换回bytes数组 """ def __init__(self): json.JSONDecoder.__init__(self, object_hook=JsonHiveNetDecoder.extend_dict)
[文档] @staticmethod def extend_dict(d: dict) -> Any: """ 处理扩展类型的字典解码 @param {dict} d - 扩展类型字典 @returns {Any} - 返回真实对象 """ if isinstance(d, dict): _extend_type = d.get('__extend_type__', None) if _extend_type == 'bytes': return bytes.fromhex(d['value']) elif _extend_type == 'CResult': _value = d['value'] # 初始化对象 _ret = CResult( code=_value.pop('code', '00000'), msg=_value.pop('msg', None), error=_value.pop('error', ''), trace_str=_value.pop('trace_str', ''), i18n_msg_paras=_value.pop('i18n_msg_paras', ()) ) # 附加值 for _name, _val in _value.items(): setattr(_ret, _name, _val) return _ret else: return d else: return d
[文档]class StringTool(object): """ 字符串处理通用类 提供各类字符串处理相关的常用工具函数( 静态方法) """ ############################# # 版本号处理 #############################
[文档] @classmethod def version_cmp(cls, v1: str, v2: str) -> str: """ 比较两个版本号字符串 @param {str} v1 - 版本号1, 例如'3.4.0'或'v3.4.0' @param {str} v2 - 版本号2, 例如'3.4.0'或'v3.4.0' @returns {str} - v1与v2的大小关系, '>', '<', '=' """ _d1 = re.split(r'\.', v1.lstrip('vV')) _d2 = re.split(r'\.', v2.lstrip('vV')) _d1 = [int(_d1[i]) for i in range(len(_d1))] _d2 = [int(_d2[i]) for i in range(len(_d2))] if(_d1 > _d2): return '>' elif(_d1 < _d2): return '<' else: return '='
############################# # 变量名处理 #############################
[文档] @classmethod def pascal_case_to_snake_case(cls, camel_case: str) -> str: """ 驼峰(帕斯卡)变量命名转蛇形 (支持大小驼峰模式) @param {str} camel_case - 驼峰(帕斯卡)变量名形式(MyVarName或myVarName) @returns {str} - 转换后的蛇形变量名(my_var_name) """ snake_case = re.sub(r"(?P<key>[A-Z])", r"_\g<key>", camel_case) return snake_case.lower().strip('_')
[文档] @classmethod def snake_case_to_big_pascal_case(cls, snake_case: str) -> str: """ 蛇形变量名转大驼峰(帕斯卡) @param {str} snake_case - 蛇形变量名(my_var_name) @returns {str} - 大驼峰(帕斯卡)变量名形式(MyVarName) """ words = snake_case.split('_') return ''.join(word.title() for word in words)
[文档] @classmethod def snake_case_to_little_pascal_case(cls, snake_case: str) -> str: """ 蛇形变量名转小驼峰(帕斯卡) @param {str} snake_case - 蛇形变量名(my_var_name) @returns {str} - 小驼峰(帕斯卡)变量名形式(myVarName) """ words = snake_case.split('_') return '%s%s' % (words[0], ''.join(word.title() for word in words[1:]))
############################# # 哈希转换 #############################
[文档] @staticmethod def bytes_to_hex(byte_array): """ 将byte串转换为哈希字符串 @param {byte[]} byte_array - 需要转换的byte数组 @returns {string} - 转换后的hex字符串 @example StringTool.bytes_to_hex(bytes("test string", encoding='utf-8')) """ return ''.join(["%02X" % x for x in byte_array]).strip()
[文档] @staticmethod def hex_to_bytes(hex_str): """ 将哈希字符串转换为byte数组 @param {string} hex_str - 需要转换的Hex样式的字符串 @returns {byte[]} - byte数组 @example StringTool.hex_to_bytes("A3D3F33433") """ return bytes.fromhex(hex_str)
############################# # 字符处理 #############################
[文档] @staticmethod def fill_fix_string(deal_str, fix_len, fill_char, left=True): """ 用指定字符填充字符串达到固定长度 @param {string} deal_str - 要处理的字符串 @param {int} fix_len - 返回字符串的固定长度 @param {string} fill_char - 填充字符(单字符) @param {bool} left=True - 填充方向, True-左填充, False-右填充 @returns {string} - 如果原字符串长度已超过指定长度, 则直接返回原字符串; 否则返回处理后的字符串 @example fix_str = StringTool.fill_fix_string('My job is', 50, ' ', False) """ _str = str(deal_str) # 生成填充串 _mixstr = "" _i = 0 while _i < fix_len - len(_str): _mixstr = _mixstr + fill_char _i = _i + 1 # 按方向填充 if left: return _mixstr + _str else: return _str + _mixstr
[文档] @staticmethod def get_list_from_str(deal_str): """ 从字符串中提炼出数组 按照python的模式提炼出数组, 说明如下: 1、数组内的对象根据字符的形式取得实际类型, 例如: 'text' - 字符串 10 - 数字 True - bool类型 2、如果数组有嵌套, 可以支持嵌套的模式 @param {string} deal_str - 要提炼的字符串, 内部要含有[a,b,c,d,'d']这类的字符串, 例如'dfdfdfd[ddd,aa,dd]' @returns {list} - 抽离出来的数组 @example mylist = StringTool.get_list_from_str('aaa["a", 10, [39, 4], True, 21.4]bbb') """ _array = [] _index1 = deal_str.find("[") _index2 = deal_str.rfind("]") # 从后往前找 if _index2 <= _index1: return _array _str = deal_str[_index1:_index2 + 1] _array = eval(_str) return _array
[文档] @staticmethod def get_random_str(random_length=8, chars="AaBbCcDdEeFfGgHhIiJjKkLlMmNnOoPpQqRrSsTtUuVvWwXxYyZz0123456789"): """ 随机生成固定长度的字符串 @param {int} random_length=8 - 需生成的字符串长度 @param {string} chars="AaBbCcDdEeFfGgHhIiJjKkLlMmNnOoPpQqRrSsTtUuVvWwXxYyZz0123456789" - 随机抽取的字符串内容 @returns {string} - 返回的字符串 @example randomstr = StringTool.get_random_str(10) """ _str = '' length = len(chars) - 1 random = Random() for i in range(random_length): _str += chars[random.randint(0, length)] return _str
[文档] @staticmethod def get_n_index(src, sub, n=1, start=0): """ 查找第n次出现子字符串的位置 @param {string} src - 要处理的字符串 @param {string} sub - 要查找的子字符串 @param {int} [n=1] - 要查找的字符串出现次数 @param {int} [start=0] - 查找开始位置 @returns {int} - 返回找到的位置, 如果找不到返回-1 """ if n < 1: # 已找不到了 return -1 index = src.find(sub, start) if index != -1 and n > 1: return StringTool.get_n_index(src, sub, n - 1, index + len(sub)) return index
[文档] @classmethod def get_starts_same_len(cls, str_a: str, str_b: str) -> int: """ 比较两个字符串开始相同的字符个数 @param {str} str_a - 字符a @param {str} str_b - 字符b @returns {int} - 相同字符个数 """ _len = 0 try: while True: if str_a[_len] != str_b[_len]: return _len _len += 1 except: pass return _len
[文档] @classmethod def get_ends_same_len(cls, str_a: str, str_b: str) -> int: """ 比较两个字符串结束位置相同的字符个数 @param {str} str_a - 字符a @param {str} str_b - 字符b @returns {int} - 相同字符个数 """ _len = 0 try: while True: _pos = -1 - _len if str_a[_pos] != str_b[_pos]: return _len _len += 1 except: pass return _len
[文档] @classmethod def lines_count(cls, text: str) -> int: """ 统计文本行数 @param {str} text - 要统计的文本 @returns {int} - 返回行数 """ return text.count('\n') + 1
[文档] @classmethod def find_nth_index(cls, s_str: str, f_str: str, n: int = 0, overlap: bool = False) -> int: """ 查找指定字符串在某字符串中第n次出现的位置 @param {str} s_str - 源字符串 @param {str} f_str - 要查找的字符串 @param {int} n=0 - 指定第几次出现 @param {bool} overlap=False - 是否允许重叠情况 @returns {int} - 位置, 如果为-1代表没有找到 """ _f_len = 1 if overlap else len(f_str) i = -_f_len for _time in range(n): i = s_str.find(f_str, i + _f_len) if i < 0: break return i
############################# # 对象与字符串转换 #############################
[文档] @staticmethod def format_obj_property_str(deal_obj, is_deal_subobj=False, c_level=0, max_level=10, is_same_line=False): """ 将对象属性格式化为可打印字符串 @param {[type]} deal_obj - 要格式化的对象 @param {bool} is_deal_subobj=False - 是否要打印属性对象的子属性 @param {int} c_level=0 - 打印级别( 根据级别调整缩进位数, 每一级缩进2个空格) @param {int} max_level=10 - 最大检索级别, <=0代表不进行限制 @param {bool} is_same_line=False - 输出内容是否不换行, 内部使用, 如果不换行则忽略缩进 @returns {string} - 返回格式化后的字符串 @example obj = NullObj() obj.aa = 1 obj.cb = 'fdfd' obj.kk = NullObj() obj.kk.abc = 3 obj.kk.bcd = 'dfdfd' print(StringTools.format_obj_property_str(obj=obj,is_deal_subobj=True)) """ # 先打印对象自身 _indent_str = '' if not is_same_line: _indent_str = StringTool.fill_fix_string( deal_str='', fix_len=c_level * 2, fill_char=' ', left=True) _retstr = '%stype(%s) ' % ( _indent_str, type(deal_obj) ) if is_deal_subobj and (max_level <= 0 or (max_level > c_level)): # print("c_level:" + str(c_level)) _indent_str = StringTool.fill_fix_string( deal_str='', fix_len=(c_level + 1) * 2, fill_char=' ', left=True) # 要打印子对象,区分类型进行处理 if type(deal_obj) in (list, tuple): # 数组和列表 _index = 0 while _index < len(deal_obj): _retstr = ( _retstr + '\n' + _indent_str + '[index:' + str(_index) + '] ' + StringTool.format_obj_property_str( deal_obj[_index], is_deal_subobj=is_deal_subobj, c_level=c_level + 1, max_level=max_level, is_same_line=True ) ) _index = _index + 1 elif isinstance(deal_obj, dict): # 字典 for _key in deal_obj.keys(): _retstr = ( _retstr + '\n' + _indent_str + 'key: ' + str(_key) + ' value: ' + StringTool.format_obj_property_str( deal_obj[_key], is_deal_subobj=is_deal_subobj, c_level=c_level + 2, max_level=max_level, is_same_line=True ) ) else: # 一般对象, 直接类的属性, 通过dir获取, 且非内置属性 _attr_print = False _attr_dir = list() if str(deal_obj).find(' object at 0x') > 0: # 通过str判断是否有重载处理 _attr_print = True _attr_dir = dir(deal_obj) for _item in _attr_dir: if _item[0: 2] != '__' and not callable(getattr(deal_obj, _item)): _retstr = ( _retstr + "\n" + _indent_str + _item + '(attr): ' + StringTool.format_obj_property_str( getattr(deal_obj, _item), is_deal_subobj=is_deal_subobj, c_level=c_level + 2, max_level=max_level, is_same_line=True ) ) # 一般对象,object上补充的属性 try: for _item in deal_obj.__dict__.items(): if _attr_print and _item[0] not in _attr_dir: _retstr = ( _retstr + "\n" + _indent_str + _item[0] + '(__dict__): ' + StringTool.format_obj_property_str( _item[1], is_deal_subobj=is_deal_subobj, c_level=c_level + 2, max_level=max_level, is_same_line=True ) ) except: # 可能对象没有__dict__属性 _retstr = _retstr + str(deal_obj) else: # 不打印子对象 _retstr = _retstr + str(deal_obj) return _retstr
############################# # JSON相关 #############################
[文档] @staticmethod def json_dumps_hive_net(obj: Any, *, skipkeys=False, ensure_ascii=True, check_circular=True, allow_nan=True, indent=None, separators=None, default=None, sort_keys=False, **kw) -> str: """ 将对象转换为json字符串(支持bytes数组及CResult) @param {Any} obj - 要处理的对象 @param {kwargs} - json.dumps的其他入参 @returns {str} - json字符串 """ # 增加对字节数组编码的支持 return json.dumps( obj, skipkeys=skipkeys, ensure_ascii=ensure_ascii, check_circular=check_circular, allow_nan=allow_nan, cls=JsonHiveNetEncoder, indent=indent, separators=separators, default=default, sort_keys=sort_keys, **kw )
[文档] @staticmethod def json_loads_hive_net(s, *, object_hook=None, parse_float=None, parse_int=None, parse_constant=None, object_pairs_hook=None, **kw) -> Any: """ 将json转换为Python对象(支持bytes数组及CResult) @param {str|bytes} s - 要转换的字符串或字节数组 @param {kwargs} - json.loads的其他入参 @returns {Any} - 转换后的python对象 """ # 增加对字节数组解码的支持 return json.loads( s, cls=JsonHiveNetDecoder, object_hook=object_hook, parse_float=parse_float, parse_int=parse_int, parse_constant=parse_constant, object_pairs_hook=object_pairs_hook, **kw )
[文档] @staticmethod def object_to_json(obj): """ 将python对象转换为json字符串(支持所有对象的通用转换) @param {object} obj - 要转换的json @return {string} - 转换后的json字符串, 如果是None返回'' @see 处理方式: 1、如果对象包含__json__方法, 则直接调用对象的该方法进行转换 2、尝试通过json库标准方法进行转换 3、如果对象不支持序列化, 转换会出现异常; 如果对象包含__dict__, 则将__dict__转换为json 4、如果都失败, 则抛出异常 """ _json_str = None if obj is None: _json_str = '' elif hasattr(obj, '__json__'): _json_str = obj.__json__() else: try: _json_str = json.dumps(obj, ensure_ascii=False) except Exception as e: if hasattr(obj, '__dict__'): _json_str = json.dumps(obj.__dict__, ensure_ascii=False) raise e return _json_str
[文档] @staticmethod def json_to_object(json_str, class_ref=None, object_hook=None): """ 将json字符串转换为python对象(支持自定义转换) @param {string} json_str - 要转换的json字符串 @param {class} class_ref=None - 类定义引用, 例如generic.NullObj 引用的类必须实现__fromjson__的静态函数, 传入json字符串, 返回对象实例 @param {function} object_hook=None - 将json对象转换为所需实例类的函数 函数入参为通过json标准库转换后的json对象, 出参为转换后的对象实例 @return {object} - 转换后的对象 @see 处理方式: 1、如果json_str为'', 直接返回None 2、如果object_hook不为空, 则通过json库标准方法进行转换, 并使用object_hook将对象转换为所需的类型 3、如果class_ref不为None并且有__fromjson__的静态函数, 则通过该函数获取对象, 如果没有该函数抛出异常 """ _obj = None if json_str == '': pass elif object_hook is not None: _obj = json.loads(json_str, object_hook=object_hook) elif class_ref is not None: _obj = class_ref.__fromjson__(json_str) else: _obj = json.loads(json_str) return _obj
[文档] @staticmethod def json_to_xml(json_str, root=False, custom_root='root', ids=False, attr_type=False, item_func=None, cdata=False): """ 将JSON字符串转换为XML字符串 @param {string} json_str - 要转换的JSON字符串 @param {bool} root=False - 转换后的xml是否通过root标签包含 @param {string} custom_root='root' - 自定义的根标签, 与root参数共同使用 @param {bool} ids=False - 指定每个xml元素是否产生唯一id @param {bool} attr_type=False - 指定每个元素是否有一个类型的标签属性, 就像<item type="str"> @param {fuction} item_func=None - 指定生成元素名的函数, 函数定义如下: func(parent) {return 'item-name'} @param {bool} cdata=False - 指定字符串值是否包在CDATA中 @return {string} - 转换后的XML字符串 """ if item_func is None: item_func = dicttoxml.default_item_func _dict = json.loads(json_str) return str( dicttoxml.dicttoxml(_dict, root=root, custom_root=custom_root, ids=ids, attr_type=attr_type, item_func=item_func, cdata=cdata), 'utf-8' )
[文档] @staticmethod def xml_to_dict(xml_str, item_name='item'): """ 将XML字符串转换为字典对象 @param {string} xml_str - 要转换的xml字符串 @param {string} item_name='item' - 标识是列表项的标签名 @return {string} - 转换后的dict字典 """ _has_xml_def = False if xml_str[0:5] == "<?xml": _has_xml_def = True # 生成xml对象 _xml_doc = None if _has_xml_def: # 有xml定义, 要转回二进制处理 _xml_doc = etree.fromstring(bytes(xml_str, 'utf-8')) else: _xml_doc = etree.fromstring(xml_str) # 遍历xml节点并进行处理 _dict = dict() _root = _xml_doc while _root is not None: _key, _value = StringTool._xml_node_addto_dict(_root, item_name=item_name) if _key is not None: # 是节点的情况才增加字典 _dict[_key] = _value _root = _root.getnext() return _dict
@staticmethod def _xml_node_addto_dict(node, item_name='item'): if not (hasattr(node, 'tag') and type(node.tag) == str): # 不是节点的情况( 例如注释) , 直接返回None return None, None _key = node.tag _value = None _is_list = False if 'type' not in node.attrib.keys() or node.attrib['type'] == 'dict': # 是一个新对象, 检查是否有子对象 _childs = node.getchildren() if len(_childs) > 0: # 有子节点 _value = dict() for childnode in node.getchildren(): _child_key, _child_value = StringTool._xml_node_addto_dict( childnode, item_name=item_name) # 加到字典里面 if _child_key is not None: _value[_child_key] = _child_value else: _value = node.text elif node.attrib['type'] == 'list': # 是一个列表 _value = list() _is_list = True elif node.attrib['type'] == 'tuple': # 是数组 _value = tuple() _is_list = True elif node.attrib['type'] == 'bool': _value = (node.text == 'true') elif node.attrib['type'] == 'int': _value = round(float(node.text)) elif node.attrib['type'] == 'float': _value = float(node.text) else: # 字符串 _value = node.text # 针对列表和数组的处理 if _is_list: for childnode in node.getchildren(): _child_key, _child_value = StringTool._xml_node_addto_dict( childnode, item_name=item_name) if _child_key != item_name: # 非列表项, 当作一个新字典看待 _child_value = {_child_key: _child_value} # 加入列表中 _value.append(_child_value) # 返回自身的key值和Value值 if _value is None: _value = '' return _key, _value
[文档] @staticmethod def xml_to_json(xml_str, item_name='item'): """ 将XML字符串转换为JSON字符串 由于xmltodict与dicttoxml不匹配, 所以自行实现相关代码 @param {string} xml_str - 要转换的xml字符串 @param {string} item_name='item' - 标识是列表项的标签名 @return {string} - 转换后的JSON字符串 """ _dict = StringTool.xml_to_dict(xml_str, item_name=item_name) # 遍历并进行转换处理 return StringTool.object_to_json(_dict)
if __name__ == '__main__': # 当程序自己独立运行时执行的操作 # 打印版本信息 print(('模块名: %s - %s\n' '作者: %s\n' '发布日期: %s\n' '版本: %s' % (__MOUDLE__, __DESCRIPT__, __AUTHOR__, __PUBLISH__, __VERSION__)))