YAML Task Definition
onestep treats YAML as a task-definition and wiring layer:
- YAML defines the app, resources, hooks, task runtime policy, and the Python entrypoints to call.
- Python still owns business logic such as transform, validation, enrichment, and custom hooks.
Design Boundary
YAML is responsible for:
- app: name, global config, shutdown timeout, state store binding, framework log level
- reporter: built-in control-plane telemetry wiring
- resources: named runtime objects and their dependencies
- hooks: app-level startup, shutdown, and event observers
- tasks: source, emit, dead-letter, retry, timeout, concurrency, handler, task config, task hooks
YAML does not define:
- transform DSLs
- conditional branches or workflow graphs
- expression engines
- embedded business logic
Strict Check
Use strict checking when you want YAML to behave like a real contract instead of a permissive loader:
onestep check --strict worker.yaml
Strict mode is intended to catch configuration drift early:
- unknown top-level fields
- unknown task, hook, reporter, and resource fields
- invalid apiVersion/kind values when they are present
- silent mixing of legacy top-level app fields with the app: section
- invalid app.logging.level values when YAML opts into framework log control
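As a rough illustration of the first check in the list above, the sketch below flags unknown top-level keys in an already-parsed document. The allow-list is an assumption assembled from the sections this page names; it is not taken from onestep's actual schema, and `unknown_top_level_fields` is a hypothetical helper, not part of the onestep API.

```python
# Hypothetical allow-list built from the sections named in this document;
# the real onestep schema may accept more or fewer keys.
KNOWN_TOP_LEVEL = {
    "apiVersion", "kind", "app", "reporter", "resources", "hooks", "tasks",
}


def unknown_top_level_fields(doc: dict) -> list[str]:
    """Return top-level keys that are not in the allow-list, sorted."""
    return sorted(key for key in doc if key not in KNOWN_TOP_LEVEL)


config = {
    "app": {"name": "hello-worker"},
    "tasks": [],
    "taks": [],  # typo that a permissive loader could silently ignore
}
print(unknown_top_level_fields(config))  # ['taks']
```

A permissive loader would start the worker with zero effect from the misspelled section; a strict check turns the same typo into an error before deployment.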
Framework Logging
Pure YAML workers can set the onestep logger namespace level directly:
app:
  name: hello-worker
  logging:
    level: DEBUG
Notes:
- this only sets the onestep logger namespace
- it does not configure the root logger, handlers, or formatters
- DEBUG enables low-level framework logs such as successful sink sends
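In plain Python terms, the setting is roughly comparable to adjusting one named logger with the standard `logging` module. The sketch assumes the framework's loggers live under the name `onestep`; the child name `onestep.sinks` is hypothetical and only demonstrates level inheritance.

```python
import logging

# Assumption: the framework's loggers live under the "onestep" name.
logging.getLogger("onestep").setLevel(logging.DEBUG)

# Child loggers inherit the effective level from the "onestep" parent.
child = logging.getLogger("onestep.sinks")  # hypothetical child logger name
print(child.getEffectiveLevel() == logging.DEBUG)

# The root logger is a separate object and keeps its own level.
print(logging.getLogger() is not logging.getLogger("onestep"))
```

Because handlers and formatters are left alone, the application still has to attach a handler (for example with `logging.basicConfig`) before DEBUG records become visible.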
For long-lived configs, prefer adding:
apiVersion: onestep/v1alpha1
kind: App
Real Project Layout
When a team actually adopts YAML task definitions, the recommended shape is still small:
your-project/
├── pyproject.toml
├── worker.yaml
└── src/
    └── your_worker/
        ├── tasks.py
        ├── transforms.py
        └── hooks.py
That example now exists in this repo at example/yaml_project/.
The rule stays the same:
- worker.yaml defines runtime wiring
- tasks/ defines handlers
- transforms/ holds business transforms
- hooks.py is optional and only for lifecycle or side-observer logic
If you want that shape immediately, scaffold it with:
onestep init your-project
init intentionally generates the smallest runnable project. It does not add reporter config, hook modules, extra hooks, or more YAML structure by default.
From the repo root:
PYTHONPATH=src python -m onestep.cli check example/yaml_project/worker.yaml
PYTHONPATH=src python -m onestep.cli run example/yaml_project/worker.yaml
Recommended Progression
Start with the smallest shape that runs. Add fields only when the task actually needs them.
Level 1: Minimal Task
app:
  name: hello-worker
  logging:
    level: DEBUG
resources:
  tick:
    type: interval
    minutes: 5
    immediate: true
tasks:
  - name: hello
    source: tick
    handler:
      ref: worker.tasks.main:hello
This is the default mental model:
- one app
- one source
- one handler
- no hooks
- no extra config
Level 2: Add Passthrough Sinks
If a task only forwards the incoming payload to one or more sinks, handler can be omitted. The runtime will use a passthrough handler that returns the source payload unchanged.
app:
  name: event-forwarder
resources:
  incoming:
    type: memory
  notify:
    type: http_sink
    url: "https://example.com/hooks/events"
    headers:
      X-Api-Key: "${NOTIFY_TOKEN}"
tasks:
  - name: forward_events
    source: incoming
    emit: notify
Strict mode still requires each task to define either handler or a non-empty emit. Use a Python handler when the payload needs transform, validation, signing, or enrichment.
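Conceptually, the passthrough behaviour amounts to a handler like the one sketched here. The `(ctx, payload)` signature follows the handler examples elsewhere on this page; the function name `passthrough` and the `None` context are illustrative assumptions, not the runtime's actual implementation.

```python
import asyncio


async def passthrough(ctx, payload):
    """Return the source payload unchanged so it is emitted as-is."""
    return payload


event = {"id": 1, "type": "user.created"}
# ctx is unused by this handler, so None stands in for the runtime context.
result = asyncio.run(passthrough(None, event))
print(result is event)  # True
```

Once the task needs anything beyond this identity behaviour, that logic belongs in a real Python handler rather than in YAML.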
Level 3: Add Sinks And Runtime Policy
app:
  name: user-sync
resources:
  users_source:
    type: mysql_incremental
    connector: mysql_main
    table: users
    key: id
    cursor: [updated_at, id]
  users_sink:
    type: mysql_table_sink
    connector: mysql_main
    table: dw_users
    mode: upsert
    keys: [id]
  mysql_main:
    type: mysql
    dsn: "${MYSQL_DSN}"
tasks:
  - name: sync_users
    source: users_source
    emit: [users_sink]
    handler:
      ref: worker.tasks.users:sync_users
    concurrency: 4
    timeout_s: 120
    retry:
      type: max_attempts
      max_attempts: 5
      delay_s: 10
Level 4: Add Task Config
Use tasks[].config for task definition data that should be visible at runtime through ctx.task_config.
tasks:
  - name: sync_users
    source: users_source
    emit: [users_sink]
    config:
      dry_run: false
      target_table: dw_users
    handler:
      ref: worker.tasks.users:sync_users
      params:
        mode: upsert
Rule of thumb:
- handler.params: call-time parameters for the Python function
- task.config: task definition data the runtime and handler may inspect
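The split can be seen from the handler's side in the sketch below: `mode` arrives as a keyword argument, while `dry_run` and `target_table` are read from `ctx.task_config`. The `SimpleNamespace` context is a stand-in that models only `task_config`, and the `table` key in the returned row is purely illustrative.

```python
import asyncio
from types import SimpleNamespace


async def sync_users(ctx, payload, *, mode: str):
    # `mode` arrives as a keyword argument from handler.params.
    # `dry_run` and `target_table` are read from tasks[].config.
    if ctx.task_config.get("dry_run"):
        return None
    return {**payload, "mode": mode, "table": ctx.task_config["target_table"]}


# Stand-in for the runtime context; only task_config is modelled here.
ctx = SimpleNamespace(task_config={"dry_run": False, "target_table": "dw_users"})
handler_params = {"mode": "upsert"}

row = asyncio.run(sync_users(ctx, {"id": 7}, **handler_params))
print(row)  # {'id': 7, 'mode': 'upsert', 'table': 'dw_users'}
```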
Level 5: Add Hooks
Only add hooks when task wiring or lifecycle behavior cannot live inside the main handler.
hooks:
  startup:
    - ref: worker.lifecycle:on_startup
  shutdown:
    - ref: worker.lifecycle:on_shutdown
tasks:
  - name: sync_users
    source: users_source
    emit: [users_sink]
    handler:
      ref: worker.tasks.users:sync_users
    hooks:
      before:
        - ref: worker.task_hooks:before_sync_users
      on_failure:
        - ref: worker.task_hooks:on_sync_users_failed
Level 6: Add Built-In Reporter
Use the built-in reporter only when you need control-plane telemetry. Start with the smallest shape:
pip install 'onestep[control-plane]'
reporter: true
That means:
- enable ControlPlaneReporter
- resolve base_url and token from env
- default service_name to app.name
If you need explicit overrides, keep them minimal and use the same field names as ControlPlaneReporterConfig:
reporter:
  base_url: https://control-plane.example.com
  token: ${ONESTEP_CONTROL_PLANE_TOKEN}
  service_name: billing-sync-worker
Level 7: Full Wiring Example
apiVersion: onestep/v1alpha1
kind: App
app:
  name: user-sync
  shutdown_timeout_s: 30
  state: app_state
  config:
    region: cn
reporter: true
resources:
  mysql_main:
    type: mysql
    dsn: "${MYSQL_DSN}"
  app_state:
    type: mysql_state_store
    connector: mysql_main
    table: onestep_state
  cursor_users:
    type: mysql_cursor_store
    connector: mysql_main
    table: onestep_cursor
  users_source:
    type: mysql_incremental
    connector: mysql_main
    table: users
    key: id
    cursor: [updated_at, id]
    state: cursor_users
    state_key: users-sync
  users_sink:
    type: mysql_table_sink
    connector: mysql_main
    table: dw_users
    mode: upsert
    keys: [id]
  notify_api:
    type: http_sink
    url: "${NOTIFY_URL}"
    headers:
      Authorization: "Bearer ${NOTIFY_TOKEN}"
    success_statuses: [200, 202]
  audit_stream:
    type: redis_stream
    connector: redis_main
    stream: audit:user_sync
    group: onestep
  redis_main:
    type: redis
    url: "${REDIS_URL:redis://localhost:6379}"
  users_dead:
    type: redis_stream
    connector: redis_main
    stream: dead_letter:user_sync
    group: onestep
hooks:
  startup:
    - ref: worker.lifecycle:on_startup
      params:
        preload_cache: true
  shutdown:
    - ref: worker.lifecycle:on_shutdown
  events:
    - ref: worker.observers:metrics_handler
    - ref: worker.observers:structured_logger
tasks:
  - name: sync_users
    description: Sync incremental users into DW
    source: users_source
    emit: [users_sink, audit_stream, notify_api]
    dead_letter: [users_dead]
    config:
      target_table: dw_users
      dry_run: false
    metadata:
      owner: data-platform
      tags: [users, mysql]
    handler:
      ref: worker.tasks.users:sync_users
      params:
        mode: upsert
    hooks:
      before:
        - ref: worker.task_hooks:before_sync_users
      after_success:
        - ref: worker.task_hooks:after_sync_users
      on_failure:
        - ref: worker.task_hooks:on_sync_users_failed
    concurrency: 4
    timeout_s: 120
    retry:
      type: max_attempts
      max_attempts: 5
      delay_s: 10
Python Side
The business project mainly writes handlers, transforms, and optional hooks.
# worker/transforms/users.py
def normalize_user(payload: dict, *, region: str) -> dict:
    return {
        "id": payload["id"],
        "name": payload["name"].strip(),
        "region": region,
    }

# worker/tasks/users.py
from worker.transforms.users import normalize_user

async def sync_users(ctx, payload, *, mode: str):
    row = normalize_user(payload, region=ctx.config["region"])
    if ctx.task_config.get("dry_run"):
        ctx.logger.info("dry run", extra={"payload": row})
        return None
    row["mode"] = mode
    return row
Runtime Access
Handlers and task hooks can use:
- ctx.config: app-level config from app.config
- ctx.task_config: task-level config from tasks[].config
- ctx.task.config: the same task config on the task spec
- ctx.resources: named runtime objects from resources
- ctx.state: per-task namespaced state
App hooks can use:
- app.resources: named runtime objects from resources
- app.tasks: loaded task specs
- app.config: app-level config
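A startup hook that uses these attributes might look like the following sketch. It assumes `app.config` and `app.resources` behave like mappings, which this page does not state explicitly, and it uses a `SimpleNamespace` stand-in for the app object; the `preload_cache` keyword mirrors the hook `params` shown in the full wiring example.

```python
from types import SimpleNamespace


def on_startup(app, *, preload_cache: bool = False):
    """App-level startup hook: reads app.config and app.resources."""
    region = app.config.get("region", "unknown")
    names = sorted(app.resources)  # assumption: resources is mapping-like
    return f"region={region} resources={names} preload_cache={preload_cache}"


# Stand-in for the app object the runtime would pass in.
app = SimpleNamespace(
    config={"region": "cn"},
    resources={"mysql_main": object(), "redis_main": object()},
    tasks=[],
)
print(on_startup(app, preload_cache=True))
```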
Hook Signatures
onestep truncates positional arguments based on the callable signature, so hooks can choose the amount of context they need.
Supported app-level hooks:
- startup: func(app) or func()
- shutdown: func(app) or func()
- events: func(event) or func()
Supported task-level hooks:
- before: func(ctx, payload), func(ctx), or func()
- after_success: func(ctx, payload, result), func(ctx, payload), func(ctx), or func()
- on_failure: func(ctx, payload, failure), func(ctx, payload), func(ctx), or func()
Hook params are passed as keyword arguments after the runtime arguments.
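One way to picture signature-based truncation is the sketch below, built on the standard `inspect` module. It is an illustration of the idea, not onestep's implementation: `call_hook` is a hypothetical helper that counts the positional parameters a callable declares, passes only that many runtime arguments, and then adds hook params as keywords.

```python
import inspect

POSITIONAL_KINDS = (
    inspect.Parameter.POSITIONAL_ONLY,
    inspect.Parameter.POSITIONAL_OR_KEYWORD,
)


def call_hook(func, runtime_args, params=None):
    """Call func with only as many leading runtime args as it declares."""
    params = params or {}
    accepted = 0
    for p in inspect.signature(func).parameters.values():
        if p.kind is inspect.Parameter.VAR_POSITIONAL:
            accepted = len(runtime_args)  # *args accepts everything
            break
        # Names supplied through params are passed as keywords instead.
        if p.kind in POSITIONAL_KINDS and p.name not in params:
            accepted += 1
    return func(*runtime_args[:accepted], **params)


def after_full(ctx, payload, result):
    return ("full", ctx, payload, result)


def after_ctx_only(ctx, *, note):
    return ("ctx", ctx, note)


def after_none():
    return ("none",)


args = ("CTX", {"id": 1}, "OK")
print(call_hook(after_full, args))                      # all three arguments
print(call_hook(after_ctx_only, args, {"note": "hi"}))  # ctx plus keyword param
print(call_hook(after_none, args))                      # no runtime arguments
```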
Hook Semantics
- before runs after the delivery starts processing and after the started event is emitted.
- after_success runs after the handler returns successfully, before emitting to sinks and before ack().
- on_failure runs for task failures before retry or dead-letter decisions are applied.
- failures inside on_failure hooks are logged and do not replace the original task failure.
- timeout_s currently applies to the async handler body itself; task hooks remain outside that timeout.
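The ordering described above can be traced with a small simulation. This is a simplified model under stated assumptions, not the runtime's code: hooks here take only the payload (no `ctx`), `run_delivery` is a hypothetical function, emit and ack are represented by log entries, and routing a failing `before` hook to `on_failure` is a guess that the text does not confirm.

```python
import asyncio


async def run_delivery(handler, payload, *, timeout_s, hooks, log):
    """Record the order of steps for one delivery in `log`."""
    log.append("event:started")
    try:
        for hook in hooks.get("before", []):
            hook(payload)  # runs outside the handler timeout
        # Only the handler body is wrapped in the timeout.
        result = await asyncio.wait_for(handler(payload), timeout=timeout_s)
        for hook in hooks.get("after_success", []):
            hook(payload, result)  # runs before emit and ack
        log.append("emit")
        log.append("ack")
        return result
    except Exception as failure:
        for hook in hooks.get("on_failure", []):
            try:
                hook(payload, failure)
            except Exception:
                # Hook errors are recorded; the original failure is kept.
                log.append("on_failure hook error logged")
        log.append("retry-or-dead-letter decision")
        raise


log = []


async def handler(payload):
    log.append("handler")
    return payload


hooks = {
    "before": [lambda p: log.append("before")],
    "after_success": [lambda p, r: log.append("after_success")],
}
asyncio.run(run_delivery(handler, {"id": 1}, timeout_s=1, hooks=hooks, log=log))
print(log)
# ['event:started', 'before', 'handler', 'after_success', 'emit', 'ack']
```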
Resource Notes
- resources is the preferred top-level section for named runtime objects.
- legacy connectors, sources, and sinks sections are still accepted and merged into the same resource registry.
- resources are available at runtime through app.resources and ctx.resources.
Built-in resource types:
- memory
- interval
- cron
- webhook
- http_sink
In strict mode, memory resources must set a positive maxsize; this keeps long-lived YAML workers from creating unbounded in-process queues by accident. Scheduled interval and cron resources accept max_queued_runs for overlap: queue, defaulting to 1000.
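The motivation for a positive maxsize can be shown with the standard library's `queue.Queue`. Whether the memory resource is built on this class is not stated here, so treat it purely as an analogy: a bounded queue refuses new items once full, while `maxsize=0` (the stdlib default) grows without limit.

```python
import queue

bounded = queue.Queue(maxsize=2)  # analogous to a memory resource with maxsize: 2
bounded.put_nowait("a")
bounded.put_nowait("b")

try:
    bounded.put_nowait("c")  # third item exceeds the bound
    overflowed = False
except queue.Full:
    overflowed = True

print(overflowed)       # True
print(bounded.qsize())  # 2
```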
Plugin resource types:
- onestep-mysql: mysql, mysql_state_store, mysql_cursor_store, mysql_table_queue, mysql_incremental, mysql_table_sink
- onestep-mq: rabbitmq, rabbitmq_queue
- onestep-redis: redis, redis_stream
- onestep-sqs: sqs, sqs_queue
- onestep-feishu-bitable: feishu_bitable, feishu_bitable_incremental, feishu_bitable_table_sink
feishu_bitable_incremental accepts fallback_scan_page_limit to bound the fallback scan used when Feishu rejects cursor sorting. The default is 100 pages.
Install the corresponding plugin package in the worker environment before using plugin resource types in YAML.
Additional resource types can be provided by installed packages. A package can register YAML resources through the onestep.resources entry point group:
[project.entry-points."onestep.resources"]
feishu_bitable = "onestep_feishu_bitable:register"
The entry point receives the resource registry and registers one or more resource handlers. Once the package is installed in the worker environment, YAML files can use the provided type values without changing onestep core.
The repository includes plugin packages under plugins/, each with its own entry point and release workflow.