#!/usr/bin/env python3
# -*- coding: UTF-8 -*-
# Copyright 2022 黎慧剑
#
# This Source Code Form is subject to the terms of the Mozilla Public
# License, v. 2.0. If a copy of the MPL was not distributed with this
# file, You can obtain one at http://mozilla.org/MPL/2.0/.
"""
服务鉴权模块
@module auth
@file auth.py
"""
import sys
import os
import copy
import re
import json
import datetime
from typing import Any
from functools import wraps
from HiveNetCore.utils.run_tool import AsyncTools
from HiveNetCore.utils.string_tool import StringTool
# 根据当前文件路径将包路径纳入, 在非安装的情况下可以引用到
sys.path.append(os.path.abspath(os.path.join(os.path.dirname(__file__), os.path.pardir)))
from HiveNetWebUtils.utils.cryptography import HCrypto
[文档]class AuthBaseFw(object):
"""
服务鉴权基础框架
"""
#############################
# 构造函数
#############################
[文档] def __init__(self, **kwargs):
"""
模块初始化函数
(需由实现类继承实现)
"""
pass
#############################
# 通用鉴权的修饰符处理函数
#############################
[文档] def auth_required(self, f=None):
"""
当前模块的鉴权修饰符函数
@param {function} f=None - 所执行的函数
"""
def auth_required_internal(f):
@wraps(f)
def decorated(*args, **kwargs):
# 执行实际处理函数函数
return AsyncTools.sync_run_coroutine(self.auth_required_call(f, *args, **kwargs))
return decorated
if f:
return auth_required_internal(f)
return auth_required_internal
[文档] async def auth_required_call(self, f, *args, **kwargs):
"""
直接执行的鉴权修饰符函数
@param {function} f - 要执行的函数
@param {args} - 执行函数的固定入参
@param {kwargs} - 执行函数的kv入参
@returns {Any} - 返回响应信息(如果执行成功返回函数返回信息, 如果执行失败返回验证失败信息)
"""
# 执行校验操作
_result = await AsyncTools.async_run_coroutine(self._auth_call(*args, **kwargs))
_is_auth_result = False
if not _result[0]:
# 校验失败, 获取校验失败的返回值
_ret = await AsyncTools.async_run_coroutine(
self._format_auth_resp(_result[1], _result[2])
)
_is_auth_result = True
else:
# 校验通过, 执行函数
_ret = await AsyncTools.async_run_coroutine(f(*args, **kwargs))
# 格式化响应对象并返回处理结果
_last_ret = await AsyncTools.async_run_coroutine(
self._format_last_resp(_ret, _is_auth_result)
)
return _last_ret
#############################
# 需实现类重载的函数
#############################
async def _auth_call(self, *args, **kwargs) -> tuple:
"""
真正的校验处理函数
@param {args} - 执行函数的固定入参
@param {kwargs} - 执行函数的kv入参
@returns {tuple} - 返回校验结果数组: (校验是否通过true/false, 错误码, 失败描述)
注: 错误码由实现类自行定义
"""
raise NotImplementedError()
def _format_auth_resp(self, code: Any, err_msg: str) -> Any:
"""
格式化校验结果返回值
@param {Any} code - 错误码
@param {str} err_msg - 失败描述
@returns {Any} - 格式化后的返回值
"""
raise NotImplementedError()
def _format_last_resp(self, resp: Any, is_auth_result: bool) -> Any:
"""
格式化最后的响应对象
@param {Any} resp - 最后的响应对象
@param {bool} is_auth_result - 是否服务鉴权所返回的结果
@returns {Any} - 转换以后的响应对象
"""
return resp
[文档]class IPAuth(AuthBaseFw):
"""
IP黑白名单模式验证模块
"""
#############################
# 构造函数
#############################
[文档] def __init__(self, **kwargs):
"""
IP黑白名单模式验证模块
@param {list} init_blacklist=None - 初始化的黑名单
名单可以使用通配符禁止某个网段, 例如 ['127.0.*.*', '138.*.*.*']
@param {list} init_whitelist=None - 初始化的白名单
@param {int} error_resp_status=403 - 验证失败返回状态码
@param {str|dict} error_resp_msg={'status': '10409', 'msg':'IP地址验证失败'} - 验证失败返回的信息
"""
self.para = kwargs
self.error_resp_status = self.para.get('error_resp_status', 403)
self.error_resp_msg = self.para.get(
'error_resp_msg', {'status': '10409', 'msg': 'IP地址验证失败'}
)
# 黑白名单管理
self.ip_dict = {
'blacklist': {
'show': list(), # 显示配置
'reg': dict() # 正则表的配置, key为show中的显示名, value为正则表达式规则
},
'whitelist': {
'show': list(), # 显示配置
'reg': dict() # 正则表的配置, key为show中的显示名, value为正则表达式规则
}
}
# 初始化黑白名单
if kwargs.get('init_blacklist', None) is not None:
self.add_blacklist(kwargs['init_blacklist'])
if kwargs.get('init_whitelist', None) is not None:
self.add_whitelist(kwargs['init_whitelist'])
#############################
# 黑白名单检查工具
#############################
[文档] def verify_blacklist(self, ip: str) -> bool:
"""
验证是否匹配黑名单
@param {str} ip - 要验证的ip
@returns {bool} - 检查结果, 匹配到返回True
"""
return self._verify_ip('blacklist', ip)
[文档] def verify_whitelist(self, ip: str) -> bool:
"""
验证是否匹配白名单
@param {str} ip - 要验证的ip
@returns {bool} - 检查结果, 匹配到返回True
"""
return self._verify_ip('whitelist', ip)
#############################
# 黑白名单维护工具
#############################
[文档] def add_blacklist(self, ips):
"""
添加黑名单
@param {str|list} ips - 要添加的ip或ip列表
"""
if type(ips) == str:
# 单个黑名单
self._add_list('blacklist', ips)
else:
# 多个黑名单
for _ip in ips:
self._add_list('blacklist', _ip)
[文档] def remove_blacklist(self, ips):
"""
删除黑名单
@param {str|list} ips - 要删除的ip或ip列表
"""
if type(ips) == str:
# 单个黑名单
self._remove_list('blacklist', ips)
else:
# 多个黑名单
for _ip in ips:
self._remove_list('blacklist', _ip)
[文档] def clear_blacklist(self):
"""
清除黑名单
"""
self.ip_dict['blacklist'].clear()
self.ip_dict['blacklist']['show'] = list()
self.ip_dict['blacklist']['reg'] = dict()
[文档] def add_whitelist(self, ips):
"""
添加白名单
@param {str|list} ips - 要添加的ip或ip列表
"""
if type(ips) == str:
# 单个黑名单
self._add_list('whitelist', ips)
else:
# 多个黑名单
for _ip in ips:
self._add_list('whitelist', _ip)
[文档] def remove_whitelist(self, ips):
"""
删除白名单
@param {str|list} ips - 要删除的ip或ip列表
"""
if type(ips) == str:
# 单个黑名单
self._remove_list('whitelist', ips)
else:
# 多个黑名单
for _ip in ips:
self._remove_list('whitelist', _ip)
[文档] def clear_whitelist(self):
"""
清除白名单
"""
self.ip_dict['whitelist'].clear()
self.ip_dict['whitelist']['show'] = list()
self.ip_dict['whitelist']['reg'] = dict()
#############################
# 内部函数
#############################
def _add_list(self, ip_type: str, ip: str):
"""
插入名单数据
@param {str} ip_type - 'blacklist' 或 'whitelist'
@param {str} ip - 要插入的ip地址
"""
_dict = self.ip_dict[ip_type]
if ip in _dict['show']:
# 名单已存在
return
# 加入显示名单
_dict['show'].append(ip)
# 生成名单的匹配正则表达式
if ip.find('*') >= 0:
# 需要生成正则表达式
_re = re.compile('^' + ip.replace('.', '\\.').replace('*', '.*') + '$')
_dict['reg'][ip] = _re
def _remove_list(self, ip_type: str, ip: str):
"""
删除名单数据
@param {str} ip_type - 'blacklist' 或 'whitelist'
@param {str} ip - 要删除的ip地址
"""
_dict = self.ip_dict[ip_type]
if ip in _dict['show']:
# 删除正则表达式
_dict['reg'].pop(ip, None)
# 删除显示ip
_dict['show'].remove(ip)
def _verify_ip(self, ip_type: str, ip: str) -> bool:
"""
检查ip是否匹配名单
@param {str} ip_type - 'blacklist' 或 'whitelist'
@param {str} ip - 要检查的ip地址
@returns {bool} - 检查结果, 匹配到返回True
"""
_dict = self.ip_dict[ip_type]
if ip in _dict['show']:
return True
# 遍历正则规则
for _re in _dict['reg'].values():
if _re.search(ip) is not None:
return True
# 没有匹配上
return False
#############################
# 重载基础框架的函数
#############################
async def _auth_call(self, *args, **kwargs) -> tuple:
"""
真正的校验处理函数
@param {args} - 执行函数的固定入参
@param {kwargs} - 执行函数的kv入参
@returns {tuple} - 返回校验结果数组: (校验是否通过true/false, 错误码, 失败描述)
错误码定义如下: 200-成功, 其他-失败
"""
_status = 200
_ip = await AsyncTools.async_run_coroutine(self._get_ip_from_request(*args, **kwargs))
# 先检查白名单
if len(self.ip_dict['whitelist']['show']) > 0:
if not self.verify_whitelist(_ip):
# 不在白名单内
_status = self.error_resp_status
_resp_msg = copy.deepcopy(self.error_resp_msg)
# 再检查黑名单
if _status == 200 and self.verify_blacklist(_ip):
# 在黑名单内
_status = self.error_resp_status
_resp_msg = copy.deepcopy(self.error_resp_msg)
if _status == 200:
# 返回校验成功
return (True, _status, 'success')
else:
# 校验失败
return (False, _status, _resp_msg)
#############################
# 需实现类重载的函数
#############################
def _format_auth_resp(self, code: Any, err_msg: str) -> Any:
"""
格式化校验结果返回值
@param {Any} code - 错误码
@param {str} err_msg - 失败描述
@returns {Any} - 格式化后的返回值
"""
raise NotImplementedError()
def _get_ip_from_request(self, *args, **kwargs) -> str:
"""
从请求信息中获取ip地址
@returns {str} - 返回ip地址
"""
raise NotImplementedError()
[文档]class AppKeyAuth(AuthBaseFw):
"""
AppKey模式验证模块
整体流程: 调用方对请求报文签名 -> 服务方验证请求签名 -> 服务方处理并对返回报文签名 -> 调用方验证返回报文签名
详细说明如下:
1. 服务端生成APP信息, 线下提供给商户
AppId: 商户id
AppKey: 公匙(相当于账号)
AppSecret: 私匙(相当于密码)
2. 客户端对要发送的数据进行签名, 算法如下:
(1) 客户端生成 nonce_str 随机字符串, 例如: 'ibuaiVcKdpRxkhJA'
(2) 设要发送的数据为集合M, 将所有非空参数值的参数按照参数名ASCII码从小到大排序(字典序),
使用URL键值对的格式(即key1=value1&key2=value2…)拼接成字符串stringA, 例如:
stringA="body=test&device_info=1000&mch_id=10000100"
(3) 拼接API密钥
# 拼接app_id、app_key、app_secret、nonce_str、timestamp进入签名字符串
stringSignTemp=stringA+"&app_id=1333&app_key=123456&app_secret=192006250b4c09247ec02edce69f6a2d&nonce_str=xx×tamp=xx"
# 如果选择MD5签名方式, 处理及得到结果如下
sign=MD5(stringSignTemp).toUpperCase()="9A0A8659F005D6984697E2CA0A9CF3B7"
# 如果选择HMAC-SHA256算法签名方式, 处理及得到结果如下
# 注意: 部分语言的hmac方法生成结果二进制结果, 需要调对应函数转化为十六进制字符串。
sign=hash_hmac("sha256",stringSignTemp,AppSecret).toUpperCase()="6A9AE1657590FD6257D693A078E1C3E4BB6BA4DC30B23E0EE2496E54170DACD6"
(4) 将sign放入要发送的数据集合中, 客户端调用api接口
(5) 服务器端同样做相应的认证检查
"""
#############################
# 构造函数
#############################
[文档] def __init__(self, **kwargs):
"""
AppKey模式验证模块
@param {fuction} get_secret_fun - 取 (app_key, app_secret) 密钥对的函数, 默认使用当前类的自有AppKey管理工具函数
fun(app_id:str) -> tuple
@param {bool} sign_resp=False - 是否对返回的报文进行签名
@param {int} sign_error_resp_status=403 - 签名验证失败返回状态码
@param {str|dict} sign_error_resp_msg={'status': '13007', 'msg':'签名检查失败'} - 签名验证失败返回的信息
@param {float} timestamp_expired_time=300.0 - 允许服务器时间差异时长, 单位为秒, 默认5分钟
@param {int} timestamp_error_resp_status=403 - timestamp已过期时返回状态码
@param {str|dict} timestamp_error_resp_msg={'status': '13008', 'msg':'时间戳已过期'} - timestamp已过期返回的信息
@param {int} nonce_len=8 - nonce字符串的长度
@param {str} timestamp_fmt='%Y%m%d%H%M%S' - timestamp的格式
@param {str} encoding='utf-8' - 对中文内容的转换编码
@param {str} algorithm='MD5' - 使用的签名算法名, 支持算法如下
MD5
HMAC-SHA256
@param {dict} algorithm_extend=None - 扩展算法支持, key为algorithm名, value为扩展的算法函数
扩展函数定义如下: fun(value:str, key:str) -> str
"""
self.para = kwargs
self.get_secret_fun = self.para.get('get_secret_fun', self.apk_get_secret_fun)
self.sign_resp = self.para.get('sign_resp', False)
self.interface_id_name = self.para.get('interface_id_name', '')
self.sign_error_resp_status = self.para.get('sign_error_resp_status', 403)
self.sign_error_resp_msg = self.para.get(
'sign_error_resp_msg', {'status': '13007', 'msg': '签名检查失败'}
)
self.timestamp_expired_time = self.para.get('timestamp_expired_time', 300.0)
self.timestamp_error_resp_status = self.para.get('timestamp_error_resp_status', 403)
self.timestamp_error_resp_msg = self.para.get(
'timestamp_error_resp_msg', {'status': '13008', 'msg': '时间戳已过期'}
)
self.timestamp_fmt = self.para.get('timestamp_fmt', '%Y%m%d%H%M%S')
self.encoding = self.para.get('encoding', 'utf-8')
# 算法扩展支持
self.algorithm_mapping = {
'MD5': HCrypto.md5,
'HMAC-SHA256': HCrypto.hmac_sha256
}
self.algorithm_mapping.update(self.para.get('algorithm_extend', {}))
self.algorithm = self.para.get('algorithm', 'MD5')
# 简易的AppKey管理台, 内存字典管理, key为app_id, value为(app_key, app_secret) 键值对
self._app_key_manager = dict()
#############################
# 签名工具
#############################
[文档] def get_signature(self, msg: dict, app_key: str, app_secret: str, algorithm: str = None) -> str:
"""
对消息字典进行签名
@param {dict} msg - 要签名的字典
@param {str} app_key - 商户持有的app_key(相当于公钥)
@param {str} app_secret - 商户持有的AppSecret私匙(相当于密码)
@param {str} algorithm=None - 使用的签名算法名, 如果不传使用初始化类的指定算法, 支持算法如下
MD5
HMAC-SHA256
@returns {str} - 返回签名验证字符串
"""
# 必须要有的参数
_app_id = msg['app_id']
_nonce_str = msg['nonce_str']
_timestamp = msg['timestamp']
# 参数清单组合, 按参数名排序, 去掉非空值, URL键值对方式组合
_para_list = list(msg.keys())
_para_list.sort()
_str_sign_temp = ''
for _para in _para_list:
if _para not in ('app_id', 'nonce_str', 'timestamp', 'sign') and msg[_para] not in (None, ''):
_value = msg[_para]
if type(_value) != str:
_value = json.dumps(_value, ensure_ascii=False, sort_keys=True)
_str_sign_temp = '%s%s=%s&' % (
_str_sign_temp, _para, _value
)
# 增加app_id、app_key、app_secret, nonce_str、timestamp到键值对中
_str_sign_temp = '%sapp_id=%s&app_key=%s&app_secret=%s&nonce_str=%s×tamp=%s' % (
_str_sign_temp, _app_id, app_key, app_secret, _nonce_str, _timestamp
)
# 进行加密处理并返回签名串
_algorithm = self.algorithm if algorithm is None else algorithm
return self.algorithm_mapping[_algorithm](
_str_sign_temp, key=app_secret, encoding=self.encoding
)
[文档] def sign(self, msg: dict) -> dict:
"""
对报文消息字典进行签名
@param {dict} msg - 要签名的报文字典
@returns {dict} - 签名后的报文字典
"""
_app_id = msg['app_id']
msg['nonce_str'] = HCrypto.generate_nonce(self.para.get('nonce_len', 8)) # 随机字符串
msg['timestamp'] = datetime.datetime.now().strftime(self.timestamp_fmt) # 时间戳
_sign_type = msg.get('sign_type', self.algorithm) # 签名类型, 如果有送值代表指定算法
_app_key, _app_secret = self.get_secret_fun(_app_id) # 通过指定的算法获取
msg['sign'] = self.get_signature(msg, _app_key, _app_secret, algorithm=_sign_type)
return msg
[文档] def verify_sign(self, msg: dict) -> bool:
"""
验证报文签名是否准确
@param {dict} msg - 要验证的报文字典
@returns {bool} - 报文验证结果
"""
try:
_app_id = msg['app_id']
_sign_type = msg.get('sign_type', self.algorithm) # 签名类型, 如果有送值代表指定算法
_app_key, _app_secret = self.get_secret_fun(_app_id) # 通过指定的算法获取
_sign = self.get_signature(msg, _app_key, _app_secret, algorithm=_sign_type)
return _sign == msg['sign']
except:
return False
[文档] def verify_timestamp(self, msg: dict) -> bool:
"""
验证时间戳是否已过期
@param {dict} msg - 要验证的报文字典
@returns {bool} - 验证结果
"""
try:
_timestamp = datetime.datetime.strptime(msg['timestamp'], self.timestamp_fmt)
if abs((datetime.datetime.now() - _timestamp).total_seconds()) > self.timestamp_expired_time:
return False
return True
except:
return False
#############################
# 简易AppKey管理台工具
#############################
[文档] def apk_get_secret_fun(self, app_id: str) -> tuple:
"""
自有AppKey管理工具(无安全控制)的取密钥对函数
@param {str} app_id - 要获取的app_id
@returns {tuple} - (app_key, app_secret) 密钥对
"""
return self._app_key_manager[app_id]
[文档] def apk_update_secret(self, app_id: str, key_pair: tuple):
"""
自有AppKey管理工具的密钥对更新
@param {str} app_id - 要更新的app_id
@param {tuple} key_pair - (app_key, app_secret) 密钥对
"""
self._app_key_manager[app_id] = key_pair
[文档] def apk_generate_key_pair(self, app_id: str) -> tuple:
"""
自有AppKey管理工具的生成新密钥对函数(同时可以加入管理工具)
@param {str} app_id - 要获取的app_id
@returns {tuple} - (app_key, app_secret) 密钥对
"""
# 随机生成字符串, app_key 8位, app_secret 32位
_app_key = StringTool.get_random_str(random_length=8)
_app_secret = StringTool.get_random_str(random_length=32)
self._app_key_manager[app_id] = (_app_key, _app_secret)
return (_app_key, _app_secret)
#############################
# 重载基础框架的函数
#############################
[文档] async def auth_required_call(self, f, *args, **kwargs):
"""
直接执行的鉴权修饰符函数
注: 进行了重载, 增加app_id的获取并送入_format_last_resp函数
@param {function} f - 要执行的函数
@param {args} - 执行函数的固定入参
@param {kwargs} - 执行函数的kv入参
@returns {Any} - 返回响应信息(如果执行成功返回函数返回信息, 如果执行失败返回验证失败信息)
"""
# 执行校验操作
_result = await AsyncTools.async_run_coroutine(self._auth_call(*args, **kwargs))
_is_auth_result = False
if not _result[0]:
# 校验失败, 获取校验失败的返回值
_ret = await AsyncTools.async_run_coroutine(
self._format_auth_resp(_result[1], _result[2])
)
_is_auth_result = True
else:
# 校验通过, 执行函数
_ret = await AsyncTools.async_run_coroutine(f(*args, **kwargs))
# 格式化响应对象并返回处理结果
_json = self._get_json_from_request(*args, **kwargs)
_last_ret = await AsyncTools.async_run_coroutine(
self._format_last_resp(_ret, _is_auth_result, _json['app_id'])
)
return _last_ret
async def _auth_call(self, *args, **kwargs) -> tuple:
"""
真正的校验处理函数
@param {args} - 执行函数的固定入参
@param {kwargs} - 执行函数的kv入参
@returns {tuple} - 返回校验结果数组: (校验是否通过true/false, 错误码, 失败描述)
错误码定义如下: 200-成功, 其他-失败
"""
_status = 200
_json_dict = await AsyncTools.async_run_coroutine(self._get_json_from_request(*args, **kwargs))
if not self.verify_timestamp(_json_dict):
# 日期验证失败
_status = self.timestamp_error_resp_status
_resp_msg = copy.deepcopy(self.timestamp_error_resp_msg)
elif not self.verify_sign(_json_dict):
# 验证失败, 返回标准的错误信息
_status = self.sign_error_resp_status
_resp_msg = copy.deepcopy(self.sign_error_resp_msg)
if _status == 200:
# 返回校验成功
return (True, _status, 'success')
else:
# 校验失败
return (False, _status, _resp_msg)
def _format_auth_resp(self, code: Any, err_msg: str) -> Any:
"""
格式化校验结果返回值
@param {Any} code - 错误码
@param {str} err_msg - 失败描述
@returns {tuple} - 返回校验结果数组: (错误码, 失败描述)
错误码定义如下: 200-成功, 其他-失败
"""
return (code, err_msg)
#############################
# 需实现类重载的函数
#############################
def _format_last_resp(self, resp: Any, is_auth_result: bool, app_id: str) -> Any:
"""
格式化最后的响应对象
注意: 实现类需实现以下的伪代码逻辑
if self.sign_resp:
resp_dict = get resp_dict from resp
put app_id to resp_dict
resp_dict = self.sign(resp_dict)
put resp_dict back to resp
@param {Any} resp - 最后的响应对象
@param {bool} is_auth_result - 是否服务鉴权所返回的结果
@param {str} app_id - 送入的app_id
@returns {Any} - 转换以后的响应对象
"""
raise NotImplementedError()
def _get_json_from_request(self, *args, **kwargs) -> dict:
"""
从请求信息中获取消息json
@returns {dict} - 返回json字典
"""
raise NotImplementedError()