Just some daily notes about technology, programming, and life.
- 🔨 Currently developing and learning
- 📝 Sharing thoughts and experiences
- 🌱 Always growing
Just some daily notes about technology, programming, and life.
这篇文章讲 Python 中的并发,例如多线程 multithreading, 多进程 multiprocessing, 竟态条件 race conditions 以及同步状态 synchronization 的机制,例如锁。 然后会探讨如何关闭 GIT 来实现 Python 中真正的多线程,并通过清晰的代码来突出差异、优点以及注意事项。 Introduction 你可能会好奇为什么会需要并发。 大多数情况下,并不需要,但如果是以下情况可能需要: 数据处理 data processing 和*ETL* - 解析大量文本文件、清理混乱的数据、或将复杂的正则表达式应用到数百万行上 加密 cryptography 和哈希 hashing - 读取文件并计算每一行的加密哈希 (例如 SHA-256) 数据科学 - 计算蒙特卡罗模拟 Monte Carlo simulations 要求大量数学计算,例如 numpy, pandas 或 scikit-learn 网络操作 - 下载文件,爬取网站或调用 REST API 但首先,这里先把术语理清楚: Sequential 串行:单个进程一次只做一件事,在做下一件事情之前会等待直到完成后再开始下一个进程 Concurrency 并发:单个进程同时做多件事,但不一定在同一时刻做 Event Loop 事件循环: 一种控制接口,持续等待事件(例如 I/O 操作、定时触发或用户操作),分派对应的任务,然后重复这个过程 Parallelism/Multiprocessing 并行/多进程: 在同一时刻做多件事 Multithreading 多线程: 这种编程模型是单个进程生成多个独立的执行线程,从而达到并发。所有线程都共享内存空间和资源。 线程安全 thread safety 是指程序或系统的一种特性,它允许多个线程同时访问和修改共享内存与资源,而不会造成数据损坏、内存泄漏或程序崩溃。
在前面一篇文章,介绍了索引的结构,并解释了慢索引的原因。 下面会介绍如何发现并避免这些问题,先从 where 语句开始。 where 语句定义了 SQL 的搜索条件,因此它属于索引的核心功能范畴:快速寻找数据。 即使 where 语句对性能有很大影响,但它常常被随意使用,导致数据库不得不扫描索引的大部分内容。 结果导致:一个编写不当的 where 子句是导致查询缓慢的首要因素。 本章解释不同运算符如何影响索引的使用,以及如何确保索引能适用于尽可能多的查询。 The Equality Operator 等号是 SQL 中最简单也最常用的符号。 影响性能的索引错误仍然非常普遍,尤其是结合多个条件的 where 子句中,这种错误尤为常见。 本章展示如何验证索引的使用,并解释如何通过连接索引 combined conditions 来优化组合条件。 Primary Keys 下面从最简单的 where 子句开始:主键查询。 这里使用 EMPLOYEES 表做示例: CREATE TABLE employees ( employee_id NUMBER NOT NULL, first_name VARCHAR(1000) NOT NULL, last_name VARCHAR(1000) NOT NULL, date_of_birth DATE NOT NULL, phone_number VARCHAR(1000) NOT NULL, CONSTRAINT employess_pk PRIMARY KEY (employee_id) ); 数据库会自动为主键创建索引,这意味着 employee_id 列会有一个索引,即使没有 create index 语句。 ...
Index 索引是数据库中一个独特的结构,通过使用 create index 语句来构建。 这意味着一个索引是冗余的,创建索引不会改变表数据。 它会创建一个新的数据结构,并指向该表。 总而言之,索引有自己的空间,且高度冗余 redundant,并指向存储在不同位置的实际信息。 Clustered Indexes SQL Server 和 Mysql (InnoDB) 对索引是什么有更加广泛的视角。 它们将完全由索引结构构成的表,称为聚集索引 clustered indexes。 这些表在 Oracle 叫 Index Organized Tables (IOT)。 搜索数据库索引就像查找电话目录一样,核心思想是所有实例都是按顺序排列的。 查找有序的数据快速且简单,因为顺序决定了条目位置。 数据库索引比电话目录更加复杂一些,因为它会不断变化。 每次变化都去修改目录是不可行的,因为实体之间没有额外空间。 目录修改一般会等到累计到足够的变化后再处理,而数据库等不了那么久, 它必须处理 insert 插入,delete 删除和 update 更新。 数据库结合了两种数据结构来应对挑战: 双向链表 double linked list 和搜索树 search tree,这两种结构解释了数据库的大部分性能特征。 The Index Leaf Nodes 索引的首要目标是提供索引数据的排序表示。 然而,由于插入语句 insert statement 需要为新的数据移动后续条目。 移动大量数据是十分耗时的,这样会导致 insert 插入语句很慢。 解决方案是建立一个独立于内存物理顺序的逻辑顺序。 逻辑顺序通过双向链表建立,每个头都有指向两侧邻居的指针。 新 node 插入的时候,修改两侧的指针指向。 node 的物理地址不重要,因为双向链表维护的是逻辑顺序。 数据库使用双向链表来连接所谓是索引叶节点 leaf nodes。 每个叶节点存储在一个数据库块 database block 或叶 page 中,即数据库的最小存储单元。 每块都是一样的大小,通常几千字节 kilobytes。 数据库会尽可能利用每个块的空间,并在每个块中存储尽可能多的索引条目。 这意味着索引顺序在两个层面维护:每个叶节点内的索引条目,以及叶节点之间通过双向链表连接。 ...
Engines, Metadata and Sessions 下面是一个异步的数据库链接示例: from sqlalchemy import MetaData from sqlalchemy.orm import DeclarativeBase from sqlalchemy.ext.asyncio import AsyncEngine, AsyncSession, create_async_engine, async_sessionmaker class Model(DeclarativeBase): metadata = MetaData(naming_convention={ 'ix': 'ix_%(column_0_label)s', 'uq': 'uq_%(table_name)s_%(column_0_name)s', 'ck': 'ck_%(table_name)s_%(constraint_name)s', 'fk': 'fk_%(table_name)s_%(column_0_name)s_%(referred_table_name)s', 'pk': 'pk_%(table_name)s', }) pg_dsn: str = 'postgresql+psycopg://postgresql:postgresql@localhost:5432/postgresql' engine = create_async_engine(pg_dsn) session_factory = async_sessionmaker( bind=engine, expire_on_commit=False, # !!! prevent implicit synchronous refresh ) 可以看到与同步代码其实十分类似,一个重要的不同点是 session 的 expire_on_commit 参数。 这会禁止 SQLALchemy 的默认行为:在会话 session 提交后将模型 model 标记为过期。 标记为过期的模型再次访问其任何属性时,会隐式地从数据库查询中刷新。 由于隐式的 implicit 数据库活动不能出现在任何异步应用中,因此不应该使用过期对象。 expire_on_commit=False 选项确保在提交后,不会将任何模型标记为过期。 在异步高并发环境下,仅靠 expire_on_commit 可以保证程序不报错,但是不能保证数据的一致性。 在 long-lived session 中可能会有陈旧模型 stale model,下面是一些解决方法: 手动清理模型 expunge 或显示刷新 refresh,但是会多产生一次网络请求 # refresh example async def create_new_post(db: AsyncSession, title: str, content: str): new_post = Post(title=title, content=content) db.add(new_post) await db.commit() # 提交到数据库 await db.refresh() # 刷新:强制从数据库拉取最新的一行数据 print(f'PostID: {new_post.id}, CreateTime: {new_post.created_at}') return new_post # expunge example async def export_all_posts_title_to_csv(db: AsyncSession): result = await db.execute(select(Post)) posts = result.scalars().all() titles = [] for post in posts: titles.append(post.title) db.expunge(post) # 踢出 Session return titles 这样可以保证数据正确写入,但可能覆盖其他人的数据修改 ...
多对多类型,何其名称暗示的一样,当无法认定任何一方为 “一” 的时候使用。 Many-To-Many Relationships 在标准的 one-to-many 一对多关系中,“多” 的一方会有一个 foreign key 外键指向 “一” 的一方。 但是在尝试建立 Product 和 Country 之前的关系时,在产品中添加指向国家的外键不行,因为这样一个产品就只能来自一个国家。 在国家中添加外键指向产品也不行,这样一个国家就只能对应一种产品。 How Many-To-Many Relationships Work 为了实现多对多关系,需要两个一对多关系来表示这种复杂的关系。 由于无法在两个表之间建立直接的多对多关系,因此需要添加一个称为连接表 join table 的第三个表。 每一边都和 join table 维护一个一对多关系,这意味着连接表有两个外键,指向两个表。 例如:products 表的 id 和 products_countries 的 products_id 形成 1:N 关系,countries 表的 id 和 cuontry_id 形成 1:N 关系。 这里的 products_countries 表就是 join table 连接表,常见的命名规范就是使用构成关系的两个实体的名称。 如果一个产品在 3 个国家生产,那么 join table 连接表就会有 3 对实例。 从另一个方面来看,对于一个已经创建了 7 个产品的国家,将会有对应数量的条目。 其 country_id 被设置为该国家,每个条目都将其与其中一个产品关联起来。 ...
One-To-Many Relationships 在之前的文章介绍了如何产品表的,有趣的事,有些查询是为了查找产品制造商,而不是产品本身。 这里会介绍如何分组去重。 很多时候,分组都十分有用,尤其是使用聚合函数 aggregation functions 计算为存储在数据库中的分组信息。 当次要数据存储在与主要实体相同的表中时,会出现重复的问题。 因为表可能会变得比实际需要的大得多,而重复数据中的拼写差异可能导致分组结果错误。 One-To-Many Relationships Implementation 关系数据库通过关系 relationships 创建链接,有两种主要的关系: One-to-many: 一对多 Many-to-many: 多对多 一对多是说,有表 A 和 B,A 中的项可以对应 B 中的任意多项,而 B 中的项最多只能对应 A 中的一项。 该模式与电脑制造商和其产品的关系相同,制造商可以制造多种型号产品,而每种产品只能有一个制造商。 这里制造商就是一 “one”,产品就是多 “many”。 定义关系的第一步是先为这两个实体创建数据库表。 因此要定义 products 表和 manufacturer 表。 class Manufacturer(Model): __tablename__ = 'manufacturers' id: Mapped[int] = mapped_column(primary_key=True) name: Mapped[str] = mapped_column(String(64), index=True, unique=True) def __repr__(self): return f'Manufacturer({self.id}, "{self.name}")' 原始的 Product 表中的 manufacturer 项被移除,并使用 manufacturer_id 替代: from sqlalchemy import ForeignKey class Product(Model): __tablename__ = 'products' id: Mapped[int] = mapped_column(primary_key) name: Mapped[str] = mapped_column(String(64), index=True, unique=True) manufacturer_id: Mapped[int] = mapped_column(ForeignKey('manufacturers.id'), index=True) year: Mapped[int] = mapped_column(index=True) country: Mapped[str] = mapped_column(String(32)) cpu: Mapped[Optional[str]] = mapped_column(String(32)) def __repr__(self): return f'Product({self.id}, "{self.name}")' 引用其他表的主键的列,被称为外键 foreign key,并会给予一个外键约束。 一个好的命名惯例是使用被引用的表,并添加后缀 _id。 ...
这篇文章将通过一个**“为阿里云 Qwen API 添加缓存”**的例子,带你彻底理解 Python 装饰器。 首先,我们定义一个基础的调用函数: import time def call_qwen(messages: list, model: str = 'qwen-max', temperature: float = 0.7): '''调用 Qwen API (模拟)''' print(f'正在请求 API (模型: {model})...') time.sleep(1) # 模拟网络耗时 return {'content': '这是 AI 的回复', 'usage': 100} 1. 函数是一等公民 (First-Class Citizen) 在 Python 中,函数可以像变量一样被传递和赋值。 # 1. 赋值给变量 run_api = call_qwen # 2. 作为参数传递 def logger(func, *args, **kwargs): print('[INFO] Calling qwen ...') return func(*args, **kwargs) logger(call_qwen, [{'role': 'user', 'content': '你好'}]) 2. 闭包 (Closure) 闭包是指函数内部定义了另一个函数,并且内部函数引用了外部函数的变量。它是实现装饰器的基石。 利用闭包,我们可以创建一个带缓存功能的函数: def make_cached_qwen(): cache = {} # 外部函数的变量,会被内部函数“捕获” def wrapped(messages, **kwargs): # 简单起见,用最后一条消息的内容当 Key key = messages[-1]['content'] if key in cache: print(' 命中缓存') return cache[key] result = call_qwen(messages, **kwargs) cache[key] = result return result return wrapped # 此时 cached_call 就是一个带有自己 “私有缓存字典” 的函数 cached_call = make_cached_qwen() 3. 高阶函数 (High-Order Function) 如果我们想让缓存逻辑通用化,不只针对 call_qwen,我们可以写一个接收函数作为参数的高阶函数: ...
SQLALchemy Core and SQLALchemy ORM SQLALchemy 分为两个模块:Core 和 ORM (Object-Relational Mapping)。 Core 模块包含对所有受支持数据库方言的集成逻辑,一组用于描述数据库表的类,用于 Python 语言生成 SQL 语句。 ORM 模块在 Python 应用程序中引入了一层抽象,使得许多数据库操作可以根据对 Python 对象执行的操作自动推导出来。 Database Engine SQLALchemy 使用 engine 对象来管理数据库连接,包含 Core 和 ORM 应用。 create_engine() 函数通过数据库 url 创建一个 engine。 格式为: {dialet}{+driver}://{username}:{password}@{hostname}:{port}/{database} 其中 SQLite 比较特殊,无需 driver。 对于导入 DATABASE_URL 有两种方式,一种是使用 load_dotenv() 加载 .env 文件的环境变量 import os from dotenv import load_dotenv from sqlalchemy import create_engine load_dotenv() engine = create_engine(os.environ['DATABASE_URL']) 另一种是通过 Pydantic BaseSettings import os from pathlib import Path from pydantic_settings import BaseSettings from pydantic impot SecretStr from functools import lru_cache from sqlalchemy import create_engine class Settings(BaseSettings): POSTGRESQL_HOST: str POSTGRESQL_PORT: int POSTGRESQL_USER: str POSTGRESQL_PASSWORD: SecretStr POSTGRESQL_DB: str @property def postgres_db_url(self) -> str: # 通常无需显示地写 driver,只有更换 dirver 时才需要写 return f'postgresql://{self.POSTGRESQL_USER}:{self.POSTGRESQL_PASSWORD.get_secret_value()}@{self.POSTGRESQL_HOST}:{self.POSTGRESQL_PORT}/{self.POSTGRESQL_DB}' class Config: env_file = str(Path(__file__).parent / '.env') case_sensitive = True extra = 'ignore' # 忽略额外字段 @lru_cache def get_settings() -> Settings: return Settings() # 使用示例 settings = get_settings() print('Host:', settings.POSTGRESQL_HOST) print('Password:', settings.PASSWORD.get_secret_value()) engine = create_engine(settings.postgres_db_url) create_engine() 函数有以下配置参数: ...
Python 程序由 modules 和 packages 组成,使用 import 语句导入。 Modules and the import Statement 任何 Python 源文件都可以作为一个模块导入,例如下面 module.py 代码: a = 37 def func(): print(f'func says that a is {a}') class SomeClass: def method(self): print('method says hi') print('loaded module') 改文件包含一些常见的编程元素,包括一个全局变量、一个函数、一个类定义 和 最后的语句。 通过下面方法导入: import module module.a module.func() s = module.SomeClass() s.method() 执行 import 会发送下面这几件事: 加载模块源码,如果找不到抛出 ImportError 创建新模块对象。该对象作为模块内所有全局定义 global defintions 的容器,被称为 “命名空间” namespace 该模块源码在新创建的模块命名空间内执行 如果没有错误发生,调用者会创建一个名称,指向新的模块对象。该名称与模块名称一致,但不包含任何文件后缀。 这些步骤中,第一步是最复杂的。新手容易犯的错误就是使用错误的名称或将代码放到了未知的位置。 且模块文件必须放在 sys.path 所包含的文件路径中,且文件名称要遵循和 python 变量一样的规则。 剩下的步骤都隔离在一个模块中,因此不用担心不同模块间命名冲突的问题。 Python import 会执行所有导入的源码,因此导入上面模块会输出 loaded module。 ...
使用 vars() 方法返回对象的属性和值的字典对象,不带参数时,返回当前局部作用域的变量字典 vars() # 相当于 locals() class Person: name: str age: int def __init__(self, name, age): self.name = name self.age = age self.city = "Beijing" p = Person("Alice", 25) print(vars(p)) Attribute Access 一个示例只有三种基础的方法:getting, setting 和 deleting 属性 class Attribute: owner: str blance: float def __init__(self, owner: str, balance: float): self.owner = owner self.balance = balance def __repr__(self): return f"Account({self.owner!r}, {self.balance!r})" def deposite(self, amount: float): self.balance += amount def withdraw(self, amount: float): self.balance -= amount def inquiry(self) -> float: return self.balance 例如 a = Account("Guido", 1000.0) a.owner # get a.balance = 75 # set del a.balance # delete Python 中的一切都是一个动态过程,几乎没有什么限制。 例如,可以给已创建的对象添加新属性: a = Account("Guido", 1000.0) a.creation_date = "2019-02-14" a.nickname = "Fromer BDFL" 有时候不适用点 . 操作符来执行任务,而是通过将属性名传递给 getattr(), setattr() 和 delattr() 函数来实现。 hasattr() 函数允许你测试一个已存在的属性: a = Account("Guido", 1000.0) getattr(a, "owner") setattr(a, "balance", 750.0) delattr(a, "balance") hasattr(a, "balance") # False getattr(a, "withdraw")(100) # Method Call # a = Account("Guido", 650.0) getattr() 函数可以携带一个默认值,如果想要查看一个可能不存在的属性,可以这样实现: ...