快速开始
onestep 是一个轻量级异步任务运行时,帮助您轻松实现分布式任务调度。
安装
bash
pip install onestepbash
poetry add onestep可选依赖
bash
pip install 'onestep[mysql]'bash
pip install 'onestep[rabbitmq]'bash
pip install 'onestep[sqs]'bash
pip install 'onestep[all]'5 分钟快速上手
1. 创建任务
python
# tasks.py
from onestep import IntervalSource, OneStepApp
app = OneStepApp("demo")
@app.task(source=IntervalSource.every(seconds=10, immediate=True))
async def hello_task(ctx, _):
print(f"Hello from onestep! Time: {ctx.current.meta.get('scheduled_at')}")
if __name__ == "__main__":
app.run()2. 运行任务
bash
# 直接运行
python tasks.py
# 或使用 CLI
onestep run tasks:app3. 查看输出
Hello from onestep! Time: 2024-01-15T10:30:00+08:00
Hello from onestep! Time: 2024-01-15T10:30:10+08:00
Hello from onestep! Time: 2024-01-15T10:30:20+08:00
...核心概念
OneStepApp
任务注册和生命周期管理器:
python
app = OneStepApp("my-app")Source
数据输入源(队列、定时器、Webhook 等):
python
from onestep import IntervalSource, MemoryQueue, CronSource
# 定时器
source = IntervalSource.every(minutes=5)
# 内存队列
source = MemoryQueue("incoming")
# Cron
source = CronSource("0 * * * *")Sink
数据输出目标:
python
from onestep import MemoryQueue, RabbitMQConnector
# 内存队列
sink = MemoryQueue("output")
# RabbitMQ
sink = RabbitMQConnector("amqp://...").queue("results")Task
任务定义:
python
@app.task(source=source, emit=sink, concurrency=4)
async def my_task(ctx, item):
# ctx: 上下文
# item: 消息体
return {"processed": item}常用场景
定时任务
python
from onestep import CronSource
@app.task(source=CronSource("0 9 * * *", timezone="Asia/Shanghai"))
async def daily_report(ctx, _):
# 每天上午 9 点执行
...消息队列处理
python
from onestep import RabbitMQConnector
rmq = RabbitMQConnector("amqp://guest:guest@localhost/")
@app.task(source=rmq.queue("jobs"), concurrency=8)
async def process_job(ctx, item):
# 处理队列消息
...Webhook 接收
python
from onestep import WebhookSource
@app.task(source=WebhookSource(path="/webhook", port=8080))
async def handle_webhook(ctx, event):
# 处理 HTTP 请求
return {"status": "ok"}数据库增量同步
python
from onestep import MySQLConnector
db = MySQLConnector("mysql+pymysql://...")
@app.task(
source=db.incremental(
table="users",
key="id",
cursor=("updated_at", "id")
)
)
async def sync_user(ctx, row):
# 增量同步用户数据
...CLI 命令
bash
# 检查配置
onestep check tasks:app
# 运行应用
onestep run tasks:app
# 简写
onestep tasks:app
# 使用 JSON 输出(适合 CI)
onestep check --json tasks:appYAML 配置
yaml
# worker.yaml
app:
name: my-app
connectors:
timer:
type: interval
seconds: 30
immediate: true
output:
type: memory
tasks:
- name: my_task
source: timer
emit: output
handler:
ref: tasks:my_task运行:
bash
onestep run worker.yaml