Workflow()

The Workflow class provides a framework for building stateful workflows with selective recomputation and intelligent caching. This is particularly useful for tasks like data loading, cleaning, and analysis, where intermediate results can be cached to avoid unnecessary re-execution.


Key Features

  1. Stateful Workflow Object: Maintains the state of computations across executions, so earlier results can be reused instead of recomputed.
  2. Caching: Intelligent caching avoids redundant computations for tasks that haven’t changed.
  3. Retry Policy: Optional retry policies for handling failures, with a default policy provided.
  4. Selective Recomputation: Rerun only affected parts of the workflow when changes occur.

Workflow Structure

Atoms

Atoms are individual, reusable units of computation. These are functions decorated with @workflow.atom(). Atoms can:

  • Have dependencies (other atoms they rely on).
  • Be cached to prevent redundant execution.

Workflow Methods

workflow.atom()

A decorator used to define an atom (a node in the workflow).

Parameters:

  • dependencies (list, optional): Names of other atoms this atom depends on.
  • RetryPolicy (optional): Specify a retry policy for this atom. If not provided, the default retry policy is used.
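
To make the idea of a retry policy concrete, the sketch below retries a failing function with exponential backoff. It is a generic illustration only: the `retry` helper and its parameters (`max_attempts`, `delay`, `backoff`) are hypothetical names chosen for this example and are not the library's RetryPolicy API, whose actual options may differ.

```python
import time
from functools import wraps


def retry(max_attempts=3, delay=0.0, backoff=2.0):
    """Hypothetical helper: re-run a function until it succeeds or attempts run out."""
    def decorator(func):
        @wraps(func)
        def wrapper(*args, **kwargs):
            wait = delay
            for attempt in range(1, max_attempts + 1):
                try:
                    return func(*args, **kwargs)
                except Exception:
                    if attempt == max_attempts:
                        raise  # out of attempts: surface the last error
                    time.sleep(wait)   # pause before the next attempt
                    wait *= backoff    # lengthen the pause each time
        return wrapper
    return decorator


calls = {"count": 0}


@retry(max_attempts=3)
def flaky_load():
    # Simulated atom body that fails twice before succeeding
    calls["count"] += 1
    if calls["count"] < 3:
        raise IOError("temporary failure")
    return "loaded"


result = flaky_load()
print(result, calls["count"])
```

A per-atom policy of this kind lets unreliable steps, such as network or file reads, be retried without rerunning the rest of the workflow.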

workflow.execute()

Executes all atoms in the workflow, respecting dependencies and caching.

Arguments:

  • recompute_atoms (set, optional): Names of atoms to recompute even if a cached result exists.

Returns:

  • results (dict): A dictionary mapping atom names to their results.
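
The interaction between dependencies, caching, and recompute_atoms can be sketched with a small stand-in class. `MiniWorkflow` below is a toy written for this illustration, not the library's implementation. It assumes that dependency results are passed positionally and that forcing an atom also recomputes everything downstream of it; the real Workflow may behave differently on both points.

```python
class MiniWorkflow:
    """Toy stand-in for illustration only; not the real Workflow class."""

    def __init__(self):
        self.atoms = {}   # atom name -> (function, list of dependency names)
        self.cache = {}   # atom name -> cached result from an earlier run

    def atom(self, dependencies=None):
        def decorator(func):
            self.atoms[func.__name__] = (func, list(dependencies or []))
            return func
        return decorator

    def execute(self, recompute_atoms=None):
        forced = set(recompute_atoms or ())
        results = {}
        recomputed = set()

        def run(name):
            if name in results:
                return results[name]
            func, deps = self.atoms[name]
            args = [run(dep) for dep in deps]  # resolve dependencies first
            # Stale if forced, never cached, or an upstream atom was just recomputed
            stale = (
                name in forced
                or name not in self.cache
                or any(dep in recomputed for dep in deps)
            )
            if stale:
                self.cache[name] = func(*args)
                recomputed.add(name)
            results[name] = self.cache[name]
            return results[name]

        for name in self.atoms:
            run(name)
        return results


wf = MiniWorkflow()
calls = []  # records which atoms actually ran


@wf.atom()
def load():
    calls.append("load")
    return [1, None, 3]


@wf.atom(dependencies=["load"])
def clean(load):
    calls.append("clean")
    return [x for x in load if x is not None]


@wf.atom(dependencies=["clean"])
def total(clean):
    calls.append("total")
    return sum(clean)


first = wf.execute()                      # first run computes every atom
calls.clear()
wf.execute()                              # second run is served from the cache
cached_calls = list(calls)
calls.clear()
wf.execute(recompute_atoms={"clean"})     # force one atom; downstream follows
forced_calls = list(calls)
```

In this sketch, the second call runs no atoms, and forcing `clean` reruns `clean` and `total` while `load` stays cached.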

Common Use Cases

  • Data Loading and Cleaning: Load and preprocess data in stages, caching results to avoid reloading.
  • Selective Execution: Rerun only the parts of the workflow affected by changes to interactive elements or inputs.
  • Intermediate Results: Inspect cached intermediate results for debugging or analysis.

Example Workflow

from preswald import Workflow
import pandas as pd

# Create a workflow instance
workflow = Workflow()

# Define an atom for loading data
@workflow.atom()
def load_data():
    # This atom has no dependencies
    return pd.read_csv("data.csv")

# Define an atom for cleaning data, dependent on load_data
@workflow.atom(dependencies=['load_data'])
def clean_data(load_data):
    # Automatically passes the output of load_data as an argument
    return load_data.dropna()

# Define an atom for analysis, dependent on clean_data
@workflow.atom(dependencies=['clean_data'])
def analyze_data(clean_data):
    # Uses the cleaned data from the previous atom
    return clean_data.describe()

# Execute the workflow
results = workflow.execute()

# Access the results
print(results)

Output Behavior

  1. Results: A dictionary mapping atom names to their outputs:
    {
        "load_data": <DataFrame>,
        "clean_data": <Cleaned DataFrame>,
        "analyze_data": <Analysis Results>
    }
    
  2. Selective Execution: If the script is rerun without changes, cached results are reused. When an atom or its inputs change, only the affected atoms are recomputed.

Why Use Workflow?

  • Efficiency: Avoids redundant computation with caching.
  • Modularity: Break down workflows into reusable, testable atoms.
  • Flexibility: Supports retry policies and selective execution.

Streamline your data workflows with Workflow! 🚀