Cluster - Control swarms and access results & logs
- class manta.cluster.Cluster(host: str = 'localhost', port: int = 50051, ca_port: int = 50050, secured_token: str | None = None)
Synchronous gRPC client to connect to a cluster
- Parameters:
host (str) – Manager host
port (int) – Manager port
ca_port (int) – Port for the CA
secured_token (Optional[str]) – Secured token used for the first connection to the Manager
See also
AsyncCluster
Asynchronous gRPC client
- classmethod connect(host: str = 'localhost', port: int = 50051, ca_port: int = 50050, secured_token: str | None = None) → Cluster
Create a Cluster gRPC client.
- Parameters:
host (str) – Manager host, by default “localhost”
port (int) – Manager port, by default 50051
ca_port (int) – Port for the CA
secured_token (Optional[str]) – Secured token used for the first connection to the Manager
- Returns:
Cluster gRPC client
- Return type:
Cluster
Examples
>>> Cluster.connect(
...     "localhost", 50051, 50050, secured_token="TOKEN"
... )
- is_available() → Response
Check if the server is available
- Returns:
Response of the server
- Return type:
Response
Examples
>>> cluster = Cluster("localhost", 50051)
>>> cluster.is_available()
Response(message="Available", status=1)
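When a script starts right after the Manager is launched, it can help to wait until is_available() answers before sending work. The helper below is a minimal sketch, not part of manta: wait_until_available is a hypothetical name, it assumes (from the example above) that a message of "Available" means the Manager is reachable, and it treats any exception raised by the call as "not reachable yet" because the exact error type raised by the client is not documented here.

```python
import time
from typing import Any


def wait_until_available(cluster: Any, timeout: float = 30.0, interval: float = 1.0) -> bool:
    """Poll cluster.is_available() until it reports "Available" or the timeout elapses.

    Hypothetical helper: assumes message == "Available" means the Manager is up,
    and swallows any exception raised by the call as "not reachable yet".
    """
    deadline = time.monotonic() + timeout
    while True:
        try:
            if cluster.is_available().message == "Available":
                return True
        except Exception:
            pass  # Assumed transient connection error: keep polling until the deadline
        if time.monotonic() >= deadline:
            return False
        time.sleep(interval)
```

Usage would look like wait_until_available(Cluster("localhost", 50051), timeout=60). Using time.monotonic() keeps the deadline correct even if the system clock changes while polling.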
- start_swarm(swarm_id: str) → Response
Start a Swarm given its ID if it is found in the database
- Parameters:
swarm_id (str) – Swarm ID
- Returns:
Response of the server
- Return type:
Response
Examples
>>> cluster = Cluster("localhost", 50051)
>>> swarm_id = '9415dfd18edc45c9a6ffdf2055007bf9'
>>> cluster.start_swarm(swarm_id)
Response(message='Swarm 9415dfd18edc45c9a6ffdf2055007bf9 has started.', status=2)
- send_swarm(swarm: Swarm) → SwarmOverview
Send a Swarm to the server to be inserted into its database
- Parameters:
swarm (Swarm) – Any class that inherits from the Swarm class
- Returns:
Overview of the metadata of the swarm registered in the server
- Return type:
SwarmOverview
Examples
>>> cluster = Cluster("localhost", 50051)
>>> swarm = FLSwarm()
>>> cluster.send_swarm(swarm)
SwarmOverview(swarm_id='9415dfd18edc45c9a6ffdf2055007bf9', datetime='2024-09-13 10:18:06', status='ACTIVE')
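Because send_swarm returns a SwarmOverview whose swarm_id can later be passed to start_swarm, registering a swarm and starting it can be separated in time. The sketch below only chains those two documented calls; register_swarm is a hypothetical helper name, and the cluster argument is duck-typed so that any object exposing send_swarm and start_swarm (such as Cluster) fits.

```python
from typing import Any, Optional, Tuple


def register_swarm(cluster: Any, swarm: Any, start_now: bool = False) -> Tuple[str, Optional[Any]]:
    """Store a swarm in the Manager database and optionally start it by ID.

    Hypothetical helper chaining send_swarm(swarm) -> SwarmOverview and
    start_swarm(swarm_id) -> Response. Returns (swarm_id, Response or None).
    """
    overview = cluster.send_swarm(swarm)  # insert the swarm into the database
    response = None
    if start_now:
        response = cluster.start_swarm(overview.swarm_id)  # start it using the returned ID
    return overview.swarm_id, response
```

Keeping the returned swarm_id lets you call cluster.start_swarm(swarm_id) later, for example once the nodes you need are available.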
- deploy_swarm(swarm: Swarm) → Deployment
Deploy a Swarm
- Parameters:
swarm (Swarm) – Any class that inherits from the Swarm class
- Returns:
Response of the server
- Return type:
Deployment
Examples
>>> cluster = Cluster("localhost", 50051)
>>> swarm = FLSwarm()
>>> cluster.deploy_swarm(swarm)
Deployment(swarm_id='9415dfd18edc45c9a6ffdf2055007bf9', datetime='2024-09-13 10:18:06', status='ACTIVE', message='Swarm 9415dfd18edc45c9a6ffdf2055007bf9 has started.')
- list_swarms() → List[SwarmOverview]
List swarms registered in the server
- Returns:
List of swarm overviews
- Return type:
List[SwarmOverview]
Examples
>>> cluster = Cluster("localhost", 50051)
>>> cluster.list_swarms()
[SwarmOverview(swarm_id='9415dfd18edc45c9a6ffdf2055007bf9', datetime='2024-09-13 10:18:06', status='ACTIVE')]
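The overviews returned by list_swarms can be filtered and ordered on the client side. In this sketch, SwarmOverviewStandIn is a hypothetical dataclass that only mirrors the three fields visible in the repr above, so the code runs without manta; the real SwarmOverview objects should work the same way as long as they expose those attributes. Only the status 'ACTIVE' appears in this documentation, so other status values are not assumed.

```python
from dataclasses import dataclass
from typing import Any, Iterable, List


@dataclass
class SwarmOverviewStandIn:
    """Hypothetical stand-in with the fields shown in the SwarmOverview repr."""
    swarm_id: str
    datetime: str
    status: str


def swarm_ids_with_status(overviews: Iterable[Any], status: str = "ACTIVE") -> List[str]:
    """Return the IDs of swarms with the given status, newest first.

    Sorting on the "YYYY-MM-DD HH:MM:SS" string is safe because that format
    sorts lexicographically in chronological order.
    """
    matching = [o for o in overviews if o.status == status]
    matching.sort(key=lambda o: o.datetime, reverse=True)
    return [o.swarm_id for o in matching]
```

With a live client this would be called as swarm_ids_with_status(cluster.list_swarms()).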
- remove_swarm(swarm_id: str) → Response
Remove a swarm from the cluster
- Parameters:
swarm_id (str) – Swarm ID
- Returns:
Response of the server
- Return type:
Response
Examples
>>> cluster = Cluster("localhost", 50051)
>>> swarm_id = '9415dfd18edc45c9a6ffdf2055007bf9'
>>> cluster.remove_swarm(swarm_id)
Response(message='9415dfd18edc45c9a6ffdf2055007bf9 deleted.', status=2)
- get_tasks(swarm_id: str) → List[Dict[str, str]]
Get the tasks of a swarm given its ID
- Parameters:
swarm_id (str) – Swarm ID
- Returns:
List of dictionaries, each describing one task
- Return type:
List[Dict[str, str]]
Examples
>>> cluster = Cluster("localhost", 50051)
>>> swarm_id = '9415dfd18edc45c9a6ffdf2055007bf9'
>>> cluster.get_tasks(swarm_id)
[{'task_id': 'adcc83a14d8642bbb6413d196fb0b5b8', 'image': 'fl-pytorch-mnist:latest', 'previous_tasks': '[]', 'next_tasks': "['435709c73c5344c5a3d5754c7eb2ab6d']", ...}, ...]
Warning
The server does not return the payloads in the response. Ask the Manta team to add this feature.
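In the example output, previous_tasks and next_tasks are strings that look like Python lists (for instance "['435709c73c5344c5a3d5754c7eb2ab6d']"), since every value in the returned dictionaries is a str. A minimal sketch for turning them back into a task graph, assuming the strings are always valid Python list literals, uses ast.literal_eval, which parses literals without executing arbitrary code. task_graph and root_tasks are hypothetical helper names.

```python
import ast
from typing import Dict, List


def task_graph(tasks: List[Dict[str, str]]) -> Dict[str, List[str]]:
    """Map each task_id to the task IDs decoded from its next_tasks string.

    Assumes next_tasks holds a Python list literal such as "['abc']";
    ast.literal_eval raises ValueError if that assumption does not hold.
    """
    return {t["task_id"]: list(ast.literal_eval(t.get("next_tasks", "[]"))) for t in tasks}


def root_tasks(tasks: List[Dict[str, str]]) -> List[str]:
    """Return the IDs of tasks whose previous_tasks decodes to an empty list."""
    return [t["task_id"] for t in tasks if not ast.literal_eval(t.get("previous_tasks", "[]"))]
```

Typical use would be graph = task_graph(cluster.get_tasks(swarm_id)), followed by a traversal starting from root_tasks(...).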
- select_tasks(node_id: str) → List[Dict[str, str]]
Select tasks given a node ID. Tasks are sorted by swarm progression, from the end of the swarm back to its beginning.
- Parameters:
node_id (str) – Node ID
- Returns:
List of dictionaries, each describing one task
- Return type:
List[Dict[str, str]]
Examples
>>> cluster = Cluster("localhost", 50051)
>>> node_id = 'a2ff3abc76e045d7bb4a04a5ac416318'
>>> cluster.select_tasks(node_id)
[{'task_id': 'adcc83a14d8642bbb6413d196fb0b5b8', 'swarm_id': '435709c73c5344c5a3d5754c7eb2ab6d', 'status': ...}, ...]
Warning
The server does not return the payloads in the response. Ask the Manta team to add this feature.
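Since select_tasks returns tasks ordered from the end of a swarm back to its beginning, the first entry seen for a given swarm_id should be the most advanced task of that swarm on the node. The sketch below relies on that documented ordering and on the task_id, swarm_id and status keys shown in the example; the status values themselves are not listed here, so the code treats them as opaque strings. count_tasks and most_advanced_task_per_swarm are hypothetical names.

```python
from collections import Counter
from typing import Dict, List, Tuple


def count_tasks(tasks: List[Dict[str, str]]) -> Counter:
    """Count tasks per (swarm_id, status) pair from select_tasks() output."""
    return Counter((t.get("swarm_id", ""), t.get("status", "")) for t in tasks)


def most_advanced_task_per_swarm(tasks: List[Dict[str, str]]) -> Dict[str, str]:
    """Keep the first task_id seen for each swarm_id.

    Assumes the documented ordering (end of the swarm first), so the first
    entry for a swarm is its most advanced task on this node.
    """
    first: Dict[str, str] = {}
    for t in tasks:
        first.setdefault(t["swarm_id"], t["task_id"])
    return first
```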
- get_node_ids(available: bool = False) → List[str]
Get node IDs
- Parameters:
available (bool) – If True, return only the IDs of available nodes
- Returns:
List of node IDs
- Return type:
List[str]
Examples
>>> cluster = Cluster("localhost", 50051)
>>> cluster.get_node_ids(available=True)
['a2ff3abc76e045d7bb4a04a5ac416318', 'f8a7c1d7f9d44f6b9d0d3d5d9c7b3c9d']
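Comparing the two modes of get_node_ids gives the nodes that are registered but currently unavailable. This sketch assumes that available=False (the default) returns every known node, which is how the parameter description reads but is not stated explicitly. The two calls are separate requests, so a node may change state between them. unavailable_node_ids is a hypothetical helper.

```python
from typing import Any, List


def unavailable_node_ids(cluster: Any) -> List[str]:
    """Return node IDs listed by get_node_ids() but not by get_node_ids(available=True).

    Assumes the default call returns all known nodes; order follows that call.
    """
    all_nodes = cluster.get_node_ids()
    available = set(cluster.get_node_ids(available=True))
    return [node_id for node_id in all_nodes if node_id not in available]
```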
- select_results(*queries: Tuple[str, List[str]]) → Dict[str, Results]
Select results from the database given the queries
- Parameters:
queries (Tuple[str, List[str]]) – Queries, each of the form (swarm_id, tags)
- Returns:
Dictionary mapping each swarm_id to its results
- Return type:
Dict[str, Results]
Examples
>>> cluster = Cluster("localhost", 50051)
>>> swarm_id = '9415dfd18edc45c9a6ffdf2055007bf9'
>>> tags = ["accuracy", "loss"]
>>> cluster.select_results((swarm_id, tags))
{'9415dfd18edc45c9a6ffdf2055007bf9': Results(len(iteration)=10, len(nodes)=8, len(tags)=2)}
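select_results takes its queries as separate positional arguments, so several swarms can be queried in one call by unpacking a list of (swarm_id, tags) tuples with *. The sketch below shows that pattern; build_queries and select_results_for are hypothetical helpers, and each query receives its own copy of the tag list so that mutating one query cannot affect the others.

```python
from typing import Any, Dict, Iterable, List, Tuple


def build_queries(swarm_ids: Iterable[str], tags: List[str]) -> List[Tuple[str, List[str]]]:
    """Build one (swarm_id, tags) query per swarm, each with its own tag list copy."""
    return [(swarm_id, list(tags)) for swarm_id in swarm_ids]


def select_results_for(cluster: Any, swarm_ids: Iterable[str], tags: List[str]) -> Dict[str, Any]:
    """Query the same tags for several swarms in a single select_results call."""
    return cluster.select_results(*build_queries(swarm_ids, tags))
```

For example, select_results_for(cluster, [id_a, id_b], ["accuracy", "loss"]) should return one Results entry per swarm ID.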
- collect_logs(id: str) → List[Dict[str, str]]
Collect the logs stored in the Manager database given the ID
- Parameters:
id (str) – Swarm ID or Node ID
- Returns:
List of dictionaries, each describing one log entry
- Return type:
List[Dict[str, str]]
Examples
>>> cluster = Cluster("localhost", 50051)
>>> swarm_id = '9415dfd18edc45c9a6ffdf2055007bf9'
>>> cluster.collect_logs(swarm_id)
[{'swarm_id': '9415dfd18edc45c9a6ffdf2055007bf9', 'task_id': 'a2ff3abc76e045d7bb4a04a5ac416318', 'message': ..., 'datetime': '2024-09-13 10:18:06', 'iteration': '1'}, ...]
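Every field of a log entry is returned as a string, so ordering logs chronologically requires parsing them. This sketch assumes the datetime format shown in the example ("2024-09-13 10:18:06") and that iteration always holds an integer string; sorted_logs is a hypothetical helper that can also keep only one task's entries.

```python
from datetime import datetime
from typing import Dict, List, Optional

DATETIME_FORMAT = "%Y-%m-%d %H:%M:%S"  # assumed from the example output


def sorted_logs(logs: List[Dict[str, str]], task_id: Optional[str] = None) -> List[Dict[str, str]]:
    """Return log entries ordered by (datetime, iteration), optionally for one task."""
    selected = [log for log in logs if task_id is None or log.get("task_id") == task_id]
    return sorted(
        selected,
        key=lambda log: (
            datetime.strptime(log["datetime"], DATETIME_FORMAT),
            int(log.get("iteration", "0")),  # iteration is a numeric string
        ),
    )
```

A typical call is sorted_logs(cluster.collect_logs(swarm_id)); parsing the iteration as int avoids string ordering placing "10" before "2".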
- class manta.cluster.AsyncCluster(host: str = 'localhost', port: int = 50051, ca_port: int = 50050, secured_token: str | None = None)
Asynchronous gRPC client to connect to a cluster
- Parameters:
host (str) – Manager host
port (int) – Manager port
ca_port (int) – Port for the CA
secured_token (Optional[str]) – Secured token used for the first connection to the Manager
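With the asynchronous client, independent requests can be awaited concurrently with asyncio.gather. The sketch below is illustrative only: snapshot is a hypothetical coroutine, it assumes the client tolerates several in-flight calls at once, and, as in the examples for each method, it does not pass the service argument because the decorator supplies it.

```python
import asyncio
from typing import Any, Dict


async def snapshot(cluster: Any) -> Dict[str, Any]:
    """Fetch the swarm list and the available node IDs concurrently."""
    swarms, nodes = await asyncio.gather(
        cluster.list_swarms(),
        cluster.get_node_ids(available=True),
    )
    return {"swarms": swarms, "available_nodes": nodes}
```

From synchronous code this could be driven with asyncio.run(snapshot(AsyncCluster("localhost", 50051))).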
- async classmethod connect(host: str = 'localhost', port: int = 50051, ca_port: int = 50050, secured_token: str | None = None) → Cluster
Create a Cluster gRPC client.
- Parameters:
host (str) – Manager host, by default “localhost”
port (int) – Manager port, by default 50051
ca_port (int) – Port for the CA
secured_token (Optional[str]) – Secured token used for the first connection to the Manager
- Returns:
Cluster gRPC client
- Return type:
Examples
>>> await AsyncCluster.connect(
...     "localhost", 50051, 50050, secured_token="TOKEN"
... )
- async is_available(service: UserStub) → Response
Check if the server is available
- Parameters:
service (UserStub) – Service to connect to the gRPC server (not required)
- Returns:
Response of the server
- Return type:
Response
Examples
>>> cluster = AsyncCluster("localhost", 50051)
>>> await cluster.is_available()
Response(message="Available", status=1)
Notes
The argument service is managed by a decorator under the hood.
- async start_swarm(service: UserStub, swarm_id: str)
Start a Swarm given its ID if it is found in the database
- Parameters:
service (UserStub) – Service to connect to the gRPC server (not required)
swarm_id (str) – Swarm ID
- Returns:
Response of the server
- Return type:
Response
Examples
>>> cluster = AsyncCluster("localhost", 50051)
>>> swarm_id = '9415dfd18edc45c9a6ffdf2055007bf9'
>>> await cluster.start_swarm(swarm_id)
Response(message='Swarm 9415dfd18edc45c9a6ffdf2055007bf9 has started.', status=2)
Notes
The argument service is managed by a decorator under the hood.
- async send_swarm(service: UserStub, swarm: Swarm) → SwarmOverview
Send a Swarm to the server to be inserted into its database
- Parameters:
service (UserStub) – Service to connect to the gRPC server (not required)
swarm (Swarm) – Any class that inherits from the Swarm class
- Returns:
Overview of the metadata of the swarm registered in the server
- Return type:
SwarmOverview
Examples
>>> cluster = AsyncCluster("localhost", 50051)
>>> swarm = FLSwarm()
>>> await cluster.send_swarm(swarm)
SwarmOverview(swarm_id='9415dfd18edc45c9a6ffdf2055007bf9', datetime='2024-09-13 10:18:06', status='ACTIVE')
Notes
The argument service is managed by a decorator under the hood.
- async deploy_swarm(service: UserStub, swarm: Swarm) → Deployment
Deploy a Swarm
- Parameters:
service (UserStub) – Service to connect to the gRPC server (not required)
swarm (Swarm) – Any class that inherits from the Swarm class
- Returns:
Response of the server
- Return type:
Deployment
Examples
>>> cluster = AsyncCluster("localhost", 50051)
>>> swarm = FLSwarm()
>>> await cluster.deploy_swarm(swarm)
Deployment(swarm_id='9415dfd18edc45c9a6ffdf2055007bf9', datetime='2024-09-13 10:18:06', status='ACTIVE', message='Swarm 9415dfd18edc45c9a6ffdf2055007bf9 has started.')
Notes
The argument service is managed by a decorator under the hood.
- async list_swarms(service: UserStub) → List[SwarmOverview]
List swarms registered in the server
- Parameters:
service (UserStub) – Service to connect to the gRPC server (not required)
- Returns:
List of swarm overviews
- Return type:
List[SwarmOverview]
Examples
>>> cluster = AsyncCluster("localhost", 50051)
>>> await cluster.list_swarms()
[SwarmOverview(swarm_id='9415dfd18edc45c9a6ffdf2055007bf9', datetime='2024-09-13 10:18:06', status='ACTIVE')]
Notes
The argument service is managed by a decorator under the hood.
- async remove_swarm(service: UserStub, swarm_id: str) → Response
Remove a swarm from the cluster
- Parameters:
service (UserStub) – Service to connect to the gRPC server (not required)
swarm_id (str) – Swarm ID
- Returns:
Response of the server
- Return type:
Response
Examples
>>> cluster = AsyncCluster("localhost", 50051)
>>> swarm_id = '9415dfd18edc45c9a6ffdf2055007bf9'
>>> await cluster.remove_swarm(swarm_id)
Response(message='9415dfd18edc45c9a6ffdf2055007bf9 deleted.', status=2)
Notes
The argument service is managed by a decorator under the hood.
- async get_tasks(service: UserStub, swarm_id: str) → List[Dict[str, str]]
Get the tasks of a swarm given its ID
- Parameters:
service (UserStub) – Service to connect to the gRPC server (not required)
swarm_id (str) – Swarm ID
- Returns:
List of dictionaries, each describing one task
- Return type:
List[Dict[str, str]]
Examples
>>> cluster = AsyncCluster("localhost", 50051)
>>> swarm_id = '9415dfd18edc45c9a6ffdf2055007bf9'
>>> await cluster.get_tasks(swarm_id)
[{'task_id': 'adcc83a14d8642bbb6413d196fb0b5b8', 'image': 'fl-pytorch-mnist:latest', 'previous_tasks': '[]', 'next_tasks': "['435709c73c5344c5a3d5754c7eb2ab6d']", ...}, ...]
Warning
The server does not return the payloads in the response. Ask the Manta team to add this feature.
- async select_tasks(service: UserStub, node_id: str) → List[Dict[str, str]]
Select tasks given a node ID. Tasks are sorted by swarm progression, from the end of the swarm back to its beginning.
- Parameters:
service (UserStub) – Service to connect to the gRPC server (not required)
node_id (str) – Node ID
- Returns:
List of dictionaries, each describing one task
- Return type:
List[Dict[str, str]]
Examples
>>> cluster = AsyncCluster("localhost", 50051)
>>> node_id = 'a2ff3abc76e045d7bb4a04a5ac416318'
>>> await cluster.select_tasks(node_id)
[{'task_id': 'adcc83a14d8642bbb6413d196fb0b5b8', 'swarm_id': '435709c73c5344c5a3d5754c7eb2ab6d', 'status': ...}, ...]
Warning
The server does not return the payloads in the response. Ask the Manta team to add this feature.
- async get_node_ids(service: UserStub, available: bool = False) → List[str]
Get node IDs
- Parameters:
service (UserStub) – Service to connect to the gRPC server (not required)
available (bool) – If True, return only the IDs of available nodes
- Returns:
List of node IDs
- Return type:
List[str]
Examples
>>> cluster = AsyncCluster("localhost", 50051)
>>> await cluster.get_node_ids(available=True)
['a2ff3abc76e045d7bb4a04a5ac416318', 'f8a7c1d7f9d44f6b9d0d3d5d9c7b3c9d']
- async select_results(service: UserStub, *queries: Tuple[str, List[str]]) → Dict[str, Results]
Select results from the database given the queries
- Parameters:
service (UserStub) – Service to connect to the gRPC server (not required)
queries (Tuple[str, List[str]]) – Queries, each of the form (swarm_id, tags)
- Returns:
Dictionary mapping each swarm_id to its results
- Return type:
Dict[str, Results]
Examples
>>> cluster = AsyncCluster("localhost", 50051)
>>> swarm_id = '9415dfd18edc45c9a6ffdf2055007bf9'
>>> tags = ["accuracy", "loss"]
>>> await cluster.select_results((swarm_id, tags))
{'9415dfd18edc45c9a6ffdf2055007bf9': Results(len(iteration)=10, len(nodes)=8, len(tags)=2)}
Notes
The argument service is managed by a decorator under the hood.
- async collect_logs(service: UserStub, id: str) → List[Dict[str, str]]
Collect the logs stored in the Manager database given the ID
- Parameters:
service (UserStub) – Service to connect to the gRPC server (not required)
id (str) – Swarm ID or Node ID
- Returns:
List of dictionaries, each describing one log entry
- Return type:
List[Dict[str, str]]
Examples
>>> cluster = AsyncCluster("localhost", 50051)
>>> swarm_id = '9415dfd18edc45c9a6ffdf2055007bf9'
>>> await cluster.collect_logs(swarm_id)
[{'swarm_id': '9415dfd18edc45c9a6ffdf2055007bf9', 'task_id': 'a2ff3abc76e045d7bb4a04a5ac416318', 'message': ..., 'datetime': '2024-09-13 10:18:06', 'iteration': '1'}, ...]
Notes
The argument service is managed by a decorator under the hood.
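collect_logs accepts either a swarm ID or a node ID, which makes it natural to gather logs for many IDs at once with the asynchronous client. The sketch below bounds the number of simultaneous requests with an asyncio.Semaphore so that the Manager is not flooded; collect_all_logs and its limit parameter are hypothetical, and the call passes only the ID because service is supplied by the decorator.

```python
import asyncio
from typing import Any, Dict, Iterable, List, Tuple


async def collect_all_logs(
    cluster: Any, ids: Iterable[str], limit: int = 4
) -> Dict[str, List[Dict[str, str]]]:
    """Collect logs for several swarm or node IDs, with at most `limit` requests in flight."""
    semaphore = asyncio.Semaphore(limit)  # caps concurrent collect_logs calls

    async def fetch(log_id: str) -> Tuple[str, List[Dict[str, str]]]:
        async with semaphore:
            return log_id, await cluster.collect_logs(log_id)

    # gather preserves input order; dict() pairs each ID with its log entries
    pairs = await asyncio.gather(*(fetch(log_id) for log_id in ids))
    return dict(pairs)
```

It could be run as asyncio.run(collect_all_logs(AsyncCluster("localhost", 50051), swarm_ids)).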