Using the API in Jupyter Notebooks
The Manta Python API is designed to work seamlessly in Jupyter notebooks for interactive development and analysis.
Setup
Installation
# In a notebook cell
!pip install manta-sdk[full]
# Or for minimal installation
!pip install manta-sdk[api]
Import and Configuration
from manta import UserAPI, Swarm, Task, Module
import pandas as pd
import matplotlib.pyplot as plt
# Configure API (replace the placeholder token, host and port with your own)
api = UserAPI(
    token="your_token",
    host="localhost",
    port=50052,
)

# Verify connection before running anything else in the notebook
if api.is_available():
    print("✅ Connected to Manta platform")
else:
    print("❌ Connection failed")
Interactive Development
Exploring Resources
# Fetch the clusters and load them into a DataFrame for inspection
clusters = api.get_clusters()
df_clusters = pd.DataFrame(clusters)
df_clusters
# Bar chart of the node count of each cluster
import plotly.express as px
fig = px.bar(df_clusters, x='name', y='node_count', title='Nodes per Cluster')
fig.show()
Module Development
# Develop module interactively
%%writefile trainer.py
from manta.light import Local, World, Results
import numpy as np
def train_model():
# Load data
local = Local()
data = local.load_data("training_set")
# Get global parameters
world = World()
learning_rate = world.get("learning_rate", 0.01)
# Training logic
model = train(data, learning_rate)
# Save results
results = Results()
results.save({"accuracy": 0.95}, tag="metrics")
if __name__ == "__main__":
train_model()
# Create module from file.
# %%writefile stores the file as UTF-8, so read it back with the same encoding
# instead of relying on the platform default.
with open("trainer.py", "r", encoding="utf-8") as f:
    code = f.read()

module = Module(
    name="notebook_trainer",
    python_program=code,
    image="python:3.10",
)

# Upload module
module_id = api.send_module(module)
print(f"Module uploaded: {module_id}")
Real-time Monitoring
Status Monitoring
from IPython.display import display, clear_output
import time
def monitor_swarm(swarm_id, interval=5):
    """Monitor swarm execution in notebook.

    Polls the swarm every ``interval`` seconds, replacing the cell output with
    the latest status, and returns once the status is COMPLETED or FAILED.
    The call blocks the kernel while polling; interrupt the kernel to stop it.
    """
    terminal_states = ('COMPLETED', 'FAILED')
    while True:
        # wait=True delays clearing until new output arrives, avoiding flicker
        clear_output(wait=True)

        # Get status
        status = api.get_swarm(swarm_id)

        # Display status as a one-row table
        display(pd.DataFrame([status]))

        # Check if complete
        if status['status'] in terminal_states:
            break
        time.sleep(interval)


# Start monitoring
monitor_swarm(swarm_id)
Progress Visualization
from ipywidgets import widgets
from IPython.display import display
def create_progress_monitor(swarm_id, interval=2.0):
    """Create interactive progress monitor.

    Displays a progress bar and a status line, then refreshes them from a
    background thread every ``interval`` seconds so the notebook stays usable.
    Polling ends when the swarm reports COMPLETED or FAILED, when a refresh
    raises, or when the returned event is set.

    Returns:
        threading.Event: call ``.set()`` on it to stop the monitor.
    """
    import threading

    # Create widgets
    progress = widgets.FloatProgress(
        value=0,
        min=0,
        max=100,
        description='Progress:',
    )
    status_text = widgets.HTML()
    display(progress, status_text)

    stop_event = threading.Event()

    # Update function: refresh both widgets and report the current status
    def update():
        swarm = api.get_swarm(swarm_id)
        progress.value = swarm.get('progress', 0)
        status_text.value = f"<b>Status:</b> {swarm['status']}"
        return swarm['status']

    # ipywidgets has no Timer widget, so poll from a daemon thread instead
    def poll():
        while not stop_event.is_set():
            try:
                status = update()
            except Exception as exc:
                # Thread boundary: surface the failure in the widget and stop
                status_text.value = f"<b>Monitor stopped:</b> {exc}"
                return
            if status in ('COMPLETED', 'FAILED'):
                return
            # Event.wait doubles as an interruptible sleep
            stop_event.wait(interval)

    threading.Thread(target=poll, daemon=True).start()
    return stop_event


# Create monitor (call monitor.set() to stop it early)
monitor = create_progress_monitor(swarm_id)
Result Analysis
Loading Results
# Get results
results = api.select_results(swarm_id, tag="metrics")

# Convert to DataFrame: one row per (iteration, node).
# Only the values are iterated so the loop does not overwrite the notebook's
# swarm_id variable, which later cells still use.
data = []
for swarm_results in results.values():
    for iteration, node_results in swarm_results.items():
        for node_id, node_data in node_results.items():
            data.append({
                'iteration': iteration,
                'node': node_id,
                # Spread the saved metrics into columns (none if missing)
                **node_data.get('metrics', {}),
            })

df_results = pd.DataFrame(data)
df_results.head()
Visualizing Results
# Plot training metrics: two side-by-side panels, each showing the mean of a
# metric over all rows that share the same iteration
fig, axes = plt.subplots(1, 2, figsize=(12, 4))
by_iteration = df_results.groupby('iteration')

# Left panel: accuracy over iterations
mean_accuracy = by_iteration['accuracy'].mean()
mean_accuracy.plot(ax=axes[0], title='Average Accuracy', ylabel='Accuracy', xlabel='Iteration')

# Right panel: loss over iterations
mean_loss = by_iteration['loss'].mean()
mean_loss.plot(ax=axes[1], title='Average Loss', ylabel='Loss', xlabel='Iteration')

plt.tight_layout()
plt.show()
Statistical Analysis
# Per-iteration summary statistics for both metrics
summary_functions = ['mean', 'std', 'min', 'max']
stats = df_results.groupby('iteration').agg(
    {metric: list(summary_functions) for metric in ('accuracy', 'loss')}
)
stats
Interactive Widgets
Parameter Exploration
from ipywidgets import interact, interact_manual


# interact_manual adds a button and only calls the function when it is
# clicked. Plain @interact would run on display and on every slider change,
# deploying a new swarm each time.
@interact_manual(
    learning_rate=(0.001, 0.1, 0.001),
    batch_size=[16, 32, 64, 128],
    epochs=(1, 50, 1),
)
def deploy_with_params(learning_rate, batch_size, epochs):
    """Deploy swarm with interactive parameters"""
    # Create swarm with parameters
    # (MySwarm and cluster_id must already be defined in the notebook)
    swarm = MySwarm()
    swarm.set_global("learning_rate", learning_rate)
    swarm.set_global("batch_size", batch_size)
    swarm.set_global("epochs", epochs)

    # Deploy
    result = api.deploy_swarm(cluster_id, swarm)
    print(f"Deployed with LR={learning_rate}, BS={batch_size}, E={epochs}")
    print(f"Swarm ID: {result['swarm_id']}")
Cluster Selection
# Interactive cluster selector
clusters = api.list_cluster_ids()
cluster_dropdown = widgets.Dropdown(
    options=clusters,
    description='Cluster:',
    # Guard against an empty list: there is nothing to preselect
    value=clusters[0] if clusters else None,
)
# Text printed from a widget callback is not reliably shown under the cell,
# so capture it in an Output widget displayed next to the dropdown.
cluster_info = widgets.Output()


def on_cluster_change(change):
    """Look up the newly selected cluster and print its name and node count."""
    cluster_id = change['new']
    cluster = api.get_cluster(cluster_id)
    with cluster_info:
        print(f"Selected: {cluster['name']} ({cluster['node_count']} nodes)")


# names='value' restricts the callback to changes of the selected value
cluster_dropdown.observe(on_cluster_change, names='value')
display(cluster_dropdown, cluster_info)
Log Analysis
Streaming Logs
from IPython.display import HTML
def display_logs(swarm_id, max_lines=50):
    """Display formatted logs in notebook.

    Fetches up to ``max_lines`` log entries for ``swarm_id`` (requested in
    descending order) and renders them as an HTML table with the message
    coloured by severity.
    """
    # Imported locally so the name does not clash with IPython's HTML class
    from html import escape

    logs = api.collect_logs(
        swarm_id=swarm_id,
        limit=max_lines,
        sort_order="desc",
    )

    # Unknown severities fall back to black; a missing severity counts as INFO
    severity_colors = {
        'ERROR': 'red',
        'WARNING': 'orange',
        'INFO': 'blue',
    }

    # Format as HTML table; parts are collected and joined once at the end
    parts = [
        "<table style='width:100%'>",
        "<tr><th>Time</th><th>Node</th><th>Message</th></tr>",
    ]
    for log in logs:
        severity_color = severity_colors.get(log.get('severity', 'INFO'), 'black')
        # A missing or None node_id is shown as N/A; ids are cut to 8 characters
        node = str(log.get('node_id') or 'N/A')[:8]
        # Log text comes from running code, so escape it before embedding in HTML
        parts.append(
            "<tr>"
            f"<td>{escape(str(log['timestamp']))}</td>"
            f"<td>{escape(node)}</td>"
            f"<td style='color:{severity_color}'>{escape(str(log['message']))}</td>"
            "</tr>"
        )
    parts.append("</table>")
    display(HTML("".join(parts)))


# Display logs
display_logs(swarm_id)
Error Analysis
# Analyze errors
from collections import Counter

errors = api.collect_logs(
    swarm_id=swarm_id,
    severity=["ERROR"],
    limit=100,
)

# Group errors by type: the text before the first ':' of the message,
# or 'Unknown' when the message contains no ':'
error_types = Counter(
    error['message'].split(':')[0] if ':' in error['message'] else 'Unknown'
    for error in errors
)

# Visualize error distribution (plotting an empty Series raises, so skip it)
if error_types:
    pd.Series(dict(error_types)).plot(kind='bar', title='Error Distribution')
    plt.ylabel('Count')
    plt.show()
else:
    print("No errors found")
Async Operations in Notebooks
Using AsyncIO
import asyncio
from manta import AsyncUserAPI
async def async_operations():
"""Async operations in notebook"""
api = AsyncUserAPI(token=token, host=host, port=port)
# Stream results
results = []
async for result in api.stream_results(swarm_id, "metrics"):
results.append(result)
if len(results) >= 10:
break
return results
# Run async code in notebook
results = await async_operations() # In Jupyter
# or
results = asyncio.run(async_operations()) # In regular Python
Best Practices
1. Resource Management
# Clean up resources
# Start from None so the finally block is safe even when deployment fails
swarm_id = None
try:
    result = api.deploy_swarm(cluster_id, swarm)
    swarm_id = result['swarm_id']
    # Your operations
finally:
    # Stop the swarm only if it was actually deployed
    if swarm_id:
        api.stop_swarm(swarm_id)
2. Error Handling
from IPython.display import display, Markdown
try:
    result = api.deploy_swarm(cluster_id, swarm)
except Exception as e:
    # Notebook-level boundary: show the failure as rendered Markdown instead
    # of a raw traceback
    display(Markdown(f"**Error:** {str(e)}"))

    # Show helpful debugging info
    display(Markdown("### Debugging Steps:"))
    display(Markdown("1. Check connection: `api.is_available()`"))
    display(Markdown("2. Verify cluster: `api.get_cluster(cluster_id)`"))
    display(Markdown("3. Check logs: `api.collect_logs(swarm_id)`"))
3. Checkpoint Results
# Save intermediate results
def checkpoint_results(results, filename):
    """Save results to file for recovery.

    Args:
        results: Any picklable object, e.g. the value returned by
            ``api.select_results``.
        filename: Path of the checkpoint file; an existing file is overwritten.

    The checkpoint can be restored with ``pickle.load``. Only unpickle files
    you created yourself, because unpickling untrusted data can run code.
    """
    import pickle

    with open(filename, 'wb') as f:
        pickle.dump(results, f)
    print(f"Checkpoint saved: {filename}")
# Fetch the current metrics and write them to a per-swarm checkpoint file
results = api.select_results(swarm_id, "metrics")
checkpoint_path = f"results_{swarm_id}.pkl"
checkpoint_results(results, checkpoint_path)
4. Notebook Organization
# Use markdown cells for documentation
# Split into logical sections:
# 1. Setup and Configuration
# 2. Data Preparation
# 3. Swarm Definition
# 4. Deployment and Monitoring
# 5. Result Analysis
# 6. Visualization
Example Notebook
Complete example notebook structure:
# Cell 1: Setup
from manta import UserAPI, Swarm, Task, Module
import pandas as pd
import matplotlib.pyplot as plt
%matplotlib inline
api = UserAPI(token="token", host="localhost", port=50052)
# Cell 2: Define Swarm
class ExperimentSwarm(Swarm):
    """Swarm definition used by this example notebook."""

    name = "notebook_experiment"

    def execute(self):
        # Define tasks
        pass


# Cell 3: Deploy
swarm = ExperimentSwarm()
result = api.deploy_swarm("cluster_id", swarm)
swarm_id = result['swarm_id']

# Cell 4: Monitor
# Interactive monitoring widgets

# Cell 5: Results
results = api.select_results(swarm_id, "metrics")
# Flatten to one row per (iteration, node).
# NOTE(review): assumes the nested swarm -> iteration -> node layout used in
# the "Loading Results" section -- confirm against select_results.
rows = []
for swarm_results in results.values():
    for iteration, node_results in swarm_results.items():
        for node_id, node_data in node_results.items():
            rows.append({
                'iteration': iteration,
                'node': node_id,
                **node_data.get('metrics', {}),
            })
df = pd.DataFrame(rows)

# Cell 6: Visualization
# Average over the rows of each iteration so the plot has one point per iteration
df.groupby('iteration')['accuracy'].mean().plot(kind='line')
plt.show()
Next Steps
Authentication and Security - Authentication setup
Python User API - Back to User API guide