HiveNetCore.utils.net_tool 源代码
#!/usr/bin/env python3
# -*- coding: UTF-8 -*-
#
# Copyright 2018 黎慧剑
#
# This Source Code Form is subject to the terms of the Mozilla Public
# License, v. 2.0. If a copy of the MPL was not distributed with this
# file, You can obtain one at http://mozilla.org/MPL/2.0/.
"""
网络处理相关工具
@module net_tool
@file net_tool.py
"""
import os
import sys
import socket
import urllib
import urllib.request
import copy
import requests
from urllib.parse import urlparse
import re
import json
import datetime
import logging
import traceback
import hashlib
from io import BytesIO, FileIO
import netifaces
# 根据当前文件路径将包路径纳入, 在非安装的情况下可以引用到
sys.path.append(os.path.abspath(os.path.join(
os.path.dirname(__file__), os.path.pardir, os.path.pardir)))
from HiveNetCore.utils.run_tool import RunTool
import HiveNetCore.utils.wget as wget
__MOUDLE__ = 'net_tool' # 模块名
__DESCRIPT__ = u'网络处理相关工具' # 模块描述
__VERSION__ = '0.1.0' # 版本
__AUTHOR__ = u'黎慧剑' # 作者
__PUBLISH__ = '2018.08.29' # 发布日期
[文档]class NetTool(object):
"""
网络处理相关工具
提供网络处理相关的函数, 包括字节转换处理等
"""
#############################
# 网络字节转换
#############################
[文档] @staticmethod
def int_to_bytes(int_value, fix_len=4, byte_order="big", signed=True):
"""
将整型数据转换为字节数组,与bytes_to_int配套使用
@param {int} int_value - 要转换的数字
@param {int} fix_len=4 - 返回数组的长度, 如果整数转换出的字节数组长度超过了该长度, 则产生OverflowError
@param {string} byte_order="big" - 字节顺序, 值为'big'或者'little':
big - 表示最有意义的字节放在字节数组的开头
little - 表示最有意义的字节放在字节数组的结尾
sys.byteorder - 保存了主机系统的字节序, 可以使用这个属性获取本机顺序
@param {bool} signed=True - 确定是否使用补码来表示整数, 如果值为False, 并且是负数, 则产生OverflowError
@returns {bytes} - 转换后的字节数组
@throws {OverflowError} - 如果整数转换出的字节数组长度超过长度或无符号位时传入负数
"""
return int_value.to_bytes(length=fix_len, byteorder=byte_order, signed=signed)
[文档] @staticmethod
def bytes_to_int(bytes_value, byte_order="big", signed=True):
"""
将字节数组转换为整型数字, 与int_to_bytes配套使用
@param {bytes} bytes_value - 要转换的字节数组
@param {string} byte_order="big" - 字节顺序, 值为'big'或者'little':
big - 表示最有意义的字节放在字节数组的开头
little - 表示最有意义的字节放在字节数组的结尾
sys.byteorder - 保存了主机系统的字节序, 可以使用这个属性获取本机顺序
@param {bool} signed=True - 确定是否使用补码来表示整数, 如果值为False, 并且是负数, 则产生OverflowError
@returns {int} - 转换后的整数
@throws {OverflowError} - 无符号位时传入负数抛出该异常
"""
return int.from_bytes(bytes_value, byteorder=byte_order, signed=signed)
#############################
# 获取网卡相关信息
#############################
[文档] @staticmethod
def get_net_interfaces():
"""
获取本机网卡清单
@return {list} - 返回网卡名(NIC NAME)清单
例如: ['lo0', 'gif0', 'stf0', 'en0', 'en1', 'fw0']
"""
return netifaces.interfaces()
[文档] @staticmethod
def get_net_interface_info(nic_name):
"""
获取指定网卡的地址信息
@param {string} nic_name - 网卡名(NIC NAME)
@return {dict} - 返回网卡的地址信息, 对应信息的Key为协议(部分研究出来的):
netifaces.AF_LINK 物理地址 : {'addr' : 物理地址(MAC地址)}
netifaces.AF_INET 互联网网络地址(ipv4) : {'addr': IP地址, 'netmask': 网络掩码, 'broadcast': 广播地址}
netifaces.AF_INET6 互联网网络地址(ipv6): 跟AF_INET一样
AF_12844 25
AF_APPLETALK 16
AF_ATM 22
AF_BAN 21
AF_CCITT 10
AF_CHAOS 5
AF_CLUSTER 24
AF_DATAKIT 9
AF_DECnet 12
AF_DLI 13
AF_ECMA 8
AF_FIREFOX 19
AF_HYLINK 15
AF_IMPLINK 3
AF_IPX 6
AF_IRDA 26
AF_ISO 7
AF_LAT 14
AF_NETBIOS 17
AF_NETDES 28
AF_NS 6
AF_PUP 4
AF_SNA 11
AF_UNIX 1
AF_UNKNOWN1 20
AF_UNSPEC 0
netifaces.AF_VOICEVIEW 18
"""
return netifaces.ifaddresses(nic_name)
[文档] @staticmethod
def get_net_gateways():
"""
返回本机的网关信息清单
@return {dict} - 网关信息清单, key为协议( 见address_families, 除default不同) ,
value为数组['地址', '使用该网关地址的网卡名NIC NAME', 是否默认网关]
字典里有一个特殊key为'default', 可通过这个获取到默认网关信息
"""
return netifaces.gateways()
[文档] @classmethod
def get_net_interface_info_alias(cls, nic_name: str) -> dict:
"""
获取网卡信息(按容易理解的别名解析)
@param {str} nic_name - 网卡名
@returns {dict} - 网卡信息
{
'nic_name': '网卡名',
'mac' : 'MAC地址',
'ipv4' : { # ipv4信息
{'addr': 'IP地址', 'netmask': '网络掩码', 'broadcast': '广播地址'}
},
'ipv6': { # ipv6信息
{'addr': 'ipv6地址', 'netmask': '网络掩码', 'flags': 1024}
}
}
"""
_info = {
'nic_name': nic_name
}
_base_info = cls.get_net_interface_info(nic_name)
# mac
_mac = _base_info.get(netifaces.AF_LINK, None)
if _mac is not None:
_info['mac'] = _mac[0]['addr']
# ipv4
_ipv4 = _base_info.get(netifaces.AF_INET, None)
if _ipv4 is not None:
_info['ipv4'] = _ipv4[0]
# ipv6
_ipv6 = _base_info.get(netifaces.AF_INET6, None)
if _ipv6 is not None:
_info['ipv6'] = _ipv6[0]
return _info
#############################
# 网页处理相关
#############################
[文档] @classmethod
def get_full_url(cls, url: str, ref_url: str) -> str:
"""
获取完整的url地址
@param {str} url - 要处理的url地址
@param {str} ref_url - 引用的url地址( 页面url)
@returns {str} - 完整url地址
"""
if url.find('://') >= 0:
# 是完整路径
return url
else:
_url_info = urlparse(ref_url)
return '%s://%s/%s' % (
_url_info.scheme, _url_info.netloc, url
)
[文档] @staticmethod
def get_web_page_code(url: str, data=None, timeout=socket._GLOBAL_DEFAULT_TIMEOUT,
encoding='utf-8', retry=0, proxy: dict = None,
cafile=None, capath=None, cadefault=False, context=None, real_url=['']):
"""
获取网页代码(静态代码)
@param {str|urllib.request.Request} url - 要获取的网页的url, 可用是一个字符串地址或者Request对象
@param {bytes} data=None - Post提交的数据, 正常以GET参数获取, data传值后将以POST参数获取
示例(将data转换为字节, urlencode可用支持数组或列表的对象):
data = urllib.parse.urlencode(data).encode('utf-8')
@param {float} timeout=socket._GLOBAL_DEFAULT_TIMEOUT - 超时时间, 单位为秒
默认使用socket的默认超时时间, 如果没有使用socket.setdefaulttimeout设置, 则是不超时
@param {str} encoding='utf-8' - 解析返回页面内容的编码
@param {int} retry=0 - 超时重试次数
@param {dict} proxy=None - 设置访问代理, 例如{'http': 'http://61.135.217.7:80'}, 或 {'https': 'https://61.135.217.7:443'}
注: 指定代理模式不支持cafile、capath、cadefault、context等参数
@param {str} cafile=None - 本地CA证书文件
@param {str} capath=None - 本地CA证书所在路径
@param {bool} cadefault=False - ?
@param {ssl.SSLContext} context=None - SSL证书验证上下文
@param {list} real_url=[''] - 通过该入参返回真实的url地址(数组第0个)
@return {str} - 访问网页的静态代码
@example url使用的Request对象的生成示例:
# 访问地址
url = 'xxx'
# 报文头信息
headers ={
"User-Agent": "Mozilla/5.0 (Windows NT 6.1; WOW64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/66.0.3359.139 Safari/537.36",
}
# 生成Request对象
urllib.request.Request(url, headers=headers)
@example 忽略网站的未经核实的证书认证错误
# 将以下生成的context对象传入执行参数
import ssl
context = ssl._create_unverified_context()
"""
# 创建代理句柄
if proxy is not None:
# 处理代理格式
_new_proxy = dict()
for _key, _val in proxy:
_url_info = urlparse(_val)
_new_proxy[_url_info.scheme] = _url_info.netloc
_proxy_handle = urllib.request.ProxyHandler(_new_proxy)
_opener = urllib.request.build_opener(_proxy_handle)
_retry_time = 1
while True:
# 正式的处理逻辑
try:
real_url[0] = url
if proxy is None:
# 非代理模式
with urllib.request.urlopen(
url, data=data, timeout=timeout, cafile=cafile,
capath=capath, cadefault=cadefault, context=context
) as _res:
real_url[0] = _res.url
return _res.read().decode(encoding)
else:
# 代理模式
with _opener.open(url, data=data, timeout=timeout) as _res:
real_url[0] = _res.url
return _res.read().decode(encoding)
except urllib.error.HTTPError:
if _retry_time <= retry:
_retry_time += 1
continue
else:
raise
#############################
# 下载文件相关
#############################
[文档] @staticmethod
def get_http_fileinfo(url: str, headers={}, connect_timeout=None,
params={}, proxies={}, verify=True, cookies={}):
"""
获取网络文件基本信息
@param {str} url - 下载文件的url
@param {dict} headers={} - 下载请求要自定义的http头
@param {float} connect_timeout=None - 连接服务器的超时时间, 单位为秒
@param {dict} params={} - 请求url的参数, 拼接到url中, 例如"https://www.baidu.com/s?wd=Python"
@param {dict} proxies={} - 代理服务器, 例如:
proxies={
"http":"http://1.192.242.107:9999"
# "https":"https://192.168.0.1:80"
}
@param {bool} verify=True - ssl证书验证是否跳过, 可设置为False跳过
@param {dict} cookies={} - cookies参数
@return {dict} - 文件基本信息字典:
name - 文件名
size - 文件大小, 如果不可预知的大小则为-1
support_continue - 是否支持断点续传
"""
# 基本参数
_headers = copy.deepcopy(headers)
_headers['Range'] = 'bytes=0-4'
_content_range = ''
_support_continue = False
_total_size = -1
# 访问网络并获取请求
_res = requests.head(
url, headers=_headers, timeout=connect_timeout,
verify=verify, proxies=proxies, params=params, cookies=cookies
)
try:
if _res.status_code == 206:
# 服务器支持断点
if 'content-range' in _res.headers.keys():
_content_range = _res.headers['content-range']
try:
_total_size = int(re.match(r'^bytes 0-4/(\d+)$', _content_range).group(1))
_support_continue = True
except:
_total_size = int(_res.headers['content-length'])
else:
_total_size = int(_res.headers['content-length'])
except:
pass
# 返回结果
return {
'name': os.path.split(url)[1],
'size': _total_size,
'support_continue': _support_continue
}
[文档] @staticmethod
def download_http_file(url: str, filename=None, path='', is_resume=False, headers={}, connect_timeout=None,
params={}, proxies={}, verify=True, cookies={},
block_size=1024, retry=0, show_rate=False):
"""
下载文件
@param {str} url - 要下载的文件url
@param {str} filename=None - 保存的文件名, 如果传None代表使用url的文件名
@param {str} path='' - 要保存的文件路径
@param {bool} is_resume=False - 是否使用续传功能, 如果不使用则统一用wget下载
@param {dict} headers={} - 要带上的http协议头
@param {float} connect_timeout=None - 连接超时时间, 单位为秒
@param {dict} params={} - 请求url的参数, 拼接到url中, 例如"https://www.baidu.com/s?wd=Python"
@param {dict} proxies={} - 代理服务器, 例如:
proxies={
"http":"http://1.192.242.107:9999"
# "https":"https://192.168.0.1:80"
}
@param {bool} verify=True - ssl证书验证是否跳过, 可设置为False跳过
@param {dict} cookies={} - cookies参数
@param {int} block_size=1024 - 每次下载块大小, 单位为byte
@param {int} retry=0 - 自动重试次数
@param {bool} show_rate=False - 显示下载进度(仅wget模式支持)
"""
_retry_time = 0
_fileinfo = None
_filename = filename
# 只有需要断点续传的情况才获取文件信息
if is_resume:
while True:
try:
_fileinfo = NetTool.get_http_fileinfo(
url, headers=headers, connect_timeout=connect_timeout, params=params,
proxies=proxies, verify=verify, cookies=cookies
)
break
except:
if _retry_time < retry:
_retry_time += 1
continue
else:
raise
_filename = filename
if filename is None:
_filename = _fileinfo['name']
# 加上路径
if _filename is None:
_filename = os.path.split(url)[1]
_filename = os.path.join(path, _filename)
if not is_resume or not _fileinfo['support_continue']:
# 不需要续传, 或不支持自动续传, 使用wget执行完整的下载, 但不显示下载进度
_bar = None
if show_rate:
_bar = wget.bar_adaptive
_new_proxy = dict()
if proxies is not None and len(proxies) > 0:
for _key in proxies.keys():
_url_info = urlparse(proxies[_key])
_new_proxy[_url_info.scheme] = _url_info.netloc
if cookies is not None and len(cookies) > 0:
# 将cookie放入头文件
if headers is None:
headers = {}
_cookie_str_list = list()
for _key, _val in cookies:
_cookie_str_list.append('%s=%s' % (_key, _val))
headers['Cookie'] = '; '.join(_cookie_str_list)
wget.download(url, out=_filename, bar=_bar, headers=headers, proxy=_new_proxy)
else:
# 自动续传
_headers = copy.deepcopy(headers)
_down_size = 0
_temp_file = _filename + '.dt'
if os.path.exists(_temp_file):
_down_size = os.path.getsize(_temp_file)
# 打开文件进行处理
_file = open(_temp_file, 'ab')
try:
# 设置报文头, 并连接
_headers['Range'] = 'bytes=%d-' % _down_size
_retry_time = 0
_res = None
while True:
try:
_res = requests.get(
url, timeout=connect_timeout, stream=True, verify=verify, headers=_headers,
proxies=proxies, params=params, cookies=cookies
)
break
except:
if _retry_time < retry:
_retry_time += 1
continue
else:
raise
for _chunk in _res.iter_content(chunk_size=block_size):
if _chunk:
_file.write(_chunk)
_down_size += len(_chunk)
_file.flush()
RunTool.sleep(0.001)
finally:
_file.close()
# 正常处理完成
if os.path.exists(_filename):
# 删除原来存在的文件
os.remove(_filename)
# 修改文件名
os.rename(_temp_file, _filename)
#############################
# 文件传输处理支持
#############################
[文档] @classmethod
def get_file_md5(self, file, buffer_size: int = 4096):
"""
获取文件md5值
@param {str|FileIO|bytes]} file - 文件路径, 或已打开的文件对象, 或文件字节数组
"""
_md5 = hashlib.md5() # 创建md5对象
if type(file) == str:
# 传入的是文件路径
with open(file, 'rb') as _f:
while True:
_data = _f.read(buffer_size)
if not _data:
break
_md5.update(_data) # 更新md5对象
elif type(file) == FileIO:
while True:
_data = file.read(buffer_size)
if not _data:
break
_md5.update(_data) # 更新md5对象
else:
# 字节
_md5.update(file)
return _md5.hexdigest() # 返回md5对象
#############################
# Restful Api相关
#############################
[文档] @staticmethod
def restful_api_call(url: str, method: str = 'get', back_type: str = 'json', encoding: str = None,
block_size: int = 1024, save_file: str = None,
raise_exception: bool = False, success_code: list = [200],
logger=None, log_level: int = logging.DEBUG, **kwargs):
"""
调用Restful Api
@param {str} url - 要调用的url地址
@param {str} method='get' - Http方法
@param {str} back_type='json' - 返回信息的类型
json - json对象(字典或列表)
text - 文本
file - 存入指定文件
bytes - 字节数组
@param {str} encoding=None - 编码, 如果不传则使用返回http头的字符集
@param {int} block_size=1024 - 如果指定stream时, 每次获取的数据块大小
@param {str} save_file=None - back_type为file时指定要存储的文件
@param {bool} raise_exception=False - 当出现异常时是否抛出异常
@param {list} success_code=[200] - 识别返回的status_code为成功的清单
@param {Logger} logger=None - 日志对象
@param {int} log_level=logging.DEBUG - 日志级别
@param {kwargs} - 扩展参数, 参考requests.request的参数, 主要参数如下:
headers {dict} - 要带上的http协议头
params {dict} - 请求url的参数, 拼接到url中, 例如"https://www.baidu.com/s?wd=Python"
json {object} - 报文体内容, 可以转换为json字符串的python对象, 例如dict、list等
data {object} - 报文体内容, 可以是字典, 元组列表, 字节或文件对象
注: json和data可选其中一种方式送入报文体中
timeout {float} - 超时时间(秒)
stream {bool} - 指示返回数据是否以流的方式处理
verify {bool} - ssl证书验证是否跳过, 可设置为False跳过
allow_redirects {bool} - 是否允许重定向
proxies {dict} - 代理服务器, 例如:
proxies={
"http":"http://1.192.242.107:9999"
# "https":"https://192.168.0.1:80"
}
@returns {dict} - 返回请求结果字典
{
'is_success': bool_是否成功,
'status_code': int_响应状态码,
'headers': dict_响应http头,
'back_object': object_返回对象, 对应调用参数可以为dict、str、bytes、文件名,
'exception': 如果出现异常, 异常对象,
'encoding': 字符集
}
"""
# 开始先记录日志
if logger is not None:
_logger_error = False
_start_time = datetime.datetime.now()
_log_str = '[INF-SEND]%s %s' % (method, url)
if 'params' in kwargs.keys():
_log_str = '%s %s' % (_log_str, str(kwargs['params']))
if 'headers' in kwargs.keys():
_log_str = '%s\n%s' % (_log_str, str(kwargs['headers']))
if 'json' in kwargs.keys():
_log_str = '%s\n%s' % (_log_str, str(kwargs['json']))
if 'data' in kwargs.keys():
if type(kwargs['data']) == bytes:
_log_str = '%s\n%s' % (_log_str, ' '.join(
[hex(int(i)) for i in kwargs['data']]))
else:
_log_str = '%s\n%s' % (_log_str, str(kwargs['data']))
logger.log(log_level, _log_str)
# 开始处理
_back = {
'is_success': True,
'status_code': -1,
'headers': None,
'back_object': None,
'exception': None,
'encoding': None
}
try:
_resp = requests.request(method, url, **kwargs)
_back['status_code'] = _resp.status_code
_back['is_success'] = (_resp.status_code in success_code)
_back['headers'] = _resp.headers
if _back['is_success']:
_encoding = _resp.encoding if encoding is None else encoding
_stream_io = None
_bytes = []
if back_type == 'file':
_stream_io = open(save_file, 'wb')
_back['back_object'] = save_file
else:
_stream_io = BytesIO()
if kwargs.get('stream', False):
# 流模式, 通过IO获取内容, 并保存
try:
for _chunk in _resp.iter_content(chunk_size=block_size):
if _chunk:
_stream_io.write(_chunk)
_stream_io.flush()
RunTool.sleep(0.001)
if back_type != 'file':
# 非文件模式, 转换为bytes数组
_bytes = _stream_io.getvalue()
finally:
_stream_io.close()
else:
_bytes = _resp.content
# 处理非file的存储
if back_type == 'json':
_back['back_object'] = json.loads(_bytes, encoding=_encoding)
elif back_type == 'text':
_back['back_object'] = str(_bytes, encoding=_encoding)
elif back_type != 'file':
# 直接返回字节数组
_back['back_object'] = _bytes
except:
if logger is not None:
_logger_error = True
_use = str((datetime.datetime.now() - _start_time).total_seconds())
_log_str = '[INF-SEND][USE:%ss][EX:%s]%s %s\n%s' % (
_use, str(sys.exc_info()[0]), method, url, traceback.format_exc()
)
logger.log(logging.ERROR, _log_str)
if raise_exception:
# 抛出异常
raise
else:
_back['is_success'] = False
_back['exception'] = sys.exc_info()[1]
if logger is not None and not _logger_error:
_use = str((datetime.datetime.now() - _start_time).total_seconds())
_log_str = '[INF-BACK][USE:%ss]%s %s %s\n%s\n%s' % (
_use, method, url, str(_back['status_code']), str(_back['headers']),
str(_back['back_object']) if type(_back['back_object']) != bytes else ' '.join(
[hex(int(i)) for i in _back['back_object']])
)
logger.log(log_level, _log_str)
# 返回结果
return _back
if __name__ == '__main__':
# 当程序自己独立运行时执行的操作
# 打印版本信息
print(('模块名: %s - %s\n'
'作者: %s\n'
'发布日期: %s\n'
'版本: %s' % (__MOUDLE__, __DESCRIPT__, __AUTHOR__, __PUBLISH__, __VERSION__)))