Swarm Development Guide

Learn how to develop distributed algorithms using the Manta Swarm API. This guide covers the complete process from basic concepts to advanced orchestration patterns.

Note

A Swarm represents a distributed computing workflow that coordinates multiple tasks across nodes in a cluster.

Overview

The Swarm API provides a high-level abstraction for defining distributed algorithms:

  • Tasks: Individual units of computation

  • Graphs: Task dependencies and execution flow

  • Modules: Packaged code for task execution

  • Orchestration: Coordination and scheduling

Core Concepts

Swarm

A complete distributed algorithm definition including tasks, dependencies, and global state.

Task

An individual computation unit that executes on one or more nodes.

Module

Packaged Python code that implements task logic.

Graph

The execution flow defining task dependencies and conditions.

Scheduling

Rules for task distribution across nodes.

Quick Example

from manta import Swarm, Task, Module

class MySwarm(Swarm):
    """Simple federated learning swarm"""

    def __init__(self):
        super().__init__()
        self.name = "federated_mnist"

        # Set global parameters
        self.set_global("epochs", 10)
        self.set_global("batch_size", 32)

    def execute(self):
        # Define module
        fl_module = Module(
            name="fl_trainer",
            python_program="trainer.py",
            image="manta:pytorch"
        )

        # Create tasks
        aggregator = Task(
            module=fl_module,
            method="any",
            maximum=1,
            alias="aggregator"
        )

        workers = Task(
            module=fl_module,
            method="all",
            alias="workers"
        )

        # Define execution flow
        return aggregator(workers)

Development Workflow

  1. Design Your Algorithm: identify computation units, define the data flow, and plan resource requirements.

  2. Create Task Modules: write the task implementation, define entry points, and specify dependencies.

  3. Build Task Graph: connect tasks with dependencies, set execution conditions, and configure scheduling.

  4. Configure Swarm: set global parameters, define network settings, and configure fault tolerance.

  5. Deploy and Monitor: deploy to the cluster, monitor execution, and collect results (see the sketch after this list).
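
A minimal sketch of steps 4 and 5, using only APIs shown in this guide; the actual deployment call is cluster-specific and not covered here:

# Build the swarm from the Quick Example above
swarm = MySwarm()

# Step 4: adjust global parameters before building the graph
swarm.set_global("epochs", 20)

# Step 5 (partial): build the task graph locally to catch errors
# before deployment; prepare_graph is also used in the Validation
# section below.
graph = swarm.prepare_graph()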

Key Components

Swarm Class

The base class for all swarms:

from manta import Swarm

class MySwarm(Swarm):
    name = "my_algorithm"

    def execute(self):
        # Define task graph
        pass

Task Definition

Tasks represent computation units:

from manta import Task, Module

task = Task(
    module=module,          # Module to execute
    method="all",          # Scheduling method
    fixed=False,           # Dynamic node assignment
    maximum=10,            # Max nodes to use
    network="overlay",     # Network type
    gpu=False,            # GPU requirement
    alias="worker"        # Task identifier
)

Module Packaging

Modules contain task implementation:

from manta import Module

module = Module(
    name="my_module",
    python_program=code,    # Python code or file
    image="manta:latest",   # Container image
    requirements=["numpy"], # Dependencies
    resources={            # Resource requirements
        "cpu": 2,
        "memory": "4Gi"
    }
)
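
Because python_program accepts either code or a file path, a module can also be built from an inline string; a minimal sketch (the program body here is purely illustrative):

# Inline program source instead of a file path
inline_module = Module(
    name="hello_task",
    python_program="print('hello from the swarm')",
    image="manta:latest"
)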

Graph Construction

Connect tasks to form execution flow:

# Sequential execution
task1 = Task(module1)
task2 = Task(module2)
task3 = Task(module3)

# Chain tasks
flow = task3(task2(task1))

# Parallel execution
parallel_tasks = [Task(module) for _ in range(5)]
aggregator = Task(aggregator_module)

# Aggregate parallel results
flow = aggregator(parallel_tasks)

Execution Flow

Task Lifecycle

  1. Initialization: Task is created and configured

  2. Scheduling: Task assigned to nodes

  3. Execution: Code runs on assigned nodes

  4. Result Collection: Outputs gathered

  5. Condition Check: Next tasks triggered

Scheduling Methods

  • "all": Execute on all available nodes

  • "any": Execute on any single node

  • "fixed": Execute on specific nodes

  • "maximum": Execute on up to N nodes

Execution Conditions

  • FINISHED: Trigger when task completes

  • RESULT: Trigger when result available
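
This guide does not show the API for attaching conditions to a task, so the following is purely hypothetical; it assumes Task accepts a condition keyword, which may differ in the actual API:

# HYPOTHETICAL: condition attachment is not documented here; the
# `condition` keyword is an assumption, not a confirmed parameter.
downstream = Task(
    module=module,
    method="any",
    condition="RESULT"  # trigger as soon as the upstream result exists
)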

Advanced Features

Global State Management

class StatefulSwarm(Swarm):
    def __init__(self):
        super().__init__()
        # Set global parameters shared across tasks
        # (initial_weights is assumed to be defined elsewhere)
        self.set_global("model_weights", initial_weights)
        self.set_global("hyperparameters", {
            "learning_rate": 0.01,
            "momentum": 0.9
        })
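
Globals set this way can be read back with get_global, which takes an optional default (see the Dynamic Task Generation example below); continuing StatefulSwarm:

    def execute(self):
        # Read a global set in __init__; the second argument to
        # get_global is a default used when the key is missing.
        hyper = self.get_global("hyperparameters", {})
        learning_rate = hyper.get("learning_rate", 0.01)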

Network Configuration

from manta import Swarm, Task, Driver  # import path for Driver assumed

class NetworkedSwarm(Swarm):
    def __init__(self):
        super().__init__()
        # Add custom networks with different drivers
        self.add_network("backend", Driver.BRIDGE)
        self.add_network("frontend", Driver.OVERLAY)

    def execute(self):
        # Assign tasks to the named networks
        # (backend_module is assumed to be defined elsewhere)
        backend_task = Task(
            module=backend_module,
            network="backend"
        )

Dynamic Task Generation

class DynamicSwarm(Swarm):
    def execute(self):
        # Generate tasks based on a runtime parameter
        # (worker_module and self.aggregator are assumed to be
        # defined elsewhere)
        num_workers = self.get_global("num_workers", 5)

        workers = [
            Task(
                module=worker_module,
                alias=f"worker_{i}"
            )
            for i in range(num_workers)
        ]

        # Feed every worker's output into a single aggregator task
        return self.aggregator(workers)

Error Handling

Task Failure Recovery

task = Task(
    module=module,
    method="any",
    maximum=3,  # Try up to 3 nodes
    excepted_ids=["failed_node_1"]  # Exclude failed nodes
)
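
This can be combined with global state so that retries skip nodes already known to have failed; a sketch, assuming failure IDs are recorded elsewhere under an illustrative "failed_nodes" key:

# "failed_nodes" is an illustrative key, not a built-in
failed = self.get_global("failed_nodes", [])
retry_task = Task(
    module=module,
    method="any",
    maximum=3,
    excepted_ids=failed
)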

Validation

# Validate the swarm before deployment
swarm = MySwarm()
graph = swarm.prepare_graph()

# Check for cycles (has_cycles is a user-supplied helper; see the
# sketch below)
if has_cycles(graph):
    raise ValueError("Task graph contains cycles")
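
has_cycles is not part of the API shown in this guide; a minimal depth-first sketch, assuming the graph is a mapping from each task to a list of its successors:

def has_cycles(graph):
    """Detect a cycle via DFS; graph maps node -> list of successors."""
    WHITE, GRAY, BLACK = 0, 1, 2
    color = {node: WHITE for node in graph}

    def visit(node):
        color[node] = GRAY
        for succ in graph.get(node, []):
            if color.get(succ, WHITE) == GRAY:
                return True  # back edge: cycle found
            if color.get(succ, WHITE) == WHITE and visit(succ):
                return True
        color[node] = BLACK
        return False

    return any(color[node] == WHITE and visit(node) for node in graph)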

Performance Optimization

Resource Allocation

# Optimize resource usage
task = Task(
    module=module,
    method="any",
    maximum=0.5,  # Use 50% of available nodes
    gpu=True      # Require GPU nodes
)

Data Locality

# Keep data local to nodes
task = Task(
    module=module,
    fixed=True,  # Pin to specific nodes
    specified_ids=["node1", "node2"]  # Data-local nodes
)

Examples

Federated Learning

class FederatedLearning(Swarm):
    name = "fl_experiment"

    def execute(self):
        # Training tasks on each node
        trainers = Task(
            module=trainer_module,
            method="all",
            alias="trainers"
        )

        # Aggregation on single node
        aggregator = Task(
            module=aggregator_module,
            method="any",
            maximum=1,
            alias="aggregator"
        )

        # Scheduler for iterations
        scheduler = Task(
            module=scheduler_module,
            method="any",
            maximum=1,
            alias="scheduler"
        )

        # Chain trainers -> aggregator -> scheduler; the scheduler
        # then triggers the next training iteration
        return scheduler(aggregator(trainers))

MapReduce Pattern

class MapReduce(Swarm):
    name = "mapreduce_job"

    def execute(self):
        # Map phase - parallel processing
        mappers = [
            Task(
                module=map_module,
                alias=f"mapper_{i}"
            )
            for i in range(10)
        ]

        # Reduce phase - aggregation
        reducer = Task(
            module=reduce_module,
            method="any",
            maximum=1,
            alias="reducer"
        )

        # Connect mappers to reducer
        return reducer(mappers)

Best Practices

  1. Modular Design: Keep tasks focused and reusable

  2. Error Handling: Implement retry logic and fallbacks

  3. Resource Planning: Set appropriate resource limits

  4. Testing: Test locally before cluster deployment

  5. Monitoring: Add logging and metrics collection (see the sketch after this list)

  6. Documentation: Document task dependencies and flow
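
For practice 5, a task module can rely on the standard library's logging module; a minimal sketch (the loop and metric values are purely illustrative):

import logging

logging.basicConfig(
    level=logging.INFO,
    format="%(asctime)s %(name)s %(levelname)s %(message)s",
)
log = logging.getLogger("fl_trainer")

# Illustrative training loop with placeholder metrics
for epoch in range(3):
    loss = 1.0 / (epoch + 1)
    log.info("epoch=%d loss=%.4f", epoch, loss)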

Next Steps