Python User API

The Manta Python API provides programmatic access to deploy, orchestrate, and monitor distributed computing workloads. It’s designed for integration into notebooks, applications, and automated workflows.

Note

The API comes in two flavors: Synchronous (easier to use) and Asynchronous (better performance). Both provide identical functionality.

Overview

The User API enables:

  • Cluster Management - Create, configure, and monitor clusters

  • Swarm Deployment - Deploy and control distributed algorithms

  • Module Management - Upload and manage code modules

  • Result Collection - Stream and retrieve computation results

  • Node Operations - Monitor and manage computing nodes

  • Log Analysis - Collect and analyze execution logs

Quick Start

Synchronous API (Recommended for beginners)

from manta import UserAPI

# Initialize API client
api = UserAPI(
    token="your_jwt_token",
    host="localhost",
    port=50052
)

# Check connection
if api.is_available():
    print("Connected to Manta platform")

# Get user info
user = api.get_user()
print(f"Authenticated as: {user['username']}")

Asynchronous API (For advanced users)

import asyncio
from manta import AsyncUserAPI

async def main():
    # Initialize async client
    api = AsyncUserAPI(
        token="your_jwt_token",
        host="localhost",
        port=50052
    )

    # Check connection
    if await api.is_available():
        print("Connected to Manta platform")

    # Stream results
    async for result in api.stream_results(swarm_id, "metrics"):
        print(f"Result: {result}")

asyncio.run(main())

API Architecture

Two API Variants

  1. UserAPI - Synchronous interface - Simpler to use - Works in standard Python scripts - Blocks until operations complete - Ideal for notebooks and simple scripts

  2. AsyncUserAPI - Asynchronous interface - Better performance for I/O operations - Non-blocking operations - Supports streaming and concurrent requests - Ideal for production applications

API Hierarchy

UserAPI / AsyncUserAPI
├── Authentication & User Management
├── Cluster Operations
│   └── ClusterAPI / AsyncClusterAPI
├── Module Management
├── Swarm Deployment
├── Result Collection
└── Monitoring & Logs

Common Operations

Cluster Management

from manta import UserAPI

api = UserAPI(token="token", host="localhost", port=50052)

# List clusters
clusters = api.get_clusters()
for cluster in clusters:
    print(f"Cluster: {cluster['name']} ({cluster['cluster_id']})")

# Get specific cluster
cluster = api.get_cluster("cluster_id")

# Get cluster API for detailed operations
cluster_api = api.get_cluster_api("cluster_id")

Swarm Deployment

from manta import UserAPI, Swarm

# Create swarm
swarm = MySwarm()  # Your swarm class

# Deploy to cluster
api = UserAPI(token="token", host="localhost", port=50052)
result = api.deploy_swarm("cluster_id", swarm)

print(f"Deployed swarm: {result['swarm_id']}")
print(f"Status: {result['status']}")

Module Management

from manta import Module

# Create module
module = Module(
    name="my_algorithm",
    python_program=code,
    image="python:3.10"
)

# Upload module
module_id = api.send_module(module)

# List modules
modules = api.get_modules()

# Get specific module
module = api.get_module(module_id)

Result Collection

# Get results by tag
results = api.select_results(swarm_id, tag="accuracy")

# Process results
for swarm_id, swarm_results in results.items():
    print(f"Swarm {swarm_id}:")
    for iteration in swarm_results:
        print(f"  Iteration {iteration}: {swarm_results[iteration]}")

Log Collection

# Collect logs with filters
logs = api.collect_logs(
    swarm_id=swarm_id,
    severity=["ERROR", "WARNING"],
    limit=100,
    sort_order="desc"
)

for log in logs:
    print(f"[{log['timestamp']}] {log['message']}")

Streaming Operations

The async API supports streaming for real-time data:

import asyncio
from manta import AsyncUserAPI

async def stream_example():
    api = AsyncUserAPI(token="token", host="localhost", port=50052)

    # Stream clusters
    async for cluster in api.stream_clusters():
        print(f"Cluster: {cluster}")

    # Stream swarms
    async for swarm in api.stream_swarms():
        print(f"Swarm: {swarm}")

    # Stream results
    async for result in api.stream_results(swarm_id, "metrics"):
        print(f"Result: {result}")

    # Stream logs
    async for log in api.stream_logs(swarm_id):
        print(f"Log: {log}")

asyncio.run(stream_example())

Error Handling

from manta import UserAPI
from manta.errors import (
    AuthenticationError,
    ConnectionError,
    ResourceNotFoundError,
    ValidationError
)

try:
    api = UserAPI(token="token", host="localhost", port=50052)
    result = api.deploy_swarm("cluster_id", swarm)

except AuthenticationError as e:
    print(f"Authentication failed: {e}")
    # Refresh token or re-authenticate

except ConnectionError as e:
    print(f"Connection failed: {e}")
    # Check network and service status

except ResourceNotFoundError as e:
    print(f"Resource not found: {e}")
    # Verify resource IDs

except ValidationError as e:
    print(f"Validation error: {e}")
    # Check input parameters

except Exception as e:
    print(f"Unexpected error: {e}")

Configuration

From Configuration File

from manta import UserAPI
from manta.config import load_config

# Load from default location (~/.manta/config.toml)
config = load_config()
api = UserAPI(**config)

# Load specific profile
config = load_config(profile="production")
api = UserAPI(**config)

From Environment Variables

import os
from manta import UserAPI

# Set environment variables
os.environ["MANTA_HOST"] = "platform.manta.io"
os.environ["MANTA_PORT"] = "50052"
os.environ["MANTA_TOKEN"] = "your_token"

# API will use environment variables
api = UserAPI.from_env()

Performance Tips

  1. Use Async API for I/O-heavy operations

  2. Reuse API instances - Don’t create new connections repeatedly

  3. Use streaming for large datasets - Avoid loading everything into memory

  4. Batch operations when possible - Reduce round trips

  5. Handle pagination - Use limits and offsets for large lists

  6. Cache results - Avoid redundant API calls

Complete Example

from manta import UserAPI, Swarm, Task, Module

# Define a simple swarm
class SimpleSwarm(Swarm):
    name = "example_swarm"

    def execute(self):
        module = Module(
            name="processor",
            python_program="print('Processing...')",
            image="python:3.10"
        )

        task = Task(module=module, method="all")
        return task

# Deploy and monitor
def main():
    # Initialize API
    api = UserAPI(
        token="your_token",
        host="localhost",
        port=50052
    )

    # Check availability
    if not api.is_available():
        print("Service unavailable")
        return

    # List clusters
    clusters = api.list_cluster_ids()
    if not clusters:
        print("No clusters available")
        return

    cluster_id = clusters[0]

    # Deploy swarm
    swarm = SimpleSwarm()
    result = api.deploy_swarm(cluster_id, swarm)
    swarm_id = result["swarm_id"]

    print(f"Deployed swarm: {swarm_id}")

    # Monitor status
    import time
    while True:
        status = api.get_swarm(swarm_id)
        print(f"Status: {status['status']}")

        if status['status'] in ['COMPLETED', 'FAILED']:
            break

        time.sleep(5)

    # Collect results
    if status['status'] == 'COMPLETED':
        results = api.select_results(swarm_id, tag="output")
        print(f"Results: {results}")

    # Get logs
    logs = api.collect_logs(swarm_id, limit=10)
    for log in logs:
        print(f"[{log['timestamp']}] {log['message']}")

if __name__ == "__main__":
    main()

Best Practices

  1. Always check service availability before operations

  2. Use appropriate API variant (sync vs async)

  3. Handle errors gracefully with proper exception handling

  4. Close connections properly when done

  5. Use context managers for automatic cleanup

  6. Validate inputs before API calls

  7. Monitor rate limits if applicable

Next Steps