Swarm Basics

Understanding the fundamental concepts of Swarm development in Manta.

What is a Swarm?

A Swarm is a distributed algorithm that coordinates multiple tasks across computing nodes. It consists of:

  • Task definitions - What computations to perform

  • Execution graph - How tasks relate and flow

  • Global state - Shared parameters and data

  • Network configuration - Communication topology

Creating Your First Swarm

Basic Structure

Every Swarm inherits from the base Swarm class:

from manta import Swarm

class MyFirstSwarm(Swarm):
    """A simple distributed computation"""

    # Optional: Set swarm name
    name = "my_first_swarm"

    def __init__(self):
        super().__init__()
        # Initialize swarm configuration
        pass

    def execute(self):
        """Define the task execution graph"""
        # Must return a Task or list of Tasks
        pass
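
For orientation, here is the same skeleton filled in with the smallest possible graph: one module, one task. This is a minimal sketch; the hello module is a hypothetical placeholder, and the execute() method and the Module and Task constructors it uses are explained in the sections below.

from manta import Swarm, Task, Module

class HelloSwarm(Swarm):
    """Smallest useful swarm: one module, one task"""

    name = "hello_swarm"

    def execute(self):
        # "hello" is a placeholder module with a one-line program
        hello_module = Module(
            name="hello",
            python_program='print("Hello from a node")',
            image="python:3.10"
        )

        # The entire graph is a single task
        return Task(module=hello_module)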

The execute() Method

The execute() method is where you define your algorithm’s workflow:

def execute(self):
    """Define task graph for the swarm"""

    # Create tasks
    task1 = Task(module=module1)
    task2 = Task(module=module2)

    # Connect tasks (task2 runs after task1)
    return task2(task1)
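
As the skeleton's comment notes, execute() may also return a list of Tasks. A sketch, assuming that tasks returned as a list are independent entry points with no ordering between them:

def execute(self):
    """Two tasks with no dependency between them"""
    task1 = Task(module=module1)
    task2 = Task(module=module2)

    # No connection is made, so neither task waits on the other
    return [task1, task2]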

Essential Components

1. Modules

Modules contain the actual code that runs on nodes:

from manta import Module

# From Python code string
module = Module(
    name="processor",
    # The embedded code must be valid top-level Python, so keep it flush left
    python_program="""
def main():
    print("Processing data...")
    # Your computation here

if __name__ == "__main__":
    main()
""",
    image="python:3.10"
)

# From file
with open("processor.py", "r") as f:
    module = Module(
        name="processor",
        python_program=f.read(),
        image="manta:latest"
    )
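
Because the string becomes a standalone Python file, it must be valid at the top level; keeping it flush left, as above, is the simplest way to guarantee that. If you would rather indent the string to match the surrounding code, the standard library's textwrap.dedent can strip the common leading whitespace first:

import textwrap

from manta import Module

module = Module(
    name="processor",
    # dedent removes the shared indentation before the code is shipped
    python_program=textwrap.dedent("""
        def main():
            print("Processing data...")

        if __name__ == "__main__":
            main()
    """),
    image="python:3.10"
)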

2. Tasks

Tasks are instances of modules with specific configuration:

from manta import Task

task = Task(
    module=module,           # Module to execute
    method="all",           # Run on all nodes
    alias="data_processor"  # Friendly name
)
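
The method option controls task placement. Alongside method="all", the complete example later on this page uses method="any" together with maximum=1 to confine a task to a single node. A sketch of that configuration (the exact placement semantics are framework-defined):

# Run on any one available node rather than all of them
aggregator = Task(
    module=aggregator_module,
    method="any",    # Any single node may run this task
    maximum=1,       # At most one instance runs
    alias="aggregator"
)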

3. Task Graph

Connect tasks to define execution flow:

# Sequential execution
task_a = Task(module_a)
task_b = Task(module_b)
task_c = Task(module_c)

# Chain: A -> B -> C
flow = task_c(task_b(task_a))

# Parallel with aggregation
workers = [Task(worker_module) for _ in range(5)]
aggregator = Task(aggregator_module)

# All workers -> aggregator
flow = aggregator(workers)
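
The two call styles compose. Assuming a task may feed more than one downstream task (an assumption; only fan-in is shown above), a diamond-shaped graph A -> {B, C} -> D would look like this sketch:

task_a = Task(module_a)
task_b = Task(module_b)
task_c = Task(module_c)
task_d = Task(module_d)

# Fan-out from A to B and C, then fan-in to D
flow = task_d([task_b(task_a), task_c(task_a)])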

Global State Management

Swarms can maintain global state accessible to all tasks:

Setting Global Values

class StatefulSwarm(Swarm):
    def __init__(self):
        super().__init__()

        # Set simple values
        self.set_global("iteration", 0)
        self.set_global("threshold", 0.95)

        # Set complex objects
        self.set_global("config", {
            "batch_size": 32,
            "learning_rate": 0.01,
            "epochs": 10
        })

        # Set binary data; initial_model stands in for any picklable object
        import pickle
        initial_model = {"weights": [0.0] * 8}
        model_bytes = pickle.dumps(initial_model)
        self.set_global("model", model_bytes)

Accessing Globals in Tasks

Within task code, access globals using the World API:

# In your task module code
from manta.light import World

world = World()

# Get global values
iteration = world.get("iteration")
config = world.get("config")
model_bytes = world.get("model")

# Update globals
world.set("iteration", iteration + 1)
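
Binary globals round-trip through pickle. Continuing the StatefulSwarm example above, a task can deserialize the stored model, modify it, and publish the new version:

# In your task module code
import pickle

from manta.light import World

world = World()

# Rebuild the object stored with set_global("model", ...)
model = pickle.loads(world.get("model"))

# ... update the model here ...

# Publish the updated bytes for other tasks to read
world.set("model", pickle.dumps(model))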

Complete Example

Here’s a complete working swarm:

from manta import Swarm, Task, Module

class DataProcessingSwarm(Swarm):
    """Distributed data processing pipeline"""

    name = "data_pipeline"

    def __init__(self):
        super().__init__()

        # Set processing parameters
        self.set_global("chunk_size", 1000)
        self.set_global("output_format", "parquet")

    def execute(self):
        # Data loading module
        loader_code = """
import random
from manta.light import Results

def main():
    # Simulate data loading
    data = [random.random() for _ in range(1000)]

    # Save intermediate results
    results = Results()
    results.save({"data": data}, tag="raw_data")

    print(f"Loaded {len(data)} records")

if __name__ == "__main__":
    main()
        """

        loader_module = Module(
            name="data_loader",
            python_program=loader_code,
            image="python:3.10"
        )

        # Processing module
        processor_code = """
from manta.light import World, Results

def main():
    world = World()
    chunk_size = world.get("chunk_size")

    # Process data in chunks
    print(f"Processing with chunk size: {chunk_size}")

    # Save processed results
    results = Results()
    results.save({"status": "completed"}, tag="processing")

if __name__ == "__main__":
    main()
        """

        processor_module = Module(
            name="data_processor",
            python_program=processor_code,
            image="python:3.10"
        )

        # Create tasks
        loaders = [
            Task(
                module=loader_module,
                method="all",
                alias=f"loader_{i}"
            )
            for i in range(3)
        ]

        processor = Task(
            module=processor_module,
            method="any",
            maximum=1,
            alias="processor"
        )

        # Define flow: all loaders -> processor
        return processor(loaders)

Deployment

Deploy your swarm using the UserAPI:

from manta import UserAPI

# Initialize API
api = UserAPI(
    token="your_token",
    host="localhost",
    port=50052
)

# Create and deploy swarm
swarm = DataProcessingSwarm()

# Deploy to cluster
result = api.deploy_swarm("cluster_id", swarm)
print(f"Deployed swarm: {result['swarm_id']}")

# Monitor execution
status = api.get_swarm(result['swarm_id'])
print(f"Status: {status['status']}")

Key Concepts Summary

  1. Swarm: Container for your distributed algorithm

  2. Module: Packaged code that executes on nodes

  3. Task: Configured instance of a module

  4. Graph: Execution flow connecting tasks

  5. Globals: Shared state across all tasks

  6. execute(): Method defining the task graph

Common Patterns

Sequential Pipeline

def execute(self):
    extract = Task(extract_module)
    transform = Task(transform_module)
    load = Task(load_module)

    # ETL pipeline
    return load(transform(extract))

Fan-out/Fan-in

def execute(self):
    # Fan-out to workers
    workers = [Task(worker_module) for _ in range(10)]

    # Fan-in to aggregator
    aggregator = Task(aggregator_module, maximum=1)

    return aggregator(workers)

Iterative Processing

def execute(self):
    # One round of an iterative loop
    trainer = Task(trainer_module, method="all")
    evaluator = Task(evaluator_module, maximum=1)
    scheduler = Task(scheduler_module, maximum=1)

    # Linear chain for one iteration: trainer -> evaluator -> scheduler;
    # the scheduler then triggers the next round
    return scheduler(evaluator(trainer))

Next Steps