Source code for fluxion_ai.workflows.workflow_progress_tracker

"""
fluxion_ai.workflows.workflow_progress_tracker
~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
This module provides a class to track the progress of workflow execution. This class is used to monitor the status of each node in the workflow and calculate the overall progress.

Classes:
    - WorkflowProgressTracker: Class to track the progress of workflow execution.
"""

import time
from typing import Dict, List, Any

[docs] class WorkflowProgressTracker: """ Tracks the progress of workflow execution. """ def __init__(self, node_names: List[str]): """ Initialize the progress tracker with nodes. Args: node_names (List[str]): List of all node names in the workflow. """ self.node_status = {node: "Pending" for node in node_names} self.node_times = {node: None for node in node_names} self.start_time = None
[docs] def start_workflow(self): """ Mark the start of the workflow execution. """ self.start_time = time.time()
[docs] def mark_node_running(self, node_name: str): """ Mark a node as running. Args: node_name (str): Name of the node. """ if node_name not in self.node_status: raise ValueError(f"Node '{node_name}' is not part of the workflow.") self.node_status[node_name] = "Running" self.node_times[node_name] = time.time()
[docs] def mark_node_completed(self, node_name: str): """ Mark a node as completed and record its execution time. Args: node_name (str): Name of the node. """ if node_name not in self.node_status: raise ValueError(f"Node '{node_name}' is not part of the workflow.") if self.node_status[node_name] != "Running": raise ValueError(f"Node '{node_name}' was not running.") self.node_status[node_name] = "Completed" self.node_times[node_name] = time.time() - self.node_times[node_name]
[docs] def get_progress(self) -> Dict[str, Any]: """ Get the current progress of the workflow. Returns: Dict[str, Any]: Progress details including node status and overall percentage. """ completed_nodes = sum(1 for status in self.node_status.values() if status == "Completed") total_nodes = len(self.node_status) progress_percentage = (completed_nodes / total_nodes) * 100 if total_nodes > 0 else 0 return { "node_status": self.node_status, "node_times": self.node_times, "progress_percentage": progress_percentage, "elapsed_time": time.time() - self.start_time if self.start_time else None }