#!/usr/bin/env python3
# -*- coding: UTF-8 -*-
# Copyright 2019 黎慧剑
#
# This Source Code Form is subject to the terms of the Mozilla Public
# License, v. 2.0. If a copy of the MPL was not distributed with this
# file, You can obtain one at http://mozilla.org/MPL/2.0/.
"""
文件保存对象
@module saver
@file saver.py
"""
import os
import sys
import json
import threading
from HiveNetCore.utils.file_tool import FileTool
from HiveNetCore.utils.net_tool import NetTool
from HiveNetCore.utils.run_tool import WaitLockTool
from HiveNetCore.utils.debug_tool import DebugTool
# 根据当前文件路径将包路径纳入, 在非安装的情况下可以引用到
sys.path.append(os.path.abspath(os.path.join(os.path.dirname(__file__), os.path.pardir)))
from HiveNetFileTransfer.exceptions import InfoFileLockError, AlreadyKnowFileSizeError, Md5VerifyError
__MOUDLE__ = 'saver' # 模块名
__DESCRIPT__ = u'文件保存对象' # 模块描述
__VERSION__ = '0.1.0' # 版本
__AUTHOR__ = u'黎慧剑' # 作者
__PUBLISH__ = '2021.08.23' # 发布日期
[文档]class TransferSaver(object):
"""
文件保存对象
"""
[文档] def __init__(self, file: str, is_resume: bool = True, file_size: int = None, md5: str = None,
is_overwrite: bool = False,
temp_ext: str = 'tmp', info_ext: str = 'info',
extend_info: dict = None, thread_num: int = 1, block_size: int = 4096, cache_size: int = 1024,
auto_expand: bool = True):
"""
初始化文件保存对象
@param {str} file - 文件保存路径(含文件名)
@param {bool} is_resume=True - 指定是否续传(自动查找已下载的信息), 如果不指定续传将自动删除原来已下载临时文件
注:如果指定续传, 且可以找到原来的临时文件, 则以下参数将使用原来的信息, 如果有传入则会进行差异值的校验:
file_size、md5
@param {int} file_size=None - 文件大小, 单位为byte, 如果为None代表未知文件大小, 此时auto_expand参数固定为True
@param {str} md5=None - 验证文件的md5字符串, 如果不传代表不进行验证
@param {bool} is_overwrite=False - 是否覆盖已有文件, 如果为否, 则目标文件已存在的情况下抛出异常
@param {str} temp_ext='tmp' - 处理过程中临时文件扩展名
@param {str} info_ext='info' - 处理过程中信息文件扩展名
@param {dict} extend_info=None - 处理过程中要保存的信息字典, 例如保存文件下载路径, 引用页等信息
@param {int} thread_num=1 - 写入处理线程数量
@param {int} block_size=4096 - 每次写入块大小, 单位为byte
@param {int} cache_size=1024 - 单线程缓存大小, 单位为kb(注意:真实缓存大小还需要乘以处理线程数量)
@param {bool} auto_expand=True - 是否自动扩展文件大小(否则在初始化时会自动创建指定大小的文件)
@throws {FileExistsError} - 如果下载文件已存在且不允许覆盖的情况抛出异常
@throws {FileNotFoundError} - 续传情况下临时文件不存在则抛出异常
@throws {InfoFileLockError} - 如果已打开信息文件进行文件存储处理, 抛出该异常
"""
# 检查文件是否存在
self._file = os.path.abspath(file)
self._path, self._filename = os.path.split(self._file)
if os.path.exists(self._file):
# 文件已存在
if is_overwrite:
FileTool.remove_file(self._file)
else:
raise FileExistsError('file exists: %s' % self._file)
else:
# 创建目录
FileTool.create_dir(self._path, exist_ok=True)
# 文件信息字典, 该字典登记文件基本信息和写入情况
self._info: dict = None
# 锁文件, 控制一个文件不能被多个类处理, 先尝试创建锁文件, 如果创建失败会抛出异常
self._lock_file = os.path.join(self._path, '%s.%s' % (self._filename, 'lock'))
try:
self._lock_file_handle = os.open(
self._lock_file, os.O_CREAT | os.O_EXCL | os.O_RDWR
)
except:
raise InfoFileLockError('info file is locked')
try:
# 获取是否debug状态
self._debug_on = DebugTool.is_debug_on()
self._lock_print_timeout = None
if self._debug_on:
self._lock_print_timeout = 5.0 # 打印锁等待超时时间
# 处理信息字典、临时文件、信息文件
self._temp_file = os.path.join(self._path, '%s.%s' % (self._filename, temp_ext))
self._info_file = os.path.join(self._path, '%s.%s' % (self._filename, info_ext))
self._auto_expand = auto_expand
self._thread_num = thread_num
self._block_size = block_size
self._cache_size = cache_size * 1024
# 数据处理锁
self._cache_info_lock = threading.RLock() # 缓存信息更新锁
self._tmp_file_lock = threading.RLock() # 缓存文件写入锁
self._is_finished = False # 要控制的完成状态
self._dealed_finished_lock = threading.RLock() # 控制多线程操作结束函数的状态更新锁
self._dealed_finished = False # 控制多线程操作结束函数只执行一次的变量
if is_resume and os.path.exists(self._info_file):
# 自动续传情况
self._info_file_handle = open(self._info_file, 'r+', encoding='utf-8')
self._info_file_handle.seek(0)
self._info = json.loads(self._info_file_handle.read())
# 检查传入信息是否一致
if file_size is not None and file_size != self._info['file_size']:
raise AttributeError('resume info [file_size] inconsistency, info file [%s], now [%s]' % (
str(self._info['file_size']), str(file_size)
))
if md5 is not None and md5 != self._info['md5']:
raise AttributeError('resume info [md5] inconsistency, info file [%s], now [%s]' % (
self._info['md5'], md5
))
# 检查临时文件
self._temp_file = os.path.join(self._path, self._info['tmp_file'])
if not os.path.exists(self._temp_file):
# 临时文件不存在
raise FileNotFoundError('temp file is not found: %s' % self._temp_file)
self._tmp_file_handle = open(self._temp_file, 'rb+')
self._tmp_file_handle.seek(0)
else:
# 删除已存在的临时文件信息
if os.path.exists(self._temp_file):
FileTool.remove_file(self._temp_file)
if os.path.exists(self._info_file):
FileTool.remove_file(self._info_file)
# 形成信息字典
self._info = {
'tmp_file': '%s.%s' % (self._filename, temp_ext), # 临时文件名称
'file_size': -1 if file_size is None else file_size, # 文件大小
'write_size': 0, # 已写入数据大小
'md5': '' if md5 is None else md5, # md5校验值
'extend_info': {} if extend_info is None else extend_info, # 传入的扩展信息
# 存储索引, 按位置顺序在数组中登记未写入区间, 数组每一项登记未写入数据的开始位置和结束位置
'store_index': [[0, file_size - 1]]
}
# 生成临时文件
self._tmp_file_handle = open(self._temp_file, 'wb')
if not auto_expand and file_size is not None:
# 直接生成指定大小的文件
self._tmp_file_handle.seek(file_size - 1) # 跳到指定位置
self._tmp_file_handle.write(b'\x00') # 一定要写入一个字符, 否则无效
self._tmp_file_handle.flush()
# 写入信息字典文件
self._info_file_handle = open(self._info_file, 'w', encoding='utf-8')
self._write_info_file()
# 合并存储索引, 把碎片合并成为大块
self._info['store_index'] = self._f_merge_store_index(self._info['store_index'])
# 初始化缓存等信息
if self._info['file_size'] == -1:
# 如果没有文件大小的情况, 不支持拆分多写入线程和一次性创建指定大小文件的情况
self._thread_num = 1
self._auto_expand = True
# 缓存处理
self._max_cache_pos = [-1, ] # 当前缓存分配到的区域最大位置
self._cache = dict()
for _i in range(self._thread_num):
self._cache[_i] = {
'start': -1, # 缓存数据对应文件的写入位置, -1代表没有设置
'size': 0, # 缓存数据大小
'buffer': bytes(), # 具体的缓存数据
'end_pos': -1, # 该缓存对应线程要处理的文件块结束位置
'lock': threading.RLock(), # 用于缓存线程处理的锁)
'get_start': -1, # 当前正在获取的数据的开始位置
'get_size': 0, # 当前要获取数据的大小
}
# 分配每个缓存要处理文件区域
for _i in range(self._thread_num):
self._set_cache_area(_i)
except:
# 如果初始化出现异常, 清理文件句柄及锁文件
self._clear_file_handle_and_lock()
raise
#############################
# with 方法支持
#############################
def __enter__(self):
"""
with方法进入的处理
"""
return self
def __exit__(self, type, value, trace):
"""
with方法退出函数
@param {object} type - 执行异常的异常类型
@param {object} value - 执行异常的异常对象值
@param {object}} trace - 执行异常的异常trace对象
"""
# 关闭资源
self.close()
[文档] def close(self):
"""
主动关闭保存对象
"""
# 先写入缓存数据
self.flush()
# 关闭所有打开文件
self._clear_file_handle_and_lock()
# 清空变量
self._info.clear()
self._cache.clear()
#############################
# 工具函数
#############################
[文档] def get_save_info(self) -> dict:
"""
获取数据保存信息
@returns {dict} - 数据保存的信息字典
{
'file_size': -1, # 要接收的文件大小, -1 代表不确定文件实际大小
'write_size': 0, # 已写入的数据大小
'md5': '', # 文件的md5值
}
"""
return {
'file_size': -1 if self._info['file_size'] is None else self._info['file_size'],
'write_size': self._info['write_size'],
'md5': self._info['md5']
}
[文档] def get_extend_info(self) -> dict:
"""
获取保存的信息字典
@returns {dict} - 返回保存的信息字典
"""
return self._info['extend_info']
[文档] def flush(self):
"""
将缓存中的数据全部写入文件
"""
for _i in range(self._thread_num):
self._flush_cache(_i)
[文档] def finished(self):
"""
对于未知文件大小的情况, 由该函数指定文件写入完成
@throws {AlreadyKnowFileSizeError} - 对于文件大小已知的处理情况, 如果调用该函数会抛出异常
"""
if self._info['file_size'] != -1:
raise AlreadyKnowFileSizeError('already know file size')
# 先将缓存信息写入临时文件
self.flush()
# 将临时文件改名, 清除临时文件
self._write_finished()
[文档] def write_data(self, index: int = 0, start: int = None, size: int = None,
data: bytes = None) -> dict:
"""
写入数据的处理方法
@param {int} index=0 - 指定写入数据的线程索引
@param {int} start=None - 数据在文件的开始位置, 如果传空代表请求该线程索引对应的获取任务信息
@param {int} size=None - 传入数据的长度
@param {bytes} data=None - 传入数据字节数组
@returns {dict} - 返回下一个任务要获取的信息字典, 格式为:
{
'status': 0, # 状态, 0-成功, 1-开始位置与线程缓存不一致, 2-全部下载完成, 3-文件md5校验失败
'index': 0, # 当前线程索引
'start': -1, # 开始位置, 如果传入-1代表该线程已无获取任务
'size': 0, # 要获取数据的大小
}
"""
if index >= self._thread_num:
# 索引有错误, 直接返回无获取任务的信息, 让前端不再处理该线程索引
return {'status': 0, 'index': index, 'start': -1, 'size': 0}
# 单索引线程需要锁定
with WaitLockTool(
self._cache[index]['lock'], print_timeout=self._lock_print_timeout,
label='cache %d' % index, print_acquire_ok=self._debug_on,
print_release=self._debug_on
):
_status = 0
if start is not None:
# 涉及到修改cache, 统一锁定
with WaitLockTool(
self._cache_info_lock, print_timeout=self._lock_print_timeout,
label='_cache_info_lock %d' % index, print_acquire_ok=self._debug_on,
print_release=self._debug_on
):
# 写入数据到缓存
_status = self._write_data_to_cache(index, start, size, data)
# 获取新的获取信息
if _status == 0:
self._set_cache_area(index, lock_cache=False)
# 返回下一次获取的数据区间
_cache = self._cache[index]
if _cache['start'] == -1:
# 判断是否全部处理任务已完成
if self._is_finished:
# 全部完成
_status = 2
self._dealed_finished_lock.acquire()
# 将临时文件改名, 清除临时文件
try:
if not self._dealed_finished:
self._dealed_finished = True
self._write_finished()
except Md5VerifyError:
_status = 3
except:
raise
finally:
self._dealed_finished_lock.release()
return {
'status': _status, 'index': index, 'start': -1, 'size': 0
}
else:
return {
'status': _status, 'index': index,
'start': _cache['get_start'],
'size': _cache['get_size']
}
#############################
# 内部函数
#############################
def _write_info_file(self):
"""
将内存写入信息文件
"""
self._info_file_handle.truncate(0)
self._info_file_handle.seek(0)
self._info_file_handle.write(
json.dumps(self._info, ensure_ascii=False, indent=2)
)
self._info_file_handle.flush()
def _clear_file_handle_and_lock(self):
"""
清理文件句柄及锁文件
"""
# 关闭文件句柄
if hasattr(self, '_info_file_handle') and self._info_file_handle is not None:
self._info_file_handle.close()
if hasattr(self, '_tmp_file_handle') and self._tmp_file_handle is not None:
self._tmp_file_handle.close()
if hasattr(self, '_lock_file_handle'):
if self._lock_file_handle is not None:
# 删除锁文件
os.close(self._lock_file_handle)
os.remove(self._lock_file)
def _set_cache_area(self, index: int, lock_cache: bool = True):
"""
设置缓存的处理区域
@param {int} index - 要处理的缓存对应处理线程索引
@throws {KeyError} - 当送入的index超过了写入线程数范围时抛出异常
"""
if index >= self._thread_num:
raise KeyError('cache index[%s] error, out of range' % str(index))
self._f_set_cache_area(
self._cache, index, self._info['store_index'], self._cache_size,
self._block_size, self._max_cache_pos,
cache_info_lock=self._cache_info_lock if lock_cache else None,
lock_print_timeout=self._lock_print_timeout, debug_on=self._debug_on
)
def _flush_cache(self, index: int, lock_cache: bool = True) -> bool:
"""
将指定缓存数据写入文件
@param {int} index - 要处理的缓存对应处理线程索引
@param {bool} lock_cache=True - 是否锁定缓存
@returns {bool} - 指示是否有正确写入缓存数据
@throws {KeyError} - 当送入的index超过了写入线程数范围时抛出异常
"""
if index >= self._thread_num:
raise KeyError('cache index[%s] error, out of range' % str(index))
_is_writed = False
_cache = self._cache[index]
with WaitLockTool(
self._tmp_file_lock, print_timeout=self._lock_print_timeout,
label='_tmp_file_lock %d' % index, print_acquire_ok=self._debug_on,
print_release=self._debug_on, force_no_acquire=(not lock_cache)
):
if _cache['size'] <= 0:
# 没有要写入的数据
return _is_writed
# 临时参数
_start_pos = _cache['start']
_size = _cache['size']
# 写入文件, 更新info字典
self._tmp_file_handle.seek(_start_pos)
_write_size = self._tmp_file_handle.write(_cache['buffer'])
self._tmp_file_handle.flush()
# 更新info字典的store_index
with WaitLockTool(
self._cache_info_lock, print_timeout=self._lock_print_timeout,
label='_cache_info_lock %d' % index, print_acquire_ok=self._debug_on,
print_release=self._debug_on
):
_is_writed = self._f_update_store_index(
self._info['store_index'], _start_pos, _size
)
if len(self._info['store_index']) == 0:
# 如果已经没有区块, 设置完成标志
self._is_finished = True
if _is_writed:
# 更新已写入数据大小
self._info['write_size'] += _write_size
# 写入info文件
if not self._is_finished:
self._write_info_file()
# 更新cache信息
_end_pos = _cache['end_pos']
if _end_pos is not None and (_start_pos + _size) >= _end_pos:
# 区间已经处理完成
_cache['start'] = -1
_cache['end_pos'] = -1
else:
_cache['start'] = _start_pos + _size
_cache['size'] = 0
_cache['buffer'] = bytes()
def _write_finished(self):
"""
文件写入结束
@throws {Md5VerifyError} - 当文件校验失败时抛出异常
"""
# 关闭文件句柄
self._info_file_handle.close()
self._info_file_handle = None
self._tmp_file_handle.close()
self._tmp_file_handle = None
os.close(self._lock_file_handle)
self._lock_file_handle = None
# 检查md5
if self._info['md5'] != '':
_file_md5 = NetTool.get_file_md5(self._temp_file)
if self._info['md5'] != _file_md5:
raise Md5VerifyError('md5 verify error')
# 修改临时文件名
os.rename(
self._temp_file, self._file
)
# 删除临时文件
FileTool.remove_file(self._info_file)
FileTool.remove_file(self._lock_file)
def _write_data_to_cache(self, index: int, start: int, size: int, data: bytes) -> int:
"""
写入数据到缓存
@param {int} index - 获取缓存数据的线程索引
@param {int} start - 要写入文件的开始位置
@param {int} size - 要写入数据的大小
@param {bytes} data - 要写入的输入
@returns {int} - 写入结果, 0-成功, 1-开始位置与线程缓存不一致
"""
# 写入数据到缓存
_cache = self._cache[index]
if start != _cache['get_start']:
# 开始位置与线程缓存不一致, 写入失败
return 1
else:
# 进行写入处理, 添加数据到缓存中, 更新缓存数据需要获取锁
_cache['size'] += size
_cache['buffer'] += data
# 重置正在获取的数据位置
_cache['get_start'] = -1
_cache['get_size'] = 0
# 判断是否需要写入(超过缓存控制大小, 或已写到当前cache的结束位置)
_end_pos = _cache['end_pos']
if _cache['size'] >= self._cache_size or (_end_pos != -1 and _end_pos <= (_cache['start'] + _cache['size'])):
self._flush_cache(index, lock_cache=False)
# 返回处理成功
return 0
#############################
# 内部静态算法函数, 为了便于测试算法准确性
#############################
@classmethod
def _f_merge_store_index(cls, store_index: list) -> list:
"""
合并存储索引数组, 将碎片区域合并为大区域
@param {list} store_index - 索引存储数组
按位置顺序在数组中登记未写入区间, 数组每一项登记未写入数据的开始位置和结束位置
@returns {list} - 合并后的存储索引数组
"""
_index = len(store_index) - 1
while _index > 0:
if store_index[_index][0] == store_index[_index - 1][1] + 1:
# 两个区域可以合并, 将上一项的结尾设置为当前项的结尾, 并删除当前项
store_index[_index - 1][1] = store_index[_index][1]
store_index.pop(_index)
_index -= 1
return store_index
@classmethod
def _f_set_cache_area(cls, cache: dict, index: int, store_index: list, cache_size: int, block_size: int,
max_cache_pos: list,
cache_info_lock: threading.RLock = None,
lock_print_timeout: float = None, debug_on: bool = False):
"""
设置缓存的处理区域
@param {dict} cache - 所有缓存信息字典, 格式如下:
{
index: {
'start': -1, # 缓存数据对应文件的写入位置, -1代表没有设置
'size': 0, # 缓存数据大小
'buffer': bytes(), # 具体的缓存数据
'end_pos': -1, # 该缓存对应线程要处理的文件块结束位置
'lock': threading.RLock, # 用于锁定缓存数据的锁
},
...
}
@param {int} index - 当前要设置的缓存
@param {list} store_index - 索引存储数组
按位置顺序在数组中登记未写入区间, 数组每一项登记未写入数据的开始位置和结束位置
@param {int} cache_size - 单线程缓存大小
@param {int} block_size - 每次写入块大小, 单位为byte
@param {list} max_cache_pos - 当前缓存分配到的区域最大位置, 采用数组方式是因为需要更新返回值
@param {threading.RLock} cache_info_lock=None - 需要传入的缓存信息更新锁, 不传代表无需锁定
@param {float} lock_print_timeout=None - 锁等待打印超时时间
@param {bool} debug_on=False - debug状态
"""
with WaitLockTool(
cache_info_lock, print_timeout=lock_print_timeout,
label='_cache_info_lock %d' % index, print_acquire_ok=debug_on,
print_release=debug_on, force_no_acquire=(cache_info_lock is None)
):
_set_ok = False # 是否已分配成功的标记
# 非已完结缓存, 从自身分配即可
if cache[index]['start'] != -1:
_set_ok = True
# 自身无法分配, 需要从其他区域分配
# 先遍历信息字典中的存储索引, 看是否还有缓存未覆盖的区域
if not _set_ok:
for _store_area in store_index:
if _store_area[0] > max_cache_pos[0]:
# 找到可分配区域
cache[index]['start'] = _store_area[0]
cache[index]['end_pos'] = _store_area[1]
_set_ok = True
if _store_area[1] > max_cache_pos[0]:
max_cache_pos[0] = _store_area[1] # 当前缓存分配到的区域最大位置
break
# 已经没有可分配的存储索引, 考虑截断其他缓存的区域
if not _set_ok:
# 找到区域最大的缓存
_max_index = -1
_max_area = 0
for _index, _cache_info in cache.items():
if _cache_info['start'] == -1 or _cache_info['end_pos'] == -1:
# 无法拆分的缓存
continue
# _area = _cache_info['end_pos'] - _cache_info['start'] - _cache_info['size']
_cache_real_start = _cache_info['start'] + _cache_info['size']
if _cache_info['get_start'] != -1:
_cache_real_start = _cache_info['get_start'] + _cache_info['get_size']
_area = _cache_info['end_pos'] - _cache_real_start
if _area > _max_area:
_max_area = _area
_max_index = _index
if _max_index == -1:
# 没有找到可拆分的缓存
return
# 从最大缓存进行拆分
# _area = cache[_max_index]['end_pos'] - cache[_max_index]['start'] - cache_size
if cache[_max_index]['start'] == -1:
# 过程中已处理, 已经无法再分
return
_cache_real_start = cache[_max_index]['start'] + cache[_max_index]['size']
if cache[_max_index]['get_start'] != -1:
_cache_real_start = cache[_max_index]['get_start'] + \
cache[_max_index]['get_size']
_area = cache[_max_index]['end_pos'] - _cache_real_start
if _area <= cache_size:
# 余下区域小于一个cache, 无需再拆分
return
# 进行切割
_split_size = round(_area / 2)
cache[index]['start'] = _cache_real_start + _split_size
cache[index]['end_pos'] = cache[_max_index]['end_pos']
cache[_max_index]['end_pos'] = cache[index]['start'] - 1
_set_ok = True
# 设置已分配索引的获取位置
if _set_ok:
cache[index]['get_start'] = cache[index]['start'] + cache[index]['size']
cache[index]['get_size'] = min(
block_size, cache[index]['end_pos'] - cache[index]['get_start'] + 1
)
@classmethod
def _f_update_store_index(cls, store_index: list, start_pos: int, size: int) -> bool:
"""
写入数据后更新存储索引数组
注:该方法不处理跨多个存储块写入的情况
@param {list} store_index - 索引存储数组
按位置顺序在数组中登记未写入区间, 数组每一项登记未写入数据的开始位置和结束位置
@param {int} start_pos - 要写入数据的开始位置
@param {int} size - 要写入数据的大小
@param {bool} - 指示是否变更了存储索引数组
"""
for _i in range(len(store_index)):
if start_pos == store_index[_i][0]:
# 直接按顺序向下存储, 同时兼容了区间结尾为None的情况
store_index[_i][0] += size
elif start_pos > store_index[_i][0] and start_pos <= store_index[_i][1]:
# 将区域从中间切割开
_end_pos = store_index[_i][1]
store_index[_i][1] = start_pos - 1 # 前面的区域的结束位置设定为写入位置的前一个
_i += 1 # 要判断的是新区域
store_index.insert(_i, [start_pos + size, _end_pos]) # 添加新增加的区域
else:
# 在下一个区域
continue
# 判断当前区域是否已写入完成
if store_index[_i][1] is not None and store_index[_i][0] > store_index[_i][1]:
# 整个区域已经处理完成, 直接从数组移走
store_index.pop(_i)
# 退出循环处理
return True
# 如果能走到结束, 代表没有找到可修改的存储索引
return False
if __name__ == '__main__':
# 当程序自己独立运行时执行的操作
# 打印版本信息
print(('模块名:%s - %s\n'
'作者:%s\n'
'发布日期:%s\n'
'版本:%s' % (__MOUDLE__, __DESCRIPT__, __AUTHOR__, __PUBLISH__, __VERSION__)))