YAML Task Definition
onestep treats YAML as a task-definition and wiring layer:
- YAML defines the app, resources, hooks, task runtime policy, and the Python entrypoints to call.
- Python still owns business logic such as transform, validation, enrichment, and custom hooks.
Design Boundary
YAML is responsible for:
app: name, global config, shutdown timeout, state store bindingreporter: built-in control-plane telemetry wiringresources: named runtime objects and their dependencieshooks: app-level startup, shutdown, and event observerstasks: source, emit, dead-letter, retry, timeout, concurrency, handler, task config, task hooks
YAML does not define:
- transform DSLs
- conditional branches or workflow graphs
- expression engines
- embedded business logic
Strict Check
Use strict checking when you want YAML to behave like a real contract instead of a permissive loader:
onestep check --strict worker.yamlStrict mode is intended to catch configuration drift early:
- unknown top-level fields
- unknown task, hook, reporter, and resource fields
- invalid
apiVersion/kindvalues when they are present - silent mixing of legacy top-level app fields with the
app:section
For long-lived configs, prefer adding:
apiVersion: onestep/v1alpha1
kind: AppReal Project Layout
When a team actually adopts YAML task definitions, the recommended shape is still small:
your-project/
├── pyproject.toml
├── worker.yaml
└── src/
└── your_worker/
├── tasks.py
├── transforms.py
└── hooks.pyThat example now exists in this repo at example/yaml_project/.
The rule stays the same:
worker.yamldefines runtime wiringtasks/defines handlerstransforms/holds business transformshooks.pyis optional and only for lifecycle or side-observer logic
If you want that shape immediately, scaffold it with:
onestep init your-projectinit intentionally generates the smallest runnable project. It does not add reporter config, hook modules, extra hooks, or more YAML structure by default.
From the repo root:
PYTHONPATH=src python -m onestep.cli check example/yaml_project/worker.yaml
PYTHONPATH=src python -m onestep.cli run example/yaml_project/worker.yamlRecommended Progression
Start with the smallest shape that runs. Add fields only when the task actually needs them.
Level 1: Minimal Task
app:
name: hello-worker
resources:
tick:
type: interval
minutes: 5
immediate: true
tasks:
- name: hello
source: tick
handler:
ref: worker.tasks.main:helloThis is the default mental model:
- one app
- one source
- one handler
- no hooks
- no extra config
Level 2: Add Sinks And Runtime Policy
app:
name: user-sync
resources:
users_source:
type: mysql_incremental
connector: mysql_main
table: users
key: id
cursor: [updated_at, id]
users_sink:
type: mysql_table_sink
connector: mysql_main
table: dw_users
mode: upsert
keys: [id]
mysql_main:
type: mysql
dsn: "${MYSQL_DSN}"
tasks:
- name: sync_users
source: users_source
emit: [users_sink]
handler:
ref: worker.tasks.users:sync_users
concurrency: 4
timeout_s: 120
retry:
type: max_attempts
max_attempts: 5
delay_s: 10Level 3: Add Task Config
Use tasks[].config for task definition data that should be visible at runtime through ctx.task_config.
tasks:
- name: sync_users
source: users_source
emit: [users_sink]
config:
dry_run: false
target_table: dw_users
handler:
ref: worker.tasks.users:sync_users
params:
mode: upsertRule of thumb:
handler.params: call-time parameters for the Python functiontask.config: task definition data the runtime and handler may inspect
Level 4: Add Hooks
Only add hooks when task wiring or lifecycle behavior cannot live inside the main handler.
hooks:
startup:
- ref: worker.lifecycle:on_startup
shutdown:
- ref: worker.lifecycle:on_shutdown
tasks:
- name: sync_users
source: users_source
emit: [users_sink]
handler:
ref: worker.tasks.users:sync_users
hooks:
before:
- ref: worker.task_hooks:before_sync_users
on_failure:
- ref: worker.task_hooks:on_sync_users_failedLevel 5: Add Built-In Reporter
Use the built-in reporter only when you need control-plane telemetry. Start with the smallest shape:
pip install 'onestep[control-plane]'reporter: trueThat means:
- enable
ControlPlaneReporter - resolve
base_urlandtokenfrom env - default
service_nametoapp.name
If you need explicit overrides, keep them minimal and use the same field names as ControlPlaneReporterConfig:
reporter:
base_url: https://control-plane.example.com
token: ${ONESTEP_CONTROL_PLANE_TOKEN}
service_name: billing-sync-workerLevel 6: Full Wiring Example
apiVersion: onestep/v1alpha1
kind: App
app:
name: user-sync
shutdown_timeout_s: 30
state: app_state
config:
region: cn
reporter: true
resources:
mysql_main:
type: mysql
dsn: "${MYSQL_DSN}"
app_state:
type: mysql_state_store
connector: mysql_main
table: onestep_state
cursor_users:
type: mysql_cursor_store
connector: mysql_main
table: onestep_cursor
users_source:
type: mysql_incremental
connector: mysql_main
table: users
key: id
cursor: [updated_at, id]
state: cursor_users
state_key: users-sync
users_sink:
type: mysql_table_sink
connector: mysql_main
table: dw_users
mode: upsert
keys: [id]
audit_stream:
type: redis_stream
connector: redis_main
stream: audit:user_sync
group: onestep
redis_main:
type: redis
url: "${REDIS_URL:redis://localhost:6379}"
users_dead:
type: redis_stream
connector: redis_main
stream: dead_letter:user_sync
group: onestep
hooks:
startup:
- ref: worker.lifecycle:on_startup
params:
preload_cache: true
shutdown:
- ref: worker.lifecycle:on_shutdown
events:
- ref: worker.observers:metrics_handler
- ref: worker.observers:structured_logger
tasks:
- name: sync_users
description: Sync incremental users into DW
source: users_source
emit: [users_sink, audit_stream]
dead_letter: [users_dead]
config:
target_table: dw_users
dry_run: false
metadata:
owner: data-platform
tags: [users, mysql]
handler:
ref: worker.tasks.users:sync_users
params:
mode: upsert
hooks:
before:
- ref: worker.task_hooks:before_sync_users
after_success:
- ref: worker.task_hooks:after_sync_users
on_failure:
- ref: worker.task_hooks:on_sync_users_failed
concurrency: 4
timeout_s: 120
retry:
type: max_attempts
max_attempts: 5
delay_s: 10Python Side
The business project mainly writes handlers, transforms, and optional hooks.
# worker/transforms/users.py
def normalize_user(payload: dict, *, region: str) -> dict:
return {
"id": payload["id"],
"name": payload["name"].strip(),
"region": region,
}# worker/tasks/users.py
from worker.transforms.users import normalize_user
async def sync_users(ctx, payload, *, mode: str):
row = normalize_user(payload, region=ctx.config["region"])
if ctx.task_config.get("dry_run"):
ctx.logger.info("dry run", extra={"payload": row})
return None
row["mode"] = mode
return rowRuntime Access
Handlers and task hooks can use:
ctx.config: app-level config fromapp.configctx.task_config: task-level config fromtasks[].configctx.task.config: the same task config on the task specctx.resources: named runtime objects fromresourcesctx.state: per-task namespaced state
App hooks can use:
app.resources: named runtime objects fromresourcesapp.tasks: loaded task specsapp.config: app-level config
Hook Signatures
onestep truncates positional arguments based on the callable signature, so hooks can choose the amount of context they need.
Supported app-level hooks:
startup:func(app)orfunc()shutdown:func(app)orfunc()events:func(event)orfunc()
Supported task-level hooks:
before:func(ctx, payload),func(ctx), orfunc()after_success:func(ctx, payload, result),func(ctx, payload),func(ctx), orfunc()on_failure:func(ctx, payload, failure),func(ctx, payload),func(ctx), orfunc()
Hook params are passed as keyword arguments after the runtime arguments.
Hook Semantics
beforeruns after the delivery starts processing and after thestartedevent is emitted.after_successruns after the handler returns successfully, before emitting to sinks and beforeack().on_failureruns for task failures before retry or dead-letter decisions are applied.- failures inside
on_failurehooks are logged and do not replace the original task failure. timeout_scurrently applies to the async handler body itself; task hooks remain outside that timeout.
Resource Notes
resourcesis the preferred top-level section for named runtime objects.- legacy
connectors,sources, andsinkssections are still accepted and merged into the same resource registry. - resources are available at runtime through
app.resourcesandctx.resources.
Supported resource types today:
memoryintervalcronwebhookrabbitmqrabbitmq_queueredisredis_streamsqssqs_queuemysqlmysql_state_storemysql_cursor_storemysql_table_queuemysql_incrementalmysql_table_sink