Skip to content

功能特性

核心特性

🎯 简单直观

python
from onestep import OneStepApp, IntervalSource

app = OneStepApp("demo")


@app.task(source=IntervalSource.every(seconds=30))
async def my_task(ctx, item):
    print("处理:", item)

🔌 多种数据源

类型SourceSink描述
内存队列开发测试
定时器Cron/Interval
WebhookHTTP 接收
RabbitMQ分布式队列
AWS SQS云队列
MySQL表队列/增量同步
自定义任意数据源

🔄 灵活流转

python
# 自由组合不同 Broker
@app.task(
    source=CronSource("0 * * * *"),  # 定时触发
    emit=RabbitMQConnector("amqp://...").queue("results")  # 发到 MQ
)
async def scheduled_to_mq(ctx, _):
    return {"data": "..."}


@app.task(
    source=RabbitMQConnector("amqp://...").queue("results"),  # 从 MQ 消费
    emit=MySQLConnector("mysql://...").table_sink("results")  # 写入数据库
)
async def mq_to_db(ctx, item):
    return item

⚡ 并发控制

python
# 控制并发数
@app.task(source=..., concurrency=4)
async def task1(ctx, item):
    ...

# 高并发处理
@app.task(source=..., concurrency=64)
async def task2(ctx, item):
    ...

🛡️ 重试机制

python
from onestep import MaxAttempts

# 最多重试 3 次,每次间隔 1 秒
@app.task(
    source=...,
    retry=MaxAttempts(max_attempts=3, delay_s=1.0)
)
async def might_fail(ctx, item):
    ...

💀 死信队列

python
# 失败消息发送到死信队列
@app.task(
    source=main_queue,
    dead_letter=dead_letter_queue,
    retry=MaxAttempts(max_attempts=3)
)
async def risky_task(ctx, item):
    ...


# 处理死信
@app.task(source=dead_letter_queue)
async def handle_dead_letter(ctx, item):
    print(f"死信:{item}")

⏱️ 执行超时

python
# 30 秒超时
@app.task(source=..., timeout_s=30.0)
async def long_task(ctx, item):
    await asyncio.sleep(60)  # 会被取消

📊 事件监听

python
from onestep import InMemoryMetrics, StructuredEventLogger

app = OneStepApp("demo")

# 内置事件处理器
app.on_event(InMemoryMetrics())
app.on_event(StructuredEventLogger())

# 自定义事件处理器
class MyHandler(EventHandler):
    def on_event(self, event):
        if event.kind == "succeeded":
            print(f"成功:{event.task_name}")

app.on_event(MyHandler())

🔧 状态管理

python
from onestep import InMemoryStateStore

app = OneStepApp("demo", state=InMemoryStateStore())


@app.task(source=...)
async def track_runs(ctx, item):
    runs = await ctx.state.get("runs", 0)
    await ctx.state.set("runs", runs + 1)
    print(f"第 {runs + 1} 次运行")

🚀 生命周期钩子

python
@app.on_startup
async def bootstrap(app):
    print("应用启动")
    # 初始化资源

@app.on_shutdown
async def cleanup(app):
    print("应用关闭")
    # 清理资源

📝 YAML 配置

yaml
app:
  name: my-app

connectors:
  timer:
    type: interval
    minutes: 5
  queue:
    type: rabbitmq_queue
    connector: rmq
    queue: "jobs"

tasks:
  - name: process_jobs
    source: timer
    emit: queue
    retry:
      type: max_attempts
      max_attempts: 3

高级特性

任务编排

python
# 流水线处理
input_queue = MemoryQueue("input")
stage1_out = MemoryQueue("stage1")
stage2_out = MemoryQueue("stage2")


@app.task(source=input_queue, emit=stage1_out)
async def stage1(ctx, item):
    return item * 2


@app.task(source=stage1_out, emit=stage2_out)
async def stage2(ctx, item):
    return item + 1


@app.task(source=stage2_out)
async def final(ctx, item):
    print(f"结果:{item}")

优雅关闭

python
app = OneStepApp("demo", shutdown_timeout_s=30.0)


@app.task(source=...)
async def task(ctx, item):
    # 30 秒内未完成的任务会被取消
    await process(item)


# 请求关闭
@app.task(source=...)
async def shutdown_trigger(ctx, item):
    ctx.app.request_shutdown()

配置管理

python
app = OneStepApp(
    "my-app",
    config={
        "region": "cn",
        "debug": True,
        "batch_size": 100
    }
)


@app.task(source=...)
async def task(ctx, item):
    region = ctx.config["region"]
    debug = ctx.config.get("debug", False)

Control Plane 集成

python
from onestep import ControlPlaneReporter, ControlPlaneReporterConfig

app = OneStepApp("my-app")
reporter = ControlPlaneReporter(
    ControlPlaneReporterConfig.from_env(app_name=app.name)
)
reporter.attach(app)

# 自动推送:
# - 拓扑同步
# - 心跳
# - 指标
# - 事件

对比 0.5.x

特性0.5.x1.0.0
装饰器@step@app.task
消息来源from_brokersource
消息输出to_brokeremit
并发控制workersconcurrency
启动方式step.start()app.run() / CLI
重试策略TimesRetryMaxAttempts
中间件BaseMiddleware事件钩子
状态ctx.state
配置ctx.config
生命周期有限@app.on_startup/shutdown

下一步

Released under the MIT License.