Error Handling, Retries, and Idempotency¶
Production scripts need to handle transient failures, token expiry, and partial-success scenarios. HCP's S3 layer is eventually consistent for certain operations -- the patterns below help build resilient automation.
The rahcp SDK handles most of this automatically
The Python SDK includes built-in retries with exponential backoff, automatic token refresh on 401, presigned URL error safety, and the staged-commit pattern as commit_staging() / cleanup_staging(). The patterns below are for raw httpx/curl usage or when you need custom error handling beyond what the SDK provides.
graph LR
A["Client Request"] --> B{"Success?"}
B -->|Yes| C["Return Response"]
B -->|Retryable<br/>408 429 5xx| D["Wait<br/><small>exponential backoff</small>"]
D --> E{"Attempts<br/>left?"}
E -->|Yes| A
E -->|No| F["Raise Error"]
B -->|Non-retryable<br/>400 401 403 404| F
style C fill:#d4edda,stroke:#28a745
style F fill:#f8d7da,stroke:#dc3545
Retry with exponential backoff¶
SDK does this automatically
HCPClient retries transient errors (408, 429, 500-504) with exponential backoff and refreshes tokens on 401 — no manual retry logic needed. The code below is for raw httpx usage.
import asyncio
import httpx
from collections.abc import Callable
from typing import Any

RETRYABLE_STATUS = {408, 429, 500, 502, 503, 504}

async def retry(
    fn: Callable[..., Any],
    *args: Any,
    max_attempts: int = 4,
    base_delay: float = 1.0,
    **kwargs: Any,
) -> Any:
    """Retry an async function with exponential backoff.

    Retries on network errors and retryable HTTP status codes.
    """
    last_exc: Exception | None = None
    for attempt in range(max_attempts):
        try:
            return await fn(*args, **kwargs)
        except httpx.HTTPStatusError as exc:
            if exc.response.status_code not in RETRYABLE_STATUS:
                raise  # 400, 401, 403, 404, 409 -- don't retry
            last_exc = exc
        except (httpx.ConnectError, httpx.ReadTimeout, httpx.WriteTimeout) as exc:
            last_exc = exc
        if attempt == max_attempts - 1:
            break  # no point sleeping after the final attempt
        delay = base_delay * (2 ** attempt)
        print(f"  Attempt {attempt + 1} failed, retrying in {delay:.1f}s...")
        await asyncio.sleep(delay)
    raise RuntimeError(f"All {max_attempts} attempts failed") from last_exc
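When many workers retry in lockstep, plain exponential backoff can synchronize their retries into bursts that hit the API at the same instants. Adding full jitter spreads them out. A minimal sketch -- the `backoff_delays` helper is illustrative, not part of any SDK:

```python
import random

def backoff_delays(max_attempts: int = 4, base: float = 1.0, cap: float = 30.0) -> list[float]:
    """Exponential backoff delays with full jitter: uniform(0, min(cap, base * 2^n))."""
    return [
        random.uniform(0, min(cap, base * (2 ** attempt)))
        for attempt in range(max_attempts - 1)  # no delay needed after the final attempt
    ]
```

Replacing the fixed `delay = base_delay * (2 ** attempt)` line with a jittered value keeps the average wait similar while desynchronizing concurrent clients.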
Auto-refreshing auth wrapper¶
SDK does this automatically
HCPClient re-authenticates on 401 automatically. The wrapper below is for raw httpx.
sequenceDiagram
participant Client as HCPClient
participant API as HCP API
Client->>API: request (GET /statistics)
API-->>Client: 401 Unauthorized
Note over Client: Token expired — re-login
Client->>API: POST /auth/token
API-->>Client: new JWT
Client->>API: request (GET /statistics) with new token
API-->>Client: 200 OK + data
import httpx
from dataclasses import dataclass, field

BASE = "http://localhost:8000/api/v1"

@dataclass
class HCPClient:
    """HTTP client that re-authenticates on 401."""

    username: str
    password: str
    tenant: str | None = None
    _token: str = field(default="", init=False, repr=False)
    _client: httpx.AsyncClient = field(init=False, repr=False)

    async def __aenter__(self):
        self._client = httpx.AsyncClient(base_url=BASE, timeout=30.0)
        await self._login()
        return self

    async def __aexit__(self, *exc: object):
        await self._client.aclose()

    async def _login(self):
        data: dict[str, str] = {
            "username": self.username,
            "password": self.password,
        }
        if self.tenant:
            data["tenant"] = self.tenant
        resp = await self._client.post("/auth/token", data=data)
        resp.raise_for_status()
        self._token = resp.json()["access_token"]
        self._client.headers["Authorization"] = f"Bearer {self._token}"

    async def request(self, method: str, url: str, **kwargs) -> httpx.Response:
        """Make a request, re-authenticate once on 401."""
        resp = await self._client.request(method, url, **kwargs)
        if resp.status_code == 401:
            await self._login()
            resp = await self._client.request(method, url, **kwargs)
        return resp
Usage:
async def main():
    async with HCPClient("<username>", "<password>", tenant="<tenant>") as hcp:
        resp = await hcp.request("GET", "/mapi/tenants/<tenant>/statistics")
        print(resp.json())
curl error handling¶
# Reusable function with status-code checking
hcp_request() {
    local METHOD="$1" URL="$2"; shift 2
    local RESP BODY CODE
    RESP=$(curl -s -w "\n%{http_code}" -X "$METHOD" "$URL" \
        -H "Authorization: Bearer $TOKEN" "$@")
    BODY=$(echo "$RESP" | sed '$d')
    CODE=$(echo "$RESP" | tail -1)
    case "$CODE" in
        2[0-9][0-9]) echo "$BODY" | jq . ;;
        401) echo "ERROR: Token expired -- re-authenticate" >&2; return 1 ;;
        404) echo "ERROR: Not found -- $URL" >&2; return 1 ;;
        409) echo "ERROR: Conflict -- resource already exists" >&2; return 1 ;;
        429|5[0-9][0-9])
            echo "WARN: Retryable error $CODE, waiting 5s..." >&2
            sleep 5
            hcp_request "$METHOD" "$URL" "$@"  # retry once
            ;;
        *) echo "ERROR $CODE: $BODY" >&2; return 1 ;;
    esac
}
# Usage
hcp_request GET "$BASE/mapi/tenants/$TENANT/statistics"
hcp_request PUT "$BASE/mapi/tenants/$TENANT/namespaces" \
-H "Content-Type: application/json" \
-d '{"name": "test-ns", "hardQuota": "10 GB"}'
Idempotent operations¶
Many HCP API operations are naturally idempotent. Knowing which ones are safe to retry avoids duplicate side-effects:
| Operation | Idempotent? | Notes |
|---|---|---|
| `GET` / `HEAD` any endpoint | Yes | Read-only, always safe to retry |
| `PUT` create tenant/namespace | Yes | Returns 409 Conflict if already exists -- check and continue |
| `PUT` create user | Yes | Same -- 409 means the user already exists |
| `POST` upload object (same key) | Yes | Overwrites the existing object with the same key |
| `DELETE` object/user/namespace | Yes | Second delete returns 404 -- treat as success |
| `POST` update settings | Yes | Applies the same state, no cumulative effect |
| `POST` change password | No | Repeated calls succeed but generate audit events |
| `PUT` multipart presign | No | Each call creates a new upload ID |
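The table can be collapsed into a small predicate for a generic retry wrapper. A sketch under the assumptions above -- the two "No" rows (password changes, multipart presign) must still be special-cased by the caller, and the helper name is illustrative:

```python
# Methods whose HCP operations are idempotent per the table above.
IDEMPOTENT_METHODS = {"GET", "HEAD", "PUT", "DELETE"}
TRANSIENT_STATUS = {408, 429, 500, 502, 503, 504}

def safe_to_retry(method: str, status_code: int) -> bool:
    """True when the request is idempotent AND the failure looks transient."""
    return method.upper() in IDEMPOTENT_METHODS and status_code in TRANSIENT_STATUS
```

POST is excluded by default: uploads to the same key happen to be safe, but a blanket rule would also retry password changes and presign calls, which have side effects.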
Safe "create-if-not-exists" pattern¶
import httpx

async def ensure_namespace(
    client: httpx.AsyncClient,
    tenant: str,
    name: str,
    hard_quota: str = "100 GB",
) -> dict:
    """Create a namespace, or return the existing one."""
    # Try to create
    resp = await client.put(
        f"/mapi/tenants/{tenant}/namespaces",
        json={"name": name, "hardQuota": hard_quota},
    )
    match resp.status_code:
        case 201 | 200:
            print(f"Created namespace '{name}'")
            return resp.json()
        case 409:
            # Already exists -- fetch and return it
            resp = await client.get(
                f"/mapi/tenants/{tenant}/namespaces/{name}",
                params={"verbose": True},
            )
            resp.raise_for_status()
            print(f"Namespace '{name}' already exists")
            return resp.json()
        case _:
            resp.raise_for_status()  # raises for unexpected errors
            return {}  # unreachable, satisfies type checker
# Bash equivalent -- create-if-not-exists
CODE=$(curl -s -o /dev/null -w "%{http_code}" -X PUT \
    "$BASE/mapi/tenants/$TENANT/namespaces" \
    -H "$AUTH" -H "Content-Type: application/json" \
    -d "{\"name\": \"$NS\", \"hardQuota\": \"100 GB\"}")

if [ "$CODE" = "201" ] || [ "$CODE" = "200" ]; then
    echo "Created namespace $NS"
elif [ "$CODE" = "409" ]; then
    echo "Namespace $NS already exists (OK)"
else
    echo "Failed to create namespace: HTTP $CODE" >&2
    exit 1
fi
Multipart upload with fault tolerance¶
SDK does this automatically
client.s3.upload() auto-selects multipart for files >= 100 MB, uploads parts in parallel, and aborts on failure. The code below is for raw httpx.
Presigned multipart upload can resume after partial failures. If some parts fail, re-upload only the missing ones:
graph TD
INIT["1. Initiate<br/><small>POST /multipart/presign</small>"] --> PARTS
subgraph PARTS["2. Upload parts (parallel)"]
direction LR
P1["Part 1 ✓"]
P2["Part 2 ✗→retry→✓"]
P3["Part N ✓"]
end
PARTS --> CHECK{"All parts<br/>succeeded?"}
CHECK -->|Yes| COMPLETE["3. Complete<br/><small>POST /multipart/complete</small>"]
CHECK -->|No| ABORT["3. Abort<br/><small>POST /multipart/abort</small>"]
COMPLETE --> DONE["Object visible"]
ABORT --> ERR["Raise error<br/><small>caller may retry</small>"]
style DONE fill:#d4edda,stroke:#28a745
style ERR fill:#f8d7da,stroke:#dc3545
style P2 fill:#fff3cd,stroke:#ffc107
import asyncio
import httpx
from pathlib import Path

BASE = "http://localhost:8000/api/v1"

async def resilient_multipart_upload(
    token: str,
    bucket: str,
    key: str,
    file_path: str,
    concurrency: int = 6,
    part_retries: int = 3,
):
    """Multipart upload that retries individual failed parts."""
    headers = {"Authorization": f"Bearer {token}"}
    path = Path(file_path)
    file_size = path.stat().st_size

    # 1. Initiate
    async with httpx.AsyncClient(base_url=BASE, headers=headers) as c:
        resp = await c.post(
            f"/buckets/{bucket}/multipart/{key}/presign",
            json={"file_size": file_size},
        )
        resp.raise_for_status()
        presign = resp.json()

    upload_id = presign["upload_id"]
    part_size = presign["part_size"]
    data = path.read_bytes()

    semaphore = asyncio.Semaphore(concurrency)
    completed: dict[int, dict] = {}
    failed: list[int] = []

    async def upload_part(part_info: dict) -> None:
        pn = part_info["part_number"]
        url = part_info["url"]
        start = (pn - 1) * part_size
        end = min(start + part_size, file_size)
        chunk = data[start:end]
        async with semaphore:
            for attempt in range(part_retries):
                try:
                    async with httpx.AsyncClient(timeout=120.0) as hcp:
                        resp = await hcp.put(url, content=chunk)
                        resp.raise_for_status()
                    completed[pn] = {
                        "PartNumber": pn,
                        "ETag": resp.headers["etag"],
                    }
                    print(f"  Part {pn}/{len(presign['urls'])} OK")
                    return
                except (httpx.HTTPError, httpx.StreamError):
                    delay = 2.0 ** attempt
                    print(f"  Part {pn} attempt {attempt + 1} failed, retry in {delay}s")
                    await asyncio.sleep(delay)
        failed.append(pn)

    # 2. Upload all parts with per-part retries
    await asyncio.gather(*[upload_part(u) for u in presign["urls"]])

    if failed:
        # Abort the upload -- caller can retry the whole thing
        async with httpx.AsyncClient(base_url=BASE, headers=headers) as c:
            await c.post(
                f"/buckets/{bucket}/multipart/{key}/abort",
                json={"upload_id": upload_id},
            )
        raise RuntimeError(f"Parts {failed} failed after {part_retries} retries each")

    # 3. Complete
    parts = sorted(completed.values(), key=lambda p: p["PartNumber"])
    async with httpx.AsyncClient(base_url=BASE, headers=headers) as c:
        resp = await c.post(
            f"/buckets/{bucket}/multipart/{key}/complete",
            json={"upload_id": upload_id, "parts": parts},
        )
        resp.raise_for_status()
        print("Upload complete:", resp.json())
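The per-part byte math and the "which parts still need uploading" bookkeeping in the function above can be factored into pure helpers, which also makes a resume-instead-of-abort variant straightforward to test. A sketch with illustrative helper names:

```python
def part_range(part_number: int, part_size: int, file_size: int) -> tuple[int, int]:
    """Byte range [start, end) for a 1-based part number, mirroring upload_part."""
    start = (part_number - 1) * part_size
    return start, min(start + part_size, file_size)

def missing_parts(total_parts: int, completed: dict[int, dict]) -> list[int]:
    """Part numbers not yet uploaded -- on resume, re-upload only these."""
    return [pn for pn in range(1, total_parts + 1) if pn not in completed]
```

To resume rather than abort, persist `upload_id` and the `completed` map, then request fresh presigned URLs only for the part numbers `missing_parts` returns.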
Consistency considerations¶
HCP follows S3 semantics for consistency:
| Operation | Consistency |
|---|---|
| `PUT` new object | Read-after-write -- immediately visible |
| `PUT` overwrite existing object | Eventually consistent -- stale reads possible briefly |
| `DELETE` object | Eventually consistent -- object may still appear in listings briefly |
| `LIST` objects | Eventually consistent -- recently added/deleted objects may not appear immediately |
| MAPI `PUT`/`POST` create/update | Strongly consistent -- change visible on next read |
Eventual consistency workaround
If your script writes an object and then immediately reads it back (e.g., to verify a checksum), add a brief delay or use a HEAD request with retries:
import asyncio
import httpx

async def wait_for_object(
    client: httpx.AsyncClient,
    bucket: str,
    key: str,
    max_wait: float = 10.0,
) -> bool:
    """Poll until an object is visible after upload."""
    elapsed = 0.0
    delay = 0.5
    while elapsed < max_wait:
        resp = await client.head(f"/buckets/{bucket}/objects/{key}")
        if resp.status_code == 200:
            return True
        await asyncio.sleep(delay)
        elapsed += delay
        delay = min(delay * 2, 2.0)
    return False
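The polling loop above doubles its delay up to a 2-second cap, so the total wait can slightly overshoot `max_wait`. Extracting the schedule into a pure function makes that behavior easy to inspect (the `poll_schedule` name is illustrative):

```python
def poll_schedule(max_wait: float = 10.0, first: float = 0.5, cap: float = 2.0) -> list[float]:
    """The sleep durations wait_for_object steps through: 0.5, 1.0, 2.0, 2.0, ..."""
    delays: list[float] = []
    elapsed, delay = 0.0, first
    while elapsed < max_wait:
        delays.append(delay)
        elapsed += delay
        delay = min(delay * 2, cap)
    return delays
```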
Argo-native retries and timeouts¶
Argo Workflows has built-in retry and timeout support at the template level. Use these in addition to application-level retries -- Argo retries restart the entire pod, which handles OOM kills, node evictions, and infrastructure failures that application code cannot catch.
templates:
  - name: process-data
    retryStrategy:
      limit: 3              # max 3 retries (4 total attempts)
      retryPolicy: Always   # retry on both errors and node failures
      backoff:
        duration: "10s"     # initial delay
        factor: 2           # exponential: 10s, 20s, 40s
        maxDuration: "2m"   # cap at 2 minutes
    activeDeadlineSeconds: 600  # hard timeout: kill pod after 10 min
    container:
      image: python:3.13-slim
      command: [python, -c]
      args:
        - |
          # your processing code here
          print("processing...")
from hera.workflows import models as m, script

@script(
    image="python:3.13-slim",
    retry_strategy=m.RetryStrategy(
        limit="3",
        retry_policy="Always",
        backoff=m.Backoff(duration="10s", factor=2, max_duration="2m"),
    ),
    active_deadline_seconds=600,
)
def process_data():
    """Processing with Argo-managed retries and timeout."""
    print("processing...")
Cleanup on failure -- exit handlers¶
Use Argo exit handlers to guarantee cleanup runs regardless of workflow success or failure. This is essential for aborting in-progress multipart uploads, deleting partial results, or releasing resources.
graph TD
WF["Workflow<br/><small>upload-pipeline</small>"]
WF -->|runs| UPLOAD["do-upload"]
UPLOAD --> RESULT{"Status?"}
RESULT -->|Succeeded| EXIT_OK["cleanup<br/><small>no-op</small>"]
RESULT -->|Failed / Error| EXIT_FAIL["cleanup<br/><small>delete partial/</small>"]
EXIT_FAIL -.->|bulk DELETE| S3[("HCP S3<br/>partial/ prefix")]
style EXIT_OK fill:#d4edda,stroke:#28a745
style EXIT_FAIL fill:#fff3cd,stroke:#ffc107
linkStyle 3 stroke:#dc3545
apiVersion: argoproj.io/v1alpha1
kind: Workflow
metadata:
  generateName: hcp-safe-upload-
spec:
  entrypoint: upload-pipeline
  onExit: cleanup  # always runs, even on failure
  arguments:
    parameters:
      - name: hcp-api-base
        value: "http://hcp-api.default.svc:8000/api/v1"
      - name: hcp-token
        value: "<your-token>"
      - name: bucket
        value: "datasets"
  templates:
    - name: upload-pipeline
      steps:
        - - name: upload
            template: do-upload

    - name: do-upload
      retryStrategy:
        limit: 2
      script:
        image: python:3.13-slim
        command: [python]
        source: |
          # ... upload logic ...
          print("uploading data")

    # ── Guaranteed cleanup ──────────────────────────────────────
    - name: cleanup
      script:
        image: curlimages/curl:latest
        command: [sh]
        source: |
          BASE="{{workflow.parameters.hcp-api-base}}"
          TOKEN="{{workflow.parameters.hcp-token}}"
          BUCKET="{{workflow.parameters.bucket}}"
          STATUS="{{workflow.status}}"  # Succeeded / Failed / Error
          if [ "$STATUS" != "Succeeded" ]; then
            echo "Workflow $STATUS -- cleaning up partial uploads..."
            # Abort any in-progress multipart uploads
            # Delete partial results to avoid polluting the bucket
            curl -s -X POST "$BASE/buckets/$BUCKET/objects/delete" \
              -H "Authorization: Bearer $TOKEN" \
              -H "Content-Type: application/json" \
              -d "{\"keys\": [\"partial/{{workflow.name}}/\"]}" || true
            echo "Cleanup complete"
          else
            echo "Workflow succeeded -- no cleanup needed"
          fi
from hera.workflows import Parameter, Steps, Workflow, models as m, script

HCP_BASE = "http://hcp-api.default.svc:8000/api/v1"

@script(image="python:3.13-slim", retry_strategy=m.RetryStrategy(limit="2"))
def do_upload(hcp_api_base: str, hcp_token: str, bucket: str):
    """Upload data to HCP."""
    print("uploading data")

@script(image="curlimages/curl:latest")
def cleanup(hcp_api_base: str, hcp_token: str, bucket: str):
    """Clean up partial uploads on failure. Runs as onExit handler."""
    import os
    import subprocess

    # NOTE: Argo does not set ARGO_WORKFLOW_STATUS automatically -- inject it as
    # an env var from {{workflow.status}}, or pass the status as a parameter.
    status = os.environ.get("ARGO_WORKFLOW_STATUS", "Unknown")
    if status != "Succeeded":
        print(f"Workflow {status} -- cleaning up...")
        subprocess.run(
            [
                "curl", "-s", "-X", "POST",
                f"{hcp_api_base}/buckets/{bucket}/objects/delete",
                "-H", f"Authorization: Bearer {hcp_token}",
                "-H", "Content-Type: application/json",
                "-d", '{"keys": ["partial/"]}',
            ],
            check=False,  # don't fail cleanup on errors
        )
        print("Cleanup complete")
    else:
        print("Workflow succeeded -- no cleanup needed")

with Workflow(
    generate_name="hcp-safe-upload-",
    entrypoint="upload-pipeline",
    on_exit="cleanup",
    arguments=[
        Parameter(name="hcp-api-base", value=HCP_BASE),
        Parameter(name="hcp-token", value="<your-token>"),
        Parameter(name="bucket", value="datasets"),
    ],
) as w:
    with Steps(name="upload-pipeline"):
        do_upload(
            name="upload",
            arguments={
                "hcp_api_base": "{{workflow.parameters.hcp-api-base}}",
                "hcp_token": "{{workflow.parameters.hcp-token}}",
                "bucket": "{{workflow.parameters.bucket}}",
            },
        )
    # Exit handler template (registered by name via on_exit="cleanup")
    cleanup(
        name="cleanup",
        arguments={
            "hcp_api_base": "{{workflow.parameters.hcp-api-base}}",
            "hcp_token": "{{workflow.parameters.hcp-token}}",
            "bucket": "{{workflow.parameters.bucket}}",
        },
    )

w.create()
ACID-like guarantees for workflows¶
Object storage is not a relational database -- there are no transactions. But you can approximate ACID properties through careful workflow design:
graph TD
subgraph Atomicity
A1["onExit handler<br/><small>cleanup partial state</small>"]
A2["Staged-commit<br/><small>staging/ → final/</small>"]
end
subgraph Consistency
C1["HEAD polling<br/><small>wait_for_object</small>"]
C2["Unique keys<br/><small>per workflow run</small>"]
end
subgraph Isolation
I1["Namespace outputs<br/><small>results/workflow-id/</small>"]
end
subgraph Durability
D1["Verify uploads<br/><small>HEAD + ETag check</small>"]
D2["Versioned namespaces<br/><small>for critical data</small>"]
end
| Property | S3/HCP reality | How to achieve it |
|---|---|---|
| Atomicity | Individual PUTs are atomic, but a multi-object "transaction" is not. | Use exit handlers (onExit) to clean up partial state on failure. Write to a staging prefix first, then "commit" by copying to the final location. |
| Consistency | New PUTs are read-after-write consistent. Overwrites and deletes are eventually consistent. | Use HEAD polling after writes (wait_for_object above). Use unique keys per workflow run ({{workflow.name}}) to avoid overwrites. |
| Isolation | No locking — concurrent writers to the same key cause last-write-wins. | Namespace outputs by workflow run ID: results/{{workflow.name}}/output.json. Never write to a shared key from parallel pods. |
| Durability | S3 objects are durable once the PUT succeeds (HCP replicates across nodes). | Verify uploads with a HEAD request checking Content-Length and ETag. For critical data, use versioning-enabled namespaces. |
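For the durability row, the HEAD verification can be written as a pure check. Note that the ETag-equals-MD5 rule holds only for single-PUT objects; multipart uploads produce a differently formatted ETag. A sketch with illustrative function names:

```python
import hashlib

def expected_etag(data: bytes) -> str:
    """For a single (non-multipart) PUT, the S3 ETag is the MD5 hex digest of the body."""
    return hashlib.md5(data).hexdigest()

def verify_upload(data: bytes, content_length: int, etag_header: str) -> bool:
    """Durability check: compare local size and MD5 against HEAD response metadata."""
    return (
        content_length == len(data)
        and etag_header.strip('"') == expected_etag(data)  # S3 returns ETags quoted
    )
```

Feed it the `Content-Length` and `ETag` headers from a HEAD request after upload; a `False` result means the object should be re-uploaded.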
Staged-commit pattern¶
Write results to a temporary prefix, verify them, then "commit" by copying to the final location. On failure, the exit handler deletes the staging prefix.
Use the rahcp SDK
The SDK handles the staged-commit pattern in two method calls:
async with HCPClient.from_env() as client:
    # Write files to staging/...
    for f in files:
        await client.s3.upload("bucket", f"staging/batch-1/{f.name}", f)

    # Commit: copy staging → final, delete staging
    count = await client.s3.commit_staging("bucket", "staging/batch-1/", "final/batch-1/")

    # On failure: await client.s3.cleanup_staging("bucket", "staging/batch-1/")
graph LR
subgraph S3["HCP S3"]
STG[("staging/<br/>workflow-id/")]
FINAL[("final/<br/>key")]
end
W["1. Write"] -->|PUT| STG
STG --> V["2. Verify<br/><small>HEAD check size</small>"]
V -->|match| C["3. Commit<br/><small>COPY → final</small>"]
C -->|copy| FINAL
C -->|delete staging| CL["4. Cleanup<br/><small>DELETE staging</small>"]
V -->|mismatch| DEL["DELETE staging<br/><small>+ raise error</small>"]
style FINAL fill:#d4edda,stroke:#28a745
style DEL fill:#f8d7da,stroke:#dc3545
import httpx

BASE = "http://localhost:8000/api/v1"

async def staged_upload(
    token: str,
    bucket: str,
    final_key: str,
    data: bytes,
    workflow_id: str,
):
    """Upload to a staging key, verify, then copy to the final location."""
    headers = {"Authorization": f"Bearer {token}"}
    staging_key = f"staging/{workflow_id}/{final_key}"

    async with httpx.AsyncClient(base_url=BASE, headers=headers, timeout=60.0) as c:
        # 1. Write to staging
        resp = await c.put(
            f"/buckets/{bucket}/objects/{staging_key}",
            content=data,
        )
        resp.raise_for_status()

        # 2. Verify -- HEAD to confirm size
        resp = await c.head(f"/buckets/{bucket}/objects/{staging_key}")
        resp.raise_for_status()
        actual_size = int(resp.headers.get("content-length", 0))
        if actual_size != len(data):
            # Mismatch -- delete staging and raise
            await c.delete(f"/buckets/{bucket}/objects/{staging_key}")
            raise RuntimeError(
                f"Size mismatch: expected {len(data)}, got {actual_size}"
            )

        # 3. Commit -- copy from staging to final
        resp = await c.post(
            f"/buckets/{bucket}/objects/{final_key}/copy",
            json={"source_bucket": bucket, "source_key": staging_key},
        )
        resp.raise_for_status()

        # 4. Clean up staging
        await c.delete(f"/buckets/{bucket}/objects/{staging_key}")
        print(f"Committed {final_key} ({actual_size} bytes)")
# Bash equivalent -- staged commit
STAGING_KEY="staging/$(uuidgen)/$FINAL_KEY"

# 1. Write to staging
curl -s -X PUT "$BASE/buckets/$BUCKET/objects/$STAGING_KEY" \
    -H "$AUTH" --data-binary @"$LOCAL_FILE"

# 2. Verify
SIZE=$(curl -s -I "$BASE/buckets/$BUCKET/objects/$STAGING_KEY" \
    -H "$AUTH" | grep -i content-length | awk '{print $2}' | tr -d '\r')
EXPECTED=$(stat -c%s "$LOCAL_FILE" 2>/dev/null || stat -f%z "$LOCAL_FILE")  # GNU stat, then BSD fallback
if [ "$SIZE" != "$EXPECTED" ]; then
    echo "ERROR: Size mismatch ($SIZE != $EXPECTED)" >&2
    curl -s -X DELETE "$BASE/buckets/$BUCKET/objects/$STAGING_KEY" -H "$AUTH"
    exit 1
fi

# 3. Commit -- copy to final location
curl -s -X POST "$BASE/buckets/$BUCKET/objects/$FINAL_KEY/copy" \
    -H "$AUTH" -H "Content-Type: application/json" \
    -d "{\"source_bucket\": \"$BUCKET\", \"source_key\": \"$STAGING_KEY\"}"

# 4. Clean up staging
curl -s -X DELETE "$BASE/buckets/$BUCKET/objects/$STAGING_KEY" -H "$AUTH"
echo "Committed $FINAL_KEY"
from pathlib import Path

from rahcp_client import HCPClient

async def staged_upload_sdk(
    bucket: str,
    local_dir: Path,
    staging_prefix: str,
    final_prefix: str,
):
    """Upload files to staging, then commit to final -- or clean up on failure."""
    async with HCPClient.from_env() as client:
        try:
            # 1. Write all files to staging
            for f in local_dir.rglob("*"):
                if f.is_file():
                    key = f"{staging_prefix}{f.relative_to(local_dir)}"
                    await client.s3.upload(bucket, key, f)

            # 2. Commit: copy staging → final, delete staging
            count = await client.s3.commit_staging(bucket, staging_prefix, final_prefix)
            print(f"Committed {count} files to {final_prefix}")
        except Exception:
            # 3. Rollback: delete staging prefix
            deleted = await client.s3.cleanup_staging(bucket, staging_prefix)
            print(f"Rolled back {deleted} staged files")
            raise
Multipart uploads are not atomic
A multipart upload is only visible after CompleteMultipartUpload. If the workflow fails between uploading parts and completing, the parts remain as invisible orphans consuming storage. Always use exit handlers to call AbortMultipartUpload on failure, and consider enabling artifactGC on the workflow to clean up Argo-managed artifacts.
Related pages¶
- Python SDK -- `rahcp-client` with built-in retries, presigned URLs, staged-commit, and multipart uploads.
- API Workflows -- curl and Python examples for authentication, S3 operations, tenant/namespace management, and more.
- Argo Workflows -- ETL pipelines, presigned URL workflows, and batch fan-out/fan-in with YAML and Hera.