MySQL
MySQL Connector 提供三种模式:
- 表队列: 将数据库表作为任务队列
- 增量同步: 基于
(updated_at, id)的 Logstash 风格同步 - 表输出: 将结果写入数据库表
安装
bash
pip install 'onestep[mysql]'表队列 (Table Queue)
将数据库表作为任务队列,通过更新状态字段来"领取"任务。
基本用法
python
from onestep import MySQLConnector, OneStepApp
app = OneStepApp("orders")
# 创建连接
db = MySQLConnector("mysql+pymysql://root:root@localhost:3306/app")
# 创建表队列 Source
source = db.table_queue(
table="orders",
key="id",
where="status = 0", # 查询条件:待处理
claim={"status": 9}, # 领取时设置:处理中
ack={"status": 1}, # 成功后设置:已完成
nack={"status": 0}, # 失败后设置:待处理(可重试)
batch_size=100, # 每次领取数量
)
# 创建表输出 Sink
sink = db.table_sink(
table="processed_orders",
mode="upsert", # 插入或更新
keys=("id",), # 唯一键
)
@app.task(source=source, emit=sink, concurrency=16)
async def process_order(ctx, row):
return {
"id": row["id"],
"payload": row["payload"],
"status": "done"
}
if __name__ == "__main__":
app.run()工作流程
- 查询
status = 0的记录 - 批量更新
status = 9(领取) - 执行任务
- 成功:更新
status = 1 - 失败:更新
status = 0(可重试)
状态管理
python
# 状态流转
where="status = 'pending'" # 待处理
claim={"status": "processing"} # 处理中
ack={"status": "completed"} # 已完成
nack={"status": "failed"} # 失败增量同步 (Incremental Sync)
基于 (updated_at, id) 实现增量数据同步,适合数据仓库场景。
基本用法
python
from onestep import MySQLConnector, OneStepApp
app = OneStepApp("sync-users")
db = MySQLConnector("mysql+pymysql://root:root@localhost:3306/app")
# 游标存储(持久化位置)
cursor_store = db.cursor_store(table="onestep_cursor")
# 增量同步 Source
source = db.incremental(
table="users",
key="id",
cursor=("updated_at", "id"), # 游标字段
where="deleted = 0", # 过滤条件
batch_size=1000, # 每批数量
state=cursor_store, # 状态存储
)
# 输出到内存队列
out = MemoryQueue("dw")
@app.task(source=source, emit=out, concurrency=1)
async def sync_user(ctx, row):
return {
"id": row["id"],
"name": row["name"],
"updated_at": row["updated_at"]
}工作原理
- 从
cursor_store读取上次位置 - 查询
updated_at > last_updated OR (updated_at = last_updated AND id > last_id) - 处理数据
- 更新
cursor_store中的位置
游标存储
python
# 数据库存储(推荐生产环境)
cursor_store = db.cursor_store(table="sync_cursor")
# 或状态存储
state_store = db.state_store(table="onestep_state")表输出 (Table Sink)
将处理结果写入数据库表。
Upsert 模式
python
sink = db.table_sink(
table="results",
mode="upsert",
keys=("id",), # 唯一键,存在则更新,不存在则插入
)
@app.task(source=..., emit=sink)
async def process(ctx, item):
return {"id": item["id"], "data": item["data"]}Insert 模式
python
sink = db.table_sink(
table="logs",
mode="insert", # 仅插入
)状态存储
State Store
键值对存储,用于任务状态:
python
state = db.state_store(table="onestep_state")
# 在任务中使用
@app.task(source=...)
async def process(ctx, item):
count = await ctx.state.get("processed_count", 0)
await ctx.state.set("processed_count", count + 1)Cursor Store
游标存储,用于增量同步位置:
python
cursor = db.cursor_store(table="sync_cursor")
source = db.incremental(
table="orders",
key="id",
cursor=("updated_at", "id"),
state=cursor,
)YAML 配置
yaml
connectors:
db:
type: mysql
url: "mysql+pymysql://root:root@localhost:3306/app"
order_queue:
type: mysql_table_queue
connector: db
table: "orders"
key: "id"
where: "status = 0"
claim:
status: 9
ack:
status: 1
batch_size: 100
results:
type: mysql_table_sink
connector: db
table: "results"
mode: "upsert"
keys:
- "id"
cursor:
type: mysql_cursor_store
connector: db
table: "sync_cursor"
tasks:
- name: process_orders
source: order_queue
emit: results
concurrency: 16最佳实践
1. 索引优化
sql
-- 表队列:确保查询条件有索引
CREATE INDEX idx_status ON orders(status);
-- 增量同步:确保游标字段有索引
CREATE INDEX idx_cursor ON users(updated_at, id);2. 批量大小
python
# 小批量:低延迟
batch_size=10
# 大批量:高吞吐
batch_size=10003. 并发控制
python
# 表队列:可高并发(行级锁)
@app.task(source=source, concurrency=16)
# 增量同步:建议单并发(保持顺序)
@app.task(source=incremental, concurrency=1)4. 连接池
python
# URL 参数配置连接池
db = MySQLConnector(
"mysql+pymysql://user:pass@host/db"
"?pool_size=10"
"&max_overflow=20"
"&pool_recycle=3600"
)