Task Configuration¶
Detailed guide to configuring and optimizing tasks in your swarms.
Task Overview¶
A Task represents a unit of computation that executes on one or more nodes. Tasks are created from modules and configured with scheduling, resource, and networking parameters.
Creating Tasks¶
Basic Task Creation
from manta import Task, Module
# Create module first
module = Module(
name="worker",
python_program="worker.py",
image="python:3.10"
)
# Create task from module
task = Task(
module=module,
alias="worker_task"
)
Full Task Parameters
task = Task(
module=module, # Required: Module to execute
method="all", # Scheduling method
fixed=False, # Dynamic vs fixed node assignment
excepted_ids=None, # Nodes to exclude
specified_ids=None, # Specific nodes to use
maximum=-1, # Max nodes (-1 = unlimited)
network=None, # Network name
gpu=False, # GPU requirement
alias="my_task" # Task identifier
)
Scheduling Methods¶
The method
parameter determines how tasks are distributed:
“all” - Execute on All Nodes
task = Task(
module=module,
method="all", # Run on every available node
alias="broadcast"
)
Use cases: data distribution, model updates, and configuration changes.
“any” - Execute on Any Node
task = Task(
module=module,
method="any", # Run on any single available node
alias="singleton"
)
Use cases: aggregation tasks, coordination services, and single-point operations.
fixed=True - Fixed Node Assignment (set via the fixed flag, combined with any method)
task = Task(
module=module,
method="any",
fixed=True, # Keep same nodes across iterations
alias="stateful"
)
Use cases: stateful operations, data-locality requirements, and consistent routing.
Node Selection¶
Maximum Nodes
Limit the number of nodes used:
# Use at most 5 nodes
task = Task(
module=module,
method="all",
maximum=5,
alias="limited"
)
# Use 50% of available nodes
task = Task(
module=module,
method="all",
maximum=0.5, # Fraction between 0 and 1
alias="half_cluster"
)
Specific Node Selection
Target specific nodes:
# Use only specified nodes
task = Task(
module=module,
specified_ids=["node1", "node2", "node3"],
alias="targeted"
)
# Exclude problematic nodes
task = Task(
module=module,
method="all",
excepted_ids=["slow_node", "unstable_node"],
alias="filtered"
)
Resource Requirements¶
GPU Tasks
# Require GPU nodes
gpu_task = Task(
module=gpu_module,
method="all",
gpu=True, # Only schedule on GPU-enabled nodes
alias="ml_training"
)
Resource Constraints in Module
module = Module(
name="heavy_compute",
python_program=code,
image="python:3.10",
resources={
"cpu": 4, # 4 CPU cores
"memory": "8Gi", # 8 GB memory
"gpu": 1 # 1 GPU
}
)
task = Task(module=module)
Network Configuration¶
Tasks can be assigned to specific networks for isolation:
Network Assignment
class NetworkedSwarm(Swarm):
def __init__(self):
super().__init__()
# Define networks
self.add_network("frontend", Driver.OVERLAY)
self.add_network("backend", Driver.BRIDGE)
def execute(self):
# Assign tasks to networks
api_task = Task(
module=api_module,
network="frontend",
alias="api"
)
db_task = Task(
module=db_module,
network="backend",
alias="database"
)
return api_task(db_task)
Task Dependencies¶
Sequential Dependencies
# Task B depends on Task A
task_a = Task(module_a, alias="first")
task_b = Task(module_b, alias="second")
# B runs after A completes
flow = task_b(task_a)
Parallel Dependencies
# Multiple tasks feed into one
data_tasks = [
Task(data_module, alias=f"data_{i}")
for i in range(5)
]
processor = Task(processor_module, alias="processor")
# All data tasks must complete before processor
flow = processor(data_tasks)
Conditional Execution
from manta.graph import Condition
# Execute based on condition
task_a = Task(module_a)
task_b = Task(module_b)
# B runs when A produces a result
task_a.add_edge(task_b.uuid, Condition.RESULT)
# B runs when A finishes (regardless of result)
task_a.add_edge(task_b.uuid, Condition.FINISHED)
Advanced Task Patterns¶
Dynamic Task Generation
class DynamicSwarm(Swarm):
def execute(self):
# Generate tasks based on global config
num_workers = self.globals.get("num_workers", 10)
tasks = []
for i in range(num_workers):
task = Task(
module=worker_module,
alias=f"worker_{i}",
maximum=1 # Each on different node
)
tasks.append(task)
# Connect all to aggregator
aggregator = Task(
module=aggregator_module,
method="any",
maximum=1,
alias="aggregator"
)
return aggregator(tasks)
Task Pools
class PooledSwarm(Swarm):
def execute(self):
# Create pool of identical workers
worker_pool = [
Task(
module=worker_module,
method="any",
alias=f"pool_worker_{i}"
)
for i in range(20)
]
# Load balancer distributes work
balancer = Task(
module=balancer_module,
method="any",
maximum=1,
alias="load_balancer"
)
return balancer(worker_pool)
Hierarchical Tasks
class HierarchicalSwarm(Swarm):
def execute(self):
# Leaf workers
workers = [
Task(worker_module, alias=f"worker_{i}")
for i in range(16)
]
# Mid-level aggregators
aggregators = []
for i in range(4):
agg = Task(
aggregator_module,
alias=f"aggregator_{i}"
)
# Connect 4 workers to each aggregator
agg(workers[i*4:(i+1)*4])
aggregators.append(agg)
# Top-level coordinator
coordinator = Task(
coordinator_module,
method="any",
maximum=1,
alias="coordinator"
)
return coordinator(aggregators)
Task Communication¶
Tasks communicate through:
1. Results
# In task module
from manta.light import Results
results = Results()
results.save({"output": processed_data}, tag="processed")
2. Global State
# In task module
from manta.light import World
world = World()
shared_data = world.get("shared_key")
world.set("shared_key", updated_data)
3. Local Storage
# In task module
from manta.light import Local
local = Local()
data = local.load_data("dataset_name")
Error Handling¶
Retry Logic
# Task with retry on failure
resilient_task = Task(
module=module,
method="any",
maximum=3, # Try up to 3 different nodes
excepted_ids=failed_nodes, # Exclude known bad nodes
alias="resilient"
)
Timeout Configuration
module = Module(
name="timeout_aware",
python_program=code,
image="python:3.10",
timeout=300 # 5 minute timeout
)
Error Recovery
class RobustSwarm(Swarm):
def execute(self):
# Primary task
primary = Task(
module=primary_module,
method="any",
alias="primary"
)
# Fallback task
fallback = Task(
module=fallback_module,
method="any",
alias="fallback"
)
# Recovery coordinator
recovery = Task(
module=recovery_module,
method="any",
maximum=1,
alias="recovery"
)
# Connect with error handling
return recovery([primary, fallback])
Performance Optimization¶
Batch Processing
# Process in batches for efficiency
batch_task = Task(
module=batch_module,
method="all",
maximum=10, # Limit parallelism
alias="batch_processor"
)
Data Locality
# Keep computation near data
local_task = Task(
module=module,
fixed=True, # Pin to data nodes
specified_ids=data_nodes,
alias="local_compute"
)
Resource Balancing
# Distribute load evenly
balanced_task = Task(
module=module,
method="all",
maximum=0.8, # Use 80% of nodes
excepted_ids=overloaded_nodes,
alias="balanced"
)
Monitoring and Debugging¶
Task Identification
# Use descriptive aliases
task = Task(
module=module,
alias=f"{module.name}_v{version}_{timestamp}"
)
Logging in Tasks
# In task module
import logging
from manta.light import Task as TaskInfo
task_info = TaskInfo()
logging.info(f"Task {task_info.task_id} starting on {task_info.node_id}")
Metrics Collection
# In task module
from manta.light import Results
import time
start_time = time.time()
# ... computation ...
duration = time.time() - start_time
results = Results()
results.save({
"duration": duration,
"throughput": items_processed / duration
}, tag="metrics")
Best Practices¶
Use Descriptive Aliases: Make debugging easier
Set Resource Limits: Prevent resource exhaustion
Handle Failures Gracefully: Use retry and fallback logic
Monitor Task Performance: Collect metrics and logs
Test Locally First: Validate task logic before deployment
Document Dependencies: Clear task relationships
Version Your Modules: Track code changes
Next Steps¶
Swarm Basics - Back to basics
Swarm Development Guide - Continue to the in-depth development guide