Swarm Basics¶
Understanding the fundamental concepts of Swarm development in Manta.
What is a Swarm?¶
A Swarm is a distributed algorithm that coordinates multiple tasks across computing nodes. It consists of:
- Task definitions - What computations to perform
- Execution graph - How tasks relate and flow
- Global state - Shared parameters and data
- Network configuration - Communication topology
Creating Your First Swarm¶
Basic Structure
Every Swarm inherits from the base `Swarm` class:

```python
from manta import Swarm

class MyFirstSwarm(Swarm):
    """A simple distributed computation"""

    # Optional: Set swarm name
    name = "my_first_swarm"

    def __init__(self):
        super().__init__()
        # Initialize swarm configuration
        pass

    def execute(self):
        """Define the task execution graph"""
        # Must return a Task or list of Tasks
        pass
```
The execute() Method
The execute() method is where you define your algorithm's workflow:

```python
def execute(self):
    """Define task graph for the swarm"""
    # Create tasks
    task1 = Task(module=module1)
    task2 = Task(module=module2)

    # Connect tasks (task2 runs after task1)
    return task2(task1)
```
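As the skeleton above notes, execute() may also return a list of Tasks when the graph has several independent endpoints. A minimal sketch under that assumption:

```python
def execute(self):
    """Two independent branches returned together"""
    task_a = Task(module=module_a)
    task_b = Task(module=module_b)

    # No dependency between them, so return both as graph endpoints
    return [task_a, task_b]
```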
Essential Components¶
1. Modules
Modules contain the actual code that runs on nodes:
```python
from manta import Module

# From Python code string
module = Module(
    name="processor",
    python_program="""
def main():
    print("Processing data...")
    # Your computation here

if __name__ == "__main__":
    main()
""",
    image="python:3.10"
)

# From file
with open("processor.py", "r") as f:
    module = Module(
        name="processor",
        python_program=f.read(),
        image="manta:latest"
    )
```
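Because `python_program` is just a string, it is easy to wrap module creation in a small helper when several tasks share the same pattern. The helper below is hypothetical (not part of the Manta API) and is built only from the `Module` fields shown above:

```python
from pathlib import Path
from manta import Module

def module_from_file(path: str, name: str, image: str = "python:3.10") -> Module:
    """Hypothetical convenience wrapper: build a Module from a script on disk."""
    return Module(
        name=name,
        python_program=Path(path).read_text(),
        image=image,
    )

processor_module = module_from_file("processor.py", name="processor")
```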
2. Tasks
Tasks are instances of modules with specific configuration:
```python
from manta import Task

task = Task(
    module=module,          # Module to execute
    method="all",           # Run on all nodes
    alias="data_processor"  # Friendly name
)
```
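The complete example later on this page also combines method="any" with maximum=1 so that a step runs as a single instance on whichever node picks it up. A minimal sketch of that configuration (aggregator_module stands in for any module you have defined):

```python
# One instance only, on any available node
aggregator = Task(
    module=aggregator_module,
    method="any",       # Run on any single node
    maximum=1,          # At most one instance
    alias="aggregator"
)
```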
3. Task Graph
Connect tasks to define execution flow:
```python
# Sequential execution
task_a = Task(module_a)
task_b = Task(module_b)
task_c = Task(module_c)

# Chain: A -> B -> C
flow = task_c(task_b(task_a))

# Parallel with aggregation
workers = [Task(worker_module) for _ in range(5)]
aggregator = Task(aggregator_module)

# All workers -> aggregator
flow = aggregator(workers)
```
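Chaining and aggregation compose. Assuming that connecting a task returns the task itself (as the chain example above suggests), a diamond-shaped graph can be sketched like this; the module names are placeholders:

```python
# Diamond: one producer fans out to two branches, which fan back in
producer = Task(producer_module)
branch_a = Task(branch_a_module)
branch_b = Task(branch_b_module)
merger = Task(merger_module, maximum=1)

# producer -> {branch_a, branch_b} -> merger
flow = merger([branch_a(producer), branch_b(producer)])
```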
Global State Management¶
Swarms can maintain global state accessible to all tasks:
Setting Global Values
```python
class StatefulSwarm(Swarm):
    def __init__(self):
        super().__init__()

        # Set simple values
        self.set_global("iteration", 0)
        self.set_global("threshold", 0.95)

        # Set complex objects
        self.set_global("config", {
            "batch_size": 32,
            "learning_rate": 0.01,
            "epochs": 10
        })

        # Set binary data (initial_model is assumed to be defined elsewhere)
        import pickle
        model_bytes = pickle.dumps(initial_model)
        self.set_global("model", model_bytes)
```
Accessing Globals in Tasks
Within task code, access globals using the World API:
```python
# In your task module code
from manta.light import World

world = World()

# Get global values
iteration = world.get("iteration")
config = world.get("config")
model_bytes = world.get("model")

# Update globals
world.set("iteration", iteration + 1)
```
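Putting both sides together: a task module can deserialize the pickled model published by StatefulSwarm above, modify it, and write it back. A minimal sketch (the update step itself is left abstract):

```python
import pickle

from manta.light import World

def main():
    world = World()

    # Deserialize the model stored under the "model" global
    model = pickle.loads(world.get("model"))

    # ... update the model here ...

    # Publish the updated model and advance the iteration counter
    world.set("model", pickle.dumps(model))
    world.set("iteration", world.get("iteration") + 1)

if __name__ == "__main__":
    main()
```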
Complete Example¶
Here’s a complete working swarm:
```python
from manta import Swarm, Task, Module

class DataProcessingSwarm(Swarm):
    """Distributed data processing pipeline"""

    name = "data_pipeline"

    def __init__(self):
        super().__init__()
        # Set processing parameters
        self.set_global("chunk_size", 1000)
        self.set_global("output_format", "parquet")

    def execute(self):
        # Data loading module
        loader_code = """
import random
from manta.light import Results

def main():
    # Simulate data loading
    data = [random.random() for _ in range(1000)]

    # Save intermediate results
    results = Results()
    results.save({"data": data}, tag="raw_data")
    print(f"Loaded {len(data)} records")

if __name__ == "__main__":
    main()
"""
        loader_module = Module(
            name="data_loader",
            python_program=loader_code,
            image="python:3.10"
        )

        # Processing module
        processor_code = """
from manta.light import World, Results

def main():
    world = World()
    chunk_size = world.get("chunk_size")

    # Process data in chunks
    print(f"Processing with chunk size: {chunk_size}")

    # Save processed results
    results = Results()
    results.save({"status": "completed"}, tag="processing")

if __name__ == "__main__":
    main()
"""
        processor_module = Module(
            name="data_processor",
            python_program=processor_code,
            image="python:3.10"
        )

        # Create tasks
        loaders = [
            Task(
                module=loader_module,
                method="all",
                alias=f"loader_{i}"
            )
            for i in range(3)
        ]
        processor = Task(
            module=processor_module,
            method="any",
            maximum=1,
            alias="processor"
        )

        # Define flow: all loaders -> processor
        return processor(loaders)
```
Deployment¶
Deploy your swarm using the UserAPI:
```python
from manta import UserAPI

# Initialize API
api = UserAPI(
    token="your_token",
    host="localhost",
    port=50052
)

# Create and deploy swarm
swarm = DataProcessingSwarm()

# Deploy to cluster
result = api.deploy_swarm("cluster_id", swarm)
print(f"Deployed swarm: {result['swarm_id']}")

# Monitor execution
status = api.get_swarm(result['swarm_id'])
print(f"Status: {status['status']}")
```
Key Concepts Summary¶
- Swarm: Container for your distributed algorithm
- Module: Packaged code that executes on nodes
- Task: Configured instance of a module
- Graph: Execution flow connecting tasks
- Globals: Shared state across all tasks
- execute(): Method defining the task graph
Common Patterns¶
Sequential Pipeline
```python
def execute(self):
    extract = Task(extract_module)
    transform = Task(transform_module)
    load = Task(load_module)

    # ETL pipeline
    return load(transform(extract))
```
Fan-out/Fan-in
```python
def execute(self):
    # Fan-out to workers
    workers = [Task(worker_module) for _ in range(10)]

    # Fan-in to aggregator
    aggregator = Task(aggregator_module, maximum=1)

    return aggregator(workers)
```
Iterative Processing
```python
def execute(self):
    # Tasks that loop back
    trainer = Task(trainer_module, method="all")
    evaluator = Task(evaluator_module, maximum=1)
    scheduler = Task(scheduler_module, maximum=1)

    # Create cycle for iterations
    return scheduler(evaluator(trainer))
```
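How many times the cycle runs is up to your module code. One plausible approach, sketched here with an assumed fixed iteration budget, is to let the scheduler advance the iteration global (introduced in the state-management example) and stop once the budget is spent:

```python
# Inside the scheduler module (a sketch; the stopping rule is an assumption)
from manta.light import World

def main():
    world = World()
    iteration = world.get("iteration")

    if iteration < 10:  # assumed iteration budget
        world.set("iteration", iteration + 1)
        print(f"Starting iteration {iteration + 1}")
    else:
        print("Iteration budget reached; stopping")

if __name__ == "__main__":
    main()
```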
Next Steps¶
- Task Configuration - Deep dive into task configuration
- Swarm Development Guide - Back to the swarm development guide