Skip to content

HTTP Sink

HttpSink 用于把任务返回值以 JSON 请求发送到外部 HTTP 端点。它只实现 Sink,适合通知 Webhook、调用内部服务或把处理结果转发给只提供 HTTP 接口的系统。

基本用法

python
import os

from onestep import HttpSink, MemoryQueue, OneStepApp

app = OneStepApp("notify-demo")
source = MemoryQueue("events")
notify = HttpSink(
    "notify",
    url="https://example.com/hooks/events",
    headers={"Authorization": f"Bearer {os.environ['NOTIFY_TOKEN']}"},
    timeout_s=2.5,
)


@app.task(source=source, emit=notify)
async def forward_event(ctx, event):
    return {
        "id": event["id"],
        "kind": event["kind"],
    }

任务返回值会作为 JSON body 发送。HttpSink 默认使用 POST,并在没有显式设置时自动添加 Content-Type: application/json

配置选项

python
sink = HttpSink(
    "notify",
    url="https://example.com/hooks/events",
    method="POST",
    headers={"X-Api-Key": "secret-token"},
    timeout_s=5.0,
    success_statuses=[200, 201, 202, 204],
)
参数说明默认值
urlHTTP 或 HTTPS 目标地址必填
method请求方法,会转成大写POST
headers请求头映射,值会转成字符串{}
timeout_s单次请求超时时间,必须大于 05.0
success_statuses视为成功的 HTTP 状态码列表[200, 201, 202, 204]

如果响应状态码不在 success_statuses 中,发送会失败并抛出连接器错误。429 会被归类为限流,4084255xx 会被归类为临时错误,其余非成功状态会被归类为永久错误。

YAML 配置

yaml
resources:
  incoming:
    type: memory

  notify:
    type: http_sink
    url: "https://example.com/hooks/events"
    method: POST
    headers:
      Authorization: "Bearer ${NOTIFY_TOKEN}"
    timeout_s: 5
    success_statuses: [200, 202]

tasks:
  - name: forward_event
    source: incoming
    emit: notify
    handler:
      ref: myapp.handlers:normalize_event

http_sink 支持的字段是 urlmethodheaderstimeout_ssuccess_statuses。严格校验会拒绝未知字段。

YAML 直接转发

从 1.2.6 开始,YAML 任务可以省略 handler,只配置 emit。运行时会把 source 收到的 payload 原样转发给 sink。

yaml
resources:
  incoming:
    type: memory

  notify:
    type: http_sink
    url: "https://example.com/hooks/events"
    headers:
      X-Api-Key: "${NOTIFY_TOKEN}"

tasks:
  - name: forward_raw_event
    source: incoming
    emit: notify

直接转发只适合 payload 已经是目标接口需要的结构时使用。需要字段转换、签名、幂等校验或错误归一化时,仍然在 Python handler 中处理后再返回。

Control Plane

启用 Control Plane reporter 后,HttpSink 会作为 http_sink 出现在任务拓扑中。上报时会隐藏 URL 中的账号信息,移除查询参数和 fragment,并把所有 header 值标记为 <redacted>,避免 token 出现在拓扑 payload 里。

下一步

Released under the MIT License.