Collect your results¶
After deploying your swarm, you can monitor your results directly from your computer. To make the computations easier, we will use Polars, but you could use Pandas (https://pandas.pydata.org/) or any library you prefer.
Implementation¶
# collect_results.py
import asyncio
import polars as pl  # pip install polars
from manta.cluster import AsyncCluster
from manta.utils import bytes_to_dict


async def load_swarm(cluster):
    swarms = await cluster.list_swarms()
    swarm_id = swarms[-1].swarm_id

    tasks = await cluster.get_tasks(swarm_id)
    tasks = {task["task_id"]: task["alias"] for task in tasks}

    node_ids = await cluster.get_node_ids(available=True)
    nodes = [{"name": node_id, "task": "normal"} for node_id in node_ids]

    data = {"tasks": tasks, "rounds": [], "nodes": nodes}
    return data, swarm_id, tasks


async def main():
    try:
        cluster = AsyncCluster("localhost", 50051)
        data, swarm_id, tasks = await load_swarm(cluster)

        while True:
            results = await cluster.select_results((swarm_id, ["metrics"]))
            metrics = results[swarm_id]("metrics") if results else []

            dfs = [pl.from_dicts(map(bytes_to_dict, values)) for values in metrics if all(values)]
            values = [df.mean().to_dicts()[0] for df in dfs]

            data["tags"] = list(values[0]) if values else []
            data["rounds"] = [{"round": i} | value for i, value in enumerate(values)]

            for node_id in (await cluster.get_node_ids(available=True)):
                results = await cluster.select_tasks(node_id)
                node = next(filter(lambda node: node["name"] == node_id, data["nodes"]))
                node["task"] = "normal"
                if results:
                    result = results[0]
                    node["task"] = (
                        "normal" if int(result["status"]) == 3 else
                        tasks.get(result["task_id"])
                    )

            if data["rounds"]:
                print("Rounds dataframe")
                print(pl.from_dicts(data["rounds"]))

            if data["nodes"]:
                print("Node status")
                print(pl.from_dicts(data["nodes"]))

            await asyncio.sleep(1)

    except ConnectionRefusedError:
        print("Manager not accessible")


try:
    asyncio.run(main())
except KeyboardInterrupt:
    pass
Then, you can start your script:
python collect_results.py
Output:
Rounds dataframe
shape: (18, 4)
┌───────┬──────────┬──────────┬─────────┐
│ round ┆ loss ┆ val_loss ┆ val_acc │
│ --- ┆ --- ┆ --- ┆ --- │
│ i64 ┆ f64 ┆ f64 ┆ f64 │
╞═══════╪══════════╪══════════╪═════════╡
│ 0 ┆ 2.191565 ┆ 2.195249 ┆ 0.389 │
│ 1 ┆ 1.797617 ┆ 1.797839 ┆ 0.529 │
│ 2 ┆ 1.066547 ┆ 1.029428 ┆ 0.7437 │
│ 3 ┆ 0.766294 ┆ 0.679413 ┆ 0.7988 │
│ 4 ┆ 0.666064 ┆ 0.56257 ┆ 0.8276 │
│ … ┆ … ┆ … ┆ … │
│ 13 ┆ 0.42975 ┆ 0.365476 ┆ 0.8863 │
│ 14 ┆ 0.418534 ┆ 0.35849 ┆ 0.8883 │
│ 15 ┆ 0.408234 ┆ 0.351929 ┆ 0.8907 │
│ 16 ┆ 0.400571 ┆ 0.346585 ┆ 0.892 │
│ 17 ┆ 0.393336 ┆ 0.340408 ┆ 0.8946 │
└───────┴──────────┴──────────┴─────────┘
Node status
shape: (4, 2)
┌─────────────────────────────────┬───────────┐
│ name ┆ task │
│ --- ┆ --- │
│ str ┆ str │
╞═════════════════════════════════╪═══════════╡
│ 8dd4359ecae049ac8bbbd06752d08d… ┆ normal │
│ 48d3d2e1542f4e4c85693ad824646d… ┆ normal │
│ fc65ed7e460d46438c72f843ba2a30… ┆ scheduler │
│ c0806bf0992745eebf02a60d1e9d89… ┆ normal │
└─────────────────────────────────┴───────────┘
More details¶
Let’s go through the script step by step:
# collect_results.py
import asyncio
import polars as pl # pip install polars
from manta.cluster import AsyncCluster
from manta.utils import bytes_to_dict
For better performance, instead of using Cluster, we use AsyncCluster for asynchronous interactions.
bytes_to_dict helps us convert results from bytes to dict. Results are always received as bytes; however, they could originate from torch or even numpy objects. See the casting section for more information.
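As a rough illustration of what such a helper does, here is a minimal sketch that uses JSON purely as a stand-in serialization; the actual wire format used by manta.utils.bytes_to_dict may differ:

import json

def bytes_to_dict_sketch(raw: bytes) -> dict:
    # Stand-in decoder: manta may use a different serialization internally.
    return json.loads(raw.decode("utf-8"))

print(bytes_to_dict_sketch(b'{"loss": 0.42, "val_loss": 0.36, "val_acc": 0.88}'))
# -> {'loss': 0.42, 'val_loss': 0.36, 'val_acc': 0.88}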
async def load_swarm(cluster):
    swarms = await cluster.list_swarms()
    swarm_id = swarms[-1].swarm_id

    tasks = await cluster.get_tasks(swarm_id)
    tasks = {task["task_id"]: task["alias"] for task in tasks}

    node_ids = await cluster.get_node_ids(available=True)
    nodes = [{"name": node_id, "task": "normal"} for node_id in node_ids]

    data = {"tasks": tasks, "rounds": [], "nodes": nodes}
    return data, swarm_id, tasks
We start by getting the list of swarms, sorted by datetime (oldest to newest).
The selected swarm_id is therefore the newest one.
Then we get all tasks from the Swarm definition, which lets us build a mapping between task IDs and aliases.
Finally, we request all available node IDs.
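To make the rest of the walkthrough concrete, here is the hypothetical shape of what load_swarm returns (all identifiers below are made up):

# Hypothetical return values of load_swarm (identifiers are illustrative):
tasks = {"8f2c…": "scheduler", "b71a…": "train"}      # task_id -> alias
swarm_id = "3e9d…"                                     # the newest swarm
data = {
    "tasks": tasks,
    "rounds": [],                                      # filled in the main loop
    "nodes": [
        {"name": "8dd4359e…", "task": "normal"},       # one entry per available node
        {"name": "fc65ed7e…", "task": "normal"},
    ],
}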
async def main():
    try:
        cluster = AsyncCluster("localhost", 50051)
        data, swarm_id, tasks = await load_swarm(cluster)
        while True:
            # Some updates
            # ...
            await asyncio.sleep(1)
    except ConnectionRefusedError:
        print("Manager not accessible")


try:
    asyncio.run(main())
except KeyboardInterrupt:
    pass
The main function is an infinite loop that updates the values returned by load_swarm.
results = await cluster.select_results((swarm_id, ["metrics"]))
metrics = results[swarm_id]("metrics") if results else []
dfs = [pl.from_dicts(map(bytes_to_dict, values)) for values in metrics if all(values)]
values = [df.mean().to_dicts()[0] for df in dfs]
We select the tag "metrics" of the swarm swarm_id. This gives us a dictionary where keys are swarm IDs and values are results.
See the results section for more information.
Then, after selecting the tag "metrics" in the results, we cast the bytes to dict so they are easier to manipulate.
Each of these values is a dictionary {"loss": ..., "val_loss": ..., "val_acc": ...} coming from the results of the Worker task.
Lastly, we compute the mean of the values by converting them into a pl.DataFrame, applying df.mean() and extracting the mean value.
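To see the aggregation in isolation, here is a small self-contained example with made-up per-node metrics for a single round:

import polars as pl

# Two Workers reported metrics for the same round (values are made up):
round_metrics = [
    {"loss": 2.10, "val_loss": 2.20, "val_acc": 0.38},
    {"loss": 2.28, "val_loss": 2.19, "val_acc": 0.40},
]

df = pl.from_dicts(round_metrics)     # one row per node
mean_row = df.mean().to_dicts()[0]    # average each column, keep a single dict
print(mean_row)  # e.g. {'loss': 2.19, 'val_loss': 2.195, 'val_acc': 0.39} (up to float rounding)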
data["tags"] = list(values[0]) if values else []
data["rounds"] = [{"round": i} | value for i, value in enumerate(values)]
The data buffer is updated with the values computed above: data["tags"] holds the metric names (the keys of the first round) and data["rounds"] holds one aggregated entry per round.
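For clarity, here is how the dict union operator used above assembles the rounds buffer (values are made up; the | operator requires Python 3.9+):

# Building the rounds buffer from per-round means (made-up values):
values = [
    {"loss": 2.19, "val_loss": 2.20, "val_acc": 0.39},
    {"loss": 1.80, "val_loss": 1.80, "val_acc": 0.53},
]
tags = list(values[0])                                             # -> ['loss', 'val_loss', 'val_acc']
rounds = [{"round": i} | value for i, value in enumerate(values)]
# -> [{'round': 0, 'loss': 2.19, ...}, {'round': 1, 'loss': 1.80, ...}]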
for node_id in (await cluster.get_node_ids(available=True)):
    results = await cluster.select_tasks(node_id)
    node = next(filter(lambda node: node["name"] == node_id, data["nodes"]))
    node["task"] = "normal"
    if results:
        result = results[0]
        node["task"] = (
            "normal" if int(result["status"]) == 3 else
            tasks.get(result["task_id"])
        )
To get the node status, we request all available node IDs.
cluster.select_tasks(node_id) allows us to get the node activities.
We select the current node entry (e.g. {"name": node_id, "task": "normal"}) and store its reference in the variable node.
We then update its status based on the node activities collected previously.
results[0] is the most recently deployed task on the node. Given its status and its task_id, we can determine which task is currently executing on the node.
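As a small illustration of that mapping, here is the same conditional applied to a made-up task record; only the "status 3 means back to normal" convention is taken from the script above, everything else is hypothetical:

# Hypothetical task record returned for a node (fields mirror the script above):
tasks = {"t-123": "scheduler"}                 # task_id -> alias mapping from load_swarm
result = {"task_id": "t-123", "status": 2}     # a status of 3 would mean "back to normal"

current = "normal" if int(result["status"]) == 3 else tasks.get(result["task_id"])
print(current)  # -> "scheduler": the node is currently running the scheduler task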
if data["rounds"]:
    print("Rounds dataframe")
    print(pl.from_dicts(data["rounds"]))

if data["nodes"]:
    print("Node status")
    print(pl.from_dicts(data["nodes"]))
These lines print the data as tables by converting the buffers into DataFrames.
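For reference, converting a buffer with pl.from_dicts is all it takes to obtain the tables shown earlier; for example, with a shortened node buffer:

import polars as pl

# A shortened node buffer, printed as a table (names truncated for readability):
nodes = [
    {"name": "8dd4359ecae049ac…", "task": "normal"},
    {"name": "fc65ed7e460d4643…", "task": "scheduler"},
]
print(pl.from_dicts(nodes))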