Collect your results

After deploying your swarm, you can monitor your results directly from your computer. For easier computations, we will use Polars, but you could use Pandas (https://pandas.pydata.org/) or any other library you prefer.

Implementation

# collect_results.py
import asyncio
import polars as pl # pip install polars
from manta.cluster import AsyncCluster
from manta.utils import bytes_to_dict

async def load_swarm(cluster):
    """Select the newest swarm and build the initial monitoring buffers.

    Returns a ``(data, swarm_id, tasks)`` tuple where ``tasks`` maps task
    IDs to their aliases and ``data`` holds the tasks/rounds/nodes buffers.
    """
    # Swarms are listed oldest-to-newest, so the last entry is the newest.
    all_swarms = await cluster.list_swarms()
    swarm_id = all_swarms[-1].swarm_id

    # Build the task-id -> alias mapping from the swarm definition.
    task_entries = await cluster.get_tasks(swarm_id)
    tasks = {}
    for entry in task_entries:
        tasks[entry["task_id"]] = entry["alias"]

    # Every currently-available node starts out flagged as "normal".
    nodes = []
    for node_id in await cluster.get_node_ids(available=True):
        nodes.append({"name": node_id, "task": "normal"})

    data = {"tasks": tasks, "rounds": [], "nodes": nodes}
    return data, swarm_id, tasks

async def main():
    """Poll the manager every second and print round metrics and node status.

    Connects to the manager on localhost:50051, loads the newest swarm, then
    loops forever refreshing the metrics and node-status tables. Prints a
    message and exits if the manager is unreachable.
    """
    try:
        cluster = AsyncCluster("localhost", 50051)
        data, swarm_id, tasks = await load_swarm(cluster)

        while True:
            # Select the "metrics" tag of the swarm; the reply maps swarm ids
            # to their results.
            results = await cluster.select_results((swarm_id, ["metrics"]))
            # Fix: results[swarm_id] is a mapping, so the tag is looked up
            # with [] — the original called it with ().
            metrics = results[swarm_id]["metrics"] if results else []
            # Each round holds byte payloads (one per node); skip rounds with
            # missing (falsy) payloads, decode the rest, then average per round.
            dfs = [pl.from_dicts(map(bytes_to_dict, values)) for values in metrics if all(values)]
            values = [df.mean().to_dicts()[0] for df in dfs]

            # "tags" lists the metric names; each round's means get an index.
            data["tags"] = list(values[0]) if values else []
            data["rounds"] = [{"round": i} | value for i, value in enumerate(values)]

            # Refresh each node's current task from its most recent activity.
            for node_id in (await cluster.get_node_ids(available=True)):
                results = await cluster.select_tasks(node_id)
                node = next(filter(lambda node: node["name"] == node_id, data["nodes"]))
                node["task"] = "normal"
                if results:
                    result = results[0]  # most recently deployed task
                    # NOTE(review): status 3 appears to mean "finished", so the
                    # node is shown as "normal"; otherwise show the alias of
                    # the task being executed — confirm against manta docs.
                    node["task"] = (
                        "normal" if int(result["status"]) == 3 else
                        tasks.get(result["task_id"])
                    )

            # Render the buffers as tables (skip while still empty).
            if data["rounds"]:
                print("Rounds dataframe")
                print(pl.from_dicts(data["rounds"]))
            if data["nodes"]:
                print("Node status")
                print(pl.from_dicts(data["nodes"]))
            await asyncio.sleep(1)  # poll interval: one refresh per second
    except ConnectionRefusedError:
        print("Manager not accessible")

# Guard the entry point so importing this module does not start the loop;
# Ctrl-C (KeyboardInterrupt) is the expected way to stop the endless polling.
if __name__ == "__main__":
    try:
        asyncio.run(main())
    except KeyboardInterrupt:
        pass

Then, you can start your script:

python collect_results.py

Output:

Rounds dataframe
shape: (18, 4)
┌───────┬──────────┬──────────┬─────────┐
│ round ┆ loss     ┆ val_loss ┆ val_acc │
│ ---   ┆ ---      ┆ ---      ┆ ---     │
│ i64   ┆ f64      ┆ f64      ┆ f64     │
╞═══════╪══════════╪══════════╪═════════╡
│ 0     ┆ 2.191565 ┆ 2.195249 ┆ 0.389   │
│ 1     ┆ 1.797617 ┆ 1.797839 ┆ 0.529   │
│ 2     ┆ 1.066547 ┆ 1.029428 ┆ 0.7437  │
│ 3     ┆ 0.766294 ┆ 0.679413 ┆ 0.7988  │
│ 4     ┆ 0.666064 ┆ 0.56257  ┆ 0.8276  │
│ …     ┆ …        ┆ …        ┆ …       │
│ 13    ┆ 0.42975  ┆ 0.365476 ┆ 0.8863  │
│ 14    ┆ 0.418534 ┆ 0.35849  ┆ 0.8883  │
│ 15    ┆ 0.408234 ┆ 0.351929 ┆ 0.8907  │
│ 16    ┆ 0.400571 ┆ 0.346585 ┆ 0.892   │
│ 17    ┆ 0.393336 ┆ 0.340408 ┆ 0.8946  │
└───────┴──────────┴──────────┴─────────┘
Node status
shape: (4, 2)
┌─────────────────────────────────┬───────────┐
│ name                            ┆ task      │
│ ---                             ┆ ---       │
│ str                             ┆ str       │
╞═════════════════════════════════╪═══════════╡
│ 8dd4359ecae049ac8bbbd06752d08d… ┆ normal    │
│ 48d3d2e1542f4e4c85693ad824646d… ┆ normal    │
│ fc65ed7e460d46438c72f843ba2a30… ┆ scheduler │
│ c0806bf0992745eebf02a60d1e9d89… ┆ normal    │
└─────────────────────────────────┴───────────┘

More details

Let’s walk through the script step by step:

# collect_results.py
import asyncio
import polars as pl # pip install polars
from manta.cluster import AsyncCluster
from manta.utils import bytes_to_dict

For better performance, instead of using Cluster, we use AsyncCluster for asynchronous interactions. bytes_to_dict helps us convert results from bytes to dict. Results will always be received as bytes; however, they could come from torch or even numpy. See the casting section for more information.

async def load_swarm(cluster):
    """Select the newest swarm and build the initial monitoring buffers."""
    # Swarms are listed oldest-to-newest, so the last one is the most recent.
    swarms = await cluster.list_swarms()
    swarm_id = swarms[-1].swarm_id
    # Map task IDs to human-readable aliases from the swarm definition.
    tasks = await cluster.get_tasks(swarm_id)
    tasks = {task["task_id"]: task["alias"] for task in tasks}
    # Every currently-available node starts out flagged as "normal".
    node_ids = await cluster.get_node_ids(available=True)
    nodes = [{"name": node_id, "task": "normal"} for node_id in node_ids]
    data = {"tasks": tasks, "rounds": [], "nodes": nodes}
    return data, swarm_id, tasks

We start by getting the list of swarms sorted by datetime (oldest to newest). The selected swarm_id is the newest one. Then we get all tasks from the Swarm definition, which lets us build a mapping between task IDs and aliases. Finally, we request all available node ids.

async def main():
    """Connect to the manager and refresh the collected data every second."""
    try:
        cluster = AsyncCluster("localhost", 50051)
        data, swarm_id, tasks = await load_swarm(cluster)

        while True:
            # Some updates
            # ...
            await asyncio.sleep(1)  # poll interval: one refresh per second
    except ConnectionRefusedError:
        # Raised when the manager at localhost:50051 is unreachable.
        print("Manager not accessible")

# Guard the entry point so importing this module does not start the loop;
# Ctrl-C (KeyboardInterrupt) is the expected way to stop the endless polling.
if __name__ == "__main__":
    try:
        asyncio.run(main())
    except KeyboardInterrupt:
        pass

The main function is an infinite loop which updates the values from load_swarm.

results = await cluster.select_results((swarm_id, ["metrics"]))
metrics = results[swarm_id]("metrics") if results else []
dfs = [pl.from_dicts(map(bytes_to_dict, values)) for values in metrics if all(values)]
values = [df.mean().to_dicts()[0] for df in dfs]

We select the tag "metrics" of the swarm swarm_id. It gives us a dictionary where keys are swarm ids and values are results. See the results section for more information. Then, after selecting the tag "metrics" in results, we cast bytes to dict to manipulate them more easily. Each item of these values is a dictionary {"loss": ..., "val_loss": ..., "val_acc": ...} from results of the Worker task. Lastly, we compute the mean of the values by converting them into a pl.DataFrame, applying df.mean() and extracting the mean value.

# "tags" lists the metric names; each round's means are stored with its index.
data["tags"] = list(values[0]) if values else []
data["rounds"] = [{"round": i} | value for i, value in enumerate(values)]

The data buffer is updated with the values computed above.

# Refresh the status of every currently-available node.
for node_id in (await cluster.get_node_ids(available=True)):
    results = await cluster.select_tasks(node_id)
    # Grab a reference to this node's entry in the data buffer.
    node = next(filter(lambda node: node["name"] == node_id, data["nodes"]))
    node["task"] = "normal"
    if results:
        result = results[0]  # most recently deployed task on this node
        # NOTE(review): status 3 appears to mean "finished" -> back to
        # "normal"; otherwise show the alias of the task being executed.
        node["task"] = (
            "normal" if int(result["status"]) == 3 else
            tasks.get(result["task_id"])
        )

To get node status, we request all available node ids. cluster.select_tasks(node_id) allows us to get the node activities. We select the current node's entry (e.g. {"name": node_id, "task": "normal"}) and store its reference in the variable node. We update its status given the node activities collected previously. results[0] is the most recently deployed task on the node. Given its status and the task_id, we can determine the task currently executing on the node.

# Render the collected buffers as tables (skip buffers that are still empty).
if data["rounds"]:
    print("Rounds dataframe")
    print(pl.from_dicts(data["rounds"]))
if data["nodes"]:
    print("Node status")
    print(pl.from_dicts(data["nodes"]))

These lines print the data as tables by converting them into dataframes.