Collecter vos résultats

Une fois que votre swarm est déployé, vous pouvez étudier vos résultats directement depuis votre ordinateur. Pour des calculs plus facile, nous allons utiliser Polars, mais vous pouvez aussi utliser Pandas ou une autre librairie que vous préférez.

Implémentation

# collect_results.py
import asyncio
import polars as pl # pip install polars
from manta.cluster import AsyncCluster
from manta.utils import bytes_to_dict

async def load_swarm(cluster):
    swarms = await cluster.list_swarms()
    swarm_id = swarms[-1].swarm_id
    tasks = await cluster.get_tasks(swarm_id)
    tasks = {task["task_id"]: task["alias"] for task in tasks}
    node_ids = await cluster.get_node_ids(available=True)
    nodes = [{"name": node_id, "task": "normal"} for node_id in node_ids]
    data = {"tasks": tasks, "rounds": [], "nodes": nodes}
    return data, swarm_id, tasks

async def main():
    try:
        cluster = AsyncCluster("localhost", 50051)
        data, swarm_id, tasks = await load_swarm(cluster)

        while True:
            results = await cluster.select_results((swarm_id, ["metrics"]))
            metrics = results[swarm_id]("metrics") if results else []
            dfs = [pl.from_dicts(map(bytes_to_dict, values)) for values in metrics if all(values)]
            values = [df.mean().to_dicts()[0] for df in dfs]

            data["tags"] = list(values[0]) if values else []
            data["rounds"] = [{"round": i} | value for i, value in enumerate(values)]

            for node_id in (await cluster.get_node_ids(available=True)):
                results = await cluster.select_tasks(node_id)
                node = next(filter(lambda node: node["name"] == node_id, data["nodes"]))
                node["task"] = "normal"
                if results:
                    result = results[0]
                    node["task"] = (
                        "normal" if int(result["status"]) == 3 else
                        tasks.get(result["task_id"])
                    )

            if data["rounds"]:
                print("Rounds dataframe")
                print(pl.from_dicts(data["rounds"]))
            if data["nodes"]:
                print("Node status")
                print(pl.from_dicts(data["nodes"]))
            await asyncio.sleep(1)
    except ConnectionRefusedError:
        print("Manager not accessible")

try:
    asyncio.run(main())
except KeyboardInterrupt:
    pass

Puis vous pouvez simplement lancer votre script :

python collect_results.py

Sortie:

Rounds dataframe
shape: (18, 4)
┌───────┬──────────┬──────────┬─────────┐
│ round ┆ loss     ┆ val_loss ┆ val_acc │
│ ---   ┆ ---      ┆ ---      ┆ ---     │
│ i64   ┆ f64      ┆ f64      ┆ f64     │
╞═══════╪══════════╪══════════╪═════════╡
│ 0     ┆ 2.191565 ┆ 2.195249 ┆ 0.389   │
│ 1     ┆ 1.797617 ┆ 1.797839 ┆ 0.529   │
│ 2     ┆ 1.066547 ┆ 1.029428 ┆ 0.7437  │
│ 3     ┆ 0.766294 ┆ 0.679413 ┆ 0.7988  │
│ 4     ┆ 0.666064 ┆ 0.56257  ┆ 0.8276  │
│ …     ┆ …        ┆ …        ┆ …       │
│ 13    ┆ 0.42975  ┆ 0.365476 ┆ 0.8863  │
│ 14    ┆ 0.418534 ┆ 0.35849  ┆ 0.8883  │
│ 15    ┆ 0.408234 ┆ 0.351929 ┆ 0.8907  │
│ 16    ┆ 0.400571 ┆ 0.346585 ┆ 0.892   │
│ 17    ┆ 0.393336 ┆ 0.340408 ┆ 0.8946  │
└───────┴──────────┴──────────┴─────────┘
Node status
shape: (4, 2)
┌─────────────────────────────────┬───────────┐
│ name                            ┆ task      │
│ ---                             ┆ ---       │
│ str                             ┆ str       │
╞═════════════════════════════════╪═══════════╡
│ 8dd4359ecae049ac8bbbd06752d08d… ┆ normal    │
│ 48d3d2e1542f4e4c85693ad824646d… ┆ normal    │
│ fc65ed7e460d46438c72f843ba2a30… ┆ scheduler │
│ c0806bf0992745eebf02a60d1e9d89… ┆ normal    │
└─────────────────────────────────┴───────────┘

Plus de détails

Commencçons par reprendre étape par étape le script :

# collect_results.py
import asyncio
import polars as pl # pip install polars
from manta.cluster import AsyncCluster
from manta.utils import bytes_to_dict

Pour de meilleurs performances, au lieu d’utiliser Cluster, nous utilisons AsyncCluster pour des intéractions asynchrones. bytes_to_dict nous aide à convertir results en dict à partir de bytes. Les résultats seront toujours reçus en bytes. Cependant, ils pourraient provenir torch ou même numpy. Voir la section casting pour plus d’informations.

async def load_swarm(cluster):
    swarms = await cluster.list_swarms()
    swarm_id = swarms[-1].swarm_id
    tasks = await cluster.get_tasks(swarm_id)
    tasks = {task["task_id"]: task["alias"] for task in tasks}
    node_ids = await cluster.get_node_ids(available=True)
    nodes = [{"name": node_id, "task": "normal"} for node_id in node_ids]
    data = {"tasks": tasks, "rounds": [], "nodes": nodes}
    return data, swarm_id, tasks

Nous commençons par recevoir une liste de swarms triée par date (dans l’ordre du plus ancien au plus récent). Le swarm_id sélectionné est le plus récent. Puis nous obtenons toutes les tâches de la défintion du Swarm. Cela nous aide à faire une correspondance entre les IDs des tâches et les alias. Et nous requêtons tous les IDs des noeuds disponibles.

async def main():
    try:
        cluster = AsyncCluster("localhost", 50051)
        data, swarm_id, tasks = await load_swarm(cluster)

        while True:
            # Some updates
            # ...
            await asyncio.sleep(1)
    except ConnectionRefusedError:
        print("Manager not accessible")

try:
    asyncio.run(main())
except KeyboardInterrupt:
    pass

Cette fonction main est une boucle infinie qui met à jour les valeurs venant de load_swarm.

results = await cluster.select_results((swarm_id, ["metrics"]))
metrics = results[swarm_id]("metrics") if results else []
dfs = [pl.from_dicts(map(bytes_to_dict, values)) for values in metrics if all(values)]
values = [df.mean().to_dicts()[0] for df in dfs]

Nous sélectionnons le tag "metrics" du swarm swarm_id. Il nous donne un dictionnaire où les clés sont les IDs des swarms et les valeurs sont les résultats. Voir la section résultats pour plus d’informations. Puis après avoir sélectionné le tag "metrics" dans les résultats, nous changeons des :code:`bytes en dict pour les manipuler plus facilement. Chacun des éléments de ces valeurs sont un dictionnaire {"loss": ..., "val_loss", ..., "val_acc": ... de résultats de la tâche Worker. Pour finir, nous calculons la moyenne des valeurs en les mettant dans pl.Dataframe, pour calculer df.mean() et extraire la valeur moyenne.

data["tags"] = list(values[0]) if values else []
data["rounds"] = [{"round": i} | value for i, value in enumerate(values)]

Un buffer de données est mis à jour avec les valeurs calculés au dessus.

for node_id in (await cluster.get_node_ids(available=True)):
    results = await cluster.select_tasks(node_id)
    node = next(filter(lambda node: node["name"] == node_id, data["nodes"]))
    node["task"] = "normal"
    if results:
        result = results[0]
        node["task"] = (
            "normal" if int(result["status"]) == 3 else
            tasks.get(result["task_id"])
        )

Pour avoir le status d’un noeud, nous requêtons tous les IDs des noeuds disponibles. cluster.select_tasks(node_id) nous permet d’avoir les activités d’un noeud. Nous sélectionnons les valeurs actuelles des noeuds (ex: {"name": node_id, "task": "normal"}) et nous stockons leur référence dans une variable node. Nous mettons à jour leur status en donnant les activités des noeuds collectées précédemment. results[0] est la tâche déployée la plus récente d’un noeud. Selon son status et le task_id, nous pouvont avoir la tâche actuellement exécutée du noeud.

if data["rounds"]:
    print("Rounds dataframe")
    print(pl.from_dicts(data["rounds"]))
if data["nodes"]:
    print("Node status")
    print(pl.from_dicts(data["nodes"]))

Ces lignes affichent les data sous forme de tableau en les convertissant en dataframes.