Skip to content

快速开始

onestep 是一个轻量级 Python 异步任务运行时。它围绕 OneStepAppSourceSink 和任务处理函数组织代码,适合队列消费、定时同步、Webhook 接入和多阶段数据处理。

当前包版本为 1.2.6。文档站使用 VitePress 1.6.4,这是 2026-05-09 npm latest 对应的稳定版本;2.0.0-alpha.17 仍在 next 标签下。

安装

bash
pip install onestep
bash
uv add onestep
bash
poetry add onestep

按使用场景安装可选依赖:

bash
pip install 'onestep[yaml]'
bash
pip install 'onestep[mysql]'
bash
pip install 'onestep[rabbitmq]'
bash
pip install 'onestep[redis]'
bash
pip install 'onestep[sqs]'
bash
pip install 'onestep[all]'

第一个任务

创建 tasks.py

python
from onestep import IntervalSource, OneStepApp

app = OneStepApp("demo")


@app.task(source=IntervalSource.every(seconds=10, immediate=True))
async def hello(ctx, _):
    scheduled_at = ctx.current.meta["scheduled_at"]
    print(f"hello from onestep: {scheduled_at}")


if __name__ == "__main__":
    app.run()

运行:

bash
onestep run tasks:app
bash
python tasks.py

生产环境建议使用 CLI,因为它可以在启动前检查目标:

bash
onestep check tasks:app
onestep check --json tasks:app
onestep run tasks:app

onestep tasks:apponestep run tasks:app 的简写。

处理队列消息

MemoryQueue 同时实现了 SourceSink,适合本地开发和测试。

python
import asyncio

from onestep import MemoryQueue, OneStepApp

app = OneStepApp("memory-pipeline")
source = MemoryQueue("incoming")
sink = MemoryQueue("processed")


@app.task(source=source, emit=sink, concurrency=2)
async def double(ctx, item):
    return {"value": item["value"] * 2}


async def main():
    await source.publish({"value": 21})
    await app.serve()


asyncio.run(main())

真实部署时通常把输入或输出的 MemoryQueue 换成外部系统连接器,例如 RabbitMQ、Redis Streams、AWS SQS、MySQL,或把结果发送到 HTTP Sink。

使用外部连接器

python
from onestep import MySQLConnector, OneStepApp, RabbitMQConnector

app = OneStepApp("orders")
rmq = RabbitMQConnector("amqp://guest:guest@localhost/")
db = MySQLConnector("mysql+pymysql://user:pass@localhost/app")

jobs = rmq.queue("orders")
rows = db.table_sink(table="processed_orders", mode="upsert", keys=("id",))


@app.task(source=jobs, emit=rows, concurrency=8)
async def process_order(ctx, order):
    return {
        "id": order["id"],
        "status": "processed",
    }

YAML 配置

安装 onestep[yaml] 后,可以把运行时资源和任务拓扑写进 worker.yaml

yaml
app:
  name: billing-sync

resources:
  tick:
    type: interval
    minutes: 5
    immediate: true

tasks:
  - name: sync_billing
    source: tick
    handler:
      ref: your_package.handlers:sync_billing

检查并运行:

bash
onestep check --strict worker.yaml
onestep run worker.yaml

resources 是推荐写法。旧的 connectorssourcessinks 仍可读取,但新文档统一使用 resources

YAML 也支持把消息直接转发到 Sink。下面的任务没有 handler,运行时会把 incoming 的 payload 原样发送到 HTTP 端点:

yaml
resources:
  incoming:
    type: memory
  notify:
    type: http_sink
    url: "https://example.com/hooks/billing"

tasks:
  - name: forward_billing_event
    source: incoming
    emit: notify

下一步

Released under the MIT License.