WorkflowAnalyzer()

The WorkflowAnalyzer class analyzes workflows built with the Workflow class. It reports the critical path, identifies atoms that can run in parallel, and renders the dependency graph, so you can see where a workflow can be made more efficient.


Parameters

  • workflow (Workflow): The workflow to analyze. Must be an instance of the Workflow class.
  • size (float, optional): The width of the component in a row. Defaults to 1.0 (full row). See the Layout Guide for details.

Methods

analyzer.get_critical_path()

Identifies the critical path in the workflow. The critical path is the longest chain of dependent atoms by execution time; because each atom in the chain must wait for the one before it, this chain sets the minimum total execution time.

Returns:

  • critical_path (list): A list of atom names in the critical path, highlighting the workflow bottleneck.
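
To make the idea concrete, here is a minimal, library-independent sketch of how a critical path can be found in a dependency graph. It is not WorkflowAnalyzer's actual implementation; the function name critical_path, the plain-dict inputs, and the example atom names and timings are all assumptions chosen for illustration.

```python
def critical_path(dependencies, durations):
    """Return the longest-duration chain of atoms in an acyclic dependency graph.

    dependencies: maps every atom name to the list of atoms it depends on.
    durations: maps every atom name to its execution time in seconds.
    (Both structures are hypothetical stand-ins, not preswald objects.)
    """
    # atom -> (total time of the longest chain ending at this atom, predecessor)
    best = {}

    def longest_to(name):
        if name not in best:
            total, prev = 0.0, None
            for dep in dependencies.get(name, []):
                dep_total = longest_to(dep)
                if prev is None or dep_total > total:
                    total, prev = dep_total, dep
            best[name] = (total + durations[name], prev)
        return best[name][0]

    if not dependencies:
        return []

    # The critical path ends at the atom with the largest accumulated time.
    end = max(dependencies, key=longest_to)
    path = []
    while end is not None:
        path.append(end)
        end = best[end][1]
    return path[::-1]


# Hypothetical diamond-shaped workflow with made-up timings.
example_dependencies = {
    "load_data": [],
    "clean_data": ["load_data"],
    "profile_data": ["load_data"],
    "analyze_data": ["clean_data", "profile_data"],
}
example_durations = {
    "load_data": 2.0,
    "clean_data": 0.5,
    "profile_data": 1.5,
    "analyze_data": 1.0,
}

print(" -> ".join(critical_path(example_dependencies, example_durations)))
```

In this example the slower branch through profile_data ends up on the path, so speeding up clean_data would not shorten the overall run.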

analyzer.get_parallel_groups()

Identifies groups of atoms that can be computed in parallel, helping you optimize execution.

Returns:

  • parallel_groups (list of lists): Each sublist contains atom names that can be computed simultaneously.
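
One common way to derive such groups is to peel the graph apart level by level: every atom whose dependencies are already satisfied goes into the next group. The sketch below illustrates that idea in plain Python. It is an assumption about the general technique, not a description of how WorkflowAnalyzer computes its groups, and the function name and input format are hypothetical.

```python
def parallel_groups(dependencies):
    """Group atoms into levels; atoms within one level do not depend on each other.

    dependencies: maps every atom name to the list of atoms it depends on
    (a hypothetical plain-dict stand-in for a workflow).
    """
    remaining = {name: set(deps) for name, deps in dependencies.items()}
    groups = []
    while remaining:
        # Atoms with no unmet dependencies can all run at the same time.
        ready = sorted(name for name, deps in remaining.items() if not deps)
        if not ready:
            raise ValueError("dependency cycle or unknown dependency detected")
        groups.append(ready)
        for name in ready:
            del remaining[name]
        # Mark the finished atoms as satisfied for everything still waiting.
        for deps in remaining.values():
            deps.difference_update(ready)
    return groups


# Hypothetical diamond-shaped workflow.
example_dependencies = {
    "load_data": [],
    "clean_data": ["load_data"],
    "profile_data": ["load_data"],
    "analyze_data": ["clean_data", "profile_data"],
}

groups = parallel_groups(example_dependencies)
for i, group in enumerate(groups, 1):
    print(f"Group {i}: {', '.join(group)}")
```

Here clean_data and profile_data land in the same group because both need only load_data, which has already finished by then.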

analyzer.visualize(highlight_path=None, title="Workflow Dependency Graph")

Generates a DAG (Directed Acyclic Graph) visualization of the workflow.

Parameters:

  • highlight_path (list, optional): A list of atom names to highlight in the visualization (e.g., the critical path). If omitted, the DAG is displayed with no path highlighted.
  • title (str, optional): The title of the visualization. Defaults to "Workflow Dependency Graph".

Returns:

  • fig (Plotly figure): An interactive visualization of the workflow’s dependencies.
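
A highlight path is just an ordered list of atom names, so the edges it covers are the consecutive pairs in that list. The sketch below shows one way to sanity-check a path against a dependency map before passing it to visualize. The helper highlighted_edges and the dict format are hypothetical and not part of the preswald API; how visualize itself interprets the list is not specified here.

```python
def highlighted_edges(path, dependencies):
    """Return the (upstream, downstream) edges along `path`.

    Raises ValueError if two consecutive names are not directly connected.
    dependencies: maps every atom name to the list of atoms it depends on
    (a hypothetical plain-dict stand-in for a workflow).
    """
    edges = list(zip(path, path[1:]))
    for upstream, downstream in edges:
        if upstream not in dependencies.get(downstream, []):
            raise ValueError(f"{downstream!r} does not depend on {upstream!r}")
    return edges


# Dependency map mirroring a simple three-step linear workflow.
example_dependencies = {
    "load_data": [],
    "clean_data": ["load_data"],
    "analyze_data": ["clean_data"],
}

edges = highlighted_edges(
    ["load_data", "clean_data", "analyze_data"], example_dependencies
)
print(edges)
```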

Usage Example

from preswald import Workflow, WorkflowAnalyzer
import pandas as pd

# Create a sample workflow
workflow = Workflow()

@workflow.atom()
def load_data():
    return pd.read_csv("data.csv")

@workflow.atom(dependencies=['load_data'])
def clean_data(load_data):
    return load_data.dropna()

@workflow.atom(dependencies=['clean_data'])
def analyze_data(clean_data):
    return clean_data.describe()

# Initialize the WorkflowAnalyzer
analyzer = WorkflowAnalyzer(workflow)

# Execute the workflow
workflow.execute()

# Visualize the workflow
fig = analyzer.visualize(title="Workflow Dependency Graph")
fig.show()  # Displays the figure (opens a browser tab when run as a script)

# Identify and display the critical path
critical_path = analyzer.get_critical_path()
print("Critical path:", ' -> '.join(critical_path))

# Visualize the critical path
fig_critical = analyzer.visualize(
    highlight_path=critical_path,
    title="Workflow Critical Path"
)
fig_critical.show()

# Identify and display parallel execution groups
parallel_groups = analyzer.get_parallel_groups()
print("\nParallel execution groups:")
for i, group in enumerate(parallel_groups, 1):
    print(f"Group {i}: {', '.join(group)}")

Key Features

  1. Critical Path Analysis:

    • Identify workflow bottlenecks.
    • Optimize the sequence of computations.
  2. Parallel Execution Groups:

    • Highlight opportunities for parallel processing.
    • Improve efficiency in workflows with independent atoms.
  3. Interactive DAG Visualization:

    • Explore dependencies visually.
    • Highlight specific paths for focused analysis.

Why Use WorkflowAnalyzer?

  • Optimize Performance: Identify critical paths and parallelizable tasks.
  • Visualize Complex Workflows: Clearly understand dependencies and bottlenecks.
  • Fine-Tune Workflows: Gain actionable insights for optimization.

Elevate your workflow analysis and optimization with WorkflowAnalyzer! 🔬