rahcp-client

Async HTTP client built on httpx. Handles authentication, retries, presigned URL transfers, multipart uploads, and bulk transfers. Uses rahcp-tracker for resumable progress tracking.

How it works

The SDK never sends file data through the backend API. Instead, it uses presigned URLs -- short-lived, signed S3 URLs that allow direct transfer between your code and HCP storage:

sequenceDiagram
    participant App as Your Code
    participant API as HCP Backend API
    participant S3 as HCP S3 Storage

    App->>API: POST /auth/token (username, password)
    API-->>App: JWT token

    note over App,S3: Upload flow (presigned PUT)
    App->>API: POST /presign {bucket, key, method: "put_object"}
    API-->>App: Presigned URL (valid 10 min)
    App->>S3: PUT presigned-url (file bytes)
    S3-->>App: 200 OK + ETag

    note over App,S3: Download flow (presigned GET)
    App->>API: POST /presign {bucket, key, method: "get_object"}
    API-->>App: Presigned URL (valid 1 hour)
    App->>S3: GET presigned-url
    S3-->>App: File bytes

This design keeps the backend stateless and avoids bottlenecking large transfers through the API server.

Quick start

import asyncio
from rahcp_client import HCPClient

async def main():
    async with HCPClient(
        endpoint="http://localhost:8000/api/v1",
        username="admin",
        password="password",
        tenant="dev-ai",
    ) as client:
        # List buckets
        result = await client.s3.list_buckets()
        print(result)

        # Upload a file (auto-selects presigned or multipart)
        from pathlib import Path
        etag = await client.s3.upload("my-bucket", "data/report.pdf", Path("report.pdf"))
        print(f"Uploaded: {etag}")

        # Download
        size = await client.s3.download("my-bucket", "data/report.pdf", Path("out.pdf"))
        print(f"Downloaded {size} bytes")

asyncio.run(main())

Configuration

HCPClient accepts these parameters:

| Parameter | Type | Default | Description |
| --- | --- | --- | --- |
| endpoint | str | http://localhost:8000/api/v1 | HCP API base URL |
| username | str | "" | HCP username |
| password | str | "" | HCP password |
| tenant | str \| None | None | Target tenant (omit for system-level) |
| timeout | float | 30.0 | Request timeout in seconds |
| max_retries | int | 4 | Maximum retries for transient failures |
| retry_base_delay | float | 1.0 | Base delay (seconds) for exponential backoff |
| multipart_threshold | int | 104857600 | File size threshold for multipart upload (100 MB) |
| multipart_chunk | int | 67108864 | Chunk size per multipart part (64 MB) |
| multipart_concurrency | int | 6 | Number of parallel part uploads |
| verify_ssl | bool | True | Verify SSL certificates for S3 data transfers |
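To see how multipart_threshold and multipart_chunk interact, here is a sketch (not the SDK's internal code) of the strategy selection described above, using the documented defaults:

```python
MULTIPART_THRESHOLD = 100 * 1024 * 1024  # 104857600 bytes, the default threshold
MULTIPART_CHUNK = 64 * 1024 * 1024       # 67108864 bytes, the default part size

def plan_upload(size: int) -> tuple[str, int]:
    """Return (strategy, part_count) for a file of `size` bytes."""
    if size < MULTIPART_THRESHOLD:
        return ("presigned_put", 1)
    parts = -(-size // MULTIPART_CHUNK)  # ceiling division
    return ("multipart", parts)

print(plan_upload(5 * 1024 * 1024))    # small file -> single presigned PUT
print(plan_upload(250 * 1024 * 1024))  # large file -> multipart with 64 MB parts
```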

From environment variables

client = HCPClient.from_env()

Reads from HCP_ENDPOINT, HCP_USERNAME, HCP_PASSWORD, HCP_TENANT, HCP_TIMEOUT, HCP_MAX_RETRIES, HCP_MULTIPART_THRESHOLD, HCP_MULTIPART_CHUNK, HCP_MULTIPART_CONCURRENCY, HCP_VERIFY_SSL.
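A plausible sketch of what from_env() does with those variables; the names match the documented list, but the exact parsing rules (e.g. how booleans are interpreted) are an assumption:

```python
def config_from_env(env: dict) -> dict:
    """Build a config dict from an environment-like mapping.
    Parameterized for testability; in practice you would pass os.environ."""
    truthy = {"1", "true", "yes", "on"}
    return {
        "endpoint": env.get("HCP_ENDPOINT", "http://localhost:8000/api/v1"),
        "username": env.get("HCP_USERNAME", ""),
        "password": env.get("HCP_PASSWORD", ""),
        "tenant": env.get("HCP_TENANT") or None,
        "timeout": float(env.get("HCP_TIMEOUT", "30.0")),
        "max_retries": int(env.get("HCP_MAX_RETRIES", "4")),
        "verify_ssl": env.get("HCP_VERIFY_SSL", "true").lower() in truthy,
    }

cfg = config_from_env({"HCP_USERNAME": "admin", "HCP_TIMEOUT": "10"})
```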

Properties

| Property | Type | Description |
| --- | --- | --- |
| client.s3 | S3Ops | S3 data-plane operations (lazy-loaded) |
| client.mapi | MapiOps | MAPI namespace administration (lazy-loaded) |
| client.token | str \| None | Current JWT bearer token (set after login) |
| client.transfer_settings | TransferSettings | Bulk transfer settings (verify_ssl, timeout, multipart_threshold) — used internally by the bulk engine |

S3 operations

client.s3 returns an S3Ops instance with these methods:

Uploads and downloads

The upload() method automatically selects the best transfer strategy based on file size:

flowchart TD
    START["client.s3.upload(bucket, key, data)"] --> SIZE{File size}
    SIZE -->|"< 100 MB"| PRESIGN["Presign PUT URL"]
    SIZE -->|">= 100 MB"| MULTI["Multipart upload"]

    PRESIGN --> PUT["PUT file bytes to presigned URL"]
    PUT --> ETAG["Return ETag"]

    MULTI --> INIT["Initiate multipart"]
    INIT --> PARTS["Presign N part URLs"]
    PARTS --> PARALLEL["Upload parts in parallel<br/>(semaphore, default 6)"]
    PARALLEL --> COMPLETE["Complete multipart"]
    COMPLETE --> ETAG

# Upload (auto-selects presigned PUT or multipart based on file size)
etag = await client.s3.upload("bucket", "key", data_or_path)

# Multipart upload (explicit, parallel parts)
etag = await client.s3.upload_multipart("bucket", "key", Path("large.bin"), concurrency=6)

# Download to file
byte_count = await client.s3.download("bucket", "key", Path("dest.bin"))

# Download to bytes
data = await client.s3.download_bytes("bucket", "key")
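The "Presign N part URLs" step in the diagram above implies splitting the file into byte ranges. A sketch of that split (an illustration, not the SDK's actual code), using the default 64 MB chunk size:

```python
def part_ranges(size: int, chunk: int = 64 * 1024 * 1024) -> list[tuple[int, int, int]]:
    """Return (part_number, start_offset, end_offset) tuples covering `size` bytes.
    The last part may be smaller than `chunk`."""
    parts = []
    offset, number = 0, 1
    while offset < size:
        end = min(offset + chunk, size)
        parts.append((number, offset, end))
        offset, number = end, number + 1
    return parts

# A 150 MB file splits into two full 64 MB parts plus one 22 MB remainder
for n, start, end in part_ranges(150 * 1024 * 1024):
    print(f"part {n}: bytes {start}-{end - 1} ({end - start} bytes)")
```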

Presigned URLs

All data transfers use presigned URLs internally. You can also generate them directly for sharing or browser-based transfers:

# Single presigned URL
url = await client.s3.presign_get("bucket", "key", expires=3600)
url = await client.s3.presign_put("bucket", "key", expires=3600)

# Bulk presigned download URLs (default method="get_object")
urls = await client.s3.presign_bulk("bucket", ["key1", "key2"], expires=3600)

# Bulk presigned upload URLs
urls = await client.s3.presign_bulk("bucket", ["key1", "key2"], method="put_object", expires=3600)

Listing and metadata

# List buckets
result = await client.s3.list_buckets()

# List objects (with prefix filtering and optional delimiter)
result = await client.s3.list_objects("bucket", prefix="data/", max_keys=1000, delimiter="/")
# Returns: {'objects': [...], 'common_prefixes': [...], 'is_truncated': bool, ...}

# Object metadata (HEAD)
meta = await client.s3.head("bucket", "key")
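To clarify how prefix and delimiter shape the listing result, here is a pure-Python sketch of S3-style delimiter semantics (the real response also carries is_truncated and pagination fields not modeled here):

```python
def group_listing(keys: list[str], prefix: str = "", delimiter: str = "/") -> dict:
    """Mimic S3 delimiter grouping: keys containing `delimiter` after the
    prefix collapse into common_prefixes; the rest are listed as objects."""
    objects, common = [], set()
    for key in keys:
        if not key.startswith(prefix):
            continue
        rest = key[len(prefix):]
        if delimiter and delimiter in rest:
            common.add(prefix + rest.split(delimiter, 1)[0] + delimiter)
        else:
            objects.append(key)
    return {"objects": objects, "common_prefixes": sorted(common)}

result = group_listing(
    ["data/a.txt", "data/sub/b.txt", "data/sub/c.txt", "other/d.txt"],
    prefix="data/",
)
# "data/a.txt" is a direct object; the sub/ keys collapse into one common prefix
```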

Delete and copy

# Single delete
await client.s3.delete("bucket", "key")

# Bulk delete
result = await client.s3.delete_bulk("bucket", ["key1", "key2", "key3"])

# Copy object
await client.s3.copy("dest-bucket", "dest-key", "src-bucket", "src-key")

Staging pattern

The staging pattern provides atomic directory-level operations: upload to a staging prefix, validate, then commit everything to the final prefix in one step, so downstream consumers never see partial results.

flowchart LR
    subgraph Write["1. Write to staging"]
        W1["upload obj-1"] --> STG[("staging/<br/>batch-1/")]
        W2["upload obj-2"] --> STG
        W3["upload obj-N"] --> STG
    end

    subgraph Commit["2. Commit"]
        STG -->|"copy all"| FINAL[("final/<br/>batch-1/")]
        STG -->|"delete staging"| GONE["(removed)"]
    end

    subgraph Fail["On failure"]
        STG -.->|"cleanup_staging()"| GONE2["(removed)"]
    end

    style FINAL fill:#d4edda,stroke:#28a745
    style GONE fill:#f8f9fa,stroke:#dee2e6
    style GONE2 fill:#f8d7da,stroke:#dc3545

# Move all objects from staging/ to final/
count = await client.s3.commit_staging("bucket", "staging/batch-1/", "final/batch-1/")

# Clean up staging on failure
count = await client.s3.cleanup_staging("bucket", "staging/batch-1/")

Full runnable example — staging_commit.py (excerpt: the code below runs inside an authenticated HCPClient context, with json imported and BUCKET, STAGING_PREFIX, and FINAL_PREFIX defined):
        try:
            # 1. Upload files to a staging prefix
            print("Uploading to staging/...")
            for i in range(5):
                record = {"id": i, "value": f"record-{i}", "status": "processed"}
                data = json.dumps(record).encode()
                key = f"{STAGING_PREFIX}record-{i:03d}.json"
                await client.s3.upload(BUCKET, key, data)
                print(f"  Staged {key}")

            # 2. Verify all files landed
            result = await client.s3.list_objects(BUCKET, prefix=STAGING_PREFIX)
            staged_count = len(result["objects"])
            print(f"\nStaged {staged_count} objects")

            if staged_count != 5:
                raise RuntimeError(f"Expected 5 staged objects, got {staged_count}")

            # 3. Commit: copy staging → final, then delete staging
            print(f"\nCommitting {STAGING_PREFIX} → {FINAL_PREFIX}...")
            committed = await client.s3.commit_staging(
                BUCKET, STAGING_PREFIX, FINAL_PREFIX
            )
            print(f"Committed {committed} objects")

            # 4. Verify final prefix
            result = await client.s3.list_objects(BUCKET, prefix=FINAL_PREFIX)
            print(f"Final prefix contains {len(result['objects'])} objects:")
            for obj in result["objects"]:
                print(f"  - {obj['key']}")

        except Exception:
            # Clean up staging on any failure
            print("\nError — cleaning up staging prefix...")
            deleted = await client.s3.cleanup_staging(BUCKET, STAGING_PREFIX)
            print(f"Deleted {deleted} staged objects")
            raise
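The key rewrite that commit_staging performs for each object is a simple prefix swap. A sketch of that mapping (the SDK's exact rewrite rule is an assumption inferred from the example above):

```python
def committed_key(key: str, staging_prefix: str, final_prefix: str) -> str:
    """Map a staged key to its final location by swapping the prefix."""
    if not key.startswith(staging_prefix):
        raise ValueError(f"{key!r} is not under {staging_prefix!r}")
    return final_prefix + key[len(staging_prefix):]

print(committed_key(
    "staging/batch-1/record-000.json", "staging/batch-1/", "final/batch-1/"
))
# -> final/batch-1/record-000.json
```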

Bulk transfers

The bulk_upload and bulk_download functions provide the same producer-consumer pipeline used by the CLI, available for programmatic use:

import asyncio
from pathlib import Path
from rahcp_client import HCPClient, BulkUploadConfig, BulkDownloadConfig, bulk_upload, bulk_download
from rahcp_tracker import SqliteTracker

async def main():
    async with HCPClient.from_env() as client:
        tracker = SqliteTracker(Path(".upload-tracker.db"))

        stats = await bulk_upload(BulkUploadConfig(
            client=client,
            bucket="images-batch",
            source_dir=Path("/data/scans"),
            tracker=tracker,
            prefix="2025/",
            workers=20,
            on_progress=lambda s: print(f"{s.done} files, {s.mb_per_sec:.1f} MB/s"),
            on_error=lambda key, exc: print(f"FAILED: {key}: {exc}"),
        ))

        print(f"Done: {stats.ok} uploaded, {stats.skipped} skipped, {stats.errors} errors")
        print(f"Throughput: {stats.mb_per_sec:.1f} MB/s over {stats.elapsed:.0f}s")
        tracker.close()

asyncio.run(main())

BulkUploadConfig

| Field | Type | Default | Description |
| --- | --- | --- | --- |
| client | HCPClient | required | Authenticated client instance |
| bucket | str | required | Target S3 bucket |
| source_dir | Path | required | Local directory to upload |
| tracker | TrackerProtocol | required | Progress tracker (e.g. SqliteTracker) |
| prefix | str | "" | Key prefix prepended to all keys |
| workers | int | 10 | Number of concurrent upload workers |
| queue_depth | int | 8 | Queue size multiplier (queue = workers × depth) |
| presign_batch_size | int | 200 | URLs presigned per API call (higher = fewer round-trips) |
| chunk_size | int | 1048576 | Chunk size in bytes for streaming uploads (1 MB) |
| skip_existing | bool | True | Skip files already on remote with matching size |
| retry_errors | bool | False | Only process files marked as error in tracker |
| include | list[str] | [] | Glob patterns — only matching filenames are transferred |
| exclude | list[str] | [] | Glob patterns — matching filenames are skipped |
| validate_file | Callable | None | Pre-upload validation callback fn(Path) — raises on failure |
| verify_upload | bool | False | HEAD check after each upload to verify size matches |
| on_progress | callback | None | Called periodically with TransferStats |
| on_error | callback | None | Called on each file failure with (key, exception) |
| progress_interval | float | 5.0 | Minimum seconds between progress callbacks |

BulkDownloadConfig

| Field | Type | Default | Description |
| --- | --- | --- | --- |
| client | HCPClient | required | Authenticated client instance |
| bucket | str | required | Source S3 bucket |
| dest_dir | Path | required | Local destination directory |
| tracker | TrackerProtocol | required | Progress tracker (e.g. SqliteTracker) |
| prefix | str | "" | Only download keys under this prefix |
| workers | int | 10 | Number of concurrent download workers |
| queue_depth | int | 8 | Queue size multiplier (queue = workers × depth) |
| presign_batch_size | int | 200 | URLs presigned per API call (higher = fewer round-trips) |
| chunk_size | int | 1048576 | Chunk size in bytes for streaming large files (1 MB) |
| stream_threshold | int | 104857600 | Files below this (100 MB) are downloaded in one shot |
| retry_errors | bool | False | Only process files marked as error in tracker |
| include | list[str] | [] | Glob patterns — only matching keys are downloaded |
| exclude | list[str] | [] | Glob patterns — matching keys are skipped |
| validate_file | Callable | None | Post-download validation callback fn(Path) — raises on failure |
| verify_download | bool | False | Verify each download by checking file size after transfer |
| on_progress | callback | None | Called periodically with TransferStats |
| on_error | callback | None | Called on each file failure with (key, exception) |
| progress_interval | float | 5.0 | Minimum seconds between progress callbacks |

Downloads always skip files that already exist on disk with matching size (no skip_existing flag — this behavior is always on).
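The include/exclude fields above accept glob patterns. A sketch of the likely semantics using the standard library's fnmatch (the SDK's exact matching rules are an assumption):

```python
from fnmatch import fnmatch

def should_transfer(name: str, include=(), exclude=()) -> bool:
    """Apply include/exclude glob filters: if `include` is non-empty, the name
    must match at least one pattern; any `exclude` match always wins."""
    if include and not any(fnmatch(name, p) for p in include):
        return False
    return not any(fnmatch(name, p) for p in exclude)

print(should_transfer("scan-001.tif", include=["*.tif"]))   # matches include
print(should_transfer("notes.txt", include=["*.tif"]))      # filtered out
print(should_transfer("scan-001.tmp", exclude=["*.tmp"]))   # excluded
```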

Transfer tracking (rahcp-tracker)

Transfer state is managed by the standalone rahcp-tracker package. The tracker is shared across S3 bulk transfers and IIIF downloads — any backend implementing TrackerProtocol can be used.

The default backend is SqliteTracker (aliased as TransferTracker for backwards compatibility), backed by SQLite with WAL mode and buffered writes.

from rahcp_tracker import SqliteTracker, TransferStatus

tracker = SqliteTracker(Path("my-job.db"), flush_every=200)

# Mark files
tracker.mark("folder/file.jpg", 12345, TransferStatus.done, etag='"abc"', validated=True)
tracker.mark("folder/bad.jpg", 0, TransferStatus.error, "SSL timeout")

# Query state
done = tracker.done_keys()           # set[str] — instant skip lookups
errors = tracker.error_entries()     # list[(key, size)] — for retry
summary = tracker.summary()          # {"pending": 0, "done": 500, "error": 3}

# Audit — find files needing post-transfer verification or validation
unverified = tracker.unverified_keys()    # list[(key, size, etag)]
unvalidated = tracker.unvalidated_keys()  # list[(key, size)]

# Lifecycle
tracker.commit()  # flush buffered marks to DB
tracker.close()   # flush + release resources

The tracker package is structured for pluggable backends:

rahcp-tracker/
  models.py     — TransferStatus enum + Transfer table (SQLModel, shared across backends)
  protocol.py   — TrackerProtocol (interface — 8 methods any backend must implement)
  sqlite.py     — SqliteTracker (default SQLite implementation)

To add a new backend (e.g. Postgres), implement TrackerProtocol in a new file — no changes to bulk.py, the CLI, or any consumer code. The Transfer SQLModel table in models.py works with both SQLite and Postgres via SQLAlchemy's dialect system.
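As an illustration of what such a backend looks like, here is a toy in-memory tracker covering the eight-method surface inferred from the usage shown above. The method signatures are assumptions based on that example, not the real TrackerProtocol definition:

```python
from enum import Enum

class TransferStatus(str, Enum):  # local stand-in for rahcp_tracker.TransferStatus
    pending = "pending"
    done = "done"
    error = "error"

class MemoryTracker:
    """In-memory sketch of a TrackerProtocol backend (dict instead of SQLite)."""

    def __init__(self):
        self._rows = {}

    def mark(self, key, size, status, error=None, *, etag=None, validated=False):
        self._rows[key] = {"size": size, "status": status, "error": error,
                           "etag": etag, "validated": validated}

    def done_keys(self):
        return {k for k, r in self._rows.items() if r["status"] is TransferStatus.done}

    def error_entries(self):
        return [(k, r["size"]) for k, r in self._rows.items()
                if r["status"] is TransferStatus.error]

    def summary(self):
        counts = {s.value: 0 for s in TransferStatus}
        for r in self._rows.values():
            counts[r["status"].value] += 1
        return counts

    def unverified_keys(self):
        return [(k, r["size"], r["etag"]) for k, r in self._rows.items()
                if r["status"] is TransferStatus.done and r["etag"] is None]

    def unvalidated_keys(self):
        return [(k, r["size"]) for k, r in self._rows.items()
                if r["status"] is TransferStatus.done and not r["validated"]]

    def commit(self):
        pass  # nothing buffered in memory

    def close(self):
        self.commit()
```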

MAPI operations

client.mapi returns a MapiOps instance for namespace administration:

# List namespaces
namespaces = await client.mapi.list_namespaces("dev-ai", verbose=True)

# Get namespace details
ns = await client.mapi.get_namespace("dev-ai", "datasets", verbose=True)

# Create namespace
result = await client.mapi.create_namespace("dev-ai", {
    "name": "new-ns",
    "description": "My namespace",
    "hardQuota": "100 GB",
    "softQuota": 80,
})

# Update namespace
await client.mapi.update_namespace("dev-ai", "new-ns", {
    "description": "Updated description",
})

# Delete namespace
await client.mapi.delete_namespace("dev-ai", "new-ns")

# Export as template
template = await client.mapi.export_namespace("dev-ai", "datasets")

# Export multiple
bundle = await client.mapi.export_namespaces("dev-ai", ["datasets", "archives"])

Error handling

All errors inherit from HCPError:

| Exception | HTTP status | When |
| --- | --- | --- |
| AuthenticationError | 401, 403 | Invalid credentials or insufficient permissions |
| NotFoundError | 404 | Resource does not exist |
| ConflictError | 409 | Resource already exists |
| RetryableError | 408, 429, 500, 503, 504 | Transient failure (raised after retries exhausted) |
| UpstreamError | 502 | HCP system unreachable |

from rahcp_client.errors import HCPError, NotFoundError

try:
    await client.s3.head("bucket", "missing-key")
except NotFoundError:
    print("Object not found")
except HCPError as e:
    print(f"HCP error {e.status_code}: {e.message}")
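The table above boils down to a status-to-exception mapping. A sketch of that mapping with local stand-in classes (not the SDK's actual error module):

```python
# Local stand-ins mirroring the documented hierarchy
class HCPError(Exception): ...
class AuthenticationError(HCPError): ...
class NotFoundError(HCPError): ...
class ConflictError(HCPError): ...
class RetryableError(HCPError): ...
class UpstreamError(HCPError): ...

STATUS_TO_ERROR = {
    401: AuthenticationError, 403: AuthenticationError,
    404: NotFoundError,
    409: ConflictError,
    408: RetryableError, 429: RetryableError,
    500: RetryableError, 503: RetryableError, 504: RetryableError,
    502: UpstreamError,
}

def error_for(status: int):
    """Pick the exception class for a failed response; fall back to HCPError."""
    return STATUS_TO_ERROR.get(status, HCPError)
```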

Observability

The SDK has optional OpenTelemetry tracing. Every API request creates a span with method, path, and status code attributes.

# Install with OTel support
uv pip install "rahcp-client[otel]"

When opentelemetry-api is installed, the SDK creates spans automatically for every HTTP request and for key S3 operations (upload, download, multipart). The OTel exporter is configured via standard environment variables -- the same ones the backend uses:

OTEL_SERVICE_NAME=rahcp-cli
OTEL_EXPORTER_OTLP_ENDPOINT=https://otlp-gateway.example.com/otlp
OTEL_EXPORTER_OTLP_PROTOCOL=http/protobuf    # or grpc
OTEL_EXPORTER_OTLP_HEADERS=Authorization=Basic%20...

When OTel is not installed, the tracer is a no-op -- zero overhead, no dependency. Structured logging via Python's logging module is always active (method, path, status, duration in ms).

OTel can also be configured programmatically in the SDK:

from rahcp_client.tracing import configure_tracing

configure_tracing(
    service_name="my-etl-pipeline",
    endpoint="https://otlp-gateway.example.com/otlp",
    protocol="http/protobuf",  # or "grpc"
)

Or via the CLI config file (otel_endpoint, otel_protocol, otel_service_name fields per profile).

Logging

Control log verbosity with the --log-level flag, the RAHCP_LOG_LEVEL env var, or the log_level profile setting:

# Debug — see every HTTP request with timing
rahcp --log-level debug s3 ls

# Info — see auth events and summaries
rahcp --log-level info s3 download-all my-bucket

| Level | What you see |
| --- | --- |
| debug | Every HTTP request with method, path, status, duration (ms) |
| info | Authentication, upload/download summaries |
| warning | Retries, transport errors (default) |
| error | Non-retryable failures |

Credential safety

The SDK never logs passwords or tokens. Error messages from HCP are redacted -- any JSON response field matching password, token, access_token, secret, or authorization is replaced with [REDACTED] before logging or raising exceptions.

Presigned URL errors (e.g. 403 on upload/download) never expose the signed URL -- the SDK strips the signature parameters and shows only 403 Forbidden for bucket/key. Use --log-level debug for the full request path and redacted response body.
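A minimal sketch of the redaction described above, applied recursively to a JSON-like payload (the SDK's actual implementation details are not shown here):

```python
SENSITIVE = {"password", "token", "access_token", "secret", "authorization"}

def redact(payload):
    """Replace sensitive JSON fields with [REDACTED] before logging or raising."""
    if isinstance(payload, dict):
        return {k: "[REDACTED]" if k.lower() in SENSITIVE else redact(v)
                for k, v in payload.items()}
    if isinstance(payload, list):
        return [redact(v) for v in payload]
    return payload

print(redact({"user": "admin", "password": "hunter2",
              "detail": {"Authorization": "Bearer abc", "size": 42}}))
```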

Retry behavior

The client automatically retries transient failures with exponential backoff:

flowchart TD
    REQ["Send request"] --> STATUS{Response status}
    STATUS -->|"200-399"| OK["Return response"]
    STATUS -->|"401"| REAUTH["Re-authenticate"]
    REAUTH --> RETRY1["Retry once"]
    RETRY1 --> STATUS2{Status}
    STATUS2 -->|"200-399"| OK
    STATUS2 -->|"401"| FAIL["Raise AuthenticationError"]

    STATUS -->|"408, 429, 500,<br/>503, 504"| BACKOFF["Wait: base * 2^attempt"]
    STATUS -->|"502"| UPSTREAM["Raise UpstreamError<br/>(no retry)"]
    BACKOFF --> RETRY2["Retry (up to max_retries)"]
    RETRY2 --> STATUS
    RETRY2 -->|"retries exhausted"| RETRIABLE["Raise RetryableError"]

    STATUS -->|"404"| NOT_FOUND["Raise NotFoundError"]
    STATUS -->|"409"| CONFLICT["Raise ConflictError"]

    style OK fill:#d4edda,stroke:#28a745
    style FAIL fill:#f8d7da,stroke:#dc3545
    style RETRIABLE fill:#f8d7da,stroke:#dc3545
    style UPSTREAM fill:#f8d7da,stroke:#dc3545
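
Under the defaults (max_retries=4, retry_base_delay=1.0), the backoff formula in the diagram works out to waits of 1, 2, 4, and 8 seconds. A quick sketch of the schedule (real clients often add jitter or a cap; whether this one does is not stated):

```python
def backoff_delays(max_retries: int = 4, base: float = 1.0) -> list[float]:
    """Delays between attempts per the documented formula: base * 2^attempt."""
    return [base * 2 ** attempt for attempt in range(max_retries)]

print(backoff_delays())          # [1.0, 2.0, 4.0, 8.0]
print(backoff_delays(3, 0.5))    # [0.5, 1.0, 2.0]
```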