Collect your results
====================

After deploying your swarm, you can monitor its results directly from your computer.
To simplify computations, we use Polars, but you could use pandas or any other library you prefer.

Implementation
--------------

.. code:: python

    # collect_results.py
    import asyncio

    import polars as pl  # pip install polars

    from manta.cluster import AsyncCluster
    from manta.utils import bytes_to_dict


    async def load_swarm(cluster):
        # Select the most recent swarm
        swarms = await cluster.list_swarms()
        swarm_id = swarms[-1].swarm_id
        # Map task IDs to their aliases
        tasks = await cluster.get_tasks(swarm_id)
        tasks = {task["task_id"]: task["alias"] for task in tasks}
        # Every available node starts with the status "normal"
        node_ids = await cluster.get_node_ids(available=True)
        nodes = [{"name": node_id, "task": "normal"} for node_id in node_ids]
        data = {"tasks": tasks, "rounds": [], "nodes": nodes}
        return data, swarm_id, tasks


    async def main():
        try:
            cluster = AsyncCluster("localhost", 50051)
            data, swarm_id, tasks = await load_swarm(cluster)
            while True:
                # Average the "metrics" results of each round
                results = await cluster.select_results((swarm_id, ["metrics"]))
                metrics = results[swarm_id]("metrics") if results else []
                dfs = [pl.from_dicts(map(bytes_to_dict, values)) for values in metrics if all(values)]
                values = [df.mean().to_dicts()[0] for df in dfs]
                data["tags"] = list(values[0]) if values else []
                data["rounds"] = [{"round": i} | value for i, value in enumerate(values)]

                # Update the task currently running on each node
                for node_id in await cluster.get_node_ids(available=True):
                    results = await cluster.select_tasks(node_id)
                    # Find the node entry, adding it if the node became available after load_swarm
                    node = next((n for n in data["nodes"] if n["name"] == node_id), None)
                    if node is None:
                        node = {"name": node_id, "task": "normal"}
                        data["nodes"].append(node)
                    node["task"] = "normal"
                    if results:
                        result = results[0]
                        node["task"] = (
                            "normal" if int(result["status"]) == 3 else tasks.get(result["task_id"])
                        )

                if data["rounds"]:
                    print("Rounds dataframe")
                    print(pl.from_dicts(data["rounds"]))
                if data["nodes"]:
                    print("Node status")
                    print(pl.from_dicts(data["nodes"]))
                await asyncio.sleep(1)
        except ConnectionRefusedError:
            print("Manager not accessible")


    try:
        asyncio.run(main())
    except KeyboardInterrupt:
        pass

Then, you can run
your script:

.. code:: bash

    python collect_results.py

Output:

.. code::

    Rounds dataframe
    shape: (18, 4)
    ┌───────┬──────────┬──────────┬─────────┐
    │ round ┆ loss     ┆ val_loss ┆ val_acc │
    │ ---   ┆ ---      ┆ ---      ┆ ---     │
    │ i64   ┆ f64      ┆ f64      ┆ f64     │
    ╞═══════╪══════════╪══════════╪═════════╡
    │ 0     ┆ 2.191565 ┆ 2.195249 ┆ 0.389   │
    │ 1     ┆ 1.797617 ┆ 1.797839 ┆ 0.529   │
    │ 2     ┆ 1.066547 ┆ 1.029428 ┆ 0.7437  │
    │ 3     ┆ 0.766294 ┆ 0.679413 ┆ 0.7988  │
    │ 4     ┆ 0.666064 ┆ 0.56257  ┆ 0.8276  │
    │ …     ┆ …        ┆ …        ┆ …       │
    │ 13    ┆ 0.42975  ┆ 0.365476 ┆ 0.8863  │
    │ 14    ┆ 0.418534 ┆ 0.35849  ┆ 0.8883  │
    │ 15    ┆ 0.408234 ┆ 0.351929 ┆ 0.8907  │
    │ 16    ┆ 0.400571 ┆ 0.346585 ┆ 0.892   │
    │ 17    ┆ 0.393336 ┆ 0.340408 ┆ 0.8946  │
    └───────┴──────────┴──────────┴─────────┘
    Node status
    shape: (4, 2)
    ┌─────────────────────────────────┬───────────┐
    │ name                            ┆ task      │
    │ ---                             ┆ ---       │
    │ str                             ┆ str       │
    ╞═════════════════════════════════╪═══════════╡
    │ 8dd4359ecae049ac8bbbd06752d08d… ┆ normal    │
    │ 48d3d2e1542f4e4c85693ad824646d… ┆ normal    │
    │ fc65ed7e460d46438c72f843ba2a30… ┆ scheduler │
    │ c0806bf0992745eebf02a60d1e9d89… ┆ normal    │
    └─────────────────────────────────┴───────────┘

More details
------------

Let's go through the script step by step:

.. code:: python

    # collect_results.py
    import asyncio

    import polars as pl  # pip install polars

    from manta.cluster import AsyncCluster
    from manta.utils import bytes_to_dict

For better performance, we use :code:`AsyncCluster` instead of :code:`Cluster` to interact with the cluster asynchronously.

:code:`bytes_to_dict` converts **results** from :code:`bytes` to :code:`dict`.
Results are always received as :code:`bytes`, but the encoded values are not necessarily dictionaries: they could also come from :code:`torch` or :code:`numpy` objects.
See the :doc:`casting section <../api/casting>` for more information.
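
The script awaits each call in turn, so it does not yet run requests concurrently. As an illustration of where asynchronous calls can help, the sketch below compares awaiting four requests one by one with running them together through :code:`asyncio.gather`. :code:`FakeCluster` is a hypothetical stand-in, not part of Manta: its :code:`select_tasks` only sleeps for 0.1 s to simulate network latency. Whether :code:`AsyncCluster` benefits in the same way depends on how the manager handles concurrent requests.

.. code:: python

    import asyncio
    import time


    class FakeCluster:
        """Hypothetical stand-in for a cluster client (not part of Manta)."""

        async def select_tasks(self, node_id):
            # Simulate a 0.1 s network round trip
            await asyncio.sleep(0.1)
            return [{"task_id": f"task-{node_id}", "status": 3}]


    async def poll_sequentially(cluster, node_ids):
        # One request at a time: total time grows with the number of nodes
        return [await cluster.select_tasks(node_id) for node_id in node_ids]


    async def poll_concurrently(cluster, node_ids):
        # All requests in flight at once: total time is roughly one round trip
        return await asyncio.gather(*(cluster.select_tasks(node_id) for node_id in node_ids))


    async def compare():
        cluster = FakeCluster()
        node_ids = ["a", "b", "c", "d"]

        start = time.perf_counter()
        sequential = await poll_sequentially(cluster, node_ids)
        sequential_time = time.perf_counter() - start

        start = time.perf_counter()
        concurrent = await poll_concurrently(cluster, node_ids)
        concurrent_time = time.perf_counter() - start

        return sequential, list(concurrent), sequential_time, concurrent_time


    if __name__ == "__main__":
        seq, conc, t_seq, t_conc = asyncio.run(compare())
        print(f"sequential: {t_seq:.2f}s, concurrent: {t_conc:.2f}s")

With four simulated nodes, the sequential version takes roughly four times as long as the concurrent one, while both return the same results in the same order.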

.. code:: python

    async def load_swarm(cluster):
        swarms = await cluster.list_swarms()
        swarm_id = swarms[-1].swarm_id
        tasks = await cluster.get_tasks(swarm_id)
        tasks = {task["task_id"]: task["alias"] for task in tasks}
        node_ids = await cluster.get_node_ids(available=True)
        nodes = [{"name": node_id, "task": "normal"} for node_id in node_ids]
        data = {"tasks": tasks, "rounds": [], "nodes": nodes}
        return data, swarm_id, tasks

We start by getting the list of swarms, sorted by date from oldest to newest, and select the newest one as :code:`swarm_id`.
Then we get all tasks from the swarm definition to build a mapping between task IDs and aliases.
Finally, we request the IDs of all available nodes, each of which starts with the status :code:`"normal"`.

.. code:: python

    async def main():
        try:
            cluster = AsyncCluster("localhost", 50051)
            data, swarm_id, tasks = await load_swarm(cluster)
            while True:
                # Some updates
                # ...
                await asyncio.sleep(1)
        except ConnectionRefusedError:
            print("Manager not accessible")


    try:
        asyncio.run(main())
    except KeyboardInterrupt:
        pass

The :code:`main` function is an infinite loop that updates the values returned by :code:`load_swarm` every second.
It prints :code:`Manager not accessible` if the manager cannot be reached, and stops silently when you press Ctrl+C (:code:`KeyboardInterrupt`).

.. code:: python

    results = await cluster.select_results((swarm_id, ["metrics"]))
    metrics = results[swarm_id]("metrics") if results else []
    dfs = [pl.from_dicts(map(bytes_to_dict, values)) for values in metrics if all(values)]
    values = [df.mean().to_dicts()[0] for df in dfs]

We select the tag :code:`"metrics"` of the swarm :code:`swarm_id`.
This returns a dictionary whose keys are swarm IDs and whose values are results.
See the :doc:`results section <../guide/interactions/collect_results>` for more information.

After selecting the tag :code:`"metrics"` from the results, we cast each :code:`bytes` value to a :code:`dict` to make it easier to manipulate.
Rounds where some value is still missing (:code:`all(values)` is false) are skipped.
Each remaining item is a dictionary :code:`{"loss": ..., "val_loss": ..., "val_acc": ...}` produced by the Worker task.
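
To make the decoding and filtering step concrete, here is a minimal plain-Python sketch of what the comprehension above does before any averaging. It assumes that :code:`metrics` is a list of rounds, each holding one :code:`bytes` payload per node, and that payloads are JSON-encoded. :code:`decode` and :code:`complete_rounds` are hypothetical helpers: :code:`decode` only stands in for :code:`bytes_to_dict`, whose actual encoding is not described here.

.. code:: python

    import json


    def decode(payload: bytes) -> dict:
        # Hypothetical stand-in for manta.utils.bytes_to_dict; assumes JSON-encoded metrics
        return json.loads(payload)


    def complete_rounds(metrics):
        """Decode every payload of each round, skipping rounds with a missing payload."""
        # all(values) is False if any payload is empty (b"") or None, so that round is skipped
        return [[decode(value) for value in values] for values in metrics if all(values)]


    metrics = [
        [b'{"loss": 2.0, "val_acc": 0.25}', b'{"loss": 3.0, "val_acc": 0.75}'],  # round 0
        [b'{"loss": 1.5, "val_acc": 0.5}', b""],  # round 1: second payload still missing
    ]
    print(complete_rounds(metrics))
    # [[{'loss': 2.0, 'val_acc': 0.25}, {'loss': 3.0, 'val_acc': 0.75}]]

Only round 0 survives; round 1 is picked up on a later iteration of the loop, once every payload has arrived.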

Lastly, we compute the mean of each metric by converting the values into a :code:`pl.DataFrame`, applying :code:`df.mean()`, and extracting the resulting single row as a dictionary.

.. code:: python

    data["tags"] = list(values[0]) if values else []
    data["rounds"] = [{"round": i} | value for i, value in enumerate(values)]

The data buffer is updated with the values computed above: :code:`data["tags"]` holds the metric names, and :code:`data["rounds"]` holds one dictionary per round, prefixed with its round index.
The :code:`|` operator merges two dictionaries and requires Python 3.9 or later.

.. code:: python

    for node_id in await cluster.get_node_ids(available=True):
        results = await cluster.select_tasks(node_id)
        # Find the node entry, adding it if the node became available after load_swarm
        node = next((n for n in data["nodes"] if n["name"] == node_id), None)
        if node is None:
            node = {"name": node_id, "task": "normal"}
            data["nodes"].append(node)
        node["task"] = "normal"
        if results:
            result = results[0]
            node["task"] = (
                "normal" if int(result["status"]) == 3 else tasks.get(result["task_id"])
            )

To get the status of each node, we request all available node IDs again.
:code:`cluster.select_tasks(node_id)` returns the :doc:`node activities <../guide/interactions/collect_results>`.
We look up the entry of the current node (e.g. :code:`{"name": node_id, "task": "normal"}`), creating one if the node became available after :code:`load_swarm` ran, and keep a reference to it in the variable :code:`node`, so updating :code:`node` updates :code:`data["nodes"]` in place.
We then update its status from the collected activities.
:code:`results[0]` is the most recently deployed task on the node.
If its status is :code:`3`, the script reports the node as :code:`"normal"`; otherwise, it maps :code:`task_id` to its alias to show which task the node is currently executing.

.. code:: python

    if data["rounds"]:
        print("Rounds dataframe")
        print(pl.from_dicts(data["rounds"]))
    if data["nodes"]:
        print("Node status")
        print(pl.from_dicts(data["nodes"]))

These lines print :code:`data` as tables by converting it into dataframes.
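
The status rules above can be isolated in a small pure function, which makes them easy to test without a running cluster. This is a sketch under stated assumptions: :code:`node_task` is a hypothetical helper, the task IDs :code:`"t1"` and :code:`"t2"` and the alias :code:`"worker"` are made up, and the value :code:`3` is simply the one the script compares against; its exact meaning in Manta is not covered here.

.. code:: python

    def node_task(activities, tasks):
        """Return the label displayed for a node, following the script's rules."""
        if not activities:
            return "normal"  # no task has been deployed on this node
        latest = activities[0]  # most recently deployed task
        if int(latest["status"]) == 3:  # the script reports status 3 as "normal"
            return "normal"
        # Map the task ID to its alias; None if the ID is not part of the swarm
        return tasks.get(latest["task_id"])


    tasks = {"t1": "worker", "t2": "scheduler"}  # hypothetical task_id -> alias mapping
    print(node_task([], tasks))                                  # normal
    print(node_task([{"task_id": "t2", "status": "1"}], tasks))  # scheduler
    print(node_task([{"task_id": "t1", "status": "3"}], tasks))  # normal

Keeping the rules in one function also makes it clear that an unknown :code:`task_id` yields :code:`None`, which would appear as a null in the node status table.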