SQLALchemy - Asynchronous
Engines, Metadata and Sessions 下面是一个异步的数据库链接示例: from sqlalchemy import MetaData from sqlalchemy.orm import DeclarativeBase from sqlalchemy.ext.asyncio import AsyncEngine, AsyncSession, create_async_engine, async_sessionmaker class Model(DeclarativeBase): metadata = MetaData(naming_convention={ 'ix': 'ix_%(column_0_label)s', 'uq': 'uq_%(table_name)s_%(column_0_name)s', 'ck': 'ck_%(table_name)s_%(constraint_name)s', 'fk': 'fk_%(table_name)s_%(column_0_name)s_%(referred_table_name)s', 'pk': 'pk_%(table_name)s', }) pg_dsn: str = 'postgresql+psycopg://postgresql:postgresql@localhost:5432/postgresql' engine = create_async_engine(pg_dsn) session_factory = async_sessionmaker( bind=engine, expire_on_commit=False, # !!! prevent implicit synchronous refresh ) 可以看到与同步代码其实十分类似,一个重要的不同点是 session 的 expire_on_commit 参数。 这会禁止 SQLALchemy 的默认行为:在会话 session 提交后将模型 model 标记为过期。 标记为过期的模型再次访问其任何属性时,会隐式地从数据库查询中刷新。 由于隐式的 implicit 数据库活动不能出现在任何异步应用中,因此不应该使用过期对象。 expire_on_commit=False 选项确保在提交后,不会将任何模型标记为过期。 在异步高并发环境下,仅靠 expire_on_commit 可以保证程序不报错,但是不能保证数据的一致性。 在 long-lived session 中可能会有陈旧模型 stale model,下面是一些解决方法: 手动清理模型 expunge 或显示刷新 refresh,但是会多产生一次网络请求 # refresh example async def create_new_post(db: AsyncSession, title: str, content: str): new_post = Post(title=title, content=content) db.add(new_post) await db.commit() # 提交到数据库 await db.refresh() # 刷新:强制从数据库拉取最新的一行数据 print(f'PostID: {new_post.id}, CreateTime: {new_post.created_at}') return new_post # expunge example async def export_all_posts_title_to_csv(db: AsyncSession): result = await db.execute(select(Post)) posts = result.scalars().all() titles = [] for post in posts: titles.append(post.title) db.expunge(post) # 踢出 Session return titles 这样可以保证数据正确写入,但可能覆盖其他人的数据修改 ...