Skip to content

SDK API Reference

Auto-generated from source code docstrings.

rahcp-client

HCPClient

HCPClient(endpoint='http://localhost:8000/api/v1', username='', password='', tenant=None, *, timeout=30.0, max_retries=4, retry_base_delay=1.0, multipart_threshold=100 * 1024 * 1024, multipart_chunk=64 * 1024 * 1024, multipart_concurrency=6, verify_ssl=True)

Async HTTP client for the HCP Unified API.

Usage::

async with HCPClient.from_env() as hcp:
    objects = await hcp.s3.list_objects("bucket", "prefix/")
    ns = await hcp.mapi.list_namespaces("tenant")

Initialize the HCP client.

Parameters:

Name Type Description Default
endpoint str

Base URL for the HCP Unified API.

'http://localhost:8000/api/v1'
username str

Username for authentication.

''
password str

Password for authentication.

''
tenant str | None

HCP tenant name (routes requests to the correct tenant).

None
timeout float

HTTP request timeout in seconds.

30.0
max_retries int

Maximum number of retry attempts for transient errors.

4
retry_base_delay float

Base delay between retries in seconds (doubles each attempt).

1.0
multipart_threshold int

File size in bytes above which multipart upload is used.

100 * 1024 * 1024
multipart_chunk int

Part size in bytes for multipart uploads.

64 * 1024 * 1024
multipart_concurrency int

Maximum number of parts uploaded in parallel during a multipart upload.

6
verify_ssl bool

Whether to verify TLS certificates.

True
Source code in packages/rahcp-client/src/rahcp_client/client.py
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
def __init__(
    self,
    endpoint: str = "http://localhost:8000/api/v1",
    username: str = "",
    password: str = "",
    tenant: str | None = None,
    *,
    timeout: float = 30.0,
    max_retries: int = 4,
    retry_base_delay: float = 1.0,
    multipart_threshold: int = 100 * 1024 * 1024,
    multipart_chunk: int = 64 * 1024 * 1024,
    multipart_concurrency: int = 6,
    verify_ssl: bool = True,
) -> None:
    """Initialize the HCP client.

    Args:
        endpoint: Base URL for the HCP Unified API.
        username: Username for authentication.
        password: Password for authentication.
        tenant: HCP tenant name (routes requests to the correct tenant).
        timeout: HTTP request timeout in seconds.
        max_retries: Maximum number of retry attempts for transient errors.
        retry_base_delay: Base delay between retries in seconds (doubles each attempt).
        multipart_threshold: File size in bytes above which multipart upload is used.
        multipart_chunk: Part size in bytes for multipart uploads.
        multipart_concurrency: Maximum number of parts uploaded in parallel.
        verify_ssl: Whether to verify TLS certificates.
    """
    # Trailing slash is stripped so path joins never produce "//".
    self.endpoint = endpoint.rstrip("/")
    self.username = username
    self.password = password
    self.tenant = tenant
    self.timeout = timeout
    self.max_retries = max_retries
    self.retry_base_delay = retry_base_delay
    self.multipart_threshold = multipart_threshold
    self.multipart_chunk = multipart_chunk
    self.multipart_concurrency = multipart_concurrency
    self.verify_ssl = verify_ssl

    # Single shared async HTTP client; all API calls go through it.
    self._http = httpx.AsyncClient(
        base_url=self.endpoint,
        timeout=timeout,
        verify=verify_ssl,
    )
    # Auth token and operation facades are created lazily.
    self._token: str | None = None
    self._s3: S3Ops | None = None
    self._mapi: MapiOps | None = None

token property

The current bearer token (set after login).

s3 property

S3 data-plane operations.

mapi property

MAPI management-plane operations.

transfer_settings property

Settings needed by the bulk transfer engine.

from_env() classmethod

Create a client configured from HCP_* environment variables.

Source code in packages/rahcp-client/src/rahcp_client/client.py
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
@classmethod
def from_env(cls) -> HCPClient:
    """Build a client whose configuration comes from ``HCP_*`` env vars."""
    cfg = HCPSettings()
    return cls(
        endpoint=cfg.endpoint,
        username=cfg.username,
        password=cfg.password,
        tenant=cfg.tenant,
        timeout=cfg.timeout,
        max_retries=cfg.max_retries,
        retry_base_delay=cfg.retry_base_delay,
        multipart_threshold=cfg.multipart_threshold,
        multipart_chunk=cfg.multipart_chunk,
        multipart_concurrency=cfg.multipart_concurrency,
        verify_ssl=cfg.verify_ssl,
    )

request(method, path, *, params=None, json=None, data=None, content=None, headers=None) async

Send an HTTP request with retry, tracing, and automatic 401 refresh.

Source code in packages/rahcp-client/src/rahcp_client/client.py
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
async def request(
    self,
    method: str,
    path: str,
    *,
    params: dict[str, Any] | None = None,
    json: Any = None,
    data: Any = None,
    content: bytes | None = None,
    headers: dict[str, str] | None = None,
) -> httpx.Response:
    """Send an HTTP request with retry, tracing, and automatic 401 refresh.

    Args:
        method: HTTP method ("GET", "POST", ...).
        path: Request path relative to the client's base URL.
        params: Optional query parameters.
        json: Optional JSON body.
        data: Optional form/body data.
        content: Optional raw byte content.
        headers: Extra headers merged over the auth headers.

    Returns:
        The successful ``httpx.Response`` (status < 400).

    Raises:
        HCPError: Mapped from the response status for non-retryable 4xx/5xx.
        RetryableError: When all retry attempts are exhausted.
    """
    # Caller-supplied headers win over the auth headers on key collision.
    merged_headers = {**self._auth_headers(), **(headers or {})}
    refreshed = False

    with tracer.start_as_current_span(f"{method} {path}") as span:
        span.set_attribute("http.method", method)
        span.set_attribute("http.path", path)

        # Exponential backoff with jitter; only RetryableError re-enters the loop.
        async for attempt in AsyncRetrying(
            stop=stop_after_attempt(self.max_retries + 1),
            wait=wait_exponential_jitter(
                initial=self.retry_base_delay,
                max=60,
                jitter=self.retry_base_delay,
            ),
            retry=retry_if_exception_type(RetryableError),
            reraise=True,
        ):
            with attempt:
                t0 = time.monotonic()
                try:
                    response = await self._http.request(
                        method,
                        path,
                        params=params,
                        json=json,
                        data=data,
                        content=content,
                        headers=merged_headers,
                    )
                except httpx.TransportError as exc:
                    # Network-level failure: log and convert to a retryable error.
                    duration = (time.monotonic() - t0) * 1000
                    log.warning(
                        "%s %s — transport error after %.0fms: %s",
                        method,
                        path,
                        duration,
                        exc,
                    )
                    raise RetryableError(str(exc)) from exc

                duration = (time.monotonic() - t0) * 1000
                span.set_attribute("http.status_code", response.status_code)

                # BUGFIX: previous format "%s %s%d" glued the status code onto
                # the path ("GET /buckets200"); add an explicit separator.
                log.debug(
                    "%s %s -> %d (%.0fms)",
                    method,
                    path,
                    response.status_code,
                    duration,
                )

                # One-time token refresh on 401
                if response.status_code == 401 and self.username and not refreshed:
                    log.info("Token expired, refreshing...")
                    await self._login()
                    merged_headers = {**self._auth_headers(), **(headers or {})}
                    refreshed = True
                    # Re-enter the retry loop with the fresh token.
                    raise RetryableError("token refresh")

                if response.status_code in _RETRYABLE_STATUSES:
                    raise error_for_status(
                        response.status_code, _redact(response.text)
                    )

                if response.status_code >= 400:
                    # Non-retryable client/server error: log (quietly for 404)
                    # with secrets redacted, then raise the typed error.
                    safe_body = _redact(response.text)
                    log_fn = log.debug if response.status_code == 404 else log.error
                    log_fn(
                        "%s %s -> %d (%.0fms): %s",
                        method,
                        path,
                        response.status_code,
                        duration,
                        safe_body,
                    )
                    raise error_for_status(response.status_code, safe_body)

                return response

    raise RetryableError("All retries exhausted")  # pragma: no cover

S3Ops

S3Ops(client)

S3 data-plane operations — presigned-first, multipart for large files.

All data transfer prefers presigned URLs. Files above client.multipart_threshold automatically use multipart upload.

Path mapping to the backend API (all under /api/v1): GET /buckets — list buckets GET /buckets/{b}/objects — list objects POST /presign — single presigned URL POST /buckets/{b}/objects/presign — bulk presigned URLs POST /buckets/{b}/multipart/{key} — initiate multipart POST /buckets/{b}/multipart/{key}/presign — presign parts POST /buckets/{b}/multipart/{key}/complete — complete multipart POST /buckets/{b}/multipart/{key}/abort — abort multipart DELETE /buckets/{b}/objects/{key} — delete object POST /buckets/{b}/objects/delete — bulk delete POST /buckets/{b}/objects/{key}/copy — copy object HEAD /buckets/{b}/objects/{key} — head object

Source code in packages/rahcp-client/src/rahcp_client/s3.py
51
52
def __init__(self, client: HCPClient) -> None:
    # All S3 operations are issued through this shared HCPClient.
    self._client = client

presign_get(bucket, key, *, expires=3600) async

Get a presigned download URL.

Source code in packages/rahcp-client/src/rahcp_client/s3.py
62
63
64
65
66
67
68
69
70
71
72
73
74
async def presign_get(self, bucket: str, key: str, *, expires: int = 3600) -> str:
    """Request a presigned download URL for an object.

    Args:
        bucket: Bucket containing the object.
        key: Object key to presign.
        expires: Validity window of the URL in seconds.

    Returns:
        A presigned GET URL.
    """
    payload = {
        "bucket": bucket,
        "key": key,
        "method": "get_object",
        "expires_in": expires,
    }
    response = await self._client.request("POST", "/presign", json=payload)
    return response.json()["url"]

presign_put(bucket, key, *, expires=3600) async

Get a presigned upload URL.

Source code in packages/rahcp-client/src/rahcp_client/s3.py
76
77
78
79
80
81
82
83
84
85
86
87
88
async def presign_put(self, bucket: str, key: str, *, expires: int = 3600) -> str:
    """Request a presigned upload URL for an object.

    Args:
        bucket: Destination bucket.
        key: Destination object key.
        expires: Validity window of the URL in seconds.

    Returns:
        A presigned PUT URL.
    """
    payload = {
        "bucket": bucket,
        "key": key,
        "method": "put_object",
        "expires_in": expires,
    }
    response = await self._client.request("POST", "/presign", json=payload)
    return response.json()["url"]

presign_bulk(bucket, keys, *, method='get_object', expires=3600) async

Presign multiple keys in one API call. Returns {key: url} mapping.

Source code in packages/rahcp-client/src/rahcp_client/s3.py
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
async def presign_bulk(
    self,
    bucket: str,
    keys: list[str],
    *,
    method: str = "get_object",
    expires: int = 3600,
) -> dict[str, str]:
    """Presign many keys with a single API round-trip.

    Args:
        bucket: Bucket containing the keys.
        keys: Object keys to presign.
        method: S3 operation to presign for ("get_object" or "put_object").
        expires: Validity window of each URL in seconds.

    Returns:
        Mapping of key -> presigned URL.
    """
    body = {"keys": keys, "method": method, "expires_in": expires}
    response = await self._client.request(
        "POST", f"/buckets/{bucket}/objects/presign", json=body
    )
    url_map: dict[str, str] = {}
    for entry in response.json()["urls"]:
        url_map[entry["key"]] = entry["url"]
    return url_map

upload(bucket, key, data) async

Upload data — auto-selects presigned PUT or multipart based on size.

Returns the ETag of the uploaded object.

Source code in packages/rahcp-client/src/rahcp_client/s3.py
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
async def upload(self, bucket: str, key: str, data: bytes | Path) -> str:
    """Upload data, choosing presigned PUT or multipart by size.

    Files at or above the client's ``multipart_threshold`` take the
    multipart path; everything else goes through a single presigned PUT.

    Returns the ETag of the uploaded object ("" if the server omits it).
    """
    with tracer.start_as_current_span("s3.upload") as span:
        span.set_attribute("s3.bucket", bucket)
        span.set_attribute("s3.key", key)

        if not isinstance(data, Path):
            # In-memory payload: always a single presigned PUT.
            content = data
            span.set_attribute("s3.size", len(content))
        else:
            # File payload: stat off the event loop, then pick the path.
            size = await asyncio.to_thread(lambda: data.stat().st_size)
            span.set_attribute("s3.size", size)
            if size >= self._client.multipart_threshold:
                return await self.upload_multipart(bucket, key, data)
            content = await asyncio.to_thread(data.read_bytes)

        url = await self.presign_put(bucket, key)
        async with self._make_http_client() as http:
            resp = await http.put(url, content=content)
            _raise_for_presigned(resp, bucket, key)
        log.debug("Uploaded %s/%s (%s bytes)", bucket, key, len(content))
        return resp.headers.get("etag", "")

download(bucket, key, dest) async

Download an object via presigned GET. Returns byte count.

Source code in packages/rahcp-client/src/rahcp_client/s3.py
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
async def download(self, bucket: str, key: str, dest: Path) -> int:
    """Stream an object to *dest* via a presigned GET.

    File I/O runs in worker threads so the event loop is never blocked.
    Returns the number of bytes written.
    """
    with tracer.start_as_current_span("s3.download") as span:
        span.set_attribute("s3.bucket", bucket)
        span.set_attribute("s3.key", key)
        url = await self.presign_get(bucket, key)
        written = 0
        async with self._make_http_client() as http:
            async with http.stream("GET", url) as resp:
                _raise_for_presigned(resp, bucket, key)
                fh = await asyncio.to_thread(dest.open, "wb")
                try:
                    async for piece in resp.aiter_bytes(chunk_size=8192):
                        await asyncio.to_thread(fh.write, piece)
                        written += len(piece)
                finally:
                    await asyncio.to_thread(fh.close)
        span.set_attribute("s3.bytes", written)
        log.debug("Downloaded %s/%s (%d bytes)", bucket, key, written)
        return written

download_bytes(bucket, key) async

Download an object as bytes via presigned GET.

Source code in packages/rahcp-client/src/rahcp_client/s3.py
159
160
161
162
163
164
165
async def download_bytes(self, bucket: str, key: str) -> bytes:
    """Fetch an entire object into memory via a presigned GET."""
    url = await self.presign_get(bucket, key)
    async with self._make_http_client() as http:
        response = await http.get(url)
        _raise_for_presigned(response, bucket, key)
    return response.content

upload_multipart(bucket, key, path, *, concurrency=None) async

Presigned multipart upload for large files.

  1. Initiate multipart → upload_id
  2. Presign each part
  3. Upload parts in parallel (bounded semaphore)
  4. Complete multipart (or abort on failure)
Source code in packages/rahcp-client/src/rahcp_client/s3.py
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
async def upload_multipart(
    self,
    bucket: str,
    key: str,
    path: Path,
    *,
    concurrency: int | None = None,
) -> str:
    """Presigned multipart upload for large files.

    1. Initiate multipart → upload_id
    2. Presign each part
    3. Upload parts in parallel (bounded semaphore)
    4. Complete multipart (or abort on failure)

    Args:
        bucket: Destination bucket.
        key: Destination object key.
        path: Local file to upload.
        concurrency: Max parts in flight; defaults to the client's
            ``multipart_concurrency``.

    Returns:
        The ETag from the complete-multipart response ("" if absent).
    """
    concurrency = concurrency or self._client.multipart_concurrency
    file_size = await asyncio.to_thread(lambda: path.stat().st_size)
    default_part_size = self._client.multipart_chunk

    resp = await self._client.request(
        "POST",
        f"/buckets/{bucket}/multipart/{key}",
    )
    upload_id = resp.json()["upload_id"]

    try:
        resp = await self._client.request(
            "POST",
            f"/buckets/{bucket}/multipart/{key}/presign",
            json={
                "upload_id": upload_id,
                "file_size": file_size,
                "part_size": default_part_size,
            },
        )
        data = resp.json()
        part_urls = [p["url"] for p in data["urls"]]
        # The server may adjust the part size; honor what it returns.
        part_size = data.get("part_size", default_part_size)

        sem = asyncio.Semaphore(concurrency)

        # Parts can be large, so allow far more than the default timeout.
        part_timeout = httpx.Timeout(
            max(300.0, self._client.timeout * 5), connect=30.0
        )

        async def _upload_part(part_num: int, url: str) -> dict[str, Any]:
            offset = part_num * part_size
            read_size = min(part_size, file_size - offset)

            def _read_chunk() -> bytes:
                with path.open("rb") as f:
                    f.seek(offset)
                    return f.read(read_size)

            async with sem:
                # BUGFIX: read the chunk *inside* the semaphore. Previously
                # every part was read before the semaphore bounded the
                # uploads, so asyncio.gather loaded the whole file into
                # memory at once; now at most `concurrency` chunks are
                # resident at any time.
                part_data = await asyncio.to_thread(_read_chunk)
                async with httpx.AsyncClient(
                    verify=self._client.verify_ssl, timeout=part_timeout
                ) as http:
                    log.debug(
                        "Uploading part %d/%d (%d bytes) for %s/%s",
                        part_num + 1,
                        len(part_urls),
                        len(part_data),
                        bucket,
                        key,
                    )
                    resp = await http.put(url, content=part_data)
                    _raise_for_presigned(
                        resp, bucket, f"{key} (part {part_num + 1})"
                    )
                    log.debug(
                        "Part %d/%d uploaded (etag: %s)",
                        part_num + 1,
                        len(part_urls),
                        resp.headers.get("etag", "?"),
                    )
                    return {"part_number": part_num + 1, "etag": resp.headers["etag"]}

        parts = await asyncio.gather(
            *[_upload_part(i, url) for i, url in enumerate(part_urls)]
        )

        # Complete requires parts in ascending part-number order.
        parts_sorted = sorted(parts, key=lambda p: p["part_number"])
        resp = await self._client.request(
            "POST",
            f"/buckets/{bucket}/multipart/{key}/complete",
            json={
                "upload_id": upload_id,
                "parts": [
                    {"ETag": p["etag"], "PartNumber": p["part_number"]}
                    for p in parts_sorted
                ],
            },
        )
        log.info(
            "Multipart upload complete: %s/%s (%d parts)",
            bucket,
            key,
            len(parts_sorted),
        )
        return resp.json().get("etag", "")

    except Exception:
        # Abort server-side state so orphaned parts don't accrue storage;
        # abort failure is logged but never masks the original error.
        log.warning("Multipart upload failed, aborting: %s/%s", bucket, key)
        try:
            await self._client.request(
                "POST",
                f"/buckets/{bucket}/multipart/{key}/abort",
                json={"upload_id": upload_id},
            )
        except Exception:
            log.warning("Failed to abort multipart upload: %s", upload_id)
        raise

list_buckets() async

List all S3 buckets.

Source code in packages/rahcp-client/src/rahcp_client/s3.py
286
287
288
289
async def list_buckets(self) -> dict[str, Any]:
    """Return the backend's listing of all S3 buckets."""
    response = await self._client.request("GET", "/buckets")
    return response.json()

list_objects(bucket, prefix='', *, max_keys=1000, continuation_token=None, delimiter=None) async

List objects in a bucket with optional prefix and pagination.

Source code in packages/rahcp-client/src/rahcp_client/s3.py
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
async def list_objects(
    self,
    bucket: str,
    prefix: str = "",
    *,
    max_keys: int = 1000,
    continuation_token: str | None = None,
    delimiter: str | None = None,
) -> dict[str, Any]:
    """List objects in a bucket with optional prefix and pagination.

    Args:
        bucket: Bucket to list.
        prefix: Only return keys starting with this prefix.
        max_keys: Page size.
        continuation_token: Token from a previous page, if paginating.
        delimiter: Optional grouping delimiter (e.g. "/").
    """
    query: dict[str, Any] = {"prefix": prefix, "max_keys": max_keys}
    # Optional parameters are sent only when supplied.
    if continuation_token:
        query["continuation_token"] = continuation_token
    if delimiter:
        query["delimiter"] = delimiter
    response = await self._client.request(
        "GET", f"/buckets/{bucket}/objects", params=query
    )
    return response.json()

delete(bucket, key) async

Delete a single object.

Source code in packages/rahcp-client/src/rahcp_client/s3.py
330
331
332
async def delete(self, bucket: str, key: str) -> None:
    """Delete a single object."""
    # NOTE(review): key is interpolated into the URL path as-is — confirm
    # the backend handles keys containing slashes or special characters.
    await self._client.request("DELETE", f"/buckets/{bucket}/objects/{key}")

delete_bulk(bucket, keys) async

Delete multiple objects.

Source code in packages/rahcp-client/src/rahcp_client/s3.py
334
335
336
337
338
339
340
341
async def delete_bulk(self, bucket: str, keys: list[str]) -> dict[str, Any]:
    """Delete several objects in one API call and return the server result."""
    response = await self._client.request(
        "POST", f"/buckets/{bucket}/objects/delete", json={"keys": keys}
    )
    return response.json()

copy(bucket, key, source_bucket, source_key) async

Copy an object.

Source code in packages/rahcp-client/src/rahcp_client/s3.py
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
async def copy(
    self,
    bucket: str,
    key: str,
    source_bucket: str,
    source_key: str,
) -> None:
    """Server-side copy of an object into *bucket*/*key*.

    Args:
        bucket: Destination bucket.
        key: Destination key.
        source_bucket: Bucket holding the source object.
        source_key: Key of the source object.
    """
    body = {"source_bucket": source_bucket, "source_key": source_key}
    await self._client.request(
        "POST", f"/buckets/{bucket}/objects/{key}/copy", json=body
    )

head(bucket, key) async

Get object metadata (content-length, content-type, etag, last-modified).

Source code in packages/rahcp-client/src/rahcp_client/s3.py
360
361
362
363
async def head(self, bucket: str, key: str) -> dict[str, Any]:
    """Get object metadata (content-length, content-type, etag, last-modified)."""
    response = await self._client.request(
        "HEAD", f"/buckets/{bucket}/objects/{key}"
    )
    # Metadata is carried entirely in the response headers.
    return dict(response.headers)

commit_staging(bucket, staging_prefix, dest_prefix) async

Move objects from staging prefix to destination. Returns count.

Source code in packages/rahcp-client/src/rahcp_client/s3.py
367
368
369
370
371
372
373
374
375
376
377
378
async def commit_staging(
    self, bucket: str, staging_prefix: str, dest_prefix: str
) -> int:
    """Move every object under the staging prefix to the destination prefix.

    Each object is copied then deleted (no atomic rename in S3).
    Returns the number of objects moved.
    """
    moved = 0
    async for entry in self._paginate_objects(bucket, staging_prefix):
        key = entry["Key"]
        # Swap only the leading occurrence of the staging prefix.
        target = key.replace(staging_prefix, dest_prefix, 1)
        await self.copy(bucket, target, bucket, key)
        await self.delete(bucket, key)
        moved += 1
    return moved

cleanup_staging(bucket, staging_prefix) async

Delete all objects under a staging prefix. Paginates. Returns count.

Source code in packages/rahcp-client/src/rahcp_client/s3.py
380
381
382
383
384
385
386
387
388
389
390
391
392
393
async def cleanup_staging(self, bucket: str, staging_prefix: str) -> int:
    """Delete every object under a staging prefix, batching bulk deletes.

    Returns the number of objects deleted.
    """
    deleted = 0
    pending: list[str] = []
    async for entry in self._paginate_objects(bucket, staging_prefix):
        pending.append(entry["Key"])
        # Flush in batches of 1000 (bulk-delete page size).
        if len(pending) >= 1000:
            await self.delete_bulk(bucket, pending)
            deleted += len(pending)
            pending.clear()
    if pending:
        await self.delete_bulk(bucket, pending)
        deleted += len(pending)
    return deleted

MapiOps

MapiOps(client)

MAPI management-plane — tenant admin and namespace operations only.

This is intentionally thin: user/group CRUD, statistics, chargeback, and system-level admin are handled by the backend directly.

HTTP method mapping (matches HCP MAPI conventions): GET — read PUT — create POST — update/modify DELETE — delete

Source code in packages/rahcp-client/src/rahcp_client/mapi.py
27
28
def __init__(self, client: HCPClient) -> None:
    # All MAPI operations are issued through this shared HCPClient.
    self._client = client

list_namespaces(tenant, *, verbose=False) async

List all namespaces for a tenant.

Source code in packages/rahcp-client/src/rahcp_client/mapi.py
30
31
32
33
34
35
36
37
38
39
40
async def list_namespaces(
    self, tenant: str, *, verbose: bool = False
) -> list[dict[str, Any]]:
    """List all namespaces for a tenant.

    Args:
        tenant: Tenant whose namespaces to list.
        verbose: When True, ask the backend for expanded details.
    """
    query: dict[str, Any] = {"verbose": "true"} if verbose else {}
    response = await self._client.request(
        "GET", f"/mapi/tenants/{tenant}/namespaces", params=query
    )
    return response.json()

get_namespace(tenant, ns, *, verbose=False) async

Get namespace details.

Source code in packages/rahcp-client/src/rahcp_client/mapi.py
42
43
44
45
46
47
48
49
50
51
52
async def get_namespace(
    self, tenant: str, ns: str, *, verbose: bool = False
) -> dict[str, Any]:
    """Fetch details of a single namespace.

    Args:
        tenant: Owning tenant.
        ns: Namespace name.
        verbose: When True, ask the backend for expanded details.
    """
    query: dict[str, Any] = {"verbose": "true"} if verbose else {}
    response = await self._client.request(
        "GET", f"/mapi/tenants/{tenant}/namespaces/{ns}", params=query
    )
    return response.json()

create_namespace(tenant, ns_data) async

Create a namespace. Uses PUT (MAPI convention).

Source code in packages/rahcp-client/src/rahcp_client/mapi.py
54
55
56
57
58
59
60
61
async def create_namespace(
    self, tenant: str, ns_data: dict[str, Any]
) -> dict[str, Any]:
    """Create a namespace. Uses PUT (MAPI convention)."""
    # MAPI convention: PUT creates, POST modifies.
    response = await self._client.request(
        "PUT", f"/mapi/tenants/{tenant}/namespaces", json=ns_data
    )
    return response.json()

update_namespace(tenant, ns, data) async

Update namespace settings. Uses POST (MAPI convention).

Source code in packages/rahcp-client/src/rahcp_client/mapi.py
63
64
65
66
67
68
69
async def update_namespace(
    self, tenant: str, ns: str, data: dict[str, Any]
) -> None:
    """Update namespace settings. Uses POST (MAPI convention)."""
    # MAPI convention: POST modifies an existing resource.
    await self._client.request(
        "POST", f"/mapi/tenants/{tenant}/namespaces/{ns}", json=data
    )

delete_namespace(tenant, ns) async

Delete a namespace.

Source code in packages/rahcp-client/src/rahcp_client/mapi.py
71
72
73
async def delete_namespace(self, tenant: str, ns: str) -> None:
    """Delete a namespace."""
    # Fire-and-forget: errors surface as HCPError from request().
    await self._client.request("DELETE", f"/mapi/tenants/{tenant}/namespaces/{ns}")

export_namespace(tenant, ns) async

Export a namespace as a reusable template.

Source code in packages/rahcp-client/src/rahcp_client/mapi.py
75
76
77
78
79
80
async def export_namespace(self, tenant: str, ns: str) -> dict[str, Any]:
    """Export one namespace as a reusable template."""
    response = await self._client.request(
        "GET", f"/mapi/tenants/{tenant}/namespaces/{ns}/export"
    )
    return response.json()

export_namespaces(tenant, names) async

Export multiple namespaces as templates.

Source code in packages/rahcp-client/src/rahcp_client/mapi.py
82
83
84
85
86
87
88
89
async def export_namespaces(self, tenant: str, names: list[str]) -> dict[str, Any]:
    """Export several namespaces as templates in one call."""
    # Names travel as a single comma-separated query parameter.
    joined = ",".join(names)
    response = await self._client.request(
        "GET",
        f"/mapi/tenants/{tenant}/namespaces/export",
        params={"names": joined},
    )
    return response.json()

Configuration

HCPSettings

Bases: BaseSettings

Configuration for the HCP client.

All values can be set via HCP_-prefixed environment variables, e.g. HCP_ENDPOINT, HCP_USERNAME, HCP_TENANT.

Errors

errors

HCPError hierarchy — maps HTTP status codes to typed exceptions.

HCPError(message, *, status_code=None)

Bases: Exception

Base error for all rahcp operations.

Source code in packages/rahcp-client/src/rahcp_client/errors.py
 9
10
11
12
def __init__(self, message: str, *, status_code: int | None = None) -> None:
    """Store the message and the originating HTTP status code (if any)."""
    super().__init__(message)
    self.message = message
    self.status_code = status_code

AuthenticationError(message, *, status_code=None)

Bases: HCPError

401 — bad credentials or expired token.

Source code in packages/rahcp-client/src/rahcp_client/errors.py
 9
10
11
12
def __init__(self, message: str, *, status_code: int | None = None) -> None:
    """Store the message and the originating HTTP status code (if any)."""
    super().__init__(message)
    self.message = message
    self.status_code = status_code

NotFoundError(message, *, status_code=None)

Bases: HCPError

404 — tenant, namespace, or object not found.

Source code in packages/rahcp-client/src/rahcp_client/errors.py
 9
10
11
12
def __init__(self, message: str, *, status_code: int | None = None) -> None:
    """Store the message and the originating HTTP status code (if any)."""
    super().__init__(message)
    self.message = message
    self.status_code = status_code

ConflictError(message, *, status_code=None)

Bases: HCPError

409 — resource already exists.

Source code in packages/rahcp-client/src/rahcp_client/errors.py
 9
10
11
12
def __init__(self, message: str, *, status_code: int | None = None) -> None:
    """Store the message and the originating HTTP status code (if any)."""
    super().__init__(message)
    self.message = message
    self.status_code = status_code

RetryableError(message, *, status_code=None)

Bases: HCPError

408, 429, 500, 503, 504 — transient failure after all retries exhausted.

Source code in packages/rahcp-client/src/rahcp_client/errors.py
 9
10
11
12
def __init__(self, message: str, *, status_code: int | None = None) -> None:
    """Store the message and the originating HTTP status code (if any)."""
    super().__init__(message)
    self.message = message
    self.status_code = status_code

UpstreamError(message, *, status_code=None)

Bases: HCPError

502 — backend's upstream service is unreachable. Not retried.

Source code in packages/rahcp-client/src/rahcp_client/errors.py
 9
10
11
12
def __init__(self, message: str, *, status_code: int | None = None) -> None:
    """Store the message and the originating HTTP status code (if any)."""
    super().__init__(message)
    self.message = message
    self.status_code = status_code

error_for_status(status_code, message)

Map an HTTP status code to the appropriate HCPError subclass.

Source code in packages/rahcp-client/src/rahcp_client/errors.py
40
41
42
43
def error_for_status(status_code: int, message: str) -> HCPError:
    """Map an HTTP status code to the appropriate HCPError subclass."""
    # Unmapped statuses fall back to the generic base error.
    return _STATUS_MAP.get(status_code, HCPError)(message, status_code=status_code)

Tracing

tracing

Optional OpenTelemetry tracing — no-op when OTEL is not installed.

Automatically initializes the OTEL SDK when OTEL_EXPORTER_OTLP_ENDPOINT is set (or when configure_tracing() is called explicitly).

Supports both grpc and http/protobuf protocols via the standard OTEL_EXPORTER_OTLP_PROTOCOL env var.

Usage::

from rahcp_client.tracing import tracer

with tracer.start_as_current_span("s3.upload") as span:
    span.set_attribute("s3.bucket", bucket)
    ...

If opentelemetry-api is not installed, tracer is a no-op.

configure_tracing(service_name='rahcp-client', endpoint=None, protocol=None)

Initialize the OTEL SDK with OTLP exporter.

Called automatically if OTEL_EXPORTER_OTLP_ENDPOINT env var is set. Safe to call multiple times — only initializes once.

Parameters:

Name Type Description Default
service_name str

Service name in traces (default: rahcp-client).

'rahcp-client'
endpoint str | None

OTLP endpoint. Defaults to OTEL_EXPORTER_OTLP_ENDPOINT env var.

None
protocol str | None

grpc or http/protobuf. Defaults to OTEL_EXPORTER_OTLP_PROTOCOL env var, then http/protobuf.

None
Source code in packages/rahcp-client/src/rahcp_client/tracing.py
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
def configure_tracing(
    service_name: str = "rahcp-client",
    endpoint: str | None = None,
    protocol: str | None = None,
) -> None:
    """Initialize the OTEL SDK with OTLP exporter.

    Called automatically if ``OTEL_EXPORTER_OTLP_ENDPOINT`` env var is set.
    Safe to call multiple times — only initializes once.

    Args:
        service_name: Service name in traces (default: rahcp-client).
        endpoint: OTLP endpoint. Defaults to ``OTEL_EXPORTER_OTLP_ENDPOINT`` env var.
        protocol: ``grpc`` or ``http/protobuf``. Defaults to
            ``OTEL_EXPORTER_OTLP_PROTOCOL`` env var, then ``http/protobuf``.
    """
    global _initialized  # noqa: PLW0603
    # Idempotence guard: bail out on repeat calls, or when the OTEL API is
    # not installed (the module then stays a no-op).
    if _initialized or not OTEL_AVAILABLE:
        return

    # No endpoint configured anywhere -> tracing stays disabled.
    otlp_endpoint = endpoint or os.environ.get("OTEL_EXPORTER_OTLP_ENDPOINT", "")
    if not otlp_endpoint:
        return

    otlp_protocol = protocol or os.environ.get(
        "OTEL_EXPORTER_OTLP_PROTOCOL", "http/protobuf"
    )

    # Standard OTEL_SERVICE_NAME env var takes precedence over the argument.
    resource = Resource.create(
        {
            "service.name": os.environ.get("OTEL_SERVICE_NAME", service_name),
        }
    )
    provider = TracerProvider(resource=resource)

    try:
        # The exporter lives in a protocol-specific extra package that may be
        # missing even when the OTEL API is importable — hence the try block.
        if otlp_protocol == "grpc":
            from opentelemetry.exporter.otlp.proto.grpc.trace_exporter import (
                OTLPSpanExporter,
            )
        else:
            from opentelemetry.exporter.otlp.proto.http.trace_exporter import (
                OTLPSpanExporter,
            )

        exporter = OTLPSpanExporter(endpoint=otlp_endpoint)
        provider.add_span_processor(BatchSpanProcessor(exporter))
        trace.set_tracer_provider(provider)
        # Only mark initialized after the provider is installed, so a failed
        # attempt can be retried by a later call.
        _initialized = True
        log.info("OTEL tracing enabled: %s (%s)", otlp_endpoint, otlp_protocol)
    except Exception:
        # Best-effort: tracing failure must never break the client.
        log.warning("Failed to initialize OTEL exporter", exc_info=True)

rahcp-cli

S3 Commands

s3

S3 CLI subcommands — thin wrappers over rahcp_client.

ls(ctx, bucket=typer.Argument(None), prefix=typer.Option('', '--prefix', '-p', help='Filter by key prefix'), max_keys=typer.Option(100, '--max-keys', '-n', help='Max results per page'), page=typer.Option(None, '--page', help='Continuation token for next page'), delimiter=typer.Option(None, '--delimiter', '-d', help='Group by delimiter (e.g. /)'), filter_key=typer.Option(None, '--filter', '-f', help='Filter keys containing this string'))

List buckets (no args) or objects in a bucket.

Source code in packages/rahcp-cli/src/rahcp_cli/s3.py
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
@app.command("ls")
def ls(
    ctx: typer.Context,
    bucket: str = typer.Argument(None),
    prefix: str = typer.Option("", "--prefix", "-p", help="Filter by key prefix"),
    max_keys: int = typer.Option(100, "--max-keys", "-n", help="Max results per page"),
    page: str = typer.Option(None, "--page", help="Continuation token for next page"),
    delimiter: str = typer.Option(
        None, "--delimiter", "-d", help="Group by delimiter (e.g. /)"
    ),
    filter_key: str = typer.Option(
        None, "--filter", "-f", help="Filter keys containing this string"
    ),
) -> None:
    """List buckets (no args) or objects in a bucket."""

    async def _impl() -> None:
        async with make_client(ctx) as client:
            # No bucket argument: list buckets and stop.
            if bucket is None:
                listing = await client.s3.list_buckets()
                if ctx.obj["json"]:
                    print_json(listing)
                    return
                # Accept either snake_case or S3-native key for the bucket list.
                print_table(
                    listing.get("buckets") or listing.get("Buckets") or [],
                    title="Buckets",
                )
                return

            listing = await client.s3.list_objects(
                bucket,
                prefix,
                max_keys=max_keys,
                continuation_token=page,
                delimiter=delimiter,
            )
            if ctx.obj["json"]:
                print_json(listing)
                return

            rows = _format_object_rows(listing)
            if filter_key:
                # Client-side substring filter on the object key.
                rows = [row for row in rows if filter_key in row.get("Key", "")]
            location = f"s3://{bucket}/{prefix}" if prefix else f"s3://{bucket}"
            print_table(rows, columns=["Key", "Size", "LastModified"], title=location)

            # Surface the continuation token so the user can page manually.
            token = listing.get("next_continuation_token")
            if token:
                console.print(
                    f"\n[dim]More results — next page:[/dim]\n"
                    f"  rahcp s3 ls {bucket} --page {token}"
                )

    run(_impl())

upload(ctx, bucket=typer.Argument(...), key=typer.Argument(...), file=typer.Argument(..., exists=True))

Upload a local file (auto multipart if large).

Source code in packages/rahcp-cli/src/rahcp_cli/s3.py
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
@app.command()
def upload(
    ctx: typer.Context,
    bucket: str = typer.Argument(...),
    key: str = typer.Argument(...),
    file: Path = typer.Argument(..., exists=True),
) -> None:
    """Upload a local file (auto multipart if large)."""

    async def _do_upload() -> None:
        async with make_client(ctx) as client:
            result_etag = await client.s3.upload(bucket, key, file)
            console.print(
                f"[green]Uploaded[/green] s3://{bucket}/{key} (etag: {result_etag})"
            )

    run(_do_upload())

download(ctx, bucket=typer.Argument(...), key=typer.Argument(...), output=typer.Option(None, '--output', '-o'))

Download an object via presigned URL.

Source code in packages/rahcp-cli/src/rahcp_cli/s3.py
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
@app.command()
def download(
    ctx: typer.Context,
    bucket: str = typer.Argument(...),
    key: str = typer.Argument(...),
    output: Path = typer.Option(None, "--output", "-o"),
) -> None:
    """Download an object via presigned URL."""
    # Default destination: the last path segment of the key, in the CWD.
    target = output or Path(key.split("/")[-1])

    async def _fetch() -> None:
        async with make_client(ctx) as client:
            nbytes = await client.s3.download(bucket, key, target)
            console.print(f"[green]Downloaded[/green] {target} ({nbytes:,} bytes)")

    run(_fetch())

rm(ctx, bucket=typer.Argument(...), keys=typer.Argument(...))

Delete one or more objects.

Source code in packages/rahcp-cli/src/rahcp_cli/s3.py
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
@app.command()
def rm(
    ctx: typer.Context,
    bucket: str = typer.Argument(...),
    keys: list[str] = typer.Argument(...),
) -> None:
    """Delete one or more objects."""

    async def _delete() -> None:
        async with make_client(ctx) as client:
            # Single key uses the cheap per-object delete; anything else
            # goes through the bulk endpoint.
            if len(keys) != 1:
                await client.s3.delete_bulk(bucket, keys)
            else:
                await client.s3.delete(bucket, keys[0])
            console.print(f"[green]Deleted[/green] {len(keys)} object(s)")

    run(_delete())

presign(ctx, bucket=typer.Argument(...), key=typer.Argument(...), expires=typer.Option(3600, '--expires'))

Get a presigned download URL.

Source code in packages/rahcp-cli/src/rahcp_cli/s3.py
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
@app.command()
def presign(
    ctx: typer.Context,
    bucket: str = typer.Argument(...),
    key: str = typer.Argument(...),
    expires: int = typer.Option(3600, "--expires"),
) -> None:
    """Get a presigned download URL."""

    async def _presign() -> None:
        async with make_client(ctx) as client:
            url = await client.s3.presign_get(bucket, key, expires=expires)
            if not ctx.obj["json"]:
                console.print(url)
            else:
                print_json({"url": url})

    run(_presign())

upload_all(ctx, bucket=typer.Argument(...), source_dir=typer.Argument(..., help='Local directory to upload'), prefix=typer.Option('', '--prefix', '-p', help='Key prefix to prepend'), workers=typer.Option(0, '--workers', '-w', help='Concurrent workers (0 = use config)'), skip_existing=typer.Option(True, '--skip-existing/--overwrite', help='Skip files that already exist with matching size'), retry_errors=typer.Option(False, '--retry-errors', help='Only retry previously failed files'), include=typer.Option([], '--include', '-I', help="Only upload files matching these glob patterns (e.g. '*.jpg')"), exclude=typer.Option([], '--exclude', '-E', help="Skip files matching these glob patterns (e.g. '*.tmp')"), validate=typer.Option(False, '--validate', help='Validate each file before upload (auto-detects format by extension)'), verify=typer.Option(False, '--verify', help='Verify each upload by checking remote size after transfer'), tracker_db=typer.Option(None, '--tracker-db', help='Tracker DB path (overrides prefix and default)'), tracker_prefix=typer.Option(None, '--tracker-prefix', help="Prefix for tracker DB name (e.g. 'andraarkiv' → andraarkiv.upload-tracker.db)"), presign_batch_size=typer.Option(0, '--presign-batch-size', help='Presign URLs in batches of this size (0 = use config default)'))

Upload a directory to S3 with tracked resume and parallel workers.

Source code in packages/rahcp-cli/src/rahcp_cli/s3.py
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
@app.command("upload-all")
def upload_all(
    ctx: typer.Context,
    bucket: str = typer.Argument(...),
    source_dir: str = typer.Argument(..., help="Local directory to upload"),
    prefix: str = typer.Option("", "--prefix", "-p", help="Key prefix to prepend"),
    workers: int = typer.Option(0, "--workers", "-w", help="Concurrent workers (0 = use config)"),
    skip_existing: bool = typer.Option(
        True,
        "--skip-existing/--overwrite",
        help="Skip files that already exist with matching size",
    ),
    retry_errors: bool = typer.Option(
        False, "--retry-errors", help="Only retry previously failed files"
    ),
    include: list[str] = typer.Option(
        [],
        "--include",
        "-I",
        help="Only upload files matching these glob patterns (e.g. '*.jpg')",
    ),
    exclude: list[str] = typer.Option(
        [],
        "--exclude",
        "-E",
        help="Skip files matching these glob patterns (e.g. '*.tmp')",
    ),
    validate: bool = typer.Option(
        False,
        "--validate",
        help="Validate each file before upload (auto-detects format by extension)",
    ),
    verify: bool = typer.Option(
        False,
        "--verify",
        help="Verify each upload by checking remote size after transfer",
    ),
    tracker_db: str | None = typer.Option(
        None,
        "--tracker-db",
        help="Tracker DB path (overrides prefix and default)",
    ),
    tracker_prefix: str | None = typer.Option(
        None,
        "--tracker-prefix",
        help="Prefix for tracker DB name (e.g. 'andraarkiv' → andraarkiv.upload-tracker.db)",
    ),
    presign_batch_size: int = typer.Option(
        0,
        "--presign-batch-size",
        help="Presign URLs in batches of this size (0 = use config default)",
    ),
) -> None:
    """Upload a directory to S3 with tracked resume and parallel workers."""

    async def _run() -> None:
        # Imported lazily so plain commands don't pay the bulk-module cost.
        from rahcp_client.bulk import BulkUploadConfig, bulk_upload

        src = Path(source_dir)
        if not src.is_dir():
            console.print(f"[red]Not a directory: {src}[/red]")
            raise SystemExit(1)

        validate_fn = _get_validator() if validate else None

        tracker, db_path = _resolve_tracker(
            ctx, tracker_db, ".upload-tracker.db", prefix=tracker_prefix
        )
        effective_workers = _resolve_workers(workers, ctx)

        # BUG FIX: the old f-string ran the DB path and count together
        # ("…tracker.db42 already done"); separate them with a dash.
        console.print(
            f"Tracker: {db_path} — {len(tracker.done_keys())} already done"
        )
        flags = []
        if include:
            flags.append(f"include={include}")
        if exclude:
            flags.append(f"exclude={exclude}")
        if validate:
            flags.append("validate")
        if verify:
            flags.append("verify")
        flag_str = f" [{', '.join(flags)}]" if flags else ""
        console.print(
            f"Uploading {src}/ → s3://{bucket}/{prefix} ({effective_workers} workers){flag_str}"
        )

        # FIX: close the tracker even if the upload aborts, so the SQLite DB
        # is released and resume state is flushed on failure too.
        try:
            async with make_client(ctx) as client:
                stats = await bulk_upload(
                    BulkUploadConfig(
                        client=client,
                        bucket=bucket,
                        source_dir=src,
                        tracker=tracker,
                        prefix=prefix,
                        workers=effective_workers,
                        queue_depth=ctx.obj.get("bulk_queue_depth", 8),
                        skip_existing=skip_existing,
                        retry_errors=retry_errors,
                        include=include,
                        exclude=exclude,
                        validate_file=validate_fn,
                        verify_upload=verify,
                        presign_batch_size=presign_batch_size
                        or ctx.obj.get("bulk_presign_batch_size", 200),
                        chunk_size=ctx.obj.get("bulk_chunk_size", 4 * 1024 * 1024),
                        progress_interval=ctx.obj.get("bulk_progress_interval", 5.0),
                        on_progress=_print_progress,
                        on_error=_print_error,
                    )
                )
        finally:
            tracker.close()
        _print_summary("Uploaded", stats, db_path)

    run(_run())

download_all(ctx, bucket=typer.Argument(...), prefix=typer.Option('', '--prefix', '-p', help='Only download keys under this prefix'), dest_dir=typer.Option('.', '--output', '-o', help='Local destination directory'), workers=typer.Option(0, '--workers', '-w', help='Concurrent workers (0 = use config)'), retry_errors=typer.Option(False, '--retry-errors', help='Only retry previously failed files'), include=typer.Option([], '--include', '-I', help="Only download keys matching these glob patterns (e.g. '*.jpg')"), exclude=typer.Option([], '--exclude', '-E', help="Skip keys matching these glob patterns (e.g. '*.tmp')"), validate=typer.Option(False, '--validate', help='Validate each file after download (auto-detects format by extension)'), verify=typer.Option(False, '--verify', help='Verify each download by checking file size after transfer'), tracker_db=typer.Option(None, '--tracker-db', help='Tracker DB path (overrides prefix and default)'), tracker_prefix=typer.Option(None, '--tracker-prefix', help="Prefix for tracker DB name (e.g. 'backup' → backup.download-tracker.db)"), presign_batch_size=typer.Option(0, '--presign-batch-size', help='Presign URLs in batches of this size (0 = use config default)'))

Download a bucket to a local directory with tracked resume and parallel workers.

Source code in packages/rahcp-cli/src/rahcp_cli/s3.py
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
500
501
502
503
504
505
506
507
508
509
510
511
512
513
514
515
516
517
518
@app.command("download-all")
def download_all(
    ctx: typer.Context,
    bucket: str = typer.Argument(...),
    prefix: str = typer.Option(
        "", "--prefix", "-p", help="Only download keys under this prefix"
    ),
    dest_dir: str = typer.Option(
        ".", "--output", "-o", help="Local destination directory"
    ),
    workers: int = typer.Option(0, "--workers", "-w", help="Concurrent workers (0 = use config)"),
    retry_errors: bool = typer.Option(
        False, "--retry-errors", help="Only retry previously failed files"
    ),
    include: list[str] = typer.Option(
        [],
        "--include",
        "-I",
        help="Only download keys matching these glob patterns (e.g. '*.jpg')",
    ),
    exclude: list[str] = typer.Option(
        [],
        "--exclude",
        "-E",
        help="Skip keys matching these glob patterns (e.g. '*.tmp')",
    ),
    validate: bool = typer.Option(
        False,
        "--validate",
        help="Validate each file after download (auto-detects format by extension)",
    ),
    verify: bool = typer.Option(
        False,
        "--verify",
        help="Verify each download by checking file size after transfer",
    ),
    tracker_db: str | None = typer.Option(
        None,
        "--tracker-db",
        help="Tracker DB path (overrides prefix and default)",
    ),
    tracker_prefix: str | None = typer.Option(
        None,
        "--tracker-prefix",
        help="Prefix for tracker DB name (e.g. 'backup' → backup.download-tracker.db)",
    ),
    presign_batch_size: int = typer.Option(
        0,
        "--presign-batch-size",
        help="Presign URLs in batches of this size (0 = use config default)",
    ),
) -> None:
    """Download a bucket to a local directory with tracked resume and parallel workers."""

    async def _run() -> None:
        # Imported lazily so plain commands don't pay the bulk-module cost.
        from rahcp_client.bulk import BulkDownloadConfig, bulk_download

        dest = Path(dest_dir)
        validate_fn = _get_validator() if validate else None
        tracker, db_path = _resolve_tracker(
            ctx, tracker_db, ".download-tracker.db", prefix=tracker_prefix
        )
        effective_workers = _resolve_workers(workers, ctx)

        # BUG FIX: the old f-string ran the DB path and count together
        # ("…tracker.db42 already done"); separate them with a dash.
        console.print(
            f"Tracker: {db_path} — {len(tracker.done_keys())} already done"
        )
        flags = []
        if include:
            flags.append(f"include={include}")
        if exclude:
            flags.append(f"exclude={exclude}")
        if validate:
            flags.append("validate")
        if verify:
            flags.append("verify")
        flag_str = f" [{', '.join(flags)}]" if flags else ""
        # BUG FIX: "{prefix}{dest}" glued source and destination together;
        # add the "→" separator, mirroring upload-all's message.
        console.print(
            f"Downloading s3://{bucket}/{prefix} → {dest}/ ({effective_workers} workers){flag_str}"
        )

        # FIX: close the tracker even if the download aborts, so the SQLite
        # DB is released and resume state is flushed on failure too.
        try:
            async with make_client(ctx) as client:
                stats = await bulk_download(
                    BulkDownloadConfig(
                        client=client,
                        bucket=bucket,
                        dest_dir=dest,
                        tracker=tracker,
                        prefix=prefix,
                        workers=effective_workers,
                        queue_depth=ctx.obj.get("bulk_queue_depth", 8),
                        retry_errors=retry_errors,
                        include=include,
                        exclude=exclude,
                        validate_file=validate_fn,
                        verify_download=verify,
                        presign_batch_size=presign_batch_size
                        or ctx.obj.get("bulk_presign_batch_size", 200),
                        chunk_size=ctx.obj.get("bulk_chunk_size", 4 * 1024 * 1024),
                        stream_threshold=ctx.obj.get(
                            "bulk_stream_threshold", 100 * 1024 * 1024
                        ),
                        progress_interval=ctx.obj.get("bulk_progress_interval", 5.0),
                        on_progress=_print_progress,
                        on_error=_print_error,
                    )
                )
        finally:
            tracker.close()
        _print_summary("Downloaded", stats, db_path)

    run(_run())

verify(ctx, bucket=typer.Argument(...), source_dir=typer.Argument(..., help='Local directory to compare against'), prefix=typer.Option('', '--prefix', '-p', help='Key prefix (same as upload-all)'))

Verify all local files exist in the bucket with matching sizes.

Source code in packages/rahcp-cli/src/rahcp_cli/s3.py
545
546
547
548
549
550
551
552
553
554
555
556
557
558
559
560
561
562
563
564
565
566
567
568
569
570
571
572
573
574
575
576
577
578
579
580
581
582
583
584
585
586
587
588
589
590
591
592
593
594
595
596
597
598
599
600
601
602
603
604
605
606
607
608
@app.command()
def verify(
    ctx: typer.Context,
    bucket: str = typer.Argument(...),
    source_dir: str = typer.Argument(..., help="Local directory to compare against"),
    prefix: str = typer.Option(
        "", "--prefix", "-p", help="Key prefix (same as upload-all)"
    ),
) -> None:
    """Verify all local files exist in the bucket with matching sizes."""

    async def _verify() -> None:
        src = Path(source_dir)
        if not src.is_dir():
            console.print(f"[red]Not a directory: {src}[/red]")
            raise SystemExit(1)

        local_files = [p for p in src.rglob("*") if p.is_file()]
        if not local_files:
            console.print("[dim]No files found.[/dim]")
            return

        console.print(f"Listing remote objects in s3://{bucket}/{prefix}...")
        async with make_client(ctx) as client:
            remote = await _list_all_remote_objects(client, bucket, prefix)

        # Classify each local file: present+matching, absent, or wrong size.
        ok = 0
        missing: list[str] = []
        size_mismatch: list[tuple[str, int, int]] = []
        for path in local_files:
            key = _build_key(prefix, path.relative_to(src))
            local_size = path.stat().st_size
            if key not in remote:
                missing.append(key)
            elif remote[key] != local_size:
                size_mismatch.append((key, local_size, remote[key]))
            else:
                ok += 1

        console.print(
            f"\n[bold]Verification:[/bold] {len(local_files)} local, {len(remote)} remote\n"
        )
        console.print(f"  [green]{ok} OK[/green]")

        if missing:
            console.print(f"  [red]{len(missing)} MISSING[/red]:")
            # Cap the listing at 20 keys to keep the report readable.
            for key in missing[:20]:
                console.print(f"    {key}")
            if len(missing) > 20:
                console.print(f"    ... and {len(missing) - 20} more")

        if size_mismatch:
            console.print(f"  [yellow]{len(size_mismatch)} SIZE MISMATCH[/yellow]:")
            for key, local, remote_size in size_mismatch[:10]:
                console.print(
                    f"    {key}: local={_human_size(local)}, remote={_human_size(remote_size)}"
                )

        # Any discrepancy makes the command fail, for use in scripts/CI.
        if missing or size_mismatch:
            raise SystemExit(1)
        console.print("\n  [bold green]All files verified.[/bold green]")

    run(_verify())

Auth Commands

auth

Auth commands — whoami.

whoami(ctx)

Show current user info by decoding the JWT token.

Source code in packages/rahcp-cli/src/rahcp_cli/auth.py
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
@app.command()
def whoami(ctx: typer.Context) -> None:
    """Show current user info by decoding the JWT token."""

    async def _whoami() -> None:
        async with make_client(ctx) as client:
            token = client.token
            if not token:
                console.print("[red]Not authenticated. Check your config.[/red]")
                raise typer.Exit(1)
            try:
                # JWT payload is the second dot-separated segment,
                # base64url-encoded without padding.
                payload_b64 = token.split(".")[1]
                # BUG FIX: pad up to a multiple of 4. The previous formula
                # (4 - len % 4) appended four '=' when the length was already
                # a multiple of 4, making urlsafe_b64decode reject the token.
                payload_b64 += "=" * (-len(payload_b64) % 4)
                payload = json.loads(base64.urlsafe_b64decode(payload_b64))
            except (IndexError, ValueError) as exc:
                # IndexError: token has no payload segment. ValueError covers
                # binascii.Error (bad base64) and json.JSONDecodeError — the
                # old tuple also listed Exception, which made the narrow
                # types redundant and swallowed unrelated errors.
                console.print(f"[red]Invalid token format:[/red] {exc}")
                raise typer.Exit(1) from exc
            if ctx.obj["json"]:
                print_json(payload)
            else:
                console.print(f"User: [bold]{payload.get('sub', '?')}[/bold]")
                console.print(f"Tenant: {payload.get('tenant', '(system)')}")

    run(_whoami())

Namespace Commands

namespace

Namespace subcommands.

list_namespaces(ctx, tenant=typer.Argument(...), verbose=typer.Option(False, '--verbose', '-v'))

List namespaces for a tenant.

Source code in packages/rahcp-cli/src/rahcp_cli/namespace.py
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
@app.command("list")
def list_namespaces(
    ctx: typer.Context,
    tenant: str = typer.Argument(...),
    verbose: bool = typer.Option(False, "--verbose", "-v"),
) -> None:
    """List namespaces for a tenant."""

    async def _list() -> None:
        async with make_client(ctx) as client:
            data = await client.mapi.list_namespaces(tenant, verbose=verbose)
            if ctx.obj["json"]:
                print_json(data)
                return
            # Normalize the MAPI response — either {"name": [...]} or a bare
            # list — into table rows.
            if isinstance(data, dict) and "name" in data:
                names = data["name"]
                rows = [{"name": n} for n in names] if isinstance(names, list) else []
            elif isinstance(data, list):
                rows = data
            else:
                rows = []
            print_table(rows, title=f"Namespaces ({tenant})")

    run(_list())

get_namespace(ctx, tenant=typer.Argument(...), ns=typer.Argument(...), verbose=typer.Option(False, '--verbose', '-v'))

Get namespace details.

Source code in packages/rahcp-cli/src/rahcp_cli/namespace.py
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
@app.command("get")
def get_namespace(
    ctx: typer.Context,
    tenant: str = typer.Argument(...),
    ns: str = typer.Argument(...),
    verbose: bool = typer.Option(False, "--verbose", "-v"),
) -> None:
    """Get namespace details."""

    async def _show() -> None:
        async with make_client(ctx) as client:
            details = await client.mapi.get_namespace(tenant, ns, verbose=verbose)
            print_json(details)

    run(_show())

create_namespace(ctx, tenant=typer.Argument(...), name=typer.Option(..., '--name'), quota=typer.Option(None, '--quota'))

Create a namespace.

Source code in packages/rahcp-cli/src/rahcp_cli/namespace.py
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
@app.command("create")
def create_namespace(
    ctx: typer.Context,
    tenant: str = typer.Argument(...),
    name: str = typer.Option(..., "--name"),
    quota: str = typer.Option(None, "--quota"),
) -> None:
    """Create a namespace."""
    # Build the MAPI request body; quota is optional.
    payload: dict[str, Any] = {"name": name}
    if quota:
        payload["hardQuota"] = quota

    async def _create() -> None:
        async with make_client(ctx) as client:
            result = await client.mapi.create_namespace(tenant, payload)
            if ctx.obj["json"]:
                print_json(result)
            else:
                console.print(f"[green]Created namespace[/green] {name}")

    run(_create())

delete_namespace(ctx, tenant=typer.Argument(...), ns=typer.Argument(...))

Delete a namespace.

Source code in packages/rahcp-cli/src/rahcp_cli/namespace.py
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
@app.command("delete")
def delete_namespace(
    ctx: typer.Context,
    tenant: str = typer.Argument(...),
    ns: str = typer.Argument(...),
) -> None:
    """Delete a namespace."""

    async def _delete() -> None:
        async with make_client(ctx) as client:
            await client.mapi.delete_namespace(tenant, ns)
            console.print(f"[green]Deleted namespace[/green] {ns}")

    run(_delete())

import_namespace(ctx, tenant=typer.Argument(...), file=typer.Argument(..., exists=True, help='Exported template JSON file'))

Create namespace(s) from an exported template file.

Examples:

rahcp ns export dev-ai my-ns > template.json
rahcp ns import prod-tenant template.json

Source code in packages/rahcp-cli/src/rahcp_cli/namespace.py
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
@app.command("import")
def import_namespace(
    ctx: typer.Context,
    tenant: str = typer.Argument(...),
    file: Path = typer.Argument(..., exists=True, help="Exported template JSON file"),
) -> None:
    """Create namespace(s) from an exported template file.

    Examples:
        rahcp ns export dev-ai my-ns > template.json
        rahcp ns import prod-tenant template.json
    """

    async def _import() -> None:
        template = json.loads(file.read_text())
        namespaces = template.get("namespaces", [])
        if not namespaces:
            console.print("[red]No namespaces found in template[/red]")
            raise typer.Exit(1)
        async with make_client(ctx) as client:
            # Create every namespace in the template sequentially.
            for ns_data in namespaces:
                created = await client.mapi.create_namespace(tenant, ns_data)
                if ctx.obj["json"]:
                    print_json(created)
                else:
                    console.print(
                        f"[green]Created namespace[/green] {ns_data.get('name', '?')}"
                    )

    run(_import())

export_namespace(ctx, tenant=typer.Argument(...), ns=typer.Argument(...), output=typer.Option(None, '--output', '-o'))

Export namespace as a reusable template.

Source code in packages/rahcp-cli/src/rahcp_cli/namespace.py
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
@app.command("export")
def export_namespace(
    ctx: typer.Context,
    tenant: str = typer.Argument(...),
    ns: str = typer.Argument(...),
    output: str = typer.Option(None, "--output", "-o"),
) -> None:
    """Export namespace as a reusable template."""

    async def _run() -> None:
        async with make_client(ctx) as client:
            data = await client.mapi.export_namespace(tenant, ns)
            text = json.dumps(data, indent=2)
            if output:
                # FIX: write UTF-8 explicitly — the platform default locale
                # encoding can fail on non-ASCII namespace metadata.
                with open(output, "w", encoding="utf-8") as f:
                    f.write(text)
                console.print(f"[green]Exported to[/green] {output}")
            else:
                # No --output: emit raw JSON on stdout so it can be piped.
                sys.stdout.write(text + "\n")

    run(_run())

Configuration

config

YAML config file with named profiles.

Config file: ~/.rahcp/config.yaml (or --config / RAHCP_CONFIG)

Example::

default: dev

profiles:
  dev:
    endpoint: http://localhost:8000/api/v1
    username: admin
    password: secret
    tenant: dev-ai
    verify_ssl: false
  prod:
    endpoint: http://localhost:8000/api/v1
    username: prod-user
    password: secret
    tenant: prod-archive

Profile

Bases: BaseModel

A named connection profile.

CLIConfig

Bases: BaseModel

Config with named profiles.

resolve(name=None)

Resolve a profile by name, falling back to default.

Source code in packages/rahcp-cli/src/rahcp_cli/config.py
83
84
85
86
87
88
89
90
def resolve(self, name: str | None = None) -> Profile:
    """Resolve a profile by name, falling back to default."""
    wanted = name or self.default
    if wanted and wanted in self.profiles:
        return self.profiles[wanted]
    # A lone profile is unambiguous even without a default set.
    if len(self.profiles) == 1:
        (only,) = self.profiles.values()
        return only
    # Nothing matches: empty profile (env vars / flags must fill the gaps).
    return Profile()

load_config(path=None)

Load config from a YAML file.

Resolution: explicit path > RAHCP_CONFIG env > ~/.rahcp/config.yaml

Source code in packages/rahcp-cli/src/rahcp_cli/config.py
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
def load_config(path: str | None = None) -> CLIConfig:
    """Load config from a YAML file.

    Resolution: explicit path > RAHCP_CONFIG env > ~/.rahcp/config.yaml
    """
    cfg_file = Path(path) if path else CONFIG_PATH
    if not cfg_file.exists():
        return CLIConfig()
    try:
        raw = yaml.safe_load(cfg_file.read_text()) or {}
    except Exception:
        # Best-effort: an unparseable file degrades to an empty config.
        log.warning("Failed to parse config file: %s", cfg_file)
        return CLIConfig()

    # Multi-profile format
    if "profiles" in raw:
        parsed = {
            profile_name: Profile(**values)
            for profile_name, values in raw["profiles"].items()
            if isinstance(values, dict)
        }
        return CLIConfig(default=raw.get("default", ""), profiles=parsed)

    # Flat format (backwards compat) — the whole file is one "default" profile
    return CLIConfig(default="default", profiles={"default": Profile(**raw)})

rahcp-lance

LanceDataset

LanceDataset(client, bucket, prefix='')

Manage Lance datasets stored on HCP S3.

Uses presigned S3 credentials from the HCP backend to connect lancedb to the remote storage.

Source code in packages/rahcp-lance/src/rahcp_lance/dataset.py
26
27
28
29
30
def __init__(self, client: HCPClient, bucket: str, prefix: str = "") -> None:
    """Bind this dataset to an HCP client, S3 bucket, and key prefix."""
    self._client = client
    self._bucket = bucket
    # Normalized so later joins never produce leading/double slashes.
    self._prefix = prefix.strip("/")
    # Lazily-created lancedb connection; populated on first use.
    self._db: lancedb.DBConnection | None = None

list_tables() async

List all tables in the dataset.

Source code in packages/rahcp-lance/src/rahcp_lance/dataset.py
43
44
45
46
async def list_tables(self) -> list[str]:
    """Return the names of every table in this dataset."""
    connection = await self._ensure_db()
    return await connection.table_names()

open(table_name) async

Open an existing table.

Source code in packages/rahcp-lance/src/rahcp_lance/dataset.py
48
49
50
51
async def open(self, table_name: str) -> lancedb.table.AsyncTable:
    """Open and return the named table (it must already exist)."""
    connection = await self._ensure_db()
    return await connection.open_table(table_name)

create(table_name, schema=None, data=None) async

Create a new table with schema and optional initial data.

Source code in packages/rahcp-lance/src/rahcp_lance/dataset.py
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
async def create(
    self,
    table_name: str,
    schema: pa.Schema | None = None,
    data: Any = None,
) -> lancedb.table.AsyncTable:
    """Create a new table from ``data`` or an explicit ``schema``.

    If ``data`` is given it takes precedence and the schema is inferred
    from it; otherwise ``schema`` must be provided.

    Raises:
        ValueError: if neither ``schema`` nor ``data`` is supplied.
    """
    # Validate arguments before opening the connection (fail fast —
    # the original awaited _ensure_db() even when both were None).
    if data is None and schema is None:
        msg = "Either schema or data must be provided"
        raise ValueError(msg)
    db = await self._ensure_db()
    if data is not None:
        table = await db.create_table(table_name, data=data)
    else:
        table = await db.create_table(table_name, schema=schema)
    log.info("Created table %s in %s/%s", table_name, self._bucket, self._prefix)
    return table

drop(table_name) async

Drop a table.

Source code in packages/rahcp-lance/src/rahcp_lance/dataset.py
71
72
73
74
75
async def drop(self, table_name: str) -> None:
    """Delete ``table_name`` from the dataset."""
    connection = await self._ensure_db()
    await connection.drop_table(table_name)
    log.info("Dropped table %s", table_name)

table_info(table_name) async

Get metadata about a table.

Source code in packages/rahcp-lance/src/rahcp_lance/dataset.py
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
async def table_info(self, table_name: str) -> TableInfo:
    """Return name, row count, and schema fields for a table."""
    table = await self.open(table_name)
    arrow_schema = await table.schema()
    row_count = await table.count_rows()
    fields = [
        FieldInfo(name=f.name, dtype=str(f.type), nullable=f.nullable)
        for f in arrow_schema
    ]
    return TableInfo(
        name=table_name,
        num_rows=row_count,
        schema_fields=fields,
    )

ingest(table_name, data) async

Add data to an existing table. Returns ingest summary.

Source code in packages/rahcp-lance/src/rahcp_lance/dataset.py
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
async def ingest(
    self,
    table_name: str,
    data: pa.RecordBatch | pa.Table | list[dict[str, Any]],
) -> IngestResult:
    """Append ``data`` to an existing table and return an ingest summary.

    Accepts a list of row dicts or any Arrow container exposing
    ``num_rows`` (RecordBatch, Table).
    """
    table = await self.open(table_name)
    # RecordBatch and Table both expose num_rows, so one branch suffices
    # (the original had an identical elif/else pair — dead code).
    rows_added = len(data) if isinstance(data, list) else data.num_rows
    await table.add(data)
    total = await table.count_rows()
    return IngestResult(
        table=table_name,
        rows_added=rows_added,
        total_rows=total,
    )

Query Helpers

query

Read helpers — scan, take, vector search.

scan(table, params=None) async

Scan a table with optional projection, filter, and pagination.

Source code in packages/rahcp-lance/src/rahcp_lance/query.py
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
async def scan(
    table: lancedb.table.AsyncTable,
    params: ScanParams | None = None,
) -> pa.Table:
    """Scan ``table``, applying any projection/filter/pagination in ``params``."""
    opts = params or ScanParams()
    q = table.query()
    # Each clause is applied only when set (falsy values mean "unset").
    if opts.columns:
        q = q.select(opts.columns)
    if opts.filter:
        q = q.where(opts.filter)
    if opts.limit:
        q = q.limit(opts.limit)
    if opts.offset:
        q = q.offset(opts.offset)
    return await q.to_arrow()

take(table, indices) async

Take specific rows by index.

Source code in packages/rahcp-lance/src/rahcp_lance/query.py
30
31
32
33
34
35
36
37
38
39
async def take(
    table: lancedb.table.AsyncTable,
    indices: list[int],
) -> pa.Table:
    """Fetch the rows whose ``_rowid`` values appear in ``indices``."""
    id_list = ",".join(map(str, indices))
    rows = table.query().where(f"_rowid IN ({id_list})")
    return await rows.to_arrow()

vector_search(table, params) async

Perform vector similarity search.

Source code in packages/rahcp-lance/src/rahcp_lance/query.py
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
async def vector_search(
    table: lancedb.table.AsyncTable,
    params: VectorSearchParams,
) -> list[SearchResult]:
    """Run a k-nearest-neighbour search and return scored result rows."""
    q = (
        table.query().nearest_to(params.vector).column(params.column).limit(params.k)
    )
    if params.filter:
        q = q.where(params.filter)
    if params.columns:
        q = q.select(params.columns)
    arrow = await q.to_arrow()
    hits: list[SearchResult] = []
    for batch in arrow.to_batches():
        for record in batch.to_pylist():
            # _distance is injected by the engine; strip it from the payload.
            dist = record.pop("_distance", 0.0)
            hits.append(SearchResult(data=record, distance=dist))
    return hits

Schemas

schemas

Pydantic schemas for Lance dataset operations.

TableInfo

Bases: BaseModel

Metadata about a Lance table.

FieldInfo

Bases: BaseModel

A single field in a Lance table schema.

IngestResult

Bases: BaseModel

Result of a batch ingest operation.

ScanParams

Bases: BaseModel

Parameters for scanning a Lance table.

VectorSearchParams

Bases: BaseModel

Parameters for vector similarity search.

SearchResult

Bases: BaseModel

A single vector search result row.


rahcp-etl

Pipeline

pipeline

Pipeline DAG — stage registration, retry policy, checkpoint resume.

Stage(name, handler, retries=3, backoff=2.0) dataclass

A single pipeline stage with retry configuration.

Pipeline(checkpoint_store=None)

Composable ETL pipeline with retry policy per stage.

Register stages with the @pipeline.stage() decorator, then run with pipeline.run(payload). Each stage receives the output of the previous stage. Checkpoints are saved after each successful stage so the pipeline can resume on failure.

Example::

pipeline = Pipeline()

@pipeline.stage("extract", retries=3)
async def extract(payload):
    ...
    return {"records": [...]}

@pipeline.stage("transform")
async def transform(payload):
    ...
    return {"transformed": [...]}

result = await pipeline.run({"source": "s3://..."})
Source code in packages/rahcp-etl/src/rahcp_etl/pipeline.py
52
53
54
def __init__(self, checkpoint_store: CheckpointStore | None = None) -> None:
    """Create an empty pipeline, optionally backed by a checkpoint store."""
    # Stages run in registration order (see the stage() decorator).
    self._stages: list[Stage] = []
    self._checkpoint_store = checkpoint_store

stage(name, *, retries=3, backoff=2.0)

Decorator to register a pipeline stage.

Source code in packages/rahcp-etl/src/rahcp_etl/pipeline.py
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
def stage(
    self,
    name: str,
    *,
    retries: int = 3,
    backoff: float = 2.0,
) -> Callable:
    """Decorator that registers the wrapped coroutine as a pipeline stage."""

    def register(
        fn: Callable[..., Coroutine[Any, Any, dict[str, Any]]],
    ) -> Callable[..., Coroutine[Any, Any, dict[str, Any]]]:
        # Stages execute in registration order; the handler is returned
        # unchanged so it can still be called directly.
        new_stage = Stage(name=name, handler=fn, retries=retries, backoff=backoff)
        self._stages.append(new_stage)
        return fn

    return register

run(payload, *, pipeline_id=None) async

Execute all stages in order with retry and checkpointing.

Source code in packages/rahcp-etl/src/rahcp_etl/pipeline.py
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
async def run(
    self,
    payload: dict[str, Any],
    *,
    pipeline_id: str | None = None,
) -> dict[str, Any]:
    """Run every stage in order, checkpointing after each success.

    When both ``pipeline_id`` and a checkpoint store are present, any
    prior checkpoint is loaded and execution resumes after the last
    completed stage; the checkpoint is cleared once the run finishes.
    """
    data = dict(payload)
    first = 0
    use_checkpoints = bool(pipeline_id and self._checkpoint_store)

    if use_checkpoints:
        checkpoint = await self._checkpoint_store.load(pipeline_id)
        if checkpoint:
            done = checkpoint["stage"]
            data = checkpoint["state"]
            # Skip everything up to and including the recorded stage.
            for idx, st in enumerate(self._stages):
                if st.name == done:
                    first = idx + 1
                    break
            log.info(
                "Resuming pipeline %s from stage %d (%s)",
                pipeline_id,
                first,
                done,
            )

    for current in self._stages[first:]:
        data = await self._run_stage(current, data)
        if use_checkpoints:
            await self._checkpoint_store.save(pipeline_id, current.name, data)

    # Success: the saved checkpoint is no longer needed.
    if use_checkpoints:
        await self._checkpoint_store.clear(pipeline_id)

    return data

Consumer

consumer

JetStream durable consumer for ETL work items.

ETLConsumer(nats_url, stream, subject, durable, *, max_deliver=5, ack_wait=30.0)

Durable JetStream consumer with auto-reconnect.

Connects to NATS, binds a durable consumer on the given stream/subject, and dispatches messages to the handler. Acks on success, naks on failure.

Source code in packages/rahcp-etl/src/rahcp_etl/consumer.py
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
def __init__(
    self,
    nats_url: str,
    stream: str,
    subject: str,
    durable: str,
    *,
    max_deliver: int = 5,
    ack_wait: float = 30.0,
) -> None:
    """Store connection/consumer settings; no I/O happens until start()."""
    # Where and what to consume.
    self.nats_url = nats_url
    self.stream = stream
    self.subject = subject
    self.durable = durable
    # Delivery/retry policy passed to the JetStream consumer config.
    self.max_deliver = max_deliver
    self.ack_wait = ack_wait
    # Runtime state, populated by start() and torn down by stop().
    self._nc: nats.NATS | None = None
    self._sub: Any = None
    self._running = False

start(handler) async

Connect and start consuming messages.

handler receives the message payload as bytes and should return a dict (success) or raise an exception (triggers nak/retry).

Source code in packages/rahcp-etl/src/rahcp_etl/consumer.py
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
async def start(self, handler: Callable) -> None:
    """Connect to NATS and dispatch stream messages to ``handler``.

    The handler receives the raw payload bytes. Returning normally acks
    the message; raising any exception naks it for redelivery.
    """
    self._nc = await nats.connect(
        self.nats_url,
        reconnected_cb=self._on_reconnect,
        disconnected_cb=self._on_disconnect,
    )
    jetstream = self._nc.jetstream()

    consumer_cfg = ConsumerConfig(
        durable_name=self.durable,
        deliver_policy=DeliverPolicy.ALL,
        max_deliver=self.max_deliver,
        ack_wait=self.ack_wait,
    )
    self._sub = await jetstream.subscribe(
        self.subject,
        stream=self.stream,
        config=consumer_cfg,
    )

    self._running = True
    log.info(
        "ETL consumer started: stream=%s subject=%s durable=%s",
        self.stream,
        self.subject,
        self.durable,
    )

    # Pump messages until stop() flips _running.
    async for msg in self._sub.messages:
        if not self._running:
            break
        try:
            await handler(msg.data)
            await msg.ack()
        except Exception:
            # Nak so JetStream redelivers (bounded by max_deliver).
            log.exception("Handler failed for message on %s", msg.subject)
            await msg.nak()

stop() async

Stop consuming and disconnect.

Source code in packages/rahcp-etl/src/rahcp_etl/consumer.py
85
86
87
88
89
90
91
92
async def stop(self) -> None:
    """Stop the message loop and tear down the NATS connection."""
    # Flip the flag first so the start() loop exits on its next message.
    self._running = False
    if self._sub:
        await self._sub.unsubscribe()
    if self._nc:
        await self._nc.close()
    log.info("ETL consumer stopped")

Checkpointing

checkpointing

KV-backed checkpoint storage for pipeline state.

CheckpointStore(kv)

KV-backed progress checkpoints using NATS JetStream KV.

Each pipeline run gets a key: {pipeline_id} with value containing the last completed stage and its output state. On failure, the pipeline can resume from the last checkpoint instead of restarting.

Source code in packages/rahcp-etl/src/rahcp_etl/checkpointing.py
23
24
def __init__(self, kv: KeyValue) -> None:
    """Bind the store to an existing JetStream KV bucket handle."""
    self._kv = kv

create(nc, bucket='etl-checkpoints') async classmethod

Create or bind to a KV bucket.

Source code in packages/rahcp-etl/src/rahcp_etl/checkpointing.py
26
27
28
29
30
31
32
33
34
35
@classmethod
async def create(
    cls,
    nc: nats.NATS,
    bucket: str = "etl-checkpoints",
) -> CheckpointStore:
    """Return a store bound to ``bucket``, creating the KV bucket if needed."""
    kv = await nc.jetstream().create_key_value(bucket=bucket)
    return cls(kv)

save(pipeline_id, stage, state) async

Save a checkpoint after a successful stage.

Source code in packages/rahcp-etl/src/rahcp_etl/checkpointing.py
37
38
39
40
41
42
43
44
45
46
async def save(
    self,
    pipeline_id: str,
    stage: str,
    state: dict[str, Any],
) -> None:
    """Record ``stage`` as the last completed stage for this run."""
    record = {"stage": stage, "state": state}
    await self._kv.put(pipeline_id, json.dumps(record).encode())
    log.debug("Checkpoint saved: %s @ %s", pipeline_id, stage)

load(pipeline_id) async

Load the last checkpoint for a pipeline run.

Returns {"stage": str, "state": dict} or None if no checkpoint exists.

Source code in packages/rahcp-etl/src/rahcp_etl/checkpointing.py
48
49
50
51
52
53
54
55
56
57
58
async def load(self, pipeline_id: str) -> dict[str, Any] | None:
    """Fetch the checkpoint for ``pipeline_id``.

    Returns ``{"stage": str, "state": dict}``, or ``None`` when the key
    is absent or unreadable (any lookup/parse error means "no checkpoint").
    """
    try:
        raw = await self._kv.get(pipeline_id)
        return json.loads(raw.value)
    except Exception:
        return None

clear(pipeline_id) async

Clear the checkpoint for a pipeline run.

Source code in packages/rahcp-etl/src/rahcp_etl/checkpointing.py
60
61
62
63
64
65
async def clear(self, pipeline_id: str) -> None:
    """Remove the checkpoint for a run; missing keys are ignored."""
    try:
        await self._kv.delete(pipeline_id)
    except Exception:
        # Best-effort: an absent key or KV error is not fatal here.
        pass

Dead Letter Queue

dlq

Dead-letter queue handler — route failed messages for inspection/replay.

DeadLetterHandler(js)

Route permanently-failed messages to a DLQ stream for inspection/replay.

Messages in the DLQ include the original payload, the error message, and the original subject for targeted replay.

Source code in packages/rahcp-etl/src/rahcp_etl/dlq.py
24
25
def __init__(self, js: JetStreamContext) -> None:
    """Bind the handler to the JetStream context used for publish/subscribe."""
    self._js = js

create(nc) async classmethod

Create the DLQ handler, ensuring the DLQ stream exists.

Source code in packages/rahcp-etl/src/rahcp_etl/dlq.py
27
28
29
30
31
32
@classmethod
async def create(cls, nc: nats.NATS) -> DeadLetterHandler:
    """Create the DLQ handler, checking that the DLQ stream exists.

    NOTE(review): ``find_stream_name_by_subject`` only looks the stream
    up — it raises if no stream covers DLQ_SUBJECT rather than creating
    one. Confirm the stream is provisioned elsewhere (e.g. deployment).
    """
    js = nc.jetstream()
    await js.find_stream_name_by_subject(DLQ_SUBJECT)
    return cls(js)

send(subject, payload, error) async

Send a failed message to the DLQ.

Source code in packages/rahcp-etl/src/rahcp_etl/dlq.py
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
async def send(
    self,
    subject: str,
    payload: bytes,
    error: str,
) -> None:
    """Publish a permanently-failed message to the DLQ stream.

    The DLQ record carries the original subject (enabling targeted
    replay), the payload decoded as text, and the error description.
    """
    import json

    dlq_payload = json.dumps(
        {
            "original_subject": subject,
            "payload": payload.decode("utf-8", errors="replace"),
            "error": error,
        }
    ).encode()
    dlq_subject = f"etl.dlq.{subject}"
    await self._js.publish(dlq_subject, dlq_payload)
    # Bug fix: the original format string was "%s%s", which ran the
    # subject and error together with no separator in the log line.
    log.warning("Message sent to DLQ: %s: %s", subject, error)

replay(*, filter_subject=None) async

Replay DLQ messages back to their original subjects.

Returns the number of messages replayed.

Source code in packages/rahcp-etl/src/rahcp_etl/dlq.py
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
async def replay(self, *, filter_subject: str | None = None) -> int:
    """Replay DLQ messages back to their original subjects.

    Parameters:
        filter_subject: when given, only messages stored under
            ``etl.dlq.<filter_subject>`` are replayed; otherwise all
            DLQ messages are.

    Returns the number of messages replayed.

    NOTE(review): if the DLQ stream has no messages the ``async for``
    below never yields and this call appears to block — confirm callers
    guard against replaying an empty stream.
    """
    import json

    # Ordered (ephemeral) consumer: replay does not disturb durable
    # consumer state on the DLQ stream.
    sub_subject = f"etl.dlq.{filter_subject}" if filter_subject else DLQ_SUBJECT
    sub = await self._js.subscribe(
        sub_subject, stream=DLQ_STREAM, ordered_consumer=True
    )

    count = 0
    async for msg in sub.messages:
        try:
            data = json.loads(msg.data)
            original_subject = data["original_subject"]
            original_payload = data["payload"].encode()
            await self._js.publish(original_subject, original_payload)
            count += 1
        except Exception:
            # Malformed DLQ records are logged and skipped, not fatal.
            log.exception("Failed to replay DLQ message")
        # Stop after draining existing messages
        if msg.pending == 0:
            break

    await sub.unsubscribe()
    log.info("Replayed %d DLQ messages", count)
    return count

purge(*, older_than=None) async

Purge DLQ messages. Returns count purged.

Source code in packages/rahcp-etl/src/rahcp_etl/dlq.py
84
85
86
87
88
89
90
async def purge(self, *, older_than: timedelta | None = None) -> int:
    """Purge all DLQ messages. Returns the count purged.

    ``older_than`` is accepted for interface compatibility, but
    age-based purging is not implemented: when supplied, a warning is
    logged and the stream is still purged in full.
    """
    if older_than is not None:
        # Bug fix: this parameter was previously accepted and silently
        # ignored, which misled callers into thinking it filtered by age.
        log.warning(
            "purge(older_than=%s): age filtering not implemented; purging all",
            older_than,
        )
    info = await self._js.stream_info(DLQ_STREAM)
    count = info.state.messages
    await self._js.purge_stream(DLQ_STREAM)
    log.info("Purged %d DLQ messages", count)
    return count

rahcp-validate

Image Validation

images

Image integrity checks — TIFF and JPEG corruption detection.

ValidationError(path, reason)

Bases: Exception

Raised when a file fails validation.

Source code in packages/rahcp-validate/src/rahcp_validate/images.py
14
15
16
17
def __init__(self, path: Path, reason: str) -> None:
    """Build the exception message as ``<path>: <reason>``.

    The path and reason are kept as attributes so callers can report
    failures programmatically.
    """
    super().__init__(f"{path}: {reason}")
    self.path = path
    self.reason = reason

validate_tiff(path)

Verify a TIFF file is not corrupt and meets basic requirements.

Checks magic bytes, IFD structure, and that Pillow can fully load it. Raises ValidationError on failure.

Source code in packages/rahcp-validate/src/rahcp_validate/images.py
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
def validate_tiff(path: Path) -> None:
    """Check that ``path`` is a structurally valid, fully loadable TIFF.

    Verifies the byte-order marker and magic version, then forces Pillow
    to decode the whole image to catch truncation/corruption. Raises
    ``ValidationError`` on any failure.
    """
    _check_exists(path)

    with path.open("rb") as fh:
        prefix = fh.read(4)

    if len(prefix) < 4:
        raise ValidationError(path, "File too small to be a valid TIFF")

    byte_order = prefix[:2]
    # "II" = little-endian (Intel), "MM" = big-endian (Motorola).
    if byte_order not in (b"II", b"MM"):
        raise ValidationError(path, f"Invalid TIFF byte order marker: {byte_order!r}")

    endian = "little" if byte_order == b"II" else "big"
    magic = int.from_bytes(prefix[2:4], endian)
    if magic != 42:
        raise ValidationError(path, f"Invalid TIFF version: {magic} (expected 42)")

    # Full decode catches truncation that the header check cannot.
    try:
        with Image.open(path) as img:
            img.load()
    except Exception as exc:
        raise ValidationError(path, f"TIFF load failed: {exc}") from exc

validate_jpg(path)

Verify a JPEG file is not corrupt.

Checks SOI/EOI markers and that Pillow can fully load it. Raises ValidationError on failure.

Source code in packages/rahcp-validate/src/rahcp_validate/images.py
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
def validate_jpg(path: Path) -> None:
    """Check that ``path`` is a structurally valid, fully loadable JPEG.

    Verifies the SOI/EOI markers and forces a full Pillow decode.
    Raises ``ValidationError`` on any failure.
    """
    _check_exists(path)

    with path.open("rb") as fh:
        soi = fh.read(2)
        if soi != b"\xff\xd8":
            raise ValidationError(path, "Missing JPEG SOI marker")
        fh.seek(-2, 2)  # the last two bytes should be the EOI marker
        eoi = fh.read(2)

    if eoi != b"\xff\xd9":
        raise ValidationError(path, "Missing JPEG EOI marker (possibly truncated)")

    try:
        with Image.open(path) as img:
            img.load()
    except Exception as exc:
        raise ValidationError(path, f"JPEG load failed: {exc}") from exc

validate_png(path)

Verify a PNG file is not corrupt.

Checks PNG signature and that Pillow can fully load it. Raises ValidationError on failure.

Source code in packages/rahcp-validate/src/rahcp_validate/images.py
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
def validate_png(path: Path) -> None:
    """Check that ``path`` is a structurally valid, fully loadable PNG.

    Verifies the 8-byte PNG signature and forces a full Pillow decode.
    Raises ``ValidationError`` on any failure.
    """
    _check_exists(path)

    with path.open("rb") as fh:
        signature = fh.read(8)
    if signature != b"\x89PNG\r\n\x1a\n":
        raise ValidationError(path, "Invalid PNG signature")

    try:
        with Image.open(path) as img:
            img.load()
    except Exception as exc:
        raise ValidationError(path, f"PNG load failed: {exc}") from exc

validate_by_extension(path)

Auto-detect file type by extension and validate.

Supported: .jpg, .jpeg, .tif, .tiff, .png. Unknown extensions are skipped (no error). Raises ValidationError if the file is corrupt.

Source code in packages/rahcp-validate/src/rahcp_validate/images.py
111
112
113
114
115
116
117
118
119
120
121
122
def validate_by_extension(path: Path) -> None:
    """Dispatch to the appropriate validator based on the file extension.

    Supported: .jpg, .jpeg, .tif, .tiff, .png. Files with any other
    extension pass silently (no error). Raises ``ValidationError`` if
    the file is corrupt.
    """
    checker = _register_validators().get(path.suffix.lower())
    if checker:
        checker(path)

Rules

rules

Pluggable validation rules — size, dimensions, extensions.

Rule(name, check) dataclass

A named validation check.

max_file_size(limit_bytes)

Reject files larger than limit_bytes.

Source code in packages/rahcp-validate/src/rahcp_validate/rules.py
20
21
22
23
24
25
26
27
28
29
30
31
def max_file_size(limit_bytes: int) -> Rule:
    """Build a rule that rejects files larger than ``limit_bytes``."""

    def check(path: Path) -> None:
        actual = path.stat().st_size
        if actual > limit_bytes:
            raise ValidationError(
                path,
                f"File too large: {actual:,} bytes (limit: {limit_bytes:,})",
            )

    return Rule(name=f"max_file_size({limit_bytes:,})", check=check)

image_dimensions(*, min_w=1, min_h=1, max_w=65535, max_h=65535)

Check image dimensions are within bounds.

Source code in packages/rahcp-validate/src/rahcp_validate/rules.py
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
def image_dimensions(
    *,
    min_w: int = 1,
    min_h: int = 1,
    max_w: int = 65535,
    max_h: int = 65535,
) -> Rule:
    """Build a rule that checks image width/height fall within bounds."""

    def check(path: Path) -> None:
        from PIL import Image  # ty: ignore[unresolved-import]

        try:
            with Image.open(path) as img:
                width, height = img.size
        except Exception as exc:
            raise ValidationError(path, f"Cannot read image dimensions: {exc}") from exc

        if width < min_w or height < min_h:
            raise ValidationError(
                path, f"Image too small: {width}x{height} (minimum: {min_w}x{min_h})"
            )
        if width > max_w or height > max_h:
            raise ValidationError(
                path, f"Image too large: {width}x{height} (maximum: {max_w}x{max_h})"
            )

    return Rule(name="image_dimensions", check=check)

allowed_extensions(*exts)

Only allow files with the given extensions (case-insensitive).

Source code in packages/rahcp-validate/src/rahcp_validate/rules.py
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
def allowed_extensions(*exts: str) -> Rule:
    """Build a rule permitting only the given extensions (case-insensitive)."""
    allowed = {ext.lower().lstrip(".") for ext in exts}
    allowed_label = ", ".join(sorted(allowed))

    def check(path: Path) -> None:
        suffix = path.suffix.lower().lstrip(".")
        if suffix not in allowed:
            raise ValidationError(
                path,
                f"Extension '.{suffix}' not allowed (allowed: {allowed_label})",
            )

    return Rule(name=f"allowed_extensions({allowed_label})", check=check)

validate(path, rules)

Run all rules against a file, collecting all failures.

Does not stop on the first failure — returns all validation errors.

Source code in packages/rahcp-validate/src/rahcp_validate/rules.py
81
82
83
84
85
86
87
88
89
90
91
92
def validate(path: Path, rules: list[Rule]) -> list[ValidationError]:
    """Run every rule against ``path`` and collect all failures.

    Evaluation does not stop at the first failure — the returned list
    contains every ``ValidationError`` raised, in rule order.
    """
    failures: list[ValidationError] = []
    for rule in rules:
        try:
            rule.check(path)
        except ValidationError as err:
            failures.append(err)
    return failures