0%

Python ORM基本使用

ORM概述

ORM = Object Relational Mapping(对象关系映射)。它是一种 让你用面向对象的方式操作数据库 的技术。

一句话解释:
ORM = 把 Python(或其他语言)里的对象 ↔ 数据库里的表记录 互相转换的工具。

举个最直观的例子:

不用 ORM(纯 SQL)

1
2
3
cursor.execute("SELECT id, name, age FROM users WHERE id = 1")
row = cursor.fetchone()
user = {"id": row[0], "name": row[1], "age": row[2]}

使用 ORM(以 SQLAlchemy 为例)
1
user = session.query(User).filter_by(id=1).first()

数据库里的users表,就像程序里的一个User类;每个数据行就是类的实例。

也就是说:

数据库表 → 类(Class)

数据行 → 对象(Object)

表字段 → 类属性(Attribute)

为什么要用ORM?

提升开发效率

数据库操作变成写对象操作,无需手写大量 SQL。

  • 不想写烦人的 SQL
  • 想快速 CRUD(增删查改)
  • ORM 能让操作像操作 Python 对象一样简单

代码更易维护、更优雅

ORM 的代码结构清晰:

1
User(name='Ming', age=18)

很容易理解,不像 SQL 字符串容易出错。

避免 SQL 注入风险

ORM 内部自动做参数化处理:

1
session.query(User).filter(User.name == name_input)

比拼接 SQL 安全得多。

底层数据库可随时切换

例如:

  • 开发用 SQLite
  • 部署换 MySQL、PostgreSQL、Oracle

ORM 只需改连接字符串,不用改所有 SQL。

提供丰富功能

ORM 通常内置:

  • 自动建表
  • 关系维护(外键、一对多、多对多)
  • 自动事务管理
  • 数据验证
  • 查询构建器(链式调用)

怎么使用ORM?

安装SQLAlchemy

Python中最常用的ORM框架是SQLAlchemy

安装方式:

1
pip install sqlalchemy

1
2
3
import sqlalchemy

sqlalchemy.__version__
'2.0.44'

定义模型(Model)

1
2
3
4
5
6
7
8
9
10
11
12
13
14
from sqlalchemy import Column, Integer, String, UniqueConstraint
from sqlalchemy.orm import declarative_base

Base = declarative_base()

class User(Base):
__tablename__ = "users" # 指定表格名称
__table_args__ = {"comment": "用户表,存用户信息"} # 表注释
# __table_args__ = {"schema": "myschema"} # 如果使用的是类似postgresql这样的数据库,可以通过这个表参数指定所使用的schema
# __table_args__ = (UniqueConstraint("name", "age", name="uk_user"),) # 联合唯一性约束,指name和age这两个参数不允许完全一样。

id = Column(Integer, primary_key=True, autoincrement=True, comment="编号")
name = Column(String(10), nullable=True, comment="姓名")
age = Column(Integer, nullable=True, comment="年龄")

创建数据库连接

轻量化的sqlite

1
2
3
4
5
6
from sqlalchemy import create_engine
from sqlalchemy.orm import sessionmaker

engine = create_engine("sqlite:///test.db") # 使用sqlite轻量级数据库
SessionLocal = sessionmaker(bind=engine)
session = SessionLocal()

Mysql

需要使用到Pymysql驱动

1
pip install pymysql

1
2
3
4
5
6
7
8
from sqlalchemy import create_engine
from sqlalchemy.orm import sessionmaker
# 注意,需要自己先手动创建testdb数据库
# create database testdb charset utf8mb4;
engine = create_engine("mysql+pymysql://root:123456@localhost:3306/testdb", pool_pre_ping=True)
# pool_pre_ping=True 在从连接池“取出”一条数据库连接之前,先 ping(探测)一下这条连接是否仍然可用。如果连接断掉了,SQLAlchemy 会自动丢弃并重新创建新的连接。
SessionLocal = sessionmaker(bind=engine)
session = SessionLocal()

PostgreSQL

需要使用到psycopg驱动

1
pip install psycopg[binary]

1
2
3
4
5
6
7
8
9
from sqlalchemy import create_engine
from sqlalchemy.orm import sessionmaker

# 注意,需要自己先手动创建testdb数据库
# create database testdb owner ming;
engine = create_engine("postgresql+psycopg://ming:123456@localhost:5432/testdb", pool_pre_ping=True)
# pool_pre_ping=True 在从连接池“取出”一条数据库连接之前,先 ping(探测)一下这条连接是否仍然可用。如果连接断掉了,SQLAlchemy 会自动丢弃并重新创建新的连接。
SessionLocal = sessionmaker(bind=engine)
session = SessionLocal()

删除表

1
2
3
4
5
# 删除表
from sqlalchemy import text
with engine.connect() as conn:
conn.execute(text("DROP TABLE IF EXISTS users"))
conn.commit()

创建表

1
Base.metadata.create_all(engine)

插入数据

1
2
3
4
5
6
7
user_tables = {
"name": ["张三", "李四", "lm", "minglog", "wang"],
"age": [25, 21, 22, 18, 12]
}
for i in range(len(user_tables["name"])):
session.add(User(name=user_tables["name"][i], age=user_tables["age"][i]))
session.commit()

查询数据

1
2
3
4
5
# 查询这个表中的所有数据
all_res = session.query(User).all()
for res in all_res:
print(f"编号:{res.id},姓名:{res.name},年龄:{res.age}")

编号:1,姓名:张三,年龄:25
编号:2,姓名:李四,年龄:21
编号:3,姓名:lm,年龄:22
编号:4,姓名:minglog,年龄:18
编号:5,姓名:wang,年龄:12
1
2
3
4
# 通过filter_by对字段进行筛选
all_res = session.query(User).filter_by(name="minglog").all()
for res in all_res:
print(f"编号:{res.id},姓名:{res.name},年龄:{res.age}")
编号:4,姓名:minglog,年龄:18
1
2
3
4
# 通过复杂条件进行筛选
all_res = session.query(User).filter(User.age <= 18).all()
for res in all_res:
print(f"编号:{res.id},姓名:{res.name},年龄:{res.age}")
编号:4,姓名:minglog,年龄:18
编号:5,姓名:wang,年龄:12
1
2
3
4
# 多个条件进行合并
all_res = session.query(User).filter((User.age <= 18) & (User.name == "minglog")).all()
for res in all_res:
print(f"编号:{res.id},姓名:{res.name},年龄:{res.age}")
编号:4,姓名:minglog,年龄:18
1
2
3
4
# 多个条件进行合并
all_res = session.query(User).filter((User.age <= 18) | (User.name == "张三")).all()
for res in all_res:
print(f"编号:{res.id},姓名:{res.name},年龄:{res.age}")
编号:1,姓名:张三,年龄:25
编号:4,姓名:minglog,年龄:18
编号:5,姓名:wang,年龄:12
1
2
# 可通过__dict__获取所有的属性值
all_res[0].__dict__
{'_sa_instance_state': <sqlalchemy.orm.state.InstanceState at 0x76974c7c37d0>,
 'name': '张三',
 'age': 25,
 'id': 1}

更新数据

1
2
3
4
5
6
# 先查出来
tmp_user = session.query(User).filter_by(name="minglog").first()
# 然后再对查出来的这个对象进行属性修改
tmp_user.name = "luoming"
# commit后就会对数据库中的对象进行更新
session.commit()
1
2
3
all_res = session.query(User).all()
for res in all_res:
print(f"编号:{res.id},姓名:{res.name},年龄:{res.age}")
编号:1,姓名:张三,年龄:25
编号:2,姓名:李四,年龄:21
编号:3,姓名:lm,年龄:22
编号:5,姓名:wang,年龄:12
编号:4,姓名:luoming,年龄:18

删除数据

1
2
3
4
5
6
session.delete(all_res[0])
session.commit()

all_res = session.query(User).all()
for res in all_res:
print(f"编号:{res.id},姓名:{res.name},年龄:{res.age}")
编号:2,姓名:李四,年龄:21
编号:3,姓名:lm,年龄:22
编号:5,姓名:wang,年龄:12
编号:4,姓名:luoming,年龄:18

在实际的开发使用中往往会和pydantic构成的Model结合使用

FastAPI通过pydantic可以严格的规定,接口的请求和响应的参数格式与字段类型

定义请求与响应字段格式

1
2
3
4
5
6
7
from pydantic import BaseModel, Field

class UserRequest(BaseModel):
name: str = Field(None, description="姓名")

class UserResponse(BaseModel):
age: int = Field(None, description="年龄")

定义路由

1
2
3
4
5
6
7
8
9
10
11
12
from fastapi import FastAPI, APIRouter

# 通过APP与路由的拆解可以很轻松的对API进行扩展
app = FastAPI()
router = APIRouter(prefix="/user", tags=["Users"]) # prefix定义路由的前缀

@router.get("/get_age", response_model=UserResponse, description="用户年龄+1")
async def age_add(request: UserRequest):
user = session.query(User).filter_by(name=request.name).first()
if user is None:
return {"age": 0}
return {"age": user.age}

路由注册

1
2
# 实际中使用的router可以是不同脚本中的多个路由,这里我们仅使用当前定义的router为例
app.include_router(router)

异步开启接口

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
# jupyter lab
import uvicorn
import asyncio
import nest_asyncio

# 1. 应用 nest_asyncio 补丁
nest_asyncio.apply()

# 2. 配置 Uvicorn
# 确保 app 变量(您的 FastAPI/Starlette 应用实例)已经定义
config = uvicorn.Config(
app,
host="0.0.0.0",
port=8000,
# 建议在 Notebook 中禁用热重载,因为它经常导致问题
reload=False
)

# 3. 创建 Server 实例
server = uvicorn.Server(config=config)

# 4. 获取当前事件循环并运行服务
# 检查当前是否有循环在运行(通常在 Notebook 中是有的)
try:
loop = asyncio.get_running_loop()
except RuntimeError:
loop = asyncio.get_event_loop()

# 运行 server.serve() 协程直到完成
# 注意:在 Notebook 中运行服务通常会阻塞该单元格,除非您使用 threading/multiprocessing 或其他异步方式。
loop.run_until_complete(server.serve())
INFO:     Started server process [31380]
INFO:     Waiting for application startup.
INFO:     Application startup complete.
INFO:     Uvicorn running on http://0.0.0.0:8000 (Press CTRL+C to quit)
INFO:     Shutting down
INFO:     Waiting for application shutdown.
INFO:     Application shutdown complete.
INFO:     Finished server process [31380]
1
2
3
4
5
6
7
8
# jupyter notebook
import nest_asyncio
nest_asyncio.apply()

import uvicorn

uvicorn.run(app, host="0.0.0.0", port=8000)

-------------本文结束感谢您的阅读-------------