网站首页 > 技术文章 正文
引言:
DBConnector 类的目的是提供一个简洁且可扩展的数据库连接管理工具。它封装了与数据库交互的常见操作,比如执行查询、插入、更新、删除等,并且支持异步执行查询任务。
代码封装如下:
其中 self.config 配置的是数据库连接信息, 这里直接填写了对应的信息(测试需要这里使用了此方法)。 也可通过读取配置文件来使变量参数化(推荐)
import os
import pymssql
from concurrent.futures import ThreadPoolExecutor
class DBConnector:
def __init__(self, config=None):
# 配置数据库连接信息
self.config = config
self.connection = None # 单一数据库连接
self.setup_connection() # 初始化连接
def setup_connection(self):
"""初始化数据库连接"""
try:
self.connection = pymssql.connect(
server=self.config.get('host'),
user=self.config.get('user'),
password=self.config.get('password'),
database=self.config.get('db'),
charset='cp936',
as_dict=True
)
except Exception as e:
print(f"连接数据库失败: {e}")
raise e
def execute_query(self, query, params=None):
"""执行更新、删除、插入等非SELECT查询"""
if not self.connection:
print("数据库连接尚未建立")
return None
try:
with self.connection.cursor() as cursor:
cursor.execute(query, params)
self.connection.commit()
except Exception as e:
print(f"执行查询失败: {e}")
raise e
def fetch_query(self, query, params=None):
"""执行SELECT查询并返回结果"""
if not self.connection:
print("数据库连接尚未建立")
return None
try:
with self.connection.cursor() as cursor:
cursor.execute(query, params)
results = cursor.fetchall()
return results
except Exception as e:
print(f"执行查询失败: {e}")
raise e
def determine_query_type(self, query):
"""自动判断查询类型"""
query_upper = query.strip().upper()
if query_upper.startswith("SELECT"):
return 'fetch'
else:
return 'execute'
def async_execute(self, query, params=None):
"""异步执行SQL查询"""
query_type = self.determine_query_type(query)
results = []
with ThreadPoolExecutor() as executor:
futures = []
if query_type == 'fetch':
futures.append(executor.submit(self.fetch_query, query, params))
else:
futures.append(executor.submit(self.execute_query, query, params))
for future in futures:
try:
result = future.result()
if result is not None:
results.append(result)
except Exception as e:
print(f"执行任务失败: {e}")
return results # 返回所有查询的结果
'''async_execute 仍然可以并行执行查询,但因为只有一个连接,'''
'''所以实际上不会并行查询多个数据库。这里只是简化了异步操作的结构,保证可以继续用于未来扩展,或者在需要的情况下并行执行其他操作。'''
if __name__ == "__main__":
# 示例:连接到数据库并执行查询
config = { 'host': '数据库ip', 'user': '用户名', 'password': '密码', 'db': '库名', 'charset': 'utf-8' }
connector = DBConnector(config)
# 执行更新操作(插入、更新、删除等)
connector.async_execute('UPDATE table SET lastsysdate = "20250101"')
# 执行查询操作
select_results = connector.async_execute('SELECT * FROM 表名')
for result in select_results:
print(result)
猜你喜欢
- 2025-01-01 Python的一个轻量级桌面GUI开发第三方库:Eel
- 2025-01-01 隐藏彩蛋:你知道python有一个内置的数据库吗?
- 2025-01-01 Python编程绝对初学者指南,一步一步的指南,有示例和实验练习
- 2025-01-01 python服务器socket编程
- 2025-01-01 「大数据」Hana入门
- 2025-01-01 搭建Python自带静态Web服务器
- 2025-01-01 python套接字socket编程搭建简易服务器,完成计算器实例
- 2025-01-01 Python快速搭建HTTP服务
- 2025-01-01 手把手教你使用Python轻松搞定发邮件
- 2025-01-01 软件测试工程师必会技术:Python带你上手WebSocket
- 1509℃桌面软件开发新体验!用 Blazor Hybrid 打造简洁高效的视频处理工具
- 523℃Dify工具使用全场景:dify-sandbox沙盒的原理(源码篇·第2期)
- 491℃MySQL service启动脚本浅析(r12笔记第59天)
- 470℃服务器异常重启,导致mysql启动失败,问题解决过程记录
- 468℃启用MySQL查询缓存(mysql8.0查询缓存)
- 448℃「赵强老师」MySQL的闪回(赵强iso是哪个大学毕业的)
- 428℃mysql服务怎么启动和关闭?(mysql服务怎么启动和关闭)
- 425℃MySQL server PID file could not be found!失败
- 最近发表
- 标签列表
-
- c++中::是什么意思 (83)
- 标签用于 (65)
- 主键只能有一个吗 (66)
- c#console.writeline不显示 (75)
- pythoncase语句 (81)
- es6includes (73)
- windowsscripthost (67)
- apt-getinstall-y (86)
- node_modules怎么生成 (76)
- chromepost (65)
- c++int转char (75)
- static函数和普通函数 (76)
- el-date-picker开始日期早于结束日期 (70)
- js判断是否是json字符串 (67)
- checkout-b (67)
- localstorage.removeitem (74)
- vector线程安全吗 (70)
- & (66)
- java (73)
- js数组插入 (83)
- linux删除一个文件夹 (65)
- mac安装java (72)
- eacces (67)
- 查看mysql是否启动 (70)
- 无效的列索引 (74)