Collect your results
====================
After deploying your swarm, you can monitor your results directly from your computer.
For easier computations, we will use Polars, but you could use pandas or any other library you prefer.

Implementation
--------------
.. code:: python

    # collect_results.py
    import asyncio

    import polars as pl  # pip install polars
    from manta.cluster import AsyncCluster
    from manta.utils import bytes_to_dict


    async def load_swarm(cluster):
        # Swarms are listed from oldest to newest: keep the newest one
        swarms = await cluster.list_swarms()
        swarm_id = swarms[-1].swarm_id
        # Map each task ID to its alias
        tasks = await cluster.get_tasks(swarm_id)
        tasks = {task["task_id"]: task["alias"] for task in tasks}
        node_ids = await cluster.get_node_ids(available=True)
        nodes = [{"name": node_id, "task": "normal"} for node_id in node_ids]
        data = {"tasks": tasks, "rounds": [], "nodes": nodes}
        return data, swarm_id, tasks


    async def main():
        try:
            cluster = AsyncCluster("localhost", 50051)
            data, swarm_id, tasks = await load_swarm(cluster)
            while True:
                # Average the "metrics" results of each round
                results = await cluster.select_results((swarm_id, ["metrics"]))
                metrics = results[swarm_id]("metrics") if results else []
                dfs = [pl.from_dicts(map(bytes_to_dict, values)) for values in metrics if all(values)]
                values = [df.mean().to_dicts()[0] for df in dfs]

                # Update the data buffer
                data["tags"] = list(values[0]) if values else []
                data["rounds"] = [{"round": i} | value for i, value in enumerate(values)]

                # Refresh the task currently shown for each available node
                for node_id in (await cluster.get_node_ids(available=True)):
                    results = await cluster.select_tasks(node_id)
                    node = next(filter(lambda node: node["name"] == node_id, data["nodes"]))
                    node["task"] = "normal"
                    if results:
                        result = results[0]
                        node["task"] = (
                            "normal" if int(result["status"]) == 3 else
                            tasks.get(result["task_id"])
                        )

                # Print both tables
                if data["rounds"]:
                    print("Rounds dataframe")
                    print(pl.from_dicts(data["rounds"]))
                if data["nodes"]:
                    print("Node status")
                    print(pl.from_dicts(data["nodes"]))

                await asyncio.sleep(1)
        except ConnectionRefusedError:
            print("Manager not accessible")


    try:
        asyncio.run(main())
    except KeyboardInterrupt:
        pass

Then, you can start your script:
.. code:: bash

    python collect_results.py

Output:
.. code::

    Rounds dataframe
    shape: (18, 4)
    ┌───────┬──────────┬──────────┬─────────┐
    │ round ┆ loss     ┆ val_loss ┆ val_acc │
    │ ---   ┆ ---      ┆ ---      ┆ ---     │
    │ i64   ┆ f64      ┆ f64      ┆ f64     │
    ╞═══════╪══════════╪══════════╪═════════╡
    │ 0     ┆ 2.191565 ┆ 2.195249 ┆ 0.389   │
    │ 1     ┆ 1.797617 ┆ 1.797839 ┆ 0.529   │
    │ 2     ┆ 1.066547 ┆ 1.029428 ┆ 0.7437  │
    │ 3     ┆ 0.766294 ┆ 0.679413 ┆ 0.7988  │
    │ 4     ┆ 0.666064 ┆ 0.56257  ┆ 0.8276  │
    │ …     ┆ …        ┆ …        ┆ …       │
    │ 13    ┆ 0.42975  ┆ 0.365476 ┆ 0.8863  │
    │ 14    ┆ 0.418534 ┆ 0.35849  ┆ 0.8883  │
    │ 15    ┆ 0.408234 ┆ 0.351929 ┆ 0.8907  │
    │ 16    ┆ 0.400571 ┆ 0.346585 ┆ 0.892   │
    │ 17    ┆ 0.393336 ┆ 0.340408 ┆ 0.8946  │
    └───────┴──────────┴──────────┴─────────┘
    Node status
    shape: (4, 2)
    ┌─────────────────────────────────┬───────────┐
    │ name                            ┆ task      │
    │ ---                             ┆ ---       │
    │ str                             ┆ str       │
    ╞═════════════════════════════════╪═══════════╡
    │ 8dd4359ecae049ac8bbbd06752d08d… ┆ normal    │
    │ 48d3d2e1542f4e4c85693ad824646d… ┆ normal    │
    │ fc65ed7e460d46438c72f843ba2a30… ┆ scheduler │
    │ c0806bf0992745eebf02a60d1e9d89… ┆ normal    │
    └─────────────────────────────────┴───────────┘

More details
------------
Let's walk through the script step by step:

.. code:: python

    # collect_results.py
    import asyncio

    import polars as pl  # pip install polars
    from manta.cluster import AsyncCluster
    from manta.utils import bytes_to_dict

For better performance, we use :code:`AsyncCluster` instead of :code:`Cluster`, so that interactions are asynchronous.
:code:`bytes_to_dict` converts **results** from :code:`bytes` to :code:`dict`. Results are always received as :code:`bytes`,
although they may have been produced from :code:`torch` or even :code:`numpy` objects. See the :doc:`casting section <../api/casting>` for more information.

.. code:: python

    async def load_swarm(cluster):
        swarms = await cluster.list_swarms()
        swarm_id = swarms[-1].swarm_id
        tasks = await cluster.get_tasks(swarm_id)
        tasks = {task["task_id"]: task["alias"] for task in tasks}
        node_ids = await cluster.get_node_ids(available=True)
        nodes = [{"name": node_id, "task": "normal"} for node_id in node_ids]
        data = {"tasks": tasks, "rounds": [], "nodes": nodes}
        return data, swarm_id, tasks

We start by getting the list of swarms, sorted by datetime from oldest to newest,
so the selected :code:`swarm_id` is the newest one.
Then we get all tasks from the swarm definition,
which lets us build a mapping between task IDs and aliases.
Finally, we request all available node IDs.

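The shape of the buffer returned by :code:`load_swarm` can be reproduced without a running cluster. The sketch below is only an illustration: the task IDs, aliases, and node IDs are made up, and plain dictionaries stand in for whatever objects the cluster actually returns.

```python
# Hypothetical sample data standing in for the cluster responses;
# the IDs and aliases are invented for this illustration.
sample_tasks = [
    {"task_id": "t-001", "alias": "worker"},
    {"task_id": "t-002", "alias": "aggregator"},
    {"task_id": "t-003", "alias": "scheduler"},
]
sample_node_ids = ["node-a", "node-b"]

# Same comprehension as in load_swarm: task ID -> alias.
task_aliases = {task["task_id"]: task["alias"] for task in sample_tasks}

# Every node starts as "normal" until one of its activities says otherwise.
nodes = [{"name": node_id, "task": "normal"} for node_id in sample_node_ids]

data = {"tasks": task_aliases, "rounds": [], "nodes": nodes}
print(data["tasks"]["t-003"])  # scheduler
```

The :code:`rounds` list is left empty on purpose: it is filled later, inside the loop.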
.. code:: python

    async def main():
        try:
            cluster = AsyncCluster("localhost", 50051)
            data, swarm_id, tasks = await load_swarm(cluster)
            while True:
                # Some updates
                # ...
                await asyncio.sleep(1)
        except ConnectionRefusedError:
            print("Manager not accessible")


    try:
        asyncio.run(main())
    except KeyboardInterrupt:
        pass

The :code:`main` function runs an infinite loop that updates the values initialized by :code:`load_swarm`, pausing one second between iterations.

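To try the polling pattern on its own, the loop can be run against a stub. In this sketch, :code:`FakeCluster` and :code:`poll` are hypothetical names that are not part of Manta, and the loop is bounded so that the example terminates.

```python
import asyncio


class FakeCluster:
    """Hypothetical stand-in for AsyncCluster; not part of Manta."""

    def __init__(self):
        self.calls = 0

    async def get_node_ids(self, available=True):
        self.calls += 1
        return ["node-a", "node-b"]


async def poll(cluster, iterations, delay=0.01):
    """Query the cluster a fixed number of times, sleeping between passes."""
    snapshots = []
    for _ in range(iterations):  # the script above uses `while True` instead
        snapshots.append(await cluster.get_node_ids(available=True))
        await asyncio.sleep(delay)  # hand control back to the event loop
    return snapshots


fake = FakeCluster()
snapshots = asyncio.run(poll(fake, iterations=3))
print(len(snapshots), fake.calls)  # 3 3
```

Because :code:`asyncio.sleep` is awaited rather than blocking, other coroutines could run during the pause.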
.. code:: python

    results = await cluster.select_results((swarm_id, ["metrics"]))
    metrics = results[swarm_id]("metrics") if results else []
    dfs = [pl.from_dicts(map(bytes_to_dict, values)) for values in metrics if all(values)]
    values = [df.mean().to_dicts()[0] for df in dfs]

We select the tag :code:`"metrics"` of the swarm :code:`swarm_id`. This returns a dictionary whose keys are swarm IDs and whose values are results.
See the :doc:`results section <../guide/interactions/collect_results>` for more information.

After selecting the tag :code:`"metrics"` in the results, we cast :code:`bytes` to :code:`dict` so that they are easier to manipulate.
Each item is a dictionary :code:`{"loss": ..., "val_loss": ..., "val_acc": ...}` produced by the Worker task.
Lastly, we compute the mean of the values by converting them into a :code:`pl.DataFrame`, applying :code:`df.mean()`, and extracting the resulting row.

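The same aggregation can be sketched with the standard library alone, which makes the per-round averaging explicit. Two assumptions are made here: :code:`decode` is a hypothetical stand-in for :code:`bytes_to_dict` that assumes JSON-encoded payloads (the real encoding may differ), and each inner list is taken to hold one payload per worker for a given round. The numbers are invented.

```python
import json
from statistics import fmean


def decode(raw: bytes) -> dict:
    """Hypothetical stand-in for bytes_to_dict, assuming JSON-encoded bytes."""
    return json.loads(raw.decode("utf-8"))


# One inner list per round (made-up numbers).
metrics = [
    [b'{"loss": 2.0, "val_acc": 0.25}', b'{"loss": 1.0, "val_acc": 0.75}'],
    [b'{"loss": 0.5, "val_acc": 0.5}', b'{"loss": 1.5, "val_acc": 1.0}'],
    [b'{"loss": 0.4, "val_acc": 0.9}', b""],  # incomplete round, skipped
]

values = []
for round_payloads in metrics:
    if not all(round_payloads):  # same guard as `if all(values)` in the script
        continue
    rows = [decode(raw) for raw in round_payloads]
    # Column-wise mean over the rows of this round.
    values.append({key: fmean(row[key] for row in rows) for key in rows[0]})

print(values)
# [{'loss': 1.5, 'val_acc': 0.5}, {'loss': 1.0, 'val_acc': 0.75}]
```

The :code:`all(...)` guard drops any round in which at least one payload is still empty, so partial rounds never skew the averages.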
.. code:: python

    data["tags"] = list(values[0]) if values else []
    data["rounds"] = [{"round": i} | value for i, value in enumerate(values)]

The data buffer is updated with the values computed above.

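The :code:`|` operator used to merge the round index into each dictionary requires Python 3.9 or later. The short sketch below, with invented values, shows what the two lines produce and an equivalent spelling for older interpreters.

```python
values = [{"loss": 1.5, "val_acc": 0.5}, {"loss": 1.0, "val_acc": 0.75}]

# `{"round": i} | value` builds a new dict with "round" as the first key.
rounds = [{"round": i} | value for i, value in enumerate(values)]

# Equivalent spelling that also works before Python 3.9.
rounds_compat = [{"round": i, **value} for i, value in enumerate(values)]

# Metric names, taken from the keys of the first round.
tags = list(values[0]) if values else []

print(rounds[1])  # {'round': 1, 'loss': 1.0, 'val_acc': 0.75}
print(tags)       # ['loss', 'val_acc']
```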
.. code:: python

    for node_id in (await cluster.get_node_ids(available=True)):
        results = await cluster.select_tasks(node_id)
        node = next(filter(lambda node: node["name"] == node_id, data["nodes"]))
        node["task"] = "normal"
        if results:
            result = results[0]
            node["task"] = (
                "normal" if int(result["status"]) == 3 else
                tasks.get(result["task_id"])
            )

To get the node status, we request all available node IDs.
:code:`cluster.select_tasks(node_id)` returns the :doc:`node activities <../guide/interactions/collect_results>`.
We look up the current node entry (e.g. :code:`{"name": node_id, "task": "normal"}`) and keep a reference to it in the variable :code:`node`.
We then update its status from the node activities collected just before.
:code:`results[0]` is the most recently deployed task on the node. From its status and its :code:`task_id`, we can determine which task the node is currently executing.

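The decision made for each node can be isolated in a small pure function, which is easier to test than the loop itself. This is a sketch under assumptions: :code:`current_task` is a hypothetical helper, the activity dictionaries are invented, and the only status code given a meaning is :code:`3`, which maps to :code:`"normal"` exactly as in the script.

```python
def current_task(activities, task_aliases, idle_status=3):
    """Return the alias shown for a node, or "normal" when nothing is running.

    `activities` is assumed to be ordered newest first, as in the script.
    Status 3 maps to "normal", mirroring the script; other status codes
    are not interpreted here.
    """
    if not activities:
        return "normal"
    latest = activities[0]
    if int(latest["status"]) == idle_status:
        return "normal"
    # Unknown task IDs yield None, like `tasks.get(...)` in the script.
    return task_aliases.get(latest["task_id"])


# Made-up aliases and activities for illustration.
aliases = {"t-001": "worker", "t-003": "scheduler"}
busy = [{"task_id": "t-003", "status": "1"}, {"task_id": "t-001", "status": "3"}]
done = [{"task_id": "t-001", "status": 3}]

print(current_task(busy, aliases))  # scheduler
print(current_task(done, aliases))  # normal
print(current_task([], aliases))    # normal
```

Only the first activity is inspected, so older entries in the list never change the displayed task.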
.. code:: python

    if data["rounds"]:
        print("Rounds dataframe")
        print(pl.from_dicts(data["rounds"]))
    if data["nodes"]:
        print("Node status")
        print(pl.from_dicts(data["nodes"]))

These lines print :code:`data` as tables by converting the rounds and the nodes into dataframes.