LOADING

加载过慢请开启缓存 浏览器默认开启

基于Python的一个Mysql工具类

2024/6/17 Lib Python Mysql

1 描述

在使用mybatis-plus进行CRUD后,感受到了其设计理念的巧妙和代码逻辑的清晰。但是在某些敏捷开发的场景下,使用mybatis-plus未免有些繁琐,
所以希望使用Python实现类似于Java操作数据库的方式。基于以上原因,写了一个基于pymysql库封装的框架。

2 文件结构

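项目的文件结构如下:

mysql/
├── __init__.py
├── Mysql.py
└── data_object/
    ├── __init__.py
    ├── BaseDO.py
    ├── FileDO.py
    ├── GraduateDO.py
    ├── StandardDO.py
    └── TenantDO.py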

3 代码

./mysql/Mysql.py

# -*- coding: utf-8 -*-
"""
    数据库链接工具
    
    @Author  : RichardoGu
    @Time    : 2024/6/13 16:43
"""
import logging as log
from .data_object import *
import pymysql
import time

# The root logger's default level is WARNING; reset it here so that
# debug- and info-level records are emitted as well.
# NOTE(review): this call runs when the module is imported, so importing this
# module configures logging for the whole process — confirm that is intended.
log.basicConfig(
    level=log.NOTSET,
    format='%(asctime)s.%(msecs)03d [%(levelname)s] [%(filename)s:%(lineno)d] %(message)s',
    datefmt='## %Y-%m-%d %H:%M:%S'
)


class MysqlUtil:
    """Small pymysql wrapper that inserts and queries rows described by DO objects.

    The connection is opened in ``__enter__`` and closed in ``__exit__``, so the
    class is meant to be used as a context manager::

        with MysqlUtil() as db:
            db.insert(some_do)

    A DO object is anything exposing ``table_name`` (str) and ``value``
    (dict mapping column name -> value, where ``None`` means "not set").
    """

    def __init__(self, host: str = "localhost", port: int = 3306,
                 username: str = "root", password: str = "admin", db: str = "ruoyi-vue-pro",
                 charset: str = 'utf-8'):
        """Store the connection settings; the connection itself is not opened here.

        Args:
            host: MySQL server host.
            port: MySQL server port.
            username: Login user.
            password: Login password.
            db: Database (schema) name to select.
            charset: Stored on the instance but currently not forwarded to
                pymysql (see ``__enter__``).
        """
        log.info("正在初始化数据库...")
        self._host = host
        self._port = port
        self._user = username
        self._password = password
        self._db = db
        self._charset = charset
        # Same attribute name that __enter__/__exit__/execute use, so that using
        # the object outside a ``with`` block fails with a clear error instead of
        # an AttributeError on a never-assigned attribute.
        self.mysql_conn = None
        self.show_log = True

    def __enter__(self):
        """Open the database connection and return this object."""
        # NOTE(review): self._charset is deliberately not passed to connect().
        # Its default 'utf-8' does not look like a MySQL charset name
        # (MySQL uses e.g. 'utf8mb4') — confirm before forwarding it.
        self.mysql_conn = pymysql.connect(
            host=self._host,
            port=self._port,
            user=self._user,
            passwd=self._password,
            db=self._db,
        )
        return self

    def __exit__(self, exc_type, exc_val, exc_tb):
        """Close the database connection if one is open."""
        if self.mysql_conn is not None:
            try:
                self.mysql_conn.close()
            finally:
                # Drop the reference even if close() raised, so the object
                # never keeps pointing at a half-closed connection.
                self.mysql_conn = None

    def _require_connection(self):
        """Return the open connection, or raise RuntimeError if there is none."""
        conn = getattr(self, "mysql_conn", None)
        if conn is None:
            raise RuntimeError("数据库连接尚未打开,请在 with 语句中使用 MysqlUtil")
        return conn

    @staticmethod
    def _quote_identifier(name: str) -> str:
        """Wrap a table/column name in backticks, doubling embedded backticks."""
        return "`" + str(name).replace("`", "``") + "`"

    @staticmethod
    def _bind_value(value) -> tuple:
        """Translate one Python value into ``(sql_fragment, bound_params)``.

        * ``bool`` becomes the constant bit literal ``b'1'`` / ``b'0'`` (no
          parameter), matching the literal the SQL has always used for booleans.
        * ``time.struct_time`` is formatted as ``'%Y-%m-%d %H:%M:%S'`` and bound.
        * Everything else is bound as-is through a ``%s`` placeholder, leaving
          quoting and escaping to the driver.
        """
        if isinstance(value, bool):
            return ("b'1'" if value else "b'0'"), []
        if isinstance(value, time.struct_time):
            return "%s", [time.strftime('%Y-%m-%d %H:%M:%S', value)]
        return "%s", [value]

    def execute(self, sql: str, args: tuple = None, commit: bool = False) -> "tuple | None":
        """Execute one SQL statement.

        Args:
            sql: Statement text, optionally containing ``%s`` placeholders.
            args: Values for the placeholders, or ``None``.
            commit: When true, commit after executing and return ``None``;
                otherwise fetch and return all result rows.

        Returns:
            The result of ``cursor.fetchall()`` when ``commit`` is false,
            otherwise ``None``.

        Raises:
            RuntimeError: If no connection is open.
            Exception: Any error raised while executing; the transaction is
                rolled back before the error is re-raised.
        """
        conn = self._require_connection()
        if self.show_log:
            log.info(f">>SQL :{sql},args:{args}")
        try:
            with conn.cursor() as cursor:
                cursor.execute(sql, args)
                if commit:
                    conn.commit()
                    if self.show_log:
                        log.info("数据提交成功")
                    return None
                result = cursor.fetchall()
        except Exception as e:
            log.error(f"执行 SQL 语句出错:{e}")
            conn.rollback()
            raise
        if self.show_log:
            log.info("<<查询到数据:")
            for row in result:
                log.info(f"<<{row}")
        return result

    def delete(self, do: "BaseDO"):
        """Placeholder: deletion is not implemented; calling this does nothing."""
        # TODO(R G): delete is not needed for now.
        pass

    def insert(self, do: "BaseDO") -> int:
        """Insert the non-None fields of ``do`` as one row and return the new primary key.

        The generated id is also written back to ``do.value["id"]``.

        Args:
            do: Object with ``table_name`` and a ``value`` dict of column -> value.

        Returns:
            ``cursor.lastrowid`` of the insert.

        Raises:
            ValueError: If ``do.table_name`` is empty.
            RuntimeError: If no connection is open.
            Exception: Any error raised while executing; the transaction is
                rolled back before the error is re-raised.
        """
        if not do.table_name:
            raise ValueError("没有指定表名")
        columns, fragments, params = [], [], []
        for column, value in do.value.items():
            if value is None:
                continue  # unset fields are left to the column default
            fragment, bound = self._bind_value(value)
            columns.append(self._quote_identifier(column))
            fragments.append(fragment)
            params.extend(bound)
        sql = (f"insert into {self._quote_identifier(do.table_name)} "
               f"({', '.join(columns)}) values ({', '.join(fragments)})")
        # Pass None rather than an empty tuple when nothing is bound.
        args = tuple(params) or None

        conn = self._require_connection()
        if self.show_log:
            log.info(f">>SQL :{sql},args:{args}")
        try:
            with conn.cursor() as cursor:
                cursor.execute(sql, args)
                conn.commit()
                new_id = cursor.lastrowid
        except Exception as e:
            log.error(f"执行 SQL 语句出错:{e}")
            conn.rollback()
            raise
        do.value["id"] = new_id
        return new_id

    def query(self, do: "BaseDO") -> tuple:
        """Select rows whose columns equal every non-None field of ``do``.

        All keys of ``do.value`` are selected, in dict order; the non-None ones
        are AND-ed together as equality conditions. With no non-None field the
        statement has no WHERE clause.

        Args:
            do: Object with ``table_name`` and a ``value`` dict of column -> value.

        Returns:
            The rows returned by ``execute``.

        Raises:
            ValueError: If ``do.table_name`` is empty.
            RuntimeError: If no connection is open.
        """
        if not do.table_name:
            raise ValueError("没有指定表名")
        selected = [self._quote_identifier(column) for column in do.value]
        conditions, params = [], []
        for column, value in do.value.items():
            if value is None:
                continue
            fragment, bound = self._bind_value(value)
            conditions.append(f"{self._quote_identifier(column)}={fragment}")
            params.extend(bound)
        sql = f"select {', '.join(selected)} from {self._quote_identifier(do.table_name)}"
        if conditions:
            sql += " where " + " and ".join(conditions)
        return self.execute(sql, tuple(params) or None)

./mysql/__init__.py

# -*- coding: utf-8 -*-
"""
    @Author  : RichardoGu
    @Time    : 2024/6/13 17:16
"""
from .data_object import *
from .Mysql import MysqlUtil

./mysql/data_object/__init__.py

# -*- coding: utf-8 -*-
"""
    DO
    
    @Author  : RichardoGu
    @Time    : 2024/6/13 20:15
"""
from .BaseDO import BaseDO
from .TenantDO import TenantDO
from .GraduateDO import GraduateDO
from .StandardDO import StandardDO
from .FileDO import FileDO

./mysql/data_object/BaseDO.py

# -*- coding: utf-8 -*-
"""
    数据库基础类
    
    @Author  : RichardoGu
    @Time    : 2024/6/13 20:14
"""
import time


class BaseDO:
    """Base data object: a table name plus a dict of column values.

    ``value`` always starts with the audit columns ``creator``,
    ``create_time``, ``updater``, ``update_time`` and a ``deleted`` flag
    initialised to ``False``.
    """

    def __init__(self, table_name: str, creator: str, create_time: time.struct_time,
                 updater: str, update_time: time.struct_time):
        """Record the table name and seed ``value`` with the audit columns."""
        self.table_name: str = table_name
        self.value = dict(
            creator=creator,
            create_time=create_time,
            updater=updater,
            update_time=update_time,
            deleted=False,
        )

    def __str__(self):
        """Return the string form of the ``value`` dict."""
        return f"{self.value}"

./mysql/data_object/FileDO.py

在BaseDO的基础上,实现了一个FileDO类,该类表示数据库中存放的文件信息,例如文件ID,文件名称,文件url等。

# -*- coding: utf-8 -*-
"""
    文件DO
    
    @Author  : RichardoGu
    @Time    : 2024/6/14 15:18
"""
from .BaseDO import BaseDO
import time


class FileDO(BaseDO):
    """Data object for the ``infra_file`` table.

    Adds the columns ``id``, ``config_id``, ``name``, ``path``, ``url``,
    ``type`` and ``size`` to the audit columns set up by ``BaseDO``.
    """

    def __init__(self, ID: int = None, config_id: int = None, name: str = None,
                 path: str = None, url: str = None, Type: str = None, size: int = None,
                 creator: str = None, create_time: time.struct_time = None, updater: str = None,
                 update_time: time.struct_time = None):
        """Initialise the audit columns via ``BaseDO`` and then the file columns."""
        super().__init__(
            table_name="infra_file",
            creator=creator,
            create_time=create_time,
            updater=updater,
            update_time=update_time
        )
        # Added in one call; dict insertion order keeps the same column order.
        self.value.update({
            "id": ID,
            "config_id": config_id,
            "name": name,
            "path": path,
            "url": url,
            "type": Type,
            "size": size,
        })

4 范例

# -*- coding: utf-8 -*-
"""
    文件处理入口
    
    @Author  : RichardoGu
    @Time    : 2024/6/14 11:02
"""
# 库导入
import tqdm
import logging
import os
import re
from mysql import MysqlUtil, FileDO
import mimetypes

# Project configuration
# getLogger() with no name returns the root logger; its level is set to INFO.
LOGGER = logging.getLogger()
LOGGER.setLevel(logging.INFO)
# NOTE(review): basicConfig does nothing when the root logger already has
# handlers; if importing `mysql` above has already configured logging, this
# call presumably has no effect — confirm.
logging.basicConfig(
    level=logging.INFO,
    format='%(asctime)s.%(msecs)03d [%(levelname)s] [%(filename)s:%(lineno)d] %(message)s',
    datefmt='## %Y-%m-%d %H:%M:%S'
)
# The bare string literal below is only a section banner ("global variables").
"""
==================== 全局变量 ==================
"""
# Mysql
MYSQL_HOST = "localhost"
MYSQL_PORT = 3306


if __name__ == "__main__":
    # Demo: build a FileDO with sample values and insert it as one row.
    file_do = FileDO(
        config_id=1, name="file_name", path="save_name",
        url="test",
        Type="test", size=123,
    )
    # Plain `with` form: wrapping a single context manager in parentheses is
    # only accepted by newer interpreters and adds nothing here.
    # The connection target comes from the MYSQL_* constants defined above
    # instead of silently relying on MysqlUtil's own defaults.
    with MysqlUtil(host=MYSQL_HOST, port=MYSQL_PORT) as db:
        db.insert(file_do)