rahcp-etl¶
Stateful ETL orchestration with NATS JetStream for event-driven pipelines.
Pipeline DAG¶
Define multi-stage pipelines with per-stage retry policies and checkpoint-based resumption:
```mermaid
flowchart LR
    subgraph Pipeline
        E["extract<br/><small>retries: 3</small>"] --> T["transform<br/><small>retries: 3</small>"]
        T --> L["load<br/><small>retries: 3</small>"]
    end
    E -.->|checkpoint| KV[("NATS KV<br/><small>etl-checkpoints</small>")]
    T -.->|checkpoint| KV
    L -.->|clear| KV
    L --> OK["Result"]
    style OK fill:#d4edda,stroke:#28a745
```
Each stage saves a checkpoint after success. If the pipeline fails and is re-run with the same pipeline_id, it resumes from the last checkpoint.
```python
import asyncio

import nats

from rahcp_etl.checkpointing import CheckpointStore
from rahcp_etl.pipeline import Pipeline


async def main():
    nc = await nats.connect("nats://localhost:4222")
    store = await CheckpointStore.create(nc)
    pipeline = Pipeline(checkpoint_store=store)

    @pipeline.stage("extract", retries=3, backoff=2.0)
    async def extract(payload):
        # Download source data
        return {"records": ["a", "b", "c"]}

    @pipeline.stage("transform")
    async def transform(payload):
        # Process records
        return {"transformed": [r.upper() for r in payload["records"]]}

    @pipeline.stage("load")
    async def load(payload):
        # Write results
        return {"loaded": len(payload["transformed"])}

    result = await pipeline.run(
        {"source": "s3://bucket/input"},
        pipeline_id="batch-2025-03-18",  # enables checkpoint resume
    )
    print(result)


asyncio.run(main())
```
Each stage is a `Stage` dataclass (`name`, `handler`, `retries=3`, `backoff=2.0`):

- Stages execute sequentially; each receives the output of the previous stage
- Failed stages retry with exponential backoff (`delay * backoff^attempt`)
- After each successful stage, a checkpoint is saved to NATS KV
- If a pipeline fails and is re-run with the same `pipeline_id`, it resumes from the last checkpoint
- Checkpoints are cleared on successful completion
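The retry rule above can be sketched in a few lines of plain Python. This is an illustrative sketch only, not the library's implementation; `run_with_retries` is a hypothetical helper:

```python
import asyncio


async def run_with_retries(handler, payload, retries=3, backoff=2.0, delay=1.0):
    """Run a stage handler, retrying with exponential backoff.

    Wait between attempts grows as delay * backoff**attempt,
    so with the defaults the waits are 1s, 2s, 4s.
    """
    for attempt in range(retries + 1):
        try:
            return await handler(payload)
        except Exception:
            if attempt == retries:
                raise  # retries exhausted; surface the error
            await asyncio.sleep(delay * backoff ** attempt)
```

With `retries=3` a stage runs at most four times: the initial attempt plus three retries.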
JetStream consumer¶
Durable message consumer for event-driven processing:
```python
import json

from rahcp_etl.consumer import ETLConsumer

consumer = ETLConsumer(
    nats_url="nats://localhost:4222",
    stream="INGEST",
    subject="ingest.images.>",
    durable="image-processor",
    max_deliver=5,
    ack_wait=30.0,
)

async def handle(payload: bytes):
    data = json.loads(payload)
    # Process message...
    return {"status": "ok"}

await consumer.start(handle)
```
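Conceptually, a durable JetStream consumer acks a message when the handler succeeds, lets JetStream redeliver it on failure, and treats a message as a dead-letter candidate once `max_deliver` attempts are exhausted. A minimal sketch of that decision, where `dispatch` is a hypothetical helper and not `ETLConsumer` internals:

```python
def dispatch(delivered: int, max_deliver: int, handler_ok: bool) -> str:
    """Decide the fate of a JetStream delivery.

    delivered:   how many times this message has been delivered so far
    max_deliver: the consumer's redelivery limit
    handler_ok:  whether the handler completed without raising
    """
    if handler_ok:
        return "ack"            # done; JetStream will not redeliver
    if delivered < max_deliver:
        return "nak"            # negative-ack; trigger redelivery
    return "dlq"                # attempts exhausted; route to dead letter queue
```

With `ack_wait=30.0`, a message whose handler neither acks nor naks within 30 seconds is also redelivered, counting against the same `max_deliver` budget.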
Dead letter queue¶
Route permanently-failed messages for inspection and replay:
```python
from datetime import timedelta

from rahcp_etl.dlq import DeadLetterHandler

dlq = await DeadLetterHandler.create(nc)

# Send failed message to DLQ
await dlq.send("ingest.images.batch-1", payload, error="corrupt TIFF")

# Replay all DLQ messages back to original subjects
count = await dlq.replay()

# Purge old messages
count = await dlq.purge(older_than=timedelta(days=7))
```
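For replay to work, each dead-letter entry must retain the original subject and payload alongside error metadata. A minimal envelope sketch; the field names and JSON encoding here are illustrative, not the library's actual wire format:

```python
import json
import time


def make_dlq_envelope(subject: str, payload: bytes, error: str) -> bytes:
    """Wrap a failed message with enough metadata to inspect and replay it."""
    return json.dumps({
        "subject": subject,                    # original subject, used for replay
        "payload": payload.decode("utf-8"),
        "error": error,                        # why the message failed
        "failed_at": time.time(),              # used for age-based purging
    }).encode()
```

Keeping the original subject in the envelope is what lets `replay()` publish each message back to where it came from, and the timestamp is what an `older_than` purge would filter on.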