Retry
重试组件可以在任务执行失败时自动重试。
内置重试策略
NoRetry (默认)
不重试,任务失败后直接终止。
MaxAttempts
最多重试指定次数:
python
from onestep import MaxAttempts, IntervalSource, OneStepApp
app = OneStepApp("retry-demo")
@app.task(
source=IntervalSource.every(seconds=10),
retry=MaxAttempts(max_attempts=3, delay_s=1.0)
)
async def might_fail(ctx, _):
import random
if random.random() < 0.7:
raise Exception("随机失败")
print("成功!")参数:
max_attempts: 最大重试次数(包含首次执行)delay_s: 重试间隔秒数exponential_backoff: 是否使用指数退避max_delay_s: 最大延迟秒数(配合指数退避)
自定义重试策略
实现 RetryPolicy 接口:
python
from onestep import RetryPolicy, FailureInfo
class MyRetryPolicy(RetryPolicy):
def should_retry(self, failure: FailureInfo, attempts: int) -> bool:
# 根据失败类型决定是否重试
if failure.kind == "timeout":
return attempts < 5
if failure.kind == "error":
# 业务异常不重试
return False
return False
def retry_delay(self, failure: FailureInfo, attempts: int) -> float:
# 返回重试延迟秒数
return min(2 ** attempts, 60)
@app.task(source=..., retry=MyRetryPolicy())
async def my_task(ctx, item):
...失败类型
FailureInfo 包含以下信息:
kind: 失败类型error: 业务异常timeout: 执行超时cancelled: 任务取消
exception_type: 异常类型message: 异常消息
死信队列
配置 dead_letter 将终端失败的消息发送到死信队列:
python
from onestep import MemoryQueue, OneStepApp
app = OneStepApp("demo")
source = MemoryQueue("incoming")
dead_letter = MemoryQueue("dead-letter")
@app.task(
source=source,
dead_letter=dead_letter,
retry=MaxAttempts(max_attempts=3)
)
async def process(ctx, item):
if item.get("should_fail"):
raise Exception("处理失败")
return item
# 处理死信
@app.task(source=dead_letter)
async def handle_dead_letter(ctx, item):
print(f"死信消息: {item}")
# payload 在 item["payload"]
# 失败信息在 item["failure"]死信消息结构:
python
{
"payload": {...}, # 原始消息
"failure": {
"kind": "error",
"exception_type": "Exception",
"message": "处理失败"
}
}执行超时
使用 timeout_s 参数限制任务执行时间:
python
@app.task(source=..., timeout_s=30.0)
async def long_running(ctx, item):
# 超过 30 秒会被取消,触发 timeout 类型失败
await asyncio.sleep(60)超时后会触发 timeout 类型的失败,可根据 FailureInfo.kind 决定重试策略。
