Skip to content

连接器

onestep 1.x 使用 Source 表示输入,使用 Sink 表示输出。很多连接器同时实现两者,因此既能被任务消费,也能接收上游任务返回值。

内置连接器

内存

连接器SourceSink描述
Memory支持支持内存队列,适合开发测试

定时器

连接器SourceSink描述
Interval支持不支持固定间隔触发
Cron支持不支持Cron 表达式触发

消息队列

连接器SourceSink描述
Redis Streams支持支持Redis Streams 消息队列
RabbitMQ支持支持RabbitMQ 队列
AWS SQS支持支持AWS SQS 托管队列

数据库

连接器SourceSink描述
MySQL支持支持表队列/增量同步/表输出

Web

连接器SourceSink描述
Webhook支持不支持HTTP 请求接收

自定义

连接器SourceSink描述
Custom支持支持实现任意数据源

选择指南

开发测试

python
from onestep import MemoryQueue

source = MemoryQueue("test")

生产环境 - 分布式任务

python
from onestep import RabbitMQConnector

rmq = RabbitMQConnector("amqp://...")
source = rmq.queue("jobs")

生产环境 - 云原生

python
from onestep import SQSConnector

sqs = SQSConnector(region_name="us-east-1")
source = sqs.queue("https://sqs...")

数据库驱动

python
from onestep import MySQLConnector

db = MySQLConnector("mysql+pymysql://...")
source = db.table_queue(
    table="tasks",
    key="id",
    where="status = 0",
    claim={"status": 1},
    ack={"status": 2},
    nack={"status": 0},
)

定时任务

python
from onestep import CronSource, IntervalSource

# 固定间隔
source = IntervalSource.every(minutes=5)

# 特定时间点
source = CronSource("0 9 * * *")

外部集成

python
from onestep import WebhookSource

# 接收外部系统推送
source = WebhookSource(path="/webhooks/github")

YAML 配置

yaml
resources:
  memory:
    type: memory
  
  timer:
    type: interval
    minutes: 5
  
  cron:
    type: cron
    expression: "0 9 * * *"
  
  rmq:
    type: rabbitmq
    url: "amqp://..."
  
  jobs:
    type: rabbitmq_queue
    connector: rmq
    queue: "jobs"
  
  db:
    type: mysql
    dsn: "mysql+pymysql://..."
  
  tasks:
    type: mysql_table_queue
    connector: db
    table: "tasks"
  
  webhook:
    type: webhook
    path: "/webhook"
    port: 8080

tasks:
  - name: process_jobs
    source: jobs
    handler:
      ref: myapp:process_jobs

自定义 Source/Sink

参考 Custom Broker 实现自定义数据源。

下一步

Released under the MIT License.