Using the API in Jupyter Notebooks

The Manta Python API is designed to work seamlessly in Jupyter notebooks for interactive development and analysis.

Setup

Installation

# In a notebook cell
!pip install manta-sdk[full]

# Or for minimal installation
!pip install manta-sdk[api]

Import and Configuration

from manta import UserAPI, Swarm, Task, Module
import pandas as pd
import matplotlib.pyplot as plt

# Configure API: connection settings are gathered in one place and unpacked
# into the client constructor.
connection_settings = {
    "token": "your_token",
    "host": "localhost",
    "port": 50052,
}
api = UserAPI(**connection_settings)

# Verify connection and report the outcome
print(
    "✅ Connected to Manta platform"
    if api.is_available()
    else "❌ Connection failed"
)

Interactive Development

Exploring Resources

from IPython.display import display

# List clusters interactively
clusters = api.get_clusters()
df_clusters = pd.DataFrame(clusters)
# Only the final expression of a cell is rendered automatically, so show the
# table explicitly in case more code follows in the same cell.
display(df_clusters)

# Visualize cluster resources
import plotly.express as px

fig = px.bar(
    df_clusters,
    x='name',
    y='node_count',
    title='Nodes per Cluster'
)
fig.show()

Module Development

# Develop module interactively
%%writefile trainer.py

from manta.light import Local, World, Results
import numpy as np

def train_model():
    # Load data
    local = Local()
    data = local.load_data("training_set")

    # Get global parameters
    world = World()
    learning_rate = world.get("learning_rate", 0.01)

    # Training logic
    model = train(data, learning_rate)

    # Save results
    results = Results()
    results.save({"accuracy": 0.95}, tag="metrics")

if __name__ == "__main__":
    train_model()
# Create module from file
# Run this in its own cell: anything placed in the %%writefile cell is written
# to trainer.py instead of being executed.
# %%writefile stores the file as UTF-8, so read it back with the same encoding
# rather than the platform default.
with open("trainer.py", "r", encoding="utf-8") as f:
    code = f.read()

module = Module(
    name="notebook_trainer",
    python_program=code,
    image="python:3.10"
)

# Upload module
module_id = api.send_module(module)
print(f"Module uploaded: {module_id}")

Real-time Monitoring

Status Monitoring

from IPython.display import display, clear_output
import time

def monitor_swarm(swarm_id, interval=5, timeout=None,
                  terminal_states=('COMPLETED', 'FAILED')):
    """Monitor swarm execution in notebook.

    Polls ``api.get_swarm(swarm_id)`` and replaces the cell output with a
    one-row DataFrame of the latest status until the swarm's ``status``
    field is one of ``terminal_states``.

    Args:
        swarm_id: Identifier passed to ``api.get_swarm``.
        interval: Seconds to sleep between polls.
        timeout: Optional maximum number of seconds to keep polling.
            ``None`` (the default) polls until a terminal state is seen.
        terminal_states: Status values that end the loop.

    Raises:
        TimeoutError: If ``timeout`` is given and elapses before the swarm
            reaches a terminal state.
    """
    # A monotonic clock keeps the deadline immune to wall-clock adjustments.
    deadline = None if timeout is None else time.monotonic() + timeout

    while True:
        # wait=True defers clearing until the new output arrives (no flicker)
        clear_output(wait=True)

        # Get status
        status = api.get_swarm(swarm_id)

        # Display status
        display(pd.DataFrame([status]))

        # Check if complete
        if status['status'] in terminal_states:
            break

        # Give up instead of polling forever when the caller set a limit
        if deadline is not None and time.monotonic() >= deadline:
            raise TimeoutError(
                f"Swarm {swarm_id} did not finish within {timeout} seconds"
            )

        time.sleep(interval)

# Start monitoring
monitor_swarm(swarm_id)

Progress Visualization

from ipywidgets import widgets
from IPython.display import display

def create_progress_monitor(swarm_id, interval=2.0):
    """Create interactive progress monitor.

    Displays a progress bar and a status line, then refreshes them from
    ``api.get_swarm(swarm_id)`` on a background daemon thread so the notebook
    stays usable while the swarm runs. ipywidgets has no ``Timer`` widget, so
    the periodic refresh is driven by ``threading`` instead.

    Args:
        swarm_id: Identifier passed to ``api.get_swarm``.
        interval: Seconds between refreshes.

    Returns:
        A handle exposing ``stop()`` (ends the polling) and ``thread`` (the
        background ``threading.Thread``).
    """
    import threading
    from html import escape
    from types import SimpleNamespace

    # Create widgets
    progress = widgets.FloatProgress(
        value=0,
        min=0,
        max=100,
        description='Progress:'
    )
    status_text = widgets.HTML()

    display(progress, status_text)

    stop_event = threading.Event()

    # Update function: refresh both widgets and report the current status
    def update():
        swarm = api.get_swarm(swarm_id)
        progress.value = swarm.get('progress', 0)
        status_text.value = f"<b>Status:</b> {swarm['status']}"
        return swarm['status']

    # Polling loop executed on the background thread
    def poll():
        while not stop_event.is_set():
            try:
                status = update()
            except Exception as exc:
                # A failure here would otherwise vanish with the thread;
                # surface it in the widget and stop polling.
                status_text.value = f"<b>Error:</b> {escape(str(exc))}"
                break
            # Same terminal states as the status-monitoring loop
            if status in ('COMPLETED', 'FAILED'):
                break
            # Event.wait doubles as an interruptible sleep
            stop_event.wait(interval)

    thread = threading.Thread(target=poll, daemon=True)
    thread.start()

    return SimpleNamespace(stop=stop_event.set, thread=thread)

# Create monitor
monitor = create_progress_monitor(swarm_id)

Result Analysis

Loading Results

# Get results
results = api.select_results(swarm_id, tag="metrics")

# Convert to DataFrame: one row per (iteration, node).
# The outer loop iterates over values only, so the notebook-level `swarm_id`
# that later cells rely on is not overwritten by a loop variable.
data = []
for swarm_results in results.values():
    for iteration, nodes in swarm_results.items():
        for node_id, node_data in nodes.items():
            data.append({
                'iteration': iteration,
                'node': node_id,
                **node_data.get('metrics', {})
            })

df_results = pd.DataFrame(data)
df_results.head()

Visualizing Results

# Plot training metrics: one panel per metric, each showing the mean value
# per iteration.
fig, axes = plt.subplots(1, 2, figsize=(12, 4))
mean_by_iteration = df_results.groupby('iteration')

panels = [
    ('accuracy', 'Average Accuracy', 'Accuracy'),  # Accuracy over iterations
    ('loss', 'Average Loss', 'Loss'),              # Loss over iterations
]
for axis, (metric, panel_title, y_label) in zip(axes, panels):
    mean_by_iteration[metric].mean().plot(
        ax=axis,
        title=panel_title,
        ylabel=y_label,
        xlabel='Iteration'
    )

plt.tight_layout()
plt.show()

Statistical Analysis

# Aggregate statistics: the same four summaries for each metric, per iteration
summary_funcs = ['mean', 'std', 'min', 'max']
stats = df_results.groupby('iteration').agg(
    {metric: list(summary_funcs) for metric in ('accuracy', 'loss')}
)

stats

Interactive Widgets

Parameter Exploration

from ipywidgets import interact, interact_manual

# interact_manual adds a button and runs the function only when it is
# clicked. Plain interact would call it -- and deploy a new swarm -- on every
# slider or dropdown change.
@interact_manual(
    learning_rate=(0.001, 0.1, 0.001),
    batch_size=[16, 32, 64, 128],
    epochs=(1, 50, 1)
)
def deploy_with_params(learning_rate, batch_size, epochs):
    """Deploy swarm with interactive parameters.

    Args:
        learning_rate: Value stored as the swarm global "learning_rate".
        batch_size: Value stored as the swarm global "batch_size".
        epochs: Value stored as the swarm global "epochs".
    """

    # Create swarm with parameters
    swarm = MySwarm()
    swarm.set_global("learning_rate", learning_rate)
    swarm.set_global("batch_size", batch_size)
    swarm.set_global("epochs", epochs)

    # Deploy
    result = api.deploy_swarm(cluster_id, swarm)
    print(f"Deployed with LR={learning_rate}, BS={batch_size}, E={epochs}")
    print(f"Swarm ID: {result['swarm_id']}")

Cluster Selection

# Interactive cluster selector
clusters = api.list_cluster_ids()

cluster_dropdown = widgets.Dropdown(
    options=clusters,
    description='Cluster:',
    value=clusters[0] if clusters else None
)

# Output printed inside widget callbacks is not shown by every frontend
# (e.g. JupyterLab), so capture it in an Output widget displayed with the
# dropdown.
cluster_output = widgets.Output()

def on_cluster_change(change):
    """Print the name and node count of the newly selected cluster."""
    cluster_id = change['new']
    cluster = api.get_cluster(cluster_id)
    with cluster_output:
        print(f"Selected: {cluster['name']} ({cluster['node_count']} nodes)")

cluster_dropdown.observe(on_cluster_change, names='value')
display(cluster_dropdown, cluster_output)

Log Analysis

Streaming Logs

from IPython.display import HTML

def display_logs(swarm_id, max_lines=50):
    """Display formatted logs in notebook.

    Fetches up to ``max_lines`` log entries for ``swarm_id`` (sorted
    descending) and renders them as an HTML table with the message colored
    by severity.

    Args:
        swarm_id: Swarm whose logs are collected.
        max_lines: Maximum number of entries requested from the API.
    """
    from html import escape

    logs = api.collect_logs(
        swarm_id=swarm_id,
        limit=max_lines,
        sort_order="desc"
    )

    # Built once instead of on every loop iteration
    severity_colors = {
        'ERROR': 'red',
        'WARNING': 'orange',
        'INFO': 'blue'
    }

    # Collect fragments and join once rather than growing a string with +=
    parts = [
        "<table style='width:100%'>",
        "<tr><th>Time</th><th>Node</th><th>Message</th></tr>",
    ]

    for log in logs:
        severity_color = severity_colors.get(log.get('severity', 'INFO'), 'black')
        # `or` also covers an explicit None node_id, which cannot be sliced
        node = str(log.get('node_id') or 'N/A')[:8]

        # Log text is arbitrary program output: escape it so it cannot
        # inject markup into the notebook.
        parts.append(
            "<tr>"
            f"<td>{escape(str(log['timestamp']))}</td>"
            f"<td>{escape(node)}</td>"
            f"<td style='color:{severity_color}'>{escape(str(log['message']))}</td>"
            "</tr>"
        )

    parts.append("</table>")

    display(HTML("".join(parts)))

# Display logs
display_logs(swarm_id)

Error Analysis

from collections import Counter

# Analyze errors
errors = api.collect_logs(
    swarm_id=swarm_id,
    severity=["ERROR"],
    limit=100
)

# Group errors by type: the text before the first ':' in the message, or
# 'Unknown' when the message contains no ':'
error_types = Counter(
    error['message'].split(':')[0] if ':' in error['message'] else 'Unknown'
    for error in errors
)

# Visualize error distribution
if error_types:
    pd.Series(dict(error_types)).plot(kind='bar', title='Error Distribution')
    plt.ylabel('Count')
    plt.show()
else:
    # Plotting an empty Series raises, so report the empty case instead
    print("No errors found")

Async Operations in Notebooks

Using AsyncIO

import asyncio
from manta import AsyncUserAPI

async def async_operations():
    """Collect up to 10 streamed "metrics" results for ``swarm_id``.

    Returns:
        A list of the items yielded by ``stream_results``, in arrival order.
    """
    client = AsyncUserAPI(token=token, host=host, port=port)

    # Stream results, stopping as soon as ten have been gathered
    collected = []
    async for item in client.stream_results(swarm_id, "metrics"):
        collected.append(item)
        if not len(collected) < 10:
            break

    return collected

# Run async code in notebook
results = await async_operations()  # In Jupyter
# or
results = asyncio.run(async_operations())  # In regular Python

Best Practices

1. Resource Management

# Clean up resources
swarm_id = None  # stays None if deployment itself fails
try:
    # Your operations
    result = api.deploy_swarm(cluster_id, swarm)
    swarm_id = result['swarm_id']
finally:
    # Stop swarm if it was deployed
    if swarm_id:
        api.stop_swarm(swarm_id)

2. Error Handling

from IPython.display import display, Markdown

try:
    result = api.deploy_swarm(cluster_id, swarm)
except Exception as e:
    # Render the error followed by helpful debugging info, one Markdown
    # block per line.
    report_lines = [
        f"**Error:** {e!s}",
        "### Debugging Steps:",
        "1. Check connection: `api.is_available()`",
        "2. Verify cluster: `api.get_cluster(cluster_id)`",
        "3. Check logs: `api.collect_logs(swarm_id)`",
    ]
    for report_line in report_lines:
        display(Markdown(report_line))

3. Checkpoint Results

# Save intermediate results
def checkpoint_results(results, filename):
    """Pickle ``results`` into ``filename`` so they can be recovered later.

    Args:
        results: Any picklable object.
        filename: Path of the checkpoint file; it is overwritten if present.
    """
    from pickle import dump

    with open(filename, 'wb') as checkpoint_file:
        dump(results, checkpoint_file)

    print(f"Checkpoint saved: {filename}")

# Periodically save
results = api.select_results(swarm_id, "metrics")
checkpoint_results(results, f"results_{swarm_id}.pkl")

4. Notebook Organization

# Use markdown cells for documentation
# Split into logical sections:
# 1. Setup and Configuration
# 2. Data Preparation
# 3. Swarm Definition
# 4. Deployment and Monitoring
# 5. Result Analysis
# 6. Visualization

Example Notebook

Complete example notebook structure:

# Cell 1: Setup
from manta import UserAPI, Swarm, Task, Module
import pandas as pd
import matplotlib.pyplot as plt
%matplotlib inline

api = UserAPI(token="token", host="localhost", port=50052)

# Cell 2: Define Swarm
class ExperimentSwarm(Swarm):
    """Skeleton swarm used by the example notebook."""

    name = "notebook_experiment"

    def execute(self):
        """Define the swarm's tasks here; intentionally empty in this skeleton."""

# Cell 3: Deploy
swarm = ExperimentSwarm()
result = api.deploy_swarm("cluster_id", swarm)
swarm_id = result['swarm_id']

# Cell 4: Monitor
# Interactive monitoring widgets

# Cell 5: Results
results = api.select_results(swarm_id, "metrics")
# Assumes the nested layout used in "Loading Results"
# (swarm -> iteration -> node -> data); flatten it to one row per iteration
# and node so the DataFrame has 'iteration' and metric columns.
rows = []
for swarm_results in results.values():
    for iteration, nodes in swarm_results.items():
        for node_id, node_data in nodes.items():
            rows.append({
                'iteration': iteration,
                'node': node_id,
                **node_data.get('metrics', {})
            })
df = pd.DataFrame(rows)

# Cell 6: Visualization
df.plot(kind='line', x='iteration', y='accuracy')
plt.show()

Next Steps