Skip to content

Async API Guide

The AsyncSandbox and AsyncSandboxClient classes provide fully async/await compatible APIs for concurrent sandbox execution. This guide covers async patterns, performance considerations, and best practices.


When to Use Async

Use Async When:

High concurrency: Running 10+ sandboxes simultaneously
I/O-bound workloads: Most time spent waiting for network/disk
Async framework integration: FastAPI, aiohttp, asyncio-based services
Long-running commands: Want to start multiple commands and wait for all
Resource efficiency: Need to handle 100+ concurrent sandboxes without spawning 100+ threads

Stay with Sync When:

Simple scripts: Sequential execution of 1-5 sandboxes
CPU-bound workloads: Async doesn't help (GIL limits Python parallelism)
Sync framework: Flask, Django, or synchronous codebase
Team unfamiliarity: Async/await adds complexity if team isn't comfortable


AsyncSandbox Overview

AsyncSandbox mirrors the sync Sandbox API but with async/await:

Sync API Async API Difference
Sandbox.from_local() await AsyncSandbox.from_local() Awaitable classmethod
sandbox.run() await sandbox.run() Awaitable method
with sandbox: async with sandbox: Async context manager
sandbox.session() await sandbox.session() Returns AsyncSessionWrapper
with sandbox.session(): async with session: Async context manager

Key Difference: All I/O operations are awaitable, allowing the event loop to handle other tasks while waiting.


Creating Async Sandboxes

AsyncSandbox.from_local()

@classmethod
async def from_local(
    cls,
    path: str,
    preset: Optional[str] = "view-only",
    permissions: Optional[List[Union[PermissionRule, Dict]]] = None,
    runtime: RuntimeType = RuntimeType.BWRAP,
    image: Optional[str] = None,
    resources: Optional[ResourceLimits] = None,
    endpoint: str = "localhost:9000",
    secure: bool = False,
    owner_id: Optional[str] = None,
    codebase_name: Optional[str] = None,
    ignore_patterns: Optional[List[str]] = None,
    labels: Optional[Dict[str, str]] = None,
    auto_start: bool = True,
) -> "AsyncSandbox"

Example:

import asyncio
from agentfense import AsyncSandbox

async def main():
    # Create async sandbox
    async with await AsyncSandbox.from_local("./project") as sandbox:
        result = await sandbox.run("pytest")
        print(result.stdout)

asyncio.run(main())

AsyncSandbox.from_codebase()

async with await AsyncSandbox.from_codebase(
    codebase_id="cb_xyz123",
    preset="agent-safe",
) as sandbox:
    result = await sandbox.run("make test")

AsyncSandbox.connect()

# Reconnect to existing sandbox
sandbox = await AsyncSandbox.connect("sb_abc123")
result = await sandbox.run("whoami")
await sandbox.destroy()

Concurrent Execution Patterns

Pattern 1: Run Multiple Sandboxes in Parallel

Problem: Execute the same command across 10 different projects.

import asyncio
from agentfense import AsyncSandbox

async def test_project(project_path: str) -> tuple[str, bool]:
    """Run tests in a project and return (name, success)."""
    async with await AsyncSandbox.from_local(project_path) as sandbox:
        result = await sandbox.run("pytest", timeout=300)
        return project_path, result.exit_code == 0

async def test_all_projects():
    """Test 10 projects concurrently."""
    projects = [f"./project-{i}" for i in range(10)]

    # Run all in parallel
    tasks = [test_project(p) for p in projects]
    results = await asyncio.gather(*tasks)

    # Print results
    for project, success in results:
        status = "✓ PASS" if success else "✗ FAIL"
        print(f"{status} {project}")

asyncio.run(test_all_projects())

Why Async: Instead of 10 * 60s = 600s sequential, runs in ~60s (all parallel).


Pattern 2: Fan-Out with Different Commands

Problem: Run different analysis tools on the same codebase concurrently.

async def run_analysis(project_path: str) -> dict:
    """Run multiple analysis tools in parallel."""

    async def pylint_check(sandbox):
        result = await sandbox.run("pylint .", timeout=120)
        return "pylint", result.stdout

    async def mypy_check(sandbox):
        result = await sandbox.run("mypy .", timeout=120)
        return "mypy", result.stdout

    async def pytest_coverage(sandbox):
        result = await sandbox.run("pytest --cov", timeout=300)
        return "coverage", result.stdout

    # Create sandbox once
    async with await AsyncSandbox.from_local(project_path) as sandbox:
        # Run all checks concurrently
        tasks = [
            pylint_check(sandbox),
            mypy_check(sandbox),
            pytest_coverage(sandbox),
        ]
        results = await asyncio.gather(*tasks)

        return {tool: output for tool, output in results}

# Usage
results = asyncio.run(run_analysis("./my-project"))
print(results["pylint"])

Pattern 3: Concurrent File Operations

Problem: Download multiple files from a sandbox in parallel.

async def download_outputs(sandbox: AsyncSandbox, file_paths: list[str]) -> dict[str, str]:
    """Download multiple files concurrently."""

    async def download_one(path: str) -> tuple[str, str]:
        content = await sandbox.read_file(path)
        return path, content

    tasks = [download_one(path) for path in file_paths]
    results = await asyncio.gather(*tasks)

    return {path: content for path, content in results}

# Usage
async with await AsyncSandbox.from_local("./project") as sandbox:
    await sandbox.run("./generate-reports.sh")

    files_to_download = [
        "/workspace/output/report1.txt",
        "/workspace/output/report2.txt",
        "/workspace/output/report3.txt",
    ]

    contents = await download_outputs(sandbox, files_to_download)
    for path, content in contents.items():
        print(f"{path}: {len(content)} bytes")

Pattern 4: Pipeline with Dependencies

Problem: Run stages where stage N depends on stage N-1.

async def ci_pipeline(project_path: str) -> bool:
    """Run CI pipeline stages sequentially within one sandbox."""
    async with await AsyncSandbox.from_local(
        project_path,
        preset="development",
        runtime=RuntimeType.DOCKER,
        image="python:3.11-slim",
    ) as sandbox:
        async with await sandbox.session() as session:
            # Stage 1: Install dependencies
            result = await session.exec("pip install -r requirements.txt")
            if result.exit_code != 0:
                print("Dependency install failed")
                return False

            # Stage 2: Linting (runs after install)
            result = await session.exec("flake8 .")
            if result.exit_code != 0:
                print("Linting failed")
                return False

            # Stage 3: Tests (runs after linting)
            result = await session.exec("pytest --cov")
            if result.exit_code != 0:
                print("Tests failed")
                return False

            # Stage 4: Build (runs after tests)
            result = await session.exec("python setup.py bdist_wheel")
            return result.exit_code == 0

# Usage
success = asyncio.run(ci_pipeline("./my-package"))

Note: This is sequential (no parallelism within one pipeline), but you can run multiple pipelines in parallel:

async def run_pipelines():
    projects = ["./project-A", "./project-B", "./project-C"]
    tasks = [ci_pipeline(p) for p in projects]
    results = await asyncio.gather(*tasks)  # 3 pipelines in parallel
    return results

results = asyncio.run(run_pipelines())

Pattern 5: Timeout with Cancellation

Problem: Run multiple sandboxes but cancel all if any fails or takes too long.

async def test_with_timeout(project_path: str, timeout: int) -> str:
    """Run tests with global timeout."""
    async with await AsyncSandbox.from_local(project_path) as sandbox:
        result = await sandbox.run("pytest", timeout=timeout)
        if result.exit_code != 0:
            raise Exception(f"Tests failed in {project_path}")
        return f"{project_path}: PASS"

async def test_all_with_global_timeout():
    """Run all tests, but cancel if any fails or global timeout."""
    projects = [f"./project-{i}" for i in range(5)]

    try:
        # 5-minute global timeout
        results = await asyncio.wait_for(
            asyncio.gather(*[test_with_timeout(p, 60) for p in projects]),
            timeout=300
        )
        print("All tests passed:", results)
    except asyncio.TimeoutError:
        print("Global timeout exceeded!")
    except Exception as e:
        print(f"Test failure: {e}")

asyncio.run(test_all_with_global_timeout())

Async Sessions

Creating Async Sessions

async with await AsyncSandbox.from_local("./project") as sandbox:
    async with await sandbox.session() as session:
        await session.exec("cd /workspace")
        await session.exec("npm install")
        result = await session.exec("npm test")

Parallel Commands in Separate Sessions

Problem: Run independent stateful workflows in parallel.

async def workflow_A(sandbox: AsyncSandbox):
    async with await sandbox.session() as session:
        await session.exec("cd /workspace/backend")
        await session.exec("pytest backend/")

async def workflow_B(sandbox: AsyncSandbox):
    async with await sandbox.session() as session:
        await session.exec("cd /workspace/frontend")
        await session.exec("npm test")

# Run both workflows concurrently (different sessions)
async with await AsyncSandbox.from_local("./monorepo") as sandbox:
    await asyncio.gather(
        workflow_A(sandbox),
        workflow_B(sandbox),
    )

Important: Each workflow has its own session (separate shell process), so state doesn't conflict.


AsyncSandboxClient (Low-Level)

For fine-grained control, use AsyncSandboxClient directly.

Example: Reuse Codebase Across Async Sandboxes

from agentfense import AsyncSandboxClient, RuntimeType

async def multi_sandbox_analysis(project_path: str):
    """Create one codebase, multiple sandboxes."""
    async with AsyncSandboxClient(endpoint="localhost:9000") as client:
        # Create and upload codebase once
        codebase = await client.create_codebase(name="shared", owner_id="team")

        # Upload files (sync walk_directory, async upload)
        from agentfense.utils import walk_directory
        for rel_path, content in walk_directory(project_path):
            await client.upload_file(codebase.id, rel_path, content)

        # Create multiple sandboxes from same codebase
        sandbox1 = await client.create_sandbox(
            codebase.id,
            permissions=[{"pattern": "**/*", "permission": "read"}],
            runtime=RuntimeType.BWRAP,
        )

        sandbox2 = await client.create_sandbox(
            codebase.id,
            permissions=[{"pattern": "**/*", "permission": "write"}],
            runtime=RuntimeType.DOCKER,
            image="python:3.11-slim",
        )

        # Start both sandboxes
        await asyncio.gather(
            client.start_sandbox(sandbox1.id),
            client.start_sandbox(sandbox2.id),
        )

        # Run commands concurrently
        results = await asyncio.gather(
            client.exec(sandbox1.id, command="pylint ."),
            client.exec(sandbox2.id, command="pytest --cov"),
        )

        # Cleanup
        await asyncio.gather(
            client.destroy_sandbox(sandbox1.id),
            client.destroy_sandbox(sandbox2.id),
        )
        await client.delete_codebase(codebase.id)

        return results

results = asyncio.run(multi_sandbox_analysis("./project"))

Performance Comparison

Benchmark: 10 Sandboxes Running 10-Second Command

Sync (Sequential):

import time
from agentfense import Sandbox

start = time.time()
results = []
for i in range(10):
    with Sandbox.from_local("./project") as sandbox:
        result = sandbox.run("sleep 10")
        results.append(result)
elapsed = time.time() - start
print(f"Time: {elapsed:.1f}s")  # ~100s (10 * 10s)

Async (Parallel):

import time
import asyncio
from agentfense import AsyncSandbox

async def run_one():
    async with await AsyncSandbox.from_local("./project") as sandbox:
        return await sandbox.run("sleep 10")

async def main():
    start = time.time()
    tasks = [run_one() for _ in range(10)]
    results = await asyncio.gather(*tasks)
    elapsed = time.time() - start
    print(f"Time: {elapsed:.1f}s")  # ~10s (all parallel)
    return results

asyncio.run(main())

Speedup: ~10x (10 sandboxes run concurrently instead of sequentially).


Scalability: 100 Concurrent Sandboxes

Async can handle 100+ concurrent sandboxes with minimal memory overhead:

async def run_100_sandboxes():
    """Run 100 sandboxes concurrently."""
    async def run_one(i: int):
        async with await AsyncSandbox.from_local(f"./project-{i}") as sandbox:
            return await sandbox.run("echo 'done'")

    tasks = [run_one(i) for i in range(100)]
    results = await asyncio.gather(*tasks)
    print(f"Completed {len(results)} sandboxes")

asyncio.run(run_100_sandboxes())

Memory Usage: ~50-100 MB (async overhead minimal compared to thread-per-sandbox).


Best Practices

1. Use asyncio.gather() for Parallel Execution

# ✓ Good: Run in parallel
tasks = [sandbox.run(cmd) for cmd in commands]
results = await asyncio.gather(*tasks)

# ✗ Bad: Sequential (defeats async purpose)
results = []
for cmd in commands:
    result = await sandbox.run(cmd)
    results.append(result)

2. Handle Exceptions in gather()

# ✓ Good: Catch exceptions per task
tasks = [run_test(project) for project in projects]
results = await asyncio.gather(*tasks, return_exceptions=True)

for project, result in zip(projects, results):
    if isinstance(result, Exception):
        print(f"{project} failed: {result}")
    else:
        print(f"{project} succeeded")

# ✗ Bad: One failure cancels all
try:
    results = await asyncio.gather(*tasks)  # First exception stops all
except Exception as e:
    print(f"Failed: {e}")  # Don't know which task failed

3. Use Semaphores to Limit Concurrency

async def run_with_limit(projects: list[str], max_concurrent: int = 5):
    """Run sandboxes with max concurrency limit."""
    semaphore = asyncio.Semaphore(max_concurrent)

    async def run_one(project: str):
        async with semaphore:  # Only 5 at a time
            async with await AsyncSandbox.from_local(project) as sandbox:
                return await sandbox.run("make test")

    tasks = [run_one(p) for p in projects]
    return await asyncio.gather(*tasks)

# Run 100 projects, but only 5 concurrent sandboxes
results = asyncio.run(run_with_limit([f"./p-{i}" for i in range(100)], max_concurrent=5))

4. Close Clients Properly

# ✓ Good: Use context manager
async with AsyncSandboxClient(endpoint="localhost:9000") as client:
    # ... use client ...
# Automatically closed

# ✗ Bad: Manual close (easy to forget)
client = AsyncSandboxClient(endpoint="localhost:9000")
# ... use client ...
await client.close()  # Easy to forget if exception occurs

5. Avoid Mixing Sync and Async

# ✗ Bad: Blocking call in async function
async def bad_example():
    async with await AsyncSandbox.from_local("./project") as sandbox:
        result = await sandbox.run("test.sh")

        # Blocking I/O in async context!
        with open("output.txt", "w") as f:  # Blocks event loop
            f.write(result.stdout)

# ✓ Good: Use async file I/O
import aiofiles

async def good_example():
    async with await AsyncSandbox.from_local("./project") as sandbox:
        result = await sandbox.run("test.sh")

        async with aiofiles.open("output.txt", "w") as f:
            await f.write(result.stdout)

Migration from Sync

Converting sync code to async is straightforward:

Step 1: Change Imports

# Sync
from agentfense import Sandbox

# Async
from agentfense import AsyncSandbox

Step 2: Add async/await

# Sync
def run_tests(project_path: str):
    with Sandbox.from_local(project_path) as sandbox:
        result = sandbox.run("pytest")
        return result.exit_code == 0

# Async
async def run_tests(project_path: str):
    async with await AsyncSandbox.from_local(project_path) as sandbox:
        result = await sandbox.run("pytest")
        return result.exit_code == 0

Step 3: Update Context Managers

# Sync
with sandbox.session() as session:
    session.exec("cd /workspace")

# Async
async with await sandbox.session() as session:
    await session.exec("cd /workspace")

Step 4: Use asyncio.run() at Entry Point

# Sync
if __name__ == "__main__":
    success = run_tests("./project")
    print(success)

# Async
import asyncio

if __name__ == "__main__":
    success = asyncio.run(run_tests("./project"))
    print(success)

See Also