HiveNetNoSql.sqlite 源代码

#!/usr/bin/env python3
# -*- coding: UTF-8 -*-
# Copyright 2022 黎慧剑
#
# This Source Code Form is subject to the terms of the Mozilla Public
# License, v. 2.0. If a copy of the MPL was not distributed with this
# file, You can obtain one at http://mozilla.org/MPL/2.0/.
"""
sqlite的HiveNetNoSql实现模块

@module sqlite
@file sqlite.py
"""
import os
import sys
import copy
import re
from typing import Any, Union
import sqlite3
import json
from bson.objectid import ObjectId
from HiveNetCore.utils.run_tool import AsyncTools
from HiveNetCore.utils.string_tool import StringTool
from HiveNetCore.utils.validate_tool import ValidateTool
from HiveNetCore.connection_pool import PoolConnectionFW
# 自动安装依赖库
from HiveNetCore.utils.pyenv_tool import PythonEnvTools
try:
    import aiosqlite
except ImportError:
    PythonEnvTools.install_package('aiosqlite')
    import aiosqlite
# 根据当前文件路径将包路径纳入, 在非安装的情况下可以引用到
sys.path.append(os.path.abspath(os.path.join(os.path.dirname(__file__), os.path.pardir)))
from HiveNetNoSql.base.driver_fw import NosqlAIOPoolDriver


[文档]class SQLitePoolConnection(PoolConnectionFW): """ SQLite连接池连接对象 """ ############################# # 需要继承类实现的函数 ############################# async def _real_ping(self, *args, **kwargs) -> bool: """ 实现类的真实检查连接对象是否有效的的函数 @returns {bool} - 返回检查结果 """ # 不支持检测连接, 直接返回True就好 return True async def _fade_close(self) -> Any: """ 实现类提供的虚假关闭函数 注1: 不关闭连接, 只是清空上一个连接使用的上下文信息(例如数据库连接进行commit或rollback处理) 注2: 如果必须关闭真实连接, 则可以关闭后创建一个新连接返回 @returns {Any} - 返回原连接或新创建的连接 """ _close_action = self._pool._pool_extend_paras.get('close_action', None) if _close_action == 'commit': await AsyncTools.async_run_coroutine(self._conn.commit()) elif _close_action == 'rollback': await AsyncTools.async_run_coroutine(self._conn.rollback()) return self._conn async def _real_close(self): """ 实现类提供的真实关闭函数 """ await AsyncTools.async_run_coroutine(self._conn.close())
[文档]class SQLiteNosqlDriver(NosqlAIOPoolDriver): """ nosql数据库SQLite驱动 """ ############################# # 构造函数重载, 主要是注释 #############################
[文档] def __init__(self, connect_config: dict = {}, pool_config: dict = {}, driver_config: dict = {}): """ 初始化驱动 @param {dict} connect_config={} - 数据库的连接参数 host {str} - 数据库文件路径, 或使用":memory:"在内存上创建数据库 port {int} - 连接数据库的端口(SQLite无效) usedb {str} - 登录后默认切换到的数据库(SQLite无效) username {str} - 登录验证用户(SQLite无效) password {str} - 登录验证密码(SQLite无效) dbname {str} - 登录用户的数据库名(SQLite无效) connect_on_init {bool} - 是否启动时直接连接数据库 connect_timeout {float} - 连接数据库的超时时间, 单位为秒, 默认为20 default_str_len {int} - 默认的字符串类型长度, 默认为30 ...驱动实现类自定义支持的参数 transaction_share_cursor {bool} - 进行事务处理是否复用同一个游标对象, 默认为True sqlite3.connect 支持的其他参数... check_same_thread: 是否控制检查与创建连接的是一个线程 @param {dict} pool_config={} - 连接池配置 max_size {int} - 连接池的最大大小, 默认为100 min_size {int} - 连接池维持的最小连接数量, 默认为0 max_idle_time {float} - 连接被移除前的最大空闲时间, 单位为秒, 默认为None wait_queue_timeout {float} - 在没有空闲连接的时候, 请求连接所等待的超时时间, 单位为秒, 默认为None(不超时) ...驱动实现类自定义支持的参数 注: 其他参数为AIOConnectionPool所支持的参数 @param {dict} driver_config={} - 驱动配置 init_db {dict} - 要在启动驱动时创建的数据库 { '数据库名': { 'index_only': False, # 是否仅用于索引, 不创建 'comment': '', # 数据库注释 'args': [], # 创建数据库的args参数 'kwargs': {} #创建数据库的kwargs参数 } } init_collections {dict} - 要在启动驱动时创建的集合(表) { '数据库名': { '集合名': { 'index_only': False, # 是否仅用于索引, 不创建 'comment': '', # 集合注释 'indexs': {索引字典}, 'fixed_col_define': {固定字段定义} } ... }, ... } init_yaml_file {str} - 要在启动时创建的数据库和集合(表)配置yaml文件 注1: 该参数用于将init_db和init_collections参数内容放置的配置文件中, 如果参数有值则忽略前面两个参数 注2: 配置文件为init_db和init_collections两个字典, 内容与这两个参数一致 logger {Logger} - 传入驱动的日志对象 ignore_index_error {bool} - 是否忽略索引创建的异常, 默认为True debug {bool} - 指定是否debug模式, 默认为False close_action {str} - 关闭连接时自动处理动作, None-不处理, 'commit'-自动提交, 'rollback'-自动回滚 """ # 记录数据库所在路径, 创建无文件参数的数据库时默认使用该路径 _host = connect_config.get('host', ':memory:') if _host == ':memory:': self._db_path = os.getcwd() else: self._db_path = os.path.dirname(_host) # 登记当前已经加载的数据库 self._init_dbs = ['main'] super().__init__( connect_config=connect_config, pool_config=pool_config, driver_config=driver_config ) # 指定使用独立的insert_many语句, 性能更高 self._use_insert_many_generate_sqls = True
############################# # 需要继承类实现的内部函数 ############################# def _get_db_creator(self, connect_config: dict, pool_config: dict, driver_config: dict) -> tuple: """ 获取数据库连接驱动及参数 @param {dict} connect_config={} - 数据库的连接参数 host {str} - 数据库文件路径, 或使用":memory:"在内存上创建数据库 port {int} - 连接数据库的端口(SQLite无效) usedb {str} - 登录后默认切换到的数据库(SQLite无效) username {str} - 登录验证用户(SQLite无效) password {str} - 登录验证密码(SQLite无效) dbname {str} - 登录用户的数据库名(SQLite无效) connect_on_init {bool} - 是否启动时直接连接数据库 connect_timeout {float} - 连接数据库的超时时间, 单位为秒, 默认为20 ...驱动实现类自定义支持的参数 transaction_share_cursor {bool} - 进行事务处理是否复用同一个游标对象, 默认为True sqlite3.connect 支持的其他参数... check_same_thread: 是否控制检查与创建连接的是一个线程 @param {dict} pool_config={} - 连接池配置 max_size {int} - 连接池的最大大小, 默认为100 min_size {int} - 连接池维持的最小连接数量, 默认为0 max_idle_time {float} - 连接被移除前的最大空闲时间, 单位为秒, 默认为None wait_queue_timeout {float} - 在没有空闲连接的时候, 请求连接所等待的超时时间, 单位为秒, 默认为None(不超时) ...驱动实现类自定义支持的参数 wait_connection_idle {float} - 没有空闲连接时, 等待获取连接的间隔时长, 单位为秒, 默认维护0.01 @param {dict} driver_config={} - 驱动配置 init_db {dict} - 要在启动驱动时创建的数据库 { '数据库名': { 'index_only': False, # 是否仅用于索引, 不创建 'comment': '', # 数据库注释 'args': [], # 创建数据库的args参数 'kwargs': {} #创建数据库的kwargs参数 } } init_collections {dict} - 要在启动驱动时创建的集合(表) { '数据库名': { '集合名': { 'index_only': False, # 是否仅用于索引, 不创建 'comment': '', # 集合注释 'indexs': {索引字典}, 'fixed_col_define': {固定字段定义} } ... }, ... } logger {Logger} - 传入驱动的日志对象 debug {bool} - 指定是否debug模式, 默认为False close_action {str} - 关闭连接时自动处理动作, None-不处理, 'commit'-自动提交, 'rollback'-自动回滚 @returns {dict} - 返回连接池的相关参数 { 'creator': class, # 连接创建模块或对象 'pool_connection_class': class, # 连接池连接对象实现类(继承PoolConnectionFW的类对象) 'args': [], # 进行连接创建的固定位置参数 'kwargs': {}, # 进行连接创建的kv参数 'connect_method_name': 'connect', # 连接创建模块要执行的连接方法, 传None代表直接使用creator创建连接 'pool_update_config': {}, # 要指定AIOConnectionPool特定值的更新字典 'current_db_name': '', # 当前数据库名 } """ # 需要sqlite 3.9.0 以上的版本支持 if (StringTool.version_cmp(sqlite3.sqlite_version, '3.9.0') == '<'): raise aiosqlite.NotSupportedError('only support sqlite 3.9.0 or higher version') # 初始化处理函数的映射字典 self._sqls_fun_mapping = { 'create_db': self._sqls_fun_create_db, 'switch_db': self._sqls_fun_not_support, 'list_dbs': self._sqls_fun_list_dbs, 'drop_db': self._sqls_fun_drop_db, 'create_collection': self._sqls_fun_create_collection, 'list_collections': self._sql_fun_list_collections, 'drop_collection': self._sql_fun_drop_collection, 'turncate_collection': self._sql_fun_turncate_collection, 'insert_one': self._sql_fun_insert_one, 'insert_many': self._sql_fun_insert_many, 'update': self._sql_fun_update, 'delete': self._sql_fun_delete, 'query': self._sql_fun_query, 'query_count': self._sql_fun_query_count, 'query_group_by': self._sql_fun_query_group_by } # 初始化查询操作符对应的符号 self._filter_symbol_mapping = { '$lt': '<', '$lte': '<=', '$gt': '>', '$gte': '>=', '$ne': '!=' } # 生成SQLite的连接配置 _args = [connect_config['host']] _kwargs = {} if connect_config.get('connect_timeout', None) is not None: _kwargs['timeout'] = connect_config['connect_timeout'] # 移除不使用的参数 _connect_config = copy.deepcopy(connect_config) for _pop_item in ('host', 'port', 'usedb', 'dbname', 'username', 'password', 'connect_on_init', 'connect_timeout', 'transaction_share_cursor'): _connect_config.pop(_pop_item, None) # 合并参数 _kwargs.update(_connect_config) return { 'creator': aiosqlite, 'pool_connection_class': SQLitePoolConnection, 'args': _args, 'kwargs': _kwargs, 'connect_method_name': 'connect', 'pool_update_config': { # sqlite的限制: 不支持多线程, 连接池设置最大为1; 不检查空闲连接有效性, 也不释放连接 'max_size': 1, 'ping_on_idle': False, 'free_idle_time': 0, 'pool_extend_paras': { 'close_action': driver_config.get('close_action', None) } }, 'current_db_name': 'main' } def _generate_sqls(self, op: str, *args, **kwargs) -> tuple: """ 生成对应操作要执行的sql语句数组 @param {str} op - 要执行的操作(传入函数名字符串) @param {args} - 要执行操作函数的固定位置入参 @param {kwargs} - 要执行操作函数的kv入参 @returns {tuple} - 返回要执行的sql信息(sqls, sql_paras, execute_paras) sqls: list, 要顺序执行的sql语句数组; 注意, 仅最后一个语句支持为查询语句, 前面的语句都必须为非查询语句 sql_paras: list, 传入的SQL参数字典(支持?占位), 注意必须与sqls数组一一对应(如果只有个别语句需要传参, 其他位置设置为None; 如果全部都无需传参, 该值直接传None) execute_paras: dict, 最后一个SQL语句的执行参数 {'is_query': ...} checks: list, 传入每个语句执行检查字典列表, 注意必须与sqls数组一一对应(如果只有个别语句需要传参, 其他位置设置为None; 如果全部都无需传参, 该值直接传None) """ _func = self._sqls_fun_mapping.get(op, None) if _func is None: raise aiosqlite.NotSupportedError('driver not support this operation') _ret = _func(op, *args, **kwargs) if len(_ret) == 4: return _ret else: _new_ret = [] _new_ret.extend(_ret) _new_ret.append(None) # 补充最后一个参数 return _new_ret def _format_row_value(self, row: list) -> list: """ 处理行记录的值(数据库存储类型转为Python类型) @param {list} row - 行记录 @returns {list} - 转换后的行记录 """ _new_row = [] for _item in row: _new_row.append(self._dbtype_to_python(_item)) return _new_row def _driver_init_connection(self, conn: Any): """ 驱动对获取到的连接的初始化处理 @param {Any} conn - 传入连接对象 """ # 注入正则表达式的支持函数 AsyncTools.sync_run_coroutine(conn.create_function("REGEXP", 2, self._regexp)) async def _get_cols_info(self, collection: str, db_name: str = None, session: Any = None) -> list: """ 获取制定集合(表)的列信息 @param {str} collection - 集合名(表) @param {str} db_name=None - 数据库名(不指定代表默认当前数据库) @param {Any} session=None - 指定事务连接对象 @returns {list} - 字典形式的列信息数组, 注意列名为name, 类型为type(类型应为标准类型: str, int, float, bool, json) """ if session is not None: _conn = session[0] _cursor = session[1] else: _conn = None _cursor = None # 获取表结构 _db_name = self._db_name if db_name is None else db_name _db_prefix = '' if _db_name == 'main' else ('%s.' % _db_name) _sql = "PRAGMA %stable_info('%s')" % (_db_prefix, collection) _ret = await self._execute_sql( _sql, paras=None, is_query=True, conn=_conn, cursor=_cursor ) # 处理标准类型 for _row in _ret: if _row['type'] == 'JSON': _row['type'] = 'json' if _row['type'] == 'text': _row['type'] = 'str' elif _row['type'].startswith('varchar('): _row['type'] = 'str' return _ret ############################# # 需要单独重载的函数 #############################
[文档] async def switch_db(self, name: str, *args, **kwargs): """ 切换当前数据库到指定数据库 @param {str} name - 数据库名 """ if name in self._init_dbs: self._db_name = name else: await self.create_db(name)
[文档] async def create_db(self, name: str, *args, **kwargs): """ 创建数据库 注: 创建后会自动切换到该数据库 @param {str} name - 数据库名 """ _sqls, _sql_paras, _execute_paras, _checks = await AsyncTools.async_run_coroutine( self._generate_sqls('create_db', name, *args, **kwargs) ) await self._execute_sqls( _sqls, paras=_sql_paras, checks=_checks, **_execute_paras ) # 登记已加载的数据库 self._init_dbs.append(name) # 切换数据库 await self.switch_db(name)
[文档] async def drop_db(self, name: str, *args, **kwargs): """ 删除数据库 @param {str} name - 数据库名 """ _sqls, _sql_paras, _execute_paras, _checks = await AsyncTools.async_run_coroutine( self._generate_sqls('drop_db', name) ) await self._execute_sqls( _sqls, paras=_sql_paras, checks=_checks, **_execute_paras ) # 从清单中删除 self._init_dbs.pop(self._init_dbs.index(name)) # 切换后判断是不是删除当前数据库 if self._db_name == name: _dbs = await self.list_dbs() if len(_dbs) > 0: await self.switch_db(_dbs[0])
############################# # sqlite3支持正则表达式的处理 ############################# def _regexp(self, expr, item): """ 正则表达式的函数 @param {str} expr - 表达式文本 @param {str} item - 匹配对象 @returns {Any} - 返回匹配结果 """ reg = re.compile(expr) return reg.search(item) is not None ############################# # 支持SQL处理的通用函数 ############################# def _dbtype_mapping(self, std_type: str, len: int) -> str: """ 获取字段的数据库类型 @param {str} std_type - 字段类型(str, int, float, bool, json) @param {int} len - 字段长度 @returns {str} - 返回数据库类型 """ if std_type in ('int', 'float'): return std_type elif std_type == 'bool': return 'bool' elif std_type == 'json': return 'JSON' else: return 'varchar(%d)' % (self._default_str_len if len is None else len) def _python_to_dbtype(self, val: Any, dbtype: str = None, is_json: bool = False) -> tuple: """ 将Python对象转换为数据库存储类型 @param {Any} val - 要转换的Python对象 @param {str} dbtype=None - 强制指定数据库类型, 如果不指定则自动判断 @param {str} is_json=False - 是否json数据(用于存入json的内容) @returns {tuple} - 返回转换后的结果(dbtype, dbvalue) """ # 判断类型 _dbtype = dbtype if _dbtype is None: _pytype = type(val) if _pytype == bool: _dbtype = 'bool' elif _pytype == int: _dbtype = 'int' elif _pytype == float: _dbtype = 'float' elif _pytype in (dict, tuple, list): _dbtype = 'json' else: _dbtype = 'str' # 进行转换处理 _dbvalue = None if _dbtype == 'bool': if is_json: _dbvalue = 'true' if val else 'false' else: _dbvalue = 1 if val else 0 elif _dbtype in ('int', 'float'): _dbvalue = val elif _dbtype == 'json': _dbvalue = json.dumps(val, ensure_ascii=False) else: _dbvalue = str(val) # 返回结果 return (_dbtype, _dbvalue) def _dbtype_to_python(self, val: Any) -> Any: """ 数据库值转换为python类型 @param {Any} val - 数据库值 @returns {Any} - Python值 """ if type(val) == str: try: return json.loads(val) except: return val else: return val def _db_quotes_str(self, val: str) -> str: """ 数据库单引号转义处理 @param {str} val - 要处理的字符串 @returns {str} - 转义处理后的字符串, 注意不包含外面的引号 """ return re.sub(r"\'", "''", val) def _get_filter_unit_sql(self, key: str, val: Any, fixed_col_define: dict = None, sql_paras: list = [], json_query_cols_dict: dict = {}, left_join: list = None, session=None, as_name: str = None, unuse_as_name: bool = False) -> str: """ 获取兼容mongodb过滤条件规则的单个规则对应的sql @param {str} key - 规则key @param {Any} val - 规则值 @param {dict} fixed_col_define=None - 表的固定字段配置信息字典 { 'cols': [], # 表固定字段名清单 'define': {'字段名': {'type': 'str|bool|int|...'}} } @param {list} sql_paras=[] - 返回sql对应的占位参数 @param {dict} json_query_cols_dict={} - 返回sql中json查询所需的json_tree处理的字段字典 字典的key为别名, value为该别名下的配置字典, 字典的格式如下: key为查询名(json_query_别名_字段名_字段路径.value), value为配置字典{'col': 物理字段名, 'path': '真实查询路径', 'as': '字段别名'} 注1: 真实查询路径 'key1.key2[10].key3' 对应的字段路径为 'key1_key2_10_key3' 注2: nosql_driver_extend_tags字段名不填入查询名中 @param {list} left_join=None - 左关联配置 @param {Any} session=None - 数据库事务连接对象 @param {str} as_name=None - 指定主表的别名 @param {bool} unuse_as_name=False - 指定不使用as_name @returns {str} - 单个规则对应的sql """ # 根据字段判断是主表还是关联表 _key = key _fixed_cols = None if _key[0] != '#': # 主表的过滤条件 _as_name = '_main_table' if as_name is None else as_name _col_as_name = '' if unuse_as_name else '%s.' % _as_name if fixed_col_define is not None: _fixed_cols = copy.deepcopy(fixed_col_define.get('cols', [])) else: # 关联表的过滤条件 _index = _key.find('.') _join_para = left_join[int(_key[1: _index])] _key = _key[_index+1:] _join_db_name = _join_para.get('db_name', self._db_name) _join_tab = _join_para['collection'] _as_name = '%s' % _join_para.get('as', _join_tab) _col_as_name = '%s.' % _as_name _fixed_col_define = AsyncTools.sync_run_coroutine(self._get_fixed_col_define( _join_tab, db_name=_join_db_name, session=session )) if _fixed_col_define is not None: _fixed_cols = copy.deepcopy(_fixed_col_define.get('cols', [])) # 判断是否处理json的值, 形成最后比较的 key 值 _is_json = False if _fixed_cols is not None: _fixed_cols.append('_id') if _key not in _fixed_cols: # 非固定字段或json字段的处理 _is_json = True if _as_name in ('_main_table', '_inner_temp_as_name'): _key = self._add_to_json_query_cols( _key, json_query_cols_dict=json_query_cols_dict, fixed_cols=_fixed_cols, as_name=_as_name, unuse_as_name=(_col_as_name == '') ) _key = '%s.value' % json_query_cols_dict[_as_name][_key]['as'] else: # 关联表不使用json_tree形式 _key = self._get_json_extract_sql( _key, fixed_cols=_fixed_cols, as_name=_as_name, unuse_as_name=(_col_as_name == '') ) if isinstance(val, dict): # 有特殊规则 _cds = [] # 每个规则的数组 for _op, _para in val.items(): if _op in self._filter_symbol_mapping.keys(): # 比较值转换为数据库的格式 _dbtype, _cmp_val = self._python_to_dbtype(_para) _cds.append('%s %s ?' % ( _key if _is_json else '%s%s' % (_col_as_name, _key), self._filter_symbol_mapping[_op] )) sql_paras.append(_cmp_val) elif _op in ('$in', '$nin'): # in 和 not in _cds.append('%s %s (%s)' % ( _key if _is_json else '%s%s' % (_col_as_name, _key), 'in' if _op == '$in' else 'not in', ','.join(['?' for _item in _para]) )) for _item in _para: _dbtype, _cmp_val = self._python_to_dbtype(_item) sql_paras.append(_cmp_val) elif _op == '$regex': _cds.append("%s REGEXP ?" % (_key if _is_json else '%s%s' % (_col_as_name, _key))) sql_paras.append(_para) else: raise aiosqlite.NotSupportedError('sqlite3 not support this search operation [%s]' % _op) _sql = ' and '.join(_cds) else: # 直接相等的条件 if val is None: _sql = '%s is NULL' % (_key if _is_json else '%s%s' % (_col_as_name, _key)) else: _dbtype, _cmp_val = self._python_to_dbtype(val) _sql = '%s = ?' % (_key if _is_json else '%s%s' % (_col_as_name, _key)) sql_paras.append(_cmp_val) return _sql def _get_filter_sql(self, filter: dict, fixed_col_define: dict = None, sql_paras: list = [], json_query_cols_dict: dict = {}, left_join: list = None, session=None, as_name: str = None, unuse_as_name: bool = False) -> str: """ 获取兼容mongodb过滤条件规则的sql语句 @param {dict} filter - 过滤规则字典 @param {dict} fixed_col_define=None - 表的固定字段配置信息字典 { 'cols': [], # 表固定字段名清单 'define': {'字段名': {'type': 'str|bool|int|...'}} } @param {list} sql_paras=[] - 返回sql对应的占位参数 @param {dict} json_query_cols_dict={} - 返回sql中json查询所需的json_tree处理的字段字典 @param {list} left_join=None - 左关联配置 @param {Any} session=None - 数据库事务连接对象 @param {str} as_name=None - 指定主表的别名 @param {bool} unuse_as_name=False - 指定不使用as_name @returns {str} - 返回的sql语句, 如果没有条件则返回None """ if filter is None or len(filter) == 0: return None # 遍历进行解析 _condition_list = [] for _col, _val in filter.items(): if _col == '$or': # or的情况,处理的是字典数组 _or_list = list() for _condition in _val: # 逐个条件处理 _where = self._get_filter_sql( _condition, fixed_col_define=fixed_col_define, sql_paras=sql_paras, json_query_cols_dict=json_query_cols_dict, left_join=left_join, session=session, as_name=as_name, unuse_as_name=unuse_as_name ) if len(_condition) > 1: # 多条件的情况,需要增加括号进行集合处理 _where = '(%s)' % _where if _where is not None: _or_list.append(_where) # 组合放到全局列表中 _condition_list.append( '(%s)' % ' or '.join(_or_list) ) else: # 正常的and条件 _where = self._get_filter_unit_sql( _col, _val, fixed_col_define=fixed_col_define, sql_paras=sql_paras, json_query_cols_dict=json_query_cols_dict, left_join=left_join, session=session, as_name=as_name, unuse_as_name=unuse_as_name ) # 添加条件 _condition_list.append(_where) # 组合条件并返回 return ' and '.join(_condition_list) def _get_update_sql(self, update: dict, fixed_col_define: dict = None, sql_paras: list = [], json_query_cols_dict: dict = {}) -> str: """ 获取兼容mongodb更新语句的sql语句 @param {dict} update - 更新配置字典 @param {dict} fixed_col_define=None - 表的固定字段配置信息字典 { 'cols': [], # 表固定字段名清单 'define': {'字段名': {'type': 'str|bool|int|...'}} } @param {list} sql_paras=[] - 返回sql对应的占位参数 @param {dict} json_query_cols_dict={} - 返回sql中json查询所需的json_tree处理的字段字典 @returns {str} - 返回更新部分语句sql """ # 更新辅助字典, key为要更新的字段名, value为{'sql': '对应的sql语句, 比如?', 'paras': [传入sql的参数列表]} _upd_dict = {} # 扩展字段字典, key要更新的扩展字段名, value为字典: {'sql': None, 'paras': []} _extend_dict = {} # 遍历处理 for _op, _para in update.items(): for _key, _val in _para.items(): if fixed_col_define is None or _key == '_id' or _key in fixed_col_define.get('cols', []): # 是固定字段 if _op == '$set': _upd_dict[_key] = {'sql': '?', 'paras': [self._python_to_dbtype(_val)[1]]} elif _op == '$unset': _upd_dict[_key] = {'sql': 'NULL', 'paras': []} elif _op == '$inc': _upd_dict[_key] = {'sql': 'ifnull(%s,0) + ?' % _key, 'paras': [_val]} elif _op == '$mul': _upd_dict[_key] = {'sql': 'ifnull(%s,0) * ?' % _key, 'paras': [_val]} elif _op == '$min': _upd_dict[_key] = { 'sql': 'case when ifnull({key}, ?) < ? then ifnull({key},0) else ? end'.format(key=_key), 'paras': [_val, _val, _val] } elif _op == '$max': _upd_dict[_key] = { 'sql': 'case when ifnull({key}, ?) > ? then ifnull({key},0) else ? end'.format(key=_key), 'paras': [_val, _val, _val] } else: raise aiosqlite.NotSupportedError('sqlite3 not support this update operation [%s]' % _op) else: # 是扩展字段或json字段 _path_cols = _key.split('.') if len(_path_cols) > 1 and _path_cols[0] in fixed_col_define.get('cols', []): # 是固定的json字段 _col_name = _path_cols[0] _set_key = self._convert_path_array(_path_cols[1:]) else: # 是扩展字段 _col_name = 'nosql_driver_extend_tags' _set_key = self._convert_path_array(_path_cols) if _extend_dict.get(_col_name, None) is None: _extend_dict[_col_name] = {'sql': None, 'paras': []} if _op == '$set': _dbtype, _dbval = self._python_to_dbtype(_val, is_json=True) if _dbtype in ('bool', 'json'): _sql = 'json_set({sql}, "$.{key}", json(?))' else: _sql = 'json_set({sql}, "$.{key}", ?)' _extend_dict[_col_name]['paras'].append(_dbval) elif _op == '$unset': _sql = 'json_remove({sql}, "$.{key}")' elif _op in ('$inc', '$mul', '$min', '$max'): _dbtype, _dbval = self._python_to_dbtype(_val, is_json=True) # 需要取值出来, 添加查询字段 if _op == '$inc': _sql = 'json_set({sql}, "$.{key}", ifnull(json_extract({col_name}, "$.{key}"), 0) + ?)' _extend_dict[_col_name]['paras'].append(_dbval) elif _op == '$mul': _sql = 'json_set({sql}, "$.{key}", ifnull(json_extract({col_name}, "$.{key}"), 0) * ?)' _extend_dict[_col_name]['paras'].append(_dbval) elif _op == '$min': _sql = 'json_set({sql}, "$.{key}", case when ifnull(json_extract({col_name}, "$.{key}"), ?) < ? then ifnull(json_extract({col_name}, "$.{key}"), 0) else ? end)' _extend_dict[_col_name]['paras'].extend([_dbval, _dbval, _dbval]) elif _op == '$max': _sql = 'json_set({sql}, "$.{key}", case when ifnull(json_extract({col_name}, "$.{key}"), ?) > ? then ifnull(json_extract({col_name}, "$.{key}"), 0) else ? end)' _extend_dict[_col_name]['paras'].extend([_dbval, _dbval, _dbval]) else: raise aiosqlite.NotSupportedError('sqlite3 not support this update operation [%s]' % _op) # 处理格式化 _extend_dict[_col_name]['sql'] = _sql.format( sql=_col_name if _extend_dict[_col_name]['sql'] is None else _extend_dict[_col_name]['sql'], key=_set_key, col_name=_col_name ) # 开始生成sql语句和返回参数 _sqls = [] for _key, _val in _upd_dict.items(): _sqls.append('%s=%s' % (_key, _val['sql'])) if _val['paras'] is not None: sql_paras.extend(_val['paras']) # 处理扩展字段 for _col_name, _extend_para in _extend_dict.items(): _sqls.append('%s=%s' % (_col_name, _extend_para['sql'])) sql_paras.extend(_extend_para['paras']) return ','.join(_sqls) def _get_projection_sql(self, projection: Union[dict, list], fixed_col_define: dict = None, sql_paras: list = [], json_query_cols_dict: dict = {}, is_group_by: bool = False, as_name: str = None, unuse_as_name: bool = False, left_join: list = None, session=None) -> str: """ 获取兼容mongodb查询返回字段的sql语句 @param {Union[dict, list]} projection - 指定结果返回的字段信息 列表模式: ['col1','col2', ...] 注意: 该模式一定会返回 _id 这个主键 字典模式: {'_id': False, 'col1': True, ...} 该方式可以通过设置False屏蔽 _id 的返回 注: 字典模式的值也可以传入字符串, 如果是字符串, 则代表key为别名, value才是真正的字段 @param {dict} fixed_col_define=None - 表的固定字段配置信息字典 { 'cols': [], # 表固定字段名清单 'define': {'字段名': {'type': 'str|bool|int|...'}} } @param {list} sql_paras=[] - 返回sql对应的占位参数 @param {dict} json_query_cols_dict={} - 返回sql中json查询所需的json_tree处理的字段字典 @param {bool} is_group_by=False - 指定是否group by的处理, 如果是不会处理_id字段 @param {str} as_name=None - 字段对应的表别名 @param {bool} unuse_as_name=False - 指定不使用as_name @param {list} left_join - 左关联配置 @param {Any} session=None - 数据库事务连接对象 @returns {str} - 返回更新部分语句sql """ # 如果不指定, 返回所有字段 if projection is None: _base_as_name = '_main_table' if as_name is None else as_name _as_name = '' if unuse_as_name else ('%s.' % _base_as_name) _project_sql = '%s*' % _as_name if left_join is not None: # 补充关联表的所有字段获取 for _join_para in left_join: _project_sql = '%s, %s.*' % ( _project_sql, _join_para.get('as', _join_para['collection']) ) return _project_sql # 定义的内部函数 def _get_join_col_info(col: str, left_join: list, tab_as_name: str, unuse_as_name: bool) -> tuple: # 获取关联表列信息 if col[0] == '#': _index = col.find('.') _join_para = left_join[int(col[1: _index])] _col = col[_index+1:] _join_as_name = _join_para.get('as', _join_para['collection']) _join_as_name_sql = '%s.' % _join_as_name else: _col = col _join_as_name = '_main_table' if tab_as_name is None else tab_as_name _join_as_name_sql = '' if unuse_as_name else ('%s.' % _join_as_name) return _col, _join_as_name, _join_as_name_sql # 标准化要显示的字段清单 _projection = {} if isinstance(projection, dict): for _key, _show in projection.items(): if type(_show) == str and _show[0] == '$': _col, _tab_as_name, _tab_as_name_sql = _get_join_col_info( _show[1:], left_join, as_name, unuse_as_name ) _projection[_key] = { 'col': _col, 'tab_as': _tab_as_name, 'tab_as_sql': _tab_as_name_sql, 'col_as': _key } elif _show: _col, _tab_as_name, _tab_as_name_sql = _get_join_col_info( _key, left_join, as_name, unuse_as_name ) _projection[_key] = { 'col': _col, 'tab_as': _tab_as_name, 'tab_as_sql': _tab_as_name_sql } else: # 列表形式, _id是必须包含的 if not is_group_by and '_id' not in projection: _projection['_id'] = { 'col': '_id', 'tab_as': '_main_table' if as_name is None else as_name } _projection['_id']['tab_as_sql'] = '' if unuse_as_name else ('%s.' % _projection['_id']['tab_as']) for _key in projection: _col, _tab_as_name, _tab_as_name_sql = _get_join_col_info( _key, left_join, as_name, unuse_as_name ) _projection[_key] = { 'col': _col, 'tab_as': _tab_as_name, 'tab_as_sql': _tab_as_name_sql } # 处理fixed_cols参数 _fixed_cols_dict = {} if fixed_col_define is not None: # 主表的列参数 _tab_as_name = '_main_table' if as_name is None else as_name _fixed_cols_dict[_tab_as_name] = { 'fixed_define': fixed_col_define, 'fixed_cols': copy.deepcopy(fixed_col_define.get('cols', [])) } _fixed_cols_dict[_tab_as_name]['fixed_cols'].append('_id') if left_join is not None: # 关联表的列参数 for _join_para in left_join: _join_db_name = _join_para.get('db_name', self._db_name) _join_tab = _join_para['collection'] _join_as_name = _join_para.get('as', _join_tab) _fixed_col_define = AsyncTools.sync_run_coroutine(self._get_fixed_col_define( _join_tab, db_name=_join_db_name, session=session )) if _fixed_col_define is not None: _fixed_cols_dict[_join_as_name] = { 'fixed_define': _fixed_col_define, 'fixed_cols': copy.deepcopy(_fixed_col_define.get('cols', [])) } _fixed_cols_dict[_join_as_name]['fixed_cols'].append('_id') # 生成sql _real_cols = [] for _key, _val in _projection.items(): _fixed_col_define = _fixed_cols_dict.get(_val['tab_as'], {}).get('fixed_define', None) _fixed_cols = _fixed_cols_dict.get(_val['tab_as'], {}).get('fixed_cols', None) _col = _val['col'] _as_name = _val['tab_as_sql'] _base_as_name = _val['tab_as'] if _fixed_cols is None or _col in _fixed_cols: _real_cols.append(('%s%s' % (_as_name, _col)) if _val.get('col_as', None) is None else '%s%s as %s' % (_as_name, _col, _val['col_as'])) else: if _base_as_name in ('_main_table', '_inner_temp_as_name'): _json_key = self._add_to_json_query_cols( _col, json_query_cols_dict=json_query_cols_dict, fixed_cols=_fixed_cols, as_name=_base_as_name, unuse_as_name=(_as_name == '') ) _json_as = json_query_cols_dict[_base_as_name][_json_key]['as'] _json_col = json_query_cols_dict[_base_as_name][_json_key]['real_col'] _real_cols.append( '%s.value as %s' % ( _json_as, _json_col if _val.get('col_as', None) is None else _val['col_as'] ) ) else: # 关联表不使用json_tree形式 _real_cols.append( '%s as %s' % ( self._get_json_extract_sql( _col, fixed_cols=_fixed_cols, as_name=_base_as_name, unuse_as_name=(_as_name == '') ), _col if _val.get('col_as', None) is None else _val['col_as'] ) ) # 返回sql return ','.join(_real_cols) def _get_sort_sql(self, sort: list, fixed_col_define: dict = None, sql_paras: list = [], json_query_cols_dict: dict = {}, left_join: list = None, session=None, unuse_as_name: bool = False) -> str: """ 获取兼容mongodb查询排序的sql语句 @param {list} sort - 查询结果的排序方式 例: [('col1', 1), ...] 注: 参数的第2个值指定是否升序(1为升序, -1为降序) @param {dict} fixed_col_define=None - 表的固定字段配置信息字典 { 'cols': [], # 表固定字段名清单 'define': {'字段名': {'type': 'str|bool|int|...'}} } @param {list} sql_paras=[] - 返回sql对应的占位参数 @param {dict} json_query_cols={} - 返回sql中json查询所需的json_tree处理的字段字典 @param {list} left_join=None - 左关联配置 @param {Any} session=None - 数据库事务连接对象 @param {bool} unuse_as_name=False - 指定不使用as_name @returns {str} - 返回更新部分语句sql """ _sorts = [] # 主表的字段定义参数 _main_fixed_cols = None if fixed_col_define is not None: _main_fixed_cols = copy.deepcopy(fixed_col_define.get('cols', [])) _main_fixed_cols.append('_id') # 关联表的字段定义列表 _fixed_cols_dict = {} # 循环处理每个排序参数 for _item in sort: # 处理临时参数 _col: str = _item[0] if _col[0] != '#': # 属于主表字段 _as_name = '_main_table' _col_as_name = '' if unuse_as_name else ('%s.' % _as_name) _fixed_cols = _main_fixed_cols else: # 属于关联表字段 _index = _col.find('.') _join_para = left_join[int(_col[1: _index])] _col = _col[_index+1:] _join_db_name = _join_para.get('db_name', self._db_name) _join_tab = _join_para['collection'] _as_name = '%s' % _join_para.get('as', _join_tab) _col_as_name = '%s.' % _as_name if _as_name in _fixed_cols_dict.keys(): _fixed_cols = _fixed_cols_dict.get(_as_name, None) else: # 重新获取并放入缓存字典 _fixed_col_define = AsyncTools.sync_run_coroutine(self._get_fixed_col_define( _join_tab, db_name=_join_db_name, session=session )) _fixed_cols = copy.deepcopy(_fixed_col_define.get('cols', [])) _fixed_cols.append('_id') _fixed_cols_dict[_as_name] = _fixed_cols # 生成排序sql语句 if _fixed_cols is None: # 所有字段认为是固定字段 _sorts.append('%s%s %s' % (_col_as_name, _col, 'asc' if _item[1] == 1 else 'desc')) else: if _col in _fixed_cols: _sorts.append('%s%s %s' % (_col_as_name, _col, 'asc' if _item[1] == 1 else 'desc')) else: # 属于扩展字段 if _as_name in ('_main_table', '_inner_temp_as_name'): _key = self._add_to_json_query_cols( _col, json_query_cols_dict=json_query_cols_dict, fixed_cols=_fixed_cols, as_name=_as_name, unuse_as_name=(_col_as_name == '') ) _sorts.append('%s.value %s' % ( json_query_cols_dict[_as_name][_key]['as'], 'asc' if _item[1] == 1 else 'desc' )) else: # 关联表不使用json_tree形式 _sorts.append('%s %s' % ( self._get_json_extract_sql( _col, fixed_cols=_fixed_cols, as_name=_as_name, unuse_as_name=(_col_as_name == '') ), 'asc' if _item[1] == 1 else 'desc' )) # 返回结果 return ','.join(_sorts) def _get_group_sql(self, group: dict, fixed_col_define: dict = None, sql_paras: list = [], json_query_cols_dict: dict = {}, unuse_as_name: bool = False) -> tuple: """ 生成分组sql语句 @param {dict} group - 分组返回设置字典(注意与mongodb的_id要求有所区别) 指定分组字段为col1、col2, 聚合字段为count、pay_amt, 其中pay_amt统计col_pay字段的合计数值 {'id': '$col1', 'name': '$col2', 'count': {'$sum': 1}, 'pay_amt': {'$sum': '$col_pay'}} 常见的聚合类型: $sum-计算总和, $avg-计算平均值, $min-取最小值, $max-取最大值, $first-取第一条, $last-取最后一条 @param {dict} fixed_col_define=None - 表的固定字段配置信息字典 { 'cols': [], # 表固定字段名清单 'define': {'字段名': {'type': 'str|bool|int|...'}} } @param {list} sql_paras=[] - 返回sql对应的占位参数 @param {dict} json_query_cols_dict={} - 返回sql中json查询所需的json_tree处理的字段字典 @param {bool} unuse_as_name=False - 指定不使用as_name @returns {tuple} - 返回sql, (select语句, group by语句) """ # 固定字段定义 if fixed_col_define is None: _fixed_cols = [] else: _fixed_cols = copy.deepcopy(fixed_col_define.get('cols', [])) _fixed_cols.append('_id') # 函数操作名映射 _op_mapping = { '$sum': 'SUM', '$avg': 'AVG', '$min': 'MIN', '$max': 'MAX' } _select = [] _groupby = [] for _key, _val in group.items(): _val_type = type(_val) if isinstance(_val, dict): # 是聚合函数 _op = list(_val.keys())[0] _col = _val[_op] if type(_col) == str and _col.startswith('$'): # 是字段 _col = _col[1:] if _col not in _fixed_cols: # 非固定字段 _col = self._add_to_json_query_cols( _col, json_query_cols_dict=json_query_cols_dict, fixed_cols=_fixed_cols, unuse_as_name=unuse_as_name ) _col = json_query_cols_dict['_main_table'][_col]['as'] _select.append('%s(%s.value) as %s' % (_op_mapping[_op], _col, _key)) else: _select.append('%s(%s) as %s' % (_op_mapping[_op], _col, _key)) else: # 是值 _select.append('%s(?) as %s' % (_op_mapping[_op], _key)) sql_paras.append(_col) elif _val_type == str and _val.startswith('$'): # 是字段 _col = _val[1:] if _col not in _fixed_cols: # 非固定字段 _col = self._add_to_json_query_cols( _col, json_query_cols_dict=json_query_cols_dict, fixed_cols=_fixed_cols, unuse_as_name=unuse_as_name ) _col = json_query_cols_dict['_main_table'][_col]['as'] _select.append('%s.value as %s' % (_col, _key)) _groupby.append('%s.value' % _col) else: _select.append('%s as %s' % (_col, _key)) _groupby.append('%s' % _col) else: # 是固定值 _select.append('? as %s' % _key) sql_paras.append(_val) return ','.join(_select), ','.join(_groupby) def _get_col_default_value(self, val: str, dbtype: str) -> str: """ 获取字段的默认值 @param {str} val - 字符串形式的值 @param {str} dbtype - 数据类型 @returns {str} - 返回默认值的字符串 """ if dbtype in ('int', 'float'): return str(val) elif dbtype == 'bool': return '1' if str(val).lower() == 'true' else '0' elif dbtype == 'json': if type(val) != str: return "'%s'" % self._db_quotes_str(json.dumps(val, ensure_ascii=False)) else: return "'%s'" % self._db_quotes_str(val) else: return "'%s'" % self._db_quotes_str(str(val)) def _get_left_join_sqls(self, db_name: str, collection: str, left_join: list, sql_paras: list = [], json_query_cols_dict: dict = {}, session=None, fixed_col_define: dict = None) -> list: """ 获取左关联的关联表sql清单 @param {str} db_name - 主表数据库名 @param {str} collection - 主表名 @param {list} left_join - 左关联配置 @param {list} sql_paras=[] - 返回sql对应的占位参数 @param {dict} json_query_cols_dict={} - 返回sql中json查询所需的json_tree处理的字段字典 @param {Any} session=None - 数据库事务连接对象 @param {dict} fixed_col_define=None - 主表的固定字段配置信息字典 { 'cols': [], # 表固定字段名清单 'define': {'字段名': {'type': 'str|bool|int|...'}} } @returns {list} - 关联表sql清单 """ _sqls = [] # 遍历生成每个表的关联sql for _join_para in left_join: _join_db_name = _join_para.get('db_name', db_name) _join_tab = _join_para['collection'] _as_name = _join_para.get('as', _join_tab) if _as_name not in json_query_cols_dict.keys(): # 初始化json列字典表 json_query_cols_dict[_as_name] = {} # 表字段定义 _fixed_col_define = {'cols': []} if fixed_col_define is None else fixed_col_define _fixed_cols = _fixed_col_define['cols'] _join_fixed_col_define = AsyncTools.sync_run_coroutine(self._get_fixed_col_define( _join_tab, db_name=_join_db_name, session=session )) _join_fixed_cols = _join_fixed_col_define['cols'] # on语句 _on_fields = [] for _on in _join_para['join_fields']: if _on[0] != '_id' and _on[0] not in _fixed_col_define['cols']: # 扩展字段 # _key = self._add_to_json_query_cols( # _on[0], json_query_cols_dict=json_query_cols_dict, fixed_cols=_fixed_cols # ) # _field0 = "%s.value" % json_query_cols_dict['_main_table'][_key]['as'] _field0 = self._get_json_extract_sql( _on[0], fixed_cols=_fixed_cols ) else: _field0 = '_main_table.%s' % _on[0] if _on[1] != '_id' and _on[1] not in _join_fixed_col_define['cols']: # 扩展字段 # _key = self._add_to_json_query_cols( # _on[1], json_query_cols_dict=json_query_cols_dict, fixed_cols=_join_fixed_cols, # as_name=_as_name # ) # _field1 = "%s.value" % json_query_cols_dict[_as_name][_key]['as'] _field1 = self._get_json_extract_sql( _on[1], fixed_cols=_join_fixed_cols, as_name=_as_name ) else: _field1 = '%s.%s' % (_as_name, _on[1]) _on_fields.append('%s = %s' % (_field0, _field1)) # 根据是否有过滤条件处理 _filter = _join_para.get('filter', None) if _filter is None: _sqls.append({ 'as': _as_name, 'tab': '%s.%s' % (_join_db_name, _join_tab), 'on': _on_fields }) else: # 有过滤条件, 按查询表的方式关联 _self_json_query_cols_dict = {'_inner_temp_as_name': {}} _self_sql_paras = [] _filter_sql = self._get_filter_sql( _filter, fixed_col_define=_join_fixed_col_define, sql_paras=_self_sql_paras, json_query_cols_dict=_self_json_query_cols_dict, as_name='_inner_temp_as_name', unuse_as_name=True ) _self_tabs = ['%s.%s' % (_join_db_name, _join_tab)] for _key, _key_para in _self_json_query_cols_dict['_inner_temp_as_name'].items(): _self_tabs.append( 'json_tree(%s, "$.%s") as %s' % ( _key_para['col'], _key_para['path'], _key_para['as'] ) ) _sqls.append({ 'as': _as_name, 'on': _on_fields, 'sql_paras': _self_sql_paras, 'tab': '(select * from %s where %s)' % ( ','.join(_self_tabs), _filter_sql ) }) return _sqls ############################# # 生成SQL转换的处理函数 ############################# def _sqls_fun_not_support(self, op: str, *args, **kwargs): """ 驱动不支持的情况 """ raise aiosqlite.NotSupportedError('sqlite3 not support this operation') def _sqls_fun_create_db(self, op: str, *args, **kwargs) -> tuple: """ 生成添加数据库的sql语句数组 """ _name = args[0] # 数据库名 _file = args[1] if len(args) > 1 else os.path.join(self._db_path, _name + '.db') # 数据库文件 _sql = "ATTACH DATABASE ? AS ?" return ([_sql], [(_file, _name)], {}) def _sqls_fun_list_dbs(self, op: str, *args, **kwargs) -> tuple: """ 生成获取数据库清单的sql语句数组 """ _sql = "PRAGMA database_list" return ([_sql], None, {'is_query': True}) def _sqls_fun_drop_db(self, op: str, *args, **kwargs) -> tuple: """ 生成分离数据库的sql语句数组 """ _name = args[0] # 数据库名 _sql = "DETACH DATABASE ?" return ([_sql], [(_name, )], {}) def _sqls_fun_create_collection(self, op: str, *args, **kwargs) -> tuple: """ 生成建表的sql语句数组 """ _db_prefix = '' if self._db_name == 'main' else ('%s.' % self._db_name) _collection = args[0] _sqls = [] # 生成表字段清单 _cols = [] if kwargs.get('fixed_col_define', None) is not None: for _col_name, _col_def in kwargs['fixed_col_define'].items(): _cols.append( '%s %s%s%s' % ( _col_name, self._dbtype_mapping(_col_def['type'], _col_def.get('len', None)), '' if _col_def.get('nullable', True) else ' not null', '' if _col_def.get('default', None) is None else ' default %s' % self._get_col_default_value( _col_def['default'], _col_def['type'] ) ) ) # 建表脚本, 需要带上数据库前缀 _sql = 'create table if not exists %s%s(_id varchar(100) primary key, %s nosql_driver_extend_tags JSON)' % ( _db_prefix, _collection, (', '.join(_cols) + ',') if len(_cols) > 0 else '', ) _sqls.append(_sql) # 建索引脚本, 创建索引时, 索引名带数据库前缀, 表名无需带前缀 if kwargs.get('indexs', None) is not None: for _index_name, _index_def in kwargs['indexs'].items(): _cols = [] for _col_name, _para in _index_def['keys'].items(): _cols.append(_col_name) _sql = 'create %sindex if not exists %s%s on %s(%s)' % ( 'UNIQUE ' if _index_def.get('paras', {}).get('unique', False) else '', _db_prefix, _index_name, _collection, ','.join(_cols) ) _sqls.append(_sql) # 返回结果 return (_sqls, None, {}) def _sql_fun_list_collections(self, op: str, *args, **kwargs) -> tuple: """ 生成查询表清单的sql语句数组 """ _db_prefix = '' if self._db_name == 'main' else ('%s.' % self._db_name) _filter = kwargs.get('filter', None) # 生成where语句 _sql_paras = [] _where = self._get_filter_sql(_filter, sql_paras=_sql_paras, unuse_as_name=True) _sql = "SELECT name FROM %ssqlite_master where type='table'%s order by name" % ( _db_prefix, '' if _where is None else ' and %s' % _where ) # 返回结果 return ([_sql], None if len(_sql_paras) == 0 else [_sql_paras], {'is_query': True}) def _sql_fun_drop_collection(self, op: str, *args, **kwargs) -> tuple: """ 生成删除表的sql语句数组 """ _collection = '%s%s' % ( '' if self._db_name == 'main' else ('%s.' % self._db_name), args[0] ) _sql = "drop table %s" % _collection return ([_sql], None, {}) def _sql_fun_turncate_collection(self, op: str, *args, **kwargs) -> tuple: """ 生成清空表的sql语句数组 """ _db_prefix = '' if self._db_name == 'main' else ('%s.' % self._db_name) _collection = args[0] _sqls = [] _sqls.append( 'delete from %s%s' % (_db_prefix, _collection) ) # 自增长ID设置为0, 语句执行会报错 # _sqls.append( # "update %ssqlite_sequence SET seq = 0 where name ='%s'" % (_db_prefix, _collection) # ) return (_sqls, None, {}) def _sql_fun_insert_one(self, op: str, *args, **kwargs) -> tuple: """ 生成插入数据的sql语句数组 """ _collection = '%s%s' % ( '' if self._db_name == 'main' else ('%s.' % self._db_name), args[0] ) _row = args[1] _fixed_col_define = args[2] if len(args) > 2 else kwargs.get('fixed_col_define', {}) # 生成插入字段和值 _cols = ['_id'] _sql_paras = [_row.pop('_id')] for _col in _fixed_col_define.get('cols', []): _val = _row.pop(_col, None) if _val is not None: _cols.append(_col) _sql_paras.append(self._python_to_dbtype(_val)[1]) # 剩余的内容放入扩展字段 _cols.append('nosql_driver_extend_tags') _sql_paras.append(self._python_to_dbtype(_row, dbtype='json')[1]) # 组成sql _sql = 'insert into %s(%s) values(%s)' % ( _collection, ','.join(_cols), ','.join(['?' for _tcol in _cols]) ) return ([_sql], [_sql_paras], {}) def _sql_fun_insert_many(self, op: str, *args, **kwargs) -> tuple: """ 生成插入数据的sql语句数组 """ _collection = '%s%s' % ( '' if self._db_name == 'main' else ('%s.' % self._db_name), args[0] ) _rows = args[1] _fixed_col_define = args[2] if len(args) > 2 else kwargs.get('fixed_col_define', {}) # 生成插入字段列表 _cols = ['_id'] for _col in _fixed_col_define.get('cols', []): if _col == '_id': continue _cols.append(_col) _cols.append('nosql_driver_extend_tags') # 遍历生成插入数据和参数 _para_array = [] _value_array = [] for _s_row in _rows: _row = copy.copy(_s_row) # 浅复制即可 _sql_paras = [_row.pop('_id', str(ObjectId()))] _col_values = ['?'] for _col in _fixed_col_define.get('cols', []): if _col == '_id': continue _val = _row.pop(_col, None) if _val is None: _col_values.append('null') else: _col_values.append('?') _sql_paras.append(self._python_to_dbtype(_val)[1]) # 剩余的内容放入扩展字段 _col_values.append('?') _sql_paras.append(self._python_to_dbtype(_row, dbtype='json')[1]) # 添加到数组 _para_array.extend(_sql_paras) _value_array.append('select %s' % ','.join(_col_values)) # 生成插入sql _sql = 'insert into %s(%s) %s' % ( _collection, ','.join(_cols), ' union '.join(_value_array) ) return ([_sql], [_para_array], {}) def _sql_fun_update(self, op: str, *args, **kwargs) -> tuple: """ 生成更新数据的sql语句数组 """ # 获取参数 _db_prefix = '' if self._db_name == 'main' else ('%s.' % self._db_name) _collection = args[0] _filter = args[1] _update = args[2] _fixed_col_define = kwargs.get('fixed_col_define', None) # 处理where条件语句 _where_sql_paras = [] _json_query_cols_dict = {'_main_table': {}} _where_sql = self._get_filter_sql( _filter, fixed_col_define=_fixed_col_define, sql_paras=_where_sql_paras, json_query_cols_dict=_json_query_cols_dict, unuse_as_name=True ) # 处理更新配置语句 _update_sql_paras = [] _update_sql = self._get_update_sql( _update, fixed_col_define=_fixed_col_define, sql_paras=_update_sql_paras, json_query_cols_dict=_json_query_cols_dict ) _sql_collection = '%s%s' % (_db_prefix, _collection) if len(_json_query_cols_dict['_main_table']) == 0: # 没有以json对象做条件的情况 _sql = 'update %s set %s' % (_sql_collection, _update_sql) if _where_sql is not None: _sql = '%s where %s' % (_sql, _where_sql) _update_sql_paras.extend(_where_sql_paras) else: # 有使用json对象做条件, 由于update语句不支持json_tree, 要使用子查询的方式处理 _tabs = [_sql_collection] for _key, _key_para in _json_query_cols_dict['_main_table'].items(): _tabs.append( 'json_tree(%s, "$.%s") as %s' % ( _key_para['col'], _key_para['path'], _key_para['as'] ) ) _sql = 'update %s set %s where _id in (select _id from %s%s)' % ( _sql_collection, _update_sql, ','.join(_tabs), '' if _where_sql is None else ' where %s' % _where_sql ) if _where_sql is not None: _update_sql_paras.extend(_where_sql_paras) # 返回语句 return ([_sql], [_update_sql_paras], {}) def _sql_fun_delete(self, op: str, *args, **kwargs) -> tuple: """ 生成删除数据的sql语句数组 """ # 获取参数 _db_prefix = '' if self._db_name == 'main' else ('%s.' % self._db_name) _collection = args[0] _filter = args[1] _fixed_col_define = kwargs.get('fixed_col_define', None) # 处理where条件语句 _where_sql_paras = [] _json_query_cols_dict = {'_main_table': {}} _where_sql = self._get_filter_sql( _filter, fixed_col_define=_fixed_col_define, sql_paras=_where_sql_paras, json_query_cols_dict=_json_query_cols_dict, unuse_as_name=True ) _sql_paras = None _sql_collection = '%s%s' % (_db_prefix, _collection) if len(_json_query_cols_dict['_main_table']) == 0: # 没有以json对象做条件的情况 _sql = 'delete from %s' % _sql_collection if _where_sql is not None: _sql = '%s where %s' % (_sql, _where_sql) _sql_paras = _where_sql_paras else: # 有使用json对象做条件, 由于delete语句不支持json_tree, 要使用子查询的方式处理 _tabs = [_sql_collection] for _key, _key_para in _json_query_cols_dict['_main_table'].items(): _tabs.append( 'json_tree(%s, "$.%s") as %s' % ( _key_para['col'], _key_para['path'], _key_para['as'] ) ) _sql = 'delete from %s where _id in (select _id from %s%s)' % ( _sql_collection, ','.join(_tabs), '' if _where_sql is None else ' where %s' % _where_sql ) if _where_sql is not None: _sql_paras = _where_sql_paras # 返回语句 return ([_sql], [_sql_paras], {}) def _sql_fun_query(self, op: str, *args, **kwargs) -> tuple: """ 生成查询数据的sql语句数组 """ # 获取参数 _db_prefix = '' if self._db_name == 'main' else ('%s.' % self._db_name) _collection = args[0] _filter = kwargs.get('filter', {}) _projection = kwargs.get('projection', None) _sort = kwargs.get('sort', None) _skip = kwargs.get('skip', None) _limit = kwargs.get('limit', None) _fixed_col_define = kwargs.get('fixed_col_define', None) _left_join = kwargs.get('left_join', None) # 关联查询 _session = kwargs.get('session', None) # 数据库操作的session # 存储不同表的json查询列信息的字典 _json_query_cols_dict = { '_main_table': {} # 主表 } # 处理关联表 _left_join_sql_paras = [] _left_join_sqls = None if _left_join is not None: _left_join_sqls = self._get_left_join_sqls( self._db_name, _collection, _left_join, sql_paras=_left_join_sql_paras, json_query_cols_dict=_json_query_cols_dict, session=_session, fixed_col_define=_fixed_col_define ) # 处理where条件语句 _where_sql_paras = [] _where_sql = self._get_filter_sql( _filter, fixed_col_define=_fixed_col_define, sql_paras=_where_sql_paras, json_query_cols_dict=_json_query_cols_dict, left_join=_left_join, session=_session ) # 处理sort语句 _sort_sql_paras = [] _sort_sql = None if _sort is not None: _sort_sql = self._get_sort_sql( _sort, fixed_col_define=_fixed_col_define, sql_paras=_sort_sql_paras, json_query_cols_dict=_json_query_cols_dict, left_join=_left_join, session=_session ) # 处理projection语句 _projection_sql_paras = [] _projection_sql = self._get_projection_sql( _projection, fixed_col_define=_fixed_col_define, sql_paras=_projection_sql_paras, json_query_cols_dict=_json_query_cols_dict, left_join=_left_join, session=_session ) # 形成查询json的表别名 _tabs = ['%s%s as _main_table' % (_db_prefix, _collection)] for _key, _key_para in _json_query_cols_dict['_main_table'].items(): _tabs.append( 'json_tree(%s, "$.%s") as %s' % ( _key_para['col'], _key_para['path'], _key_para['as'] ) ) # 组装语句 _sql_paras = [] _sql = 'select %s from %s' % (_projection_sql, ','.join(_tabs)) _sql_paras.extend(_projection_sql_paras) # 组装关联表 if _left_join_sqls is not None: for _left_join_para in _left_join_sqls: _as_name = _left_join_para['as'] _join_tabs = ['%s as %s' % (_left_join_para['tab'], _as_name)] # for _key, _key_para in _json_query_cols_dict[_as_name].items(): # _join_tabs.append( # 'json_tree(%s, "$.%s") as %s' % ( # _key_para['col'], _key_para['path'], _key_para['as'] # ) # ) _sql = '%s left outer join %s' % ( _sql, '%s on %s' % ( ','.join(_join_tabs), ' and '.join(_left_join_para['on']) ) ) if _left_join_para.get('sql_paras', None) is not None: _sql_paras.extend(_left_join_para['sql_paras']) if _where_sql is not None: _sql = '%s where %s' % (_sql, _where_sql) _sql_paras.extend(_where_sql_paras) if _sort_sql is not None: _sql = '%s order by %s' % (_sql, _sort_sql) _sql_paras.extend(_sort_sql_paras) # 增加skip和limit if _limit is not None: if _skip is not None: _sql = '%s %s' % (_sql, 'limit %d offset %d' % (_limit, _skip)) else: _sql = '%s %s' % (_sql, 'limit %d' % _limit) else: if _skip is not None: _sql = '%s %s' % (_sql, 'limit %d offset %d' % (-1, _skip)) # 返回最终结果 return ([_sql], [_sql_paras], {'is_query': True}) def _sql_fun_query_count(self, op: str, *args, **kwargs) -> tuple: """ 生成查询数据count的sql语句数组 """ # 获取参数 _db_prefix = '' if self._db_name == 'main' else ('%s.' % self._db_name) _collection = args[0] _filter = kwargs.get('filter', {}) _skip = kwargs.get('skip', None) _limit = kwargs.get('limit', None) _fixed_col_define = kwargs.get('fixed_col_define', None) _left_join = kwargs.get('left_join', None) # 关联查询 _session = kwargs.get('session', None) # 数据库操作的session # 存储不同表的json查询列信息的字典 _json_query_cols_dict = { '_main_table': {} # 主表 } # 处理关联表 _left_join_sql_paras = [] _left_join_sqls = None if _left_join is not None: _left_join_sqls = self._get_left_join_sqls( self._db_name, _collection, _left_join, sql_paras=_left_join_sql_paras, json_query_cols_dict=_json_query_cols_dict, session=_session, fixed_col_define=_fixed_col_define ) # 处理where条件语句 _where_sql_paras = [] _where_sql = self._get_filter_sql( _filter, fixed_col_define=_fixed_col_define, sql_paras=_where_sql_paras, json_query_cols_dict=_json_query_cols_dict, left_join=_left_join, session=_session ) # 形成查询json的表别名 _tabs = ['%s%s as _main_table' % (_db_prefix, _collection)] for _key, _key_para in _json_query_cols_dict['_main_table'].items(): _tabs.append( 'json_tree(%s, "$.%s") as %s' % ( _key_para['col'], _key_para['path'], _key_para['as'] ) ) # 组装语句 if _limit is not None or _skip is not None: # 有获取数据区间, 只能采用性能差的子查询模式 _sql_paras = [] _sub_sql = 'select * from %s' % (','.join(_tabs)) # 组装关联表 if _left_join_sqls is not None: for _left_join_para in _left_join_sqls: _sub_sql = '%s left outer join %s' % ( _sub_sql, '%s as %s on %s' % ( _left_join_para['tab'], _left_join_para['as'], ' and '.join(_left_join_para['on']) ) ) if _left_join_para.get('sql_paras', None) is not None: _sql_paras.extend(_left_join_para['sql_paras']) if _where_sql is not None: _sub_sql = '%s where %s' % (_sub_sql, _where_sql) _sql_paras.extend(_where_sql_paras) # 增加skip和limit if _limit is not None: if _skip is not None: _sub_sql = '%s %s' % (_sub_sql, 'limit %d offset %d' % (_limit, _skip)) else: _sub_sql = '%s %s' % (_sub_sql, 'limit %d' % _limit) else: if _skip is not None: _sub_sql = '%s %s' % (_sub_sql, 'limit %d offset %d' % (-1, _skip)) _sql = 'select count(*) from (%s)' % _sub_sql else: _sql_paras = [] _sql = 'select count(*) from %s' % (','.join(_tabs)) # 组装关联表 if _left_join_sqls is not None: for _left_join_para in _left_join_sqls: _sql = '%s left outer join %s' % ( _sql, '%s as %s on %s' % ( _left_join_para['tab'], _left_join_para['as'], ' and '.join(_left_join_para['on']) ) ) if _left_join_para.get('sql_paras', None) is not None: _sql_paras.extend(_left_join_para['sql_paras']) if _where_sql is not None: _sql = '%s where %s' % (_sql, _where_sql) _sql_paras.extend(_where_sql_paras) # 返回最终结果 return ([_sql], [_sql_paras], {'is_query': True}) def _sql_fun_query_group_by(self, op: str, *args, **kwargs) -> tuple: """ 生成查询数据聚合的sql语句数组 """ # 获取参数 _db_prefix = '' if self._db_name == 'main' else ('%s.' % self._db_name) _collection = args[0] _group = kwargs.get('group', None) _filter = kwargs.get('filter', {}) _projection = kwargs.get('projection', None) _sort = kwargs.get('sort', None) _fixed_col_define = kwargs.get('fixed_col_define', None) # 处理group by语句 _select_sql_paras = [] _json_query_cols_dict = {'_main_table': {}} _select_sql, _group_by_sql = self._get_group_sql( _group, fixed_col_define=_fixed_col_define, sql_paras=_select_sql_paras, json_query_cols_dict=_json_query_cols_dict, unuse_as_name=True ) # 处理where条件语句 _where_sql_paras = [] _where_sql = self._get_filter_sql( _filter, fixed_col_define=_fixed_col_define, sql_paras=_where_sql_paras, json_query_cols_dict=_json_query_cols_dict, unuse_as_name=True ) # 处理sort语句 _sort_sql_paras = [] _sort_sql = None if _sort is not None: _sort_sql = self._get_sort_sql( _sort, fixed_col_define=None, sql_paras=_sort_sql_paras, json_query_cols_dict=_json_query_cols_dict, unuse_as_name=True ) # 处理projection语句 _projection_sql_paras = [] _projection_sql = self._get_projection_sql( _projection, fixed_col_define=None, sql_paras=_projection_sql_paras, json_query_cols_dict=_json_query_cols_dict, is_group_by=True, unuse_as_name=True ) # 形成查询json的表别名 _tabs = ['%s%s' % (_db_prefix, _collection)] for _key, _key_para in _json_query_cols_dict['_main_table'].items(): _tabs.append( 'json_tree(%s, "$.%s") as %s' % ( _key_para['col'], _key_para['path'], _key_para['as'] ) ) # 组装查询语句 _sql_paras = [] _sql = 'select %s from %s' % ( _select_sql, ','.join(_tabs) ) _sql_paras.extend(_select_sql_paras) if _where_sql is not None: _sql = '%s where %s' % (_sql, _where_sql) _sql_paras.extend(_where_sql_paras) _sql = '%s group by %s' % (_sql, _group_by_sql) if _sort_sql is not None or _projection_sql != '*': # 有排序或指定返回字段的情况, 需要包装多一层 _sql = 'select %s from (%s)' % (_projection_sql, _sql) if _sort_sql is not None: _sql = '%s order by %s' % (_sql, _sort_sql) # 返回结果 return ([_sql], None if len(_sql_paras) == 0 else [_sql_paras], {'is_query': True}) ############################# # 其他内部函数 ############################# def _convert_path_array(self, path_list: list) -> str: """ 将json查询路径数组转换为sqlite支持的json查询路径字符串 @param {list} path_list - 路径数组 @returns {str} - 转换后的查询路径字符串 """ _path = '' for _key in path_list: if ValidateTool.str_is_int(_key): # 是字符串 _path = '%s[%s]' % (_path, _key) else: _path = '%s%s' % ('' if _path == '' else '%s.' % _path, _key) return _path def _add_to_json_query_cols(self, col_name: str, json_query_cols_dict: dict = {}, fixed_cols: list = [], as_name: str = '_main_table', unuse_as_name: bool = False) -> str: """ 将字段查询信息添加到json_query_cols_dict字典 @param {str} col_name - 查询字段名(x.x.x形式) @param {dict} json_query_cols_dict={} - 返回sql中json查询所需的json_tree处理的字段字典 key为查询名(json_query_[as_name]_字段名_字段路径.value), value为配置字典{'col': 物理字段名, 'path': '真实查询路径', 'as': '字段别名'} 注1: 真实查询路径 'key1.key2[10].key3' 对应的字段路径为 'key1_key2_10_key3' 注2: nosql_driver_extend_tags字段名不填入查询名中 @param {list} fixed_cols=[] - 固定字段定义清单 @param {str} as_name='_main_table' - 字典对应表的别名 @param {bool} unuse_as_name=False - 指定不使用as_name @returns {str} - 返回json_query_cols_dict字典对应的key """ _path_cols = col_name.split('.') _as = '%s%s' % ('' if unuse_as_name else ('%s_' % as_name), '_'.join(_path_cols)) _key = 'json_query_%s.value' % _as _key_para = json_query_cols_dict[as_name].get(_key, None) if _key_para is None: # 不在查询字段中, 需要添加到查询字段 if len(_path_cols) > 1 and _path_cols[0] in fixed_cols: # 是json类型的固定字段 _col_name = _path_cols[0] _path = self._convert_path_array(_path_cols[1:]) else: _col_name = 'nosql_driver_extend_tags' _path = self._convert_path_array(_path_cols) json_query_cols_dict[as_name][_key] = { 'col': '%s%s' % ('' if unuse_as_name else ('%s.' % as_name), _col_name), 'path': _path, 'as': _as, 'real_col': col_name } return _key def _get_json_extract_sql(self, col_name: str, fixed_cols: list = [], as_name: str = '_main_table', unuse_as_name: bool = False) -> str: """ 获取指定字段的json_extract函数字符串 @param {str} col_name - 查询字段名(x.x.x形式) @param {list} fixed_cols=[] - 固定字段定义清单 @param {str} as_name='_main_table' - 字典对应表的别名 @param {bool} unuse_as_name=False - 指定不使用as_name @returns {str} - 返回的sql语句 """ _as = '' if unuse_as_name else ('%s.' % as_name) _path_cols = col_name.split('.') if len(_path_cols) > 1 and _path_cols[0] in fixed_cols: # 是json类型的固定字段 _col_name = _path_cols[0] _path = self._convert_path_array(_path_cols[1:]) else: _col_name = 'nosql_driver_extend_tags' _path = self._convert_path_array(_path_cols) return 'json_extract(%s%s, "$.%s")' % (_as, _col_name, _path)