Orm框架-SQLAlchemy使用

1、介绍

SQLAlchemy 提供了一个全功能的 SQL 工具包和对象关系映射(ORM)系统,可以屏蔽不同数据库之间的语法差异,提升开发效率和代码的可维护性。

2、基础配置

from contextlib import contextmanager

from sqlalchemy import create_engine
from sqlalchemy.orm import sessionmaker

dbHost = 'mysql+pymysql://admin:admin@127.0.0.1:3306/filesystem'
engine = create_engine(
    dbHost,
    echo=True,  # 是否打印SQL
    pool_size=10,  # 连接池的大小,指定同时在连接池中保持的数据库连接数,默认:5
    max_overflow=20,  # 超出连接池大小的连接数,超过这个数量的连接将被丢弃,默认: 5
)

# 创建会话工厂
Session = sessionmaker(bind=engine)


@contextmanager
def getSession(autoCommitByExit=True):
    """使用上下文管理资源关闭"""
    session = Session()
    try:
        yield session
        # 退出时,是否自动提交
        if autoCommitByExit:
            session.commit()
    except Exception as e:
        session.rollback()
        raise e
    finally:
        session.close()

# YmUser.Base.metadata.create_all(engine)

3、模型-表映射

from sqlalchemy import Column, String, TIMESTAMP
from sqlalchemy import text
from sqlalchemy.dialects.mysql import BIGINT, TINYINT
from sqlalchemy.ext.declarative import declarative_base

# 模型父类
Base = declarative_base()


# 用户模型和表一一对应
class YmUser(Base):
    __tablename__ = 'ym_user'
    __table_args__ = {'comment': '用户表'}

    id = Column(BIGINT, primary_key=True, comment='主键')
    union_id = Column(String(64), comment='微信开放平台下的用户唯一标识')
    open_id = Column(String(64), comment='微信openid')
    nick_name = Column(String(32), index=True, comment='昵称')
    password = Column(String(64), comment='密码')
    avatar = Column(String(255), nullable=False, index=True, server_default=text("''"), comment='头像')
    phone = Column(String(11), index=True, comment='手机号')
    email = Column(String(50), comment='电子邮箱')
    last_login = Column(String(20), comment='上次登录时间')
    status = Column(TINYINT, server_default=text("'1'"), comment='状态;-1:黑名单 1:正常')
    delete_at = Column(String(20), comment='删除时间')
    created_at = Column(TIMESTAMP, comment='创建时间')
    updated_at = Column(TIMESTAMP, comment='更新时间')

4、增删改查

import json
from datetime import datetime

from sqlalchemy import and_, or_, desc, update

import sqlsessionutil
from YmUser import YmUser


def queryRows():
    """ 查询示例 """
    with sqlsessionutil.getSession() as session:
        query = session.query(YmUser).filter(
            or_(
                and_(
                    YmUser.id > 0,
                    YmUser.id < 200,
                    YmUser.nick_name.like("%飞%")
                ),
                YmUser.phone.in_(["17600000001", "17600000002", "17600000003"])
            )
        )
        result = query.all()
        # 转成json
        json_result = json.dumps([user.__dict__ for user in result], default=str)
        print("json_result:", json_result)
        for row in result:
            print("id:{} nick_name:{} phone:{}".format(row.id, row.nick_name, row.phone))

    return result


def addOne():
    """ 新增单条数据 """
    row = YmUser(
        union_id="ui_12344343434",
        open_id="op_ksjdhjjkdhdjdhh",
        nick_name="娃哈哈",
        password="123456",
        email="test@163.com",
        phone="17600000000",
        last_login=datetime.now().strftime("%Y-%m-%d %H:%M:%S"),
        avatar="http://img-avatar.com/head-abc.jpg"
    )
    # 这里想获取新增后的id,需要refresh数据,就不能在上下文里提交
    with sqlsessionutil.getSession(False) as session:
        session.add(row)
        session.commit()
        session.refresh(row)
    print("添加成功,id:{}".format(row.id))
    print("row:".format(row.__dict__))


def batchAdd():
    """ 批量新增数据 """
    rows = []
    for n in range(3):
        row = YmUser(
            union_id="ui_12344343434",
            open_id="op_ksjdhjjkdhdjdhh",
            nick_name="娃哈哈" + str(n),
            password="123456",
            email="test@163.com",
            phone="17600000000",
            last_login=datetime.now().strftime("%Y-%m-%d %H:%M:%S"),
            avatar="http://img-avatar.com/head-abc.jpg"
        )
        rows.append(row)
    # 这里设置不在上下文中提交,否则报错
    with sqlsessionutil.getSession() as session:
        session.bulk_save_objects(rows)


def updateDictById(id: int, newVal: dict) -> int:
    """ 根据id更新数据(值是字典) """
    updateStmt = update(YmUser).where(YmUser.id == id).values(newVal)
    with sqlsessionutil.getSession() as session:
        result = session.execute(updateStmt)
        rowcount = result.rowcount
    return rowcount


def updateModelById(id: int):
    """ 根据id更新数据(值是model) """
    with sqlsessionutil.getSession() as session:
        # 先查在更新
        exist = session.query(YmUser).filter(YmUser.id == id).first()
        if exist.id == 0:
            return
        exist.nick_name = "呵呵呵呵呵2222"
        exist.email = "112233@qq.com"


def queryByPage(page: int, pageSize: int, conditions: dict):
    """ 分页查询 """
    # 计算起始索引
    offset = (page - 1) * pageSize
    with sqlsessionutil.getSession() as session:
        query = session.query(YmUser)
        # 填充查询条件
        if len(conditions) > 0:
            query = query.filter_by(**conditions)

        # 查询总条数
        total = query.count()
        # 排序分页
        query = query.order_by(desc(YmUser.id)).offset(offset).limit(pageSize)
        # 查询记录
        result = query.all()

    return total, result


if __name__ == '__main__':
    # addOne()
    # batchAdd()
    # updateModelById(1)
    # updateVal = updateDictById(1, {
    #     "nick_name": "猿码记",
    #     "email": "猿码记@163.com",
    #     "status": -1,
    # })
    # queryRows()
    conditions = {
        "status": 1,
    }
    total, result = queryByPage(1, 5, conditions)
    print(total)

results matching ""

    No results matching ""