1 描述
在使用mybatis-plus进行CUDR后,感受到了其设计理念的巧妙和代码逻辑的清晰。但是在有时敏捷开发的需求下,使用mybatis-plus未免有些繁琐,
所以希望使用Python实现类似于java操作数据库的方式。基于以上原因,写了一个基于pymysql库封装的框架。
2 文件结构
项目的文件结构如下:
3 代码
./mysql/Mysql.py
# -*- coding: utf-8 -*-
"""
数据库链接工具
@Author : RichardoGu
@Time : 2024/6/13 16:43
"""
import logging as log
from .data_object import *
import pymysql
import time
# 根日志器默认日志级别为WARNING,这里将其重置,以保证debug、info级别的日志也能输出
log.basicConfig(
level=log.NOTSET,
format='%(asctime)s.%(msecs)03d [%(levelname)s] [%(filename)s:%(lineno)d] %(message)s',
datefmt='## %Y-%m-%d %H:%M:%S'
)
class MysqlUtil:
def __init__(self, host: str = "localhost", port: int = 3306,
username: str = "root", password: str = "admin", db: str = "ruoyi-vue-pro",
charset: str = 'utf-8'):
"""mysql 连接初始化"""
log.info("正在初始化数据库...")
self._host = host
self._port = port
self._user = username
self._password = password
self._db = db
self._charset = charset
self._mysql_conn = None
self.show_log = True
def __enter__(self):
"""打开数据库连接"""
self.mysql_conn = pymysql.connect(
host=self._host,
port=self._port,
user=self._user,
passwd=self._password,
db=self._db,
# charset=self._charset
)
return self
def __exit__(self, exc_type, exc_val, exc_tb):
"""关闭数据库连接"""
if self.mysql_conn:
self.mysql_conn.close()
self.mysql_conn = None
def execute(self, sql: str, args: tuple = None, commit: bool = False) -> any:
"""执行 SQL 语句"""
try:
if self.show_log:
log.info(f">>SQL :{sql},args:{args}")
with self.mysql_conn.cursor() as cursor:
cursor.execute(sql, args)
if commit:
self.mysql_conn.commit()
if self.show_log:
log.info(f"数据提交成功")
else:
result = cursor.fetchall()
if self.show_log:
log.info("<<查询到数据:")
for i in result:
log.info(f"<<{i}")
return result
except Exception as e:
log.info(f"执行 SQL 语句出错:{e}")
self.mysql_conn.rollback()
raise e
def delete(self, do: BaseDO):
# TODO R G 目前不需要写delete
pass
def insert(self, do: BaseDO) -> int:
"""按照DO中的值向数据库中插入值,并且返回插入主键ID"""
if len(do.table_name) == 0:
raise Exception("没有指定表名")
insert_params = ""
insert_value = ""
for (param, value) in do.value.items():
if value is not None:
insert_params += param if insert_params == "" else f", {param}"
if str == type(value):
# 处理字符串
insert_value += f"'{value}'" if insert_value == "" else f", '{value}'"
elif time.struct_time == type(value):
# 处理时间
insert_value += (f"'{time.strftime('%Y-%m-%d %H:%M:%S', value)}'"
if insert_value == ""
else f", '{time.strftime('%Y-%m-%d %H:%M:%S', value)}'")
elif bool == type(value):
# 处理bool类型
insert_value += f"b'{1 if value else 0}'" if insert_value == "" else f", b'{1 if value else 0}'"
else:
insert_value += f"{value}" if insert_value == "" else f", {value}"
sql = f"insert into `{do.table_name}` ({insert_params}) values ({insert_value})"
try:
with self.mysql_conn.cursor() as cursor:
cursor.execute(sql)
self.mysql_conn.commit()
do.value["id"] = cursor.lastrowid
return cursor.lastrowid
except Exception as e:
log.info(f"执行 SQL 语句出错:{e}")
self.mysql_conn.rollback()
raise e
def query(self, do: BaseDO) -> tuple:
if len(do.table_name) == 0:
raise Exception("没有指定表名")
return_data = ""
query_data = ""
for (param, value) in do.value.items():
return_data += param if return_data == "" else f", {param}"
if value is not None:
query_data += param + "=" if query_data == "" else f" and {param}="
if str == type(value):
# 处理字符串
query_data += f"'{value}'"
elif time.struct_time == type(value):
# 处理时间
query_data += f"'{time.strftime('%Y-%m-%d %H:%M:%S', value)}'"
elif bool == type(value):
# 处理bool类型
query_data += f"b'{1 if value else 0}'"
else:
query_data += f"{value}"
sql = f"select {return_data} from {do.table_name}"
if query_data != "":
sql += f" where {query_data}"
# 执行sql语句
return self.execute(sql)
./mysql/__init__.py
# -*- coding: utf-8 -*-
"""
@Author : RichardoGu
@Time : 2024/6/13 17:16
"""
from .data_object import *
from .Mysql import MysqlUtil
./mysql/data_object/__init__.py
# -*- coding: utf-8 -*-
"""
DO
@Author : RichardoGu
@Time : 2024/6/13 20:15
"""
from .BaseDO import BaseDO
from .TenantDO import TenantDO
from .GraduateDO import GraduateDO
from .StandardDO import StandardDO
from .FileDO import FileDO
./mysql/data_object/BaseDO.py
# -*- coding: utf-8 -*-
"""
数据库基础类
@Author : RichardoGu
@Time : 2024/6/13 20:14
"""
import time
class BaseDO:
def __init__(self, table_name: str, creator: str, create_time: time.struct_time,
updater: str, update_time: time.struct_time):
self.table_name: str = table_name
self.value = {
"creator": creator,
"create_time": create_time,
"updater": updater,
"update_time": update_time,
"deleted": False
}
def __str__(self):
return str(self.value)
./mysql/data_object/FileDO.py
在BaseDO的基础上,实现了一个FileDO类,该类表示数据库中存放的文件信息,例如文件ID,文件名称,文件url等。
# -*- coding: utf-8 -*-
"""
文件DO
@Author : RichardoGu
@Time : 2024/6/14 15:18
"""
from .BaseDO import BaseDO
import time
class FileDO(BaseDO):
def __init__(self, ID: int = None, config_id: int = None, name: str = None,
path: str = None, url: str = None, Type: str = None, size: int = None,
creator: str = None, create_time: time.struct_time = None, updater: str = None,
update_time: time.struct_time = None):
super().__init__(
table_name="infra_file",
creator=creator,
create_time=create_time,
updater=updater,
update_time=update_time
)
self.value["id"] = ID
self.value["config_id"] = config_id
self.value["name"] = name
self.value["path"] = path
self.value["url"] = url
self.value["type"] = Type
self.value["size"] = size
4 范例
# -*- coding: utf-8 -*-
"""
文件处理入口
@Author : RichardoGu
@Time : 2024/6/14 11:02
"""
# 库导入
import tqdm
import logging
import os
import re
from mysql import MysqlUtil, FileDO
import mimetypes
# 项目配置
LOGGER = logging.getLogger()
LOGGER.setLevel(logging.INFO)
logging.basicConfig(
level=logging.INFO,
format='%(asctime)s.%(msecs)03d [%(levelname)s] [%(filename)s:%(lineno)d] %(message)s',
datefmt='## %Y-%m-%d %H:%M:%S'
)
"""
==================== 全局变量 ==================
"""
# Mysql
MYSQL_HOST = "localhost"
MYSQL_PORT = 3306
if __name__ == "__main__":
file_do = FileDO(
config_id=1, name="file_name", path="save_name",
url="test",
Type="test", size=123,
)
with (MysqlUtil() as DB):
DB.insert(file_do)