HiveNetNoSql.base.driver_fw 源代码

#!/usr/bin/env python3
# -*- coding: UTF-8 -*-
# Copyright 2022 黎慧剑
#
# This Source Code Form is subject to the terms of the Mozilla Public
# License, v. 2.0. If a copy of the MPL was not distributed with this
# file, You can obtain one at http://mozilla.org/MPL/2.0/.
"""
nosql数据库驱动基础框架
注: 实现类可以通过bson库的objectid模块自动生成与mongodb匹配的ObjectID

@module driver_fw
@file driver_fw.py
"""
import os
import sys
import logging
import copy
import traceback
from bson.objectid import ObjectId
from typing import Union, Any
from HiveNetCore.yaml import SimpleYaml, EnumYamlObjType
from HiveNetCore.utils.run_tool import AsyncTools
from HiveNetCore.utils.test_tool import TestTool
from HiveNetCore.connection_pool import AIOConnectionPool
# 根据当前文件路径将包路径纳入, 在非安装的情况下可以引用到
sys.path.append(os.path.abspath(os.path.join(
    os.path.dirname(__file__), os.path.pardir, os.path.pardir)))


[文档]class NosqlDriverFW(object): """ nosql数据库驱动框架 """ ############################# # 构造函数 #############################
[文档] def __init__(self, connect_config: dict = {}, pool_config: dict = {}, driver_config: dict = {}): """ 初始化驱动 @param {dict} connect_config={} - 数据库的连接参数 host {str} - 连接数据库的ip或uri, 如果使用uri方式, 其他的连接参数不生效 port {int} - 连接数据库的端口(可选) usedb {str} - 登录后默认切换到的数据库(可选), 如果不传使用登录后的默认数据库 username {str} - 登录验证用户(可选) password {str} - 登录验证密码(可选) dbname {str} - 登录用户的数据库名(可选) connect_on_init {bool} - 是否启动时直接连接数据库, 默认为False(等待第一次操作再连接) connect_timeout {float} - 连接数据库的超时时间, 单位为秒, 默认为20 ...驱动实现类自定义支持的参数 @param {dict} pool_config={} - 连接池配置 max_size {int} - 连接池的最大大小, 默认为100 min_size {int} - 连接池维持的最小连接数量, 默认为0 max_idle_time {float} - 连接被移除前的最大空闲时间, 单位为秒, 默认为None wait_queue_timeout {float} - 在没有空闲连接的时候, 请求连接所等待的超时时间, 单位为秒, 默认为None(不超时) ...驱动实现类自定义支持的参数 @param {dict} driver_config={} - 驱动配置 init_db {dict} - 要在启动驱动时创建的数据库 { '数据库名': { 'index_only': False, # 是否仅用于索引, 不创建 'comment': '', # 数据库注释 'args': [], # 创建数据库的args参数 'kwargs': {} #创建数据库的kwargs参数 }, ... } init_collections {dict} - 要在启动驱动时创建的集合(表) { '数据库名': { '集合名': { 'index_only': False, # 是否仅用于索引, 不创建 'comment': '', # 集合注释 'indexs': {索引字典}, 'fixed_col_define': {固定字段定义} }, ... }, ... } init_yaml_file {str} - 要在启动时创建的数据库和集合(表)配置yaml文件 注1: 该参数用于将init_db和init_collections参数内容放置的配置文件中, 如果参数有值则忽略前面两个参数 注2: 配置文件为init_db和init_collections两个字典, 内容与这两个参数一致 logger {Logger} - 传入驱动的日志对象 """ raise NotImplementedError()
def __del__(self): """ 注销对象, 应关闭数据库连接 """ AsyncTools.sync_run_coroutine(self.destroy()) ############################# # 主动销毁驱动 #############################
[文档] async def destroy(self): """ 主动销毁驱动(连接) """ raise NotImplementedError()
############################# # 通用属性 ############################# @property def db_name(self): """ 返回当前数据库名 @property {str} """ raise NotImplementedError() ############################# # 数据库操作 #############################
[文档] async def create_db(self, name: str, *args, **kwargs): """ 创建数据库 注: 创建后会自动切换到该数据库 @param {str} name - 数据库名 """ raise NotImplementedError()
[文档] async def switch_db(self, name: str, *args, **kwargs): """ 切换当前数据库到指定数据库 @param {str} name - 数据库名 """ raise NotImplementedError()
[文档] async def list_dbs(self, *args, **kwargs) -> list: """ 列出数据库清单 @returns {list} - 数据库名清单 """ raise NotImplementedError()
[文档] async def drop_db(self, name: str, *args, **kwargs): """ 删除数据库 @param {str} name - 数据库名 """ raise NotImplementedError()
############################# # 集合操作 #############################
[文档] async def create_collection(self, collection: str, indexs: dict = None, fixed_col_define: dict = None, comment: str = None, **kwargs): """ 创建集合(相当于关系型数据库的表, 如果不存在则创建) 注意: 所有集合都有必须有 '_id' 这个记录的唯一主键字段 @param {str} collection - 集合名(表名) @param {dict} indexs=None - 要创建的索引字典, 格式为: { '索引名': { # 索引的字段清单 'keys': { '字段名': { 'asc': 是否升序(1为升序, -1为降序) }, ... } # 创建参数 'paras': { 'unique': 是否唯一索引(True/False), ...启动实现类自定义支持的参数 } }, } @param {dict} fixed_col_define=None - 固定字段定义(只有固定字段才能进行索引), 格式如下: { '字段名': { 'type': '字段类型(str, int, float, bool, json)', 'len': 字段长度, 'nullable': True, # 是否可空 'default': 默认值, 'comment': '字段注释' }, ... } @param {str} comment=None - 集合注释 @param {kwargs} - 实现驱动自定义支持的参数 """ raise NotImplementedError()
[文档] async def list_collections(self, filter: dict = None, **kwargs) -> list: r""" 获取所有集合(表)清单 @param {dict} filter=None - 查找条件 例如查找所有非'system.'开头的集合: {"name": {"$regex": r"^(?!system\.)"}} @returns {list} - 集合(表)清单 """ raise NotImplementedError()
[文档] async def drop_collection(self, collection: str, *args, **kwargs): """ 删除集合 注: 集合不存在也正常返回 @param {str} collection - 集合名(表名) """ raise NotImplementedError()
[文档] async def turncate_collection(self, collection: str, *args, **kwargs): """ 清空集合记录 @param {str} collection - 集合名(表名) """ raise NotImplementedError()
[文档] async def collections_exists(self, collection: str, *args, **kwargs) -> bool: """ 判断集合(表)是否存在 @param {str} collection - 集合名(表名) @returns {bool} - 是否存在 """ raise NotImplementedError()
############################# # 事务支持 #############################
[文档] async def start_transaction(self, *args, **kwargs) -> Any: """ 启动事务 注: 通过该方法处理事务, 必须显式通过commit_transaction或abort_transaction关闭事务 @returns {Any} - 返回事务所在的连接(session) """ raise NotImplementedError()
[文档] async def commit_transaction(self, session, *args, **kwargs): """ 提交事务 @param {Any} session=None - 启动事务的连接(session) """ raise NotImplementedError()
[文档] async def abort_transaction(self, session, *args, **kwargs): """ 回滚事务 @param {Any} session=None - 启动事务的连接(session) """ raise NotImplementedError()
############################# # 数据操作 #############################
[文档] async def insert_one(self, collection: str, row: dict, session: Any = None, **kwargs) -> str: """ 插入一条记录 @param {str} collection - 集合(表) @param {dict} row - 行记录字典 注: 每个记录可以通过'_id'字段指定该记录的唯一主键, 如果不送入, 将自动生成一个唯一主键 @param {Any} session=None - 指定事务连接对象 @returns {str} - 返回所插入记录的 _id 字段值 """ raise NotImplementedError()
[文档] async def insert_many(self, collection: str, rows: list, session: Any = None, **kwargs) -> int: """ 插入多条记录 @param {str} collection - 集合(表) @param {list} rows - 行记录数组 注: 每个记录可以通过'_id'字段指定该记录的唯一主键, 如果不送入, 将自动生成一个唯一主键 @param {Any} session=None - 指定事务连接对象 @returns {int} - 返回插入的记录数量 """ raise NotImplementedError()
[文档] async def update(self, collection: str, filter: dict, update: dict, multi: bool = True, upsert: bool = False, hint: dict = None, session: Any = None, **kwargs) -> int: """ 更新找到的记录 @param {str} collection - 集合(表) @param {dict} filter - 查询条件字典, 与mongodb的查询条件设置参数一致 @param {dict} update - 更新信息字典, 与mongodb的更新设置参数一致, 参考如下: {'$set': {'name': 'myname', ...}}: name='myname', 设置某个字段的值 {'$inc': {'age': 3, ...}} : age = age + 3, 对数字类型字段, 在现有值上增加指定数值 {'$mul': {'age': 2, ...}} : age = age * 2, 对数字类型字段, 在现有值上乘以指定数值 {'$min': {'age': 10, ...}} : age = min(age, 10), 将现有值和给出值比较, 设置为小的值 {'$max': {'age': 10, ...}} : age = max(age, 10), 将现有值和给出值比较, 设置为大的值 {'$unset': {'job': 1}}: job=null, 删除指定字段 {'$rename': {'old_name': 'new_name', ...}}: 将字段名修改为新字段名 @param {bool} multi=True - 是否更新全部找到的记录, 如果为Fasle只更新找到的第一条记录 @param {bool} upsert=False - 指定如果记录不存在是否插入 @param {dict} hint=None - 指定查询使用索引的名字清单 @param {Any} session=None - 指定事务连接对象 @returns {int} - 返回更新的数据条数 """ raise NotImplementedError()
[文档] async def delete(self, collection: str, filter: dict, multi: bool = True, hint: dict = None, session: Any = None, **kwargs) -> int: """ 删除指定记录 @param {str} collection - 集合(表) @param {dict} filter - 查询条件字典, 与mongodb的查询条件设置参数一致 @param {bool} multi=True - 是否更新全部找到的记录, 如果为Fasle只更新找到的第一条记录 @param {dict} hint=None - 指定查询使用索引的名字清单 @param {Any} session=None - 指定事务连接对象 @returns {int} - 删除记录数量 """ raise NotImplementedError()
############################# # 数据查询 #############################
[文档] async def query_list(self, collection: str, filter: dict = None, projection: Union[dict, list] = None, sort: list = None, skip: int = None, limit: int = None, hint: dict = None, left_join: list = None, session: Any = None, **kwargs) -> list: """ 查询记录(直接返回清单) @param {str} collection - 集合(表) @param {dict} filter=None - 查询条件字典, 与mongodb的查询条件设置方法一样, 参考如下: {} : 查询全部记录 {'id': 'info', 'ver': '0.0.1'} : where id = 'info' and 'ver' = '0.0.1' {'ver': {'$lt': '0.0.1'}} : where ver < '0.0.1' 注: $lt - 小于, $lte - 小于或等于, $gt - 大于, $gte - 大于或等于, $ne - 不等于 {'id': {'$gt':50}, '$or': [{'name': 'lhj'},{'title': 'book'}]} : where id > 50 and (name='lhj' or 'title' = 'book') {'name': {'$regex': 'likestr'}} : where name like '%likestr%', 正则表达式 {'name': {'$in': ['a', 'b', 'c']}} : where name in ('a', 'b', 'c') {'name': {'$nin': ['a', 'b', 'c']}} : where name not in ('a', 'b', 'c') {'col_json.sub_col': 'test'}: 查询json字段的指定字典key, 可以支持多级 {'col_json.0': 'test'}: 查询json字段的指定数组索引, 可以支持多级 注: 可以在字段名前面加 "#序号." 用于与left_join参数配合使用, 指定当前排序字段所属的关联表索引(序号从0开始) @param {dict|list} projection=None - 指定结果返回的字段信息 列表模式: ['col1','col2', ...] 注意: 该模式一定会返回 _id 这个主键 字典模式: {'_id': False, 'col1': True, ...} 该方式可以通过设置False屏蔽 _id 的返回 注1: 只有 _id 字段可以设置为False, 其他字段不可设置为False(如果要屏蔽可以不放入字典) 注2: 可以通过字典模式的值设置为$开头的字段名或json检索路径的方式, 进行字段别名处理, 例如{'as_name': '$real_name'}或{'as_name': '$real_name.key.key'} 注3: 可以在字段名前面加 "#序号." 用于与left_join参数配合使用, 指定当前排序字段所属的关联表索引(序号从0开始), 例如{'#0.col1': True, 'as_name': '$#0.col2'} @param {list} sort=None - 查询结果的排序方式 例: [('col1', 1), ('#0.join_col1', -1)...] 注1: 参数的第1个值可以支持'col1.key1'的方式指定json值进行排序 注2: 参数的第2个值指定是否升序(1为升序, -1为降序) 注3: 可以在字段名前面加 "#序号." 用于与left_join参数配合使用, 指定当前排序字段所属的关联表索引(序号从0开始) @param {int} skip=None - 指定跳过返回结果的前面记录的数量 @param {int} limit=None - 指定限定返回结果记录的数量 @param {dict} hint=None - 指定查询使用索引的名字清单 例: {'index_name1': 1, 'index_name2': 1} @param {list} left_join=None - 指定左关联(left outer join)集合信息, 每个数组为一个关联表, 格式如下: [ { 'db_name': '指定集合的db', # 如果不设置则代表和主表是同一个数据库 'collection': '要关联的集合(表)名', 'as': '关联后的别名', # 如果不设置默认为集合名 'join_fields': [(主表字段名, 关联表字段名), ...], # 要关联的字段列表, 仅支持完全相等的关联条件 'filter': ..., # 关联表数据的过滤条件(仅用于内部过滤需要关联的数据), 注意字段无需添加集合的别名 }, ... ] @param {Any} session=None - 指定事务连接对象 @returns {list} - 返回的结果列表 """ raise NotImplementedError()
[文档] async def query_iter(self, collection: str, filter: dict = None, projection: Union[dict, list] = None, sort: list = None, skip: int = None, limit: int = None, hint: dict = None, fetch_each: int = 1, left_join: list = None, session: Any = None, **kwargs): """ 查询记录(通过迭代对象依次返回) @param {str} collection - 集合(表) @param {dict} filter=None - 查询条件字典, 与mongodb的查询条件设置方法一样, 参考如下: {} : 查询全部记录 {'id': 'info', 'ver': '0.0.1'} : where id = 'info' and 'ver' = '0.0.1' {'ver': {$lt: '0.0.1'}} : where ver < '0.0.1' 注: $lt - 小于, $lte - 小于或等于, $gt - 大于, $gte - 大于或等于, $ne - 不等于 {'id': {$gt:50}, $or: [{'name': 'lhj'},{'title': 'book'}]} : where id > 50 and (name='lhj' or 'title' = 'book') {'name': {'$regex': 'likestr'}} : where name like '%likestr%', 正则表达式 {'name': {'$in': ['a', 'b', 'c']}} : where name in ('a', 'b', 'c') {'name': {'$nin': ['a', 'b', 'c']}} : where name not in ('a', 'b', 'c') {'col_json.sub_col': 'test'}: 查询json字段的指定字典key, 可以支持多级 {'col_json.0': 'test'}: 查询json字段的指定数组索引, 可以支持多级 注: 可以在字段名前面加 "#序号." 用于与left_join参数配合使用, 指定当前排序字段所属的关联表索引(序号从0开始) @param {dict|list} projection=None - 指定结果返回的字段信息 列表模式: ['col1','col2', ...] 注意: 该模式一定会返回 _id 这个主键 字典模式: {'_id': False, 'col1': True, ...} 该方式可以通过设置False屏蔽 _id 的返回 注1: 只有 _id 字段可以设置为False, 其他字段不可设置为False(如果要屏蔽可以不放入字典) 注2: 可以通过字典模式的值设置为$开头的字段名或json检索路径的方式, 进行字段别名处理, 例如{'as_name': '$real_name'}或{'as_name': '$real_name.key.key'} 注3: 可以在字段名前面加 "#序号." 用于与left_join参数配合使用, 指定当前排序字段所属的关联表索引(序号从0开始) @param {list} sort=None - 查询结果的排序方式 例: [('col1', 1), ('#0.join_col1', -1)...] 注1: 参数的第1个值可以支持'col1.key1'的方式指定json值进行排序 注2: 参数的第2个值指定是否升序(1为升序, -1为降序) 注3: 可以在字段名前面加 "#序号." 用于与left_join参数配合使用, 指定当前排序字段所属的关联表索引(序号从0开始) @param {int} skip=None - 指定跳过返回结果的前面记录的数量 @param {int} limit=None - 指定限定返回结果记录的数量 @param {dict} hint=None - 指定查询使用索引的名字清单 例: {'index_name1': 1, 'index_name2': 1} @param {int} fetch_each=1 - 每次获取返回的记录数量 @param {list} left_join=None - 指定左关联(left outer join)集合信息, 每个数组为一个关联表, 格式如下: [ { 'db_name': '指定集合的db', # 如果不设置则代表和主表是同一个数据库 'collection': '要关联的集合(表)名', 'as': '关联后的别名', # 如果不设置默认为集合名 'join_fields': [(主表字段名, 关联表字段名), ...], # 要关联的字段列表, 仅支持完全相等的关联条件 'filter': ..., # 关联表数据的过滤条件(仅用于内部过滤需要关联的数据), 注意字段无需添加集合的别名 }, ... ] @param {Any} session=None - 指定事务连接对象 @returns {list} - 返回的结果列表迭代器 """ raise NotImplementedError()
[文档] async def query_count(self, collection: str, filter: dict = None, skip: int = None, limit: int = None, hint: dict = None, overtime: float = None, left_join: list = None, session: Any = None, **kwargs) -> int: """ 获取匹配查询条件的结果数量 @param {str} collection - 集合(表) @param {dict} filter=None - 查询条件字典, 与mongodb的查询条件设置方法一样, @param {int} skip=None - 指定跳过返回结果的前面记录的数量 @param {int} limit=None - 指定限定返回结果记录的数量 @param {dict} hint=None - 指定查询使用索引的名字清单 @param {float} overtime=None - 指定操作的超时时间, 单位为秒 @param {list} left_join=None - 指定左关联(left outer join)集合信息, 每个数组为一个关联表, 格式如下: [ { 'db_name': '指定集合的db', # 如果不设置则代表和主表是同一个数据库 'collection': '要关联的集合(表)名', 'as': '关联后的别名', # 如果不设置默认为集合名 'join_fields': [(主表字段名, 关联表字段名), ...], # 要关联的字段列表, 仅支持完全相等的关联条件 'filter': ..., # 关联表数据的过滤条件(仅用于内部过滤需要关联的数据), 注意字段无需添加集合的别名 }, ... ] @param {Any} session=None - 指定事务连接对象 @returns {int} - 返回查询条件匹配的记录数 """ raise NotImplementedError()
[文档] async def query_group_by(self, collection: str, group: dict = None, filter: dict = None, projection: Union[dict, list] = None, sort: list = None, overtime: float = None, session: Any = None, **kwargs) -> list: """ 获取记录聚合统计的结果 @param {str} collection - 集合(表) @param {dict} group=None - 分组返回设置字典(注意与mongodb的_id要求有所区别) 指定分组字段为col1、col2, 聚合字段为count、pay_amt, 其中pay_amt统计col_pay字段的合计数值 {'id': '$col1', 'name': '$col2', 'count': {'$sum': 1}, 'pay_amt': {'$sum': '$col_pay'}} 常见的聚合类型: $sum-计算总和, $avg-计算平均值, $min-取最小值, $max-取最大值, $first-取第一条, $last-取最后一条 @param {dict} filter=None - 查询条件字典, 与mongodb的查询条件设置方法一样 @param {dict|list} projection=None - 指定结果返回的字段信息(指统计后的结果) @param {list} sort=None - 查询结果的排序方式(注意排序字段为返回结果的分组字段, 而不是表的原始字段) @param {float} overtime=None - 指定操作的超时时间, 单位为秒 @param {Any} session=None - 指定事务连接对象 @returns {list} - 返回结果列表 """ raise NotImplementedError()
[文档] async def query_page_info(self, collection: str, page_size: int = 15, filter: dict = None, hint: dict = None, left_join: list = None, session: Any = None, **kwargs) -> dict: """ 查询分页信息字典 @param {str} collection - 集合(表) @param {int} page_size=15 - 每页大小 @param {dict} filter=None - 查询条件字典, 与mongodb的查询条件设置方法一样, 参考如下: {} : 查询全部记录 {'id': 'info', 'ver': '0.0.1'} : where id = 'info' and 'ver' = '0.0.1' {'ver': {'$lt': '0.0.1'}} : where ver < '0.0.1' 注: $lt - 小于, $lte - 小于或等于, $gt - 大于, $gte - 大于或等于, $ne - 不等于 {'id': {'$gt':50}, '$or': [{'name': 'lhj'},{'title': 'book'}]} : where id > 50 and (name='lhj' or 'title' = 'book') {'name': {'$regex': 'likestr'}} : where name like '%likestr%', 正则表达式 注: 可以在字段名前面加 "#序号." 用于与left_join参数配合使用, 指定当前排序字段所属的关联表索引(序号从0开始) @param {dict} hint=None - 指定查询使用索引的名字清单 例: {'index_name1': 1, 'index_name2': 1} @param {list} left_join=None - 指定左关联(left outer join)集合信息, 每个数组为一个关联表, 格式如下: [ { 'db_name': '指定集合的db', # 如果不设置则代表和主表是同一个数据库 'collection': '要关联的集合(表)名', 'as': '关联后的别名', # 如果不设置默认为集合名 'join_fields': [(主表字段名, 关联表字段名), ...], # 要关联的字段列表, 仅支持完全相等的关联条件 'filter': ..., # 关联表数据的过滤条件(仅用于内部过滤需要关联的数据), 注意字段无需添加集合的别名 }, ... ] @param {Any} session=None - 指定事务连接对象 @returns {dict} - 返回的分页信息 { 'total': ?, # 记录总数 'total_pages': ?, # 分页数 'page_size': ? # 每页大小 } """ # 获取记录总数 _count = await self.query_count( collection, filter=filter, hint=hint, left_join=left_join, session=session ) # 组成返回字典 return { 'total': _count, 'total_pages': int((_count + page_size - 1) / page_size), 'page_size': page_size }
[文档] async def query_page(self, collection: str, page_index: int = 1, page_size: int = 15, filter: dict = None, projection: Union[dict, list] = None, sort: list = None, hint: dict = None, left_join: list = None, session: Any = None, **kwargs) -> list: """ 查询分页记录(直接返回清单) @param {str} collection - 集合(表) @param {int} page_index=1 - 分页位置, 从1开始算 @param {int} page_size=15 - 每页大小 @param {dict} filter=None - 查询条件字典, 与mongodb的查询条件设置方法一样, 参考如下: {} : 查询全部记录 {'id': 'info', 'ver': '0.0.1'} : where id = 'info' and 'ver' = '0.0.1' {'ver': {'$lt': '0.0.1'}} : where ver < '0.0.1' 注: $lt - 小于, $lte - 小于或等于, $gt - 大于, $gte - 大于或等于, $ne - 不等于 {'id': {'$gt':50}, '$or': [{'name': 'lhj'},{'title': 'book'}]} : where id > 50 and (name='lhj' or 'title' = 'book') {'name': {'$regex': 'likestr'}} : where name like '%likestr%', 正则表达式 注: 可以在字段名前面加 "#序号." 用于与left_join参数配合使用, 指定当前排序字段所属的关联表索引(序号从0开始) @param {dict|list} projection=None - 指定结果返回的字段信息 列表模式: ['col1','col2', ...] 注意: 该模式一定会返回 _id 这个主键 字典模式: {'_id': False, 'col1': True, ...} 该方式可以通过设置False屏蔽 _id 的返回 注意: 只有 _id 字段可以设置为False, 其他字段不可设置为False(如果要屏蔽可以不放入字典) 注1: 只有 _id 字段可以设置为False, 其他字段不可设置为False(如果要屏蔽可以不放入字典) 注2: 可以通过字典模式的值设置为$开头的字段名或json检索路径的方式, 进行字段别名处理, 例如{'as_name': '$real_name'}或{'as_name': '$real_name.key.key'} 注3: 可以在字段名前面加 "#序号." 用于与left_join参数配合使用, 指定当前排序字段所属的关联表索引(序号从0开始), 例如{'#0.col1': True, 'as_name': '$#0.col2'} @param {list} sort=None - 查询结果的排序方式 例: [('col1', 1), ('#0.join_col1', -1)...] 注1: 参数的第1个值可以支持'col1.key1'的方式指定json值进行排序 注2: 参数的第2个值指定是否升序(1为升序, -1为降序) 注3: 可以在字段名前面加 "#序号." 用于与left_join参数配合使用, 指定当前排序字段所属的关联表索引(序号从0开始) @param {dict} hint=None - 指定查询使用索引的名字清单 例: {'index_name1': 1, 'index_name2': 1} @param {list} left_join=None - 指定左关联(left outer join)集合信息, 每个数组为一个关联表, 格式如下: [ { 'db_name': '指定集合的db', # 如果不设置则代表和主表是同一个数据库 'collection': '要关联的集合(表)名', 'as': '关联后的别名', # 如果不设置默认为集合名 'join_fields': [(主表字段名, 关联表字段名), ...], # 要关联的字段列表, 仅支持完全相等的关联条件 'filter': ..., # 关联表数据的过滤条件(仅用于内部过滤需要关联的数据), 注意字段无需添加集合的别名 }, ... ] @param {Any} session=None - 指定事务连接对象 @returns {list} - 返回的分页的结果列表 """ return await self.query_list( collection, filter=filter, projection=projection, sort=sort, skip=(page_index - 1) * page_size, limit=page_size, hint=hint, left_join=left_join, session=session )
############################# # 原生命令执行 #############################
[文档] async def run_native_cmd(self, *args, **kwargs): """ 执行原生命令(或SQL)并返回执行结果 注: 该函数不支持驱动的兼容处理 """ raise NotImplementedError()
############################# # 数据库及集合辅助索引 #############################
[文档] def init_index_extend_dbs(self, dbs: dict): """ 在初始化索引参数中扩展数据库索引信息 @param {dict} dbs - 要扩展的数据库信息字典(注: 仅用于索引, 不创建实际数据库) { '数据库名': { 'comment': '', # 数据库注释 'args': [], # 创建数据库的args参数 'kwargs': {} #创建数据库的kwargs参数 } } """ pass
[文档] def init_index_extend_collections(self, collections: dict): """ 在初始化索引参数中扩展集合索引信息 @param {dict} collections - 要扩展的集合信息字典(注: 仅用于索引, 不创建实际数据库) { '数据库名': { '集合名': { 'comment': '', # 集合注释 'indexs': {索引字典}, 'fixed_col_define': {固定字段定义} } ... }, ... } """ pass
[文档]class NosqlAIOPoolDriver(NosqlDriverFW): """ nosql数据库驱动使用关系型数据库连接池AIOConnectionPool的基础类 可以尝试整合的异步数据库驱动包括: asyncpg, aiopg, aiomysql, asyncmy, aiosqlite 为兼容mongodb的数据存储, 数据表的设计统一为(以集合名t_demo为例): t_demo(_id varchar, ...其他固定索引字段, nosql_driver_extend_tags json) _id为唯一主键, 可通过bson库的objectid模块自动生成 固定字段, 可用于查询条件的字段, 注意对顺序并无要求 nosql_driver_extend_tags, 存放其他扩展信息的字段(尽可能使用支持json的数据库类型, 以支持查询和更新等操作) 注: 对于原生数据库驱动没有连接池管理情况, 建议基于本基础类实现, 无需自行处理连接池的功能 """ ############################# # 静态工具函数 - 通用查询结果比较函数 #############################
[文档] @classmethod def cmp_func_equal_value(cls, query_ret, cmp_val: Any) -> bool: """ 比较函数-与查询结果第1行第1个值与比较值相等 @param {list} query_ret - 查询返回结果 @param {Any} cmp_val - 比较值 @returns {bool} - 比较结果(True代表通过) """ if query_ret is None or len(query_ret) == 0: return False _type = type(cmp_val) _ret_val = list(query_ret[0].values())[0] if isinstance(cmp_val, dict): return TestTool.cmp_dict(_ret_val, cmp_val, print_if_diff=False) elif _type in (list, tuple): return TestTool.cmp_list(_ret_val, cmp_val, print_if_diff=False) else: return _ret_val == cmp_val
[文档] @classmethod def cmp_func_equal_row(cls, query_ret, cmp_val: dict) -> bool: """ 比较函数-与查询结果第1行结果比较字典 @param {list} query_ret - 查询返回结果 @param {dict} cmp_val - 比较字典值 @returns {bool} - 比较结果(True代表通过) """ if query_ret is None: if cmp_val is None: return True else: return False else: return TestTool.cmp_dict(query_ret[0], cmp_val, print_if_diff=False)
[文档] @classmethod def cmp_func_lt_value(cls, query_ret, cmp_val: Any) -> bool: """ 比较函数-查询结果第1行第1个值小于比较值 @param {list} query_ret - 查询返回结果 @param {Any} cmp_val - 比较值 @returns {bool} - 比较结果(True代表通过) """ if query_ret is None or len(query_ret) == 0: return False _ret_val = list(query_ret[0].values())[0] return _ret_val < cmp_val
[文档] @classmethod def cmp_func_gt_value(cls, query_ret, cmp_val: Any) -> bool: """ 比较函数-查询结果第1行第1个值大于比较值 @param {list} query_ret - 查询返回结果 @param {Any} cmp_val - 比较值 @returns {bool} - 比较结果(True代表通过) """ if query_ret is None or len(query_ret) == 0: return False _ret_val = list(query_ret[0].values())[0] return _ret_val > cmp_val
############################# # 构造函数 #############################
[文档] def __init__(self, connect_config: dict = {}, pool_config: dict = {}, driver_config: dict = {}): """ 初始化驱动 @param {dict} connect_config={} - 数据库的连接参数 host {str} - 连接数据库的ip或uri, 如果使用uri方式, 其他的连接参数不生效 port {int} - 连接数据库的端口(可选) usedb {str} - 登录后默认切换到的数据库(可选), 如果不传使用登录后的默认数据库 username {str} - 登录验证用户(可选) password {str} - 登录验证密码(可选) dbname {str} - 登录用户的数据库名(可选) connect_on_init {bool} - 是否启动时直接连接数据库, 默认为False(等待第一次操作再连接) connect_timeout {float} - 连接数据库的超时时间, 单位为秒, 默认为20 default_str_len {int} - 默认的字符串类型长度, 默认为30 ...驱动实现类自定义支持的参数 transaction_share_cursor {bool} - 进行事务处理是否复用同一个游标对象, 默认为True @param {dict} pool_config={} - 连接池配置 max_size {int} - 连接池的最大大小, 默认为100 min_size {int} - 连接池维持的最小连接数量, 默认为0 max_idle_time {float} - 连接被移除前的最大空闲时间, 单位为秒, 默认为None wait_queue_timeout {float} - 在没有空闲连接的时候, 请求连接所等待的超时时间, 单位为秒, 默认为None(不超时) ...驱动实现类自定义支持的参数 注: 其他参数为AIOConnectionPool所支持的参数 @param {dict} driver_config={} - 驱动配置 init_db {dict} - 要在启动驱动时创建的数据库 { '数据库名': { 'index_only': False, # 是否仅用于索引, 不创建 'comment': '', # 数据库注释 'args': [], # 创建数据库的args参数 'kwargs': {} #创建数据库的kwargs参数 } } init_collections {dict} - 要在启动驱动时创建的集合(表) { '数据库名': { '集合名': { 'index_only': False, # 是否仅用于索引, 不创建 'comment': '', # 集合注释 'indexs': {索引字典}, 'fixed_col_define': {固定字段定义} } ... }, ... } init_yaml_file {str} - 要在启动时创建的数据库和集合(表)配置yaml文件 注1: 该参数用于将init_db和init_collections参数内容放置的配置文件中, 如果参数有值则忽略前面两个参数 注2: 配置文件为init_db和init_collections两个字典, 内容与这两个参数一致 logger {Logger} - 传入驱动的日志对象 ignore_index_error {bool} - 是否忽略索引创建的异常, 默认为True debug {bool} - 指定是否debug模式, 默认为False """ # 指定是否使用insert_many的单独生成语句, Fasle代表使用insert_one逐条插入替代(存在性能问题) self._use_insert_many_generate_sqls = False # 公共参数处理 self._connect_config = copy.deepcopy(connect_config) self._driver_config = copy.deepcopy(driver_config) self._default_str_len = self._connect_config.pop('default_str_len', 30) self._transaction_share_cursor = self._connect_config.pop('transaction_share_cursor', True) self._debug = self._driver_config.get('debug', False) self._ignore_index_error = self._driver_config.get('ignore_index_error', True) self._logger = driver_config.get('logger', None) if self._logger is None: logging.basicConfig() self._logger = logging.getLogger(__name__) if self._debug: self._logger.setLevel(logging.DEBUG) # 获取数据库连接驱动及参数 _creator_infos = self._get_db_creator(self._connect_config, pool_config, driver_config) self._db_name = _creator_infos.get('current_db_name', None) # 连接池设置参数 _pool_config = copy.deepcopy(pool_config) _pool_config.pop('wait_queue_timeout', None) _pool_config['get_timeout'] = pool_config.get('wait_queue_timeout', None) _pool_config.pop('max_idle_time', None) _pool_config['free_idle_time'] = pool_config.get('max_idle_time', None) _pool_config.update(_creator_infos.get('pool_update_config', {})) self._pool = AIOConnectionPool( _creator_infos['creator'], _creator_infos['pool_connection_class'], args=_creator_infos.get('args', []), kwargs=_creator_infos.get('kwargs', {}), connect_method_name=_creator_infos.get('connect_method_name', None), **_pool_config ) # 是否启动时直接连接数据库 if connect_config.get('connect_on_init', False): # 获取连接, 然后关闭, 相当于验证连接 _conn = AsyncTools.sync_run_coroutine(self._get_connection()) AsyncTools.sync_run_coroutine(_conn.close()) # 获取正确的数据库名(利用db_name的属性获取) if self._db_name is None: self._db_name = self.db_name # 启动后创建数据库 if self._driver_config.get('init_yaml_file', None) is None: _init_db = self._driver_config.get('init_db', {}) _init_collections = self._driver_config.get('init_collections', {}) else: _init_yaml = SimpleYaml( self._driver_config['init_yaml_file'], obj_type=EnumYamlObjType.File, encoding='utf-8' ) _init_db = _init_yaml.get_value('init_db', default={}) if _init_db is None: _init_db = {} _init_collections = _init_yaml.get_value('init_collections', default={}) if _init_collections is None: _init_collections = {} _temp_db_name = self._db_name for _name, _db_info in _init_db.items(): if _db_info.get('index_only', False): # 只索引不创建 continue # 创建数据库 AsyncTools.sync_run_coroutine( self.create_db(_name, *_db_info.get('args', []), **_db_info.get('kwargs', {})) ) # 切换回默认数据库 AsyncTools.sync_run_coroutine(self.switch_db(_temp_db_name)) # 启动连接池后驱动需要执行的后处理 AsyncTools.sync_run_coroutine(self._driver_after_init_pool()) # 数据表的字段信息字典, {'数据库名': {'表名': {'cols': [固定字段列表], 'define': {固定字段定义}}}} self._fixed_col_define = {} # 启动驱动时创建集合(表) AsyncTools.sync_run_coroutine( self._init_collections(_init_collections) )
############################# # 主动销毁驱动 #############################
[文档] async def destroy(self): """ 主动销毁驱动(连接) """ await AsyncTools.async_run_coroutine(self._pool.close())
############################# # 通用属性 ############################# @property def db_name(self): """ 返回当前数据库名 @property {str} """ if self._db_name is None: return AsyncTools.sync_run_coroutine(self._get_current_db_name()) else: return self._db_name ############################# # 数据库操作 #############################
[文档] async def create_db(self, name: str, *args, **kwargs): """ 创建数据库 注: 创建后会自动切换到该数据库 @param {str} name - 数据库名 """ _sqls, _sql_paras, _execute_paras, _checks = await AsyncTools.async_run_coroutine( self._generate_sqls('create_db', name, *args, **kwargs) ) await self._execute_sqls( _sqls, paras=_sql_paras, checks=_checks, **_execute_paras ) await self.switch_db(name)
[文档] async def switch_db(self, name: str, *args, **kwargs): """ 切换当前数据库到指定数据库 @param {str} name - 数据库名 """ _sqls, _sql_paras, _execute_paras, _checks = await AsyncTools.async_run_coroutine( self._generate_sqls('switch_db', name) ) await self._execute_sqls( _sqls, paras=_sql_paras, checks=_checks, **_execute_paras ) self._db_name = name
[文档] async def list_dbs(self, *args, **kwargs) -> list: """ 列出数据库清单 @returns {list} - 数据库名清单 """ _sqls, _sql_paras, _execute_paras, _checks = await AsyncTools.async_run_coroutine( self._generate_sqls('list_dbs') ) _ret = await self._execute_sqls( _sqls, paras=_sql_paras, checks=_checks, **_execute_paras ) # 需要将字典形式的列表转换为数据库名列表, 注意查询结果的字段名必须为name return [_db['name'] for _db in _ret]
[文档] async def drop_db(self, name: str, *args, **kwargs): """ 删除数据库 @param {str} name - 数据库名 """ _sqls, _sql_paras, _execute_paras, _checks = await AsyncTools.async_run_coroutine( self._generate_sqls('drop_db', name) ) await self._execute_sqls( _sqls, paras=_sql_paras, checks=_checks, **_execute_paras ) # 切换后判断是不是删除当前数据库 if self._db_name == name: _dbs = await self.list_dbs() if len(_dbs) > 0: await self.switch_db(_dbs[0])
############################# # 集合操作 #############################
[文档] async def create_collection(self, collection: str, indexs: dict = None, fixed_col_define: dict = None, comment: str = None, **kwargs): """ 创建集合(相当于关系型数据库的表, 如果不存在则创建) 注意: 所有集合都有必须有 '_id' 这个记录的唯一主键字段 @param {str} collection - 集合名(表名) @param {dict} indexs=None - 要创建的索引字典, 格式为: { '索引名': { # 索引的字段清单 'keys': { '字段名': { 'asc': 是否升序(1为升序, -1为降序) }, ... } # 创建参数 'paras': { 'unique': 是否唯一索引(True/False), ...驱动实现类自定义支持的参数 } }, } @param {dict} fixed_col_define=None - 固定字段定义(只有固定字段才能进行索引), 格式如下: { '字段名': { 'type': '字段类型(str, int, float, bool, json)', 'len': 字段长度, 'nullable': True, # 是否可空 'default': 默认值, 'comment': '字段注释' }, ... } @param {str} comment=None - 集合注释 @param {kwargs} - 实现驱动自定义支持的参数 """ _sqls, _sql_paras, _execute_paras, _checks = await AsyncTools.async_run_coroutine( self._generate_sqls( 'create_collection', collection, indexs=indexs, fixed_col_define=fixed_col_define, comment=comment, **kwargs ) ) return await self._execute_sqls( _sqls, paras=_sql_paras, checks=_checks, **_execute_paras )
[文档] async def list_collections(self, filter: dict = None, **kwargs) -> list: r""" 获取所有集合(表)清单 @param {dict} filter=None - 查找条件 例如查找所有非'system.'开头的集合: {"name": {"$regex": r"^(?!system\.)"}} @returns {list} - 集合(表)清单 """ _sqls, _sql_paras, _execute_paras, _checks = await AsyncTools.async_run_coroutine( self._generate_sqls('list_collections', filter=filter) ) _ret = await self._execute_sqls( _sqls, paras=_sql_paras, checks=_checks, **_execute_paras ) # 需要将字典形式的列表转换为数据库名列表, 注意查询结果的字段名必须为name return [_tab['name'] for _tab in _ret]
[文档] async def drop_collection(self, collection: str, *args, **kwargs): """ 删除集合 注: 集合不存在也正常返回 @param {str} collection - 集合名(表名) """ _sqls, _sql_paras, _execute_paras, _checks = await AsyncTools.async_run_coroutine( self._generate_sqls('drop_collection', collection) ) # 设置固定的参数 _execute_paras.update({ 'is_query': False, 'commit_on_finished': True, 'rollback_on_exception': True }) return await self._execute_sqls( _sqls, paras=_sql_paras, checks=_checks, **_execute_paras )
[文档] async def turncate_collection(self, collection: str, *args, **kwargs): """ 清空集合记录 @param {str} collection - 集合名(表名) """ _sqls, _sql_paras, _execute_paras, _checks = await AsyncTools.async_run_coroutine( self._generate_sqls('turncate_collection', collection) ) return await self._execute_sqls( _sqls, paras=_sql_paras, checks=_checks, **_execute_paras )
[文档] async def collections_exists(self, collection: str, *args, **kwargs) -> bool: """ 判断集合(表)是否存在 @param {str} collection - 集合名(表名) @returns {bool} - 是否存在 """ _list = await self.list_collections(filter={'name': collection}) return len(_list) > 0
############################# # 事务支持 #############################
[文档] async def start_transaction(self, *args, **kwargs) -> Any: """ 启动事务 注: 通过该方法处理事务, 必须显式通过commit_transaction或abort_transaction关闭事务 @returns {Any} - 返回事务所在的连接(session) """ # 获取新连接和游标, 开始处理事务 _conn = await self._get_connection() _cursor = None if self._transaction_share_cursor: # 复用相同的游标 _cursor = await AsyncTools.async_run_coroutine(_conn.cursor()) return (_conn, _cursor)
[文档] async def commit_transaction(self, session, *args, **kwargs): """ 提交事务 @param {Any} session=None - 启动事务的连接(session) """ _conn = session[0] _cursor = session[1] if _cursor is not None: # 先关闭游标 await AsyncTools.async_run_coroutine(_cursor.close()) # 提交事务 await AsyncTools.async_run_coroutine(_conn.commit()) # 关闭连接 await AsyncTools.async_run_coroutine(_conn.close())
[文档] async def abort_transaction(self, session, *args, **kwargs): """ 回滚事务 @param {Any} session=None - 启动事务的连接(session) """ _conn = session[0] _cursor = session[1] if _cursor is not None: # 先关闭游标 await AsyncTools.async_run_coroutine(_cursor.close()) # 回滚事务 await AsyncTools.async_run_coroutine(_conn.rollback()) # 关闭连接 await AsyncTools.async_run_coroutine(_conn.close())
############################# # 数据操作 #############################
[文档] async def insert_one(self, collection: str, row: dict, session: Any = None, **kwargs) -> str: """ 插入一条记录 @param {str} collection - 集合(表) @param {dict} row - 行记录字典 注: 每个记录可以通过'_id'字段指定该记录的唯一主键, 如果不送入, 将自动生成一个唯一主键 @param {Any} session=None - 指定事务连接对象 @returns {str} - 返回所插入记录的 _id 字段值 """ # 执行连接的固定参数 _upd_execute_paras = { 'commit_on_finished': True, 'rollback_on_exception': True, 'close_cursor': True, 'close_conn': True } # 获取session if session is not None: _conn = session[0] _cursor = session[1] _upd_execute_paras['commit_on_finished'] = False _upd_execute_paras['rollback_on_exception'] = False _upd_execute_paras['close_conn'] = False if _cursor is not None: _upd_execute_paras['close_cursor'] = False else: _conn = None _cursor = None # 处理_id _row = copy.copy(row) # 浅复制即可 _id = _row.get('_id', None) if _id is None: _id = str(ObjectId()) _row['_id'] = _id # 获取固定字段信息 _fixed_col_define = await self._get_fixed_col_define(collection, session=session) # 获取执行sql _sqls, _sql_paras, _execute_paras, _checks = await AsyncTools.async_run_coroutine( self._generate_sqls( 'insert_one', collection, _row, fixed_col_define=_fixed_col_define ) ) _execute_paras.update(_upd_execute_paras) _ret = await self._execute_sqls( _sqls, paras=_sql_paras, checks=_checks, conn=_conn, cursor=_cursor, **_execute_paras ) if _ret == 1: return _id
[文档] async def insert_many(self, collection: str, rows: list, session: Any = None, **kwargs) -> int: """ 插入多条记录 @param {str} collection - 集合(表) @param {list} rows - 行记录数组 注: 每个记录可以通过'_id'字段指定该记录的唯一主键, 如果不送入, 将自动生成一个唯一主键 @param {Any} session=None - 指定事务连接对象 @returns {int} - 返回插入的记录数量 """ # 执行连接的固定参数 _upd_execute_paras = { 'commit_on_finished': True, 'rollback_on_exception': True, 'close_cursor': True, 'close_conn': True } # 获取session if session is not None: _conn = session[0] _cursor = session[1] _upd_execute_paras['commit_on_finished'] = False _upd_execute_paras['rollback_on_exception'] = False _upd_execute_paras['close_conn'] = False if _cursor is not None: _upd_execute_paras['close_cursor'] = False else: _conn = None _cursor = None # 获取固定字段信息 _fixed_col_define = await self._get_fixed_col_define(collection, session=session) if self._use_insert_many_generate_sqls: # 生成插入多条数据的语句 # 获取执行sql _sqls, _sql_paras, _execute_paras, _checks = await AsyncTools.async_run_coroutine( self._generate_sqls( 'insert_many', collection, rows, fixed_col_define=_fixed_col_define ) ) _execute_paras.update(_upd_execute_paras) await self._execute_sqls( _sqls, paras=_sql_paras, checks=_checks, conn=_conn, cursor=_cursor, **_execute_paras ) else: # 通过多次执行insert_one插入 _msqls = [] _msql_paras = [] _mexecute_paras = {} _mchecks = None _mis_query = None for _s_row in rows: # 处理_id _row = copy.copy(_s_row) # 浅复制即可 if _row.get('_id', None) is None: _row['_id'] = str(ObjectId()) _sqls, _sql_paras, _execute_paras, _checks = await AsyncTools.async_run_coroutine( self._generate_sqls( 'insert_one', collection, _row, fixed_col_define=_fixed_col_define ) ) # 语句检查参数 if _checks is not None: if _mchecks is None: _mchecks = [None for _temp in _msqls] # 生成检查列表 _mchecks.extend(_checks) # 是否查询参数 if _execute_paras.get('is_query', None) is not None: _is_query = _execute_paras['is_query'] if type(_is_query) in (list, tuple): # 是列表的情况, 需要扩充 if _mis_query is None: _mis_query = [False for _temp in _msqls] # 生成查询列表 _mis_query.extend(_is_query) elif _is_query: # 最后一个要查询 if _mis_query is not None: _mis_query.extend([False for _temp in _sqls[: -1]]) _mis_query.append(True) else: # 最后一个非查询 if _mis_query is not None: _mis_query.extend([False for _temp in _sqls]) else: # 没有指定, 对于列表情况需要扩充 if _mis_query is not None: _mis_query.extend([False for _temp in _sqls]) _msqls.extend(_sqls) _msql_paras.extend(_sql_paras) _mexecute_paras = _execute_paras _mexecute_paras.update(_upd_execute_paras) if _mis_query is not None: _mexecute_paras['is_query'] = _mis_query await self._execute_sqls( _msqls, paras=_msql_paras, checks=_mchecks, conn=_conn, cursor=_cursor, **_mexecute_paras ) return len(rows)
[文档] async def update(self, collection: str, filter: dict, update: dict, multi: bool = True, upsert: bool = False, hint: dict = None, session: Any = None, **kwargs) -> int: """ 更新找到的记录 @param {str} collection - 集合(表) @param {dict} filter - 查询条件字典, 与mongodb的查询条件设置参数一致 @param {dict} update - 更新信息字典, 与mongodb的更新设置参数一致, 参考如下: {'$set': {'name': 'myname', ...}}: name='myname', 设置某个字段的值 {'$inc': {'age': 3, ...}} : age = age + 3, 对数字类型字段, 在现有值上增加指定数值 {'$mul': {'age': 2, ...}} : age = age * 2, 对数字类型字段, 在现有值上乘以指定数值 {'$min': {'age': 10, ...}} : age = min(age, 10), 将现有值和给出值比较, 设置为小的值 {'$max': {'age': 10, ...}} : age = max(age, 10), 将现有值和给出值比较, 设置为大的值 注: min和max当遇到字段不存在或为null时, 则直接设置为比较值 {'$unset': {'job': 1}}: job=null, 删除指定字段 {'$rename': {'old_name': 'new_name', ...}}: 将字段名修改为新字段名 @param {bool} multi=True - 是否更新全部找到的记录, 如果为Fasle只更新找到的第一条记录 @param {bool} upsert=False - 指定如果记录不存在是否插入 @param {dict} hint=None - 指定查询使用索引的名字清单 @param {Any} session=None - 指定事务连接对象 @param {list|str} partition=None - MySQL, PostgreSQL专有参数, 指定操作的分区 注: MySQL支持送入分区列表名, 例如(p1, s3); PostgreSQL仅支持送入单个分区后缀名, 例如'p1' @returns {int} - 返回更新的数据条数 """ # 执行连接的固定参数 _upd_execute_paras = { 'commit_on_finished': True, 'rollback_on_exception': True, 'close_cursor': True, 'close_conn': True } # 获取session if session is not None: _conn = session[0] _cursor = session[1] _upd_execute_paras['commit_on_finished'] = False _upd_execute_paras['rollback_on_exception'] = False _upd_execute_paras['close_conn'] = False if _cursor is not None: _upd_execute_paras['close_cursor'] = False else: _conn = None _cursor = None # 获取固定字段信息 _fixed_col_define = await self._get_fixed_col_define(collection, session=session) _filter = {} if filter is None else filter _no_match = False if not multi and '_id' not in _filter.keys(): # 只更新一条记录, 但又没有送主键进来, 需要查询记录的主键再更新 _ret = await self.query_list( collection, filter=_filter, projection={'_id': True}, limit=1, hint=hint, session=session ) if len(_ret) == 0: # 没有找到记录 _no_match = True else: # 变更更新的条件, 直接用_id就好 _filter = {'_id': _ret[0]['_id']} if not _no_match: # 执行更新操作 _sqls, _sql_paras, _execute_paras, _checks = await AsyncTools.async_run_coroutine( self._generate_sqls( 'update', collection, _filter, update, multi=multi, upsert=upsert, hint=hint, fixed_col_define=_fixed_col_define, **kwargs ) ) _execute_paras.update(_upd_execute_paras) _ret = await self._execute_sqls( _sqls, paras=_sql_paras, checks=_checks, conn=_conn, cursor=_cursor, **_execute_paras ) if _ret > 0: # 更新成功 return _ret if upsert: # 没有更新成功, 改为用insert_one插入 _row = {} if _filter is not None: for _key, _val in _filter.items(): if _key[0] != '$' and not isinstance(_val, dict): _row[_key] = _val for _op, _para in update.items(): if _op in ('$set', '$inc', '$min', '$max'): _row.update(_para) elif _op == '$mul': # 设置为0 for _key, _val in _para.items(): _row[_key] = 0 await self.insert_one(collection, _row, session=session) return 0 else: # 没有更新成功且无需插入 return 0
[文档] async def delete(self, collection: str, filter: dict, multi: bool = True, hint: dict = None, session: Any = None, **kwargs) -> int: """ 删除指定记录 @param {str} collection - 集合(表) @param {dict} filter - 查询条件字典, 与mongodb的查询条件设置参数一致 @param {bool} multi=True - 是否删除全部找到的记录, 如果为Fasle只删除找到的第一条记录 @param {dict} hint=None - 指定查询使用索引的名字清单 @param {Any} session=None - 指定事务连接对象 @param {list|str} partition=None - MySQL, PostgreSQL专有参数, 指定操作的分区 注: MySQL支持送入分区列表名, 例如(p1, s3); PostgreSQL仅支持送入单个分区后缀名, 例如'p1' @returns {int} - 删除记录数量 """ # 执行连接的固定参数 _upd_execute_paras = { 'commit_on_finished': True, 'rollback_on_exception': True, 'close_cursor': True, 'close_conn': True } # 获取session if session is not None: _conn = session[0] _cursor = session[1] _upd_execute_paras['commit_on_finished'] = False _upd_execute_paras['rollback_on_exception'] = False _upd_execute_paras['close_conn'] = False if _cursor is not None: _upd_execute_paras['close_cursor'] = False else: _conn = None _cursor = None # 获取固定字段信息 _fixed_col_define = await self._get_fixed_col_define(collection, session=session) _filter = {} if filter is None else filter if not multi and '_id' not in _filter.keys(): # 只删除一条记录, 但又没有送主键进来, 需要查询记录的主键再删除 _ret = await self.query_list( collection, filter=_filter, projection={'_id': True}, limit=1, hint=hint, session=session ) if len(_ret) == 0: # 没有找到记录 return 0 else: # 变更更新的条件, 直接用_id就好 _filter = {'_id': _ret[0]['_id']} # 获取并执行删除语句 _sqls, _sql_paras, _execute_paras, _checks = await AsyncTools.async_run_coroutine( self._generate_sqls( 'delete', collection, _filter, multi=multi, hint=hint, fixed_col_define=_fixed_col_define, **kwargs ) ) _execute_paras.update(_upd_execute_paras) return await self._execute_sqls( _sqls, paras=_sql_paras, checks=_checks, conn=_conn, cursor=_cursor, **_execute_paras )
############################# # 数据查询 #############################
[文档] async def query_list(self, collection: str, filter: dict = None, projection: Union[dict, list] = None, sort: list = None, skip: int = None, limit: int = None, hint: dict = None, left_join: list = None, session: Any = None, **kwargs) -> list: """ 查询记录(直接返回清单) @param {str} collection - 集合(表) @param {dict} filter=None - 查询条件字典, 与mongodb的查询条件设置方法一样, 参考如下: {} : 查询全部记录 {'id': 'info', 'ver': '0.0.1'} : where id = 'info' and 'ver' = '0.0.1' {'ver': {'$lt': '0.0.1'}} : where ver < '0.0.1' 注: $lt - 小于, $lte - 小于或等于, $gt - 大于, $gte - 大于或等于, $ne - 不等于 {'id': {'$gt':50}, '$or': [{'name': 'lhj'},{'title': 'book'}]} : where id > 50 and (name='lhj' or 'title' = 'book') {'name': {'$regex': 'likestr'}} : where name like '%likestr%', 正则表达式 {'name': {'$in': ['a', 'b', 'c']}} : where name in ('a', 'b', 'c') {'name': {'$nin': ['a', 'b', 'c']}} : where name not in ('a', 'b', 'c') {'col_json.sub_col': 'test'}: 查询json字段的指定字典key, 可以支持多级 {'col_json.0': 'test'}: 查询json字段的指定数组索引, 可以支持多级 注: 可以在字段名前面加 "#序号." 用于与left_join参数配合使用, 指定当前排序字段所属的关联表索引(序号从0开始) @param {dict|list} projection=None - 指定结果返回的字段信息 列表模式: ['col1','col2', ...] 注意: 该模式一定会返回 _id 这个主键 字典模式: {'_id': False, 'col1': True, ...} 该方式可以通过设置False屏蔽 _id 的返回 注1: 只有 _id 字段可以设置为False, 其他字段不可设置为False(如果要屏蔽可以不放入字典) 注2: 可以通过字典模式的值设置为$开头的字段名或json检索路径的方式, 进行字段别名处理, 例如{'as_name': '$real_name'}或{'as_name': '$real_name.key.key'} 注3: 可以在字段名前面加 "#序号." 用于与left_join参数配合使用, 指定当前排序字段所属的关联表索引(序号从0开始), 例如{'#0.col1': True, 'as_name': '$#0.col2'} @param {list} sort=None - 查询结果的排序方式 例: [('col1', 1), ('#0.join_col1', -1)...] 注1: 参数的第1个值可以支持'col1.key1'的方式指定json值进行排序 注2: 参数的第2个值指定是否升序(1为升序, -1为降序) 注3: 可以在字段名前面加 "#序号." 用于与left_join参数配合使用, 指定当前排序字段所属的关联表索引(序号从0开始) @param {int} skip=None - 指定跳过返回结果的前面记录的数量 @param {int} limit=None - 指定限定返回结果记录的数量 @param {dict} hint=None - 指定查询使用索引的名字清单 例: {'index_name1': 1, 'index_name2': 1} @param {list} left_join=None - 指定左关联(left outer join)集合信息, 每个数组为一个关联表, 格式如下: [ { 'db_name': '指定集合的db', # 如果不设置则代表和主表是同一个数据库 'collection': '要关联的集合(表)名', 'as': '关联后的别名', # 如果不设置默认为集合名 'join_fields': [(主表字段名, 关联表字段名), ...], # 要关联的字段列表, 仅支持完全相等的关联条件 'filter': ..., # 关联表数据的过滤条件(仅用于内部过滤需要关联的数据), 注意字段无需添加集合的别名 }, ... ] @param {Any} session=None - 指定事务连接对象 @param {list|str} partition=None - MySQL, PostgreSQL专有参数, 指定操作的分区 注: MySQL支持送入分区列表名, 例如(p1, s3); PostgreSQL仅支持送入单个分区后缀名, 例如'p1' @returns {list} - 返回的结果列表 """ # 执行连接的固定参数 _upd_execute_paras = { 'commit_on_finished': True, 'rollback_on_exception': True, 'close_cursor': True, 'close_conn': True, 'is_query': True } # 获取session if session is not None: _conn = session[0] _cursor = session[1] _upd_execute_paras['commit_on_finished'] = False _upd_execute_paras['rollback_on_exception'] = False _upd_execute_paras['close_conn'] = False if _cursor is not None: _upd_execute_paras['close_cursor'] = False else: _conn = None _cursor = None # 获取固定字段信息 _fixed_col_define = await self._get_fixed_col_define(collection, session=session) _filter = {} if filter is None else filter _sqls, _sql_paras, _execute_paras, _checks = await AsyncTools.async_run_coroutine( self._generate_sqls( 'query', collection, filter=_filter, projection=projection, sort=sort, skip=skip, limit=limit, hint=hint, left_join=left_join, fixed_col_define=_fixed_col_define, session=session, **kwargs ) ) # 更新执行sql的参数 _execute_paras.update(_upd_execute_paras) return await self._execute_sqls( _sqls, paras=_sql_paras, checks=_checks, conn=_conn, cursor=_cursor, **_execute_paras )
[文档] async def query_iter(self, collection: str, filter: dict = None, projection: Union[dict, list] = None, sort: list = None, skip: int = None, limit: int = None, hint: dict = None, left_join: list = None, fetch_each: int = 1, session: Any = None, **kwargs): """ 查询记录(通过迭代对象依次返回) @param {str} collection - 集合(表) @param {dict} filter=None - 查询条件字典, 与mongodb的查询条件设置方法一样, 参考如下: {} : 查询全部记录 {'id': 'info', 'ver': '0.0.1'} : where id = 'info' and 'ver' = '0.0.1' {'ver': {$lt: '0.0.1'}} : where ver < '0.0.1' 注: $lt - 小于, $lte - 小于或等于, $gt - 大于, $gte - 大于或等于, $ne - 不等于 {'id': {$gt:50}, $or: [{'name': 'lhj'},{'title': 'book'}]} : where id > 50 and (name='lhj' or 'title' = 'book') {'name': {'$regex': 'likestr'}} : where name like '%likestr%', 正则表达式 {'name': {'$in': ['a', 'b', 'c']}} : where name in ('a', 'b', 'c') {'name': {'$nin': ['a', 'b', 'c']}} : where name not in ('a', 'b', 'c') {'col_json.sub_col': 'test'}: 查询json字段的指定字典key, 可以支持多级 {'col_json.0': 'test'}: 查询json字段的指定数组索引, 可以支持多级 注: 可以在字段名前面加 "#序号." 用于与left_join参数配合使用, 指定当前排序字段所属的关联表索引(序号从0开始) @param {dict|list} projection=None - 指定结果返回的字段信息 列表模式: ['col1','col2', ...] 注意: 该模式一定会返回 _id 这个主键 字典模式: {'_id': False, 'col1': True, ...} 该方式可以通过设置False屏蔽 _id 的返回 注1: 只有 _id 字段可以设置为False, 其他字段不可设置为False(如果要屏蔽可以不放入字典) 注2: 可以通过字典模式的值设置为$开头的字段名或json检索路径的方式, 进行字段别名处理, 例如{'as_name': '$real_name'}或{'as_name': '$real_name.key.key'} 注3: 可以在字段名前面加 "#序号." 用于与left_join参数配合使用, 指定当前排序字段所属的关联表索引(序号从0开始), 例如{'#0.col1': True, 'as_name': '$#0.col2'} @param {list} sort=None - 查询结果的排序方式 例: [('col1', 1), ('#0.join_col1', -1)...] 注1: 参数的第1个值可以支持'col1.key1'的方式指定json值进行排序 注2: 参数的第2个值指定是否升序(1为升序, -1为降序) 注3: 可以在字段名前面加 "#序号." 用于与left_join参数配合使用, 指定当前排序字段所属的关联表索引(序号从0开始) @param {int} skip=None - 指定跳过返回结果的前面记录的数量 @param {int} limit=None - 指定限定返回结果记录的数量 @param {dict} hint=None - 指定查询使用索引的名字清单 例: {'index_name1': 1, 'index_name2': 1} @param {int} fetch_each=1 - 每次获取返回的记录数量 @param {list} left_join=None - 指定左关联(left outer join)集合信息, 每个数组为一个关联表, 格式如下: [ { 'db_name': '指定集合的db', # 如果不设置则代表和主表是同一个数据库 'collection': '要关联的集合(表)名', 'as': '关联后的别名', # 如果不设置默认为集合名 'join_fields': [(主表字段名, 关联表字段名), ...], # 要关联的字段列表, 仅支持完全相等的关联条件 'filter': ..., # 关联表数据的过滤条件(仅用于内部过滤需要关联的数据), 注意字段无需添加集合的别名 }, ... ] @param {Any} session=None - 指定事务连接对象 @param {list|str} partition=None - MySQL, PostgreSQL专有参数, 指定操作的分区 注: MySQL支持送入分区列表名, 例如(p1, s3); PostgreSQL仅支持送入单个分区后缀名, 例如'p1' @returns {list} - 返回的结果列表迭代器 """ # 执行连接的固定参数 _upd_execute_paras = { 'commit_on_finished': True, 'rollback_on_exception': True, 'close_cursor': True, 'close_conn': True, } # 获取session if session is not None: _conn = session[0] _cursor = session[1] _upd_execute_paras['commit_on_finished'] = False _upd_execute_paras['rollback_on_exception'] = False _upd_execute_paras['close_conn'] = False if _cursor is not None: _upd_execute_paras['close_cursor'] = False else: _conn = None _cursor = None # 获取固定字段信息 _fixed_col_define = await self._get_fixed_col_define(collection, session=session) _filter = {} if filter is None else filter _sqls, _sql_paras, _execute_paras, _checks = await AsyncTools.async_run_coroutine( self._generate_sqls( 'query', collection, filter=_filter, projection=projection, sort=sort, skip=skip, limit=limit, hint=hint, fixed_col_define=_fixed_col_define, left_join=left_join, session=session, **kwargs ) ) _execute_is_query = _execute_paras.get('is_query', True) _execute_paras.update(_upd_execute_paras) if _cursor is None: _conn = await self._get_connection(conn=_conn) _cursor = await AsyncTools.async_run_coroutine(_conn.cursor()) try: # 上一个语句执行结果和是否异常的标识 _prev_return = None _prev_error = False # 遍历执行语句 _index = 0 _lask_index = len(_sqls) - 1 for _sql in _sqls: # 参数准备 _is_last = (_index >= _lask_index) _sql_paras = None if _sql_paras is None else _sql_paras[_index] _run_check = {} if _checks is not None and _checks[_index] is not None: _run_check = _checks[_index] _is_query = False if type(_execute_is_query) in (list, tuple): _is_query = _execute_is_query[_index] elif _is_last: _is_query = True _index += 1 # 跳转下一个标识 # 执行前判断 if not self._execute_sql_pre_check(_run_check, _prev_return, _prev_error): # 检查不通过, 跳过执行, 当作空执行成功 _prev_return = None _prev_error = False continue try: if _is_last: # 最后一个查询 _prev_return = self._execute_sql_query_iter( _sql, paras=_sql_paras, fetch_each=fetch_each, conn=_conn, cursor=_cursor, commit_on_finished=False, rollback_on_exception=False, close_cursor=False, close_conn=False ) else: _prev_return = await self._execute_sql( _sql, paras=_sql_paras, is_query=_is_query, conn=_conn, cursor=_cursor, commit_on_finished=False, rollback_on_exception=False, close_cursor=False, close_conn=False ) _prev_error = False except: _prev_error = True _prev_return = None if not _run_check.get('after_check', {}).get('ignore_current_error', False): # 不忽略执行异常 raise # 最后一个语句为异步迭代器 async for _rows in _prev_return: yield _rows # 判断是否需要自动提交 if _execute_paras['commit_on_finished']: await AsyncTools.async_run_coroutine(_conn.commit()) except: # 出现异常, 判断是否要回滚 if _execute_paras['rollback_on_exception']: await AsyncTools.async_run_coroutine(_conn.rollback()) raise finally: # 判断是否关闭游标和连接 if _execute_paras['close_cursor']: await AsyncTools.async_run_coroutine(_cursor.close()) if _execute_paras['close_conn']: await AsyncTools.async_run_coroutine(_conn.close())
[文档] async def query_count(self, collection: str, filter: dict = None, skip: int = None, limit: int = None, hint: dict = None, overtime: float = None, left_join: list = None, session: Any = None, **kwargs) -> int: """ 获取匹配查询条件的结果数量 @param {str} collection - 集合(表) @param {dict} filter=None - 查询条件字典, 与mongodb的查询条件设置方法一样, @param {int} skip=None - 指定跳过返回结果的前面记录的数量 @param {int} limit=None - 指定限定返回结果记录的数量 @param {dict} hint=None - 指定查询使用索引的名字清单 @param {float} overtime=None - 指定操作的超时时间, 单位为秒 @param {list} left_join=None - 指定左关联(left outer join)集合信息, 每个数组为一个关联表, 格式如下: [ { 'db_name': '指定集合的db', # 如果不设置则代表和主表是同一个数据库 'collection': '要关联的集合(表)名', 'as': '关联后的别名', # 如果不设置默认为集合名 'join_fields': [(主表字段名, 关联表字段名), ...], # 要关联的字段列表, 仅支持完全相等的关联条件 'filter': ..., # 关联表数据的过滤条件(仅用于内部过滤需要关联的数据), 注意字段无需添加集合的别名 }, ... ] @param {Any} session=None - 指定事务连接对象 @param {list|str} partition=None - MySQL, PostgreSQL专有参数, 指定操作的分区 注: MySQL支持送入分区列表名, 例如(p1, s3); PostgreSQL仅支持送入单个分区后缀名, 例如'p1' @returns {int} - 返回查询条件匹配的记录数 """ # 执行连接的固定参数 _upd_execute_paras = { 'commit_on_finished': True, 'rollback_on_exception': True, 'close_cursor': True, 'close_conn': True, 'is_query': True } # 获取session if session is not None: _conn = session[0] _cursor = session[1] _upd_execute_paras['commit_on_finished'] = False _upd_execute_paras['rollback_on_exception'] = False _upd_execute_paras['close_conn'] = False if _cursor is not None: _upd_execute_paras['close_cursor'] = False else: _conn = None _cursor = None # 获取固定字段信息 _fixed_col_define = await self._get_fixed_col_define(collection, session=session) _filter = {} if filter is None else filter _sqls, _sql_paras, _execute_paras, _checks = await AsyncTools.async_run_coroutine( self._generate_sqls( 'query_count', collection, filter=_filter, skip=skip, limit=limit, hint=hint, fixed_col_define=_fixed_col_define, left_join=left_join, **kwargs ) ) # 更新执行sql的参数 _execute_paras.update(_upd_execute_paras) _ret = await self._execute_sqls( _sqls, paras=_sql_paras, checks=_checks, conn=_conn, cursor=_cursor, **_execute_paras ) # 返回第0行第0个记录 return list(_ret[0].values())[0]
[文档] async def query_group_by(self, collection: str, group: dict = None, filter: dict = None, projection: Union[dict, list] = None, sort: list = None, overtime: float = None, session: Any = None, **kwargs) -> list: """ 获取记录聚合统计的结果 @param {str} collection - 集合(表) @param {dict} group=None - 分组返回设置字典(注意与mongodb的_id要求有所区别) 指定分组字段为col1、col2, 聚合字段为count、pay_amt, 其中pay_amt统计col_pay字段的合计数值 {'id': '$col1', 'name': '$col2', 'count': {'$sum': 1}, 'pay_amt': {'$sum': '$col_pay'}} 常见的聚合类型: $sum-计算总和, $avg-计算平均值, $min-取最小值, $max-取最大值, $first-取第一条, $last-取最后一条 @param {dict} filter=None - 查询条件字典, 与mongodb的查询条件设置方法一样 @param {dict|list} projection=None - 指定结果返回的字段信息(指统计后的结果) @param {list} sort=None - 查询结果的排序方式(注意排序字段为返回结果的分组字段, 而不是表的原始字段) @param {float} overtime=None - 指定操作的超时时间, 单位为秒 @param {Any} session=None - 指定事务连接对象 @param {list|str} partition=None - MySQL, PostgreSQL专有参数, 指定操作的分区 注: MySQL支持送入分区列表名, 例如(p1, s3); PostgreSQL仅支持送入单个分区后缀名, 例如'p1' @returns {list} - 返回结果列表 """ # 执行连接的固定参数 _upd_execute_paras = { 'commit_on_finished': True, 'rollback_on_exception': True, 'close_cursor': True, 'close_conn': True, 'is_query': True } # 获取session if session is not None: _conn = session[0] _cursor = session[1] _upd_execute_paras['commit_on_finished'] = False _upd_execute_paras['rollback_on_exception'] = False _upd_execute_paras['close_conn'] = False if _cursor is not None: _upd_execute_paras['close_cursor'] = False else: _conn = None _cursor = None # 获取固定字段信息 _fixed_col_define = await self._get_fixed_col_define(collection, session=session) _filter = {} if filter is None else filter _sqls, _sql_paras, _execute_paras, _checks = await AsyncTools.async_run_coroutine( self._generate_sqls( 'query_group_by', collection, group=group, filter=_filter, projection=projection, sort=sort, fixed_col_define=_fixed_col_define, **kwargs ) ) _execute_paras.update(_upd_execute_paras) return await self._execute_sqls( _sqls, paras=_sql_paras, checks=_checks, conn=_conn, cursor=_cursor, **_execute_paras )
############################# # 原生命令执行 #############################
[文档] async def run_native_cmd(self, sql: str, paras: tuple = None, is_query: bool = True, conn: Any = None, cursor: Any = None, commit_on_finished: bool = True, rollback_on_exception: bool = True, close_cursor: bool = False, close_conn: bool = False, **kwargs): """ 执行原生命令(或SQL)并返回执行结果 注: 该函数不支持驱动的兼容处理 @param {str} sql - 要执行的SQL语句 @param {tuple} paras=None - 传入的SQL参数字典(支持?占位) @param {bool} is_query=True - 指定语句是否查询 @param {Any} conn=None - 传入的已打开连接, 如果传入代表纳入事务处理 @param {Any} cursor=None - 传入的已有游标, 不传入将自动创建新游标, 如果传入该值必须也传入conn @param {bool} commit_on_finished=True - 完成处理时是否执行commit操作 @param {bool} rollback_on_exception=True - 出现异常时是否执行rollback操作 @param {bool} close_cursor=False - 是否关闭所传入的游标 @param {bool} close_conn=False - 是否关闭所传入的连接 @returns {int} - 返回结果, 不同情况返回如下: 非查询语句: 返回当前语句影响的记录数量, 如果无记录情况返回None 一次性获取的查询语句: 返回行记录转换为字典形式的list列表 """ return await self._execute_sql( sql, paras=paras, is_query=is_query, conn=conn, cursor=cursor, commit_on_finished=commit_on_finished, rollback_on_exception=rollback_on_exception, close_cursor=close_cursor, close_conn=close_conn )
############################# # 数据库及集合辅助索引 #############################
[文档] def init_index_extend_dbs(self, dbs: dict): """ 在初始化索引参数中扩展数据库索引信息 @param {dict} dbs - 要扩展的数据库信息字典(注: 仅用于索引, 不创建实际数据库) { '数据库名': { 'comment': '', # 数据库注释 'args': [], # 创建数据库的args参数 'kwargs': {} #创建数据库的kwargs参数 } } """ pass
[文档] def init_index_extend_collections(self, collections: dict): """ 在初始化索引参数中扩展集合索引信息 @param {dict} collections - 要扩展的集合信息字典(注: 仅用于索引, 不创建实际数据库) { '数据库名': { '集合名': { 'comment': '', # 集合注释 'indexs': {索引字典}, 'fixed_col_define': {固定字段定义}, ... } ... }, ... } """ # 遍历数据库处理字段信息字典 for _db_name, _collections in collections.items(): # 构建数据表的字段信息字典缓存 if _db_name not in self._fixed_col_define.keys(): self._fixed_col_define[_db_name] = {} # 遍历表进行创建处理 for _collection, _info in _collections.items(): self._fixed_col_define[_db_name][_collection] = { 'cols': list(_info.get('fixed_col_define', {}).keys()), 'define': _info.get('fixed_col_define', {}) }
############################# # 内部函数 ############################# async def _get_fixed_col_define(self, collection: str, db_name: str = None, session: Any = None) -> dict: """ 获取制定集合(表)的固定字段定义信息 @param {str} collection - 集合名(表) @param {str} db_name=None - 数据库名(不指定代表默认当前数据库) @param {Any} session=None - 指定事务连接对象 @returns {dict} - 返回固定信息字典 """ _db_name = self._db_name if db_name is None else db_name _fixed_col_define = self._fixed_col_define.get(_db_name, {}).get(collection, None) if _fixed_col_define is None: # 尝试查询数据库获取 _cols_define = await AsyncTools.async_run_coroutine( self._get_cols_info(collection, db_name=db_name, session=session) ) _fixed_col_define = {'cols': [], 'define': {}} for _info in _cols_define: if _info['name'] in ('_id', 'nosql_driver_extend_tags'): continue _fixed_col_define['cols'].append(_info['name']) _fixed_col_define['define'][_info['name']] = _info['type'] # 添加到数据表的字段信息字典缓存 if len(_cols_define) > 0: if _db_name not in self._fixed_col_define.keys(): self._fixed_col_define[_db_name] = {} self._fixed_col_define[_db_name][collection] = _fixed_col_define return _fixed_col_define async def _get_connection(self, conn: Any = None) -> Any: """ 从连接池获取数据库连接 @param {Any} conn=None - 传入指定的连接对象 注: 如果传入连接对象则不再从连接池获取 @returns {Any} - 数据库连接对象 """ _conn = conn if _conn is None: _conn = await self._pool.connection() # 驱动初始化连接对象 await AsyncTools.async_run_coroutine( self._driver_init_connection(_conn) ) return _conn def _execute_sql_pre_check(self, pre_check: dict, prev_return: Any, prev_error: bool) -> bool: """ 执行SQL语句数组前检查函数 @param {dict} pre_check - 检查字典 { 'skip_when_prev_error': False, # 是否当上一条语句异常时跳过当前语句执行, 默认为False # # 默认为None忽略当前参数, 使用cmp_val与上一条执行语句的返回值比较 # (如果传入func比较函数或lamba, 则使用比较函数来处理, 否则默认使用self.cmp_func_equal_first_value比较), # 比较结果为True则执行当前语句, 为False则跳过当前语句 'cmp_prev_return': [cmp_val, func] } @param {Any} prev_return - 上一语句返回结果 @param {bool} prev_error - 上一结果是否异常 @returns {bool} - 检查结果, True为通过, False为跳过 """ if pre_check is None: # 没有检查参数 return True # 是否当上一条语句异常时跳过当前语句执行 if prev_error and pre_check.get('skip_when_prev_error', False): return False # 与上一返回值的比较 if pre_check.get('cmp_prev_return', None) is not None: _cmp_val = pre_check['cmp_prev_return'][0] if len(pre_check['cmp_prev_return']) < 2 or pre_check['cmp_prev_return'][1] is None: _func = self.cmp_func_equal_first_value else: _func = pre_check['cmp_prev_return'][1] if not _func(prev_return, _cmp_val): return False return True async def _execute_sqls(self, sqls: list, paras: list = None, checks: list = None, is_query: bool = False, conn: Any = None, cursor: Any = None, commit_on_finished: bool = True, rollback_on_exception: bool = True, close_cursor: bool = False, close_conn: bool = False): """ 执行SQL语句 @param {list} sqls - 要执行的SQL语句数组 @param {list} paras=None - 传入的SQL参数数组(支持?占位) @param {list} checks=None - SQL语句的检查列表, 如果设置了必须于sqls对应, 每个sql的检查值为: { 'pre_check': { . # 执行前检查, 支持的参数见self._execute_sql_pre_check的定义 }, 'after_check': { # 执行后检查 'ignore_current_error': False, # 是否忽略当前语句异常, 默认为False } } @param {bool|list} is_query=False - 指定语句是否查询(最后一个语句使用参数) 注: 如果传入的是list, 则代表指定每个语句是否查询 @param {Any} conn=None - 传入的已打开连接, 如果传入代表纳入事务处理 @param {Any} cursor=None - 传入的已有游标, 不传入将自动创建新游标, 如果传入该值必须也传入conn @param {bool} commit_on_finished=True - 完成处理时是否执行commit操作 @param {bool} rollback_on_exception=True - 出现异常时是否执行rollback操作 @param {bool} close_cursor=False - 是否关闭所传入的游标 @param {bool} close_conn=False - 是否关闭所传入的连接 @returns {int} - 最后一个语句的返回结果, 不同情况返回如下: 非查询语句: 返回当前语句影响的记录数量, 如果无记录情况返回None 一次性获取的查询语句: 返回行记录转换为字典形式的list列表 """ # 判断是否关闭传入游标和连接 _close_cursor = close_cursor if not _close_cursor and cursor is None: _close_cursor = True _close_conn = close_conn if _close_cursor: if not _close_conn and conn is None: _close_conn = True else: # 不关闭游标的情况下, 不能关闭连接 _close_conn = False # 获取连接和游标对象 _cursor = cursor _conn = conn if cursor is None: _conn = await self._get_connection(conn=conn) _cursor = await AsyncTools.async_run_coroutine(_conn.cursor()) try: # 上一个语句执行结果和是否异常的标识 _prev_return = None _prev_error = False # 遍历执行语句 _index = 0 _last_index = len(sqls) - 1 for _sql in sqls: # 参数准备 _sql_paras = None if paras is None else paras[_index] _run_check = {} if checks is not None and checks[_index] is not None: _run_check = checks[_index] _is_query = False if type(is_query) in (list, tuple): _is_query = is_query[_index] elif _index >= _last_index: # 最后一个 _is_query = is_query _index += 1 # 跳转下一个标识 # 执行前判断 if not self._execute_sql_pre_check(_run_check.get('pre_check', None), _prev_return, _prev_error): # 检查不通过, 跳过执行, 当作空执行成功 _prev_return = None _prev_error = False continue try: _prev_return = await self._execute_sql( _sql, paras=_sql_paras, is_query=_is_query, conn=_conn, cursor=_cursor, commit_on_finished=False, rollback_on_exception=False, close_cursor=False, close_conn=False ) _prev_error = False except: _prev_return = None _prev_error = True if not _run_check.get('after_check', {}).get('ignore_current_error', False): # 不忽略执行异常 raise # 判断是否需要自动提交 if commit_on_finished: await AsyncTools.async_run_coroutine(_conn.commit()) return _prev_return except: # 出现异常, 判断是否要回滚 if rollback_on_exception: await AsyncTools.async_run_coroutine(_conn.rollback()) raise finally: # 判断是否关闭游标和连接 if _close_cursor: await AsyncTools.async_run_coroutine(_cursor.close()) if _close_conn: await AsyncTools.async_run_coroutine(_conn.close()) async def _cursor_description_to_col_index(self, cursor_description) -> list: """ 将游标描述转换为对应的列名列表 """ _col_index = [] for _tup in cursor_description: _col_name = _tup[0] _copy_index = 0 if _col_name == 'nosql_driver_extend_tags' or _col_name not in _col_index: _col_index.append(_col_name) else: while True: _copy_index += 1 _copy_name = '%s_%d' % (_col_name, _copy_index) if _copy_name in _col_index: continue else: _col_index.append(_copy_name) break return _col_index async def _rows_to_dict(self, col_index: list, rows: list) -> list: """ 将查询结果数组转换为字典数组 @param {list} col_index - 列索引 @param {list} rows - 要处理的数组 @returns {list} - 返回处理后的字典数组 """ # 空结果的情况返回空列表 if rows is None: return [] _dict_list = [] for _row in rows: # 遍历形成, 需要去掉None的key # _dict = dict(zip(col_index, self._format_row_value(_row))) _dict = {} _formated_row = await AsyncTools.async_run_coroutine(self._format_row_value(_row)) for _i in range(len(col_index)): if _formated_row[_i] is not None: if col_index[_i] == 'nosql_driver_extend_tags': # 扩展字段, 放回第一层 for _col, _val in _formated_row[_i].items(): if _dict.get(_col, None) is None: _dict[_col] = _val else: _copy_index = 0 while True: _copy_index += 1 _copy_name = '%s_%d' % (_col, _copy_index) if _dict.get(_copy_name, None) is None: _dict[_copy_name] = _val break else: continue else: _dict[col_index[_i]] = _formated_row[_i] _dict_list.append(_dict) return _dict_list async def _execute_sql(self, sql: str, paras: tuple = None, is_query: bool = False, conn: Any = None, cursor: Any = None, commit_on_finished: bool = True, rollback_on_exception: bool = True, close_cursor: bool = False, close_conn: bool = False): """ 执行SQL语句(正常返回模式) @param {str} sql - 要执行的SQL语句 @param {tuple} paras=None - 传入的SQL参数字典(支持?占位) @param {bool} is_query=False - 指定语句是否查询 @param {Any} conn=None - 传入的已打开连接, 如果传入代表纳入事务处理 @param {Any} cursor=None - 传入的已有游标, 不传入将自动创建新游标, 如果传入该值必须也传入conn @param {bool} commit_on_finished=True - 完成处理时是否执行commit操作 @param {bool} rollback_on_exception=True - 出现异常时是否执行rollback操作 @param {bool} close_cursor=False - 是否关闭所传入的游标 @param {bool} close_conn=False - 是否关闭所传入的连接 @returns {int} - 返回结果, 不同情况返回如下: 非查询语句: 返回当前语句影响的记录数量, 如果无记录情况返回None 一次性获取的查询语句: 返回行记录转换为字典形式的list列表 """ # 判断是否关闭传入游标和连接 _close_cursor = close_cursor if not _close_cursor and cursor is None: _close_cursor = True _close_conn = close_conn if _close_cursor: if not _close_conn and conn is None: _close_conn = True else: # 不关闭游标的情况下, 不能关闭连接 _close_conn = False # 获取连接和游标对象 _cursor = cursor _conn = conn if _cursor is None: _conn = await self._get_connection(conn=conn) _cursor = await AsyncTools.async_run_coroutine(_conn.cursor()) try: # 返回结果对象 _ret = None # 执行sql if self._debug: # debug模式, 打印sql self._logger.debug('run sql: %s, para: %s' % (sql, paras)) if paras is None: await AsyncTools.async_run_coroutine(_cursor.execute(sql)) else: await AsyncTools.async_run_coroutine(_cursor.execute(sql, paras)) if is_query: # 查询语句, 一次性返回查询结果 _col_index = await self._cursor_description_to_col_index(_cursor.description) _rows = await AsyncTools.async_run_coroutine(_cursor.fetchall()) # 转换为字典形式并返回 _ret = await self._rows_to_dict(_col_index, _rows) else: # 非查询语句, 返回语句影响影响的记录行数 _rowcount = _cursor.rowcount _ret = None if _rowcount == -1 else _rowcount # 判断是否需要自动提交 if commit_on_finished: await AsyncTools.async_run_coroutine(_conn.commit()) return _ret except: # 出现异常, 判断是否要回滚 if rollback_on_exception: await AsyncTools.async_run_coroutine(_conn.rollback()) # 异常输出日志 self._logger.error( 'execute sql error, sql=%s paras=%s error: %s' % (sql, str(paras), traceback.format_exc()) ) raise finally: # 判断是否关闭游标和连接 if _close_cursor: await AsyncTools.async_run_coroutine(_cursor.close()) if _close_conn: await AsyncTools.async_run_coroutine(_conn.close()) async def _execute_sql_query_iter(self, sql: str, paras: tuple = None, fetch_each: int = 1, conn: Any = None, cursor: Any = None, commit_on_finished: bool = True, rollback_on_exception: bool = True, close_cursor: bool = False, close_conn: bool = False): """ 执行查询SQL语句(迭代获取模式) @param {str} sql - 要执行的SQL语句 @param {tuple} paras=None - 传入的SQL参数字典(支持?占位) @param {int} fetch_each=1 - 查询的情况下, 每次获取的记录数量 @param {Any} conn=None - 传入的已打开连接, 如果传入代表纳入事务处理 @param {Any} cursor=None - 传入的已有游标, 不传入将自动创建新游标, 如果传入该值必须也传入conn @param {bool} commit_on_finished=True - 完成处理时是否执行commit操作 @param {bool} rollback_on_exception=True - 出现异常时是否执行rollback操作 @param {bool} close_cursor=False - 是否关闭所传入的游标 @param {bool} close_conn=False - 是否关闭所传入的连接 @returns {async_generator} - 返回可异步迭代获取的查询结果 通过 async for 遍历返回的迭代结果列表, 或使用RunTool.AsyncTools工具遍历处理 """ # 判断是否关闭传入游标和连接 _close_cursor = close_cursor if not _close_cursor and cursor is None: _close_cursor = True _close_conn = close_conn if _close_cursor: if not _close_conn and conn is None: _close_conn = True else: # 不关闭游标的情况下, 不能关闭连接 _close_conn = False # 获取连接和游标对象 _cursor = cursor _conn = conn if _cursor is None: _conn = await self._get_connection(conn=conn) _cursor = await AsyncTools.async_run_coroutine(_conn.cursor()) try: # 执行sql if self._debug: # debug模式, 打印sql self._logger.debug('run sql: %s, para: %s' % (sql, paras)) if paras is None: await AsyncTools.async_run_coroutine(_cursor.execute(sql)) else: await AsyncTools.async_run_coroutine(_cursor.execute(sql, paras)) # 查询语句, 分批次返回查询结果 _col_index = await self._cursor_description_to_col_index(_cursor.description) while True: _rows = await AsyncTools.async_run_coroutine(_cursor.fetchmany(fetch_each)) if _rows is None or len(_rows) == 0: # 已无记录获取 break # 返回转换后的处理结果 _fetchs = await self._rows_to_dict(_col_index, _rows) yield _fetchs # 判断是否需要自动提交 if commit_on_finished: await AsyncTools.async_run_coroutine(_conn.commit()) except: # 出现异常, 判断是否要回滚 if rollback_on_exception: await AsyncTools.async_run_coroutine(_conn.rollback()) # 异常输出日志 self._logger.error( 'execute sql error, sql=%s paras=%s error: %s' % (sql, str(paras), traceback.format_exc()) ) raise finally: # 判断是否关闭游标和连接 if _close_cursor: await AsyncTools.async_run_coroutine(_cursor.close()) if _close_conn: await AsyncTools.async_run_coroutine(_conn.close()) ############################# # 初始化集合 ############################# async def _init_collections(self, init_configs: dict): """ 启动驱动时创建的集合(表) @param {dict} init_configs - 初始化参数, 在构造函数中传入 """ # 记录开始时的数据库名 _temp_name = self._db_name # 遍历数据库创建 for _db_name, _collections in init_configs.items(): if self._db_name != _db_name: # 切换到指定数据库 await self.switch_db(_db_name) # 构建数据表的字段信息字典缓存 if _db_name not in self._fixed_col_define.keys(): self._fixed_col_define[_db_name] = {} # 遍历表进行创建处理 for _collection, _info in _collections.items(): self._fixed_col_define[_db_name][_collection] = { 'cols': list(_info.get('fixed_col_define', {}).keys()), 'define': _info.get('fixed_col_define', {}) } if _info.get('index_only', True): # 只用于索引, 不创建 continue if await self.collections_exists(_collection): # 表已存在 continue # 建表操作 await self.create_collection( _collection, **_info ) # 切换回开始的数据库 await self.switch_db(_temp_name) ############################# # 需要继承类实现的内部函数 ############################# def _get_db_creator(self, connect_config: dict, pool_config: dict, driver_config: dict) -> dict: """ 获取数据库连接驱动及参数(同步函数) @param {dict} connect_config - 外部送入的连接参数 @param {dict} pool_config - 连接池配置 @param {dict} driver_config={} - 驱动配置 @returns {dict} - 返回连接池的相关参数 { 'creator': class, # 连接创建模块或对象 'pool_connection_class': class, # 连接池连接对象实现类(继承PoolConnectionFW的类对象) 'args': [], # 进行连接创建的固定位置参数 'kwargs': {}, # 进行连接创建的kv参数 'connect_method_name': 'connect', # 连接创建模块要执行的连接方法, 传None代表直接使用creator创建连接 'pool_update_config': {}, # 要指定AIOConnectionPool特定值的更新字典 'current_db_name': '', # 当前数据库名 } """ raise NotImplementedError() async def _generate_sqls(self, op: str, *args, **kwargs) -> tuple: """ 生成对应操作要执行的sql语句数组(同步或异步函数) @param {str} op - 要执行的操作(传入函数名字符串) @param {args} - 要执行操作函数的固定位置入参 @param {kwargs} - 要执行操作函数的kv入参 @returns {tuple} - 返回要执行的sql信息(sqls, sql_paras, execute_paras, checks) sqls: list, 要顺序执行的sql语句数组; 注意, 仅最后一个语句支持为查询语句, 前面的语句都必须为非查询语句 sql_paras: list, 传入的SQL参数字典(支持?占位), 注意必须与sqls数组一一对应(如果只有个别语句需要传参, 其他位置设置为None; 如果全部都无需传参, 该值直接传None) execute_paras: dict, 最后一个SQL语句的执行参数 {'is_query': ...} checks: list, 传入每个语句执行检查字典列表, 注意必须与sqls数组一一对应(如果只有个别语句需要传参, 其他位置设置为None; 如果全部都无需传参, 该值直接传None) """ raise NotImplementedError() async def _format_row_value(self, row: list) -> list: """ 处理行记录的值(数据库存储类型转为Python类型) (同步函数) @param {list} row - 行记录 @returns {list} - 转换后的行记录 """ raise NotImplementedError() async def _driver_after_init_pool(self): """ 初始化连接池以后驱动执行的处理(同步或异步函数) """ pass async def _driver_init_connection(self, conn: Any): """ 驱动对获取到的连接的初始化处理(同步或异步函数) @param {Any} conn - 传入连接对象 """ pass async def _get_cols_info(self, collection: str, db_name: str = None, session: Any = None) -> list: """ 获取制定集合(表)的列信息(同步或异步函数) @param {str} collection - 集合名(表) @param {str} db_name=None - 数据库名(不指定代表默认当前数据库) @param {Any} session=None - 指定事务连接对象 @returns {list} - 字典形式的列信息数组, 注意列名为name, 类型为type(类型应为标准类型: str, int, float, bool, json) """ return [] async def _get_current_db_name(self, session: Any = None) -> str: """ 获取当前数据库名 @param {Any} session=None - 指定事务连接对象 @returns {str} - 数据库名 """ return None