Task Configuration

Detailed guide to configuring and optimizing tasks in your swarms.

Task Overview

A Task represents a unit of computation that executes on one or more nodes. Tasks are created from modules and configured with scheduling, resource, and networking parameters.

Creating Tasks

Basic Task Creation

from manta import Task, Module

# Create module first
module = Module(
    name="worker",
    python_program="worker.py",
    image="python:3.10"
)

# Create task from module
task = Task(
    module=module,
    alias="worker_task"
)

Full Task Parameters

task = Task(
    module=module,              # Required: Module to execute
    method="all",              # Scheduling method
    fixed=False,               # Dynamic vs fixed node assignment
    excepted_ids=None,         # Nodes to exclude
    specified_ids=None,        # Specific nodes to use
    maximum=-1,                # Max nodes (-1 = unlimited)
    network=None,              # Network name
    gpu=False,                 # GPU requirement
    alias="my_task"           # Task identifier
)

Scheduling Methods

The method parameter determines how tasks are distributed:

"all" - Execute on All Nodes

task = Task(
    module=module,
    method="all",  # Run on every available node
    alias="broadcast"
)

Use cases: data distribution, model updates, and configuration changes.

"any" - Execute on Any Node

task = Task(
    module=module,
    method="any",  # Run on any single available node
    alias="singleton"
)

Use cases: aggregation tasks, coordination services, and single-point operations.

fixed=True - Fixed Node Assignment

Note that fixed is a separate boolean parameter, not a value of method; it is combined with a scheduling method (such as "any") to keep the same node assignment across iterations.

task = Task(
    module=module,
    method="any",
    fixed=True,  # Keep same nodes across iterations
    alias="stateful"
)

Use cases: stateful operations, data locality requirements, and consistent routing.

Node Selection

Maximum Nodes

Limit the number of nodes used:

# Use at most 5 nodes
task = Task(
    module=module,
    method="all",
    maximum=5,
    alias="limited"
)

# Use 50% of available nodes
task = Task(
    module=module,
    method="all",
    maximum=0.5,  # Fraction between 0 and 1
    alias="half_cluster"
)

Specific Node Selection

Target specific nodes:

# Use only specified nodes
task = Task(
    module=module,
    specified_ids=["node1", "node2", "node3"],
    alias="targeted"
)

# Exclude problematic nodes
task = Task(
    module=module,
    method="all",
    excepted_ids=["slow_node", "unstable_node"],
    alias="filtered"
)

Resource Requirements

GPU Tasks

# Require GPU nodes
gpu_task = Task(
    module=gpu_module,
    method="all",
    gpu=True,  # Only schedule on GPU-enabled nodes
    alias="ml_training"
)

Resource Constraints in Module

module = Module(
    name="heavy_compute",
    python_program=code,
    image="python:3.10",
    resources={
        "cpu": 4,          # 4 CPU cores
        "memory": "8Gi",   # 8 GB memory
        "gpu": 1           # 1 GPU
    }
)

task = Task(module=module)

Network Configuration

Tasks can be assigned to specific networks for isolation:

Network Assignment

class NetworkedSwarm(Swarm):
    def __init__(self):
        super().__init__()
        # Define networks
        self.add_network("frontend", Driver.OVERLAY)
        self.add_network("backend", Driver.BRIDGE)

    def execute(self):
        # Assign tasks to networks
        api_task = Task(
            module=api_module,
            network="frontend",
            alias="api"
        )

        db_task = Task(
            module=db_module,
            network="backend",
            alias="database"
        )

        return api_task(db_task)

Task Dependencies

Sequential Dependencies

# Task B depends on Task A
task_a = Task(module_a, alias="first")
task_b = Task(module_b, alias="second")

# B runs after A completes
flow = task_b(task_a)

Parallel Dependencies

# Multiple tasks feed into one
data_tasks = [
    Task(data_module, alias=f"data_{i}")
    for i in range(5)
]

processor = Task(processor_module, alias="processor")

# All data tasks must complete before processor
flow = processor(data_tasks)

Conditional Execution

from manta.graph import Condition

# Execute based on condition
task_a = Task(module_a)
task_b = Task(module_b)

# B runs when A produces a result
task_a.add_edge(task_b.uuid, Condition.RESULT)

# B runs when A finishes (regardless of result)
task_a.add_edge(task_b.uuid, Condition.FINISHED)

Advanced Task Patterns

Dynamic Task Generation

class DynamicSwarm(Swarm):
    def execute(self):
        # Generate tasks based on global config
        num_workers = self.globals.get("num_workers", 10)

        tasks = []
        for i in range(num_workers):
            task = Task(
                module=worker_module,
                alias=f"worker_{i}",
                maximum=1  # Each on different node
            )
            tasks.append(task)

        # Connect all to aggregator
        aggregator = Task(
            module=aggregator_module,
            method="any",
            maximum=1,
            alias="aggregator"
        )

        return aggregator(tasks)

Task Pools

class PooledSwarm(Swarm):
    def execute(self):
        # Create pool of identical workers
        worker_pool = [
            Task(
                module=worker_module,
                method="any",
                alias=f"pool_worker_{i}"
            )
            for i in range(20)
        ]

        # Load balancer distributes work
        balancer = Task(
            module=balancer_module,
            method="any",
            maximum=1,
            alias="load_balancer"
        )

        return balancer(worker_pool)

Hierarchical Tasks

class HierarchicalSwarm(Swarm):
    def execute(self):
        # Leaf workers
        workers = [
            Task(worker_module, alias=f"worker_{i}")
            for i in range(16)
        ]

        # Mid-level aggregators
        aggregators = []
        for i in range(4):
            agg = Task(
                aggregator_module,
                alias=f"aggregator_{i}"
            )
            # Connect 4 workers to each aggregator
            agg(workers[i*4:(i+1)*4])
            aggregators.append(agg)

        # Top-level coordinator
        coordinator = Task(
            coordinator_module,
            method="any",
            maximum=1,
            alias="coordinator"
        )

        return coordinator(aggregators)

Task Communication

Tasks communicate through:

1. Results

# In task module
from manta.light import Results

results = Results()
results.save({"output": processed_data}, tag="processed")

2. Global State

# In task module
from manta.light import World

world = World()
shared_data = world.get("shared_key")
world.set("shared_key", updated_data)

3. Local Storage

# In task module
from manta.light import Local

local = Local()
data = local.load_data("dataset_name")

Error Handling

Retry Logic

# Task with retry on failure
resilient_task = Task(
    module=module,
    method="any",
    maximum=3,  # Try up to 3 different nodes
    excepted_ids=failed_nodes,  # Exclude known bad nodes
    alias="resilient"
)

Timeout Configuration

module = Module(
    name="timeout_aware",
    python_program=code,
    image="python:3.10",
    timeout=300  # 5 minute timeout
)

Error Recovery

class RobustSwarm(Swarm):
    def execute(self):
        # Primary task
        primary = Task(
            module=primary_module,
            method="any",
            alias="primary"
        )

        # Fallback task
        fallback = Task(
            module=fallback_module,
            method="any",
            alias="fallback"
        )

        # Recovery coordinator
        recovery = Task(
            module=recovery_module,
            method="any",
            maximum=1,
            alias="recovery"
        )

        # Connect with error handling
        return recovery([primary, fallback])

Performance Optimization

Batch Processing

# Process in batches for efficiency
batch_task = Task(
    module=batch_module,
    method="all",
    maximum=10,  # Limit parallelism
    alias="batch_processor"
)

Data Locality

# Keep computation near data
local_task = Task(
    module=module,
    fixed=True,  # Pin to data nodes
    specified_ids=data_nodes,
    alias="local_compute"
)

Resource Balancing

# Distribute load evenly
balanced_task = Task(
    module=module,
    method="all",
    maximum=0.8,  # Use 80% of nodes
    excepted_ids=overloaded_nodes,
    alias="balanced"
)

Monitoring and Debugging

Task Identification

# Use descriptive aliases
task = Task(
    module=module,
    alias=f"{module.name}_v{version}_{timestamp}"
)

Logging in Tasks

# In task module
import logging
from manta.light import Task as TaskInfo

task_info = TaskInfo()
logging.info(f"Task {task_info.task_id} starting on {task_info.node_id}")

Metrics Collection

# In task module
from manta.light import Results
import time

start_time = time.time()
# ... computation ...
duration = time.time() - start_time

results = Results()
results.save({
    "duration": duration,
    "throughput": items_processed / duration
}, tag="metrics")

Best Practices

  1. Use Descriptive Aliases: Make debugging easier

  2. Set Resource Limits: Prevent resource exhaustion

  3. Handle Failures Gracefully: Use retry and fallback logic

  4. Monitor Task Performance: Collect metrics and logs

  5. Test Locally First: Validate task logic before deployment

  6. Document Dependencies: Clear task relationships

  7. Version Your Modules: Track code changes

Next Steps