Python User API¶
The Manta Python API provides programmatic access to deploy, orchestrate, and monitor distributed computing workloads. It’s designed for integration into notebooks, applications, and automated workflows.
Note
The API comes in two flavors: Synchronous (easier to use) and Asynchronous (better performance). Both provide identical functionality.
Overview¶
The User API enables:
Cluster Management - Create, configure, and monitor clusters
Swarm Deployment - Deploy and control distributed algorithms
Module Management - Upload and manage code modules
Result Collection - Stream and retrieve computation results
Node Operations - Monitor and manage computing nodes
Log Analysis - Collect and analyze execution logs
Quick Start¶
Synchronous API (Recommended for beginners)
from manta import UserAPI
# Initialize API client
api = UserAPI(
token="your_jwt_token",
host="localhost",
port=50052
)
# Check connection
if api.is_available():
print("Connected to Manta platform")
# Get user info
user = api.get_user()
print(f"Authenticated as: {user['username']}")
Asynchronous API (For advanced users)
import asyncio
from manta import AsyncUserAPI
async def main():
# Initialize async client
api = AsyncUserAPI(
token="your_jwt_token",
host="localhost",
port=50052
)
# Check connection
if await api.is_available():
print("Connected to Manta platform")
# Stream results
async for result in api.stream_results(swarm_id, "metrics"):
print(f"Result: {result}")
asyncio.run(main())
API Architecture¶
Two API Variants
UserAPI - Synchronous interface - Simpler to use - Works in standard Python scripts - Blocks until operations complete - Ideal for notebooks and simple scripts
AsyncUserAPI - Asynchronous interface - Better performance for I/O operations - Non-blocking operations - Supports streaming and concurrent requests - Ideal for production applications
API Hierarchy
UserAPI / AsyncUserAPI
├── Authentication & User Management
├── Cluster Operations
│ └── ClusterAPI / AsyncClusterAPI
├── Module Management
├── Swarm Deployment
├── Result Collection
└── Monitoring & Logs
Common Operations¶
Cluster Management
from manta import UserAPI
api = UserAPI(token="token", host="localhost", port=50052)
# List clusters
clusters = api.get_clusters()
for cluster in clusters:
print(f"Cluster: {cluster['name']} ({cluster['cluster_id']})")
# Get specific cluster
cluster = api.get_cluster("cluster_id")
# Get cluster API for detailed operations
cluster_api = api.get_cluster_api("cluster_id")
Swarm Deployment
from manta import UserAPI, Swarm
# Create swarm
swarm = MySwarm() # Your swarm class
# Deploy to cluster
api = UserAPI(token="token", host="localhost", port=50052)
result = api.deploy_swarm("cluster_id", swarm)
print(f"Deployed swarm: {result['swarm_id']}")
print(f"Status: {result['status']}")
Module Management
from manta import Module
# Create module
module = Module(
name="my_algorithm",
python_program=code,
image="python:3.10"
)
# Upload module
module_id = api.send_module(module)
# List modules
modules = api.get_modules()
# Get specific module
module = api.get_module(module_id)
Result Collection
# Get results by tag
results = api.select_results(swarm_id, tag="accuracy")
# Process results
for swarm_id, swarm_results in results.items():
print(f"Swarm {swarm_id}:")
for iteration in swarm_results:
print(f" Iteration {iteration}: {swarm_results[iteration]}")
Log Collection
# Collect logs with filters
logs = api.collect_logs(
swarm_id=swarm_id,
severity=["ERROR", "WARNING"],
limit=100,
sort_order="desc"
)
for log in logs:
print(f"[{log['timestamp']}] {log['message']}")
Streaming Operations¶
The async API supports streaming for real-time data:
import asyncio
from manta import AsyncUserAPI
async def stream_example():
api = AsyncUserAPI(token="token", host="localhost", port=50052)
# Stream clusters
async for cluster in api.stream_clusters():
print(f"Cluster: {cluster}")
# Stream swarms
async for swarm in api.stream_swarms():
print(f"Swarm: {swarm}")
# Stream results
async for result in api.stream_results(swarm_id, "metrics"):
print(f"Result: {result}")
# Stream logs
async for log in api.stream_logs(swarm_id):
print(f"Log: {log}")
asyncio.run(stream_example())
Error Handling¶
from manta import UserAPI
from manta.errors import (
AuthenticationError,
ConnectionError,
ResourceNotFoundError,
ValidationError
)
try:
api = UserAPI(token="token", host="localhost", port=50052)
result = api.deploy_swarm("cluster_id", swarm)
except AuthenticationError as e:
print(f"Authentication failed: {e}")
# Refresh token or re-authenticate
except ConnectionError as e:
print(f"Connection failed: {e}")
# Check network and service status
except ResourceNotFoundError as e:
print(f"Resource not found: {e}")
# Verify resource IDs
except ValidationError as e:
print(f"Validation error: {e}")
# Check input parameters
except Exception as e:
print(f"Unexpected error: {e}")
Configuration¶
From Configuration File
from manta import UserAPI
from manta.config import load_config
# Load from default location (~/.manta/config.toml)
config = load_config()
api = UserAPI(**config)
# Load specific profile
config = load_config(profile="production")
api = UserAPI(**config)
From Environment Variables
import os
from manta import UserAPI
# Set environment variables
os.environ["MANTA_HOST"] = "platform.manta.io"
os.environ["MANTA_PORT"] = "50052"
os.environ["MANTA_TOKEN"] = "your_token"
# API will use environment variables
api = UserAPI.from_env()
Performance Tips¶
Use Async API for I/O-heavy operations
Reuse API instances - Don’t create new connections repeatedly
Use streaming for large datasets - Avoid loading everything into memory
Batch operations when possible - Reduce round trips
Handle pagination - Use limits and offsets for large lists
Cache results - Avoid redundant API calls
Complete Example¶
from manta import UserAPI, Swarm, Task, Module
# Define a simple swarm
class SimpleSwarm(Swarm):
name = "example_swarm"
def execute(self):
module = Module(
name="processor",
python_program="print('Processing...')",
image="python:3.10"
)
task = Task(module=module, method="all")
return task
# Deploy and monitor
def main():
# Initialize API
api = UserAPI(
token="your_token",
host="localhost",
port=50052
)
# Check availability
if not api.is_available():
print("Service unavailable")
return
# List clusters
clusters = api.list_cluster_ids()
if not clusters:
print("No clusters available")
return
cluster_id = clusters[0]
# Deploy swarm
swarm = SimpleSwarm()
result = api.deploy_swarm(cluster_id, swarm)
swarm_id = result["swarm_id"]
print(f"Deployed swarm: {swarm_id}")
# Monitor status
import time
while True:
status = api.get_swarm(swarm_id)
print(f"Status: {status['status']}")
if status['status'] in ['COMPLETED', 'FAILED']:
break
time.sleep(5)
# Collect results
if status['status'] == 'COMPLETED':
results = api.select_results(swarm_id, tag="output")
print(f"Results: {results}")
# Get logs
logs = api.collect_logs(swarm_id, limit=10)
for log in logs:
print(f"[{log['timestamp']}] {log['message']}")
if __name__ == "__main__":
main()
Best Practices¶
Always check service availability before operations
Use appropriate API variant (sync vs async)
Handle errors gracefully with proper exception handling
Close connections properly when done
Use context managers for automatic cleanup
Validate inputs before API calls
Monitor rate limits if applicable
Next Steps¶
Authentication and Security - Authentication and security
Using the API in Jupyter Notebooks - Using the API in Jupyter notebooks