Cluster - Contrôlez les Swarms et accédez aux résultats et aux journaux

class manta.cluster.Cluster(host: str = 'localhost', port: int = 50051, ca_port: int = 50050, secured_token: str | None = None)

Client gRPC synchrone pour se connecter à un cluster

Paramètres:
  • host (str) – Adresse du Manager

  • port (int) – Port du Manager

  • ca_port (int) – Port pour le CA

  • secured_token (Optional[str]) – Jeton sécurisé pour la première connexion du Manager

Voir aussi

AsyncCluster

Client gRPC asynchrone

classmethod connect(host: str = 'localhost', port: int = 50051, ca_port: int = 50050, secured_token: str | None = None) Cluster

Créer un client Cluster gRPC.

Paramètres:
  • host (str) – Adresse du Manager, par défaut « localhost »

  • port (int) – Port du Manager, par défaut 50051

  • ca_port (int) – Port pour le CA

  • secured_token (Optional[str]) – Jeton sécurisé pour la première connexion du Manager

Renvoie:

Client gRPC du cluster

Type renvoyé:

Cluster

Exemples

>>> Cluster.connect(
...     "localhost", 50051, 50050, secured_token="TOKEN"
... )
is_available() Response

Vérifiez si le serveur est disponible

Renvoie:

Réponse du serveur

Type renvoyé:

Response

Exemples

>>> cluster = Cluster("localhost", 50051)
>>> cluster.is_available()
Response(message="Available", status=1)
start_swarm(swarm_id: str) Response

Démarrer un Swarm en fonction de son ID s’il est trouvé dans la base de données

Paramètres:

swarm_id (str) – ID du Swarm

Renvoie:

Réponse du serveur

Type renvoyé:

Response

Exemples

>>> cluster = Cluster("localhost", 50051)
>>> swarm_id = '9415dfd18edc45c9a6ffdf2055007bf9'
>>> cluster.start_swarm(swarm_id)
Response(message='Swarm 9415dfd18edc45c9a6ffdf2055007bf9 has started.', status=2)
send_swarm(swarm: Swarm) SwarmOverview

Envoyer un Swarm au serveur pour qu’il soit inséré dans sa base de données

Paramètres:

swarm (Swarm) – Toute classe qui hérite de la classe Swarm

Renvoie:

Aperçu des métadonnées du Swarm enregistré sur le serveur

Type renvoyé:

SwarmOverview

Exemples

>>> cluster = Cluster("localhost", 50051)
>>> swarm = FLSwarm()
>>> cluster.send_swarm(swarm)
SwarmOverview(swarm_id='9415dfd18edc45c9a6ffdf2055007bf9', datetime='2024-09-13 10:18:06', status='ACTIVE')
deploy_swarm(swarm: Swarm) Deployment

Déployer un Swarm

Paramètres:

swarm (Swarm) – Toute classe qui hérite de la classe Swarm

Renvoie:

Réponse du serveur

Type renvoyé:

Deployment

Exemples

>>> cluster = Cluster("localhost", 50051)
>>> swarm = FLSwarm()
>>> cluster.deploy_swarm(swarm)
Deployment(swarm_id='9415dfd18edc45c9a6ffdf2055007bf9', datetime='2024-09-13 10:18:06', status='ACTIVE', message='Swarm 9415dfd18edc45c9a6ffdf2055007bf9 has started.')
list_swarms() List[SwarmOverview]

Lister les Swarms enregistrés sur le serveur

Renvoie:

Liste des aperçus des Swarms

Type renvoyé:

List[SwarmOverview]

Exemples

>>> cluster = Cluster("localhost", 50051)
>>> cluster.list_swarms()
[SwarmOverview(swarm_id='9415dfd18edc45c9a6ffdf2055007bf9', datetime='2024-09-13 10:18:06', status='ACTIVE')]
remove_swarm(swarm_id: str) Response

Supprimer un Swarm sur le cluster

Paramètres:

swarm_id (str) – ID du Swarm

Renvoie:

Réponse du serveur

Type renvoyé:

Response

Exemples

>>> cluster = Cluster("localhost", 50051)
>>> swarm_id = '9415dfd18edc45c9a6ffdf2055007bf9'
>>> cluster.remove_swarm(swarm_id)
Response(message='9415dfd18edc45c9a6ffdf2055007bf9 deleted.', status=2)
get_tasks(swarm_id: str) List[Dict[str, str]]

Obtenir les tâches selon l’ID du Swarm

Paramètres:

swarm_id (str) – ID du Swarm

Renvoie:

Liste d’informations sur les tâches

Type renvoyé:

List[Dict[str, str]]

Exemples

>>> cluster = Cluster("localhost", 50051)
>>> swarm_id = '9415dfd18edc45c9a6ffdf2055007bf9'
>>> cluster.get_tasks(swarm_id)
[{'task_id': 'adcc83a14d8642bbb6413d196fb0b5b8', 'image': 'fl-pytorch-mnist:latest', 'previous_tasks': '[]', 'next_tasks': "['435709c73c5344c5a3d5754c7eb2ab6d']", ...}, ...]

Avertissement

Le serveur ne retourne pas les payloads dans la réponse. Demandez à l’équipe Manta d’ajouter cette fonctionnalité.

select_tasks(node_id: str) List[Dict[str, str]]

Sélectionner les tâches en fonction de l’ID de nœud. Les tâches sont triées par progression du Swarm de la fin vers le début.

Paramètres:

node_id (str) – ID de nœud

Renvoie:

Liste d’informations sur les tâches

Type renvoyé:

List[Dict[str, str]]

Exemples

>>> cluster = Cluster("localhost", 50051)
>>> node_id = 'a2ff3abc76e045d7bb4a04a5ac416318'
>>> cluster.select_tasks(node_id)
[{'task_id': 'adcc83a14d8642bbb6413d196fb0b5b8', 'swarm_id': '435709c73c5344c5a3d5754c7eb2ab6d', 'status': ...}, ...]

Avertissement

Le serveur ne retourne pas les payloads dans la réponse. Demandez à l’équipe Manta d’ajouter cette fonctionnalité.

get_node_ids(available: bool = False) List[str]

Retourner les IDs de nœuds

Paramètres:

available (bool) – Retourner seulement les nœuds disponibles

Renvoie:

Liste des IDs de nœuds

Type renvoyé:

List[str]

Exemples

>>> cluster = Cluster("localhost", 50051)
>>> cluster.get_node_ids(available=True)
['a2ff3abc76e045d7bb4a04a5ac416318', 'f8a7c1d7f9d44f6b9d0d3d5d9c7b3c9d']
select_results(*queries: Tuple[str, List[str]]) Dict[str, Results]

Sélectionner les résultats de la base de données en fonction des requêtes

Paramètres:

queries (Tuple[str, List[str]]) – Requêtes sous la structure (swarm_id, tags)

Renvoie:

Dictionnaire de (swarm_id, results)

Type renvoyé:

Dict[str, Results]

Exemples

>>> cluster = Cluster("localhost", 50051)
>>> swarm_id = '9415dfd18edc45c9a6ffdf2055007bf9'
>>> tags = ["accuracy", "loss"]
>>> cluster.select_results((swarm_id, tags))
{'9415dfd18edc45c9a6ffdf2055007bf9': Results(len(iteration)=10, len(nodes)=8, len(tags)=2)}
collect_logs(id: str) List[Dict[str, str]]

Collecter les journaux stockés dans la base de données Manager en fonction de l’ID

Paramètres:

id (str) – ID du Swarm ou ID de nœud

Renvoie:

Liste de plusieurs informations contenant des logs

Type renvoyé:

List[Dict[str, str]]

Exemples

>>> cluster = Cluster("localhost", 50051)
>>> swarm_id = '9415dfd18edc45c9a6ffdf2055007bf9'
>>> cluster.collect_logs(swarm_id)
[{'swarm_id': '9415dfd18edc45c9a6ffdf2055007bf9', 'task_id': 'a2ff3abc76e045d7bb4a04a5ac416318', 'message': ..., 'datetime': '2024-09-13 10:18:06', 'iteration': '1'}, ...]
class manta.cluster.AsyncCluster(host: str = 'localhost', port: int = 50051, ca_port: int = 50050, secured_token: str | None = None)

Client gRPC asynchrone pour se connecter à un cluster

Paramètres:
  • host (str) – Adresse du Manager

  • port (int) – Port du Manager

  • ca_port (int) – Port pour le CA

  • secured_token (Optional[str]) – Jeton sécurisé pour la première connexion du Manager

async classmethod connect(host: str = 'localhost', port: int = 50051, ca_port: int = 50050, secured_token: str | None = None) Cluster

Créer un client Cluster gRPC.

Paramètres:
  • host (str) – Adresse du Manager, par défaut « localhost »

  • port (int) – Port du Manager, par défaut 50051

  • ca_port (int) – Port pour le CA

  • secured_token (Optional[str]) – Jeton sécurisé pour la première connexion du Manager

Renvoie:

Client gRPC du cluster

Type renvoyé:

Cluster

Exemples

>>> await AsyncCluster.connect(
...     "localhost", 50051, 50050, secured_token="TOKEN"
... )
async is_available(service: UserStub) Response

Vérifiez si le serveur est disponible

Paramètres:

service (UserStub) – Service de connexion au serveur gRPC (non requis)

Renvoie:

Réponse du serveur

Type renvoyé:

Response

Exemples

>>> cluster = AsyncCluster("localhost", 50051)
>>> await cluster.is_available()
Response(message="Available", status=1)

Remarques

L’argument service est géré en interne par un décorateur.

async start_swarm(service: UserStub, swarm_id: str)

Démarrer un Swarm en fonction de son ID s’il est trouvé dans la base de données

Paramètres:
  • service (UserStub) – Service de connexion au serveur gRPC (non requis)

  • swarm_id (str) – ID du Swarm

Renvoie:

Réponse du serveur

Type renvoyé:

Response

Exemples

>>> cluster = AsyncCluster("localhost", 50051)
>>> swarm_id = '9415dfd18edc45c9a6ffdf2055007bf9'
>>> await cluster.start_swarm(swarm_id)
Response(message='Swarm 9415dfd18edc45c9a6ffdf2055007bf9 has started.', status=2)

Remarques

L’argument service est géré en interne par un décorateur.

async send_swarm(service: UserStub, swarm: Swarm) SwarmOverview

Envoyer un Swarm au serveur pour qu’il soit inséré dans sa base de données

Paramètres:
  • service (UserStub) – Service de connexion au serveur gRPC (non requis)

  • swarm (Swarm) – Toute classe qui hérite de la classe Swarm

Renvoie:

Aperçu des métadonnées du Swarm enregistré sur le serveur

Type renvoyé:

SwarmOverview

Exemples

>>> cluster = AsyncCluster("localhost", 50051)
>>> swarm = FLSwarm()
>>> await cluster.send_swarm(swarm)
SwarmOverview(swarm_id='9415dfd18edc45c9a6ffdf2055007bf9', datetime='2024-09-13 10:18:06', status='ACTIVE')

Remarques

L’argument service est géré en interne par un décorateur.

async deploy_swarm(service: UserStub, swarm: Swarm) Deployment

Déployer un Swarm

Paramètres:
  • service (UserStub) – Service de connexion au serveur gRPC (non requis)

  • swarm (Swarm) – Toute classe qui hérite de la classe Swarm

Renvoie:

Réponse du serveur

Type renvoyé:

Deployment

Exemples

>>> cluster = AsyncCluster("localhost", 50051)
>>> swarm = FLSwarm()
>>> await cluster.deploy_swarm(swarm)
Deployment(swarm_id='9415dfd18edc45c9a6ffdf2055007bf9', datetime='2024-09-13 10:18:06', status='ACTIVE', message='Swarm 9415dfd18edc45c9a6ffdf2055007bf9 has started.')

Remarques

L’argument service est géré en interne par un décorateur.

async list_swarms(service: UserStub) List[SwarmOverview]

Lister les Swarms enregistrés sur le serveur

Paramètres:

service – Service de connexion au serveur gRPC (non requis)

Renvoie:

Liste des aperçus des Swarms

Type renvoyé:

List[SwarmOverview]

Exemples

>>> cluster = AsyncCluster("localhost", 50051)
>>> await cluster.list_swarms()
[SwarmOverview(swarm_id='9415dfd18edc45c9a6ffdf2055007bf9', datetime='2024-09-13 10:18:06', status='ACTIVE')]

Remarques

L’argument service est géré en interne par un décorateur.

async remove_swarm(service: UserStub, swarm_id: str) Response

Supprimer un Swarm sur le cluster

Paramètres:
  • service (UserStub) – Service de connexion au serveur gRPC (non requis)

  • swarm_id (str) – ID du Swarm

Renvoie:

Réponse du serveur

Type renvoyé:

Response

Exemples

>>> cluster = AsyncCluster("localhost", 50051)
>>> swarm_id = '9415dfd18edc45c9a6ffdf2055007bf9'
>>> await cluster.remove_swarm(swarm_id)
Response(message='9415dfd18edc45c9a6ffdf2055007bf9 deleted.', status=2)

Remarques

L’argument service est géré en interne par un décorateur.

async get_tasks(service: UserStub, swarm_id: str) List[Dict[str, str]]

Obtenir les tâches selon l’ID du Swarm

Paramètres:
  • service (UserStub) – Service de connexion au serveur gRPC (non requis)

  • swarm_id (str) – ID du Swarm

Renvoie:

Liste d’informations sur les tâches

Type renvoyé:

List[Dict[str, str]]

Exemples

>>> cluster = AsyncCluster("localhost", 50051)
>>> swarm_id = '9415dfd18edc45c9a6ffdf2055007bf9'
>>> await cluster.get_tasks(swarm_id)
[{'task_id': 'adcc83a14d8642bbb6413d196fb0b5b8', 'image': 'fl-pytorch-mnist:latest', 'previous_tasks': '[]', 'next_tasks': "['435709c73c5344c5a3d5754c7eb2ab6d']", ...}, ...]

Avertissement

Le serveur ne retourne pas les payloads dans la réponse. Demandez à l’équipe Manta d’ajouter cette fonctionnalité.

async select_tasks(service: UserStub, node_id: str) List[Dict[str, str]]

Sélectionner les tâches en fonction de l’ID de nœud. Les tâches sont triées par progression du Swarm de la fin vers le début.

Paramètres:
  • service (UserStub) – Service de connexion au serveur gRPC (non requis)

  • node_id (str) – ID de nœud

Renvoie:

Liste d’informations sur les tâches

Type renvoyé:

List[Dict[str, str]]

Exemples

>>> cluster = AsyncCluster("localhost", 50051)
>>> node_id = 'a2ff3abc76e045d7bb4a04a5ac416318'
>>> await cluster.select_tasks(node_id)
[{'task_id': 'adcc83a14d8642bbb6413d196fb0b5b8', 'swarm_id': '435709c73c5344c5a3d5754c7eb2ab6d', 'status': ...}, ...]

Avertissement

Le serveur ne retourne pas les payloads dans la réponse. Demandez à l’équipe Manta d’ajouter cette fonctionnalité.

async get_node_ids(service: UserStub, available: bool = False) List[str]

Retourner les IDs de nœuds

Paramètres:
  • service (UserStub) – Service de connexion au serveur gRPC (non requis)

  • available (bool) – Retourner seulement les nœuds disponibles

Renvoie:

Liste des IDs de nœuds

Type renvoyé:

List[str]

Exemples

>>> cluster = AsyncCluster("localhost", 50051)
>>> await cluster.get_node_ids(available=True)
['a2ff3abc76e045d7bb4a04a5ac416318', 'f8a7c1d7f9d44f6b9d0d3d5d9c7b3c9d']
async select_results(service: UserStub, *queries: Tuple[str, List[str]]) Dict[str, Results]

Sélectionner les résultats de la base de données en fonction des requêtes

Paramètres:
  • service (UserStub) – Service de connexion au serveur gRPC (non requis)

  • queries (Tuple[str, List[str]]) – Requêtes sous la structure (swarm_id, tags)

Renvoie:

Dictionnaire de (swarm_id, results)

Type renvoyé:

Dict[str, Results]

Exemples

>>> cluster = AsyncCluster("localhost", 50051)
>>> swarm_id = '9415dfd18edc45c9a6ffdf2055007bf9'
>>> tags = ["accuracy", "loss"]
>>> await cluster.select_results((swarm_id, tags))
{'9415dfd18edc45c9a6ffdf2055007bf9': Results(len(iteration)=10, len(nodes)=8, len(tags)=2)}

Remarques

L’argument service est géré en interne par un décorateur.

async collect_logs(service: UserStub, id: str) List[Dict[str, str]]

Collecter les journaux stockés dans la base de données Manager en fonction de l’ID

Paramètres:
  • service (UserStub) – Service de connexion au serveur gRPC (non requis)

  • id (str) – ID du Swarm ou ID de nœud

Renvoie:

Liste de plusieurs informations contenant des logs

Type renvoyé:

List[Dict[str, str]]

Exemples

>>> cluster = AsyncCluster("localhost", 50051)
>>> swarm_id = '9415dfd18edc45c9a6ffdf2055007bf9'
>>> cluster.collect_logs(swarm_id)
[{'swarm_id': '9415dfd18edc45c9a6ffdf2055007bf9', 'task_id': 'a2ff3abc76e045d7bb4a04a5ac416318', 'message': ..., 'datetime': '2024-09-13 10:18:06', 'iteration': '1'}, ...]

Remarques

L’argument service est géré en interne par un décorateur.