Cluster - Control swarms and access results & logs

class manta.cluster.Cluster(host: str = 'localhost', port: int = 50051, ca_port: int = 50050, secured_token: str | None = None)

gRPC client to connect to a Cluster

Parameters:
  • host (str) – Manager host

  • port (int) – Manager port

  • ca_port (int) – Port for the CA

  • secured_token (Optional[str]) – Secured token for the Manager first connection

async classmethod connect(host: str = 'localhost', port: int = 50051, ca_port: int = 50050, secured_token: str | None = None) → Cluster

Create a Cluster gRPC client.

Parameters:
  • host (str) – Manager host, by default “localhost”

  • port (int) – Manager port, by default 50051

  • ca_port (int) – Port for the CA

  • secured_token (Optional[str]) – Secured token for the Manager first connection

Returns:

Cluster gRPC client

Return type:

Cluster

Examples

>>> await Cluster.connect(
...     "localhost", 50051, 50050, secured_token="TOKEN"
... )

async is_available(service: UserStub) → Response

Check if the server is available

Parameters:

service (UserStub) – Service to connect to the gRPC server (not required)

Returns:

Response of the server

Return type:

Response

Examples

>>> cluster = Cluster("localhost", 50051)
>>> await cluster.is_available()
Response(message="Available", status=1)

Notes

The argument service is managed by a decorator under the hood.
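
The note above explains why the examples never pass service explicitly. As a rough illustration of that pattern, the sketch below shows how a decorator can supply the first argument for the caller. FakeStub, with_service and DemoClient are hypothetical names invented for this example; they are not part of manta, and the library's actual decorator may work differently.

```python
import asyncio
import functools


class FakeStub:
    """Hypothetical stand-in for UserStub; not part of manta."""

    async def ping(self):
        return "Available"


def with_service(method):
    """Hypothetical decorator that supplies `service` so callers can omit it."""

    @functools.wraps(method)
    async def wrapper(self, *args, **kwargs):
        # Assumption: a real client would build the stub from its gRPC channel.
        service = FakeStub()
        return await method(self, service, *args, **kwargs)

    return wrapper


class DemoClient:
    """Illustrative only; this is not manta.cluster.Cluster."""

    @with_service
    async def is_available(self, service):
        return await service.ping()


# The caller does not pass `service`; the decorator injects it.
result = asyncio.run(DemoClient().is_available())
print(result)
```

With this pattern the documented signature still lists service, while call sites pass only the remaining arguments.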

async start_swarm(service: UserStub, swarm_id: str)

Start a Swarm given its ID if it is found in the database

Parameters:
  • service (UserStub) – Service to connect to the gRPC server (not required)

  • swarm_id (str) – Swarm ID

Returns:

Response of the server

Return type:

Response

Examples

>>> cluster = Cluster("localhost", 50051)
>>> swarm_id = '9415dfd18edc45c9a6ffdf2055007bf9'
>>> await cluster.start_swarm(swarm_id)
Response(message='Swarm 9415dfd18edc45c9a6ffdf2055007bf9 has started.', status=2)

Notes

The argument service is managed by a decorator under the hood.

async send_swarm(service: UserStub, swarm: Swarm) → SwarmOverview

Send a Swarm to the server to be inserted into its database

Parameters:
  • service (UserStub) – Service to connect to the gRPC server (not required)

  • swarm (Swarm) – Instance of any class that inherits from the Swarm class

Returns:

Overview of the metadata of the swarm registered in the server

Return type:

SwarmOverview

Examples

>>> cluster = Cluster("localhost", 50051)
>>> swarm = FLSwarm()
>>> await cluster.send_swarm(swarm)
SwarmOverview(swarm_id='9415dfd18edc45c9a6ffdf2055007bf9', datetime='2024-09-13 10:18:06', status='ACTIVE')

Notes

The argument service is managed by a decorator under the hood.
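
The examples in this reference use a top-level await, which only works in an asyncio-aware REPL. In a plain script the calls need to run inside a coroutine. The sketch below chains send_swarm and start_swarm that way. StubCluster and the local SwarmOverview dataclass are hypothetical offline stand-ins so the code runs without a Manager, and reading the ID through overview.swarm_id is an assumption based on the repr shown above.

```python
import asyncio
from dataclasses import dataclass


@dataclass
class SwarmOverview:
    """Stand-in carrying the fields shown in the example output above."""

    swarm_id: str
    datetime: str
    status: str


class StubCluster:
    """Hypothetical in-memory stand-in for Cluster; not part of manta."""

    def __init__(self):
        self.started = []

    async def send_swarm(self, swarm):
        return SwarmOverview(
            "9415dfd18edc45c9a6ffdf2055007bf9", "2024-09-13 10:18:06", "ACTIVE"
        )

    async def start_swarm(self, swarm_id):
        self.started.append(swarm_id)
        return f"Swarm {swarm_id} has started."


async def register_and_start(cluster, swarm):
    # Register the swarm first, then start it with the ID the server assigned.
    overview = await cluster.send_swarm(swarm)
    message = await cluster.start_swarm(overview.swarm_id)
    return overview, message


cluster = StubCluster()
# `object()` is a placeholder for a real Swarm instance.
overview, message = asyncio.run(register_and_start(cluster, swarm=object()))
print(message)
```

Against a real Manager, the stub would be replaced by a Cluster instance and the placeholder by a Swarm subclass instance such as the FLSwarm used in the example.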

async deploy_swarm(service: UserStub, swarm: Swarm) → Deployment

Deploy a Swarm

Parameters:
  • service (UserStub) – Service to connect to the gRPC server (not required)

  • swarm (Swarm) – Instance of any class that inherits from the Swarm class

Returns:

Deployment returned by the server (swarm metadata and a status message)

Return type:

Deployment

Examples

>>> cluster = Cluster("localhost", 50051)
>>> swarm = FLSwarm()
>>> await cluster.deploy_swarm(swarm)
Deployment(swarm_id='9415dfd18edc45c9a6ffdf2055007bf9', datetime='2024-09-13 10:18:06', status='ACTIVE', message='Swarm 9415dfd18edc45c9a6ffdf2055007bf9 has started.')

Notes

The argument service is managed by a decorator under the hood.

async list_swarms(service: UserStub) → List[SwarmOverview]

List swarms registered in the server

Parameters:

service (UserStub) – Service to connect to the gRPC server (not required)

Returns:

List of swarm overviews

Return type:

List[SwarmOverview]

Examples

>>> cluster = Cluster("localhost", 50051)
>>> await cluster.list_swarms()
[SwarmOverview(swarm_id='9415dfd18edc45c9a6ffdf2055007bf9', datetime='2024-09-13 10:18:06', status='ACTIVE')]

Notes

The argument service is managed by a decorator under the hood.

async remove_swarm(service: UserStub, swarm_id: str) → Response

Remove a swarm on the cluster

Parameters:
  • service (UserStub) – Service to connect to the gRPC server (not required)

  • swarm_id (str) – Swarm ID

Returns:

Response of the server

Return type:

Response

Examples

>>> cluster = Cluster("localhost", 50051)
>>> swarm_id = '9415dfd18edc45c9a6ffdf2055007bf9'
>>> await cluster.remove_swarm(swarm_id)
Response(message='9415dfd18edc45c9a6ffdf2055007bf9 deleted.', status=2)

Notes

The argument service is managed by a decorator under the hood.

async get_tasks(service: UserStub, swarm_id: str) → List[Dict[str, str]]

Get tasks given swarm ID

Parameters:
  • service (UserStub) – Service to connect to the gRPC server (not required)

  • swarm_id (str) – Swarm ID

Returns:

List of dictionaries, each describing one task

Return type:

List[Dict[str, str]]

Examples

>>> cluster = Cluster("localhost", 50051)
>>> swarm_id = '9415dfd18edc45c9a6ffdf2055007bf9'
>>> await cluster.get_tasks(swarm_id)
[
    {'task_id': 'adcc83a14d8642bbb6413d196fb0b5b8', 'image': 'fl-pytorch-mnist:latest', 'previous_tasks': '[]', 'next_tasks': "['435709c73c5344c5a3d5754c7eb2ab6d']", ...},
]

Warning

The server is not returning the payloads in the response. Ask the Manta team to add this feature.
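
Because the return type is List[Dict[str, str]], the previous_tasks and next_tasks fields in the example output arrive as strings that look like Python lists. A minimal sketch of decoding them with ast.literal_eval follows. The sample record copies only the fields visible in the example above, and decode_links is a hypothetical helper name, not a manta function.

```python
import ast

# Sample record limited to the fields visible in the example output.
tasks = [
    {
        "task_id": "adcc83a14d8642bbb6413d196fb0b5b8",
        "image": "fl-pytorch-mnist:latest",
        "previous_tasks": "[]",
        "next_tasks": "['435709c73c5344c5a3d5754c7eb2ab6d']",
    },
]


def decode_links(task):
    """Return a copy of `task` with the link fields parsed into real lists."""
    decoded = dict(task)
    for key in ("previous_tasks", "next_tasks"):
        # literal_eval parses Python literals only; it does not execute code.
        decoded[key] = ast.literal_eval(task[key])
    return decoded


decoded = [decode_links(task) for task in tasks]
# Tasks with no predecessors are the entry points of the task graph.
entry_points = [t["task_id"] for t in decoded if not t["previous_tasks"]]
print(entry_points)
```

ast.literal_eval is preferable to eval here because the strings come from a remote server.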

async select_task(service: UserStub, task_id: str) → List[Dict[str, str]]

Select task given Node ID

Parameters:
  • service (UserStub) – Service to connect to the gRPC server (not required)

  • task_id (str) – Node ID

Returns:

List of dictionaries, each describing one task

Return type:

List[Dict[str, str]]

Examples

>>> cluster = Cluster("localhost", 50051)
>>> node_id = 'a2ff3abc76e045d7bb4a04a5ac416318'
>>> await cluster.select_task(node_id)
[
    {'task_id': 'adcc83a14d8642bbb6413d196fb0b5b8', 'image': 'fl-pytorch-mnist:latest', 'previous_tasks': '[]', 'next_tasks': "['435709c73c5344c5a3d5754c7eb2ab6d']", ...},
]

Warning

The server is not returning the payloads in the response. Ask the Manta team to add this feature.

async get_node_ids(service: UserStub, available: bool = False) → List[str]

Get node IDs

Parameters:
  • service (UserStub) – Service to connect to the gRPC server (not required)

  • available (bool) – Get only available nodes

Returns:

List of node IDs

Return type:

List[str]

Examples

>>> cluster = Cluster("localhost", 50051)
>>> await cluster.get_node_ids(available=True)
['a2ff3abc76e045d7bb4a04a5ac416318', 'f8a7c1d7f9d44f6b9d0d3d5d9c7b3c9d']

async select_results(service: UserStub, *queries: Tuple[str, List[str]]) → Dict[str, Results]

Select results from the database given the queries

Parameters:
  • service (UserStub) – Service to connect to the gRPC server (not required)

  • queries (Tuple[str, List[str]]) – Queries under the structure (swarm_id, tags)

Returns:

Dictionary of (swarm_id, results)

Return type:

Dict[str, Results]

Examples

>>> cluster = Cluster("localhost", 50051)
>>> swarm_id = '9415dfd18edc45c9a6ffdf2055007bf9'
>>> tags = ["accuracy", "loss"]
>>> await cluster.select_results((swarm_id, tags))
{'9415dfd18edc45c9a6ffdf2055007bf9': Results(len(iteration)=10, len(nodes)=8, len(tags)=2)}

Notes

The argument service is managed by a decorator under the hood.
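
Since queries is a variadic parameter, several (swarm_id, tags) tuples can be passed in one call, either as separate arguments or by unpacking a list. The sketch below shows the calling shape only. select_results_stub is a hypothetical stand-in that echoes the requested tags rather than returning Results objects, and the second swarm ID is a placeholder.

```python
import asyncio
from typing import Dict, List, Tuple


async def select_results_stub(
    *queries: Tuple[str, List[str]]
) -> Dict[str, List[str]]:
    """Hypothetical stand-in: echoes the tags requested for each swarm."""
    return {swarm_id: list(tags) for swarm_id, tags in queries}


queries = [
    ("9415dfd18edc45c9a6ffdf2055007bf9", ["accuracy", "loss"]),
    ("00000000000000000000000000000000", ["loss"]),  # placeholder swarm ID
]

# Unpack the list so each tuple becomes one positional query.
requested = asyncio.run(select_results_stub(*queries))
print(sorted(requested))
```

Passing the list itself without the asterisk would hand the function a single argument, which does not match the (swarm_id, tags) structure described above.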

async collect_logs(service: UserStub, id: str) → List[Dict[str, str]]

Collect the logs stored in the Manager database given the ID

Parameters:
  • service (UserStub) – Service to connect to the gRPC server (not required)

  • id (str) – Swarm ID or Node ID

Returns:

List of dictionaries, each describing one log entry

Return type:

List[Dict[str, str]]

Examples

>>> cluster = Cluster("localhost", 50051)
>>> swarm_id = '9415dfd18edc45c9a6ffdf2055007bf9'
>>> await cluster.collect_logs(swarm_id)
[{'swarm_id': '9415dfd18edc45c9a6ffdf2055007bf9', 'task_id': 'a2ff3abc76e045d7bb4a04a5ac416318', 'message': ..., 'datetime': '2024-09-13 10:18:06', 'iteration': '1'}, ...]

Notes

The argument service is managed by a decorator under the hood.
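
Every value in a log entry is a string, including iteration, so sorting the raw values would place '10' before '2'. The sketch below groups entries by task_id and orders each group by the numeric iteration. The messages are placeholders (the example above elides them), group_by_task is a hypothetical helper, and the code assumes iteration always parses as an integer.

```python
from collections import defaultdict

SWARM = "9415dfd18edc45c9a6ffdf2055007bf9"
TASK = "a2ff3abc76e045d7bb4a04a5ac416318"

# Placeholder entries using the keys shown in the example output.
logs = [
    {"swarm_id": SWARM, "task_id": TASK, "message": "placeholder B",
     "datetime": "2024-09-13 10:18:06", "iteration": "10"},
    {"swarm_id": SWARM, "task_id": TASK, "message": "placeholder A",
     "datetime": "2024-09-13 10:18:06", "iteration": "2"},
]


def group_by_task(entries):
    """Group log entries by task_id, each group sorted by numeric iteration."""
    grouped = defaultdict(list)
    for entry in entries:
        grouped[entry["task_id"]].append(entry)
    for task_entries in grouped.values():
        # Assumption: `iteration` is always an integer encoded as a string.
        task_entries.sort(key=lambda e: int(e["iteration"]))
    return dict(grouped)


by_task = group_by_task(logs)
ordered = [entry["iteration"] for entry in by_task[TASK]]
print(ordered)
```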