Modules¶
Executable Task Packages in Distributed Computing
Modules are the fundamental building blocks of task execution in the Manta platform. They package Python code, dependencies, and execution logic into deployable units that run on distributed edge nodes. Understanding modules is essential for implementing distributed algorithms and federated learning workflows.
What are Modules?¶
Modules are packaged task implementations that contain the actual computation logic executed on distributed nodes. They serve as the bridge between high-level swarm definitions and low-level task execution.
Core Characteristics¶
Self-Contained Packages: Modules bundle Python code, dependencies, and resources into deployable units that can execute independently on any compatible node.
Task Implementation Containers: Each module contains one or more Task class implementations that define specific computation logic for different roles in distributed algorithms.
Environment Specifications: Modules specify their execution requirements including container images, datasets, GPU requirements, and resource constraints.
Version-Controlled Units: Modules can be versioned, shared, and reused across different swarms and deployments.
# Module definition in swarm
from manta.apis import Module, Task

# Create a module for federated learning worker
worker_module = Module(
    python_program="modules/worker",  # Path to module code
    image="manta_light:pytorch",      # Execution environment
    datasets=["mnist"],               # Required datasets
    name="fl_worker"                  # Module identifier
)

# Wrap module in a task for execution
worker_task = Task(
    module=worker_module,
    method="all",  # Execute on all nodes
    maximum=-1,    # No limit on nodes
    gpu=False      # CPU execution
)
Module Structure and Organization¶
Modules can be organized as either single files or directory structures, depending on complexity:
Single-File Modules¶
Simple tasks can be implemented as single Python files:
aggregator.py            # Single-file module
- Task class implementation
- Helper functions
- main() entry point
# aggregator.py - Single-file module
from manta.light.task import Task

class Aggregator(Task):
    def run(self):
        """Aggregate models from worker nodes."""
        models = self.world.results.select("model_params")
        aggregated = self.aggregate_models(models)
        self.world.globals["global_model"] = aggregated

    def aggregate_models(self, models):
        """Combine collected models, e.g. by element-wise averaging."""
        return [sum(values) / len(values) for values in zip(*models)]

def main():
    Aggregator().run()
Directory-Based Modules¶
Complex tasks use directory structures for better organization:
worker/                  # Directory module
├── __init__.py          # Module entry point
├── worker_task.py       # Main task implementation
├── model.py             # Model definitions
├── utils.py             # Helper utilities
└── requirements.txt     # Dependencies (optional)
# worker/__init__.py - Module entry point
from .worker_task import Worker

def main():
    Worker().run()

# worker/worker_task.py - Task implementation
from manta.light.task import Task
from .model import MLP

class Worker(Task):
    def run(self):
        model = MLP()
        # Training logic here...
Role in Swarm Implementation¶
Modules are the execution layer of swarms. While swarms define the algorithmic structure and task topology, modules contain the actual implementation logic.
Swarm-Module Relationship¶
Swarm Architecture:

Swarm                                      ← Algorithm definition
  ├── Task (Worker)
  ├── Task (Aggregator)                    ← Execution units
  └── Task (Scheduler)
         │
         ▼
Module      Module        Module           ← Implementation logic
(Worker)    (Aggregator)  (Scheduler)
Separation of Concerns:
Swarms: Define what to execute and when (orchestration)
Tasks: Define where to execute and how many (scheduling)
Modules: Define how to execute (implementation)
Implementation Patterns¶
Modules implement specific roles in distributed algorithms:
class FederatedLearningSwarm(Swarm):
    def __init__(self):
        super().__init__()

        # Worker module: local training logic
        self.worker = Task(
            Module("modules/worker", "pytorch", datasets=["mnist"]),
            method="all", maximum=-1
        )

        # Aggregator module: model combination logic
        self.aggregator = Task(
            Module("modules/aggregator", "pytorch"),
            method="any", maximum=1
        )

        # Scheduler module: convergence and coordination logic
        self.scheduler = Task(
            Module("modules/scheduler", "pytorch"),
            method="any", maximum=1
        )

    def execute(self):
        """Define execution flow."""
        results = self.worker()                # Parallel training
        aggregated = self.aggregator(results)  # Model aggregation
        return self.scheduler(aggregated)      # Coordination decision
Task Class Implementation¶
Within modules, Task classes define the core execution logic that runs on nodes.
Base Task Interface¶
All module tasks inherit from the base Task class:
from manta.light.task import Task

class MyTask(Task):
    def run(self):
        """Main execution method called by the runtime."""
        # Access local data
        data = self.local.get_dataset("training_data")

        # Access global state
        global_params = self.world.globals["parameters"]

        # Perform computation
        results = self.compute(data, global_params)

        # Share results with other tasks
        self.world.results.add("output", results)
Task Context and Interfaces¶
Tasks have access to multiple interfaces for data and communication:
class ExampleTask(Task):
    def run(self):
        # Local interface - node-specific data and resources
        local_dataset = self.local.get_dataset("mnist")
        node_resources = self.local.get_resources()

        # World interface - swarm-wide communication
        global_model = self.world.globals["model"]
        peer_results = self.world.results.select("gradients")

        # Logger interface - structured logging
        self.logger.info("Starting computation")
        self.logger.debug(f"Dataset shape: {local_dataset.shape}")
Module Lifecycle¶
Modules progress through distinct phases from development to execution:
Development Phase¶
Code Implementation: Write Task classes and supporting code
Dependency Management: Specify required packages and datasets
Testing: Validate module logic locally
Documentation: Document module interface and behavior
# Development: Implement task logic
class TrainingTask(Task):
    def run(self):
        # Implementation details...
        pass
Packaging Phase¶
Code Bundling: Package Python files into ZIP archives (see the sketch after the module definition below)
Dependency Resolution: Resolve and include required libraries
Image Specification: Define execution container requirements
Validation: Ensure package integrity and compatibility
# Packaging: Module definition
training_module = Module(
    python_program="modules/training_task.py",  # Code location
    image="manta_light:pytorch",                # Container image
    datasets=["cifar10"],                       # Data dependencies
    name="distributed_training"                 # Module identifier
)
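The bundling step is normally handled by the platform, but for intuition, here is a minimal sketch using only the Python standard library (the archive name and directory path are illustrative):

# Illustrative only: zip a directory-based module into a single archive,
# the deployable form described above. Manta performs this step for you.
import shutil

shutil.make_archive("fl_worker", "zip", root_dir="modules/worker")
# -> produces fl_worker.zip containing the module's files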
Deployment Phase¶
Upload: Transfer module packages to the platform
Distribution: Distribute modules to target nodes
Validation: Verify module integrity on nodes (see the sketch after this list)
Preparation: Set up execution environments
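A rough sketch of what the validation step implies; the digest comparison below is illustrative, not Manta's actual protocol:

# Hypothetical integrity check: compare a module archive's SHA-256 digest
# against the digest published by the platform. The function name and flow
# are assumptions for illustration.
import hashlib
from pathlib import Path

def verify_package(zip_path: str, expected_digest: str) -> bool:
    actual = hashlib.sha256(Path(zip_path).read_bytes()).hexdigest()
    return actual == expected_digest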
Execution Phase¶
Container Initialization: Start secure execution environment
Module Loading: Import and initialize module code (sketched below)
Task Instantiation: Create Task class instances
Runtime Execution: Execute task.run() method
Result Collection: Gather outputs and cleanup
Execution Flow:

Node Agent                      Container Runtime
──────────                      ─────────────────
Receive Module       ──────►    Initialize Environment
Monitor Execution    ──────►    Execute Task.run()
Collect Results      ──────►    Cleanup & Terminate
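As a rough illustration of the module-loading step, a node-side runtime could resolve the entry point along these lines (a minimal sketch, assuming modules expose the main() function shown earlier; the real runtime is more involved):

# Minimal sketch of module loading and execution (assumption: the module
# exposes a main() entry point, as in the examples above).
import importlib

def load_and_run(module_name: str) -> None:
    mod = importlib.import_module(module_name)  # Module Loading
    mod.main()                                  # Runtime Execution: Task.run()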
Isolation and Sandboxing¶
Modules execute in isolated environments to ensure security, reproducibility, and resource management.
Container Isolation¶
Each module runs in its own container with:
Process Isolation: Separate process namespace
Filesystem Isolation: Read-only code, controlled data access
Network Isolation: Controlled communication channels
Resource Limits: CPU, memory, and GPU quotas
# Container specification in module
secure_module = Module(
    python_program="sensitive_computation.py",
    image="manta_light:pytorch",  # Curated base image
    datasets=["private_data"],    # Controlled data access
    name="secure_task"
)
Security Boundaries¶
Code Sandboxing: Modules cannot access:

- Host filesystem outside designated paths
- Other modules’ data or code
- Network resources outside platform APIs
- System-level resources or configurations

Data Isolation: Each module has controlled access to:

- Assigned datasets only
- Global state through platform APIs
- Results from authorized previous tasks
- Local temporary storage within limits
Runtime Environment¶
Reproducible Execution: Containers ensure:

- Consistent Python environment across nodes
- Identical library versions and dependencies
- Deterministic resource allocation
- Standardized runtime configuration
Container Architecture:

┌─ Container ─────────────────────────────┐
│  Module Code (task.py, utils.py)        │  ← Isolated execution
│  Runtime APIs (Local, World)            │  ← Controlled interfaces
│  Datasets                               │  ← Authorized data only
└─────────────────────────────────────────┘
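Pinning exact dependency versions in the optional requirements.txt (see the directory-module layout above) reinforces this reproducibility; the packages and versions below are placeholders:

# requirements.txt (optional) -- pin exact versions so every node
# resolves the same environment; entries shown are examples only
torch==2.1.0
numpy==1.26.4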
Code Reusability and Sharing¶
Modules promote code reusability across different swarms, experiments, and deployments.
Module Libraries¶
Organizations can build libraries of reusable modules:
# Federated learning module library
fl_modules = {
    "fedavg_worker": Module("fl/fedavg/worker.py", "pytorch"),
    "fedavg_aggregator": Module("fl/fedavg/aggregator.py", "pytorch"),
    "fedprox_worker": Module("fl/fedprox/worker.py", "pytorch"),
    "scaffold_worker": Module("fl/scaffold/worker.py", "pytorch"),
}

# Reuse across different swarms
class FedAvgSwarm(Swarm):
    def __init__(self):
        self.worker = Task(fl_modules["fedavg_worker"])
        self.aggregator = Task(fl_modules["fedavg_aggregator"])

class FedProxSwarm(Swarm):
    def __init__(self):
        self.worker = Task(fl_modules["fedprox_worker"])
        self.aggregator = Task(fl_modules["fedavg_aggregator"])  # Reuse aggregator
Parameterized Modules¶
Modules can be parameterized for different use cases:
class ParameterizedWorker(Task):
    def run(self):
        # Get configuration from swarm
        config = self.world.globals["training_config"]

        # Adapt behavior based on parameters
        if config["algorithm"] == "fedavg":
            self.run_fedavg()
        elif config["algorithm"] == "fedprox":
            self.run_fedprox(config["mu"])
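One hedged way to supply that configuration is a small setup task that publishes it through world.globals before the workers run (the exact mechanism a swarm uses to seed globals may differ):

# Hypothetical setup task that seeds the configuration the workers read.
class ConfigPublisher(Task):
    def run(self):
        self.world.globals["training_config"] = {
            "algorithm": "fedprox",
            "mu": 0.01,  # proximal term weight consumed by run_fedprox()
        }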
Cross-Domain Applications¶
Modules enable cross-domain algorithm reuse:
# Generic aggregation module
class GenericAggregator(Task):
    def run(self):
        method = self.world.globals.get("aggregation_method", "average")
        results = self.world.results.select("local_results")

        if method == "average":
            aggregated = self.federated_averaging(results)
        elif method == "weighted":
            weights = self.world.results.select("sample_weights")
            aggregated = self.weighted_averaging(results, weights)

        self.world.globals["aggregated_result"] = aggregated

# Reusable across domains
vision_swarm = VisionFLSwarm()    # Computer vision
nlp_swarm = LanguageFLSwarm()     # Natural language processing
tabular_swarm = TabularFLSwarm()  # Structured data
# All can use the same aggregator module
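For reference, the weighted variant corresponds to standard federated averaging, where each local result is scaled by its share of the total sample count (a textbook formulation, not Manta-specific):

$$\theta_{\text{global}} = \sum_{k=1}^{K} \frac{n_k}{\sum_{j=1}^{K} n_j}\,\theta_k$$

where $\theta_k$ is node $k$'s local result and $n_k$ its sample weight.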
Versioning and Compatibility¶
Module versioning ensures reproducibility and compatibility across deployments.
Version Management¶
Semantic Versioning: Modules follow semantic versioning (MAJOR.MINOR.PATCH):
MAJOR: Breaking changes to module interface
MINOR: New features, backward compatible
PATCH: Bug fixes, fully compatible
# Version specification
module_v1_0 = Module(
    python_program="worker_v1.0.py",
    image="manta_light:pytorch",
    name="fl_worker",
    version="1.0.0"
)

module_v1_1 = Module(
    python_program="worker_v1.1.py",
    image="manta_light:pytorch",
    name="fl_worker",
    version="1.1.0"  # Backward compatible
)
Compatibility Matrices¶
The platform maintains compatibility information:
Module Compatibility:

| Module    | Version | Platform Version | Compatible Images   | Notes            |
|-----------|---------|------------------|---------------------|------------------|
| fl_worker | 1.0     | manta >= 0.3.0   | pytorch, tensorflow | Stable           |
| fl_worker | 1.1     | manta >= 0.4.0   | pytorch, tensorflow | New features     |
| fl_worker | 2.0     | manta >= 0.5.0   | pytorch:2.0+        | Breaking changes |
Deployment Strategies¶
Blue-Green Deployments: Run old and new module versions side by side:
# Gradual migration strategy
class ExperimentalSwarm(Swarm):
    def __init__(self):
        # 80% of nodes use the stable version
        self.worker_stable = Task(
            Module("worker", version="1.0.0"),
            method="random", maximum=0.8
        )

        # 20% of nodes test the new version
        self.worker_experimental = Task(
            Module("worker", version="1.1.0"),
            method="random", maximum=0.2
        )
Architectural Relationships¶
Understanding how modules relate to other Manta components is crucial for effective system design.
Module-Task-Swarm Hierarchy¶
Conceptual Hierarchy:

Platform                        ← Infrastructure layer
    Swarm                       ← Algorithm definition
        Task A    Task B  ...   ← Execution scheduling
          │         │
        Module A  Module B      ← Implementation logic
          │         │
       Task Class  Task Class   ← Executable code
Data Flow Relationships¶
# Data flows through the module ecosystem
class DataFlowExample(Swarm):
    def execute(self):
        # 1. Workers process local data → produce local models
        local_results = self.workers()

        # 2. Aggregator consumes local models → produces global model
        global_model = self.aggregator(local_results)

        # 3. Validators consume global model → produce metrics
        metrics = self.validators(global_model)

        # 4. Scheduler consumes metrics → produces decisions
        return self.scheduler(metrics)
Communication Patterns¶
Modules interact through well-defined communication patterns:
Producer-Consumer: Sequential data processing
# Worker produces → Aggregator consumes
worker_results = worker_task()
aggregated = aggregator_task(worker_results)
All-to-One: Parallel collection and processing
# Multiple workers → Single aggregator
all_worker_results = [worker() for worker in worker_tasks]
global_result = aggregator(all_worker_results)
Broadcast: One-to-many distribution
# Aggregator produces → All workers consume
global_model = aggregator()
updated_workers = [worker(global_model) for worker in worker_tasks]
Best Practices and Patterns¶
Effective module development follows established patterns and practices.
Design Principles¶
Single Responsibility: Each module should have one clear purpose
# Good: Focused responsibility
class ModelTrainer(Task):
    def run(self):
        """Only handles model training logic."""
        pass

class ModelEvaluator(Task):
    def run(self):
        """Only handles model evaluation logic."""
        pass

# Avoid: Multiple responsibilities in one module
class TrainerAndEvaluator(Task):  # Too broad
    def run(self):
        self.train_model()     # Training responsibility
        self.evaluate_model()  # Evaluation responsibility
Interface Consistency: Maintain consistent data formats and APIs
# Consistent module interface
class StandardWorker(Task):
    def run(self):
        # Standard input format
        params = self.world.globals["model_parameters"]
        hyperparams = self.world.globals["hyperparameters"]

        # Standard processing
        results = self.train(params, hyperparams)

        # Standard output format
        self.world.results.add("model_update", results)
        self.world.results.add("metrics", self.get_metrics())
Error Handling¶
Robust Error Management: Handle failures gracefully
class RobustTask(Task):
    def run(self):
        try:
            # Main computation logic
            self.execute_computation()
        except DataCorruptionError as e:
            self.logger.error(f"Data corruption detected: {e}")
            self.world.results.add("status", "failed_data_corruption")
        except ResourceExhaustionError as e:
            self.logger.warning(f"Resource exhaustion: {e}")
            self.world.results.add("status", "failed_resources")
        except Exception as e:
            self.logger.error(f"Unexpected error: {e}")
            self.world.results.add("status", "failed_unknown")
            raise  # Re-raise for platform handling
Performance Optimization¶
Efficient Resource Usage: Optimize for distributed execution
class OptimizedTask(Task):
    def run(self):
        # Check available resources
        resources = self.local.get_resources()

        # Adapt computation to available resources
        if resources.gpu_available:
            batch_size = self.calculate_gpu_batch_size(resources.gpu_memory)
            device = "cuda"
        else:
            batch_size = self.calculate_cpu_batch_size(resources.cpu_memory)
            device = "cpu"

        # Execute with optimized settings
        self.process_data(batch_size=batch_size, device=device)
Testing and Validation¶
Comprehensive Testing: Validate module behavior thoroughly
# Module testing patterns
class TestableTask(Task):
    def run(self):
        # Validate inputs
        assert self.validate_inputs(), "Invalid input data"

        # Execute with logging
        self.logger.info("Starting computation")
        results = self.compute()

        # Validate outputs
        assert self.validate_outputs(results), "Invalid output data"

        # Store results
        self.world.results.add("output", results)

    def validate_inputs(self):
        """Validate input data and parameters."""
        # Validation logic
        return True

    def validate_outputs(self, results):
        """Validate computation results."""
        # Validation logic
        return True
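To exercise a task like this locally before packaging, one option is to stub its runtime interfaces; this is a hypothetical harness (the real Task base class may wire local, world, and logger differently):

# Hypothetical local test harness -- stubbing the interfaces is an
# assumption for illustration, not Manta's documented testing API.
from unittest.mock import MagicMock

task = TestableTask.__new__(TestableTask)  # skip platform construction
task.local = MagicMock()
task.world = MagicMock()
task.logger = MagicMock()
task.compute = lambda: {"loss": 0.1}       # stub the computation itself

task.run()
task.world.results.add.assert_called_with("output", {"loss": 0.1})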
Advanced Module Concepts¶
Complex distributed algorithms may require advanced module patterns and techniques.
Stateful Modules¶
Some modules need to maintain state across executions:
class StatefulOptimizer(Task):
    def run(self):
        # Load persistent state
        momentum = self.local.get_state("optimizer_momentum", default={})

        # Perform optimization step
        gradients = self.world.results.select("gradients")
        updated_params, new_momentum = self.sgd_with_momentum(
            gradients, momentum
        )

        # Save updated state
        self.local.set_state("optimizer_momentum", new_momentum)
        self.world.globals["parameters"] = updated_params
Hierarchical Modules¶
Complex algorithms may use hierarchical module structures:
class HierarchicalAggregator(Task):
    def run(self):
        # Level 1: Regional aggregation
        regional_results = self.aggregate_by_region()

        # Level 2: Global aggregation
        global_result = self.aggregate_global(regional_results)
        self.world.globals["global_model"] = global_result

    def aggregate_by_region(self):
        """Perform regional aggregation first."""
        # Implementation for hierarchical aggregation
        pass
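The regional step could look roughly like this, assuming each worker result carries a region tag (the tagging scheme and result shape are assumptions for illustration):

# Hypothetical regional grouping -- the "region" and "model" keys are
# illustrative, not a defined Manta result schema.
from collections import defaultdict

def group_by_region(results):
    regions = defaultdict(list)
    for result in results:
        regions[result["region"]].append(result["model"])
    return regions  # {region: [models to aggregate locally]}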
Adaptive Modules¶
Modules that adapt their behavior based on runtime conditions:
class AdaptiveWorker(Task):
    def run(self):
        # Analyze current conditions
        node_performance = self.local.get_performance_metrics()
        network_quality = self.local.get_network_quality()

        # Adapt training strategy
        if network_quality < 0.5:
            # Use local updates with less communication
            self.run_local_training_rounds(extra_rounds=3)
        elif node_performance["cpu_usage"] > 0.8:
            # Reduce computational load
            self.run_lightweight_training()
        else:
            # Standard training
            self.run_standard_training()
Integration with Swarm Development¶
Modules are developed in conjunction with swarms, and understanding their relationship is crucial for building effective distributed systems.
From Swarm Perspective: Swarms orchestrate module execution and define the overall algorithm structure.
From Module Perspective: Modules implement the computational logic that swarms coordinate.
Development Workflow:
Design the algorithm (Swarm level)
Implement the components (Module level)
Test integration (Swarm + Module)
Deploy and monitor (Platform level)
Key Takeaway
Modules are the implementation layer of distributed computing in Manta. They package task logic into reusable, isolated units that execute securely on edge nodes. Mastering module development enables you to build robust, scalable distributed algorithms.
Summary¶
Modules serve as the executable foundation of Manta’s distributed computing platform:
Core Function: Package task implementations into deployable, isolated execution units
Key Benefits:

- Code reusability across swarms and deployments
- Secure, sandboxed execution environments
- Version management and compatibility control
- Clear separation between algorithm design and implementation
Relationship to Swarms: Modules provide the computational logic that swarms orchestrate and coordinate
Development Impact: Well-designed modules enable rapid development of diverse distributed algorithms while maintaining security, reproducibility, and performance.
Understanding modules deeply will empower you to build sophisticated federated learning systems, distributed simulations, and edge computing applications on the Manta platform.