#!/usr/bin/env python3
# -*- coding: UTF-8 -*-
#
# Copyright 2019 黎慧剑
#
# This Source Code Form is subject to the terms of the Mozilla Public
# License, v. 2.0. If a copy of the MPL was not distributed with this
# file, You can obtain one at http://mozilla.org/MPL/2.0/.
"""
连接池处理框架
@module connection_pool
@file connection_pool.py
"""
import os
import sys
import threading
import time
import asyncio
from logging import Logger
import traceback
from typing import Any
# 根据当前文件路径将包路径纳入, 在非安装的情况下可以引用到
sys.path.append(os.path.abspath(os.path.join(os.path.dirname(__file__), os.path.pardir)))
from HiveNetCore.utils.run_tool import AsyncTools
__MOUDLE__ = 'connection_pool' # 模块名
__DESCRIPT__ = u'连接池处理框架' # 模块描述
__VERSION__ = '0.1.0' # 版本
__AUTHOR__ = u'黎慧剑' # 作者
__PUBLISH__ = '2022.04.22' # 发布日期
[文档]class TooManyConnections(Exception):
"""
连接数已太多抛出的异常
"""
pass
[文档]class AIOConnectionPool(object):
"""
支持异步模式的连接池处理框架
抽象连接池的公共方法形成框架, 并提供基本的处理功能, 简化连接池编程的难度
"""
#############################
# 构造函数
#############################
[文档] def __init__(self, creator: Any, pool_connection_class: Any, args: list = [], kwargs: dict = {}, connect_method_name: str = 'connect',
max_size: int = 100, min_size: int = 0, connect_on_init: bool = False,
blocking: bool = True, blocking_interval: float = 0.1, get_timeout: float = 10, free_idle_time: float = 30,
ping_on_get: bool = False, ping_on_back: bool = False, ping_on_idle: bool = True,
ping_interval: float = 20, ping_args: list = [], ping_kwargs: dict = {},
daemon_interval: float = 0.1, pool_extend_paras: dict = {}, logger: Logger = None):
"""
初始化连接池
@param {Any} creator - 连接创建模块或对象, 结合connect_method_name、args、kwargs创建连接
如果connect_method_name不为None: conn = creator.connect_method_name(*args, **kwargs)
如果connect_method_name为None: conn = creator(*args, **kwargs)
@param {Any} pool_connection_class - 连接池连接对象实现类(继承PoolConnectionFW的类对象)
@param {list} args=[] - 进行连接创建的固定位置参数
@param {dict} kwargs={} - 进行连接创建的kv参数
@param {str} connect_method_name='connect' - 连接创建模块要执行的连接方法, 传None代表直接使用creator创建连接
@param {int} max_size=100 - 连接池的最大连接数
@param {int} min_size=0 - 连接池中最少保持的连接数(空闲也不删除)
@param {bool} connect_on_init=False - 是否在初始化时创建一个连接
@param {bool} blocking=True - 当获取不到连接时是否阻塞等待, 如果为False则代表直接抛出异常
@param {float} blocking_interval=0.1 - 获取连接阻塞时的循环间隔时长, 单位为秒
@param {float} get_timeout=10 - 等待连接获取的超时时间, 单位为秒, 0或None代表永不超时
@param {float} free_idle_time=30 - 释放空闲连接的时间, 单位为秒, 0或None代表永不释放
@param {bool} ping_on_get=False - 是否在外部获取连接时先检查连接是否有效(注: 在ping_interval时间内不会检查)
@param {bool} ping_on_back=False - 是否在外部发挥连接时检查连接是否有效(注: 在ping_interval时间内不会检查)
@param {bool} ping_on_idle=True - 是否对空闲连接进行是否有效的检查
@param {float} ping_interval=20 - 空闲连接有效检查的间隔时长, 单位为秒
@param {list} ping_args=[] - 检查连接有效性的函数固定位置参数
@param {dict} ping_kwargs={} - 检查连接有效性的函数kv参数
@param {float} daemon_interval=1 - 守护程序的循环间隔时长, 单位为秒
@param {dict} pool_extend_paras={} - 连接池的扩展参数, 可传递到连接对象使用的个性参数
@param {Logger} logger=None - 日志对象
"""
# 进行参数处理
self._creator = creator # 该变量直接就是连接方法
self._pool_connection_class = pool_connection_class
self._args = args
self._kwargs = kwargs
self._connect_method_name = connect_method_name
if self._connect_method_name is not None:
self._creator = getattr(creator, self._connect_method_name)
self._min_size = max(0, min_size)
self._max_size = max(1, min_size, max_size)
self._blocking = blocking
self._blocking_interval = blocking_interval
self._get_timeout = 0 if get_timeout is None else max(0, get_timeout)
self._free_idle_time = 0 if free_idle_time is None else max(0, free_idle_time)
self._ping_on_get = ping_on_get
self._ping_on_back = ping_on_back
self._ping_on_idle = ping_on_idle
self._ping_interval = 0 if ping_interval is None else max(0, ping_interval)
self._ping_args = ping_args
self._ping_kwargs = ping_kwargs
self._daemon_interval = daemon_interval
self._pool_extend_paras = pool_extend_paras
self._logger = logger
# 内部的控制变量
self._is_closed = False # 指示连接池被关闭的标识
self._lock = threading.RLock() # 控制连接对象获取的多线程锁
self._size = 0 # 当前线程池的总线程数
self._conn_cached = [] # 空闲连接缓存数组
# 线程池的守护线程, 处理线程检查、释放等处理
if self._free_idle_time > 0 or self._ping_on_idle:
self._daemon_thread = threading.Thread(
target=self.__start_daemon_thread_fun,
args=(1,),
name='DaemonThread-ConnectionPool'
)
self._daemon_thread.setDaemon(True)
self._daemon_running = True
self._daemon_thread.start()
# 初始化连接
if connect_on_init:
self._lock.acquire()
try:
_conn = AsyncTools.sync_run_coroutine(self._create_connection())
self._conn_cached.append(_conn)
self._size += 1
finally:
self._lock.release()
#############################
# 属性
#############################
@property
def current_size(self):
"""
获取连接池当前大小
@property {int} - 连接池当前大小
"""
return self._size
#############################
# 公共函数
#############################
[文档] async def close(self):
"""
关闭连接池
"""
self._is_closed = True
# 释放所有连接
while self._size > 0:
self._lock.acquire()
try:
_conn = self._conn_cached.pop()
self._size -= 1
# 关闭连接
try:
await _conn._final_close()
except:
pass
except IndexError:
# 没有空闲的连接, 等待下一次获取
await asyncio.sleep(0.1)
finally:
self._lock.release()
[文档] async def connection(self):
"""
获取一个有效连接
"""
_start_time = time.time()
while True:
# 尝试获取连接
self._lock.acquire()
try:
try:
_conn = self._conn_cached.pop()
except IndexError:
# 没有空闲的连接, 尝试创建新连接然后重新获取
_conn = None
else:
# 获取到连接, 进行检查
if self._ping_on_get and self._ping_interval > 0 and (time.time() - _conn.last_ping) >= self._ping_interval:
if not await _conn.ping(*self._ping_args, **self._ping_kwargs):
# 连接已失效, 直接丢弃连接
_conn = None
self._size -= 1
if _conn is not None:
# 返回连接
return _conn
# 判断是否允许创建新连接
if self._size >= self._max_size:
if self._blocking:
if self._get_timeout == 0 or (time.time() - _start_time) < self._get_timeout:
# 等待连接释放
await asyncio.sleep(self._blocking_interval)
continue
else:
raise TooManyConnections('Too many connetions')
else:
# 不阻塞, 直接抛出异常
raise TooManyConnections('Too many connetions')
else:
# 创建一个新连接, 并直接返回
_conn = await self._create_connection()
self._size += 1
return _conn
finally:
self._lock.release()
#############################
# 内部函数
#############################
async def _create_connection(self) -> Any:
"""
创建一个新连接
@returns {Any} - 返回创建的PoolConnectionFW实现类对象
"""
return self._pool_connection_class(
self, self._creator, self._args, self._kwargs
)
[文档] async def back_to_pool(self, conn: Any):
"""
将完成使用的连接归还到连接池
@param {Any} conn - PoolConnectionFW实现对象
"""
self._lock.acquire()
try:
# 是否检查连接有效性
if self._ping_on_back and self._ping_interval > 0 and (time.time() - conn.last_ping) >= self._ping_interval:
if not await conn.ping(*self._ping_args, **self._ping_kwargs):
# 连接已无效
self._size -= 1
return
# 重新放回连接池
self._conn_cached.append(conn)
finally:
self._lock.release()
#############################
# 守护线程
#############################
def __start_daemon_thread_fun(self, tid):
"""
守护线程, 负责检查连接有效性和释放连接
@param {int} tid - 线程id
"""
while True:
if self._is_closed:
# 出现关闭标记,退出守护
break
# 检查需要释放的空闲连接
self._lock.acquire()
try:
_cache_size = len(self._conn_cached)
_remove_count = _cache_size - self._min_size
if self._free_idle_time > 0 and _remove_count > 0:
while _remove_count > 0:
if (time.time() - self._conn_cached[0].last_back) > self._free_idle_time:
# 达到空闲释放时间
_conn = self._conn_cached.pop(0)
self._size -= 1
_remove_count -= 1
# 关闭连接
try:
AsyncTools.sync_run_coroutine(_conn._final_close())
except:
# 记录日志
if self._logger is not None:
self._logger.warning(
'close connection error: %s' % traceback.format_exc()
)
# 继续检查下一个
continue
else:
# 第一个达不到时间, 其他返回的时间肯定更短, 无需再判断
break
except:
pass
finally:
self._lock.release()
# 检查连接的有效性, 从后往前检查, 此外为了避免检查导致获取连接的阻塞, 采用逐个检查的方式
if self._ping_on_idle:
_index = len(self._conn_cached)
while _index > 0:
self._lock.acquire()
try:
if len(self._conn_cached) < _index:
# 当前长度小于要获取的序号, 有可能检查对象被取出了, 继续检查前一个
_index -= 1
continue
_conn = self._conn_cached[_index - 1]
if (time.time() - _conn.last_ping) >= self._ping_interval:
# 需要进行检查
if not AsyncTools.sync_run_coroutine(_conn.ping(*self._ping_args, **self._ping_kwargs)):
# 连接已失效, 直接从连接池取出
self._conn_cached.pop(_index - 1)
self._size -= 1
except:
# 记录日志
if self._logger is not None:
self._logger.warning(
'connection ping error: %s' % traceback.format_exc()
)
finally:
self._lock.release()
# 继续检查下一个
_index -= 1
# 等待下一次处理
AsyncTools.sync_run_coroutine(asyncio.sleep(self._daemon_interval))
[文档]class PoolConnectionFW(object):
"""
连接池的通用连接对象框架(封装实际的连接对象)
"""
#############################
# 构造函数
#############################
[文档] def __init__(self, pool: AIOConnectionPool, creator: Any, args: list = [], kwargs: dict = {},
logger: Logger = None):
"""
构造函数
@param {AIOConnectionPool} pool - 创建该连接的连接池对象
@param {Any} creator - 连接创建模块或对象, 结合connect_method_name、args、kwargs创建连接
如果connect_method_name不为None: conn = creator.connect_method_name(*args, **kwargs)
如果connect_method_name为None: conn = creator(*args, **kwargs)
@param {list} args=[] - 进行连接创建的固定位置参数
@param {dict} kwargs={} - 进行连接创建的kv参数
@param {Logger} logger=None - 日志对象
"""
self._pool = pool
self._logger = logger
# 创建连接
self._conn = AsyncTools.sync_run_coroutine(
creator(*args, **kwargs)
)
self.last_ping = time.time() # 记录上次检查的时间
self.last_back = time.time() # 记录上次返回连接池的时间
#############################
# 通过重写__getattr__把真实连接对象的属性和函数绑定在当前类
#############################
def __getattr__(self, name):
"""
获取函数的属性和函数
@param {str} name - 属性名
"""
return getattr(self._conn, name)
#############################
# 通用函数
#############################
[文档] async def ping(self, *args, **kwargs) -> bool:
"""
连接有效性的检查函数
@returns {bool} - 返回检查结果
"""
try:
_ping_result = await AsyncTools.async_run_coroutine(self._real_ping(*args, **kwargs))
if _ping_result:
self.last_ping = time.time()
return _ping_result
except:
# 出现异常视为失败
if self._logger is not None:
self._logger.warning(
'pool_connection ping error: %s' % traceback.format_exc()
)
return False
[文档] async def close(self):
"""
关闭连接
"""
# 不是真实关闭, 而是返回连接池
self._conn = await AsyncTools.async_run_coroutine(self._fade_close())
await self._pool.back_to_pool(self)
#############################
# 内部函数
#############################
async def _final_close(self):
"""
真正进行连接的关闭
"""
await AsyncTools.async_run_coroutine(
self._real_close()
)
#############################
# 需要继承类实现的函数
#############################
async def _real_ping(self, *args, **kwargs) -> bool:
"""
实现类的真实检查连接对象是否有效的的函数
@returns {bool} - 返回检查结果
"""
raise NotImplementedError()
async def _fade_close(self) -> Any:
"""
实现类提供的虚假关闭函数
注1: 不关闭连接, 只是清空上一个连接使用的上下文信息(例如数据库连接进行commit或rollback处理)
注2: 如果必须关闭真实连接, 则可以关闭后创建一个新连接返回
@returns {Any} - 返回原连接或新创建的连接
"""
raise NotImplementedError()
async def _real_close(self):
"""
实现类提供的真实关闭函数
"""
raise NotImplementedError()
if __name__ == '__main__':
# 当程序自己独立运行时执行的操作
# 打印版本信息
print(('模块名: %s - %s\n'
'作者: %s\n'
'发布日期: %s\n'
'版本: %s' % (__MOUDLE__, __DESCRIPT__, __AUTHOR__, __PUBLISH__, __VERSION__)))