Skip to content

Endpoint Modules

Auto-generated from source — always in sync with the backend.

S3 Buckets

buckets

S3 bucket CRUD endpoints.

Endpoints

GET /buckets — list all buckets
POST /buckets — create a bucket
HEAD /buckets/{bucket} — check if bucket exists
DELETE /buckets/{bucket} — delete a bucket (force=true empties first)
GET /buckets/{bucket}/versioning — get versioning status
PUT /buckets/{bucket}/versioning — set versioning
GET /buckets/{bucket}/acl — get bucket ACL
PUT /buckets/{bucket}/acl — set bucket ACL
GET /buckets/{bucket}/cors — get bucket CORS
PUT /buckets/{bucket}/cors — set bucket CORS
DELETE /buckets/{bucket}/cors — delete bucket CORS
GET /buckets/{bucket}/uploads — list in-progress multipart uploads

list_buckets(s3=Depends(get_s3_service)) async

List all S3 buckets accessible to the authenticated user.

Source code in backend/app/api/v1/endpoints/s3/buckets.py
54
55
56
57
58
59
60
@router.get("", response_model=ListBucketsResponse)
async def list_buckets(s3: StorageProtocol = Depends(get_s3_service)):
    """Return every S3 bucket visible to the authenticated user."""
    raw = await run_storage(s3.list_buckets(), "buckets")
    # "Owner" may be absent from the backend response entirely.
    owner = None
    if "Owner" in raw:
        owner = OwnerInfo.model_validate(raw["Owner"])
    return ListBucketsResponse(
        buckets=[BucketInfo.model_validate(item) for item in raw.get("Buckets", [])],
        owner=owner,
    )

create_bucket(body, s3=Depends(get_s3_service)) async

Create a new S3 bucket.

Source code in backend/app/api/v1/endpoints/s3/buckets.py
63
64
65
66
67
68
69
70
@router.post("", response_model=BucketMutationResponse, status_code=201)
async def create_bucket(
    body: CreateBucketRequest,
    s3: StorageProtocol = Depends(get_s3_service),
):
    """Create a new S3 bucket named in the request body; 201 on success."""
    name = body.bucket
    await run_storage(s3.create_bucket(name), f"bucket '{name}'")
    return {"status": "created", "bucket": name}

head_bucket(bucket, s3=Depends(get_s3_service)) async

Check whether a bucket exists. Returns 200 or 404.

Source code in backend/app/api/v1/endpoints/s3/buckets.py
73
74
75
76
77
@router.head("/{bucket}")
async def head_bucket(bucket: str, s3: StorageProtocol = Depends(get_s3_service)):
    """Probe a bucket's existence: 200 when present, 404 otherwise."""
    label = f"bucket '{bucket}'"
    await run_storage(s3.head_bucket(bucket), label)
    return Response(status_code=200)

delete_bucket(bucket, force=Query(None), s3=Depends(get_s3_service), hcp=Depends(get_mapi_service), token='', storage_settings=Depends(get_storage_settings)) async

Delete an S3 bucket.

If force=true, all objects (including versions and delete markers) are removed before deleting the bucket itself. On HCP, when the S3 delete fails with BucketNotEmpty (typically due to immutable deletion records), the endpoint reconfigures the namespace via MAPI to allow pruning, then retries. On MinIO/generic, force-delete is just empty + delete (no MAPI involved).

Source code in backend/app/api/v1/endpoints/s3/buckets.py
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
@router.delete("/{bucket}", response_model=BucketMutationResponse)
async def delete_bucket(
    bucket: str,
    force: bool | None = Query(None),
    s3: StorageProtocol = Depends(get_s3_service),
    hcp: AuthenticatedMapiService = Depends(get_mapi_service),
    token: Annotated[str, Depends(oauth2_scheme)] = "",
    storage_settings: StorageSettings = Depends(get_storage_settings),
):
    """Delete an S3 bucket.

    If ``force=true``, all objects (including versions and delete markers)
    are removed before deleting the bucket itself.  On HCP, when the S3
    delete fails with BucketNotEmpty (typically due to immutable deletion
    records), the endpoint reconfigures the namespace via MAPI to allow
    pruning, then retries.  On MinIO/generic, force-delete is just
    empty + delete (no MAPI involved).
    """
    if force:
        empty_errors = await _empty_bucket(s3, bucket)

        if storage_settings.storage_backend == "hcp":
            # HCP may need MAPI reconfiguration before the delete succeeds;
            # the helper owns the retry logic and builds its own response.
            return await _force_delete_hcp(s3, hcp, bucket, token, empty_errors)

        # MinIO / generic force-delete falls through to the plain delete below.

    # Shared final step for the non-force path and the non-HCP force path.
    await run_storage(s3.delete_bucket(bucket), f"bucket '{bucket}'")
    return {"status": "deleted", "bucket": bucket}

get_bucket_versioning(bucket, s3=Depends(get_s3_service)) async

Get the versioning configuration for a bucket.

Source code in backend/app/api/v1/endpoints/s3/buckets.py
299
300
301
302
303
304
305
306
307
308
309
@router.get("/{bucket}/versioning", response_model=BucketVersioningResponse)
async def get_bucket_versioning(
    bucket: str,
    s3: StorageProtocol = Depends(get_s3_service),
):
    """Fetch a bucket's versioning configuration (status and MFA-delete)."""
    payload = await run_storage(s3.get_bucket_versioning(bucket), f"bucket '{bucket}'")
    # Both fields are optional in the backend response; None means "never set".
    status = payload.get("Status")
    mfa = payload.get("MFADelete")
    return BucketVersioningResponse(status=status, mfa_delete=mfa)

put_bucket_versioning(bucket, body, s3=Depends(get_s3_service)) async

Enable or suspend versioning on a bucket.

Source code in backend/app/api/v1/endpoints/s3/buckets.py
312
313
314
315
316
317
318
319
320
321
322
@router.put("/{bucket}/versioning", response_model=VersioningMutationResponse)
async def put_bucket_versioning(
    bucket: str,
    body: PutBucketVersioningRequest,
    s3: StorageProtocol = Depends(get_s3_service),
):
    """Set the bucket's versioning state to the value from the request body."""
    label = f"bucket '{bucket}'"
    await run_storage(s3.put_bucket_versioning(bucket, body.status), label)
    return {"status": "updated", "versioning": body.status}

get_bucket_acl(bucket, s3=Depends(get_s3_service)) async

Get the access control list (ACL) for a bucket.

Source code in backend/app/api/v1/endpoints/s3/buckets.py
328
329
330
331
332
333
334
335
@router.get("/{bucket}/acl", response_model=AclResponse)
async def get_bucket_acl(bucket: str, s3: StorageProtocol = Depends(get_s3_service)):
    """Fetch a bucket's owner and grant list (its ACL)."""
    payload = await run_storage(s3.get_bucket_acl(bucket), f"bucket '{bucket}'")
    owner = payload.get("Owner", {})
    grants = payload.get("Grants", [])
    return {"owner": owner, "grants": grants}

put_bucket_acl(bucket, body, s3=Depends(get_s3_service)) async

Set the access control list (ACL) for a bucket.

Source code in backend/app/api/v1/endpoints/s3/buckets.py
338
339
340
341
342
343
344
345
346
347
348
349
@router.put("/{bucket}/acl", response_model=BucketMutationResponse)
async def put_bucket_acl(
    bucket: str,
    body: AclPolicy,
    s3: StorageProtocol = Depends(get_s3_service),
):
    """Replace the bucket's ACL with the policy from the request body."""
    # Drop unset fields so the storage layer only sees what the client sent.
    acl_payload = body.model_dump(exclude_none=True)
    await run_storage(s3.put_bucket_acl(bucket, acl_payload), f"bucket '{bucket}'")
    return {"status": "updated"}

get_bucket_cors(bucket, s3=Depends(get_s3_service)) async

Get the CORS configuration for a bucket.

Source code in backend/app/api/v1/endpoints/s3/buckets.py
355
356
357
358
359
@router.get("/{bucket}/cors", response_model=CorsResponse)
async def get_bucket_cors(bucket: str, s3: StorageProtocol = Depends(get_s3_service)):
    """Fetch the bucket's CORS rules (empty list when none are configured)."""
    payload = await run_storage(s3.get_bucket_cors(bucket), f"bucket '{bucket}'")
    rules = payload.get("CORSRules", [])
    return {"cors_rules": rules}

put_bucket_cors(bucket, body, s3=Depends(get_s3_service)) async

Set the CORS configuration for a bucket.

Source code in backend/app/api/v1/endpoints/s3/buckets.py
362
363
364
365
366
367
368
369
370
371
372
373
@router.put("/{bucket}/cors", response_model=BucketMutationResponse)
async def put_bucket_cors(
    bucket: str,
    body: CorsConfiguration,
    s3: StorageProtocol = Depends(get_s3_service),
):
    """Replace the bucket's CORS configuration with the request body."""
    # exclude_none keeps optional rule fields out of the storage call.
    cors_payload = body.model_dump(exclude_none=True)
    await run_storage(s3.put_bucket_cors(bucket, cors_payload), f"bucket '{bucket}'")
    return {"status": "updated"}

delete_bucket_cors(bucket, s3=Depends(get_s3_service)) async

Delete the CORS configuration for a bucket.

Source code in backend/app/api/v1/endpoints/s3/buckets.py
376
377
378
379
380
381
382
@router.delete("/{bucket}/cors", response_model=BucketMutationResponse)
async def delete_bucket_cors(
    bucket: str, s3: StorageProtocol = Depends(get_s3_service)
):
    """Remove the bucket's CORS configuration entirely."""
    label = f"bucket '{bucket}'"
    await run_storage(s3.delete_bucket_cors(bucket), label)
    return {"status": "deleted"}

list_multipart_uploads(bucket, prefix=Query(None), max_uploads=Query(1000, ge=1, le=1000), s3=Depends(get_s3_service)) async

List in-progress multipart uploads for a bucket.

Source code in backend/app/api/v1/endpoints/s3/buckets.py
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
@router.get("/{bucket}/uploads", response_model=ListMultipartUploadsResponse)
async def list_multipart_uploads(
    bucket: str,
    prefix: str | None = Query(None),
    max_uploads: int = Query(1000, ge=1, le=1000),
    s3: StorageProtocol = Depends(get_s3_service),
):
    """List multipart uploads that have been started but not completed."""
    payload = await run_storage(
        s3.list_multipart_uploads(bucket, prefix, max_uploads), f"bucket '{bucket}'"
    )
    return ListMultipartUploadsResponse(
        bucket=bucket,
        uploads=[
            MultipartUploadInfo.model_validate(entry)
            for entry in payload.get("Uploads", [])
        ],
        is_truncated=payload.get("IsTruncated", False),
    )

S3 Objects

objects

S3 object CRUD endpoints.

start_s3_stats(bucket, prefix=Query(None), delimiter=Query(None), s3=Depends(get_s3_service), cache=Depends(get_cache_service)) async

Start counting objects at a prefix in the background.

Returns 202 with a task_id. Poll GET /s3_stats/{task_id} for results.

Source code in backend/app/api/v1/endpoints/s3/objects.py
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
@router.post("/s3_stats", response_model=StatsTaskResponse, status_code=202)
async def start_s3_stats(
    bucket: str,
    prefix: str | None = Query(None),
    delimiter: str | None = Query(None),
    s3: StorageProtocol = Depends(get_s3_service),
    cache: KVCache | None = Depends(get_cache_service),
):
    """Start counting objects at a prefix in the background.

    Returns 202 with a task_id. Poll GET /s3_stats/{task_id} for results.
    """
    task_id = str(uuid.uuid4())
    # Seed the in-memory registry before scheduling the task so a poll that
    # arrives right after the 202 response already finds the task state.
    _stats_tasks[task_id] = {
        "status": "counting",
        "files": 0,
        "folders": 0,
        "bucket": bucket,
        "prefix": prefix,
    }
    # NOTE(review): the Task returned by ensure_future is not retained, and
    # the event loop holds only a weak reference to running tasks — the
    # counting task could in principle be garbage-collected mid-flight.
    # Consider keeping a strong reference (e.g. in a module-level set).
    asyncio.ensure_future(
        _count_objects_task(task_id, s3, bucket, prefix, delimiter, cache)
    )
    return {"task_id": task_id, "status": "counting"}

get_s3_stats(bucket, task_id, cache=Depends(get_cache_service)) async

Poll a counting task. Returns current progress and status.

Source code in backend/app/api/v1/endpoints/s3/objects.py
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
@router.get("/s3_stats/{task_id}", response_model=StatsTaskResponse)
async def get_s3_stats(
    bucket: str,
    task_id: str,
    cache: KVCache | None = Depends(get_cache_service),
):
    """Report the progress and status of a background counting task."""
    # Prefer the in-memory registry; fall back to the shared cache so polls
    # can land on a different worker than the one that started the task.
    state = _stats_tasks.get(task_id)
    if not state and cache and cache.enabled:
        state = await cache.get(f"s3_stats:{task_id}")
    if not state:
        raise HTTPException(404, "Task not found")
    progress = {key: state[key] for key in ("status", "files", "folders")}
    return {"task_id": task_id, **progress}

start_zip_download(request, bucket, body, s3=Depends(get_s3_service), cache=Depends(get_cache_service)) async

Start a background ZIP download task.

Accepts either explicit keys or a prefix to list all objects under. Returns 202 with task_id for polling.

Source code in backend/app/api/v1/endpoints/s3/objects.py
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
@router.post("/download", response_model=ZipTaskResponse, status_code=202)
async def start_zip_download(
    request: Request,
    bucket: str,
    body: BulkDownloadRequest,
    s3: StorageProtocol = Depends(get_s3_service),
    cache: KVCache | None = Depends(get_cache_service),
):
    """Start a background ZIP download task.

    Accepts either explicit keys or a prefix to list all objects under.
    Returns 202 with task_id for polling.
    """
    # Resolve the object list: an explicit prefix wins over explicit keys.
    if body.prefix is not None:
        object_keys = await _list_all_keys(s3, bucket, body.prefix)
    elif body.keys:
        object_keys = body.keys
    else:
        raise HTTPException(400, "Provide either 'prefix' or 'keys'")

    total = len(object_keys)
    if total > MAX_ZIP_OBJECTS:
        raise HTTPException(
            400,
            f"Too many objects ({total}). Maximum is {MAX_ZIP_OBJECTS}.",
        )
    if total == 0:
        raise HTTPException(400, "No objects found under the given prefix")

    task_id = str(uuid.uuid4())
    # Fire-and-forget: the builder records its progress for the polling endpoint.
    asyncio.ensure_future(_build_zip(task_id, s3, bucket, object_keys, cache))

    return ZipTaskResponse(task_id=task_id, status="processing", total=total)

get_zip_download(bucket, task_id, cache=Depends(get_cache_service)) async

Poll ZIP task status or download the completed ZIP.

Source code in backend/app/api/v1/endpoints/s3/objects.py
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
@router.get("/download/{task_id}")
async def get_zip_download(
    bucket: str,
    task_id: str,
    cache: KVCache | None = Depends(get_cache_service),
):
    """Poll ZIP task status or download the completed ZIP.

    Returns a JSON progress payload while the task is running, raises 500
    with failure details if it failed, and streams the finished archive
    (deleting it afterwards) once the task is done.
    """
    state = await _get_task_state(task_id, cache)
    if not state:
        raise HTTPException(404, "Task not found")

    # Still running: report counters so the client can show progress.
    if state["status"] == "processing":
        return ZipTaskResponse(
            task_id=task_id,
            status="processing",
            total=state.get("total", 0),
            completed=state.get("completed", 0),
            failed=state.get("failed", 0),
        )

    # Terminal failure: surface the error and the keys that could not be added.
    if state["status"] == "failed":
        raise HTTPException(
            500,
            detail={
                "status": "failed",
                "error": state.get("error", "Unknown error"),
                "failed_keys": state.get("failed_keys", []),
            },
        )

    # Completed: the builder recorded where it wrote the archive.
    zip_path = state.get("path", "")
    if not zip_path:
        raise HTTPException(500, "ZIP file not found on disk")

    # Guard against path traversal — resolved path must be inside ZIP_TEMP_DIR
    resolved = Path(zip_path).resolve()
    if not resolved.is_relative_to(ZIP_TEMP_DIR.resolve()):
        raise HTTPException(400, "Invalid ZIP path")
    if not resolved.exists():
        raise HTTPException(500, "ZIP file not found on disk")

    # Runs after the response has been sent: the ZIP is single-use, so
    # delete the file and forget the task in both memory and cache.
    async def _cleanup() -> None:
        try:
            resolved.unlink(missing_ok=True)
        except OSError:
            pass
        _zip_tasks.pop(task_id, None)
        if cache and cache.enabled:
            await cache.delete(f"zip_task:{task_id}")

    return FileResponse(
        path=resolved,
        media_type="application/zip",
        filename=f"{bucket}-objects.zip",
        background=StarletteBackgroundTask(_cleanup),
        headers={
            # Partial-success metadata: how many keys failed, and (capped) which.
            "X-Zip-Failed": str(state.get("failed", 0)),
            "X-Zip-Failed-Keys": ",".join(state.get("failed_keys", [])[:50]),
        },
    )

create_folder(bucket, body, s3=Depends(get_s3_service)) async

Create a folder (zero-byte object with trailing slash).

Source code in backend/app/api/v1/endpoints/s3/objects.py
440
441
442
443
444
445
446
447
448
449
@router.post("/folder", response_model=CreateFolderResponse, status_code=201)
async def create_folder(
    bucket: str,
    body: CreateFolderRequest,
    s3: StorageProtocol = Depends(get_s3_service),
):
    """Create a folder marker: a zero-byte object whose key ends in '/'."""
    key = body.folder_name
    if not key.endswith("/"):
        key = f"{key}/"
    await run_storage(s3.put_object(bucket, key, io.BytesIO(b"")), f"folder '{key}'")
    return CreateFolderResponse(bucket=bucket, key=key)

S3 Credentials

credentials

S3 presigned URL generation and credential derivation endpoints.

generate_presigned_url(body, s3=Depends(get_s3_service)) async

Generate a presigned URL for temporary access to an object.

Use get_object to create a download link, or put_object for an upload link. The URL is valid for the specified duration (default 1 hour, max 7 days). Anyone with the URL can access the object — no credentials needed.

Source code in backend/app/api/v1/endpoints/s3/credentials.py
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
@router.post("/presign", response_model=PresignedUrlResponse)
async def generate_presigned_url(
    body: PresignedUrlRequest,
    s3: StorageProtocol = Depends(get_s3_service),
):
    """Generate a presigned URL for temporary access to an object.

    Use `get_object` to create a download link, or `put_object` for an upload link.
    The URL is valid for the specified duration (default 1 hour, max 7 days).
    Anyone with the URL can access the object — no credentials needed.
    """
    # Reject unsupported S3 operations up front with a deterministic message.
    if body.method not in _ALLOWED_METHODS:
        allowed = ", ".join(sorted(_ALLOWED_METHODS))
        raise HTTPException(
            status_code=422,
            detail=f"method must be one of: {allowed}",
        )
    presigned = await run_storage(
        s3.generate_presigned_url(body.bucket, body.key, body.expires_in, body.method),
        f"object '{body.key}'",
    )
    return PresignedUrlResponse(
        url=presigned,
        bucket=body.bucket,
        key=body.key,
        expires_in=body.expires_in,
        method=body.method,
    )

get_s3_credentials(token, storage_settings=Depends(get_storage_settings), s3_settings=Depends(get_s3_settings)) async

Return the S3 credentials for the authenticated user.

HCP: derives keys from username/password (base64 + md5). MinIO/generic: returns the configured S3 access key and secret key.

Source code in backend/app/api/v1/endpoints/s3/credentials.py
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
@router.get("/credentials", response_model=S3CredentialsResponse)
async def get_s3_credentials(
    token: Annotated[str, Depends(oauth2_scheme)],
    storage_settings: StorageSettings = Depends(get_storage_settings),
    s3_settings: S3Settings = Depends(get_s3_settings),
):
    """Return the S3 credentials for the authenticated user.

    HCP: derives keys from username/password (base64 + md5).
    MinIO/generic: returns the configured S3 access key and secret key.
    """
    # Non-HCP backends just hand back the statically configured keys.
    if storage_settings.storage_backend != "hcp":
        return S3CredentialsResponse(
            access_key_id=storage_settings.s3_access_key,
            secret_access_key=storage_settings.s3_secret_key.get_secret_value(),
            endpoint_url=storage_settings.s3_endpoint_url or None,
        )

    # HCP: derive per-user keys from the credentials embedded in the JWT.
    creds: HcpCredentials = verify_token_with_credentials(token)
    access_key, secret_key = derive_s3_keys(creds.username, creds.password)
    return S3CredentialsResponse(
        access_key_id=access_key,
        secret_access_key=secret_key,
        username=creds.username,
        tenant=creds.tenant,
        endpoint_url=s3_endpoint_for_tenant(creds.tenant, s3_settings.hcp_domain),
    )

Authentication

auth

Authentication endpoint — OAuth2 password flow.

login(request, form_data=Depends(), tenant=Form(None)) async

Authenticate with HCP credentials and receive a JWT bearer token.

Credentials are stored in the JWT and passed through to HCP on every API call. HCP validates them — login itself always succeeds.

Tenant can be provided in three ways (first match wins):

  1. tenant form field — set it directly (used by the frontend).
  2. tenant/username format — enter acc-ai/my_user in the username field. Works in the Swagger Authorize dialog (lock icon).
  3. Omit — for system-level access (no tenant routing).
Source code in backend/app/api/v1/endpoints/auth.py
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
@router.post("/token", response_model=TokenResponse)
async def login(
    request: Request,
    form_data: OAuth2PasswordRequestForm = Depends(),
    tenant: str | None = Form(None),
):
    """Authenticate with HCP credentials and receive a JWT bearer token.

    Credentials are stored in the JWT and passed through to HCP on every
    API call.  HCP validates them — login itself always succeeds.

    **Tenant** can be provided in three ways (first match wins):

    1. **``tenant`` form field** — set it directly (used by the frontend).
    2. **``tenant/username`` format** — enter ``acc-ai/my_user`` in the
       username field.  Works in the Swagger **Authorize** dialog (lock icon).
    3. **Omit** — for system-level access (no tenant routing).
    """
    # An explicitly supplied (non-blank) tenant field takes priority.
    resolved_tenant = None
    if tenant and (trimmed := tenant.strip()):
        resolved_tenant = trimmed

    username = form_data.username
    if resolved_tenant is None:
        # Fall back to the tenant/username convention in the username field.
        username, resolved_tenant = _parse_username(form_data.username)

    client_ip = request.client.host if request.client else None

    # Reject malformed tenant names before minting a token.
    if resolved_tenant is not None and not _TENANT_RE.match(resolved_tenant):
        logger.warning(
            "Login failed: invalid tenant name %r user=%s ip=%s",
            resolved_tenant,
            username,
            client_ip,
        )
        raise HTTPException(
            status_code=status.HTTP_422_UNPROCESSABLE_CONTENT,
            detail="Invalid tenant name",
        )

    token = create_access_token(username, form_data.password, tenant=resolved_tenant)
    logger.info(
        "Login success: user=%s tenant=%s ip=%s",
        username,
        resolved_tenant or "(system)",
        client_ip,
    )
    return {"access_token": token, "token_type": "bearer"}