HiveNetNoSql.pgsql 源代码

#!/usr/bin/env python3
# -*- coding: UTF-8 -*-
# Copyright 2022 黎慧剑
#
# This Source Code Form is subject to the terms of the Mozilla Public
# License, v. 2.0. If a copy of the MPL was not distributed with this
# file, You can obtain one at http://mozilla.org/MPL/2.0/.

"""
PostgreSQL的HiveNetNoSql实现模块

@module pgsql
@file pgsql.py
"""
import os
import sys
import copy
import json
from bson.objectid import ObjectId
from typing import Any, Union
from HiveNetCore.utils.run_tool import AsyncTools
from HiveNetCore.utils.validate_tool import ValidateTool
from HiveNetCore.connection_pool import PoolConnectionFW
# 自动安装依赖库
from HiveNetCore.utils.pyenv_tool import PythonEnvTools
process_install_psycopg = False
while True:
    try:
        # psycopg3 暂时不支持mac M1的版本
        if sys.platform == 'darwin':
            import psycopg2 as pgadapter
        else:
            from psycopg import AsyncConnection as pgadapter
        break
    except ImportError:
        if process_install_psycopg:
            break
        else:
            if sys.platform == 'darwin':
                PythonEnvTools.install_package('psycopg2-binary')
            else:
                PythonEnvTools.install_package('psycopg')
            process_install_psycopg = True
# 根据当前文件路径将包路径纳入, 在非安装的情况下可以引用到
sys.path.append(os.path.abspath(os.path.join(os.path.dirname(__file__), os.path.pardir)))
from HiveNetNoSql.base.driver_fw import NosqlAIOPoolDriver


[文档]class PgSQLPoolConnection(PoolConnectionFW): """ PostgreSQL连接池连接对象 """ ############################# # 需要继承类实现的函数 ############################# async def _real_ping(self, *args, **kwargs) -> bool: """ 实现类的真实检查连接对象是否有效的的函数 @returns {bool} - 返回检查结果 """ try: self._conn.isolation_level return True except: return False async def _fade_close(self) -> Any: """ 实现类提供的虚假关闭函数 注1: 不关闭连接, 只是清空上一个连接使用的上下文信息(例如数据库连接进行commit或rollback处理) 注2: 如果必须关闭真实连接, 则可以关闭后创建一个新连接返回 @returns {Any} - 返回原连接或新创建的连接 """ _close_action = self._pool._pool_extend_paras.get('close_action', None) if _close_action == 'commit': await AsyncTools.async_run_coroutine(self._conn.commit()) elif _close_action == 'rollback': await AsyncTools.async_run_coroutine(self._conn.rollback()) return self._conn async def _real_close(self): """ 实现类提供的真实关闭函数 """ await AsyncTools.async_run_coroutine(self._conn.close())
[文档]class PgSQLNosqlDriver(NosqlAIOPoolDriver): """ PostgreSQL数据库MySQL驱动 """ ############################# # 构造函数重载, 主要是注释 #############################
[文档] def __init__(self, connect_config: dict = {}, pool_config: dict = {}, driver_config: dict = {}): """ 初始化驱动 @param {dict} connect_config={} - 数据库的连接参数 host {str} - 数据库主机地址, 默认为'localhost' port {int} - 连接数据库的端口, 默认为5432 usedb {str} - 登录后使用的数据库(其实是schema), 默认为'public' 注: postgresql的连接对象不支持切换真实数据库, 因此驱动所指的数据库其实是schema username {str} - 登录验证用户 password {str} - 登录验证密码 dbname {str} - 登录的真实数据库, 默认为'postgres' 注: 过程中无法切换真实数据库, 如果需要切换要重新创建驱动对象 connect_on_init {bool} - 是否启动时直接连接数据库 connect_timeout {float} - 连接数据库的超时时间, 单位为秒, 默认为20 default_str_len {int} - 默认的字符串类型长度, 默认为30 ...驱动实现类自定义支持的参数 transaction_share_cursor {bool} - 进行事务处理是否复用同一个游标对象, 默认为True psycopg2.connect 支持的其他参数... @param {dict} pool_config={} - 连接池配置 max_size {int} - 连接池的最大大小, 默认为100 min_size {int} - 连接池维持的最小连接数量, 默认为0 max_idle_time {float} - 连接被移除前的最大空闲时间, 单位为秒, 默认为None wait_queue_timeout {float} - 在没有空闲连接的时候, 请求连接所等待的超时时间, 单位为秒, 默认为None(不超时) ...驱动实现类自定义支持的参数 注: 其他参数为AIOConnectionPool所支持的参数 @param {dict} driver_config={} - 驱动配置 init_db {dict} - 要在启动驱动时创建的数据库 { '数据库名': { 'index_only': False, # 是否仅用于索引, 不创建 'comment': '', # 数据库注释 'args': [], # 创建数据库的args参数 'kwargs': {} #创建数据库的kwargs参数 } } init_collections {dict} - 要在启动驱动时创建的集合(表) { '数据库名': { '集合名': { 'index_only': False, # 是否仅用于索引, 不创建 'comment': '', # 集合注释 'indexs': {索引字典}, 'fixed_col_define': {固定字段定义}, 'partition': {表分区定义} } ... }, ... } init_yaml_file {str} - 要在启动时创建的数据库和集合(表)配置yaml文件 注1: 该参数用于将init_db和init_collections参数内容放置的配置文件中, 如果参数有值则忽略前面两个参数 注2: 配置文件为init_db和init_collections两个字典, 内容与这两个参数一致 logger {Logger} - 传入驱动的日志对象 ignore_index_error {bool} - 是否忽略索引创建的异常, 默认为True debug {bool} - 指定是否debug模式, 默认为False close_action {str} - 关闭连接时自动处理动作, None-不处理, 'commit'-自动提交, 'rollback'-自动回滚 regexp_ignore_case {bool} - 正则表达式是否忽略大小写, 默认为False """ super().__init__( connect_config=connect_config, pool_config=pool_config, driver_config=driver_config ) # 指定使用独立的insert_many语句, 性能更高 self._use_insert_many_generate_sqls = True
############################# # 特殊的重载函数 #############################
[文档] async def create_collection(self, collection: str, indexs: dict = None, fixed_col_define: dict = None, comment: str = None, **kwargs): """ 创建集合(相当于关系型数据库的表, 如果不存在则创建) 注意: 所有集合都有必须有 '_id' 这个记录的唯一主键字段 @param {str} collection - 集合名(表名) @param {dict} indexs=None - 要创建的索引字典, 格式为: { '索引名': { --索引的字段清单 'keys': { '字段名': { 'asc': 是否升序(1为升序, -1为降序) }, ... } --创建参数 'paras': { 'unique': 是否唯一索引(True/False), ...驱动实现类自定义支持的参数 } }, } @param {dict} fixed_col_define=None - 固定字段定义(只有固定字段才能进行索引), 格式如下: { '字段名': { 'type': '字段类型(str, int, float, bool, json)', 'len': 字段长度, 'nullable': True, # 是否可空 'default': 默认值, 'comment': '字段注释' }, ... } @param {str} comment=None - 集合注释 @param {kwargs} - 实现驱动自定义支持的参数 partition {dict} - 表分区设置 type : str, 指定创建分区的类型,可支持的分区类型如下: range - 范围分区, 支持通过表达式对分区字段进行格式转换(同步需配置转换表达式) 注1: 支持设置多个分区字段 注2: 每个分区的范围设置只有一个比较值, 实际符合分区的取值范围为"上一分区比较值 <= 字段值 < 当前分区比较值" 注3: 将自动创建一个默认分区, 当分区字段值无法匹配设置的分区时, 将存入默认分区中 list - 列表分区, 支持通过表达式对分区字段进行格式转换(同步需配置转换表达式) 注1: 分区字段仅支持设置单个字段 注2: 符合分区的取值范围为在数组中能找到的字段值 注3: 将自动创建一个默认分区, 当分区字段值无法匹配设置的分区时, 将存入默认分区中 hash - 哈希分区 注1: 支持设置多个分区字段 注2: hash类型分区不支持分区范围的设置, 而是通过count参数指定要拆分的分区数量 count: int, 拆分的分区数量, 仅hash的分区类型有效 columns : list, 分区字段设置, 列表每个值为对应的一个分区字段设置字典, 定义如下: col_name : str, 分区字段名 func : str, 转换函数表达式, 可通过{col_name}进行字段名的替换, 例如to_days({col_name}) range_list : list, 分区条件列表, 设置每个分区名和分区条件比较值, 仅range, list使用 name : str, 分区名, 不设置或设置为None代表自动生成分区名, 如果不是第一个分区字段无需设置(统一使用第一个分区字段的对应分区名) value : any, 分区条件比较值, 按不同分区类型应设置为不同的值 注1: 如果为range, 该值设置为单一比较常量值, 例如 3, "'test'", "to_days('2021-10-11')", None(代表最大值MAXVALUE) 注2: 如果为list, 该值应设置为list, 例如 [3, "'test'", 5, "to_days('2021-01-01')", None], None代表NULL 注3: 如果值为字符串, 应使用单引号进行包裹, 例如"'str_value'" sub_partition: dict, 子分区设置, 定义与主分区一致, 可以嵌套形成多级子分区 注意: 每个主分区下都会嵌套创建一套相同的子分区 注: 使用表分区的限制: 1. 如果 _id 没有作为分区条件, 则仅创建普通索引而非唯一索引, _id 的唯一性数据库层面不控制, 需要由应用自行控制 2. 要创建唯一索引的字段必须作为分区条件之一, 否则将会作为普通索引而非唯一索引来创建 """ # 只是单纯解决注释问题 await super().create_collection( collection, indexs=indexs, fixed_col_define=fixed_col_define, comment=comment, **kwargs )
############################# # 需要继承类实现的内部函数 ############################# def _get_db_creator(self, connect_config: dict, pool_config: dict, driver_config: dict) -> tuple: """ 获取数据库连接驱动及参数 @param {dict} connect_config={} - 数据库的连接参数 host {str} - 数据库主机地址, 默认为'localhost' port {int} - 连接数据库的端口, 默认为5432 usedb {str} - 登录后使用的数据库(其实是schema), 默认为'public' 注: postgresql的连接对象不支持切换真实数据库, 因此驱动所指的数据库其实是schema username {str} - 登录验证用户 password {str} - 登录验证密码 dbname {str} - 登录的真实数据库, 默认为'postgres' 注: 过程中无法切换真实数据库, 如果需要切换要重新创建驱动对象 connect_on_init {bool} - 是否启动时直接连接数据库 connect_timeout {float} - 连接数据库的超时时间, 单位为秒, 默认为20 default_str_len {int} - 默认的字符串类型长度, 默认为30 ...驱动实现类自定义支持的参数 transaction_share_cursor {bool} - 进行事务处理是否复用同一个游标对象, 默认为True psycopg2.connect 支持的其他参数... @param {dict} pool_config={} - 连接池配置 max_size {int} - 连接池的最大大小, 默认为100 min_size {int} - 连接池维持的最小连接数量, 默认为0 max_idle_time {float} - 连接被移除前的最大空闲时间, 单位为秒, 默认为None wait_queue_timeout {float} - 在没有空闲连接的时候, 请求连接所等待的超时时间, 单位为秒, 默认为None(不超时) ...驱动实现类自定义支持的参数 wait_connection_idle {float} - 没有空闲连接时, 等待获取连接的间隔时长, 单位为秒, 默认维护0.01 @param {dict} driver_config={} - 驱动配置 init_db {dict} - 要在启动驱动时创建的数据库 { '数据库名': { 'index_only': False, # 是否仅用于索引, 不创建 'comment': '', # 数据库注释 'args': [], # 创建数据库的args参数 'kwargs': {} #创建数据库的kwargs参数 } } init_collections {dict} - 要在启动驱动时创建的集合(表) { '数据库名': { '集合名': { 'index_only': False, # 是否仅用于索引, 不创建 'comment': '', # 集合注释 'indexs': {索引字典}, 'fixed_col_define': {固定字段定义} } ... }, ... } logger {Logger} - 传入驱动的日志对象 debug {bool} - 指定是否debug模式, 默认为False close_action {str} - 关闭连接时自动处理动作, None-不处理, 'commit'-自动提交, 'rollback'-自动回滚 regexp_ignore_case {bool} - 正则表达式是否忽略大小写, 默认为False @returns {dict} - 返回连接池的相关参数 { 'creator': class, # 连接创建模块或对象 'pool_connection_class': class, # 连接池连接对象实现类(继承PoolConnectionFW的类对象) 'args': [], # 进行连接创建的固定位置参数 'kwargs': {}, # 进行连接创建的kv参数 'connect_method_name': 'connect', # 连接创建模块要执行的连接方法, 传None代表直接使用creator创建连接 'pool_update_config': {}, # 要指定AIOConnectionPool特定值的更新字典 'current_db_name': '', # 当前数据库名 } """ # 标准类型和数据库类型的映射关系字典 self._dbtype_cast_mapping = { 'str': 'varchar', 'int': 'int', 'float': 'float', 'bool': 'boolean', 'json': 'json' } self._regexp_op = '~*' if self._driver_config.get('regexp_ignore_case', False) else '~' # 初始化处理函数的映射字典 self._sqls_fun_mapping = { 'create_db': self._sqls_fun_create_db, 'switch_db': self._sqls_fun_not_support, 'list_dbs': self._sqls_fun_list_dbs, 'drop_db': self._sqls_fun_drop_db, 'create_collection': self._sqls_fun_create_collection, 'list_collections': self._sql_fun_list_collections, 'drop_collection': self._sql_fun_drop_collection, 'turncate_collection': self._sql_fun_turncate_collection, 'insert_one': self._sql_fun_insert_one, 'insert_many': self._sql_fun_insert_many, 'update': self._sql_fun_update, 'delete': self._sql_fun_delete, 'query': self._sql_fun_query, 'query_count': self._sql_fun_query_count, 'query_group_by': self._sql_fun_query_group_by } # 初始化查询操作符对应的符号 self._filter_symbol_mapping = { '$lt': '<', '$lte': '<=', '$gt': '>', '$gte': '>=', '$ne': '!=' } # 生成aiomysql的连接配置 _args = [] _kwargs = {} # 要处理的参数 _kwargs['dbname'] = connect_config.get('dbname', 'postgres') _kwargs['user'] = connect_config.get('username', None) _kwargs['connect_timeout'] = connect_config.get('connect_timeout', 20) # 移除不使用的参数 _connect_config = copy.deepcopy(connect_config) for _pop_item in ('usedb', 'dbname', 'username', 'connect_on_init', 'connect_timeout', 'transaction_share_cursor'): _connect_config.pop(_pop_item, None) # 合并参数 _kwargs.update(_connect_config) return { 'creator': pgadapter, 'pool_connection_class': PgSQLPoolConnection, 'args': _args, 'kwargs': _kwargs, 'connect_method_name': 'connect', 'pool_update_config': { 'pool_extend_paras': { 'close_action': driver_config.get('close_action', None) } }, 'current_db_name': connect_config.get('usedb', 'public') } def _generate_sqls(self, op: str, *args, **kwargs) -> tuple: """ 生成对应操作要执行的sql语句数组 @param {str} op - 要执行的操作(传入函数名字符串) @param {args} - 要执行操作函数的固定位置入参 @param {kwargs} - 要执行操作函数的kv入参 @returns {tuple} - 返回要执行的sql信息(sqls, sql_paras, execute_paras) sqls: list, 要顺序执行的sql语句数组; 注意, 仅最后一个语句支持为查询语句, 前面的语句都必须为非查询语句 sql_paras: list, 传入的SQL参数字典(支持?占位), 注意必须与sqls数组一一对应(如果只有个别语句需要传参, 其他位置设置为None; 如果全部都无需传参, 该值直接传None) execute_paras: dict, 最后一个SQL语句的执行参数 {'is_query': ...} checks: list, 传入每个语句执行检查字典列表, 注意必须与sqls数组一一对应(如果只有个别语句需要传参, 其他位置设置为None; 如果全部都无需传参, 该值直接传None) """ _func = self._sqls_fun_mapping.get(op, None) if _func is None: raise pgadapter.NotSupportedError('driver not support this operation') _ret = _func(op, *args, **kwargs) if len(_ret) == 4: return _ret else: _new_ret = [] _new_ret.extend(_ret) _new_ret.append(None) # 补充最后一个参数 return _new_ret def _format_row_value(self, row: list) -> list: """ 处理行记录的值(数据库存储类型转为Python类型) @param {list} row - 行记录 @returns {list} - 转换后的行记录 """ _new_row = [] for _item in row: _new_row.append(self._dbtype_to_python(_item)) return _new_row def _driver_init_connection(self, conn: Any): """ 驱动对获取到的连接的初始化处理 @param {Any} conn - 传入连接对象 """ pass async def _get_cols_info(self, collection: str, db_name: str = None, session: Any = None) -> list: """ 获取制定集合(表)的列信息 @param {str} collection - 集合名(表) @param {str} db_name=None - 数据库名(不指定代表默认当前数据库) @param {Any} session=None - 指定事务连接对象 @returns {list} - 字典形式的列信息数组, 注意列名为name, 类型为type(类型应为标准类型: str, int, float, bool, json) """ if session is not None: _conn = session[0] _cursor = session[1] else: _conn = None _cursor = None # 获取表结构 _db_name = self._db_name if db_name is None else db_name _sql = "select column_name as name, data_type as type from information_schema.columns where table_schema='%s' and table_name='%s' order by ordinal_position" % ( _db_name, collection ) _ret = await self._execute_sql( _sql, paras=None, is_query=True, conn=_conn, cursor=_cursor ) # 处理标准类型 for _row in _ret: if _row['type'] == 'boolean': _row['type'] = 'bool' elif _row['type'] == 'integer': _row['type'] = 'int' elif _row['type'] == 'double precision': _row['type'] = 'float' elif _row['type'] == 'jsonb': _row['type'] = 'json' else: _row['type'] = 'str' return _ret async def _get_current_db_name(self, session: Any = None) -> str: """ 获取当前数据库名 @param {Any} session=None - 指定事务连接对象 @returns {str} - 数据库名 """ if self._db_name is not None: return self._db_name else: return 'public' ############################# # 需要单独重载的函数 #############################
[文档] async def switch_db(self, name: str, *args, **kwargs): """ 切换当前数据库到指定数据库 @param {str} name - 数据库名 """ self._db_name = name
############################# # 支持SQL处理的通用函数 ############################# def _dbtype_mapping(self, std_type: str, len: int) -> str: """ 获取字段的数据库类型 @param {str} std_type - 字段类型(str, int, float, bool, json) @param {int} len - 字段长度 @returns {str} - 返回数据库类型 """ if std_type in ('int', 'float'): return std_type elif std_type == 'bool': return 'boolean' elif std_type == 'json': return 'JSONB' else: return 'varchar(%d)' % (self._default_str_len if len is None else len) def _python_to_dbtype(self, val: Any, dbtype: str = None, is_json: bool = False) -> tuple: """ 将Python对象转换为数据库存储类型 @param {Any} val - 要转换的Python对象 @param {str} dbtype=None - 强制指定数据库类型, 如果不指定则自动判断 @param {str} is_json=False - 是否json数据(用于存入json的内容) @returns {tuple} - 返回转换后的结果(dbtype, dbvalue) """ # 判断类型 _dbtype = dbtype if _dbtype is None: _pytype = type(val) if _pytype == bool: _dbtype = 'bool' elif _pytype == int: _dbtype = 'int' elif _pytype == float: _dbtype = 'float' elif _pytype in (dict, tuple, list): _dbtype = 'json' else: _dbtype = 'str' # 进行转换处理 _dbvalue = None if _dbtype == 'bool': if is_json: _dbvalue = 'true' if val else 'false' else: _dbvalue = True if val else False elif _dbtype in ('int', 'float'): _dbvalue = val elif _dbtype == 'json': _dbvalue = json.dumps(val, ensure_ascii=False) else: _dbvalue = str(val) # 返回结果 return (_dbtype, _dbvalue) def _dbtype_to_python(self, val: Any) -> Any: """ 数据库值转换为python类型 @param {Any} val - 数据库值 @returns {Any} - Python值 """ if type(val) == str: try: return json.loads(val) except: return val else: return val def _db_quotes_str(self, val: str) -> str: """ 数据库单引号转义处理 @param {str} val - 要处理的字符串 @returns {str} - 转义处理后的字符串, 注意不包含外面的引号 """ return val.replace('\\', '\\\\').replace('\r', '\\r').replace( '\n', '\\n').replace('\t', '\\t').replace("'", "''").replace('"', '\\"') def _get_filter_unit_sql(self, key: str, val: Any, fixed_col_define: dict = None, sql_paras: list = [], left_join: list = None, session=None) -> str: """ 获取兼容mongodb过滤条件规则的单个规则对应的sql @param {str} key - 规则key @param {Any} val - 规则值 @param {dict} fixed_col_define=None - 表的固定字段配置信息字典 { 'cols': [], # 表固定字段名清单 'define': {'字段名': {'type': 'str|bool|int|...'}} } @param {list} sql_paras=[] - 返回sql对应的占位参数 @param {list} left_join=None - 左关联配置 @param {Any} session=None - 数据库事务连接对象 @returns {str} - 单个规则对应的sql """ # 根据字段判断是主表还是关联表 _key = key _fixed_cols = None if _key[0] != '#': # 主表的过滤条件 _as_name = '' if fixed_col_define is not None: _fixed_cols = copy.deepcopy(fixed_col_define.get('cols', [])) else: # 关联表的过滤条件 _index = _key.find('.') _join_para = left_join[int(_key[1: _index])] _key = _key[_index+1:] _join_db_name = _join_para.get('db_name', self._db_name) _join_tab = _join_para['collection'] _as_name = '"%s".' % _join_para.get('as', _join_tab) _fixed_col_define = AsyncTools.sync_run_coroutine(self._get_fixed_col_define( _join_tab, db_name=_join_db_name, session=session )) if _fixed_col_define is not None: _fixed_cols = copy.deepcopy(_fixed_col_define.get('cols', [])) # 判断是否处理json的值, 形成最后比较的 key 值 _col_type = None _is_json = False _cast_by_val = False # 是否需要根据比较值判断类型 if _fixed_cols is not None: if _key != '_id' and _key not in _fixed_cols: _is_json = True _path_cols = _key.split('.') if len(_path_cols) > 1 and _path_cols[0] in _fixed_cols: # json固定字段 _path = self._convert_jsonset_array(_path_cols[1:]) _col_name = _path_cols[0] else: _path = self._convert_jsonset_array(_path_cols) _col_name = 'nosql_driver_extend_tags' _col_type = fixed_col_define.get('define', {}).get(_key, {}).get('type', None) if _col_type is None: _key = "%s\"%s\"#>'{%s}'" % (_as_name, _col_name, _path) # _key = "jsonb_path_query_first(\"%s\", '$.%s')" % (_col_name, _path) # _key = "\"%s\"->'%s'" % (_col_name, _path) _cast_by_val = True elif _col_type == 'str': # 字符串返回, 去除头尾的双引号(否则会带双引号) _key = "%s\"%s\"#>>'{%s}'" % (_as_name, _col_name, _path) # _key = "right(left(cast(jsonb_path_query_first(\"%s\", '$.%s') as varchar), -1), -1)" % (_col_name, _path) # _key = "\"nosql_driver_extend_tags\"->>'%s'" % key else: _key = "cast(%s\"%s\"#>'{%s}' as %s)" % (_as_name, _col_name, _path, self._dbtype_cast_mapping[_col_type]) # _key = "cast(jsonb_path_query_first(\"%s\", '$.%s'), as %s)" % ( # _col_name, _path, self._dbtype_cast_mapping[_col_type] # ) if isinstance(val, dict): # 有特殊规则 _cds = [] # 每个规则的数组 for _op, _para in val.items(): if _op in self._filter_symbol_mapping.keys(): # 比较值转换为数据库的格式 _dbtype, _cmp_val = self._python_to_dbtype(_para) if _is_json: # json字段 if _cast_by_val: if _dbtype == 'str': _key = 'right(left(cast(%s as varchar), -1), -1)' % _key else: _key = 'cast(%s as %s)' % (_key, self._dbtype_cast_mapping[_dbtype]) else: # 固定字段 _key = '%s"%s"' % (_as_name, _key) _cds.append('%s %s %s' % (_key, self._filter_symbol_mapping[_op], '%s')) sql_paras.append(_cmp_val) elif _op in ('$in', '$nin'): # in 和 not in if _is_json: # json字段 if _cast_by_val: _dbtype, _cmp_val = self._python_to_dbtype(_para[0]) if _dbtype == 'str': _key = 'right(left(cast(%s as varchar), -1), -1)' % _key else: _key = 'cast(%s as %s)' % (_key, self._dbtype_cast_mapping[_dbtype]) else: # 固定字段 _key = '%s"%s"' % (_as_name, _key) _cds.append('%s %s (%s)' % ( _key, 'in' if _op == '$in' else 'not in', ','.join(['%s' for _item in _para]) )) for _item in _para: _dbtype, _cmp_val = self._python_to_dbtype(_item) sql_paras.append(_cmp_val) elif _op == '$regex': if _is_json: if _cast_by_val: _key = 'right(left(cast(%s as varchar), -1), -1)' % _key else: _key = '%s"%s"' % (_as_name, _key) _cds.append("%s %s %s" % (_key, self._regexp_op, '%s')) sql_paras.append(_para) else: raise pgadapter.NotSupportedError('psycopg not support this search operation [%s]' % _op) _sql = ' and '.join(_cds) else: # 直接相等的条件 if val is None: _sql = '%s is NULL' % (_key if _is_json else '%s"%s"' % (_as_name, _key)) else: _dbtype, _cmp_val = self._python_to_dbtype(val) if _is_json and _cast_by_val: if _dbtype == 'str': _key = 'right(left(cast(%s as varchar), -1), -1)' % _key else: _key = 'cast(%s as %s)' % (_key, self._dbtype_cast_mapping[_dbtype]) _sql = '%s = %s' % (_key if _is_json else '%s"%s"' % (_as_name, _key), '%s') sql_paras.append(_cmp_val) return _sql def _get_filter_sql(self, filter: dict, fixed_col_define: dict = None, sql_paras: list = [], left_join: list = None, session=None) -> str: """ 获取兼容mongodb过滤条件规则的sql语句 @param {dict} filter - 过滤规则字典 @param {dict} fixed_col_define=None - 表的固定字段配置信息字典 { 'cols': [], # 表固定字段名清单 'define': {'字段名': {'type': 'str|bool|int|...'}} } @param {list} sql_paras=[] - 返回sql对应的占位参数 @param {list} left_join=None - 左关联配置 @param {Any} session=None - 数据库事务连接对象 @returns {str} - 返回的sql语句, 如果没有条件则返回None """ if filter is None or len(filter) == 0: return None # 遍历进行解析 _condition_list = [] for _col, _val in filter.items(): if _col == '$or': # or的情况,处理的是字典数组 _or_list = list() for _condition in _val: # 逐个条件处理 _where = self._get_filter_sql( _condition, fixed_col_define=fixed_col_define, sql_paras=sql_paras, left_join=left_join, session=session ) if len(_condition) > 1: # 多条件的情况,需要增加括号进行集合处理 _where = '(%s)' % _where if _where is not None: _or_list.append(_where) # 组合放到全局列表中 _condition_list.append( '(%s)' % ' or '.join(_or_list) ) else: # 正常的and条件 _where = self._get_filter_unit_sql( _col, _val, fixed_col_define=fixed_col_define, sql_paras=sql_paras, left_join=left_join, session=session ) # 添加条件 _condition_list.append(_where) # 组合条件并返回 return ' and '.join(_condition_list) def _get_update_sql(self, update: dict, fixed_col_define: dict = None, sql_paras: list = []) -> str: """ 获取兼容mongodb更新语句的sql语句 @param {dict} update - 更新配置字典 @param {dict} fixed_col_define=None - 表的固定字段配置信息字典 { 'cols': [], # 表固定字段名清单 'define': {'字段名': {'type': 'str|bool|int|...'}} } @param {list} sql_paras=[] - 返回sql对应的占位参数 @returns {str} - 返回更新部分语句sql """ # 更新辅助字典, key为要更新的字段名, value为{'sql': '对应的sql语句, 比如%s', 'paras': [传入sql的参数列表]} _upd_dict = {} # 扩展字典, key为物理字段名, value为对应的扩展设置 # 'set_dict': {'sqls': [], 'paras': []} , 扩展字段的set字典, 设置json指定key的值, 每处理一个字段在sqls增加一个语句, 对应在paras参数列表 # 'remove_list': [] , 扩展字段的remove列表, 为要删除json的key的字段列表 _extend = {} # 遍历处理 for _op, _para in update.items(): for _key, _val in _para.items(): if fixed_col_define is None or _key == '_id' or _key in fixed_col_define.get('cols', []): # 是固定字段 if _op == '$set': _upd_dict[_key] = {'sql': '%s', 'paras': [self._python_to_dbtype(_val)[1]]} elif _op == '$unset': _upd_dict[_key] = {'sql': 'null', 'paras': []} elif _op == '$inc': _upd_dict[_key] = {'sql': 'COALESCE("%s",0) + %s' % (_key, '%s'), 'paras': [_val]} elif _op == '$mul': _upd_dict[_key] = {'sql': 'COALESCE("%s",0) * %s' % (_key, '%s'), 'paras': [_val]} elif _op == '$min': _upd_dict[_key] = { 'sql': 'case when COALESCE("{key}", {pos}) < {pos} then COALESCE("{key}",0) else {pos} end'.format(key=_key, pos='%s'), 'paras': [_val, _val, _val] } elif _op == '$max': _upd_dict[_key] = { 'sql': 'case when COALESCE("{key}", {pos}) > {pos} then COALESCE("{key}",0) else {pos} end'.format(key=_key, pos='%s'), 'paras': [_val, _val, _val] } else: raise pgadapter.NotSupportedError('psycopg not support this update operation [%s]' % _op) else: # 是扩展字段 _path_cols = _key.split('.') if len(_path_cols) > 1 and _path_cols[0] in fixed_col_define.get('cols', []): _col_name = _path_cols[0] _path = self._convert_jsonset_array(_path_cols[1:]) else: _col_name = 'nosql_driver_extend_tags' _path = self._convert_jsonset_array(_path_cols) # 初始化扩展字典 if _extend.get(_col_name, None) is None: _extend[_col_name] = { 'set_dict': {'sqls': [], 'paras': []}, 'remove_list': [] } if _op == '$set': _dbtype, _dbval = self._python_to_dbtype(_val, is_json=True) if _dbtype in ('int', 'float', 'bool'): _sql = "'{{{path}}}', '{val}'" elif _dbtype == 'json': _sql = "'{%s}', '%s'" % (_path, str(_dbval).replace("'", "''")) _extend[_col_name]['set_dict']['sqls'].append(_sql) continue else: _sql = "'{%s}', '\"%s\"'" % (_path, self._db_quotes_str(str(_dbval))) _extend[_col_name]['set_dict']['sqls'].append(_sql) continue elif _op == '$unset': _extend[_col_name]['remove_list'].append(" #- '{%s}'" % _path) continue elif _op in ('$inc', '$mul', '$min', '$max'): _dbtype, _dbval = self._python_to_dbtype(_val, is_json=True) # 需要取值出来, 添加查询字段 if _op == '$inc': _sql = "'{{{path}}}', to_jsonb(COALESCE(cast(\"{col_name}\"#>'{{{path}}}' as float), 0) + {val})" elif _op == '$mul': _sql = "'{{{path}}}', to_jsonb(COALESCE(cast(\"{col_name}\"#>'{{{path}}}' as float), 0) * {val})" elif _op == '$min': _sql = "'{{{path}}}', case when COALESCE(cast(\"{col_name}\"#>'{{{path}}}' as float), {val}) < {val} then to_jsonb(COALESCE(cast(\"{col_name}\"#>'{{{path}}}' as float), 0)) else '{val}' end" elif _op == '$max': _sql = "'{{{path}}}', case when COALESCE(cast(\"{col_name}\"#>'{{{path}}}' as float), {val}) > {val} then to_jsonb(COALESCE(cast(\"{col_name}\"#>'{{{path}}}' as float), 0)) else '{val}' end" else: raise pgadapter.NotSupportedError('psycopg not support this update operation [%s]' % _op) # 处理格式化 _extend[_col_name]['set_dict']['sqls'].append( _sql.format(col_name=_col_name, path=_path, pos='%s', val=str(_dbval)) ) # 开始生成sql语句和返回参数 _sqls = [] for _key, _val in _upd_dict.items(): _sqls.append('"%s"=%s' % (_key, _val['sql'])) if _val['paras'] is not None: sql_paras.extend(_val['paras']) # 处理扩展字段 for _col_name, _extend_para in _extend.items(): _remove_sql = '' if len(_extend_para['remove_list']) > 0: _remove_sql = ''.join(_extend_para['remove_list']) if len(_extend_para['set_dict']['sqls']) == 0: if _remove_sql != '': _sqls.append('"%s"="%s"%s' % (_col_name, _col_name, _remove_sql)) else: # 嵌套更新值 _set_sql = _col_name for _sql in _extend_para['set_dict']['sqls']: _set_sql = 'jsonb_set(%s, %s, true)' % (_set_sql, _sql) _sqls.append( '"%s"=%s%s' % ( _col_name, _set_sql, _remove_sql ) ) sql_paras.extend(_extend_para['set_dict']['paras']) return ','.join(_sqls) def _get_projection_sql(self, projection: Union[dict, list], fixed_col_define: dict = None, sql_paras: list = [], is_group_by: bool = False, as_name: str = None, left_join: list = None, session=None) -> str: """ 获取兼容mongodb查询返回字段的sql语句 @param {Union[dict, list]} projection - 指定结果返回的字段信息 列表模式: ['col1','col2', ...] 注意: 该模式一定会返回 _id 这个主键 字典模式: {'_id': False, 'col1': True, ...} 该方式可以通过设置False屏蔽 _id 的返回 @param {dict} fixed_col_define=None - 表的固定字段配置信息字典 { 'cols': [], # 表固定字段名清单 'define': {'字段名': {'type': 'str|bool|int|...'}} } @param {list} sql_paras=[] - 返回sql对应的占位参数 @param {bool} is_group_by=False - 指定是否group by的处理, 如果是不会处理_id字段 @param {str} as_name=None - 字段对应的表别名 @param {list} left_join - 左关联配置 @param {Any} session=None - 数据库事务连接对象 @returns {str} - 返回更新部分语句sql """ # 如果不指定, 返回所有字段 if projection is None: _project_sql = '%s*' % ('' if as_name is None else ('"%s".' % as_name)) if left_join is not None: # 补充关联表的所有字段获取 for _join_para in left_join: _project_sql = '%s, "%s".*' % ( _project_sql, _join_para.get('as', _join_para['collection']) ) return _project_sql # 定义的内部函数 def _get_join_col_info(col: str, left_join: list, tab_as_name: str) -> tuple: # 获取关联表列信息 if col[0] == '#': _index = col.find('.') _join_para = left_join[int(col[1: _index])] _col = col[_index+1:] _join_as_name = _join_para.get('as', _join_para['collection']) _join_as_name_sql = '"%s".' % _join_as_name else: _col = col _join_as_name = '_main_tab' if tab_as_name is None else tab_as_name _join_as_name_sql = '' if tab_as_name is None else ('"%s".' % tab_as_name) return _col, _join_as_name, _join_as_name_sql # 标准化要显示的字段清单 _projection = {} if isinstance(projection, dict): for _key, _show in projection.items(): if type(_show) == str and _show[0] == '$': _col, _tab_as_name, _tab_as_name_sql = _get_join_col_info( _show[1:], left_join, as_name ) _projection[_key] = { 'col': _col, 'tab_as': _tab_as_name, 'tab_as_sql': _tab_as_name_sql, 'col_as': _key } elif _show: _col, _tab_as_name, _tab_as_name_sql = _get_join_col_info( _key, left_join, as_name ) _projection[_key] = { 'col': _col, 'tab_as': _tab_as_name, 'tab_as_sql': _tab_as_name_sql } else: # 列表形式, _id是必须包含的 if not is_group_by and '_id' not in projection: _projection['_id'] = { 'col': '_id', 'tab_as': '_main_tab' if as_name is None else as_name, 'tab_as_sql': '' if as_name is None else ('"%s".' % as_name) } for _key in projection: _col, _tab_as_name, _tab_as_name_sql = _get_join_col_info( _key, left_join, as_name ) _projection[_key] = { 'col': _col, 'tab_as': _tab_as_name, 'tab_as_sql': _tab_as_name_sql } # 处理fixed_cols参数 _fixed_cols_dict = {} if fixed_col_define is not None: # 主表的列参数 _tab_as_name = '_main_tab' if as_name is None else as_name _fixed_cols_dict[_tab_as_name] = { 'fixed_define': fixed_col_define, 'fixed_cols': copy.deepcopy(fixed_col_define.get('cols', [])) } _fixed_cols_dict[_tab_as_name]['fixed_cols'].append('_id') if left_join is not None: # 关联表的列参数 for _join_para in left_join: _join_db_name = _join_para.get('db_name', self._db_name) _join_tab = _join_para['collection'] _join_as_name = _join_para.get('as', _join_tab) _fixed_col_define = AsyncTools.sync_run_coroutine(self._get_fixed_col_define( _join_tab, db_name=_join_db_name, session=session )) if _fixed_col_define is not None: _fixed_cols_dict[_join_as_name] = { 'fixed_define': _fixed_col_define, 'fixed_cols': copy.deepcopy(_fixed_col_define.get('cols', [])) } _fixed_cols_dict[_join_as_name]['fixed_cols'].append('_id') # 生成sql _real_cols = [] for _key, _val in _projection.items(): _fixed_col_define = _fixed_cols_dict.get(_val['tab_as'], {}).get('fixed_define', None) _fixed_cols = _fixed_cols_dict.get(_val['tab_as'], {}).get('fixed_cols', None) _col = _val['col'] _as_name = _val['tab_as_sql'] if _fixed_cols is None or _col in _fixed_cols: # 固定字段 _real_cols.append(('%s"%s"' % (_as_name, _col)) if _val.get('col_as', None) is None else '%s"%s" as "%s"' % (_as_name, _col, _val['col_as'])) else: # 其他非固定字段 _path_cols = _col.split('.') if len(_path_cols) > 1 and _path_cols[0] in _fixed_cols: _col_name = _path_cols[0] _path = self._convert_jsonset_array(_path_cols[1:]) else: _col_name = 'nosql_driver_extend_tags' _path = self._convert_jsonset_array(_path_cols) _col_type = _fixed_col_define.get('define', {}).get(_col, {}).get('type', '') _col_as_name = _col if _val.get('col_as', None) is None else _val['col_as'] if _col_type == 'str': # 字符串返回, 否则会带双引号 _real_cols.append( "{tab_as_name}\"{col_name}\"#>>'{{{path}}}' as \"{as_name}\"".format( path=_path, col_name=_col_name, as_name=_col_as_name, tab_as_name=_as_name ) ) elif _col_type != '': _real_cols.append( "cast({tab_as_name}\"{col_name}\"#>'{{{path}}}', as {col_type}) as \"{as_name}\"".format( path=_path, col_name=_col_name, as_name=_col_as_name, col_type=_col_type, tab_as_name=_as_name ) ) else: _real_cols.append( "{tab_as_name}\"{col_name}\"#>'{{{path}}}' as \"{as_name}\"".format( path=_path, col_name=_col_name, as_name=_col_as_name, tab_as_name=_as_name ) ) # 返回sql return ','.join(_real_cols) def _get_sort_sql(self, sort: list, fixed_col_define: dict = None, sql_paras: list = [], left_join: list = None, session=None) -> str: """ 获取兼容mongodb查询排序的sql语句 @param {list} sort - 查询结果的排序方式 例: [('col1', 1), ...] 注: 参数的第2个值指定是否升序(1为升序, -1为降序) @param {dict} fixed_col_define=None - 表的固定字段配置信息字典 { 'cols': [], # 表固定字段名清单 'define': {'字段名': {'type': 'str|bool|int|...'}} } @param {list} sql_paras=[] - 返回sql对应的占位参数 @param {list} left_join=None - 左关联配置 @param {Any} session=None - 数据库事务连接对象 @returns {str} - 返回更新部分语句sql """ _sorts = [] # 主表的字段定义参数 _main_fixed_cols = None if fixed_col_define is not None: _main_fixed_cols = copy.deepcopy(fixed_col_define.get('cols', [])) _main_fixed_cols.append('_id') # 关联表的字段定义列表 _fixed_cols_dict = {} # 循环处理每个排序参数 for _item in sort: # 处理临时参数 _col: str = _item[0] if _col[0] != '#': # 属于主表字段 _as_name = '' _fixed_cols = _main_fixed_cols else: # 属于关联表字段 _index = _col.find('.') _join_para = left_join[int(_col[1: _index])] _col = _col[_index+1:] _join_db_name = _join_para.get('db_name', self._db_name) _join_tab = _join_para['collection'] _as_name = '"%s".' % _join_para.get('as', _join_tab) if _as_name in _fixed_cols_dict.keys(): _fixed_cols = _fixed_cols_dict.get(_as_name, None) else: # 重新获取并放入缓存字典 _fixed_col_define = AsyncTools.sync_run_coroutine(self._get_fixed_col_define( _join_tab, db_name=_join_db_name, session=session )) _fixed_cols = copy.deepcopy(_fixed_col_define.get('cols', [])) _fixed_cols.append('_id') _fixed_cols_dict[_as_name] = _fixed_cols # 生成排序sql语句 if _fixed_cols is None: # 所有字段认为是固定字段 _sorts.append('%s"%s" %s NULLS LAST' % (_as_name, _col, 'asc' if _item[1] == 1 else 'desc')) else: if _col in _fixed_cols: _sorts.append('%s"%s" %s NULLS LAST' % (_as_name, _col, 'asc' if _item[1] == 1 else 'desc')) else: # 属于扩展字段 _path_cols = _col.split('.') if len(_path_cols) > 1 and _path_cols[0] in _fixed_cols: _col_name = _path_cols[0] _path = self._convert_jsonset_array(_path_cols[1:]) else: _col_name = 'nosql_driver_extend_tags' _path = self._convert_jsonset_array(_path_cols) _sorts.append( "%s\"%s\"#>'{%s}' %s NULLS LAST" % (_as_name, _col_name, _path, 'asc' if _item[1] == 1 else 'desc') ) # 返回结果 return ','.join(_sorts) def _get_group_sql(self, group: dict, fixed_col_define: dict = None, sql_paras: list = []) -> tuple: """ 生成分组sql语句 @param {dict} group - 分组返回设置字典(注意与mongodb的_id要求有所区别) 指定分组字段为col1、col2, 聚合字段为count、pay_amt, 其中pay_amt统计col_pay字段的合计数值 {'id': '$col1', 'name': '$col2', 'count': {'$sum': 1}, 'pay_amt': {'$sum': '$col_pay'}} 常见的聚合类型: $sum-计算总和, $avg-计算平均值, $min-取最小值, $max-取最大值, $first-取第一条, $last-取最后一条 @param {dict} fixed_col_define=None - 表的固定字段配置信息字典 { 'cols': [], # 表固定字段名清单 'define': {'字段名': {'type': 'str|bool|int|...'}} } @param {list} sql_paras=[] - 返回sql对应的占位参数 @param {list} json_query_cols=[] - 返回sql中json查询所需的json_tree处理的字段名 @returns {tuple} - 返回sql, (select语句, group by语句) """ # 固定字段定义 if fixed_col_define is None: _fixed_cols = [] else: _fixed_cols = copy.deepcopy(fixed_col_define.get('cols', [])) _fixed_cols.append('_id') # 函数操作名映射 _op_mapping = { '$sum': 'SUM', '$avg': 'AVG', '$min': 'MIN', '$max': 'MAX' } _select = [] _groupby = [] for _key, _val in group.items(): _val_type = type(_val) if isinstance(_val, dict): # 是聚合函数 _op = list(_val.keys())[0] _col = _val[_op] if type(_col) == str and _col.startswith('$'): # 是字段 _col = _col[1:] if _col not in _fixed_cols: # 非固定字段 _path_cols = _col.split('.') if len(_path_cols) > 1 and _path_cols[0] in _fixed_cols: _col_name = _path_cols[0] _path = self._convert_jsonset_array(_path_cols[1:]) else: _col_name = 'nosql_driver_extend_tags' _path = self._convert_jsonset_array(_path_cols) _col_type = fixed_col_define.get('define', {}).get(_col, {}).get('type', None) if _col_type is None: # 聚合函数只支持数字类型 _col = "cast(\"%s\"#>'{%s}' as float)" % (_col_name, _path) elif _col_type == 'str': _col = "\"%s\"#>>'{%s}'" % (_col_name, _path) else: _col = "cast(\"%s\"#>'{%s}' as %s)" % (_col_name, _path, _col_type) else: _col = '"%s"' % _col _select.append('%s(%s) as %s' % (_op_mapping[_op], _col, _key)) else: # 是值 _select.append('%s(%s) as %s' % (_op_mapping[_op], '%s', _key)) sql_paras.append(_col) elif _val_type == str and _val.startswith('$'): # 是字段 _col = _val[1:] if _col not in _fixed_cols: # 非固定字段 _path_cols = _col.split('.') if len(_path_cols) > 1 and _path_cols[0] in _fixed_cols: _col_name = _path_cols[0] _path = self._convert_jsonset_array(_path_cols[1:]) else: _col_name = 'nosql_driver_extend_tags' _path = self._convert_jsonset_array(_path_cols) _col_type = fixed_col_define.get('define', {}).get(_col, {}).get('type', None) if _col_type == 'str': _col = "\"%s\"#>>'{%s}'" % (_col_name, _path) else: _col = "\"%s\"#>'{%s}'" % (_col_name, _path) else: _col = '"%s"' % _col _select.append('%s as %s' % (_col, _key)) _groupby.append(_col) else: # 是固定值 _select.append('%s as %s' % ('%s', _key)) sql_paras.append(_val) return ','.join(_select), ','.join(_groupby) def _db_quotes_str_default(self, val: str) -> str: """ 数据库单引号转义处理(仅默认值使用, 不转义双引号) @param {str} val - 要处理的字符串 @returns {str} - 转义处理后的字符串, 注意不包含外面的引号 """ return val.replace('\\', '\\\\').replace('\r', '\\r').replace( '\n', '\\n').replace('\t', '\\t').replace("'", "''") # .replace('"', '\\"') def _get_col_default_value(self, val: str, dbtype: str) -> str: """ 获取字段的默认值 @param {str} val - 字符串形式的值 @param {str} dbtype - 数据类型 @returns {str} - 返回默认值的字符串 """ if dbtype in ('int', 'float'): return str(val) elif dbtype == 'bool': return 'true' if str(val).lower() == 'true' else 'false' elif dbtype == 'json': if type(val) != str: return "'%s'" % self._db_quotes_str_default(json.dumps(val, ensure_ascii=False)) else: return "'%s'" % self._db_quotes_str_default(val) else: return "'%s'" % self._db_quotes_str_default(str(val)) def _get_partition_sql(self, db_name: str, collection: str, partition: dict, add_partition_sqls: list = [], partition_cols: list = []) -> str: """ 生成分区创建sql语句 @param {str} db_name - 数据库名 @param {str} collection - 表名 @param {dict} partition - 分区创建配置字典 @param {list} add_partition_sqls=[] - 创建分区表的语句 @param {list} partition_cols=[] - 分区涉及到的字段清单 @returns {str} - 建表分区指定语句 """ # 复制参数对象 _partition = copy.deepcopy(partition) # 组装sql需要用到的sql关键字字典 _type_sql_mapping = { 'range': { 'name_sql': 'RANGE', 'value_sql': 'FOR VALUES FROM', 'is_columns': True }, 'list': { 'name_sql': 'LIST', 'value_sql': 'FOR VALUES IN', 'is_columns': False }, 'hash': { 'name_sql': 'HASH', 'value_sql': 'FOR VALUES WITH', 'is_columns': True } } _type_sql_para = _type_sql_mapping[_partition['type']] # 非多列模式的分区条件清单只取第一个参数 if _type_sql_para['is_columns']: _columns = _partition['columns'] else: # 只支持单字段 _columns = _partition['columns'][0: 1] _cols = [] # 分区字段清单 _partition_configs = [] # 分区定义清单 for _index in range(len(_columns)): _column = _columns[_index] # 分区字段清单处理 if _column['col_name'] not in partition_cols: # 登记分区涉及的字段清单 partition_cols.append(_column['col_name']) if _column.get('func', '') == '': _cols.append('"%s"' % _column['col_name']) else: # 表达式 _cols.append(_column['func'].format(col_name='"%s"' % _column['col_name'])) if _partition['type'] == 'hash': # 哈希分区不需要处理range_list continue _last_values = 'MINVALUE' for _range_index in range(len(_column['range_list'])): _range = _column['range_list'][_range_index] # 分区后缀名 _partition_name = None if _index == 0: _partition_name = ('p%d' % _range_index) if _range.get('name', None) is None else _range['name'] # 值处理 if _partition['type'] == 'range': # 范围模式, 形成最小值, 最大值范围数组 _temp_value = 'MAXVALUE' if _range['value'] is None else str(_range['value']) _value = [_last_values, _temp_value] _last_values = _temp_value # 修改最小值 # 添加到分区配置 if _index == 0: _partition_configs.append({ 'name': _partition_name, 'values': {'sm': [_value[0]], 'lg': [_value[1]]} }) else: _partition_configs[_range_index]['values']['sm'].append(_value[0]) _partition_configs[_range_index]['values']['lg'].append(_value[1]) else: # 列表模式, 直接就是数组 _value = [] for _temp_value in _range['value']: _value.append('NULL' if _temp_value is None else str(_temp_value)) # 添加到分区配置 if _index == 0: _partition_configs.append({ 'name': _partition_name, 'values': _value }) else: _partition_configs[_range_index]['values'].extend(_value) # hash 模式的_partition_configs处理 if _partition['type'] == 'hash': for _index in range(_partition['count']): _partition_configs.append({ 'name': 'p%d' % _index, 'values': _index }) # 建表分区指定语句 _partition_sql = 'PARTITION BY %s (%s)' % ( _type_sql_para['name_sql'], ','.join(_cols) ) # 创建分区表语句 for _partition_config in _partition_configs: _add_partition_sql = 'create table if not exists "{db_name}"."{collection}_{pname}" PARTITION OF "{db_name}"."{collection}" {value_sql}'.format( db_name=db_name, collection=collection, pname=_partition_config['name'], value_sql=_type_sql_para['value_sql'] ) if _partition['type'] == 'range': _add_partition_sql = '%s (%s) TO (%s)' % ( _add_partition_sql, ','.join(_partition_config['values']['sm']), ','.join(_partition_config['values']['lg']) ) elif _partition['type'] == 'list': _add_partition_sql = '%s (%s)' % (_add_partition_sql, ','.join(_partition_config['values'])) else: # hash _add_partition_sql = '%s (MODULUS %d, REMAINDER %d)' % ( _add_partition_sql, _partition['count'], _partition_config['values'] ) _add_sub_partition_sqls = [] if _partition.get('sub_partition', None) is not None: _sub_partition_sql = self._get_partition_sql( db_name, '%s_%s' % (collection, _partition_config['name']), _partition['sub_partition'], add_partition_sqls=_add_sub_partition_sqls, partition_cols=partition_cols ) # 分区创建语句添加分区定义 _add_partition_sql = '%s %s' % (_add_partition_sql, _sub_partition_sql) # 添加到分区创建脚本数组 add_partition_sqls.append(_add_partition_sql) # 添加子分区的创建脚本 add_partition_sqls.extend(_add_sub_partition_sqls) # 添加默认分区 if _partition['type'] != 'hash': _add_partition_sql = 'create table if not exists "{db_name}"."{collection}_{pname}" PARTITION OF "{db_name}"."{collection}" {value_sql}'.format( db_name=db_name, collection=collection, pname='p_default', value_sql='DEFAULT' ) add_partition_sqls.append(_add_partition_sql) return _partition_sql def _get_left_join_sqls(self, db_name: str, collection: str, left_join: list, sql_paras: list = [], session=None, fixed_col_define: dict = None) -> list: """ 获取左关联的关联表sql清单 @param {str} db_name - 主表数据库名 @param {str} collection - 主表名 @param {list} left_join - 左关联配置 @param {list} sql_paras=[] - 返回sql对应的占位参数 @param {Any} session=None - 数据库事务连接对象 @param {dict} fixed_col_define=None - 主表的固定字段配置信息字典 { 'cols': [], # 表固定字段名清单 'define': {'字段名': {'type': 'str|bool|int|...'}} } @returns {list} - 关联表sql清单 """ _sqls = [] # 遍历生成每个表的关联sql for _join_para in left_join: _join_db_name = _join_para.get('db_name', db_name) _join_tab = _join_para['collection'] _as_name = _join_para.get('as', _join_tab) # 表字段定义 _fixed_col_define = {'cols': []} if fixed_col_define is None else fixed_col_define _join_fixed_col_define = AsyncTools.sync_run_coroutine(self._get_fixed_col_define( _join_tab, db_name=_join_db_name, session=session )) # on语句 _on_fields = [] for _on in _join_para['join_fields']: if _on[0] != '_id' and _on[0] not in _fixed_col_define['cols']: # 扩展字段 _field0 = "\"nosql_driver_extend_tags\"->'%s'" % _on[0] else: _field0 = '"%s"' % _on[0] if _on[1] != '_id' and _on[1] not in _join_fixed_col_define['cols']: # 扩展字段 _field1 = "\"nosql_driver_extend_tags\"->'%s'" % _on[1] else: _field1 = '"%s"' % _on[1] _on_fields.append('"%s".%s = "%s".%s' % ( collection, _field0, _as_name, _field1 )) # 根据是否有过滤条件处理 _filter = _join_para.get('filter', None) if _filter is None: _sqls.append('"%s"."%s" "%s" on %s' % ( _join_db_name, _join_tab, _as_name, ' and '.join(_on_fields) )) else: # 有过滤条件, 按查询表的方式关联 _filter_sql = self._get_filter_sql( _filter, fixed_col_define=_join_fixed_col_define, sql_paras=sql_paras ) _sqls.append('(select * from "%s"."%s" where %s) "%s" on %s' % ( _join_db_name, _join_tab, _filter_sql, _as_name, ' and '.join(_on_fields) )) return _sqls ############################# # 生成SQL转换的处理函数 ############################# def _sqls_fun_not_support(self, op: str, *args, **kwargs): """ 驱动不支持的情况 """ raise pgadapter.NotSupportedError('psycopg not support this operation') def _sqls_fun_create_db(self, op: str, *args, **kwargs) -> tuple: """ 生成添加数据库的sql语句数组 """ # 获取参数 _name = args[0] # 数据库名 _authorization = args[1] if len(args) > 1 else kwargs.get('authorization', None) # schema所属用户 _sqls = [] _checks = [] _sqls.append("select count(*) as db_count from information_schema.schemata where schema_name='%s'" % _name) _checks.append(None) # 组成sql _sql = "CREATE SCHEMA \"%s\"" % _name if _authorization is not None: _sql = "%s AUTHORIZATION %s" % (_sql, _authorization) _sqls.append(_sql) _checks.append({'pre_check': { 'cmp_prev_return': [1, self.cmp_func_lt_value] }}) # 上一个语句小于1才执行 return (_sqls, None, {'is_query': [True, False]}, _checks) def _sqls_fun_list_dbs(self, op: str, *args, **kwargs) -> tuple: """ 生成获取数据库清单的sql语句数组 """ _sql = "SELECT schema_name as name FROM information_schema.schemata where schema_name not like 'pg\\_%' and schema_name not like 'information\\_%' order by schema_name" return ([_sql], None, {'is_query': True}) def _sqls_fun_drop_db(self, op: str, *args, **kwargs) -> tuple: """ 生成删除数据库的sql语句数组 """ _name = args[0] # 数据库名 _sql = 'DROP SCHEMA "%s" CASCADE' % _name return ([_sql], None, {}) def _sqls_fun_create_collection(self, op: str, *args, **kwargs) -> tuple: """ 生成建表的sql语句数组 """ # 参数处理 _collection = args[0] _temporary = kwargs.get('temporary', False) # 是否临时表 _on_commit = kwargs.get('on_commit', None) # 指定临时表事务提交时执行的操作, None-不采取任何操作, delete-删除所有数据, drop-删除表 _tablespace = kwargs.get('tablespace', None) # 指定表所在的表空间 # 创建索引的参数 _index_concurrently = kwargs.get('index_concurrently', False) # 启用选项后创建索引的过程中不在表上持有任何防止插入、更改、删除的写入锁 _index_method = kwargs.get('index_method', None) # 要使用的索引方法的名字。可选的名字是 btree(缺省), hash, gist, gin _index_tablespace = kwargs.get('index_tablespace', None) # 指定索引所在的表空间 _sqls = [] _checks = [] # 语句检查数组 # 生成表字段清单 _cols = [] if kwargs.get('fixed_col_define', None) is not None: for _col_name, _col_def in kwargs['fixed_col_define'].items(): _cols.append( '"%s" %s%s%s' % ( _col_name, self._dbtype_mapping(_col_def['type'], _col_def.get('len', None)), '' if _col_def.get('nullable', True) else ' not null', '' if _col_def.get('default', None) is None else ' default %s' % self._get_col_default_value( _col_def['default'], _col_def['type'] ) ) ) # 主键清单(如果存在分区的情况, 分区字段必须为主键) _primary_keys = ['_id'] # 生成分区sql _partition_sql = None _add_partition_sqls = [] _partition_cols = [] if kwargs.get('partition', None) is not None: _partition_sql = self._get_partition_sql( self._db_name, _collection, kwargs['partition'], add_partition_sqls=_add_partition_sqls, partition_cols=_partition_cols ) _primary_keys.extend(_partition_cols) # 建表脚本, 需要带上数据库前缀 _sql = 'create%s table if not exists "%s"."%s"("_id" varchar(100), %s "nosql_driver_extend_tags" JSONB)%s' % ( 'TEMPORARY' if _temporary else '', self._db_name, _collection, (', '.join(_cols) + ',') if len(_cols) > 0 else '', '' if _partition_sql is None else ' %s' % _partition_sql # 分区信息 ) # 额外参数 if _on_commit is not None: _sql = '%s ON COMMIT %s' % ( _sql, 'DROP' if _on_commit == 'drop' else ('DELETE ROWS' if _on_commit == 'delete' else 'PRESERVE ROWS') ) if _tablespace is not None: _sql = '%s TABLESPACE %s' % (_sql, _tablespace) _sqls.append(_sql) _checks.append(None) _sqls.extend(_add_partition_sqls) # 增加分区创建的脚本 for _index in range(len(_add_partition_sqls)): _checks.append(None) # 处理主键约束 _sql = 'alter table "{db_name}"."{collection}" add constraint "pk_{db_name}_{collection}" primary key("{key_str}")'.format( db_name=self._db_name, collection=_collection, key_str='","'.join(_primary_keys) ) _sqls.append(_sql) if self._ignore_index_error: _checks.append({'after_check': {'ignore_current_error': True}}) # 忽略语句执行失败 else: _checks.append(None) # 如果_id不是唯一主键, 需要设置为唯一索引 if len(_primary_keys) > 1: _sql = 'create {unique}index "idx_{db_name}_{collection}__id" on "{db_name}"."{collection}"("_id")'.format( unique='UNIQUE ' if '_id' in _partition_cols else '', db_name=self._db_name, collection=_collection ) _sqls.append(_sql) if self._ignore_index_error: _checks.append({'after_check': {'ignore_current_error': True}}) # 忽略语句执行失败 else: _checks.append(None) # 处理注释 if kwargs.get('comment', None) is not None: # 添加表注释 _sql = "comment on table \"%s\".\"%s\" is '%s'" % (self._db_name, _collection, self._db_quotes_str(kwargs['comment'])) _sqls.append(_sql) _checks.append(None) if kwargs.get('fixed_col_define', None) is not None: # 添加字段注释 for _col_name, _col_def in kwargs['fixed_col_define'].items(): if _col_def.get('comment', None) is not None: _sql = "comment on column \"%s\".\"%s\".\"%s\" is '%s'" % ( self._db_name, _collection, _col_name, self._db_quotes_str(_col_def['comment']) ) _sqls.append(_sql) _checks.append(None) # 建索引脚本 if kwargs.get('indexs', None) is not None: for _index_name, _index_def in kwargs['indexs'].items(): _support_unique = True _cols = [] for _col_name, _para in _index_def['keys'].items(): if _col_name not in _partition_cols: # 索引中存在非分区条件的字段, 则不支持唯一索引 _support_unique = False if _para.get('asc', 1) == -1: # 降序索引 _cols.append('"%s" desc' % _col_name) else: _cols.append('"%s"' % _col_name) _sql = 'create %sindex%s if not exists "%s" on "%s"."%s" %s(%s)%s' % ( 'UNIQUE ' if _index_def.get('paras', {}).get('unique', False) and _support_unique else '', ' CONCURRENTLY' if _index_concurrently else '', _index_name, self._db_name, _collection, '' if _index_method is None else 'USING %s ' % _index_method, ','.join(_cols), '' if _index_tablespace is None else ' TABLESPACE %s' % _index_tablespace ) _sqls.append(_sql) if self._ignore_index_error: _checks.append({'after_check': {'ignore_current_error': True}}) # 忽略语句执行失败 else: _checks.append(None) # 返回结果 return (_sqls, None, {}, _checks) def _sql_fun_list_collections(self, op: str, *args, **kwargs) -> tuple: """ 生成查询表清单的sql语句数组 """ _filter = kwargs.get('filter', None) # 生成where语句 _sql_paras = [] _where = self._get_filter_sql(_filter, sql_paras=_sql_paras) if _where is not None: _where = _where.replace('"name" ', 'tablename ', 1) _sql = "select tablename as name from pg_tables where schemaname = '%s'%s order by tablename" % ( self._db_name, '' if _where is None else ' and %s' % _where ) # 返回结果 return ([_sql], None if len(_sql_paras) == 0 else [_sql_paras], {'is_query': True}) def _sql_fun_drop_collection(self, op: str, *args, **kwargs) -> tuple: """ 生成删除表的sql语句数组 """ _collection = args[0] _sql = 'DROP TABLE IF EXISTS "%s"."%s" CASCADE' % (self._db_name, _collection) return ([_sql], None, {}) def _sql_fun_turncate_collection(self, op: str, *args, **kwargs) -> tuple: """ 生成清空表的sql语句数组 """ _collection = args[0] _sqls = [] _sqls.append( 'TRUNCATE TABLE "%s"."%s"' % (self._db_name, _collection) ) return (_sqls, None, {}) def _sql_fun_insert_one(self, op: str, *args, **kwargs) -> tuple: """ 生成插入数据的sql语句数组 """ _collection = args[0] _row = args[1] _fixed_col_define = args[2] if len(args) > 2 else kwargs.get('fixed_col_define', {}) # 生成插入字段和值 _cols = ['"_id"'] _sql_paras = [_row.pop('_id')] for _col in _fixed_col_define.get('cols', []): _val = _row.pop(_col, None) if _val is not None: _cols.append('"%s"' % _col) _sql_paras.append(self._python_to_dbtype(_val)[1]) # 剩余的内容放入扩展字段 _cols.append('"nosql_driver_extend_tags"') _sql_paras.append(self._python_to_dbtype(_row, dbtype='json')[1]) # 组成sql _sql = 'insert into "%s"."%s"(%s) values(%s)' % ( self._db_name, _collection, ','.join(_cols), ','.join(['%s' for _tcol in _cols]) ) return ([_sql], [_sql_paras], {}) def _sql_fun_insert_many(self, op: str, *args, **kwargs) -> tuple: """ 生成插入数据的sql语句数组 """ _collection = args[0] _rows = args[1] _fixed_col_define = args[2] if len(args) > 2 else kwargs.get('fixed_col_define', {}) # 生成插入字段列表 _cols = ['"_id"'] for _col in _fixed_col_define.get('cols', []): if _col == '_id': continue _cols.append('"%s"' % _col) _cols.append('"nosql_driver_extend_tags"') # 遍历生成插入数据和参数 _para_array = [] _value_array = [] for _s_row in _rows: _row = copy.copy(_s_row) # 浅复制即可 _sql_paras = [_row.pop('_id', str(ObjectId()))] _col_values = ['%s'] for _col in _fixed_col_define.get('cols', []): if _col == '_id': continue _val = _row.pop(_col, None) if _val is None: _col_values.append('null') else: _col_values.append('%s') _sql_paras.append(self._python_to_dbtype(_val)[1]) # 剩余的内容放入扩展字段 _col_values.append('%s') _sql_paras.append(self._python_to_dbtype(_row, dbtype='json')[1]) # 添加到数组 _para_array.extend(_sql_paras) _value_array.append('(%s)' % ','.join(_col_values)) # 组成sql _sql = 'insert into "%s"."%s"(%s) values %s' % ( self._db_name, _collection, ','.join(_cols), ','.join(_value_array) ) return ([_sql], [_para_array], {}) def _sql_fun_update(self, op: str, *args, **kwargs) -> tuple: """ 生成更新数据的sql语句数组 """ # 获取参数 _collection = args[0] _filter = args[1] _update = args[2] _fixed_col_define = kwargs.get('fixed_col_define', None) _partition = kwargs.get('partition', None) # 指定分区表 if _partition is not None: _collection = '%s_%s' % (_collection, _partition) # 处理where条件语句 _where_sql_paras = [] _where_sql = self._get_filter_sql( _filter, fixed_col_define=_fixed_col_define, sql_paras=_where_sql_paras ) # 处理更新配置语句 _update_sql_paras = [] _update_sql = self._get_update_sql( _update, fixed_col_define=_fixed_col_define, sql_paras=_update_sql_paras ) _sql_collection = '"%s"."%s"' % (self._db_name, _collection) _sql = 'update %s set %s' % (_sql_collection, _update_sql) if _where_sql is not None: _sql = '%s where %s' % (_sql, _where_sql) _update_sql_paras.extend(_where_sql_paras) # 返回语句 return ([_sql], [_update_sql_paras], {}) def _sql_fun_delete(self, op: str, *args, **kwargs) -> tuple: """ 生成删除数据的sql语句数组 """ # 获取参数 _collection = args[0] _filter = args[1] _fixed_col_define = kwargs.get('fixed_col_define', None) _partition = kwargs.get('partition', None) # 指定分区表 if _partition is not None: _collection = '%s_%s' % (_collection, _partition) # 处理where条件语句 _where_sql_paras = [] _where_sql = self._get_filter_sql( _filter, fixed_col_define=_fixed_col_define, sql_paras=_where_sql_paras ) _sql_paras = None _sql_collection = '"%s"."%s"' % (self._db_name, _collection) _sql = 'delete from %s' % _sql_collection if _where_sql is not None: _sql = '%s where %s' % (_sql, _where_sql) _sql_paras = _where_sql_paras # 返回语句 return ([_sql], [_sql_paras], {}) def _sql_fun_query(self, op: str, *args, **kwargs) -> tuple: """ 生成查询数据的sql语句数组 """ # 获取参数 _collection = args[0] _filter = kwargs.get('filter', {}) _projection = kwargs.get('projection', None) _sort = kwargs.get('sort', None) _skip = kwargs.get('skip', None) _limit = kwargs.get('limit', None) _fixed_col_define = kwargs.get('fixed_col_define', None) _left_join = kwargs.get('left_join', None) # 关联查询 _session = kwargs.get('session', None) # 数据库操作的session _partition = kwargs.get('partition', None) # 指定分区表 if _partition is not None: _collection = '%s_%s' % (_collection, _partition) # 处理where条件语句 _where_sql_paras = [] _where_sql = self._get_filter_sql( _filter, fixed_col_define=_fixed_col_define, sql_paras=_where_sql_paras, left_join=_left_join, session=_session ) # 处理sort语句 _sort_sql_paras = [] _sort_sql = None if _sort is not None: _sort_sql = self._get_sort_sql( _sort, fixed_col_define=_fixed_col_define, sql_paras=_sort_sql_paras, left_join=_left_join, session=_session ) # 处理projection语句 _projection_sql_paras = [] _projection_sql = self._get_projection_sql( _projection, fixed_col_define=_fixed_col_define, sql_paras=_projection_sql_paras, as_name=None if _left_join is None else _collection, left_join=_left_join, session=_session ) # 查询表 _tab = '"%s"."%s"' % (self._db_name, _collection) # 处理关联表 _left_join_sql_paras = [] if _left_join is not None: _left_join_sqls = self._get_left_join_sqls( self._db_name, _collection, _left_join, sql_paras=_left_join_sql_paras, session=_session, fixed_col_define=_fixed_col_define ) for _join_sql in _left_join_sqls: _tab = '%s left outer join %s' % (_tab, _join_sql) # 组装语句 _sql_paras = [] _sql = 'select %s from %s' % (_projection_sql, _tab) _sql_paras.extend(_projection_sql_paras) _sql_paras.extend(_left_join_sql_paras) if _where_sql is not None: _sql = '%s where %s' % (_sql, _where_sql) _sql_paras.extend(_where_sql_paras) if _sort_sql is not None: _sql = '%s order by %s' % (_sql, _sort_sql) _sql_paras.extend(_sort_sql_paras) # 增加skip和limit if _limit is not None: if _skip is not None: _sql = '%s %s' % (_sql, 'limit %d offset %d' % (_limit, _skip)) else: _sql = '%s %s' % (_sql, 'limit %d' % _limit) else: if _skip is not None: _sql = '%s %s' % (_sql, 'offset %d' % _skip) # 返回最终结果 return ([_sql], [_sql_paras], {'is_query': True}) def _sql_fun_query_count(self, op: str, *args, **kwargs) -> tuple: """ 生成查询数据count的sql语句数组 """ # 获取参数 _collection = args[0] _filter = kwargs.get('filter', {}) _skip = kwargs.get('skip', None) _limit = kwargs.get('limit', None) _fixed_col_define = kwargs.get('fixed_col_define', None) _left_join = kwargs.get('left_join', None) # 关联查询 _session = kwargs.get('session', None) # 数据库操作的session _partition = kwargs.get('partition', None) # 指定分区表 if _partition is not None: _collection = '%s_%s' % (_collection, _partition) # 处理where条件语句 _where_sql_paras = [] _where_sql = self._get_filter_sql( _filter, fixed_col_define=_fixed_col_define, sql_paras=_where_sql_paras, left_join=_left_join, session=_session ) # 查询表名 _tab = '"%s"."%s"' % (self._db_name, _collection) # 处理关联表 _left_join_sql_paras = [] if _left_join is not None: _left_join_sqls = self._get_left_join_sqls( self._db_name, _collection, _left_join, sql_paras=_left_join_sql_paras, session=_session, fixed_col_define=_fixed_col_define ) for _join_sql in _left_join_sqls: _tab = '%s left outer join %s' % (_tab, _join_sql) # 组装语句 if _limit is not None or _skip is not None: # 有获取数据区间, 只能采用性能差的子查询模式 _sql_paras = [] _sql = 'select 1 from %s' % _tab _sql_paras.extend(_left_join_sql_paras) if _where_sql is not None: _sql = '%s where %s' % (_sql, _where_sql) _sql_paras.extend(_where_sql_paras) # 增加skip和limit if _limit is not None: if _skip is not None: _sql = '%s %s' % (_sql, 'limit %d offset %d' % (_limit, _skip)) else: _sql = '%s %s' % (_sql, 'limit %d' % _limit) else: if _skip is not None: _sql = '%s %s' % (_sql, 'offset %d' % _skip) # 外面封装一层查询 _sql = 'select count(*) from (%s) t' % _sql else: _sql_paras = [] _sql = 'select count(*) from %s' % _tab _sql_paras.extend(_left_join_sql_paras) if _where_sql is not None: _sql = '%s where %s' % (_sql, _where_sql) _sql_paras.extend(_where_sql_paras) # 返回最终结果 return ([_sql], [_sql_paras], {'is_query': True}) def _sql_fun_query_group_by(self, op: str, *args, **kwargs) -> tuple: """ 生成查询数据聚合的sql语句数组 """ # 获取参数 _collection = args[0] _group = kwargs.get('group', None) _filter = kwargs.get('filter', {}) _projection = kwargs.get('projection', None) _sort = kwargs.get('sort', None) _fixed_col_define = kwargs.get('fixed_col_define', None) _partition = kwargs.get('partition', None) # 指定分区表 if _partition is not None: _collection = '%s_%s' % (_collection, _partition) # 处理group by语句 _select_sql_paras = [] _select_sql, _group_by_sql = self._get_group_sql( _group, fixed_col_define=_fixed_col_define, sql_paras=_select_sql_paras ) # 处理where条件语句 _where_sql_paras = [] _where_sql = self._get_filter_sql( _filter, fixed_col_define=_fixed_col_define, sql_paras=_where_sql_paras ) # 处理sort语句 _sort_sql_paras = [] _sort_sql = None if _sort is not None: _sort_sql = self._get_sort_sql( _sort, fixed_col_define=None, sql_paras=_sort_sql_paras ) # 处理projection语句 _projection_sql_paras = [] _projection_sql = self._get_projection_sql( _projection, fixed_col_define=None, sql_paras=_projection_sql_paras, is_group_by=True ) # 查询表名 _tab = '"%s"."%s"' % (self._db_name, _collection) # 组装查询语句 _sql_paras = [] _sql = 'select %s from %s' % ( _select_sql, _tab ) _sql_paras.extend(_select_sql_paras) if _where_sql is not None: _sql = '%s where %s' % (_sql, _where_sql) _sql_paras.extend(_where_sql_paras) _sql = '%s group by %s' % (_sql, _group_by_sql) if _sort_sql is not None: # 有排序, 需要包装多一层 _sql = 'select %s from (%s) t' % (_projection_sql, _sql) _sql = '%s order by %s' % (_sql, _sort_sql) # 返回结果 return ([_sql], None if len(_sql_paras) == 0 else [_sql_paras], {'is_query': True}) ############################# # 其他内部函数 ############################# def _convert_jsonset_array(self, path_list: list) -> str: """ 将json查询路径数组转换为pgsql支持的json_set查询路径字符串 @param {list} path_list - 路径数组 @returns {str} - 转换后的查询路径字符串 """ _path = '' for _key in path_list: if ValidateTool.str_is_int(_key): _path = '%s%s' % ('' if _path == '' else '%s, ' % _path, _key) else: _path = '%s"%s"' % ('' if _path == '' else '%s, ' % _path, _key) return _path
if __name__ == '__main__': # 当程序自己独立运行时执行的操作 # "test \'单引号\' \\"双引号\\", 反斜杠\\\\, 回车:\\r, 换行:\\n, tab:\\t new" _str = 'test \'单引号\' "双引号", 反斜杠\\, 回车:\r, 换行:\n, tab:\t' _str = _str.replace('\\', '\\\\').replace('\r', '\\r').replace('\n', '\\n').replace("'", "\\'").replace('"', '\\"') print(_str)