Cluster - Contrôlez les Swarms et accédez aux résultats et aux journaux¶
- class manta.cluster.Cluster(host: str = 'localhost', port: int = 50051, ca_port: int = 50050, secured_token: str | None = None)¶
Client gRPC synchrone pour se connecter à un cluster
- Paramètres:
host (str) – Adresse du Manager
port (int) – Port du Manager
ca_port (int) – Port pour le CA
secured_token (Optional[str]) – Jeton sécurisé pour la première connexion du Manager
Voir aussi
AsyncCluster
Client gRPC asynchrone
- classmethod connect(host: str = 'localhost', port: int = 50051, ca_port: int = 50050, secured_token: str | None = None) Cluster ¶
Créer un client Cluster gRPC.
- Paramètres:
host (str) – Adresse du Manager, par défaut « localhost »
port (int) – Port du Manager, par défaut 50051
ca_port (int) – Port pour le CA
secured_token (Optional[str]) – Jeton sécurisé pour la première connexion du Manager
- Renvoie:
Client gRPC du cluster
- Type renvoyé:
Exemples
>>> Cluster.connect( ... "localhost", 50051, 50050, secured_token="TOKEN" ... )
- is_available() Response ¶
Vérifiez si le serveur est disponible
- Renvoie:
Réponse du serveur
- Type renvoyé:
Response
Exemples
>>> cluster = Cluster("localhost", 50051) >>> cluster.is_available() Response(message="Available", status=1)
- start_swarm(swarm_id: str) Response ¶
Démarrer un Swarm en fonction de son ID s’il est trouvé dans la base de données
- Paramètres:
swarm_id (str) – ID du Swarm
- Renvoie:
Réponse du serveur
- Type renvoyé:
Response
Exemples
>>> cluster = Cluster("localhost", 50051) >>> swarm_id = '9415dfd18edc45c9a6ffdf2055007bf9' >>> cluster.start_swarm(swarm_id) Response(message='Swarm 9415dfd18edc45c9a6ffdf2055007bf9 has started.', status=2)
- send_swarm(swarm: Swarm) SwarmOverview ¶
Envoyer un Swarm au serveur pour qu’il soit inséré dans sa base de données
- Paramètres:
swarm (Swarm) – Toute classe qui hérite de la classe
Swarm
- Renvoie:
Aperçu des métadonnées du Swarm enregistré sur le serveur
- Type renvoyé:
SwarmOverview
Exemples
>>> cluster = Cluster("localhost", 50051) >>> swarm = FLSwarm() >>> cluster.send_swarm(swarm) SwarmOverview(swarm_id='9415dfd18edc45c9a6ffdf2055007bf9', datetime='2024-09-13 10:18:06', status='ACTIVE')
- deploy_swarm(swarm: Swarm) Deployment ¶
Déployer un Swarm
- Paramètres:
swarm (Swarm) – Toute classe qui hérite de la classe
Swarm
- Renvoie:
Réponse du serveur
- Type renvoyé:
Deployment
Exemples
>>> cluster = Cluster("localhost", 50051) >>> swarm = FLSwarm() >>> cluster.deploy_swarm(swarm) Deployment(swarm_id='9415dfd18edc45c9a6ffdf2055007bf9', datetime='2024-09-13 10:18:06', status='ACTIVE', message='Swarm 9415dfd18edc45c9a6ffdf2055007bf9 has started.')
- list_swarms() List[SwarmOverview] ¶
Lister les Swarms enregistrés sur le serveur
- Renvoie:
Liste des aperçus des Swarms
- Type renvoyé:
List[SwarmOverview]
Exemples
>>> cluster = Cluster("localhost", 50051) >>> cluster.list_swarms() [SwarmOverview(swarm_id='9415dfd18edc45c9a6ffdf2055007bf9', datetime='2024-09-13 10:18:06', status='ACTIVE')]
- remove_swarm(swarm_id: str) Response ¶
Supprimer un Swarm sur le cluster
- Paramètres:
swarm_id (str) – ID du Swarm
- Renvoie:
Réponse du serveur
- Type renvoyé:
Response
Exemples
>>> cluster = Cluster("localhost", 50051) >>> swarm_id = '9415dfd18edc45c9a6ffdf2055007bf9' >>> cluster.remove_swarm(swarm_id) Response(message='9415dfd18edc45c9a6ffdf2055007bf9 deleted.', status=2)
- get_tasks(swarm_id: str) List[Dict[str, str]] ¶
Obtenir les tâches selon l’ID du Swarm
- Paramètres:
swarm_id (str) – ID du Swarm
- Renvoie:
Liste d’informations sur les tâches
- Type renvoyé:
List[Dict[str, str]]
Exemples
>>> cluster = Cluster("localhost", 50051) >>> swarm_id = '9415dfd18edc45c9a6ffdf2055007bf9' >>> cluster.get_tasks(swarm_id) [{'task_id': 'adcc83a14d8642bbb6413d196fb0b5b8', 'image': 'fl-pytorch-mnist:latest', 'previous_tasks': '[]', 'next_tasks': "['435709c73c5344c5a3d5754c7eb2ab6d']", ...}, ...]
Avertissement
Le serveur ne retourne pas les payloads dans la réponse. Demandez à l’équipe Manta d’ajouter cette fonctionnalité.
- select_tasks(node_id: str) List[Dict[str, str]] ¶
Sélectionner les tâches en fonction de l’ID de nœud. Les tâches sont triées par progression du Swarm de la fin vers le début.
- Paramètres:
node_id (str) – ID de nœud
- Renvoie:
Liste d’informations sur les tâches
- Type renvoyé:
List[Dict[str, str]]
Exemples
>>> cluster = Cluster("localhost", 50051) >>> node_id = 'a2ff3abc76e045d7bb4a04a5ac416318' >>> cluster.select_tasks(node_id) [{'task_id': 'adcc83a14d8642bbb6413d196fb0b5b8', 'swarm_id': '435709c73c5344c5a3d5754c7eb2ab6d', 'status': ...}, ...]
Avertissement
Le serveur ne retourne pas les payloads dans la réponse. Demandez à l’équipe Manta d’ajouter cette fonctionnalité.
- get_node_ids(available: bool = False) List[str] ¶
Retourner les IDs de nœuds
- Paramètres:
available (bool) – Retourner seulement les nœuds disponibles
- Renvoie:
Liste des IDs de nœuds
- Type renvoyé:
List[str]
Exemples
>>> cluster = Cluster("localhost", 50051) >>> cluster.get_node_ids(available=True) ['a2ff3abc76e045d7bb4a04a5ac416318', 'f8a7c1d7f9d44f6b9d0d3d5d9c7b3c9d']
- select_results(*queries: Tuple[str, List[str]]) Dict[str, Results] ¶
Sélectionner les résultats de la base de données en fonction des requêtes
- Paramètres:
queries (Tuple[str, List[str]]) – Requêtes sous la structure
(swarm_id, tags)
- Renvoie:
Dictionnaire de
(swarm_id, results)
- Type renvoyé:
Dict[str, Results]
Exemples
>>> cluster = Cluster("localhost", 50051) >>> swarm_id = '9415dfd18edc45c9a6ffdf2055007bf9' >>> tags = ["accuracy", "loss"] >>> cluster.select_results((swarm_id, tags)) {'9415dfd18edc45c9a6ffdf2055007bf9': Results(len(iteration)=10, len(nodes)=8, len(tags)=2)}
- collect_logs(id: str) List[Dict[str, str]] ¶
Collecter les journaux stockés dans la base de données
Manager
en fonction de l’ID- Paramètres:
id (str) – ID du Swarm ou ID de nœud
- Renvoie:
Liste de plusieurs informations contenant des logs
- Type renvoyé:
List[Dict[str, str]]
Exemples
>>> cluster = Cluster("localhost", 50051) >>> swarm_id = '9415dfd18edc45c9a6ffdf2055007bf9' >>> cluster.collect_logs(swarm_id) [{'swarm_id': '9415dfd18edc45c9a6ffdf2055007bf9', 'task_id': 'a2ff3abc76e045d7bb4a04a5ac416318', 'message': ..., 'datetime': '2024-09-13 10:18:06', 'iteration': '1'}, ...]
- class manta.cluster.AsyncCluster(host: str = 'localhost', port: int = 50051, ca_port: int = 50050, secured_token: str | None = None)¶
Client gRPC asynchrone pour se connecter à un cluster
- Paramètres:
host (str) – Adresse du Manager
port (int) – Port du Manager
ca_port (int) – Port pour le CA
secured_token (Optional[str]) – Jeton sécurisé pour la première connexion du Manager
- async classmethod connect(host: str = 'localhost', port: int = 50051, ca_port: int = 50050, secured_token: str | None = None) Cluster ¶
Créer un client Cluster gRPC.
- Paramètres:
host (str) – Adresse du Manager, par défaut « localhost »
port (int) – Port du Manager, par défaut 50051
ca_port (int) – Port pour le CA
secured_token (Optional[str]) – Jeton sécurisé pour la première connexion du Manager
- Renvoie:
Client gRPC du cluster
- Type renvoyé:
Exemples
>>> await AsyncCluster.connect( ... "localhost", 50051, 50050, secured_token="TOKEN" ... )
- async is_available(service: UserStub) Response ¶
Vérifiez si le serveur est disponible
- Paramètres:
service (UserStub) – Service de connexion au serveur gRPC (non requis)
- Renvoie:
Réponse du serveur
- Type renvoyé:
Response
Exemples
>>> cluster = AsyncCluster("localhost", 50051) >>> await cluster.is_available() Response(message="Available", status=1)
Remarques
L’argument
service
est géré en interne par un décorateur.
- async start_swarm(service: UserStub, swarm_id: str)¶
Démarrer un Swarm en fonction de son ID s’il est trouvé dans la base de données
- Paramètres:
service (UserStub) – Service de connexion au serveur gRPC (non requis)
swarm_id (str) – ID du Swarm
- Renvoie:
Réponse du serveur
- Type renvoyé:
Response
Exemples
>>> cluster = AsyncCluster("localhost", 50051) >>> swarm_id = '9415dfd18edc45c9a6ffdf2055007bf9' >>> await cluster.start_swarm(swarm_id) Response(message='Swarm 9415dfd18edc45c9a6ffdf2055007bf9 has started.', status=2)
Remarques
L’argument
service
est géré en interne par un décorateur.
- async send_swarm(service: UserStub, swarm: Swarm) SwarmOverview ¶
Envoyer un Swarm au serveur pour qu’il soit inséré dans sa base de données
- Paramètres:
service (UserStub) – Service de connexion au serveur gRPC (non requis)
swarm (Swarm) – Toute classe qui hérite de la classe
Swarm
- Renvoie:
Aperçu des métadonnées du Swarm enregistré sur le serveur
- Type renvoyé:
SwarmOverview
Exemples
>>> cluster = AsyncCluster("localhost", 50051) >>> swarm = FLSwarm() >>> await cluster.send_swarm(swarm) SwarmOverview(swarm_id='9415dfd18edc45c9a6ffdf2055007bf9', datetime='2024-09-13 10:18:06', status='ACTIVE')
Remarques
L’argument
service
est géré en interne par un décorateur.
- async deploy_swarm(service: UserStub, swarm: Swarm) Deployment ¶
Déployer un Swarm
- Paramètres:
service (UserStub) – Service de connexion au serveur gRPC (non requis)
swarm (Swarm) – Toute classe qui hérite de la classe
Swarm
- Renvoie:
Réponse du serveur
- Type renvoyé:
Deployment
Exemples
>>> cluster = AsyncCluster("localhost", 50051) >>> swarm = FLSwarm() >>> await cluster.deploy_swarm(swarm) Deployment(swarm_id='9415dfd18edc45c9a6ffdf2055007bf9', datetime='2024-09-13 10:18:06', status='ACTIVE', message='Swarm 9415dfd18edc45c9a6ffdf2055007bf9 has started.')
Remarques
L’argument
service
est géré en interne par un décorateur.
- async list_swarms(service: UserStub) List[SwarmOverview] ¶
Lister les Swarms enregistrés sur le serveur
- Paramètres:
service – Service de connexion au serveur gRPC (non requis)
- Renvoie:
Liste des aperçus des Swarms
- Type renvoyé:
List[SwarmOverview]
Exemples
>>> cluster = AsyncCluster("localhost", 50051) >>> await cluster.list_swarms() [SwarmOverview(swarm_id='9415dfd18edc45c9a6ffdf2055007bf9', datetime='2024-09-13 10:18:06', status='ACTIVE')]
Remarques
L’argument
service
est géré en interne par un décorateur.
- async remove_swarm(service: UserStub, swarm_id: str) Response ¶
Supprimer un Swarm sur le cluster
- Paramètres:
service (UserStub) – Service de connexion au serveur gRPC (non requis)
swarm_id (str) – ID du Swarm
- Renvoie:
Réponse du serveur
- Type renvoyé:
Response
Exemples
>>> cluster = AsyncCluster("localhost", 50051) >>> swarm_id = '9415dfd18edc45c9a6ffdf2055007bf9' >>> await cluster.remove_swarm(swarm_id) Response(message='9415dfd18edc45c9a6ffdf2055007bf9 deleted.', status=2)
Remarques
L’argument
service
est géré en interne par un décorateur.
- async get_tasks(service: UserStub, swarm_id: str) List[Dict[str, str]] ¶
Obtenir les tâches selon l’ID du Swarm
- Paramètres:
service (UserStub) – Service de connexion au serveur gRPC (non requis)
swarm_id (str) – ID du Swarm
- Renvoie:
Liste d’informations sur les tâches
- Type renvoyé:
List[Dict[str, str]]
Exemples
>>> cluster = AsyncCluster("localhost", 50051) >>> swarm_id = '9415dfd18edc45c9a6ffdf2055007bf9' >>> await cluster.get_tasks(swarm_id) [{'task_id': 'adcc83a14d8642bbb6413d196fb0b5b8', 'image': 'fl-pytorch-mnist:latest', 'previous_tasks': '[]', 'next_tasks': "['435709c73c5344c5a3d5754c7eb2ab6d']", ...}, ...]
Avertissement
Le serveur ne retourne pas les payloads dans la réponse. Demandez à l’équipe Manta d’ajouter cette fonctionnalité.
- async select_tasks(service: UserStub, node_id: str) List[Dict[str, str]] ¶
Sélectionner les tâches en fonction de l’ID de nœud. Les tâches sont triées par progression du Swarm de la fin vers le début.
- Paramètres:
service (UserStub) – Service de connexion au serveur gRPC (non requis)
node_id (str) – ID de nœud
- Renvoie:
Liste d’informations sur les tâches
- Type renvoyé:
List[Dict[str, str]]
Exemples
>>> cluster = AsyncCluster("localhost", 50051) >>> node_id = 'a2ff3abc76e045d7bb4a04a5ac416318' >>> await cluster.select_tasks(node_id) [{'task_id': 'adcc83a14d8642bbb6413d196fb0b5b8', 'swarm_id': '435709c73c5344c5a3d5754c7eb2ab6d', 'status': ...}, ...]
Avertissement
Le serveur ne retourne pas les payloads dans la réponse. Demandez à l’équipe Manta d’ajouter cette fonctionnalité.
- async get_node_ids(service: UserStub, available: bool = False) List[str] ¶
Retourner les IDs de nœuds
- Paramètres:
service (UserStub) – Service de connexion au serveur gRPC (non requis)
available (bool) – Retourner seulement les nœuds disponibles
- Renvoie:
Liste des IDs de nœuds
- Type renvoyé:
List[str]
Exemples
>>> cluster = AsyncCluster("localhost", 50051) >>> await cluster.get_node_ids(available=True) ['a2ff3abc76e045d7bb4a04a5ac416318', 'f8a7c1d7f9d44f6b9d0d3d5d9c7b3c9d']
- async select_results(service: UserStub, *queries: Tuple[str, List[str]]) Dict[str, Results] ¶
Sélectionner les résultats de la base de données en fonction des requêtes
- Paramètres:
service (UserStub) – Service de connexion au serveur gRPC (non requis)
queries (Tuple[str, List[str]]) – Requêtes sous la structure
(swarm_id, tags)
- Renvoie:
Dictionnaire de
(swarm_id, results)
- Type renvoyé:
Dict[str, Results]
Exemples
>>> cluster = AsyncCluster("localhost", 50051) >>> swarm_id = '9415dfd18edc45c9a6ffdf2055007bf9' >>> tags = ["accuracy", "loss"] >>> await cluster.select_results((swarm_id, tags)) {'9415dfd18edc45c9a6ffdf2055007bf9': Results(len(iteration)=10, len(nodes)=8, len(tags)=2)}
Remarques
L’argument
service
est géré en interne par un décorateur.
- async collect_logs(service: UserStub, id: str) List[Dict[str, str]] ¶
Collecter les journaux stockés dans la base de données
Manager
en fonction de l’ID- Paramètres:
service (UserStub) – Service de connexion au serveur gRPC (non requis)
id (str) – ID du Swarm ou ID de nœud
- Renvoie:
Liste de plusieurs informations contenant des logs
- Type renvoyé:
List[Dict[str, str]]
Exemples
>>> cluster = AsyncCluster("localhost", 50051) >>> swarm_id = '9415dfd18edc45c9a6ffdf2055007bf9' >>> cluster.collect_logs(swarm_id) [{'swarm_id': '9415dfd18edc45c9a6ffdf2055007bf9', 'task_id': 'a2ff3abc76e045d7bb4a04a5ac416318', 'message': ..., 'datetime': '2024-09-13 10:18:06', 'iteration': '1'}, ...]
Remarques
L’argument
service
est géré en interne par un décorateur.