class Workflow

The Workflow class provides a framework for building stateful workflows with selective recomputation and intelligent caching. This is particularly useful for tasks like data loading, cleaning, and analysis, where intermediate results can be cached to avoid unnecessary re-execution.


Key Features

  1. Stateful Workflow Object: Maintains the state of computations, allowing efficient and reusable workflows.
  2. Caching: Intelligent caching avoids redundant computations for tasks that haven’t changed.
  3. Retry Policy: Optional retry policies for handling failures, with a default policy provided.
  4. Selective Recomputation: Rerun only affected parts of the workflow when changes occur.

Workflow Structure

Atoms

Atoms are individual, reusable units of computation. These are functions decorated with @workflow.atom(). Atoms can:

  • Have dependencies (other atoms they rely on).
  • Be cached to prevent redundant execution.

Functions

def atom(
        self,
        dependencies: Optional[List[str]] = None,
        retry_policy: Optional[RetryPolicy] = None,
        force_recompute: bool = False,
    ):

A decorator used to define an atom (a node in the workflow).

Parameters:

  • dependencies (list, optional): Names of other atoms this atom depends on.
  • RetryPolicy (optional): Specify a retry policy for this atom. If not provided, the default retry policy is used.
  • force_recompute (bool, optional): Whether to force computation of this atom even if it is unchanged.

def execute(
        self, recompute_atoms: Optional[Set[str]] = None
    ) -> Dict[str, AtomResult]:

Executes all atoms in the workflow, respecting dependencies and caching.

Parameters:

  • recompute_atoms (set, optional): Names of atoms to force recomputation, bypassing the cache.

Returns:

  • results (dict): A dictionary mapping atom names to their results.

Common Use Cases

  • Data Loading and Cleaning: Load and preprocess data in stages, caching results to avoid reloading.
  • Selective Execution: Rerun only the parts of the workflow affected by changes to interactive elements or inputs.
  • Intermediate Results: Inspect cached intermediate results for debugging or analysis.

Example Workflow

from preswald import Workflow
import pandas as pd

# Create a workflow instance
workflow = Workflow()

# Define an atom for loading data
@workflow.atom()
def load_data():
    # This atom has no dependencies
    return pd.read_csv("data.csv")

# Define an atom for cleaning data, dependent on load_data
@workflow.atom(dependencies=['load_data'])
def clean_data(load_data):
    # Automatically passes the output of load_data as an argument
    return load_data.dropna()

# Define an atom for analysis, dependent on clean_data
@workflow.atom(dependencies=['clean_data'])
def analyze_data(clean_data):
    # Uses the cleaned data from the previous atom
    return clean_data.describe()

# Execute the workflow
results = workflow.execute()

# Access the results
print(results)

Output Behavior

  1. Results: A dictionary mapping atom names to their outputs:
    {
        "load_data": <DataFrame>,
        "clean_data": <Cleaned DataFrame>,
        "analyze_data": <Analysis Results>
    }
    
  2. Selective Execution: If the script is rerun without changes, only affected atoms are recomputed.

Reactive Runtime: Dependency-Aware Selective Execution

Preswald’s Workflow engine uses a reactive runtime that selectively reruns only the atoms affected by state changes — skipping any computation whose inputs haven’t changed. This improves performance and ensures fast, responsive apps.

The dependency graph that powers this system can be built in two ways:


1. Manual dependency declaration

You can explicitly declare dependencies using the dependencies argument:

@workflow.atom()
def task_a():
    return text("A")

@workflow.atom(dependencies=['task_a'])
def task_b(task_a):
    return text("B")

workflow.execute()

In this example:

  • task_b depends on task_a even though it doesn’t use its output.
  • This ensures that task_a always runs before task_b.

Manual dependencies are useful when:

  • You need explicit control of execution order
  • Atoms produce side effects or state changes rather than return values
  • Dependencies cannot be inferred from function parameters

2. Automatic dependency inference

If an atom accepts another atom’s name as a parameter, the dependency is inferred automatically:

@workflow.atom()
def select_value():
    return slider("Pick a number", min_val=0, max_val=10, default=1)

@workflow.atom()
def show_value(select_value):
    return text(f"You picked: {select_value}")

workflow.execute()

In this example, show_value automatically depends on select_value.


No matter how you define dependencies, the reactive runtime ensures that only atoms whose inputs have changed and their downstream dependents are recomputed.


3. Automatic Atom Lifting of Top-Level Components

Preswald automatically wraps top-level UI component calls (e.g., slider(), text(), etc.) in workflow atoms, even if you don’t explicitly use @workflow.atom().

This allows you to write concise, interactive apps without boilerplate:

from preswald import (
    get_df,
    text,
    slider,
    table,
)

df = get_df("iris_csv")
val = slider("Pick a number", min_val=0, max_val=10, default=1)
table(df, limit=val)

These calls are automatically transformed into atoms during script execution. This allows Preswald to:

  • Assign stable identifiers and track component state
  • Include them in the dependency graph (DAG)
  • Selectively rerun only the affected parts of the script when state changes

This automatic lifting is how Preswald maintains reactivity even in simple scripts without manual atom definitions.


If you run the app with debug logging enabled, you’ll see the transformed source in your logs:

2025-05-05 05:50:56,327 - preswald.engine.transformers.reactive_runtime - INFO - Transformed source code:
from preswald import get_df, text, slider, table
from preswald import get_workflow

@workflow.atom(name='_auto_atom_b4f77a63')
def _auto_atom_b4f77a63():
    return slider('Pick a number', min_val=0, max_val=10, default=1, component_id='slider-b4f77a63')

@workflow.atom(name='_auto_atom_d93912c6', dependencies=['_auto_atom_b4f77a63'])
def _auto_atom_d93912c6(param0):
    return table(df, limit=param0, component_id='table-d93912c6')

workflow = get_workflow()
df = get_df('iris_csv')
workflow.execute()

This automatic lifting enables full reactivity while keeping your scripts clean and minimal.


Why Use Workflow?

  • Efficiency: Avoids redundant computation with caching.
  • Modularity: Break down workflows into reusable, testable atoms.
  • Flexibility: Supports retry policies and selective execution.

Streamline your data workflows with Workflow! 🚀