Orm框架-SQLAlchemy使用
1、介绍
SQLAlchemy 提供了一个全功能的 SQL 工具包和对象关系映射(ORM)系统,可以屏蔽不同数据库之间的语法差异,提升开发效率和代码的可维护性。
2、基础配置
from contextlib import contextmanager
from sqlalchemy import create_engine
from sqlalchemy.orm import sessionmaker
dbHost = 'mysql+pymysql://admin:admin@127.0.0.1:3306/filesystem'
engine = create_engine(
dbHost,
echo=True, # 是否打印SQL
pool_size=10, # 连接池的大小,指定同时在连接池中保持的数据库连接数,默认:5
max_overflow=20, # 超出连接池大小的连接数,超过这个数量的连接将被丢弃,默认: 5
)
# 创建会话工厂
Session = sessionmaker(bind=engine)
@contextmanager
def getSession(autoCommitByExit=True):
"""使用上下文管理资源关闭"""
session = Session()
try:
yield session
# 退出时,是否自动提交
if autoCommitByExit:
session.commit()
except Exception as e:
session.rollback()
raise e
finally:
session.close()
# YmUser.Base.metadata.create_all(engine)
3、模型-表映射
from sqlalchemy import Column, String, TIMESTAMP
from sqlalchemy import text
from sqlalchemy.dialects.mysql import BIGINT, TINYINT
from sqlalchemy.ext.declarative import declarative_base
# 模型父类
Base = declarative_base()
# 用户模型和表一一对应
class YmUser(Base):
__tablename__ = 'ym_user'
__table_args__ = {'comment': '用户表'}
id = Column(BIGINT, primary_key=True, comment='主键')
union_id = Column(String(64), comment='微信开放平台下的用户唯一标识')
open_id = Column(String(64), comment='微信openid')
nick_name = Column(String(32), index=True, comment='昵称')
password = Column(String(64), comment='密码')
avatar = Column(String(255), nullable=False, index=True, server_default=text("''"), comment='头像')
phone = Column(String(11), index=True, comment='手机号')
email = Column(String(50), comment='电子邮箱')
last_login = Column(String(20), comment='上次登录时间')
status = Column(TINYINT, server_default=text("'1'"), comment='状态;-1:黑名单 1:正常')
delete_at = Column(String(20), comment='删除时间')
created_at = Column(TIMESTAMP, comment='创建时间')
updated_at = Column(TIMESTAMP, comment='更新时间')
4、增删改查
import json
from datetime import datetime
from sqlalchemy import and_, or_, desc, update
import sqlsessionutil
from YmUser import YmUser
def queryRows():
""" 查询示例 """
with sqlsessionutil.getSession() as session:
query = session.query(YmUser).filter(
or_(
and_(
YmUser.id > 0,
YmUser.id < 200,
YmUser.nick_name.like("%飞%")
),
YmUser.phone.in_(["17600000001", "17600000002", "17600000003"])
)
)
result = query.all()
# 转成json
json_result = json.dumps([user.__dict__ for user in result], default=str)
print("json_result:", json_result)
for row in result:
print("id:{} nick_name:{} phone:{}".format(row.id, row.nick_name, row.phone))
return result
def addOne():
""" 新增单条数据 """
row = YmUser(
union_id="ui_12344343434",
open_id="op_ksjdhjjkdhdjdhh",
nick_name="娃哈哈",
password="123456",
email="test@163.com",
phone="17600000000",
last_login=datetime.now().strftime("%Y-%m-%d %H:%M:%S"),
avatar="http://img-avatar.com/head-abc.jpg"
)
# 这里想获取新增后的id,需要refresh数据,就不能在上下文里提交
with sqlsessionutil.getSession(False) as session:
session.add(row)
session.commit()
session.refresh(row)
print("添加成功,id:{}".format(row.id))
print("row:".format(row.__dict__))
def batchAdd():
""" 批量新增数据 """
rows = []
for n in range(3):
row = YmUser(
union_id="ui_12344343434",
open_id="op_ksjdhjjkdhdjdhh",
nick_name="娃哈哈" + str(n),
password="123456",
email="test@163.com",
phone="17600000000",
last_login=datetime.now().strftime("%Y-%m-%d %H:%M:%S"),
avatar="http://img-avatar.com/head-abc.jpg"
)
rows.append(row)
# 这里设置不在上下文中提交,否则报错
with sqlsessionutil.getSession() as session:
session.bulk_save_objects(rows)
def updateDictById(id: int, newVal: dict) -> int:
""" 根据id更新数据(值是字典) """
updateStmt = update(YmUser).where(YmUser.id == id).values(newVal)
with sqlsessionutil.getSession() as session:
result = session.execute(updateStmt)
rowcount = result.rowcount
return rowcount
def updateModelById(id: int):
""" 根据id更新数据(值是model) """
with sqlsessionutil.getSession() as session:
# 先查在更新
exist = session.query(YmUser).filter(YmUser.id == id).first()
if exist.id == 0:
return
exist.nick_name = "呵呵呵呵呵2222"
exist.email = "112233@qq.com"
def queryByPage(page: int, pageSize: int, conditions: dict):
""" 分页查询 """
# 计算起始索引
offset = (page - 1) * pageSize
with sqlsessionutil.getSession() as session:
query = session.query(YmUser)
# 填充查询条件
if len(conditions) > 0:
query = query.filter_by(**conditions)
# 查询总条数
total = query.count()
# 排序分页
query = query.order_by(desc(YmUser.id)).offset(offset).limit(pageSize)
# 查询记录
result = query.all()
return total, result
if __name__ == '__main__':
# addOne()
# batchAdd()
# updateModelById(1)
# updateVal = updateDictById(1, {
# "nick_name": "猿码记",
# "email": "猿码记@163.com",
# "status": -1,
# })
# queryRows()
conditions = {
"status": 1,
}
total, result = queryByPage(1, 5, conditions)
print(total)