Skip to content

功能特性

核心特性

简单直观

python
from onestep import IntervalSource, OneStepApp

app = OneStepApp("demo")


@app.task(source=IntervalSource.every(seconds=30))
async def my_task(ctx, item):
    print("处理:", item)

多种数据源

类型SourceSink描述
内存队列支持支持开发测试
定时器支持不支持Cron/Interval
Webhook支持不支持HTTP 接收
HTTP Sink不支持支持HTTP JSON 输出
RabbitMQ支持支持分布式队列
Redis Streams支持支持轻量级流队列
AWS SQS支持支持云队列
MySQL支持支持表队列/增量同步/表输出
自定义支持支持任意数据源

灵活流转

python
from onestep import CronSource, MySQLConnector, RabbitMQConnector

rmq = RabbitMQConnector("amqp://...")
db = MySQLConnector("mysql+pymysql://...")
results = rmq.queue("results")


@app.task(source=CronSource("0 * * * *"), emit=results)
async def scheduled_to_mq(ctx, _):
    return {"data": "..."}


@app.task(source=results, emit=db.table_sink(table="results"))
async def mq_to_db(ctx, item):
    return item

并发控制

python
@app.task(source=..., concurrency=4)
async def task1(ctx, item):
    ...


@app.task(source=..., concurrency=64)
async def task2(ctx, item):
    ...

重试机制

python
from onestep import MaxAttempts


@app.task(
    source=...,
    retry=MaxAttempts(max_attempts=3, delay_s=1.0),
)
async def might_fail(ctx, item):
    ...

max_attempts 包含首次执行。上面的配置表示首次执行失败后最多再尝试 2 次。

死信队列

python
@app.task(
    source=main_queue,
    dead_letter=dead_letter_queue,
    retry=MaxAttempts(max_attempts=3),
)
async def risky_task(ctx, item):
    ...


@app.task(source=dead_letter_queue)
async def handle_dead_letter(ctx, item):
    print(item["payload"])
    print(item["failure"])

执行超时

python
@app.task(source=..., timeout_s=30.0)
async def long_task(ctx, item):
    await asyncio.sleep(60)

超过 timeout_s 后,运行时会取消任务并按失败流程处理。

事件监听

python
from onestep import InMemoryMetrics, OneStepApp, StructuredEventLogger, TaskEventKind

app = OneStepApp("demo")
metrics = InMemoryMetrics()

app.on_event(metrics)
app.on_event(StructuredEventLogger())


@app.on_event
def log_success(event):
    if event.kind is TaskEventKind.SUCCEEDED:
        print(f"成功:{event.task}")

状态管理

python
from onestep import InMemoryStateStore, OneStepApp

app = OneStepApp("demo", state=InMemoryStateStore())


@app.task(source=...)
async def track_runs(ctx, item):
    runs = await ctx.state.get("runs", 0)
    await ctx.state.set("runs", runs + 1)
    print(f"第 {runs + 1} 次运行")

生命周期钩子

python
@app.on_startup
async def bootstrap(app):
    print("应用启动")


@app.on_shutdown
async def cleanup(app):
    print("应用关闭")

YAML 配置

yaml
app:
  name: my-app

resources:
  rmq:
    type: rabbitmq
    url: amqp://guest:guest@localhost/
  timer:
    type: interval
    minutes: 5
  queue:
    type: rabbitmq_queue
    connector: rmq
    queue: jobs
  notify:
    type: http_sink
    url: "https://example.com/hooks/jobs"

tasks:
  - name: process_jobs
    source: timer
    emit: [queue, notify]
    handler:
      ref: myapp.tasks:process_jobs
    retry:
      type: max_attempts
      max_attempts: 3

高级特性

任务编排

python
from onestep import MemoryQueue

input_queue = MemoryQueue("input")
stage1_out = MemoryQueue("stage1")
stage2_out = MemoryQueue("stage2")


@app.task(source=input_queue, emit=stage1_out)
async def stage1(ctx, item):
    return item * 2


@app.task(source=stage1_out, emit=stage2_out)
async def stage2(ctx, item):
    return item + 1


@app.task(source=stage2_out)
async def final(ctx, item):
    print(f"结果:{item}")

优雅关闭

python
app = OneStepApp("demo", shutdown_timeout_s=30.0)


@app.task(source=...)
async def task(ctx, item):
    await process(item)


@app.task(source=...)
async def shutdown_trigger(ctx, item):
    ctx.app.request_shutdown()

配置管理

python
app = OneStepApp(
    "my-app",
    config={
        "region": "cn",
        "debug": True,
        "batch_size": 100,
    },
)


@app.task(source=...)
async def task(ctx, item):
    region = ctx.config["region"]
    debug = ctx.config.get("debug", False)

Control Plane 集成

python
from onestep import ControlPlaneReporter, ControlPlaneReporterConfig

app = OneStepApp("my-app")
reporter = ControlPlaneReporter(
    ControlPlaneReporterConfig.from_env(app_name=app.name)
)
reporter.attach(app)

Reporter 会推送拓扑同步、心跳、指标和事件。

对比 0.5.x

特性0.5.x1.x
装饰器@step@app.task
消息来源from_brokersource
消息输出to_brokeremit
并发控制workersconcurrency
启动方式step.start()app.run() / CLI
重试策略TimesRetryMaxAttempts
中间件BaseMiddleware事件钩子
状态ctx.state
配置ctx.config
生命周期有限@app.on_startup/shutdown

下一步

Released under the MIT License.