
YAML Task Definition

onestep treats YAML as a task-definition and wiring layer:

  • YAML defines the app, resources, hooks, task runtime policy, and the Python entrypoints to call.
  • Python still owns business logic such as transform, validation, enrichment, and custom hooks.

Design Boundary

YAML is responsible for:

  • app: name, global config, shutdown timeout, state store binding, framework log level
  • reporter: built-in control-plane telemetry wiring
  • resources: named runtime objects and their dependencies
  • hooks: app-level startup, shutdown, and event observers
  • tasks: source, emit, dead-letter, retry, timeout, concurrency, handler, task config, task hooks

YAML does not define:

  • transform DSLs
  • conditional branches or workflow graphs
  • expression engines
  • embedded business logic

Strict Check

Use strict checking when you want YAML to behave like a real contract instead of a permissive loader:

bash
onestep check --strict worker.yaml

Strict mode is intended to catch configuration drift early:

  • unknown top-level fields
  • unknown task, hook, reporter, and resource fields
  • invalid apiVersion / kind values when they are present
  • silent mixing of legacy top-level app fields with the app: section
  • invalid app.logging.level values when YAML opts into framework log control
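
As an illustration of the first check, the sketch below flags unknown top-level keys in an already-parsed YAML document. It is not onestep's implementation. The allowed-key set is an assumption built from the sections documented on this page, including the legacy connectors, sources, and sinks sections.

```python
# Hypothetical sketch: flag top-level keys that a strict loader would reject.
# ALLOWED_TOP_LEVEL is assembled from the sections documented on this page;
# onestep's real schema may differ.
ALLOWED_TOP_LEVEL = {
    "apiVersion", "kind", "app", "reporter", "resources", "hooks", "tasks",
    # legacy sections that are still accepted
    "connectors", "sources", "sinks",
}


def unknown_top_level_fields(doc: dict) -> list:
    """Return the sorted top-level keys that are not in the allowed set."""
    return sorted(key for key in doc if key not in ALLOWED_TOP_LEVEL)


config = {
    "app": {"name": "hello-worker"},
    "resource": {},  # typo for "resources": a permissive loader would ignore it
    "tasks": [],
}
print(unknown_top_level_fields(config))  # ['resource']
```

A permissive loader would silently drop the misspelled `resource` key. A strict check reports it before the worker starts.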

Framework Logging

Workers defined entirely in YAML can set the level of the onestep logger namespace directly:

yaml
app:
  name: hello-worker
  logging:
    level: DEBUG

Notes:

  • this only sets the onestep logger namespace
  • it does not configure the root logger, handlers, or formatters
  • DEBUG enables low-level framework logs such as successful sink sends
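
In standard-library terms, the setting behaves roughly like the sketch below. This assumes the framework logs under the logger named onestep. The level changes for that namespace and its children, but no handler is attached, so your own logging setup still decides where records go.

```python
import logging

# Rough stdlib equivalent of `app.logging.level: DEBUG`, assuming the
# framework logs under the "onestep" logger namespace.
framework_logger = logging.getLogger("onestep")
framework_logger.setLevel(logging.DEBUG)

# Child loggers inherit the level ("onestep.worker" is a hypothetical name).
child = logging.getLogger("onestep.worker")
print(child.getEffectiveLevel() == logging.DEBUG)  # True

# No handler or formatter was attached, so routing output is still up to you.
print(framework_logger.handlers)  # []
logging.basicConfig(format="%(name)s %(levelname)s %(message)s")
child.debug("sink send succeeded")  # propagates to the root handler added above
```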

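For long-lived configs, also declare the schema version and document kind at the top of the file. Strict mode validates both fields when they are present: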

yaml
apiVersion: onestep/v1alpha1
kind: App

Real Project Layout

When a team adopts YAML task definitions, the recommended project shape stays small:

text
your-project/
├── pyproject.toml
├── worker.yaml
└── src/
    └── your_worker/
        ├── tasks.py
        ├── transforms.py
        └── hooks.py

A complete version of this layout is included in this repository at example/yaml_project/.

The rule stays the same:

  • worker.yaml defines runtime wiring
  • tasks.py (or a tasks/ package) defines handlers
  • transforms.py (or a transforms/ package) holds business transforms
  • hooks.py is optional and only for lifecycle or side-observer logic

If you want that shape immediately, scaffold it with:

bash
onestep init your-project

init intentionally generates the smallest runnable project. By default it adds no reporter config, no hook modules, and no extra YAML structure.

To check and run that example from the repository root:

bash
PYTHONPATH=src python -m onestep.cli check example/yaml_project/worker.yaml
PYTHONPATH=src python -m onestep.cli run example/yaml_project/worker.yaml

Start with the smallest shape that runs. Add fields only when the task actually needs them.

Level 1: Minimal Task

yaml
app:
  name: hello-worker
  logging:
    level: DEBUG

resources:
  tick:
    type: interval
    minutes: 5
    immediate: true

tasks:
  - name: hello
    source: tick
    handler:
      ref: worker.tasks.main:hello

This is the default mental model:

  • one app
  • one source
  • one handler
  • no hooks
  • no extra config
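
The handler that worker.tasks.main:hello points at can be a single coroutine. This sketch assumes the (ctx, payload) call shape used by the handlers later on this page. The shape of an interval tick's payload is not specified here, so the handler only echoes it.

```python
# worker/tasks/main.py
import asyncio
from types import SimpleNamespace


async def hello(ctx, payload):
    # Called once per tick of the `tick` resource: every 5 minutes, plus
    # once right at startup because of `immediate: true`.
    # The tick payload's shape is not specified here, so it is only echoed.
    return f"hello (payload={payload!r})"


if __name__ == "__main__":
    # Local smoke test with a stand-in ctx; at runtime onestep supplies ctx.
    print(asyncio.run(hello(SimpleNamespace(), None)))  # hello (payload=None)
```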

Level 2: Add Passthrough Sinks

If a task only forwards the incoming payload to one or more sinks, handler can be omitted. The runtime will use a passthrough handler that returns the source payload unchanged.

yaml
app:
  name: event-forwarder

resources:
  incoming:
    type: memory
    maxsize: 1000  # strict mode requires a positive bound; size it for your load

  notify:
    type: http_sink
    url: "https://example.com/hooks/events"
    headers:
      X-Api-Key: "${NOTIFY_TOKEN}"

tasks:
  - name: forward_events
    source: incoming
    emit: notify

Strict mode still requires each task to define either handler or a non-empty emit. Use a Python handler when the payload needs transform, validation, signing, or enrichment.
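
Conceptually, the passthrough handler and the strict-mode rule reduce to the sketch below. Both functions are hypothetical illustrations, not onestep internals. The sketch accepts emit as either a single name or a list because this page uses both spellings (emit: notify and emit: [users_sink]).

```python
# Hypothetical sketch of the passthrough behavior and the strict-mode rule.
async def passthrough(ctx, payload):
    # Returns the source payload unchanged so every sink receives it as-is.
    return payload


def emit_targets(task: dict) -> list:
    # `emit` may be a single resource name or a list of names.
    emit = task.get("emit") or []
    return [emit] if isinstance(emit, str) else list(emit)


def check_task(task: dict) -> None:
    # Strict mode: a task needs a handler, a non-empty emit, or both.
    if "handler" not in task and not emit_targets(task):
        raise ValueError(f"task {task['name']!r} needs a handler or a non-empty emit")


check_task({"name": "forward_events", "source": "incoming", "emit": "notify"})  # ok
try:
    check_task({"name": "broken", "source": "incoming", "emit": []})
except ValueError as exc:
    print(exc)  # task 'broken' needs a handler or a non-empty emit
```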

Level 3: Add Sinks And Runtime Policy

yaml
app:
  name: user-sync

resources:
  users_source:
    type: mysql_incremental
    connector: mysql_main
    table: users
    key: id
    cursor: [updated_at, id]

  users_sink:
    type: mysql_table_sink
    connector: mysql_main
    table: dw_users
    mode: upsert
    keys: [id]

  mysql_main:
    type: mysql
    dsn: "${MYSQL_DSN}"

tasks:
  - name: sync_users
    source: users_source
    emit: [users_sink]
    handler:
      ref: worker.tasks.users:sync_users
    concurrency: 4
    timeout_s: 120
    retry:
      type: max_attempts
      max_attempts: 5
      delay_s: 10
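
The three runtime-policy fields interact roughly as in the following sketch. Its semantics are assumptions this page does not confirm: max_attempts counts the first try, delay_s is a fixed pause between attempts, and timeout_s bounds each attempt of the handler body (consistent with the note under Hook Semantics). run_with_policy is a hypothetical name.

```python
import asyncio


# Hypothetical sketch of how concurrency, timeout_s, and retry could combine.
# Assumptions (not confirmed by onestep): max_attempts counts the first try,
# delay_s is a fixed pause, and timeout_s bounds each attempt of the handler.
# Real handlers take (ctx, payload); this stand-in takes only the payload.
async def run_with_policy(handler, payload, *, sem, timeout_s, max_attempts, delay_s):
    async with sem:  # concurrency: at most N deliveries inside this block
        for attempt in range(1, max_attempts + 1):
            try:
                return await asyncio.wait_for(handler(payload), timeout=timeout_s)
            except Exception:
                if attempt == max_attempts:
                    raise  # retries exhausted; dead-letter handling would start here
                await asyncio.sleep(delay_s)


async def main():
    calls = []

    async def flaky(payload):
        calls.append(payload)
        if len(calls) < 3:
            raise RuntimeError("transient error")
        return payload * 2

    sem = asyncio.Semaphore(4)  # concurrency: 4
    result = await run_with_policy(
        flaky, 21, sem=sem, timeout_s=120, max_attempts=5, delay_s=0  # delay shortened for the demo
    )
    return result, len(calls)


print(asyncio.run(main()))  # (42, 3): succeeded on the third attempt
```

This sketch holds its concurrency slot while waiting between attempts. A runtime that re-queues failed deliveries would release the slot instead, which is one reason not to read the sketch as onestep's actual scheduling.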

Level 4: Add Task Config

Use tasks[].config for task definition data that should be visible at runtime through ctx.task_config.

yaml
tasks:
  - name: sync_users
    source: users_source
    emit: [users_sink]
    config:
      dry_run: false
      target_table: dw_users
    handler:
      ref: worker.tasks.users:sync_users
      params:
        mode: upsert

Rule of thumb:

  • handler.params: call-time parameters for the Python function
  • task.config: task definition data the runtime and handler may inspect
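
The split can be exercised locally with a stand-in context. In this sketch, SimpleNamespace substitutes for the runtime ctx, and the handler body is a trimmed, hypothetical version of sync_users. The point is only to show where each YAML block ends up.

```python
import asyncio
from types import SimpleNamespace


# Hypothetical trimmed handler: handler.params arrive as keyword arguments,
# tasks[].config is read through ctx.task_config.
async def sync_users(ctx, payload, *, mode: str):
    if ctx.task_config.get("dry_run"):
        return None
    return {**payload, "target": ctx.task_config["target_table"], "mode": mode}


task_config = {"dry_run": False, "target_table": "dw_users"}  # tasks[].config
params = {"mode": "upsert"}                                    # handler.params
ctx = SimpleNamespace(task_config=task_config)  # stand-in for the runtime ctx

print(asyncio.run(sync_users(ctx, {"id": 1}, **params)))
# {'id': 1, 'target': 'dw_users', 'mode': 'upsert'}
```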

Level 5: Add Hooks

Only add hooks when task wiring or lifecycle behavior cannot live inside the main handler.

yaml
hooks:
  startup:
    - ref: worker.lifecycle:on_startup
  shutdown:
    - ref: worker.lifecycle:on_shutdown

tasks:
  - name: sync_users
    source: users_source
    emit: [users_sink]
    handler:
      ref: worker.tasks.users:sync_users
    hooks:
      before:
        - ref: worker.task_hooks:before_sync_users
      on_failure:
        - ref: worker.task_hooks:on_sync_users_failed
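
The refs above could point at plain functions like these, written against the signatures listed under Hook Signatures. This page does not say whether hooks may also be coroutines, and it does not document the failure object's attributes, so the sketch only logs its repr(). The logger name is hypothetical.

```python
# worker/lifecycle.py and worker/task_hooks.py, shown together.
import logging

log = logging.getLogger("worker.hooks")  # hypothetical logger name


def on_startup(app):
    # app.tasks holds the loaded task specs.
    log.info("starting with %d tasks", len(app.tasks))


def on_shutdown():
    # Shorter signatures are allowed: this hook takes no runtime arguments.
    log.info("shutting down")


def before_sync_users(ctx, payload):
    log.debug("syncing user %s", payload.get("id"))


def on_sync_users_failed(ctx, payload, failure):
    # Exceptions raised here are logged by onestep and do not replace
    # the original task failure.
    log.warning("sync failed for user %s: %r", payload.get("id"), failure)
```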

Level 6: Add Built-In Reporter

Use the built-in reporter only when you need control-plane telemetry. Start with the smallest shape:

bash
pip install 'onestep[control-plane]'

yaml
reporter: true

That means:

  • enable ControlPlaneReporter
  • resolve base_url and token from env
  • default service_name to app.name

If you need explicit overrides, keep them minimal and use the same field names as ControlPlaneReporterConfig:

yaml
reporter:
  base_url: https://control-plane.example.com
  token: ${ONESTEP_CONTROL_PLANE_TOKEN}
  service_name: billing-sync-worker

Level 7: Full Wiring Example

yaml
apiVersion: onestep/v1alpha1
kind: App

app:
  name: user-sync
  shutdown_timeout_s: 30
  state: app_state
  config:
    region: cn

reporter: true

resources:
  mysql_main:
    type: mysql
    dsn: "${MYSQL_DSN}"

  app_state:
    type: mysql_state_store
    connector: mysql_main
    table: onestep_state

  cursor_users:
    type: mysql_cursor_store
    connector: mysql_main
    table: onestep_cursor

  users_source:
    type: mysql_incremental
    connector: mysql_main
    table: users
    key: id
    cursor: [updated_at, id]
    state: cursor_users
    state_key: users-sync

  users_sink:
    type: mysql_table_sink
    connector: mysql_main
    table: dw_users
    mode: upsert
    keys: [id]

  notify_api:
    type: http_sink
    url: "${NOTIFY_URL}"
    headers:
      Authorization: "Bearer ${NOTIFY_TOKEN}"
    success_statuses: [200, 202]

  audit_stream:
    type: redis_stream
    connector: redis_main
    stream: audit:user_sync
    group: onestep

  redis_main:
    type: redis
    url: "${REDIS_URL:redis://localhost:6379}"

  users_dead:
    type: redis_stream
    connector: redis_main
    stream: dead_letter:user_sync
    group: onestep

hooks:
  startup:
    - ref: worker.lifecycle:on_startup
      params:
        preload_cache: true
  shutdown:
    - ref: worker.lifecycle:on_shutdown
  events:
    - ref: worker.observers:metrics_handler
    - ref: worker.observers:structured_logger

tasks:
  - name: sync_users
    description: Sync incremental users into DW
    source: users_source
    emit: [users_sink, audit_stream, notify_api]
    dead_letter: [users_dead]
    config:
      target_table: dw_users
      dry_run: false
    metadata:
      owner: data-platform
      tags: [users, mysql]
    handler:
      ref: worker.tasks.users:sync_users
      params:
        mode: upsert
    hooks:
      before:
        - ref: worker.task_hooks:before_sync_users
      after_success:
        - ref: worker.task_hooks:after_sync_users
      on_failure:
        - ref: worker.task_hooks:on_sync_users_failed
    concurrency: 4
    timeout_s: 120
    retry:
      type: max_attempts
      max_attempts: 5
      delay_s: 10
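
The example uses three placeholder forms: a bare placeholder (${MYSQL_DSN}), a placeholder embedded in a longer string (Bearer ${NOTIFY_TOKEN}), and a placeholder with a fallback (${REDIS_URL:redis://localhost:6379}). The sketch below is one plausible reading of that syntax, not onestep's resolver. It assumes the fallback follows the first colon, that the fallback is used only when the variable is unset, and that an unset variable without a fallback is an error.

```python
import os
import re

# Hypothetical sketch of ${NAME} / ${NAME:default} substitution.
_PLACEHOLDER = re.compile(r"\$\{([A-Za-z_][A-Za-z0-9_]*)(?::([^}]*))?\}")


def interpolate(value: str, env=os.environ) -> str:
    def replace(match: re.Match) -> str:
        name, default = match.group(1), match.group(2)
        if name in env:
            return env[name]
        if default is not None:
            return default  # everything after the first colon
        raise KeyError(f"environment variable {name} is not set")

    return _PLACEHOLDER.sub(replace, value)


env = {"NOTIFY_TOKEN": "s3cret"}
print(interpolate("Bearer ${NOTIFY_TOKEN}", env))               # Bearer s3cret
print(interpolate("${REDIS_URL:redis://localhost:6379}", env))  # redis://localhost:6379
```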

Python Side

The business project mainly writes handlers, transforms, and optional hooks.

python
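# worker/transforms/users.py
def normalize_user(payload: dict, *, region: str) -> dict:
    # Pure business transform: no onestep imports, so it is easy to unit-test.
    return {
        "id": payload["id"],
        "name": payload["name"].strip(),
        "region": region,
    }

python
# worker/tasks/users.py
from worker.transforms.users import normalize_user


async def sync_users(ctx, payload, *, mode: str):
    # app.config.region arrives as ctx.config["region"].
    row = normalize_user(payload, region=ctx.config["region"])

    # tasks[].config.dry_run arrives as ctx.task_config["dry_run"].
    if ctx.task_config.get("dry_run"):
        ctx.logger.info("dry run", extra={"payload": row})
        return None

    # handler.params.mode arrives as the `mode` keyword argument.
    row["mode"] = mode
    return row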

Runtime Access

Handlers and task hooks can use:

  • ctx.config: app-level config from app.config
  • ctx.task_config: task-level config from tasks[].config
  • ctx.task.config: the same task config on the task spec
  • ctx.resources: named runtime objects from resources
  • ctx.state: per-task namespaced state

App hooks can use:

  • app.resources: named runtime objects from resources
  • app.tasks: loaded task specs
  • app.config: app-level config

Hook Signatures

onestep passes a hook only as many positional runtime arguments as its signature accepts, so each hook can take as much or as little context as it needs.

Supported app-level hooks:

  • startup: func(app) or func()
  • shutdown: func(app) or func()
  • events: func(event) or func()

Supported task-level hooks:

  • before: func(ctx, payload), func(ctx), or func()
  • after_success: func(ctx, payload, result), func(ctx, payload), func(ctx), or func()
  • on_failure: func(ctx, payload, failure), func(ctx, payload), func(ctx), or func()

Hook params are passed as keyword arguments after the runtime arguments.
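
The truncation rule can be sketched with inspect.signature. This call_hook function is hypothetical. It counts the hook's positional parameters, skipping any whose names are supplied as hook params, passes that many runtime arguments, and then passes the params as keywords. A hook that accepts *args receives every runtime argument.

```python
import inspect


# Hypothetical sketch of positional-argument truncation; not onestep's code.
def call_hook(hook, runtime_args, params=None):
    params = params or {}
    parameters = inspect.signature(hook).parameters.values()
    if any(p.kind is p.VAR_POSITIONAL for p in parameters):
        count = len(runtime_args)  # *args: pass every runtime argument
    else:
        # Count positional slots, skipping names that hook params will fill.
        count = sum(
            1
            for p in parameters
            if p.kind in (p.POSITIONAL_ONLY, p.POSITIONAL_OR_KEYWORD)
            and p.name not in params
        )
    return hook(*runtime_args[:count], **params)


def after_full(ctx, payload, result):
    return ("full", result)


def after_ctx_only(ctx, *, label):
    return ("ctx only", label)


args = ("ctx", {"id": 1}, "ok")  # (ctx, payload, result) for after_success
print(call_hook(after_full, args))                      # ('full', 'ok')
print(call_hook(after_ctx_only, args, {"label": "x"}))  # ('ctx only', 'x')
print(call_hook(lambda: "no args", args))               # no args
```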

Hook Semantics

  • before runs once a delivery starts processing, after the started event has been emitted.
  • after_success runs after the handler returns successfully, before emitting to sinks and before ack().
  • on_failure runs for task failures before retry or dead-letter decisions are applied.
  • failures inside on_failure hooks are logged and do not replace the original task failure.
  • timeout_s currently applies to the async handler body itself; task hooks remain outside that timeout.
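
The ordering above can be traced with a small asyncio sketch. process is a hypothetical stand-in for one delivery. Its hooks receive only the payload (plus the result or failure) instead of ctx. The events list stands in for the started event, retry or dead-letter handling, emit, and ack.

```python
import asyncio
import logging

log = logging.getLogger("sketch")


# Hypothetical sketch of the hook ordering for a single delivery.
async def process(payload, handler, hooks, *, timeout_s, events):
    events.append("started")
    for hook in hooks.get("before", []):
        hook(payload)  # outside timeout_s
    try:
        result = await asyncio.wait_for(handler(payload), timeout=timeout_s)
    except Exception as failure:
        for hook in hooks.get("on_failure", []):
            try:
                hook(payload, failure)
            except Exception:
                log.exception("on_failure hook raised")  # logged, not propagated
        events.append("retry-or-dead-letter")
        raise  # re-raises the original task failure, not the hook's error
    for hook in hooks.get("after_success", []):
        hook(payload, result)  # before emit and ack
    events.extend(["emit", "ack"])
    return result


async def demo():
    events = []

    async def ok(payload):
        events.append("handler")
        return payload

    async def boom(payload):
        raise ValueError("boom")

    def buggy_hook(payload, failure):
        raise RuntimeError("bug in hook")

    hooks = {
        "before": [lambda p: events.append("before")],
        "after_success": [lambda p, r: events.append("after_success")],
        "on_failure": [buggy_hook],
    }
    await process({"id": 1}, ok, hooks, timeout_s=1.0, events=events)
    try:
        await process({"id": 2}, boom, hooks, timeout_s=1.0, events=events)
    except ValueError as exc:
        events.append(f"raised {exc}")
    return events


print(asyncio.run(demo()))
```

For the successful delivery, demo() records started, before, handler, after_success, emit, and ack. For the failing delivery, the buggy on_failure hook is logged and the caller still receives the original ValueError.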

Resource Notes

  • resources is the preferred top-level section for named runtime objects.
  • legacy connectors, sources, and sinks sections are still accepted and merged into the same resource registry.
  • resources are available at runtime through app.resources and ctx.resources.
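
A rough model of the merge: every legacy section contributes name-to-spec entries to the same registry as resources. Raising on a duplicate name is this sketch's own choice. This page does not describe how onestep resolves collisions.

```python
# Hypothetical sketch: legacy sections folded into the `resources` registry.
LEGACY_SECTIONS = ("connectors", "sources", "sinks")


def merged_resources(doc: dict) -> dict:
    registry = dict(doc.get("resources") or {})
    for section in LEGACY_SECTIONS:
        for name, spec in (doc.get(section) or {}).items():
            if name in registry:
                # Collision handling is this sketch's choice, not onestep's.
                raise ValueError(f"resource {name!r} defined twice (also in {section})")
            registry[name] = spec
    return registry


doc = {
    "connectors": {"mysql_main": {"type": "mysql", "dsn": "${MYSQL_DSN}"}},
    "resources": {"users_sink": {"type": "mysql_table_sink", "connector": "mysql_main"}},
}
print(sorted(merged_resources(doc)))  # ['mysql_main', 'users_sink']
```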

Built-in resource types:

  • memory
  • interval
  • cron
  • webhook
  • http_sink

In strict mode, memory resources must set a positive maxsize, which keeps long-lived YAML workers from accidentally creating unbounded in-process queues. Scheduled interval and cron resources configured with overlap: queue accept max_queued_runs to bound how many runs can wait in the queue; it defaults to 1000.
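
asyncio.Queue shows why a bound matters. Here it serves only as an analogy for an in-process memory resource, since this page does not specify onestep's actual queue type. Once the queue is full, further puts either fail (put_nowait) or wait (await put) instead of growing memory without limit.

```python
import asyncio


# Analogy only: a bounded asyncio.Queue standing in for `type: memory` with maxsize.
async def demo(maxsize: int = 2):
    queue = asyncio.Queue(maxsize=maxsize)
    accepted, rejected = 0, 0
    for i in range(5):
        try:
            queue.put_nowait({"n": i})
            accepted += 1
        except asyncio.QueueFull:
            rejected += 1  # a blocking `await queue.put(...)` would wait here instead
    return accepted, rejected


print(asyncio.run(demo()))  # (2, 3)
```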

Plugin resource types:

  • onestep-mysql: mysql, mysql_state_store, mysql_cursor_store, mysql_table_queue, mysql_incremental, mysql_table_sink
  • onestep-mq: rabbitmq, rabbitmq_queue
  • onestep-redis: redis, redis_stream
  • onestep-sqs: sqs, sqs_queue
  • onestep-feishu-bitable: feishu_bitable, feishu_bitable_incremental, feishu_bitable_table_sink

feishu_bitable_incremental accepts fallback_scan_page_limit to bound the fallback scan used when Feishu rejects cursor sorting. The default is 100 pages.

Install the corresponding plugin package in the worker environment before using plugin resource types in YAML.

Additional resource types can be provided by installed packages. A package can register YAML resources through the onestep.resources entry point group:

toml
[project.entry-points."onestep.resources"]
feishu_bitable = "onestep_feishu_bitable:register"

The entry point receives the resource registry and registers one or more resource handlers. Once the package is installed in the worker environment, YAML files can use the provided type values without changing onestep core.

The repository includes plugin packages under plugins/, each with its own entry point and release workflow.

Released under the MIT License.